Flink Checkpoint 原理流程以及常見失敗原因分析

本文僅為筆者平日學(xué)習(xí)記錄之用,侵刪
原文:https://mp.weixin.qq.com/s/dKpYz-YvySAyAEFCq5_dGA

前言

目前實(shí)時(shí)任務(wù)主要以 Flink 為主葛超,為了保證實(shí)時(shí)任務(wù)的容錯(cuò)恢復(fù)以及停止重啟時(shí)的狀態(tài)恢復(fù)答渔,幾乎所有的實(shí)時(shí)任務(wù)都會開啟 Checkpoint 或者觸發(fā) Savepoint 進(jìn)行狀態(tài)保存沼撕。由于 Savepoint 底層原理的實(shí)現(xiàn)和 Checkpoint 幾乎一致务豺,本文結(jié)合 Flink 1.9 版本笼沥,重點(diǎn)講述 Flink Checkpoint 原理流程以及常見原因分析奔浅,讓用戶能夠更好的理解 Flink Checkpoint汹桦,從而開發(fā)出更健壯的實(shí)時(shí)任務(wù)营勤。

一、 什么是 Flink Checkpoint 和狀態(tài)

1.1 Flink Checkpoint 是什么

Flink Checkpoint 是一種容錯(cuò)恢復(fù)機(jī)制赂蠢。這種機(jī)制保證了實(shí)時(shí)程序運(yùn)行時(shí)虱岂,即使突然遇到異车卺或者機(jī)器問題時(shí)也能夠進(jìn)行自我恢復(fù)。Flink Checkpoint 對于用戶層面來說键袱,是透明的褐健,用戶會感覺實(shí)時(shí)任務(wù)一直在運(yùn)行蚜迅。

Flink Checkpoint 是 Flink 自身的系統(tǒng)行為慢叨,用戶無法對其進(jìn)行交互,用戶可以在程序啟動之前馏段,設(shè)置好實(shí)時(shí)任務(wù) Checkpoint 相關(guān)的參數(shù),當(dāng)任務(wù)啟動之后践瓷,剩下的就全交給 Flink 自行管理院喜。

1.2 為什么要開啟 Checkpoint

實(shí)時(shí)任務(wù)不同于批處理任務(wù),除非用戶主動停止晕翠,一般會一直運(yùn)行喷舀,運(yùn)行的過程中可能存在機(jī)器故障、網(wǎng)絡(luò)問題淋肾、外界存儲問題等等硫麻,要想實(shí)時(shí)任務(wù)一直能夠穩(wěn)定運(yùn)行,實(shí)時(shí)任務(wù)要有自動容錯(cuò)恢復(fù)的功能樊卓。而批處理任務(wù)在遇到異常情況時(shí),在重新計(jì)算一遍即可柳洋。實(shí)時(shí)任務(wù)因?yàn)闀恢边\(yùn)行的特性,如果在從頭開始計(jì)算,成本會很大,尤其是對于那種運(yùn)行時(shí)間很久的實(shí)時(shí)任務(wù)來說。

實(shí)時(shí)任務(wù)開啟 Checkpoint 功能,也能夠減少容錯(cuò)恢復(fù)的時(shí)間。因?yàn)槊看味际菑淖钚碌?Chekpoint 點(diǎn)位開始狀態(tài)恢復(fù)射富,而不是從程序啟動的狀態(tài)開始恢復(fù)柴灯。舉個(gè)列子依啰,如果你有一個(gè)運(yùn)行一年的實(shí)時(shí)任務(wù)闷旧,如果容錯(cuò)恢復(fù)是從一年前啟動時(shí)的狀態(tài)恢復(fù)钝侠,實(shí)時(shí)任務(wù)可能需要運(yùn)行很久才能恢復(fù)到現(xiàn)在狀態(tài)啃勉,這一般是業(yè)務(wù)方所不允許的。

1.3 Flink 任務(wù)狀態(tài)是什么

Flink Checkpoint 會將實(shí)時(shí)任務(wù)的狀態(tài)存儲到遠(yuǎn)端存儲,比如 HDFS ,亞馬遜的 S3 等等暑劝。Flink 任務(wù)狀態(tài)可以理解為實(shí)時(shí)任務(wù)計(jì)算過程中,中間產(chǎn)生的數(shù)據(jù)結(jié)果,同時(shí)這些計(jì)算結(jié)果會在后續(xù)實(shí)時(shí)任務(wù)處理時(shí),能夠繼續(xù)進(jìn)行使用。實(shí)時(shí)任務(wù)的狀態(tài)可以是一個(gè)聚合結(jié)果值掸掸,比如 WordCount 統(tǒng)計(jì)的每個(gè)單詞的數(shù)量,也可以是消息流中的明細(xì)數(shù)據(jù)陪竿。

Flink 任務(wù)狀態(tài)整體可以劃分兩種:Operator 狀態(tài)和 KeyedState。常見的 Operator 狀態(tài)桐绒,比如 Kafka Topic 每個(gè)分區(qū)的偏移量烁竭。KeyedState 是基于 KeyedStream 來使用的,所以在使用前,你需要對你的流通過 keyby 來進(jìn)行分區(qū),常見的狀態(tài)比如有 MapState舔痕、ListState慨代、ValueState 等等。

下面是一個(gè)實(shí)時(shí)計(jì)算奇數(shù)和偶數(shù)的任務(wù)的示例:

在上圖中说莫,假如輸入的流來自于 Kafka ,那么 Kafka Topic 分區(qū)的偏移量是狀態(tài)尊勿,所有奇數(shù)的和、所有偶數(shù)的和也都是狀態(tài)。

二、 Flink Checkpoint 流程和原理

2.1 開啟 Checkpoint 功能

想要使用 Flink Checkpoint 功能,首先是要在實(shí)時(shí)任務(wù)開啟 Checkpoint麻削。Flink 默認(rèn)情況下是關(guān)閉 Checkpoint 功能竖共,下面代碼是開啟 Checkpoint :

上述代碼中腿准,設(shè)置了 Flink Checkpoint 的間隔 3 秒,設(shè)置的 Checkpoint 的語義為 EXACTLY_ONCE饲嗽。Flink 默認(rèn)的 Checkpoint 語義為 EXACTLY_ONCE衔憨。上述代碼也使用 RocksDBStateBackend 進(jìn)行狀態(tài)存儲。用戶也可以自己設(shè)置 Flink Checkpoint 的參數(shù),通過 CheckpointConfig 這個(gè)類進(jìn)行設(shè)置心赶,代碼如下:

CheckpointConfig
 chkConfig = env.getCheckpointConfig();
/** 調(diào)用 CheckpointConfig 各種 set 方法 */
chkConfig.setXXX

2.2 Flink 一次 Checkpoint 的參與者

Flink 整體作業(yè)采用主從架構(gòu)销钝,Master 為 JobManager盯捌,Slave 為 TaskManager筷狼,Client 則是負(fù)責(zé)提交用戶實(shí)時(shí)任務(wù)的代碼邏輯 竖独,F(xiàn)link 整體框架圖如下圖所示:

JobManager 主要負(fù)責(zé)實(shí)時(shí)任務(wù)的調(diào)度以及對 Checkpoint 的觸發(fā)亭饵,TaskManager 負(fù)責(zé)真正用戶的代碼執(zhí)行邏輯,具體表現(xiàn)形式則是 Task 在 TaskManager上面進(jìn)行運(yùn)行懂算,一個(gè) Task 對應(yīng)一個(gè)線程只冻,它可能運(yùn)行一個(gè)算子的 SubTask,也可能是運(yùn)行多個(gè) Chain 起來的算子的 SubTask计技。

Flink 實(shí)時(shí)任務(wù)一次 Checkpoint 的參與者主要包括三塊:JobManager喜德、TaskManager以及 Zookeeper。JobManager 定時(shí)會觸發(fā)執(zhí)行 Checkpoint垮媒,具體則是在 JobManager 中運(yùn)行的 CheckpointCoordinator 中觸發(fā)所有 Source 的 SubTask 向下游廣播 CheckpointBarrier舍悯。

TaskManager 收到 CheckpointBarrier 后,根據(jù) Checkpoint 的語義睡雇,決定是否在進(jìn)行 CheckpointBarrier 對齊時(shí)萌衬,緩沖后續(xù)的數(shù)據(jù)記錄,當(dāng)收到所有上游輸入的 CheckpointBarrier 后它抱,開始做 Checkpoint秕豫。TaskManager Checkpoint 完成后,會向 JobManager 發(fā)送確認(rèn)完成的消息。只有當(dāng)所有 Sink 算子完成 Checkpoint 且發(fā)送確認(rèn)消息后混移,該次 Checkpoint 才算完成祠墅。

在高可用模式下,ZooKeeper 主要存儲最新一次 Checkpoint 成功的目錄歌径,當(dāng)Flink 任務(wù)容錯(cuò)恢復(fù)時(shí)毁嗦,會從最新成功的 Checkpoint 恢復(fù)。Zookeeper 同時(shí)也存儲著 Flink 作業(yè)的元數(shù)據(jù)信息回铛。比如在高可用模式下狗准,F(xiàn)link 會將 JobGraph 以及相關(guān) Jar 包存儲在 HDFS 上面,Zookeeper 記錄著該信息勺届。再次容錯(cuò)重啟時(shí)驶俊,讀取這些信息,進(jìn)行任務(wù)啟動免姿。

下圖是一次 Checkpoint 的參與者:

2.3 Checkpoint 協(xié)調(diào)者 — CheckpointCoordinator

CheckpointCoordinator,是 Checkpoint 中最重要的類饼酿,協(xié)調(diào)著實(shí)時(shí)任務(wù)整個(gè) Checkpoint 的執(zhí)行。下圖是 CheckpointCoordinator 中的方法:

Flink CheckpointCoordinator 中有幾個(gè)比較重要的方法:

  1. triggerCheckpoint胚膊,觸發(fā) Flink 任務(wù)進(jìn)行 Checkpoint 的方法

  2. triggerSavepoint故俐,觸發(fā) Flink 任務(wù) Savepoint 的方法

  3. restoreSavepoint,F(xiàn)link 任務(wù)從 Savepoint 狀態(tài)恢復(fù)

  4. restoreLatestCheckpointedState紊婉,從最新一次 Checkpoint 點(diǎn)位狀態(tài)恢復(fù)

  5. receiveAcknowledgeMessage药版,接受 Operator SubTask Checkpoint 完成的消息并處理

Flink CheckpointCoordinator 類是在 ExecutionGraph 形成時(shí)進(jìn)行初始化的,具體則是在 ExecutionGraph 創(chuàng)建之后喻犁,調(diào)用 enableCheckpointing 方法槽片,然后在該方法中,CheckpointCoordinator 進(jìn)行創(chuàng)建肢础。以下是 Flink Checkpoint 觸發(fā)的時(shí)序圖:

當(dāng) Flink 作業(yè)狀態(tài)由創(chuàng)建到運(yùn)行時(shí)还栓,CheckpointCoordinator 中的 ScheduledThreadPoolExecutor 會定時(shí)執(zhí)行 ScheduledTrigger 中的邏輯。ScheduledTrigger 本質(zhì)就是一個(gè) Runnable传轰,run 方法中執(zhí)行 triggerCheckpoint 方法剩盒。

2.4 Flink Checkpoint 流程與原理

一次 Flink Checkpoint 的流程是從 CheckpointCoordinator 的 triggerCheckpoint 方法開始,下面來看看一次 Flink Checkpoint 涉及到的主要內(nèi)容:

  1. Checkpoint 開始之前先進(jìn)行預(yù)檢查慨蛙,比如檢查最大并發(fā)的 Checkpoint 數(shù)辽聊,最小的 Checkpoint 之間的時(shí)間間隔。默認(rèn)情況下期贫,最大并發(fā)的 Checkpoint 數(shù)為 1跟匆,最小的 Checkpoint 之間的時(shí)間間隔為 0.

  2. 判斷所有 Source 算子的 Subtask (Execution) 是否都處于運(yùn)行狀態(tài),有則直接報(bào)錯(cuò)通砍。同時(shí)檢查所有待確認(rèn)的算子的 SubTask(Execution)是否是運(yùn)行狀態(tài)贾铝,有則直接報(bào)錯(cuò)。

  3. 創(chuàng)建 PendingCheckpoint埠帕,同時(shí)為該次 Checkpoint 創(chuàng)建一個(gè) Runnable垢揩,即超時(shí)取消線程,默認(rèn) Checkpoint 十分鐘超時(shí)敛瓷。

  4. 循環(huán)遍歷所有 Source 算子的 Subtask(Execution),最底層調(diào)用 Task 的triggerCheckpointBarrier, 廣播 CheckBarrier 到下游 叁巨,同時(shí) Checkpoint 其狀態(tài)。

  5. 下游的輸入中有 CheckpointBarrierHandler 類來處理 CheckpoinBarrier呐籽,然后會調(diào)用 notifyCheckpoint 方法锋勺,通知 Operator SubTask 進(jìn)行 Checkpoint。

  6. 每當(dāng) Operator SubTask 完成 Checkpoint 時(shí)狡蝶,都會向 CheckpointCoordoritor 發(fā)送確認(rèn)消息庶橱。CheckpointCoordinator 的 receiveAcknowledgeMessage 方法會進(jìn)行處理。

  7. 在一次 Checkpoint 過程中贪惹,當(dāng)所有從 Source 端到 Sink 端的算子 SubTask 都完成之后苏章,CheckpointCoordoritor 會通知算子進(jìn)行 notifyCheckpointCompleted 方法,前提是算子的函數(shù)實(shí)現(xiàn) CheckpointListener 接口奏瞬。

Flink 會定時(shí)在任務(wù)的 Source 算子的 SubTask 觸發(fā) CheckpointBarrier枫绅,CheckpointBarrier 是一種特殊的消息事件,會隨著消息通道流入到下游的算子中硼端。只有當(dāng)最后 Sink 端的算子接收到 CheckpointBarrier 并確認(rèn)該次 Checkpoint 完成時(shí)并淋,該次 Checkpoint 才算完成。所以在某些算子的 Task 有多個(gè)輸入時(shí)珍昨,會存在 Barrier 對齊時(shí)間县耽,我們可以在 Flink Web UI上面看到各個(gè) Task 的 CheckpointBarrier 對齊時(shí)間。

下圖是一次 Flink Checkpoint 實(shí)例流程示意圖:

Flin Checkpoint 保存的任務(wù)狀態(tài)在程序取消停止時(shí)镣典,默認(rèn)會進(jìn)行清除兔毙。Checkpoint 狀態(tài)保留策略主要有兩種:

DELETE_ON_CANCELLATION,RETAIN_ON_CANCELLATION

DELETE_ON_CANCELLATION 表示當(dāng)程序取消時(shí),刪除 Checkpoint 存儲的狀態(tài)文件骆撇。RETAIN_ON_CANCELLATION 表示當(dāng)程序取消時(shí)瞒御,保存之前的 Checkpoint 存儲的狀態(tài)文件 用戶可以結(jié)合業(yè)務(wù)情況,設(shè)置 Checkpoint 保留模式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 開啟 checkpoint */
env.enableCheckpointing(10000);
/** 設(shè)置 checkpoint 保留策略,取消程序時(shí)神郊,保留 checkpoint 狀態(tài)文件 */
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2.5 Flink Checkpoint 語義

Flink Checkpoint 支持兩種語義:Exactly_OnceAt_least_Once肴裙,默認(rèn)的 Checkpoint 語義是 Exactly_Once。具體語義含義如下:

Exactly_Once 含義是:保證每條數(shù)據(jù)對于 Flink 任務(wù)的狀態(tài)結(jié)果只影響一次涌乳。打個(gè)比方蜻懦,比如 WordCount 程序,目前實(shí)時(shí)統(tǒng)計(jì)的 "hello" 這個(gè)單詞數(shù)為 5夕晓,同時(shí)這個(gè)結(jié)果在這次 Checkpoint 成功后宛乃,保存在了 HDFS。在下次 Checkpoint 之前, 又來 2 個(gè) "hello" 單詞征炼,突然程序遇到外部異常自動容錯(cuò)恢復(fù)析既,會從最近的 Checkpoint 點(diǎn)開始恢復(fù),那么會從單詞數(shù)為 5 的這個(gè)狀態(tài)點(diǎn)開始恢復(fù)谆奥,Kafka 消費(fèi)的數(shù)據(jù)點(diǎn)位也是狀態(tài)為 5 這個(gè)點(diǎn)位開始計(jì)算眼坏,所以即使程序遇到外部異常自動恢復(fù)時(shí),也不會影響到 Flink 狀態(tài)的結(jié)果計(jì)算酸些。

At_Least_Once 含義是:每條數(shù)據(jù)對于 Flink 任務(wù)的狀態(tài)計(jì)算至少影響一次宰译。比如在 WordCount 程序中,你統(tǒng)計(jì)到的某個(gè)單詞的單詞數(shù)可能會比真實(shí)的單詞數(shù)要大魄懂,因?yàn)橥粭l消息沿侈,當(dāng) Flink 任務(wù)容錯(cuò)恢復(fù)后,可能將其計(jì)算多次市栗。

Flink 中 Exactly_Once 和 At_Least_Once 具體是針對 Flink 任務(wù)狀態(tài)而言的缀拭,并不是 Flink 程序?qū)ο⒂涗浿惶幚硪淮巍Ee個(gè)例子肃廓,當(dāng)前 Flink 任務(wù)正在做 Checkpoint智厌,該次 Checkpoint 還沒有完成,這次 Checkpoint 時(shí)間段的數(shù)據(jù)其實(shí)已經(jīng)進(jìn)入 Flink 程序處理盲赊,只是程序狀態(tài)沒有最終存儲到遠(yuǎn)程存儲铣鹏。當(dāng)程序突然遇到異常,進(jìn)行容錯(cuò)恢復(fù)時(shí)哀蘑,那么就會從最新的 Checkpoint 進(jìn)行狀態(tài)恢復(fù)重啟诚卸,上一次 Checkpoint 成功到這次 Checkpoint 失敗的數(shù)據(jù)還會進(jìn)入 Flink 系統(tǒng)重新處理,具體實(shí)例如下圖:

上圖中表示一個(gè) WordCount 實(shí)時(shí)任務(wù)的 Checkpoint绘迁,在進(jìn)行 chk-5 Checkpoint 時(shí)合溺,突然遇到程序異常,那么實(shí)時(shí)任務(wù)會從 chk-4 進(jìn)行恢復(fù)缀台,那么之前 chk-5 處理的數(shù)據(jù)棠赛,F(xiàn)link 系統(tǒng)會再次進(jìn)行處理。不過這些數(shù)據(jù)的狀態(tài)沒有 Checkpoint 成功膛腐,所以 Flink 任務(wù)容錯(cuò)恢復(fù)再次運(yùn)行時(shí)睛约,對于狀態(tài)的影響還是只有一次。

Exactly_Once 和 At_Least_Once 具體在底層實(shí)現(xiàn)大致相同哲身,具體差異表現(xiàn)在 CheckpointBarrier 對齊方式的處理:

如果是 Exactly_Once 模式辩涝,某個(gè)算子的 Task 有多個(gè)輸入通道時(shí),當(dāng)其中一個(gè)輸入通道收到 CheckpointBarrier 時(shí)勘天,F(xiàn)link Task 會阻塞該通道怔揩,其不會處理該通道后續(xù)數(shù)據(jù)捉邢,但是會將這些數(shù)據(jù)緩存起來,一旦完成了所有輸入通道的 CheckpointBarrier 對齊商膊,才會繼續(xù)對這些數(shù)據(jù)進(jìn)行消費(fèi)處理伏伐。

對于 At_least_Once,同樣針對某個(gè)算子的 Task 有多個(gè)輸入通道的情況下翘狱,當(dāng)某個(gè)輸入通道接收到 CheckpointBarrier 時(shí)秘案,它不同于 Exactly Once,即使沒有完成所有輸入通道 CheckpointBarrier 對齊潦匈,At Least Once 也會繼續(xù)處理后續(xù)接收到的數(shù)據(jù)。所以使用 At Least Once 不能保證數(shù)據(jù)對于狀態(tài)計(jì)算只有一次的計(jì)算影響赚导。

三茬缩、 Flink Checkpoint 常見失敗原因和注意點(diǎn)

3.1 Flink Checkpoint 常見失敗原因分析

Flink Checkpoint 失敗有很多種原因,常見的失敗原因如下:

  1. 用戶代碼邏輯沒有對于異常處理吼旧,讓其直接在運(yùn)行中拋出凰锡。比如解析 Json 異常,沒有捕獲圈暗,導(dǎo)致 Checkpoint失敗掂为,或者調(diào)用 Dubbo 超時(shí)異常等等。

  2. 依賴外部存儲系統(tǒng)员串,在進(jìn)行數(shù)據(jù)交互時(shí)勇哗,出錯(cuò),異常沒有處理寸齐。比如輸出數(shù)據(jù)到 Kafka欲诺、Redis、HBase等渺鹦,客戶端拋出了超時(shí)異常扰法,沒有進(jìn)行捕獲,F(xiàn)link 任務(wù)容錯(cuò)機(jī)制會再次重啟毅厚。

  3. 內(nèi)存不足塞颁,頻繁GC,超出了 GC 負(fù)載的限制吸耿。比如 OOM 異常

  4. 網(wǎng)絡(luò)問題祠锣、機(jī)器不可用問題等等。

從目前的具體實(shí)踐情況來看珍语,F(xiàn)link Checkpoint 異常覺大多數(shù)還是用戶代碼邏輯的問題锤岸,對于程序異常沒有正確的處理導(dǎo)致。所以在編寫 Flink 實(shí)時(shí)任務(wù)時(shí)板乙,一定要注意處理程序可能出現(xiàn)的各種異常是偷。這樣拳氢,也會讓實(shí)時(shí)任務(wù)的邏輯更加的健壯。

當(dāng)自己的 Flink 實(shí)時(shí)任務(wù) Checkpoint 失敗時(shí)蛋铆,用戶可以先通過 Flink Web UI 進(jìn)行快速定位 Checkpoint 失敗的原因馋评,如果在 Flink Web UI 上面沒有看到異常信息,可以去看任務(wù)的具體日志進(jìn)行定位刺啦,如下是 Flink Web UI 查看錯(cuò)誤原因示意圖:

3.2 Flink Checkpoint 參數(shù)配置及注意點(diǎn)

下面是設(shè)置 Flink Checkpoint 參數(shù)配置的建議及注意點(diǎn):

  1. 當(dāng) Checkpoint 時(shí)間比設(shè)置的 Checkpoint 間隔時(shí)間要長時(shí)留特,可以設(shè)置 Checkpoint 間最小時(shí)間間隔。這樣在上次 Checkpoint 完成時(shí)玛瘸,不會立馬進(jìn)行下一次 Checkpoint蜕青,而是會等待一個(gè)最小時(shí)間間隔,之后再進(jìn)行 Checkpoint糊渊。否則右核,每次 Checkpoint 完成時(shí),就會立馬開始下一次 Checkpoint渺绒,系統(tǒng)會有很多資源消耗 Checkpoint 方面贺喝,而真正任務(wù)計(jì)算的資源就會變少。
  2. 如果Flink狀態(tài)很大宗兼,在進(jìn)行恢復(fù)時(shí)躏鱼,需要從遠(yuǎn)程存儲上讀取狀態(tài)進(jìn)行恢復(fù),如果狀態(tài)文件過大殷绍,此時(shí)可能導(dǎo)致任務(wù)恢復(fù)很慢染苛,大量的時(shí)間浪費(fèi)在網(wǎng)絡(luò)傳輸方面。此時(shí)可以設(shè)置 Flink Task 本地狀態(tài)恢復(fù)篡帕,任務(wù)狀態(tài)本地恢復(fù)默認(rèn)沒有開啟殖侵,可以設(shè)置參數(shù) state.backend.local-recovery 值為 true 進(jìn)行激活。
  3. Checkpoint 保存數(shù)镰烧,Checkpoint 保存數(shù)默認(rèn)是1拢军,也就是只保存最新的 Checkpoint 的狀態(tài)文件,當(dāng)進(jìn)行狀態(tài)恢復(fù)時(shí)怔鳖,如果最新的 Checkpoint 文件不可用時(shí)(比如 HDFS 文件所有副本都損壞或者其他原因)茉唉,那么狀態(tài)恢復(fù)就會失敗,如果設(shè)置 Checkpoint 保存數(shù) 2结执,即使最新的Checkpoint恢復(fù)失敗度陆,那么Flink 會回滾到之前那一次 Checkpoint 的狀態(tài)文件進(jìn)行恢復(fù)∠揍#考慮到這種情況懂傀,用戶可以增加 Checkpoint 保存數(shù)。
  4. 建議設(shè)置的 Checkpoint 的間隔時(shí)間最好大于 Checkpoint 的完成時(shí)間蜡感。

下圖是不設(shè)置 Checkpoint 最小時(shí)間間隔示例圖蹬蚁,可以看到恃泪,系統(tǒng)一致在進(jìn)行 Checkpoint,大量的資源使用在 Flink Chekpoint 上犀斋,可能對運(yùn)行的任務(wù)產(chǎn)生一定影響:

還有一種特殊的情況贝乎,F(xiàn)link 端到端 Sink 的 EXACTLYONCE 的問題,也就是數(shù)據(jù)從 Flink 端到外部消息系統(tǒng)的消息一致性叽粹。打個(gè)比方览效,F(xiàn)link 輸出數(shù)據(jù)到 Kafka 消息系統(tǒng)中,如果使用 Kafka 0.10 的版本虫几,F(xiàn)link 不支持端到端的 EXACTLYONCE锤灿,可能存在消息重復(fù)輸入到 Kafka。

如上圖所示持钉,當(dāng)做 chk-5 Checkpoint 的時(shí)候衡招,chk-5 失敗,然后從 chk-4 來進(jìn)行恢復(fù)每强,但是 chk-5 的部分?jǐn)?shù)據(jù)在 Chekpoint 失敗之前就有部分進(jìn)入到 Kafka 消息系統(tǒng),再次恢復(fù)時(shí)州刽,該部分?jǐn)?shù)據(jù)可能再次重放到 Kafka 消息系統(tǒng)中空执。

Flink 中解決端到端的一致性有兩種方法:做冪等以及事務(wù)寫,冪等的話穗椅,可以使用 KV 存儲系統(tǒng)來做冪等辨绊,因?yàn)?KV 存儲系統(tǒng)的多次操作結(jié)果都是相同的。Flink 內(nèi)部目前支持二階段事務(wù)提交匹表,Kafka 0.11 以上版本支持事務(wù)寫门坷,所以支持 Flink 端到 Kafka 端的 EXACTLY_ONCE。

四袍镀、 優(yōu)化實(shí)踐

實(shí)時(shí)計(jì)算對于 Flink 任務(wù)的 Checkpoint 和 Savepoint 做了兩個(gè)方面工作默蚌,第一個(gè)工作是對于 Flink Checkpoint 失敗的情況,如果 Checkpoint 失敗過于頻繁苇羡,同時(shí) Flink Checkpoint 失敗次數(shù)如果達(dá)到平臺默認(rèn)的失敗閾值绸吸,平臺會及時(shí)給用戶報(bào)警提示。我們會每 5 分鐘檢查一次實(shí)時(shí)任務(wù)设江,統(tǒng)計(jì)實(shí)時(shí)任務(wù)近 15 分鐘內(nèi)锦茁,F(xiàn)link Checkpoint 失敗次數(shù)的最大值和最小值的差值達(dá)到平臺默認(rèn)的閾值,則會立馬給用戶報(bào)警叉存,讓用戶能夠及時(shí)的處理問題码俩。

當(dāng)然,并不是所有的 Flink 實(shí)時(shí)任務(wù) Checkpoint 失敗平臺都能發(fā)現(xiàn)歼捏,因?yàn)?Checkpoint 失敗次數(shù)的檢查稿存,首先與用戶配置的 Checkpoint 的時(shí)間間隔有關(guān)笨篷。舉個(gè)例子,如果用戶配置的 Checkpoint 間隔為 1 小時(shí)挠铲,其實(shí)平臺默認(rèn) Checkpoint 邏輯檢查根本就無法發(fā)現(xiàn)實(shí)時(shí)任務(wù) Checkpoint 失敗冕屯。

針對這種情況,實(shí)時(shí)平臺也支持用戶自定義設(shè)置 Checkpoint 失敗閾值拂苹,目前支持兩種 Checkpoint 失敗邏輯檢查安聘,一個(gè)是 實(shí)時(shí)任務(wù)的 Checkpoint 失敗次數(shù)的總和達(dá)到閾值,另一個(gè)則是近 10 分鐘內(nèi)瓢棒,F(xiàn)link Checkpoint 次數(shù)的最大值和最小值的差值的計(jì)算邏輯浴韭,用戶可以根據(jù)實(shí)時(shí)任務(wù)的敏感度,設(shè)置具體的參數(shù)脯宿。

第二個(gè)方面則是針對 Flink 任務(wù)的狀態(tài)恢復(fù)念颈,為了防止實(shí)時(shí)任務(wù)的狀態(tài)丟失,實(shí)時(shí)計(jì)算平臺會定期的對實(shí)時(shí)任務(wù)進(jìn)行 Savepoint 觸發(fā)连霉,當(dāng)任務(wù)由于外界因素導(dǎo)致任務(wù)失敗時(shí)榴芳,這種失敗是任務(wù)直接掛掉,Yarn 任務(wù)的狀態(tài)直接為 Killed跺撼,這種情況下窟感,如果用戶開啟自動拉起功能,實(shí)時(shí)平臺自動拉起實(shí)時(shí)任務(wù)歉井,同時(shí)從最新的 Savepoint 進(jìn)行狀態(tài)恢復(fù)柿祈,以至于狀態(tài)不丟失。同時(shí)哩至,實(shí)時(shí)計(jì)算平臺也支持用戶停止任務(wù)時(shí)躏嚎,觸發(fā) Savepoint,再次重啟實(shí)時(shí)任務(wù)時(shí)菩貌,還是從停止時(shí)的任務(wù)狀態(tài)進(jìn)行恢復(fù)卢佣。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市菜谣,隨后出現(xiàn)的幾起案子珠漂,更是在濱河造成了極大的恐慌,老刑警劉巖尾膊,帶你破解...
    沈念sama閱讀 222,464評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件媳危,死亡現(xiàn)場離奇詭異,居然都是意外死亡冈敛,警方通過查閱死者的電腦和手機(jī)待笑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,033評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來抓谴,“玉大人暮蹂,你說我怎么就攤上這事寞缝。” “怎么了仰泻?”我有些...
    開封第一講書人閱讀 169,078評論 0 362
  • 文/不壞的土叔 我叫張陵荆陆,是天一觀的道長。 經(jīng)常有香客問我集侯,道長被啼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,979評論 1 299
  • 正文 為了忘掉前任棠枉,我火速辦了婚禮浓体,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘辈讶。我一直安慰自己命浴,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,001評論 6 398
  • 文/花漫 我一把揭開白布贱除。 她就那樣靜靜地躺著生闲,像睡著了一般。 火紅的嫁衣襯著肌膚如雪月幌。 梳的紋絲不亂的頭發(fā)上跪腹,一...
    開封第一講書人閱讀 52,584評論 1 312
  • 那天,我揣著相機(jī)與錄音飞醉,去河邊找鬼。 笑死屯阀,一個(gè)胖子當(dāng)著我的面吹牛缅帘,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播难衰,決...
    沈念sama閱讀 41,085評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼钦无,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了盖袭?” 一聲冷哼從身側(cè)響起失暂,我...
    開封第一講書人閱讀 40,023評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鳄虱,沒想到半個(gè)月后弟塞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,555評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡拙已,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,626評論 3 342
  • 正文 我和宋清朗相戀三年决记,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片倍踪。...
    茶點(diǎn)故事閱讀 40,769評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡系宫,死狀恐怖索昂,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情扩借,我是刑警寧澤椒惨,帶...
    沈念sama閱讀 36,439評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站潮罪,受9級特大地震影響康谆,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜错洁,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,115評論 3 335
  • 文/蒙蒙 一秉宿、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧屯碴,春花似錦描睦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,601評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至今艺,卻和暖如春韵丑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背虚缎。 一陣腳步聲響...
    開封第一講書人閱讀 33,702評論 1 274
  • 我被黑心中介騙來泰國打工撵彻, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人实牡。 一個(gè)月前我還...
    沈念sama閱讀 49,191評論 3 378
  • 正文 我出身青樓陌僵,卻偏偏與公主長得像,于是被迫代替她去往敵國和親创坞。 傳聞我的和親對象是個(gè)殘疾皇子碗短,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,781評論 2 361