看了 BucketSink 的相關(guān)源碼匙头。著重看了它的checkpoint以及故障恢復(fù)機(jī)制瑟慈。
把大概的理解梳理如下:
BucketSink 大體的工作流程:
1.新建一個(gè)文件桃移,不斷的寫入文件中,后綴命名為 .in-progress
2.判斷文件寫入完畢封豪,關(guān)閉該文件時(shí)谴轮,后綴名命名為 .pending
3.checkpoint觸發(fā)時(shí),將上次ck到這次ck間的所有 .pending 文件變?yōu)?finish 狀態(tài)
BucketSink 實(shí)現(xiàn)了 CheckpointedFunction 接口
有兩個(gè)方法
void snapshotState(FunctionSnapshotContext context) throws Exception;
與
void initializeState(FunctionInitializationContext context) throws Exception;?其中:
initializeState 方法在每次新建 BucketSink 或者故障恢復(fù)時(shí) 會(huì)調(diào)用吹埠。
snapshotState 在每次觸發(fā) ck 時(shí)會(huì)被調(diào)用第步。
下面簡(jiǎn)單分析下這兩個(gè)方法的邏輯:
initializeState 方法主要執(zhí)行一些初始化操作,其中我認(rèn)為關(guān)鍵的在于
restoredBucketStates = stateStore.getSerializableListState("bucket-states");
該方法獲取一個(gè)叫做 bucket-states 的狀態(tài)對(duì)象缘琅,從名稱也可知粘都,該對(duì)象用于重啟。正常情況下刷袍,該對(duì)象無內(nèi)容下面的for語句不會(huì)執(zhí)行翩隧。但是若有故障重啟的情況,則會(huì)從上次的ck中讀取出內(nèi)容呻纹,也就是上次ck的狀態(tài)信息堆生,然后執(zhí)行回滾操作保證數(shù)據(jù)的一致性。這一點(diǎn)最后再做介紹雷酪。
snapshotState 方法用于觸發(fā) ck 操作淑仆。
這個(gè)方法做了如下幾件事
1.獲取當(dāng)前正在寫的 .pending 文件的大小,以便若下次 ck 前發(fā)生故障哥力,可以獲知本次ck時(shí)蔗怠,該文件的大小,以便刪除本次ck后到故障發(fā)生時(shí)寫入的數(shù)據(jù)吩跋,或者顯示該文件的有效數(shù)據(jù)大小寞射。
2.將所有 .pending 狀態(tài)的文件存儲(chǔ)到list中,稍后ck結(jié)束后锌钮,方便修改其狀態(tài)為 finish
3.將當(dāng)前狀態(tài)存入 restoredBucketStates 對(duì)象桥温,以便若下次 ck 前發(fā)生故障,可以從這個(gè)狀態(tài)處進(jìn)行恢復(fù)轧粟。
同時(shí)策治,BucketSink也實(shí)現(xiàn)了 CheckpointListener 接口
void notifyCheckpointComplete(long checkpointId) throws Exception;
該方法會(huì)在 ck 完成后調(diào)用脓魏。
該方法,將 .pending 文件的狀態(tài)轉(zhuǎn)為 final 狀態(tài)
并且移除writer已經(jīng)處于close狀態(tài)的bucket通惫。
最后詳細(xì)說一下故障恢復(fù)茂翔。
當(dāng)程序因故障自動(dòng)恢復(fù)時(shí),initializeState 方法的 restoredBucketStates 就會(huì)從上次 ck 中獲取到上次ck時(shí)的狀態(tài)履腋。進(jìn)而進(jìn)行恢復(fù)珊燎。
首先,將 .pending 狀態(tài)的文件名列表清空即可遵湖,因?yàn)閷?.pending 狀態(tài)轉(zhuǎn)為 finish 狀態(tài)悔政,可以在 notifyCheckpointComplete 方法中完成。故障恢復(fù)時(shí)延旧,該方法對(duì) .pending 的文件的做法是不做處理谋国,等待故障恢復(fù)之后,第一次ck觸發(fā)時(shí)迁沫,便會(huì)自動(dòng)的將 .pending 的文件變?yōu)?finish 狀態(tài)芦瘾。
而之所以不處理 .pending 狀態(tài)文件,是因?yàn)?.pending 狀態(tài)文件說明該文件已經(jīng)寫入完畢集畅,就差ck成功后修改文件狀態(tài)(也就是文件名)而已近弟,本質(zhì)上,該文件已經(jīng)不再寫入數(shù)據(jù)挺智,沒有數(shù)據(jù)的變化祷愉。
接下來 handlePendingInProgressFile 就是處理 .in-progress 狀態(tài)的文件。
我們?cè)O(shè)想一下赦颇,故障重啟是指在上次成功的ck之后二鳄,下次ck之前,發(fā)生了故障媒怯,然后應(yīng)用自動(dòng)重啟泥从,使用的是上次成功的ck的狀態(tài)信息。
這樣的話沪摄,上次 ck 時(shí)狀態(tài)為 .in-progress 的文件,可能在故障發(fā)生時(shí)纱烘,已經(jīng)處于 .pending 狀態(tài)杨拐,也就是寫完的狀態(tài),也可能仍然處于 .in-progress 狀態(tài)擂啥。
flink的做法是哄陶,不管處于什么狀態(tài) 首先全部標(biāo)注為 finish 狀態(tài)。然后根據(jù)上次ck時(shí)狀態(tài)中存儲(chǔ)的文件的大小進(jìn)行截?cái)嗖负@樣屋吨,該文件就能回滾到上次ck成功時(shí)的狀態(tài)蜒谤。若 Hadoop 版本不支持截?cái)嗖僮鳎瑒t新建一個(gè)后綴為 .valid-length 的文件至扰,內(nèi)容為文件的大小鳍徽,單位 byte。
然后flink就可以從上次ck處重新拉取數(shù)據(jù)源敢课,繼續(xù)處理阶祭,寫入sink。
最后直秆,調(diào)用 handlePendingFilesForPreviousCheckpoints 將上次ck成功后濒募,若故障發(fā)生的很快,沒來得及調(diào)用 CheckpointListener 的 notifyCheckpointComplete 方法圾结,則此處將文件狀態(tài)置為 finish 瑰剃。
BucketSink 是一個(gè)控制類,具體的寫入操作可以自己實(shí)現(xiàn) org.apache.flink.streaming.connectors.fs.Writer 接口筝野。
其中 snappy 等壓縮文件的追加晌姚,可以使用
Fs.append 的方式追加內(nèi)容到同一文件中