NVIDIA Morpheus (24.10.01)

真实世界应用:网络钓鱼检测

注意: 本指南的代码可以在 Morpheus 仓库的 examples/developer_guide/2_1_real_world_phishing 目录中找到。

之前的示例演示了如何创建一个简单的 stage 并在 pipeline 的上下文中使用它,我们将继续看一个更高级的示例,该示例代表了我们在真实世界场景中可能想要做的事情。给定一组记录,每条记录代表一封电子邮件,假设我们想要预测哪些记录对应于欺诈性电子邮件。

作为此过程的一部分,除了每封电子邮件的原始内容外,我们可能还想使用在各种元数据(例如收件人计数)上训练的分类模型。如果我们假设这对于我们的示例是正确的,我们需要构建并连接一个预处理 stage,以便在应用分类器之前将此信息附加到每条记录。

对于此任务,我们需要定义一个新的 stage,我们将其称为 RecipientFeaturesStage,它将

  1. 接收与电子邮件对应的输入。

  2. 计算电子邮件元数据中收件人的数量。

  3. 发出一个 Morpheus MessageMeta 对象,该对象将包含记录内容以及增强的元数据。

对于此 stage,代码将与之前的示例类似,但有一些明显的更改。我们将使用 MessageMeta 类。这是一个 Morpheus 消息,其中包含 cuDF DataFrame。由于我们希望新的 stage 对 MessageMeta 类型进行操作,因此我们新的 accepted_types 方法定义为

复制
已复制!
            

def accepted_types(self) -> tuple: return (MessageMeta, )

接下来,我们将更新我们的 on_data 方法以执行实际工作。我们获取对传入消息的 df 属性的引用。重要的是要注意,message 是一个引用,对其或其成员(例如 df)所做的任何更改都将在现有消息实例上就地执行。

复制
已复制!
            

def on_data(self, message: MessageMeta) -> MessageMeta: # Open the DataFrame from the incoming message for in-place modification with message.mutable_dataframe() as df: df['to_count'] = df['To'].str.count('@') df['bcc_count'] = df['BCC'].str.count('@') df['cc_count'] = df['CC'].str.count('@') df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count'] # Attach features to string data df['data'] = (df['to_count'].astype(str) + '[SEP]' + df['bcc_count'].astype(str) + '[SEP]' + df['cc_count'].astype(str) + '[SEP]' + df['total_recipients'].astype(str) + '[SEP]' + df['Message']) # Return the message for the next stage return message

如果不想就地改变 DataFrame,我们可以使用 MessageMeta.copy_dataframe 方法复制 DataFrame 并返回一个新的 MessageMeta。但是请注意,这会以性能和增加内存使用量为代价。我们可以通过将 on_data 方法更改为来实现这一点

复制
已复制!
            

def on_data(self, message: MessageMeta) -> MessageMeta: # Get a copy of the DataFrame from the incoming message df = message.copy_dataframe() df['to_count'] = df['To'].str.count('@') df['bcc_count'] = df['BCC'].str.count('@') df['cc_count'] = df['CC'].str.count('@') df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count'] # Attach features to string data df['data'] = (df['to_count'].astype(str) + '[SEP]' + df['bcc_count'].astype(str) + '[SEP]' + df['cc_count'].astype(str) + '[SEP]' + df['total_recipients'].astype(str) + '[SEP]' + df['Message']) # Return a new message with our updated DataFrame for the next stage return MessageMeta(df)

在上面的示例中,我们向 DataFrame 添加了五个新字段。由于我们事先知道这些字段及其类型,因此作为优化,我们可以要求 Morpheus 在首次构造 DataFrame 时预先分配这些新字段。为此,我们在构造函数中填充 _needed_columns 属性

复制
已复制!
            

from morpheus.common import TypeId # ... def __init__(self, config: Config): super().__init__(config) # This stage adds new columns to the DataFrame, as an optimization we define the columns that are needed, # ensuring that these columns are pre-allocated with null values. This action is performed by Morpheus for any # stage defining this attribute. self._needed_columns.update({ 'to_count': TypeId.INT32, 'bcc_count': TypeId.INT32, 'cc_count': TypeId.INT32, 'total_recipients': TypeId.INT32, 'data': TypeId.STRING })

有关更多详细信息,请参阅 Stage 构造函数 部分。

由于此 stage 的目的专门与 NLP pipeline 的文本数据预处理相关,因此在注册 stage 时,我们将显式地将 stage 限制为 NLP pipeline。除此之外,由于我们的 stage 运行所在的 pipeline 是 GPU pipeline,因此我们将不会使用之前示例中的 GpuAndCpuMixin mixin。

复制
已复制!
            

@register_stage("recipient-features", modes=[PipelineModes.NLP]) class RecipientFeaturesStage(PassThruTypeMixin, SinglePortStage):

我们的 _build_single 方法与之前的示例保持不变;即使我们正在修改传入的消息,我们的输入和输出类型仍然相同,并且我们继续使用 PassThruTypeMixin

已完成的预处理 Stage

复制
已复制!
            

import mrc from mrc.core import operators as ops from morpheus.cli.register_stage import register_stage from morpheus.common import TypeId from morpheus.config import Config from morpheus.config import PipelineModes from morpheus.messages.message_meta import MessageMeta from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage @register_stage("recipient-features", modes=[PipelineModes.NLP]) class RecipientFeaturesStage(PassThruTypeMixin, SinglePortStage): """ Pre-processing stage which counts the number of recipients in an email's metadata. Parameters ---------- config : morpheus.config.Config Pipeline configuration instance. """ def __init__(self, config: Config): super().__init__(config) # This stage adds new columns to the DataFrame, as an optimization we define the columns that are needed, # ensuring that these columns are pre-allocated with null values. This action is performed by Morpheus for any # stage defining this attribute. self._needed_columns.update({ 'to_count': TypeId.INT32, 'bcc_count': TypeId.INT32, 'cc_count': TypeId.INT32, 'total_recipients': TypeId.INT32, 'data': TypeId.STRING }) @property def name(self) -> str: return "recipient-features" def accepted_types(self) -> tuple: return (MessageMeta, ) def supports_cpp_node(self) -> bool: return False def on_data(self, message: MessageMeta) -> MessageMeta: # Open the DataFrame from the incoming message for in-place modification with message.mutable_dataframe() as df: df['to_count'] = df['To'].str.count('@') df['bcc_count'] = df['BCC'].str.count('@') df['cc_count'] = df['CC'].str.count('@') df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count'] # Attach features to string data df['data'] = (df['to_count'].astype(str) + '[SEP]' + df['bcc_count'].astype(str) + '[SEP]' + df['cc_count'].astype(str) + '[SEP]' + df['total_recipients'].astype(str) + '[SEP]' + df['Message']) # Return the message for the next stage return message def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: node = builder.make_node(self.unique_name, ops.map(self.on_data)) builder.make_edge(input_node, node) return node

独立函数

在此示例中,我们从基于类的方法开始。但是,我们也可以很容易地将其编写为独立函数。以下示例等效于上面基于类的示例

复制
已复制!
            

from morpheus.common import TypeId from morpheus.messages import MessageMeta from morpheus.pipeline.stage_decorator import stage @stage( needed_columns={ 'to_count': TypeId.INT32, 'bcc_count': TypeId.INT32, 'cc_count': TypeId.INT32, 'total_recipients': TypeId.INT32, 'data': TypeId.STRING }) def recipient_features_stage(message: MessageMeta, *, sep_token: str = '[SEP]') -> MessageMeta: # Open the DataFrame from the incoming message for in-place modification with message.mutable_dataframe() as df: df['to_count'] = df['To'].str.count('@') df['bcc_count'] = df['BCC'].str.count('@') df['cc_count'] = df['CC'].str.count('@') df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count'] # Attach features to string data df['data'] = (df['to_count'].astype(str) + sep_token + df['bcc_count'].astype(str) + sep_token + df['cc_count'].astype(str) + sep_token + df['total_recipients'].astype(str) + sep_token + df['Message']) # Return the message for the next stage return message

在上面,needed_columns 作为参数提供给 stage 装饰器,可选的 sep_token 参数作为关键字参数公开。

注意: stage 装饰器方法的一个缺点是我们失去了在运行时根据构造函数参数确定 needed_columns 的能力。

现在我们将使用我们刚刚在真实世界 pipeline 中创建的 RecipientFeaturesStage 来检测欺诈性电子邮件。我们将构建的 pipeline 使用了 TritonInferenceStage,这是一个预定义的 Morpheus stage,旨在支持通过 NVIDIA 的 Triton Inference Server 执行自然语言处理 (NLP) 模型。NVIDIA Triton Inference Server 允许 GPU 加速的 ML/DL 以及各种模型框架的无缝共址和执行。对于我们的应用程序,我们将使用 phishing-bert-onnx 模型,该模型包含在 Morpheus 模型 Docker 容器中以及 models/triton-model-repo/phishing-bert-onnx 目录中。

这里需要注意的是,Triton 是一项独立于 Morpheus pipeline 的服务,通常不会与 pipeline 的其余部分驻留在同一台机器上。TritonInferenceStage 将使用 HTTP 和 gRPC 网络协议,以便我们与 Triton 服务器托管的机器学习模型进行交互。

启动 Triton

在执行我们的 pipeline 时,需要运行 Triton。为了简单起见,我们将使用 Morpheus 模型容器,其中包含 Triton 和 Morpheus 模型。

注意: 此步骤假定您已安装 DockerNVIDIA Container Toolkit

我们将使用以下命令启动 Triton Docker 容器

复制
已复制!
            

docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 \ nvcr.io/nvidia/morpheus/morpheus-tritonserver-models:24.10 \ tritonserver --model-repository=/models/triton-model-repo \ --exit-on-error=false \ --log-info=true \ --strict-readiness=false \ --disable-auto-complete-config \ --model-control-mode=explicit \ --load-model=phishing-bert-onnx

一旦我们运行了 Triton,我们可以使用 curl 验证其是否健康。/v2/health/live 端点应返回 200 状态代码

复制
已复制!
            

curl -v "localhost:8000/v2/health/live"

我们还可以查询 Triton 以获取可用模型

复制
已复制!
            

curl -X POST "localhost:8000/v2/repository/index"

让我们向 Triton 询问有关我们将要使用的 phishing-bert-onnx 模型的一些信息,并使用 jq 解析大型 JSON 输出

复制
已复制!
            

curl "localhost:8000/v2/models/phishing-bert-onnx/config" | jq

输出

复制
已复制!
            

{ "name": "phishing-bert-onnx", "platform": "onnxruntime_onnx", "backend": "onnxruntime", "version_policy": { "latest": { "num_versions": 1 } }, "max_batch_size": 32, "input": [ { "name": "input_ids", "data_type": "TYPE_INT64", "format": "FORMAT_NONE", "dims": [ 128 ], "is_shape_tensor": false, "allow_ragged_batch": false, "optional": false }, { "name": "attention_mask", "data_type": "TYPE_INT64", "format": "FORMAT_NONE", "dims": [ 128 ], "is_shape_tensor": false, "allow_ragged_batch": false, "optional": false } ], "output": [ { "name": "output", "data_type": "TYPE_FP32", "dims": [ 2 ], "label_filename": "", "is_shape_tensor": false } ], "batch_input": [], "batch_output": [], "optimization": { "priority": "PRIORITY_DEFAULT", "execution_accelerators": { "gpu_execution_accelerator": [ { "name": "tensorrt", "parameters": { "max_workspace_size_bytes": "1073741824", "precision_mode": "FP16" } } ], "cpu_execution_accelerator": [] }, "input_pinned_memory": { "enable": true }, "output_pinned_memory": { "enable": true }, "gather_kernel_buffer_threshold": 0, "eager_batching": false }, "dynamic_batching": { "preferred_batch_size": [ 1, 4, 8, 12, 16, 20, 24, 28, 32 ], "max_queue_delay_microseconds": 50000, "preserve_ordering": false, "priority_levels": 0, "default_priority_level": 0, "priority_queue_policy": {} }, "instance_group": [ { "name": "phishing-bert-onnx", "kind": "KIND_GPU", "count": 1, "gpus": [ 0 ], "secondary_devices": [], "profile": [], "passive": false, "host_policy": "" } ], "default_model_filename": "model.onnx", "cc_model_filenames": {}, "metric_tags": {}, "parameters": {}, "model_warmup": [] }

从这些信息中,我们注意到模型输入的预期维度是 "dims": [128]

定义我们的 Pipeline

对于此 pipeline,我们将有几个配置参数,例如输入和输出文件的路径,我们将使用 (click)[https://click.palletsprojects.com/] 库来公开和解析这些参数作为命令行参数。我们还将通过 --use_stage_function 命令行标志公开使用基于类或基于函数的 stage 实现的选择。

注意: 为了简单起见,我们假设 MORPHEUS_ROOT 环境变量设置为 Morpheus 项目仓库的根目录。

首先,我们需要实例化并设置 Config 类的一些属性。此对象用于 pipeline 整体的全局配置选项。我们将此对象以及 stage 特定的配置参数提供给每个 stage。

复制
已复制!
            

config = Config() config.mode = PipelineModes.NLP config.num_threads = len(os.sched_getaffinity(0)) config.feature_length = model_fea_length with open(labels_file, encoding='UTF-8') as fh: config.class_labels = [x.strip() for x in fh]

首先,我们将 pipeline 模式设置为 NLP。接下来,我们将 num_threads 属性设置为与系统中内核数匹配。

feature_length 属性需要与模型输入的维度匹配,这是我们在上一节中使用模型的 /config 端点从 Triton 获取的。

真实值分类标签从 Morpheus 中包含的 morpheus/data/labels_phishing.txt 文件中读取。

现在我们的配置对象已填充,我们继续进行 pipeline 本身。我们将使用之前示例中的相同输入文件。

接下来,我们将自定义收件人特征 stage 添加到 pipeline。我们导入了 stage 的两种实现,允许我们根据命令行提供的 use_stage_function 值添加适当的实现。

复制
已复制!
            

# Add our custom stage if use_stage_function: pipeline.add_stage(recipient_features_stage(config)) else: pipeline.add_stage(RecipientFeaturesStage(config))

为了标记化输入数据,我们将使用 Morpheus 的 PreprocessNLPStage。此 stage 使用 cuDF 子词标记器 将字符串转换为数字张量,以便输入到神经网络模型中。我们不是按字符或空格拆分字符串,而是根据子词在大型训练语料库中出现的频率将其拆分为有意义的子词。您可以在此处找到更多详细信息:https://arxiv.org/abs/1810.04805v2。我们现在需要知道的是,文本将根据我们提供的词汇表文件 (vocab_hash_file=vocab file) 转换为子词标记 ID。

让我们继续实例化我们的 PreprocessNLPStage 并将其添加到 pipeline

复制
已复制!
            

pipeline.add_stage( PreprocessNLPStage( config, vocab_hash_file=vocab_file, truncation=True, do_lower_case=True, add_special_tokens=False))

除了提供我们上面定义的 Config 对象之外,我们还将此 stage 配置为

  • 使用 morpheus/data/bert-base-uncased-hash.txt 词汇表文件作为其子词标记 ID (vocab_hash_file=vocab_file)。

  • 将文本长度截断为最大标记数 (truncation=True)。

  • 将大小写更改为全部小写 (do_lower_case=True)。

  • 避免添加默认的 BERT 特殊标记,例如用于分隔两个句子的 [SEP] 和在文本开头的 [CLS] (add_special_tokens=False)。

请注意,标记器参数和词汇表哈希文件应与 NLP 模型训练期间使用的标记化完全匹配。

此时,我们有了一个 pipeline,它可以读取一组记录,并使用分类器进行预测所需的元数据对其进行预处理。我们的下一步是定义一个 stage,该 stage 将机器学习模型应用于我们的 MessageMeta 对象。为了实现这一点,我们将使用 Morpheus 的 TritonInferenceStage。此 stage 将处理与 phishing-bert-onnx 模型的通信,我们通过 models 目录挂载将其提供给 Triton Docker 容器。

接下来,我们将添加一个 monitor stage 来测量推理速率

复制
已复制!
            

# Add a inference stage pipeline.add_stage( TritonInferenceStage( config, model_name=model_name, server_url=server_url, force_convert_inputs=True, )) pipeline.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))

在这里,我们添加一个后处理 stage,该 stage 添加 is_phishing 的概率分数

复制
已复制!
            

pipeline.add_stage(AddScoresStage(config, labels=["is_phishing"]))

最后,我们将结果保存到磁盘。为此,我们使用了两个 stage,它们通常一起使用:SerializeStageWriteToFileStage

SerializeStage 用于在输出中根据需要包含和排除列。重要的是,它还处理从 ControlMessage 输出类型到 WriteToFileStage 预期作为输入的 MessageMeta 类的转换。

WriteToFileStage 会在收到消息时将消息数据附加到输出文件。但是请注意,出于性能原因,WriteToFileStage 不会在每次收到消息时将其内容刷新到磁盘。相反,它依赖于底层的 缓冲输出流 根据需要进行刷新,然后在关闭时关闭文件句柄。

复制
已复制!
            

# Write the file to the output pipeline.add_stage(SerializeStage(config)) pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))

请注意,我们没有指定输出格式。在我们的示例中,默认输出文件名包含扩展名 .jsonlines。Morpheus 将根据扩展名推断输出格式。在编写本文时,Morpheus 将推断的扩展名是:.csv.json.jsonlines

要显式设置输出格式,我们可以将 file_type 参数指定给 WriteToFileStage,这是一个在 morpheus.common.FileTypes 中定义的枚举。支持的值为

  • FileTypes.Auto

  • FileTypes.CSV

  • FileTypes.JSON

已完成的 Pipeline

复制
已复制!
            

import logging import os import tempfile import click import morpheus from morpheus.config import Config from morpheus.config import PipelineModes from morpheus.pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage from morpheus.stages.input.file_source_stage import FileSourceStage from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.add_scores_stage import AddScoresStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage from morpheus.utils.logger import configure_logging from recipient_features_stage import RecipientFeaturesStage from recipient_features_stage_deco import recipient_features_stage MORPHEUS_ROOT = os.environ['MORPHEUS_ROOT'] @click.command() @click.option("--use_stage_function", is_flag=True, default=False, help="Use the function based version of the recipient features stage instead of the class") @click.option( "--labels_file", type=click.Path(exists=True, readable=True), default=os.path.join(morpheus.DATA_DIR, 'labels_phishing.txt'), help="Specifies a file to read labels from in order to convert class IDs into labels.", ) @click.option( "--vocab_file", type=click.Path(exists=True, readable=True), default=os.path.join(morpheus.DATA_DIR, 'bert-base-uncased-hash.txt'), help="Path to hash file containing vocabulary of words with token-ids.", ) @click.option( "--input_file", type=click.Path(exists=True, readable=True), default=os.path.join(MORPHEUS_ROOT, 'examples/data/email_with_addresses.jsonlines'), help="Input filepath.", ) @click.option( "--model_fea_length", default=128, type=click.IntRange(min=1), help="Features length to use for the model.", ) @click.option( "--model_name", default="phishing-bert-onnx", help="The name of the model that is deployed on Tritonserver.", ) @click.option("--server_url", default='localhost:8000', help="Tritonserver url.") @click.option( "--output_file", default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"), help="The path to the file where the inference output will be saved.", ) def run_pipeline(use_stage_function: bool, labels_file: str, vocab_file: str, input_file: str, model_fea_length: int, model_name: str, server_url: str, output_file: str): """Run the phishing detection pipeline.""" # Enable the default logger configure_logging(log_level=logging.INFO) # It's necessary to configure the pipeline for NLP mode config = Config() config.mode = PipelineModes.NLP # Set the thread count to match our cpu count config.num_threads = len(os.sched_getaffinity(0)) config.feature_length = model_fea_length with open(labels_file, encoding='UTF-8') as fh: config.class_labels = [x.strip() for x in fh] # Create a linear pipeline object pipeline = LinearPipeline(config) # Set source stage pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False)) # Add our custom stage if use_stage_function: pipeline.add_stage(recipient_features_stage(config)) else: pipeline.add_stage(RecipientFeaturesStage(config)) # Add a deserialize stage pipeline.add_stage(DeserializeStage(config)) # Tokenize the input pipeline.add_stage( PreprocessNLPStage(config, vocab_hash_file=vocab_file, truncation=True, do_lower_case=True, add_special_tokens=False)) # Add a inference stage pipeline.add_stage( TritonInferenceStage( config, model_name=model_name, server_url=server_url, force_convert_inputs=True, )) # Monitor the inference rate pipeline.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf")) # Add detection scores pipeline.add_stage(AddScoresStage(config, labels=["is_phishing"])) # Write the to the output file pipeline.add_stage(SerializeStage(config)) pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True)) # Run the pipeline pipeline.run() if __name__ == "__main__": run_pipeline()

备用 Morpheus CLI 示例

上面的 pipeline 也可以使用 Morpheus CLI 构建。

从 Morpheus repo 的根目录运行

复制
已复制!
            

morpheus --log_level=debug --plugin examples/developer_guide/2_1_real_world_phishing/recipient_features_stage.py \ run pipeline-nlp --labels_file=data/labels_phishing.txt --model_seq_length=128 \ from-file --filename=examples/data/email_with_addresses.jsonlines \ recipient-features \ deserialize \ preprocess --vocab_hash_file=data/bert-base-uncased-hash.txt --truncation=true --do_lower_case=true --add_special_tokens=false \ inf-triton --model_name=phishing-bert-onnx --server_url=localhost:8000 --force_convert_inputs=true \ monitor --description="Inference Rate" --smoothing=0.001 --unit=inf \ add-scores --label=is_phishing \ serialize \ to-file --filename=/tmp/detections.jsonlines --overwrite

在我们的 RecipientFeaturesStage 示例中,我们向 stage 添加了一个构造函数,但是我们没有详细介绍实现。每个 stage 构造函数都必须接收 Config 对象的实例作为其第一个参数,然后可以自由定义其他 stage 特定的参数。Morpheus 配置对象将包含 pipeline 中多个 stage 所需的配置参数,并且每个 Morpheus stage 中的构造函数都可以自由检查这些参数。相反,特定于单个 stage 的参数通常定义为构造函数参数。最佳实践是在构造函数中执行任何必要的验证检查,并在配置错误的情况下引发异常。这使我们能够在 pipeline 启动之前而不是之后尽早失败。

在我们的 RecipientFeaturesStage 示例中,我们硬编码了 Bert 分隔符标记。让我们改为重构代码以将其作为构造函数参数接收。这个新的构造函数参数按照 numpydoc 格式样式进行文档化,使其可以为 API 和 CLI 用户正确记录。让我们也借此机会验证 pipeline 模式是否设置为 morpheus.config.PipelineModes.NLP

注意: 在 register_stage 装饰器中设置 pipeline 模式会将我们的 stage 的使用限制为在使用 Morpheus 命令行工具时的 NLP pipeline,但是 Python API 没有这种强制执行。

我们重构的类定义现在是

复制
已复制!
            

@register_stage("recipient-features", modes=[PipelineModes.NLP]) class RecipientFeaturesStage(PassThruTypeMixin, SinglePortStage): """ Pre-processing stage which counts the number of recipients in an email's metadata. Parameters ---------- config : morpheus.config.Config Pipeline configuration instance. sep_token : str Bert separator token. """ def __init__(self, config: Config, sep_token: str = '[SEP]'): super().__init__(config) if config.mode != PipelineModes.NLP: raise RuntimeError( "RecipientFeaturesStage must be used in a pipeline configured for NLP" ) if len(sep_token) > 0: self._sep_token = sep_token else: raise ValueError("sep_token cannot be an empty string") # This stage adds new columns to the DataFrame, as an optimization we define the columns that are needed, # ensuring that these columns are pre-allocated with null values. This action is performed by Morpheus for any # stage defining this attribute. self._needed_columns.update({ 'to_count': TypeId.INT32, 'bcc_count': TypeId.INT32, 'cc_count': TypeId.INT32, 'total_recipients': TypeId.INT32, 'data': TypeId.STRING }) @property def name(self) -> str: return "recipient-features" def accepted_types(self) -> tuple: return (MessageMeta, ) def supports_cpp_node(self) -> bool: return False def on_data(self, message: MessageMeta) -> MessageMeta: # Open the DataFrame from the incoming message for in-place modification with message.mutable_dataframe() as df: df['to_count'] = df['To'].str.count('@') df['bcc_count'] = df['BCC'].str.count('@') df['cc_count'] = df['CC'].str.count('@') df['total_recipients'] = df['to_count'] + df['bcc_count'] + df[ 'cc_count'] # Attach features to string data df['data'] = (df['to_count'].astype(str) + self._sep_token + df['bcc_count'].astype(str) + self._sep_token + df['cc_count'].astype(str) + self._sep_token + df['total_recipients'].astype(str) + self._sep_token + df['Message']) # Return the message for the next stage return message def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: node = builder.make_node(self.unique_name, ops.map(self.on_data)) builder.make_edge(input_node, node) return node

如果我们进行上述更改,我们可以使用以下命令查看生成的帮助字符串

复制
已复制!
            

morpheus --plugin examples/developer_guide/2_1_real_world_phishing/recipient_features_stage.py run pipeline-nlp recipient-features --help

复制
已复制!
            

Configuring Pipeline via CLI Usage: morpheus run pipeline-nlp recipient-features [OPTIONS] Pre-processing stage which counts the number of recipients in an email's metadata. Options: --sep_token TEXT Bert separator token. [default: [SEP]] --help Show this message and exit.

注意: 本指南的代码可以在 Morpheus 仓库的 examples/developer_guide/2_2_rabbitmq 目录中找到。

基于类的方法

创建新的 source stage 与定义任何其他 stage 类似,但有一些差异。首先,我们将继承 SingleOutputSource 并包含 PreallocatorMixin。其次,必需的方法是 name 属性、_build_sourcecompute_schemasupports_cpp_node 方法。

在此示例中,我们将创建一个 source,它使用 Python 的 pika 客户端从 RabbitMQ 队列中读取消息。为了简单起见,我们将假设我们的 RabbitMQ 交换不需要身份验证,并且 RabbitMQ 消息的主体将是 JSON 格式的。身份验证和对其他格式的支持都可以稍后轻松添加。

添加到 stage 类(通常是 source stage)时,PreallocatorMixin 指示 stage 将新构造的 DataFrame 直接或包含在 MessageMeta 实例中发出到 pipeline 中。添加此 mixin 允许将其他 stage 所需的任何列插入到 DataFrame 中。

与 pass through stage 类似,这个新的 source stage 应该能够在 GPU 和 CPU 执行模式下运行,因此我们将使用 GpuAndCpuMixin mixin。需要注意的一点是,当在 GPU 模式下运行时,MessageMeta 对象的 DataFrame 有效负载始终是 cudf.DataFrame,而在 CPU 模式下运行时,则是 pandas.DataFrame。当支持 GPU 和 CPU 执行模式时,必须注意避免在没有 GPU 的系统上的 CPU 模式下运行时直接导入 cudf(或任何其他需要 GPU 的软件包),否则会导致错误。Stage 能够使用 morpheus.config.Config.execution_mode 属性检查执行模式。get_df_pkg() 帮助方法用于根据构造函数中的执行模式导入适当的 DataFrame 包

复制
已复制!
            

# This will return either cudf.DataFrame or pandas.DataFrame depending on the execution mode self._df_pkg = get_df_pkg(config.execution_mode)

compute_schema 方法允许我们定义 MessageMeta 的输出类型,我们通过调用传递给该方法的 StageSchema 对象的 output_schema 属性的 set_type 方法来做到这一点。这里值得注意的是,stage 根据传递给构造函数的配置参数来确定其输出类型是完全有效的。但是,stage 必须记录每个输出端口的单个输出类型。如果 stage 发出多种输出类型,则这些类型必须共享一个公共基类,该基类将用作 stage 的输出类型。

复制
已复制!
            

def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MessageMeta)

_build_source 方法与 _build_single 方法类似;它接收 MRC 段构建器 (mrc.Builder) 的实例并返回 mrc.SegmentObject。但是,与之前的示例不同,source stage 没有父 stage,因此不接收输入节点。我们将调用 make_source 并使用参数 self.source_generator 而不是使用 make_node 构建我们的节点,我们将在接下来定义该方法。

复制
已复制!
            

def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: return builder.make_source(self.unique_name, self.source_generator)

source_generator 方法是 RabbitMQ 特定代码存在最多的地方。当我们有希望发射到 pipeline 中的消息时,我们只需 yield 它。我们继续此过程,直到 is_stop_requested() 方法返回 Truesubscription.is_subscribed() 返回 False

复制
已复制!
            

def source_generator(self, subscription: mrc.Subscription) -> collections.abc.Iterator[MessageMeta]: try: while not self.is_stop_requested() and subscription.is_subscribed(): (method_frame, _, body) = self._channel.basic_get(self._queue_name) if method_frame is not None: try: buffer = StringIO(body.decode("utf-8")) df = self._df_pkg.read_json(buffer, orient='records', lines=True) yield MessageMeta(df=df) except Exception as ex: logger.exception("Error occurred converting RabbitMQ message to Dataframe:%s", ex) finally: self._channel.basic_ack(method_frame.delivery_tag) else: # queue is empty, sleep before polling again time.sleep(self._poll_interval.total_seconds()) finally: self._connection.close()

请注意,我们尽可能快地从队列中读取消息。当队列为空时,我们调用 time.sleep,以便在需要时发生上下文切换。只有在我们成功发出消息或反序列化消息失败后,我们才会确认消息(通过调用 basic_ack)。这意味着,如果在使用队列时 pipeline 关闭,我们将不会丢失任何消息。但是,在这种情况下,我们可能会得到重复的消息(也就是说,如果 pipeline 在我们 yield 消息之后但在调用 basic_ack 之前关闭)。

已完成的 Source Stage

复制
已复制!
            

import collections.abc import logging import time from io import StringIO import mrc import pandas as pd import pika from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.messages.message_meta import MessageMeta from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_output_source import SingleOutputSource from morpheus.pipeline.stage_schema import StageSchema from morpheus.utils.type_utils import get_df_pkg logger = logging.getLogger(__name__) @register_stage("from-rabbitmq") class RabbitMQSourceStage(PreallocatorMixin, GpuAndCpuMixin, SingleOutputSource): """ Source stage used to load messages from a RabbitMQ queue. Parameters ---------- config : morpheus.config.Config Pipeline configuration instance. host : str Hostname or IP of the RabbitMQ server. exchange : str Name of the RabbitMQ exchange to connect to. exchange_type : str, optional RabbitMQ exchange type; defaults to `fanout`. queue_name : str, optional Name of the queue to listen to. If left blank, RabbitMQ will generate a random queue name bound to the exchange. poll_interval : str, optional Amount of time between polling RabbitMQ for new messages """ def __init__(self, config: Config, host: str, exchange: str, exchange_type: str = 'fanout', queue_name: str = '', poll_interval: str = '100millis'): super().__init__(config) self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=host)) self._channel = self._connection.channel() self._channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) result = self._channel.queue_declare(queue=queue_name, exclusive=True) # When queue_name='' we will receive a randomly generated queue name self._queue_name = result.method.queue self._channel.queue_bind(exchange=exchange, queue=self._queue_name) self._poll_interval = pd.Timedelta(poll_interval) # This will return either cudf.DataFrame or pandas.DataFrame depending on the execution mode self._df_pkg = get_df_pkg(config.execution_mode) @property def name(self) -> str: return "from-rabbitmq" def supports_cpp_node(self) -> bool: return False def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MessageMeta) def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: return builder.make_source(self.unique_name, self.source_generator) def source_generator(self, subscription: mrc.Subscription) -> collections.abc.Iterator[MessageMeta]: try: while not self.is_stop_requested() and subscription.is_subscribed(): (method_frame, _, body) = self._channel.basic_get(self._queue_name) if method_frame is not None: try: buffer = StringIO(body.decode("utf-8")) df = self._df_pkg.read_json(buffer, orient='records', lines=True) yield MessageMeta(df=df) except Exception as ex: logger.exception("Error occurred converting RabbitMQ message to Dataframe:%s", ex) finally: self._channel.basic_ack(method_frame.delivery_tag) else: # queue is empty, sleep before polling again time.sleep(self._poll_interval.total_seconds()) finally: self._connection.close()

基于函数的方法

与之前示例中使用的 stage 装饰器类似,Morpheus 提供了 source() 装饰器,它包装一个生成器函数以用作 source stage。在基于类的方法中,我们显式添加了 PreallocatorMixin,当使用 source 装饰器时,将检查返回类型注释,如果返回类型是 DataFrame 类型或包含 DataFrame (MessageMetaControlMessage) 的消息,则将使用 PreallocatorMixin 创建 stage。我们还将通过将 execution_modes 参数设置为装饰器来指示 stage 支持哪些执行模式。

该函数的代码将首先执行与类构造函数中使用的相同设置,然后进入与 source_generator 方法中几乎相同的循环。

复制
已复制!
            

import collections.abc import logging import time from io import StringIO import mrc import pandas as pd import pika from morpheus.config import ExecutionMode from morpheus.messages.message_meta import MessageMeta from morpheus.pipeline.stage_decorator import source from morpheus.utils.type_utils import get_df_pkg logger = logging.getLogger(__name__) @source(name="from-rabbitmq", execution_modes=(ExecutionMode.GPU, ExecutionMode.CPU)) def rabbitmq_source(subscription: mrc.Subscription, host: str, exchange: str, exchange_type: str = 'fanout', queue_name: str = '', poll_interval: str = '100millis') -> collections.abc.Iterator[MessageMeta]: """ Source stage used to load messages from a RabbitMQ queue. Parameters ---------- subscription : mrc.Subscription Subscription object used to determine if the pipeline is still running. host : str Hostname or IP of the RabbitMQ server. exchange : str Name of the RabbitMQ exchange to connect to. exchange_type : str, optional RabbitMQ exchange type; defaults to `fanout`. queue_name : str, optional Name of the queue to listen to. If left blank, RabbitMQ will generate a random queue name bound to the exchange. poll_interval : str, optional Amount of time between polling RabbitMQ for new messages """ connection = pika.BlockingConnection(pika.ConnectionParameters(host=host)) channel = connection.channel() channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) result = channel.queue_declare(queue=queue_name, exclusive=True) # When queue_name='' we will receive a randomly generated queue name queue_name = result.method.queue channel.queue_bind(exchange=exchange, queue=queue_name) poll_interval = pd.Timedelta(poll_interval) df_pkg = get_df_pkg() try: while subscription.is_subscribed(): (method_frame, _, body) = channel.basic_get(queue_name) if method_frame is not None: try: buffer = StringIO(body.decode("utf-8")) df = df_pkg.read_json(buffer, orient='records', lines=True) yield MessageMeta(df=df) except Exception as ex: logger.exception("Error occurred converting RabbitMQ message to Dataframe:%s", ex) finally: channel.basic_ack(method_frame.delivery_tag) else: # queue is empty, sleep before polling again time.sleep(poll_interval.total_seconds()) finally: connection.close()

在 Morpheus 中,如果 stage 将 pipeline 的结果输出到 pipeline 外部的目标,则我们将其定义为 sink。Morpheus 在 morpheus.stages.output 命名空间下包含多个 sink stage。

回想一下,在上一节中,我们编写了一个 RabbitMQSourceStage。我们现在将通过编写一个 sink stage 来补充这一点,该 stage 可以将 Morpheus 数据输出到 RabbitMQ。对于此示例,我们再次使用 Python 的 pika 客户端。

我们的 sink 的代码将与其他 stage 类似,但有一些更改。首先,我们将继承 SinglePortStage

复制
已复制!
            

@register_stage("to-rabbitmq") class WriteToRabbitMQStage(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage):

我们的 sink 将充当 pass-through,从而允许将其他 sink 添加到 pipeline 的可能性。假设我们可以有一个 pipeline,在其中我们将结果同时发送到 RabbitMQ 和文件。因此,我们还将使用 PassThruTypeMixin

sink_deps.png

在我们的 _build_single 方法中,我们将使用 make_sink 方法而不是 make_nodemake_source

复制
已复制!
            

def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: node = builder.make_sink(self.unique_name, self.on_data, self.on_error, self.on_complete) builder.make_edge(input_node, node) return node

与我们之前的示例类似,stage 的大部分实际业务逻辑都包含在 on_data 方法中。在这种情况下,我们获取对附加到传入消息的 DataFrame 的引用。然后,我们序列化到 io.StringIO 缓冲区,然后将其发送到 RabbitMQ。

注意: 此 stage 支持 GPU 和 CPU 执行模式。在 GPU 模式下运行时,MessageMeta 对象的有效负载始终是 cuDF DataFrame。在 CPU 模式下运行时,有效负载始终是 pandas DataFrame。在许多情况下,两者在 API 上都是兼容的,而无需进行任何代码更改。但在某些情况下,API 可能略有不同,并且需要知道有效负载类型,必须注意在没有 GPU 的系统上的 CPU 模式下运行时不要直接导入 cudf 或任何其他需要 GPU 的软件包。Morpheus 提供了一些辅助方法来帮助解决此问题,这些方法位于 type_utils 模块中,例如 is_cudf_type()get_df_pkg_from_obj()

复制
已复制!
            

def on_data(self, message: MessageMeta) -> MessageMeta: df = message.df buffer = StringIO() df.to_json(buffer, orient='records', lines=True) body = buffer.getvalue().strip() self._channel.basic_publish(exchange=self._exchange, routing_key=self._routing_key, body=body) return message

本示例中介绍的两个新方法是 on_erroron_complete 方法。对于这两种方法,我们都希望确保 connection 对象已正确关闭。

注意: 我们没有关闭 channel 对象,因为关闭连接也会关闭任何关联的 channel 对象。

复制
已复制!
            

def on_error(self, ex: Exception): logger.exception("Error occurred :%s", ex) self._connection.close() def on_complete(self): self._connection.close()

已完成的 Sink Stage

复制
已复制!
            

import logging from io import StringIO import mrc import pika from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.messages.message_meta import MessageMeta from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage logger = logging.getLogger(__name__) @register_stage("to-rabbitmq") class WriteToRabbitMQStage(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage): """ Source stage used to load messages from a RabbitMQ queue. Parameters ---------- config : morpheus.config.Config Pipeline configuration instance. host : str Hostname or IP of the RabbitMQ server. exchange : str Name of the RabbitMQ exchange to connect to. exchange_type : str RabbitMQ exchange type; defaults to `fanout`. routing_key : str RabbitMQ routing key if needed. """ def __init__(self, config: Config, host: str, exchange: str, exchange_type: str = 'fanout', routing_key: str = ''): super().__init__(config) self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=host)) self._exchange = exchange self._routing_key = routing_key self._channel = self._connection.channel() self._channel.exchange_declare(exchange=self._exchange, exchange_type=exchange_type) @property def name(self) -> str: return "to-rabbitmq" def accepted_types(self) -> tuple: return (MessageMeta, ) def supports_cpp_node(self) -> bool: return False def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: node = builder.make_sink(self.unique_name, self.on_data, self.on_error, self.on_complete) builder.make_edge(input_node, node) return node def on_data(self, message: MessageMeta) -> MessageMeta: df = message.df buffer = StringIO() df.to_json(buffer, orient='records', lines=True) body = buffer.getvalue().strip() self._channel.basic_publish(exchange=self._exchange, routing_key=self._routing_key, body=body) return message def on_error(self, ex: Exception): logger.exception("Error occurred :%s", ex) self._connection.close() def on_complete(self): self._connection.close()

注意: 有关测试 RabbitMQSourceStagerabbitmq_sourceWriteToRabbitMQStage stage 的信息,请参阅 Morpheus repo 中的 examples/developer_guide/2_2_rabbitmq/README.md

Previous Simple Python Stage
Next Simple C++ Stage
© 版权所有 2024, NVIDIA。 最后更新于 2024 年 12 月 3 日。