重要提示

您正在查看 NeMo 2.0 文档。此版本对 API 和新库 NeMo Run 进行了重大更改。我们目前正在将所有功能从 NeMo 1.0 移植到 2.0。有关先前版本或 2.0 中尚未提供的功能的文档,请参阅 NeMo 24.07 文档

使用 NeMo Curator 和 Apache Spark 读取和写入数据集#

背景#

NeMo Curator 使用 DocumentDataset 类来读取和写入 JSONL 和 Parquet 文件。它是 Dask (或 Dask-cuDF) DataFrame 的包装器。Apache Spark 可以读取和写入 NeMo Curator 生成的 JSONL 和 Parquet 文件,同样,NeMo Curator 也可以处理 Spark 生成的输出。

用法#

为了演示其工作原理,请考虑以下示例

import dask.dataframe as dd
import pandas as pd
from nemo_curator.datasets import DocumentDataset

# Create sample data
data = {
    "id": [1, 2, 3],
    "text": [
        "This is a tiny story.",
        "Another tiny story appears here.",
        "Yet another tiny story for you."
    ]
}

# Convert to a pandas DataFrame first
df = pd.DataFrame(data)

# Convert pandas DataFrame to DocumentDataset
stories_ds = DocumentDataset(dd.from_pandas(df, npartitions=2))

# Write the dataset to JSONL files
stories_ds.to_json("tiny_stories/", write_to_filename=False)

这将在目录 tiny_stories/ 中创建两个 JSONL 文件

tiny_stories/
    0.part
    1.part

Apache Spark 可以使用标准 API 读取这些文件。让我们首先创建一个名为 NeMoCuratorExample 的 Spark 会话,然后我们可以使用以下命令读取目录中的文件

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NeMoCuratorExample").getOrCreate()

# Reading JSONL file
stories_df = spark.read.json("tiny_stories")
stories_df.show()

让我们继续向 Spark DataFrame 添加几列

from pyspark.sql.functions import size, split, length

# Calculate Word Count
stories_df = stories_df.withColumn("WordCount", size(split(stories_df["text"], r"\s+")))

# Calculate Character Count
stories_df = stories_df.withColumn("CharacterCount", length(stories_df["text"]))

stories_df.write.mode("overwrite").parquet("tiny_stories_transformed")

为了在 NeMo Curator DocumentDataset 和 Spark DataFrame 之间进行互操作,我们建议使用 Parquet 文件进行数据交换。以下代码片段演示了如何将 Spark DataFrame 的输出读取到 NeMo Curator DocumentDataset

from nemo_curator.utils.file_utils import get_all_files_paths_under

# Ignores checksum and marker files created by Spark
processed_files = [
     filename for filename in get_all_files_paths_under("tiny_stories_transformed")
     if not (filename.endswith(".crc") or filename.endswith("_SUCCESS"))
]

stories_dataset = DocumentDataset.read_parquet(processed_files, backend="pandas")

值得注意的是,Spark 通常倾向于创建校验和和其他标记文件,这些文件可能因 Spark 发行版而异,因此建议在将数据读取到 NeMo Curator DocumentDataset 中时忽略它们。