The previous article walked through how a Receiver continuously receives data. While receiving data, the Receiver sends the metadata of that data to ReceiverTracker:
This article examines ReceiverTracker's architecture and implementation in detail.
I. Main Responsibilities of ReceiverTracker
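ReceiverTracker's main responsibilities are:
1. Starting Receivers on Executors
2. Accepting Receiver registrations
3. Managing the metadata of the data received by Receivers, with the help of ReceivedBlockTracker
4. Receiving the various messages sent by Receivers and handling each accordingly
5. Updating the rate at which Receivers ingest data (i.e., rate limiting)
6. Continuously monitoring the Receivers' running state and restarting any Receiver that stops. This is the Receiver fault-tolerance mechanism.
7. Stopping Receivers
8. Reporting the errors sent by Receivers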
II. ReceiverTracker's Functions in Detail
2.1 Starting Receivers and Managing the Metadata of Received Data
First, ReceiverTracker holds a member variable endpoint of type ReceiverTrackerEndpoint, an RPC endpoint that ReceiverTracker uses to exchange messages both with the Receivers and with itself. This endpoint is initialized when ReceiverTracker starts:
When ReceiverTracker starts the Receivers, it sends a StartAllReceivers(receivers) message to this endpoint:
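To make the "send a message to the endpoint, handle it later" pattern concrete, here is a minimal sketch in plain Scala. It does not use Spark's RPC API: ToyTrackerEndpoint, TrackerMessage, and processPending are hypothetical names, and only the message name StartAllReceivers comes from the article. send merely enqueues the message, and the handler runs afterwards, one message at a time.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical message hierarchy; only the name StartAllReceivers comes from the article.
sealed trait TrackerMessage
final case class StartAllReceivers(receivers: Seq[String]) extends TrackerMessage

// Toy endpoint (not Spark's RpcEndpoint): send() only enqueues, messages are handled later.
class ToyTrackerEndpoint {
  private val inbox = new LinkedBlockingQueue[TrackerMessage]()
  val launched = ArrayBuffer.empty[String]

  // One-way send: the caller does not wait for the message to be handled.
  def send(msg: TrackerMessage): Unit = inbox.put(msg)

  // Message handler, analogous to an endpoint's receive method.
  private def receive(msg: TrackerMessage): Unit = msg match {
    case StartAllReceivers(receivers) =>
      // A real tracker would choose executors and launch the receivers there.
      launched ++= receivers
  }

  // Drain queued messages one at a time; a real endpoint runs such a loop on its own thread.
  def processPending(): Unit = {
    var msg = inbox.poll()
    while (msg != null) {
      receive(msg)
      msg = inbox.poll()
    }
  }
}

val endpoint = new ToyTrackerEndpoint
endpoint.send(StartAllReceivers(Seq("receiver-0", "receiver-1")))
val beforeProcessing = endpoint.launched.size // nothing has been handled yet
endpoint.processPending()
```

Handling messages sequentially means the handler never races with itself, which keeps the tracker's bookkeeping simple.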
Once a Receiver has started, it registers with ReceiverTracker to report that it started successfully:
The trackerEndpoint in this code is a reference to the ReceiverTrackerEndpoint held in ReceiverTracker's endpoint variable.
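The tracker side of registration can be sketched as a handler that records where each stream's receiver runs and replies with a Boolean. The message fields are loosely modeled on what a receiver reports about itself, but ToyRegistry and its single validity check are hypothetical; the real tracker performs more checks before accepting a registration.

```scala
import scala.collection.mutable

// Hypothetical registration message, loosely modeled on the fields a receiver reports.
final case class RegisterReceiver(streamId: Int, typ: String, host: String, executorId: String)

// Hypothetical tracker-side registry.
class ToyRegistry(validStreamIds: Set[Int]) {
  // streamId -> (host, executorId) of the receiver that registered for that stream
  val registered = mutable.HashMap.empty[Int, (String, String)]

  // The return value is the reply sent back to the receiver: true means "accepted".
  def handle(msg: RegisterReceiver): Boolean = synchronized {
    if (!validStreamIds.contains(msg.streamId)) {
      false // unknown stream: reject the registration
    } else {
      registered(msg.streamId) = (msg.host, msg.executorId)
      true
    }
  }
}

val registry = new ToyRegistry(validStreamIds = Set(0, 1))
val accepted = registry.handle(RegisterReceiver(0, "SocketReceiver", "host-a", "exec-1"))
val rejected = registry.handle(RegisterReceiver(7, "SocketReceiver", "host-b", "exec-2"))
```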
The Receiver continuously packages the data it receives into Blocks and pushes them to the BlockManager. After the Blocks have been handed to the BlockManager, ReceiverSupervisor sends their metadata to ReceiverTracker's endpoint:
Here ReceiverSupervisor sends an AddBlock(blockInfo) message to ReceiverTracker's endpoint:
When ReceiverTracker receives the AddBlock(blockInfo) message, it processes the message on a separate thread:
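Handing AddBlock off to a worker thread while still answering the sender can be sketched with a single-thread executor and a Scala Promise. ToyAddBlockHandler and BlockInfo are hypothetical stand-ins (the article's message carries the block's metadata), and the real tracker's thread-pool setup may differ from this.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical, trimmed-down block metadata.
final case class BlockInfo(streamId: Int, blockId: String, numRecords: Long)

class ToyAddBlockHandler {
  private val pool: ExecutorService = Executors.newSingleThreadExecutor()
  private var stored = Vector.empty[BlockInfo]

  // The actual bookkeeping; synchronized because it runs on the pool thread.
  private def addBlock(info: BlockInfo): Boolean = synchronized {
    stored :+= info
    true
  }

  def storedBlocks: Vector[BlockInfo] = synchronized(stored)

  // Returns immediately; the reply is completed later on the worker thread.
  def onAddBlock(info: BlockInfo): Future[Boolean] = {
    val reply = Promise[Boolean]()
    pool.execute(new Runnable {
      def run(): Unit = reply.success(addBlock(info))
    })
    reply.future
  }

  def shutdown(): Unit = {
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}

val handler = new ToyAddBlockHandler
val replyFuture = handler.onAddBlock(BlockInfo(0, "input-0-1", 100L))
val replied = Await.result(replyFuture, 5.seconds)
handler.shutdown()
```

Moving the work off the message-handling thread keeps a slow step (such as a durable write) from blocking the other messages the endpoint must process.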
To handle AddBlock(blockInfo), ReceiverTracker calls addBlock(receivedBlockInfo). Here is the source of addBlock:
addBlock simply delegates to the addBlock method of receivedBlockTracker, a ReceivedBlockTracker object created when ReceiverTracker is instantiated:
Next, let's look at ReceivedBlockTracker's addBlock method:
ReceivedBlockTracker's addBlock method appends the block metadata to a queue. These queues live in a HashMap named streamIdToUnallocatedBlockQueues, whose key is the streamId and whose value is the queue of blocks received for that stream.
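The queue-per-stream structure can be sketched as follows. The map name streamIdToUnallocatedBlockQueues and the method name addBlock follow the article; ToyReceivedBlockTracker, the trimmed-down ReceivedBlockInfo fields, and the inspection helper unallocated are assumptions for illustration. The sketch leaves out fault-tolerance steps such as write-ahead logging.

```scala
import scala.collection.mutable

// Trimmed-down block metadata (the real ReceivedBlockInfo carries more fields).
final case class ReceivedBlockInfo(streamId: Int, blockId: String, numRecords: Long)

class ToyReceivedBlockTracker {
  // streamId -> queue of blocks that have not yet been allocated to any batch
  private val streamIdToUnallocatedBlockQueues =
    mutable.HashMap.empty[Int, mutable.Queue[ReceivedBlockInfo]]

  // Create the stream's queue on first use.
  private def getReceivedBlockQueue(streamId: Int): mutable.Queue[ReceivedBlockInfo] =
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, mutable.Queue.empty[ReceivedBlockInfo])

  // Append the block's metadata to its stream's queue; returns true on success.
  def addBlock(info: ReceivedBlockInfo): Boolean = synchronized {
    getReceivedBlockQueue(info.streamId) += info
    true
  }

  // Snapshot of one stream's pending blocks (hypothetical helper for inspection).
  def unallocated(streamId: Int): List[ReceivedBlockInfo] = synchronized {
    streamIdToUnallocatedBlockQueues.get(streamId).map(_.toList).getOrElse(Nil)
  }
}

val tracker = new ToyReceivedBlockTracker
tracker.addBlock(ReceivedBlockInfo(0, "input-0-1", 10L))
tracker.addBlock(ReceivedBlockInfo(1, "input-1-1", 5L))
tracker.addBlock(ReceivedBlockInfo(0, "input-0-2", 20L))
```

Keeping one FIFO queue per stream preserves arrival order within each stream, so a later allocation step can take a stream's blocks in the order they were received.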
2.2 Allocating Blocks to a Batch
When a Spark Streaming application dynamically generates jobs, JobGenerator calls its generateJobs method, which allocates the Blocks received so far to the batch.
Here generateJobs calls the allocateBlocksToBatch method of jobScheduler's receiverTracker, which is the ReceiverTracker object. Here is that method's implementation:
As shown, it ultimately calls ReceivedBlockTracker's allocateBlocksToBatch method:
For each streamId, this method takes the queue of received blocks out of streamIdToUnallocatedBlockQueues and wraps the resulting streamId-to-blocks mapping into an AllocatedBlocks object. Finally, it stores that AllocatedBlocks object in timeToAllocatedBlocks under the batch's batchTime; timeToAllocatedBlocks is a HashMap:
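Allocation can be sketched the same way: drain every stream's pending queue, wrap the result in an AllocatedBlocks value, and file it under the batch time. This is a simplified model, not Spark's code: batch times are plain Long milliseconds instead of Spark's own time type, ToyBlockAllocator and getBlocksOfBatch are hypothetical, and the guard that ignores a batch time not newer than the last one allocated is an assumption added for safety.

```scala
import scala.collection.mutable

// Trimmed-down block metadata.
final case class ReceivedBlockInfo(streamId: Int, blockId: String)

// All blocks assigned to one batch, grouped by stream.
final case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]])

// Hypothetical allocator; batch times are plain Long milliseconds here.
class ToyBlockAllocator(streamIds: Seq[Int]) {
  private val streamIdToUnallocatedBlockQueues =
    mutable.HashMap.empty[Int, mutable.Queue[ReceivedBlockInfo]]
  private val timeToAllocatedBlocks = mutable.HashMap.empty[Long, AllocatedBlocks]
  private var lastAllocatedBatchTime: Option[Long] = None

  private def queueOf(streamId: Int): mutable.Queue[ReceivedBlockInfo] =
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, mutable.Queue.empty[ReceivedBlockInfo])

  def addBlock(info: ReceivedBlockInfo): Unit = synchronized {
    queueOf(info.streamId) += info
  }

  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    // Ignore a batch time that is not newer than the last allocated one.
    if (lastAllocatedBatchTime.forall(last => batchTime > last)) {
      val streamIdToBlocks: Map[Int, Seq[ReceivedBlockInfo]] = streamIds.map { id =>
        val queue = queueOf(id)
        val drained = queue.toList // every pending block of this stream...
        queue.clear()              // ...removed so that no later batch receives it again
        id -> drained
      }.toMap
      timeToAllocatedBlocks(batchTime) = AllocatedBlocks(streamIdToBlocks)
      lastAllocatedBatchTime = Some(batchTime)
    }
  }

  def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
    timeToAllocatedBlocks.get(batchTime)
      .map(_.streamIdToAllocatedBlocks)
      .getOrElse(Map.empty[Int, Seq[ReceivedBlockInfo]])
  }
}

val allocator = new ToyBlockAllocator(streamIds = Seq(0, 1))
allocator.addBlock(ReceivedBlockInfo(0, "input-0-1"))
allocator.addBlock(ReceivedBlockInfo(1, "input-1-1"))
allocator.allocateBlocksToBatch(1000L)
allocator.addBlock(ReceivedBlockInfo(0, "input-0-2"))
allocator.allocateBlocksToBatch(2000L)
allocator.allocateBlocksToBatch(2000L) // same batch time again: ignored
```

Because each queue is emptied as it is allocated, every block belongs to exactly one batch.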
At this point, Block allocation for the batch is complete.
2.3 Other Messages Handled by ReceiverTracker
The receive method of ReceiverTracker's ReceiverTrackerEndpoint defines how each message is handled:
(1) On a StartAllReceivers(receivers) message, ReceiverTracker assigns executors to the receivers and starts each receiver on its assigned executor.
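Spark delegates this assignment to a scheduling policy class (ReceiverSchedulingPolicy). The sketch below is a deliberately simplified, hypothetical placement rule, not Spark's actual algorithm: honor a receiver's preferred host when an executor exists there, and otherwise pick the executor currently hosting the fewest receivers. scheduleReceivers, ReceiverDesc, and ExecutorLoc are illustrative names.

```scala
import scala.collection.mutable

// Hypothetical executor location.
final case class ExecutorLoc(host: String, executorId: String)
// Hypothetical receiver description: its stream id plus an optional preferred host.
final case class ReceiverDesc(streamId: Int, preferredHost: Option[String])

// Simplified placement rule (not Spark's algorithm): prefer executors on the receiver's
// preferred host if any exist, then pick the candidate running the fewest receivers.
def scheduleReceivers(
    receivers: Seq[ReceiverDesc],
    executors: Seq[ExecutorLoc]): Map[Int, ExecutorLoc] = {
  require(executors.nonEmpty, "at least one executor is required")
  val load = mutable.HashMap.empty[ExecutorLoc, Int]
  executors.foreach(e => load(e) = 0)
  receivers.map { r =>
    val candidates = r.preferredHost
      .map(h => executors.filter(_.host == h))
      .filter(_.nonEmpty)
      .getOrElse(executors) // no preference, or no executor on that host
    val chosen = candidates.minBy(e => load(e))
    load(chosen) += 1
    r.streamId -> chosen
  }.toMap
}

val e1 = ExecutorLoc("host-a", "exec-1")
val e2 = ExecutorLoc("host-b", "exec-2")
val plan = scheduleReceivers(
  Seq(ReceiverDesc(0, Some("host-b")), ReceiverDesc(1, None), ReceiverDesc(2, Some("host-z"))),
  Seq(e1, e2))
```

Balancing by receiver count matters because each receiver occupies a core on its executor for as long as it runs.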
(2) When ReceiverTracker detects that a receiver has exited, it sends a RestartReceiver(receiver) message to ReceiverTrackerEndpoint. On this message, the endpoint assigns an executor to the receiver again and restarts it there: if the receiver's previous executor is still running normally, the receiver is restarted on it; otherwise a new executor is scheduled.
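The restart rule in (2) boils down to a small decision, sketched here as a pure function. restartLocations and ExecutorLoc are hypothetical names, and the reschedule callback stands in for whatever fresh scheduling decision the tracker would make.

```scala
// Hypothetical executor location.
final case class ExecutorLoc(host: String, executorId: String)

// Hypothetical restart decision: reuse the receiver's previously scheduled executors
// that are still alive; if none survive, fall back to a fresh scheduling decision.
def restartLocations(
    previouslyScheduled: Seq[ExecutorLoc],
    aliveExecutors: Set[ExecutorLoc],
    reschedule: () => Seq[ExecutorLoc]): Seq[ExecutorLoc] = {
  val stillAlive = previouslyScheduled.filter(e => aliveExecutors.contains(e))
  if (stillAlive.nonEmpty) stillAlive else reschedule()
}

val a = ExecutorLoc("host-1", "exec-1")
val b = ExecutorLoc("host-2", "exec-2")
val c = ExecutorLoc("host-3", "exec-3")
// exec-1 is still alive, so the receiver goes back there
val sameExecutor = restartLocations(Seq(a), Set(a, b), () => Seq(c))
// exec-1 is gone, so a new location is chosen
val newExecutor = restartLocations(Seq(a), Set(b, c), () => Seq(c))
```

Preferring the old executor keeps the receiver close to where it was placed originally, and the fallback keeps a dead executor from blocking the restart.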
(3) When a Spark Streaming job completes, JobScheduler calls handleJobCompletion, which eventually leads to cleanupOldBlocksAndBatches sending a CleanupOldBlocks message to endpoint:
On this message, the endpoint forwards it to the Receivers, which then clean up their old Blocks.
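Forwarding a cleanup message to every receiver can be sketched like this. CleanupOldBlocks mirrors the article's message name; ToyReceiverStore, ToyTrackerRouter, and keying blocks by a millisecond timestamp are assumptions, and a real receiver would delete stored block data rather than map entries.

```scala
import scala.collection.mutable

// Message name mirrors the article; the field is a hypothetical millisecond threshold.
final case class CleanupOldBlocks(threshTimeMs: Long)

// Toy receiver-side store: block ids keyed by the time they were generated.
class ToyReceiverStore {
  val blocks = mutable.TreeMap.empty[Long, String]

  // Drop every block generated strictly before the threshold.
  def receive(msg: CleanupOldBlocks): Unit = {
    val expired = blocks.keys.filter(_ < msg.threshTimeMs).toList
    expired.foreach(k => blocks.remove(k))
  }
}

// Tracker side: forward the same message to every registered receiver.
class ToyTrackerRouter(receivers: Seq[ToyReceiverStore]) {
  def receive(msg: CleanupOldBlocks): Unit = receivers.foreach(_.receive(msg))
}

val r1 = new ToyReceiverStore
r1.blocks ++= Seq(100L -> "b1", 200L -> "b2")
val r2 = new ToyReceiverStore
r2.blocks ++= Seq(50L -> "b3", 300L -> "b4")
new ToyTrackerRouter(Seq(r1, r2)).receive(CleanupOldBlocks(200L))
```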
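(4) The UpdateReceiverRateLimit message
On an UpdateReceiverRateLimit message, the endpoint forwards the new rate to the corresponding receiver. When the receiver gets it, it calls BlockGenerator's updateRate method to update the rate limit on block generation.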
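A rate update can be sketched as a limiter whose rate is replaced in place and optionally capped by a user-configured maximum (in Spark that cap comes from spark.streaming.receiver.maxRate). ToyRateLimiter and recordsPerInterval are hypothetical; Spark's own limiter wraps Guava's RateLimiter rather than storing a bare number.

```scala
// Hypothetical stand-in for a receiver-side rate limiter.
// maxRate models a user-configured upper bound; a value <= 0 means "no cap".
class ToyRateLimiter(maxRate: Long, initialRate: Long) {
  @volatile private var currentRate: Long = initialRate

  def rate: Long = currentRate

  // Applied when a rate-update message reaches the receiver.
  def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      currentRate = if (maxRate > 0) math.min(newRate, maxRate) else newRate
    } // non-positive rates are ignored

  // Illustrative only: how many records the current rate allows in one interval.
  def recordsPerInterval(intervalMs: Long): Long = currentRate * intervalMs / 1000
}

val limiter = new ToyRateLimiter(maxRate = 1000L, initialRate = 500L)
limiter.updateRate(800L)
val afterFirstUpdate = limiter.rate
limiter.updateRate(5000L) // capped at maxRate
limiter.updateRate(-1L)   // ignored
```

Capping the update keeps a dynamically computed rate from exceeding the limit the user configured.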