姓名:吳兆陽 ?學號:14020199009
轉(zhuǎn)自機器機器之心
嵌牛導讀:通過多 GPU 并行的方式可以有很好的加速效果系吭,然而一臺機器上所支持的 GPU 是有限的五嫂,因此本文介紹了分布式 TensorFlow。分布式 TensorFlow 允許我們在多臺機器上運行一個模型村斟,所以訓練速度或加速效果能顯著地提升贫导。本文簡要概述了分布式 TensorFlow 的原理與實踐抛猫,希望能為準備入坑分布式訓練的讀者提供一些介紹。不幸的是孩灯,關于分布式 TensorFlow 的官方文檔過于簡略闺金。我們需要一個稍微易懂的介紹,即通過 Jupyter 運行一些基本例子峰档。
嵌牛鼻子:分布式TensorFlow
嵌牛提問:什么是分布式TensorFlow败匹,如何入坑?
嵌牛正文:
簡介
importtensorflowastf
比方說讥巡,我們希望多個進程共享一些共同的參數(shù)掀亩。為了簡單起見,假設這只是一個單一的變量:
var = tf.Variable(initial_value=0.0)
第一步欢顷,我們需要為每個進程創(chuàng)建自己的會話槽棍。(假設 sess1 在一個進程中創(chuàng)建,而 sess2 會在另一個進程中創(chuàng)建)抬驴。
sess1 = tf.Session()
sess2 = tf.Session()
sess1.run(tf.global_variables_initializer())
sess2.run(tf.global_variables_initializer())
每次調(diào)用 tf.Session() 都會創(chuàng)建一個單獨的「執(zhí)行引擎」炼七,然后將會話句柄連接到執(zhí)行引擎。執(zhí)行引擎是實際存儲變量值并運行操作的東西布持。且 Python 天生是面向?qū)ο蟮木幊蹋锩娴脑囟际穷惢驅(qū)ο筇馀虼烁降卣f按傅,tf.Seesio() 是 TensorFlow 中的一個方法,它會打開一個會話并運行計算圖。
通常痊乾,不同進程中的執(zhí)行引擎是不相關的。在一個會話中更改變量(在一個執(zhí)行引擎上)不會影響其他會話中的變量。
print("Initial value of var in session 1:", sess1.run(var))
print("Initial value of var in session 2:", sess2.run(var))
sess1.run(var.assign_add(1.0))
print("Incremented var in session 1")
print("Value of var in session 1:", sess1.run(var))
print("Value of var in session 2:", sess2.run(var))
上面代碼塊的輸出結(jié)果為:
Initialvalue of varinsession1:0.0
Initialvalue of varinsession2:0.0
Incrementedvarinsession1
Valueof varinsession1:1.0
Valueof varinsession2:0.0
對于分布式 TensorFlow,我們首先需要了解它的基本原理。以下代碼展示了如何構(gòu)建一個最簡單 TensorFlow 集群待德,以幫助我們理解它的基本原理。
importtensorflowastf
c=tf.constant("Hello, Distributed TensorFlow!")
# 創(chuàng)建一個本地TensorFlow集群
server=tf.train.Server.create_local_server()
# 在集群上創(chuàng)建一個會話
sess=tf.Session(server.target)
print(sess.run(c))
在以上代碼中,我們先通過 tf.train.Server.create_local_server 在本地創(chuàng)建一個只有一臺機器的 TensorFlow 集群秉剑。然后在集群上生成一個會話侦鹏,通過該對話,我們可以將創(chuàng)建的計算圖運行在 TensorFlow 集群上岁疼。雖然這只是一個單機集群郭毕,但它基本上反映了 TensorFlow 集群的工作流程扳肛。
TensorFlow 集群會通過一系列任務(task)來執(zhí)行計算圖中的運算,一般來說不同的任務會在不同的機器上運行尖飞。TensorFlow 集群中的任務也會被聚集為工作(job)。例如在訓練深度模型時亥揖,一臺運行反向傳播的機器是一個任務,而所有運行反向傳播的集合是一個工作。上面簡單的案例只是一個任務的集群,若一個 TensorFlow 集群有多個任務時,我們需要使用 tf.train.ClusterSpec 來指定每一個任務的機器璧函。
使用分布式 TensorFlow 訓練深度學習模型一般有兩種方式,即 in-graph replication 和 between-graph replication宪萄。第一種計算圖內(nèi)的分布式會令所有任務都使用一個 TensorFlow 計算圖中的變量艺谆,而只是將計算部分分配到不同的服務器上。而另一種計算圖間的分布式會在每一個計算服務器上創(chuàng)建一個獨立的 TensorFlow 計算圖拜英,但不同計算圖中的相同參數(shù)需要以一種固定的方式存放到同一個參數(shù)服務器中静汤。以上大概就是分布式 TensorFlow 的基本概念,隨后我們將通過具體的案例與代碼加深這一部分的理解居凶。
分布式 TensorFlow
為了在進程之間共享變量虫给,我們需要將不同的執(zhí)行引擎連接在一起,并輸入分布式張量流侠碧。
若使用分布式 TensorFlow抹估,每個進程會運行一個特殊的執(zhí)行引擎:一個 TensorFlow 服務器。服務器作為集群的一部分鏈接在一起舆床。(群集中的每個服務器也稱為任務棋蚌。)
第一步是定義集群的規(guī)模。我們從最簡單的集群開始:即兩臺服務器(兩個任務)挨队,它們都在同一臺機器上谷暮,一個在 2222 端口,一個在 2223 端口盛垦。
tasks = ["localhost:2222","localhost:2223"]
每個任務都與「工作」(job)相關聯(lián)湿弦,該工作是相關任務的集合。我們將這兩個任務與一個稱為「local」的工作相關聯(lián)腾夯。
jobs = {"local": tasks}
所有這些即定義為一個集群颊埃。
cluster = tf.train.ClusterSpec(jobs)
我們現(xiàn)在可以啟動服務器,指定每個服務器對應為集群定義中的哪個服務器蝶俱。立即啟動各服務器班利,監(jiān)聽集群設置中指定的端口。
# "This server corresponds to the the first task (task_index=0)
# of the tasks associated with the 'local' job."
server1 = tf.train.Server(cluster, job_name="local", task_index=0)
server2 = tf.train.Server(cluster, job_name="local", task_index=1)
將服務器連接在同一個集群中榨呆,我們現(xiàn)在可以體驗到分布式 TensorFlow 的強大功能:任何具有相同名稱的變量都將在所有服務器之間共享罗标。
最簡單的例子是在所有的服務器上運行同一張靜態(tài)計算圖,且每個圖只有一個變量:
tf.reset_default_graph()
var = tf.Variable(initial_value=0.0, name='var')
sess1 = tf.Session(server1.target)
sess2 = tf.Session(server2.target)
現(xiàn)在积蜻,在一臺服務器上對變量所作的修改將在第二臺服務器上作鏡像處理闯割。
sess1.run(tf.global_variables_initializer())
sess2.run(tf.global_variables_initializer())
print("Initial value of var in session 1:", sess1.run(var))
print("Initial value of var in session 2:", sess2.run(var))
sess1.run(var.assign_add(1.0))
print("Incremented var in session 1")
print("Value of var in session 1:", sess1.run(var))
print("Value of var in session 2:", sess2.run(var))
Initialvalue of varinsession1:0.0
Initialvalue of varinsession2:0.0
Incrementedvarinsession1
Valueof varinsession1:1.0
Valueof varinsession2:1.0
請注意,因為我們只有一個變量且該變量由兩個會話共享竿拆,第二個會話再調(diào)用 global_variables_initializer 就有些多余宙拉。
存放
現(xiàn)在我們可能會想:變量究竟存儲在哪個服務器上?又是哪個服務器在運行操作丙笋?
按經(jīng)驗來說谢澈,變量和操作都默認存儲在集群的第一個任務上煌贴。
defrun_with_location_trace(sess, op):
# From https://stackoverflow.com/a/41525764/7832197
run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()
sess.run(op, options=run_options, run_metadata=run_metadata)
fordeviceinrun_metadata.step_stats.dev_stats:
print(device.device)
fornodeindevice.node_stats:
print(" ?", node.node_name)
例如,如果我們使用連接到第一個任務的會話來處理變量 var澳化,那么所有操作都會運行在這個任務上:
run_with_location_trace(sess1, var)
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
var
run_with_location_trace(sess1, var.assign_add(1.0))
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
AssignAdd_1/value
var
AssignAdd_1
但是崔步,如果我們嘗試使用連接到第二個任務的會話處理變量 var,那么圖節(jié)點仍然會在第一個任務上運行缎谷。
run_with_location_trace(sess2, var)
/job:local/replica:0/task:1/device:CPU:0
_SOURCE
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
var
要將一個變量或操作固定到特定任務上井濒,我們可以使用 tf.device:
withtf.device("/job:local/task:0"):
var1 = tf.Variable(0.0, name='var1')
withtf.device("/job:local/task:1"):
var2 = tf.Variable(0.0, name='var2')
# (This will initialize both variables)
sess1.run(tf.global_variables_initializer())
現(xiàn)在,var1 像之前一樣運行在第一個任務上列林。
run_with_location_trace(sess1, var1)
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
var1
但是 var2 運行在第二個任務上瑞你。即使我們嘗試使用連接到第一個任務的會話來評估它,它仍然在第二個任務上運行希痴。
run_with_location_trace(sess1, var2)
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
/job:local/replica:0/task:1/device:CPU:0
_SOURCE
var2
變量 2 亦是如此者甲。
run_with_location_trace(sess2, var2)
/job:local/replica:0/task:1/device:CPU:0
_SOURCE
var2
run_with_location_trace(sess2, var1)
/job:local/replica:0/task:1/device:CPU:0
_SOURCE
/job:local/replica:0/task:0/device:CPU:0
_SOURCE
var1
計算圖
分布式 TensorFlow 處理圖的過程有幾點需要注意。
誰構(gòu)建了這個圖砌创?
首先虏缸,盡管在整個集群中共享變量值,但圖并不會自動共享嫩实。
我們用兩臺服務器創(chuàng)建一個新的集群刽辙,然后用顯式創(chuàng)建的圖設置第一臺服務器。
cluster = tf.train.ClusterSpec({"local": ["localhost:2224","localhost:2225"]})
server1 = tf.train.Server(cluster, job_name="local", task_index=0)
server2 = tf.train.Server(cluster, job_name="local", task_index=1)
graph1 = tf.Graph()
withgraph1.as_default():
var1 = tf.Variable(0.0, name='var')
sess1 = tf.Session(target=server1.target, graph=graph1)
print(graph1.get_operations())
[, , , ]
如果我們創(chuàng)建連接到第二臺服務器的會話甲献,請注意圖不會自動獲取鏡像宰缤。
graph2 = tf.Graph()
sess2 = tf.Session(target=server2.target, graph=graph2)
print(graph2.get_operations())
————————————————————————————
[]
要訪問共享變量,我們必須手動添加一個同名的變量到第二個圖中晃洒。
withgraph2.as_default():
var2 = tf.Variable(0.0, name='var')
只有如此我們才可以訪問它慨灭。
sess1.run(var1.assign(1.0))
sess2.run(var2)
————————————————————————————
1.0
關鍵是:每個服務器負責創(chuàng)建自己的圖。
所有服務器上的圖都必須一樣嗎球及?
到目前為止氧骤,我們所有的例子都是在兩臺服務器上運行相同的圖。這被稱為圖內(nèi)復制(in-graph replication)吃引。
例如筹陵,假設我們有一個包含三臺服務器的集群。服務器 1 保存共享參數(shù)际歼,而服務器 2 和服務器 3 是工作站節(jié)點,每個都有本地變量姑蓝。在圖內(nèi)復制中鹅心,每臺服務器的圖如下所示:
圖內(nèi)復制的問題在于每個服務器都必須具有整個圖的副本,包括可能只與其他服務器相關的子圖纺荧。這可能會導致圖變得非常大旭愧。
另一種方法是圖間復制(between-graph replication)颅筋。在這里,每個服務器都運行一個只包含共享參數(shù)的圖输枯,而且任何變量和操作都與單個服務器相關议泵。
這種方法縮減了圖的大小,因此我們推薦使用圖間復制桃熄。
實踐細節(jié)
在介紹完整示例之前先口,有幾個實踐中遇到的細節(jié)問題需要討論一下。
如果在所有服務器互聯(lián)之前嘗試在集群上運行某些程序瞳收,會發(fā)生什么碉京?
我們再次創(chuàng)建一個雙任務集群。
cluster = tf.train.ClusterSpec({
"local": ["localhost:2226","localhost:2227"]
})
這一次螟深,讓我們在隔離進程中啟動每個服務器谐宙。(這允許我們隨時關閉服務器,以便再次啟動它們進行后續(xù)的實驗界弧。除了關閉啟動服務器的進程之外凡蜻,目前沒有其它辦法關閉服務器。)
frommultiprocessingimportProcess
fromtimeimportsleep
defs1():
server1 = tf.train.Server(cluster,
job_name="local",
task_index=0)
sess1 = tf.Session(server1.target)
print("server 1: running no-op...")
sess1.run(tf.no_op())
print("server 1: no-op run!")
server1.join()# Block
defs2():
foriinrange(3):
print("server 2: %d seconds left before connecting..."
% (3- i))
sleep(1.0)
server2 = tf.train.Server(cluster,
job_name="local",
task_index=1)
print("server 2: connected!")
server2.join()# Block
# daemon=True so that these processes will definitely be killed
# when the parent process restarts
p1 =Process(target=s1, daemon=True)
p2 =Process(target=s2, daemon=True)
服務器 1 即刻加入集群垢箕,但服務器 2 在連接之前等待了一會兒划栓。結(jié)果如下所示:
p1.start()
p2.start()
server2:3seconds left before connecting...
server1: running no-op...
server2:2seconds left before connecting...
server2:1seconds left before connecting...
server2: connected!
server1: no-op run!
可以看出,每個服務器都試圖在集群上運行一個操作舰讹,直到所有的服務器都加入茅姜。
p1.terminate()
p2.terminate()
當服務器脫離集群會怎樣?
我們用兩臺服務器建立一個集群月匣。服務器 1 只是反復嘗試和運行位于服務器 1 上的 no-op 操作钻洒。服務器 2 將在兩秒鐘后宕機。
defs1():
server1 = tf.train.Server(cluster,
job_name="local",
task_index=0)
withtf.device("/job:local/task:0"):
no_op = tf.no_op()
sess1 = tf.Session(server1.target)
for_inrange(6):
print("Server 1: about to run no-op...", end="")
sess1.run(no_op)
print("success!")
sleep(1.0)
defs2():
server2 = tf.train.Server(cluster,
job_name="local",
task_index=1)
sleep(2.0)
print("Server 2 dieing...")
p1 =Process(target=s1, daemon=True)
p2 =Process(target=s2, daemon=True)
p1.start()
p2.start()
————————————————————————————————
Server1: about to run no-op...success!
Server1: about to run no-op...success!
Server2dieing...
Server1: about to run no-op...success!
Server1: about to run no-op...success!
Server1: about to run no-op...success!
Server1: about to run no-op...success!
短期內(nèi)锄开,只要我們試圖運行的操作不在脫離的服務器上素标,似乎不會出現(xiàn)問題。(我沒有測試過長期運行會發(fā)生什么萍悴。)
如果操作是在脫離的服務器上……
defs1():
server1 = tf.train.Server(cluster,
job_name="local",
task_index=0)
# This time, we place the no-op on server 2,
# which is going to leave
withtf.device("/job:local/task:1"):
no_op = tf.no_op()
sess1 = tf.Session(server1.target)
for_inrange(5):
print("Server 1: about to run no-op...", end="")
sess1.run(no_op)
print("success!")
sleep(1.0)
p1 =Process(target=s1, daemon=True)
p2 =Process(target=s2, daemon=True)
p1.start()
p2.start()
——————————————————————————————————
—
Server1: about to run no-op...success!
Server1: about to run no-op...success!
Server2dieing...
然后嘗試運行操作代碼头遭。
p1.terminate()
p2.terminate()
如果服務器又加入集群會怎樣?
p1 =Process(target=s1, daemon=True)
p2 =Process(target=s2, daemon=True)
p1.start()
p2.start()
sleep(3.0)
# At this point, server 1 is blocked, and server 2 is dead.
print("Restarting server 2...")
p2 =Process(target=s2, daemon=True)
p2.start()
————————————————————————————
Server1: about to run no-op...success!
Server1: about to run no-op...success!
Server2dieing...
Restartingserver2...
ProcessProcess-7:
Traceback(most recent call last):
File"/Users/matthew/tensorflow/lib/python3.6/site-packages/tensorflow/python/client/session.py", line1323,in_do_call
returnfn(*args)
File"/Users/matthew/tensorflow/lib/python3.6/site-packages/tensorflow/python/client/session.py", line1302,in_run_fn
status, run_metadata)
File"/Users/matthew/tensorflow/lib/python3.6/site-packages/tensorflow/python/framework/errors_impl.py", line473,in__exit__
c_api.TF_GetCode(self.status.status))
tensorflow.python.framework.errors_impl.AbortedError:Graphhandleisnotfound:0000000000000001
Server1: about to run no-op...Server2dieing...
系統(tǒng)報了一個 Graph handle is not found 的錯誤癣诱。
因此分布式 TensorFlow 不會自動恢復服務器故障计维。(如果您對容錯有興趣,請查看 https://www.youtube.com/watch?v=la_M6bCV91M撕予。)
誰負責初始化共享變量鲫惶?
一種方法是讓所有工作站運行 tf.global_variables_initializer()。
但是如果我們想保持代碼整潔并且只用一個服務器進行初始化实抡,那么如果有其他服務器在初始化之前嘗試使用這些變量欠母,可能會遇到問題欢策。一個解決方案就是讓其他工作站等待,直到使用 tf.report_uninitialized_variables 的初始化開始赏淌。
defs1():
server1 = tf.train.Server(cluster,
job_name="local",
task_index=0)
var = tf.Variable(0.0, name='var')
sess1 = tf.Session(server1.target)
print("Server 1: waiting for connection...")
sess1.run(tf.report_uninitialized_variables())
whilelen(sess1.run(tf.report_uninitialized_variables())) >0:
print("Server 1: waiting for initialization...")
sleep(1.0)
print("Server 1: variables initialized!")
defs2():
server2 = tf.train.Server(cluster,
job_name="local",
task_index=1)
var = tf.Variable(0.0, name='var')
sess2 = tf.Session(server2.target)
foriinrange(3):
print("Server 2: waiting %d seconds before initializing..."
% (3- i))
sleep(1.0)
sess2.run(tf.global_variables_initializer())
p1 =Process(target=s1, daemon=True)
p2 =Process(target=s2, daemon=True)
p1.start()
p2.start()
—————————————————————————————————
Server1: waitingforconnection...
Server2: waiting3seconds before initializing...
Server1: waitingforinitialization...
Server2: waiting2seconds before initializing...
Server1: waitingforinitialization...
Server2: waiting1seconds before initializing...
Server1: waitingforinitialization...
Server1: variables initialized!
p1.terminate()
p2.terminate()
示例
讓我們把所學的知識融合到最后一個使用多進程的例子中踩寇。
我們將創(chuàng)建:
一個存儲單個變量 var 的參數(shù)服務器。
兩個工作站任務(worker task)六水,每個工作站將多次增加變量 var 的值俺孙。
我們將讓參數(shù)服務器多輸出幾次 var 的值,以便查看其變化缩擂。
importtensorflowastf
frommultiprocessingimportProcess
fromtimeimportsleep
cluster = tf.train.ClusterSpec({
"worker": [
"localhost:3333",
"localhost:3334",
],
"ps": [
"localhost:3335"
]
})
defparameter_server():
withtf.device("/job:ps/task:0"):
var = tf.Variable(0.0, name='var')
server = tf.train.Server(cluster,
job_name="ps",
task_index=0)
sess = tf.Session(target=server.target)
print("Parameter server: waiting for cluster connection...")
sess.run(tf.report_uninitialized_variables())
print("Parameter server: cluster ready!")
print("Parameter server: initializing variables...")
sess.run(tf.global_variables_initializer())
print("Parameter server: variables initialized")
foriinrange(5):
val = sess.run(var)
print("Parameter server: var has value %.1f"% val)
sleep(1.0)
print("Parameter server: blocking...")
server.join()
defworker(worker_n):
withtf.device("/job:ps/task:0"):
var = tf.Variable(0.0, name='var')
server = tf.train.Server(cluster,
job_name="worker",
task_index=worker_n)
sess = tf.Session(target=server.target)
print("Worker %d: waiting for cluster connection..."% worker_n)
sess.run(tf.report_uninitialized_variables())
print("Worker %d: cluster ready!"% worker_n)
whilesess.run(tf.report_uninitialized_variables()):
print("Worker %d: waiting for variable initialization..."% worker_n)
sleep(1.0)
print("Worker %d: variables initialized"% worker_n)
foriinrange(5):
print("Worker %d: incrementing var"% worker_n)
sess.run(var.assign_add(1.0))
sleep(1.0)
print("Worker %d: blocking..."% worker_n)
server.join()
ps_proc =Process(target=parameter_server, daemon=True)
w1_proc =Process(target=worker, args=(0, ), daemon=True)
w2_proc =Process(target=worker, args=(1, ), daemon=True)
ps_proc.start()
————————————————————————————
Parameterserver: waitingforcluster connection...
Parameterserver: cluster ready!
Parameterserver: initializing variables...
Parameterserver: variables initialized
Parameterserver: var has value0.0
Parameterserver: var has value2.0
Parameterserver: var has value4.0
Parameterserver: var has value5.0
Parameterserver: var has value7.0
Parameterserver: blocking...
w1_proc.start()
————————————————————————————————
Worker0: waitingforcluster connection...
Worker0: cluster ready!
Worker0: waitingforvariable initialization...
Worker0: variables initialized
Worker0: incrementing var
Worker0: incrementing var
Worker0: incrementing var
Worker0: incrementing var
Worker0: incrementing var
Worker0: blocking...
w2_proc.start()
———————————————————————————————
Worker1: waitingforcluster connection...
Worker1: cluster ready!
Worker1: waitingforvariable initialization...
Worker1: variables initialized
Worker1: incrementing var
Worker1: incrementing var
Worker1: incrementing var
Worker1: incrementing var
Worker1: incrementing var
Worker1: blocking...
forprocin[w1_proc, w2_proc, ps_proc]:
proc.terminate()
總結(jié)
通過本文鼠冕,我們了解了:
如何將多個 TensorFlow 執(zhí)行引擎(運行在不同進程或不同機器上)集成為一個集群,以便共享變量胯盯。
如何為變量或操作指定服務器懈费。
圖內(nèi)復制與圖間復制。
在所有服務器互聯(lián)之前或在服務器脫離集群之后在集群上運行操作博脑,會發(fā)生什么憎乙。
如何等待變量被集群中的另一個任務初始化。