构建管道
先决条件 以下示例假定已获取示例数据集。从 Morpheus 仓库的根目录运行
./scripts/fetch_data.py fetch examples
要通过 CLI 构建管道,用户必须首先指定管道类型、源对象,然后是阶段的顺序列表。对于每个阶段,可以指定选项来配置特定阶段。由于阶段是按顺序排列的,因此一个阶段的输出将成为下一个阶段的输入。除非经过大量自定义,否则管道通常以 morpheus run
开头,后跟管道模式,例如 pipeline-nlp
或 pipeline-fil
。例如,要运行 NLP 管道,请使用
morpheus run pipeline-nlp ...
虽然每个阶段都有配置选项,但也有一些选项适用于整个管道。查看 morpheus run --help
、morpheus run pipeline-nlp --help
和 morpheus run pipeline-fil --help
以获取这些全局管道选项。
使用 CLI 配置的所有管道都需要从源对象开始。Morpheus 包含的两个常用源阶段是
from-file
从本地文件读取到管道中
支持 CSV、JSON、JSON lines 和 Parquet 格式
所有行都在开始时读取并一次性排队到管道中。适用于性能测试。
有关更多信息,请参阅
morpheus.stages.input.file_source_stage.FileSourceStage
from-kafka
将消息从 Kafka 集群拉取到管道中
Kafka 集群可以在本地主机或远程运行
有关更多信息,请参阅
morpheus.stages.input.kafka_source_stage.KafkaSourceStage
从此刻起,可以从命令行顺序添加任意数量的阶段,从头到尾。例如,我们可以构建一个简单的管道,该管道从文件读取、反序列化消息、序列化它们,然后写入文件,请使用以下命令
morpheus --log_level=DEBUG run pipeline-other --viz_file=.tmp/simple_identity.png \
from-file --filename=examples/data/pcap_dump.jsonlines \
deserialize \
serialize \
to-file --overwrite --filename .tmp/temp_out.json

输出应类似于
Configuring Pipeline via CLI
Starting pipeline via CLI... Ctrl+C to Quit
Config:
{
"_model_max_batch_size": 8,
"_pipeline_batch_size": 256,
"ae": null,
"class_labels": [],
"debug": false,
"edge_buffer_size": 128,
"feature_length": 1,
"fil": {
"feature_columns": null
},
"log_config_file": null,
"log_level": 10,
"mode": "OTHER",
"num_threads": 64,
"plugins": []
}
CPP Enabled: True
====Starting Pipeline====
====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-file-0; FileSourceStage(filename=examples/data/pcap_dump.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True, filter_null_columns=(), parser_kwargs={})>
└─> morpheus.MessageMeta
Added stage: <deserialize-1; DeserializeStage(ensure_sliceable_index=True, task_type=None, task_payload=None)>
└─ morpheus.MessageMeta -> morpheus.ControlMessage
Added stage: <serialize-2; SerializeStage(include=(), exclude=(), fixed_columns=True)>
└─ morpheus.ControlMessage -> morpheus.MessageMeta
Added stage: <to-file-3; WriteToFileStage(filename=.tmp/temp_out.json, overwrite=True, file_type=FileTypes.Auto, include_index_col=True, flush=False)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
====Pipeline Complete====
管道构建检查
在 ====Building Pipeline====
消息之后,如果日志记录级别为 INFO
或更高,CLI 将打印所有阶段的列表以及每个阶段的类型转换。要成为有效的管道,一个阶段的输出类型必须与下一个阶段的输入类型匹配。许多阶段是灵活的,并在运行时确定其类型,但某些阶段需要特定的输入类型。如果您的管道配置不正确,Morpheus 将报告错误。例如,如果我们运行与上面相同的命令,但忘记了 serialize
阶段
morpheus --log_level=DEBUG run pipeline-other \
from-file --filename=examples/data/pcap_dump.jsonlines \
deserialize \
to-file --overwrite --filename .tmp/temp_out.json
然后将显示以下错误
RuntimeError: The to-file stage cannot handle input of <class 'morpheus.messages.control_message.ControlMessage'>. Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,)
这表明 to-file
阶段无法接受 morpheus.messages.ControlMessage
的输入类型。这是因为 to-file
阶段不知道如何将该类写入文件;它只知道如何写入 morpheus.messages.message_meta.MessageMeta
的实例。为了确保您拥有有效的管道,请检查消息的 Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,)
部分。这表明您需要一个阶段来将 deserialize
阶段的输出类型 ControlMessage
转换为 MessageMeta
,这正是 serialize
阶段的作用。
Kafka 源示例
上面的示例本质上只是复制文件。但是,重要的是要注意,大多数 Morpheus 管道在结构上是相似的,它们都以源阶段 (from-file
) 开头,后跟 deserialize
阶段,以 serialize
阶段结尾,后跟接收器阶段 (to-file
),实际的训练或推理逻辑发生在两者之间。
我们也可以轻松地在上面的示例中交换源或接收器阶段,而不会对整个管道产生任何影响。例如,要从 Kafka 主题读取,只需将 from-file
阶段替换为 from-kafka
注意:这假定 Kafka 代理在本地主机上运行,并侦听端口 9092。要使用 Kafka 测试 Morpheus,请按照 快速启动 Kafka 集群 部分的步骤 1-8,位于 contributing.md 中,创建一个名为 test_pcap
的主题,然后将端口 9092
替换为您的 Kafka 实例正在侦听的端口。
morpheus --log_level=DEBUG run pipeline-other \
from-kafka --input_topic test_pcap --bootstrap_servers localhost:9092 \
deserialize \
serialize \
to-file --filename .tmp/temp_out.json
有关特定管道模式的可用阶段的完整列表,请使用 CLI 帮助命令。首先,可以使用 morpheus run --help
列出可用的管道模式。然后,可以使用 morpheus run <mode> --help
列出该模式的可用阶段。例如,要列出 pipeline-nlp
模式的可用阶段
morpheus run pipeline-nlp --help
从 JSON 对象中删除字段
此示例仅将字段 timestamp
、src_ip
和 dest_ip
从 examples/data/pcap_dump.jsonlines
复制到 out.jsonlines
。

morpheus run pipeline-other --viz_file=.tmp/remove_fields_from_json_objects.png \
from-file --filename examples/data/pcap_dump.jsonlines \
deserialize \
serialize --include 'timestamp' --include 'src_ip' --include 'dest_ip' \
to-file --overwrite --filename out.jsonlines
监控吞吐量
此示例在命令行报告吞吐量。

morpheus --log_level=INFO run pipeline-other --viz_file=.tmp/monitor_throughput.png \
from-file --filename examples/data/pcap_dump.jsonlines \
deserialize \
monitor --description "Lines Throughput" --smoothing 0.1 --unit "lines" \
serialize \
to-file --overwrite --filename out.jsonlines
输出
Configuring Pipeline via CLI
Starting pipeline via CLI... Ctrl+C to Quit
Lines Throughput[Complete]: 93085 lines [00:03, 29446.18 lines/s]
Pipeline visualization saved to .tmp/monitor_throughput.png
注意:默认情况下,如果 log_level
设置为 WARNING
或更低,则 monitor 阶段将从管道中省略自身。
多监控器吞吐量
此示例独立报告每个阶段的吞吐量。

morpheus --log_level=INFO run pipeline-nlp --viz_file=.tmp/multi_monitor_throughput.png \
from-file --filename examples/data/pcap_dump.jsonlines \
monitor --description "From File Throughput" \
deserialize \
monitor --description "Deserialize Throughput" \
serialize \
monitor --description "Serialize Throughput" \
to-file --filename out.jsonlines --overwrite
输出
Configuring Pipeline via CLI
Starting pipeline via CLI... Ctrl+C to Quit
From File Throughput[Complete]: 93085 messages [00:00, 168118.35 messages/s]
Deserialize Throughput[Complete]: 93085 messages [00:04, 22584.37 messages/s]
Serialize Throughput[Complete]: 93085 messages [00:06, 14095.36 messages/s]
Pipeline visualization saved to .tmp/multi_monitor_throughput.png
NLP 综合示例
此示例展示了一个 NLP 管道,该管道使用了 Morpheus 中可用的多个阶段。此示例利用 Triton Inference Server 执行推理,并将输出写入名为 inference_output
的 Kafka 主题。这两者都需要在启动 Morpheus 之前启动。
启动 Triton
运行以下命令以启动 Triton 并加载 sid-minibert
模型
docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 nvcr.io/nvidia/morpheus/morpheus-tritonserver-models:24.10 tritonserver --model-repository=/models/triton-model-repo --exit-on-error=false --model-control-mode=explicit --load-model sid-minibert-onnx
启动 Kafka
按照 快速启动 Kafka 集群 部分的步骤 1-8,位于 contributing.md 中,创建一个名为 inference_output
的主题,然后将端口 9092
替换为您的 Kafka 实例正在侦听的端口。

morpheus --log_level=INFO run --pipeline_batch_size=1024 --model_max_batch_size=32 \
pipeline-nlp --viz_file=.tmp/nlp_kitchen_sink.png \
from-file --filename examples/data/pcap_dump.jsonlines \
deserialize \
preprocess \
inf-triton --model_name=sid-minibert-onnx --server_url=localhost:8000 \
monitor --description "Inference Rate" --smoothing=0.001 --unit "inf" \
add-class \
filter --filter_source=TENSOR --threshold=0.8 \
serialize --include 'timestamp' --exclude '^_ts_' \
to-kafka --bootstrap_servers localhost:9092 --output_topic "inference_output" \
monitor --description "ToKafka Rate" --smoothing=0.001 --unit "msg"
输出
Configuring Pipeline via CLI
Starting pipeline via CLI... Ctrl+C to Quit
Inference Rate[Complete]: 93085 inf [00:07, 12334.49 inf/s]
ToKafka Rate[Complete]: 93085 msg [00:07, 13297.85 msg/s]
Pipeline visualization saved to .tmp/nlp_kitchen_sink.png