带有外部源的 DALI 数据集的输入#

在本教程中,我们将展示如何将外部数据输入与 DALIDataset 一起使用。以下部分演示了如何为 DALIDataset 提供来自其他 TensorFlow tf.data.Datasets 的数据,以及如何使用 DALI 外部源操作器 与自定义 Python 代码来生成数据。

本教程假设您熟悉 DALIDataset API - 一种将 DALI Pipeline 作为 tf.data.Dataset 兼容对象运行的方式。您可以在关于 在带有 tf.data 的 DALIDataset 中使用 pipeline在多个 GPU 上运行 DALIDataset 的教程中找到更多信息。

来自其他 tf.data.Datasets 的输入#

DALI 已经提供了一组专用的 读取器操作器,并允许通过 外部源 操作器在 pipeline 中指定自定义 Python 代码作为数据源。

包装到 TensorFlow 兼容 Dataset 对象中的 DALI Pipeline 可以将其他 tf.data.Dataset 对象作为输入,允许您使用 DALI 处理使用 TensorFlow API 创建的输入

注意:DALIDataset 的输入目前处于实验阶段,API 可能会在 DALI 的未来版本中进行调整。

注意:目前通过 DALIDataset 的附加变体启用输入的使用,该变体在 DALI Tensorflow 插件中作为 ``experimental.DALIDatasetWithInputs`` 提供。

在以下段落中,我们将演示如何指定这些输入并指示它们在 DALI pipeline 中的位置。

选择输入数据集#

作为一个示例,我们将使用 tf.data.TFRecordReader,即使 DALI 有自己的原生变体。我们将使用来自 DALI_extra - DALI 测试数据存储库的示例数据集。我们将从该 TFRecord 数据集中提取编码图像,并将它们传递给 DALI 进行解码。

DALI_EXTRA_PATH 环境变量应指向从 DALI extra 存储库 下载数据的位置。请确保检出正确的发布标签。

Tensorflow Dataset#

首先,我们使用 tf.data API 准备 TensorFlow Dataset。我们需要为 TFRecordDataset 构造函数提供数据路径。

接下来,我们选择编码图像作为我们要提取的特征,并使用我们定义的 schema 通过 tf.io.parse_single_example 进行解析。我们还使用 tf.io.decode_raw 将原始 tf.string 张量转换为类型化的数值张量 - DALI 可以使用它。

最后两个步骤将组成我们的 decode_fn,我们将把它映射到 TFRecordDataset 上。我们定义了我们的 input_dataset

[1]:
import tensorflow as tf

tf.compat.v1.enable_eager_execution()
import os

# Path to TFRecord dataset
data_path = os.path.join(os.environ["DALI_EXTRA_PATH"], "db/tfrecord/train/")

# We want to extract the encoded images
images_key = "image/encoded"

# Schema for raw, encoded buffers
schema = {images_key: tf.io.FixedLenFeature([], tf.string, "")}


def decode_fn(record_bytes):
    parsed_examples = tf.io.parse_single_example(record_bytes, schema)
    encoded_image = tf.io.decode_raw(parsed_examples[images_key], tf.uint8)
    return encoded_image


input_dataset = tf.data.TFRecordDataset(data_path).map(decode_fn)

DALI Pipeline#

现在我们需要定义一个 DALI Pipeline,它可以接受 tf.data.Dataset 作为输入,并解码该输入提供的缓冲区。

我们使用带有特定 name 参数的 external_source 操作器节点作为来自 Tensorflow 的输入的占位符节点。在本示例中,我们将使用一个输入,但可以有更多,只要占位符 fn.external_source 节点具有唯一的名称。

[2]:
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types

batch_size = 2


@pipeline_def(device_id=0, num_threads=4, batch_size=batch_size)
def pipeline_with_inputs(device):
    encoded = fn.external_source(name="input_from_tf_record", dtype=types.UINT8)
    images = fn.decoders.image(
        encoded, device="mixed" if device == "gpu" else "cpu"
    )
    resized = fn.resize(images, size=(300, 500))
    return resized

请注意,我们的 pipeline 可以使用 device 进行参数化,因此我们可以在 GPU 上加速解码。当我们使用 GPU 加速时,我们需要特别注意 DALIDataset 和输入的位置。

我们还将所有图像调整为相同大小,因为 TensorFlow 希望数据批次是统一的。

构建 DALIDataset#

现在让我们导入实验性的 DALIDatasetWithInputs。它与 DALIDataset 的不同之处在于添加了额外的 input_datasets 参数,我们将字典传递给该参数,该字典将 external_source 节点的名称映射到输入 tf.data.Dataset 对象。

这里非常简单,因为我们只有一个 input_dataset 要映射到 'input_from_tf_record',我们将它命名为 input_spec_dict

对于第一个示例,我们使用 CPU 放置。

[3]:
import nvidia.dali.plugin.tf as dali_tf

input_spec_dict = {"input_from_tf_record": input_dataset}

pipe = pipeline_with_inputs("cpu")

with tf.device("/cpu:0"):
    dali_dataset = dali_tf.experimental.DALIDatasetWithInputs(
        pipeline=pipe,
        input_datasets=input_spec_dict,
        batch_size=batch_size,
        output_shapes=None,
        output_dtypes=tf.uint8,
        device_id=0,
    )

收集结果#

由于我们在 eager 模式下启动,我们可以在常规循环中运行我们的 DALIDataset,将结果转换为 NumPy 并可视化它们。

[4]:
def run(dali_dataset, iters=2):
    result = []
    for batch, i in zip(dali_dataset, range(iters)):
        images = batch[0].numpy()
        for sample_idx in range(batch_size):
            result.append(images[sample_idx])
    return result
[5]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import math


def display(outputs, columns=2, captions=None, cpu=False):
    rows = int(math.ceil(len(outputs) / columns))
    fig = plt.figure()
    fig.set_size_inches(16, 6 * rows)
    gs = gridspec.GridSpec(rows, columns)
    row = 0
    col = 0
    for i in range(len(outputs)):
        plt.subplot(gs[i])
        plt.axis("off")
        if captions is not None:
            plt.title(captions[i])
        plt.imshow(outputs[i])
[6]:
display(run(dali_dataset))
../../../_images/examples_frameworks_tensorflow_tensorflow-dataset-inputs_13_0.png

批处理和按样本模式#

还有一些其他事项需要考虑。TensorFlow Datasets 在“示例”上运行 - 这通常是单个密集张量。即使我们对某些 tf.data.Dataset 应用了 .batch(batch_size),结果值也将是张量,最外层维度等于 batch_size

DALI 通过设计在数据批次上运行。在将来自 TensorFlow 的输入提供给 DALI 时,我们可以指定特定的 external_source 节点是应查询其输入的批次还是单独的样本。在后一种情况下,DALI 将查询输入 batch_size 次以构建批次 - 无需手动执行此操作。

通过 DALIDatasetWithInputsinput_datasets 参数传递 tf.data.Dataset 会自动指示此输入应被视为按样本处理。

为了展示 DALIDataset 可以处理不同模式下的多个输入,我们将创建一个额外的 dataset 输入,这次以批处理模式运行。

生成器数据集#

我们使用一个简单的生成器和一个生成器数据集,以返回一个逐渐变红的 numpy 数组。接下来,我们使用 Batch 数据集,以展示如何将来自 TensorFlow 的批处理数据源指定给 DALI。

我们还强调了生成器数据集只能放置在 CPU 上的事实。

[7]:
import numpy as np


def get_red():
    current_red = 64
    max_red = 255
    while True:
        yield np.full((300, 500, 3), (current_red, 0, 0), dtype=np.float32)
        current_red = min(current_red + 1, max_red)


signature = tf.TensorSpec(shape=(300, 500, 3), dtype=tf.float32)

with tf.device("/cpu:0"):
    batch_dataset = tf.data.Dataset.from_generator(
        get_red, output_signature=signature
    ).batch(batch_size)

带有两个输入的 Pipeline#

现在,让我们通过添加第二个 external_source 占位符节点来表示第二个输入来调整 pipeline,并将其用于某些处理。

[8]:
import nvidia.dali.math as dmath
import nvidia.dali.types as types


@pipeline_def(device_id=0, num_threads=4, batch_size=batch_size)
def pipeline_with_two_inputs(device):
    encoded = fn.external_source(name="input_from_tf_record", dtype=types.UINT8)
    red = fn.external_source(name="input_from_generator", dtype=types.FLOAT)
    if device == "gpu":
        red = red.gpu()
    images = fn.decoders.image(
        encoded, device="mixed" if device == "gpu" else "cpu"
    )
    resized = fn.resize(images, size=(300, 500))
    saturated_red = dmath.min(resized + red, 255.0)
    return fn.cast(saturated_red, dtype=types.UINT8)

指定批处理输入#

要通知 DALI 我们传递的输入表示一个批次,我们需要在该输入旁边传递一些额外的选项。对于这种情况,我们可以使用 nvidia.dal.plugin.tf.experimental.Input

Input 类接受多个参数:我们想要为其传递选项的 tf.data.Dataset 输入,以及可以覆盖 external_source 节点定义中存在的参数的 layoutbatch - 为其中任何一个提供 None 将使 DALI 使用 DALI pipeline 中定义的那个。我们将设置 batch=True 以选择批处理模式。此外,我们可以为 DALI 指定 'HWC' 布局。

让我们通过扩展 input_datasets 字典规范来更新我们的示例。

[9]:
two_inputs_spec_dict = {
    "input_from_tf_record": input_dataset,
    "input_from_generator": dali_tf.experimental.Input(
        batch_dataset, batch=True, layout="HWC"
    ),
}

pipe_batched_input = pipeline_with_two_inputs("cpu")

with tf.device("/cpu:0"):
    batched_dali_dataset = dali_tf.experimental.DALIDatasetWithInputs(
        pipeline=pipe_batched_input,
        input_datasets=two_inputs_spec_dict,
        batch_size=batch_size,
        output_shapes=None,
        output_dtypes=tf.uint8,
        device_id=0,
    )
[10]:
display(run(batched_dali_dataset))
../../../_images/examples_frameworks_tensorflow_tensorflow-dataset-inputs_21_0.png

或者,对于 'input_from_generator',我们可以使用带有 batch=NoneInput 类,并将 batch=True 传递给 Pipeline 定义中具有该名称的 fn.external_source,使其成为 fn.external_source(name='input_from_generator', batch=True)。请注意,如果为 Input 类提供了一个选项作为 None,则该选项取自 Pipeline 定义。

GPU 放置#

如果您想充分利用 DALI 的潜力,则需要利用 GPU 加速。我们准备的两个示例 pipeline 都可以与 GPU 加速一起使用,但它们需要进行一些调整。

首先,它们需要放置在 GPU 上。我们还需要调整我们的输入 - TensorFlow 要求放置在 GPU 上的每个输入也应在 GPU 上可用。大多数 tf.data.Dataset 都不能直接放置在 GPU 上 - 这对于我们构建的输入数据集也是如此。因此,我们需要手动将输入数据集传输到我们使用 tf.data.experimental.copy_to_deviceDALIDataset 放置到的 GPU 上。

请注意,GPU 放置的 DALIDataset 应该是最后一个数据集,并且不应由其他 tf.data.Dataset 类处理。

显式复制#

让我们首先将输入数据集传输到 /gpu:0。接下来,我们使用它们为 input_datasets 参数指定一个新的字典。最后,我们获取一个新的 DALI pipeline 实例,这次为 'gpu' 加速指定,并将其包装到 GPU 放置的 DALIDatasetWithInputs 中。

[11]:
input_dataset_gpu = input_dataset.apply(
    tf.data.experimental.copy_to_device("/gpu:0")
)
batch_dataset_gpu = batch_dataset.apply(
    tf.data.experimental.copy_to_device("/gpu:0")
)


two_inputs_spec_dict_gpu = {
    "input_from_tf_record": input_dataset_gpu,
    "input_from_generator": dali_tf.experimental.Input(
        batch_dataset_gpu, batch=True, layout="HWC"
    ),
}

pipe_gpu = pipeline_with_two_inputs("gpu")

with tf.device("/gpu:0"):
    dali_dataset_gpu = dali_tf.experimental.DALIDatasetWithInputs(
        pipeline=pipe_gpu,
        input_datasets=two_inputs_spec_dict_gpu,
        batch_size=batch_size,
        output_shapes=None,
        output_dtypes=tf.uint8,
        device_id=0,
    )
[12]:
display(run(dali_dataset_gpu))
../../../_images/examples_frameworks_tensorflow_tensorflow-dataset-inputs_26_0.png

处理外部源 source 参数#

在前面的示例中,我们使用了 TensorFlow 的 Generator Dataset (tf.data.Dataset.from_generator())。DALI 在外部源操作器中内置了类似的功能 - 可以通过 source 参数传递 Python 代码:回调或可迭代对象。您甚至可能已经以这种方式定义了一些 pipeline,这些 pipeline 已与其他深度学习框架一起使用。

DALIDatasetWithInputs 允许运行具有带 source 参数的外部源节点的 Pipeline,而无需进行其他更改。

它会自动检测 DALIDataset 的放置位置,并将 source 转换为放置在 CPU 上的 TensorFlow Generator Dataset。某些 限制 可能适用,但它支持 DALI 支持的所有类型的 source 参数

  • 可调用对象

  • 可迭代对象

  • 生成器函数

在批处理和按样本模式下。

此集成仅适用于返回 CPU 数据的 Python source

source 处理示例#

让我们从我们定义的第二个 pipeline 开始。这次,我们不会通过 input_datasets 从 TensorFlow 手动提供带有红色通道的 NumPy 数组,而是将生成器函数直接放入 DALI pipeline。

我们保留第一个输入数据集,以显示我们可以混合使用这两种方法。

我们在第二个 external_source 中设置 batch=False,因为生成器被定义为返回单独的样本。

[13]:
@pipeline_def(device_id=0, num_threads=4, batch_size=batch_size)
def pipeline_with_source(device):
    encoded = fn.external_source(name="input_from_tf_record", dtype=types.UINT8)
    # Now we don't need to name this node, we just pass the `source`
    red = fn.external_source(source=get_red, batch=False, dtype=types.FLOAT)
    if device == "gpu":
        red = red.gpu()
    images = fn.decoders.image(
        encoded, device="mixed" if device == "gpu" else "cpu"
    )
    resized = fn.resize(images, size=(300, 500))
    saturated_red = dmath.min(resized + red, 255.0)
    return fn.cast(saturated_red, dtype=types.UINT8)

现在我们不需要在 input_datasets 中传递任何其他详细信息来处理 fn.external_source(source=get_red) - 这将自动完成。

[14]:
inputs_spec_dict_gpu = {
    "input_from_tf_record": input_dataset_gpu,
}

pipe_source_gpu = pipeline_with_source("gpu")

with tf.device("/gpu:0"):
    dali_dataset_source = dali_tf.experimental.DALIDatasetWithInputs(
        pipeline=pipe_source_gpu,
        input_datasets=inputs_spec_dict_gpu,
        batch_size=batch_size,
        output_shapes=None,
        output_dtypes=tf.uint8,
        device_id=0,
    )

让我们运行示例。

[15]:
display(run(dali_dataset_source))
../../../_images/examples_frameworks_tensorflow_tensorflow-dataset-inputs_33_0.png

局限性#

由于当前的 TensorFlow 限制,当使用 GPU 放置的 DALIDataset 时,输入将作为 GPU 内存传递 - 在强制性的 copy_to_device 之后。当与 CPU 设备的外部源一起使用时:fn.external_source(device='cpu'),DALI 内部将有额外的 GPU 到 CPU 复制。也可以使用 device='gpu',删除复制。