TensorFlow模型建立与训练

本章介绍如何使用TensorFlow快速搭建动态模型。

  • 模型的构建: tf.keras.Modeltf.keras.layers
  • 模型的损失函数: tf.keras.losses
  • 模型的优化器: tf.keras.optimizer
  • 模型的评估: tf.keras.metrics

前置知识

模型(Model)与层(Layer)

在TensorFlow中,推荐使用Keras( tf.keras )构建模型。Keras是一个广为流行的高级神经网络API,简单、快速而不失灵活性,现已得到TensorFlow的官方内置和全面支持。

Keras有两个重要的概念: 模型(Model)层(Layer) 。层将各种计算流程和变量进行了封装(例如基本的全连接层,CNN的卷积层、池化层等),而模型则将各种层进行组织和连接,并封装成一个整体,描述了如何将输入数据通过各种层以及运算而得到输出。在需要模型调用的时候,使用 y_pred = model(X) 的形式即可。Keras在 tf.keras.layers 下内置了深度学习中大量常用的的预定义层,同时也允许我们自定义层。

Keras模型以类的形式呈现,我们可以通过继承 tf.keras.Model 这个Python类来定义自己的模型。在继承类中,我们需要重写 __init__() (构造函数,初始化)和 call(input) (模型调用)两个方法,同时也可以根据需要增加自定义的方法。

class MyModel(tf.keras.Model):
    def __init__(self):
        super().__init__()     # Python 2 下使用 super(MyModel, self).__init__()
        # 此处添加初始化代码(包含call方法中会用到的层),例如
        # layer1 = tf.keras.layers.BuiltInLayer(...)
        # layer2 = MyCustomLayer(...)

    def call(self, input):
        # 此处添加模型调用的代码(处理输入并返回输出),例如
        # x = layer1(input)
        # output = layer2(x)
        return output

    # 还可以添加自定义的方法
../../_images/model.png

Keras模型类定义示意图

继承 tf.keras.Model 后,我们同时可以使用父类的若干方法和属性,例如在实例化类 model = Model() 后,可以通过 model.variables 这一属性直接获得模型中的所有变量,免去我们一个个显式指定变量的麻烦。

上一章中简单的线性模型 y_pred = a * X + b ,我们可以通过模型类的方式编写如下:

import tensorflow as tf

X = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
y = tf.constant([[10.0], [20.0]])


class Linear(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense = tf.keras.layers.Dense(
            units=1,
            activation=None,
            kernel_initializer=tf.zeros_initializer(),
            bias_initializer=tf.zeros_initializer()
        )

    def call(self, input):
        output = self.dense(input)
        return output


# 以下代码结构与前节类似
model = Linear()
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
for i in range(100):
    with tf.GradientTape() as tape:
        y_pred = model(X)      # 调用模型 y_pred = model(X) 而不是显式写出 y_pred = a * X + b
        loss = tf.reduce_mean(tf.square(y_pred - y))
    grads = tape.gradient(loss, model.variables)    # 使用 model.variables 这一属性直接获得模型中的所有变量
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
print(model.variables)

这里,我们没有显式地声明 ab 两个变量并写出 y_pred = a * X + b 这一线性变换,而是建立了一个继承了 tf.keras.Model 的模型类 Linear 。这个类在初始化部分实例化了一个 全连接层tf.keras.layers.Dense ),并在call方法中对这个层进行调用,实现了线性变换的计算。如果需要显式地声明自己的变量并使用变量进行自定义运算,或者希望了解Keras层的内部原理,请参考 自定义层

Keras的全连接层:线性变换+激活函数

全连接层 (Fully-connected Layer,tf.keras.layers.Dense )是Keras中最基础和常用的层之一,对输入矩阵 \(A\) 进行 \(f(AW + b)\) 的线性变换+激活函数操作。如果不指定激活函数,即是纯粹的线性变换 \(AW + b\)。具体而言,给定输入张量 input = [batch_size, input_dim] ,该层对输入张量首先进行 tf.matmul(input, kernel) + bias 的线性变换( kernelbias 是层中可训练的变量),然后对线性变换后张量的每个元素通过激活函数 activation ,从而输出形状为 [batch_size, units] 的二维张量。

../../_images/dense.png

其包含的主要参数如下:

  • units :输出张量的维度;
  • activation :激活函数,对应于 \(f(AW + b)\) 中的 \(f\) ,默认为无激活函数( a(x) = x )。常用的激活函数包括 tf.nn.relutf.nn.tanhtf.nn.sigmoid
  • use_bias :是否加入偏置向量 bias ,即 \(f(AW + b)\) 中的 \(b\)。默认为 True
  • kernel_initializerbias_initializer :权重矩阵 kernel 和偏置向量 bias 两个变量的初始化器。默认为 tf.glorot_uniform_initializer [1] 。设置为 tf.zeros_initializer 表示将两个变量均初始化为全0;

该层包含权重矩阵 kernel = [input_dim, units] 和偏置向量 bias = [units] [2] 两个可训练变量,对应于 \(f(AW + b)\) 中的 \(W\)\(b\)

这里着重从数学矩阵运算和线性变换的角度描述了全连接层。基于神经元建模的描述可参考 后文介绍

[1]Keras中的很多层都默认使用 tf.glorot_uniform_initializer 初始化变量,关于该初始化器可参考 https://www.tensorflow.org/api_docs/python/tf/glorot_uniform_initializer
[2]你可能会注意到, tf.matmul(input, kernel) 的结果是一个形状为 [batch_size, units] 的二维矩阵,这个二维矩阵要如何与形状为 [units] 的一维偏置向量bias相加呢?事实上,这里是TensorFlow的Broadcasting机制在起作用,该加法运算相当于将二维矩阵的每一行加上了 Bias 。Broadcasting机制的具体介绍可见 https://www.tensorflow.org/xla/broadcasting

为什么模型类是重载 call() 方法而不是 __call__() 方法?

在Python中,对类的实例 myClass 进行形如 myClass() 的调用等价于 myClass.__call__() (具体请见本章初“前置知识”的 __call__() 部分)。那么看起来,为了使用 y_pred = model(X) 的形式调用模型类,应该重写 __call__() 方法才对呀?原因是Keras在模型调用的前后还需要有一些自己的内部操作,所以暴露出一个专门用于重载的 call() 方法。 tf.keras.Model 这一父类已经包含 __call__() 的定义。 __call__() 中主要调用了 call() 方法,同时还需要在进行一些keras的内部操作。这里,我们通过继承 tf.keras.Model 并重载 call() 方法,即可在保持keras结构的同时加入模型调用的代码。

基础示例:多层感知机(MLP)

我们从编写一个最简单的 多层感知机 (Multilayer Perceptron, MLP),或者说“多层全连接神经网络”开始,介绍TensorFlow的模型编写方式。在这一部分,我们依次进行以下步骤:

  • 使用 tf.keras.datasets 获得数据集并预处理
  • 使用 tf.keras.Modeltf.keras.layers 构建模型
  • 构建模型训练流程,使用 tf.keras.losses 计算损失函数,并使用 tf.keras.optimizer 优化模型
  • 构建模型评估流程,使用 tf.keras.metrics 计算评估指标

基础知识和原理

这里,我们使用多层感知机完成MNIST手写体数字图片数据集 [LeCun1998] 的分类任务。

../../_images/mnist_0-9.png

MNIST手写体数字图片示例

数据获取及预处理: tf.keras.datasets

先进行预备工作,实现一个简单的 MNISTLoader 类来读取MNIST数据集数据。这里使用了 tf.keras.datasets 快速载入MNIST数据集。

class MNISTLoader():
    def __init__(self):
        mnist = tf.keras.datasets.mnist
        (self.train_data, self.train_label), (self.test_data, self.test_label) = mnist.load_data()
        # MNIST中的图像默认为uint8(0-255的数字)。以下代码将其归一化到0-1之间的浮点数,并在最后增加一维作为颜色通道
        self.train_data = np.expand_dims(self.train_data.astype(np.float32) / 255.0, axis=-1)      # [60000, 28, 28, 1]
        self.test_data = np.expand_dims(self.test_data.astype(np.float32) / 255.0, axis=-1)        # [10000, 28, 28, 1]
        self.train_label = self.train_label.astype(np.int32)    # [60000]
        self.test_label = self.test_label.astype(np.int32)      # [10000]
        self.num_train_data, self.num_test_data = self.train_data.shape[0], self.test_data.shape[0]

    def get_batch(self, batch_size):
        # 从数据集中随机取出batch_size个元素并返回
        index = np.random.randint(0, np.shape(self.train_data)[0], batch_size)
        return self.train_data[index, :], self.train_label[index]

提示

mnist = tf.keras.datasets.mnist 将从网络上自动下载MNIST数据集并加载。如果运行时出现网络连接错误,可以从 https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npzhttps://s3.amazonaws.com/img-datasets/mnist.npz 下载MNIST数据集 mnist.npz 文件,并放置于用户目录的 .keras/dataset 目录下(Windows下用户目录为 C:\Users\用户名 ,Linux下用户目录为 /home/用户名 )。

TensorFlow的图像数据表示

在TensorFlow中,图像数据集的一种典型表示是 [图像数目,长,宽,色彩通道数] 的四维张量。在上面的 DataLoader 类中, self.train_dataself.test_data 分别载入了60,000和10,000张大小为 28*28 的手写体数字图片。由于这里读入的是灰度图片,色彩通道数为1(彩色RGB图像色彩通道数为3),所以我们使用 np.expand_dims() 函数为图像数据手动在最后添加一维通道。

模型的构建: tf.keras.Modeltf.keras.layers

多层感知机的模型类实现与上面的线性模型类似,使用 tf.keras.Modeltf.keras.layers 构建,所不同的地方在于层数增加了(顾名思义,“多层”感知机),以及引入了非线性激活函数(这里使用了 ReLU函数 , 即下方的 activation=tf.nn.relu )。该模型输入一个向量(比如这里是拉直的 1×784 手写体数字图片),输出10维的向量,分别代表这张图片属于0到9的概率。

class MLP(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.flatten = tf.keras.layers.Flatten()    # Flatten层将除第一维(batch_size)以外的维度展平
        self.dense1 = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):         # [batch_size, 28, 28, 1]
        x = self.flatten(inputs)    # [batch_size, 784]
        x = self.dense1(x)          # [batch_size, 100]
        x = self.dense2(x)          # [batch_size, 10]
        output = tf.nn.softmax(x)
        return output

softmax函数

这里,因为我们希望输出“输入图片分别属于0到9的概率”,也就是一个10维的离散概率分布,所以我们希望这个10维向量至少满足两个条件:

  • 该向量中的每个元素均在 \([0, 1]\) 之间;
  • 该向量的所有元素之和为1。

为了使得模型的输出能始终满足这两个条件,我们使用 Softmax函数 (归一化指数函数, tf.nn.softmax )对模型的原始输出进行归一化。其形式为 \(\sigma(\mathbf{z})_j = \frac{e^{z_j}}{\sum_{k=1}^K e^{z_k}}\) 。不仅如此,softmax函数能够凸显原始向量中最大的值,并抑制远低于最大值的其他分量,这也是该函数被称作softmax函数的原因(即平滑化的argmax函数)。

../../_images/mlp.png

MLP模型示意图

模型的训练: tf.keras.lossestf.keras.optimizer

定义一些模型超参数:

training_loop = 'custom' # 'keras', 'custom' or 'graph'
num_epochs = 0.1
batch_size = 50

实例化模型和数据读取类,并实例化一个 tf.keras.optimizer 的优化器(这里使用常用的Adam优化器):

model = MLP()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

然后迭代进行以下步骤:

  • 从DataLoader中随机取一批训练数据;
  • 将这批数据送入模型,计算出模型的预测值;
  • 将模型预测值与真实值进行比较,计算损失函数(loss)。这里使用 tf.keras.losses 中的交叉熵函数作为损失函数;
  • 计算损失函数关于模型变量的导数;
  • 将求出的导数值传入优化器,使用优化器的 apply_gradients 方法更新模型参数以最小化损失函数(优化器的详细使用方法见 前章 )。

具体代码实现如下:

    num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
    for batch_index in range(num_batches):
        X, y = data_loader.get_batch(batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
            loss = tf.reduce_mean(loss)
            print("batch %d: loss %f" % (batch_index, loss.numpy()))
        grads = tape.gradient(loss, model.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))

交叉熵(cross entropy)与 tf.keras.losses

你或许注意到了,在这里,我们没有显式地写出一个损失函数,而是使用了 tf.keras.losses 中的 sparse_categorical_crossentropy (交叉熵)函数,将模型的预测值 y_pred 与真实的标签值 y 作为函数参数传入,由Keras帮助我们计算损失函数的值。

交叉熵作为损失函数,在分类问题中被广泛应用。其离散形式为 \(H(y, \hat{y}) = -\sum_{i=1}^{n}y_i \log(\hat{y_i})\) ,其中 \(y\) 为真实概率分布, \(\hat{y}\) 为预测概率分布, \(n\) 为分类任务的类别个数。预测概率分布与真实分布越接近,则交叉熵的值越小,反之则越大。更具体的介绍及其在机器学习中的应用可参考 这篇博客文章

tf.keras 中,有两个交叉熵相关的损失函数 tf.keras.losses.categorical_crossentropytf.keras.losses.sparse_categorical_crossentropy 。其中sparse的含义是,真实的标签值 y_true 可以直接传入int类型的标签类别。具体而言:

loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)

loss = tf.keras.losses.categorical_crossentropy(
    y_true=tf.one_hot(y, depth=tf.shape(y_pred)[-1]),
    y_pred=y_pred
)

的结果相同。

模型的评估: tf.keras.metrics

最后,我们使用测试集评估模型性能。具体而言,比较测试集上模型预测的结果与真实结果,输出预测正确的样本数占总样本数的比例。这里,我们使用 tf.keras.metrics 中的 SparseCategoricalAccuracy 评估器来评估模型在测试集上的性能,该评估器。我们迭代测试数据集,每次通过 update_state() 方法向评估器输入两个参数: y_predy_true ,即模型预测出的结果和真实结果。同时,评估器具有内部变量来保存当前评估指标相关的参数数值(例如当前已传入的累计样本数和当前预测正确的样本数)。迭代结束后,使用 result() 方法输出最终的评估指标值。在以下代码中,我们实例化了一个 tf.keras.metrics.SparseCategoricalAccuracy 评估器,并使用For循环迭代分批次传入了测试集数据的预测结果与真实结果,并输出训练后的模型在测试数据集上的准确率。

    sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
    num_batches = int(data_loader.num_test_data // batch_size)
    for batch_index in range(num_batches):
        start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
        y_pred = model.predict(data_loader.test_data[start_index: end_index])
        sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
    print("test accuracy: %f" % sparse_categorical_accuracy.result())

输出结果:

test accuracy: 0.947900

可以注意到,使用这样简单的模型,已经可以达到95%左右的准确率。

神经网络的基本单位:神经元 [3]

如果我们将上面的神经网络放大来看,详细研究计算过程,比如取第二层的第k个计算单元,可以得到示意图如下:

../../_images/neuron.png

该计算单元 \(Q_k\) 有100个权值参数 \(w_{0k}, w_{1k}, ..., w_{99k}\) 和1个偏置参数 \(b_k\) 。将第1层中所有的100个计算单元 \(P_0, P_1, ..., P_{99}\) 的值作为输入,分别按权值 \(w_{ik}\) 加和(即 \(\sum_{i=0}^{99} w_{ik} P_i\) ),并加上偏置值 \(b_k\) ,然后送入激活函数 \(f\) 进行计算,即得到输出结果。

事实上,这种结构和真实的神经细胞(神经元)类似。神经元由树突、胞体和轴突构成。树突接受其他神经元传来的信号作为输入(一个神经元可以有数千甚至上万树突),胞体对电位信号进行整合,而产生的信号则通过轴突传到神经末梢的突触,传播到下一个(或多个)神经元。

../../_images/real_neuron.png

神经细胞模式图(修改自 Quasar Jarosz at English Wikipedia [CC BY-SA 3.0 (https://creativecommons.org/licenses/by-sa/3.0)])

上面的计算单元,可以被视作对神经元结构的数学建模。在上面的例子里,第二层的每一个计算单元(人工神经元)有100个权值参数和1个偏置参数,而第二层计算单元的数目是10个,因此这一个全连接层的总参数量为100*10个权值参数和10个偏置参数。事实上,这正是该全连接层中的两个变量 kernelbias 的形状。仔细研究一下,你会发现,这里基于神经元建模的介绍与上文基于矩阵计算的介绍是等价的。

[3]事实上,应当是先有神经元建模的概念,再有基于人工神经元和层结构的人工神经网络。但由于本手册着重介绍TensorFlow的使用方法,所以调换了介绍顺序。

卷积神经网络(CNN)

卷积神经网络 (Convolutional Neural Network, CNN)是一种结构类似于人类或动物的 视觉系统 的人工神经网络,包含一个或多个卷积层(Convolutional Layer)、池化层(Pooling Layer)和全连接层(Fully-connected Layer)。

基础知识和原理

具体的实现见下,和MLP很类似,只是新加入了一些卷积层和池化层。

../../_images/cnn.png

CNN结构图示

class CNN(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(
            filters=32,             # 卷积核数目
            kernel_size=[5, 5],     # 感受野大小
            padding="same",         # padding策略
            activation=tf.nn.relu   # 激活函数
        )
        self.pool1 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.conv2 = tf.keras.layers.Conv2D(
            filters=64,
            kernel_size=[5, 5],
            padding="same",
            activation=tf.nn.relu
        )
        self.pool2 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.flatten = tf.keras.layers.Reshape(target_shape=(7 * 7 * 64,))
        self.dense1 = tf.keras.layers.Dense(units=1024, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):
        x = self.conv1(inputs)                  # [batch_size, 28, 28, 32]
        x = self.pool1(x)                       # [batch_size, 14, 14, 32]
        x = self.conv2(x)                       # [batch_size, 14, 14, 64]
        x = self.pool2(x)                       # [batch_size, 7, 7, 64]
        x = self.flatten(x)                     # [batch_size, 7 * 7 * 64]
        x = self.dense1(x)                      # [batch_size, 1024]
        x = self.dense2(x)                      # [batch_size, 10]
        output = tf.nn.softmax(x)
        return output

将前节的 model = MLP() 更换成 model = CNN() ,输出如下:

test accuracy: 0.988100

可以发现准确率有非常显著的提高。事实上,通过改变模型的网络结构(比如加入Dropout层防止过拟合),准确率还有进一步提升的空间。

卷积层和池化层的工作原理

卷积层(Convolutional Layer,以 tf.keras.layers.Conv2D 为代表)是CNN的核心组件,其结构与大脑的视觉皮层有类似之处。

回忆我们之前建立的 神经细胞的计算模型 以及全连接层,我们默认每个神经元与上一层的所有神经元相连。不过,在视觉皮层的神经元中,情况并不是这样。你或许在生物课上学习过 感受野 (Receptive Field)这一概念,即视觉皮层中的神经元并非与前一层的所有神经元相连,而只是感受一片区域内的视觉信号,并只对局部区域的视觉刺激进行反应。CNN中的卷积层正体现了这一特性。

例如,下图是一个7×7的单通道图片信号输入:

../../_images/conv_image.png

如果使用之前基于全连接层的模型,我们需要让每个输入信号对应一个权值,即建模一个神经元需要7×7=49个权值(不考虑偏置权值),并得到一个输出信号。如果一层有N个神经元,我们就需要49N个权值,并得到N个输出信号。

而在CNN的卷积层中,我们这样建模一个卷积层的神经元:

../../_images/conv_field.png

图中3×3的红框代表该神经元的感受野。由此,我们只需3×3=9个权值 \(W = \begin{bmatrix}w_{1, 1} & w_{1, 2} & w_{1, 3} \\w_{2, 1} & w_{2, 2} & w_{2, 3} \\w_{3, 1} & w_{3, 2} & w_{3, 3}\end{bmatrix}\) ,即可得到一个输出信号。例如,对于红框所示的位置,输出信号即为对矩阵 \(\begin{bmatrix}0 \times w_{1, 1} & 0 \times w_{1, 2} & 0 \times w_{1, 3} \\0 \times w_{2, 1} & 1 \times w_{2, 2} & 0 \times w_{2, 3} \\0 \times w_{3, 1} & 0 \times w_{3, 2} & 2 \times w_{3, 3}\end{bmatrix}\) 的所有元素求和,并通过激活函数,记作 \(a_{1, 1}\)

不过,3×3的范围显然不足以处理整个图像,因此我们使用滑动窗口的方法。使用相同的参数 \(W\) ,但将红框在图像中从左到有滑动,进行逐行扫描,每滑动到一个位置就计算一个值。例如,当红框向右移动一个单位时,我们计算矩阵 \(\begin{bmatrix}0 \times w_{1, 1} & 0 \times w_{1, 2} & 0 \times w_{1, 3} \\1 \times w_{2, 1} & 0 \times w_{2, 2} & 1 \times w_{2, 3} \\0 \times w_{3, 1} & 2 \times w_{3, 2} & 1 \times w_{3, 3}\end{bmatrix}\) 的所有元素的和并通过激活函数,记作 \(a_{1, 2}\) 。由此,和一般的神经元只能输出1个值不同,这里的卷积层神经元可以输出一个5×5的矩阵 \(A = \begin{bmatrix}a_{1, 1} & \cdots & a_{1, 5} \\ \vdots & & \vdots \\ a_{5, 1} & \cdots & a_{5, 5}\end{bmatrix}\)

还有一个问题,以上假设图片都只有一个通道(例如灰度图片),但如果图像是彩色的(例如有RGB三个通道)该怎么办呢?此时,我们可以为每个通道准备一个3×3的权值矩阵,即一共有3×3×3=27个权值。对于每个通道,均使用自己的权值矩阵进行处理,输出时将多个通道所输出的值进行加和后再通过激活函数。

一个动态演示如下图所示。其中红色的矩阵为多通道的图像(这里展示为2个通道),绿色的矩阵为图像的每个通道所对应的权值矩阵 \(W\) ,蓝色的矩阵为输出矩阵 \(A\)

事实上,卷积的形式多种多样,以上的介绍只是其中最简单和基础的一种。更多卷积方式的示例可见 这里

池化层(Pooling Layer)的理解则简单得多,其可以理解为对图像进行降采样的过程,对于每一次滑动窗口中的所有值,输出其中的最大值(MaxPooling)/均值或其他值。例如,对于一个三通道的16×16图像(即一个 16*16*3 的张量),经过感受野为2×2,滑动步长为2的池化层,则得到一个 8*8*3 的张量。

循环神经网络(RNN)

循环神经网络(Recurrent Neural Network, RNN)是一种适宜于处理序列数据的神经网络,被广泛用于语言模型、文本生成、机器翻译等。

基础知识和原理

这里,我们使用RNN来进行尼采风格文本的自动生成。 [4]

这个任务的本质其实预测一段英文文本的接续字母的概率分布。比如,我们有以下句子:

I am a studen

这个句子(序列)一共有13个字符(包含空格)。当我们阅读到这个由13个字符组成的序列后,根据我们的经验,我们可以预测出下一个字符很大概率是“t”。我们希望建立这样一个模型,输入num_batch个由编码后字符组成的,长为seq_length的序列,输入张量形状为[num_batch, seq_length],输出这些序列接续的下一个字符的概率分布,概率分布的维度为字符种类数num_chars,输出张量形状为[num_batch, num_chars]。我们从下一个字符的概率分布中采样作为预测值,然后滚雪球式地生成下两个字符,下三个字符等等,即可完成文本的生成任务。

首先,还是实现一个简单的 DataLoader 类来读取文本,并以字符为单位进行编码。

class DataLoader():
    def __init__(self):
        path = tf.keras.utils.get_file('nietzsche.txt',
            origin='https://s3.amazonaws.com/text-datasets/nietzsche.txt')
        with open(path, encoding='utf-8') as f:
            self.raw_text = f.read().lower()
        self.chars = sorted(list(set(self.raw_text)))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        self.text = [self.char_indices[c] for c in self.raw_text]

    def get_batch(self, seq_length, batch_size):
        seq = []
        next_char = []
        for i in range(batch_size):
            index = np.random.randint(0, len(self.text) - seq_length)
            seq.append(self.text[index:index+seq_length])
            next_char.append(self.text[index+seq_length])
        return np.array(seq), np.array(next_char)       # [num_batch, seq_length], [num_batch]

接下来进行模型的实现。在 __init__ 方法中我们实例化一个常用的 BasicLSTMCell 单元,以及一个线性变换用的全连接层,我们首先对序列进行One Hot操作,即将编码i变换为一个n维向量,其第i位为1,其余均为0。这里n为字符种类数num_char。变换后的序列张量形状为[num_batch, seq_length, num_chars]。接下来,我们将序列从头到尾依序送入RNN单元,即将当前时间t的RNN单元状态 state 和t时刻的序列 inputs[:, t, :] 送入RNN单元,得到当前时间的输出 output 和下一个时间t+1的RNN单元状态。取RNN单元最后一次的输出,通过全连接层变换到num_chars维,即作为模型的输出。

../../_images/rnn_single.jpg

output, state = self.cell(inputs[:, t, :], state) 图示

../../_images/rnn.jpg

RNN流程图示

具体实现如下:

class RNN(tf.keras.Model):
    def __init__(self, num_chars, batch_size, seq_length):
        super().__init__()
        self.num_chars = num_chars
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.cell = tf.keras.layers.LSTMCell(units=256)
        self.dense = tf.keras.layers.Dense(units=self.num_chars)

    def call(self, inputs, from_logits=False):
        inputs = tf.one_hot(inputs, depth=self.num_chars)       # [batch_size, seq_length, num_chars]
        state = self.cell.get_initial_state(batch_size=self.batch_size, dtype=tf.float32)
        for t in range(self.seq_length):
            output, state = self.cell(inputs[:, t, :], state)
        logits = self.dense(output)
        if from_logits:
            return logits
        else:
            return tf.nn.softmax(logits)

定义一些模型超参数:

    num_batches = 1000
    seq_length = 40
    batch_size = 50
    learning_rate = 1e-3

训练过程与前节基本一致,在此复述:

  • 从DataLoader中随机取一批训练数据;
  • 将这批数据送入模型,计算出模型的预测值;
  • 将模型预测值与真实值进行比较,计算损失函数(loss);
  • 计算损失函数关于模型变量的导数;
  • 使用优化器更新模型参数以最小化损失函数。
    data_loader = DataLoader()
    model = RNN(num_chars=len(data_loader.chars), batch_size=batch_size, seq_length=seq_length)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    for batch_index in range(num_batches):
        X, y = data_loader.get_batch(seq_length, batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
            loss = tf.reduce_mean(loss)
            print("batch %d: loss %f" % (batch_index, loss.numpy()))
        grads = tape.gradient(loss, model.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))

关于文本生成的过程有一点需要特别注意。之前,我们一直使用 tf.argmax() 函数,将对应概率最大的值作为预测值。然而对于文本生成而言,这样的预测方式过于绝对,会使得生成的文本失去丰富性。于是,我们使用 np.random.choice() 函数按照生成的概率分布取样。这样,即使是对应概率较小的字符,也有机会被取样到。同时,我们加入一个 temperature 参数控制分布的形状,参数值越大则分布越平缓(最大值和最小值的差值越小),生成文本的丰富度越高;参数值越小则分布越陡峭,生成文本的丰富度越低。

    def predict(self, inputs, temperature=1.):
        batch_size, _ = tf.shape(inputs)
        logits = self(inputs, from_logits=True)
        prob = tf.nn.softmax(logits / temperature).numpy()
        return np.array([np.random.choice(self.num_chars, p=prob[i, :])
                         for i in range(batch_size.numpy())])

通过这种方式进行“滚雪球”式的连续预测,即可得到生成文本。

    X_, _ = data_loader.get_batch(seq_length, 1)
    for diversity in [0.2, 0.5, 1.0, 1.2]:
        X = X_
        print("diversity %f:" % diversity)
        for t in range(400):
            y_pred = model.predict(X, diversity)
            print(data_loader.indices_char[y_pred[0]], end='', flush=True)
            X = np.concatenate([X[:, 1:], np.expand_dims(y_pred, axis=1)], axis=-1)
        print("\n")

生成的文本如下:

diversity 0.200000:
conserted and conseive to the conterned to it is a self--and seast and the selfes as a seast the expecience and and and the self--and the sered is a the enderself and the sersed and as a the concertion of the series of the self in the self--and the serse and and the seried enes and seast and the sense and the eadure to the self and the present and as a to the self--and the seligious and the enders

diversity 0.500000:
can is reast to as a seligut and the complesed
has fool which the self as it is a the beasing and us immery and seese for entoured underself of the seless and the sired a mears and everyther to out every sone thes and reapres and seralise as a streed liees of the serse to pease the cersess of the selung the elie one of the were as we and man one were perser has persines and conceity of all self-el

diversity 1.000000:
entoles by
their lisevers de weltaale, arh pesylmered, and so jejurted count have foursies as is
descinty iamo; to semplization refold, we dancey or theicks-welf--atolitious on his
such which
here
oth idey of pire master, ie gerw their endwit in ids, is an trees constenved mase commars is leed mad decemshime to the mor the elige. the fedies (byun their ope wopperfitious--antile and the it as the f

diversity 1.200000:
cain, elvotidue, madehoublesily
inselfy!--ie the rads incults of to prusely le]enfes patuateded:.--a coud--theiritibaior "nrallysengleswout peessparify oonsgoscess teemind thenry ansken suprerial mus, cigitioum: 4reas. whouph: who
eved
arn inneves to sya" natorne. hag open reals whicame oderedte,[fingo is
zisternethta simalfule dereeg hesls lang-lyes thas quiin turjentimy; periaspedey tomm--whach
[4]此处的任务及实现参考了 https://github.com/keras-team/keras/blob/master/examples/lstm_text_generation.py

循环神经网络的工作过程

循环神经网络是一个处理时间序列数据的神经网络结构,也就是说,我们需要在脑海里有一根时间轴,循环神经网络具有初始状态 \(s_0\) ,在每个时间点 \(t\) 迭代对当前时间的输入 \(x_t\) 进行处理,修改自身的状态 \(s_t\) ,并进行输出 \(o_t\)

循环神经网络的核心是状态 \(s\) ,是一个特定维数的向量,类似于神经网络的“记忆”。在 \(t=0\) 的初始时刻,\(s_0\) 被赋予一个初始值(常用的为全0向量)。然后,我们用类似于递归的方法来描述循环神经网络的工作过程。即在 \(t\) 时刻,我们假设 \(s_{t-1}\) 已经求出,关注如何在此基础上求出 \(s_{t}\)

  • 对输出向量 \(x_t\) 通过矩阵 \(U\) 进行线性变换,\(U x_t\) 与状态s具有相同的维度;
  • \(s_{t-1}\) 通过矩阵 \(W\) 进行线性变换,\(W s_{t-1}\) 与状态s具有相同的维度;
  • 将上述得到的两个向量相加并通过激活函数,作为当前状态 \(s_t\) 的值,即 \(s_t = f(U x_t + W s_{t-1})\)。也就是说,当前状态的值是上一个状态的值和当前输入进行某种信息整合而产生的;
  • 对当前状态 \(s_t\) 通过矩阵 \(V\) 进行线性变换,得到当前时刻的输出 \(o_t\)

深度强化学习(DRL)

强化学习 (Reinforcement learning,RL)强调如何基于环境而行动,以取得最大化的预期利益。结合了深度学习技术后的强化学习更是如虎添翼。这两年广为人知的AlphaGo即是深度强化学习的典型应用。

这里,我们使用深度强化学习玩CartPole(平衡杆)游戏。简单说,我们需要让模型控制杆的左右运动,以让其一直保持竖直平衡状态。

../../_images/cartpole.gif

CartPole游戏

我们使用 OpenAI推出的Gym环境库 中的CartPole游戏环境,具体安装步骤和教程可参考 官方文档这里 。Gym的基本调用方法如下:

import gym

env = gym.make('CartPole-v1')       # 实例化一个游戏环境,参数为游戏名称
state = env.reset()                 # 初始化环境,获得初始状态
while True:
    env.render()                    # 对当前帧进行渲染,绘图到屏幕
    action = model.predict(state)   # 假设我们有一个训练好的模型,能够通过当前状态预测出这时应该进行的动作
    next_state, reward, done, info = env.step(action)   # 让环境执行动作,获得执行完动作的下一个状态,动作的奖励,游戏是否已结束以及额外信息
    if done:                        # 如果游戏结束则退出循环
        break

那么,我们的任务就是训练出一个模型,能够根据当前的状态预测出应该进行的一个好的动作。粗略地说,一个好的动作应当能够最大化整个游戏过程中获得的奖励之和,这也是强化学习的目标。

以下代码展示了如何使用深度强化学习中的Deep Q-Learning方法来训练模型。

import tensorflow as tf
import numpy as np
import gym
import random
from collections import deque

num_episodes = 500
num_exploration_episodes = 100
max_len_episode = 1000
batch_size = 32
learning_rate = 1e-3
gamma = 1.
initial_epsilon = 1.
final_epsilon = 0.01


# Q-network用于拟合Q函数,和前节的多层感知机类似。输入state,输出各个action下的Q-value(CartPole下为2维)。
class QNetwork(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense3 = tf.keras.layers.Dense(units=2)

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        x = self.dense3(x)
        return x

    def predict(self, inputs):
        q_values = self(inputs)
        return tf.argmax(q_values, axis=-1)


if __name__ == '__main__':
    env = gym.make('CartPole-v1')       # 实例化一个游戏环境,参数为游戏名称
    model = QNetwork()
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    replay_buffer = deque(maxlen=10000)
    epsilon = initial_epsilon
    for episode_id in range(num_episodes):
        state = env.reset()             # 初始化环境,获得初始状态
        epsilon = max(
            initial_epsilon * (num_exploration_episodes - episode_id) / num_exploration_episodes,
            final_epsilon)
        for t in range(max_len_episode):
            env.render()                                # 对当前帧进行渲染,绘图到屏幕
            if random.random() < epsilon:               # epsilon-greedy探索策略
                action = env.action_space.sample()      # 以epsilon的概率选择随机动作
            else:
                action = model.predict(
                    tf.constant(np.expand_dims(state, axis=0), dtype=tf.float32)).numpy()
                action = action[0]

            # 让环境执行动作,获得执行完动作的下一个状态,动作的奖励,游戏是否已结束以及额外信息
            next_state, reward, done, info = env.step(action)
            # 如果游戏Game Over,给予大的负奖励
            reward = -10. if done else reward
            # 将(state, action, reward, next_state)的四元组(外加done标签表示是否结束)放入经验重放池
            replay_buffer.append((state, action, reward, next_state, 1 if done else 0))
            # 更新当前state
            state = next_state

            if done:                                    # 游戏结束则退出本轮循环,进行下一个episode
                print("episode %d, epsilon %f, score %d" % (episode_id, epsilon, t))
                break

            if len(replay_buffer) >= batch_size:
                # 从经验回放池中随机取一个批次的四元组,并分别转换为NumPy数组
                batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(
                    *random.sample(replay_buffer, batch_size))
                batch_state, batch_reward, batch_next_state, batch_done = \
                    [np.array(a, dtype=np.float32) for a in [batch_state, batch_reward, batch_next_state, batch_done]]
                batch_action = np.array(batch_action, dtype=np.int32)

                q_value = model(tf.constant(batch_next_state, dtype=tf.float32))
                y = batch_reward + (gamma * tf.reduce_max(q_value, axis=1)) * (1 - batch_done)  # 按照论文计算y值
                with tf.GradientTape() as tape:
                    loss = tf.keras.losses.mean_squared_error(  # 最小化y和Q-value的距离
                        y_true=y,
                        y_pred=tf.reduce_sum(model(tf.constant(batch_state)) * tf.one_hot(batch_action, depth=2), axis=1)
                    )
                grads = tape.gradient(loss, model.variables)
                optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))       # 计算梯度并更新参数

对于不同的任务(或者说环境),我们需要根据任务的特点,设计不同的状态以及采取合适的网络来拟合Q函数。例如,如果我们考虑经典的打砖块游戏(Gym环境库中的 Breakout-v0 ),每一次执行动作(挡板向左、向右或不动),都会返回一个 210 * 160 * 3 的RGB图片,表示当前屏幕画面。为了给打砖块游戏这个任务设计合适的状态表示,我们有以下分析:

  • 砖块的颜色信息并不是很重要,画面转换成灰度也不影响操作,因此可以去除状态中的颜色信息(即将图片转为灰度表示);
  • 小球移动的信息很重要,如果只知道单帧画面而不知道小球往哪边运动,即使是人也很难判断挡板应当移动的方向。因此,必须在状态中加入表征小球运动方向的信息。一个简单的方式是将当前帧与前面几帧的画面进行叠加,得到一个 210 * 160 * X (X为叠加帧数)的状态表示;
  • 每帧的分辨率不需要特别高,只要能大致表征方块、小球和挡板的位置以做出决策即可,因此对于每帧的长宽可做适当压缩。

而考虑到我们需要从图像信息中提取特征,使用CNN作为拟合Q函数的网络将更为适合。将上面的 QNetwork 更换为CNN网络,即可用于玩一些简单的视频游戏。

Keras Pipeline *

以上示例均使用了Keras的Subclassing API建立模型,即对 tf.keras.Model 类进行扩展以定义自己的新模型,同时手工编写了训练和评估模型的流程。这种方式灵活度高,且与其他流行的深度学习框架(如PyTorch、Chainer)共通,是本手册所推荐的方法。不过在很多时候,我们只需要建立一个结构相对简单和典型的神经网络(比如上文中的MLP和CNN),并使用常规的手段进行训练。这时,Keras也给我们提供了另一套更为简单高效的内置方法来建立、训练和评估模型。

Keras Sequential/Functional API模式建立模型

最典型和常用的神经网络结构是将一堆层按特定顺序叠加起来,那么,我们是不是只需要提供一个层的列表,就能由Keras将它们自动首尾相连,形成模型呢?Keras的Sequential API正是如此。通过向 tf.keras.models.Sequential() 提供一个层的列表,就能快速地建立一个 tf.keras.Model 模型并返回:

        model = tf.keras.models.Sequential([
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(100, activation=tf.nn.relu),
            tf.keras.layers.Dense(10),
            tf.keras.layers.Softmax()
        ])

不过,这种层叠结构并不能表示任意的神经网络结构。为此,Keras提供了Functional API,帮助我们建立更为复杂的模型,例如多输入/输出或存在参数共享的模型。其使用方法是将层作为可调用的对象并返回张量(这点与之前章节的使用方法一致),并将输入向量和输出向量提供给 tf.keras.Modelinputsoutputs 参数,示例如下:

        inputs = tf.keras.Input(shape=(28, 28, 1))
        x = tf.keras.layers.Flatten()(inputs)
        x = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)(x)
        x = tf.keras.layers.Dense(units=10)(x)
        outputs = tf.keras.layers.Softmax()(x)
        model = tf.keras.Model(inputs=inputs, outputs=outputs)

使用Keras Model的 compilefitevaluate 方法训练和评估模型

当模型建立完成后,通过 tf.keras.Modelcompile 方法配置训练过程:

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        metrics=[tf.keras.metrics.sparse_categorical_accuracy]
    )

tf.keras.Model.compile 接受3个重要的参数:

  • oplimizer :优化器,可从 tf.keras.optimizers 中选择;
  • loss :损失函数,可从 tf.keras.losses 中选择;
  • metrics :评估指标,可从 tf.keras.metrics 中选择。

接下来,可以使用 tf.keras.Modelfit 方法训练模型:

    model.fit(data_loader.train_data, data_loader.train_label, epochs=num_epochs, batch_size=batch_size)

tf.keras.Model.fit 接受5个重要的参数:

  • x :训练数据;
  • y :目标数据(数据标签);
  • epochs :将训练数据迭代多少遍;
  • batch_size :批次的大小;
  • validation_data :验证数据,可用于在训练过程中监控模型的性能。

Keras支持使用 tf.data.Dataset 进行训练,详见 tf.data

最后,使用 tf.keras.Model.evaluate 评估训练效果,提供测试数据及标签即可:

    print(model.evaluate(data_loader.test_data, data_loader.test_label))

自定义层、损失函数和评估指标 *

可能你还会问,如果现有的这些层无法满足我的要求,我需要定义自己的层怎么办?事实上,我们不仅可以继承 tf.keras.Model 编写自己的模型类,也可以继承 tf.keras.layers.Layer 编写自己的层。

自定义层

自定义层需要继承 tf.keras.layers.Layer 类,并重写 __init__buildcall 三个方法,如下所示:

class MyLayer(tf.keras.layers.Layer):
    def __init__(self):
        super().__init__()
        # 初始化代码

    def build(self, input_shape):     # input_shape 是一个 TensorShape 类型对象,提供输入的形状
        # 在第一次使用该层的时候调用该部分代码,在这里创建变量可以使得变量的形状自适应输入的形状
        # 而不需要使用者额外指定变量形状。
        # 如果已经可以完全确定变量的形状,也可以在__init__部分创建变量
        self.variable_0 = self.add_variable(...)
        self.variable_1 = self.add_variable(...)

    def call(self, inputs):
        # 模型调用的代码(处理输入并返回输出)
        return output

例如,如果我们要自己实现一个 本章第一节 中的全连接层( tf.keras.layers.Dense ),可以按如下方式编写。此代码在 build 方法中创建两个变量,并在 call 方法中使用创建的变量进行运算:

class LinearLayer(tf.keras.layers.Layer):
    def __init__(self, units):
        super().__init__()
        self.units = units

    def build(self, input_shape):     # 这里 input_shape 是第一次运行call()时参数inputs的形状
        self.w = self.add_variable(name='w',
            shape=[input_shape[-1], self.units], initializer=tf.zeros_initializer())
        self.b = self.add_variable(name='b',
            shape=[self.units], initializer=tf.zeros_initializer())

    def call(self, inputs):
        y_pred = tf.matmul(inputs, self.w) + self.b
        return y_pred

在定义模型的时候,我们便可以如同Keras中的其他层一样,调用我们自定义的层 LinearLayer

class LinearModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.layer = LinearLayer(units=1)

    def call(self, inputs):
        output = self.layer(inputs)
        return output

自定义损失函数和评估指标

自定义损失函数需要继承 tf.keras.losses.Loss 类,重写 call 方法即可,输入真实值 y_true 和模型预测值 y_pred ,输出模型预测值和真实值之间通过自定义的损失函数计算出的损失值。下面的示例为均方差损失函数:

class MeanSquaredError(tf.keras.losses.Loss):
    def call(self, y_true, y_pred):
        return tf.reduce_mean(tf.square(y_pred - y_true))

自定义评估指标需要继承 tf.keras.metrics.Metric 类,并重写 __init__update_stateresult 三个方法。下面的示例对前面用到的 SparseCategoricalAccuracy 评估指标类做了一个简单的重实现:

class SparseCategoricalAccuracy(tf.keras.metrics.Metric):
    def __init__(self):
        super().__init__()
        self.total = self.add_weight(name='total', dtype=tf.int32, initializer=tf.zeros_initializer())
        self.count = self.add_weight(name='count', dtype=tf.int32, initializer=tf.zeros_initializer())

    def update_state(self, y_true, y_pred, sample_weight=None):
        values = tf.cast(tf.equal(y_true, tf.argmax(y_pred, axis=-1, output_type=tf.int32)), tf.int32)
        self.total.assign_add(tf.shape(y_true)[0])
        self.count.assign_add(tf.reduce_sum(values))

    def result(self):
        return self.count / self.total
[LeCun1998]
  1. LeCun, L. Bottou, Y. Bengio, and P. Haffner. “Gradient-based learning applied to document recognition.” Proceedings of the IEEE, 86(11):2278-2324, November 1998. http://yann.lecun.com/exdb/mnist/
[Graves2013]Graves, Alex. “Generating Sequences With Recurrent Neural Networks.” ArXiv:1308.0850 [Cs], August 4, 2013. http://arxiv.org/abs/1308.0850.
[Mnih2013]Mnih, Volodymyr, Koray Kavukcuoglu, David Silver, Alex Graves, Ioannis Antonoglou, Daan Wierstra, and Martin Riedmiller. “Playing Atari with Deep Reinforcement Learning.” ArXiv:1312.5602 [Cs], December 19, 2013. http://arxiv.org/abs/1312.5602.