创建条件
在大多数情况下,应用程序将使用用户指南的条件组件章节中记录的几种提供的条件之一构建。本页说明了添加用户定义的条件以控制操作算子何时可以执行的高级用例。
在组装 C++ 应用程序时,可以使用两种类型的条件
原生 C++ 条件:在 C++ 中定义而无需使用 GXF API 的自定义条件,通过创建
holoscan::Condition
的子类。GXF 条件:通过从
holoscan::ops::GXFCondition
类继承,在底层 C++ 库中定义的条件。这些条件包装了来自 GXF 扩展的 GXF 调度项组件。示例包括CountCondition
,用于限制操作算子执行到指定的计数,以及PeriodicCondition
,用于将操作算子的执行速率限制为指定的周期。其他几个内置条件记录在条件组件章节中。
可以将 GXF 条件和原生条件的混合分配给操作算子。
原生条件
理解操作算子调度
holoscan::SchedulingStatusType
枚举定义了条件的当前状态。
条件调度状态 | 描述 |
---|---|
kNever | 操作算子将永远不会再次执行 |
kReady | 操作算子已准备好执行 |
kWait | 操作算子将来可能会执行 |
kWaitTime | 操作算子将在指定持续时间后准备好执行 |
kWaitEvent | 操作算子正在等待具有未知时间间隔的异步事件 |
操作算子的整体执行准备就绪状态将由操作算子上存在的所有单个条件的状态的 AND 组合决定。换句话说,只有当所有条件都处于 kReady
状态时,操作算子才能执行。如果任何条件处于 kNever
状态,则操作算子将永远不会再次执行。
当多个操作算子同时准备好执行时,它们的执行顺序将取决于应用程序正在使用的特定 Scheduler
。例如,GreedyScheduler
以固定的确定性顺序一次执行一个操作算子,而 EventBasedScheduler
和 MultiThreadScheduler
可以有多个工作线程,允许操作算子并行执行。
holoscan.core.SchedulingStatusType 枚举定义了条件的当前状态。
条件调度状态 | 描述 |
---|---|
NEVER | 操作算子将永远不会再次执行 |
READY | 操作算子已准备好执行 |
WAIT | 操作算子将来可能会执行 |
WAIT_TIME | 操作算子将在指定持续时间后准备好执行 |
WAIT_EVENT | 操作算子正在等待具有未知时间间隔的异步事件 |
操作算子的整体执行准备就绪状态将由操作算子上存在的所有单个条件的状态的 AND 组合决定。换句话说,只有当所有条件都处于 READY
状态时,操作算子才能执行。如果任何条件处于 NEVER
状态,则操作算子将永远不会再次执行。
当多个操作算子同时准备好执行时,它们的执行顺序将取决于应用程序正在使用的特定 Scheduler
。例如,GreedyScheduler
以固定的确定性顺序一次执行一个操作算子,而 EventBasedScheduler
和 MultiThreadScheduler
可以有多个工作线程,允许操作算子并行执行。
创建自定义条件 (C++)
在创建原生 Condition
(C++
/Python
) 时,通常需要重写以下基类组件方法
initialize
(C++
/Python
) 在应用程序的run
(C++
/Python
) 方法被调用后,在初始化期间调用一次。这可以用于设置条件定义的成员变量的任何初始状态。重要的是,此方法调用基本initialize
(C++
/Python
) 方法,然后再使用setup
(C++
/Python
) 定义的任何参数。setup
(C++
/Python
) 此方法用于配置为条件定义的任何参数。当Application
(C++
/Python
) 类的run
(C++
/Python
) 方法被调用时,将自动调用此方法。
还需要重写以下三种方法,这些方法将由底层 GXF 调度器使用。在这些方法中,check
方法是唯一一个始终需要具有非空实现的方法。
check
(C++
/Python
) 由底层 GXF 调度器调用,以检查分配了此条件的操作算子是否已准备好执行。仅当此检查将type
输出参数设置为holoscan::SchedulingStatusType::kReady
(C++) /holoscan.core.SchedulingStatusType.READY
(Python) 时,操作算子才会执行。on_execute
(C++
/Python
) 在操作算子的compute
方法 (C++
/Python
) 之后立即调用,就在任何发出的消息实际分发到下游接收器之前。update_state
(C++
/Python
) 始终在check
(C++
/Python
) 之前立即调用,并且始终将当前时间戳作为输入参数传递。这由其状态取决于当前时间戳的操作算子使用。
要在 C++ 中创建自定义条件,必须创建 Condition
(C++
/Python
) 的子类。以下示例演示了如何使用原生条件(不具有底层预编译 GXF SchedulingTerm 的条件)。
代码片段: examples/conditions/native/cpp/ping_periodic_native.cpp
列表 18 examples/conditions/native/cpp/ping_periodic_native.cpp
#include <optional>
#include "holoscan/holoscan.hpp"
#include "holoscan/operators/ping_rx/ping_rx.hpp"
#include "holoscan/operators/ping_tx/ping_tx.hpp"
namespace holoscan::conditions {
class NativePeriodicCondition : public Condition {
public:
HOLOSCAN_CONDITION_FORWARD_ARGS(NativePeriodicCondition)
NativePeriodicCondition() = default;
void initialize() override {
// call parent initialize or parameters will not be registered
Condition::initialize();
recess_period_ns_ = recess_period_.get();
};
void setup(ComponentSpec& spec) override {
spec.param(recess_period_,
"recess_period",
"Recess Period",
"Recession period in nanoseconds",
static_cast<int64_t>(0));
}
void check(int64_t timestamp, SchedulingStatusType* status_type,
int64_t* target_timestamp) const override {
if (status_type == nullptr) {
throw std::runtime_error(
fmt::format("Condition '{}' received nullptr for status_type", name()));
}
if (target_timestamp == nullptr) {
throw std::runtime_error(
fmt::format("Condition '{}' received nullptr for target_timestamp", name()));
}
if (!next_target_.has_value()) {
*status_type = SchedulingStatusType::kReady;
*target_timestamp = timestamp;
return;
}
*target_timestamp = next_target_.value();
*status_type = timestamp > *target_timestamp ? SchedulingStatusType::kReady
: SchedulingStatusType::kWaitTime;
};
void on_execute(int64_t timestamp) override {
if (next_target_.has_value()) {
next_target_ = next_target_.value() + recess_period_ns_;
} else {
next_target_ = timestamp + recess_period_ns_;
}
};
void update_state([[maybe_unused]] int64_t timestamp) override {
// no-op for this condition
};
private:
Parameter<int64_t> recess_period_;
int64_t recess_period_ns_ = 0;
std::optional<int64_t> next_target_ = std::nullopt;
};
} // namespace holoscan::conditions
class App : public holoscan::Application {
public:
void compose() override {
using namespace holoscan;
auto tx = make_operator<ops::PingTxOp>(
"tx",
make_condition<CountCondition>("count-condition", 5),
make_condition<conditions::NativePeriodicCondition>(
"dummy-condition", Arg("recess_period", static_cast<int64_t>(200'000'000))));
auto rx = make_operator<ops::PingRxOp>("rx");
add_flow(tx, rx);
}
};
int main([[maybe_unused]] int argc, char** argv) {
auto app = holoscan::make_application<App>();
app->run();
return 0;
}
代码片段: examples/conditions/native/python/ping_periodic_native.py
列表 19 examples/conditions/native/python/ping_periodic_native.py
from holoscan.conditions import CountCondition
from holoscan.core import Application, ComponentSpec, Condition, SchedulingStatusType
from holoscan.operators import PingRxOp, PingTxOp
# Now define a simple application using the operators defined above
class NativePeriodicCondition(Condition):
"""Example native Python periodic condition
This behaves like holoscan.conditions.PeriodicCondition (which wraps an
underlying C++ class). It is simplified in that it does not support a
separate `policy` kwarg.
Parameters
----------
fragment: holoscan.core.Fragment
The fragment (or Application) to which this condition will belong.
recess_period : int, optional
The time to wait before an operator can execute again (units are in
nanoseconds).
"""
def __init__(self, fragment, *args, recess_period=0, **kwargs):
self.recess_period_ns = recess_period
self.next_target = -1
super().__init__(fragment, *args, **kwargs)
# Could add a `recess_period` Parameter via `setup` like in the following
#
# def setup(self, ComponentSpec: spec):
# spec.param("recess_period", 0)
#
# and then configure that parameter in `initialize`, but for Python it is
# easier to just add parameters to `__init__` as shown above.
def setup(self, spec: ComponentSpec):
print("** native condition setup method called **")
def initialize(self):
print("** native condition initialize method called **")
def update_state(self, timestamp):
print("** native condition update_state method called **")
def check(self, timestamp):
print("** native condition check method called **")
# initially ready when the operator hasn't been called previously
if self.next_target < 0:
return (SchedulingStatusType.READY, timestamp)
# return WAIT_TIME and the timestamp if the specified `recess_period` hasn't been reached
status_type = (
SchedulingStatusType.READY
if (timestamp > self.next_target)
else SchedulingStatusType.WAIT_TIME
)
return (status_type, self.next_target)
def on_execute(self, timestamp):
print("** native condition on_execute method called **")
if self.next_target > 0:
self.next_target = self.next_target + self.recess_period_ns
else:
self.next_target = timestamp + self.recess_period_ns
class MyPingApp(Application):
def compose(self):
# Configure the operators. Here we use CountCondition to terminate
# execution after a specific number of messages have been sent.
# PeriodicCondition is used so that each subsequent message is
# sent only after a period of 200 milliseconds has elapsed.
tx = PingTxOp(
self,
CountCondition(self, 10),
NativePeriodicCondition(self, recess_period=200_000_000),
name="tx",
)
rx = PingRxOp(self, name="rx")
# Connect the operators into the workflow: tx -> rx
self.add_flow(tx, rx)
def main():
app = MyPingApp()
app.run()
if __name__ == "__main__":
main()
在此应用程序中,创建了两个操作算子:PingTxOp
(C++
/ Python
)。
tx
操作算子是一个源操作算子,每次调用时都会发出一个整数值。rx
操作算子是一个接收器操作算子,从tx
操作算子接收一个值。
通过从 Condition
(C++
/Python
) 类继承,创建了一个自定义条件 NativePeriodicCondition
。此条件是提供的 PeriodicCondition
(C++
/Python
) 的简化版本(它不实现 policy
参数,并且仅接受整数值 recess_period
)。我们仅创建此条件是为了提供一个简单的示例来说明如何创建自定义条件。
NativePeriodicCondition
的 setup
方法定义了一个名为 “recess_period” 的参数,它表示操作算子在执行后必须等待的时间量(以纳秒为单位),然后才能再次执行。
在定义 initialize
方法时,请注意我们首先调用 initialize
(C++
/Python
check
方法的实现是将 type
设置为 SchedulingStatusType::kReady
(C++) / SchedulingStatusType.READY
(Python),如果指定的周期已过去。否则,它将设置 target_timestamp
并将 type
设置为 SchedulingStatusType::kWaitTime
(C++) / SchedulingStatusType.WAIT_TIME
(Python)。在这种情况下,使用 kWaitTime
是因为我们明确知道目标时间戳是什么。请注意,对于其他类型的条件,我们可能不知道条件何时满足的具体时间。在这种不知道目标时间戳的情况下,应将状态设置为 kWait
(C++) / WAIT
(Python),并且不需要设置 target_timestamp
。还有一个 kWaitEvent
(C++) / WAIT_EVENT
(Python) 状态可以使用,但这不太常见。目前,只有内置的 AsynchronousCondition
使用此状态类型。最后,如果我们想表明操作算子将永远不再执行,我们将返回 kNever
(C++) / NEVER
(Python)(使用 never 的具体示例是 CountCondition
(C++
/Python
),它在达到指定计数后设置该状态)。
on_execute
方法将内部 next_target_
时间戳设置为传入的时间戳。由于底层 GXF 框架在 Operator::compute
之后立即调用此方法,因此这会将此条件等待的周期设置为从先前调用 compute
完成的时间开始。
此操作算子不需要 update_state
方法。此方法始终在 check
之前立即调用,有时条件会选择从 on_execute
方法调用它。它的目的是根据调用时的时间戳执行条件内部状态的某些更新。
对于 C++ API,我们可以使用 make_condition
方法构造指向条件实例的共享指针。然后可以将该条件传递给条件将应用于的操作算子的 make_operator
方法(在本例中为 PingTxOp
)。对于 Python API,我们改为直接将构造的 NativePeriodicCondition
作为位置参数传递给 tx
操作算子。
创建涉及发射器或接收器队列的自定义条件
某些条件类型取决于与操作算子的输入或输出端口对应的接收器或发射器队列的状态。这种类型的条件也可以创建为原生条件。第二个示例中给出了 MessageAvailableCondition
等效项的说明。请参阅 C++: examples/conditions/native/cpp/message_available_native.cpp,Python: examples/conditions/native/python/message_available_native.py。
设计这种基于消息的条件的主要额外考虑因素是如何检索 Receiver
(C++
/Python
) 或 Transmitter
(C++
/Python
) 对象,以便在原生 Condition 的方法中使用。
对于原生 C++ 条件,应定义类型为 Parameter<std::shared_ptr<holoscan::Receiver>> receiver_
的参数,如第 114 行和 67-71 行所示。然后可以使用查询队列大小的方法,如第 109-112 行的 check_min_size
方法所示。
代码片段: examples/conditions/native/cpp/message_available_native.cpp
列表 20 examples/conditions/native/cpp/message_available_native.cpp
class NativeMessageAvailableCondition : public Condition {
public:
HOLOSCAN_CONDITION_FORWARD_ARGS(NativeMessageAvailableCondition)
NativeMessageAvailableCondition() = default;
void initialize() override {
// call parent initialize or parameters will not be registered
Condition::initialize();
};
void setup(ComponentSpec& spec) override {
spec.param(receiver_,
"receiver",
"Receiver",
"The scheduling term permits execution if this channel has at least a given "
"number of messages available.");
spec.param(min_size_,
"min_size",
"Minimum size",
"The condition permits execution if the given receiver has at least the given "
"number of messages available",
static_cast<uint64_t>(1));
}
void check(int64_t timestamp, SchedulingStatusType* type,
int64_t* target_timestamp) const override {
if (type == nullptr) {
throw std::runtime_error(fmt::format("Condition '{}' received nullptr for type", name()));
}
if (target_timestamp == nullptr) {
throw std::runtime_error(
fmt::format("Condition '{}' received nullptr for target_timestamp", name()));
}
*type = current_state_;
*target_timestamp = last_state_change_;
};
void on_execute(int64_t timestamp) override { update_state(timestamp); };
void update_state(int64_t timestamp) override {
const bool is_ready = check_min_size();
if (is_ready && current_state_ != SchedulingStatusType::kReady) {
current_state_ = SchedulingStatusType::kReady;
last_state_change_ = timestamp;
}
if (!is_ready && current_state_ != SchedulingStatusType::kWait) {
current_state_ = SchedulingStatusType::kWait;
last_state_change_ = timestamp;
}
};
private:
bool check_min_size() {
auto recv = receiver_.get();
return recv->back_size() + recv->size() >= min_size_.get();
}
Parameter<std::shared_ptr<holoscan::Receiver>> receiver_;
Parameter<uint64_t> min_size_;
SchedulingStatusType current_state_ =
SchedulingStatusType::kWait; // The current state of the scheduling term
int64_t last_state_change_ = 0; // timestamp when the state changed the last time
};
对于原生 Python 条件,应将与感兴趣的接收器或发射器对应的输入或输出端口的名称传递给构造函数,如第 78-79 行所示。holoscan.core.Condition.receiver(或 holoscan.core.Condition.transmitter)方法然后可以用于检索与特定端口名称对应的实际 Receiver
(或 Transmitter
)对象,如第 94-98 行所示。一旦检索到该对象,就可以使用查询队列大小的方法,如 check_min_size
方法所示。
代码片段: examples/conditions/native/python/message_available_native.py
列表 21 examples/conditions/native/python/message_available_native.py
class NativeMessageAvailableCondition(Condition):
"""Example native Python periodic condition
This behaves like holoscan.conditions.MessageAvailableCondition (which
wraps an underlying C++ class). It is simplified in that it does not
support a separate `policy` kwarg.
Parameters
----------
fragment : holoscan.core.Fragment
The fragment (or Application) to which this condition will belong.
port_name : str
The name of the input port on the operator whose Receiver queue this condition will apply
to.
min_size : int, optional
The number of messages that must be present on the specified input port before the
operator is allowed to execute.
"""
def __init__(self, fragment, port_name: str, *args, min_size: int = 1, **kwargs):
self.port_name = port_name
if not isinstance(min_size, int) or min_size <= 0:
raise ValueError("min_size must be a positive integer")
self.min_size = min_size
# The current state of the scheduling term
self.current_state = SchedulingStatusType.WAIT
# timestamp when the state changed the last time
self.last_state_change_ = 0
super().__init__(fragment, *args, **kwargs)
def setup(self, spec: ComponentSpec):
print("** native condition setup method called **")
def initialize(self):
print("** native condition initialize method called **")
self.receiver_obj = self.receiver(self.port_name)
if self.receiver_obj is None:
raise RuntimeError(f"Receiver for port '{self.port_name}' not found")
def check_min_size(self):
return self.receiver_obj.back_size + self.receiver_obj.size >= self.min_size
def update_state(self, timestamp):
print("** native condition update_state method called **")
is_ready = self.check_min_size()
if is_ready and self.current_state != SchedulingStatusType.READY:
self.current_state = SchedulingStatusType.READY
self.last_state_change = timestamp
if not is_ready and self.current_state != SchedulingStatusType.WAIT:
self.current_state = SchedulingStatusType.WAIT
self.last_state_change = timestamp
def check(self, timestamp):
print("** native condition check method called **")
return self.current_state, self.last_state_change
def on_execute(self, timestamp):
print("** native condition on_execute method called **")
self.update_state(timestamp)
覆盖默认操作算子端口条件的行为
关于自定义输入和输出端口的部分解释说,当用户添加端口而未在 Operator::setup
中指定任何条件时,将添加默认条件。此默认条件是输入端口的 MessageAvailableCondition
或输出端口的 DownstreamMessageAffordableCondition
)。
如果用户已向 make_operator
(C++) 或作为位置参数向操作算子构造函数 (Python) 提供了他们自己的 Condition
,并且该条件具有与操作算子端口名称对应的 “receiver” 或 “transmitter” 参数,则不应向该端口添加默认条件。这样做是为了避免在同一端口上存在多个可能冲突的条件。但是,如果 Operator::setup
方法显式通过调用 IOSpec::condition
指定条件,则除了任何其他用户提供的条件之外,该显式条件仍将添加到端口。
为自定义 C++ 条件创建 Python 绑定
要将自定义 C++ 条件公开给 Python,可以使用 pybind11 包装它,就像任何其他 Condition
类一样。有关多个示例,请参阅 内置条件的绑定。绑定自定义条件与该文件夹中示例的唯一区别在于,自定义条件应在传递给 py::class_
调用的类列表中使用 holoscan::Condition
而不是 holoscan::gxf::GXFCondition
。