1.架構(gòu)分析
TF core是什么满着?
為什么HDFS只和worker相連而與PS無關(guān)雀摘?
TensorFlowOnSpark的架構(gòu)較為簡(jiǎn)單实愚,Spark Driver程序并不會(huì)參與TensorFlow內(nèi)部相關(guān)的計(jì)算和處理纬傲。其設(shè)計(jì)思路像是將一個(gè)TensorFlow集群運(yùn)行在了Spark上,其在每個(gè)Spark Executor中啟動(dòng)TensorFlow應(yīng)用程序贴铜,然后通過gRPC或RDMA方式進(jìn)行數(shù)據(jù)傳遞與交互粪摘。
2. 生命周期
TensorFlowOnSpark的Spark應(yīng)用程序包括4個(gè)基本過程。
- Reserve:組建TensorFlow集群绍坝,并在每個(gè)Executor進(jìn)程上預(yù)留監(jiān)聽端口徘意,啟動(dòng)“數(shù)據(jù)/控制”消息的監(jiān)聽程序。
- Start:在每個(gè)Executor進(jìn)程上啟動(dòng)TensorFlow應(yīng)用程序轩褐;
- Train/Inference:在TensorFlow集群上完成模型的訓(xùn)練或推理
- Shutdown:關(guān)閉Executor進(jìn)程上的TensorFlow應(yīng)用程序椎咧,釋放相應(yīng)的系統(tǒng)資源(消息隊(duì)列)。
用戶直接通過spark-submit的方式提交Spark應(yīng)用程序(mnist_spark.py)把介。其中通過--py-files選項(xiàng)附帶TensorFlowOnSpark框架(tfspark.zip)勤讽,及其TensorFlow應(yīng)用程序(mnist_dist.py),從而實(shí)現(xiàn)TensorFlow集群在Spark平臺(tái)上的部署拗踢。
首先看看TensorFlow集群的建立過程脚牍。
首先根據(jù)spark-submit傳遞的num_executor參數(shù),通過調(diào)用cluster = sc.parallelize(num_executor)建立一個(gè)ParllelCollectionRDD巢墅,其中分區(qū)數(shù)為num_executor莫矗。也就是說,此時(shí)分區(qū)數(shù)等于Executor數(shù)砂缩。
然后再調(diào)用cluster.mapPartitions(TFSparkNode.reserve)將ParllelCollectionRDD變換(transformation)為MapPartitionsRDD,在每個(gè)分區(qū)上回調(diào)TFSparkNode.reserve三娩。
TFSparkNode.reserve將會(huì)在該節(jié)點(diǎn)上預(yù)留一個(gè)端口庵芭,并駐留一個(gè)Manager服務(wù)。Manager持有一個(gè)隊(duì)列雀监,用于完成進(jìn)程間的同步双吆,實(shí)現(xiàn)該節(jié)點(diǎn)的“數(shù)據(jù)/控制”消息的服務(wù)。
數(shù)據(jù)消息啟動(dòng)了兩個(gè)隊(duì)列:Input與Output会前,分別用于RDD與Executor進(jìn)程之間的數(shù)據(jù)交換好乐。
控制消息啟動(dòng)了一個(gè)隊(duì)列:Control,用于Driver進(jìn)程控制PS任務(wù)的生命周期瓦宜,當(dāng)模型訓(xùn)練完成之后蔚万,通過Driver發(fā)送Stop的控制消息結(jié)束PS任務(wù)。
這是從分區(qū)的角度看待TensorFlow集群建立的過程临庇,橫軸表示RDD反璃。這里存在兩個(gè)RDD昵慌,第一個(gè)為ParllelCollectionRDD,然后變換為MapPartitionsRDD淮蜈。
縱軸表示同一個(gè)分區(qū)(Partition)斋攀,并在每個(gè)分區(qū)上啟動(dòng)一個(gè)Executor進(jìn)程 。在Spark中梧田,分區(qū)數(shù)等于最終在TaskScheduler上調(diào)度的Task數(shù)目淳蔼。
此處,sc.parallelize(num_executor)生成一個(gè)分區(qū)數(shù)為num_executor的ParllelCollectionRDD裁眯。也就是說鹉梨,此時(shí)分區(qū)數(shù)等于num_executor數(shù)目。
在本例中未状,num_executor為3俯画,包括1個(gè)PS任務(wù),2個(gè)Worker任務(wù)司草。
TensorFlow集群建立后艰垂,將生成上圖所示的領(lǐng)域模型。其中埋虹,一個(gè)TFCluster將持有num_executor個(gè)TFSparkNode節(jié)點(diǎn)猜憎;在每個(gè)TFSparkNode上駐留一個(gè)Manager服務(wù),并預(yù)留一個(gè)監(jiān)聽端口搔课,用于監(jiān)聽“數(shù)據(jù)/控制”消息胰柑。
實(shí)際上,TFSparkNode節(jié)點(diǎn)承載于Spark Executor進(jìn)程之上爬泥。
3. 啟動(dòng)
TensorFlow集群建立后柬讨,通過調(diào)用cluster.start啟動(dòng)集群服務(wù)。其結(jié)果將在每個(gè)Executor進(jìn)程上啟動(dòng)TensorFlow應(yīng)用程序袍啡。
此處踩官,需要對(duì)原生的TensorFlow應(yīng)用程序進(jìn)行適配修改,包括2個(gè)部分:
Feeding與Fetching: 數(shù)據(jù)輸入/輸出機(jī)制修改
ClusterSpec: TF集群的構(gòu)造描述
其余代碼都將保留境输,最小化TensorFlow應(yīng)用程序的修改蔗牡。
在cluster上調(diào)用foreachPartition(TFSparkNode.start(map_func)),將在每個(gè)分區(qū)(Executor進(jìn)程)上回調(diào)TFSparkNode.start(map_func)嗅剖。其中辩越,map_func是對(duì)應(yīng)TF應(yīng)用程序的包裝。
通過上述過程信粮,在Spark上拉起了一個(gè)TF的集群服務(wù)黔攒。從而使得Spark集群擁有了深度學(xué)習(xí)和GPU加速的能力。
4.數(shù)據(jù)供給
- TensorFlow QueueRunner: FileReader & QueueRunner
- Spark Feeding: RDD->Executor->TensorFlow Graph
當(dāng)Spark平臺(tái)上已經(jīng)拉起了TF集群服務(wù)之后,便可以啟動(dòng)模型的訓(xùn)練或推理過程了亏钩。在訓(xùn)練或推理過程中莲绰,最重要的是解決數(shù)據(jù)的Feeding和Fetching問題。
TFoS上提供了兩種方案:
TensorFlow QueueRunner:利用TensorFlow提供的FileReader和QueueRunner機(jī)制姑丑。Spark未參與任何工作蛤签,請(qǐng)查閱TensorFlow官方相關(guān)文檔。
Spark Feeding:首先從RDD讀取分區(qū)數(shù)據(jù)(通過HadoopRDD.compute)栅哀,然后將其放在Input隊(duì)列中震肮,Executor進(jìn)程再?gòu)脑撽?duì)列中取出,并進(jìn)一步通過feed_dict留拾,調(diào)用session.run將分區(qū)數(shù)據(jù)供給給TensorFlow Graph中戳晌。
Feeding過程,就是通過Input Queue同步實(shí)現(xiàn)的痴柔。當(dāng)RDD讀取分區(qū)數(shù)據(jù)后沦偎,阻塞式地將分區(qū)數(shù)據(jù)put到Input隊(duì)列中;TFGraph在session.run獲取Next Batch時(shí)咳蔚,也是阻塞式地等待數(shù)據(jù)的到來豪嚎。
同樣的道理,F(xiàn)etching過程與Feeding過程類同谈火,只是使用Output Queue侈询,并且數(shù)據(jù)流方向相反。
session.run返回的數(shù)據(jù)糯耍,通過put阻塞式地放入Output Queue扔字,RDD也是阻塞式地等待數(shù)據(jù)到來。
以模型訓(xùn)練過程為例温技,講解RDD的變換過程革为。此處以Mnist手寫識(shí)別為例,左邊表示X舵鳞,右邊表示Y篷角。分別通過HadoopRDD讀取分區(qū)數(shù)據(jù),然后通過MapPartititionRDD變換分區(qū)的數(shù)據(jù)格式系任;然后通過zip算子,實(shí)現(xiàn)兩個(gè)RDD的折疊虐块,生成ZipPartitionsRDD俩滥。
然后,根據(jù)Epoches超參數(shù)的配置贺奠,將該RDD重復(fù)執(zhí)行Epoches次霜旧,最終將結(jié)果匯總,生成UnionRDD。
在此之前挂据,都是Transformation的過程以清,最終調(diào)用foreachPartition(train)啟動(dòng)Action,觸發(fā)Spark Job的提交和任務(wù)的運(yùn)行崎逃。
5.關(guān)閉隊(duì)列
當(dāng)模型訓(xùn)練或推理完成之后掷倔,分別在Input/Control隊(duì)列中投擲Stop(以傳遞None實(shí)現(xiàn))消息,當(dāng)Manager收到Stop消息后个绍,停止隊(duì)列的運(yùn)行勒葱。
最終,Spark應(yīng)用程序退出巴柿,Executor進(jìn)程退出凛虽,整個(gè)工作流執(zhí)行結(jié)束。
轉(zhuǎn)載自劉光聰-TensorFlow遇上Spark