在 PyTorch Lightning 中使用 DALI#
概述#
此示例展示了如何在 PyTorch Lightning 中使用 DALI。
让我们获取一个玩具示例,展示分类网络,并了解 DALI 如何加速它。
DALI_EXTRA_PATH 环境变量应指向 DALI extra 副本。请确保检出与您的 DALI 版本关联的正确发布标签。
[1]:
import torch
from torch.nn import functional as F
from torch import nn
from pytorch_lightning import Trainer, LightningModule
from torch.optim import Adam
from torchvision.datasets import MNIST
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import os
BATCH_SIZE = 64

# Workaround for https://github.com/pytorch/vision/issues/1938 - error 403
# when downloading the MNIST dataset: install an opener with a browser-like
# User-Agent header.
# NOTE: `import urllib` alone does not load the `urllib.request` submodule;
# import it explicitly so `build_opener` is guaranteed to be available.
import urllib.request

opener = urllib.request.build_opener()
opener.addheaders = [("User-agent", "Mozilla/5.0")]
urllib.request.install_opener(opener)
我们将从实现一个使用原生数据加载器的训练类开始
[2]:
class LitMNIST(LightningModule):
    """A 3-layer MLP classifier for MNIST trained with a native PyTorch
    ``DataLoader``.

    Images of shape (1, 28, 28) are flattened and passed through three
    linear layers; the output is log-probabilities over the 10 digits.
    """

    def __init__(self):
        super().__init__()
        # mnist images are (1, 28, 28) (channels, width, height)
        self.layer_1 = torch.nn.Linear(28 * 28, 128)
        self.layer_2 = torch.nn.Linear(128, 256)
        self.layer_3 = torch.nn.Linear(256, 10)

    def forward(self, x):
        """Return per-class log-probabilities for a batch of images."""
        batch_size, channels, width, height = x.size()
        # (b, 1, 28, 28) -> (b, 1*28*28)
        x = x.view(batch_size, -1)
        x = F.relu(self.layer_1(x))
        x = F.relu(self.layer_2(x))
        x = self.layer_3(x)
        return F.log_softmax(x, dim=1)

    def process_batch(self, batch):
        # Identity for the native loader; DALI subclasses override this to
        # unpack the iterator's dict-based output.
        return batch

    def training_step(self, batch, batch_idx):
        x, y = self.process_batch(batch)
        logits = self(x)
        loss = F.nll_loss(logits, y)
        return loss

    def cross_entropy_loss(self, logits, labels):
        # NLL on log-softmax outputs is equivalent to cross-entropy.
        return F.nll_loss(logits, labels)

    def configure_optimizers(self):
        return Adam(self.parameters(), lr=1e-3)

    def prepare_data(self):
        # Download only: the dataset with the proper normalization transform
        # is (re)created in setup(), so keeping a reference here (with a
        # transform that would be discarded anyway) is unnecessary.
        MNIST(os.getcwd(), train=True, download=True)

    def setup(self, stage=None):
        # transforms for images
        transform = transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
        )
        self.mnist_train = MNIST(
            os.getcwd(), train=True, download=False, transform=transform
        )

    def train_dataloader(self):
        # Use the shared BATCH_SIZE constant (previously hard-coded as 64)
        # so the native and DALI data paths stay in sync.
        return DataLoader(
            self.mnist_train,
            batch_size=BATCH_SIZE,
            num_workers=8,
            pin_memory=True,
        )
并查看它的工作方式
[3]:
# Instantiate the model and a single-GPU trainer.
model = LitMNIST()
trainer = Trainer(max_epochs=5, devices=1, accelerator="gpu")
# DDP works only in non-interactive mode; to test it, uncomment the line
# below and run this as a script.
# trainer = Trainer(devices=8, accelerator="gpu", strategy="ddp", max_epochs=5)
## The MNIST data set is not always available to download due to network
## issues; to run this part of the example, uncomment the line below.
# trainer.fit(model)
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
下一步是定义一个 DALI pipeline,它将用于加载和预处理数据。
[4]:
import nvidia.dali as dali
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch import (
DALIClassificationIterator,
LastBatchPolicy,
)
# Location of the Caffe2-format MNIST training set inside DALI_EXTRA.
data_path = os.path.join(os.environ["DALI_EXTRA_PATH"], "db/MNIST/training/")


@pipeline_def
def GetMnistPipeline(device, shard_id=0, num_shards=1):
    """DALI pipeline that reads, decodes and normalizes MNIST samples.

    Args:
        device: "gpu" to decode/normalize on the GPU, anything else on CPU.
        shard_id: index of this shard (rank) for distributed training.
        num_shards: total number of shards (world size).
    """
    on_gpu = device == "gpu"
    # Sharded, shuffled reader over the Caffe2 database. Its name is what
    # the PyTorch iterator refers to via reader_name="Reader".
    encoded, labels = fn.readers.caffe2(
        path=data_path,
        random_shuffle=True,
        shard_id=shard_id,
        num_shards=num_shards,
        name="Reader",
    )
    # "mixed" decodes on the GPU; "cpu" keeps everything on the host.
    images = fn.decoders.image(
        encoded,
        device="mixed" if on_gpu else "cpu",
        output_type=types.GRAY,
    )
    # Normalize with the standard MNIST mean/std, scaled for 0-255 input.
    images = fn.crop_mirror_normalize(
        images,
        dtype=types.FLOAT,
        mean=[0.1307 * 255],
        std=[0.3081 * 255],
        output_layout="CHW",
    )
    if on_gpu:
        labels = labels.gpu()
    # PyTorch expects labels as INT64.
    return images, fn.cast(labels, dtype=types.INT64)
现在我们准备好修改训练类以使用我们刚刚定义的 DALI pipeline。因为我们想与 PyTorch 集成,所以我们用 PyTorch DALI 迭代器包装我们的 pipeline,它可以用一些代码中的小改动来替换原生数据加载器。DALI 迭代器返回字典列表,其中列表中的每个元素对应于一个 pipeline 实例,字典中的条目映射到 pipeline 的输出。
有关更多信息,请查看 DALIGenericIterator 的文档。
[5]:
class DALILitMNIST(LitMNIST):
    """LitMNIST variant fed by a DALI pipeline wrapped in
    ``DALIClassificationIterator`` instead of a native ``DataLoader``."""

    def __init__(self):
        super().__init__()

    def prepare_data(self):
        # DALI reads directly from DALI_EXTRA; nothing to download.
        pass

    def setup(self, stage=None):
        # Shard the reader across ranks so (optional) DDP training works.
        pipeline = GetMnistPipeline(
            batch_size=BATCH_SIZE,
            device="gpu",
            device_id=self.local_rank,
            shard_id=self.global_rank,
            num_shards=self.trainer.world_size,
            num_threads=8,
        )
        # PARTIAL keeps the final, smaller batch instead of dropping it.
        self.train_loader = DALIClassificationIterator(
            pipeline,
            reader_name="Reader",
            last_batch_policy=LastBatchPolicy.PARTIAL,
        )

    def train_dataloader(self):
        return self.train_loader

    def process_batch(self, batch):
        # The iterator yields one dict per pipeline; with one pipeline per
        # process we take element 0 and squeeze the trailing label dim.
        sample = batch[0]
        return (sample["data"], sample["label"].squeeze(-1))
我们现在可以运行训练
[6]:
# Even if the previous Trainer finished its work it still keeps the GPU
# booked; force it to release the device.
if "PL_TRAINER_GPUS" in os.environ:
    os.environ.pop("PL_TRAINER_GPUS")
model = DALILitMNIST()
trainer = Trainer(
    max_epochs=5, devices=1, accelerator="gpu", num_sanity_val_steps=0
)
# DDP works only in non-interactive mode; to test it, uncomment the line
# below and run this as a script.
# trainer = Trainer(devices=8, accelerator="gpu", strategy="ddp", max_epochs=5)
trainer.fit(model)
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
| Name | Type | Params
-----------------------------------
0 | layer_1 | Linear | 100 K
1 | layer_2 | Linear | 33.0 K
2 | layer_3 | Linear | 2.6 K
-----------------------------------
136 K Trainable params
0 Non-trainable params
136 K Total params
0.544 Total estimated model params size (MB)
为了更好的集成,我们可以提供一个自定义 DALI 迭代器包装器,这样在 LitMNIST.process_batch
内部就不需要额外的处理。此外,PyTorch 可以通过这种方式学习数据集的大小。
[7]:
class BetterDALILitMNIST(LitMNIST):
    """LitMNIST variant with a custom DALI iterator wrapper that already
    yields ``[images, labels]`` batches, so no ``process_batch`` override
    is needed."""

    def __init__(self):
        super().__init__()

    def prepare_data(self):
        # DALI reads directly from DALI_EXTRA; nothing to download.
        pass

    def setup(self, stage=None):
        # One shard per rank; under DDP there is one pipeline per process.
        pipeline = GetMnistPipeline(
            batch_size=BATCH_SIZE,
            device="gpu",
            device_id=self.local_rank,
            shard_id=self.global_rank,
            num_shards=self.trainer.world_size,
            num_threads=8,
        )

        class LightningWrapper(DALIClassificationIterator):
            """Turns the iterator's dict output into an iterable batch and
            squeezes the trailing dimension of the labels."""

            def __next__(self):
                # DDP is used, so there is only one pipeline per process.
                sample = super().__next__()[0]
                batch = []
                for key in self.output_map:
                    value = sample[key]
                    if key == "label":
                        value = torch.squeeze(value)
                    batch.append(value)
                return batch

        self.train_loader = LightningWrapper(
            pipeline,
            reader_name="Reader",
            last_batch_policy=LastBatchPolicy.PARTIAL,
        )

    def train_dataloader(self):
        return self.train_loader
让我们再次运行训练
[8]:
# Even if the previous Trainer finished its work it still keeps the GPU
# booked; force it to release the device.
if "PL_TRAINER_GPUS" in os.environ:
    os.environ.pop("PL_TRAINER_GPUS")
model = BetterDALILitMNIST()
trainer = Trainer(
    max_epochs=5, devices=1, accelerator="gpu", num_sanity_val_steps=0
)
# DDP works only in non-interactive mode; to test it, uncomment the line
# below and run this as a script.
# trainer = Trainer(devices=8, accelerator="gpu", strategy="ddp", max_epochs=5)
trainer.fit(model)
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
| Name | Type | Params
-----------------------------------
0 | layer_1 | Linear | 100 K
1 | layer_2 | Linear | 33.0 K
2 | layer_3 | Linear | 2.6 K
-----------------------------------
136 K Trainable params
0 Non-trainable params
136 K Total params
0.544 Total estimated model params size (MB)