重要提示
您正在查看 NeMo 2.0 文档。此版本对 API 和一个新的库 NeMo Run 引入了重大更改。我们目前正在将 NeMo 1.0 的所有功能移植到 2.0。有关先前版本或 2.0 中尚不可用的功能的文档,请参阅 NeMo 24.07 文档。
使用 DocumentDataset#
背景#
文本数据集负责存储元数据以及核心文本/文档。jsonl`
文件因其易于处理和检查而常用。parquet
文件也是一种常用格式。在这两种情况下,单个数据集通常由多个底层文件(称为分片)表示。例如,如果您有一个名为“books”的大型数据集,则很可能将其存储在分片中,每个分片的名称类似于 books_00.jsonl
、books_01.jsonl
、books_02.jsonl
等。
数据集在内存中的存储方式与在磁盘上的存储方式同样重要。如果您有一个大型数据集,它太大而无法直接放入内存,则必须以某种方式将其分布在多台机器/节点上。此外,如果整理数据集需要很长时间,则很可能因某些不可预见的故障或其他原因而中断。NeMo Curator 的 DocumentDataset
采用 Dask 的分布式数据帧来管理跨多个节点的大型数据集,并允许轻松重启中断的数据整理。DocumentDataset
支持读取和写入本地磁盘以及直接来自 S3 等远程源的分片 jsonl
和 parquet
文件。
用法#
读取和写入#
DocumentDataset
是 NeMo Curator 中文本数据集的标准格式。假设我们有一个“books”数据集,其存储结构如下
books_dataset/
books_00.jsonl
books_01.jsonl
books_02.jsonl
您可以使用以下方法读取、过滤和写入数据集
import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.filters import WordCountFilter
files = get_all_files_paths_under("books_dataset/")
books = DocumentDataset.read_json(files, add_filename=True)
filter_step = nc.ScoreFilter(
WordCountFilter(min_words=80),
text_field="text",
score_field="word_count",
)
long_books = filter_step(books)
long_books.to_json("long_books/", write_to_filename=True)
让我们逐行浏览这段代码。
files = get_all_files_paths_under("books_dataset/")
这会检索给定目录中所有文件的列表。在我们的例子中,这等同于编写files = ["books_dataset/books_00.jsonl", "books_dataset/books_01.jsonl", "books_dataset/books_02.jsonl"]
books = DocumentDataset.read_json(files, add_filename=True)
这会将列出的文件读入内存。add_filename=True
选项将分片的名称(books_00.jsonl
、books_01.jsonl
等)保留为额外的file_name
字段。当数据集写回磁盘时,此选项(与write_to_filename
选项和filename_col
结合使用)确保文档保留在其原始分片中。这对于手动检查逐个分片过滤的结果很有用。add_filename
选项也可以用作字符串,在这种情况下,它将用作列的名称(而不是默认的file_name
)。filter_step = ...
这会构建并应用文档长度的启发式过滤器。有关更多信息,请参阅文档的过滤页面。long_books.to_json("long_books/", write_to_filename=True)
这会将过滤后的数据集写入新目录。如上所述,write_to_filename=True
保留数据集的分片。如果数据集在读取时未使用add_filename=True
,则设置write_to_filename=True
将引发错误。如果数据集在读取时使用了add_filename="path"
,则除了write_to_filename=True
之外,还需要设置filename_col="path"
。
DocumentDataset
只是 Dask 数据帧的包装器。可以使用 DocumentDataset.df
成员变量访问底层数据帧。重要的是要了解 Dask 如何处理计算。引用他们的 文档
Dask 是延迟评估的。计算的结果在您要求之前不会计算。相反,会生成用于计算的 Dask 任务图。
因此,对 DocumentDataset.read_json
的调用不会立即执行。相反,读取数据集每个分片的任务将放置在任务图上。只有在调用 DocumentDataset.df.compute()
或某些依赖于 DocumentDataset.df
的操作调用 .compute()
时,才会执行任务图。这使我们能够避免将大型数据集读入内存。在我们的例子中,long_books.to_json()
在内部调用 .compute()
,因此任务图将在那时执行。
从中断处恢复#
跟踪数据集中哪些文档已被处理可能很有用,这样,如果长时间的数据整理作业中断,可以恢复它们。NeMo Curator 提供了一个实用程序,可以轻松跟踪哪些数据集分片已被处理。考虑一下上面代码的修改版本
from nemo_curator.utils.file_utils import get_remaining_files
files = get_remaining_files("books_dataset/", "long_books/", "jsonl")
books = DocumentDataset.read_json(files, add_filename=True)
filter_step = nc.ScoreFilter(
WordCountFilter(min_words=80),
text_field="text",
score_field="word_count",
)
long_books = filter_step(books)
long_books.to_json("long_books/", write_to_filename=True)
get_remaining_files
比较输入目录("books_dataset/"
)和输出目录("long_books"
),并返回输入目录中尚未写入输出目录的所有分片的列表。
虽然 Dask 提供了一种避免将过多数据读入内存的简便方法,但在某些情况下,我们可能需要调用 persist()
或类似的强制数据集进入内存的操作。在这些情况下,我们建议使用围绕 get_remaining_files
的简单包装函数分批处理输入数据集,如下所示。
from nemo_curator.utils.file_utils import get_batched_files
for files in get_batched_files("books_dataset/", "long_books/", "jsonl", batch_size=64):
books = DocumentDataset.read_json(files, add_filename=True)
filter_step = nc.ScoreFilter(
WordCountFilter(min_words=80),
text_field="text",
score_field="word_count",
)
long_books = filter_step(books)
long_books.to_json("long_books/", write_to_filename=True)
这将一次读取 64 个分片,处理它们,然后将它们写回磁盘。与 get_remaining_files
一样,它只包括输入目录中但不在输出目录中的文件。
混合和打乱#
混合来自多个来源的数据可能是提高下游模型性能的好方法。这种混合可以在模型训练本身期间完成(即,在线混合),也可以在训练之前完成(即,离线混合)。在线混合对于在训练过程中快速迭代很有用。同时,如果您想分发数据集,离线混合很有用。在线混合目前在 NeMo via NVIDIA Megatron Core 中是可能的,而 NeMo Curator 提供了一种离线执行混合的方法。
让我们看看如何使用 nc.blend_datasets
组合数据集
import nemo_curator as nc
books = DocumentDataset.read_json("books_dataset/")
articles = DocumentDataset.read_json("articles_dataset/")
journals = DocumentDataset.read_json("journals_dataset/")
datasets = [books, articles, journals]
target_samples = 1000
weights = [5.0, 2.0, 1.0]
blended_dataset = nc.blend_datasets(target_samples, datasets, weights)
blended_dataset.to_json("blended_dataset/")
datasets = [books, articles, journals]
在这里,我们选择混合三个不同的数据集。这些数据集不必具有相同的文件格式或相似的大小。只要它们可以作为 DocumentDataset 读入即可。每个数据集的样本始终“按顺序”抽取。精确的顺序取决于格式。对于分片的 jsonl 文件,将首先选择排序顺序中文件名开头的条目。target_samples = 1000
这是结果数据集中所需的样本数。样本是指文档或通常只是单个数据点。根据权重,数据集中最终可能会有更多样本。weights = [5.0, 2.0, 1.0]
应从每个数据集中抽取的样本的相对数量。给定这些权重,混合数据集将具有五倍于期刊样本的书籍样本。同样,与期刊样本相比,文章样本的数量将是期刊样本的两倍。权重可以是实数列表(非负数)。nc.blend_datasets
将执行归一化,并将归一化权重与目标样本结合,以确定应从每个数据集中抽取多少样本。在 books 数据集的情况下,计算如下。\[\lceil target\_samples \cdot w_i\rceil=\lceil 1000\cdot \frac{5}{8}\rceil=625\]如果任何数据集的样本少于计算出的权重,则将对其进行过采样以满足配额。例如,如果 books 数据集只有 500 个文档,则前 125 个文档将被重复以达到 625 个样本。
blended_dataset = nc.blend_datasets(target_samples, datasets, weights)
我们现在调用函数本身。之后,我们得到了一个混合数据集,我们可以像操作任何其他数据集一样操作它。我们可以应用过滤器、去重或对文档进行分类。
由于混合数据集涉及组合来自多个来源的数据,因此无法保留原始数据集的分片。用于读取和写入数据集的 add_filename=True
和 write_to_filename=True
选项因此与 nc.blend_datasets
不兼容。
打乱可能是数据集管理的另一个重要方面。NeMo Curator 的 nc.Shuffle
允许用户重新排序数据集中的所有条目。
这是一个关于如何完成此操作的小示例
import nemo_curator as nc
books = DocumentDataset.read_json("books_dataset/")
shuffle = nc.Shuffle(seed=42)
shuffled_books = shuffle(books)
shuffled_books.to_json("shuffled_books/")
shuffle = nc.Shuffle(seed=42)
这会创建一个可以与 NeMo Curator 中各种其他模块链接的打乱操作。在本例中,我们将种子固定为 42。设置种子将保证确定性,但根据数据集大小,可能会稍慢(慢 20-30%)。shuffled_books = shuffle(books)
数据集现在已被打乱,我们可以将其保存到文件系统。