TensorFlow 模型建立與訓練

本章介紹如何使用 TensorFlow 快速搭建動態模型。

  • 模型的構建: tf.keras.Modeltf.keras.layers

  • 模型的損失函數: tf.keras.losses

  • 模型的優化器: tf.keras.optimizer

  • 模型的評估: tf.keras.metrics

前置知識

模型(Model)與層(Layer)

在 TensorFlow 中,推薦使用 Keras( tf.keras )構建模型。Keras 是一個廣爲流行的高級神經網絡 API,簡單、快速而不失靈活性,現已得到 TensorFlow 的官方內置和全面支持。

Keras 有兩個重要的概念: 模型(Model)層(Layer) 。層將各種計算流程和變量進行了封裝(例如基本的全連接層,CNN 的卷積層、池化層等),而模型則將各種層進行組織和連接,並封裝成一個整體,描述了如何將輸入數據通過各種層以及運算而得到輸出。在需要模型調用的時候,使用 y_pred = model(X) 的形式即可。Keras 在 tf.keras.layers 下內置了深度學習中大量常用的的預定義層,同時也允許我們自定義層。

Keras 模型以類的形式呈現,我們可以通過繼承 tf.keras.Model 這個 Python 類來定義自己的模型。在繼承類中,我們需要重寫 __init__() (構造函數,初始化)和 call(input) (模型調用)兩個方法,同時也可以根據需要增加自定義的方法。

class MyModel(tf.keras.Model):
    def __init__(self):
        super().__init__()     # Python 2 下使用 super(MyModel, self).__init__()
        # 此處添加初始化代碼(包含 call 方法中會用到的層),例如
        # layer1 = tf.keras.layers.BuiltInLayer(...)
        # layer2 = MyCustomLayer(...)

    def call(self, input):
        # 此處添加模型調用的代碼(處理輸入並返回輸出),例如
        # x = layer1(input)
        # output = layer2(x)
        return output

    # 還可以添加自定義的方法
../../_images/model.png

Keras 模型類定義示意圖

繼承 tf.keras.Model 後,我們同時可以使用父類的若干方法和屬性,例如在實例化類 model = Model() 後,可以通過 model.variables 這一屬性直接獲得模型中的所有變量,免去我們一個個顯式指定變量的麻煩。

上一章中簡單的線性模型 y_pred = a * X + b ,我們可以通過模型類的方式編寫如下:

import tensorflow as tf

X = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
y = tf.constant([[10.0], [20.0]])


class Linear(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense = tf.keras.layers.Dense(
            units=1,
            activation=None,
            kernel_initializer=tf.zeros_initializer(),
            bias_initializer=tf.zeros_initializer()
        )

    def call(self, input):
        output = self.dense(input)
        return output


# 以下代码结构与前节类似
model = Linear()
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
for i in range(100):
    with tf.GradientTape() as tape:
        y_pred = model(X)      # 调用模型 y_pred = model(X) 而不是显式写出 y_pred = a * X + b
        loss = tf.reduce_mean(tf.square(y_pred - y))
    grads = tape.gradient(loss, model.variables)    # 使用 model.variables 这一属性直接获得模型中的所有变量
    optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
print(model.variables)

這裡,我們沒有顯式地聲明 ab 兩個變量並寫出 y_pred = a * X + b 這一線性變換,而是建立了一個繼承了 tf.keras.Model 的模型類 Linear 。這個類在初始化部分實例化了一個 全連接層tf.keras.layers.Dense ),並在 call 方法中對這個層進行調用,實現了線性變換的計算。如果需要顯式地聲明自己的變量並使用變量進行自定義運算,或者希望了解 Keras 層的內部原理,請參考 自定義層

Keras 的全連接層:線性變換 + 激活函數

全連接層 (Fully-connected Layer,tf.keras.layers.Dense )是 Keras 中最基礎和常用的層之一,對輸入矩陣 A 進行 f(AW + b) 的線性變換 + 激活函數操作。如果不指定激活函數,即是純粹的線性變換 AW + b。具體而言,給定輸入張量 input = [batch_size, input_dim] ,該層對輸入張量首先進行 tf.matmul(input, kernel) + bias 的線性變換( kernelbias 是層中可訓練的變量),然後對線性變換後張量的每個元素通過激活函數 activation ,從而輸出形狀爲 [batch_size, units] 的二維張量。

../../_images/dense.png

其包含的主要參數如下:

  • units :輸出張量的維度;

  • activation :激活函數,對應於 f(AW + b) 中的 f ,默認爲無激活函數( a(x) = x )。常用的激活函數包括 tf.nn.relutf.nn.tanhtf.nn.sigmoid

  • use_bias :是否加入偏置向量 bias ,即 f(AW + b) 中的 b。默認爲 True

  • kernel_initializerbias_initializer :權重矩陣 kernel 和偏置向量 bias 兩個變量的初始化器。默認爲 tf.glorot_uniform_initializer 1 。設置爲 tf.zeros_initializer 表示將兩個變量均初始化爲全 0;

該層包含權重矩陣 kernel = [input_dim, units] 和偏置向量 bias = [units] 2 兩個可訓練變量,對應於 f(AW + b) 中的 Wb

這裡著重從數學矩陣運算和線性變換的角度描述了全連接層。基於神經元建模的描述可參考 後文介紹

1

Keras 中的很多層都默認使用 tf.glorot_uniform_initializer 初始化變量,關於該初始化器可參考 https://www.tensorflow.org/api_docs/python/tf/glorot_uniform_initializer

2

你可能會注意到, tf.matmul(input, kernel) 的結果是一個形狀爲 [batch_size, units] 的二維矩陣,這個二維矩陣要如何與形狀爲 [units] 的一維偏置向量 bias 相加呢?事實上,這裡是 TensorFlow 的 Broadcasting 機制在起作用,該加法運算相當於將二維矩陣的每一行加上了 Bias 。Broadcasting 機制的具體介紹可見 https://www.tensorflow.org/xla/broadcasting

爲什麼模型類是重載 call() 方法而不是 __call__() 方法?

在 Python 中,對類的實例 myClass 進行形如 myClass() 的調用等價於 myClass.__call__() (具體請見本章初 「前置知識」 的 __call__() 部分)。那麼看起來,爲了使用 y_pred = model(X) 的形式調用模型類,應該重寫 __call__() 方法才對呀?原因是 Keras 在模型調用的前後還需要有一些自己的內部操作,所以暴露出一個專門用於重載的 call() 方法。 tf.keras.Model 這一父類已經包含 __call__() 的定義。 __call__() 中主要調用了 call() 方法,同時還需要在進行一些 keras 的內部操作。這裡,我們通過繼承 tf.keras.Model 並重載 call() 方法,即可在保持 keras 結構的同時加入模型調用的代碼。

基礎示例:多層感知機(MLP)

我們從編寫一個最簡單的 多層感知機 (Multilayer Perceptron, MLP),或者說 「多層全連接神經網絡」 開始,介紹 TensorFlow 的模型編寫方式。在這一部分,我們依次進行以下步驟:

  • 使用 tf.keras.datasets 獲得數據集並預處理

  • 使用 tf.keras.Modeltf.keras.layers 構建模型

  • 構建模型訓練流程,使用 tf.keras.losses 計算損失函數,並使用 tf.keras.optimizer 優化模型

  • 構建模型評估流程,使用 tf.keras.metrics 計算評估指標

基礎知識和原理

這裡,我們使用多層感知機完成 MNIST 手寫體數字圖片數據集 [LeCun1998] 的分類任務。

../../_images/mnist_0-9.png

MNIST 手寫體數字圖片示例

數據獲取及預處理: tf.keras.datasets

先進行預備工作,實現一個簡單的 MNISTLoader 類來讀取 MNIST 數據集數據。這裡使用了 tf.keras.datasets 快速載入 MNIST 數據集。

class MNISTLoader():
    def __init__(self):
        mnist = tf.keras.datasets.mnist
        (self.train_data, self.train_label), (self.test_data, self.test_label) = mnist.load_data()
        # MNIST中的图像默认为uint8(0-255的数字)。以下代码将其归一化到0-1之间的浮点数,并在最后增加一维作为颜色通道
        self.train_data = np.expand_dims(self.train_data.astype(np.float32) / 255.0, axis=-1)      # [60000, 28, 28, 1]
        self.test_data = np.expand_dims(self.test_data.astype(np.float32) / 255.0, axis=-1)        # [10000, 28, 28, 1]
        self.train_label = self.train_label.astype(np.int32)    # [60000]
        self.test_label = self.test_label.astype(np.int32)      # [10000]
        self.num_train_data, self.num_test_data = self.train_data.shape[0], self.test_data.shape[0]

    def get_batch(self, batch_size):
        # 从数据集中随机取出batch_size个元素并返回
        index = np.random.randint(0, np.shape(self.train_data)[0], batch_size)
        return self.train_data[index, :], self.train_label[index]

提示

mnist = tf.keras.datasets.mnist 將從網絡上自動下載 MNIST 數據集並加載。如果運行時出現網絡連接錯誤,可以從 https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npzhttps://s3.amazonaws.com/img-datasets/mnist.npz 下載 MNIST 數據集 mnist.npz 文件,並放置於用戶目錄的 .keras/dataset 目錄下(Windows 下用戶目錄爲 C:\Users\用戶名 ,Linux 下用戶目錄爲 /home/用戶名 )。

TensorFlow 的圖像數據表示

在 TensorFlow 中,圖像數據集的一種典型表示是 [圖像數目,長,寬,色彩通道數] 的四維張量。在上面的 DataLoader 類中, self.train_dataself.test_data 分別載入了 60,000 和 10,000 張大小爲 28*28 的手寫體數字圖片。由於這裡讀入的是灰度圖片,色彩通道數爲 1(彩色 RGB 圖像色彩通道數爲 3),所以我們使用 np.expand_dims() 函數爲圖像數據手動在最後添加一維通道。

模型的構建: tf.keras.Modeltf.keras.layers

多層感知機的模型類實現與上面的線性模型類似,使用 tf.keras.Modeltf.keras.layers 構建,所不同的地方在於層數增加了(顧名思義,「多層」 感知機),以及引入了非線性激活函數(這裡使用了 ReLU 函數 , 即下方的 activation=tf.nn.relu )。該模型輸入一個向量(比如這裡是拉直的 1×784 手寫體數字圖片),輸出 10 維的向量,分別代表這張圖片屬於 0 到 9 的概率。

class MLP(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.flatten = tf.keras.layers.Flatten()    # Flatten层将除第一维(batch_size)以外的维度展平
        self.dense1 = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):         # [batch_size, 28, 28, 1]
        x = self.flatten(inputs)    # [batch_size, 784]
        x = self.dense1(x)          # [batch_size, 100]
        x = self.dense2(x)          # [batch_size, 10]
        output = tf.nn.softmax(x)
        return output

softmax 函數

這裡,因爲我們希望輸出 「輸入圖片分別屬於 0 到 9 的概率」,也就是一個 10 維的離散概率分布,所以我們希望這個 10 維向量至少滿足兩個條件:

  • 該向量中的每個元素均在 [0, 1] 之間;

  • 該向量的所有元素之和爲 1。

爲了使得模型的輸出能始終滿足這兩個條件,我們使用 Softmax 函數 (歸一化指數函數, tf.nn.softmax )對模型的原始輸出進行歸一化。其形式爲 \sigma(\mathbf{z})_j = \frac{e^{z_j}}{\sum_{k=1}^K e^{z_k}} 。不僅如此,softmax 函數能夠凸顯原始向量中最大的值,並抑制遠低於最大值的其他分量,這也是該函數被稱作 softmax 函數的原因(即平滑化的 argmax 函數)。

../../_images/mlp.png

MLP 模型示意圖

模型的訓練: tf.keras.lossestf.keras.optimizer

定義一些模型超參數:

num_epochs = 5
batch_size = 50
learning_rate = 0.001

實例化模型和數據讀取類,並實例化一個 tf.keras.optimizer 的優化器(這裡使用常用的 Adam 優化器):

model = MLP()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

然後疊代進行以下步驟:

  • 從 DataLoader 中隨機取一批訓練數據;

  • 將這批數據送入模型,計算出模型的預測值;

  • 將模型預測值與真實值進行比較,計算損失函數(loss)。這裡使用 tf.keras.losses 中的交叉熵函數作爲損失函數;

  • 計算損失函數關於模型變量的導數;

  • 將求出的導數值傳入優化器,使用優化器的 apply_gradients 方法更新模型參數以最小化損失函數(優化器的詳細使用方法見 前章 )。

具體代碼實現如下:

    num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
    for batch_index in range(num_batches):
        X, y = data_loader.get_batch(batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
            loss = tf.reduce_mean(loss)
            print("batch %d: loss %f" % (batch_index, loss.numpy()))
        grads = tape.gradient(loss, model.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))

交叉熵(cross entropy)與 tf.keras.losses

你或許注意到了,在這裡,我們沒有顯式地寫出一個損失函數,而是使用了 tf.keras.losses 中的 sparse_categorical_crossentropy (交叉熵)函數,將模型的預測值 y_pred 與真實的標籤值 y 作爲函數參數傳入,由 Keras 幫助我們計算損失函數的值。

交叉熵作爲損失函數,在分類問題中被廣泛應用。其離散形式爲 H(y, \hat{y}) = -\sum_{i=1}^{n}y_i \log(\hat{y_i}) ,其中 y 爲真實概率分布, \hat{y} 爲預測概率分布, n 爲分類任務的類別個數。預測概率分布與真實分布越接近,則交叉熵的值越小,反之則越大。更具體的介紹及其在機器學習中的應用可參考 這篇博客文章

tf.keras 中,有兩個交叉熵相關的損失函數 tf.keras.losses.categorical_crossentropytf.keras.losses.sparse_categorical_crossentropy 。其中 sparse 的含義是,真實的標籤值 y_true 可以直接傳入 int 類型的標籤類別。具體而言:

loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)

loss = tf.keras.losses.categorical_crossentropy(
    y_true=tf.one_hot(y, depth=tf.shape(y_pred)[-1]),
    y_pred=y_pred
)

的結果相同。

模型的評估: tf.keras.metrics

最後,我們使用測試集評估模型的性能。這裡,我們使用 tf.keras.metrics 中的 SparseCategoricalAccuracy 評估器來評估模型在測試集上的性能,該評估器能夠對模型預測的結果與真實結果進行比較,並輸出預測正確的樣本數占總樣本數的比例。我們疊代測試數據集,每次通過 update_state() 方法向評估器輸入兩個參數: y_predy_true ,即模型預測出的結果和真實結果。評估器具有內部變量來保存當前評估指標相關的參數數值(例如當前已傳入的累計樣本數和當前預測正確的樣本數)。疊代結束後,我們使用 result() 方法輸出最終的評估指標值(預測正確的樣本數占總樣本數的比例)。

在以下代碼中,我們實例化了一個 tf.keras.metrics.SparseCategoricalAccuracy 評估器,並使用 For 循環疊代分批次傳入了測試集數據的預測結果與真實結果,並輸出訓練後的模型在測試數據集上的準確率。

    sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
    num_batches = int(data_loader.num_test_data // batch_size)
    for batch_index in range(num_batches):
        start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
        y_pred = model.predict(data_loader.test_data[start_index: end_index])
        sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
    print("test accuracy: %f" % sparse_categorical_accuracy.result())

輸出結果:

test accuracy: 0.947900

可以注意到,使用這樣簡單的模型,已經可以達到 95% 左右的準確率。

神經網絡的基本單位:神經元 3

如果我們將上面的神經網絡放大來看,詳細研究計算過程,比如取第二層的第 k 個計算單元,可以得到示意圖如下:

../../_images/neuron.png

該計算單元 Q_k 有 100 個權值參數 w_{0k}, w_{1k}, ..., w_{99k} 和 1 個偏置參數 b_k 。將第 1 層中所有的 100 個計算單元 P_0, P_1, ..., P_{99} 的值作爲輸入,分別按權值 w_{ik} 加和(即 \sum_{i=0}^{99} w_{ik} P_i ),並加上偏置值 b_k ,然後送入激活函數 f 進行計算,即得到輸出結果。

事實上,這種結構和真實的神經細胞(神經元)類似。神經元由樹突、胞體和軸突構成。樹突接受其他神經元傳來的信號作爲輸入(一個神經元可以有數千甚至上萬樹突),胞體對電位信號進行整合,而產生的信號則通過軸突傳到神經末梢的突觸,傳播到下一個(或多個)神經元。

../../_images/real_neuron.png

神經細胞模式圖(修改自 Quasar Jarosz at English Wikipedia [CC BY-SA 3.0 (https://creativecommons.org/licenses/by-sa/3.0)])

上面的計算單元,可以被視作對神經元結構的數學建模。在上面的例子裡,第二層的每一個計算單元(人工神經元)有 100 個權值參數和 1 個偏置參數,而第二層計算單元的數目是 10 個,因此這一個全連接層的總參數量爲 100*10 個權值參數和 10 個偏置參數。事實上,這正是該全連接層中的兩個變量 kernelbias 的形狀。仔細研究一下,你會發現,這裡基於神經元建模的介紹與上文基於矩陣計算的介紹是等價的。

3

事實上,應當是先有神經元建模的概念,再有基於人工神經元和層結構的人工神經網絡。但由於本手冊著重介紹 TensorFlow 的使用方法,所以調換了介紹順序。

卷積神經網絡(CNN)

卷積神經網絡 (Convolutional Neural Network, CNN)是一種結構類似於人類或動物的 視覺系統 的人工神經網絡,包含一個或多個卷積層(Convolutional Layer)、池化層(Pooling Layer)和全連接層(Fully-connected Layer)。

基礎知識和原理

使用Keras實現卷積神經網絡

卷積神經網絡的一個示例實現如下所示,和 上節中的多層感知機 在代碼結構上很類似,只是新加入了一些卷積層和池化層。這裡的網絡結構並不是唯一的,可以增加、刪除或調整 CNN 的網絡結構和參數,以達到更好的性能。

class CNN(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(
            filters=32,             # 卷积层神经元(卷积核)数目
            kernel_size=[5, 5],     # 感受野大小
            padding='same',         # padding策略(vaild 或 same)
            activation=tf.nn.relu   # 激活函数
        )
        self.pool1 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.conv2 = tf.keras.layers.Conv2D(
            filters=64,
            kernel_size=[5, 5],
            padding='same',
            activation=tf.nn.relu
        )
        self.pool2 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.flatten = tf.keras.layers.Reshape(target_shape=(7 * 7 * 64,))
        self.dense1 = tf.keras.layers.Dense(units=1024, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=10)

    def call(self, inputs):
        x = self.conv1(inputs)                  # [batch_size, 28, 28, 32]
        x = self.pool1(x)                       # [batch_size, 14, 14, 32]
        x = self.conv2(x)                       # [batch_size, 14, 14, 64]
        x = self.pool2(x)                       # [batch_size, 7, 7, 64]
        x = self.flatten(x)                     # [batch_size, 7 * 7 * 64]
        x = self.dense1(x)                      # [batch_size, 1024]
        x = self.dense2(x)                      # [batch_size, 10]
        output = tf.nn.softmax(x)
        return output
../../_images/cnn.png

示例代碼中的 CNN 結構圖示

將前節的 model = MLP() 更換成 model = CNN() ,輸出如下:

test accuracy: 0.988100

可以發現準確率相較於前節的多層感知機有非常顯著的提高。事實上,通過改變模型的網絡結構(比如加入 Dropout 層防止過擬合),準確率還有進一步提升的空間。

使用Keras中預定義的經典卷積神經網絡結構

tf.keras.applications 中有一些預定義好的經典卷積神經網絡結構,如 VGG16VGG19ResNetMobileNet 等。我們可以直接調用這些經典的卷積神經網絡結構(甚至載入預訓練的參數),而無需手動定義網絡結構。

例如,我們可以使用以下代碼來實例化一個 MobileNetV2 網絡結構:

model = tf.keras.applications.MobileNetV2()

當執行以上代碼時,TensorFlow會自動從網絡上下載 MobileNetV2 網絡的預訓練權值,因此在第一次執行代碼時需要具備網絡連接。也可以通過將參數 weights 設置爲 None 來隨機初始化變量而不使用預訓練權值。每個網絡結構具有自己特定的詳細參數設置,一些共通的常用參數如下:

  • input_shape :輸入張量的形狀(不含第一維的Batch),大多默認爲 224 × 224 × 3 。一般而言,模型對輸入張量的大小有下限,長和寬至少爲 32 × 3275 × 75

  • include_top :在網絡的最後是否包含全連接層,默認爲 True

  • weights :預訓練權值,默認爲 'imagenet' ,即爲當前模型載入在ImageNet數據集上預訓練的權值。如需隨機初始化變量可設爲 None

  • classes :分類數,默認爲1000。修改該參數需要 include_top 參數爲 Trueweights 參數爲 None

各網絡模型參數的詳細介紹可參考 Keras文檔

設置訓練狀態

對於一些預定義的經典模型,其中的某些層(例如 BatchNormalization )在訓練和測試時的行爲是不同的(可以參考 這篇文章 )。因此,在訓練模型時,需要手動設置訓練狀態,告訴模型「我現在是處於訓練模型的階段」。可以通過

tf.keras.backend.set_learning_phase(True)

進行設置,也可以在調用模型時通過將 training 參數設爲 True 來設置。

以下展示一個例子,使用 MobileNetV2 網絡在 tf_flowers 五分類數據集上進行訓練(爲了代碼的簡短高效,在該示例中我們使用了 TensorFlow Datasetstf.data 載入和預處理數據)。同時將 classes 設置爲5,對應於5分類的數據集。

import tensorflow as tf
import tensorflow_datasets as tfds

num_epoch = 5
batch_size = 19
learning_rate = 0.001

dataset = tfds.load("tf_flowers", split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(lambda img, label: (tf.image.resize(img, (224, 224)) / 255.0, label)).shuffle(1024).batch(batch_size)
model = tf.keras.applications.MobileNetV2(weights=None, classes=5)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
for e in range(num_epoch):
    for images, labels in dataset:
        with tf.GradientTape() as tape:
            labels_pred = model(images, training=True)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=labels, y_pred=labels_pred)
            loss = tf.reduce_mean(loss)
            print("loss %f" % loss.numpy())
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.trainable_variables))
    print(labels_pred)

後文的部分章節(如 分布式訓練 )中,我們也會直接調用這些經典的網絡結構來進行訓練。

卷積層和池化層的工作原理

卷積層(Convolutional Layer,以 tf.keras.layers.Conv2D 爲代表)是 CNN 的核心組件,其結構與大腦的視覺皮層有類似之處。

回憶我們之前建立的 神經細胞的計算模型 以及全連接層,我們默認每個神經元與上一層的所有神經元相連。不過,在視覺皮層的神經元中,情況並不是這樣。你或許在生物課上學習過 感受野 (Receptive Field)這一概念,即視覺皮層中的神經元並非與前一層的所有神經元相連,而只是感受一片區域內的視覺信號,並只對局部區域的視覺刺激進行反應。CNN 中的卷積層正體現了這一特性。

例如,下圖是一個 7×7 的單通道圖片信號輸入:

../../_images/conv_image.png

如果使用之前基於全連接層的模型,我們需要讓每個輸入信號對應一個權值,即建模一個神經元需要 7×7=49 個權值(加上偏置項是50個),並得到一個輸出信號。如果一層有 N 個神經元,我們就需要 49N 個權值,並得到 N 個輸出信號。

而在 CNN 的卷積層中,我們這樣建模一個卷積層的神經元:

../../_images/conv_field.png

圖中 3×3 的紅框代表該神經元的感受野。由此,我們只需 3×3=9 個權值 W = \begin{bmatrix}w_{1, 1} & w_{1, 2} & w_{1, 3} \\w_{2, 1} & w_{2, 2} & w_{2, 3} \\w_{3, 1} & w_{3, 2} & w_{3, 3}\end{bmatrix} ,外加1個偏置項 b ,即可得到一個輸出信號。例如,對於紅框所示的位置,輸出信號即爲對矩陣 \begin{bmatrix}0 \times w_{1, 1} & 0 \times w_{1, 2} & 0 \times w_{1, 3} \\0 \times w_{2, 1} & 1 \times w_{2, 2} & 0 \times w_{2, 3} \\0 \times w_{3, 1} & 0 \times w_{3, 2} & 2 \times w_{3, 3}\end{bmatrix} 的所有元素求和並加上偏置項 b,記作 a_{1, 1}

不過,3×3 的範圍顯然不足以處理整個圖像,因此我們使用滑動窗口的方法。使用相同的參數 W ,但將紅框在圖像中從左到右滑動,進行逐行掃描,每滑動到一個位置就計算一個值。例如,當紅框向右移動一個單位時,我們計算矩陣 \begin{bmatrix}0 \times w_{1, 1} & 0 \times w_{1, 2} & 0 \times w_{1, 3} \\1 \times w_{2, 1} & 0 \times w_{2, 2} & 1 \times w_{2, 3} \\0 \times w_{3, 1} & 2 \times w_{3, 2} & 1 \times w_{3, 3}\end{bmatrix} 的所有元素的和加上偏置項 b,記作 a_{1, 2} 。由此,和一般的神經元只能輸出 1 個值不同,這裡的卷積層神經元可以輸出一個 5×5 的矩陣 A = \begin{bmatrix}a_{1, 1} & \cdots & a_{1, 5} \\ \vdots & & \vdots \\ a_{5, 1} & \cdots & a_{5, 5}\end{bmatrix}

../../_images/conv_procedure.png

卷積示意圖。一個單通道的 7×7 圖像在通過一個感受野爲 3×3 ,參數爲10個的卷積層神經元後,得到 5×5 的矩陣作爲卷積結果。

下面,我們使用TensorFlow來驗證一下上圖的計算結果。

將上圖中的輸入圖像、權值矩陣 W 和偏置項 b 表示爲NumPy數組 image , W , b 如下:

# TensorFlow 的图像表示为 [图像数目,长,宽,色彩通道数] 的四维张量
# 这里我们的输入图像 image 的张量形状为 [1, 7, 7, 1]
image = np.array([[
    [0, 0, 0, 0, 0, 0, 0],
    [0, 1, 0, 1, 2, 1, 0],
    [0, 0, 2, 2, 0, 1, 0],
    [0, 1, 1, 0, 2, 1, 0],
    [0, 0, 2, 1, 1, 0, 0],
    [0, 2, 1, 1, 2, 0, 0],
    [0, 0, 0, 0, 0, 0, 0]
]], dtype=np.float32)
image = np.expand_dims(image, axis=-1)  
W = np.array([[
    [ 0, 0, -1], 
    [ 0, 1, 0 ], 
    [-2, 0, 2 ]
]], dtype=np.float32)
b = np.array([1], dtype=np.float32)

然後建立一個僅有一個卷積層的模型,用 Wb 初始化 4

model = tf.keras.models.Sequential([
    tf.keras.layers.Conv2D(
        filters=1,              # 卷积层神经元(卷积核)数目
        kernel_size=[3, 3],     # 感受野大小
        kernel_initializer=tf.constant_initializer(W),
        bias_initializer=tf.constant_initializer(b)
    )]
)

最後將圖像數據 image 輸入模型,列印輸出:

output = model(image)
print(tf.squeeze(output))

程序運行結果爲:

tf.Tensor(
[[ 6.  5. -2.  1.  2.]
 [ 3.  0.  3.  2. -2.]
 [ 4.  2. -1.  0.  0.]
 [ 2.  1.  2. -1. -3.]
 [ 1.  1.  1.  3.  1.]], shape=(5, 5), dtype=float32)

可見與上圖中矩陣 A 的值一致。

還有一個問題,以上假設圖片都只有一個通道(例如灰度圖片),但如果圖像是彩色的(例如有 RGB 三個通道)該怎麼辦呢?此時,我們可以爲每個通道準備一個 3×3 的權值矩陣,即一共有 3×3×3=27 個權值。對於每個通道,均使用自己的權值矩陣進行處理,輸出時將多個通道所輸出的值進行加和即可。

可能有讀者會注意到,按照上述介紹的方法,每次卷積後的結果相比於原始圖像而言,四周都會「少一圈」。比如上面 7×7 的圖像,卷積後變成了 5×5 ,這有時會爲後面的工作帶來麻煩。因此,我們可以設定padding策略。在 tf.keras.layers.Conv2D 中,當我們將 padding 參數設爲 same 時,會將周圍缺少的部分使用0補齊,使得輸出的矩陣大小和輸入一致。

最後,既然我們可以使用滑動窗口的方法進行卷積,那麼每次滑動的步長是不是可以設置呢?答案是肯定的。通過 tf.keras.layers.Conv2Dstrides 參數即可設置步長(默認爲1)。比如,在上面的例子中,如果我們將步長設定爲2,輸出的卷積結果即會是一個3×3的矩陣。

事實上,卷積的形式多種多樣,以上的介紹只是其中最簡單和基礎的一種。更多卷積方式的示例可見 Convolution arithmetic

池化層(Pooling Layer)的理解則簡單得多,其可以理解爲對圖像進行降採樣的過程,對於每一次滑動窗口中的所有值,輸出其中的最大值(MaxPooling)、均值或其他方法產生的值。例如,對於一個三通道的 16×16 圖像(即一個 16*16*3 的張量),經過感受野爲 2×2,滑動步長爲 2 的池化層,則得到一個 8*8*3 的張量。

4

這裡使用了較爲簡易的Sequential模式建立模型,具體介紹見 後文

循環神經網絡(RNN)

循環神經網絡(Recurrent Neural Network, RNN)是一種適宜於處理序列數據的神經網絡,被廣泛用於語言模型、文本生成、機器翻譯等。

基礎知識和原理

這裡,我們使用 RNN 來進行尼採風格文本的自動生成。 5

這個任務的本質其實預測一段英文文本的接續字母的概率分布。比如,我們有以下句子:

I am a studen

這個句子(序列)一共有 13 個字符(包含空格)。當我們閱讀到這個由 13 個字符組成的序列後,根據我們的經驗,我們可以預測出下一個字符很大概率是 「t」。我們希望建立這樣一個模型,逐個輸入一段長爲 seq_length 的序列,輸出這些序列接續的下一個字符的概率分布。我們從下一個字符的概率分布中採樣作爲預測值,然後滾雪球式地生成下兩個字符,下三個字符等等,即可完成文本的生成任務。

首先,還是實現一個簡單的 DataLoader 類來讀取文本,並以字符爲單位進行編碼。設字符種類數爲 num_chars ,則每種字符賦予一個 0 到 num_chars - 1 之間的唯一整數編號 i。

class DataLoader():
    def __init__(self):
        path = tf.keras.utils.get_file('nietzsche.txt',
            origin='https://s3.amazonaws.com/text-datasets/nietzsche.txt')
        with open(path, encoding='utf-8') as f:
            self.raw_text = f.read().lower()
        self.chars = sorted(list(set(self.raw_text)))
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        self.text = [self.char_indices[c] for c in self.raw_text]

    def get_batch(self, seq_length, batch_size):
        seq = []
        next_char = []
        for i in range(batch_size):
            index = np.random.randint(0, len(self.text) - seq_length)
            seq.append(self.text[index:index+seq_length])
            next_char.append(self.text[index+seq_length])
        return np.array(seq), np.array(next_char)       # [batch_size, seq_length], [num_batch]

接下來進行模型的實現。在 __init__ 方法中我們實例化一個常用的 LSTMCell 單元,以及一個線性變換用的全連接層,我們首先對序列進行「One Hot」操作,即將序列中的每個字符的編碼 i 均變換爲一個 num_char 維向量,其第 i 位爲 1,其餘均爲 0。變換後的序列張量形狀爲 [seq_length, num_chars] 。然後,我們初始化 RNN 單元的狀態,存入變量 state 中。接下來,將序列從頭到尾依次送入 RNN 單元,即在 t 時刻,將上一個時刻 t-1 的 RNN 單元狀態 state 和序列的第 t 個元素 inputs[t, :] 送入 RNN 單元,得到當前時刻的輸出 output 和 RNN 單元狀態。取 RNN 單元最後一次的輸出,通過全連接層變換到 num_chars 維,即作爲模型的輸出。

../../_images/rnn_single.jpg

output, state = self.cell(inputs[:, t, :], state) 圖示

../../_images/rnn.jpg

RNN 流程圖示

具體實現如下:

class RNN(tf.keras.Model):
    def __init__(self, num_chars, batch_size, seq_length):
        super().__init__()
        self.num_chars = num_chars
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.cell = tf.keras.layers.LSTMCell(units=256)
        self.dense = tf.keras.layers.Dense(units=self.num_chars)

    def call(self, inputs, from_logits=False):
        inputs = tf.one_hot(inputs, depth=self.num_chars)       # [batch_size, seq_length, num_chars]
        state = self.cell.get_initial_state(batch_size=self.batch_size, dtype=tf.float32)
        for t in range(self.seq_length):
            output, state = self.cell(inputs[:, t, :], state)
        logits = self.dense(output)
        if from_logits:
            return logits
        else:
            return tf.nn.softmax(logits)

定義一些模型超參數:

    num_batches = 1000
    seq_length = 40
    batch_size = 50
    learning_rate = 1e-3

訓練過程與前節基本一致,在此複述:

  • DataLoader 中隨機取一批訓練數據;

  • 將這批數據送入模型,計算出模型的預測值;

  • 將模型預測值與真實值進行比較,計算損失函數(loss);

  • 計算損失函數關於模型變量的導數;

  • 使用優化器更新模型參數以最小化損失函數。

    data_loader = DataLoader()
    model = RNN(num_chars=len(data_loader.chars), batch_size=batch_size, seq_length=seq_length)
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    for batch_index in range(num_batches):
        X, y = data_loader.get_batch(seq_length, batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X)
            loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
            loss = tf.reduce_mean(loss)
            print("batch %d: loss %f" % (batch_index, loss.numpy()))
        grads = tape.gradient(loss, model.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))

關於文本生成的過程有一點需要特別注意。之前,我們一直使用 tf.argmax() 函數,將對應概率最大的值作爲預測值。然而對於文本生成而言,這樣的預測方式過於絕對,會使得生成的文本失去豐富性。於是,我們使用 np.random.choice() 函數按照生成的概率分布取樣。這樣,即使是對應概率較小的字符,也有機會被取樣到。同時,我們加入一個 temperature 參數控制分布的形狀,參數值越大則分布越平緩(最大值和最小值的差值越小),生成文本的豐富度越高;參數值越小則分布越陡峭,生成文本的豐富度越低。

    def predict(self, inputs, temperature=1.):
        batch_size, _ = tf.shape(inputs)
        logits = self(inputs, from_logits=True)
        prob = tf.nn.softmax(logits / temperature).numpy()
        return np.array([np.random.choice(self.num_chars, p=prob[i, :])
                         for i in range(batch_size.numpy())])

通過這種方式進行 「滾雪球」 式的連續預測,即可得到生成文本。

    X_, _ = data_loader.get_batch(seq_length, 1)
    for diversity in [0.2, 0.5, 1.0, 1.2]:
        X = X_
        print("diversity %f:" % diversity)
        for t in range(400):
            y_pred = model.predict(X, diversity)
            print(data_loader.indices_char[y_pred[0]], end='', flush=True)
            X = np.concatenate([X[:, 1:], np.expand_dims(y_pred, axis=1)], axis=-1)
        print("\n")

生成的文本如下:

diversity 0.200000:
conserted and conseive to the conterned to it is a self--and seast and the selfes as a seast the expecience and and and the self--and the sered is a the enderself and the sersed and as a the concertion of the series of the self in the self--and the serse and and the seried enes and seast and the sense and the eadure to the self and the present and as a to the self--and the seligious and the enders

diversity 0.500000:
can is reast to as a seligut and the complesed
has fool which the self as it is a the beasing and us immery and seese for entoured underself of the seless and the sired a mears and everyther to out every sone thes and reapres and seralise as a streed liees of the serse to pease the cersess of the selung the elie one of the were as we and man one were perser has persines and conceity of all self-el

diversity 1.000000:
entoles by
their lisevers de weltaale, arh pesylmered, and so jejurted count have foursies as is
descinty iamo; to semplization refold, we dancey or theicks-welf--atolitious on his
such which
here
oth idey of pire master, ie gerw their endwit in ids, is an trees constenved mase commars is leed mad decemshime to the mor the elige. the fedies (byun their ope wopperfitious--antile and the it as the f

diversity 1.200000:
cain, elvotidue, madehoublesily
inselfy!--ie the rads incults of to prusely le]enfes patuateded:.--a coud--theiritibaior "nrallysengleswout peessparify oonsgoscess teemind thenry ansken suprerial mus, cigitioum: 4reas. whouph: who
eved
arn inneves to sya" natorne. hag open reals whicame oderedte,[fingo is
zisternethta simalfule dereeg hesls lang-lyes thas quiin turjentimy; periaspedey tomm--whach
5

此處的任務及實現參考了 https://github.com/keras-team/keras/blob/master/examples/lstm_text_generation.py

循環神經網絡的工作過程

循環神經網絡是一個處理時間序列數據的神經網絡結構,也就是說,我們需要在腦海里有一根時間軸,循環神經網絡具有初始狀態 s_0 ,在每個時間點 t 疊代對當前時間的輸入 x_t 進行處理,修改自身的狀態 s_t ,並進行輸出 o_t

循環神經網絡的核心是狀態 s ,是一個特定維數的向量,類似於神經網絡的 「記憶」。在 t=0 的初始時刻,s_0 被賦予一個初始值(常用的爲全 0 向量)。然後,我們用類似於遞歸的方法來描述循環神經網絡的工作過程。即在 t 時刻,我們假設 s_{t-1} 已經求出,關注如何在此基礎上求出 s_{t}

  • 對輸入向量 x_t 通過矩陣 U 進行線性變換,U x_t 與狀態 s 具有相同的維度;

  • s_{t-1} 通過矩陣 W 進行線性變換,W s_{t-1} 與狀態 s 具有相同的維度;

  • 將上述得到的兩個向量相加並通過激活函數,作爲當前狀態 s_t 的值,即 s_t = f(U x_t + W s_{t-1})。也就是說,當前狀態的值是上一個狀態的值和當前輸入進行某種信息整合而產生的;

  • 對當前狀態 s_t 通過矩陣 V 進行線性變換,得到當前時刻的輸出 o_t

我們假設輸入向量 x_t 、狀態 s 和輸出向量 o_t 的維度分別爲 mnp,則 U \in \mathbb{R}^{m \times n}W \in \mathbb{R}^{n \times n}V \in \mathbb{R}^{n \times p}

上述爲最基礎的 RNN 原理介紹。在實際使用時往往使用一些常見的改進型,如LSTM(長短期記憶神經網絡,解決了長序列的梯度消失問題,適用於較長的序列)、GRU等。

深度強化學習(DRL)

強化學習 (Reinforcement learning,RL)強調如何基於環境而行動,以取得最大化的預期利益。結合了深度學習技術後的強化學習(Deep Reinforcement learning,DRL)更是如虎添翼。近年廣爲人知的 AlphaGo 即是深度強化學習的典型應用。

注解

可參考本手冊附錄的 強化學習簡介 一章以獲得強化學習的基礎知識。

這裡,我們使用深度強化學習玩 CartPole(倒立擺)遊戲。倒立擺是控制論中的經典問題,在這個遊戲中,一根杆的底部與一個小車通過軸相連,而杆的重心在軸之上,因此是一個不穩定的系統。在重力的作用下,杆很容易倒下。而我們則需要控制小車在水平的軌道上進行左右運動,以使得杆一直保持豎直平衡狀態。

../../_images/cartpole.gif

CartPole 遊戲

我們使用 OpenAI 推出的 Gym 環境庫 中的 CartPole 遊戲環境,可使用 pip install gym 進行安裝,具體安裝步驟和教程可參考 官方文檔這裡 。和Gym的交互過程很像是一個回合制遊戲,我們首先獲得遊戲的初始狀態(比如杆的初始角度和小車位置),然後在每個回合t,我們都需要在當前可行的動作中選擇一個並交由Gym執行(比如向左或者向右推動小車,每個回合中二者只能擇一),Gym在執行動作後,會返回動作執行後的下一個狀態和當前回合所獲得的獎勵值(比如我們選擇向左推動小車並執行後,小車位置更加偏左,而杆的角度更加偏右,Gym將新的角度和位置返回給我們。而如果杆在這一回合仍沒有倒下,Gym同時返回給我們一個小的正獎勵)。這個過程可以一直疊代下去,直到遊戲終止(比如杆倒下了)。在 Python 中,Gym 的基本調用方法如下:

import gym

env = gym.make('CartPole-v1')       # 實例化一個遊戲環境,參數爲遊戲名稱
state = env.reset()                 # 初始化環境,獲得初始狀態
while True:
    env.render()                    # 對當前幀進行渲染,繪圖到屏幕
    action = model.predict(state)   # 假設我們有一個訓練好的模型,能夠通過當前狀態預測出這時應該進行的動作
    next_state, reward, done, info = env.step(action)   # 讓環境執行動作,獲得執行完動作的下一個狀態,動作的獎勵,遊戲是否已結束以及額外信息
    if done:                        # 如果遊戲結束則退出循環
        break

那麼,我們的任務就是訓練出一個模型,能夠根據當前的狀態預測出應該進行的一個好的動作。粗略地說,一個好的動作應當能夠最大化整個遊戲過程中獲得的獎勵之和,這也是強化學習的目標。以CartPole遊戲爲例,我們的目標是希望做出合適的動作使得杆一直不倒,即遊戲交互的回合數儘可能地多。而回合每進行一次,我們都會獲得一個小的正獎勵,回合數越多則累積的獎勵值也越高。因此,我們最大化遊戲過程中的獎勵之和與我們的最終目標是一致的。

以下代碼展示了如何使用深度強化學習中的 Deep Q-Learning 方法 [Mnih2013] 來訓練模型。首先,我們引入TensorFlow、Gym和一些常用庫,並定義一些模型超參數:

import tensorflow as tf
import numpy as np
import gym
import random
from collections import deque

num_episodes = 500              # 游戏训练的总episode数量
num_exploration_episodes = 100  # 探索过程所占的episode数量
max_len_episode = 1000          # 每个episode的最大回合数
batch_size = 32                 # 批次大小
learning_rate = 1e-3            # 学习率
gamma = 1.                      # 折扣因子
initial_epsilon = 1.            # 探索起始时的探索率
final_epsilon = 0.01            # 探索终止时的探索率

然後,我們使用 tf.keras.Model 建立一個Q函數網絡(Q-network),用於擬合Q Learning中的Q函數。這裡我們使用較簡單的多層全連接神經網絡進行擬合。該網絡輸入當前狀態,輸出各個動作下的Q-value(CartPole下爲2維,即向左和向右推動小車)。

class QNetwork(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.dense1 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense2 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
        self.dense3 = tf.keras.layers.Dense(units=2)

    def call(self, inputs):
        x = self.dense1(inputs)
        x = self.dense2(x)
        x = self.dense3(x)
        return x

    def predict(self, inputs):
        q_values = self(inputs)
        return tf.argmax(q_values, axis=-1)

最後,我們在主程序中實現Q Learning算法。

if __name__ == '__main__':
    env = gym.make('CartPole-v1')       # 实例化一个游戏环境,参数为游戏名称
    model = QNetwork()
    optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    replay_buffer = deque(maxlen=10000) # 使用一个 deque 作为 Q Learning 的经验回放池
    epsilon = initial_epsilon
    for episode_id in range(num_episodes):
        state = env.reset()             # 初始化环境,获得初始状态
        epsilon = max(                  # 计算当前探索率
            initial_epsilon * (num_exploration_episodes - episode_id) / num_exploration_episodes,
            final_epsilon)
        for t in range(max_len_episode):
            env.render()                                # 对当前帧进行渲染,绘图到屏幕
            if random.random() < epsilon:               # epsilon-greedy 探索策略,以 epsilon 的概率选择随机动作
                action = env.action_space.sample()      # 选择随机动作(探索)
            else:
                action = model.predict(np.expand_dims(state, axis=0)).numpy()   # 选择模型计算出的 Q Value 最大的动作
                action = action[0]

            # 让环境执行动作,获得执行完动作的下一个状态,动作的奖励,游戏是否已结束以及额外信息
            next_state, reward, done, info = env.step(action)
            # 如果游戏Game Over,给予大的负奖励
            reward = -10. if done else reward
            # 将(state, action, reward, next_state)的四元组(外加 done 标签表示是否结束)放入经验回放池
            replay_buffer.append((state, action, reward, next_state, 1 if done else 0))
            # 更新当前 state
            state = next_state

            if done:                                    # 游戏结束则退出本轮循环,进行下一个 episode
                print("episode %d, epsilon %f, score %d" % (episode_id, epsilon, t))
                break

            if len(replay_buffer) >= batch_size:
                # 从经验回放池中随机取一个批次的四元组,并分别转换为 NumPy 数组
                batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(
                    *random.sample(replay_buffer, batch_size))
                batch_state, batch_reward, batch_next_state, batch_done = \
                    [np.array(a, dtype=np.float32) for a in [batch_state, batch_reward, batch_next_state, batch_done]]
                batch_action = np.array(batch_action, dtype=np.int32)

                q_value = model(batch_next_state)
                y = batch_reward + (gamma * tf.reduce_max(q_value, axis=1)) * (1 - batch_done)  # 计算 y 值
                with tf.GradientTape() as tape:
                    loss = tf.keras.losses.mean_squared_error(  # 最小化 y 和 Q-value 的距离
                        y_true=y,
                        y_pred=tf.reduce_sum(model(batch_state) * tf.one_hot(batch_action, depth=2), axis=1)
                    )
                grads = tape.gradient(loss, model.variables)
                optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))       # 计算梯度并更新参数

對於不同的任務(或者說環境),我們需要根據任務的特點,設計不同的狀態以及採取合適的網絡來擬合 Q 函數。例如,如果我們考慮經典的打磚塊遊戲(Gym 環境庫中的 Breakout-v0 ),每一次執行動作(擋板向左、向右或不動),都會返回一個 210 * 160 * 3 的 RGB 圖片,表示當前屏幕畫面。爲了給打磚塊遊戲這個任務設計合適的狀態表示,我們有以下分析:

  • 磚塊的顏色信息並不是很重要,畫面轉換成灰度也不影響操作,因此可以去除狀態中的顏色信息(即將圖片轉爲灰度表示);

  • 小球移動的信息很重要,如果只知道單幀畫面而不知道小球往哪邊運動,即使是人也很難判斷擋板應當移動的方向。因此,必須在狀態中加入表徵小球運動方向的信息。一個簡單的方式是將當前幀與前面幾幀的畫面進行疊加,得到一個 210 * 160 * X (X 爲疊加幀數)的狀態表示;

  • 每幀的解析度不需要特別高,只要能大致表徵方塊、小球和擋板的位置以做出決策即可,因此對於每幀的長寬可做適當壓縮。

而考慮到我們需要從圖像信息中提取特徵,使用 CNN 作爲擬合 Q 函數的網絡將更爲適合。由此,將上面的 QNetwork 更換爲 CNN 網絡,並對狀態做一些修改,即可用於玩一些簡單的視頻遊戲。

Keras Pipeline *

以上示例均使用了 Keras 的 Subclassing API 建立模型,即對 tf.keras.Model 類進行擴展以定義自己的新模型,同時手工編寫了訓練和評估模型的流程。這種方式靈活度高,且與其他流行的深度學習框架(如 PyTorch、Chainer)共通,是本手冊所推薦的方法。不過在很多時候,我們只需要建立一個結構相對簡單和典型的神經網絡(比如上文中的 MLP 和 CNN),並使用常規的手段進行訓練。這時,Keras 也給我們提供了另一套更爲簡單高效的內置方法來建立、訓練和評估模型。

Keras Sequential/Functional API 模式建立模型

最典型和常用的神經網絡結構是將一堆層按特定順序疊加起來,那麼,我們是不是只需要提供一個層的列表,就能由 Keras 將它們自動首尾相連,形成模型呢?Keras 的 Sequential API 正是如此。通過向 tf.keras.models.Sequential() 提供一個層的列表,就能快速地建立一個 tf.keras.Model 模型並返回:

        model = tf.keras.models.Sequential([
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(100, activation=tf.nn.relu),
            tf.keras.layers.Dense(10),
            tf.keras.layers.Softmax()
        ])

不過,這種層疊結構並不能表示任意的神經網絡結構。爲此,Keras 提供了 Functional API,幫助我們建立更爲複雜的模型,例如多輸入 / 輸出或存在參數共享的模型。其使用方法是將層作爲可調用的對象並返回張量(這點與之前章節的使用方法一致),並將輸入向量和輸出向量提供給 tf.keras.Modelinputsoutputs 參數,示例如下:

        inputs = tf.keras.Input(shape=(28, 28, 1))
        x = tf.keras.layers.Flatten()(inputs)
        x = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)(x)
        x = tf.keras.layers.Dense(units=10)(x)
        outputs = tf.keras.layers.Softmax()(x)
        model = tf.keras.Model(inputs=inputs, outputs=outputs)

使用 Keras Model 的 compilefitevaluate 方法訓練和評估模型

當模型建立完成後,通過 tf.keras.Modelcompile 方法配置訓練過程:

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        metrics=[tf.keras.metrics.sparse_categorical_accuracy]
    )

tf.keras.Model.compile 接受 3 個重要的參數:

  • oplimizer :優化器,可從 tf.keras.optimizers 中選擇;

  • loss :損失函數,可從 tf.keras.losses 中選擇;

  • metrics :評估指標,可從 tf.keras.metrics 中選擇。

接下來,可以使用 tf.keras.Modelfit 方法訓練模型:

    model.fit(data_loader.train_data, data_loader.train_label, epochs=num_epochs, batch_size=batch_size)

tf.keras.Model.fit 接受 5 個重要的參數:

  • x :訓練數據;

  • y :目標數據(數據標籤);

  • epochs :將訓練數據疊代多少遍;

  • batch_size :批次的大小;

  • validation_data :驗證數據,可用於在訓練過程中監控模型的性能。

Keras 支持使用 tf.data.Dataset 進行訓練,詳見 tf.data

最後,使用 tf.keras.Model.evaluate 評估訓練效果,提供測試數據及標籤即可:

    print(model.evaluate(data_loader.test_data, data_loader.test_label))

自定義層、損失函數和評估指標 *

可能你還會問,如果現有的這些層無法滿足我的要求,我需要定義自己的層怎麼辦?事實上,我們不僅可以繼承 tf.keras.Model 編寫自己的模型類,也可以繼承 tf.keras.layers.Layer 編寫自己的層。

自定義層

自定義層需要繼承 tf.keras.layers.Layer 類,並重寫 __init__buildcall 三個方法,如下所示:

class MyLayer(tf.keras.layers.Layer):
    def __init__(self):
        super().__init__()
        # 初始化代碼

    def build(self, input_shape):     # input_shape 是一個 TensorShape 類型對象,提供輸入的形狀
        # 在第一次使用該層的時候調用該部分代碼,在這裡創建變量可以使得變量的形狀自適應輸入的形狀
        # 而不需要使用者額外指定變量形狀。
        # 如果已經可以完全確定變量的形狀,也可以在__init__部分創建變量
        self.variable_0 = self.add_weight(...)
        self.variable_1 = self.add_weight(...)

    def call(self, inputs):
        # 模型調用的代碼(處理輸入並返回輸出)
        return output

例如,如果我們要自己實現一個 本章第一節 中的全連接層( tf.keras.layers.Dense ),可以按如下方式編寫。此代碼在 build 方法中創建兩個變量,並在 call 方法中使用創建的變量進行運算:

class LinearLayer(tf.keras.layers.Layer):
    def __init__(self, units):
        super().__init__()
        self.units = units

    def build(self, input_shape):     # 这里 input_shape 是第一次运行call()时参数inputs的形状
        self.w = self.add_variable(name='w',
            shape=[input_shape[-1], self.units], initializer=tf.zeros_initializer())
        self.b = self.add_variable(name='b',
            shape=[self.units], initializer=tf.zeros_initializer())

    def call(self, inputs):
        y_pred = tf.matmul(inputs, self.w) + self.b
        return y_pred

在定義模型的時候,我們便可以如同 Keras 中的其他層一樣,調用我們自定義的層 LinearLayer

class LinearModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        self.layer = LinearLayer(units=1)

    def call(self, inputs):
        output = self.layer(inputs)
        return output

自定義損失函數和評估指標

自定義損失函數需要繼承 tf.keras.losses.Loss 類,重寫 call 方法即可,輸入真實值 y_true 和模型預測值 y_pred ,輸出模型預測值和真實值之間通過自定義的損失函數計算出的損失值。下面的示例爲均方差損失函數:

class MeanSquaredError(tf.keras.losses.Loss):
    def call(self, y_true, y_pred):
        return tf.reduce_mean(tf.square(y_pred - y_true))

自定義評估指標需要繼承 tf.keras.metrics.Metric 類,並重寫 __init__update_stateresult 三個方法。下面的示例對前面用到的 SparseCategoricalAccuracy 評估指標類做了一個簡單的重實現:

class SparseCategoricalAccuracy(tf.keras.metrics.Metric):
    def __init__(self):
        super().__init__()
        self.total = self.add_weight(name='total', dtype=tf.int32, initializer=tf.zeros_initializer())
        self.count = self.add_weight(name='count', dtype=tf.int32, initializer=tf.zeros_initializer())

    def update_state(self, y_true, y_pred, sample_weight=None):
        values = tf.cast(tf.equal(y_true, tf.argmax(y_pred, axis=-1, output_type=tf.int32)), tf.int32)
        self.total.assign_add(tf.shape(y_true)[0])
        self.count.assign_add(tf.reduce_sum(values))

    def result(self):
        return self.count / self.total
LeCun1998
  1. LeCun, L. Bottou, Y. Bengio, and P. Haffner. “Gradient-based learning applied to document recognition.” Proceedings of the IEEE, 86(11):2278-2324, November 1998. http://yann.lecun.com/exdb/mnist/

Graves2013

Graves, Alex. 「Generating Sequences With Recurrent Neural Networks.」 ArXiv:1308.0850 [Cs], August 4, 2013. http://arxiv.org/abs/1308.0850.

Mnih2013

Mnih, Volodymyr, Koray Kavukcuoglu, David Silver, Alex Graves, Ioannis Antonoglou, Daan Wierstra, and Martin Riedmiller. 「Playing Atari with Deep Reinforcement Learning.」 ArXiv:1312.5602 [Cs], December 19, 2013. http://arxiv.org/abs/1312.5602.