创建算子
在 ping_custom_op 示例中也演示了如何创建自定义算子。
在组装 C++ 应用程序时,可以使用两种类型的算子
原生 C++ 算子:在 C++ 中定义的自定义算子,不使用 GXF API,通过创建
holoscan::Operator
的子类来实现。这些 C++ 算子可以在算子之间传递任意 C++ 对象。GXF 算子:通过从
holoscan::ops::GXFOperator
类继承,在底层 C++ 库中定义的算子。这些算子包装了来自 GXF 扩展的 GXF 代码小组件。例如,VideoStreamReplayerOp
用于重放视频文件,FormatConverterOp
用于格式转换,以及HolovizOp
用于可视化。
可以使用 GXF 算子和原生算子的混合来创建应用程序。在这种情况下,必须特别注意适当地转换输入和输出张量,如下面的章节所示。
原生 C++ 算子
算子生命周期 (C++)
holoscan::Operator
的生命周期由三个阶段组成
start()
在算子启动时调用一次,用于初始化繁重的任务,例如分配内存资源和使用参数。compute()
在算子被触发时调用,这可以在算子生命周期中的start()
和stop()
之间发生任意次数。stop()
在算子停止时调用一次,用于反初始化繁重的任务,例如释放先前在start()
中分配的资源。
工作流程上的所有算子都安排执行。当一个算子首次执行时,将调用 start()
方法,然后调用 compute()
方法。当算子停止时,将调用 stop()
方法。compute()
方法在 start()
和 stop()
之间被多次调用。
如果由 条件 指定的任何调度条件未满足(例如,如果算子已执行一定次数,则 CountCondition
会导致调度条件不满足),则算子停止并调用 stop()
方法。
我们将在用户指南的 指定算子输入和输出 (C++) 部分介绍如何使用 条件。
通常,start()
和 stop()
函数在应用程序的生命周期中仅调用一次。但是,如果再次满足调度条件,则可以再次调度算子执行,并且将再次调用 start()
方法。

图 15 Holoscan 算子生命周期中方法调用的顺序
如果要为此 C++ 算子创建 Python 绑定,建议将 initialize()
和/或 start()
方法中分配的任何资源清理放入算子的 stop()
方法中,而不是在其析构函数中。这是必要的,因为目前存在一个问题,即不保证析构函数总是在 Python 应用程序终止之前被调用。stop()
方法将始终被显式调用,因此我们可以确信任何清理都会按预期发生。
我们可以通过实现上述方法来覆盖算子的默认行为。以下示例展示了如何实现一个自定义算子,该算子覆盖了 start、stop 和 compute 方法。
列表 2 Holoscan 算子的基本结构 (C++)
#include "holoscan/holoscan.hpp"
using holoscan::Operator;
using holoscan::OperatorSpec;
using holoscan::InputContext;
using holoscan::OutputContext;
using holoscan::ExecutionContext;
using holoscan::Arg;
using holoscan::ArgList;
class MyOp : public Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(MyOp)
MyOp() = default;
void setup(OperatorSpec& spec) override {
}
void start() override {
HOLOSCAN_LOG_TRACE("MyOp::start()");
}
void compute(InputContext&, OutputContext& op_output, ExecutionContext&) override {
HOLOSCAN_LOG_TRACE("MyOp::compute()");
};
void stop() override {
HOLOSCAN_LOG_TRACE("MyOp::stop()");
}
};
创建自定义算子 (C++)
要在 C++ 中创建自定义算子,必须创建 holoscan::Operator
的子类。以下示例演示了如何使用原生算子(没有底层预编译 GXF Codelet 的算子)。
代码片段: examples/ping_multi_port/cpp/ping_multi_port.cpp
列表 3 examples/ping_multi_port/cpp/ping_multi_port.cpp
#include "holoscan/holoscan.hpp"
class ValueData {
public:
ValueData() = default;
explicit ValueData(int value) : data_(value) {
HOLOSCAN_LOG_TRACE("ValueData::ValueData(): {}", data_);
}
~ValueData() { HOLOSCAN_LOG_TRACE("ValueData::~ValueData(): {}", data_); }
void data(int value) { data_ = value; }
int data() const { return data_; }
private:
int data_;
};
namespace holoscan::ops {
class PingTxOp : public Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(PingTxOp)
PingTxOp() = default;
void setup(OperatorSpec& spec) override {
spec.output<std::shared_ptr<ValueData>>("out1");
spec.output<std::shared_ptr<ValueData>>("out2");
}
void compute(InputContext&, OutputContext& op_output, ExecutionContext&) override {
auto value1 = std::make_shared<ValueData>(index_++);
op_output.emit(value1, "out1");
auto value2 = std::make_shared<ValueData>(index_++);
op_output.emit(value2, "out2");
};
int index_ = 1;
};
class PingMxOp : public Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(PingMxOp)
PingMxOp() = default;
void setup(OperatorSpec& spec) override {
spec.input<std::shared_ptr<ValueData>>("in1");
spec.input<std::shared_ptr<ValueData>>("in2");
spec.output<std::shared_ptr<ValueData>>("out1");
spec.output<std::shared_ptr<ValueData>>("out2");
spec.param(multiplier_, "multiplier", "Multiplier", "Multiply the input by this value", 2);
}
void compute(InputContext& op_input, OutputContext& op_output, ExecutionContext&) override {
auto value1 = op_input.receive<std::shared_ptr<ValueData>>("in1").value();
auto value2 = op_input.receive<std::shared_ptr<ValueData>>("in2").value();
HOLOSCAN_LOG_INFO("Middle message received (count: {})", count_++);
HOLOSCAN_LOG_INFO("Middle message value1: {}", value1->data());
HOLOSCAN_LOG_INFO("Middle message value2: {}", value2->data());
// Multiply the values by the multiplier parameter
value1->data(value1->data() * multiplier_);
value2->data(value2->data() * multiplier_);
op_output.emit(value1, "out1");
op_output.emit(value2, "out2");
};
private:
int count_ = 1;
Parameter<int> multiplier_;
};
class PingRxOp : public Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(PingRxOp)
PingRxOp() = default;
void setup(OperatorSpec& spec) override {
// // Since Holoscan SDK v2.3, users can define a multi-receiver input port using 'spec.input()'
// // with 'IOSpec::kAnySize'.
// // The old way is to use 'spec.param()' with 'Parameter<std::vector<IOSpec*>> receivers_;'.
// spec.param(receivers_, "receivers", "Input Receivers", "List of input receivers.", {});
spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", IOSpec::kAnySize);
}
void compute(InputContext& op_input, OutputContext&, ExecutionContext&) override {
auto value_vector =
op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value();
HOLOSCAN_LOG_INFO("Rx message received (count: {}, size: {})", count_++, value_vector.size());
HOLOSCAN_LOG_INFO("Rx message value1: {}", value_vector[0]->data());
HOLOSCAN_LOG_INFO("Rx message value2: {}", value_vector[1]->data());
};
private:
// // Since Holoscan SDK v2.3, the following line is no longer needed.
// Parameter<std::vector<IOSpec*>> receivers_;
int count_ = 1;
};
} // namespace holoscan::ops
class MyPingApp : public holoscan::Application {
public:
void compose() override {
using namespace holoscan;
// Define the tx, mx, rx operators, allowing the tx operator to execute 10 times
auto tx = make_operator<ops::PingTxOp>("tx", make_condition<CountCondition>(10));
auto mx = make_operator<ops::PingMxOp>("mx", Arg("multiplier", 3));
auto rx = make_operator<ops::PingRxOp>("rx");
// Define the workflow
add_flow(tx, mx, {{"out1", "in1"}, {"out2", "in2"}});
add_flow(mx, rx, {{"out1", "receivers"}, {"out2", "receivers"}});
}
};
int main(int argc, char** argv) {
auto app = holoscan::make_application<MyPingApp>();
app->run();
return 0;
}
在此应用程序中,创建了三个算子:PingTxOp
、PingMxOp
和 PingRxOp
PingTxOp
算子是一个源算子,每次调用时都会发出两个值。这些值在两个不同的输出端口out1
(用于奇数整数)和out2
(用于偶数整数)上发出。PingMxOp
算子是一个中间算子,它从PingTxOp
算子接收两个值,并在两个不同的输出端口上发出两个值。这些值乘以multiplier
参数。PingRxOp
算子是一个接收器算子,它从PingMxOp
算子接收两个值。这些值在单个输入receivers
上接收,receivers
是输入端口的向量。PingRxOp
算子按照PingMxOp
算子发出的顺序接收这些值。
如下文更详细的介绍,每个算子的输入都在算子的 setup()
方法中指定。然后,在 compute()
方法中通过 op_input.receive()
接收输入,并通过 op_output.emit()
发出输出。
请注意,对于此处定义的原生 C++ 算子,可以发出或接收任何对象,包括共享指针。对于大型对象(如张量),从性能角度来看,最好传输指向该对象的共享指针,而不是复制。当使用共享指针并且同一个张量发送到多个下游算子时,应避免对张量进行原地操作,否则可能会发生算子之间的竞争条件。
如果需要在初始化算子之前或之后配置参数或执行其他设置任务,可以覆盖 initialize()
方法。此方法在 start()
方法之前调用一次。
示例
void initialize() override {
// Register custom type and codec for serialization
register_converter<std::array<float, 3>>();
register_codec<std::vector<InputSpec>>("std::vector<holoscan::ops::HolovizOp::InputSpec>", true);
// Set up prerequisite parameters before calling Operator::initialize()
auto frag = fragment();
// Check if an argument for 'allocator' exists
auto has_allocator = std::find_if(
args().begin(), args().end(), [](const auto& arg) { return (arg.name() == "allocator"); });
// Create the allocator if no argument is provided
if (has_allocator == args().end()) {
allocator_ = frag->make_resource<UnboundedAllocator>("allocator");
add_arg(allocator_.get());
}
// Call the parent class's initialize() method to complete the initialization.
// Operator::initialize must occur after all arguments have been added.
Operator::initialize();
// After Operator::initialize(), the operator is ready for use and the parameters are set
int multiplier = multiplier_;
HOLOSCAN_LOG_INFO("Multiplier: {}", multiplier);
}
有关 register_converter()
和 register_codec()
方法的详细信息,请参阅自定义参数类型的 holoscan::ComponentBase::register_converter()
和分布式应用程序的对象序列化部分 object serialization。
指定算子参数 (C++)
在上面的 holoscan::ops::PingMxOp
算子示例中,您有一个参数 multiplier
,它声明为类的一部分,作为私有成员,使用 param()
模板类型
Parameter<int> multiplier_;
然后将其添加到算子的 OperatorSpec
属性的 setup()
方法中,其中必须提供关联的字符串键。还可以提及其他属性,例如描述和默认值
// Provide key, and optionally other information
spec.param(multiplier_, "multiplier", "Multiplier", "Multiply the input by this value", 2);
如果您的参数是自定义类型,则必须注册该类型并提供 YAML 编码器/解码器,如 holoscan::ComponentBase::register_converter()
下的文档所述
请参阅 配置算子参数 部分,了解应用程序如何设置这些参数。
指定算子输入和输出 (C++)
要配置 C++ 原生算子的输入和输出,请在算子的 setup()
方法中调用 spec.input()
和 spec.output()
方法。
对于要添加的每个输入和输出,spec.input()
和 spec.output()
方法应调用一次。OperatorSpec
对象和 setup()
方法将由 Application
类在其 run()
方法被调用时自动初始化和调用。
这些方法 (spec.input()
和 spec.output()
) 返回一个 IOSpec
对象,该对象可用于配置输入/输出端口。
默认情况下,holoscan::MessageAvailableCondition
和 holoscan::DownstreamMessageAffordableCondition
条件(min_size
为 1
)应用于输入/输出端口。这意味着在输入端口上有消息可用且下游算子的输入端口(队列)有足够的容量接收消息之前,不会调用算子的 compute()
方法。
void setup(OperatorSpec& spec) override {
spec.input<std::shared_ptr<ValueData>>("in");
// Above statement is equivalent to:
// spec.input<std::shared_ptr<ValueData>>("in")
// .condition(ConditionType::kMessageAvailable, Arg("min_size") = static_cast<uint64_t>(1));
spec.output<std::shared_ptr<ValueData>>("out");
// Above statement is equivalent to:
// spec.output<std::shared_ptr<ValueData>>("out")
// .condition(ConditionType::kDownstreamMessageAffordable, Arg("min_size") = static_cast<uint64_t>(1));
...
}
在上面的示例中,spec.input()
方法用于配置输入端口,使其具有最小大小为 1 的 holoscan::MessageAvailableCondition
。这意味着在算子的输入端口上有消息可用之前,不会调用算子的 compute()
方法。类似地,spec.output()
方法用于配置输出端口,使其具有最小大小为 1 的 holoscan::DownstreamMessageAffordableCondition
。这意味着在下游算子的输入端口有足够的容量接收消息之前,不会调用算子的 compute()
方法。
如果要更改此行为,请使用 IOSpec::condition()
方法来配置条件。例如,要将输入和输出端口配置为没有条件,可以使用以下代码
void setup(OperatorSpec& spec) override {
spec.input<std::shared_ptr<ValueData>>("in")
.condition(ConditionType::kNone);
spec.output<std::shared_ptr<ValueData>>("out")
.condition(ConditionType::kNone);
// ...
}
setup()
方法中的示例代码将输入端口配置为没有条件,这意味着只要算子准备好计算,就会调用 compute()
方法。由于不保证输入端口有消息可用,因此 compute()
方法应在尝试读取输入端口上的消息之前检查是否有消息可用。
receive()
方法的 InputContext
对象可用于访问操作符类的 compute()
方法中的不同类型的输入数据,其中其模板参数 (DataT
) 是输入数据的数据类型。此方法将输入端口的名称作为参数(如果您的算子只有一个输入端口,则可以省略),并返回输入数据。如果输入数据不可用,则该方法返回 holoscan::expected<std::shared_ptr<ValueData>, holoscan::RuntimeError>
类型的对象。holoscan::expected<T, E>
类模板用于表示预期对象,该对象可以保存 T
类型的值或 E
类型的错误。预期对象用于以比使用错误代码或异常更结构化的方式返回和传播错误。在这种情况下,预期对象可以保存 std::shared_ptr<ValueData>
对象或 holoscan::RuntimeError
类,其中包含描述失败原因的错误消息。
holoscan::RuntimeError
类是 std::runtime_error
的派生类,并支持访问更多错误信息,例如,使用 what()
方法。
在下面的示例代码片段中,PingRxOp
算子在名为“in”的端口上接收输入,数据类型为 std::shared_ptr<ValueData>
。receive()
方法用于访问输入数据。使用 if
条件检查 maybe_value
是否有效。如果输入数据中存在错误,则记录错误消息,并且算子抛出错误。如果输入数据有效,我们可以使用 expected
对象的 value()
方法访问输入数据的引用。为了避免复制输入数据(或创建另一个共享指针),输入数据的引用存储在 value
变量中(使用 auto& value = maybe_value.value()
)。然后调用 ValueData
类的 data()
方法来获取输入数据的值。
// ...
class PingRxOp : public holoscan::Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(PingRxOp)
PingRxOp() = default;
void setup(holoscan::OperatorSpec& spec) override {
spec.input<std::shared_ptr<ValueData>>("in");
}
void compute(holoscan::InputContext& op_input, holoscan::OutputContext&,
holoscan::ExecutionContext&) override {
auto maybe_value = op_input.receive<std::shared_ptr<ValueData>>("in");
if (!maybe_value) {
HOLOSCAN_LOG_ERROR("Failed to receive message - {}", maybe_value.error().what());
// [error] Failed to receive message - InputContext receive() Error: No message is received from the input port with name 'in'
throw maybe_value.error(); // or `return;`
}
auto& value = maybe_value.value();
HOLOSCAN_LOG_INFO("Message received (value: {})", value->data());
}
};
在内部,Holoscan 中的消息传递是使用 Message
类实现的,该类包装了一个 std::any
对象,并提供了一个类型安全的接口来访问输入数据。std::any
类是任何类型的单个值的类型安全容器,用于存储算子的输入和输出数据。std::any
类是 C++ 标准库的一部分,并在 any
头文件中定义。
由于 Holoscan SDK 使用 GXF 作为执行引擎,因此当在 Holoscan 原生算子和 GXF 算子之间传递数据时,holoscan::Message
对象也封装在 nvidia::gxf::Entity
对象中。这确保了数据与 GXF 框架兼容。
如果输入数据预计来自 GXF 算子或张量(在这两种情况下,数据都是 nvidia::gxf::Entity
的实例),则可以在 receive
方法的模板参数中使用 holoscan::gxf::Entity
类来访问输入数据。holoscan::gxf::Entity
类是 nvidia::gxf::Entity
类(类似于字典对象)的包装器,并提供了一种获取张量和向实体添加张量的方法。
Holoscan SDK 提供了称为 域对象 的内置数据类型,这些类型在 include/holoscan/core/domain
目录中定义。例如,holoscan::Tensor
是一个域对象类,它表示数据的多维数组,并且可以与底层 GXF 类 (nvidia::gxf::Tensor
) 互操作。holoscan::Tensor
类提供了访问张量数据、形状和其他属性的方法。支持将 holoscan::Tensor
对象传递到 GXF 算子和从 GXF 算子传递 holoscan::Tensor
对象。
holoscan::Tensor
类是包装 DLManagedTensorContext
结构体的包装器,该结构体保存 DLManagedTensor 对象。因此,它提供了访问张量数据的主要接口,并且可以与其他支持 DLPack 接口的框架互操作。
有关更多详细信息,请参阅 互操作性部分。
在下面的示例中,TensorRx 算子在名为“in”的端口上接收输入,数据类型为 holoscan::gxf::Entity
。
// ...
class TensorRxOp : public holoscan::Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(TensorRxOp)
TensorRxOp() = default;
void setup(holoscan::OperatorSpec& spec) override {
spec.input<holoscan::gxf::Entity>("in");
}
void compute(holoscan::InputContext& op_input, holoscan::OutputContext&,
holoscan::ExecutionContext&) override {
// Type of 'maybe_entity' is holoscan::expected<holoscan::gxf::Entity, holoscan::RuntimeError>
auto maybe_entity = op_input.receive<holoscan::gxf::Entity>("in");
if (maybe_entity) {
auto& entity = maybe_entity.value(); // holoscan::gxf::Entity&
// Get a tensor from the entity if it exists.
// Can pass a tensor name as an argument to get a specific tensor.
auto tensor = entity.get<holoscan::Tensor>(); // std::shared_ptr<holoscan::Tensor>
if (tensor) {
HOLOSCAN_LOG_INFO("tensor nbytes: {}", tensor->nbytes());
}
}
}
};
如果实体包含张量,则可以使用 holoscan::gxf::Entity
类的 get
方法来检索张量。get
方法返回一个 std::shared_ptr<holoscan::Tensor>
对象,该对象可用于访问张量数据。holoscan::Tensor
类的 nbytes
方法用于获取张量中的字节数。
通过使用 holoscan::TensorMap
类,该类存储张量名称到张量的映射 (std::unordered_map<std::string, std::shared_ptr<holoscan::Tensor>>
),可以更新接收包含一个或多个张量对象的实体对象的代码,以接收 holoscan::TensorMap
对象而不是 holoscan::gxf::Entity
对象。holoscan::TensorMap
类提供了一种通过名称访问张量数据的方法,使用类似于 std::unordered_map
的接口。
// ...
class TensorRxOp : public holoscan::Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(TensorRxOp)
TensorRxOp() = default;
void setup(holoscan::OperatorSpec& spec) override {
spec.input<holoscan::TensorMap>("in");
}
void compute(holoscan::InputContext& op_input, holoscan::OutputContext&,
holoscan::ExecutionContext&) override {
// Type of 'maybe_entity' is holoscan::expected<holoscan::TensorMap, holoscan::RuntimeError>
auto maybe_tensor_map = op_input.receive<holoscan::TensorMap>("in");
if (maybe_tensor_map) {
auto& tensor_map = maybe_tensor_map.value(); // holoscan::TensorMap&
for (const auto& [name, tensor] : tensor_map) {
HOLOSCAN_LOG_INFO("tensor name: {}", name);
HOLOSCAN_LOG_INFO("tensor nbytes: {}", tensor->nbytes());
}
}
}
};
在上面的示例中,TensorRxOp
算子在名为“in”的端口上接收输入,数据类型为 holoscan::TensorMap
。InputContext
对象的 receive
方法用于访问输入数据。receive
方法返回一个 expected
对象,该对象可以保存 holoscan::TensorMap
对象或 holoscan::RuntimeError
对象。holoscan::TensorMap
类是 std::unordered_map<std::string, std::shared_ptr<holoscan::Tensor>>
类的包装器,并提供了一种访问张量数据的方法。holoscan::Tensor
类的 nbytes
方法用于获取张量中的字节数。
如果类型 std::any 用于 receive
方法的模板参数,则 receive()
方法将返回一个 std::any
对象,其中包含指定名称的输入。在下面的示例中,PingRxOp
算子在名为“in”的端口上接收输入,数据类型为 std::any
。std::any
对象的 type()
方法用于确定输入数据的实际类型,std::any_cast
// ...
class AnyRxOp : public holoscan::Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS_SUPER(AnyRxOp, holoscan::ops::GXFOperator)
AnyRxOp() = default;
void setup(holoscan::OperatorSpec& spec) override {
spec.input<std::any>("in");
}
void compute(holoscan::InputContext& op_input, holoscan::OutputContext&, holoscan::ExecutionContext&) override {
auto maybe_any = op_input.receive<std::any>("in");
if (!maybe_any) {
HOLOSCAN_LOG_ERROR("Failed to receive message - {}", maybe_any.error().what());
return;
}
auto& in_any = maybe_any.value();
const auto& in_any_type = in_any.type();
try {
if (in_any_type == typeid(holoscan::gxf::Entity)) {
auto in_entity = std::any_cast<holoscan::gxf::Entity>(in_any);
auto tensor = in_entity.get<holoscan::Tensor>(); // std::shared_ptr<holoscan::Tensor>
if (tensor) {
HOLOSCAN_LOG_INFO("tensor nbytes: {}", tensor->nbytes());
}
} else if (in_any_type == typeid(std::shared_ptr<ValueData>)) {
auto in_value = std::any_cast<std::shared_ptr<ValueData>>(in_any);
HOLOSCAN_LOG_INFO("Received value: {}", in_value->data());
} else {
HOLOSCAN_LOG_ERROR("Invalid message type: {}", in_any_type.name());
}
} catch (const std::bad_any_cast& e) {
HOLOSCAN_LOG_ERROR("Failed to cast message - {}", e.what());
}
}
};
接收任意数量的输入 (C++)
在某些情况下,最好允许在端口上接收任意数量的对象,而不是分配特定数量的输入端口。
使用 IOSpec::kAnySize
用于可变输入处理
实现此目的的一种方法是通过调用 spec.input<std::vector<T>>("port_name", IOSpec::kAnySize)
并将 IOSpec::kAnySize
作为 setup()
方法中算子的第二个参数来定义多接收器输入端口,其中 T
是输入数据类型(如 原生算子 ping 示例中的 PingRxOp
所做的那样)。
void setup(OperatorSpec& spec) override {
spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", IOSpec::kAnySize);
}
列表 4 examples/ping_multi_port/cpp/ping_multi_port.cpp
class PingRxOp : public Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(PingRxOp)
PingRxOp() = default;
void setup(OperatorSpec& spec) override {
// // Since Holoscan SDK v2.3, users can define a multi-receiver input port using 'spec.input()'
// // with 'IOSpec::kAnySize'.
// // The old way is to use 'spec.param()' with 'Parameter<std::vector<IOSpec*>> receivers_;'.
// spec.param(receivers_, "receivers", "Input Receivers", "List of input receivers.", {});
spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", IOSpec::kAnySize);
}
void compute(InputContext& op_input, OutputContext&, ExecutionContext&) override {
auto value_vector =
op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value();
HOLOSCAN_LOG_INFO("Rx message received (count: {}, size: {})", count_++, value_vector.size());
HOLOSCAN_LOG_INFO("Rx message value1: {}", value_vector[0]->data());
HOLOSCAN_LOG_INFO("Rx message value2: {}", value_vector[1]->data());
};
private:
// // Since Holoscan SDK v2.3, the following line is no longer needed.
// Parameter<std::vector<IOSpec*>> receivers_;
int count_ = 1;
};
} // namespace holoscan::ops
class MyPingApp : public holoscan::Application {
public:
void compose() override {
using namespace holoscan;
// Define the tx, mx, rx operators, allowing the tx operator to execute 10 times
auto tx = make_operator<ops::PingTxOp>("tx", make_condition<CountCondition>(10));
auto mx = make_operator<ops::PingMxOp>("mx", Arg("multiplier", 3));
auto rx = make_operator<ops::PingRxOp>("rx");
// Define the workflow
add_flow(tx, mx, {{"out1", "in1"}, {"out2", "in2"}});
add_flow(mx, rx, {{"out1", "receivers"}, {"out2", "receivers"}});
}
};
然后,一旦在 compose()
方法中提供了以下配置,
add_flow(mx, rx, {{"out1", "receivers"}, {"out2", "receivers"}});
PingRxOp
将在 compute()
方法中的 receivers
端口上接收两个输入
auto value_vector =
op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value();
当输入端口使用 IOSpec::kAnySize
定义时,框架会为端口上接收的每个输入对象创建一个新的输入端口。输入端口使用格式 <port_name>:<index>
命名,其中 <port_name>
是输入端口的名称,<index>
是端口上接收的输入对象的索引。例如,如果 receivers
端口接收两个输入对象,则输入端口将命名为 receivers:0
和 receivers:1
。
框架在内部创建一个类型为 std::vector<holoscan::IOSpec*>
的参数 (receivers
),隐式创建输入端口 (receivers:0
和 receivers:1
),并将它们连接起来(将输入端口的引用添加到 receivers
向量)。这样,当调用 receive()
方法时,框架可以将来自相应输入端口的输入数据作为向量返回。
auto value_vector =
op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value();
如果在 compose()
方法的末尾添加 HOLOSCAN_LOG_INFO(rx->description());
,您将看到 PingRxOp
算子的描述,如下所示
id: -1
name: rx
fragment: ""
args:
[]
type: kNative
conditions:
[]
resources:
[]
spec:
fragment: ""
params:
- name: receivers
type: std::vector<holoscan::IOSpec*>
description: ""
flag: kNone
inputs:
- name: receivers:1
io_type: kInput
typeinfo_name: N8holoscan3gxf6EntityE
connector_type: kDefault
conditions:
[]
- name: receivers:0
io_type: kInput
typeinfo_name: N8holoscan3gxf6EntityE
connector_type: kDefault
conditions:
[]
- name: receivers
io_type: kInput
typeinfo_name: St6vectorISt10shared_ptrI9ValueDataESaIS2_EE
connector_type: kDefault
conditions:
[]
outputs:
[]
配置输入端口队列大小和消息批处理条件 (C++)
如果您想在一个端口上接收多个对象并批量处理它们,您可以增加输入端口的队列大小,并将 MessageAvailableCondition
条件的 min_size
参数设置为所需的批次大小。这可以通过调用带有期望参数的 connector()
和 condition()
方法来完成,分别使用批次大小作为 capacity
和 min_size
参数。
将 min_size
设置为 N
将确保操作器在 compute()
方法被调用之前接收到 N
个对象。
void setup(holoscan::OperatorSpec& spec) override {
spec.input<std::shared_ptr<ValueData>>("receivers")
.connector(holoscan::IOSpec::ConnectorType::kDoubleBuffer,
holoscan::Arg("capacity", static_cast<uint64_t>(2)))
.condition(holoscan::ConditionType::kMessageAvailable,
holoscan::Arg("min_size", static_cast<uint64_t>(2)));
}
然后,可以调用带有 receivers
端口名称的 receive()
方法来批量接收输入数据。
void compute(holoscan::InputContext& op_input, holoscan::OutputContext&,
holoscan::ExecutionContext&) override {
std::vector<std::shared_ptr<ValueData>> value_vector;
auto maybe_value = op_input.receive<std::shared_ptr<ValueData>>("receivers");
while (maybe_value) {
value_vector.push_back(maybe_value.value());
maybe_value = op_input.receive<std::shared_ptr<ValueData>>("receivers");
}
HOLOSCAN_LOG_INFO("Rx message received (size: {})", value_vector.size());
}
在上面的示例中,操作器在名为“receivers”的端口上接收输入,队列大小为 2,min_size
为 2。receive()
方法在一个循环中被调用,以批量接收输入数据,批次大小为 2。由于操作器事先不知道要接收的对象数量,因此在一个循环中调用 receive()
方法,直到它返回错误。输入数据存储在一个向量中,并且在接收到所有输入数据后,向量的大小会被记录。
为了简化上述代码,Holoscan SDK 提供了 IOSpec::kPrecedingCount
常量作为 OperatorSpec 的 input()
方法的第二个参数,以指定输入端口的前置连接数(在本例中,到 receivers
端口的连接数为 2)作为批次大小。这可以用于批量接收输入数据,而无需在一个循环中调用 receive()
方法。
void setup(holoscan::OperatorSpec& spec) override {
spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", holoscan::IOSpec::kPrecedingCount);
}
然后,可以调用带有 receivers
端口名称的 receive()
方法来批量接收输入数据。
void compute(holoscan::InputContext& op_input, holoscan::OutputContext&,
holoscan::ExecutionContext&) override {
auto value_vector =
op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value();
HOLOSCAN_LOG_INFO("Rx message received (size: {})", value_vector.size());
HOLOSCAN_LOG_INFO("Rx message value1: {}", value_vector[0]->data());
HOLOSCAN_LOG_INFO("Rx message value2: {}", value_vector[1]->data());
}
在上面的示例中,操作器在名为“receivers”的端口上接收输入,批次大小为 2。receive()
方法被调用,并带有 receivers
端口名称,以批量接收输入数据,批次大小为 2。输入数据存储在一个向量中,并且在接收到所有输入数据后,向量的大小会被记录。
如果您想使用特定的批次大小,您可以使用 holoscan::IOSpec::IOSize(int64_t)
而不是 holoscan::IOSpec::kPrecedingCount
来指定批次大小。以这种方式使用 IOSize 等同于更详细的 condition()
和 connector()
调用,以更新本节开始附近显示的 capacity
和 min_size
参数。
使用 IOSize
的简短形式,而不是 condition()
或 connector()
方法的主要原因是,如果需要进行额外的参数更改,例如队列策略。有关 condition()
和 connector()
方法的使用的更多详细信息,请参阅下面的高级主题部分(进一步自定义输入和输出)。
void setup(holoscan::OperatorSpec& spec) override {
spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", holoscan::IOSpec::IOSize(2));
}
如果您想逐个接收输入数据,您可以调用 receive()
方法,而无需使用 std::vector<T>
模板参数。
void compute(holoscan::InputContext& op_input, holoscan::OutputContext&,
holoscan::ExecutionContext&) override {
while (true) {
auto maybe_value = op_input.receive<std::shared_ptr<ValueData>>("receivers");
if (!maybe_value) { break; }
auto& value = maybe_value.value();
// Process the input data
HOLOSCAN_LOG_INFO("Rx message received (value: {})", value->data());
}
}
上面的代码将从 receivers
端口逐个接收输入数据。receive()
方法在一个循环中被调用,直到它返回错误。输入数据存储在一个变量中,并记录输入数据的值。
这种方法(逐个接收输入数据)不适用于 holoscan::IOSpec::kAnySize
的情况。对于 holoscan::IOSpec::kAnySize
参数,框架会在内部为端口上接收的每个输入对象创建一个新的输入端口。每个隐式输入端口(使用格式 <port_name>:<index>
命名)都与一个 MessageAvailableCondition
条件相关联,该条件的 min_size
为 1
。因此,receive()
方法需要使用 std::vector<T>
模板参数来一次性批量接收输入数据。
如果您确实需要为 holoscan::IOSpec::kAnySize
情况逐个接收输入数据(尽管不推荐这样做),您可以从每个隐式输入端口(命名为 <port_name>:<index>
)逐个接收输入数据,使用不带 std::vector<T>
模板参数的 receive()
方法。(例如,op_input.receive<std::shared_ptr<ValueData>>("receivers:0")
,op_input.receive<std::shared_ptr<ValueData>>("receivers:1")
等)。为了避免在为隐式输入端口调用 receive()
方法时出现错误消息(例如 The operator does not have an input port with label 'receivers:X'
),您需要提前计算到 receivers
端口的连接数,并相应地为每个隐式输入端口调用 receive()
方法。
void compute(holoscan::InputContext& op_input, holoscan::OutputContext&,
holoscan::ExecutionContext&) override {
int input_count = spec()->inputs().size() - 1; // -1 to exclude the 'receivers' input port
for (int i = 0; i < input_count; i++) {
auto maybe_value =
op_input.receive<std::shared_ptr<ValueData>>(fmt::format("receivers:{}", i).c_str());
if (!maybe_value) { break; }
auto& value = maybe_value.value();
// Process the input data
HOLOSCAN_LOG_INFO("Rx message received (value: {})", value->data());
}
}
在上面的示例中,使用 IOSpec::kPrecedingCount
或 IOSpec::IOSize(int64_t)
似乎显示出与 IOSpec::kAnySize
相同的行为。但是,区别在于,由于 IOSpec::kPrecedingCount
或 IOSpec::IOSize(int64_t)
不为每个(内部)输入端口使用单独的 MessageAvailableCondition
条件,因此不能保证操作器将按顺序接收输入数据。
这意味着操作器接收输入数据的顺序可能与 compose()
方法中建立连接的顺序不同。此外,使用多线程调度器,不能保证操作器将均匀地接收来自每个连接的输入数据。操作器可能从一个连接接收到比另一个连接更多的输入数据。
如果输入数据的顺序很重要,建议使用 IOSpec::kAnySize
并使用带有 std::vector<T>
模板参数的 receive()
方法来一次性批量接收输入数据。
有关在 C++ 操作器中接收多个输入的更多示例,请参阅 C++ 系统测试用例。
构建您的 C++ 操作器
您可以使用 CMake 构建 C++ 操作器,方法是在您的 CMakeLists.txt
中调用 find_package(holoscan)
以加载 SDK 库。您的操作器将需要链接到 holoscan::core
列表 5
# Your CMake project
cmake_minimum_required(VERSION 3.20)
project(my_project CXX)
# Finds the holoscan SDK
find_package(holoscan REQUIRED CONFIG PATHS "/opt/nvidia/holoscan")
# Create a library for your operator
add_library(my_operator SHARED my_operator.cpp)
# Link your operator against holoscan::core
target_link_libraries(my_operator
PUBLIC holoscan::core
)
一旦您的 CMakeLists.txt
在 <src_dir>
中准备就绪,您可以使用下面的命令行在 <build_dir>
中构建。如果想要使用的 SDK 安装与提供给 find_package(holoscan)
的 PATHS
不同,您可以选择性地传递 Holoscan_ROOT
。
# Configure
cmake -S <src_dir> -B <build_dir> -D Holoscan_ROOT="/opt/nvidia/holoscan"
# Build
cmake --build <build_dir> -j
在应用程序中使用您的 C++ 操作器
如果应用程序与操作器配置在同一个 CMake 项目中,您可以简单地将操作器 CMake 目标库名称添加到应用程序可执行文件的
target_link_libraries
调用中,因为操作器 CMake 目标已经定义。# operator add_library(my_op my_op.cpp) target_link_libraries(my_operator PUBLIC holoscan::core) # application add_executable(my_app main.cpp) target_link_libraries(my_operator PRIVATE holoscan::core my_op )
如果应用程序与操作器配置在不同的项目中,您需要在操作器的 CMake 项目中 导出操作器,并在应用程序 CMake 项目中导入它,然后才能将其列在
target_link_libraries
下。这与 SDK 内置操作器 所做的事情相同,这些操作器在holoscan::ops
命名空间下可用。
然后,您可以在应用程序代码中包含 C++ 操作器的头文件。
GXF 操作器
使用 Holoscan C++ API,我们还可以将来自 GXF 扩展的 GXF Codelets 包装为 Holoscan Operators。
为了使用 GXF Codelet 作为 Holoscan 操作器,不再需要下面描述的手动 codelet 包装机制。现在有一个新的 GXFCodeletOp
,它允许通过 Fragment::make_operator
直接使用现有的 GXF codelet,而无需先为其创建包装类。类似地,现在还有一个 GXFComponentResource
类,它允许将 GXF Component 用作 Holoscan 资源,通过 Fragment::make_resource
。在 examples/import_gxf_components 文件夹中,为 C++ 和 Python 应用程序提供了如何使用这些类的详细示例。
给定一个现有的 GXF 扩展,我们可以创建一个简单的“identity”应用程序,它由一个重放器组成,该重放器从磁盘上的文件中读取内容,以及上一节的记录器,它将以完全相同的格式存储重放器的输出。这使我们能够查看记录器的输出是否与原始输入文件匹配。
下面的 MyRecorderOp
Holoscan 操作器实现将包装 此处 显示的 MyRecorder
GXF Codelet。
操作器定义
列表 6 my_recorder_op.hpp
#ifndef APPS_MY_RECORDER_APP_MY_RECORDER_OP_HPP
#define APPS_MY_RECORDER_APP_MY_RECORDER_OP_HPP
#include "holoscan/core/gxf/gxf_operator.hpp"
namespace holoscan::ops {
class MyRecorderOp : public holoscan::ops::GXFOperator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS_SUPER(MyRecorderOp, holoscan::ops::GXFOperator)
MyRecorderOp() = default;
const char* gxf_typename() const override { return "MyRecorder"; }
void setup(OperatorSpec& spec) override;
void initialize() override;
private:
Parameter<holoscan::IOSpec*> receiver_;
Parameter<std::shared_ptr<holoscan::Resource>> my_serializer_;
Parameter<std::string> directory_;
Parameter<std::string> basename_;
Parameter<bool> flush_on_tick_;
};
} // namespace holoscan::ops
#endif/* APPS_MY_RECORDER_APP_MY_RECORDER_OP_HPP */
holoscan::ops::MyRecorderOp
类通过继承 holoscan::ops::GXFOperator
类来包装 MyRecorder
GXF Codelet。HOLOSCAN_OPERATOR_FORWARD_ARGS_SUPER 宏用于将构造函数的参数转发到基类。
我们首先需要定义 MyRecorderOp
类的字段。您可以看到,在 MyRecorderOp
类和 MyRecorder
GXF codelet 中都定义了具有相同名称的字段。
列表 7 gxf_extensions/my_recorder/my_recorder.hpp 中的参数声明
nvidia::gxf::Parameter<nvidia::gxf::Handle<nvidia::gxf::Receiver>> receiver_;
nvidia::gxf::Parameter<nvidia::gxf::Handle<nvidia::gxf::EntitySerializer>> my_serializer_;
nvidia::gxf::Parameter<std::string> directory_;
nvidia::gxf::Parameter<std::string> basename_;
nvidia::gxf::Parameter<bool> flush_on_tick_;
比较 MyRecorderOp
holoscan 参数与 MyRecorder
gxf codelet
Holoscan 操作器 | GXF Codelet |
---|---|
holoscan::Parameter |
nvidia::gxf::Parameter |
holoscan::IOSpec* |
nvidia::gxf::Handle<nvidia::gxf::Receiver>> 或 nvidia::gxf::Handle<nvidia::gxf::Transmitter>> |
std::shared_ptr<holoscan::Resource>> |
nvidia::gxf::Handle<T>> 示例: T 是 nvidia::gxf::EntitySerializer |
然后我们需要实现以下函数
const char* gxf_typename() const override
:返回 Codelet 的 GXF 类型名称。指定 GXF Codelet 的完全限定类名 (MyRecorder
)。void setup(OperatorSpec& spec) override
:使用操作器的输入/输出和参数设置 OperatorSpec。void initialize() override
:初始化操作器。
设置参数规范
setup(OperatorSpec& spec)
函数的实现如下
列表 8 my_recorder_op.cpp
#include "./my_recorder_op.hpp"
#include "holoscan/core/fragment.hpp"
#include "holoscan/core/gxf/entity.hpp"
#include "holoscan/core/operator_spec.hpp"
#include "holoscan/core/resources/gxf/video_stream_serializer.hpp"
namespace holoscan::ops {
void MyRecorderOp::setup(OperatorSpec& spec) {
auto& input = spec.input<holoscan::gxf::Entity>("input");
// Above is same with the following two lines (a default condition is assigned to the input port if not specified):
//
// auto& input = spec.input<holoscan::gxf::Entity>("input")
// .condition(ConditionType::kMessageAvailable, Arg("min_size") = static_cast<uint64_t>(1));
spec.param(receiver_, "receiver", "Entity receiver", "Receiver channel to log", &input);
spec.param(my_serializer_,
"serializer",
"Entity serializer",
"Serializer for serializing input data");
spec.param(directory_, "out_directory", "Output directory path", "Directory path to store received output");
spec.param(basename_, "basename", "File base name", "User specified file name without extension");
spec.param(flush_on_tick_,
"flush_on_tick",
"Boolean to flush on tick",
"Flushes output buffer on every `tick` when true",
false);
}
void MyRecorderOp::initialize() {...}
} // namespace holoscan::ops
在这里,我们设置了操作器的输入/输出和参数。请注意,此函数的内容与 MyRecorder
GXF codelet 的 registerInterface 函数非常相似。
在 C++ API 中,GXF
Receiver
和Transmitter
组件(例如DoubleBufferReceiver
和DoubleBufferTransmitter
)被视为操作器的输入和输出端口,因此我们使用input<T>
和output<T>
函数注册操作器的输入/输出(其中T
是端口的数据类型)。与执行相同工作的纯 GXF 应用程序 相比,SchedulingTerm 在 GXF 应用程序 YAML 中的实体被指定为输入/输出端口上的
Condition
s(例如,holoscan::MessageAvailableCondition
和holoscan::DownstreamMessageAffordableCondition
)。
上面的 MyRecorderOp::setup
中突出显示行与 GXF 应用程序 YAML 的以下突出显示语句匹配
列表 9 apps/my_recorder_app_gxf/my_recorder_gxf.yaml 的一部分
name: recorder
components:
- name: input
type: nvidia::gxf::DoubleBufferReceiver
- name: allocator
type: nvidia::gxf::UnboundedAllocator
- name: component_serializer
type: nvidia::gxf::StdComponentSerializer
parameters:
allocator: allocator
- name: entity_serializer
type: nvidia::gxf::StdEntitySerializer
parameters:
component_serializers: [component_serializer]
- type: MyRecorder
parameters:
receiver: input
serializer: entity_serializer
out_directory: "/tmp"
basename: "tensor_out"
- type: nvidia::gxf::MessageAvailableSchedulingTerm
parameters:
receiver: input
min_size: 1
以同样的方式,如果我们有一个 Transmitter
GXF 组件,我们将有以下语句(请参阅 holoscan::ConditionType
的可用常量)
auto& output = spec.output<holoscan::gxf::Entity>("output");
// Above is same with the following two lines (a default condition is assigned to the output port if not specified):
//
// auto& output = spec.output<holoscan::gxf::Entity>("output")
// .condition(ConditionType::kDownstreamMessageAffordable, Arg("min_size") = static_cast<uint64_t>(1));
初始化操作器
接下来,initialize()
函数的实现如下
列表 10 my_recorder_op.cpp
#include "./my_recorder_op.hpp"
#include "holoscan/core/fragment.hpp"
#include "holoscan/core/gxf/entity.hpp"
#include "holoscan/core/operator_spec.hpp"
#include "holoscan/core/resources/gxf/video_stream_serializer.hpp"
namespace holoscan::ops {
void MyRecorderOp::setup(OperatorSpec& spec) {...}
void MyRecorderOp::initialize() {
// Set up prerequisite parameters before calling GXFOperator::initialize()
auto frag = fragment();
auto serializer =
frag->make_resource<holoscan::StdEntitySerializer>("serializer");
add_arg(Arg("serializer") = serializer);
GXFOperator::initialize();
}
} // namespace holoscan::ops
在这里,我们设置了预定义的参数,例如 serializer
。上面突出显示行与 GXF 应用程序 YAML 的突出显示语句匹配
列表 11 apps/my_recorder_app_gxf/my_recorder_gxf.yaml 的另一部分
name: recorder
components:
- name: input
type: nvidia::gxf::DoubleBufferReceiver
- name: allocator
type: nvidia::gxf::UnboundedAllocator
- name: component_serializer
type: nvidia::gxf::StdComponentSerializer
parameters:
allocator: allocator
- name: entity_serializer
type: nvidia::gxf::StdEntitySerializer
parameters:
component_serializers: [component_serializer]
- type: MyRecorder
parameters:
receiver: input
serializer: entity_serializer
out_directory: "/tmp"
basename: "tensor_out"
- type: nvidia::gxf::MessageAvailableSchedulingTerm
parameters:
receiver: input
min_size: 1
Holoscan C++ API 已经提供了 holoscan::StdEntitySerializer
类,它包装了 nvidia::gxf::StdEntitySerializer
GXF 组件,这里用作 serializer
。
构建您的 GXF 操作器
构建 GXF 操作器和 构建原生 C++ 操作器 在 CMake 中没有区别,因为 GXF codelet 实际上是通过 GXF 扩展作为插件加载的,并且不需要添加到 target_link_libraries(my_operator ...)
。
在应用程序中使用您的 GXF 操作器
使用 GXF 操作器和 在应用程序中使用原生 C++ 操作器 在 CMake 中没有区别。但是,应用程序将需要加载保存包装的 GXF codelet 符号的 GXF 扩展库,因此需要配置应用程序以在其 yaml 配置文件中查找扩展库,如 此处 文档所述。
GXF 和原生 C++ 操作器之间的互操作性
为了支持在操作器(GXF 和原生 C++ 操作器)之间发送和接收张量,Holoscan SDK 提供了以下 C++ 类
一个名为
holoscan::Map
的类模板,它继承自std::unordered_map<std::string, std::shared_ptr<T>>
。模板参数T
可以是任何类型,它用于指定存储在 map 中的std::shared_ptr
对象的类型。一个
holoscan::TensorMap
类,定义为holoscan::Map
对于holoscan::Tensor
类型的特化。
当从原生 C++ 操作器发出带有 holoscan::TensorMap
的消息时,消息对象总是被转换为 holoscan::gxf::Entity
对象并发送到下游操作器。
然后,如果发送的 GXF Entity 对象仅包含 Tensor 对象作为其组件,则下游操作器可以将消息数据接收为 holoscan::TensorMap
对象,而不是 holoscan::gxf::Entity
对象。
图 16 显示了 holoscan::gxf::Entity
和 nvidia::gxf::Entity
类之间的关系,以及 holoscan::Tensor
和 nvidia::gxf::Tensor
类之间的关系。

图 16 支持张量互操作性
holoscan::gxf::Tensor
和 nvidia::gxf::Tensor
彼此可互操作,因为它们是围绕相同的底层 DLManagedTensorContext
结构包装器,该结构体保存一个 DLManagedTensor 对象。
holoscan::TensorMap
类用于在 map 中存储多个张量,其中每个张量都与唯一的键关联。holoscan::TensorMap
类用于在操作器之间传递多个张量,其使用方式与 std::unordered_map<std::string, std::shared_ptr<holoscan::Tensor>>
对象相同。
由于 holoscan::TensorMap
和 holoscan::gxf::Entity
对象都保存着可互操作的张量,因此 GXF 和原生 C++ 操作器之间的消息数据也是可互操作的。
图 17 说明了如何使用 holoscan::TensorMap
类在操作器之间传递多个张量。GXFSendTensorOp
操作器将一个 nvidia::gxf::Entity
对象(包含一个名为“tensor”的 GXF 组件 nvidia::gxf::Tensor
对象)发送到 ProcessTensorOp
操作器,后者处理张量,然后将处理后的张量转发到 GXFReceiveTensorOp
操作器。
考虑以下示例,其中 GXFSendTensorOp
和 GXFReceiveTensorOp
是 GXF 操作器,而 ProcessTensorOp
是 C++ 中的 Holoscan 原生操作器

图 17 C++ 原生操作器和 GXF 操作器之间的张量互操作性
以下代码显示了如何实现 ProcessTensorOp
的 compute()
方法,作为一个与 GXF 操作器通信的 C++ 原生操作器。重点关注 holoscan::gxf::Entity
的使用
列表 12 examples/tensor_interop/cpp/tensor_interop.cpp
void compute(InputContext& op_input, OutputContext& op_output,
ExecutionContext& context) override {
// The type of `in_message` is 'holoscan::TensorMap'.
auto in_message = op_input.receive<holoscan::TensorMap>("in").value();
// The type of out_message is TensorMap
TensorMap out_message;
for (auto& [key, tensor] : in_message) { // Process with 'tensor' here.
cudaError_t cuda_status;
size_t data_size = tensor->nbytes();
std::vector<uint8_t> in_data(data_size);
CUDA_TRY(cudaMemcpy(in_data.data(), tensor->data(), data_size, cudaMemcpyDeviceToHost));
HOLOSCAN_LOG_INFO("ProcessTensorOp Before key: '{}', shape: ({}), data: [{}]",
key,
fmt::join(tensor->shape(), ","),
fmt::join(in_data, ","));
for (size_t i = 0; i < data_size; i++) { in_data[i] *= 2; }
HOLOSCAN_LOG_INFO("ProcessTensorOp After key: '{}', shape: ({}), data: [{}]",
key,
fmt::join(tensor->shape(), ","),
fmt::join(in_data, ","));
CUDA_TRY(cudaMemcpy(tensor->data(), in_data.data(), data_size, cudaMemcpyHostToDevice));
out_message.insert({key, tensor});
}
// Send the processed message.
op_output.emit(out_message);
};
输入消息的类型为
holoscan::TensorMap
对象。TensorMap
对象中的每个holoscan::Tensor
都作为in_data
复制到主机上。数据被处理(值乘以 2)
数据被移回 GPU 上的
holoscan::Tensor
对象。创建一个新的
holoscan::TensorMap
对象out_message
,以便使用op_output.emit()
发送到下一个操作器。
在 examples/tensor_interop/cpp 目录中,提供了一个完整的 C++ 原生操作器示例,该操作器支持与 GXF 操作器的互操作性。
在组装 Python 应用程序时,可以使用两种类型的操作器
原生 Python 操作器:在 Python 中定义的自定义操作器,通过创建
holoscan.core.Operator
的子类。这些 Python 操作器可以在操作器之间传递任意 Python 对象,并且不受用于 C++ API 操作器的更严格的参数类型的限制。C++ 操作器的 Python 包装器:在底层 C++ 库中定义的操作器,通过继承
holoscan::Operator
类。这些操作器在holoscan.operators
模块中提供 Python 绑定。示例包括用于重放视频文件的VideoStreamReplayerOp
,用于格式转换的FormatConverterOp
,以及用于可视化的HolovizOp
。
可以使用 Python 包装的 C++ 操作器和原生 Python 操作器的混合来创建一个应用程序。在这种情况下,必须特别注意适当地转换输入和输出张量,如 下面的章节 所示。
原生 Python 操作器
操作器生命周期 (Python)
holoscan.core.Operator
的生命周期由三个阶段组成
start()
在操作器启动时调用一次,用于初始化繁重的任务,例如分配内存资源和使用参数。compute()
在操作器被触发时调用,这可以在操作器的生命周期中在start()
和stop()
之间发生任意次数。stop()
在操作器停止时调用一次,用于反初始化繁重的任务,例如释放先前在start()
中分配的资源。
工作流程上的所有算子都安排执行。当一个算子首次执行时,将调用 start()
方法,然后调用 compute()
方法。当算子停止时,将调用 stop()
方法。compute()
方法在 start()
和 stop()
之间被多次调用。
如果未满足 Conditions 指定的任何调度条件(例如,如果操作器已执行一定次数,则 CountCondition
将导致调度条件不满足),则操作器将停止并调用 stop()
方法。
我们将在用户指南的 指定操作器输入和输出 (Python) 部分介绍如何使用 Conditions
。
通常,start()
和 stop()
函数在应用程序的生命周期中仅调用一次。但是,如果再次满足调度条件,则可以再次调度算子执行,并且将再次调用 start()
方法。

图 18 Holoscan 操作器生命周期中方法调用的顺序
我们可以通过实现上述方法来覆盖操作器的默认行为。以下示例显示了如何实现一个自定义操作器,该操作器覆盖了 start、stop 和 compute 方法。
列表 13 Holoscan 操作器的基本结构 (Python)
from holoscan.core import (
ExecutionContext,
InputContext,
Operator,
OperatorSpec,
OutputContext,
)
class MyOp(Operator):
def __init__(self, fragment, *args, **kwargs):
super().__init__(fragment, *args, **kwargs)
def setup(self, spec: OperatorSpec):
pass
def start(self):
pass
def compute(self, op_input: InputContext, op_output: OutputContext, context: ExecutionContext):
pass
def stop(self):
pass
setup()
方法 vs initialize()
vs __init__()
setup()
方法旨在通过提供一个 OperatorSpec
对象作为 spec 参数来获取“operator 的 spec”。当 __init__()
被调用时,它会调用 C++ 的 Operator::spec
方法(并设置 self.spec
类成员),并调用 setup
方法,以便 Operator 的 spec
属性持有 operator 的规范。(有关更多详细信息,请参阅源代码。)
由于 setup()
方法可以多次调用,并使用其他 OperatorSpec
对象(例如,枚举 operator 的描述),因此在 setup()
方法中,用户不应该初始化任何内容。此类初始化需要通过重写 initialize()
方法来完成。
def initialize(self):
pass
__init__()
方法用于创建 Operator 对象,并且可以通过传递各种参数来初始化 operator 对象本身。请注意,它不会初始化相应的 GXF 实体对象。底层 GXF 实体对象在 operator 被调度执行时初始化。
请不要忘记在 __init__
方法的末尾调用基类构造函数 (super().__init__(fragment, *args, **kwargs)
)。
创建自定义 operator (Python)
要在 Python 中创建自定义 operator,需要创建 holoscan.core.Operator
的子类。以下是一个简单的 operator 示例,该 operator 接受名为 “signal” 的时变一维输入数组,并应用与箱形(即矩形)核的卷积。
为简单起见,此 operator 假定在输入端接收到的 “signal” 已经是 numpy.ndarray
或可以通过 np.asarray
转换为 numpy.ndarray
的对象。我们将在后面的章节中详细了解如何与各种张量类进行互操作,包括一些基于 C++ 的 operator 使用的 GXF Tensor 对象。
代码片段: examples/numpy_native/convolve.py
列表 14 examples/numpy_native/convolve.py
import os
from holoscan.conditions import CountCondition
from holoscan.core import Application, Operator, OperatorSpec
from holoscan.logger import LogLevel, set_log_level
import numpy as np
class SignalGeneratorOp(Operator):
"""Generate a time-varying impulse.
Transmits an array of zeros with a single non-zero entry of a
specified `height`. The position of the non-zero entry shifts
to the right (in a periodic fashion) each time `compute` is
called.
Parameters
----------
fragment : holoscan.core.Fragment
The Fragment (or Application) the operator belongs to.
height : number
The height of the signal impulse.
size : number
The total number of samples in the generated 1d signal.
dtype : numpy.dtype or str
The data type of the generated signal.
"""
def __init__(self, fragment, *args, height=1, size=10, dtype=np.int32, **kwargs):
self.count = 0
self.height = height
self.dtype = dtype
self.size = size
super().__init__(fragment, *args, **kwargs)
def setup(self, spec: OperatorSpec):
spec.output("signal")
def compute(self, op_input, op_output, context):
# single sample wide impulse at a time-varying position
signal = np.zeros((self.size,), dtype=self.dtype)
signal[self.count % signal.size] = self.height
self.count += 1
op_output.emit(signal, "signal")
class ConvolveOp(Operator):
"""Apply convolution to a tensor.
Convolves an input signal with a "boxcar" (i.e. "rect") kernel.
Parameters
----------
fragment : holoscan.core.Fragment
The Fragment (or Application) the operator belongs to.
width : number
The width of the boxcar kernel used in the convolution.
unit_area : bool, optional
Whether or not to normalize the convolution kernel to unit area.
If False, all samples have implitude one and the dtype of the
kernel will match that of the signal. When True the sum over
the kernel is one and a 32-bit floating point data type is used
for the kernel.
"""
def __init__(self, fragment, *args, width=4, unit_area=False, **kwargs):
self.count = 0
self.width = width
self.unit_area = unit_area
super().__init__(fragment, *args, **kwargs)
def setup(self, spec: OperatorSpec):
spec.input("signal_in")
spec.output("signal_out")
def compute(self, op_input, op_output, context):
signal = op_input.receive("signal_in")
assert isinstance(signal, np.ndarray)
if self.unit_area:
kernel = np.full((self.width,), 1/self.width, dtype=np.float32)
else:
kernel = np.ones((self.width,), dtype=signal.dtype)
convolved = np.convolve(signal, kernel, mode='same')
op_output.emit(convolved, "signal_out")
class PrintSignalOp(Operator):
"""Print the received signal to the terminal."""
def setup(self, spec: OperatorSpec):
spec.input("signal")
def compute(self, op_input, op_output, context):
signal = op_input.receive("signal")
print(signal)
class ConvolveApp(Application):
"""Minimal signal processing application.
Generates a time-varying impulse, convolves it with a boxcar kernel, and
prints the result to the terminal.
A `CountCondition` is applied to the generate to terminate execution
after a specific number of steps.
"""
def compose(self):
signal_generator = SignalGeneratorOp(
self,
CountCondition(self, count=24),
name="generator",
**self.kwargs("generator"),
)
convolver = ConvolveOp(self, name="conv", **self.kwargs("convolve"))
printer = PrintSignalOp(self, name="printer")
self.add_flow(signal_generator, convolver)
self.add_flow(convolver, printer)
def main(config_file):
app = ConvolveApp()
# if the --config command line argument was provided, it will override this config_file`
app.config(config_file)
app.run()
if __name__ == "__main__":
config_file = os.path.join(os.path.dirname(__file__), 'convolve.yaml')
main(config_file=config_file)
代码片段: examples/numpy_native/convolve.yaml
列表 15 examples/numpy_native/convolve.yaml
signal_generator:
height: 1
size: 20
dtype: int32
convolve:
width: 4
unit_area: false
在此应用程序中,创建了三个原生 Python operator:SignalGeneratorOp
、ConvolveOp
和 PrintSignalOp
。SignalGeneratorOp
生成合成信号,例如 [0, 0, 1, 0, 0, 0]
,其中非零项的位置在每次调用时都会变化。ConvolveOp
使用指定宽度的箱形(即矩形)函数执行一维卷积。PrintSignalOp
只是将接收到的信号打印到终端。
如下文更详细的介绍,每个 operator 的输入都在 operator 的 setup()
方法中指定。然后,输入在 compute
方法中通过 op_input.receive()
接收,输出通过 op_output.emit()
发送。
请注意,对于此处定义的原生 Python operator,可以发送或接收任何 Python 对象。在 operator 之间传输时,传输的是指向对象的共享指针,而不是副本。在某些情况下,例如将同一张量发送到多个下游 operator 时,可能需要避免对张量进行原地操作,以避免 operator 之间可能存在的任何竞争条件。
指定 operator 参数 (Python)
在上面的 SignalGeneratorOp
operator 示例中,我们在 operator 的 __init__
方法中添加了三个关键字参数,这些参数在 operator 的 compose()
方法内部使用,以调整其行为。
def __init__(self, fragment, *args, width=4, unit_area=False, **kwargs):
# Internal counter for the time-dependent signal generation
self.count = 0
# Parameters
self.width = width
self.unit_area = unit_area
# To forward remaining arguments to any underlying C++ Operator class
super().__init__(fragment, *args, **kwargs)
作为更接近 C++ 的替代方法,这些参数可以通过 operator 的 OperatorSpec
属性在其 setup()
方法中添加,其中必须提供关联的字符串键以及默认值。
def setup(self, spec: OperatorSpec):
spec.param("width", 4)
spec.param("unit_area", False)
然后可以在 operator 的方法(包括 initialize()
、start()
、compute()
、stop()
)中以 self.width
和 self.unit_area
的形式访问 self
对象上的参数。
其他 kwargs
属性也可以传递给 spec.param
,例如 headline
、description
(GXF 应用程序使用)或 kind
(当 接收任意数量的输入 (Python) 时使用,自 v2.3.0 起已弃用)。
通过这些方法之一添加的原生 operator 参数的名称不得与基类 Operator
的任何现有属性或方法名称重叠。
请参阅 配置算子参数 部分,了解应用程序如何设置这些参数。
指定 operator 输入和输出 (Python)
要配置 Python 原生 operator 的输入和输出,请在 operator 的 setup()
方法中调用 spec.input()
和 spec.output()
方法。
对于要添加的每个输入和输出,spec.input()
和 spec.output()
方法应调用一次。holoscan.core.OperatorSpec
对象和 setup()
方法将由 Application
类在其 run()
方法被调用时自动初始化和调用。
这些方法 (spec.input()
和 spec.output()
) 返回一个 IOSpec
对象,该对象可用于配置输入/输出端口。
默认情况下,holoscan.conditions.MessageAvailableCondition
和 holoscan.conditions.DownstreamMessageAffordableCondition
条件应用于输入/输出端口(min_size
为 1
)。这意味着在输入端口上有消息可用并且下游 operator 的输入端口(队列)有足够的容量接收消息之前,不会调用 operator 的 compute()
方法。
def setup(self, spec: OperatorSpec):
spec.input("in")
# Above statement is equivalent to:
# spec.input("in")
# .condition(ConditionType.MESSAGE_AVAILABLE, min_size = 1)
spec.output("out")
# Above statement is equivalent to:
# spec.output("out")
# .condition(ConditionType.DOWNSTREAM_MESSAGE_AFFORDABLE, min_size = 1)
在上面的示例中,spec.input()
方法用于配置输入端口以具有 holoscan.conditions.MessageAvailableCondition
,最小大小为 1。这意味着在 operator 的输入端口上有消息可用之前,不会调用 operator 的 compute()
方法。同样,spec.output()
方法用于配置输出端口以具有 holoscan.conditions.DownstreamMessageAffordableCondition
,最小大小为 1。这意味着在下游 operator 的输入端口有足够的容量接收消息之前,不会调用 operator 的 compute()
方法。
如果要更改此行为,请使用 IOSpec.condition()
方法来配置条件。例如,要配置输入和输出端口不具有任何条件,可以使用以下代码
from holoscan.core import ConditionType, OperatorSpec
# ...
def setup(self, spec: OperatorSpec):
spec.input("in").condition(ConditionType.NONE)
spec.output("out").condition(ConditionType.NONE)
setup()
方法中的示例代码配置输入端口不具有任何条件,这意味着只要 operator 准备好计算,就会调用 compute()
方法。由于不能保证输入端口有可用消息,因此 compute()
方法应在尝试读取输入端口上的消息之前检查是否有可用消息。
receive()
方法的 InputContext
对象可用于访问 operator 类的 compute()
方法中的不同类型的输入数据。此方法将输入端口的名称作为参数(如果 operator 只有一个输入端口,则可以省略)。
对于标准 Python 对象,receive()
将直接返回指定名称的输入的 Python 对象。
Holoscan SDK 还提供了内置数据类型,称为 域对象,在 include/holoscan/core/domain
目录中定义。例如,Tensor
是一个域对象类,用于表示数据的多维数组,可以被 OperatorSpec
、InputContext
和 OutputContext
直接使用。
此 holoscan.core.Tensor
类同时支持 DLPack 和 NumPy 的数组接口 (__array_interface__
和 __cuda_array_interface__
),因此可以与其他 Python 库一起使用,例如 CuPy、PyTorch、JAX、TensorFlow 和 Numba。有关更多详细信息,请参阅 互操作性部分。
在这两种情况下,如果输入端口上没有可用消息,它都将返回 None
。
# ...
def compute(self, op_input, op_output, context):
msg = op_input.receive("in")
if msg:
# Do something with msg
接收任意数量的输入 (Python)
在某些情况下,最好允许在端口上接收任意数量的对象,而不是分配特定数量的输入端口。
使用 IOSpec.ANY_SIZE
进行可变输入处理
实现此目的的一种方法是通过在 operator 的 setup()
方法中调用 spec.input("port_name", IOSpec.ANY_SIZE)
并使用 IOSpec.ANY_SIZE
作为第二个参数来定义多接收器输入端口(如 原生 operator ping 示例中的 PingRxOp
所做的那样)。
def setup(self, spec: OperatorSpec):
spec.input("receivers", size=IOSpec.ANY_SIZE)
代码片段: examples/ping_multi_port/python/ping_multi_port.py
列表 16 examples/ping_multi_port/python/ping_multi_port.py
class PingRxOp(Operator):
"""Simple receiver operator.
This operator has:
input: "receivers"
This is an example of a native operator that can dynamically have any
number of inputs connected to is "receivers" port.
"""
def __init__(self, fragment, *args, **kwargs):
self.count = 1
# Need to call the base class constructor last
super().__init__(fragment, *args, **kwargs)
def setup(self, spec: OperatorSpec):
# # Since Holoscan SDK v2.3, users can define a multi-receiver input port using
# # 'spec.input()' with 'size=IOSpec.ANY_SIZE'.
# # The old way is to use 'spec.param()' with 'kind="receivers"'.
# spec.param("receivers", kind="receivers")
spec.input("receivers", size=IOSpec.ANY_SIZE)
def compute(self, op_input, op_output, context):
values = op_input.receive("receivers")
print(f"Rx message received (count:{self.count}, size:{len(values)})")
self.count += 1
print(f"Rx message value1:{values[0].data}")
print(f"Rx message value2:{values[1].data}")
# Now define a simple application using the operators defined above
class MyPingApp(Application):
def compose(self):
# Define the tx, mx, rx operators, allowing the tx operator to execute 10 times
tx = PingTxOp(self, CountCondition(self, 10), name="tx")
mx = PingMxOp(self, name="mx", multiplier=3)
rx = PingRxOp(self, name="rx")
# Define the workflow
self.add_flow(tx, mx, {("out1", "in1"), ("out2", "in2")})
self.add_flow(mx, rx, {("out1", "receivers"), ("out2", "receivers")})
然后,一旦在 compose()
方法中提供了以下配置,
self.add_flow(mx, rx, {("out1", "receivers"), ("out2", "receivers")})
PingRxOp
将在 compute()
方法中的 receivers
端口上接收两个输入
values = op_input.receive("receivers")
当输入端口使用 IOSpec.ANY_SIZE
定义时,框架会为端口上接收到的每个输入对象创建一个新的输入端口。输入端口使用格式 <port_name>:<index>
命名,其中 <port_name>
是输入端口的名称,<index>
是端口上接收到的输入对象的索引。例如,如果 receivers
端口接收到两个输入对象,则输入端口将命名为 receivers:0
和 receivers:1
。
框架在内部创建一个类型为 std::vector<holoscan::IOSpec*>
的参数 (receivers
),隐式创建输入端口 (receivers:0
和 receivers:1
),并将它们连接起来(将输入端口的引用添加到 receivers
向量)。这样,当调用 receive()
方法时,框架可以将来自相应输入端口的输入数据作为元组返回。
values = op_input.receive("receivers")
如果在 compose()
方法的末尾添加 print(rx.description)
,您将看到 PingRxOp
operator 的描述,如下所示
id: -1
name: rx
fragment: ""
args:
[]
type: kNative
conditions:
[]
resources:
[]
spec:
fragment: ""
params:
- name: receivers
type: std::vector<holoscan::IOSpec*>
description: ""
flag: kNone
inputs:
- name: receivers:1
io_type: kInput
typeinfo_name: N8holoscan3gxf6EntityE
connector_type: kDefault
conditions:
[]
- name: receivers:0
io_type: kInput
typeinfo_name: N8holoscan3gxf6EntityE
connector_type: kDefault
conditions:
[]
- name: receivers
io_type: kInput
typeinfo_name: N8holoscan3gxf6EntityE
connector_type: kDefault
conditions:
[]
outputs:
[]
配置输入端口队列大小和消息批处理条件 (Python)
如果您想在一个端口上接收多个对象并批量处理它们,您可以增加输入端口的队列大小,并将 MessageAvailableCondition
条件的 min_size
参数设置为所需的批次大小。这可以通过调用带有期望参数的 connector()
和 condition()
方法来完成,分别使用批次大小作为 capacity
和 min_size
参数。
将 min_size
设置为 N
将确保操作器在 compute()
方法被调用之前接收到 N
个对象。
def setup(self, spec: OperatorSpec):
spec.input("receivers").connector(IOSpec.ConnectorType.DOUBLE_BUFFER, capacity=2).condition(
ConditionType.MESSAGE_AVAILABLE, min_size=2
)
然后,可以调用带有 receivers
端口名称的 receive()
方法来批量接收输入数据。
def compute(self, op_input, op_output, context):
values = []
value = op_input.receive("receivers")
while value:
values.append(value)
value = op_input.receive("receivers")
print(f"Rx message received (size:{len(values)})")
在上面的示例中,operator 在名为 “receivers” 的端口上接收输入,队列大小为 2,min_size
为 2。在循环中调用 receive()
方法以批量接收输入数据(批大小为 2)。由于 operator 事先不知道要接收的对象数量,因此循环调用 receive()
方法,直到它返回 None
值。输入数据存储在一个列表中,并在接收到所有输入数据后记录列表的大小。
为了简化上述代码,Holoscan SDK 提供了 IOSpec.PRECEDING_COUNT
常量作为 spec.input()
方法的第二个参数,以指定输入端口的先前连接数(在本例中,与 receivers
端口的连接数为 2)作为批处理大小。这可用于批量接收输入数据,而无需在循环中调用 receive()
方法。
def setup(self, spec: OperatorSpec):
spec.input("receivers", size=IOSpec.PRECEDING_COUNT)
然后,可以调用带有 receivers
端口名称的 receive()
方法来批量接收输入数据。
def compute(self, op_input, op_output, context):
values = op_input.receive("receivers")
print(f"Rx message received (size:{len(values)})")
print(f"Rx message value1:{values[0].data}")
print(f"Rx message value2:{values[1].data}")
在上面的示例中,operator 在名为 “receivers” 的端口上接收输入,批处理大小为 2。调用 receive()
方法并使用 receivers
端口名称以批量接收输入数据(批大小为 2)。输入数据存储在一个元组中,并在接收到所有输入数据后记录元组的大小。
如果要使用特定的批处理大小,可以使用 holoscan.IOSpec.IOSize(size : int)
而不是 holoscan.IOSpec.PRECEDING_COUNT
来指定批处理大小。以这种方式使用 IOSize
等效于更冗长的 condition()
和 connector()
调用,以更新本节开头附近显示的 capacity
和 min_size
参数。
使用 IOSize
的简短形式,而不是 condition()
或 connector()
方法的主要原因是,如果需要进行额外的参数更改,例如队列策略。有关 condition()
和 connector()
方法的使用的更多详细信息,请参阅下面的高级主题部分(进一步自定义输入和输出)。
def setup(self, spec: OperatorSpec):
spec.input("receivers", size=IOSpec.IOSize(2))
如果要逐个接收输入数据,可以调用带有 kind="single"
参数的 receive()
方法。
def compute(self, op_input, op_output, context):
while True:
value = op_input.receive("receivers", kind="single")
if value is None:
break
# Process the input data
print(f"Rx message received (value:{value.data})")
上面的代码将从 receivers
端口逐个接收输入数据。在循环中调用 receive()
方法,直到它返回 None
值。输入数据存储在一个变量中,并记录输入数据的值。
此方法(逐个接收输入数据)不适用于 IOSpec.ANY_SIZE
情况。使用 IOSpec.ANY_SIZE
参数,框架会为内部接收到的每个输入对象创建一个新的输入端口。每个隐式输入端口(使用格式 <port_name>:<index>
命名)都与 MessageAvailableCondition
条件相关联,该条件的 min_size
为 1
。因此,不能使用 kind="single"
关键字参数调用 receive()
方法来逐个接收输入数据。相反,对于 IOSpec.ANY_SIZE
情况,可以不使用任何 kind
参数或使用 kind="multi"
参数调用它。
如果确实需要为 IOSpec.ANY_SIZE
情况逐个接收输入数据(尽管不建议这样做),则可以使用不带 kind
参数的 receive()
方法从每个隐式输入端口(命名为 <port_name>:<index>
)逐个接收输入数据。(例如,op_input.receive("receivers:0")
、op_input.receive("receivers:1")
等)。为了避免在为隐式输入端口调用 receive()
方法时出现错误消息(例如 The operator does not have an input port with label 'receivers:X'
),您需要预先计算与 receivers
端口的连接数,并相应地为每个隐式输入端口调用 receive()
方法。
def compute(self, op_input, op_output, context):
input_count = len(self.spec.inputs) - 1 # -1 to exclude the 'receivers' input port
for i in range(input_count):
value = op_input.receive(f"receivers:{i}")
if value is None:
break
# Process the input data
print(f"Rx message received (value:{value.data})")
在上面的示例中,使用 IOSpec.PRECEDING_COUNT
或 IOSpec.IOSize(2)
似乎显示出与 IOSpec.ANY_SIZE
相同的行为。但是,区别在于,由于 IOSpec.PRECEDING_COUNT
或 IOSpec.IOSize(2)
不为每个(内部)输入端口使用单独的 MessageAvailableCondition
条件,因此不能保证 operator 将按顺序接收输入数据。
这意味着操作器接收输入数据的顺序可能与 compose()
方法中建立连接的顺序不同。此外,使用多线程调度器,不能保证操作器将均匀地接收来自每个连接的输入数据。操作器可能从一个连接接收到比另一个连接更多的输入数据。
如果输入数据的顺序很重要,建议使用 IOSpec.ANY_SIZE
并调用 receive()
方法一次性批量接收输入数据。
有关在 Python operator 中接收多个输入的更多示例,请参阅 Python 系统测试用例。
C++ operator 的 Python 封装
从 Python 使用封装以 C++ 开发的 operator 在关于 创建 C++ operator Python 绑定的单独章节中介绍。
从 Holoscan 2.1 开始,有一个 GXFCodeletOp
类,可用于从 Python 轻松封装现有的 GXF codelet,而无需首先为其编写底层的 C++ 封装类。类似地,现在还有一个 GXFComponentResource
类,允许将 GXF Component 用作 Python 应用程序中的 Holoscan 资源。在 examples/import_gxf_components 文件夹中,为 Python 应用程序提供了如何使用这些类的详细示例。
封装的 operator 和原生 Python operator 之间的互操作性
如 GXF operator 和原生 C++ operator 之间的互操作性 部分所述,可以使用保存张量的 holoscan::TensorMap
消息将 holoscan::Tensor
对象传递给 GXF operator。在 Python 中,这通过发送 dict
类型对象来完成,这些对象将张量名称作为键,并将 holoscan Tensor 或类似数组的对象作为值。类似地,当传输单个 holoscan::Tensor
的封装 C++ operator 连接到 Python 原生 operator 的输入端口时,在该端口上调用 op_input.receive()
将返回包含单个项的 Python dict。该项的键是张量名称,其值是相应的 holoscan.core.Tensor
。
考虑以下示例,其中 VideoStreamReplayerOp
和 HolovizOp
是 Python 封装的 C++ operator,而 ImageProcessingOp
是 Python 原生 operator。

图 19 Python 原生 operator 和基于 C++ 的 Python GXF operator 之间的张量互操作性
以下代码显示了如何将 ImageProcessingOp
的 compute()
方法实现为与 C++ operator 通信的 Python 原生 operator。
列表 17 examples/tensor_interop/python/tensor_interop.py
def compute(self, op_input, op_output, context):
# in_message is a dict of tensors
in_message = op_input.receive("input_tensor")
# smooth along first two axes, but not the color channels
sigma = (self.sigma, self.sigma, 0)
# out_message will be a dict of tensors
out_message = dict()
for key, value in in_message.items():
print(f"message received (count:{self.count})")
self.count += 1
cp_array = cp.asarray(value)
# process cp_array
cp_array = ndi.gaussian_filter(cp_array, sigma)
out_message[key] = cp_array
op_output.emit(out_message, "output_tensor")
op_input.receive()
方法调用返回一个dict
对象。holoscan.core.Tensor
对象通过使用cupy.asarray()
方法调用转换为 CuPy 数组。CuPy 数组用作
ndi.gaussian_filter()
函数调用的输入,参数为sigma
。ndi.gaussian_filter()
函数调用的结果是一个 CuPy 数组。最后,创建一个新的
dict
对象out_message
,以便通过op_output.emit()
发送到下一个 operator。CuPy 数组cp_array
被添加到其中,键是张量名称。CuPy 数组不必显式转换为holocan.core.Tensor
对象,因为它们实现了 DLPack(和__cuda__array_interface__
)接口。
在 examples/tensor_interop/python 目录中,可以找到支持与 Python 封装的 C++ operator 互操作的 Python 原生 operator 的完整示例。
您可以将多个张量添加到单个 dict
对象中,如下例所示
Operator 发送消息
out_message = {
"video": output_array,
"labels": labels,
"bbox_coords": bbox_coords,
}
# emit the tensors
op_output.emit(out_message, "outputs")
Operator 接收消息,假设上面的 outputs
端口连接到下面的 inputs
端口,并且 add_flow()
具有相应的张量
in_message = op_input.receive("inputs")
# Tensors and tensor names
video_tensor = in_message["video"]
labels_tensor = in_message["labels"]
bbox_coords_tensor = in_message["bbox_coords"]
一些现有的 operator 允许配置它们发送/接收的张量的名称。一个例子是 HolovizOp
的 tensors
参数,其中每个张量的名称都映射到 Entity
中的张量名称(请参阅 apps/endoscopy_tool_tracking/python/endoscopy_tool_tracking.yaml 中的 holoviz
条目)。
在 examples/holoviz/python 目录中,可以找到一个 Python 原生 operator 的完整示例,该 operator 将多个张量发送到下游 C++ operator。
对于通过 UCX 连接发送/接收张量对象,存在张量类型的特殊序列化代码,该代码避免将张量数据复制到中间缓冲区。对于分布式应用程序,我们不能像在单个 fragment 应用程序中的 operator 之间那样直接发送 Python 对象,而是需要将其强制转换为 holoscan::Tensor
以使用特殊的零拷贝代码路径。但是,我们还会传输一个标头,指示类型最初是否是其他类似数组的对象,并尝试在另一端再次返回相同的类型,以便行为与非分布式情况更相似。
发送的对象 | 接收的对象 |
---|---|
holoscan.Tensor | holoscan.Tensor |
类似数组的 dict | holoscan.Tensor 的 dict |
主机类似数组的对象(具有 __array_interface__ ) |
numpy.ndarray |
设备类似数组的对象(具有 __cuda_array_interface__ ) |
cupy.ndarray |
这避免了 NumPy 或 CuPy 数组通过 cloudpickle 序列化为字符串,以便可以有效地传输它们,并在另一端再次返回相同的类型。值得一提的是,如果发送的类型是例如发送端的 PyTorch 主机/设备张量,则接收到的值将是 numpy/cupy 数组,因为任何实现接口的对象都会返回这些类型。
自动 operator 类创建
Holoscan 还提供了一个 holoscan.decorator
模块,该模块提供了通过向现有函数或类添加装饰器来自动生成 Operator 的方法。请参阅关于 通过 holoscan.decorator.create_op 创建 operator 的单独章节。
进一步自定义输入和输出
本节补充了上述关于基本输入和输出端口配置的信息,这些信息在 C++ 和 Python 操作器创建指南中分别给出。此处描述的概念对于 C++ 或 Python API 都是相同的。
默认情况下,操作器的输入和输出端口都将使用双缓冲队列,该队列的容量为一条消息,并且策略设置为:如果队列已满时有消息到达,则报错。对于每个输入端口,操作器上会自动放置一个 MessageAvailableCondition
(C++
/Python
)条件,以便在每个端口都有单条消息可用之前,不会调用 compute
方法。类似地,每个输出端口都有一个 DownstreamMessageAffordableCondition
(C++
/Python
)条件,该条件会阻止操作器调用 compute
,直到任何连接的下游操作器在其接收器队列中都有空间容纳单条消息。这些默认条件确保消息永远不会在队列已满时到达,并且在调用 compute
方法时,消息已经被接收。这些默认条件使得连接每个操作器依次调用 compute 的管道相对容易,但可能不适用于所有应用。本节介绍如何根据请求覆盖默认行为。
可以通过 HOLOSCAN_QUEUE_POLICY
环境变量修改全局默认队列策略。有效选项(不区分大小写)包括
“pop”:当队列已满时,到达的新项将替换最旧的项
“reject”:当队列已满时,到达的新项将被丢弃
“fail”:如果队列已满时有新项到达,则终止应用程序
当未指定 HOLOSCAN_QUEUE_POLICY
时,默认行为为“fail”。如果操作器的 setup
方法通过 connector
(C++
/Python
)方法显式设置了接收器或发送器(如下所述),则该连接器的策略不会被默认策略覆盖。
覆盖操作器端口属性是一个高级主题。开发者可能希望跳过本节,直到他们遇到默认行为不足以满足其应用需求的情况。
要覆盖给定端口使用的队列属性,可以使用 connector
(C++
/Python
)方法,如下例所示。此示例还展示了如何使用 condition
(C++
/Python
)方法来更改端口在操作器上放置的条件类型。通常,当操作器有多个条件时,它们会进行“与”组合,因此在操作器可以调用 compute
之前,所有端口上的条件都必须满足。
考虑以下来自操作器的 holoscan::Operator::setup()
方法中的代码。
spec.output<TensorMap>("out1")
spec.output<TensorMap>("out2").condition(ConditionType::kNone);
spec.output<TensorMap>("in")
.connector(IOSpec::ConnectorType::kDoubleBuffer,
Arg("capacity", static_cast<uint64_t>(2)),
Arg("policy", static_cast<uint64_t>(1))) // 0=pop, 1=reject, 2=fault (default)
.condition(ConditionType::kMessageAvailable,
Arg("min_size", static_cast<uint64_t>(2)),
Arg("front_stage_max_size", static_cast<size_t>(2)));
这将定义
一个名为“out1”的输出端口,具有默认属性
一个名为“out2”的输出端口,它仍然具有默认连接器(一个
holoscan::gxf::DoubleBufferTransmitter
),但通过设置ConditionType::kNone
移除了ConditionType::kDownstreamMessageAffordable
的默认条件。这表明操作器在调用compute
之前,不会检查“out2”下游的任何端口的接收器队列中是否有可用空间。一个名为“in”的输入端口,其连接器和条件都具有与默认值不同的参数。例如,队列大小增加到 2,
policy=1
是“reject”,表示如果消息到达时队列已满,则该消息将被拒绝,而保留队列中已有的消息。
考虑以下来自操作器的 holoscan::Operator::setup()
方法中的代码。
spec.output("out1")
spec.output("out2").condition(ConditionType.NONE)
spec.input("in").connector(
IOSpec.ConnectorType.DOUBLE_BUFFER,
capacity=2,
policy=1, # 0=pop, 1=reject, 2=fault (default)
).condition(ConditionType.MESSAGE_AVAILABLE, min_size=2, front_stage_max_size=2)
这将定义
一个名为“out1”的输出端口,具有默认属性
一个名为“out2”的输出端口,它仍然具有默认连接器(一个
holoscan.resources.DoubleBufferTransmitter
),但通过设置ConditionType.NONE
移除了ConditionType.DOWNSTREAM_MESSAGE_AFFORDABLE
的默认条件。这表明操作器在调用compute
之前,不会检查“out2”下游的任何端口的接收器队列中是否有可用空间。一个名为“in1”的输入端口,其连接器和条件都具有与默认值不同的参数。例如,队列大小增加到 2,
policy=1
是“reject”,表示如果消息到达时队列已满,则该消息将被拒绝,而保留队列中已有的消息。
要了解有关覆盖连接器和/或条件的更多信息,请参阅 multi_branch_pipeline 示例,该示例覆盖了默认条件,以允许管道的两个分支以不同的帧速率运行。在 此 Python 队列策略测试应用程序 中,也有增加可用队列大小的示例。
将 Holoscan SDK 与其他库一起使用
Holoscan SDK 能够与各种强大的 GPU 加速库无缝集成,以构建高效、高性能的管道。
有关 Holoscan 的张量互操作性以及在管道中处理 CUDA 库的详细示例和更多信息,请参阅 HoloHub 仓库 中的 将外部库集成到 Holoscan 管道的最佳实践 教程。这包括用于 Python 的 CUDA Python、CuPy,用于 C++ 的 MatX,以及用于集成到 Holoscan 应用的 cuCIM、CV-CUDA 和 OpenCV。