Using Tensorflow DALI plugin: DALI and tf.data#

Overview#

DALI offers integration with the tf.data API. Using this approach you can easily connect a DALI pipeline to various TensorFlow APIs and use it as a data source for your model. This tutorial shows how to do so using the well-known MNIST dataset converted to the LMDB format. You can find it in the DALI_extra - DALI test data repository.

We start by creating a DALI pipeline that reads, decodes, and normalizes MNIST images, and reads the corresponding labels.

The DALI_EXTRA_PATH environment variable should point to the location where the data from the DALI extra repository was downloaded. Please make sure the correct release tag is checked out.

[1]:
from nvidia.dali import pipeline_def, Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import os

BATCH_SIZE = 64
DROPOUT = 0.2
IMAGE_SIZE = 28
NUM_CLASSES = 10
HIDDEN_SIZE = 128
EPOCHS = 5
ITERATIONS_PER_EPOCH = 100


# Path to MNIST dataset
data_path = os.path.join(os.environ["DALI_EXTRA_PATH"], "db/MNIST/training/")


@pipeline_def(device_id=0, batch_size=BATCH_SIZE)
def mnist_pipeline(device):
    jpegs, labels = fn.readers.caffe2(path=data_path, random_shuffle=True)
    images = fn.decoders.image(
        jpegs,
        device="mixed" if device == "gpu" else "cpu",
        output_type=types.GRAY,
    )
    images = fn.crop_mirror_normalize(
        images,
        device=device,
        dtype=types.FLOAT,
        std=[255.0],
        output_layout="CHW",
    )

    if device == "gpu":
        labels = labels.gpu()

    return images, labels
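The crop_mirror_normalize step above computes `(input - mean) / std` and changes the layout to CHW; with `std=[255.0]` and the default mean of 0, it maps `uint8` pixels into the `[0, 1]` range. A minimal NumPy sketch of the equivalent arithmetic (NumPy stands in for DALI here; the random image is purely illustrative):

```python
import numpy as np

# Stand-in for one decoded grayscale MNIST image: HWC layout, uint8 pixels
img = np.random.randint(0, 256, size=(28, 28, 1), dtype=np.uint8)

# crop_mirror_normalize with std=[255.0] and the default mean of 0:
# out = (in - mean) / std, followed by an HWC -> CHW layout change
normalized = (img.astype(np.float32) - 0.0) / 255.0
chw = np.transpose(normalized, (2, 0, 1))

print(chw.shape)  # (1, 28, 28)
```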

The next step is to wrap an instance of mnist_pipeline with a DALIDataset object from the DALI TensorFlow plugin. This class is compatible with tf.data.Dataset. The remaining parameters are the shapes and types of the pipeline outputs. Here we return images and labels, which means we have two outputs: one of type tf.float32 for the images and one of type tf.int32 for the labels.

[2]:
import nvidia.dali.plugin.tf as dali_tf
import tensorflow as tf
import tensorflow.compat.v1 as tf_v1
import logging

tf.get_logger().setLevel(logging.ERROR)

# Create pipeline
pipeline = mnist_pipeline(device="cpu")

# Define shapes and types of the outputs
shapes = ((BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE), (BATCH_SIZE,))
dtypes = (tf.float32, tf.int32)

# Create dataset
with tf.device("/cpu:0"):
    mnist_set = dali_tf.DALIDataset(
        pipeline=pipeline,
        batch_size=BATCH_SIZE,
        output_shapes=shapes,
        output_dtypes=dtypes,
        device_id=0,
    )

We are ready to start the training. The following sections show how to do so with the different APIs available in TensorFlow.

Keras#

First, we pass mnist_set to a model created with tf.keras and train it using the model.fit method.

[3]:
# Create the model
model = tf.keras.models.Sequential(
    [
        tf.keras.layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE), name="images"),
        tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),
        tf.keras.layers.Dense(HIDDEN_SIZE, activation="relu"),
        tf.keras.layers.Dropout(DROPOUT),
        tf.keras.layers.Dense(NUM_CLASSES, activation="softmax"),
    ]
)
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Train using DALI dataset
model.fit(mnist_set, epochs=EPOCHS, steps_per_epoch=ITERATIONS_PER_EPOCH)
Epoch 1/5
100/100 [==============================] - 1s 3ms/step - loss: 1.3511 - accuracy: 0.5834
Epoch 2/5
100/100 [==============================] - 0s 3ms/step - loss: 0.4534 - accuracy: 0.8690
Epoch 3/5
100/100 [==============================] - 0s 4ms/step - loss: 0.3380 - accuracy: 0.9003
Epoch 4/5
100/100 [==============================] - 0s 3ms/step - loss: 0.2927 - accuracy: 0.9218
Epoch 5/5
100/100 [==============================] - 0s 4ms/step - loss: 0.2736 - accuracy: 0.9217
[3]:
<tensorflow.python.keras.callbacks.History at 0x7f678122cbe0>

As you can see, integrating a DALI pipeline with the tf.keras API is straightforward.

The code above performed the training on the CPU: both the DALI pipeline and the model ran on the CPU.

We can easily move the whole processing to the GPU. First, we create a pipeline that uses the GPU with ID 0. Next, we place both the DALI dataset and the model on the same GPU.

[4]:
# Define the model and place it on the GPU
with tf.device("/gpu:0"):
    mnist_set = dali_tf.DALIDataset(
        pipeline=mnist_pipeline(device="gpu"),
        batch_size=BATCH_SIZE,
        output_shapes=shapes,
        output_dtypes=dtypes,
        device_id=0,
    )
    model = tf.keras.models.Sequential(
        [
            tf.keras.layers.Input(
                shape=(IMAGE_SIZE, IMAGE_SIZE), name="images"
            ),
            tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),
            tf.keras.layers.Dense(HIDDEN_SIZE, activation="relu"),
            tf.keras.layers.Dropout(DROPOUT),
            tf.keras.layers.Dense(NUM_CLASSES, activation="softmax"),
        ]
    )
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )

We also move the training to the GPU. This allows TensorFlow to pick up the GPU instance of the DALI dataset.

[5]:
# Train on the GPU
with tf.device("/gpu:0"):
    model.fit(mnist_set, epochs=EPOCHS, steps_per_epoch=ITERATIONS_PER_EPOCH)
Epoch 1/5
100/100 [==============================] - 1s 4ms/step - loss: 1.3734 - accuracy: 0.5844
Epoch 2/5
100/100 [==============================] - 0s 4ms/step - loss: 0.4566 - accuracy: 0.8690
Epoch 3/5
100/100 [==============================] - 0s 4ms/step - loss: 0.3375 - accuracy: 0.8991
Epoch 4/5
100/100 [==============================] - 0s 4ms/step - loss: 0.3017 - accuracy: 0.9156
Epoch 5/5
100/100 [==============================] - 0s 4ms/step - loss: 0.2925 - accuracy: 0.9167

The important thing to note here is that in the execution above there is no intermediate CPU buffer between DALI and TensorFlow. DALI GPU outputs are copied directly into the TF GPU tensors used by the model.

In this particular toy example, the GPU variant performs worse than the CPU one. The MNIST images are small, and the nvJPEG decoder used in the GPU DALI pipeline is not well suited for such a case. We use it here only to show how to integrate it properly in a real-world scenario.

Estimators#

Another popular TensorFlow API is tf.estimator. This section shows how to use a DALI dataset as a data source for a model based on this API.

First, we create the model.

[6]:
# Define the feature columns for Estimator
feature_columns = [
    tf.feature_column.numeric_column("images", shape=[IMAGE_SIZE, IMAGE_SIZE])
]

# And the run config
run_config = tf.estimator.RunConfig(
    model_dir="/tmp/tensorflow-checkpoints", device_fn=lambda op: "/gpu:0"
)

# Finally create the model based on `DNNClassifier`
model = tf.estimator.DNNClassifier(
    feature_columns=feature_columns,
    hidden_units=[HIDDEN_SIZE],
    n_classes=NUM_CLASSES,
    dropout=DROPOUT,
    config=run_config,
    optimizer="Adam",
)

In the tf.estimator API, data is passed to the model through a function that returns a dataset. We define this function so that it returns the DALI dataset placed on the GPU.

[7]:
def train_data_fn():
    with tf.device("/gpu:0"):
        mnist_set = dali_tf.DALIDataset(
            fail_on_device_mismatch=False,
            pipeline=mnist_pipeline(device="gpu"),
            batch_size=BATCH_SIZE,
            output_shapes=shapes,
            output_dtypes=dtypes,
            device_id=0,
        )
        mnist_set = mnist_set.map(
            lambda features, labels: ({"images": features}, labels)
        )

    return mnist_set
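The `map` call above only rearranges each batch into the `({"images": features}, labels)` structure the Estimator expects, so it can match the `"images"` feature column by name. A plain-Python sketch of that mapping (the tuple below is a hypothetical stand-in for a real `(images, labels)` batch):

```python
# Stand-in for the lambda passed to mnist_set.map: it wraps the features in
# a dict keyed by the feature column name, and passes the labels through.
def to_feature_dict(features, labels):
    return ({"images": features}, labels)

batch = ([0.1, 0.2, 0.3], 7)  # hypothetical (images, labels) pair
features_dict, label = to_feature_dict(*batch)
print(features_dict["images"])  # [0.1, 0.2, 0.3]
print(label)  # 7
```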

With everything set up, we are ready to run the training.

[8]:
# Running the training on the GPU
model.train(input_fn=train_data_fn, steps=EPOCHS * ITERATIONS_PER_EPOCH)
[8]:
<tensorflow_estimator.python.estimator.canned.dnn.DNNClassifierV2 at 0x7f677012beb0>
[9]:
def test_data_fn():
    with tf.device("/cpu:0"):
        mnist_set = dali_tf.DALIDataset(
            fail_on_device_mismatch=False,
            pipeline=mnist_pipeline(device="cpu"),
            batch_size=BATCH_SIZE,
            output_shapes=shapes,
            output_dtypes=dtypes,
            device_id=0,
        )
        mnist_set = mnist_set.map(
            lambda features, labels: ({"images": features}, labels)
        )

    return mnist_set


model.evaluate(input_fn=test_data_fn, steps=ITERATIONS_PER_EPOCH)
[9]:
{'accuracy': 0.9915625,
 'average_loss': 0.030411616,
 'loss': 0.030411616,
 'global_step': 5500}

Custom Models and Training Loops#

Finally, the last part of this tutorial focuses on integrating a DALI dataset with a custom model and training loop. The complete example below shows, from start to finish, how to use a DALI dataset with a native TensorFlow model and run the training using tf.Session.

The first step is to define the model and the dataset, and place both on the GPU.

[10]:
tf.compat.v1.disable_eager_execution()
tf_v1.reset_default_graph()

with tf.device("/gpu:0"):
    mnist_set = dali_tf.DALIDataset(
        pipeline=mnist_pipeline(device="gpu"),
        batch_size=BATCH_SIZE,
        output_shapes=shapes,
        output_dtypes=dtypes,
        device_id=0,
    )

    iterator = tf_v1.data.make_initializable_iterator(mnist_set)
    images, labels = iterator.get_next()

    labels = tf_v1.reshape(
        tf_v1.one_hot(labels, NUM_CLASSES), [BATCH_SIZE, NUM_CLASSES]
    )

    with tf_v1.variable_scope("mnist_net", reuse=False):
        images = tf_v1.layers.flatten(images)
        images = tf_v1.layers.dense(
            images, HIDDEN_SIZE, activation=tf_v1.nn.relu
        )
        images = tf_v1.layers.dropout(images, rate=DROPOUT, training=True)
        images = tf_v1.layers.dense(
            images, NUM_CLASSES, activation=tf_v1.nn.softmax
        )

    logits_train = images
    loss_op = tf_v1.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits(
            logits=logits_train, labels=labels
        )
    )
    train_step = tf_v1.train.AdamOptimizer().minimize(loss_op)

    correct_pred = tf_v1.equal(
        tf_v1.argmax(logits_train, 1), tf_v1.argmax(labels, 1)
    )
    accuracy = tf_v1.reduce_mean(tf_v1.cast(correct_pred, tf_v1.float32))
/home/awolant/.local/lib/python3.8/site-packages/tensorflow/python/keras/legacy_tf_layers/core.py:329: UserWarning: `tf.layers.flatten` is deprecated and will be removed in a future version. Please use `tf.keras.layers.Flatten` instead.
  warnings.warn('`tf.layers.flatten` is deprecated and '
/home/awolant/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/base_layer_v1.py:1693: UserWarning: `layer.apply` is deprecated and will be removed in a future version. Please use `layer.__call__` method instead.
  warnings.warn('`layer.apply` is deprecated and '
/home/awolant/.local/lib/python3.8/site-packages/tensorflow/python/keras/legacy_tf_layers/core.py:171: UserWarning: `tf.layers.dense` is deprecated and will be removed in a future version. Please use `tf.keras.layers.Dense` instead.
  warnings.warn('`tf.layers.dense` is deprecated and '
/home/awolant/.local/lib/python3.8/site-packages/tensorflow/python/keras/legacy_tf_layers/core.py:268: UserWarning: `tf.layers.dropout` is deprecated and will be removed in a future version. Please use `tf.keras.layers.Dropout` instead.
  warnings.warn('`tf.layers.dropout` is deprecated and '
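The labels coming out of the pipeline are class indices, so the code above converts them with tf_v1.one_hot before computing the loss. A NumPy sketch of that conversion (batch size and values are illustrative):

```python
import numpy as np

NUM_CLASSES = 10
labels = np.array([3, 0, 7])  # stand-in for a batch of label indices

# Equivalent of tf_v1.one_hot(labels, NUM_CLASSES): each index selects a
# row of the identity matrix, giving a 1.0 at that class position
one_hot = np.eye(NUM_CLASSES, dtype=np.float32)[labels]

print(one_hot.shape)        # (3, 10)
print(int(one_hot[0].argmax()))  # 3
```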

With tf.Session we can run this model and train it on the GPU.

[11]:
with tf_v1.Session() as sess:
    sess.run(tf_v1.global_variables_initializer())
    sess.run(iterator.initializer)

    for i in range(EPOCHS * ITERATIONS_PER_EPOCH):
        sess.run(train_step)
        if i % ITERATIONS_PER_EPOCH == 0:
            train_accuracy = sess.run(accuracy)
            print("Step %d, accuracy: %g" % (i, train_accuracy))

    final_accuracy = 0
    for _ in range(ITERATIONS_PER_EPOCH):
        final_accuracy = final_accuracy + sess.run(accuracy)
    final_accuracy = final_accuracy / ITERATIONS_PER_EPOCH

    print("Final accuracy: ", final_accuracy)
Step 0, accuracy: 0.140625
Step 100, accuracy: 0.84375
Step 200, accuracy: 0.9375
Step 300, accuracy: 0.875
Step 400, accuracy: 0.90625
Final accuracy:  0.90640625
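The accuracy op above compares the argmax of the predictions against the argmax of the one-hot labels and averages the result, and the final accuracy is the mean of that value over ITERATIONS_PER_EPOCH evaluations. A NumPy sketch of the per-batch accuracy computation (the predictions and labels below are made up for illustration):

```python
import numpy as np

# Stand-ins for a batch of predictions and matching one-hot labels
preds = np.array([[0.1, 0.8, 0.1],
                  [0.7, 0.2, 0.1],
                  [0.2, 0.3, 0.5]])
labels = np.array([[0.0, 1.0, 0.0],
                   [0.0, 1.0, 0.0],
                   [0.0, 0.0, 1.0]])

# Equivalent of tf_v1.equal(argmax, argmax) then reduce_mean(cast(...)):
correct = np.argmax(preds, axis=1) == np.argmax(labels, axis=1)
accuracy = correct.astype(np.float32).mean()
print(round(float(accuracy), 4))  # 0.6667 (2 of 3 correct)
```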