The article Flink Source Code Reading (4) --- Checkpoint Creation explained how checkpoints are created. Building on it, this article explains how a job recovers from a checkpoint/savepoint. The discussion is based on Flink 1.9.
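1. Overview
Broadly, a job restores from a checkpoint / savepoint in two situations:
- the job is restarted manually and restores from a savepoint
- a task fails while the job is running, and the job restores from a checkpoint
A savepoint is simply a checkpoint triggered manually by the user, so restoring from a checkpoint and restoring from a savepoint work the same way. Below, we use the most common case in practice, recovering the job after one of its tasks fails, as the example.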
2. State Assignment
First, here are the states a Task can be in, as defined in the ExecutionState.java enum:
CREATED,
SCHEDULED,
DEPLOYING,
RUNNING,
/**
* This state marks "successfully completed". It can only be reached when a
* program reaches the "end of its input". The "end of input" can be reached
* when consuming a bounded input (fix set of files, bounded query, etc) or
* when stopping a program (not cancelling!) which make the input look like
* it reached its end at a specific point.
*/
FINISHED,
CANCELING,
CANCELED,
FAILED,
RECONCILING;
The transitions between these states are:
*     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
*        |            |            |           |
*        |            |            |    +------+
*        |            |            V    V
*        |            |         CANCELLING -----+----> CANCELED
*        |            |                         |
*        |            +-------------------------+
*        |
*        |                                 ... -> FAILED
*        V
*   RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
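To make the diagram easier to check, here is a minimal, hypothetical Java model of it. TaskStateMachine and canTransition are illustrative names, not Flink classes, and the "... -> FAILED" arrow is read as "any non-terminal state may fail". The sketch only encodes the arrows drawn above; it is not taken from Flink's source.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Hypothetical model of the ExecutionState diagram (not a Flink class).
final class TaskStateMachine {

    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED, RECONCILING }

    // Allowed successor states for every state, as drawn in the diagram.
    private static final Map<State, EnumSet<State>> NEXT = new EnumMap<>(State.class);

    static {
        NEXT.put(State.CREATED, EnumSet.of(State.SCHEDULED, State.RECONCILING));
        NEXT.put(State.SCHEDULED, EnumSet.of(State.DEPLOYING, State.CANCELED));
        NEXT.put(State.DEPLOYING, EnumSet.of(State.RUNNING, State.CANCELING));
        NEXT.put(State.RUNNING, EnumSet.of(State.FINISHED, State.CANCELING));
        NEXT.put(State.CANCELING, EnumSet.of(State.CANCELED));
        NEXT.put(State.RECONCILING, EnumSet.of(State.RUNNING, State.FINISHED, State.CANCELED));
        // Terminal states have no outgoing transitions.
        NEXT.put(State.FINISHED, EnumSet.noneOf(State.class));
        NEXT.put(State.CANCELED, EnumSet.noneOf(State.class));
        NEXT.put(State.FAILED, EnumSet.noneOf(State.class));
        // Assumed reading of "... -> FAILED": every non-terminal state may fail.
        for (State s : State.values()) {
            if (!isTerminal(s)) {
                NEXT.get(s).add(State.FAILED);
            }
        }
    }

    static boolean isTerminal(State s) {
        return s == State.FINISHED || s == State.CANCELED || s == State.FAILED;
    }

    static boolean canTransition(State from, State to) {
        return NEXT.get(from).contains(to);
    }
}
```

For example, canTransition(RUNNING, FAILED) is true, while canTransition(FINISHED, RUNNING) is false because FINISHED is terminal.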
A Task changes state through the call chain
Execution#transitionState --> vertex.notifyStateTransition --> getExecutionGraph().notifyExecutionChange
If a task becomes FAILED, the following chain is invoked:
failoverStrategy.onTaskFailure --> AdaptedRestartPipelinedRegionStrategyNG#onTaskFailure --> restartTasks --> resetAndRescheduleTasks --> createResetAndRescheduleTasksCallback
The argument of restartTasks is the set of all tasks in the failed task's pipelined region that need to be restarted.
Let us look at what createResetAndRescheduleTasksCallback does. The relevant part of the source:
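A pipelined region is, roughly, a set of tasks connected by pipelined (streaming) data exchanges; blocking exchanges cut the job graph into separate regions. The sketch below shows one way the failed task's region could be found, assuming plain integer vertex ids and an edge list. PipelinedRegions, Edge and regionOf are hypothetical names, and the real strategy may restart further regions as well (for example downstream consumers of the failed region), which this sketch ignores.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model: a region is a connected component when only
// PIPELINED edges are followed (in either direction); BLOCKING edges are ignored.
final class PipelinedRegions {

    static final class Edge {
        final int from;
        final int to;
        final boolean pipelined;

        Edge(int from, int to, boolean pipelined) {
            this.from = from;
            this.to = to;
            this.pipelined = pipelined;
        }
    }

    /** Returns the ids of all vertices in the same pipelined region as failedVertex. */
    static Set<Integer> regionOf(int numVertices, List<Edge> edges, int failedVertex) {
        // Undirected adjacency lists built from pipelined edges only.
        List<List<Integer>> adj = new ArrayList<>();
        for (int i = 0; i < numVertices; i++) {
            adj.add(new ArrayList<>());
        }
        for (Edge e : edges) {
            if (e.pipelined) {
                adj.get(e.from).add(e.to);
                adj.get(e.to).add(e.from);
            }
        }
        // Breadth-first search starting at the failed vertex.
        Set<Integer> region = new TreeSet<>();
        Deque<Integer> queue = new ArrayDeque<>();
        region.add(failedVertex);
        queue.add(failedVertex);
        while (!queue.isEmpty()) {
            int v = queue.poll();
            for (int w : adj.get(v)) {
                if (region.add(w)) {
                    queue.add(w);
                }
            }
        }
        return region;
    }
}
```

With edges 0-1 (pipelined), 1-2 (blocking) and 2-3 (pipelined), a failure of vertex 0 restarts {0, 1} only, while a failure of vertex 3 restarts {2, 3}.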
LOG.info("Finally restart {} tasks to recover from task failure.", unmodifiedVertices.size());
// reset tasks to CREATED state and reload state
resetTasks(unmodifiedVertices, globalModVersion);
// re-schedule tasks
rescheduleTasks(unmodifiedVertices, globalModVersion);
It does two things: it resets the tasks (which includes assigning their state) and then reschedules them. We start with the reset.
2.1 Resetting Tasks
Step 1: reset the Execution of every vertex
for (ExecutionVertex ev : vertices) {
CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
if (cgroup != null && !colGroups.contains(cgroup)){
cgroup.resetConstraints();
colGroups.add(cgroup);
}
ev.resetForNewExecution(restartTimestamp, globalModVersion);
}
Step 2: abort every in-progress checkpoint held in the pendingCheckpoints map
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
Step 3: restore state from the latest completed checkpoint
executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
involvedExecutionJobVertices, false, true);
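Step 3 is where recovery from a checkpoint actually happens, so let us look at it in detail.
2.1.1 Find the latest completed checkpoint (latestCheckpoint)
- If latestCheckpoint == null:
  - if the errorIfNoCheckpoint flag is true, an IllegalStateException is thrown;
  - if errorIfNoCheckpoint is false, the method simply returns without restoring any state.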
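The branch above boils down to a few lines. A hypothetical sketch: LatestCheckpointRestore and restoreLatest are illustrative names, and an Optional<Long> checkpoint id stands in for the real completed-checkpoint object.

```java
import java.util.Optional;

// Hypothetical sketch of the "no completed checkpoint" branch (not Flink's API).
final class LatestCheckpointRestore {

    /**
     * Returns true if state would be restored from the given checkpoint, false if there
     * is nothing to restore and errorIfNoCheckpoint is false.
     */
    static boolean restoreLatest(Optional<Long> latestCheckpointId, boolean errorIfNoCheckpoint) {
        if (!latestCheckpointId.isPresent()) {
            if (errorIfNoCheckpoint) {
                throw new IllegalStateException("No completed checkpoint available");
            }
            return false; // caller continues without any restored state
        }
        // A real implementation would now assign the checkpoint's states to the tasks (2.1.2).
        return true;
    }
}
```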
2.1.2 Assign states to the tasks with stateAssignmentOperation.assignStates(), which mainly does the following:
1. For every operatorState in the checkpoint, check that the new tasks contain an operator with the same operatorID. If an operatorID from operatorStates is missing from the new tasks: (i) if allowNonRestoredState == true, that operatorID is skipped; (ii) if allowNonRestoredState == false, an IllegalStateException is thrown.
checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);
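2. Iterate over all tasks.
(1) For each operator of a task: (i) if the checkpoint contains state for it, that state is recorded in the operatorStates list; (ii) if it does not, a new OperatorState is initialized for that operatorID and recorded in the list; (iii) if the parallelism has changed (scale-in or scale-out), the state is redistributed; see the article on state rescaling for details.
The state assigned to each task is finally wrapped in a JobManagerTaskRestore and attached to its Execution via Execution.setInitialState(). The JobManagerTaskRestore is then shipped to the TaskExecutor as a field of the TaskDeploymentDescriptor. State redistribution on rescaling, in brief:
- Operator State: the state is stored by implementing the ListCheckpointed interface. Because it is exposed as a list of elements, it can easily be redistributed according to the new parallelism. Users can also override the restore logic.
- Keyed State: Flink introduces the concept of key groups and uses the key group as the basic unit of Keyed State assignment. When the parallelism changes, Flink recomputes how key groups are assigned and hands them to the operator subtasks accordingly.
(iv) In addition, while redistributing state, Flink checks the parallelism of the newly submitted tasks against the MaxParallelism recorded in the previous operatorStates; see StateAssignmentOperation#checkParallelismPreconditions:
  1. If the task's parallelism is greater than the max parallelism of the operatorState in the checkpoint, an exception is thrown.
  2. If the task's max parallelism != the operatorState's max parallelism:
    2.1 if the task's max parallelism was not configured explicitly, it is set to the operatorState's max parallelism;
    2.2 if it was configured explicitly, an exception is thrown.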
if (operatorState.getMaxParallelism() < executionJobVertex.getParallelism()) {
throw new IllegalStateException(
"The state for task "
+ executionJobVertex.getJobVertexId()
+ " can not be restored. The maximum parallelism ("
+ operatorState.getMaxParallelism()
+ ") of the restored state is lower than the configured parallelism ("
+ executionJobVertex.getParallelism()
+ "). Please reduce the parallelism of the task to be lower or equal to the maximum parallelism.");
}
// check that the number of key groups have not changed or if we need to override it to
// satisfy the restored state
if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
if (!executionJobVertex.isMaxParallelismConfigured()) {
// if the max parallelism was not explicitly specified by the user, we derive it
// from the state
LOG.debug(
"Overriding maximum parallelism for JobVertex {} from {} to {}",
executionJobVertex.getJobVertexId(),
executionJobVertex.getMaxParallelism(),
operatorState.getMaxParallelism());
executionJobVertex.setMaxParallelism(operatorState.getMaxParallelism());
} else {
// if the max parallelism was explicitly specified, we complain on mismatch
throw new IllegalStateException(
"The maximum parallelism ("
+ operatorState.getMaxParallelism()
+ ") with which the latest "
+ "checkpoint of the execution job vertex "
+ executionJobVertex
+ " has been taken and the current maximum parallelism ("
+ executionJobVertex.getMaxParallelism()
+ ") changed. This "
+ "is currently not supported.");
}
}
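Both validation rules of 2.1.2, the operator-ID completeness check of step 1 and the parallelism preconditions above, can be modeled as small pure functions. This is a simplified sketch assuming operator IDs are plain strings and parallelisms plain ints; StateAssignmentChecks and its methods are hypothetical names, and the exception messages are not Flink's.

```java
import java.util.Set;

// Hypothetical model of the two checks described in 2.1.2 (not Flink's classes).
final class StateAssignmentChecks {

    /** Step 1: every checkpointed operator ID must exist in the new job unless allowNonRestoredState. */
    static void checkMappingCompleteness(Set<String> checkpointedOperatorIds,
                                         Set<String> newJobOperatorIds,
                                         boolean allowNonRestoredState) {
        for (String id : checkpointedOperatorIds) {
            if (!newJobOperatorIds.contains(id) && !allowNonRestoredState) {
                throw new IllegalStateException("no operator in the new job for checkpointed state " + id);
            }
            // With allowNonRestoredState == true, orphaned state is simply skipped.
        }
    }

    /**
     * Parallelism preconditions: returns the max parallelism the vertex should use
     * afterwards, or throws if the restored state cannot be mapped onto the vertex.
     */
    static int checkParallelism(int stateMaxParallelism, int parallelism,
                                int vertexMaxParallelism, boolean maxParallelismConfigured) {
        if (stateMaxParallelism < parallelism) {
            throw new IllegalStateException("parallelism " + parallelism
                + " exceeds restored max parallelism " + stateMaxParallelism);
        }
        if (stateMaxParallelism != vertexMaxParallelism) {
            if (!maxParallelismConfigured) {
                return stateMaxParallelism; // derive the max parallelism from the state
            }
            throw new IllegalStateException("max parallelism changed from "
                + stateMaxParallelism + " to " + vertexMaxParallelism);
        }
        return vertexMaxParallelism;
    }
}
```

For example, restoring state taken with max parallelism 128 into a vertex with parallelism 4 and an unconfigured default max parallelism of 256 yields 128; the same restore with 256 configured explicitly fails.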
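How is the max parallelism of an operatorState determined? It equals the max parallelism of the ExecutionJobVertex that contains the operator, which is set in the ExecutionJobVertex constructor:
1. If the task has an explicitly configured max parallelism, that value is used.
2. Otherwise it is derived from the operator parallelism by KeyGroupRangeAssignment#computeDefaultMaxParallelism: min(max(roundUpToPowerOfTwo(parallelism + parallelism / 2), 2^7), 2^15). In other words, about 1.5 × the parallelism, rounded up to the next power of two and clamped to the range [128, 32768].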
public static int computeDefaultMaxParallelism(int operatorParallelism) {
checkParallelismPreconditions(operatorParallelism);
return Math.min(
Math.max(
MathUtils.roundUpToPowerOfTwo(
operatorParallelism + (operatorParallelism / 2)),
DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
UPPER_BOUND_MAX_PARALLELISM);
}
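The formula can be reproduced with the standard library alone. The sketch below re-implements it together with the key-group range computation, which also shows why the max parallelism must not change: a key is mapped to a key group by hashing modulo the max parallelism, so the max parallelism is the number of key groups, and on rescaling only the contiguous range of key groups owned by each subtask is recomputed. MaxParallelismMath is a hypothetical class; the range arithmetic mirrors my understanding of KeyGroupRangeAssignment#computeKeyGroupRangeForOperatorIndex, and roundUpToPowerOfTwo is a standard bit-twiddling version.

```java
// Hypothetical stand-alone re-implementation (stdlib only).
final class MaxParallelismMath {

    static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7; // 128
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;        // 32768

    /** Smallest power of two >= x, valid for 1 <= x <= 2^30. */
    static int roundUpToPowerOfTwo(int x) {
        x = x - 1;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        x |= x >> 8;
        x |= x >> 16;
        return x + 1;
    }

    static int computeDefaultMaxParallelism(int operatorParallelism) {
        if (operatorParallelism <= 0 || operatorParallelism > UPPER_BOUND_MAX_PARALLELISM) {
            throw new IllegalArgumentException("parallelism out of range: " + operatorParallelism);
        }
        return Math.min(
            Math.max(
                roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
                DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
            UPPER_BOUND_MAX_PARALLELISM);
    }

    /** Inclusive range {start, end} of key groups owned by subtask operatorIndex. */
    static int[] keyGroupRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Subtask index that owns a given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
}
```

For example, computeDefaultMaxParallelism(100) gives 256 (150 rounded up to a power of two), and with max parallelism 128 and parallelism 3 the subtasks own key groups [0, 42], [43, 85] and [86, 127]. Rescaling to 4 subtasks only re-cuts these 128 key groups into new ranges.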
This concludes the overview of the task-reset logic.
2.2 Scheduling Tasks
The entry point is AdaptedRestartPipelinedRegionStrategyNG#rescheduleTasks; the actual scheduling is done in SchedulingUtils.schedule.
For more on task scheduling, see my earlier article Flink Job Submission (3) --- Job Execution. Scheduling has two steps: requesting slots and deploying tasks.
Once a task is deployed and starts running, StreamTask.invoke() is called. Inside invoke, initializeState() is called for every operator of the task. Here is the source of StreamTask#initializeState:
private void initializeState() throws Exception {
StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
for (StreamOperator<?> operator : allOperators) {
if (null != operator) {
operator.initializeState();
}
}
}
Each operator.initializeState() call then runs AbstractStreamOperator#initializeState:
@Override
public final void initializeState() throws Exception {
final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());
final StreamTask<?, ?> containingTask =
Preconditions.checkNotNull(getContainingTask());
final CloseableRegistry streamTaskCloseableRegistry =
Preconditions.checkNotNull(containingTask.getCancelables());
final StreamTaskStateInitializer streamTaskStateManager =
Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
final StreamOperatorStateContext context =
streamTaskStateManager.streamOperatorStateContext(
getOperatorID(),
getClass().getSimpleName(),
this,
keySerializer,
streamTaskCloseableRegistry,
metrics);
this.operatorStateBackend = context.operatorStateBackend();
this.keyedStateBackend = context.keyedStateBackend();
if (keyedStateBackend != null) {
this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
}
timeServiceManager = context.internalTimerServiceManager();
CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
try {
StateInitializationContext initializationContext = new StateInitializationContextImpl(
context.isRestored(), // information whether we restore or start for the first time
operatorStateBackend, // access to operator state backend
keyedStateStore, // access to keyed state backend
keyedStateInputs, // access to keyed state stream
operatorStateInputs); // access to operator state stream
initializeState(initializationContext);
} finally {
closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
}
}
The TaskExecutor uses a TaskStateManager to manage the state of the current task. The TaskStateManager is created from the assigned JobManagerTaskRestore and the local state store, TaskLocalStateStore.
The key step of state initialization is StreamTaskStateInitializer.streamOperatorStateContext(), which produces a StreamOperatorStateContext. From the StreamOperatorStateContext the operator obtains its operatorStateBackend, keyedStateBackend, raw state streams and timeServiceManager, and can then restore its state.
Next, let us see how the StreamOperatorStateContext is built, in StreamTaskStateInitializerImpl#streamOperatorStateContext:
TaskInfo taskInfo = environment.getTaskInfo();
OperatorSubtaskDescriptionText operatorSubtaskDescription =
new OperatorSubtaskDescriptionText(
operatorID,
operatorClassName,
taskInfo.getIndexOfThisSubtask(),
taskInfo.getNumberOfParallelSubtasks());
final String operatorIdentifierText = operatorSubtaskDescription.toString();
final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
taskStateManager.prioritizedOperatorState(operatorID);
AbstractKeyedStateBackend<?> keyedStatedBackend = null;
OperatorStateBackend operatorStateBackend = null;
CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
InternalTimeServiceManager<?> timeServiceManager;
try {
// -------------- Keyed State Backend --------------
keyedStatedBackend = keyedStatedBackend(
keySerializer,
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry,
metricGroup);
// -------------- Operator State Backend --------------
operatorStateBackend = operatorStateBackend(
operatorIdentifierText,
prioritizedOperatorSubtaskStates,
streamTaskCloseableRegistry);
// -------------- Raw State Streams --------------
rawKeyedStateInputs = rawKeyedStateInputs(
prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
rawOperatorStateInputs = rawOperatorStateInputs(
prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
// -------------- Internal Timer Service Manager --------------
timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);
// -------------- Preparing return value --------------
return new StreamOperatorStateContextImpl(
prioritizedOperatorSubtaskStates.isRestored(),
operatorStateBackend,
keyedStatedBackend,
timeServiceManager,
rawOperatorStateInputs,
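rawKeyedStateInputs);
} catch (Exception ex) {
// ... cleanup of the partially created backends omitted in this excerpt; the exception is rethrown
}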
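- To build the StreamOperatorStateContext:
  1. TaskStateManager.prioritizedOperatorState() returns the state handles each operator has to restore.
  2. These handles are used to create and restore the state backends and the timers. This is where PrioritizedOperatorSubtaskState comes in: it wraps several alternative OperatorSubtaskState snapshots that can (partially) substitute for one another, ordered by priority; for example, a task-local copy of the state is preferred over the copy reported to the JobManager.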
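The idea of prioritized alternatives can be illustrated with a tiny generic helper that tries each candidate in priority order and falls back to the next one on failure. PrioritizedRestore and restoreFirstSuccessful are hypothetical names; in Flink 1.9 a similar try-the-next-option loop lives in BackendRestorerProcedure, which StreamTaskStateInitializerImpl uses when creating the state backends.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model: each Supplier stands for "build a backend from one snapshot alternative".
final class PrioritizedRestore {

    /** Returns the result of the first alternative that restores without throwing. */
    static <S> S restoreFirstSuccessful(List<Supplier<S>> alternativesByPriority) {
        RuntimeException last = null;
        for (Supplier<S> candidate : alternativesByPriority) {
            try {
                return candidate.get();
            } catch (RuntimeException e) {
                last = e; // fall back to the next, lower-priority alternative
            }
        }
        throw new IllegalStateException("all restore alternatives failed", last);
    }
}
```

If the local copy is missing or corrupt, its supplier throws and the helper falls back to the remote copy, which is the behavior the priority ordering is meant to provide.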
Summary
This article described how, after some tasks of a job fail, their state is assigned, and how the rescheduled tasks use that state to recover.