并行外部源#
在本教程中,我们将向您展示如何在 external_source 操作符中启用并行模式,从而允许 Python 工作进程并发执行 source。这样做可以通过允许源在后台运行,解除主 Python 线程的阻塞,从而减少每次迭代的执行时间。并非所有 external_source 的用法都可以直接并行化 - source 参数受到一系列限制,并且需要额外的配置。
接受的 source#
根据 batch 参数,提供给并行外部源的 source 预期返回批次或单个样本。在样本模式下,DALI 负责将样本组合成批次,并有更多机会并行化输入数据的计算。因此,样本模式是运行并行外部源的首选方式。样本模式下的并行外部源对 source 参数有以下要求
- source必须是可调用对象:函数或对象。
- source回调必须接受一个参数:nvidia.dali.types.SampleInfo - 指示请求样本的索引。
- 回调返回的数据必须是 CPU 数组(或它们的元组/列表)。 
以下段落将解释这些要求背后的原因,并展示使用并行外部源的示例。
操作原理#
在每次迭代之前,DALI 外部源操作符都会查询其 source 参数以获取新数据,以便在 pipeline 中进一步处理。通过调用 source(当 source 可调用时)或调用 next(source)(在可迭代对象的情况下)从 source 获取数据所需的时间可能很长,并且可能会影响处理迭代的时间 - 尤其是在主 Python 线程中这是一个阻塞操作。
为 external_source 节点设置 parallel=True 表示 pipeline 在 DALI 启动的 Python 工作进程中运行 source。工作进程绑定到 pipeline,并由该 pipeline 中的所有并行外部源共享。每个工作进程都保留 source 回调/对象的副本。工作进程是独立的进程,因此请记住,它们不共享任何全局状态,只共享在启动它们之前指定的副本。
source 回调副本,从而从副本中请求特定样本。在批次模式下,DALI 无法请求返回特定样本,因此与样本模式相比,并行化的好处受到限制。如果 source 是接受 BatchInfo 的可调用对象,则可以并行预取几个批次。对于可迭代对象,唯一的优势是在单独的进程中运行可迭代对象。
由于并行样本模式可以提供最大的加速,因此我们介绍如何调整可迭代源以在并行样本模式下运行。
警告
当 pipeline 启用条件执行时,必须采取额外的步骤来防止 source 被 AutoGraph 重写。有两种方法可以实现这一点
- 在全局范围内定义函数(即在 - pipeline_def范围之外)。
- 如果函数是另一个“工厂”函数的结果,则工厂函数必须在 pipeline 定义函数之外定义,并使用 - <nvidia.dali.pipeline.do_not_convert>修饰。
更多详细信息可以在 nvidia.dali.pipeline.do_not_convert 文档中找到。
示例 Pipeline 和 source#
让我们采用 ExternalSource 操作符教程 中的简化示例。我们将把它转换为可调用对象作为下一步。
[1]:
import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
batch_size = 4
class ExternalInputIterator:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)
    def __iter__(self):
        self.i = 0
        # Only full batches
        self.n = (len(self.files) // self.batch_size) * self.batch_size
        return self
    def __next__(self):
        if self.i >= self.n:
            raise StopIteration()
        encoded_imgs, labels = [], []
        for _ in range(self.batch_size):
            jpeg_filename = self.files[self.i]
            label = self.labels[self.i]
            with open(self.images_dir + jpeg_filename, "rb") as f:
                encoded_imgs.append(np.frombuffer(f.read(), dtype=np.uint8))
            labels.append(np.int32([label]))
            self.i += 1
        return encoded_imgs, labels
调整为可调用对象#
我们需要在这里进行两项调整
- 将其转换为每样本模式 
- 实现能够返回特定样本的 - __call__,而不是在- __next__中按顺序返回数据。
我们可以保留初始化对象的 __init__。我们不再需要 __iter__。
现在,信息不再由迭代器跟踪要返回的下一个样本(或批次),而是来自 DALI,通过 sample_info 参数 - 描述样本索引的成员变量在 SampleInfo 类型文档中指定。
Epochs#
此外,我们预先计算 epoch 中完整批次的数量。这样我们将丢弃最后一个部分批次 - 或者,我们可以支持填充批次,但这超出了本教程的范围。并行模式下的外部源不支持部分批次。
建议所有 ``source`` 回调的副本都为相同的 ``iteration``(以及在该迭代中请求的批次的所有样本索引)引发 ``StopIteration``。
这就是我们计算完整批次数量并为任何更大或相等的迭代索引引发 StopIteration 的原因。
在引发 StopIteration 后,我们可以调用 pipeline.reset() 来重置 SampleInfo - DALI 将从样本索引和迭代索引 0 开始计数。
[2]:
class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)
        self.full_iterations = len(self.files) // batch_size
    def __call__(self, sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration()
        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label
可调用对象仅用作示例。如果您愿意,可以使用无状态函数,例如
[3]:
def my_callback(sample_info):
    return np.full((5, 5), sample_info.idx_in_epoch)
Pipeline 定义#
现在我们可以定义两个 pipeline - **目前不使用并行模式** - 以查看这两种方法都给我们相同的结果。请注意,我们在第二个示例中设置了 batch=False。
[4]:
@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def iterable_pipeline():
    jpegs, labels = fn.external_source(
        source=ExternalInputIterator(batch_size),
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def callable_pipeline():
    jpegs, labels = fn.external_source(
        source=ExternalInputCallable(batch_size),
        num_outputs=2,
        batch=False,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
测试 Pipelines#
让我们介绍一些辅助代码来显示结果。
[5]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import math
def display(outputs, count, columns=2, captions=None, cpu=False):
    rows = int(math.ceil(len(outputs) / columns))
    fig = plt.figure()
    fig.set_size_inches(16, 6 * rows)
    gs = gridspec.GridSpec(rows, columns)
    row = 0
    col = 0
    for i in range(count):
        plt.subplot(gs[i])
        plt.axis("off")
        if captions is not None:
            plt.title(captions[i])
        plt.imshow(outputs.at(i) if cpu else outputs.as_cpu().at(i))
现在我们可以构建并运行 pipelines。
[6]:
iter_pipe = iterable_pipeline()
iter_pipe.build()
iter_out = iter_pipe.run()
call_pipe = callable_pipeline()
call_pipe.build()
call_out = call_pipe.run()
[7]:
display(iter_out[0], batch_size)
 
[8]:
display(call_out[0], batch_size)
 
走向并行#
现在我们有了一个带有可调用 source 的 pipeline,它与并行 external_source 要求兼容,我们可以调整我们的 pipeline。我们需要修改我们的 callable_pipeline,方法是为 external_source 操作符设置 parallel=True。
我们可以为 pipeline 设置两个额外的参数来控制 Python 工作进程的行为
- py_num_workers- 设置工作进程的数量 - 默认值为 1,但我们将使用更多,以在计算批次时实现更好的并行性。
- py_start_method- 控制工作进程的启动方式。
启动 Python 工作进程#
并行外部源使用 Python multiprocessing 库来控制工作进程。DALI 支持两种启动工作进程的方法:'fork' 和 'spawn'。有关启动方法的详细信息,请参阅 此处。DALI 不支持 'forkserver' 方法,因为工作进程的启动是在处理开始时完成一次,使用此方法不会比其他两种方法产生任何额外的好处。
通常,source 回调应遵守 multiprocessing 编程指南,以避免不必要的行为或错误。
Fork#
第一个方法 'fork' 创建当前 Python 解释器的 fork,这意味着新进程是相同的,并且原始进程中存在的所有对象在新进程中也可用。但是,有一个限制,即获取 CUDA 上下文的进程(通常通过与 CUDA 代码的任何交互来完成,例如创建框架 CUDA 张量,在 DALI pipeline 上调用 build() 等)无法 fork。
为了缓解这个问题,Pipeline 方法 start_py_workers() 可用于在我们需要与 CUDA 交互之前启动工作进程。
典型的做法是在与 CUDA 交互之前执行以下操作
- 定义所有 DALI pipelines(不要使用 build())。 
- 收集所有 DALI pipeline 对象。 
- 在每个 pipeline 对象上运行 - start_py_workers()。
完成这些步骤后,可以构建 pipelines 并可以访问 CUDA。
在本教程中,我们已经使用 DALI 在 GPU 上运行操作,因此当前的 Jupyter Notebook Kernel 实例已经获取了 CUDA 上下文。
在单独的教程 并行外部源 - Fork 中可以看到使用 start_py_workers() 的示例,我们在其中等待获取 CUDA 上下文。
Spawn#
'spawn' 方法启动一个新的 Python 解释器进程,并通过序列化和反序列化共享必要的状态。因此,source 可调用对象需要是 可 pickle 的才能使用此方法。
当使用 'spawn' 时,请确保主模块可以安全导入,方法是使用 if __name__ == '__main__' 保护入口点,如 multiprocessing 编程指南中所述。
'spawn' 不会干扰 CUDA 上下文,并且无需单独调用 start_py_workers() - 它在 build() 步骤中完成。由于本教程 notebook 已经获取了 CUDA 上下文,我们将使用此方法来运行我们的 pipeline。
不幸的是,Jupyter Notebook 再次存在限制 - 可调用对象必须来自新进程可见的 Python 模块。作为一种解决方法,我们将 ExternalInputCallable 的定义写入辅助文件并再次导入,以便它来自命名的模块并且易于 pickle。
[9]:
external_input_callable_def = """
import numpy as np
class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", 'r') as f:
            file_label = [line.rstrip().split(' ') for line in f if line != '']
            self.files, self.labels = zip(*file_label)
        self.full_iterations = len(self.files) // batch_size
    def __call__(self, sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration()
        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, 'rb') as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label
"""
with open("external_input_tmp_module.py", "w") as f:
    f.write(external_input_callable_def)
import external_input_tmp_module
在常规 Python 代码中不需要此类解决方法。
您可以选择将回调定义为函数,而无需将其导出到另一个文件。您可以在**函数序列化**部分阅读有关该方法的更多信息。
使用 Spawned Python Workers 运行 Pipeline#
现在,我们可以通过向 external_source 添加 parallel=True 并将参数 py_num_workers 和 py_start_method 传递给 pipeline 来调整 pipeline 定义以并行运行。请记住也要使用新导入的 source 定义。
[10]:
@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline():
    jpegs, labels = fn.external_source(
        source=external_input_tmp_module.ExternalInputCallable(batch_size),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[11]:
if __name__ == "__main__":
    parallel_pipe = parallel_pipeline()
    parallel_pipe.build()
[12]:
if __name__ == "__main__":
    parallel_out = parallel_pipe.run()
    display(parallel_out[0], batch_size)
    # let Python know that the pipeline will no longer be needed, so that
    # the worker processes and resources can be closed and freed.
    del parallel_pipe
 
函数序列化#
默认情况下,局部函数和 lambda 不可 pickle。DALI 将使用自定义 pickler 对此类回调进行 pickle,该 pickler 扩展了默认的 pickle 行为。
此外,为了解决 Jupyter notebook 中的导入问题,您可以将全局定义的函数标记为按值而不是按引用(Python 中的默认行为)进行序列化。
有关如何序列化 lambda 和局部函数的不同想法,请参阅**自定义序列化**部分。
首先,让我们使用嵌套函数/闭包来模拟 ExternalInputCallable 行为。
[13]:
def create_callback(batch_size):
    images_dir = "../../data/images/"
    with open(images_dir + "file_list.txt", "r") as f:
        file_label = [line.rstrip().split(" ") for line in f if line != ""]
        files, labels = zip(*file_label)
    full_iterations = len(files) // batch_size
    def callback(sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        jpeg_filename = files[sample_idx]
        label = np.int32([labels[sample_idx]])
        with open(images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label
    return callback
请注意外部函数如何准备通用配置,就像 ExternalInputCallable 中的 __init__ 方法一样,以及内部回调如何对应于 __call__ 方法。
[14]:
@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline_with_closure():
    jpegs, labels = fn.external_source(
        source=create_callback(batch_size),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[15]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        parallel_pipe_with_closure = parallel_pipeline_with_closure()
        parallel_pipe_with_closure.build()
        parallel_out_with_closure = parallel_pipe_with_closure.run()
        display(parallel_out_with_closure[0], batch_size)
        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del parallel_pipe_with_closure
 
pickle_by_value#
当使用 Jupyter notebook 时,您可能希望简单地全局创建您的通用配置,并创建一个使用它的回调,而不是编写嵌套函数。为了使此方法有效,您需要使用一种 pickle 方法,该方法序列化整个函数,而不是默认方法,默认方法仅存储函数的名称和定义模块。
[16]:
import nvidia.dali.pickling as dali_pickle
images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
    file_label = [line.rstrip().split(" ") for line in f if line != ""]
    files, labels = zip(*file_label)
full_iterations = len(files) // batch_size
@dali_pickle.pickle_by_value
def global_callback(sample_info):
    sample_idx = sample_info.idx_in_epoch
    if sample_info.iteration >= full_iterations:
        # Indicate end of the epoch
        raise StopIteration
    jpeg_filename = files[sample_idx]
    label = np.int32([labels[sample_idx]])
    with open(images_dir + jpeg_filename, "rb") as f:
        encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
    return encoded_img, label
请注意,使上述示例工作所需的唯一内容是 @dali_pickle.pickle_by_value 修饰器。
[17]:
@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline_global_cb():
    jpegs, labels = fn.external_source(
        source=global_callback,
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[18]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        parallel_pipe_global_cb = parallel_pipeline_global_cb()
        parallel_pipe_global_cb.build()
        parallel_out_global_cb = parallel_pipe_global_cb.run()
        display(parallel_out_global_cb[0], batch_size)
        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del parallel_pipe_global_cb
 
自定义序列化#
在底层,DALI 对回调使用标准的 pickle 机制。如果您需要自定义 pickle 回调的方式,请考虑将它们实现为可调用对象(ExternalInputCallable 是一个类,其实例是可调用的示例)。您可以通过手动制作 __getstate__ 和 __setstate__ 或 __reduce__ 方法来控制自定义类的序列化(您可以在 pickle 文档中找到有关此方法的更多信息)。或者,您可以通过 dispatch_tables 机制 为您的类注册自定义 reducer。
如果需要,您可以使 DALI 使用外部包(如 dill 或 cloudpickle)序列化任何类型的回调。要实现这一点,只需将所需的模块作为 pipeline 的 py_callback_pickler 参数传递即可。
py_callback_pickler 的有效值是实现 dumps 和 loads 方法的模块/对象,或者是元组,其中第一项是模块/对象,接下来的两个可选参数是在调用 dumps 和 loads 时要传递的额外 kwargs。提供的方法和 kwargs 本身必须可以使用标准 pickle.dumps 进行 pickle。
让我们再次尝试使用闭包的示例
[19]:
def create_callback(batch_size):
    images_dir = "../../data/images/"
    with open(images_dir + "file_list.txt", "r") as f:
        file_label = [line.rstrip().split(" ") for line in f if line != ""]
        files, labels = zip(*file_label)
    full_iterations = len(files) // batch_size
    def callback(sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        jpeg_filename = files[sample_idx]
        label = np.int32([labels[sample_idx]])
        with open(images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label
    return callback
[20]:
try:
    import dill
except:
    print("This example requires dill package")
else:
    @pipeline_def(
        batch_size=batch_size,
        num_threads=2,
        device_id=0,
        py_num_workers=4,
        py_start_method="spawn",
        py_callback_pickler=(dill, {"recurse": True}),
    )
    def parallel_pipeline_with_dill():
        jpegs, labels = fn.external_source(
            source=create_callback(batch_size),
            num_outputs=2,
            batch=False,
            parallel=True,
            dtype=[types.UINT8, types.INT32],
        )
        decode = fn.decoders.image(jpegs, device="mixed")
        return decode, labels
这次,我们将使用 dill 来序列化回调。我们将额外的 recurse=True 参数传递给 dill.dumps 调用,以便 dill 尝试跟踪序列化回调内部引用的全局对象,并且需要一起序列化。
[21]:
try:
    import dill
except:
    print("This example requires dill package")
else:
    if __name__ == "__main__":
        parallel_pipe_with_dill = parallel_pipeline_with_dill()
        parallel_pipe_with_dill.build()
        parallel_out_with_dill = parallel_pipe_with_dill.run()
        display(parallel_out_with_dill[0], batch_size)
 
序列化和繁重的设置#
通常,用作 source 的可调用对象在构造时执行一些设置(例如,组合工作进程将并行读取的文件列表)。如果 py_start_method 设置为 spawn,则构造的对象将被序列化(默认情况下使用 Python 的 pickle 模块)。但是,对于特别大的设置(当可调用对象的成员达到数百 MB 大小时),由于序列化的内存占用,序列化可能会变得缓慢或不可能。在这种情况下,某些设置部分应推迟到在工作进程中反序列化可调用对象之后。
让我们在一个(稍微人为的)示例中看到这一点,该示例的可调用对象包含一个大的文件名列表,并分配一个大的 numpy 数组用作缓冲区。该示例假设 pickle 用作序列化方法,并依赖于标准的 pickle 机制来自定义序列化。
[22]:
class ExternalInputCallable:
    def __init__(self, start=0, end=128 * 1024 * 1024):
        self.start = start
        self.end = end
        # Normally, the setup would take place here, but we defer it
        self.buffer = None
        self.files = None
    def __getstate__(self):
        # This will be run in the main process to serialize the callable
        return self.__dict__.copy()
    def __setstate__(self, state):
        self.__dict__.update(state)
        # Deferred setup is done in each of the worker processes
        self.buffer = np.arange(self.start, self.end, dtype=np.int32).reshape(
            (-1, 1024, 1024)
        )
        self.files = [f"filename_{i}" for i in range(self.start, self.end)]
    def __call__(self, sample_info):
        if sample_info.idx_in_epoch >= self.end - self.start:
            raise StopIteration
        file_idx = int(self.files[sample_info.idx_in_epoch].split("_")[1])
        sample_from_buffer = self.buffer[sample_info.idx_in_epoch]
        return np.array(file_idx, dtype=np.int32), sample_from_buffer
@pipeline_def(
    batch_size=8,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline():
    idx, sample = fn.external_source(
        source=ExternalInputCallable(),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.INT32, types.INT32],
    )
    return idx, sample
洗牌和分片#
到目前为止,本教程中介绍的 pipelines 在每个 epoch 中都以完全相同的顺序返回数据集。在以下部分中,我们将修改 source 回调,以便在 epoch 之间对数据集进行洗牌。此外,我们将使回调意识到分片。
在 epochs 之间洗牌数据集#
要打乱你的数据集,你可以置换索引。当使用 SampleInfo 实例调用 source 时,source 回调函数可以使用一些 permutation 函数来返回 permutation(sample_info.idx_in_epoch)-th 样本,而不是 sample_info.idx_in_epoch-th 样本。
请记住,你正在处理 source 回调函数的多个副本。每个副本都应该以相同的方式打乱数据,否则你可能会混淆你的数据集,并在同一个 epoch 中多次返回相同的样本。
要在 epoch 之间打乱数据集,你可以使用 sample_info.epoch_idx 并使用 epoch_idx 参数化 permutation。
以下是如何调整 ExternalInputCallable 示例以处理 epoch 之间的数据打乱。
[23]:
class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)
        # If the dataset size is not divisible by the batch size, the last
        # incomplete batch will be omitted. Alternatively, you may pad the dataset
        # by repeating the last sample in `self.files` and `self.labels` to
        # make the dataset size divisible by the `batch_size`.
        self.full_iterations = len(self.files) // batch_size
        self.perm = None  # permutation of indices
        self.last_seen_epoch = (
            # so that we don't have to recompute the `self.perm` for every sample
            None
        )
    def __call__(self, sample_info):
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        if self.last_seen_epoch != sample_info.epoch_idx:
            self.last_seen_epoch = sample_info.epoch_idx
            self.perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
            self.perm = self.perm.permutation(len(self.files))
        sample_idx = self.perm[sample_info.idx_in_epoch]
        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label
分片#
现在,让我们向 callable 对象添加 shard_id 和 num_shards 参数,以便它可以在任何给定的 epoch 中处理数据集的非重叠子集。请注意,所有分片在同一个 epoch 中使用相同的置换。
[24]:
class ExternalInputCallable:
    def __init__(self, batch_size, shard_id=0, num_shards=1):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)
        self.shard_id = shard_id
        self.num_shards = num_shards
        # If the dataset size is not divisibvle by number of shards,
        # the trailing samples will be omitted.
        self.shard_size = len(self.files) // num_shards
        self.shard_offset = self.shard_size * shard_id
        # If the shard size is not divisible by the batch size, the last
        # incomplete batch will be omitted.
        self.full_iterations = self.shard_size // batch_size
        self.perm = None  # permutation of indices
        self.last_seen_epoch = (
            # so that we don't have to recompute the `self.perm` for every sample
            None
        )
    def __call__(self, sample_info):
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        if self.last_seen_epoch != sample_info.epoch_idx:
            self.last_seen_epoch = sample_info.epoch_idx
            self.perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
            self.perm = self.perm.permutation(len(self.files))
        sample_idx = self.perm[sample_info.idx_in_epoch + self.shard_offset]
        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label
我们也可以用嵌套函数的形式来表达 callable 对象。
[25]:
@dali_pickle.pickle_by_value
def create_callback(batch_size, shard_id=0, num_shards=1):
    images_dir = "../../data/images/"
    with open(images_dir + "file_list.txt", "r") as f:
        file_label = [line.rstrip().split(" ") for line in f if line != ""]
        files, labels = zip(*file_label)
    dataset_size = len(files)
    shard_size = dataset_size // num_shards
    shard_offset = shard_size * shard_id
    full_iterations = shard_size // batch_size
    perm = None  # permutation of indices
    # so that we don't have to recompute the `perm` for every sample
    last_seen_epoch = None
    def callback(sample_info):
        nonlocal perm, last_seen_epoch
        if sample_info.iteration >= full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        if last_seen_epoch != sample_info.epoch_idx:
            last_seen_epoch = sample_info.epoch_idx
            perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
            perm = perm.permutation(dataset_size)
        sample_idx = perm[sample_info.idx_in_epoch + shard_offset]
        jpeg_filename = files[sample_idx]
        label = np.int32([labels[sample_idx]])
        with open(images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label
    return callback
让我们使用上面的回调函数将包含 21 张图像的初始数据集拆分到两个 pipeline 之间,每个 pipeline 在任何给定的 epoch 中处理 10 张图像的非重叠子集。
在这里,我们决定如果数据集大小不能被分片数量或 batch size 整除,则丢弃尾部的样本。请注意,由于 epoch 之间的数据打乱,不同的样本将在不同的 epoch 中被省略。尽管如此,你也可以决定通过重复某些样本来填充数据集和分片,以便每个样本都在每个 epoch 中使用。
[26]:
batch_size = 5
@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def shuffling_sharding_pipepline(shard_id, num_shards):
    jpegs, labels = fn.external_source(
        source=create_callback(batch_size, shard_id, num_shards),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[27]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        pipe_1 = shuffling_sharding_pipepline(shard_id=0, num_shards=2)
        pipe_1.build()
        for _ in range(2):
            images, labels = pipe_1.run()
            display(images, batch_size, columns=batch_size)
        try:
            pipe_1.run()
        except StopIteration:
            pass
        else:
            raise Exception("Expected only 10 images in the epoch")
        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del pipe_1
 
 
[28]:
import sys
ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        pipe_2 = shuffling_sharding_pipepline(shard_id=1, num_shards=2)
        pipe_2.build()
        for _ in range(2):
            images, labels = pipe_2.run()
            display(images, batch_size, columns=batch_size)
        try:
            pipe_2.run()
        except StopIteration:
            pass
        else:
            raise Exception("Expected only 10 images in the epoch")
        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del pipe_2
 
 
容器中的并行外部源#
并行外部源使用操作系统共享内存将样本从 worker 传递到主进程。由于 pipeline 内的缓冲,DALI 可能需要分配足够的内存来一次存储几个迭代。因此,并行外部源可能需要大量的共享内存。但是,某些环境限制了 /dev/shm 的大小。
例如,当使用 Docker 运行 DALI 时,你可能需要在调用 docker run 时传递 --shm-size 参数,或者,如果你不能传递额外的参数,则在运行的容器中手动修改 /dev/shm 的大小。