并行外部数据源#
在本教程中,我们将向您展示如何在 external_source
操作符中启用并行模式,从而允许 Python 工作进程并发执行 source
。这样做可以通过允许源在后台运行,解除主 Python 线程的阻塞,来减少每次迭代的执行时间。并非所有 external_source
的用法都可以直接并行化 - source
参数受到一系列限制,并且需要额外的配置。
接受的 source
#
根据 batch
参数,并行外部数据源提供的 source
应该返回批次或单个样本。在样本模式下,DALI 负责将样本组合成批次,并有更多机会并行化输入数据的计算。因此,样本模式是运行并行外部数据源的首选方式。样本模式下的并行外部数据源对 source 参数有以下要求
source
必须是可调用对象:函数或对象。source
回调必须接受一个参数:nvidia.dali.types.SampleInfo - 指示请求的样本的索引。回调返回的数据必须是 CPU 数组(或它们的元组/列表)。
以下段落将解释这些要求背后的原因,并展示使用并行外部数据源的示例。
操作原理#
在每次迭代之前,DALI 外部数据源操作符都会查询其 source
参数以获取新数据,并将其传递到 Pipeline 中进行进一步处理。通过调用 source
(当 source
是可调用对象时)或调用 next(source)
(在可迭代对象的情况下)从 source
获取数据所需的时间可能很长,并且会影响处理迭代的时间 - 尤其是在主 Python 线程中这是一个阻塞操作。
为 external_source
节点设置 parallel=True
指示 Pipeline 在 DALI 启动的 Python 工作进程中运行 source
。工作进程绑定到 Pipeline,并由该 Pipeline 中的所有并行外部数据源共享。每个工作进程都保留 source
回调/对象的副本。工作进程是独立的进程,因此请记住,它们不共享任何全局状态,只共享启动它们之前指定的副本。
source
回调副本,来请求特定的样本。在批次模式下,DALI 无法请求返回特定样本,因此与样本模式相比,并行化的好处有限。如果 source
是接受 BatchInfo 的可调用对象,则可以并行预取几个批次。对于可迭代对象,唯一的优势是在单独的进程中运行可迭代对象。
由于并行样本模式可以提供最大的加速,因此我们介绍如何调整可迭代源以在并行样本模式下运行。
警告
当 Pipeline 启用条件执行时,必须采取额外的步骤来防止 source
被 AutoGraph 重写。有两种方法可以实现这一点
在全局范围内定义函数(即在
pipeline_def
范围之外)。如果函数是另一个“工厂”函数的结果,则工厂函数必须在 Pipeline 定义函数之外定义,并使用
<nvidia.dali.pipeline.do_not_convert>
修饰。
更多详细信息可以在 nvidia.dali.pipeline.do_not_convert
文档中找到。
Pipeline 示例和 source
#
让我们采用 外部数据源操作符教程 中的简化示例。我们将把它转换为可调用对象作为下一步。
[1]:
import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
batch_size = 4
class ExternalInputIterator:
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
self.files, self.labels = zip(*file_label)
def __iter__(self):
self.i = 0
# Only full batches
self.n = (len(self.files) // self.batch_size) * self.batch_size
return self
def __next__(self):
if self.i >= self.n:
raise StopIteration()
encoded_imgs, labels = [], []
for _ in range(self.batch_size):
jpeg_filename = self.files[self.i]
label = self.labels[self.i]
with open(self.images_dir + jpeg_filename, "rb") as f:
encoded_imgs.append(np.frombuffer(f.read(), dtype=np.uint8))
labels.append(np.int32([label]))
self.i += 1
return encoded_imgs, labels
调整为可调用对象#
我们需要在这里进行两项调整
将其转换为每样本模式
不要在
__next__
中按顺序返回数据,而是实现__call__
,使其能够返回特定样本。
我们可以保留初始化对象的 __init__
。我们不再需要 __iter__
。
现在,信息不再由迭代器跟踪要返回的下一个样本(或批次),而是通过 sample_info
参数从 DALI 传入 - 描述样本索引的成员变量在 SampleInfo 类型文档中指定。
Epoch(轮次)#
此外,我们预先计算了每个 epoch 中完整批次的数量。这样我们将丢弃最后一个部分批次 - 或者,我们可以支持填充批次,但这超出了本教程的范围。并行模式下的外部数据源不支持部分批次。
建议所有 source
回调的副本都为相同的 iteration
(以及在该迭代中请求的批次的所有样本索引)引发 StopIteration
。
这就是为什么我们计算完整批次的数量,并为任何大于或等于迭代索引的情况引发 StopIteration
。
在引发 StopIteration
之后,我们可以调用 pipeline.reset()
来重置 SampleInfo - DALI 将从样本索引和迭代索引 0 开始计数。
[2]:
class ExternalInputCallable:
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
self.files, self.labels = zip(*file_label)
self.full_iterations = len(self.files) // batch_size
def __call__(self, sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= self.full_iterations:
# Indicate end of the epoch
raise StopIteration()
jpeg_filename = self.files[sample_idx]
label = np.int32([self.labels[sample_idx]])
with open(self.images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
可调用对象仅用作示例。如果您愿意,可以使用无状态函数,例如
[3]:
def my_callback(sample_info):
return np.full((5, 5), sample_info.idx_in_epoch)
Pipeline 定义#
现在我们可以定义两个 Pipeline - 暂时不使用并行模式 - 以查看两种方法是否都给出相同的结果。请注意,我们在第二个示例中设置了 batch=False
。
[4]:
@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def iterable_pipeline():
jpegs, labels = fn.external_source(
source=ExternalInputIterator(batch_size),
num_outputs=2,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def callable_pipeline():
jpegs, labels = fn.external_source(
source=ExternalInputCallable(batch_size),
num_outputs=2,
batch=False,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
测试 Pipeline#
让我们引入一些辅助代码来显示结果。
[5]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import math
def display(outputs, count, columns=2, captions=None, cpu=False):
rows = int(math.ceil(len(outputs) / columns))
fig = plt.figure()
fig.set_size_inches(16, 6 * rows)
gs = gridspec.GridSpec(rows, columns)
row = 0
col = 0
for i in range(count):
plt.subplot(gs[i])
plt.axis("off")
if captions is not None:
plt.title(captions[i])
plt.imshow(outputs.at(i) if cpu else outputs.as_cpu().at(i))
现在我们可以构建并运行 Pipeline。
[6]:
iter_pipe = iterable_pipeline()
iter_pipe.build()
iter_out = iter_pipe.run()
call_pipe = callable_pipeline()
call_pipe.build()
call_out = call_pipe.run()
[7]:
display(iter_out[0], batch_size)

[8]:
display(call_out[0], batch_size)

开始并行#
现在我们有了一个带有可调用 source
的 Pipeline,它与并行 external_source
要求兼容,我们可以调整我们的 Pipeline。我们需要修改我们的 callable_pipeline
,通过为 external_source
操作符设置 parallel=True
。
我们可以为 Pipeline 设置两个额外的参数来控制 Python 工作进程的行为
py_num_workers
- 设置工作进程的数量 - 默认值为 1,但我们将使用更多,以便在计算批次时实现更好的并行性。py_start_method
- 控制工作进程的启动方式。
启动 Python 工作进程#
并行外部数据源使用 Python multiprocessing 库来控制工作进程。DALI 支持两种启动工作进程的方法:'fork'
和 'spawn'
。有关启动方法的详细信息,请参阅 此处。DALI 不支持 'forkserver'
方法,因为工作进程的启动在处理开始时完成一次,并且使用此方法不会比其他两种方法产生任何额外的好处。
一般来说,source
回调应遵守 multiprocessing 编程指南,以避免不必要的行为或错误。
Fork#
第一个方法 'fork'
创建当前 Python 解释器的分支,这意味着新进程是相同的,并且原始进程中存在的所有对象在新进程中也可用。但是,有一个限制,即获取 CUDA 上下文的进程(通常通过与 CUDA 代码的任何交互来完成,例如创建框架 CUDA 张量,在 DALI Pipeline 上调用 build()
等)无法被 fork。
为了缓解这个问题,Pipeline 方法提供了一个名为 start_py_workers()
的方法,可以在我们需要与 CUDA 交互之前使用它来启动工作进程。
典型的做法是在与 CUDA 交互之前执行以下操作
定义所有 DALI Pipeline(不要使用 build())。
收集所有 DALI Pipeline 对象。
在每个 Pipeline 对象上运行
start_py_workers()
。
完成这些步骤后,可以构建 Pipeline 并访问 CUDA。
在本教程中,我们已经使用 DALI 在 GPU 上运行操作,因此当前的 Jupyter Notebook Kernel 实例已经获取了 CUDA 上下文。
在单独的教程 并行外部数据源 - Fork 中可以看到使用 start_py_workers()
的示例,我们在该教程中等待获取 CUDA 上下文。
Spawn#
'spawn'
方法启动一个新的 Python 解释器进程,并通过序列化和反序列化来共享必要的状态。因此,source
可调用对象需要是 可 pickle 的 才能使用此方法。
当使用 'spawn'
时,请确保主模块可以安全导入,方法是使用 if __name__ == '__main__'
保护入口点,如 multiprocessing 编程指南中所述。
'spawn'
不会干扰 CUDA 上下文,并且无需单独调用 start_py_workers()
- 它在 build()
步骤内部完成。由于本教程笔记本已经获取了 CUDA 上下文,我们将使用此方法来运行我们的 Pipeline。
不幸的是,Jupyter Notebook 再次存在限制 - 可调用对象必须来自新进程可见的 Python 模块。作为一种变通方法,我们将 ExternalInputCallable
的定义写入辅助文件并再次导入它,以便它来自命名的模块并且易于 pickle。
[9]:
external_input_callable_def = """
import numpy as np
class ExternalInputCallable:
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", 'r') as f:
file_label = [line.rstrip().split(' ') for line in f if line != '']
self.files, self.labels = zip(*file_label)
self.full_iterations = len(self.files) // batch_size
def __call__(self, sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= self.full_iterations:
# Indicate end of the epoch
raise StopIteration()
jpeg_filename = self.files[sample_idx]
label = np.int32([self.labels[sample_idx]])
with open(self.images_dir + jpeg_filename, 'rb') as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
"""
with open("external_input_tmp_module.py", "w") as f:
f.write(external_input_callable_def)
import external_input_tmp_module
在常规 Python 代码中不需要这种变通方法。
您也可以将回调定义为函数,而无需将其导出到另一个文件。您可以在函数序列化部分阅读有关该方法的更多信息。
使用 Spawned Python Workers 运行 Pipeline#
现在,我们可以通过将 parallel=True
添加到 external_source
并将参数 py_num_workers
和 py_start_method
传递给 Pipeline 来调整 Pipeline 定义以并行运行。请记住也使用新导入的 source
定义。
[10]:
@pipeline_def(
batch_size=batch_size,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
)
def parallel_pipeline():
jpegs, labels = fn.external_source(
source=external_input_tmp_module.ExternalInputCallable(batch_size),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
[11]:
if __name__ == "__main__":
parallel_pipe = parallel_pipeline()
parallel_pipe.build()
[12]:
if __name__ == "__main__":
parallel_out = parallel_pipe.run()
display(parallel_out[0], batch_size)
# let Python know that the pipeline will no longer be needed, so that
# the worker processes and resources can be closed and freed.
del parallel_pipe

函数序列化#
默认情况下,局部函数和 lambda 不可 pickle。DALI 将使用自定义 pickler 对此类回调进行 pickle,该 pickler 扩展了默认的 pickle 行为。
此外,为了解决 Jupyter Notebook 中的导入问题,您可以将全局定义的函数标记为按值而不是按引用(Python 中的默认行为)序列化。
有关如何序列化 lambda 和局部函数的不同想法,请参阅 自定义序列化 部分。
首先,让我们使用嵌套函数/闭包来模拟 ExternalInputCallable
的行为。
[13]:
def create_callback(batch_size):
images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
files, labels = zip(*file_label)
full_iterations = len(files) // batch_size
def callback(sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= full_iterations:
# Indicate end of the epoch
raise StopIteration
jpeg_filename = files[sample_idx]
label = np.int32([labels[sample_idx]])
with open(images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
return callback
请注意,外部函数如何准备通用配置,类似于 ExternalInputCallable
中的 __init__
方法,以及内部回调如何对应于 __call__
方法。
[14]:
@pipeline_def(
batch_size=batch_size,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
)
def parallel_pipeline_with_closure():
jpegs, labels = fn.external_source(
source=create_callback(batch_size),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
[15]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
print("Python < 3.8 is not supported")
else:
if __name__ == "__main__":
parallel_pipe_with_closure = parallel_pipeline_with_closure()
parallel_pipe_with_closure.build()
parallel_out_with_closure = parallel_pipe_with_closure.run()
display(parallel_out_with_closure[0], batch_size)
# let Python know that the pipeline will no longer be needed, so that
# the worker processes and resources can be closed and freed.
del parallel_pipe_with_closure

pickle_by_value#
当使用 Jupyter Notebook 时,您可能希望简单地全局创建通用配置,并创建一个使用它的回调,而不是编写嵌套函数。为了使这种方法有效,您需要使用一种 pickle 方法,该方法序列化整个函数,而不是默认方法,后者仅存储函数的名称和定义模块。
[16]:
import nvidia.dali.pickling as dali_pickle
images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
files, labels = zip(*file_label)
full_iterations = len(files) // batch_size
@dali_pickle.pickle_by_value
def global_callback(sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= full_iterations:
# Indicate end of the epoch
raise StopIteration
jpeg_filename = files[sample_idx]
label = np.int32([labels[sample_idx]])
with open(images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
请注意,使上述示例工作所需的唯一操作是 @dali_pickle.pickle_by_value
装饰器。
[17]:
@pipeline_def(
batch_size=batch_size,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
)
def parallel_pipeline_global_cb():
jpegs, labels = fn.external_source(
source=global_callback,
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
[18]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
print("Python < 3.8 is not supported")
else:
if __name__ == "__main__":
parallel_pipe_global_cb = parallel_pipeline_global_cb()
parallel_pipe_global_cb.build()
parallel_out_global_cb = parallel_pipe_global_cb.run()
display(parallel_out_global_cb[0], batch_size)
# let Python know that the pipeline will no longer be needed, so that
# the worker processes and resources can be closed and freed.
del parallel_pipe_global_cb

自定义序列化#
在底层,DALI 对回调使用标准的 pickle 机制。如果您需要自定义 pickle 回调的方式,请考虑将它们实现为可调用对象(ExternalInputCallable
是一个类,其实例是可调用的示例)。您可以通过手动制作 __getstate__
和 __setstate__
或 __reduce__
方法来控制自定义类的序列化(您可以在 pickle 文档中找到有关此方法的更多信息)。或者,您可以通过 dispatch_tables
机制 为您的类注册自定义 reducer。
如果需要,您可以使 DALI 使用外部包(例如 dill
或 cloudpickle
)序列化任何类型的回调。要实现这一点,只需将所需的模块作为 Pipeline 的 py_callback_pickler
参数传递即可。
py_callback_pickler
的有效值是实现 dumps
和 loads
方法的模块/对象,或者是一个元组,其中第一项是模块/对象,接下来的两个可选参数是在调用 dumps 和 loads 时分别传递的额外 kwargs。提供的方法和 kwargs 本身必须可以使用标准 pickle.dumps
进行 pickle。
让我们再次尝试使用闭包的示例
[19]:
def create_callback(batch_size):
images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
files, labels = zip(*file_label)
full_iterations = len(files) // batch_size
def callback(sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= full_iterations:
# Indicate end of the epoch
raise StopIteration
jpeg_filename = files[sample_idx]
label = np.int32([labels[sample_idx]])
with open(images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
return callback
[20]:
try:
import dill
except:
print("This example requires dill package")
else:
@pipeline_def(
batch_size=batch_size,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
py_callback_pickler=(dill, {"recurse": True}),
)
def parallel_pipeline_with_dill():
jpegs, labels = fn.external_source(
source=create_callback(batch_size),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
这一次,我们将使用 dill 来序列化回调。我们将额外的 recurse=True
参数传递给 dill.dumps
调用,以便 dill 尝试跟踪序列化回调内部引用的全局对象,并且需要一起序列化。
[21]:
try:
import dill
except:
print("This example requires dill package")
else:
if __name__ == "__main__":
parallel_pipe_with_dill = parallel_pipeline_with_dill()
parallel_pipe_with_dill.build()
parallel_out_with_dill = parallel_pipe_with_dill.run()
display(parallel_out_with_dill[0], batch_size)

序列化和繁重的设置#
通常,用作 source
的可调用对象在构造时执行一些设置(例如,组成工作进程将并行读取的文件列表)。如果 py_start_method
设置为 spawn
,则构造的对象将被序列化(默认情况下使用 Python 的 pickle
模块)。但是,对于特别大的设置(当可调用对象的成员达到数百 MB 大小时),由于序列化的内存占用,序列化可能会变得缓慢或不可能。在这种情况下,某些设置部分应推迟到可调用对象在工作进程中反序列化之后。
让我们在一个(稍微人为的)示例中看到这一点,该示例的可调用对象包含一个大的文件名列表,并分配一个大的 numpy 数组用作缓冲区。该示例假设 pickle
用作序列化方法,并依赖于标准的 pickle 机制来自定义序列化。
[22]:
class ExternalInputCallable:
def __init__(self, start=0, end=128 * 1024 * 1024):
self.start = start
self.end = end
# Normally, the setup would take place here, but we defer it
self.buffer = None
self.files = None
def __getstate__(self):
# This will be run in the main process to serialize the callable
return self.__dict__.copy()
def __setstate__(self, state):
self.__dict__.update(state)
# Deferred setup is done in each of the worker processes
self.buffer = np.arange(self.start, self.end, dtype=np.int32).reshape(
(-1, 1024, 1024)
)
self.files = [f"filename_{i}" for i in range(self.start, self.end)]
def __call__(self, sample_info):
if sample_info.idx_in_epoch >= self.end - self.start:
raise StopIteration
file_idx = int(self.files[sample_info.idx_in_epoch].split("_")[1])
sample_from_buffer = self.buffer[sample_info.idx_in_epoch]
return np.array(file_idx, dtype=np.int32), sample_from_buffer
@pipeline_def(
batch_size=8,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
)
def parallel_pipeline():
idx, sample = fn.external_source(
source=ExternalInputCallable(),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.INT32, types.INT32],
)
return idx, sample
洗牌和分片#
到目前为止,本教程中介绍的 Pipeline 在每个 epoch 中都以完全相同的顺序返回数据集。在以下部分中,我们将修改 source
回调,以便在 epoch 之间对数据集进行洗牌。此外,我们将使回调知道分片。
在 epoch 之间洗牌数据集#
要洗牌您的数据集,您可以置换索引。当使用 SampleInfo 实例调用 source
时,source
回调可以使用某些 permutation
函数来返回 permutation(sample_info.idx_in_epoch)
-th 样本,而不是 sample_info.idx_in_epoch
-th 样本。
请记住,您正在处理 source
回调的多个副本。每个副本都应以相同的方式洗牌数据,否则您可能会混合数据集并在同一 epoch 中多次返回相同的样本。
要在 epoch 之间洗牌数据集,您可以使用 sample_info.epoch_idx
并使用 epoch_idx
参数化 permutation
。
以下是如何调整 ExternalInputCallable
示例以处理 epoch 之间的数据洗牌。
[23]:
class ExternalInputCallable:
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
self.files, self.labels = zip(*file_label)
# If the dataset size is not divisible by the batch size, the last
# incomplete batch will be omitted. Alternatively, you may pad the dataset
# by repeating the last sample in `self.files` and `self.labels` to
# make the dataset size divisible by the `batch_size`.
self.full_iterations = len(self.files) // batch_size
self.perm = None # permutation of indices
self.last_seen_epoch = (
# so that we don't have to recompute the `self.perm` for every sample
None
)
def __call__(self, sample_info):
if sample_info.iteration >= self.full_iterations:
# Indicate end of the epoch
raise StopIteration
if self.last_seen_epoch != sample_info.epoch_idx:
self.last_seen_epoch = sample_info.epoch_idx
self.perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
self.perm = self.perm.permutation(len(self.files))
sample_idx = self.perm[sample_info.idx_in_epoch]
jpeg_filename = self.files[sample_idx]
label = np.int32([self.labels[sample_idx]])
with open(self.images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
分片#
现在,让我们将 shard_id
和 num_shards
参数添加到可调用对象,以便它可以在任何给定 epoch 中处理数据集的非重叠子集。请注意,所有分片在同一 epoch 中使用相同的置换。
[24]:
class ExternalInputCallable:
def __init__(self, batch_size, shard_id=0, num_shards=1):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
self.files, self.labels = zip(*file_label)
self.shard_id = shard_id
self.num_shards = num_shards
# If the dataset size is not divisibvle by number of shards,
# the trailing samples will be omitted.
self.shard_size = len(self.files) // num_shards
self.shard_offset = self.shard_size * shard_id
# If the shard size is not divisible by the batch size, the last
# incomplete batch will be omitted.
self.full_iterations = self.shard_size // batch_size
self.perm = None # permutation of indices
self.last_seen_epoch = (
# so that we don't have to recompute the `self.perm` for every sample
None
)
def __call__(self, sample_info):
if sample_info.iteration >= self.full_iterations:
# Indicate end of the epoch
raise StopIteration
if self.last_seen_epoch != sample_info.epoch_idx:
self.last_seen_epoch = sample_info.epoch_idx
self.perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
self.perm = self.perm.permutation(len(self.files))
sample_idx = self.perm[sample_info.idx_in_epoch + self.shard_offset]
jpeg_filename = self.files[sample_idx]
label = np.int32([self.labels[sample_idx]])
with open(self.images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
我们还可以以嵌套函数的形式表达可调用对象。
[25]:
@dali_pickle.pickle_by_value
def create_callback(batch_size, shard_id=0, num_shards=1):
images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
files, labels = zip(*file_label)
dataset_size = len(files)
shard_size = dataset_size // num_shards
shard_offset = shard_size * shard_id
full_iterations = shard_size // batch_size
perm = None # permutation of indices
# so that we don't have to recompute the `perm` for every sample
last_seen_epoch = None
def callback(sample_info):
nonlocal perm, last_seen_epoch
if sample_info.iteration >= full_iterations:
# Indicate end of the epoch
raise StopIteration
if last_seen_epoch != sample_info.epoch_idx:
last_seen_epoch = sample_info.epoch_idx
perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
perm = perm.permutation(dataset_size)
sample_idx = perm[sample_info.idx_in_epoch + shard_offset]
jpeg_filename = files[sample_idx]
label = np.int32([labels[sample_idx]])
with open(images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
return callback
让我们使用上述回调在两个 Pipeline 之间拆分 21 个图像的初始数据集,每个 Pipeline 在任何给定 epoch 中处理 10 个图像的非重叠子集。
在这里,我们决定如果数据集大小不能被分片数或批次大小整除,则丢弃尾随样本。请注意,由于 epoch 之间的数据洗牌,不同的样本将在不同的 epoch 中被省略。尽管如此,您可能会决定通过重复某些样本来填充数据集和分片,以便每个样本都在每个 epoch 中使用。
[26]:
batch_size = 5
@pipeline_def(
batch_size=batch_size,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
)
def shuffling_sharding_pipepline(shard_id, num_shards):
jpegs, labels = fn.external_source(
source=create_callback(batch_size, shard_id, num_shards),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
[27]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
print("Python < 3.8 is not supported")
else:
if __name__ == "__main__":
pipe_1 = shuffling_sharding_pipepline(shard_id=0, num_shards=2)
pipe_1.build()
for _ in range(2):
images, labels = pipe_1.run()
display(images, batch_size, columns=batch_size)
try:
pipe_1.run()
except StopIteration:
pass
else:
raise Exception("Expected only 10 images in the epoch")
# let Python know that the pipeline will no longer be needed, so that
# the worker processes and resources can be closed and freed.
del pipe_1


[28]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
print("Python < 3.8 is not supported")
else:
if __name__ == "__main__":
pipe_2 = shuffling_sharding_pipepline(shard_id=1, num_shards=2)
pipe_2.build()
for _ in range(2):
images, labels = pipe_2.run()
display(images, batch_size, columns=batch_size)
try:
pipe_2.run()
except StopIteration:
pass
else:
raise Exception("Expected only 10 images in the epoch")
# let Python know that the pipeline will no longer be needed, so that
# the worker processes and resources can be closed and freed.
del pipe_2


容器中的并行外部数据源#
并行外部数据源使用 OS 共享内存将样本从工作进程传递到主进程。由于 Pipeline 内的缓冲,DALI 可能需要分配足够的内存来一次存储几次迭代。因此,并行外部数据源可能需要大量的共享内存。但是,某些环境限制了 /dev/shm
的大小。
例如,当使用 Docker 运行 DALI 时,您可能需要在调用 docker run 时传递 --shm-size
参数,或者,如果您无法传递额外的参数,则在运行的容器中手动修改 /dev/shm
的大小。