入门#
概述#
NVIDIA 数据加载库 (DALI) 是高度优化的构建块和执行引擎的集合,可加速计算机视觉和音频深度学习应用的数据 pipeline。
深度学习框架提供的输入和增强 pipeline 通常分为两类
快速,但不灵活 - 用 C++ 编写,它们作为单个单体式 Python 对象公开,具有非常特定的操作集及其提供的操作顺序
慢速,但灵活 - 用 C++ 或 Python 编写的构建块集合,可用于组成最终速度较慢的任意数据 pipeline。 这种类型的数据 pipeline 的最大开销之一是 Python 中的全局解释器锁 (GIL)。 这迫使开发人员使用多处理,从而使高效输入 pipeline 的设计复杂化。
DALI 的突出之处在于,它既提供了性能,又提供了加速不同数据 pipeline 的灵活性。 它通过公开使用简单高效的引擎执行的优化构建块,并将操作卸载到 GPU(从而实现扩展到多 GPU 系统)来实现这一点。
它是一个单一的库,可以轻松集成到不同的深度学习训练和推理应用中。
DALI 在启用 GPU 的系统上提供易用性和灵活性,具有直接的框架插件、多种输入数据格式和可配置的图。 由于 CPU 周期限制导致 I/O pipeline 成为瓶颈,DALI 可以帮助加速深度学习工作流程的整体速度。 通常,GPU 与 CPU 比率高的系统(例如 Amazon EC2 P3.16xlarge、NVIDIA DGX1-V 或 NVIDIA DGX-2)在主机 CPU 上受到限制,从而未充分利用可用的 GPU 计算能力。 DALI 显着加速了此类密集 GPU 配置上的输入处理,以实现整体吞吐量。
Pipeline#
使用 DALI 进行数据处理的核心在于数据处理 pipeline 的概念。 它由以有向图形式连接的多个操作组成,并包含在类 nvidia.dali.Pipeline 的对象中。 此类提供了定义、构建和运行数据处理 pipeline 所需的函数。
[1]:
from nvidia.dali.pipeline import Pipeline
定义 Pipeline#
让我们从定义一个非常简单的 pipeline 来进行分类任务开始,以确定图片是否包含狗或小猫。 我们准备了一个目录结构,其中包含狗和小猫的图片 在我们的存储库中。
我们的简单 pipeline 将从此目录中读取图像,解码它们并返回(图像,标签)对。
创建 pipeline 的最简单方法是使用 pipeline_def 装饰器。 在 simple_pipeline 函数中,我们定义要执行的操作以及它们之间计算的流程。
使用
fn.readers.file从硬盘驱动器读取 jpeg(编码图像)和标签。使用
fn.decoders.image操作将图像从 jpeg 解码为 RGB。指定应将哪些中间变量作为 pipeline 的输出返回。
有关 pipeline_def 的更多信息,请查看 文档。
[2]:
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
image_dir = "data/images"
max_batch_size = 8
@pipeline_def
def simple_pipeline():
jpegs, labels = fn.readers.file(file_root=image_dir)
images = fn.decoders.image(jpegs, device="cpu")
return images, labels
构建 Pipeline#
为了使用用 simple_pipeline 定义的 pipeline,我们需要实例化它。 这可以通过调用 simple_pipeline 来实现,这将创建 pipeline 的实例。
[3]:
pipe = simple_pipeline(batch_size=max_batch_size, num_threads=1, device_id=0)
请注意,用 pipeline_def 装饰函数会向其添加新的命名参数。 它们可用于控制 pipeline 的各个方面,例如
最大批处理大小,
用于在 CPU 上执行计算的线程数,
要使用的 GPU 设备(用
simple_pipeline创建的 pipeline 尚未使用 GPU 进行计算),随机数生成的种子。
有关 Pipeline 参数的更多信息,您可以查看 Pipeline 文档。
运行 Pipeline#
创建 pipeline 实例后,我们可以运行它以获取一批结果。
[4]:
pipe_out = pipe.run()
print(pipe_out)
(TensorListCPU(
[[[[255 255 255]
[255 255 255]
...
[ 86 46 55]
[ 86 46 55]]
[[255 255 255]
[255 255 255]
...
[ 86 46 55]
[ 86 46 55]]
...
[[158 145 154]
[158 147 155]
...
[ 93 38 41]
[ 93 38 41]]
[[157 145 155]
[158 146 156]
...
[ 93 38 41]
[ 93 38 41]]]
[[[ 69 77 80]
[ 69 77 80]
...
[ 97 105 108]
[ 97 105 108]]
[[ 69 77 80]
[ 70 78 81]
...
[ 97 105 108]
[ 97 105 108]]
...
[[199 203 206]
[199 203 206]
...
[206 210 213]
[206 210 213]]
[[199 203 206]
[199 203 206]
...
[206 210 213]
[206 210 213]]]
...
[[[ 26 28 25]
[ 26 28 25]
...
[ 34 39 33]
[ 34 39 33]]
[[ 26 28 25]
[ 26 28 25]
...
[ 34 39 33]
[ 34 39 33]]
...
[[ 35 46 30]
[ 36 47 31]
...
[114 99 106]
[127 114 121]]
[[ 35 46 30]
[ 35 46 30]
...
[107 92 99]
[112 97 102]]]
[[[182 185 132]
[180 183 128]
...
[ 98 103 9]
[ 97 102 8]]
[[180 183 130]
[179 182 127]
...
[ 93 98 4]
[ 91 96 2]]
...
[[ 69 111 71]
[ 68 111 66]
...
[147 159 121]
[148 163 124]]
[[ 64 109 68]
[ 64 110 64]
...
[113 123 88]
[104 116 80]]]],
dtype=DALIDataType.UINT8,
layout="HWC",
num_samples=8,
shape=[(427, 640, 3),
(427, 640, 3),
(425, 640, 3),
(480, 640, 3),
(485, 640, 3),
(427, 640, 3),
(409, 640, 3),
(427, 640, 3)]), TensorListCPU(
[[0]
[0]
[0]
[0]
[0]
[0]
[0]
[0]],
dtype=DALIDataType.INT32,
num_samples=8,
shape=[(1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,)]))
pipeline 的输出(我们已将其保存到 pipe_out 变量)是一个包含 2 个元素的元组(正如预期的那样 - 我们在 simple_pipeline 函数中指定了 2 个输出)。 这两个元素都是 TensorListCPU 对象 - 每个对象都包含 CPU 张量列表。
为了显示结果(仅用于调试目的 - 在实际训练期间我们不会执行此步骤,因为它会使我们的一批图像从 GPU 往返到 CPU 并返回),我们可以将我们的数据从 DALI 的张量发送到 NumPy 数组。 并非每个 TensorList 都可以通过这种方式访问 - TensorList 比 NumPy 数组更通用,并且可以保存具有不同形状的张量。 为了检查我们是否可以直接将其发送到 NumPy,我们可以调用 TensorList 的 is_dense_tensor 函数
[5]:
images, labels = pipe_out
print("Images is_dense_tensor: " + str(images.is_dense_tensor()))
print("Labels is_dense_tensor: " + str(labels.is_dense_tensor()))
Images is_dense_tensor: False
Labels is_dense_tensor: True
事实证明,包含标签的 TensorList 可以用张量表示,而包含图像的 TensorList 则不能。
让我们看看返回标签的形状和内容。
[6]:
print(labels)
TensorListCPU(
[[0]
[0]
[0]
[0]
[0]
[0]
[0]
[0]],
dtype=DALIDataType.INT32,
num_samples=8,
shape=[(1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,)])
为了查看图像,我们需要循环访问 TensorList 中包含的所有张量,使用其 at 方法访问。
[7]:
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
%matplotlib inline
def show_images(image_batch):
columns = 4
rows = (max_batch_size + 1) // (columns)
fig = plt.figure(figsize=(24, (24 // columns) * rows))
gs = gridspec.GridSpec(rows, columns)
for j in range(rows * columns):
plt.subplot(gs[j])
plt.axis("off")
plt.imshow(image_batch.at(j))
[8]:
show_images(images)
添加增强#
随机混洗#
正如我们可以从上面的示例中看到的那样,我们的 pipeline 返回的第一批图像仅包含狗。 这是因为我们没有混洗我们的数据集,因此 fn.readers.file 以字典顺序返回图像。
让我们创建一个新的 pipeline,它将改变这一点。
[9]:
@pipeline_def
def shuffled_pipeline():
jpegs, labels = fn.readers.file(
file_root=image_dir, random_shuffle=True, initial_fill=21
)
images = fn.decoders.image(jpegs, device="cpu")
return images, labels
我们对 simple_pipeline 进行了 2 处更改以获得 shuffled_pipeline - 我们向 fn.readers.file 操作添加了 2 个参数
random_shuffle启用读取器中图像的混洗。 混洗是通过使用从磁盘读取的图像缓冲区来执行的。 当要求读取器提供下一张图像时,它会从缓冲区中随机选择一张图像,输出它,并立即用新读取的图像替换缓冲区中的该位置。initial_fill设置缓冲区的容量。 此参数的默认值 (1000) 非常适合包含数千个示例的数据集,但对于我们非常小的数据集(仅包含 21 张图像)来说太大了。 这可能会导致返回的批次中频繁出现重复项。 这就是为什么在本示例中我们将其设置为数据集的大小。
让我们测试此修改的结果。
[10]:
pipe = shuffled_pipeline(
batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234
)
[11]:
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)
现在,pipeline 返回的图像已正确混洗。
增强#
DALI 不仅可以从磁盘读取图像并将它们批处理到张量中,还可以对这些图像执行各种增强,以改善深度学习训练结果。
这种增强的一个示例是旋转。 让我们创建一个新的 pipeline,它在输出图像之前旋转图像。
[12]:
@pipeline_def
def rotated_pipeline():
jpegs, labels = fn.readers.file(
file_root=image_dir, random_shuffle=True, initial_fill=21
)
images = fn.decoders.image(jpegs, device="cpu")
rotated_images = fn.rotate(images, angle=10.0, fill_value=0)
return rotated_images, labels
为此,我们在 pipeline 中添加了一个新操作:fn.rotate。
正如我们在 文档 中看到的那样,rotate 可以接受多个参数,但除了 input 之外,只需要一个参数 - angle 告诉操作员它应该旋转图像多少度。 我们还指定了 fill_value 以更好地可视化结果。
让我们测试新创建的 pipeline
[13]:
pipe = rotated_pipeline(
batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234
)
pipe.build()
[14]:
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)
张量作为参数和随机数生成#
将每张图像旋转 10 度并不是那么有趣。 为了进行有意义的增强,我们希望操作员在给定范围内以随机角度旋转我们的图像。
Rotate 的 angle 参数可以接受 float 或 float tensor 类型的数值。 第二个选项 float tensor 使我们能够通过其他操作生成的张量,为每个图像提供不同的旋转角度。
随机数生成器是可以与 DALI 一起使用的操作示例。 让我们使用 fn.random.uniform 创建一个以随机角度旋转图像的 pipeline。
注意
请记住,每次将 DALI 操作员的输出作为命名关键字参数传递给另一个操作员时,数据都必须放置在 CPU 上。 在下面的示例中,我们使用
random.uniform的输出(其默认设备为“cpu”)作为rotate的angle关键字参数。DALI 中的此类参数称为“参数输入”。 有关它们的更多信息,请参见 pipeline 文档部分。
常规输入(非命名、位置输入)没有此类约束,并且可以使用 CPU 或 GPU 数据,如下所示。
[15]:
@pipeline_def
def random_rotated_pipeline():
jpegs, labels = fn.readers.file(
file_root=image_dir, random_shuffle=True, initial_fill=21
)
images = fn.decoders.image(jpegs, device="cpu")
angle = fn.random.uniform(range=(-10.0, 10.0))
rotated_images = fn.rotate(images, angle=angle, fill_value=0)
return rotated_images, labels
这次,我们没有为 angle 参数提供固定值,而是将其设置为 fn.random.uniform 操作符的输出。
让我们检查结果
[16]:
pipe = random_rotated_pipeline(
batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234
)
[17]:
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)
这次,旋转角度是从值范围中随机选择的。
添加 GPU 加速#
DALI 提供对 GPU 加速操作符的访问,这可以提高输入和增强 pipeline 的速度,并使其扩展到多 GPU 系统。
将张量复制到 GPU#
让我们修改先前 random_rotated_pipeline 的示例,以使用 GPU 进行旋转。
[18]:
@pipeline_def
def random_rotated_gpu_pipeline():
jpegs, labels = fn.readers.file(
file_root=image_dir, random_shuffle=True, initial_fill=21
)
images = fn.decoders.image(jpegs, device="cpu")
angle = fn.random.uniform(range=(-10.0, 10.0))
rotated_images = fn.rotate(images.gpu(), angle=angle, fill_value=0)
return rotated_images, labels
为了告诉 DALI 我们要使用 GPU,我们只需要对 pipeline 进行一项更改。 我们将 rotate 操作的输入从 CPU 上的张量 images 更改为将其复制到 GPU 的 images.gpu()。
[19]:
pipe = random_rotated_gpu_pipeline(
batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234
)
pipe.build()
[20]:
pipe_out = pipe.run()
print(pipe_out)
(TensorListGPU(
[[[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
...
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]]
[[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
...
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]]
...
[[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
...
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]]
[[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
...
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]
[[0 0 0]
[0 0 0]
...
[0 0 0]
[0 0 0]]]],
dtype=DALIDataType.UINT8,
layout="HWC",
num_samples=8,
shape=[(583, 710, 3),
(477, 682, 3),
(482, 642, 3),
(761, 736, 3),
(467, 666, 3),
(449, 654, 3),
(510, 662, 3),
(463, 664, 3)]), TensorListCPU(
[[0]
[0]
[1]
[1]
[0]
[1]
[0]
[0]],
dtype=DALIDataType.INT32,
num_samples=8,
shape=[(1,), (1,), (1,), (1,), (1,), (1,), (1,), (1,)]))
pipe_out 仍然包含 2 个 TensorList,但这次第一个输出,即 rotate 操作的结果,在 GPU 上。 我们无法直接从 CPU 访问 TensorListGPU 的内容,因此为了可视化结果,我们需要使用 as_cpu 方法将其复制到 CPU。
[21]:
images, labels = pipe_out
show_images(images.as_cpu())
重要提示#
DALI 中的默认执行器不允许 CPU 操作符跟随 GPU 操作符。 要启用更灵活的 pipeline,请将 exec_dynamic=True 传递给 pipeline 构造函数或 @pipeline_def 装饰器。
混合解码#
有时,特别是对于较高分辨率的图像,解码以 JPEG 格式存储的图像可能会成为瓶颈。 为了解决这个问题,开发了 nvJPEG 和 nvJPEG2000 库。 它们在 CPU 和 GPU 之间拆分解码过程,从而显着减少了解码时间。
在 fn.decoders.image 中指定“混合”设备参数可启用 nvJPEG 和 nvJPEG2000 支持。 其他文件格式仍在 CPU 上解码。
[22]:
@pipeline_def
def hybrid_pipeline():
jpegs, labels = fn.readers.file(
file_root=image_dir, random_shuffle=True, initial_fill=21
)
images = fn.decoders.image(jpegs, device="mixed")
return images, labels
带有 device=mixed 的 fn.decoders.image 使用计算的混合方法,该方法同时使用 CPU 和 GPU。 这意味着它接受 CPU 输入,但返回 GPU 输出。 这就是为什么从 pipeline 返回的 images 对象是 TensorListGPU 类型的原因。
[23]:
pipe = hybrid_pipeline(
batch_size=max_batch_size, num_threads=1, device_id=0, seed=1234
)
pipe.build()
[24]:
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images.as_cpu())
让我们通过测量具有 4 个 CPU 线程的 shuffled_pipeline 和 hybrid_pipeline 的速度来比较“cpu”和“mixed”后端的 fn.decoders.image 的速度。
[25]:
from timeit import default_timer as timer
test_batch_size = 64
def speedtest(pipeline, batch, n_threads):
pipe = pipeline(batch_size=batch, num_threads=n_threads, device_id=0)
pipe.build()
# warmup
for i in range(5):
pipe.run()
# test
n_test = 20
t_start = timer()
for i in range(n_test):
pipe.run()
t = timer() - t_start
print("Speed: {} imgs/s".format((n_test * batch) / t))
[26]:
speedtest(shuffled_pipeline, test_batch_size, 4)
Speed: 2597.527149961429 imgs/s
[27]:
speedtest(hybrid_pipeline, test_batch_size, 4)
Speed: 5828.851662794091 imgs/s
正如我们所见,使用 GPU 加速解码带来了显着的加速。