bionemo-size-aware-batching
要安装,请执行以下操作
pip install -e .
要运行单元测试,请执行
pytest -v .
使用摘要
此软件包提供了一种以内存消耗感知(或大小感知)的方式创建小批量数据的简单方法,使其适用于在具有不同内存需求的数据集上训练模型等任务。使用方法通常包括以下步骤
- 使用
collect_cuda_peak_alloc
函数收集用户定义的工作流程的 CUDA 峰值内存分配统计信息。预期用户定义的工作流程将返回从数据中提取的特征列表,以便后续步骤中的内存模型可以使用这些特征来预测内存分配。 - 用户使用上一步骤中的特征和内存分配数据定义和训练内存模型。然后,此内存模型可用于预测内存消耗。
- 使用
SizeAwareBatchSampler
或size_aware_batching
以及内存模型预测(来自上一步骤)来构建数据批次,以便生成的小批量数据不超过指定的最大总内存大小。
此外,此软件包还提供了一种创建同质小批量数据的解决方案,这对于减少在训练或评估模型时对齐输入形状时的填充非常有用。此 BucketBatchSampler
可以与 torch.utils.data.BatchSampler
、SizeAwareBatchSampler
或其他用户定义的批次采样器结合使用。此用法可以利用 create_buckets
函数并遵循以下步骤
- 收集数据集中元素的张量大小,这些大小是您想要减少填充的特定维度中张量的形状。
- 根据张量大小提供您自己的存储桶边界,或使用
create_buckets
函数以及张量大小、存储桶最大宽度和最小存储桶计数创建存储桶边界。create_buckets
函数将尝试创建宽度最小且元素计数 >= 最小存储桶计数的存储桶,除非达到最大宽度或边界。 - 为每个存储桶使用
BucketBatchSampler
以及基本批次采样器,如torch.utils.data.BatchSampler
或SizeAwareBatchSampler
。BucketBatchSampler
每次将选择一个存储桶,并使用此存储桶的基本批次采样器从此存储桶生成一个小批量数据。因此,为生成的小批量数据所必需的填充将始终小于存储桶的宽度。
有关 API 文档以及有关如何实现上述每个步骤的示例,请参阅后面的章节。
utils 模块
-
collect_cuda_peak_alloc:一个函数,用于收集 CUDA 峰值内存分配统计信息和特征,以用于给定工作流程的内存使用情况预测。
-
create_buckets:一个函数,用于为具有预定义最大范围宽度和最小存储桶大小的整数列表创建存储桶。
sampler 模块
- size_aware_batching:一个生成器,用于从可迭代对象中批量处理元素,同时确保每个批次的总大小不超过指定的最大值。此处的大小可以是批次中元素内存消耗的度量。
- SizeAwareBatchSampler:一个类,用于批量处理大小不同的元素,同时确保每个批次的总大小不超过指定的最大值。
- BucketBatchSampler:一个类,用于根据预定义的存储桶范围对大小不同的元素进行分组,并创建来自每个存储桶的元素的批次,以确保每个批次都具有大小同质的元素。
API 参考和示例
utils
collect_cuda_peak_alloc
def collect_cuda_peak_alloc(
dataset: Iterable[Data],
work: Callable[[Data], Feature],
device: torch.device,
cleanup: Optional[Callable[[], None]] = None
) -> Tuple[List[Feature], List[int]]
收集给定工作流程的 CUDA 峰值内存分配统计信息。
此函数迭代提供的数据集,将给定的特征函数应用于每个数据点,并记录此过程中的峰值 CUDA 内存分配。从数据点提取的特征与其相应的内存使用情况统计信息一起收集。
请注意,由于未初始化的数据(例如,内部 PyTorch 缓冲区),工作流程的最初几次迭代可能会导致较小的内存分配。因此,用户可能希望在分析结果时跳过这些初始数据点。
参数:
dataset
- 包含输入数据的可迭代对象。work
- 接受数据点并返回其对应特征的函数。这是发生主要计算和跟踪内存分配的地方。device
- 目标 Torch CUDA 设备。cleanup
- 在每次迭代后调用的函数,用于执行任何必要的清理。
返回:
包含收集的特征及其相应的内存使用情况统计信息的元组。
引发:
ValueError
- 如果提供的设备不是 CUDA 设备。
示例:
>>> import torch
>>> from bionemo.size_aware_batching.utils import collect_cuda_peak_alloc
>>> # prepare dataset, model and other components of a workflow
>>> # for which the user want to collect CUDA peak memory allocation statistics
>>> dataset, model, optimizer = ...
>>> # Set the target Torch CUDA device.
>>> device = torch.device("cuda:0")
>>> model = model.to(device)
>>> # Define a function that takes an element of the dataset as input and
>>> # do a training step
>>> def work(data):
... # example body of a training loop
... optimizer.zero_grad()
... output = model(data.to(device))
... loss = compute_loss(output)
... loss.backward()
... optimizer.step()
... # extract the feature for later to be modeled or analyzed
... return featurize(data)
>>> # can optionally use a cleanup function to release the references
>>> # hold during the work(). This cleanup function will be called
>>> # at the end of each step before garbage collection and memory allocations measurement
>>> def cleanup():
... model.zero_grad(set_to_none=True)
>>> # Collect features (i.e., model outputs) and memory usage statistics for the workflow.
>>> features, alloc_peaks = collect_cuda_peak_alloc(
... dataset=batches,
... work=work,
... device=device,
... cleanup=cleanup,
... )
>>> # use features and alloc_peaks as needed, e.g., fit a model
>>> # that can use these statistics to predict memory usage
>>> memory_model = ...
>>> memory_model.fit(features, alloc_peaks)
create_buckets
def create_buckets(sizes: torch.Tensor, max_width: int,
min_bucket_count: int) -> Buckets
为具有预定义最大间隔宽度和最小存储桶计数的整数列表创建存储桶。
它将返回一个命名元组,其中包含存储桶边界和实际存储桶大小。例如,torch.tensor([0, 5, 7]), torch.tensor([3,2]):指定 2 个存储桶:一个范围为 0<= 大小 < 5,宽度=5 和 3 个元素,另一个范围为 5 <= 大小 < 7,宽度=2 和 2 个元素。
参数:
sizes
- 整数的 1D 张量。max_width
- 存储桶的最大宽度,应为正整数。min_bucket_count
- 存储桶的最小计数,应为正整数。如果存储桶宽度达到 max_width,则存储桶大小可能小于 min_bucket_count。
引发:
ValueError
- 如果提供的 sizes 为空或不是整数。ValueError
- 如果 max_width 不是正整数或 min_bucket_count 不是正整数。
返回:
一个命名元组,其中包含升序排列的存储桶边界和每个存储桶中元素的数量。
示例:
>>> import torch
>>> from bionemo.size_aware_batching.utils import create_buckets
>>> sizes = torch.tensor([1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 22, 22, 22, 22])
>>> buckets = create_buckets(sizes, max_width=5, min_bucket_count=10)
>>> # 5 buckets: 1 <= sizes < 6, 6 <= sizes < 11, 11 <= sizes < 16, 16 <= sizes < 21, 21 <= sizes < 23
>>> print(buckets.bucket_boundaries)
tensor([ 1, 6, 11, 16, 21, 23])
>>> # each with 12, 0, 0, 0, 4 elements respectively.
>>> print(buckets.bucket_sizes)
tensor([12, 0, 0, 0, 4])
>>> sizes = torch.arange(20)
>>> # min_bucket_count is used to control bucket size
>>> buckets = create_buckets(sizes, max_width=10, min_bucket_count=5)
>>> print(buckets.bucket_boundaries)
tensor([ 0, 5, 10, 15, 20])
>>> print(buckets.bucket_sizes)
tensor([5, 5, 5, 5])
sampler
size_aware_batching
def size_aware_batching(
dataset: Iterable[Data],
sizeof: Callable[[Data], Real],
max_total_size: Real,
collate_fn: Optional[Callable[[Iterable[Data]], BatchCollated]] = None,
info_logger: Optional[Callable[[str], None]] = None,
warn_logger: Optional[Callable[[str], None]] = None
) -> Iterator[Union[List[Data], BatchCollated]]
一个生成器,用于从可迭代对象中批量处理元素,同时确保每个批次的总大小不超过指定的最大值。此处的大小可以是批次中元素内存消耗的度量。这对于可索引数据或不可索引但可迭代的数据都很有用。
参数:
dataset
- 输入可迭代对象。sizeof
- 一个函数或映射,用于返回dataset
中每个元素的“大小”。例如,这可以用于确定元素消耗多少内存。其返回类型必须与max_total_size
可比较,并且必须可加(运算符+
)。max_total_size
- 每个批次的最大总“大小”。“大小”的语义由sizeof
参数定义。此值的类型必须与 sizeof 的返回类型可比较,即,运算符<
和==
必须有意义。collate_fn
- 用于整理批次的可选函数。默认为 None,在这种情况下,每个批次都是来自输入数据集的元素列表info_logger
- 用于记录信息的函数。默认为 None。warn_logger
- 用于记录警告的函数。默认为 None。
产生:
一个生成器,用于从 dataset
产生批次。
假设 1. 线性复杂度。此函数消耗给定数据可迭代对象 (dataset
) 一次,方法是逐个遍历数据项以构建批次,并在将下一个数据项添加到批次将超过 max_total_size
或批次是最后一个批次(迭代结束)时立即产生批次。2. 可加性大小测量。对于构建具有批次内存消耗阈值的小批量数据的通用用例,它假定批次的大小是批次中所有元素的总和(可加性属性)。3. max_total_size
和 sizeof
返回的可比较类型。sizeof
的返回值必须与 max_total_size
进行比较,以阈值化批次的大小
注意事项 - 1
- 生成的批次大小可能具有较大差异 - 如何解决:使用批次大小阈值过滤此生成器的输出 - 2
- 不同 epoch 之间的批次数量可能差异很大。 - 如何解决:增加构成一个 epoch 的步数,例如,在 Lightning 训练/验证循环中,这有效地增加了每个 epoch 的输入数据集大小
示例
>>> import torch
>>> from torch.utils.data import default_collate
>>> from bionemo.size_aware_batching.sampler import size_aware_batching
>>> # Define a sample dataset with torch.tensor
>>> dataset = [torch.tensor([1, 2]), torch.tensor([3, 4]), torch.tensor([5, 6]),
... torch.tensor([7, 8]), torch.tensor([9, 10])]
>>> # Define a sizeof function that returns the size of each tensor
>>> def sizeof(x):
... return x.numel()
>>> # Create a generator with max_total_size=4 and default_collate_fn
>>> gen = size_aware_batching(dataset, sizeof, 4, collate_fn=default_collate)
>>> batches = list(gen)
>>> print(batches)
[tensor([[1, 2], [3, 4]]), tensor([[5, 6], [7, 8]]), tensor([[9, 10]])]
SizeAwareBatchSampler 对象
class SizeAwareBatchSampler(Sampler[List[int]])
一个采样器,用于批量处理大小不同的元素,同时确保每个批次的总大小不超过指定的最大值。
当处理每个元素具有不同大小的数据集(例如,长度不同的图或序列)时,这非常有用。采样器使用提供的 sizeof
函数来确定数据集中每个元素的大小,并确保每个批次的总大小不超过指定的 max_total_size
。
示例:
>>> import torch
>>> from bionemo.size_aware_batching.sampler import SizeAwareBatchSampler
>>> # Define a sample dataset with torch.tensor
>>> dataset = [torch.tensor([1, 2]), torch.tensor([3, 4]), torch.tensor([5, 6]),
... torch.tensor([7, 8]), torch.tensor([9, 10])]
>>> # Define a function that returns the size of each element in the dataset.
>>> def sizeof(index):
... return dataset[index].numel()
>>> # Create a SizeAwareBatchSampler with a maximum total batch size of 10.
>>> batch_sampler = SizeAwareBatchSampler(
... sampler=torch.utils.data.SequentialSampler(dataset),
... sizeof=sizeof,
... max_total_size=4
... )
>>> # Iterate over batches of indices that do not exceed the maximum total size.
>>> print(list(batch_sampler))
[[0, 1], [2, 3], [4]]
__init__
def __init__(sampler: Union[Sampler[List[int]], Iterable[int]],
sizeof: Callable[[int], Real],
max_total_size: Real,
info_logger: Optional[Callable[[str], None]] = None,
warn_logger: Optional[Callable[[str], None]] = None) -> None
初始化 SizeAwareBatchSampler。
参数:
sampler
- 底层采样器。sizeof
- 一个函数,用于返回每个索引的大小。例如,这可以用于确定元素消耗多少内存。其返回类型必须与max_total_size
可比较,并且必须可加(运算符+
)。max_total_size
- 小批量数据的最大总大小。“大小”的语义由sizeof
参数定义。此值的类型必须与 sizeof 的返回类型可比较,即,运算符<
和==
必须有意义。info_logger
- 用于记录信息的函数。默认为 None。warn_logger
- 用于记录警告的函数。默认为 None。
引发:
TypeError
- 如果 sampler 不是 Sampler 或 Iterable 的实例,或者如果 sizeof 不是可调用对象、字典或序列容器。ValueError
- 如果 max_total_size 不是正数。
__iter__
def __iter__() -> Iterator[List[int]]
迭代索引批次。
此函数产生不超过最大总大小的索引批次。
产生:
一个不超过最大总大小的索引批次。
BucketBatchSampler 对象
class BucketBatchSampler(Sampler[List[int]])
一个批次采样器,用于创建具有来自每个预定义存储桶范围的元素大小的批次。
数据集的元素首先根据存储桶范围和元素大小分组到每个存储桶中。然后,为每个存储桶使用基本批次采样器来创建小批量数据。
存储桶范围由 bucket_boundaries
指定,它将首先在内部排序,并用于创建 len(bucket_boundaries) - 1
左闭右开区间。例如,如果 bucket_boundaries 张量为 [10, 5, 0, 16],则它将被排序为 [0, 5, 10, 16],并且将创建 3 个存储桶,范围为:[0, 5), [5, 10), [10, 16)。
基本批次采样器将通过传递每个存储桶中的元素索引作为数据源,并将 base_batch_sampler_shared_kwargs
和 base_batch_sampler_individual_kwargs
传递给指定为 base_batch_sampler_class
的基本批次采样器类的构造函数来创建。例如,base_batch_sampler_shared_kwargs = {'drop_last': True}
和 base_batch_sampler_individual_kwargs = {'batch_size': [8,10,12]}
将用于创建 3 个批次采样器,其中 drop_last=True 和 batch_size=8、10 和 12,并像 base_batch_sampler_class(bucket_element_indices[0], batch_size=8, drop_last=True)
这样初始化。
在 __iter__
方法中,如果 shuffle
为 True
,则每个存储桶中的元素索引将被混洗,并且每次随机选择一个存储桶以创建小批量数据。如果 shuffle
为 False
,则元素索引上没有混洗,并且存储桶按其间隔边界的升序选择。
此类用于创建用于训练或评估的同质数据批次,并减少对齐元素形状所必需的填充。
修改自 https://github.com/rssrwn/semla-flow/blob/main/semlaflow/data/util.py
示例:
>>> import torch
>>> from bionemo.size_aware_batching.sampler import BucketBatchSampler
>>> # Define the sizes for a dataset
>>> sizes = torch.arange(25)
>>> # Define bucket ranges
>>> bucket_boundaries = torch.tensor([0, 6, 15, 25])
>>> # Create a bucket batch sampler with torch.utils.data.BatchSampler as base batch sampler
>>> # As there are 3 buckets, there will be 3 base batch samplers with batch sizes 2, 3, and 5.
>>> batch_sampler = BucketBatchSampler(
sizes=sizes,
bucket_boundaries=bucket_boundaries,
base_batch_sampler_class=torch.utils.data.BatchSampler,
base_batch_sampler_shared_kwargs={'drop_last': False},
base_batch_sampler_individual_kwargs={'batch_size': [2,3,5]},
shuffle=False,
)
>>> # Iterate over batches of indices that lies in the same bucket and with different batch sizes.
>>> print(list(batch_sampler))
[[0, 1], [2, 3], [4, 5], [6, 7, 8], [9, 10, 11], [12, 13, 14], [15, 16, 17, 18, 19], [20, 21, 22, 23, 24]]
>>> # randomize the dataset and buckets
>>> batch_sampler = BucketBatchSampler(
sizes=sizes,
bucket_boundaries=bucket_boundaries,
base_batch_sampler_class=torch.utils.data.BatchSampler,
base_batch_sampler_shared_kwargs={'drop_last': False},
base_batch_sampler_individual_kwargs={'batch_size': [2,3,5]},
shuffle=True,
generator=torch.Generator().manual_seed(0),
)
>>> print(list(batch_sampler))
[[24, 17, 16, 22, 19], [2, 5], [12, 10, 11], [3, 0], [15, 18, 20, 21, 23], [7, 13, 6], [14, 9, 8], [1, 4]]
>>> print(list(batch_sampler))
[[14, 9, 13], [23, 16, 20, 21, 15], [5, 0], [8, 10, 11], [17, 24, 22, 18, 19], [12, 6, 7], [4, 2], [3, 1]]
>>> # Combine with SizeAwareBatchSampler to control the cost of each batch
>>> from bionemo.size_aware_batching.sampler import SizeAwareBatchSampler
>>> item_costs = sizes.tolist()
>>> def cost_of_element(index):
return item_costs[index]
>>> batch_sampler = BucketBatchSampler(
sizes=sizes,
bucket_boundaries=bucket_boundaries,
base_batch_sampler_class=SizeAwareBatchSampler,
base_batch_sampler_shared_kwargs={"sizeof": cost_of_element, "max_total_size": 40},
base_batch_sampler_individual_kwargs={},
shuffle=True,
generator=torch.Generator().manual_seed(0),
)
>>> print(list(iter(batch_sampler)))
[[24], [2, 5, 3, 0, 1, 4], [12, 10, 11, 7], [13, 6, 14], [17, 16], [22], [19, 15], [9, 8], [18, 20], [21], [23]]
__init__
def __init__(sizes: torch.Tensor,
bucket_boundaries: torch.Tensor,
base_batch_sampler_class: Type[Sampler],
base_batch_sampler_shared_kwargs: Optional[Dict[str, Any]] = None,
base_batch_sampler_individual_kwargs: Optional[Dict[
str, Iterable]] = None,
shuffle: Optional[bool] = True,
generator: Optional[torch.Generator] = None) -> None
初始化 BucketBatchSampler。
参数:
sizes
- 表示数据集中每个元素大小的实数 1D 张量。bucket_boundaries
- 表示存储桶范围边界的实数 1D 张量。它将首先排序,并用于创建len(bucket_boundaries) - 1
左闭右开区间作为存储桶范围。它不应包含任何重复值。base_batch_sampler_class
- 基本批次采样器类类型,将用于每个存储桶,并使用存储桶元素索引、base_batch_sampler_shared_kwargs
和相应的base_batch_sampler_individual_kwargs
初始化。base_batch_sampler_shared_kwargs
- 用于初始化所有存储桶的所有基本批次采样器的共享关键字参数字典。应为base_batch_sampler_class
提供具有base_batch_sampler_individual_kwargs
的足够且有效的参数。默认为 {}。base_batch_sampler_individual_kwargs
- 关键字参数字典,用于使用相应的键值对初始化每个存储桶批次采样器。此字典中每个值的长度必须等于 len(bucket_boundaries) - 1(存储桶的数量)。应为base_batch_sampler_class
提供具有base_batch_sampler_shared_kwargs
的足够且有效的参数。默认为 {}。shuffle
- 一个布尔值,指示是否混洗数据集和存储桶。默认为 True。generator
- 采样中使用的生成器。默认为 None。
引发:
ValueError
- 如果sizes
不是实数的 1D 张量。ValueError
- 如果bucket_boundaries
不是实数的 1D 张量。ValueError
- 如果base_batch_sampler_individual_kwargs
或base_batch_sampler_individual_kwargs
不是关键字参数字典。ValueError
- 如果base_batch_sampler_individual_kwargs
字典中值的长度必须等于 len(bucket_boundaries) - 1。RuntimeError
- 如果没有元素的大小在bucket_boundaries
指定的范围内。
__len__
def __len__() -> int
获取批次数量。
仅当 base_batch_sampler_class
实现了 len() 时才能调用
返回:
int
- 批次数量
__iter__
def __iter__() -> Iterator[List[int]]
迭代索引批次。
此函数产生来自每个存储桶范围的大小的元素的索引批次。
产生:
List[int]
- 来自每个存储桶范围的大小的元素的索引批次。