NVIDIA Holoscan SDK v2.9.0

Ping 多端口

在本节中,我们将了解如何创建一个应用程序,其工作流程更为复杂,其中操作器可能具有多个输入/输出端口,用于发送/接收用户定义的数据类型。

在本示例中,我们将介绍

  • 如何使用自定义数据类型发送/接收消息。

  • 如何添加一个可以接收任意数量输入的端口。

注意

示例源代码和运行说明可以在 GitHub 上的 examples 目录中找到,或者在 NGC 容器和 Debian 软件包中的 /opt/nvidia/holoscan/examples 下找到,与其可执行文件放在一起。

以下是本示例中使用的操作器和工作流程的图表。

graphviz-481832a40f9d6ff34591625aec2eb4eb1cc991eb.png

图 7 具有多个输入和输出的工作流程

在本示例中,PingTxOp 将奇数整数流发送到 out1 端口,将偶数整数发送到 out2 端口。PingMxOp 使用 in1in2 端口接收这些值,将它们乘以一个常数因子,然后将它们转发到 PingRxOp 上的单个端口 - receivers

在前一个 ping 示例中,我们操作器的端口类型是整数,但 Holoscan SDK 可以发送任何任意数据类型。在本示例中,我们将了解如何为用户定义的 ValueData 类配置操作器。

复制
已复制!
            

#include "holoscan/holoscan.hpp" class ValueData { public: ValueData() = default; explicit ValueData(int value) : data_(value) { HOLOSCAN_LOG_TRACE("ValueData::ValueData(): {}", data_); } ~ValueData() { HOLOSCAN_LOG_TRACE("ValueData::~ValueData(): {}", data_); } void data(int value) { data_ = value; } int data() const { return data_; } private: int data_; };

ValueData 类包装了一个简单的整数(第 616 行),但可以是任意复杂的。

注意

HOLOSCAN_LOG_<LEVEL>() 宏可以用于使用 fmtlib 语法进行日志记录(上面第 79 行),如本示例所示。有关更多详细信息,请参阅 日志记录 部分。

复制
已复制!
            

from holoscan.conditions import CountCondition from holoscan.core import Application, IOSpec, Operator, OperatorSpec class ValueData: """Example of a custom Python class""" def __init__(self, value): self.data = value def __repr__(self): return f"ValueData({self.data})" def __eq__(self, other): return self.data == other.data def __hash__(self): return hash(self.data)

ValueData 类是一个简单的包装器,但可以是任意复杂的。

在定义了自定义 ValueData 类之后,我们配置操作器的端口以发送/接收此类型的消息,类似于之前的示例

这是第一个操作器 - PingTxOp - 在两个端口 out1out2 上发送 ValueData 对象

复制
已复制!
            

namespace holoscan::ops { class PingTxOp : public Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(PingTxOp) PingTxOp() = default; void setup(OperatorSpec& spec) override { spec.output<std::shared_ptr<ValueData>>("out1"); spec.output<std::shared_ptr<ValueData>>("out2"); } void compute(InputContext&, OutputContext& op_output, ExecutionContext&) override { auto value1 = std::make_shared<ValueData>(index_++); op_output.emit(value1, "out1"); auto value2 = std::make_shared<ValueData>(index_++); op_output.emit(value2, "out2"); }; int index_ = 1; };

  • 我们使用 spec.output<std::shared_ptr<ValueData>>() 在第 1011 行配置输出端口的 ValueData 类型。因此,输出端口的数据类型是指向 ValueData 对象的共享指针的对象。

  • 然后使用 op_output.emit() 在第 1619 行发送值。由于此操作器上有多个端口,因此需要端口名称。

注意

输出端口的数据类型是共享指针 (std::shared_ptr),因此在第 1518 行调用 std::make_shared<ValueData>(...)

复制
已复制!
            

class PingTxOp(Operator): """Simple transmitter operator. This operator has: outputs: "out1", "out2" On each tick, it transmits a `ValueData` object at each port. The transmitted values are even on port1 and odd on port2 and increment with each call to compute. """ def __init__(self, fragment, *args, **kwargs): self.index = 1 # Need to call the base class constructor last super().__init__(fragment, *args, **kwargs) def setup(self, spec: OperatorSpec): spec.output("out1") spec.output("out2") def compute(self, op_input, op_output, context): value1 = ValueData(self.index) self.index += 1 op_output.emit(value1, "out1") value2 = ValueData(self.index) self.index += 1 op_output.emit(value2, "out2")

  • 我们使用 spec.output() 在第 1819 行配置输出端口。在 Python 中,无需引用类型 (ValueData)。

  • 然后使用 op_output.emit() 在第 2428 行发送值。

然后,我们配置中间操作器 - PingMxOp - 以接收端口 in1in2 上的数据

复制
已复制!
            

class PingMxOp : public Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(PingMxOp) PingMxOp() = default; void setup(OperatorSpec& spec) override { spec.input<std::shared_ptr<ValueData>>("in1"); spec.input<std::shared_ptr<ValueData>>("in2"); spec.output<std::shared_ptr<ValueData>>("out1"); spec.output<std::shared_ptr<ValueData>>("out2"); spec.param(multiplier_, "multiplier", "Multiplier", "Multiply the input by this value", 2); } void compute(InputContext& op_input, OutputContext& op_output, ExecutionContext&) override { auto value1 = op_input.receive<std::shared_ptr<ValueData>>("in1").value(); auto value2 = op_input.receive<std::shared_ptr<ValueData>>("in2").value(); HOLOSCAN_LOG_INFO("Middle message received (count: {})", count_++); HOLOSCAN_LOG_INFO("Middle message value1: {}", value1->data()); HOLOSCAN_LOG_INFO("Middle message value2: {}", value2->data()); // Multiply the values by the multiplier parameter value1->data(value1->data() * multiplier_); value2->data(value2->data() * multiplier_); op_output.emit(value1, "out1"); op_output.emit(value2, "out2"); }; private: int count_ = 1; Parameter<int> multiplier_; };

  • 我们使用 spec.input<std::shared_ptr<ValueData>>() 在第 89 行配置输入端口的 std::shared_ptr<ValueData> 类型。

  • 使用 op_input.receive() 在第 1617 行使用端口名称接收值。接收到的值的类型为 std::shared_ptr<ValueData>,如模板化的 receive() 方法中所述。

复制
已复制!
            

class PingMxOp(Operator): """Example of an operator modifying data. This operator has: inputs: "in1", "in2" outputs: "out1", "out2" The data from each input is multiplied by a user-defined value. """ def __init__(self, fragment, *args, **kwargs): self.count = 1 # Need to call the base class constructor last super().__init__(fragment, *args, **kwargs) def setup(self, spec: OperatorSpec): spec.input("in1") spec.input("in2") spec.output("out1") spec.output("out2") spec.param("multiplier", 2) def compute(self, op_input, op_output, context): value1 = op_input.receive("in1") value2 = op_input.receive("in2") print(f"Middle message received (count:{self.count})") self.count += 1 print(f"Middle message value1:{value1.data}") print(f"Middle message value2:{value2.data}") # Multiply the values by the multiplier parameter value1.data *= self.multiplier value2.data *= self.multiplier op_output.emit(value1, "out1") op_output.emit(value2, "out2")

在 Python 中,发送任意数据类型的消息非常简单。当我们从传递 int 更改为 ValueData 对象时,定义操作器输入端口(第 18-19 行)和接收它们(第 25-26 行)的代码没有更改。

PingMxOp 处理数据,然后在两个端口上发送出去,类似于上面的 PingTxOp 所做的事情。

在此工作流程中,PingRxOp 具有单个输入端口 - receivers - 它连接到来自 PingMxOp 的两个上游端口。当输入端口需要连接到多个上游端口时,我们使用 spec.input() 定义它,并将大小设置为 IOSpec::kAnySize(或 Python 中的 IOSpec.ANY_SIZE)。这允许输入端口从多个源接收数据。然后,输入存储在一个向量中,遵循使用 add_flow() 添加它们的顺序。

复制
已复制!
            

class PingRxOp : public Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(PingRxOp) PingRxOp() = default; void setup(OperatorSpec& spec) override { // // Since Holoscan SDK v2.3, users can define a multi-receiver input port using 'spec.input()' // // with 'IOSpec::kAnySize'. // // The old way is to use 'spec.param()' with 'Parameter<std::vector<IOSpec*>> receivers_;'. // spec.param(receivers_, "receivers", "Input Receivers", "List of input receivers.", {}); spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", IOSpec::kAnySize); } void compute(InputContext& op_input, OutputContext&, ExecutionContext&) override { auto value_vector = op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value(); HOLOSCAN_LOG_INFO("Rx message received (count: {}, size: {})", count_++, value_vector.size()); HOLOSCAN_LOG_INFO("Rx message value1: {}", value_vector[0]->data()); HOLOSCAN_LOG_INFO("Rx message value2: {}", value_vector[1]->data()); }; private: // // Since Holoscan SDK v2.3, the following line is no longer needed. // Parameter<std::vector<IOSpec*>> receivers_; int count_ = 1; };

  • 在操作器的 setup() 方法中,我们使用 holoscan::IOSpec::kAnySize 定义一个输入端口 receivers(第 12 行),以允许任意数量的上游端口连接到它。

  • 使用 op_input.receive<std::vector<std::shared_ptr<ValueData>>>(...) 检索值。

  • value_vector 的类型是 std::vector<std::shared_ptr<ValueData>>(第 16-17 行)。

有关如何在 C++ 中检索任意数量的输入的更多信息,请参阅 接收任意数量的输入 (C++)

复制
已复制!
            

class PingRxOp(Operator): """Simple receiver operator. This operator has: input: "receivers" This is an example of a native operator that can dynamically have any number of inputs connected to its "receivers" port. """ def __init__(self, fragment, *args, **kwargs): self.count = 1 # Need to call the base class constructor last super().__init__(fragment, *args, **kwargs) def setup(self, spec: OperatorSpec): # # Since Holoscan SDK v2.3, users can define a multi-receiver input port using # # 'spec.input()' with 'size=IOSpec.ANY_SIZE'. # # The old way is to use 'spec.param()' with 'kind="receivers"'. # spec.param("receivers", kind="receivers") spec.input("receivers", size=IOSpec.ANY_SIZE) def compute(self, op_input, op_output, context): values = op_input.receive("receivers") print(f"Rx message received (count:{self.count}, size:{len(values)})") self.count += 1 print(f"Rx message value1:{values[0].data}") print(f"Rx message value2:{values[1].data}")

  • 在 Python 中,可以通过定义输入端口并设置参数 size=IOSpec.ANY_SIZE (第 21 行)来创建可以连接到多个上游端口的端口。

  • 调用 receive() 返回 ValueData 对象的元组(第 24 行)。

有关如何在 Python 中检索任意数量的输入的更多信息,请参阅 接收任意数量的输入 (Python)

代码的其余部分创建应用程序、操作器并定义工作流程

复制
已复制!
            

class MyPingApp : public holoscan::Application { public: void compose() override { using namespace holoscan; // Define the tx, mx, rx operators, allowing the tx operator to execute 10 times auto tx = make_operator<ops::PingTxOp>("tx", make_condition<CountCondition>(10)); auto mx = make_operator<ops::PingMxOp>("mx", Arg("multiplier", 3)); auto rx = make_operator<ops::PingRxOp>("rx"); // Define the workflow add_flow(tx, mx, {{"out1", "in1"}, {"out2", "in2"}}); add_flow(mx, rx, {{"out1", "receivers"}, {"out2", "receivers"}}); } }; int main(int argc, char** argv) { auto app = holoscan::make_application<MyPingApp>(); app->run(); return 0; }

复制
已复制!
            

class MyPingApp(Application): def compose(self): # Define the tx, mx, rx operators, allowing the tx operator to execute 10 times tx = PingTxOp(self, CountCondition(self, 10), name="tx") mx = PingMxOp(self, name="mx", multiplier=3) rx = PingRxOp(self, name="rx") # Define the workflow self.add_flow(tx, mx, {("out1", "in1"), ("out2", "in2")}) self.add_flow(mx, rx, {("out1", "receivers"), ("out2", "receivers")}) def main(): app = MyPingApp() app.run() if __name__ == "__main__": main()

  • 操作器 txmxrx 在应用程序的 compose() 中创建,类似于之前的示例。

  • 由于本示例中的操作器具有多个输入/输出端口,因此在调用 add_flow() 时,我们需要指定第三个端口名称对参数

    • tx/out1 连接到 mx/in1tx/out2 连接到 mx/in2

    • mx/out1mx/out2 都连接到 rx/receivers

运行应用程序应该在您的终端中给出类似于以下内容的输出。

复制
已复制!
            

[info] [fragment.cpp:586] Loading extensions from configs... [info] [gxf_executor.cpp:249] Creating context [info] [gxf_executor.cpp:1960] Activating Graph... [info] [gxf_executor.cpp:1992] Running Graph... [info] [greedy_scheduler.cpp:191] Scheduling 3 entities [info] [gxf_executor.cpp:1994] Waiting for completion... [info] [ping_multi_port.cpp:80] Middle message received (count: 1) [info] [ping_multi_port.cpp:82] Middle message value1: 1 [info] [ping_multi_port.cpp:83] Middle message value2: 2 [info] [ping_multi_port.cpp:116] Rx message received (count: 1, size: 2) [info] [ping_multi_port.cpp:118] Rx message value1: 3 [info] [ping_multi_port.cpp:119] Rx message value2: 6 [info] [ping_multi_port.cpp:80] Middle message received (count: 2) [info] [ping_multi_port.cpp:82] Middle message value1: 3 [info] [ping_multi_port.cpp:83] Middle message value2: 4 [info] [ping_multi_port.cpp:116] Rx message received (count: 2, size: 2) [info] [ping_multi_port.cpp:118] Rx message value1: 9 [info] [ping_multi_port.cpp:119] Rx message value2: 12 ... [info] [ping_multi_port.cpp:80] Middle message received (count: 10) [info] [ping_multi_port.cpp:82] Middle message value1: 19 [info] [ping_multi_port.cpp:83] Middle message value2: 20 [info] [ping_multi_port.cpp:116] Rx message received (count: 10, size: 2) [info] [ping_multi_port.cpp:118] Rx message value1: 57 [info] [ping_multi_port.cpp:119] Rx message value2: 60 [info] [greedy_scheduler.cpp:372] Scheduler stopped: Some entities are waiting for execution, but there are no periodic or async entities to get out of the deadlock. [info] [greedy_scheduler.cpp:401] Scheduler finished. [info] [gxf_executor.cpp:1997] Deactivating Graph... [info] [gxf_executor.cpp:2005] Graph execution finished. [info] [gxf_executor.cpp:278] Destroying context

注意

根据您的日志级别,您可能会看到更多或更少的消息。上面的输出是使用 INFO 的默认值生成的。有关如何设置日志级别的更多详细信息,请参阅 日志记录 部分。

上一个 Ping 自定义操作
下一个 视频重放器
© 版权所有 2022-2024,NVIDIA。 上次更新于 2025 年 1 月 27 日。