概述
這篇文章側(cè)重于分析JobManager和TaskManager的啟動(dòng)過程以及注冊(cè)相味,還有Flink的implementation中所用到的設(shè)計(jì)模式越平。本文從本地與standalone模式進(jìn)行解析识虚。
Akka 簡(jiǎn)介
- 因?yàn)榻M件之間的信息傳遞是通過Akka工具包幅慌,所以在這兒我做一個(gè)簡(jiǎn)單的解釋
-
首先參考Akka官方文檔給出的一個(gè)抽象的圖
- 如圖就是對(duì)ActorSystem的一個(gè)高度抽象,所有的Actor成樹狀蛤织,user actor的子孫就是用戶創(chuàng)建的Actor,system下面是Akka的監(jiān)管與支持的Actor鸿染,往往不需要用戶過多參與
- Actor之間如果持有對(duì)方的ActorRef則可以向?qū)Ψ桨l(fā)送消息
- 父Actor負(fù)責(zé)監(jiān)管子Actor拋出的異常能被父Actor處理指蚜,則父Actor可以重啟或者廢棄它,如果不能處理涨椒,會(huì)繼續(xù)向上拋異常摊鸡。一個(gè)Actor如果推出,那么它的所有子Actor都會(huì)退出蚕冬。
- 它們共同組成了一個(gè)ActorSystem
Flink的架構(gòu)
-
首先上一張從Flink官方文檔拿來的架構(gòu)圖
這次我會(huì)從JobManager和TaskManager著手來解析Flink的啟動(dòng)過程
Flink中JobManager與TaskManager免猾,JobManager與Client的交互是基于Akka工具包的,是通過消息驅(qū)動(dòng)囤热,這樣就把中心放到了消息的接收猎提,發(fā)送與處理,而且由于對(duì)每個(gè)Akka中的Actor來說旁蔼,消息是同步的锨苏,排在一個(gè)隊(duì)列中,極大地簡(jiǎn)化了多線程程序設(shè)計(jì)的復(fù)雜度棺聊,關(guān)于Akka的一些學(xué)習(xí)資料我會(huì)放在文章最后
-
下面的啟動(dòng)過程我會(huì)分為入口伞租,ActorSystem的創(chuàng)建,JobManager的啟動(dòng)限佩,TaskManager的啟動(dòng)和注冊(cè)這幾塊來講
入口
如果看過該系列上一篇文章葵诈,應(yīng)該知道怎么找到在Local和Standalone模式下程序的入口
這里我直接給出來,就是org.apache.flink.runtime.jobmanager 中的main方法
-
main方法主要可以分為,一是啟動(dòng)環(huán)境的準(zhǔn)備
-
二是處理輸入的命令行參數(shù)驯击,并準(zhǔn)備好包含配置信息的Configuration對(duì)象,以及host地址與端口號(hào)耐亏,還有運(yùn)行模式
-
三是新建一個(gè)Callable對(duì)象徊都,在另一個(gè)線程上運(yùn)行runJobManager方法
-
runJobManager有 個(gè)重載方法,第一個(gè)方法中綁定了端口號(hào)并建立了socket鏈接广辰,并調(diào)用第二個(gè)重載方法
-
第二個(gè)重載方法根據(jù)本機(jī)硬件情況建立futureExecutor(對(duì)Future類不了解的話可以簡(jiǎn)單理解成Java中帶返回值的Runnable暇矫,細(xì)節(jié)不做討論), ioExecutor择吊,并調(diào)用startActorSystemAndJobManagerActors方法建立ActorSystem并等待結(jié)束
ActorSystem創(chuàng)建
-
在這里Flink做了一個(gè)Akka的工具類來簡(jiǎn)化邏輯李根,本質(zhì)上就是從Configuration配置文件類中提取Akka啟動(dòng)所需配置信息,并根據(jù)配置建立ActorSystem几睛,ActorSystem可以是本機(jī)的房轿,也可以是跨多臺(tái)機(jī)器的,具體邏輯都在AkkaUtils中所森,有興趣可以研究
建立JobManager的可視化WebMonitor囱持,這里不做介紹
-
重點(diǎn)出現(xiàn),在這里通過ActorSystem建立了JobManager的Actor焕济,并建立了一個(gè)JobManager process reaper來做簡(jiǎn)單的失敗檢測(cè)
-
(*)如果實(shí)在Local模式下的話纷妆,則啟動(dòng)一個(gè)內(nèi)嵌的TaskManager,如果不是晴弃,則需要在另一個(gè)JVM中啟動(dòng)TaskManager掩幢,通過taskmanager.sh腳本來完成
針對(duì)每一個(gè)TaskManager Actor再啟動(dòng)一個(gè)WebMonitor可視化界面
JobManager
- 對(duì)于JobManager與TaskManager的啟動(dòng)與關(guān)閉,會(huì)有三個(gè)環(huán)節(jié)上鞠,preStart际邻,handleMessage與postStop
- 對(duì)于每一個(gè)Actor,必須Override抽象方法handleMessage旗国,也就是Actor針對(duì)收到的message做的業(yè)務(wù)邏輯枯怖,可以選擇Override preStart和postStop方法,做一些啟動(dòng)前準(zhǔn)備與結(jié)束時(shí)的清理
- (關(guān)于Scala的多重繼承能曾,以及菱形繼承的問題我這邊不做過多討論度硝,網(wǎng)上有一些帖子,也可以通過閱讀Scala In Depth來獲得更深入了解寿冕,這對(duì)Java程序員來說是一個(gè)新的概念)
- 首先是啟動(dòng)前準(zhǔn)備preStart
-
啟動(dòng)leaderElectionService蕊程,將JobManager本身作為參數(shù)傳入
這邊值得一提的是設(shè)計(jì)模式,策略模式與觀察者模式驼唱,因?yàn)閘eaderElectionService是一個(gè)Java的接口藻茂,在生產(chǎn)環(huán)境中有非高可用(單點(diǎn)失敗)的Implementation與基于Zookeeper的高可用模式,可以再運(yùn)行時(shí)更改該接口的行為辨赐。JobManager還實(shí)現(xiàn)了LeaderContender接口优俘,實(shí)現(xiàn)了多個(gè)CallBack方法,當(dāng)leaderElectionService被修改時(shí)掀序,會(huì)通知JobManager來調(diào)整帆焕,典型的觀察者模式,適用于在高可用模式下作為Leader的JobManager被更改的情況不恭。
- 接著是啟動(dòng)SubmittedJobGraph服務(wù)叶雹,失敗恢復(fù)服務(wù)與Metrics,這些會(huì)在以后講到
-
在這兒調(diào)用了leaderElectionService换吧,對(duì)高可用模式的解析可能會(huì)在以后補(bǔ)上折晦,現(xiàn)在側(cè)重于Standalone模式下的解析,在StandaloneLeaderElectionService中沾瓦,因?yàn)橹挥幸粋€(gè)JobManager满着,所以直接在start時(shí)調(diào)用LeaderContentder中的callback方法,也就是JobManager的grantLeadership方法
- 在該方法中向JobManager Actor本身發(fā)送了一條消息暴拄,從而在handleMessage中進(jìn)行接收處理(漓滔!在Scala的Akka包中是發(fā)送消息的方法)
-
- handleMessage(啟動(dòng)相關(guān))
-
根據(jù)消息的類型進(jìn)行匹配,當(dāng)接收到GrantLeadership的Message后乖篷,會(huì)匹配到如下代碼
首先確認(rèn)身份响驴,再判斷是否為高可用模式,如果是高可用模式還需要發(fā)送恢復(fù)任務(wù)的消息撕蔼,如果不是豁鲤,JobManager的啟動(dòng)已完成
-
TaskManager
- TaskManager Actor創(chuàng)建
- 如果為本地模式,則直接調(diào)用startTaskManagerComponentAndActor方法鲸沮,如果是用腳本啟動(dòng)琳骡,則會(huì)進(jìn)入TaskManager的main函數(shù)
- 對(duì)于standalone模式
-
在main函數(shù)中解析完命令行參數(shù)并生成配置文件對(duì)象后,會(huì)生成resourceID作為身份
-
接下來會(huì)新建一個(gè)Callable對(duì)象并調(diào)用selectNetworkInterfaceAndRunTaskManager方法
-
在selectNetworkInterfaceAndRunTaskManager方法中讼溺,先綁定地址與端口號(hào)楣号,建立遠(yuǎn)程ActorSystem,然后調(diào)用runTaskManager方法
-
在runTaskManager方法中通過調(diào)用startTaskManagerComponentsAndActor并傳入遠(yuǎn)程ActorSystem怒坯,至此與local模式的啟動(dòng)一致炫狱,區(qū)別在于ActorSystem是與JobManager一致還是遠(yuǎn)程另外一個(gè)ActorSystem,但對(duì)于開發(fā)者來說對(duì)Actor之間的消息傳遞方法并沒有任何區(qū)別
-
-
在startTaskManagerComponentsAndActor創(chuàng)建ioManager剔猿,network视译,leaderRetrievalService等創(chuàng)建TaskManager所需要的參數(shù),通過actorSystem來創(chuàng)建TaskManager Actor
- prestart
-
首先啟動(dòng)leaderRetrievalService归敬,和LeaderElection一樣使用了策略模式來處理是否高可用兩種情況酷含,觀察者模式來接收對(duì)象變化并調(diào)用callback方法
-
這邊同樣關(guān)注Standalone版本的Implementation鄙早,在start中調(diào)用TaskManager的notifyLeaderAddress回調(diào)方法,并將jobManager地址作為參數(shù)傳入
-
在TaskManager實(shí)現(xiàn)的notifyLeaderAddress方法中發(fā)送JobManagerLeaderAddress消息給自己
-
- handleMessage(啟動(dòng)相關(guān))
-
TaskManager Actor一旦接收到該Message要不就是剛啟動(dòng)椅亚,要不就是JobManager的Leader發(fā)生了改變限番,調(diào)用handleJobManagerLeaderAddress函數(shù)
-
在handleJobManagerLeaderAddress函數(shù)中,先斷開連接呀舔,然后出發(fā)TaskManager的注冊(cè)操作
-
TaskManager Actor一旦接收到該Message要不就是剛啟動(dòng)椅亚,要不就是JobManager的Leader發(fā)生了改變限番,調(diào)用handleJobManagerLeaderAddress函數(shù)
TaskManager的注冊(cè)
- 在TaskManager的注冊(cè)中扳缕,設(shè)計(jì)了與JobManager的消息交互,所以單獨(dú)分開來講
- TaskManager中的發(fā)送注冊(cè)請(qǐng)求
- 在handleJobManagerLeaderAddress中觸發(fā)了triggerTaskManagerRegistration注冊(cè)函數(shù)
-
在該函數(shù)中别威,提取超時(shí)信息設(shè)置,以及當(dāng)前嘗試的ID驴剔,清空已經(jīng)在調(diào)度器中應(yīng)該被廢棄的注冊(cè)消息省古,并向自身發(fā)送嘗試次數(shù)為第一次的TriggerTaskManagerRegistration消息
-
因?yàn)門riggerTaskManagerRegistration是在TaskManager Actor接收到RegistrationMessage的子類,所以在接收到該消息時(shí)丧失,根據(jù)RegistrationMessage來匹配豺妓,并調(diào)用handleRegistrationMessage方法
-
匹配到TriggerTaskManagerRegistration消息后,先判斷該消息有沒有失效布讹,如果沒有琳拭,則有三種情況
- 如果已連接成功,寫入日志
- 如果超時(shí)描验,寫入日志并推出
- 除此之外白嘁,進(jìn)行下一次嘗試,向JobManager Actor發(fā)送RegisterTaskManager消息膘流,并在調(diào)度其中注冊(cè)下一次TriggerTaskManagerRegistration的消息的發(fā)送絮缅,知道出現(xiàn)第一種情況注冊(cè)成功或第二種情況注冊(cè)失敗為止
-
JobManager接收到注冊(cè)請(qǐng)求消息
- 根據(jù)消息找到相應(yīng)的TaskManager地址
- 一旦JobManager接收到RegisterTaskManager消息,先想ResourceManager注冊(cè)(這邊不做介紹呼股,這邊的耕魄?是Akka里面發(fā)送ask消息并期望得到一個(gè)返回值),如果Resource注冊(cè)失敗則向發(fā)送ReconnectResourceManager消息進(jìn)行重試
- 如果該TaskManager已經(jīng)注冊(cè)在instanceManager中彭谁,則發(fā)送AlreadyRegistered消息給相應(yīng)的TaskManager
- 如果還未注冊(cè)吸奴,則向instanceManager注冊(cè)該TaskManager,并發(fā)送AcknowledgeRegistration給相應(yīng)的TaskManager
- 出現(xiàn)異常則拒絕注冊(cè)缠局,發(fā)送RefuseRegistration消息
- TaskManager接到返回消息
-
AcknowledgeRegistration注冊(cè)成功则奥,如果isConnected為true則是已連接,判斷該消息是否由當(dāng)前鏈接的JobManager發(fā)送并寫入日志甩鳄,如果未連接逞度,調(diào)用associateWithJobManager進(jìn)行接收消息錢的準(zhǔn)備工作(后續(xù)會(huì)深入解析)
- AlreadyRegistered重復(fù)注冊(cè),寫入日志妙啃,邏輯與 AcknowledgeRegistration消息的處理相同
- RefuseRegistration注冊(cè)失敗档泽,如果JobManager地址存在俊戳,則發(fā)送新的TriggerTaskManagerRegistration, 重復(fù)到TaskManager注冊(cè)部分的福州馆匿,如果沒有地址抑胎,則驗(yàn)證如果發(fā)送消息的JobManager是否是當(dāng)前已連接的JobManager寫入日志,對(duì)結(jié)果沒有影響
-
總結(jié)
至此渐北,JobManager和TaskManager的啟動(dòng)過程以及TaskManager的注冊(cè)過程解析已經(jīng)完成阿逃。解析中沒有辦法做到面面俱到,把自己覺得重要的點(diǎn)挑了出來赃蛛,主要是能再時(shí)間上形成一個(gè)線恃锉,方便理解。
下面還有一個(gè)根據(jù)時(shí)間線來做的思維導(dǎo)圖呕臂,側(cè)重于Local模式下的啟動(dòng)破托,虛線代表調(diào)用或者是消息。
附錄
Akka學(xué)習(xí)資料: