前段時(shí)間學(xué)習(xí)了spark streaming采用kafka作為數(shù)據(jù)源時(shí)张足,數(shù)據(jù)接收并行度這一部分的源代碼。本文主要將學(xué)習(xí)的體會記錄一下疲恢,有理解不對的地方請多多指教描融。
Streaming從kafka接收數(shù)據(jù)有Receiver和direct兩種方式铡溪。下面我們看一下這兩種方式的源碼漂辐。
Direct approach
這種方式是使用kafka的低階API從kafka消費(fèi)數(shù)據(jù)。一般如果需要自行維護(hù)partition的offset棕硫,實(shí)現(xiàn)自定義checkpoint文件髓涯,或者exactlyOnce場景下就會用到這一方式。
首先需要看一下DirectKafkaInputDStream這個類饲帅,他是我們調(diào)用KafkaUtil.
createDirectStream方法生成的用來從kafka端接收數(shù)據(jù)的复凳。
compute方法定義了InputDStream是如何根據(jù)指定的batchTime生成RDD的瘤泪。
latestLeaderOffsets方法是獲取當(dāng)前InputDStream所包含的topic下所有的partition的最新offset的。Clamp方法是根據(jù)spark.streaming.kafka.maxRatePerPartition和backpressure這兩個參數(shù)來設(shè)置當(dāng)前block可以消費(fèi)到的offset的(即untilOffset)育八。這個數(shù)值需要跟partition最新的offset取最小值对途。
maxMessagesPerPartition方法實(shí)現(xiàn)了獲取某個partition能消費(fèi)到的message的數(shù)量。該方法首先會計(jì)算一個每分區(qū)每秒鐘消費(fèi)的消息數(shù)上線effectiveRateLimitPerPartition髓棋,他的value如下圖紅框中实檀,是在spark.streaming.kafka.maxRatePerPartition和batckpressure中取一個最小值,如果只配置了一個則以配置的為準(zhǔn)按声,都沒配置則返回None膳犹,返回None時(shí)直接取leader最新的offset。然后再根據(jù)batchTime計(jì)算出某partition在batchTime內(nèi)能消費(fèi)的消息數(shù)上限签则。
其中backpressure是spark1.5版本之后增加的參數(shù)须床,能夠根據(jù)上一個batch的執(zhí)行效率,動態(tài)估算出當(dāng)前batch能處理的最大消息數(shù)渐裂。這個參數(shù)在每個batch計(jì)算完成后豺旬,會通過StreamingListenerBus監(jiān)聽StreamingListenerBatchCompleted事件,然后由org.apache.spark.streaming.scheduler.
onBatchCompleted方法來重新計(jì)算柒凉,如下:
Backpressure的具體實(shí)現(xiàn)思路先不展開了(計(jì)算公式在PIDRateEstimator.compute方法中)族阅。我們回到DirectKafkaInputDStream.compute方法。當(dāng)計(jì)算完每個partition的untilOffset之后膝捞,會根據(jù)當(dāng)前InputDStream所消費(fèi)的topic的每個partition的currentOffset和untilOffset構(gòu)建KafkaRDD坦刀。
在kafkaRDD中我們可以看到他重寫的一些RDD的方法,
在getPartitions方法中可以看到蔬咬,KafkaRDD的partition個數(shù)就是topic的partition個數(shù)之和鲤遥。
在getPreferredLocations方法中可以看到,partition的首選location就是該topic的某個partition的leader所在的host林艘。這是很合理的渴频,因?yàn)閘eader上的數(shù)據(jù)正常情況下是最新的而且是最準(zhǔn)確的。而follower的數(shù)據(jù)往往還需要從leader上做同步北启,并且一旦同步出現(xiàn)較大的落后,還會從in-sync列表中移除拔第。而且kafka的讀寫都是通過leader進(jìn)行的咕村。
關(guān)于方法中part.host可以一路反推回去,會跟蹤到KafkaCluster.getLeaderOffsets方法中調(diào)用的findLeaders方法蚊俺,即part.host就是leader的host懈涛。
compute方法是RDD用來構(gòu)建一個partition的數(shù)據(jù)的。
我們看一下用來從partition中獲取數(shù)據(jù)的KafkaRDDIterator類泳猬。在類體中會發(fā)現(xiàn)
val consumer = connectLeader
的代碼批钠,這說明一點(diǎn)宇植,spark streaming的kafka低階API是每一個partition起一個consumer來消費(fèi)數(shù)據(jù)的。
然后我們看一下fetchBatch方法埋心。該方法中是我們很熟悉的一段根據(jù)起止offset消費(fèi)kafka某topic某partition數(shù)據(jù)的代碼指郁。
通過kafkaRDD這個類的閱讀我們可以看出,接收數(shù)據(jù)是以partition的leader為維度做分布式的拷呆,這樣做可以保證這個host上是有我要消費(fèi)的數(shù)據(jù)的闲坎,能夠?qū)崿F(xiàn)數(shù)據(jù)本地化。
Receiver
這種方式是采用kafka的高階API來消費(fèi)數(shù)據(jù)的茬斧。
建立InputDStream的代碼如下:
從KafkaUtils.createStream開始跟到KafkaInputDStream類腰懂,
getReceiver()方法中的變量useReliableReceiver是判斷是否配置了WAL機(jī)制。如下:
我們看一下KafkaReceiver的實(shí)現(xiàn)代碼:
在他的onStart()方法中可以看到他是創(chuàng)建了一個線程池executorPool來消費(fèi)消息的项秉。而這個線程池的線程數(shù)绣溜,就是我們在KafkaUtils.createStream時(shí)的入?yún)nlineStaffTopicMap的values的和。也就是說入?yún)nlineStaffTopicMap的value指的是某個topic在這個InputDStream中會有多少個consumer去消費(fèi)數(shù)據(jù)娄蔼。
再看一下MessageHandler中消費(fèi)及保存數(shù)據(jù)的邏輯:
這段代碼中streamIterator是被我們所喜聞樂見的使用高階API從kafka消費(fèi)數(shù)據(jù)的代碼怖喻。在代碼中消費(fèi)完數(shù)據(jù)之后,調(diào)用了store方法將message進(jìn)行了保存贷屎。
Store方法最終會將這條消息addData到BlockGenerator類中的currentBuffer:
ArrayBuffer中罢防。
該類中的updateCurrentBuffer方法值得我們關(guān)注一下,他是用來將已經(jīng)收集到的消息封裝成一個Block的唉侄。
那么這個方法什么情況下會被調(diào)用呢咒吐,需要看一下blockIntervalTimer的實(shí)現(xiàn)類RecurringTimer。
RecurringTimer是一個定時(shí)重復(fù)執(zhí)行高階函數(shù)callback的執(zhí)行器属划,他是通過Thread反復(fù)執(zhí)行l(wèi)oop方法實(shí)現(xiàn)的恬叹,loop方法中只要定時(shí)器不被終止,就會反復(fù)調(diào)用triggerActionForNextInterval方法同眯,而triggerActionForNextInterval會在特定的時(shí)刻(即nextTime)執(zhí)行callback函數(shù)(即入?yún)pdateCurrentBuffer函數(shù))绽昼。執(zhí)行完成之后會在nextTime上增加period作為下一次執(zhí)行的時(shí)刻。
而period方法是什么呢须蜗,他就是我們在構(gòu)建blockIntervalTimer時(shí)的入?yún)lockIntervalMs硅确,也就是streaming性能的一個優(yōu)化點(diǎn)spark.streaming.blockInterval。也就是說明肮,這段代碼的邏輯是每間隔blockInterval將由consumer消費(fèi)到的數(shù)據(jù)切分成一個block菱农。由此我們可以看到,這個參數(shù)是用來將Batch中所接受到的數(shù)據(jù)以它為時(shí)間間隔切分為block柿估,而在streaming處理數(shù)據(jù)時(shí)循未,會將block作為一個partition來進(jìn)行分布式計(jì)算,也就是說我們在指定的batchTime中秫舌,根據(jù)blockInterval能切出多少個block的妖,就能分成多少個partition绣檬,從而決定了streaming處理時(shí)的分布式程度。這一段代碼如下:
具體為什么我們說一個block會作為一個partition來進(jìn)行計(jì)算嫂粟,這一點(diǎn)可以看一下ReceiverInputDStream類的compute方法娇未,該方法調(diào)用了createBlockRDD方法來創(chuàng)建基于Receiver模式的RDD。在該方法中可以看到最終封裝的RDD為BlockRDD或者WriteAheadLogBackedBlockRDD赋元。
BlockRDD類中g(shù)etPartitions方法是說將這個batch的blocks作為partitions忘蟹。Compute方法則按照入?yún)lockRDDPartition的blockId,從blockManager中獲取該block作為partition的數(shù)據(jù)搁凸。getPreferredLocations則是將BlockRDDPartition所在的host作為partition的首選位置媚值。
總結(jié)
通過閱讀源碼我們可以看出,direct的方式是從kafka消費(fèi)完數(shù)據(jù)之后直接封裝成partition的數(shù)據(jù)提供給作業(yè)使用护糖,而receiver是將消費(fèi)到數(shù)據(jù)按照blockInterval切分成block褥芒,保存到blockManager中,在使用時(shí)會根據(jù)blockId獲取該數(shù)據(jù)嫡良。
另外direct的方式rdd的partition與topic的partition是一一對應(yīng)的锰扶,如果某個topic只有一個partition就不好了。而receiver的partition是根據(jù)blockInterval切分出來的寝受,blockInterval的默認(rèn)值是200ms坷牛,不存在這個問題。
這兩種方式在生產(chǎn)環(huán)境上用的都比較多很澄,我們一開始采用的是receiver的方式京闰。后來為了實(shí)現(xiàn)自定義checkpoint,改為了direct的方式甩苛。