1. 涉及組件
FlinkKafkaConsumer是用戶使用Kafka作為Source進(jìn)行編程的入口装蓬,它有一個(gè)核心組件KafkaFetcher是趴,用來消費(fèi)kafka中的數(shù)據(jù)侍筛,并向下游發(fā)送接收到的數(shù)據(jù)钞脂,如果調(diào)用了FlinkKafkaConsumer#assignTimestampsAndWatermarks
纵散,還負(fù)責(zé)WaterMark的發(fā)送圈驼,WaterMark是本篇文章的重點(diǎn)人芽。
我們先看下KafkaFetcher的組成
- 消費(fèi)線程,用來構(gòu)建KafkaConsumer客戶端绩脆,向Kafka請(qǐng)求指定的分區(qū)數(shù)據(jù)萤厅,將獲取的批量數(shù)據(jù)ConsumerRecords放入到Handover,Handerover可以看成一個(gè)同步隊(duì)列靴迫,生成一個(gè)必須等到被消費(fèi)后才能再生產(chǎn)
- 任務(wù)線程惕味,用來消費(fèi)Handerover中的數(shù)據(jù),將ConsumerRecords反序列化為一條條的數(shù)據(jù)玉锌,然后存儲(chǔ)在隊(duì)列ArrayDeque中赦拘,然后同一個(gè)循環(huán)來消費(fèi)該隊(duì)列中的消息,用來做三件事情(看圖吧芬沉,這里不寫了)
- 在創(chuàng)建KafkaFetcher時(shí)躺同,會(huì)根據(jù)watermark的發(fā)送間隔,向timeService提交一個(gè)定時(shí)任務(wù)丸逸,定時(shí)的更新每個(gè)partition的watermark蹋艺,然后取各個(gè)partition中最小的watermark,作為任務(wù)的候選watermark進(jìn)行更新黄刚,如果更新成功則會(huì)向下游發(fā)送
KafkaFetcher
2. WaterMark的傳播
下面是調(diào)用了FlinkKafkaConsumer#assignTimestampsAndWatermarks
之后捎谨,KafkaFetcher中管理WaterMark的示意圖
- 只是一個(gè)Task,該Task消費(fèi)2個(gè)分區(qū)
- 更新每個(gè)分區(qū)的WaterMark:KafkaTopicPartitionStateWithWatermarkGenerator用來執(zhí)行WatermarkGenerator.onPeriodicEmit方法,并通過多路復(fù)用器WatermarkOutputMultiplexer將每個(gè)partition生成的WaterMark存儲(chǔ)到OutputState中涛救,當(dāng)新生成的WaterMark大于存儲(chǔ)在OutputState中的WaterMark時(shí)畏邢,則更新OutputState中的WaterMark
-
更新Task WaterMark:通過多路復(fù)用器WatermarkOutputMultiplexer遍歷所有非IDLE狀態(tài)的OutputState的Watermark,取最小的作為最新的Task的WaterMark检吆,如果該值大于老的Task WaterMark舒萎,則更新并向下游發(fā)送
watermark管理
2.1 WaterMark傳播可能產(chǎn)生的問題:Window算子不被觸發(fā)
如圖,假設(shè)partition1沒有數(shù)據(jù)了蹭沛,它的watermark就不更新臂寝,則Task1由于Task WaterMark得不到更新,不往下面發(fā)送WM摊灭,而Task2發(fā)送WM(30)咆贬,下游任務(wù)接收后,也會(huì)取最小帚呼,還是10掏缎,這樣會(huì)導(dǎo)致下游的Window計(jì)算不會(huì)被觸發(fā)。
解決辦法是assignTimestampsAndWatermarks.withIdleness(Duration.ofMinutes(1))煤杀,上面的是示例眷蜈,表示如果某個(gè)partition在1分鐘內(nèi)沒有數(shù)據(jù)可供消費(fèi)了,則將該partition置為IDLE怜珍,在更新Task WaterMark將該partition的WaterMark忽略。當(dāng)所有的partition都IDLE了凤粗,則會(huì)向下游發(fā)送StreamStatus.IDLE事件酥泛,接下來發(fā)生的事情可以參考flink解析:EventTime與Watermark
2.2 API使用不當(dāng)產(chǎn)生的問題:丟失數(shù)據(jù)
final FlinkKafkaConsumer<String> producer = new FlinkKafkaConsumer<>(sourceTopic, new SimpleStringSchema(), properties);
env.addSource(producer).assignTimestampsAndWatermarks(getWatermarkStrategy()));
不是調(diào)用的FlinkKafkaConsumer#assignTimestampsAndWatermarks而是調(diào)用DataStreamSource#assignTimestampsAndWatermarks,可能會(huì)產(chǎn)生數(shù)據(jù)丟失的問題
- 代碼那樣寫嫌拣,consumer與assignTimestampsAndWatermarks就是2個(gè)operator了柔袁,WaterMark直接按照規(guī)則往下發(fā)了,當(dāng)40發(fā)過去后异逐,20過去就被當(dāng)成遲到數(shù)據(jù)了捶索,這需要注意