Processing video with image processing operators
In this tutorial, we will see a couple of examples of processing video with image processing operators.
We will focus on the fn.per_frame operator, which lets you specify temporal arguments, i.e., control the operators' arguments for every frame of the processed video. We will use the fn.readers.video_resize operator to load the videos as batches of sequences of frames (batches with the "FHWC" sample layout: frames, height, width, channels).
Let us start by defining some utilities to display the results.
[1]:
import io

import numpy as np
from PIL import Image, ImageDraw
from IPython import display


def draw_points(images, points):
    # draw every point as a small blue dot on the corresponding frame
    assert len(points) == len(images)
    for frame_points, image in zip(points, images):
        draw = ImageDraw.Draw(image)
        for x, y in frame_points:
            draw.ellipse(
                (x - 3, y - 3, x + 3, y + 3), fill="blue", outline="blue"
            )


def display_video(batch, duration=50, points=None):
    # flatten the batch of sequences into a flat list of frames
    images = [
        Image.fromarray(frame)
        for sequence in batch
        for frame in np.array(sequence)
    ]
    if points is not None:
        points = [
            frame_points
            for sequence in points
            for frame_points in np.array(sequence)
        ]
        draw_points(images, points)
    # save the frames as an animated WebP and display it inline
    image, *images = images
    with io.BytesIO() as file:
        image.save(
            file,
            save_all=True,
            append_images=images,
            duration=duration,
            loop=0,
            format="webp",
            minimize_size=True,
        )
        display.display(display.Image(data=file.getvalue()))
We set DALI_EXTRA_PATH to point to the DALI extra repository, which contains the Sintel movie trailer that we will use in the examples.
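If the variable is not set in your environment yet, you can point it at a local clone of the repository before running the examples (a minimal sketch; the path below is a placeholder for your own checkout):

import os

# Placeholder path - replace it with the location of your DALI_extra clone
os.environ.setdefault("DALI_EXTRA_PATH", "/path/to/DALI_extra")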
[2]:
import os

num_frames, num_channels = 30, 3

# region of interest (top-left and bottom-right corners)
# cropped from the original 720p frames
roi_start = (90, 0)
roi_end = (630, 1280)

vid_dir = os.path.join(
    os.environ["DALI_EXTRA_PATH"], "db", "video", "sintel", "video_files"
)
vid_files = ["sintel_trailer-720p_3.mp4"]
vid_filenames = [os.path.join(vid_dir, vid_file) for vid_file in vid_files]
Let us define a pipeline that reads the video and rotates its frames by angles provided through external_source.
[3]:
import math

import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn

height, width = 144, 256
# rotated frames go on a square canvas large enough to fit them at any angle
diag = math.ceil(math.sqrt(height * height + width * width))


@pipeline_def
def per_sample_pipeline(angle_source):
    video = fn.readers.video_resize(
        filenames=vid_filenames,
        name="video reader",
        sequence_length=num_frames,
        file_list_include_preceding_frame=True,
        device="gpu",
        roi_start=roi_start,
        roi_end=roi_end,
        seed=42,
        resize_x=width,
        resize_y=height,
    )
    angle = fn.external_source(source=angle_source, batch=False)
    rotated = fn.rotate(video, angle=angle, size=[diag, diag], fill_value=255)
    return rotated
We run the external_source in sample mode (batch=False). The source callback will be called with a SampleInfo instance for every sample (sequence) in the batch. This way, we can control the rotation angle of every sequence in the dataset.
We used num_frames=30 and batch_size=4, which means that a single batch contains 120 frames in total, split into 4 sequences. Each of the 4 sequences will be rotated by a different angle.
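As a quick illustration of what the callback receives (a standalone sketch, not one of the numbered cells; inspect_angles is a made-up name), SampleInfo carries, among others, the idx_in_epoch, idx_in_batch and iteration fields:

def inspect_angles(sample_info):
    # idx_in_epoch - index of the sample (sequence) within the epoch
    # idx_in_batch - index of the sample within the current batch
    # iteration    - index of the current batch within the epoch
    print(
        sample_info.idx_in_epoch,
        sample_info.idx_in_batch,
        sample_info.iteration,
    )
    return np.array(-10 * sample_info.idx_in_epoch, dtype=np.float32)

Such a callback could be passed as angle_source in place of the sample_rotate defined below.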
[4]:
def sample_rotate(sample_info):
    # a different angle for every sample (sequence) in the epoch
    return np.array(-10 * sample_info.idx_in_epoch, dtype=np.float32)
[5]:
pipe = per_sample_pipeline(
    batch_size=4, device_id=0, num_threads=4, angle_source=sample_rotate
)
pipe.build()
(video,) = pipe.run()
display_video(video.as_cpu())

In the presented example, we can see consecutive sequences of the video rotated by different angles.
However, when processing video, you may need finer-grained control over the arguments. For example, in the case of rotation, you may want to specify a different angle for every frame of each sequence. The fn.per_frame operator makes this possible.
We need to adjust two things to make it work.
First, we modify the callback: instead of a single angle for a given sequence, it returns an array of angles. The size of the array must match the number of frames in the corresponding video sequence - in our case, there are always num_frames=30 frames in any sequence.
Second, in the pipeline, we wrap the angle argument in a fn.per_frame call to hint the fn.rotate operator that the angles should be applied per frame.
[6]:
def frame_rotate(sample_info):
    # an array of angles: one angle for every frame of the sequence
    return np.linspace(0, -30, num_frames, dtype=np.float32)
[7]:
@pipeline_def
def per_frame_pipeline(angle_source):
    video = fn.readers.video_resize(
        filenames=vid_filenames,
        name="video reader",
        sequence_length=num_frames,
        file_list_include_preceding_frame=True,
        device="gpu",
        roi_start=roi_start,
        roi_end=roi_end,
        seed=42,
        resize_x=width,
        resize_y=height,
    )
    angle = fn.external_source(source=angle_source, batch=False)
    rotated = fn.rotate(
        video, angle=fn.per_frame(angle), size=[diag, diag], fill_value=255
    )
    return rotated
[8]:
pipe = per_frame_pipeline(
    batch_size=4, device_id=0, num_threads=4, angle_source=frame_rotate
)
pipe.build()
(video,) = pipe.run()
display_video(video.as_cpu())

In the presented example, we used a different angle for every frame of a sequence, but the vector of angles was the same for every sequence. Let us change that by modifying the callback so that it uses the provided SampleInfo instance to shift the angles according to sample_info.idx_in_epoch.
[9]:
def frame_rotate(sample_info):
    quarter = np.linspace(0, -90, num_frames, endpoint=False, dtype=np.float32)
    return quarter - 90 * sample_info.idx_in_epoch
[10]:
pipe = per_frame_pipeline(
    batch_size=4, device_id=0, num_threads=4, angle_source=frame_rotate
)
pipe.build()
(video,) = pipe.run()
display_video(video.as_cpu())

Now, let us consider a slightly more involved example that combines the fn.warp_affine and fn.gaussian_blur operators.
fn.warp_affine will use fn.per_frame arguments provided through fn.external_source, so that a different transformation is applied to each consecutive sequence. In contrast, the fn.gaussian_blur operator will use the same series of arguments for every sequence in the dataset; for that, instead of going through fn.external_source, we can define the arrays directly in the pipeline.
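The helpers below operate on full 3x3 homogeneous matrices and trim them to the 2x3 affine form expected by fn.warp_affine only at the very end. To see the convention used by compose, consider a minimal worked example (hypothetical values, not one of the numbered cells):

# composing a translation with a scaling: the right-most transform applies
# first, so a point p is mapped to t @ (s @ p) - scale, then shift
t = np.array([[1, 0, 5], [0, 1, 3], [0, 0, 1]], dtype=np.float32)  # shift by (5, 3)
s = np.array([[2, 0, 0], [0, 2, 0], [0, 0, 1]], dtype=np.float32)  # scale by 2
m = np.matmul(t, s)
print(m[0:2])  # the 2x3 affine part is what fn.warp_affine consumes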
[11]:
height, width = 216, 384
max_sigma = 5
batch_size = 8


def compose(*transforms):
    # chain 3x3 homogeneous matrices; the right-most transform applies first
    h, *t = transforms
    return h if not t else np.matmul(h, compose(*t))


def translate(offsets):
    return np.array([[[1, 0, tw], [0, 1, th], [0, 0, 1]] for tw, th in offsets])


def scale(scales):
    return np.array([[[sw, 0, 0], [0, sh, 0], [0, 0, 1]] for sw, sh in scales])


def rotate(angles):
    css = [
        (np.cos(angle), np.sin(angle))
        for angle in (math.radians(angle) for angle in angles)
    ]
    return np.array([[[c, -s, 0], [s, c, 0], [0, 0, 1]] for c, s in css])


def rotate_center(angles):
    # rotate around the image center: move the center to the origin,
    # rotate, and move it back
    pre_rot = translate([(-0.5 * width, -0.5 * height)] * len(angles))
    return compose(np.linalg.inv(pre_rot), rotate(angles), pre_rot)


def align_center(sizes):
    return translate([((width - w) / 2, (height - h) / 2) for w, h in sizes])


def move(rs, angles):
    def shift(r, angle):
        dw = r * np.sin(math.radians(angle))
        dh = r * np.cos(math.radians(angle))
        return (dw, dh)

    return translate([shift(r, angle) for r, angle in zip(rs, angles)])


def zoom(start, end):
    # scale by a factor interpolated from start to end, keeping
    # the scaled frame centered on the canvas
    scaling = np.linspace(start, end, num_frames)
    s = scale(zip(scaling, scaling))
    t = align_center([(width * s, height * s) for s in scaling])
    return compose(t, s)


def shake():
    max_angle = 30
    angles = (
        np.sin(np.linspace(0, 5 * np.pi, num_frames))
        * max_angle
        * np.linspace(1, 0.25, num_frames)
    )
    return rotate_center(angles)


def circle(r, angle_start, angle_end):
    angles = np.linspace(angle_start, angle_end, num_frames, endpoint=False)
    return compose(move([r] * num_frames, angles), rotate_center(angles - 90))


def create_warp_ms_source():
    r = height // 3 - 0.125 * height

    def affine(transform):
        # trim the 3x3 homogeneous matrices to the 2x3 affine form
        # expected by fn.warp_affine
        return np.array(
            [frame_transform[0:2] for frame_transform in transform],
            dtype=np.float32,
        )

    def animation(sample_info):
        i = sample_info.idx_in_batch
        if i == 0:
            return zoom(1, 0.5)
        if i == 1:
            return compose(zoom(0.5, 0.5), shake())
        if i == 2:
            move_right = move(np.linspace(0, r, num_frames), [90] * num_frames)
            return compose(move_right, zoom(0.5, 0.25))
        if i == 7:
            move_back = move(np.linspace(r, 0, num_frames), [90] * num_frames)
            return compose(move_back, zoom(0.25, 1))
        # for samples 3, 4, 5, 6
        return compose(circle(r, 90 * (i - 2), 90 * (i - 1)), zoom(0.25, 0.25))

    return lambda sample_info: affine(animation(sample_info))
[12]:
@pipeline_def
def pipeline():
    video = fn.readers.video_resize(
        filenames=vid_filenames,
        name="video reader",
        sequence_length=num_frames,
        file_list_include_preceding_frame=True,
        device="gpu",
        roi_start=roi_start,
        roi_end=roi_end,
        seed=42,
        resize_x=width,
        resize_y=height,
    )
    warp_matrix = fn.external_source(
        source=create_warp_ms_source(), batch=False
    )
    video = fn.warp_affine(
        video, fn.per_frame(warp_matrix), fill_value=255, inverse_map=False
    )
    # the blur strength rises and falls over the course of every sequence;
    # defined with numpy, the arrays are baked into the pipeline as constants
    sigma = np.sin(np.linspace(0, np.pi, num_frames)) * max_sigma
    window_size = np.array(2 * np.ceil(sigma) + 1, dtype=np.int32)
    video = fn.gaussian_blur(
        video, window_size=fn.per_frame(window_size), sigma=fn.per_frame(sigma)
    )
    return video
[13]:
pipe = pipeline(batch_size=batch_size, device_id=0, num_threads=4)
pipe.build()
(vid,) = pipe.run()
[14]:
display_video(vid.as_cpu().as_array())

Computing the transformation matrices manually can be tedious. Fortunately, DALI provides a family of fn.transforms operators that simplify the task. Let us consider a similar example, but this time limiting the use of external sources in favor of DALI transforms.
In addition, the example will illustrate how to apply the same transformations to a set of keypoints with the fn.coord_transform operator.
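Before diving in, here is a minimal sketch of the API (a standalone example, not one of the numbered cells; the pipeline name is made up): each fn.transforms operator produces a 2x3 affine matrix, and fn.transforms.combine chains such matrices, applying them in the order in which they are listed.

@pipeline_def
def transforms_demo_pipeline():
    # rotate by 45 degrees around the origin, then shift by (10, 20)
    rotation = fn.transforms.rotation(angle=45.0)
    shift = fn.transforms.translation(offset=[10.0, 20.0])
    return fn.transforms.combine(rotation, shift)

demo_pipe = transforms_demo_pipeline(batch_size=1, device_id=0, num_threads=1)
demo_pipe.build()
(matrices,) = demo_pipe.run()
print(np.array(matrices[0]))  # a single 2x3 affine matrix

The resulting matrix can be passed directly to fn.warp_affine or fn.coord_transform, which is exactly how circle_rotate is used below.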
[15]:
import nvidia.dali.math as dali_math

height, width = 216, 384
r = height // 3 - 0.125 * height
scale = 0.35


def angle_source(sample_info):
    i = sample_info.idx_in_batch
    angle_deg = np.linspace(
        90 * (i + 1), 90 * (i + 2), num_frames, dtype=np.float32, endpoint=False
    )
    return angle_deg, np.deg2rad(angle_deg)


def coord_source(sample_info):
    # let us track how the corners and the center of the image are transformed
    return np.array(
        [
            [0, 0],
            [0, height],
            [width, 0],
            [width, height],
            [width // 2, height // 2],
        ],
        dtype=np.int32,
    )
[16]:
@pipeline_def
def pipeline():
    video = fn.readers.video_resize(
        filenames=vid_filenames,
        name="video reader",
        sequence_length=num_frames,
        file_list_include_preceding_frame=True,
        device="gpu",
        roi_start=roi_start,
        roi_end=roi_end,
        seed=42,
        resize_x=width,
        resize_y=height,
    )
    angle_deg, angle_rad = fn.external_source(
        source=angle_source, batch=False, num_outputs=2
    )
    # angle_rad is expressed in radians, so the quarter-turn shift is pi/2
    shifted_angle = angle_rad + np.pi / 2
    # np.sin(*) will be treated as a constant - for every sequence we reuse
    # the same shear_angle tensor
    shear_angle = (
        np.sin(
            np.linspace(0, 2 * np.pi, num_frames, endpoint=False),
            dtype=np.float32,
        )
        * 15
    )
    shear_angles = fn.stack(shear_angle, shear_angle, axis=1)
    shear = fn.transforms.shear(
        angles=fn.per_frame(shear_angles), center=[width / 2, height / 2]
    )
    rotation = fn.transforms.rotation(
        angle=fn.per_frame(angle_deg), center=[width / 2, height / 2]
    )
    # we are using nvidia.dali.math arithmetic to compute sin and cos
    # in each iteration
    shift_s = r * dali_math.sin(shifted_angle)
    shift_c = r * dali_math.cos(shifted_angle)
    move_offset = fn.stack(shift_s, shift_c, axis=1)
    move = fn.transforms.translation(offset=fn.per_frame(move_offset))
    zoomed_out = fn.transforms.scale(
        scale=[scale, scale], center=[width / 2, height / 2]
    )
    # zoomed_out is broadcast when combined with per-frame rotation and move
    circle_rotate = fn.transforms.combine(zoomed_out, shear, rotation, move)
    coord = fn.external_source(source=coord_source, batch=False)
    coord_transformed = fn.coord_transform(coord, MT=circle_rotate)
    # the circle_rotate is already per-frame, wrapping it with a per_frame
    # call is optional
    video = fn.warp_affine(
        video, matrix=circle_rotate, fill_value=255, inverse_map=False
    )
    return video, coord_transformed
[17]:
pipe = pipeline(batch_size=batch_size, device_id=0, num_threads=4)
pipe.build()
(vid, coord_transformed) = pipe.run()
display_video(vid.as_cpu().as_array(), points=coord_transformed)
