ExternalSource 算子#
在此示例中,我们将看到如何将 ExternalSource
算子与 Paddle DALI 迭代器一起使用,这使我们能够使用外部数据源作为 Pipeline 的输入。
为了实现这一点,我们必须定义一个迭代器或生成器类,其 next
函数将返回一个或多个 numpy
数组。
[1]:
import types
import collections
import numpy as np
from random import shuffle
from nvidia.dali.pipeline import Pipeline
import nvidia.dali as dali
import nvidia.dali.fn as fn
batch_size = 3  # samples per batch yielded by the external-source iterator
epochs = 3  # number of full passes over the data in the demo loop below
定义迭代器#
[2]:
class ExternalInputIterator(object):
    """Iterable source of (encoded-image, label) batches for ``fn.external_source``.

    Reads ``file_list.txt`` (one ``"<filename> <label>"`` pair per line) from
    *images_dir*, keeps only the shard belonging to *device_id* out of
    *num_gpus* shards, and yields batches of raw JPEG bytes plus uint8 labels.

    Args:
        batch_size: number of samples returned by each ``__next__`` call.
        device_id: index of the GPU this iterator feeds (selects the shard).
        num_gpus: total number of shards (world size).
        images_dir: directory containing ``file_list.txt`` and the images.
            Defaults to the tutorial's data location.
    """

    def __init__(
        self, batch_size, device_id, num_gpus, images_dir="../../data/images/"
    ):
        self.images_dir = images_dir
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            # BUGFIX: the original used ``line is not ""`` — an identity
            # comparison that is always True, so blank lines were never
            # filtered. Keep only lines with non-whitespace content.
            self.files = [line.rstrip() for line in f if line.rstrip()]
        # whole data set size (before sharding)
        self.data_set_len = len(self.files)
        # based on the device_id and the total number of GPUs (world size),
        # keep only this shard's contiguous slice of the file list
        self.files = self.files[
            self.data_set_len * device_id // num_gpus :
            self.data_set_len * (device_id + 1) // num_gpus
        ]
        self.n = len(self.files)

    def __iter__(self):
        # restart the epoch with a fresh shuffle
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []
        if self.i >= self.n:
            # rewind (and reshuffle) for the next epoch before signalling
            # the end of this one
            self.__iter__()
            raise StopIteration
        for _ in range(self.batch_size):
            jpeg_filename, label = self.files[self.i % self.n].split(" ")
            # raw encoded bytes; decoding happens later, on the GPU,
            # inside the DALI pipeline
            batch.append(
                np.fromfile(self.images_dir + jpeg_filename, dtype=np.uint8)
            )  # we can use numpy
            labels.append(np.uint8([int(label)]))
            self.i += 1
        return (batch, labels)

    def __len__(self):
        # full (unsharded) data set size, as expected by the DALI iterator
        return self.data_set_len

    next = __next__  # Python 2 compatibility
定义 Pipeline#
现在让我们定义我们的 pipeline。我们需要一个 Pipeline
类的实例和一些算子,它们将定义处理图。我们的外部源提供 2 个输出,我们可以通过在外部源算子中指定 num_outputs=2
来方便地解包这些输出。
[3]:
def ExternalSourcePipeline(batch_size, num_threads, device_id, external_data):
    """Build a DALI pipeline fed by *external_data* through ``fn.external_source``.

    The external source provides two outputs per batch (encoded JPEGs and
    labels, unpacked via ``num_outputs=2``); images are decoded on the GPU
    ("mixed" device), resized to 240x240 and cast to uint8, while labels
    pass through unchanged.
    """
    pipe = Pipeline(batch_size, num_threads, device_id)
    with pipe:
        encoded, labels = fn.external_source(
            source=external_data, num_outputs=2, dtype=dali.types.UINT8
        )
        decoded = fn.decoders.image(encoded, device="mixed")
        resized = fn.resize(decoded, resize_x=240, resize_y=240)
        pipe.set_outputs(fn.cast(resized, dtype=dali.types.UINT8), labels)
    return pipe
使用 Pipeline#
最后,让我们看看它是如何工作的。
last_batch_padded
和 last_batch_policy
在这里仅用于演示目的而设置。用户可以编写任何自定义代码并更改每个 epoch 的 epoch 大小。在这种情况下,建议将 size
设置为 -1,并让迭代器仅等待来自 iter_setup
的 StopIteration 异常。
这里的 last_batch_padded
告诉迭代器,数据集大小与批次大小对齐后产生的差额由真实数据填充;而 last_batch_policy
则决定在把数据提供给框架时是否跳过这些填充样本。
[4]:
from nvidia.dali.plugin.paddle import (
DALIClassificationIterator as PaddleIterator,
)
from nvidia.dali.plugin.paddle import LastBatchPolicy
# Single shard (device 0 of 1): one iterator covering the whole data set.
eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(
    batch_size=batch_size, num_threads=2, device_id=0, external_data=eii
)
# last_batch_padded=True + PARTIAL policy: the framework sees only the real
# samples of the final (possibly smaller) batch instead of padded repeats.
pii = PaddleIterator(
    pipe, last_batch_padded=True, last_batch_policy=LastBatchPolicy.PARTIAL
)
for e in range(epochs):
    for i, data in enumerate(pii):
        # data[0]["data"] holds the image batch; its length is the number
        # of real samples delivered in this iteration
        real_batch_size = len(np.array(data[0]["data"]))
        print(f"epoch: {e}, iter {i}, real batch size: {real_batch_size}")
    # the iterator must be reset after each epoch before it can be reused
    pii.reset()
epoch: 0, iter 0, real batch size: 3
epoch: 0, iter 1, real batch size: 3
epoch: 0, iter 2, real batch size: 3
epoch: 0, iter 3, real batch size: 3
epoch: 0, iter 4, real batch size: 3
epoch: 0, iter 5, real batch size: 3
epoch: 0, iter 6, real batch size: 3
epoch: 1, iter 0, real batch size: 3
epoch: 1, iter 1, real batch size: 3
epoch: 1, iter 2, real batch size: 3
epoch: 1, iter 3, real batch size: 3
epoch: 1, iter 4, real batch size: 3
epoch: 1, iter 5, real batch size: 3
epoch: 1, iter 6, real batch size: 3
epoch: 2, iter 0, real batch size: 3
epoch: 2, iter 1, real batch size: 3
epoch: 2, iter 2, real batch size: 3
epoch: 2, iter 3, real batch size: 3
epoch: 2, iter 4, real batch size: 3
epoch: 2, iter 5, real batch size: 3
epoch: 2, iter 6, real batch size: 3