
Lightning

DataStep = Callable[[Iterator[DataT]], DataT] module-attribute

Batches an iterator of individual examples together.

Necessary for compatibility with Megatron. This function type is similar to PyTorch's collate function.

A DataStep function takes an iterator over individual examples. Each example may be a tensor, a sequence of tensors, or a set of named tensors (provided as a dict mapping str names to each Tensor). Every iteration must yield the same type.

The output of this function mirrors the structure of each yielded example: it is the concatenation of all of the examples in the iterator.
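As a rough illustration only (not this project's own data step), a minimal DataStep-style collate function for examples that are dicts of equally shaped tensors might look like the sketch below; the helper name and the assumption that every example shares the same keys are illustrative.

from typing import Dict, Iterator, List

import torch
from torch import Tensor


def collate_dict_examples(examples: Iterator[Dict[str, Tensor]]) -> Dict[str, Tensor]:
    """Illustrative DataStep: stack same-named tensors from each example into one batch."""
    buffered: List[Dict[str, Tensor]] = list(examples)  # drain the iterator of single examples
    if not buffered:
        raise ValueError("expected at least one example from the iterator")
    # The output mirrors the structure of each example: one dict, with each tensor batched.
    return {key: torch.stack([example[key] for example in buffered], dim=0) for key in buffered[0]}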

ForwardStep = Callable[[MegatronModelType, DataT], DataT] module-attribute

Megatron-compatible forward-pass function.
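A ForwardStep is just a callable that takes the Megatron model and one batch. A hedged sketch, assuming the batch is a dict produced by a data step like the one above; the key names are assumptions for illustration, not part of this API.

from typing import Dict

from torch import Tensor


def example_forward_step(model, batch: Dict[str, Tensor]) -> Dict[str, Tensor]:
    """Illustrative ForwardStep: unpack the batch and call the Megatron model directly."""
    # The expected keyword arguments depend entirely on the model; these names are placeholders.
    return model(input_ids=batch["input_ids"], attention_mask=batch["attention_mask"])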

BionemoLightningModule

Bases: Generic[MegatronModelType, MegatronLossType], LightningModule, IOMixin, ConnectorMixin, LightningPassthroughPredictionMixin

Reusable PyTorch Lightning module for Megatron models that is compatible with NeMo's conventions.

Source code in bionemo/llm/lightning.py
class BionemoLightningModule(
    Generic[MegatronModelType, MegatronLossType],
    pl.LightningModule,
    nlio.IOMixin,
    nlio.ConnectorMixin,
    LightningPassthroughPredictionMixin,
):
    """Reusable PyTorch Lightning module for Megatron models that is compatible with NeMo's conventions."""

    def __init__(
        self,
        config: BionemoTrainableModelConfig[MegatronModelType, MegatronLossType],
        forward_step: ForwardStep,
        data_step: DataStep,
        # TODO: Add transformer_layer_spec when we update mcore
        optimizer: MegatronOptimizerModule,
        model_transform: Optional[Callable[[MegatronModelType], MegatronModelType]] = None,
        **model_construct_args,
    ) -> None:
        """Constructor.

        Args:
            config: Serializable configuration object that allows one to construct a new model instance and loss
                function. Necessary for Megatron-based training as the model itself cannot be serialized and
                distributed to nodes. Instead, we serialize the procedure for making the model and distribute that.
            forward_step: Performs forward pass using the model and a batch of data.
            data_step: Custom batch-creating function for the model.
            optimizer: Megatron-compatible distributed optimizer instance. Defaults to using ADAM with a 1e-4 learning
                rate.
            model_construct_args: Optional. Any arguments necessary to construct the model in the `config`'s
                `configure_model` method.
            model_transform: Optional. The model transform function.
            **model_construct_args: Optional. Arguments necessary for the supplied model configuration's
                `configure_model` method, which will make an instance of the model.
        """
        super().__init__()
        self.config = config
        self.module_construct_args: Optional[dict[str, Any]] = model_construct_args
        # ***must** be set up in configure_model() -- megatron constraint
        # also, must be called `module`: nemo expects the actual model to be stored this way
        self.module: Optional[MegatronModelType] = None
        self.loss_reduction_class: type[MegatronLossType] = config.get_loss_reduction_class()
        self.optim = optimizer
        self.optim.connect(self)  # This will bind the `configure_optimizers` method
        self._data_step = data_step
        self._forward_step = forward_step
        self.model_transform = model_transform

    def configure_model(self) -> None:
        """Updates internal state: instantiates the model from the object's config, assigns to `model` attribute.

        NOTE: this method is idempotent; successive calls have no effect. The model is only initialized once.

        Raises:
            ValueError iff the internal config's configure_model method returns None.
        """
        if self.module is None:
            model: MegatronModelType = (
                self.config.configure_model(**self.module_construct_args)
                if self.module_construct_args is not None
                else self.config.configure_model()
            )
            self.module = model
        if self.module is None:
            raise ValueError("Invalid semantics: configure_model method **MUST** initialize the model.")

    def forward(self, *args, **kwargs) -> DataT:
        """Call the forward method of the underlying model, and return whatever it outputs."""
        # safe to do because configure_model is idempotent
        self.configure_model()
        assert self.module is not None, "ERROR: configure_model() method has been incorrectly overridden!"
        prediction = self.module(*args, **kwargs)  # for now just pass through to the underlying model
        return prediction

    def data_step(self, dataloader_iter: Iterator[DataT]) -> DataT:  # noqa: D102
        return self._data_step(dataloader_iter)

    def forward_step(self, batch) -> Tensor:
        """Megatron-required: the training forward step for the model, which is required to produce the loss.

        Normally, the forward pass of a model means its inference. Loss is computed using the predictions
        from the forward pass against labels. Megatron unfortunately conflates these two different concepts
        and instead has models "forward" method produce the loss. See the Megatron docs for details:
        https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/pipeline_parallel/schedules.py#L170

        To get actual predictions, use the :func:`forward` method instead.
        """
        # safe to do because configure_model is idempotent
        self.configure_model()
        assert self.module is not None
        return self._forward_step(self.module, batch)

    def training_step(self, batch, batch_idx: Optional[int] = None) -> Tensor:
        """In mcore the loss-function is part of the forward-pass when labels are provided."""
        return self.forward_step(batch)

    def validation_step(self, batch, batch_idx: Optional[int] = None) -> Tensor:
        """In mcore the loss-function is part of the forward-pass when labels are provided."""
        return self.forward_step(batch)

    def predict_step(self, batch, batch_idx: Optional[int] = None) -> Tensor:
        """Alias for forward_step."""
        if len(batch) == 0:
            return
        return self.forward_step(batch)

    def training_loss_reduction(self) -> MegatronLossType:
        """This is the function that takes batch['loss_mask'] and the logits output by the model and reduces the loss."""
        return self.loss_reduction_class()

    def validation_loss_reduction(self) -> MegatronLossType:  # noqa: D102
        return self.loss_reduction_class(validation_step=True)

    def test_loss_reduction(self) -> MegatronLossType:  # noqa: D102
        return self.loss_reduction_class(validation_step=True)

__init__(config, forward_step, data_step, optimizer, model_transform=None, **model_construct_args)

Constructor.

Parameters:

    config (BionemoTrainableModelConfig[MegatronModelType, MegatronLossType]): Serializable configuration object that allows one to construct a new model instance and loss function. Necessary for Megatron-based training, since the model itself cannot be serialized and distributed to nodes; instead, we serialize the procedure for making the model and distribute that. Required.
    forward_step (ForwardStep): Performs the forward pass using the model and a batch of data. Required.
    data_step (DataStep): Custom batch-creating function for the model. Required.
    optimizer (MegatronOptimizerModule): Megatron-compatible distributed optimizer instance. Defaults to ADAM with a 1e-4 learning rate. Required.
    model_transform (Optional[Callable[[MegatronModelType], MegatronModelType]]): Optional. The model transform function. Default: None.
    **model_construct_args: Optional. Any arguments needed by the supplied model configuration's configure_model method, which creates an instance of the model. Default: {}.
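A minimal wiring sketch, assuming a concrete BionemoTrainableModelConfig subclass and forward/data step functions already exist for your model; MyModelConfig, my_forward_step, and my_data_step are hypothetical placeholders.

from bionemo.llm.lightning import BionemoLightningModule, default_megatron_optimizer

# MyModelConfig, my_forward_step, and my_data_step are placeholders for code you define.
module = BionemoLightningModule(
    config=MyModelConfig(),          # serializable recipe for building the model and its loss
    forward_step=my_forward_step,    # (model, batch) -> forward output
    data_step=my_data_step,          # Iterator[example] -> batch
    optimizer=default_megatron_optimizer(),  # Adam with lr=1e-4 (see default_megatron_optimizer below)
)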
Source code in bionemo/llm/lightning.py
def __init__(
    self,
    config: BionemoTrainableModelConfig[MegatronModelType, MegatronLossType],
    forward_step: ForwardStep,
    data_step: DataStep,
    # TODO: Add transformer_layer_spec when we update mcore
    optimizer: MegatronOptimizerModule,
    model_transform: Optional[Callable[[MegatronModelType], MegatronModelType]] = None,
    **model_construct_args,
) -> None:
    """Constructor.

    Args:
        config: Serializable configuration object that allows one to construct a new model instance and loss
            function. Necessary for Megatron-based training as the model itself cannot be serialized and
            distributed to nodes. Instead, we serialize the procedure for making the model and distribute that.
        forward_step: Performs forward pass using the model and a batch of data.
        data_step: Custom batch-creating function for the model.
        optimizer: Megatron-compatible distributed optimizer instance. Defaults to using ADAM with a 1e-4 learning
            rate.
        model_construct_args: Optional. Any arguments necessary to construct the model in the `config`'s
            `configure_model` method.
        model_transform: Optional. The model transform function.
        **model_construct_args: Optional. Arguments necessary for the supplied model configuration's
            `configure_model` method, which will make an instance of the model.
    """
    super().__init__()
    self.config = config
    self.module_construct_args: Optional[dict[str, Any]] = model_construct_args
    # ***must** be set up in configure_model() -- megatron constraint
    # also, must be called `module`: nemo expects the actual model to be stored this way
    self.module: Optional[MegatronModelType] = None
    self.loss_reduction_class: type[MegatronLossType] = config.get_loss_reduction_class()
    self.optim = optimizer
    self.optim.connect(self)  # This will bind the `configure_optimizers` method
    self._data_step = data_step
    self._forward_step = forward_step
    self.model_transform = model_transform

configure_model()

Updates internal state: instantiates the model from the object's config and assigns it to the model attribute.

NOTE: this method is idempotent; successive calls have no effect. The model is only initialized once.

Source code in bionemo/llm/lightning.py
def configure_model(self) -> None:
    """Updates internal state: instantiates the model from the object's config, assigns to `model` attribute.

    NOTE: this method is idempotent; successive calls have no effect. The model is only initialized once.

    Raises:
        ValueError iff the internal config's configure_model method returns None.
    """
    if self.module is None:
        model: MegatronModelType = (
            self.config.configure_model(**self.module_construct_args)
            if self.module_construct_args is not None
            else self.config.configure_model()
        )
        self.module = model
    if self.module is None:
        raise ValueError("Invalid semantics: configure_model method **MUST** initialize the model.")

forward(*args, **kwargs)

Calls the forward method of the underlying model and returns whatever it outputs.

Source code in bionemo/llm/lightning.py
def forward(self, *args, **kwargs) -> DataT:
    """Call the forward method of the underlying model, and return whatever it outputs."""
    # safe to do because configure_model is idempotent
    self.configure_model()
    assert self.module is not None, "ERROR: configure_model() method has been incorrectly overridden!"
    prediction = self.module(*args, **kwargs)  # for now just pass through to the underlying model
    return prediction

forward_step(batch)

Megatron-required: the training forward step for the model, which must produce the loss.

Normally, a model's forward pass means inference: the loss is computed from the forward pass's predictions against labels. Megatron unfortunately conflates these two concepts and instead has the model's "forward" method produce the loss. See the Megatron docs for details: https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/pipeline_parallel/schedules.py#L170

To get actual predictions, use the forward method instead.

Source code in bionemo/llm/lightning.py
def forward_step(self, batch) -> Tensor:
    """Megatron-required: the training forward step for the model, which is required to produce the loss.

    Normally, the forward pass of a model means its inference. Loss is computed using the predictions
    from the forward pass against labels. Megatron unfortunately conflates these two different concepts
    and instead has models "forward" method produce the loss. See the Megatron docs for details:
    https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/pipeline_parallel/schedules.py#L170

    To get actual predictions, use the :func:`forward` method instead.
    """
    # safe to do because configure_model is idempotent
    self.configure_model()
    assert self.module is not None
    return self._forward_step(self.module, batch)

predict_step(batch, batch_idx=None)

Alias for forward_step.

Source code in bionemo/llm/lightning.py
def predict_step(self, batch, batch_idx: Optional[int] = None) -> Tensor:
    """Alias for forward_step."""
    if len(batch) == 0:
        return
    return self.forward_step(batch)

training_loss_reduction()

This is the function that takes batch['loss_mask'] and the logits output by the model and reduces the loss.

Source code in bionemo/llm/lightning.py
def training_loss_reduction(self) -> MegatronLossType:
    """This is the function that takes batch['loss_mask'] and the logits output by the model and reduces the loss."""
    return self.loss_reduction_class()

training_step(batch, batch_idx=None)

In mcore, the loss function is part of the forward pass when labels are provided.

Source code in bionemo/llm/lightning.py
def training_step(self, batch, batch_idx: Optional[int] = None) -> Tensor:
    """In mcore the loss-function is part of the forward-pass when labels are provided."""
    return self.forward_step(batch)

validation_step(batch, batch_idx=None)

In mcore, the loss function is part of the forward pass when labels are provided.

Source code in bionemo/llm/lightning.py
def validation_step(self, batch, batch_idx: Optional[int] = None) -> Tensor:
    """In mcore the loss-function is part of the forward-pass when labels are provided."""
    return self.forward_step(batch)

LightningPassthroughPredictionMixin

A mixin that allows your model to do inference in the predict step by hijacking NeMo's loss-reduction mechanism.

Source code in bionemo/llm/lightning.py
class LightningPassthroughPredictionMixin:
    """A mixin that allows your model to do inference on the predict step by hijacking nemo's loss reduction mechanism."""

    def predict_loss_reduction(self) -> PassthroughLossReduction:
        """For the predict step, pass through the forward pass output."""
        return PassthroughLossReduction()

predict_loss_reduction()

For the predict step, passes the forward-pass output through.

Source code in bionemo/llm/lightning.py
def predict_loss_reduction(self) -> PassthroughLossReduction:
    """For the predict step, pass through the forward pass output."""
    return PassthroughLossReduction()

PassthroughLossReduction

Bases: MegatronLossReduction, Generic[DataT]

A workaround that lets NeMo/Megatron perform inference.

Internally in NeMo 2.0, the forward step is always expected to return a loss-reduction class, and forward is expected to return a loss. This class hijacks that mechanism to instead pass the forward output through unperturbed as the loss (to enable inference in the predict step); the reduce method is then used to collate a batch of forward outputs into a single batch. The model's forward output may be a tensor, dict, tuple, or list of tensors, but the inner type must always be a Tensor.
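A small illustration of the passthrough behaviour (shapes and key names are made up for the example): forward returns a dummy loss plus the untouched forward output, and reduce concatenates a list of such outputs along the batch dimension.

import torch

from bionemo.llm.lightning import PassthroughLossReduction

reduction = PassthroughLossReduction()
out_a = {"embeddings": torch.randn(2, 8)}
out_b = {"embeddings": torch.randn(3, 8)}

dummy_loss, passed_through = reduction.forward(batch=None, forward_out=out_a)  # out_a comes back unmodified
merged = reduction.reduce([out_a, out_b])  # {"embeddings": tensor of shape [5, 8]}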

Source code in bionemo/llm/lightning.py
class PassthroughLossReduction(MegatronLossReduction, Generic[DataT]):
    """A workaround for nemo/megatron to perform inference.

    Internally in NeMo2.0 the forward step is always expected to return a loss reduction class, and forward is
    expected to return a loss. This class hijacks that mechanism to instead pass through the forward output unperturbed
    as the loss (to enable inference in the predict step), and then the reduce method is used to collate the batch of
    forward outputs into a single batch. This supports the model forward output being a tensor, dict, tuple, or list of
    tensors. The inner type _must always be a Tensor_.
    """

    def forward(self, batch: DataT, forward_out: DataT) -> Tuple[Tensor, DataT]:
        """Passes through the `forward_out` value as the 2nd tuple element.

        Args:
            batch: The batch of data that was passed through the model to generate output. NOTE: this value is ignored.
            forward_out: The output from your model's forward pass.

        Returns:
            A tuple containing the loss tensor (dummy in this case) and the forward output (unmodified).
        """
        return torch.zeros((1, 1)), forward_out

    def reduce(self, forward_out: List[DataT]) -> DataT:
        """Collates list of model's outputs into a single output."""
        return batch_collator(forward_out)

forward(batch, forward_out)

Passes through the forward_out value as the second tuple element.

Parameters:

    batch (DataT): The batch of data that was passed through the model to generate output. NOTE: this value is ignored. Required.
    forward_out (DataT): The output from your model's forward pass. Required.

Returns:

    Tuple[Tensor, DataT]: A tuple containing the loss tensor (a dummy in this case) and the forward output (unmodified).

Source code in bionemo/llm/lightning.py
def forward(self, batch: DataT, forward_out: DataT) -> Tuple[Tensor, DataT]:
    """Passes through the `forward_out` value as the 2nd tuple element.

    Args:
        batch: The batch of data that was passed through the model to generate output. NOTE: this value is ignored.
        forward_out: The output from your model's forward pass.

    Returns:
        A tuple containing the loss tensor (dummy in this case) and the forward output (unmodified).
    """
    return torch.zeros((1, 1)), forward_out

reduce(forward_out)

Collates a list of the model's outputs into a single output.

Source code in bionemo/llm/lightning.py
def reduce(self, forward_out: List[DataT]) -> DataT:
    """Collates list of model's outputs into a single output."""
    return batch_collator(forward_out)

PerplexityLoggingCallback

Bases: Callback, CallbackMethods

Megatron callback to log perplexity during validation and, optionally, during training.

NeMo 2.0 checks whether a callback is an instance of {LightningModule, LightningDataModule, Callback}, but only the megatron_hooks are useful.

Source code in bionemo/llm/lightning.py
class PerplexityLoggingCallback(pl.Callback, CallbackMethods):
    """Megatron Callback to log perplexity in validation and optionally training.

    NeMo2.0 checks whether a callback is an instance of {LightningModule,LightningDataModule,Callback} but only megatron_hooks are useful.
    """

    def __init__(self, log_train: bool = False, log_val: bool = True):
        """Initialize PerplexityLoggingCallback.

        Args:
            log_train: whether to log train perplexity. Defaults to False.
            log_val: whether to log validation perplexity. Defaults to True.
        """
        super().__init__()
        self.log_train = log_train
        self.log_val = log_val

    def _pad_to_max_length(
        self,
        microbatch_outputs: List[Dict[str, Dict[str, Tensor]]],
        key1: str,
        key2: str,
        pad_value: int = 0,
        seq_dim: int = 1,
        batch_dim: int = 0,
    ) -> Tensor:
        """Pad tensors to max length in microbatch_outputs."""
        assert seq_dim != batch_dim, "Forgot to set one of seq_dim, batch_dim, they are equal!"
        max_sequence_length: int = max(output[key1][key2].shape[seq_dim] for output in microbatch_outputs)

        tensors: List[Tensor] = []
        for microbatch_output in microbatch_outputs:
            tensor = microbatch_output[key1][key2]
            assert (
                tensor.dim() >= 2
            ), f"Tensor in microbatch_outputs must have at least 2 dimensions, but got {tensor.dim()} dimensions"
            pad_size = [(0, 0)] * tensor.dim()
            pad_size[seq_dim] = (0, max_sequence_length - tensor.shape[seq_dim])
            # Flatten pad size list for F.pad
            pad_size_flat = [item for sublist in reversed(pad_size) for item in sublist]
            tensors.append(
                torch.nn.functional.pad(  # padding reverse in order
                    tensor,
                    pad_size_flat,
                    mode="constant",
                    value=pad_value,
                )
            )

        return torch.cat(tensors, dim=batch_dim)  # concat on batch dim

    @override
    def on_megatron_reduce_microbatches_end(
        self,
        step: MegatronStep,
        microbatch_outputs: List[Any],
        loss_reduction: MegatronLossReduction,
        reduced: Tensor | dict[str, Tensor],
    ) -> None:
        """Log after MegatronReductionLoss.reduce is called.

        Expected microbatch_outputs to be a list of dicts with the following keys:
            - batch: dict of tensors with the following keys:
                - labels: [b s]
                - loss_mask: [b s]; 1 means included 0 means ignored
            - forward_out: dict of tensors with the following keys:
                - token_logits: [b s vocab]
        """
        if step.trainer.sanity_checking:  # skip sanity check
            return

        if step.trainer.training and not self.log_train:
            return

        if not parallel_state.is_pipeline_last_stage():
            return

        assert step.num_microbatches is not None, "num_microbatches must be initialized to non-None"
        assert step.num_microbatches > 0, "num_microbatches must be greater than 0"
        assert (
            len(microbatch_outputs) == step.num_microbatches
        ), "microbatch_outputs length does not match num_microbatches"
        labels = self._pad_to_max_length(microbatch_outputs, "batch", "labels", pad_value=-100)
        loss_mask = self._pad_to_max_length(microbatch_outputs, "batch", "loss_mask")
        token_logits = self._pad_to_max_length(
            microbatch_outputs, "forward_out", "token_logits", seq_dim=0, batch_dim=1
        )

        unreduced_token_loss = unreduced_token_loss_fn(
            token_logits.clone(),  # [s,b] as expected unreduced_token_loss_fn has inplace operation on token_logits
            labels.clone(),  # [b,s] as expected
        )  # [b s] is the return

        cp_size = parallel_state.get_context_parallel_world_size()
        if cp_size == 1:
            ppl = torch.exp((unreduced_token_loss * loss_mask).sum() / loss_mask.sum())
        else:
            raise NotImplementedError("Context parallel perplexity logging is not supported yet")

        if self.log_val and not step.trainer.training:
            step.pl_module.log("val_ppl", ppl, prog_bar=True, on_epoch=True)
        elif self.log_train and step.trainer.training:
            step.pl_module.log("train_ppl", ppl, prog_bar=True, batch_size=1, sync_dist=False)

__init__(log_train=False, log_val=True)

Initialize PerplexityLoggingCallback.

Parameters:

    log_train (bool): Whether to log train perplexity. Default: False.
    log_val (bool): Whether to log validation perplexity. Default: True.
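A hedged usage sketch: construct the callback with the logging options you want and hand it to your NeMo 2.0 / Lightning trainer's callbacks list (trainer setup itself is omitted here).

from bionemo.llm.lightning import PerplexityLoggingCallback

# Validation perplexity is logged by default; flip log_train to also log training perplexity.
callbacks = [PerplexityLoggingCallback(log_train=False, log_val=True)]
# ... then pass `callbacks=callbacks` when building the trainer.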
Source code in bionemo/llm/lightning.py
def __init__(self, log_train: bool = False, log_val: bool = True):
    """Initialize PerplexityLoggingCallback.

    Args:
        log_train: whether to log train perplexity. Defaults to False.
        log_val: whether to log validation perplexity. Defaults to True.
    """
    super().__init__()
    self.log_train = log_train
    self.log_val = log_val

on_megatron_reduce_microbatches_end(step, microbatch_outputs, loss_reduction, reduced)

Log after MegatronReductionLoss.reduce is called.

microbatch_outputs is expected to be a list of dicts with the following keys (a structural sketch follows this list):
  • batch: dict of tensors with the following keys:
    • labels: [b s]
    • loss_mask: [b s]; 1 means included, 0 means ignored
  • forward_out: dict of tensors with the following keys:
    • token_logits: [b s vocab]
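The sketch below spells out that structure with tiny made-up shapes (b=2, s=4, vocab=32). Note that the padding call in the source treats token_logits as sequence-first (seq_dim=0, batch_dim=1), so the example follows that layout; every value here is illustrative only.

import torch

b, s, vocab = 2, 4, 32  # illustrative sizes only
microbatch_outputs = [
    {
        "batch": {
            "labels": torch.randint(0, vocab, (b, s)),        # [b, s]
            "loss_mask": torch.ones(b, s, dtype=torch.long),  # [b, s]; 1 = counted, 0 = ignored
        },
        "forward_out": {
            "token_logits": torch.randn(s, b, vocab),         # sequence-first, matching the padding call in the source
        },
    }
]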
Source code in bionemo/llm/lightning.py
@override
def on_megatron_reduce_microbatches_end(
    self,
    step: MegatronStep,
    microbatch_outputs: List[Any],
    loss_reduction: MegatronLossReduction,
    reduced: Tensor | dict[str, Tensor],
) -> None:
    """Log after MegatronReductionLoss.reduce is called.

    Expected microbatch_outputs to be a list of dicts with the following keys:
        - batch: dict of tensors with the following keys:
            - labels: [b s]
            - loss_mask: [b s]; 1 means included 0 means ignored
        - forward_out: dict of tensors with the following keys:
            - token_logits: [b s vocab]
    """
    if step.trainer.sanity_checking:  # skip sanity check
        return

    if step.trainer.training and not self.log_train:
        return

    if not parallel_state.is_pipeline_last_stage():
        return

    assert step.num_microbatches is not None, "num_microbatches must be initialized to non-None"
    assert step.num_microbatches > 0, "num_microbatches must be greater than 0"
    assert (
        len(microbatch_outputs) == step.num_microbatches
    ), "microbatch_outputs length does not match num_microbatches"
    labels = self._pad_to_max_length(microbatch_outputs, "batch", "labels", pad_value=-100)
    loss_mask = self._pad_to_max_length(microbatch_outputs, "batch", "loss_mask")
    token_logits = self._pad_to_max_length(
        microbatch_outputs, "forward_out", "token_logits", seq_dim=0, batch_dim=1
    )

    unreduced_token_loss = unreduced_token_loss_fn(
        token_logits.clone(),  # [s,b] as expected unreduced_token_loss_fn has inplace operation on token_logits
        labels.clone(),  # [b,s] as expected
    )  # [b s] is the return

    cp_size = parallel_state.get_context_parallel_world_size()
    if cp_size == 1:
        ppl = torch.exp((unreduced_token_loss * loss_mask).sum() / loss_mask.sum())
    else:
        raise NotImplementedError("Context parallel perplexity logging is not supported yet")

    if self.log_val and not step.trainer.training:
        step.pl_module.log("val_ppl", ppl, prog_bar=True, on_epoch=True)
    elif self.log_train and step.trainer.training:
        step.pl_module.log("train_ppl", ppl, prog_bar=True, batch_size=1, sync_dist=False)

batch_collator(batches, batch_dim=0, batch_dim_key_defaults={'token_logits': 1})

Takes a sequence of batches and collates them into a single batch.

This is distinct from PyTorch's default collate function, since it does not add a batch dimension: the batch dimension is assumed to already be present in the input, as is the case when parallelizing across minibatches.

IMPORTANT: The underlying data primitive must be a torch Tensor. The input to this function is a recursive type; there can be any amount of nesting between dictionaries, tuples, and lists, as long as the inner type is an n-dimensional Tensor.

Examples:

    Outer container = Dict:
        [{'a': Tensor([1]), 'b': Tensor([2])}, {'a': Tensor([2]), 'b': Tensor([3])}] -> {'a': Tensor([1, 2]), 'b': Tensor([2, 3])}
    Outer container = List:
        [[Tensor([1]), Tensor([2])], [Tensor([2]), Tensor([3])]] -> [Tensor([1, 2]), Tensor([2, 3])]
    Outer container = Tuple:
        ([Tensor([1]), Tensor([2])], [Tensor([2]), Tensor([3])]) -> (Tensor([1, 2]), Tensor([2, 3]))

Parameters:

    batches (Optional[Sequence[ReductionT]]): Sequence of batches to collate into a single batch. Required.
    batch_dim (int): If you know that the batch dimension of the batches you are concatenating is not the 0th dimension (for example, it is sequence-first), supply that dimension. Default: 0.
    batch_dim_key_defaults (dict of str to int): If your batch is a dictionary and you know that some keys have a non-standard (non-0) batch dimension, supply those here. By default "token_logits" has batch dim 1; all other keys are assumed to have batch dim 0. Default: {'token_logits': 1}.

Returns:

    Optional[ReductionT]: A single batch of the same type as the elements of the input sequence.
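A runnable usage sketch (the key names are arbitrary): two already-batched dicts with batch sizes 2 and 3 are concatenated along dim 0 into one batch of 5.

import torch

from bionemo.llm.lightning import batch_collator

merged = batch_collator(
    [
        {"a": torch.zeros(2, 4), "b": torch.zeros(2)},
        {"a": torch.ones(3, 4), "b": torch.ones(3)},
    ]
)
assert merged["a"].shape == (5, 4)
assert merged["b"].shape == (5,)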

Source code in bionemo/llm/lightning.py
def batch_collator(
    batches: Optional[Union[Tuple[ReductionT], List[ReductionT]]],
    batch_dim: int = 0,
    batch_dim_key_defaults: dict[str, int] = {"token_logits": 1},
) -> Optional[ReductionT]:
    """Takes a sequence of batches and collates them into a single batch.

        This is distinct from the standard pytorch default_collator since it does
        not add the batch dimension, it's assumed the batch
        dimension is already present in the input, as would be the case when
        parallelizing across minibatches.

    IMPORTANT: The underlying data primitive _must_ be a torch Tensor. The input to this function is a recurisve type,
    there can be any amount of nesting between dictionaries, tuples, and lists, as long as the inner type is a n-d Tensor.

    Examples:
        Outer container = Dict:
            [{'a': Tensor([1]), 'b': Tensor([2])}, {'a': Tensor([2]), 'b': Tensor([3])}] -> {'a': Tensor([1, 2]), 'b': Tensor([2, 3])}
        Outer container = List:
            [[Tensor([1]), Tensor([2])], [Tensor([2]), Tensor([3])]] -> [Tensor([1, 2]), Tensor([2, 3])]
        Outer container = Tuple:
            ([Tensor([1]), Tensor([2])], [Tensor([2]), Tensor([3])]) -> (Tensor([1, 2]), Tensor([2, 3]))

    Args:
        batches (Optional[Sequence[ReductionT]]): sequence of batches to collate into a single batch.
        batch_dim: If you know that the batch dim for the batch you are concatenating is not the 0th dimension (for
            example it is sequence first) then supply that dimension.
        batch_dim_key_defaults (dictionary of keys to integers): If your batch is a dictionary and you know that some
            keys have non-standard (0) batch dimensions, supply those here. By default "token_logits" has batch dim 1
            and otherwise all keys are assumed to have batch dim 0.

    Returns:
        A single batch of the same type as the elements of your input sequence.
    """
    match batches:
        # Handle base-cases for batch concatenation, either a list of None or a list of tensors
        case [None, *_]:
            return None
        case [Tensor(), *_]:
            return torch.cat(batches, dim=batch_dim)
        # Next 3 calls are the recursive calls into the sub-structures of the batch. We handle dictionaries, tuples, and lists
        case [dict(), *_]:
            return {
                key: batch_collator(
                    [batch[key] for batch in batches],
                    batch_dim=batch_dim_key_defaults.get(key, 0),
                    batch_dim_key_defaults=batch_dim_key_defaults,
                )
                for key in batches[0]
            }
        case [tuple(), *_]:
            return tuple(
                batch_collator(
                    [batch[i] for batch in batches], batch_dim=batch_dim, batch_dim_key_defaults=batch_dim_key_defaults
                )
                for i in range(len(batches[0]))
            )
        case [list(), *_]:
            return [
                batch_collator(
                    [batch[i] for batch in batches], batch_dim=batch_dim, batch_dim_key_defaults=batch_dim_key_defaults
                )
                for i in range(len(batches[0]))
            ]
        # Final cases shouldn't happen, an empty sequence (no batches), or "other".
        case []:
            raise ValueError("Cannot process an empty sequence")
        case _:
            raise ValueError("Unsupported input structure in batch_collator")

default_megatron_optimizer()

The default distributed optimizer uses Adam with a 1e-4 learning rate.
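To use different optimizer settings, build the MegatronOptimizerModule yourself with your own OptimizerConfig, mirroring the construction in the source below; the import paths follow Megatron core and NeMo 2.0 and are stated here as assumptions.

from megatron.core.optimizer import OptimizerConfig
from nemo.lightning.pytorch.optim import MegatronOptimizerModule

# Same construction as the default below, just with a different learning rate.
optimizer = MegatronOptimizerModule(
    config=OptimizerConfig(lr=5e-5, optimizer="adam", use_distributed_optimizer=True),
)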

Source code in bionemo/llm/lightning.py
def default_megatron_optimizer() -> MegatronOptimizerModule:
    """Default distributed optimizer uses Adam with a 1e-4 learning rate."""
    return MegatronOptimizerModule(
        config=OptimizerConfig(lr=1e-4, optimizer="adam", use_distributed_optimizer=True),
    )

some_first(seq)

Returns the first non-None value from the sequence, or fails if there is none.

Source code in bionemo/llm/lightning.py
def some_first(seq: Iterable[Optional[T]]) -> T:
    """Returns the first non-None value from the sequence or fails"""  # noqa: D415
    for s in seq:
        if s is not None:
            return s
    raise ValueError("non-None value not found")