Pipeline#

在 DALI 中,任何数据处理任务都有一个中心对象,称为 Pipeline。Pipeline 对象是 nvidia.dali.Pipeline 或派生类的实例。Pipeline 封装了数据处理图和执行引擎。

您可以通过以下方式定义 DALI Pipeline

  1. 通过实现一个在内部使用 DALI 算子的函数,并使用 pipeline_def() 装饰器对其进行装饰。

  2. 通过直接实例化 Pipeline 对象,构建图,并使用 Pipeline.set_outputs() 设置 pipeline 输出。

  3. 通过继承 Pipeline 类并重写 Pipeline.define_graph()(这是定义 DALI Pipeline 的传统方法)。

数据处理图#

DALI pipeline 表示为操作图。图中有两种节点:

  • 算子 - 每次调用算子时创建

  • 数据节点(参见 DataNode)- 表示算子的输出和输入;它们从算子的调用中返回,并将它们作为输入传递给其他算子,从而在图中建立连接。

数据节点可以通过调用 算子函数 进行转换。它们还支持 Python 样式的 索引,并且可以合并到 数学表达式 中。

示例

@pipeline_def  # create a pipeline with processing graph defined by the function below
def my_pipeline():
    """ Create a pipeline which reads images and masks, decodes the images and
        returns them. """
    img_files, labels = fn.readers.file(file_root="image_dir", seed=1)
    mask_files, _ = fn.readers.file(file_root="mask_dir", seed=1)
    images = fn.decoders.image(img_files, device="mixed")
    masks  = fn.decoders.image(mask_files, device="mixed")
    return images, masks, labels

pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)

结果图为

_images/two_readers.svg

重要提示

pipeline 定义函数仅在构建 pipeline 时执行一次,并且通常返回 dali.DataNode 对象或其元组。为了方便起见,可以返回其他类型,例如 NumPy 数组,但这些类型被视为常量,并且仅评估一次。

处理图结构#

DALI pipeline 分阶段执行。这些阶段对应于可以为算子指定的 device 参数,并按以下顺序执行:

  1. 'cpu' - 接受 CPU 输入并生成 CPU 输出的算子。

  2. 'mixed' - 接受 CPU 输入并生成 GPU 输出的算子,例如 nvidia.dali.fn.decoders.image()

  3. 'gpu' - 接受 GPU 输入并生成 GPU 输出的算子。

CPU 算子生成的数据可以通过在 DataNode (DALI 算子的输出)上调用 .gpu() 显式复制到 GPU。

后期阶段生成的数据不能被早期阶段执行的算子使用。

大多数 DALI 算子接受用于参数化其行为的其他关键字参数。这些命名的关键字参数(与位置输入不同)可以是:

  • Python 常量

  • 参数输入 - CPU 算子的输出 - 在算子的文档字符串中表示为 TensorList

在参数输入的情况下,将一个算子的输出作为另一个算子的命名关键字参数传递,将在处理图中建立连接。

这些参数将作为 DALI pipeline 图的一部分,在每次迭代和每个样本中计算。请记住,只有 CPU 算子可以用作参数输入。

示例

@pipeline_def
def my_pipeline():
    img_files, labels = fn.readers.file(file_root='image_dir', device='cpu')
    # `images` is GPU data (result of Mixed operator)
    images = fn.decoders.image(img_files, device='mixed')
    # `coin_flip` must be on CPU so the `flip_params` can be used as argument input
    flip_param = fn.random.coin_flip(device='cpu')
    # `images` is input (GPU) and `flip_param` is argument input (CPU)
    flipped = fn.flip(images, horizontal=flip_param, device='gpu')
    # `labels` is explicitly marked for transfer to GPU, `flipped` is already GPU
    return flipped, labels.gpu()

pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)

注意

如果未指定 device 参数,则会根据输入的位置自动选择。如果至少有一个 GPU 输入,则假定为 device='gpu',否则使用 'cpu'

上面的示例为了清晰起见显式添加了 device 参数,但如果仅为 fn.decoders.image 指定 device='mixed',则效果相同。

当前 Pipeline#

不影响 pipeline 输出的子图会自动修剪。如果算子具有副作用(例如 PythonFunction 算子系列),则在未设置当前 pipeline 的情况下无法调用它。当在派生 pipeline 的 Pipeline.define_graph() 方法内部定义图时,将隐式设置当前 pipeline。否则,可以使用上下文管理器(with 语句)进行设置

pipe = dali.Pipeline(batch_size=N, num_threads=3, device_id=0)
with pipe:
    src = dali.ops.ExternalSource(my_source, num_outputs=2)
    a, b = src()
    pipe.set_outputs(a, b)

当使用 pipeline_def() 创建 pipeline 时,定义 pipeline 的函数在新创建的 pipeline 的作用域内执行。以下示例与上一个示例等效:

@dali.pipeline_def(batch_size=N, num_threads=3, device_id=0)
def my_pipe(my_source):
    return dali.fn.external_source(my_source, num_outputs=2)

pipe = my_pipe(my_source)

Pipeline 装饰器#

@nvidia.dali.pipeline_def(fn=None, *, enable_conditionals=False, **pipeline_kwargs)#

将图定义函数转换为 DALI pipeline 工厂的装饰器。

图定义函数是返回预期 pipeline 输出的函数。您可以使用 @pipeline_def 装饰此函数

@pipeline_def
def my_pipe(flip_vertical, flip_horizontal):
    ''' Creates a DALI pipeline, which returns flipped and original images '''
    data, _ = fn.readers.file(file_root=images_dir)
    img = fn.decoders.image(data, device="mixed")
    flipped = fn.flip(img, horizontal=flip_horizontal, vertical=flip_vertical)
    return flipped, img

装饰后的函数返回 DALI Pipeline 对象

pipe = my_pipe(True, False)
# pipe.run()  # the pipeline is not configured properly yet

pipeline 需要其他参数,例如批大小、worker 线程数、GPU 设备 ID 等(有关 pipeline 参数的完整列表,请参见 nvidia.dali.Pipeline())。这些参数可以作为附加的关键字参数提供,并传递给装饰后的函数

pipe = my_pipe(True, False, batch_size=32, num_threads=1, device_id=0)

pipeline 已正确配置,我们现在可以运行它了。原始函数的输出变成了 Pipeline 的输出

flipped, img = pipe.run()

当某些 pipeline 参数是固定的时,可以在装饰器中按名称指定它们

@pipeline_def(batch_size=42, num_threads=3)
def my_pipe(flip_vertical, flip_horizontal):
    ...

稍后在调用装饰后的函数时传递的任何 Pipeline 构造函数参数都将覆盖装饰器定义的参数

@pipeline_def(batch_size=32, num_threads=3)
def my_pipe():
    data = fn.external_source(source=my_generator)
    return data

pipe = my_pipe(batch_size=128)  # batch_size=128 overrides batch_size=32

警告

被装饰函数的参数可能会遮蔽 pipeline 构造函数参数 - 在这种情况下,无法更改它们的值。

注意

在图定义函数中使用 **kwargs (可变关键字参数)是不允许的。它们可能会导致 Pipeline 构造函数意外地、静默地劫持一些同名参数。以这种方式编写的代码在未来版本的 DALI 中添加新参数到 Pipeline 构造函数时将停止工作。

要访问 @pipeline_def 函数体内的任何 pipeline 参数,可以使用函数 nvidia.dali.Pipeline.current()

@pipeline_def()
def my_pipe():
    pipe = Pipeline.current()
    batch_size = pipe.batch_size
    num_threads = pipe.num_threads
    ...

pipe = my_pipe(batch_size=42, num_threads=3)
...
关键字参数:

enable_conditionals (bool, optional) – 启用对在 pipeline 定义中使用 if 语句有条件地执行 DALI 算子的支持,默认为 False。

条件执行#

DALI 允许使用 if 语句有条件地为批次中选定的样本执行算子。要启用此功能,请使用 @pipeline_def 装饰器定义 pipeline,并将 enable_conditionals 设置为 True

每个将 DataNode() 作为条件的 if 语句都将被识别为 DALI 条件语句。

例如,此 pipeline 以 25% 的概率将每张图像旋转 10 到 30 度之间的随机角度

@pipeline_def(enable_conditionals=True)
def random_rotate():
    jpegs, _ = fn.readers.file(device="cpu", file_root=images_dir)
    images = fn.decoders.image(jpegs, device="mixed")
    do_rotate = fn.random.coin_flip(probability=0.25, dtype=DALIDataType.BOOL)
    if do_rotate:
        result = fn.rotate(images, angle=fn.random.uniform(range=(10, 30)), fill_value=0)
    else:
        result = images
    return result

DALI 条件语句的语义可以理解为代码一次处理一个样本。

条件必须由标量样本表示 - 即具有 0-d 形状。它可以是布尔值或 DALI 支持的任何数值类型 - 在后一种情况下,非零值被视为 True,零值被视为 False,这符合典型的 Python 语义。

此外,逻辑表达式 andornot 可以用于 DataNode()。前两个仅限于布尔输入,not 允许与 if 语句条件相同的输入类型。逻辑表达式在评估时遵循短路规则。

您可以在条件教程中阅读更多内容。

防止 AutoGraph 转换#

@nvidia.dali.pipeline.do_not_convert#

阻止 AutoGraph 转换函数的装饰器。

在条件模式下,DALI 使用 TensorFlow 的 AutoGraph 的一个分支来转换代码,使我们能够重写和检测 if 语句,以便它们可以在处理 DALI pipeline 中使用。

AutoGraph 转换应用于 pipeline 定义中调用的任何顶级函数或方法(以及 pipeline 定义本身)。当一个函数被转换时,在其语法范围内定义的所有函数也会被转换。重写除其他效果外,还使这些函数不可序列化。

要阻止函数被转换,必须使用此装饰器标记其顶级包含函数。有时这可能需要将函数重构到外部作用域。

external source 的并行模式(parallel=True)要求其 source 参数是可序列化的。为了防止重写 source,用于创建 source 的函数应使用 @do_not_convert 进行装饰。

注意

只有不处理 DataNode (因此不使用 DALI 算子)的函数才应使用此装饰器标记。

例如

from nvidia.dali import pipeline_def, fn

@pipeline_def(enable_conditionals=True)
def pipe():

    def source_factory(size):
        def source_fun(sample_info):
            return np.full(size, sample_info.iter_idx)
        return source_fun

    source = source_factory(size=(2, 1))
    return fn.external_source(source=source, parallel=True, batch=False)

应转换为

from nvidia.dali import pipeline_def, fn
from nvidia.dali.pipeline import do_not_convert

@do_not_convert
def source_factory(size):
    def source_fun(sample_info):
        return np.full(size, sample_info.iter_idx)
    return source_fun

@pipeline_def(enable_conditionals=True)
def pipe():
    source = source_factory(size=(2, 1))
    return fn.external_source(source=source, parallel=True, batch=False)

source_factory 必须被分解出来,否则它将被作为 pipeline 定义的一部分进行转换。由于我们有兴趣防止 source_fun 的 AutoGraph 转换,我们需要装饰其顶级包含函数。

注意

如果在 pipeline 定义之外声明一个函数,并且作为参数传递,但未在 pipeline 定义中直接调用,则它不会被转换。在这种情况下,传递给 external source 算子、python function 算子系列或 Numba function 算子的回调不被视为在 pipeline 定义中直接调用。此类回调在 pipeline 运行时执行,因此在 pipeline 定义和构建之后执行。

例如

from nvidia.dali import pipeline_def, fn

def source_fun(sample_info):
    return np.full((2, 2), sample_info.iter_idx)

@pipeline_def(enable_conditionals=True)
def pipe():
    return fn.external_source(source=source_fun, batch=False)

source_fun 不会被转换,因为它是在 pipeline 定义之外定义的,并且仅通过名称传递给外部源。

Pipeline 类#

class nvidia.dali.Pipeline(batch_size=-1, num_threads=-1, device_id=-1, seed=-1, exec_pipelined=True, prefetch_queue_depth=2, exec_async=True, bytes_per_sample=0, set_affinity=False, max_streams=None, default_cuda_stream_priority=None, *, enable_memory_stats=False, enable_checkpointing=False, checkpoint=None, py_num_workers=1, py_start_method='fork', py_callback_pickler=None, output_dtype=None, output_ndim=None, exec_dynamic=False, experimental_exec_dynamic=None)#

Pipeline 类是所有 DALI 数据 pipeline 的基础。pipeline 封装了数据处理图和执行引擎。

参数:
  • batch_size (int, optional, default = -1) –

    pipeline 的最大批大小。此参数的负值无效 - 默认值只能用于序列化的 pipeline(使用序列化的 pipeline 中存储的值代替)。在大多数情况下,pipeline 的实际批大小将等于最大批大小。也支持以较小的批大小运行 DALI Pipeline。批大小可能会在迭代之间更改。

    请注意,DALI 可能会根据此参数执行内存预分配。将其设置得过高可能会导致内存不足错误。

  • num_threads (int, optional, default = -1) – pipeline 使用的 CPU 线程数。此参数的负值无效 - 默认值只能用于序列化的 pipeline(使用序列化的 pipeline 中存储的值代替)。

  • device_id (int, optional, default = -1) – pipeline 使用的 GPU 的 ID。此参数的 None 值表示 DALI 不应使用 GPU 或 CUDA 运行时。这会将 pipeline 限制为仅 CPU 算子,但允许它在任何支持 CPU 的机器上运行。

  • seed (int, optional, default = -1) – 用于随机数生成的种子。为此参数保留默认值将导致随机种子。

  • exec_pipelined (bool, optional, default = True) – 是否以允许 CPU 和 GPU 计算重叠的方式执行 pipeline,通常会导致更快的执行速度,但会消耗更多的内存。

  • prefetch_queue_depth (int or {"cpu_size": int, "gpu_size": int}, optional, default = 2) –

    执行器 pipeline 的深度。更深的 pipeline 使 DALI 更能抵抗每个批次执行时间的不均匀性,但它也会消耗更多内存用于内部缓冲区。指定字典

    { "cpu_size": x, "gpu_size": y }

    而不是整数将导致 pipeline 使用分离队列执行器,其中 CPU 阶段的缓冲区队列大小为 x,混合和 GPU 阶段的缓冲区队列大小为 y。执行器将分别缓冲 CPU 和 GPU 阶段,并在发出第一个 run() 时填充缓冲区队列。分离执行要求 exec_async=Trueexec_pipelined=Trueexec_dynamic=False

  • exec_async (bool, optional, default = True) – 是否异步执行 pipeline。 这使得 run() 方法相对于调用 Python 线程异步运行。 为了与 pipeline 同步,需要调用 outputs() 方法。

  • exec_dynamic (bool, optional, default = False) – 是否使用动态执行器。 动态执行器允许交错 CPU 和 GPU 算子,并执行 GPU 到 CPU 的复制。 它还为 pipeline 输出和算子间缓冲区使用动态内存分配,从而减少复杂 pipeline 中的内存消耗。 当 exec_dynamicTrue 时,exec_asyncexec_pipelined 必须保留其默认值 (True)。

  • bytes_per_sample (int, optional, default = 0) – 为 DALI 提供关于其 tensors 应该使用多少内存的提示。

  • set_affinity (bool, optional, default = False) – 是否将 CPU 核心亲和性设置为最接近所用 GPU 的核心。

  • max_streams (int, deprecated, default = None) – 已弃用,此参数无效。

  • default_cuda_stream_priority (int, optional, default = None) – 已弃用,此参数无效。

  • enable_memory_stats (bool, optional, default = False) – DALI 是否应打印算子输出缓冲区统计信息。 对 bytes_per_sample_hint 算子参数很有用。 当 exec_dynamicTrue 时,此标志无效。

  • enable_checkpointing (bool, optional, default = False) –

    如果为 True,DALI 将跟踪算子的状态。 在这种情况下,调用 checkpoint 方法会返回 pipeline 的序列化状态。 稍后可以使用作为 checkpoint 参数传递的序列化状态重建相同的 pipeline,以从保存的迭代恢复运行。

    更多详细信息可以在 此文档部分 中找到。

  • checkpoint (str, optional, default = None) –

    checkpoint 方法接收的序列化 checkpoint。 构建 pipeline 后,其状态将从 checkpoint 恢复,并且 pipeline 从保存的迭代恢复执行。

    更多详细信息可以在 此文档部分 中找到。

  • py_num_workers (int, optional, default = 1) – 将处理并行 external_source() 回调的 Python worker 的数量。 仅当至少有一个 ExternalSource 的 parallel 设置为 True 时,池才会启动。 将其设置为 0 将禁用池,并且即使 parallel 设置为 True,所有 ExternalSource 算子都将回退到非并行模式。

  • py_start_method (str, default = "fork") –

    确定如何启动 Python worker。 支持的方法

    • "fork" - 通过 fork 进程启动

    • "spawn" - 启动新的解释器进程

    如果使用 spawn 方法,则 ExternalSource 的回调必须是可 pickle 的。 为了使用 fork,启动 worker 时不得获取 CUDA 上下文。 因此,如果您需要构建使用 Python worker 的多个 pipeline,您需要在构建或运行任何 pipeline 之前调用 start_py_workers()(有关详细信息,请参见 build())。 您可以在 Python 的 multiprocessing 模块文档中找到有关这两种方法的更多详细信息和注意事项。

  • py_callback_pickler (module or tuple, default = None) –

    如果 py_start_method 设置为 spawn,则传递给并行 ExternalSource 的回调必须是可 pickle 的。 如果在 Python 3.8 或更高版本中运行,且 py_callback_pickler 设置为 None,则 DALI 在序列化回调时使用自定义 pickle,以支持本地函数和 lambda 的序列化。

    但是,如果您需要序列化更复杂的对象(例如本地类),或者您正在运行旧版本的 Python,您可以提供外部序列化包(例如 dill 或 cloudpickle),这些包实现了两个方法:dumpsloads,以使 DALI 使用它们来序列化外部源回调。 您可以直接将模块作为 py_callback_pickler 传递

    import dill
    @pipeline_def(py_callback_pickler=dill, ...)
    def create_pipeline():
        src = fn.external_source(
            lambda sample_info: np.int32([42]),
            batch=False,
            parallel=True,
        )
        ...
    

    py_callback_pickler 的有效值是实现 dumpsloads 方法的模块/对象,或者是一个元组,其中第一个项目是模块/对象,接下来的两个可选参数是在调用 dumps 和 loads 时要传递的额外 kwargs。 提供的方法和 kwargs 必须可以使用标准 pickle.dumps 进行 pickle。

    如果您运行的是 Python 3.8 或更高版本,并且使用默认的 DALI pickler (py_callback_pickler = None),您可以通过使用 @dali.pickling.pickle_by_value 修饰全局函数来提示 DALI 按值而不是按引用序列化全局函数。 当使用 Jupyter notebook 时,这可能特别有用,以解决 worker 进程无法导入在 notebook 内定义为全局函数的回调的问题。

  • output_dtype (nvidia.dali.types.DALIDataType 或其列表,default = None) –

    使用此参数,您可以声明您期望在给定输出中使用的数据类型。 您应传递 mod:types.DALIDataType 的列表,列表中的每个元素对应于 pipeline 的一个输出。 此外,您可以传递 None 作为通配符。 每次迭代后,输出将根据您传递给此参数的类型进行验证。 如果任何输出与提供的类型不匹配,则会引发 RuntimeError。

    如果 output_dtype 值是单个值(而不是列表),它将被广播到 pipeline 的输出数量。

  • output_ndim (int or list of ints, default = None) –

    使用此参数,您可以声明您期望在给定输出中使用的维度数。 您应传递整数列表,列表中的每个元素对应于 pipeline 的一个输出。 此外,您可以传递 None 作为通配符。 每次迭代后,输出将根据您传递给此参数的维度数进行验证。 如果任何输出的维度与提供的 ndim 不匹配,则会引发 RuntimeError。

    如果 output_ndim 值是单个值(而不是列表),它将被广播到 pipeline 的输出数量。

__enter__()#

安全地将 pipeline 设置为当前 pipeline。 调用具有副作用或没有输出的算子需要当前 pipeline。 此类算子的示例包括 PythonFunction(潜在的副作用)或 DumpImage(无输出)。

任何悬空算子都可以标记为具有副作用,如果它被标记为 preserve=True,这对于调试很有用 - 否则,不贡献 pipeline 输出的算子将从图中删除。

要手动设置新的(和恢复以前的)当前 pipeline,请分别使用 push_current()pop_current()

__exit__(exception_type, exception_value, traceback)#

安全地恢复以前的 pipeline。

add_sink(edge)#

将 edge 标记为数据接收器,防止其被修剪,即使它未连接到 pipeline 输出。

property batch_size#

Batch size。

build()#

构建 pipeline(可选步骤)。

实例化 pipeline 的后端对象并启动处理线程。 如果 pipeline 使用多进程 external_source,则 worker 进程也会启动。 在大多数情况下,无需手动调用 build。 当使用多进程时,可能需要在主进程与 GPU 进行任何交互之前调用 build()start_py_workers()。 如果需要,可以在运行 pipeline 之前使用 build() 来分离图构建和处理步骤。

Pipeline 在以下情况下会自动构建

checkpoint(filename=None)#

以序列化的 Protobuf 字符串形式返回 pipeline 的状态。

此外,如果指定了 filename,则序列化的 checkpoint 将被写入指定的文件。 文件内容将被覆盖。

稍后可以使用保存的 checkpoint 重建相同的 pipeline,并将其作为 checkpoint 参数传递,以从保存的迭代恢复执行。

更多详细信息可以在 此文档部分 中找到。

参数:

filename (str) – 序列化 pipeline 将写入的文件。

property cpu_queue_size#

CPU 阶段提前处理的迭代次数。

static current()#

返回由 push_current() 设置的当前 pipeline 的实例。

property default_cuda_stream_priority#

已弃用;始终为 0。

define_graph()#

此函数由用户定义,用于构建其 pipeline 的算子图。

它返回通过调用 DALI 算子创建的输出列表。

classmethod deserialize(serialized_pipeline=None, filename=None, **kwargs)#

反序列化并构建 pipeline。

反序列化先前使用 serialize() 方法序列化的 pipeline。

返回的 pipeline 已经构建完成。

或者,可以传递额外的参数,这些参数将在实例化 pipeline 时使用。 有关完整参数列表,请参阅 Pipeline 构造函数。 默认情况下,pipeline 将使用来自序列化 pipeline 的参数进行实例化。

请注意,serialized_pipelinefilename 参数是互斥的

参数:
  • serialized_pipeline (str) – 使用 serialize() 方法序列化的 Pipeline。

  • filename (str) – 将从中读取序列化 pipeline 的文件。

  • kwargs (dict) – 有关完整参数列表,请参阅 Pipeline 构造函数。

返回类型:

反序列化和构建的 pipeline。

deserialize_and_build(serialized_pipeline)#

反序列化并构建以序列化形式给出的 pipeline。

参数:

serialized_pipeline (str) – 序列化 pipeline。

property device_id#

pipeline 使用的 GPU 的 ID,对于仅 CPU pipeline 则为 None。

empty()#

如果 pipeline 中有任何已调度但尚未消耗的工作

enable_api_check(enable)#

允许在运行时启用或禁用 API 检查

property enable_memory_stats#

如果为 True,则收集内存使用统计信息。

epoch_size(name=None)#

pipeline 的 epoch size。

如果 name 参数为 None,则返回 (reader 名称, 该 reader 的 epoch size) 对的字典。 如果 name 参数不为 None,则返回该 reader 的 epoch size。

参数:

name (str, optional, default = None) – 应用于获取 epoch size 的 reader。

property exec_async#

如果为 true,则使用异步执行。

property exec_dynamic#

如果为 true,则使用动态执行器。

property exec_pipelined#

如果为 true,则使用 pipeline 执行模型。

property exec_separated#

如果为 True,则 CPU 和 GPU 阶段有单独的预取队列。

executor_statistics()#

以字典形式返回提供的 pipeline 执行器统计元数据。 字典中的每个键都是算子名称。 要启用它,请使用 executor_statistics

每个算子的可用元数据键

  • real_memory_size - 算子的每个输出使用的内存大小列表。 列表中的索引对应于输出索引。

  • max_real_memory_size - 算子的每个输出使用的最大 tensor 大小列表。 列表中的索引对应于输出索引。

  • reserved_memory_size - 为每个算子输出保留的内存大小列表。 列表中的索引对应于输出索引。

  • max_reserved_memory_size - 为每个算子输出保留的每个 tensor 的最大内存大小列表。 列表中的索引对应于输出索引。

注意

当使用 exec_dynamic=True 时,执行器统计信息不可用。

external_source_shm_statistics()#

返回关于共享内存消耗的并行外部源的统计信息。 返回的字典包含以下键

  • capacities - 分配用于容纳并行外部源生成的数据的共享内存槽的大小(以字节为单位)列表。

  • per_sample_capacities - 共享内存槽的大小(以字节为单位)除以 mini-batch size 的列表,即此类槽中存储的最大样本数。 此值对应于外部源的 bytes_per_sample_hint 参数,即,如果提示足够大并且外部源不需要重新分配内存,则这些值应相等。

feed_input(data_node, data, layout=None, cuda_stream=None, use_copy_kernel=False)#

将多维数组或 DLPack(或其列表)传递给符合条件的算子。

可以使用此函数提供数据的算子是输入算子(即 fn.inputs 模块中的所有内容)和 fn.external_source()

对于 GPU 输入,必须在与 feed_input 使用的流相同的流上修改数据。 有关详细信息,请参阅 cuda_stream 参数。

为了避免停顿,数据应提前 prefetch_queue_depth 次提供。

参数:
  • data_node (DataNode 或字符串) – 符合条件的算子节点的名称或调用该算子返回的 DataNode 对象。

  • data (ndarrayDLPack其列表) –

    数组可以是以下之一

    • NumPy ndarray (CPU)

    • MXNet ndarray (CPU)

    • PyTorch tensor (CPU 或 GPU)

    • CuPy array (GPU)

    • 实现 __cuda_array_interface__ 的对象

    • DALI TensorList 或 DALI Tensor 对象列表

    要用作 data_node 引用的算子的输出的数据。

  • layout (字符串 或 None) – 数据布局的描述(如果未指定,则为空字符串)。 它应该是长度与数据维度匹配的字符串,不包括 batch 维度。 对于通道优先图像的 batch,这应该是 "CHW",对于通道最后的视频,它是 "FHWC",依此类推。 如果 data 是 DALI TensorList 或 DALI Tensor 对象列表,并且 layoutNone,则布局取自 data。 数据布局在每次迭代中必须相同。

  • cuda_stream (可选, cudaStream_t 或可转换为 cudaStream_t 的对象) –

    例如 cupy.cuda.Stream, torch.cuda.Stream 将用于将数据复制到 GPU 或从 GPU 源复制数据的 CUDA 流。 如果未设置,将尽最大努力保持正确性 - 即,如果数据是作为来自已识别库(CuPy、PyTorch)的 tensor/array 提供的,则使用该库的当前流。 这应该适用于典型场景,但高级用例(和使用不受支持的库的代码)可能仍需要显式提供流句柄。

    特殊值

    • 0 - 使用默认 CUDA 流

    • -1 - 使用 DALI 的内部流

    如果使用内部流,则对 feed_input 的调用将阻塞,直到复制到内部缓冲区完成,因为无法与此流同步以防止在另一个流中用新数据覆盖数组。

  • use_copy_kernel (可选, bool) – 如果设置为 True,DALI 将使用 CUDA kernel 来馈送数据(仅适用于将数据复制到/从 GPU 内存复制数据时),而不是 cudaMemcpyAsync(默认)。

property gpu_queue_size#

GPU 阶段提前处理的迭代次数。

property is_restored_from_checkpoint#

如果为 True,则此 pipeline 是从 checkpoint 恢复的。

iter_setup()#

一种已弃用的为 pipeline 提供外部输入的方法。

用户定义的 pipeline 可以覆盖此函数,以执行每次迭代所需的任何设置。 例如,可以使用此函数从 NumPy 数组馈送输入数据。

此方法已弃用,不建议使用。 较新的执行模型可能与这种为 pipeline 提供数据的方法不兼容。 请尽可能使用 external_source 中的 source 参数代替。

property max_batch_size#

最大批大小。

property max_streams#

已弃用,未使用;返回 -1。

property num_outputs#

管道输出的数量。

property num_threads#

此管道使用的 CPU 线程数。

output_dtype()#

输出端期望的数据类型。

output_ndim()#

输出端期望的维度数。

output_stream()#

返回在其上生成输出的内部 CUDA 流。

outputs(cuda_stream=None)#

返回管道的输出并释放之前的缓冲区。

如果管道是异步执行的,则此函数会阻塞,直到结果可用。 如果数据集已到达末尾(通常在 iter_setup 无法生成更多数据时),则会引发 StopIteration。

参数:

cuda_stream (可选, cudaStream_t 或可转换为 cudaStream_t 的对象) – 例如 cupy.cuda.Stream, torch.cuda.Stream。 返回的 TensorLists 所绑定的流。 默认为 None,这意味着输出与主机同步。 仅适用于使用 exec_dynamic=True 构建的管道。

返回类型:

各个管道输出的 TensorList 对象列表

static pop_current()#

恢复之前的管道作为当前管道。 与 push_current() 互补。

property prefetch_queue_depth#

预取队列的深度(或深度),如 __init__ 参数中指定的那样。

static push_current(pipeline)#

将管道设置为当前管道并将之前的当前管道存储在堆栈上。 要恢复之前的管道作为当前管道,请使用 pop_current()

为了确保在发生异常时正确恢复管道,请使用上下文管理器 (with my_pipeline:)。

调用具有副作用或没有输出的操作符需要当前管道。 此类操作符的示例包括 PythonFunction (潜在的副作用) 或 DumpImage (没有输出)。

任何悬空算子都可以标记为具有副作用,如果它被标记为 preserve=True,这对于调试很有用 - 否则,不贡献 pipeline 输出的算子将从图中删除。

property py_num_workers#

并行 `external_source` 使用的 Python 工作进程数。

property py_start_method#

并行 `external_source` 使用的启动 Python 工作进程的方法。

reader_meta(name=None)#

以字典形式返回提供的读取器元数据。 如果未提供名称,则提供包含所有读取器数据的字典,格式为 {reader_name : meta}

可用的元数据键

epoch_size: 原始 epoch 大小

epoch_size_padded: epoch 大小,末尾填充以可被

分片数整除

number_of_shards: 分片数

shard_id: 给定读取器的分片 ID

pad_last_batch: 给定读取器是否应填充最后一个批次

stick_to_shard: 给定读取器是否应坚持其分片

参数:

name (str, 可选, 默认值 = None) – 应用于获取 shards_number 的读取器。

release_outputs()#

释放 share_outputs 调用返回的缓冲区。

当输出调用结果被消耗(复制)并且缓冲区可以在下次调用 share_outputs 之前标记为可用时,这很有帮助。 它为用户提供了更好的控制,可以控制何时运行管道,何时获取结果缓冲区以及何时可以在结果被消耗后将其返回到 DALI 池。 需要与 schedule_run()share_outputs() 一起使用。 不应与同一管道中的 run() 混合使用。

注意

当使用动态执行器 (exec_dynamic=True) 时,缓冲区不会失效。

reset()#

重置管道迭代器

如果管道迭代器到达末尾,则将其状态重置为开始。

run(cuda_stream=None, /, **pipeline_inputs)#

运行管道并在指定的 CUDA 流上返回结果。

如果管道创建时将 exec_pipelined 选项设置为 True,则此函数还将启动预取下一个迭代,以加快执行速度。 不应与同一管道中的 schedule_run(), share_outputs()release_outputs() 混合使用。

参数:
  • cuda_stream (可选, cudaStream_t 或可转换为 cudaStream_t 的对象) – 如果提供,则在此流上返回输出。 如果跳过,则结果是主机同步的。 请注意,当 prefetch_queue_depth>1 时,无需等待最近迭代的结果即可获得主机同步输出。

  • pipeline_inputs

    可用于为 DALI 提供输入的可选参数。 当 DALI 定义了任何输入操作符(例如 fn.external_source)时,您可以使用此函数中的命名参数为这些操作符提供输入。 假设 DALI 管道已正确定义和命名它们

    @pipeline_def
    def my_pipe():
        inp = fn.external_source(name="my_inp")
        return inp
    

    使用上面的示例管道,您可以将 "my_inp" 输入提供给 run() 函数

    p = my_pipe(prefetch_queue_depth=1, ...)
    p.run(my_inp=np.random((2,3,2)))
    

    run() 函数中指定的此类关键字参数必须在 DALI 管道中声明相应的输入操作符节点。

    与使用 DALI 时始终一样,传递给关键字参数的值必须表示整个批次的数据。

    请注意,使用此功能需要在 DALI Pipeline 构造函数中设置 prefetch_queue_depth=1exec_pipelined=False

    此功能可以被视为 feed_input() 函数的语法糖。

返回类型:

各个管道输出的 TensorList 对象元组

save_graph_to_dot_file(filename, *, show_tensors=False, show_ids=None, use_colors=False)#

将管道图保存到文件。

参数:
  • filename (str) – 图形写入的文件名。

  • show_tensors (bool) – 在图中显示 Tensor 节点(默认情况下仅显示操作符节点)

  • show_ids (bool, 已弃用) – 此标志已过时,不起作用

  • use_colors (bool) – 是否使用颜色来区分阶段

schedule_run()#

运行管道,但不返回结果缓冲区。

如果管道创建时将 exec_pipelined 选项设置为 True,则此函数还将启动预取下一个迭代,以加快执行速度。 它为用户提供了更好的控制,可以控制何时运行管道,何时获取结果缓冲区以及何时可以在结果被消耗后将其返回到 DALI 缓冲区池。 需要与 release_outputs()share_outputs() 一起使用。 不应与同一管道中的 run() 混合使用

property seed#

管道中使用的随机种子,如果种子未固定,则为 None。

serialize(define_graph=None, filename=None)#

将管道序列化为 Protobuf 字符串。

此外,您可以传递文件名,以便将序列化的管道写入该文件。 文件内容将被覆盖。

参数:
  • define_graph (callable) – 如果指定,则将使用此函数代替成员 define_graph()。 如果管道输出通过 set_outputs() 指定,则不得设置此参数。

  • filename (str) – 序列化管道将写入的文件。

  • kwargs (dict) – 有关完整参数列表,请参阅 Pipeline 构造函数。

property set_affinity#

如果为 True,则工作线程绑定到 CPU 核心。

set_outputs(*output_data_nodes)#

设置管道的输出。

此函数的使用是覆盖派生类中 define_graph 的替代方法。

参数:

*output_data_nodes (解包的 DataNode 对象列表) – 管道的输出

share_outputs(cuda_stream=None)#

返回管道的输出。

outputs() 的主要区别在于 share_outputs 不会释放返回的缓冲区,需要调用 release_outputs 才能释放。 如果管道是异步执行的,则此函数会阻塞,直到结果可用。 它为用户提供了更好的控制,可以控制何时运行管道,何时获取结果缓冲区以及何时可以在结果被消耗后将其返回到 DALI 池。 需要与 release_outputs()schedule_run() 一起使用。 不应与同一管道中的 run() 混合使用。

参数:

cuda_stream (可选, cudaStream_t 或可转换为 cudaStream_t 的对象) – 例如 cupy.cuda.Stream, torch.cuda.Stream。 返回的 TensorLists 所绑定的流。 默认为 None,这意味着输出与主机同步。 仅适用于使用 exec_dynamic=True 构建的管道。

返回:

  • 各个管道输出的 TensorList 对象列表。

  • 除非使用动态执行器,否则返回的缓冲区仅在

  • release_outputs() 被调用之前有效。

start_py_workers()#

启动 Python 工作进程(将运行 ExternalSource 回调)。 当使用 fork 启动 Python 工作进程 (py_start_method="fork") 时,您需要在调用任何创建或获取 CUDA 上下文的功能之前调用 start_py_workers()。 当不需要这种分离时,Pipeline.build() 方法会自动调用它。

如果您要构建多个通过 fork 进程启动 Python 工作进程的管道,则需要在调用任何构建或运行管道的方法之前,在所有这些管道上调用 start_py_workers() 方法(有关详细信息,请参阅 build()),因为构建会为当前进程获取 CUDA 上下文。

这同样适用于使用任何其他将创建 CUDA 上下文的功能 - 例如,初始化使用 CUDA 的框架或使用它创建 CUDA 张量。 当使用 py_start_method="fork" 时,您需要在调用此类功能之前调用 start_py_workers()

fork 具有 CUDA 上下文的进程不受支持,可能会导致意外错误。

如果您使用此方法,则在调用 build() 时,不能指定 define_graph 参数。

DataNode#

class nvidia.dali.pipeline.DataNode(name, device='cpu', source=None)#

此类是 TensorList 的符号表示,在图形定义阶段使用。 它不携带实际数据,但用于定义操作符之间的连接并指定管道输出。 有关详细信息,请参阅 Pipeline 的文档。

DataNode 对象可以作为输入(和一些命名关键字参数)传递给 DALI 操作符,但它们也提供算术运算,这些运算隐式创建执行表达式的适当操作符。

property(key, *, device='cpu')#

返回与 DataNode 关联的元数据属性

参数:
  • key (str) –

    元数据项的名称。 当前支持:“source_info” - 数据来源的数据集中的文件名或位置

    (每个样本都是 1D uint8 张量)

    “layout” - 布局字符串

    (每个样本都是 1D uint8 张量)

  • device (str, 可选) – 返回值的设备;默认为 CPU。

shape(*, dtype=None, device='cpu')#

将此 DataNode 的运行时形状作为新的 DataNode 返回

参数:
  • arg_dtype (DALIDataType, 可选) – 如果指定,形状将转换为此数据类型;默认为 INT64。

  • device (str, 可选) – 返回结果的设备(“cpu”或“gpu”);默认为 CPU。

source_info(*, device='cpu')#

返回 “source_info” 属性。 等效于 self.meta(“source_info”)。

实验性管道功能#

可以通过管道装饰器的特殊变体启用一些额外的实验性功能。

@nvidia.dali.pipeline.experimental.pipeline_def(fn=None, *, enable_conditionals=False, **pipeline_kwargs)#

启用其他实验性功能的 @pipeline_def 装饰器的变体。 它具有与其非实验性变体相同的 API,并添加了下面列出的关键字参数。

关键字参数:
  • debug (bool, 可选) –

    启用管道调试模式 - 允许对管道定义进行逐步执行和中间数据检查,默认为 False。

    注意

    此模式仅用于调试目的 - 管道性能将比非调试模式差得多。

  • 注意:: (..) – 由此装饰器启用的功能是实验性的。 API 可能会更改,功能可能会受到限制。

管道调试模式(实验性)#

可以通过将 @nvidia.dali.pipeline_def 装饰器替换为其实验性变体 @nvidia.dali.pipeline.experimental.pipeline_def 并将参数 debug 设置为 True,以调试模式运行管道。 它允许您访问和修改管道执行图中的数据,以及使用非 DALI 数据类型作为 DALI 操作符的输入。

在此模式下,操作符的输出类型为 DataNodeDebug,它等效于标准模式下的 DataNode。 您可以对 DataNodeDebug 类型的对象执行与 DataNode 相同的操作,包括算术运算。

在当前执行 Pipeline.run() 期间,使用 .get() 访问与 DataNodeDebug 对象关联的数据

@nvidia.dali.pipeline.experimental.pipeline_def(debug=True)
def my_pipe():
    data, _ = fn.readers.file(file_root=images_dir)
    img = fn.decoders.image(data)
    print(np.array(img.get()[0]))
    ...

直接将非 DALI 数据类型(例如 NumPy ndarray,PyTorch Tensor)与 DALI 操作符一起使用

@nvidia.dali.pipeline.experimental.pipeline_def(batch_size=8, debug=True)
def my_pipe():
    img = [np.random.rand(640, 480, 3) for _ in range(8)]
    output = fn.flip(img)
    ...

注意#

  • 调试模式下的种子生成与标准模式下的种子生成方式不同(它是确定性的但不同)。 如果您想在调试模式下获得与标准模式下相同的结果,请使用 seed 参数初始化操作符。

  • 对操作符的直接调用仅在 pipeline_def 函数的范围内有效,您不能在 pipeline_def 外部以这种方式使用它们。

  • 您不能在迭代之间更改管道内操作符的顺序。

警告

使用调试模式会大大降低管道的性能。 仅将其用于调试目的。

注意

此功能是实验性的,其 API 可能会在没有通知的情况下更改。