归约算子#

本节介绍如何使用来自 reductions 模块的算子。

  1. 从一个基于 ExternalSource 的简单 pipeline 开始。输入每个批次有两个样本。两个样本的形状都是 (3, 3)。第一个包含连续数字,第二个包含连续偶数。这将有助于可视化可能的归约。

[1]:
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import nvidia.dali.backend as backend
from nvidia.dali.pipeline import Pipeline
import numpy as np

batch_size = 2


def get_batch():
    return [
        np.reshape(np.arange(9), (3, 3)) * (i + 1) for i in range(batch_size)
    ]


def run_and_print(pipe):
    pipe.build()
    output = pipe.run()
    for i, out in enumerate(output):
        if type(out) == backend.TensorListGPU:
            out = out.as_cpu()
        output_array = out.as_array()
        print("Output {}:\n{} \n".format(i, output_array))


pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT64)

    pipe.set_outputs(input)

run_and_print(pipe)
Output 0:
[[[ 0  1  2]
  [ 3  4  5]
  [ 6  7  8]]

 [[ 0  2  4]
  [ 6  8 10]
  [12 14 16]]]

  1. 在上面的 pipeline 中添加一些归约。从 Max 算子开始。

[2]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT64)
    max = fn.reductions.max(input)

    pipe.set_outputs(max)

run_and_print(pipe)
Output 0:
[ 8 16]

如您所见,它返回了每个样本中的最大值。

  1. 执行其他归约,如 MinSum

[3]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT64)
    min = fn.reductions.min(input)
    sum = fn.reductions.sum(input)

    pipe.set_outputs(min, sum)

run_and_print(pipe)
Output 0:
[0 0]

Output 1:
[36 72]

在上面的代码示例中,我们看到对每个样本的所有元素执行了归约。

  1. 归约可以沿着任意轴集合执行。要控制此行为,您可以使用 axes 参数。

[4]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT64)
    min_axis_0 = fn.reductions.min(input, axes=0)
    min_axis_1 = fn.reductions.min(input, axes=1)

    pipe.set_outputs(min_axis_0, min_axis_1)

run_and_print(pipe)
Output 0:
[[0 1 2]
 [0 2 4]]

Output 1:
[[ 0  3  6]
 [ 0  6 12]]

Min 归约是沿着轴 0 和 1 执行的,它分别返回每列和每行的最小值。

为了更方便,归约支持 axis_names 参数。它允许传递轴名称而不是索引。名称基于输入的布局进行匹配。您需要在 ExternalSource 中提供 layout 参数。

[5]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, layout="AB", dtype=types.INT64)
    min_axis_0 = fn.reductions.min(input, axis_names="A")
    min_axis_1 = fn.reductions.min(input, axis_names="B")

    pipe.set_outputs(min_axis_0, min_axis_1)

run_and_print(pipe)
Output 0:
[[0 1 2]
 [0 2 4]]

Output 1:
[[ 0  3  6]
 [ 0  6 12]]

注意:传递所有轴将导致完全归约,而传递空轴将导致不归约。这对于索引和布局都适用。

[6]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, layout="AB", dtype=types.INT64)
    min_axes_full = fn.reductions.min(input, axes=(0, 1))
    min_axes_empty = fn.reductions.min(input, axes=())
    min_layout_full = fn.reductions.min(input, axis_names="AB")
    min_layout_empty = fn.reductions.min(input, axis_names="")

    pipe.set_outputs(
        min_axes_full, min_axes_empty, min_layout_full, min_layout_empty
    )

run_and_print(pipe)
Output 0:
[0 0]

Output 1:
[[[ 0  1  2]
  [ 3  4  5]
  [ 6  7  8]]

 [[ 0  2  4]
  [ 6  8 10]
  [12 14 16]]]

Output 2:
[0 0]

Output 3:
[[[ 0  1  2]
  [ 3  4  5]
  [ 6  7  8]]

 [[ 0  2  4]
  [ 6  8 10]
  [12 14 16]]]

  1. 对于更高维度的输入,您可以传递轴的任意组合。

[7]:
def get_batch():
    return [
        np.reshape(np.arange(8, dtype=np.int32), (2, 2, 2)) * (i + 1)
        for i in range(batch_size)
    ]
[8]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(
        source=get_batch, layout="ABC", dtype=types.INT32
    )
    min_axes_empty = fn.reductions.min(input, axes=())
    min_axes_0_1 = fn.reductions.min(input, axes=(0, 1))
    min_layout_A_C = fn.reductions.min(input, axis_names="AC")

    pipe.set_outputs(min_axes_empty, min_axes_0_1, min_layout_A_C)

run_and_print(pipe)
Output 0:
[[[[ 0  1]
   [ 2  3]]

  [[ 4  5]
   [ 6  7]]]


 [[[ 0  2]
   [ 4  6]]

  [[ 8 10]
   [12 14]]]]

Output 1:
[[0 1]
 [0 2]]

Output 2:
[[0 2]
 [0 4]]

  1. 有些归约需要额外的输入。StdDevVariance 依赖于外部提供的均值,均值可以使用 Mean 归约算子计算得出。

[9]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT32)
    mean = fn.reductions.mean(input)
    std_dev = fn.reductions.std_dev(input, mean)
    variance = fn.reductions.variance(input, mean)

    pipe.set_outputs(mean, std_dev, variance)

run_and_print(pipe)
Output 0:
[3.5 7. ]

Output 1:
[2.291288 4.582576]

Output 2:
[ 5.25 21.  ]

  1. 默认情况下,归约会删除不必要的维度。此行为可以使用 keep_dims 参数控制。

[10]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT32)
    mean = fn.reductions.mean(input)
    std_dev = fn.reductions.std_dev(input, mean, keep_dims=True)
    variance = fn.reductions.variance(input, mean)

    pipe.set_outputs(mean, std_dev, variance)

run_and_print(pipe)
Output 0:
[3.5 7. ]

Output 1:
[2.291288 4.582576]

Output 2:
[ 5.25 21.  ]

在上面的代码示例中,应用归约导致输出类型发生变化。

  1. dtype 参数可用于指定所需的输出数据类型。

[11]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT32)
    sum_int_64 = fn.reductions.sum(input, dtype=types.INT64)
    sum_float = fn.reductions.sum(input, dtype=types.FLOAT)

    pipe.set_outputs(sum_int_64, sum_float)

run_and_print(pipe)
Output 0:
[28 56]

Output 1:
[28. 56.]

注意:并非支持所有数据类型组合。默认行为因算子而异。一般规则是输出类型能够容纳结果,具体取决于输入类型。例如,对于输入类型 INT32,求和的默认输出类型为 INT32,均值的默认输出类型为 FLOAT

  1. 所有归约都可以卸载到 GPU。GPU 变体的工作方式与其 CPU 对应物相同。下面我们展示了一个代码示例,其中包含使用各种参数卸载到 GPU 的所有归约。

[12]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(
        source=get_batch, layout="ABC", dtype=types.INT32
    )
    min = fn.reductions.min(input.gpu(), axis_names="AC", keep_dims=True)
    max = fn.reductions.max(input.gpu(), keep_dims=True)
    sum = fn.reductions.sum(input.gpu(), dtype=types.INT64)
    mean = fn.reductions.mean(input.gpu(), axes=0)
    mean_square = fn.reductions.mean_square(input.gpu())
    rms = fn.reductions.rms(input.gpu(), axes=(), dtype=types.FLOAT)
    std_dev = fn.reductions.std_dev(input.gpu(), mean, axes=0)
    variance = fn.reductions.variance(
        input.gpu(), mean.gpu(), axes=0, keep_dims=True
    )

    pipe.set_outputs(min, max, sum, mean, mean_square, rms, std_dev, variance)

run_and_print(pipe)
Output 0:
[[[[0]
   [2]]]


 [[[0]
   [4]]]]

Output 1:
[[[[ 7]]]


 [[[14]]]]

Output 2:
[28 56]

Output 3:
[[[ 2.  3.]
  [ 4.  5.]]

 [[ 4.  6.]
  [ 8. 10.]]]

Output 4:
[17.5 70. ]

Output 5:
[[[[ 0.  1.]
   [ 2.  3.]]

  [[ 4.  5.]
   [ 6.  7.]]]


 [[[ 0.  2.]
   [ 4.  6.]]

  [[ 8. 10.]
   [12. 14.]]]]

Output 6:
[[[2. 2.]
  [2. 2.]]

 [[4. 4.]
  [4. 4.]]]

Output 7:
[[[[ 4.  4.]
   [ 4.  4.]]]


 [[[16. 16.]
   [16. 16.]]]]