Flink Source Code Reading: How a Checkpoint Is Executed

Preface

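I won't dwell on the role and importance of checkpoints in Flink here. An earlier article covered how checkpoints are triggered periodically; readers unfamiliar with the overall checkpoint process may want to read it first.
In this article, we follow the source code to see how a checkpoint is actually executed.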

The checkpoint process

Where it starts

As we know, periodic checkpoints are initiated by a component in the JobManager called the CheckpointCoordinator, and the work is done in CheckpointCoordinator.triggerCheckpoint. The method is long; in summary, it does the following:

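  1. Pre-checks, including:
  • whether the checkpoint has to be forced
  • whether the number of concurrent checkpoints currently queued exceeds the threshold
  • whether the time since the last successful checkpoint is too short
    If these conditions are not satisfied, this checkpoint is not taken.
  2. Check that all tasks that need to be triggered are in the RUNNING state; otherwise give up. I was bitten by this before; see my earlier post about a Flink job that stopped taking checkpoints.
  3. Check that all tasks that need to acknowledge the checkpoint are in the RUNNING state; otherwise give up.
    Once all of the checks above pass, the checkpoint can proceed.
  4. Generate a unique, monotonically increasing checkpointID.
  5. Initialize the CheckpointStorageLocation, the path where this checkpoint's snapshot will be stored; it differs between state backends.
  6. Create a PendingCheckpoint, which represents a checkpoint in an intermediate state, and store it in a checkpointId -> PendingCheckpoint map.
  7. Register a scheduled task that cancels this checkpoint when it times out and triggers a new checkpoint.
  8. Call Execution.triggerCheckpoint() to send a checkpoint request to every task that needs to be triggered: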
for (Execution execution: executions) {
    if (props.isSynchronous()) {
        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
    } else {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
}
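The following minimal Java sketch makes the pre-checks in the list above concrete. It is not Flink code: CoordinatorSketch, TaskState, trigger and complete are hypothetical names, the limits are plain constructor parameters, and the storage-location and timeout-canceller steps are left out. It only shows the order of the checks as this sketch models them: a forced checkpoint skips the concurrency and min-pause checks, every task must be RUNNING, and only then is an increasing ID generated and a pending entry recorded.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the checks in
// CheckpointCoordinator.triggerCheckpoint. None of these names are Flink APIs,
// and the timeout canceller and storage location steps are omitted.
class CoordinatorSketch {
    enum TaskState { RUNNING, FINISHED }

    private final int maxConcurrentCheckpoints;
    private final long minPauseMillis;
    private long lastCompletedAtMillis = -1; // -1: no checkpoint completed yet
    private long nextCheckpointId = 1;
    // checkpointId -> pending checkpoint (here just a label)
    final Map<Long, String> pendingCheckpoints = new HashMap<>();

    CoordinatorSketch(int maxConcurrentCheckpoints, long minPauseMillis) {
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        this.minPauseMillis = minPauseMillis;
    }

    /** Returns the new checkpoint id, or -1 if this checkpoint is not taken. */
    long trigger(long nowMillis, boolean forced,
                 List<TaskState> tasksToTrigger, List<TaskState> tasksToAck) {
        if (!forced) {
            // too many checkpoints already in flight
            if (pendingCheckpoints.size() >= maxConcurrentCheckpoints) {
                return -1;
            }
            // too little time since the last completed checkpoint
            if (lastCompletedAtMillis >= 0 && nowMillis - lastCompletedAtMillis < minPauseMillis) {
                return -1;
            }
        }
        // every task to trigger and every task that must ack has to be RUNNING
        boolean allRunning = tasksToTrigger.stream().allMatch(s -> s == TaskState.RUNNING)
                && tasksToAck.stream().allMatch(s -> s == TaskState.RUNNING);
        if (!allRunning) {
            return -1;
        }
        long checkpointId = nextCheckpointId++; // unique, increasing id
        pendingCheckpoints.put(checkpointId, "PendingCheckpoint#" + checkpointId);
        return checkpointId;
    }

    /** Called once all acks for a checkpoint have arrived. */
    void complete(long checkpointId, long nowMillis) {
        pendingCheckpoints.remove(checkpointId);
        lastCompletedAtMillis = nowMillis;
    }

    public static void main(String[] args) {
        CoordinatorSketch c = new CoordinatorSketch(1, 500);
        List<TaskState> running = List.of(TaskState.RUNNING, TaskState.RUNNING);
        long first = c.trigger(0, false, running, running);    // 1
        long second = c.trigger(100, false, running, running); // -1: one already pending
        c.complete(first, 200);
        long third = c.trigger(300, false, running, running);  // -1: only 100 ms since last completion
        long fourth = c.trigger(800, false, running, running); // 2
        System.out.println(first + " " + second + " " + third + " " + fourth); // prints "1 -1 -1 2"
    }
}
```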

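Eventually, TaskExecutorGateway.triggerCheckpoint is called over RPC, which executes TaskExecutor.triggerCheckpoint(). Because one TaskExecutor may be running several Tasks, it looks up the Task that matches the ExecutionAttemptID of the checkpoint request and then calls Task.triggerCheckpointBarrier(). Below are Execution.triggerCheckpointHelper, which sends the request through the TaskManagerGateway, and TaskExecutor.triggerCheckpoint: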

private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }
@Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions,
            boolean advanceToEndOfEventTime) {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
        }
    }
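The lookup in TaskExecutor.triggerCheckpoint boils down to a map lookup plus an already completed future. The sketch below models only that step; TaskExecutorSketch, TaskStub and the string attempt IDs are hypothetical, and no RPC is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the lookup in TaskExecutor.triggerCheckpoint:
// attempt IDs are plain strings and a "task" only records checkpoint ids.
class TaskExecutorSketch {
    static final class TaskStub {
        final List<Long> triggeredCheckpoints = new ArrayList<>();

        void triggerCheckpointBarrier(long checkpointId) {
            triggeredCheckpoints.add(checkpointId);
        }
    }

    // stands in for taskSlotTable: attempt id -> task running in this TaskExecutor
    final Map<String, TaskStub> tasksByAttemptId = new ConcurrentHashMap<>();

    CompletableFuture<String> triggerCheckpoint(String attemptId, long checkpointId) {
        TaskStub task = tasksByAttemptId.get(attemptId);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId);
            return CompletableFuture.completedFuture("ACK");
        }
        // unknown task: answer with an exceptionally completed future
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
                "checkpoint request for unknown task " + attemptId));
        return failed;
    }

    public static void main(String[] args) {
        TaskExecutorSketch executor = new TaskExecutorSketch();
        executor.tasksByAttemptId.put("attempt-1", new TaskStub());
        System.out.println(executor.triggerCheckpoint("attempt-1", 7).join());                      // ACK
        System.out.println(executor.triggerCheckpoint("attempt-2", 7).isCompletedExceptionally()); // true
    }
}
```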

The real logic a Task uses to execute a checkpoint is encapsulated in AbstractInvokable.triggerCheckpointAsync(...), which Task.triggerCheckpointBarrier() invokes:

public void triggerCheckpointBarrier(
            final long checkpointID,
            final long checkpointTimestamp,
            final CheckpointOptions checkpointOptions,
            final boolean advanceToEndOfEventTime) {

        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {
            try {
                invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
            }
            catch (RejectedExecutionException ex) {
                // This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it.
                LOG.debug(
                    "Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
                    checkpointID, taskNameWithSubtask, executionId);
            }
            catch (Throwable t) {
                if (getExecutionState() == ExecutionState.RUNNING) {
                    failExternally(new Exception(
                        "Error while triggering checkpoint " + checkpointID + " for " +
                            taskNameWithSubtask, t));
                } else {
                    LOG.debug("Encountered error while triggering checkpoint {} for " +
                        "{} ({}) while being not in state running.", checkpointID,
                        taskNameWithSubtask, executionId, t);
                }
            }
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
        }
    }
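The branches of triggerCheckpointBarrier (submit, ignore when the mailbox is closed, decline when the task is not RUNNING) can be imitated with a plain ExecutorService, because the executor returned by Executors.newSingleThreadExecutor() rejects work submitted after shutdown() with a RejectedExecutionException. This is a sketch under that assumption: the single-thread executor only stands in for Flink's mailbox, MailboxTriggerSketch and its methods are hypothetical, and the "fail the task on any other error" branch is omitted.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the branches in Task.triggerCheckpointBarrier.
// A single-thread executor stands in for the task's mailbox.
class MailboxTriggerSketch {
    enum ExecutionState { RUNNING, CANCELING }

    static String triggerCheckpointBarrier(ExecutionState state, ExecutorService mailbox, Runnable checkpoint) {
        if (state != ExecutionState.RUNNING) {
            // the real code sends declineCheckpoint(...) back to the coordinator
            return "declined";
        }
        try {
            mailbox.execute(checkpoint);
            return "submitted";
        } catch (RejectedExecutionException e) {
            // the mailbox is closed, i.e. the task is shutting down: ignore the request
            return "ignored";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        Runnable checkpoint = () -> System.out.println("checkpoint runs in the mailbox thread");
        System.out.println(triggerCheckpointBarrier(ExecutionState.RUNNING, mailbox, checkpoint));
        mailbox.shutdown(); // closing the mailbox makes later execute() calls throw
        System.out.println(triggerCheckpointBarrier(ExecutionState.RUNNING, mailbox, checkpoint));
        System.out.println(triggerCheckpointBarrier(ExecutionState.CANCELING, mailbox, checkpoint));
        mailbox.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```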

triggerCheckpointAsync is overridden by both SourceStreamTask and the ordinary StreamTask, but the main logic lives in StreamTask, in performCheckpoint:

private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics,
            boolean advanceToEndOfTime) throws Exception {

        LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

        final long checkpointId = checkpointMetaData.getCheckpointId();

        if (isRunning) {
            actionExecutor.runThrowing(() -> {

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    setSynchronousSavepointId(checkpointId);

                    if (advanceToEndOfTime) {
                        advanceToEndOfEventTime();
                    }
                }

                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                //           The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointId);

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                        checkpointId,
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                //           impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

            });

            return true;
        } else {
            actionExecutor.runThrowing(() -> {
                // we cannot perform our checkpoint - let the downstream operators know that they
                // should not wait for any input from this operator

                // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                // yet be created
                final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                recordWriter.broadcastEvent(message);
            });

            return false;
        }
    }

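It does three things: 1) prepares the checkpoint, which usually involves little or no work; 2) sends the CheckpointBarrier downstream; 3) stores the checkpoint snapshot. If the task is not running, it instead broadcasts a CancelCheckpointMarker so that downstream operators do not wait for input from this task.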
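The ordering inside performCheckpoint can be shown with a toy model in which each downstream channel is just a list of event names. PerformCheckpointSketch is hypothetical; in Flink the three steps run inside actionExecutor so that no record, watermark or timer interleaves with them, which this single-threaded sketch does not need to model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the ordering in StreamTask.performCheckpoint:
// each downstream channel is modeled as a list of event names.
class PerformCheckpointSketch {
    final List<String> taskLog = new ArrayList<>();
    final List<List<String>> downstreamChannels = new ArrayList<>();
    boolean isRunning = true;

    PerformCheckpointSketch(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            downstreamChannels.add(new ArrayList<>());
        }
    }

    boolean performCheckpoint(long checkpointId) {
        if (isRunning) {
            // Step 1: pre-barrier work (normally little or nothing)
            taskLog.add("prepareSnapshotPreBarrier " + checkpointId);
            // Step 2: broadcast the barrier to every output as early as possible
            for (List<String> channel : downstreamChannels) {
                channel.add("barrier " + checkpointId);
            }
            // Step 3: take the (mostly asynchronous) state snapshot
            taskLog.add("checkpointState " + checkpointId);
            return true;
        }
        // not running: tell downstream not to wait for this task's barrier
        for (List<String> channel : downstreamChannels) {
            channel.add("cancel " + checkpointId);
        }
        return false;
    }

    public static void main(String[] args) {
        PerformCheckpointSketch task = new PerformCheckpointSketch(2);
        task.performCheckpoint(1);
        task.isRunning = false;
        task.performCheckpoint(2);
        System.out.println(task.taskLog);            // [prepareSnapshotPreBarrier 1, checkpointState 1]
        System.out.println(task.downstreamChannels); // [[barrier 1, cancel 2], [barrier 1, cancel 2]]
    }
}
```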

Broadcasting the barrier

public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
            streamOutput.broadcastEvent(barrier);
        }
    }

Taking the snapshot

private void checkpointState(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {
        // resolve the storage location (and metadata) for this checkpoint
        CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
                checkpointMetaData.getCheckpointId(),
                checkpointOptions.getTargetLocation());
        // wrap the checkpointing work in a CheckpointingOperation object
        CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
            this,
            checkpointMetaData,
            checkpointOptions,
            storage,
            checkpointMetrics);

        checkpointingOperation.executeCheckpointing();
    }

The snapshot of each operator is represented by OperatorSnapshotFutures, which holds the snapshot results for operator state and keyed state (managed and raw for each):

public class OperatorSnapshotFutures {

    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
    }

Because a StreamTask may contain several operators, it internally keeps a Map from OperatorID -> OperatorSnapshotFutures.

        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

The snapshot process has a synchronous part and an asynchronous part:

public void executeCheckpointing() throws Exception {
            startSyncPartNano = System.nanoTime();

            try {
                // synchronous part
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                        checkpointMetaData.getCheckpointId(), owner.getName());
                }

                startAsyncPartNano = System.nanoTime();

                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

                // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                // asynchronous part
                // a checkpoint can be configured to run synchronously or asynchronously;
                // if it runs synchronously, all the runnable futures are already completed at this point
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                    owner,
                    operatorSnapshotsInProgress,
                    checkpointMetaData,
                    checkpointMetrics,
                    startAsyncPartNano);

                owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }
            } catch (Exception ex) {
                // Cleanup to release resources
                for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
                    if (null != operatorSnapshotResult) {
                        try {
                            operatorSnapshotResult.cancel();
                        } catch (Exception e) {
                            LOG.warn("Could not properly cancel an operator snapshot result.", e);
                        }
                    }
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    // in the case of a synchronous checkpoint, we always rethrow the exception,
                    // so that the task fails.
                    // this is because the intention is always to stop the job after this checkpointing
                    // operation, and without the failure, the task would go back to normal execution.
                    throw ex;
                } else {
                    owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(), ex);
                }
            }
        }
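A stripped-down sketch of the sync/async split, assuming operators are identified by strings and a snapshot is just a string: the synchronous part only creates one RunnableFuture per operator (already run in synchronous mode), and a separate thread, playing the role of asyncOperationsThreadPool, finishes them. TwoPhaseSnapshotSketch, syncPart and asyncPart are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical sketch of the sync/async split in executeCheckpointing.
// Operator ids are strings and a "snapshot" is a string; these are not Flink APIs.
class TwoPhaseSnapshotSketch {
    // Synchronous part: ask every operator for a snapshot future without waiting.
    static Map<String, RunnableFuture<String>> syncPart(List<String> operatorIds, boolean asyncSnapshots) {
        Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        for (String operatorId : operatorIds) {
            FutureTask<String> snapshot = new FutureTask<>(() -> "state-of-" + operatorId);
            if (!asyncSnapshots) {
                snapshot.run(); // synchronous mode: already done when handed over
            }
            inProgress.put(operatorId, snapshot);
        }
        return inProgress;
    }

    // Asynchronous part: run whatever is left and collect the results.
    static Map<String, String> asyncPart(Map<String, RunnableFuture<String>> inProgress)
            throws InterruptedException, ExecutionException {
        Map<String, String> results = new LinkedHashMap<>();
        for (Map.Entry<String, RunnableFuture<String>> entry : inProgress.entrySet()) {
            RunnableFuture<String> future = entry.getValue();
            future.run(); // no-op if the future already completed
            results.put(entry.getKey(), future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService asyncOperationsPool = Executors.newSingleThreadExecutor();
        try {
            long syncStart = System.nanoTime();
            Map<String, RunnableFuture<String>> inProgress = syncPart(List.of("source", "map"), true);
            long syncMillis = (System.nanoTime() - syncStart) / 1_000_000;
            // hand ownership of the futures to the async thread, like AsyncCheckpointRunnable
            Future<Map<String, String>> ack = asyncOperationsPool.submit(() -> asyncPart(inProgress));
            System.out.println(ack.get()); // {source=state-of-source, map=state-of-map}
            System.out.println("sync part took ~" + syncMillis + " ms");
        } finally {
            asyncOperationsPool.shutdown();
        }
    }
}
```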

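In the synchronous phase, StreamOperator.snapshotState is called on each operator in turn, and each call returns a runnable future. Depending on whether the checkpoint is configured in synchronous or asynchronous mode, this future may already be completed or may still be pending: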

private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {
                // synchronous step: call the operator's snapshotState; the returned OperatorSnapshotFutures may or may not be completed yet
                OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions,
                        storageLocation);
                operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
            }
        }

The details are in AbstractStreamOperator#snapshotState:

public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {

        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

        StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables());

        try {
            // snapshot the state, including KeyedState and OperatorState
            snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            // write the operator state snapshot
            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            // write the keyed state snapshot
            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }

            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
                getOperatorName() + ".";

            if (!getContainingTask().isCanceled()) {
                LOG.info(snapshotFailMessage, snapshotException);
            }
            try {
                snapshotContext.closeExceptionally();
            } catch (IOException e) {
                snapshotException.addSuppressed(e);
            }
            throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
        }

        return snapshotInProgress;
    }

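Recall that state is divided into raw state (serialized by the operator itself) and managed state (managed by Flink). Here, timers are written as raw keyed state, so they also have to go into the snapshot; as the code below shows, this legacy path is only taken when the keyed state backend requires synchronous timer snapshots.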

/**
     * Stream operators with state, which want to participate in a snapshot need to override this hook method.
     *
     * @param context context that provides information and means required for taking a snapshot
     */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
        //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
        // all timers are written as raw keyed state
        if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
            ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {

            KeyedStateCheckpointOutputStream out;

            try {
                out = context.getRawKeyedOperatorStateOutput();
            } catch (Exception exception) {
                throw new Exception("Could not open raw keyed operator state stream for " +
                    getOperatorName() + '.', exception);
            }

            try {
                KeyGroupsList allKeyGroups = out.getKeyGroupList();
                for (int keyGroupIdx : allKeyGroups) {
                    out.startNewKeyGroup(keyGroupIdx);

                    timeServiceManager.snapshotStateForKeyGroup(
                        new DataOutputViewStreamWrapper(out), keyGroupIdx);
                }
            } catch (Exception exception) {
                throw new Exception("Could not write timer service of " + getOperatorName() +
                    " to checkpoint state stream.", exception);
            } finally {
                try {
                    out.close();
                } catch (Exception closeException) {
                    LOG.warn("Could not close raw keyed operator state stream for {}. This " +
                        "might have prevented deleting some state data.", getOperatorName(), closeException);
                }
            }
        }
    }
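Writing timers key group by key group means each key group's bytes start at a known offset, so that on restore only the key groups a subtask owns need to be read. The sketch below imitates that with a DataOutputStream and a map of start offsets. The byte layout and the names KeyGroupTimerSketch, write and readKeyGroup are hypothetical, not Flink's actual raw keyed state format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of writing timers as raw keyed state, one key group at a time.
// Recording each key group's start offset plays the role of out.startNewKeyGroup(...).
class KeyGroupTimerSketch {
    static Map<Integer, Long> write(Map<Integer, List<Long>> timersByKeyGroup,
                                    ByteArrayOutputStream bytes) throws IOException {
        DataOutputStream out = new DataOutputStream(bytes); // unbuffered, so bytes.size() is exact
        Map<Integer, Long> keyGroupOffsets = new TreeMap<>();
        for (Map.Entry<Integer, List<Long>> entry : timersByKeyGroup.entrySet()) {
            keyGroupOffsets.put(entry.getKey(), (long) bytes.size()); // start of this key group
            out.writeInt(entry.getValue().size());
            for (long timestamp : entry.getValue()) {
                out.writeLong(timestamp);
            }
        }
        out.flush();
        return keyGroupOffsets;
    }

    // Reads back a single key group by skipping straight to its offset
    static List<Long> readKeyGroup(byte[] data, long offset) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        in.skipBytes((int) offset);
        int count = in.readInt();
        List<Long> timers = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            timers.add(in.readLong());
        }
        return timers;
    }

    public static void main(String[] args) throws IOException {
        Map<Integer, List<Long>> timers = new TreeMap<>();
        timers.put(0, List.of(100L, 200L));
        timers.put(1, List.of(150L));
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Map<Integer, Long> offsets = write(timers, bytes);
        System.out.println(offsets);                                            // {0=0, 1=20}
        System.out.println(readKeyGroup(bytes.toByteArray(), offsets.get(1))); // [150]
    }
}
```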

The above is what snapshotState does in AbstractStreamOperator. Its subclass AbstractUdfStreamOperator extends it:

public void snapshotState(StateSnapshotContext context) throws Exception {
        // first call the parent method, which writes the timers
        super.snapshotState(context);
        StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    }
public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);

        while (true) {

            if (trySnapshotFunctionState(context, backend, userFunction)) {
                break;
            }

            // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }


private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {
        // if the user function implements CheckpointedFunction, call its snapshotState method
        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);

            return true;
        }
        // if the user function implements ListCheckpointed
        if (userFunction instanceof ListCheckpointed) {
            // first call snapshotState to get the current state
            @SuppressWarnings("unchecked")
            List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                    snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
            // get a reference to the list state in the state backend
            ListState<Serializable> listState = backend.
                    getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
            // clear it
            listState.clear();
            // write the current state into the state backend
            if (null != partitionableState) {
                try {
                    for (Serializable statePartition : partitionableState) {
                        listState.add(statePartition);
                    }
                } catch (Exception e) {
                    listState.clear();

                    throw new Exception("Could not write partitionable state to operator " +
                        "state backend.", e);
                }
            }

            return true;
        }

        return false;
    }
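The dispatch in trySnapshotFunctionState and the unwrap loop in snapshotFunctionState can be reproduced without Flink by using stand-in interfaces. UserFunction, CheckpointedFunction, ListCheckpointed and WrappingFunction below are hypothetical mirrors, not the real Flink types; a plain List<Object> plays the role of the backend's default operator list state, and the clear-and-rethrow error path is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins that mirror the shape of Flink's Function, CheckpointedFunction,
// ListCheckpointed and WrappingFunction; they are NOT the real Flink interfaces.
class UdfSnapshotSketch {
    interface UserFunction {}

    interface CheckpointedFunction extends UserFunction {
        void snapshotState(long checkpointId);
    }

    interface ListCheckpointed<T> extends UserFunction {
        List<T> snapshotState(long checkpointId, long timestamp);
    }

    static final class WrappingFunction implements UserFunction {
        private final UserFunction wrapped;

        WrappingFunction(UserFunction wrapped) { this.wrapped = wrapped; }

        UserFunction getWrappedFunction() { return wrapped; }
    }

    static boolean trySnapshot(UserFunction f, long id, long ts, List<Object> defaultListState) {
        if (f instanceof CheckpointedFunction) {
            ((CheckpointedFunction) f).snapshotState(id); // the function updates its own state
            return true;
        }
        if (f instanceof ListCheckpointed) {
            @SuppressWarnings("unchecked")
            List<Object> partitions = ((ListCheckpointed<Object>) f).snapshotState(id, ts);
            defaultListState.clear(); // replace the previous contents
            if (partitions != null) {
                defaultListState.addAll(partitions);
            }
            return true;
        }
        return false;
    }

    // Unwrap WrappingFunctions until some layer can be snapshotted (or none can)
    static void snapshotFunctionState(UserFunction f, long id, long ts, List<Object> defaultListState) {
        while (!trySnapshot(f, id, ts, defaultListState) && f instanceof WrappingFunction) {
            f = ((WrappingFunction) f).getWrappedFunction();
        }
    }

    public static void main(String[] args) {
        List<Object> defaultListState = new ArrayList<>(List.of("stale"));
        ListCheckpointed<Object> counter = (checkpointId, timestamp) -> List.of(42L);
        snapshotFunctionState(new WrappingFunction(counter), 1L, 0L, defaultListState);
        System.out.println(defaultListState); // [42]
    }
}
```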

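We have now seen how the checkpoint process reaches the snapshot methods we implement ourselves. Next, let's look at how the state managed by Flink is written into the snapshot: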

            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            
            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

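Let's start with operator state. DefaultOperatorStateBackend delegates the actual work to DefaultOperatorStateBackendSnapshotStrategy. First, it makes a deep copy of every currently registered operator state (both list state and broadcast state); then it wraps the actual writing in an asynchronous FutureTask whose main steps are: 1) open the output stream; 2) write the state metadata; 3) write the states; 4) close the output stream and obtain the state handle. If asynchronous checkpointing is not enabled, this FutureTask is run immediately in the synchronous phase.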

public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull final CheckpointOptions checkpointOptions) throws IOException {

        if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
            return DoneFuture.of(SnapshotResult.empty());
        }

        final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
            new HashMap<>(registeredOperatorStates.size());
        final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
            new HashMap<>(registeredBroadcastStates.size());

        ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(userClassLoader);
        try {
            // eagerly create deep copies of the list and the broadcast states (if any)
            // in the synchronous phase, so that we can use them in the async writing.
            // take deep copies of all registered list states and broadcast states
            if (!registeredOperatorStates.isEmpty()) {
                for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                    PartitionableListState<?> listState = entry.getValue();
                    if (null != listState) {
                        listState = listState.deepCopy();
                    }
                    registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                }
            }

            if (!registeredBroadcastStates.isEmpty()) {
                for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
                    BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                    if (null != broadcastState) {
                        broadcastState = broadcastState.deepCopy();
                    }
                    registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                }
            }
        } finally {
            Thread.currentThread().setContextClassLoader(snapshotClassLoader);
        }
        // wrap the main write work in an asynchronous FutureTask
        AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
            new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

                @Override
                protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
                    // create the state output stream
                    CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                        streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                    snapshotCloseableRegistry.registerCloseable(localOut);
                    // collect the metadata
                    // get the registered operator state infos ...
                    List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
                        new ArrayList<>(registeredOperatorStatesDeepCopies.size());

                    for (Map.Entry<String, PartitionableListState<?>> entry :
                        registeredOperatorStatesDeepCopies.entrySet()) {
                        operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }
                    // ... get the registered broadcast operator state infos ...
                    List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
                        new ArrayList<>(registeredBroadcastStatesDeepCopies.size());

                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                        registeredBroadcastStatesDeepCopies.entrySet()) {
                        broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }

                    // write the metadata
                    // ... write them all in the checkpoint stream ...
                    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

                    OperatorBackendSerializationProxy backendSerializationProxy =
                        new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

                    backendSerializationProxy.write(dov);

                    // ... and then go for the states ...
                    // write the states
                    // we put BOTH normal and broadcast state metadata here
                    int initialMapCapacity =
                        registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
                    final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                        new HashMap<>(initialMapCapacity);

                    for (Map.Entry<String, PartitionableListState<?>> entry :
                        registeredOperatorStatesDeepCopies.entrySet()) {

                        PartitionableListState<?> value = entry.getValue();
                        long[] partitionOffsets = value.write(localOut);
                        OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                        writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                    }

                    // ... and the broadcast states themselves ...
                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                        registeredBroadcastStatesDeepCopies.entrySet()) {

                        BackendWritableBroadcastState<?, ?> value = entry.getValue();
                        long[] partitionOffsets = {value.write(localOut)};
                        OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                        writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                    }

                    // ... and, finally, create the state handle.
                    OperatorStateHandle retValue = null;

                    if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
                        // close the output stream and get the state handle, which is later used to read the state back
                        StreamStateHandle stateHandle = localOut.closeAndGetHandle();

                        if (stateHandle != null) {
                            retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                        }

                        return SnapshotResult.of(retValue);
                    } else {
                        throw new IOException("Stream was already unregistered.");
                    }
                }

                @Override
                protected void cleanupProvidedResources() {
                    // nothing to do
                }

                @Override
                protected void logAsyncSnapshotComplete(long startTime) {
                    if (asynchronousSnapshots) {
                        logAsyncCompleted(streamFactory, startTime);
                    }
                }
            };

        final FutureTask<SnapshotResult<OperatorStateHandle>> task =
            snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);
        // if checkpoints are not asynchronous, run the FutureTask right here, i.e. the state is fully written in the synchronous phase
        if (!asynchronousSnapshots) {
            task.run();
        }

        return task;
    }
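The deep-copy-then-FutureTask pattern can be seen in isolation in the sketch below, assuming the registered states are plain lists of immutable Integers (so copying each list is a sufficient deep copy) and that "writing" simply means calling toString(). DeepCopySnapshotSketch is hypothetical; the real strategy serializes the copies into a checkpoint output stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical sketch of the copy-synchronously, write-asynchronously pattern.
// The "write" is just toString(); Flink serializes to a checkpoint stream instead.
class DeepCopySnapshotSketch {
    static FutureTask<String> snapshot(Map<String, List<Integer>> registeredStates,
                                       boolean asynchronousSnapshots) {
        // synchronous phase: copy each list so later updates cannot leak into this snapshot
        // (a list copy is enough here because Integer elements are immutable)
        Map<String, List<Integer>> deepCopies = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : registeredStates.entrySet()) {
            deepCopies.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        FutureTask<String> task = new FutureTask<>(() -> deepCopies.toString());
        if (!asynchronousSnapshots) {
            task.run(); // not async: the state is fully written during the synchronous phase
        }
        return task;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Map<String, List<Integer>> liveState = new TreeMap<>();
        liveState.put("buffer", new ArrayList<>(List.of(1, 2)));
        FutureTask<String> task = snapshot(liveState, true);
        liveState.get("buffer").add(3); // record processing continues after the sync phase
        task.run();                     // later, on the async thread
        System.out.println(task.get()); // {buffer=[1, 2]}
    }
}
```

Because the copy is taken synchronously, the update made after snapshot() returns does not show up in the result, even though the FutureTask runs later.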

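Writing keyed state follows the same basic flow, but it is more complex than operator state because keyed state has several storage implementations, including heap-based and RocksDB-based ones, and the RocksDB-based implementation also supports incremental checkpoints.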

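This completes the first phase of the snapshot, the synchronous phase. The asynchronous phase is encapsulated in AsyncCheckpointRunnable, and its main steps are: 1) run the FutureTasks created in the synchronous phase; 2) once they finish, send an Ack response to the CheckpointCoordinator.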
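A minimal sketch of the control flow in AsyncCheckpointRunnable.run, assuming string operator IDs and string snapshots: finish every future, then report only if the RUNNING -> COMPLETED compareAndSet succeeds, so a close() that happened first suppresses the report. AsyncCheckpointSketch, reports and futures() are hypothetical; the real class builds TaskStateSnapshot objects and reports them through reportCompletedSnapshotStates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of AsyncCheckpointRunnable: finish every operator's future,
// then report only if the RUNNING -> COMPLETED transition wins against close().
class AsyncCheckpointSketch implements Runnable {
    enum AsyncCheckpointState { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<AsyncCheckpointState> state =
            new AtomicReference<>(AsyncCheckpointState.RUNNING);
    private final Map<String, RunnableFuture<String>> operatorSnapshotsInProgress;
    final List<String> reports = new ArrayList<>(); // stands in for the ack to the coordinator

    AsyncCheckpointSketch(Map<String, RunnableFuture<String>> operatorSnapshotsInProgress) {
        this.operatorSnapshotsInProgress = operatorSnapshotsInProgress;
    }

    @Override
    public void run() {
        Map<String, String> finalized = new LinkedHashMap<>();
        try {
            for (Map.Entry<String, RunnableFuture<String>> entry : operatorSnapshotsInProgress.entrySet()) {
                entry.getValue().run(); // async mode: the state is written here
                finalized.put(entry.getKey(), entry.getValue().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        } catch (ExecutionException e) {
            reports.add("failed: " + e.getCause()); // Flink handles this in handleExecutionException
            return;
        }
        if (state.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {
            reports.add("ack " + finalized);
        }
        // otherwise the checkpoint was closed first and nothing is reported
    }

    void close() {
        state.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.DISCARDED);
    }

    static Map<String, RunnableFuture<String>> futures() {
        Map<String, RunnableFuture<String>> m = new LinkedHashMap<>();
        m.put("map", new FutureTask<String>(() -> "state-of-map"));
        return m;
    }

    public static void main(String[] args) {
        AsyncCheckpointSketch completed = new AsyncCheckpointSketch(futures());
        completed.run();
        System.out.println(completed.reports); // [ack {map=state-of-map}]

        AsyncCheckpointSketch closedEarly = new AsyncCheckpointSketch(futures());
        closedEarly.close();
        closedEarly.run();
        System.out.println(closedEarly.reports); // []
    }
}
```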

protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        @Override
        public void run() {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                TaskStateSnapshot localTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());

                // finish writing the state of each operator
                // for a synchronous checkpoint, the state was already written before this point
                // for an asynchronous checkpoint, the state is written here
                for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
                    OperatorID operatorID = entry.getKey();
                    OperatorSnapshotFutures snapshotInProgress = entry.getValue();
                    // finalize the async part of all by executing all snapshot runnables
                    OperatorSnapshotFinalizer finalizedSnapshots =
                        new OperatorSnapshotFinalizer(snapshotInProgress);

                    jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                        operatorID,
                        finalizedSnapshots.getJobManagerOwnedState());

                    localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                        operatorID,
                        finalizedSnapshots.getTaskLocalState());
                }

                final long asyncEndNanos = System.nanoTime();
                final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;

                checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

                if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
                    CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
                    // Report that the snapshot has completed
                    reportCompletedSnapshotStates(
                        jobManagerTaskOperatorSubtaskStates,
                        localTaskOperatorSubtaskStates,
                        asyncDurationMillis);

                } else {
                    LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                        owner.getName(),
                        checkpointMetaData.getCheckpointId());
                }
            } catch (Exception e) {
                handleExecutionException(e);
            } finally {
                owner.cancelables.unregisterCloseable(this);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            }
        }

        private void reportCompletedSnapshotStates(
                TaskStateSnapshot acknowledgedTaskStateSnapshot,
                TaskStateSnapshot localTaskStateSnapshot,
                long asyncDurationMillis) {
            TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
            boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
            boolean hasLocalState = localTaskStateSnapshot.hasState();
            // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
            // to stateless tasks on restore. This enables simple job modifications that only concern
            // stateless without the need to assign them uids to match their (always empty) states.
            taskStateManager.reportTaskStateSnapshots(
                checkpointMetaData,
                checkpointMetrics,
                hasAckState ? acknowledgedTaskStateSnapshot : null,
                hasLocalState ? localTaskStateSnapshot : null);
        }
}
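To make the control flow of AsyncCheckpointRunnable easier to follow, here is a stripped-down model of it. Each operator's pending snapshot is awaited, and the results are collected per operator. A compare-and-set on the RUNNING state then decides whether the result may be reported, and an empty result is reported as null, mirroring reportCompletedSnapshotStates. All names here (`AsyncPartSketch`, `AsyncState`, the String-keyed maps, `join()` standing in for OperatorSnapshotFinalizer) are simplifications assumed for illustration, not Flink's types.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical, simplified model of the asynchronous checkpoint part; not Flink code.
class AsyncPartSketch implements Runnable {
    enum AsyncState { RUNNING, COMPLETED, DISCARDED }

    // operator ID -> future state handle (a null result means the operator has no state)
    private final Map<String, CompletableFuture<String>> snapshotsInProgress;
    // receives the collected handles, or null for a stateless task
    private final Consumer<Map<String, String>> reporter;
    final AtomicReference<AsyncState> state = new AtomicReference<>(AsyncState.RUNNING);

    AsyncPartSketch(Map<String, CompletableFuture<String>> snapshotsInProgress,
                    Consumer<Map<String, String>> reporter) {
        this.snapshotsInProgress = snapshotsInProgress;
        this.reporter = reporter;
    }

    @Override
    public void run() {
        Map<String, String> acknowledged = new LinkedHashMap<>();
        for (Map.Entry<String, CompletableFuture<String>> entry : snapshotsInProgress.entrySet()) {
            String handle = entry.getValue().join(); // wait for this operator's state write to finish
            if (handle != null) {
                acknowledged.put(entry.getKey(), handle);
            }
        }
        // Only a RUNNING -> COMPLETED transition may report; if the checkpoint was
        // cancelled in the meantime, the result is silently dropped.
        if (state.compareAndSet(AsyncState.RUNNING, AsyncState.COMPLETED)) {
            reporter.accept(acknowledged.isEmpty() ? null : acknowledged);
        }
    }
}
```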

public class TaskStateManagerImpl implements TaskStateManager {
    @Override
    public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {

        long checkpointId = checkpointMetaData.getCheckpointId();

        localStateStore.storeLocalState(checkpointId, localState);

        // Send the ACK response to the CheckpointCoordinator
        checkpointResponder.acknowledgeCheckpoint(
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);
    }
}

Checkpoint acknowledgement

A task responds to a checkpoint through the CheckpointResponder interface:

public interface CheckpointResponder {

    /**
     * Acknowledges the given checkpoint.
     */
    void acknowledgeCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        TaskStateSnapshot subtaskState);

    /**
     * Declines the given checkpoint.
     */
    void declineCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        Throwable cause);
}

RpcCheckpointResponder, the concrete implementation of CheckpointResponder, notifies the CheckpointCoordinatorGateway (that is, the JobMaster) through RPC calls. The JobMaster then calls CheckpointCoordinator.receiveAcknowledgeMessage() or CheckpointCoordinator.receiveDeclineMessage() to handle the message.
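The task-to-coordinator hop can be modeled as a one-way message, so the task thread does not wait for the coordinator to process it. The following sketch makes that assumption explicit by using a single-threaded executor as the "RPC layer". `CoordinatorEndpoint` and `AsyncResponderSketch` are hypothetical stand-ins for CheckpointCoordinatorGateway and RpcCheckpointResponder, not their actual code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical coordinator endpoint: records every message it receives.
class CoordinatorEndpoint {
    final List<String> received = new CopyOnWriteArrayList<>();

    void receiveAcknowledgeMessage(String executionAttemptId, long checkpointId) {
        received.add("ack " + checkpointId + " from " + executionAttemptId);
    }

    void receiveDeclineMessage(String executionAttemptId, long checkpointId, Throwable cause) {
        received.add("decline " + checkpointId + " from " + executionAttemptId + ": " + cause.getMessage());
    }
}

// Hypothetical task-side responder: each call is handed to another thread, standing in for
// a one-way RPC, so the calling task thread returns immediately.
class AsyncResponderSketch {
    private final ExecutorService rpcThread = Executors.newSingleThreadExecutor();
    private final CoordinatorEndpoint coordinator;

    AsyncResponderSketch(CoordinatorEndpoint coordinator) {
        this.coordinator = coordinator;
    }

    void acknowledgeCheckpoint(String executionAttemptId, long checkpointId) {
        rpcThread.execute(() -> coordinator.receiveAcknowledgeMessage(executionAttemptId, checkpointId));
    }

    void declineCheckpoint(String executionAttemptId, long checkpointId, Throwable cause) {
        rpcThread.execute(() -> coordinator.receiveDeclineMessage(executionAttemptId, checkpointId, cause));
    }

    // Helper for demos/tests: wait until every queued message has been delivered.
    void shutdownAndWait() {
        rpcThread.shutdown();
        try {
            if (!rpcThread.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("messages were not delivered in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```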

Completing the checkpoint

After a task finishes its part of the checkpoint, the CheckpointCoordinator receives its Ack response. The Ack is processed roughly as follows:

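  • Look up the PendingCheckpoint for the Ack's checkpointID in Map<Long, PendingCheckpoint> pendingCheckpoints
  • If a matching PendingCheckpoint exists:
    • If it has not been discarded, call PendingCheckpoint.acknowledgeTask to process the Ack; depending on the result:
      • SUCCESS: check whether all required Acks have been received; if so, call completePendingCheckpoint to complete this checkpoint
      • DUPLICATE: the Ack was received more than once; ignore it
      • UNKNOWN: the Ack comes from an unknown execution attempt; discard the state handles carried in the Ack
      • DISCARDED: the checkpoint has already been discarded; discard the state handles carried in the Ack
    • If it has already been discarded, throw an exception
  • If no matching PendingCheckpoint exists, discard the state handles carried in the Ack

The corresponding code: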
class CheckpointCoordinator {
    public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
        if (shutdown || message == null) {
            return false;
        }

        if (!job.equals(message.getJob())) {
            LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
            return false;
        }

        final long checkpointId = message.getCheckpointId();

        synchronized (lock) {
            // we need to check inside the lock for being shutdown as well, otherwise we
            // get races and invalid error log messages
            if (shutdown) {
                return false;
            }

            final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

            if (checkpoint != null && !checkpoint.isDiscarded()) {

                switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                    case SUCCESS:
                        LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
                            checkpointId, message.getTaskExecutionId(), message.getJob());

                        if (checkpoint.isFullyAcknowledged()) {
                            completePendingCheckpoint(checkpoint);
                        }
                        break;
                    case DUPLICATE:
                        LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
                        break;
                    case UNKNOWN:
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                                "because the task's execution attempt id was unknown. Discarding " +
                                "the state handle to avoid lingering state.", message.getCheckpointId(),
                            message.getTaskExecutionId(), message.getJob());

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                        break;
                    case DISCARDED:
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                            "because the pending checkpoint had been discarded. Discarding the " +
                                "state handle to avoid lingering state.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                }

                return true;
            }
            else if (checkpoint != null) {
                // this should not happen
                throw new IllegalStateException(
                        "Received message for discarded but non-removed checkpoint " + checkpointId);
            }
            else {
                boolean wasPendingCheckpoint;
                // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
                if (recentPendingCheckpoints.contains(checkpointId)) {
                    wasPendingCheckpoint = true;
                    LOG.warn("Received late message for now expired checkpoint attempt {} from " +
                        "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
                }
                else {
                    LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob());
                    wasPendingCheckpoint = false;
                }

                // try to discard the state so that we don't have lingering state lying around
                discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                return wasPendingCheckpoint;
            }
        }
    }
}
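Stripped of logging, receiveAcknowledgeMessage boils down to a lookup by checkpoint ID plus a short memory of recently removed IDs (recentPendingCheckpoints). That memory is what lets the coordinator tell a late Ack from a completely unknown one. The sketch below keeps only that routing. `AckRouterSketch`, `Route` and the `maxRecent` capacity are hypothetical. In the real method, both the late and the unknown case discard the reported state, and only the boolean return value (true for late, false for unknown) distinguishes them.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of how the coordinator routes an Ack by checkpoint ID; not Flink code.
class AckRouterSketch {
    enum Route { DELIVERED, LATE, UNKNOWN }

    private final Set<Long> pending = new HashSet<>();        // IDs of checkpoints still in progress
    private final Deque<Long> recentIds = new ArrayDeque<>(); // IDs removed recently (expired, aborted, ...)
    private final int maxRecent;
    int discardedStateHandles;                                // how many reported states were thrown away

    AckRouterSketch(int maxRecent) {
        this.maxRecent = maxRecent;
    }

    synchronized void trigger(long checkpointId) {
        pending.add(checkpointId);
    }

    // Removes a pending checkpoint (e.g. on timeout) but remembers its ID for a while.
    synchronized void expire(long checkpointId) {
        if (pending.remove(checkpointId)) {
            if (recentIds.size() >= maxRecent) {
                recentIds.removeFirst(); // forget the oldest remembered ID
            }
            recentIds.addLast(checkpointId);
        }
    }

    synchronized Route receiveAck(long checkpointId) {
        if (pending.contains(checkpointId)) {
            return Route.DELIVERED; // the real coordinator now calls PendingCheckpoint.acknowledgeTask(...)
        }
        // Unknown or too late: drop the reported state so that nothing lingers in storage.
        discardedStateHandles++;
        return recentIds.contains(checkpointId) ? Route.LATE : Route.UNKNOWN;
    }
}
```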

How does a checkpoint that has been triggered but not yet completed, i.e. a PendingCheckpoint, handle Ack messages? Internally, PendingCheckpoint maintains two maps:

  • Map<OperatorID, OperatorState> operatorStates: the state handles of the operators whose Acks have already been received
  • Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks: the tasks that must send an Ack but have not yet done so

Whenever an Ack message arrives, the PendingCheckpoint removes the corresponding task from notYetAcknowledgedTasks and stores the state handle carried in the Ack. Once notYetAcknowledgedTasks is empty, all Ack messages have been received.
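This bookkeeping can be modeled in a few lines. In this simplified sketch, `PendingCheckpointSketch` is hypothetical, tasks and state handles are plain strings, and states are keyed by task rather than by OperatorID. The sketch shows how the four results handled by the coordinator (SUCCESS, DUPLICATE, UNKNOWN, DISCARDED) can all follow from a single removal on the not-yet-acknowledged set.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of PendingCheckpoint's Ack bookkeeping; not Flink code.
class PendingCheckpointSketch {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledgedTasks;
    private final Set<String> acknowledgedTasks = new HashSet<>();
    private final Map<String, String> operatorStates = new HashMap<>(); // task -> state handle (simplified key)
    private boolean discarded;

    PendingCheckpointSketch(Set<String> tasksToAcknowledge) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToAcknowledge);
    }

    AckResult acknowledgeTask(String executionAttemptId, String stateHandle) {
        if (discarded) {
            return AckResult.DISCARDED;
        }
        if (!notYetAcknowledgedTasks.remove(executionAttemptId)) {
            // Not expected any more: either it already acked, or it never belonged to this checkpoint.
            return acknowledgedTasks.contains(executionAttemptId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
        }
        acknowledgedTasks.add(executionAttemptId);
        if (stateHandle != null) { // stateless tasks report no state
            operatorStates.put(executionAttemptId, stateHandle);
        }
        return AckResult.SUCCESS;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledgedTasks.isEmpty() && !discarded;
    }

    void abort() {
        discarded = true;
    }

    int stateCount() {
        return operatorStates.size();
    }
}
```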

Once the PendingCheckpoint has confirmed that all Ack messages have arrived, the checkpoint can be completed. This involves:

  • Calling PendingCheckpoint.finalizeCheckpoint() to turn the PendingCheckpoint into a CompletedCheckpoint
    • Obtain a CheckpointMetadataOutputStream and write all state handle information through it to the storage system
    • Create a CompletedCheckpoint object
  • Saving the CompletedCheckpoint in the CompletedCheckpointStore
    • CompletedCheckpointStore has two implementations: StandaloneCompletedCheckpointStore and ZooKeeperCompletedCheckpointStore
    • StandaloneCompletedCheckpointStore simply keeps the CompletedCheckpoints in an in-memory array
    • ZooKeeperCompletedCheckpointStore provides a high-availability implementation: it first writes the CompletedCheckpoint to a RetrievableStateStorageHelper (usually a file system), then stores the resulting file handle in ZooKeeper
    • Only a limited number of CompletedCheckpoints are retained; older snapshots are deleted
  • Removing subsumed PendingCheckpoints: because checkpoint IDs are increasing, every PendingCheckpoint with an ID smaller than the just-completed one can be discarded
  • Calling Execution.notifyCheckpointComplete() for each task in turn, to tell all tasks that the current checkpoint has completed
    • The corresponding task is informed through an RPC call to TaskExecutor.confirmCheckpoint()
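The completion steps above (finalize, store with a bounded history, drop subsumed pending checkpoints, notify tasks) can be sketched as follows. `CompletionSketch` and `maxRetained` are hypothetical. Metadata writing and state cleanup are reduced to comments, and the real coordinator applies extra conditions, not modeled here, when deciding which pending checkpoints may be subsumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical model of what happens once a checkpoint is fully acknowledged; not Flink code.
class CompletionSketch {
    private final SortedMap<Long, String> pendingCheckpoints = new TreeMap<>(); // checkpointId -> description
    private final Deque<Long> completedStore = new ArrayDeque<>();              // retained checkpoints, oldest first
    private final int maxRetained;
    final List<Long> notifiedComplete = new ArrayList<>();

    CompletionSketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void trigger(long checkpointId) {
        pendingCheckpoints.put(checkpointId, "pending-" + checkpointId);
    }

    void complete(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);          // finalize: pending -> completed (metadata write omitted)
        completedStore.addLast(checkpointId);
        while (completedStore.size() > maxRetained) {
            completedStore.removeFirst();                 // a real store would also discard the old state
        }
        pendingCheckpoints.headMap(checkpointId).clear(); // subsumed: drop every pending checkpoint with a smaller ID
        notifiedComplete.add(checkpointId);               // stands in for notifying every task
    }

    List<Long> retained() {
        return new ArrayList<>(completedStore);
    }

    List<Long> stillPending() {
        return new ArrayList<>(pendingCheckpoints.keySet());
    }
}
```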

After receiving the notifyCheckpointComplete confirmation, a task performs any follow-up work, for example the commit step of the Kafka producer's two-phase commit.
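FlinkKafkaProducer builds its exactly-once mode on TwoPhaseCommitSinkFunction. It pre-commits a transaction when the checkpoint snapshot is taken and commits it only after notifyCheckpointComplete arrives. The following is a hypothetical, heavily simplified model of that protocol with records kept in memory. It is not the actual TwoPhaseCommitSinkFunction code, and `TwoPhaseSinkSketch` and its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a two-phase-commit sink driven by checkpoint notifications.
class TwoPhaseSinkSketch {
    private List<String> currentTransaction = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingCommits = new TreeMap<>(); // checkpointId -> pre-committed records
    final List<String> committed = new ArrayList<>();

    void invoke(String record) {
        currentTransaction.add(record); // written into the open transaction, not yet visible
    }

    // Snapshot (barrier) time: pre-commit the open transaction and start a new one.
    void snapshotState(long checkpointId) {
        pendingCommits.put(checkpointId, currentTransaction);
        currentTransaction = new ArrayList<>();
    }

    // notifyCheckpointComplete: commit every transaction up to and including this checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
            pendingCommits.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            committed.addAll(it.next().getValue());
            it.remove(); // removes the entry from pendingCommits as well
        }
    }
}
```

The sketch commits everything up to the notified ID, not just that single checkpoint, because a completion notification is not guaranteed to arrive for every checkpoint.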

Summary

This article walked through how a checkpoint performs its snapshot: broadcasting the barrier, taking the snapshot, and the ACK process once the checkpoint completes.

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.