NVIDIA Holoscan SDK v2.9.0

创建算子

提示

ping_custom_op 示例中也演示了如何创建自定义算子。

在组装 C++ 应用程序时,可以使用两种类型的算子

  1. 原生 C++ 算子:在 C++ 中定义的自定义算子,不使用 GXF API,通过创建 holoscan::Operator 的子类来实现。这些 C++ 算子可以在算子之间传递任意 C++ 对象。

  2. GXF 算子:通过从 holoscan::ops::GXFOperator 类继承,在底层 C++ 库中定义的算子。这些算子包装了来自 GXF 扩展的 GXF 代码小组件。例如,VideoStreamReplayerOp 用于重放视频文件,FormatConverterOp 用于格式转换,以及 HolovizOp 用于可视化。

注意

可以使用 GXF 算子和原生算子的混合来创建应用程序。在这种情况下,必须特别注意适当地转换输入和输出张量,如下面的章节所示。

原生 C++ 算子

算子生命周期 (C++)

holoscan::Operator 的生命周期由三个阶段组成

  • start() 在算子启动时调用一次,用于初始化繁重的任务,例如分配内存资源和使用参数。

  • compute() 在算子被触发时调用,这可以在算子生命周期中的 start()stop() 之间发生任意次数。

  • stop() 在算子停止时调用一次,用于反初始化繁重的任务,例如释放先前在 start() 中分配的资源。

工作流程上的所有算子都安排执行。当一个算子首次执行时,将调用 start() 方法,然后调用 compute() 方法。当算子停止时,将调用 stop() 方法。compute() 方法在 start()stop() 之间被多次调用。

如果由 条件 指定的任何调度条件未满足(例如,如果算子已执行一定次数,则 CountCondition 会导致调度条件不满足),则算子停止并调用 stop() 方法。

我们将在用户指南的 指定算子输入和输出 (C++) 部分介绍如何使用 条件

通常,start()stop() 函数在应用程序的生命周期中仅调用一次。但是,如果再次满足调度条件,则可以再次调度算子执行,并且将再次调用 start() 方法。

graphviz-75ff72e35736eb2b8ccd3233ed065e0b088d5b17.png

图 15 Holoscan 算子生命周期中方法调用的顺序

警告

如果要为此 C++ 算子创建 Python 绑定,建议将 initialize() 和/或 start() 方法中分配的任何资源清理放入算子的 stop() 方法中,而不是在其析构函数中。这是必要的,因为目前存在一个问题,即不保证析构函数总是在 Python 应用程序终止之前被调用。stop() 方法将始终被显式调用,因此我们可以确信任何清理都会按预期发生。

我们可以通过实现上述方法来覆盖算子的默认行为。以下示例展示了如何实现一个自定义算子,该算子覆盖了 start、stop 和 compute 方法。

列表 2 Holoscan 算子的基本结构 (C++)

复制
已复制!
            

#include "holoscan/holoscan.hpp" using holoscan::Operator; using holoscan::OperatorSpec; using holoscan::InputContext; using holoscan::OutputContext; using holoscan::ExecutionContext; using holoscan::Arg; using holoscan::ArgList; class MyOp : public Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(MyOp) MyOp() = default; void setup(OperatorSpec& spec) override { } void start() override { HOLOSCAN_LOG_TRACE("MyOp::start()"); } void compute(InputContext&, OutputContext& op_output, ExecutionContext&) override { HOLOSCAN_LOG_TRACE("MyOp::compute()"); }; void stop() override { HOLOSCAN_LOG_TRACE("MyOp::stop()"); } };


创建自定义算子 (C++)

要在 C++ 中创建自定义算子,必须创建 holoscan::Operator 的子类。以下示例演示了如何使用原生算子(没有底层预编译 GXF Codelet 的算子)。

代码片段: examples/ping_multi_port/cpp/ping_multi_port.cpp

列表 3 examples/ping_multi_port/cpp/ping_multi_port.cpp

复制
已复制!
            

#include "holoscan/holoscan.hpp" class ValueData { public: ValueData() = default; explicit ValueData(int value) : data_(value) { HOLOSCAN_LOG_TRACE("ValueData::ValueData(): {}", data_); } ~ValueData() { HOLOSCAN_LOG_TRACE("ValueData::~ValueData(): {}", data_); } void data(int value) { data_ = value; } int data() const { return data_; } private: int data_; }; namespace holoscan::ops { class PingTxOp : public Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(PingTxOp) PingTxOp() = default; void setup(OperatorSpec& spec) override { spec.output<std::shared_ptr<ValueData>>("out1"); spec.output<std::shared_ptr<ValueData>>("out2"); } void compute(InputContext&, OutputContext& op_output, ExecutionContext&) override { auto value1 = std::make_shared<ValueData>(index_++); op_output.emit(value1, "out1"); auto value2 = std::make_shared<ValueData>(index_++); op_output.emit(value2, "out2"); }; int index_ = 1; }; class PingMxOp : public Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(PingMxOp) PingMxOp() = default; void setup(OperatorSpec& spec) override { spec.input<std::shared_ptr<ValueData>>("in1"); spec.input<std::shared_ptr<ValueData>>("in2"); spec.output<std::shared_ptr<ValueData>>("out1"); spec.output<std::shared_ptr<ValueData>>("out2"); spec.param(multiplier_, "multiplier", "Multiplier", "Multiply the input by this value", 2); } void compute(InputContext& op_input, OutputContext& op_output, ExecutionContext&) override { auto value1 = op_input.receive<std::shared_ptr<ValueData>>("in1").value(); auto value2 = op_input.receive<std::shared_ptr<ValueData>>("in2").value(); HOLOSCAN_LOG_INFO("Middle message received (count: {})", count_++); HOLOSCAN_LOG_INFO("Middle message value1: {}", value1->data()); HOLOSCAN_LOG_INFO("Middle message value2: {}", value2->data()); // Multiply the values by the multiplier parameter value1->data(value1->data() * multiplier_); value2->data(value2->data() * multiplier_); op_output.emit(value1, "out1"); op_output.emit(value2, "out2"); }; private: int count_ = 1; Parameter<int> multiplier_; }; class PingRxOp : public Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(PingRxOp) PingRxOp() = default; void setup(OperatorSpec& spec) override { // // Since Holoscan SDK v2.3, users can define a multi-receiver input port using 'spec.input()' // // with 'IOSpec::kAnySize'. // // The old way is to use 'spec.param()' with 'Parameter<std::vector<IOSpec*>> receivers_;'. // spec.param(receivers_, "receivers", "Input Receivers", "List of input receivers.", {}); spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", IOSpec::kAnySize); } void compute(InputContext& op_input, OutputContext&, ExecutionContext&) override { auto value_vector = op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value(); HOLOSCAN_LOG_INFO("Rx message received (count: {}, size: {})", count_++, value_vector.size()); HOLOSCAN_LOG_INFO("Rx message value1: {}", value_vector[0]->data()); HOLOSCAN_LOG_INFO("Rx message value2: {}", value_vector[1]->data()); }; private: // // Since Holoscan SDK v2.3, the following line is no longer needed. // Parameter<std::vector<IOSpec*>> receivers_; int count_ = 1; }; } // namespace holoscan::ops class MyPingApp : public holoscan::Application { public: void compose() override { using namespace holoscan; // Define the tx, mx, rx operators, allowing the tx operator to execute 10 times auto tx = make_operator<ops::PingTxOp>("tx", make_condition<CountCondition>(10)); auto mx = make_operator<ops::PingMxOp>("mx", Arg("multiplier", 3)); auto rx = make_operator<ops::PingRxOp>("rx"); // Define the workflow add_flow(tx, mx, {{"out1", "in1"}, {"out2", "in2"}}); add_flow(mx, rx, {{"out1", "receivers"}, {"out2", "receivers"}}); } }; int main(int argc, char** argv) { auto app = holoscan::make_application<MyPingApp>(); app->run(); return 0; }


在此应用程序中,创建了三个算子:PingTxOpPingMxOpPingRxOp

  1. PingTxOp 算子是一个源算子,每次调用时都会发出两个值。这些值在两个不同的输出端口 out1(用于奇数整数)和 out2(用于偶数整数)上发出。

  2. PingMxOp 算子是一个中间算子,它从 PingTxOp 算子接收两个值,并在两个不同的输出端口上发出两个值。这些值乘以 multiplier 参数。

  3. PingRxOp 算子是一个接收器算子,它从 PingMxOp 算子接收两个值。这些值在单个输入 receivers 上接收,receivers 是输入端口的向量。PingRxOp 算子按照 PingMxOp 算子发出的顺序接收这些值。

如下文更详细的介绍,每个算子的输入都在算子的 setup() 方法中指定。然后,在 compute() 方法中通过 op_input.receive() 接收输入,并通过 op_output.emit() 发出输出。

请注意,对于此处定义的原生 C++ 算子,可以发出或接收任何对象,包括共享指针。对于大型对象(如张量),从性能角度来看,最好传输指向该对象的共享指针,而不是复制。当使用共享指针并且同一个张量发送到多个下游算子时,应避免对张量进行原地操作,否则可能会发生算子之间的竞争条件。

如果需要在初始化算子之前或之后配置参数或执行其他设置任务,可以覆盖 initialize() 方法。此方法在 start() 方法之前调用一次。

示例

复制
已复制!
            

void initialize() override { // Register custom type and codec for serialization register_converter<std::array<float, 3>>(); register_codec<std::vector<InputSpec>>("std::vector<holoscan::ops::HolovizOp::InputSpec>", true); // Set up prerequisite parameters before calling Operator::initialize() auto frag = fragment(); // Check if an argument for 'allocator' exists auto has_allocator = std::find_if( args().begin(), args().end(), [](const auto& arg) { return (arg.name() == "allocator"); }); // Create the allocator if no argument is provided if (has_allocator == args().end()) { allocator_ = frag->make_resource<UnboundedAllocator>("allocator"); add_arg(allocator_.get()); } // Call the parent class's initialize() method to complete the initialization. // Operator::initialize must occur after all arguments have been added. Operator::initialize(); // After Operator::initialize(), the operator is ready for use and the parameters are set int multiplier = multiplier_; HOLOSCAN_LOG_INFO("Multiplier: {}", multiplier); }

有关 register_converter()register_codec() 方法的详细信息,请参阅自定义参数类型的 holoscan::ComponentBase::register_converter() 和分布式应用程序的对象序列化部分 object serialization

指定算子参数 (C++)

在上面的 holoscan::ops::PingMxOp 算子示例中,您有一个参数 multiplier,它声明为类的一部分,作为私有成员,使用 param() 模板类型

复制
已复制!
            

Parameter<int> multiplier_;

然后将其添加到算子的 OperatorSpec 属性的 setup() 方法中,其中必须提供关联的字符串键。还可以提及其他属性,例如描述和默认值

复制
已复制!
            

// Provide key, and optionally other information spec.param(multiplier_, "multiplier", "Multiplier", "Multiply the input by this value", 2);

注意

如果您的参数是自定义类型,则必须注册该类型并提供 YAML 编码器/解码器,如 holoscan::ComponentBase::register_converter() 下的文档所述

请参阅 配置算子参数 部分,了解应用程序如何设置这些参数。

指定算子输入和输出 (C++)

要配置 C++ 原生算子的输入和输出,请在算子的 setup() 方法中调用 spec.input()spec.output() 方法。

对于要添加的每个输入和输出,spec.input()spec.output() 方法应调用一次。OperatorSpec 对象和 setup() 方法将由 Application 类在其 run() 方法被调用时自动初始化和调用。

这些方法 (spec.input()spec.output()) 返回一个 IOSpec 对象,该对象可用于配置输入/输出端口。

默认情况下,holoscan::MessageAvailableConditionholoscan::DownstreamMessageAffordableCondition 条件(min_size1)应用于输入/输出端口。这意味着在输入端口上有消息可用且下游算子的输入端口(队列)有足够的容量接收消息之前,不会调用算子的 compute() 方法。

复制
已复制!
            

void setup(OperatorSpec& spec) override { spec.input<std::shared_ptr<ValueData>>("in"); // Above statement is equivalent to: // spec.input<std::shared_ptr<ValueData>>("in") // .condition(ConditionType::kMessageAvailable, Arg("min_size") = static_cast<uint64_t>(1)); spec.output<std::shared_ptr<ValueData>>("out"); // Above statement is equivalent to: // spec.output<std::shared_ptr<ValueData>>("out") // .condition(ConditionType::kDownstreamMessageAffordable, Arg("min_size") = static_cast<uint64_t>(1)); ... }

在上面的示例中,spec.input() 方法用于配置输入端口,使其具有最小大小为 1 的 holoscan::MessageAvailableCondition。这意味着在算子的输入端口上有消息可用之前,不会调用算子的 compute() 方法。类似地,spec.output() 方法用于配置输出端口,使其具有最小大小为 1 的 holoscan::DownstreamMessageAffordableCondition。这意味着在下游算子的输入端口有足够的容量接收消息之前,不会调用算子的 compute() 方法。

如果要更改此行为,请使用 IOSpec::condition() 方法来配置条件。例如,要将输入和输出端口配置为没有条件,可以使用以下代码

复制
已复制!
            

void setup(OperatorSpec& spec) override { spec.input<std::shared_ptr<ValueData>>("in") .condition(ConditionType::kNone); spec.output<std::shared_ptr<ValueData>>("out") .condition(ConditionType::kNone); // ... }

setup() 方法中的示例代码将输入端口配置为没有条件,这意味着只要算子准备好计算,就会调用 compute() 方法。由于不保证输入端口有消息可用,因此 compute() 方法应在尝试读取输入端口上的消息之前检查是否有消息可用。

receive() 方法的 InputContext 对象可用于访问操作符类的 compute() 方法中的不同类型的输入数据,其中其模板参数 (DataT) 是输入数据的数据类型。此方法将输入端口的名称作为参数(如果您的算子只有一个输入端口,则可以省略),并返回输入数据。如果输入数据不可用,则该方法返回 holoscan::expected<std::shared_ptr<ValueData>, holoscan::RuntimeError> 类型的对象。holoscan::expected<T, E> 类模板用于表示预期对象,该对象可以保存 T 类型的值或 E 类型的错误。预期对象用于以比使用错误代码或异常更结构化的方式返回和传播错误。在这种情况下,预期对象可以保存 std::shared_ptr<ValueData> 对象或 holoscan::RuntimeError 类,其中包含描述失败原因的错误消息。

holoscan::RuntimeError 类是 std::runtime_error 的派生类,并支持访问更多错误信息,例如,使用 what() 方法。

在下面的示例代码片段中,PingRxOp 算子在名为“in”的端口上接收输入,数据类型为 std::shared_ptr<ValueData>receive() 方法用于访问输入数据。使用 if 条件检查 maybe_value 是否有效。如果输入数据中存在错误,则记录错误消息,并且算子抛出错误。如果输入数据有效,我们可以使用 expected 对象的 value() 方法访问输入数据的引用。为了避免复制输入数据(或创建另一个共享指针),输入数据的引用存储在 value 变量中(使用 auto& value = maybe_value.value())。然后调用 ValueData 类的 data() 方法来获取输入数据的值。

复制
已复制!
            

// ... class PingRxOp : public holoscan::Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(PingRxOp) PingRxOp() = default; void setup(holoscan::OperatorSpec& spec) override { spec.input<std::shared_ptr<ValueData>>("in"); } void compute(holoscan::InputContext& op_input, holoscan::OutputContext&, holoscan::ExecutionContext&) override { auto maybe_value = op_input.receive<std::shared_ptr<ValueData>>("in"); if (!maybe_value) { HOLOSCAN_LOG_ERROR("Failed to receive message - {}", maybe_value.error().what()); // [error] Failed to receive message - InputContext receive() Error: No message is received from the input port with name 'in' throw maybe_value.error(); // or `return;` } auto& value = maybe_value.value(); HOLOSCAN_LOG_INFO("Message received (value: {})", value->data()); } };

在内部,Holoscan 中的消息传递是使用 Message 类实现的,该类包装了一个 std::any 对象,并提供了一个类型安全的接口来访问输入数据。std::any 类是任何类型的单个值的类型安全容器,用于存储算子的输入和输出数据。std::any 类是 C++ 标准库的一部分,并在 any 头文件中定义。

由于 Holoscan SDK 使用 GXF 作为执行引擎,因此当在 Holoscan 原生算子和 GXF 算子之间传递数据时,holoscan::Message 对象也封装在 nvidia::gxf::Entity 对象中。这确保了数据与 GXF 框架兼容。

如果输入数据预计来自 GXF 算子或张量(在这两种情况下,数据都是 nvidia::gxf::Entity 的实例),则可以在 receive 方法的模板参数中使用 holoscan::gxf::Entity 类来访问输入数据。holoscan::gxf::Entity 类是 nvidia::gxf::Entity 类(类似于字典对象)的包装器,并提供了一种获取张量和向实体添加张量的方法。

Holoscan SDK 提供了称为 域对象 的内置数据类型,这些类型在 include/holoscan/core/domain 目录中定义。例如,holoscan::Tensor 是一个域对象类,它表示数据的多维数组,并且可以与底层 GXF 类 (nvidia::gxf::Tensor) 互操作。holoscan::Tensor 类提供了访问张量数据、形状和其他属性的方法。支持将 holoscan::Tensor 对象传递到 GXF 算子和从 GXF 算子传递 holoscan::Tensor 对象。

提示

holoscan::Tensor 类是包装 DLManagedTensorContext 结构体的包装器,该结构体保存 DLManagedTensor 对象。因此,它提供了访问张量数据的主要接口,并且可以与其他支持 DLPack 接口的框架互操作。

有关更多详细信息,请参阅 互操作性部分

在下面的示例中,TensorRx 算子在名为“in”的端口上接收输入,数据类型为 holoscan::gxf::Entity

复制
已复制!
            

// ... class TensorRxOp : public holoscan::Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(TensorRxOp) TensorRxOp() = default; void setup(holoscan::OperatorSpec& spec) override { spec.input<holoscan::gxf::Entity>("in"); } void compute(holoscan::InputContext& op_input, holoscan::OutputContext&, holoscan::ExecutionContext&) override { // Type of 'maybe_entity' is holoscan::expected<holoscan::gxf::Entity, holoscan::RuntimeError> auto maybe_entity = op_input.receive<holoscan::gxf::Entity>("in"); if (maybe_entity) { auto& entity = maybe_entity.value(); // holoscan::gxf::Entity& // Get a tensor from the entity if it exists. // Can pass a tensor name as an argument to get a specific tensor. auto tensor = entity.get<holoscan::Tensor>(); // std::shared_ptr<holoscan::Tensor> if (tensor) { HOLOSCAN_LOG_INFO("tensor nbytes: {}", tensor->nbytes()); } } } };

如果实体包含张量,则可以使用 holoscan::gxf::Entity 类的 get 方法来检索张量。get 方法返回一个 std::shared_ptr<holoscan::Tensor> 对象,该对象可用于访问张量数据。holoscan::Tensor 类的 nbytes 方法用于获取张量中的字节数。

通过使用 holoscan::TensorMap 类,该类存储张量名称到张量的映射 (std::unordered_map<std::string, std::shared_ptr<holoscan::Tensor>>),可以更新接收包含一个或多个张量对象的实体对象的代码,以接收 holoscan::TensorMap 对象而不是 holoscan::gxf::Entity 对象。holoscan::TensorMap 类提供了一种通过名称访问张量数据的方法,使用类似于 std::unordered_map 的接口。

复制
已复制!
            

// ... class TensorRxOp : public holoscan::Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(TensorRxOp) TensorRxOp() = default; void setup(holoscan::OperatorSpec& spec) override { spec.input<holoscan::TensorMap>("in"); } void compute(holoscan::InputContext& op_input, holoscan::OutputContext&, holoscan::ExecutionContext&) override { // Type of 'maybe_entity' is holoscan::expected<holoscan::TensorMap, holoscan::RuntimeError> auto maybe_tensor_map = op_input.receive<holoscan::TensorMap>("in"); if (maybe_tensor_map) { auto& tensor_map = maybe_tensor_map.value(); // holoscan::TensorMap& for (const auto& [name, tensor] : tensor_map) { HOLOSCAN_LOG_INFO("tensor name: {}", name); HOLOSCAN_LOG_INFO("tensor nbytes: {}", tensor->nbytes()); } } } };

在上面的示例中,TensorRxOp 算子在名为“in”的端口上接收输入,数据类型为 holoscan::TensorMapInputContext 对象的 receive 方法用于访问输入数据。receive 方法返回一个 expected 对象,该对象可以保存 holoscan::TensorMap 对象或 holoscan::RuntimeError 对象。holoscan::TensorMap 类是 std::unordered_map<std::string, std::shared_ptr<holoscan::Tensor>> 类的包装器,并提供了一种访问张量数据的方法。holoscan::Tensor 类的 nbytes 方法用于获取张量中的字节数。

如果类型 std::any 用于 receive 方法的模板参数,则 receive() 方法将返回一个 std::any 对象,其中包含指定名称的输入。在下面的示例中,PingRxOp 算子在名为“in”的端口上接收输入,数据类型为 std::anystd::any 对象的 type() 方法用于确定输入数据的实际类型,std::any_cast() 函数用于检索输入数据的值。

复制
已复制!
            

// ... class AnyRxOp : public holoscan::Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS_SUPER(AnyRxOp, holoscan::ops::GXFOperator) AnyRxOp() = default; void setup(holoscan::OperatorSpec& spec) override { spec.input<std::any>("in"); } void compute(holoscan::InputContext& op_input, holoscan::OutputContext&, holoscan::ExecutionContext&) override { auto maybe_any = op_input.receive<std::any>("in"); if (!maybe_any) { HOLOSCAN_LOG_ERROR("Failed to receive message - {}", maybe_any.error().what()); return; } auto& in_any = maybe_any.value(); const auto& in_any_type = in_any.type(); try { if (in_any_type == typeid(holoscan::gxf::Entity)) { auto in_entity = std::any_cast<holoscan::gxf::Entity>(in_any); auto tensor = in_entity.get<holoscan::Tensor>(); // std::shared_ptr<holoscan::Tensor> if (tensor) { HOLOSCAN_LOG_INFO("tensor nbytes: {}", tensor->nbytes()); } } else if (in_any_type == typeid(std::shared_ptr<ValueData>)) { auto in_value = std::any_cast<std::shared_ptr<ValueData>>(in_any); HOLOSCAN_LOG_INFO("Received value: {}", in_value->data()); } else { HOLOSCAN_LOG_ERROR("Invalid message type: {}", in_any_type.name()); } } catch (const std::bad_any_cast& e) { HOLOSCAN_LOG_ERROR("Failed to cast message - {}", e.what()); } } };

接收任意数量的输入 (C++)

在某些情况下,最好允许在端口上接收任意数量的对象,而不是分配特定数量的输入端口。

使用 IOSpec::kAnySize 用于可变输入处理

实现此目的的一种方法是通过调用 spec.input<std::vector<T>>("port_name", IOSpec::kAnySize) 并将 IOSpec::kAnySize 作为 setup() 方法中算子的第二个参数来定义多接收器输入端口,其中 T 是输入数据类型(如 原生算子 ping 示例中的 PingRxOp 所做的那样)。

复制
已复制!
            

void setup(OperatorSpec& spec) override { spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", IOSpec::kAnySize); }

列表 4 examples/ping_multi_port/cpp/ping_multi_port.cpp

复制
已复制!
            

class PingRxOp : public Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(PingRxOp) PingRxOp() = default; void setup(OperatorSpec& spec) override { // // Since Holoscan SDK v2.3, users can define a multi-receiver input port using 'spec.input()' // // with 'IOSpec::kAnySize'. // // The old way is to use 'spec.param()' with 'Parameter<std::vector<IOSpec*>> receivers_;'. // spec.param(receivers_, "receivers", "Input Receivers", "List of input receivers.", {}); spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", IOSpec::kAnySize); } void compute(InputContext& op_input, OutputContext&, ExecutionContext&) override { auto value_vector = op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value(); HOLOSCAN_LOG_INFO("Rx message received (count: {}, size: {})", count_++, value_vector.size()); HOLOSCAN_LOG_INFO("Rx message value1: {}", value_vector[0]->data()); HOLOSCAN_LOG_INFO("Rx message value2: {}", value_vector[1]->data()); }; private: // // Since Holoscan SDK v2.3, the following line is no longer needed. // Parameter<std::vector<IOSpec*>> receivers_; int count_ = 1; }; } // namespace holoscan::ops class MyPingApp : public holoscan::Application { public: void compose() override { using namespace holoscan; // Define the tx, mx, rx operators, allowing the tx operator to execute 10 times auto tx = make_operator<ops::PingTxOp>("tx", make_condition<CountCondition>(10)); auto mx = make_operator<ops::PingMxOp>("mx", Arg("multiplier", 3)); auto rx = make_operator<ops::PingRxOp>("rx"); // Define the workflow add_flow(tx, mx, {{"out1", "in1"}, {"out2", "in2"}}); add_flow(mx, rx, {{"out1", "receivers"}, {"out2", "receivers"}}); } };


然后,一旦在 compose() 方法中提供了以下配置,

复制
已复制!
            

add_flow(mx, rx, {{"out1", "receivers"}, {"out2", "receivers"}});

PingRxOp 将在 compute() 方法中的 receivers 端口上接收两个输入

复制
已复制!
            

auto value_vector = op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value();

提示

当输入端口使用 IOSpec::kAnySize 定义时,框架会为端口上接收的每个输入对象创建一个新的输入端口。输入端口使用格式 <port_name>:<index> 命名,其中 <port_name> 是输入端口的名称,<index> 是端口上接收的输入对象的索引。例如,如果 receivers 端口接收两个输入对象,则输入端口将命名为 receivers:0receivers:1

框架在内部创建一个类型为 std::vector<holoscan::IOSpec*> 的参数 (receivers),隐式创建输入端口 (receivers:0receivers:1),并将它们连接起来(将输入端口的引用添加到 receivers 向量)。这样,当调用 receive() 方法时,框架可以将来自相应输入端口的输入数据作为向量返回。

复制
已复制!
            

auto value_vector = op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value();

如果在 compose() 方法的末尾添加 HOLOSCAN_LOG_INFO(rx->description());,您将看到 PingRxOp 算子的描述,如下所示

复制
已复制!
            

id: -1 name: rx fragment: "" args: [] type: kNative conditions: [] resources: [] spec: fragment: "" params: - name: receivers type: std::vector<holoscan::IOSpec*> description: "" flag: kNone inputs: - name: receivers:1 io_type: kInput typeinfo_name: N8holoscan3gxf6EntityE connector_type: kDefault conditions: [] - name: receivers:0 io_type: kInput typeinfo_name: N8holoscan3gxf6EntityE connector_type: kDefault conditions: [] - name: receivers io_type: kInput typeinfo_name: St6vectorISt10shared_ptrI9ValueDataESaIS2_EE connector_type: kDefault conditions: [] outputs: []

配置输入端口队列大小和消息批处理条件 (C++)

如果您想在一个端口上接收多个对象并批量处理它们,您可以增加输入端口的队列大小,并将 MessageAvailableCondition 条件的 min_size 参数设置为所需的批次大小。这可以通过调用带有期望参数的 connector()condition() 方法来完成,分别使用批次大小作为 capacitymin_size 参数。

min_size 设置为 N 将确保操作器在 compute() 方法被调用之前接收到 N 个对象。

复制
已复制!
            

void setup(holoscan::OperatorSpec& spec) override { spec.input<std::shared_ptr<ValueData>>("receivers") .connector(holoscan::IOSpec::ConnectorType::kDoubleBuffer, holoscan::Arg("capacity", static_cast<uint64_t>(2))) .condition(holoscan::ConditionType::kMessageAvailable, holoscan::Arg("min_size", static_cast<uint64_t>(2))); }

然后,可以调用带有 receivers 端口名称的 receive() 方法来批量接收输入数据。

复制
已复制!
            

void compute(holoscan::InputContext& op_input, holoscan::OutputContext&, holoscan::ExecutionContext&) override { std::vector<std::shared_ptr<ValueData>> value_vector; auto maybe_value = op_input.receive<std::shared_ptr<ValueData>>("receivers"); while (maybe_value) { value_vector.push_back(maybe_value.value()); maybe_value = op_input.receive<std::shared_ptr<ValueData>>("receivers"); } HOLOSCAN_LOG_INFO("Rx message received (size: {})", value_vector.size()); }

在上面的示例中,操作器在名为“receivers”的端口上接收输入,队列大小为 2,min_size 为 2。receive() 方法在一个循环中被调用,以批量接收输入数据,批次大小为 2。由于操作器事先不知道要接收的对象数量,因此在一个循环中调用 receive() 方法,直到它返回错误。输入数据存储在一个向量中,并且在接收到所有输入数据后,向量的大小会被记录。

为了简化上述代码,Holoscan SDK 提供了 IOSpec::kPrecedingCount 常量作为 OperatorSpec 的 input() 方法的第二个参数,以指定输入端口的前置连接数(在本例中,到 receivers 端口的连接数为 2)作为批次大小。这可以用于批量接收输入数据,而无需在一个循环中调用 receive() 方法。

复制
已复制!
            

void setup(holoscan::OperatorSpec& spec) override { spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", holoscan::IOSpec::kPrecedingCount); }

然后,可以调用带有 receivers 端口名称的 receive() 方法来批量接收输入数据。

复制
已复制!
            

void compute(holoscan::InputContext& op_input, holoscan::OutputContext&, holoscan::ExecutionContext&) override { auto value_vector = op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value(); HOLOSCAN_LOG_INFO("Rx message received (size: {})", value_vector.size()); HOLOSCAN_LOG_INFO("Rx message value1: {}", value_vector[0]->data()); HOLOSCAN_LOG_INFO("Rx message value2: {}", value_vector[1]->data()); }

在上面的示例中,操作器在名为“receivers”的端口上接收输入,批次大小为 2。receive() 方法被调用,并带有 receivers 端口名称,以批量接收输入数据,批次大小为 2。输入数据存储在一个向量中,并且在接收到所有输入数据后,向量的大小会被记录。

如果您想使用特定的批次大小,您可以使用 holoscan::IOSpec::IOSize(int64_t) 而不是 holoscan::IOSpec::kPrecedingCount 来指定批次大小。以这种方式使用 IOSize 等同于更详细的 condition()connector() 调用,以更新本节开始附近显示的 capacitymin_size 参数。

使用 IOSize 的简短形式,而不是 condition()connector() 方法的主要原因是,如果需要进行额外的参数更改,例如队列策略。有关 condition()connector() 方法的使用的更多详细信息,请参阅下面的高级主题部分(进一步自定义输入和输出)。

复制
已复制!
            

void setup(holoscan::OperatorSpec& spec) override { spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", holoscan::IOSpec::IOSize(2)); }

如果您想逐个接收输入数据,您可以调用 receive() 方法,而无需使用 std::vector<T> 模板参数。

复制
已复制!
            

void compute(holoscan::InputContext& op_input, holoscan::OutputContext&, holoscan::ExecutionContext&) override { while (true) { auto maybe_value = op_input.receive<std::shared_ptr<ValueData>>("receivers"); if (!maybe_value) { break; } auto& value = maybe_value.value(); // Process the input data HOLOSCAN_LOG_INFO("Rx message received (value: {})", value->data()); } }

上面的代码将从 receivers 端口逐个接收输入数据。receive() 方法在一个循环中被调用,直到它返回错误。输入数据存储在一个变量中,并记录输入数据的值。

注意

这种方法(逐个接收输入数据)不适用于 holoscan::IOSpec::kAnySize 的情况。对于 holoscan::IOSpec::kAnySize 参数,框架会在内部为端口上接收的每个输入对象创建一个新的输入端口。每个隐式输入端口(使用格式 <port_name>:<index> 命名)都与一个 MessageAvailableCondition 条件相关联,该条件的 min_size1。因此,receive() 方法需要使用 std::vector<T> 模板参数来一次性批量接收输入数据。

如果您确实需要为 holoscan::IOSpec::kAnySize 情况逐个接收输入数据(尽管不推荐这样做),您可以从每个隐式输入端口(命名为 <port_name>:<index>)逐个接收输入数据,使用不带 std::vector<T> 模板参数的 receive() 方法。(例如,op_input.receive<std::shared_ptr<ValueData>>("receivers:0")op_input.receive<std::shared_ptr<ValueData>>("receivers:1") 等)。为了避免在为隐式输入端口调用 receive() 方法时出现错误消息(例如 The operator does not have an input port with label 'receivers:X'),您需要提前计算到 receivers 端口的连接数,并相应地为每个隐式输入端口调用 receive() 方法。

复制
已复制!
            

void compute(holoscan::InputContext& op_input, holoscan::OutputContext&, holoscan::ExecutionContext&) override { int input_count = spec()->inputs().size() - 1; // -1 to exclude the 'receivers' input port for (int i = 0; i < input_count; i++) { auto maybe_value = op_input.receive<std::shared_ptr<ValueData>>(fmt::format("receivers:{}", i).c_str()); if (!maybe_value) { break; } auto& value = maybe_value.value(); // Process the input data HOLOSCAN_LOG_INFO("Rx message received (value: {})", value->data()); } }

注意

在上面的示例中,使用 IOSpec::kPrecedingCountIOSpec::IOSize(int64_t) 似乎显示出与 IOSpec::kAnySize 相同的行为。但是,区别在于,由于 IOSpec::kPrecedingCountIOSpec::IOSize(int64_t) 不为每个(内部)输入端口使用单独的 MessageAvailableCondition 条件,因此不能保证操作器将按顺序接收输入数据。

这意味着操作器接收输入数据的顺序可能与 compose() 方法中建立连接的顺序不同。此外,使用多线程调度器,不能保证操作器将均匀地接收来自每个连接的输入数据。操作器可能从一个连接接收到比另一个连接更多的输入数据。

如果输入数据的顺序很重要,建议使用 IOSpec::kAnySize 并使用带有 std::vector<T> 模板参数的 receive() 方法来一次性批量接收输入数据。

有关在 C++ 操作器中接收多个输入的更多示例,请参阅 C++ 系统测试用例

构建您的 C++ 操作器

您可以使用 CMake 构建 C++ 操作器,方法是在您的 CMakeLists.txt 中调用 find_package(holoscan) 以加载 SDK 库。您的操作器将需要链接到 holoscan::core

列表 5 /CMakeLists.txt

复制
已复制!
            

# Your CMake project cmake_minimum_required(VERSION 3.20) project(my_project CXX) # Finds the holoscan SDK find_package(holoscan REQUIRED CONFIG PATHS "/opt/nvidia/holoscan") # Create a library for your operator add_library(my_operator SHARED my_operator.cpp) # Link your operator against holoscan::core target_link_libraries(my_operator PUBLIC holoscan::core )


一旦您的 CMakeLists.txt<src_dir> 中准备就绪,您可以使用下面的命令行在 <build_dir> 中构建。如果想要使用的 SDK 安装与提供给 find_package(holoscan)PATHS 不同,您可以选择性地传递 Holoscan_ROOT

复制
已复制!
            

# Configure cmake -S <src_dir> -B <build_dir> -D Holoscan_ROOT="/opt/nvidia/holoscan" # Build cmake --build <build_dir> -j

在应用程序中使用您的 C++ 操作器

  • 如果应用程序与操作器配置在同一个 CMake 项目中,您可以简单地将操作器 CMake 目标库名称添加到应用程序可执行文件的 target_link_libraries 调用中,因为操作器 CMake 目标已经定义。

    复制
    已复制!
                

    # operator add_library(my_op my_op.cpp) target_link_libraries(my_operator PUBLIC holoscan::core) # application add_executable(my_app main.cpp) target_link_libraries(my_operator PRIVATE holoscan::core my_op )

  • 如果应用程序与操作器配置在不同的项目中,您需要在操作器的 CMake 项目中 导出操作器,并在应用程序 CMake 项目中导入它,然后才能将其列在 target_link_libraries 下。这与 SDK 内置操作器 所做的事情相同,这些操作器在 holoscan::ops 命名空间下可用。

然后,您可以在应用程序代码中包含 C++ 操作器的头文件。

GXF 操作器

使用 Holoscan C++ API,我们还可以将来自 GXF 扩展的 GXF Codelets 包装为 Holoscan Operators

注意

如果您没有现有的 GXF 扩展,我们建议使用 C++Python API 开发原生操作器,以跳过将 GXF codelets 包装为操作器的需要。如果您确实需要创建 GXF 扩展,请按照 创建 GXF 扩展 部分了解 GXF 扩展开发过程的详细说明。

提示

为了使用 GXF Codelet 作为 Holoscan 操作器,不再需要下面描述的手动 codelet 包装机制。现在有一个新的 GXFCodeletOp,它允许通过 Fragment::make_operator 直接使用现有的 GXF codelet,而无需先为其创建包装类。类似地,现在还有一个 GXFComponentResource 类,它允许将 GXF Component 用作 Holoscan 资源,通过 Fragment::make_resource。在 examples/import_gxf_components 文件夹中,为 C++ 和 Python 应用程序提供了如何使用这些类的详细示例。

给定一个现有的 GXF 扩展,我们可以创建一个简单的“identity”应用程序,它由一个重放器组成,该重放器从磁盘上的文件中读取内容,以及上一节的记录器,它将以完全相同的格式存储重放器的输出。这使我们能够查看记录器的输出是否与原始输入文件匹配。

下面的 MyRecorderOp Holoscan 操作器实现将包装 此处 显示的 MyRecorder GXF Codelet。

操作器定义

列表 6 my_recorder_op.hpp

复制
已复制!
            

#ifndef APPS_MY_RECORDER_APP_MY_RECORDER_OP_HPP #define APPS_MY_RECORDER_APP_MY_RECORDER_OP_HPP #include "holoscan/core/gxf/gxf_operator.hpp" namespace holoscan::ops { class MyRecorderOp : public holoscan::ops::GXFOperator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS_SUPER(MyRecorderOp, holoscan::ops::GXFOperator) MyRecorderOp() = default; const char* gxf_typename() const override { return "MyRecorder"; } void setup(OperatorSpec& spec) override; void initialize() override; private: Parameter<holoscan::IOSpec*> receiver_; Parameter<std::shared_ptr<holoscan::Resource>> my_serializer_; Parameter<std::string> directory_; Parameter<std::string> basename_; Parameter<bool> flush_on_tick_; }; } // namespace holoscan::ops #endif/* APPS_MY_RECORDER_APP_MY_RECORDER_OP_HPP */


holoscan::ops::MyRecorderOp 类通过继承 holoscan::ops::GXFOperator 类来包装 MyRecorder GXF Codelet。HOLOSCAN_OPERATOR_FORWARD_ARGS_SUPER 宏用于将构造函数的参数转发到基类。

我们首先需要定义 MyRecorderOp 类的字段。您可以看到,在 MyRecorderOp 类和 MyRecorder GXF codelet 中都定义了具有相同名称的字段。

列表 7 gxf_extensions/my_recorder/my_recorder.hpp 中的参数声明

复制
已复制!
            

nvidia::gxf::Parameter<nvidia::gxf::Handle<nvidia::gxf::Receiver>> receiver_; nvidia::gxf::Parameter<nvidia::gxf::Handle<nvidia::gxf::EntitySerializer>> my_serializer_; nvidia::gxf::Parameter<std::string> directory_; nvidia::gxf::Parameter<std::string> basename_; nvidia::gxf::Parameter<bool> flush_on_tick_;


比较 MyRecorderOp holoscan 参数与 MyRecorder gxf codelet

Holoscan 操作器

GXF Codelet

holoscan::Parameter nvidia::gxf::Parameter
holoscan::IOSpec* nvidia::gxf::Handle<nvidia::gxf::Receiver>>
nvidia::gxf::Handle<nvidia::gxf::Transmitter>>
std::shared_ptr<holoscan::Resource>> nvidia::gxf::Handle<T>>
示例:Tnvidia::gxf::EntitySerializer

然后我们需要实现以下函数

  • const char* gxf_typename() const override:返回 Codelet 的 GXF 类型名称。指定 GXF Codelet 的完全限定类名 (MyRecorder)。

  • void setup(OperatorSpec& spec) override:使用操作器的输入/输出和参数设置 OperatorSpec。

  • void initialize() override:初始化操作器。

设置参数规范

setup(OperatorSpec& spec) 函数的实现如下

列表 8 my_recorder_op.cpp

复制
已复制!
            

#include "./my_recorder_op.hpp" #include "holoscan/core/fragment.hpp" #include "holoscan/core/gxf/entity.hpp" #include "holoscan/core/operator_spec.hpp" #include "holoscan/core/resources/gxf/video_stream_serializer.hpp" namespace holoscan::ops { void MyRecorderOp::setup(OperatorSpec& spec) { auto& input = spec.input<holoscan::gxf::Entity>("input"); // Above is same with the following two lines (a default condition is assigned to the input port if not specified): // // auto& input = spec.input<holoscan::gxf::Entity>("input") // .condition(ConditionType::kMessageAvailable, Arg("min_size") = static_cast<uint64_t>(1)); spec.param(receiver_, "receiver", "Entity receiver", "Receiver channel to log", &input); spec.param(my_serializer_, "serializer", "Entity serializer", "Serializer for serializing input data"); spec.param(directory_, "out_directory", "Output directory path", "Directory path to store received output"); spec.param(basename_, "basename", "File base name", "User specified file name without extension"); spec.param(flush_on_tick_, "flush_on_tick", "Boolean to flush on tick", "Flushes output buffer on every `tick` when true", false); } void MyRecorderOp::initialize() {...} } // namespace holoscan::ops


在这里,我们设置了操作器的输入/输出和参数。请注意,此函数的内容与 MyRecorder GXF codelet 的 registerInterface 函数非常相似。

上面的 MyRecorderOp::setup 中突出显示行与 GXF 应用程序 YAML 的以下突出显示语句匹配

列表 9 apps/my_recorder_app_gxf/my_recorder_gxf.yaml 的一部分

复制
已复制!
            

name: recorder components: - name: input type: nvidia::gxf::DoubleBufferReceiver - name: allocator type: nvidia::gxf::UnboundedAllocator - name: component_serializer type: nvidia::gxf::StdComponentSerializer parameters: allocator: allocator - name: entity_serializer type: nvidia::gxf::StdEntitySerializer parameters: component_serializers: [component_serializer] - type: MyRecorder parameters: receiver: input serializer: entity_serializer out_directory: "/tmp" basename: "tensor_out" - type: nvidia::gxf::MessageAvailableSchedulingTerm parameters: receiver: input min_size: 1


以同样的方式,如果我们有一个 Transmitter GXF 组件,我们将有以下语句(请参阅 holoscan::ConditionType 的可用常量)

复制
已复制!
            

auto& output = spec.output<holoscan::gxf::Entity>("output"); // Above is same with the following two lines (a default condition is assigned to the output port if not specified): // // auto& output = spec.output<holoscan::gxf::Entity>("output") // .condition(ConditionType::kDownstreamMessageAffordable, Arg("min_size") = static_cast<uint64_t>(1));

初始化操作器

接下来,initialize() 函数的实现如下

列表 10 my_recorder_op.cpp

复制
已复制!
            

#include "./my_recorder_op.hpp" #include "holoscan/core/fragment.hpp" #include "holoscan/core/gxf/entity.hpp" #include "holoscan/core/operator_spec.hpp" #include "holoscan/core/resources/gxf/video_stream_serializer.hpp" namespace holoscan::ops { void MyRecorderOp::setup(OperatorSpec& spec) {...} void MyRecorderOp::initialize() { // Set up prerequisite parameters before calling GXFOperator::initialize() auto frag = fragment(); auto serializer = frag->make_resource<holoscan::StdEntitySerializer>("serializer"); add_arg(Arg("serializer") = serializer); GXFOperator::initialize(); } } // namespace holoscan::ops


在这里,我们设置了预定义的参数,例如 serializer。上面突出显示行与 GXF 应用程序 YAML 的突出显示语句匹配

列表 11 apps/my_recorder_app_gxf/my_recorder_gxf.yaml 的另一部分

复制
已复制!
            

name: recorder components: - name: input type: nvidia::gxf::DoubleBufferReceiver - name: allocator type: nvidia::gxf::UnboundedAllocator - name: component_serializer type: nvidia::gxf::StdComponentSerializer parameters: allocator: allocator - name: entity_serializer type: nvidia::gxf::StdEntitySerializer parameters: component_serializers: [component_serializer] - type: MyRecorder parameters: receiver: input serializer: entity_serializer out_directory: "/tmp" basename: "tensor_out" - type: nvidia::gxf::MessageAvailableSchedulingTerm parameters: receiver: input min_size: 1


注意

Holoscan C++ API 已经提供了 holoscan::StdEntitySerializer 类,它包装了 nvidia::gxf::StdEntitySerializer GXF 组件,这里用作 serializer

构建您的 GXF 操作器

构建 GXF 操作器和 构建原生 C++ 操作器 在 CMake 中没有区别,因为 GXF codelet 实际上是通过 GXF 扩展作为插件加载的,并且不需要添加到 target_link_libraries(my_operator ...)

在应用程序中使用您的 GXF 操作器

使用 GXF 操作器和 在应用程序中使用原生 C++ 操作器 在 CMake 中没有区别。但是,应用程序将需要加载保存包装的 GXF codelet 符号的 GXF 扩展库,因此需要配置应用程序以在其 yaml 配置文件中查找扩展库,如 此处 文档所述。

GXF 和原生 C++ 操作器之间的互操作性

为了支持在操作器(GXF 和原生 C++ 操作器)之间发送和接收张量,Holoscan SDK 提供了以下 C++ 类

  • 一个名为 holoscan::Map 的类模板,它继承自 std::unordered_map<std::string, std::shared_ptr<T>>。模板参数 T 可以是任何类型,它用于指定存储在 map 中的 std::shared_ptr 对象的类型。

  • 一个 holoscan::TensorMap 类,定义为 holoscan::Map 对于 holoscan::Tensor 类型的特化。

当从原生 C++ 操作器发出带有 holoscan::TensorMap 的消息时,消息对象总是被转换为 holoscan::gxf::Entity 对象并发送到下游操作器。

然后,如果发送的 GXF Entity 对象仅包含 Tensor 对象作为其组件,则下游操作器可以将消息数据接收为 holoscan::TensorMap 对象,而不是 holoscan::gxf::Entity 对象。

图 16 显示了 holoscan::gxf::Entitynvidia::gxf::Entity 类之间的关系,以及 holoscan::Tensornvidia::gxf::Tensor 类之间的关系。

holoscan_tensor_interoperability.png

图 16 支持张量互操作性

holoscan::gxf::Tensornvidia::gxf::Tensor 彼此可互操作,因为它们是围绕相同的底层 DLManagedTensorContext 结构包装器,该结构体保存一个 DLManagedTensor 对象。

holoscan::TensorMap 类用于在 map 中存储多个张量,其中每个张量都与唯一的键关联。holoscan::TensorMap 类用于在操作器之间传递多个张量,其使用方式与 std::unordered_map<std::string, std::shared_ptr<holoscan::Tensor>> 对象相同。

由于 holoscan::TensorMapholoscan::gxf::Entity 对象都保存着可互操作的张量,因此 GXF 和原生 C++ 操作器之间的消息数据也是可互操作的。

图 17 说明了如何使用 holoscan::TensorMap 类在操作器之间传递多个张量。GXFSendTensorOp 操作器将一个 nvidia::gxf::Entity 对象(包含一个名为“tensor”的 GXF 组件 nvidia::gxf::Tensor 对象)发送到 ProcessTensorOp 操作器,后者处理张量,然后将处理后的张量转发到 GXFReceiveTensorOp 操作器。

考虑以下示例,其中 GXFSendTensorOpGXFReceiveTensorOp 是 GXF 操作器,而 ProcessTensorOp 是 C++ 中的 Holoscan 原生操作器

graphviz-bc6140f2577c7728c727ffec27e276500e615f63.png

图 17 C++ 原生操作器和 GXF 操作器之间的张量互操作性

以下代码显示了如何实现 ProcessTensorOpcompute() 方法,作为一个与 GXF 操作器通信的 C++ 原生操作器。重点关注 holoscan::gxf::Entity 的使用

列表 12 examples/tensor_interop/cpp/tensor_interop.cpp

复制
已复制!
            

void compute(InputContext& op_input, OutputContext& op_output, ExecutionContext& context) override { // The type of `in_message` is 'holoscan::TensorMap'. auto in_message = op_input.receive<holoscan::TensorMap>("in").value(); // The type of out_message is TensorMap TensorMap out_message; for (auto& [key, tensor] : in_message) { // Process with 'tensor' here. cudaError_t cuda_status; size_t data_size = tensor->nbytes(); std::vector<uint8_t> in_data(data_size); CUDA_TRY(cudaMemcpy(in_data.data(), tensor->data(), data_size, cudaMemcpyDeviceToHost)); HOLOSCAN_LOG_INFO("ProcessTensorOp Before key: '{}', shape: ({}), data: [{}]", key, fmt::join(tensor->shape(), ","), fmt::join(in_data, ",")); for (size_t i = 0; i < data_size; i++) { in_data[i] *= 2; } HOLOSCAN_LOG_INFO("ProcessTensorOp After key: '{}', shape: ({}), data: [{}]", key, fmt::join(tensor->shape(), ","), fmt::join(in_data, ",")); CUDA_TRY(cudaMemcpy(tensor->data(), in_data.data(), data_size, cudaMemcpyHostToDevice)); out_message.insert({key, tensor}); } // Send the processed message. op_output.emit(out_message); };


注意

examples/tensor_interop/cpp 目录中,提供了一个完整的 C++ 原生操作器示例,该操作器支持与 GXF 操作器的互操作性。

在组装 Python 应用程序时,可以使用两种类型的操作器

  1. 原生 Python 操作器:在 Python 中定义的自定义操作器,通过创建 holoscan.core.Operator 的子类。这些 Python 操作器可以在操作器之间传递任意 Python 对象,并且不受用于 C++ API 操作器的更严格的参数类型的限制。

  2. C++ 操作器的 Python 包装器:在底层 C++ 库中定义的操作器,通过继承 holoscan::Operator 类。这些操作器在 holoscan.operators 模块中提供 Python 绑定。示例包括用于重放视频文件的 VideoStreamReplayerOp,用于格式转换的 FormatConverterOp,以及用于可视化的 HolovizOp

注意

可以使用 Python 包装的 C++ 操作器和原生 Python 操作器的混合来创建一个应用程序。在这种情况下,必须特别注意适当地转换输入和输出张量,如 下面的章节 所示。

原生 Python 操作器

操作器生命周期 (Python)

holoscan.core.Operator 的生命周期由三个阶段组成

  • start() 在操作器启动时调用一次,用于初始化繁重的任务,例如分配内存资源和使用参数。

  • compute() 在操作器被触发时调用,这可以在操作器的生命周期中在 start()stop() 之间发生任意次数。

  • stop() 在操作器停止时调用一次,用于反初始化繁重的任务,例如释放先前在 start() 中分配的资源。

工作流程上的所有算子都安排执行。当一个算子首次执行时,将调用 start() 方法,然后调用 compute() 方法。当算子停止时,将调用 stop() 方法。compute() 方法在 start()stop() 之间被多次调用。

如果未满足 Conditions 指定的任何调度条件(例如,如果操作器已执行一定次数,则 CountCondition 将导致调度条件不满足),则操作器将停止并调用 stop() 方法。

我们将在用户指南的 指定操作器输入和输出 (Python) 部分介绍如何使用 Conditions

通常,start()stop() 函数在应用程序的生命周期中仅调用一次。但是,如果再次满足调度条件,则可以再次调度算子执行,并且将再次调用 start() 方法。

graphviz-d5bf8fa022b9f3584e20248ff54a8f06a49e3b63.png

图 18 Holoscan 操作器生命周期中方法调用的顺序

我们可以通过实现上述方法来覆盖操作器的默认行为。以下示例显示了如何实现一个自定义操作器,该操作器覆盖了 start、stop 和 compute 方法。

列表 13 Holoscan 操作器的基本结构 (Python)

复制
已复制!
            

from holoscan.core import ( ExecutionContext, InputContext, Operator, OperatorSpec, OutputContext, ) class MyOp(Operator): def __init__(self, fragment, *args, **kwargs): super().__init__(fragment, *args, **kwargs) def setup(self, spec: OperatorSpec): pass def start(self): pass def compute(self, op_input: InputContext, op_output: OutputContext, context: ExecutionContext): pass def stop(self): pass


setup() 方法 vs initialize() vs __init__()

setup() 方法旨在通过提供一个 OperatorSpec 对象作为 spec 参数来获取“operator 的 spec”。当 __init__() 被调用时,它会调用 C++ 的 Operator::spec 方法(并设置 self.spec 类成员),并调用 setup 方法,以便 Operator 的 spec 属性持有 operator 的规范。(有关更多详细信息,请参阅源代码。)

由于 setup() 方法可以多次调用,并使用其他 OperatorSpec 对象(例如,枚举 operator 的描述),因此在 setup() 方法中,用户不应该初始化任何内容。此类初始化需要通过重写 initialize() 方法来完成。

复制
已复制!
            

def initialize(self): pass

__init__() 方法用于创建 Operator 对象,并且可以通过传递各种参数来初始化 operator 对象本身。请注意,它不会初始化相应的 GXF 实体对象。底层 GXF 实体对象在 operator 被调度执行时初始化。

请不要忘记在 __init__ 方法的末尾调用基类构造函数 (super().__init__(fragment, *args, **kwargs))。

创建自定义 operator (Python)

要在 Python 中创建自定义 operator,需要创建 holoscan.core.Operator 的子类。以下是一个简单的 operator 示例,该 operator 接受名为 “signal” 的时变一维输入数组,并应用与箱形(即矩形)核的卷积。

为简单起见,此 operator 假定在输入端接收到的 “signal” 已经是 numpy.ndarray 或可以通过 np.asarray 转换为 numpy.ndarray 的对象。我们将在后面的章节中详细了解如何与各种张量类进行互操作,包括一些基于 C++ 的 operator 使用的 GXF Tensor 对象。

代码片段: examples/numpy_native/convolve.py

列表 14 examples/numpy_native/convolve.py

复制
已复制!
            

import os from holoscan.conditions import CountCondition from holoscan.core import Application, Operator, OperatorSpec from holoscan.logger import LogLevel, set_log_level import numpy as np class SignalGeneratorOp(Operator): """Generate a time-varying impulse. Transmits an array of zeros with a single non-zero entry of a specified `height`. The position of the non-zero entry shifts to the right (in a periodic fashion) each time `compute` is called. Parameters ---------- fragment : holoscan.core.Fragment The Fragment (or Application) the operator belongs to. height : number The height of the signal impulse. size : number The total number of samples in the generated 1d signal. dtype : numpy.dtype or str The data type of the generated signal. """ def __init__(self, fragment, *args, height=1, size=10, dtype=np.int32, **kwargs): self.count = 0 self.height = height self.dtype = dtype self.size = size super().__init__(fragment, *args, **kwargs) def setup(self, spec: OperatorSpec): spec.output("signal") def compute(self, op_input, op_output, context): # single sample wide impulse at a time-varying position signal = np.zeros((self.size,), dtype=self.dtype) signal[self.count % signal.size] = self.height self.count += 1 op_output.emit(signal, "signal") class ConvolveOp(Operator): """Apply convolution to a tensor. Convolves an input signal with a "boxcar" (i.e. "rect") kernel. Parameters ---------- fragment : holoscan.core.Fragment The Fragment (or Application) the operator belongs to. width : number The width of the boxcar kernel used in the convolution. unit_area : bool, optional Whether or not to normalize the convolution kernel to unit area. If False, all samples have implitude one and the dtype of the kernel will match that of the signal. When True the sum over the kernel is one and a 32-bit floating point data type is used for the kernel. """ def __init__(self, fragment, *args, width=4, unit_area=False, **kwargs): self.count = 0 self.width = width self.unit_area = unit_area super().__init__(fragment, *args, **kwargs) def setup(self, spec: OperatorSpec): spec.input("signal_in") spec.output("signal_out") def compute(self, op_input, op_output, context): signal = op_input.receive("signal_in") assert isinstance(signal, np.ndarray) if self.unit_area: kernel = np.full((self.width,), 1/self.width, dtype=np.float32) else: kernel = np.ones((self.width,), dtype=signal.dtype) convolved = np.convolve(signal, kernel, mode='same') op_output.emit(convolved, "signal_out") class PrintSignalOp(Operator): """Print the received signal to the terminal.""" def setup(self, spec: OperatorSpec): spec.input("signal") def compute(self, op_input, op_output, context): signal = op_input.receive("signal") print(signal) class ConvolveApp(Application): """Minimal signal processing application. Generates a time-varying impulse, convolves it with a boxcar kernel, and prints the result to the terminal. A `CountCondition` is applied to the generate to terminate execution after a specific number of steps. """ def compose(self): signal_generator = SignalGeneratorOp( self, CountCondition(self, count=24), name="generator", **self.kwargs("generator"), ) convolver = ConvolveOp(self, name="conv", **self.kwargs("convolve")) printer = PrintSignalOp(self, name="printer") self.add_flow(signal_generator, convolver) self.add_flow(convolver, printer) def main(config_file): app = ConvolveApp() # if the --config command line argument was provided, it will override this config_file` app.config(config_file) app.run() if __name__ == "__main__": config_file = os.path.join(os.path.dirname(__file__), 'convolve.yaml') main(config_file=config_file)


代码片段: examples/numpy_native/convolve.yaml

列表 15 examples/numpy_native/convolve.yaml

复制
已复制!
            

signal_generator: height: 1 size: 20 dtype: int32 convolve: width: 4 unit_area: false


在此应用程序中,创建了三个原生 Python operator:SignalGeneratorOpConvolveOpPrintSignalOpSignalGeneratorOp 生成合成信号,例如 [0, 0, 1, 0, 0, 0],其中非零项的位置在每次调用时都会变化。ConvolveOp 使用指定宽度的箱形(即矩形)函数执行一维卷积。PrintSignalOp 只是将接收到的信号打印到终端。

如下文更详细的介绍,每个 operator 的输入都在 operator 的 setup() 方法中指定。然后,输入在 compute 方法中通过 op_input.receive() 接收,输出通过 op_output.emit() 发送。

请注意,对于此处定义的原生 Python operator,可以发送或接收任何 Python 对象。在 operator 之间传输时,传输的是指向对象的共享指针,而不是副本。在某些情况下,例如将同一张量发送到多个下游 operator 时,可能需要避免对张量进行原地操作,以避免 operator 之间可能存在的任何竞争条件。

指定 operator 参数 (Python)

在上面的 SignalGeneratorOp operator 示例中,我们在 operator 的 __init__ 方法中添加了三个关键字参数,这些参数在 operator 的 compose() 方法内部使用,以调整其行为。

复制
已复制!
            

def __init__(self, fragment, *args, width=4, unit_area=False, **kwargs): # Internal counter for the time-dependent signal generation self.count = 0 # Parameters self.width = width self.unit_area = unit_area # To forward remaining arguments to any underlying C++ Operator class super().__init__(fragment, *args, **kwargs)

注意

作为更接近 C++ 的替代方法,这些参数可以通过 operator 的 OperatorSpec 属性在其 setup() 方法中添加,其中必须提供关联的字符串键以及默认值。

复制
已复制!
            

def setup(self, spec: OperatorSpec): spec.param("width", 4) spec.param("unit_area", False)

然后可以在 operator 的方法(包括 initialize()start()compute()stop())中以 self.widthself.unit_area 的形式访问 self 对象上的参数。

其他 kwargs 属性也可以传递给 spec.param,例如 headlinedescription(GXF 应用程序使用)或 kind(当 接收任意数量的输入 (Python) 时使用,自 v2.3.0 起已弃用)。

注意

通过这些方法之一添加的原生 operator 参数的名称不得与基类 Operator 的任何现有属性或方法名称重叠。

请参阅 配置算子参数 部分,了解应用程序如何设置这些参数。

指定 operator 输入和输出 (Python)

要配置 Python 原生 operator 的输入和输出,请在 operator 的 setup() 方法中调用 spec.input()spec.output() 方法。

对于要添加的每个输入和输出,spec.input()spec.output() 方法应调用一次。holoscan.core.OperatorSpec 对象和 setup() 方法将由 Application 类在其 run() 方法被调用时自动初始化和调用。

这些方法 (spec.input()spec.output()) 返回一个 IOSpec 对象,该对象可用于配置输入/输出端口。

默认情况下,holoscan.conditions.MessageAvailableConditionholoscan.conditions.DownstreamMessageAffordableCondition 条件应用于输入/输出端口(min_size1)。这意味着在输入端口上有消息可用并且下游 operator 的输入端口(队列)有足够的容量接收消息之前,不会调用 operator 的 compute() 方法。

复制
已复制!
            

def setup(self, spec: OperatorSpec): spec.input("in") # Above statement is equivalent to: # spec.input("in") # .condition(ConditionType.MESSAGE_AVAILABLE, min_size = 1) spec.output("out") # Above statement is equivalent to: # spec.output("out") # .condition(ConditionType.DOWNSTREAM_MESSAGE_AFFORDABLE, min_size = 1)

在上面的示例中,spec.input() 方法用于配置输入端口以具有 holoscan.conditions.MessageAvailableCondition,最小大小为 1。这意味着在 operator 的输入端口上有消息可用之前,不会调用 operator 的 compute() 方法。同样,spec.output() 方法用于配置输出端口以具有 holoscan.conditions.DownstreamMessageAffordableCondition,最小大小为 1。这意味着在下游 operator 的输入端口有足够的容量接收消息之前,不会调用 operator 的 compute() 方法。

如果要更改此行为,请使用 IOSpec.condition() 方法来配置条件。例如,要配置输入和输出端口不具有任何条件,可以使用以下代码

复制
已复制!
            

from holoscan.core import ConditionType, OperatorSpec # ... def setup(self, spec: OperatorSpec): spec.input("in").condition(ConditionType.NONE) spec.output("out").condition(ConditionType.NONE)

setup() 方法中的示例代码配置输入端口不具有任何条件,这意味着只要 operator 准备好计算,就会调用 compute() 方法。由于不能保证输入端口有可用消息,因此 compute() 方法应在尝试读取输入端口上的消息之前检查是否有可用消息。

receive() 方法的 InputContext 对象可用于访问 operator 类的 compute() 方法中的不同类型的输入数据。此方法将输入端口的名称作为参数(如果 operator 只有一个输入端口,则可以省略)。

对于标准 Python 对象,receive() 将直接返回指定名称的输入的 Python 对象。

Holoscan SDK 还提供了内置数据类型,称为 域对象,在 include/holoscan/core/domain 目录中定义。例如,Tensor 是一个域对象类,用于表示数据的多维数组,可以被 OperatorSpecInputContextOutputContext 直接使用。

提示

holoscan.core.Tensor 类同时支持 DLPack 和 NumPy 的数组接口 (__array_interface____cuda_array_interface__),因此可以与其他 Python 库一起使用,例如 CuPyPyTorchJAXTensorFlowNumba。有关更多详细信息,请参阅 互操作性部分

在这两种情况下,如果输入端口上没有可用消息,它都将返回 None

复制
已复制!
            

# ... def compute(self, op_input, op_output, context): msg = op_input.receive("in") if msg: # Do something with msg

接收任意数量的输入 (Python)

在某些情况下,最好允许在端口上接收任意数量的对象,而不是分配特定数量的输入端口。

使用 IOSpec.ANY_SIZE 进行可变输入处理

实现此目的的一种方法是通过在 operator 的 setup() 方法中调用 spec.input("port_name", IOSpec.ANY_SIZE) 并使用 IOSpec.ANY_SIZE 作为第二个参数来定义多接收器输入端口(如 原生 operator ping 示例中的 PingRxOp 所做的那样)。

复制
已复制!
            

def setup(self, spec: OperatorSpec): spec.input("receivers", size=IOSpec.ANY_SIZE)

代码片段: examples/ping_multi_port/python/ping_multi_port.py

列表 16 examples/ping_multi_port/python/ping_multi_port.py

复制
已复制!
            

class PingRxOp(Operator): """Simple receiver operator. This operator has: input: "receivers" This is an example of a native operator that can dynamically have any number of inputs connected to is "receivers" port. """ def __init__(self, fragment, *args, **kwargs): self.count = 1 # Need to call the base class constructor last super().__init__(fragment, *args, **kwargs) def setup(self, spec: OperatorSpec): # # Since Holoscan SDK v2.3, users can define a multi-receiver input port using # # 'spec.input()' with 'size=IOSpec.ANY_SIZE'. # # The old way is to use 'spec.param()' with 'kind="receivers"'. # spec.param("receivers", kind="receivers") spec.input("receivers", size=IOSpec.ANY_SIZE) def compute(self, op_input, op_output, context): values = op_input.receive("receivers") print(f"Rx message received (count:{self.count}, size:{len(values)})") self.count += 1 print(f"Rx message value1:{values[0].data}") print(f"Rx message value2:{values[1].data}") # Now define a simple application using the operators defined above class MyPingApp(Application): def compose(self): # Define the tx, mx, rx operators, allowing the tx operator to execute 10 times tx = PingTxOp(self, CountCondition(self, 10), name="tx") mx = PingMxOp(self, name="mx", multiplier=3) rx = PingRxOp(self, name="rx") # Define the workflow self.add_flow(tx, mx, {("out1", "in1"), ("out2", "in2")}) self.add_flow(mx, rx, {("out1", "receivers"), ("out2", "receivers")})


然后,一旦在 compose() 方法中提供了以下配置,

复制
已复制!
            

self.add_flow(mx, rx, {("out1", "receivers"), ("out2", "receivers")})

PingRxOp 将在 compute() 方法中的 receivers 端口上接收两个输入

复制
已复制!
            

values = op_input.receive("receivers")

提示

当输入端口使用 IOSpec.ANY_SIZE 定义时,框架会为端口上接收到的每个输入对象创建一个新的输入端口。输入端口使用格式 <port_name>:<index> 命名,其中 <port_name> 是输入端口的名称,<index> 是端口上接收到的输入对象的索引。例如,如果 receivers 端口接收到两个输入对象,则输入端口将命名为 receivers:0receivers:1

框架在内部创建一个类型为 std::vector<holoscan::IOSpec*> 的参数 (receivers),隐式创建输入端口 (receivers:0receivers:1),并将它们连接起来(将输入端口的引用添加到 receivers 向量)。这样,当调用 receive() 方法时,框架可以将来自相应输入端口的输入数据作为元组返回。

复制
已复制!
            

values = op_input.receive("receivers")

如果在 compose() 方法的末尾添加 print(rx.description),您将看到 PingRxOp operator 的描述,如下所示

复制
已复制!
            

id: -1 name: rx fragment: "" args: [] type: kNative conditions: [] resources: [] spec: fragment: "" params: - name: receivers type: std::vector<holoscan::IOSpec*> description: "" flag: kNone inputs: - name: receivers:1 io_type: kInput typeinfo_name: N8holoscan3gxf6EntityE connector_type: kDefault conditions: [] - name: receivers:0 io_type: kInput typeinfo_name: N8holoscan3gxf6EntityE connector_type: kDefault conditions: [] - name: receivers io_type: kInput typeinfo_name: N8holoscan3gxf6EntityE connector_type: kDefault conditions: [] outputs: []

配置输入端口队列大小和消息批处理条件 (Python)

如果您想在一个端口上接收多个对象并批量处理它们,您可以增加输入端口的队列大小,并将 MessageAvailableCondition 条件的 min_size 参数设置为所需的批次大小。这可以通过调用带有期望参数的 connector()condition() 方法来完成,分别使用批次大小作为 capacitymin_size 参数。

min_size 设置为 N 将确保操作器在 compute() 方法被调用之前接收到 N 个对象。

复制
已复制!
            

def setup(self, spec: OperatorSpec): spec.input("receivers").connector(IOSpec.ConnectorType.DOUBLE_BUFFER, capacity=2).condition( ConditionType.MESSAGE_AVAILABLE, min_size=2 )

然后,可以调用带有 receivers 端口名称的 receive() 方法来批量接收输入数据。

复制
已复制!
            

def compute(self, op_input, op_output, context): values = [] value = op_input.receive("receivers") while value: values.append(value) value = op_input.receive("receivers") print(f"Rx message received (size:{len(values)})")

在上面的示例中,operator 在名为 “receivers” 的端口上接收输入,队列大小为 2,min_size 为 2。在循环中调用 receive() 方法以批量接收输入数据(批大小为 2)。由于 operator 事先不知道要接收的对象数量,因此循环调用 receive() 方法,直到它返回 None 值。输入数据存储在一个列表中,并在接收到所有输入数据后记录列表的大小。

为了简化上述代码,Holoscan SDK 提供了 IOSpec.PRECEDING_COUNT 常量作为 spec.input() 方法的第二个参数,以指定输入端口的先前连接数(在本例中,与 receivers 端口的连接数为 2)作为批处理大小。这可用于批量接收输入数据,而无需在循环中调用 receive() 方法。

复制
已复制!
            

def setup(self, spec: OperatorSpec): spec.input("receivers", size=IOSpec.PRECEDING_COUNT)

然后,可以调用带有 receivers 端口名称的 receive() 方法来批量接收输入数据。

复制
已复制!
            

def compute(self, op_input, op_output, context): values = op_input.receive("receivers") print(f"Rx message received (size:{len(values)})") print(f"Rx message value1:{values[0].data}") print(f"Rx message value2:{values[1].data}")

在上面的示例中,operator 在名为 “receivers” 的端口上接收输入,批处理大小为 2。调用 receive() 方法并使用 receivers 端口名称以批量接收输入数据(批大小为 2)。输入数据存储在一个元组中,并在接收到所有输入数据后记录元组的大小。

如果要使用特定的批处理大小,可以使用 holoscan.IOSpec.IOSize(size : int) 而不是 holoscan.IOSpec.PRECEDING_COUNT 来指定批处理大小。以这种方式使用 IOSize 等效于更冗长的 condition()connector() 调用,以更新本节开头附近显示的 capacitymin_size 参数。

使用 IOSize 的简短形式,而不是 condition()connector() 方法的主要原因是,如果需要进行额外的参数更改,例如队列策略。有关 condition()connector() 方法的使用的更多详细信息,请参阅下面的高级主题部分(进一步自定义输入和输出)。

复制
已复制!
            

def setup(self, spec: OperatorSpec): spec.input("receivers", size=IOSpec.IOSize(2))

如果要逐个接收输入数据,可以调用带有 kind="single" 参数的 receive() 方法。

复制
已复制!
            

def compute(self, op_input, op_output, context): while True: value = op_input.receive("receivers", kind="single") if value is None: break # Process the input data print(f"Rx message received (value:{value.data})")

上面的代码将从 receivers 端口逐个接收输入数据。在循环中调用 receive() 方法,直到它返回 None 值。输入数据存储在一个变量中,并记录输入数据的值。

注意

此方法(逐个接收输入数据)不适用于 IOSpec.ANY_SIZE 情况。使用 IOSpec.ANY_SIZE 参数,框架会为内部接收到的每个输入对象创建一个新的输入端口。每个隐式输入端口(使用格式 <port_name>:<index> 命名)都与 MessageAvailableCondition 条件相关联,该条件的 min_size1。因此,不能使用 kind="single" 关键字参数调用 receive() 方法来逐个接收输入数据。相反,对于 IOSpec.ANY_SIZE 情况,可以不使用任何 kind 参数或使用 kind="multi" 参数调用它。

如果确实需要为 IOSpec.ANY_SIZE 情况逐个接收输入数据(尽管不建议这样做),则可以使用不带 kind 参数的 receive() 方法从每个隐式输入端口(命名为 <port_name>:<index>)逐个接收输入数据。(例如,op_input.receive("receivers:0")op_input.receive("receivers:1") 等)。为了避免在为隐式输入端口调用 receive() 方法时出现错误消息(例如 The operator does not have an input port with label 'receivers:X'),您需要预先计算与 receivers 端口的连接数,并相应地为每个隐式输入端口调用 receive() 方法。

复制
已复制!
            

def compute(self, op_input, op_output, context): input_count = len(self.spec.inputs) - 1 # -1 to exclude the 'receivers' input port for i in range(input_count): value = op_input.receive(f"receivers:{i}") if value is None: break # Process the input data print(f"Rx message received (value:{value.data})")

注意

在上面的示例中,使用 IOSpec.PRECEDING_COUNTIOSpec.IOSize(2) 似乎显示出与 IOSpec.ANY_SIZE 相同的行为。但是,区别在于,由于 IOSpec.PRECEDING_COUNTIOSpec.IOSize(2) 不为每个(内部)输入端口使用单独的 MessageAvailableCondition 条件,因此不能保证 operator 将按顺序接收输入数据。

这意味着操作器接收输入数据的顺序可能与 compose() 方法中建立连接的顺序不同。此外,使用多线程调度器,不能保证操作器将均匀地接收来自每个连接的输入数据。操作器可能从一个连接接收到比另一个连接更多的输入数据。

如果输入数据的顺序很重要,建议使用 IOSpec.ANY_SIZE 并调用 receive() 方法一次性批量接收输入数据。

有关在 Python operator 中接收多个输入的更多示例,请参阅 Python 系统测试用例

C++ operator 的 Python 封装

从 Python 使用封装以 C++ 开发的 operator 在关于 创建 C++ operator Python 绑定的单独章节中介绍。

提示

从 Holoscan 2.1 开始,有一个 GXFCodeletOp 类,可用于从 Python 轻松封装现有的 GXF codelet,而无需首先为其编写底层的 C++ 封装类。类似地,现在还有一个 GXFComponentResource 类,允许将 GXF Component 用作 Python 应用程序中的 Holoscan 资源。在 examples/import_gxf_components 文件夹中,为 Python 应用程序提供了如何使用这些类的详细示例。

封装的 operator 和原生 Python operator 之间的互操作性

GXF operator 和原生 C++ operator 之间的互操作性 部分所述,可以使用保存张量的 holoscan::TensorMap 消息将 holoscan::Tensor 对象传递给 GXF operator。在 Python 中,这通过发送 dict 类型对象来完成,这些对象将张量名称作为键,并将 holoscan Tensor 或类似数组的对象作为值。类似地,当传输单个 holoscan::Tensor 的封装 C++ operator 连接到 Python 原生 operator 的输入端口时,在该端口上调用 op_input.receive() 将返回包含单个项的 Python dict。该项的键是张量名称,其值是相应的 holoscan.core.Tensor

考虑以下示例,其中 VideoStreamReplayerOpHolovizOp 是 Python 封装的 C++ operator,而 ImageProcessingOp 是 Python 原生 operator。

graphviz-8cdb373b9b4010a5e5ba0fd13bbf5f2d32b72fc9.png

图 19 Python 原生 operator 和基于 C++ 的 Python GXF operator 之间的张量互操作性

以下代码显示了如何将 ImageProcessingOpcompute() 方法实现为与 C++ operator 通信的 Python 原生 operator。

列表 17 examples/tensor_interop/python/tensor_interop.py

复制
已复制!
            

def compute(self, op_input, op_output, context): # in_message is a dict of tensors in_message = op_input.receive("input_tensor") # smooth along first two axes, but not the color channels sigma = (self.sigma, self.sigma, 0) # out_message will be a dict of tensors out_message = dict() for key, value in in_message.items(): print(f"message received (count:{self.count})") self.count += 1 cp_array = cp.asarray(value) # process cp_array cp_array = ndi.gaussian_filter(cp_array, sigma) out_message[key] = cp_array op_output.emit(out_message, "output_tensor")


  • op_input.receive() 方法调用返回一个 dict 对象。

  • holoscan.core.Tensor 对象通过使用 cupy.asarray() 方法调用转换为 CuPy 数组。

  • CuPy 数组用作 ndi.gaussian_filter() 函数调用的输入,参数为 sigmandi.gaussian_filter() 函数调用的结果是一个 CuPy 数组。

  • 最后,创建一个新的 dict 对象 out_message,以便通过 op_output.emit() 发送到下一个 operator。CuPy 数组 cp_array 被添加到其中,键是张量名称。CuPy 数组不必显式转换为 holocan.core.Tensor 对象,因为它们实现了 DLPack(和 __cuda__array_interface__)接口。

注意

examples/tensor_interop/python 目录中,可以找到支持与 Python 封装的 C++ operator 互操作的 Python 原生 operator 的完整示例。

您可以将多个张量添加到单个 dict 对象中,如下例所示

Operator 发送消息

复制
已复制!
            

out_message = { "video": output_array, "labels": labels, "bbox_coords": bbox_coords, } # emit the tensors op_output.emit(out_message, "outputs")

Operator 接收消息,假设上面的 outputs 端口连接到下面的 inputs 端口,并且 add_flow() 具有相应的张量

复制
已复制!
            

in_message = op_input.receive("inputs") # Tensors and tensor names video_tensor = in_message["video"] labels_tensor = in_message["labels"] bbox_coords_tensor = in_message["bbox_coords"]

注意

一些现有的 operator 允许配置它们发送/接收的张量的名称。一个例子是 HolovizOptensors 参数,其中每个张量的名称都映射到 Entity 中的张量名称(请参阅 apps/endoscopy_tool_tracking/python/endoscopy_tool_tracking.yaml 中的 holoviz 条目)。

examples/holoviz/python 目录中,可以找到一个 Python 原生 operator 的完整示例,该 operator 将多个张量发送到下游 C++ operator。

对于通过 UCX 连接发送/接收张量对象,存在张量类型的特殊序列化代码,该代码避免将张量数据复制到中间缓冲区。对于分布式应用程序,我们不能像在单个 fragment 应用程序中的 operator 之间那样直接发送 Python 对象,而是需要将其强制转换为 holoscan::Tensor 以使用特殊的零拷贝代码路径。但是,我们还会传输一个标头,指示类型最初是否是其他类似数组的对象,并尝试在另一端再次返回相同的类型,以便行为与非分布式情况更相似。

发送的对象

接收的对象

holoscan.Tensor holoscan.Tensor
类似数组的 dict holoscan.Tensor 的 dict
主机类似数组的对象(具有 __array_interface__ numpy.ndarray
设备类似数组的对象(具有 __cuda_array_interface__ cupy.ndarray

这避免了 NumPy 或 CuPy 数组通过 cloudpickle 序列化为字符串,以便可以有效地传输它们,并在另一端再次返回相同的类型。值得一提的是,如果发送的类型是例如发送端的 PyTorch 主机/设备张量,则接收到的值将是 numpy/cupy 数组,因为任何实现接口的对象都会返回这些类型。

自动 operator 类创建

Holoscan 还提供了一个 holoscan.decorator 模块,该模块提供了通过向现有函数或类添加装饰器来自动生成 Operator 的方法。请参阅关于 通过 holoscan.decorator.create_op 创建 operator 的单独章节。

进一步自定义输入和输出

本节补充了上述关于基本输入和输出端口配置的信息,这些信息在 C++ 和 Python 操作器创建指南中分别给出。此处描述的概念对于 C++ 或 Python API 都是相同的。

默认情况下,操作器的输入和输出端口都将使用双缓冲队列,该队列的容量为一条消息,并且策略设置为:如果队列已满时有消息到达,则报错。对于每个输入端口,操作器上会自动放置一个 MessageAvailableConditionC++/Python)条件,以便在每个端口都有单条消息可用之前,不会调用 compute 方法。类似地,每个输出端口都有一个 DownstreamMessageAffordableConditionC++/Python)条件,该条件会阻止操作器调用 compute,直到任何连接的下游操作器在其接收器队列中都有空间容纳单条消息。这些默认条件确保消息永远不会在队列已满时到达,并且在调用 compute 方法时,消息已经被接收。这些默认条件使得连接每个操作器依次调用 compute 的管道相对容易,但可能不适用于所有应用。本节介绍如何根据请求覆盖默认行为。

可以通过 HOLOSCAN_QUEUE_POLICY 环境变量修改全局默认队列策略。有效选项(不区分大小写)包括

  • “pop”:当队列已满时,到达的新项将替换最旧的项

  • “reject”:当队列已满时,到达的新项将被丢弃

  • “fail”:如果队列已满时有新项到达,则终止应用程序

当未指定 HOLOSCAN_QUEUE_POLICY 时,默认行为为“fail”。如果操作器的 setup 方法通过 connectorC++/Python)方法显式设置了接收器或发送器(如下所述),则该连接器的策略不会被默认策略覆盖。

注意

覆盖操作器端口属性是一个高级主题。开发者可能希望跳过本节,直到他们遇到默认行为不足以满足其应用需求的情况。

要覆盖给定端口使用的队列属性,可以使用 connectorC++/Python)方法,如下例所示。此示例还展示了如何使用 conditionC++/Python)方法来更改端口在操作器上放置的条件类型。通常,当操作器有多个条件时,它们会进行“与”组合,因此在操作器可以调用 compute 之前,所有端口上的条件都必须满足。

考虑以下来自操作器的 holoscan::Operator::setup() 方法中的代码。

复制
已复制!
            

spec.output<TensorMap>("out1") spec.output<TensorMap>("out2").condition(ConditionType::kNone); spec.output<TensorMap>("in") .connector(IOSpec::ConnectorType::kDoubleBuffer, Arg("capacity", static_cast<uint64_t>(2)), Arg("policy", static_cast<uint64_t>(1))) // 0=pop, 1=reject, 2=fault (default) .condition(ConditionType::kMessageAvailable, Arg("min_size", static_cast<uint64_t>(2)), Arg("front_stage_max_size", static_cast<size_t>(2)));

这将定义

  • 一个名为“out1”的输出端口,具有默认属性

  • 一个名为“out2”的输出端口,它仍然具有默认连接器(一个 holoscan::gxf::DoubleBufferTransmitter),但通过设置 ConditionType::kNone 移除了 ConditionType::kDownstreamMessageAffordable 的默认条件。这表明操作器在调用 compute 之前,不会检查“out2”下游的任何端口的接收器队列中是否有可用空间。

  • 一个名为“in”的输入端口,其连接器和条件都具有与默认值不同的参数。例如,队列大小增加到 2,policy=1 是“reject”,表示如果消息到达时队列已满,则该消息将被拒绝,而保留队列中已有的消息。

考虑以下来自操作器的 holoscan::Operator::setup() 方法中的代码。

复制
已复制!
            

spec.output("out1") spec.output("out2").condition(ConditionType.NONE) spec.input("in").connector( IOSpec.ConnectorType.DOUBLE_BUFFER, capacity=2, policy=1, # 0=pop, 1=reject, 2=fault (default) ).condition(ConditionType.MESSAGE_AVAILABLE, min_size=2, front_stage_max_size=2)

这将定义

  • 一个名为“out1”的输出端口,具有默认属性

  • 一个名为“out2”的输出端口,它仍然具有默认连接器(一个 holoscan.resources.DoubleBufferTransmitter),但通过设置 ConditionType.NONE 移除了 ConditionType.DOWNSTREAM_MESSAGE_AFFORDABLE 的默认条件。这表明操作器在调用 compute 之前,不会检查“out2”下游的任何端口的接收器队列中是否有可用空间。

  • 一个名为“in1”的输入端口,其连接器和条件都具有与默认值不同的参数。例如,队列大小增加到 2,policy=1 是“reject”,表示如果消息到达时队列已满,则该消息将被拒绝,而保留队列中已有的消息。

要了解有关覆盖连接器和/或条件的更多信息,请参阅 multi_branch_pipeline 示例,该示例覆盖了默认条件,以允许管道的两个分支以不同的帧速率运行。在 此 Python 队列策略测试应用程序 中,也有增加可用队列大小的示例。

将 Holoscan SDK 与其他库一起使用

Holoscan SDK 能够与各种强大的 GPU 加速库无缝集成,以构建高效、高性能的管道。

有关 Holoscan 的张量互操作性以及在管道中处理 CUDA 库的详细示例和更多信息,请参阅 HoloHub 仓库 中的 将外部库集成到 Holoscan 管道的最佳实践 教程。这包括用于 Python 的 CUDA PythonCuPy,用于 C++ 的 MatX,以及用于集成到 Holoscan 应用的 cuCIMCV-CUDAOpenCV

上一页 打包 Holoscan 应用程序
下一页 创建条件
© 版权所有 2022-2024,NVIDIA。 上次更新时间:2025 年 1 月 27 日。