用户指南
以下指南是 Megatron Core 的简短入门指南。在其中,您将
在 2 个 GPU 上初始化 Megatron Core。
构建一个张量模型并行大小为 2,流水线并行大小为 1 的 GPT 模型
使用 Megatron Core 调度对其进行五次迭代的训练
使用分布式检查点格式保存模型
加载上面保存的模型。
注意: 以下示例已使用 Megatron Core 版本 0.8.0 和 NGC PyTorch 容器版本 24.02 进行测试。
环境设置
docker run --ipc=host --shm-size=512m --gpus 2 -it nvcr.io/nvidia/pytorch:24.02-py3
git clone https://github.com/NVIDIA/Megatron-LM.git && cd Megatron-LM
编写您的第一个训练循环
在以下步骤中,您将在 2 个 GPU 上创建一个跨张量(张量模型并行)分割的示例 GPT 模型,并使用我们在 Megatron Core 中创建的 MockGPT 数据集辅助类对其运行前向传递。
注意: 以下所有步骤都在 run_simple_mcore_train_loop.py 脚本中。
要运行 run_simple_mcore_train_loop.py
脚本
PYTHONPATH=$PYTHON_PATH:./megatron torchrun --nproc-per-node 2 examples/run_simple_mcore_train_loop.py
步骤 1 - 初始化分布式训练和模型并行设置
以下实用程序在调用时会初始化您的分布式设置。
import os
import torch
from megatron.core import parallel_state
def initialize_distributed(tensor_model_parallel_size = 1, pipeline_model_parallel_size = 1):
# Torch setup for distributed training
rank = int(os.environ['LOCAL_RANK'])
world_size = torch.cuda.device_count()
torch.cuda.set_device(rank)
torch.distributed.init_process_group(world_size=world_size, rank=rank)
# Megatron core distributed training initialization
parallel_state.initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size)
步骤 2 - GPT 模型设置
在此步骤中,您将创建一个 GPT 模型。 有关可以传递到模型中的其他配置列表,请打开并查看 transformer_config.py。
from megatron.core.transformer.transformer_config import TransformerConfig
from megatron.core.models.gpt.gpt_model import GPTModel
from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec
def model_provider():
"""Build the model."""
transformer_config = TransformerConfig(
num_layers=2,
hidden_size=12,
num_attention_heads=4,
use_cpu_initialization=True,
pipeline_dtype=torch.float32)
gpt_model = GPTModel(
config=transformer_config,
transformer_layer_spec=get_gpt_layer_local_spec(),
vocab_size=100,
max_sequence_length=64)
return gpt_model
步骤 3 - GPT 模拟数据集设置
在以下步骤中,您将探索模拟数据集实用程序。
要使用您的数据训练模型,请使用 gpt_dataset.py 中的 GPTDataset 类。
要查找有关 Megatron Core 数据管道的更多信息,请参阅 数据管道 readme.md。
import torch
from torch.utils.data import DataLoader
from megatron.core.datasets.blended_megatron_dataset_builder import BlendedMegatronDatasetBuilder
from megatron.core.datasets.gpt_dataset import GPTDatasetConfig, MockGPTDataset
from megatron.training.tokenizer.tokenizer import _NullTokenizer
from megatron.core.datasets.utils import compile_helpers
_SEQUENCE_LENGTH = 64
def get_train_data_iterator():
if torch.distributed.is_available() and torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0:
compile_helpers()
torch.distributed.barrier()
else:
compile_helpers()
config = GPTDatasetConfig(
random_seed=0,
sequence_length=_SEQUENCE_LENGTH,
reset_position_ids=False,
reset_attention_mask=False,
eod_mask_loss=False,
tokenizer=_NullTokenizer(vocab_size=_SEQUENCE_LENGTH),
)
datasets = BlendedMegatronDatasetBuilder(
MockGPTDataset, [1000, None, None], lambda: True, config
).build()
train_dataloader = DataLoader(datasets[0], batch_size=8, shuffle=True)
train_iterator = iter(train_dataloader)
return train_iterator
步骤 4 - 前向步骤函数
Megatron Core 使用 schedules.py 运行模型。 定义一个前向步骤函数就足够了,该函数以数据迭代器和模型作为输入,并生成输出张量和损失函数作为输出。
from functools import partial
def forward_step_func(data_iterator, model):
def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor):
losses = output_tensor.float()
loss_mask = loss_mask.view(-1).float()
loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
# If you have data parallel reduce loss across data parallel groups.
# If pipeline parallel, loss computation is done only in last stage.
return loss, {'lm loss': loss}
data = next(data_iterator)
tokens = data['tokens'].to(device)
attention_mask = data['attention_mask'].to(device)
position_ids = data['position_ids'].to(device)
labels = data['labels'].to(device)
loss_mask = data['loss_mask'].to(device)
output_tensor = model(tokens, position_ids, attention_mask,
labels=labels)
return output_tensor, partial(loss_func, loss_mask)
步骤 5 - 加载和保存分布式检查点
Megatron Core 使用分布式检查点来加载和保存模型。 这使您可以灵活地在加载模型时将模型从一个模型并行设置转换为另一个模型并行设置。 例如,使用张量并行大小 2 训练的模型可以再次加载为张量模型并行大小 4,依此类推。
from megatron.core import dist_checkpointing
def save_distributed_checkpoint(checkpoint_path, gpt_model):
sharded_state_dict = gpt_model.sharded_state_dict(prefix='')
dist_checkpointing.save(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
def load_distributed_checkpoint(checkpoint_path, gpt_model):
sharded_state_dict=gpt_model.sharded_state_dict(prefix='')
checkpoint = dist_checkpointing.load(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path)
gpt_model.load_state_dict(checkpoint)
return gpt_model
步骤 6 - 主函数
以下代码片段是需要放入脚本中的主函数。 它运行模型 5 次迭代,保存模型,并加载数据模型。
from pathlib import Path
from torch.optim import Adam
from megatron.core.pipeline_parallel.schedules import get_forward_backward_func
from megatron.core.tensor_parallel.random import model_parallel_cuda_manual_seed
if __name__ == "__main__":
initialize_distributed(tensor_model_parallel_size=2, pipeline_model_parallel_size=1)
model_parallel_cuda_manual_seed(123)
gpt_model = model_provider()
device = torch.device("cuda")
gpt_model.to(device)
optim = Adam(gpt_model.parameters())
train_iterator = get_train_data_iterator()
forward_backward_func = get_forward_backward_func()
# Running the model for 5 iterations
for _ in range(5):
optim.zero_grad()
losses_reduced = forward_backward_func(
forward_step_func=forward_step_func,
data_iterator=train_iterator,
model=gpt_model,
num_microbatches=1,
seq_length=64,
micro_batch_size=8,
decoder_seq_length=64,
forward_only=False)
optim.step()
print(f'Losses reduced :{losses_reduced}')
# Saving the model
save_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt')
# Loading the model
gpt_model = load_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt')
gpt_model.to(device)
print('Successfully loaded the model')
进一步扩展
您在此处探索的示例是 Megatron Core 中的基本训练循环。 要查看更高级的示例,请探索 [pretrain_gpt.py]。 pretrain_gpt.py
具有更复杂的训练循环,其中包括以下内容和其他 Megatron Core 功能
流水线并行
上下文并行
Rope 嵌入
专家混合