Flink Source Code Notes --- checkpoint
Setting the nodes that trigger checkpoints
The ExecutionGraph object lives in SchedulerBase, and SchedulerBase in turn lives in the JobMaster.
ExecutionGraph's enableCheckpointing method initializes the checkpointCoordinator (checkpoint coordinator) object, which runs in the JobManager.
The checkpointCoordinator drives the checkpoint process in the distributed system. Its main responsibilities:
- Periodically trigger checkpoints by instructing the data sources to emit checkpoint barriers
- Receive each operator's acknowledgement that it has completed a given checkpoint
- Once a checkpoint has been acknowledged by all operators, notify every operator that the checkpoint is complete
- Keep information about completed and in-progress checkpoints
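The responsibilities above come down to a "trigger, collect acknowledgements, complete" cycle. The following is a minimal sketch of that bookkeeping only. It assumes string task names and uses a hypothetical `MiniCheckpointCoordinator` class. It is not Flink's `CheckpointCoordinator`, and it leaves out barriers, state storage, timeouts and failure handling.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the coordinator's bookkeeping (not Flink's CheckpointCoordinator).
public class MiniCheckpointCoordinator {
    private final Set<String> tasksToWaitFor;
    // checkpoint ID -> tasks that have not acknowledged yet
    private final Map<Long, Set<String>> pending = new HashMap<>();
    private final Set<Long> completed = new HashSet<>();
    private long nextCheckpointId = 1;

    public MiniCheckpointCoordinator(Set<String> tasksToWaitFor) {
        this.tasksToWaitFor = new HashSet<>(tasksToWaitFor);
    }

    // Starts a checkpoint; in Flink this is where the sources would be told to emit barriers.
    public long trigger() {
        long id = nextCheckpointId++;
        pending.put(id, new HashSet<>(tasksToWaitFor));
        return id;
    }

    // Records one acknowledgement; returns true when this ack completes the checkpoint.
    public boolean acknowledge(long checkpointId, String task) {
        Set<String> notYetAcked = pending.get(checkpointId);
        if (notYetAcked == null) {
            return false; // unknown or already completed checkpoint
        }
        notYetAcked.remove(task);
        if (!notYetAcked.isEmpty()) {
            return false;
        }
        pending.remove(checkpointId);
        completed.add(checkpointId);
        // In Flink, all tasks would now be notified that the checkpoint is complete.
        return true;
    }

    public boolean isPending(long checkpointId) {
        return pending.containsKey(checkpointId);
    }

    public boolean isCompleted(long checkpointId) {
        return completed.contains(checkpointId);
    }

    public static void main(String[] args) {
        MiniCheckpointCoordinator coordinator =
                new MiniCheckpointCoordinator(Set.of("source-0", "map-0", "sink-0"));
        long id = coordinator.trigger();
        coordinator.acknowledge(id, "source-0");
        coordinator.acknowledge(id, "map-0");
        System.out.println(coordinator.acknowledge(id, "sink-0")); // true
    }
}
```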
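When the checkpointCoordinator is constructed, it is passed a variable, tasksToTrigger, which holds the nodes that must be triggered for a checkpoint. This variable originates in the configureCheckpointing method of StreamingJobGraphGenerator, which collects the vertices that have no inputs (the sources) as the trigger vertices.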
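Below is a sketch of how the vertex lists might be selected. It is modeled on `StreamingJobGraphGenerator#configureCheckpointing` in Flink 1.10/1.11 as I understand it: vertices without inputs (the sources) receive the trigger, while every vertex acknowledges the checkpoint and receives the commit. `CheckpointVertexSelection` and its `Vertex` class are hypothetical stand-ins for the real `JobVertex`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of choosing trigger / ack / commit vertices.
public class CheckpointVertexSelection {

    // Stand-in for JobVertex.
    static final class Vertex {
        final String id;
        final int numInputs;

        Vertex(String id, int numInputs) {
            this.id = id;
            this.numInputs = numInputs;
        }

        // No upstream inputs, i.e. a source.
        boolean isInputVertex() {
            return numInputs == 0;
        }
    }

    static Map<String, List<String>> select(List<Vertex> vertices) {
        List<String> trigger = new ArrayList<>();
        List<String> ack = new ArrayList<>();
        List<String> commit = new ArrayList<>();
        for (Vertex v : vertices) {
            if (v.isInputVertex()) {
                trigger.add(v.id); // only sources receive "trigger checkpoint"
            }
            ack.add(v.id);    // every vertex acknowledges
            commit.add(v.id); // every vertex is notified of completion
        }
        Map<String, List<String>> result = new LinkedHashMap<>();
        result.put("trigger", trigger);
        result.put("ack", ack);
        result.put("commit", commit);
        return result;
    }

    public static void main(String[] args) {
        List<Vertex> job = List.of(new Vertex("source", 0), new Vertex("map", 1), new Vertex("sink", 1));
        // {trigger=[source], ack=[source, map, sink], commit=[source, map, sink]}
        System.out.println(select(job));
    }
}
```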
schedulerNG is initialized when the JobMaster is created:

```java
this.schedulerNGFactory = checkNotNull(schedulerNGFactory);
this.schedulerNG = createScheduler(jobManagerJobMetricGroup);
```

createScheduler calls schedulerNGFactory's createInstance method to obtain the scheduler.
DefaultSchedulerFactory and LegacySchedulerFactory create DefaultScheduler and LegacyScheduler instances respectively. Both classes extend SchedulerBase, so instantiating either one runs the SchedulerBase constructor, which builds the ExecutionGraph. Scheduling then starts through startScheduling.
```java
private SchedulerNG createScheduler(final JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
    return schedulerNGFactory.createInstance(
        log,
        jobGraph,
        backPressureStatsTracker,
        scheduledExecutorService,
        jobMasterConfiguration.getConfiguration(),
        scheduler,
        scheduledExecutorService,
        userCodeLoader,
        highAvailabilityServices.getCheckpointRecoveryFactory(),
        rpcTimeout,
        blobWriter,
        jobManagerJobMetricGroup,
        jobMasterConfiguration.getSlotRequestTimeout(),
        shuffleMaster,
        partitionTracker);
}
```
When DefaultScheduler or LegacyScheduler is constructed, it initializes its parent class SchedulerBase. For example, the DefaultScheduler constructor:
```java
DefaultScheduler(
        final Logger log,
        final JobGraph jobGraph,
        final BackPressureStatsTracker backPressureStatsTracker,
        final Executor ioExecutor,
        final Configuration jobMasterConfiguration,
        final ScheduledExecutorService futureExecutor,
        final ScheduledExecutor delayExecutor,
        final ClassLoader userCodeLoader,
        final CheckpointRecoveryFactory checkpointRecoveryFactory,
        final Time rpcTimeout,
        final BlobWriter blobWriter,
        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
        final ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        final SchedulingStrategyFactory schedulingStrategyFactory,
        final FailoverStrategy.Factory failoverStrategyFactory,
        final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
        final ExecutionVertexOperations executionVertexOperations,
        final ExecutionVertexVersioner executionVertexVersioner,
        final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) throws Exception {

    super(
        log,
        jobGraph,
        backPressureStatsTracker,
        ioExecutor,
        jobMasterConfiguration,
        new ThrowingSlotProvider(), // this is not used any more in the new scheduler
        futureExecutor,
        userCodeLoader,
        checkpointRecoveryFactory,
        rpcTimeout,
        new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
        blobWriter,
        jobManagerJobMetricGroup,
        Time.seconds(0), // this is not used any more in the new scheduler
        shuffleMaster,
        partitionTracker,
        executionVertexVersioner,
        false);
    ...
```
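The chain from factory to subclass to base constructor can be reduced to the following sketch. Every name in it (`SchedulerFactorySketch`, `BaseScheduler`, `NewScheduler`, `Graph`) is hypothetical. The sketch only shows that the graph is built inside the base-class constructor, which runs before the subclass constructor body and before `startScheduling` is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the factory -> subclass -> base-constructor flow.
public class SchedulerFactorySketch {

    // Stand-in for ExecutionGraph.
    static final class Graph {
        final String jobName;

        Graph(String jobName) {
            this.jobName = jobName;
        }
    }

    interface Scheduler {
        Graph graph();

        void startScheduling();
    }

    // Stand-in for SchedulerNGFactory.
    interface SchedulerFactory {
        Scheduler createInstance(String jobName, List<String> log);
    }

    // Stand-in for SchedulerBase: the graph is built in the base-class constructor.
    abstract static class BaseScheduler implements Scheduler {
        private final Graph graph;
        protected final List<String> log;

        BaseScheduler(String jobName, List<String> log) {
            this.log = log;
            this.graph = new Graph(jobName);
            log.add("graph built for " + jobName);
        }

        @Override
        public Graph graph() {
            return graph;
        }
    }

    // Stand-in for DefaultScheduler: its constructor delegates to super(...) first.
    static final class NewScheduler extends BaseScheduler {
        NewScheduler(String jobName, List<String> log) {
            super(jobName, log);
            log.add("subclass initialized");
        }

        @Override
        public void startScheduling() {
            log.add("scheduling started");
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SchedulerFactory factory = NewScheduler::new;
        Scheduler scheduler = factory.createInstance("wordcount", log);
        scheduler.startScheduling();
        // [graph built for wordcount, subclass initialized, scheduling started]
        System.out.println(log);
    }
}
```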
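Continuing with the checkpoint process
One entry point into the checkpoint process is a savepoint triggered by the JobMaster. Periodic checkpoints take a different route: the CheckpointCoordinator's own scheduled timer fires them after startCheckpointScheduler has been called. Let's look at JobMaster's triggerSavepoint method: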
```java
// Trigger a savepoint
@Override
public CompletableFuture<String> triggerSavepoint(
        @Nullable final String targetDirectory,
        final boolean cancelJob,
        final Time timeout) {

    // Delegates to the SchedulerBase implementation of SchedulerNG
    return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
}
```
This method calls the triggerSavepoint method of the same name on schedulerNG. The implementation invoked here is the one in SchedulerBase, which implements the SchedulerNG interface and is itself an abstract class:

```java
public abstract class SchedulerBase implements SchedulerNG
```

schedulerNG.triggerSavepoint mainly obtains the checkpointCoordinator (checkpoint coordinator) and then calls the coordinator's triggerSavepoint method:
```java
@Override
public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
    // Make sure we are running in the main thread
    mainThreadExecutor.assertRunningInMainThread();

    // Get the CheckpointCoordinator (checkpoint coordinator) from the executionGraph
    final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    if (checkpointCoordinator == null) {
        throw new IllegalStateException(
            String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
    } else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
        // Either a default savepoint directory must be configured or one must be passed in
        log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

        throw new IllegalStateException(
            "No savepoint directory configured. You can either specify a directory " +
                "while cancelling via -s :targetDirectory or configure a cluster-wide " +
                "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
    }

    // If the job is being cancelled, stop the periodic checkpoint scheduler
    if (cancelJob) {
        checkpointCoordinator.stopCheckpointScheduler();
    }

    // Trigger a savepoint (internally this is a checkpoint), then return the
    // external path where it was stored. Afterwards:
    // 1. If the job is to be cancelled and a previous step threw, restart the
    //    checkpoint scheduler and rethrow the exception.
    // 2. If the job is to be cancelled and no step threw, cancel the job.
    return checkpointCoordinator
        .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
        .thenApply(CompletedCheckpoint::getExternalPointer)
        .handleAsync((path, throwable) -> {
            if (throwable != null) {
                if (cancelJob) {
                    startCheckpointScheduler(checkpointCoordinator);
                }
                throw new CompletionException(throwable);
            } else if (cancelJob) {
                log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
                cancel();
            }
            return path;
        }, mainThreadExecutor);
}
```
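The flow of stopping the scheduler, taking the savepoint, and then either cancelling or restarting can be isolated with plain `CompletableFuture`s. The minimal sketch below assumes the savepoint result arrives as a future. It records scheduler start/stop and job cancellation as plain events. `SavepointThenCancelSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Hypothetical sketch of the "stop scheduler -> savepoint -> cancel or restart scheduler" flow.
public class SavepointThenCancelSketch {

    static CompletableFuture<String> savepointAndMaybeCancel(
            CompletableFuture<String> savepointPath, boolean cancelJob, List<String> events, Executor mainThread) {
        if (cancelJob) {
            events.add("stopCheckpointScheduler");
        }
        return savepointPath.handleAsync((path, throwable) -> {
            if (throwable != null) {
                if (cancelJob) {
                    events.add("startCheckpointScheduler"); // roll back: resume periodic checkpoints
                }
                throw new CompletionException(throwable);
            } else if (cancelJob) {
                events.add("cancel");
            }
            return path;
        }, mainThread);
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // run callbacks on the calling thread for the demo
        List<String> events = new ArrayList<>();
        String path = savepointAndMaybeCancel(
                CompletableFuture.completedFuture("file:///tmp/sp-1"), true, events, direct).join();
        System.out.println(path + " " + events); // file:///tmp/sp-1 [stopCheckpointScheduler, cancel]
    }
}
```

Passing `Runnable::run` as the executor runs the callback on the thread that completes the future. `SchedulerBase` passes its `mainThreadExecutor` instead, so there the callback runs on the JobMaster main thread.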
Inside checkpointCoordinator's triggerSavepoint method, the coordinator first obtains the checkpointProperties (checkpoint properties) for a savepoint. It then calls triggerSavepointInternal and returns its result:
```java
public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
        final long timestamp,
        @Nullable final String targetLocation) {

    final CheckpointProperties properties = CheckpointProperties.forSavepoint();
    return triggerSavepointInternal(timestamp, properties, false, targetLocation);
}
```
Stepping into triggerSavepointInternal shows that most of its logic is in triggerCheckpoint, which it runs on the coordinator's timer thread:
```java
private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
        final long timestamp,
        final CheckpointProperties checkpointProperties,
        final boolean advanceToEndOfEventTime,
        @Nullable final String targetLocation) {

    checkNotNull(checkpointProperties);

    // TODO, call triggerCheckpoint directly after removing timer thread
    // for now, execute the trigger in timer thread to avoid competition
    final CompletableFuture<CompletedCheckpoint> resultFuture = new CompletableFuture<>();
    timer.execute(() -> triggerCheckpoint(
        timestamp,
        checkpointProperties,
        targetLocation,
        false,
        advanceToEndOfEventTime)
        .whenComplete((completedCheckpoint, throwable) -> {
            if (throwable == null) {
                resultFuture.complete(completedCheckpoint);
            } else {
                resultFuture.completeExceptionally(throwable);
            }
        }));
    return resultFuture;
}
```
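triggerSavepointInternal creates the result future up front, hands the actual work to the `timer` thread, and forwards the outcome. The sketch below shows that bridging pattern with a hypothetical `runOnTimer` helper. Unlike the quoted code, it also completes the result exceptionally when the trigger call itself throws. That behavior is an addition of this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch of running a trigger on a dedicated timer thread.
public class TimerThreadBridge {

    // Runs trigger on the timer thread and forwards its outcome to a future created up front.
    static <T> CompletableFuture<T> runOnTimer(ExecutorService timer, Supplier<CompletableFuture<T>> trigger) {
        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        timer.execute(() -> {
            try {
                trigger.get().whenComplete((value, throwable) -> {
                    if (throwable == null) {
                        resultFuture.complete(value);
                    } else {
                        resultFuture.completeExceptionally(throwable);
                    }
                });
            } catch (Throwable t) {
                // Addition of this sketch: also fail the result if the trigger call itself throws.
                resultFuture.completeExceptionally(t);
            }
        });
        return resultFuture;
    }

    public static void main(String[] args) {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> ranOn = runOnTimer(timer,
                    () -> CompletableFuture.completedFuture(Thread.currentThread().getName()));
            System.out.println("trigger ran on " + ranOn.join());
        } finally {
            timer.shutdown();
        }
    }
}
```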
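Next, follow the triggerCheckpoint method. triggerCheckpoint itself only decides whether to start right away or to queue the request. Most of the work is done in startTriggeringCheckpoint, which it calls. The overall logic:
1. First, run the pre-checks before triggering the checkpoint to see whether the conditions are met;
2. Then obtain a checkpoint ID and create a PendingCheckpoint instance;
3. Re-check the trigger conditions to guard against race conditions;
4. Finally, add the PendingCheckpoint instance to pendingCheckpoints and send messages to the tasks to trigger their checkpoints.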
```java
@VisibleForTesting
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
        long timestamp,                              // timestamp of the trigger
        CheckpointProperties props,                  // checkpoint properties
        @Nullable String externalSavepointLocation,  // external savepoint location
        boolean isPeriodic,                          // whether this is a periodic checkpoint
        boolean advanceToEndOfTime) {                // advance the watermark to MAX (end of event time)

    // Sanity check: only synchronous savepoints may advance the watermark to MAX
    if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
        return FutureUtils.completedExceptionally(new IllegalArgumentException(
            "Only synchronous savepoints are allowed to advance the watermark to MAX."));
    }

    // CompletableFuture (Java 8+) represents the result of an asynchronous computation;
    // CompletedCheckpoint holds the metadata of a finished checkpoint (ID, completion time, ...)
    final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
        new CompletableFuture<>();

    // make some eager pre-checks
    synchronized (lock) {
        // A trigger request is being processed or others are queued,
        // so this checkpoint cannot be triggered directly
        if (isTriggering || !triggerRequestQueue.isEmpty()) {
            // we can't trigger checkpoint directly if there is a trigger request being processed
            // or queued
            // Wrap the call in a new request and append it to the queue
            triggerRequestQueue.add(new CheckpointTriggerRequest(
                timestamp,
                props,
                externalSavepointLocation,
                isPeriodic,
                advanceToEndOfTime,
                onCompletionPromise));
            return onCompletionPromise;
        }
    }

    // Trigger the checkpoint now
    startTriggeringCheckpoint(
        timestamp,
        props,
        externalSavepointLocation,
        isPeriodic,
        advanceToEndOfTime,
        onCompletionPromise);
    return onCompletionPromise;
}
```
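The queueing rule in triggerCheckpoint says to start immediately only if nothing is being triggered and the queue is empty. It can be modeled as shown below. `TriggerRequestGate` and its `onTriggerFinished` method are hypothetical. In particular, the sketch simplifies how the real coordinator drains `triggerRequestQueue` after a trigger finishes to "start the next queued request".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the "trigger now or queue" rule.
public class TriggerRequestGate {
    private final Object lock = new Object();
    private boolean isTriggering = false;
    private final Queue<String> triggerRequestQueue = new ArrayDeque<>();
    private final List<String> started = new ArrayList<>();

    // Returns true if the request started immediately, false if it was queued.
    public boolean request(String name) {
        synchronized (lock) {
            if (isTriggering || !triggerRequestQueue.isEmpty()) {
                triggerRequestQueue.add(name);
                return false;
            }
            isTriggering = true;
            started.add(name);
            return true;
        }
    }

    // Called when the current trigger finishes; starts the next queued request, if any.
    public void onTriggerFinished() {
        synchronized (lock) {
            String next = triggerRequestQueue.poll();
            if (next == null) {
                isTriggering = false;
            } else {
                started.add(next); // remain in the triggering state for the queued request
            }
        }
    }

    public List<String> started() {
        synchronized (lock) {
            return new ArrayList<>(started);
        }
    }

    public static void main(String[] args) {
        TriggerRequestGate gate = new TriggerRequestGate();
        gate.request("chk-1");       // starts immediately
        gate.request("savepoint-1"); // queued behind chk-1
        gate.onTriggerFinished();    // savepoint-1 starts
        System.out.println(gate.started()); // [chk-1, savepoint-1]
    }
}
```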
Now step into the startTriggeringCheckpoint method:
```java
private void startTriggeringCheckpoint(
        long timestamp,
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic,
        boolean advanceToEndOfTime,
        CompletableFuture<CompletedCheckpoint> onCompletionPromise) {

    try {
        // make some eager pre-checks
        // Pre-check before triggering; arguments: is it periodic, is it a forced checkpoint
        synchronized (lock) {
            preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
        }

        // Check that all tasks we need to trigger are running; if not, abort the checkpoint
        final Execution[] executions = getTriggerExecutions();
        final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();

        // we will actually trigger this checkpoint!
        Preconditions.checkState(!isTriggering);
        isTriggering = true;

        final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture =
            initializeCheckpoint(props, externalSavepointLocation) // initialize the checkpoint
                // thenApplyAsync runs after the previous stage completes normally and receives its
                // result as input; because an executor is passed, the callback runs on `timer`
                .thenApplyAsync(
                    (checkpointIdAndStorageLocation) -> createPendingCheckpoint( // create the pending checkpoint
                        timestamp,
                        props,
                        ackTasks,
                        isPeriodic,
                        checkpointIdAndStorageLocation.checkpointId,
                        checkpointIdAndStorageLocation.checkpointStorageLocation,
                        onCompletionPromise),
                    timer);

        // Completes when the master states have been snapshotted
        final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
            // thenCompose chains a function that itself returns a future and flattens the result
            .thenCompose(this::snapshotMasterState);

        // Completes when the operator coordinators have checkpointed
        final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
            // asynchronous, on the timer thread
            .thenComposeAsync((pendingCheckpoint) ->
                    // triggers the OperatorCoordinator checkpoints and acknowledges them
                    // on the pending checkpoint
                    OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                        coordinatorsToCheckpoint, pendingCheckpoint, timer),
                timer);

        CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
            // runs whether the stages above complete normally or exceptionally
            .whenCompleteAsync(
                (ignored, throwable) -> {
                    // Get the pending checkpoint: the future's value, or null if it
                    // completed exceptionally or has not completed yet
                    final PendingCheckpoint checkpoint =
                        FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
                    if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
                        // no exception, no discarding, everything is OK
                        // Snapshot the task states (send trigger messages to the tasks)
                        snapshotTaskState(
                            timestamp,
                            checkpoint.getCheckpointId(),
                            checkpoint.getCheckpointStorageLocation(),
                            props,
                            executions,
                            advanceToEndOfTime);
                        // must be called when the trigger request succeeds
                        onTriggerSuccess();
                    } else {
                        // the initialization might not be finished yet
                        if (checkpoint == null) {
                            onTriggerFailure(onCompletionPromise, throwable);
                        } else {
                            onTriggerFailure(checkpoint, throwable);
                        }
                    }
                },
                timer);
    } catch (Throwable throwable) {
        onTriggerFailure(onCompletionPromise, throwable);
    }
}
```
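The future pipeline in startTriggeringCheckpoint is easier to follow once the Flink types are stripped away. This sketch uses hypothetical names throughout. The pending checkpoint is reduced to its ID, and the master-state and coordinator steps are trivial futures. `getWithoutException` re-implements the behavior described above for `FutureUtils.getWithoutException`: it returns the value, or `null` if the future did not complete successfully. The sketch needs Java 9+ for `CompletableFuture.failedFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the startTriggeringCheckpoint future pipeline (Java 9+).
public class TriggerPipelineSketch {

    // The value, or null if the future is not done or completed exceptionally.
    static <T> T getWithoutException(CompletableFuture<T> future) {
        if (future.isDone() && !future.isCompletedExceptionally()) {
            return future.join();
        }
        return null;
    }

    // Stand-in for the operator-coordinator checkpoints; optionally fails.
    static CompletableFuture<String> snapshotCoordinators(long id, boolean fail) {
        if (fail) {
            return CompletableFuture.failedFuture(new IllegalStateException("coordinator failed"));
        }
        return CompletableFuture.completedFuture("coord-" + id);
    }

    static CompletableFuture<String> trigger(Executor timer, long checkpointId, boolean failCoordinators) {
        CompletableFuture<String> result = new CompletableFuture<>();

        // Stand-in for initializeCheckpoint(...).thenApplyAsync(createPendingCheckpoint, timer).
        CompletableFuture<Long> pending = CompletableFuture.supplyAsync(() -> checkpointId, timer);

        // Stand-in for thenCompose(this::snapshotMasterState): the function returns a future.
        CompletableFuture<String> masterStates =
                pending.thenCompose(id -> CompletableFuture.completedFuture("master-" + id));

        // Composed asynchronously on the timer thread, like the coordinator step.
        CompletableFuture<String> coordinators =
                pending.thenComposeAsync(id -> snapshotCoordinators(id, failCoordinators), timer);

        // Runs whether the combined future completes normally or exceptionally.
        CompletableFuture.allOf(masterStates, coordinators).whenCompleteAsync((ignored, throwable) -> {
            Long id = getWithoutException(pending);
            if (throwable == null && id != null) {
                // Stand-in for snapshotTaskState(...) followed by onTriggerSuccess().
                result.complete("snapshotTaskState(" + id + ")");
            } else {
                // Stand-in for onTriggerFailure(...).
                result.completeExceptionally(
                        throwable != null ? throwable : new IllegalStateException("no pending checkpoint"));
            }
        }, timer);
        return result;
    }

    public static void main(String[] args) {
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            System.out.println(trigger(timer, 7L, false).join()); // snapshotTaskState(7)
        } finally {
            timer.shutdown();
        }
    }
}
```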
Because this method is long, let's look at the key methods it calls.
preCheckBeforeTriggeringCheckpoint performs the pre-checks before a checkpoint is triggered:
```java
private void preCheckBeforeTriggeringCheckpoint(boolean isPeriodic, boolean forceCheckpoint) throws CheckpointException {
    // Check the global state: the coordinator must not be shut down and,
    // for periodic checkpoints, the periodic scheduler must still be running
    preCheckGlobalState(isPeriodic);

    // Checks that forced checkpoints skip
    if (!forceCheckpoint) {
        // Check that the number of concurrent checkpoints is below the maximum
        checkConcurrentCheckpoints();
        // Check that the minimum pause between checkpoints has elapsed
        checkMinPauseBetweenCheckpoints();
    }
}
```
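The two non-forced checks come down to simple comparisons. The sketch below assumes millisecond timestamps and two rules: the number of pending checkpoints must be below the maximum, and "now" must be at least "last completion + minimum pause". `PreCheckSketch` is hypothetical. It throws `IllegalStateException` where Flink throws a `CheckpointException`.

```java
// Hypothetical sketch of the concurrency and minimum-pause pre-checks.
public class PreCheckSketch {

    static void preCheck(boolean forceCheckpoint, int pendingCount, int maxConcurrent,
                         long nowMillis, long lastCompletionMillis, long minPauseMillis) {
        if (forceCheckpoint) {
            return; // forced checkpoints skip both checks
        }
        if (pendingCount >= maxConcurrent) {
            throw new IllegalStateException("too many concurrent checkpoints: " + pendingCount);
        }
        long earliestNext = lastCompletionMillis + minPauseMillis;
        if (nowMillis < earliestNext) {
            throw new IllegalStateException(
                    "min pause not elapsed, wait " + (earliestNext - nowMillis) + " ms");
        }
    }

    public static void main(String[] args) {
        // Last checkpoint completed at t=10_000 ms, min pause 5_000 ms, trigger attempt at t=12_000 ms.
        try {
            preCheck(false, 0, 1, 12_000, 10_000, 5_000);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // min pause not elapsed, wait 3000 ms
        }
    }
}
```

In the example, the earliest allowed start is 10,000 + 5,000 = 15,000 ms, so an attempt at 12,000 ms is 3,000 ms too early.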
The getTriggerExecutions method checks whether all the tasks we need to trigger are running. If any is not, it aborts the checkpoint by throwing an exception:
```java
private Execution[] getTriggerExecutions() throws CheckpointException {
    Execution[] executions = new Execution[tasksToTrigger.length];
    for (int i = 0; i < tasksToTrigger.length; i++) {
        Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
        if (ee == null) {
            LOG.info(
                "Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                job);
            throw new CheckpointException(
                CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        } else if (ee.getState() == ExecutionState.RUNNING) {
            // tasksToTrigger (the nodes that must trigger the checkpoint) originates in
            // StreamingJobGraphGenerator.
            // A task in RUNNING state is added to the executions array
            executions[i] = ee;
        } else {
            LOG.info(
                "Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                job,
                ExecutionState.RUNNING,
                ee.getState());
            throw new CheckpointException(
                CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }
    return executions;
}
```
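The getAckTasks method is similar, but it checks tasksToWaitFor and only requires each task to have a current execution attempt. Unlike getTriggerExecutions, it does not require the RUNNING state. If any task has no attempt, the checkpoint is aborted: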
```java
private Map<ExecutionAttemptID, ExecutionVertex> getAckTasks() throws CheckpointException {
    Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

    for (ExecutionVertex ev : tasksToWaitFor) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ackTasks.put(ee.getAttemptId(), ev);
        } else {
            LOG.info(
                "Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                ev.getTaskNameWithSubtaskIndex(),
                job);
            throw new CheckpointException(
                CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }
    return ackTasks;
}
```
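The difference between the two checks above is easiest to see side by side. This sketch uses hypothetical stand-in types: a `Vertex` with a nullable attempt ID and a `State` enum. Trigger tasks need a current attempt in `RUNNING` state. Ack tasks only need a current attempt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch contrasting the trigger-task and ack-task checks.
public class TaskCheckSketch {
    enum State { CREATED, DEPLOYING, RUNNING, FINISHED }

    // Stand-in for ExecutionVertex plus its current Execution; attemptId == null means no current attempt.
    static final class Vertex {
        final String name;
        final String attemptId;
        final State state;

        Vertex(String name, String attemptId, State state) {
            this.name = name;
            this.attemptId = attemptId;
            this.state = state;
        }
    }

    // Like getTriggerExecutions: every trigger task needs a current attempt in RUNNING state.
    static List<String> triggerAttempts(List<Vertex> tasksToTrigger) {
        List<String> attempts = new ArrayList<>();
        for (Vertex v : tasksToTrigger) {
            if (v.attemptId == null || v.state != State.RUNNING) {
                throw new IllegalStateException("NOT_ALL_REQUIRED_TASKS_RUNNING: " + v.name);
            }
            attempts.add(v.attemptId);
        }
        return attempts;
    }

    // Like getAckTasks: every ack task only needs a current attempt; its state is not checked.
    static Map<String, String> ackTasks(List<Vertex> tasksToWaitFor) {
        Map<String, String> ackTasks = new HashMap<>();
        for (Vertex v : tasksToWaitFor) {
            if (v.attemptId == null) {
                throw new IllegalStateException("NOT_ALL_REQUIRED_TASKS_RUNNING: " + v.name);
            }
            ackTasks.put(v.attemptId, v.name); // attempt ID -> vertex
        }
        return ackTasks;
    }

    public static void main(String[] args) {
        Vertex source0 = new Vertex("source-0", "att-1", State.RUNNING);
        Vertex source1 = new Vertex("source-1", "att-2", State.DEPLOYING);
        System.out.println(ackTasks(List.of(source0, source1)).size()); // 2: DEPLOYING is fine for acks
        try {
            triggerAttempts(List.of(source0, source1));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // NOT_ALL_REQUIRED_TASKS_RUNNING: source-1
        }
    }
}
```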
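createPendingCheckpoint runs a pre-check before it creates the PendingCheckpoint object. A pending checkpoint is a checkpoint that has been started but not yet acknowledged by all the tasks that must acknowledge it. Once every task has acknowledged it, it becomes a CompletedCheckpoint. After creating the PendingCheckpoint, the method registers a stats callback that tracks it, if a statsTracker is present. Then, under the lock, it adds the newly created PendingCheckpoint to the pendingCheckpoints map, which holds the checkpoints still awaiting acknowledgement. It also schedules a CheckpointCanceller to run after checkpointTimeout milliseconds, so the checkpoint is cancelled if it has not completed by then. Finally, it returns the PendingCheckpoint object.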
```java
private PendingCheckpoint createPendingCheckpoint(
        long timestamp,
        CheckpointProperties props,
        Map<ExecutionAttemptID, ExecutionVertex> ackTasks,
        boolean isPeriodic,
        long checkpointID,
        CheckpointStorageLocation checkpointStorageLocation,
        CompletableFuture<CompletedCheckpoint> onCompletionPromise) {

    synchronized (lock) {
        // Check again before creating the PendingCheckpoint
        try {
            // since we haven't created the PendingCheckpoint yet, we need to check the
            // global state here.
            preCheckGlobalState(isPeriodic);
        } catch (Throwable t) {
            throw new CompletionException(t);
        }
    }

    // Create the PendingCheckpoint
    final PendingCheckpoint checkpoint = new PendingCheckpoint(
        job,
        checkpointID,
        timestamp,
        ackTasks,
        OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint),
        masterHooks.keySet(),
        props,
        checkpointStorageLocation,
        executor,
        onCompletionPromise);

    if (statsTracker != null) {
        // Register the pending checkpoint with the stats tracker
        PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
            checkpointID,
            timestamp,
            props);

        // Set the callback that tracks this pending checkpoint
        checkpoint.setStatsCallback(callback);
    }

    synchronized (lock) {
        // Add the pending checkpoint to the map
        pendingCheckpoints.put(checkpointID, checkpoint);

        // Schedule the canceller to run once the checkpoint timeout has elapsed
        ScheduledFuture<?> cancellerHandle = timer.schedule(
            new CheckpointCanceller(checkpoint), // task to run
            checkpointTimeout,                   // delay before it runs
            TimeUnit.MILLISECONDS);              // unit of the delay

        if (!checkpoint.setCancellerHandle(cancellerHandle)) { // set the handle
            // checkpoint is already disposed!
            cancellerHandle.cancel(false);
        }
    }

    // This line shows up in the JobManager log (viewable from the Flink web UI), e.g.:
    // 2020-05-14 14:44:08,203 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
    //  - Triggering checkpoint 1 @ 1589438648198 for job b94ef58349d8befba6412e3d85478bf5.
    LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
    return checkpoint;
}
```
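The timeout handling in createPendingCheckpoint follows a common scheduling pattern. It schedules a canceller, stores the canceller's handle, and cancels the handle if the checkpoint was already disposed. In this sketch, a hypothetical `Pending` class stands in for `PendingCheckpoint` and a lambda stands in for `CheckpointCanceller`. Completing the checkpoint also cancels the timeout here; that is an assumption of the sketch, not quoted code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of checkpoint timeout handling with a scheduled canceller.
public class CheckpointTimeoutSketch {

    // Stand-in for PendingCheckpoint.
    static final class Pending {
        final CompletableFuture<String> result = new CompletableFuture<>();
        private ScheduledFuture<?> cancellerHandle;

        // Returns false if the checkpoint is already finished (disposed), like setCancellerHandle.
        synchronized boolean setCancellerHandle(ScheduledFuture<?> handle) {
            if (result.isDone()) {
                return false;
            }
            cancellerHandle = handle;
            return true;
        }

        // Completes the checkpoint and cancels the pending timeout (assumption of this sketch).
        synchronized void complete(String value) {
            if (result.complete(value) && cancellerHandle != null) {
                cancellerHandle.cancel(false);
            }
        }

        synchronized boolean timeoutCancelled() {
            return cancellerHandle != null && cancellerHandle.isCancelled();
        }
    }

    static Pending startWithTimeout(ScheduledExecutorService timer, long timeoutMillis) {
        Pending checkpoint = new Pending();
        // Stand-in for CheckpointCanceller: expire the checkpoint once the timeout elapses.
        ScheduledFuture<?> handle = timer.schedule(() -> {
            checkpoint.result.completeExceptionally(new IllegalStateException("checkpoint expired"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        if (!checkpoint.setCancellerHandle(handle)) {
            handle.cancel(false); // checkpoint already disposed, the timer is not needed
        }
        return checkpoint;
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            Pending fast = startWithTimeout(timer, 10_000);
            fast.complete("chk-1");
            System.out.println(fast.result.join() + ", timeout cancelled: " + fast.timeoutCancelled());

            Pending slow = startWithTimeout(timer, 50);
            System.out.println(slow.result.handle((value, error) -> error.getMessage()).join());
        } finally {
            timer.shutdownNow();
        }
    }
}
```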
The most important method is snapshotTaskState(), where the actual triggering happens. It first creates a CheckpointOptions object, which holds the options for performing the checkpoint: the checkpoint type and a reference to the storage location.
It then starts checkpoint creation for every task in tasksToTrigger. The method it calls on each Execution depends on whether the checkpoint is a synchronous savepoint, but both methods end up calling Execution's triggerCheckpointHelper.
```java
private void snapshotTaskState(
        long timestamp,
        long checkpointID,
        CheckpointStorageLocation checkpointStorageLocation,
        CheckpointProperties props,
        Execution[] executions,
        boolean advanceToEndOfTime) {

    // Options for performing the checkpoint: checkpoint type and storage location
    final CheckpointOptions checkpointOptions = new CheckpointOptions(
        props.getCheckpointType(),                        // checkpoint or savepoint
        checkpointStorageLocation.getLocationReference(), // reference to the storage location
        isExactlyOnceMode,
        isUnalignedCheckpoint);

    // send the messages to the tasks that trigger their checkpoint
    // (starts checkpoint creation for every task in tasksToTrigger)
    for (Execution execution : executions) {
        // entry point for triggering the checkpoint on a task
        if (props.isSynchronous()) { // synchronous savepoint
            execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
        } else { // regular, non-synchronous checkpoint
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }
    }
}
```
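The checkpoints we normally use in development are not synchronous, so they go through Execution's triggerCheckpoint, which calls triggerCheckpointHelper with advanceToEndOfEventTime set to false. The synchronous path, triggerSynchronousSavepoint, also just delegates to triggerCheckpointHelper and forwards the flag: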
```java
public void triggerSynchronousSavepoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
    triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
}
```
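The sketch below shows the dispatch in snapshotTaskState together with the two `Execution` entry points. The types are hypothetical stand-ins. The behavior of `triggerCheckpoint`, which forwards `advanceToEndOfEventTime = false`, is assumed from Flink's `Execution` class and is not part of the code quoted above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of snapshotTaskState's dispatch and the two Execution entry points.
public class TriggerDispatchSketch {

    // Stand-in for CheckpointOptions / CheckpointType.
    static final class Options {
        final boolean synchronous;
        final boolean savepoint;

        Options(boolean synchronous, boolean savepoint) {
            this.synchronous = synchronous;
            this.savepoint = savepoint;
        }
    }

    // Stand-in for Execution; records the calls that would go to the TaskManager.
    static final class Exec {
        final List<String> calls = new ArrayList<>();

        // Regular path: advanceToEndOfEventTime is always false (assumed from Flink's Execution).
        void triggerCheckpoint(long checkpointId, Options options) {
            triggerCheckpointHelper(checkpointId, options, false);
        }

        void triggerSynchronousSavepoint(long checkpointId, Options options, boolean advanceToEndOfEventTime) {
            triggerCheckpointHelper(checkpointId, options, advanceToEndOfEventTime);
        }

        private void triggerCheckpointHelper(long checkpointId, Options options, boolean advanceToEndOfEventTime) {
            if (advanceToEndOfEventTime && !(options.synchronous && options.savepoint)) {
                throw new IllegalArgumentException(
                        "Only synchronous savepoints are allowed to advance the watermark to MAX.");
            }
            calls.add("trigger " + checkpointId + " advance=" + advanceToEndOfEventTime);
        }
    }

    // Mirrors the loop in snapshotTaskState.
    static void snapshotTaskState(long checkpointId, Options options, List<Exec> executions, boolean advance) {
        for (Exec execution : executions) {
            if (options.synchronous) {
                execution.triggerSynchronousSavepoint(checkpointId, options, advance);
            } else {
                execution.triggerCheckpoint(checkpointId, options);
            }
        }
    }

    public static void main(String[] args) {
        Exec source = new Exec();
        Exec map = new Exec();
        snapshotTaskState(5L, new Options(false, false), List.of(source, map), false);
        System.out.println(source.calls + " " + map.calls); // [trigger 5 advance=false] [trigger 5 advance=false]
    }
}
```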
Inside triggerCheckpointHelper, the method first obtains the assigned slot. The slot is a LogicalSlot, a logical slot representing a resource on a TaskManager into which a single task can be deployed.
It then gets the slot's TaskManagerGateway. The object here is an RpcTaskManagerGateway, which implements TaskManagerGateway.
Finally, it calls taskManagerGateway's triggerCheckpoint method to trigger the checkpoint:
```java
private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

    final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
    if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
        // only synchronous savepoints may advance the watermark to MAX
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }

    // Get the assigned slot
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        // Get the gateway for talking to the TaskManager;
        // here taskManagerGateway is an RpcTaskManagerGateway
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        // Trigger the checkpoint on the TaskManager
        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
    } else {
        // No slot assigned: the execution is no longer running
        LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
    }
}
```
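Inside taskManagerGateway's triggerCheckpoint method, the call is forwarded to the method of the same name on taskExecutorGateway. Note that jobId is not passed along.
taskExecutorGateway is an RPC gateway, and its calls are executed by TaskExecutor, the class that implements the TaskExecutorGateway interface.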
```java
@Override
public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
    taskExecutorGateway.triggerCheckpoint(
        executionAttemptID,
        checkpointId,
        timestamp,
        checkpointOptions,
        advanceToEndOfEventTime);
}
```
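The two hops from `Execution` to the TaskExecutor can be mimicked with plain interfaces. In this sketch, `TaskManagerGatewayLike` and `TaskExecutorGatewayLike` are hypothetical stand-ins for `TaskManagerGateway` and `TaskExecutorGateway`. The second hop is a direct call rather than an RPC, and a `null` gateway models an execution with no assigned slot.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Execution -> TaskManagerGateway -> TaskExecutorGateway hand-off.
public class GatewayChainSketch {

    // Stand-in for TaskExecutorGateway (in Flink, an RPC gateway to the TaskExecutor).
    interface TaskExecutorGatewayLike {
        void triggerCheckpoint(String attemptId, long checkpointId);
    }

    // Stand-in for TaskManagerGateway.
    interface TaskManagerGatewayLike {
        void triggerCheckpoint(String attemptId, String jobId, long checkpointId);
    }

    // Like RpcTaskManagerGateway: forwards to the TaskExecutor gateway and drops the job ID.
    static final class RpcTaskManagerGatewayLike implements TaskManagerGatewayLike {
        private final TaskExecutorGatewayLike taskExecutorGateway;

        RpcTaskManagerGatewayLike(TaskExecutorGatewayLike taskExecutorGateway) {
            this.taskExecutorGateway = taskExecutorGateway;
        }

        @Override
        public void triggerCheckpoint(String attemptId, String jobId, long checkpointId) {
            taskExecutorGateway.triggerCheckpoint(attemptId, checkpointId);
        }
    }

    // Like triggerCheckpointHelper: a null gateway models "no slot assigned", so nothing is sent.
    static boolean triggerViaSlot(TaskManagerGatewayLike slotGateway, String attemptId, String jobId, long checkpointId) {
        if (slotGateway == null) {
            return false;
        }
        slotGateway.triggerCheckpoint(attemptId, jobId, checkpointId);
        return true;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        TaskExecutorGatewayLike taskExecutor = (attemptId, checkpointId) -> received.add(attemptId + "@" + checkpointId);
        TaskManagerGatewayLike gateway = new RpcTaskManagerGatewayLike(taskExecutor);
        triggerViaSlot(gateway, "attempt-1", "job-1", 9L);
        triggerViaSlot(null, "attempt-2", "job-1", 9L);
        System.out.println(received); // [attempt-1@9]
    }
}
```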
Next post:
Flink-1.10 Source Code Notes: checkpoint - 2
If you spot any mistakes, corrections are welcome!