Flink源碼閱讀(五)--- checkpoint / savepoint恢復(fù)

Flink源碼閱讀(四)--- checkpoint制作這篇文章介紹了checkpoint制作原理缀雳,這篇文章在此基礎(chǔ)上,介紹下怎么從checkpoint/savepoint恢復(fù)穿撮。本文內(nèi)容是基于Flink 1.9來講解漱受。

1. 概述

作業(yè)從狀態(tài) checkpoint / savepoint 的情況簡單總結(jié)主要是兩種

  • 作業(yè)手動重啟精肃,從savepoint恢復(fù)
  • 作業(yè)運(yùn)行過程中贝咙,某個(gè)task執(zhí)行失敗,從checkpoint恢復(fù)

savepoint是一種人為主動觸發(fā)生成的checkpoint,所以checkpoint/savepoint 恢復(fù)的原理是一樣的阳仔。下面以工作中比較常見的某個(gè)task失敗豌拙,作業(yè)如何恢復(fù)為例進(jìn)行介紹弓叛。

2. 狀態(tài)分配

首先說明下Task的狀態(tài)state都有哪些镶苞,可以看ExecutionState.java類

    CREATED,
    
    SCHEDULED,
    
    DEPLOYING,
    
    RUNNING,

    /**
     * This state marks "successfully completed". It can only be reached when a
     * program reaches the "end of its input". The "end of input" can be reached
     * when consuming a bounded input (fix set of files, bounded query, etc) or
     * when stopping a program (not cancelling!) which make the input look like
     * it reached its end at a specific point.
     */
    FINISHED,
    
    CANCELING,
    
    CANCELED,
    
    FAILED,

    RECONCILING;

Task各個(gè)state的轉(zhuǎn)換關(guān)系如下:

 *     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
 *        |            |            |          |
 *        |            |            |   +------+
 *        |            |            V   V
 *        |            |         CANCELLING -----+----> CANCELED
 *        |            |                         |
 *        |            +-------------------------+
 *        |
 *        |                                   ... -> FAILED
 *        V
 *    RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED

Task進(jìn)行state轉(zhuǎn)換肝断,是調(diào)用的

Execution#transitionState --> vertex.notifyStateTransition --> getExecutionGraph().notifyExecutionChange

如果task變成FAILED蝶柿,就會調(diào)用

failoverStrategy.onTaskFailure --> AdaptedRestartPipelinedRegionStrategyNG#onTaskFailure --> restartTasks --> resetAndRescheduleTasks --> createResetAndRescheduleTasksCallback

這里restartTasks方法的參數(shù)是該P(yáng)ipeline上所有需要restart的task丈钙。

重點(diǎn)看下createResetAndRescheduleTasksCallback方法做了什么,看下源碼

LOG.info("Finally restart {} tasks to recover from task failure.", unmodifiedVertices.size());

// reset tasks to CREATED state and reload state
resetTasks(unmodifiedVertices, globalModVersion);

// re-schedule tasks
rescheduleTasks(unmodifiedVertices, globalModVersion);

做了兩件事情交汤,重置Tasks (狀態(tài)分配) 和 重新調(diào)度Tasks雏赦,下面介紹下重置Tasks方法

2.1 重置Tasks

第一步:為每個(gè)節(jié)點(diǎn)重置Execution

        for (ExecutionVertex ev : vertices) {
            CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
            if (cgroup != null && !colGroups.contains(cgroup)){
                cgroup.resetConstraints();
                colGroups.add(cgroup);
            }

            ev.resetForNewExecution(restartTimestamp, globalModVersion);
        }

第二步:把pendingCheckpoints這個(gè)map中所有正在做的checkpoint fail掉

executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
                new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));

第三步:從最近完成的checkpoint恢復(fù)state

executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
                involvedExecutionJobVertices, false, true);

接下來重點(diǎn)看下第三步怎么從checkpoint恢復(fù)的?
2.1.1 首先找到最近完成的一個(gè)latestCheckpoint
?? 如果latestCheckpoint==null
?? ?? 如果 errorIfNoCheckpoint 開關(guān)為true芙扎,直接拋IllegalStateException
?? ?? 如果 errorIfNoCheckpoint 開關(guān)為false星岗,直接return
2.1.2 給Tasks分配states,stateAssignmentOperation.assignStates()戒洼,主要做了下面幾件事情:
?? 1. 對于checkpoint中所有的operatorStates俏橘,check在新tasks中是否都有對應(yīng)的operatorID。如果在新tasks中缺少operatorStates中某一個(gè)operatorID圈浇,(i) allowNonRestoredState==true, 跳過該operatorID (ii) allowNonRestoredState==false, 拋IllegalStateException異常寥掐。

checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);

?? 2. 遍歷所有的Tasks
(1) 對于每個(gè)Task的所有operator:(i) 如果在checkpoint中存在對應(yīng)的state,直接記錄在operatorStates list中磷蜀,(ii) 如果在checkpoint中沒有對應(yīng)的state召耘,就為該operatorID初始化一個(gè)OperatorState,并記錄在operatorStates list中褐隆。(iii) 對于并發(fā)度改變的縮擴(kuò)容情況污它,對state進(jìn)行重新分配,具體可以參考 state縮擴(kuò)容
?? 最終每個(gè)Task分配的狀態(tài)被封裝在 JobManagerTaskRestore 中衫贬,然后通過 Execution.setInitialState() 關(guān)聯(lián)到 Execution 中蜜宪。JobManagerTaskRestore 會作為 TaskDeploymentDescriptor 的一個(gè)屬性下發(fā)到 TaskExecutor 中。 縮擴(kuò)容state重新分配簡單總結(jié)如下:
?? Operator State:state存儲實(shí)現(xiàn)ListCheckpointed接口祥山,這種實(shí)現(xiàn)的優(yōu)點(diǎn)是可以對state根據(jù)并發(fā)方便重新分配圃验。用戶也可以重寫restore state邏輯。
?? Keyed State:Flink引入了Key Group的概念缝呕,將Key Group作為Keyed State的基本分配單元澳窑,如果并發(fā)度改變,就可以重新計(jì)算key group分配供常,然后分到不同的算子中摊聋。

?? (iiii)補(bǔ)充一點(diǎn),在對state重新分配的時(shí)候栈暇,會檢查新提交tasks的Parallelism與上次operatorStates的MaxParallelism的關(guān)系麻裁,源碼可參考 StateAssignmentOperation#checkParallelismPreconditions方法
?? 1. 如果 task的并發(fā)度 > checkpoint中operatorState的最大并發(fā)度, 就直接拋異常
?? 2. 如果 task的最大并發(fā)度 != operatorState的最大并發(fā)度
???? 2.1 如果 task的最大并發(fā)度沒有自己配置源祈,那把task的最大并發(fā)度就設(shè)置為operatorState的最大并發(fā)度
???? 2.2 如果自己配置了最大并發(fā)度煎源,就直接拋異常

        if (operatorState.getMaxParallelism() < executionJobVertex.getParallelism()) {
            throw new IllegalStateException(
                    "The state for task "
                            + executionJobVertex.getJobVertexId()
                            + " can not be restored. The maximum parallelism ("
                            + operatorState.getMaxParallelism()
                            + ") of the restored state is lower than the configured parallelism ("
                            + executionJobVertex.getParallelism()
                            + "). Please reduce the parallelism of the task to be lower or equal to the maximum parallelism.");
        }

        // check that the number of key groups have not changed or if we need to override it to
        // satisfy the restored state
        if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {

            if (!executionJobVertex.isMaxParallelismConfigured()) {
                // if the max parallelism was not explicitly specified by the user, we derive it
                // from the state

                LOG.debug(
                        "Overriding maximum parallelism for JobVertex {} from {} to {}",
                        executionJobVertex.getJobVertexId(),
                        executionJobVertex.getMaxParallelism(),
                        operatorState.getMaxParallelism());

                executionJobVertex.setMaxParallelism(operatorState.getMaxParallelism());
            } else {
                // if the max parallelism was explicitly specified, we complain on mismatch
                throw new IllegalStateException(
                        "The maximum parallelism ("
                                + operatorState.getMaxParallelism()
                                + ") with which the latest "
                                + "checkpoint of the execution job vertex "
                                + executionJobVertex
                                + " has been taken and the current maximum parallelism ("
                                + executionJobVertex.getMaxParallelism()
                                + ") changed. This "
                                + "is currently not supported.");
            }
        }

至于operatorState的最大并發(fā)度怎么計(jì)算的,等于存儲operator對應(yīng)的ExecutionJobVertex的最大并發(fā)度香缺,ExecutionJobVertex的最大并發(fā)度可以參考ExecutionJobVertex類的構(gòu)造方法
?? 1. 如果task設(shè)置了最大并發(fā)度手销,就按照設(shè)置的來
?? 2. 如果task沒有設(shè)置最大并發(fā)度,就根據(jù)算子并發(fā)度來計(jì)算图张,可以參考 KeyGroupRangeAssignment#computeDefaultMaxParallelism方法锋拖,min(max(parallelism向上取整到2的最近冪, 2^7), 2^15)

    public static int computeDefaultMaxParallelism(int operatorParallelism) {

        checkParallelismPreconditions(operatorParallelism);

        return Math.min(
                Math.max(
                        MathUtils.roundUpToPowerOfTwo(
                                operatorParallelism + (operatorParallelism / 2)),
                        DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }

至此祸轮,重置Tasks的邏輯大體就介紹完了兽埃。

2.2 調(diào)度Tasks

入口是AdaptedRestartPipelinedRegionStrategyNG#rescheduleTasks方法,真正開始執(zhí)行調(diào)度的是SchedulingUtils.schedule方法适袜。

關(guān)于task調(diào)度的內(nèi)容柄错,可以看下我之前寫的一篇文章 Flink作業(yè)提交(三)--- Job運(yùn)行, 調(diào)度分為兩步,申請slot和deploy task痪蝇。

在deploy task的時(shí)候鄙陡,首先會調(diào)用StreamTask.invoke()方法冕房,在invoke方法中躏啰,會對該Task中每個(gè)operator調(diào)用initializeState()方法,這里看下initializeState#initializeState源碼

    private void initializeState() throws Exception {

        StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

        for (StreamOperator<?> operator : allOperators) {
            if (null != operator) {
                operator.initializeState();
            }
        }
    }

然后會調(diào)用AbstractStreamOperator#initializeState方法

    @Override
    public final void initializeState() throws Exception {

        final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

        final StreamTask<?, ?> containingTask =
            Preconditions.checkNotNull(getContainingTask());
        final CloseableRegistry streamTaskCloseableRegistry =
            Preconditions.checkNotNull(containingTask.getCancelables());
        final StreamTaskStateInitializer streamTaskStateManager =
            Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());

        final StreamOperatorStateContext context =
            streamTaskStateManager.streamOperatorStateContext(
                getOperatorID(),
                getClass().getSimpleName(),
                this,
                keySerializer,
                streamTaskCloseableRegistry,
                metrics);

        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();

        if (keyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
        }

        timeServiceManager = context.internalTimerServiceManager();

        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();

        try {
            StateInitializationContext initializationContext = new StateInitializationContextImpl(
                context.isRestored(), // information whether we restore or start for the first time
                operatorStateBackend, // access to operator state backend
                keyedStateStore, // access to keyed state backend
                keyedStateInputs, // access to keyed state stream
                operatorStateInputs); // access to operator state stream

            initializeState(initializationContext);
        } finally {
            closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
            closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
        }
    }
  • 上面提到 TaskExecutor 使用 TaskStateManager 來管理當(dāng)前 Task 的狀態(tài)耙册,TaskStateManager 對象會基于分配的 JobManagerTaskRestore 和本地狀態(tài)存儲 TaskLocalStateStore 進(jìn)行創(chuàng)建给僵。

  • 狀態(tài)初始化的關(guān)鍵方法在于通過 StreamTaskStateInitializer.streamOperatorStateContext() 生成 StreamOperatorStateContext,通過 StreamOperatorStateContext 可以獲取 operatorStateBackend,Raw State Streams帝际,operatorStateBackend以及timeServiceManager等蔓同,然后就可以進(jìn)行狀態(tài)恢復(fù)了。

  • 咱們接著看下StreamOperatorStateContext是怎么生成的蹲诀,具體實(shí)現(xiàn)可以看下 StreamTaskStateInitializerImpl#streamOperatorStateContext方法

        TaskInfo taskInfo = environment.getTaskInfo();
        OperatorSubtaskDescriptionText operatorSubtaskDescription =
            new OperatorSubtaskDescriptionText(
                operatorID,
                operatorClassName,
                taskInfo.getIndexOfThisSubtask(),
                taskInfo.getNumberOfParallelSubtasks());

        final String operatorIdentifierText = operatorSubtaskDescription.toString();

        final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
            taskStateManager.prioritizedOperatorState(operatorID);

        AbstractKeyedStateBackend<?> keyedStatedBackend = null;
        OperatorStateBackend operatorStateBackend = null;
        CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
        CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
        InternalTimeServiceManager<?> timeServiceManager;

        try {

            // -------------- Keyed State Backend --------------
            keyedStatedBackend = keyedStatedBackend(
                keySerializer,
                operatorIdentifierText,
                prioritizedOperatorSubtaskStates,
                streamTaskCloseableRegistry,
                metricGroup);

            // -------------- Operator State Backend --------------
            operatorStateBackend = operatorStateBackend(
                operatorIdentifierText,
                prioritizedOperatorSubtaskStates,
                streamTaskCloseableRegistry);

            // -------------- Raw State Streams --------------
            rawKeyedStateInputs = rawKeyedStateInputs(
                prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
            streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

            rawOperatorStateInputs = rawOperatorStateInputs(
                prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
            streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

            // -------------- Internal Timer Service Manager --------------
            timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);

            // -------------- Preparing return value --------------

            return new StreamOperatorStateContextImpl(
                prioritizedOperatorSubtaskStates.isRestored(),
                operatorStateBackend,
                keyedStatedBackend,
                timeServiceManager,
                rawOperatorStateInputs,
                rawKeyedStateInputs);
  • 為了生成 StreamOperatorStateContext
    ?? 1. 通過 TaskStateManager.prioritizedOperatorState() 方法獲得每個(gè) Operator 需要恢復(fù)的狀態(tài)句柄斑粱。
    ?? 2. 使用獲得的狀態(tài)句柄創(chuàng)建并還原 state backend 和 timer。這里引入了 PrioritizedOperatorSubtaskState脯爪,它封裝了多個(gè)備選的 OperatorSubtaskState快照则北,這些快照相互之間是可以(部分)替換的,并按照優(yōu)先級排序痕慢。

小結(jié)

本篇文章介紹了當(dāng)作業(yè)某些Task fail之后尚揣,Task狀態(tài)如何分配,以及調(diào)度Task怎么使用state進(jìn)行恢復(fù)掖举。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末快骗,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子塔次,更是在濱河造成了極大的恐慌方篮,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件励负,死亡現(xiàn)場離奇詭異恭取,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)熄守,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進(jìn)店門蜈垮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人裕照,你說我怎么就攤上這事攒发。” “怎么了晋南?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵惠猿,是天一觀的道長。 經(jīng)常有香客問我负间,道長偶妖,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任政溃,我火速辦了婚禮趾访,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘董虱。我一直安慰自己扼鞋,他們只是感情好申鱼,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著云头,像睡著了一般捐友。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上溃槐,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天匣砖,我揣著相機(jī)與錄音,去河邊找鬼昏滴。 笑死脆粥,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的影涉。 我是一名探鬼主播变隔,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蟹倾!你這毒婦竟也來了匣缘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤鲜棠,失蹤者是張志新(化名)和其女友劉穎肌厨,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體豁陆,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡柑爸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了盒音。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片表鳍。...
    茶點(diǎn)故事閱讀 39,696評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖祥诽,靈堂內(nèi)的尸體忽然破棺而出譬圣,到底是詐尸還是另有隱情,我是刑警寧澤雄坪,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布厘熟,位于F島的核電站,受9級特大地震影響维哈,放射性物質(zhì)發(fā)生泄漏绳姨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一阔挠、第九天 我趴在偏房一處隱蔽的房頂上張望飘庄。 院中可真熱鬧,春花似錦谒亦、人聲如沸竭宰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽切揭。三九已至,卻和暖如春锁摔,著一層夾襖步出監(jiān)牢的瞬間廓旬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工谐腰, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留孕豹,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓十气,卻偏偏與公主長得像励背,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子砸西,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評論 2 353

推薦閱讀更多精彩內(nèi)容