- 看完本文潦嘶,你能get到以下知識(shí)
- Flink 流處理為什么需要網(wǎng)絡(luò)流控床佳?
- Flink V1.5 版之前網(wǎng)絡(luò)流控介紹
- Flink V1.5 版之前的反壓策略存在的問(wèn)題
- Credit的反壓策略實(shí)現(xiàn)原理驮宴,Credit是如何解決 Flink 1.5 之前的問(wèn)題沛简?
- 對(duì)比spark,都說(shuō)flink延遲低贪壳,來(lái)一條處理一條,真是這樣嗎蚜退?其實(shí)Flink內(nèi)部也有Buffer機(jī)制闰靴,Buffer機(jī)制具體是如何實(shí)現(xiàn)的彪笼?
- Flink 如何在吞吐量和延遲之間做權(quán)衡?
- 后續(xù)相關(guān)博客
- Flink 反壓相關(guān) Metrics 介紹
- 基于 Flink 的流控機(jī)制和反壓如何定位 Flink 任務(wù)的瓶頸蚂且∨涿ǎ或者說(shuō),如果一個(gè)平時(shí)正常的 Flink 任務(wù)突然出現(xiàn)延遲了杏死,怎么來(lái)定位問(wèn)題泵肄?到底是 Kafka 讀取數(shù)據(jù)慢,還是中間某個(gè)計(jì)算環(huán)節(jié)比較消耗資源使得變慢淑翼,還是由于最后的寫(xiě)入外部存儲(chǔ)時(shí)比較慢腐巢?
Flink 流處理為什么需要網(wǎng)絡(luò)流控?
分析一個(gè)簡(jiǎn)單的 Flink 流任務(wù)玄括,下圖是一個(gè)簡(jiǎn)單的Flink流任務(wù)執(zhí)行圖:任務(wù)首先從 Kafka 中讀取數(shù)據(jù)冯丙、 map 算子對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換、keyBy 按照指定 key 對(duì)數(shù)據(jù)進(jìn)行分區(qū)(相同 key 的數(shù)據(jù)經(jīng)過(guò) keyBy 后分到同一個(gè) subtask 實(shí)例中)遭京,keyBy 后對(duì)數(shù)據(jù)接著進(jìn)行 map 轉(zhuǎn)換胃惜,然后使用 Sink 將數(shù)據(jù)輸出到外部存儲(chǔ)。
眾所周知哪雕,在大數(shù)據(jù)處理中船殉,無(wú)論是批處理還是流處理,單點(diǎn)處理的性能總是有限的斯嚎,我們的單個(gè) Job 一般會(huì)運(yùn)行在多個(gè)節(jié)點(diǎn)上利虫,多個(gè)節(jié)點(diǎn)共同配合來(lái)提升整個(gè)系統(tǒng)的處理性能。圖中孝扛,任務(wù)被切分成 4 個(gè)可獨(dú)立執(zhí)行的 subtask( A0列吼、A1、B0苦始、B1)寞钥,在數(shù)據(jù)處理過(guò)程中,就會(huì)存在 shuffle(數(shù)據(jù)傳輸)的過(guò)程陌选。例如理郑,subtask A0 處理完的數(shù)據(jù)經(jīng)過(guò) keyBy 后發(fā)送到 subtask B0、B1 所在節(jié)點(diǎn)去處理咨油。
那么問(wèn)題來(lái)了您炉,下圖中,上游 Producer 向下游 Consumer 發(fā)送數(shù)據(jù)役电,在發(fā)送端和接受端都有相應(yīng)的 Send Buffer 和 Receive Buffer赚爵,但是上游 Producer 生成數(shù)據(jù)的速率比下游 Consumer 消費(fèi)數(shù)據(jù)的速率快。Producer 生產(chǎn)數(shù)據(jù) 2MB/s, Consumer 消費(fèi)數(shù)據(jù) 1MB/s冀膝,Receive Buffer 只有 5MB唁奢,所以過(guò)了5秒后,接收端的 Receive Buffer 滿了窝剖。(可以把下圖中的 Producer 當(dāng)做上面案例中的 subtask A0麻掸,把下圖中的 Consumer 當(dāng)做上面案例中的 subtask B0)
下游接收區(qū)的 Receive Buffer 有限,如果上游一直有源源不斷的數(shù)據(jù)赐纱,那么將會(huì)面臨著以下兩個(gè)情況:
下游消費(fèi)者會(huì)丟棄新到達(dá)的數(shù)據(jù)脊奋,因?yàn)橄掠蜗M(fèi)者的緩沖區(qū)放不下
為了不丟棄數(shù)據(jù),所以下游消費(fèi)者的 Receive Buffer 持續(xù)擴(kuò)張疙描,最后耗盡消費(fèi)者的內(nèi)存诚隙,OOM,程序掛掉
常識(shí)告訴我們淫痰,這兩種情況在生產(chǎn)環(huán)境都是不能接受的最楷,第一種會(huì)把數(shù)據(jù)丟棄、第二種會(huì)把我們的應(yīng)用程序掛掉待错。所以籽孙,該問(wèn)題的解決方案不應(yīng)該是下游 Receive Buffer 一直累積數(shù)據(jù),而是上游 Producer 發(fā)現(xiàn)下游 Consumer 處理比較慢之后火俄,應(yīng)該在 Producer 端做出限流的策略犯建,防止在下游 Consumer 端無(wú)限制的數(shù)據(jù)堆積。
那上游 Producer 端該如何做限流呢瓜客?可以采用下圖所示靜態(tài)限流的策略:
靜態(tài)限速的思想就是适瓦,提前已知下游 Consumer 的消費(fèi)速率,然后通過(guò)在上游 Producer 端使用類(lèi)似令牌桶的思想谱仪,限制 Producer 端生產(chǎn)數(shù)據(jù)的速率玻熙,從而控制上游 Producer 端向下游 Consumer 端發(fā)送數(shù)據(jù)的速率。但是靜態(tài)限速會(huì)存在問(wèn)題:
- 通常無(wú)法事先預(yù)估下游 Consumer 端能承受的最大速率
- 就算通過(guò)某種方式預(yù)估出下游 Consumer 端能承受的最大速率疯攒,下游應(yīng)用程序也可能會(huì)因?yàn)榫W(wǎng)絡(luò)抖動(dòng)嗦随、 CPU 共享競(jìng)爭(zhēng)、內(nèi)存緊張敬尺、IO阻塞等原因造成下游應(yīng)用程序的吞吐量降低枚尼,然后又會(huì)出現(xiàn)上面所說(shuō)的下游接收區(qū)的 Receive Buffer 有限,上游一直有源源不斷的數(shù)據(jù)發(fā)送到下游的問(wèn)題砂吞,還是會(huì)造成下游要么丟數(shù)據(jù)署恍,要么為了不丟數(shù)據(jù) buffer 不斷擴(kuò)充導(dǎo)致下游 OOM的問(wèn)題
綜上所述,我們發(fā)現(xiàn)了蜻直,上游 Producer 端必須有一個(gè)限流的策略盯质,且靜態(tài)限流是不可靠的袁串,于是就需要一個(gè)動(dòng)態(tài)限流的策略』脚梗可以采用下圖動(dòng)態(tài)反饋所示:
下游 Consumer 端會(huì)頻繁地向上游 Producer 端進(jìn)行動(dòng)態(tài)反饋般婆,告訴 Producer 下游 Consumer 的負(fù)載能力,從而 Producer 端動(dòng)態(tài)調(diào)整向下游 Consumer 發(fā)送數(shù)據(jù)的速率實(shí)現(xiàn) Producer 端的動(dòng)態(tài)限流朵逝。當(dāng) Consumer 端處理較慢時(shí),Consumer 將負(fù)載反饋到 Producer 端乡范,Producer端會(huì)根據(jù)反饋適當(dāng)降低 Producer 自身從上游或者 Source 端讀數(shù)據(jù)的速率來(lái)降低向下游 Consumer 發(fā)送數(shù)據(jù)的速率配名。當(dāng) Consumer 處理負(fù)載能力提升后,又及時(shí)向 Producer 端反饋晋辆,Producer 會(huì)通過(guò)提升從上游或 Source 端讀數(shù)據(jù)的速率來(lái)提升向下游發(fā)送數(shù)據(jù)的速率渠脉。通過(guò)這個(gè)動(dòng)態(tài)反饋來(lái)提升整個(gè)系統(tǒng)的吞吐量。
補(bǔ)充一點(diǎn)瓶佳,如下圖所示芋膘,假如我們的 Job 分為 Task A、B霸饲、C为朋,Task A 是 Source Task、Task B 處理數(shù)據(jù)厚脉、Task C 是 Sink Task习寸。假如 Task C 由于各種原因吞吐量降低,會(huì)將負(fù)載信息反饋給 Task B傻工,Task B 會(huì)降低向 Task C 發(fā)送數(shù)據(jù)的速率霞溪,此時(shí)如果 Task B 如果還是一直從 Task A 讀取數(shù)據(jù),那么按照同樣的道理中捆,數(shù)據(jù)會(huì)把 Task B 的 Send Buffer 和 Receive Buffer 撐爆鸯匹,又會(huì)出現(xiàn)上面描述的問(wèn)題。所以泄伪,當(dāng) Task B 的 Send Buffer 和 Receive Buffer 被用完后殴蓬,Task B 會(huì)用同樣的原理將負(fù)載信息反饋給 Task A,Task A 收到 Task B 的負(fù)載信息后臂容,會(huì)降低 給 Task B 發(fā)送數(shù)據(jù)的速率科雳,以此類(lèi)推。
上面這個(gè)流程脓杉,就是 Flink 動(dòng)態(tài)限流(反壓機(jī)制)的簡(jiǎn)單描述糟秘。我們可以看到 Flink 的反壓其實(shí)是從下游往上游傳播的,一直往上傳播到 Source Task 后球散,Source Task 最終會(huì)降低從 Source 端讀取數(shù)據(jù)的速率尿赚。如果下游 Task C 的負(fù)載能力提升后,會(huì)及時(shí)反饋給 Task B,于是 Task B 會(huì)提升往 Task C 發(fā)送數(shù)據(jù)的速率凌净,Task B 又將負(fù)載提升的信息反饋給 Task A悲龟,Task A 就會(huì)提升從 Source 端讀取數(shù)據(jù)的速率,從而提升整個(gè)系統(tǒng)的負(fù)載能力冰寻。
讀到這里须教,我們應(yīng)該知道 Flink 為什么需要一個(gè)網(wǎng)絡(luò)流控機(jī)制了,并且知道 Flink 的網(wǎng)絡(luò)流控機(jī)制必須是一個(gè)動(dòng)態(tài)反饋的過(guò)程斩芭。但是還有以下幾個(gè)問(wèn)題:
- 數(shù)據(jù)具體是怎么從上游 Producer 端發(fā)送到下游 Consumer 端的轻腺?
- Flink 的動(dòng)態(tài)限流具體是怎么實(shí)現(xiàn)的?下游的負(fù)載能力和壓力是如何傳遞給上游的划乖?
我們帶著這兩個(gè)問(wèn)題贬养,學(xué)習(xí)下面的 Flink 網(wǎng)絡(luò)流控與反壓機(jī)制
Flink V1.5 版之前網(wǎng)絡(luò)流控介紹
在 Flink V1.5 版之前,其實(shí) Flink 并沒(méi)有刻意做上述所說(shuō)的動(dòng)態(tài)反饋琴庵。那么問(wèn)題來(lái)了误算,沒(méi)有做上述的動(dòng)態(tài)反饋機(jī)制,F(xiàn)link 難道不怕數(shù)據(jù)丟失或者上游和下游的一些 Buffer 把內(nèi)存撐爆嗎迷殿?當(dāng)然不怕了儿礼,因?yàn)?Flink 已經(jīng)依賴其他機(jī)制來(lái)實(shí)現(xiàn)了所謂的動(dòng)態(tài)反饋。其實(shí)很簡(jiǎn)單贪庙,讓我們繼續(xù)往下看蜘犁。
如下圖所示,對(duì)于一個(gè) Flink 任務(wù)止邮,動(dòng)態(tài)反饋可以抽象成以下兩個(gè)階段:
- 跨 Task这橙,動(dòng)態(tài)反饋如何從下游 Task 的 Receive Buffer 反饋給上游 Task 的 Send Buffer
當(dāng)下游 Task C 的 Receive Buffer 滿了,如何告訴上游 Task B 應(yīng)該降低數(shù)據(jù)發(fā)送速率
當(dāng)下游 Task C 的 Receive Buffer 空了导披,如何告訴上游 Task B 應(yīng)該提升數(shù)據(jù)發(fā)送速率
-
注:這里又分了兩種情況屈扎,Task B 和 Task C 可能在同一臺(tái)節(jié)點(diǎn)上運(yùn)行,也有可能不在同一個(gè)臺(tái)節(jié)點(diǎn)運(yùn)行
- Task B 和 Task C 在同一臺(tái)節(jié)點(diǎn)上運(yùn)行指的是:一臺(tái)節(jié)點(diǎn)運(yùn)行了一個(gè)或多個(gè) TaskManager撩匕,包含了多個(gè) Slot鹰晨,Task B 和 Task C 都運(yùn)行在這臺(tái)節(jié)點(diǎn)上,且 Task B 是 Task C 的上游止毕,給 Task C 發(fā)送數(shù)據(jù)模蜡。此時(shí) Task B 給 Task C 發(fā)送數(shù)據(jù)實(shí)際上是同一個(gè) JVM 內(nèi)的數(shù)據(jù)發(fā)送,所以不存在網(wǎng)絡(luò)通信
- Task B 和 Task C 不在同一臺(tái)節(jié)點(diǎn)上運(yùn)行指的是:Task B 和 Task C 運(yùn)行在不同的 TaskManager 中扁凛,且 Task B 是 Task C 的上游忍疾,給 Task C 發(fā)送數(shù)據(jù)。此時(shí) Task B 給 Task C 發(fā)送數(shù)據(jù)是跨節(jié)點(diǎn)的谨朝,所以會(huì)存在網(wǎng)絡(luò)通信
- Task 內(nèi)卤妒,動(dòng)態(tài)反饋如何從內(nèi)部的 Send Buffer 反饋給內(nèi)部的 Receive Buffer
- 當(dāng) Task B 的 Send Buffer 滿了甥绿,如何告訴 Task B 內(nèi)部的 Receive Buffer 下游 Send Buffer 滿了、下游處理性能不行了则披?因?yàn)橐?Task B 的 Receive Buffer 感受到壓力共缕,才能把下游的壓力傳遞到 Task A
- 當(dāng) Task B 的 Send Buffer 空了,如何告訴 Task B 內(nèi)部的 Receive Buffer 下游 Send Buffer 空了士复,下游處理性能很強(qiáng)图谷,上游加快處理數(shù)據(jù)吧
跨 TaskManager,反壓如何向上游傳播
先了解一下 Flink 的 TaskManager 之間網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)流向:
圖中阱洪,我們可以看到 TaskManager A 給 TaskManager B 發(fā)送數(shù)據(jù)蜓萄,TaskManager A 做為 Producer,TaskManager B 做為 Consumer澄峰。Producer 端的 Operator 實(shí)例會(huì)產(chǎn)生數(shù)據(jù),最后通過(guò)網(wǎng)絡(luò)發(fā)送給 Consumer 端的 Operator 實(shí)例辟犀。Producer 端 Operator 實(shí)例生產(chǎn)的數(shù)據(jù)首先緩存到 TaskManager 內(nèi)部的 NetWork Buffer俏竞。NetWork 依賴 Netty 來(lái)做通信,Producer 端的 Netty 內(nèi)部有 ChannelOutbound Buffer堂竟,Consumer 端的 Netty 內(nèi)部有 ChannelInbound Buffer魂毁。Netty 最終還是要通過(guò) Socket 發(fā)送網(wǎng)絡(luò)請(qǐng)求,Socket 這一層也會(huì)有 Buffer出嘹,Producer 端有 Send Buffer席楚,Consumer 端有 Receive Buffer。
總結(jié)一下税稼,現(xiàn)在有兩個(gè) TaskManager A烦秩、B,TaskManager A 中 Producer Operator 處理完的數(shù)據(jù)由 TaskManager B 中 Consumer Operator 處理郎仆。那么 Producer Operator 處理完的數(shù)據(jù)是怎么到達(dá) Consumer Operator 的只祠?首先 Producer Operator 從自己的上游或者外部數(shù)據(jù)源讀取到數(shù)據(jù)后,對(duì)一條條的數(shù)據(jù)進(jìn)行處理扰肌,處理完的數(shù)據(jù)首先輸出到 Producer Operator 對(duì)應(yīng)的 NetWork Buffer 中抛寝。Buffer 寫(xiě)滿或者超時(shí)后,就會(huì)觸發(fā)將 NetWork Buffer 中的數(shù)據(jù)拷貝到 Producer 端 Netty 的 ChannelOutbound Buffer曙旭,之后又把數(shù)據(jù)拷貝到 Socket 的 Send Buffer 中盗舰,這里有一個(gè)從用戶態(tài)拷貝到內(nèi)核態(tài)的過(guò)程,最后通過(guò) Socket 發(fā)送網(wǎng)絡(luò)請(qǐng)求桂躏,把 Send Buffer 中的數(shù)據(jù)發(fā)送到 Consumer 端的 Receive Buffer钻趋。數(shù)據(jù)到達(dá) Consumer 端后,再依次從 Socket 的 Receive Buffer 拷貝到 Netty 的 ChannelInbound Buffer沼头,再拷貝到 Consumer Operator 的 NetWork Buffer爷绘,最后 Consumer Operator 就可以讀到數(shù)據(jù)進(jìn)行處理了书劝。這就是兩個(gè) TaskManager 之間的數(shù)據(jù)傳輸過(guò)程,我們可以看到發(fā)送方和接收方各有三層的 Buffer土至。
了解了數(shù)據(jù)傳輸流程购对,我們?cè)倬唧w了解一下跨 TaskManager 的反壓過(guò)程,如下圖所示陶因,Producer 端生產(chǎn)數(shù)據(jù)速率為 2骡苞,Consumer 消費(fèi)數(shù)據(jù)速率為 1。持續(xù)下去,下游消費(fèi)較慢废岂,Buffer 容量又是有限的鸳址,那 Flink 反壓是怎么做的?
上面介紹后躲株,我們知道每個(gè) Operator 計(jì)算數(shù)據(jù)時(shí),輸出和輸入都有對(duì)應(yīng)的 NetWork Buffer镣衡,這個(gè) NetWork Buffer 對(duì)應(yīng)到 Flink 就是圖中所示的 ResultSubPartition 和 InputChannel霜定。ResultSubPartition 和 InputChannel 都是向 LocalBufferPool 申請(qǐng) Buffer 空間,然后 LocalBufferPool 再向 NetWork BufferPool 申請(qǐng)內(nèi)存空間廊鸥。這里望浩,NetWork BufferPool 是 TaskManager 內(nèi)所有 Task 共享的 BufferPool,TaskManager 初始化時(shí)就會(huì)向堆外內(nèi)存申請(qǐng) NetWork BufferPool惰说。LocalBufferPool 是每個(gè) Task 自己的 BufferPool磨德,假如一個(gè) TaskManager 內(nèi)運(yùn)行著 5 個(gè) Task,那么就會(huì)有 5 個(gè) LocalBufferPool吆视,但 TaskManager 內(nèi)永遠(yuǎn)只有一個(gè) NetWork BufferPool典挑。Netty 的 Buffer 也是初始化時(shí)直接向堆外內(nèi)存申請(qǐng)內(nèi)存空間。雖然可以申請(qǐng)揩环,但是必須明白內(nèi)存申請(qǐng)肯定是有限制的搔弄,不可能無(wú)限制的申請(qǐng),我們?cè)趩?dòng)任務(wù)時(shí)可以指定該任務(wù)最多可能申請(qǐng)多大的內(nèi)存空間用于 NetWork Buffer丰滑。
我們繼續(xù)分析我們的場(chǎng)景顾犹, Producer 端生產(chǎn)數(shù)據(jù)速率為2,Consumer 端消費(fèi)數(shù)據(jù)速率為1褒墨。數(shù)據(jù)從 Task A 的 ResultSubPartition 按照上面的流程最后傳輸?shù)?Task B 的 InputChannel 供 Task B 讀取并計(jì)算炫刷。持續(xù)一段時(shí)間后,由于 Task B 消費(fèi)比較慢郁妈,導(dǎo)致 InputChannel 被占滿了浑玛,所以 InputChannel 向 LocalBufferPool 申請(qǐng)新的 Buffer 空間,LocalBufferPool 分配給 InputChannel 一些 Buffer噩咪。
再持續(xù)一段時(shí)間后顾彰,InputChannel 重復(fù)向 LocalBufferPool 申請(qǐng) Buffer 空間极阅,導(dǎo)致 LocalBufferPool 也滿了,所以 LocalBufferPool 向 NetWork BufferPool 申請(qǐng) Buffer 空間涨享,NetWork BufferPool 給 LocalBufferPool 分配 Buffer筋搏。
再持續(xù)下去,NetWork BufferPool 滿了厕隧,或者說(shuō) NetWork BufferPool 不能把自己的 Buffer 全分配給 Task B 對(duì)應(yīng)的 LocalBufferPool 奔脐,因?yàn)?TaskManager 上一般會(huì)運(yùn)行了多個(gè) Task,每個(gè) Task 只能使用 NetWork BufferPool 中的一部分吁讨。所以髓迎,可以認(rèn)為 Task B 把自己可以使用的 InputChannel 、 LocalBufferPool 和 NetWork BufferPool 都用完了建丧。此時(shí) Netty 還想把數(shù)據(jù)寫(xiě)入到 InputChannel排龄,但是發(fā)現(xiàn) InputChannel 滿了,所以 Socket 層會(huì)把 Netty 的 autoRead disable翎朱,Netty 不會(huì)再?gòu)?Socket 中去讀消息涣雕。可以看到下圖中多個(gè) ?闭翩,表示 Buffer 已滿,數(shù)據(jù)已經(jīng)不能往下游寫(xiě)了迄埃,發(fā)生了阻塞疗韵。
由于 Netty 不從 Socket 的 Receive Buffer 讀數(shù)據(jù)了,所以很快 Socket 的 Receive Buffer 就會(huì)變滿侄非,TCP 的 Socket 通信有動(dòng)態(tài)反饋的流控機(jī)制蕉汪,會(huì)把容量為0的消息反饋給上游發(fā)送端,所以上游的 Socket 就不會(huì)往下游再發(fā)送數(shù)據(jù) 逞怨。
Task A 持續(xù)生產(chǎn)數(shù)據(jù)者疤,發(fā)送端 Socket 的 Send Buffer 很快被打滿,所以 Task A 端的 Netty 也會(huì)停止往 Socket 寫(xiě)數(shù)據(jù)叠赦。
接下來(lái)驹马,數(shù)據(jù)會(huì)在 Netty 的 Buffer 中緩存數(shù)據(jù),但 Netty 的 Buffer 是無(wú)界的除秀。但可以設(shè)置 Netty 的高水位糯累,即:設(shè)置一個(gè) Netty 中 Buffer 的上限。所以每次 ResultSubPartition 向 Netty 中寫(xiě)數(shù)據(jù)時(shí)册踩,都會(huì)檢測(cè) Netty 是否已經(jīng)到達(dá)高水位泳姐,如果達(dá)到高水位就不會(huì)再往 Netty 中寫(xiě)數(shù)據(jù),防止 Netty 的 Buffer 無(wú)限制的增長(zhǎng)暂吉。
接下來(lái)胖秒,數(shù)據(jù)會(huì)在 Task A 的 ResultSubPartition 中累積缎患,ResultSubPartition 滿了后,會(huì)向 LocalBufferPool 申請(qǐng)新的 Buffer 空間阎肝,LocalBufferPool 分配給 ResultSubPartition 一些 Buffer挤渔。
持續(xù)下去 LocalBufferPool 也會(huì)用完,LocalBufferPool 再向 NetWork BufferPool 申請(qǐng) Buffer盗痒。
然后 NetWork BufferPool 也會(huì)用完蚂蕴,或者說(shuō) NetWork BufferPool 不能把自己的 Buffer 全分配給 Task A 對(duì)應(yīng)的 LocalBufferPool ,因?yàn)?TaskManager 上一般會(huì)運(yùn)行了多個(gè) Task俯邓,每個(gè) Task 只能使用 NetWork BufferPool 中的一部分骡楼。此時(shí),Task A 已經(jīng)申請(qǐng)不到任何的 Buffer 了稽鞭,Task A 的 Record Writer 輸出就被 wait 鸟整,Task A 不再生產(chǎn)數(shù)據(jù)。
通過(guò)上述的這個(gè)流程朦蕴,來(lái)動(dòng)態(tài)反饋篮条,保障各個(gè) Buffer 都不會(huì)因?yàn)閿?shù)據(jù)太多導(dǎo)致內(nèi)存溢出。上面描述了整個(gè)阻塞的流程吩抓,當(dāng)下游 Task B 持續(xù)消費(fèi)涉茧,Buffer 的可用容量會(huì)增加,所有被阻塞的數(shù)據(jù)通道會(huì)被一個(gè)個(gè)打開(kāi)疹娶,之后 Task A 又可以開(kāi)始正常的生產(chǎn)數(shù)據(jù)了伴栓。
之前介紹,Task 之間的數(shù)據(jù)傳輸可能存在上游的 Task A 和下游的 Task B 運(yùn)行在同一臺(tái)節(jié)點(diǎn)的情況雨饺,整個(gè)流程與上述類(lèi)似钳垮,只不過(guò)由于 Task A 和 B 運(yùn)行在同一個(gè) JVM,所以不需要網(wǎng)絡(luò)傳輸?shù)沫h(huán)節(jié)额港,Task B 的 InputChannel 會(huì)直接從 Task A 的 ResultSubPartition 讀取數(shù)據(jù)饺窿。
Task 內(nèi)部,反壓如何向上游傳播
假如 Task A 的下游所有 Buffer 都占滿了移斩,那么 Task A 的 Record Writer 會(huì)被 block肚医,Task A 的 Record Reader、Operator向瓷、Record Writer 都屬于同一個(gè)線程忍宋,所以 Task A 的 Record Reader 也會(huì)被 block。
然后可以把這里的 Task A 類(lèi)比成上面所說(shuō)的 Task B风罩,Task A 上游持續(xù)高速率發(fā)送數(shù)據(jù)到 Task A 就會(huì)導(dǎo)致可用的 InputChannel糠排、 LocalBufferPool 和 NetWork BufferPool 都會(huì)被用完。然后 Netty 超升、Socket 同理將壓力傳輸?shù)?Task A 的上游入宦。
假設(shè) Task A 的上游是 Task X哺徊,那么 Task A 將壓力反饋給 Task X 的過(guò)程與 Task B 將壓力反饋給 Task A 的過(guò)程是一樣的。整個(gè) Flink 的反壓是從下游往上游傳播的乾闰,一直傳播到 Source Task落追,Source Task 有壓力后,會(huì)降低從外部組件中讀取數(shù)據(jù)的速率涯肩,例如:Source Task 會(huì)降低從 Kafka 中讀取數(shù)據(jù)的速率轿钠,來(lái)降低整個(gè) Flink Job 中緩存的數(shù)據(jù),從而降低負(fù)載病苗。
所以得出的結(jié)論是:Flink 1.5之前并沒(méi)有特殊的機(jī)制來(lái)處理反壓疗垛,因?yàn)?Flink 中的數(shù)據(jù)傳輸相當(dāng)于已經(jīng)提供了應(yīng)對(duì)反壓的機(jī)制。
Flink V1.5 版之前的反壓策略存在的問(wèn)題
看著挺完美的反壓機(jī)制硫朦,其實(shí)是有問(wèn)題的贷腕。如下圖所示,我們的任務(wù)有4個(gè) SubTask咬展,SubTask A 是 SubTask B的上游泽裳,即 SubTask A 給 SubTask B 發(fā)送數(shù)據(jù)。Job 運(yùn)行在兩個(gè) TaskManager中破婆, TaskManager 1 運(yùn)行著 SubTask A.1 和 SubTask A.2涮总, TaskManager 2 運(yùn)行著 SubTask B.3 和 SubTask B.4。現(xiàn)在假如由于CPU共享或者內(nèi)存緊張或者磁盤(pán)IO瓶頸造成 SubTask B.4 遇到瓶頸祷舀、處理速率有所下降妹卿,但是上游源源不斷地生產(chǎn)數(shù)據(jù),所以導(dǎo)致 SubTask A.2 與 SubTask B.4 產(chǎn)生反壓蔑鹦。
這里需要明確一點(diǎn):不同 Job 之間的每個(gè)(遠(yuǎn)程)網(wǎng)絡(luò)連接將在 Flink 的網(wǎng)絡(luò)堆棧中獲得自己的TCP通道。 但是箕宙,如果同一 Task 的不同 SubTask 被安排到同一個(gè)TaskManager嚎朽,則它們與其他 TaskManager 的網(wǎng)絡(luò)連接將被多路復(fù)用并共享一個(gè)TCP信道以減少資源使用。例如柬帕,圖中的 A.1 -> B.3哟忍、A.1 -> B.4、A.2 -> B.3陷寝、A.2 -> B.4 這四條將會(huì)多路復(fù)用共享一個(gè) TCP 信道锅很。
現(xiàn)在 SubTask B.3 并沒(méi)有壓力,從上面跨 TaskManager 的反壓流程凤跑,我們知道當(dāng)上圖中 SubTask A.2 與 SubTask B.4 產(chǎn)生反壓時(shí)爆安,會(huì)把 TaskManager1 端該任務(wù)對(duì)應(yīng) Socket 的 Send Buffer 和 TaskManager2 端該任務(wù)對(duì)應(yīng) Socket 的 Receive Buffer 占滿,多路復(fù)用的 TCP 通道已經(jīng)被占住了仔引,會(huì)導(dǎo)致 SubTask A.1 和 SubTask A.2 要發(fā)送給 SubTask B.3 的數(shù)據(jù)全被阻塞了扔仓,從而導(dǎo)致本來(lái)沒(méi)有壓力的 SubTask B.3 現(xiàn)在接收不到數(shù)據(jù)了褐奥。所以,F(xiàn)link 1.5 版之前的反壓機(jī)制會(huì)存在當(dāng)一個(gè) Task 出現(xiàn)反壓時(shí)翘簇,可能導(dǎo)致其他正常的 Task 接收不到數(shù)據(jù)撬码。
Credit的反壓策略實(shí)現(xiàn)原理
Flink 1.5 之后,為了解決上述所描述的問(wèn)題版保,引入了基于 Credit 的反壓機(jī)制呜笑。如下圖所示,反壓機(jī)制作用于 Flink 的應(yīng)用層彻犁,即在 ResultSubPartition 和 InputChannel 這一層引入了反壓機(jī)制叫胁。每次上游 SubTask A.2 給下游 SubTask B.4 發(fā)送數(shù)據(jù)時(shí),會(huì)把 Buffer 中的數(shù)據(jù)和上游 ResultSubPartition 堆積的數(shù)據(jù)量 Backlog size發(fā)給下游袖裕,下游會(huì)接收上游發(fā)來(lái)的數(shù)據(jù)曹抬,并向上游反饋目前下游現(xiàn)在的 Credit 值,Credit 值表示目前下游可以接收上游的 Buffer 量急鳄,1 個(gè)Buffer 等價(jià)于 1 個(gè) Credit 谤民。
例如,上游 SubTask A.2 發(fā)送完數(shù)據(jù)后疾宏,還有 5 個(gè) Buffer 被積壓张足,那么會(huì)把發(fā)送數(shù)據(jù)和 Backlog size = 5 一塊發(fā)送給下游 SubTask B.4,下游接受到數(shù)據(jù)后坎藐,知道上游積壓了 5 個(gè)Buffer为牍,于是向 Buffer Pool 申請(qǐng) Buffer,由于容量有限岩馍,下游 InputChannel 目前僅有 2 個(gè) Buffer 空間碉咆,所以,SubTask B.4 會(huì)向上游 SubTask A.2 反饋 Channel Credit = 2蛀恩。然后上游下一次最多只給下游發(fā)送 2 個(gè) Buffer 的數(shù)據(jù)疫铜,這樣每次上游發(fā)送的數(shù)據(jù)都是下游 InputChannel 的 Buffer 可以承受的數(shù)據(jù)量,所以通過(guò)這種反饋策略双谆,保證了不會(huì)在公用的 Netty 和 TCP 這一層數(shù)據(jù)堆積而影響其他 SubTask 通信壳咕。
ResultSubPartition 會(huì)把 buffer 和 backlog size 同時(shí)發(fā)送給下游,下游向上游反饋 credit顽馋。再用一個(gè)案例來(lái)詳細(xì)地描述一下整個(gè)過(guò)程谓厘。
Task A 向 Task B 發(fā)送了數(shù)據(jù) <8,9> 和 backlog size =3,下游 InputChannel 接受完 <8,9> 后寸谜,發(fā)現(xiàn)上游目前積壓了 3 條數(shù)據(jù)竟稳,但是自己的緩沖區(qū)不夠,于是向 LocalBufferPool 申請(qǐng) buffer 空間,申請(qǐng)成功后住练,向上游反饋 credit = 3地啰,表示下游目前可以接受 3 條記錄(實(shí)際上是以 Buffer 為單位,而不是記錄數(shù)讲逛,F(xiàn)link 將真實(shí)記錄序列化后的二進(jìn)制數(shù)據(jù)放到 Buffer 中)亏吝,然后上游下次最多發(fā)送 3 條數(shù)據(jù)給下游。
持續(xù)下去盏混,上游生產(chǎn)數(shù)據(jù)速率比下游消費(fèi)速率快蔚鸥,所以 LocalBufferPool 和 NetWork BufferPool 都會(huì)被申請(qǐng)完,下游的 InputChannel 沒(méi)有可用的緩沖區(qū)了许赃,所以會(huì)向上游反饋 credit = 0止喷,然后上游就不會(huì)發(fā)送數(shù)據(jù)到 Netty。所以基于 Credit 的反壓策略不會(huì)導(dǎo)致 Netty 和 Socket 的數(shù)據(jù)積壓混聊。當(dāng)然上游也不會(huì)一直不發(fā)送數(shù)據(jù)到下游弹谁,上游會(huì)定期地僅發(fā)送 backlog size 給下游,直到下游反饋 credit > 0 時(shí)句喜,上游就會(huì)繼續(xù)發(fā)送真正的數(shù)據(jù)到下游了预愤。
基于 Credit 的反壓機(jī)制還帶來(lái)了一個(gè)優(yōu)勢(shì):由于我們?cè)诎l(fā)送方和接收方之間緩存較少的數(shù)據(jù),可能會(huì)更早地將反壓反饋給上游咳胃,緩沖更多數(shù)據(jù)只是把數(shù)據(jù)緩沖在內(nèi)存中植康,并沒(méi)有提高處理性能。
Flink 如何在吞吐量和延遲之間做權(quán)衡展懈?
Flink 天然支持流式處理销睁,即每來(lái)一條數(shù)據(jù)就能處理一條,而不是像 Spark Streaming 一樣存崖,完全是微批處理冻记。但是為了提高吞吐量,默認(rèn)使用的 Flink 并不是每來(lái)一條數(shù)據(jù)就處理一條来惧。那這個(gè)到底是怎么控制的呢冗栗?
我們分析了上述的網(wǎng)絡(luò)傳輸后,知道每個(gè) SubTask 輸出的數(shù)據(jù)并不是直接輸出到下游违寞,而是在 ResultSubPartition 中有一個(gè) Buffer 用來(lái)緩存一批數(shù)據(jù)后,再 Flush 到 Netty 發(fā)送到下游 SubTask偶房。那到底哪些情況會(huì)觸發(fā) Buffer Flush 到 Netty 呢趁曼?
Buffer 變滿時(shí)
Buffer timeout 時(shí)
特殊事件來(lái)臨時(shí),例如:CheckPoint 的 barrier 來(lái)臨時(shí)
Flink 在數(shù)據(jù)傳輸時(shí)棕洋,會(huì)把數(shù)據(jù)序列化成二進(jìn)制然后寫(xiě)到 Buffer 中挡闰,當(dāng) Buffer 滿了,需要 Flush(默認(rèn)為32KiB,通過(guò)taskmanager.memory.segment-size設(shè)置)摄悯。但是當(dāng)流量低峰或者測(cè)試環(huán)節(jié)赞季,可能1分鐘都沒(méi)有 32 KB的數(shù)據(jù),就會(huì)導(dǎo)致1分鐘內(nèi)的數(shù)據(jù)都積攢在 Buffer 中不會(huì)發(fā)送到下游 Task 去處理奢驯,從而導(dǎo)致數(shù)據(jù)出現(xiàn)延遲申钩,這并不是我們想看到的。所以 Flink 有一個(gè) Buffer timeout 的策略瘪阁,意思是當(dāng)數(shù)據(jù)量比較少撒遣,Buffer 一直沒(méi)有變滿時(shí),后臺(tái)的 Output flusher 線程會(huì)強(qiáng)制地將 Buffer 中的數(shù)據(jù) Flush 到下游管跺。Flink 中默認(rèn) timeout 時(shí)間是 100ms义黎,即:Buffer 中的數(shù)據(jù)要么變滿時(shí) Flush,要么最多等 100ms 也會(huì) Flush 來(lái)保證數(shù)據(jù)不會(huì)出現(xiàn)很大的延遲豁跑。當(dāng)然這個(gè)可以通過(guò) env.setBufferTimeout(timeoutMillis)
來(lái)控制超時(shí)時(shí)間廉涕。
- timeoutMillis > 0 表示最長(zhǎng)等待 timeoutMillis 時(shí)間,就會(huì)flush
- timeoutMillis = 0 表示每條數(shù)據(jù)都會(huì)觸發(fā) flush艇拍,直接將數(shù)據(jù)發(fā)送到下游狐蜕,相當(dāng)于沒(méi)有Buffer了(避免設(shè)置為0,可能導(dǎo)致性能下降)
- timeoutMillis = -1 表示只有等到 buffer滿了或 CheckPoint的時(shí)候淑倾,才會(huì)flush馏鹤。相當(dāng)于取消了 timeout 策略
嚴(yán)格來(lái)講,Output flusher 不提供任何保證——它只向 Netty 發(fā)送通知娇哆,而 Netty 線程會(huì)按照能力與意愿進(jìn)行處理湃累。這也意味著如果存在反壓,則 Output flusher 是無(wú)效的碍讨。言外之意治力,如果反壓很?chē)?yán)重,下游 Buffer 都滿了勃黍,當(dāng)然不能強(qiáng)制一直往下游發(fā)數(shù)據(jù)宵统。
一些特殊的消息如果通過(guò) RecordWriter 發(fā)送,也會(huì)觸發(fā)立即 Flush 緩存的數(shù)據(jù)覆获。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件马澈,這些事件應(yīng)該盡快被發(fā)送,而不應(yīng)該等待 Buffer 被填滿或者 Output flusher 的下一次 Flush弄息。當(dāng)然如果出現(xiàn)反壓痊班,CheckPoint barrier 也會(huì)等待,不能發(fā)送到下游摹量。
引入 Network buffers 以獲得更高的資源利用率和更高的吞吐量涤伐,代價(jià)是讓一些記錄在 Buffer 中等待一段時(shí)間馒胆。雖然可以通過(guò)緩沖區(qū)超時(shí)給出此等待時(shí)間的上限,但你可能知道有關(guān)這兩個(gè)維度(延遲和吞吐量)之間權(quán)衡的更多信息:顯然凝果,無(wú)法同時(shí)獲得這兩者祝迂。下圖是 Flink 官網(wǎng)的博客展示的不同的 buffer timeout 下對(duì)應(yīng)的吞吐量,從0毫秒開(kāi)始(每個(gè)記錄都 flush)到100毫秒(默認(rèn)值)器净,測(cè)試在具有 100 個(gè)節(jié)點(diǎn)每個(gè)節(jié)點(diǎn) 8 個(gè) Slot 的群集上運(yùn)行型雳,每個(gè)節(jié)點(diǎn)運(yùn)行沒(méi)有業(yè)務(wù)邏輯的 Task,因此只用于測(cè)試網(wǎng)絡(luò)協(xié)議棧掌动。為了進(jìn)行比較四啰,還測(cè)試了低延遲改進(jìn)之前的 Flink 1.4 版本。
如圖粗恢,使用 Flink 1.5+柑晒,即使是非常低的 Buffer timeout(例如1ms,對(duì)于低延遲場(chǎng)景)也提供高達(dá)超時(shí)默認(rèn)參數(shù)(100ms)75% 的最大吞吐眷射,但會(huì)緩存更少的數(shù)據(jù)匙赞。但是筆者仍然不理解為什么 timeout 設(shè)置為0時(shí),吞吐量竟然能比 Flink 1.4 的吞吐量提高那么多妖碉。Credit 只是解決了反壓的問(wèn)題涌庭,并不能優(yōu)化低延遲的吞吐量。楊華老師的回答是網(wǎng)絡(luò)協(xié)議棧做了其他優(yōu)化而且性能測(cè)試是在特定場(chǎng)景下做的欧宜。筆者后續(xù)會(huì)繼續(xù)深入學(xué)習(xí)研究 Flink 網(wǎng)絡(luò)通信來(lái)解決筆者目前的疑問(wèn)坐榆。
參考文獻(xiàn):
flink-china系列課程----2.7 Flink網(wǎng)絡(luò)流控及反壓剖析
Flink 官網(wǎng)兩篇關(guān)于 Flink 網(wǎng)絡(luò)協(xié)議棧的博客:
A Deep-Dive into Flink's Network Stack
Flink Network Stack Vol. 2: Monitoring, Metrics, and that Backpressure Thing