重要提示
您正在查看 NeMo 2.0 文档。此版本对 API 和新库 NeMo Run 进行了重大更改。我们目前正在将所有功能从 NeMo 1.0 移植到 2.0。有关先前版本或 2.0 中尚未提供的功能的文档,请参阅 NeMo 24.07 文档。
使用 NeMo Curator 和 Apache Spark 读取和写入数据集#
背景#
NeMo Curator 使用 DocumentDataset
类来读取和写入 JSONL 和 Parquet 文件。它是 Dask (或 Dask-cuDF) DataFrame 的包装器。Apache Spark 可以读取和写入 NeMo Curator 生成的 JSONL 和 Parquet 文件,同样,NeMo Curator 也可以处理 Spark 生成的输出。
用法#
为了演示其工作原理,请考虑以下示例
import dask.dataframe as dd
import pandas as pd
from nemo_curator.datasets import DocumentDataset
# Create sample data
data = {
"id": [1, 2, 3],
"text": [
"This is a tiny story.",
"Another tiny story appears here.",
"Yet another tiny story for you."
]
}
# Convert to a pandas DataFrame first
df = pd.DataFrame(data)
# Convert pandas DataFrame to DocumentDataset
stories_ds = DocumentDataset(dd.from_pandas(df, npartitions=2))
# Write the dataset to JSONL files
stories_ds.to_json("tiny_stories/", write_to_filename=False)
这将在目录 tiny_stories/
中创建两个 JSONL 文件
tiny_stories/
0.part
1.part
Apache Spark 可以使用标准 API 读取这些文件。让我们首先创建一个名为 NeMoCuratorExample
的 Spark 会话,然后我们可以使用以下命令读取目录中的文件
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NeMoCuratorExample").getOrCreate()
# Reading JSONL file
stories_df = spark.read.json("tiny_stories")
stories_df.show()
让我们继续向 Spark DataFrame 添加几列
from pyspark.sql.functions import size, split, length
# Calculate Word Count
stories_df = stories_df.withColumn("WordCount", size(split(stories_df["text"], r"\s+")))
# Calculate Character Count
stories_df = stories_df.withColumn("CharacterCount", length(stories_df["text"]))
stories_df.write.mode("overwrite").parquet("tiny_stories_transformed")
为了在 NeMo Curator DocumentDataset
和 Spark DataFrame 之间进行互操作,我们建议使用 Parquet 文件进行数据交换。以下代码片段演示了如何将 Spark DataFrame 的输出读取到 NeMo Curator DocumentDataset
中
from nemo_curator.utils.file_utils import get_all_files_paths_under
# Ignores checksum and marker files created by Spark
processed_files = [
filename for filename in get_all_files_paths_under("tiny_stories_transformed")
if not (filename.endswith(".crc") or filename.endswith("_SUCCESS"))
]
stories_dataset = DocumentDataset.read_parquet(processed_files, backend="pandas")
值得注意的是,Spark 通常倾向于创建校验和和其他标记文件,这些文件可能因 Spark 发行版而异,因此建议在将数据读取到 NeMo Curator DocumentDataset
中时忽略它们。