使用 Tensorflow DALI 插件:具有多个 GPU 的 DALI tf.data.Dataset#
概述#
本笔记本是一个关于如何将 DALI tf.data.Dataset
与多个 GPU 结合使用的综合示例。建议先查看单 GPU 示例,以快速了解 DALI 数据集以及如何使用它来训练神经网络。此示例是单 GPU 版本的扩展。
最初,我们定义了一些训练参数,并创建了一个 DALI pipeline 来读取转换为 LMDB 格式的 MNIST。您可以在 DALI_extra 数据集中找到它。此 pipeline 能够将数据集划分为多个分片。
DALI_EXTRA_PATH
环境变量应指向从 DALI extra 存储库 下载的数据所在的位置。请确保检出正确的发布标记。
[1]:
import nvidia.dali as dali
from nvidia.dali import pipeline_def, Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import os
import nvidia.dali.plugin.tf as dali_tf
import tensorflow as tf
import logging
tf.get_logger().setLevel(logging.ERROR)
[2]:
# Path to MNIST dataset
data_path = os.path.join(os.environ["DALI_EXTRA_PATH"], "db/MNIST/training/")
BATCH_SIZE = 64
DROPOUT = 0.2
IMAGE_SIZE = 28
NUM_CLASSES = 10
HIDDEN_SIZE = 128
EPOCHS = 5
ITERATIONS = 100
NUM_DEVICES = 2
[3]:
@pipeline_def(batch_size=BATCH_SIZE)
def mnist_pipeline(shard_id):
jpegs, labels = fn.readers.caffe2(
path=data_path,
random_shuffle=True,
shard_id=shard_id,
num_shards=NUM_DEVICES,
)
images = fn.decoders.image(jpegs, device="mixed", output_type=types.GRAY)
images = fn.crop_mirror_normalize(
images, dtype=types.FLOAT, std=[255.0], output_layout="CHW"
)
return images, labels.gpu()
接下来,我们创建 DALI 数据集所需的一些参数。有关它们的更多详细信息,您可以查看单 GPU 示例。
[4]:
shapes = ((BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE), (BATCH_SIZE))
dtypes = (tf.float32, tf.int32)
现在我们准备好定义模型。为了使训练分布到多个 GPU,我们使用 tf.distribute.MirroredStrategy
。
[5]:
strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
with strategy.scope():
model = tf.keras.models.Sequential(
[
tf.keras.layers.Input(
shape=(IMAGE_SIZE, IMAGE_SIZE), name="images"
),
tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),
tf.keras.layers.Dense(HIDDEN_SIZE, activation="relu"),
tf.keras.layers.Dropout(DROPOUT),
tf.keras.layers.Dense(NUM_CLASSES, activation="softmax"),
]
)
model.compile(
optimizer="adam",
loss="sparse_categorical_crossentropy",
metrics=["accuracy"],
)
DALI 数据集也需要进行分布式处理。为此,我们使用 distribute_datasets_from_function
。首先,我们需要定义一个函数,该函数返回绑定到给定 id 的设备的数据集。此外,还需要一些特定的选项才能使一切正常运行。
[6]:
def dataset_fn(input_context):
with tf.device("/gpu:{}".format(input_context.input_pipeline_id)):
device_id = input_context.input_pipeline_id
return dali_tf.DALIDataset(
pipeline=mnist_pipeline(device_id=device_id, shard_id=device_id),
batch_size=BATCH_SIZE,
output_shapes=shapes,
output_dtypes=dtypes,
device_id=device_id,
)
input_options = tf.distribute.InputOptions(
experimental_place_dataset_on_device=True,
experimental_fetch_to_device=False,
experimental_replication_mode=tf.distribute.InputReplicationMode.PER_REPLICA,
)
train_dataset = strategy.distribute_datasets_from_function(
dataset_fn, input_options
)
一切就绪后,我们就可以运行训练并评估模型了。
[7]:
model.fit(train_dataset, epochs=EPOCHS, steps_per_epoch=ITERATIONS)
Epoch 1/5
100/100 [==============================] - 4s 8ms/step - loss: 1.2438 - accuracy: 0.6290
Epoch 2/5
100/100 [==============================] - 1s 8ms/step - loss: 0.3991 - accuracy: 0.8876
Epoch 3/5
100/100 [==============================] - 1s 8ms/step - loss: 0.3202 - accuracy: 0.9045
Epoch 4/5
100/100 [==============================] - 1s 9ms/step - loss: 0.2837 - accuracy: 0.9183
Epoch 5/5
100/100 [==============================] - 1s 8ms/step - loss: 0.2441 - accuracy: 0.9303
[7]:
<tensorflow.python.keras.callbacks.History at 0x7f5d09685880>
[8]:
model.evaluate(train_dataset, steps=ITERATIONS)
100/100 [==============================] - 2s 5ms/step - loss: 0.1963 - accuracy: 0.9438
[8]:
[0.19630344212055206, 0.9437500238418579]