并行外部源#

在本教程中,我们将向您展示如何在 external_source 操作符中启用并行模式,从而允许 Python 工作进程并发执行 source。这样做可以通过允许源在后台运行,解除主 Python 线程的阻塞,从而减少每次迭代的执行时间。并非所有 external_source 的用法都可以直接并行化 - source 参数受到一系列限制,并且需要额外的配置。

接受的 source#

根据 batch 参数,提供给并行外部源的 source 预期返回批次或单个样本。在样本模式下,DALI 负责将样本组合成批次,并有更多机会并行化输入数据的计算。因此,样本模式是运行并行外部源的首选方式。样本模式下的并行外部源对 source 参数有以下要求

  1. source 必须是可调用对象:函数或对象。

  2. source 回调必须接受一个参数:nvidia.dali.types.SampleInfo - 指示请求样本的索引。

  3. 回调返回的数据必须是 CPU 数组(或它们的元组/列表)。

以下段落将解释这些要求背后的原因,并展示使用并行外部源的示例。

操作原理#

在每次迭代之前,DALI 外部源操作符都会查询其 source 参数以获取新数据,以便在 pipeline 中进一步处理。通过调用 source(当 source 可调用时)或调用 next(source)(在可迭代对象的情况下)从 source 获取数据所需的时间可能很长,并且可能会影响处理迭代的时间 - 尤其是在主 Python 线程中这是一个阻塞操作。

external_source 节点设置 parallel=True 表示 pipeline 在 DALI 启动的 Python 工作进程中运行 source。工作进程绑定到 pipeline,并由该 pipeline 中的所有并行外部源共享。每个工作进程都保留 source 回调/对象的副本。工作进程是独立的进程,因此请记住,它们不共享任何全局状态,只共享在启动它们之前指定的副本。

在样本模式下,每个进程都可以通过使用包含请求样本索引的 SampleInfo 对象调用其 source 回调副本,从而从副本中请求特定样本。
DALI pipeline 在工作进程之间提前拆分计算下一个批次所需的样本,并将数据收集回来以用于当前迭代。

在批次模式下,DALI 无法请求返回特定样本,因此与样本模式相比,并行化的好处受到限制。如果 source 是接受 BatchInfo 的可调用对象,则可以并行预取几个批次。对于可迭代对象,唯一的优势是在单独的进程中运行可迭代对象。

由于并行样本模式可以提供最大的加速,因此我们介绍如何调整可迭代源以在并行样本模式下运行。

警告

当 pipeline 启用条件执行时,必须采取额外的步骤来防止 source 被 AutoGraph 重写。有两种方法可以实现这一点

  1. 在全局范围内定义函数(即在 pipeline_def 范围之外)。

  2. 如果函数是另一个“工厂”函数的结果,则工厂函数必须在 pipeline 定义函数之外定义,并使用 <nvidia.dali.pipeline.do_not_convert> 修饰。

更多详细信息可以在 nvidia.dali.pipeline.do_not_convert 文档中找到。

示例 Pipeline 和 source#

让我们采用 ExternalSource 操作符教程 中的简化示例。我们将把它转换为可调用对象作为下一步。

[1]:
import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types

batch_size = 4


class ExternalInputIterator:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)

    def __iter__(self):
        self.i = 0
        # Only full batches
        self.n = (len(self.files) // self.batch_size) * self.batch_size
        return self

    def __next__(self):
        if self.i >= self.n:
            raise StopIteration()
        encoded_imgs, labels = [], []
        for _ in range(self.batch_size):
            jpeg_filename = self.files[self.i]
            label = self.labels[self.i]
            with open(self.images_dir + jpeg_filename, "rb") as f:
                encoded_imgs.append(np.frombuffer(f.read(), dtype=np.uint8))
            labels.append(np.int32([label]))
            self.i += 1
        return encoded_imgs, labels

调整为可调用对象#

我们需要在这里进行两项调整

  1. 将其转换为每样本模式

  2. 实现能够返回特定样本的 __call__,而不是在 __next__ 中按顺序返回数据。

我们可以保留初始化对象的 __init__。我们不再需要 __iter__

现在,信息不再由迭代器跟踪要返回的下一个样本(或批次),而是来自 DALI,通过 sample_info 参数 - 描述样本索引的成员变量在 SampleInfo 类型文档中指定。

Epochs#

此外,我们预先计算 epoch 中完整批次的数量。这样我们将丢弃最后一个部分批次 - 或者,我们可以支持填充批次,但这超出了本教程的范围。并行模式下的外部源不支持部分批次。

建议所有 ``source`` 回调的副本都为相同的 ``iteration``(以及在该迭代中请求的批次的所有样本索引)引发 ``StopIteration``。

这就是我们计算完整批次数量并为任何更大或相等的迭代索引引发 StopIteration 的原因。

在引发 StopIteration 后,我们可以调用 pipeline.reset() 来重置 SampleInfo - DALI 将从样本索引和迭代索引 0 开始计数。

[2]:
class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)
        self.full_iterations = len(self.files) // batch_size

    def __call__(self, sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration()
        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

可调用对象仅用作示例。如果您愿意,可以使用无状态函数,例如

[3]:
def my_callback(sample_info):
    return np.full((5, 5), sample_info.idx_in_epoch)

Pipeline 定义#

现在我们可以定义两个 pipeline - **目前不使用并行模式** - 以查看这两种方法都给我们相同的结果。请注意,我们在第二个示例中设置了 batch=False

[4]:
@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def iterable_pipeline():
    jpegs, labels = fn.external_source(
        source=ExternalInputIterator(batch_size),
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels


@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def callable_pipeline():
    jpegs, labels = fn.external_source(
        source=ExternalInputCallable(batch_size),
        num_outputs=2,
        batch=False,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels

测试 Pipelines#

让我们介绍一些辅助代码来显示结果。

[5]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import math


def display(outputs, count, columns=2, captions=None, cpu=False):
    rows = int(math.ceil(len(outputs) / columns))
    fig = plt.figure()
    fig.set_size_inches(16, 6 * rows)
    gs = gridspec.GridSpec(rows, columns)
    row = 0
    col = 0
    for i in range(count):
        plt.subplot(gs[i])
        plt.axis("off")
        if captions is not None:
            plt.title(captions[i])
        plt.imshow(outputs.at(i) if cpu else outputs.as_cpu().at(i))

现在我们可以构建并运行 pipelines。

[6]:
iter_pipe = iterable_pipeline()
iter_pipe.build()
iter_out = iter_pipe.run()

call_pipe = callable_pipeline()
call_pipe.build()
call_out = call_pipe.run()
[7]:
display(iter_out[0], batch_size)
../../../_images/examples_general_data_loading_parallel_external_source_17_0.png
[8]:
display(call_out[0], batch_size)
../../../_images/examples_general_data_loading_parallel_external_source_18_0.png

走向并行#

现在我们有了一个带有可调用 source 的 pipeline,它与并行 external_source 要求兼容,我们可以调整我们的 pipeline。我们需要修改我们的 callable_pipeline,方法是为 external_source 操作符设置 parallel=True

我们可以为 pipeline 设置两个额外的参数来控制 Python 工作进程的行为

  1. py_num_workers - 设置工作进程的数量 - 默认值为 1,但我们将使用更多,以在计算批次时实现更好的并行性。

  2. py_start_method - 控制工作进程的启动方式。

启动 Python 工作进程#

并行外部源使用 Python multiprocessing 库来控制工作进程。DALI 支持两种启动工作进程的方法:'fork''spawn'。有关启动方法的详细信息,请参阅 此处。DALI 不支持 'forkserver' 方法,因为工作进程的启动是在处理开始时完成一次,使用此方法不会比其他两种方法产生任何额外的好处。

通常,source 回调应遵守 multiprocessing 编程指南,以避免不必要的行为或错误。

Fork#

第一个方法 'fork' 创建当前 Python 解释器的 fork,这意味着新进程是相同的,并且原始进程中存在的所有对象在新进程中也可用。但是,有一个限制,即获取 CUDA 上下文的进程(通常通过与 CUDA 代码的任何交互来完成,例如创建框架 CUDA 张量,在 DALI pipeline 上调用 build() 等)无法 fork。

为了缓解这个问题,Pipeline 方法 start_py_workers() 可用于在我们需要与 CUDA 交互之前启动工作进程。

典型的做法是在与 CUDA 交互之前执行以下操作

  1. 定义所有 DALI pipelines(不要使用 build())。

  2. 收集所有 DALI pipeline 对象。

  3. 在每个 pipeline 对象上运行 start_py_workers()

完成这些步骤后,可以构建 pipelines 并可以访问 CUDA。

在本教程中,我们已经使用 DALI 在 GPU 上运行操作,因此当前的 Jupyter Notebook Kernel 实例已经获取了 CUDA 上下文。

在单独的教程 并行外部源 - Fork 中可以看到使用 start_py_workers() 的示例,我们在其中等待获取 CUDA 上下文。

Spawn#

'spawn' 方法启动一个新的 Python 解释器进程,并通过序列化和反序列化共享必要的状态。因此,source 可调用对象需要是 可 pickle 的才能使用此方法。

当使用 'spawn' 时,请确保主模块可以安全导入,方法是使用 if __name__ == '__main__' 保护入口点,如 multiprocessing 编程指南中所述。

'spawn' 不会干扰 CUDA 上下文,并且无需单独调用 start_py_workers() - 它在 build() 步骤中完成。由于本教程 notebook 已经获取了 CUDA 上下文,我们将使用此方法来运行我们的 pipeline。

不幸的是,Jupyter Notebook 再次存在限制 - 可调用对象必须来自新进程可见的 Python 模块。作为一种解决方法,我们将 ExternalInputCallable 的定义写入辅助文件并再次导入,以便它来自命名的模块并且易于 pickle。

[9]:
external_input_callable_def = """
import numpy as np

class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", 'r') as f:
            file_label = [line.rstrip().split(' ') for line in f if line != '']
            self.files, self.labels = zip(*file_label)
        self.full_iterations = len(self.files) // batch_size

    def __call__(self, sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration()
        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, 'rb') as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label
"""


with open("external_input_tmp_module.py", "w") as f:
    f.write(external_input_callable_def)

import external_input_tmp_module

在常规 Python 代码中不需要此类解决方法。

您可以选择将回调定义为函数,而无需将其导出到另一个文件。您可以在**函数序列化**部分阅读有关该方法的更多信息。

使用 Spawned Python Workers 运行 Pipeline#

现在,我们可以通过向 external_source 添加 parallel=True 并将参数 py_num_workerspy_start_method 传递给 pipeline 来调整 pipeline 定义以并行运行。请记住也要使用新导入的 source 定义。

[10]:
@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline():
    jpegs, labels = fn.external_source(
        source=external_input_tmp_module.ExternalInputCallable(batch_size),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[11]:
if __name__ == "__main__":
    parallel_pipe = parallel_pipeline()
    parallel_pipe.build()
[12]:
if __name__ == "__main__":
    parallel_out = parallel_pipe.run()
    display(parallel_out[0], batch_size)

    # let Python know that the pipeline will no longer be needed, so that
    # the worker processes and resources can be closed and freed.
    del parallel_pipe
../../../_images/examples_general_data_loading_parallel_external_source_29_0.png

函数序列化#

默认情况下,局部函数和 lambda 不可 pickle。DALI 将使用自定义 pickler 对此类回调进行 pickle,该 pickler 扩展了默认的 pickle 行为。

此外,为了解决 Jupyter notebook 中的导入问题,您可以将全局定义的函数标记为按值而不是按引用(Python 中的默认行为)进行序列化。

有关如何序列化 lambda 和局部函数的不同想法,请参阅**自定义序列化**部分。

首先,让我们使用嵌套函数/闭包来模拟 ExternalInputCallable 行为。

[13]:
def create_callback(batch_size):
    images_dir = "../../data/images/"
    with open(images_dir + "file_list.txt", "r") as f:
        file_label = [line.rstrip().split(" ") for line in f if line != ""]
        files, labels = zip(*file_label)
    full_iterations = len(files) // batch_size

    def callback(sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        jpeg_filename = files[sample_idx]
        label = np.int32([labels[sample_idx]])
        with open(images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

    return callback

请注意外部函数如何准备通用配置,就像 ExternalInputCallable 中的 __init__ 方法一样,以及内部回调如何对应于 __call__ 方法。

[14]:
@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline_with_closure():
    jpegs, labels = fn.external_source(
        source=create_callback(batch_size),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[15]:
import sys

ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        parallel_pipe_with_closure = parallel_pipeline_with_closure()
        parallel_pipe_with_closure.build()

        parallel_out_with_closure = parallel_pipe_with_closure.run()
        display(parallel_out_with_closure[0], batch_size)

        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del parallel_pipe_with_closure
../../../_images/examples_general_data_loading_parallel_external_source_35_0.png

pickle_by_value#

当使用 Jupyter notebook 时,您可能希望简单地全局创建您的通用配置,并创建一个使用它的回调,而不是编写嵌套函数。为了使此方法有效,您需要使用一种 pickle 方法,该方法序列化整个函数,而不是默认方法,默认方法仅存储函数的名称和定义模块。

[16]:
import nvidia.dali.pickling as dali_pickle

images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
    file_label = [line.rstrip().split(" ") for line in f if line != ""]
    files, labels = zip(*file_label)
full_iterations = len(files) // batch_size


@dali_pickle.pickle_by_value
def global_callback(sample_info):
    sample_idx = sample_info.idx_in_epoch
    if sample_info.iteration >= full_iterations:
        # Indicate end of the epoch
        raise StopIteration
    jpeg_filename = files[sample_idx]
    label = np.int32([labels[sample_idx]])
    with open(images_dir + jpeg_filename, "rb") as f:
        encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
    return encoded_img, label

请注意,使上述示例工作所需的唯一内容是 @dali_pickle.pickle_by_value 修饰器。

[17]:
@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline_global_cb():
    jpegs, labels = fn.external_source(
        source=global_callback,
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[18]:
import sys

ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        parallel_pipe_global_cb = parallel_pipeline_global_cb()
        parallel_pipe_global_cb.build()

        parallel_out_global_cb = parallel_pipe_global_cb.run()
        display(parallel_out_global_cb[0], batch_size)

        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del parallel_pipe_global_cb
../../../_images/examples_general_data_loading_parallel_external_source_40_0.png

自定义序列化#

在底层,DALI 对回调使用标准的 pickle 机制。如果您需要自定义 pickle 回调的方式,请考虑将它们实现为可调用对象(ExternalInputCallable 是一个类,其实例是可调用的示例)。您可以通过手动制作 __getstate____setstate____reduce__ 方法来控制自定义类的序列化(您可以在 pickle 文档中找到有关此方法的更多信息)。或者,您可以通过 dispatch_tables 机制 为您的类注册自定义 reducer

如果需要,您可以使 DALI 使用外部包(如 dillcloudpickle)序列化任何类型的回调。要实现这一点,只需将所需的模块作为 pipeline 的 py_callback_pickler 参数传递即可。

py_callback_pickler 的有效值是实现 dumpsloads 方法的模块/对象,或者是元组,其中第一项是模块/对象,接下来的两个可选参数是在调用 dumps 和 loads 时要传递的额外 kwargs。提供的方法和 kwargs 本身必须可以使用标准 pickle.dumps 进行 pickle。

让我们再次尝试使用闭包的示例

[19]:
def create_callback(batch_size):
    images_dir = "../../data/images/"
    with open(images_dir + "file_list.txt", "r") as f:
        file_label = [line.rstrip().split(" ") for line in f if line != ""]
        files, labels = zip(*file_label)
    full_iterations = len(files) // batch_size

    def callback(sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        jpeg_filename = files[sample_idx]
        label = np.int32([labels[sample_idx]])
        with open(images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

    return callback
[20]:
try:
    import dill
except:
    print("This example requires dill package")
else:

    @pipeline_def(
        batch_size=batch_size,
        num_threads=2,
        device_id=0,
        py_num_workers=4,
        py_start_method="spawn",
        py_callback_pickler=(dill, {"recurse": True}),
    )
    def parallel_pipeline_with_dill():
        jpegs, labels = fn.external_source(
            source=create_callback(batch_size),
            num_outputs=2,
            batch=False,
            parallel=True,
            dtype=[types.UINT8, types.INT32],
        )
        decode = fn.decoders.image(jpegs, device="mixed")
        return decode, labels

这次,我们将使用 dill 来序列化回调。我们将额外的 recurse=True 参数传递给 dill.dumps 调用,以便 dill 尝试跟踪序列化回调内部引用的全局对象,并且需要一起序列化。

[21]:
try:
    import dill
except:
    print("This example requires dill package")
else:
    if __name__ == "__main__":
        parallel_pipe_with_dill = parallel_pipeline_with_dill()
        parallel_pipe_with_dill.build()

        parallel_out_with_dill = parallel_pipe_with_dill.run()
        display(parallel_out_with_dill[0], batch_size)
../../../_images/examples_general_data_loading_parallel_external_source_46_0.png

序列化和繁重的设置#

通常,用作 source 的可调用对象在构造时执行一些设置(例如,组合工作进程将并行读取的文件列表)。如果 py_start_method 设置为 spawn,则构造的对象将被序列化(默认情况下使用 Python 的 pickle 模块)。但是,对于特别大的设置(当可调用对象的成员达到数百 MB 大小时),由于序列化的内存占用,序列化可能会变得缓慢或不可能。在这种情况下,某些设置部分应推迟到在工作进程中反序列化可调用对象之后。

让我们在一个(稍微人为的)示例中看到这一点,该示例的可调用对象包含一个大的文件名列表,并分配一个大的 numpy 数组用作缓冲区。该示例假设 pickle 用作序列化方法,并依赖于标准的 pickle 机制来自定义序列化。

[22]:
class ExternalInputCallable:
    def __init__(self, start=0, end=128 * 1024 * 1024):
        self.start = start
        self.end = end
        # Normally, the setup would take place here, but we defer it
        self.buffer = None
        self.files = None

    def __getstate__(self):
        # This will be run in the main process to serialize the callable
        return self.__dict__.copy()

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Deferred setup is done in each of the worker processes
        self.buffer = np.arange(self.start, self.end, dtype=np.int32).reshape(
            (-1, 1024, 1024)
        )
        self.files = [f"filename_{i}" for i in range(self.start, self.end)]

    def __call__(self, sample_info):
        if sample_info.idx_in_epoch >= self.end - self.start:
            raise StopIteration
        file_idx = int(self.files[sample_info.idx_in_epoch].split("_")[1])
        sample_from_buffer = self.buffer[sample_info.idx_in_epoch]
        return np.array(file_idx, dtype=np.int32), sample_from_buffer


@pipeline_def(
    batch_size=8,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline():
    idx, sample = fn.external_source(
        source=ExternalInputCallable(),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.INT32, types.INT32],
    )
    return idx, sample

洗牌和分片#

到目前为止,本教程中介绍的 pipelines 在每个 epoch 中都以完全相同的顺序返回数据集。在以下部分中,我们将修改 source 回调,以便在 epoch 之间对数据集进行洗牌。此外,我们将使回调意识到分片。

在 epochs 之间洗牌数据集#

要打乱你的数据集,你可以置换索引。当使用 SampleInfo 实例调用 source 时,source 回调函数可以使用一些 permutation 函数来返回 permutation(sample_info.idx_in_epoch)-th 样本,而不是 sample_info.idx_in_epoch-th 样本。

请记住,你正在处理 source 回调函数的多个副本。每个副本都应该以相同的方式打乱数据,否则你可能会混淆你的数据集,并在同一个 epoch 中多次返回相同的样本。

要在 epoch 之间打乱数据集,你可以使用 sample_info.epoch_idx 并使用 epoch_idx 参数化 permutation

以下是如何调整 ExternalInputCallable 示例以处理 epoch 之间的数据打乱。

[23]:
class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)
        # If the dataset size is not divisible by the batch size, the last
        # incomplete batch will be omitted. Alternatively, you may pad the dataset
        # by repeating the last sample in `self.files` and `self.labels` to
        # make the dataset size divisible by the `batch_size`.
        self.full_iterations = len(self.files) // batch_size

        self.perm = None  # permutation of indices
        self.last_seen_epoch = (
            # so that we don't have to recompute the `self.perm` for every sample
            None
        )

    def __call__(self, sample_info):
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        if self.last_seen_epoch != sample_info.epoch_idx:
            self.last_seen_epoch = sample_info.epoch_idx
            self.perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
            self.perm = self.perm.permutation(len(self.files))
        sample_idx = self.perm[sample_info.idx_in_epoch]

        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

分片#

现在,让我们向 callable 对象添加 shard_idnum_shards 参数,以便它可以在任何给定的 epoch 中处理数据集的非重叠子集。请注意,所有分片在同一个 epoch 中使用相同的置换。

[24]:
class ExternalInputCallable:
    def __init__(self, batch_size, shard_id=0, num_shards=1):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)

        self.shard_id = shard_id
        self.num_shards = num_shards
        # If the dataset size is not divisibvle by number of shards,
        # the trailing samples will be omitted.
        self.shard_size = len(self.files) // num_shards
        self.shard_offset = self.shard_size * shard_id
        # If the shard size is not divisible by the batch size, the last
        # incomplete batch will be omitted.
        self.full_iterations = self.shard_size // batch_size
        self.perm = None  # permutation of indices
        self.last_seen_epoch = (
            # so that we don't have to recompute the `self.perm` for every sample
            None
        )

    def __call__(self, sample_info):
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        if self.last_seen_epoch != sample_info.epoch_idx:
            self.last_seen_epoch = sample_info.epoch_idx
            self.perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
            self.perm = self.perm.permutation(len(self.files))
        sample_idx = self.perm[sample_info.idx_in_epoch + self.shard_offset]

        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

我们也可以用嵌套函数的形式来表达 callable 对象。

[25]:
@dali_pickle.pickle_by_value
def create_callback(batch_size, shard_id=0, num_shards=1):
    images_dir = "../../data/images/"
    with open(images_dir + "file_list.txt", "r") as f:
        file_label = [line.rstrip().split(" ") for line in f if line != ""]
        files, labels = zip(*file_label)

    dataset_size = len(files)
    shard_size = dataset_size // num_shards
    shard_offset = shard_size * shard_id
    full_iterations = shard_size // batch_size
    perm = None  # permutation of indices
    # so that we don't have to recompute the `perm` for every sample
    last_seen_epoch = None

    def callback(sample_info):
        nonlocal perm, last_seen_epoch
        if sample_info.iteration >= full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        if last_seen_epoch != sample_info.epoch_idx:
            last_seen_epoch = sample_info.epoch_idx
            perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
            perm = perm.permutation(dataset_size)
        sample_idx = perm[sample_info.idx_in_epoch + shard_offset]

        jpeg_filename = files[sample_idx]
        label = np.int32([labels[sample_idx]])
        with open(images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

    return callback

让我们使用上面的回调函数将包含 21 张图像的初始数据集拆分到两个 pipeline 之间,每个 pipeline 在任何给定的 epoch 中处理 10 张图像的非重叠子集。

在这里,我们决定如果数据集大小不能被分片数量或 batch size 整除,则丢弃尾部的样本。请注意,由于 epoch 之间的数据打乱,不同的样本将在不同的 epoch 中被省略。尽管如此,你也可以决定通过重复某些样本来填充数据集和分片,以便每个样本都在每个 epoch 中使用。

[26]:
batch_size = 5


@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def shuffling_sharding_pipepline(shard_id, num_shards):
    jpegs, labels = fn.external_source(
        source=create_callback(batch_size, shard_id, num_shards),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[27]:
import sys

ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        pipe_1 = shuffling_sharding_pipepline(shard_id=0, num_shards=2)
        pipe_1.build()

        for _ in range(2):
            images, labels = pipe_1.run()
            display(images, batch_size, columns=batch_size)

        try:
            pipe_1.run()
        except StopIteration:
            pass
        else:
            raise Exception("Expected only 10 images in the epoch")

        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del pipe_1
../../../_images/examples_general_data_loading_parallel_external_source_58_0.png
../../../_images/examples_general_data_loading_parallel_external_source_58_1.png
[28]:
import sys

ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        pipe_2 = shuffling_sharding_pipepline(shard_id=1, num_shards=2)
        pipe_2.build()

        for _ in range(2):
            images, labels = pipe_2.run()
            display(images, batch_size, columns=batch_size)

        try:
            pipe_2.run()
        except StopIteration:
            pass
        else:
            raise Exception("Expected only 10 images in the epoch")

        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del pipe_2
../../../_images/examples_general_data_loading_parallel_external_source_59_0.png
../../../_images/examples_general_data_loading_parallel_external_source_59_1.png

容器中的并行外部源#

并行外部源使用操作系统共享内存将样本从 worker 传递到主进程。由于 pipeline 内的缓冲,DALI 可能需要分配足够的内存来一次存储几个迭代。因此,并行外部源可能需要大量的共享内存。但是,某些环境限制了 /dev/shm 的大小。

例如,当使用 Docker 运行 DALI 时,你可能需要在调用 docker run 时传递 --shm-size 参数,或者,如果你不能传递额外的参数,则在运行的容器中手动修改 /dev/shm 的大小。