重要提示

您正在查看 NeMo 2.0 文档。此版本对 API 和一个新的库 NeMo Run 引入了重大更改。我们目前正在将 NeMo 1.0 的所有功能移植到 2.0。有关先前版本或 2.0 中尚不可用的功能的文档,请参阅 NeMo 24.07 文档

使用 DocumentDataset#

背景#

文本数据集负责存储元数据以及核心文本/文档。jsonl` 文件因其易于处理和检查而常用。parquet 文件也是一种常用格式。在这两种情况下,单个数据集通常由多个底层文件(称为分片)表示。例如,如果您有一个名为“books”的大型数据集,则很可能将其存储在分片中,每个分片的名称类似于 books_00.jsonlbooks_01.jsonlbooks_02.jsonl 等。

数据集在内存中的存储方式与在磁盘上的存储方式同样重要。如果您有一个大型数据集,它太大而无法直接放入内存,则必须以某种方式将其分布在多台机器/节点上。此外,如果整理数据集需要很长时间,则很可能因某些不可预见的故障或其他原因而中断。NeMo Curator 的 DocumentDataset 采用 Dask 的分布式数据帧来管理跨多个节点的大型数据集,并允许轻松重启中断的数据整理。DocumentDataset 支持读取和写入本地磁盘以及直接来自 S3 等远程源的分片 jsonlparquet 文件。

用法#

读取和写入#

DocumentDataset 是 NeMo Curator 中文本数据集的标准格式。假设我们有一个“books”数据集,其存储结构如下

books_dataset/
    books_00.jsonl
    books_01.jsonl
    books_02.jsonl

您可以使用以下方法读取、过滤和写入数据集

import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.filters import WordCountFilter

files = get_all_files_paths_under("books_dataset/")
books = DocumentDataset.read_json(files, add_filename=True)

filter_step = nc.ScoreFilter(
                WordCountFilter(min_words=80),
                text_field="text",
                score_field="word_count",
            )

long_books = filter_step(books)

long_books.to_json("long_books/", write_to_filename=True)

让我们逐行浏览这段代码。

  • files = get_all_files_paths_under("books_dataset/") 这会检索给定目录中所有文件的列表。在我们的例子中,这等同于编写

    files = ["books_dataset/books_00.jsonl",
             "books_dataset/books_01.jsonl",
             "books_dataset/books_02.jsonl"]
    
  • books = DocumentDataset.read_json(files, add_filename=True) 这会将列出的文件读入内存。add_filename=True 选项将分片的名称(books_00.jsonlbooks_01.jsonl 等)保留为额外的 file_name 字段。当数据集写回磁盘时,此选项(与 write_to_filename 选项和 filename_col 结合使用)确保文档保留在其原始分片中。这对于手动检查逐个分片过滤的结果很有用。add_filename 选项也可以用作字符串,在这种情况下,它将用作列的名称(而不是默认的 file_name)。

  • filter_step = ... 这会构建并应用文档长度的启发式过滤器。有关更多信息,请参阅文档的过滤页面。

  • long_books.to_json("long_books/", write_to_filename=True) 这会将过滤后的数据集写入新目录。如上所述,write_to_filename=True 保留数据集的分片。如果数据集在读取时未使用 add_filename=True,则设置 write_to_filename=True 将引发错误。如果数据集在读取时使用了 add_filename="path",则除了 write_to_filename=True 之外,还需要设置 filename_col="path"

DocumentDataset 只是 Dask 数据帧的包装器。可以使用 DocumentDataset.df 成员变量访问底层数据帧。重要的是要了解 Dask 如何处理计算。引用他们的 文档

Dask 是延迟评估的。计算的结果在您要求之前不会计算。相反,会生成用于计算的 Dask 任务图。

因此,对 DocumentDataset.read_json 的调用不会立即执行。相反,读取数据集每个分片的任务将放置在任务图上。只有在调用 DocumentDataset.df.compute() 或某些依赖于 DocumentDataset.df 的操作调用 .compute() 时,才会执行任务图。这使我们能够避免将大型数据集读入内存。在我们的例子中,long_books.to_json() 在内部调用 .compute(),因此任务图将在那时执行。

从中断处恢复#

跟踪数据集中哪些文档已被处理可能很有用,这样,如果长时间的数据整理作业中断,可以恢复它们。NeMo Curator 提供了一个实用程序,可以轻松跟踪哪些数据集分片已被处理。考虑一下上面代码的修改版本

from nemo_curator.utils.file_utils import get_remaining_files

files = get_remaining_files("books_dataset/", "long_books/", "jsonl")
books = DocumentDataset.read_json(files, add_filename=True)

filter_step = nc.ScoreFilter(
                WordCountFilter(min_words=80),
                text_field="text",
                score_field="word_count",
            )

long_books = filter_step(books)

long_books.to_json("long_books/", write_to_filename=True)

get_remaining_files 比较输入目录("books_dataset/")和输出目录("long_books"),并返回输入目录中尚未写入输出目录的所有分片的列表。

虽然 Dask 提供了一种避免将过多数据读入内存的简便方法,但在某些情况下,我们可能需要调用 persist() 或类似的强制数据集进入内存的操作。在这些情况下,我们建议使用围绕 get_remaining_files 的简单包装函数分批处理输入数据集,如下所示。

from nemo_curator.utils.file_utils import get_batched_files

for files in get_batched_files("books_dataset/", "long_books/", "jsonl", batch_size=64):
    books = DocumentDataset.read_json(files, add_filename=True)

    filter_step = nc.ScoreFilter(
                    WordCountFilter(min_words=80),
                    text_field="text",
                    score_field="word_count",
                )

    long_books = filter_step(books)

    long_books.to_json("long_books/", write_to_filename=True)

这将一次读取 64 个分片,处理它们,然后将它们写回磁盘。与 get_remaining_files 一样,它只包括输入目录中但不在输出目录中的文件。

混合和打乱#

混合来自多个来源的数据可能是提高下游模型性能的好方法。这种混合可以在模型训练本身期间完成(即,在线混合),也可以在训练之前完成(即,离线混合)。在线混合对于在训练过程中快速迭代很有用。同时,如果您想分发数据集,离线混合很有用。在线混合目前在 NeMo via NVIDIA Megatron Core 中是可能的,而 NeMo Curator 提供了一种离线执行混合的方法。

让我们看看如何使用 nc.blend_datasets 组合数据集

import nemo_curator as nc

books = DocumentDataset.read_json("books_dataset/")
articles = DocumentDataset.read_json("articles_dataset/")
journals = DocumentDataset.read_json("journals_dataset/")

datasets = [books, articles, journals]
target_samples = 1000
weights = [5.0, 2.0, 1.0]

blended_dataset = nc.blend_datasets(target_samples, datasets, weights)

blended_dataset.to_json("blended_dataset/")
  • datasets = [books, articles, journals] 在这里,我们选择混合三个不同的数据集。这些数据集不必具有相同的文件格式或相似的大小。只要它们可以作为 DocumentDataset 读入即可。每个数据集的样本始终“按顺序”抽取。精确的顺序取决于格式。对于分片的 jsonl 文件,将首先选择排序顺序中文件名开头的条目。

  • target_samples = 1000 这是结果数据集中所需的样本数。样本是指文档或通常只是单个数据点。根据权重,数据集中最终可能会有更多样本。

  • weights = [5.0, 2.0, 1.0] 应从每个数据集中抽取的样本的相对数量。给定这些权重,混合数据集将具有五倍于期刊样本的书籍样本。同样,与期刊样本相比,文章样本的数量将是期刊样本的两倍。权重可以是实数列表(非负数)。nc.blend_datasets 将执行归一化,并将归一化权重与目标样本结合,以确定应从每个数据集中抽取多少样本。在 books 数据集的情况下,计算如下。

    \[\lceil target\_samples \cdot w_i\rceil=\lceil 1000\cdot \frac{5}{8}\rceil=625\]

    如果任何数据集的样本少于计算出的权重,则将对其进行过采样以满足配额。例如,如果 books 数据集只有 500 个文档,则前 125 个文档将被重复以达到 625 个样本。

  • blended_dataset = nc.blend_datasets(target_samples, datasets, weights) 我们现在调用函数本身。之后,我们得到了一个混合数据集,我们可以像操作任何其他数据集一样操作它。我们可以应用过滤器、去重或对文档进行分类。

由于混合数据集涉及组合来自多个来源的数据,因此无法保留原始数据集的分片。用于读取和写入数据集的 add_filename=Truewrite_to_filename=True 选项因此与 nc.blend_datasets 不兼容。

打乱可能是数据集管理的另一个重要方面。NeMo Curator 的 nc.Shuffle 允许用户重新排序数据集中的所有条目。

这是一个关于如何完成此操作的小示例

import nemo_curator as nc

books = DocumentDataset.read_json("books_dataset/")

shuffle = nc.Shuffle(seed=42)

shuffled_books = shuffle(books)

shuffled_books.to_json("shuffled_books/")
  • shuffle = nc.Shuffle(seed=42) 这会创建一个可以与 NeMo Curator 中各种其他模块链接的打乱操作。在本例中,我们将种子固定为 42。设置种子将保证确定性,但根据数据集大小,可能会稍慢(慢 20-30%)。

  • shuffled_books = shuffle(books) 数据集现在已被打乱,我们可以将其保存到文件系统。