11.Spark Streaming源碼解讀之Driver中的ReceiverTracker架構(gòu)設(shè)計(jì)以及具體實(shí)現(xiàn)徹底研究

上篇文章詳細(xì)解析了Receiver不斷接收數(shù)據(jù)的過(guò)程浩销,在Receiver接收數(shù)據(jù)的過(guò)程中會(huì)將數(shù)據(jù)的元信息發(fā)送給ReceiverTracker:

本文將詳細(xì)解析ReceiverTracker的的架構(gòu)設(shè)計(jì)和具體實(shí)現(xiàn)

一哮独、ReceiverTracker的主要功能

ReceiverTracker的主要功能有:

1.在Executor上啟動(dòng)Receivers

2.接受Receiver的注冊(cè)

3.借助ReceivedBlockTracker來(lái)管理Receiver接收數(shù)據(jù)的元數(shù)據(jù)

4.接受Receiver發(fā)送的各種消息,并作相應(yīng)處理

5.更新Receiver接收數(shù)據(jù)的速率(也就是限流)

6.不斷的等待Receivers的運(yùn)行狀態(tài),只要Receivers停止運(yùn)行录豺,就重新啟動(dòng)Receiver瞳浦。也就是Receiver的容錯(cuò)功能。

7.停止Receivers

8.匯報(bào)Receiver發(fā)送過(guò)來(lái)的錯(cuò)誤信息

二砍濒、ReceiverTracker具體功能詳解

2.1 啟動(dòng)receiver并管理receiver接收數(shù)據(jù)的元數(shù)據(jù)

首先淋肾,ReceiverTracker內(nèi)部有一個(gè)ReceiverTrackerEndPoint通訊體endpoint變量,endpoint用來(lái)和Receiver和ReceiverTracker本身進(jìn)行消息通訊爸邢。這個(gè)ReceiverTrackerEndPoint通訊體在ReceiverTracker啟動(dòng)時(shí)被初始化:

ReceiverTracker啟動(dòng)Receiver時(shí)候巫员,向ReceiverTrackerEndPoint通訊體endpoint變量發(fā)送了StartAllReceivers(receivers)消息:

Receiver啟動(dòng)后會(huì)向ReceiverTracker注冊(cè),告訴ReceiverTracker自己?jiǎn)?dòng)成功:

代碼中的trackerEndpoint就是ReceiverTracker中ReceiverTrackerEndPoint通訊體endpoint的引用甲棍。

Receiver會(huì)不斷將接收的數(shù)據(jù)封裝成Block简识,并將這些Block推送給BlockManager管理赶掖,在將這些Block推送給BlockManager之后,ReceiverSupervisor會(huì)將Block的元信息發(fā)送給ReceiverTracker的endpoint:

可以看到ReceiverSupervisor向ReceiverTracker的endpoint發(fā)送了AddBlock(blockInfo)消息:

ReceiverTracker收到AddBlock(blockInfo)消息后七扰,會(huì)啟動(dòng)一個(gè)線(xiàn)程進(jìn)行處理:

ReceiverTracker收到AddBlock(blockInfo)消息后奢赂,調(diào)用了addBlock(receiveedBlockInfo)方法進(jìn)行處理,下面是addBlock的源碼:

這里其實(shí)調(diào)用了receivedBlockTracker的addBlock方法颈走,receivedBlockTracker是ReceivedBlockTracker對(duì)象膳灶,它是在ReceiverTracker實(shí)例化時(shí)候被創(chuàng)建:

下面看一下ReceivedBlockTracker的addBlock方法:

可以看到ReceivedBlockTracker的addBlock方法將block的元信息添加到了一個(gè)隊(duì)隊(duì)列中,最終是添加到一個(gè)叫做streamIdToUnallocatedBlockQueues的HashMap中立由,其中key是streamId轧钓,值是該streamid對(duì)應(yīng)的block隊(duì)列。

2.2 為Batch分配Block

當(dāng)spark streaming應(yīng)用程序動(dòng)態(tài)生成job的時(shí)候锐膜,JobGenerator會(huì)調(diào)用generateJobs方法毕箍,在該方法中會(huì)為批處理分配已經(jīng)接收的Block

這里調(diào)用了jobScheduler中receiverTracker的allocatedBlockToBatch方法,這里的receiverTracker就是ReceiverTracker對(duì)象道盏,下面看一下該方法的實(shí)現(xiàn):

可以看到而柑,最終調(diào)用了ReceivedBlockTracker的allocatedBlockToBatch方法:

這里先根據(jù)streamId,從streamIdToUnallocatedBlockQueues中取出接收到的block隊(duì)列荷逞,并將streamId和block隊(duì)列封裝成AllocatedBlocks媒咳,最后根據(jù)batchTime將其對(duì)應(yīng)的AllocatedBlocks對(duì)象加入timeToAllocatedBlocks中,timeToAllocatedBlocks是一個(gè)HashMap:

這樣Batch的Block就分配完成种远。

2.3 ReceiverTracker處理的其他消息

ReceiverTracker中ReceiverTrackerEndpoint的receive方法定義了各種消息的處理邏輯:

(1) 收到StartAllReceivers(receivers)消息后涩澡,ReceiverTracker會(huì)為receivers分配executor,并在executor上啟動(dòng)相應(yīng)的receiver

(2)當(dāng)ReceiverTracker監(jiān)控到receiver退出返回時(shí),會(huì)給ReceiverTrackerEndpoint發(fā)送RestartTracker(receiver)消息坠敷。收到該消息后妙同,會(huì)重新為receiver分配executor啟動(dòng)receiver(如果原來(lái)的executor運(yùn)行正常就在原先的executor上重新啟動(dòng),否則重新調(diào)度executor)常拓。

(3)當(dāng)Spark Streaming 的job結(jié)束后渐溶,JobScheduler會(huì)調(diào)用handleJobCompletion方法,最終會(huì)調(diào)用cleanupOldBlocksAndBatches方法給endpoint發(fā)送CleanupOldBlocks消息:

收到該消息后弄抬,會(huì)被路由到Receiver 進(jìn)行Block的清理茎辐。

(4)UpdateReceiverRateLimit消息

收到UpdateReceiverRateLimit消息后,會(huì)將其路由到receiver掂恕,當(dāng)receiver收到該消息后會(huì)調(diào)用BlockGenerator的update方法更新Block生成速率拖陆。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市懊亡,隨后出現(xiàn)的幾起案子依啰,更是在濱河造成了極大的恐慌,老刑警劉巖店枣,帶你破解...
    沈念sama閱讀 211,042評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件速警,死亡現(xiàn)場(chǎng)離奇詭異叹誉,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)闷旧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門(mén)长豁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人忙灼,你說(shuō)我怎么就攤上這事匠襟。” “怎么了该园?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,674評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵酸舍,是天一觀(guān)的道長(zhǎng)。 經(jīng)常有香客問(wèn)我里初,道長(zhǎng)啃勉,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,340評(píng)論 1 283
  • 正文 為了忘掉前任青瀑,我火速辦了婚禮璧亮,結(jié)果婚禮上萧诫,老公的妹妹穿的比我還像新娘斥难。我一直安慰自己,他們只是感情好帘饶,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,404評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布哑诊。 她就那樣靜靜地躺著,像睡著了一般及刻。 火紅的嫁衣襯著肌膚如雪镀裤。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,749評(píng)論 1 289
  • 那天缴饭,我揣著相機(jī)與錄音暑劝,去河邊找鬼。 笑死颗搂,一個(gè)胖子當(dāng)著我的面吹牛担猛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播丢氢,決...
    沈念sama閱讀 38,902評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼傅联,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了疚察?” 一聲冷哼從身側(cè)響起蒸走,我...
    開(kāi)封第一講書(shū)人閱讀 37,662評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎貌嫡,沒(méi)想到半個(gè)月后比驻,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體该溯,經(jīng)...
    沈念sama閱讀 44,110評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年别惦,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了朗伶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,577評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡步咪,死狀恐怖论皆,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情猾漫,我是刑警寧澤点晴,帶...
    沈念sama閱讀 34,258評(píng)論 4 328
  • 正文 年R本政府宣布,位于F島的核電站悯周,受9級(jí)特大地震影響粒督,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜禽翼,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,848評(píng)論 3 312
  • 文/蒙蒙 一屠橄、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧闰挡,春花似錦锐墙、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,726評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至夺脾,卻和暖如春之拨,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背咧叭。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,952評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工蚀乔, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人菲茬。 一個(gè)月前我還...
    沈念sama閱讀 46,271評(píng)論 2 360
  • 正文 我出身青樓吉挣,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親生均。 傳聞我的和親對(duì)象是個(gè)殘疾皇子听想,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,452評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容