Morpheus CPU 独有模式
默认情况下,Morpheus 旨在利用 GPU 进行加速处理。但是,在某些情况下,可能需要在无法访问 GPU 的系统上运行 Morpheus。为了满足这种需求,Morpheus 提供了仅 CPU 执行模式。Morpheus 中的许多阶段都需要 GPU 才能运行,而其他阶段可以在 GPU 和 CPU 执行模式下运行。尝试将仅 GPU 阶段添加到配置为仅 CPU 模式运行的管道将导致错误。
默认情况下,Morpheus 将在 GPU 执行模式下运行。用户可以选择使用 Python API 或从命令行指定执行模式。
Python API
执行模式在 morpheus.config.ExecutionMode
枚举中定义,然后在 morpheus.config.Config
对象的 execution_mode
属性中指定。以下示例演示了如何将管道的执行模式设置为仅 CPU
from morpheus.config import Config
from morpheus.config import ExecutionMode
config = Config()
config.execution_mode = ExecutionMode.CPU
注意:execution_mode
以及 morpheus.config.Config
对象的所有其他属性都必须在构建管道或任何阶段对象之前设置。首次使用 Config
对象构建管道或阶段时,Config
对象将冻结配置并阻止任何进一步的更改。
示例
examples/cpu_only
、examples/developer_guide/1_simple_python_stage
和 examples/developer_guide/2_2_rabbitmq
示例演示了能够在 GPU 和 CPU 执行模式下运行的管道。
命令行
--use_cpu_only
标志可用作 morpheus run
子命令的选项。
morpheus run --use_cpu_only pipeline-other --help
示例
以下是一个简单的命令行示例,说明了可以在仅 CPU 模式下执行的管道。首先,通过从 Morpheus 存储库的根目录运行以下命令,确保您已获取示例数据集
./scripts/fetch_data.py fetch examples
然后,运行以下命令来运行管道
morpheus --log_level=INFO \
run --use_cpu_only pipeline-other \
from-file --filename=examples/data/email_with_addresses.jsonlines \
deserialize \
monitor \
serialize \
to-file --filename=.tmp/output.jsonlines --overwrite
每个阶段的作者可以自行决定支持哪些执行模式。选项包括:CPU、GPU 或两者。如前所述,默认执行模式为 GPU;需要 GPU 的阶段的作者无需对其阶段定义进行任何更改。
DataFrames 和张量
执行模式的选择意味着 DataFrame 和张量类型的选择。在 GPU 模式下,Morpheus 将使用 cuDF DataFrame,张量表示为 CuPy ndarray
对象。在 CPU 模式下,Morpheus 将使用 pandas DataFrame 和 NumPy ndarray
对象。
模式 |
DataFrame |
张量 |
---|---|---|
GPU | cuDF | CuPy |
CPU | pandas | NumPy |
使用 @stage
和 @source
装饰器定义的阶段
@stage
和 @source
装饰器都有一个可选的 execution_modes
参数,该参数接受 morpheus.config.ExecutionMode
值的元组,用于指定阶段支持的执行模式。
仅 CPU 源和阶段示例
import logging
import pandas as pd
from morpheus.config import Config
from morpheus.config import ExecutionMode
from morpheus.messages import MessageMeta
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.pipeline.stage_decorator import source
from morpheus.pipeline.stage_decorator import stage
from morpheus.utils.logger import configure_logging
logger = logging.getLogger(f"morpheus.{__name__}")
@source(execution_modes=(ExecutionMode.CPU, ))
def simple_source(num_rows: int = 10) -> MessageMeta:
df = pd.DataFrame({"a": range(num_rows)})
message = MessageMeta(df)
yield message
@stage(execution_modes=(ExecutionMode.CPU, ))
def print_msg(msg: MessageMeta) -> MessageMeta:
logger.info(f"Receive a message with a DataFrame of type:{type(msg.df)}")
return msg
def main():
configure_logging(log_level=logging.INFO)
pipeline = LinearPipeline(config)
pipeline.set_source(simple_source(config))
pipeline.add_stage(print_msg(config))
pipeline.run()
if __name__ == "__main__":
main()
CPU 和 GPU 源和阶段示例
支持 CPU 和 GPU 执行模式都需要编写能够处理两种类型的 DataFrame 和 ndarray
对象的代码。在许多情况下,为 pandas 设计的代码也适用于 cuDF,为 NumPy 设计的代码也适用于 CuPy,而无需对代码进行任何更改。但是,在某些情况下,API 可能略有不同,并且需要了解有效负载类型。在没有 GPU 的系统上以 CPU 模式运行时,必须注意不要直接导入 cudf
或任何其他需要 GPU 的软件包。Morpheus 在 type_utils
模块中提供了一些辅助方法来帮助解决此问题,例如 is_cudf_type()
、get_df_class()
和 get_array_pkg()
。
通过一些简单的修改,之前的示例现在支持 CPU 和 GPU 执行模式。get_df_class
函数用于确定要使用的 DataFrame 类型,我们添加了一个命令行标志来在两种执行模式之间切换。
import logging
import click
from morpheus.config import Config
from morpheus.config import ExecutionMode
from morpheus.messages import MessageMeta
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.pipeline.stage_decorator import source
from morpheus.pipeline.stage_decorator import stage
from morpheus.utils.logger import configure_logging
from morpheus.utils.type_utils import get_df_class
logger = logging.getLogger(f"morpheus.{__name__}")
@source(execution_modes=(ExecutionMode.GPU, ExecutionMode.CPU))
def simple_source(num_rows: int = 10) -> MessageMeta:
df_class = get_df_class() # Returns either cudf.DataFrame or pandas.DataFrame
df = df_class({"a": range(num_rows)})
message = MessageMeta(df)
yield message
@stage(execution_modes=(ExecutionMode.GPU, ExecutionMode.CPU))
def print_msg(msg: MessageMeta) -> MessageMeta:
logger.info(f"Receive a message with a DataFrame of type:{type(msg.df)}")
return msg
@click.command()
@click.option('--use_cpu_only',
default=False,
type=bool,
is_flag=True,
help=("Whether or not to run in CPU only mode, setting this to True will disable C++ mode."))
def main(use_cpu_only: bool):
configure_logging(log_level=logging.INFO)
if use_cpu_only:
execution_mode = ExecutionMode.CPU
else:
execution_mode = ExecutionMode.GPU
config = Config()
config.execution_mode = execution_mode
pipeline = LinearPipeline(config)
pipeline.set_source(simple_source(config))
pipeline.add_stage(print_msg(config))
pipeline.run()
if __name__ == "__main__":
main()
源和阶段类
与 @source
和 @stage
装饰器类似,基于类的源和阶段也可以定义为声明它们支持哪些执行模式。所有源和阶段类的基类 StageBase
定义了一个 supported_execution_modes
方法用于此目的,该方法可以在派生类中重写。基类中的方法定义为
def supported_execution_modes(self) -> tuple[ExecutionMode]:
return (ExecutionMode.GPU, )
阶段作者可以自由检查阶段的构造函数参数,以确定支持哪些执行模式。但是,对于许多阶段,支持的执行模式不会根据构造函数参数而改变。在这些情况下,可以使用 GpuAndCpuMixin
和 CpuOnlyMixin
mixin 来简化实现。
类定义示例
from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
@register_stage("pass-thru")
class PassThruStage(PassThruTypeMixin, GpuAndCpuMixin, SinglePortStage):
...
GpuAndCpuMixin
在之前的装饰器示例中,我们讨论了利用 type_utils
模块中提供的各种辅助方法来帮助编写能够在 CPU 和 GPU 执行模式下运行的代码。为了进一步简化这一点,GpuAndCpuMixin
mixin 将这些辅助方法添加到类中。在撰写本文时,它们是
df_type_str
- 返回"cudf"
或"pandas"
。get_df_pkg
- 返回cudf
或pandas
模块。get_df_class
- 返回cudf.DataFrame
或pandas.DataFrame
类。
具有 C++ 实现的阶段
C++ 阶段能够通过 libcudf 库与 cuDF DataFrame 交互;但是,pandas DataFrame 没有这样的 C++ 库。因此,任何包含 Python 和 C++ 实现的阶段,Python 实现都将在 CPU 模式下使用,而 C++ 实现将在 GPU 模式下使用。对于这些阶段,Python 实现可以自由地假设 DataFrame 的类型为 pandas.DataFrame
,张量的类型为 numpy.ndarray
。
仅包含 C++ 实现的阶段将无法在 CPU 模式下运行。