Single-Cell Memory-Mapped Dataset

FileNames

Bases: str, Enum

Names of files that are generated in SingleCellCollection.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
class FileNames(str, Enum):
    """Names of files that are generated in SingleCellCollection."""

    DATA = "data.npy"
    COLPTR = "col_ptr.npy"
    ROWPTR = "row_ptr.npy"
    METADATA = "metadata.json"
    DTYPE = "dtypes.json"
    FEATURES = "features"
    VERSION = "version.json"

METADATA

Bases: str, Enum

Stored metadata.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
class METADATA(str, Enum):
    """Stored metadata."""

    NUM_ROWS = "num_rows"

Mode

Bases: str, Enum

Valid modes for the single-cell memory-mapped dataset.

The write-append mode is 'w+' and the read-append mode is 'r+'.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
class Mode(str, Enum):
    """Valid modes for the single cell memory mapped dataset.

    The write append mode is 'w+' while the read append mode is 'r+'.
    """

    CREATE_APPEND = "w+"
    READ_APPEND = "r+"
    READ = "r"
    CREATE = "w"
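
Because the enum inherits from str, its values are the literal mode strings that np.memmap expects. A minimal sketch (the file path is a placeholder):

import numpy as np

from bionemo.scdl.io.single_cell_memmap_dataset import Mode

# Create a new memory-mapped array in write-append mode ("w+").
arr = np.memmap("data.npy", dtype="float32", mode=Mode.CREATE_APPEND.value, shape=(100,))
arr[:] = 0.0
arr.flush()

# Re-open the same file read-only ("r").
readonly = np.memmap("data.npy", dtype="float32", mode=Mode.READ.value, shape=(100,))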

SingleCellMemMapDataset

Bases: SingleCellRowDataset

Represents one or more AnnData matrices.

Data is stored in large, memory-mapped arrays, which enables fast access to datasets larger than the available system RAM. SCMMAP implements the consistent API defined in SingleCellRowDataset. A short usage sketch follows the attribute list below.

Attributes

Name            Type                  Description
data_path       str                   Location of the np.memmap files to load from or that will be created.
mode            Mode                  Whether the dataset is read from np.memmap files (r+) or written to them (w+).
data            Optional[ndarray]     A numpy array of the data.
row_index       Optional[ndarray]     A numpy array of row pointers.
col_index       Optional[ndarray]     A numpy array of column values.
metadata        Dict[str, int]        Various metadata about the dataset.
_feature_index  RowFeatureIndex       The corresponding RowFeatureIndex where features are stored.
dtypes          Dict[FileNames, str]  A dictionary containing the datatypes of the data, row_index, and col_index arrays.
_version        str                   The version of the dataset.
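
A minimal usage sketch (the paths below are placeholders):

from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset

# Convert an AnnData archive into the SCDL memory-mapped format;
# "scdata" must not exist yet and "cells.h5ad" is a placeholder file.
ds = SingleCellMemMapDataset("scdata", h5ad_path="cells.h5ad")

# Later, re-open the converted dataset directly from disk.
ds = SingleCellMemMapDataset("scdata")
print(len(ds))                       # number of rows (cells)
values, columns = ds.get_row(0)[0]   # sparse contents of row 0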

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
class SingleCellMemMapDataset(SingleCellRowDataset):
    """Represents one or more AnnData matrices.

    Data is stored in large, memory-mapped arrays that enable fast access to
    datasets larger than the available amount of RAM on a system. SCMMAP
    implements a consistent API defined in SingleCellRowDataset.

    Attributes:
        data_path: Location of np.memmap files to be loaded from or that will be
        created.
        mode: Whether the dataset will be read in (r+) from np.memmap files or
        written to np.memmap files (w+).
        data: A numpy array of the data
        row_index: A numpy array of row pointers
        col_index: A numpy array of column values
        metadata: Various metadata about the dataset.
        _feature_index: The corresponding RowFeatureIndex where features are
        stored
        dtypes: A dictionary containing the datatypes of the data, row_index,
        and col_index arrays.
        _version: The version of the dataset
    """

    def __init__(
        self,
        data_path: str,
        h5ad_path: Optional[str] = None,
        num_elements: Optional[int] = None,
        num_rows: Optional[int] = None,
        mode: Mode = Mode.READ_APPEND,
        paginated_load_cutoff: int = 10_000,
        load_block_row_size: int = 1_000_000,
    ) -> None:
        """Instantiate the class.

        Args:
            data_path: The location where the data np.memmap files are read from
            or stored.
            h5ad_path: Optional, the location of the h5ad file.
            num_elements: The total number of elements in the array.
            num_rows: The number of rows in the data frame.
            mode: Whether to read or write from the data_path.
            paginated_load_cutoff: MB size on disk at which to load the h5ad structure with paginated load.
            load_block_row_size: Number of rows to load into memory with paginated load
        """
        self._version: str = importlib.metadata.version("bionemo.scdl")
        self.data_path: str = data_path
        self.mode: Mode = mode
        self.paginated_load_cutoff = paginated_load_cutoff
        self.load_block_row_size = load_block_row_size
        # Backing arrays
        self.data: Optional[np.ndarray] = None
        self.row_index: Optional[np.ndarray] = None
        self.col_index: Optional[np.ndarray] = None

        # Metadata and attributes
        self.metadata: Dict[str, int] = {}

        # Stores the Feature Index, which tracks
        # the original AnnData features (e.g., gene names)
        # and allows us to store ragged arrays in our SCMMAP structure.
        self._feature_index: RowFeatureIndex = RowFeatureIndex()

        # Variables for int packing / reduced precision
        self.dtypes: Dict[FileNames, str] = {
            f"{FileNames.DATA.value}": "float32",
            f"{FileNames.COLPTR.value}": "uint32",
            f"{FileNames.ROWPTR.value}": "uint64",
        }

        if mode == Mode.CREATE_APPEND and os.path.exists(data_path):
            raise FileExistsError(f"Output directory already exists: {data_path}")

        if h5ad_path is not None and (data_path is not None and os.path.exists(data_path)):
            raise FileExistsError(
                "Invalid input; both an existing SCMMAP and an h5ad file were passed. "
                "Please pass either an existing SCMMAP or an h5ad file."
            )

        # If there is only a data path, and it exists already, load SCMMAP data.
        elif data_path is not None and os.path.exists(data_path):
            self.__init__obj()
            self.load(data_path)

        # If there is only an h5ad path, load the HDF5 data
        elif h5ad_path is not None:
            self.__init__obj()
            self.load_h5ad(h5ad_path)
        else:
            match num_rows, num_elements:
                case (int(), int()):
                    self.__init__obj()
                    self._init_arrs(num_elements=num_elements, num_rows=num_rows)
                case _:
                    raise ValueError(
                        "An np.memmap path, an h5ad path, or the number of elements and rows is required" ""
                    )

    def __init__obj(self):
        """Initializes the datapath and writes the version."""
        os.makedirs(self.data_path, exist_ok=True)

        # Write the version
        if not os.path.exists(f"{self.data_path}/{FileNames.VERSION.value}"):
            with open(f"{self.data_path}/{FileNames.VERSION.value}", "w") as vfi:
                json.dump(self.version(), vfi)

    def _init_arrs(self, num_elements: int, num_rows: int) -> None:
        self.mode = Mode.CREATE_APPEND
        data_arr, col_arr, row_arr = _create_compressed_sparse_row_memmaps(
            num_elements=num_elements,
            num_rows=num_rows,
            memmap_dir_path=Path(self.data_path),
            mode=self.mode,
            dtypes=self.dtypes,
        )
        self.data = data_arr
        self.col_index = col_arr
        self.row_index = row_arr

    def version(self) -> str:
        """Returns a version number.

        (following <major>.<minor>.<point> convention).
        """
        return self._version

    def get_row(
        self,
        index: int,
        return_features: bool = False,
        feature_vars: Optional[List[str]] = None,
    ) -> Tuple[Tuple[np.ndarray, np.ndarray], List[np.ndarray]]:
        """Returns a given row in the dataset along with optional features.

        Args:
            index: The row to be returned. This is in the range of [0, num_rows)
            return_features: boolean that indicates whether to return features
            feature_vars: Optional, feature variables to extract
        Return:
            Tuple[np.ndarray, np.ndarray]: data values and column pointers
            List[np.ndarray]: optional, corresponding features.
        """
        start = self.row_index[index]
        end = self.row_index[index + 1]
        values = self.data[start:end]
        columns = self.col_index[start:end]
        ret = (values, columns)
        if return_features:
            return ret, self._feature_index.lookup(index, select_features=feature_vars)[0]
        else:
            return ret, None

    def get_row_padded(
        self,
        index: int,
        return_features: bool = False,
        feature_vars: Optional[List[str]] = None,
    ) -> Tuple[np.ndarray, List[np.ndarray]]:
        """Returns a padded version of a row in the dataset.

        A padded version is one where the sparse array representation is
        converted to a conventional (dense) representation. Optionally, features are
        returned.

        Args:
            index: The row to be returned
            return_features: boolean that indicates whether to return features
            feature_vars: Optional, feature variables to extract
        Return:
            np.ndarray: conventional row representation
            List[np.ndarray]: optional, corresponding features.
        """
        (row_values, row_column_pointer), features = self.get_row(index, return_features, feature_vars)
        return (
            _pad_sparse_array(row_values, row_column_pointer, self._feature_index.number_vars_at_row(index)),
            features,
        )

    def get_row_column(self, index: int, column: int, impute_missing_zeros: bool = True) -> Optional[float]:
        """Returns the value at a given index and the corresponding column.

        Args:
            index: The index to be returned
            column: The column to be returned
            impute_missing_zeros: boolean that indicates whether to set missing
            data to 0
        Return:
            A float that is the value in the array or None.
        """
        (row_values, row_column_pointer), _ = self.get_row(index)
        if column is not None:
            for col_index, col in enumerate(row_column_pointer):
                if col == column:
                    # return the value at this position
                    return row_values[col_index]
                elif col > column:
                    # Columns are sorted, so the requested column is absent from this row.
                    break
            return 0.0 if impute_missing_zeros else None

    def features(self) -> Optional[RowFeatureIndex]:
        """Return the corresponding RowFeatureIndex."""
        return self._feature_index

    def _load_mmap_file_if_exists(self, file_path, dtype):
        if os.path.exists(file_path):
            return np.memmap(file_path, dtype=dtype, mode=self.mode)
        else:
            raise FileNotFoundError(f"The mmap file at {file_path} is missing")

    def load(self, stored_path: str) -> None:
        """Loads the data at store_path that is an np.memmap format.

        Args:
            stored_path: directory with np.memmap files
        Raises:
            FileNotFoundError if the corresponding directory or files are not
            found, or if the metadata file is not present.
        """
        if not os.path.exists(stored_path):
            raise FileNotFoundError(
                f"""Error: the specified data path to the mmap files {stored_path} does not exist.
                                    Specify an updated filepath or provide an h5ad path to the dataset. The data can
                                    be loaded with SingleCellMemMapDataset.load_h5ad. Alternatively, the class can be instantiated
                                    with  SingleCellMemMapDataset(<path to data that will be created>, h5ad_path=<path to h5ad file>"""
            )
        self.data_path = stored_path
        self.mode = Mode.READ_APPEND

        # Metadata is required, so we must check if it exists and fail if not.
        if not os.path.exists(f"{self.data_path}/{FileNames.METADATA.value}"):
            raise FileNotFoundError(
                f"Error: the metadata file {self.data_path}/{FileNames.METADATA.value} does not exist."
            )

        with open(f"{self.data_path}/{FileNames.METADATA.value}", Mode.READ_APPEND.value) as mfi:
            self.metadata = json.load(mfi)

        if os.path.exists(f"{self.data_path}/{FileNames.FEATURES.value}"):
            self._feature_index = RowFeatureIndex.load(f"{self.data_path}/{FileNames.FEATURES.value}")

        if os.path.exists(f"{self.data_path}/{FileNames.DTYPE.value}"):
            with open(f"{self.data_path}/{FileNames.DTYPE.value}") as dfi:
                self.dtypes = json.load(dfi)

        # mmap the existing arrays
        self.data = self._load_mmap_file_if_exists(
            f"{self.data_path}/{FileNames.DATA.value}", self.dtypes[f"{FileNames.DATA.value}"]
        )
        self.row_index = self._load_mmap_file_if_exists(
            f"{self.data_path}/{FileNames.ROWPTR.value}", dtype=self.dtypes[f"{FileNames.ROWPTR.value}"]
        )
        self.col_index = self._load_mmap_file_if_exists(
            f"{self.data_path}/{FileNames.COLPTR.value}", dtype=self.dtypes[f"{FileNames.COLPTR.value}"]
        )

    def _write_metadata(self) -> None:
        with open(f"{self.data_path}/{FileNames.METADATA.value}", f"{Mode.CREATE.value}") as mfi:
            json.dump(self.metadata, mfi)

    def regular_load_h5ad(
        self,
        anndata_path: str,
    ) -> Tuple[pd.DataFrame, int]:
        """Method for loading an h5ad file into memorySu and converting it to the SCDL format.

        Args:
            anndata_path: location of data to load
        Raises:
            NotImplementedError if the data is not in scipy.sparse.spmatrix format
            ValueError if there is no count data
        Returns:
            pd.DataFrame: var variables for features
            int: number of rows in the dataframe.

        """
        adata = ad.read_h5ad(anndata_path)  # slow

        if not isinstance(adata.X, scipy.sparse.spmatrix):
            raise NotImplementedError("Error: dense matrix loading not yet implemented.")

        # Check if raw data is present
        raw = getattr(adata, "raw", None)
        count_data = None
        if raw is not None:
            # If it is, attempt to get the counts in the raw data.
            count_data = getattr(raw, "X", None)

        if count_data is None:
            # No raw counts were present, resort to normalized
            count_data = getattr(adata, "X")
        if count_data is None:
            raise ValueError("This file does not have count data")

        shape = count_data.shape
        num_rows = shape[0]

        num_elements_stored = count_data.nnz

        self.dtypes[f"{FileNames.DATA.value}"] = count_data.dtype

        # Create the arrays.
        self._init_arrs(num_elements_stored, num_rows)
        # Store data
        self.data[0:num_elements_stored] = count_data.data

        # Store the col idx array
        self.col_index[0:num_elements_stored] = count_data.indices.astype(int)

        # Store the row idx array
        self.row_index[0 : num_rows + 1] = count_data.indptr.astype(int)

        return adata.var, num_rows

    def paginated_load_h5ad(
        self,
        anndata_path: str,
    ) -> Tuple[pd.DataFrame, int]:
        """Method for block loading a larger h5ad file and converting it to the SCDL format.

        This should be used in the case when the entire anndata file cannot be loaded into memory.
        The anndata is loaded into memory load_block_row_size number of rows at a time. Each chunk
        is converted into numpy memory maps which are then concatenated together.

        Raises:
            NotImplementedError if the data is not loaded in the CSRDataset format.

        Returns:
            pd.DataFrame: var variables for features
            int: number of rows in the dataframe.
        """
        adata = ad.read_h5ad(anndata_path, backed=True)

        if not isinstance(adata.X, ad.experimental.CSRDataset):
            raise NotImplementedError("Non-sparse format cannot be loaded: {type(adata.X)}.")
        num_rows = adata.X.shape[0]

        self.dtypes[f"{FileNames.DATA.value}"] = adata.X.dtype

        # Read the row indices into a memory map.
        mode = Mode.CREATE_APPEND
        self.row_index = _create_row_memmaps(num_rows, Path(self.data_path), mode, self.dtypes)
        self.row_index[:] = adata.X._indptr.astype(int)

        # The data from each column and data chunk of the original anndata file is read in. This is saved into the final
        # location of the memmap file. In this step, it is saved in the binary file format.
        memmap_dir_path = Path(self.data_path)
        with (
            open(f"{memmap_dir_path}/{FileNames.COLPTR.value}", "wb") as col_file,
            open(f"{memmap_dir_path}/{FileNames.DATA.value}", "wb") as data_file,
        ):
            n_elements = 0
            for row_start in range(0, num_rows, self.load_block_row_size):
                # Write each array's data to the file in binary format
                col_block = adata.X[row_start : row_start + self.load_block_row_size].indices
                col_file.write(col_block.tobytes())

                data_block = adata.X[row_start : row_start + self.load_block_row_size].data
                data_file.write(data_block.tobytes())

                n_elements += len(data_block)

        # The column and data files are re-opened as memory-mapped arrays with the final shape
        mode = Mode.READ_APPEND
        self.col_index = np.memmap(
            f"{memmap_dir_path}/{FileNames.COLPTR.value}",
            self.dtypes[f"{FileNames.COLPTR.value}"],
            mode=mode,
            shape=(n_elements,),
        )
        self.data = np.memmap(
            f"{memmap_dir_path}/{FileNames.DATA.value}",
            dtype=self.dtypes[f"{FileNames.DATA.value}"],
            mode=mode,
            shape=(n_elements,),
        )
        return adata.var, num_rows

    def load_h5ad(
        self,
        anndata_path: str,
    ) -> None:
        """Loads an existing AnnData archive from disk.

        This creates a new backing data structure which is saved.
        Note: the storage utilized will roughly double. Currently, the data must
        be in a scipy.sparse.spmatrix format.

        Args:
            anndata_path: location of data to load
        Raises:
            FileNotFoundError if the data path does not exist.
            NotImplementedError if the data is not in scipy.sparse.spmatrix
            format
            ValueError if there is no count data
        """
        if not os.path.exists(anndata_path):
            raise FileNotFoundError(f"Error: could not find h5ad path {anndata_path}")
        file_size_MB = os.path.getsize(anndata_path) / (1_024**2)

        if file_size_MB < self.paginated_load_cutoff:
            features_df, num_rows = self.regular_load_h5ad(anndata_path)

        else:
            features_df, num_rows = self.paginated_load_h5ad(anndata_path)

        features = {col: np.array(features_df[col].values) for col in features_df.columns}
        self._feature_index.append_features(
            n_obs=num_rows, features=features, num_genes=len(features[next(iter(features.keys()))]), label=anndata_path
        )
        self.save()

    def save(self, output_path: Optional[str] = None) -> bool:
        """Saves the class to a given output path.

        Args:
            output_path: The location to save - not yet implemented and should
            be self.data_path

        Returns:
            True on success.

        Raises:
           NotImplementedError if output_path is not None.
        """
        if f"{METADATA.NUM_ROWS.value}" not in self.metadata:
            self.metadata[f"{METADATA.NUM_ROWS.value}"] = self.number_of_rows()

        self._write_metadata()
        # Write the feature index. This may not exist.
        self._feature_index.save(f"{self.data_path}/{FileNames.FEATURES.value}")

        # Ensure the object is in a valid state. These are saved at creation!
        for postfix in [
            f"{FileNames.VERSION.value}",
            f"{FileNames.DATA.value}",
            f"{FileNames.COLPTR.value}",
            f"{FileNames.ROWPTR.value}",
            f"{FileNames.FEATURES.value}",
        ]:
            if not os.path.exists(f"{self.data_path}/{postfix}"):
                raise FileNotFoundError(f"This file should exist from object creation: {self.data_path}/{postfix}")

        self.data.flush()
        self.row_index.flush()
        self.col_index.flush()

        if output_path is not None:
            raise NotImplementedError("Saving to separate path is not yet implemented.")

        return True

    def number_of_values(self) -> int:
        """Get the total number of values in the array.

        For each index, the length of the corresponding np.ndarray of features is counted.

        Returns:
            The sum of lengths of the features in every row
        """
        return sum(self._feature_index.number_of_values())

    def number_of_rows(self) -> int:
        """The number of rows in the dataset.

        Returns:
            The number of rows in the dataset
        Raises:
            ValueError if the length of the number of rows in the feature
            index does not correspond to the number of stored rows.
        """
        if len(self._feature_index) > 0 and self._feature_index.number_of_rows() != self.row_index.size - 1:
            raise ValueError(
                f"""The nuber of rows in the feature index {self._feature_index.number_of_rows()}
                             does not correspond to the number of rows in the row_index {self.row_index.size - 1}"""
            )
        return self._feature_index.number_of_rows()

    def number_nonzero_values(self) -> int:
        """Number of non zero entries in the dataset."""
        return self.data.size

    def __len__(self):
        """Return the number of rows."""
        return self.number_of_rows()

    def __getitem__(self, idx: int) -> torch.Tensor:
        """Get the row values located and index idx."""
        return torch.from_numpy(np.stack(self.get_row(idx)[0]))

    def number_of_variables(self) -> List[int]:
        """Get the number of features in every entry in the dataset.

        Returns:
            A list containing the lengths of the features in every row
        """
        feats = self._feature_index
        if len(feats) == 0:
            return [0]
        num_vars = feats.column_dims()
        return num_vars

    def shape(self) -> Tuple[int, List[int]]:
        """Get the shape of the dataset.

        This is the number of entries by the length of the feature index
        corresponding to that variable.

        Returns:
            The number of elements in the dataset
            A list containing the number of variables for each row.
        """
        return self.number_of_rows(), self.number_of_variables()

    def concat(
        self,
        other_dataset: Union[list["SingleCellMemMapDataset"], "SingleCellMemMapDataset"],
    ) -> None:
        """Concatenates another SingleCellMemMapDataset to the existing one.

        The data is stored in the same place as for the original data set. This
        necessitates using _swap_mmap_array.

        Args:
            other_dataset: A SingleCellMemMapDataset or a list of
            SingleCellMemMapDatasets

        Raises:
           ValueError if the other dataset(s) are not of the same version or
           something of another type is passed in.
        """
        # Verify the other dataset or datasets are of the same type.
        match other_dataset:
            case self.__class__():
                other_dataset = [other_dataset]
            case list():
                pass
            case _:
                raise ValueError(
                    f"Expecting either a {SingleCellMemMapDataset} or a list thereof. Actually got: {type(other_dataset)}"
                )

        for dataset in other_dataset:
            if self.version() != dataset.version():
                raise ValueError(
                    f"""Incompatable versions: input version: {dataset.version()},
            this version:  {self.version}"""
                )

        # Set our mode:
        self.mode: Mode = Mode.READ_APPEND

        mmaps = []
        mmaps.extend(other_dataset)
        # Calculate the size of our new dataset arrays
        total_num_elements = (self.number_nonzero_values() if self.number_of_rows() > 0 else 0) + sum(
            [m.number_nonzero_values() for m in mmaps]
        )
        total_num_rows = self.number_of_rows() + sum([m.number_of_rows() for m in mmaps])

        # Create new arrays to store the data, colptr, and rowptr.
        with tempfile.TemporaryDirectory(prefix="_tmp", dir=self.data_path) as tmp:
            data_arr, col_arr, row_arr = _create_compressed_sparse_row_memmaps(
                num_elements=total_num_elements,
                num_rows=total_num_rows,
                memmap_dir_path=Path(tmp),
                mode=Mode.CREATE_APPEND,
                dtypes=self.dtypes,
            )
            # Copy the data from self and other into the new arrays.
            cumulative_elements = 0
            cumulative_rows = 0
            if self.number_of_rows() > 0:
                data_arr[cumulative_elements : cumulative_elements + self.number_nonzero_values()] = self.data.data
                col_arr[cumulative_elements : cumulative_elements + self.number_nonzero_values()] = self.col_index.data
                row_arr[cumulative_rows : cumulative_rows + self.number_of_rows() + 1] = self.row_index.data
                cumulative_elements += self.number_nonzero_values()
                cumulative_rows += self.number_of_rows()
            for mmap in mmaps:
                # Fill the data array for the span of this scmmap
                data_arr[cumulative_elements : cumulative_elements + mmap.number_nonzero_values()] = mmap.data.data
                # fill the col array for the span of this scmmap
                col_arr[cumulative_elements : cumulative_elements + mmap.number_nonzero_values()] = mmap.col_index.data
                # Fill the row array for the span of this scmmap
                row_arr[cumulative_rows : cumulative_rows + mmap.number_of_rows() + 1] = (
                    mmap.row_index + int(cumulative_elements)
                ).data

                self._feature_index.concat(mmap._feature_index)
                # Update counters
                cumulative_elements += mmap.number_nonzero_values()
                cumulative_rows += mmap.number_of_rows()
            # The arrays are swapped to ensure that the data remains stored at self.data_path and
            # not at a temporary filepath.
            _swap_mmap_array(
                data_arr,
                f"{tmp}/{FileNames.DATA.value}",
                self.data,
                f"{self.data_path}/{FileNames.DATA.value}",
                destroy_src=True,
            )
            _swap_mmap_array(
                col_arr,
                f"{tmp}/{FileNames.COLPTR.value}",
                self.col_index,
                f"{self.data_path}/{FileNames.COLPTR.value}",
                destroy_src=True,
            )
            _swap_mmap_array(
                row_arr,
                f"{tmp}/{FileNames.ROWPTR.value}",
                self.row_index,
                f"{self.data_path}/{FileNames.ROWPTR.value}",
                destroy_src=True,
            )
            # Reopen the data, colptr, and rowptr arrays
            self.data = np.memmap(
                f"{self.data_path}/{FileNames.DATA.value}",
                dtype=self.dtypes[f"{FileNames.DATA.value}"],
                shape=(cumulative_elements,),
                mode=Mode.READ_APPEND.value,
            )
            self.row_index = np.memmap(
                f"{self.data_path}/{FileNames.ROWPTR.value}",
                dtype=self.dtypes[f"{FileNames.ROWPTR.value}"],
                shape=(cumulative_rows + 1,),
                mode=Mode.READ_APPEND.value,
            )
            self.col_index = np.memmap(
                f"{self.data_path}/{FileNames.COLPTR.value}",
                dtype=self.dtypes[f"{FileNames.COLPTR.value}"],
                shape=(cumulative_elements,),
                mode=Mode.READ_APPEND.value,
            )

        self.save()

__getitem__(idx)

Get the row values located at index idx.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def __getitem__(self, idx: int) -> torch.Tensor:
    """Get the row values located and index idx."""
    return torch.from_numpy(np.stack(self.get_row(idx)[0]))
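
Each item stacks the row's values and column indices, so the tensor width varies with that row's number of nonzeros; uniform batching therefore needs get_row_padded or a custom collate. A sketch, given a dataset directory as in the class-level example:

ds = SingleCellMemMapDataset("scdata")   # placeholder path
row = ds[0]                              # torch.Tensor of shape (2, nnz_in_row_0)
values, columns = row[0], row[1]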

__init__(data_path, h5ad_path=None, num_elements=None, num_rows=None, mode=Mode.READ_APPEND, paginated_load_cutoff=10000, load_block_row_size=1000000)

Instantiate the class.

Parameters

Name                   Type           Description                                                             Default
data_path              str            The location where the data np.memmap files are read from or stored.   required
h5ad_path              Optional[str]  Optional, the location of the h5ad file.                                None
num_elements           Optional[int]  The total number of elements in the array.                              None
num_rows               Optional[int]  The number of rows in the data frame.                                   None
mode                   Mode           Whether to read or write from the data_path.                            Mode.READ_APPEND
paginated_load_cutoff  int            Size on disk (MB) above which the h5ad is loaded with paginated load.   10000
load_block_row_size    int            Number of rows loaded into memory per block with paginated load.        1000000
Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def __init__(
    self,
    data_path: str,
    h5ad_path: Optional[str] = None,
    num_elements: Optional[int] = None,
    num_rows: Optional[int] = None,
    mode: Mode = Mode.READ_APPEND,
    paginated_load_cutoff: int = 10_000,
    load_block_row_size: int = 1_000_000,
) -> None:
    """Instantiate the class.

    Args:
        data_path: The location where the data np.memmap files are read from
        or stored.
        h5ad_path: Optional, the location of the h5ad file.
        num_elements: The total number of elements in the array.
        num_rows: The number of rows in the data frame.
        mode: Whether to read or write from the data_path.
        paginated_load_cutoff: MB size on disk at which to load the h5ad structure with paginated load.
        load_block_row_size: Number of rows to load into memory with paginated load
    """
    self._version: str = importlib.metadata.version("bionemo.scdl")
    self.data_path: str = data_path
    self.mode: Mode = mode
    self.paginated_load_cutoff = paginated_load_cutoff
    self.load_block_row_size = load_block_row_size
    # Backing arrays
    self.data: Optional[np.ndarray] = None
    self.row_index: Optional[np.ndarray] = None
    self.col_index: Optional[np.ndarray] = None

    # Metadata and attributes
    self.metadata: Dict[str, int] = {}

    # Stores the Feature Index, which tracks
    # the original AnnData features (e.g., gene names)
    # and allows us to store ragged arrays in our SCMMAP structure.
    self._feature_index: RowFeatureIndex = RowFeatureIndex()

    # Variables for int packing / reduced precision
    self.dtypes: Dict[FileNames, str] = {
        f"{FileNames.DATA.value}": "float32",
        f"{FileNames.COLPTR.value}": "uint32",
        f"{FileNames.ROWPTR.value}": "uint64",
    }

    if mode == Mode.CREATE_APPEND and os.path.exists(data_path):
        raise FileExistsError(f"Output directory already exists: {data_path}")

    if h5ad_path is not None and (data_path is not None and os.path.exists(data_path)):
        raise FileExistsError(
            "Invalid input; both an existing SCMMAP and an h5ad file were passed. "
            "Please pass either an existing SCMMAP or an h5ad file."
        )

    # If there is only a data path, and it exists already, load SCMMAP data.
    elif data_path is not None and os.path.exists(data_path):
        self.__init__obj()
        self.load(data_path)

    # If there is only an h5ad path, load the HDF5 data
    elif h5ad_path is not None:
        self.__init__obj()
        self.load_h5ad(h5ad_path)
    else:
        match num_rows, num_elements:
            case (int(), int()):
                self.__init__obj()
                self._init_arrs(num_elements=num_elements, num_rows=num_rows)
            case _:
                raise ValueError(
                    "An np.memmap path, an h5ad path, or the number of elements and rows is required" ""
                )
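
The three construction paths, sketched with placeholder paths:

# 1. Re-open an existing SCDL directory.
ds = SingleCellMemMapDataset("existing_scdata")

# 2. Convert an h5ad archive; the target directory must not exist yet.
ds = SingleCellMemMapDataset("new_scdata", h5ad_path="cells.h5ad")

# 3. Pre-allocate empty backing arrays from explicit sizes.
ds = SingleCellMemMapDataset("empty_scdata", num_elements=10_000, num_rows=100)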

__init__obj()

Initializes the data path and writes the version.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def __init__obj(self):
    """Initializes the datapath and writes the version."""
    os.makedirs(self.data_path, exist_ok=True)

    # Write the version
    if not os.path.exists(f"{self.data_path}/{FileNames.VERSION.value}"):
        with open(f"{self.data_path}/{FileNames.VERSION.value}", "w") as vfi:
            json.dump(self.version(), vfi)

__len__()

Return the number of rows.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def __len__(self):
    """Return the number of rows."""
    return self.number_of_rows()

concat(other_dataset)

Concatenates another SingleCellMemMapDataset to the existing one.

The data is stored in the same place as the original dataset, which necessitates using _swap_mmap_array.

Parameters

Name           Type                                                            Description                                                        Default
other_dataset  Union[list[SingleCellMemMapDataset], SingleCellMemMapDataset]   A SingleCellMemMapDataset or a list of SingleCellMemMapDatasets.  required
Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def concat(
    self,
    other_dataset: Union[list["SingleCellMemMapDataset"], "SingleCellMemMapDataset"],
) -> None:
    """Concatenates another SingleCellMemMapDataset to the existing one.

    The data is stored in the same place as for the original data set. This
    necessitates using _swap_mmap_array.

    Args:
        other_dataset: A SingleCellMemMapDataset or a list of
        SingleCellMemMapDatasets

    Raises:
       ValueError if the other dataset(s) are not of the same version or
       something of another type is passed in.
    """
    # Verify the other dataset or datasets are of the same type.
    match other_dataset:
        case self.__class__():
            other_dataset = [other_dataset]
        case list():
            pass
        case _:
            raise ValueError(
                f"Expecting either a {SingleCellMemMapDataset} or a list thereof. Actually got: {type(other_dataset)}"
            )

    for dataset in other_dataset:
        if self.version() != dataset.version():
            raise ValueError(
                f"""Incompatable versions: input version: {dataset.version()},
        this version:  {self.version}"""
            )

    # Set our mode:
    self.mode: Mode = Mode.READ_APPEND

    mmaps = []
    mmaps.extend(other_dataset)
    # Calculate the size of our new dataset arrays
    total_num_elements = (self.number_nonzero_values() if self.number_of_rows() > 0 else 0) + sum(
        [m.number_nonzero_values() for m in mmaps]
    )
    total_num_rows = self.number_of_rows() + sum([m.number_of_rows() for m in mmaps])

    # Create new arrays to store the data, colptr, and rowptr.
    with tempfile.TemporaryDirectory(prefix="_tmp", dir=self.data_path) as tmp:
        data_arr, col_arr, row_arr = _create_compressed_sparse_row_memmaps(
            num_elements=total_num_elements,
            num_rows=total_num_rows,
            memmap_dir_path=Path(tmp),
            mode=Mode.CREATE_APPEND,
            dtypes=self.dtypes,
        )
        # Copy the data from self and other into the new arrays.
        cumulative_elements = 0
        cumulative_rows = 0
        if self.number_of_rows() > 0:
            data_arr[cumulative_elements : cumulative_elements + self.number_nonzero_values()] = self.data.data
            col_arr[cumulative_elements : cumulative_elements + self.number_nonzero_values()] = self.col_index.data
            row_arr[cumulative_rows : cumulative_rows + self.number_of_rows() + 1] = self.row_index.data
            cumulative_elements += self.number_nonzero_values()
            cumulative_rows += self.number_of_rows()
        for mmap in mmaps:
            # Fill the data array for the span of this scmmap
            data_arr[cumulative_elements : cumulative_elements + mmap.number_nonzero_values()] = mmap.data.data
            # fill the col array for the span of this scmmap
            col_arr[cumulative_elements : cumulative_elements + mmap.number_nonzero_values()] = mmap.col_index.data
            # Fill the row array for the span of this scmmap
            row_arr[cumulative_rows : cumulative_rows + mmap.number_of_rows() + 1] = (
                mmap.row_index + int(cumulative_elements)
            ).data

            self._feature_index.concat(mmap._feature_index)
            # Update counters
            cumulative_elements += mmap.number_nonzero_values()
            cumulative_rows += mmap.number_of_rows()
        # The arrays are swapped to ensure that the data remains stored at self.data_path and
        # not at a temporary filepath.
        _swap_mmap_array(
            data_arr,
            f"{tmp}/{FileNames.DATA.value}",
            self.data,
            f"{self.data_path}/{FileNames.DATA.value}",
            destroy_src=True,
        )
        _swap_mmap_array(
            col_arr,
            f"{tmp}/{FileNames.COLPTR.value}",
            self.col_index,
            f"{self.data_path}/{FileNames.COLPTR.value}",
            destroy_src=True,
        )
        _swap_mmap_array(
            row_arr,
            f"{tmp}/{FileNames.ROWPTR.value}",
            self.row_index,
            f"{self.data_path}/{FileNames.ROWPTR.value}",
            destroy_src=True,
        )
        # Reopen the data, colptr, and rowptr arrays
        self.data = np.memmap(
            f"{self.data_path}/{FileNames.DATA.value}",
            dtype=self.dtypes[f"{FileNames.DATA.value}"],
            shape=(cumulative_elements,),
            mode=Mode.READ_APPEND.value,
        )
        self.row_index = np.memmap(
            f"{self.data_path}/{FileNames.ROWPTR.value}",
            dtype=self.dtypes[f"{FileNames.ROWPTR.value}"],
            shape=(cumulative_rows + 1,),
            mode=Mode.READ_APPEND.value,
        )
        self.col_index = np.memmap(
            f"{self.data_path}/{FileNames.COLPTR.value}",
            dtype=self.dtypes[f"{FileNames.COLPTR.value}"],
            shape=(cumulative_elements,),
            mode=Mode.READ_APPEND.value,
        )

    self.save()
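
For example (placeholder paths; all datasets must come from the same bionemo.scdl version):

ds_a = SingleCellMemMapDataset("scdata_a")
ds_b = SingleCellMemMapDataset("scdata_b")

ds_a.concat(ds_b)            # ds_a now also contains the rows of ds_b
# A list of datasets is also accepted, e.g.:
# ds_a.concat([ds_b, ds_c])  # ds_c is another hypothetical dataset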

features()

Return the corresponding RowFeatureIndex.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def features(self) -> Optional[RowFeatureIndex]:
    """Return the corresponding RowFeatureIndex."""
    return self._feature_index

get_row(index, return_features=False, feature_vars=None)

Returns a given row in the dataset along with optional features.

Parameters

Name             Type                 Description                                          Default
index            int                  The row to be returned, in the range [0, num_rows).  required
return_features  bool                 Whether to return features.                          False
feature_vars     Optional[List[str]]  Optional, feature variables to extract.              None

Returns: Tuple[np.ndarray, np.ndarray]: data values and column pointers. List[np.ndarray]: optional, corresponding features.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def get_row(
    self,
    index: int,
    return_features: bool = False,
    feature_vars: Optional[List[str]] = None,
) -> Tuple[Tuple[np.ndarray, np.ndarray], List[np.ndarray]]:
    """Returns a given row in the dataset along with optional features.

    Args:
        index: The row to be returned. This is in the range of [0, num_rows)
        return_features: boolean that indicates whether to return features
        feature_vars: Optional, feature variables to extract
    Return:
        Tuple[np.ndarray, np.ndarray]: data values and column pointers
        List[np.ndarray]: optional, corresponding features.
    """
    start = self.row_index[index]
    end = self.row_index[index + 1]
    values = self.data[start:end]
    columns = self.col_index[start:end]
    ret = (values, columns)
    if return_features:
        return ret, self._feature_index.lookup(index, select_features=feature_vars)[0]
    else:
        return ret, None
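
For example, fetching the sparse contents of the first row with and without its features, given a dataset ds opened as in the class-level sketch (the var column name below is hypothetical):

(values, columns), _ = ds.get_row(0)
(values, columns), features = ds.get_row(0, return_features=True, feature_vars=["feature_name"])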

get_row_column(index, column, impute_missing_zeros=True)

Returns the value at a given index and the corresponding column.

Parameters

Name                  Type  Description                        Default
index                 int   The index (row) to be returned.    required
column                int   The column to be returned.         required
impute_missing_zeros  bool  Whether to set missing data to 0.  True

Returns: A float that is the value in the array, or None.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def get_row_column(self, index: int, column: int, impute_missing_zeros: bool = True) -> Optional[float]:
    """Returns the value at a given index and the corresponding column.

    Args:
        index: The index to be returned
        column: The column to be returned
        impute_missing_zeros: boolean that indicates whether to set missing
        data to 0
    Return:
        A float that is the value in the array or None.
    """
    (row_values, row_column_pointer), _ = self.get_row(index)
    if column is not None:
        for col_index, col in enumerate(row_column_pointer):
            if col == column:
                # return the value at this position
                return row_values[col_index]
            elif col > column:
                # Columns are sorted, so the requested column is absent from this row.
                break
        return 0.0 if impute_missing_zeros else None
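
For example (row and column indices are hypothetical):

val = ds.get_row_column(10, 42)                               # 0.0 if the entry is absent
val = ds.get_row_column(10, 42, impute_missing_zeros=False)   # None if the entry is absent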

get_row_padded(index, return_features=False, feature_vars=None)

Returns a padded version of a row in the dataset.

A padded version is one where the sparse array representation is converted to a conventional (dense) representation. Optionally, features are returned.

Parameters

Name             Type                 Description                              Default
index            int                  The row to be returned.                  required
return_features  bool                 Whether to return features.              False
feature_vars     Optional[List[str]]  Optional, feature variables to extract.  None

Returns: np.ndarray: conventional (dense) row representation. List[np.ndarray]: optional, corresponding features.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def get_row_padded(
    self,
    index: int,
    return_features: bool = False,
    feature_vars: Optional[List[str]] = None,
) -> Tuple[np.ndarray, List[np.ndarray]]:
    """Returns a padded version of a row in the dataset.

    A padded version is one where the sparse array representation is
    converted to a conventional (dense) representation. Optionally, features are
    returned.

    Args:
        index: The row to be returned
        return_features: boolean that indicates whether to return features
        feature_vars: Optional, feature variables to extract
    Return:
        np.ndarray: conventional row representation
        List[np.ndarray]: optional, corresponding features.
    """
    (row_values, row_column_pointer), features = self.get_row(index, return_features, feature_vars)
    return (
        _pad_sparse_array(row_values, row_column_pointer, self._feature_index.number_vars_at_row(index)),
        features,
    )
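
A sketch of the padding width: the dense row has one slot per feature variable at that row, which mirrors the number_vars_at_row call the method itself makes:

dense_row, _ = ds.get_row_padded(0)
# The padded length equals the feature count the index reports for that row.
assert dense_row.shape[0] == ds._feature_index.number_vars_at_row(0)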

load(stored_path)

Loads the data at stored_path, which is in np.memmap format.

Parameters

Name         Type  Description                      Default
stored_path  str   Directory with np.memmap files.  required

Raises: FileNotFoundError if the corresponding directory or files are not found, or if the metadata file is not present.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def load(self, stored_path: str) -> None:
    """Loads the data at store_path that is an np.memmap format.

    Args:
        stored_path: directory with np.memmap files
    Raises:
        FileNotFoundError if the corresponding directory or files are not
        found, or if the metadata file is not present.
    """
    if not os.path.exists(stored_path):
        raise FileNotFoundError(
            f"""Error: the specified data path to the mmap files {stored_path} does not exist.
                                Specify an updated filepath or provide an h5ad path to the dataset. The data can
                                be loaded with SingleCellMemMapDataset.load_h5ad. Alternatively, the class can be instantiated
                                with  SingleCellMemMapDataset(<path to data that will be created>, h5ad_path=<path to h5ad file>"""
        )
    self.data_path = stored_path
    self.mode = Mode.READ_APPEND

    # Metadata is required, so we must check if it exists and fail if not.
    if not os.path.exists(f"{self.data_path}/{FileNames.METADATA.value}"):
        raise FileNotFoundError(
            f"Error: the metadata file {self.data_path}/{FileNames.METADATA.value} does not exist."
        )

    with open(f"{self.data_path}/{FileNames.METADATA.value}", Mode.READ_APPEND.value) as mfi:
        self.metadata = json.load(mfi)

    if os.path.exists(f"{self.data_path}/{FileNames.FEATURES.value}"):
        self._feature_index = RowFeatureIndex.load(f"{self.data_path}/{FileNames.FEATURES.value}")

    if os.path.exists(f"{self.data_path}/{FileNames.DTYPE.value}"):
        with open(f"{self.data_path}/{FileNames.DTYPE.value}") as dfi:
            self.dtypes = json.load(dfi)

    # mmap the existing arrays
    self.data = self._load_mmap_file_if_exists(
        f"{self.data_path}/{FileNames.DATA.value}", self.dtypes[f"{FileNames.DATA.value}"]
    )
    self.row_index = self._load_mmap_file_if_exists(
        f"{self.data_path}/{FileNames.ROWPTR.value}", dtype=self.dtypes[f"{FileNames.ROWPTR.value}"]
    )
    self.col_index = self._load_mmap_file_if_exists(
        f"{self.data_path}/{FileNames.COLPTR.value}", dtype=self.dtypes[f"{FileNames.COLPTR.value}"]
    )
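
load is normally invoked through the constructor when data_path already exists on disk; calling it directly re-points an instance at another SCDL directory (placeholder path):

ds.load("other_scdata")   # mmaps data.npy, col_ptr.npy, and row_ptr.npy from that directory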

load_h5ad(anndata_path)

Loads an existing AnnData archive from disk.

This creates a new backing data structure which is saved. Note: the storage utilized will roughly double. Currently, the data must be in scipy.sparse.spmatrix format.

Parameters

Name          Type  Description                    Default
anndata_path  str   Location of the data to load.  required

Raises: FileNotFoundError if the data path does not exist. NotImplementedError if the data is not in scipy.sparse.spmatrix format. ValueError if there is no count data.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def load_h5ad(
    self,
    anndata_path: str,
) -> None:
    """Loads an existing AnnData archive from disk.

    This creates a new backing data structure which is saved.
    Note: the storage utilized will roughly double. Currently, the data must
    be in a scipy.sparse.spmatrix format.

    Args:
        anndata_path: location of data to load
    Raises:
        FileNotFoundError if the data path does not exist.
        NotImplementedError if the data is not in scipy.sparse.spmatrix
        format
        ValueError if there is no count data
    """
    if not os.path.exists(anndata_path):
        raise FileNotFoundError(f"Error: could not find h5ad path {anndata_path}")
    file_size_MB = os.path.getsize(anndata_path) / (1_024**2)

    if file_size_MB < self.paginated_load_cutoff:
        features_df, num_rows = self.regular_load_h5ad(anndata_path)

    else:
        features_df, num_rows = self.paginated_load_h5ad(anndata_path)

    features = {col: np.array(features_df[col].values) for col in features_df.columns}
    self._feature_index.append_features(
        n_obs=num_rows, features=features, num_genes=len(features[next(iter(features.keys()))]), label=anndata_path
    )
    self.save()
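
The on-disk file size picks the strategy: files under paginated_load_cutoff MB go through regular_load_h5ad, larger ones through paginated_load_h5ad. A sketch with placeholder paths:

ds = SingleCellMemMapDataset("scdata", h5ad_path="cells.h5ad", paginated_load_cutoff=512)
# Here any h5ad over ~512 MB on disk would take the paginated (block-wise) path.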

number_nonzero_values()

Number of non-zero entries in the dataset.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def number_nonzero_values(self) -> int:
    """Number of non zero entries in the dataset."""
    return self.data.size

number_of_rows()

The number of rows in the dataset.

Returns

Type  Description
int   The number of rows in the dataset.

Raises: ValueError if the number of rows in the feature index does not correspond to the number of stored rows.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def number_of_rows(self) -> int:
    """The number of rows in the dataset.

    Returns:
        The number of rows in the dataset
    Raises:
        ValueError if the length of the number of rows in the feature
        index does not correspond to the number of stored rows.
    """
    if len(self._feature_index) > 0 and self._feature_index.number_of_rows() != self.row_index.size - 1:
        raise ValueError(
            f"""The nuber of rows in the feature index {self._feature_index.number_of_rows()}
                         does not correspond to the number of rows in the row_index {self.row_index.size - 1}"""
        )
    return self._feature_index.number_of_rows()

number_of_values()

Get the total number of values in the array.

For each index, the length of the corresponding np.ndarray of features is counted.

Returns

Type  Description
int   The sum of the lengths of the features in every row.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def number_of_values(self) -> int:
    """Get the total number of values in the array.

    For each index, the length of the corresponding np.ndarray of features is counted.

    Returns:
        The sum of lengths of the features in every row
    """
    return sum(self._feature_index.number_of_values())

number_of_variables()

Get the number of features in every entry in the dataset.

Returns

Type       Description
List[int]  A list containing the lengths of the features in every row.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def number_of_variables(self) -> List[int]:
    """Get the number of features in every entry in the dataset.

    Returns:
        A list containing the lengths of the features in every row
    """
    feats = self._feature_index
    if len(feats) == 0:
        return [0]
    num_vars = feats.column_dims()
    return num_vars

paginated_load_h5ad(anndata_path)

Method for block-loading a larger h5ad file and converting it to the SCDL format.

This should be used when the entire AnnData file cannot be loaded into memory. The AnnData is loaded into memory load_block_row_size rows at a time. Each chunk is converted into numpy memory maps, which are then concatenated together.

Returns

Type          Description
pd.DataFrame  var variables for features.
int           Number of rows in the dataframe.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def paginated_load_h5ad(
    self,
    anndata_path: str,
) -> Tuple[pd.DataFrame, int]:
    """Method for block loading a larger h5ad file and converting it to the SCDL format.

    This should be used in the case when the entire anndata file cannot be loaded into memory.
    The anndata is loaded into memory load_block_row_size number of rows at a time. Each chunk
    is converted into numpy memory maps which are then concatenated together.

    Raises:
        NotImplementedError if the data is not loaded in the CSRDataset format.

    Returns:
        pd.DataFrame: var variables for features
        int: number of rows in the dataframe.
    """
    adata = ad.read_h5ad(anndata_path, backed=True)

    if not isinstance(adata.X, ad.experimental.CSRDataset):
        raise NotImplementedError("Non-sparse format cannot be loaded: {type(adata.X)}.")
    num_rows = adata.X.shape[0]

    self.dtypes[f"{FileNames.DATA.value}"] = adata.X.dtype

    # Read the row indices into a memory map.
    mode = Mode.CREATE_APPEND
    self.row_index = _create_row_memmaps(num_rows, Path(self.data_path), mode, self.dtypes)
    self.row_index[:] = adata.X._indptr.astype(int)

    # The data from each column and data chunk of the original anndata file is read in. This is saved into the final
    # location of the memmap file. In this step, it is saved in the binary file format.
    memmap_dir_path = Path(self.data_path)
    with (
        open(f"{memmap_dir_path}/{FileNames.COLPTR.value}", "wb") as col_file,
        open(f"{memmap_dir_path}/{FileNames.DATA.value}", "wb") as data_file,
    ):
        n_elements = 0
        for row_start in range(0, num_rows, self.load_block_row_size):
            # Write each array's data to the file in binary format
            col_block = adata.X[row_start : row_start + self.load_block_row_size].indices
            col_file.write(col_block.tobytes())

            data_block = adata.X[row_start : row_start + self.load_block_row_size].data
            data_file.write(data_block.tobytes())

            n_elements += len(data_block)

    # The column and data files are re-opened as memory-mapped arrays with the final shape
    mode = Mode.READ_APPEND
    self.col_index = np.memmap(
        f"{memmap_dir_path}/{FileNames.COLPTR.value}",
        self.dtypes[f"{FileNames.COLPTR.value}"],
        mode=mode,
        shape=(n_elements,),
    )
    self.data = np.memmap(
        f"{memmap_dir_path}/{FileNames.DATA.value}",
        dtype=self.dtypes[f"{FileNames.DATA.value}"],
        mode=mode,
        shape=(n_elements,),
    )
    return adata.var, num_rows
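
This method is selected automatically by load_h5ad, but the block-wise path can be forced by lowering the cutoff. A sketch (placeholder path; the h5ad must store X as CSR for backed reading to work):

ds = SingleCellMemMapDataset(
    "scdata",
    h5ad_path="large_cells.h5ad",   # placeholder path
    paginated_load_cutoff=0,        # force the block-wise path
    load_block_row_size=100_000,    # rows per in-memory block
)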

regular_load_h5ad(anndata_path)

Method for loading an h5ad file into memory and converting it to the SCDL format.

Parameters

Name          Type  Description                    Default
anndata_path  str   Location of the data to load.  required

Raises: NotImplementedError if the data is not in scipy.sparse.spmatrix format. ValueError if there is no count data.

Returns: pd.DataFrame: var variables for features. int: number of rows in the dataframe.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def regular_load_h5ad(
    self,
    anndata_path: str,
) -> Tuple[pd.DataFrame, int]:
    """Method for loading an h5ad file into memorySu and converting it to the SCDL format.

    Args:
        anndata_path: location of data to load
    Raises:
        NotImplementedError if the data is not in scipy.sparse.spmatrix format
        ValueError if there is no count data
    Returns:
        pd.DataFrame: var variables for features
        int: number of rows in the dataframe.

    """
    adata = ad.read_h5ad(anndata_path)  # slow

    if not isinstance(adata.X, scipy.sparse.spmatrix):
        raise NotImplementedError("Error: dense matrix loading not yet implemented.")

    # Check if raw data is present
    raw = getattr(adata, "raw", None)
    count_data = None
    if raw is not None:
        # If it is, attempt to get the counts in the raw data.
        count_data = getattr(raw, "X", None)

    if count_data is None:
        # No raw counts were present, resort to normalized
        count_data = getattr(adata, "X")
    if count_data is None:
        raise ValueError("This file does not have count data")

    shape = count_data.shape
    num_rows = shape[0]

    num_elements_stored = count_data.nnz

    self.dtypes[f"{FileNames.DATA.value}"] = count_data.dtype

    # Create the arrays.
    self._init_arrs(num_elements_stored, num_rows)
    # Store data
    self.data[0:num_elements_stored] = count_data.data

    # Store the col idx array
    self.col_index[0:num_elements_stored] = count_data.indices.astype(int)

    # Store the row idx array
    self.row_index[0 : num_rows + 1] = count_data.indptr.astype(int)

    return adata.var, num_rows

save(output_path=None)

Saves the class to a given output path.

Parameters

Name         Type           Description                                                                Default
output_path  Optional[str]  The location to save - not yet implemented and should be self.data_path.  None

Raises: NotImplementedError if output_path is not None.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def save(self, output_path: Optional[str] = None) -> bool:
    """Saves the class to a given output path.

    Args:
        output_path: The location to save - not yet implemented and should
        be self.data_path

    Returns:
        True on success.

    Raises:
       NotImplementedError if output_path is not None.
    """
    if f"{METADATA.NUM_ROWS.value}" not in self.metadata:
        self.metadata[f"{METADATA.NUM_ROWS.value}"] = self.number_of_rows()

    self._write_metadata()
    # Write the feature index. This may not exist.
    self._feature_index.save(f"{self.data_path}/{FileNames.FEATURES.value}")

    # Ensure the object is in a valid state. These are saved at creation!
    for postfix in [
        f"{FileNames.VERSION.value}",
        f"{FileNames.DATA.value}",
        f"{FileNames.COLPTR.value}",
        f"{FileNames.ROWPTR.value}",
        f"{FileNames.FEATURES.value}",
    ]:
        if not os.path.exists(f"{self.data_path}/{postfix}"):
            raise FileNotFoundError(f"This file should exist from object creation: {self.data_path}/{postfix}")

    self.data.flush()
    self.row_index.flush()
    self.col_index.flush()

    if output_path is not None:
        raise NotImplementedError("Saving to separate path is not yet implemented.")

    return True

shape()

Get the shape of the dataset.

This is the number of entries by the length of the feature index corresponding to that variable.

Returns

Type       Description
int        The number of rows in the dataset.
List[int]  A list containing the number of variables for each row.

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def shape(self) -> Tuple[int, List[int]]:
    """Get the shape of the dataset.

    This is the number of entries by the length of the feature index
    corresponding to that variable.

    Returns:
        The number of elements in the dataset
        A list containing the number of variables for each row.
    """
    return self.number_of_rows(), self.number_of_variables()
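
For example, given a dataset ds opened as in the class-level sketch:

num_rows, vars_per_entry = ds.shape()
# e.g. (8000, [2000]) for 8000 cells built from one h5ad with 2000 var columns (illustrative numbers)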

version()

Returns a version number.

(following the <major>.<minor>.<point> convention).

Source code in bionemo/scdl/io/single_cell_memmap_dataset.py
def version(self) -> str:
    """Returns a version number.

    (following <major>.<minor>.<point> convention).
    """
    return self._version