前言
Flink的檢查點(diǎn)和恢復(fù)機(jī)制定期的會(huì)保存應(yīng)用程序狀態(tài)的一致性檢查點(diǎn)。在故障的情況下抹竹,應(yīng)用程序的狀態(tài)將會(huì)從最近一次完成的檢查點(diǎn)恢復(fù),并繼續(xù)處理止潮。盡管如此窃判,可以使用檢查點(diǎn)來(lái)重置應(yīng)用程序的狀態(tài)無(wú)法完全達(dá)到令人滿意的一致性保證。相反喇闸,source和sink的連接器需要和Flink的檢查點(diǎn)和恢復(fù)機(jī)制進(jìn)行集成才能提供有意義的一致性保證袄琳。
狀態(tài)一致性
對(duì)于流處理器內(nèi)部來(lái)說(shuō)窿凤,所謂的狀態(tài)一致性,其實(shí)就是我們所說(shuō)的計(jì)算結(jié)果要保證準(zhǔn)確跨蟹。 一條數(shù)據(jù)不應(yīng)該丟失雳殊,也不應(yīng)該重復(fù)計(jì)算 在遇到故障時(shí)可以恢復(fù)狀態(tài),恢復(fù)以后的重新計(jì)算窗轩,結(jié)果應(yīng)該也是完全正確的夯秃。
分類
AT-MOST-ONCE(最多一次) 當(dāng)任務(wù)故障時(shí),最簡(jiǎn)單的做法是什么都不干痢艺,既不恢復(fù)丟失的狀態(tài)仓洼,也不重播丟失的數(shù)據(jù)。At-most-once 語(yǔ)義的含義是最多處理一次事件堤舒。
AT-LEAST-ONCE(至少一次) 在大多數(shù)的真實(shí)應(yīng)用場(chǎng)景色建,我們希望不丟失事件。這種類型的保障稱為 at-least-once舌缤,意思是所有的事件都得到了處理箕戳,而一些事件還可能被處理多次。
EXACTLY-ONCE(精確一次) 恰好處理一次是最嚴(yán)格的保證国撵,也是最難實(shí)現(xiàn)的陵吸。恰好處理一次語(yǔ)義不僅僅意味著沒(méi)有事件丟失,還意味著針對(duì)每一個(gè)數(shù)據(jù)介牙,內(nèi)部狀態(tài)僅僅更新一次壮虫。
EXACTLY-ONCE 的保證
Flink的 checkpoint機(jī)制和故障恢復(fù)機(jī)制給Flink內(nèi)部提供了精確一次的保證,需要注意的是环础,所謂精確一次并不是說(shuō)精確到每個(gè)event只執(zhí)行一次囚似,而是每個(gè)event對(duì)狀態(tài)(計(jì)算結(jié)果)的影響只有一次。
端到端 EXACTLY-ONCE
目前我們看到的一致性保證都是由流處理器實(shí)現(xiàn)的线得,也就是說(shuō)都是在 Flink 流處理器內(nèi)部保證的饶唤;而在真實(shí)應(yīng)用中,流處理應(yīng)用除了流處理器以外還包含了數(shù)據(jù)源(例如 Kafka)和輸出到持久化系統(tǒng)
端到端的一致性保證框都,意味著結(jié)果的正確性貫穿了整個(gè)流處理應(yīng)用的始終搬素;每一個(gè)組件都保證了它自己的一致性
不同Source 和Sink的一致性保證
source/sink | 不可重置 | 可重置 |
---|---|---|
任意(Any) | At-most-once | At-least-once |
冪等 | At-most-once | Exactly-once(故障恢復(fù)時(shí)會(huì)出現(xiàn)暫時(shí)不一致) |
預(yù)寫日志(WAL) | At-most-once | At-least-once |
兩階段提交(2PC) | At-most-once | Exactly-once |
整個(gè)端到端的一致性級(jí)別取決于所有組件中一致性最弱的組件
內(nèi)部保證 —— checkpoint
source 端 —— 可重設(shè)數(shù)據(jù)的讀取位置
-
sink 端 —— 從故障恢復(fù)時(shí)呵晨,數(shù)據(jù)不會(huì)重復(fù)寫入外部系統(tǒng)
冪等寫入
事務(wù)寫入
Fink的檢查點(diǎn)和恢復(fù)機(jī)制和可以重置讀位置的source連接器結(jié)合使用魏保,可以保證應(yīng)用程序不會(huì)丟失任何數(shù)據(jù)。盡管如此摸屠,應(yīng)用程序可能會(huì)發(fā)出兩次計(jì)算結(jié)果谓罗,因?yàn)閺纳弦淮螜z查點(diǎn)恢復(fù)的應(yīng)用程序所計(jì)算的結(jié)果將會(huì)被重新發(fā)送一次(一些結(jié)果已經(jīng)發(fā)送出去了,這時(shí)任務(wù)故障季二,然后從上一次檢查點(diǎn)恢復(fù)檩咱,這些結(jié)果將被重新計(jì)算一次然后發(fā)送出去)揭措。所以,可重置讀位置的source和Flink的恢復(fù)機(jī)制不足以提供端到端的恰好處理一次語(yǔ)義刻蚯,即使應(yīng)用程序的狀態(tài)是恰好處理一次一致性級(jí)別绊含。
端到端恰好處理一次語(yǔ)義一致性的應(yīng)用程序需要特殊的sink連接器。sink連接器可以在不同的情況下使用兩種技術(shù)來(lái)達(dá)到恰好處理一次一致性語(yǔ)義:冪等性寫入和事務(wù)性寫入炊汹。
冪等寫入
冪等概念
所謂冪等操作躬充,是說(shuō)一個(gè)操作,可以重復(fù)執(zhí)行很多次讨便,但只導(dǎo)致一次結(jié)果更改充甚,也就是說(shuō),后面再重復(fù)執(zhí)行就不起作用了
實(shí)現(xiàn)思想
必須保證在從檢查點(diǎn)恢復(fù)以后霸褒,它將會(huì)覆蓋之前已經(jīng)寫入的結(jié)果伴找。
優(yōu)缺點(diǎn)
從Flink程序sink到的key-value存儲(chǔ)中讀取數(shù)據(jù)的應(yīng)用,在Flink從檢查點(diǎn)恢復(fù)的過(guò)程中废菱,可能會(huì)看到不想看到的結(jié)果技矮。當(dāng)重播開(kāi)始時(shí),之前已經(jīng)發(fā)出的計(jì)算結(jié)果可能會(huì)被更早的結(jié)果所覆蓋(因?yàn)樵诨謴?fù)過(guò)程中)殊轴。所以穆役,一個(gè)消費(fèi)Flink程序輸出數(shù)據(jù)的應(yīng)用,可能會(huì)觀察到時(shí)間回退梳凛,例如讀到了比之前小的計(jì)數(shù)耿币。
事務(wù)寫入
事務(wù)概念
應(yīng)用程序中一系列嚴(yán)密的操作,所有操作必須成功完成韧拒,否則在每個(gè)操作中所作的所有更改都會(huì)被撤消
具有原子性:一個(gè)事務(wù)中的一系列的操作要么全部成功淹接,要么一個(gè)都不做
實(shí)現(xiàn)思想
構(gòu)建的事務(wù)對(duì)應(yīng)著 checkpoint,等到 checkpoint 真正完成的時(shí)候叛溢,才把所有對(duì)應(yīng)的結(jié)果寫入 sink 系統(tǒng)中
優(yōu)缺點(diǎn)
事務(wù)性的方法將不會(huì)遭受冪等性寫入所遭受的重播不一致的問(wèn)題塑悼。但是,事務(wù)性寫入?yún)s帶來(lái)了延遲楷掉,因?yàn)橹挥性跈z查點(diǎn)完成以后厢蒜,我們才能看到計(jì)算結(jié)果。
Flink提供了兩種構(gòu)建模塊來(lái)實(shí)現(xiàn)事務(wù)性sink連接器:write-ahead-log(WAL烹植,預(yù)寫式日志)sink和兩階段提交sink斑鸦。
實(shí)現(xiàn)方式
預(yù)寫日志
把結(jié)果數(shù)據(jù)先當(dāng)成狀態(tài)保存,然后在收到 checkpoint 完成的通知時(shí)草雕,一次性寫入 sink 系統(tǒng)
簡(jiǎn)單易于實(shí)現(xiàn)巷屿,由于數(shù)據(jù)提前在狀態(tài)后端中做了緩存,所以無(wú)論什么 sink 系統(tǒng)墩虹,都能用這種方式一批搞定
DataStream API 提供了一個(gè)模板類:GenericWriteAheadSink嘱巾,來(lái)實(shí)現(xiàn)這種事務(wù)性 sink
兩階段提交
對(duì)于每個(gè) checkpoint憨琳,sink 任務(wù)會(huì)啟動(dòng)一個(gè)事務(wù),并將接下來(lái)所有接收的數(shù)據(jù)添加到事務(wù)里
然后將這些數(shù)據(jù)寫入外部 sink 系統(tǒng)旬昭,但不提交它們 —— 這時(shí)只是“預(yù)提交”
當(dāng)它收到 checkpoint 完成的通知時(shí)篙螟,它才正式提交事務(wù),實(shí)現(xiàn)結(jié)果的真正寫入
這種方式真正實(shí)現(xiàn)了 exactly-once问拘,它需要一個(gè)提供事務(wù)支持的外部 sink 系統(tǒng)闲擦。
Flink 提供了 TwoPhaseCommitSinkFunction 接口。
2PC 對(duì)外部 sink 系統(tǒng)的要求
外部 sink 系統(tǒng)必須提供事務(wù)支持场梆,或者 sink 任務(wù)必須能夠模擬外部系統(tǒng)上的事務(wù)
在 checkpoint 的間隔期間里墅冷,必須能夠開(kāi)啟一個(gè)事務(wù)并接受數(shù)據(jù)寫入
在收到 checkpoint 完成的通知之前,事務(wù)必須是“等待提交”的狀態(tài)或油。在故障恢復(fù)的情況下寞忿,這可能需要一些時(shí)間。如果這個(gè)時(shí)候sink系統(tǒng)關(guān)閉事務(wù)(例如超時(shí)了)顶岸,那么未提交的數(shù)據(jù)就會(huì)丟失
sink 任務(wù)必須能夠在進(jìn)程失敗后恢復(fù)事務(wù)
提交事務(wù)必須是冪等操作
Flink+Kafka 端到端狀態(tài)一致性的保證
使用flink+kafka來(lái)實(shí)現(xiàn)一個(gè)端對(duì)端一致性保證岔乔,source -> transform -> sink
內(nèi)部 —— 利用 checkpoint 機(jī)制席赂,把狀態(tài)存盤虱歪,發(fā)生故障的時(shí)候可以恢復(fù)谴咸,保證內(nèi)部的狀態(tài)一致性
source —— kafka consumer 作為 source,可以將偏移量保存下來(lái)卷谈,如果后續(xù)任務(wù)出現(xiàn)了故障杯拐,恢復(fù)的時(shí)候可以由連接器重置偏移量,重新消費(fèi)數(shù)據(jù)世蔗,保證一致性
sink —— kafka producer 作為sink端逼,采用兩階段提交 sink,需要實(shí)現(xiàn)一個(gè) TwoPhaseCommitSinkFunction
圖解Exactly-Once 兩階段提交
Exactly-once 兩階段提交1:
JobManager 協(xié)調(diào)各個(gè) TaskManager 進(jìn)行 checkpoint 存儲(chǔ) checkpoint保存在 StateBackend中污淋,默認(rèn)StateBackend是內(nèi)存級(jí)的顶滩,也可以改為文件級(jí)的進(jìn)行持久化保存
Exactly-once 兩階段提交2:
當(dāng)開(kāi)啟了checkpoint ,JobManager 會(huì)將檢查點(diǎn)分界線(barrier)注入數(shù)據(jù)流 barrier會(huì)在算子間傳遞下去
每個(gè)算子會(huì)對(duì)當(dāng)前的狀態(tài)做個(gè)快照寸爆,保存到狀態(tài)后端
checkpoint 機(jī)制可以保證內(nèi)部的狀態(tài)一致性
每個(gè)內(nèi)部的 transform 任務(wù)遇到 barrier 時(shí)礁鲁,都會(huì)把狀態(tài)存到 checkpoint 里
sink 任務(wù)首先把數(shù)據(jù)寫入外部 kafka,這些數(shù)據(jù)都屬于預(yù)提交的事務(wù)赁豆;
遇到 barrier 時(shí)仅醇,把狀態(tài)保存到狀態(tài)后端,并開(kāi)啟新的預(yù)提交事務(wù)
當(dāng)所有算子任務(wù)的快照完成歌憨,也就是這次的 checkpoint 完成時(shí)着憨,JobManager 會(huì)向所有任務(wù)發(fā)通知,確認(rèn)這次 checkpoint 完成
sink 任務(wù)收到確認(rèn)通知务嫡,正式提交之前的事務(wù)甲抖,kafka 中未確認(rèn)數(shù)據(jù)改為“已確認(rèn)”
總結(jié) Exactly-once 兩階段提交步驟
第一條數(shù)據(jù)來(lái)了之后,開(kāi)啟一個(gè) kafka 的事務(wù)(transaction)心铃,正常寫入 kafka 分區(qū)日志但標(biāo)記為未提交准谚,這就是“預(yù)提交”
jobmanager 觸發(fā) checkpoint 操作,barrier 從 source 開(kāi)始向下傳遞去扣,遇到 barrier 的算子將狀態(tài)存入狀態(tài)后端柱衔,并通知 jobmanager sink 連接器收到 barrier,保存當(dāng)前狀態(tài)愉棱,存入 checkpoint唆铐,通知 jobmanager,并開(kāi)啟下一階段的事務(wù)奔滑,用于提交下個(gè)檢查點(diǎn)的數(shù)據(jù)
jobmanager 收到所有任務(wù)的通知艾岂,發(fā)出確認(rèn)信息,表示 checkpoint 完成
sink 任務(wù)收到 jobmanager 的確認(rèn)信息朋其,正式提交這段時(shí)間的數(shù)據(jù)
外部kafka關(guān)閉事務(wù)王浴,提交的數(shù)據(jù)可以正常消費(fèi)了。
在使用kafka011 sink 時(shí)注意的點(diǎn):
1.為了保證事務(wù)特性梅猿,在使用其他程序去消費(fèi)我們flink sink 數(shù)據(jù)的kafka時(shí)氓辣,這個(gè)consumer需要設(shè)置了isolation.level = read_committed
,那么它只會(huì)讀取已經(jīng)提交了的消息袱蚓。
2.Checkpoint超時(shí)時(shí)間 必需大于 kafka 提交事務(wù)時(shí)間钞啸。
假如checkpoint失敗時(shí)間高于 kafka事務(wù)等待時(shí)間,比如喇潘,設(shè)置了一個(gè)checkpoint最多等待10分鐘爽撒,10分鐘后會(huì)失敗這個(gè)checkpoint的保存。而kafka 的事務(wù)只能等待5分鐘响蓉,5分鐘后把uncommitted的事務(wù)關(guān)掉硕勿。這個(gè)時(shí)候6分鐘checkpoint成功了,但是對(duì)應(yīng)kafka數(shù)據(jù)的事務(wù)已經(jīng)失敗枫甲。這樣就無(wú)法保證Exactly-once的實(shí)現(xiàn)