NVIDIA Morpheus (24.10.01)

Morpheus 控制消息

控制消息在 23.03 版本中引入,为之前无法实现的众多用例提供了解决方案。这种新范式通过启用更具反应性的、事件驱动的操作,增强了 Morpheus 管道的功能。控制消息涉及将消息对象发送到管道,这些消息对象可以代表广泛的概念,从原始数据到从指定源加载数据或启动带外推理或训练任务的显式指令。管道的行为可以根据设计动态调整;某些阶段可能会忽略它们不打算处理的消息,而其他阶段则会根据消息类型和内容采取行动。

这种方法为 Morpheus 管道解锁了各种新的应用。例如,控制消息可以促进实时数据处理和分析,使管道能够及时响应时间敏感的事件或数据流。此外,它们可以支持自适应机器学习模型,这些模型可以根据传入的数据不断更新和优化其预测。此外,控制消息可以通过实现按需数据处理和任务执行来提高资源分配和效率。总的来说,在 Morpheus 管道中引入控制消息为更通用和响应更快的软件解决方案铺平了道路,满足了更广泛的需求和用例。

控制消息是直接的对象,包含 tasksmetadata 和可能的 payload 数据。任务可以是以下之一:traininginferenceothermetadata 是键值对的字典,提供有关消息的附加信息,并且必须是 JSON 可序列化的。payload 是 Morpheus MessageMeta 对象,可用于移动原始数据。可以通过 API 访问这些元素中的每一个,因为消息会流经管道。

使用任务

控制消息可以处理诸如 traininginference 和一个包罗万象的类别 other 之类的任务。可以使用诸如 add_taskhas_taskremove_task 之类的方法添加、检查是否存在或从控制消息中删除任务。

复制
已复制!
            

from morpheus.messages import ControlMessage task_data = { "....": "...." } msg = ControlMessage() msg.add_task("training", task_data) if msg.has_task("training"): task = msg.remove_task("training")

管理元数据

metadata 是一组键值对,提供有关控制消息的补充信息,并且必须是 JSON 可序列化的。您可以使用 set_metadatahas_metadataget_metadata 方法分别设置、检查和检索元数据值。

复制
已复制!
            

from morpheus.messages import ControlMessage msg = ControlMessage() msg.set_metadata("description", "This is a sample control message.") if msg.has_metadata("description"): description = msg.get_metadata("description")

处理有效负载

控制消息的 payload 是 Morpheus MessageMeta 对象,可以携带原始数据。您可以使用 payload 方法设置或检索有效负载,该方法可以接受 MessageMeta 实例或返回有效负载本身。

复制
已复制!
            

import cudf from morpheus.messages import ControlMessage from morpheus.messages import MessageMeta data = cudf.DataFrame() # some data msg_meta = MessageMeta(data) msg = ControlMessage() msg.payload(msg_meta) retrieved_payload = msg.payload() msg_meta == retrieved_payload # True

MultiMessage 转换为 ControlMessage

MultiMessage 类型在 24.06 版本中已弃用,并在 24.10 版本中已完全移除。

升级到 24.10 时,所有 MultiMessage 的用法都需要转换为 ControlMessage。每个 MultiMessage 功能在 ControlMessage 中都有相应的等效项,如下所示。

功能

MultiMessage

ControlMessage

初始化 multi_msg = MultiMessage(msg_meta) control_msg = ControlMessage()
control_msg.payload(msg_meta)
获取 DataFrame multi_msg.get_meta() control_msg.payload().get_data()
DataFrame 获取列 multi_msg.get_meta(col_name) control_msg.payload().get_data(col_name)
将列值设置为 DataFrame multi_msg.set_meta(col_name, value) control_msg.payload().set_data(col_name, value)
获取给定起始和停止位置的切片 DataFrame multi_msg.get_slice(start, stop) control_msg.payload().get_slice(start, stop)
复制给定行范围的 DataFrame multi_msg.copy_ranges(ranges) control_msg.payload().copy_ranges(ranges)

MultiTensorMessage ControlMessage
获取推理张量 ndarray multi_tensor_msg.tensor() control_msg.tensors()
获取特定的推理张量 multi_tensor_msg.get_tensor(tensor_name) control_msg.tensors().get_tensor(tensor_name)

请注意,在 ControlMessage 列中,get_slice()copy_ranges() 方法是在 MessageMeta 有效负载上调用的,因此在切片后返回 MessageMeta,而 MultiMessage 中的这些函数返回新的 MultiMessage 实例。

上一篇 C++ Morpheus 模块
下一篇 模块化数字指纹管道简介(集成训练)
© 版权所有 2024, NVIDIA。 上次更新于 2024 年 12 月 3 日。