Apache Flink 源碼解析(二)(Legacy mode已廢棄)系統(tǒng)架構(gòu), 啟動(dòng)與注冊(cè)

概述

這篇文章側(cè)重于分析JobManager和TaskManager的啟動(dòng)過程以及注冊(cè)相味,還有Flink的implementation中所用到的設(shè)計(jì)模式越平。本文從本地與standalone模式進(jìn)行解析识虚。

Akka 簡(jiǎn)介

  • 因?yàn)榻M件之間的信息傳遞是通過Akka工具包幅慌,所以在這兒我做一個(gè)簡(jiǎn)單的解釋
  • 首先參考Akka官方文檔給出的一個(gè)抽象的圖


    image
  • 如圖就是對(duì)ActorSystem的一個(gè)高度抽象,所有的Actor成樹狀蛤织,user actor的子孫就是用戶創(chuàng)建的Actor,system下面是Akka的監(jiān)管與支持的Actor鸿染,往往不需要用戶過多參與
  • Actor之間如果持有對(duì)方的ActorRef則可以向?qū)Ψ桨l(fā)送消息
  • 父Actor負(fù)責(zé)監(jiān)管子Actor拋出的異常能被父Actor處理指蚜,則父Actor可以重啟或者廢棄它,如果不能處理涨椒,會(huì)繼續(xù)向上拋異常摊鸡。一個(gè)Actor如果推出,那么它的所有子Actor都會(huì)退出蚕冬。
  • 它們共同組成了一個(gè)ActorSystem

Flink的架構(gòu)

  • 首先上一張從Flink官方文檔拿來的架構(gòu)圖


    process-model.png
  • 這次我會(huì)從JobManager和TaskManager著手來解析Flink的啟動(dòng)過程

  • Flink中JobManager與TaskManager免猾,JobManager與Client的交互是基于Akka工具包的,是通過消息驅(qū)動(dòng)囤热,這樣就把中心放到了消息的接收猎提,發(fā)送與處理,而且由于對(duì)每個(gè)Akka中的Actor來說旁蔼,消息是同步的锨苏,排在一個(gè)隊(duì)列中,極大地簡(jiǎn)化了多線程程序設(shè)計(jì)的復(fù)雜度棺聊,關(guān)于Akka的一些學(xué)習(xí)資料我會(huì)放在文章最后

  • 下面的啟動(dòng)過程我會(huì)分為入口伞租,ActorSystem的創(chuàng)建,JobManager的啟動(dòng)限佩,TaskManager的啟動(dòng)和注冊(cè)這幾塊來講

    入口

    • 如果看過該系列上一篇文章葵诈,應(yīng)該知道怎么找到在Local和Standalone模式下程序的入口

    • 這里我直接給出來,就是org.apache.flink.runtime.jobmanager 中的main方法

    • main方法主要可以分為,一是啟動(dòng)環(huán)境的準(zhǔn)備


      JobManager Startup.png
    • 二是處理輸入的命令行參數(shù)驯击,并準(zhǔn)備好包含配置信息的Configuration對(duì)象,以及host地址與端口號(hào)耐亏,還有運(yùn)行模式


      configuration prepare.png
    • 三是新建一個(gè)Callable對(duì)象徊都,在另一個(gè)線程上運(yùn)行runJobManager方法


      mainRunJobManager.png
    • runJobManager有 個(gè)重載方法,第一個(gè)方法中綁定了端口號(hào)并建立了socket鏈接广辰,并調(diào)用第二個(gè)重載方法


      bindport.png
    • 第二個(gè)重載方法根據(jù)本機(jī)硬件情況建立futureExecutor(對(duì)Future類不了解的話可以簡(jiǎn)單理解成Java中帶返回值的Runnable暇矫,細(xì)節(jié)不做討論), ioExecutor择吊,并調(diào)用startActorSystemAndJobManagerActors方法建立ActorSystem并等待結(jié)束


      startActorSystem.png

    ActorSystem創(chuàng)建

    • 在這里Flink做了一個(gè)Akka的工具類來簡(jiǎn)化邏輯李根,本質(zhì)上就是從Configuration配置文件類中提取Akka啟動(dòng)所需配置信息,并根據(jù)配置建立ActorSystem几睛,ActorSystem可以是本機(jī)的房轿,也可以是跨多臺(tái)機(jī)器的,具體邏輯都在AkkaUtils中所森,有興趣可以研究
      startJobManagerSystem.png
    • 建立JobManager的可視化WebMonitor囱持,這里不做介紹

    • 重點(diǎn)出現(xiàn),在這里通過ActorSystem建立了JobManager的Actor焕济,并建立了一個(gè)JobManager process reaper來做簡(jiǎn)單的失敗檢測(cè)


      createJobManager.png
    • (*)如果實(shí)在Local模式下的話纷妆,則啟動(dòng)一個(gè)內(nèi)嵌的TaskManager,如果不是晴弃,則需要在另一個(gè)JVM中啟動(dòng)TaskManager掩幢,通過taskmanager.sh腳本來完成


      localTaskManager.png
    • 針對(duì)每一個(gè)TaskManager Actor再啟動(dòng)一個(gè)WebMonitor可視化界面

    JobManager

    • 對(duì)于JobManager與TaskManager的啟動(dòng)與關(guān)閉,會(huì)有三個(gè)環(huán)節(jié)上鞠,preStart际邻,handleMessage與postStop
    • 對(duì)于每一個(gè)Actor,必須Override抽象方法handleMessage旗国,也就是Actor針對(duì)收到的message做的業(yè)務(wù)邏輯枯怖,可以選擇Override preStart和postStop方法,做一些啟動(dòng)前準(zhǔn)備與結(jié)束時(shí)的清理
    • (關(guān)于Scala的多重繼承能曾,以及菱形繼承的問題我這邊不做過多討論度硝,網(wǎng)上有一些帖子,也可以通過閱讀Scala In Depth來獲得更深入了解寿冕,這對(duì)Java程序員來說是一個(gè)新的概念)
    • 首先是啟動(dòng)前準(zhǔn)備preStart
      • 啟動(dòng)leaderElectionService蕊程,將JobManager本身作為參數(shù)傳入


        startLeaderElectionService.png

        這邊值得一提的是設(shè)計(jì)模式,策略模式與觀察者模式驼唱,因?yàn)閘eaderElectionService是一個(gè)Java的接口藻茂,在生產(chǎn)環(huán)境中有非高可用(單點(diǎn)失敗)的Implementation與基于Zookeeper的高可用模式,可以再運(yùn)行時(shí)更改該接口的行為辨赐。JobManager還實(shí)現(xiàn)了LeaderContender接口优俘,實(shí)現(xiàn)了多個(gè)CallBack方法,當(dāng)leaderElectionService被修改時(shí)掀序,會(huì)通知JobManager來調(diào)整帆焕,典型的觀察者模式,適用于在高可用模式下作為Leader的JobManager被更改的情況不恭。

      • 接著是啟動(dòng)SubmittedJobGraph服務(wù)叶雹,失敗恢復(fù)服務(wù)與Metrics,這些會(huì)在以后講到
      • 在這兒調(diào)用了leaderElectionService换吧,對(duì)高可用模式的解析可能會(huì)在以后補(bǔ)上折晦,現(xiàn)在側(cè)重于Standalone模式下的解析,在StandaloneLeaderElectionService中沾瓦,因?yàn)橹挥幸粋€(gè)JobManager满着,所以直接在start時(shí)調(diào)用LeaderContentder中的callback方法,也就是JobManager的grantLeadership方法


        grantLeadership Method.png
      • 在該方法中向JobManager Actor本身發(fā)送了一條消息暴拄,從而在handleMessage中進(jìn)行接收處理(漓滔!在Scala的Akka包中是發(fā)送消息的方法)
    • handleMessage(啟動(dòng)相關(guān))
      • 根據(jù)消息的類型進(jìn)行匹配,當(dāng)接收到GrantLeadership的Message后乖篷,會(huì)匹配到如下代碼


        handleGrantLeadershipMessage.png

        首先確認(rèn)身份响驴,再判斷是否為高可用模式,如果是高可用模式還需要發(fā)送恢復(fù)任務(wù)的消息撕蔼,如果不是豁鲤,JobManager的啟動(dòng)已完成

    TaskManager

    • TaskManager Actor創(chuàng)建
      • 如果為本地模式,則直接調(diào)用startTaskManagerComponentAndActor方法鲸沮,如果是用腳本啟動(dòng)琳骡,則會(huì)進(jìn)入TaskManager的main函數(shù)
      • 對(duì)于standalone模式
        • 在main函數(shù)中解析完命令行參數(shù)并生成配置文件對(duì)象后,會(huì)生成resourceID作為身份


          resourceIDGenerate.png
        • 接下來會(huì)新建一個(gè)Callable對(duì)象并調(diào)用selectNetworkInterfaceAndRunTaskManager方法


          selectNetworkAndRunTaskManager.png
        • 在selectNetworkInterfaceAndRunTaskManager方法中讼溺,先綁定地址與端口號(hào)楣号,建立遠(yuǎn)程ActorSystem,然后調(diào)用runTaskManager方法


          runTaskManager1.png
        • 在runTaskManager方法中通過調(diào)用startTaskManagerComponentsAndActor并傳入遠(yuǎn)程ActorSystem怒坯,至此與local模式的啟動(dòng)一致炫狱,區(qū)別在于ActorSystem是與JobManager一致還是遠(yuǎn)程另外一個(gè)ActorSystem,但對(duì)于開發(fā)者來說對(duì)Actor之間的消息傳遞方法并沒有任何區(qū)別


          startTaskManagerComponentsAndActorRemote.png
      • 在startTaskManagerComponentsAndActor創(chuàng)建ioManager剔猿,network视译,leaderRetrievalService等創(chuàng)建TaskManager所需要的參數(shù),通過actorSystem來創(chuàng)建TaskManager Actor


        createTaskManager.png
    • prestart
      • 首先啟動(dòng)leaderRetrievalService归敬,和LeaderElection一樣使用了策略模式來處理是否高可用兩種情況酷含,觀察者模式來接收對(duì)象變化并調(diào)用callback方法


        startLeaderRetrievalService.png
      • 這邊同樣關(guān)注Standalone版本的Implementation鄙早,在start中調(diào)用TaskManager的notifyLeaderAddress回調(diào)方法,并將jobManager地址作為參數(shù)傳入


        ldeaderRetrievalServiceStart.png
      • 在TaskManager實(shí)現(xiàn)的notifyLeaderAddress方法中發(fā)送JobManagerLeaderAddress消息給自己


        notifyLeaderAddress.png
    • handleMessage(啟動(dòng)相關(guān))
      • TaskManager Actor一旦接收到該Message要不就是剛啟動(dòng)椅亚,要不就是JobManager的Leader發(fā)生了改變限番,調(diào)用handleJobManagerLeaderAddress函數(shù)
        handleGrantLeadershipMessage.png
      • 在handleJobManagerLeaderAddress函數(shù)中,先斷開連接呀舔,然后出發(fā)TaskManager的注冊(cè)操作


        handleJobManagerLeaderAddressMessage.png

    TaskManager的注冊(cè)

    • 在TaskManager的注冊(cè)中扳缕,設(shè)計(jì)了與JobManager的消息交互,所以單獨(dú)分開來講
    • TaskManager中的發(fā)送注冊(cè)請(qǐng)求
      • 在handleJobManagerLeaderAddress中觸發(fā)了triggerTaskManagerRegistration注冊(cè)函數(shù)
      • 在該函數(shù)中别威,提取超時(shí)信息設(shè)置,以及當(dāng)前嘗試的ID驴剔,清空已經(jīng)在調(diào)度器中應(yīng)該被廢棄的注冊(cè)消息省古,并向自身發(fā)送嘗試次數(shù)為第一次的TriggerTaskManagerRegistration消息


        triggerTaskManagerRegistration.png
      • 因?yàn)門riggerTaskManagerRegistration是在TaskManager Actor接收到RegistrationMessage的子類,所以在接收到該消息時(shí)丧失,根據(jù)RegistrationMessage來匹配豺妓,并調(diào)用handleRegistrationMessage方法


        handleRegistrationMessage.png
      • 匹配到TriggerTaskManagerRegistration消息后,先判斷該消息有沒有失效布讹,如果沒有琳拭,則有三種情況


        handleTriggerTaskManagerRegistration.png
      • 如果已連接成功,寫入日志
      • 如果超時(shí)描验,寫入日志并推出
      • 除此之外白嘁,進(jìn)行下一次嘗試,向JobManager Actor發(fā)送RegisterTaskManager消息膘流,并在調(diào)度其中注冊(cè)下一次TriggerTaskManagerRegistration的消息的發(fā)送絮缅,知道出現(xiàn)第一種情況注冊(cè)成功或第二種情況注冊(cè)失敗為止
    • JobManager接收到注冊(cè)請(qǐng)求消息


      handleRegisterTaskManager.png
      • 根據(jù)消息找到相應(yīng)的TaskManager地址
      • 一旦JobManager接收到RegisterTaskManager消息,先想ResourceManager注冊(cè)(這邊不做介紹呼股,這邊的耕魄?是Akka里面發(fā)送ask消息并期望得到一個(gè)返回值),如果Resource注冊(cè)失敗則向發(fā)送ReconnectResourceManager消息進(jìn)行重試
      • 如果該TaskManager已經(jīng)注冊(cè)在instanceManager中彭谁,則發(fā)送AlreadyRegistered消息給相應(yīng)的TaskManager
      • 如果還未注冊(cè)吸奴,則向instanceManager注冊(cè)該TaskManager,并發(fā)送AcknowledgeRegistration給相應(yīng)的TaskManager
      • 出現(xiàn)異常則拒絕注冊(cè)缠局,發(fā)送RefuseRegistration消息
    • TaskManager接到返回消息
      • AcknowledgeRegistration注冊(cè)成功则奥,如果isConnected為true則是已連接,判斷該消息是否由當(dāng)前鏈接的JobManager發(fā)送并寫入日志甩鳄,如果未連接逞度,調(diào)用associateWithJobManager進(jìn)行接收消息錢的準(zhǔn)備工作(后續(xù)會(huì)深入解析)


        ackRegistration.png
      • AlreadyRegistered重復(fù)注冊(cè),寫入日志妙啃,邏輯與 AcknowledgeRegistration消息的處理相同
      • RefuseRegistration注冊(cè)失敗档泽,如果JobManager地址存在俊戳,則發(fā)送新的TriggerTaskManagerRegistration, 重復(fù)到TaskManager注冊(cè)部分的福州馆匿,如果沒有地址抑胎,則驗(yàn)證如果發(fā)送消息的JobManager是否是當(dāng)前已連接的JobManager寫入日志,對(duì)結(jié)果沒有影響

總結(jié)

至此渐北,JobManager和TaskManager的啟動(dòng)過程以及TaskManager的注冊(cè)過程解析已經(jīng)完成阿逃。解析中沒有辦法做到面面俱到,把自己覺得重要的點(diǎn)挑了出來赃蛛,主要是能再時(shí)間上形成一個(gè)線恃锉,方便理解。
下面還有一個(gè)根據(jù)時(shí)間線來做的思維導(dǎo)圖呕臂,側(cè)重于Local模式下的啟動(dòng)破托,虛線代表調(diào)用或者是消息。


Flink start up.jpg

附錄
Akka學(xué)習(xí)資料:

  1. Akka 官方文檔 link
  2. Akka 手冊(cè)翻譯 link 翻譯有一些晦澀
  3. Akka 系列博客 link
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末歧蒋,一起剝皮案震驚了整個(gè)濱河市土砂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌谜洽,老刑警劉巖萝映,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異阐虚,居然都是意外死亡序臂,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門实束,熙熙樓的掌柜王于貴愁眉苦臉地迎上來贸宏,“玉大人,你說我怎么就攤上這事磕洪】粤罚” “怎么了?”我有些...
    開封第一講書人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵析显,是天一觀的道長鲫咽。 經(jīng)常有香客問我,道長谷异,這世上最難降的妖魔是什么分尸? 我笑而不...
    開封第一講書人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮歹嘹,結(jié)果婚禮上箩绍,老公的妹妹穿的比我還像新娘。我一直安慰自己尺上,他們只是感情好材蛛,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開白布圆到。 她就那樣靜靜地躺著,像睡著了一般卑吭。 火紅的嫁衣襯著肌膚如雪芽淡。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,166評(píng)論 1 284
  • 那天豆赏,我揣著相機(jī)與錄音挣菲,去河邊找鬼。 笑死掷邦,一個(gè)胖子當(dāng)著我的面吹牛白胀,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播抚岗,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼纹笼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了苟跪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤蔓涧,失蹤者是張志新(化名)和其女友劉穎件已,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體元暴,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡篷扩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了茉盏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鉴未。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖鸠姨,靈堂內(nèi)的尸體忽然破棺而出铜秆,到底是詐尸還是另有隱情,我是刑警寧澤讶迁,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布连茧,位于F島的核電站,受9級(jí)特大地震影響巍糯,放射性物質(zhì)發(fā)生泄漏啸驯。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一祟峦、第九天 我趴在偏房一處隱蔽的房頂上張望宅楞。 院中可真熱鬧杆故,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春惊豺,著一層夾襖步出監(jiān)牢的瞬間旷偿,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來泰國打工朽褪, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留友题,地道東北人度宦。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓划鸽,卻偏偏與公主長得像丈冬,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子实夹,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容