NVIDIA Holoscan SDK v2.9.0

创建应用程序

在本节中,我们将讨论

注意

本节介绍作为单个片段运行的应用程序的基础知识。对于多片段应用程序,请参阅分布式应用程序文档

以下代码片段显示了一个 Application 代码框架示例

  • 我们定义了从 Application 基类继承的 App 类。

  • 我们在 main() 中使用 make_application() 函数创建 App 类的实例。

  • run() 方法启动应用程序,这将执行其 compose() 方法,在其中定义自定义工作流程。

复制
已复制!
            

#include <holoscan/holoscan.hpp> class App : public holoscan::Application { public: void compose() override { // Define Operators and workflow // ... } }; int main() { auto app = holoscan::make_application<App>(); app->run(); return 0; }

  • 我们定义了从 Application 基类继承的 App 类。

  • 我们在从 __main__ 调用的 main() 函数中创建 App 类的实例。

  • run() 方法启动应用程序,这将执行其 compose() 方法,在其中定义自定义工作流程。

复制
已复制!
            

from holoscan.core import Application class App(Application): def compose(self): # Define Operators and workflow # ... def main(): app = App() app.run() if __name__ == "__main__": main()

注意

建议从单独的 main() 函数中调用 run(),而不是直接从 __main__ 中调用。这将确保在 Python 进程退出之前调用 Application 的析构函数。

提示

这也在 hello_world 示例中进行了说明。


也可以异步启动应用程序(即,对于启动应用程序的线程是非阻塞的),如下所示

这可以通过简单地将对 run() 的调用替换为 run_async() 来完成,后者返回 std::futurefuture.get() 的调用将阻塞,直到应用程序完成运行,并且如果在执行期间发生运行时错误,则会抛出异常。

复制
已复制!
            

int main() { auto app = holoscan::make_application<App>(); auto future = app->run_async(); future.get(); return 0; }

这可以通过简单地将对 run() 的调用替换为 run_async() 来完成,后者返回 Python concurrent.futures.Futurefuture.result() 的调用将阻塞,直到应用程序完成运行,并且如果在执行期间发生运行时错误,则会引发异常。

复制
已复制!
            

def main(): app = App() future = app.run_async() future.result() if __name__ == "__main__": main()

提示

这也在 ping_simple_run_async 示例中进行了说明。

可以在不同级别配置应用程序

  1. 提供需要加载的 GXF 扩展(当使用 GXF 算子时)。

  2. 配置应用程序的参数,包括:a. 工作流程中算子的参数。b. 应用程序的调度器

  3. 部署到生产环境时配置一些运行时属性

以下部分将介绍如何配置它们中的每一个,首先介绍对基于 YAML 的配置的本机支持,以方便使用。

YAML 配置支持

Holoscan 支持在运行时从 YAML 配置文件加载任意参数,从而方便配置上面列出的每个项,或者您希望在现有 API 之上添加的其他自定义参数。对于 C++ 应用程序,它还提供了在无需重新编译的情况下更改应用程序行为的能力。

注意

YAML 实用程序的使用是可选的。配置可以硬编码在您的程序中,或者使用您选择的任何解析器完成。

这是一个 YAML 配置示例

复制
已复制!
            

string_param: "test" float_param: 0.50 bool_param: true dict_param: key_1: value_1 key_2: value_2

可以使用以下两种方法来提取这些参数

  • config() 方法接受 YAML 配置文件的路径。如果输入路径是相对路径,则它将相对于当前工作目录。

  • from_config() 方法返回 YAML 文件中给定键的 ArgList 对象。它保存一个 Arg 对象列表,每个对象都包含一个名称(键)和一个值。

    • 如果 ArgList 对象只有一个 Arg(当键指向标量项时),则可以使用 as() 方法将类型作为参数传递,将其转换为所需的类型。

    • 键可以是点分隔的字符串,用于访问嵌套字段。

  • config_keys() 方法返回可通过 from_config() 访问的键名称的无序集合。

复制
已复制!
            

// Pass configuration file auto app = holoscan::make_application<App>(); app->config("path/to/app_config.yaml"); // Scalars auto string_param = app->from_config("string_param").as<std::string>(); auto float_param = app->from_config("float_param").as<float>(); auto bool_param = app->from_config("bool_param").as<bool>(); // Dict auto dict_param = app->from_config("dict_param"); auto dict_nested_param = app->from_config("dict_param.key_1").as<std::string>(); // Print std::cout << "string_param: " << string_param << std::endl; std::cout << "float_param: " << float_param << std::endl; std::cout << "bool_param: " << bool_param << std::endl; std::cout << "dict_param:\n" << dict_param.description() << std::endl; std::cout << "dict_param['key1']: " << dict_nested_param << std::endl; // // Output // string_param: test // float_param: 0.5 // bool_param: 1 // dict_param: // name: arglist // args: // - name: key_1 // type: YAML::Node // value: value_1 // - name: key_2 // type: YAML::Node // value: value_2 // dict_param['key1']: value_1

  • config() 方法接受 YAML 配置文件的路径。如果输入路径是相对路径,则它将相对于当前工作目录。

  • kwargs() 方法返回 YAML 文件中给定键的常规 Python 字典。

    • 高级:此方法包装了 from_config() 方法,类似于 C++ 等效方法,如果键指向映射项,则返回 ArgList 对象;如果键指向标量项,则返回 Arg 对象。Arg 对象可以强制转换为所需的类型(例如,str(app.from_config("string_param")))。

  • config_keys() 方法返回可通过 from_config() 访问的键名称的集合。

复制
已复制!
            

# Pass configuration file app = App() app.config("path/to/app_config.yaml") # Scalars string_param = app.kwargs("string_param")["string_param"] float_param = app.kwargs("float_param")["float_param"] bool_param = app.kwargs("bool_param")["bool_param"] # Dict dict_param = app.kwargs("dict_param") dict_nested_param = dict_param["key_1"] # Print print(f"string_param:{string_param}") print(f"float_param:{float_param}") print(f"bool_param:{bool_param}") print(f"dict_param:{dict_param}") print(f"dict_param['key_1']:{dict_nested_param}") # # Output: # string_param: test # float_param: 0.5 # bool_param: True # dict_param: {'key_1': 'value_1', 'key_2': 'value_2'} # dict_param['key_1']: 'value_1'

警告

from_config() 此时不能用作 内置算子 的输入。因此,建议在 Python 中使用 kwargs()

提示

这也在 video_replayer 示例中进行了说明。

注意

对于 from_configkwargs,如果该项值是标量,则返回的 ArgList/字典将同时包含键及其关联项。如果该项本身是映射/字典,则输入键将被删除,并且输出将仅保留来自该项的键/值。

加载 GXF 扩展

如果您使用的算子依赖于 GXF 扩展来实现(称为 GXF 算子),则这些扩展的共享库 (.so) 需要在运行时动态加载为插件。

SDK 已经自动处理加载 内置算子(在 C++ 和 Python 中)以及常见扩展(此处列出)所需的扩展。要为自己的算子加载其他扩展,您可以使用以下方法之一

复制
已复制!
            

extensions: - libgxf_myextension1.so - /path/to/libgxf_myextension2.so

复制
已复制!
            

auto app = holoscan::make_application<App>(); auto exts = {"libgxf_myextension1.so", "/path/to/libgxf_myextension2.so"}; for (auto& ext : exts) { app->executor().extension_manager()->load_extension(ext); }

复制
已复制!
            

from holoscan.gxf import load_extensions from holoscan.core import Application app = Application() context = app.executor.context_uint64 exts = ["libgxf_myextension1.so", "/path/to/libgxf_myextension2.so"] load_extensions(context, exts)

注意

为了可被发现,这些共享库的路径需要是绝对路径、相对于您的工作目录的路径、安装在 holoscan 包的 lib/gxf_extensions 文件夹中,或者列在 HOLOSCAN_LIB_PATHLD_LIBRARY_PATH 环境变量下。

请参阅 Holoscan SDK 存储库中系统测试中的其他示例。

配置算子

算子在应用程序的 compose() 方法中定义。在调用应用程序的 run() 方法之前,它们不会被实例化(使用 initialize 方法)。

算子有三种类型的字段可以配置:参数、条件和资源。

配置算子参数

算子可以在其 setup 方法中定义参数,以更好地控制其行为(请参阅创建您自己的算子时的详细信息)。下面的代码片段将是名为 MyOp 的最小算子的此方法的实现,该算子接受字符串和布尔值作为参数;为了本示例的目的,我们将忽略任何额外的细节

复制
已复制!
            

void setup(OperatorSpec& spec) override { spec.param(string_param_, "string_param"); spec.param(bool_param_, "bool_param"); }

复制
已复制!
            

def setup(self, spec: OperatorSpec): spec.param("string_param") spec.param("bool_param") # Optional in python. Could define `self.<param_name>` instead in `def __init__`

提示

给定算子类的实例,您可以打印其规范的人类可读描述,以检查可以在该算子类上配置的参数字段

复制
已复制!
            

std::cout << operator_object->spec()->description() << std::endl;

复制
已复制!
            

print(operator_object.spec)

给定此 YAML 配置

复制
已复制!
            

myop_param: string_param: "test" bool_param: true bool_param: false # we'll use this later

我们可以在应用程序的 compose 方法中配置 MyOp 算子的实例,如下所示

复制
已复制!
            

void compose() override { // Using YAML auto my_op1 = make_operator<MyOp>("my_op1", from_config("myop_param")); // Same as above auto my_op2 = make_operator<MyOp>("my_op2", Arg("string_param", std::string("test")), // can use Arg(key, value)... Arg("bool_param") = true // ... or Arg(key) = value ); }

复制
已复制!
            

def compose(self): # Using YAML my_op1 = MyOp(self, name="my_op1", **self.kwargs("myop_param")) # Same as above my_op2 = MyOp(self, name="my_op2", string_param="test", bool_param=True, )

提示

这也在 ping_custom_op 示例中进行了说明。

如果提供了具有重复键的多个 ArgList,则最新的 ArgList 会覆盖它们

复制
已复制!
            

void compose() override { // Using YAML auto my_op1 = make_operator<MyOp>("my_op1", from_config("myop_param"), from_config("bool_param") ); // Same as above auto my_op2 = make_operator<MyOp>("my_op2", Arg("string_param", "test"), Arg("bool_param") = true, Arg("bool_param") = false ); // -> my_op `bool_param_` will be set to `false` }

复制
已复制!
            

def compose(self): # Using YAML my_op1 = MyOp(self, name="my_op1", from_config("myop_param"), from_config("bool_param"), ) # Note: We're using from_config above since we can't merge automatically with kwargs # as this would create duplicated keys. However, we recommend using kwargs in Python # to avoid limitations with wrapped operators, so the code below is preferred. # Same as above params = self.kwargs("myop_param").update(self.kwargs("bool_param")) my_op2 = MyOp(self, name="my_op2", params) # -> my_op `bool_param` will be set to `False`

配置算子条件

默认情况下,没有输入端口的算子将持续运行,而具有输入端口的算子只要接收到输入就会运行(因为它们配置了 MessageAvailableCondition)。

要更改该行为,可以将一个或多个其他条件类传递给算子的构造函数,以定义何时应执行它。

例如,我们在算子 my_op 上设置了三个条件

复制
已复制!
            

void compose() override { // Limit to 10 iterations auto c1 = make_condition<CountCondition>("my_count_condition", 10); // Wait at least 200 milliseconds between each execution auto c2 = make_condition<PeriodicCondition>("my_periodic_condition", "200ms"); // Stop when the condition calls `disable_tick()` auto c3 = make_condition<BooleanCondition>("my_bool_condition"); // Pass directly to the operator constructor auto my_op = make_operator<MyOp>("my_op", c1, c2, c3); }

复制
已复制!
            

def compose(self): # Limit to 10 iterations c1 = CountCondition(self, 10, name="my_count_condition") # Wait at least 200 milliseconds between each execution c2 = PeriodicCondition(self, timedelta(milliseconds=200), name="my_periodic_condition") # Stop when the condition calls `disable_tick()` c3 = BooleanCondition(self, name="my_bool_condition") # Pass directly to the operator constructor my_op = MyOp(self, c1, c2, c3, name="my_op")

提示

这也在 conditions 示例中进行了说明。

注意

如果将多个条件应用于一个算子,则需要为条件指定唯一的名称。

配置算子资源

一些资源可以传递给算子的构造函数,通常是作为常规参数传递的分配器

例如

复制
已复制!
            

void compose() override { // Allocating memory pool of specific size on the GPU // ex: width * height * channels * channel size in bytes auto block_size = 640 * 480 * 4 * 2; auto p1 = make_resource<BlockMemoryPool>("my_pool1", 1, size, 1); // Provide unbounded memory pool auto p2 = make_condition<UnboundedAllocator>("my_pool2"); // Pass to operator as parameters (name defined in operator setup) auto my_op = make_operator<MyOp>("my_op", Arg("pool1", p1), Arg("pool2", p2)); }

复制
已复制!
            

def compose(self): # Allocating memory pool of specific size on the GPU # ex: width * height * channels * channel size in bytes block_size = 640 * 480 * 4 * 2; p1 = BlockMemoryPool(self, name="my_pool1", storage_type=1, block_size=block_size, num_blocks=1) # Provide unbounded memory pool p2 = UnboundedAllocator(self, name="my_pool2") # Pass to operator as parameters (name defined in operator setup) auto my_op = MyOp(self, name="my_op", pool1=p1, pool2=p2)

原生资源创建

与 SDK 捆绑在一起的资源包装了底层的 GXF 组件。但是,也可以定义“原生”资源,而无需创建和包装底层的 GXF 组件。这种资源也可以像上一节中创建的资源一样有条件地传递给算子。

例如

要创建原生资源,请实现一个从 Resource 继承的类

复制
已复制!
            

namespace holoscan { class MyNativeResource : public holoscan::Resource { public: HOLOSCAN_RESOURCE_FORWARD_ARGS_SUPER(MyNativeResource, Resource) MyNativeResource() = default; // add any desired parameters in the setup method // (a single string parameter is shown here for illustration) void setup(ComponentSpec& spec) override { spec.param(message_, "message", "Message string", "Message String", std::string("test message")); } // add any user-defined methods (these could be called from an Operator's compute method) std::string message() { return message_.get(); } private: Parameter<std::string> message_; }; } // namespace: holoscan

setup 方法可用于定义资源所需的任何参数。

此资源可以与 C++ 算子一起使用,就像任何其他资源一样。例如,算子可能具有一个参数,该参数保存指向 MyNativeResource 的共享指针,如下所示。

复制
已复制!
            

private: class MyOperator : public holoscan::Operator { public: HOLOSCAN_OPERATOR_FORWARD_ARGS(MyOperator) MyOperator() = default; void setup(OperatorSpec& spec) override { spec.param(message_resource_, "message_resource", "message resource", "resource printing a message"); } void compute(InputContext&, OutputContext& op_output, ExecutionContext&) override { HOLOSCAN_LOG_TRACE("MyOp::compute()"); // get a resource based on its name (this assumes the app author named the resource "message_resource") auto res = resource<MyNativeResource>("message_resource"); if (!res) { throw std::runtime_error("resource named 'message_resource' not found!"); } // call a method on the retrieved resource class auto message = res->message(); }; private: Parameter<std::shared_ptr<holoscan::MyNativeResource> message_resource_; }

上面的 compute 方法演示了如何使用模板化的 resource 方法来检索资源。

并且可以通过常用方式通过命名参数创建和传递资源

复制
已复制!
            

// example code for within Application::compose (or Fragment::compose) auto message_resource = make_resource<holoscan::MyNativeResource>( "message_resource", holoscan::Arg("message", "hello world"); auto my_op = std::make_operator<holoscan::ops::MyOperator>( "my_op", holoscan::Arg("message_resource", message_resource));

与基于 GXF 的资源一样,也可以将原生资源作为位置参数传递给算子构造函数。

有关真实应用程序中原生资源使用的具体示例,请参阅 Holohub 上的 volume_rendering_xr 应用程序。此应用程序使用原生 XrSession 资源类型,该类型对应于单个 OpenXR 会话。然后,此单个“会话”资源可以由 XrBeginFrameOpXrEndFrameOp 算子共享。

要创建原生资源,请实现一个从 Resource 继承的类。

复制
已复制!
            

class MyNativeResource(Resource): def __init__(self, fragment, message="test message", *args, **kwargs): self.message = message super().__init__(fragment, *args, **kwargs) # Could optionally define Parameter as in C++ via spec.param as below. # Here, we chose instead to pass message as an argument to __init__ above. # def setup(self, spec: ComponentSpec): # spec.param("message", "test message") # define a custom method def message(self): return self.message

下面显示了一些自定义算子如何在计算方法中使用此类资源

复制
已复制!
            

class MyOperator(Operator): def compute(self, op_input, op_output, context): resource = self.resource("message_resource") if resource is None: raise ValueError("expected message resource not found") assert isinstance(resource, MyNativeResource) print(f"message ={resource.message()")

其中,可以按如下方式创建此原生资源并将其按位置传递给 MyOperator

复制
已复制!
            

# example code within Application.compose (or Fragment.compose) message_resource = MyNativeResource( fragment=self, message="hello world", name="message_resource") # pass the native resource as a positional argument to MyOperator my_op = MyOperator(fragment=self, message_resource)

examples/native 文件夹中有一个原生资源使用的最小示例。

配置调度器

调度器控制应用程序如何调度构成其工作流程的算子的执行。

默认调度器是单线程 GreedyScheduler。可以将应用程序配置为使用不同的调度器 Scheduler (C++/Python) 或更改默认调度器的参数,使用 scheduler() 函数 (C++/Python)。

例如,如果应用程序需要并行运行多个算子,则可以改用 MultiThreadSchedulerEventBasedScheduler。两者之间的区别在于,MultiThreadScheduler 基于主动轮询算子以确定它们是否准备好执行,而 EventBasedScheduler 将等待指示算子准备好执行的事件。

下面的代码片段显示了如何设置和配置非默认调度器

复制
已复制!
            

auto app = holoscan::make_application<App>(); auto scheduler = app->make_scheduler<holoscan::EventBasedScheduler>( "myscheduler", Arg("worker_thread_number", 4), Arg("stop_on_deadlock", true) ); app->scheduler(scheduler); app->run();

  • 我们在 schedulers 模块中创建 Scheduler 类的实例。与算子一样,参数可以来自显式 ArgArgList,也可以来自 YAML 配置。

  • scheduler() 方法分配要由应用程序使用的调度器。

复制
已复制!
            

app = App() scheduler = holoscan.schedulers.EventBasedScheduler( app, name="myscheduler", worker_thread_number=4, stop_on_deadlock=True, ) app.scheduler(scheduler) app.run()

提示

这也在 multithread 示例中进行了说明。

配置工作线程池

上一节中讨论的 MultiThreadSchedulerEventBasedScheduler 都在默认情况下自动创建内部工作线程池。在某些情况下,用户可能希望改为将算子分配给特定的用户定义线程池。这也允许可选地将算子固定到特定线程。

假设我有三个算子,op1op2op3。假设我想为这两个算子分配一个线程池,并且我希望算子 2 和 3 固定到线程池中的特定线程。以下示例显示了从 Fragment compose 方法配置线程池的代码。

我们通过调用 make_thread_pool() 方法来创建线程池。第一个参数是用户定义的线程池名称,第二个参数是线程池中最初的线程数。此 make_thread_pool 方法返回指向 ThreadPool 对象的共享指针。然后可以使用该对象的 add() 方法将单个算子或算子向量添加到线程池。 add 函数的第二个参数是一个布尔值,指示给定的算子是否应固定为始终在线程池中的特定线程上运行。

复制
已复制!
            

// The following code would be within `Fragment::compose` after operators have been defined // Assume op1, op2 and op3 are `shared_ptr<OperatorType>` as returned by `make_operator` // create a thread pool with a three threads auto pool1 = make_thread_pool("pool1", 3); // assign a single operator to the thread pool (unpinned) pool1->add(op1, false); // assign multiple operators to this thread pool (pinned) pool1->add({op2, op3}, true);

我们通过调用 make_thread_pool() 方法来创建线程池。第一个参数是用户定义的线程池名称,第二个参数是线程池的初始大小。无需修改此值,因为大小会根据需要自动增加。此 make_thread_pool 方法返回指向 ThreadPool 对象的共享指针。然后可以使用该对象的 add() 方法将单个算子或算子向量添加到线程池。add 函数的第二个参数是一个布尔值,指示给定的算子是否应固定为始终在线程池中的特定线程上运行。

复制
已复制!
            

# The following code would be within `Fragment::compose` after operators have been defined # Assume op1, op2 and op3 are `shared_ptr<OperatorType>` as returned by `make_operator` # create a thread pool with a single thread pool1 = self.make_thread_pool("pool1", 1); # assign a single operator to the thread pool (unpinned) pool1.add(op1, True); # assign multiple operators to this thread pool (pinned) pool1.add([op2, op3], True);

注意

没有必要为 Holoscan 应用程序定义线程池。有一个默认线程池,用于用户未显式分配给线程池的任何算子。线程池的使用提供了一种显式指示应固定线程的方法。

必须使用单独线程池的一种情况是为了支持固定涉及单独 GPU 设备的算子。任何给定的线程池都应仅使用单个 GPU 设备。与 GPU 设备资源关联的算子是那些使用基于 CUDA 的分配器之一的算子,如 BlockMemoryPoolCudaStreamPoolRMMAllocatorStreamOrderedAllocator

提示

线程池资源示例中给出了一个简单的应用程序的具体示例,该应用程序在单独的线程池中具有两对算子。

请注意,任何给定的算子只能属于一个线程池。将同一算子分配给多个线程池可能会导致在应用程序启动时记录错误。

还有一个相关的布尔参数 strict_thread_pinning,可以作为 holoscan::Arg 传递给 MultiThreadScheduler 构造函数。当此参数设置为 false 并且算子固定到特定线程时,只要固定的算子未准备好执行,也允许其他算子在该同一线程上运行。当 strict_thread_pinningtrue 时,该线程只能由固定到该线程的算子使用。对于 EventBasedScheduler,它始终处于严格固定模式,并且没有这样的参数。

如果线程池由单线程 GreedyScheduler 配置使用,则会记录警告,指示将忽略用户定义的线程池。只有 MultiThreadSchedulerEventBasedScheduler 可以使用线程池。

配置运行时属性

如下所述,应用程序可以通过在给定节点上手动执行 C++ 或 Python 应用程序来简单地运行,或者通过在 HAP 容器打包它来运行。对于后者,需要配置运行时属性:有关详细信息,请参阅应用程序运行器配置

注意

根据其片段图的拓扑顺序初始化算子。当应用程序运行时,算子以相同的拓扑顺序执行。图的拓扑排序确保在算子的实例化和执行之前满足其所有数据依赖性。目前,我们不支持指定算子的不同且显式的实例化和执行顺序。

单算子工作流程

最简单的工作流程形式是单个算子。

graphviz-8efeecb48c58f5386369c48eaeef4a22e69d1fcd.png

图 12 单算子工作流程

上面的图显示了一个算子 (C++/Python)(名为 MyOp),它既没有输入端口也没有输出端口。

  • 这样的算子可以从外部(例如,从文件)接受输入数据,并生成输出数据(例如,到文件),从而充当源算子和接收器算子。

  • 算子的参数(例如,输入/输出文件路径)可以作为上面部分中描述的参数传递。

我们可以在 compose() 方法中调用 add_operator (C++/Python) 方法将算子添加到工作流程。

以下代码显示了如何在 App 类的 compose() 方法中定义单算子工作流程(假设算子类 MyOp 在同一文件中声明/定义)。

复制
已复制!
            

class App : public holoscan::Application { public: void compose() override { // Define Operators auto my_op = make_operator<MyOp>("my_op"); // Define the workflow add_operator(my_op); } };

复制
已复制!
            

class App(Application): def compose(self): # Define Operators my_op = MyOp(self, name="my_op") # Define the workflow self.add_operator(my_op)

线性工作流程

这是一个工作流程示例,其中算子线性连接

graphviz-c45d32849d97f0eeca87975cf39776b0f641e83c.png

图 13 线性工作流程

在此示例中,SourceOp 生成消息并将其传递给 ProcessOpProcessOp 生成另一个消息并将其传递给 SinkOp

我们可以在 compose() 方法中调用 add_flow() 方法 (C++/Python) 连接两个算子。

add_flow() 方法 (C++/Python) 接受源算子、目标算子和可选的端口名称对。端口名称对用于将源算子的输出端口连接到目标算子的输入端口。对的第一个元素是上游算子的输出端口名称,第二个元素是下游算子的输入端口名称。如果算子只有一个输入/输出端口,则可以使用空端口名称 (“”) 来指定端口名称。如果上游算子中只有一个输出端口,而下游算子中只有一个输入端口,则可以省略端口对。

以下代码显示了如何在 App 类的 compose() 方法中定义线性工作流程(假设算子类 SourceOpProcessOpSinkOp 在同一文件中声明/定义)。

复制
已复制!
            

class App : public holoscan::Application { public: void compose() override { // Define Operators auto source = make_operator<SourceOp>("source"); auto process = make_operator<ProcessOp>("process"); auto sink = make_operator<SinkOp>("sink"); // Define the workflow add_flow(source, process); // same as `add_flow(source, process, {{"output", "input"}});` add_flow(process, sink); // same as `add_flow(process, sink, {{"", ""}});` } };

复制
已复制!
            

class App(Application): def compose(self): # Define Operators source = SourceOp(self, name="source") process = ProcessOp(self, name="process") sink = SinkOp(self, name="sink") # Define the workflow self.add_flow(source, process) # same as `self.add_flow(source, process, {("output", "input")})` self.add_flow(process, sink) # same as `self.add_flow(process, sink, {("", "")})`

复杂工作流程(多输入和输出)

您可以设计如下所示的复杂工作流程,其中某些算子具有多输入和/或多输出

graphviz-05a77fe15e35f2a15dc49175047424d743335b87.png

图 14 复杂工作流程(多输入和输出)

复制
已复制!
            

class App : public holoscan::Application { public: void compose() override { // Define Operators auto reader1 = make_operator<Reader1>("reader1"); auto reader2 = make_operator<Reader2>("reader2"); auto processor1 = make_operator<Processor1>("processor1"); auto processor2 = make_operator<Processor2>("processor2"); auto processor3 = make_operator<Processor3>("processor3"); auto writer = make_operator<Writer>("writer"); auto notifier = make_operator<Notifier>("notifier"); // Define the workflow add_flow(reader1, processor1, {{"image", "image1"}, {"image", "image2"}, {"metadata", "metadata"}}); add_flow(reader1, processor1, {{"image", "image2"}}); add_flow(reader2, processor2, {{"roi", "roi"}}); add_flow(processor1, processor2, {{"image", "image"}}); add_flow(processor1, writer, {{"image", "image"}}); add_flow(processor2, notifier); add_flow(processor2, processor3); add_flow(processor3, writer, {{"seg_image", "seg_image"}}); } };

复制
已复制!
            

class App(Application): def compose(self): # Define Operators reader1 = Reader1Op(self, name="reader1") reader2 = Reader2Op(self, name="reader2") processor1 = Processor1Op(self, name="processor1") processor2 = Processor2Op(self, name="processor2") processor3 = Processor3Op(self, name="processor3") notifier = NotifierOp(self, name="notifier") writer = WriterOp(self, name="writer") # Define the workflow self.add_flow(reader1, processor1, {("image", "image1"), ("image", "image2"), ("metadata", "metadata")}) self.add_flow(reader2, processor2, {("roi", "roi")}) self.add_flow(processor1, processor2, {("image", "image")}) self.add_flow(processor1, writer, {("image", "image")}) self.add_flow(processor2, notifier) self.add_flow(processor2, processor3) self.add_flow(processor3, writer, {("seg_image", "seg_image")})

如果图中存在循环且没有隐式根算子,则根算子是第一次调用 add_flow 方法 (C++/Python) 中的第一个算子,或者是第一次调用 add_operator 方法 (C++/Python) 中的算子。

复制
已复制!
            

auto op1 = make_operator<...>("op1"); auto op2 = make_operator<...>("op2"); auto op3 = make_operator<...>("op3"); add_flow(op1, op2); add_flow(op2, op3); add_flow(op3, op1); // There is no implicit root operator // op1 is the root operator because op1 is the first operator in the first call to add_flow

如果图中存在循环依赖,且该循环依赖的根操作算子没有输入端口,那么操作算子的初始化和执行顺序仍然会尽可能地按照拓扑排序进行,直到循环依赖需要被显式地打破。下面给出一个示例

Cycle_Implicit_Root.png

您可以使用 CMake 构建您的 C++ 应用程序,方法是在您的 CMakeLists.txt 中调用 find_package(holoscan) 来加载 SDK 库。您的可执行文件需要链接到:

  • holoscan::core

  • 您希望在应用程序工作流程中使用的任何在 main.cpp 之外定义的操作算子,例如:

    • SDK 内置操作算子,位于 holoscan::ops 命名空间下。

    • 在您的项目中通过 add_library 单独创建的操作算子。

    • 使用 find_libraryfind_package 外部导入的操作算子。

列表 1 /CMakeLists.txt

复制
已复制!
            

# Your CMake project cmake_minimum_required(VERSION 3.20) project(my_project CXX) # Finds the holoscan SDK find_package(holoscan REQUIRED CONFIG PATHS "/opt/nvidia/holoscan") # Create an executable for your application add_executable(my_app main.cpp) # Link your application against holoscan::core and any existing operators you'd like to use target_link_libraries(my_app PRIVATE holoscan::core holoscan::ops::<some_built_in_operator_target> <some_other_operator_target> <...> )


提示

所有示例中也对此进行了说明

  • SDK 安装目录的 CMakeLists.txt 中 - /opt/nvidia/holoscan/examples

  • SDK 源代码目录CMakeLists.min.txt 中。

一旦您的 CMakeLists.txt<src_dir> 中准备就绪,您可以使用以下命令行在 <build_dir> 中进行构建。如果希望使用的 SDK 安装路径与上面 find_package(holoscan) 中给定的 PATHS 不同,您可以选择性地传递 Holoscan_ROOT

复制
已复制!
            

# Configure cmake -S <src_dir> -B <build_dir> -D Holoscan_ROOT="/opt/nvidia/holoscan" # Build cmake --build <build_dir> -j

然后,您可以通过运行 <build_dir>/my_app 来运行您的应用程序。

Python 应用程序不需要构建。只需确保:

  • holoscan python 模块已安装在您的 dist-packages 中,或者列在 PYTHONPATH 环境变量下,以便您可以导入 holoscan.core 和您可能需要的任何内置操作算子,例如 holoscan.operators

  • 任何外部操作算子都可以在您的 dist-packagesPYTHONPATH 中包含的模块中找到。

注意

虽然 Python 应用程序不需要构建,但它们可能依赖于包装了 C++ 操作算子的操作算子。SDK 中所有内置的 Python 操作算子都已经预先构建了 Python 绑定。如果您自己包装了 C++ 操作算子以在 Python 应用程序中使用,请遵循 本节

然后,您可以通过运行 python3 my_app.py 来运行您的应用程序。

注意

对于给定的 CMake 项目、预构建的可执行文件或 Python 应用程序,您还可以使用 Holoscan CLI打包和运行您的 Holoscan 应用程序,将其置于符合 OCI 标准的容器镜像中。

从 Holoscan v2.3(对于 C++)或 v2.4(对于 Python)开始,可以将元数据与从操作算子输出端口发出的数据一起发送。然后,下游操作算子可以使用和/或修改此元数据。以下小节将介绍如何启用和使用此功能。

启用应用程序元数据

目前,元数据功能默认禁用,必须显式启用,如下面的代码块所示

复制
已复制!
            

app = holoscan::make_application<MyApplication>(); // Enable metadata feature before calling app->run() or app->run_async() app->is_metadata_enabled(true); app->run();

复制
已复制!
            

app = MyApplication() # Enable metadata feature before calling app.run() or app.run_async() app.is_metadata_enabled = True app.run()

理解元数据流

工作流程中的每个操作算子都有一个关联的 MetadataDictionary 对象。在每次操作算子的 compute() 调用开始时,此元数据字典将为空(即,元数据不会从之前的 compute 调用中持久存在)。当调用 receive() 数据时,输入消息中找到的任何元数据也将合并到操作算子的本地元数据字典中。然后,操作算子的 compute 方法可以读取、追加或删除元数据,如下一节所述。每当操作算子通过调用 emit() 发送数据时,操作算子元数据字典的当前状态将与通过 emit 调用中的第一个参数传递的数据一起在该端口上传输。任何下游操作算子都将通过其输入端口接收此元数据。

在 Operator::compute 中使用元数据

在操作算子的 compute() 方法中,可以调用 metadata() 方法来获取操作算子的 MetadataDictionary 的共享指针。元数据字典提供类似于 std::unordered_map (C++) 或 dict (Python) 的 API,其中键是字符串 (std::string for C++),值可以存储任何对象类型(通过 C++ MetadataObject 持有 std::any)。

提供了模板化的 get()set() 方法,如下所示,允许直接设置给定类型的值,而无需显式地处理内部 MetadataObject 类型。

复制
已复制!
            

// Receiving from a port updates operator metadata with any metadata found on the port auto input_tensors = op_input.receive<TensorMap>("in"); // Get a reference to the shared metadata dictionary auto meta = metadata(); // Retrieve existing values. // Use get<Type> to automatically cast the `std::any` contained within the `holsocan::Message` auto name = meta->get<std::string>("patient_name"); auto age = meta->get<int>("age"); // Get also provides a two-argument version where a default value to be assigned is given by // the second argument. The type of the default value should match the expected type of the value. auto flag = meta->get("flag", false); // Add a new value (if a key already exists, the value will be updated according to the // operator's metadata_policy). std::vector<float> spacing{1.0, 1.0, 3.0}; meta->set("pixel_spacing"s, spacing); // Remove an item meta->erase("patient_name"); // Check if a key exists bool has_patient_name = meta->has_key("patient_name"); // Get a vector<std::string> of all keys in the metadata const auto& keys = meta->keys(); // ... Some processing to produce output `data` could go here ... // Current state of `meta` will automatically be emitted along with `data` in the call below op_output.emit(data, "output1"); // Can clear all items meta->clear(); // Any emit call after this point would not transmit a metadata object op_output.emit(data, "output2");

有关所有可用方法,请参阅 MetadataDictionary API 文档。

MetadataObject 类型提供了 Pythonic 接口。

复制
已复制!
            

# Receiving from a port updates operator metadata with any metadata found on the port input_tensors = op_input.receive("in") # self.metadata can be used to access the shared MetadataDictionary # for example we can check if a key exists has_key = "my_key" in self.metadata # get the number of keys num_keys = len(self.metadata) # get a list of the keys print(f"metadata keys ={self.metadata.keys()}") # iterate over the values in the dictionary using the `items()` method for key, value in self.metadata.items(): # process item pass # print a Python dict of the keys/values print(self.metadata) # Retrieve existing values. If the underlying value is a C++ class, a conversion to an equivalent Python object will be made (e.g. `std::vector<std::string>` to `List[str]`). name = self.metadata["patient_name"] age = self.metadata["age"] # It is also supported to use the get method along with an optional default value to use # if the key is not present. flag = self.metadata.get("flag", False) # print the current metadata policy print(f"metadata policy ={self.metadata_policy}") # Add a new value (if a key already exists, the value will be updated according to the # operator's metadata_policy). If the value is set via the indexing operator as below, # the Python object itself is stored as the value. spacing = (1.0, 1.0, 3.0) self.metadata["pixel_spacing"] = spacing # In some cases, if sending metadata to downstream C++-based operators, it may be desired # to instead store the metadata value as an equivalent C++ type. In that case, it is # necessary to instead set the value using the `set` method with `cast_to_cpp=True`. # Automatic casting is supported for bool, str, and various numeric and iterator or # sequence types. # The following would result in the spacing `Tuple[float]` being stored as a # C++ `std::vector<double>`. Here we show use of the `pop` method to remove a previous value # if present. self.metadata.pop("pixel_spacing", None) self.metadata.set("pixel_spacing", spacing, cast_to_cpp=True) # To store floating point elements at a different than the default (double) precision or # integers at a different precision than int64_t, use the dtype argument and pass a # numpy.dtype argument corresponding to the desired C++ type. For example, the following # would instead store `spacing` as a std::vector<float> instead. In this case we show # use of Python's `del` instead of the pop method to remove an existing item. del self.metadata["pixel_spacing"] self.metadata.set("pixel_spacing", spacing, dtype=np.float32, cast_to_cpp=True) # Remove a value del self["patient name"] # ... Some processing to produce output `data` could go here ... # Current state of `meta` will automatically be emitted along with `data` in the call below op_output.emit(data, "output1") # Can clear all items self.metadata.clear() # Any emit call after this point would not transmit a metadata object op_output.emit(data, "output2")

有关所有可用方法,请参阅 MetadataDictionary API 文档。

上面的代码说明了使用和更新操作算子元数据的各种方法。

注意

请特别注意元数据设置的细节。当使用纯 Python 应用程序时,最好只使用 self.metadata[key] = valueself.metadata.set(key, value) 来传递 Python 对象作为值。这将仅使用共享对象,而不会导致复制到/从相应的 C++ 类型。但是,当与包装 C++ 实现的其他操作算子交互时,它们的 compute 方法将期望 C++ 元数据。在这种情况下,需要使用带有 cast_to_cpp=Trueset 方法来转换为预期的 C++ 类型。上面的示例中的一些“pixel_spacing”设置调用中显示了这一点。为了方便起见,传递给 set 方法的 value 也可以是 NumPy 数组,但请注意,在这种情况下,会执行复制到新的 C++ std::vector 中的操作。创建向量时,将遵循数组的 dtype。一般来说,当前可以转换为 C++ 的类型是标量数值、字符串以及它们的 Python 迭代器或序列(序列将转换为 1D 或 2D C++ std::vector因此,Python 序列中的项不能是混合类型)。

元数据更新策略

操作算子类还具有 metadata_policy() 方法,可用于设置 MetadataPolicy,以便在处理操作算子的多个输入端口中重复的元数据键时使用。可用选项包括:

  • “update” (MetadataPolicy::kUpdate):将先前 receive 调用中的任何现有键替换为后续 receive 调用中存在的键。

  • “reject” (MetadataPolicy::kReject):当键由于先前的 receive 调用而已经存在时,拒绝新的键/值对。

  • “raise” (MetadataPolicy::kRaise):如果遇到重复的键,则抛出 std::runtime_error 异常。这是默认策略。

元数据策略通常在 compose() 期间设置,如下例所示

复制
已复制!
            

// Example for setting metadata policy from Application::compose() my_op = make_operator<MyOperator>("my_op"); my_op->metadata_policy(holoscan::MetadataPolicy::kRaise);

操作算子类还具有 metadata_policy() 属性,可用于设置 MetadataPolicy,以便在处理操作算子的多个输入端口中重复的元数据键时使用。可用选项包括:

  • “update” (MetadataPolicy.UPDATE):将先前 receive 调用中的任何现有键替换为后续 receive 调用中存在的键。这是默认策略。

  • “reject” (MetadataPolicy.REJECT):当键由于先前的 receive 调用而已经存在时,拒绝新的键/值对。

  • “raise” (MetadataPolicy.RAISE):如果遇到重复的键,则抛出异常。

元数据策略通常在 compose() 期间设置,如下例所示

复制
已复制!
            

# Example for setting metadata policy from Application.compose() my_op = MyOperator(self, name="my_op") my_op.metadata_policy = holoscan.core.MetadataPolicy.RAISE

该策略仅适用于设置它的操作算子。

在分布式应用程序中使用元数据

支持在分布式应用程序的两个片段之间发送元数据,但有几个方面需要注意。

  1. 通过网络发送元数据需要序列化和反序列化元数据键和值。为此支持的值类型与通过输出端口发送的数据类型相同(请参阅关于 对象序列化 部分中的表格)。唯一的例外是 TensorTensorMap 值不能作为片段之间的元数据值发送(此限制也适用于类似 tensor 的 Python 对象)。为 SDK 注册的任何 自定义编解码器 也将自动用于元数据值的序列化。

  2. 片段之间可以传输的元数据量存在实际的大小限制,约为几千字节。这是因为元数据当前与其他实体头信息一起在 UCX 标头中发送,而 UCX 标头具有固定的大小限制(元数据与其他标头信息一起存储在 HOLOSCAN_UCX_SERIALIZATION_BUFFER_SIZE 环境变量 定义的大小限制内)。

以上限制仅适用于片段之间发送的元数据。在片段内,元数据没有大小限制(除了系统内存限制),也不需要序列化或反序列化步骤。

当前限制

  1. 当前的元数据 API 仅完全支持原生 holoscan 操作算子,目前不支持包装 GXF codelet 的操作算子(即,继承自 GXFOperator 或通过 GXFCodeletOp 创建的操作算子)。除了 GXFCodeletOp 之外,holoscan::ops 命名空间下提供的内置操作算子都是原生操作算子,因此该功能可以与它们一起使用。目前,这些内置操作算子都没有添加自己的元数据,但是只要 app->is_metadata_enabled(true) 设置为启用元数据功能,输入端口上接收到的任何元数据都将自动传递到其输出端口。

请参阅专门的 Holoscan CUDA 流处理 页面,详细了解如何编写使用非默认 CUDA 流的 Holoscan 应用程序。

上一页 自带模型 (BYOM)
下一页 创建分布式应用程序
© 版权所有 2022-2024, NVIDIA。 上次更新于 2025 年 1 月 27 日。