重要提示
您正在查看 NeMo 2.0 文档。此版本引入了 API 的重大更改和一个新的库 NeMo Run。我们目前正在将 NeMo 1.0 的所有功能移植到 2.0。有关先前版本或 2.0 中尚未提供的功能的文档,请参阅 NeMo 24.07 文档。
带有 Dask 的 CPU 和 GPU 模块#
NeMo Curator 除了 CPU 模块外,还提供 GPU 加速模块。这些模块基于 RAPIDS,以支持将工作流程扩展到海量数据集大小。其余模块基于 CPU,并依赖 Dask 扩展到多节点集群。当使用这些不同的模块时,重要的是了解如何正确设置您的 Dask 集群,以及如何管理数据集在内存中的存储位置。
初始化 Dask 集群#
NeMo Curator 提供了一个简单的函数 get_client
,可用于启动本地 Dask 集群或连接到现有集群。examples/
中的所有示例都使用它来设置 Dask 集群。
from nemo_curator.utils.distributed_utils import get_client
# Set up Dask client
client = get_client(cluster_type="cpu")
# Perform some computation...
get_client
接受许多参数,允许您初始化或连接到现有 Dask 集群。
cluster_type
控制要创建的 Dask 集群的类型。“cpu” 将创建一个基于 CPU 的本地 Dask 集群,而 “gpu” 将创建一个基于 GPU 的本地集群。如果指定 “cpu”,则可以使用n_workers
参数指定集群启动的进程数。默认情况下,此参数设置为os.cpu_count()
。如果指定 “gpu”,则每个 GPU 启动一个工作进程。可以在 GPU 集群上完全运行基于 CPU 的工作流程,尽管进程计数(以及因此并行任务的数量)将受到机器上 GPU 数量的限制。scheduler_address
和scheduler_file
用于连接到现有 Dask 集群。如果您在 Slurm 或 Kubernetes 上运行 Dask 集群,则提供其中之一至关重要。如果传递了其中任何一个,则将忽略所有其他参数,因为集群配置将在您创建调度程序并在集群上工作时完成。其余参数可以在 此处 修改。
CPU 模块#
如 DocumentDataset
文档中所述,NeMo Curator 中数据集的底层存储格式只是 Dask 数据帧。对于 CPU 模块,Dask 使用 pandas 数据帧来保存数据帧分区。NeMo Curator 中的大多数模块都是基于 CPU 的。因此,读取和写入数据集的默认行为是在 CPU 内存中使用 pandas 后端对其进行操作。以下两个函数调用是等效的。
books = DocumentDataset.read_json(files, add_filename=True)
books = DocumentDataset.read_json(files, add_filename=True, backend="pandas")
GPU 模块#
以下 NeMo Curator 模块是基于 GPU 的。
精确去重
模糊去重
语义去重
分布式数据分类
领域分类(英语和多语言)
质量分类
AEGIS 和 Instruction Data Guard 安全模型
FineWeb 教育内容分类
FineWeb Mixtral 和 FineWeb Nemotron-4 教育模型
内容类型分类
提示任务和复杂性分类
GPU 模块使用 cudf
后端而不是 pandas
后端存储 DocumentDataset
。要将数据集读取到 GPU 内存中,可以使用以下函数调用。
gpu_books = DocumentDataset.read_json(files, add_filename=True, backend="cudf")
即使您启动了 GPU dask 集群,您也无法操作使用 pandas
后端的数据集。DocuemntDataset
必须最初使用 cudf
后端读取,或者必须在脚本期间传输。
在 CPU 和 GPU 之间移动数据#
ToBackend
模块提供了一种通过在数据集的 pandas 和 cuDF 后端之间交换来在 CPU 内存和 GPU 内存之间移动数据的方法。要了解其工作原理,请查看此示例。
from nemo_curator import Sequential, ToBackend, ScoreFilter, get_client
from nemo_curator.datasets import DocumentDataset
from nemo_curator.classifiers import DomainClassifier
from nemo_curator.filters import RepeatingTopNGramsFilter, NonAlphaNumericFilter
def main():
client = get_client(cluster_type="gpu")
dataset = DocumentDataset.read_json("books.jsonl")
curation_pipeline = Sequential([
ScoreFilter(RepeatingTopNGramsFilter(n=5)),
ToBackend("cudf"),
DomainClassifier(),
ToBackend("pandas"),
ScoreFilter(NonAlphaNumericFilter()),
])
curated_dataset = curation_pipeline(dataset)
curated_dataset.to_json("curated_books.jsonl")
if __name__ == "__main__":
main()
让我们重点介绍此示例的一些重要部分。
client = get_client(cluster_type="gpu")
:创建一个可以访问 GPU 的本地 Dask 集群。为了使用/交换到 cuDF 数据帧后端,您需要确保在 GPU Dask 集群上运行。dataset = DocumentDataset.read_json("books.jsonl")
:默认情况下将数据集读取到 pandas (CPU) 后端。curation_pipeline = ...
:定义一个策展管道,该管道由 CPU 过滤步骤、GPU 分类器步骤和另一个 CPU 过滤步骤组成。ToBackend("cudf")
将数据集从 CPU 移动到 GPU 以用于分类器,而ToBackend("pandas")
将数据集从 GPU 移回 CPU 以用于最后一个过滤器。curated_dataset.to_json("curated_books.jsonl")
:直接从 GPU 将数据集写入磁盘。在写入磁盘之前,无需传输回 CPU。
Dask 与 Slurm#
我们在 examples/slurm
中提供了一个示例 Slurm 脚本管道。此管道有一个脚本 start-slurm.sh
,它提供了类似于 get_client
提供的配置选项。每个 Slurm 集群都不同,因此请确保您了解 Slurm 集群的工作方式,以便可以轻松地调整脚本。start-slurm.sh
调用 containter-entrypoint.sh
,后者在集群中设置 Dask 调度程序和工作进程。
我们的 Python 示例旨在使其可以独立在本地运行,或者轻松地替换到 start-slurm.sh
脚本中以在多个节点上运行。您也可以通过简单地遵循添加带有 add_distributed_args
的 get_client
的模式来轻松调整您的脚本。
Dask 与 K8s#
我们还提供了一个示例指南,说明如何在 Kubernetes 集群上开始使用 NeMo Curator。
请访问 curator_kubernetes 了解更多信息。