外部源算子 - 基本用法#
在本示例中,我们将向您展示如何使用 ExternalSource
算子,以便您可以使用外部数据源作为 pipeline 的输入。
[1]:
import types
import collections
import numpy as np
from random import shuffle
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
batch_size = 16
定义数据源#
在本示例中,我们将使用无限迭代器作为数据源。
[2]:
class ExternalInputIterator(object):
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
self.files = [line.rstrip() for line in f if line != ""]
shuffle(self.files)
def __iter__(self):
self.i = 0
self.n = len(self.files)
return self
def __next__(self):
batch = []
labels = []
for _ in range(self.batch_size):
jpeg_filename, label = self.files[self.i].split(" ")
f = open(self.images_dir + jpeg_filename, "rb")
batch.append(np.frombuffer(f.read(), dtype=np.uint8))
labels.append(np.array([label], dtype=np.uint8))
self.i = (self.i + 1) % self.n
return (batch, labels)
警告
当 pipeline 启用条件执行时,必须采取额外的步骤来防止 source
被 AutoGraph 重写。 有两种方法可以实现这一点
在全局范围内定义函数(即在
pipeline_def
范围之外)。如果函数是另一个“工厂”函数的结果,则工厂函数必须在 pipeline 定义函数之外定义,并使用
<nvidia.dali.pipeline.do_not_convert>
进行装饰。
更多详细信息可以在 nvidia.dali.pipeline.do_not_convert
文档中找到。
定义 Pipeline#
下一步是定义 Pipeline。
ExternalSource
算子接受可迭代对象或可调用对象。 如果源提供多个输出(例如图像和标签),则还必须将该数量指定为 num_outputs
参数。
在内部,当需要更多数据以保持 pipeline 运行时,pipeline 将调用 source
(如果可调用)或运行 next(source)
(如果可迭代)。
[3]:
eii = ExternalInputIterator(batch_size)
[4]:
pipe = Pipeline(batch_size=batch_size, num_threads=2, device_id=0)
with pipe:
jpegs, labels = fn.external_source(
source=eii, num_outputs=2, dtype=types.UINT8
)
decode = fn.decoders.image(jpegs, device="mixed", output_type=types.RGB)
enhance = fn.brightness_contrast(decode, contrast=2)
pipe.set_outputs(enhance, labels)
使用 Pipeline#
[5]:
pipe.build()
pipe_out = pipe.run()
在这里,标签仍然在内存中,并且不需要调用 as_cpu
来显示标签。
[6]:
batch_cpu = pipe_out[0].as_cpu()
labels_cpu = pipe_out[1]
[7]:
import matplotlib.pyplot as plt
img = batch_cpu.at(2)
print(img.shape)
print(labels_cpu.at(2))
plt.axis("off")
plt.imshow(img)
(427, 640, 3)
[1]
[7]:
<matplotlib.image.AxesImage at 0x7f65d55db7f0>

与 GPU 输入交互#
外部源算子还可以接受来自 CuPy 或任何其他支持 cuda array interface 的数据源的 GPU 数据。 对于本示例,我们创建了 ExternalInputGpuIterator
,它返回 GPU 上的数据。 由于 decoders.image
不接受 GPU 上的数据,我们需要在 DALI 外部的 CPU 上解码它,然后将其移动到 GPU。 通常,由于另一个库的操作,图像或其他数据将已在 GPU 上。
[8]:
import cupy as cp
import imageio
class ExternalInputGpuIterator(object):
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
self.files = [line.rstrip() for line in f if line != ""]
shuffle(self.files)
def __iter__(self):
self.i = 0
self.n = len(self.files)
return self
def __next__(self):
batch = []
labels = []
for _ in range(self.batch_size):
jpeg_filename, label = self.files[self.i].split(" ")
im = imageio.imread(self.images_dir + jpeg_filename)
im = cp.asarray(im)
im = im * 0.6
batch.append(im.astype(cp.uint8))
labels.append(cp.array([label], dtype=np.uint8))
self.i = (self.i + 1) % self.n
return (batch, labels)
让我们通过使用 GPU 版本的 ExternalSource 算子并移除解码来修改之前的 pipeline
注意:我们假设原始图像已在 GPU 上。
[9]:
eii_gpu = ExternalInputGpuIterator(batch_size)
print(type(next(iter(eii_gpu))[0][0]))
<class 'cupy.core.core.ndarray'>
[10]:
pipe_gpu = Pipeline(batch_size=batch_size, num_threads=2, device_id=0)
with pipe_gpu:
images, labels = fn.external_source(
source=eii_gpu, num_outputs=2, device="gpu", dtype=types.UINT8
)
enhance = fn.brightness_contrast(images, contrast=2)
pipe_gpu.set_outputs(enhance, labels)
pipe_gpu.build()
可视化结果
[11]:
pipe_out_gpu = pipe_gpu.run()
batch_gpu = pipe_out_gpu[0].as_cpu()
labels_gpu = pipe_out_gpu[1].as_cpu()
img = batch_gpu.at(2)
print(img.shape)
print(labels_cpu.at(2))
plt.axis("off")
plt.imshow(img)
(480, 640, 3)
[1]
[11]:
<matplotlib.image.AxesImage at 0x7f65d55a0ba8>
