NVIDIA Morpheus (24.10.01)

Python Morpheus 模块

Morpheus 使用 MRC 图执行框架。Morpheus 管道构建于 MRC 管道之上,MRC 管道由节点和边的集合组成,称为段(可以理解为子图),段之间可以通过入口/出口端口连接。在许多常见情况下,一个 MRC 管道将仅包含单个段。虽然 Morpheus 阶段是 Morpheus 管道的主要构建块,但 Morpheus 模块可以被视为定义工作基本单元的一种方式,这些单元可以被组合和(重)用于构建更复杂的阶段。模块可以使用 Python 或 C++ 编写。

直通模块是一个简单的模块,它接受单个输入端口和单个输出端口。它只是简单地将输入传递到输出,这与简单 Python 阶段中定义的示例阶段非常相似;但是,它仅定义实际的工作单元,并且必须作为其自己的 Morpheus 阶段加载,或者在另一个阶段的上下文中加载才能使用。

模块定义和注册

examples/developer_guide/7_python_modules/my_test_module.py

复制
已复制!
            

import mrc from mrc.core import operators as ops from morpheus.utils.module_utils import register_module @register_module("my_test_module", "my_module_namespace") def my_test_module_initialization(builder: mrc.Builder): module_config = builder.get_current_module_config() # Get the module configuration def on_data(data): return data def node_fn(observable: mrc.Observable, subscriber: mrc.Subscriber): observable.pipe(ops.map(on_data)).subscribe(subscriber) node = builder.make_node("my_test_module_forwarding_node", mrc.core.operators.build(node_fn)) builder.register_module_input("input_0", node) builder.register_module_output("output_0", node)

在这里,我们定义了一个模块,或者更确切地说是一个用于创建模块的蓝图,在 my_module_namespace 命名空间中命名为 my_test_moduleregister_module 装饰器用于向系统注册模块,并使其可供其他模块、阶段或管道加载。register_module 装饰器接受两个参数:模块的名称和定义模块的命名空间。命名空间用于避免核心 Morpheus、自定义模块和第三方模块之间的命名冲突。

my_test_module_initialization 函数在模块加载时由 Morpheus 模块加载器调用。然后,它创建模块的新实例,该实例创建适当的 MRC 节点和边,并注册其他模块或 MRC 节点可以连接的输入和输出。

请注意,我们还从构建器中获取了 module_config 对象。此对象是一个字典,其中包含加载模块时传递给模块的所有配置参数。这对于允许模块根据运行时参数自定义其行为非常有用。我们将在下一节中看到一个示例。

加载模块

在模块被定义和注册后,它可以被其他模块或阶段加载。下面,我们将说明这两种情况下的过程。首先,在另一个模块中的用法;其次,我们将加载我们刚刚创建的模块作为一个简单阶段,这个过程专门化了现有 LinearModuleStage 的通用行为。

examples/developer_guide/7_python_modules/my_test_module_consumer.py

复制
已复制!
            

@register_module("my_test_module_consumer", "my_module_namespace") def my_test_module_consumer_initialization(builder: mrc.Builder): consumer_module_config = builder.get_current_module_config() # Get the module configuration module_config = {"some_configuration_parameter": "some_value"} my_test_module = builder.load_module("my_test_module", "my_module_namespace", "module_instance_name", module_config) builder.register_module_input("input_0", my_test_module.input_port("input_0")) builder.register_module_output("output_0", my_test_module.output_port("output_0"))

在这里,我们定义了一个新模块,该模块加载了我们上面定义的 my_test_module 模块,然后直接连接到其输入和输出端口。显然,这是一个简单的示例,但它说明了将模块加载和合并到现有工作流程中的基本过程和易用性。

examples/developer_guide/7_python_modules/my_test_module_consumer_stage.py

复制
已复制!
            

import typing import mrc from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage class MyPassthroughModuleWrapper(PassThruTypeMixin, SinglePortStage): @property def name(self) -> str: return "my-pass-thru-module-wrapper" def accepted_types(self) -> tuple: return (typing.Any, ) def supports_cpp_node(self) -> bool: return False def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: module_config = {"some_configuration_parameter": "some_value"} module_name = "my_test_module" my_module = builder.load_module(module_name, "my_module_namespace", f"{self.unique_name}-{module_name}", module_config) module_in_node = my_module.input_port("input_0") module_out_node = my_module.output_port("output_0") builder.make_edge(input_node, module_in_node) return module_out_node

在这里,我们定义了一个新阶段,该阶段加载了我们上面定义的 my_test_module 模块,然后包装了其输入和输出连接。

模块链接和嵌套

模块可以任意嵌套,并且可以链接在一起以创建更复杂的模块。例如,让我们定义一个稍微有趣的模块,它接受一个整数输入 i,并输出 (i^2 + 3*i)。为此,我们将定义三个新模块 my_square_modulemy_times_three_module,它们执行适当的操作,以及 my_compound_op_module,它将它们都包装起来。然后,我们将构建一个新模块作为这三个模块的组合。

examples/developer_guide/7_python_modules/my_test_compound_module.py

复制
已复制!
            

import mrc from mrc.core import operators as ops from morpheus.messages import MessageMeta from morpheus.utils.module_utils import register_module # Create and register our two new component modules # ========================================== @register_module("my_square_module", "my_module_namespace") def my_square_module_initialization(builder: mrc.Builder): module_config = builder.get_current_module_config() field_name = module_config.get("field_name", "data") def on_data(msg: MessageMeta) -> MessageMeta: with msg.mutable_dataframe() as df: df[field_name] = df[field_name]**2 return msg def node_fn(observable: mrc.Observable, subscriber: mrc.Subscriber): observable.pipe(ops.map(on_data)).subscribe(subscriber) node = builder.make_node("square", mrc.core.operators.build(node_fn)) builder.register_module_input("input_0", node) builder.register_module_output("output_0", node) @register_module("my_times_three_module", "my_module_namespace") def my_times_three_module_initialization(builder: mrc.Builder): module_config = builder.get_current_module_config() field_name = module_config.get("field_name", "data") def on_data(msg: MessageMeta) -> MessageMeta: with msg.mutable_dataframe() as df: df[field_name] = df[field_name] * 3 return msg def node_fn(observable: mrc.Observable, subscriber: mrc.Subscriber): observable.pipe(ops.map(on_data)).subscribe(subscriber) node = builder.make_node("times_two", mrc.core.operators.build(node_fn)) builder.register_module_input("input_0", node) builder.register_module_output("output_0", node) # Create and register our new compound operator -- illustrates module chaining @register_module("my_compound_op_module", "my_module_namespace") def my_compound_op(builder: mrc.Builder): square_module = builder.load_module("my_square_module", "my_module_namespace", "square_module", {}) times_three_module = builder.load_module("my_times_three_module", "my_module_namespace", "times_three_module", {}) builder.make_edge(square_module.output_port("output_0"), times_three_module.input_port("input_0")) builder.register_module_input("input_0", square_module.input_port("input_0")) builder.register_module_output("output_0", times_three_module.output_port("output_0")) # Create and register our new compound module -- illustrates module nesting @register_module("my_compound_module", "my_module_namespace") def my_compound_module(builder: mrc.Builder): op_module = builder.load_module("my_compound_op_module", "my_module_namespace", "op_module", {}) builder.register_module_input("input_0", op_module.input_port("input_0")) builder.register_module_output("output_0", op_module.output_port("output_0"))

examples/developer_guide/7_python_modules/my_compound_module_consumer_stage.py

复制
已复制!
            

import typing import mrc from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage class MyCompoundOpModuleWrapper(PassThruTypeMixin, SinglePortStage): @property def name(self) -> str: return "my-compound-op-module-wrapper" def accepted_types(self) -> tuple: return (typing.Any, ) def supports_cpp_node(self) -> bool: return False def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: module_config = {} module_name = "my_compound_module" my_module = builder.load_module(module_name, "my_module_namespace", f"{self.unique_name}-{module_name}", module_config) module_in_node = my_module.input_port("input_0") module_out_node = my_module.output_port("output_0") builder.make_edge(input_node, module_in_node) return module_out_node

实践中包装模块

虽然我们在这里为示例模块创建了新阶段,但通常我们不会仅仅为了包装一个模块而定义一个全新的阶段。相反,我们将使用 LinearModuleStage 来包装模块

复制
已复制!
            

from morpheus.stages.general.linear_modules_stage import LinearModulesStage config = Config() # Morpheus config module_config = { "module_id": "my_compound_module", "namespace": "my_module_namespace", "module_name": "module_instance_name", # ... other module config params... } pipeline.add_stage(LinearModulesStage(config, module_config, input_port_name="input_0", output_port_name="output_0"))

上一篇 数字指纹 (DFP) 参考
下一篇 C++ Morpheus 模块
© 版权所有 2024, NVIDIA。 上次更新于 2024 年 12 月 3 日。