揭開Spark Streaming神秘面紗② - ReceiverTracker 與數(shù)據(jù)導(dǎo)入

Spark Streaming 在數(shù)據(jù)接收與導(dǎo)入方面需要滿足有以下三個特點:

  1. 兼容眾多輸入源,包括HDFS, Flume, Kafka, Twitter and ZeroMQ厉亏。還可以自定義數(shù)據(jù)源
  2. 要能為每個 batch 的 RDD 提供相應(yīng)的輸入數(shù)據(jù)
  3. 為適應(yīng) 7*24h 不間斷運行董习,要有接收數(shù)據(jù)掛掉的容錯機制

有容乃大,兼容眾多數(shù)據(jù)源

在文章DStreamGraph 與 DStream DAG中爱只,我們提到

InputDStream是所有 input streams(數(shù)據(jù)輸入流) 的虛基類皿淋。該類提供了 start() 和 stop()方法供 streaming 系統(tǒng)來開始和停止接收數(shù)據(jù)。那些只需要在 driver 端接收數(shù)據(jù)并轉(zhuǎn)成 RDD 的 input streams 可以直接繼承 InputDStream恬试,例如 FileInputDStream是 InputDStream 的子類窝趣,它監(jiān)控一個 HDFS 目錄并將新文件轉(zhuǎn)成RDDs。而那些需要在 workers 上運行receiver 來接收數(shù)據(jù)的 Input DStream训柴,需要繼承 ReceiverInputDStream哑舒,比如 KafkaReceiver

只需在 driver 端接收數(shù)據(jù)的 input stream 一般比較簡單且在生產(chǎn)環(huán)境中使用的比較少,本文不作分析幻馁,只分析繼承了 ReceiverInputDStream 的 input stream 是如何導(dǎo)入數(shù)據(jù)的洗鸵。

ReceiverInputDStream有一個def getReceiver(): Receiver[T]方法越锈,每個繼承了ReceiverInputDStream的 input stream 都必須實現(xiàn)這個方法。該方法用來獲取將要分發(fā)到各個 worker 節(jié)點上用來接收數(shù)據(jù)的 receiver(接收器)膘滨。不同的 ReceiverInputDStream 子類都有它們對應(yīng)的不同的 receiver瞪浸,如KafkaInputDStream對應(yīng)KafkaReceiver,F(xiàn)lumeInputDStream對應(yīng)FlumeReceiver吏祸,TwitterInputDStream對應(yīng)TwitterReceiver对蒲,如果你要實現(xiàn)自己的數(shù)據(jù)源,也需要定義相應(yīng)的 receiver贡翘。

繼承 ReceiverInputDStream 并定義相應(yīng)的 receiver蹈矮,就是 Spark Streaming 能兼容眾多數(shù)據(jù)源的原因。

為每個 batch 的 RDD 提供輸入數(shù)據(jù)

在 StreamingContext 中鸣驱,有一個重要的組件叫做 ReceiverTracker泛鸟,它是 Spark Streaming 作業(yè)調(diào)度器 JobScheduler 的成員,負(fù)責(zé)啟動踊东、管理各個 receiver 及管理各個 receiver 接收到的數(shù)據(jù)北滥。

確定 receiver 要分發(fā)到哪些 executors 上執(zhí)行

創(chuàng)建 ReceiverTracker 實例

我們來看 StreamingContext#start() 方法部分調(diào)用實現(xiàn),如下:

可以看到闸翅,StreamingContext#start() 會調(diào)用 JobScheduler#start() 方法再芋,在 JobScheduler#start() 中,會創(chuàng)建一個新的 ReceiverTracker 實例 receiverTracker坚冀,并調(diào)用其 start() 方法济赎。

ReceiverTracker#start()

繼續(xù)跟進(jìn) ReceiverTracker#start(),如下圖记某,它主要做了兩件事:

  1. 初始化一個 endpoint: ReceiverTrackerEndpoint司训,用來接收和處理來自 ReceiverTracker 和 receivers 發(fā)送的消息
  2. 調(diào)用 launchReceivers 來自將各個 receivers 分發(fā)到 executors 上

ReceiverTracker#launchReceivers()

繼續(xù)跟進(jìn) launchReceivers,它也主要干了兩件事:

  1. 獲取 DStreamGraph.inputStreams 中繼承了 ReceiverInputDStream 的 input streams 的 receivers液南。也就是數(shù)據(jù)接收器
  2. 給消息接收處理器 endpoint 發(fā)送 StartAllReceivers(receivers)消息壳猜。直接返回,不等待消息被處理

處理StartAllReceivers消息

endpoint 在接收到消息后滑凉,會先判斷消息類型统扳,對不同的消息做不同處理。對于StartAllReceivers消息譬涡,處理流程如下:

  1. 計算每個 receiver 要分發(fā)的目的 executors闪幽。遵循兩條原則:
    • 將 receiver 分布的盡量均勻
    • 如果 receiver 的preferredLocation本身不均勻,以preferredLocation為準(zhǔn)
  2. 遍歷每個 receiver涡匀,根據(jù)第1步中得到的目的 executors 調(diào)用 startReceiver 方法

到這里,已經(jīng)確定了每個 receiver 要分發(fā)到哪些 executors 上

啟動 receivers

接上溉知,通過 ReceiverTracker#startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]) 來啟動 receivers陨瘩,我們來看具體流程:

如上流程圖所述腕够,分發(fā)和啟動 receiver 的方式不可謂不精彩。其中舌劳,startReceiverFunc 函數(shù)主要實現(xiàn)如下:

val supervisor = new ReceiverSupervisorImpl(
  receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()

supervisor.start() 中會調(diào)用 receiver#onStart 后立即返回帚湘。receiver#onStart 一般自行新建線程或線程池來接收數(shù)據(jù),比如在 KafkaReceiver 中甚淡,就新建了線程池大诸,在線程池中接收 topics 的數(shù)據(jù)。
supervisor.start() 返回后贯卦,由 supervisor.awaitTermination() 阻塞住線程资柔,以讓這個 task 一直不退出,從而可以源源不斷接收數(shù)據(jù)撵割。

數(shù)據(jù)流轉(zhuǎn)

上圖為 receiver 接收到的數(shù)據(jù)的流轉(zhuǎn)過程贿堰,讓我們來逐一分析

Step1: Receiver -> ReceiverSupervisor

這一步中,Receiver 將接收到的數(shù)據(jù)源源不斷地傳給 ReceiverSupervisor啡彬。Receiver 調(diào)用其 store(...) 方法羹与,store 方法中繼續(xù)調(diào)用 supervisor.pushSingle 或 supervisor.pushArrayBuffer 等方法來傳遞數(shù)據(jù)。Receiver#store 有多重形式庶灿, ReceiverSupervisor 也有 pushSingle纵搁、pushArrayBuffer、pushIterator往踢、pushBytes 方法與不同的 store 對應(yīng)诡渴。

  • pushSingle: 對應(yīng)單條小數(shù)據(jù)
  • pushArrayBuffer: 對應(yīng)數(shù)組形式的數(shù)據(jù)
  • pushIterator: 對應(yīng) iterator 形式數(shù)據(jù)
  • pushBytes: 對應(yīng) ByteBuffer 形式的塊數(shù)據(jù)

對于細(xì)小的數(shù)據(jù),存儲時需要 BlockGenerator 聚集多條數(shù)據(jù)成一塊菲语,然后再成塊存儲妄辩;反之就不用聚集,直接成塊存儲山上。當(dāng)然眼耀,存儲操作并不在 Step1 中執(zhí)行,只為說明之后不同的操作邏輯佩憾。

Step2.1: ReceiverSupervisor -> BlockManager -> disk/memory

在這一步中哮伟,主要將從 receiver 收到的數(shù)據(jù)以 block(數(shù)據(jù)塊)的形式存儲

存儲 block 的是receivedBlockHandler: ReceivedBlockHandler,根據(jù)參數(shù)spark.streaming.receiver.writeAheadLog.enable配置的不同妄帘,默認(rèn)為 false楞黄,receivedBlockHandler對象對應(yīng)的類也不同,如下:

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    //< 先寫 WAL抡驼,再存儲到 executor 的內(nèi)存或硬盤
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    //< 直接存到 executor 的內(nèi)存或硬盤
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}

啟動 WAL 的好處就是在application 掛掉之后鬼廓,可以恢復(fù)數(shù)據(jù)。

//< 調(diào)用 receivedBlockHandler.storeBlock 方法存儲 block致盟,并得到一個 blockStoreResult
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
//< 使用blockStoreResult初始化一個ReceivedBlockInfo實例
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
//< 發(fā)送消息通知 ReceiverTracker 新增并存儲了 block
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))

不管是 WriteAheadLogBasedBlockHandler 還是 BlockManagerBasedBlockHandler 最終都是通過 BlockManager 將 block 數(shù)據(jù)存儲 execuor 內(nèi)存或磁盤或還有 WAL 方式存入碎税。

這里需要說明的是 streamId尤慰,每個 InputDStream 都有它自己唯一的 id,即 streamId雷蹂,blockInfo包含 streamId 是為了區(qū)分block 是哪個 InputDStream 的數(shù)據(jù)伟端。之后為 batch 分配 blocks 時,需要知道每個 InputDStream 都有哪些未分配的 blocks匪煌。

Step2.2: ReceiverSupervisor -> ReceiverTracker

將 block 存儲之后责蝠,獲得 block 描述信息 blockInfo: ReceivedBlockInfo,這里面包含:streamId萎庭、數(shù)據(jù)位置霜医、數(shù)據(jù)條數(shù)、數(shù)據(jù) size 等信息擎椰。

之后支子,封裝以 block 作為參數(shù)的 AddBlock(blockInfo) 消息并發(fā)送給 ReceiverTracker 以通知其有新增 block 數(shù)據(jù)塊。

Step3: ReceiverTracker -> ReceivedBlockTracker

ReceiverTracker 收到 ReceiverSupervisor 發(fā)來的 AddBlock(blockInfo) 消息后达舒,直接調(diào)用以下代碼將 block 信息傳給 ReceivedBlockTracker:

  private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }

receivedBlockTracker.addBlock中值朋,如果啟用了 WAL,會將新增的 block 信息以 WAL 方式保存巩搏。
無論 WAL 是否啟用昨登,都會將新增的 block 信息保存到 streamIdToUnallocatedBlockQueues: mutable.HashMap[Int, ReceivedBlockQueue]中,該變量 key 為 InputDStream 的唯一 id贯底,value 為已存儲未分配的 block 信息丰辣。之后為 batch 分配blocks,會訪問該結(jié)構(gòu)來獲取每個 InputDStream 對應(yīng)的未消費的 blocks禽捆。

總結(jié)

至此笙什,本文描述了:

  • streaming application 如何兼容眾多數(shù)據(jù)源
  • receivers 是如何分發(fā)并啟動的
  • receiver 接收到的數(shù)據(jù)是如何流轉(zhuǎn)的
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市胚想,隨后出現(xiàn)的幾起案子琐凭,更是在濱河造成了極大的恐慌,老刑警劉巖浊服,帶你破解...
    沈念sama閱讀 216,744評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件统屈,死亡現(xiàn)場離奇詭異,居然都是意外死亡牙躺,警方通過查閱死者的電腦和手機愁憔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來孽拷,“玉大人吨掌,你說我怎么就攤上這事。” “怎么了思犁?”我有些...
    開封第一講書人閱讀 163,105評論 0 353
  • 文/不壞的土叔 我叫張陵代虾,是天一觀的道長进肯。 經(jīng)常有香客問我激蹲,道長,這世上最難降的妖魔是什么江掩? 我笑而不...
    開封第一講書人閱讀 58,242評論 1 292
  • 正文 為了忘掉前任学辱,我火速辦了婚禮,結(jié)果婚禮上环形,老公的妹妹穿的比我還像新娘策泣。我一直安慰自己,他們只是感情好抬吟,可當(dāng)我...
    茶點故事閱讀 67,269評論 6 389
  • 文/花漫 我一把揭開白布萨咕。 她就那樣靜靜地躺著,像睡著了一般火本。 火紅的嫁衣襯著肌膚如雪危队。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,215評論 1 299
  • 那天钙畔,我揣著相機與錄音茫陆,去河邊找鬼。 笑死擎析,一個胖子當(dāng)著我的面吹牛簿盅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播揍魂,決...
    沈念sama閱讀 40,096評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼桨醋,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了现斋?” 一聲冷哼從身側(cè)響起喜最,我...
    開封第一講書人閱讀 38,939評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎步责,沒想到半個月后返顺,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,354評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡蔓肯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,573評論 2 333
  • 正文 我和宋清朗相戀三年遂鹊,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蔗包。...
    茶點故事閱讀 39,745評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡秉扑,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情舟陆,我是刑警寧澤误澳,帶...
    沈念sama閱讀 35,448評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站秦躯,受9級特大地震影響忆谓,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜踱承,卻給世界環(huán)境...
    茶點故事閱讀 41,048評論 3 327
  • 文/蒙蒙 一倡缠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧茎活,春花似錦昙沦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至懒熙,卻和暖如春丘损,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背煌珊。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評論 1 269
  • 我被黑心中介騙來泰國打工号俐, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人定庵。 一個月前我還...
    沈念sama閱讀 47,776評論 2 369
  • 正文 我出身青樓吏饿,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蔬浙。 傳聞我的和親對象是個殘疾皇子猪落,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,652評論 2 354

推薦閱讀更多精彩內(nèi)容