Flink SQL 性能調(diào)優(yōu)--MiniBatch提升吞吐率

Flink 在設(shè)計(jì)和實(shí)現(xiàn)流計(jì)算算子時(shí),把“面向狀態(tài)編程”作為第一準(zhǔn)則职烧。因?yàn)樵诹饔?jì)算中涩盾,為了保證狀態(tài)(State)的一致性,需要將狀態(tài)數(shù)據(jù)存儲(chǔ)在狀態(tài)后端(StateBackend)丧蘸,由框架來做分布式快照。而目前主要使用的RocksDB狀態(tài)后端會(huì)在每次read和write操作時(shí)發(fā)生序列化和反序列化操作遥皂,甚至是磁盤的 I/O 操作力喷。因此狀態(tài)的相關(guān)操作通常都會(huì)成為整個(gè)任務(wù)的性能瓶頸,狀態(tài)的數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)以及對(duì)狀態(tài)的每一次訪問都需要特別注意演训。

微批(MiniBatch)的核心思想就是緩存一小批數(shù)據(jù)弟孟,在訪問狀態(tài)狀態(tài)時(shí),同key 的數(shù)據(jù)就只需要發(fā)生一次狀態(tài)的操作样悟。當(dāng)批次內(nèi)數(shù)據(jù)的 key 重復(fù)率較大時(shí)拂募,能顯著降低對(duì)狀態(tài)的訪問頻次庭猩,從而大幅提高吞吐。

下圖說明了MiniBatch如何減少狀態(tài)操作陈症。

minibatch_agg

當(dāng)未開啟 MiniBatch 時(shí)蔼水,Aggregate 的處理模式是每來一條數(shù)據(jù),查詢一次狀態(tài)录肯,進(jìn)行聚合計(jì)算趴腋,然后寫入一次狀態(tài)。當(dāng)有 4條數(shù)據(jù)時(shí)论咏,需要操作 2*4 次狀態(tài)

當(dāng)開啟 MiniBatch 時(shí)优炬,對(duì)于緩存下來的 N 條數(shù)據(jù)一起觸發(fā),同 key 的數(shù)據(jù)只會(huì)讀寫狀態(tài)一次厅贪。例如下緩存的 4 條 A 的記錄蠢护,只會(huì)對(duì)狀態(tài)讀寫各一次。所以當(dāng)數(shù)據(jù)的 key 的重復(fù)率越大卦溢,攢批的大小越大糊余,那么對(duì)狀態(tài)的訪問會(huì)越少,得到的吞吐量越高单寂。

適用場景

微批處理通過增加延遲換取高吞吐,如果您有超低延遲的要求吐辙,不建議開啟微批處理宣决。通常對(duì)于聚合的場景,微批處理可以顯著的提升系統(tǒng)性能昏苏,建議開啟尊沸。

開啟方式

MiniBatch默認(rèn)關(guān)閉,開啟方式如下:

// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task

MiniBatch生效的SQL語句

MiniBatch主要作用于聚合(Group By)語句中贤惯,且不帶window的場景洼专。

window agg

select count(a) from t group by tumble(ts, interval ’10’ second), b

以10秒翻轉(zhuǎn)窗口和字段b聚合,該場景MiniBatch不生效

group agg

select count(a) from t group by b

以字段a聚合孵构,該場景MiniBatch生效

over agg

select count(a) over (partition by b order by c) from t 

over window屁商,該場景MiniBatch不生效

MiniBatch二種聚合優(yōu)化

Local-Global 聚合

LocalGlobal優(yōu)化將原先的Aggregate分成Local+Global兩階段聚合,即MapReduce模型中的Combine+Reduce處理模式颈墅。

  • 第一階段在上游節(jié)點(diǎn)本地?cái)€一批數(shù)據(jù)進(jìn)行聚合(localAgg)蜡镶,并輸出這次微批的增量值(Accumulator)。
  • 第二階段再將收到的Accumulator合并(Merge)恤筛,得到最終的結(jié)果(GlobalAgg)官还。

LocalGlobal本質(zhì)上能夠靠localAgg的預(yù)聚合篩除部分傾斜數(shù)據(jù),從而降低globalAgg的熱點(diǎn)毒坛,提升性能望伦。

可以結(jié)合下圖及SQL理解LocalGlobal如何解決數(shù)據(jù)傾斜的問題林说。

SELECT color, sum(id)
FROM T
GROUP BY color
local_agg
  • 使用場景:

    • LocalGlobal適用于提升如SUM、COUNT屯伞、MAX腿箩、MIN和AVG等普通聚合的性能,能提高算子吞吐量愕掏,也能有效解決常見數(shù)據(jù)熱點(diǎn)問題度秘。
  • 配置:

    • table.optimizer.agg-phase-strategy開啟(默認(rèn)值已為AUTO開啟,所以不用配置)

    • // instantiate table environment
      TableEnvironment tEnv = ...
      
      // access flink configuration
      Configuration configuration = tEnv.getConfig().getConfiguration();
      // set low-level key-value options
      configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
      configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
      configuration.setString("table.exec.mini-batch.size", "5000");
      configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
      

Partial-Final聚合

Local-Global聚合能針對(duì)常見普通聚合有較好的效果(如SUM饵撑、COUNT剑梳、MAX、MIN和AVG)滑潘。但是對(duì)于COUNT DISTINCT效果并不明顯垢乙。COUNT DISTINCT在local聚合時(shí),對(duì)于DISTINCT KEY的去重率不高语卤,導(dǎo)致在Global節(jié)點(diǎn)仍然存在熱點(diǎn)追逮。

PartialFinal優(yōu)化會(huì)自動(dòng)打散成兩層聚合,增加按distinct key取模的打散層粹舵,解決COUNT DISTINCT的熱點(diǎn)問題钮孵。

可以結(jié)合下圖及SQL理解Partial-Final如何解決COUNT DISTINCT熱點(diǎn)問題。

-- 原始SQL
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

-- PartialFinal優(yōu)化后SQL
SELECT day, SUM(cnt)
FROM (
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
distinct_split
  • 使用場景:

    • PartialFinal優(yōu)化方法適用于使用COUNT DISTINCT且聚合節(jié)點(diǎn)性能無法滿足時(shí)眼滤。
    • PartialFinal優(yōu)化方法不能在包含UDAF的Flink SQL中使用巴席。
  • 配置:

    • table.optimizer.distinct-agg.split.enabled開啟(默認(rèn)值已為false,需要設(shè)置為true)

    • table.optimizer.distinct-agg.split.bucket-num(默認(rèn)值1024诅需,可以根據(jù)業(yè)務(wù)數(shù)據(jù)量和熱點(diǎn)情況漾唉,設(shè)置這個(gè)取模值)

    • // instantiate table environment
      TableEnvironment tEnv = ...
      
      tEnv.getConfig()        // access high-level configuration
        .getConfiguration()   // set low-level key-value options
        .setString("table.optimizer.distinct-agg.split.enabled", "true");  // enable distinct agg split
      
  • 建議: 數(shù)據(jù)量不大的情況下不建議使用PartialFinal優(yōu)化方法。PartialFinal優(yōu)化會(huì)自動(dòng)打散成兩層聚合堰塌,引入額外的網(wǎng)絡(luò)Shuffle赵刑,在數(shù)據(jù)量不大的情況下,可能反而會(huì)浪費(fèi)資源场刑。

數(shù)據(jù)抖動(dòng)現(xiàn)象

所謂數(shù)據(jù)抖動(dòng)問題是指般此,兩層 AGG 時(shí),第一層 AGG 發(fā)出的更新消息會(huì)拆成兩條獨(dú)立的消息被下游消費(fèi)摇邦,分別是retract 消息和 accumulate 消息恤煞。而當(dāng)?shù)诙?AGG 消費(fèi)這兩條消息時(shí)也會(huì)發(fā)出兩條消息。從前端看到就是數(shù)據(jù)會(huì)有抖動(dòng)的現(xiàn)象施籍。例如下面的例子居扒,統(tǒng)計(jì)買家數(shù),這里做了兩層打散丑慎,第一層先做 UV 統(tǒng)計(jì)喜喂,第二級(jí)做SUM瓤摧。

SELECT day, SUM(cnt) total
FROM (
  SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
  FROM T GROUP BY day, MOD(buy_id, 1024))
GROUP BY day

當(dāng)?shù)谝粚觕ount distinct的結(jié)果從100上升到101時(shí),它會(huì)發(fā)出 -100, +101 的兩條消息玉吁。當(dāng)?shù)诙拥?SUM 會(huì)依次收到這兩條消息并處理照弥,假設(shè)此時(shí) SUM 值是 900,那么在處理 -100 時(shí)进副,會(huì)先發(fā)出 800 的結(jié)果值这揣,然后處理 +101 時(shí),再發(fā)出 901 的結(jié)果值影斑。從用戶端的感受就是買家數(shù)從 900 降到了 800 又上升到了 901给赞,我們稱之為數(shù)據(jù)抖動(dòng)。而理論上買家數(shù)只應(yīng)該只增不減的矫户,所以我們也一直在思考如何解決這個(gè)問題片迅。

數(shù)據(jù)抖動(dòng)的本質(zhì)原因是 retract 和 accumulate 消息是一個(gè)事務(wù)中的兩個(gè)操作,但是這兩個(gè)操作的中間結(jié)果被用戶看到了皆辽,也就是傳統(tǒng)數(shù)據(jù)庫 ACID 中的隔離性中最弱的 READ UNCOMMITTED 的事務(wù)保障柑蛇。要從根本上解決這個(gè)問題的思路是,如何原子地處理 retract & accumulate 的消息驱闷。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末耻台,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子空另,更是在濱河造成了極大的恐慌粘我,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,490評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痹换,死亡現(xiàn)場離奇詭異,居然都是意外死亡都弹,警方通過查閱死者的電腦和手機(jī)娇豫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來畅厢,“玉大人冯痢,你說我怎么就攤上這事】蚨牛” “怎么了浦楣?”我有些...
    開封第一講書人閱讀 165,830評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長咪辱。 經(jīng)常有香客問我振劳,道長,這世上最難降的妖魔是什么油狂? 我笑而不...
    開封第一講書人閱讀 58,957評(píng)論 1 295
  • 正文 為了忘掉前任历恐,我火速辦了婚禮寸癌,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘弱贼。我一直安慰自己蒸苇,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,974評(píng)論 6 393
  • 文/花漫 我一把揭開白布吮旅。 她就那樣靜靜地躺著溪烤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪庇勃。 梳的紋絲不亂的頭發(fā)上檬嘀,一...
    開封第一講書人閱讀 51,754評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音匪凉,去河邊找鬼枪眉。 笑死,一個(gè)胖子當(dāng)著我的面吹牛再层,可吹牛的內(nèi)容都是我干的贸铜。 我是一名探鬼主播,決...
    沈念sama閱讀 40,464評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼聂受,長吁一口氣:“原來是場噩夢啊……” “哼蒿秦!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蛋济,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤棍鳖,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后碗旅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體渡处,經(jīng)...
    沈念sama閱讀 45,847評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,995評(píng)論 3 338
  • 正文 我和宋清朗相戀三年祟辟,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了医瘫。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,137評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡旧困,死狀恐怖醇份,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情吼具,我是刑警寧澤僚纷,帶...
    沈念sama閱讀 35,819評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站拗盒,受9級(jí)特大地震影響怖竭,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜锣咒,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,482評(píng)論 3 331
  • 文/蒙蒙 一侵状、第九天 我趴在偏房一處隱蔽的房頂上張望赞弥。 院中可真熱鬧,春花似錦趣兄、人聲如沸绽左。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拼窥。三九已至,卻和暖如春蹋凝,著一層夾襖步出監(jiān)牢的瞬間鲁纠,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評(píng)論 1 272
  • 我被黑心中介騙來泰國打工鳍寂, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留改含,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,409評(píng)論 3 373
  • 正文 我出身青樓迄汛,卻偏偏與公主長得像捍壤,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子鞍爱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,086評(píng)論 2 355