Morpheus 控制消息
控制消息在 23.03 版本中引入,为之前无法实现的众多用例提供了解决方案。这种新范式通过启用更具反应性的、事件驱动的操作,增强了 Morpheus 管道的功能。控制消息涉及将消息对象发送到管道,这些消息对象可以代表广泛的概念,从原始数据到从指定源加载数据或启动带外推理或训练任务的显式指令。管道的行为可以根据设计动态调整;某些阶段可能会忽略它们不打算处理的消息,而其他阶段则会根据消息类型和内容采取行动。
这种方法为 Morpheus 管道解锁了各种新的应用。例如,控制消息可以促进实时数据处理和分析,使管道能够及时响应时间敏感的事件或数据流。此外,它们可以支持自适应机器学习模型,这些模型可以根据传入的数据不断更新和优化其预测。此外,控制消息可以通过实现按需数据处理和任务执行来提高资源分配和效率。总的来说,在 Morpheus 管道中引入控制消息为更通用和响应更快的软件解决方案铺平了道路,满足了更广泛的需求和用例。
控制消息是直接的对象,包含 tasks
、metadata
和可能的 payload
数据。任务可以是以下之一:training
、inference
或 other
。metadata
是键值对的字典,提供有关消息的附加信息,并且必须是 JSON 可序列化的。payload
是 Morpheus MessageMeta
对象,可用于移动原始数据。可以通过 API 访问这些元素中的每一个,因为消息会流经管道。
使用任务
控制消息可以处理诸如 training
、inference
和一个包罗万象的类别 other
之类的任务。可以使用诸如 add_task
、has_task
和 remove_task
之类的方法添加、检查是否存在或从控制消息中删除任务。
from morpheus.messages import ControlMessage
task_data = {
"....": "...."
}
msg = ControlMessage()
msg.add_task("training", task_data)
if msg.has_task("training"):
task = msg.remove_task("training")
管理元数据
metadata
是一组键值对,提供有关控制消息的补充信息,并且必须是 JSON 可序列化的。您可以使用 set_metadata
、has_metadata
和 get_metadata
方法分别设置、检查和检索元数据值。
from morpheus.messages import ControlMessage
msg = ControlMessage()
msg.set_metadata("description", "This is a sample control message.")
if msg.has_metadata("description"):
description = msg.get_metadata("description")
处理有效负载
控制消息的 payload
是 Morpheus MessageMeta
对象,可以携带原始数据。您可以使用 payload
方法设置或检索有效负载,该方法可以接受 MessageMeta
实例或返回有效负载本身。
import cudf
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
data = cudf.DataFrame() # some data
msg_meta = MessageMeta(data)
msg = ControlMessage()
msg.payload(msg_meta)
retrieved_payload = msg.payload()
msg_meta == retrieved_payload # True
从 MultiMessage
转换为 ControlMessage
MultiMessage
类型在 24.06 版本中已弃用,并在 24.10 版本中已完全移除。
升级到 24.10 时,所有 MultiMessage
的用法都需要转换为 ControlMessage
。每个 MultiMessage
功能在 ControlMessage
中都有相应的等效项,如下所示。
功能 |
MultiMessage |
ControlMessage |
---|---|---|
初始化 | multi_msg = MultiMessage(msg_meta) |
control_msg = ControlMessage() control_msg.payload(msg_meta) |
获取 DataFrame |
multi_msg.get_meta() |
control_msg.payload().get_data() |
从 DataFrame 获取列 |
multi_msg.get_meta(col_name) |
control_msg.payload().get_data(col_name) |
将列值设置为 DataFrame |
multi_msg.set_meta(col_name, value) |
control_msg.payload().set_data(col_name, value) |
获取给定起始和停止位置的切片 DataFrame |
multi_msg.get_slice(start, stop) |
control_msg.payload().get_slice(start, stop) |
复制给定行范围的 DataFrame |
multi_msg.copy_ranges(ranges) |
control_msg.payload().copy_ranges(ranges) |
MultiTensorMessage | ControlMessage | |
获取推理张量 ndarray |
multi_tensor_msg.tensor() |
control_msg.tensors() |
获取特定的推理张量 | multi_tensor_msg.get_tensor(tensor_name) |
control_msg.tensors().get_tensor(tensor_name) |
请注意,在 ControlMessage
列中,get_slice()
和 copy_ranges()
方法是在 MessageMeta
有效负载上调用的,因此在切片后返回 MessageMeta
,而 MultiMessage
中的这些函数返回新的 MultiMessage
实例。