Flink 原理與實現(xiàn):如何處理反壓問題

流處理系統(tǒng)需要能優(yōu)雅地處理反壓(backpressure)問題春瞬。反壓通常產(chǎn)生于這樣的場景:短時負載高峰導致系統(tǒng)接收數(shù)據(jù)的速率遠高于它處理數(shù)據(jù)的速率柴信。許多日常問題都會導致反壓,例如宽气,垃圾回收停頓可能會導致流入的數(shù)據(jù)快速堆積随常,或者遇到大促或秒殺活動導致流量陡增。反壓如果不能得到正確的處理萄涯,可能會導致資源耗盡甚至系統(tǒng)崩潰绪氛。

目前主流的流處理系統(tǒng) Storm/JStorm/Spark Streaming/Flink 都已經(jīng)提供了反壓機制,不過其實現(xiàn)各不相同涝影。

Storm 是通過監(jiān)控 Bolt 中的接收隊列負載情況枣察,如果超過高水位值就會將反壓信息寫到 Zookeeper ,Zookeeper 上的 watch 會通知該拓撲的所有 Worker 都進入反壓狀態(tài),最后 Spout 停止發(fā)送 tuple序目。具體實現(xiàn)可以看這個 JIRA STORM-886臂痕。

JStorm 認為直接停止 Spout 的發(fā)送太過暴力,存在大量問題宛琅。當下游出現(xiàn)阻塞時刻蟹,上游停止發(fā)送,下游消除阻塞后嘿辟,上游又開閘放水舆瘪,過了一會兒,下游又阻塞红伦,上游又限流英古,如此反復,整個數(shù)據(jù)流會一直處在一個顛簸狀態(tài)昙读。所以 JStorm 是通過逐級降速來進行反壓的召调,效果會較 Storm 更為穩(wěn)定,但算法也更復雜蛮浑。另外 JStorm 沒有引入 Zookeeper 而是通過 TopologyMaster 來協(xié)調(diào)拓撲進入反壓狀態(tài)唠叛,這降低了 Zookeeper 的負載。

1. Flink 中的反壓

那么 Flink 是怎么處理反壓的呢沮稚?答案非常簡單:Flink 沒有使用任何復雜的機制來解決反壓問題艺沼,因為根本不需要那樣的方案!它利用自身作為純數(shù)據(jù)流引擎的優(yōu)勢來優(yōu)雅地響應反壓問題蕴掏。下面我們會深入分析 Flink 是如何在 Task 之間傳輸數(shù)據(jù)的障般,以及數(shù)據(jù)流如何實現(xiàn)自然降速的。

Flink 在運行時主要由 operatorsstreams 兩大組件構(gòu)成盛杰。每個 operator 會消費中間態(tài)的流挽荡,并在流上進行轉(zhuǎn)換,然后生成新的流即供。對于 Flink 的網(wǎng)絡機制一種形象的類比是定拟,F(xiàn)link 使用了高效有界的分布式阻塞隊列,就像 Java 通用的阻塞隊列(BlockingQueue)一樣逗嫡。還記得經(jīng)典的線程間通信案例:生產(chǎn)者消費者模型嗎办素?使用 BlockingQueue 的話,一個較慢的接受者會降低發(fā)送者的發(fā)送速率祸穷,因為一旦隊列滿了(有界隊列)發(fā)送者會被阻塞。Flink 解決反壓的方案就是這種感覺勺三。

在 Flink 中雷滚,這些分布式阻塞隊列就是這些邏輯流,而隊列容量是通過緩沖池(LocalBufferPool)來實現(xiàn)的吗坚。每個被生產(chǎn)和被消費的流都會被分配一個緩沖池祈远。緩沖池管理著一組緩沖(Buffer)呆万,緩沖在被消費后可以被回收循環(huán)利用。這很好理解:你從池子中拿走一個緩沖车份,填上數(shù)據(jù)谋减,在數(shù)據(jù)消費完之后,又把緩沖還給池子扫沼,之后你可以再次使用它出爹。

在解釋 Flink 的反壓原理之前,我們必須先對 Flink 中網(wǎng)絡傳輸?shù)膬?nèi)存管理有個了解缎除。

1.1 網(wǎng)絡傳輸中的內(nèi)存管理

如下圖所示展示了 Flink 在網(wǎng)絡傳輸場景下的內(nèi)存管理严就。網(wǎng)絡上傳輸?shù)臄?shù)據(jù)會寫到 Task 的 InputGate(IG) 中,經(jīng)過 Task 的處理后器罐,再由 Task 寫到 ResultPartition(RS) 中梢为。每個 Task 都包括了輸入和輸入,輸入和輸出的數(shù)據(jù)存在 Buffer 中(都是字節(jié)數(shù)據(jù))轰坊。Buffer 是 MemorySegment 的包裝類铸董。

  1. TaskManager(TM)在啟動時,會先初始化NetworkEnvironment對象肴沫,TM 中所有與網(wǎng)絡相關(guān)的東西都由該類來管理(如 Netty 連接)粟害,其中就包括NetworkBufferPool。根據(jù)配置樊零,F(xiàn)link 會在 NetworkBufferPool 中生成一定數(shù)量(默認2048)的內(nèi)存塊 MemorySegment(關(guān)于 Flink 的內(nèi)存管理我磁,后續(xù)文章會詳細談到),內(nèi)存塊的總數(shù)量就代表了網(wǎng)絡傳輸中所有可用的內(nèi)存驻襟。NetworkEnvironment 和 NetworkBufferPool 是 Task 之間共享的夺艰,每個 TM 只會實例化一個。

  2. Task 線程啟動時沉衣,會向 NetworkEnvironment 注冊郁副,NetworkEnvironment 會為 Task 的 InputGate(IG)和 ResultPartition(RP) 分別創(chuàng)建一個 LocalBufferPool(緩沖池)并設置可申請的 MemorySegment(內(nèi)存塊)數(shù)量。IG 對應的緩沖池初始的內(nèi)存塊數(shù)量與 IG 中 InputChannel 數(shù)量一致豌习,RP 對應的緩沖池初始的內(nèi)存塊數(shù)量與 RP 中的 ResultSubpartition 數(shù)量一致存谎。不過,每當創(chuàng)建或銷毀緩沖池時肥隆,NetworkBufferPool 會計算剩余空閑的內(nèi)存塊數(shù)量既荚,并平均分配給已創(chuàng)建的緩沖池。注意栋艳,這個過程只是指定了緩沖池所能使用的內(nèi)存塊數(shù)量恰聘,并沒有真正分配內(nèi)存塊,只有當需要時才分配。為什么要動態(tài)地為緩沖池擴容呢晴叨?因為內(nèi)存越多凿宾,意味著系統(tǒng)可以更輕松地應對瞬時壓力(如GC),不會頻繁地進入反壓狀態(tài)兼蕊,所以我們要利用起那部分閑置的內(nèi)存塊初厚。

  3. 在 Task 線程執(zhí)行過程中,當 Netty 接收端收到數(shù)據(jù)時孙技,為了將 Netty 中的數(shù)據(jù)拷貝到 Task 中产禾,InputChannel(實際是 RemoteInputChannel)會向其對應的緩沖池申請內(nèi)存塊(上圖中的①)。如果緩沖池中也沒有可用的內(nèi)存塊且已申請的數(shù)量還沒到池子上限绪杏,則會向 NetworkBufferPool 申請內(nèi)存塊(上圖中的②)并交給 InputChannel 填上數(shù)據(jù)(上圖中的③和④)下愈。如果緩沖池已申請的數(shù)量達到上限了呢?或者 NetworkBufferPool 也沒有可用內(nèi)存塊了呢蕾久?這時候势似,Task 的 Netty Channel 會暫停讀取,上游的發(fā)送端會立即響應停止發(fā)送僧著,拓撲會進入反壓狀態(tài)履因。當 Task 線程寫數(shù)據(jù)到 ResultPartition 時,也會向緩沖池請求內(nèi)存塊盹愚,如果沒有可用內(nèi)存塊時栅迄,會阻塞在請求內(nèi)存塊的地方,達到暫停寫入的目的皆怕。

  4. 當一個內(nèi)存塊被消費完成之后(在輸入端是指內(nèi)存塊中的字節(jié)被反序列化成對象了毅舆,在輸出端是指內(nèi)存塊中的字節(jié)寫入到 Netty Channel 了),會調(diào)用 Buffer.recycle() 方法愈腾,會將內(nèi)存塊還給 LocalBufferPool (上圖中的⑤)憋活。如果LocalBufferPool中當前申請的數(shù)量超過了池子容量(由于上文提到的動態(tài)容量,由于新注冊的 Task 導致該池子容量變惺啤)悦即,則LocalBufferPool會將該內(nèi)存塊回收給 NetworkBufferPool(上圖中的⑥)。如果沒超過池子容量橱乱,則會繼續(xù)留在池子中辜梳,減少反復申請的開銷。

1.2 反壓的過程

下面這張圖簡單展示了兩個 Task 之間的數(shù)據(jù)傳輸以及 Flink 如何感知到反壓的:

  1. 記錄“A”進入了 Flink 并且被 Task 1 處理泳叠。(這里省略了 Netty 接收作瞄、反序列化等過程)
  2. 記錄被序列化到 buffer 中。
  3. 該 buffer 被發(fā)送到 Task 2危纫,然后 Task 2 從這個 buffer 中讀出記錄宗挥。

不要忘了:記錄能被 Flink 處理的前提是节预,必須有空閑可用的 Buffer。

結(jié)合上面兩張圖看:Task 1 在輸出端有一個相關(guān)聯(lián)的 LocalBufferPool(稱緩沖池1)属韧,Task 2 在輸入端也有一個相關(guān)聯(lián)的 LocalBufferPool(稱緩沖池2)。如果緩沖池1中有空閑可用的 buffer 來序列化記錄 “A”蛤吓,我們就序列化并發(fā)送該 buffer宵喂。

這里我們需要注意兩個場景:

  • 本地傳輸:如果 Task 1 和 Task 2 運行在同一個 worker 節(jié)點(TaskManager),該 buffer 可以直接交給下一個 Task会傲。一旦 Task 2 消費了該 buffer锅棕,則該 buffer 會被緩沖池1回收。如果 Task 2 的速度比 1 慢淌山,那么 buffer 回收的速度就會趕不上 Task 1 取 buffer 的速度裸燎,導致緩沖池1無可用的 buffer,Task 1 等待在可用的 buffer 上泼疑。最終形成 Task 1 的降速德绿。
  • 遠程傳輸:如果 Task 1 和 Task 2 運行在不同的 worker 節(jié)點上,那么 buffer 會在發(fā)送到網(wǎng)絡(TCP Channel)后被回收退渗。在接收端移稳,會從 LocalBufferPool 中申請 buffer,然后拷貝網(wǎng)絡中的數(shù)據(jù)到 buffer 中会油。如果沒有可用的 buffer个粱,會停止從 TCP 連接中讀取數(shù)據(jù)。在輸出端翻翩,通過 Netty 的水位值機制來保證不往網(wǎng)絡中寫入太多數(shù)據(jù)(后面會說)都许。如果網(wǎng)絡中的數(shù)據(jù)(Netty輸出緩沖中的字節(jié)數(shù))超過了高水位值,我們會等到其降到低水位值以下才繼續(xù)寫入數(shù)據(jù)嫂冻。這保證了網(wǎng)絡中不會有太多的數(shù)據(jù)胶征。如果接收端停止消費網(wǎng)絡中的數(shù)據(jù)(由于接收端緩沖池沒有可用 buffer),網(wǎng)絡中的緩沖數(shù)據(jù)就會堆積絮吵,那么發(fā)送端也會暫停發(fā)送弧烤。另外,這會使得發(fā)送端的緩沖池得不到回收蹬敲,writer 阻塞在向 LocalBufferPool 請求 buffer暇昂,阻塞了 writer 往 ResultSubPartition 寫數(shù)據(jù)。

這種固定大小緩沖池就像阻塞隊列一樣伴嗡,保證了 Flink 有一套健壯的反壓機制急波,使得 Task 生產(chǎn)數(shù)據(jù)的速度不會快于消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的數(shù)據(jù)傳輸自然地擴展到更復雜的 pipeline 中瘪校,保證反壓機制可以擴散到整個 pipeline

1.3 Netty 水位值機制

下方的代碼是初始化 NettyServer 時配置的水位值參數(shù)澄暮。

// 默認高水位值為2個buffer大小, 當接收端消費速度跟不上名段,發(fā)送端會立即感知到
bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, config.getMemorySegmentSize() + 1);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 2 * config.getMemorySegmentSize());

當輸出緩沖中的字節(jié)數(shù)超過了高水位值, 則 Channel.isWritable() 會返回false。當輸出緩存中的字節(jié)數(shù)又掉到了低水位值以下, 則 Channel.isWritable() 會重新返回true泣懊。Flink 中發(fā)送數(shù)據(jù)的核心代碼在 PartitionRequestQueue 中伸辟,該類是 server channel pipeline 的最后一層。發(fā)送數(shù)據(jù)關(guān)鍵代碼如下所示馍刮。

private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
  if (fatalError) {
    return;
  }

  Buffer buffer = null;

  try {
    // channel.isWritable() 配合 WRITE_BUFFER_LOW_WATER_MARK 
    // 和 WRITE_BUFFER_HIGH_WATER_MARK 實現(xiàn)發(fā)送端的流量控制
    if (channel.isWritable()) {
      // 注意: 一個while循環(huán)也就最多只發(fā)送一個BufferResponse, 連續(xù)發(fā)送BufferResponse是通過writeListener回調(diào)實現(xiàn)的
      while (true) {
        if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
          return;
        }

        buffer = currentPartitionQueue.getNextBuffer();

        if (buffer == null) {
          // 跳過這部分代碼
          ...
        }
        else {
          // 構(gòu)造一個response返回給客戶端
          BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(), currentPartitionQueue.getReceiverId());

          if (!buffer.isBuffer() &&
              EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
            // 跳過這部分代碼信夫。batch 模式中 subpartition 的數(shù)據(jù)準備就緒,通知下游消費者卡啰。
            ...
          }

          // 將該response發(fā)到netty channel, 當寫成功后, 
          // 通過注冊的writeListener又會回調(diào)進來, 從而不斷地消費 queue 中的請求
          channel.writeAndFlush(resp).addListener(writeListener);

          return;
        }
      }
    }
  }
  catch (Throwable t) {
    if (buffer != null) {
      buffer.recycle();
    }

    throw new IOException(t.getMessage(), t);
  }
}

// 當水位值降下來后(channel 再次可寫)静稻,會重新觸發(fā)發(fā)送函數(shù)
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  writeAndFlushNextMessageIfPossible(ctx.channel());
}

核心發(fā)送方法中如果channel不可寫,則會跳過發(fā)送匈辱。當channel再次可寫后振湾,Netty 會調(diào)用該Handle的 channelWritabilityChanged 方法,從而重新觸發(fā)發(fā)送函數(shù)亡脸。

1.4. 反壓實驗

另外押搪,官方博客中為了展示反壓的效果,給出了一個簡單的實驗梗掰。下面這張圖顯示了:隨著時間的改變嵌言,生產(chǎn)者(黃色線)和消費者(綠色線)每5秒的平均吞吐與最大吞吐(在單一JVM中每秒達到8百萬條記錄)的百分比。我們通過衡量task每5秒鐘處理的記錄數(shù)來衡量平均吞吐及穗。該實驗運行在單 JVM 中摧茴,不過使用了完整的 Flink 功能棧。

首先埂陆,我們運行生產(chǎn)task到它最大生產(chǎn)速度的60%(我們通過Thread.sleep()來模擬降速)苛白。消費者以同樣的速度處理數(shù)據(jù)。然后焚虱,我們將消費task的速度降至其最高速度的30%购裙。你就會看到背壓問題產(chǎn)生了,正如我們所見鹃栽,生產(chǎn)者的速度也自然降至其最高速度的30%躏率。接著,停止消費task的人為降速民鼓,之后生產(chǎn)者和消費者task都達到了其最大的吞吐薇芝。接下來,我們再次將消費者的速度降至30%丰嘉,pipeline給出了立即響應:生產(chǎn)者的速度也被自動降至30%夯到。最后,我們再次停止限速饮亏,兩個task也再次恢復100%的速度耍贾≡乃總而言之,我們可以看到:生產(chǎn)者和消費者在 pipeline 中的處理都在跟隨彼此的吞吐而進行適當?shù)恼{(diào)整荐开,這就是我們希望看到的反壓的效果付翁。

2. 反壓監(jiān)控

在 Storm/JStorm 中,只要監(jiān)控到隊列滿了晃听,就可以記錄下拓撲進入反壓了胆敞。但是 Flink 的反壓太過于天然了,導致我們無法簡單地通過監(jiān)控隊列來監(jiān)控反壓狀態(tài)杂伟。Flink 在這里使用了一個 trick 來實現(xiàn)對反壓的監(jiān)控。如果一個 Task 因為反壓而降速了仍翰,那么它會卡在向 LocalBufferPool 申請內(nèi)存塊上赫粥。那么這時候,該 Task 的 stack trace 就會長下面這樣:

java.lang.Object.wait(Native Method)
o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING request
[...]

那么事情就簡單了予借。通過不斷地采樣每個 task 的 stack trace 就可以實現(xiàn)反壓監(jiān)控越平。

Flink 的實現(xiàn)中,只有當 Web 頁面切換到某個 Job 的 Backpressure 頁面灵迫,才會對這個 Job 觸發(fā)反壓檢測秦叛,因為反壓檢測還是挺昂貴的。JobManager 會通過 Akka 給每個 TaskManager 發(fā)送TriggerStackTraceSample消息瀑粥。默認情況下挣跋,TaskManager 會觸發(fā)100次 stack trace 采樣,每次間隔 50ms(也就是說一次反壓檢測至少要等待5秒鐘)狞换。并將這 100 次采樣的結(jié)果返回給 JobManager避咆,由 JobManager 來計算反壓比率(反壓出現(xiàn)的次數(shù)/采樣的次數(shù)),最終展現(xiàn)在 UI 上修噪。UI 刷新的默認周期是一分鐘查库,目的是不對 TaskManager 造成太大的負擔。

3. 總結(jié)

Flink 不需要一種特殊的機制來處理反壓黄琼,因為 Flink 中的數(shù)據(jù)傳輸相當于已經(jīng)提供了應對反壓的機制樊销。因此,F(xiàn)link 所能獲得的最大吞吐量由其 pipeline 中最慢的組件決定脏款。相對于 Storm/JStorm 的實現(xiàn)围苫,F(xiàn)link 的實現(xiàn)更為簡潔優(yōu)雅,源碼中也看不見與反壓相關(guān)的代碼弛矛,無需 Zookeeper/TopologyMaster 的參與也降低了系統(tǒng)的負載够吩,也利于對反壓更迅速的響應。

轉(zhuǎn)載http://wuchong.me/blog/2016/04/26/flink-internals-how-to-handle-backpressure/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末丈氓,一起剝皮案震驚了整個濱河市周循,隨后出現(xiàn)的幾起案子强法,更是在濱河造成了極大的恐慌,老刑警劉巖湾笛,帶你破解...
    沈念sama閱讀 216,919評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件饮怯,死亡現(xiàn)場離奇詭異,居然都是意外死亡嚎研,警方通過查閱死者的電腦和手機蓖墅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,567評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來临扮,“玉大人论矾,你說我怎么就攤上這事「擞拢” “怎么了贪壳?”我有些...
    開封第一講書人閱讀 163,316評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蚜退。 經(jīng)常有香客問我闰靴,道長,這世上最難降的妖魔是什么钻注? 我笑而不...
    開封第一講書人閱讀 58,294評論 1 292
  • 正文 為了忘掉前任蚂且,我火速辦了婚禮,結(jié)果婚禮上幅恋,老公的妹妹穿的比我還像新娘杏死。我一直安慰自己,他們只是感情好捆交,可當我...
    茶點故事閱讀 67,318評論 6 390
  • 文/花漫 我一把揭開白布识埋。 她就那樣靜靜地躺著,像睡著了一般零渐。 火紅的嫁衣襯著肌膚如雪窒舟。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,245評論 1 299
  • 那天诵盼,我揣著相機與錄音惠豺,去河邊找鬼。 笑死风宁,一個胖子當著我的面吹牛洁墙,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播戒财,決...
    沈念sama閱讀 40,120評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼热监,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了饮寞?” 一聲冷哼從身側(cè)響起孝扛,我...
    開封第一講書人閱讀 38,964評論 0 275
  • 序言:老撾萬榮一對情侶失蹤列吼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后苦始,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體寞钥,經(jīng)...
    沈念sama閱讀 45,376評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,592評論 2 333
  • 正文 我和宋清朗相戀三年陌选,在試婚紗的時候發(fā)現(xiàn)自己被綠了理郑。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,764評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡咨油,死狀恐怖您炉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情役电,我是刑警寧澤邻吭,帶...
    沈念sama閱讀 35,460評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站宴霸,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏膏蚓。R本人自食惡果不足惜瓢谢,卻給世界環(huán)境...
    茶點故事閱讀 41,070評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望驮瞧。 院中可真熱鬧氓扛,春花似錦、人聲如沸论笔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,697評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽狂魔。三九已至蒜埋,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間最楷,已是汗流浹背整份。 一陣腳步聲響...
    開封第一講書人閱讀 32,846評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留籽孙,地道東北人烈评。 一個月前我還...
    沈念sama閱讀 47,819評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像犯建,于是被迫代替她去往敵國和親讲冠。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,665評論 2 354