Before getting into the actual TensorFlow parallel-computation code, let's first look at how TensorFlow's system architecture is designed to support parallel execution. (See the referenced blog post.)
1. Tensorflow系統(tǒng)概述
TensorFlow's system architecture uses the C API as the dividing line, splitting the whole system into a frontend and a backend subsystem (see the figure below).
- Frontend: provides the programming model and is responsible for constructing the computation graph.
- Backend: provides the runtime and is responsible for executing the computation graph.
The frontend offers libraries in several languages (mainly Python); these libraries trigger the TensorFlow backend through the C API.
In the backend's Distributed Runtime, the Distributed Master takes the arguments of Session.run(), traverses the computation graph backwards to find the minimal subgraph that the requested outputs depend on, splits that subgraph into multiple pieces, dispatches the pieces to the Worker Services, and starts their execution.
Kernels are the concrete operation implementations, such as convolution. At the very bottom of the backend sit the network layer and the device layer; the network layer includes RPC and RDMA and is responsible for transferring the network's parameters.
??client馋辈、master 和 worker各組件的內(nèi)部工作原理
The Client builds the computation graph through TensorFlow's programming interfaces (mainly Python and C++). TensorFlow does no real work until a Session is created; the Session establishes the channel between the Client and the backend runtime and sends the Graph to the Distributed Master. Below, the Client builds a simple computation Graph:
執(zhí)行Session.run運(yùn)算時(shí)答毫,
Master
將最小子圖分片派發(fā)給Work Service
。如下圖所示季春,PS
上放置模型參數(shù)洗搂,worker
上則執(zhí)行op。邊
被任務(wù)點(diǎn)分割载弄,Distributed Master
會(huì)將該邊分裂耘拇,并在兩個(gè)分布式任務(wù)之間插入send
和recv
節(jié)點(diǎn),實(shí)現(xiàn)數(shù)據(jù)傳遞侦锯。2. Tensorflow multi GPU
Tensorflow官網(wǎng)給出了單GPU運(yùn)行驼鞭,多GPU運(yùn)行的簡(jiǎn)單例子。這里需要注意的是:如果沒(méi)有指定運(yùn)行設(shè)備尺碰,會(huì)優(yōu)先選用GPU(如果有GPU的話)挣棕。
對(duì)于深度學(xué)習(xí)來(lái)說(shuō),Tensorflow的并行主要包括數(shù)據(jù)并行
和模型并行
參見(jiàn)博客
2.1 數(shù)據(jù)并行
每個(gè)GPU上的模型相同亲桥,喂以相同模型不同的訓(xùn)練樣本洛心。
數(shù)據(jù)并行根據(jù)參數(shù)更新方式的不同又可以分為同步數(shù)據(jù)并行
和異步數(shù)據(jù)并行
。
同步數(shù)據(jù)并行
:每個(gè)GPU根據(jù)loss計(jì)算各自的gradient题篷,匯總所有
GPU的gradient词身,求平均梯度,根據(jù)平均梯度更新模型參數(shù)番枚,具體過(guò)程見(jiàn)下圖法严。所以同步數(shù)據(jù)并行的速度取決于最慢的GPU损敷,當(dāng)各個(gè)GPU的性能相差不大時(shí)適用。
異步數(shù)據(jù)并行
:和同步并行的區(qū)別是深啤,不用等所有GPU的梯度拗馒,每個(gè)GPU均可更新參數(shù)。每個(gè)GPU每次取到的參數(shù)也是最新的溯街。據(jù)說(shuō)缺點(diǎn)是:參數(shù)容易移出最優(yōu)解诱桂。
數(shù)據(jù)并行,速度取決于最慢的GPU和中心服務(wù)器(分發(fā)數(shù)據(jù)呈昔,計(jì)算平均梯度的cpu/gpu)的快慢挥等。
2.2 Model Parallelism
同一批訓(xùn)練樣本,將不同的模型計(jì)算部分分布在不同的計(jì)算設(shè)備上同時(shí)執(zhí)行模型并行堤尾,比如輸入層到隱層的計(jì)算放到gpu0上肝劲,隱層到輸出層的計(jì)算放到gpu1上。初始啟動(dòng)時(shí)gpu1是不工作的哀峻,要等gpu0輸出后才能運(yùn)行涡相。能保證對(duì)同一批數(shù)據(jù)的同步嗎?疑惑點(diǎn)??
多機(jī)多卡剩蟀,即client,master切威,worker不在同一臺(tái)機(jī)器上時(shí)稱之為分布式
3. 并行計(jì)算代碼實(shí)現(xiàn)
代碼中使用到的數(shù)據(jù)是自己寫的數(shù)據(jù)育特,參見(jiàn)數(shù)據(jù)讀取
3.1 同步數(shù)據(jù)并行
#!/usr/bin/env python
# _*_coding:utf-8 _*_
import tensorflow as tf
from tensorflow.python.client import device_lib
import os
import time

# Control which TensorFlow log messages are printed:
# 0 = all messages are logged (default behavior)
# 1 = INFO messages are not printed
# 2 = INFO and WARNING messages are not printed
# 3 = INFO, WARNING, and ERROR messages are not printed
# On Linux this can also be set before running the script: $ export TF_CPP_MIN_LOG_LEVEL=2
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

################# List all GPUs available on this machine ##################
def check_available_gpus():
    local_devices = device_lib.list_local_devices()
    gpu_names = [x.name for x in local_devices if x.device_type == 'GPU']
    gpu_num = len(gpu_names)
    print('{0} GPUs are detected : {1}'.format(gpu_num, gpu_names))
    return gpu_num  # number of GPUs

# Select which physical GPUs to use. After this setting, physical GPU 12
# becomes GPU 0 from this program's point of view.
os.environ['CUDA_VISIBLE_DEVICES'] = '12, 13, 14, 15'

N_GPU = 4  # number of GPUs

# Network hyper-parameters
BATCH_SIZE = 100*N_GPU
LEARNING_RATE = 0.001
EPOCHS_NUM = 1000
NUM_THREADS = 10

# Paths for reading data and saving the model
MODEL_SAVE_PATH = 'data/tmp/logs_and_models/'
MODEL_NAME = 'model.ckpt'
DATA_PATH = 'data/test_data.tfrecord'

# Parse function for the Dataset
def _parse_function(example_proto):
    dics = {
        'sample': tf.FixedLenFeature([5], tf.int64),
        'label': tf.FixedLenFeature([], tf.int64)}
    parsed_example = tf.parse_single_example(example_proto, dics)
    parsed_example['sample'] = tf.cast(parsed_example['sample'], tf.float32)
    parsed_example['label'] = tf.cast(parsed_example['label'], tf.float32)
    return parsed_example

# Read the data and split each batch evenly across the GPUs
def _get_data(tfrecord_path=DATA_PATH, num_threads=NUM_THREADS, num_epochs=EPOCHS_NUM, batch_size=BATCH_SIZE, num_gpu=N_GPU):
    dataset = tf.data.TFRecordDataset(tfrecord_path)
    new_dataset = dataset.map(_parse_function, num_parallel_calls=num_threads)  # parse with multiple threads
    # Note: in my runs, shuffle must come before repeat to work correctly;
    # otherwise an "Out of Range" error is raised.
    shuffle_dataset = new_dataset.shuffle(buffer_size=10000)   # shuffle the samples
    repeat_dataset = shuffle_dataset.repeat(num_epochs)        # how many epochs over the full dataset
    batch_dataset = repeat_dataset.batch(batch_size=batch_size)
    iterator = batch_dataset.make_one_shot_iterator()          # create the iterator
    next_element = iterator.get_next()
    x_split = tf.split(next_element['sample'], num_gpu)
    y_split = tf.split(next_element['label'], num_gpu)
    return x_split, y_split

# Because I did not yet understand variable scopes and the model has very few
# parameters, the parameters are created outside and initialized only once before running.
# With many parameters, defining hundreds of variables this way would be unbearable;
# shared variables in TF, introduced later, solve this problem.
def _init_parameters():
    w1 = tf.get_variable('w1', shape=[5, 10], initializer=tf.random_normal_initializer(mean=0, stddev=1, seed=9))
    b1 = tf.get_variable('b1', shape=[10], initializer=tf.random_normal_initializer(mean=0, stddev=1, seed=1))
    w2 = tf.get_variable('w2', shape=[10, 1], initializer=tf.random_normal_initializer(mean=0, stddev=1, seed=0))
    b2 = tf.get_variable('b2', shape=[1], initializer=tf.random_normal_initializer(mean=0, stddev=1, seed=2))
    return w1, w2, b1, b2

# Compute the average gradients; the average is taken over the per-GPU sample shards
def average_gradients(tower_grads):
    avg_grads = []
    # grad_and_vars groups one variable's gradients across all GPUs,
    # e.g. the four GPUs' gradients for w1
    for grad_and_vars in zip(*tower_grads):
        grads = []
        for g, _ in grad_and_vars:  # loop over the GPUs
            expanded_g = tf.expand_dims(g, 0)  # add a GPU dimension, e.g. w1 grad shape (5, 10) -> (1, 5, 10)
            grads.append(expanded_g)
        grad = tf.concat(grads, 0)      # concatenate along the first dimension
        grad = tf.reduce_mean(grad, 0)  # average over the GPUs
        v = grad_and_vars[0][1]         # v is the variable
        grad_and_var = (grad, v)        # pair the averaged gradient with its variable
        # collect the averaged gradients of all variables
        avg_grads.append(grad_and_var)
    # return average gradients
    return avg_grads

# Initialize the variables
w1, w2, b1, b2 = _init_parameters()
# Get the training samples
x_split, y_split = _get_data()
# Create the optimizer
opt = tf.train.GradientDescentOptimizer(LEARNING_RATE)
tower_grads = []
# Assign the forward pass for a different slice of the batch to each GPU
for i in range(N_GPU):
    with tf.device("/gpu:%d" % i):
        y_hidden = tf.nn.relu(tf.matmul(x_split[i], w1) + b1)
        y_out = tf.matmul(y_hidden, w2) + b2
        y_out = tf.reshape(y_out, [-1])
        cur_loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=y_out, labels=y_split[i], name=None)
        grads = opt.compute_gradients(cur_loss)
        tower_grads.append(grads)

###### This session exists only to read the actual parameter values and verify that
##### no GPU has updated the parameters yet. As the code shows, each GPU only
##### computes gradients here; it does not apply them.
with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)) as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    sess.run(tf.global_variables_initializer())
    sess.run(tf.local_variables_initializer())
    sess.run(tower_grads)
    print('=============== parameter test sy =========')
    print(i)
    print(sess.run(b1))
    coord.request_stop()
    coord.join(threads)

# Average the gradients over the GPUs
grads = average_gradients(tower_grads)
# Update the model parameters with the averaged gradients
apply_gradient_op = opt.apply_gradients(grads)

# allow_soft_placement: if the requested device (e.g. a GPU) does not exist, fall back to an available device.
# log_device_placement: log which device each operation is placed on.
with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True)) as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    sess.run(tf.global_variables_initializer())
    sess.run(tf.local_variables_initializer())
    for step in range(1000):
        start_time = time.time()
        sess.run(apply_gradient_op)
        duration = time.time() - start_time
        if step != 0 and step % 100 == 0:
            num_examples_per_step = BATCH_SIZE * N_GPU
            examples_per_sec = num_examples_per_step / duration
            sec_per_batch = duration / N_GPU
            print('step:', step, grads, examples_per_sec, sec_per_batch)
            print('=======================parameter b1============ :')
            print(sess.run(b1))
    coord.request_stop()
    coord.join(threads)
- 計(jì)算所有g(shù)pu的平均梯度袖外,再更新參數(shù)史隆。
- 計(jì)算所有g(shù)pu平均損失函數(shù),用梯度更新參數(shù)
- 各個(gè)gpu得到新參數(shù)曼验,再平均更新參數(shù)泌射,這樣有沒(méi)有影響,參數(shù)暫時(shí)怎么存放等鬓照。
1和2的結(jié)果理論上應(yīng)該是相同的
3.2 異步數(shù)據(jù)并行
'''
Here the training model is defined first; optimizer.minimize() updates the
model parameters directly on each GPU.
'''
def _model_nn(w1, w2, b1, b2, x_split, y_split, i_gpu):
    y_hidden = tf.nn.relu(tf.matmul(x_split[i_gpu], w1) + b1)
    y_out = tf.matmul(y_hidden, w2) + b2
    y_out = tf.reshape(y_out, [-1])
    loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=y_out, labels=y_split[i_gpu], name=None)
    opt = tf.train.GradientDescentOptimizer(LEARNING_RATE)
    train = opt.minimize(loss)
    return train

w1, w2, b1, b2 = _init_parameters()
x_split, y_split = _get_data()
for i in range(N_GPU):
    with tf.device("/gpu:%d" % i):
        train = _model_nn(w1, w2, b1, b2, x_split, y_split, i)

##### As before, this session only checks whether the variables have already
##### been updated on each GPU.
with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)) as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    sess.run(tf.global_variables_initializer())
    sess.run(tf.local_variables_initializer())
    sess.run(train)
    print('=============== parameter test Asy =========')
    print(i)
    print(sess.run(b1))
    coord.request_stop()
    coord.join(threads)

with tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)) as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    sess.run(tf.global_variables_initializer())
    sess.run(tf.local_variables_initializer())
    for step in range(2):
        sess.run(train)
        print('======================= parameter b1 ================ :')
        print('step:', step, sess.run(b1))
    coord.request_stop()
    coord.join(threads)
4. Shared Variables
之前提到了我們?cè)诙x多層變量時(shí),一個(gè)一個(gè)定義權(quán)重和偏置臭猜,對(duì)于大型網(wǎng)絡(luò)是不太現(xiàn)實(shí)和讓人崩潰的躺酒。所有就有了tf.variable_scope 和 tf.name_scope()。
- tf.name_scope() is mainly used together with tf.Variable to make parameter naming easier to manage.
- tf.variable_scope is used together with tf.get_variable to implement variable sharing.
A tf.name_scope namespace makes variables easier to manage: variables created with tf.Variable in different name scopes may use the same name. Think of it as the same given name under a different family name (the namespace); they still refer to different variables. Variables defined with tf.get_variable(), on the other hand, are not affected by tf.name_scope (they are mainly used for variable sharing, to avoid hand-defining a huge number of parameters in a large network). A small sketch of this naming behaviour is shown below.
Here we mainly look at how tf.variable_scope() is used.
tf.variable_scope(
    name_or_scope,  # name
    default_name=None,
    values=None,
    initializer=None,
    regularizer=None,
    caching_device=None,
    partitioner=None,
    custom_getter=None,
    reuse=None,  # True, None, or tf.AUTO_REUSE;
                 # if True, we go into reuse mode for this scope as well as all sub-scopes;
                 # if tf.AUTO_REUSE, we create variables if they do not exist, and return them otherwise;
                 # if None, we inherit the parent scope's reuse flag. When eager execution is enabled,
                 # new variables are always created unless an EagerVariableStore or template is currently active.
    dtype=None,
    use_resource=None,
    constraint=None,
    auxiliary_name_scope=True
)
tf.variable_scope()可以理解為從某個(gè)name的籃子里取東西。在這個(gè)籃子里,只要名字相同紊馏,下次可以反復(fù)的用這個(gè)變量料饥。
tf.variable_scope()可以節(jié)省內(nèi)存,官網(wǎng)的例子:
import tensorflow as tf

def my_image_filter():
    conv1_weights = tf.Variable(tf.random_normal([5, 5, 32, 32]),
                                name="conv1_weights")
    conv1_biases = tf.Variable(tf.zeros([32]), name="conv1_biases")
    conv2_weights = tf.Variable(tf.random_normal([5, 5, 32, 32]),
                                name="conv2_weights")
    conv2_biases = tf.Variable(tf.zeros([32]), name="conv2_biases")
    return

# First call creates one set of 4 variables.
result1 = my_image_filter()
# Another set of 4 variables is created in the second call.
result2 = my_image_filter()

# List all trainable variables
vs = tf.trainable_variables()
print('There are %d train_able_variables in the Graph: ' % len(vs))
for v in vs:
    print(v)
這是官網(wǎng)上的例子朱监,從輸出可以看出調(diào)用my_image_fileter()兩次岸啡,會(huì)有8個(gè)變量
There are 8 train_able_variables in the Graph:
<tf.Variable 'conv1_weights:0' shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable 'conv1_biases:0' shape=(32,) dtype=float32_ref>
<tf.Variable 'conv2_weights:0' shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable 'conv2_biases:0' shape=(32,) dtype=float32_ref>
<tf.Variable 'conv1_weights_1:0' shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable 'conv1_biases_1:0' shape=(32,) dtype=float32_ref>
<tf.Variable 'conv2_weights_1:0' shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable 'conv2_biases_1:0' shape=(32,) dtype=float32_ref>
如果用tf.variable_scope()共享變量會(huì)怎么樣呢?
import tensorflow as tf

# A generic way to define a convolution layer
def conv_relu(kernel_shape, bias_shape):
    # Create variable named "weights".
    weights = tf.get_variable("weights", kernel_shape, initializer=tf.random_normal_initializer())
    # Create variable named "biases".
    biases = tf.get_variable("biases", bias_shape, initializer=tf.constant_initializer(0.0))
    return

def my_image_filter():
    # Defining the layers this way is very readable and clearly layered
    with tf.variable_scope("conv1"):
        # Variables created here will be named "conv1/weights", "conv1/biases".
        relu1 = conv_relu([5, 5, 32, 32], [32])
    with tf.variable_scope("conv2"):
        # Variables created here will be named "conv2/weights", "conv2/biases".
        return conv_relu([5, 5, 32, 32], [32])

with tf.variable_scope("image_filters") as scope:
    # We call my_image_filter twice, but thanks to variable sharing
    # only one copy of the network's variables is created.
    result1 = my_image_filter()
    scope.reuse_variables()
    result2 = my_image_filter()

# List all trainable variables
vs = tf.trainable_variables()
print('There are %d train_able_variables in the Graph: ' % len(vs))
for v in vs:
    print(v)
The output is:
There are 4 train_able_variables in the Graph:
<tf.Variable 'image_filters/conv1/weights:0' shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable 'image_filters/conv1/biases:0' shape=(32,) dtype=float32_ref>
<tf.Variable 'image_filters/conv2/weights:0' shape=(5, 5, 32, 32) dtype=float32_ref>
<tf.Variable 'image_filters/conv2/biases:0' shape=(32,) dtype=float32_ref>
簡(jiǎn)言之悦荒,二者對(duì)于神經(jīng)網(wǎng)絡(luò)的作用就是讓我們寫出來(lái)的神經(jīng)網(wǎng)絡(luò)結(jié)構(gòu)(節(jié)點(diǎn)和節(jié)點(diǎn)之間的連接)更加的清楚明了。下面我們用命名空間整理一下之前簡(jiǎn)單二分類的網(wǎng)絡(luò)結(jié)構(gòu):
#!/usr/bin/env python
# _*_coding:utf-8 _*_
import os
import tensorflow as tf

# set environment
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# set the visible_devices
os.environ['CUDA_VISIBLE_DEVICES'] = '12, 13, 14, 15'
# GPU list
N_GPU = 4  # GPU number
# define parameters of neural network
BATCH_SIZE = 100*N_GPU
LEARNING_RATE = 0.001
EPOCHS_NUM = 1000
NUM_THREADS = 10
# define the path of log message and model
DATA_DIR = 'data/tmp/'
LOG_DIR = 'data/tmp/log'
DATA_PATH = 'data/test_data.tfrecord'

# get train data
def _parse_function(example_proto):
    dics = {
        'sample': tf.FixedLenFeature([5], tf.int64),
        'label': tf.FixedLenFeature([], tf.int64)}
    parsed_example = tf.parse_single_example(example_proto, dics)
    parsed_example['sample'] = tf.cast(parsed_example['sample'], tf.float32)
    parsed_example['label'] = tf.cast(parsed_example['label'], tf.float32)
    return parsed_example

def _get_data(tfrecord_path=DATA_PATH, num_threads=NUM_THREADS, num_epochs=EPOCHS_NUM, batch_size=BATCH_SIZE, num_gpu=N_GPU):
    with tf.variable_scope('input_data'):
        dataset = tf.data.TFRecordDataset(tfrecord_path)
        new_dataset = dataset.map(_parse_function, num_parallel_calls=num_threads)
        shuffle_dataset = new_dataset.shuffle(buffer_size=10000)
        repeat_dataset = shuffle_dataset.repeat(num_epochs)
        batch_dataset = repeat_dataset.batch(batch_size=batch_size)
        iterator = batch_dataset.make_one_shot_iterator()
        next_element = iterator.get_next()
        x_split = tf.split(next_element['sample'], num_gpu)
        y_split = tf.split(next_element['label'], num_gpu)
    return x_split, y_split

def weight_bias_variable(weight_shape, bias_shape):
    weight = tf.get_variable('weight', weight_shape, initializer=tf.random_normal_initializer(mean=0, stddev=1))
    bias = tf.get_variable('bias', bias_shape, initializer=tf.random_normal_initializer(mean=0, stddev=1))
    return weight, bias

# Hidden-layer definition; layer_name lets us create different hidden layers,
# although this program only uses a single one.
def hidden_layer(x_data, input_dim, output_dim, layer_name):
    with tf.variable_scope(layer_name, reuse=tf.AUTO_REUSE):
        weight, bias = weight_bias_variable([input_dim, output_dim], [output_dim])
        # calculation output
        y_hidden = tf.nn.relu(tf.matmul(x_data, weight) + bias)
        tf.summary.histogram('weight', weight)
        tf.summary.histogram('bias', bias)
        tf.summary.histogram('y_hidden', y_hidden)
    return y_hidden

# The output layer applies no activation here; the activation is folded into
# the loss function, so the output layer is created separately.
def output_grads(y_hidden, y_label, input_dim, output_dim):
    with tf.variable_scope('out_layer', reuse=tf.AUTO_REUSE):
        weight, bias = weight_bias_variable([input_dim, output_dim], [output_dim])
        tf.summary.histogram('bias', bias)
        y_out = tf.matmul(y_hidden, weight) + bias
        y_out = tf.reshape(y_out, [-1])
        loss = tf.nn.sigmoid_cross_entropy_with_logits(logits=y_out, labels=y_label)
        loss_mean = tf.reduce_mean(loss, 0)
        tf.summary.scalar('loss', loss_mean)
        grads = opt.compute_gradients(loss_mean)
    return loss_mean, grads

# calculate gradient
def average_gradients(tower_grads):
    avg_grads = []
    # list all the gradient obtained from different GPU
    # grad_and_vars represents gradient of w1, b1, w2, b2 of different gpu respectively
    for grad_and_vars in zip(*tower_grads):  # w1, b1, w2, b2
        # calculate average gradients
        # print('grad_and_vars: ', grad_and_vars)
        grads = []
        for g, _ in grad_and_vars:  # different gpu
            expanded_g = tf.expand_dims(g, 0)  # expand one dimension (5, 10) to (1, 5, 10)
            grads.append(expanded_g)
        grad = tf.concat(grads, 0)      # for 4 gpu, 4 (1, 5, 10) will be (4, 5, 10), concat the first dimension
        grad = tf.reduce_mean(grad, 0)  # calculate average by the first dimension
        # print('grad: ', grad)
        v = grad_and_vars[0][1]  # get w1 and then b1, and then w2, then b2, why?
        # print('v', v)
        grad_and_var = (grad, v)
        # print('grad_and_var: ', grad_and_var)
        # corresponding variables and gradients
        avg_grads.append(grad_and_var)
    return avg_grads

# get samples and labels
with tf.name_scope('input_data'):
    x_split, y_split = _get_data()
# set optimizer
opt = tf.train.GradientDescentOptimizer(LEARNING_RATE)
tower_grads = []
for i in range(N_GPU):
    with tf.device("/gpu:%d" % i):
        with tf.name_scope('GPU_%d' % i) as scope:
            y_hidden = hidden_layer(x_split[i], input_dim=5, output_dim=10, layer_name='hidden1')
            loss_mean, grads = output_grads(y_hidden, y_label=y_split[i], input_dim=10, output_dim=1)
            tower_grads.append(grads)
with tf.name_scope('update_parameters'):
    # get average gradient
    grads = average_gradients(tower_grads)
    for i in range(len(grads)):
        tf.summary.histogram('gradients/' + grads[i][1].name, grads[i][0])
    # update parameters
    apply_gradient_op = opt.apply_gradients(grads)

init = tf.global_variables_initializer()
config = tf.ConfigProto()
# allow_growth=True would start with a small amount of GPU memory and grow it on
# demand; that memory is never released, which can cause fragmentation. Disabled here.
config.gpu_options.allow_growth = False
config.allow_soft_placement = True   # fall back to an available device if the requested one does not exist
config.log_device_placement = False

with tf.Session(config=config) as sess:
    sess.run(init)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    merged = tf.summary.merge_all()
    writer = tf.summary.FileWriter('data/tfboard', sess.graph)
    sess.run(tf.global_variables_initializer())
    sess.run(tf.local_variables_initializer())
    for step in range(1000):
        sess.run(apply_gradient_op)
        summary = sess.run(merged)
        writer.add_summary(summary, step)
    writer.close()
    coord.request_stop()
    coord.join(threads)
根據(jù)保存路徑,在終端輸入:
$ tensorboard --logdir=路徑(例如:/Users/username/PycharmProjects/firsttensorflow/multigpu/data/tfboard)
在瀏覽器中輸入http://localhost:6006炕柔,出現(xiàn)tensorboard的界面,默認(rèn)界面是scalar(標(biāo)量):
切換到graph界面如下圖所示: