pipeline_parallel 软件包
此软件包包含两种不同流水线并行调度的实现(一种不带交织,一种带交织,详情请参阅在 GPU 集群上使用 Megatron-LM 进行高效大规模语言模型训练),以及默认的非流水线调度。它还包含流水线阶段之间所需的点对点通信方法。
包含不同流水线并行调度中所需的各种点对点通信的实现(例如,recv_forward 和 recv_backward)。
- core.pipeline_parallel.p2p_communication.recv_backward(tensor_shape: Union[List[int], torch.Size], config: megatron.core.ModelParallelConfig) → torch.Tensor
从流水线中的下一个 rank 接收张量(反向接收)。
有关参数详细信息,请参阅 _communicate。
- core.pipeline_parallel.p2p_communication.recv_forward(tensor_shape: Union[List[int], torch.Size], config: megatron.core.ModelParallelConfig) → torch.Tensor
从流水线中的上一个 rank 接收张量(正向接收)。
有关参数详细信息,请参阅 _communicate。
- core.pipeline_parallel.p2p_communication.send_backward(input_tensor_grad: torch.Tensor, config: megatron.core.ModelParallelConfig) → None
将张量发送到流水线中的上一个 rank(反向发送)。
有关参数详细信息,请参阅 _communicate。
- core.pipeline_parallel.p2p_communication.send_backward_recv_backward(input_tensor_grad: torch.Tensor, recv_next: bool, tensor_shape: Union[List[int], torch.Size], config: megatron.core.ModelParallelConfig, overlap_p2p_comm: bool = False) → torch.Tensor
从流水线中的下一个 rank 批量接收并发送到上一个 rank。
有关参数详细信息,请参阅 _communicate。
- core.pipeline_parallel.p2p_communication.send_backward_recv_forward(input_tensor_grad: torch.Tensor, tensor_shape: Union[List[int], torch.Size], config: megatron.core.ModelParallelConfig) → torch.Tensor
与流水线中的上一个 rank 批量发送和接收。
有关参数详细信息,请参阅 _communicate。
- core.pipeline_parallel.p2p_communication.send_forward(output_tensor: torch.Tensor, config: megatron.core.ModelParallelConfig) → None
将张量发送到流水线中的下一个 rank(正向发送)。
有关参数详细信息,请参阅 _communicate。
- core.pipeline_parallel.p2p_communication.send_forward_backward_recv_forward_backward(output_tensor: torch.Tensor, input_tensor_grad: torch.Tensor, recv_prev: bool, recv_next: bool, tensor_shape: Union[List[int], torch.Size], config: megatron.core.ModelParallelConfig) → torch.Tensor
与流水线中的上一个和下一个 rank 批量发送和接收。
有关参数详细信息,请参阅 _communicate。
- core.pipeline_parallel.p2p_communication.send_forward_recv_backward(output_tensor: torch.Tensor, tensor_shape: Union[List[int], torch.Size], config: megatron.core.ModelParallelConfig) → torch.Tensor
与流水线中的下一个 rank 批量发送和接收。
有关参数详细信息,请参阅 _communicate。
- core.pipeline_parallel.p2p_communication.send_forward_recv_forward(output_tensor: torch.Tensor, recv_prev: bool, tensor_shape: Union[List[int], torch.Size], config: megatron.core.ModelParallelConfig, overlap_p2p_comm: bool = False) → torch.Tensor
从流水线中的上一个 rank 批量接收并发送到下一个 rank。
有关参数详细信息,请参阅 _communicate。
包含两种流水线并行调度的实现(用于带交织的流水线并行的 forward_backward_pipelining_with_interleaving,用于不带交织的流水线并行的 forward_backward_pipelining_without_interleaving)和默认的非流水线调度 (forward_backward_no_pipelining)。get_forward_backward_func 返回基于正在训练的配置使用的正确调度函数(例如,如果流水线并行大小为 1,则使用 forward_backward_no_pipelining)。
- core.pipeline_parallel.schedules.backward_step(input_tensor, output_tensor, output_tensor_grad, model_type, config)
通过传入的输出张量进行反向步骤。
如果是最后一个阶段,output_tensor_grad 为 None,否则为损失相对于阶段输出张量的梯度。
返回损失相对于输入张量的梯度(如果是第一个阶段,则为 None)。
- core.pipeline_parallel.schedules.check_first_val_step(first_val_step, forward_only, cond)
检查是否是第一个验证步骤。
- core.pipeline_parallel.schedules.clear_embedding_activation_buffer(config, model)
清除嵌入激活缓冲区。
- core.pipeline_parallel.schedules.custom_backward(output, grad_output)
直接调用 C++ autograd 引擎。
为了使“deallocate_output_tensor”(如上所述)优化工作,必须直接调用 C++ autograd 引擎,绕过 Pytorch 的 torch.autograd.backward。Pytorch 的“backward”检查输出和 grad 是否具有相同的形状,而 C++ 的“backward”则不检查。
- core.pipeline_parallel.schedules.deallocate_output_tensor(out, deallocate_pipeline_outputs=False)
伪释放(即,设置为标量)输出张量的“.data”字段。
此方法应在输出张量已发送到下一个流水线阶段后立即调用。此时,输出张量仅对其“.grad_fn”字段有用,而对其“.data”无用。
- core.pipeline_parallel.schedules.finish_embedding_wgrad_compute(config, embedding_module)
完成嵌入 wgrad 计算。
- core.pipeline_parallel.schedules.forward_backward_no_pipelining(*, forward_step_func, data_iterator: Union[Iterator, List[Iterator]], model: Union[torch.nn.Module, List[torch.nn.Module]], num_microbatches: int, seq_length: int, micro_batch_size: int, decoder_seq_length: Optional[int] = None, forward_only: bool = False, collect_non_loss_data: bool = False, first_val_step: Optional[bool] = None)
在没有流水线并行(没有阶段间通信)的情况下运行正向和反向传递。
返回包含损失的字典。
有关参数详细信息,请参阅 get_forward_backward_func()
- core.pipeline_parallel.schedules.forward_backward_pipelining_with_interleaving(*, forward_step_func, data_iterator: Union[Iterator, List[Iterator]], model: Union[torch.nn.Module, List[torch.nn.Module]], num_microbatches: int, seq_length: int, micro_batch_size: int, decoder_seq_length: Optional[int] = None, forward_only: bool = False, collect_non_loss_data: bool = False, first_val_step: Optional[bool] = None)
运行交织 1F1B 调度(模型拆分为模型块),并在流水线阶段之间进行必要的通信。
如果是最后一个阶段,则返回包含损失的字典,否则返回空字典。
- core.pipeline_parallel.schedules.forward_backward_pipelining_without_interleaving(*, forward_step_func, data_iterator: Union[Iterator, List[Iterator]], model: Union[torch.nn.Module, List[torch.nn.Module]], num_microbatches: int, seq_length: int, micro_batch_size: int, decoder_seq_length: Optional[int] = None, forward_only: bool = False, collect_non_loss_data: bool = False, first_val_step: Optional[bool] = None)
运行非交织 1F1B 调度,并在流水线阶段之间进行通信。如果是最后一个阶段,则返回包含损失的字典,否则返回空字典。
- core.pipeline_parallel.schedules.forward_step(forward_step_func, data_iterator, model, num_microbatches, input_tensor, forward_data_store, config, collect_non_loss_data=False, checkpoint_activations_microbatch=None, is_first_microbatch=False, current_microbatch=None, encoder_decoder_xattn=False)
传入模型的正向步骤。
如果是第一个阶段,则从 data_iterator 获取输入张量。否则,使用传入的 input_tensor。
- 参数
forward_step_func (可调用对象) –
模型的正向步骤函数,它将数据迭代器作为第一个参数,将模型作为第二个参数。此用户的正向步骤应输出两个元素的元组
-
- 来自正向步骤的输出对象。此输出对象需要是
-
张量或某种张量集合。对此对象的唯一硬性要求是它需要可以作为输入输入到第二个函数中。
-
- 用于(可选)减少来自正向步骤的输出的函数。此
-
可能是模型损失的缩减,可能是获取模型输出并重新格式化的函数,可能是仅传递模型输出的函数。此函数必须具有以下模式之一,并且根据模式的不同,内部会发生不同的事情
-
- 减少的损失和一些其他数据的元组。请注意,在这种情况下
-
第一个参数除以全局微批的数量,假设它是损失,以便损失作为步骤拆分的设备数量的函数是稳定的。
-
- 减少的损失、令牌数量和一些其他数据的三元组。这
-
与情况 (a) 类似,但损失进一步在批次中的令牌数量上取平均值。如果用户尚未在令牌数量上取平均值,则此模式很有用。
-
- 用户想要的任何任意数据(例如,张量字典、张量列表
-
在推理的情况下,张量列表等)。要触发情况 3,您需要指定 collect_non_loss_data=True,并且您可能还希望在调用父 forward_backward 函数时指定 forward_only=True。
-
-
data_iterator (迭代器) – 数据迭代器。
model (nn.Module) – 要在其上执行正向步骤的模型。
num_microbatches (int) – 微批的数量。
input_tensor (Tensor 或 list[Tensor]) – 正向步骤的输入张量。
forward_data_store (list) – 用于存储正向数据的列表。如果您为正向缩减函数的返回选择路径 2.a 或 2.b,则这将仅存储输出的最终维度,例如损失函数输出的元数据。如果您选择路径 2.c,则这将存储应用于模型输出的正向缩减函数的整个输出。
config (对象) – 配置对象。
collect_non_loss_data (bool, 可选) – 是否收集非损失数据。默认为 False。如果您想从模型正向收集任意输出(例如推理用例),则这是要使用的路径。默认为 False。
checkpoint_activations_microbatch (int, 可选) – 要检查点激活的微批。默认为 None。
is_first_microbatch (bool, 可选) – 是否是第一个微批。默认为 False。
current_microbatch (int, 可选) – 当前微批。默认为 None。
- 返回
来自正向步骤的输出对象。Tensor:令牌数量。
- 返回类型
Tensor 或 list[Tensor]
- core.pipeline_parallel.schedules.get_forward_backward_func()
检索给定 parallel_state 配置的适当 forward_backward 函数。
返回一个函数,该函数将根据全局 parallel_state 中的流水线模型并行世界大小和虚拟流水线模型并行世界大小执行模型的所有正向和反向传递。
请注意,如果使用序列并行性,则张量形状的序列长度分量将更新为 original_sequence_length / tensor_model_parallel_world_size。
返回的函数采用以下参数
- forward_step_func(必需):一个函数,它接受数据
迭代器和模型作为其参数,并返回模型的正向输出和损失函数。损失函数应接受一个 torch.Tensor 并返回一个 torch.Tensor 的损失和一个字符串 -> torch.Tensor 的字典。
第三个参数 checkpoint_activations_microbatch 指示应检查此微批的激活。此参数的 None 值表示应使用配置中的默认值。当使用 num_microbatches_with_partial_activation_checkpoints 时,将使用此参数。
例如
- def loss_func(loss_mask, output_tensor)
losses = output_tensor.float() loss_mask = loss_mask.view(-1).float() loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
# 减少损失以进行日志记录。averaged_loss = average_losses_across_data_parallel_group([loss])
return loss, {‘lm loss’: averaged_loss[0]}
- def forward_step(data_iterator, model)
data, loss_mask = next(data_iterator) output = model(data) return output, partial(loss_func, loss_mask)
forward_backward_func(forward_step_func=forward_step, …)
- data_iterator(必需):数据上的迭代器,将是
按原样传递给 forward_step_func。在交织流水线并行的情况下,预计是迭代器列表。
- model(必需):实际模型。在交织的情况下,预计是模块列表
流水线并行。必须是(可能包装的)megatron.core.models.MegatronModule。
- num_microbatches (int, 必需)
要遍历的微批数量
- seq_length (int, 必需):当前全局批次的序列长度。如果是双堆栈
transformer,这是编码器的序列长度。如果配置中的 variable_seq_lengths 为 True,则忽略此参数。否则,当前全局批次大小中的每个微批都必须使用此序列长度。
micro_batch_size (int, 必需):微批中的序列数量。
- decoder_seq_length (int, 可选):双堆栈中解码器的序列长度
transformer。对于单堆栈 transformer,忽略此参数。
forward_only(可选,默认 = False):仅执行正向步骤
collect_non_loss_data(可选,bool,默认=False):TODO
- first_val_step(bool,可选):验证阶段的第一步。由
Transformer Engine 模块使用,仅在第一个验证步骤中更新其 fp8 权重。
- core.pipeline_parallel.schedules.get_tensor_shapes(*, rank: int, model_type: megatron.core.enums.ModelType, seq_length: int, micro_batch_size: int, decoder_seq_length: int, config, encoder_decoder_xattn: bool)
确定正确的张量大小(基于 rank 相对于拆分 rank 的位置)和模型大小。如果模型解码器需要编码器的输出(通过交叉注意力),并且 rank 处于解码器阶段,则发送两个张量。第一个张量是解码器。第二个张量是编码器。如果模型具有编码器和解码器,并且 rank 位于边界,则发送一个张量。否则,发送一个张量。
- core.pipeline_parallel.schedules.recv_backward(tensor_shapes, config)
用于非交织调度的 p2p_communication.recv_backward 的包装器。
- core.pipeline_parallel.schedules.recv_forward(tensor_shapes, config)
用于非交织调度的 p2p_communication.recv_forward 的包装器。
- core.pipeline_parallel.schedules.send_backward(input_tensor_grads, tensor_shapes, config)
用于非交织调度的 p2p_communication.send_backward 的包装器。
- core.pipeline_parallel.schedules.send_backward_recv_forward(input_tensor_grads, tensor_shapes, config)
用于非交织调度的 p2p_communication.send_backward_recv_forward 的包装器。
- core.pipeline_parallel.schedules.send_forward(output_tensors, tensor_shapes, config)
用于非交织调度的 p2p_communication.send_forward 的包装器。
- core.pipeline_parallel.schedules.send_forward_recv_backward(output_tensors, tensor_shapes, config)
用于非交织调度的 p2p_communication.send_forward_recv_backward 的包装器。
- core.pipeline_parallel.schedules.set_current_microbatch(model, microbatch_id)
设置当前微批。