本文基于spark streaming通過direct mode訪問kafka的場景徐许,從源碼出發(fā)分析spark streaming如何實現(xiàn)數(shù)據(jù)讀取的限流和反壓检盼。
我們知道蹦渣,KafkaUtils.createDirectStream方法用于創(chuàng)建direct mode訪問kafka的InputDStream.
從源碼可以看到,所有KafkaUtils.createDirectStream方法重載最終都會調(diào)用下面這個方法創(chuàng)建DirectKafkaInputDStream對象:
1 DirectKafkaInputDStream如何生成KafkaRDD堪滨?
先看看DirectKafkaInputDStream類圖:
DirectKafkaInputDstream.compute方法用于生成指定時間batch的KafkaRDD,這個RDD會從kafka各個分區(qū)抽取指定offset range的數(shù)據(jù)调俘。
而這個offet range由以下三個因素決定:
1. 已消費到的最新offsets值皇忿,即DirectKafkaInputDStream.currentOffsets
2. 當前batch應該消費的最大消息數(shù)量,即DirectKafkaInputDStream.maxMessagesPerPartition的返回值
3. 當前kafka分區(qū)的最大可用的offset坦仍,即DirectKafkaInputDStream.latestOffsets的返回值
DirectKafkaInputDstream.compute會調(diào)用DirectKafkaInputDstream.clamp方法計算得到每個分區(qū)offset range的end offset鳍烁,計算的方式就是currentOffsets中各個分區(qū)的begin offset加上maxMessagesPerPartition中對應分區(qū)的消息數(shù),再和latestOffsets中對應分區(qū)的latest offset做比較繁扎,取更小值幔荒。
其中,clamp的參數(shù)offets就是DirectKafkaInputDStream.latestOffsets的返回值梳玫。
最后爹梁,DirectKafkaInputDstream.compute會將各個分區(qū)的end offsets和currentOffets中的begin offsets組成各個分區(qū)的offset ranges,用以生成KafkaRDD對象:
2 DirectKafkaInputDStream如何實現(xiàn)限流反壓提澎?
2.1 rateController的生成
Spark Streaming限流反壓機制主要在DirectKafkaInputDStream.maxMessagesPerPartition方法中實現(xiàn)姚垃,其依賴于rateController對象計算最新的數(shù)據(jù)消費速率(即每秒消費的消息數(shù)量),先看看DirectKafkaInputDStream中的rateController對象是如何生成的:
可以看到盼忌,DirectKafkaInputDStream使用的是DirectKafkaRateController.
這里的RateController.isBackPressureEnabled會check配置參數(shù)spark.streaming.backpressure.enabled, 默認為false.
接著看RateEstimator.create方法:
可以看到积糯,目前(spark 2.4.0)spark只有一種rate estimator選擇,即PIDRateEstimator.
2.2 DirectKafkaInputDStream.maxMessagesPerPartition
下面我們看看DirectKafkaInputDStream.maxMessagesPerPartition做了哪些事情:
? ? 1. 調(diào)用RateController.getLatestRate方法得到最新的數(shù)據(jù)消費速率:
? ? ? ? ? 注意谦纱,這里得到的estimatedRateLimit是在一個batch中每秒從所有kafka分區(qū)消費的消息總數(shù)量看成。
? ? 2. 計算各個分區(qū)的消息延遲量(即各個分區(qū)在latestOffsets和currentOffsets中對應分區(qū)offset的差值)。
? ? 3. 以每個分區(qū)的延遲量為權重跨嘉,計算各個分區(qū)的數(shù)據(jù)消費速率backpressureRate.
? ? 4. 當maxRateLimitPerPartition>0時川慌,比較backpressureRate和maxRateLimitPerPartition,取二者更小者作為分區(qū)的消費速率(這里正是限流功能的體現(xiàn))祠乃。其中梦重,maxRateLimitPerPartition由參數(shù)spark.streaming.kafka.maxRatePerPartition決定,默認為0.
? ? 5. 最后亮瓷,將#4中計算得到的消費速率乘以batch的時長(單位轉化為秒)琴拧,并和minRateLimitPerPartition做比較(應該是和secsPerBatch*minRateLimitPerPartition做比較,此處可能是spark的一個bug)寺庄,取二者更大者作為分區(qū)要消費的消息數(shù)艾蓝。其中,minRateLimitPerPartition由參數(shù)spark.streaming.kafka.minRatePerPartition決定斗塘,默認為1.
到此赢织,我們分析了在DirectKafkaInputDStream中是如何利用RateController得到消費速率,并根據(jù)消費速率計算出指定batch中需要從各個分區(qū)消費的offset ranges馍盟, 進而生成對應KafkaRDD對象的于置。
下面,我們就詳細討論一下RateController是如何計算出數(shù)據(jù)消費速率的贞岭。
3 RateController & RateEstimator 如何計算和更新數(shù)據(jù)消費速率八毯?
先看看RateController和RateEstimator的類圖:
從上面的類圖可以看到搓侄,所有RateController的構造函數(shù)都是需要兩個參數(shù):streamUID和rateEstimator.
streamUID是stream的唯一ID,由StreamingContext.getNewInputStreamId生成话速。 RateEstimator是用來估計輸入流的消費速率的組件讶踪。
從上面的類圖可以看出,有兩種RateEstimator實現(xiàn):
? ? 1. ConstantEstimator:以一個固定值作為輸入流的抽取速率泊交,在spark的內(nèi)部測試類(如DirectKafkaStreamSuite)中使用乳讥。
? ? 2. PIDRateEstimator: 以proportional-integral-derivative (PID)方法評估輸入流的數(shù)據(jù)抽取速率,下文會詳細介紹廓俭。
3.1 RateController
從上面的類圖可以看到云石,所有的RateController都是繼承自StreamingListener的,也就是說rateController是在每個batch執(zhí)行完成后作為一個listener被調(diào)用以更新數(shù)據(jù)消費速率的研乒。
先來看看RateController的onBatchCompleted方法:
先從batchInfo中抽取出四個變量:
? ? 1. processingEnd: batch執(zhí)行結束的時間汹忠。
? ? 2. workDelay:處理batch數(shù)據(jù)所消耗的時間(單位是毫秒),也就是batch從開始執(zhí)行到執(zhí)行結束的時間雹熬。
? ? 3. waitDelay:batch的等待時間(單位是毫秒宽菜,主要是前一個batch未能在新的batch開始之前完成而導致的延遲),也就是從batch被提交到其第一個job開始執(zhí)行的時間橄唬。
? ? 4. elems:在batch的時間段中赋焕,RateController對應的stream接受的消息數(shù)量。
RateController.computeAndPublish方法以這四個變量為參數(shù)仰楚,計算并更新數(shù)據(jù)消費速率:
首先,調(diào)用reateEstimator.compute方法計算得到新的數(shù)據(jù)消費速率犬庇,并更新到RateController的rateLimit變量中僧界。
接著,調(diào)用publish方法發(fā)布更新后的數(shù)據(jù)消費速率臭挽,publish方法由RateController的子類實現(xiàn)捂襟。
因為DirectKafkaInputDStream.maxMessagesPerPartition直接調(diào)用RateController.getLatestRate方法獲取最新的數(shù)據(jù)消費速率,不需要額外的發(fā)布動作欢峰。
所以葬荷,DirectKafkaRateController的publish方法是一個空實現(xiàn):
3.2 PIDRateEstimator
從3.1的分析可以看出,數(shù)據(jù)消費速率的計算是在rateEstimator中完成的纽帖,上文也有提到宠漩,spark中只有一種可選擇的RateEstimator實現(xiàn)類,即PIDRateEstimator (ConstantEstimator僅在測試中使用)懊直。
PIDRateEstimator的主要計算源碼如下:
1. 基于batch的消息數(shù)量elems和處理時間workDelay扒吁,計算出batch的消息處理速率:processingRate
2. 用上一個batch的消息處理速率latestRate減去processingRate,得到兩個連續(xù)batch的消息處理速率差值:error
3. 基于batch的等待時間waitDelay和processingRate室囊,計算出為了彌補等待時間造成的延遲所需要增加的消息處理速率差值:historicalError(顧名思義雕崩,就是因為歷史batch處理延遲導致的差值)
4. 基于上一個batch的消息處理速率差值latestError和當前batch的消息處理速率差值error魁索,計算出消息處理速率差值的變化率dError
5. 最后,根據(jù)PIDRateEstimator的參數(shù)proportional盼铁,integral和derivative(這三個參數(shù)可分別看作是error粗蔚,historicalError和dError的權重值),用latestRate減去加權后的這三個error值饶火,再和PIDRateEstimator.minRate做比較取更大者作為新的數(shù)據(jù)消費速率newRate.
其中支鸡,proportional默認為1,integral默認為0.2趁窃,derivative默認為0 (也就是說dError默認是不起作用的)牧挣。從上文RateEstimator.create的源碼可以看到,這三個權重值和PIDRateEstimator.minRate均由配置參數(shù)spark.streaming.backpressure.pid.*決定醒陆。
可以看到瀑构,在計算新的數(shù)據(jù)消費速率時,spark將消息處理速率刨摩,累計延遲以及消息處理速率差值的變化都納入了考量范圍寺晌,避免新的batch消費超過處理能力的數(shù)據(jù)量,從而達到反壓的功能澡刹。
4 總結
本文基于spark streaming direct mode消費kafka數(shù)據(jù)的場景呻征,從源碼角度分析了DirectKafkaInputDStream計算kafka分區(qū)的offset ranges并生成KafkaRDD,DirectKafkaInputDStream如何借助RateController計算各個kafka分區(qū)在新batch中應處理的最大消息數(shù)量罢浇,以及RateController和RateEstimator的內(nèi)部結構陆赋,并著重分析了PIDRateEstimator的實現(xiàn)原理。
總的來說嚷闭,spark streaming在通過direct mode訪問kafka數(shù)據(jù)時攒岛,通過配置參數(shù)spark.streaming.kafka.maxRatePerPartition和spark.streaming.kafka.minRatePerPartition進行限流;與此同時胞锰,通過PIDRateEstimator計算出合適的數(shù)據(jù)消費速率灾锯,從而控制數(shù)據(jù)處理的workload,進行反壓嗅榕。
5 思考
這里聊一下筆者對spark streaming限流反壓機制的思考顺饮。spark streaming的限流反壓僅僅是依據(jù)歷史(前一個batch)處理速率和累計延遲去估算新的batch合適的數(shù)據(jù)處理速率,這種機制在一些場景下確實可以有效地起到反壓作用凌那。
比如兼雄,當內(nèi)存充足,數(shù)據(jù)處理的瓶頸在CPU案怯,CPU來不及計算單個batch的數(shù)據(jù)導致延遲,這個時候spark streaming的反壓機制計算出來的數(shù)據(jù)處理速率是和數(shù)據(jù)量線性成比例的,所以估算出來的新的數(shù)據(jù)處理速率可以保證新batch的數(shù)據(jù)可以在一個batch時間內(nèi)處理完畢金砍,并可以比較充分地利用整個batch interval的時間局蚀。
然而,當數(shù)據(jù)處理速率和數(shù)據(jù)量并非成比例恕稠,比如琅绅,當內(nèi)存成為瓶頸,GC或者spill數(shù)據(jù)到磁盤的時間可能占了整個batch處理時間的一大部分鹅巍,spark streaming反壓機制估算出來的單條記錄處理時間可能遠大于實際的單條記錄處理時間千扶。這個時候新的batch就會讀取很少的數(shù)據(jù)進行處理,可能就不會碰到內(nèi)存瓶頸骆捧,很快就算完了澎羞,那么在估算時又會把數(shù)據(jù)處理速率估計得很大,導致又讀取了大量數(shù)據(jù)進行處理敛苇,從而又碰到內(nèi)存瓶頸妆绞,增加數(shù)據(jù)延遲。
從根本上講枫攀,以上描述的問題是由于spark streaming的限流反壓在估算數(shù)據(jù)處理速率時括饶,沒有更細粒度地考慮前一個batch的處理時間消耗在哪,而是籠統(tǒng)地用batch的數(shù)據(jù)量除以整個batch的處理時間來估算每秒能處理的記錄數(shù)来涨。
還有一個小問題图焰,spark streaming的反壓機制是從第一個batch結束后開始計算處理速率的,那么第一個batch就只能通過限流的方式來控制流入的數(shù)據(jù)量蹦掐,而這個限流機制是需要開發(fā)人員手動設置并且無法在程序運行期間動態(tài)調(diào)整技羔。
6 說明
1. spark源碼版本:2.4.0
2. 本文的討論和分析均基于Spark Streaming direct mode訪問kafka的場景,其他streaming情況(如spark streaming的receiver-based訪問模式)不一定適用
3. 水平有限笤闯,如有錯誤堕阔,望讀者指出