Pipeline#
在 DALI 中,任何数据处理任务都有一个中心对象,称为 Pipeline。Pipeline 对象是 nvidia.dali.Pipeline
或派生类的实例。Pipeline 封装了数据处理图和执行引擎。
您可以通过以下方式定义 DALI Pipeline
通过实现一个在内部使用 DALI 算子的函数,并使用
pipeline_def()
装饰器对其进行装饰。通过直接实例化
Pipeline
对象,构建图,并使用Pipeline.set_outputs()
设置 pipeline 输出。通过继承
Pipeline
类并重写Pipeline.define_graph()
(这是定义 DALI Pipeline 的传统方法)。
数据处理图#
DALI pipeline 表示为操作图。图中有两种节点:
算子 - 每次调用算子时创建
数据节点(参见
DataNode
)- 表示算子的输出和输入;它们从算子的调用中返回,并将它们作为输入传递给其他算子,从而在图中建立连接。
数据节点可以通过调用 算子函数 进行转换。它们还支持 Python 样式的 索引,并且可以合并到 数学表达式 中。
示例
@pipeline_def # create a pipeline with processing graph defined by the function below
def my_pipeline():
""" Create a pipeline which reads images and masks, decodes the images and
returns them. """
img_files, labels = fn.readers.file(file_root="image_dir", seed=1)
mask_files, _ = fn.readers.file(file_root="mask_dir", seed=1)
images = fn.decoders.image(img_files, device="mixed")
masks = fn.decoders.image(mask_files, device="mixed")
return images, masks, labels
pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)
结果图为
重要提示
pipeline 定义函数仅在构建 pipeline 时执行一次,并且通常返回 dali.DataNode
对象或其元组。为了方便起见,可以返回其他类型,例如 NumPy 数组,但这些类型被视为常量,并且仅评估一次。
处理图结构#
DALI pipeline 分阶段执行。这些阶段对应于可以为算子指定的 device
参数,并按以下顺序执行:
'cpu'
- 接受 CPU 输入并生成 CPU 输出的算子。'mixed'
- 接受 CPU 输入并生成 GPU 输出的算子,例如nvidia.dali.fn.decoders.image()
。'gpu'
- 接受 GPU 输入并生成 GPU 输出的算子。
CPU 算子生成的数据可以通过在 DataNode
(DALI 算子的输出)上调用 .gpu()
显式复制到 GPU。
后期阶段生成的数据不能被早期阶段执行的算子使用。
大多数 DALI 算子接受用于参数化其行为的其他关键字参数。这些命名的关键字参数(与位置输入不同)可以是:
Python 常量
参数输入 - CPU 算子的输出 - 在算子的文档字符串中表示为 TensorList。
在参数输入的情况下,将一个算子的输出作为另一个算子的命名关键字参数传递,将在处理图中建立连接。
这些参数将作为 DALI pipeline 图的一部分,在每次迭代和每个样本中计算。请记住,只有 CPU 算子可以用作参数输入。
示例
@pipeline_def
def my_pipeline():
img_files, labels = fn.readers.file(file_root='image_dir', device='cpu')
# `images` is GPU data (result of Mixed operator)
images = fn.decoders.image(img_files, device='mixed')
# `coin_flip` must be on CPU so the `flip_params` can be used as argument input
flip_param = fn.random.coin_flip(device='cpu')
# `images` is input (GPU) and `flip_param` is argument input (CPU)
flipped = fn.flip(images, horizontal=flip_param, device='gpu')
# `labels` is explicitly marked for transfer to GPU, `flipped` is already GPU
return flipped, labels.gpu()
pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)
注意
如果未指定 device
参数,则会根据输入的位置自动选择。如果至少有一个 GPU 输入,则假定为 device='gpu'
,否则使用 'cpu'
。
上面的示例为了清晰起见显式添加了 device
参数,但如果仅为 fn.decoders.image
指定 device='mixed'
,则效果相同。
当前 Pipeline#
不影响 pipeline 输出的子图会自动修剪。如果算子具有副作用(例如 PythonFunction
算子系列),则在未设置当前 pipeline 的情况下无法调用它。当在派生 pipeline 的 Pipeline.define_graph()
方法内部定义图时,将隐式设置当前 pipeline。否则,可以使用上下文管理器(with
语句)进行设置
pipe = dali.Pipeline(batch_size=N, num_threads=3, device_id=0)
with pipe:
src = dali.ops.ExternalSource(my_source, num_outputs=2)
a, b = src()
pipe.set_outputs(a, b)
当使用 pipeline_def()
创建 pipeline 时,定义 pipeline 的函数在新创建的 pipeline 的作用域内执行。以下示例与上一个示例等效:
@dali.pipeline_def(batch_size=N, num_threads=3, device_id=0)
def my_pipe(my_source):
return dali.fn.external_source(my_source, num_outputs=2)
pipe = my_pipe(my_source)
Pipeline 装饰器#
- @nvidia.dali.pipeline_def(fn=None, *, enable_conditionals=False, **pipeline_kwargs)#
将图定义函数转换为 DALI pipeline 工厂的装饰器。
图定义函数是返回预期 pipeline 输出的函数。您可以使用
@pipeline_def
装饰此函数@pipeline_def def my_pipe(flip_vertical, flip_horizontal): ''' Creates a DALI pipeline, which returns flipped and original images ''' data, _ = fn.readers.file(file_root=images_dir) img = fn.decoders.image(data, device="mixed") flipped = fn.flip(img, horizontal=flip_horizontal, vertical=flip_vertical) return flipped, img
装饰后的函数返回 DALI Pipeline 对象
pipe = my_pipe(True, False) # pipe.run() # the pipeline is not configured properly yet
pipeline 需要其他参数,例如批大小、worker 线程数、GPU 设备 ID 等(有关 pipeline 参数的完整列表,请参见
nvidia.dali.Pipeline()
)。这些参数可以作为附加的关键字参数提供,并传递给装饰后的函数pipe = my_pipe(True, False, batch_size=32, num_threads=1, device_id=0)
pipeline 已正确配置,我们现在可以运行它了。原始函数的输出变成了 Pipeline 的输出
flipped, img = pipe.run()
当某些 pipeline 参数是固定的时,可以在装饰器中按名称指定它们
@pipeline_def(batch_size=42, num_threads=3) def my_pipe(flip_vertical, flip_horizontal): ...
稍后在调用装饰后的函数时传递的任何 Pipeline 构造函数参数都将覆盖装饰器定义的参数
@pipeline_def(batch_size=32, num_threads=3) def my_pipe(): data = fn.external_source(source=my_generator) return data pipe = my_pipe(batch_size=128) # batch_size=128 overrides batch_size=32
警告
被装饰函数的参数可能会遮蔽 pipeline 构造函数参数 - 在这种情况下,无法更改它们的值。
注意
在图定义函数中使用
**kwargs
(可变关键字参数)是不允许的。它们可能会导致 Pipeline 构造函数意外地、静默地劫持一些同名参数。以这种方式编写的代码在未来版本的 DALI 中添加新参数到 Pipeline 构造函数时将停止工作。要访问
@pipeline_def
函数体内的任何 pipeline 参数,可以使用函数nvidia.dali.Pipeline.current()
@pipeline_def() def my_pipe(): pipe = Pipeline.current() batch_size = pipe.batch_size num_threads = pipe.num_threads ... pipe = my_pipe(batch_size=42, num_threads=3) ...
- 关键字参数:
enable_conditionals (bool, optional) – 启用对在 pipeline 定义中使用
if
语句有条件地执行 DALI 算子的支持,默认为 False。
条件执行#
DALI 允许使用 if
语句有条件地为批次中选定的样本执行算子。要启用此功能,请使用 @pipeline_def
装饰器定义 pipeline,并将 enable_conditionals
设置为 True
。
每个将 DataNode()
作为条件的 if
语句都将被识别为 DALI 条件语句。
例如,此 pipeline 以 25% 的概率将每张图像旋转 10 到 30 度之间的随机角度
@pipeline_def(enable_conditionals=True)
def random_rotate():
jpegs, _ = fn.readers.file(device="cpu", file_root=images_dir)
images = fn.decoders.image(jpegs, device="mixed")
do_rotate = fn.random.coin_flip(probability=0.25, dtype=DALIDataType.BOOL)
if do_rotate:
result = fn.rotate(images, angle=fn.random.uniform(range=(10, 30)), fill_value=0)
else:
result = images
return result
DALI 条件语句的语义可以理解为代码一次处理一个样本。
条件必须由标量样本表示 - 即具有 0-d 形状。它可以是布尔值或 DALI 支持的任何数值类型 - 在后一种情况下,非零值被视为 True,零值被视为 False,这符合典型的 Python 语义。
此外,逻辑表达式 and
、or
和 not
可以用于 DataNode()
。前两个仅限于布尔输入,not
允许与 if
语句条件相同的输入类型。逻辑表达式在评估时遵循短路规则。
您可以在条件教程中阅读更多内容。
防止 AutoGraph 转换#
- @nvidia.dali.pipeline.do_not_convert#
阻止 AutoGraph 转换函数的装饰器。
在条件模式下,DALI 使用 TensorFlow 的 AutoGraph 的一个分支来转换代码,使我们能够重写和检测
if
语句,以便它们可以在处理 DALI pipeline 中使用。AutoGraph 转换应用于 pipeline 定义中调用的任何顶级函数或方法(以及 pipeline 定义本身)。当一个函数被转换时,在其语法范围内定义的所有函数也会被转换。重写除其他效果外,还使这些函数不可序列化。
要阻止函数被转换,必须使用此装饰器标记其顶级包含函数。有时这可能需要将函数重构到外部作用域。
external source
的并行模式(parallel=True
)要求其source
参数是可序列化的。为了防止重写source
,用于创建source
的函数应使用@do_not_convert
进行装饰。注意
只有不处理
DataNode
(因此不使用 DALI 算子)的函数才应使用此装饰器标记。例如
from nvidia.dali import pipeline_def, fn @pipeline_def(enable_conditionals=True) def pipe(): def source_factory(size): def source_fun(sample_info): return np.full(size, sample_info.iter_idx) return source_fun source = source_factory(size=(2, 1)) return fn.external_source(source=source, parallel=True, batch=False)
应转换为
from nvidia.dali import pipeline_def, fn from nvidia.dali.pipeline import do_not_convert @do_not_convert def source_factory(size): def source_fun(sample_info): return np.full(size, sample_info.iter_idx) return source_fun @pipeline_def(enable_conditionals=True) def pipe(): source = source_factory(size=(2, 1)) return fn.external_source(source=source, parallel=True, batch=False)
source_factory
必须被分解出来,否则它将被作为 pipeline 定义的一部分进行转换。由于我们有兴趣防止source_fun
的 AutoGraph 转换,我们需要装饰其顶级包含函数。注意
如果在 pipeline 定义之外声明一个函数,并且作为参数传递,但未在 pipeline 定义中直接调用,则它不会被转换。在这种情况下,传递给
external source
算子、python function
算子系列或Numba function
算子的回调不被视为在 pipeline 定义中直接调用。此类回调在 pipeline 运行时执行,因此在 pipeline 定义和构建之后执行。例如
from nvidia.dali import pipeline_def, fn def source_fun(sample_info): return np.full((2, 2), sample_info.iter_idx) @pipeline_def(enable_conditionals=True) def pipe(): return fn.external_source(source=source_fun, batch=False)
source_fun
不会被转换,因为它是在 pipeline 定义之外定义的,并且仅通过名称传递给外部源。
Pipeline 类#
- class nvidia.dali.Pipeline(batch_size=-1, num_threads=-1, device_id=-1, seed=-1, exec_pipelined=True, prefetch_queue_depth=2, exec_async=True, bytes_per_sample=0, set_affinity=False, max_streams=None, default_cuda_stream_priority=None, *, enable_memory_stats=False, enable_checkpointing=False, checkpoint=None, py_num_workers=1, py_start_method='fork', py_callback_pickler=None, output_dtype=None, output_ndim=None, exec_dynamic=False, experimental_exec_dynamic=None)#
Pipeline 类是所有 DALI 数据 pipeline 的基础。pipeline 封装了数据处理图和执行引擎。
- 参数:
batch_size¶ (int, optional, default = -1) –
pipeline 的最大批大小。此参数的负值无效 - 默认值只能用于序列化的 pipeline(使用序列化的 pipeline 中存储的值代替)。在大多数情况下,pipeline 的实际批大小将等于最大批大小。也支持以较小的批大小运行 DALI Pipeline。批大小可能会在迭代之间更改。
请注意,DALI 可能会根据此参数执行内存预分配。将其设置得过高可能会导致内存不足错误。
num_threads¶ (int, optional, default = -1) – pipeline 使用的 CPU 线程数。此参数的负值无效 - 默认值只能用于序列化的 pipeline(使用序列化的 pipeline 中存储的值代替)。
device_id¶ (int, optional, default = -1) – pipeline 使用的 GPU 的 ID。此参数的 None 值表示 DALI 不应使用 GPU 或 CUDA 运行时。这会将 pipeline 限制为仅 CPU 算子,但允许它在任何支持 CPU 的机器上运行。
seed¶ (int, optional, default = -1) – 用于随机数生成的种子。为此参数保留默认值将导致随机种子。
exec_pipelined¶ (bool, optional, default = True) – 是否以允许 CPU 和 GPU 计算重叠的方式执行 pipeline,通常会导致更快的执行速度,但会消耗更多的内存。
prefetch_queue_depth¶ (int or {"cpu_size": int, "gpu_size": int}, optional, default = 2) –
执行器 pipeline 的深度。更深的 pipeline 使 DALI 更能抵抗每个批次执行时间的不均匀性,但它也会消耗更多内存用于内部缓冲区。指定字典
{ "cpu_size": x, "gpu_size": y }
而不是整数将导致 pipeline 使用分离队列执行器,其中 CPU 阶段的缓冲区队列大小为 x,混合和 GPU 阶段的缓冲区队列大小为 y。执行器将分别缓冲 CPU 和 GPU 阶段,并在发出第一个
run()
时填充缓冲区队列。分离执行要求exec_async=True
、exec_pipelined=True
和exec_dynamic=False
。exec_async¶ (bool, optional, default = True) – 是否异步执行 pipeline。 这使得
run()
方法相对于调用 Python 线程异步运行。 为了与 pipeline 同步,需要调用outputs()
方法。exec_dynamic¶ (bool, optional, default = False) – 是否使用动态执行器。 动态执行器允许交错 CPU 和 GPU 算子,并执行 GPU 到 CPU 的复制。 它还为 pipeline 输出和算子间缓冲区使用动态内存分配,从而减少复杂 pipeline 中的内存消耗。 当
exec_dynamic
为True
时,exec_async
和exec_pipelined
必须保留其默认值 (True
)。bytes_per_sample¶ (int, optional, default = 0) – 为 DALI 提供关于其 tensors 应该使用多少内存的提示。
set_affinity¶ (bool, optional, default = False) – 是否将 CPU 核心亲和性设置为最接近所用 GPU 的核心。
max_streams¶ (int, deprecated, default = None) – 已弃用,此参数无效。
default_cuda_stream_priority¶ (int, optional, default = None) – 已弃用,此参数无效。
enable_memory_stats¶ (bool, optional, default = False) – DALI 是否应打印算子输出缓冲区统计信息。 对 bytes_per_sample_hint 算子参数很有用。 当
exec_dynamic
为True
时,此标志无效。enable_checkpointing¶ (bool, optional, default = False) –
如果为 True,DALI 将跟踪算子的状态。 在这种情况下,调用
checkpoint
方法会返回 pipeline 的序列化状态。 稍后可以使用作为checkpoint
参数传递的序列化状态重建相同的 pipeline,以从保存的迭代恢复运行。更多详细信息可以在 此文档部分 中找到。
checkpoint¶ (str, optional, default = None) –
从
checkpoint
方法接收的序列化 checkpoint。 构建 pipeline 后,其状态将从checkpoint
恢复,并且 pipeline 从保存的迭代恢复执行。更多详细信息可以在 此文档部分 中找到。
py_num_workers¶ (int, optional, default = 1) – 将处理并行
external_source()
回调的 Python worker 的数量。 仅当至少有一个 ExternalSource 的parallel
设置为 True 时,池才会启动。 将其设置为 0 将禁用池,并且即使parallel
设置为 True,所有 ExternalSource 算子都将回退到非并行模式。py_start_method¶ (str, default = "fork") –
确定如何启动 Python worker。 支持的方法
"fork"
- 通过 fork 进程启动"spawn"
- 启动新的解释器进程
如果使用
spawn
方法,则 ExternalSource 的回调必须是可 pickle 的。 为了使用fork
,启动 worker 时不得获取 CUDA 上下文。 因此,如果您需要构建使用 Python worker 的多个 pipeline,您需要在构建或运行任何 pipeline 之前调用start_py_workers()
(有关详细信息,请参见build()
)。 您可以在 Python 的multiprocessing
模块文档中找到有关这两种方法的更多详细信息和注意事项。py_callback_pickler¶ (module or tuple, default = None) –
如果
py_start_method
设置为 spawn,则传递给并行 ExternalSource 的回调必须是可 pickle 的。 如果在 Python 3.8 或更高版本中运行,且py_callback_pickler
设置为 None,则 DALI 在序列化回调时使用自定义 pickle,以支持本地函数和 lambda 的序列化。但是,如果您需要序列化更复杂的对象(例如本地类),或者您正在运行旧版本的 Python,您可以提供外部序列化包(例如 dill 或 cloudpickle),这些包实现了两个方法:dumps 和 loads,以使 DALI 使用它们来序列化外部源回调。 您可以直接将模块作为
py_callback_pickler
传递import dill @pipeline_def(py_callback_pickler=dill, ...) def create_pipeline(): src = fn.external_source( lambda sample_info: np.int32([42]), batch=False, parallel=True, ) ...
py_callback_pickler
的有效值是实现dumps
和loads
方法的模块/对象,或者是一个元组,其中第一个项目是模块/对象,接下来的两个可选参数是在调用 dumps 和 loads 时要传递的额外 kwargs。 提供的方法和 kwargs 必须可以使用标准 pickle.dumps 进行 pickle。如果您运行的是 Python 3.8 或更高版本,并且使用默认的 DALI pickler (
py_callback_pickler
= None),您可以通过使用 @dali.pickling.pickle_by_value 修饰全局函数来提示 DALI 按值而不是按引用序列化全局函数。 当使用 Jupyter notebook 时,这可能特别有用,以解决 worker 进程无法导入在 notebook 内定义为全局函数的回调的问题。output_dtype¶ (
nvidia.dali.types.DALIDataType
或其列表,default = None) –使用此参数,您可以声明您期望在给定输出中使用的数据类型。 您应传递 mod:types.DALIDataType 的列表,列表中的每个元素对应于 pipeline 的一个输出。 此外,您可以传递
None
作为通配符。 每次迭代后,输出将根据您传递给此参数的类型进行验证。 如果任何输出与提供的类型不匹配,则会引发 RuntimeError。如果
output_dtype
值是单个值(而不是列表),它将被广播到 pipeline 的输出数量。output_ndim¶ (int or list of ints, default = None) –
使用此参数,您可以声明您期望在给定输出中使用的维度数。 您应传递整数列表,列表中的每个元素对应于 pipeline 的一个输出。 此外,您可以传递
None
作为通配符。 每次迭代后,输出将根据您传递给此参数的维度数进行验证。 如果任何输出的维度与提供的ndim
不匹配,则会引发 RuntimeError。如果
output_ndim
值是单个值(而不是列表),它将被广播到 pipeline 的输出数量。
- __enter__()#
安全地将 pipeline 设置为当前 pipeline。 调用具有副作用或没有输出的算子需要当前 pipeline。 此类算子的示例包括 PythonFunction(潜在的副作用)或 DumpImage(无输出)。
任何悬空算子都可以标记为具有副作用,如果它被标记为 preserve=True,这对于调试很有用 - 否则,不贡献 pipeline 输出的算子将从图中删除。
要手动设置新的(和恢复以前的)当前 pipeline,请分别使用
push_current()
和pop_current()
。
- __exit__(exception_type, exception_value, traceback)#
安全地恢复以前的 pipeline。
- add_sink(edge)#
将 edge 标记为数据接收器,防止其被修剪,即使它未连接到 pipeline 输出。
- property batch_size#
Batch size。
- build()#
构建 pipeline(可选步骤)。
实例化 pipeline 的后端对象并启动处理线程。 如果 pipeline 使用多进程
external_source
,则 worker 进程也会启动。 在大多数情况下,无需手动调用 build。 当使用多进程时,可能需要在主进程与 GPU 进行任何交互之前调用build()
或start_py_workers()
。 如果需要,可以在运行 pipeline 之前使用build()
来分离图构建和处理步骤。Pipeline 在以下情况下会自动构建
运行,通过 run API (
run()
,schedule_run()
) 或特定于框架的插件,通过
feed_input()
提供输入访问 pipeline 元数据 (
epoch_size()
,reader_meta()
)访问输出 - 包括
output_stream()
需要以其他方式实现图 - 例如
save_graph_to_dot_file()
。
- checkpoint(filename=None)#
以序列化的 Protobuf 字符串形式返回 pipeline 的状态。
此外,如果指定了
filename
,则序列化的 checkpoint 将被写入指定的文件。 文件内容将被覆盖。稍后可以使用保存的 checkpoint 重建相同的 pipeline,并将其作为 checkpoint 参数传递,以从保存的迭代恢复执行。
更多详细信息可以在 此文档部分 中找到。
- 参数:
filename¶ (str) – 序列化 pipeline 将写入的文件。
- property cpu_queue_size#
CPU 阶段提前处理的迭代次数。
- static current()#
返回由
push_current()
设置的当前 pipeline 的实例。
- property default_cuda_stream_priority#
已弃用;始终为 0。
- define_graph()#
此函数由用户定义,用于构建其 pipeline 的算子图。
它返回通过调用 DALI 算子创建的输出列表。
- classmethod deserialize(serialized_pipeline=None, filename=None, **kwargs)#
反序列化并构建 pipeline。
反序列化先前使用
serialize()
方法序列化的 pipeline。返回的 pipeline 已经构建完成。
或者,可以传递额外的参数,这些参数将在实例化 pipeline 时使用。 有关完整参数列表,请参阅 Pipeline 构造函数。 默认情况下,pipeline 将使用来自序列化 pipeline 的参数进行实例化。
请注意,
serialized_pipeline
和filename
参数是互斥的
- deserialize_and_build(serialized_pipeline)#
反序列化并构建以序列化形式给出的 pipeline。
- 参数:
serialized_pipeline¶ (str) – 序列化 pipeline。
- property device_id#
pipeline 使用的 GPU 的 ID,对于仅 CPU pipeline 则为 None。
- empty()#
如果 pipeline 中有任何已调度但尚未消耗的工作
- enable_api_check(enable)#
允许在运行时启用或禁用 API 检查
- property enable_memory_stats#
如果为 True,则收集内存使用统计信息。
- epoch_size(name=None)#
pipeline 的 epoch size。
如果
name
参数为 None,则返回 (reader 名称, 该 reader 的 epoch size) 对的字典。 如果name
参数不为 None,则返回该 reader 的 epoch size。- 参数:
name¶ (str, optional, default = None) – 应用于获取 epoch size 的 reader。
- property exec_async#
如果为 true,则使用异步执行。
- property exec_dynamic#
如果为 true,则使用动态执行器。
- property exec_pipelined#
如果为 true,则使用 pipeline 执行模型。
- property exec_separated#
如果为 True,则 CPU 和 GPU 阶段有单独的预取队列。
- executor_statistics()#
以字典形式返回提供的 pipeline 执行器统计元数据。 字典中的每个键都是算子名称。 要启用它,请使用
executor_statistics
每个算子的可用元数据键
real_memory_size
- 算子的每个输出使用的内存大小列表。 列表中的索引对应于输出索引。max_real_memory_size
- 算子的每个输出使用的最大 tensor 大小列表。 列表中的索引对应于输出索引。reserved_memory_size
- 为每个算子输出保留的内存大小列表。 列表中的索引对应于输出索引。max_reserved_memory_size
- 为每个算子输出保留的每个 tensor 的最大内存大小列表。 列表中的索引对应于输出索引。
注意
当使用
exec_dynamic=True
时,执行器统计信息不可用。
- external_source_shm_statistics()#
返回关于共享内存消耗的并行外部源的统计信息。 返回的字典包含以下键
capacities
- 分配用于容纳并行外部源生成的数据的共享内存槽的大小(以字节为单位)列表。per_sample_capacities
- 共享内存槽的大小(以字节为单位)除以 mini-batch size 的列表,即此类槽中存储的最大样本数。 此值对应于外部源的bytes_per_sample_hint
参数,即,如果提示足够大并且外部源不需要重新分配内存,则这些值应相等。
- feed_input(data_node, data, layout=None, cuda_stream=None, use_copy_kernel=False)#
将多维数组或 DLPack(或其列表)传递给符合条件的算子。
可以使用此函数提供数据的算子是输入算子(即
fn.inputs
模块中的所有内容)和fn.external_source()
。对于 GPU 输入,必须在与
feed_input
使用的流相同的流上修改数据。 有关详细信息,请参阅cuda_stream
参数。为了避免停顿,数据应提前 prefetch_queue_depth 次提供。
- 参数:
data_node¶ (
DataNode
或字符串) – 符合条件的算子节点的名称或调用该算子返回的DataNode
对象。data¶ (ndarray 或 DLPack 或 其列表) –
数组可以是以下之一
NumPy ndarray (CPU)
MXNet ndarray (CPU)
PyTorch tensor (CPU 或 GPU)
CuPy array (GPU)
实现
__cuda_array_interface__
的对象DALI
TensorList
或 DALITensor
对象列表
要用作
data_node
引用的算子的输出的数据。layout¶ (字符串 或
None
) – 数据布局的描述(如果未指定,则为空字符串)。 它应该是长度与数据维度匹配的字符串,不包括 batch 维度。 对于通道优先图像的 batch,这应该是"CHW"
,对于通道最后的视频,它是"FHWC"
,依此类推。 如果data
是 DALITensorList
或 DALITensor
对象列表,并且layout
为None
,则布局取自data
。 数据布局在每次迭代中必须相同。cuda_stream¶ (可选,
cudaStream_t
或可转换为cudaStream_t
的对象) –例如
cupy.cuda.Stream
,torch.cuda.Stream
将用于将数据复制到 GPU 或从 GPU 源复制数据的 CUDA 流。 如果未设置,将尽最大努力保持正确性 - 即,如果数据是作为来自已识别库(CuPy、PyTorch)的 tensor/array 提供的,则使用该库的当前流。 这应该适用于典型场景,但高级用例(和使用不受支持的库的代码)可能仍需要显式提供流句柄。特殊值
0 - 使用默认 CUDA 流
-1 - 使用 DALI 的内部流
如果使用内部流,则对
feed_input
的调用将阻塞,直到复制到内部缓冲区完成,因为无法与此流同步以防止在另一个流中用新数据覆盖数组。use_copy_kernel¶ (可选,
bool
) – 如果设置为 True,DALI 将使用 CUDA kernel 来馈送数据(仅适用于将数据复制到/从 GPU 内存复制数据时),而不是cudaMemcpyAsync
(默认)。
- property gpu_queue_size#
GPU 阶段提前处理的迭代次数。
- property is_restored_from_checkpoint#
如果为 True,则此 pipeline 是从 checkpoint 恢复的。
- iter_setup()#
一种已弃用的为 pipeline 提供外部输入的方法。
用户定义的 pipeline 可以覆盖此函数,以执行每次迭代所需的任何设置。 例如,可以使用此函数从 NumPy 数组馈送输入数据。
此方法已弃用,不建议使用。 较新的执行模型可能与这种为 pipeline 提供数据的方法不兼容。 请尽可能使用
external_source
中的 source 参数代替。
- property max_batch_size#
最大批大小。
- property max_streams#
已弃用,未使用;返回 -1。
- property num_outputs#
管道输出的数量。
- property num_threads#
此管道使用的 CPU 线程数。
- output_dtype()#
输出端期望的数据类型。
- output_ndim()#
输出端期望的维度数。
- output_stream()#
返回在其上生成输出的内部 CUDA 流。
- outputs(cuda_stream=None)#
返回管道的输出并释放之前的缓冲区。
如果管道是异步执行的,则此函数会阻塞,直到结果可用。 如果数据集已到达末尾(通常在 iter_setup 无法生成更多数据时),则会引发 StopIteration。
- 参数:
cuda_stream¶ (可选,
cudaStream_t
或可转换为cudaStream_t
的对象) – 例如cupy.cuda.Stream
,torch.cuda.Stream
。 返回的 TensorLists 所绑定的流。 默认为 None,这意味着输出与主机同步。 仅适用于使用exec_dynamic=True
构建的管道。- 返回类型:
各个管道输出的 TensorList 对象列表
- static pop_current()#
恢复之前的管道作为当前管道。 与
push_current()
互补。
- property prefetch_queue_depth#
预取队列的深度(或深度),如
__init__
参数中指定的那样。
- static push_current(pipeline)#
将管道设置为当前管道并将之前的当前管道存储在堆栈上。 要恢复之前的管道作为当前管道,请使用
pop_current()
。为了确保在发生异常时正确恢复管道,请使用上下文管理器 (with my_pipeline:)。
调用具有副作用或没有输出的操作符需要当前管道。 此类操作符的示例包括 PythonFunction (潜在的副作用) 或 DumpImage (没有输出)。
任何悬空算子都可以标记为具有副作用,如果它被标记为 preserve=True,这对于调试很有用 - 否则,不贡献 pipeline 输出的算子将从图中删除。
- property py_num_workers#
并行
`external_source`
使用的 Python 工作进程数。
- property py_start_method#
并行
`external_source`
使用的启动 Python 工作进程的方法。
- reader_meta(name=None)#
以字典形式返回提供的读取器元数据。 如果未提供名称,则提供包含所有读取器数据的字典,格式为 {reader_name : meta}
可用的元数据键
epoch_size
: 原始 epoch 大小epoch_size_padded
: epoch 大小,末尾填充以可被分片数整除
number_of_shards
: 分片数shard_id
: 给定读取器的分片 IDpad_last_batch
: 给定读取器是否应填充最后一个批次stick_to_shard
: 给定读取器是否应坚持其分片- 参数:
name¶ (str, 可选, 默认值 = None) – 应用于获取 shards_number 的读取器。
- release_outputs()#
释放 share_outputs 调用返回的缓冲区。
当输出调用结果被消耗(复制)并且缓冲区可以在下次调用 share_outputs 之前标记为可用时,这很有帮助。 它为用户提供了更好的控制,可以控制何时运行管道,何时获取结果缓冲区以及何时可以在结果被消耗后将其返回到 DALI 池。 需要与
schedule_run()
和share_outputs()
一起使用。 不应与同一管道中的run()
混合使用。注意
当使用动态执行器 (
exec_dynamic=True
) 时,缓冲区不会失效。
- reset()#
重置管道迭代器
如果管道迭代器到达末尾,则将其状态重置为开始。
- run(cuda_stream=None, /, **pipeline_inputs)#
运行管道并在指定的 CUDA 流上返回结果。
如果管道创建时将 exec_pipelined 选项设置为 True,则此函数还将启动预取下一个迭代,以加快执行速度。 不应与同一管道中的
schedule_run()
,share_outputs()
和release_outputs()
混合使用。- 参数:
cuda_stream¶ (可选,
cudaStream_t
或可转换为cudaStream_t
的对象) – 如果提供,则在此流上返回输出。 如果跳过,则结果是主机同步的。 请注意,当 prefetch_queue_depth>1 时,无需等待最近迭代的结果即可获得主机同步输出。pipeline_inputs¶ –
可用于为 DALI 提供输入的可选参数。 当 DALI 定义了任何输入操作符(例如 fn.external_source)时,您可以使用此函数中的命名参数为这些操作符提供输入。 假设 DALI 管道已正确定义和命名它们
@pipeline_def def my_pipe(): inp = fn.external_source(name="my_inp") return inp
使用上面的示例管道,您可以将
"my_inp"
输入提供给run()
函数p = my_pipe(prefetch_queue_depth=1, ...) p.run(my_inp=np.random((2,3,2)))
在
run()
函数中指定的此类关键字参数必须在 DALI 管道中声明相应的输入操作符节点。与使用 DALI 时始终一样,传递给关键字参数的值必须表示整个批次的数据。
请注意,使用此功能需要在 DALI Pipeline 构造函数中设置
prefetch_queue_depth=1
或exec_pipelined=False
。此功能可以被视为
feed_input()
函数的语法糖。
- 返回类型:
各个管道输出的 TensorList 对象元组
- save_graph_to_dot_file(filename, *, show_tensors=False, show_ids=None, use_colors=False)#
将管道图保存到文件。
- schedule_run()#
运行管道,但不返回结果缓冲区。
如果管道创建时将 exec_pipelined 选项设置为 True,则此函数还将启动预取下一个迭代,以加快执行速度。 它为用户提供了更好的控制,可以控制何时运行管道,何时获取结果缓冲区以及何时可以在结果被消耗后将其返回到 DALI 缓冲区池。 需要与
release_outputs()
和share_outputs()
一起使用。 不应与同一管道中的run()
混合使用
- property seed#
管道中使用的随机种子,如果种子未固定,则为 None。
- serialize(define_graph=None, filename=None)#
将管道序列化为 Protobuf 字符串。
此外,您可以传递文件名,以便将序列化的管道写入该文件。 文件内容将被覆盖。
- 参数:
define_graph¶ (callable) – 如果指定,则将使用此函数代替成员
define_graph()
。 如果管道输出通过set_outputs()
指定,则不得设置此参数。filename¶ (str) – 序列化管道将写入的文件。
kwargs¶ (dict) – 有关完整参数列表,请参阅 Pipeline 构造函数。
- property set_affinity#
如果为 True,则工作线程绑定到 CPU 核心。
- set_outputs(*output_data_nodes)#
设置管道的输出。
此函数的使用是覆盖派生类中 define_graph 的替代方法。
- 参数:
*output_data_nodes¶ (解包的
DataNode
对象列表) – 管道的输出
返回管道的输出。
与
outputs()
的主要区别在于 share_outputs 不会释放返回的缓冲区,需要调用 release_outputs 才能释放。 如果管道是异步执行的,则此函数会阻塞,直到结果可用。 它为用户提供了更好的控制,可以控制何时运行管道,何时获取结果缓冲区以及何时可以在结果被消耗后将其返回到 DALI 池。 需要与release_outputs()
和schedule_run()
一起使用。 不应与同一管道中的run()
混合使用。- 参数:
cuda_stream¶ (可选,
cudaStream_t
或可转换为cudaStream_t
的对象) – 例如cupy.cuda.Stream
,torch.cuda.Stream
。 返回的 TensorLists 所绑定的流。 默认为 None,这意味着输出与主机同步。 仅适用于使用exec_dynamic=True
构建的管道。- 返回:
各个管道输出的
TensorList
对象列表。除非使用动态执行器,否则返回的缓冲区仅在
release_outputs()
被调用之前有效。
- start_py_workers()#
启动 Python 工作进程(将运行
ExternalSource
回调)。 当使用fork
启动 Python 工作进程 (py_start_method="fork"
) 时,您需要在调用任何创建或获取 CUDA 上下文的功能之前调用start_py_workers()
。 当不需要这种分离时,Pipeline.build()
方法会自动调用它。如果您要构建多个通过 fork 进程启动 Python 工作进程的管道,则需要在调用任何构建或运行管道的方法之前,在所有这些管道上调用
start_py_workers()
方法(有关详细信息,请参阅build()
),因为构建会为当前进程获取 CUDA 上下文。这同样适用于使用任何其他将创建 CUDA 上下文的功能 - 例如,初始化使用 CUDA 的框架或使用它创建 CUDA 张量。 当使用
py_start_method="fork"
时,您需要在调用此类功能之前调用start_py_workers()
。fork 具有 CUDA 上下文的进程不受支持,可能会导致意外错误。
如果您使用此方法,则在调用
build()
时,不能指定define_graph
参数。
DataNode#
- class nvidia.dali.pipeline.DataNode(name, device='cpu', source=None)#
此类是 TensorList 的符号表示,在图形定义阶段使用。 它不携带实际数据,但用于定义操作符之间的连接并指定管道输出。 有关详细信息,请参阅
Pipeline
的文档。DataNode
对象可以作为输入(和一些命名关键字参数)传递给 DALI 操作符,但它们也提供算术运算,这些运算隐式创建执行表达式的适当操作符。- property(key, *, device='cpu')#
返回与 DataNode 关联的元数据属性
- shape(*, dtype=None, device='cpu')#
将此 DataNode 的运行时形状作为新的 DataNode 返回
- 参数:
arg_dtype¶ (DALIDataType, 可选) – 如果指定,形状将转换为此数据类型;默认为 INT64。
device¶ (str, 可选) – 返回结果的设备(“cpu”或“gpu”);默认为 CPU。
- source_info(*, device='cpu')#
返回 “source_info” 属性。 等效于 self.meta(“source_info”)。
实验性管道功能#
可以通过管道装饰器的特殊变体启用一些额外的实验性功能。
- @nvidia.dali.pipeline.experimental.pipeline_def(fn=None, *, enable_conditionals=False, **pipeline_kwargs)#
启用其他实验性功能的
@pipeline_def
装饰器的变体。 它具有与其非实验性变体相同的 API,并添加了下面列出的关键字参数。- 关键字参数:
debug (bool, 可选) –
启用管道调试模式 - 允许对管道定义进行逐步执行和中间数据检查,默认为 False。
注意
此模式仅用于调试目的 - 管道性能将比非调试模式差得多。
注意:: (..) – 由此装饰器启用的功能是实验性的。 API 可能会更改,功能可能会受到限制。
管道调试模式(实验性)#
可以通过将 @nvidia.dali.pipeline_def
装饰器替换为其实验性变体 @nvidia.dali.pipeline.experimental.pipeline_def
并将参数 debug
设置为 True,以调试模式运行管道。 它允许您访问和修改管道执行图中的数据,以及使用非 DALI 数据类型作为 DALI 操作符的输入。
在此模式下,操作符的输出类型为 DataNodeDebug
,它等效于标准模式下的 DataNode
。 您可以对 DataNodeDebug
类型的对象执行与 DataNode
相同的操作,包括算术运算。
在当前执行 Pipeline.run()
期间,使用 .get()
访问与 DataNodeDebug
对象关联的数据
@nvidia.dali.pipeline.experimental.pipeline_def(debug=True)
def my_pipe():
data, _ = fn.readers.file(file_root=images_dir)
img = fn.decoders.image(data)
print(np.array(img.get()[0]))
...
直接将非 DALI 数据类型(例如 NumPy ndarray,PyTorch Tensor)与 DALI 操作符一起使用
@nvidia.dali.pipeline.experimental.pipeline_def(batch_size=8, debug=True)
def my_pipe():
img = [np.random.rand(640, 480, 3) for _ in range(8)]
output = fn.flip(img)
...
注意#
调试模式下的种子生成与标准模式下的种子生成方式不同(它是确定性的但不同)。 如果您想在调试模式下获得与标准模式下相同的结果,请使用
seed
参数初始化操作符。对操作符的直接调用仅在
pipeline_def
函数的范围内有效,您不能在pipeline_def
外部以这种方式使用它们。您不能在迭代之间更改管道内操作符的顺序。
警告
使用调试模式会大大降低管道的性能。 仅将其用于调试目的。
注意
此功能是实验性的,其 API 可能会在没有通知的情况下更改。