?Flink + Kafka 整合數(shù)據(jù)一致性保證
1. Flink消費(fèi)kafka數(shù)據(jù)起始o(jì)ffset配置:Flink讀取Kafka數(shù)據(jù)確定開(kāi)始位置有以下幾種設(shè)置方式:
????????1) flinkKafkaConsumer.setStartFromEarliest():從topic的最早offset位置開(kāi)始處理數(shù)據(jù)纲刀,如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略秋麸。
????????2) flinkKafkaConsumer.setStartFromLatest():從topic的最新offset位置開(kāi)始處理數(shù)據(jù)猛计,如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略略吨。
????????3) flinkKafkaConsumer.setStartFromTimestamp(…):從指定的時(shí)間戳(毫秒)開(kāi)始消費(fèi)數(shù)據(jù),Kafka中每個(gè)分區(qū)中數(shù)據(jù)大于等于設(shè)置的時(shí)間戳的數(shù)據(jù)位置將被當(dāng)做開(kāi)始消費(fèi)的位置曼验。如果kafka中保存有消費(fèi)者組的消費(fèi)位置將被忽略泌射。
????????4) flinkKafkaConsumer.setStartFromGroupOffsets():默認(rèn)的設(shè)置。根據(jù)代碼中設(shè)置的group.id設(shè)置的消費(fèi)者組蚣驼,去kafka中或者zookeeper中找到對(duì)應(yīng)的消費(fèi)者offset位置消費(fèi)數(shù)據(jù)魄幕。如果沒(méi)有找到對(duì)應(yīng)的消費(fèi)者組的位置相艇,那么將按照auto.offset.reset設(shè)置的策略讀取offset颖杏。
2. Flink消費(fèi)kafka數(shù)據(jù),消費(fèi)者offset提交配置
配置offset的提交方式取決于是否為job設(shè)置開(kāi)啟checkpoint坛芽×舸ⅲ可以使用env.enableCheckpointing(5000)來(lái)設(shè)置開(kāi)啟checkpoint。5000單位毫秒,代表每5秒進(jìn)行依次checkpoint
●關(guān)閉checkpoint:如何禁用了checkpoint咙轩,那么offset位置的提交取決于Flink讀取kafka客戶端的配置获讳,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否開(kāi)啟自動(dòng)提交offset, auto.commit.interval.ms決定自動(dòng)提交offset的周期。
●開(kāi)啟checkpoint:如果開(kāi)啟了checkpoint活喊,那么當(dāng)checkpoint保存狀態(tài)完成后丐膝,將checkpoint中保存的offset位置提交到kafka。這樣保證了Kafka中保存的offset和checkpoint中保存的offset一致钾菊,可以通過(guò)配置setCommitOffsetsOnCheckpoints(boolean)來(lái)配置是否將checkpoint中的offset提交到kafka中(默認(rèn)是true)帅矗。如果使用這種方式,那么properties中配置的kafka offset自動(dòng)提交參數(shù)enable.auto.commit和周期提交參數(shù)auto.commit.interval.ms參數(shù)將被忽略煞烫。
總結(jié):Flink提供了消費(fèi)kafka數(shù)據(jù)的offset如何提交給Kafka或者zookeeper(kafka0.8之前,因?yàn)?.8之前offset是維護(hù)在zookeeper中的)的配置 ;關(guān)閉checkpoint的話,flink消費(fèi)kafka數(shù)據(jù) offset取決于kafka客戶端的配置;開(kāi)啟checkpoint的話,flink消費(fèi)kafka offset由jobmanager中的checkpoint維護(hù),并同步到kafka中保持一置,注意浑此,F(xiàn)link并不依賴提交給Kafka或者zookeeper中的offset來(lái)保證容錯(cuò)。提交的offset只是為了外部來(lái)查詢監(jiān)視kafka數(shù)據(jù)消費(fèi)的情況滞详。
3. 使用checkpoint + 兩階段提交來(lái)保證僅消費(fèi)一次kafka中的數(shù)據(jù)
Flink checkpoint機(jī)制:? 這種機(jī)制是在Flink應(yīng)用內(nèi)部實(shí)現(xiàn)僅一次處理數(shù)據(jù)的基礎(chǔ)凛俱。
當(dāng)談及“exactly-once semantics”僅一次處理數(shù)據(jù)時(shí)紊馏,指的是每條數(shù)據(jù)只會(huì)影響最終結(jié)果一次。Flink可以保證當(dāng)機(jī)器出現(xiàn)故障或者程序出現(xiàn)錯(cuò)誤時(shí)蒲犬,也沒(méi)有重復(fù)的數(shù)據(jù)或者未被處理的數(shù)據(jù)出現(xiàn)朱监,實(shí)現(xiàn)僅一次處理的語(yǔ)義。
checkpoint中包含: 1).當(dāng)前應(yīng)用的狀態(tài);
????????????????????????????????2).當(dāng)前消費(fèi)流數(shù)據(jù)的位置;
注意:checkpoint機(jī)制僅限于Flink架構(gòu)內(nèi)部保證僅一次處理數(shù)據(jù);
使用兩階段提交協(xié)議保證flink連接外部系統(tǒng)數(shù)據(jù)僅一次處理;
當(dāng)Flink處理完的數(shù)據(jù)需要寫(xiě)入外部系統(tǒng)時(shí)原叮,不保證僅一次處理數(shù)據(jù)赌朋。為了提供端到端的僅一次處理數(shù)據(jù),在將數(shù)據(jù)寫(xiě)入外部系統(tǒng)時(shí)也要保證僅一次處理數(shù)據(jù)篇裁,這些外部系統(tǒng)必須提供一種手段來(lái)允許程序提交或者回滾寫(xiě)入操作沛慢,同時(shí)還要保證與Flink的checkpoint機(jī)制協(xié)調(diào)使用,在分布式系統(tǒng)中協(xié)調(diào)提交和回滾的常見(jiàn)方法就是兩階段提交協(xié)議。下面給出一個(gè)實(shí)例了解Flink如何使用兩階段提交協(xié)議來(lái)實(shí)現(xiàn)數(shù)據(jù)僅一次處理語(yǔ)義达布。
該實(shí)例是從kafka中讀取數(shù)據(jù)团甲,經(jīng)過(guò)處理數(shù)據(jù)之后將結(jié)果再寫(xiě)回kafka。kafka0.11版本之后支持事務(wù)黍聂,這也是Flink與kafka交互時(shí)僅一次處理的必要條件躺苦。【注意:當(dāng)Flink處理完的數(shù)據(jù)寫(xiě)入kafka時(shí)产还,即當(dāng)sink為kafka時(shí)匹厘,自動(dòng)封裝了兩階段提交協(xié)議】。Flink支持僅一次處理數(shù)據(jù)不僅僅限于和Kafka的結(jié)合脐区,只要sink提供了必要的兩階段協(xié)調(diào)實(shí)現(xiàn)愈诚,可以對(duì)任何sink都能實(shí)現(xiàn)僅一次處理數(shù)據(jù)語(yǔ)義。
其原理如下:
上圖Flink程序包含以下組件:
????????1. 一個(gè)從kafka中讀取數(shù)據(jù)的source
????????2. 一個(gè)窗口聚合操作
????????3. 一個(gè)將結(jié)果寫(xiě)往kafka的sink牛隅。
要使sink支持僅一次處理數(shù)據(jù)語(yǔ)義炕柔,必須以事務(wù)的方式將數(shù)據(jù)寫(xiě)往kafka,將兩次checkpoint之間的操作當(dāng)做一個(gè)事務(wù)提交,確保出現(xiàn)故障時(shí)操作能夠被回滾媒佣。假設(shè)出現(xiàn)故障匕累,在分布式多并發(fā)執(zhí)行sink的應(yīng)用程序中,僅僅執(zhí)行單次提交或回滾事務(wù)是不夠的默伍,因?yàn)榉植际街械母鱾€(gè)sink程序都必須對(duì)這些提交或者回滾達(dá)成共識(shí)欢嘿,這樣才能保證兩次checkpoint之間的數(shù)據(jù)得到一個(gè)一致性的結(jié)果。Flink使用兩階段提交協(xié)議(pre-commit+commit)來(lái)實(shí)現(xiàn)這個(gè)問(wèn)題也糊。
Filnk checkpointing開(kāi)始時(shí)就進(jìn)入到pre-commit階段炼蹦,具體來(lái)說(shuō),一旦checkpoint開(kāi)始显设,F(xiàn)link的JobManager向輸入流中寫(xiě)入一個(gè)checkpoint barrier將流中所有消息分隔成屬于本次checkpoint的消息以及屬于下次checkpoint的消息框弛,barrier也會(huì)在操作算子間流轉(zhuǎn),對(duì)于每個(gè)operator來(lái)說(shuō)捕捂,該barrier會(huì)觸發(fā)operator的State Backend來(lái)為當(dāng)前的operator來(lái)打快照瑟枫。如下圖示:
Flink DataSource中存儲(chǔ)著Kafka消費(fèi)的offset斗搞,當(dāng)完成快照保存后,將chechkpoint barrier傳遞給下一個(gè)operator慷妙。這種方式只有在Flink內(nèi)部狀態(tài)的場(chǎng)景是可行的僻焚,內(nèi)部狀態(tài)指的是由Flink的State Backend管理狀態(tài),例如上面的window的狀態(tài)就是內(nèi)部狀態(tài)管理膝擂。只有當(dāng)內(nèi)部狀態(tài)時(shí)虑啤,pre-commit階段無(wú)需執(zhí)行額外的操作,僅僅是寫(xiě)入一些定義好的狀態(tài)變量即可架馋,checkpoint成功時(shí)Flink負(fù)責(zé)提交這些狀態(tài)寫(xiě)入狞山,否則就不寫(xiě)入當(dāng)前狀態(tài)。
但是叉寂,一旦operator操作包含外部狀態(tài)萍启,事情就不一樣了。我們不能像處理內(nèi)部狀態(tài)一樣處理外部狀態(tài)屏鳍,因?yàn)橥獠繝顟B(tài)涉及到與外部系統(tǒng)的交互勘纯。這種情況下,外部系統(tǒng)必須要支持可以與兩階段提交協(xié)議綁定的事務(wù)才能保證僅一次處理數(shù)據(jù)钓瞭。
本例中的data sink是將數(shù)據(jù)寫(xiě)往kafka驳遵,因?yàn)閷?xiě)往kafka是有外部狀態(tài)的,這種情況下山涡,pre-commit階段下data sink 在保存狀態(tài)到State Backend的同時(shí)堤结,還必須pre-commit外部的事務(wù)。如下圖:
當(dāng)checkpoint barrier在所有的operator都傳遞一遍切對(duì)應(yīng)的快照都成功完成之后佳鳖,pre-commit階段才算完成霍殴。這個(gè)過(guò)程中所有創(chuàng)建的快照都被視為checkpoint的一部分,checkpoint中保存著整個(gè)應(yīng)用的全局狀態(tài)系吩,當(dāng)然也包含pre-commit階段提交的外部狀態(tài)。當(dāng)程序出現(xiàn)崩潰時(shí)妒蔚,我們可以回滾狀態(tài)到最新已經(jīng)完成快照的時(shí)間點(diǎn)穿挨。
下一步就是通知所有的operator,告訴它們checkpoint已經(jīng)完成肴盏,這便是兩階段提交的第二個(gè)階段:commit階段科盛。這個(gè)階段中JobManager會(huì)為應(yīng)用中的每個(gè)operator發(fā)起checkpoint已經(jīng)完成的回調(diào)邏輯。本例中菜皂,DataSource和Winow操作都沒(méi)有外部狀態(tài)贞绵,因此在該階段,這兩個(gè)operator無(wú)需執(zhí)行任何邏輯恍飘,但是Data Sink是有外部狀態(tài)的榨崩,因此此時(shí)我們需要提交外部事務(wù)谴垫。如下圖示:
匯總以上信息,總結(jié)得出:
1. 一旦所有的operator完成各自的pre-commit,他們會(huì)發(fā)起一個(gè)commit操作母蛛。
2. 如果一個(gè)operator的pre-commit失敗翩剪,所有其他的operator 的pre-commit必須被終止,并且Flink會(huì)回滾到最近成功完成的checkpoint位置彩郊。
3. 一旦pre-commit完成前弯,必須要確保commit也要成功,內(nèi)部的operator和外部的系統(tǒng)都要對(duì)此進(jìn)行保證秫逝。假設(shè)commit失敗【網(wǎng)絡(luò)故障原因】恕出,F(xiàn)link程序就會(huì)崩潰,然后根據(jù)用戶重啟策略執(zhí)行重啟邏輯违帆,重啟之后會(huì)再次commit剃根。
因此,所有的operator必須對(duì)checkpoint最終結(jié)果達(dá)成共識(shí)前方,即所有的operator都必須認(rèn)定數(shù)據(jù)提交要么成功執(zhí)行狈醉,要么被終止然后回滾。
4. Flink中外部狀態(tài)實(shí)現(xiàn)兩階段提交
Flink外部狀態(tài)實(shí)現(xiàn)兩階段提交將邏輯封裝到TwoPhaseComitSinkFunction類中惠险,下面擴(kuò)展TwoPhaseCommitSinkFunction來(lái)實(shí)現(xiàn)就文件的sink苗傅。若要實(shí)現(xiàn)支持exactly-once語(yǔ)義的文件sink,需要實(shí)現(xiàn)以下4個(gè)方法:
1. beginTransaction:開(kāi)啟一個(gè)事務(wù),創(chuàng)建一個(gè)臨時(shí)文件班巩,將數(shù)據(jù)寫(xiě)入到臨時(shí)文件中
2. preCommit:在pre-commit階段渣慕,flush緩存數(shù)據(jù)到磁盤(pán),然后關(guān)閉這個(gè)文件抱慌,確保不會(huì)有新的數(shù)據(jù)寫(xiě)入到這個(gè)文件逊桦,同時(shí)開(kāi)啟一個(gè)新事務(wù)執(zhí)行屬于下一個(gè)checkpoint的寫(xiě)入操作
3. commit:在commit階段,我們以原子性的方式將上一階段的文件寫(xiě)入真正的文件目錄下抑进∏烤【注意:數(shù)據(jù)有延時(shí),不是實(shí)時(shí)的】
4. abort:一旦異常終止事務(wù)寺渗,程序如何處理匿情。這里要清除臨時(shí)文件。
代碼如下:此次消費(fèi)kafka的數(shù)據(jù)不再寫(xiě)往kafka