Spark Streaming限流反壓機制源碼剖析

本文基于spark streaming通過direct mode訪問kafka的場景徐许,從源碼出發(fā)分析spark streaming如何實現(xiàn)數(shù)據(jù)讀取的限流和反壓检盼。

我們知道蹦渣,KafkaUtils.createDirectStream方法用于創(chuàng)建direct mode訪問kafka的InputDStream.

從源碼可以看到,所有KafkaUtils.createDirectStream方法重載最終都會調(diào)用下面這個方法創(chuàng)建DirectKafkaInputDStream對象:

KafkaUtils.createDirectStream

1 DirectKafkaInputDStream如何生成KafkaRDD堪滨?

先看看DirectKafkaInputDStream類圖:

DirectKafkaInputDStream類圖

DirectKafkaInputDstream.compute方法用于生成指定時間batch的KafkaRDD,這個RDD會從kafka各個分區(qū)抽取指定offset range的數(shù)據(jù)调俘。

而這個offet range由以下三個因素決定:

1. 已消費到的最新offsets值皇忿,即DirectKafkaInputDStream.currentOffsets

2. 當前batch應該消費的最大消息數(shù)量,即DirectKafkaInputDStream.maxMessagesPerPartition的返回值

3. 當前kafka分區(qū)的最大可用的offset坦仍,即DirectKafkaInputDStream.latestOffsets的返回值

DirectKafkaInputDstream.compute會調(diào)用DirectKafkaInputDstream.clamp方法計算得到每個分區(qū)offset range的end offset鳍烁,計算的方式就是currentOffsets中各個分區(qū)的begin offset加上maxMessagesPerPartition中對應分區(qū)的消息數(shù),再和latestOffsets中對應分區(qū)的latest offset做比較繁扎,取更小值幔荒。

DirectKafkaInputDstream.clamp

其中,clamp的參數(shù)offets就是DirectKafkaInputDStream.latestOffsets的返回值梳玫。

最后爹梁,DirectKafkaInputDstream.compute會將各個分區(qū)的end offsets和currentOffets中的begin offsets組成各個分區(qū)的offset ranges,用以生成KafkaRDD對象:


DirectKafkaInputDstream.compute

2 DirectKafkaInputDStream如何實現(xiàn)限流反壓提澎?

2.1 rateController的生成

Spark Streaming限流反壓機制主要在DirectKafkaInputDStream.maxMessagesPerPartition方法中實現(xiàn)姚垃,其依賴于rateController對象計算最新的數(shù)據(jù)消費速率(即每秒消費的消息數(shù)量),先看看DirectKafkaInputDStream中的rateController對象是如何生成的:


DirectKafkaInputDStream.rateController

可以看到盼忌,DirectKafkaInputDStream使用的是DirectKafkaRateController.

這里的RateController.isBackPressureEnabled會check配置參數(shù)spark.streaming.backpressure.enabled, 默認為false.

接著看RateEstimator.create方法:

RateEstimator.create

可以看到积糯,目前(spark 2.4.0)spark只有一種rate estimator選擇,即PIDRateEstimator.

2.2 DirectKafkaInputDStream.maxMessagesPerPartition

下面我們看看DirectKafkaInputDStream.maxMessagesPerPartition做了哪些事情:

? ? 1. 調(diào)用RateController.getLatestRate方法得到最新的數(shù)據(jù)消費速率:


DirectKafkaInputDStream.maxMessagesPerPartition

? ? ? ? ? 注意谦纱,這里得到的estimatedRateLimit是在一個batch中每秒從所有kafka分區(qū)消費的消息總數(shù)量看成。

? ? 2. 計算各個分區(qū)的消息延遲量(即各個分區(qū)在latestOffsets和currentOffsets中對應分區(qū)offset的差值)。

? ? 3. 以每個分區(qū)的延遲量為權重跨嘉,計算各個分區(qū)的數(shù)據(jù)消費速率backpressureRate.

? ? 4. 當maxRateLimitPerPartition>0時川慌,比較backpressureRate和maxRateLimitPerPartition,取二者更小者作為分區(qū)的消費速率(這里正是限流功能的體現(xiàn))祠乃。其中梦重,maxRateLimitPerPartition由參數(shù)spark.streaming.kafka.maxRatePerPartition決定,默認為0.

DirectKafkaInputDStream.maxMessagesPerPartition

? ? 5. 最后亮瓷,將#4中計算得到的消費速率乘以batch的時長(單位轉化為秒)琴拧,并和minRateLimitPerPartition做比較(應該是和secsPerBatch*minRateLimitPerPartition做比較,此處可能是spark的一個bug)寺庄,取二者更大者作為分區(qū)要消費的消息數(shù)艾蓝。其中,minRateLimitPerPartition由參數(shù)spark.streaming.kafka.minRatePerPartition決定斗塘,默認為1.

DirectKafkaInputDStream.maxMessagesPerPartition

到此赢织,我們分析了在DirectKafkaInputDStream中是如何利用RateController得到消費速率,并根據(jù)消費速率計算出指定batch中需要從各個分區(qū)消費的offset ranges馍盟, 進而生成對應KafkaRDD對象的于置。

下面,我們就詳細討論一下RateController是如何計算出數(shù)據(jù)消費速率的贞岭。

3 RateController & RateEstimator 如何計算和更新數(shù)據(jù)消費速率八毯?

先看看RateController和RateEstimator的類圖:

RateController和RateEstimator的類圖

從上面的類圖可以看到搓侄,所有RateController的構造函數(shù)都是需要兩個參數(shù):streamUID和rateEstimator.

streamUID是stream的唯一ID,由StreamingContext.getNewInputStreamId生成话速。 RateEstimator是用來估計輸入流的消費速率的組件讶踪。

從上面的類圖可以看出,有兩種RateEstimator實現(xiàn):

? ? 1. ConstantEstimator:以一個固定值作為輸入流的抽取速率泊交,在spark的內(nèi)部測試類(如DirectKafkaStreamSuite)中使用乳讥。

? ? 2. PIDRateEstimator: 以proportional-integral-derivative (PID)方法評估輸入流的數(shù)據(jù)抽取速率,下文會詳細介紹廓俭。

3.1 RateController

從上面的類圖可以看到云石,所有的RateController都是繼承自StreamingListener的,也就是說rateController是在每個batch執(zhí)行完成后作為一個listener被調(diào)用以更新數(shù)據(jù)消費速率的研乒。

先來看看RateController的onBatchCompleted方法:

RateController.onBatchCompleted

先從batchInfo中抽取出四個變量:

? ? 1. processingEnd: batch執(zhí)行結束的時間汹忠。

? ? 2. workDelay:處理batch數(shù)據(jù)所消耗的時間(單位是毫秒),也就是batch從開始執(zhí)行到執(zhí)行結束的時間雹熬。

? ? 3. waitDelay:batch的等待時間(單位是毫秒宽菜,主要是前一個batch未能在新的batch開始之前完成而導致的延遲),也就是從batch被提交到其第一個job開始執(zhí)行的時間橄唬。

? ? 4. elems:在batch的時間段中赋焕,RateController對應的stream接受的消息數(shù)量。

RateController.computeAndPublish方法以這四個變量為參數(shù)仰楚,計算并更新數(shù)據(jù)消費速率:

RateController.computeAndPublish

首先,調(diào)用reateEstimator.compute方法計算得到新的數(shù)據(jù)消費速率犬庇,并更新到RateController的rateLimit變量中僧界。

接著,調(diào)用publish方法發(fā)布更新后的數(shù)據(jù)消費速率臭挽,publish方法由RateController的子類實現(xiàn)捂襟。

因為DirectKafkaInputDStream.maxMessagesPerPartition直接調(diào)用RateController.getLatestRate方法獲取最新的數(shù)據(jù)消費速率,不需要額外的發(fā)布動作欢峰。

所以葬荷,DirectKafkaRateController的publish方法是一個空實現(xiàn):

DirectKafkaRateController

3.2 PIDRateEstimator

從3.1的分析可以看出,數(shù)據(jù)消費速率的計算是在rateEstimator中完成的纽帖,上文也有提到宠漩,spark中只有一種可選擇的RateEstimator實現(xiàn)類,即PIDRateEstimator (ConstantEstimator僅在測試中使用)懊直。

PIDRateEstimator的主要計算源碼如下:

PIDRateEstimator.compute

1. 基于batch的消息數(shù)量elems和處理時間workDelay扒吁,計算出batch的消息處理速率:processingRate

2. 用上一個batch的消息處理速率latestRate減去processingRate,得到兩個連續(xù)batch的消息處理速率差值:error

3. 基于batch的等待時間waitDelay和processingRate室囊,計算出為了彌補等待時間造成的延遲所需要增加的消息處理速率差值:historicalError(顧名思義雕崩,就是因為歷史batch處理延遲導致的差值)

4. 基于上一個batch的消息處理速率差值latestError和當前batch的消息處理速率差值error魁索,計算出消息處理速率差值的變化率dError

5. 最后,根據(jù)PIDRateEstimator的參數(shù)proportional盼铁,integral和derivative(這三個參數(shù)可分別看作是error粗蔚,historicalError和dError的權重值),用latestRate減去加權后的這三個error值饶火,再和PIDRateEstimator.minRate做比較取更大者作為新的數(shù)據(jù)消費速率newRate.

其中支鸡,proportional默認為1,integral默認為0.2趁窃,derivative默認為0 (也就是說dError默認是不起作用的)牧挣。從上文RateEstimator.create的源碼可以看到,這三個權重值和PIDRateEstimator.minRate均由配置參數(shù)spark.streaming.backpressure.pid.*決定醒陆。

可以看到瀑构,在計算新的數(shù)據(jù)消費速率時,spark將消息處理速率刨摩,累計延遲以及消息處理速率差值的變化都納入了考量范圍寺晌,避免新的batch消費超過處理能力的數(shù)據(jù)量,從而達到反壓的功能澡刹。

4 總結

本文基于spark streaming direct mode消費kafka數(shù)據(jù)的場景呻征,從源碼角度分析了DirectKafkaInputDStream計算kafka分區(qū)的offset ranges并生成KafkaRDD,DirectKafkaInputDStream如何借助RateController計算各個kafka分區(qū)在新batch中應處理的最大消息數(shù)量罢浇,以及RateController和RateEstimator的內(nèi)部結構陆赋,并著重分析了PIDRateEstimator的實現(xiàn)原理。

總的來說嚷闭,spark streaming在通過direct mode訪問kafka數(shù)據(jù)時攒岛,通過配置參數(shù)spark.streaming.kafka.maxRatePerPartition和spark.streaming.kafka.minRatePerPartition進行限流;與此同時胞锰,通過PIDRateEstimator計算出合適的數(shù)據(jù)消費速率灾锯,從而控制數(shù)據(jù)處理的workload,進行反壓嗅榕。

5 思考

這里聊一下筆者對spark streaming限流反壓機制的思考顺饮。spark streaming的限流反壓僅僅是依據(jù)歷史(前一個batch)處理速率和累計延遲去估算新的batch合適的數(shù)據(jù)處理速率,這種機制在一些場景下確實可以有效地起到反壓作用凌那。

比如兼雄,當內(nèi)存充足,數(shù)據(jù)處理的瓶頸在CPU案怯,CPU來不及計算單個batch的數(shù)據(jù)導致延遲,這個時候spark streaming的反壓機制計算出來的數(shù)據(jù)處理速率是和數(shù)據(jù)量線性成比例的,所以估算出來的新的數(shù)據(jù)處理速率可以保證新batch的數(shù)據(jù)可以在一個batch時間內(nèi)處理完畢金砍,并可以比較充分地利用整個batch interval的時間局蚀。

然而,當數(shù)據(jù)處理速率和數(shù)據(jù)量并非成比例恕稠,比如琅绅,當內(nèi)存成為瓶頸,GC或者spill數(shù)據(jù)到磁盤的時間可能占了整個batch處理時間的一大部分鹅巍,spark streaming反壓機制估算出來的單條記錄處理時間可能遠大于實際的單條記錄處理時間千扶。這個時候新的batch就會讀取很少的數(shù)據(jù)進行處理,可能就不會碰到內(nèi)存瓶頸骆捧,很快就算完了澎羞,那么在估算時又會把數(shù)據(jù)處理速率估計得很大,導致又讀取了大量數(shù)據(jù)進行處理敛苇,從而又碰到內(nèi)存瓶頸妆绞,增加數(shù)據(jù)延遲。

從根本上講枫攀,以上描述的問題是由于spark streaming的限流反壓在估算數(shù)據(jù)處理速率時括饶,沒有更細粒度地考慮前一個batch的處理時間消耗在哪,而是籠統(tǒng)地用batch的數(shù)據(jù)量除以整個batch的處理時間來估算每秒能處理的記錄數(shù)来涨。

還有一個小問題图焰,spark streaming的反壓機制是從第一個batch結束后開始計算處理速率的,那么第一個batch就只能通過限流的方式來控制流入的數(shù)據(jù)量蹦掐,而這個限流機制是需要開發(fā)人員手動設置并且無法在程序運行期間動態(tài)調(diào)整技羔。

6 說明

1. spark源碼版本:2.4.0

2. 本文的討論和分析均基于Spark Streaming direct mode訪問kafka的場景,其他streaming情況(如spark streaming的receiver-based訪問模式)不一定適用

3. 水平有限笤闯,如有錯誤堕阔,望讀者指出

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市颗味,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌牺弹,老刑警劉巖浦马,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異张漂,居然都是意外死亡晶默,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門航攒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來磺陡,“玉大人,你說我怎么就攤上這事”宜” “怎么了坞靶?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蝴悉。 經(jīng)常有香客問我彰阴,道長,這世上最難降的妖魔是什么拍冠? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任尿这,我火速辦了婚禮,結果婚禮上庆杜,老公的妹妹穿的比我還像新娘射众。我一直安慰自己,他們只是感情好晃财,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布叨橱。 她就那樣靜靜地躺著,像睡著了一般拓劝。 火紅的嫁衣襯著肌膚如雪雏逾。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天郑临,我揣著相機與錄音栖博,去河邊找鬼。 笑死厢洞,一個胖子當著我的面吹牛仇让,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播躺翻,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼丧叽,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了公你?” 一聲冷哼從身側響起踊淳,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎陕靠,沒想到半個月后迂尝,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡剪芥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年垄开,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片税肪。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡溉躲,死狀恐怖榜田,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情锻梳,我是刑警寧澤箭券,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站唱蒸,受9級特大地震影響邦鲫,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜神汹,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一庆捺、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧屁魏,春花似錦滔以、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至桃漾,卻和暖如春坏匪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背撬统。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工适滓, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人恋追。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓凭迹,卻偏偏與公主長得像,于是被迫代替她去往敵國和親苦囱。 傳聞我的和親對象是個殘疾皇子嗅绸,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容