Holoscan 应用程序中的 CUDA 流处理
CUDA 提供了流的概念,以允许在 GPU 上进行异步并发执行。每个流都是按顺序执行的命令序列,但在单独流上启动的工作可以潜在地并发操作。示例包括在单独的流中运行多个内核或重叠数据传输和内核执行。请参阅 CUDA 编程指南的异步并发执行章节。
CudaStreamPool
类 (C++
/Python
) 是一种资源,它提供了一种从流池中分配 CUDA 流的机制,该流池的生命周期由 Holoscan 管理。从 Holoscan v2.8 开始,提供了新的 API,使应用程序作者可以更轻松地使用专用 CUDA 流。这些 API 旨在替代下文注释中描述的旧版 CudaStreamHandler
实用程序。
存在一个旧版 CudaStreamHandler
实用程序类(通过 #include "holoscan/utils/cuda_stream_handler.hpp"
提供),它使编写可以使用 CudaStreamPool
的 C++ 运算符成为可能。此类有一些限制
它要求接收类型为
holoscan::gxf::Entity
的消息。它要求使用底层 GXF 库中的
nvidia::gxf::Entity
和nvidia::gxf::Handle
方法。它不适用于原生 Python 运算符。
此现有实用程序仍然提供用于向后兼容性,并且使用它的运算符可以继续与使用新 API 的运算符互操作。但是,我们鼓励运算符作者迁移到使用新的 API。
从 Holoscan v2.9 开始,如果用户没有另外提供,则默认的 CudaStreamPool
将添加到所有运算符。这意味着在大多数情况下,用户无需显式添加流池。默认流池具有无界大小,未设置标志,优先级值为 0。如果用户想要分配具有不同标志或优先级的流,则可以按照以下部分将自定义流池添加到运算符。
仅当应用程序(片段)在没有任何支持 CUDA 的设备的节点上运行时,才不会添加默认流池。在这种情况下,由于无法使用 CUDA,因此不会添加默认流池。
要使运算符能够分配 CUDA 流,用户可以传递 CudaStreamPool
,如以下示例所示。Holoscan SDK 中用于流处理的通用模式是让每个想要使用非默认流的运算符都分配一个 CudaStreamPool
。然后,该运算符将从流池中保留一个专用流,供其启动的任何内核使用。允许多个运算符使用相同的流池,共享池的“max_size”等于至少共享它的运算符的数量。
请注意,CudaStreamPool
将管理 SDK 使用的任何 CUDA 流的生命周期。用户通常不需要显式调用任何 CUDA API 来创建或销毁流。请注意,来自单个 CudaStreamPool
的所有流都在单个设备上(CUDA ID 作为传递给“dev_id”参数)。如果工作流程涉及在单独的 CUDA 设备上运行的运算符,则这些运算符必须使用为相应设备配置的单独流池。
// The code below would appear within `Application::compose` (or `Fragment::compose`)
// Create a stream pool with a 5 streams capacity (5 operators could share the same pool)
const auto cuda_stream_pool = make_resource<CudaStreamPool>("stream_pool",
Arg("dev_id", 0),
Arg("stream_flags", 0u),
Arg("stream_priority", 0),
Arg("reserved_size", 1u),
Arg("max_size", 5u));
auto my_op = make_operator<MyOperator>("my_op", cuda_stream_pool, arg_list);
// Alternatively, the argument can be added via `add_arg` after operator construction
// auto my_op = make_operator<MyOperator>("my_op", arg_list);
// my_op->add_arg(cuda_stream_pool);
请注意,旧版 CudaStreamHandler
实用程序不支持以这种方式传递流池,而是要求用户显式地向运算符的私有数据成员添加参数。
private:
// The legacy CudaStreamHandler required a "cuda_stream_pool" parameter.
// The spec.param call in the Operator's `setup` method would use the name "cuda_stream_pool"
// for it
Parameter<std::shared_ptr<CudaStreamPool>> cuda_stream_pool_{};
为了向后兼容以前的版本,先前使用 CudaStreamHandler
实用程序类的内置运算符仍然提供此显式定义的“cuda_stream_pool”参数。用户无需将其添加到自己的运算符中,除非他们更喜欢在初始化运算符时显式使用名为“cuda_stream_pool”的 Arg
参数。
auto visualizer = make_operator<HolovizOp>(
"visualizer",
from_config("holoviz"),
Arg("cuda_stream_pool", make_resource<CudaStreamPool>(0, 0, 0, 1, 5)));
# The code below would appear within `Application.compose` (or `Fragment.compose`)
# Create a stream pool with a 5 streams capacity (5 operators could share the same pool)
cuda_stream_pool = CudaStreamPool(
self,
name="stream_pool",
dev_id=0,
stream_flags=0,
stream_priority=0,
reserved_size=1,
max_size=5,
)
my_op = MyOperator(self, cuda_stream_pool, name="my_op", **my_kwargs)
# Alternatively, the argument can be added via `add_arg` after operator construction
# auto my_op = MyOperator(self, name="my_op", **my_kwargs)
# my_op.add_arg(cuda_stream_pool)
以上是用户定义的运算符添加 CudaStreamPool
的推荐方法。为了向后兼容性,SDK 的内置运算符已经具有基于关键字的 cuda_stream_pool
参数,它们继续允许像以下示例中那样传递流池
visualizer = HolovizOp(
self,
name="holoviz",
cuda_stream_pool=CudaStreamPool(self, 0, 0, 0, 1, 5),
**self.kwargs("holoviz"))
由于 CUDA 内核由主机 (CPU) 异步启动,因此 compute
方法可能在 GPU 上的底层计算完成之前返回(请参阅下文有关在此场景中进行基准测试的相关警告)。在这种情况下,有关所用流的信息必须与数据一起发送,以便下游运算符可以处理任何需要的流同步。例如,如果上游内核在启动 CUDA 内核后立即发出 Tensor
对象,则下游运算符需要确保内核已完成,然后才能访问张量的数据。
CudaStreamPool
(C++
/Python
) 类在后台分配 nvidia::gxf::CudaStream
对象。这些流对象作为底层 GXF 库的实体组件系统中的组件存在。GXF 定义了一个 nvidia::gxf::CudaStreamId
结构,其中包含与流对应的“组件 ID”。正是这个 CudaStreamId
结构实际上与从输出端口发出的每条消息一起传输。Holoscan 应用程序作者无需直接与 CudaStream
或 CudaStreamId
类交互,而是使用标准的 CUDA 运行时 API cudaStream_t
类型,该类型由下文各节中描述的 Holoscan 公共流处理方法返回。诸如 receive_cuda_stream
(C++
/Python
) 或 allocate_cuda_stream
(C++
/Python
) 之类的方法返回一个 cudaStream_t
,它对应于底层的 CudaStream
对象。类似地,诸如 set_cuda_stream
(C++
/Python
) 和 device_from_stream
(C++
/Python
) 之类的方法将 cudaStream_t
作为输入,但仅接受对应于底层 CudaStream
对象的 cudaStream_t
,这些对象的生命周期可以由 SDK 管理。
SDK 提供了几种公开可访问的方法来处理可以从运算符的 compute
方法调用的流。下面将详细介绍这些方法。
在许多情况下,用户只需要在其 compute
方法中使用 InputContext
提供的 receive_cuda_stream
(C++
/Python
) 方法。这是因为该方法自动管理流处理的多个方面
它自动将命名输入端口上找到的任何流同步到运算符的内部 CUDA 流
首次调用
compute
时,将从分配的CudaStreamPool
中分配运算符的内部 CUDA 流。然后在所有后续compute
调用中重用相同的流。有一个布尔标志也可以强制同步到默认流(默认为 false)
它返回与运算符的内部流对应的
cudaStream_t
。
用户应将此返回的流用于要在非默认流上运行的任何内核或内存复制操作。
它将步骤 2 中返回的流对应的 CUDA 设备设置为活动 CUDA 设备
此方法自动配置所有输出端口,以将步骤 2 中返回的流作为每个发送消息中的组件发出。
此 ID 将允许下游运算符知道此消息中接收到的任何数据使用了哪个流。
请确保对于给定的输入端口,始终在 receive_cuda_stream
之前 调用 receive
。这是必要的,因为 receive
调用实际上接收消息,并允许运算符了解输入端口消息中找到的任何流 ID。receive
方法仅在内部记录有关找到的任何流的信息。后续的 receive_cuda_stream
调用是执行同步并返回输入流同步到的 cudaStream_t
所必需的。
以下是内置 BayerDemosaicOp
中此方法的典型用法示例
// The code below would appear within `Operator::compute`
// Process input message
auto maybe_message = op_input.receive<gxf::Entity>("receiver");
if (!maybe_message || maybe_message.value().is_null()) {
throw std::runtime_error("No message available");
}
auto in_message = maybe_message.value();
// Get the CUDA stream from the input message if present, otherwise generate one.
// This stream will also be transmitted on the "tensor" output port.
cudaStream_t cuda_stream = op_input.receive_cuda_stream("receiver", // input port name
true, // allocate
false); // sync_to_default
// assign the CUDA stream to the NPP stream context
npp_stream_ctx_.hStream = cuda_stream;
请注意,BayerDemosaicOp
是使用 C++ 选项卡中显示的代码在 C++ 中实现的,但这显示了等效代码在 Python API 中的外观。
# The code below would appear within `Operator.compute`
# Process input message
in_message = op_input.receive("receiver")
if in_message is None:
raise RuntimeError("No message available")
# Get the CUDA stream from the input message if present, otherwise generate one.
# This stream will also be transmitted on the "tensor" output port.
cuda_stream_ptr = op_input.receive_cuda_stream("receiver", allocate=True, sync_to_default=False)
# can then use cuda_stream_ptr to create a `cupy.cuda.ExternalStream` context, for example
可以看到,对于“receiver”输入端口,在调用 receive_cuda_stream
之前调用了 receive
,这是必需的。另请注意,与旧版 CudaStreamHandler
实用程序类不同,无需在“receive”调用中使用 gxf::Entity
类型。某些内置运算符(如 BayerDemosaicOp
)使用该类型,以支持 nvidia::gxf::VideoBuffer
类型和常用的 Tensor
类型作为输入。如果仅支持 Tensor
,我们可以使用 receive<std::shared_ptr<Tensor>>
或 receive<TensorMap>
来代替。
receive_cuda_stream
的第二个布尔参数默认为 true,表示运算符应分配自己的内部流。可以将其设置为 false,以不允许运算符从流池中分配自己的内部流。请参阅下面的注释,了解在 receive_cuda_stream
在这种情况下如何运行的详细信息。
还有 receive_cuda_stream
的可选第三个参数,它是一个布尔值,用于指定是否也应执行输入流(和内部流)与 CUDA 默认流的同步。此选项默认为 false
。
当 CudaStreamPool
以上述方式之一传递给运算符时,上述 receive_cuda_stream
的描述是准确的。有关如果运算符由于 CudaStreamPool
不可用而无法分配内部流时此方法如何运行的更多详细信息,请参阅下面的注释。
避免来自 Python 的 CUDA Array Interface 的额外同步
Python 应用程序在 Holoscan 的 Tensor 和第三方张量对象之间转换时,通常使用 CUDA Array Interface。此接口默认执行其自身的显式同步(此处描述)。当使用 receive_cuda_stream
时,这可能是没有必要的,receive_cuda_stream
已经将输入中找到的流与运算符的内部流同步。可以将环境变量 CUPY_CUDA_ARARAY_INTERFACE_SYNC
设置为 0,以在通过数组接口从 holoscan Tensor 创建 CUDA 数组时禁用 CuPy 的额外同步。类似地,可以将 HOLOSCAN_CUDA_ARRAY_INTERFACE_SYNC
设置为 0,以在从第三方张量创建 Holoscan 张量时禁用 Holoscan 端的数组接口同步。
使用 receive_cuda_stream
而没有可用的流池
本节介绍在运算符的 CudaStreamPool
中没有可用流(或 receive_cuda_stream
的 allocate
参数设置为 false)的情况下 receive_cuda_stream
的行为。在这种情况下,receive_cuda_stream
将无法为运算符自身的使用分配专用内部流。相反,将返回与命名输入端口上找到的第一个流对应的 cudaStream_t
,并且该输入端口上的任何其他流都将同步到它。如果在另一个输入端口上进行了后续的 receive_cuda_stream
调用,则在该第二个端口上找到的任何流都将同步到第一个 receive_cuda_stream
调用返回的 cudaStream_t
,并且返回的流是相同的 cudaStream_t
。换句话说,在首次调用 receive_cuda_stream
时找到的第一个流将被重新用作运算符的内部流,任何其他输入流都将同步到该流。相同的流也将是在输出端口上自动发出的流。
如果不存在 CudaStreamPool
并且输入端口没有找到流(或任何先前的 receive_cuda_stream
调用都没有找到另一个端口的流),则 receive_cuda_stream
将返回默认流 (cudaStreamDefault
)。在这种情况下,输出端口上不会发出任何流。
receive_cuda_streams
(C++
/Python
) 方法专为高级用例而设计,在这些用例中,应用程序作者需要手动管理 CUDA 流的所有方面的同步、分配和发出。与 receive_cuda_stream
不同,此方法不执行同步,不自动分配内部 CUDA 流,不更新活动 CUDA 设备,也不配置任何要在输出端口上发出的流。相反,它仅返回 std::vector<std::optional<cudaStream_t>>
,这是一个大小等于输入端口上消息数量的向量。向量中的每个值对应于消息指定的 cudaStream_t
(如果未找到流 ID,则为 std::nullopt
)。
请注意,与 receive_cuda_stream
一样,重要的是,对于端口的任何 receive_cuda_streams
调用都在该同一端口的相应 receive
调用之后。下面给出一个示例
// The code below would appear within `Operator::compute`
// Process a "receivers" port (e.g. one having IOSpec::kAnySize) that may
// have an arbitrary number of connections, each of which may have sent a
// TensorMap.
auto maybe_tensors = op_input.receive<std::vector<Tensor>>("receivers");
if (!maybe_tensors) { throw std::runtime_error("No message available"); }
auto tensormaps = maybe_tensors.value();
// Get a length two vector of std::option<CudaStream_t> containing any streams
// found by the any of the above receive calls.
auto cuda_streams = op_input.receive_cuda_streams("receivers");
# The code below would appear within `Operator.compute`
auto tensors = op_input.receive("receivers")
if tensors is None:
raise RuntimeError("No message available on 'receivers' input")
cuda_stream_ptrs = op_input.receive_cuda_streams("receivers")
allocate_cuda_stream
(C++
/Python
) 方法可用于从运算符的 CudaStreamPool
中分配其他 CUDA 流。如果运算符没有关联的流池,或者流池中的所有流都已在使用中,则将返回 unexpected
(或 Python 中的 None
)。为分配提供用户提供的流名称,以便对于给定的名称,仅在首次调用该方法时才分配新流。然后,在后续使用相同名称的任何调用中重用相同的流。以这种方式分配的流不会自动在输出端口上发出。如果需要这样做,用户必须在调用端口的 emit
之前,通过调用输出端口的 set_cuda_stream
来专门发出流 ID。
// The code below would appear within `Operator::compute`
cudaStream_t my_stream = context.allocate_cuda_stream("my_stream");
// some custom code using the CUDA stream here
// emit the allocated stream on the "out" port
op_output.set_cuda_stream(my_stream, "out");
# The code below would appear within `Operator.compute`
my_stream_ptr = context.allocate_cuda_stream("my_stream")
# some custom code using the CUDA stream here
# emit the allocated stream on the "out" port
op_output.set_cuda_stream(my_stream, "out")
synchronize_streams
(C++
/Python
) 方法采用(可选)cudaStream_t
值的向量,并将所有这些流同步到指定的 target_cuda_stream
。目标流也可以出现在要同步的流向量中(对于向量中与目标流相同的任何元素,将跳过同步)。如果应用程序作者正在使用上述 receive_cuda_stream
API,则通常会处理任何需要的同步,而无需调用此方法。它为手动流处理用例提供。
device_from_stream
(C++
/Python
) 方法采用 cudaStream_t
值,并返回与该流对应的整数 CUDA 设备 ID。此方法仅支持为此类由 Holoscan SDK 管理的流查询设备(即,它仅支持由 receive_cuda_stream
、receive_cuda_streams
或 allocate_cuda_stream
返回的流)。
上面提到 receive_cuda_stream
自动处理输入端口上找到的流的同步。如果流上的工作尚未完成,并且 compute
方法将要执行需要同步的操作(例如设备到主机内存复制),则将花费一些时间等待上游运算符在输入流上启动的工作完成。显式指定给定输入端口上流的工作必须在调度程序执行运算符(调用其 compute
方法)之前完成可能是有益的。
要要求输入流上的工作在运算符准备好调度之前完成,可以将 CudaStreamCondition
(C++
/Python
) 添加到运算符。当消息发送到已分配 CudaStreamCondition
的端口时,此条件会在在此输入端口上找到的 CUDA 流上设置内部主机回调函数。回调函数将在流上的其他工作完成后将运算符的状态设置为 READY。然后,这将允许调度程序执行运算符。
CudaStreamCondition
的一个限制是它仅在输入端口队列中的第一条消息中查找流。它目前不支持处理在同一消息(实体)内或跨队列中多条消息具有多个不同输入流组件的端口。CudaStreamCondition
的行为足以满足 Holoscan 的默认队列大小为 1 的情况以及与 receive_cuda_stream
一起使用的情况,后者仅在上游运算符的传出消息中放置单个 CUDA 流组件。不适用它的情况是
输入端口的队列大小被显式设置为容量大于 1,并且不知道队列中的所有消息是否对应于相同的 CUDA 流。
输入端口是多接收器端口(即
IOSpec::kAnySize
),任何数量的上游运算符都可以连接到该端口。
在输入消息中未找到流的情况下,此条件将允许执行运算符。
用法示例如下
// The code below would appear within `Application::compose` (or `Fragment::compose`)
// assuming the Operator has a port named "in", we can create the condition
auto stream_cond = make_condition<CudaStreamCondition>(name="stream_sync", receiver="in")
// it can then be passed as an argument to `make_operator`
auto my_op = make_operator<ops::MyOperator>("my_op",
stream_cond,
from_config("my_operator"));
)
# The code below would appear within `Application.compose` (or `Fragment.compose`)
# assuming the Operator has a port named "in", we can create the condition
stream_cond = CudaStreamCondition(self, receiver="in", name="stream_sync")
# the condition is then passed as a positional argument to an Operator's constructor
visualizer = MyOperator(
self,
stream_cond,
**my_kwargs,
name="my_op",
)
本节介绍应用程序作者在使用异步启动内核的运算符时可能遇到的几种令人惊讶的行为。如上所述,一旦 CUDA 内核启动,控制权立即返回到主机,并且 compute
方法可能会在 GPU 上的所有工作完成之前退出。这对于应用程序性能是可取的,但也提出了一些应用程序作者应注意的其他注意事项。
诸如内置 {ref}数据流跟踪<holoscan-flow-tracking>
或 {ref}GXF 作业统计<gxf-job-satistics>
度量标准之类的工具会报告运算符在 compute
方法中花费的时间。当实际 GPU 内核在 compute
调用结束后在稍后的时间完成时,这可能会产生误导性的短时间。一个具体的例子是,当上游运算符异步启动 CUDA 内核,然后下游运算符需要执行设备到主机传输(需要同步)时。在这种情况下,下游运算符将需要等待上游运算符启动的内核完成,因此该上游内核的时间将反映在下游运算符的 compute
持续时间中(假设没有使用 CudaStreamCondition
强制上游内核在调用下游 compute
方法之前完成)。
在这种情况下,建议使用 Nsight Systems 进行性能分析,以更详细地了解应用程序计时。Nsight Systems UI 将具有 CUDA 调用的每个流跟踪以及任何调度程序工作线程的单独跟踪,这些跟踪显示运算符 compute
调用的持续时间。
当运算符使用 Allocator
(例如 UnboundedAllocator
、BlockMemoryPool
、RMMAllocator
或 StreamOrderedAllocator
)在每次 compute
调用时动态分配内存时,可能需要比最初估计的更多的内存。例如,如果内核已启动,但 compute
在张量仍在进行计算时返回,则上游运算符可以再次自由调度。如果该上游运算符正在使用 Allocator
,则先前计算调用的内存仍在使用中。因此,运算符需要空间在原始张量之上分配第二个张量。这意味着作者必须设置比他们原本估计的更大的所需字节数(或块数)(例如,多达 2 倍)。