重要提示

您正在查看 NeMo 2.0 文档。此版本引入了 API 的重大更改和一个新的库 NeMo Run。我们目前正在将 NeMo 1.0 的所有功能移植到 2.0。有关先前版本或 2.0 中尚未提供的功能的文档,请参阅 NeMo 24.07 文档

NeMo 分布式检查点用户指南#

本指南详细介绍了来自 NeMo Megatron Core 的分布式检查点最佳实践。

简介#

Megatron Core 是一个基于 PyTorch 的开源库,提供了一系列 GPU 优化技术,包括各种并行性(数据并行、张量并行、流水线并行、上下文并行和专家并行)。NeMo 框架是一个端到端的 LLM 训练框架,构建于 Megatron Core 库之上。

在大型规模训练中,检查点用于定期保存中间模型状态(包括模型权重、优化器状态和其他必要的元数据)。这使得在训练过程被中断时可以轻松恢复。

NeMo 分布式检查点是 Megatron Core 库的一部分,指的是跨多个 GPU 或节点保存分布式训练作业的状态。这种方法旨在减少内存开销并提高 GPU 利用率。它还为用户提供了使用不同并行策略恢复训练的灵活性。

Megatron Core 库

Megatron Core 提供了一个检查点库,能够处理 LLM 训练中使用的所有类型的并行性。虽然分布式检查点库的目标是 Megatron Core 模型,但只要实现了适当的集成,它也可以与其他模型一起使用。

该库提供了两个主要入口点:dist_checkpointing.savedist_checkpointing.load,它们旨在替换常规检查点流程中的 torch.savetorch.load。除此之外,它还提供了一种机制来定义如何在全球检查点中组合和拆分不同类型的本地张量。

机制#

NeMo 分布式检查点支持并行地从多个 rank 保存和加载模型。它采用了一种称为完全并行保存 (FPS) 的新策略,将优化器状态、梯度和模型参数在所有 GPU rank 之间进行分区。当保存分布式优化器的检查点时,每个 DP rank 都持有其优化器状态的分片,并独立地将其分片写入共享存储(梯度缓冲区)。

当加载检查点时,每个 DP rank 读取其对应的检查点文件(分片)以恢复。如果需要不同的并行策略(例如,张量并行、流水线并行),则每个 rank 还可以访问其他检查点文件以将数据传输到正确的位置。

NeMo 允许用户从使用不同张量和流水线并行度保存的检查点恢复训练,从而提供根据需要调整训练配置的灵活性。

下图说明了 NeMo 框架中的完全并行保存,利用数据并行副本在节点之间进行写入。

https://github.com/NVIDIA/NeMo/releases/download/v2.0.0/asset-nemo-dist-ckpt-explain-0.png

图 1. NeMo 框架中的完全并行保存使用数据并行副本在节点之间进行并行写入

下图说明了 NeMo 框架中的异步保存,其中检查点在后台保存,同时训练继续进行。异步并行保存允许模型参数首先复制到 CPU,然后在后台将检查点持久化到稳定存储。此过程最大限度地减少了对主训练的干扰,从而加快了分布式检查点过程。

https://github.com/NVIDIA/NeMo/releases/download/v2.0.0/asset-nemo-dist-ckpt-explain-1.png

图 2. NeMo 框架中的异步保存与训练并行,在后台保存检查点

参数调优#

您可以在 NeMo 预训练和微调作业中配置分布式检查点。

NeMo 1.0 YAML 配置文件NeMo 2.0 MegatronStrategy 中,您可以启用和调整这些参数。

最新的 NeMo 版本是 Nemo 2.0(NGC 容器 nvcr.io/nvidia/nemo:24.09)。

最佳实践#

以下是在 NeMo 中配置分布式检查点的最佳实践

dist_ckpt_format: 'torch_dist'

dist_ckpt_load_on_device: True

dist_ckpt_parallel_save: True

dist_ckpt_parallel_save_within_dp: False

dist_ckpt_parallel_load: True

dist_ckpt_torch_dist_multiproc: 2

dist_ckpt_assume_constant_structure: False

dist_ckpt_parallel_dist_opt: True

dist_ckpt_load_strictness: null

以下是检查点格式选项和相关参数的摘要

dist_ckpt_format#

用于保存的检查点格式。选项为 torch_distzarr。推荐格式为 PyTorch 分布式 (torch_dist)。保存格式可以与用于恢复作业的格式不同。加载格式会自动检测。

dist_ckpt_load_on_device#

确定是将检查点权重直接加载到 GPU 还是 CPU 上。如果为 True,则权重加载到 GPU 上。这目前仅影响 zarr 格式。

dist_ckpt_parallel_save#

每个 worker 写入其自己的分布式检查点部分,这意味着每个 DP rank 独立保存其检查点分片。这适用于模型权重或非分布式优化器状态。分布式优化器并行化由 dist_ckpt_parallel_dist_opt 标志控制(见下文)。

dist_ckpt_parallel_save_within_dp#

控制 NCCL 是否在数据并行域内并行化保存。如果为 False,则保存将在整个 world size(节点数 * GPU 数)上并行化。如果为 True,则保存仅在数据并行域内并行化。将此设置为 True 可以减少延迟,但在某些设置中可能会导致 NCCL 错误。

dist_ckpt_parallel_load#

每个 worker 加载分布式检查点的一部分,并使用 NCCL 与其交换,这意味着每个 DP rank 独立加载其检查点分片。这可能会使用额外的 GPU 内存,并且对于大型 DP 设置至关重要。如果为 True,则检查点仅从存储中读取一次;否则,模型权重部分将从存储中读取 DP 次。

dist_ckpt_torch_dist_multiproc#

在使用 torch_dist 格式保存检查点期间,每个 rank 使用的额外进程数。这等于每个 rank 创建的检查点文件数。增加此数字有助于饱和写入带宽。默认值为 2。

dist_ckpt_assume_constant_structure#

仅当状态字典结构在单个训练作业期间保持不变时(包括启动、数据加载、训练设置和实际训练),才设置为 True。这允许在检查点保存之间缓存一些计算,并可以减少从当前进程中的第三个检查点保存开始的保存时间。

dist_ckpt_parallel_dist_opt#

启用分布式优化器的并行保存/加载。设置为 True 可将优化器状态保存为可重新分片的格式(允许在恢复时更改 TP、PP 等)。设置为 False 可最大限度地减少检查点文件的数量。

dist_ckpt_load_strictness#

定义加载期间检查点键不匹配的行为。选项为 assume_ok_unexpected(默认值,尝试加载而不进行任何检查)、log_all(记录不匹配项)和 raise_all(引发不匹配项)。设置为 log_all 会导致将非严格状态字典加载到模型中。非默认选项可能会由于额外的存储交互而导致轻微的开销。建议首先将此标志设置为 raise_all 以检查预期的不匹配项。如果预期有不匹配项,请将其设置为 log_all 以忽略(但记录)它们。

基本分片#

定义普通本地 PyTorch 张量与其他 rank 上的张量之间关系的主要方法是将其包装在 ShardedTensor 类中。这表示给定的本地张量是给定形状和给定偏移量的较大张量网格的一部分。我们不是保存带有 torch.Tensor 的简单状态字典,而是保存带有 dist_checkpointing.ShardedTensor分片状态字典。

示例:假设我们有一个张量(由 128 个元素组成),它在整个 workload 中平均分配,我们希望使用不同数量的 rank 来保存和加载它。

from pathlib import Path

import torch

from megatron.core import dist_checkpointing

# Setup
ckpt_root = Path('/tmp/checkpoints')
native_ckpt_root = ckpt_root / 'native'
native_ckpt_root.mkdir(exist_ok=True, parents=True)
dist_ckpt_root = ckpt_root / 'dist_ckpt'
dist_ckpt_root.mkdir(exist_ok=True, parents=True)

torch.distributed.init_process_group()
world_size = torch.distributed.get_world_size()
rank = torch.distributed.get_rank()

# Local tensor to save
assert 128 % world_size == 0
num_elems_per_rank = 128 // world_size
local_ten = torch.arange(start=num_elems_per_rank * rank,
                         end=num_elems_per_rank * (rank + 1))

# Native checkpoint save
state_dict = {
    'weight': local_ten
}
torch.save(state_dict, native_ckpt_root / f'ckpt_{rank}.pt')

# Distributed checkpoint save
# `(0, rank, world_size)` describes that `weight` ShardedTensor is sharded into `world_size` pieces
# along the 0th dimension and `local_ten` is the shard at position `rank`.
# Together, all shards implicitly form a "global" `torch.arange(128)` tensor.
sharded_state_dict = {
    'weight': dist_checkpointing.ShardedTensor.from_rank_offsets('weight', local_ten, (0, rank, world_size))
}
dist_checkpointing.save(sharded_state_dict, dist_ckpt_root)

在加载期间,即使作业大小发生变化,也可以轻松读取分布式检查点(与需要相同数量 rank 的原生检查点相反)。相对于 torch.load 的主要区别在于,用户必须提供需要加载的分片状态字典的定义。

from pathlib import Path

import torch

from megatron.core import dist_checkpointing

ckpt_root = Path('/tmp/checkpoints')
dist_ckpt_root = ckpt_root / 'dist_ckpt'

torch.distributed.init_process_group()
world_size = torch.distributed.get_world_size()
rank = torch.distributed.get_rank()
assert 128 % world_size == 0
num_elems_per_rank = 128 // world_size

# Local tensor to load
local_ten = torch.empty(num_elems_per_rank)
sharded_state_dict = {
    'weight': dist_checkpointing.ShardedTensor.from_rank_offsets('weight', local_ten, (0, rank, world_size))
}
loaded_state_dict = dist_checkpointing.load(sharded_state_dict, dist_ckpt_root)
expected_local_ten = torch.arange(start=num_elems_per_rank * rank, end=num_elems_per_rank * (rank + 1))
assert torch.all(loaded_state_dict['weight'] == expected_local_ten)

# With torch.save and torch.load, we would have to load all files that contain
# parts of the desired tensor in new configuration and concatenate appropriate fragments.
# For some distributed checkpoint backends this is actually what happens underneath.

支持的实体#

分布式检查点库支持在不同配置中保存和加载不同的对象。

分片状态字典是(可能是嵌套的)Python 字典或列表,包含以下元素

  1. ShardedBase
    1. ShardedTensor

    2. ShardedObject

    3. ShardedTensorFactory

  2. LocalNonpersistentObject

  3. 任意对象

ShardedBase#

ShardedBase 是表达任何类型分片的基础类。每个分片实体都必须通过其 key 唯一标识,携带一些要保存或加载的 data,并定义 replica_id,这有助于识别数据冗余。

请注意,key 不必(通常也不)对应于状态字典中的键。状态字典中的键是临时的,而 ShardedTensor.key 用于标识检查点中的张量。

在以下示例中,要加载的状态字典包含与保存的状态字典不同的键。重要的是 ShardedTensor.key 是等效的 (tensor-A)

import torch

from megatron.core import dist_checkpointing

# Checkpoint saved with some key in the state dict that is eventually ignored
model = ...
ckpt_dir = ...
sharded_state_dict = {
    'ignored': dist_checkpointing.ShardedTensor('tensor-A', ...)
}
dist_checkpointing.save(sharded_state_dict, ckpt_dir)

# During loading, all that matters is the ShardedTensor.key.
sharded_state_dict = {
    'different-key': dist_checkpointing.ShardedTensor('tensor-A', ...)
}
loaded_state_dict = dist_checkpointing.load(sharded_state_dict, ckpt_dir)
assert 'ignored' not in loaded_state_dict
assert 'tensor-A' not in loaded_state_dict
assert isinstance(loaded_state_dict['different-key'], torch.Tensor)

# The key in the state dict is important only from the subsequent `model.load_state_dict`
# that usually happens after `dist_checkpointing.load` - the state dict must have
# the structure and keys corresponding to the model structure and submodule names.
model.load_state_dict(loaded_state_dict)

ShardedTensor#

ShardedTensor 是分布式检查点的主要用例 - 张量分片。它定义了 PyTorch 张量如何在 workload 中分布。有关 ShardedTensor 的更多详细信息,请参阅 张量转换 部分。

ShardedObject#

有时需要跨 rank 保存任意对象。ShardedObject 允许将这些对象结构化为具有固定 global_shape 的对象数组,并在特定 rank 上保存/加载数组的部分。

ShardedTensorFactory#

ShardedTensorFactory 类将张量转换推迟到实际保存它们时。工厂可以将张量扩展为任意子状态字典(包括上面列出的所有支持的实体)。张量转换 部分将解释这种推迟的必要性。

LocalNonpersistentObject#

LocalNonpersistentObject 是一个简单的包装器,指示用此类包装的对象应在加载期间最终出现在加载的状态字典中。在保存期间,此类对象将被忽略。

任意对象#

所有不同于字典、列表和上面列出的类的实例的对象都被视为“通用”对象。

在保存期间,传递给 dist_checkpointing.save 的分片状态字典中的所有此类对象都假定在 rank 之间重复。因此,它们仅由单个协调器 rank(rank 0)保存。

在加载期间,传递给 dist_checkpointing.load 的分片状态字典中的所有此类对象都会被简单地忽略 - 加载的状态字典仅包含实际保存在检查点中的“通用”对象。

入口点#

检查点保存和加载有几个有用的用户入口点。

dist_checkpointing.save#

dist_checkpointing.save 函数是检查点保存的唯一入口点。它需要一个分片状态字典来保存,以及用于处理不同实体的保存策略(有关详细说明,请参阅 保存和加载策略)。分片状态字典按以下方式处理(另请参阅 save 函数 文档

  1. 应用 ShardedTensorFactory。

  2. 从分片状态字典中提取并忽略 LocalNonPersistentObject。

  3. 提取 ShardedBase 对象。

  4. 所有其他对象都被视为“通用”对象,并根据通用策略保存(请参阅 保存和加载策略)。

  5. 从点 (3) 对象中提取所有 ShardedObject,并使用分片策略保存(请参阅 保存和加载策略)。

  6. 保存所有 ShardedTensor。

  7. 带有后端和版本元数据的 metadata.json 文件保存到检查点目录。

dist_checkpointing.load#

dist_checkpointing.load 函数是检查点加载的主要入口点。它需要一个分片状态字典(以便隐式定义本地张量和检查点张量之间的映射)和加载策略。实际上,通常可以将相同的分片状态字典用于保存和加载(用于加载的分片状态字典将仅包含具有未初始化数据的张量)。

当分片状态字典作为输入提供时,它将按以下方式处理(另请参阅 load 函数 文档

  1. 从检查点加载“通用”状态字典。这构成了结果状态字典的基础。

  2. 应用来自输入分片状态字典的 ShardedTensorFactory。

  3. 从输入分片状态字典中提取 LocalNonPersistentObject,解包并添加到结果状态字典。

  4. 从检查点提取 ShardedObject 并加载到结果状态字典中。

  5. 从检查点提取 ShardedTensor 并加载到结果状态字典中。

  6. 应用工厂合并(有关说明,请参阅 优化器)。

这将生成一个常规状态字典,其中包含纯张量,可以由应用程序进一步处理(通常意味着运行 model.load_state_dict(state_dict))。

dist_checkpointing.load_common_state_dict#

dist_checkpointing.load_common_state_dict 函数是一个入口点,允许仅加载检查点的“通用”部分。可以使用此方法加载大多数检查点配置和元数据,这允许跳过数据加载,以便就检查点配置、版本等做出决策。

dist_checkpointing.load_tensors_metadata#

dist_checkpointing.load_tensors_metadata 函数是一个入口点,允许从检查点读取所有 ShardedTensor 元数据,而无需加载任何数据。结果是一个分片状态字典,具有简单的分片(每个张量都分片为一个大的分片)。

dist_checkpointing.load_plain_tensors#

dist_checkpointing.load_plain_tensors 函数是一个入口点,允许读取存储在检查点中的分片张量,而无需任何分片(作为纯张量)。此函数只是 load_tensors_metadatasave 的组合。

保存和加载策略#

有多种方法可以将分片状态字典保存到序列化检查点中。它们可以由用户作为保存和加载策略提供(例如,如下所示的 TorchDistLoadShardedStrategyTorchDistSaveShardedStrategy)。

共有四种类型的策略

  1. ShardedTensor 的保存策略

  2. “通用”数据的保存策略

  3. ShardedTensor 的加载策略

  4. “通用”数据的加载策略

此外,ShardedObject 根据其功能 (can_handle_sharded_objects 属性) 使用“分片”或“通用”策略处理。

每个保存策略都与一个 backend 和一个 version 关联。每个加载策略可以与它可以加载的 backendversion 的多个值关联。对于给定的后端和版本,每个保存和加载策略的组合必须在功能上等效。策略是引入优化到保存和加载算法的主要方法,而无需更改检查点格式。

在以下示例中,“完全并行”包装器修改了保存和加载算法,但底层检查点格式(以及 backend)保持不变。它使 basic_save_loadfully_parallel_save_load 函数等效

from megatron.core import dist_checkpointing
from megatron.core.dist_checkpointing.strategies.torch import (
    TorchDistLoadShardedStrategy,
    TorchDistSaveShardedStrategy
)
from megatron.core.dist_checkpointing.strategies.fully_parallel import (
    FullyParallelLoadStrategyWrapper,
    FullyParallelSaveStrategyWrapper
)

# Base save and load strategies defining a regular (non-parallel) save
base_save_strategy = TorchDistSaveShardedStrategy('torch_dist', 1)
base_load_strategy = TorchDistLoadShardedStrategy()

def basic_save_load(sharded_state_dict, ckpt_dir):
    """ Save and load using some basic strategies. """
    dist_checkpointing.save(sharded_state_dict, ckpt_dir, base_save_strategy)
    return dist_checkpointing.load(sharded_state_dict, ckpt_dir, base_load_strategy)


def fully_parallel_save_load(sharded_state_dict):
    """ Save and load using basic strategies wrapped with parallelization strategies. """
    fully_parallel_save_strategy = FullyParallelSaveStrategyWrapper(base_save_strategy)
    # "fully parallel" wrapper modifies the saving strategy, but not the underlying format
    assert fully_parallel_save_strategy.backend == base_save_strategy.backend == 'torch_dist'
    fully_parallel_load_strategy = FullyParallelLoadStrategyWrapper(base_load_strategy)
    dist_checkpointing.save(sharded_state_dict, ckpt_dir, fully_parallel_save_strategy)
    return dist_checkpointing.load(sharded_state_dict, ckpt_dir, fully_parallel_load_strategy)

dist_checkpointing 包为某些分片后端提供了默认策略,因此只需指定元组 (backend, version) 作为保存策略就足够了。后端和版本存储在检查点内的 metadata.json 文件中,以便可以自动确定加载策略(前提是给定后端和版本存在默认加载策略)。

对于“分片”策略,当前默认支持的后端基于 PyTorch 分布式 格式(torch_dist 后端)和 Zarr 格式(zarr 后端)。此外,如上面的示例所示,提供了一些包装器,使其能够在整个 workload 中并行化保存和加载(假设存在一些数据重复)。

对于“通用”策略,目前唯一支持的是 torch,它将“通用”数据保存到 common.pt 文件中。

PyTorch 分布式#

PyTorch 分布式检查点格式使用 torch.distributed.checkpoint 包,以便将检查点序列化到存储。 Megatron Core 分片状态字典被转换为 torch.distributed.ShardedTensor,然后使用 torch.distributed.checkpoint 原语来序列化这些状态字典。 即使 Megatron Core 提供了几种保存优化,但底层检查点仍然可以使用原生的 PyTorch 加载方法 读取。 请注意,检查点仍然遵循 dist_checkpointing 包格式,并提供上面描述的额外的 common.ptmetadata.json 文件。

PyTorch 分布式是一种推荐的检查点格式。

Zarr#

基于 Zarr 的检查点格式使用 Zarr 库,以便将检查点序列化到存储。 此格式已弃用,建议过渡到 torch_dist 格式(使用此转换器脚本)。

优化器#

Optimizers 模块为用户提供了辅助工具,以简化为优化器状态构建 ShardedTensor。 定义模型参数的本地到分片张量映射的 ShardedTensor 应该重用于优化器状态,以避免代码重复。

为此,dist_checkpointing.optimizers.get_param_id_to_sharded_param_map 函数可以构建优化器参数 ID 和模型 ShardedTensor 之间的映射。 dist_checkpointing.optimizers.optim_state_to_sharding_state 函数或应用程序代码(对于非标准用例)可以使用此映射来构建带有 ShardedTensor 的优化器分片状态字典。 这应该支持大多数优化器情况,但其中一些可能需要自定义分片状态字典创建。 一个很好的例子是分布式优化器,它会展平参数 - 有关更多详细信息,请参见张量转换部分。

注意:为了重用模型 ShardedTensor 来创建优化器 ShardedTensor,模型ShardedTensor 必须包装模型参数,而不仅仅是张量(通过将 keep_vars=True 传递给模型 state_dict 函数可以获得包含模型参数的状态字典)。 否则,模型 ShardedTensor 和优化器状态之间的对应关系将无法重新创建。 这就是引入 ShardedTensorFactories 的原因 - 我们必须将原始模型参数注册为 ShardedTensorFactories.data,并将任何后续转换应用为工厂函数,以确保相同的转换可以应用于优化器状态。 即使模型参数转换很复杂,但在大多数情况下,优化器状态字典也很容易基于模型 ShardedTensor 和 ShardedTensorFactories 重新创建,例如 FP32Optimizer.sharded_state_dict 仅仅是两个通用的 get_param_id_to_sharded_param_mapoptim_state_to_sharding_state 函数调用,而与模型参数的复杂性无关。

张量转换#

ShardedTensor API 允许声明应在保存和加载期间执行的基本转换。

形状不匹配#

allow_shape_mismatch 标志放宽了加载期间匹配全局张量形状的要求。 额外的填充会根据不匹配的类型用零填充或剥离。 这对于像 embedding 这样的层很有用,这些层可能会根据并行性进行填充以提高性能。

展平#

flattened_range 属性声明 ShardedTensor.data 表示展平模型参数的切片。 这对应于分布式优化器中使用的转换,它会展平数据并沿数据并行域对其进行分片。

额外的展平在检查点重新分片期间带来了效率挑战。 由于展平是在全局张量分片成局部块网格后应用的,因此重新分片后加载需要访问不连续的数据片段。 resharding 模块中实现了针对此问题的示例解决方案,该方案涉及以与原始形状不同的全局形状保存展平的张量。

示例:对于全局张量 [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]],通过 TP(张量并行)在第二轴上进行分片,如果 TP=2,则局部碎片如下:

局部碎片

0

[[0, 1, 2], [6, 7, 8]]

1

[[3, 4, 5], [9, 10, 11]]

在展平并通过 DP=3(这将在 Megatron Core 分布式优化器中发生)进行分片后,结果局部碎片如下:

局部碎片

0

[0, 1]

2

[2, 6]

4

[7, 8]

1

[3, 4]

3

[5, 9]

5

[10, 11]

在通过 TP=6 分片,展平并通过 DP=1 分片后,结果局部碎片如下:

局部碎片

0

[0, 6]

1

[1, 7]

2

[2, 8]

3

[3, 9]

4

[4, 10]

5

[5, 11]

任意转换#

在保存和加载期间将任意转换应用于张量的方法是使用 ShardedTensorFactory。 它将这种转换定义为一个函数,该函数可以重新应用于任何 ShardedTensor(特别是表示优化器状态的 ShardedTensor)。 这种“构建”函数也与一个“合并”函数相关联,该函数可以在加载期间应用逆变换。

如果不需要处理优化器状态,则这种转换也可以在分片状态字典创建期间直接应用。 为了以一致的方式将这种转换应用于模型和优化器参数,有必要将它们编码为工厂函数(以原始模型参数作为 data 输入,以便优化器参数可以正确映射到模型 ShardedTensor)。

请注意,在支持分布式优化器情况下的展平时,实现某些转换可能具有挑战性或不可能。 例如,如果模型权重应该在检查点中转置,则几乎不可能实现能够转置展平且切片的张量的高性能工厂函数。 这是因为展平和切片应该在转置维度中发生。

应用程序集成#

dist_checkpointing 包提供了用于保存任意分布式检查点的所有通用机制。 从应用程序端唯一需要的是准备一个包含 ShardedTensor、ShardedObject 等的分片状态字典(表示应用程序采用的数据分片),并使用 dist_checkpointing.savedist_checkpointing.load 入口点作为 torch.savetorch.load 的替代品。

在 Megatron Core 中,分片状态字典准备已在 sharded_state_dict 方法中实现,该方法以可组合的方式创建分片状态字典。 对于其他应用程序(例如,具有更简单类型的支持并行性),可以应用从常规模型状态字典到分片状态字典的直接转换。

常见问题解答#

1. 问:使用 torch_dist 检查点格式的默认配置,每个 rank 创建两个文件。 例如,一个拥有 576 个 GPU 的集群,这会导致 1152 个文件。 这是预期的行为吗?

答:对于 torch_dist 检查点,这是预期的行为。

2. 问:写入检查点时,会创建检查点目录的两个相同副本。 例如,对于 Llama 70B,写入了两个文件夹,每个文件夹包含约 1.4TB 的数据。 这是预期的行为吗?

答:在 NeMo 中,这是预期的行为。 一个副本与最后一个检查点相关,而另一个副本与前 K 个检查点相关。

3. 问:在哪里可以找到关于 Megatron 二进制文件格式及其访问模式的详细信息?

4. 问:哪些 `dist_ckpt` 配置对于预训练和微调有效?

答:所有 dist_ckpt 配置对于预训练和微调都有效。 (请注意,NeMo 2.0 容器 24.09 尚不支持 dist_ckpt_load_strictness)。

5. 问:什么是 `-last` 检查点的解释?

答:-last 检查点是训练会话中的最终检查点。 它用于识别从中继续训练的最新检查点。

6. 问:save_top_k: 1 如何与 save_best_model 交互?

答:save_top_k 指定训练期间要保存的检查点数量。 save_best_model 标志确定是否根据监控指标(例如,验证损失或准确率)保存最佳模型。

– 如果 save_top_ksave_best_model=True:仅保留性能最佳的单个检查点。

– 如果 save_top_k>1save_best_model=True:NeMo 最多将保存 save_top_k 个检查点,并且始终保证包含最佳检查点(由监控指标确定)。

– 如果 save_best_model=False:NeMo 将仅保存前 K 个模型,而不会明确确保保留最佳模型。

7. 问:dist_ckpt_torch_dist_multiproc 如何影响 async_save=True 参数?

答:dist_ckpt_torch_dist_multiproc 通过定义每个 rank 的辅助进程数来控制分布式检查点,以加速检查点保存。 async_save=True 启用异步检查点,允许检查点进程在后台运行,而不会阻止主训练循环。 这两个参数可以正交使用。

8. 问:使用分布式融合 Adam 优化器或 Megatron Core 分布式优化器,预期的检查点保存时间是多少? 如何加速检查点保存?

答:推荐使用 Megatron Core 分布式优化器,它是 NeMo 2.0 中的默认设置。 使用 Megatron Core 分布式优化器(模型配置 mcore_distributed_optim),对于单个检查点,预期的保存时间应约为 1 秒。 使用来自 Apex 的分布式融合 Adam 优化器(模型配置 distributed_fused_adam),预期的保存时间应该更长,估计单个检查点大约需要 3 秒。

为了加速检查点保存,建议设置 dist_ckpt_assume_constant_structure=True

词汇表#

DP#

数据并行 (DP) 在多个 GPU 上复制模型。 数据批次均匀分布在 GPU 之间,数据并行 GPU 独立处理它们。 虽然计算工作负载有效地分布在 GPU 上,但需要 GPU 间通信以保持训练步骤之间模型副本的一致性。

TP#

张量并行 (TP) 是一种模型并行分区方法,它将单个层的参数张量分布到多个 GPU 上。 除了减少模型状态内存使用量外,它还可以节省激活内存,因为每个 GPU 张量的大小会缩小。 但是,缩小的每个 GPU 张量大小由于较小的每个 GPU 内核工作负载而增加了 CPU 开销。

PP#

流水线并行 (PP) 是一种将神经网络的连续层或段分配给不同 GPU 的技术。 这种划分允许每个 GPU 顺序处理网络的不同阶段。

分布式优化器#

分布式优化器是一种内存优化的数据并行部署方法。 它将优化器状态和高精度主参数分片到数据并行 GPU 上,而不是复制它们。 在参数优化器步骤中,每个数据并行 GPU 更新其参数分片。 由于每个 GPU 都需要自己的梯度分片,因此分布式优化器执行参数梯度的 reduce-scatter 而不是 all-reduce。 然后,更新后的参数分片在数据并行 GPU 之间进行 all-gather。 这种方法显着减少了大规模 LLM 训练的内存需求。 此外,当梯度的精度高于参数精度时,梯度 reduce-scatter 和参数 all-gather 的拆分执行可以减少总通信量。 这种拆分集体执行增加了总计算量,以与通信重叠,从而提高了重叠机会。

有关更多信息,请参阅 http://docs.nvda.net.cn/nemo-framework/user-guide/latest/nemotoolkit/features/parallelisms.html