通过 create_op 装饰器简化 Python 运算符的创建
holoscan.decorator.create_op()
装饰器和支持它的 holoscan.decorator.Input
和 holoscan.decorator.Output
类在 Holoscan v2.2 中是新增功能,目前仍被视为实验性功能。它们现在可以使用,但根据初步反馈,行为或 API 可能会进行一些向后不兼容的更改。
为了方便起见,我们提供了一个 holoscan.decorator.create_op()
装饰器,它可以用于自动将简单的 Python 函数/生成器或类转换为原生 Python holoscan.core.Operator
。被包装的函数体(或者如果将 create_op
应用于类,则为 __call__
方法)将对应于在 holoscan.core.Operator.compute()
方法中完成的计算,但无需显式调用 holoscan.core.InputContext.receive()
来接收输入或 holoscan.core.OutputContext.emit()
来传输输出。任何必要的输入或输出端口都将自动生成。
首先考虑一个名为 mask_and_offset
的简单 Python 函数,它接受 image
和 mask
张量作为输入并将它们相乘,然后加上一些标量 offset
。
def mask_and_offset(image, mask, offset=1.5):
return image * mask + offset
要将此函数转换为返回相应运算符的函数,我们可以添加 create_op
装饰器,如下所示
from holoscan.decorator import create_op
@create_op(
inputs=("image", "mask"),
outputs="out",
)
def mask_and_offset(image, mask, offset=1.5):
return image * mask + offset
通过提供 inputs
参数,我们指定有两个名为 “image” 和 “mask” 的输入端口。通过设置 outputs="out"
,我们表明输出将在名为 “out” 的端口上进行传输。当以这种简单字符串的方式指定 inputs
时,使用的名称必须映射到被包装函数签名中的变量名称。稍后我们将看到,可以使用 holoscan.decorator.Input
类来提供更多关于输入如何映射到函数参数的控制。同样,我们将看到 holoscan.decorator.Output
类可以用于提供更多关于函数输出如何映射到任何输出端口的控制。
对于 create_op
,还有一个可选的 cast_tensors
参数。为了方便起见,此参数默认为 True
,这将导致任何类似张量的对象在传递给函数之前自动转换为 NumPy 或 CuPy 数组(分别用于主机或设备张量)。如果不需要这样做(例如,由于使用与 NumPy 或 CuPy 不同的第三方张量框架),用户可以将 cast_tensors=False
设置为 False,并在函数体中手动处理任何 holoscan.Tensor
对象到所需形式的转换。此转换选项适用于单个张量或张量图 (dict[Tensor]
)。
然后可以在 Application
的 compose
方法中使用此装饰函数来创建与此计算相对应的运算符
from holoscan.core import Application, Operator
def MyApp(Application):
def compose(self)
mask_op = mask_and_offset(self, name="my_scaling_op", offset=0.0)
# verify that an Operator was generated
assert(isinstance(mask_op, Operator))
# now add any additional operators and create the computation graph using add_flow
请注意,与所有其他 Operator 类一样,必须将应用程序(或片段)作为第一个参数提供(此处为 self
)。始终支持 name
kwarg,它是将分配给运算符的名称。由于使用了此 kwarg
来指定运算符名称,因此被包装的函数(在本例中为 mask_and_offset
)不应使用 name
作为参数名称。在本例中,我们指定了 offset=0.0
,这将覆盖函数签名中 offset=1.5
的默认值。
为了完整性,在 mask_and_offset
上使用 create_op
装饰器等效于用户定义了以下 MaskAndOffsetOp
类并在 MyApp.compose
中使用了它
def MaskAndOffsetOp(Operator):
def setup(self, spec):
spec.input("image")
spec.input("mask")
spec.output("out")
def compute(self, op_input, op_output, context):
# Simplified logic here assumes received values are GPU tensors
# create_op would add additional logic to handle the inputs
image = op_input.receive("image")
image = cp.asarray(image)
mask = op_input.receive("mask")
mask = cp.asarray(mask)
out = image * mask + offset
op_output.emit(out, "out")
让我们考虑另一个示例,其中函数接收多个数组,处理它们,并返回更新数组的元组
def scale_and_offset(x1, x2, scale=2.0, offset=1.5):
y1 = x1 * scale
y2 = x2 + offset
return y1, y2
要将其转换为相应的运算符,我们可以添加 create_op
装饰器,如下所示
@create_op(
inputs=("x1", "x2"),
outputs=("out1", "ou2"),
)
def scale_and_offset(x1, x2, scale=2.0, offset=1.5):
y1 = x1 * scale
y2 = x2 + offset
return y1, y2
与之前一样,通过 inputs
定义的端口 “x1” 和 “x2” 接收的消息将映射到相应的变量 x1
和 x2
。同样,输出元组的元素,数组 y1
和 y2
,将分别通过端口 “out1” 和 “out2” 发送。与输入映射(由端口和变量的命名确定)相比,输出映射由输出端口的顺序和函数返回的元组中的元素确定。
本节将介绍在以下情况下使用 str
或 Tuple[str]
作为 inputs
参数是不够的其他用例。
场景 1: 假设上游运算符将张量图发送到给定的输入端口,我们需要指定张量图中哪些张量将映射到哪个输入端口。
对于一个具体的例子,假设我们想使用如下函数打印张量的形状
def print_shape(tensor):
print(f"{tensor.shape = }")
但上游运算符输出一个包含两个名为 “image” 和 “labels” 的张量的字典。我们可以通过指定特定输入端口上的哪个张量名称将映射到函数的 “tensor” 参数来使用此运算符。例如
@create_op(inputs=Input("input_tensor", arg_map={"image": "tensor"}))
def print_shape(tensor):
print(f"{tensor.shape = }")
将创建一个具有单个名为 “input_tensor” 的输入端口且没有输出端口的运算符。输入端口可以接收包含任意数量张量的张量图,但只会使用名为 “image” 的张量,将其映射到被包装函数的 “tensor” 参数。一般来说,arg_map
是一个字典,它将端口上找到的张量名称映射到它们对应的函数参数名称。
场景 2: 我们想覆盖端口上存在的调度条件。这可以通过使用 condition
和可选的 condition_kwargs
参数指定 Input 来完成。例如,要覆盖默认添加到端口的 MessageAvailableCondition,并允许即使在没有输入消息可用时也调用 compute
@create_op(inputs=Input("input_tensor", condition=ConditionType.NONE, condition_kwargs={}))
场景 3: 我们想覆盖端口上存在的接收器的参数。例如,我们可以为默认使用的双缓冲接收器指定不同的策略(policy=1 对应于当队列已满时丢弃传入消息)
@create_op(inputs=Input("input_tensor", connector=ConditionType.DOUBLE_BUFFER, connector_kwargs=dict(capacity=1, policy=1)))
为了支持应该存在多个输出端口的情况,用户必须让函数返回一个 dict
。holoscan.decorator.Output
类随后具有一个 tensor_names
关键字参数,可以指定该参数来指示字典中的哪些项将在给定的输出端口上进行传输。
例如,假设我们有一个函数生成三个张量,x
、y
和 z
,我们想在端口 “out1” 上传输 x
和 y
,而 z
将在端口 “out2” 上传输。这可以通过在 create_op
调用中按如下方式指定 outputs
来完成
@create_op(
outputs=(
Output("out1", tensor_names=("x", "y")),
Output("out2", tensor_names=("z",)),
),
)
def xyz_generator(nx=32, ny=32, nz=16):
x = cp.arange(nx, dtype=np.uint32)
y = cp.arange(ny, dtype=np.uint32)
z = cp.arange(nz, dtype=np.uint32)
# must return a dict type when Output arg(s) with `tensor_names` is used
return dict(x=x, y=y, z=z)
此运算符没有输入端口和三个可选的关键字参数。它如上所述将输出张量拆分到两个端口上。tensor_names
中使用的所有名称都必须与对象发出的 dict
中存在的键相对应。通常 dict
值是张量,但这不是必需的。
holoscan.decorator.Output
类还支持 condition
、condition_kwargs
、connector
和 connector_kwargs
,它们的工作方式与上面针对 holoscan.decorator.Input
显示的方式相同。例如,要覆盖名为 “output_tensor” 的单个输出端口的发射器队列策略
@create_op(inputs=Output("output_tensor", connector=ConditionType.DOUBLE_BUFFER, connector_kwargs=dict(capacity=1, policy=1))
请注意,未指定 tensor_names
,这意味着返回的对象不需要是 dict
。对象本身将在 “output_tensor” 端口上发出。
在为 create_op
指定 inputs
和 outputs
参数时,请确保所有端口都具有唯一的名称。举一个具体的例子,如果一个运算符具有用于发送图像的单个输入和输出端口,则应使用唯一的端口名称,如 “image_in” 和 “image_out”,而不是对两者都使用 “image”。
SDK 包含一个 python_decorator 示例,展示了包装的 C++ 运算符 (VideoStreamReplayerOp
和 HolovizOp
) 与通过 create_op
装饰器创建的本机 Python 运算符的互操作性。
此应用程序的开头导入了几个内置的基于 C++ 的运算符,并带有 Python 绑定 (HolovizOp
和 VideoStreamReplayerOp
)。除了这些之外,还通过 create_op
装饰器 API 创建了两个新运算符。
import os
from holoscan.core import Application
from holoscan.decorator import Input, Output, create_op
from holoscan.operators import HolovizOp, VideoStreamReplayerOp
sample_data_path = os.environ.get("HOLOSCAN_INPUT_PATH", "../data")
@create_op(
inputs="tensor",
outputs="out_tensor",
)
def invert(tensor):
tensor = 255 - tensor
return tensor
@create_op(inputs=Input("in", arg_map="tensor"), outputs=Output("out", tensor_names=("frame",)))
def tensor_info(tensor):
print(f"tensor from 'in' port: shape ={tensor.shape}, " f"dtype ={tensor.dtype.name}")
return tensor
第一个是通过将装饰器添加到名为 invert
的函数来创建的,该函数仅反转(8 位 RGB)色彩空间值。第二个运算符是通过将装饰器添加到名为 tensor_info
的函数来创建的,该函数假定输入是 CuPy 或 NumPy 张量,并打印其形状和数据类型。请注意,create_op
的默认 cast_tensors=True
选项确保任何主机或设备张量都分别转换为 NumPy 或 CuPy 数组。这就是为什么在函数体中使用 NumPy API 是安全的原因。如果用户想直接接收 holoscan.Tensor
对象并在函数体中手动处理到不同对象类型的转换,则应在 create_op
的关键字参数中指定 cast_tensors=False
。
现在我们已经定义或导入了所有运算符,我们可以通过从 Application
类继承并实现 compose
方法,以通常的方式构建应用程序。此示例的其余代码如下所示。
class VideoReplayerApp(Application):
"""Example of an application that uses the operators defined above.
This application has the following operators:
- VideoStreamReplayerOp
- HolovizOp
- invert (created via decorator API)
- tensor_info (created via decorator API)
`VideoStreamReplayerOp` reads a video file and sends the frames to the HolovizOp.
The `invert` operator inverts the color map (the 8-bit `value` in each color channel is
set to `255 - value`).
The `tensor_info` operator prints information about the tensors shape and data type.
`HolovizOp` displays the frames.
"""
def compose(self):
video_dir = os.path.join(sample_data_path, "racerx")
if not os.path.exists(video_dir):
raise ValueError(f"Could not find video data:{video_dir=}")
# Define the replayer and holoviz operators
replayer = VideoStreamReplayerOp(
self,
name="replayer",
directory=video_dir,
basename="racerx",
frame_rate=0, # as specified in timestamps
repeat=False, # default: false
realtime=True, # default: true
count=40, # default: 0 (no frame count restriction)
)
invert_op = invert(self, name="image_invert")
info_op = tensor_info(self, name="tensor_info")
visualizer = HolovizOp(
self,
name="holoviz",
width=854,
height=480,
# name="frame" to match Output argument to create_op for tensor_info
tensors=[dict(name="frame", type="color", opacity=1.0, priority=0)],
)
# Define the workflow
self.add_flow(replayer, invert_op, {("output", "tensor")})
self.add_flow(invert_op, info_op, {("out_tensor", "in")})
self.add_flow(info_op, visualizer, {("out", "receivers")})
def main():
app = VideoReplayerApp()
app.run()
if __name__ == "__main__":
main()
突出显示的行显示了如何通过将应用程序本身作为第一个参数传递来创建与 invert
和 tensor_info
函数相对应的运算符。invert_op
和 info_op
变量现在对应于 holsocan.core.Operator
类,并且可以使用 add_flow
以通常的方式连接以定义计算。请注意,通过可选的 name
关键字参数,为这些运算符提供了名称。在本例中,每个运算符仅使用一次,但如果要在一个应用程序中多次使用同一运算符,则应为每个运算符指定唯一的名称。
create_op
装饰器可以像应用于函数一样应用于生成器。在这种情况下,将自动为运算符添加 BooleanCondition
,一旦生成器耗尽(没有更多值要生成),该条件将阻止运算符再次尝试调用 compute
。以下是将生成器装饰为从 1 到 count
的整数的基本示例
@create_op(outputs="out")
def source_generator(count):
yield from range(1, count + 1)
然后,compose
方法可以从这个装饰的生成器创建一个运算符,如下所示
count_op = source_generator(self, count=100, name="int_source")
create_op
装饰器也可以应用于实现 __call__
方法的类,以将其转换为 Operator()
。选择类而不是函数的一个原因是,如果需要在跨调用维护某些内部状态。例如,下面定义的运算符将输入数据转换为 32 位浮点数,并在偶数帧上对值取反。
@create_op
class NegateEven:
def __init__(self, start_index=0):
self.counter = start_index
def __call__(self, x):
# cast tensor to 32-bit floating point
x = x.astype('float32')
# negate the values if the frame is even
if self.counter % 2 == 0:
x = -x
return x
在本例中,由于该函数只有一个输入和输出,我们可以在对 create_op
的调用中省略 inputs
和 outputs
参数。在这种情况下,输入端口的名称将为 "x"
,这由函数签名中的变量名称确定。输出端口将具有空名称 ""
。要使用不同的端口名称,应指定 inputs
和/或 outputs
参数。
然后,compose
方法可以从此装饰的生成器创建一个运算符,如下所示。请注意,__init__
方法中的任何位置或关键字参数都将在 NegateEven
调用期间提供。这将返回一个函数(还不是运算符),然后可以调用该函数来生成运算符。如下所示
negate_op_creation_func = NegateEven(start_index=0) # `negate_op_creation_func` is a function that returns an Operator
negate_even_op = negate_op_creation_func(self, name="negate_even") # call the function to create an instance of NegateEvenOp
或更简洁地表示为
negate_even_op = NegateEven(start_index=0)(self, name="negate_even")
请注意,上面定义的运算符类大致等效于下面定义的 Python 本机运算符。我们在此处显式地显示它以供参考。
import cupy as cp
import numpy as np
class NegateEvenOp(Operator):
def __init__(self, fragment, *args, start_index=0, **kwargs):
self.counter = start_index
super().__init__(fragment, *args, **kwargs)
def setup(self, spec):
spec.input("x")
spec.output("")
def compute(op_input, op_output, context):
x = op_input.receive("x")
# cast to CuPy or NumPy array
# (validation that `x` is a holoscan.Tensor is omitted for simplicity)
if hasattr(x, '__cuda_array_interface__'):
x = cupy.asarray(x)
else:
x = numpy.asarray(x)
# cast tensor to 32-bit floating point
x = x.astype('float32')
# negate the values if the frame is even
if self.counter % 2 == 0:
x = -x
op_output.emit(x, "")
此 NegateEvenOp
类与上面装饰的 NegateEven
之间的主要区别在于
NegateEven
不需要定义setup
方法NegateEven
不从Operator
继承,因此不会从构造函数调用其__init__
。NegateEven::__call__
方法比NegateEvenOp::compute
方法更简单,因为不需要显式调用receive
和emit
方法,并且对于NegateEven
,到 NumPy 或 CuPy 数组的转换是自动处理的。