Ping 多端口
在本节中,我们将了解如何创建一个应用程序,其工作流程更为复杂,其中操作器可能具有多个输入/输出端口,用于发送/接收用户定义的数据类型。
在本示例中,我们将介绍
如何使用自定义数据类型发送/接收消息。
如何添加一个可以接收任意数量输入的端口。
示例源代码和运行说明可以在 GitHub 上的 examples 目录中找到,或者在 NGC 容器和 Debian 软件包中的 /opt/nvidia/holoscan/examples
下找到,与其可执行文件放在一起。
以下是本示例中使用的操作器和工作流程的图表。

图 7 具有多个输入和输出的工作流程
在本示例中,PingTxOp
将奇数整数流发送到 out1
端口,将偶数整数发送到 out2
端口。PingMxOp
使用 in1
和 in2
端口接收这些值,将它们乘以一个常数因子,然后将它们转发到 PingRxOp
上的单个端口 - receivers
。
在前一个 ping
示例中,我们操作器的端口类型是整数,但 Holoscan SDK 可以发送任何任意数据类型。在本示例中,我们将了解如何为用户定义的 ValueData
类配置操作器。
#include "holoscan/holoscan.hpp"
class ValueData {
public:
ValueData() = default;
explicit ValueData(int value) : data_(value) {
HOLOSCAN_LOG_TRACE("ValueData::ValueData(): {}", data_);
}
~ValueData() { HOLOSCAN_LOG_TRACE("ValueData::~ValueData(): {}", data_); }
void data(int value) { data_ = value; }
int data() const { return data_; }
private:
int data_;
};
ValueData
类包装了一个简单的整数(第 6
、16
行),但可以是任意复杂的。
HOLOSCAN_LOG_<LEVEL>()
宏可以用于使用 fmtlib 语法进行日志记录(上面第 7
、9
行),如本示例所示。有关更多详细信息,请参阅 日志记录 部分。
from holoscan.conditions import CountCondition
from holoscan.core import Application, IOSpec, Operator, OperatorSpec
class ValueData:
"""Example of a custom Python class"""
def __init__(self, value):
self.data = value
def __repr__(self):
return f"ValueData({self.data})"
def __eq__(self, other):
return self.data == other.data
def __hash__(self):
return hash(self.data)
ValueData
类是一个简单的包装器,但可以是任意复杂的。
在定义了自定义 ValueData
类之后,我们配置操作器的端口以发送/接收此类型的消息,类似于之前的示例。
这是第一个操作器 - PingTxOp
- 在两个端口 out1
和 out2
上发送 ValueData
对象
namespace holoscan::ops {
class PingTxOp : public Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(PingTxOp)
PingTxOp() = default;
void setup(OperatorSpec& spec) override {
spec.output<std::shared_ptr<ValueData>>("out1");
spec.output<std::shared_ptr<ValueData>>("out2");
}
void compute(InputContext&, OutputContext& op_output, ExecutionContext&) override {
auto value1 = std::make_shared<ValueData>(index_++);
op_output.emit(value1, "out1");
auto value2 = std::make_shared<ValueData>(index_++);
op_output.emit(value2, "out2");
};
int index_ = 1;
};
我们使用
spec.output<std::shared_ptr<ValueData>>()
在第10
和11
行配置输出端口的ValueData
类型。因此,输出端口的数据类型是指向ValueData
对象的共享指针的对象。然后使用
op_output.emit()
在第16
和19
行发送值。由于此操作器上有多个端口,因此需要端口名称。
输出端口的数据类型是共享指针 (std::shared_ptr
),因此在第 15
和 18
行调用 std::make_shared<ValueData>(...)
。
class PingTxOp(Operator):
"""Simple transmitter operator.
This operator has:
outputs: "out1", "out2"
On each tick, it transmits a `ValueData` object at each port. The
transmitted values are even on port1 and odd on port2 and increment with
each call to compute.
"""
def __init__(self, fragment, *args, **kwargs):
self.index = 1
# Need to call the base class constructor last
super().__init__(fragment, *args, **kwargs)
def setup(self, spec: OperatorSpec):
spec.output("out1")
spec.output("out2")
def compute(self, op_input, op_output, context):
value1 = ValueData(self.index)
self.index += 1
op_output.emit(value1, "out1")
value2 = ValueData(self.index)
self.index += 1
op_output.emit(value2, "out2")
我们使用
spec.output()
在第18
和19
行配置输出端口。在 Python 中,无需引用类型 (ValueData
)。然后使用
op_output.emit()
在第24
和28
行发送值。
然后,我们配置中间操作器 - PingMxOp
- 以接收端口 in1
和 in2
上的数据
class PingMxOp : public Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(PingMxOp)
PingMxOp() = default;
void setup(OperatorSpec& spec) override {
spec.input<std::shared_ptr<ValueData>>("in1");
spec.input<std::shared_ptr<ValueData>>("in2");
spec.output<std::shared_ptr<ValueData>>("out1");
spec.output<std::shared_ptr<ValueData>>("out2");
spec.param(multiplier_, "multiplier", "Multiplier", "Multiply the input by this value", 2);
}
void compute(InputContext& op_input, OutputContext& op_output, ExecutionContext&) override {
auto value1 = op_input.receive<std::shared_ptr<ValueData>>("in1").value();
auto value2 = op_input.receive<std::shared_ptr<ValueData>>("in2").value();
HOLOSCAN_LOG_INFO("Middle message received (count: {})", count_++);
HOLOSCAN_LOG_INFO("Middle message value1: {}", value1->data());
HOLOSCAN_LOG_INFO("Middle message value2: {}", value2->data());
// Multiply the values by the multiplier parameter
value1->data(value1->data() * multiplier_);
value2->data(value2->data() * multiplier_);
op_output.emit(value1, "out1");
op_output.emit(value2, "out2");
};
private:
int count_ = 1;
Parameter<int> multiplier_;
};
我们使用
spec.input<std::shared_ptr<ValueData>>()
在第8
和9
行配置输入端口的std::shared_ptr<ValueData>
类型。使用
op_input.receive()
在第16
和17
行使用端口名称接收值。接收到的值的类型为std::shared_ptr<ValueData>
,如模板化的receive()
方法中所述。
class PingMxOp(Operator):
"""Example of an operator modifying data.
This operator has:
inputs: "in1", "in2"
outputs: "out1", "out2"
The data from each input is multiplied by a user-defined value.
"""
def __init__(self, fragment, *args, **kwargs):
self.count = 1
# Need to call the base class constructor last
super().__init__(fragment, *args, **kwargs)
def setup(self, spec: OperatorSpec):
spec.input("in1")
spec.input("in2")
spec.output("out1")
spec.output("out2")
spec.param("multiplier", 2)
def compute(self, op_input, op_output, context):
value1 = op_input.receive("in1")
value2 = op_input.receive("in2")
print(f"Middle message received (count:{self.count})")
self.count += 1
print(f"Middle message value1:{value1.data}")
print(f"Middle message value2:{value2.data}")
# Multiply the values by the multiplier parameter
value1.data *= self.multiplier
value2.data *= self.multiplier
op_output.emit(value1, "out1")
op_output.emit(value2, "out2")
在 Python 中,发送任意数据类型的消息非常简单。当我们从传递 int
更改为 ValueData
对象时,定义操作器输入端口(第 18-19
行)和接收它们(第 25-26
行)的代码没有更改。
PingMxOp
处理数据,然后在两个端口上发送出去,类似于上面的 PingTxOp
所做的事情。
在此工作流程中,PingRxOp
具有单个输入端口 - receivers
- 它连接到来自 PingMxOp
的两个上游端口。当输入端口需要连接到多个上游端口时,我们使用 spec.input()
定义它,并将大小设置为 IOSpec::kAnySize
(或 Python 中的 IOSpec.ANY_SIZE
)。这允许输入端口从多个源接收数据。然后,输入存储在一个向量中,遵循使用 add_flow()
添加它们的顺序。
class PingRxOp : public Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(PingRxOp)
PingRxOp() = default;
void setup(OperatorSpec& spec) override {
// // Since Holoscan SDK v2.3, users can define a multi-receiver input port using 'spec.input()'
// // with 'IOSpec::kAnySize'.
// // The old way is to use 'spec.param()' with 'Parameter<std::vector<IOSpec*>> receivers_;'.
// spec.param(receivers_, "receivers", "Input Receivers", "List of input receivers.", {});
spec.input<std::vector<std::shared_ptr<ValueData>>>("receivers", IOSpec::kAnySize);
}
void compute(InputContext& op_input, OutputContext&, ExecutionContext&) override {
auto value_vector =
op_input.receive<std::vector<std::shared_ptr<ValueData>>>("receivers").value();
HOLOSCAN_LOG_INFO("Rx message received (count: {}, size: {})", count_++, value_vector.size());
HOLOSCAN_LOG_INFO("Rx message value1: {}", value_vector[0]->data());
HOLOSCAN_LOG_INFO("Rx message value2: {}", value_vector[1]->data());
};
private:
// // Since Holoscan SDK v2.3, the following line is no longer needed.
// Parameter<std::vector<IOSpec*>> receivers_;
int count_ = 1;
};
在操作器的
setup()
方法中,我们使用holoscan::IOSpec::kAnySize
定义一个输入端口receivers
(第12
行),以允许任意数量的上游端口连接到它。使用
op_input.receive<std::vector<std::shared_ptr<ValueData>>>(...)
检索值。value_vector
的类型是std::vector<std::shared_ptr<ValueData>>
(第16-17
行)。
有关如何在 C++ 中检索任意数量的输入的更多信息,请参阅 接收任意数量的输入 (C++)。
class PingRxOp(Operator):
"""Simple receiver operator.
This operator has:
input: "receivers"
This is an example of a native operator that can dynamically have any
number of inputs connected to its "receivers" port.
"""
def __init__(self, fragment, *args, **kwargs):
self.count = 1
# Need to call the base class constructor last
super().__init__(fragment, *args, **kwargs)
def setup(self, spec: OperatorSpec):
# # Since Holoscan SDK v2.3, users can define a multi-receiver input port using
# # 'spec.input()' with 'size=IOSpec.ANY_SIZE'.
# # The old way is to use 'spec.param()' with 'kind="receivers"'.
# spec.param("receivers", kind="receivers")
spec.input("receivers", size=IOSpec.ANY_SIZE)
def compute(self, op_input, op_output, context):
values = op_input.receive("receivers")
print(f"Rx message received (count:{self.count}, size:{len(values)})")
self.count += 1
print(f"Rx message value1:{values[0].data}")
print(f"Rx message value2:{values[1].data}")
在 Python 中,可以通过定义输入端口并设置参数
size=IOSpec.ANY_SIZE
(第21
行)来创建可以连接到多个上游端口的端口。调用
receive()
返回ValueData
对象的元组(第24
行)。
有关如何在 Python 中检索任意数量的输入的更多信息,请参阅 接收任意数量的输入 (Python)。
代码的其余部分创建应用程序、操作器并定义工作流程
class MyPingApp : public holoscan::Application {
public:
void compose() override {
using namespace holoscan;
// Define the tx, mx, rx operators, allowing the tx operator to execute 10 times
auto tx = make_operator<ops::PingTxOp>("tx", make_condition<CountCondition>(10));
auto mx = make_operator<ops::PingMxOp>("mx", Arg("multiplier", 3));
auto rx = make_operator<ops::PingRxOp>("rx");
// Define the workflow
add_flow(tx, mx, {{"out1", "in1"}, {"out2", "in2"}});
add_flow(mx, rx, {{"out1", "receivers"}, {"out2", "receivers"}});
}
};
int main(int argc, char** argv) {
auto app = holoscan::make_application<MyPingApp>();
app->run();
return 0;
}
class MyPingApp(Application):
def compose(self):
# Define the tx, mx, rx operators, allowing the tx operator to execute 10 times
tx = PingTxOp(self, CountCondition(self, 10), name="tx")
mx = PingMxOp(self, name="mx", multiplier=3)
rx = PingRxOp(self, name="rx")
# Define the workflow
self.add_flow(tx, mx, {("out1", "in1"), ("out2", "in2")})
self.add_flow(mx, rx, {("out1", "receivers"), ("out2", "receivers")})
def main():
app = MyPingApp()
app.run()
if __name__ == "__main__":
main()
操作器
tx
、mx
和rx
在应用程序的compose()
中创建,类似于之前的示例。由于本示例中的操作器具有多个输入/输出端口,因此在调用
add_flow()
时,我们需要指定第三个端口名称对参数tx/out1
连接到mx/in1
,tx/out2
连接到mx/in2
。mx/out1
和mx/out2
都连接到rx/receivers
。
运行应用程序应该在您的终端中给出类似于以下内容的输出。
[info] [fragment.cpp:586] Loading extensions from configs...
[info] [gxf_executor.cpp:249] Creating context
[info] [gxf_executor.cpp:1960] Activating Graph...
[info] [gxf_executor.cpp:1992] Running Graph...
[info] [greedy_scheduler.cpp:191] Scheduling 3 entities
[info] [gxf_executor.cpp:1994] Waiting for completion...
[info] [ping_multi_port.cpp:80] Middle message received (count: 1)
[info] [ping_multi_port.cpp:82] Middle message value1: 1
[info] [ping_multi_port.cpp:83] Middle message value2: 2
[info] [ping_multi_port.cpp:116] Rx message received (count: 1, size: 2)
[info] [ping_multi_port.cpp:118] Rx message value1: 3
[info] [ping_multi_port.cpp:119] Rx message value2: 6
[info] [ping_multi_port.cpp:80] Middle message received (count: 2)
[info] [ping_multi_port.cpp:82] Middle message value1: 3
[info] [ping_multi_port.cpp:83] Middle message value2: 4
[info] [ping_multi_port.cpp:116] Rx message received (count: 2, size: 2)
[info] [ping_multi_port.cpp:118] Rx message value1: 9
[info] [ping_multi_port.cpp:119] Rx message value2: 12
...
[info] [ping_multi_port.cpp:80] Middle message received (count: 10)
[info] [ping_multi_port.cpp:82] Middle message value1: 19
[info] [ping_multi_port.cpp:83] Middle message value2: 20
[info] [ping_multi_port.cpp:116] Rx message received (count: 10, size: 2)
[info] [ping_multi_port.cpp:118] Rx message value1: 57
[info] [ping_multi_port.cpp:119] Rx message value2: 60
[info] [greedy_scheduler.cpp:372] Scheduler stopped: Some entities are waiting for execution, but there are no periodic or async entities to get out of the deadlock.
[info] [greedy_scheduler.cpp:401] Scheduler finished.
[info] [gxf_executor.cpp:1997] Deactivating Graph...
[info] [gxf_executor.cpp:2005] Graph execution finished.
[info] [gxf_executor.cpp:278] Destroying context
根据您的日志级别,您可能会看到更多或更少的消息。上面的输出是使用 INFO
的默认值生成的。有关如何设置日志级别的更多详细信息,请参阅 日志记录 部分。