Flink 在設(shè)計(jì)和實(shí)現(xiàn)流計(jì)算算子時(shí),把“面向狀態(tài)編程”作為第一準(zhǔn)則职烧。因?yàn)樵诹饔?jì)算中涩盾,為了保證狀態(tài)(State)的一致性,需要將狀態(tài)數(shù)據(jù)存儲(chǔ)在狀態(tài)后端(StateBackend)丧蘸,由框架來做分布式快照。而目前主要使用的RocksDB狀態(tài)后端會(huì)在每次read和write操作時(shí)發(fā)生序列化和反序列化操作遥皂,甚至是磁盤的 I/O 操作力喷。因此狀態(tài)的相關(guān)操作通常都會(huì)成為整個(gè)任務(wù)的性能瓶頸,狀態(tài)的數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)以及對(duì)狀態(tài)的每一次訪問都需要特別注意演训。
微批(MiniBatch)的核心思想就是緩存一小批數(shù)據(jù)弟孟,在訪問狀態(tài)狀態(tài)時(shí),同key 的數(shù)據(jù)就只需要發(fā)生一次狀態(tài)的操作样悟。當(dāng)批次內(nèi)數(shù)據(jù)的 key 重復(fù)率較大時(shí)拂募,能顯著降低對(duì)狀態(tài)的訪問頻次庭猩,從而大幅提高吞吐。
下圖說明了MiniBatch如何減少狀態(tài)操作陈症。
當(dāng)未開啟 MiniBatch 時(shí)蔼水,Aggregate 的處理模式是每來一條數(shù)據(jù),查詢一次狀態(tài)录肯,進(jìn)行聚合計(jì)算趴腋,然后寫入一次狀態(tài)。當(dāng)有 4條數(shù)據(jù)時(shí)论咏,需要操作 2*4 次狀態(tài)
當(dāng)開啟 MiniBatch 時(shí)优炬,對(duì)于緩存下來的 N 條數(shù)據(jù)一起觸發(fā),同 key 的數(shù)據(jù)只會(huì)讀寫狀態(tài)一次厅贪。例如下緩存的 4 條 A 的記錄蠢护,只會(huì)對(duì)狀態(tài)讀寫各一次。所以當(dāng)數(shù)據(jù)的 key 的重復(fù)率越大卦溢,攢批的大小越大糊余,那么對(duì)狀態(tài)的訪問會(huì)越少,得到的吞吐量越高单寂。
適用場景
微批處理通過增加延遲換取高吞吐,如果您有超低延遲的要求吐辙,不建議開啟微批處理宣决。通常對(duì)于聚合的場景,微批處理可以顯著的提升系統(tǒng)性能昏苏,建議開啟尊沸。
開啟方式
MiniBatch默認(rèn)關(guān)閉,開啟方式如下:
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
MiniBatch生效的SQL語句
MiniBatch主要作用于聚合(Group By)語句中贤惯,且不帶window的場景洼专。
window agg
select count(a) from t group by tumble(ts, interval ’10’ second), b
以10秒翻轉(zhuǎn)窗口和字段b聚合,該場景MiniBatch不生效
group agg
select count(a) from t group by b
以字段a聚合孵构,該場景MiniBatch生效
over agg
select count(a) over (partition by b order by c) from t
over window屁商,該場景MiniBatch不生效
MiniBatch二種聚合優(yōu)化
Local-Global 聚合
LocalGlobal優(yōu)化將原先的Aggregate分成Local+Global兩階段聚合,即MapReduce模型中的Combine+Reduce處理模式颈墅。
- 第一階段在上游節(jié)點(diǎn)本地?cái)€一批數(shù)據(jù)進(jìn)行聚合(localAgg)蜡镶,并輸出這次微批的增量值(Accumulator)。
- 第二階段再將收到的Accumulator合并(Merge)恤筛,得到最終的結(jié)果(GlobalAgg)官还。
LocalGlobal本質(zhì)上能夠靠localAgg的預(yù)聚合篩除部分傾斜數(shù)據(jù),從而降低globalAgg的熱點(diǎn)毒坛,提升性能望伦。
可以結(jié)合下圖及SQL理解LocalGlobal如何解決數(shù)據(jù)傾斜的問題林说。
SELECT color, sum(id)
FROM T
GROUP BY color
-
使用場景:
- LocalGlobal適用于提升如SUM、COUNT屯伞、MAX腿箩、MIN和AVG等普通聚合的性能,能提高算子吞吐量愕掏,也能有效解決常見數(shù)據(jù)熱點(diǎn)問題度秘。
-
配置:
table.optimizer.agg-phase-strategy開啟(默認(rèn)值已為AUTO開啟,所以不用配置)
// instantiate table environment TableEnvironment tEnv = ... // access flink configuration Configuration configuration = tEnv.getConfig().getConfiguration(); // set low-level key-value options configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); configuration.setString("table.exec.mini-batch.size", "5000"); configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
Partial-Final聚合
Local-Global聚合能針對(duì)常見普通聚合有較好的效果(如SUM饵撑、COUNT剑梳、MAX、MIN和AVG)滑潘。但是對(duì)于COUNT DISTINCT效果并不明顯垢乙。COUNT DISTINCT在local聚合時(shí),對(duì)于DISTINCT KEY的去重率不高语卤,導(dǎo)致在Global節(jié)點(diǎn)仍然存在熱點(diǎn)追逮。
PartialFinal優(yōu)化會(huì)自動(dòng)打散成兩層聚合,增加按distinct key取模的打散層粹舵,解決COUNT DISTINCT的熱點(diǎn)問題钮孵。
可以結(jié)合下圖及SQL理解Partial-Final如何解決COUNT DISTINCT熱點(diǎn)問題。
-- 原始SQL
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
-- PartialFinal優(yōu)化后SQL
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
-
使用場景:
- PartialFinal優(yōu)化方法適用于使用COUNT DISTINCT且聚合節(jié)點(diǎn)性能無法滿足時(shí)眼滤。
- PartialFinal優(yōu)化方法不能在包含UDAF的Flink SQL中使用巴席。
-
配置:
table.optimizer.distinct-agg.split.enabled開啟(默認(rèn)值已為false,需要設(shè)置為true)
table.optimizer.distinct-agg.split.bucket-num(默認(rèn)值1024诅需,可以根據(jù)業(yè)務(wù)數(shù)據(jù)量和熱點(diǎn)情況漾唉,設(shè)置這個(gè)取模值)
// instantiate table environment TableEnvironment tEnv = ... tEnv.getConfig() // access high-level configuration .getConfiguration() // set low-level key-value options .setString("table.optimizer.distinct-agg.split.enabled", "true"); // enable distinct agg split
建議: 數(shù)據(jù)量不大的情況下不建議使用PartialFinal優(yōu)化方法。PartialFinal優(yōu)化會(huì)自動(dòng)打散成兩層聚合堰塌,引入額外的網(wǎng)絡(luò)Shuffle赵刑,在數(shù)據(jù)量不大的情況下,可能反而會(huì)浪費(fèi)資源场刑。
數(shù)據(jù)抖動(dòng)現(xiàn)象
所謂數(shù)據(jù)抖動(dòng)問題是指般此,兩層 AGG 時(shí),第一層 AGG 發(fā)出的更新消息會(huì)拆成兩條獨(dú)立的消息被下游消費(fèi)摇邦,分別是retract 消息和 accumulate 消息恤煞。而當(dāng)?shù)诙?AGG 消費(fèi)這兩條消息時(shí)也會(huì)發(fā)出兩條消息。從前端看到就是數(shù)據(jù)會(huì)有抖動(dòng)的現(xiàn)象施籍。例如下面的例子居扒,統(tǒng)計(jì)買家數(shù),這里做了兩層打散丑慎,第一層先做 UV 統(tǒng)計(jì)喜喂,第二級(jí)做SUM瓤摧。
SELECT day, SUM(cnt) total
FROM (
SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
FROM T GROUP BY day, MOD(buy_id, 1024))
GROUP BY day
當(dāng)?shù)谝粚觕ount distinct的結(jié)果從100上升到101時(shí),它會(huì)發(fā)出 -100, +101 的兩條消息玉吁。當(dāng)?shù)诙拥?SUM 會(huì)依次收到這兩條消息并處理照弥,假設(shè)此時(shí) SUM 值是 900,那么在處理 -100 時(shí)进副,會(huì)先發(fā)出 800 的結(jié)果值这揣,然后處理 +101 時(shí),再發(fā)出 901 的結(jié)果值影斑。從用戶端的感受就是買家數(shù)從 900 降到了 800 又上升到了 901给赞,我們稱之為數(shù)據(jù)抖動(dòng)。而理論上買家數(shù)只應(yīng)該只增不減的矫户,所以我們也一直在思考如何解決這個(gè)問題片迅。
數(shù)據(jù)抖動(dòng)的本質(zhì)原因是 retract 和 accumulate 消息是一個(gè)事務(wù)中的兩個(gè)操作,但是這兩個(gè)操作的中間結(jié)果被用戶看到了皆辽,也就是傳統(tǒng)數(shù)據(jù)庫 ACID 中的隔離性中最弱的 READ UNCOMMITTED 的事務(wù)保障柑蛇。要從根本上解決這個(gè)問題的思路是,如何原子地處理 retract & accumulate 的消息驱闷。