
Utils

Bases: NamedTuple

A container for storing bucket boundaries and sizes.

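Attributes

| Name | Type | Description |
| --- | --- | --- |
| bucket_boundaries | Tensor | A 1D tensor containing the boundaries of all buckets. |
| bucket_sizes | Tensor | The number of elements in each bucket. |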
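To make the layout concrete, the sketch below pairs each bucket's half-open range with its element count. `BucketsSketch` and `bucket_ranges` are hypothetical names; the stand-in class uses plain lists instead of `torch.Tensor` so that it runs without PyTorch, and it is not the library class.

```python
from typing import List, NamedTuple, Tuple


class BucketsSketch(NamedTuple):
    """Hypothetical list-based stand-in for Buckets (not the library class)."""

    bucket_boundaries: List[int]  # n + 1 ascending boundaries for n buckets
    bucket_sizes: List[int]  # n element counts, one per bucket


def bucket_ranges(buckets: BucketsSketch) -> List[Tuple[int, int, int]]:
    """Return (lower, upper, count) per bucket; each range is lower <= size < upper."""
    lowers = buckets.bucket_boundaries[:-1]
    uppers = buckets.bucket_boundaries[1:]
    return list(zip(lowers, uppers, buckets.bucket_sizes))


example = BucketsSketch(
    bucket_boundaries=[1, 6, 11, 16, 21, 23],
    bucket_sizes=[12, 0, 0, 0, 4],
)
ranges = bucket_ranges(example)
```

Here the first entry of `ranges` is `(1, 6, 12)`: twelve elements with sizes in the range 1 <= size < 6.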

Source code in bionemo/size_aware_batching/utils.py
class Buckets(NamedTuple):
    """A container for storing bucket boundaries and sizes.

    Attributes:
        bucket_boundaries (torch.Tensor): A 1D tensor with the boundaries of all the buckets.
        bucket_sizes (torch.Tensor): The number of elements in each bucket.
    """

    bucket_boundaries: torch.Tensor
    bucket_sizes: torch.Tensor

collect_cuda_peak_alloc(dataset, work, device, cleanup=None)

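Collects CUDA peak memory allocation statistics for a given workflow.

This function iterates through the provided dataset, applies the given work function to each data point, and records the peak CUDA memory allocation during this process. The features extracted from the data points are collected along with their corresponding memory usage statistics.

Note that the first few iterations of the workflow may show smaller memory allocations because some data (e.g., internal PyTorch buffers) has not been initialized yet. Users may therefore want to skip these initial data points when analyzing the results.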
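One way to follow this advice is to drop a fixed number of leading samples before analysis. The sketch below assumes `features` and `alloc_peaks` are the two lists returned by the function; `drop_warmup` and `n_warmup` are hypothetical names, the numbers are made up, and the right number of samples to skip depends on the workload.

```python
from typing import List, Sequence, Tuple, TypeVar

Feature = TypeVar("Feature")


def drop_warmup(
    features: Sequence[Feature], alloc_peaks: Sequence[int], n_warmup: int
) -> Tuple[List[Feature], List[int]]:
    """Discard the first n_warmup (feature, peak) pairs."""
    if n_warmup < 0:
        raise ValueError("n_warmup should be non-negative")
    if len(features) != len(alloc_peaks):
        raise ValueError("features and alloc_peaks should have the same length")
    return list(features[n_warmup:]), list(alloc_peaks[n_warmup:])


# Made-up values for illustration only: the first sample is treated as warm-up.
kept_features, kept_peaks = drop_warmup(
    [10, 12, 11, 30], [100, 400, 390, 900], n_warmup=1
)
```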

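Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset | Iterable[Data] | An iterable containing the input data. | required |
| work | Callable[[Data], Feature] | A function that takes a data point and returns its corresponding feature. This is where the main computation happens and memory allocations are tracked. | required |
| device | device | The target Torch CUDA device. | required |
| cleanup | Optional[Callable[[], None]] | A function that is called after each iteration to perform any necessary cleanup. | None |

Returns

| Type | Description |
| --- | --- |
| Tuple[List[Feature], List[int]] | A tuple containing the collected features and their corresponding memory usage statistics (peak allocated bytes, one entry per collected feature). |

Raises

| Type | Description |
| --- | --- |
| ValueError | If the provided device is not a CUDA device. |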


Examples

>>> import torch
>>> from bionemo.size_aware_batching.utils import collect_cuda_peak_alloc


>>> # Prepare the dataset, model and other components of the workflow
>>> # for which the user wants to collect CUDA peak memory allocation statistics.
>>> dataset, model, optimizer = ...
>>> # Set the target Torch CUDA device.
>>> device = torch.device("cuda:0")
>>> model = model.to(device)

>>> # Define a function that takes an element of the dataset as input and
>>> # performs a training step.
>>> def work(data):
...     # example body of a training loop
...     optimizer.zero_grad()
...     output = model(data.to(device))
...     loss = compute_loss(output)
...     loss.backward()
...     optimizer.step()
...     # extract the feature for later to be modeled or analyzed
...     return featurize(data)

>>> # Optionally, use a cleanup function to release the references
>>> # held during work(). It is called at the end of each step,
>>> # before garbage collection and before the peak memory statistics are reset.
>>> def cleanup():
...     model.zero_grad(set_to_none=True)

>>> # Collect features and memory usage statistics for the workflow.
>>> features, alloc_peaks = collect_cuda_peak_alloc(
...     dataset=dataset,
...     work=work,
...     device=device,
...     cleanup=cleanup,
... )


>>> # use features and alloc_peaks as needed, e.g., fit a model
>>> # that can use these statistics to predict memory usage
>>> memory_model = ...
>>> memory_model.fit(features, alloc_peaks)
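
The `memory_model` placeholder in the example above is left open. As one possible choice, and assuming each feature is a single number such as a sequence length, a least-squares line can be fitted with the standard library alone. `fit_line` and `predict` are hypothetical helpers, and the data values below are made up for illustration.

```python
from typing import Sequence, Tuple


def fit_line(xs: Sequence[float], ys: Sequence[float]) -> Tuple[float, float]:
    """Ordinary least squares for y = slope * x + intercept."""
    n = len(xs)
    if n < 2 or n != len(ys):
        raise ValueError("need at least two (x, y) pairs of equal length")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    var_x = sum((x - mean_x) ** 2 for x in xs)
    if var_x == 0:
        raise ValueError("xs should not all be identical")
    cov_xy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = cov_xy / var_x
    return slope, mean_y - slope * mean_x


def predict(model: Tuple[float, float], x: float) -> float:
    """Predict peak memory (bytes) for a feature value x."""
    slope, intercept = model
    return slope * x + intercept


# Made-up (feature, peak bytes) pairs that lie exactly on a line.
features = [10, 20, 30, 40]
alloc_peaks = [1200, 2200, 3200, 4200]
model = fit_line(features, alloc_peaks)
estimate = predict(model, 50)
```

A linear fit is only a starting point; real workloads may need a model that is nonlinear in the feature.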

Source code in bionemo/size_aware_batching/utils.py
def collect_cuda_peak_alloc(
    dataset: Iterable[Data],
    work: Callable[[Data], Feature],
    device: torch.device,
    cleanup: Optional[Callable[[], None]] = None,
) -> Tuple[List[Feature], List[int]]:
    """Collects CUDA peak memory allocation statistics for a given workflow.

    This function iterates through the provided dataset, applies the given `work` function to each data point,
    and records the peak CUDA memory allocation during this process. The features extracted from the data points
    are collected along with their corresponding memory usage statistics.

    Note that the first few iterations of the workflow might result in smaller memory allocations due to uninitialized
    data (e.g., internal PyTorch buffers). Therefore, users may want to skip these initial data points when analyzing the results.

    Args:
        dataset: An iterable containing the input data.
        work: A function that takes a data point and returns its corresponding feature. This is where
            the main computation happens and memory allocations are tracked.
        device: The target Torch CUDA device.
        cleanup: A function that is called after each iteration to perform any necessary cleanup.

    Returns:
        A tuple containing the collected features and their corresponding memory usage statistics.

    Raises:
        ValueError: If the provided device is not a CUDA device.

    -------

    Examples:
    ```python
    >>> import torch
    >>> from bionemo.size_aware_batching.utils import collect_cuda_peak_alloc


    >>> # Prepare the dataset, model and other components of the workflow
    >>> # for which the user wants to collect CUDA peak memory allocation statistics.
    >>> dataset, model, optimizer = ...
    >>> # Set the target Torch CUDA device.
    >>> device = torch.device("cuda:0")
    >>> model = model.to(device)

    >>> # Define a function that takes an element of the dataset as input and
    >>> # performs a training step.
    >>> def work(data):
    ...     # example body of a training loop
    ...     optimizer.zero_grad()
    ...     output = model(data.to(device))
    ...     loss = compute_loss(output)
    ...     loss.backward()
    ...     optimizer.step()
    ...     # extract the feature for later to be modeled or analyzed
    ...     return featurize(data)

    >>> # Optionally, use a cleanup function to release the references
    >>> # held during work(). It is called at the end of each step,
    >>> # before garbage collection and before the peak memory statistics are reset.
    >>> def cleanup():
    ...     model.zero_grad(set_to_none=True)

    >>> # Collect features and memory usage statistics for the workflow.
    >>> features, alloc_peaks = collect_cuda_peak_alloc(
    ...     dataset=dataset,
    ...     work=work,
    ...     device=device,
    ...     cleanup=cleanup,
    ... )


    >>> # use features and alloc_peaks as needed, e.g., fit a model
    >>> # that can use these statistics to predict memory usage
    >>> memory_model = ...
    >>> memory_model.fit(features, alloc_peaks)
    ```


    """
    if device.type != "cuda":
        raise ValueError("This function is intended for CUDA devices only.")

    features = []
    alloc_peaks = []

    for data in dataset:
        try:
            torch.cuda.reset_peak_memory_stats(device)
            feature = work(data)
            alloc_peak = torch.cuda.memory_stats(device)["allocated_bytes.all.peak"]
            alloc_peaks.append(alloc_peak)
            features.append(feature)
        except torch.cuda.OutOfMemoryError:
            print("Encountered CUDA out-of-memory error. Skipping sample", file=sys.stderr, flush=True)
            continue
        finally:
            # ensures cleanup is done next round even in case of exception
            del data
            if "feature" in locals():
                del feature
            if cleanup is not None:
                cleanup()
            gc.collect()
            torch.cuda.empty_cache()
            torch.cuda.reset_peak_memory_stats(device)
    return features, alloc_peaks
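
The control flow above (reset the peak counter, run the work, read the peak, always clean up) can be tried without a GPU. The sketch below is a CPU-only analogue that swaps the `torch.cuda` memory statistics for the standard library's `tracemalloc`. It is not the library function: `collect_cpu_peak_alloc` is a hypothetical name, it measures the traced Python heap rather than CUDA memory, and `tracemalloc.reset_peak` requires Python 3.9 or later.

```python
import gc
import sys
import tracemalloc
from typing import Callable, Iterable, List, Optional, Tuple, TypeVar

Data = TypeVar("Data")
Feature = TypeVar("Feature")


def collect_cpu_peak_alloc(
    dataset: Iterable[Data],
    work: Callable[[Data], Feature],
    cleanup: Optional[Callable[[], None]] = None,
) -> Tuple[List[Feature], List[int]]:
    """CPU analogue: record the peak traced Python heap size per data point."""
    features: List[Feature] = []
    alloc_peaks: List[int] = []
    tracemalloc.start()
    try:
        for data in dataset:
            try:
                # Start each measurement from the current usage level.
                tracemalloc.reset_peak()
                feature = work(data)
                _, peak = tracemalloc.get_traced_memory()
                alloc_peaks.append(peak)
                features.append(feature)
            except MemoryError:
                print("Out of memory. Skipping sample", file=sys.stderr, flush=True)
            finally:
                # Runs after every sample, including skipped ones.
                if cleanup is not None:
                    cleanup()
                gc.collect()
    finally:
        tracemalloc.stop()
    return features, alloc_peaks


def work(n: int) -> int:
    buffer = bytearray(n)  # temporary allocation proportional to n
    return len(buffer)


features, alloc_peaks = collect_cpu_peak_alloc([1_000, 1_000_000], work)
```

The second entry of `alloc_peaks` is larger than the first because the temporary buffer is larger, which is the kind of size-to-memory relationship the CUDA version is meant to expose.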

create_buckets(sizes, max_width, min_bucket_count)

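Create buckets for a list of integers with a pre-defined maximum interval width and minimum bucket count.

It returns a named tuple containing the bucket boundaries and the actual bucket sizes. For example, torch.tensor([0, 5, 7]), torch.tensor([3, 2]) specifies 2 buckets: one with range 0 <= sizes < 5, width 5, and 3 elements; the other with range 5 <= sizes < 7, width 2, and 2 elements.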
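
As a quick check of this half-open range convention, the sketch below counts how many values fall into each bucket. The `sizes` list is a made-up input chosen to match the counts quoted above, and `count_per_bucket` is a hypothetical helper, not part of the library.

```python
from typing import List, Sequence


def count_per_bucket(sizes: Sequence[int], boundaries: Sequence[int]) -> List[int]:
    """Count values in each range boundaries[i] <= size < boundaries[i + 1]."""
    return [
        sum(lower <= size < upper for size in sizes)
        for lower, upper in zip(boundaries[:-1], boundaries[1:])
    ]


# Three values in [0, 5) and two values in [5, 7).
counts = count_per_bucket([0, 2, 4, 5, 6], [0, 5, 7])
```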

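Parameters

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| sizes | Tensor | A 1D tensor of integers. | required |
| max_width | int | The maximum width of a bucket; should be a positive integer. | required |
| min_bucket_count | int | The minimum count of a bucket; should be a positive integer. A bucket may hold fewer than min_bucket_count elements if its width reaches max_width. | required |

Raises

| Type | Description |
| --- | --- |
| TypeError | If sizes is not a torch tensor. |
| ValueError | If sizes is not a 1D tensor, is empty, or does not contain integers. |
| ValueError | If max_width is not a positive integer or min_bucket_count is not a positive integer. |

Returns

| Type | Description |
| --- | --- |
| Buckets | A named tuple containing the bucket boundaries in ascending order and the number of elements in each bucket. |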


Examples

>>> import torch
>>> from bionemo.size_aware_batching.utils import create_buckets

>>> sizes = torch.tensor([1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 22, 22, 22, 22])
>>> buckets = create_buckets(sizes, max_width=5, min_bucket_count=10)
>>> # 5 buckets: 1 <= sizes < 6, 6 <= sizes < 11, 11 <= sizes < 16, 16 <= sizes < 21, 21 <= sizes < 23
>>> print(buckets.bucket_boundaries)
tensor([ 1,  6, 11, 16, 21, 23])

>>> # each with 12, 0, 0, 0, 4 elements respectively.
>>> print(buckets.bucket_sizes)
tensor([12,  0,  0,  0,  4])

>>> sizes = torch.arange(20)
>>> # min_bucket_count is used to control bucket size
>>> buckets = create_buckets(sizes, max_width=10, min_bucket_count=5)
>>> print(buckets.bucket_boundaries)
tensor([ 0,  5, 10, 15, 20])

>>> print(buckets.bucket_sizes)
tensor([5, 5, 5, 5])
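
One possible downstream use of the boundaries is to group sample indices by bucket, for example before forming batches of similarly sized samples. The sketch below uses the standard library's `bisect` on the boundaries from the first example, written as a plain list; `group_indices_by_bucket` is a hypothetical helper and not part of the library.

```python
from bisect import bisect_right
from collections import defaultdict
from typing import Dict, List, Sequence


def group_indices_by_bucket(
    sizes: Sequence[int], boundaries: Sequence[int]
) -> Dict[int, List[int]]:
    """Map bucket index -> indices of the samples whose size falls in that bucket."""
    groups: Dict[int, List[int]] = defaultdict(list)
    for index, size in enumerate(sizes):
        if not boundaries[0] <= size < boundaries[-1]:
            raise ValueError(f"size {size} is outside all buckets")
        # bisect_right gives the position of the first boundary greater than size,
        # so subtracting 1 yields the bucket with boundaries[i] <= size < boundaries[i + 1].
        groups[bisect_right(boundaries, size) - 1].append(index)
    return dict(groups)


sizes = [1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 22, 22, 22, 22]
boundaries = [1, 6, 11, 16, 21, 23]
groups = group_indices_by_bucket(sizes, boundaries)
```

Only buckets 0 and 4 appear in `groups`; the three empty buckets in between have no samples.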

Source code in bionemo/size_aware_batching/utils.py
def create_buckets(sizes: torch.Tensor, max_width: int, min_bucket_count: int) -> Buckets:
    """Create buckets for a list of integers with pre-defined maximal width of interval and minimal bucket count.

    It will return a named tuple containing the bucket boundaries and the actual bucket sizes.
    e.g. torch.tensor([0, 5, 7]), torch.tensor([3,2]): specifies 2 buckets: one with range 0<= sizes < 5, width=5 and 3 elements
    and the other one with range 5 <= sizes < 7, width=2 and 2 elements.


    Args:
        sizes: A 1D tensor of integers.
        max_width: The maximum width of a bucket, should be a positive integer.
        min_bucket_count: The minimum count of a bucket, should be a positive integer.
            Bucket size may be smaller than min_bucket_count if its width reaches max_width.

    Raises:
        TypeError: If sizes is not a torch tensor.
        ValueError: If sizes is not a 1D tensor, is empty, or does not contain integers.
        ValueError: If max_width is not a positive integer or min_bucket_count is not a positive integer.

    Returns:
        A namedtuple containing bucket boundaries in ascending order and the number of elements in each bucket.

    ---------

    Examples:
    ```python
    >>> import torch
    >>> from bionemo.size_aware_batching.utils import create_buckets

    >>> sizes = torch.tensor([1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 22, 22, 22, 22])
    >>> buckets = create_buckets(sizes, max_width=5, min_bucket_count=10)
    >>> # 5 buckets: 1 <= sizes < 6, 6 <= sizes < 11, 11 <= sizes < 16, 16 <= sizes < 21, 21 <= sizes < 23
    >>> print(buckets.bucket_boundaries)
    tensor([ 1,  6, 11, 16, 21, 23])

    >>> # each with 12, 0, 0, 0, 4 elements respectively.
    >>> print(buckets.bucket_sizes)
    tensor([12,  0,  0,  0,  4])

    >>> sizes = torch.arange(20)
    >>> # min_bucket_count is used to control bucket size
    >>> buckets = create_buckets(sizes, max_width=10, min_bucket_count=5)
    >>> print(buckets.bucket_boundaries)
    tensor([ 0,  5, 10, 15, 20])

    >>> print(buckets.bucket_sizes)
    tensor([5, 5, 5, 5])
    ```

    """
    if not torch.is_tensor(sizes):
        raise TypeError(f"sizes should be a torch tensor, but got sizes={sizes}")

    if sizes.ndim != 1:
        raise ValueError(f"sizes should be a 1D tensor, but got sizes with shape {sizes.shape}")

    if sizes.dtype not in TorchIntegerDataTypes:
        raise ValueError(f"sizes should contain only integers, but got sizes.dtype={sizes.dtype}")

    if len(sizes) == 0:
        raise ValueError("sizes should not be empty")

    if not isinstance(max_width, int) or max_width <= 0:
        raise ValueError(f"max_width should be a positive integer but got max_width={max_width}")

    if not isinstance(min_bucket_count, int) or min_bucket_count <= 0:
        raise ValueError(f"min_bucket_count should be a positive integer but got min_bucket_count={min_bucket_count}")

    unique_values, counts = torch.unique(sizes, return_counts=True, sorted=True)

    bucket_boundaries = [unique_values[0]]
    bucket_sizes = []
    start = 0
    end = 0
    upper_bound = unique_values[0] + 1
    bucket_count = 0

    while start < len(unique_values):
        while (
            end < len(unique_values)
            and bucket_count < min_bucket_count
            and unique_values[end] - bucket_boundaries[-1] < max_width
        ):
            bucket_count += counts[end]
            end += 1

        bucket_sizes.append(sum(counts[start:end]))
        if end == len(unique_values):
            upper_bound = unique_values[-1] + 1
        else:
            upper_bound = unique_values[end]

        # If the span up to upper_bound is at least max_width, cap this bucket at max_width
        # and fill the remaining gap with empty buckets so that no width exceeds max_width.
        n_empty_buckets = (upper_bound - bucket_boundaries[-1]) // max_width
        if n_empty_buckets > 0:
            bucket_boundaries.extend(
                list(
                    range(
                        bucket_boundaries[-1] + max_width,
                        bucket_boundaries[-1] + max_width * (n_empty_buckets + 1),
                        max_width,
                    )
                )
            )
            bucket_sizes.extend([0] * (n_empty_buckets - 1))
        else:
            bucket_boundaries.append(upper_bound)

        start = end
        end = start + 1
        # start may equal len(unique_values) here; the slice is then empty and sums to 0
        bucket_count = counts[start:end].sum()

    bucket_boundaries = torch.tensor(bucket_boundaries)
    bucket_sizes = torch.tensor(bucket_sizes)

    return Buckets(bucket_boundaries, bucket_sizes)
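
For readers who want to step through the algorithm without PyTorch, the following is a list-based sketch of the same greedy procedure. It is a re-expression for illustration, not the library code: `create_buckets_sketch` is a hypothetical name, input validation is omitted, and `collections.Counter` stands in for `torch.unique`.

```python
from collections import Counter
from typing import List, Sequence, Tuple


def create_buckets_sketch(
    sizes: Sequence[int], max_width: int, min_bucket_count: int
) -> Tuple[List[int], List[int]]:
    """Return (boundaries, bucket_sizes) using plain Python lists."""
    tally = Counter(sizes)
    values = sorted(tally)  # unique sizes in ascending order
    counts = [tally[value] for value in values]

    boundaries = [values[0]]
    bucket_sizes: List[int] = []
    start = 0
    while start < len(values):
        # Grow the bucket until it holds enough elements or would become too wide.
        end = start
        filled = 0
        while (
            end < len(values)
            and filled < min_bucket_count
            and values[end] - boundaries[-1] < max_width
        ):
            filled += counts[end]
            end += 1
        bucket_sizes.append(filled)

        # The bucket ends at the next unseen value, or just past the largest value.
        upper = values[end] if end < len(values) else values[-1] + 1
        n_steps = (upper - boundaries[-1]) // max_width
        if n_steps > 0:
            # The span reaches max_width: close this bucket at width max_width
            # and pad the remaining gap with empty buckets of the same width.
            first = boundaries[-1] + max_width
            boundaries.extend(range(first, first + max_width * n_steps, max_width))
            bucket_sizes.extend([0] * (n_steps - 1))
        else:
            boundaries.append(upper)
        start = end
    return boundaries, bucket_sizes


boundaries, bucket_sizes = create_buckets_sketch(
    [1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 22, 22, 22, 22],
    max_width=5,
    min_bucket_count=10,
)
```

With the inputs from the first documented example, the sketch yields boundaries `[1, 6, 11, 16, 21, 23]` and sizes `[12, 0, 0, 0, 4]`, matching the tensors shown in the docstring.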