Datamodule

ESM2FineTuneDataModule

Bases: MegatronDataModule

A PyTorch Lightning DataModule for fine-tuning ESM2 models.

This DataModule handles data preparation and loading for fine-tuning ESM2 models. It provides a flexible way to create and manage datasets, data loaders, and sampling strategies.
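For orientation, a minimal usage sketch is shown below; the CSV file names and the trainer call are illustrative assumptions, not values taken from this page:

from bionemo.esm2.model.finetune.datamodule import ESM2FineTuneDataModule, InMemoryCSVDataset

# Hypothetical CSV files with a 'sequences' column and an optional 'labels' column.
train_ds = InMemoryCSVDataset(data_path="train.csv")
valid_ds = InMemoryCSVDataset(data_path="valid.csv")

data_module = ESM2FineTuneDataModule(
    train_dataset=train_ds,
    valid_dataset=valid_ds,
    micro_batch_size=4,
    global_batch_size=8,
    max_seq_length=1024,
)
# The DataModule is then passed to the fine-tuning trainer, e.g. trainer.fit(model, datamodule=data_module).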

Source code in bionemo/esm2/model/finetune/datamodule.py
class ESM2FineTuneDataModule(MegatronDataModule):
    """A PyTorch Lightning DataModule for fine-tuning ESM2 models.

    This DataModule is designed to handle the data preparation and loading for fine-tuning ESM2 models.
    It provides a flexible way to create and manage datasets, data loaders, and sampling strategies.
    """

    def __init__(
        self,
        train_dataset: DATASET_TYPES = None,
        valid_dataset: DATASET_TYPES = None,
        predict_dataset: DATASET_TYPES = None,
        seed: int = 42,
        min_seq_length: int | None = None,
        max_seq_length: int = 1024,
        micro_batch_size: int = 4,
        global_batch_size: int = 8,
        num_workers: int = 2,
        persistent_workers: bool = True,
        pin_memory: bool = True,
        rampup_batch_size: list[int] | None = None,
        tokenizer: tokenizer.BioNeMoESMTokenizer = tokenizer.get_tokenizer(),
    ) -> None:
        """Initialize the ESM2FineTuneDataModule.

        Args:
            train_dataset: The training dataset.
            valid_dataset: The validation dataset.
            predict_dataset: The prediction dataset. Should not be set together with train/valid datasets
            seed: The random seed to use for shuffling the datasets. Defaults to 42.
            min_seq_length: The minimum sequence length for the datasets. Defaults to None.
            max_seq_length: The maximum sequence length for the datasets. Defaults to 1024.
            micro_batch_size: The micro-batch size for the data loader. Defaults to 4.
            global_batch_size: The global batch size for the data loader. Defaults to 8.
            num_workers: The number of worker processes for the data loader. Defaults to 2.
            persistent_workers: Whether to persist the worker processes. Defaults to True.
            pin_memory: Whether to pin the data in memory. Defaults to True.
            rampup_batch_size: The batch size ramp-up schedule. Defaults to None.
            tokenizer: The tokenizer to use for tokenization. Defaults to the BioNeMoESMTokenizer.

        Returns:
            None
        """
        super().__init__()
        self.train_dataset = train_dataset
        self.valid_dataset = valid_dataset
        self.predict_dataset = predict_dataset
        if predict_dataset is not None:
            assert train_dataset is None, "Datamodule expects either train/valid dataset or predict dataset"
        self._seed = seed
        self._min_seq_length = min_seq_length
        self._max_seq_length = max_seq_length
        self._tokenizer = tokenizer

        self._micro_batch_size = micro_batch_size
        self._num_workers = num_workers
        self._persistent_workers = persistent_workers
        self._pin_memory = pin_memory

        self.data_sampler = MegatronDataSampler(
            seq_len=max_seq_length,
            micro_batch_size=micro_batch_size,
            global_batch_size=global_batch_size,
            dataloader_type="single",  # `MegatronPretrainingRandomSampler` from "cyclic" is failing.
            rampup_batch_size=rampup_batch_size,
            output_log=predict_dataset is None,  # logging does not work with predict step
        )

    def setup(self, stage: str) -> None:
        """Setup the ESMDataModule.

        Args:
            stage: Unused.

        Raises:
            RuntimeError: If the trainer is not attached, or if the trainer's max_steps is not set.
        """
        del stage  # Unused.

        if not hasattr(self, "trainer") or self.trainer is None:
            raise RuntimeError("Setup should be completed when trainer and config are attached.")

        if self.trainer.max_epochs is not None and self.trainer.max_epochs > 1:
            logging.warning(
                "Trainer is set to run for multiple epochs. This is not recommended due to the same shuffle being used "
                "in each. Instead set max_epochs to 1 and increase the number of max_steps."
            )

        # Create training dataset
        if self.train_dataset is not None:
            max_train_steps = self.trainer.max_steps
            if max_train_steps <= 0:
                raise RuntimeError("Please specify trainer.max_steps")

            num_train_samples = int(max_train_steps * self.data_sampler.global_batch_size)
            self._train_ds = self._create_epoch_based_dataset(self.train_dataset, num_train_samples)

        # Create validation dataset
        if self.valid_dataset is not None and self.trainer.limit_val_batches != 0:
            num_val_samples = infer_num_samples(
                limit_batches=self.trainer.limit_val_batches,
                num_samples_in_dataset=len(self.valid_dataset),
                global_batch_size=self.data_sampler.global_batch_size,
                stage="val",
            )
            self._valid_ds = self._create_epoch_based_dataset(self.valid_dataset, num_val_samples)

        assert (
            hasattr(self, "trainer") and self.trainer is not None
        ), "Setup should be completed when trainer and config are attached."

    def _create_epoch_based_dataset(
        self,
        dataset: InMemoryPerTokenValueDataset | InMemorySingleValueDataset,
        total_samples: int,
    ):
        return MultiEpochDatasetResampler(
            IdentityMultiEpochDatasetWrapper(dataset),
            num_samples=total_samples,
            shuffle=self.predict_dataset is None,
            seed=self._seed,
        )

    def _create_dataloader(self, dataset, mode: Mode, **kwargs) -> WrappedDataLoader:
        """Create dataloader for train, validation, and test stages.

        Args:
            dataset: The dataset to create the dataloader for.
            mode: Stage of training, which is used to determined if consumed_samples in MegatronPretrainingSampler should be initialized to 0 (validation/test), or be set to the previous value from state_dict in case of checkpoint resumption (train).
            **kwargs: Additional arguments to pass to the dataloader.
        """
        if mode not in ["predict", "test"]:
            self.update_init_global_step()
        assert self._tokenizer.pad_token_id is not None, "Tokenizer must have a pad token id."

        return WrappedDataLoader(
            mode=mode,
            dataset=dataset,
            num_workers=self._num_workers,
            pin_memory=self._pin_memory,
            persistent_workers=self._persistent_workers,
            collate_fn=functools.partial(
                collate.bert_padding_collate_fn,
                padding_value=self._tokenizer.pad_token_id,
                min_length=self._min_seq_length,
                max_length=self._max_seq_length,
            ),
            **kwargs,
        )

    def train_dataloader(self) -> TRAIN_DATALOADERS:
        """Returns the dataloader for training data."""
        assert self._train_ds is not None, "train_dataset is not provided to ESM2FineTuneDataModule"
        return self._create_dataloader(self._train_ds, mode="train")

    def val_dataloader(self) -> EVAL_DATALOADERS:
        """Returns the dataloader for validation data."""
        assert self._valid_ds is not None, "valid_dataset is not provided to ESM2FineTuneDataModule"
        return self._create_dataloader(self._valid_ds, mode="validation")

    def predict_dataloader(self) -> EVAL_DATALOADERS:
        """Returns the dataloader for prediction data."""
        assert self.predict_dataset is not None, "predict_dataset is not provided to ESM2FineTuneDataModule"
        return self._create_dataloader(self.predict_dataset, mode="predict")

    def test_dataloader(self) -> EVAL_DATALOADERS:
        """Raises a not implemented error."""
        raise NotImplementedError("No test dataset provided for ESM2")

__init__(train_dataset=None, valid_dataset=None, predict_dataset=None, seed=42, min_seq_length=None, max_seq_length=1024, micro_batch_size=4, global_batch_size=8, num_workers=2, persistent_workers=True, pin_memory=True, rampup_batch_size=None, tokenizer=tokenizer.get_tokenizer())

Initialize the ESM2FineTuneDataModule.

Parameters

train_dataset (DATASET_TYPES): The training dataset. Default: None
valid_dataset (DATASET_TYPES): The validation dataset. Default: None
predict_dataset (DATASET_TYPES): The prediction dataset. Should not be set together with the train/valid datasets. Default: None
seed (int): The random seed used for shuffling the datasets. Default: 42
min_seq_length (int | None): The minimum sequence length for the datasets. Default: None
max_seq_length (int): The maximum sequence length for the datasets. Default: 1024
micro_batch_size (int): The micro-batch size for the data loader. Default: 4
global_batch_size (int): The global batch size for the data loader. Default: 8
num_workers (int): The number of worker processes for the data loader. Default: 2
persistent_workers (bool): Whether to keep the worker processes alive. Default: True
pin_memory (bool): Whether to pin the data in memory. Default: True
rampup_batch_size (list[int] | None): The batch size ramp-up schedule. Default: None
tokenizer (BioNeMoESMTokenizer): The tokenizer to use for tokenization. Default: tokenizer.get_tokenizer()

Returns

None

Source code in bionemo/esm2/model/finetune/datamodule.py
def __init__(
    self,
    train_dataset: DATASET_TYPES = None,
    valid_dataset: DATASET_TYPES = None,
    predict_dataset: DATASET_TYPES = None,
    seed: int = 42,
    min_seq_length: int | None = None,
    max_seq_length: int = 1024,
    micro_batch_size: int = 4,
    global_batch_size: int = 8,
    num_workers: int = 2,
    persistent_workers: bool = True,
    pin_memory: bool = True,
    rampup_batch_size: list[int] | None = None,
    tokenizer: tokenizer.BioNeMoESMTokenizer = tokenizer.get_tokenizer(),
) -> None:
    """Initialize the ESM2FineTuneDataModule.

    Args:
        train_dataset: The training dataset.
        valid_dataset: The validation dataset.
        predict_dataset: The prediction dataset. Should not be set together with train/valid datasets
        seed: The random seed to use for shuffling the datasets. Defaults to 42.
        min_seq_length: The minimum sequence length for the datasets. Defaults to None.
        max_seq_length: The maximum sequence length for the datasets. Defaults to 1024.
        micro_batch_size: The micro-batch size for the data loader. Defaults to 4.
        global_batch_size: The global batch size for the data loader. Defaults to 8.
        num_workers: The number of worker processes for the data loader. Defaults to 2.
        persistent_workers: Whether to persist the worker processes. Defaults to True.
        pin_memory: Whether to pin the data in memory. Defaults to True.
        rampup_batch_size: The batch size ramp-up schedule. Defaults to None.
        tokenizer: The tokenizer to use for tokenization. Defaults to the BioNeMoESMTokenizer.

    Returns:
        None
    """
    super().__init__()
    self.train_dataset = train_dataset
    self.valid_dataset = valid_dataset
    self.predict_dataset = predict_dataset
    if predict_dataset is not None:
        assert train_dataset is None, "Datamodule expects either train/valid dataset or predict dataset"
    self._seed = seed
    self._min_seq_length = min_seq_length
    self._max_seq_length = max_seq_length
    self._tokenizer = tokenizer

    self._micro_batch_size = micro_batch_size
    self._num_workers = num_workers
    self._persistent_workers = persistent_workers
    self._pin_memory = pin_memory

    self.data_sampler = MegatronDataSampler(
        seq_len=max_seq_length,
        micro_batch_size=micro_batch_size,
        global_batch_size=global_batch_size,
        dataloader_type="single",  # `MegatronPretrainingRandomSampler` from "cyclic" is failing.
        rampup_batch_size=rampup_batch_size,
        output_log=predict_dataset is None,  # logging does not work with predict step
    )
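The micro/global batch size arguments follow the usual Megatron convention: the global batch size must be an integer multiple of micro_batch_size times the data-parallel size, with the remaining factor realized as gradient accumulation. A small illustrative check, assuming a single-GPU run:

# Illustrative batch-size arithmetic (single data-parallel rank assumed).
micro_batch_size = 4
global_batch_size = 8
data_parallel_size = 1
assert global_batch_size % (micro_batch_size * data_parallel_size) == 0
grad_accumulation_steps = global_batch_size // (micro_batch_size * data_parallel_size)
print(grad_accumulation_steps)  # 2 micro-batches are accumulated per global step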

predict_dataloader()

Returns the dataloader for prediction data.

Source code in bionemo/esm2/model/finetune/datamodule.py
def predict_dataloader(self) -> EVAL_DATALOADERS:
    """Returns the dataloader for prediction data."""
    assert self.predict_dataset is not None, "predict_dataset is not provided to ESM2FineTuneDataModule"
    return self._create_dataloader(self.predict_dataset, mode="predict")
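For inference, only predict_dataset is set; a hedged sketch (the trainer and model objects are assumed to come from the surrounding fine-tuning setup and are not defined on this page):

# Prediction-only configuration: no train/valid datasets, so shuffling and
# step logging are disabled by the DataModule itself.
predict_ds = InMemoryCSVDataset(data_path="sequences.csv")  # hypothetical CSV with a 'sequences' column
predict_module = ESM2FineTuneDataModule(predict_dataset=predict_ds)
# results = trainer.predict(model, datamodule=predict_module)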

setup(stage)

Set up the ESMDataModule.

Parameters

stage (str): Unused. Required.

Raises

RuntimeError: If the trainer is not attached, or if the trainer's max_steps is not set.

Source code in bionemo/esm2/model/finetune/datamodule.py
def setup(self, stage: str) -> None:
    """Setup the ESMDataModule.

    Args:
        stage: Unused.

    Raises:
        RuntimeError: If the trainer is not attached, or if the trainer's max_steps is not set.
    """
    del stage  # Unused.

    if not hasattr(self, "trainer") or self.trainer is None:
        raise RuntimeError("Setup should be completed when trainer and config are attached.")

    if self.trainer.max_epochs is not None and self.trainer.max_epochs > 1:
        logging.warning(
            "Trainer is set to run for multiple epochs. This is not recommended due to the same shuffle being used "
            "in each. Instead set max_epochs to 1 and increase the number of max_steps."
        )

    # Create training dataset
    if self.train_dataset is not None:
        max_train_steps = self.trainer.max_steps
        if max_train_steps <= 0:
            raise RuntimeError("Please specify trainer.max_steps")

        num_train_samples = int(max_train_steps * self.data_sampler.global_batch_size)
        self._train_ds = self._create_epoch_based_dataset(self.train_dataset, num_train_samples)

    # Create validation dataset
    if self.valid_dataset is not None and self.trainer.limit_val_batches != 0:
        num_val_samples = infer_num_samples(
            limit_batches=self.trainer.limit_val_batches,
            num_samples_in_dataset=len(self.valid_dataset),
            global_batch_size=self.data_sampler.global_batch_size,
            stage="val",
        )
        self._valid_ds = self._create_epoch_based_dataset(self.valid_dataset, num_val_samples)

    assert (
        hasattr(self, "trainer") and self.trainer is not None
    ), "Setup should be completed when trainer and config are attached."
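The dataset sizes computed here follow directly from the trainer configuration; a small sketch of the same arithmetic with illustrative numbers:

# How setup() sizes the resampled training dataset (illustrative values).
max_steps = 500           # trainer.max_steps; must be > 0
global_batch_size = 8     # taken from the MegatronDataSampler
num_train_samples = max_steps * global_batch_size
print(num_train_samples)  # 4000 samples drawn (with shuffling) over the whole run
# The validation size is derived analogously from trainer.limit_val_batches via infer_num_samples().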

test_dataloader()

Raises a NotImplementedError.

Source code in bionemo/esm2/model/finetune/datamodule.py
def test_dataloader(self) -> EVAL_DATALOADERS:
    """Raises a not implemented error."""
    raise NotImplementedError("No test dataset provided for ESM2")

train_dataloader()

Returns the dataloader for training data.

Source code in bionemo/esm2/model/finetune/datamodule.py
def train_dataloader(self) -> TRAIN_DATALOADERS:
    """Returns the dataloader for training data."""
    assert self._train_ds is not None, "train_dataset is not provided to ESM2FineTuneDataModule"
    return self._create_dataloader(self._train_ds, mode="train")

val_dataloader()

Returns the dataloader for validation data.

Source code in bionemo/esm2/model/finetune/datamodule.py
def val_dataloader(self) -> EVAL_DATALOADERS:
    """Returns the dataloader for validation data."""
    assert self._valid_ds is not None, "valid_dataset is not provided to ESM2FineTuneDataModule"
    return self._create_dataloader(self._valid_ds, mode="validation")

InMemoryCSVDataset

Bases: Dataset

An in-memory dataset that tokenizes strings into BertSample instances.

Source code in bionemo/esm2/model/finetune/datamodule.py
class InMemoryCSVDataset(Dataset):
    """An in-memory dataset that tokenize strings into BertSample instances."""

    def __init__(
        self,
        data_path: str | os.PathLike,
        tokenizer: tokenizer.BioNeMoESMTokenizer = tokenizer.get_tokenizer(),
        seed: int = np.random.SeedSequence().entropy,  # type: ignore
    ):
        """Initializes a dataset for single-value regression fine-tuning.

        This is an in-memory dataset that does not apply masking to the sequence. But keeps track of <mask> in the
        dataset sequences provided.

        Args:
            data_path (str | os.PathLike): A path to the CSV file containing sequences.
            labels (Optional[Sequence[float | str]]): An optional sequence of labels with 1:1 mapping to sequences.
            tokenizer (tokenizer.BioNeMoESMTokenizer, optional): The tokenizer to use. Defaults to tokenizer.get_tokenizer().
            seed: Random seed for reproducibility. This seed is mixed with the index of the sample to retrieve to ensure
                that __getitem__ is deterministic, but can be random across different runs. If None, a random seed is
                generated.
        """
        self.sequences, self.labels = self.load_data(data_path)

        self.seed = seed
        self._len = len(self.sequences)
        self.tokenizer = tokenizer

    def __len__(self) -> int:
        """The size of the dataset."""
        return self._len

    def __getitem__(self, index: int) -> BertSample:
        """Obtains the BertSample at the given index."""
        sequence = self.sequences[index]
        tokenized_sequence = self._tokenize(sequence)

        label = tokenized_sequence if len(self.labels) == 0 else torch.Tensor([self.labels[index]])
        # Overall mask for a token being masked in some capacity - either mask token, random token, or left as-is
        loss_mask = ~torch.isin(tokenized_sequence, Tensor(self.tokenizer.all_special_ids))

        return {
            "text": tokenized_sequence,
            "types": torch.zeros_like(tokenized_sequence, dtype=torch.int64),
            "attention_mask": torch.ones_like(tokenized_sequence, dtype=torch.int64),
            "labels": label,
            "loss_mask": loss_mask,
            "is_random": torch.zeros_like(tokenized_sequence, dtype=torch.int64),
        }

    def load_data(self, csv_path: str | os.PathLike) -> Tuple[Sequence, Sequence]:
        """Loads data from a CSV file, returning sequences and optionally labels.

        This method should be implemented by subclasses to process labels for their specific dataset.

        Args:
            csv_path (str | os.PathLike): The path to the CSV file containing the data.
            The file is expected to have at least one column named 'sequences'. A 'labels' column is optional.

        Returns:
            Tuple[Sequence, Sequence]: A tuple where the first element is a list of sequences and the second element is
            a list of labels. If the 'labels' column is not present, an empty list is returned for labels.
        """
        df = pd.read_csv(csv_path)
        sequences = df["sequences"].tolist()

        if "labels" in df.columns:
            labels = df["labels"].tolist()
        else:
            labels = []
        return sequences, labels

    def _tokenize(self, sequence: str) -> Tensor:
        """Tokenize a protein sequence.

        Args:
            sequence: The protein sequence.

        Returns:
            The tokenized sequence.
        """
        tensor = self.tokenizer.encode(sequence, add_special_tokens=True, return_tensors="pt")
        return tensor.flatten()  # type: ignore
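A minimal sketch of the expected input format, assuming a small CSV written on the fly (the file name, sequences, and label values are illustrative):

import pandas as pd
from bionemo.esm2.model.finetune.datamodule import InMemoryCSVDataset

# load_data() reads a 'sequences' column; a 'labels' column is optional.
pd.DataFrame(
    {"sequences": ["MKTAYIAKQR", "MSILVTRPSP"], "labels": [0.3, 1.2]}
).to_csv("regression.csv", index=False)

dataset = InMemoryCSVDataset(data_path="regression.csv")
sample = dataset[0]
print(sample["text"].shape, sample["labels"])  # tokenized sequence and its regression target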

__getitem__(index)

Obtains the BertSample at the given index.

Source code in bionemo/esm2/model/finetune/datamodule.py
def __getitem__(self, index: int) -> BertSample:
    """Obtains the BertSample at the given index."""
    sequence = self.sequences[index]
    tokenized_sequence = self._tokenize(sequence)

    label = tokenized_sequence if len(self.labels) == 0 else torch.Tensor([self.labels[index]])
    # Overall mask for a token being masked in some capacity - either mask token, random token, or left as-is
    loss_mask = ~torch.isin(tokenized_sequence, Tensor(self.tokenizer.all_special_ids))

    return {
        "text": tokenized_sequence,
        "types": torch.zeros_like(tokenized_sequence, dtype=torch.int64),
        "attention_mask": torch.ones_like(tokenized_sequence, dtype=torch.int64),
        "labels": label,
        "loss_mask": loss_mask,
        "is_random": torch.zeros_like(tokenized_sequence, dtype=torch.int64),
    }
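The loss mask simply excludes the tokenizer's special tokens (e.g. BOS/EOS/padding) from the loss; a quick illustrative check, reusing the dataset from the sketch above:

sample = dataset[0]                       # 'dataset' from the InMemoryCSVDataset sketch above
special_positions = ~sample["loss_mask"]  # True where the token is a special token
print(sample["text"][special_positions])  # prints only special-token ids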

__init__(data_path, tokenizer=tokenizer.get_tokenizer(), seed=np.random.SeedSequence().entropy)

Initializes a dataset for single-value regression fine-tuning.

This is an in-memory dataset that does not apply masking to the sequences, but it keeps track of <mask> tokens in the provided dataset sequences.

Parameters

data_path (str | os.PathLike): A path to the CSV file containing sequences. Required.
labels (Optional[Sequence[float | str]]): An optional sequence of labels with a 1:1 mapping to sequences. Required.
tokenizer (BioNeMoESMTokenizer): The tokenizer to use. Default: tokenizer.get_tokenizer()
seed (int): Random seed for reproducibility. This seed is mixed with the index of the sample being retrieved so that __getitem__ is deterministic, while still allowing different randomness across runs. If None, a random seed is generated. Default: np.random.SeedSequence().entropy
Source code in bionemo/esm2/model/finetune/datamodule.py
def __init__(
    self,
    data_path: str | os.PathLike,
    tokenizer: tokenizer.BioNeMoESMTokenizer = tokenizer.get_tokenizer(),
    seed: int = np.random.SeedSequence().entropy,  # type: ignore
):
    """Initializes a dataset for single-value regression fine-tuning.

    This is an in-memory dataset that does not apply masking to the sequence. But keeps track of <mask> in the
    dataset sequences provided.

    Args:
        data_path (str | os.PathLike): A path to the CSV file containing sequences.
        labels (Optional[Sequence[float | str]]): An optional sequence of labels with 1:1 mapping to sequences.
        tokenizer (tokenizer.BioNeMoESMTokenizer, optional): The tokenizer to use. Defaults to tokenizer.get_tokenizer().
        seed: Random seed for reproducibility. This seed is mixed with the index of the sample to retrieve to ensure
            that __getitem__ is deterministic, but can be random across different runs. If None, a random seed is
            generated.
    """
    self.sequences, self.labels = self.load_data(data_path)

    self.seed = seed
    self._len = len(self.sequences)
    self.tokenizer = tokenizer

__len__()

The size of the dataset.

Source code in bionemo/esm2/model/finetune/datamodule.py
def __len__(self) -> int:
    """The size of the dataset."""
    return self._len

load_data(csv_path)

Loads data from a CSV file, returning sequences and optionally labels.

This method should be implemented by subclasses to process labels for their specific dataset.

Parameters

csv_path (str | os.PathLike): The path to the CSV file containing the data. The file is expected to have at least one column named 'sequences'; a 'labels' column is optional. Required.

Returns

Tuple[Sequence, Sequence]: A tuple where the first element is a list of sequences and the second element is a list of labels. If the 'labels' column is not present, an empty list is returned for labels.

Source code in bionemo/esm2/model/finetune/datamodule.py
def load_data(self, csv_path: str | os.PathLike) -> Tuple[Sequence, Sequence]:
    """Loads data from a CSV file, returning sequences and optionally labels.

    This method should be implemented by subclasses to process labels for their specific dataset.

    Args:
        csv_path (str | os.PathLike): The path to the CSV file containing the data.
        The file is expected to have at least one column named 'sequences'. A 'labels' column is optional.

    Returns:
        Tuple[Sequence, Sequence]: A tuple where the first element is a list of sequences and the second element is
        a list of labels. If the 'labels' column is not present, an empty list is returned for labels.
    """
    df = pd.read_csv(csv_path)
    sequences = df["sequences"].tolist()

    if "labels" in df.columns:
        labels = df["labels"].tolist()
    else:
        labels = []
    return sequences, labels
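Since label handling is intended to be customized per dataset, a subclass can override load_data. The sketch below is a hypothetical example that min-max normalizes numeric labels; the column handling is an assumption, not part of the library:

import os
from typing import Sequence, Tuple

import pandas as pd

from bionemo.esm2.model.finetune.datamodule import InMemoryCSVDataset


class NormalizedCSVDataset(InMemoryCSVDataset):
    """Hypothetical subclass that min-max normalizes the 'labels' column."""

    def load_data(self, csv_path: str | os.PathLike) -> Tuple[Sequence, Sequence]:
        df = pd.read_csv(csv_path)
        sequences = df["sequences"].tolist()
        if "labels" in df.columns:
            values = df["labels"].astype(float)
            labels = ((values - values.min()) / (values.max() - values.min())).tolist()
        else:
            labels = []
        return sequences, labels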