Flink源碼閱讀(四)--- checkpoint制作

本文內(nèi)容是基于Flink 1.9來講解纱意。Flink使用checkpoint檢查點來保證Exactly-Once語義的琅绅,這篇文章會從源碼角度分析下checkpoint怎么觸發(fā)的厦幅。首先說下checkpoint原理契讲,可以直接參考數(shù)據(jù)流容錯官方文檔唬党。

概述

Flink分布式快照的一個核心概念就是barrier,barrier會和數(shù)據(jù)記錄一起下發(fā)到stream中橙垢,是一種特殊的消息,非常輕量伦糯。一個barrier就能把數(shù)據(jù)記錄分到不同的snapshot中柜某,不同snapshot對應(yīng)的barrier可能同時在stream中嗽元,這也就意味著可以同時制作多個snapshot。
下面這兩個圖給出了barrier分割snapshot的原理喂击,以及checkpoint制作過程


barrier.png
checkpoint.png

checkpoint制作

1. 源碼入口 ExecutionGraphBuilder#buildGraph

在生成ExecutionGraph的時候剂癌,如果設(shè)置了開啟checkpoint,那在buildGraph方法中會調(diào)用executionGraph.enableCheckpointing方法翰绊,咱們接著看下這個方法做了什么珍手。

  • 構(gòu)建CheckpointCoordinator對象。CheckpointCoordinator主要負責(zé)觸發(fā)checkpoint制作并接收task的ack信息辞做,收集并維護task發(fā)送的ack state全局視圖琳要。
  • 設(shè)置checkpoint狀態(tài)追蹤器
  • 創(chuàng)建CheckpointCoordinatorDeActivator對象,該對象會監(jiān)控JobStatus是activates/deactivates來啟動/停止checkpoint scheduler

2. CheckpointCoordinatorDeActivator是怎么觸發(fā)checkpoint制作的

2.1 看下CheckpointCoordinatorDeActivator#jobStatusChanges方法
    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }

如果監(jiān)控到JobStatus變成RUNNING秤茅,就會調(diào)用CheckpointCoordinator的startCheckpointScheduler方法稚补,最終會調(diào)用CheckpointCoordinator#triggerCheckpoint方法,triggerCheckpoint主要做了以下幾件事情

  1. 制作checkpoint之前框喳,會做一些檢查
    ?? - 排隊checkpoint數(shù)是否超過設(shè)定的允許最大并行checkpoint數(shù)
    ?? - 兩次checkpoint制作間隔是否滿足要求
  2. check需要觸發(fā)checkpoint所有source task是不是都是running狀態(tài)
  3. check需要對checkpoint進行ack的所有task(包括source+非source)是不是都是running狀態(tài)
  4. 對checkpointID加1
  5. 初始化CheckpointStorageLocation
  6. 往pendingCheckpoints這個map中新增一個PendingCheckpoint课幕, key是checkpointID。 PendingCheckpoint表示checkpoint制作已經(jīng)開始五垮,不過部分task還沒有進行ack乍惊,一旦所有的task都對這次checkpoint進行了ack,PendingCheckpoint就變成了CompletedCheckpoint放仗。
  7. 注冊cancellerHandle润绎,比如checkpoint超時就cancel掉
  8. 對于所有的source task,調(diào)用execution.triggerCheckpoint方法

從全局看了checkpoint怎么觸發(fā)的诞挨,然后再看下具體Task粒度怎么做checkpoint的莉撇,入口就是上面提到的execution.triggerCheckpoint,真正調(diào)用是Task#triggerCheckpointBarrier方法

2.2 SourceTask怎么觸發(fā)checkpoint的

Task#triggerCheckpointBarrier方法會再去調(diào)用StreamTask#performCheckpoint方法惶傻,接著看StreamTask#performCheckpoint方法源碼

            if (isRunning) {

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    syncSavepointLatch.setCheckpointId(checkpointId);

                    if (advanceToEndOfTime) {
                        advanceToEndOfEventTime();
                    }
                }

                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                //           The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointId);

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                        checkpointId,
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                //           impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

                return true;
            }

從源碼可以看出棍郎,checkpoint的制作主流程是異步來執(zhí)行的,主要分為

  1. checkpoint制作前的準(zhǔn)備银室,允許operators可以做一些前置工作涂佃,這個前置工作最好是沒有,也可以有很小一部分
  2. 往下游發(fā)barrier(向該Source Task的所有輸出蜈敢,以廣播的方式發(fā)送barrier辜荠,CheckpointBarrier有三個對象組成checkpointId+timestamp+CheckpointOptions(CheckpointType和CheckpointStorageLocationReference))
  3. 開始做checkpoint,這個步驟應(yīng)該盡可能的異步扶认,以免影響job執(zhí)行
2.3 SourceTask怎么制作checkpoint的

checkpoint制作跟進去看下StreamTask#executeCheckpointing代碼

            startSyncPartNano = System.nanoTime();

            try {
                // 1. 同步執(zhí)行侨拦,會依次調(diào)用每一個operator的StreamOperator.snapshotState,返回結(jié)果是一個runnable future辐宾。
                // 根據(jù)checkpoint配置成同步模式和異步模式的區(qū)別狱从,這個future可能處于完成狀態(tài)膨蛮,也可能處于未完成狀態(tài)
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                        checkpointMetaData.getCheckpointId(), owner.getName());
                }

                startAsyncPartNano = System.nanoTime();

                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

                // 2. 異步執(zhí)行,如果是checkpoint配置成同步執(zhí)行季研,這里實際上所有的runnable future都是已經(jīng)完成的狀態(tài)
                // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                    owner,
                    operatorSnapshotsInProgress,
                    checkpointMetaData,
                    checkpointMetrics,
                    startAsyncPartNano);

                owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }
            }

其實checkpoint制作分為兩部分

  1. 同步checkpoint
    依次調(diào)用每一個operator的StreamOperator.snapshotState敞葛,返回一個OperatorSnapshotFutures,并將其放入operatorSnapshotsInProgress与涡。各operator的snapshotState方法的實現(xiàn)是一樣的惹谐,可以看下AbstractStreamOperator#snapshotState,這個方法首先創(chuàng)建了一個OperatorSnapshotFutures驼卖,并且最終會把OperatorSnapshotFutures return出去氨肌,看下OperatorSnapshotFutures的成員變量,來大體了解下snapshotState主要是存什么內(nèi)容
    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;

從成員變量就可以看出酌畜,snapshot中主要存儲四部分內(nèi)容怎囚,分別為keyed manage state, keyed raw state, operator manage state 和 operator raw state。

然后具體看下DefaultOperatorStateBackendSnapshotStrategy#snapshot中都做了什么桥胞?

  • 把注冊的所有OperatorState(就是ListState) 和 BroadcastState 做深度拷貝
  • 將實際的寫入操作封裝在一個異步的 FutureTask中恳守,如果不啟用異步checkpoint模式,那么這個 FutureTask 在同步階段就會立刻執(zhí)行贩虾。這個 FutureTask 的主要工作包括:
    ?? 1. 獲取注冊的operator state 以及 broadcast operator state 信息
    ?? 2. 把上面的state寫入checkpoint中
    ?? 3. 把狀態(tài)元信息寫入writtenStatesMetaData這個map中
    ?? 4. 關(guān)閉輸出流催烘,返回狀態(tài)句柄OperatorStreamStateHandle
  1. 異步checkpoint
    同步checkpoint完成之后,會將具體的信息寫入異步實現(xiàn)缎罢,該流程可以看下AsyncCheckpointRunnable#run方法
  • 調(diào)用new OperatorSnapshotFinalizer(snapshotInProgress);方法等待各future執(zhí)行完成
  • 調(diào)用reportCompletedSnapshotStates方法完成checkpoint狀態(tài)上報JobManager伊群,也就是對checkpoint進行ack
2.4 非SourceTask是怎么觸發(fā)checkpoint的

對于SourceTask,checkpint的執(zhí)行是由JM觸發(fā)屁使,對于非SourceTask在岂,則是依靠上游Task的checkpointBarrier消息觸發(fā)。上面也提到了蛮寂,在SourceTask進行checkpoint時,會向下游發(fā)送CheckpointBarrier消息易茬,而下游的task正是拿到該消息后酬蹋,進行checkpoint操作。

我們知道所有的Task都是繼承StreamTask抽莱,在Task執(zhí)行的時候范抓,會去調(diào)用StreamTask#run方法,下面看下run方法邏輯

    /**
     * Runs the stream-tasks main processing loop.
     */
    private void run() throws Exception {
        final ActionContext actionContext = new ActionContext();
        while (true) {
            if (mailbox.hasMail()) {
                Optional<Runnable> maybeLetter;
                while ((maybeLetter = mailbox.tryTakeMail()).isPresent()) {
                    Runnable letter = maybeLetter.get();
                    if (letter == POISON_LETTER) {
                        return;
                    }
                    letter.run();
                }
            }

            processInput(actionContext);
        }
    }

processInput是用來處理數(shù)據(jù)的食铐,StreamInputProcessor是處理數(shù)據(jù)記錄的接口匕垫,它有三個實現(xiàn)類,分別為StreamOneInputProcessor虐呻,StreamTwoInputProcessor和StreamTwoInputSelectableProcessor象泵。我們以StreamOneInputProcessor#processInput進行分析怎么處理barrier消息的

    @Override
    public boolean processInput() throws Exception {
        initializeNumRecordsIn();

        StreamElement recordOrMark = input.pollNextNullable();
        if (recordOrMark == null) {
            input.isAvailable().get();
            return !checkFinished();
        }
        int channel = input.getLastChannel();
        checkState(channel != StreamTaskInput.UNSPECIFIED);

        processElement(recordOrMark, channel);
        return true;
    }

關(guān)于barrier的調(diào)用鏈?zhǔn)牵篿nput.pollNextNullable() --> StreamTaskNetworkInput#pollNextNullable --> checkpointedInputGate.pollNext() --> barrierHandler.processBarrier寞秃,CheckpointBarrierHandler有兩個實現(xiàn)類

  • CheckpointBarrierAligner,對應(yīng)EXACTLY_ONCE語義
  • CheckpointBarrierTracker偶惠,對應(yīng)AT_LEAST_ONCE語義

接下來看下CheckpointBarrierAligner#processBarrier方法春寿,涉及到以下處理流程

  1. 如果下游Task的InputChannels為1(消費的partition數(shù)為1),直接不需要對齊忽孽,return
  2. 第一個barrier到達Task绑改,調(diào)用beginNewAlignment方法開始checkpoint,然后調(diào)用onBarrier方法兄一,將numBarriersReceived加1厘线,把對應(yīng)的inputchannel的狀態(tài)blockedChannels[channelIndex]置為true,開始barrier對齊操作
  3. 不斷調(diào)用onBarrier方法出革,直到所有的barrier都到齊造壮。
  4. 等所有的barrier到齊之后,會進行 (1) 調(diào)用releaseBlocksAndResetBarriers蹋盆,該方法把blockedChannels的狀態(tài)都設(shè)置成false费薄,并且把numBarriersReceived設(shè)置成0,方便下一次checkpoint制作栖雾,barrier對齊結(jié)束楞抡。 (2) 調(diào)用notifyCheckpoint方法,開始該Task 的checkpoint制作析藕,checkpoint制作還是調(diào)用StreamTask#performCheckpoint方法召廷,涉及到 i): snapshot前準(zhǔn)備工作 ii:) 往下游廣播barrier iii:) 做快照

至此,checkpoint的觸發(fā)以及制作原理基本介紹完了

小結(jié)

本篇文章簡單介紹了下checkpoint是怎么被調(diào)用账胧,以及SourceTask怎么從開始被觸發(fā)竞慢,以及非SourceTask怎么觸發(fā)和制作checkpoint的。

  • barrier是以Task也就是operatorChain為單位下發(fā)的
  • snapshot中的state是以operator為單位制作的
  • 非SourceTask只有把它消費的所有partition的barrier都接收到之后治泥,才會進行checkpoint的制作

推薦文章列表

checkpoint總結(jié)不錯的文章 Flink CheckPoint詳細過程

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末筹煮,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子居夹,更是在濱河造成了極大的恐慌败潦,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件准脂,死亡現(xiàn)場離奇詭異劫扒,居然都是意外死亡,警方通過查閱死者的電腦和手機狸膏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進店門沟饥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事贤旷」懔希” “怎么了?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵遮晚,是天一觀的道長性昭。 經(jīng)常有香客問我,道長县遣,這世上最難降的妖魔是什么糜颠? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮萧求,結(jié)果婚禮上其兴,老公的妹妹穿的比我還像新娘。我一直安慰自己夸政,他們只是感情好元旬,可當(dāng)我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著守问,像睡著了一般匀归。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上耗帕,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天穆端,我揣著相機與錄音,去河邊找鬼仿便。 笑死体啰,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的嗽仪。 我是一名探鬼主播荒勇,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼闻坚!你這毒婦竟也來了沽翔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤窿凤,失蹤者是張志新(化名)和其女友劉穎搀擂,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體卷玉,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年喷市,在試婚紗的時候發(fā)現(xiàn)自己被綠了相种。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖寝并,靈堂內(nèi)的尸體忽然破棺而出箫措,到底是詐尸還是另有隱情,我是刑警寧澤衬潦,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布斤蔓,位于F島的核電站,受9級特大地震影響镀岛,放射性物質(zhì)發(fā)生泄漏弦牡。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一漂羊、第九天 我趴在偏房一處隱蔽的房頂上張望驾锰。 院中可真熱鬧,春花似錦走越、人聲如沸椭豫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽赏酥。三九已至,卻和暖如春谆构,著一層夾襖步出監(jiān)牢的瞬間裸扶,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工低淡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留姓言,地道東北人。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓蔗蹋,卻偏偏與公主長得像何荚,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子猪杭,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內(nèi)容