計算速度優化
- 前面的計算都是針對輸入一個樣本，然后更新一次權重。這里將代碼改成矩陣運算，每次批量計算mini_batch對權重的更改。下面把這章節的代碼和該系列文章二的代碼運算速度對比，結果如下：
參數：
net.SGD(training_data, 10, 10, 0.5, test_data, False) # 全樣本
二 :0:01:19.567001
三(下) :0:00:42.725754
- 針對前面提到過的采用softmax作為輸出層函數，和似然函數作為損失函數結合（輸入樣本x，輸出a，真實值為y，y中真實類別對應的位置為k，則cost為 -ln(a[k])；a理解為x被分為每一類對應的概率；sum(a)=1，這是softmax函數導致的。當預測越接近真實值，a[k]越接近1，即 -ln(a[k])越接近0）。這里給出一些學習softmax函數的鏈接：ufldl.stanford.edu、csdn
代碼如下
# encoding: utf-8
"""
@version: python3.5.2
@author: kaenlee @contact: lichaolfm@163.com
@software: PyCharm Community Edition
@time: 2017/8/16 11:09
purpose:
"""
# 輸出層采用softmax
# 似然函數作為損失函數
# minibatch訓練采用矩陣乘法加快計算
# dropout 應對過度擬合
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data
import random
from functools import reduce
import operator
import datetime as dt
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib as mp
mp.style.use('ggplot')
# 各個層仍然會用到s函數
def Sigmod(z):
    """Return the logistic sigmoid ``1 / (1 + exp(-z))`` of *z*.

    The expression is evaluated with NumPy, so *z* may be a scalar or an
    array; arrays are processed element-wise.
    """
    denominator = 1 + np.exp(-z)
    return 1 / denominator
def SigmodPrime(z):
    """Return the derivative of the sigmoid at *z*: ``s(z) * (1 - s(z))``."""
    activation = Sigmod(z)
    return activation * (1 - activation)
class CrossEntropyLossFunc:
    """Log-likelihood cost together with the matching output-layer error."""

    @staticmethod
    def loss(A, Y):
        """Return the mean negative log-likelihood of the true classes.

        :param A: network outputs, one row per sample (N x classes)
        :param Y: targets with the same shape as A; the true class of a row
            is taken to be the position of its largest entry
        :return: ``-log(A[row, true_class])`` averaged over all rows
        """
        # Column of the true class for every sample.
        target_cols = np.argmax(Y, axis=1)
        sample_rows = np.arange(len(target_cols))
        per_sample_cost = -np.log(A[sample_rows, target_cols])
        # nan_to_num replaces nan/inf (e.g. from log(0)) by finite numbers
        # so that a single degenerate sample cannot poison the sum.
        return np.sum(np.nan_to_num(per_sample_cost)) / len(target_cols)

    @staticmethod
    def delta(A, Y):
        """Return the output-layer error ``A - Y``, one row per sample."""
        output_error = A - Y
        return output_error
class NetWorks:
    """Fully connected sigmoid network trained with mini-batch gradient descent.

    Every layer, including the output layer, uses the sigmoid activation.
    ``Feedward`` additionally divides each output row by its sum so that the
    row adds up to 1. Note that this normalises sigmoid outputs; it is not an
    exponential softmax.
    """

    def __init__(self, size, lossFunc):
        """Create the network and draw random initial parameters.

        :param size: list with the number of nodes of every layer, input and
            output layers included
        :param lossFunc: object providing ``loss(A, Y)`` and ``delta(A, Y)``
        """
        self.size = size
        self.Layers = len(size)
        self.initializeWeightBias()
        self.lossFunc = lossFunc

    def initializeWeightBias(self):
        """Draw all biases and weights from a standard normal distribution."""
        # The input layer has no bias, hence size[1:].
        self.bias = [np.random.randn(num) for num in self.size[1:]]
        # weight[l] has one row per node of layer l+1 and one column per node
        # of layer l (the layer feeding it).
        self.weight = [np.random.randn(row, col)
                       for row, col in zip(self.size[1:], self.size[:-1])]

    def Feedward(self, X):
        """Propagate a batch through the network.

        :param X: input matrix, one sample per row
        :return: output-layer activations with every row divided by its sum
        """
        for b, w in zip(self.bias, self.weight):
            X = Sigmod(X.dot(w.T) + b)
        # Normalise every row so that the outputs of a sample sum to 1.
        return X / np.sum(X, axis=1, keepdims=True)

    def SGD(self, training_data, epochs, minibatch_size, eta, test_data=None, isplot=False):
        """Train the network with mini-batch stochastic gradient descent.

        :param training_data: ``[inputs, targets]``, both indexable by an
            integer array and holding one sample per row
        :param epochs: number of passes over the training data
        :param minibatch_size: number of samples per gradient step
        :param eta: learning rate
        :param test_data: optional ``[inputs, targets]``; when given, the
            number of correct predictions is printed after every epoch
        :param isplot: when true, accuracy and cost are recorded per epoch
            and a figure is written to ``dropout.png``
        """
        trainX = training_data[0]
        trainY = training_data[1]
        n = len(trainY)
        if test_data:
            testX = test_data[0]
            testY = test_data[1]
            n_test = len(testY)
        accuracy_train = []
        accuracy_test = []
        cost_train = []
        cost_test = []
        for e in range(epochs):
            # Reshuffle the samples before cutting the mini-batches.
            indices = np.arange(n)
            random.shuffle(indices)
            trainX = trainX[indices]
            trainY = trainY[indices]
            for k in range(0, n, minibatch_size):
                self.Update_miniBatchs(trainX[k:k + minibatch_size],
                                       trainY[k:k + minibatch_size], eta)
            if test_data:
                totall_predRight = self.Evalueate(test_data)
                print('Epoch {0}: {1}/{2}'.format(e, totall_predRight, n_test))
                if isplot:
                    accuracy_test.append(totall_predRight / n_test)
                    cost_test.append(self.lossFunc.loss(self.Feedward(testX), testY))
            if isplot:
                accuracy_train.append(self.Evalueate(training_data) / n)
                # The cost is computed on the network output. The previous
                # code passed the raw inputs to loss() with a missing
                # argument, which raised TypeError.
                cost_train.append(self.lossFunc.loss(self.Feedward(trainX), trainY))
        if isplot:
            epoch_axis = np.arange(1, epochs + 1)
            plt.figure()
            plt.plot(epoch_axis, accuracy_train, label='train')
            # Without test data there is no test curve; plotting the empty
            # list against epoch_axis would raise a shape error.
            if accuracy_test:
                plt.plot(epoch_axis, accuracy_test, label='test')
            axis = plt.gca()
            axis_01 = plt.twinx(axis)
            axis_01.plot(epoch_axis, cost_train, label='cost')
            plt.xlabel('epoch')
            plt.legend()
            plt.savefig('dropout.png')
            plt.close()

    def Update_miniBatchs(self, batchX, batchY, eta):
        """Apply one gradient-descent step computed from a mini-batch.

        :param batchX: inputs of the mini-batch, one sample per row
        :param batchY: targets of the mini-batch, one sample per row
        :param eta: learning rate
        """
        Cprime_bs, Cprime_ws = self.BackProd(batchX, batchY)
        self.bias = [bias - eta * change for bias, change in zip(self.bias, Cprime_bs)]
        self.weight = [weight - eta * change for weight, change in zip(self.weight, Cprime_ws)]

    def BackProd(self, batchX, batchY):
        """Back-propagate a mini-batch and return batch-averaged gradients.

        :param batchX: N x size[0] input matrix
        :param batchY: N x size[-1] target matrix
        :return: ``(Cprime_bs, Cprime_ws)``; two lists ordered from the first
            non-input layer to the output layer, holding the gradient of the
            cost with respect to each bias vector and weight matrix
        """
        n = len(batchY)
        zs_n = []                  # weighted inputs of layers 2..L
        activations_n = [batchX]   # activations of layers 1..L
        for b, w in zip(self.bias, self.weight):
            z_n = activations_n[-1].dot(w.T) + b
            zs_n.append(z_n)
            activations_n.append(Sigmod(z_n))
        # Output-layer error, one row per sample.
        delta = self.lossFunc.delta(activations_n[-1], batchY)
        # delta.T.dot(a_prev) equals the sum over the samples of the outer
        # products delta_i * a_prev_i^T, so no per-sample loop is needed.
        Cprime_bs = [np.sum(delta, axis=0) / n]
        Cprime_ws = [delta.T.dot(activations_n[-2]) / n]
        for i in range(1, self.Layers - 1):
            # Error of the current layer from the next layer's weights and
            # error, times the sigmoid derivative at the current layer.
            delta = delta.dot(self.weight[-i]) * SigmodPrime(zs_n[-i - 1])
            Cprime_bs.append(np.sum(delta, axis=0) / n)
            Cprime_ws.append(delta.T.dot(activations_n[-i - 2]) / n)
        # Gradients were collected from the output layer backwards.
        Cprime_bs.reverse()
        Cprime_ws.reverse()
        return (Cprime_bs, Cprime_ws)

    def Evalueate(self, test_data):
        """Count the samples whose predicted class equals the target class.

        :param test_data: ``[inputs, targets]`` with one sample per row
        :return: number of correct predictions as an int
        """
        testX = test_data[0]
        testY = test_data[1]
        # A prediction is correct when the largest output sits at the same
        # position as the largest target entry.
        predicted = np.argmax(self.Feedward(testX), axis=1)
        expected = np.argmax(testY, axis=1)
        return int(np.sum(predicted == expected))
if __name__ == '__main__':
    # Load MNIST (labels requested with one_hot=True) from the local data folder.
    mnist = input_data.read_data_sets(r'D:\PycharmProjects\HandWritingRecognition\TF\data', one_hot=True)
    training_data = [mnist.train.images, mnist.train.labels]
    test_data = [mnist.test.images, mnist.test.labels]
    net = NetWorks([784, 20, 10], CrossEntropyLossFunc)
    # First three test samples, handy for quick manual calls to
    # net.Feedward / net.BackProd while debugging.
    X, Y = test_data[0][:3], test_data[1][:3]
    # Time a full training run on all samples.
    start = dt.datetime.now()
    net.SGD(training_data, epochs=10, minibatch_size=10, eta=0.5,
            test_data=test_data, isplot=False)
    elapsed = dt.datetime.now() - start
    print(elapsed)
DropOut
文(三)是針對解決過度擬合的問題，回歸主題。這里補充上(三)上的dropout代碼
1.等比例隨機刪除隱藏層的p比例節點，備份一份權重偏置數據
2.剩下的節點按自己原有權重，進行一次更新
3.將更新的權重，覆蓋備份數據中對應位置的權重
4.預測取權重(1-p)比例進行預測，預測后將權重還原
5.回到步驟1
# encoding: utf-8
"""
@version: python3.5.2
@author: kaenlee @contact: lichaolfm@163.com
@software: PyCharm Community Edition
@time: 2017/8/16 11:09
purpose:
"""
# 輸出層采用softmax
# 似然函數作為損失函數
# minibatch訓練采用矩陣乘法加快計算
# dropout 應對過度擬合
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data
import random
from functools import reduce
import operator
import datetime as dt
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib as mp
mp.style.use('ggplot')
# 各個層仍然會用到s函數
def Sigmod(z):
    """Return the logistic sigmoid ``1 / (1 + exp(-z))`` of *z*.

    The expression is evaluated with NumPy, so *z* may be a scalar or an
    array; arrays are processed element-wise.
    """
    denominator = 1 + np.exp(-z)
    return 1 / denominator
def SigmodPrime(z):
    """Return the derivative of the sigmoid at *z*: ``s(z) * (1 - s(z))``."""
    activation = Sigmod(z)
    return activation * (1 - activation)
class CrossEntropyLossFunc:
    """Log-likelihood cost together with the matching output-layer error."""

    @staticmethod
    def loss(A, Y):
        """Return the mean negative log-likelihood of the true classes.

        :param A: network outputs, one row per sample (N x classes)
        :param Y: targets with the same shape as A; the true class of a row
            is taken to be the position of its largest entry
        :return: ``-log(A[row, true_class])`` averaged over all rows
        """
        # Column of the true class for every sample.
        target_cols = np.argmax(Y, axis=1)
        sample_rows = np.arange(len(target_cols))
        per_sample_cost = -np.log(A[sample_rows, target_cols])
        # nan_to_num replaces nan/inf (e.g. from log(0)) by finite numbers
        # so that a single degenerate sample cannot poison the sum.
        return np.sum(np.nan_to_num(per_sample_cost)) / len(target_cols)

    @staticmethod
    def delta(A, Y):
        """Return the output-layer error ``A - Y``, one row per sample."""
        output_error = A - Y
        return output_error
class NetWorks:
    """Fully connected sigmoid network trained with mini-batch SGD and dropout.

    Every layer, including the output layer, uses the sigmoid activation.
    ``Feedward`` additionally divides each output row by its sum so that the
    row adds up to 1. Note that this normalises sigmoid outputs; it is not an
    exponential softmax.

    For every mini-batch a random subset of the hidden nodes is removed
    (``DropOut``), the remaining sub-network takes one gradient step, and the
    updated parameters are written back into the full network.
    """

    def __init__(self, size, lossFunc):
        """Create the network and draw random initial parameters.

        :param size: list with the number of nodes of every layer, input and
            output layers included
        :param lossFunc: object providing ``loss(A, Y)`` and ``delta(A, Y)``
        """
        self.size = size
        self.Layers = len(size)
        self.initializeWeightBias()
        self.lossFunc = lossFunc

    def initializeWeightBias(self):
        """Draw all biases and weights from a standard normal distribution."""
        # The input layer has no bias, hence size[1:].
        self.bias = [np.random.randn(num) for num in self.size[1:]]
        # weight[l] has one row per node of layer l+1 and one column per node
        # of layer l (the layer feeding it).
        self.weight = [np.random.randn(row, col)
                       for row, col in zip(self.size[1:], self.size[:-1])]

    def Feedward(self, X, p, ISpredtest=True):
        """Propagate a batch through the network.

        :param X: input matrix, one sample per row
        :param p: fraction of hidden nodes dropped during training
        :param ISpredtest: when true, compensate for dropout by scaling the
            weights that read from hidden layers by ``1 - p``; when false,
            use the stored parameters unchanged
        :return: output-layer activations with every row divided by its sum
        """
        for layer, (b, w) in enumerate(zip(self.bias, self.weight)):
            # Only hidden nodes are ever dropped, so only weights fed by a
            # hidden layer (every layer after the first) saw thinned inputs
            # during training. Input-layer weights and all biases were
            # trained at full strength and must not be rescaled. The scaled
            # matrix is a local copy, so self.weight is never modified.
            if ISpredtest and layer > 0:
                w = (1 - p) * w
            X = Sigmod(X.dot(w.T) + b)
        # Normalise every row so that the outputs of a sample sum to 1.
        return X / np.sum(X, axis=1, keepdims=True)

    def DropOut(self, p):
        """Replace the network by a random sub-network without some hidden nodes.

        For every hidden layer ``int(nodes * (1 - p))`` nodes are kept,
        chosen uniformly without replacement; the output layer is kept
        whole. ``self.weight`` and ``self.bias`` are replaced by the
        parameters of the kept nodes.

        :param p: fraction of hidden nodes to drop
        :return: ``(weight, bias, save)``: the full weight list, the full
            bias list and, per layer, the sorted indices of the kept nodes
        """
        weight = list(self.weight)   # backups still reference the full arrays
        bias = list(self.bias)
        size = self.size[1:]         # the input layer owns no parameters
        updateW = []
        updateB = []
        save = []
        kept_prev = None             # kept nodes of the layer feeding this one
        for i in range(len(weight) - 1):   # the output layer is never thinned
            sample_num = int(size[i] * (1 - p))
            saveIndex = np.sort(np.random.choice(size[i], sample_num, replace=False))
            # Dropping nodes in the previous layer removes the matching
            # input columns of this layer.
            w = self.weight[i] if kept_prev is None else self.weight[i][:, kept_prev]
            updateW.append(w[saveIndex])
            updateB.append(self.bias[i][saveIndex])
            save.append(saveIndex)
            kept_prev = saveIndex
        w_out = self.weight[-1] if kept_prev is None else self.weight[-1][:, kept_prev]
        updateW.append(w_out)
        updateB.append(self.bias[-1])
        save.append(np.arange(size[-1]))
        self.weight = updateW
        self.bias = updateB
        return weight, bias, save

    def _merge_dropout_update(self, weightBackup, biasBackup, save):
        """Write the sub-network's parameters back into the full network."""
        for i in range(self.Layers - 1):
            biasBackup[i][save[i]] = self.bias[i]
            if i == 0:
                # The first layer only lost rows (nodes); its inputs are intact.
                weightBackup[i][save[i]] = self.weight[i]
            else:
                # np.ix_ addresses the kept rows x kept columns in ONE
                # indexing step. Chaining [row, :][:, col] would assign into
                # a temporary copy and silently discard the update.
                weightBackup[i][np.ix_(save[i], save[i - 1])] = self.weight[i]
        self.weight = weightBackup
        self.bias = biasBackup

    def SGD(self, training_data, epochs, minibatch_size, eta, p, test_data=None, isplot=False):
        """Train the network with mini-batch SGD, applying dropout per batch.

        :param training_data: ``[inputs, targets]``, both indexable by an
            integer array and holding one sample per row
        :param epochs: number of passes over the training data
        :param minibatch_size: number of samples per gradient step
        :param eta: learning rate
        :param p: fraction of hidden nodes dropped for every mini-batch
        :param test_data: optional ``[inputs, targets]``; when given, the
            number of correct predictions is printed after every epoch
        :param isplot: when true, accuracy and cost are recorded per epoch
            and a figure is written to ``dropout.png``
        """
        trainX = training_data[0]
        trainY = training_data[1]
        n = len(trainY)
        if test_data:
            testX = test_data[0]
            testY = test_data[1]
            n_test = len(testY)
        accuracy_train = []
        accuracy_test = []
        cost_train = []
        cost_test = []
        for e in range(epochs):
            # Reshuffle the samples before cutting the mini-batches.
            indices = np.arange(n)
            random.shuffle(indices)
            trainX = trainX[indices]
            trainY = trainY[indices]
            for k in range(0, n, minibatch_size):
                weightBackup, biasBackup, save = self.DropOut(p)
                self.Update_miniBatchs(trainX[k:k + minibatch_size],
                                       trainY[k:k + minibatch_size], eta)
                self._merge_dropout_update(weightBackup, biasBackup, save)
            if test_data:
                totall_predRight = self.Evalueate(test_data, p)
                print('Epoch {0}: {1}/{2}'.format(e, totall_predRight, n_test))
                if isplot:
                    accuracy_test.append(totall_predRight / n_test)
                    cost_test.append(self.lossFunc.loss(self.Feedward(testX, p), testY))
            if isplot:
                # Training metrics are computed on the full network as well,
                # so they need the same (1 - p) compensation as the test
                # metrics to be comparable.
                accuracy_train.append(self.Evalueate(training_data, p) / n)
                cost_train.append(self.lossFunc.loss(self.Feedward(trainX, p), trainY))
        if isplot:
            epoch_axis = np.arange(1, epochs + 1)
            plt.figure()
            plt.plot(epoch_axis, accuracy_train, label='train')
            # Without test data there is no test curve; plotting the empty
            # list against epoch_axis would raise a shape error.
            if accuracy_test:
                plt.plot(epoch_axis, accuracy_test, label='test')
            axis = plt.gca()
            axis_01 = plt.twinx(axis)
            axis_01.plot(epoch_axis, cost_train, label='cost')
            plt.xlabel('epoch')
            plt.legend()
            plt.savefig('dropout.png')
            plt.close()

    def Update_miniBatchs(self, batchX, batchY, eta):
        """Apply one gradient-descent step computed from a mini-batch.

        :param batchX: inputs of the mini-batch, one sample per row
        :param batchY: targets of the mini-batch, one sample per row
        :param eta: learning rate
        """
        Cprime_bs, Cprime_ws = self.BackProd(batchX, batchY)
        self.bias = [bias - eta * change for bias, change in zip(self.bias, Cprime_bs)]
        self.weight = [weight - eta * change for weight, change in zip(self.weight, Cprime_ws)]

    def BackProd(self, batchX, batchY):
        """Back-propagate a mini-batch and return batch-averaged gradients.

        Uses whatever parameters are currently stored, i.e. the thinned
        sub-network when called between ``DropOut`` and the merge step.

        :param batchX: N x size[0] input matrix
        :param batchY: N x size[-1] target matrix
        :return: ``(Cprime_bs, Cprime_ws)``; two lists ordered from the first
            non-input layer to the output layer, holding the gradient of the
            cost with respect to each bias vector and weight matrix
        """
        n = len(batchY)
        zs_n = []                  # weighted inputs of layers 2..L
        activations_n = [batchX]   # activations of layers 1..L
        for b, w in zip(self.bias, self.weight):
            z_n = activations_n[-1].dot(w.T) + b
            zs_n.append(z_n)
            activations_n.append(Sigmod(z_n))
        # Output-layer error, one row per sample.
        delta = self.lossFunc.delta(activations_n[-1], batchY)
        # delta.T.dot(a_prev) equals the sum over the samples of the outer
        # products delta_i * a_prev_i^T, so no per-sample loop is needed.
        Cprime_bs = [np.sum(delta, axis=0) / n]
        Cprime_ws = [delta.T.dot(activations_n[-2]) / n]
        for i in range(1, self.Layers - 1):
            # Error of the current layer from the next layer's weights and
            # error, times the sigmoid derivative at the current layer.
            delta = delta.dot(self.weight[-i]) * SigmodPrime(zs_n[-i - 1])
            Cprime_bs.append(np.sum(delta, axis=0) / n)
            Cprime_ws.append(delta.T.dot(activations_n[-i - 2]) / n)
        # Gradients were collected from the output layer backwards.
        Cprime_bs.reverse()
        Cprime_ws.reverse()
        return (Cprime_bs, Cprime_ws)

    def Evalueate(self, test_data, p, IStest=True):
        """Count the samples whose predicted class equals the target class.

        :param test_data: ``[inputs, targets]`` with one sample per row
        :param p: dropout fraction, forwarded to ``Feedward``
        :param IStest: forwarded to ``Feedward`` as ``ISpredtest``
        :return: number of correct predictions as an int
        """
        testX = test_data[0]
        testY = test_data[1]
        # A prediction is correct when the largest output sits at the same
        # position as the largest target entry.
        predicted = np.argmax(self.Feedward(testX, p, IStest), axis=1)
        expected = np.argmax(testY, axis=1)
        return int(np.sum(predicted == expected))
if __name__ == '__main__':
    # Load MNIST (labels requested with one_hot=True) from the local data folder.
    mnist = input_data.read_data_sets(r'D:\PycharmProjects\HandWritingRecognition\TF\data', one_hot=True)
    # Train on the first 2000 training samples and test on the first 1000.
    training_data = [mnist.train.images[:2000], mnist.train.labels[:2000]]
    test_data = [mnist.test.images[:1000], mnist.test.labels[:1000]]
    net = NetWorks([784, 100, 10], CrossEntropyLossFunc)
    # First three test samples, handy for quick manual calls to
    # net.Feedward / net.BackProd while debugging.
    X, Y = test_data[0][:3], test_data[1][:3]
    # Time the whole training run, including the per-epoch evaluation.
    start = dt.datetime.now()
    net.SGD(training_data, epochs=100, minibatch_size=10, eta=3, p=0.5,
            test_data=test_data, isplot=True)
    elapsed = dt.datetime.now() - start
    print(elapsed)
結果分析：雖然train和test的accuracy幾乎都是同時飽和，但是cost卻還在下降，無法解釋。（難道pred train data的時候權重也需要乘以1-p？？？！！！有待考證）
dropout.png