nvidia.dali.fn.external_source#
- nvidia.dali.fn.external_source(source=None, num_outputs=None, *, cycle=None, name=None, device='cpu', layout=None, dtype=None, ndim=None, cuda_stream=None, use_copy_kernel=None, blocking=None, batch=True, repeat_last=False, parallel=None, no_copy=None, prefetch_queue_depth=None, bytes_per_sample_hint=None, batch_info=None, **kwargs)#
创建一个数据节点,该节点使用来自 Python 来源的数据填充。
数据可以由
source
函数或可迭代对象提供,也可以由pipeline.feed_input(name, data, layout, cuda_stream)
提供。对于 GPU 输入,用户有责任仅使用提供的流修改提供的 GPU 内存内容(DALI 会在其上调度复制,并且所有工作都已正确排队)。如果没有提供流,则输入馈送会阻塞,直到提供的内存被复制到内部缓冲区。
注意
nvidia.dali.fn.external_source()
操作符通过nvidia.dali.plugin.tf.experimental.DALIDatasetWithInputs()
与 TensorFlow 集成部分兼容。请参阅其文档以了解详细信息。注意
要返回同一张量的批量副本,请使用
nvidia.dali.types.Constant()
,这更高效。- 参数:
source¶ (callable 或 iterable) –
数据的来源。
当 pipeline 需要下一个迭代的输入时,会轮询数据源(通过调用
source()
或next(source)
)。根据num_outputs
的值,源可以提供一个或多个数据项。数据项可以是整个批次(默认)或单个批次条目(当batch==False
时)。如果未设置num_outputs
,则预期source
返回一个项目(一个批次或一个样本)。如果指定了此值(即使其值为 1),则预期数据为元组或列表,其中每个元素对应于 external_source 的相应返回值。数据样本必须是以下兼容的数组类型之一
NumPy ndarray (CPU)
MXNet ndarray (CPU)
PyTorch 张量 (CPU 或 GPU)
CuPy 数组 (GPU)
实现
__cuda_array_interface__
的对象DALI Tensor 对象
批次源必须生成整个批次的数据。这可以通过向数组添加新的最外层维度或返回数组列表来实现(在这种情况下,它们的大小可以不同,但必须具有相同的秩和元素类型)。批次源还可以生成 DALI TensorList 对象,它可以是另一个 DALI pipeline 的输出。
每个批次的源可以接受一个位置参数。如果接受,则它是 epoch 中当前迭代的索引,后续调用将是
source(0)
、source(1)
,依此类推。如果batch_info
设置为 True,则nvidia.dali.types.BatchInfo
的实例将传递给源,而不是纯索引。每个样本的源可以接受一个类型为
nvidia.dali.types.SampleInfo
的位置参数,其中包含当前 epoch 和批次中样本的索引,以及当前迭代次数。如果源是生成器函数,则会调用该函数并将其视为可迭代对象。但是,与生成器不同,该函数可以与
cycle
一起使用。在这种情况下,当生成器到达迭代结束时,将再次调用该函数。对于 GPU 输入,用户有责任仅在提供的流中修改提供的 GPU 内存内容。DALI 在此流上调度复制,并且所有工作都已正确排队。如果没有提供流,DALI 将使用默认流,并尽最大努力确保正确性。有关更多信息,请参阅
cuda_stream
参数文档。注意
从检查点恢复后,对作为单参数可调用对象(接受
nvidia.dali.types.SampleInfo
、nvidia.dali.types.BatchInfo
或批次索引)的源的查询将从检查点中保存的 epoch 和迭代恢复。num_outputs¶ (int, optional) –
如果指定,则表示源函数生成的 TensorList 的数量。
如果设置,则操作符返回
DataNode
对象列表,否则返回单个DataNode
对象。
- 关键字参数:
cycle¶ (string 或 bool, optional) –
指定是否以及如何循环遍历源。它可以是以下值之一
"no"
、False
或None
- 不循环;当
到达数据末尾时引发
StopIteration
;这是默认行为 *"quiet"
或True
- 无限期地重复数据,*"raise"
- 当到达数据末尾时,引发StopIteration
,但在后续调用中重新启动迭代。此标志要求
source
是一个集合,例如,一个可迭代对象,其中iter(source)
在每次调用时返回一个新的迭代器,或一个生成器函数。在后一种情况下,当请求的数据量超过函数产生的数据量时,将再次调用生成器函数。指定
"raise"
可以与 DALI 迭代器一起使用以创建 epoch 的概念。name¶ (str, optional) –
数据节点的名称。
当通过调用
feed_input
馈送数据时使用,如果数据由source
提供,则可以省略。layout¶ (layout str 或其列表/元组,可选) –
如果提供,则设置数据的布局。
当
num_outputs > 1
时,布局可以是列表,其中包含每个输出的不同布局。如果列表的元素少于num_outputs
,则只有第一个输出设置了布局,其余输出没有设置布局。dtype¶ (nvidia.dali.types.DALIDataType 或其列表/元组,可选) –
输入数据类型。
当
num_outputs > 1
时,dtype
可以是列表,其中包含每个输出的不同值。操作符将验证获取的数据是否为提供的类型。如果省略参数或传递了
DALIDataType.NO_TYPE
,则操作符将从提供的数据中推断类型。从 DALI 2.0 开始,此参数将是必需的。
ndim¶ (int 或 列表/元组, optional) –
输入数据中的维度数。
当
num_outputs > 1
时,ndim
可以是列表,其中包含每个输出的不同值。将根据此值验证提供给操作符的数据的维度。如果提供了
layout
参数,也可以从该参数推断维度数。如果提供了
layout
参数,则ndim
必须与布局中的维度数匹配。从 DALI 2.0 开始,将需要指定输入维度。
cuda_stream¶ (optional,
cudaStream_t
或可转换为cudaStream_t
的对象) –例如
cupy.cuda.Stream
或torch.cuda.Stream
。CUDA 流用于将数据复制到 GPU 或从 GPU 源复制数据。如果未设置此参数,将尽最大努力保持正确性。也就是说,如果数据是从可识别的库(如 CuPy 或 PyTorch)提供的张量/数组,则使用该库的当前流。尽管这种方法在典型情况下有效,但在高级用例和使用不受支持的库的代码中,您可能需要显式提供流句柄。
此参数有两个特殊值
0 - 使用默认 CUDA 流
1 - 使用 DALI 的内部流
如果使用内部流,则调用
feed_input
将阻塞,直到复制到内部缓冲区完成,因为无法与此流同步以防止在另一个流中使用新数据覆盖数组。use_copy_kernel¶ (bool, optional) –
如果设置为 True,DALI 将使用 CUDA 内核来馈送数据,而不是 cudaMemcpyAsync(默认)。
注意
这仅在将数据复制到 GPU 内存和从 GPU 内存复制数据时适用。
blocking¶ (bool, optional) – 高级 如果为 True,则此操作符将阻塞,直到数据可用(例如,通过调用
feed_input
)。如果为 False,则如果数据不可用,则操作符将引发错误。no_copy¶ (bool, optional) –
确定在调用 feed_input 时 DALI 是否应复制缓冲区。
如果设置为 True,DALI 会将用户内存直接传递给 pipeline,而不是复制它。用户有责任保持缓冲区处于活动状态且未修改,直到 pipeline 使用它为止。
在相关迭代的输出被消耗后,可以再次修改或释放缓冲区。实际上,这发生在 Pipeline 的
prefetch_queue_depth
或cpu_queue_depth * gpu_queue_depth
(当它们不相等时)在feed_input
调用之后的迭代之后。内存位置必须与操作符的指定
device
参数匹配。对于 CPU,提供的内存可以是一个连续的缓冲区或一个连续张量列表。对于 GPU,为避免额外的复制,提供的缓冲区必须是连续的。如果您提供单独张量的列表,则会在内部进行额外的复制,从而消耗内存和带宽。当
parallel=True
时自动设置为True
batch¶ (bool, optional) –
如果设置为 True 或 None,则期望
source
一次生成整个批次。如果设置为 False,则source
将按每个样本调用。batch_info¶ (bool, 可选, 默认值 = False) – 控制可调用
source
(接受参数并返回批次) 是否应接收 class:~nvidia.dali.types.BatchInfo 实例,或者仅接收表示迭代次数的整数。如果设置为 False(默认值),则仅传递整数。如果source
不可调用、不接受参数或batch
设置为 False,则设置此标志无效。parallel¶ (bool, 可选, 默认值 = False) –
如果设置为 True,则相应的 pipeline 将启动一个 Python 工作进程池,以并行运行回调。您可以通过将
py_num_workers
传递到 pipeline 的构造函数中来指定工作进程的数量。当
parallel
设置为 True 时,source
返回的样本必须是 NumPy/MXNet/PyTorch CPU 数组或 TensorCPU 实例。可接受的 source 取决于为
batch
参数指定的值。如果
batch
设置为False
,则source
必须是- 一个可调用对象(函数或具有
__call__
方法的对象),它接受 恰好一个参数(
SampleInfo
实例,表示请求样本的索引)。
- 一个可调用对象(函数或具有
如果
batch
设置为True
,则source
可以是以下任一类型- 一个可调用对象,它接受恰好一个参数(
BatchInfo
实例或整数 - 有关详细信息,请参阅
batch_info
)
- 一个可调用对象,它接受恰好一个参数(
一个可迭代对象,
一个生成器函数。
警告
无论
batch
值如何,可调用对象都应该是无状态的 - 它们应仅根据SampleInfo
/BatchInfo
实例或批次中的索引来生成请求的样本或批次,以便它们可以在多个工作进程中并行运行。当数据结束时,
source
回调必须引发StopIteration
异常。请注意,由于预取,回调可能会在数据集结束后被调用几次 - 请确保在这种情况下它始终如一地引发StopIteration
异常。警告
当 pipeline 启用了条件执行时,必须采取额外的步骤来防止
source
被 AutoGraph 重写。有两种方法可以实现这一点在全局范围内定义函数(即在
pipeline_def
范围之外)。
2. 如果函数是另一个“工厂”函数的result,则工厂函数必须在 pipeline 定义函数之外定义,并使用
@do_not_convert
进行装饰。更多详细信息可以在
@do_not_convert
文档中找到。注意
可调用
source
可以由多个工作进程并行运行。对于batch=True
,可以并行准备多个批次,对于batch=False
,可以在批次内并行计算。当
batch=True
时,可调用对象的性能可能尤其受益于增加prefetch_queue_depth
,以便可以并行计算接下来的几个批次。注意
迭代器或生成器函数将被分配给单个工作进程,该工作进程将迭代它们。主要优点是与主 Python 进程并行执行,但由于它们的状态,不可能一次计算多个批次。
repeat_last¶ (bool, 可选, 默认值 = False) –
注意
这是一个高级设置,主要用于具有解耦模型的 Triton Inference Server。
通常,
external_source
会消耗其输入数据,并期望在即将到来的迭代中输入新的数据。设置repeat_last=True
会更改此行为,以便external_source
将检测到在前一个 pipeline 运行和当前运行之间没有输入新数据,并将使用最近的数据自行重新输入。仅在“推送”模式下,即当数据由用户通过调用
feed_input
主动提供时,将repeat_last
设置为 True 才有意义。启用此选项与指定source
不兼容,这会使external_source
在“拉取”模式下运行。prefetch_queue_depth¶ (int, 可选, 默认值 = 1) – 当在
parallel=True
模式下运行时,指定要预先计算并存储在内部缓冲区中的批次数量,否则将忽略此参数。bytes_per_sample_hint¶ (int, 可选, 默认值 = None) –
如果在
parallel=True
模式下指定,则该值在计算工作进程用于将并行外部 source 输出传递到 pipeline 的共享内存槽的初始容量时,充当提示。在非并行模式下,此参数将被忽略。设置足够大的值以容纳传入的数据可以防止 DALI 在 pipeline 运行期间重新分配共享内存。此外,手动提供提示可以防止 DALI 高估必要的共享内存容量。
该值必须为正整数。请注意,共享内存中的样本附带一些内部元数据,因此,对共享内存的实际需求略高于外部 source 生成的二进制数据的大小。实际元数据大小取决于许多因素,例如,可能会在 Python 或 DALI 版本之间更改,恕不另行通知。
请参阅 pipeline 的
external_source_shm_statistics
以检查为 pipeline 的并行外部 source 生成的数据分配了多少共享内存。