Using the Tensorflow DALI plugin: DALI and tf.data#
Overview#
DALI offers integration with the tf.data API. Using this approach, you can easily connect a DALI pipeline to various TensorFlow APIs and use it as a data source for your model. This tutorial shows how to do this with the well-known MNIST dataset converted to the LMDB format. You can find it in the DALI_extra - DALI test data repository.
We start by creating a DALI pipeline that reads and decodes the MNIST images, normalizes them, and reads the corresponding labels.
The DALI_EXTRA_PATH environment variable should point to the location where the data from the DALI extra repository was downloaded. Please make sure the proper release tag is checked out.
[1]:
from nvidia.dali import pipeline_def, Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import os

BATCH_SIZE = 64
DROPOUT = 0.2
IMAGE_SIZE = 28
NUM_CLASSES = 10
HIDDEN_SIZE = 128
EPOCHS = 5
ITERATIONS_PER_EPOCH = 100

# Path to MNIST dataset
data_path = os.path.join(os.environ["DALI_EXTRA_PATH"], "db/MNIST/training/")


@pipeline_def(device_id=0, batch_size=BATCH_SIZE)
def mnist_pipeline(device):
    # Read (image, label) pairs from the Caffe2 LMDB and shuffle them
    jpegs, labels = fn.readers.caffe2(path=data_path, random_shuffle=True)
    images = fn.decoders.image(
        jpegs,
        device="mixed" if device == "gpu" else "cpu",
        output_type=types.GRAY,
    )
    # Scale pixel values to [0, 1] and switch to CHW layout
    images = fn.crop_mirror_normalize(
        images,
        device=device,
        dtype=types.FLOAT,
        std=[255.0],
        output_layout="CHW",
    )
    if device == "gpu":
        labels = labels.gpu()
    return images, labels
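To make the normalization step concrete: with `mean` left at its default of 0 and `std=[255.0]`, `fn.crop_mirror_normalize` scales each pixel into [0, 1], and `output_layout="CHW"` moves the channel axis first. A minimal pure-Python sketch of that transform (no DALI involved; the toy 2x2 single-channel "image" is made up for illustration):

```python
def normalize_hwc_to_chw(image_hwc):
    """image_hwc: H x W x C nested lists of uint8 pixel values.

    Mimics crop_mirror_normalize with mean=0, std=[255.0],
    output_layout="CHW": divide by 255 and put channels first.
    """
    height = len(image_hwc)
    width = len(image_hwc[0])
    channels = len(image_hwc[0][0])
    return [
        [
            [image_hwc[h][w][c] / 255.0 for w in range(width)]
            for h in range(height)
        ]
        for c in range(channels)
    ]


# A toy 2x2 single-channel "image" in HWC layout.
img = [[[0], [255]], [[128], [64]]]
chw = normalize_hwc_to_chw(img)

print(len(chw), len(chw[0]), len(chw[0][0]))  # 1 2 2  (C, H, W)
print(chw[0][0][1])  # 1.0  (pixel value 255 scaled down)
```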
The next step is to wrap an instance of mnist_pipeline with a DALIDataset object from the DALI TensorFlow plugin. This class is compatible with tf.data.Dataset. The other parameters are the shapes and types of the pipeline outputs. Here we return images and labels, which means we have two outputs: one of type tf.float32 for the images and one of type tf.int32 for the labels.
[2]:
import nvidia.dali.plugin.tf as dali_tf
import tensorflow as tf
import tensorflow.compat.v1 as tf_v1
import logging

tf.get_logger().setLevel(logging.ERROR)

# Create pipeline
pipeline = mnist_pipeline(device="cpu")

# Define shapes and types of the outputs
shapes = ((BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE), (BATCH_SIZE,))
dtypes = (tf.float32, tf.int32)

# Create dataset
with tf.device("/cpu:0"):
    mnist_set = dali_tf.DALIDataset(
        pipeline=pipeline,
        batch_size=BATCH_SIZE,
        output_shapes=shapes,
        output_dtypes=dtypes,
        device_id=0,
    )
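One Python detail worth flagging in the `shapes` definition: parentheses alone do not create a tuple. `(BATCH_SIZE)` is just the integer `BATCH_SIZE`, while `(BATCH_SIZE,)` with a trailing comma is a one-element tuple describing the 1-D batch of labels. A quick illustration:

```python
BATCH_SIZE = 64

# Parentheses alone do not make a tuple; the trailing comma does.
not_a_tuple = (BATCH_SIZE)
a_tuple = (BATCH_SIZE,)

print(type(not_a_tuple).__name__)  # int
print(type(a_tuple).__name__)      # tuple
print(a_tuple == (64,))            # True
```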
We are ready to start the training. The following sections show how to do it with the different APIs available in TensorFlow.
Keras#
First, we pass mnist_set to a model created with tf.keras and train it using the model.fit method.
[3]:
# Create the model
model = tf.keras.models.Sequential(
    [
        tf.keras.layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE), name="images"),
        tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),
        tf.keras.layers.Dense(HIDDEN_SIZE, activation="relu"),
        tf.keras.layers.Dropout(DROPOUT),
        tf.keras.layers.Dense(NUM_CLASSES, activation="softmax"),
    ]
)
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Train using DALI dataset
model.fit(mnist_set, epochs=EPOCHS, steps_per_epoch=ITERATIONS_PER_EPOCH)
Epoch 1/5
100/100 [==============================] - 1s 3ms/step - loss: 1.3511 - accuracy: 0.5834
Epoch 2/5
100/100 [==============================] - 0s 3ms/step - loss: 0.4534 - accuracy: 0.8690
Epoch 3/5
100/100 [==============================] - 0s 4ms/step - loss: 0.3380 - accuracy: 0.9003
Epoch 4/5
100/100 [==============================] - 0s 3ms/step - loss: 0.2927 - accuracy: 0.9218
Epoch 5/5
100/100 [==============================] - 0s 4ms/step - loss: 0.2736 - accuracy: 0.9217
[3]:
<tensorflow.python.keras.callbacks.History at 0x7f678122cbe0>
As you can see, integrating a DALI pipeline with the tf.keras API is straightforward.
The code above performed the training on the CPU: both the DALI pipeline and the model used the CPU.
We can easily move the whole processing to the GPU. First, we create a pipeline that uses the GPU with ID 0. Next, we place both the DALI dataset and the model on the same GPU.
[4]:
# Define the model and place it on the GPU
with tf.device("/gpu:0"):
    mnist_set = dali_tf.DALIDataset(
        pipeline=mnist_pipeline(device="gpu"),
        batch_size=BATCH_SIZE,
        output_shapes=shapes,
        output_dtypes=dtypes,
        device_id=0,
    )
    model = tf.keras.models.Sequential(
        [
            tf.keras.layers.Input(
                shape=(IMAGE_SIZE, IMAGE_SIZE), name="images"
            ),
            tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),
            tf.keras.layers.Dense(HIDDEN_SIZE, activation="relu"),
            tf.keras.layers.Dropout(DROPOUT),
            tf.keras.layers.Dense(NUM_CLASSES, activation="softmax"),
        ]
    )
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
We move the training to the GPU as well. This allows TensorFlow to pick up the GPU instance of the DALI dataset.
[5]:
# Train on the GPU
with tf.device("/gpu:0"):
    model.fit(mnist_set, epochs=EPOCHS, steps_per_epoch=ITERATIONS_PER_EPOCH)
Epoch 1/5
100/100 [==============================] - 1s 4ms/step - loss: 1.3734 - accuracy: 0.5844
Epoch 2/5
100/100 [==============================] - 0s 4ms/step - loss: 0.4566 - accuracy: 0.8690
Epoch 3/5
100/100 [==============================] - 0s 4ms/step - loss: 0.3375 - accuracy: 0.8991
Epoch 4/5
100/100 [==============================] - 0s 4ms/step - loss: 0.3017 - accuracy: 0.9156
Epoch 5/5
100/100 [==============================] - 0s 4ms/step - loss: 0.2925 - accuracy: 0.9167
It is worth noting that in the execution above there is no intermediate CPU buffer between DALI and TensorFlow: DALI GPU outputs are copied directly into the TF GPU tensors used by the model.
In this particular toy example the performance of the GPU variant is lower than that of the CPU one. The MNIST images are small, and the nvJPEG decoder used in the GPU DALI pipeline is not well suited for this case. We use it here only to show how to integrate it properly in a real-life scenario.
Estimators#
Another popular TensorFlow API is the tf.estimator API. This section shows how to use a DALI dataset as a data source for a model based on this API.
First, we create the model.
[6]:
# Define the feature columns for Estimator
feature_columns = [
    tf.feature_column.numeric_column("images", shape=[IMAGE_SIZE, IMAGE_SIZE])
]

# And the run config
run_config = tf.estimator.RunConfig(
    model_dir="/tmp/tensorflow-checkpoints", device_fn=lambda op: "/gpu:0"
)

# Finally create the model based on `DNNClassifier`
model = tf.estimator.DNNClassifier(
    feature_columns=feature_columns,
    hidden_units=[HIDDEN_SIZE],
    n_classes=NUM_CLASSES,
    dropout=DROPOUT,
    config=run_config,
    optimizer="Adam",
)
In the tf.estimator API, data is passed to the model via a function that returns a dataset. We define this function to return a DALI dataset placed on the GPU.
[7]:
def train_data_fn():
    with tf.device("/gpu:0"):
        mnist_set = dali_tf.DALIDataset(
            fail_on_device_mismatch=False,
            pipeline=mnist_pipeline(device="gpu"),
            batch_size=BATCH_SIZE,
            output_shapes=shapes,
            output_dtypes=dtypes,
            device_id=0,
        )
        mnist_set = mnist_set.map(
            lambda features, labels: ({"images": features}, labels)
        )

    return mnist_set
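The `map` call above only repackages each dataset element so that the images tensor arrives under the `"images"` key expected by the estimator's feature column; the tensors themselves pass through unchanged. A plain-Python sketch of the same renaming (the string values stand in for the real tensors):

```python
def to_feature_dict(features, labels):
    # Mirrors the lambda passed to mnist_set.map: wrap the images
    # tensor in a dict keyed by the feature column name.
    return {"images": features}, labels


# Stand-in values for a batch of images and a batch of labels.
features, labels = to_feature_dict("images_batch", "labels_batch")
print(features)  # {'images': 'images_batch'}
print(labels)    # labels_batch
```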
With everything set up, we are ready to run the training.
[8]:
# Running the training on the GPU
model.train(input_fn=train_data_fn, steps=EPOCHS * ITERATIONS_PER_EPOCH)
[8]:
<tensorflow_estimator.python.estimator.canned.dnn.DNNClassifierV2 at 0x7f677012beb0>
We can run the evaluation in a similar way. This time the DALI dataset is placed on the CPU.
[9]:
def test_data_fn():
    with tf.device("/cpu:0"):
        mnist_set = dali_tf.DALIDataset(
            fail_on_device_mismatch=False,
            pipeline=mnist_pipeline(device="cpu"),
            batch_size=BATCH_SIZE,
            output_shapes=shapes,
            output_dtypes=dtypes,
            device_id=0,
        )
        mnist_set = mnist_set.map(
            lambda features, labels: ({"images": features}, labels)
        )

    return mnist_set


model.evaluate(input_fn=test_data_fn, steps=ITERATIONS_PER_EPOCH)
[9]:
{'accuracy': 0.9915625,
'average_loss': 0.030411616,
'loss': 0.030411616,
'global_step': 5500}
Custom Models and Training Loops#
Finally, the last part of this tutorial focuses on integrating a DALI dataset with a custom model and training loop. The complete example below shows, from start to finish, how to use a DALI dataset with a native TensorFlow model and run the training with tf.Session.
The first step is to define the model and the dataset, and to place both on the GPU.
[10]:
tf.compat.v1.disable_eager_execution()
tf_v1.reset_default_graph()

with tf.device("/gpu:0"):
    mnist_set = dali_tf.DALIDataset(
        pipeline=mnist_pipeline(device="gpu"),
        batch_size=BATCH_SIZE,
        output_shapes=shapes,
        output_dtypes=dtypes,
        device_id=0,
    )
    iterator = tf_v1.data.make_initializable_iterator(mnist_set)
    images, labels = iterator.get_next()

    labels = tf_v1.reshape(
        tf_v1.one_hot(labels, NUM_CLASSES), [BATCH_SIZE, NUM_CLASSES]
    )

    with tf_v1.variable_scope("mnist_net", reuse=False):
        images = tf_v1.layers.flatten(images)
        images = tf_v1.layers.dense(
            images, HIDDEN_SIZE, activation=tf_v1.nn.relu
        )
        images = tf_v1.layers.dropout(images, rate=DROPOUT, training=True)
        images = tf_v1.layers.dense(
            images, NUM_CLASSES, activation=tf_v1.nn.softmax
        )

    logits_train = images

    loss_op = tf_v1.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits(
            logits=logits_train, labels=labels
        )
    )
    train_step = tf_v1.train.AdamOptimizer().minimize(loss_op)

    correct_pred = tf_v1.equal(
        tf_v1.argmax(logits_train, 1), tf_v1.argmax(labels, 1)
    )
    accuracy = tf_v1.reduce_mean(tf_v1.cast(correct_pred, tf_v1.float32))
/home/awolant/.local/lib/python3.8/site-packages/tensorflow/python/keras/legacy_tf_layers/core.py:329: UserWarning: `tf.layers.flatten` is deprecated and will be removed in a future version. Please use `tf.keras.layers.Flatten` instead.
warnings.warn('`tf.layers.flatten` is deprecated and '
/home/awolant/.local/lib/python3.8/site-packages/tensorflow/python/keras/engine/base_layer_v1.py:1693: UserWarning: `layer.apply` is deprecated and will be removed in a future version. Please use `layer.__call__` method instead.
warnings.warn('`layer.apply` is deprecated and '
/home/awolant/.local/lib/python3.8/site-packages/tensorflow/python/keras/legacy_tf_layers/core.py:171: UserWarning: `tf.layers.dense` is deprecated and will be removed in a future version. Please use `tf.keras.layers.Dense` instead.
warnings.warn('`tf.layers.dense` is deprecated and '
/home/awolant/.local/lib/python3.8/site-packages/tensorflow/python/keras/legacy_tf_layers/core.py:268: UserWarning: `tf.layers.dropout` is deprecated and will be removed in a future version. Please use `tf.keras.layers.Dropout` instead.
warnings.warn('`tf.layers.dropout` is deprecated and '
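In the graph above, the integer labels coming out of the iterator are turned into one-hot rows with `tf_v1.one_hot(labels, NUM_CLASSES)` and reshaped to `[BATCH_SIZE, NUM_CLASSES]`, which is the dense format `softmax_cross_entropy_with_logits` consumes. A minimal pure-Python equivalent of that encoding (the toy labels are made up for illustration):

```python
NUM_CLASSES = 10


def one_hot(label, num_classes=NUM_CLASSES):
    # One row per label: 1.0 at the class index, 0.0 elsewhere.
    return [1.0 if i == label else 0.0 for i in range(num_classes)]


batch_labels = [3, 0, 7]
encoded = [one_hot(label) for label in batch_labels]

print(encoded[0])  # [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
print(len(encoded), len(encoded[0]))  # 3 10
```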
With tf.Session we can run this model and train it on the GPU.
[11]:
with tf_v1.Session() as sess:
    sess.run(tf_v1.global_variables_initializer())
    sess.run(iterator.initializer)

    for i in range(EPOCHS * ITERATIONS_PER_EPOCH):
        sess.run(train_step)
        if i % ITERATIONS_PER_EPOCH == 0:
            train_accuracy = sess.run(accuracy)
            print("Step %d, accuracy: %g" % (i, train_accuracy))

    final_accuracy = 0
    for _ in range(ITERATIONS_PER_EPOCH):
        final_accuracy = final_accuracy + sess.run(accuracy)
    final_accuracy = final_accuracy / ITERATIONS_PER_EPOCH

    print("Final accuracy: ", final_accuracy)
Step 0, accuracy: 0.140625
Step 100, accuracy: 0.84375
Step 200, accuracy: 0.9375
Step 300, accuracy: 0.875
Step 400, accuracy: 0.90625
Final accuracy: 0.90640625
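The final accuracy printed above is simply the arithmetic mean of per-batch accuracies accumulated over `ITERATIONS_PER_EPOCH` evaluation runs. The same accumulation in isolation, with made-up per-batch values standing in for `sess.run(accuracy)` results:

```python
# Stand-in per-batch accuracies; in the notebook each value would
# come from sess.run(accuracy) on a fresh batch.
batch_accuracies = [0.90625, 0.921875, 0.890625, 0.9375]

final_accuracy = 0.0
for acc in batch_accuracies:
    final_accuracy += acc
final_accuracy /= len(batch_accuracies)

print(final_accuracy)  # 0.9140625
```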