Using Tensorflow DALI plugin: DALI tf.data.Dataset with multiple GPUs

Overview

This notebook is a comprehensive example of how to use a DALI tf.data.Dataset with multiple GPUs. It is recommended to look into the single GPU example first to get up to speed with DALI datasets and how they can be used to train a neural network. This example is an extension of the single GPU version.

First, we define some training parameters and create a DALI pipeline that reads MNIST converted to LMDB format. You can find this data in the DALI_extra dataset. The pipeline is able to partition the dataset into multiple shards.

The DALI_EXTRA_PATH environment variable should point to the location where the data from the DALI extra repository was downloaded. Please make sure the correct release tag is checked out.
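
As a quick sanity check (a minimal sketch, not one of the notebook cells below), you can verify that the variable is set and that the MNIST LMDB data used in this example is present; the db/MNIST/training/ subdirectory is the layout this notebook assumes:

import os

# Hypothetical check: make sure DALI_EXTRA_PATH is set and points to a
# checkout that contains the MNIST LMDB training data used below.
dali_extra = os.environ.get("DALI_EXTRA_PATH")
assert dali_extra is not None, "Please set DALI_EXTRA_PATH"
assert os.path.isdir(
    os.path.join(dali_extra, "db/MNIST/training/")
), "MNIST LMDB data not found - check the DALI_extra release tag"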

[1]:
import nvidia.dali as dali
from nvidia.dali import pipeline_def, Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types

import os

import nvidia.dali.plugin.tf as dali_tf
import tensorflow as tf

import logging

tf.get_logger().setLevel(logging.ERROR)
[2]:
# Path to MNIST dataset
data_path = os.path.join(os.environ["DALI_EXTRA_PATH"], "db/MNIST/training/")

BATCH_SIZE = 64
DROPOUT = 0.2
IMAGE_SIZE = 28
NUM_CLASSES = 10
HIDDEN_SIZE = 128
EPOCHS = 5
ITERATIONS = 100
NUM_DEVICES = 2
[3]:
@pipeline_def(batch_size=BATCH_SIZE)
def mnist_pipeline(shard_id):
    # Each pipeline instance reads only its own shard of the LMDB dataset,
    # so every GPU sees a disjoint part of the data.
    jpegs, labels = fn.readers.caffe2(
        path=data_path,
        random_shuffle=True,
        shard_id=shard_id,
        num_shards=NUM_DEVICES,
    )
    # Decode on the GPU ("mixed" device) and normalize to CHW float images.
    images = fn.decoders.image(jpegs, device="mixed", output_type=types.GRAY)
    images = fn.crop_mirror_normalize(
        images, dtype=types.FLOAT, std=[255.0], output_layout="CHW"
    )

    return images, labels.gpu()
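
Such a pipeline can also be built and run on its own, which is a convenient way to see what a single shard produces. The snippet below is only an illustrative sketch (it assumes GPU 0 and shard 0) and is not part of the training flow:

# Illustrative only: build the pipeline for shard 0 on GPU 0 and inspect one batch.
pipe = mnist_pipeline(device_id=0, shard_id=0)
pipe.build()
images, labels = pipe.run()
# The images were decoded on the GPU, so copy them to the host before inspecting.
print(images.as_cpu().as_array().shape)  # (BATCH_SIZE, 1, 28, 28) - grayscale, CHW layout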

Next, we create some parameters needed for the DALI dataset. For more details about them, you can look into the single GPU example.

[4]:
shapes = ((BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE), (BATCH_SIZE,))
dtypes = (tf.float32, tf.int32)

Now we are ready to define the model. To distribute the training across multiple GPUs, we use tf.distribute.MirroredStrategy.

[5]:
strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

# Creating and compiling the model inside strategy.scope() makes its
# variables mirrored across the GPUs.
with strategy.scope():
    model = tf.keras.models.Sequential(
        [
            tf.keras.layers.Input(
                shape=(IMAGE_SIZE, IMAGE_SIZE), name="images"
            ),
            tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),
            tf.keras.layers.Dense(HIDDEN_SIZE, activation="relu"),
            tf.keras.layers.Dropout(DROPOUT),
            tf.keras.layers.Dense(NUM_CLASSES, activation="softmax"),
        ]
    )

    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

The DALI dataset also needs to be distributed. To do this, we use distribute_datasets_from_function. First, we need to define a function that returns a dataset bound to the device with a given id. In addition, some specific input options are needed to make everything work.

[6]:
def dataset_fn(input_context):
    with tf.device("/gpu:{}".format(input_context.input_pipeline_id)):
        # Use the input pipeline id both as the DALI device id and as the
        # shard id, so each GPU runs its own pipeline reading its own shard.
        device_id = input_context.input_pipeline_id
        return dali_tf.DALIDataset(
            pipeline=mnist_pipeline(device_id=device_id, shard_id=device_id),
            batch_size=BATCH_SIZE,
            output_shapes=shapes,
            output_dtypes=dtypes,
            device_id=device_id,
        )


input_options = tf.distribute.InputOptions(
    # Keep the dataset on the GPU it was created on, do not copy batches
    # back to the host, and create one input pipeline per replica.
    experimental_place_dataset_on_device=True,
    experimental_fetch_to_device=False,
    experimental_replication_mode=tf.distribute.InputReplicationMode.PER_REPLICA,
)

train_dataset = strategy.distribute_datasets_from_function(
    dataset_fn, input_options
)

With everything in place, we are ready to run the training and evaluate the model.

[7]:
model.fit(train_dataset, epochs=EPOCHS, steps_per_epoch=ITERATIONS)
Epoch 1/5
100/100 [==============================] - 4s 8ms/step - loss: 1.2438 - accuracy: 0.6290
Epoch 2/5
100/100 [==============================] - 1s 8ms/step - loss: 0.3991 - accuracy: 0.8876
Epoch 3/5
100/100 [==============================] - 1s 8ms/step - loss: 0.3202 - accuracy: 0.9045
Epoch 4/5
100/100 [==============================] - 1s 9ms/step - loss: 0.2837 - accuracy: 0.9183
Epoch 5/5
100/100 [==============================] - 1s 8ms/step - loss: 0.2441 - accuracy: 0.9303
[7]:
<tensorflow.python.keras.callbacks.History at 0x7f5d09685880>
[8]:
model.evaluate(train_dataset, steps=ITERATIONS)
100/100 [==============================] - 2s 5ms/step - loss: 0.1963 - accuracy: 0.9438
[8]:
[0.19630344212055206, 0.9437500238418579]