Flink--Checkpoint機(jī)制原理

如何理解flink中state(狀態(tài))

state泛指

  • state泛指:flink中有狀態(tài)函數(shù)和運(yùn)算符在各個(gè)元素(element)/事件(event)的處理過(guò)程中存儲(chǔ)的數(shù)據(jù)(注意:狀態(tài)數(shù)據(jù)可以修改和查詢,可以自己維護(hù)疹味,根據(jù)自己的業(yè)務(wù)場(chǎng)景,保存歷史數(shù)據(jù)或者中間結(jié)果到狀態(tài)(state)中);
  • 使用狀態(tài)計(jì)算的例子:
    • 當(dāng)應(yīng)用程序搜索某些事件模式時(shí)颜及,狀態(tài)將存儲(chǔ)到目前為止遇到的事件序列鳄抒。
    • 在每分鐘/小時(shí)/天聚合事件時(shí)屡穗,狀態(tài)保存待處理的聚合。
    • 當(dāng)在數(shù)據(jù)點(diǎn)流上訓(xùn)練機(jī)器學(xué)習(xí)模型時(shí)猴仑,狀態(tài)保持模型參數(shù)的當(dāng)前版本。
    • 當(dāng)需要管理歷史數(shù)據(jù)時(shí),狀態(tài)允許有效訪問(wèn)過(guò)去發(fā)生的事件辽俗。

案例理解state

  • 以例子理解flink的狀態(tài)(state)
  • 無(wú)狀態(tài)計(jì)算指的是數(shù)據(jù)進(jìn)入Flink后經(jīng)過(guò)算子時(shí)只需要對(duì)當(dāng)前數(shù)據(jù)進(jìn)行處理就能得到想要的結(jié)果疾渣;有狀態(tài)計(jì)算就是需要和歷史的一些狀態(tài)或進(jìn)行相關(guān)操作,才能計(jì)算出正確的結(jié)果崖飘;
    • 無(wú)狀態(tài)計(jì)算的例子:
      • 比如:我們只是進(jìn)行一個(gè)字符串拼接榴捡,輸入 a,輸出 a_666,輸入b朱浴,輸出 b_666輸出的結(jié)果跟之前的狀態(tài)沒(méi)關(guān)系吊圾,符合冪等性。
      • 冪等性:就是用戶對(duì)于同一操作發(fā)起的一次請(qǐng)求或者多次請(qǐng)求的結(jié)果是一致的翰蠢,不會(huì)因?yàn)槎啻吸c(diǎn)擊而產(chǎn)生了副作用项乒;
    • 有狀態(tài)計(jì)算的例子:
      • 以wordcount中計(jì)算pv/uv為例:
      • 輸出的結(jié)果跟之前的狀態(tài)有關(guān)系,不符合冪等性梁沧,訪問(wèn)多次檀何,pv會(huì)增加;

為什么需要state管理

  • 流式作業(yè)的特點(diǎn)是7*24小時(shí)運(yùn)行趁尼,數(shù)據(jù)不重復(fù)消費(fèi)埃碱,不丟失,保證只計(jì)算一次酥泞,數(shù)據(jù)實(shí)時(shí)產(chǎn)出不延遲砚殿,但是當(dāng)狀態(tài)很大,內(nèi)存容量限制芝囤,或者實(shí)例運(yùn)行奔潰似炎,或需要擴(kuò)展并發(fā)度等情況下,如何保證狀態(tài)正確的管理悯姊,在任務(wù)重新執(zhí)行的時(shí)候能正確執(zhí)行羡藐,狀態(tài)管理就顯得尤為重要。

理想中的state管理

  • 理想的狀態(tài)管理是:
    • 易用悯许,flink提供了豐富的數(shù)據(jù)結(jié)構(gòu)仆嗦,簡(jiǎn)潔易用的接口;
    • 高效先壕,flink對(duì)狀態(tài)的處理讀寫(xiě)快瘩扼,可以橫向擴(kuò)展,保存狀態(tài)不影響計(jì)算性能垃僚;
    • 可靠集绰,flink對(duì)狀態(tài)可以做持久化,而且可以保證exactly-once語(yǔ)義谆棺;

flink中checkpoint執(zhí)行流程

  • checkpoint機(jī)制是Flink可靠性的基石栽燕,可以保證Flink集群在某個(gè)算子因?yàn)槟承┰?如 異常退出)出現(xiàn)故障時(shí),能夠?qū)⒄麄€(gè)應(yīng)用流圖的狀態(tài)恢復(fù)到故障之前的某一狀態(tài),保 證應(yīng)用流圖狀態(tài)的一致性碍岔。Flink的checkpoint機(jī)制原理來(lái)自“Chandy-Lamport algorithm”算法浴讯。 (分布式快照算)
  • 每個(gè)需要checkpoint的應(yīng)用在啟動(dòng)時(shí),F(xiàn)link的JobManager為其創(chuàng)建一個(gè) CheckpointCoordinator付秕,CheckpointCoordinator全權(quán)負(fù)責(zé)本應(yīng)用的快照制作兰珍。


    flink_checkpoint.png
    1. CheckpointCoordinator周期性的向該流應(yīng)用的所有source算子發(fā)送barrier。
  • 2.當(dāng)某個(gè)source算子收到一個(gè)barrier時(shí)询吴,便暫停數(shù)據(jù)處理過(guò)程掠河,然后將自己的當(dāng)前狀 態(tài)制作成快照,并保存到指定的持久化存儲(chǔ)中猛计,最后向CheckpointCoordinator報(bào)告 自己快照制作情況唠摹,同時(shí)向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理
  • 3.下游算子收到barrier之后奉瘤,會(huì)暫停自己的數(shù)據(jù)處理過(guò)程勾拉,然后將自身的相關(guān)狀態(tài)制作成快照,并保存到指定的持久化存儲(chǔ)中盗温,最后向CheckpointCoordinator報(bào)告自身 快照情況藕赞,同時(shí)向自身所有下游算子廣播該barrier,恢復(fù)數(shù)據(jù)處理卖局。
    1. 每個(gè)算子按照步驟3不斷制作快照并向下游廣播斧蜕,直到最后barrier傳遞到sink算子,快照制作完成砚偶。
    1. 當(dāng)CheckpointCoordinator收到所有算子的報(bào)告之后批销,認(rèn)為該周期的快照制作成功; 否則,如果在規(guī)定的時(shí)間內(nèi)沒(méi)有收到所有算子的報(bào)告染坯,則認(rèn)為本周期快照制作失敗 ;

checkpoint中保存的是什么信息

  • 帶著問(wèn)題找答案
    • 那CheckPoint具體做了哪些功能均芽,為什么任務(wù)掛掉之后,通過(guò)CheckPoint能使得任務(wù)恢復(fù)呢单鹿?
    • CheckPoint是通過(guò)給程序快照的方式使得將歷史某些時(shí)刻的狀態(tài)保存下來(lái)掀宋,當(dāng)任務(wù)掛掉之后,默認(rèn)從最近一次保存的完整快照處進(jìn)行恢復(fù)任務(wù)仲锄。問(wèn)題來(lái)了劲妙,快照是什么鬼?能吃嗎昼窗?
    • SnapShot翻譯為快照是趴,指將程序中某些信息存一份涛舍,后期可以用來(lái)恢復(fù)澄惊。對(duì)于一個(gè)Flink任務(wù)來(lái)講,快照里面到底保存著什么信息呢?
  • 以flink消費(fèi)kafka數(shù)據(jù)wordcount為例:
  • 我們從Kafka讀取到一條條的日志掸驱,從日志中解析出app_id肛搬,然后將統(tǒng)計(jì)的結(jié)果放到內(nèi)存中一個(gè)Map集合,app_id做為key毕贼,對(duì)應(yīng)的pv做為value温赔,每次只需要將相應(yīng)app_id 的pv值+1后put到Map中即可;
  • kafka topic:test鬼癣;
  • flink運(yùn)算流程如下:


    flink_kafka.png

kafka topic有且只有一個(gè)分區(qū)

  • 假設(shè)kafka的topic-test只有一個(gè)分區(qū)陶贼,flink的Source task記錄了當(dāng)前消費(fèi)到kafka test topic的所有partition的offset

    • 例:(0,1000)
      • 表示0號(hào)partition目前消費(fèi)到offset為1000的數(shù)據(jù)
  • flink的pv task記錄了當(dāng)前計(jì)算的各app的pv值待秃,為了方便講解拜秧,我這里有兩個(gè)app:app1、app2

    • 例:(app1章郁,50000)(app2枉氮,10000)
      • 表示app1當(dāng)前pv值為50000
      • 表示app2當(dāng)前pv值為10000
  • 每來(lái)一條數(shù)據(jù),只需要確定相應(yīng)app_id暖庄,將相應(yīng)的value值+1后put到map中即可聊替;

  • 該案例中,CheckPoint到底記錄了什么信息呢培廓?

  • 記錄的其實(shí)就是第n次CheckPoint消費(fèi)的offset信息和各app的pv值信息惹悄,記錄一下發(fā)生CheckPoint當(dāng)前的狀態(tài)信息,并將該狀態(tài)信息保存到相應(yīng)的狀態(tài)后端医舆。(注:狀態(tài)后端是保存狀態(tài)的地方俘侠,決定狀態(tài)如何保存,如何保障狀態(tài)高可用蔬将,我們只需要知道爷速,我們能從狀態(tài)后端拿到offset信息和pv信息即可。狀態(tài)后端必須是高可用的霞怀,否則我們的狀態(tài)后端經(jīng)常出現(xiàn)故障惫东,會(huì)導(dǎo)致無(wú)法通過(guò)checkpoint來(lái)恢復(fù)我們的應(yīng)用程序)。

  • eg:

    • chk-100
      • offset:(0毙石,1000)
      • pv:(app1廉沮,50000)(app2,10000)
    • 該狀態(tài)信息表示第100次CheckPoint的時(shí)候徐矩, partition 0 offset消費(fèi)到了1000滞时,pv統(tǒng)計(jì)結(jié)果為(app1,50000)(app2滤灯,10000)
  • 任務(wù)掛了坪稽,如何恢復(fù)曼玩?

    • 假如我們?cè)O(shè)置了三分鐘進(jìn)行一次CheckPoint,保存了上述所說(shuō)的 chk-100 的CheckPoint狀態(tài)后窒百,過(guò)了十秒鐘黍判,offset已經(jīng)消費(fèi)到 (0,1100)篙梢,pv統(tǒng)計(jì)結(jié)果變成了(app1顷帖,50080)(app2,10020)渤滞,但是突然任務(wù)掛了贬墩,怎么辦?
      莫慌妄呕,其實(shí)很簡(jiǎn)單震糖,flink只需要從最近一次成功的CheckPoint保存的offset(0,1000)處接著消費(fèi)即可趴腋,當(dāng)然pv值也要按照狀態(tài)里的pv值(app1吊说,50000)(app2,10000)進(jìn)行累加优炬,不能從(app1颁井,50080)(app2,10020)處進(jìn)行累加蠢护,因?yàn)?partition 0 offset消費(fèi)到 1000時(shí)纤掸,pv統(tǒng)計(jì)結(jié)果為(app1关带,50000)(app2后频,10000)
    • 當(dāng)然如果你想從offset (0吴藻,1100)pv(app1,50080)(app2懈凹,10020)這個(gè)狀態(tài)恢復(fù)蜀变,也是做不到的,因?yàn)槟莻€(gè)時(shí)刻程序突然掛了介评,這個(gè)狀態(tài)根本沒(méi)有保存下來(lái)库北。我們能做的最高效方式就是從最近一次成功的CheckPoint處恢復(fù),也就是我一直所說(shuō)的chk-100;
  • 以上講解们陆,基本就是CheckPoint承擔(dān)的工作寒瓦,描述的場(chǎng)景比較簡(jiǎn)單.

  • 疑問(wèn),計(jì)算pv的task在一直運(yùn)行坪仇,它怎么知道什么時(shí)候去做這個(gè)快照杂腰?或者說(shuō)計(jì)算pv的task怎么保障它自己計(jì)算的pv值(app1,50000)(app2椅文,10000)就是offset(0喂很,1000)那一刻的統(tǒng)計(jì)結(jié)果呢蜡镶?

  • flink是在數(shù)據(jù)中加了一個(gè)叫做barrier的東西(barrier中文翻譯:柵欄),下圖中紅圈處就是兩個(gè)barrier;


    flink_barrier.png
  • barrier從Source Task處生成恤筛,一直流到Sink Task,期間所有的Task只要碰到barrier芹橡,就會(huì)觸發(fā)自身進(jìn)行快照;

  • CheckPoint barrier n-1處做的快照就是指Job從開(kāi)始處理到 barrier n-1所有的狀態(tài)數(shù)據(jù);

  • barrier n 處做的快照就是指從Job開(kāi)始到處理到 barrier n所有的狀態(tài)數(shù)據(jù);

  • 對(duì)應(yīng)到pv案例中就是毒坛,Source Task接收到JobManager的編號(hào)為chk-100的CheckPoint觸發(fā)請(qǐng)求后,發(fā)現(xiàn)自己恰好接收到kafka offset(0林说,1000)處的數(shù)據(jù)煎殷,所以會(huì)往offset(0,1000)數(shù)據(jù)之后offset(0腿箩,1001)數(shù)據(jù)之前安插一個(gè)barrier豪直,然后自己開(kāi)始做快照,也就是將offset(0珠移,1000)保存到狀態(tài)后端chk-100中弓乙。然后barrier接著往下游發(fā)送,當(dāng)統(tǒng)計(jì)pv的task接收到barrier后钧惧,也會(huì)暫停處理數(shù)據(jù)暇韧,將自己內(nèi)存中保存的pv信息(app1,50000)(app2浓瞪,10000)保存到狀態(tài)后端chk-100中懈玻。OK,flink大概就是通過(guò)這個(gè)原理來(lái)保存快照的;

  • 統(tǒng)計(jì)pv的task接收到barrier乾颁,就意味著barrier之前的數(shù)據(jù)都處理了涂乌,所以說(shuō),不會(huì)出現(xiàn)丟數(shù)據(jù)的情況

  • barrier的作用就是為了把數(shù)據(jù)區(qū)分開(kāi)英岭,CheckPoint過(guò)程中有一個(gè)同步做快照的環(huán)節(jié)不能處理barrier之后的數(shù)據(jù)湾盒,為什么呢?

  • 如果做快照的同時(shí)诅妹,也在處理數(shù)據(jù)历涝,那么處理的數(shù)據(jù)可能會(huì)修改快照內(nèi)容,所以先暫停處理數(shù)據(jù)漾唉,把內(nèi)存中快照保存好后荧库,再處理數(shù)據(jù)
    結(jié)合案例來(lái)講就是,統(tǒng)計(jì)pv的task想對(duì)(app1赵刑,50000)(app2分衫,10000)做快照,但是如果數(shù)據(jù)還在處理般此,可能快照還沒(méi)保存下來(lái)蚪战,狀態(tài)已經(jīng)變成了(app1牵现,50001)(app2,10001)邀桑,快照就不準(zhǔn)確了瞎疼,就不能保障Exactly Once了;

  • 總結(jié):

  • 流計(jì)算中狀態(tài)交互:


    流計(jì)算中狀態(tài)交互.png
  • 簡(jiǎn)易場(chǎng)景精確一次的容錯(cuò)方法

    • 周期性地對(duì)消費(fèi)offset和統(tǒng)計(jì)的狀態(tài)信息或統(tǒng)計(jì)結(jié)果進(jìn)行快照


      save-1.png
  • 消費(fèi)到X位置的時(shí)候,將X對(duì)應(yīng)的狀態(tài)保存下來(lái)


    save-2.png
  • 消費(fèi)到Y(jié)位置的時(shí)候壁畸,將Y對(duì)應(yīng)的狀態(tài)保存下來(lái)


    save-3.png

多并行度贼急、多Operator情況下,CheckPoint過(guò)程

  • 分布式狀態(tài)容錯(cuò)面臨的問(wèn)題與挑戰(zhàn)

    • 如何確保狀態(tài)擁有精確一次的容錯(cuò)保證捏萍?
    • 如何在分布式場(chǎng)景下替多個(gè)擁有本地狀態(tài)的算子產(chǎn)生一個(gè)全域一致的快照太抓?
    • 如何在不中斷運(yùn)算的前提下產(chǎn)生快照?
  • 多并行度令杈、多Operator實(shí)例的情況下走敌,如何做全域一致的快照?

    • 所有的Operator運(yùn)行過(guò)程中遇到barrier后逗噩,都對(duì)自身的狀態(tài)進(jìn)行一次快照掉丽,保存到相應(yīng)狀態(tài)后端
  • 對(duì)應(yīng)到pv案例:有的Operator計(jì)算的app1的pv,有的Operator計(jì)算的app2的pv异雁,當(dāng)他們碰到barrier時(shí)机打,都需要將目前統(tǒng)計(jì)的pv信息快照到狀態(tài)后端。

  • 多并行圖簡(jiǎn)易快照


    多并行度CheckPoint快照簡(jiǎn)圖.png
  • 多Operator狀態(tài)恢復(fù)


    多Operator狀態(tài)恢復(fù).png
  • 具體怎么做這個(gè)快照呢片迅?

    • 利用之前所有的barrier策略


      flink_barrier.png
  • JobManager向Source Task發(fā)送CheckPointTrigger残邀,Source Task會(huì)在數(shù)據(jù)流中安插CheckPoint barrier;


    多并行度快照詳圖0.png
  • Source Task自身做快照柑蛇,并保存到狀態(tài)后端芥挣;


    多并行度快照詳圖1.png
  • Source Task將barrier跟數(shù)據(jù)流一塊往下游發(fā)送;


    多并行度快照詳圖2.png
  • 當(dāng)下游的Operator實(shí)例接收到CheckPoint barrier后耻台,對(duì)自身做快照


    多并行度快照詳圖3.png

    多并行度快照詳圖4.png
  • 上述圖中空免,有4個(gè)帶狀態(tài)的Operator實(shí)例,相應(yīng)的狀態(tài)后端就可以想象成填4個(gè)格子盆耽。整個(gè)CheckPoint 的過(guò)程可以當(dāng)做Operator實(shí)例填自己格子的過(guò)程蹋砚,Operator實(shí)例將自身的狀態(tài)寫(xiě)到狀態(tài)后端中相應(yīng)的格子,當(dāng)所有的格子填滿可以簡(jiǎn)單的認(rèn)為一次完整的CheckPoint做完了

  • 上面只是快照的過(guò)程摄杂,整個(gè)CheckPoint執(zhí)行過(guò)程如下

    • 1坝咐、JobManager端的 CheckPointCoordinator向 所有SourceTask發(fā)送CheckPointTrigger,Source Task會(huì)在數(shù)據(jù)流中安插CheckPoint barrier
    • 2析恢、當(dāng)task收到所有的barrier后墨坚,向自己的下游繼續(xù)傳遞barrier,然后自身執(zhí)行快照映挂,并將自己的狀態(tài)異步寫(xiě)入到持久化存儲(chǔ)中泽篮。增量CheckPoint只是把最新的一部分更新寫(xiě)入到 外部存儲(chǔ)盗尸;為了下游盡快做CheckPoint,所以會(huì)先發(fā)送barrier到下游帽撑,自身再同步進(jìn)行快照
    • 3泼各、當(dāng)task完成備份后,會(huì)將備份數(shù)據(jù)的地址(state handle)通知給JobManager的CheckPointCoordinator亏拉;
      如果CheckPoint的持續(xù)時(shí)長(zhǎng)超過(guò) 了CheckPoint設(shè)定的超時(shí)時(shí)間扣蜻,CheckPointCoordinator 還沒(méi)有收集完所有的 State Handle,CheckPointCoordinator就會(huì)認(rèn)為本次CheckPoint失敗专筷,會(huì)把這次CheckPoint產(chǎn)生的所有 狀態(tài)數(shù)據(jù)全部刪除。
    • 4蒸苇、 最后 CheckPoint Coordinator 會(huì)把整個(gè) StateHandle 封裝成 completed CheckPoint Meta磷蛹,寫(xiě)入到hdfs。
  • barrier對(duì)齊

    • 什么是barrier對(duì)齊溪烤?


      barrier_對(duì)齊.png
  • 一旦Operator從輸入流接收到CheckPoint barrier n味咳,它就不能處理來(lái)自該流的任何數(shù)據(jù)記錄,直到它從其他所有輸入接收到barrier n為止檬嘀。否則槽驶,它會(huì)混合屬于快照n的記錄和屬于快照n + 1的記錄;

  • 接收到barrier n的流暫時(shí)被擱置鸳兽。從這些流接收的記錄不會(huì)被處理掂铐,而是放入輸入緩沖區(qū)。

  • 上圖中第2個(gè)圖揍异,雖然數(shù)字流對(duì)應(yīng)的barrier已經(jīng)到達(dá)了全陨,但是barrier之后的1、2衷掷、3這些數(shù)據(jù)只能放到buffer中辱姨,等待字母流的barrier到達(dá);

  • 一旦最后所有輸入流都接收到barrier n戚嗅,Operator就會(huì)把緩沖區(qū)中pending 的輸出數(shù)據(jù)發(fā)出去雨涛,然后把CheckPoint barrier n接著往下游發(fā)送

  • 這里還會(huì)對(duì)自身進(jìn)行快照;
    之后懦胞,Operator將繼續(xù)處理來(lái)自所有輸入流的記錄替久,在處理來(lái)自流的記錄之前先處理來(lái)自輸入緩沖區(qū)的記錄。

  • 什么是barrier不對(duì)齊躏尉?

    • 上述圖2中侣肄,當(dāng)還有其他輸入流的barrier還沒(méi)有到達(dá)時(shí),會(huì)把已到達(dá)的barrier之后的數(shù)據(jù)1醇份、2稼锅、3擱置在緩沖區(qū)吼具,等待其他流的barrier到達(dá)后才能處理
    • barrier不對(duì)齊就是指當(dāng)還有其他流的barrier還沒(méi)到達(dá)時(shí),為了不影響性能矩距,也不用理會(huì)拗盒,直接處理barrier之后的數(shù)據(jù)。等到所有流的barrier的都到達(dá)后锥债,就可以對(duì)該Operator做CheckPoint了陡蝇;
  • 為什么要進(jìn)行barrier對(duì)齊?不對(duì)齊到底行不行哮肚?

    • 答:Exactly Once時(shí)必須barrier對(duì)齊登夫,如果barrier不對(duì)齊就變成了At Least Once;
    • 后面的部分主要證明這句話允趟;
  • CheckPoint的目的就是為了保存快照恼策,如果不對(duì)齊,那么在chk-100快照之前潮剪,已經(jīng)處理了一些chk-100 對(duì)應(yīng)的offset之后的數(shù)據(jù)涣楷,當(dāng)程序從chk-100恢復(fù)任務(wù)時(shí),chk-100對(duì)應(yīng)的offset之后的數(shù)據(jù)還會(huì)被處理一次抗碰,所以就出現(xiàn)了重復(fù)消費(fèi)狮斗。如果聽(tīng)不懂沒(méi)關(guān)系,后面有案例讓您懂弧蝇。

  • 結(jié)合pv案例來(lái)看碳褒,之前的案例為了簡(jiǎn)單,描述的kafka的topic只有1個(gè)partition看疗,這里為了講述barrier對(duì)齊骤视,所以topic有2個(gè)partittion;


    flink消費(fèi)kafka鹃觉,計(jì)算pv詳圖.png
  • 結(jié)合業(yè)務(wù)专酗,先介紹一下上述所有算子在業(yè)務(wù)中的功能

    • Source的kafka的Consumer,從kakfa中讀取數(shù)據(jù)到flink應(yīng)用中
      TaskA中的map將讀取到的一條kafka日志轉(zhuǎn)換為我們需要統(tǒng)計(jì)的app_id
      keyBy 按照app_id進(jìn)行keyBy盗扇,相同的app_id 會(huì)分到下游TaskB的同一個(gè)實(shí)例中
      TaskB的map在狀態(tài)中查出該app_id 對(duì)應(yīng)的pv值祷肯,然后+1,存儲(chǔ)到狀態(tài)中
      利用Sink將統(tǒng)計(jì)的pv值寫(xiě)入到外部存儲(chǔ)介質(zhì)中疗隶;

    • 我們從kafka的兩個(gè)partition消費(fèi)數(shù)據(jù)佑笋,TaskA和TaskB都有兩個(gè)并行度,所以總共flink有4個(gè)Operator實(shí)例斑鼻,這里我們稱之為 TaskA0蒋纬、TaskA1、TaskB0、TaskB1蜀备;

    • 假設(shè)已經(jīng)成功做了99次CheckPoint关摇,這里詳細(xì)解釋第100次CheckPoint過(guò)程;

      • JobManager內(nèi)部有個(gè)定時(shí)調(diào)度碾阁,假如現(xiàn)在10點(diǎn)00分00秒到了第100次CheckPoint的時(shí)間了输虱,JobManager的CheckPointCoordinator進(jìn)程會(huì)向所有的Source Task發(fā)送CheckPointTrigger,也就是向TaskA0脂凶、TaskA1發(fā)送CheckPointTrigger宪睹。
      • TaskA0、TaskA1接收到CheckPointTrigger蚕钦,會(huì)往數(shù)據(jù)流中安插barrier亭病,將barrier發(fā)送到下游,在自己的狀態(tài)中記錄barrier安插的offset位置嘶居,然后自身做快照罪帖,將offset信息保存到狀態(tài)后端。
      • 這里假如TaskA0消費(fèi)的partition0的offset為10000食听,TaskA1消費(fèi)的partition1的offset為10005胸蛛。那么狀態(tài)中會(huì)保存 (0污茵,10000)(1樱报,10005),表示0號(hào)partition消費(fèi)到了offset為10000的位置泞当,1號(hào)partition消費(fèi)到了offset為10005的位置迹蛤;
      • 然后TaskA的map和keyBy算子中并沒(méi)有狀態(tài),所以不需要進(jìn)行快照
        接著數(shù)據(jù)和barrier都向下游TaskB發(fā)送襟士,相同的app_id 會(huì)發(fā)送到相同的TaskB實(shí)例上盗飒,這里假設(shè)有兩個(gè)app:app0和app1,經(jīng)過(guò)keyBy后陋桂,假設(shè)app0分到了TaskB0上逆趣,app1分到了TaskB1上∈壤基于上面描述宣渗,TaskA0和TaskA1中的所有app0的數(shù)據(jù)都發(fā)送到TaskB0上,所有app1的數(shù)據(jù)都發(fā)送到TaskB1上
    • 現(xiàn)在我們假設(shè)TaskB0做CheckPoint的時(shí)候barrier對(duì)齊了梨州,TaskB1做CheckPoint的時(shí)候barrier不對(duì)齊痕囱,當(dāng)然不能這么配置,我就是舉這么個(gè)例子暴匠,帶大家分析一下barrier對(duì)不對(duì)齊到底對(duì)統(tǒng)計(jì)結(jié)果有什么影響鞍恢?

    • 上面說(shuō)了chk-100的這次CheckPoint,offset位置為(0,10000)(1帮掉,10005)弦悉,TaskB0使用barrier對(duì)齊,也就是說(shuō)TaskB0不會(huì)處理barrier之后的數(shù)據(jù)旭寿,所以TaskB0在chk-100快照的時(shí)候警绩,狀態(tài)后端保存的app0的pv數(shù)據(jù)是從程序開(kāi)始啟動(dòng)到kafka offset位置為(0,10000)(1盅称,10005)的所有數(shù)據(jù)計(jì)算出來(lái)的pv值肩祥,一條不多(沒(méi)處理barrier之后,所以不會(huì)重復(fù))缩膝,一條不少(barrier之前的所有數(shù)據(jù)都處理了混狠,所以不會(huì)丟失),假如保存的狀態(tài)信息為(app0疾层,8000)表示消費(fèi)到(0将饺,10000)(1,10005)offset的時(shí)候痛黎,app0的pv值為8000

    • TaskB1使用的barrier不對(duì)齊予弧,假如TaskA0由于服務(wù)器的CPU或者網(wǎng)絡(luò)等其他波動(dòng),導(dǎo)致TaskA0處理數(shù)據(jù)較慢湖饱,而TaskA1很穩(wěn)定掖蛤,所以處理數(shù)據(jù)比較快。導(dǎo)致的結(jié)果就是TaskB1先接收到了TaskA1的barrier井厌,由于配置的barrier不對(duì)齊蚓庭,所以TaskB1會(huì)接著處理TaskA1 barrier之后的數(shù)據(jù),過(guò)了2秒后仅仆,TaskB1接收到了TaskA0的barrier器赞,于是對(duì)狀態(tài)中存儲(chǔ)的app1的pv值開(kāi)始做CheckPoint 快照,保存的狀態(tài)信息為(app1墓拜,12050)港柜,但是我們知道這個(gè)(app1,12050)實(shí)際上多處理了2秒TaskA1發(fā)來(lái)的barrier之后的數(shù)據(jù)咳榜,也就是kafka topic對(duì)應(yīng)的partition1 offset 10005之后的數(shù)據(jù)夏醉,app1真實(shí)的pv數(shù)據(jù)肯定要小于這個(gè)12050,partition1的offset保存的offset雖然是10005贿衍,但是我們實(shí)際上可能已經(jīng)處理到了offset 10200的數(shù)據(jù)授舟,假設(shè)就是處理到了10200;

    • 雖然狀態(tài)保存的pv值偏高了贸辈,但是不能說(shuō)明重復(fù)處理释树,因?yàn)槲业腡askA1并沒(méi)有再次去消費(fèi)partition1的offset 10005~10200的數(shù)據(jù)肠槽,所以相當(dāng)于也沒(méi)有重復(fù)消費(fèi),只是展示的結(jié)果更實(shí)時(shí)了

  • 分析到這里奢啥,我們先梳理一下我們的狀態(tài)保存了什么:

  • chk-100

    • offset:(0秸仙,10000)(1,10005)
    • pv:(app0桩盲,8000) (app1寂纪,12050)
  • 接著程序在繼續(xù)運(yùn)行,過(guò)了10秒赌结,由于某個(gè)服務(wù)器掛了捞蛋,導(dǎo)致我們的四個(gè)Operator實(shí)例有一個(gè)Operator掛了,所以Flink會(huì)從最近一次的狀態(tài)恢復(fù)柬姚,也就是我們剛剛詳細(xì)講的chk-100處恢復(fù)拟杉,那具體是怎么恢復(fù)的呢?

  • Flink 同樣會(huì)起四個(gè)Operator實(shí)例量承,我還稱他們是 TaskA0搬设、TaskA1、TaskB0撕捍、TaskB1拿穴。四個(gè)Operator會(huì)從狀態(tài)后端讀取保存的狀態(tài)信息。

  • 從offset:(0忧风,10000)(1默色,10005) 開(kāi)始消費(fèi),并且基于 pv:(app0阀蒂,8000) (app1该窗,12050)值進(jìn)行累加統(tǒng)計(jì)

  • 然后你就應(yīng)該會(huì)發(fā)現(xiàn)這個(gè)app1的pv值12050實(shí)際上已經(jīng)包含了partition1的offset 10005~10200的數(shù)據(jù)弟蚀,所以partition1從offset 10005恢復(fù)任務(wù)時(shí)蚤霞,partition1的offset 10005~10200的數(shù)據(jù)被消費(fèi)了兩次

  • TaskB1設(shè)置的barrier不對(duì)齊,所以CheckPoint chk-100對(duì)應(yīng)的狀態(tài)中多消費(fèi)了barrier之后的一些數(shù)據(jù)(TaskA1發(fā)送)义钉,重啟后是從chk-100保存的offset恢復(fù)昧绣,這就是所說(shuō)的At Least Once

  • 由于上面說(shuō)TaskB0設(shè)置的barrier對(duì)齊,所以app0不會(huì)出現(xiàn)重復(fù)消費(fèi)捶闸,因?yàn)閍pp0沒(méi)有消費(fèi)offset:(0夜畴,10000)(1,10005) 之后的數(shù)據(jù)删壮,也就是所謂的Exactly Once贪绘;

  • 看到這里你應(yīng)該已經(jīng)知道了哪種情況會(huì)出現(xiàn)重復(fù)消費(fèi)了,也應(yīng)該要掌握為什么barrier對(duì)齊就是Exactly Once央碟,為什么barrier不對(duì)齊就是 At Least Once

  • 分析了這么多税灌,這里我再補(bǔ)充一個(gè)問(wèn)題,到底什么時(shí)候會(huì)出現(xiàn)barrier對(duì)齊?

    • 首先設(shè)置了Flink的CheckPoint語(yǔ)義是:Exactly Once
    • Operator實(shí)例必須有多個(gè)輸入流才會(huì)出現(xiàn)barrier對(duì)齊
      • 對(duì)齊菱涤,漢語(yǔ)詞匯苞也,釋義為使兩個(gè)以上事物配合或接觸得整齊。由漢語(yǔ)解釋可得對(duì)齊肯定需要兩個(gè)以上事物粘秆,所以如迟,必須有多個(gè)流才叫對(duì)齊。barrier對(duì)齊其實(shí)也就是上游多個(gè)流配合使得數(shù)據(jù)對(duì)齊的過(guò)程攻走;
      • 言外之意:如果Operator實(shí)例只有一個(gè)輸入流殷勘,就根本不存在barrier對(duì)齊,自己跟自己默認(rèn)永遠(yuǎn)都是對(duì)齊的昔搂;
  • 博客發(fā)出去后劳吠,感謝上海姜同學(xué)提問(wèn)的幾個(gè)問(wèn)題,最后跟姜同學(xué)語(yǔ)音了2個(gè)多小時(shí)巩趁,交流了很多Flink相關(guān)技術(shù)痒玩,最后提煉了以下三個(gè)問(wèn)題,當(dāng)然討論的很多FLink的其他技術(shù)并沒(méi)有放到該博客中议慰。

  • 第一種場(chǎng)景計(jì)算PV蠢古,kafka只有一個(gè)partition,精確一次别凹,至少一次就沒(méi)有區(qū)別草讶?

    • 答:如果只有一個(gè)partition,對(duì)應(yīng)flink任務(wù)的Source Task并行度只能是1炉菲,確實(shí)沒(méi)有區(qū)別堕战,不會(huì)有至少一次的存在了,肯定是精確一次拍霜。因?yàn)橹挥衎arrier不對(duì)齊才會(huì)有可能重復(fù)處理嘱丢,這里并行度都已經(jīng)為1,默認(rèn)就是對(duì)齊的祠饺,只有當(dāng)上游有多個(gè)并行度的時(shí)候越驻,多個(gè)并行度發(fā)到下游的barrier才需要對(duì)齊,單并行度不會(huì)出現(xiàn)barrier不對(duì)齊道偷,所以必然精確一次缀旁。其實(shí)還是要理解barrier對(duì)齊就是Exactly Once不會(huì)重復(fù)消費(fèi),barrier不對(duì)齊就是 At Least Once可能重復(fù)消費(fèi)勺鸦,這里只有單個(gè)并行度根本不會(huì)存在barrier不對(duì)齊并巍,所以不會(huì)存在至少一次語(yǔ)義;
  • 為了下游盡快做CheckPoint换途,所以會(huì)先發(fā)送barrier到下游懊渡,自身再同步進(jìn)行快照嘶窄;這一步,如果向下發(fā)送barrier后距贷,自己同步快照慢怎么辦柄冲?下游已經(jīng)同步好了,自己還沒(méi)忠蝗?

    • 答: 可能會(huì)出現(xiàn)下游比上游快照還早的情況现横,但是這不影響快照結(jié)果,只是下游快照的更及時(shí)了阁最,我只要保障下游把barrier之前的數(shù)據(jù)都處理了戒祠,并且不處理barrier之后的數(shù)據(jù),然后做快照速种,那么下游也同樣支持精確一次姜盈。這個(gè)問(wèn)題你不要從全局思考,你單獨(dú)思考上游和下游的實(shí)例配阵,你會(huì)發(fā)現(xiàn)上下游的狀態(tài)都是準(zhǔn)確的馏颂,既沒(méi)有丟,也沒(méi)有重復(fù)計(jì)算棋傍。這里需要注意一點(diǎn)救拉,如果有一個(gè)Operator 的CheckPoint失敗了或者因?yàn)镃heckPoint超時(shí)也會(huì)導(dǎo)致失敗,那么JobManager會(huì)認(rèn)為整個(gè)CheckPoint失敗瘫拣。失敗的CheckPoint是不能用來(lái)恢復(fù)任務(wù)的亿絮,必須所有的算子的CheckPoint都成功,那么這次CheckPoint才能認(rèn)為是成功的麸拄,才能用來(lái)恢復(fù)任務(wù)派昧;
  • 我程序中Flink的CheckPoint語(yǔ)義設(shè)置了 Exactly Once,但是我的mysql中看到數(shù)據(jù)重復(fù)了拢切?程序中設(shè)置了1分鐘1次CheckPoint蒂萎,但是5秒向mysql寫(xiě)一次數(shù)據(jù),并commit失球;

    • 答:Flink要求end to end的精確一次都必須實(shí)現(xiàn)TwoPhaseCommitSinkFunction岖是。如果你的chk-100成功了帮毁,過(guò)了30秒实苞,由于5秒commit一次,所以實(shí)際上已經(jīng)寫(xiě)入了6批數(shù)據(jù)進(jìn)入mysql烈疚,但是突然程序掛了黔牵,從chk100處恢復(fù),這樣的話爷肝,之前提交的6批數(shù)據(jù)就會(huì)重復(fù)寫(xiě)入猾浦,所以出現(xiàn)了重復(fù)消費(fèi)陆错。Flink的精確一次有兩種情況,一個(gè)是Flink內(nèi)部的精確一次金赦,一個(gè)是端對(duì)端的精確一次音瓷,這個(gè)博客所描述的都是關(guān)于Flink內(nèi)部去的精確一次,我后期再發(fā)一個(gè)博客詳細(xì)介紹一下Flink端對(duì)端的精確一次如何實(shí)現(xiàn)

參考博客:

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市漠烧,隨后出現(xiàn)的幾起案子杏愤,更是在濱河造成了極大的恐慌,老刑警劉巖已脓,帶你破解...
    沈念sama閱讀 221,548評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件珊楼,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡度液,警方通過(guò)查閱死者的電腦和手機(jī)厕宗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)堕担,“玉大人媳瞪,你說(shuō)我怎么就攤上這事≌毡Γ” “怎么了蛇受?”我有些...
    開(kāi)封第一講書(shū)人閱讀 167,990評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)厕鹃。 經(jīng)常有香客問(wèn)我兢仰,道長(zhǎng),這世上最難降的妖魔是什么剂碴? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,618評(píng)論 1 296
  • 正文 為了忘掉前任把将,我火速辦了婚禮,結(jié)果婚禮上忆矛,老公的妹妹穿的比我還像新娘察蹲。我一直安慰自己,他們只是感情好催训,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,618評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布洽议。 她就那樣靜靜地躺著,像睡著了一般漫拭。 火紅的嫁衣襯著肌膚如雪亚兄。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,246評(píng)論 1 308
  • 那天采驻,我揣著相機(jī)與錄音审胚,去河邊找鬼匈勋。 笑死,一個(gè)胖子當(dāng)著我的面吹牛膳叨,可吹牛的內(nèi)容都是我干的洽洁。 我是一名探鬼主播,決...
    沈念sama閱讀 40,819評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼菲嘴,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼诡挂!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起临谱,我...
    開(kāi)封第一講書(shū)人閱讀 39,725評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤璃俗,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后悉默,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體城豁,經(jīng)...
    沈念sama閱讀 46,268評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,356評(píng)論 3 340
  • 正文 我和宋清朗相戀三年抄课,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了唱星。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,488評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡跟磨,死狀恐怖间聊,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情抵拘,我是刑警寧澤哎榴,帶...
    沈念sama閱讀 36,181評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站僵蛛,受9級(jí)特大地震影響尚蝌,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜充尉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,862評(píng)論 3 333
  • 文/蒙蒙 一飘言、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧驼侠,春花似錦姿鸿、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,331評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至相速,卻和暖如春碟渺,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背突诬。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,445評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工苫拍, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人旺隙。 一個(gè)月前我還...
    沈念sama閱讀 48,897評(píng)論 3 376
  • 正文 我出身青樓绒极,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親蔬捷。 傳聞我的和親對(duì)象是個(gè)殘疾皇子垄提,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,500評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • 翻譯自:Data Streaming Fault Tolerance 簡(jiǎn)介 Apache Flink 提供了容錯(cuò)機(jī)...
    第十人_孔閱讀 2,221評(píng)論 0 2
  • Flink 提供了容錯(cuò)機(jī)制,可以恢復(fù)數(shù)據(jù)流應(yīng)用到一致?tīng)顟B(tài)周拐。該機(jī)制確保在發(fā)生故障時(shí)铡俐,程序的狀態(tài)最終將只反映數(shù)據(jù)流中的...
    Alex90閱讀 5,755評(píng)論 1 2
  • Apache Flink是一個(gè)面向分布式數(shù)據(jù)流處理和批量數(shù)據(jù)處理的開(kāi)源計(jì)算平臺(tái),它能夠基于同一個(gè)Flink運(yùn)行時(shí)妥粟,...
    康小為6840閱讀 1,202評(píng)論 0 7
  • 我們一生中可能會(huì)遇到很多人:有些人來(lái)了又去审丘,有些人去而復(fù)返。有些人擦肩而過(guò)勾给,有些人一路同行滩报。有些人或許在路的盡頭相...
    吳小小溪閱讀 1,301評(píng)論 5 3
  • 今天下了一整天的雨,夾雜著微風(fēng)播急,讓我感覺(jué)涼意陣陣脓钾,這場(chǎng)雨,一直沒(méi)有停過(guò)桩警。 我的小表妹去上學(xué)了可训,小學(xué)...
    做你的貓咪閱讀 165評(píng)論 0 1