重要提示

您正在查看 NeMo 2.0 文档。此版本引入了 API 的重大更改和一个新的库 NeMo Run。我们目前正在将 NeMo 1.0 的所有功能移植到 2.0。有关先前版本或 2.0 中尚未提供的功能的文档,请参阅 NeMo 24.07 文档

带有 Dask 的 CPU 和 GPU 模块#

NeMo Curator 除了 CPU 模块外,还提供 GPU 加速模块。这些模块基于 RAPIDS,以支持将工作流程扩展到海量数据集大小。其余模块基于 CPU,并依赖 Dask 扩展到多节点集群。当使用这些不同的模块时,重要的是了解如何正确设置您的 Dask 集群,以及如何管理数据集在内存中的存储位置。

初始化 Dask 集群#

NeMo Curator 提供了一个简单的函数 get_client,可用于启动本地 Dask 集群或连接到现有集群。examples/ 中的所有示例都使用它来设置 Dask 集群。

from nemo_curator.utils.distributed_utils import get_client

# Set up Dask client
client = get_client(cluster_type="cpu")

# Perform some computation...

get_client 接受许多参数,允许您初始化或连接到现有 Dask 集群。

  • cluster_type 控制要创建的 Dask 集群的类型。“cpu” 将创建一个基于 CPU 的本地 Dask 集群,而 “gpu” 将创建一个基于 GPU 的本地集群。如果指定 “cpu”,则可以使用 n_workers 参数指定集群启动的进程数。默认情况下,此参数设置为 os.cpu_count()。如果指定 “gpu”,则每个 GPU 启动一个工作进程。可以在 GPU 集群上完全运行基于 CPU 的工作流程,尽管进程计数(以及因此并行任务的数量)将受到机器上 GPU 数量的限制。

  • scheduler_addressscheduler_file 用于连接到现有 Dask 集群。如果您在 Slurm 或 Kubernetes 上运行 Dask 集群,则提供其中之一至关重要。如果传递了其中任何一个,则将忽略所有其他参数,因为集群配置将在您创建调度程序并在集群上工作时完成。

  • 其余参数可以在 此处 修改。

CPU 模块#

DocumentDataset 文档中所述,NeMo Curator 中数据集的底层存储格式只是 Dask 数据帧。对于 CPU 模块,Dask 使用 pandas 数据帧来保存数据帧分区。NeMo Curator 中的大多数模块都是基于 CPU 的。因此,读取和写入数据集的默认行为是在 CPU 内存中使用 pandas 后端对其进行操作。以下两个函数调用是等效的。

books = DocumentDataset.read_json(files, add_filename=True)
books = DocumentDataset.read_json(files, add_filename=True, backend="pandas")

GPU 模块#

以下 NeMo Curator 模块是基于 GPU 的。

  • 精确去重

  • 模糊去重

  • 语义去重

  • 分布式数据分类

    • 领域分类(英语和多语言)

    • 质量分类

    • AEGIS 和 Instruction Data Guard 安全模型

    • FineWeb 教育内容分类

    • FineWeb Mixtral 和 FineWeb Nemotron-4 教育模型

    • 内容类型分类

    • 提示任务和复杂性分类

GPU 模块使用 cudf 后端而不是 pandas 后端存储 DocumentDataset。要将数据集读取到 GPU 内存中,可以使用以下函数调用。

gpu_books = DocumentDataset.read_json(files, add_filename=True, backend="cudf")

即使您启动了 GPU dask 集群,您也无法操作使用 pandas 后端的数据集。DocuemntDataset 必须最初使用 cudf 后端读取,或者必须在脚本期间传输。

在 CPU 和 GPU 之间移动数据#

ToBackend 模块提供了一种通过在数据集的 pandas 和 cuDF 后端之间交换来在 CPU 内存和 GPU 内存之间移动数据的方法。要了解其工作原理,请查看此示例。

from nemo_curator import Sequential, ToBackend, ScoreFilter, get_client
from nemo_curator.datasets import DocumentDataset
from nemo_curator.classifiers import DomainClassifier
from nemo_curator.filters import RepeatingTopNGramsFilter, NonAlphaNumericFilter

def main():
    client = get_client(cluster_type="gpu")

    dataset = DocumentDataset.read_json("books.jsonl")
    curation_pipeline = Sequential([
        ScoreFilter(RepeatingTopNGramsFilter(n=5)),
        ToBackend("cudf"),
        DomainClassifier(),
        ToBackend("pandas"),
        ScoreFilter(NonAlphaNumericFilter()),
    ])

    curated_dataset = curation_pipeline(dataset)

    curated_dataset.to_json("curated_books.jsonl")

if __name__ == "__main__":
    main()

让我们重点介绍此示例的一些重要部分。

  • client = get_client(cluster_type="gpu"):创建一个可以访问 GPU 的本地 Dask 集群。为了使用/交换到 cuDF 数据帧后端,您需要确保在 GPU Dask 集群上运行。

  • dataset = DocumentDataset.read_json("books.jsonl"):默认情况下将数据集读取到 pandas (CPU) 后端。

  • curation_pipeline = ...:定义一个策展管道,该管道由 CPU 过滤步骤、GPU 分类器步骤和另一个 CPU 过滤步骤组成。ToBackend("cudf") 将数据集从 CPU 移动到 GPU 以用于分类器,而 ToBackend("pandas") 将数据集从 GPU 移回 CPU 以用于最后一个过滤器。

  • curated_dataset.to_json("curated_books.jsonl"):直接从 GPU 将数据集写入磁盘。在写入磁盘之前,无需传输回 CPU。

Dask 与 Slurm#

我们在 examples/slurm 中提供了一个示例 Slurm 脚本管道。此管道有一个脚本 start-slurm.sh,它提供了类似于 get_client 提供的配置选项。每个 Slurm 集群都不同,因此请确保您了解 Slurm 集群的工作方式,以便可以轻松地调整脚本。start-slurm.sh 调用 containter-entrypoint.sh,后者在集群中设置 Dask 调度程序和工作进程。

我们的 Python 示例旨在使其可以独立在本地运行,或者轻松地替换到 start-slurm.sh 脚本中以在多个节点上运行。您也可以通过简单地遵循添加带有 add_distributed_argsget_client 的模式来轻松调整您的脚本。

Dask 与 K8s#

我们还提供了一个示例指南,说明如何在 Kubernetes 集群上开始使用 NeMo Curator。

请访问 curator_kubernetes 了解更多信息。