DALI 数据集与外部源的输入#

在本教程中,我们将展示如何将外部数据输入与 DALIDataset 一起使用。以下部分演示了如何为 DALIDataset 提供来自其他 TensorFlow tf.data.Datasets 的数据,以及如何使用 DALI Exernal Source operator 与自定义 Python 代码来生成数据。

本教程假设您熟悉 DALIDataset API - 一种将 DALI Pipeline 作为 tf.data.Dataset 兼容对象运行的方式。您可以在关于 在带有 tf.data 的 DALIDataset 中使用 pipelines在多个 GPU 上运行 DALIDataset 的教程中找到更多信息。

来自其他 tf.data.Datasets 的输入#

DALI 已经提供了一组专用的 reader operators,并允许通过 External Source 运算符在 pipeline 中指定自定义 Python 代码作为数据源。

包装到 TensorFlow 兼容 Dataset 对象中的 DALI Pipeline 可以将其他 tf.data.Dataset 对象作为输入,使您可以使用 DALI 处理使用 TensorFlow API 创建的输入

注意:DALIDataset 的输入目前处于实验阶段,API 可能会在 DALI 的未来版本中进行调整。

注意:目前通过 DALIDataset 的附加变体启用输入的使用,该变体在 DALI Tensorflow 插件中作为 ``experimental.DALIDatasetWithInputs`` 提供。

在以下段落中,我们将演示如何指定这些输入并指示它们在 DALI pipeline 中的位置。

选择输入数据集#

作为一个例子,我们将使用 tf.data.TFRecordReader,即使 DALI 有其自己的原生变体。我们将使用来自 DALI_extra - DALI 测试数据存储库的示例数据集。我们将从该 TFRecord 数据集中提取编码图像,并将它们传递给 DALI 进行解码。

DALI_EXTRA_PATH 环境变量应指向从 DALI extra repository 下载数据的位置。请确保已检出正确的发布标签。

Tensorflow Dataset#

首先,我们使用 tf.data API 准备 TensorFlow Dataset。我们需要为 TFRecordDataset 构造函数提供数据路径。

接下来,我们选择编码图像作为我们要提取的特征,并使用 tf.io.parse_single_example 和我们定义的 schema 进行解析。我们还使用 tf.io.decode_raw 将原始 tf.string 张量转换为类型化的数值张量 - DALI 可以使用它。

最后两个步骤将构成我们的 decode_fn,我们将映射到 TFRecordDataset 上。我们定义了我们的 input_dataset

[1]:
import tensorflow as tf

tf.compat.v1.enable_eager_execution()
import os

# Path to TFRecord dataset
data_path = os.path.join(os.environ["DALI_EXTRA_PATH"], "db/tfrecord/train/")

# We want to extract the encoded images
images_key = "image/encoded"

# Schema for raw, encoded buffers
schema = {images_key: tf.io.FixedLenFeature([], tf.string, "")}


def decode_fn(record_bytes):
    parsed_examples = tf.io.parse_single_example(record_bytes, schema)
    encoded_image = tf.io.decode_raw(parsed_examples[images_key], tf.uint8)
    return encoded_image


input_dataset = tf.data.TFRecordDataset(data_path).map(decode_fn)

DALI Pipeline#

现在我们需要定义一个 DALI Pipeline,它可以接受 tf.data.Dataset 作为输入,并解码该输入提供的缓冲区。

我们使用带有特定 name 参数的 external_source 运算符节点作为来自 Tensorflow 的输入的占位符节点。在本例中,我们将使用一个输入,但可以有更多输入,只要占位符 fn.external_source 节点具有唯一的名称。

[2]:
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types

batch_size = 2


@pipeline_def(device_id=0, num_threads=4, batch_size=batch_size)
def pipeline_with_inputs(device):
    encoded = fn.external_source(name="input_from_tf_record", dtype=types.UINT8)
    images = fn.decoders.image(
        encoded, device="mixed" if device == "gpu" else "cpu"
    )
    resized = fn.resize(images, size=(300, 500))
    return resized

请注意,我们的 pipeline 可以使用 device 进行参数化,以便我们可以在 GPU 上加速解码。当我们使用 GPU 加速时,我们需要特别注意 DALIDataset 和输入的位置。

我们还将所有图像调整为相同大小,因为 TensorFlow 希望数据批次是统一的。

构建 DALIDataset#

现在让我们导入实验性的 DALIDatasetWithInputs。它与 DALIDataset 的不同之处在于添加了额外的 input_datasets 参数,我们将字典传递给该参数,该字典将 external_source 节点的名称映射到输入 tf.data.Dataset 对象。

这里非常简单,因为我们只有一个 input_dataset,我们想将其映射到 'input_from_tf_record',我们将它命名为 input_spec_dict

对于第一个示例,我们使用 CPU 放置。

[3]:
import nvidia.dali.plugin.tf as dali_tf

input_spec_dict = {"input_from_tf_record": input_dataset}

pipe = pipeline_with_inputs("cpu")

with tf.device("/cpu:0"):
    dali_dataset = dali_tf.experimental.DALIDatasetWithInputs(
        pipeline=pipe,
        input_datasets=input_spec_dict,
        batch_size=batch_size,
        output_shapes=None,
        output_dtypes=tf.uint8,
        device_id=0,
    )

收集结果#

由于我们以 eager 模式启动,我们可以在常规循环中运行我们的 DALIDataset,将结果转换为 NumPy 并可视化它们。

[4]:
def run(dali_dataset, iters=2):
    result = []
    for batch, i in zip(dali_dataset, range(iters)):
        images = batch[0].numpy()
        for sample_idx in range(batch_size):
            result.append(images[sample_idx])
    return result
[5]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import math


def display(outputs, columns=2, captions=None, cpu=False):
    rows = int(math.ceil(len(outputs) / columns))
    fig = plt.figure()
    fig.set_size_inches(16, 6 * rows)
    gs = gridspec.GridSpec(rows, columns)
    row = 0
    col = 0
    for i in range(len(outputs)):
        plt.subplot(gs[i])
        plt.axis("off")
        if captions is not None:
            plt.title(captions[i])
        plt.imshow(outputs[i])
[6]:
display(run(dali_dataset))
../../../_images/examples_frameworks_tensorflow_tensorflow-dataset-inputs_13_0.png

批处理和每样本模式#

还有一些其他事情需要考虑。TensorFlow Datasets 在“示例”上运行 - 这通常是单个密集张量。即使我们对某些 tf.data.Dataset 应用了 .batch(batch_size),结果值也将是张量,最外层维度等于 batch_size

DALI 通过设计在数据批次上运行。在将来自 TensorFlow 的输入提供给 DALI 时,我们可以指定特定的 external_source 节点是否应查询其输入的批次或单独的样本。在后一种情况下,DALI 将查询输入 batch_size 次以构建批次 - 无需手动执行此操作。

通过 DALIDatasetWithInputsinput_datasets 参数传递 tf.data.Dataset 会自动指示此输入应被视为每样本。

为了展示 DALIDataset 可以处理不同模式下的多个输入,我们将创建一个额外的 dataset 输入,这次以批处理模式运行。

生成器 Dataset#

我们使用带有生成器 dataset 的简单生成器,以返回一个 NumPy 数组,该数组逐渐变红。接下来,我们使用 Batch dataset,以展示如何从 TensorFlow 向 DALI 指定批处理数据源。

我们还强调了生成器 Dataset 只能放置在 CPU 上的事实。

[7]:
import numpy as np


def get_red():
    current_red = 64
    max_red = 255
    while True:
        yield np.full((300, 500, 3), (current_red, 0, 0), dtype=np.float32)
        current_red = min(current_red + 1, max_red)


signature = tf.TensorSpec(shape=(300, 500, 3), dtype=tf.float32)

with tf.device("/cpu:0"):
    batch_dataset = tf.data.Dataset.from_generator(
        get_red, output_signature=signature
    ).batch(batch_size)

具有两个输入的 Pipeline#

现在,让我们通过添加第二个 external_source 占位符节点来表示第二个输入来调整 pipeline,并将其用于某些处理。

[8]:
import nvidia.dali.math as dmath
import nvidia.dali.types as types


@pipeline_def(device_id=0, num_threads=4, batch_size=batch_size)
def pipeline_with_two_inputs(device):
    encoded = fn.external_source(name="input_from_tf_record", dtype=types.UINT8)
    red = fn.external_source(name="input_from_generator", dtype=types.FLOAT)
    if device == "gpu":
        red = red.gpu()
    images = fn.decoders.image(
        encoded, device="mixed" if device == "gpu" else "cpu"
    )
    resized = fn.resize(images, size=(300, 500))
    saturated_red = dmath.min(resized + red, 255.0)
    return fn.cast(saturated_red, dtype=types.UINT8)

指定批处理输入#

为了通知 DALI 我们传递了一个表示批次的输入,我们需要与该输入一起传递一些额外的选项。对于这种情况,我们可以使用 nvidia.dal.plugin.tf.experimental.Input

Input 类接受几个参数:我们想要传递选项的 tf.data.Dataset 输入,以及可以覆盖 external_source 节点定义中存在的参数的 layoutbatch - 为它们中的任何一个提供 None 将使 DALI 使用 DALI pipeline 中定义的那个。我们将设置 batch=True 以选择批处理模式。此外,我们可以为 DALI 指定 'HWC' 布局。

让我们通过扩展 input_datasets 字典规范来更新我们的示例。

[9]:
two_inputs_spec_dict = {
    "input_from_tf_record": input_dataset,
    "input_from_generator": dali_tf.experimental.Input(
        batch_dataset, batch=True, layout="HWC"
    ),
}

pipe_batched_input = pipeline_with_two_inputs("cpu")

with tf.device("/cpu:0"):
    batched_dali_dataset = dali_tf.experimental.DALIDatasetWithInputs(
        pipeline=pipe_batched_input,
        input_datasets=two_inputs_spec_dict,
        batch_size=batch_size,
        output_shapes=None,
        output_dtypes=tf.uint8,
        device_id=0,
    )
[10]:
display(run(batched_dali_dataset))
../../../_images/examples_frameworks_tensorflow_tensorflow-dataset-inputs_21_0.png

或者,对于 'input_from_generator',我们可以使用带有 batch=NoneInput 类,并将 batch=True 传递给 Pipeline 定义中具有该名称的 fn.external_source,使其成为 fn.external_source(name='input_from_generator', batch=True)。请注意,如果为 Input 类提供 None 选项,则该选项取自 Pipeline 定义。

GPU 放置#

如果您想充分利用 DALI 的潜力,则需要利用 GPU 加速。我们准备的两个示例 pipeline 都可以与 GPU 加速一起使用,但它们需要进行一些调整。

首先,它们需要放置在 GPU 上。我们还需要调整我们的输入 - TensorFlow 要求 GPU 放置的每个输入也应在 GPU 上可用。大多数 tf.data.Dataset 都不能直接放置在 GPU 上 - 这对于我们构建的输入 dataset 也是如此。因此,我们需要手动将输入 dataset 传输到我们使用 tf.data.experimental.copy_to_device 放置 DALIDataset 的 GPU 上。

请注意,GPU 放置的 DALIDataset 应该是最后一个 dataset,并且不应被其他 tf.data.Dataset 类处理。

显式复制#

让我们从将输入 dataset 传输到 /gpu:0 开始。接下来,我们使用它们为 input_datasets 参数指定一个新的字典。最后,我们获取一个新的 DALI pipeline 实例,这次指定用于 'gpu' 加速,并将其包装到 GPU 放置的 DALIDatasetWithInputs 中。

[11]:
input_dataset_gpu = input_dataset.apply(
    tf.data.experimental.copy_to_device("/gpu:0")
)
batch_dataset_gpu = batch_dataset.apply(
    tf.data.experimental.copy_to_device("/gpu:0")
)


two_inputs_spec_dict_gpu = {
    "input_from_tf_record": input_dataset_gpu,
    "input_from_generator": dali_tf.experimental.Input(
        batch_dataset_gpu, batch=True, layout="HWC"
    ),
}

pipe_gpu = pipeline_with_two_inputs("gpu")

with tf.device("/gpu:0"):
    dali_dataset_gpu = dali_tf.experimental.DALIDatasetWithInputs(
        pipeline=pipe_gpu,
        input_datasets=two_inputs_spec_dict_gpu,
        batch_size=batch_size,
        output_shapes=None,
        output_dtypes=tf.uint8,
        device_id=0,
    )
[12]:
display(run(dali_dataset_gpu))
../../../_images/examples_frameworks_tensorflow_tensorflow-dataset-inputs_26_0.png

处理 External Source source 参数#

在前面的示例中,我们使用了 TensorFlow 的生成器 Dataset (tf.data.Dataset.from_generator())。DALI 在 External Source 运算符中内置了类似的功能 - 可以通过 source 参数传递 Python 代码:回调或可迭代对象。您甚至可能已经以这种方式定义了一些 pipeline,您已将其与其他深度学习框架一起使用。

DALIDatasetWithInputs 允许运行具有带有 source 参数的 External Source 节点的 Pipeline,而无需进行额外的更改。

它会自动检测 DALIDataset 的放置位置,并将 source 转换为放置在 CPU 上的 TensorFlow 生成器 Dataset。某些 限制 可能适用,但它支持 DALI 支持的所有类型的 source 参数

  • 可调用对象

  • 可迭代对象

  • 生成器函数

在批处理和每样本模式下。

此集成仅适用于返回 CPU 数据的 Python source

source 处理示例#

让我们从我们定义的第二个 pipeline 开始。这一次,我们将生成器函数直接放入 DALI pipeline 中,而不是通过 input_datasets 手动提供来自 TensorFlow 的带有红色通道的 NumPy 数组。

我们保留第一个输入 dataset,以表明我们可以混合使用这两种方法。

我们在第二个 external_source 中设置 batch=False,因为生成器被定义为返回单独的样本。

[13]:
@pipeline_def(device_id=0, num_threads=4, batch_size=batch_size)
def pipeline_with_source(device):
    encoded = fn.external_source(name="input_from_tf_record", dtype=types.UINT8)
    # Now we don't need to name this node, we just pass the `source`
    red = fn.external_source(source=get_red, batch=False, dtype=types.FLOAT)
    if device == "gpu":
        red = red.gpu()
    images = fn.decoders.image(
        encoded, device="mixed" if device == "gpu" else "cpu"
    )
    resized = fn.resize(images, size=(300, 500))
    saturated_red = dmath.min(resized + red, 255.0)
    return fn.cast(saturated_red, dtype=types.UINT8)

现在我们不需要在 input_datasets 中传递任何其他详细信息来处理 fn.external_source(source=get_red) - 这将自动完成。

[14]:
inputs_spec_dict_gpu = {
    "input_from_tf_record": input_dataset_gpu,
}

pipe_source_gpu = pipeline_with_source("gpu")

with tf.device("/gpu:0"):
    dali_dataset_source = dali_tf.experimental.DALIDatasetWithInputs(
        pipeline=pipe_source_gpu,
        input_datasets=inputs_spec_dict_gpu,
        batch_size=batch_size,
        output_shapes=None,
        output_dtypes=tf.uint8,
        device_id=0,
    )

让我们运行该示例。

[15]:
display(run(dali_dataset_source))
../../../_images/examples_frameworks_tensorflow_tensorflow-dataset-inputs_33_0.png

局限性#

由于当前的 TensorFlow 限制,当使用 GPU 放置的 DALIDataset 时,输入将在强制 copy_to_device 之后作为 GPU 内存传递。当与 CPU 设备一起使用 External Source 时:fn.external_source(device='cpu'),DALI 内部将有额外的 GPU 到 CPU 复制。也可以使用 device='gpu',从而消除复制。