并行外部数据源 - Fork#

在本教程中,我们将向您展示如何使用 'fork' 方法启动 Python 工作进程,以并行模式运行外部数据源操作符。本教程假设您已经熟悉之前的 并行外部数据源 教程,该教程描述了 source 参数的要求,以及如何配置 pipeline 和操作符。它还解释了并行模式下外部数据源的工作原理。

使用 Fork 启动的步骤#

请记住,在我们计划使用的所有 DALI pipeline 中,我们都不希望在 worker 线程之前获取 CUDA 上下文。

如之前的教程所述,我们需要遵循以下步骤

  1. 定义所有 DALI pipeline(不要使用 build()

  2. 收集所有 DALI pipeline 对象。

  3. 在每个 pipeline 对象上运行 start_py_workers()

在我们与 CUDA 交互之前(例如,通过使用在 GPU 上运行的深度学习框架)。

示例 Pipeline 和 source#

让我们采用之前教程中定义的 source 可调用对象。与 'spawn' 方法相比,此进程中的所有代码最终都会被复制到 fork 进程中,因此我们不会遇到序列化问题。

警告

当 pipeline 启用条件执行时,必须采取额外步骤以防止 source 被 AutoGraph 重写。 有两种方法可以实现这一点

  1. 在全局作用域中定义函数(即在 pipeline_def 作用域之外)。

  2. 如果函数是另一个“工厂”函数的结果,则工厂函数必须在 pipeline 定义函数之外定义,并使用 <nvidia.dali.pipeline.do_not_convert> 进行装饰。

更多详细信息可以在 nvidia.dali.pipeline.do_not_convert 文档中找到。

可调用对象#

[1]:
import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types

batch_size = 4


class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            file_label = [line.rstrip().split(" ") for line in f if line != ""]
            self.files, self.labels = zip(*file_label)
        self.full_iterations = len(self.files) // batch_size

    def __call__(self, sample_info):
        sample_idx = sample_info.idx_in_epoch
        if sample_info.iteration >= self.full_iterations:
            # Indicate end of the epoch
            raise StopIteration()
        jpeg_filename = self.files[sample_idx]
        label = np.int32([self.labels[sample_idx]])
        with open(self.images_dir + jpeg_filename, "rb") as f:
            encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
        return encoded_img, label

Pipeline 定义#

我们想展示如何使用 'fork' 方法启动多个 pipeline,因此我们将定义两个 pipeline - 这通常发生在训练和验证 pipeline 中。第一个将使用随机增强,第二个则不会。

我们定义了两个 pipeline,对外部数据源使用 parallel=True,每个 Pipeline 使用 4 个 worker。'fork' 是默认的启动方法。

[2]:
@pipeline_def(
    batch_size=batch_size, num_threads=2, device_id=0, py_num_workers=4
)
def training_pipeline():
    jpegs, labels = fn.external_source(
        source=ExternalInputCallable(batch_size),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    contrast = fn.random.uniform(range=(1, 3.0))
    enhance = fn.brightness_contrast(decode, contrast=contrast)
    return enhance, labels


@pipeline_def(
    batch_size=batch_size, num_threads=2, device_id=0, py_num_workers=4
)
def validation_pipeline():
    jpegs, labels = fn.external_source(
        source=ExternalInputCallable(batch_size),
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decode = fn.decoders.image(jpegs, device="mixed")
    enhance = fn.brightness_contrast(decode, contrast=1.5)
    return enhance, labels

显示结果#

让我们引入一些辅助代码来显示结果。

[3]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import math


def display(outputs, count, columns=2, captions=None, cpu=False):
    rows = int(math.ceil(len(outputs) / columns))
    fig = plt.figure()
    fig.set_size_inches(16, 6 * rows)
    gs = gridspec.GridSpec(rows, columns)
    row = 0
    col = 0
    for i in range(count):
        plt.subplot(gs[i])
        plt.axis("off")
        if captions is not None:
            plt.title(captions[i])
        plt.imshow(outputs.at(i) if cpu else outputs.as_cpu().at(i))

启动 Python 工作进程#

在定义了 pipeline 之后,我们可以继续执行步骤 2 和 3,即收集所有 pipeline 对象并启动其 worker。

[4]:
train_pipe = training_pipeline()
val_pipe = validation_pipeline()

pipelines = [train_pipe, val_pipe]

for pipe in pipelines:
    pipe.start_py_workers()

运行 Pipeline#

现在我们已经为所有 pipeline 启动了 Python 工作进程,我们可以继续构建和运行它们,这将在本 Jupyter Notebook 中获取 CUDA 上下文。如果不重启 Jupyter Notebook Kernel,您将无法使用 'fork' 方法启动更多 worker 进程。

[5]:
for pipe in pipelines:
    pipe.build()
[6]:
train_out = train_pipe.run()
val_out = val_pipe.run()
[7]:
display(train_out[0], batch_size)
../../../_images/examples_general_data_loading_parallel_external_source_fork_16_0.png
[8]:
display(val_out[0], batch_size)
../../../_images/examples_general_data_loading_parallel_external_source_fork_17_0.png