并行外部数据源#

在本教程中,我们将向您展示如何在 external_source 操作符中启用并行模式,从而允许 Python 工作进程并发执行 source。这样做可以通过允许源在后台运行,解除主 Python 线程的阻塞,来减少每次迭代的执行时间。并非所有 external_source 的用法都可以直接并行化 - source 参数受到一系列限制,并且需要额外的配置。

接受的 source#

根据 batch 参数,并行外部数据源提供的 source 应该返回批次或单个样本。在样本模式下,DALI 负责将样本组合成批次,并有更多机会并行化输入数据的计算。因此,样本模式是运行并行外部数据源的首选方式。样本模式下的并行外部数据源对 source 参数有以下要求

  1. source 必须是可调用对象:函数或对象。

  2. source 回调必须接受一个参数:nvidia.dali.types.SampleInfo - 指示请求的样本的索引。

  3. 回调返回的数据必须是 CPU 数组(或它们的元组/列表)。

以下段落将解释这些要求背后的原因,并展示使用并行外部数据源的示例。

操作原理#

在每次迭代之前,DALI 外部数据源操作符都会查询其 source 参数以获取新数据,并将其传递到 Pipeline 中进行进一步处理。通过调用 source(当 source 是可调用对象时)或调用 next(source)(在可迭代对象的情况下)从 source 获取数据所需的时间可能很长,并且会影响处理迭代的时间 - 尤其是在主 Python 线程中这是一个阻塞操作。

external_source 节点设置 parallel=True 指示 Pipeline 在 DALI 启动的 Python 工作进程中运行 source。工作进程绑定到 Pipeline,并由该 Pipeline 中的所有并行外部数据源共享。每个工作进程都保留 source 回调/对象的副本。工作进程是独立的进程,因此请记住,它们不共享任何全局状态,只共享启动它们之前指定的副本。

在样本模式下,每个进程都可以通过使用包含请求样本索引的 SampleInfo 对象调用其 source 回调副本,来请求特定的样本。
DALI Pipeline 提前在工作进程之间分配计算下一个批次所需的样本,并收集数据以供当前迭代使用。

在批次模式下,DALI 无法请求返回特定样本,因此与样本模式相比,并行化的好处有限。如果 source 是接受 BatchInfo 的可调用对象,则可以并行预取几个批次。对于可迭代对象,唯一的优势是在单独的进程中运行可迭代对象。

由于并行样本模式可以提供最大的加速,因此我们介绍如何调整可迭代源以在并行样本模式下运行。

警告

当 Pipeline 启用条件执行时,必须采取额外的步骤来防止 source 被 AutoGraph 重写。有两种方法可以实现这一点

  1. 在全局范围内定义函数(即在 pipeline_def 范围之外)。

  2. 如果函数是另一个“工厂”函数的结果,则工厂函数必须在 Pipeline 定义函数之外定义,并使用 <nvidia.dali.pipeline.do_not_convert> 修饰。

更多详细信息可以在 nvidia.dali.pipeline.do_not_convert 文档中找到。

Pipeline 示例和 source#

让我们采用 外部数据源操作符教程 中的简化示例。我们将把它转换为可调用对象作为下一步。

[1]:
import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types

batch_size = 4


class ExternalInputIterator:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)

    def __iter__(self):
        self.i = 0
        # Only full batches
        self.n = (len(self.files) // self.batch_size) * self.batch_size
        return self

    def __next__(self):
        if self.i >= self.n:
            raise StopIteration()
        encoded_imgs, labels = [], []
        for _ in range(self.batch_size):
            jpeg_filename = self.files[self.i]
            label = self.labels[self.i]
            with open(self.images_dir + jpeg_filename, "rb") as f:
                encoded_imgs.append(np.frombuffer(f.read(), dtype=np.uint8))
            labels.append(np.int32([label]))
            self.i += 1
        return encoded_imgs, labels

调整为可调用对象#

我们需要在这里进行两项调整

  1. 将其转换为每样本模式

  2. 不要在 __next__ 中按顺序返回数据,而是实现 __call__,使其能够返回特定样本。

我们可以保留初始化对象的 __init__。我们不再需要 __iter__

现在,信息不再由迭代器跟踪要返回的下一个样本(或批次),而是通过 sample_info 参数从 DALI 传入 - 描述样本索引的成员变量在 SampleInfo 类型文档中指定。

Epoch(轮次)#

此外,我们预先计算了每个 epoch 中完整批次的数量。这样我们将丢弃最后一个部分批次 - 或者,我们可以支持填充批次,但这超出了本教程的范围。并行模式下的外部数据源不支持部分批次。

建议所有 source 回调的副本都为相同的 iteration(以及在该迭代中请求的批次的所有样本索引)引发 StopIteration

这就是为什么我们计算完整批次的数量,并为任何大于或等于迭代索引的情况引发 StopIteration

在引发 StopIteration 之后,我们可以调用 pipeline.reset() 来重置 SampleInfo - DALI 将从样本索引和迭代索引 0 开始计数。

[2]:
class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)
        self.full_iterations = len(self.files) // batch_size

    def __call__(self, sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration()
        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

可调用对象仅用作示例。如果您愿意,可以使用无状态函数,例如

[3]:
def my_callback(sample_info):
    return np.full((5, 5), sample_info.idx_in_epoch)

Pipeline 定义#

现在我们可以定义两个 Pipeline - 暂时不使用并行模式 - 以查看两种方法是否都给出相同的结果。请注意,我们在第二个示例中设置了 batch=False

[4]:
@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def iterable_pipeline():
    jpegs, labels = fn.external_source(
        source=ExternalInputIterator(batch_size),
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels


@pipeline_def(batch_size=batch_size, num_threads=2, device_id=0)
def callable_pipeline():
    jpegs, labels = fn.external_source(
        source=ExternalInputCallable(batch_size),
        num_outputs=2,
        batch=False,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels

测试 Pipeline#

让我们引入一些辅助代码来显示结果。

[5]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import math


def display(outputs, count, columns=2, captions=None, cpu=False):
    rows = int(math.ceil(len(outputs) / columns))
    fig = plt.figure()
    fig.set_size_inches(16, 6 * rows)
    gs = gridspec.GridSpec(rows, columns)
    row = 0
    col = 0
    for i in range(count):
        plt.subplot(gs[i])
        plt.axis("off")
        if captions is not None:
            plt.title(captions[i])
        plt.imshow(outputs.at(i) if cpu else outputs.as_cpu().at(i))

现在我们可以构建并运行 Pipeline。

[6]:
iter_pipe = iterable_pipeline()
iter_pipe.build()
iter_out = iter_pipe.run()

call_pipe = callable_pipeline()
call_pipe.build()
call_out = call_pipe.run()
[7]:
display(iter_out[0], batch_size)
../../../_images/examples_general_data_loading_parallel_external_source_17_0.png
[8]:
display(call_out[0], batch_size)
../../../_images/examples_general_data_loading_parallel_external_source_18_0.png

开始并行#

现在我们有了一个带有可调用 source 的 Pipeline,它与并行 external_source 要求兼容,我们可以调整我们的 Pipeline。我们需要修改我们的 callable_pipeline,通过为 external_source 操作符设置 parallel=True

我们可以为 Pipeline 设置两个额外的参数来控制 Python 工作进程的行为

  1. py_num_workers - 设置工作进程的数量 - 默认值为 1,但我们将使用更多,以便在计算批次时实现更好的并行性。

  2. py_start_method - 控制工作进程的启动方式。

启动 Python 工作进程#

并行外部数据源使用 Python multiprocessing 库来控制工作进程。DALI 支持两种启动工作进程的方法:'fork''spawn'。有关启动方法的详细信息,请参阅 此处。DALI 不支持 'forkserver' 方法,因为工作进程的启动在处理开始时完成一次,并且使用此方法不会比其他两种方法产生任何额外的好处。

一般来说,source 回调应遵守 multiprocessing 编程指南,以避免不必要的行为或错误。

Fork#

第一个方法 'fork' 创建当前 Python 解释器的分支,这意味着新进程是相同的,并且原始进程中存在的所有对象在新进程中也可用。但是,有一个限制,即获取 CUDA 上下文的进程(通常通过与 CUDA 代码的任何交互来完成,例如创建框架 CUDA 张量,在 DALI Pipeline 上调用 build() 等)无法被 fork。

为了缓解这个问题,Pipeline 方法提供了一个名为 start_py_workers() 的方法,可以在我们需要与 CUDA 交互之前使用它来启动工作进程。

典型的做法是在与 CUDA 交互之前执行以下操作

  1. 定义所有 DALI Pipeline(不要使用 build())。

  2. 收集所有 DALI Pipeline 对象。

  3. 在每个 Pipeline 对象上运行 start_py_workers()

完成这些步骤后,可以构建 Pipeline 并访问 CUDA。

在本教程中,我们已经使用 DALI 在 GPU 上运行操作,因此当前的 Jupyter Notebook Kernel 实例已经获取了 CUDA 上下文。

在单独的教程 并行外部数据源 - Fork 中可以看到使用 start_py_workers() 的示例,我们在该教程中等待获取 CUDA 上下文。

Spawn#

'spawn' 方法启动一个新的 Python 解释器进程,并通过序列化和反序列化来共享必要的状态。因此,source 可调用对象需要是 可 pickle 的 才能使用此方法。

当使用 'spawn' 时,请确保主模块可以安全导入,方法是使用 if __name__ == '__main__' 保护入口点,如 multiprocessing 编程指南中所述。

'spawn' 不会干扰 CUDA 上下文,并且无需单独调用 start_py_workers() - 它在 build() 步骤内部完成。由于本教程笔记本已经获取了 CUDA 上下文,我们将使用此方法来运行我们的 Pipeline。

不幸的是,Jupyter Notebook 再次存在限制 - 可调用对象必须来自新进程可见的 Python 模块。作为一种变通方法,我们将 ExternalInputCallable 的定义写入辅助文件并再次导入它,以便它来自命名的模块并且易于 pickle。

[9]:
external_input_callable_def = """
import numpy as np

class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", 'r') as f:
            file_label = [line.rstrip().split(' ') for line in f if line != '']
            self.files, self.labels = zip(*file_label)
        self.full_iterations = len(self.files) // batch_size

    def __call__(self, sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration()
        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, 'rb') as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label
"""


with open("external_input_tmp_module.py", "w") as f:
    f.write(external_input_callable_def)

import external_input_tmp_module

在常规 Python 代码中不需要这种变通方法。

您也可以将回调定义为函数,而无需将其导出到另一个文件。您可以在函数序列化部分阅读有关该方法的更多信息。

使用 Spawned Python Workers 运行 Pipeline#

现在,我们可以通过将 parallel=True 添加到 external_source 并将参数 py_num_workerspy_start_method 传递给 Pipeline 来调整 Pipeline 定义以并行运行。请记住也使用新导入的 source 定义。

[10]:
@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline():
    jpegs, labels = fn.external_source(
        source=external_input_tmp_module.ExternalInputCallable(batch_size),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[11]:
if __name__ == "__main__":
    parallel_pipe = parallel_pipeline()
    parallel_pipe.build()
[12]:
if __name__ == "__main__":
    parallel_out = parallel_pipe.run()
    display(parallel_out[0], batch_size)

    # let Python know that the pipeline will no longer be needed, so that
    # the worker processes and resources can be closed and freed.
    del parallel_pipe
../../../_images/examples_general_data_loading_parallel_external_source_29_0.png

函数序列化#

默认情况下,局部函数和 lambda 不可 pickle。DALI 将使用自定义 pickler 对此类回调进行 pickle,该 pickler 扩展了默认的 pickle 行为。

此外,为了解决 Jupyter Notebook 中的导入问题,您可以将全局定义的函数标记为按值而不是按引用(Python 中的默认行为)序列化。

有关如何序列化 lambda 和局部函数的不同想法,请参阅 自定义序列化 部分。

首先,让我们使用嵌套函数/闭包来模拟 ExternalInputCallable 的行为。

[13]:
def create_callback(batch_size):
    images_dir = "../../data/images/"
    with open(images_dir + "file_list.txt", "r") as f:
        file_label = [line.rstrip().split(" ") for line in f if line != ""]
        files, labels = zip(*file_label)
    full_iterations = len(files) // batch_size

    def callback(sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        jpeg_filename = files[sample_idx]
        label = np.int32([labels[sample_idx]])
        with open(images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

    return callback

请注意,外部函数如何准备通用配置,类似于 ExternalInputCallable 中的 __init__ 方法,以及内部回调如何对应于 __call__ 方法。

[14]:
@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline_with_closure():
    jpegs, labels = fn.external_source(
        source=create_callback(batch_size),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[15]:
import sys

ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        parallel_pipe_with_closure = parallel_pipeline_with_closure()
        parallel_pipe_with_closure.build()

        parallel_out_with_closure = parallel_pipe_with_closure.run()
        display(parallel_out_with_closure[0], batch_size)

        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del parallel_pipe_with_closure
../../../_images/examples_general_data_loading_parallel_external_source_35_0.png

pickle_by_value#

当使用 Jupyter Notebook 时,您可能希望简单地全局创建通用配置,并创建一个使用它的回调,而不是编写嵌套函数。为了使这种方法有效,您需要使用一种 pickle 方法,该方法序列化整个函数,而不是默认方法,后者仅存储函数的名称和定义模块。

[16]:
import nvidia.dali.pickling as dali_pickle

images_dir = "../../data/images/"
with open(images_dir + "file_list.txt", "r") as f:
    file_label = [line.rstrip().split(" ") for line in f if line != ""]
    files, labels = zip(*file_label)
full_iterations = len(files) // batch_size


@dali_pickle.pickle_by_value
def global_callback(sample_info):
    sample_idx = sample_info.idx_in_epoch
    if sample_info.iteration >= full_iterations:
        # Indicate end of the epoch
        raise StopIteration
    jpeg_filename = files[sample_idx]
    label = np.int32([labels[sample_idx]])
    with open(images_dir + jpeg_filename, "rb") as f:
        encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
    return encoded_img, label

请注意,使上述示例工作所需的唯一操作是 @dali_pickle.pickle_by_value 装饰器。

[17]:
@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline_global_cb():
    jpegs, labels = fn.external_source(
        source=global_callback,
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[18]:
import sys

ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        parallel_pipe_global_cb = parallel_pipeline_global_cb()
        parallel_pipe_global_cb.build()

        parallel_out_global_cb = parallel_pipe_global_cb.run()
        display(parallel_out_global_cb[0], batch_size)

        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del parallel_pipe_global_cb
../../../_images/examples_general_data_loading_parallel_external_source_40_0.png

自定义序列化#

在底层,DALI 对回调使用标准的 pickle 机制。如果您需要自定义 pickle 回调的方式,请考虑将它们实现为可调用对象(ExternalInputCallable 是一个类,其实例是可调用的示例)。您可以通过手动制作 __getstate____setstate____reduce__ 方法来控制自定义类的序列化(您可以在 pickle 文档中找到有关此方法的更多信息)。或者,您可以通过 dispatch_tables 机制 为您的类注册自定义 reducer

如果需要,您可以使 DALI 使用外部包(例如 dillcloudpickle)序列化任何类型的回调。要实现这一点,只需将所需的模块作为 Pipeline 的 py_callback_pickler 参数传递即可。

py_callback_pickler 的有效值是实现 dumpsloads 方法的模块/对象,或者是一个元组,其中第一项是模块/对象,接下来的两个可选参数是在调用 dumps 和 loads 时分别传递的额外 kwargs。提供的方法和 kwargs 本身必须可以使用标准 pickle.dumps 进行 pickle。

让我们再次尝试使用闭包的示例

[19]:
def create_callback(batch_size):
    images_dir = "../../data/images/"
    with open(images_dir + "file_list.txt", "r") as f:
        file_label = [line.rstrip().split(" ") for line in f if line != ""]
        files, labels = zip(*file_label)
    full_iterations = len(files) // batch_size

    def callback(sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        jpeg_filename = files[sample_idx]
        label = np.int32([labels[sample_idx]])
        with open(images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

    return callback
[20]:
try:
    import dill
except:
    print("This example requires dill package")
else:

    @pipeline_def(
        batch_size=batch_size,
        num_threads=2,
        device_id=0,
        py_num_workers=4,
        py_start_method="spawn",
        py_callback_pickler=(dill, {"recurse": True}),
    )
    def parallel_pipeline_with_dill():
        jpegs, labels = fn.external_source(
            source=create_callback(batch_size),
            num_outputs=2,
            batch=False,
            parallel=True,
            dtype=[types.UINT8, types.INT32],
        )
        decode = fn.decoders.image(jpegs, device="mixed")
        return decode, labels

这一次,我们将使用 dill 来序列化回调。我们将额外的 recurse=True 参数传递给 dill.dumps 调用,以便 dill 尝试跟踪序列化回调内部引用的全局对象,并且需要一起序列化。

[21]:
try:
    import dill
except:
    print("This example requires dill package")
else:
    if __name__ == "__main__":
        parallel_pipe_with_dill = parallel_pipeline_with_dill()
        parallel_pipe_with_dill.build()

        parallel_out_with_dill = parallel_pipe_with_dill.run()
        display(parallel_out_with_dill[0], batch_size)
../../../_images/examples_general_data_loading_parallel_external_source_46_0.png

序列化和繁重的设置#

通常,用作 source 的可调用对象在构造时执行一些设置(例如,组成工作进程将并行读取的文件列表)。如果 py_start_method 设置为 spawn,则构造的对象将被序列化(默认情况下使用 Python 的 pickle 模块)。但是,对于特别大的设置(当可调用对象的成员达到数百 MB 大小时),由于序列化的内存占用,序列化可能会变得缓慢或不可能。在这种情况下,某些设置部分应推迟到可调用对象在工作进程中反序列化之后。

让我们在一个(稍微人为的)示例中看到这一点,该示例的可调用对象包含一个大的文件名列表,并分配一个大的 numpy 数组用作缓冲区。该示例假设 pickle 用作序列化方法,并依赖于标准的 pickle 机制来自定义序列化。

[22]:
class ExternalInputCallable:
    def __init__(self, start=0, end=128 * 1024 * 1024):
        self.start = start
        self.end = end
        # Normally, the setup would take place here, but we defer it
        self.buffer = None
        self.files = None

    def __getstate__(self):
        # This will be run in the main process to serialize the callable
        return self.__dict__.copy()

    def __setstate__(self, state):
        self.__dict__.update(state)
        # Deferred setup is done in each of the worker processes
        self.buffer = np.arange(self.start, self.end, dtype=np.int32).reshape(
            (-1, 1024, 1024)
        )
        self.files = [f"filename_{i}" for i in range(self.start, self.end)]

    def __call__(self, sample_info):
        if sample_info.idx_in_epoch >= self.end - self.start:
            raise StopIteration
        file_idx = int(self.files[sample_info.idx_in_epoch].split("_")[1])
        sample_from_buffer = self.buffer[sample_info.idx_in_epoch]
        return np.array(file_idx, dtype=np.int32), sample_from_buffer


@pipeline_def(
    batch_size=8,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def parallel_pipeline():
    idx, sample = fn.external_source(
        source=ExternalInputCallable(),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.INT32, types.INT32],
    )
    return idx, sample

洗牌和分片#

到目前为止,本教程中介绍的 Pipeline 在每个 epoch 中都以完全相同的顺序返回数据集。在以下部分中,我们将修改 source 回调,以便在 epoch 之间对数据集进行洗牌。此外,我们将使回调知道分片。

在 epoch 之间洗牌数据集#

要洗牌您的数据集,您可以置换索引。当使用 SampleInfo 实例调用 source 时,source 回调可以使用某些 permutation 函数来返回 permutation(sample_info.idx_in_epoch)-th 样本,而不是 sample_info.idx_in_epoch-th 样本。

请记住,您正在处理 source 回调的多个副本。每个副本都应以相同的方式洗牌数据,否则您可能会混合数据集并在同一 epoch 中多次返回相同的样本。

要在 epoch 之间洗牌数据集,您可以使用 sample_info.epoch_idx 并使用 epoch_idx 参数化 permutation

以下是如何调整 ExternalInputCallable 示例以处理 epoch 之间的数据洗牌。

[23]:
class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)
        # If the dataset size is not divisible by the batch size, the last
        # incomplete batch will be omitted. Alternatively, you may pad the dataset
        # by repeating the last sample in `self.files` and `self.labels` to
        # make the dataset size divisible by the `batch_size`.
        self.full_iterations = len(self.files) // batch_size

        self.perm = None  # permutation of indices
        self.last_seen_epoch = (
            # so that we don't have to recompute the `self.perm` for every sample
            None
        )

    def __call__(self, sample_info):
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        if self.last_seen_epoch != sample_info.epoch_idx:
            self.last_seen_epoch = sample_info.epoch_idx
            self.perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
            self.perm = self.perm.permutation(len(self.files))
        sample_idx = self.perm[sample_info.idx_in_epoch]

        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

分片#

现在,让我们将 shard_idnum_shards 参数添加到可调用对象,以便它可以在任何给定 epoch 中处理数据集的非重叠子集。请注意,所有分片在同一 epoch 中使用相同的置换。

[24]:
class ExternalInputCallable:
    def __init__(self, batch_size, shard_id=0, num_shards=1):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)

        self.shard_id = shard_id
        self.num_shards = num_shards
        # If the dataset size is not divisibvle by number of shards,
        # the trailing samples will be omitted.
        self.shard_size = len(self.files) // num_shards
        self.shard_offset = self.shard_size * shard_id
        # If the shard size is not divisible by the batch size, the last
        # incomplete batch will be omitted.
        self.full_iterations = self.shard_size // batch_size
        self.perm = None  # permutation of indices
        self.last_seen_epoch = (
            # so that we don't have to recompute the `self.perm` for every sample
            None
        )

    def __call__(self, sample_info):
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        if self.last_seen_epoch != sample_info.epoch_idx:
            self.last_seen_epoch = sample_info.epoch_idx
            self.perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
            self.perm = self.perm.permutation(len(self.files))
        sample_idx = self.perm[sample_info.idx_in_epoch + self.shard_offset]

        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

我们还可以以嵌套函数的形式表达可调用对象。

[25]:
@dali_pickle.pickle_by_value
def create_callback(batch_size, shard_id=0, num_shards=1):
    images_dir = "../../data/images/"
    with open(images_dir + "file_list.txt", "r") as f:
        file_label = [line.rstrip().split(" ") for line in f if line != ""]
        files, labels = zip(*file_label)

    dataset_size = len(files)
    shard_size = dataset_size // num_shards
    shard_offset = shard_size * shard_id
    full_iterations = shard_size // batch_size
    perm = None  # permutation of indices
    # so that we don't have to recompute the `perm` for every sample
    last_seen_epoch = None

    def callback(sample_info):
        nonlocal perm, last_seen_epoch
        if sample_info.iteration >= full_iterations:
            # Indicate end of the epoch
            raise StopIteration
        if last_seen_epoch != sample_info.epoch_idx:
            last_seen_epoch = sample_info.epoch_idx
            perm = np.random.default_rng(seed=42 + sample_info.epoch_idx)
            perm = perm.permutation(dataset_size)
        sample_idx = perm[sample_info.idx_in_epoch + shard_offset]

        jpeg_filename = files[sample_idx]
        label = np.int32([labels[sample_idx]])
        with open(images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

    return callback

让我们使用上述回调在两个 Pipeline 之间拆分 21 个图像的初始数据集,每个 Pipeline 在任何给定 epoch 中处理 10 个图像的非重叠子集。

在这里,我们决定如果数据集大小不能被分片数或批次大小整除,则丢弃尾随样本。请注意,由于 epoch 之间的数据洗牌,不同的样本将在不同的 epoch 中被省略。尽管如此,您可能会决定通过重复某些样本来填充数据集和分片,以便每个样本都在每个 epoch 中使用。

[26]:
batch_size = 5


@pipeline_def(
    batch_size=batch_size,
    num_threads=2,
    device_id=0,
    py_num_workers=4,
    py_start_method="spawn",
)
def shuffling_sharding_pipepline(shard_id, num_shards):
    jpegs, labels = fn.external_source(
        source=create_callback(batch_size, shard_id, num_shards),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    return decode, labels
[27]:
import sys

ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        pipe_1 = shuffling_sharding_pipepline(shard_id=0, num_shards=2)
        pipe_1.build()

        for _ in range(2):
            images, labels = pipe_1.run()
            display(images, batch_size, columns=batch_size)

        try:
            pipe_1.run()
        except StopIteration:
            pass
        else:
            raise Exception("Expected only 10 images in the epoch")

        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del pipe_1
../../../_images/examples_general_data_loading_parallel_external_source_58_0.png
../../../_images/examples_general_data_loading_parallel_external_source_58_1.png
[28]:
import sys

ver = sys.version_info
if ver.major < 3 or (ver.major == 3 and ver.minor < 8):
    print("Python < 3.8 is not supported")
else:
    if __name__ == "__main__":
        pipe_2 = shuffling_sharding_pipepline(shard_id=1, num_shards=2)
        pipe_2.build()

        for _ in range(2):
            images, labels = pipe_2.run()
            display(images, batch_size, columns=batch_size)

        try:
            pipe_2.run()
        except StopIteration:
            pass
        else:
            raise Exception("Expected only 10 images in the epoch")

        # let Python know that the pipeline will no longer be needed, so that
        # the worker processes and resources can be closed and freed.
        del pipe_2
../../../_images/examples_general_data_loading_parallel_external_source_59_0.png
../../../_images/examples_general_data_loading_parallel_external_source_59_1.png

容器中的并行外部数据源#

并行外部数据源使用 OS 共享内存将样本从工作进程传递到主进程。由于 Pipeline 内的缓冲,DALI 可能需要分配足够的内存来一次存储几次迭代。因此,并行外部数据源可能需要大量的共享内存。但是,某些环境限制了 /dev/shm 的大小。

例如,当使用 Docker 运行 DALI 时,您可能需要在调用 docker run 时传递 --shm-size 参数,或者,如果您无法传递额外的参数,则在运行的容器中手动修改 /dev/shm 的大小。