Spark-streaming-2.0-Kafka數(shù)據(jù)接收并行度源碼學(xué)習(xí)

前段時(shí)間學(xué)習(xí)了spark streaming采用kafka作為數(shù)據(jù)源時(shí)张足,數(shù)據(jù)接收并行度這一部分的源代碼。本文主要將學(xué)習(xí)的體會記錄一下疲恢,有理解不對的地方請多多指教描融。

Streaming從kafka接收數(shù)據(jù)有Receiver和direct兩種方式铡溪。下面我們看一下這兩種方式的源碼漂辐。

Direct approach

這種方式是使用kafka的低階API從kafka消費(fèi)數(shù)據(jù)。一般如果需要自行維護(hù)partition的offset棕硫,實(shí)現(xiàn)自定義checkpoint文件髓涯,或者exactlyOnce場景下就會用到這一方式。

首先需要看一下DirectKafkaInputDStream這個類饲帅,他是我們調(diào)用KafkaUtil.

createDirectStream方法生成的用來從kafka端接收數(shù)據(jù)的复凳。

compute方法定義了InputDStream是如何根據(jù)指定的batchTime生成RDD的瘤泪。

latestLeaderOffsets方法是獲取當(dāng)前InputDStream所包含的topic下所有的partition的最新offset的。Clamp方法是根據(jù)spark.streaming.kafka.maxRatePerPartition和backpressure這兩個參數(shù)來設(shè)置當(dāng)前block可以消費(fèi)到的offset的(即untilOffset)育八。這個數(shù)值需要跟partition最新的offset取最小值对途。

maxMessagesPerPartition方法實(shí)現(xiàn)了獲取某個partition能消費(fèi)到的message的數(shù)量。該方法首先會計(jì)算一個每分區(qū)每秒鐘消費(fèi)的消息數(shù)上線effectiveRateLimitPerPartition髓棋,他的value如下圖紅框中实檀,是在spark.streaming.kafka.maxRatePerPartition和batckpressure中取一個最小值,如果只配置了一個則以配置的為準(zhǔn)按声,都沒配置則返回None膳犹,返回None時(shí)直接取leader最新的offset。然后再根據(jù)batchTime計(jì)算出某partition在batchTime內(nèi)能消費(fèi)的消息數(shù)上限签则。

其中backpressure是spark1.5版本之后增加的參數(shù)须床,能夠根據(jù)上一個batch的執(zhí)行效率,動態(tài)估算出當(dāng)前batch能處理的最大消息數(shù)渐裂。這個參數(shù)在每個batch計(jì)算完成后豺旬,會通過StreamingListenerBus監(jiān)聽StreamingListenerBatchCompleted事件,然后由org.apache.spark.streaming.scheduler.

onBatchCompleted方法來重新計(jì)算柒凉,如下:

Backpressure的具體實(shí)現(xiàn)思路先不展開了(計(jì)算公式在PIDRateEstimator.compute方法中)族阅。我們回到DirectKafkaInputDStream.compute方法。當(dāng)計(jì)算完每個partition的untilOffset之后膝捞,會根據(jù)當(dāng)前InputDStream所消費(fèi)的topic的每個partition的currentOffset和untilOffset構(gòu)建KafkaRDD坦刀。

在kafkaRDD中我們可以看到他重寫的一些RDD的方法,

在getPartitions方法中可以看到蔬咬,KafkaRDD的partition個數(shù)就是topic的partition個數(shù)之和鲤遥。

在getPreferredLocations方法中可以看到,partition的首選location就是該topic的某個partition的leader所在的host林艘。這是很合理的渴频,因?yàn)閘eader上的數(shù)據(jù)正常情況下是最新的而且是最準(zhǔn)確的。而follower的數(shù)據(jù)往往還需要從leader上做同步北启,并且一旦同步出現(xiàn)較大的落后,還會從in-sync列表中移除拔第。而且kafka的讀寫都是通過leader進(jìn)行的咕村。

關(guān)于方法中part.host可以一路反推回去,會跟蹤到KafkaCluster.getLeaderOffsets方法中調(diào)用的findLeaders方法蚊俺,即part.host就是leader的host懈涛。

compute方法是RDD用來構(gòu)建一個partition的數(shù)據(jù)的。

我們看一下用來從partition中獲取數(shù)據(jù)的KafkaRDDIterator類泳猬。在類體中會發(fā)現(xiàn)

val consumer = connectLeader

的代碼批钠,這說明一點(diǎn)宇植,spark streaming的kafka低階API是每一個partition起一個consumer來消費(fèi)數(shù)據(jù)的。

然后我們看一下fetchBatch方法埋心。該方法中是我們很熟悉的一段根據(jù)起止offset消費(fèi)kafka某topic某partition數(shù)據(jù)的代碼指郁。

通過kafkaRDD這個類的閱讀我們可以看出,接收數(shù)據(jù)是以partition的leader為維度做分布式的拷呆,這樣做可以保證這個host上是有我要消費(fèi)的數(shù)據(jù)的闲坎,能夠?qū)崿F(xiàn)數(shù)據(jù)本地化。

Receiver

這種方式是采用kafka的高階API來消費(fèi)數(shù)據(jù)的茬斧。

建立InputDStream的代碼如下:

從KafkaUtils.createStream開始跟到KafkaInputDStream類腰懂,

getReceiver()方法中的變量useReliableReceiver是判斷是否配置了WAL機(jī)制。如下:

我們看一下KafkaReceiver的實(shí)現(xiàn)代碼:

在他的onStart()方法中可以看到他是創(chuàng)建了一個線程池executorPool來消費(fèi)消息的项秉。而這個線程池的線程數(shù)绣溜,就是我們在KafkaUtils.createStream時(shí)的入?yún)nlineStaffTopicMap的values的和。也就是說入?yún)nlineStaffTopicMap的value指的是某個topic在這個InputDStream中會有多少個consumer去消費(fèi)數(shù)據(jù)娄蔼。

再看一下MessageHandler中消費(fèi)及保存數(shù)據(jù)的邏輯:

這段代碼中streamIterator是被我們所喜聞樂見的使用高階API從kafka消費(fèi)數(shù)據(jù)的代碼怖喻。在代碼中消費(fèi)完數(shù)據(jù)之后,調(diào)用了store方法將message進(jìn)行了保存贷屎。

Store方法最終會將這條消息addData到BlockGenerator類中的currentBuffer:

ArrayBuffer中罢防。

該類中的updateCurrentBuffer方法值得我們關(guān)注一下,他是用來將已經(jīng)收集到的消息封裝成一個Block的唉侄。

那么這個方法什么情況下會被調(diào)用呢咒吐,需要看一下blockIntervalTimer的實(shí)現(xiàn)類RecurringTimer。

RecurringTimer是一個定時(shí)重復(fù)執(zhí)行高階函數(shù)callback的執(zhí)行器属划,他是通過Thread反復(fù)執(zhí)行l(wèi)oop方法實(shí)現(xiàn)的恬叹,loop方法中只要定時(shí)器不被終止,就會反復(fù)調(diào)用triggerActionForNextInterval方法同眯,而triggerActionForNextInterval會在特定的時(shí)刻(即nextTime)執(zhí)行callback函數(shù)(即入?yún)pdateCurrentBuffer函數(shù))绽昼。執(zhí)行完成之后會在nextTime上增加period作為下一次執(zhí)行的時(shí)刻。

而period方法是什么呢须蜗,他就是我們在構(gòu)建blockIntervalTimer時(shí)的入?yún)lockIntervalMs硅确,也就是streaming性能的一個優(yōu)化點(diǎn)spark.streaming.blockInterval。也就是說明肮,這段代碼的邏輯是每間隔blockInterval將由consumer消費(fèi)到的數(shù)據(jù)切分成一個block菱农。由此我們可以看到,這個參數(shù)是用來將Batch中所接受到的數(shù)據(jù)以它為時(shí)間間隔切分為block柿估,而在streaming處理數(shù)據(jù)時(shí)循未,會將block作為一個partition來進(jìn)行分布式計(jì)算,也就是說我們在指定的batchTime中秫舌,根據(jù)blockInterval能切出多少個block的妖,就能分成多少個partition绣檬,從而決定了streaming處理時(shí)的分布式程度。這一段代碼如下:

具體為什么我們說一個block會作為一個partition來進(jìn)行計(jì)算嫂粟,這一點(diǎn)可以看一下ReceiverInputDStream類的compute方法娇未,該方法調(diào)用了createBlockRDD方法來創(chuàng)建基于Receiver模式的RDD。在該方法中可以看到最終封裝的RDD為BlockRDD或者WriteAheadLogBackedBlockRDD赋元。

BlockRDD類中g(shù)etPartitions方法是說將這個batch的blocks作為partitions忘蟹。Compute方法則按照入?yún)lockRDDPartition的blockId,從blockManager中獲取該block作為partition的數(shù)據(jù)搁凸。getPreferredLocations則是將BlockRDDPartition所在的host作為partition的首選位置媚值。

總結(jié)

通過閱讀源碼我們可以看出,direct的方式是從kafka消費(fèi)完數(shù)據(jù)之后直接封裝成partition的數(shù)據(jù)提供給作業(yè)使用护糖,而receiver是將消費(fèi)到數(shù)據(jù)按照blockInterval切分成block褥芒,保存到blockManager中,在使用時(shí)會根據(jù)blockId獲取該數(shù)據(jù)嫡良。

另外direct的方式rdd的partition與topic的partition是一一對應(yīng)的锰扶,如果某個topic只有一個partition就不好了。而receiver的partition是根據(jù)blockInterval切分出來的寝受,blockInterval的默認(rèn)值是200ms坷牛,不存在這個問題。

這兩種方式在生產(chǎn)環(huán)境上用的都比較多很澄,我們一開始采用的是receiver的方式京闰。后來為了實(shí)現(xiàn)自定義checkpoint,改為了direct的方式甩苛。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蹂楣,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子讯蒲,更是在濱河造成了極大的恐慌痊土,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件墨林,死亡現(xiàn)場離奇詭異赁酝,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)旭等,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進(jìn)店門赞哗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人辆雾,你說我怎么就攤上這事≡屡” “怎么了度迂?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵藤乙,是天一觀的道長。 經(jīng)常有香客問我惭墓,道長坛梁,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任腊凶,我火速辦了婚禮划咐,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘钧萍。我一直安慰自己褐缠,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布风瘦。 她就那樣靜靜地躺著队魏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪万搔。 梳的紋絲不亂的頭發(fā)上胡桨,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天,我揣著相機(jī)與錄音瞬雹,去河邊找鬼昧谊。 笑死,一個胖子當(dāng)著我的面吹牛酗捌,可吹牛的內(nèi)容都是我干的呢诬。 我是一名探鬼主播,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼意敛,長吁一口氣:“原來是場噩夢啊……” “哼馅巷!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起草姻,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤钓猬,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后撩独,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體敞曹,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年综膀,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了澳迫。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,739評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡剧劝,死狀恐怖橄登,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤拢锹,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布谣妻,位于F島的核電站,受9級特大地震影響卒稳,放射性物質(zhì)發(fā)生泄漏蹋半。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一充坑、第九天 我趴在偏房一處隱蔽的房頂上張望减江。 院中可真熱鬧,春花似錦捻爷、人聲如沸辈灼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽茵休。三九已至,卻和暖如春手蝎,著一層夾襖步出監(jiān)牢的瞬間榕莺,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工棵介, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留钉鸯,地道東北人。 一個月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓邮辽,卻偏偏與公主長得像唠雕,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子吨述,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評論 2 354

推薦閱讀更多精彩內(nèi)容