外部源算子 - 基本用法#

在此示例中,我们将向您展示如何使用 ExternalSource 算子,以便您可以使用外部数据源作为 pipeline 的输入。

[1]:
import types
import collections
import numpy as np
from random import shuffle
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types

batch_size = 16

定义数据源#

在此示例中,我们将使用无限迭代器作为数据源。

[2]:
class ExternalInputIterator(object):
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            self.files = [line.rstrip() for line in f if line != ""]
        shuffle(self.files)

    def __iter__(self):
        self.i = 0
        self.n = len(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []
        for _ in range(self.batch_size):
            jpeg_filename, label = self.files[self.i].split(" ")
            f = open(self.images_dir + jpeg_filename, "rb")
            batch.append(np.frombuffer(f.read(), dtype=np.uint8))
            labels.append(np.array([label], dtype=np.uint8))
            self.i = (self.i + 1) % self.n
        return (batch, labels)

警告

当 pipeline 启用条件执行时,必须采取额外的步骤来防止 source 被 AutoGraph 重写。 有两种方法可以实现这一点

  1. 在全局范围内定义函数(即在 pipeline_def 范围之外)。

  2. 如果函数是另一个“工厂”函数的结果,则工厂函数必须在 pipeline 定义函数之外定义,并使用 <nvidia.dali.pipeline.do_not_convert> 修饰。

更多详细信息可以在 nvidia.dali.pipeline.do_not_convert 文档中找到。

定义 Pipeline#

下一步是定义 Pipeline。

ExternalSource 算子接受可迭代对象或可调用对象。 如果源提供多个输出(例如图像和标签),则还必须将该数量指定为 num_outputs 参数。

在内部,当需要更多数据以保持 pipeline 运行时,pipeline 将调用 source(如果可调用)或运行 next(source)(如果可迭代)。

[3]:
eii = ExternalInputIterator(batch_size)
[4]:
pipe = Pipeline(batch_size=batch_size, num_threads=2, device_id=0)
with pipe:
    jpegs, labels = fn.external_source(
        source=eii, num_outputs=2, dtype=types.UINT8
    )
    decode = fn.decoders.image(jpegs, device="mixed", output_type=types.RGB)
    enhance = fn.brightness_contrast(decode, contrast=2)
    pipe.set_outputs(enhance, labels)

使用 Pipeline#

[5]:
pipe.build()
pipe_out = pipe.run()

在这里,标签仍然在内存中,无需调用 as_cpu 即可显示标签。

[6]:
batch_cpu = pipe_out[0].as_cpu()
labels_cpu = pipe_out[1]
[7]:
import matplotlib.pyplot as plt

img = batch_cpu.at(2)
print(img.shape)
print(labels_cpu.at(2))
plt.axis("off")
plt.imshow(img)
(427, 640, 3)
[1]
[7]:
<matplotlib.image.AxesImage at 0x7f65d55db7f0>
../../../_images/examples_general_data_loading_external_input_12_2.png

与 GPU 输入交互#

外部源算子还可以接受来自 CuPy 或任何其他支持cuda 数组接口的 GPU 数据。 对于此示例,我们创建了 ExternalInputGpuIterator,它返回 GPU 上的数据。 由于 decoders.image 不接受 GPU 上的数据,我们需要在 CPU 上 DALI 外部解码它,然后将其移动到 GPU。 通常,由于另一个库的操作,图像或其它数据将已在 GPU 上。

[8]:
import cupy as cp
import imageio


class ExternalInputGpuIterator(object):
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            self.files = [line.rstrip() for line in f if line != ""]
        shuffle(self.files)

    def __iter__(self):
        self.i = 0
        self.n = len(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []
        for _ in range(self.batch_size):
            jpeg_filename, label = self.files[self.i].split(" ")
            im = imageio.imread(self.images_dir + jpeg_filename)
            im = cp.asarray(im)
            im = im * 0.6
            batch.append(im.astype(cp.uint8))
            labels.append(cp.array([label], dtype=np.uint8))
            self.i = (self.i + 1) % self.n
        return (batch, labels)
  1. 让我们通过使用 GPU 版本的 ExternalSource 算子并移除解码来修改之前的 pipeline

    注意:我们假设原始图像已在 GPU 上。

[9]:
eii_gpu = ExternalInputGpuIterator(batch_size)

print(type(next(iter(eii_gpu))[0][0]))
<class 'cupy.core.core.ndarray'>
[10]:
pipe_gpu = Pipeline(batch_size=batch_size, num_threads=2, device_id=0)
with pipe_gpu:
    images, labels = fn.external_source(
        source=eii_gpu, num_outputs=2, device="gpu", dtype=types.UINT8
    )
    enhance = fn.brightness_contrast(images, contrast=2)
    pipe_gpu.set_outputs(enhance, labels)

pipe_gpu.build()
  1. 可视化结果

[11]:
pipe_out_gpu = pipe_gpu.run()
batch_gpu = pipe_out_gpu[0].as_cpu()
labels_gpu = pipe_out_gpu[1].as_cpu()

img = batch_gpu.at(2)
print(img.shape)
print(labels_cpu.at(2))
plt.axis("off")
plt.imshow(img)
(480, 640, 3)
[1]
[11]:
<matplotlib.image.AxesImage at 0x7f65d55a0ba8>
../../../_images/examples_general_data_loading_external_input_19_2.png