NVIDIA Holoscan SDK v2.9.0

创建条件

提示

在大多数情况下,应用程序将使用用户指南的条件组件章节中记录的几种提供的条件之一构建。本页说明了添加用户定义的条件以控制操作算子何时可以执行的高级用例。

在组装 C++ 应用程序时,可以使用两种类型的条件

  1. 原生 C++ 条件:在 C++ 中定义而无需使用 GXF API 的自定义条件,通过创建 holoscan::Condition 的子类。

  2. GXF 条件:通过从 holoscan::ops::GXFCondition 类继承,在底层 C++ 库中定义的条件。这些条件包装了来自 GXF 扩展的 GXF 调度项组件。示例包括 CountCondition,用于限制操作算子执行到指定的计数,以及 PeriodicCondition,用于将操作算子的执行速率限制为指定的周期。其他几个内置条件记录在条件组件章节中。

注意

可以将 GXF 条件和原生条件的混合分配给操作算子。

原生条件

理解操作算子调度

holoscan::SchedulingStatusType 枚举定义了条件的当前状态。

条件调度状态

描述

kNever 操作算子将永远不会再次执行
kReady 操作算子已准备好执行
kWait 操作算子将来可能会执行
kWaitTime 操作算子将在指定持续时间后准备好执行
kWaitEvent 操作算子正在等待具有未知时间间隔的异步事件

操作算子的整体执行准备就绪状态将由操作算子上存在的所有单个条件的状态的 AND 组合决定。换句话说,只有当所有条件都处于 kReady 状态时,操作算子才能执行。如果任何条件处于 kNever 状态,则操作算子将永远不会再次执行。

当多个操作算子同时准备好执行时,它们的执行顺序将取决于应用程序正在使用的特定 Scheduler。例如,GreedyScheduler 以固定的确定性顺序一次执行一个操作算子,而 EventBasedSchedulerMultiThreadScheduler 可以有多个工作线程,允许操作算子并行执行。

holoscan.core.SchedulingStatusType 枚举定义了条件的当前状态。

条件调度状态

描述

NEVER 操作算子将永远不会再次执行
READY 操作算子已准备好执行
WAIT 操作算子将来可能会执行
WAIT_TIME 操作算子将在指定持续时间后准备好执行
WAIT_EVENT 操作算子正在等待具有未知时间间隔的异步事件

操作算子的整体执行准备就绪状态将由操作算子上存在的所有单个条件的状态的 AND 组合决定。换句话说,只有当所有条件都处于 READY 状态时,操作算子才能执行。如果任何条件处于 NEVER 状态,则操作算子将永远不会再次执行。

当多个操作算子同时准备好执行时,它们的执行顺序将取决于应用程序正在使用的特定 Scheduler。例如,GreedyScheduler 以固定的确定性顺序一次执行一个操作算子,而 EventBasedSchedulerMultiThreadScheduler 可以有多个工作线程,允许操作算子并行执行。

创建自定义条件 (C++)

在创建原生 Condition (C++/Python) 时,通常需要重写以下基类组件方法

  • initialize (C++/Python) 在应用程序的 run (C++/Python) 方法被调用后,在初始化期间调用一次。这可以用于设置条件定义的成员变量的任何初始状态。重要的是,此方法调用基本 initialize (C++/Python) 方法,然后再使用 setup (C++/Python) 定义的任何参数。

  • setup (C++/Python) 此方法用于配置为条件定义的任何参数。当 Application (C++/Python) 类的 run (C++/Python) 方法被调用时,将自动调用此方法。

还需要重写以下三种方法,这些方法将由底层 GXF 调度器使用。在这些方法中,check 方法是唯一一个始终需要具有非空实现的方法。

  • check (C++/Python) 由底层 GXF 调度器调用,以检查分配了此条件的操作算子是否已准备好执行。仅当此检查将 type 输出参数设置为 holoscan::SchedulingStatusType::kReady (C++) / holoscan.core.SchedulingStatusType.READY (Python) 时,操作算子才会执行。

  • on_execute (C++/Python) 在操作算子的 compute 方法 (C++/Python) 之后立即调用,就在任何发出的消息实际分发到下游接收器之前。

  • update_state (C++/Python) 始终在 check (C++/Python) 之前立即调用,并且始终将当前时间戳作为输入参数传递。这由其状态取决于当前时间戳的操作算子使用。

要在 C++ 中创建自定义条件,必须创建 Condition (C++/Python) 的子类。以下示例演示了如何使用原生条件(不具有底层预编译 GXF SchedulingTerm 的条件)。

代码片段: examples/conditions/native/cpp/ping_periodic_native.cpp

列表 18 examples/conditions/native/cpp/ping_periodic_native.cpp

复制
已复制!
            

#include <optional> #include "holoscan/holoscan.hpp" #include "holoscan/operators/ping_rx/ping_rx.hpp" #include "holoscan/operators/ping_tx/ping_tx.hpp" namespace holoscan::conditions { class NativePeriodicCondition : public Condition { public: HOLOSCAN_CONDITION_FORWARD_ARGS(NativePeriodicCondition) NativePeriodicCondition() = default; void initialize() override { // call parent initialize or parameters will not be registered Condition::initialize(); recess_period_ns_ = recess_period_.get(); }; void setup(ComponentSpec& spec) override { spec.param(recess_period_, "recess_period", "Recess Period", "Recession period in nanoseconds", static_cast<int64_t>(0)); } void check(int64_t timestamp, SchedulingStatusType* status_type, int64_t* target_timestamp) const override { if (status_type == nullptr) { throw std::runtime_error( fmt::format("Condition '{}' received nullptr for status_type", name())); } if (target_timestamp == nullptr) { throw std::runtime_error( fmt::format("Condition '{}' received nullptr for target_timestamp", name())); } if (!next_target_.has_value()) { *status_type = SchedulingStatusType::kReady; *target_timestamp = timestamp; return; } *target_timestamp = next_target_.value(); *status_type = timestamp > *target_timestamp ? SchedulingStatusType::kReady : SchedulingStatusType::kWaitTime; }; void on_execute(int64_t timestamp) override { if (next_target_.has_value()) { next_target_ = next_target_.value() + recess_period_ns_; } else { next_target_ = timestamp + recess_period_ns_; } }; void update_state([[maybe_unused]] int64_t timestamp) override { // no-op for this condition }; private: Parameter<int64_t> recess_period_; int64_t recess_period_ns_ = 0; std::optional<int64_t> next_target_ = std::nullopt; }; } // namespace holoscan::conditions class App : public holoscan::Application { public: void compose() override { using namespace holoscan; auto tx = make_operator<ops::PingTxOp>( "tx", make_condition<CountCondition>("count-condition", 5), make_condition<conditions::NativePeriodicCondition>( "dummy-condition", Arg("recess_period", static_cast<int64_t>(200'000'000)))); auto rx = make_operator<ops::PingRxOp>("rx"); add_flow(tx, rx); } }; int main([[maybe_unused]] int argc, char** argv) { auto app = holoscan::make_application<App>(); app->run(); return 0; }


代码片段: examples/conditions/native/python/ping_periodic_native.py

列表 19 examples/conditions/native/python/ping_periodic_native.py

复制
已复制!
            

from holoscan.conditions import CountCondition from holoscan.core import Application, ComponentSpec, Condition, SchedulingStatusType from holoscan.operators import PingRxOp, PingTxOp # Now define a simple application using the operators defined above class NativePeriodicCondition(Condition): """Example native Python periodic condition This behaves like holoscan.conditions.PeriodicCondition (which wraps an underlying C++ class). It is simplified in that it does not support a separate `policy` kwarg. Parameters ---------- fragment: holoscan.core.Fragment The fragment (or Application) to which this condition will belong. recess_period : int, optional The time to wait before an operator can execute again (units are in nanoseconds). """ def __init__(self, fragment, *args, recess_period=0, **kwargs): self.recess_period_ns = recess_period self.next_target = -1 super().__init__(fragment, *args, **kwargs) # Could add a `recess_period` Parameter via `setup` like in the following # # def setup(self, ComponentSpec: spec): # spec.param("recess_period", 0) # # and then configure that parameter in `initialize`, but for Python it is # easier to just add parameters to `__init__` as shown above. def setup(self, spec: ComponentSpec): print("** native condition setup method called **") def initialize(self): print("** native condition initialize method called **") def update_state(self, timestamp): print("** native condition update_state method called **") def check(self, timestamp): print("** native condition check method called **") # initially ready when the operator hasn't been called previously if self.next_target < 0: return (SchedulingStatusType.READY, timestamp) # return WAIT_TIME and the timestamp if the specified `recess_period` hasn't been reached status_type = ( SchedulingStatusType.READY if (timestamp > self.next_target) else SchedulingStatusType.WAIT_TIME ) return (status_type, self.next_target) def on_execute(self, timestamp): print("** native condition on_execute method called **") if self.next_target > 0: self.next_target = self.next_target + self.recess_period_ns else: self.next_target = timestamp + self.recess_period_ns class MyPingApp(Application): def compose(self): # Configure the operators. Here we use CountCondition to terminate # execution after a specific number of messages have been sent. # PeriodicCondition is used so that each subsequent message is # sent only after a period of 200 milliseconds has elapsed. tx = PingTxOp( self, CountCondition(self, 10), NativePeriodicCondition(self, recess_period=200_000_000), name="tx", ) rx = PingRxOp(self, name="rx") # Connect the operators into the workflow: tx -> rx self.add_flow(tx, rx) def main(): app = MyPingApp() app.run() if __name__ == "__main__": main()


在此应用程序中,创建了两个操作算子:PingTxOp (C++ / Python)。

  1. tx 操作算子是一个源操作算子,每次调用时都会发出一个整数值。

  2. rx 操作算子是一个接收器操作算子,从 tx 操作算子接收一个值。

通过从 Condition (C++/Python) 类继承,创建了一个自定义条件 NativePeriodicCondition。此条件是提供的 PeriodicCondition (C++/Python) 的简化版本(它不实现 policy 参数,并且仅接受整数值 recess_period)。我们仅创建此条件是为了提供一个简单的示例来说明如何创建自定义条件。

NativePeriodicConditionsetup 方法定义了一个名为 “recess_period” 的参数,它表示操作算子在执行后必须等待的时间量(以纳秒为单位),然后才能再次执行。

在定义 initialize 方法时,请注意我们首先调用 initialize (C++/Python ),以便我们可以获取内置 “recess_period” 参数的值。然后,此 initialize 方法设置此操作算子的私有成员变量的初始状态。

check 方法的实现是将 type 设置为 SchedulingStatusType::kReady (C++) / SchedulingStatusType.READY (Python),如果指定的周期已过去。否则,它将设置 target_timestamp 并将 type 设置为 SchedulingStatusType::kWaitTime (C++) / SchedulingStatusType.WAIT_TIME (Python)。在这种情况下,使用 kWaitTime 是因为我们明确知道目标时间戳是什么。请注意,对于其他类型的条件,我们可能不知道条件何时满足的具体时间。在这种不知道目标时间戳的情况下,应将状态设置为 kWait (C++) / WAIT (Python),并且不需要设置 target_timestamp。还有一个 kWaitEvent (C++) / WAIT_EVENT (Python) 状态可以使用,但这不太常见。目前,只有内置的 AsynchronousCondition 使用此状态类型。最后,如果我们想表明操作算子将永远不再执行,我们将返回 kNever (C++) / NEVER (Python)(使用 never 的具体示例是 CountCondition (C++/Python),它在达到指定计数后设置该状态)。

on_execute 方法将内部 next_target_ 时间戳设置为传入的时间戳。由于底层 GXF 框架在 Operator::compute 之后立即调用此方法,因此这会将此条件等待的周期设置为从先前调用 compute 完成的时间开始。

此操作算子不需要 update_state 方法。此方法始终在 check 之前立即调用,有时条件会选择从 on_execute 方法调用它。它的目的是根据调用时的时间戳执行条件内部状态的某些更新。

对于 C++ API,我们可以使用 make_condition 方法构造指向条件实例的共享指针。然后可以将该条件传递给条件将应用于的操作算子的 make_operator 方法(在本例中为 PingTxOp)。对于 Python API,我们改为直接将构造的 NativePeriodicCondition 作为位置参数传递给 tx 操作算子。

创建涉及发射器或接收器队列的自定义条件

某些条件类型取决于与操作算子的输入或输出端口对应的接收器或发射器队列的状态。这种类型的条件也可以创建为原生条件。第二个示例中给出了 MessageAvailableCondition 等效项的说明。请参阅 C++: examples/conditions/native/cpp/message_available_native.cppPython: examples/conditions/native/python/message_available_native.py

设计这种基于消息的条件的主要额外考虑因素是如何检索 Receiver (C++/Python) 或 Transmitter (C++/Python) 对象,以便在原生 Condition 的方法中使用。

对于原生 C++ 条件,应定义类型为 Parameter<std::shared_ptr<holoscan::Receiver>> receiver_ 的参数,如第 114 行和 67-71 行所示。然后可以使用查询队列大小的方法,如第 109-112 行的 check_min_size 方法所示。

代码片段: examples/conditions/native/cpp/message_available_native.cpp

列表 20 examples/conditions/native/cpp/message_available_native.cpp

复制
已复制!
            

class NativeMessageAvailableCondition : public Condition { public: HOLOSCAN_CONDITION_FORWARD_ARGS(NativeMessageAvailableCondition) NativeMessageAvailableCondition() = default; void initialize() override { // call parent initialize or parameters will not be registered Condition::initialize(); }; void setup(ComponentSpec& spec) override { spec.param(receiver_, "receiver", "Receiver", "The scheduling term permits execution if this channel has at least a given " "number of messages available."); spec.param(min_size_, "min_size", "Minimum size", "The condition permits execution if the given receiver has at least the given " "number of messages available", static_cast<uint64_t>(1)); } void check(int64_t timestamp, SchedulingStatusType* type, int64_t* target_timestamp) const override { if (type == nullptr) { throw std::runtime_error(fmt::format("Condition '{}' received nullptr for type", name())); } if (target_timestamp == nullptr) { throw std::runtime_error( fmt::format("Condition '{}' received nullptr for target_timestamp", name())); } *type = current_state_; *target_timestamp = last_state_change_; }; void on_execute(int64_t timestamp) override { update_state(timestamp); }; void update_state(int64_t timestamp) override { const bool is_ready = check_min_size(); if (is_ready && current_state_ != SchedulingStatusType::kReady) { current_state_ = SchedulingStatusType::kReady; last_state_change_ = timestamp; } if (!is_ready && current_state_ != SchedulingStatusType::kWait) { current_state_ = SchedulingStatusType::kWait; last_state_change_ = timestamp; } }; private: bool check_min_size() { auto recv = receiver_.get(); return recv->back_size() + recv->size() >= min_size_.get(); } Parameter<std::shared_ptr<holoscan::Receiver>> receiver_; Parameter<uint64_t> min_size_; SchedulingStatusType current_state_ = SchedulingStatusType::kWait; // The current state of the scheduling term int64_t last_state_change_ = 0; // timestamp when the state changed the last time };


对于原生 Python 条件,应将与感兴趣的接收器或发射器对应的输入或输出端口的名称传递给构造函数,如第 78-79 行所示。holoscan.core.Condition.receiver(或 holoscan.core.Condition.transmitter)方法然后可以用于检索与特定端口名称对应的实际 Receiver(或 Transmitter)对象,如第 94-98 行所示。一旦检索到该对象,就可以使用查询队列大小的方法,如 check_min_size 方法所示。

代码片段: examples/conditions/native/python/message_available_native.py

列表 21 examples/conditions/native/python/message_available_native.py

复制
已复制!
            

class NativeMessageAvailableCondition(Condition): """Example native Python periodic condition This behaves like holoscan.conditions.MessageAvailableCondition (which wraps an underlying C++ class). It is simplified in that it does not support a separate `policy` kwarg. Parameters ---------- fragment : holoscan.core.Fragment The fragment (or Application) to which this condition will belong. port_name : str The name of the input port on the operator whose Receiver queue this condition will apply to. min_size : int, optional The number of messages that must be present on the specified input port before the operator is allowed to execute. """ def __init__(self, fragment, port_name: str, *args, min_size: int = 1, **kwargs): self.port_name = port_name if not isinstance(min_size, int) or min_size <= 0: raise ValueError("min_size must be a positive integer") self.min_size = min_size # The current state of the scheduling term self.current_state = SchedulingStatusType.WAIT # timestamp when the state changed the last time self.last_state_change_ = 0 super().__init__(fragment, *args, **kwargs) def setup(self, spec: ComponentSpec): print("** native condition setup method called **") def initialize(self): print("** native condition initialize method called **") self.receiver_obj = self.receiver(self.port_name) if self.receiver_obj is None: raise RuntimeError(f"Receiver for port '{self.port_name}' not found") def check_min_size(self): return self.receiver_obj.back_size + self.receiver_obj.size >= self.min_size def update_state(self, timestamp): print("** native condition update_state method called **") is_ready = self.check_min_size() if is_ready and self.current_state != SchedulingStatusType.READY: self.current_state = SchedulingStatusType.READY self.last_state_change = timestamp if not is_ready and self.current_state != SchedulingStatusType.WAIT: self.current_state = SchedulingStatusType.WAIT self.last_state_change = timestamp def check(self, timestamp): print("** native condition check method called **") return self.current_state, self.last_state_change def on_execute(self, timestamp): print("** native condition on_execute method called **") self.update_state(timestamp)


覆盖默认操作算子端口条件的行为

关于自定义输入和输出端口的部分解释说,当用户添加端口而未在 Operator::setup 中指定任何条件时,将添加默认条件。此默认条件是输入端口的 MessageAvailableCondition 或输出端口的 DownstreamMessageAffordableCondition)。

如果用户已向 make_operator (C++) 或作为位置参数向操作算子构造函数 (Python) 提供了他们自己的 Condition,并且该条件具有与操作算子端口名称对应的 “receiver” 或 “transmitter” 参数,则不应向该端口添加默认条件。这样做是为了避免在同一端口上存在多个可能冲突的条件。但是,如果 Operator::setup 方法显式通过调用 IOSpec::condition 指定条件,则除了任何其他用户提供的条件之外,该显式条件仍将添加到端口。

为自定义 C++ 条件创建 Python 绑定

要将自定义 C++ 条件公开给 Python,可以使用 pybind11 包装它,就像任何其他 Condition 类一样。有关多个示例,请参阅 内置条件的绑定。绑定自定义条件与该文件夹中示例的唯一区别在于,自定义条件应在传递给 py::class_ 调用的类列表中使用 holoscan::Condition 而不是 holoscan::gxf::GXFCondition

上一篇 创建操作算子
下一篇 Holoscan 应用程序中的 CUDA 流处理
© 版权所有 2022-2024, NVIDIA。 最后更新于 2025 年 1 月 27 日。