真实世界应用:网络钓鱼检测
注意: 本指南的代码可以在 Morpheus 仓库的 examples/developer_guide/2_1_real_world_phishing
目录中找到。
之前的示例演示了如何创建一个简单的 stage 并在 pipeline 的上下文中使用它,我们将继续看一个更高级的示例,该示例代表了我们在真实世界场景中可能想要做的事情。给定一组记录,每条记录代表一封电子邮件,假设我们想要预测哪些记录对应于欺诈性电子邮件。
作为此过程的一部分,除了每封电子邮件的原始内容外,我们可能还想使用在各种元数据(例如收件人计数)上训练的分类模型。如果我们假设这对于我们的示例是正确的,我们需要构建并连接一个预处理 stage,以便在应用分类器之前将此信息附加到每条记录。
对于此任务,我们需要定义一个新的 stage,我们将其称为 RecipientFeaturesStage
,它将
接收与电子邮件对应的输入。
计算电子邮件元数据中收件人的数量。
发出一个 Morpheus
MessageMeta
对象,该对象将包含记录内容以及增强的元数据。
对于此 stage,代码将与之前的示例类似,但有一些明显的更改。我们将使用 MessageMeta
类。这是一个 Morpheus 消息,其中包含 cuDF DataFrame。由于我们希望新的 stage 对 MessageMeta
类型进行操作,因此我们新的 accepted_types
方法定义为
def accepted_types(self) -> tuple:
return (MessageMeta, )
接下来,我们将更新我们的 on_data
方法以执行实际工作。我们获取对传入消息的 df
属性的引用。重要的是要注意,message
是一个引用,对其或其成员(例如 df
)所做的任何更改都将在现有消息实例上就地执行。
def on_data(self, message: MessageMeta) -> MessageMeta:
# Open the DataFrame from the incoming message for in-place modification
with message.mutable_dataframe() as df:
df['to_count'] = df['To'].str.count('@')
df['bcc_count'] = df['BCC'].str.count('@')
df['cc_count'] = df['CC'].str.count('@')
df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count']
# Attach features to string data
df['data'] = (df['to_count'].astype(str) + '[SEP]' + df['bcc_count'].astype(str) + '[SEP]' +
df['cc_count'].astype(str) + '[SEP]' + df['total_recipients'].astype(str) + '[SEP]' +
df['Message'])
# Return the message for the next stage
return message
如果不想就地改变 DataFrame,我们可以使用 MessageMeta.copy_dataframe
方法复制 DataFrame 并返回一个新的 MessageMeta
。但是请注意,这会以性能和增加内存使用量为代价。我们可以通过将 on_data
方法更改为来实现这一点
def on_data(self, message: MessageMeta) -> MessageMeta:
# Get a copy of the DataFrame from the incoming message
df = message.copy_dataframe()
df['to_count'] = df['To'].str.count('@')
df['bcc_count'] = df['BCC'].str.count('@')
df['cc_count'] = df['CC'].str.count('@')
df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count']
# Attach features to string data
df['data'] = (df['to_count'].astype(str) + '[SEP]' + df['bcc_count'].astype(str) + '[SEP]' +
df['cc_count'].astype(str) + '[SEP]' + df['total_recipients'].astype(str) + '[SEP]' +
df['Message'])
# Return a new message with our updated DataFrame for the next stage
return MessageMeta(df)
在上面的示例中,我们向 DataFrame 添加了五个新字段。由于我们事先知道这些字段及其类型,因此作为优化,我们可以要求 Morpheus 在首次构造 DataFrame 时预先分配这些新字段。为此,我们在构造函数中填充 _needed_columns
属性
from morpheus.common import TypeId
# ...
def __init__(self, config: Config):
super().__init__(config)
# This stage adds new columns to the DataFrame, as an optimization we define the columns that are needed,
# ensuring that these columns are pre-allocated with null values. This action is performed by Morpheus for any
# stage defining this attribute.
self._needed_columns.update({
'to_count': TypeId.INT32,
'bcc_count': TypeId.INT32,
'cc_count': TypeId.INT32,
'total_recipients': TypeId.INT32,
'data': TypeId.STRING
})
有关更多详细信息,请参阅 Stage 构造函数 部分。
由于此 stage 的目的专门与 NLP pipeline 的文本数据预处理相关,因此在注册 stage 时,我们将显式地将 stage 限制为 NLP pipeline。除此之外,由于我们的 stage 运行所在的 pipeline 是 GPU pipeline,因此我们将不会使用之前示例中的 GpuAndCpuMixin
mixin。
@register_stage("recipient-features", modes=[PipelineModes.NLP])
class RecipientFeaturesStage(PassThruTypeMixin, SinglePortStage):
我们的 _build_single
方法与之前的示例保持不变;即使我们正在修改传入的消息,我们的输入和输出类型仍然相同,并且我们继续使用 PassThruTypeMixin
。
已完成的预处理 Stage
import mrc
from mrc.core import operators as ops
from morpheus.cli.register_stage import register_stage
from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
@register_stage("recipient-features", modes=[PipelineModes.NLP])
class RecipientFeaturesStage(PassThruTypeMixin, SinglePortStage):
"""
Pre-processing stage which counts the number of recipients in an email's metadata.
Parameters
----------
config : morpheus.config.Config
Pipeline configuration instance.
"""
def __init__(self, config: Config):
super().__init__(config)
# This stage adds new columns to the DataFrame, as an optimization we define the columns that are needed,
# ensuring that these columns are pre-allocated with null values. This action is performed by Morpheus for any
# stage defining this attribute.
self._needed_columns.update({
'to_count': TypeId.INT32,
'bcc_count': TypeId.INT32,
'cc_count': TypeId.INT32,
'total_recipients': TypeId.INT32,
'data': TypeId.STRING
})
@property
def name(self) -> str:
return "recipient-features"
def accepted_types(self) -> tuple:
return (MessageMeta, )
def supports_cpp_node(self) -> bool:
return False
def on_data(self, message: MessageMeta) -> MessageMeta:
# Open the DataFrame from the incoming message for in-place modification
with message.mutable_dataframe() as df:
df['to_count'] = df['To'].str.count('@')
df['bcc_count'] = df['BCC'].str.count('@')
df['cc_count'] = df['CC'].str.count('@')
df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count']
# Attach features to string data
df['data'] = (df['to_count'].astype(str) + '[SEP]' + df['bcc_count'].astype(str) + '[SEP]' +
df['cc_count'].astype(str) + '[SEP]' + df['total_recipients'].astype(str) + '[SEP]' +
df['Message'])
# Return the message for the next stage
return message
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.unique_name, ops.map(self.on_data))
builder.make_edge(input_node, node)
return node
独立函数
在此示例中,我们从基于类的方法开始。但是,我们也可以很容易地将其编写为独立函数。以下示例等效于上面基于类的示例
from morpheus.common import TypeId
from morpheus.messages import MessageMeta
from morpheus.pipeline.stage_decorator import stage
@stage(
needed_columns={
'to_count': TypeId.INT32,
'bcc_count': TypeId.INT32,
'cc_count': TypeId.INT32,
'total_recipients': TypeId.INT32,
'data': TypeId.STRING
})
def recipient_features_stage(message: MessageMeta, *, sep_token: str = '[SEP]') -> MessageMeta:
# Open the DataFrame from the incoming message for in-place modification
with message.mutable_dataframe() as df:
df['to_count'] = df['To'].str.count('@')
df['bcc_count'] = df['BCC'].str.count('@')
df['cc_count'] = df['CC'].str.count('@')
df['total_recipients'] = df['to_count'] + df['bcc_count'] + df['cc_count']
# Attach features to string data
df['data'] = (df['to_count'].astype(str) + sep_token + df['bcc_count'].astype(str) + sep_token +
df['cc_count'].astype(str) + sep_token + df['total_recipients'].astype(str) + sep_token +
df['Message'])
# Return the message for the next stage
return message
在上面,needed_columns
作为参数提供给 stage
装饰器,可选的 sep_token
参数作为关键字参数公开。
注意: stage
装饰器方法的一个缺点是我们失去了在运行时根据构造函数参数确定 needed_columns
的能力。
现在我们将使用我们刚刚在真实世界 pipeline 中创建的 RecipientFeaturesStage
来检测欺诈性电子邮件。我们将构建的 pipeline 使用了 TritonInferenceStage
,这是一个预定义的 Morpheus stage,旨在支持通过 NVIDIA 的 Triton Inference Server 执行自然语言处理 (NLP) 模型。NVIDIA Triton Inference Server 允许 GPU 加速的 ML/DL 以及各种模型框架的无缝共址和执行。对于我们的应用程序,我们将使用 phishing-bert-onnx
模型,该模型包含在 Morpheus 模型 Docker 容器中以及 models/triton-model-repo/phishing-bert-onnx
目录中。
这里需要注意的是,Triton 是一项独立于 Morpheus pipeline 的服务,通常不会与 pipeline 的其余部分驻留在同一台机器上。TritonInferenceStage
将使用 HTTP 和 gRPC 网络协议,以便我们与 Triton 服务器托管的机器学习模型进行交互。
启动 Triton
在执行我们的 pipeline 时,需要运行 Triton。为了简单起见,我们将使用 Morpheus 模型容器,其中包含 Triton 和 Morpheus 模型。
注意: 此步骤假定您已安装 Docker 和 NVIDIA Container Toolkit。
我们将使用以下命令启动 Triton Docker 容器
docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 \
nvcr.io/nvidia/morpheus/morpheus-tritonserver-models:24.10 \
tritonserver --model-repository=/models/triton-model-repo \
--exit-on-error=false \
--log-info=true \
--strict-readiness=false \
--disable-auto-complete-config \
--model-control-mode=explicit \
--load-model=phishing-bert-onnx
一旦我们运行了 Triton,我们可以使用 curl 验证其是否健康。/v2/health/live
端点应返回 200 状态代码
curl -v "localhost:8000/v2/health/live"
我们还可以查询 Triton 以获取可用模型
curl -X POST "localhost:8000/v2/repository/index"
让我们向 Triton 询问有关我们将要使用的 phishing-bert-onnx
模型的一些信息,并使用 jq
解析大型 JSON 输出
curl "localhost:8000/v2/models/phishing-bert-onnx/config" | jq
输出
{
"name": "phishing-bert-onnx",
"platform": "onnxruntime_onnx",
"backend": "onnxruntime",
"version_policy": {
"latest": {
"num_versions": 1
}
},
"max_batch_size": 32,
"input": [
{
"name": "input_ids",
"data_type": "TYPE_INT64",
"format": "FORMAT_NONE",
"dims": [
128
],
"is_shape_tensor": false,
"allow_ragged_batch": false,
"optional": false
},
{
"name": "attention_mask",
"data_type": "TYPE_INT64",
"format": "FORMAT_NONE",
"dims": [
128
],
"is_shape_tensor": false,
"allow_ragged_batch": false,
"optional": false
}
],
"output": [
{
"name": "output",
"data_type": "TYPE_FP32",
"dims": [
2
],
"label_filename": "",
"is_shape_tensor": false
}
],
"batch_input": [],
"batch_output": [],
"optimization": {
"priority": "PRIORITY_DEFAULT",
"execution_accelerators": {
"gpu_execution_accelerator": [
{
"name": "tensorrt",
"parameters": {
"max_workspace_size_bytes": "1073741824",
"precision_mode": "FP16"
}
}
],
"cpu_execution_accelerator": []
},
"input_pinned_memory": {
"enable": true
},
"output_pinned_memory": {
"enable": true
},
"gather_kernel_buffer_threshold": 0,
"eager_batching": false
},
"dynamic_batching": {
"preferred_batch_size": [
1,
4,
8,
12,
16,
20,
24,
28,
32
],
"max_queue_delay_microseconds": 50000,
"preserve_ordering": false,
"priority_levels": 0,
"default_priority_level": 0,
"priority_queue_policy": {}
},
"instance_group": [
{
"name": "phishing-bert-onnx",
"kind": "KIND_GPU",
"count": 1,
"gpus": [
0
],
"secondary_devices": [],
"profile": [],
"passive": false,
"host_policy": ""
}
],
"default_model_filename": "model.onnx",
"cc_model_filenames": {},
"metric_tags": {},
"parameters": {},
"model_warmup": []
}
从这些信息中,我们注意到模型输入的预期维度是 "dims": [128]
。
定义我们的 Pipeline
对于此 pipeline,我们将有几个配置参数,例如输入和输出文件的路径,我们将使用 (click)[https://click.palletsprojects.com/] 库来公开和解析这些参数作为命令行参数。我们还将通过 --use_stage_function
命令行标志公开使用基于类或基于函数的 stage 实现的选择。
注意: 为了简单起见,我们假设 MORPHEUS_ROOT
环境变量设置为 Morpheus 项目仓库的根目录。
首先,我们需要实例化并设置 Config
类的一些属性。此对象用于 pipeline 整体的全局配置选项。我们将此对象以及 stage 特定的配置参数提供给每个 stage。
config = Config()
config.mode = PipelineModes.NLP
config.num_threads = len(os.sched_getaffinity(0))
config.feature_length = model_fea_length
with open(labels_file, encoding='UTF-8') as fh:
config.class_labels = [x.strip() for x in fh]
首先,我们将 pipeline 模式设置为 NLP
。接下来,我们将 num_threads
属性设置为与系统中内核数匹配。
feature_length
属性需要与模型输入的维度匹配,这是我们在上一节中使用模型的 /config
端点从 Triton 获取的。
真实值分类标签从 Morpheus 中包含的 morpheus/data/labels_phishing.txt
文件中读取。
现在我们的配置对象已填充,我们继续进行 pipeline 本身。我们将使用之前示例中的相同输入文件。
接下来,我们将自定义收件人特征 stage 添加到 pipeline。我们导入了 stage 的两种实现,允许我们根据命令行提供的 use_stage_function
值添加适当的实现。
# Add our custom stage
if use_stage_function:
pipeline.add_stage(recipient_features_stage(config))
else:
pipeline.add_stage(RecipientFeaturesStage(config))
为了标记化输入数据,我们将使用 Morpheus 的 PreprocessNLPStage
。此 stage 使用 cuDF 子词标记器 将字符串转换为数字张量,以便输入到神经网络模型中。我们不是按字符或空格拆分字符串,而是根据子词在大型训练语料库中出现的频率将其拆分为有意义的子词。您可以在此处找到更多详细信息:https://arxiv.org/abs/1810.04805v2。我们现在需要知道的是,文本将根据我们提供的词汇表文件 (vocab_hash_file=vocab file
) 转换为子词标记 ID。
让我们继续实例化我们的 PreprocessNLPStage
并将其添加到 pipeline
pipeline.add_stage(
PreprocessNLPStage(
config,
vocab_hash_file=vocab_file,
truncation=True,
do_lower_case=True,
add_special_tokens=False))
除了提供我们上面定义的 Config
对象之外,我们还将此 stage 配置为
使用
morpheus/data/bert-base-uncased-hash.txt
词汇表文件作为其子词标记 ID (vocab_hash_file=vocab_file
)。将文本长度截断为最大标记数 (
truncation=True
)。将大小写更改为全部小写 (
do_lower_case=True
)。避免添加默认的 BERT 特殊标记,例如用于分隔两个句子的
[SEP]
和在文本开头的[CLS]
(add_special_tokens=False
)。
请注意,标记器参数和词汇表哈希文件应与 NLP 模型训练期间使用的标记化完全匹配。
此时,我们有了一个 pipeline,它可以读取一组记录,并使用分类器进行预测所需的元数据对其进行预处理。我们的下一步是定义一个 stage,该 stage 将机器学习模型应用于我们的 MessageMeta
对象。为了实现这一点,我们将使用 Morpheus 的 TritonInferenceStage
。此 stage 将处理与 phishing-bert-onnx
模型的通信,我们通过 models
目录挂载将其提供给 Triton Docker 容器。
接下来,我们将添加一个 monitor stage 来测量推理速率
# Add a inference stage
pipeline.add_stage(
TritonInferenceStage(
config,
model_name=model_name,
server_url=server_url,
force_convert_inputs=True,
))
pipeline.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
在这里,我们添加一个后处理 stage,该 stage 添加 is_phishing
的概率分数
pipeline.add_stage(AddScoresStage(config, labels=["is_phishing"]))
最后,我们将结果保存到磁盘。为此,我们使用了两个 stage,它们通常一起使用:SerializeStage
和 WriteToFileStage
。
SerializeStage
用于在输出中根据需要包含和排除列。重要的是,它还处理从 ControlMessage
输出类型到 WriteToFileStage
预期作为输入的 MessageMeta
类的转换。
WriteToFileStage
会在收到消息时将消息数据附加到输出文件。但是请注意,出于性能原因,WriteToFileStage
不会在每次收到消息时将其内容刷新到磁盘。相反,它依赖于底层的 缓冲输出流 根据需要进行刷新,然后在关闭时关闭文件句柄。
# Write the file to the output
pipeline.add_stage(SerializeStage(config))
pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
请注意,我们没有指定输出格式。在我们的示例中,默认输出文件名包含扩展名 .jsonlines
。Morpheus 将根据扩展名推断输出格式。在编写本文时,Morpheus 将推断的扩展名是:.csv
、.json
和 .jsonlines
。
要显式设置输出格式,我们可以将 file_type
参数指定给 WriteToFileStage
,这是一个在 morpheus.common.FileTypes
中定义的枚举。支持的值为
FileTypes.Auto
FileTypes.CSV
FileTypes.JSON
已完成的 Pipeline
import logging
import os
import tempfile
import click
import morpheus
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.add_scores_stage import AddScoresStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage
from morpheus.utils.logger import configure_logging
from recipient_features_stage import RecipientFeaturesStage
from recipient_features_stage_deco import recipient_features_stage
MORPHEUS_ROOT = os.environ['MORPHEUS_ROOT']
@click.command()
@click.option("--use_stage_function",
is_flag=True,
default=False,
help="Use the function based version of the recipient features stage instead of the class")
@click.option(
"--labels_file",
type=click.Path(exists=True, readable=True),
default=os.path.join(morpheus.DATA_DIR, 'labels_phishing.txt'),
help="Specifies a file to read labels from in order to convert class IDs into labels.",
)
@click.option(
"--vocab_file",
type=click.Path(exists=True, readable=True),
default=os.path.join(morpheus.DATA_DIR, 'bert-base-uncased-hash.txt'),
help="Path to hash file containing vocabulary of words with token-ids.",
)
@click.option(
"--input_file",
type=click.Path(exists=True, readable=True),
default=os.path.join(MORPHEUS_ROOT, 'examples/data/email_with_addresses.jsonlines'),
help="Input filepath.",
)
@click.option(
"--model_fea_length",
default=128,
type=click.IntRange(min=1),
help="Features length to use for the model.",
)
@click.option(
"--model_name",
default="phishing-bert-onnx",
help="The name of the model that is deployed on Tritonserver.",
)
@click.option("--server_url", default='localhost:8000', help="Tritonserver url.")
@click.option(
"--output_file",
default=os.path.join(tempfile.gettempdir(), "detections.jsonlines"),
help="The path to the file where the inference output will be saved.",
)
def run_pipeline(use_stage_function: bool,
labels_file: str,
vocab_file: str,
input_file: str,
model_fea_length: int,
model_name: str,
server_url: str,
output_file: str):
"""Run the phishing detection pipeline."""
# Enable the default logger
configure_logging(log_level=logging.INFO)
# It's necessary to configure the pipeline for NLP mode
config = Config()
config.mode = PipelineModes.NLP
# Set the thread count to match our cpu count
config.num_threads = len(os.sched_getaffinity(0))
config.feature_length = model_fea_length
with open(labels_file, encoding='UTF-8') as fh:
config.class_labels = [x.strip() for x in fh]
# Create a linear pipeline object
pipeline = LinearPipeline(config)
# Set source stage
pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))
# Add our custom stage
if use_stage_function:
pipeline.add_stage(recipient_features_stage(config))
else:
pipeline.add_stage(RecipientFeaturesStage(config))
# Add a deserialize stage
pipeline.add_stage(DeserializeStage(config))
# Tokenize the input
pipeline.add_stage(
PreprocessNLPStage(config,
vocab_hash_file=vocab_file,
truncation=True,
do_lower_case=True,
add_special_tokens=False))
# Add a inference stage
pipeline.add_stage(
TritonInferenceStage(
config,
model_name=model_name,
server_url=server_url,
force_convert_inputs=True,
))
# Monitor the inference rate
pipeline.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
# Add detection scores
pipeline.add_stage(AddScoresStage(config, labels=["is_phishing"]))
# Write the to the output file
pipeline.add_stage(SerializeStage(config))
pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
# Run the pipeline
pipeline.run()
if __name__ == "__main__":
run_pipeline()
备用 Morpheus CLI 示例
上面的 pipeline 也可以使用 Morpheus CLI 构建。
从 Morpheus repo 的根目录运行
morpheus --log_level=debug --plugin examples/developer_guide/2_1_real_world_phishing/recipient_features_stage.py \
run pipeline-nlp --labels_file=data/labels_phishing.txt --model_seq_length=128 \
from-file --filename=examples/data/email_with_addresses.jsonlines \
recipient-features \
deserialize \
preprocess --vocab_hash_file=data/bert-base-uncased-hash.txt --truncation=true --do_lower_case=true --add_special_tokens=false \
inf-triton --model_name=phishing-bert-onnx --server_url=localhost:8000 --force_convert_inputs=true \
monitor --description="Inference Rate" --smoothing=0.001 --unit=inf \
add-scores --label=is_phishing \
serialize \
to-file --filename=/tmp/detections.jsonlines --overwrite
在我们的 RecipientFeaturesStage
示例中,我们向 stage 添加了一个构造函数,但是我们没有详细介绍实现。每个 stage 构造函数都必须接收 Config
对象的实例作为其第一个参数,然后可以自由定义其他 stage 特定的参数。Morpheus 配置对象将包含 pipeline 中多个 stage 所需的配置参数,并且每个 Morpheus stage 中的构造函数都可以自由检查这些参数。相反,特定于单个 stage 的参数通常定义为构造函数参数。最佳实践是在构造函数中执行任何必要的验证检查,并在配置错误的情况下引发异常。这使我们能够在 pipeline 启动之前而不是之后尽早失败。
在我们的 RecipientFeaturesStage
示例中,我们硬编码了 Bert 分隔符标记。让我们改为重构代码以将其作为构造函数参数接收。这个新的构造函数参数按照 numpydoc
格式样式进行文档化,使其可以为 API 和 CLI 用户正确记录。让我们也借此机会验证 pipeline 模式是否设置为 morpheus.config.PipelineModes.NLP
。
注意: 在 register_stage
装饰器中设置 pipeline 模式会将我们的 stage 的使用限制为在使用 Morpheus 命令行工具时的 NLP pipeline,但是 Python API 没有这种强制执行。
我们重构的类定义现在是
@register_stage("recipient-features", modes=[PipelineModes.NLP])
class RecipientFeaturesStage(PassThruTypeMixin, SinglePortStage):
"""
Pre-processing stage which counts the number of recipients in an email's metadata.
Parameters
----------
config : morpheus.config.Config
Pipeline configuration instance.
sep_token : str
Bert separator token.
"""
def __init__(self, config: Config, sep_token: str = '[SEP]'):
super().__init__(config)
if config.mode != PipelineModes.NLP:
raise RuntimeError(
"RecipientFeaturesStage must be used in a pipeline configured for NLP"
)
if len(sep_token) > 0:
self._sep_token = sep_token
else:
raise ValueError("sep_token cannot be an empty string")
# This stage adds new columns to the DataFrame, as an optimization we define the columns that are needed,
# ensuring that these columns are pre-allocated with null values. This action is performed by Morpheus for any
# stage defining this attribute.
self._needed_columns.update({
'to_count': TypeId.INT32,
'bcc_count': TypeId.INT32,
'cc_count': TypeId.INT32,
'total_recipients': TypeId.INT32,
'data': TypeId.STRING
})
@property
def name(self) -> str:
return "recipient-features"
def accepted_types(self) -> tuple:
return (MessageMeta, )
def supports_cpp_node(self) -> bool:
return False
def on_data(self, message: MessageMeta) -> MessageMeta:
# Open the DataFrame from the incoming message for in-place modification
with message.mutable_dataframe() as df:
df['to_count'] = df['To'].str.count('@')
df['bcc_count'] = df['BCC'].str.count('@')
df['cc_count'] = df['CC'].str.count('@')
df['total_recipients'] = df['to_count'] + df['bcc_count'] + df[
'cc_count']
# Attach features to string data
df['data'] = (df['to_count'].astype(str) + self._sep_token +
df['bcc_count'].astype(str) + self._sep_token +
df['cc_count'].astype(str) + self._sep_token +
df['total_recipients'].astype(str) +
self._sep_token + df['Message'])
# Return the message for the next stage
return message
def _build_single(self, builder: mrc.Builder,
input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.unique_name, ops.map(self.on_data))
builder.make_edge(input_node, node)
return node
如果我们进行上述更改,我们可以使用以下命令查看生成的帮助字符串
morpheus --plugin examples/developer_guide/2_1_real_world_phishing/recipient_features_stage.py run pipeline-nlp recipient-features --help
Configuring Pipeline via CLI
Usage: morpheus run pipeline-nlp recipient-features [OPTIONS]
Pre-processing stage which counts the number of recipients in an email's metadata.
Options:
--sep_token TEXT Bert separator token. [default: [SEP]]
--help Show this message and exit.
注意: 本指南的代码可以在 Morpheus 仓库的 examples/developer_guide/2_2_rabbitmq
目录中找到。
基于类的方法
创建新的 source stage 与定义任何其他 stage 类似,但有一些差异。首先,我们将继承 SingleOutputSource
并包含 PreallocatorMixin
。其次,必需的方法是 name
属性、_build_source
、compute_schema
和 supports_cpp_node
方法。
在此示例中,我们将创建一个 source,它使用 Python 的 pika 客户端从 RabbitMQ 队列中读取消息。为了简单起见,我们将假设我们的 RabbitMQ 交换不需要身份验证,并且 RabbitMQ 消息的主体将是 JSON 格式的。身份验证和对其他格式的支持都可以稍后轻松添加。
添加到 stage 类(通常是 source stage)时,PreallocatorMixin
指示 stage 将新构造的 DataFrame 直接或包含在 MessageMeta
实例中发出到 pipeline 中。添加此 mixin 允许将其他 stage 所需的任何列插入到 DataFrame 中。
与 pass through stage 类似,这个新的 source stage 应该能够在 GPU 和 CPU 执行模式下运行,因此我们将使用 GpuAndCpuMixin
mixin。需要注意的一点是,当在 GPU 模式下运行时,MessageMeta
对象的 DataFrame 有效负载始终是 cudf.DataFrame
,而在 CPU 模式下运行时,则是 pandas.DataFrame
。当支持 GPU 和 CPU 执行模式时,必须注意避免在没有 GPU 的系统上的 CPU 模式下运行时直接导入 cudf
(或任何其他需要 GPU 的软件包),否则会导致错误。Stage 能够使用 morpheus.config.Config.execution_mode
属性检查执行模式。get_df_pkg()
帮助方法用于根据构造函数中的执行模式导入适当的 DataFrame 包
# This will return either cudf.DataFrame or pandas.DataFrame depending on the execution mode
self._df_pkg = get_df_pkg(config.execution_mode)
compute_schema
方法允许我们定义 MessageMeta
的输出类型,我们通过调用传递给该方法的 StageSchema
对象的 output_schema
属性的 set_type
方法来做到这一点。这里值得注意的是,stage 根据传递给构造函数的配置参数来确定其输出类型是完全有效的。但是,stage 必须记录每个输出端口的单个输出类型。如果 stage 发出多种输出类型,则这些类型必须共享一个公共基类,该基类将用作 stage 的输出类型。
def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MessageMeta)
_build_source
方法与 _build_single
方法类似;它接收 MRC 段构建器 (mrc.Builder
) 的实例并返回 mrc.SegmentObject
。但是,与之前的示例不同,source stage 没有父 stage,因此不接收输入节点。我们将调用 make_source
并使用参数 self.source_generator
而不是使用 make_node
构建我们的节点,我们将在接下来定义该方法。
def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
return builder.make_source(self.unique_name, self.source_generator)
source_generator
方法是 RabbitMQ 特定代码存在最多的地方。当我们有希望发射到 pipeline 中的消息时,我们只需 yield
它。我们继续此过程,直到 is_stop_requested()
方法返回 True
或 subscription.is_subscribed()
返回 False
。
def source_generator(self, subscription: mrc.Subscription) -> collections.abc.Iterator[MessageMeta]:
try:
while not self.is_stop_requested() and subscription.is_subscribed():
(method_frame, _, body) = self._channel.basic_get(self._queue_name)
if method_frame is not None:
try:
buffer = StringIO(body.decode("utf-8"))
df = self._df_pkg.read_json(buffer, orient='records', lines=True)
yield MessageMeta(df=df)
except Exception as ex:
logger.exception("Error occurred converting RabbitMQ message to Dataframe:%s", ex)
finally:
self._channel.basic_ack(method_frame.delivery_tag)
else:
# queue is empty, sleep before polling again
time.sleep(self._poll_interval.total_seconds())
finally:
self._connection.close()
请注意,我们尽可能快地从队列中读取消息。当队列为空时,我们调用 time.sleep
,以便在需要时发生上下文切换。只有在我们成功发出消息或反序列化消息失败后,我们才会确认消息(通过调用 basic_ack
)。这意味着,如果在使用队列时 pipeline 关闭,我们将不会丢失任何消息。但是,在这种情况下,我们可能会得到重复的消息(也就是说,如果 pipeline 在我们 yield 消息之后但在调用 basic_ack
之前关闭)。
已完成的 Source Stage
import collections.abc
import logging
import time
from io import StringIO
import mrc
import pandas as pd
import pika
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.utils.type_utils import get_df_pkg
logger = logging.getLogger(__name__)
@register_stage("from-rabbitmq")
class RabbitMQSourceStage(PreallocatorMixin, GpuAndCpuMixin, SingleOutputSource):
"""
Source stage used to load messages from a RabbitMQ queue.
Parameters
----------
config : morpheus.config.Config
Pipeline configuration instance.
host : str
Hostname or IP of the RabbitMQ server.
exchange : str
Name of the RabbitMQ exchange to connect to.
exchange_type : str, optional
RabbitMQ exchange type; defaults to `fanout`.
queue_name : str, optional
Name of the queue to listen to. If left blank, RabbitMQ will generate a random queue name bound to the exchange.
poll_interval : str, optional
Amount of time between polling RabbitMQ for new messages
"""
def __init__(self,
config: Config,
host: str,
exchange: str,
exchange_type: str = 'fanout',
queue_name: str = '',
poll_interval: str = '100millis'):
super().__init__(config)
self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
self._channel = self._connection.channel()
self._channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)
result = self._channel.queue_declare(queue=queue_name, exclusive=True)
# When queue_name='' we will receive a randomly generated queue name
self._queue_name = result.method.queue
self._channel.queue_bind(exchange=exchange, queue=self._queue_name)
self._poll_interval = pd.Timedelta(poll_interval)
# This will return either cudf.DataFrame or pandas.DataFrame depending on the execution mode
self._df_pkg = get_df_pkg(config.execution_mode)
@property
def name(self) -> str:
return "from-rabbitmq"
def supports_cpp_node(self) -> bool:
return False
def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(MessageMeta)
def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
return builder.make_source(self.unique_name, self.source_generator)
def source_generator(self, subscription: mrc.Subscription) -> collections.abc.Iterator[MessageMeta]:
try:
while not self.is_stop_requested() and subscription.is_subscribed():
(method_frame, _, body) = self._channel.basic_get(self._queue_name)
if method_frame is not None:
try:
buffer = StringIO(body.decode("utf-8"))
df = self._df_pkg.read_json(buffer, orient='records', lines=True)
yield MessageMeta(df=df)
except Exception as ex:
logger.exception("Error occurred converting RabbitMQ message to Dataframe:%s", ex)
finally:
self._channel.basic_ack(method_frame.delivery_tag)
else:
# queue is empty, sleep before polling again
time.sleep(self._poll_interval.total_seconds())
finally:
self._connection.close()
基于函数的方法
与之前示例中使用的 stage
装饰器类似,Morpheus 提供了 source()
装饰器,它包装一个生成器函数以用作 source stage。在基于类的方法中,我们显式添加了 PreallocatorMixin
,当使用 source
装饰器时,将检查返回类型注释,如果返回类型是 DataFrame
类型或包含 DataFrame
(MessageMeta
和 ControlMessage
) 的消息,则将使用 PreallocatorMixin
创建 stage。我们还将通过将 execution_modes
参数设置为装饰器来指示 stage 支持哪些执行模式。
该函数的代码将首先执行与类构造函数中使用的相同设置,然后进入与 source_generator
方法中几乎相同的循环。
import collections.abc
import logging
import time
from io import StringIO
import mrc
import pandas as pd
import pika
from morpheus.config import ExecutionMode
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.stage_decorator import source
from morpheus.utils.type_utils import get_df_pkg
logger = logging.getLogger(__name__)
@source(name="from-rabbitmq", execution_modes=(ExecutionMode.GPU, ExecutionMode.CPU))
def rabbitmq_source(subscription: mrc.Subscription,
host: str,
exchange: str,
exchange_type: str = 'fanout',
queue_name: str = '',
poll_interval: str = '100millis') -> collections.abc.Iterator[MessageMeta]:
"""
Source stage used to load messages from a RabbitMQ queue.
Parameters
----------
subscription : mrc.Subscription
Subscription object used to determine if the pipeline is still running.
host : str
Hostname or IP of the RabbitMQ server.
exchange : str
Name of the RabbitMQ exchange to connect to.
exchange_type : str, optional
RabbitMQ exchange type; defaults to `fanout`.
queue_name : str, optional
Name of the queue to listen to. If left blank, RabbitMQ will generate a random queue name bound to the exchange.
poll_interval : str, optional
Amount of time between polling RabbitMQ for new messages
"""
connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
channel = connection.channel()
channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)
result = channel.queue_declare(queue=queue_name, exclusive=True)
# When queue_name='' we will receive a randomly generated queue name
queue_name = result.method.queue
channel.queue_bind(exchange=exchange, queue=queue_name)
poll_interval = pd.Timedelta(poll_interval)
df_pkg = get_df_pkg()
try:
while subscription.is_subscribed():
(method_frame, _, body) = channel.basic_get(queue_name)
if method_frame is not None:
try:
buffer = StringIO(body.decode("utf-8"))
df = df_pkg.read_json(buffer, orient='records', lines=True)
yield MessageMeta(df=df)
except Exception as ex:
logger.exception("Error occurred converting RabbitMQ message to Dataframe:%s", ex)
finally:
channel.basic_ack(method_frame.delivery_tag)
else:
# queue is empty, sleep before polling again
time.sleep(poll_interval.total_seconds())
finally:
connection.close()
在 Morpheus 中,如果 stage 将 pipeline 的结果输出到 pipeline 外部的目标,则我们将其定义为 sink。Morpheus 在 morpheus.stages.output
命名空间下包含多个 sink stage。
回想一下,在上一节中,我们编写了一个 RabbitMQSourceStage
。我们现在将通过编写一个 sink stage 来补充这一点,该 stage 可以将 Morpheus 数据输出到 RabbitMQ。对于此示例,我们再次使用 Python 的 pika 客户端。
我们的 sink 的代码将与其他 stage 类似,但有一些更改。首先,我们将继承 SinglePortStage
@register_stage("to-rabbitmq")
class WriteToRabbitMQStage(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage):
我们的 sink 将充当 pass-through,从而允许将其他 sink 添加到 pipeline 的可能性。假设我们可以有一个 pipeline,在其中我们将结果同时发送到 RabbitMQ 和文件。因此,我们还将使用 PassThruTypeMixin
。

在我们的 _build_single
方法中,我们将使用 make_sink
方法而不是 make_node
或 make_source
。
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_sink(self.unique_name, self.on_data, self.on_error, self.on_complete)
builder.make_edge(input_node, node)
return node
与我们之前的示例类似,stage 的大部分实际业务逻辑都包含在 on_data
方法中。在这种情况下,我们获取对附加到传入消息的 DataFrame 的引用。然后,我们序列化到 io.StringIO
缓冲区,然后将其发送到 RabbitMQ。
注意: 此 stage 支持 GPU 和 CPU 执行模式。在 GPU 模式下运行时,MessageMeta
对象的有效负载始终是 cuDF DataFrame。在 CPU 模式下运行时,有效负载始终是 pandas DataFrame。在许多情况下,两者在 API 上都是兼容的,而无需进行任何代码更改。但在某些情况下,API 可能略有不同,并且需要知道有效负载类型,必须注意在没有 GPU 的系统上的 CPU 模式下运行时不要直接导入 cudf
或任何其他需要 GPU 的软件包。Morpheus 提供了一些辅助方法来帮助解决此问题,这些方法位于 type_utils
模块中,例如 is_cudf_type()
和 get_df_pkg_from_obj()
。
def on_data(self, message: MessageMeta) -> MessageMeta:
df = message.df
buffer = StringIO()
df.to_json(buffer, orient='records', lines=True)
body = buffer.getvalue().strip()
self._channel.basic_publish(exchange=self._exchange, routing_key=self._routing_key, body=body)
return message
本示例中介绍的两个新方法是 on_error
和 on_complete
方法。对于这两种方法,我们都希望确保 connection 对象已正确关闭。
注意: 我们没有关闭 channel 对象,因为关闭连接也会关闭任何关联的 channel 对象。
def on_error(self, ex: Exception):
logger.exception("Error occurred :%s", ex)
self._connection.close()
def on_complete(self):
self._connection.close()
已完成的 Sink Stage
import logging
from io import StringIO
import mrc
import pika
from morpheus.cli.register_stage import register_stage
from morpheus.config import Config
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
logger = logging.getLogger(__name__)
@register_stage("to-rabbitmq")
class WriteToRabbitMQStage(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage):
"""
Source stage used to load messages from a RabbitMQ queue.
Parameters
----------
config : morpheus.config.Config
Pipeline configuration instance.
host : str
Hostname or IP of the RabbitMQ server.
exchange : str
Name of the RabbitMQ exchange to connect to.
exchange_type : str
RabbitMQ exchange type; defaults to `fanout`.
routing_key : str
RabbitMQ routing key if needed.
"""
def __init__(self, config: Config, host: str, exchange: str, exchange_type: str = 'fanout', routing_key: str = ''):
super().__init__(config)
self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
self._exchange = exchange
self._routing_key = routing_key
self._channel = self._connection.channel()
self._channel.exchange_declare(exchange=self._exchange, exchange_type=exchange_type)
@property
def name(self) -> str:
return "to-rabbitmq"
def accepted_types(self) -> tuple:
return (MessageMeta, )
def supports_cpp_node(self) -> bool:
return False
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_sink(self.unique_name, self.on_data, self.on_error, self.on_complete)
builder.make_edge(input_node, node)
return node
def on_data(self, message: MessageMeta) -> MessageMeta:
df = message.df
buffer = StringIO()
df.to_json(buffer, orient='records', lines=True)
body = buffer.getvalue().strip()
self._channel.basic_publish(exchange=self._exchange, routing_key=self._routing_key, body=body)
return message
def on_error(self, ex: Exception):
logger.exception("Error occurred :%s", ex)
self._connection.close()
def on_complete(self):
self._connection.close()
注意: 有关测试 RabbitMQSourceStage
、rabbitmq_source
和 WriteToRabbitMQStage
stage 的信息,请参阅 Morpheus repo 中的 examples/developer_guide/2_2_rabbitmq/README.md
。