nvidia.dali.fn.external_source#
- nvidia.dali.fn.external_source(source=None, num_outputs=None, *, cycle=None, name=None, device='cpu', layout=None, dtype=None, ndim=None, cuda_stream=None, use_copy_kernel=None, blocking=None, batch=True, repeat_last=False, parallel=None, no_copy=None, prefetch_queue_depth=None, bytes_per_sample_hint=None, batch_info=None, **kwargs)#
创建一个数据节点,该节点使用来自 Python 来源的数据填充。
数据可以由
source函数或可迭代对象提供,也可以由pipeline.feed_input(name, data, layout, cuda_stream)提供。对于 GPU 输入,用户有责任仅使用提供的流修改提供的 GPU 内存内容(DALI 会在其上调度复制,并且所有工作都已正确排队)。如果没有提供流,则输入馈送会阻塞,直到提供的内存被复制到内部缓冲区。
注意
nvidia.dali.fn.external_source()操作符通过nvidia.dali.plugin.tf.experimental.DALIDatasetWithInputs()与 TensorFlow 集成部分兼容。请参阅其文档以了解详细信息。注意
要返回同一张量的批量副本,请使用
nvidia.dali.types.Constant(),这更高效。- 参数:
source¶ (callable 或 iterable) –
数据的来源。
当 pipeline 需要下一个迭代的输入时,会轮询数据源(通过调用
source()或next(source))。根据num_outputs的值,源可以提供一个或多个数据项。数据项可以是整个批次(默认)或单个批次条目(当batch==False时)。如果未设置num_outputs,则预期source返回一个项目(一个批次或一个样本)。如果指定了此值(即使其值为 1),则预期数据为元组或列表,其中每个元素对应于 external_source 的相应返回值。数据样本必须是以下兼容的数组类型之一
NumPy ndarray (CPU)
MXNet ndarray (CPU)
PyTorch 张量 (CPU 或 GPU)
CuPy 数组 (GPU)
实现
__cuda_array_interface__的对象DALI Tensor 对象
批次源必须生成整个批次的数据。这可以通过向数组添加新的最外层维度或返回数组列表来实现(在这种情况下,它们的大小可以不同,但必须具有相同的秩和元素类型)。批次源还可以生成 DALI TensorList 对象,它可以是另一个 DALI pipeline 的输出。
每个批次的源可以接受一个位置参数。如果接受,则它是 epoch 中当前迭代的索引,后续调用将是
source(0)、source(1),依此类推。如果batch_info设置为 True,则nvidia.dali.types.BatchInfo的实例将传递给源,而不是纯索引。每个样本的源可以接受一个类型为
nvidia.dali.types.SampleInfo的位置参数,其中包含当前 epoch 和批次中样本的索引,以及当前迭代次数。如果源是生成器函数,则会调用该函数并将其视为可迭代对象。但是,与生成器不同,该函数可以与
cycle一起使用。在这种情况下,当生成器到达迭代结束时,将再次调用该函数。对于 GPU 输入,用户有责任仅在提供的流中修改提供的 GPU 内存内容。DALI 在此流上调度复制,并且所有工作都已正确排队。如果没有提供流,DALI 将使用默认流,并尽最大努力确保正确性。有关更多信息,请参阅
cuda_stream参数文档。注意
从检查点恢复后,对作为单参数可调用对象(接受
nvidia.dali.types.SampleInfo、nvidia.dali.types.BatchInfo或批次索引)的源的查询将从检查点中保存的 epoch 和迭代恢复。num_outputs¶ (int, optional) –
如果指定,则表示源函数生成的 TensorList 的数量。
如果设置,则操作符返回
DataNode对象列表,否则返回单个DataNode对象。
- 关键字参数:
cycle¶ (string 或 bool, optional) –
指定是否以及如何循环遍历源。它可以是以下值之一
"no"、False或None- 不循环;当
到达数据末尾时引发
StopIteration;这是默认行为 *"quiet"或True- 无限期地重复数据,*"raise"- 当到达数据末尾时,引发StopIteration,但在后续调用中重新启动迭代。此标志要求
source是一个集合,例如,一个可迭代对象,其中iter(source)在每次调用时返回一个新的迭代器,或一个生成器函数。在后一种情况下,当请求的数据量超过函数产生的数据量时,将再次调用生成器函数。指定
"raise"可以与 DALI 迭代器一起使用以创建 epoch 的概念。name¶ (str, optional) –
数据节点的名称。
当通过调用
feed_input馈送数据时使用,如果数据由source提供,则可以省略。layout¶ (layout str 或其列表/元组,可选) –
如果提供,则设置数据的布局。
当
num_outputs > 1时,布局可以是列表,其中包含每个输出的不同布局。如果列表的元素少于num_outputs,则只有第一个输出设置了布局,其余输出没有设置布局。dtype¶ (nvidia.dali.types.DALIDataType 或其列表/元组,可选) –
输入数据类型。
当
num_outputs > 1时,dtype可以是列表,其中包含每个输出的不同值。操作符将验证获取的数据是否为提供的类型。如果省略参数或传递了
DALIDataType.NO_TYPE,则操作符将从提供的数据中推断类型。从 DALI 2.0 开始,此参数将是必需的。
ndim¶ (int 或 列表/元组, optional) –
输入数据中的维度数。
当
num_outputs > 1时,ndim可以是列表,其中包含每个输出的不同值。将根据此值验证提供给操作符的数据的维度。如果提供了
layout参数,也可以从该参数推断维度数。如果提供了
layout参数,则ndim必须与布局中的维度数匹配。从 DALI 2.0 开始,将需要指定输入维度。
cuda_stream¶ (optional,
cudaStream_t或可转换为cudaStream_t的对象) –例如
cupy.cuda.Stream或torch.cuda.Stream。CUDA 流用于将数据复制到 GPU 或从 GPU 源复制数据。如果未设置此参数,将尽最大努力保持正确性。也就是说,如果数据是从可识别的库(如 CuPy 或 PyTorch)提供的张量/数组,则使用该库的当前流。尽管这种方法在典型情况下有效,但在高级用例和使用不受支持的库的代码中,您可能需要显式提供流句柄。
此参数有两个特殊值
0 - 使用默认 CUDA 流
1 - 使用 DALI 的内部流
如果使用内部流,则调用
feed_input将阻塞,直到复制到内部缓冲区完成,因为无法与此流同步以防止在另一个流中使用新数据覆盖数组。use_copy_kernel¶ (bool, optional) –
如果设置为 True,DALI 将使用 CUDA 内核来馈送数据,而不是 cudaMemcpyAsync(默认)。
注意
这仅在将数据复制到 GPU 内存和从 GPU 内存复制数据时适用。
blocking¶ (bool, optional) – 高级 如果为 True,则此操作符将阻塞,直到数据可用(例如,通过调用
feed_input)。如果为 False,则如果数据不可用,则操作符将引发错误。no_copy¶ (bool, optional) –
确定在调用 feed_input 时 DALI 是否应复制缓冲区。
如果设置为 True,DALI 会将用户内存直接传递给 pipeline,而不是复制它。用户有责任保持缓冲区处于活动状态且未修改,直到 pipeline 使用它为止。
在相关迭代的输出被消耗后,可以再次修改或释放缓冲区。实际上,这发生在 Pipeline 的
prefetch_queue_depth或cpu_queue_depth * gpu_queue_depth(当它们不相等时)在feed_input调用之后的迭代之后。内存位置必须与操作符的指定
device参数匹配。对于 CPU,提供的内存可以是一个连续的缓冲区或一个连续张量列表。对于 GPU,为避免额外的复制,提供的缓冲区必须是连续的。如果您提供单独张量的列表,则会在内部进行额外的复制,从而消耗内存和带宽。当
parallel=True时自动设置为Truebatch¶ (bool, optional) –
如果设置为 True 或 None,则期望
source一次生成整个批次。如果设置为 False,则source将按每个样本调用。batch_info¶ (bool, 可选, 默认值 = False) – 控制可调用
source(接受参数并返回批次) 是否应接收 class:~nvidia.dali.types.BatchInfo 实例,或者仅接收表示迭代次数的整数。如果设置为 False(默认值),则仅传递整数。如果source不可调用、不接受参数或batch设置为 False,则设置此标志无效。parallel¶ (bool, 可选, 默认值 = False) –
如果设置为 True,则相应的 pipeline 将启动一个 Python 工作进程池,以并行运行回调。您可以通过将
py_num_workers传递到 pipeline 的构造函数中来指定工作进程的数量。当
parallel设置为 True 时,source返回的样本必须是 NumPy/MXNet/PyTorch CPU 数组或 TensorCPU 实例。可接受的 source 取决于为
batch参数指定的值。如果
batch设置为False,则source必须是- 一个可调用对象(函数或具有
__call__方法的对象),它接受 恰好一个参数(
SampleInfo实例,表示请求样本的索引)。
- 一个可调用对象(函数或具有
如果
batch设置为True,则source可以是以下任一类型- 一个可调用对象,它接受恰好一个参数(
BatchInfo 实例或整数 - 有关详细信息,请参阅
batch_info)
- 一个可调用对象,它接受恰好一个参数(
一个可迭代对象,
一个生成器函数。
警告
无论
batch值如何,可调用对象都应该是无状态的 - 它们应仅根据SampleInfo/BatchInfo实例或批次中的索引来生成请求的样本或批次,以便它们可以在多个工作进程中并行运行。当数据结束时,
source回调必须引发StopIteration异常。请注意,由于预取,回调可能会在数据集结束后被调用几次 - 请确保在这种情况下它始终如一地引发StopIteration异常。警告
当 pipeline 启用了条件执行时,必须采取额外的步骤来防止
source被 AutoGraph 重写。有两种方法可以实现这一点在全局范围内定义函数(即在
pipeline_def范围之外)。
2. 如果函数是另一个“工厂”函数的result,则工厂函数必须在 pipeline 定义函数之外定义,并使用
@do_not_convert进行装饰。更多详细信息可以在
@do_not_convert文档中找到。注意
可调用
source可以由多个工作进程并行运行。对于batch=True,可以并行准备多个批次,对于batch=False,可以在批次内并行计算。当
batch=True时,可调用对象的性能可能尤其受益于增加prefetch_queue_depth,以便可以并行计算接下来的几个批次。注意
迭代器或生成器函数将被分配给单个工作进程,该工作进程将迭代它们。主要优点是与主 Python 进程并行执行,但由于它们的状态,不可能一次计算多个批次。
repeat_last¶ (bool, 可选, 默认值 = False) –
注意
这是一个高级设置,主要用于具有解耦模型的 Triton Inference Server。
通常,
external_source会消耗其输入数据,并期望在即将到来的迭代中输入新的数据。设置repeat_last=True会更改此行为,以便external_source将检测到在前一个 pipeline 运行和当前运行之间没有输入新数据,并将使用最近的数据自行重新输入。仅在“推送”模式下,即当数据由用户通过调用
feed_input主动提供时,将repeat_last设置为 True 才有意义。启用此选项与指定source不兼容,这会使external_source在“拉取”模式下运行。prefetch_queue_depth¶ (int, 可选, 默认值 = 1) – 当在
parallel=True模式下运行时,指定要预先计算并存储在内部缓冲区中的批次数量,否则将忽略此参数。bytes_per_sample_hint¶ (int, 可选, 默认值 = None) –
如果在
parallel=True模式下指定,则该值在计算工作进程用于将并行外部 source 输出传递到 pipeline 的共享内存槽的初始容量时,充当提示。在非并行模式下,此参数将被忽略。设置足够大的值以容纳传入的数据可以防止 DALI 在 pipeline 运行期间重新分配共享内存。此外,手动提供提示可以防止 DALI 高估必要的共享内存容量。
该值必须为正整数。请注意,共享内存中的样本附带一些内部元数据,因此,对共享内存的实际需求略高于外部 source 生成的二进制数据的大小。实际元数据大小取决于许多因素,例如,可能会在 Python 或 DALI 版本之间更改,恕不另行通知。
请参阅 pipeline 的
external_source_shm_statistics以检查为 pipeline 的并行外部 source 生成的数据分配了多少共享内存。