DALI 表达式和算术运算符#

此示例向您展示如何在 DALI Pipeline 中使用二元算术运算符,这些运算符允许对 pipeline 中的张量执行元素级操作。我们将提供有关可用运算符的信息以及使用常量和标量输入的示例。

支持的运算符#

DALI 当前支持以下运算符

  • 一元算术运算符:+, -;

  • 二元算术运算符:+, -, *, /, 和 //;

  • 比较运算符:==, !=, <, <=, >, >=;

  • 按位二元运算符:&, |, ^

DALI 还支持 nvidia.dali.math 模块中公开的一组数学函数

  • abs, fabs, floor, ceil, pow, fpow, min, max, clamp;

  • 指数和对数:sqrt, rsqrt, cbrt, exp, log, log2, log10;

  • 三角函数:sin, cos, tan, asin, acos, atan, atan2;

  • 双曲函数:sinh, cosh, tanh, asinh, acosh, atanh;

二元运算符可以用作两个张量之间、张量和标量之间或张量和常量之间的运算。通过张量,我们考虑 DALI 运算符(常规运算符或其他算术运算符)的输出。一元运算符仅适用于张量输入。

在本节中,我们重点介绍二元算术运算符、张量、常量和标量操作数。比较和按位运算符的详细类型提升规则在**支持的操作**部分和其他示例中介绍。

准备测试 Pipeline#

  1. 准备辅助代码,以便我们可以轻松地操作将作为张量出现在 DALI pipeline 中的类型和值。

  2. 我们将使用 numpy 作为自定义提供数据的源,因此我们需要从 DALI 导入几个内容,以创建 Pipeline 并使用 ExternalSource 运算符。

[1]:
import numpy as np
from nvidia.dali.pipeline import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.types import Constant

定义数据#

为了定义数据,由于存在二元运算符,因此需要两个输入。我们将创建一个简单的辅助函数,该函数返回两个批次的硬编码数据,这些数据存储为 np.int32。在实际场景中,由 DALI 算术运算符处理的数据将是由另一个运算符生成的张量,该运算符包含一些图像、视频序列或其他数据。

您可以通过更改这些值或调整 get_data() 函数以使用不同的输入数据来进行实验。

**注意**:两个输入的形状需要完全匹配或根据广播规则兼容 - 运算是逐元素执行的。

[2]:
left_magic_values = [[[42, 7, 0], [0, 0, 0]], [[5, 10, 15], [10, 100, 1000]]]

right_magic_values = [[[3, 3, 3], [1, 3, 5]], [[1, 5, 5], [1, 1, 1]]]

batch_size = len(left_magic_values)


def convert_batch(batch):
    return [np.int32(tensor) for tensor in batch]


def get_data():
    return (convert_batch(left_magic_values), convert_batch(right_magic_values))

对张量进行运算#

定义 Pipeline#

  1. 为了定义 pipeline,数据将从 get_data 函数获取,并通过 ExternalSource 提供给 pipeline。

**注意**:您无需实例化任何其他运算符,我们可以在其他运算符的结果上使用常规 Python 算术表达式。

  1. 您可以通过添加、乘法和除法来操作源数据。

[3]:
@pipeline_def(batch_size=batch_size, num_threads=4, device_id=0)
def pipeline():
    l, r = fn.external_source(source=get_data, num_outputs=2, dtype=types.INT32)
    sum_result = l + r
    mul_result = l * r
    div_result = l // r
    return l, r, sum_result, mul_result, div_result

运行 Pipeline#

  1. 构建并运行我们的 pipeline

[4]:
pipe = pipeline()
pipe.build()
out = pipe.run()
  1. 显示结果

[5]:
def examine_output(pipe_out):
    l = pipe_out[0].as_array()
    r = pipe_out[1].as_array()
    sum_out = pipe_out[2].as_array()
    mul_out = pipe_out[3].as_array()
    div_out = pipe_out[4].as_array()
    print("{}\n+\n{}\n=\n{}\n\n".format(l, r, sum_out))
    print("{}\n*\n{}\n=\n{}\n\n".format(l, r, mul_out))
    print("{}\n//\n{}\n=\n{}\n\n".format(l, r, div_out))


examine_output(out)
[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
+
[[[3 3 3]
  [1 3 5]]

 [[1 5 5]
  [1 1 1]]]
=
[[[  45   10    3]
  [   1    3    5]]

 [[   6   15   20]
  [  11  101 1001]]]


[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
*
[[[3 3 3]
  [1 3 5]]

 [[1 5 5]
  [1 1 1]]]
=
[[[ 126   21    0]
  [   0    0    0]]

 [[   5   50   75]
  [  10  100 1000]]]


[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
//
[[[3 3 3]
  [1 3 5]]

 [[1 5 5]
  [1 1 1]]]
=
[[[  14    2    0]
  [   0    0    0]]

 [[   5    2    3]
  [  10  100 1000]]]


生成的张量是通过对其输入的相应元素应用算术运算获得的。

除了我们将在下一节中描述的标量张量输入之外,参数的形状应该是兼容的 - 要么完全匹配,要么是可广播的,否则您将收到错误。

常量和标量操作数#

到目前为止,我们仅考虑了算术运算符输入的匹配形状的张量输入。DALI 允许其中一个操作数是常量或一批标量,并且此类操作数可以出现在二元表达式的两侧。

常量#

算术运算符的常量操作数可以是以下选项之一

  • 直接使用的 Python intfloat 类型的值。

  • 包装在 nvidia.dali.types.Constant 中的值。

张量和常量之间的运算会导致常量广播到所有张量元素。

**注意**:目前,整数常量的值在内部作为 int32 传递给 DALI,浮点常量的值作为 float32 传递给 DALI。

关于类型提升,Python int 值将被视为 int32float 值将被视为 float32

DALI Constant 可用于指示其他类型。它接受 DALIDataType 枚举值作为第二个参数,并具有方便的成员函数,例如 .uint8().float32(),可用于转换。

使用常量#

  1. 调整 Pipeline 以利用常量。

[6]:
@pipeline_def(batch_size=batch_size, num_threads=4, device_id=0)
def pipeline_2():
    l, r = fn.external_source(source=get_data, num_outputs=2, dtype=types.INT32)
    add_200 = l + 200
    mul_075 = l * 0.75
    sub_15 = Constant(15).float32() - r
    return l, r, add_200, mul_075, sub_15
[7]:
pipe = pipeline_2()
pipe.build()
out = pipe.run()
  1. 显示结果

[8]:
def examine_output(pipe_out):
    l = pipe_out[0].as_array()
    r = pipe_out[1].as_array()
    add_200 = pipe_out[2].as_array()
    mul_075 = pipe_out[3].as_array()
    sub_15 = pipe_out[4].as_array()
    print("{}\n+ 200 =\n{}\n\n".format(l, add_200))
    print("{}\n* 0.75 =\n{}\n\n".format(l, mul_075))
    print("15 -\n{}\n=\n{}\n\n".format(r, sub_15))


examine_output(out)
[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
+ 200 =
[[[ 242  207  200]
  [ 200  200  200]]

 [[ 205  210  215]
  [ 210  300 1200]]]


[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
* 0.75 =
[[[ 31.5    5.25   0.  ]
  [  0.     0.     0.  ]]

 [[  3.75   7.5   11.25]
  [  7.5   75.   750.  ]]]


15 -
[[[3 3 3]
  [1 3 5]]

 [[1 5 5]
  [1 1 1]]]
=
[[[12. 12. 12.]
  [14. 12. 10.]]

 [[14. 10. 10.]
  [14. 14. 14.]]]


常量值与批次中所有张量的所有元素一起使用。

广播#

术语“广播”指的是在数学表达式中如何处理不同形状的张量。较小张量中的值被“广播”,因此它有助于多个输出值。最简单地说,标量值被广播到所有输出值。两个批次之间的广播可以看作是这个概念的推广。

在更复杂的情况下,如果其中一个操作数的大小为 1 而另一个操作数较大,则值可以沿某些维度广播。例如,当添加到形状为 (2, 3) 的张量时,形状为 (1, 3) 的张量可以沿最外层维度广播。让我们添加两个这样的常量并查看结果。

[9]:
@pipeline_def(batch_size=1, num_threads=4, device_id=0)
def pipeline_3():
    left = Constant(np.float32([[1.0, 2.0, 3.0]]))
    right = Constant(np.float32([[-5.0, -6.0, -7.0], [10.0, 20.0, 30.0]]))
    return left + right
[10]:
pipe = pipeline_3()
pipe.build()
(out,) = pipe.run()
print(out)
TensorListCPU(
    [[[-4. -4. -4.]
      [11. 22. 33.]]],
    dtype=DALIDataType.FLOAT,
    num_samples=1,
    shape=[(2, 3)])

检查输出,我们可以看到 left 张量已添加到 right 张量的两行。

实际上,我们不必创建前导维度等于 1 的张量 - 所有缺失的维度都将用 1 填充。例如,形状为 (3)(800, 600, 3) 的张量之间的广播操作等效于 (1, 1, 3)(800, 600, 3) 之间的广播。

请记住,每个样本都是单独广播的。

广播标量#

标量批次(0D 张量)可以针对任何其他批次进行广播。

  1. 使用 ExternalSource 生成数字序列,这些数字将添加到张量操作数。

[11]:
@pipeline_def(batch_size=batch_size, num_threads=4, device_id=0)
def pipeline_4():
    tensors = fn.external_source(lambda: get_data()[0], dtype=types.INT32)
    scalars = fn.external_source(
        lambda: np.arange(1, batch_size + 1), dtype=types.INT64
    )
    return tensors, scalars, tensors + scalars
  1. 构建并运行 Pipeline。

[12]:
pipe = pipeline_4()
pipe.build()
out = pipe.run()
[13]:
def examine_output(pipe_out):
    t = pipe_out[0].as_array()
    scalar = pipe_out[1].as_array()
    result = pipe_out[2].as_array()
    print("{}\n+\n{}\n=\n{}".format(t, scalar, result))


examine_output(out)
[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
+
[1 2]
=
[[[  43    8    1]
  [   1    1    1]]

 [[   7   12   17]
  [  12  102 1002]]]

批次中的第一个标量 (1) 添加到第一个张量中的所有元素,第二个标量 (2) 添加到第二个张量。