并行外部源#
在本教程中,我们将向您展示如何在 external_source
操作符中启用并行模式,从而允许 Python 工作进程并发执行 source
。这样做可以通过允许源在后台运行,解除主 Python 线程的阻塞,从而减少每次迭代的执行时间。并非所有 external_source
的用法都可以直接并行化 - source
参数受到一系列限制,并且需要额外的配置。
接受的 source
#
根据 batch
参数,提供给并行外部源的 source
预期返回批次或单个样本。在样本模式下,DALI 负责将样本组合成批次,并有更多机会并行化输入数据的计算。因此,样本模式是运行并行外部源的首选方式。样本模式下的并行外部源对 source 参数有以下要求
source
必须是可调用对象:函数或对象。source
回调必须接受一个参数:nvidia.dali.types.SampleInfo - 指示请求样本的索引。回调返回的数据必须是 CPU 数组(或它们的元组/列表)。
以下段落将解释这些要求背后的原因,并展示使用并行外部源的示例。
操作原理#
在每次迭代之前,DALI 外部源操作符都会查询其 source
参数以获取新数据,以便在 pipeline 中进一步处理。通过调用 source
(当 source
可调用时)或调用 next(source)
(在可迭代对象的情况下)从 source
获取数据所需的时间可能很长,并且可能会影响处理迭代的时间 - 尤其是在主 Python 线程中这是一个阻塞操作。
为 external_source
节点设置 parallel=True
表示 pipeline 在 DALI 启动的 Python 工作进程中运行 source
。工作进程绑定到 pipeline,并由该 pipeline 中的所有并行外部源共享。每个工作进程都保留 source
回调/对象的副本。工作进程是独立的进程,因此请记住,它们不共享任何全局状态,只共享在启动它们之前指定的副本。
source
回调副本,从而从副本中请求特定样本。在批次模式下,DALI 无法请求返回特定样本,因此与样本模式相比,并行化的好处受到限制。如果 source
是接受 BatchInfo 的可调用对象,则可以并行预取几个批次。对于可迭代对象,唯一的优势是在单独的进程中运行可迭代对象。
由于并行样本模式可以提供最大的加速,因此我们介绍如何调整可迭代源以在并行样本模式下运行。
警告
当 pipeline 启用条件执行时,必须采取额外的步骤来防止 source
被 AutoGraph 重写。有两种方法可以实现这一点
在全局范围内定义函数(即在
pipeline_def
范围之外)。如果函数是另一个“工厂”函数的结果,则工厂函数必须在 pipeline 定义函数之外定义,并使用
<nvidia.dali.pipeline.do_not_convert>
修饰。
更多详细信息可以在 nvidia.dali.pipeline.do_not_convert
文档中找到。
示例 Pipeline 和 source
#
让我们采用 ExternalSource 操作符教程 中的简化示例。我们将把它转换为可调用对象作为下一步。
[1]:
import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
batch_size = 4
class ExternalInputIterator:
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
self.files, self.labels = zip(*file_label)
def __iter__(self):
self.i = 0
# Only full batches
self.n = (len(self.files) // self.batch_size) * self.batch_size
return self
def __next__(self):
if self.i >= self.n:
raise StopIteration()
encoded_imgs, labels = [], []
for _ in range(self.batch_size):
jpeg_filename = self.files[self.i]
label = self.labels[self.i]
with open(self.images_dir + jpeg_filename, "rb") as f:
encoded_imgs.append(np.frombuffer(f.read(), dtype=np.uint8))
labels.append(np.int32([label]))
self.i += 1
return encoded_imgs, labels
调整为可调用对象#
我们需要在这里进行两项调整
将其转换为每样本模式
实现能够返回特定样本的
__call__
,而不是在__next__
中按顺序返回数据。
我们可以保留初始化对象的 __init__
。我们不再需要 __iter__
。
现在,信息不再由迭代器跟踪要返回的下一个样本(或批次),而是来自 DALI,通过 sample_info
参数 - 描述样本索引的成员变量在 SampleInfo 类型文档中指定。
Epochs#
此外,我们预先计算 epoch 中完整批次的数量。这样我们将丢弃最后一个部分批次 - 或者,我们可以支持填充批次,但这超出了本教程的范围。并行模式下的外部源不支持部分批次。
建议所有 ``source`` 回调的副本都为相同的 ``iteration``(以及在该迭代中请求的批次的所有样本索引)引发 ``StopIteration``。
这就是我们计算完整批次数量并为任何更大或相等的迭代索引引发 StopIteration
的原因。
在引发 StopIteration
后,我们可以调用 pipeline.reset()
来重置 SampleInfo - DALI 将从样本索引和迭代索引 0 开始计数。
[2]:
class ExternalInputCallable:
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
self.files, self.labels = zip(*file_label)
self.full_iterations = len(self.files) // batch_size
def __call__(self, sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= self.full_iterations:
# Indicate end of the epoch
raise StopIteration()
jpeg_filename = self.files[sample_idx]
label = np.int32([self.labels[sample_idx]])
with open(self.images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
可调用对象仅用作示例。如果您愿意,可以使用无状态函数,例如
[3]:
def my_callback(sample_info):
return np.full((5, 5), sample_info.idx_in_epoch)
Pipeline 定义#
现在我们可以定义两个 pipeline - **目前不使用并行模式** - 以查看这两种方法都给我们相同的结果。请注意,我们在第二个示例中设置了 batch=False
。
[4]:
@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def iterable_pipeline():
jpegs, labels = fn.external_source(
source=ExternalInputIterator(batch_size),
num_outputs=2,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def callable_pipeline():
jpegs, labels = fn.external_source(
source=ExternalInputCallable(batch_size),
num_outputs=2,
batch=False,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
测试 Pipelines#
让我们介绍一些辅助代码来显示结果。
[5]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import math
def display(outputs, count, columns=2, captions=None, cpu=False):
rows = int(math.ceil(len(outputs) / columns))
fig = plt.figure()
fig.set_size_inches(16, 6 * rows)
gs = gridspec.GridSpec(rows, columns)
row = 0
col = 0
for i in range(count):
plt.subplot(gs[i])
plt.axis("off")
if captions is not None:
plt.title(captions[i])
plt.imshow(outputs.at(i) if cpu else outputs.as_cpu().at(i))
现在我们可以构建并运行 pipelines。
[6]:
iter_pipe = iterable_pipeline()
iter_pipe.build()
iter_out = iter_pipe.run()
call_pipe = callable_pipeline()
call_pipe.build()
call_out = call_pipe.run()
[7]:
display(iter_out[0], batch_size)
data:image/s3,"s3://crabby-images/07555/075553ff7eb45e871312803bee8c223b524d78d1" alt="../../../_images/examples_general_data_loading_parallel_external_source_17_0.png"
[8]:
display(call_out[0], batch_size)
data:image/s3,"s3://crabby-images/ea510/ea5107a0b104431fe68d96924fd51e1ea406b875" alt="../../../_images/examples_general_data_loading_parallel_external_source_18_0.png"
走向并行#
现在我们有了一个带有可调用 source
的 pipeline,它与并行 external_source
要求兼容,我们可以调整我们的 pipeline。我们需要修改我们的 callable_pipeline
,方法是为 external_source
操作符设置 parallel=True
。
我们可以为 pipeline 设置两个额外的参数来控制 Python 工作进程的行为
py_num_workers
- 设置工作进程的数量 - 默认值为 1,但我们将使用更多,以在计算批次时实现更好的并行性。py_start_method
- 控制工作进程的启动方式。
启动 Python 工作进程#
并行外部源使用 Python multiprocessing 库来控制工作进程。DALI 支持两种启动工作进程的方法:'fork'
和 'spawn'
。有关启动方法的详细信息,请参阅 此处。DALI 不支持 'forkserver'
方法,因为工作进程的启动是在处理开始时完成一次,使用此方法不会比其他两种方法产生任何额外的好处。
通常,source
回调应遵守 multiprocessing 编程指南,以避免不必要的行为或错误。
Fork#
第一个方法 'fork'
创建当前 Python 解释器的 fork,这意味着新进程是相同的,并且原始进程中存在的所有对象在新进程中也可用。但是,有一个限制,即获取 CUDA 上下文的进程(通常通过与 CUDA 代码的任何交互来完成,例如创建框架 CUDA 张量,在 DALI pipeline 上调用 build()
等)无法 fork。
为了缓解这个问题,Pipeline 方法 start_py_workers()
可用于在我们需要与 CUDA 交互之前启动工作进程。
典型的做法是在与 CUDA 交互之前执行以下操作
定义所有 DALI pipelines(不要使用 build())。
收集所有 DALI pipeline 对象。
在每个 pipeline 对象上运行
start_py_workers()
。
完成这些步骤后,可以构建 pipelines 并可以访问 CUDA。
在本教程中,我们已经使用 DALI 在 GPU 上运行操作,因此当前的 Jupyter Notebook Kernel 实例已经获取了 CUDA 上下文。
在单独的教程 并行外部源 - Fork 中可以看到使用 start_py_workers()
的示例,我们在其中等待获取 CUDA 上下文。
Spawn#
'spawn'
方法启动一个新的 Python 解释器进程,并通过序列化和反序列化共享必要的状态。因此,source
可调用对象需要是 可 pickle 的才能使用此方法。
当使用 'spawn'
时,请确保主模块可以安全导入,方法是使用 if __name__ == '__main__'
保护入口点,如 multiprocessing 编程指南中所述。
'spawn'
不会干扰 CUDA 上下文,并且无需单独调用 start_py_workers()
- 它在 build()
步骤中完成。由于本教程 notebook 已经获取了 CUDA 上下文,我们将使用此方法来运行我们的 pipeline。
不幸的是,Jupyter Notebook 再次存在限制 - 可调用对象必须来自新进程可见的 Python 模块。作为一种解决方法,我们将 ExternalInputCallable
的定义写入辅助文件并再次导入,以便它来自命名的模块并且易于 pickle。
[9]:
external_input_callable_def = """
import numpy as np
class ExternalInputCallable:
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", 'r') as f:
file_label = [line.rstrip().split(' ') for line in f if line != '']
self.files, self.labels = zip(*file_label)
self.full_iterations = len(self.files) // batch_size
def __call__(self, sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= self.full_iterations:
# Indicate end of the epoch
raise StopIteration()
jpeg_filename = self.files[sample_idx]
label = np.int32([self.labels[sample_idx]])
with open(self.images_dir + jpeg_filename, 'rb') as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
"""
with open("external_input_tmp_module.py", "w") as f:
f.write(external_input_callable_def)
import external_input_tmp_module
在常规 Python 代码中不需要此类解决方法。
您可以选择将回调定义为函数,而无需将其导出到另一个文件。您可以在**函数序列化**部分阅读有关该方法的更多信息。
使用 Spawned Python Workers 运行 Pipeline#
现在,我们可以通过向 external_source
添加 parallel=True
并将参数 py_num_workers
和 py_start_method
传递给 pipeline 来调整 pipeline 定义以并行运行。请记住也要使用新导入的 source
定义。
[10]:
@pipeline_def(
batch_size=batch_size,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
)
def parallel_pipeline():
jpegs, labels = fn.external_source(
source=external_input_tmp_module.ExternalInputCallable(batch_size),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
[11]:
if __name__ == "__main__":
parallel_pipe = parallel_pipeline()
parallel_pipe.build()
[12]:
if __name__ == "__main__":
parallel_out = parallel_pipe.run()
display(parallel_out[0], batch_size)
# let Python know that the pipeline will no longer be needed, so that
# the worker processes and resources can be closed and freed.
del parallel_pipe
data:image/s3,"s3://crabby-images/e5e51/e5e519ec53954ee1fc2ee39b63e306231b0208cd" alt="../../../_images/examples_general_data_loading_parallel_external_source_29_0.png"
函数序列化#
默认情况下,局部函数和 lambda 不可 pickle。DALI 将使用自定义 pickler 对此类回调进行 pickle,该 pickler 扩展了默认的 pickle 行为。
此外,为了解决 Jupyter notebook 中的导入问题,您可以将全局定义的函数标记为按值而不是按引用(Python 中的默认行为)进行序列化。
有关如何序列化 lambda 和局部函数的不同想法,请参阅**自定义序列化**部分。
首先,让我们使用嵌套函数/闭包来模拟 ExternalInputCallable
行为。
[13]:
def create_callback(batch_size):
images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
files, labels = zip(*file_label)
full_iterations = len(files) // batch_size
def callback(sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= full_iterations:
# Indicate end of the epoch
raise StopIteration
jpeg_filename = files[sample_idx]
label = np.int32([labels[sample_idx]])
with open(images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
return callback
请注意外部函数如何准备通用配置,就像 ExternalInputCallable
中的 __init__
方法一样,以及内部回调如何对应于 __call__
方法。
[14]:
@pipeline_def(
batch_size=batch_size,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
)
def parallel_pipeline_with_closure():
jpegs, labels = fn.external_source(
source=create_callback(batch_size),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
[15]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
print("Python < 3.8 is not supported")
else:
if __name__ == "__main__":
parallel_pipe_with_closure = parallel_pipeline_with_closure()
parallel_pipe_with_closure.build()
parallel_out_with_closure = parallel_pipe_with_closure.run()
display(parallel_out_with_closure[0], batch_size)
# let Python know that the pipeline will no longer be needed, so that
# the worker processes and resources can be closed and freed.
del parallel_pipe_with_closure
data:image/s3,"s3://crabby-images/c36dc/c36dc08536063b171057c6fbeb4cbb9a176e3a1e" alt="../../../_images/examples_general_data_loading_parallel_external_source_35_0.png"
pickle_by_value#
当使用 Jupyter notebook 时,您可能希望简单地全局创建您的通用配置,并创建一个使用它的回调,而不是编写嵌套函数。为了使此方法有效,您需要使用一种 pickle 方法,该方法序列化整个函数,而不是默认方法,默认方法仅存储函数的名称和定义模块。
[16]:
import nvidia.dali.pickling as dali_pickle
images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
files, labels = zip(*file_label)
full_iterations = len(files) // batch_size
@dali_pickle.pickle_by_value
def global_callback(sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= full_iterations:
# Indicate end of the epoch
raise StopIteration
jpeg_filename = files[sample_idx]
label = np.int32([labels[sample_idx]])
with open(images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
请注意,使上述示例工作所需的唯一内容是 @dali_pickle.pickle_by_value
修饰器。
[17]:
@pipeline_def(
batch_size=batch_size,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
)
def parallel_pipeline_global_cb():
jpegs, labels = fn.external_source(
source=global_callback,
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
[18]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
print("Python < 3.8 is not supported")
else:
if __name__ == "__main__":
parallel_pipe_global_cb = parallel_pipeline_global_cb()
parallel_pipe_global_cb.build()
parallel_out_global_cb = parallel_pipe_global_cb.run()
display(parallel_out_global_cb[0], batch_size)
# let Python know that the pipeline will no longer be needed, so that
# the worker processes and resources can be closed and freed.
del parallel_pipe_global_cb
data:image/s3,"s3://crabby-images/7c6f7/7c6f795dccb6ffa82a2f4dff218ccaa3be3a7274" alt="../../../_images/examples_general_data_loading_parallel_external_source_40_0.png"
自定义序列化#
在底层,DALI 对回调使用标准的 pickle 机制。如果您需要自定义 pickle 回调的方式,请考虑将它们实现为可调用对象(ExternalInputCallable
是一个类,其实例是可调用的示例)。您可以通过手动制作 __getstate__
和 __setstate__
或 __reduce__
方法来控制自定义类的序列化(您可以在 pickle 文档中找到有关此方法的更多信息)。或者,您可以通过 dispatch_tables
机制 为您的类注册自定义 reducer。
如果需要,您可以使 DALI 使用外部包(如 dill
或 cloudpickle
)序列化任何类型的回调。要实现这一点,只需将所需的模块作为 pipeline 的 py_callback_pickler
参数传递即可。
py_callback_pickler
的有效值是实现 dumps
和 loads
方法的模块/对象,或者是元组,其中第一项是模块/对象,接下来的两个可选参数是在调用 dumps 和 loads 时要传递的额外 kwargs。提供的方法和 kwargs 本身必须可以使用标准 pickle.dumps
进行 pickle。
让我们再次尝试使用闭包的示例
[19]:
def create_callback(batch_size):
images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
files, labels = zip(*file_label)
full_iterations = len(files) // batch_size
def callback(sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= full_iterations:
# Indicate end of the epoch
raise StopIteration
jpeg_filename = files[sample_idx]
label = np.int32([labels[sample_idx]])
with open(images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
return callback
[20]:
try:
import dill
except:
print("This example requires dill package")
else:
@pipeline_def(
batch_size=batch_size,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
py_callback_pickler=(dill, {"recurse": True}),
)
def parallel_pipeline_with_dill():
jpegs, labels = fn.external_source(
source=create_callback(batch_size),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
这次,我们将使用 dill 来序列化回调。我们将额外的 recurse=True
参数传递给 dill.dumps
调用,以便 dill 尝试跟踪序列化回调内部引用的全局对象,并且需要一起序列化。
[21]:
try:
import dill
except:
print("This example requires dill package")
else:
if __name__ == "__main__":
parallel_pipe_with_dill = parallel_pipeline_with_dill()
parallel_pipe_with_dill.build()
parallel_out_with_dill = parallel_pipe_with_dill.run()
display(parallel_out_with_dill[0], batch_size)
data:image/s3,"s3://crabby-images/98beb/98bebca6c1471d7109e707c4e36d2bb549fe75b6" alt="../../../_images/examples_general_data_loading_parallel_external_source_46_0.png"
序列化和繁重的设置#
通常,用作 source
的可调用对象在构造时执行一些设置(例如,组合工作进程将并行读取的文件列表)。如果 py_start_method
设置为 spawn
,则构造的对象将被序列化(默认情况下使用 Python 的 pickle
模块)。但是,对于特别大的设置(当可调用对象的成员达到数百 MB 大小时),由于序列化的内存占用,序列化可能会变得缓慢或不可能。在这种情况下,某些设置部分应推迟到在工作进程中反序列化可调用对象之后。
让我们在一个(稍微人为的)示例中看到这一点,该示例的可调用对象包含一个大的文件名列表,并分配一个大的 numpy 数组用作缓冲区。该示例假设 pickle
用作序列化方法,并依赖于标准的 pickle 机制来自定义序列化。
[22]:
class ExternalInputCallable:
def __init__(self, start=0, end=128 * 1024 * 1024):
self.start = start
self.end = end
# Normally, the setup would take place here, but we defer it
self.buffer = None
self.files = None
def __getstate__(self):
# This will be run in the main process to serialize the callable
return self.__dict__.copy()
def __setstate__(self, state):
self.__dict__.update(state)
# Deferred setup is done in each of the worker processes
self.buffer = np.arange(self.start, self.end, dtype=np.int32).reshape(
(-1, 1024, 1024)
)
self.files = [f"filename_{i}" for i in range(self.start, self.end)]
def __call__(self, sample_info):
if sample_info.idx_in_epoch >= self.end - self.start:
raise StopIteration
file_idx = int(self.files[sample_info.idx_in_epoch].split("_")[1])
sample_from_buffer = self.buffer[sample_info.idx_in_epoch]
return np.array(file_idx, dtype=np.int32), sample_from_buffer
@pipeline_def(
batch_size=8,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
)
def parallel_pipeline():
idx, sample = fn.external_source(
source=ExternalInputCallable(),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.INT32, types.INT32],
)
return idx, sample
洗牌和分片#
到目前为止,本教程中介绍的 pipelines 在每个 epoch 中都以完全相同的顺序返回数据集。在以下部分中,我们将修改 source
回调,以便在 epoch 之间对数据集进行洗牌。此外,我们将使回调意识到分片。
在 epochs 之间洗牌数据集#
要打乱你的数据集,你可以置换索引。当使用 SampleInfo
实例调用 source
时,source
回调函数可以使用一些 permutation
函数来返回 permutation(sample_info.idx_in_epoch)
-th 样本,而不是 sample_info.idx_in_epoch
-th 样本。
请记住,你正在处理 source
回调函数的多个副本。每个副本都应该以相同的方式打乱数据,否则你可能会混淆你的数据集,并在同一个 epoch 中多次返回相同的样本。
要在 epoch 之间打乱数据集,你可以使用 sample_info.epoch_idx
并使用 epoch_idx
参数化 permutation
。
以下是如何调整 ExternalInputCallable
示例以处理 epoch 之间的数据打乱。
[23]:
class ExternalInputCallable:
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
self.files, self.labels = zip(*file_label)
# If the dataset size is not divisible by the batch size, the last
# incomplete batch will be omitted. Alternatively, you may pad the dataset
# by repeating the last sample in `self.files` and `self.labels` to
# make the dataset size divisible by the `batch_size`.
self.full_iterations = len(self.files) // batch_size
self.perm = None # permutation of indices
self.last_seen_epoch = (
# so that we don't have to recompute the `self.perm` for every sample
None
)
def __call__(self, sample_info):
if sample_info.iteration >= self.full_iterations:
# Indicate end of the epoch
raise StopIteration
if self.last_seen_epoch != sample_info.epoch_idx:
self.last_seen_epoch = sample_info.epoch_idx
self.perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
self.perm = self.perm.permutation(len(self.files))
sample_idx = self.perm[sample_info.idx_in_epoch]
jpeg_filename = self.files[sample_idx]
label = np.int32([self.labels[sample_idx]])
with open(self.images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
分片#
现在,让我们向 callable 对象添加 shard_id
和 num_shards
参数,以便它可以在任何给定的 epoch 中处理数据集的非重叠子集。请注意,所有分片在同一个 epoch 中使用相同的置换。
[24]:
class ExternalInputCallable:
def __init__(self, batch_size, shard_id=0, num_shards=1):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
self.files, self.labels = zip(*file_label)
self.shard_id = shard_id
self.num_shards = num_shards
# If the dataset size is not divisibvle by number of shards,
# the trailing samples will be omitted.
self.shard_size = len(self.files) // num_shards
self.shard_offset = self.shard_size * shard_id
# If the shard size is not divisible by the batch size, the last
# incomplete batch will be omitted.
self.full_iterations = self.shard_size // batch_size
self.perm = None # permutation of indices
self.last_seen_epoch = (
# so that we don't have to recompute the `self.perm` for every sample
None
)
def __call__(self, sample_info):
if sample_info.iteration >= self.full_iterations:
# Indicate end of the epoch
raise StopIteration
if self.last_seen_epoch != sample_info.epoch_idx:
self.last_seen_epoch = sample_info.epoch_idx
self.perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
self.perm = self.perm.permutation(len(self.files))
sample_idx = self.perm[sample_info.idx_in_epoch + self.shard_offset]
jpeg_filename = self.files[sample_idx]
label = np.int32([self.labels[sample_idx]])
with open(self.images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
我们也可以用嵌套函数的形式来表达 callable 对象。
[25]:
@dali_pickle.pickle_by_value
def create_callback(batch_size, shard_id=0, num_shards=1):
images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
files, labels = zip(*file_label)
dataset_size = len(files)
shard_size = dataset_size // num_shards
shard_offset = shard_size * shard_id
full_iterations = shard_size // batch_size
perm = None # permutation of indices
# so that we don't have to recompute the `perm` for every sample
last_seen_epoch = None
def callback(sample_info):
nonlocal perm, last_seen_epoch
if sample_info.iteration >= full_iterations:
# Indicate end of the epoch
raise StopIteration
if last_seen_epoch != sample_info.epoch_idx:
last_seen_epoch = sample_info.epoch_idx
perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
perm = perm.permutation(dataset_size)
sample_idx = perm[sample_info.idx_in_epoch + shard_offset]
jpeg_filename = files[sample_idx]
label = np.int32([labels[sample_idx]])
with open(images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
return callback
让我们使用上面的回调函数将包含 21 张图像的初始数据集拆分到两个 pipeline 之间,每个 pipeline 在任何给定的 epoch 中处理 10 张图像的非重叠子集。
在这里,我们决定如果数据集大小不能被分片数量或 batch size 整除,则丢弃尾部的样本。请注意,由于 epoch 之间的数据打乱,不同的样本将在不同的 epoch 中被省略。尽管如此,你也可以决定通过重复某些样本来填充数据集和分片,以便每个样本都在每个 epoch 中使用。
[26]:
batch_size = 5
@pipeline_def(
batch_size=batch_size,
num_threads=2,
device_id=0,
py_num_workers=4,
py_start_method="spawn",
)
def shuffling_sharding_pipepline(shard_id, num_shards):
jpegs, labels = fn.external_source(
source=create_callback(batch_size, shard_id, num_shards),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
return decode, labels
[27]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
print("Python < 3.8 is not supported")
else:
if __name__ == "__main__":
pipe_1 = shuffling_sharding_pipepline(shard_id=0, num_shards=2)
pipe_1.build()
for _ in range(2):
images, labels = pipe_1.run()
display(images, batch_size, columns=batch_size)
try:
pipe_1.run()
except StopIteration:
pass
else:
raise Exception("Expected only 10 images in the epoch")
# let Python know that the pipeline will no longer be needed, so that
# the worker processes and resources can be closed and freed.
del pipe_1
data:image/s3,"s3://crabby-images/2784d/2784da539ddee6faf50ffdacf73df9400d19d985" alt="../../../_images/examples_general_data_loading_parallel_external_source_58_0.png"
data:image/s3,"s3://crabby-images/2f9ea/2f9eabd2960b828660caf9672115a0ad87b9c080" alt="../../../_images/examples_general_data_loading_parallel_external_source_58_1.png"
[28]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
print("Python < 3.8 is not supported")
else:
if __name__ == "__main__":
pipe_2 = shuffling_sharding_pipepline(shard_id=1, num_shards=2)
pipe_2.build()
for _ in range(2):
images, labels = pipe_2.run()
display(images, batch_size, columns=batch_size)
try:
pipe_2.run()
except StopIteration:
pass
else:
raise Exception("Expected only 10 images in the epoch")
# let Python know that the pipeline will no longer be needed, so that
# the worker processes and resources can be closed and freed.
del pipe_2
data:image/s3,"s3://crabby-images/04e09/04e09e95014292cdcee5752c4d431487532b8f06" alt="../../../_images/examples_general_data_loading_parallel_external_source_59_0.png"
data:image/s3,"s3://crabby-images/f13e2/f13e25b24955bc46e1421bc31b8a1ac099e24c91" alt="../../../_images/examples_general_data_loading_parallel_external_source_59_1.png"
容器中的并行外部源#
并行外部源使用操作系统共享内存将样本从 worker 传递到主进程。由于 pipeline 内的缓冲,DALI 可能需要分配足够的内存来一次存储几个迭代。因此,并行外部源可能需要大量的共享内存。但是,某些环境限制了 /dev/shm
的大小。
例如,当使用 Docker 运行 DALI 时,你可能需要在调用 docker run 时传递 --shm-size
参数,或者,如果你不能传递额外的参数,则在运行的容器中手动修改 /dev/shm
的大小。