Pipeline#
在 DALI 中,任何数据处理任务都有一个中心对象,称为 Pipeline。Pipeline 对象是 nvidia.dali.Pipeline
或派生类的实例。Pipeline 封装了数据处理图和执行引擎。
您可以通过以下方式定义 DALI Pipeline
通过实现一个在内部使用 DALI 操作符的函数,并使用
pipeline_def()
装饰器对其进行装饰。通过直接实例化
Pipeline
对象,构建图,并使用Pipeline.set_outputs()
设置 pipeline 输出。通过继承
Pipeline
类并覆盖Pipeline.define_graph()
(这是定义 DALI Pipeline 的旧版方法)。
数据处理图#
DALI pipeline 表示为操作图。图中有两种类型的节点
操作符 - 在每次调用操作符时创建
数据节点(参见
DataNode
)- 表示操作符的输出和输入;它们从操作符的调用返回,并将它们作为输入传递给其他操作符,从而在图中建立连接。
数据节点可以通过调用 操作符函数 进行转换。它们还支持 Python 样式的 索引,并且可以合并到 数学表达式 中。
示例
@pipeline_def # create a pipeline with processing graph defined by the function below
def my_pipeline():
""" Create a pipeline which reads images and masks, decodes the images and
returns them. """
img_files, labels = fn.readers.file(file_root="image_dir", seed=1)
mask_files, _ = fn.readers.file(file_root="mask_dir", seed=1)
images = fn.decoders.image(img_files, device="mixed")
masks = fn.decoders.image(mask_files, device="mixed")
return images, masks, labels
pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)
生成的图是
重要提示
pipeline 定义函数仅在构建 pipeline 时执行一次,并且通常返回一个 dali.DataNode
对象或其元组。为了方便起见,可以返回其他类型,例如 NumPy 数组,但这些类型被视为常量,并且仅评估一次。
处理图结构#
DALI pipeline 分阶段执行。这些阶段对应于可以为操作符指定的 device
参数,并按以下顺序执行
'cpu'
- 接受 CPU 输入并生成 CPU 输出的操作符。'mixed'
- 接受 CPU 输入并生成 GPU 输出的操作符,例如nvidia.dali.fn.decoders.image()
。'gpu'
- 接受 GPU 输入并生成 GPU 输出的操作符。
CPU 操作符生成的数据可以通过在 DataNode
(DALI 操作符的输出) 上调用 .gpu()
显式复制到 GPU。
稍后阶段生成的数据不能被在早期阶段执行的操作符使用。
大多数 DALI 操作符接受用于参数化其行为的附加关键字参数。这些命名的关键字参数(与位置输入不同)可以是
Python 常量
参数输入 - **CPU** 操作符的输出 - 在操作符的文档字符串中表示为 TensorList。
在参数输入的情况下,将一个操作符的输出作为另一个操作符的**命名关键字参数**传递,将在处理图中建立连接。
这些参数将作为 DALI pipeline 图的一部分在每次迭代和每个样本中计算。请记住,只有 CPU 操作符可以用作参数输入。
示例
@pipeline_def
def my_pipeline():
img_files, labels = fn.readers.file(file_root='image_dir', device='cpu')
# `images` is GPU data (result of Mixed operator)
images = fn.decoders.image(img_files, device='mixed')
# `coin_flip` must be on CPU so the `flip_params` can be used as argument input
flip_param = fn.random.coin_flip(device='cpu')
# `images` is input (GPU) and `flip_param` is argument input (CPU)
flipped = fn.flip(images, horizontal=flip_param, device='gpu')
# `labels` is explicitly marked for transfer to GPU, `flipped` is already GPU
return flipped, labels.gpu()
pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)
注意
如果未指定 device
参数,则会根据输入的位置自动选择它。如果至少有一个 GPU 输入,则假定 device='gpu'
,否则使用 'cpu'
。
上面的示例显式添加了 device
参数以提高清晰度,但如果仅为 fn.decoders.image
指定 device='mixed'
,则效果相同。
当前 Pipeline#
不影响 pipeline 输出的子图将被自动修剪。如果操作符具有副作用(例如 PythonFunction
操作符族),则在不设置当前 pipeline 的情况下无法调用它。当在派生 pipeline 的 Pipeline.define_graph()
方法内部定义图时,将隐式设置当前 pipeline。否则,可以使用上下文管理器(with
语句)进行设置
pipe = dali.Pipeline(batch_size=N, num_threads=3, device_id=0)
with pipe:
src = dali.ops.ExternalSource(my_source, num_outputs=2)
a, b = src()
pipe.set_outputs(a, b)
当使用 pipeline_def()
创建 pipeline 时,定义 pipeline 的函数将在新创建的 pipeline 的范围内执行。以下示例与上一个示例等效
@dali.pipeline_def(batch_size=N, num_threads=3, device_id=0)
def my_pipe(my_source):
return dali.fn.external_source(my_source, num_outputs=2)
pipe = my_pipe(my_source)
Pipeline 装饰器#
- @nvidia.dali.pipeline_def(fn=None, *, enable_conditionals=False, **pipeline_kwargs)#
将图定义函数转换为 DALI pipeline 工厂的装饰器。
图定义函数是返回预期 pipeline 输出的函数。您可以使用
@pipeline_def
装饰此函数@pipeline_def def my_pipe(flip_vertical, flip_horizontal): ''' Creates a DALI pipeline, which returns flipped and original images ''' data, _ = fn.readers.file(file_root=images_dir) img = fn.decoders.image(data, device="mixed") flipped = fn.flip(img, horizontal=flip_horizontal, vertical=flip_vertical) return flipped, img
装饰后的函数返回一个 DALI Pipeline 对象
pipe = my_pipe(True, False) # pipe.run() # the pipeline is not configured properly yet
pipeline 需要额外的参数,例如批大小、工作线程数、GPU 设备 ID 等(有关 pipeline 参数的完整列表,请参见
nvidia.dali.Pipeline()
)。这些参数可以作为附加关键字参数提供,并传递给装饰后的函数pipe = my_pipe(True, False, batch_size=32, num_threads=1, device_id=0)
pipeline 已正确配置,我们现在可以运行它。原始函数的输出变成了 Pipeline 的输出
flipped, img = pipe.run()
当某些 pipeline 参数是固定的时,可以在装饰器中按名称指定它们
@pipeline_def(batch_size=42, num_threads=3) def my_pipe(flip_vertical, flip_horizontal): ...
稍后在调用装饰后的函数时传递的任何 Pipeline 构造函数参数都将覆盖装饰器定义的参数
@pipeline_def(batch_size=32, num_threads=3) def my_pipe(): data = fn.external_source(source=my_generator) return data pipe = my_pipe(batch_size=128) # batch_size=128 overrides batch_size=32
警告
被装饰函数的参数可能会遮蔽 pipeline 构造函数参数 - 在这种情况下,无法更改其值。
注意
不允许在图定义函数中使用
**kwargs
(可变关键字参数)。它们可能会导致不受欢迎的、静默地劫持 Pipeline 构造函数中同名的一些参数。以这种方式编写的代码在 DALI 的未来版本中添加新参数到 Pipeline 构造函数时将停止工作。要在
@pipeline_def
函数的主体中访问任何 pipeline 参数,可以使用函数nvidia.dali.Pipeline.current()
@pipeline_def() def my_pipe(): pipe = Pipeline.current() batch_size = pipe.batch_size num_threads = pipe.num_threads ... pipe = my_pipe(batch_size=42, num_threads=3) ...
- 关键字参数:
enable_conditionals (bool, 可选) – 启用对在 pipeline 定义中使用
if
语句条件执行 DALI 操作符的支持,默认为 False。
条件执行#
DALI 允许使用 if
语句为批次中选定的样本有条件地执行操作符。要启用此功能,请使用 @pipeline_def
装饰器来定义 pipeline,并将 enable_conditionals
设置为 True
。
每个以 DataNode()
作为条件的 if
语句都将被识别为 DALI 条件语句。
例如,此 pipeline 以 25% 的概率将每张图像旋转一个介于 10 到 30 度之间的随机角度
@pipeline_def(enable_conditionals=True)
def random_rotate():
jpegs, _ = fn.readers.file(device="cpu", file_root=images_dir)
images = fn.decoders.image(jpegs, device="mixed")
do_rotate = fn.random.coin_flip(probability=0.25, dtype=DALIDataType.BOOL)
if do_rotate:
result = fn.rotate(images, angle=fn.random.uniform(range=(10, 30)), fill_value=0)
else:
result = images
return result
DALI 条件的语义可以理解为代码一次处理一个样本。
条件必须由标量样本表示 - 即具有 0 维形状。它可以是布尔值或 DALI 支持的任何数值类型 - 在后一种情况下,非零值被视为 True,零值被视为 False,这符合典型的 Python 语义。
此外,逻辑表达式 and
、or
和 not
可以用于 DataNode()
。前两个限制为布尔输入,not
允许与 if
语句条件相同的输入类型。逻辑表达式在评估时遵循短路规则。
您可以在 条件教程 中阅读更多内容。
防止 AutoGraph 转换#
- @nvidia.dali.pipeline.do_not_convert#
阻止 AutoGraph 转换函数的装饰器。
在条件模式下,DALI 使用 TensorFlow 的 AutoGraph 的一个分支来转换代码,使我们能够重写和检测
if
语句,以便它们可以在处理 DALI pipeline 中使用。AutoGraph 转换应用于 pipeline 定义中调用的任何顶级函数或方法(以及 pipeline 定义本身)。当函数被转换时,在其语法范围内定义的所有函数也会被转换。重写除其他效果外,还使这些函数不可序列化。
要阻止函数被转换,必须使用此装饰器标记其顶级包含函数。这有时可能需要将函数重构到外部范围。
external source
的并行模式 (parallel=True
) 要求其source
参数是可序列化的。为了防止重写source
,用于创建source
的函数应使用@do_not_convert
进行装饰。注意
只有不处理
DataNode
(因此不使用 DALI 操作符) 的函数才应使用此装饰器标记。例如
from nvidia.dali import pipeline_def, fn @pipeline_def(enable_conditionals=True) def pipe(): def source_factory(size): def source_fun(sample_info): return np.full(size, sample_info.iter_idx) return source_fun source = source_factory(size=(2, 1)) return fn.external_source(source=source, parallel=True, batch=False)
应转换为
from nvidia.dali import pipeline_def, fn from nvidia.dali.pipeline import do_not_convert @do_not_convert def source_factory(size): def source_fun(sample_info): return np.full(size, sample_info.iter_idx) return source_fun @pipeline_def(enable_conditionals=True) def pipe(): source = source_factory(size=(2, 1)) return fn.external_source(source=source, parallel=True, batch=False)
source_factory
必须被分解出来,否则它将作为 pipeline 定义的一部分进行转换。由于我们有兴趣防止source_fun
的 AutoGraph 转换,我们需要装饰其顶级包含函数。注意
如果在 pipeline 定义之外声明函数,并作为参数传递,但未在 pipeline 定义中直接调用,则不会对其进行转换。在这种情况下,传递给
external source
操作符、python function
操作符族或Numba function
操作符的回调不被视为在 pipeline 定义中直接调用。此类回调在 pipeline 运行时执行,因此在定义和构建 pipeline 之后执行。例如
from nvidia.dali import pipeline_def, fn def source_fun(sample_info): return np.full((2, 2), sample_info.iter_idx) @pipeline_def(enable_conditionals=True) def pipe(): return fn.external_source(source=source_fun, batch=False)
source_fun
不会被转换,因为它是在 pipeline 定义之外定义的,并且仅通过名称传递给外部源。
Pipeline 类#
- class nvidia.dali.Pipeline(batch_size=-1, num_threads=-1, device_id=-1, seed=-1, exec_pipelined=True, prefetch_queue_depth=2, exec_async=True, bytes_per_sample=0, set_affinity=False, max_streams=None, default_cuda_stream_priority=None, *, enable_memory_stats=False, enable_checkpointing=False, checkpoint=None, py_num_workers=1, py_start_method='fork', py_callback_pickler=None, output_dtype=None, output_ndim=None, exec_dynamic=False, experimental_exec_dynamic=None)#
Pipeline 类是所有 DALI 数据管线的基础。管线封装了数据处理图和执行引擎。
- 参数:
batch_size¶ (int, optional, default = -1) –
管线的最大批大小。此参数的负值无效 - 默认值只能用于序列化管线(使用存储在序列化管线中的值)。在大多数情况下,管线的实际批大小将等于最大批大小。也支持使用较小的批大小运行 DALI 管线。批大小可能会在迭代之间发生变化。
请注意,DALI 可能会根据此参数执行内存预分配。将其设置得过高可能会导致内存不足错误。
num_threads¶ (int, optional, default = -1) – 管线使用的 CPU 线程数。此参数的负值无效 - 默认值只能用于序列化管线(使用存储在序列化管线中的值)。
device_id¶ (int, optional, default = -1) – 管线使用的 GPU 的 ID。此参数的 None 值表示 DALI 不应使用 GPU 或 CUDA 运行时。这会将管线限制为仅 CPU 算子,但允许它在任何具有 CPU 功能的机器上运行。
seed¶ (int, optional, default = -1) – 用于随机数生成的种子。为此参数保留默认值将导致随机种子。
exec_pipelined¶ (bool, optional, default = True) – 是否以启用 CPU 和 GPU 计算重叠的方式执行管线,通常会带来更快的执行速度,但会消耗更多的内存。
prefetch_queue_depth¶ (int or {"cpu_size": int, "gpu_size": int}, optional, default = 2) –
执行器管线的深度。更深的管线使 DALI 更能抵抗每个批次的不均匀执行时间,但它也会消耗更多内存用于内部缓冲区。指定一个字典
{ "cpu_size": x, "gpu_size": y }
而不是整数将导致管线使用分离队列执行器,CPU 阶段的缓冲区队列大小为 x,混合和 GPU 阶段的缓冲区队列大小为 y。执行器将分别缓冲 CPU 和 GPU 阶段,并在发出第一个
run()
时填充缓冲区队列。分离执行要求exec_async=True
、exec_pipelined=True
和exec_dynamic=False
。exec_async¶ (bool, optional, default = True) – 是否异步执行管线。这使得
run()
方法相对于调用 Python 线程异步运行。为了与管线同步,需要调用outputs()
方法。exec_dynamic¶ (bool, optional, default = False) – 是否使用动态执行器。动态执行器允许交错 CPU 和 GPU 算子,并执行 GPU 到 CPU 的复制。它还对管线输出和算子间缓冲区使用动态内存分配,从而减少复杂管线中的内存消耗。当
exec_dynamic
为True
时,exec_async
和exec_pipelined
必须保留为其默认值 (True
)。bytes_per_sample¶ (int, optional, default = 0) – DALI 用于其张量的内存使用量提示。
set_affinity¶ (bool, optional, default = False) – 是否将 CPU 核心亲和性设置为最接近所用 GPU 的核心。
max_streams¶ (int, deprecated, default = None) – 已弃用,此参数无效。
default_cuda_stream_priority¶ (int, optional, default = None) – 已弃用,此参数无效。
enable_memory_stats¶ (bool, optional, default = False) – DALI 是否应打印算子输出缓冲区统计信息。对 bytes_per_sample_hint 算子参数很有用。当
exec_dynamic
为True
时,此标志无效。enable_checkpointing¶ (bool, optional, default = False) –
如果为 True,DALI 将跟踪算子的状态。在这种情况下,调用
checkpoint
方法会返回管线的序列化状态。可以使用作为checkpoint
参数传递的序列化状态稍后重建同一管线,以从保存的迭代恢复运行。更多详细信息可以在此文档部分中找到。
checkpoint¶ (str, optional, default = None) –
从
checkpoint
方法接收的序列化检查点。构建管线后,其状态将从checkpoint
恢复,并且管线从保存的迭代恢复执行。更多详细信息可以在此文档部分中找到。
py_num_workers¶ (int, optional, default = 1) – 将处理并行
external_source()
回调的 Python 工作进程数。仅当至少有一个external_source()
的parallel
设置为 True 时,池才启动。将其设置为 0 将禁用池,并且所有 ExternalSource 算子都将回退到非并行模式,即使parallel
设置为 True 也是如此。py_start_method¶ (str, default = "fork") –
确定 Python 工作进程的启动方式。支持的方法
"fork"
- 通过 fork 进程启动"spawn"
- 启动新的解释器进程
如果使用
spawn
方法,则 ExternalSource 的回调必须是可 pickle 的。为了使用fork
,启动工作进程时不得获取 CUDA 上下文。因此,如果您需要构建多个使用 Python 工作进程的管线,则需要在构建或运行任何管线之前调用start_py_workers()
(有关详细信息,请参阅build()
)。您可以在 Python 的multiprocessing
模块文档中找到有关这两种方法的更多详细信息和注意事项。py_callback_pickler¶ (module or tuple, default = None) –
如果
py_start_method
设置为 spawn,则传递给并行 ExternalSource 的回调必须是可 pickle 的。如果在 Python 3.8 或更高版本中运行,且py_callback_pickler
设置为 None,则 DALI 在序列化回调时使用自定义 pickle,以支持本地函数和 lambda 的序列化。但是,如果您需要序列化更复杂的对象(如本地类)或者您正在运行旧版本的 Python,您可以提供外部序列化包(如 dill 或 cloudpickle),这些包实现了两个方法:dumps 和 loads,以使 DALI 使用它们来序列化外部源回调。您可以将模块直接作为
py_callback_pickler
传递import dill @pipeline_def(py_callback_pickler=dill, ...) def create_pipeline(): src = fn.external_source( lambda sample_info: np.int32([42]), batch=False, parallel=True, ) ...
py_callback_pickler
的有效值可以是实现dumps
和loads
方法的模块/对象,也可以是元组,其中第一个项目是模块/对象,接下来的两个可选参数是在调用 dumps 和 loads 时要传递的额外 kwargs。所提供的方法和 kwargs 必须可使用标准 pickle.dumps 进行 pickle。如果您运行的是 Python 3.8 或更高版本,并使用默认的 DALI pickler (
py_callback_pickler
= None),您可以提示 DALI 通过使用 @dali.pickling.pickle_by_value 修饰全局函数,按值而不是按引用序列化全局函数。当使用 Jupyter notebook 时,这可能特别有用,以解决工作进程无法导入在 notebook 中定义为全局函数的回调的问题。output_dtype¶ (
nvidia.dali.types.DALIDataType
或其列表, default = None) –使用此参数,您可以声明您期望在给定输出中使用的数据类型。您应传递 types.DALIDataType 的列表,列表中的每个元素对应于管线的一个输出。此外,您可以传递
None
作为通配符。在每次迭代之后,将根据您传递给此参数的类型验证输出。如果任何输出与提供的类型不匹配,则会引发 RuntimeError。如果
output_dtype
值是单个值(而不是列表),则会将其广播到管线的输出数量。output_ndim¶ (int or list of ints, default = None) –
使用此参数,您可以声明您期望在给定输出中使用的维度数。您应传递整数列表,列表中的每个元素对应于管线的一个输出。此外,您可以传递
None
作为通配符。在每次迭代之后,将根据您传递给此参数的维度数验证输出。如果任何输出的维度与提供的ndim
不匹配,则会引发 RuntimeError。如果
output_ndim
值是单个值(而不是列表),则会将其广播到管线的输出数量。
- __enter__()#
安全地将管线设置为当前管线。调用具有副作用或没有输出的算子时,需要当前管线。此类算子的示例包括 PythonFunction(潜在的副作用)或 DumpImage(没有输出)。
任何悬空算子都可以标记为具有副作用,如果它标记了 preserve=True,这对于调试可能很有用 - 否则,不贡献于管线输出的算子将从图中删除。
要手动设置新的(和恢复以前的)当前管线,请分别使用
push_current()
和pop_current()
。
- __exit__(exception_type, exception_value, traceback)#
安全地恢复以前的管线。
- add_sink(edge)#
将边标记为数据接收器,防止其被修剪,即使它未连接到管线输出。
- property batch_size#
批大小。
- build()#
构建管线(可选步骤)。
实例化管线的后端对象并启动处理线程。如果管线使用多处理
external_source
,则也会启动工作进程。在大多数情况下,无需手动调用 build。当使用多处理时,可能有必要在主进程与 GPU 进行任何交互之前调用build()
或start_py_workers()
。如果需要,可以在运行管线之前使用build()
来分离图构建和处理步骤。当管线被自动构建时,它会:
通过 run API (
run()
,schedule_run()
) 或框架特定的插件运行,通过
feed_input()
提供输入访问管线元数据 (
epoch_size()
,reader_meta()
)访问输出 - 包括
output_stream()
需要以其他方式物化图 - 例如
save_graph_to_dot_file()
。
- checkpoint(filename=None)#
以序列化的 Protobuf 字符串形式返回管线的状态。
此外,如果指定了
filename
,则序列化的检查点将写入指定的文件。文件内容将被覆盖。可以使用保存的检查点稍后重建同一管线,作为 checkpoint 参数传递,以从保存的迭代恢复执行。
更多详细信息可以在此文档部分中找到。
- 参数:
filename¶ (str) – 将序列化管线写入的文件。
- property cpu_queue_size#
CPU 阶段提前处理的迭代次数。
- static current()#
返回由
push_current()
设置的当前管线的实例。
- property default_cuda_stream_priority#
已弃用;始终为 0。
- define_graph()#
此函数由用户定义,用于构建其管线操作图。
它返回通过调用 DALI 算子创建的输出列表。
- classmethod deserialize(serialized_pipeline=None, filename=None, **kwargs)#
反序列化并构建管线。
反序列化先前使用
serialize()
方法序列化的管线。返回的管线已构建。
或者,可以传递额外的参数,这些参数将在实例化管线时使用。有关参数的完整列表,请参阅 Pipeline 构造函数。默认情况下,将使用来自序列化管线的参数实例化管线。
请注意,
serialized_pipeline
和filename
参数是互斥的
- deserialize_and_build(serialized_pipeline)#
反序列化并构建以序列化形式给出的管线。
- 参数:
serialized_pipeline¶ (str) – 序列化的管线。
- property device_id#
管线使用的 GPU 的 ID,对于仅 CPU 管线,则为 None。
- empty()#
如果管线中已安排但尚未消耗任何工作
- enable_api_check(enable)#
允许在运行时启用或禁用 API 检查
- property enable_memory_stats#
如果为 True,则收集内存使用情况统计信息。
- epoch_size(name=None)#
管线的 epoch 大小。
如果
name
参数为 None,则返回 (reader name, 该 reader 的 epoch 大小) 对的字典。如果name
参数不为 None,则返回该 reader 的 epoch 大小。- 参数:
name¶ (str, optional, default = None) – 应用于获取 epoch 大小的 reader。
- property exec_async#
如果为 true,则使用异步执行。
- property exec_dynamic#
如果为 true,则使用动态执行器。
- property exec_pipelined#
如果为 true,则使用管线执行模型。
- property exec_separated#
如果为 True,则 CPU 和 GPU 阶段有单独的预取队列。
- executor_statistics()#
以字典形式返回提供的管线执行器统计元数据。字典中的每个键都是算子名称。要启用它,请使用
executor_statistics
每个算子的可用元数据键
real_memory_size
- 算子的每个输出使用的内存大小列表。列表中的索引对应于输出索引。max_real_memory_size
- 算子的每个输出使用的最大张量大小的列表。列表中的索引对应于输出索引。reserved_memory_size
- 为每个算子输出保留的内存大小列表。列表中的索引对应于输出索引。max_reserved_memory_size
- 为每个算子输出保留的每个张量的最大内存大小的列表。列表中的索引对应于输出索引。
注意
当使用
exec_dynamic=True
时,执行器统计信息不可用。
- external_source_shm_statistics()#
返回有关共享内存消耗的并行外部源的统计信息。返回的字典包含以下键
capacities
- 分配给共享内存槽的大小(以字节为单位)列表,用于容纳并行外部源生成的数据。per_sample_capacities
- 共享内存槽大小(以字节为单位)的列表,除以 mini-batch 大小,即此类槽中存储的最大样本数。此值对应于外部源的bytes_per_sample_hint
参数,即,如果提示足够大并且外部源不需要重新分配内存,则这些值应相等。
- feed_input(data_node, data, layout=None, cuda_stream=None, use_copy_kernel=False)#
将多维数组或 DLPack(或列表)传递给符合条件的操作符。
可以使用此函数提供数据的操作符是输入操作符(即
fn.inputs
模块中的所有内容)和fn.external_source()
。对于 GPU 输入的情况,必须在与
feed_input
使用的流相同的流上修改数据。有关详细信息,请参阅cuda_stream
参数。为了避免停顿,数据应提前 prefetch_queue_depth 次提供。
- 参数:
data_node¶ (
DataNode
或字符串) – 符合条件的操作符节点的名称或调用该操作符返回的DataNode
对象。data¶ (ndarray 或 DLPack 或 列表) –
数组可以是以下之一
NumPy ndarray (CPU)
MXNet ndarray (CPU)
PyTorch tensor (CPU 或 GPU)
CuPy array (GPU)
实现
__cuda_array_interface__
的对象DALI
TensorList
或 DALITensor
对象列表
要用作
data_node
引用的操作符输出的数据。layout¶ (字符串或
None
) – 数据布局的描述(如果未指定,则为空字符串)。它应该是与数据维度匹配的长度字符串,不包括批次维度。对于通道优先图像批次,应为"CHW"
,对于通道最后视频,应为"FHWC"
,依此类推。如果data
是 DALITensorList
或 DALITensor
对象列表,并且layout
为None
,则布局取自data
。每次迭代中数据布局必须相同。cuda_stream¶ (可选,
cudaStream_t
或可转换为cudaStream_t
的对象,) –例如
cupy.cuda.Stream
,torch.cuda.Stream
。CUDA 流,将用于将数据复制到 GPU 或从 GPU 源复制数据。如果未设置,将尽力保持正确性 - 即,如果数据是从已识别的库(CuPy、PyTorch)作为张量/数组提供的,则使用库的当前流。这在典型情况下应该有效,但高级用例(和使用不支持的库的代码)可能仍然需要显式提供流句柄。特殊值
0 - 使用默认 CUDA 流
-1 - 使用 DALI 内部流
如果使用内部流,则对
feed_input
的调用将阻塞,直到复制到内部缓冲区完成,因为无法与此流同步以防止在另一个流中使用新数据覆盖数组。use_copy_kernel¶ (可选,
bool
) – 如果设置为 True,DALI 将使用 CUDA 内核来馈送数据(仅适用于将数据复制到/从 GPU 内存复制数据时),而不是cudaMemcpyAsync
(默认)。
- property gpu_queue_size#
GPU 阶段提前处理的迭代次数。
- property is_restored_from_checkpoint#
如果为 True,则此 pipeline 是从检查点恢复的。
- iter_setup()#
一种已弃用的为 pipeline 提供外部输入的方法。
用户定义的 pipeline 可以覆盖此函数,以便为每次迭代执行任何需要的设置。例如,可以使用此函数从 NumPy 数组馈送输入数据。
此方法已弃用,不鼓励使用。较新的执行模型可能与这种为 pipeline 提供数据的方法不兼容。尽可能使用 source 参数在
external_source
中代替。
- property max_batch_size#
最大批次大小。
- property max_streams#
已弃用,未使用;返回 -1。
- property num_threads#
此 pipeline 使用的 CPU 线程数。
- output_dtype()#
输出端预期的数据类型。
- output_ndim()#
输出端预期的维度数。
- output_stream()#
返回生成输出的内部 CUDA 流。
- outputs(cuda_stream=None)#
返回 pipeline 的输出并释放先前的缓冲区。
如果 pipeline 是异步执行的,则此函数将阻塞,直到结果可用。如果数据集已到达末尾,则会引发 StopIteration - 通常在 iter_setup 无法生成更多数据时。
- 参数:
cuda_stream¶ (可选,
cudaStream_t
或可转换为cudaStream_t
的对象,) – 例如cupy.cuda.Stream
,torch.cuda.Stream
。返回的 TensorLists 绑定到的流。默认为 None,这意味着输出与主机同步。仅适用于使用exec_dynamic=True
构建的 pipeline。- 返回类型:
各个 pipeline 输出的 TensorList 对象列表
- static pop_current()#
将先前的 pipeline 恢复为当前 pipeline。
push_current()
的补充。
- property prefetch_queue_depth#
预取队列的深度(或深度),如
__init__
参数中指定。
- static push_current(pipeline)#
将 pipeline 设置为当前 pipeline,并将先前的当前 pipeline 存储在堆栈上。要将先前的 pipeline 恢复为当前 pipeline,请使用
pop_current()
。为确保在发生异常时正确恢复 pipeline,请使用上下文管理器 (with my_pipeline:)。
调用具有副作用或没有输出的操作符需要当前 pipeline。此类操作符的示例包括 PythonFunction (潜在的副作用)或 DumpImage (没有输出)。
任何悬空算子都可以标记为具有副作用,如果它标记了 preserve=True,这对于调试可能很有用 - 否则,不贡献于管线输出的算子将从图中删除。
- property py_num_workers#
并行
`external_source`
使用的 Python 工作进程数。
- property py_start_method#
并行
`external_source`
使用的启动 Python 工作进程的方法。
- reader_meta(name=None)#
以字典形式返回提供的 reader 元数据。如果未提供名称,则提供包含所有 reader 数据的字典,格式为 {reader_name : meta}
可用的元数据键
epoch_size
:原始 epoch 大小epoch_size_padded
:epoch 大小,末尾填充以可被分片数整除
number_of_shards
:分片数shard_id
:给定 reader 的分片 IDpad_last_batch
:给定 reader 是否应填充最后一个批次stick_to_shard
:给定 reader 是否应坚持其分片- 参数:
name¶ (str, 可选, 默认 = None) – 应用于获取 shards_number 的 reader。
- release_outputs()#
释放 share_outputs 调用返回的缓冲区。
当输出调用结果被使用(复制)并且可以在下次调用 share_outputs 之前将缓冲区标记为空闲时,这很有帮助。它为用户提供了更好的控制,可以控制何时运行 pipeline,何时获取结果缓冲区,以及何时可以在结果被使用后将其返回到 DALI 池。需要与
schedule_run()
和share_outputs()
一起使用。不应与同一 pipeline 中的run()
混合使用。注意
当使用动态执行器(
exec_dynamic=True
)时,缓冲区不会失效。
- reset()#
重置 pipeline 迭代器
如果 pipeline 迭代器已到达末尾,则将其状态重置为开始。
- run(cuda_stream=None, /, **pipeline_inputs)#
运行 pipeline 并在指定的 CUDA 流上返回结果。
如果 pipeline 是使用 exec_pipelined 选项设置为 True 创建的,则此函数还将启动预取下一次迭代以加快执行速度。不应与同一 pipeline 中的
schedule_run()
、share_outputs()
和release_outputs()
混合使用- 参数:
cuda_stream¶ (可选,
cudaStream_t
或可转换为cudaStream_t
的对象,) – 如果提供,则在此流上返回输出。如果跳过,则结果是主机同步的。请注意,使用 prefetch_queue_depth>1 可以获得主机同步输出,而无需等待最近迭代的结果。pipeline_inputs¶ –
可用于为 DALI 提供输入的可选参数。当 DALI 定义了任何输入操作符(例如 fn.external_source)时,您可以使用此函数中的命名参数为这些操作符提供输入。假设 DALI pipeline 具有正确定义和命名的这些输入操作符
@pipeline_def def my_pipe(): inp = fn.external_source(name="my_inp") return inp
使用上面的示例 pipeline,您可以将
"my_inp"
输入到run()
函数中p = my_pipe(prefetch_queue_depth=1, ...) p.run(my_inp=np.random((2,3,2)))
在
run()
函数中指定的此类关键字参数必须在 DALI pipeline 中声明相应的输入操作符节点。与使用 DALI 时一样,传递给关键字参数的值必须表示整个批次的数据。
请注意,使用此功能需要在 DALI Pipeline 构造函数中设置
prefetch_queue_depth=1
或exec_pipelined=False
。此功能可以被视为
feed_input()
函数的语法糖。
- 返回类型:
各个 pipeline 输出的 TensorList 对象元组
- save_graph_to_dot_file(filename, *, show_tensors=False, show_ids=None, use_colors=False)#
将 pipeline 图形保存到文件。
- schedule_run()#
运行 pipeline,但不返回结果缓冲区。
如果 pipeline 是使用 exec_pipelined 选项设置为 True 创建的,则此函数还将启动预取下一次迭代以加快执行速度。它为用户提供了更好的控制,可以控制何时运行 pipeline,何时获取结果缓冲区,以及在结果被使用后何时将其返回到 DALI 缓冲区池。需要与
release_outputs()
和share_outputs()
一起使用。不应与同一 pipeline 中的run()
混合使用
- property seed#
pipeline 中使用的随机种子,如果种子未固定,则为 None。
- serialize(define_graph=None, filename=None)#
将 pipeline 序列化为 Protobuf 字符串。
此外,您可以传递文件名,以便将序列化的 pipeline 写入该文件。文件内容将被覆盖。
- 参数:
define_graph¶ (callable) – 如果指定,则将使用此函数代替成员
define_graph()
。如果 pipeline 输出是使用set_outputs()
指定的,则不得设置此参数。filename¶ (str) – 将序列化 pipeline 写入的文件。
kwargs¶ (dict) – 有关参数的完整列表,请参阅 Pipeline 构造函数。
- property set_affinity#
如果为 True,则工作线程绑定到 CPU 核心。
- set_outputs(*output_data_nodes)#
设置 pipeline 的输出。
此函数的使用是派生类中覆盖 define_graph 的替代方法。
- 参数:
*output_data_nodes¶ (解包的
DataNode
对象列表) – pipeline 的输出
返回 pipeline 的输出。
与
outputs()
的主要区别在于 share_outputs 不会释放返回的缓冲区,需要调用 release_outputs 来释放。如果 pipeline 是异步执行的,则此函数将阻塞,直到结果可用。它为用户提供了更好的控制,可以控制何时运行 pipeline,何时获取结果缓冲区,以及何时可以在结果被使用后将其返回到 DALI 池。需要与release_outputs()
和schedule_run()
一起使用。不应与同一 pipeline 中的run()
混合使用。- 参数:
cuda_stream¶ (可选,
cudaStream_t
或可转换为cudaStream_t
的对象,) – 例如cupy.cuda.Stream
,torch.cuda.Stream
。返回的 TensorLists 绑定到的流。默认为 None,这意味着输出与主机同步。仅适用于使用exec_dynamic=True
构建的 pipeline。- 返回:
各个 pipeline 输出的
TensorList
对象列表。除非使用动态执行器,否则返回的缓冲区仅在以下情况下有效
release_outputs()
被调用。
- start_py_workers()#
启动 Python 工作进程(将运行
ExternalSource
回调)。当使用fork
启动 Python 工作进程(py_start_method="fork"
)时,您需要在调用任何创建或获取 CUDA 上下文的功能之前调用start_py_workers()
。当不需要这种分离时,Pipeline.build()
方法会自动调用它。如果您要构建多个通过 fork 进程启动 Python worker 的 pipeline,那么您需要在所有这些 pipeline 上调用
start_py_workers()
方法,然后再调用任何构建或运行 pipeline 的方法(详见build()
的详细信息),因为构建操作会为当前进程获取 CUDA 上下文。同样适用于使用任何其他会创建 CUDA 上下文的功能 - 例如,初始化使用 CUDA 的框架或使用 CUDA 创建 CUDA 张量。当使用
py_start_method="fork"
时,您需要在调用此类功能之前调用start_py_workers()
。fork 具有 CUDA 上下文的进程是不受支持的,并且可能导致意外错误。
如果您使用此方法,则在调用
build()
时,不能指定define_graph
参数。
DataNode#
- class nvidia.dali.pipeline.DataNode(name, device='cpu', source=None)#
此类是 TensorList 的符号表示,用于图定义阶段。它不携带实际数据,但用于定义运算符之间的连接并指定 pipeline 输出。 有关详细信息,请参阅
Pipeline
的文档。DataNode
对象可以作为输入(以及一些命名的关键字参数)传递给 DALI 运算符,但它们也提供算术运算,这些运算隐式地创建执行表达式的相应运算符。- property(key, *, device='cpu')#
返回与 DataNode 关联的元数据属性
- shape(*, dtype=None, device='cpu')#
将此 DataNode 的运行时形状作为新的 DataNode 返回
- 参数:
arg_dtype¶ (DALIDataType, optional) – 如果指定,形状将转换为此数据类型;默认为 INT64。
device¶ (str, optional) – 返回结果的设备(“cpu” 或 “gpu”);默认为 CPU。
- source_info(*, device='cpu')#
返回 “source_info” 属性。等效于 self.meta(“source_info”)。
实验性 Pipeline 功能#
一些额外的实验性功能可以通过 pipeline 装饰器的特殊变体启用。
- @nvidia.dali.pipeline.experimental.pipeline_def(fn=None, *, enable_conditionals=False, **pipeline_kwargs)#
@pipeline_def
装饰器的变体,它启用额外的实验性功能。它具有与其非实验性变体相同的 API,并增加了下面列出的关键字参数。- 关键字参数:
debug (bool, optional) –
启用 pipeline 调试模式 - 允许对 pipeline 定义进行逐步执行和中间数据检查,默认为 False。
注意
此模式仅用于调试目的 - pipeline 性能将明显低于非调试模式。
note:: (..) – 由此装饰器启用的功能是实验性的。API 可能会更改,并且功能可能会受到限制。
Pipeline 调试模式(实验性)#
Pipeline 可以在调试模式下运行,方法是将 @nvidia.dali.pipeline_def
装饰器替换为其实验性变体 @nvidia.dali.pipeline.experimental.pipeline_def
并将参数 debug
设置为 True。它允许您访问和修改 pipeline 执行图中的数据,以及使用非 DALI 数据类型作为 DALI 运算符的输入。
在此模式下,运算符的输出类型为 DataNodeDebug
,它等效于标准模式下的 DataNode
。 您可以对 DataNodeDebug
类型的对象执行与 DataNode
相同的操作,包括算术运算。
使用 .get()
在当前 Pipeline.run()
执行期间访问与 DataNodeDebug
对象关联的数据
@nvidia.dali.pipeline.experimental.pipeline_def(debug=True)
def my_pipe():
data, _ = fn.readers.file(file_root=images_dir)
img = fn.decoders.image(data)
print(np.array(img.get()[0]))
...
直接将非 DALI 数据类型(例如 NumPy ndarray,PyTorch Tensor)与 DALI 运算符一起使用
@nvidia.dali.pipeline.experimental.pipeline_def(batch_size=8, debug=True)
def my_pipe():
img = [np.random.rand(640, 480, 3) for _ in range(8)]
output = fn.flip(img)
...
注意#
调试模式下的种子生成与标准模式不同(它是确定性的但不同)。如果您想在调试模式下获得与标准模式相同的结果,请使用
seed
参数初始化运算符。直接调用运算符仅在
pipeline_def
函数的范围内有效,您不能在pipeline_def
之外以这种方式使用它们。您不能在迭代之间更改 pipeline 内运算符的顺序。
警告
使用调试模式会大大降低 pipeline 的性能。仅将其用于调试目的。
注意
此功能是实验性的,其 API 可能会更改,恕不另行通知。