ExternalSource operator#

In this example, we will see how to use the ExternalSource operator with the PyTorch DALI iterator, which allows us to use an external data source as an input to the Pipeline.

In order to achieve that, we have to define an Iterator or a Generator class whose `next` function returns one or more numpy arrays.
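As a minimal sketch of such a source (using synthetic data instead of files on disk, and independent of DALI itself), a generator yielding one batch per call could look like this:

```python
import numpy as np


def synthetic_batches(batch_size, n_batches):
    # A generator yielding lists of numpy arrays -- the shape of data
    # that an external source consumes in batch mode.
    rng = np.random.default_rng(42)
    for _ in range(n_batches):
        # Variable-length uint8 buffers, mimicking encoded JPEG bytes.
        yield [
            rng.integers(0, 256, size=int(rng.integers(100, 200)), dtype=np.uint8)
            for _ in range(batch_size)
        ]


for batch in synthetic_batches(batch_size=3, n_batches=2):
    print(len(batch), batch[0].dtype)  # 3 uint8
```

Each yielded item is a Python list with one numpy array per sample; samples in a batch may have different lengths, as encoded images do.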

[1]:
import types
import collections
import numpy as np
from random import shuffle
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.types as types
import nvidia.dali.fn as fn
import torch

batch_size = 3
epochs = 3

Defining the Iterator#

[2]:
class ExternalInputIterator(object):
    def __init__(self, batch_size, device_id, num_gpus):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            self.files = [line.rstrip() for line in f if line.strip() != ""]
        # whole data set size
        self.data_set_len = len(self.files)
        # based on the device_id and total number of GPUs - world size
        # get proper shard
        self.files = self.files[
            self.data_set_len
            * device_id
            // num_gpus : self.data_set_len
            * (device_id + 1)
            // num_gpus
        ]
        self.n = len(self.files)

    def __iter__(self):
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []

        if self.i >= self.n:
            self.__iter__()
            raise StopIteration

        for _ in range(self.batch_size):
            jpeg_filename, label = self.files[self.i % self.n].split(" ")
            batch.append(
                np.fromfile(self.images_dir + jpeg_filename, dtype=np.uint8)
            )  # we can use numpy
            labels.append(
                torch.tensor([int(label)], dtype=torch.uint8)
            )  # or PyTorch's native tensors
            self.i += 1
        return (batch, labels)

    def __len__(self):
        return self.data_set_len

    next = __next__
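The shard-selection slice in `__init__` can be checked in isolation. A small sketch (the helper name `shard` is ours, not part of DALI) showing the same integer arithmetic:

```python
def shard(files, device_id, num_gpus):
    # Same arithmetic as in ExternalInputIterator.__init__: each GPU
    # gets a contiguous slice of the file list. Integer division keeps
    # the shards non-overlapping and makes them cover the whole list,
    # even when the length does not divide evenly.
    n = len(files)
    return files[n * device_id // num_gpus : n * (device_id + 1) // num_gpus]


files = [f"img_{i}.jpg" for i in range(10)]
print([len(shard(files, d, 3)) for d in range(3)])  # [3, 3, 4]
```

Concatenating the shards for `device_id` 0..`num_gpus`-1 reproduces the original list exactly, which is why this scheme is safe for multi-GPU runs.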

Defining the Pipeline#

Now let us define our pipeline. We need an instance of the Pipeline class and some operators that define the processing graph. Our external source provides two outputs, which we can conveniently unpack by specifying `num_outputs=2` in the external source operator.

[3]:
def ExternalSourcePipeline(batch_size, num_threads, device_id, external_data):
    pipe = Pipeline(batch_size, num_threads, device_id)
    with pipe:
        jpegs, labels = fn.external_source(
            source=external_data, num_outputs=2, dtype=types.UINT8
        )
        images = fn.decoders.image(jpegs, device="mixed")
        images = fn.resize(images, resize_x=240, resize_y=240)
        output = fn.cast(images, dtype=types.UINT8)
        pipe.set_outputs(output, labels)
    return pipe

Using the Pipeline#

At last, let us see how it works.

`last_batch_padded` and `last_batch_policy` are set here only for demonstration purposes. The user may write any custom code and change the epoch size from epoch to epoch. In that case it is recommended to set `size` to -1 and let the iterator just wait for the StopIteration exception from `iter_setup`.

The `last_batch_padded` here tells the iterator that the difference between the data set size and the batch size alignment is padded with real data, which could be skipped when provided to the framework (`last_batch_policy`):
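To make the effect of `LastBatchPolicy.PARTIAL` concrete, here is a small sketch of the batch-size arithmetic (plain Python, not DALI itself): when the data set size does not divide evenly by the batch size, the last batch of an epoch is truncated rather than padded.

```python
import math


def partial_batch_sizes(data_set_len, batch_size):
    # With LastBatchPolicy.PARTIAL the iterator returns
    # ceil(data_set_len / batch_size) batches per epoch, and the final
    # batch holds only the remaining samples.
    n_iters = math.ceil(data_set_len / batch_size)
    sizes = [batch_size] * n_iters
    if data_set_len % batch_size:
        sizes[-1] = data_set_len % batch_size
    return sizes


print(partial_batch_sizes(21, 3))  # [3, 3, 3, 3, 3, 3, 3]
print(partial_batch_sizes(20, 3))  # [3, 3, 3, 3, 3, 3, 2]
```

With 21 files and a batch size of 3, every batch is full, which matches the output below; with 20 files, the seventh batch would hold only 2 samples.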

[4]:
from nvidia.dali.plugin.pytorch import (
    DALIClassificationIterator as PyTorchIterator,
)
from nvidia.dali.plugin.pytorch import LastBatchPolicy

eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(
    batch_size=batch_size, num_threads=2, device_id=0, external_data=eii
)
pii = PyTorchIterator(
    pipe, last_batch_padded=True, last_batch_policy=LastBatchPolicy.PARTIAL
)

for e in range(epochs):
    for i, data in enumerate(pii):
        real_batch_size = len(data[0]["data"])
        print(f"epoch: {e}, iter {i}, real batch size: {real_batch_size}")
    pii.reset()
epoch: 0, iter 0, real batch size: 3
epoch: 0, iter 1, real batch size: 3
epoch: 0, iter 2, real batch size: 3
epoch: 0, iter 3, real batch size: 3
epoch: 0, iter 4, real batch size: 3
epoch: 0, iter 5, real batch size: 3
epoch: 0, iter 6, real batch size: 3
epoch: 1, iter 0, real batch size: 3
epoch: 1, iter 1, real batch size: 3
epoch: 1, iter 2, real batch size: 3
epoch: 1, iter 3, real batch size: 3
epoch: 1, iter 4, real batch size: 3
epoch: 1, iter 5, real batch size: 3
epoch: 1, iter 6, real batch size: 3
epoch: 2, iter 0, real batch size: 3
epoch: 2, iter 1, real batch size: 3
epoch: 2, iter 2, real batch size: 3
epoch: 2, iter 3, real batch size: 3
epoch: 2, iter 4, real batch size: 3
epoch: 2, iter 5, real batch size: 3
epoch: 2, iter 6, real batch size: 3