数字指纹 (DFP) 参考

管道结构配置

训练和推理管道中的阶段可以混合和匹配,影响很小,也就是说,MultiFileSource
可以配置为从 S3 或本地文件拉取数据,并且可以完全替换为任何其他 Morpheus 输入阶段。同样,S3 写入器可以替换为任何 Morpheus 输出阶段。无论输入和输出如何,核心管道都应保持不变。虽然管道核心(上图蓝色区域内)的阶段执行应配置而非交换的常见操作。
Morpheus Config
对于推理和训练管道,Morpheus Config
对象应使用相同的值构建,例如
import os
from morpheus.config import Config
from morpheus.config import ConfigAutoEncoder
from morpheus.cli.utils import get_package_relative_file
from morpheus.utils.file_utils import load_labels_file
config = Config()
config.num_threads = len(os.sched_getaffinity(0))
config.ae = ConfigAutoEncoder()
config.ae.feature_columns = load_labels_file(get_package_relative_file("data/columns_ae_azure.txt"))
可能需要的其他属性
属性 |
类型 |
默认值 |
描述 |
---|---|---|---|
Config.ae.userid_column_name |
str |
userIdentityaccountId |
DataFrame 中包含用户名或用户 ID 的列 |
Config.ae.timestamp_column_name |
str |
timestamp |
DataFrame 中包含事件时间戳的列 |
Config.ae.fallback_username |
str |
generic_user |
通用用户模型使用的名称,不应与任何真实用户的名称匹配 |
模式定义
DataFrame 输入模式 ( DataFrameInputSchema
)
DataFrameInputSchema
类定义了模式,指定要包含在输出 DataFrame
中的列。在 DFP 管道中,有两个阶段执行预处理,即 DFPFileToDataFrameStage
阶段和 DFPPreprocessingStage
。这种将预处理阶段与需要执行的实际操作解耦的方式允许在管道中用户定义实际模式,并实现阶段的可重用性。由用户来定义将出现在 DataFrame
中的字段。输入数据中未在 column_info
或 preserve_columns
构造函数参数中指定的任何列都不会出现在输出中。例外情况是在 json_columns
参数中指定的 JSON 字段,该参数定义了要规范化的 JSON 字段。
重要的是要注意,在处理 column_info
中的字段之前,会先规范化 json_columns
中定义的字段,从而允许对嵌套在 JSON 列中的字段执行处理。例如,假设我们有一个名为 event
的 JSON 字段,其中包含一个名为 timestamp
的键,该键在 JSON 数据中显示为 ISO 8601 格式的日期字符串,我们可以确保将其转换为 datetime
对象以供下游阶段使用,方法如下
from morpheus.utils.column_info import DataFrameInputSchema
from morpheus.utils.column_info import DateTimeColumn
schema = DataFrameInputSchema(
json_columns=['event'],
column_info=[DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name='event.timestamp')])
在上面的示例中,执行了三个操作
event
JSON 字段被规范化,从而产生以event.
为前缀的新字段,以便包含在输出DataFrame
中。新创建的字段
event.timestamp
被解析为datetime
字段。由于 DFP 管道显式要求时间戳字段,我们使用
config.ae.timestamp_column_name
属性命名这个新列,确保它与管道配置匹配。当name
和input_name
相同时,旧字段将被覆盖,当它们不同时,将创建一个新字段。
DFPFileToDataFrameStage
首先执行,负责展平可能嵌套的 JSON 数据并执行任何类型的数据类型转换。DFPPreprocessingStage
在 DFPSplitUsersStage
之后执行,从而允许按用户计算字段的可能性,例如前面提到的 logcount
和 locincrement
字段。这两个阶段都在 DFPFileBatcherStage
之后执行,从而允许按时间段(默认情况下为每天)计算字段。
参数 |
类型 |
描述 |
---|---|---|
json_columns |
List[str] |
传入 DataFrame 中要规范化的 JSON 列的可选列表(当前使用 pandas.json_normalize 方法)。JSON 字段中的每个键都将被展平为名为 <field>.<key> 的新列,例如,名为 user 的 JSON 字段包含名为 id 的键将导致一个名为 user.id 的新列。默认情况下,这是一个空 list 。 |
column_info |
List[str] |
可选的 ColumnInfo 实例列表,每个实例定义要对列执行的特定操作。这些操作包括重命名、类型转换和自定义计算。默认情况下,这是一个空 list 。 |
preserve_columns |
List[str] 或 str |
可选的正则表达式字符串或正则表达式字符串列表,用于定义应在输出 DataFrame 中保留的输入数据中的列。默认情况下,这是一个空 list 。 |
row_filter |
function 或 None |
可选函数,在执行所有其他处理后调用。此函数接收 DataFrame 作为其唯一参数,并返回一个 DataFrame 。 |
列信息 ( ColumnInfo
)
定义单个列和类型转换。
参数 |
类型 |
描述 |
---|---|---|
name |
str |
列名 |
dtype |
str 或 Python 类型 |
pandas 识别的任何类型字符串或 Python 类 |
自定义列 ( CustomColumn
)
ColumnInfo
的子类,定义要由用户定义的函数 process_column_fn
计算的列。
参数 |
类型 |
描述 |
---|---|---|
name |
str |
列名 |
dtype |
str 或 Python 类型 |
pandas 识别的任何类型字符串或 Python 类 |
process_column_fn |
function |
函数,接收整个 DataFrame 作为其唯一输入,并返回一个新的 pandas.Series 对象,该对象将存储在列 name 中。 |
input_column_types |
dict[str, str] |
成功处理此列所需的输入列和预期 dtype 。将此设置为 None 将传递所有列。指定需要的列可以提高性能。 |
重命名列 ( RenameColumn
)
ColumnInfo
的子类,添加了执行重命名的功能。
参数 |
类型 |
描述 |
---|---|---|
name |
str |
目标列的名称 |
dtype |
str 或 Python 类型 |
pandas 识别的任何类型字符串或 Python 类 |
input_name |
str |
原始列名 |
布尔列 ( BoolColumn
)
RenameColumn
的子类,添加了将一组自定义值映射为布尔值的功能。例如,假设我们有一个字符串输入字段,其中包含五个可能的 enum
值之一:OK
、SUCCESS
、DENIED
、CANCELED
和 EXPIRED
,我们可以将这些值映射到单个布尔字段,如下所示
from morpheus.utils.column_info import BoolColumn
field = BoolColumn(name="result",
dtype=bool,
input_name="result",
true_values=["OK", "SUCCESS"],
false_values=["DENIED", "CANCELED", "EXPIRED"])
我们在本例中使用了字符串;但是,我们也可以同样轻松地映射整数状态代码。我们还可以通过为 true 和 false 提供自定义值(例如,1
/0
、yes
/no
)来映射到布尔值以外的类型。
参数 |
类型 |
描述 |
---|---|---|
name |
str |
目标列的名称 |
dtype |
str 或 Python 类型 |
通常这应该是 bool ;但是,如果指定了 true_value 和 false_value ,则它可能是另一种类型。 |
input_name |
str |
原始列名 |
true_value |
Any | 要为 true 值存储的可选值,应为 dtype 类型。默认为 True 。 |
false_value |
Any | 要为 false 值存储的可选值,应为 dtype 类型。默认为 False 。 |
true_values |
List[str] |
要解释为 true 的字符串值列表。 |
false_values |
List[str] |
要解释为 false 的字符串值列表。 |
日期-时间列 ( DateTimeColumn
)
RenameColumn
的子类,专门用于转换 UTC 本地化 datetime
值。当传入值包含时区偏移字符串时,这些值将转换为 UTC,而没有时区的值则假定为 UTC。
参数 |
类型 |
描述 |
---|---|---|
name |
str |
目标列的名称 |
dtype |
str 或 Python 类型 |
pandas 识别的任何类型字符串或 Python 类 |
input_name |
str |
原始列名 |
字符串连接列 ( StringJoinColumn
)
RenameColumn
的子类,通过按 sep
连接,将传入的 list
值转换为字符串。
参数 |
类型 |
描述 |
---|---|---|
name |
str |
目标列的名称 |
dtype |
str 或 Python 类型 |
pandas 识别的任何类型字符串或 Python 类 |
input_name |
str |
原始列名 |
sep |
str |
用于连接的分隔符字符串 |
字符串连接列 ( StringCatColumn
)
ColumnInfo
的子类,将多个列中的值连接成一个新的字符串列,并用 sep
分隔。
参数 |
类型 |
描述 |
---|---|---|
name |
str |
目标列的名称 |
dtype |
str 或 Python 类型 |
pandas 识别的任何类型字符串或 Python 类 |
input_columns |
List[str] |
要连接的列的列表 |
sep |
str |
分隔符字符串 |
递增列 ( IncrementColumn
)
DateTimeColumn
的子类,根据 input_name
字段中的日期,计算 groupby_column
中特定时间窗口 period
内值的唯一出现次数。
参数 |
类型 |
描述 |
---|---|---|
name |
str |
目标列的名称 |
dtype |
str 或 Python 类型 |
应为 int 或其他整数类 |
input_name |
str |
包含时间戳值的原始列名 |
groupby_column |
str |
要分组依据的列名 |
period |
str |
执行计算的可选时间段,值必须是 pandas 的偏移字符串之一。默认为 D 一天 |
输入阶段

源阶段 ( MultiFileSource
)
MultiFileSource
(python/morpheus/morpheus/modules/input/multi_file_source.py
) 接收路径或路径列表 (filenames
),并将共同作为 fsspec.core.OpenFiles
对象发射到管道中。路径可以包括通配符 *
以及 URL(例如:s3://path
)到远程存储提供商,如 S3、FTP、GCP、Azure、Databricks 和其他由 fsspec
定义的提供商。此外,可以通过在路径前加上 filecache::
(例如:filecache::s3://bucket-name/key-name
)在本地缓存路径。
注意: 此阶段实际上并不下载数据文件,允许在下载之前过滤和批处理文件列表。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
filenames |
List[str] 或 str |
要从中读取的源文件的路径 |
watch |
bool |
可选:当 True 时,将重复轮询 filenames 以查找新文件。这假定 filenames 中的至少一个路径包含通配符。默认为 False 。 |
watch_interval |
float |
当 watch 为 True 时,这是轮询 filenames 中的路径以查找新文件的时间间隔(秒)。当 watch 为 False 时忽略。 |
文件批处理阶段 ( DFPFileBatcherStage
)
DFPFileBatcherStage
(python/morpheus_dfp/morpheus_dfp/stages/dfp_file_batcher_stage.py
) 将传入 DataFrame
中的数据按时间段(默认每天)分组为批次,并可选择将传入数据过滤到特定的时间窗口。此阶段可以通过将多个小文件合并到一个批次中来潜在地提高性能。此阶段假定日志的日期可以轻松推断,例如将创建时间编码在文件名中(例如,AUTH_LOG-2022-08-21T22.05.23Z.json
),或使用文件系统报告的修改时间。提取日期的实际方法编码在用户提供的 date_conversion_func
函数中(稍后详细介绍)。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
date_conversion_func |
function |
函数接收单个 fsspec.core.OpenFile 参数并返回一个 datetime.datetime 对象 |
period |
str |
对数据进行分组的时间段,值必须是 pandas 的偏移字符串之一 |
sampling_rate_s |
int |
可选,默认值=None 。已弃用,请考虑改用 sampling 。定义后,将对传入数据文件的子集进行采样,每 sampling_rate_s 秒取第一行。 |
start_time |
datetime |
可选,默认值=None 。如果不为 None,则将过滤传入的数据文件,排除在 start_time 之前创建的任何文件 |
end_time |
datetime |
可选,默认值=None 。如果不为 None,则将过滤传入的数据文件,排除在 end_time 之后创建的任何文件 |
sampling |
str , float , int |
可选,如果不为 None,则将对传入数据文件的子集进行采样。当为字符串时,该值被解释为 pandas 频率。将取每个频率的第一行。当值在 [0,1) 之间时,将取一定百分比的行。当值大于 1 时,该值被解释为要取的随机行数。 |
对于日志文件的创建日期编码在文件名中的情况,可以使用 morpheus/utils/file_utils.py
模块中的 date_extractor
。date_extractor
假定时间戳已本地化为 UTC,并且在作为参数传递给 DFPFileBatcherStage
之前,需要将其绑定到正则表达式模式。正则表达式模式需要包含以下命名组:year
、month
、day
、hour
、minute
、second
,以及可选的 microsecond
。如果正则表达式与 date_extractor
函数不匹配,则将回退到使用文件的修改时间。
对于包含 ISO 8601 格式日期字符串的输入文件,可以使用 iso_date_regex
正则表达式,例如
from functools import partial
from morpheus.utils.file_utils import date_extractor
from morpheus_dfp.utils.regex_utils import iso_date_regex
# Batch files into buckets by time. Use the default ISO date extractor from the filename
pipeline.add_stage(
DFPFileBatcherStage(config,
period="D",
date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex)))
注意: 如果 date_conversion_func
返回时区感知时间戳,则 start_time
和 end_time
(如果不为 None
)也需要是时区感知 datetime
对象。
文件到 DataFrame 阶段 ( DFPFileToDataFrameStage
)
DFPFileToDataFrameStage
(python/morpheus_dfp/morpheus_dfp/stages/dfp_file_to_df.py
) 阶段接收 fsspec.core.OpenFiles
的 list
,并将它们加载到单个 DataFrame
中,然后将其发射到管道中。当父阶段为 DFPFileBatcherStage
时,每个批次(通常为一天)都将连接到单个 DataFrame
中。如果父阶段为 MultiFileSource
,则整个数据集将加载到单个 DataFrame
中。因此,为 DFPFileBatcherStage
选择一个足够小的 period
参数非常重要,以便每个批次都可以容纳在内存中。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
schema |
DataFrameInputSchema |
指定要加载的列以及任何必要的重命名和数据类型转换的模式 |
filter_null |
bool |
可选:是否在加载后过滤空行,默认值为 True。 |
file_type |
morpheus.common.FileTypes (enum ) |
可选:指示要加载的文件类型。在编写本文时,当前支持的值为:FileTypes.Auto 、FileTypes.CSV 、FileTypes.JSON 和 FileTypes.PARQUET 。默认值为 FileTypes.Auto ,它将根据文件扩展名推断类型,如果使用自定义扩展名,请设置此值 |
parser_kwargs |
dict 或 None |
可选:要传递到 DataFrame 解析器的其他关键字参数,当前这将是 pandas.read_csv 、pandas.read_json 或 pandas.read_parquet |
cache_dir |
str |
可选:缓存位置的路径,默认为 ./.cache/dfp |
此阶段能够通过多种方法并发下载和加载数据文件。当前支持的方法有:single_thread
、dask
和 dask_thread
。使用的方法通过设置 MORPHEUS_FILE_DOWNLOAD_TYPE
环境变量来选择,默认使用 dask_thread
,而 single_thread
有效地禁用并发加载。
此阶段将在 cache_dir
中缓存生成的 DataFrame
,由于我们正在缓存 DataFrame
而不是源文件,因此缓存命中可以避免解析传入数据的成本。对于 S3 等远程存储系统,缓存命中可以避免解析和下载。这样做的结果之一是,对 schema
的任何更改都需要在 cache_dir
中清除缓存文件,然后这些更改才能可见。
注意: 此缓存是对使用可选 filecache::
前缀时可能发生的任何缓存的补充。
输出阶段

对于推理管道,除了下面记录的 WriteToS3Stage
之外,还可以使用任何 Morpheus 输出阶段,例如 WriteToFileStage
和 WriteToKafkaStage
。
写入文件阶段 ( WriteToFileStage
)
最后一个阶段将所有收到的消息写入 CSV 或 JSON 格式的单个输出文件。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
filename |
str |
要将异常日志消息写入的文件。 |
overwrite |
bool |
可选,默认为 False 。如果 filename 中指定的文件已存在,则如果将此选项设置为 True ,它将被覆盖 |
写入 S3 阶段 ( WriteToS3Stage
)
WriteToS3Stage
阶段将生成的异常检测结果写入 S3。WriteToS3Stage
将 S3 特定操作与 Morpheus 阶段解耦,因此接收 s3_writer
参数。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
s3_writer |
function |
用户定义的函数,接收 morpheus.messages.message_meta.MessageMeta 的实例并返回相同的消息实例。任何 S3 特定配置(例如存储桶名称)都应绑定到该方法。 |
核心管道
这些阶段在训练和推理管道中都很常见,与输入和输出阶段不同,这些阶段特定于 DFP 管道,旨在进行配置,但不可替换。
拆分用户阶段 ( DFPSplitUsersStage
)
DFPSplitUsersStage
阶段接收传入的 DataFrame
并发射 DFPMessageMeta
的 list
,其中每个 DFPMessageMeta
代表与给定用户关联的记录。这允许下游阶段按用户执行所有必要的操作。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
include_generic |
bool |
当 True 时,将为通用用户构造一个 DFPMessageMeta ,其中包含所有未被 skip_users 和 only_users 过滤器排除的记录 |
include_individual |
bool |
当 True 时,将为每个未被 skip_users 和 only_users 过滤器排除的用户构造一个 DFPMessageMeta 实例 |
skip_users |
List[str] 或 None |
要排除的用户列表,当 include_generic 为 True 时,排除的记录也将从通用用户中排除。与 only_users 互斥。 |
only_users |
List[str] 或 None |
将记录限制为特定用户列表,当 include_generic 为 True 时,通用用户的记录也将限制为此列表中的用户。与 skip_users 互斥。 |
滚动窗口阶段 ( DFPRollingWindowStage
)
DFPRollingWindowStage
阶段为 DFP 执行几个关键功能。
此阶段按用户保留日志的移动窗口
这些日志保存到磁盘,以减少同一用户的日志之间的内存需求
仅当满足窗口历史记录要求时,它才会发射日志
在满足所有窗口历史记录要求之前,不会将消息发送到管道的其余部分。
用于定义窗口历史记录要求的配置选项在下面详细介绍。
它重复必要的日志以正确计算日志相关的功能。
为了支持所有列功能类型,可以将传入的日志消息与现有历史记录组合并发送到下游阶段。
例如,要计算一个功能,该功能用于递增特定用户在一天内生成的日志数量的计数器,我们必须具有该用户过去 24 小时的日志历史记录。为了支持这一点,此阶段会将新日志与现有历史记录组合到一个
DataFrame
中。下游阶段有责任区分新日志和现有历史记录。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
min_history |
int |
排除记录少于 min_history 的用户,将此设置为 1 有效地禁用此功能 |
min_increment |
int |
排除自上一批次以来添加的新记录少于 min_increment 的用户的传入批次,将此设置为 0 有效地禁用此功能 |
max_history |
int , str 或 None |
如果不为 None ,则最多包含 max_history 条记录。当 max_history 为 int 时,将包含最后 max_history 条记录。当 max_history 为 str 时,假定它表示 pandas.Timedelta 可解析的持续时间,并且仅包含 [最新时间戳 - max_history ,最新时间戳] 窗口内的记录。 |
cache_dir |
str |
可选的缓存目录路径,缓存项将存储在 cache_dir 下名为 rolling-user-data 的子目录中,如果此目录以及 cache_dir 尚不存在,则将创建它们。 |
注意: 此阶段计算传入 DataFrame
的第一行和最后一行的行哈希值,因此,其中包含的所有数据都必须是可哈希的,任何不可哈希的值(例如 lists
)都应在 DFPFileToDataFrameStage
中删除或转换为可哈希类型。
预处理阶段 ( DFPPreprocessingStage
)
DFPPreprocessingStage
阶段,预处理的实际逻辑在 input_schema
参数中定义。由于此阶段发生在 DFPFileBatcherStage
和 DFPSplitUsersStage
阶段之后,因此传入 DataFrame
中的所有记录都仅对应于特定时间段内的单个用户,从而允许按用户按时间段计算列,例如上面提到的 logcount
和 locincrement
功能。使在此阶段执行的处理类型与在 DFPFileToDataFrameStage
中执行的处理类型不同。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
input_schema |
DataFrameInputSchema |
指定要包含在输出 DataFrame 中的列的模式,包括计算列 |

训练必须从通用用户模型开始,该模型使用所有用户的日志进行训练。此模型充当没有足够训练数据的用户和帐户的后备模型。通用用户的名称在 Morpheus 配置对象的 ae.fallback_username
属性中定义,默认为 generic_user
。
在训练通用模型后,可以训练单个用户模型。单个用户模型提供更好的准确性,但需要足够的数据。许多用户没有足够的数据来准确训练模型。
训练阶段
训练阶段 ( DFPTraining
)
DFPTraining
为每个传入 DataFrame
训练一个模型,并发射 morpheus.messages.ControlMessage
的一个实例,其中包含训练后的模型。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
model_kwargs |
dict 或 None |
可选的关键字参数字典,用于在构造模型时使用。有关可用选项的信息,请参阅 AutoEncoder 。 |
epochs |
int |
训练 epoch 的数量。默认为 30。 |
validation_size |
float |
用于训练验证的输入数据集的比例。应介于 0.0 和 1.0 之间。默认为 0.0。 |
MLflow 模型写入器阶段 ( DFPMLFlowModelWriterStage
)
DFPMLFlowModelWriterStage
阶段将训练后的模型发布到 MLflow,跳过任何缺少足够训练数据的模型(当前要求的最低记录数为 300 条日志记录)。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
model_name_formatter |
str |
用于控制存储在 MLflow 中的模型名称的可选格式字符串,默认为 dfp-{user_id} 。当前可用的字段名称为:user_id 和 user_md5 ,它是 hash.hexdigest 返回的 md5 十六进制摘要。 |
experiment_name_formatter |
str |
用于控制存储在 MLflow 中的模型实验名称的可选格式字符串,默认为 /dfp-models/{reg_model_name} 。当前可用的字段名称为:user_id 、user_md5 和 reg_model_name ,它是应用字段名称后由 model_name_formatter 定义的模型名称。 |
databricks_permissions |
dict 或 None |
可选,当非 None 时,设置使用 Databricks 托管的 MLflow 服务器时所需的权限 |
注意: 如果使用远程 MLflow 服务器,用户需要在启动管道之前调用 mlflow.set_tracking_uri
。

推理阶段
推理阶段 ( DFPInferenceStage
)
DFPInferenceStage
阶段从 MLflow 加载模型并对这些模型执行推理。此阶段发出一条消息,其中包含原始 DataFrame
以及包含 Z 分数 (mean_abs_z
) 的新列,以及生成该分数的模型的名称和版本 (model_version
)。对于模型中的每个特征,还将添加三个额外的列
<feature name>_loss
: 损失<feature name>_z_loss
: 损失 z 分数<feature name>_pred
: 预测值
对于一个名为 result
的假设特征,添加的三个列将是:result_loss
、result_z_loss
、result_pred
。
为了提高性能,从 MLflow 获取的模型会在本地缓存,缓存时间最长为 10 分钟,从而允许例行更新已更新的模型。除了缓存单个模型外,该阶段还维护可用模型的缓存,因此新训练的用户模型发布到 MLflow 后,对于已运行的推理管道,在最多 10 分钟内是不可见的。
对于任何在 MLflow 中没有关联模型的用户,将使用通用用户的模型。通用用户的名称在 Morpheus 配置对象的 ae.fallback_username
属性中定义,默认为 generic_user
。
参数 |
类型 |
描述 |
---|---|---|
c |
morpheus.config.Config |
Morpheus 配置对象 |
model_name_formatter |
str |
用于控制从 MLflow 获取的模型名称的格式字符串。当前可用的字段名称为:user_id 和 user_md5 ,它是 hash.hexdigest 返回的 md5 十六进制摘要。 |
过滤器检测阶段 ( FilterDetectionsStage
)
FilterDetectionsStage
阶段过滤来自推理阶段的输出,以查找任何异常消息。超过指定 Z 分数的日志将传递到下一阶段。所有低于阈值的剩余日志将被丢弃。对于 DFP 管道的目的,此阶段配置为使用 DataFrame
的 mean_abs_z
列作为过滤条件。
名称 |
类型 |
默认值 |
描述 |
---|---|---|---|
threshold |
float |
0.5 |
阈值,高于此阈值的日志被认为是异常的。默认值为 0.5 ;但是,DFP 管道使用 2.0 的值。所有正常日志都将被过滤掉,异常日志将被传递。 |
copy |
bool |
True |
当 copy 参数为 True (默认值)时,满足过滤条件的行将复制到新的 DataFrame 中。当 False 时,将改用切片视图。这是一种性能优化,对功能没有影响。 |
filter_source |
FilterSource |
FilterSource.Auto |
指示过滤条件是否存在于输出张量 (FilterSource.TENSOR ) 或 DataFrame 中的列 (FilterSource.DATAFRAME ) 中。 |
field_name |
str |
probs |
用作过滤条件的张量 (filter_source=FilterSource.TENSOR ) 或 DataFrame 列 (filter_source=FilterSource.DATAFRAME ) 的名称。 |
后处理阶段 ( DFPPostprocessingStage
)
DFPPostprocessingStage
阶段向 DataFrame 添加一个新的 event_time
列,指示 Morpheus 检测到异常消息的时间,并将任何 NAN
值替换为字符串值 'NaN'
。