Megatron Core 用户指南

用户指南

以下指南是 Megatron Core 的简短入门指南。在其中,您将

  • 在 2 个 GPU 上初始化 Megatron Core。

  • 构建一个张量模型并行大小为 2,流水线并行大小为 1 的 GPT 模型

  • 使用 Megatron Core 调度对其进行五次迭代的训练

  • 使用分布式检查点格式保存模型

  • 加载上面保存的模型。

注意: 以下示例已使用 Megatron Core 版本 0.8.0 和 NGC PyTorch 容器版本 24.02 进行测试。

环境设置

复制
已复制!
            

docker run --ipc=host --shm-size=512m --gpus 2 -it nvcr.io/nvidia/pytorch:24.02-py3 git clone https://github.com/NVIDIA/Megatron-LM.git && cd Megatron-LM


编写您的第一个训练循环

在以下步骤中,您将在 2 个 GPU 上创建一个跨张量(张量模型并行)分割的示例 GPT 模型,并使用我们在 Megatron Core 中创建的 MockGPT 数据集辅助类对其运行前向传递。

注意: 以下所有步骤都在 run_simple_mcore_train_loop.py 脚本中。

要运行 run_simple_mcore_train_loop.py 脚本

复制
已复制!
            

PYTHONPATH=$PYTHON_PATH:./megatron torchrun --nproc-per-node 2 examples/run_simple_mcore_train_loop.py


步骤 1 - 初始化分布式训练和模型并行设置

以下实用程序在调用时会初始化您的分布式设置。

复制
已复制!
            

import os import torch from megatron.core import parallel_state def initialize_distributed(tensor_model_parallel_size = 1, pipeline_model_parallel_size = 1): # Torch setup for distributed training rank = int(os.environ['LOCAL_RANK']) world_size = torch.cuda.device_count() torch.cuda.set_device(rank) torch.distributed.init_process_group(world_size=world_size, rank=rank) # Megatron core distributed training initialization parallel_state.initialize_model_parallel(tensor_model_parallel_size, pipeline_model_parallel_size)


步骤 2 - GPT 模型设置

在此步骤中,您将创建一个 GPT 模型。 有关可以传递到模型中的其他配置列表,请打开并查看 transformer_config.py

复制
已复制!
            

from megatron.core.transformer.transformer_config import TransformerConfig from megatron.core.models.gpt.gpt_model import GPTModel from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_local_spec def model_provider(): """Build the model.""" transformer_config = TransformerConfig( num_layers=2, hidden_size=12, num_attention_heads=4, use_cpu_initialization=True, pipeline_dtype=torch.float32) gpt_model = GPTModel( config=transformer_config, transformer_layer_spec=get_gpt_layer_local_spec(), vocab_size=100, max_sequence_length=64) return gpt_model


步骤 3 - GPT 模拟数据集设置

在以下步骤中,您将探索模拟数据集实用程序。

  • 要使用您的数据训练模型,请使用 gpt_dataset.py 中的 GPTDataset 类。

  • 要查找有关 Megatron Core 数据管道的更多信息,请参阅 数据管道 readme.md

复制
已复制!
            

import torch from torch.utils.data import DataLoader from megatron.core.datasets.blended_megatron_dataset_builder import BlendedMegatronDatasetBuilder from megatron.core.datasets.gpt_dataset import GPTDatasetConfig, MockGPTDataset from megatron.training.tokenizer.tokenizer import _NullTokenizer from megatron.core.datasets.utils import compile_helpers _SEQUENCE_LENGTH = 64 def get_train_data_iterator(): if torch.distributed.is_available() and torch.distributed.is_initialized(): if torch.distributed.get_rank() == 0: compile_helpers() torch.distributed.barrier() else: compile_helpers() config = GPTDatasetConfig( random_seed=0, sequence_length=_SEQUENCE_LENGTH, reset_position_ids=False, reset_attention_mask=False, eod_mask_loss=False, tokenizer=_NullTokenizer(vocab_size=_SEQUENCE_LENGTH), ) datasets = BlendedMegatronDatasetBuilder( MockGPTDataset, [1000, None, None], lambda: True, config ).build() train_dataloader = DataLoader(datasets[0], batch_size=8, shuffle=True) train_iterator = iter(train_dataloader) return train_iterator


步骤 4 - 前向步骤函数

Megatron Core 使用 schedules.py 运行模型。 定义一个前向步骤函数就足够了,该函数以数据迭代器和模型作为输入,并生成输出张量和损失函数作为输出。

复制
已复制!
            

from functools import partial def forward_step_func(data_iterator, model): def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor): losses = output_tensor.float() loss_mask = loss_mask.view(-1).float() loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum() # If you have data parallel reduce loss across data parallel groups. # If pipeline parallel, loss computation is done only in last stage. return loss, {'lm loss': loss} data = next(data_iterator) tokens = data['tokens'].to(device) attention_mask = data['attention_mask'].to(device) position_ids = data['position_ids'].to(device) labels = data['labels'].to(device) loss_mask = data['loss_mask'].to(device) output_tensor = model(tokens, position_ids, attention_mask, labels=labels) return output_tensor, partial(loss_func, loss_mask)


步骤 5 - 加载和保存分布式检查点

Megatron Core 使用分布式检查点来加载和保存模型。 这使您可以灵活地在加载模型时将模型从一个模型并行设置转换为另一个模型并行设置。 例如,使用张量并行大小 2 训练的模型可以再次加载为张量模型并行大小 4,依此类推。

复制
已复制!
            

from megatron.core import dist_checkpointing def save_distributed_checkpoint(checkpoint_path, gpt_model): sharded_state_dict = gpt_model.sharded_state_dict(prefix='') dist_checkpointing.save(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path) def load_distributed_checkpoint(checkpoint_path, gpt_model): sharded_state_dict=gpt_model.sharded_state_dict(prefix='') checkpoint = dist_checkpointing.load(sharded_state_dict=sharded_state_dict, checkpoint_dir=checkpoint_path) gpt_model.load_state_dict(checkpoint) return gpt_model


步骤 6 - 主函数

以下代码片段是需要放入脚本中的主函数。 它运行模型 5 次迭代,保存模型,并加载数据模型。

复制
已复制!
            

from pathlib import Path from torch.optim import Adam from megatron.core.pipeline_parallel.schedules import get_forward_backward_func from megatron.core.tensor_parallel.random import model_parallel_cuda_manual_seed if __name__ == "__main__": initialize_distributed(tensor_model_parallel_size=2, pipeline_model_parallel_size=1) model_parallel_cuda_manual_seed(123) gpt_model = model_provider() device = torch.device("cuda") gpt_model.to(device) optim = Adam(gpt_model.parameters()) train_iterator = get_train_data_iterator() forward_backward_func = get_forward_backward_func() # Running the model for 5 iterations for _ in range(5): optim.zero_grad() losses_reduced = forward_backward_func( forward_step_func=forward_step_func, data_iterator=train_iterator, model=gpt_model, num_microbatches=1, seq_length=64, micro_batch_size=8, decoder_seq_length=64, forward_only=False) optim.step() print(f'Losses reduced :{losses_reduced}') # Saving the model save_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt') # Loading the model gpt_model = load_distributed_checkpoint(gpt_model=gpt_model, checkpoint_path='/workspace/ckpt') gpt_model.to(device) print('Successfully loaded the model')


进一步扩展

您在此处探索的示例是 Megatron Core 中的基本训练循环。 要查看更高级的示例,请探索 [pretrain_gpt.py]。 pretrain_gpt.py 具有更复杂的训练循环,其中包括以下内容和其他 Megatron Core 功能

  • 流水线并行

  • 上下文并行

  • Rope 嵌入

  • 专家混合

Previous Megatron Core 用户指南
Next API 指南
© 版权所有 2022-2025,NVIDIA。 上次更新于 2025 年 1 月 14 日。