创建分布式应用程序
分布式应用程序指的是工作流程被划分为多个片段,这些片段可能在不同的节点上运行的应用程序。例如,数据可能通过边缘的传感器收集,发送到单独的工作站进行处理,然后处理后的数据可以发送回边缘节点进行可视化。每个节点将运行一个由运算符组成的计算图的单个片段。因此,一个片段相当于一个非分布式应用程序。在分布式上下文中,应用程序初始化不同的片段,然后定义它们之间的连接,以构建完整的分布式应用程序工作流程。
在本节中,我们将介绍
如何定义分布式应用程序类。
如何构建和运行分布式应用程序。
定义分布式应用程序也在 video_replayer_distributed 和 ping_distributed 示例中进行了说明。ping_distributed
示例还说明了如何更新 C++ 或 Python 应用程序以解析用户定义的参数,使其在不中断对分布式应用程序命令行参数(例如,--driver
、--worker
)支持的情况下工作。
定义单个片段(C++
/Python
)涉及使用 make_operator()
(C++
)或运算符构造函数(Python
)添加运算符,并在 compose()
方法中使用 add_flow()
方法(C++
/Python
)定义它们之间的连接。因此,定义片段就像定义非分布式应用程序一样,只是类应该从片段而不是应用程序继承。
然后,应用程序将通过在应用程序的 compose()
方法中初始化片段来定义。add_flow()
方法(C++
/Python
)可用于定义跨片段的连接。
我们定义了从
Fragment
基类继承的Fragment1
和Fragment2
类。我们定义了从
Application
基类继承的App
类。App
类初始化任何使用的片段,并定义它们之间的连接。在这里,我们在连接片段的示例add_flow
调用中使用了虚拟端口和运算符名称,因为此示例中未显示任何特定运算符。我们在
main()
中使用make_application()
函数创建App
类的实例。
#include <holoscan/holoscan.hpp>
class Fragment1 : public holoscan::Fragment {
public:
void compose() override {
// Define Operators and workflow for Fragment1
// ...
}
};
class Fragment2 : public holoscan::Fragment {
public:
void compose() override {
// Define Operators and workflow for Fragment2
// ...
}
};
class App : public holoscan::Application {
public:
void compose() override {
using namespace holoscan;
auto fragment1 = make_fragment<Fragment1>("fragment1");
auto fragment2 = make_fragment<Fragment2>("fragment2");
// Define the workflow: replayer -> holoviz
add_flow(fragment1, fragment2, {{"fragment1_operator_name.output_port_name",
"fragment2_operator_name.input_port_name"}});
}
};
int main() {
auto app = holoscan::make_application<App>();
app->run();
return 0;
}
我们定义了从
Fragment
基类继承的Fragment1
和Fragment2
类。我们定义了从
Application
基类继承的App
类。App
类初始化任何使用的片段,并定义它们之间的连接。在这里,我们在连接片段的示例 add_flow 调用中使用了虚拟端口和运算符名称,因为此示例中未显示任何特定运算符。我们在
__main__
中创建App
类的实例。run()
方法启动应用程序,该应用程序将执行其compose()
方法,在该方法中将定义自定义工作流程。
from holoscan.core import Application, Fragment
class Fragment1(Fragment):
def compose(self):
# Define Operators and workflow
# ...
class Fragment2(Fragment):
def compose(self):
# Define Operators and workflow
# ...
class App(Application):
def compose(self):
fragment1 = Fragment1(self, name="fragment1")
fragment2 = Fragment2(self, name="fragment2")
self.add_flow(fragment1, fragment2, {("fragment1_operator_name.output_port_name",
"fragment2_operator_name.input_port_name")})
def main():
app = App()
app.run()
if __name__ == "__main__":
main()
分布式应用程序的自定义数据类型序列化
多片段应用程序的片段之间的数据传输是通过 统一通信 X (UCX) 库完成的。为了传输数据,必须将其序列化为适合通过网络传输的二进制形式。对于张量(C++
/Python
)、字符串和各种标量和向量数值类型,序列化已经内置。有关如何将数据序列化支持扩展到其他用户定义的类的具体示例的更多详细信息,请参阅关于序列化的单独页面。
构建分布式应用程序的工作方式与非分布式应用程序相同。请参阅构建和运行您的应用程序
Python 应用程序不需要构建。请参阅构建和运行您的应用程序。
在分布式环境中运行应用程序需要在分布式应用程序中涉及的所有节点上启动应用程序二进制文件。必须选择单个节点充当应用程序驱动程序。这可以通过使用 --driver
命令行选项来实现。工作节点通过使用 --worker
命令行选项启动应用程序来启动。如果同时指定了这两个选项,驱动程序节点也可以充当工作程序。
必须为每个进程(驱动程序和工作程序)指定驱动程序节点的地址,以识别用于通信的适当网络接口。这可以通过 --address
命令行选项完成,该选项采用 [<IPv4/IPv6 地址或主机名>][:<端口>]
形式的值(例如,--address 192.168.50.68:10000
)
在多个节点上运行分布式应用程序时,必须为每个进程(驱动程序和工作程序)设置驱动程序的 IP(或主机名)(默认值:
0.0.0.0
)。可以不设置端口(例如,--address 192.168.50.68
)。在单节点应用程序中,可以省略驱动程序的 IP(或主机名),允许 UCX 库选择任何网络接口(
0.0.0.0
)。端口始终是可选的(默认值:
8765
)。可以不设置 IP(例如,--address :10000
)。
可以使用 --worker-address
命令行选项([<IPv4/IPv6 地址或主机名>][:<端口>]
)定义工作节点地址。如果未指定,应用程序工作程序将默认为主机地址(0.0.0.0
),端口号在 10000
和 32767
之间随机选择,且当前未使用。如果工作程序地址是本地 IP 地址,则此参数会自动设置 HOLOSCAN_UCX_SOURCE_ADDRESS
环境变量。有关详细信息,请参阅本节。
--fragments
命令行选项与 --worker
结合使用,以指定要由工作程序运行的片段名称的逗号分隔列表。如果未指定,应用程序驱动程序将为工作程序分配单个片段。要指示工作程序应运行所有片段,您可以指定 --fragments all
。
--config
命令行选项可用于指定应用程序要使用的配置文件的路径。
以下是在两个单独的节点上启动名为 my_app
的三片段应用程序的示例
应用程序驱动程序在第一个节点 (A) 上以
192.168.50.68:10000
启动,工作程序运行两个片段“fragment1”和“fragment3”。在单独的节点 (B) 上,应用程序为“fragment2”启动工作程序,该工作程序将连接到上述地址的驱动程序。
# Node A
my_app --driver --worker --address 192.168.50.68:10000 --fragments fragment1,fragment3
# Node B
my_app --worker --address 192.168.50.68:10000 --fragments fragment2
# Node A
python3 my_app.py --driver --worker --address 192.168.50.68:10000 --fragments fragment1,fragment3
# Node B
python3 my_app.py --worker --address 192.168.50.68:10000 --fragments fragment2
UCX 网络接口选择
UCX 在 Holoscan SDK 中用于分布式应用程序中跨片段的通信。它旨在根据性能特征(带宽、延迟、NUMA 局部性等)选择最佳网络设备。在某些情况下(正在调查中),UCX 找不到要使用的正确网络接口,并且应用程序无法运行。在这种情况下,您可以手动指定要使用的网络接口,方法是设置 UCX_NET_DEVICES
环境变量。
例如,如果用户想要使用网络接口 eth0
,您可以在运行应用程序之前按如下方式设置环境变量
export UCX_NET_DEVICES=eth0
或者,如果您正在使用 Holoscan CLI 运行打包的分布式应用程序,请使用 --nic eth0
选项手动指定要使用的网络接口。
可以通过运行以下命令找到可用的网络接口名称
ucx_info -d | grep Device: | awk '{print $3}' | sort | uniq
# or
ip -o -4 addr show | awk '{print $2, $4}' # to show interface name and IP
已知限制
以下是 SDK 中分布式应用程序支持的已知限制,其中一些将在未来的更新中解决
1. 即使分布式应用程序运行正确,也会显示连接错误消息。
即使应用程序运行正常,控制台中也会出现消息 Connection dropped with status -25 (Connection reset by remote peer)
。这是一个已知问题,将在未来的更新中解决,确保此消息仅在实际连接错误事件中显示。它目前在某些片段完成其工作并开始关闭时打印一次。从这些片段到保持打开状态的片段的任何连接都会在该点断开连接,从而导致记录的消息。
2. GPU 张量目前只能由 UCX 从给定节点上的单个设备发送/接收。
默认情况下,设备 ID 0 由 UCX 扩展用于在片段之间发送/接收数据。要覆盖此默认值,用户可以设置环境变量 HOLOSCAN_UCX_DEVICE_ID
。
3. 运行状况检查服务默认情况下已关闭
分布式应用程序中的运行状况检查器服务默认情况下已关闭。但是,可以通过将环境变量 HOLOSCAN_ENABLE_HEALTH_CHECK
设置为 true
(可以使用 1
和 on,
不区分大小写)来启用它。如果未设置环境变量或环境变量无效,则使用默认值(禁用)。
4. NVIDIA IGX Orin 开发者套件不支持管理端口的使用。
IGX 设备配有两个以太网端口,在 NVIDIA IGX Orin 用户指南中分别标记为端口 #4 和 #5。要在这些设备上运行分布式应用程序,用户必须确保以太网端口 #4 用于连接驱动程序和工作程序。
GXF UCX 扩展
Holoscan 的分布式应用程序功能利用了 GXF UCX 扩展。其文档可能提供有关数据如何在片段之间传输的有用补充信息。
给定一个 CMake 项目、预构建的可执行文件或 Python 应用程序,您还可以使用 Holoscan CLI 来打包和运行 Holoscan 应用程序在符合 OCI 标准的容器映像中。
分布式应用程序的环境变量
Holoscan SDK 环境变量
您可以设置环境变量来修改服务和调度程序在执行分布式应用程序时的默认操作。
HOLOSCAN_ENABLE_HEALTH_CHECK:确定运行状况检查服务是否应为分布式应用程序处于活动状态。接受诸如“true”、“1”或“on”(不区分大小写)之类的 true 值,以启用运行状况检查。如果未指定,则默认为“false”。启用后,gRPC 运行状况检查服务将被激活,允许诸如 grpc-health-probe 之类的工具监视活动性和就绪状态。此环境变量仅在分布式应用程序使用
--driver
或--worker
选项启动时使用。分布式应用程序中的运行状况检查服务与 App Driver(默认值:8765
)和/或 App Worker 在同一端口上运行。HOLOSCAN_DISTRIBUTED_APP_SCHEDULER:控制分布式应用程序使用哪个调度程序。可以将其设置为
greedy
、multi_thread
或event_based
。multithread
也被允许作为multi_thread
的同义词,以实现向后兼容性。如果未指定,则默认调度程序为multi_thread
。HOLOSCAN_STOP_ON_DEADLOCK:可以与
HOLOSCAN_DISTRIBUTED_APP_SCHEDULER
结合使用,以控制应用程序是否会在死锁时自动停止。“True”、“1”或“ON”值将被解释为 true(启用死锁时停止)。如果未指定,则为“true”。此环境变量仅在显式设置HOLOSCAN_DISTRIBUTED_APP_SCHEDULER
时使用。HOLOSCAN_STOP_ON_DEADLOCK_TIMEOUT:控制在认为应用程序处于死锁状态之前所需的无活动延迟(以毫秒为单位)。它必须是整数值(单位为毫秒)。
HOLOSCAN_MAX_DURATION_MS:设置应用程序在请求的最大持续时间(以毫秒为单位)过去后自动终止。它必须是整数值(单位为毫秒)。此环境变量仅在显式设置
HOLOSCAN_DISTRIBUTED_APP_SCHEDULER
时使用。HOLOSCAN_CHECK_RECESSION_PERIOD_MS:控制调度程序在重新检查应用程序中运算符的状态之前等待多长时间(以毫秒为单位)。它必须是浮点值(单位为毫秒)。此环境变量仅在显式设置
HOLOSCAN_DISTRIBUTED_APP_SCHEDULER
时使用。HOLOSCAN_UCX_SERIALIZATION_BUFFER_SIZE:可用于覆盖默认的 7 kB 序列化缓冲区大小。这通常不需要,因为张量类型在此缓冲区中仅存储一个小的标头,以避免显式复制其数据。但是,其他数据类型会直接复制到序列化缓冲区,在某些情况下,可能需要增加它。
HOLOSCAN_UCX_ASYNCHRONOUS:如果设置为 true,则启用片段之间 UCX 消息的异步传输(这是默认设置)。将其设置为 False 会强制同步传输。同步模式使使用诸如
BlockMemoryPool
之类的分配器更容易,因为其他张量在发送先前的张量之前不会排队。HOLOSCAN_UCX_DEVICE_ID:分布式应用程序中 UCX 发射器/接收器将使用的设备的 GPU ID。如果未指定,则默认为 0。可以使用
nvidia-smi -L
获取系统中可用的离散 GPU 列表。在分布式应用程序的片段之间发送的 GPU 数据必须在此设备上。HOLOSCAN_UCX_PORTS:当需要预先确定 UCX 通信的特定端口(例如在 Kubernetes 环境中)时,这将定义 SDK 的首选端口号。如果分布式应用程序需要三个端口(UCX 接收器)并且未设置环境变量,则 SDK 将从 10000~32767 范围中顺序选择三个未使用的端口。指定一个值,例如,
HOLOSCAN_UCX_PORTS=10000
,将导致选择端口 10000、10001 和 10002。多个起始值可以用逗号分隔。如果需要更多端口,系统将从最后一个提供的端口开始递增。任何未使用的指定端口都将被忽略。HOLOSCAN_UCX_SOURCE_ADDRESS:此环境变量指定 UCX 连接的本地 IP 地址(源)。当节点具有多个网络接口时,此变量尤其有用,使用户能够确定应使用哪个网络接口来建立 UCX 客户端 (UcxTransmitter)。如果未显式指定,则默认地址设置为
0.0.0.0
,表示任何可用接口。
UCX 特定的环境变量
多片段应用程序的片段之间的数据传输是通过 统一通信 X (UCX) 库完成的,这是一个点对点通信框架,旨在利用最佳可用硬件资源(共享内存、TCP、GPUDirect RDMA 等)。UCX 有许多参数可以通过环境变量控制。下面列出了一些与 Holoscan SDK 分布式应用程序特别相关的参数
UCX_TLS
环境变量可用于控制启用哪些传输层。默认情况下,UCX_TLS=all
,UCX 将尝试自动选择最佳传输层。UCX_NET_DEVICES
环境变量默认设置为all
,这意味着 UCX 可以选择使用任何可用的网络接口控制器 (NIC)。在某些情况下,可能需要将 UCX 限制为特定设备或一组设备,这可以通过将UCX_NET_DEVICES
设置为设备名称的逗号分隔列表来完成(即,通过 Linux 命令ifconfig -a
或ip link show
获取)。建议设置
UCX_TCP_CM_REUSEADDR=y
以便在套接字关闭后无需等待完整的套接字 TIME_WAIT 期间即可重用端口。UCX_LOG_LEVEL
环境变量可用于控制 UCX 的日志记录级别。默认设置为 WARN,但更改为较低级别(例如 INFO)将提供有关正在使用的传输和设备的更详细的输出。默认情况下,Holoscan SDK 将在应用程序启动时自动设置
UCX_PROTO_ENABLE=y
以启用较新的“v2”UCX 协议。如果由于某种原因需要较旧的 v1 协议,则可以在环境中设置UCX_PROTO_ENABLE=n
以覆盖此设置。启用 v2 协议后,可以选择设置UCX_PROTO_INFO=y
以启用有关运行时正在使用的协议的详细日志记录。默认情况下,Holoscan SDK 将在应用程序启动时自动设置
UCX_MEMTYPE_CACHE=n
以禁用 UCX 内存类型缓存(有关更多信息,请参阅 UCX 文档。它可能会导致大约 0.2 微秒的指针类型检查开销,使用 cudacudaPointerGetAttributes() CUDA API)。如果由于某种原因需要内存类型缓存,则可以在环境中设置UCX_MEMTYPE_CACHE=y
以覆盖此设置。默认情况下,Holoscan SDK 将在应用程序启动时自动设置
UCX_CM_USE_ALL_DEVICES=n
,以禁用考虑所有设备进行数据传输。如果由于某种原因需要相反的行为,则可以在环境中设置UCX_CM_USE_ALL_DEVICES=y
以覆盖此设置。设置UCX_CM_USE_ALL_DEVICES=n
可用于解决一个问题,即 UCX 有时默认使用可能不是最适合基于主机可用设备进行数据传输的设备。例如,在地址为 10.111.66.60 的主机上,与eno2
(10.111.66.60) 相比,UCX 可能会选择br-80572179a31d
(192.168.49.1) 设备,因为其带宽更优越。使用UCX_CM_USE_ALL_DEVICES=n
,UCX 将通过对数据传输使用最初用于建立连接的同一设备来确保一致性。这确保了更可预测的行为,并可以避免数据传输过程中设备不匹配可能导致的问题。设置
UCX_TCP_PORT_RANGE=<start>-<end>
可用于定义 UCX 应用于数据传输的特定端口范围。这在需要预先确定端口的环境中尤其有用,例如在 Kubernetes 设置中。在这种情况下,Pod 通常具有需要公开的端口,并且这些端口必须提前指定。此外,在防火墙配置严格且仅允许指定端口的情况下,具有预定的范围可确保 UCX 通信不会被阻止。这补充了HOLOSCAN_UCX_SOURCE_ADDRESS
,后者指定 UCX 连接的本地 IP 地址,通过进一步控制应使用该指定地址上的哪些端口。通过设置端口范围,用户可以确保 UCX 在其基础架构的网络和安全策略范围内运行。
可以通过从 Holoscan SDK 容器运行 ucx_info -f
来获取所有可用 UCX 环境变量的列表以及每个变量的简要说明。Holoscan SDK 使用 UCX 的主动消息 (AM) 协议,因此环境变量与诸如 tag-mat 之类的其他协议相关。
分布式应用程序必须序列化要在多片段应用程序的片段之间发送的任何对象。序列化涉及二进制序列化到缓冲区,该缓冲区将通过统一通信 X (UCX) 库从一个片段发送到另一个片段。对于张量类型(例如,holoscan::Tensor),不进行实际复制,而是直接从原始张量的数据进行传输,并且仅将少量标头信息复制到序列化缓冲区。
下表给出了预先注册了编解码器的类型表,以便可以使用 Holoscan SDK 在片段之间序列化它们。
类型类 | 特定类型 |
---|---|
整数 | int8_t、int16_t、int32_t、int64_t、uint8_t、uint16_t、uint32_t、uint64_t |
浮点 | float、double、complex |
布尔值 | bool |
字符串 | std::string |
std::vector |
T 是 std::string 或上述任何布尔值、整数或浮点类型 |
std::vector |
T 是 std::string 或上述任何布尔值、整数或浮点类型 |
std::vector |
特定于 HolovizOp 的 InputSpec 对象向量 |
std::shared_ptr |
T 是上述任何标量、向量或 std::string 类型 |
张量类型 | holoscan::Tensor、nvidia::gxf::Tensor、nvidia::gxf::VideoBuffer、nvidia::gxf::AudioBuffer |
GXF 特定类型 | nvidia::gxf::TimeStamp、nvidia::gxf::EndOfStream |
如果要在分布式应用程序中使用传输 CPU 和 GPU 张量的运算符,则同一输出端口不能混合使用 GPU 和 CPU 张量。CPU 和 GPU 张量输出应放置在单独的输出端口上。这是用于运算符之间零拷贝张量序列化的底层 UCX 库的限制。
作为一个具体的示例,假设一个运算符 MyOperator
,其设置方法中定义了一个名为“out”的单个输出端口。如果输出端口仅连接到片段内的其他运算符,而不是跨片段连接,则可以有一个 TensorMap
,其中单个端口上混合了主机和设备数组。
void MyOperator::setup(OperatorSpec& spec) {
spec.output<holoscan::TensorMap>("out");
}
void MyOperator::compute(OperatorSpec& spec) {
// omitted: some computation resulting in multiple holoscan::Tensors
// (two on CPU ("cpu_coords_tensor" and "cpu_metric_tensor") and one on device ("gpu_tensor").
TensorMap out_message;
// insert all tensors in one TensorMap (mixing CPU and GPU tensors is okay when ports only connect within a Fragment)
out_message.insert({"coordinates", cpu_coords_tensor});
out_message.insert({"metrics", cpu_metric_tensor});
out_message.insert({"mask", gpu_tensor});
op_output.emit(out_message, "out");
}
class MyOperator:
def setup(self, spec: OperatorSpec):
spec.output("out")
def compute(self, op_input, op_output, context):
# Omitted: assume some computation resulting in three holoscan::Tensor or tensor-like
# objects. Two on CPU ("cpu_coords_tensor" and "cpu_metric_tensor") and one on device
# ("gpu_tensor").
# mixing CPU and GPU tensors in a single dict is okay only for within-Fragment connections
op_output.emit(
dict(
coordinates=cpu_coords_tensor,
metrics=cpu_metrics_tensor,
mask=gpu_tensor,
),
"out"
)
但是,在单个端口上混合 CPU 和 GPU 数组不适用于分布式应用程序,如果运算符需要在片段之间通信,则应使用单独的端口。
void MyOperator::setup(OperatorSpec& spec) {
spec.output<holoscan::TensorMap>("out_host");
spec.output<holoscan::TensorMap>("out_device");
}
void MyOperator::compute(OperatorSpec& spec) {
// some computation resulting in a pair of holoscan::Tensor, one on CPU ("cpu_tensor") and one on device ("gpu_tensor").
TensorMap out_message_host;
TensorMap out_message_device;
// put all CPU tensors on one port
out_message_host.insert({"coordinates", cpu_coordinates_tensor});
out_message_host.insert({"metrics", cpu_metrics_tensor});
op_output.emit(out_message_host, "out_host");
// put all GPU tensors on another
out_message_device.insert({"mask", gpu_tensor});
op_output.emit(out_message_device, "out_device");
}
class MyOperator:
def setup(self, spec: OperatorSpec):
spec.output("out_host")
spec.output("out_device")
def compute(self, op_input, op_output, context):
# Omitted: assume some computation resulting in three holoscan::Tensor or tensor-like
# objects. Two on CPU ("cpu_coords_tensor" and "cpu_metric_tensor") and one on device
# ("gpu_tensor").
# split CPU and GPU tensors across ports for compatibility with inter-fragment communication
op_output.emit(
dict(coordinates=cpu_coords_tensor, metrics=cpu_metrics_tensor),
"out_host"
)
op_output.emit(dict(mask=gpu_tensor), "out_device")
Python
对于 Python API,任何支持 DLPack 接口、__array_interface__
或 __cuda_array_interface__
的类似数组的对象将使用 Tensor
序列化进行传输。这样做是为了避免数据复制以提高性能。类型为 list[holoscan.HolovizOp.InputSpec]
的对象将使用底层 C++ 序列化器用于 std::vector<HolovizOp::InputSpec>
进行发送。所有其他 Python 对象将使用 cloudpickle 库序列化为/从 std::string
。
使用 cloudpickle 施加的限制是,分布式应用程序中的所有片段必须运行相同的 Python 版本。
当调用 op_output.emit()
发射类似张量的 Python 对象时,分布式应用程序的行为与单片段应用程序的行为有所不同。具体而言,对于类似数组的对象(例如 PyTorch 张量),下游 Python 算子中任何调用 op_input.receive()
的操作都不会接收到相同的 Python 对象(即使上游和下游算子属于同一片段)。类型为 holoscan.Tensor
的对象将被接收为 holoscan.Tensor
。任何其他数据存储在设备 (GPU) 上的类数组对象都将作为 CuPy 张量接收。同样,任何数据存储在主机 (CPU) 上的类数组对象都将作为 NumPy 数组接收。如果需要,用户必须转换回原始的类数组类型(通常可以通过 DLPack 或数组接口以零拷贝方式实现)。
C++
对于需要在分布式应用程序中的片段之间进行传输的任何其他 C++ 类,用户必须创建自己的编解码器,并将其注册到 Holoscan SDK 框架中。作为一个具体的例子,假设我们有以下简单的 Coordinate 类,我们希望在片段之间发送它。
struct Coordinate {
float x;
float y;
float z;
};
要创建一个能够序列化和反序列化此类型的编解码器,应该为其定义一个 holoscan::codec
类,如下所示。
#include "holoscan/core/codec_registry.hpp"
#include "holoscan/core/errors.hpp"
#include "holoscan/core/expected.hpp"
namespace holoscan {
template <>
struct codec<Coordinate> {
static expected<size_t, RuntimeError> serialize(const Coordinate& value, Endpoint* endpoint) {
return serialize_trivial_type<Coordinate>(value, endpoint);
}
static expected<Coordinate, RuntimeError> deserialize(Endpoint* endpoint) {
return deserialize_trivial_type<Coordinate>(endpoint);
}
};
} // namespace holoscan
在此示例中,serialize
的第一个参数是对要序列化的类型的常量引用,返回值是一个 expected
,其中包含序列化的字节数。deserialize
方法返回一个 expected
,其中包含反序列化的对象。Endpoint
类是一个基类,表示序列化端点(对于分布式应用程序,实际使用的端点类是 UcxSerializationBuffer
)。
辅助函数 serialize_trivial_type
(deserialize_trivial_type
)可用于序列化(反序列化)任何普通旧数据 (POD) 类型。具体来说,POD 类型可以通过简单地将 sizeof(Type)
字节复制到/从端点进行序列化。read_trivial_type()
和 ~holoscan::Endpoint::write_trivial_type
方法也可以直接使用。
template <>
struct codec<Coordinate> {
static expected<size_t, RuntimeError> serialize(const Coordinate& value, Endpoint* endpoint) {
return endpoint->write_trivial_type(&value);
}
static expected<Coordinate, RuntimeError> deserialize(Endpoint* endpoint) {
Coordinate encoded;
auto maybe_value = endpoint->read_trivial_type(&encoded);
if (!maybe_value) { return forward_error(maybe_value); }
return encoded;
}
};
在实践中,实际上根本不需要定义 codec<Coordinate>
,因为 Coordinate
是一个可平凡序列化的类型,并且现有的 codec
将任何没有模板特化的类型都视为可平凡序列化的类型。然而,仍然需要将编解码器类型注册到 CodecRegistry
中,如下所述。
对于非平凡类型,可能还需要使用 read()
和 write()
方法来实现编解码器。有关内置编解码器使用这些方法的示例,请参见 holoscan/core/codecs.hpp
。
一旦定义了这样的编解码器,剩下的步骤就是将其注册到静态 CodecRegistry
类中。这将使分布式应用程序使用的基于 UCX 的类知道存在用于序列化此对象类型的编解码器。如果该类型特定于某个算子,则可以通过 register_codec()
类进行注册。
#include "holoscan/core/codec_registry.hpp"
namespace holoscan::ops {
void MyCoordinateOperator::initialize() {
register_codec<Coordinate>("Coordinate");
// ...
// parent class initialize() call must be after the argument additions above
Operator::initialize();
}
} // namespace holoscan::ops
在这里,提供给 register_codec
的参数是注册表将用于编解码器的名称。此名称将被序列化到消息头中,以便反序列化器知道在接收到的数据上使用哪个反序列化函数。在本例中,我们选择了一个与类名匹配的名称,但这并非必要条件。如果该名称与 CodecRegistry
类中已存在的名称匹配,则该名称下的任何现有编解码器都将被新注册的编解码器替换。
也可以在 initialize()
的上下文之外直接注册类型,方法是直接检索编解码器注册表的静态实例,如下所示。
namespace holoscan {
CodecRegistry::get_instance().add_codec<Coordinate>("Coordinate");
} // namespace holoscan
CLI 参数(例如 --driver
、--worker
、--fragments
)由 Application
(C++
/Python
) 类解析,其余参数可作为 app.argv
(C++
/Python
) 使用。
关于 用户自定义命令行参数 部分涵盖了在 ping_distributed.cpp 示例中使用 app->argv()
的具体示例。
如果您想在创建 C++
实例之前访问参数,可以通过 holoscan::Application().argv()
访问它们。
以下示例展示了如何在应用程序中访问参数。
#include <holoscan/holoscan.hpp>
class MyPingApp : public holoscan::Application {
// ...
};
int main(int argc, char** argv) {
auto my_argv =
holoscan::Application({"myapp", "--driver", "my_arg1", "--address=10.0.0.1"}).argv();
HOLOSCAN_LOG_INFO(" my_argv: {}", fmt::join(my_argv, " "));
HOLOSCAN_LOG_INFO(
" argv: {} (argc: {}) ",
fmt::join(std::vector<std::string>(argv, argv + argc), " "),
argc);
auto app_argv = holoscan::Application().argv(); // do not use reference ('auto&') here (lifetime issue)
HOLOSCAN_LOG_INFO("app_argv: {} (size: {})", fmt::join(app_argv, " "), app_argv.size());
auto app = holoscan::make_application<MyPingApp>();
HOLOSCAN_LOG_INFO("app->argv() == app_argv: {}", app->argv() == app_argv);
app->run();
return 0;
}
// $ ./myapp --driver --input image.dat --address 10.0.0.20
// my_argv: myapp my_arg1
// argv: ./myapp --driver --input image.dat --address 10.0.0.20 (argc: 6)
// app_argv: ./myapp --input image.dat (size: 3)
// app->argv() == app_argv: true
请参阅 Holoscan SDK 存储库中 Application 单元测试 中的其他示例。
关于 用户自定义命令行参数 部分涵盖了在 ping_distributed.py 示例中使用 app.argv
的具体示例。
如果您想在创建 Python
实例之前访问参数,可以通过 Application().argv
访问它们。
以下示例展示了如何在应用程序中访问参数。
import argparse
import sys
from holoscan.core import Application
class MyApp(Application):
def compose(self):
pass
def main():
app = MyApp() # or alternatively, MyApp([sys.executable, *sys.argv])
app.run()
if __name__ == "__main__":
print("sys.argv:", sys.argv)
print("Application().argv:", app.argv)
parser = argparse.ArgumentParser()
parser.add_argument("--input")
args = parser.parse_args(app.argv[1:])
print("args:", args)
main()
# $ python cli_test.py --address 10.0.0.20 --input image.dat
# sys.argv: ['cli_test.py', '--address', '10.0.0.20', '--input', 'image.dat']
# Application().argv: ['cli_test.py', '--input', 'image.dat']
# args: Namespace(input='a')
>>> from holoscan.core import Application
>>> import sys
>>> Application().argv == sys.argv
True
>>> Application([]).argv == sys.argv
True
>>> Application([sys.executable, *sys.argv]).argv == sys.argv
True
>>> Application(["python3", "myapp.py", "--driver", "my_arg1", "--address=10.0.0.1"]).argv
['myapp.py', 'my_arg1']
请参阅 Holoscan SDK 存储库中 Application 单元测试(TestApplication 类) 中的其他示例。
添加用户自定义命令行参数
当向应用程序添加用户自定义命令行参数时,应避免使用任何默认命令行参数名称,如 --help
、--version
、--config
、--driver
、--worker
、--address
、--worker-address
、--fragments
,正如 运行分布式应用程序 部分所述。建议从应用程序的 argv
((C++
/Python
)) 方法/属性中解析用户自定义参数,如上文所述,而不是直接使用 C++ char* argv[]
或 Python sys.argv
。这样,只需要解析新的用户自定义参数。
在现有的 ping_distributed 示例中,可以看到 C++ 和 Python 的具体示例,其中除了默认的应用程序参数集之外,还指定了一个应用程序定义的布尔参数 (--gpu
)。
int main() {
auto app = holoscan::make_application<App>();
// Parse args
bool tensor_on_gpu = false;
auto& args = app->argv();
if (std::find(args.begin(), args.end(), "--gpu") != std::end(args)) { tensor_on_gpu = true; }
// configure tensor on host vs. GPU
app->gpu_tensor(tensor_on_gpu);
// run the application
app->run();
return 0;
}
def main(on_gpu=False):
app = MyPingApp()
tensor_str = "GPU" if on_gpu else "host"
print(f"Configuring application to use{tensor_str}tensors")
app.gpu_tensor = on_gpu
app.run()
if __name__ == "__main__":
# get the Application's arguments
app_argv = Application().argv
parser = ArgumentParser(description="Distributed ping application.")
parser.add_argument(
"--gpu",
action="store_true",
help="Use a GPU tensor instead of a host tensor",
)
# pass app_argv[1:] to parse_args (app_argv[0] is the path of the application)
args = parser.parse_args(app_argv[1:])
main(on_gpu=args.gpu)
对于 Python,app.argv[1:]
可以与 Python argparse 模块中的 ArgumentParser
一起使用。
或者,可能更倾向于使用 parser.parse_known_args()
,以允许任何用户解析器未定义的参数传递到应用程序类本身。如果在构造 ArgumentParser
时也设置了 add_help=False
,则可以打印解析器的帮助信息,同时仍然保留默认的应用程序帮助信息(涵盖默认的分布式应用程序参数集)。以下代码块显示了这种风格的示例。
parser = ArgumentParser(description="Distributed ping application.", add_help=False)
parser.add_argument(
"--gpu",
action="store_true",
help="Use a GPU tensor instead of a host tensor",
)
# use parse_known_args to ignore other CLI arguments that may be used by Application
args, remaining = parser.parse_known_args()
# can print the parser's help here prior to the Application's help output
if "-h" in remaining or "--help" in remaining:
print("Additional arguments supported by this application:")
print(textwrap.indent(parser.format_help(), " "))
main(on_gpu=args.gpu)