Flink狀態(tài)管理與Checkpoint實戰(zhàn)——模擬電商訂單計算過程中宕機的場景私杜,探索宕機恢復時如何精準繼續(xù)計算訂單

Flink的狀態(tài)與容錯是這個框架很核心的知識點奠涌。其中一致檢查點也就是Checkpoints也是Flink故障恢復機制的核心固灵,這篇文章將詳細介紹Flink的狀態(tài)管理和Checkpoints的概念以及在生產(chǎn)環(huán)境中的參數(shù)設(shè)置。

什么是State狀態(tài)轴术?

  • 在使用Flink進行窗口聚合統(tǒng)計难衰,排序等操作的時候,數(shù)據(jù)流的處理離不開狀態(tài)管理

  • 是一個Operator的運行的狀態(tài)/歷史值逗栽,在內(nèi)存中進行維護

  • 流程:一個算子的子任務(wù)接收輸入流盖袭,獲取對應(yīng)的狀態(tài),計算新的結(jié)果祭陷,然后把結(jié)果更新到狀態(tài)里面

在這里插入圖片描述

有狀態(tài)和無狀態(tài)介紹

  • 無狀態(tài)計算: 同個數(shù)據(jù)進到算子里面多少次苍凛,都是一樣的輸出,比如 filter

  • 有狀態(tài)計算:需要考慮歷史狀態(tài)兵志,同個輸入會有不同的輸出醇蝴,比如sum、reduce聚合操作

  • 狀態(tài)管理分類

    • ManagedState(用的多)

      • Flink管理想罕,自動存儲恢復

      • 細分兩類

        • Keyed State 鍵控狀態(tài)(用的多)

          • 有KeyBy才用這個悠栓,僅限用在KeyStream中,每個key都有state 按价,是基于KeyedStream上的狀態(tài)

          • 一般是用richFlatFunction,或者其他richfunction里面惭适,在open()聲明周期里面進行初始化

          • ValueState、ListState楼镐、MapState等數(shù)據(jù)結(jié)構(gòu)

        • Operator State 算子狀態(tài)(用的少,部分source會用)

          • ListState癞志、UnionListState、BroadcastState等數(shù)據(jù)結(jié)構(gòu)
    • RawState(用的少)

      • 用戶自己管理和維護

      • 存儲結(jié)構(gòu):二進制數(shù)組

  • State數(shù)據(jù)結(jié)構(gòu)(狀態(tài)值可能存在內(nèi)存框产、磁盤凄杯、DB或者其他分布式存儲中)

    • ValueState 簡單的存儲一個值(ThreadLocal / String)

      • ValueState.value()

      • ValueState.update(T value)

    • ListState 列表

      • ListState.add(T value)

      • ListState.get() //得到一個Iterator

    • MapState 映射類型

      • MapState.get(key)

      • MapState.put(key, value)

State狀態(tài)后端:存儲在哪里

  • Flink 內(nèi)置了以下這些開箱即用的 state backends :

    • (新版)HashMapStateBackend、EmbeddedRocksDBStateBackend

      • 如果沒有其他配置秉宿,系統(tǒng)將使用 HashMapStateBackend戒突。
    • (舊版)MemoryStateBackend、FsStateBackend描睦、RocksDBStateBackend

      • 如果不設(shè)置膊存,默認使用 MemoryStateBackend。
  • 狀態(tài)詳解

    • HashMapStateBackend 保存數(shù)據(jù)在內(nèi)部作為Java堆的對象忱叭。

      • 鍵/值狀態(tài)和窗口操作符持有哈希表,用于存儲值韵丑、觸發(fā)器等

      • 非常快埂息,因為每個狀態(tài)訪問和更新都對 Java 堆上的對象進行操作

      • 但是狀態(tài)大小受集群內(nèi)可用內(nèi)存的限制

      • 場景:

        • 具有大狀態(tài)、長窗口千康、大鍵/值狀態(tài)的作業(yè)享幽。

        • 所有高可用性設(shè)置拾弃。

    • EmbeddedRocksDBStateBackend 在RocksDB數(shù)據(jù)庫中保存狀態(tài)數(shù)據(jù)

      • 該數(shù)據(jù)庫(默認)存儲在 TaskManager 本地數(shù)據(jù)目錄中

      • 與HashMapStateBackend在java存儲 對象不同,數(shù)據(jù)存儲為序列化的字節(jié)數(shù)組

      • RocksDB可以根據(jù)可用磁盤空間進行擴展豪椿,并且是唯一支持增量快照的狀態(tài)后端奔坟。

      • 但是每個狀態(tài)訪問和更新都需要(反)序列化并可能從磁盤讀取,這導致平均性能比內(nèi)存狀態(tài)后端慢一個數(shù)量級

      • 場景

        • 具有非常大狀態(tài)咳秉、長窗口鸯隅、大鍵/值狀態(tài)的作業(yè)。

        • 所有高可用性設(shè)置

    • 舊版

MemoryStateBackend(內(nèi)存炕舵,不推薦在生產(chǎn)場景使用)
FsStateBackend(文件系統(tǒng)上跟畅,本地文件系統(tǒng)、HDFS, 性能更好,常用)
RocksDBStateBackend (無需擔心 OOM 風險徊件,是大部分時候的選擇)

代碼配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
//或者
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

什么是Checkpoint檢查點

  • Flink中所有的Operator的當前State的全局快照

  • 默認情況下 checkpoint 是禁用的

  • Checkpoint是把State數(shù)據(jù)定時持久化存儲庇忌,防止丟失

  • 手工調(diào)用checkpoint,叫 savepoint疏橄,主要是用于flink集群維護升級等

  • 底層使用了Chandy-Lamport 分布式快照算法,保證數(shù)據(jù)在分布式環(huán)境下的一致性

  • 有狀態(tài)流應(yīng)用的一致檢查點略就,其實就是所有任務(wù)的狀態(tài),在某個時間點的一份 拷貝(一份快照)窄绒;這個時間點崔兴,應(yīng)該是所有任務(wù)都恰好處理完一個相同的輸入數(shù)據(jù)的時候

Flink 捆綁的些檢查點存儲類型:

  • 作業(yè)管理器檢查點存儲 JobManagerCheckpointStorage

  • 文件系統(tǒng)檢查點存儲 FileSystemCheckpointStorage

端到端(end-to-end)狀態(tài)一致性

數(shù)據(jù)一致性保證都是由流處理器實現(xiàn)的蛔翅,也就是說都是在Flink流處理器內(nèi)部保證的

在真實應(yīng)用中山析,了流處理器以外還包含了數(shù)據(jù)源(例如Kafka掏父、Mysql)和輸出到持久化系統(tǒng)(Kafka、Mysql爵政、Hbase、CK)

端到端的一致性保證陶缺,是意味著結(jié)果的正確性貫穿了整個流處理應(yīng)用的各個環(huán)節(jié),每一個組件都要保證自己的一致性等龙。
  • Source

    • 需要外部數(shù)據(jù)源可以重置讀取位置伶贰,當發(fā)生故障的時候重置偏移量到故障之前的位置
  • 內(nèi)部

    • 依賴Checkpoints機制黍衙,在發(fā)生故障的時可以恢復各個環(huán)節(jié)的數(shù)據(jù)
  • Sink:

    • 當故障恢復時,數(shù)據(jù)不會重復寫入外部系統(tǒng)位仁,常見的就是 冪等和事務(wù)寫入(和checkpoint配合)

有關(guān)檢查點配置的常用參數(shù)配置介紹

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設(shè)置checkpoint的周期, 每隔1000 ms進行啟動一個檢查點
env.getCheckpointConfig().setCheckpointInterval(1000);
// 設(shè)置狀態(tài)級別模式為exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//超時時間方椎,可能是保存太耗費時間或者是狀態(tài)后端的問題,任務(wù)同步執(zhí)行不能一直阻塞
env.getCheckpointConfig().setCheckpointTimeout(60000L);  
// 設(shè)置取消和故障時是否保留Checkpoint數(shù)據(jù)琳疏,這個設(shè)置較為重要闸拿,沒有正確的選擇好可能會導致檢查點數(shù)據(jù)失效
//有兩個參數(shù)可以設(shè)置
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作業(yè)時保留檢查點新荤。必須在取消后手動清理檢查點狀態(tài)。
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作業(yè)時刪除檢查點篱瞎。只有在作業(yè)失敗時,檢查點狀態(tài)才可用俐筋。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

實戰(zhàn)部分:

為了模擬生產(chǎn)環(huán)境中實時產(chǎn)生的訂單數(shù)據(jù)校哎,這里我們自己定義一個數(shù)據(jù)源來源源不斷的產(chǎn)生模擬訂單數(shù)據(jù)

訂單類:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
    private String tradeNo;
    private String title;
    private int money;
    private int userId;
    private Date createTime;

    @Override
    public String toString() {
        return "VideoOrder{" +
                "tradeNo='" + tradeNo + '\'' +
                ", title='" + title + '\'' +
                ", money=" + money +
                ", userId=" + userId +
                ", createTime=" + createTime +
                '}';
    }
}

public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> {

    private volatile Boolean flag = true;

    private Random random = new Random();

    private static List<VideoOrder> list = new ArrayList<>();
    static {
        list.add(new VideoOrder("","java",10,0,null));
        list.add(new VideoOrder("","spring boot",15,0,null));
    }


    /**
     * run 方法調(diào)用前 用于初始化連接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }

    /**
     * 用于清理之前
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }


    /**
     * 產(chǎn)生數(shù)據(jù)的邏輯
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {

        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString().substring(30);
            int userId = random.nextInt(10);
            int videoNum = random.nextInt(list.size());
            VideoOrder videoOrder = list.get(videoNum);
            videoOrder.setUserId(userId);
            videoOrder.setCreateTime(new Date());
            videoOrder.setTradeNo(id);
            System.out.println("產(chǎn)生:"+videoOrder.getTitle()+"闷哆,價格:"+videoOrder.getMoney()+", 時間:"+ TimeUtil.format(videoOrder.getCreateTime()));
            ctx.collect(videoOrder);
        }
    }

    /**
     * 控制任務(wù)取消
     */
    @Override
    public void cancel() {
        flag = false;
    }
}

產(chǎn)生數(shù)據(jù)的格式如下:

在這里插入圖片描述

主程序:使用reduce算子對數(shù)據(jù)進訂單價格進行滾動計算抱怔,并設(shè)置Checkpoint保證數(shù)據(jù)狀態(tài)可以存取

public class FlinkKeyByReduceApp {

    /**
     * source
     * transformation
     * sink
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {

        //構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.enableCheckpointing(5000);
 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //這是我本機的ip地址       
        env.getCheckpointConfig().setCheckpointStorage(new                                           FileSystemCheckpointStorage("hdfs://192.168.192.100:8020/checkpoint"));

       DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
        KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
            @Override
            public String getKey(VideoOrder value) throws Exception {
                return value.getTitle();
            }
        });

        SingleOutputStreamOperator<VideoOrder> reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() {
            @Override
            public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception {
                VideoOrder videoOrder = new VideoOrder();
                videoOrder.setTitle(value1.getTitle());
                videoOrder.setMoney(value1.getMoney() + value2.getMoney());
                return videoOrder;
            }
        });

        reduce.print();

        env.execute("job");
    }

}

在本地測試運行結(jié)果屈留,可以看到數(shù)據(jù)根據(jù)訂單分組不斷的進行滾動計算

在這里插入圖片描述

進入服務(wù)器的HDFS查看檢查點數(shù)據(jù)是否存在

在這里插入圖片描述

之后將應(yīng)用進行打包灌危,上傳到服務(wù)器進行測試碳胳,可以使用Flink的Web頁面進行手動提交jar包運行,也可以使用命令進行提交味混,之后可以看到程序運行過程中的相關(guān)日志輸出

在這里插入圖片描述
./bin/flink run -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /xiaochan-flink.jar 

模擬宕機

運行程序的時候我們可以在Flink看到任務(wù)進行的id號翁锡,這個時候我們手動的cancel掉或者是直接把服務(wù)kill掉夕土,這個時候任務(wù)被強制暫停。

進入到HDFS可以看到我們設(shè)置的檢查點的數(shù)據(jù)依舊存在哈踱,我們使用如下命令梨熙,讓程序從上次宕機前的訂單計算狀態(tài)繼續(xù)往下計算。

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="" cid="n204" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; color: rgb(51, 51, 51); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">-s : 指定檢查點的元數(shù)據(jù)的位置邪财,這個位置記錄著宕機前程序的計算狀態(tài)
./bin/flink run -s /checkpoint/id號/chk-23/_metadata -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /root/xdclass-flink.jar </pre>

在這里插入圖片描述

運行命令树埠,進入WEB頁面進行查看,是否成功又碌。

在這里插入圖片描述

可以看到出現(xiàn)一次close的時候绊袋,代表我們的程序以及停止,服務(wù)器已經(jīng)宕機皂岔,這個時候訂單的計算結(jié)果如上圖的紅色方框展姐。在我們運行了上面那條命令后再次查看日志的數(shù)據(jù),從open開始可以看到這次就不是從訂單最初的狀態(tài)開始進行的了教馆,而是從上一次宕機前計算的結(jié)果墅拭,繼續(xù)往下計算,到這里Checkponit的實戰(zhàn)應(yīng)用測試就完成了舒憾。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末穗熬,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子探遵,更是在濱河造成了極大的恐慌妓柜,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件藏雏,死亡現(xiàn)場離奇詭異作煌,居然都是意外死亡,警方通過查閱死者的電腦和手機奏寨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門病瞳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人套菜,你說我怎么就攤上這事笼踩⊥鱿樱” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵于购,是天一觀的道長知染。 經(jīng)常有香客問我,道長嫌吠,這世上最難降的妖魔是什么掺炭? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮炕矮,結(jié)果婚禮上者冤,老公的妹妹穿的比我還像新娘。我一直安慰自己邢滑,他們只是感情好拜银,可當我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著操灿,像睡著了一般。 火紅的嫁衣襯著肌膚如雪庶喜。 梳的紋絲不亂的頭發(fā)上救鲤,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天本缠,我揣著相機與錄音,去河邊找鬼丹锹。 笑死楣黍,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的租漂。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼秃踩,長吁一口氣:“原來是場噩夢啊……” “哼业筏!你這毒婦竟也來了驾孔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤妖啥,失蹤者是張志新(化名)和其女友劉穎对碌,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體怀读,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年苍糠,在試婚紗的時候發(fā)現(xiàn)自己被綠了啤誊。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖牡昆,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情钻心,我是刑警寧澤铅协,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布摊沉,位于F島的核電站,受9級特大地震影響骏全,放射性物質(zhì)發(fā)生泄漏尼斧。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一楼咳、第九天 我趴在偏房一處隱蔽的房頂上張望烛恤。 院中可真熱鬧,春花似錦苹熏、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至蔬崩,卻和暖如春搀暑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背桐罕。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工桂敛, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人薪伏。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓粗仓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親塘淑。 傳聞我的和親對象是個殘疾皇子蚂斤,可洞房花燭夜當晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容