Spark Streaming 在數(shù)據(jù)接收與導(dǎo)入方面需要滿足有以下三個特點:
- 兼容眾多輸入源,包括HDFS, Flume, Kafka, Twitter and ZeroMQ厉亏。還可以自定義數(shù)據(jù)源
- 要能為每個 batch 的 RDD 提供相應(yīng)的輸入數(shù)據(jù)
- 為適應(yīng) 7*24h 不間斷運行董习,要有接收數(shù)據(jù)掛掉的容錯機制
有容乃大,兼容眾多數(shù)據(jù)源
在文章DStreamGraph 與 DStream DAG中爱只,我們提到
InputDStream是所有 input streams(數(shù)據(jù)輸入流) 的虛基類皿淋。該類提供了 start() 和 stop()方法供 streaming 系統(tǒng)來開始和停止接收數(shù)據(jù)。那些只需要在 driver 端接收數(shù)據(jù)并轉(zhuǎn)成 RDD 的 input streams 可以直接繼承 InputDStream恬试,例如 FileInputDStream是 InputDStream 的子類窝趣,它監(jiān)控一個 HDFS 目錄并將新文件轉(zhuǎn)成RDDs。而那些需要在 workers 上運行receiver 來接收數(shù)據(jù)的 Input DStream训柴,需要繼承 ReceiverInputDStream哑舒,比如 KafkaReceiver
只需在 driver 端接收數(shù)據(jù)的 input stream 一般比較簡單且在生產(chǎn)環(huán)境中使用的比較少,本文不作分析幻馁,只分析繼承了 ReceiverInputDStream 的 input stream 是如何導(dǎo)入數(shù)據(jù)的洗鸵。
ReceiverInputDStream有一個def getReceiver(): Receiver[T]
方法越锈,每個繼承了ReceiverInputDStream的 input stream 都必須實現(xiàn)這個方法。該方法用來獲取將要分發(fā)到各個 worker 節(jié)點上用來接收數(shù)據(jù)的 receiver(接收器)膘滨。不同的 ReceiverInputDStream 子類都有它們對應(yīng)的不同的 receiver瞪浸,如KafkaInputDStream對應(yīng)KafkaReceiver,F(xiàn)lumeInputDStream對應(yīng)FlumeReceiver吏祸,TwitterInputDStream對應(yīng)TwitterReceiver对蒲,如果你要實現(xiàn)自己的數(shù)據(jù)源,也需要定義相應(yīng)的 receiver贡翘。
繼承 ReceiverInputDStream 并定義相應(yīng)的 receiver蹈矮,就是 Spark Streaming 能兼容眾多數(shù)據(jù)源的原因。
為每個 batch 的 RDD 提供輸入數(shù)據(jù)
在 StreamingContext 中鸣驱,有一個重要的組件叫做 ReceiverTracker泛鸟,它是 Spark Streaming 作業(yè)調(diào)度器 JobScheduler 的成員,負(fù)責(zé)啟動踊东、管理各個 receiver 及管理各個 receiver 接收到的數(shù)據(jù)北滥。
確定 receiver 要分發(fā)到哪些 executors 上執(zhí)行
創(chuàng)建 ReceiverTracker 實例
我們來看 StreamingContext#start()
方法部分調(diào)用實現(xiàn),如下:
可以看到闸翅,StreamingContext#start()
會調(diào)用 JobScheduler#start()
方法再芋,在 JobScheduler#start()
中,會創(chuàng)建一個新的 ReceiverTracker 實例 receiverTracker坚冀,并調(diào)用其 start() 方法济赎。
ReceiverTracker#start()
繼續(xù)跟進(jìn) ReceiverTracker#start()
,如下圖记某,它主要做了兩件事:
- 初始化一個 endpoint: ReceiverTrackerEndpoint司训,用來接收和處理來自 ReceiverTracker 和 receivers 發(fā)送的消息
- 調(diào)用 launchReceivers 來自將各個 receivers 分發(fā)到 executors 上
ReceiverTracker#launchReceivers()
繼續(xù)跟進(jìn) launchReceivers,它也主要干了兩件事:
- 獲取 DStreamGraph.inputStreams 中繼承了 ReceiverInputDStream 的 input streams 的 receivers液南。也就是數(shù)據(jù)接收器
- 給消息接收處理器 endpoint 發(fā)送 StartAllReceivers(receivers)消息壳猜。直接返回,不等待消息被處理
處理StartAllReceivers消息
endpoint 在接收到消息后滑凉,會先判斷消息類型统扳,對不同的消息做不同處理。對于StartAllReceivers消息譬涡,處理流程如下:
- 計算每個 receiver 要分發(fā)的目的 executors闪幽。遵循兩條原則:
- 將 receiver 分布的盡量均勻
- 如果 receiver 的preferredLocation本身不均勻,以preferredLocation為準(zhǔn)
- 遍歷每個 receiver涡匀,根據(jù)第1步中得到的目的 executors 調(diào)用 startReceiver 方法
到這里,已經(jīng)確定了每個 receiver 要分發(fā)到哪些 executors 上
啟動 receivers
接上溉知,通過 ReceiverTracker#startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String])
來啟動 receivers陨瘩,我們來看具體流程:
如上流程圖所述腕够,分發(fā)和啟動 receiver 的方式不可謂不精彩。其中舌劳,startReceiverFunc 函數(shù)主要實現(xiàn)如下:
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
supervisor.start() 中會調(diào)用 receiver#onStart 后立即返回帚湘。receiver#onStart 一般自行新建線程或線程池來接收數(shù)據(jù),比如在 KafkaReceiver 中甚淡,就新建了線程池大诸,在線程池中接收 topics 的數(shù)據(jù)。
supervisor.start() 返回后贯卦,由 supervisor.awaitTermination() 阻塞住線程资柔,以讓這個 task 一直不退出,從而可以源源不斷接收數(shù)據(jù)撵割。
數(shù)據(jù)流轉(zhuǎn)
上圖為 receiver 接收到的數(shù)據(jù)的流轉(zhuǎn)過程贿堰,讓我們來逐一分析
Step1: Receiver -> ReceiverSupervisor
這一步中,Receiver 將接收到的數(shù)據(jù)源源不斷地傳給 ReceiverSupervisor啡彬。Receiver 調(diào)用其 store(...) 方法羹与,store 方法中繼續(xù)調(diào)用 supervisor.pushSingle 或 supervisor.pushArrayBuffer 等方法來傳遞數(shù)據(jù)。Receiver#store 有多重形式庶灿, ReceiverSupervisor 也有 pushSingle纵搁、pushArrayBuffer、pushIterator往踢、pushBytes 方法與不同的 store 對應(yīng)诡渴。
- pushSingle: 對應(yīng)單條小數(shù)據(jù)
- pushArrayBuffer: 對應(yīng)數(shù)組形式的數(shù)據(jù)
- pushIterator: 對應(yīng) iterator 形式數(shù)據(jù)
- pushBytes: 對應(yīng) ByteBuffer 形式的塊數(shù)據(jù)
對于細(xì)小的數(shù)據(jù),存儲時需要 BlockGenerator 聚集多條數(shù)據(jù)成一塊菲语,然后再成塊存儲妄辩;反之就不用聚集,直接成塊存儲山上。當(dāng)然眼耀,存儲操作并不在 Step1 中執(zhí)行,只為說明之后不同的操作邏輯佩憾。
Step2.1: ReceiverSupervisor -> BlockManager -> disk/memory
在這一步中哮伟,主要將從 receiver 收到的數(shù)據(jù)以 block(數(shù)據(jù)塊)的形式存儲
存儲 block 的是receivedBlockHandler: ReceivedBlockHandler
,根據(jù)參數(shù)spark.streaming.receiver.writeAheadLog.enable
配置的不同妄帘,默認(rèn)為 false楞黄,receivedBlockHandler對象對應(yīng)的類也不同,如下:
private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
//< 先寫 WAL抡驼,再存儲到 executor 的內(nèi)存或硬盤
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
//< 直接存到 executor 的內(nèi)存或硬盤
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
啟動 WAL 的好處就是在application 掛掉之后鬼廓,可以恢復(fù)數(shù)據(jù)。
//< 調(diào)用 receivedBlockHandler.storeBlock 方法存儲 block致盟,并得到一個 blockStoreResult
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
//< 使用blockStoreResult初始化一個ReceivedBlockInfo實例
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
//< 發(fā)送消息通知 ReceiverTracker 新增并存儲了 block
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
不管是 WriteAheadLogBasedBlockHandler 還是 BlockManagerBasedBlockHandler 最終都是通過 BlockManager 將 block 數(shù)據(jù)存儲 execuor 內(nèi)存或磁盤或還有 WAL 方式存入碎税。
這里需要說明的是 streamId尤慰,每個 InputDStream 都有它自己唯一的 id,即 streamId雷蹂,blockInfo包含 streamId 是為了區(qū)分block 是哪個 InputDStream 的數(shù)據(jù)伟端。之后為 batch 分配 blocks 時,需要知道每個 InputDStream 都有哪些未分配的 blocks匪煌。
Step2.2: ReceiverSupervisor -> ReceiverTracker
將 block 存儲之后责蝠,獲得 block 描述信息 blockInfo: ReceivedBlockInfo
,這里面包含:streamId萎庭、數(shù)據(jù)位置霜医、數(shù)據(jù)條數(shù)、數(shù)據(jù) size 等信息擎椰。
之后支子,封裝以 block 作為參數(shù)的 AddBlock(blockInfo)
消息并發(fā)送給 ReceiverTracker 以通知其有新增 block 數(shù)據(jù)塊。
Step3: ReceiverTracker -> ReceivedBlockTracker
ReceiverTracker 收到 ReceiverSupervisor 發(fā)來的 AddBlock(blockInfo)
消息后达舒,直接調(diào)用以下代碼將 block 信息傳給 ReceivedBlockTracker:
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
receivedBlockTracker.addBlock(receivedBlockInfo)
}
receivedBlockTracker.addBlock
中值朋,如果啟用了 WAL,會將新增的 block 信息以 WAL 方式保存巩搏。
無論 WAL 是否啟用昨登,都會將新增的 block 信息保存到 streamIdToUnallocatedBlockQueues: mutable.HashMap[Int, ReceivedBlockQueue]
中,該變量 key 為 InputDStream 的唯一 id贯底,value 為已存儲未分配的 block 信息丰辣。之后為 batch 分配blocks,會訪問該結(jié)構(gòu)來獲取每個 InputDStream 對應(yīng)的未消費的 blocks禽捆。
總結(jié)
至此笙什,本文描述了:
- streaming application 如何兼容眾多數(shù)據(jù)源
- receivers 是如何分發(fā)并啟動的
- receiver 接收到的數(shù)據(jù)是如何流轉(zhuǎn)的