NVIDIA Morpheus (24.10.01)

构建管道

先决条件 以下示例假定已获取示例数据集。从 Morpheus 仓库的根目录运行

复制
已复制!
            

./scripts/fetch_data.py fetch examples


要通过 CLI 构建管道,用户必须首先指定管道类型、源对象,然后是阶段的顺序列表。对于每个阶段,可以指定选项来配置特定阶段。由于阶段是按顺序排列的,因此一个阶段的输出将成为下一个阶段的输入。除非经过大量自定义,否则管道通常以 morpheus run 开头,后跟管道模式,例如 pipeline-nlppipeline-fil。例如,要运行 NLP 管道,请使用

复制
已复制!
            

morpheus run pipeline-nlp ...

虽然每个阶段都有配置选项,但也有一些选项适用于整个管道。查看 morpheus run --helpmorpheus run pipeline-nlp --helpmorpheus run pipeline-fil --help 以获取这些全局管道选项。

使用 CLI 配置的所有管道都需要从源对象开始。Morpheus 包含的两个常用源阶段是

  • from-file

    • 从本地文件读取到管道中

    • 支持 CSV、JSON、JSON lines 和 Parquet 格式

    • 所有行都在开始时读取并一次性排队到管道中。适用于性能测试。

    • 有关更多信息,请参阅 morpheus.stages.input.file_source_stage.FileSourceStage

  • from-kafka

    • 将消息从 Kafka 集群拉取到管道中

    • Kafka 集群可以在本地主机或远程运行

    • 有关更多信息,请参阅 morpheus.stages.input.kafka_source_stage.KafkaSourceStage

从此刻起,可以从命令行顺序添加任意数量的阶段,从头到尾。例如,我们可以构建一个简单的管道,该管道从文件读取、反序列化消息、序列化它们,然后写入文件,请使用以下命令

复制
已复制!
            

morpheus --log_level=DEBUG run pipeline-other --viz_file=.tmp/simple_identity.png \ from-file --filename=examples/data/pcap_dump.jsonlines \ deserialize \ serialize \ to-file --overwrite --filename .tmp/temp_out.json

simple_identity.png

输出应类似于

复制
已复制!
            

Configuring Pipeline via CLI Starting pipeline via CLI... Ctrl+C to Quit Config: { "_model_max_batch_size": 8, "_pipeline_batch_size": 256, "ae": null, "class_labels": [], "debug": false, "edge_buffer_size": 128, "feature_length": 1, "fil": { "feature_columns": null }, "log_config_file": null, "log_level": 10, "mode": "OTHER", "num_threads": 64, "plugins": [] } CPP Enabled: True ====Starting Pipeline==== ====Pipeline Started==== ====Building Segment: linear_segment_0==== Added source: <from-file-0; FileSourceStage(filename=examples/data/pcap_dump.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True, filter_null_columns=(), parser_kwargs={})> └─> morpheus.MessageMeta Added stage: <deserialize-1; DeserializeStage(ensure_sliceable_index=True, task_type=None, task_payload=None)> └─ morpheus.MessageMeta -> morpheus.ControlMessage Added stage: <serialize-2; SerializeStage(include=(), exclude=(), fixed_columns=True)> └─ morpheus.ControlMessage -> morpheus.MessageMeta Added stage: <to-file-3; WriteToFileStage(filename=.tmp/temp_out.json, overwrite=True, file_type=FileTypes.Auto, include_index_col=True, flush=False)> └─ morpheus.MessageMeta -> morpheus.MessageMeta ====Building Segment Complete!==== ====Pipeline Complete====

管道构建检查

====Building Pipeline==== 消息之后,如果日志记录级别为 INFO 或更高,CLI 将打印所有阶段的列表以及每个阶段的类型转换。要成为有效的管道,一个阶段的输出类型必须与下一个阶段的输入类型匹配。许多阶段是灵活的,并在运行时确定其类型,但某些阶段需要特定的输入类型。如果您的管道配置不正确,Morpheus 将报告错误。例如,如果我们运行与上面相同的命令,但忘记了 serialize 阶段

复制
已复制!
            

morpheus --log_level=DEBUG run pipeline-other \ from-file --filename=examples/data/pcap_dump.jsonlines \ deserialize \ to-file --overwrite --filename .tmp/temp_out.json

然后将显示以下错误

复制
已复制!
            

RuntimeError: The to-file stage cannot handle input of <class 'morpheus.messages.control_message.ControlMessage'>. Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,)

这表明 to-file 阶段无法接受 morpheus.messages.ControlMessage 的输入类型。这是因为 to-file 阶段不知道如何将该类写入文件;它只知道如何写入 morpheus.messages.message_meta.MessageMeta 的实例。为了确保您拥有有效的管道,请检查消息的 Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,) 部分。这表明您需要一个阶段来将 deserialize 阶段的输出类型 ControlMessage 转换为 MessageMeta,这正是 serialize 阶段的作用。

Kafka 源示例

上面的示例本质上只是复制文件。但是,重要的是要注意,大多数 Morpheus 管道在结构上是相似的,它们都以源阶段 (from-file) 开头,后跟 deserialize 阶段,以 serialize 阶段结尾,后跟接收器阶段 (to-file),实际的训练或推理逻辑发生在两者之间。

我们也可以轻松地在上面的示例中交换源或接收器阶段,而不会对整个管道产生任何影响。例如,要从 Kafka 主题读取,只需将 from-file 阶段替换为 from-kafka

注意:这假定 Kafka 代理在本地主机上运行,并侦听端口 9092。要使用 Kafka 测试 Morpheus,请按照 快速启动 Kafka 集群 部分的步骤 1-8,位于 contributing.md 中,创建一个名为 test_pcap 的主题,然后将端口 9092 替换为您的 Kafka 实例正在侦听的端口。

复制
已复制!
            

morpheus --log_level=DEBUG run pipeline-other \ from-kafka --input_topic test_pcap --bootstrap_servers localhost:9092 \ deserialize \ serialize \ to-file --filename .tmp/temp_out.json

有关特定管道模式的可用阶段的完整列表,请使用 CLI 帮助命令。首先,可以使用 morpheus run --help 列出可用的管道模式。然后,可以使用 morpheus run <mode> --help 列出该模式的可用阶段。例如,要列出 pipeline-nlp 模式的可用阶段

复制
已复制!
            

morpheus run pipeline-nlp --help

从 JSON 对象中删除字段

此示例仅将字段 timestampsrc_ipdest_ipexamples/data/pcap_dump.jsonlines 复制到 out.jsonlines

remove_fields_from_json_objects.png

复制
已复制!
            

morpheus run pipeline-other --viz_file=.tmp/remove_fields_from_json_objects.png \ from-file --filename examples/data/pcap_dump.jsonlines \ deserialize \ serialize --include 'timestamp' --include 'src_ip' --include 'dest_ip' \ to-file --overwrite --filename out.jsonlines

监控吞吐量

此示例在命令行报告吞吐量。

monitor_throughput.png

复制
已复制!
            

morpheus --log_level=INFO run pipeline-other --viz_file=.tmp/monitor_throughput.png \ from-file --filename examples/data/pcap_dump.jsonlines \ deserialize \ monitor --description "Lines Throughput" --smoothing 0.1 --unit "lines" \ serialize \ to-file --overwrite --filename out.jsonlines

输出

复制
已复制!
            

Configuring Pipeline via CLI Starting pipeline via CLI... Ctrl+C to Quit Lines Throughput[Complete]: 93085 lines [00:03, 29446.18 lines/s] Pipeline visualization saved to .tmp/monitor_throughput.png

注意:默认情况下,如果 log_level 设置为 WARNING 或更低,则 monitor 阶段将从管道中省略自身。

多监控器吞吐量

此示例独立报告每个阶段的吞吐量。

multi_monitor_throughput.png

复制
已复制!
            

morpheus --log_level=INFO run pipeline-nlp --viz_file=.tmp/multi_monitor_throughput.png \ from-file --filename examples/data/pcap_dump.jsonlines \ monitor --description "From File Throughput" \ deserialize \ monitor --description "Deserialize Throughput" \ serialize \ monitor --description "Serialize Throughput" \ to-file --filename out.jsonlines --overwrite

输出

复制
已复制!
            

Configuring Pipeline via CLI Starting pipeline via CLI... Ctrl+C to Quit From File Throughput[Complete]: 93085 messages [00:00, 168118.35 messages/s] Deserialize Throughput[Complete]: 93085 messages [00:04, 22584.37 messages/s] Serialize Throughput[Complete]: 93085 messages [00:06, 14095.36 messages/s] Pipeline visualization saved to .tmp/multi_monitor_throughput.png

NLP 综合示例

此示例展示了一个 NLP 管道,该管道使用了 Morpheus 中可用的多个阶段。此示例利用 Triton Inference Server 执行推理,并将输出写入名为 inference_output 的 Kafka 主题。这两者都需要在启动 Morpheus 之前启动。

启动 Triton

运行以下命令以启动 Triton 并加载 sid-minibert 模型

复制
已复制!
            

docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 nvcr.io/nvidia/morpheus/morpheus-tritonserver-models:24.10 tritonserver --model-repository=/models/triton-model-repo --exit-on-error=false --model-control-mode=explicit --load-model sid-minibert-onnx

启动 Kafka

按照 快速启动 Kafka 集群 部分的步骤 1-8,位于 contributing.md 中,创建一个名为 inference_output 的主题,然后将端口 9092 替换为您的 Kafka 实例正在侦听的端口。

nlp_kitchen_sink.png

复制
已复制!
            

morpheus --log_level=INFO run --pipeline_batch_size=1024 --model_max_batch_size=32 \ pipeline-nlp --viz_file=.tmp/nlp_kitchen_sink.png \ from-file --filename examples/data/pcap_dump.jsonlines \ deserialize \ preprocess \ inf-triton --model_name=sid-minibert-onnx --server_url=localhost:8000 \ monitor --description "Inference Rate" --smoothing=0.001 --unit "inf" \ add-class \ filter --filter_source=TENSOR --threshold=0.8 \ serialize --include 'timestamp' --exclude '^_ts_' \ to-kafka --bootstrap_servers localhost:9092 --output_topic "inference_output" \ monitor --description "ToKafka Rate" --smoothing=0.001 --unit "msg"

输出

复制
已复制!
            

Configuring Pipeline via CLI Starting pipeline via CLI... Ctrl+C to Quit Inference Rate[Complete]: 93085 inf [00:07, 12334.49 inf/s] ToKafka Rate[Complete]: 93085 msg [00:07, 13297.85 msg/s] Pipeline visualization saved to .tmp/nlp_kitchen_sink.png

上一步 Morpheus CLI 概述
下一步 Morpheus 仅 CPU 模式
© 版权所有 2024, NVIDIA。 上次更新于 2024 年 12 月 3 日。