DALI 数据集与外部源的输入#
在本教程中,我们将展示如何将外部数据输入与 DALIDataset
一起使用。以下部分演示了如何为 DALIDataset
提供来自其他 TensorFlow tf.data.Datasets 的数据,以及如何使用 DALI Exernal Source operator 与自定义 Python 代码来生成数据。
本教程假设您熟悉 DALIDataset API - 一种将 DALI Pipeline 作为 tf.data.Dataset
兼容对象运行的方式。您可以在关于 在带有 tf.data 的 DALIDataset 中使用 pipelines 和 在多个 GPU 上运行 DALIDataset 的教程中找到更多信息。
来自其他 tf.data.Datasets 的输入#
DALI 已经提供了一组专用的 reader operators,并允许通过 External Source 运算符在 pipeline 中指定自定义 Python 代码作为数据源。
包装到 TensorFlow 兼容 Dataset 对象中的 DALI Pipeline 可以将其他 tf.data.Dataset
对象作为输入,使您可以使用 DALI 处理使用 TensorFlow API 创建的输入
注意:DALIDataset 的输入目前处于实验阶段,API 可能会在 DALI 的未来版本中进行调整。
注意:目前通过 DALIDataset 的附加变体启用输入的使用,该变体在 DALI Tensorflow 插件中作为 ``experimental.DALIDatasetWithInputs`` 提供。
在以下段落中,我们将演示如何指定这些输入并指示它们在 DALI pipeline 中的位置。
选择输入数据集#
作为一个例子,我们将使用 tf.data.TFRecordReader,即使 DALI 有其自己的原生变体。我们将使用来自 DALI_extra - DALI 测试数据存储库的示例数据集。我们将从该 TFRecord 数据集中提取编码图像,并将它们传递给 DALI 进行解码。
DALI_EXTRA_PATH
环境变量应指向从 DALI extra repository 下载数据的位置。请确保已检出正确的发布标签。
Tensorflow Dataset#
首先,我们使用 tf.data
API 准备 TensorFlow Dataset
。我们需要为 TFRecordDataset
构造函数提供数据路径。
接下来,我们选择编码图像作为我们要提取的特征,并使用 tf.io.parse_single_example
和我们定义的 schema
进行解析。我们还使用 tf.io.decode_raw
将原始 tf.string
张量转换为类型化的数值张量 - DALI 可以使用它。
最后两个步骤将构成我们的 decode_fn
,我们将映射到 TFRecordDataset
上。我们定义了我们的 input_dataset
。
[1]:
import tensorflow as tf
tf.compat.v1.enable_eager_execution()
import os
# Path to TFRecord dataset
data_path = os.path.join(os.environ["DALI_EXTRA_PATH"], "db/tfrecord/train/")
# We want to extract the encoded images
images_key = "image/encoded"
# Schema for raw, encoded buffers
schema = {images_key: tf.io.FixedLenFeature([], tf.string, "")}
def decode_fn(record_bytes):
parsed_examples = tf.io.parse_single_example(record_bytes, schema)
encoded_image = tf.io.decode_raw(parsed_examples[images_key], tf.uint8)
return encoded_image
input_dataset = tf.data.TFRecordDataset(data_path).map(decode_fn)
DALI Pipeline#
现在我们需要定义一个 DALI Pipeline,它可以接受 tf.data.Dataset
作为输入,并解码该输入提供的缓冲区。
我们使用带有特定 name
参数的 external_source
运算符节点作为来自 Tensorflow 的输入的占位符节点。在本例中,我们将使用一个输入,但可以有更多输入,只要占位符 fn.external_source
节点具有唯一的名称。
[2]:
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
batch_size = 2
@pipeline_def(device_id=0, num_threads=4, batch_size=batch_size)
def pipeline_with_inputs(device):
encoded = fn.external_source(name="input_from_tf_record", dtype=types.UINT8)
images = fn.decoders.image(
encoded, device="mixed" if device == "gpu" else "cpu"
)
resized = fn.resize(images, size=(300, 500))
return resized
请注意,我们的 pipeline 可以使用 device
进行参数化,以便我们可以在 GPU 上加速解码。当我们使用 GPU 加速时,我们需要特别注意 DALIDataset 和输入的位置。
我们还将所有图像调整为相同大小,因为 TensorFlow 希望数据批次是统一的。
构建 DALIDataset#
现在让我们导入实验性的 DALIDatasetWithInputs
。它与 DALIDataset
的不同之处在于添加了额外的 input_datasets
参数,我们将字典传递给该参数,该字典将 external_source
节点的名称映射到输入 tf.data.Dataset
对象。
这里非常简单,因为我们只有一个 input_dataset
,我们想将其映射到 'input_from_tf_record'
,我们将它命名为 input_spec_dict
。
对于第一个示例,我们使用 CPU 放置。
[3]:
import nvidia.dali.plugin.tf as dali_tf
input_spec_dict = {"input_from_tf_record": input_dataset}
pipe = pipeline_with_inputs("cpu")
with tf.device("/cpu:0"):
dali_dataset = dali_tf.experimental.DALIDatasetWithInputs(
pipeline=pipe,
input_datasets=input_spec_dict,
batch_size=batch_size,
output_shapes=None,
output_dtypes=tf.uint8,
device_id=0,
)
收集结果#
由于我们以 eager 模式启动,我们可以在常规循环中运行我们的 DALIDataset,将结果转换为 NumPy 并可视化它们。
[4]:
def run(dali_dataset, iters=2):
result = []
for batch, i in zip(dali_dataset, range(iters)):
images = batch[0].numpy()
for sample_idx in range(batch_size):
result.append(images[sample_idx])
return result
[5]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import math
def display(outputs, columns=2, captions=None, cpu=False):
rows = int(math.ceil(len(outputs) / columns))
fig = plt.figure()
fig.set_size_inches(16, 6 * rows)
gs = gridspec.GridSpec(rows, columns)
row = 0
col = 0
for i in range(len(outputs)):
plt.subplot(gs[i])
plt.axis("off")
if captions is not None:
plt.title(captions[i])
plt.imshow(outputs[i])
[6]:
display(run(dali_dataset))

批处理和每样本模式#
还有一些其他事情需要考虑。TensorFlow Datasets 在“示例”上运行 - 这通常是单个密集张量。即使我们对某些 tf.data.Dataset
应用了 .batch(batch_size)
,结果值也将是张量,最外层维度等于 batch_size
。
DALI 通过设计在数据批次上运行。在将来自 TensorFlow 的输入提供给 DALI 时,我们可以指定特定的 external_source
节点是否应查询其输入的批次或单独的样本。在后一种情况下,DALI 将查询输入 batch_size
次以构建批次 - 无需手动执行此操作。
通过 DALIDatasetWithInputs
的 input_datasets
参数传递 tf.data.Dataset
会自动指示此输入应被视为每样本。
为了展示 DALIDataset 可以处理不同模式下的多个输入,我们将创建一个额外的 dataset 输入,这次以批处理模式运行。
生成器 Dataset#
我们使用带有生成器 dataset 的简单生成器,以返回一个 NumPy 数组,该数组逐渐变红。接下来,我们使用 Batch dataset,以展示如何从 TensorFlow 向 DALI 指定批处理数据源。
我们还强调了生成器 Dataset 只能放置在 CPU 上的事实。
[7]:
import numpy as np
def get_red():
current_red = 64
max_red = 255
while True:
yield np.full((300, 500, 3), (current_red, 0, 0), dtype=np.float32)
current_red = min(current_red + 1, max_red)
signature = tf.TensorSpec(shape=(300, 500, 3), dtype=tf.float32)
with tf.device("/cpu:0"):
batch_dataset = tf.data.Dataset.from_generator(
get_red, output_signature=signature
).batch(batch_size)
具有两个输入的 Pipeline#
现在,让我们通过添加第二个 external_source
占位符节点来表示第二个输入来调整 pipeline,并将其用于某些处理。
[8]:
import nvidia.dali.math as dmath
import nvidia.dali.types as types
@pipeline_def(device_id=0, num_threads=4, batch_size=batch_size)
def pipeline_with_two_inputs(device):
encoded = fn.external_source(name="input_from_tf_record", dtype=types.UINT8)
red = fn.external_source(name="input_from_generator", dtype=types.FLOAT)
if device == "gpu":
red = red.gpu()
images = fn.decoders.image(
encoded, device="mixed" if device == "gpu" else "cpu"
)
resized = fn.resize(images, size=(300, 500))
saturated_red = dmath.min(resized + red, 255.0)
return fn.cast(saturated_red, dtype=types.UINT8)
指定批处理输入#
为了通知 DALI 我们传递了一个表示批次的输入,我们需要与该输入一起传递一些额外的选项。对于这种情况,我们可以使用 nvidia.dal.plugin.tf.experimental.Input
。
Input
类接受几个参数:我们想要传递选项的 tf.data.Dataset
输入,以及可以覆盖 external_source
节点定义中存在的参数的 layout
和 batch
- 为它们中的任何一个提供 None
将使 DALI 使用 DALI pipeline 中定义的那个。我们将设置 batch=True
以选择批处理模式。此外,我们可以为 DALI 指定 'HWC'
布局。
让我们通过扩展 input_datasets
字典规范来更新我们的示例。
[9]:
two_inputs_spec_dict = {
"input_from_tf_record": input_dataset,
"input_from_generator": dali_tf.experimental.Input(
batch_dataset, batch=True, layout="HWC"
),
}
pipe_batched_input = pipeline_with_two_inputs("cpu")
with tf.device("/cpu:0"):
batched_dali_dataset = dali_tf.experimental.DALIDatasetWithInputs(
pipeline=pipe_batched_input,
input_datasets=two_inputs_spec_dict,
batch_size=batch_size,
output_shapes=None,
output_dtypes=tf.uint8,
device_id=0,
)
[10]:
display(run(batched_dali_dataset))

或者,对于 'input_from_generator'
,我们可以使用带有 batch=None
的 Input
类,并将 batch=True
传递给 Pipeline 定义中具有该名称的 fn.external_source
,使其成为 fn.external_source(name='input_from_generator', batch=True)
。请注意,如果为 Input
类提供 None
选项,则该选项取自 Pipeline 定义。
GPU 放置#
如果您想充分利用 DALI 的潜力,则需要利用 GPU 加速。我们准备的两个示例 pipeline 都可以与 GPU 加速一起使用,但它们需要进行一些调整。
首先,它们需要放置在 GPU 上。我们还需要调整我们的输入 - TensorFlow 要求 GPU 放置的每个输入也应在 GPU 上可用。大多数 tf.data.Dataset
都不能直接放置在 GPU 上 - 这对于我们构建的输入 dataset 也是如此。因此,我们需要手动将输入 dataset 传输到我们使用 tf.data.experimental.copy_to_device 放置 DALIDataset
的 GPU 上。
请注意,GPU 放置的 DALIDataset 应该是最后一个 dataset,并且不应被其他 tf.data.Dataset
类处理。
显式复制#
让我们从将输入 dataset 传输到 /gpu:0
开始。接下来,我们使用它们为 input_datasets
参数指定一个新的字典。最后,我们获取一个新的 DALI pipeline 实例,这次指定用于 'gpu'
加速,并将其包装到 GPU 放置的 DALIDatasetWithInputs 中。
[11]:
input_dataset_gpu = input_dataset.apply(
tf.data.experimental.copy_to_device("/gpu:0")
)
batch_dataset_gpu = batch_dataset.apply(
tf.data.experimental.copy_to_device("/gpu:0")
)
two_inputs_spec_dict_gpu = {
"input_from_tf_record": input_dataset_gpu,
"input_from_generator": dali_tf.experimental.Input(
batch_dataset_gpu, batch=True, layout="HWC"
),
}
pipe_gpu = pipeline_with_two_inputs("gpu")
with tf.device("/gpu:0"):
dali_dataset_gpu = dali_tf.experimental.DALIDatasetWithInputs(
pipeline=pipe_gpu,
input_datasets=two_inputs_spec_dict_gpu,
batch_size=batch_size,
output_shapes=None,
output_dtypes=tf.uint8,
device_id=0,
)
[12]:
display(run(dali_dataset_gpu))

处理 External Source source
参数#
在前面的示例中,我们使用了 TensorFlow 的生成器 Dataset (tf.data.Dataset.from_generator()
)。DALI 在 External Source 运算符中内置了类似的功能 - 可以通过 source
参数传递 Python 代码:回调或可迭代对象。您甚至可能已经以这种方式定义了一些 pipeline,您已将其与其他深度学习框架一起使用。
DALIDatasetWithInputs
允许运行具有带有 source
参数的 External Source 节点的 Pipeline,而无需进行额外的更改。
它会自动检测 DALIDataset
的放置位置,并将 source
转换为放置在 CPU 上的 TensorFlow 生成器 Dataset。某些 限制 可能适用,但它支持 DALI 支持的所有类型的 source
参数
可调用对象
可迭代对象
生成器函数
在批处理和每样本模式下。
此集成仅适用于返回 CPU 数据的 Python source
。
source
处理示例#
让我们从我们定义的第二个 pipeline 开始。这一次,我们将生成器函数直接放入 DALI pipeline 中,而不是通过 input_datasets
手动提供来自 TensorFlow 的带有红色通道的 NumPy 数组。
我们保留第一个输入 dataset,以表明我们可以混合使用这两种方法。
我们在第二个 external_source
中设置 batch=False
,因为生成器被定义为返回单独的样本。
[13]:
@pipeline_def(device_id=0, num_threads=4, batch_size=batch_size)
def pipeline_with_source(device):
encoded = fn.external_source(name="input_from_tf_record", dtype=types.UINT8)
# Now we don't need to name this node, we just pass the `source`
red = fn.external_source(source=get_red, batch=False, dtype=types.FLOAT)
if device == "gpu":
red = red.gpu()
images = fn.decoders.image(
encoded, device="mixed" if device == "gpu" else "cpu"
)
resized = fn.resize(images, size=(300, 500))
saturated_red = dmath.min(resized + red, 255.0)
return fn.cast(saturated_red, dtype=types.UINT8)
现在我们不需要在 input_datasets
中传递任何其他详细信息来处理 fn.external_source(source=get_red)
- 这将自动完成。
[14]:
inputs_spec_dict_gpu = {
"input_from_tf_record": input_dataset_gpu,
}
pipe_source_gpu = pipeline_with_source("gpu")
with tf.device("/gpu:0"):
dali_dataset_source = dali_tf.experimental.DALIDatasetWithInputs(
pipeline=pipe_source_gpu,
input_datasets=inputs_spec_dict_gpu,
batch_size=batch_size,
output_shapes=None,
output_dtypes=tf.uint8,
device_id=0,
)
让我们运行该示例。
[15]:
display(run(dali_dataset_source))

局限性#
由于当前的 TensorFlow 限制,当使用 GPU 放置的 DALIDataset 时,输入将在强制 copy_to_device
之后作为 GPU 内存传递。当与 CPU 设备一起使用 External Source 时:fn.external_source(device='cpu')
,DALI 内部将有额外的 GPU 到 CPU 复制。也可以使用 device='gpu'
,从而消除复制。