Preface
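The role and importance of checkpoints in Flink need no further explanation here. Earlier articles covered the checkpoint process in detail and how checkpoints are triggered periodically; readers unfamiliar with the overall checkpoint flow may want to read those first.
In this article we walk through the source code to see how a checkpoint is actually executed.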
The checkpoint process
Where it starts
Periodic checkpoints are initiated by a component in the JobManager called CheckpointCoordinator, specifically in CheckpointCoordinator.triggerCheckpoint. The method is long; in summary it does the following:
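- Pre-checks, including:
  - whether this is a forced checkpoint
  - whether the number of concurrent checkpoints already queued exceeds the threshold
  - whether the time since the last successful checkpoint is too short
  If the checkpoint is not forced and either of the last two checks fails, this checkpoint is not taken.
- Check that all tasks that need to be triggered are in RUNNING state; otherwise give up. I have been bitten by this before; see my earlier post about a Flink job that stopped taking checkpoints.
- Check that all tasks that must acknowledge the checkpoint are in RUNNING state; otherwise give up.
Once all the checks above pass, the checkpoint can go ahead:
- Generate a unique, monotonically increasing checkpoint ID.
- Initialize a CheckpointStorageLocation, which determines where this checkpoint's snapshot is stored; it differs between state backends.
- Create a PendingCheckpoint, which represents a checkpoint in an intermediate state, and store it in a checkpointId -> PendingCheckpoint map.
- Register a scheduled task that cancels this checkpoint if it times out and then triggers a new checkpoint.
- Call Execution.triggerCheckpoint() to send the checkpoint request to every task that needs to be triggered (synchronous savepoints use Execution.triggerSynchronousSavepoint() instead):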
for (Execution execution: executions) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
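The pre-checks and bookkeeping above can be condensed into a small model. This is a hypothetical sketch, not Flink's CheckpointCoordinator. The class `MiniCoordinator`, the `maxConcurrent`, `minPauseMillis` and `timeoutMillis` parameters, the `Map<String, Boolean>` of task states, and the use of a `ScheduledExecutorService` for the timeout are all assumptions made for illustration. It shows the order of operations:

- the pre-checks, where a forced checkpoint skips the concurrency and pause checks;
- a monotonically increasing checkpoint ID;
- registration in a `checkpointId -> pending` map;
- a timeout that drops the checkpoint if it is still pending when the timer fires.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, heavily simplified model of the trigger path; not Flink's CheckpointCoordinator.
class MiniCoordinator {
    // checkpoint ID -> tasks that still have to acknowledge (stands in for PendingCheckpoint)
    final Map<Long, Set<String>> pendingCheckpoints = new ConcurrentHashMap<>();
    private final AtomicLong checkpointIdCounter = new AtomicLong(1);
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "checkpoint-timer");
        t.setDaemon(true);  // do not keep the JVM alive just for timeouts
        return t;
    });
    private final int maxConcurrent;
    private final long minPauseMillis;
    private final long timeoutMillis;
    private long lastCompletionMillis = -1;  // -1: no checkpoint completed yet

    MiniCoordinator(int maxConcurrent, long minPauseMillis, long timeoutMillis) {
        this.maxConcurrent = maxConcurrent;
        this.minPauseMillis = minPauseMillis;
        this.timeoutMillis = timeoutMillis;
    }

    /** Returns the new checkpoint ID, or -1 if a pre-check rejected the trigger. */
    synchronized long triggerCheckpoint(long nowMillis, boolean force, Map<String, Boolean> taskRunning) {
        if (!force) {
            if (pendingCheckpoints.size() >= maxConcurrent) {
                return -1;  // too many concurrent checkpoints
            }
            if (lastCompletionMillis >= 0 && nowMillis - lastCompletionMillis < minPauseMillis) {
                return -1;  // too soon after the last completed checkpoint
            }
        }
        if (taskRunning.containsValue(false)) {
            return -1;  // every task to trigger / acknowledge must be RUNNING, even when forced
        }
        long checkpointId = checkpointIdCounter.getAndIncrement();  // unique and increasing
        Set<String> notYetAcknowledged = ConcurrentHashMap.newKeySet();
        notYetAcknowledged.addAll(taskRunning.keySet());
        pendingCheckpoints.put(checkpointId, notYetAcknowledged);
        // Drop the checkpoint if it is still pending when the timeout fires.
        timer.schedule(() -> { pendingCheckpoints.remove(checkpointId); }, timeoutMillis, TimeUnit.MILLISECONDS);
        return checkpointId;
    }

    synchronized void completeCheckpoint(long checkpointId, long nowMillis) {
        if (pendingCheckpoints.remove(checkpointId) != null) {
            lastCompletionMillis = nowMillis;
        }
    }
}
```

The running-state check sits outside the `force` branch. That placement follows the list above: a checkpoint whose tasks are not all running is abandoned either way.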
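Execution.triggerCheckpoint() (through triggerCheckpointHelper, the first snippet below) ultimately calls TaskExecutorGateway.triggerCheckpoint over RPC, i.e. it asks the TaskExecutor to run TaskExecutor.triggerCheckpoint() (the second snippet). Because one TaskExecutor may be running several Tasks, it looks up the Task matching the ExecutionAttemptID of the request and then calls Task.triggerCheckpointBarrier():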
private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
final LogicalSlot slot = assignedResource;
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
}
}
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime) {
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
log.debug(message);
return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
}
}
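The TaskExecutor side boils down to a lookup by attempt ID, followed by either a completed future or an exceptionally completed one. Below is a minimal sketch of that routing. It assumes a `Map<String, MiniTask>` in place of Flink's `taskSlotTable` and a `String` in place of `ExecutionAttemptID`; `MiniTaskExecutor` and `MiniTask` are hypothetical names. The returned future only acknowledges that the request was delivered. The snapshot itself is acknowledged later by a separate message.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a Task: it only records which checkpoint it was asked to start.
class MiniTask {
    volatile long lastTriggeredCheckpoint = -1;

    void triggerCheckpointBarrier(long checkpointId, long timestamp) {
        lastTriggeredCheckpoint = checkpointId;
    }
}

// Hypothetical TaskExecutor: several tasks may run in one TaskExecutor, so route by attempt ID.
class MiniTaskExecutor {
    final Map<String, MiniTask> tasksByAttemptId = new ConcurrentHashMap<>();

    CompletableFuture<Void> triggerCheckpoint(String attemptId, long checkpointId, long timestamp) {
        MiniTask task = tasksByAttemptId.get(attemptId);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, timestamp);
            // Acknowledges receipt of the RPC only; the snapshot completes asynchronously.
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
                "TaskManager received a checkpoint request for unknown task " + attemptId + '.'));
        return failed;
    }
}
```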
The actual logic by which a Task performs the checkpoint is encapsulated in AbstractInvokable.triggerCheckpointAsync(...), which Task.triggerCheckpointBarrier() calls:
public void triggerCheckpointBarrier(
final long checkpointID,
final long checkpointTimestamp,
final CheckpointOptions checkpointOptions,
final boolean advanceToEndOfEventTime) {
final AbstractInvokable invokable = this.invokable;
final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
if (executionState == ExecutionState.RUNNING && invokable != null) {
try {
invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
}
catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it.
LOG.debug(
"Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
checkpointID, taskNameWithSubtask, executionId);
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
failExternally(new Exception(
"Error while triggering checkpoint " + checkpointID + " for " +
taskNameWithSubtask, t));
} else {
LOG.debug("Encountered error while triggering checkpoint {} for " +
"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);
}
}
}
else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
// send back a message that we did not do the checkpoint
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
}
}
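triggerCheckpointBarrier has three possible outcomes:

- a task that is not running declines the checkpoint;
- a running task hands the checkpoint to its mailbox;
- if the mailbox is already closed, the task ignores the resulting RejectedExecutionException, because it is shutting down anyway.

The sketch below models these outcomes. A single-thread `ExecutorService` stands in for the task mailbox, which is an assumption: Flink's mailbox is its own abstraction. `MiniTaskTrigger` and its `Outcome` enum are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical model of Task.triggerCheckpointBarrier's decision logic; not Flink code.
class MiniTaskTrigger {
    enum Outcome { TRIGGERED, DECLINED, IGNORED }

    static Outcome triggerCheckpointBarrier(boolean running, ExecutorService mailbox, Runnable checkpointAction) {
        if (!running) {
            // Flink's Task sends declineCheckpoint(...) with CHECKPOINT_DECLINED_TASK_NOT_READY here.
            return Outcome.DECLINED;
        }
        try {
            mailbox.execute(checkpointAction);  // the checkpoint runs later on the task thread
            return Outcome.TRIGGERED;
        } catch (RejectedExecutionException e) {
            // Mailbox closed: the task is shutting down, so the request is simply ignored.
            return Outcome.IGNORED;
        }
    }
}
```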
triggerCheckpointAsync is overridden in both SourceStreamTask and the ordinary StreamTask; the main logic lives in StreamTask, in performCheckpoint:
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics,
boolean advanceToEndOfTime) throws Exception {
LOG.debug("Starting checkpoint ({}) {} on task {}",
checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
final long checkpointId = checkpointMetaData.getCheckpointId();
if (isRunning) {
actionExecutor.runThrowing(() -> {
if (checkpointOptions.getCheckpointType().isSynchronous()) {
setSynchronousSavepointId(checkpointId);
if (advanceToEndOfTime) {
advanceToEndOfEventTime();
}
}
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
operatorChain.prepareSnapshotPreBarrier(checkpointId);
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastCheckpointBarrier(
checkpointId,
checkpointMetaData.getTimestamp(),
checkpointOptions);
// Step (3): Take the state snapshot. This should be largely asynchronous, to not
// impact progress of the streaming topology
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
});
return true;
} else {
actionExecutor.runThrowing(() -> {
// we cannot perform our checkpoint - let the downstream operators know that they
// should not wait for any input from this operator
// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
// yet be created
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
recordWriter.broadcastEvent(message);
});
return false;
}
}
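performCheckpoint does three things: 1) prepares the checkpoint, which usually involves little or no work; 2) sends the CheckpointBarrier downstream; 3) takes the state snapshot. If the task is not running, it instead broadcasts a CancelCheckpointMarker so that downstream operators do not wait for input from it.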
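The order of the three steps matters. The barrier goes downstream before the snapshot, which may be slow, so downstream alignment is not held up by this task's snapshot. The comment in the code above also says the three steps are atomic with respect to records, watermarks and timers. The single-threaded sketch below records the order of the steps, including broadcasting the barrier to every output. `MiniOperatorChain` and its event strings are hypothetical, not Flink output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of StreamTask.performCheckpoint's three steps; not Flink code.
class MiniOperatorChain {
    final List<String> events = new ArrayList<>();
    final List<List<String>> outputs = new ArrayList<>();  // one list per downstream output

    MiniOperatorChain(int numOutputs) {
        for (int i = 0; i < numOutputs; i++) {
            outputs.add(new ArrayList<>());
        }
    }

    void prepareSnapshotPreBarrier(long checkpointId) { events.add("prepare-" + checkpointId); }

    void broadcastCheckpointBarrier(long checkpointId) {
        for (List<String> out : outputs) {
            out.add("barrier-" + checkpointId);  // every output receives the same barrier
        }
        events.add("barrier-" + checkpointId);
    }

    void checkpointState(long checkpointId) { events.add("snapshot-" + checkpointId); }

    boolean performCheckpoint(long checkpointId, boolean isRunning) {
        if (!isRunning) {
            // Tell downstream not to wait for this input.
            for (List<String> out : outputs) {
                out.add("cancel-" + checkpointId);
            }
            return false;
        }
        prepareSnapshotPreBarrier(checkpointId);   // step 1: minimal pre-barrier work
        broadcastCheckpointBarrier(checkpointId);  // step 2: emit the barrier as early as possible
        checkpointState(checkpointId);             // step 3: snapshot, largely asynchronous
        return true;
    }
}
```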
Broadcasting the barrier
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
}
Taking the snapshot
private void checkpointState(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {
// storage location and metadata of the checkpoint
CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
checkpointMetaData.getCheckpointId(),
checkpointOptions.getTargetLocation());
// wrap the checkpointing work in a CheckpointingOperation object
CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
this,
checkpointMetaData,
checkpointOptions,
storage,
checkpointMetrics);
checkpointingOperation.executeCheckpointing();
}
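The snapshot of each operator is represented as an OperatorSnapshotFutures, which holds the snapshot results for operator state and keyed state, each in a managed and a raw variant: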
public class OperatorSnapshotFutures {
@Nonnull
private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;
@Nonnull
private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;
@Nonnull
private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;
@Nonnull
private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
}
Since a StreamTask may contain several (chained) operators, it keeps an OperatorID -> OperatorSnapshotFutures map internally:
private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
The snapshot is split into a synchronous part and an asynchronous part:
public void executeCheckpointing() throws Exception {
startSyncPartNano = System.nanoTime();
try {
// synchronous part
for (StreamOperator<?> op : allOperators) {
checkpointStreamOperator(op);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
checkpointMetaData.getCheckpointId(), owner.getName());
}
startAsyncPartNano = System.nanoTime();
checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
// asynchronous part
// checkpoints can be configured to run synchronously or asynchronously;
// in synchronous mode every runnable future is already done at this point
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
owner,
operatorSnapshotsInProgress,
checkpointMetaData,
checkpointMetrics,
startAsyncPartNano);
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}. " +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
} catch (Exception ex) {
// Cleanup to release resources
for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
if (null != operatorSnapshotResult) {
try {
operatorSnapshotResult.cancel();
} catch (Exception e) {
LOG.warn("Could not properly cancel an operator snapshot result.", e);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
if (checkpointOptions.getCheckpointType().isSynchronous()) {
// in the case of a synchronous checkpoint, we always rethrow the exception,
// so that the task fails.
// this is because the intention is always to stop the job after this checkpointing
// operation, and without the failure, the task would go back to normal execution.
throw ex;
} else {
owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(), ex);
}
}
}
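In the synchronous phase, StreamOperator.snapshotState is called on each operator in turn and returns runnable futures. Depending on whether the checkpoint is configured as synchronous or asynchronous, these futures may already be done or may still be pending: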
private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
if (null != op) {
// synchronous call to the operator's snapshotState; the returned OperatorSnapshotFutures may or may not be done yet
OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions,
storageLocation);
operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
}
}
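The split between the two phases can be modeled with plain `java.util.concurrent` types. The synchronous phase collects one `RunnableFuture` per operator; if snapshots are synchronous, that future is already done. The asynchronous phase then finishes any pending futures on a separate executor and gathers the results. This is a minimal sketch: `SnapshotPhases`, the `operatorId -> RunnableFuture<String>` map and the string "handles" are assumptions for illustration, not Flink types.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical model of the two snapshot phases; not Flink's CheckpointingOperation.
class SnapshotPhases {
    // Mirrors operatorSnapshotsInProgress: operator ID -> snapshot future.
    final Map<String, RunnableFuture<String>> snapshotsInProgress = new LinkedHashMap<>();

    /** Synchronous part: runs on the task thread and only creates (or, in sync mode, runs) the futures. */
    void syncPart(Map<String, String> operatorStates, boolean asynchronousSnapshots) {
        for (Map.Entry<String, String> e : operatorStates.entrySet()) {
            String captured = e.getValue();  // state captured now, written later
            RunnableFuture<String> future = new FutureTask<>(() -> "handle(" + captured + ")");
            if (!asynchronousSnapshots) {
                future.run();  // synchronous mode: the write completes inside the sync phase
            }
            snapshotsInProgress.put(e.getKey(), future);
        }
    }

    /** Asynchronous part: finishes pending futures on another thread and collects the handles. */
    CompletableFuture<Map<String, String>> asyncPart(Executor pool) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, String> handles = new LinkedHashMap<>();
            for (Map.Entry<String, RunnableFuture<String>> e : snapshotsInProgress.entrySet()) {
                RunnableFuture<String> future = e.getValue();
                if (!future.isDone()) {
                    future.run();  // only run what the sync phase left unfinished
                }
                try {
                    handles.put(e.getKey(), future.get());  // already done, so this does not block
                } catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }
            return handles;  // what would be acknowledged to the CheckpointCoordinator
        }, pool);
    }
}
```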
The details are in AbstractStreamOperator#snapshotState:
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
CheckpointStreamFactory factory) throws Exception {
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
factory,
keyGroupRange,
getContainingTask().getCancelables());
try {
// snapshot raw state (keyed and operator) through the snapshotState(context) hook
snapshotState(snapshotContext);
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
// write the managed operator state snapshot
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
// write the managed keyed state snapshot
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
} catch (Exception snapshotException) {
try {
snapshotInProgress.cancel();
} catch (Exception e) {
snapshotException.addSuppressed(e);
}
String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
getOperatorName() + ".";
if (!getContainingTask().isCanceled()) {
LOG.info(snapshotFailMessage, snapshotException);
}
try {
snapshotContext.closeExceptionally();
} catch (IOException e) {
snapshotException.addSuppressed(e);
}
throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
}
return snapshotInProgress;
}
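State is further divided into raw state and managed state (state managed by Flink). Timers are written here as raw keyed state when the keyed state backend requires legacy synchronous timer snapshots (see the condition below), so they also end up in the snapshot.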
/**
* Stream operators with state, which want to participate in a snapshot need to override this hook method.
*
* @param context context that provides information and means required for taking a snapshot
*/
public void snapshotState(StateSnapshotContext context) throws Exception {
final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
//TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
// all timers are written as raw keyed state
if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
KeyedStateCheckpointOutputStream out;
try {
out = context.getRawKeyedOperatorStateOutput();
} catch (Exception exception) {
throw new Exception("Could not open raw keyed operator state stream for " +
getOperatorName() + '.', exception);
}
try {
KeyGroupsList allKeyGroups = out.getKeyGroupList();
for (int keyGroupIdx : allKeyGroups) {
out.startNewKeyGroup(keyGroupIdx);
timeServiceManager.snapshotStateForKeyGroup(
new DataOutputViewStreamWrapper(out), keyGroupIdx);
}
} catch (Exception exception) {
throw new Exception("Could not write timer service of " + getOperatorName() +
" to checkpoint state stream.", exception);
} finally {
try {
out.close();
} catch (Exception closeException) {
LOG.warn("Could not close raw keyed operator state stream for {}. This " +
"might have prevented deleting some state data.", getOperatorName(), closeException);
}
}
}
}
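Raw keyed state is organized by key group. For each key group in the operator's range, `startNewKeyGroup` marks where that group begins, and the group's timers are written after it. This lets each key group be located on its own on restore, possibly with a different parallelism. The sketch below shows that layout using a `DataOutputStream` and an array of offsets. `KeyGroupedTimerWriter`, the `Map<Integer, List<Long>>` of timer timestamps per key group, and the count-then-longs encoding are assumptions; Flink's real timer serialization is not reproduced here.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a per-key-group raw state layout; not Flink's timer format.
class KeyGroupedTimerWriter {
    /**
     * Writes the timers of key groups [startKeyGroup, endKeyGroup] into {@code bytes} and returns,
     * for each key group, its start offset relative to the beginning of this write.
     */
    static long[] write(int startKeyGroup, int endKeyGroup,
                        Map<Integer, List<Long>> timersByKeyGroup, ByteArrayOutputStream bytes) {
        DataOutputStream out = new DataOutputStream(bytes);
        long[] offsets = new long[endKeyGroup - startKeyGroup + 1];
        try {
            for (int keyGroup = startKeyGroup; keyGroup <= endKeyGroup; keyGroup++) {
                // Counterpart of out.startNewKeyGroup(keyGroupIdx): remember where this group begins.
                offsets[keyGroup - startKeyGroup] = out.size();
                List<Long> timers = timersByKeyGroup.getOrDefault(keyGroup, Collections.emptyList());
                out.writeInt(timers.size());
                for (long timestamp : timers) {
                    out.writeLong(timestamp);
                }
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }
}
```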
Besides the timer handling in AbstractStreamOperator.snapshotState, its subclass AbstractUdfStreamOperator overrides the method to also snapshot the user function:
public void snapshotState(StateSnapshotContext context) throws Exception {
// call the parent method first, which writes the timers
super.snapshotState(context);
StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
}
public static void snapshotFunctionState(
StateSnapshotContext context,
OperatorStateBackend backend,
Function userFunction) throws Exception {
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(backend);
while (true) {
if (trySnapshotFunctionState(context, backend, userFunction)) {
break;
}
// inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
if (userFunction instanceof WrappingFunction) {
userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
} else {
break;
}
}
}
private static boolean trySnapshotFunctionState(
StateSnapshotContext context,
OperatorStateBackend backend,
Function userFunction) throws Exception {
// if the user function implements CheckpointedFunction, call its snapshotState method
if (userFunction instanceof CheckpointedFunction) {
((CheckpointedFunction) userFunction).snapshotState(context);
return true;
}
// if the user function implements ListCheckpointed
if (userFunction instanceof ListCheckpointed) {
// first call snapshotState to obtain the current state
@SuppressWarnings("unchecked")
List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
// get a reference to the list state in the state backend
ListState<Serializable> listState = backend.
getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
// clear it
listState.clear();
// write the current state into the state backend
if (null != partitionableState) {
try {
for (Serializable statePartition : partitionableState) {
listState.add(statePartition);
}
} catch (Exception e) {
listState.clear();
throw new Exception("Could not write partitionable state to operator " +
"state backend.", e);
}
}
return true;
}
return false;
}
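snapshotFunctionState and trySnapshotFunctionState reduce to a simple loop. Try the current function; if it implements neither checkpointing interface but is a wrapper, unwrap it and try again. On the ListCheckpointed path, the returned list replaces the contents of a default operator list state. The sketch below reproduces that control flow. The hypothetical mini-interfaces `Fn`, `Wrapping`, `Checkpointed` and `ListCheckpointedFn` stand in for Flink's `Function`, `WrappingFunction`, `CheckpointedFunction` and `ListCheckpointed`. The plain `List<Object>` standing in for the operator's default list state is also an assumption.

```java
import java.util.List;

// Hypothetical stand-ins for Flink's Function / WrappingFunction / CheckpointedFunction / ListCheckpointed.
interface Fn {}
interface Wrapping extends Fn { Fn wrapped(); }
interface Checkpointed extends Fn { void snapshotState(long checkpointId); }
interface ListCheckpointedFn extends Fn { List<Object> snapshotState(long checkpointId, long timestamp); }

class FunctionSnapshotter {
    /** Unwraps nested wrappers until one function takes part in the snapshot, or none is left. */
    static void snapshotFunctionState(Fn userFunction, long checkpointId, long timestamp, List<Object> defaultListState) {
        while (true) {
            if (trySnapshot(userFunction, checkpointId, timestamp, defaultListState)) {
                break;
            }
            if (userFunction instanceof Wrapping) {
                userFunction = ((Wrapping) userFunction).wrapped();
            } else {
                break;  // stateless function: nothing to snapshot
            }
        }
    }

    private static boolean trySnapshot(Fn fn, long checkpointId, long timestamp, List<Object> listState) {
        if (fn instanceof Checkpointed) {
            ((Checkpointed) fn).snapshotState(checkpointId);  // the function writes its own state
            return true;
        }
        if (fn instanceof ListCheckpointedFn) {
            List<Object> partitions = ((ListCheckpointedFn) fn).snapshotState(checkpointId, timestamp);
            listState.clear();  // replace, never append to, the previous checkpoint's contents
            if (partitions != null) {
                listState.addAll(partitions);
            }
            return true;
        }
        return false;
    }
}
```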
We have now seen how the checkpoint process reaches the snapshot methods we implement ourselves. Next, let us look at how Flink-managed state is written into the snapshot.
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
if (null != keyedStateBackend) {
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
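Let us start with operator state. DefaultOperatorStateBackend delegates the actual work to DefaultOperatorStateBackendSnapshotStrategy. First it makes a deep copy of every currently registered operator state (both list states and broadcast states). It then wraps the actual write in an asynchronous FutureTask whose main job is to 1) open the output stream, 2) write the state metadata, 3) write the state, and 4) close the output stream and obtain the state handle. If asynchronous checkpointing is not enabled, this FutureTask is run immediately during the synchronous phase.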
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
final long checkpointId,
final long timestamp,
@Nonnull final CheckpointStreamFactory streamFactory,
@Nonnull final CheckpointOptions checkpointOptions) throws IOException {
if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
return DoneFuture.of(SnapshotResult.empty());
}
final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
new HashMap<>(registeredOperatorStates.size());
final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
new HashMap<>(registeredBroadcastStates.size());
ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(userClassLoader);
try {
// eagerly create deep copies of the list and the broadcast states (if any)
// in the synchronous phase, so that we can use them in the async writing.
// take deep copies of all registered list states and broadcast states
if (!registeredOperatorStates.isEmpty()) {
for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
PartitionableListState<?> listState = entry.getValue();
if (null != listState) {
listState = listState.deepCopy();
}
registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
}
}
if (!registeredBroadcastStates.isEmpty()) {
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
if (null != broadcastState) {
broadcastState = broadcastState.deepCopy();
}
registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
}
}
} finally {
Thread.currentThread().setContextClassLoader(snapshotClassLoader);
}
// wrap the main write logic in an asynchronous FutureTask
AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {
@Override
protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
// create the state output stream
CheckpointStreamFactory.CheckpointStateOutputStream localOut =
streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
snapshotCloseableRegistry.registerCloseable(localOut);
// collect the metadata
// get the registered operator state infos ...
List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
new ArrayList<>(registeredOperatorStatesDeepCopies.size());
for (Map.Entry<String, PartitionableListState<?>> entry :
registeredOperatorStatesDeepCopies.entrySet()) {
operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}
// collect the broadcast state metadata, then write all metadata to the stream
// ... get the registered broadcast operator state infos ...
List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
new ArrayList<>(registeredBroadcastStatesDeepCopies.size());
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
registeredBroadcastStatesDeepCopies.entrySet()) {
broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}
// ... write them all in the checkpoint stream ...
DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
OperatorBackendSerializationProxy backendSerializationProxy =
new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
backendSerializationProxy.write(dov);
// ... and then go for the states ...
// write the state
// we put BOTH normal and broadcast state metadata here
int initialMapCapacity =
registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
new HashMap<>(initialMapCapacity);
for (Map.Entry<String, PartitionableListState<?>> entry :
registeredOperatorStatesDeepCopies.entrySet()) {
PartitionableListState<?> value = entry.getValue();
long[] partitionOffsets = value.write(localOut);
OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),
new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
}
// ... and the broadcast states themselves ...
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
registeredBroadcastStatesDeepCopies.entrySet()) {
BackendWritableBroadcastState<?, ?> value = entry.getValue();
long[] partitionOffsets = {value.write(localOut)};
OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),
new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
}
// ... and, finally, create the state handle.
OperatorStateHandle retValue = null;
if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
// close the output stream and get the state handle, which can later be used to read the state back
StreamStateHandle stateHandle = localOut.closeAndGetHandle();
if (stateHandle != null) {
retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
}
return SnapshotResult.of(retValue);
} else {
throw new IOException("Stream was already unregistered.");
}
}
@Override
protected void cleanupProvidedResources() {
// nothing to do
}
@Override
protected void logAsyncSnapshotComplete(long startTime) {
if (asynchronousSnapshots) {
logAsyncCompleted(streamFactory, startTime);
}
}
};
final FutureTask<SnapshotResult<OperatorStateHandle>> task =
snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);
// if checkpoints are not asynchronous, run the FutureTask right here, i.e. the state is written during the synchronous phase
if (!asynchronousSnapshots) {
task.run();
}
return task;
}
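The key property of this snapshot is that the deep copy happens in the synchronous phase, and only the copy is written asynchronously. The task can therefore keep changing its live state while the write runs. The sketch below shows that pattern, with two assumptions. First, list states hold `String` elements, so copying a list also copies its contents (strings are immutable). Second, the byte layout is made up: each element is written with `writeUTF`, and per-element offsets are recorded, loosely analogous to the `partitionOffsets` returned by `PartitionableListState.write`. `MiniOperatorStateBackend` is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical operator state backend: copy synchronously, write asynchronously.
class MiniOperatorStateBackend {
    final Map<String, List<String>> registeredListStates = new LinkedHashMap<>();
    final boolean asynchronousSnapshots;

    MiniOperatorStateBackend(boolean asynchronousSnapshots) {
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    /** Returns state name -> element offsets; the serialized bytes go to {@code target}. */
    RunnableFuture<Map<String, long[]>> snapshot(ByteArrayOutputStream target) {
        // Synchronous phase: copy every registered list so later updates cannot leak into the snapshot.
        // Strings are immutable, so copying the list is a full deep copy here.
        Map<String, List<String>> copies = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : registeredListStates.entrySet()) {
            copies.put(e.getKey(), new ArrayList<>(e.getValue()));
        }
        FutureTask<Map<String, long[]>> task = new FutureTask<>(() -> {
            DataOutputStream out = new DataOutputStream(target);
            Map<String, long[]> offsetsByState = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> e : copies.entrySet()) {
                long[] offsets = new long[e.getValue().size()];
                int i = 0;
                for (String element : e.getValue()) {
                    offsets[i++] = out.size();  // where this element (partition) starts
                    out.writeUTF(element);
                }
                offsetsByState.put(e.getKey(), offsets);
            }
            out.flush();
            return offsetsByState;
        });
        if (!asynchronousSnapshots) {
            task.run();  // synchronous mode: the write happens during the sync phase
        }
        return task;
    }
}
```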
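The basic flow for writing keyed state is similar to the operator state flow. Keyed state, however, has several storage implementations, heap-based and RocksDB-based, and the RocksDB implementation additionally supports incremental checkpoints, so it is more complex than operator state.
That covers the first phase of the snapshot, the synchronous one. The asynchronous phase is encapsulated in AsyncCheckpointRunnable, which mainly 1) runs the FutureTasks created in the synchronous phase and 2) sends an ack to the CheckpointCoordinator once they complete.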
protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
@Override
public void run() {
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
new TaskStateSnapshot(operatorSnapshotsInProgress.size());
TaskStateSnapshot localTaskOperatorSubtaskStates =
new TaskStateSnapshot(operatorSnapshotsInProgress.size());
// complete the state write of every operator
// for a synchronous checkpoint the state has already been written before this point
// for an asynchronous checkpoint the state is only written here
for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
OperatorID operatorID = entry.getKey();
OperatorSnapshotFutures snapshotInProgress = entry.getValue();
// finalize the async part of all by executing all snapshot runnables
OperatorSnapshotFinalizer finalizedSnapshots =
new OperatorSnapshotFinalizer(snapshotInProgress);
jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getJobManagerOwnedState());
localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getTaskLocalState());
}
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
// report that the snapshot is complete
reportCompletedSnapshotStates(
jobManagerTaskOperatorSubtaskStates,
localTaskOperatorSubtaskStates,
asyncDurationMillis);
} else {
LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
owner.getName(),
checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
handleExecutionException(e);
} finally {
owner.cancelables.unregisterCloseable(this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
}
private void reportCompletedSnapshotStates(
TaskStateSnapshot acknowledgedTaskStateSnapshot,
TaskStateSnapshot localTaskStateSnapshot,
long asyncDurationMillis) {
TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
boolean hasLocalState = localTaskStateSnapshot.hasState();
// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
// to stateless tasks on restore. This enables simple job modifications that only concern
// stateless without the need to assign them uids to match their (always empty) states.
taskStateManager.reportTaskStateSnapshots(
checkpointMetaData,
checkpointMetrics,
hasAckState ? acknowledgedTaskStateSnapshot : null,
hasLocalState ? localTaskStateSnapshot : null);
}
}
public class TaskStateManagerImpl implements TaskStateManager {
@Override
public void reportTaskStateSnapshots(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nonnull CheckpointMetrics checkpointMetrics,
@Nullable TaskStateSnapshot acknowledgedState,
@Nullable TaskStateSnapshot localState) {
long checkpointId = checkpointMetaData.getCheckpointId();
localStateStore.storeLocalState(checkpointId, localState);
// send the ack to the CheckpointCoordinator
checkpointResponder.acknowledgeCheckpoint(
jobId,
executionAttemptID,
checkpointId,
checkpointMetrics,
acknowledgedState);
}
}
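The `compareAndSet` on `asyncCheckpointState` settles a race between normal completion and `close()`, for example when the task is cancelled. Only the side that moves the state out of RUNNING gets to act, so a closed checkpoint is never acknowledged. The sketch below isolates that guard. `MiniAsyncCheckpoint` is hypothetical. In its `State` enum, RUNNING and COMPLETED appear in the code above, while DISCARDED is an assumption about the `close()` path. The `Consumer<Long>` stands in for `reportCompletedSnapshotStates`.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical reduction of AsyncCheckpointRunnable's completion-versus-close race.
class MiniAsyncCheckpoint implements Runnable, AutoCloseable {
    enum State { RUNNING, DISCARDED, COMPLETED }

    final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final long checkpointId;
    private final Consumer<Long> acknowledge;  // stands in for the ack sent to the coordinator
    volatile boolean cleanedUp;

    MiniAsyncCheckpoint(long checkpointId, Consumer<Long> acknowledge) {
        this.checkpointId = checkpointId;
        this.acknowledge = acknowledge;
    }

    @Override
    public void run() {
        // ... finalize every operator's snapshot futures here ...
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            acknowledge.accept(checkpointId);  // only the winner of the CAS reports
        }
        // otherwise close() already discarded this checkpoint, so nothing is acknowledged
    }

    @Override
    public void close() {
        if (state.compareAndSet(State.RUNNING, State.DISCARDED)) {
            cleanedUp = true;  // a real implementation would release partially written state
        }
    }
}
```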
Acknowledging the checkpoint
A Task responds to a checkpoint through the CheckpointResponder interface:
public interface CheckpointResponder {
/**
* Acknowledges the given checkpoint.
*/
void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState);
/**
* Declines the given checkpoint.
*/
void declineCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
Throwable cause);
}
RpcCheckpointResponder, the concrete implementation of CheckpointResponder, notifies the CheckpointCoordinatorGateway, i.e. the JobMaster, via RPC. The JobMaster then calls CheckpointCoordinator.receiveAcknowledgeMessage() or CheckpointCoordinator.receiveDeclineMessage() to handle the message.
Completing the checkpoint
After a Task finishes its part of the checkpoint, the CheckpointCoordinator receives its ack. The ack is handled as follows:
- Look up the PendingCheckpoint for the ack's checkpointID in Map<Long, PendingCheckpoint> pendingCheckpoints.
- If a matching PendingCheckpoint exists:
  - If it has not been discarded, call PendingCheckpoint.acknowledgeTask to process the ack. Depending on the result:
    - SUCCESS: check whether all required acks have been received; if so, call completePendingCheckpoint to complete this checkpoint
    - DUPLICATE: the ack was received more than once; ignore it
    - UNKNOWN: the ack comes from an unknown execution attempt; discard the state handles it carries
    - DISCARDED: the checkpoint has already been discarded; discard the state handles carried by the ack
  - If it has been discarded but not yet removed, throw an exception
- If no matching PendingCheckpoint exists, discard the state handles carried by the ack
The corresponding code:
class CheckpointCoordinator {
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
if (shutdown || message == null) {
return false;
}
if (!job.equals(message.getJob())) {
LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
return false;
}
final long checkpointId = message.getCheckpointId();
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
case SUCCESS:
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
break;
case DUPLICATE:
LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
break;
case UNKNOWN:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
"because the task's execution attempt id was unknown. Discarding " +
"the state handle to avoid lingering state.", message.getCheckpointId(),
message.getTaskExecutionId(), message.getJob());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
break;
case DISCARDED:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
"because the pending checkpoint had been discarded. Discarding the " +
"state handle tp avoid lingering state.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}
return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint " + checkpointId);
}
else {
boolean wasPendingCheckpoint;
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
wasPendingCheckpoint = true;
LOG.warn("Received late message for now expired checkpoint attempt {} from " +
"{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
}
else {
LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
wasPendingCheckpoint = false;
}
// try to discard the state so that we don't have lingering state lying around
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
return wasPendingCheckpoint;
}
}
}
}
How does a checkpoint that has been triggered but not yet completed, i.e. a PendingCheckpoint, handle ack messages? PendingCheckpoint maintains two maps internally:
- Map<OperatorID, OperatorState> operatorStates: the state handles of the operators whose acks have been received
- Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks: the tasks whose ack is required but has not arrived yet
Each time an ack arrives, PendingCheckpoint removes the corresponding task from notYetAcknowledgedTasks and stores the state handles carried by the ack. Once notYetAcknowledgedTasks is empty, all acks have been received.
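The bookkeeping described above can be sketched with the two maps and the four outcomes handled in the coordinator's switch statement. `MiniPendingCheckpoint` is hypothetical. It uses `String` for ExecutionAttemptID, OperatorID and state handles, and it simplifies to one operator per task, whereas a real task may run a chain of several operators. It also keeps an extra set of already acknowledged tasks, so that a duplicate ack can be told apart from an unknown one.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical reduction of PendingCheckpoint's ack handling; not Flink code.
class MiniPendingCheckpoint {
    enum TaskAcknowledgeResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    final Map<String, String> notYetAcknowledgedTasks;          // attempt ID -> operator ID
    final Set<String> acknowledgedTasks = new HashSet<>();
    final Map<String, String> operatorStates = new HashMap<>();  // operator ID -> state handle
    boolean discarded;

    MiniPendingCheckpoint(Map<String, String> tasksToAcknowledge) {
        this.notYetAcknowledgedTasks = new HashMap<>(tasksToAcknowledge);
    }

    TaskAcknowledgeResult acknowledgeTask(String attemptId, String stateHandle) {
        if (discarded) {
            return TaskAcknowledgeResult.DISCARDED;
        }
        String operatorId = notYetAcknowledgedTasks.remove(attemptId);
        if (operatorId == null) {
            // Either acknowledged before, or never expected by this checkpoint.
            return acknowledgedTasks.contains(attemptId)
                    ? TaskAcknowledgeResult.DUPLICATE
                    : TaskAcknowledgeResult.UNKNOWN;
        }
        acknowledgedTasks.add(attemptId);
        if (stateHandle != null) {  // stateless tasks report null state
            operatorStates.put(operatorId, stateHandle);
        }
        return TaskAcknowledgeResult.SUCCESS;
    }

    boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledgedTasks.isEmpty();
    }
}
```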
Once the PendingCheckpoint has confirmed that all acks were received, the checkpoint can be completed. This involves:
- Calling PendingCheckpoint.finalizeCheckpoint() to turn the PendingCheckpoint into a CompletedCheckpoint:
  - obtain a CheckpointMetadataOutputStream and write all state handle information through it to the storage system
  - create a CompletedCheckpoint object
- Saving the CompletedCheckpoint in the CompletedCheckpointStore:
  - CompletedCheckpointStore has two implementations, StandaloneCompletedCheckpointStore and ZooKeeperCompletedCheckpointStore
  - StandaloneCompletedCheckpointStore simply keeps the CompletedCheckpoints in an in-memory array-based collection
  - ZooKeeperCompletedCheckpointStore provides a highly available implementation: it first writes the CompletedCheckpoint through a RetrievableStateStorageHelper (usually to a file system) and then stores the resulting handle in ZooKeeper
  - only a limited number of CompletedCheckpoints is retained; older ones are deleted
- Removing the PendingCheckpoints that have been overtaken: since checkpoint IDs increase monotonically, every PendingCheckpoint with a smaller ID than the one just completed can be discarded
- Calling Execution.notifyCheckpointComplete() on each execution to tell every Task that the checkpoint is complete:
  - this reaches the corresponding Task through an RPC call to TaskExecutor.confirmCheckpoint()
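Two parts of the completion step are worth making concrete. First, only a bounded number of completed checkpoints is retained. Second, every pending checkpoint with a smaller ID than the one just completed is dropped, because IDs increase monotonically. The sketch below shows both, assuming an in-memory `ArrayDeque` as the store and a `maxRetained` parameter. `MiniCompletion` and its fields are hypothetical, and checkpoints are represented only by their IDs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Hypothetical sketch of checkpoint completion bookkeeping; not Flink code.
class MiniCompletion {
    final Deque<Long> completedCheckpoints = new ArrayDeque<>();      // oldest first
    final TreeMap<Long, String> pendingCheckpoints = new TreeMap<>();  // checkpoint ID -> description
    private final int maxRetained;

    MiniCompletion(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void completePendingCheckpoint(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
        // Store the completed checkpoint and evict the oldest ones beyond the limit.
        completedCheckpoints.addLast(checkpointId);
        while (completedCheckpoints.size() > maxRetained) {
            completedCheckpoints.removeFirst();  // a real store would also delete its files
        }
        // IDs increase monotonically, so every older pending checkpoint has been overtaken.
        pendingCheckpoints.headMap(checkpointId).clear();
        // Next step (not modeled): notify every task that checkpointId is complete.
    }
}
```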
After receiving the notifyCheckpointComplete confirmation, a Task performs any follow-up work, for example the commit phase of the Kafka producer's two-phase commit.
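Here is a hypothetical two-phase-commit sink that shows why the completion notification matters. Records go into an open transaction. The snapshot pre-commits that transaction under the checkpoint ID. Only the completion notification makes the transaction visible. This mirrors the idea behind Flink's TwoPhaseCommitSinkFunction, which the Kafka producer builds on for exactly-once delivery. However, `MiniTwoPhaseSink`, its list-of-strings "transactions" and the `committed` list are assumptions, not the real API. The sink commits every pending transaction up to and including the notified ID, so nothing is stranded if an earlier notification never arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical two-phase-commit sink driven by checkpoint callbacks; not Flink's API.
class MiniTwoPhaseSink {
    private List<String> currentTransaction = new ArrayList<>();
    // Pre-committed transactions waiting for their checkpoint to complete, by checkpoint ID.
    final TreeMap<Long, List<String>> pendingCommits = new TreeMap<>();
    final List<String> committed = new ArrayList<>();  // what an external reader can see

    void invoke(String record) {
        currentTransaction.add(record);
    }

    /** Called during the snapshot: pre-commit the open transaction and start a new one. */
    void snapshotState(long checkpointId) {
        pendingCommits.put(checkpointId, currentTransaction);
        currentTransaction = new ArrayList<>();
    }

    /** Called on notifyCheckpointComplete: commit every transaction up to this checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> ready = pendingCommits.headMap(checkpointId, true);
        for (List<String> transaction : ready.values()) {
            committed.addAll(transaction);
        }
        ready.clear();  // removes the committed entries from pendingCommits
    }
}
```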
Summary
This article analyzed how a checkpoint takes its snapshot: broadcasting the barrier, taking the snapshot, and the ack process that completes the checkpoint.