NVIDIA Morpheus (24.10.01)

数字指纹 (DFP) 参考

dfp_deployment_configs.png

管道结构配置

dfp_pipeline_structure.png

训练和推理管道中的阶段可以混合和匹配,影响很小,也就是说,MultiFileSource 可以配置为从 S3 或本地文件拉取数据,并且可以完全替换为任何其他 Morpheus 输入阶段。同样,S3 写入器可以替换为任何 Morpheus 输出阶段。无论输入和输出如何,核心管道都应保持不变。虽然管道核心(上图蓝色区域内)的阶段执行应配置而非交换的常见操作。

Morpheus Config

对于推理和训练管道,Morpheus Config 对象应使用相同的值构建,例如

复制
已复制!
            

import os from morpheus.config import Config from morpheus.config import ConfigAutoEncoder from morpheus.cli.utils import get_package_relative_file from morpheus.utils.file_utils import load_labels_file

复制
已复制!
            

config = Config() config.num_threads = len(os.sched_getaffinity(0)) config.ae = ConfigAutoEncoder() config.ae.feature_columns = load_labels_file(get_package_relative_file("data/columns_ae_azure.txt"))

可能需要的其他属性

属性

类型

默认值

描述

Config.ae.userid_column_name str userIdentityaccountId DataFrame 中包含用户名或用户 ID 的列
Config.ae.timestamp_column_name str timestamp DataFrame 中包含事件时间戳的列
Config.ae.fallback_username str generic_user 通用用户模型使用的名称,不应与任何真实用户的名称匹配

模式定义

DataFrame 输入模式 ( DataFrameInputSchema )

DataFrameInputSchema 类定义了模式,指定要包含在输出 DataFrame 中的列。在 DFP 管道中,有两个阶段执行预处理,即 DFPFileToDataFrameStage 阶段和 DFPPreprocessingStage。这种将预处理阶段与需要执行的实际操作解耦的方式允许在管道中用户定义实际模式,并实现阶段的可重用性。由用户来定义将出现在 DataFrame 中的字段。输入数据中未在 column_infopreserve_columns 构造函数参数中指定的任何列都不会出现在输出中。例外情况是在 json_columns 参数中指定的 JSON 字段,该参数定义了要规范化的 JSON 字段。

重要的是要注意,在处理 column_info 中的字段之前,会先规范化 json_columns 中定义的字段,从而允许对嵌套在 JSON 列中的字段执行处理。例如,假设我们有一个名为 event 的 JSON 字段,其中包含一个名为 timestamp 的键,该键在 JSON 数据中显示为 ISO 8601 格式的日期字符串,我们可以确保将其转换为 datetime 对象以供下游阶段使用,方法如下

复制
已复制!
            

from morpheus.utils.column_info import DataFrameInputSchema from morpheus.utils.column_info import DateTimeColumn

复制
已复制!
            

schema = DataFrameInputSchema( json_columns=['event'], column_info=[DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name='event.timestamp')])

在上面的示例中,执行了三个操作

  1. event JSON 字段被规范化,从而产生以 event. 为前缀的新字段,以便包含在输出 DataFrame 中。

  2. 新创建的字段 event.timestamp 被解析为 datetime 字段。

  3. 由于 DFP 管道显式要求时间戳字段,我们使用 config.ae.timestamp_column_name 属性命名这个新列,确保它与管道配置匹配。当 nameinput_name 相同时,旧字段将被覆盖,当它们不同时,将创建一个新字段。

DFPFileToDataFrameStage 首先执行,负责展平可能嵌套的 JSON 数据并执行任何类型的数据类型转换。DFPPreprocessingStageDFPSplitUsersStage 之后执行,从而允许按用户计算字段的可能性,例如前面提到的 logcountlocincrement 字段。这两个阶段都在 DFPFileBatcherStage 之后执行,从而允许按时间段(默认情况下为每天)计算字段。

参数

类型

描述

json_columns List[str] 传入 DataFrame 中要规范化的 JSON 列的可选列表(当前使用 pandas.json_normalize 方法)。JSON 字段中的每个键都将被展平为名为 <field>.<key> 的新列,例如,名为 user 的 JSON 字段包含名为 id 的键将导致一个名为 user.id 的新列。默认情况下,这是一个空 list
column_info List[str] 可选的 ColumnInfo 实例列表,每个实例定义要对列执行的特定操作。这些操作包括重命名、类型转换和自定义计算。默认情况下,这是一个空 list
preserve_columns List[str]str 可选的正则表达式字符串或正则表达式字符串列表,用于定义应在输出 DataFrame 中保留的输入数据中的列。默认情况下,这是一个空 list
row_filter functionNone 可选函数,在执行所有其他处理后调用。此函数接收 DataFrame 作为其唯一参数,并返回一个 DataFrame

列信息 ( ColumnInfo )

定义单个列和类型转换。

参数

类型

描述

name str 列名
dtype str 或 Python 类型 pandas 识别的任何类型字符串或 Python 类

自定义列 ( CustomColumn )

ColumnInfo 的子类,定义要由用户定义的函数 process_column_fn 计算的列。

参数

类型

描述

name str 列名
dtype str 或 Python 类型 pandas 识别的任何类型字符串或 Python 类
process_column_fn function 函数,接收整个 DataFrame 作为其唯一输入,并返回一个新的 pandas.Series 对象,该对象将存储在列 name 中。
input_column_types dict[str, str] 成功处理此列所需的输入列和预期 dtype。将此设置为 None 将传递所有列。指定需要的列可以提高性能。

重命名列 ( RenameColumn )

ColumnInfo 的子类,添加了执行重命名的功能。

参数

类型

描述

name str 目标列的名称
dtype str 或 Python 类型 pandas 识别的任何类型字符串或 Python 类
input_name str 原始列名

布尔列 ( BoolColumn )

RenameColumn 的子类,添加了将一组自定义值映射为布尔值的功能。例如,假设我们有一个字符串输入字段,其中包含五个可能的 enum 值之一:OKSUCCESSDENIEDCANCELEDEXPIRED,我们可以将这些值映射到单个布尔字段,如下所示

复制
已复制!
            

from morpheus.utils.column_info import BoolColumn

复制
已复制!
            

field = BoolColumn(name="result", dtype=bool, input_name="result", true_values=["OK", "SUCCESS"], false_values=["DENIED", "CANCELED", "EXPIRED"])

我们在本例中使用了字符串;但是,我们也可以同样轻松地映射整数状态代码。我们还可以通过为 true 和 false 提供自定义值(例如,1/0yes/no)来映射到布尔值以外的类型。

参数

类型

描述

name str 目标列的名称
dtype str 或 Python 类型 通常这应该是 bool ;但是,如果指定了 true_valuefalse_value,则它可能是另一种类型。
input_name str 原始列名
true_value Any 要为 true 值存储的可选值,应为 dtype 类型。默认为 True
false_value Any 要为 false 值存储的可选值,应为 dtype 类型。默认为 False
true_values List[str] 要解释为 true 的字符串值列表。
false_values List[str] 要解释为 false 的字符串值列表。

日期-时间列 ( DateTimeColumn )

RenameColumn 的子类,专门用于转换 UTC 本地化 datetime 值。当传入值包含时区偏移字符串时,这些值将转换为 UTC,而没有时区的值则假定为 UTC。

参数

类型

描述

name str 目标列的名称
dtype str 或 Python 类型 pandas 识别的任何类型字符串或 Python 类
input_name str 原始列名

字符串连接列 ( StringJoinColumn )

RenameColumn 的子类,通过按 sep 连接,将传入的 list 值转换为字符串。

参数

类型

描述

name str 目标列的名称
dtype str 或 Python 类型 pandas 识别的任何类型字符串或 Python 类
input_name str 原始列名
sep str 用于连接的分隔符字符串

字符串连接列 ( StringCatColumn )

ColumnInfo 的子类,将多个列中的值连接成一个新的字符串列,并用 sep 分隔。

参数

类型

描述

name str 目标列的名称
dtype str 或 Python 类型 pandas 识别的任何类型字符串或 Python 类
input_columns List[str] 要连接的列的列表
sep str 分隔符字符串

递增列 ( IncrementColumn )

DateTimeColumn 的子类,根据 input_name 字段中的日期,计算 groupby_column 中特定时间窗口 period 内值的唯一出现次数。

参数

类型

描述

name str 目标列的名称
dtype str 或 Python 类型 应为 int 或其他整数类
input_name str 包含时间戳值的原始列名
groupby_column str 要分组依据的列名
period str 执行计算的可选时间段,值必须是 pandas 的偏移字符串之一。默认为 D 一天

输入阶段

dfp_input_config.png

源阶段 ( MultiFileSource )

MultiFileSource (python/morpheus/morpheus/modules/input/multi_file_source.py) 接收路径或路径列表 (filenames),并将共同作为 fsspec.core.OpenFiles 对象发射到管道中。路径可以包括通配符 * 以及 URL(例如:s3://path)到远程存储提供商,如 S3、FTP、GCP、Azure、Databricks 和其他由 fsspec 定义的提供商。此外,可以通过在路径前加上 filecache::(例如:filecache::s3://bucket-name/key-name)在本地缓存路径。

注意: 此阶段实际上并不下载数据文件,允许在下载之前过滤和批处理文件列表。

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
filenames List[str]str 要从中读取的源文件的路径
watch bool 可选:当 True 时,将重复轮询 filenames 以查找新文件。这假定 filenames 中的至少一个路径包含通配符。默认为 False
watch_interval float watchTrue 时,这是轮询 filenames 中的路径以查找新文件的时间间隔(秒)。当 watchFalse 时忽略。

文件批处理阶段 ( DFPFileBatcherStage )

DFPFileBatcherStage (python/morpheus_dfp/morpheus_dfp/stages/dfp_file_batcher_stage.py) 将传入 DataFrame 中的数据按时间段(默认每天)分组为批次,并可选择将传入数据过滤到特定的时间窗口。此阶段可以通过将多个小文件合并到一个批次中来潜在地提高性能。此阶段假定日志的日期可以轻松推断,例如将创建时间编码在文件名中(例如,AUTH_LOG-2022-08-21T22.05.23Z.json),或使用文件系统报告的修改时间。提取日期的实际方法编码在用户提供的 date_conversion_func 函数中(稍后详细介绍)。

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
date_conversion_func function 函数接收单个 fsspec.core.OpenFile 参数并返回一个 datetime.datetime 对象
period str 对数据进行分组的时间段,值必须是 pandas 的偏移字符串之一
sampling_rate_s int 可选,默认值=None。已弃用,请考虑改用 sampling。定义后,将对传入数据文件的子集进行采样,每 sampling_rate_s 秒取第一行。
start_time datetime 可选,默认值=None。如果不为 None,则将过滤传入的数据文件,排除在 start_time 之前创建的任何文件
end_time datetime 可选,默认值=None。如果不为 None,则将过滤传入的数据文件,排除在 end_time 之后创建的任何文件
sampling str, float, int 可选,如果不为 None,则将对传入数据文件的子集进行采样。当为字符串时,该值被解释为 pandas 频率。将取每个频率的第一行。当值在 [0,1) 之间时,将取一定百分比的行。当值大于 1 时,该值被解释为要取的随机行数。

对于日志文件的创建日期编码在文件名中的情况,可以使用 morpheus/utils/file_utils.py 模块中的 date_extractordate_extractor 假定时间戳已本地化为 UTC,并且在作为参数传递给 DFPFileBatcherStage 之前,需要将其绑定到正则表达式模式。正则表达式模式需要包含以下命名组:yearmonthdayhourminutesecond,以及可选的 microsecond。如果正则表达式与 date_extractor 函数不匹配,则将回退到使用文件的修改时间。

对于包含 ISO 8601 格式日期字符串的输入文件,可以使用 iso_date_regex 正则表达式,例如

复制
已复制!
            

from functools import partial from morpheus.utils.file_utils import date_extractor from morpheus_dfp.utils.regex_utils import iso_date_regex

复制
已复制!
            

# Batch files into buckets by time. Use the default ISO date extractor from the filename pipeline.add_stage( DFPFileBatcherStage(config, period="D", date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex)))

注意: 如果 date_conversion_func 返回时区感知时间戳,则 start_timeend_time (如果不为 None)也需要是时区感知 datetime 对象。

文件到 DataFrame 阶段 ( DFPFileToDataFrameStage )

DFPFileToDataFrameStage (python/morpheus_dfp/morpheus_dfp/stages/dfp_file_to_df.py) 阶段接收 fsspec.core.OpenFileslist,并将它们加载到单个 DataFrame 中,然后将其发射到管道中。当父阶段为 DFPFileBatcherStage 时,每个批次(通常为一天)都将连接到单个 DataFrame 中。如果父阶段为 MultiFileSource,则整个数据集将加载到单个 DataFrame 中。因此,为 DFPFileBatcherStage 选择一个足够小的 period 参数非常重要,以便每个批次都可以容纳在内存中。

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
schema DataFrameInputSchema 指定要加载的列以及任何必要的重命名和数据类型转换的模式
filter_null bool 可选:是否在加载后过滤空行,默认值为 True。
file_type morpheus.common.FileTypes (enum) 可选:指示要加载的文件类型。在编写本文时,当前支持的值为:FileTypes.AutoFileTypes.CSVFileTypes.JSONFileTypes.PARQUET。默认值为 FileTypes.Auto,它将根据文件扩展名推断类型,如果使用自定义扩展名,请设置此值
parser_kwargs dictNone 可选:要传递到 DataFrame 解析器的其他关键字参数,当前这将是 pandas.read_csvpandas.read_jsonpandas.read_parquet
cache_dir str 可选:缓存位置的路径,默认为 ./.cache/dfp

此阶段能够通过多种方法并发下载和加载数据文件。当前支持的方法有:single_threaddaskdask_thread。使用的方法通过设置 MORPHEUS_FILE_DOWNLOAD_TYPE 环境变量来选择,默认使用 dask_thread,而 single_thread 有效地禁用并发加载。

此阶段将在 cache_dir 中缓存生成的 DataFrame,由于我们正在缓存 DataFrame 而不是源文件,因此缓存命中可以避免解析传入数据的成本。对于 S3 等远程存储系统,缓存命中可以避免解析和下载。这样做的结果之一是,对 schema 的任何更改都需要在 cache_dir 中清除缓存文件,然后这些更改才能可见。

注意: 此缓存是对使用可选 filecache:: 前缀时可能发生的任何缓存的补充。

输出阶段

dfp_output_config.png

对于推理管道,除了下面记录的 WriteToS3Stage 之外,还可以使用任何 Morpheus 输出阶段,例如 WriteToFileStageWriteToKafkaStage

写入文件阶段 ( WriteToFileStage )

最后一个阶段将所有收到的消息写入 CSV 或 JSON 格式的单个输出文件。

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
filename str 要将异常日志消息写入的文件。
overwrite bool 可选,默认为 False。如果 filename 中指定的文件已存在,则如果将此选项设置为 True,它将被覆盖

写入 S3 阶段 ( WriteToS3Stage )

WriteToS3Stage 阶段将生成的异常检测结果写入 S3。WriteToS3Stage 将 S3 特定操作与 Morpheus 阶段解耦,因此接收 s3_writer 参数。

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
s3_writer function 用户定义的函数,接收 morpheus.messages.message_meta.MessageMeta 的实例并返回相同的消息实例。任何 S3 特定配置(例如存储桶名称)都应绑定到该方法。

核心管道

这些阶段在训练和推理管道中都很常见,与输入和输出阶段不同,这些阶段特定于 DFP 管道,旨在进行配置,但不可替换。

拆分用户阶段 ( DFPSplitUsersStage )

DFPSplitUsersStage 阶段接收传入的 DataFrame 并发射 DFPMessageMetalist,其中每个 DFPMessageMeta 代表与给定用户关联的记录。这允许下游阶段按用户执行所有必要的操作。

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
include_generic bool True 时,将为通用用户构造一个 DFPMessageMeta,其中包含所有未被 skip_usersonly_users 过滤器排除的记录
include_individual bool True 时,将为每个未被 skip_usersonly_users 过滤器排除的用户构造一个 DFPMessageMeta 实例
skip_users List[str]None 要排除的用户列表,当 include_genericTrue 时,排除的记录也将从通用用户中排除。与 only_users 互斥。
only_users List[str]None 将记录限制为特定用户列表,当 include_genericTrue 时,通用用户的记录也将限制为此列表中的用户。与 skip_users 互斥。

滚动窗口阶段 ( DFPRollingWindowStage )

DFPRollingWindowStage 阶段为 DFP 执行几个关键功能。

  1. 此阶段按用户保留日志的移动窗口

    • 这些日志保存到磁盘,以减少同一用户的日志之间的内存需求

  2. 仅当满足窗口历史记录要求时,它才会发射日志

    • 在满足所有窗口历史记录要求之前,不会将消息发送到管道的其余部分。

    • 用于定义窗口历史记录要求的配置选项在下面详细介绍。

  3. 它重复必要的日志以正确计算日志相关的功能。

    • 为了支持所有列功能类型,可以将传入的日志消息与现有历史记录组合并发送到下游阶段。

    • 例如,要计算一个功能,该功能用于递增特定用户在一天内生成的日志数量的计数器,我们必须具有该用户过去 24 小时的日志历史记录。为了支持这一点,此阶段会将新日志与现有历史记录组合到一个 DataFrame 中。

    • 下游阶段有责任区分新日志和现有历史记录。

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
min_history int 排除记录少于 min_history 的用户,将此设置为 1 有效地禁用此功能
min_increment int 排除自上一批次以来添加的新记录少于 min_increment 的用户的传入批次,将此设置为 0 有效地禁用此功能
max_history int, strNone 如果不为 None,则最多包含 max_history 条记录。当 max_history 为 int 时,将包含最后 max_history 条记录。当 max_historystr 时,假定它表示 pandas.Timedelta 可解析的持续时间,并且仅包含 [最新时间戳 - max_history,最新时间戳] 窗口内的记录。
cache_dir str 可选的缓存目录路径,缓存项将存储在 cache_dir 下名为 rolling-user-data 的子目录中,如果此目录以及 cache_dir 尚不存在,则将创建它们。

注意: 此阶段计算传入 DataFrame 的第一行和最后一行的行哈希值,因此,其中包含的所有数据都必须是可哈希的,任何不可哈希的值(例如 lists)都应在 DFPFileToDataFrameStage 中删除或转换为可哈希类型。

预处理阶段 ( DFPPreprocessingStage )

DFPPreprocessingStage 阶段,预处理的实际逻辑在 input_schema 参数中定义。由于此阶段发生在 DFPFileBatcherStageDFPSplitUsersStage 阶段之后,因此传入 DataFrame 中的所有记录都仅对应于特定时间段内的单个用户,从而允许按用户按时间段计算列,例如上面提到的 logcountlocincrement 功能。使在此阶段执行的处理类型与在 DFPFileToDataFrameStage 中执行的处理类型不同。

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
input_schema DataFrameInputSchema 指定要包含在输出 DataFrame 中的列的模式,包括计算列
dfp_training_overview.png

训练必须从通用用户模型开始,该模型使用所有用户的日志进行训练。此模型充当没有足够训练数据的用户和帐户的后备模型。通用用户的名称在 Morpheus 配置对象的 ae.fallback_username 属性中定义,默认为 generic_user

在训练通用模型后,可以训练单个用户模型。单个用户模型提供更好的准确性,但需要足够的数据。许多用户没有足够的数据来准确训练模型。

训练阶段

训练阶段 ( DFPTraining )

DFPTraining 为每个传入 DataFrame 训练一个模型,并发射 morpheus.messages.ControlMessage 的一个实例,其中包含训练后的模型。

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
model_kwargs dictNone 可选的关键字参数字典,用于在构造模型时使用。有关可用选项的信息,请参阅 AutoEncoder
epochs int 训练 epoch 的数量。默认为 30。
validation_size float 用于训练验证的输入数据集的比例。应介于 0.0 和 1.0 之间。默认为 0.0。

MLflow 模型写入器阶段 ( DFPMLFlowModelWriterStage )

DFPMLFlowModelWriterStage 阶段将训练后的模型发布到 MLflow,跳过任何缺少足够训练数据的模型(当前要求的最低记录数为 300 条日志记录)。

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
model_name_formatter str 用于控制存储在 MLflow 中的模型名称的可选格式字符串,默认为 dfp-{user_id}。当前可用的字段名称为:user_iduser_md5,它是 hash.hexdigest 返回的 md5 十六进制摘要。
experiment_name_formatter str 用于控制存储在 MLflow 中的模型实验名称的可选格式字符串,默认为 /dfp-models/{reg_model_name}。当前可用的字段名称为:user_iduser_md5reg_model_name,它是应用字段名称后由 model_name_formatter 定义的模型名称。
databricks_permissions dictNone 可选,当非 None 时,设置使用 Databricks 托管的 MLflow 服务器时所需的权限

注意: 如果使用远程 MLflow 服务器,用户需要在启动管道之前调用 mlflow.set_tracking_uri

dfp_inference_overview.png

推理阶段

推理阶段 ( DFPInferenceStage )

DFPInferenceStage 阶段从 MLflow 加载模型并对这些模型执行推理。此阶段发出一条消息,其中包含原始 DataFrame 以及包含 Z 分数 (mean_abs_z) 的新列,以及生成该分数的模型的名称和版本 (model_version)。对于模型中的每个特征,还将添加三个额外的列

  • <feature name>_loss : 损失

  • <feature name>_z_loss : 损失 z 分数

  • <feature name>_pred : 预测值

对于一个名为 result 的假设特征,添加的三个列将是:result_lossresult_z_lossresult_pred

为了提高性能,从 MLflow 获取的模型会在本地缓存,缓存时间最长为 10 分钟,从而允许例行更新已更新的模型。除了缓存单个模型外,该阶段还维护可用模型的缓存,因此新训练的用户模型发布到 MLflow 后,对于已运行的推理管道,在最多 10 分钟内是不可见的。

对于任何在 MLflow 中没有关联模型的用户,将使用通用用户的模型。通用用户的名称在 Morpheus 配置对象的 ae.fallback_username 属性中定义,默认为 generic_user

参数

类型

描述

c morpheus.config.Config Morpheus 配置对象
model_name_formatter str 用于控制从 MLflow 获取的模型名称的格式字符串。当前可用的字段名称为:user_iduser_md5,它是 hash.hexdigest 返回的 md5 十六进制摘要。

过滤器检测阶段 ( FilterDetectionsStage )

FilterDetectionsStage 阶段过滤来自推理阶段的输出,以查找任何异常消息。超过指定 Z 分数的日志将传递到下一阶段。所有低于阈值的剩余日志将被丢弃。对于 DFP 管道的目的,此阶段配置为使用 DataFramemean_abs_z 列作为过滤条件。

名称

类型

默认值

描述

threshold float 0.5 阈值,高于此阈值的日志被认为是异常的。默认值为 0.5;但是,DFP 管道使用 2.0 的值。所有正常日志都将被过滤掉,异常日志将被传递。
copy bool True copy 参数为 True(默认值)时,满足过滤条件的行将复制到新的 DataFrame 中。当 False 时,将改用切片视图。这是一种性能优化,对功能没有影响。
filter_source FilterSource FilterSource.Auto 指示过滤条件是否存在于输出张量 (FilterSource.TENSOR) 或 DataFrame 中的列 (FilterSource.DATAFRAME) 中。
field_name str probs 用作过滤条件的张量 (filter_source=FilterSource.TENSOR) 或 DataFrame 列 (filter_source=FilterSource.DATAFRAME) 的名称。

后处理阶段 ( DFPPostprocessingStage )

DFPPostprocessingStage 阶段向 DataFrame 添加一个新的 event_time 列,指示 Morpheus 检测到异常消息的时间,并将任何 NAN 值替换为字符串值 'NaN'

上一个 数字指纹 (DFP)
下一个 Python Morpheus 模块
© 版权所有 2024, NVIDIA。 最后更新于 2024 年 12 月 3 日。