Pipeline#

在 DALI 中,任何数据处理任务都有一个中心对象,称为 Pipeline。Pipeline 对象是 nvidia.dali.Pipeline 或派生类的实例。Pipeline 封装了数据处理图和执行引擎。

您可以通过以下方式定义 DALI Pipeline

  1. 通过实现一个在内部使用 DALI 操作符的函数,并使用 pipeline_def() 装饰器对其进行装饰。

  2. 通过直接实例化 Pipeline 对象,构建图,并使用 Pipeline.set_outputs() 设置 pipeline 输出。

  3. 通过继承 Pipeline 类并覆盖 Pipeline.define_graph() (这是定义 DALI Pipeline 的旧版方法)。

数据处理图#

DALI pipeline 表示为操作图。图中有两种类型的节点

  • 操作符 - 在每次调用操作符时创建

  • 数据节点(参见 DataNode)- 表示操作符的输出和输入;它们从操作符的调用返回,并将它们作为输入传递给其他操作符,从而在图中建立连接。

数据节点可以通过调用 操作符函数 进行转换。它们还支持 Python 样式的 索引,并且可以合并到 数学表达式 中。

示例

@pipeline_def  # create a pipeline with processing graph defined by the function below
def my_pipeline():
    """ Create a pipeline which reads images and masks, decodes the images and
        returns them. """
    img_files, labels = fn.readers.file(file_root="image_dir", seed=1)
    mask_files, _ = fn.readers.file(file_root="mask_dir", seed=1)
    images = fn.decoders.image(img_files, device="mixed")
    masks  = fn.decoders.image(mask_files, device="mixed")
    return images, masks, labels

pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)

生成的图是

_images/two_readers.svg

重要提示

pipeline 定义函数仅在构建 pipeline 时执行一次,并且通常返回一个 dali.DataNode 对象或其元组。为了方便起见,可以返回其他类型,例如 NumPy 数组,但这些类型被视为常量,并且仅评估一次。

处理图结构#

DALI pipeline 分阶段执行。这些阶段对应于可以为操作符指定的 device 参数,并按以下顺序执行

  1. 'cpu' - 接受 CPU 输入并生成 CPU 输出的操作符。

  2. 'mixed' - 接受 CPU 输入并生成 GPU 输出的操作符,例如 nvidia.dali.fn.decoders.image()

  3. 'gpu' - 接受 GPU 输入并生成 GPU 输出的操作符。

CPU 操作符生成的数据可以通过在 DataNode (DALI 操作符的输出) 上调用 .gpu() 显式复制到 GPU。

稍后阶段生成的数据不能被在早期阶段执行的操作符使用。

大多数 DALI 操作符接受用于参数化其行为的附加关键字参数。这些命名的关键字参数(与位置输入不同)可以是

  • Python 常量

  • 参数输入 - **CPU** 操作符的输出 - 在操作符的文档字符串中表示为 TensorList

在参数输入的情况下,将一个操作符的输出作为另一个操作符的**命名关键字参数**传递,将在处理图中建立连接。

这些参数将作为 DALI pipeline 图的一部分在每次迭代和每个样本中计算。请记住,只有 CPU 操作符可以用作参数输入。

示例

@pipeline_def
def my_pipeline():
    img_files, labels = fn.readers.file(file_root='image_dir', device='cpu')
    # `images` is GPU data (result of Mixed operator)
    images = fn.decoders.image(img_files, device='mixed')
    # `coin_flip` must be on CPU so the `flip_params` can be used as argument input
    flip_param = fn.random.coin_flip(device='cpu')
    # `images` is input (GPU) and `flip_param` is argument input (CPU)
    flipped = fn.flip(images, horizontal=flip_param, device='gpu')
    # `labels` is explicitly marked for transfer to GPU, `flipped` is already GPU
    return flipped, labels.gpu()

pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)

注意

如果未指定 device 参数,则会根据输入的位置自动选择它。如果至少有一个 GPU 输入,则假定 device='gpu',否则使用 'cpu'

上面的示例显式添加了 device 参数以提高清晰度,但如果仅为 fn.decoders.image 指定 device='mixed',则效果相同。

当前 Pipeline#

不影响 pipeline 输出的子图将被自动修剪。如果操作符具有副作用(例如 PythonFunction 操作符族),则在不设置当前 pipeline 的情况下无法调用它。当在派生 pipeline 的 Pipeline.define_graph() 方法内部定义图时,将隐式设置当前 pipeline。否则,可以使用上下文管理器(with 语句)进行设置

pipe = dali.Pipeline(batch_size=N, num_threads=3, device_id=0)
with pipe:
    src = dali.ops.ExternalSource(my_source, num_outputs=2)
    a, b = src()
    pipe.set_outputs(a, b)

当使用 pipeline_def() 创建 pipeline 时,定义 pipeline 的函数将在新创建的 pipeline 的范围内执行。以下示例与上一个示例等效

@dali.pipeline_def(batch_size=N, num_threads=3, device_id=0)
def my_pipe(my_source):
    return dali.fn.external_source(my_source, num_outputs=2)

pipe = my_pipe(my_source)

Pipeline 装饰器#

@nvidia.dali.pipeline_def(fn=None, *, enable_conditionals=False, **pipeline_kwargs)#

将图定义函数转换为 DALI pipeline 工厂的装饰器。

图定义函数是返回预期 pipeline 输出的函数。您可以使用 @pipeline_def 装饰此函数

@pipeline_def
def my_pipe(flip_vertical, flip_horizontal):
    ''' Creates a DALI pipeline, which returns flipped and original images '''
    data, _ = fn.readers.file(file_root=images_dir)
    img = fn.decoders.image(data, device="mixed")
    flipped = fn.flip(img, horizontal=flip_horizontal, vertical=flip_vertical)
    return flipped, img

装饰后的函数返回一个 DALI Pipeline 对象

pipe = my_pipe(True, False)
# pipe.run()  # the pipeline is not configured properly yet

pipeline 需要额外的参数,例如批大小、工作线程数、GPU 设备 ID 等(有关 pipeline 参数的完整列表,请参见 nvidia.dali.Pipeline())。这些参数可以作为附加关键字参数提供,并传递给装饰后的函数

pipe = my_pipe(True, False, batch_size=32, num_threads=1, device_id=0)

pipeline 已正确配置,我们现在可以运行它。原始函数的输出变成了 Pipeline 的输出

flipped, img = pipe.run()

当某些 pipeline 参数是固定的时,可以在装饰器中按名称指定它们

@pipeline_def(batch_size=42, num_threads=3)
def my_pipe(flip_vertical, flip_horizontal):
    ...

稍后在调用装饰后的函数时传递的任何 Pipeline 构造函数参数都将覆盖装饰器定义的参数

@pipeline_def(batch_size=32, num_threads=3)
def my_pipe():
    data = fn.external_source(source=my_generator)
    return data

pipe = my_pipe(batch_size=128)  # batch_size=128 overrides batch_size=32

警告

被装饰函数的参数可能会遮蔽 pipeline 构造函数参数 - 在这种情况下,无法更改其值。

注意

不允许在图定义函数中使用 **kwargs (可变关键字参数)。它们可能会导致不受欢迎的、静默地劫持 Pipeline 构造函数中同名的一些参数。以这种方式编写的代码在 DALI 的未来版本中添加新参数到 Pipeline 构造函数时将停止工作。

要在 @pipeline_def 函数的主体中访问任何 pipeline 参数,可以使用函数 nvidia.dali.Pipeline.current()

@pipeline_def()
def my_pipe():
    pipe = Pipeline.current()
    batch_size = pipe.batch_size
    num_threads = pipe.num_threads
    ...

pipe = my_pipe(batch_size=42, num_threads=3)
...
关键字参数:

enable_conditionals (bool, 可选) – 启用对在 pipeline 定义中使用 if 语句条件执行 DALI 操作符的支持,默认为 False。

条件执行#

DALI 允许使用 if 语句为批次中选定的样本有条件地执行操作符。要启用此功能,请使用 @pipeline_def 装饰器来定义 pipeline,并将 enable_conditionals 设置为 True

每个以 DataNode() 作为条件的 if 语句都将被识别为 DALI 条件语句。

例如,此 pipeline 以 25% 的概率将每张图像旋转一个介于 10 到 30 度之间的随机角度

@pipeline_def(enable_conditionals=True)
def random_rotate():
    jpegs, _ = fn.readers.file(device="cpu", file_root=images_dir)
    images = fn.decoders.image(jpegs, device="mixed")
    do_rotate = fn.random.coin_flip(probability=0.25, dtype=DALIDataType.BOOL)
    if do_rotate:
        result = fn.rotate(images, angle=fn.random.uniform(range=(10, 30)), fill_value=0)
    else:
        result = images
    return result

DALI 条件的语义可以理解为代码一次处理一个样本。

条件必须由标量样本表示 - 即具有 0 维形状。它可以是布尔值或 DALI 支持的任何数值类型 - 在后一种情况下,非零值被视为 True,零值被视为 False,这符合典型的 Python 语义。

此外,逻辑表达式 andornot 可以用于 DataNode()。前两个限制为布尔输入,not 允许与 if 语句条件相同的输入类型。逻辑表达式在评估时遵循短路规则。

您可以在 条件教程 中阅读更多内容。

防止 AutoGraph 转换#

@nvidia.dali.pipeline.do_not_convert#

阻止 AutoGraph 转换函数的装饰器。

在条件模式下,DALI 使用 TensorFlow 的 AutoGraph 的一个分支来转换代码,使我们能够重写和检测 if 语句,以便它们可以在处理 DALI pipeline 中使用。

AutoGraph 转换应用于 pipeline 定义中调用的任何顶级函数或方法(以及 pipeline 定义本身)。当函数被转换时,在其语法范围内定义的所有函数也会被转换。重写除其他效果外,还使这些函数不可序列化。

要阻止函数被转换,必须使用此装饰器标记其顶级包含函数。这有时可能需要将函数重构到外部范围。

external source 的并行模式 (parallel=True) 要求其 source 参数是可序列化的。为了防止重写 source,用于创建 source 的函数应使用 @do_not_convert 进行装饰。

注意

只有不处理 DataNode (因此不使用 DALI 操作符) 的函数才应使用此装饰器标记。

例如

from nvidia.dali import pipeline_def, fn

@pipeline_def(enable_conditionals=True)
def pipe():

    def source_factory(size):
        def source_fun(sample_info):
            return np.full(size, sample_info.iter_idx)
        return source_fun

    source = source_factory(size=(2, 1))
    return fn.external_source(source=source, parallel=True, batch=False)

应转换为

from nvidia.dali import pipeline_def, fn
from nvidia.dali.pipeline import do_not_convert

@do_not_convert
def source_factory(size):
    def source_fun(sample_info):
        return np.full(size, sample_info.iter_idx)
    return source_fun

@pipeline_def(enable_conditionals=True)
def pipe():
    source = source_factory(size=(2, 1))
    return fn.external_source(source=source, parallel=True, batch=False)

source_factory 必须被分解出来,否则它将作为 pipeline 定义的一部分进行转换。由于我们有兴趣防止 source_fun 的 AutoGraph 转换,我们需要装饰其顶级包含函数。

注意

如果在 pipeline 定义之外声明函数,并作为参数传递,但未在 pipeline 定义中直接调用,则不会对其进行转换。在这种情况下,传递给 external source 操作符、python function 操作符族或 Numba function 操作符的回调不被视为在 pipeline 定义中直接调用。此类回调在 pipeline 运行时执行,因此在定义和构建 pipeline 之后执行。

例如

from nvidia.dali import pipeline_def, fn

def source_fun(sample_info):
    return np.full((2, 2), sample_info.iter_idx)

@pipeline_def(enable_conditionals=True)
def pipe():
    return fn.external_source(source=source_fun, batch=False)

source_fun 不会被转换,因为它是在 pipeline 定义之外定义的,并且仅通过名称传递给外部源。

Pipeline 类#

class nvidia.dali.Pipeline(batch_size=-1, num_threads=-1, device_id=-1, seed=-1, exec_pipelined=True, prefetch_queue_depth=2, exec_async=True, bytes_per_sample=0, set_affinity=False, max_streams=None, default_cuda_stream_priority=None, *, enable_memory_stats=False, enable_checkpointing=False, checkpoint=None, py_num_workers=1, py_start_method='fork', py_callback_pickler=None, output_dtype=None, output_ndim=None, exec_dynamic=False, experimental_exec_dynamic=None)#

Pipeline 类是所有 DALI 数据管线的基础。管线封装了数据处理图和执行引擎。

参数:
  • batch_size (int, optional, default = -1) –

    管线的最大批大小。此参数的负值无效 - 默认值只能用于序列化管线(使用存储在序列化管线中的值)。在大多数情况下,管线的实际批大小将等于最大批大小。也支持使用较小的批大小运行 DALI 管线。批大小可能会在迭代之间发生变化。

    请注意,DALI 可能会根据此参数执行内存预分配。将其设置得过高可能会导致内存不足错误。

  • num_threads (int, optional, default = -1) – 管线使用的 CPU 线程数。此参数的负值无效 - 默认值只能用于序列化管线(使用存储在序列化管线中的值)。

  • device_id (int, optional, default = -1) – 管线使用的 GPU 的 ID。此参数的 None 值表示 DALI 不应使用 GPU 或 CUDA 运行时。这会将管线限制为仅 CPU 算子,但允许它在任何具有 CPU 功能的机器上运行。

  • seed (int, optional, default = -1) – 用于随机数生成的种子。为此参数保留默认值将导致随机种子。

  • exec_pipelined (bool, optional, default = True) – 是否以启用 CPU 和 GPU 计算重叠的方式执行管线,通常会带来更快的执行速度,但会消耗更多的内存。

  • prefetch_queue_depth (int or {"cpu_size": int, "gpu_size": int}, optional, default = 2) –

    执行器管线的深度。更深的管线使 DALI 更能抵抗每个批次的不均匀执行时间,但它也会消耗更多内存用于内部缓冲区。指定一个字典

    { "cpu_size": x, "gpu_size": y }

    而不是整数将导致管线使用分离队列执行器,CPU 阶段的缓冲区队列大小为 x,混合和 GPU 阶段的缓冲区队列大小为 y。执行器将分别缓冲 CPU 和 GPU 阶段,并在发出第一个 run() 时填充缓冲区队列。分离执行要求 exec_async=Trueexec_pipelined=Trueexec_dynamic=False

  • exec_async (bool, optional, default = True) – 是否异步执行管线。这使得 run() 方法相对于调用 Python 线程异步运行。为了与管线同步,需要调用 outputs() 方法。

  • exec_dynamic (bool, optional, default = False) – 是否使用动态执行器。动态执行器允许交错 CPU 和 GPU 算子,并执行 GPU 到 CPU 的复制。它还对管线输出和算子间缓冲区使用动态内存分配,从而减少复杂管线中的内存消耗。当 exec_dynamicTrue 时,exec_asyncexec_pipelined 必须保留为其默认值 (True)。

  • bytes_per_sample (int, optional, default = 0) – DALI 用于其张量的内存使用量提示。

  • set_affinity (bool, optional, default = False) – 是否将 CPU 核心亲和性设置为最接近所用 GPU 的核心。

  • max_streams (int, deprecated, default = None) – 已弃用,此参数无效。

  • default_cuda_stream_priority (int, optional, default = None) – 已弃用,此参数无效。

  • enable_memory_stats (bool, optional, default = False) – DALI 是否应打印算子输出缓冲区统计信息。对 bytes_per_sample_hint 算子参数很有用。当 exec_dynamicTrue 时,此标志无效。

  • enable_checkpointing (bool, optional, default = False) –

    如果为 True,DALI 将跟踪算子的状态。在这种情况下,调用 checkpoint 方法会返回管线的序列化状态。可以使用作为 checkpoint 参数传递的序列化状态稍后重建同一管线,以从保存的迭代恢复运行。

    更多详细信息可以在此文档部分中找到。

  • checkpoint (str, optional, default = None) –

    checkpoint 方法接收的序列化检查点。构建管线后,其状态将从 checkpoint 恢复,并且管线从保存的迭代恢复执行。

    更多详细信息可以在此文档部分中找到。

  • py_num_workers (int, optional, default = 1) – 将处理并行 external_source() 回调的 Python 工作进程数。仅当至少有一个 external_source()parallel 设置为 True 时,池才启动。将其设置为 0 将禁用池,并且所有 ExternalSource 算子都将回退到非并行模式,即使 parallel 设置为 True 也是如此。

  • py_start_method (str, default = "fork") –

    确定 Python 工作进程的启动方式。支持的方法

    • "fork" - 通过 fork 进程启动

    • "spawn" - 启动新的解释器进程

    如果使用 spawn 方法,则 ExternalSource 的回调必须是可 pickle 的。为了使用 fork,启动工作进程时不得获取 CUDA 上下文。因此,如果您需要构建多个使用 Python 工作进程的管线,则需要在构建或运行任何管线之前调用 start_py_workers()(有关详细信息,请参阅 build())。您可以在 Python 的 multiprocessing 模块文档中找到有关这两种方法的更多详细信息和注意事项。

  • py_callback_pickler (module or tuple, default = None) –

    如果 py_start_method 设置为 spawn,则传递给并行 ExternalSource 的回调必须是可 pickle 的。如果在 Python 3.8 或更高版本中运行,且 py_callback_pickler 设置为 None,则 DALI 在序列化回调时使用自定义 pickle,以支持本地函数和 lambda 的序列化。

    但是,如果您需要序列化更复杂的对象(如本地类)或者您正在运行旧版本的 Python,您可以提供外部序列化包(如 dill 或 cloudpickle),这些包实现了两个方法:dumpsloads,以使 DALI 使用它们来序列化外部源回调。您可以将模块直接作为 py_callback_pickler 传递

    import dill
    @pipeline_def(py_callback_pickler=dill, ...)
    def create_pipeline():
        src = fn.external_source(
            lambda sample_info: np.int32([42]),
            batch=False,
            parallel=True,
        )
        ...
    

    py_callback_pickler 的有效值可以是实现 dumpsloads 方法的模块/对象,也可以是元组,其中第一个项目是模块/对象,接下来的两个可选参数是在调用 dumps 和 loads 时要传递的额外 kwargs。所提供的方法和 kwargs 必须可使用标准 pickle.dumps 进行 pickle。

    如果您运行的是 Python 3.8 或更高版本,并使用默认的 DALI pickler (py_callback_pickler = None),您可以提示 DALI 通过使用 @dali.pickling.pickle_by_value 修饰全局函数,按值而不是按引用序列化全局函数。当使用 Jupyter notebook 时,这可能特别有用,以解决工作进程无法导入在 notebook 中定义为全局函数的回调的问题。

  • output_dtype (nvidia.dali.types.DALIDataType 或其列表, default = None) –

    使用此参数,您可以声明您期望在给定输出中使用的数据类型。您应传递 types.DALIDataType 的列表,列表中的每个元素对应于管线的一个输出。此外,您可以传递 None 作为通配符。在每次迭代之后,将根据您传递给此参数的类型验证输出。如果任何输出与提供的类型不匹配,则会引发 RuntimeError。

    如果 output_dtype 值是单个值(而不是列表),则会将其广播到管线的输出数量。

  • output_ndim (int or list of ints, default = None) –

    使用此参数,您可以声明您期望在给定输出中使用的维度数。您应传递整数列表,列表中的每个元素对应于管线的一个输出。此外,您可以传递 None 作为通配符。在每次迭代之后,将根据您传递给此参数的维度数验证输出。如果任何输出的维度与提供的 ndim 不匹配,则会引发 RuntimeError。

    如果 output_ndim 值是单个值(而不是列表),则会将其广播到管线的输出数量。

__enter__()#

安全地将管线设置为当前管线。调用具有副作用或没有输出的算子时,需要当前管线。此类算子的示例包括 PythonFunction(潜在的副作用)或 DumpImage(没有输出)。

任何悬空算子都可以标记为具有副作用,如果它标记了 preserve=True,这对于调试可能很有用 - 否则,不贡献于管线输出的算子将从图中删除。

要手动设置新的(和恢复以前的)当前管线,请分别使用 push_current()pop_current()

__exit__(exception_type, exception_value, traceback)#

安全地恢复以前的管线。

add_sink(edge)#

将边标记为数据接收器,防止其被修剪,即使它未连接到管线输出。

property batch_size#

批大小。

build()#

构建管线(可选步骤)。

实例化管线的后端对象并启动处理线程。如果管线使用多处理 external_source,则也会启动工作进程。在大多数情况下,无需手动调用 build。当使用多处理时,可能有必要在主进程与 GPU 进行任何交互之前调用 build()start_py_workers()。如果需要,可以在运行管线之前使用 build() 来分离图构建和处理步骤。

当管线被自动构建时,它会:

checkpoint(filename=None)#

以序列化的 Protobuf 字符串形式返回管线的状态。

此外,如果指定了 filename,则序列化的检查点将写入指定的文件。文件内容将被覆盖。

可以使用保存的检查点稍后重建同一管线,作为 checkpoint 参数传递,以从保存的迭代恢复执行。

更多详细信息可以在此文档部分中找到。

参数:

filename (str) – 将序列化管线写入的文件。

property cpu_queue_size#

CPU 阶段提前处理的迭代次数。

static current()#

返回由 push_current() 设置的当前管线的实例。

property default_cuda_stream_priority#

已弃用;始终为 0。

define_graph()#

此函数由用户定义,用于构建其管线操作图。

它返回通过调用 DALI 算子创建的输出列表。

classmethod deserialize(serialized_pipeline=None, filename=None, **kwargs)#

反序列化并构建管线。

反序列化先前使用 serialize() 方法序列化的管线。

返回的管线已构建。

或者,可以传递额外的参数,这些参数将在实例化管线时使用。有关参数的完整列表,请参阅 Pipeline 构造函数。默认情况下,将使用来自序列化管线的参数实例化管线。

请注意,serialized_pipelinefilename 参数是互斥的

参数:
  • serialized_pipeline (str) – 使用 serialize() 方法序列化的管线。

  • filename (str) – 将从中读取序列化管线的文件。

  • kwargs (dict) – 有关参数的完整列表,请参阅 Pipeline 构造函数。

返回类型:

反序列化并构建的管线。

deserialize_and_build(serialized_pipeline)#

反序列化并构建以序列化形式给出的管线。

参数:

serialized_pipeline (str) – 序列化的管线。

property device_id#

管线使用的 GPU 的 ID,对于仅 CPU 管线,则为 None。

empty()#

如果管线中已安排但尚未消耗任何工作

enable_api_check(enable)#

允许在运行时启用或禁用 API 检查

property enable_memory_stats#

如果为 True,则收集内存使用情况统计信息。

epoch_size(name=None)#

管线的 epoch 大小。

如果 name 参数为 None,则返回 (reader name, 该 reader 的 epoch 大小) 对的字典。如果 name 参数不为 None,则返回该 reader 的 epoch 大小。

参数:

name (str, optional, default = None) – 应用于获取 epoch 大小的 reader。

property exec_async#

如果为 true,则使用异步执行。

property exec_dynamic#

如果为 true,则使用动态执行器。

property exec_pipelined#

如果为 true,则使用管线执行模型。

property exec_separated#

如果为 True,则 CPU 和 GPU 阶段有单独的预取队列。

executor_statistics()#

以字典形式返回提供的管线执行器统计元数据。字典中的每个键都是算子名称。要启用它,请使用 executor_statistics

每个算子的可用元数据键

  • real_memory_size - 算子的每个输出使用的内存大小列表。列表中的索引对应于输出索引。

  • max_real_memory_size - 算子的每个输出使用的最大张量大小的列表。列表中的索引对应于输出索引。

  • reserved_memory_size - 为每个算子输出保留的内存大小列表。列表中的索引对应于输出索引。

  • max_reserved_memory_size - 为每个算子输出保留的每个张量的最大内存大小的列表。列表中的索引对应于输出索引。

注意

当使用 exec_dynamic=True 时,执行器统计信息不可用。

external_source_shm_statistics()#

返回有关共享内存消耗的并行外部源的统计信息。返回的字典包含以下键

  • capacities - 分配给共享内存槽的大小(以字节为单位)列表,用于容纳并行外部源生成的数据。

  • per_sample_capacities - 共享内存槽大小(以字节为单位)的列表,除以 mini-batch 大小,即此类槽中存储的最大样本数。此值对应于外部源的 bytes_per_sample_hint 参数,即,如果提示足够大并且外部源不需要重新分配内存,则这些值应相等。

feed_input(data_node, data, layout=None, cuda_stream=None, use_copy_kernel=False)#

将多维数组或 DLPack(或列表)传递给符合条件的操作符。

可以使用此函数提供数据的操作符是输入操作符(即 fn.inputs 模块中的所有内容)和 fn.external_source()

对于 GPU 输入的情况,必须在与 feed_input 使用的流相同的流上修改数据。有关详细信息,请参阅 cuda_stream 参数。

为了避免停顿,数据应提前 prefetch_queue_depth 次提供。

参数:
  • data_node (DataNode 或字符串) – 符合条件的操作符节点的名称或调用该操作符返回的 DataNode 对象。

  • data (ndarrayDLPack列表) –

    数组可以是以下之一

    • NumPy ndarray (CPU)

    • MXNet ndarray (CPU)

    • PyTorch tensor (CPU 或 GPU)

    • CuPy array (GPU)

    • 实现 __cuda_array_interface__ 的对象

    • DALI TensorList 或 DALI Tensor 对象列表

    要用作 data_node 引用的操作符输出的数据。

  • layout (字符串或 None) – 数据布局的描述(如果未指定,则为空字符串)。它应该是与数据维度匹配的长度字符串,不包括批次维度。对于通道优先图像批次,应为 "CHW",对于通道最后视频,应为 "FHWC",依此类推。如果 data 是 DALI TensorList 或 DALI Tensor 对象列表,并且 layoutNone,则布局取自 data。每次迭代中数据布局必须相同。

  • cuda_stream (可选, cudaStream_t 或可转换为 cudaStream_t 的对象,) –

    例如 cupy.cuda.Stream, torch.cuda.Stream。CUDA 流,将用于将数据复制到 GPU 或从 GPU 源复制数据。如果未设置,将尽力保持正确性 - 即,如果数据是从已识别的库(CuPy、PyTorch)作为张量/数组提供的,则使用库的当前流。这在典型情况下应该有效,但高级用例(和使用不支持的库的代码)可能仍然需要显式提供流句柄。

    特殊值

    • 0 - 使用默认 CUDA 流

    • -1 - 使用 DALI 内部流

    如果使用内部流,则对 feed_input 的调用将阻塞,直到复制到内部缓冲区完成,因为无法与此流同步以防止在另一个流中使用新数据覆盖数组。

  • use_copy_kernel (可选, bool) – 如果设置为 True,DALI 将使用 CUDA 内核来馈送数据(仅适用于将数据复制到/从 GPU 内存复制数据时),而不是 cudaMemcpyAsync (默认)。

property gpu_queue_size#

GPU 阶段提前处理的迭代次数。

property is_restored_from_checkpoint#

如果为 True,则此 pipeline 是从检查点恢复的。

iter_setup()#

一种已弃用的为 pipeline 提供外部输入的方法。

用户定义的 pipeline 可以覆盖此函数,以便为每次迭代执行任何需要的设置。例如,可以使用此函数从 NumPy 数组馈送输入数据。

此方法已弃用,不鼓励使用。较新的执行模型可能与这种为 pipeline 提供数据的方法不兼容。尽可能使用 source 参数在 external_source 中代替。

property max_batch_size#

最大批次大小。

property max_streams#

已弃用,未使用;返回 -1。

property num_threads#

此 pipeline 使用的 CPU 线程数。

output_dtype()#

输出端预期的数据类型。

output_ndim()#

输出端预期的维度数。

output_stream()#

返回生成输出的内部 CUDA 流。

outputs(cuda_stream=None)#

返回 pipeline 的输出并释放先前的缓冲区。

如果 pipeline 是异步执行的,则此函数将阻塞,直到结果可用。如果数据集已到达末尾,则会引发 StopIteration - 通常在 iter_setup 无法生成更多数据时。

参数:

cuda_stream (可选, cudaStream_t 或可转换为 cudaStream_t 的对象,) – 例如 cupy.cuda.Stream, torch.cuda.Stream。返回的 TensorLists 绑定到的流。默认为 None,这意味着输出与主机同步。仅适用于使用 exec_dynamic=True 构建的 pipeline。

返回类型:

各个 pipeline 输出的 TensorList 对象列表

static pop_current()#

将先前的 pipeline 恢复为当前 pipeline。push_current() 的补充。

property prefetch_queue_depth#

预取队列的深度(或深度),如 __init__ 参数中指定。

static push_current(pipeline)#

将 pipeline 设置为当前 pipeline,并将先前的当前 pipeline 存储在堆栈上。要将先前的 pipeline 恢复为当前 pipeline,请使用 pop_current()

为确保在发生异常时正确恢复 pipeline,请使用上下文管理器 (with my_pipeline:)。

调用具有副作用或没有输出的操作符需要当前 pipeline。此类操作符的示例包括 PythonFunction (潜在的副作用)或 DumpImage (没有输出)。

任何悬空算子都可以标记为具有副作用,如果它标记了 preserve=True,这对于调试可能很有用 - 否则,不贡献于管线输出的算子将从图中删除。

property py_num_workers#

并行 `external_source` 使用的 Python 工作进程数。

property py_start_method#

并行 `external_source` 使用的启动 Python 工作进程的方法。

reader_meta(name=None)#

以字典形式返回提供的 reader 元数据。如果未提供名称,则提供包含所有 reader 数据的字典,格式为 {reader_name : meta}

可用的元数据键

epoch_size:原始 epoch 大小

epoch_size_padded:epoch 大小,末尾填充以可被

分片数整除

number_of_shards:分片数

shard_id:给定 reader 的分片 ID

pad_last_batch:给定 reader 是否应填充最后一个批次

stick_to_shard:给定 reader 是否应坚持其分片

参数:

name (str, 可选, 默认 = None) – 应用于获取 shards_number 的 reader。

release_outputs()#

释放 share_outputs 调用返回的缓冲区。

当输出调用结果被使用(复制)并且可以在下次调用 share_outputs 之前将缓冲区标记为空闲时,这很有帮助。它为用户提供了更好的控制,可以控制何时运行 pipeline,何时获取结果缓冲区,以及何时可以在结果被使用后将其返回到 DALI 池。需要与 schedule_run()share_outputs() 一起使用。不应与同一 pipeline 中的 run() 混合使用。

注意

当使用动态执行器(exec_dynamic=True)时,缓冲区不会失效。

reset()#

重置 pipeline 迭代器

如果 pipeline 迭代器已到达末尾,则将其状态重置为开始。

run(cuda_stream=None, /, **pipeline_inputs)#

运行 pipeline 并在指定的 CUDA 流上返回结果。

如果 pipeline 是使用 exec_pipelined 选项设置为 True 创建的,则此函数还将启动预取下一次迭代以加快执行速度。不应与同一 pipeline 中的 schedule_run()share_outputs()release_outputs() 混合使用

参数:
  • cuda_stream (可选, cudaStream_t 或可转换为 cudaStream_t 的对象,) – 如果提供,则在此流上返回输出。如果跳过,则结果是主机同步的。请注意,使用 prefetch_queue_depth>1 可以获得主机同步输出,而无需等待最近迭代的结果。

  • pipeline_inputs

    可用于为 DALI 提供输入的可选参数。当 DALI 定义了任何输入操作符(例如 fn.external_source)时,您可以使用此函数中的命名参数为这些操作符提供输入。假设 DALI pipeline 具有正确定义和命名的这些输入操作符

    @pipeline_def
    def my_pipe():
        inp = fn.external_source(name="my_inp")
        return inp
    

    使用上面的示例 pipeline,您可以将 "my_inp" 输入到 run() 函数中

    p = my_pipe(prefetch_queue_depth=1, ...)
    p.run(my_inp=np.random((2,3,2)))
    

    run() 函数中指定的此类关键字参数必须在 DALI pipeline 中声明相应的输入操作符节点。

    与使用 DALI 时一样,传递给关键字参数的值必须表示整个批次的数据。

    请注意,使用此功能需要在 DALI Pipeline 构造函数中设置 prefetch_queue_depth=1exec_pipelined=False

    此功能可以被视为 feed_input() 函数的语法糖。

返回类型:

各个 pipeline 输出的 TensorList 对象元组

save_graph_to_dot_file(filename, *, show_tensors=False, show_ids=None, use_colors=False)#

将 pipeline 图形保存到文件。

参数:
  • filename (str) – 要将图形写入的文件名。

  • show_tensors (bool) – 在图形中显示 Tensor 节点(默认情况下仅显示操作符节点)

  • show_ids (bool, 已弃用) – 此标志已过时,不起作用

  • use_colors (bool) – 是否使用颜色来区分阶段

schedule_run()#

运行 pipeline,但不返回结果缓冲区。

如果 pipeline 是使用 exec_pipelined 选项设置为 True 创建的,则此函数还将启动预取下一次迭代以加快执行速度。它为用户提供了更好的控制,可以控制何时运行 pipeline,何时获取结果缓冲区,以及在结果被使用后何时将其返回到 DALI 缓冲区池。需要与 release_outputs()share_outputs() 一起使用。不应与同一 pipeline 中的 run() 混合使用

property seed#

pipeline 中使用的随机种子,如果种子未固定,则为 None。

serialize(define_graph=None, filename=None)#

将 pipeline 序列化为 Protobuf 字符串。

此外,您可以传递文件名,以便将序列化的 pipeline 写入该文件。文件内容将被覆盖。

参数:
  • define_graph (callable) – 如果指定,则将使用此函数代替成员 define_graph()。如果 pipeline 输出是使用 set_outputs() 指定的,则不得设置此参数。

  • filename (str) – 将序列化 pipeline 写入的文件。

  • kwargs (dict) – 有关参数的完整列表,请参阅 Pipeline 构造函数。

property set_affinity#

如果为 True,则工作线程绑定到 CPU 核心。

set_outputs(*output_data_nodes)#

设置 pipeline 的输出。

此函数的使用是派生类中覆盖 define_graph 的替代方法。

参数:

*output_data_nodes (解包的 DataNode 对象列表) – pipeline 的输出

share_outputs(cuda_stream=None)#

返回 pipeline 的输出。

outputs() 的主要区别在于 share_outputs 不会释放返回的缓冲区,需要调用 release_outputs 来释放。如果 pipeline 是异步执行的,则此函数将阻塞,直到结果可用。它为用户提供了更好的控制,可以控制何时运行 pipeline,何时获取结果缓冲区,以及何时可以在结果被使用后将其返回到 DALI 池。需要与 release_outputs()schedule_run() 一起使用。不应与同一 pipeline 中的 run() 混合使用。

参数:

cuda_stream (可选, cudaStream_t 或可转换为 cudaStream_t 的对象,) – 例如 cupy.cuda.Stream, torch.cuda.Stream。返回的 TensorLists 绑定到的流。默认为 None,这意味着输出与主机同步。仅适用于使用 exec_dynamic=True 构建的 pipeline。

返回:

  • 各个 pipeline 输出的 TensorList 对象列表。

  • 除非使用动态执行器,否则返回的缓冲区仅在以下情况下有效

  • release_outputs() 被调用。

start_py_workers()#

启动 Python 工作进程(将运行 ExternalSource 回调)。当使用 fork 启动 Python 工作进程(py_start_method="fork")时,您需要在调用任何创建或获取 CUDA 上下文的功能之前调用 start_py_workers()。当不需要这种分离时,Pipeline.build() 方法会自动调用它。

如果您要构建多个通过 fork 进程启动 Python worker 的 pipeline,那么您需要在所有这些 pipeline 上调用 start_py_workers() 方法,然后再调用任何构建或运行 pipeline 的方法(详见 build() 的详细信息),因为构建操作会为当前进程获取 CUDA 上下文。

同样适用于使用任何其他会创建 CUDA 上下文的功能 - 例如,初始化使用 CUDA 的框架或使用 CUDA 创建 CUDA 张量。当使用 py_start_method="fork" 时,您需要在调用此类功能之前调用 start_py_workers()

fork 具有 CUDA 上下文的进程是不受支持的,并且可能导致意外错误。

如果您使用此方法,则在调用 build() 时,不能指定 define_graph 参数。

DataNode#

class nvidia.dali.pipeline.DataNode(name, device='cpu', source=None)#

此类是 TensorList 的符号表示,用于图定义阶段。它不携带实际数据,但用于定义运算符之间的连接并指定 pipeline 输出。 有关详细信息,请参阅 Pipeline 的文档。

DataNode 对象可以作为输入(以及一些命名的关键字参数)传递给 DALI 运算符,但它们也提供算术运算,这些运算隐式地创建执行表达式的相应运算符。

property(key, *, device='cpu')#

返回与 DataNode 关联的元数据属性

参数:
  • key (str) –

    元数据项的名称。当前支持:“source_info” - 数据来源的文件名或数据集中的位置

    (每个样本都是一个 1D uint8 张量)

    “layout” - 布局字符串

    (每个样本都是一个 1D uint8 张量)

  • device (str, optional) – 返回值的设备;默认为 CPU。

shape(*, dtype=None, device='cpu')#

将此 DataNode 的运行时形状作为新的 DataNode 返回

参数:
  • arg_dtype (DALIDataType, optional) – 如果指定,形状将转换为此数据类型;默认为 INT64。

  • device (str, optional) – 返回结果的设备(“cpu” 或 “gpu”);默认为 CPU。

source_info(*, device='cpu')#

返回 “source_info” 属性。等效于 self.meta(“source_info”)。

实验性 Pipeline 功能#

一些额外的实验性功能可以通过 pipeline 装饰器的特殊变体启用。

@nvidia.dali.pipeline.experimental.pipeline_def(fn=None, *, enable_conditionals=False, **pipeline_kwargs)#

@pipeline_def 装饰器的变体,它启用额外的实验性功能。它具有与其非实验性变体相同的 API,并增加了下面列出的关键字参数。

关键字参数:
  • debug (bool, optional) –

    启用 pipeline 调试模式 - 允许对 pipeline 定义进行逐步执行和中间数据检查,默认为 False。

    注意

    此模式仅用于调试目的 - pipeline 性能将明显低于非调试模式。

  • note:: (..) – 由此装饰器启用的功能是实验性的。API 可能会更改,并且功能可能会受到限制。

Pipeline 调试模式(实验性)#

Pipeline 可以在调试模式下运行,方法是将 @nvidia.dali.pipeline_def 装饰器替换为其实验性变体 @nvidia.dali.pipeline.experimental.pipeline_def 并将参数 debug 设置为 True。它允许您访问和修改 pipeline 执行图中的数据,以及使用非 DALI 数据类型作为 DALI 运算符的输入。

在此模式下,运算符的输出类型为 DataNodeDebug,它等效于标准模式下的 DataNode。 您可以对 DataNodeDebug 类型的对象执行与 DataNode 相同的操作,包括算术运算。

使用 .get() 在当前 Pipeline.run() 执行期间访问与 DataNodeDebug 对象关联的数据

@nvidia.dali.pipeline.experimental.pipeline_def(debug=True)
def my_pipe():
    data, _ = fn.readers.file(file_root=images_dir)
    img = fn.decoders.image(data)
    print(np.array(img.get()[0]))
    ...

直接将非 DALI 数据类型(例如 NumPy ndarray,PyTorch Tensor)与 DALI 运算符一起使用

@nvidia.dali.pipeline.experimental.pipeline_def(batch_size=8, debug=True)
def my_pipe():
    img = [np.random.rand(640, 480, 3) for _ in range(8)]
    output = fn.flip(img)
    ...

注意#

  • 调试模式下的种子生成与标准模式不同(它是确定性的但不同)。如果您想在调试模式下获得与标准模式相同的结果,请使用 seed 参数初始化运算符。

  • 直接调用运算符仅在 pipeline_def 函数的范围内有效,您不能在 pipeline_def 之外以这种方式使用它们。

  • 您不能在迭代之间更改 pipeline 内运算符的顺序。

警告

使用调试模式会大大降低 pipeline 的性能。仅将其用于调试目的。

注意

此功能是实验性的,其 API 可能会更改,恕不另行通知。