Multi-Epoch Dataset

EpochIndex

Bases: NamedTuple

A tuple that contains both the current epoch and the index for multi-epoch training.

Source code in bionemo/core/data/multi_epoch_dataset.py
class EpochIndex(NamedTuple):
    """A tuple that contains both the current epoch and index for multi-epoch training."""

    epoch: int
    """An integer representing the current epoch."""

    idx: int
    """An integer representing the index within the current epoch."""

epoch: int instance-attribute

An integer representing the current epoch.

idx: int instance-attribute

An integer representing the index within the current epoch.
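
As a brief illustration, the sketch below redefines `EpochIndex` locally, so that it runs without BioNeMo installed, and shows what being a `NamedTuple` provides: access by field name or position, tuple unpacking, and hashability. The `cache` dictionary is a hypothetical use, not something the library does.

```python
from typing import NamedTuple


class EpochIndex(NamedTuple):
    """Local stand-in mirroring the class documented above."""

    epoch: int
    idx: int


index = EpochIndex(epoch=2, idx=5)

# Fields are accessible by name or by position.
assert index.epoch == 2 and index[1] == 5

# Tuple unpacking works because EpochIndex is a tuple subclass.
epoch, idx = index

# NamedTuples are hashable, so they can serve as dictionary keys (hypothetical cache).
cache = {index: "sample"}
```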

IdentityMultiEpochDatasetWrapper dataclass

Bases: MultiEpochDatasetWrapper[T, T]

An implementation of MultiEpochDatasetWrapper that does not apply any transformations.

Source code in bionemo/core/data/multi_epoch_dataset.py
class IdentityMultiEpochDatasetWrapper(MultiEpochDatasetWrapper[T, T]):
    """An implementation of the `MultiEpochDatasetWrapper` that does not apply any transformations."""

    def apply_transform(self, sample: T, index: EpochIndex) -> T:
        """Return the sample as is."""
        del index  # Unused.
        return sample

apply_transform(sample, index)

Return the sample as is.

Source code in bionemo/core/data/multi_epoch_dataset.py
def apply_transform(self, sample: T, index: EpochIndex) -> T:
    """Return the sample as is."""
    del index  # Unused.
    return sample

MultiEpochDataset

Bases: Protocol[T_co]

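A protocol for datasets for multi-epoch training in Megatron-LM.

Dataset determinism in Megatron-LM

In Megatron training, the sampler and dataset objects are used to ensure consistent data loading across model-parallel ranks. For datasets to work with Megatron training, they must return exactly the same data for every call to __getitem__ with the same index.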

Source code in bionemo/core/data/multi_epoch_dataset.py
class MultiEpochDataset(Protocol[T_co]):
    """A protocol for datasets for multi-epoch training in Megatron-LM.

    !!! important "Dataset determinism in Megatron-LM"
        In megatron training, the sampler and dataset objects are used to ensure consistent data loading across
        model-parallel ranks. For datasets to work with megatron training, they must return exactly the same data for
        every call to `__getitem__` with the same index.
    """

    def __getitem__(self, index: EpochIndex) -> T_co:  # noqa: D105
        ...

    def __len__(self) -> int:  # noqa: D105
        ...
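
The determinism requirement can be met even when samples involve randomness, provided the random state is derived from the index alone. The following is a minimal sketch under that assumption: `NoisyDataset` is a hypothetical class, `EpochIndex` is redefined locally, and the standard-library `random` module stands in for whatever generator a real dataset would use.

```python
import random
from typing import NamedTuple


class EpochIndex(NamedTuple):  # local stand-in for the documented class
    epoch: int
    idx: int


class NoisyDataset:
    """Hypothetical dataset whose randomness depends only on (seed, epoch, idx)."""

    def __init__(self, values: list[float], seed: int = 0) -> None:
        self.values = values
        self.seed = seed

    def __len__(self) -> int:
        return len(self.values)

    def __getitem__(self, index: EpochIndex) -> float:
        # A fresh generator per call, seeded from the index, so repeated calls agree.
        rng = random.Random(f"{self.seed}-{index.epoch}-{index.idx}")
        return self.values[index.idx] + rng.uniform(-0.1, 0.1)


dataset = NoisyDataset([1.0, 2.0, 3.0])
first = dataset[EpochIndex(epoch=0, idx=1)]
second = dataset[EpochIndex(epoch=0, idx=1)]
assert first == second  # same index, same data
```

A dataset that instead drew from a shared, stateful generator would return different values on repeated calls and would break the guarantee described above.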

MultiEpochDatasetResampler dataclass

Bases: Dataset[T_co]

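A dataset wrapper class that converts the sequential sampling from Megatron-LM to epoch-based sampling.

Either num_epochs or num_samples should be provided, but not both. If neither is provided, the dataset uses a single epoch. If num_epochs is given, the resampled dataset will have len(dataset) * num_epochs samples. If num_samples is given, the resampled dataset will have num_samples samples: the dataset is repeated for as many epochs as needed to reach that count, with the final epoch truncated.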
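
The length rules above can be sketched as a small standalone function. This is an illustration of the arithmetic only; `resolve_epochs_and_samples` is a hypothetical helper name, not part of the library.

```python
import math
from typing import Optional


def resolve_epochs_and_samples(
    dataset_len: int,
    num_epochs: Optional[int] = None,
    num_samples: Optional[int] = None,
) -> tuple[int, int]:
    """Return (num_epochs, num_samples) following the rules described above."""
    if num_epochs is not None and num_samples is not None:
        raise ValueError("Only one of num_epochs and num_samples should be provided.")
    if num_epochs is None and num_samples is None:
        num_epochs = 1  # default: a single pass over the data
    if num_epochs is None:
        # Enough epochs to cover num_samples; the last one may be partial.
        num_epochs = math.ceil(num_samples / dataset_len)
    else:
        num_samples = dataset_len * num_epochs
    if num_epochs < 1:
        raise ValueError("num_epochs must be at least 1.")
    return num_epochs, num_samples


# A 10-item dataset with num_samples=25 needs 3 epochs, the last truncated to 5 items.
print(resolve_epochs_and_samples(10, num_samples=25))  # (3, 25)
print(resolve_epochs_and_samples(10, num_epochs=4))    # (4, 40)
print(resolve_epochs_and_samples(10))                  # (1, 10)
```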

Source code in bionemo/core/data/multi_epoch_dataset.py
@dataclass
class MultiEpochDatasetResampler(Dataset[T_co]):
    """A dataset wrapper class that converts the sequential sampling from Megatron-LM to epoch-based sampling.

    Either `num_epochs` or `num_samples` should be provided. If neither are provided, the dataset will use a single
    epoch. If `num_epochs` is given, the resampled dataset will have `len(dataset) * num_epochs` samples. If
    `num_samples` the resampled dataset will have `num_samples` samples. For `num_samples`, the dataset will be repeated
    for multiple epochs until the desired number of samples is reached (with the final epoch being truncated).
    """

    dataset: MultiEpochDataset[T_co]
    """The dataset to resample. Must support indexing with an `EpochIndex`."""

    num_epochs: int | None = None
    """The total number of epochs. The length of the resampled dataset will be len(dataset) * num_epochs."""

    num_samples: int | None = None
    """The total number of samples to draw.

    The number of epochs will be determined by the number of samples and the length of the dataset.
    """

    shuffle: bool = True
    """Whether to shuffle the samples in the dataset each epoch."""

    seed: int = 42  # type: ignore
    """A random seed for reproducibility."""

    def __post_init__(self):
        """Pre-shuffle each epoch's samples."""
        if self.num_epochs is None and self.num_samples is None:
            self.num_epochs = 1
        elif self.num_epochs is not None and self.num_samples is not None:
            raise ValueError("Only one of num_epochs and num_samples should be provided.")

        if self.num_epochs is None and self.num_samples is not None:
            self.num_epochs = math.ceil(self.num_samples / len(self.dataset))

        elif self.num_samples is None and self.num_epochs is not None:
            self.num_samples = len(self.dataset) * self.num_epochs

        # Type guard statements, the above if/elif block should ensure these are not None.
        assert self.num_epochs is not None
        assert self.num_samples is not None

        if self.num_epochs < 1:
            raise ValueError("num_epochs must be at least 1.")

        rng = np.random.default_rng(self.seed)

        # Initialize a vector of random seeds so that each epoch is shuffled differently.
        self.epoch_seeds = rng.integers(0, np.iinfo(np.int32).max, size=self.num_epochs)

    def __getitem__(self, index: int) -> T_co:
        """Get the sample at the given index."""
        if index not in range(len(self)):
            raise IndexError(f"Index {index} out of bounds for dataset of length {len(self)}.")
        return self.dataset[self._global_index_to_permuted_local_index(index)]

    def __len__(self) -> int:
        """Return the length of the resampled dataset."""
        return self.num_samples  # type: ignore

    def _global_index_to_permuted_local_index(self, index: int) -> EpochIndex:
        """Convert a global index to an epoch index."""
        epoch = index // len(self.dataset)
        idx = index % len(self.dataset)
        if self.shuffle:
            idx = permute(idx, len(self.dataset), self.epoch_seeds[epoch])
        return EpochIndex(epoch, idx)

dataset: MultiEpochDataset[T_co] instance-attribute

The dataset to resample. Must support indexing with an EpochIndex.

num_epochs: int | None = None class-attribute instance-attribute

The total number of epochs. The length of the resampled dataset will be len(dataset) * num_epochs.

num_samples: int | None = None class-attribute instance-attribute

The total number of samples to draw.

The number of epochs will be determined by the number of samples and the length of the dataset.

seed: int = 42 class-attribute instance-attribute

A random seed for reproducibility.

shuffle: bool = True class-attribute instance-attribute

Whether to shuffle the samples in the dataset each epoch.

__getitem__(index)

Get the sample at the given index.

Source code in bionemo/core/data/multi_epoch_dataset.py
def __getitem__(self, index: int) -> T_co:
    """Get the sample at the given index."""
    if index not in range(len(self)):
        raise IndexError(f"Index {index} out of bounds for dataset of length {len(self)}.")
    return self.dataset[self._global_index_to_permuted_local_index(index)]
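
To make the index translation concrete, the sketch below maps a flat index onto an (epoch, idx) pair in the same way as `_global_index_to_permuted_local_index`: integer division gives the epoch and the remainder gives the position. The library's `permute` helper is not shown on this page, so the sketch substitutes a materialized per-epoch shuffle from the standard library, on the assumption that `permute` is a bijection on range(len(dataset)); `to_epoch_index` and the seed values are hypothetical.

```python
import random
from typing import NamedTuple


class EpochIndex(NamedTuple):  # local stand-in for the documented class
    epoch: int
    idx: int


def to_epoch_index(index: int, dataset_len: int, epoch_seeds: list[int], shuffle: bool = True) -> EpochIndex:
    """Map a flat index onto (epoch, position within epoch)."""
    epoch, idx = divmod(index, dataset_len)
    if shuffle:
        # Stand-in for `permute`: materialize this epoch's permutation and look up idx.
        order = list(range(dataset_len))
        random.Random(epoch_seeds[epoch]).shuffle(order)
        idx = order[idx]
    return EpochIndex(epoch, idx)


dataset_len = 4
epoch_seeds = [11, 22]  # one hypothetical seed per epoch

# Without shuffling, indices 0..7 walk through epoch 0 and then epoch 1 in order.
unshuffled = [to_epoch_index(i, dataset_len, epoch_seeds, shuffle=False) for i in range(8)]

# With shuffling, each epoch in this sketch still visits every item exactly once.
shuffled = [to_epoch_index(i, dataset_len, epoch_seeds) for i in range(8)]
```

Materializing the full permutation costs memory proportional to the dataset length, which is why this is only an illustration of the mapping and not a suggested implementation.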

__len__()

Return the length of the resampled dataset.

Source code in bionemo/core/data/multi_epoch_dataset.py
def __len__(self) -> int:
    """Return the length of the resampled dataset."""
    return self.num_samples  # type: ignore

__post_init__()

Pre-shuffle each epoch's samples.

Source code in bionemo/core/data/multi_epoch_dataset.py
def __post_init__(self):
    """Pre-shuffle each epoch's samples."""
    if self.num_epochs is None and self.num_samples is None:
        self.num_epochs = 1
    elif self.num_epochs is not None and self.num_samples is not None:
        raise ValueError("Only one of num_epochs and num_samples should be provided.")

    if self.num_epochs is None and self.num_samples is not None:
        self.num_epochs = math.ceil(self.num_samples / len(self.dataset))

    elif self.num_samples is None and self.num_epochs is not None:
        self.num_samples = len(self.dataset) * self.num_epochs

    # Type guard statements, the above if/elif block should ensure these are not None.
    assert self.num_epochs is not None
    assert self.num_samples is not None

    if self.num_epochs < 1:
        raise ValueError("num_epochs must be at least 1.")

    rng = np.random.default_rng(self.seed)

    # Initialize a vector of random seeds so that each epoch is shuffled differently.
    self.epoch_seeds = rng.integers(0, np.iinfo(np.int32).max, size=self.num_epochs)
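
The last step of `__post_init__` derives one shuffle seed per epoch from the single `seed` field, so that each epoch is ordered differently while the whole schedule stays reproducible. The sketch below shows the same idea with the standard-library `random` module in place of NumPy; `make_epoch_seeds` is a hypothetical name, and the values it produces will not match those from `np.random.default_rng`.

```python
import random


def make_epoch_seeds(seed: int, num_epochs: int) -> list[int]:
    """Derive one shuffle seed per epoch from a single master seed."""
    rng = random.Random(seed)
    # Same range as the source: integers in [0, 2**31 - 1).
    return [rng.randrange(2**31 - 1) for _ in range(num_epochs)]


seeds_a = make_epoch_seeds(seed=42, num_epochs=3)
seeds_b = make_epoch_seeds(seed=42, num_epochs=3)
assert seeds_a == seeds_b  # the same master seed reproduces the same per-epoch seeds
```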

MultiEpochDatasetWrapper dataclass

Bases: Dataset[U_co], Generic[T, U_co], ABC

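A wrapper to convert a standard PyTorch dataset into one that supports multi-epoch Megatron training.

The underlying dataset's __getitem__ method must be deterministic, i.e. it must return the same data for the same index every time it is called. Any non-deterministic operations should be moved to the apply_transform method. That method must also be deterministic for every (epoch, index) pair, but it can use the epoch to implement per-epoch data augmentation.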

Source code in bionemo/core/data/multi_epoch_dataset.py
@dataclass
class MultiEpochDatasetWrapper(Dataset[U_co], Generic[T, U_co], ABC):
    """A wrapper to convert a standard pytorch dataset into one that supports multi-epoch megatron training.

    The underlying dataset's __getitem__ method must be deterministic, i.e. it must return the same data for the same
    index every time it is called. If there are any non-deterministic operations, they should be moved to the
    `apply_transform` method. This method must also be deterministic for every (epoch, index) pair, but it can use
    the epoch to implement data augmentation each epoch.
    """

    dataset: SizedDataset[T]
    """A deterministic dataset that supports indexing with an integer index."""

    @abstractmethod
    def apply_transform(self, sample: T, index: EpochIndex) -> U_co:
        """Apply any transformations to the sample for the given epoch."""
        raise NotImplementedError

    def __getitem__(self, index: EpochIndex) -> U_co:
        """Get the sample at the given epoch and index."""
        return self.apply_transform(self.dataset[index.idx], index)

    def __len__(self) -> int:
        """Return the length of the dataset."""
        return len(self.dataset)
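
A subclass might implement per-epoch augmentation roughly as follows. This is a sketch under stated assumptions: `WrapperSketch` is a simplified local stand-in for `MultiEpochDatasetWrapper` (no PyTorch, no generics, no dataclass), and `RandomMaskWrapper` with its `<mask>` token is a hypothetical augmentation. The key point is that the random generator is seeded from the (epoch, idx) pair, so the transform varies between epochs yet is repeatable for any given pair.

```python
import random
from abc import ABC, abstractmethod
from typing import NamedTuple


class EpochIndex(NamedTuple):  # local stand-in for the documented class
    epoch: int
    idx: int


class WrapperSketch(ABC):
    """Simplified stand-in for MultiEpochDatasetWrapper."""

    def __init__(self, dataset):
        self.dataset = dataset

    @abstractmethod
    def apply_transform(self, sample, index: EpochIndex):
        raise NotImplementedError

    def __getitem__(self, index: EpochIndex):
        # The underlying dataset only ever sees the integer idx.
        return self.apply_transform(self.dataset[index.idx], index)

    def __len__(self) -> int:
        return len(self.dataset)


class RandomMaskWrapper(WrapperSketch):
    """Hypothetical augmentation: mask one token, chosen per (epoch, idx)."""

    def apply_transform(self, sample: list[str], index: EpochIndex) -> list[str]:
        rng = random.Random(f"{index.epoch}-{index.idx}")
        position = rng.randrange(len(sample))
        return ["<mask>" if i == position else tok for i, tok in enumerate(sample)]


wrapped = RandomMaskWrapper([["A", "C", "G", "T"], ["T", "G", "C", "A"]])
a = wrapped[EpochIndex(epoch=0, idx=0)]
b = wrapped[EpochIndex(epoch=0, idx=0)]
assert a == b  # deterministic for a fixed (epoch, idx) pair
```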

dataset: SizedDataset[T] instance-attribute

A deterministic dataset that supports indexing with an integer index.

__getitem__(index)

Get the sample at the given epoch and index.

Source code in bionemo/core/data/multi_epoch_dataset.py
def __getitem__(self, index: EpochIndex) -> U_co:
    """Get the sample at the given epoch and index."""
    return self.apply_transform(self.dataset[index.idx], index)

__len__()

Return the length of the dataset.

Source code in bionemo/core/data/multi_epoch_dataset.py
def __len__(self) -> int:
    """Return the length of the dataset."""
    return len(self.dataset)

apply_transform(sample, index) abstractmethod

Apply any transformations to the sample for the given epoch.

Source code in bionemo/core/data/multi_epoch_dataset.py
@abstractmethod
def apply_transform(self, sample: T, index: EpochIndex) -> U_co:
    """Apply any transformations to the sample for the given epoch."""
    raise NotImplementedError

SizedDataset

Bases: Protocol[T_co]

A protocol for integer-indexed datasets that have a fixed length.

Source code in bionemo/core/data/multi_epoch_dataset.py
class SizedDataset(Protocol[T_co]):
    """A protocol for integer-indexed datasets that have a fixed length."""

    def __getitem__(self, index: int) -> T_co:  # noqa: D105
        ...

    def __len__(self) -> int:  # noqa: D105
        ...
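
Because `SizedDataset` is a `Protocol`, any object with a compatible `__getitem__` and `__len__` satisfies it structurally, with no inheritance required. The sketch below copies the protocol locally and checks this with a hypothetical `Squares` class and a plain list; `first_and_last` is likewise an illustrative helper.

```python
from typing import Protocol, TypeVar

T_co = TypeVar("T_co", covariant=True)


class SizedDataset(Protocol[T_co]):  # local copy of the protocol above
    def __getitem__(self, index: int) -> T_co: ...

    def __len__(self) -> int: ...


class Squares:
    """Hypothetical dataset: item i is i squared."""

    def __init__(self, n: int) -> None:
        self.n = n

    def __getitem__(self, index: int) -> int:
        if not 0 <= index < self.n:
            raise IndexError(index)
        return index * index

    def __len__(self) -> int:
        return self.n


def first_and_last(dataset: SizedDataset[int]) -> tuple[int, int]:
    """Accept anything with integer __getitem__ and __len__."""
    return dataset[0], dataset[len(dataset) - 1]


print(first_and_last(Squares(5)))    # (0, 16)
print(first_and_last([10, 20, 30]))  # (10, 30)
```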