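I won't re-explain how Flink checkpoints work here. The official documentation and many blog posts already cover it, so interested readers can look it up.
This article walks through the source code to show how checkpoints are triggered periodically.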
Analysis
First, enable checkpointing with the following setting:
env.enableCheckpointing(1000);
If this is not set, the checkpoint interval defaults to -1, which means checkpointing is disabled:
/** Periodic checkpoint triggering interval. */
private long checkpointInterval = -1; // disabled
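When the JobGraph is built, any interval below MINIMAL_CHECKPOINT_TIME (including the default -1) is replaced with Long.MAX_VALUE, which marks periodic checkpointing as disabled: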
StreamingJobGraphGenerator#configureCheckpointing
long interval = cfg.getCheckpointInterval();
if (interval < MINIMAL_CHECKPOINT_TIME) {
    // interval of max value means disable periodic checkpoint
    interval = Long.MAX_VALUE;
}
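To make the sentinel logic concrete, here is a small standalone sketch of the normalization above. It is not Flink code: the class name is hypothetical, and MINIMAL_CHECKPOINT_TIME is assumed to be 10 ms purely for illustration.

```java
// Hypothetical sketch of the interval normalization in configureCheckpointing.
final class CheckpointIntervalSketch {
    // ASSUMPTION: illustrative value for Flink's minimal checkpoint interval.
    static final long MINIMAL_CHECKPOINT_TIME = 10L;
    // Default used when enableCheckpointing() is never called.
    static final long DISABLED = -1L;

    static long normalize(long configuredInterval) {
        long interval = configuredInterval;
        if (interval < MINIMAL_CHECKPOINT_TIME) {
            // Long.MAX_VALUE is the sentinel for "no periodic checkpoints".
            interval = Long.MAX_VALUE;
        }
        return interval;
    }

    static boolean periodicCheckpointingEnabled(long normalizedInterval) {
        return normalizedInterval != Long.MAX_VALUE;
    }
}
```

With this, the default -1 and any too-small value collapse into one "disabled" marker that later stages can check with a single comparison.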
It also initializes three lists:
// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources
List<JobVertexID> triggerVertices = new ArrayList<>();
// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
// collect the vertices that receive "commit checkpoint" messages
// currently, these are all vertices
List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());
Of these, triggerVertices contains only the source vertices, while ackVertices and commitVertices contain every vertex.
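A minimal sketch of how the three lists could be filled, assuming a hypothetical VertexStub type whose isSource flag stands in for however Flink decides that a JobVertex is a source. String ids replace JobVertexID to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JobVertex: just an id and a "source" flag.
final class VertexStub {
    final String id;
    final boolean isSource;

    VertexStub(String id, boolean isSource) {
        this.id = id;
        this.isSource = isSource;
    }
}

// Builds the trigger / ack / commit lists as described in the text.
final class CheckpointVertexLists {
    final List<String> triggerVertices = new ArrayList<>();
    final List<String> ackVertices;
    final List<String> commitVertices;

    CheckpointVertexLists(List<VertexStub> jobVertices) {
        ackVertices = new ArrayList<>(jobVertices.size());
        commitVertices = new ArrayList<>(jobVertices.size());
        for (VertexStub v : jobVertices) {
            if (v.isSource) {
                triggerVertices.add(v.id); // only sources receive "trigger checkpoint"
            }
            ackVertices.add(v.id);    // every vertex acknowledges the checkpoint
            commitVertices.add(v.id); // every vertex receives "commit checkpoint"
        }
    }
}
```

For a source -> map -> sink pipeline, only the source ends up in triggerVertices, while all three vertices appear in the other two lists.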
Checkpoints are driven by the CheckpointCoordinator. In ExecutionGraphBuilder#buildGraph, if checkpointing is enabled for the job, ExecutionGraph.enableCheckpointing() is called. That method creates the CheckpointCoordinator and, as long as the interval is not Long.MAX_VALUE, registers a job status listener, CheckpointCoordinatorDeActivator, which is notified whenever the job status changes.
ExecutionGraph#enableCheckpointing
checkpointCoordinator = new CheckpointCoordinator(...);
// interval of max long value indicates disable periodic checkpoint,
// the CheckpointActivatorDeactivator should be created only if the interval is not max value
if (interval != Long.MAX_VALUE) {
    // the periodic checkpoint scheduler is activated and deactivated as a result of
    // job status changes (running -> on, all other states -> off)
    registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}
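The guard reduces to a few lines. In this sketch StatusListener and GraphSketch are hypothetical simplifications of JobStatusListener and ExecutionGraph; only the interval != Long.MAX_VALUE check is taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified version of JobStatusListener.
interface StatusListener {
    void jobStatusChanges(String newStatus);
}

// Hypothetical stand-in for ExecutionGraph, keeping only the listener list.
final class GraphSketch {
    final List<StatusListener> jobStatusListeners = new ArrayList<>();

    void enableCheckpointing(long interval, StatusListener activatorDeactivator) {
        // The CheckpointCoordinator would be created here unconditionally.
        // The activator/deactivator is registered only when periodic checkpointing is on.
        if (interval != Long.MAX_VALUE) {
            jobStatusListeners.add(activatorDeactivator);
        }
    }
}
```

So a job without periodic checkpointing still gets a coordinator, but its status changes never start a scheduler.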
When the job status changes, ExecutionGraph calls notifyJobStatusChange, which invokes jobStatusChanges on every registered listener, including CheckpointCoordinatorDeActivator:
//ExecutionGraph.java
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
    if (jobStatusListeners.size() > 0) {
        final long timestamp = System.currentTimeMillis();
        final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
        for (JobStatusListener listener : jobStatusListeners) {
            try {
                listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
            } catch (Throwable t) {
                LOG.warn("Error while notifying JobStatusListener", t);
            }
        }
    }
}
//CheckpointCoordinatorDeActivator.java
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
    if (newJobStatus == JobStatus.RUNNING) {
        // start the checkpoint scheduler
        coordinator.startCheckpointScheduler();
    } else {
        // anything else should stop the trigger for now
        coordinator.stopCheckpointScheduler();
    }
}
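This is a plain observer pattern. The self-contained sketch below mimics it with hypothetical Sketch* types (not Flink's classes, and only a subset of job states): the graph notifies every listener, a failing listener does not stop the others, and the deactivator turns scheduling on only for RUNNING.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical subset of Flink's JobStatus values, for illustration only.
enum SketchJobStatus { CREATED, RUNNING, FAILING, RESTARTING, CANCELED, FINISHED }

interface SketchJobStatusListener {
    void jobStatusChanges(SketchJobStatus newStatus);
}

// Stand-in for CheckpointCoordinator: only tracks whether the scheduler is on.
final class SketchCoordinator {
    boolean periodicScheduling;
    void startCheckpointScheduler() { periodicScheduling = true; }
    void stopCheckpointScheduler() { periodicScheduling = false; }
}

// Same decision as CheckpointCoordinatorDeActivator: RUNNING -> on, anything else -> off.
final class SketchDeActivator implements SketchJobStatusListener {
    private final SketchCoordinator coordinator;

    SketchDeActivator(SketchCoordinator coordinator) { this.coordinator = coordinator; }

    @Override
    public void jobStatusChanges(SketchJobStatus newStatus) {
        if (newStatus == SketchJobStatus.RUNNING) {
            coordinator.startCheckpointScheduler();
        } else {
            coordinator.stopCheckpointScheduler();
        }
    }
}

final class SketchGraph {
    final List<SketchJobStatusListener> listeners = new ArrayList<>();

    // Like notifyJobStatusChange: an exception from one listener is logged, not propagated.
    void notifyJobStatusChange(SketchJobStatus newStatus) {
        for (SketchJobStatusListener l : listeners) {
            try {
                l.jobStatusChanges(newStatus);
            } catch (RuntimeException e) {
                System.err.println("Error while notifying listener: " + e);
            }
        }
    }
}
```

Note that a restart (RUNNING -> RESTARTING -> RUNNING) therefore stops and then restarts the checkpoint scheduler.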
Starting the periodic checkpoint scheduler:
// --------------------------------------------------------------------------------------------
// Periodic scheduling of checkpoints
// --------------------------------------------------------------------------------------------
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }
        // make sure all prior timers are cancelled
        stopCheckpointScheduler();
        periodicScheduling = true;
        currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
    }
}

private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
    return timer.scheduleAtFixedRate(
        new ScheduledTrigger(),
        initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
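Here is a standalone approximation of the start/stop logic using java.util.concurrent directly. The class is hypothetical, the counter stands in for triggerCheckpoint, and the initial-delay range [minPause, baseInterval] is an assumption about what getRandomInitDelay() returns, which may differ between Flink versions.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of CheckpointCoordinator's periodic scheduling.
final class PeriodicTriggerSketch {
    private final Object lock = new Object();
    private final ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
    private final long baseInterval; // must be > 0 for scheduleAtFixedRate
    private final long minPause;     // must be <= baseInterval
    final AtomicInteger triggerCount = new AtomicInteger();

    private ScheduledFuture<?> currentPeriodicTrigger;
    private boolean isShutdown;

    PeriodicTriggerSketch(long baseInterval, long minPause) {
        this.baseInterval = baseInterval;
        this.minPause = minPause;
    }

    // ASSUMPTION: random delay in [minPause, baseInterval]; the bound of nextLong is exclusive, hence + 1.
    long getRandomInitDelay() {
        return ThreadLocalRandom.current().nextLong(minPause, baseInterval + 1L);
    }

    void startCheckpointScheduler() {
        synchronized (lock) {
            if (isShutdown) {
                // Flink's version throws IllegalArgumentException here.
                throw new IllegalStateException("coordinator is shut down");
            }
            stopCheckpointScheduler(); // cancel any previous timer first (lock is reentrant)
            currentPeriodicTrigger = timer.scheduleAtFixedRate(
                    triggerCount::incrementAndGet, // stands in for triggerCheckpoint(...)
                    getRandomInitDelay(), baseInterval, TimeUnit.MILLISECONDS);
        }
    }

    void stopCheckpointScheduler() {
        synchronized (lock) {
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }
        }
    }

    void shutdown() {
        synchronized (lock) {
            isShutdown = true;
            stopCheckpointScheduler();
            timer.shutdownNow();
        }
    }
}
```

Calling stopCheckpointScheduler() at the start of startCheckpointScheduler() makes a repeated RUNNING notification harmless: there is never more than one live timer. The random initial delay presumably keeps many jobs started together from checkpointing in lockstep.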
new ScheduledTrigger() is the task that gets scheduled. As with periodic watermark generation, it is run by a ScheduledThreadPoolExecutor. Its run method looks like this:
private final class ScheduledTrigger implements Runnable {
    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}
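The try/catch in run() matters for more than logging. Per the ScheduledExecutorService.scheduleAtFixedRate contract, if any execution of the task throws, subsequent executions are suppressed, so one failed trigger would silently end periodic checkpointing. This hypothetical demo compares an always-throwing task with and without such a catch:

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: why a periodic task should not let exceptions escape run().
final class ExceptionSuppressionDemo {

    // Schedules a task that always throws and returns how often it ran within windowMillis.
    static int countRuns(boolean catchExceptions, long windowMillis) throws InterruptedException {
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        Runnable failingTrigger = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated trigger failure");
        };
        Runnable task = catchExceptions
                ? () -> {
                    try {
                        failingTrigger.run();
                    } catch (Exception e) {
                        // Like LOG.error(...) in ScheduledTrigger: swallow so the schedule stays alive.
                    }
                }
                : failingTrigger;
        ScheduledFuture<?> future = timer.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
        Thread.sleep(windowMillis);
        future.cancel(false);
        timer.shutdownNow();
        return runs.get();
    }
}
```

Without the catch the task runs exactly once and the schedule is dead; with it, the task keeps firing every 10 ms.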
Each firing triggers a checkpoint. The actual checkpoint work is done in:
public CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props, @Nullable String externalSavepointLocation, boolean isPeriodic)
How a triggered checkpoint is actually executed will be covered in a later article.
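Summary
The overall process consists of the following steps:
- Set the checkpoint interval on env, which enables checkpointing.
- Apply the checkpoint-related configuration while building the JobGraph.
- While building the ExecutionGraph, create the CheckpointCoordinator and register the CheckpointCoordinatorDeActivator listener.
- When the job status changes, start checkpoint scheduling (RUNNING) or stop it (any other status).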