条件
下表显示了运算符调度状态的各种状态
调度状态 | 描述 |
---|---|
NEVER | 运算符将不再执行 |
READY | 运算符已准备好执行 |
WAIT | 运算符将来可能会执行 |
WAIT_TIME | 运算符将在指定持续时间后准备好执行 |
WAIT_EVENT | 运算符正在等待时间间隔未知的异步事件 |
任何单个运算符执行失败都会停止所有运算符的执行。
当运算符的调度状态达到
NEVER
状态时,它们自然会从执行中取消调度。
默认情况下,运算符始终处于 READY
状态,这意味着它们被调度为持续执行其 compute()
方法。要更改此行为,可以将一些条件类分配给运算符。Holoscan SDK 当前支持各种条件
MessageAvailableCondition(消息可用条件)
ExpiringMessageAvailableCondition(过期消息可用条件)
MultiMessageAvailableCondition(多消息可用条件)
MultiMessageAvailableTimeoutCondition(多消息可用超时条件)
DownstreamMessageAffordableCondition(下游消息可承受条件)
CountCondition(计数条件)
BooleanCondition(布尔条件)
PeriodicCondition(周期性条件)
AsynchronousCondition(异步条件)
这些条件属于以下详述的各种类型。通常,条件由应用程序作者显式添加到运算符,但也应注意,除非默认设置被覆盖,否则会自动为运算符的每个输入端口添加 MessageAvailableCondition
,并自动为其每个输出端口添加 DownstreamMessageAffordableCondition
。
条件是 AND 组合的
一个运算符可以与多个条件关联,这些条件定义其执行行为。条件是 AND 组合的,以描述运算符的当前状态。为了使运算符被调度程序执行,所有条件都必须处于 READY
状态,反之,当任何一个调度项达到 NEVER
状态时,运算符将从执行中取消调度。AND 组合期间各种状态的优先级顺序为 NEVER
、WAIT_EVENT
、WAIT
、WAIT_TIME
和 READY
。
下表粗略地分类了可用的条件类型,以帮助更好地理解其用途以及如何分配它们。以下各节提供了各个条件的更详细描述。
条件名称 | 分类 | 关联对象 |
---|---|---|
MessageAvailableCondition(消息可用条件) | 消息驱动 | 单个输入端口 |
ExpiringMessageAvailableCondition(过期消息可用条件) | 消息驱动 | 单个输入端口 |
MultiMessageAffordableCondition | 消息驱动 | 多个输入端口 |
MultiMessageAffordableTimeoutCondition | 消息驱动 | 单个或多个输入端口 |
DownstreamMessageAffordableCondition(下游消息可承受条件) | 消息驱动 | 单个输出端口 |
PeriodicCondition(周期性条件) | 时钟驱动 | 整个运算符 |
CountCondition(计数条件) | 其他 | 整个运算符 |
BooleanCondition(布尔条件) | 执行驱动 | 整个运算符 |
AsynchronousCondition(异步条件) | 执行驱动 | 整个运算符 |
CudaStreamCondition | 消息驱动(CUDA 同步) | 单个输入端口 |
CudaEventCondition | 消息驱动(CUDA 同步) | 单个输入端口 |
CudaBufferAvailableCondition | 消息驱动(CUDA 同步) | 单个输入端口 |
在这里,各种消息驱动条件与输入端口(接收器)或输出端口(发送器)相关联。与单个输入端口关联的消息驱动条件通常通过 IOSpec::condition
方法(C++
/Python
)方法分配,该方法从运算符的 setup
方法(C++
/Python
)调用。与多个输入端口关联的条件将通过 OperatorSpec::multi_port_condition
方法(C++
/Python
)分配,该方法从运算符的 setup
方法(C++
/Python
)调用。
所有其他条件类型通常在应用程序的 compose
方法中(即传递给 C++ 中的 make_operator()()
或 Python 中的运算符类的构造函数)作为位置参数或关键字参数传递。一旦分配了这些条件,它们将自动强制执行与该发送器/接收器关联的条件,作为控制运算符是否调用 compute
的条件的一部分。由于上面讨论的条件的 AND 组合,所有端口都必须满足其关联的条件,运算符才能调用 compute
。
从 Holoscan v2.8 开始,还可以添加一个基于消息的条件,该条件将 “receiver” 或 “transmitter” 参数作为位置参数传递给 Fragment::make_operator
(C++) 或运算符的构造函数 (Python)。条件的任何 “receiver” 或 “transmitter” 参数都应通过字符串值参数指定,该参数采用条件将应用到的端口的名称。然后,SDK 将负责在应用程序运行时自动换入命名端口使用的实际底层 Receiver
或 Transmitter
对象。作为一个具体的例子,如果运算符的 setup
方法在输入端口上设置了 ConditionType::kNone
(C++) 条件,但我们想要添加 MessageAvailableCondition
而不修改该 setup 方法。这可以通过以下方式完成
// assuming that an operator has an input port named "in1" we could explicitly create a condition for this port via
auto in1_condition = make_condition<MessageAvailableCondition>("in1_condition",
Arg("min_size_", static_cast<uint64_t>(1)),
Arg("receiver", "in1"));
// then `in1_condition` can be passed as an argument to the `Fragment::make_operator` call for the operator
或者等效地,在 Python 中
# assuming that an operator has an input port named "in1" we could explicitly create a condition for this port via
in1_condition = MessageAvailableCondition(fragment, name="in1_condition", min_size=1, receiver="in");
# then in1_condition can be passed as a positional argument to the operator's constructor
PeriodicCondition
是时钟驱动的。它根据与其关联的时钟的计时自动生效。CountCondition
是另一种自动生效的条件类型,它在达到指定计数后停止运算符的执行。
相比之下,标记为执行驱动的条件需要应用程序或运算符线程显式触发条件的变化。例如,内置的 HolovizOp
运算符的 compute
方法实现了逻辑,以在用户关闭显示窗口时更新关联的 BooleanCondition
以禁用运算符。类似地,AsynchronousCondition
需要某个线程发出事件以触发其状态的更新。
与 MessageAvailableCondition
(C++
/Python
) 关联的运算符在输入端口的关联队列至少有一定数量的元素时执行。此条件通过 OperatorSpec 的 input()
方法的返回值 (IOSpec) 上的 condition()
方法与运算符的特定输入端口关联。
允许运算符执行的最小消息数由 min_size
参数指定(默认值:1
)。此条件的可选参数是 front_stage_max_size
,即最大前台消息计数。如果设置了此参数,则仅当队列中的消息数不超过此计数时,条件才允许执行。它可用于不消耗队列中所有消息的运算符。
与 ExpiringMessageAvailableCondition
(C++
/Python
) 关联的运算符在关联队列中收到的第一条消息即将过期或队列中有足够的消息时执行。此条件通过 OperatorSpec 的 input()
或 output()
方法的返回值 (IOSpec) 上的 condition()
方法与运算符的特定输入或输出端口关联。
参数 max_batch_size
和 max_delay_ns
分别指示要批量处理的最大消息数以及从第一条消息开始等待的最大延迟,然后才执行实体。请注意,ExpiringMessageAvailableCondition
要求发送到使用此条件的任何端口的输入消息必须包含时间戳。这意味着上游运算符必须使用时间戳发出消息。
要获得类似的功能而无需时间戳,可以使用下面描述的 MultiMessageAvailableTimeoutCondition
,并且仅分配单个输入端口。定时计算的差异在于 MultiMessageAvailableTimeOutCondition
测量运算符上次调用 compute
之间的时间,而 ExpiringMessageAvailableCondition
则基于自消息到达运算符输入队列以来经过的时间。
DownstreamMessageAffordableCondition
(C++
/Python
) 条件指定,如果给定输出端口的下游运算符的输入端口可以接受新消息,则应执行运算符。此条件通过 OperatorSpec 的 output()
方法的返回值 (IOSpec) 上的 condition()
方法与运算符的特定输出端口关联。允许运算符执行的最小消息数由 min_size
参数指定(默认值:1
)。
与 MultiMessageAvailableCondition
(C++
/Python
) 关联的运算符在用户指定的多个输入端口的关联队列具有所需数量的元素时执行。
此条件通过 OperatorSpec 上的 multi_port_condition()
方法与运算符的多个输入端口关联。multi_port_condition
的 port_names
参数控制哪些输入端口与此条件关联。
此条件有两种操作模式。第一种模式是 MultiMessageAvailableCondition::SamplingMode::SumOfAll
(C++) 或 holoscan.conditions.MultiMessageAvailableCondition.SamplingMode.SUM_OF_ALL
(Python)。在此模式下,min_sum
参数用于指定必须在 port_names
中包含的所有端口上收到的消息总数,运算符才能执行。第二种可用模式是 MultiMessageAvailableCondition::SamplingMode::PerReceiver
(C++) 或 holoscan.conditions.MultiMessageAvailableCondition.SamplingMode.PER_RECEIVER
(Python)。此模式改为采用与 port_names
长度相等的 min_sizes
的向量/列表。这控制了每个单独端口必须到达的消息数,运算符才能执行。后一种 “per-receiver” 模式等效于在每个输入端口中单独设置 MessageAvailableCondition
。
与 CountCondition
(C++
/Python
) 关联的运算符会执行特定次数,次数由其 count
参数指定。与此条件关联的运算符的调度状态可以处于 READY
或 NEVER
状态。当运算符已执行 count
次时,调度状态将达到 NEVER
状态。count
参数可以设置为负值,以指示运算符应无限次执行(默认值:1
)。
与 BooleanCondition
(C++
/Python
) 关联的运算符在关联的布尔变量设置为 true
时执行。布尔变量通过调用 BooleanCondition
对象上的 enable_tick()
/disable_tick()
方法设置为 true
/false
。check_tick_enabled()
方法可用于检查布尔变量是否设置为 true
/false
。与此条件关联的运算符的调度状态可以处于 READY
或 NEVER
状态。如果布尔变量设置为 true
,则与此条件关联的运算符的调度状态设置为 READY
。如果布尔变量设置为 false
,则与此条件关联的运算符的调度状态设置为 NEVER
。enable_tick()
/disable_tick()
方法可以从工作流程中的任何运算符调用。
void compute(InputContext&, OutputContext& op_output, ExecutionContext&) override {
// ...
if (<condition expression>) { // e.g. if (index_ >= 10)
auto my_bool_condition = condition<BooleanCondition>("my_bool_condition");
if (my_bool_condition) { // if condition exists (not true or false)
my_bool_condition->disable_tick(); // this will stop the operator
}
}
// ...
}
def compute(self, op_input, op_output, context):
# ...
if <condition expression>: # e.g, self.index >= 10
my_bool_condition = self.conditions.get("my_bool_condition")
if my_bool_condition: # if condition exists (not true or false)
my_bool_condition.disable_tick() # this will stop the operator
# ...
与 PeriodicCondition
(C++
/Python
) 关联的运算符在由其 recess_period
参数指定的周期性时间间隔后执行。与此条件关联的运算符的调度状态可以处于 READY
或 WAIT_TIME
状态。首次或在周期性时间间隔后,与此条件关联的运算符的调度状态设置为 READY
,并且运算符被执行。运算符执行后,调度状态设置为 WAIT_TIME
,并且在 recess_period
时间间隔之前,运算符不会执行。
AsynchronousCondition
(C++
/Python
) 主要与处理调度程序执行的常规执行之外发生的异步事件的运算符相关联。由于这些事件本质上是非周期性的,因此 AsynchronousCondition
阻止调度程序定期轮询运算符的状态并降低 CPU 利用率。与此条件关联的运算符的调度状态可以根据其等待的异步事件处于 READY
、WAIT
、WAIT_EVENT
或 NEVER
状态。
异步事件的状态使用 AsynchronousEventState
描述,并使用 event_state()
API 更新。
AsynchronousEventState(异步事件状态) | 描述 |
---|---|
READY | Init 状态,compute() 方法的首次执行挂起 |
WAIT | 请求异步服务尚未发送,无事可做,只能等待 |
EVENT_WAITING(事件等待中) | 已向异步服务发送请求,等待事件完成通知 |
EVENT_DONE(事件完成) | 已收到事件完成通知,运算符已准备好被勾选 |
EVENT_NEVER(事件永不发生) | 运算符不想再次执行,执行结束 |
与此调度项关联的运算符很可能具有一个异步线程,该线程可以在调度程序执行的常规执行周期之外更新条件的状态。当异步事件状态处于 WAIT
状态时,调度程序会定期轮询运算符的调度状态。当异步事件状态处于 EVENT_WAITING
状态时,调度程序在收到事件通知之前不会再次检查运算符的调度状态。将异步事件的状态设置为 EVENT_DONE
会自动向调度程序发送事件通知。运算符可以使用 EVENT_NEVER
状态来指示其执行周期的结束。对于所有条件类型,该条件类型可以与任何调度程序一起使用。
此条件可用于要求在输入流上的工作完成之后,运算符才能准备好进行调度。当消息发送到已分配 CudaStreamCondition
的端口时,此条件会在在此输入端口上找到的 CUDA 流上设置内部主机回调函数。回调函数将在流上的其他工作完成后将运算符的状态设置为 READY。这将允许调度程序执行运算符。
CudaStreamCondition
的一个限制是,它仅在输入端口队列中的第一条消息中查找流。它目前不支持处理在同一消息(实体)内或跨队列中的多个消息具有多个不同输入流组件的端口。CudaStreamCondition
的行为足以满足 Holoscan 的默认队列大小(大小为 1),并与 receive_cuda_stream
一起使用,后者仅在上游运算符的传出消息中放置单个 CUDA 流组件。不适用它的情况包括
输入端口的队列大小被显式设置为 容量大于 1,并且不知道队列中的所有消息都对应于同一 CUDA 流。
输入端口是多接收器端口(即
IOSpec::kAnySize
),任何数量的上游运算符都可以连接到该端口。
在输入消息中未找到流的情况下,此条件将允许运算符执行。
此条件不适用于 Holoscan 应用程序中的常规用途,因为 Holoscan 未提供与 GXF 的 nvidia::gxf:CudaEvent
类型相关的任何 API。提供此条件纯粹是为了允许编写一个运算符,该运算符可以与另一个包装 GXF 代码段的运算符互操作,该代码段在其发出的输出消息中包含 CudaEvent
组件。它检查输入队列的第一条消息中是否包含具有指定 event_name
的 CudaEvent
。然后,只有当对相应事件的 cudaEventQuery
指示它已准备就绪时,它才允许执行运算符。
此条件不适用于 Holoscan 应用程序中的常规用途,因为 Holoscan 未提供与 GXF 的 nvidia::gxf:CudaBuffer
类型相关的任何 API。提供此条件纯粹是为了允许编写一个运算符,该运算符可以与另一个包装 GXF 代码段的运算符互操作,该代码段在其发出的输出消息中包含 CudaBuffer
组件。它检查输入队列的第一条消息中是否包含 CudaBuffer
组件,并且仅在该缓冲区状态为 CudaBuffer::State::DATA_AVAILABLE
时才允许执行运算符。