并行外部数据源 - Fork#
在本教程中,我们将向您展示如何使用 'fork'
方法启动 Python 工作进程,以并行模式运行外部数据源操作符。本教程假设您已经熟悉之前的 并行外部数据源 教程,该教程描述了 source
参数的要求,以及如何配置 pipeline 和操作符。它还解释了并行模式下外部数据源的工作原理。
使用 Fork 启动的步骤#
请记住,在我们计划使用的所有 DALI pipeline 中,我们都不希望在 worker 线程之前获取 CUDA 上下文。
如之前的教程所述,我们需要遵循以下步骤
定义所有 DALI pipeline(不要使用
build()
)收集所有 DALI pipeline 对象。
在每个 pipeline 对象上运行
start_py_workers()
。
在我们与 CUDA 交互之前(例如,通过使用在 GPU 上运行的深度学习框架)。
示例 Pipeline 和 source
#
让我们采用之前教程中定义的 source
可调用对象。与 'spawn'
方法相比,此进程中的所有代码最终都会被复制到 fork 进程中,因此我们不会遇到序列化问题。
警告
当 pipeline 启用条件执行时,必须采取额外步骤以防止 source
被 AutoGraph 重写。 有两种方法可以实现这一点
在全局作用域中定义函数(即在
pipeline_def
作用域之外)。如果函数是另一个“工厂”函数的结果,则工厂函数必须在 pipeline 定义函数之外定义,并使用
<nvidia.dali.pipeline.do_not_convert>
进行装饰。
更多详细信息可以在 nvidia.dali.pipeline.do_not_convert
文档中找到。
可调用对象#
[1]:
import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
batch_size = 4
class ExternalInputCallable:
def __init__(self, batch_size):
self.images_dir = "../../data/images/"
self.batch_size = batch_size
with open(self.images_dir + "file_list.txt", "r") as f:
file_label = [line.rstrip().split(" ") for line in f if line != ""]
self.files, self.labels = zip(*file_label)
self.full_iterations = len(self.files) // batch_size
def __call__(self, sample_info):
sample_idx = sample_info.idx_in_epoch
if sample_info.iteration >= self.full_iterations:
# Indicate end of the epoch
raise StopIteration()
jpeg_filename = self.files[sample_idx]
label = np.int32([self.labels[sample_idx]])
with open(self.images_dir + jpeg_filename, "rb") as f:
encoded_img = np.frombuffer(f.read(), dtype=np.uint8)
return encoded_img, label
Pipeline 定义#
我们想展示如何使用 'fork'
方法启动多个 pipeline,因此我们将定义两个 pipeline - 这通常发生在训练和验证 pipeline 中。第一个将使用随机增强,第二个则不会。
我们定义了两个 pipeline,对外部数据源使用 parallel=True
,每个 Pipeline 使用 4 个 worker。'fork'
是默认的启动方法。
[2]:
@pipeline_def(
batch_size=batch_size, num_threads=2, device_id=0, py_num_workers=4
)
def training_pipeline():
jpegs, labels = fn.external_source(
source=ExternalInputCallable(batch_size),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
contrast = fn.random.uniform(range=(1, 3.0))
enhance = fn.brightness_contrast(decode, contrast=contrast)
return enhance, labels
@pipeline_def(
batch_size=batch_size, num_threads=2, device_id=0, py_num_workers=4
)
def validation_pipeline():
jpegs, labels = fn.external_source(
source=ExternalInputCallable(batch_size),
num_outputs=2,
batch=False,
parallel=True,
dtype=[types.UINT8, types.INT32],
)
decode = fn.decoders.image(jpegs, device="mixed")
enhance = fn.brightness_contrast(decode, contrast=1.5)
return enhance, labels
显示结果#
让我们引入一些辅助代码来显示结果。
[3]:
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import math
def display(outputs, count, columns=2, captions=None, cpu=False):
rows = int(math.ceil(len(outputs) / columns))
fig = plt.figure()
fig.set_size_inches(16, 6 * rows)
gs = gridspec.GridSpec(rows, columns)
row = 0
col = 0
for i in range(count):
plt.subplot(gs[i])
plt.axis("off")
if captions is not None:
plt.title(captions[i])
plt.imshow(outputs.at(i) if cpu else outputs.as_cpu().at(i))
启动 Python 工作进程#
在定义了 pipeline 之后,我们可以继续执行步骤 2 和 3,即收集所有 pipeline 对象并启动其 worker。
[4]:
train_pipe = training_pipeline()
val_pipe = validation_pipeline()
pipelines = [train_pipe, val_pipe]
for pipe in pipelines:
pipe.start_py_workers()
运行 Pipeline#
现在我们已经为所有 pipeline 启动了 Python 工作进程,我们可以继续构建和运行它们,这将在本 Jupyter Notebook 中获取 CUDA 上下文。如果不重启 Jupyter Notebook Kernel,您将无法使用 'fork'
方法启动更多 worker 进程。
[5]:
for pipe in pipelines:
pipe.build()
[6]:
train_out = train_pipe.run()
val_out = val_pipe.run()
[7]:
display(train_out[0], batch_size)

[8]:
display(val_out[0], batch_size)
