Flink源碼閱讀之Checkpoint周期觸發(fā)過程

Flink的checkpoint原理就不說了,官網以及博客都有說明,有興趣的同學可以自行查閱嚎莉。
本文主要從源碼層面分析一下checkpoint是如何周期性觸發(fā)的倚聚。

分析

首先通過如下配置啟用CheckPoint

env.enableCheckpointing(1000);

不設置佛玄,則默認CheckPoint間隔為-1,即不啟用CheckPoint

/** Periodic checkpoint triggering interval. */
private long checkpointInterval = -1; // disabled

如不設置則在構建jobGraph時checkpointInterval 會被賦值為Long.MAX_VALUE
StreamingJobGraphGenerator#configureCheckpointing

long interval = cfg.getCheckpointInterval();
if (interval < MINIMAL_CHECKPOINT_TIME) {
    // interval of max value means disable periodic checkpoint
    interval = Long.MAX_VALUE;
}

同時會初始化三個列表:

// collect the vertices that receive "trigger checkpoint" messages.
        // currently, these are all the sources
        List<JobVertexID> triggerVertices = new ArrayList<>();

        // collect the vertices that need to acknowledge the checkpoint
        // currently, these are all vertices
        List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());

        // collect the vertices that receive "commit checkpoint" messages
        // currently, these are all vertices
        List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());

其中, triggerVertices 只包含那些作為 source 的節(jié)點,ackVertices 和 commitVertices 均包含所有的節(jié)點赘娄。

checkpoint的進行是由CheckpointCoordinator發(fā)起的,在 ExecutionGraphBuilder#buildGraph 中宏蛉,如果作業(yè)開啟了 checkpoint遣臼,則會調用 ExecutionGraph.enableCheckpointing() 方法, 這里會創(chuàng)建 CheckpointCoordinator 對象,并注冊一個作業(yè)狀態(tài)的監(jiān)聽 CheckpointCoordinatorDeActivator, CheckpointCoordinatorDeActivator 會在作業(yè)狀態(tài)發(fā)生改變時得到通知拾并。

ExecuteGraph#enableCheckpointing
checkpointCoordinator = new CheckpointCoordinator(...);

// interval of max long value indicates disable periodic checkpoint,
// the CheckpointActivatorDeactivator should be created only if the interval is not max value
if (interval != Long.MAX_VALUE) {
   // the periodic checkpoint scheduler is activated and deactivated as a result of
   // job status changes (running -> on, all other states -> off)
   registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
}

當作業(yè)狀態(tài)發(fā)送變更時揍堰,CheckpointCoordinatorDeActivator 會得到通知并執(zhí)行notifyJobStatusChange

//ExecuteGraph.java
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
   if (jobStatusListeners.size() > 0) {
      final long timestamp = System.currentTimeMillis();
      final Throwable serializedError = error == null ? null : new SerializedThrowable(error);

      for (JobStatusListener listener : jobStatusListeners) {
         try {
            listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
         } catch (Throwable t) {
            LOG.warn("Error while notifying JobStatusListener", t);
         }
      }
   }
}

//CheckpointCoordinatorDeActivator.java
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
   if (newJobStatus == JobStatus.RUNNING) {
      // start the checkpoint scheduler
      coordinator.startCheckpointScheduler();
   } else {
      // anything else should stop the trigger for now
      coordinator.stopCheckpointScheduler();
   }
}

開始觸發(fā)checkpoint調度

    // --------------------------------------------------------------------------------------------
    //  Periodic scheduling of checkpoints
    // --------------------------------------------------------------------------------------------
public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // make sure all prior timers are cancelled
            stopCheckpointScheduler();

            periodicScheduling = true;
            currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
        }
    }
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
        return timer.scheduleAtFixedRate(
            new ScheduledTrigger(),
            initDelay, baseInterval, TimeUnit.MILLISECONDS);
    }

new ScheduledTrigger()這是調度線程,這里也是用的ScheduledThreadPoolExecutor線程池來調度線程執(zhí)行嗅义,和周期性生成水印調度一樣屏歹。run方法如下

private final class ScheduledTrigger implements Runnable {

        @Override
        public void run() {
            try {
                triggerCheckpoint(System.currentTimeMillis(), true);
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint for job {}.", job, e);
            }
        }
    }

定時觸發(fā)checkpoint,具體執(zhí)行checkpoint過程在

public CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props, @Nullable String externalSavepointLocation, boolean isPeriodic)

具體觸發(fā)checkpoint執(zhí)行的過程之碗,后面文章再作分析西采。

總結

具體的過程包括以下幾點:

  1. 通過env配置checkpoint的間隔,即開啟checkpoint继控。
  2. 在構建jobgraph時進行checkpoint相關配置。
  3. 構建executiongraph時初始化CheckpointCoordinator 對象并注冊CheckpointCoordinatorDeActivator監(jiān)聽胖眷。
  4. 作業(yè)狀態(tài)發(fā)生變化時武通,開啟checkpoint調度。
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末珊搀,一起剝皮案震驚了整個濱河市冶忱,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌境析,老刑警劉巖囚枪,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異劳淆,居然都是意外死亡链沼,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進店門沛鸵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來括勺,“玉大人缆八,你說我怎么就攤上這事〖埠矗” “怎么了奈辰?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長乱豆。 經常有香客問我奖恰,道長,這世上最難降的妖魔是什么宛裕? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任瑟啃,我火速辦了婚禮,結果婚禮上续滋,老公的妹妹穿的比我還像新娘翰守。我一直安慰自己,他們只是感情好疲酌,可當我...
    茶點故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布蜡峰。 她就那樣靜靜地躺著,像睡著了一般朗恳。 火紅的嫁衣襯著肌膚如雪湿颅。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天粥诫,我揣著相機與錄音油航,去河邊找鬼。 笑死怀浆,一個胖子當著我的面吹牛谊囚,可吹牛的內容都是我干的。 我是一名探鬼主播执赡,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼镰踏,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了沙合?” 一聲冷哼從身側響起奠伪,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎首懈,沒想到半個月后绊率,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡究履,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年滤否,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片挎袜。...
    茶點故事閱讀 38,716評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡顽聂,死狀恐怖肥惭,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情紊搪,我是刑警寧澤蜜葱,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站耀石,受9級特大地震影響牵囤,放射性物質發(fā)生泄漏。R本人自食惡果不足惜滞伟,卻給世界環(huán)境...
    茶點故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一揭鳞、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧梆奈,春花似錦野崇、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至清酥,卻和暖如春扶镀,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背焰轻。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工臭觉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人辱志。 一個月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓蝠筑,卻偏偏與公主長得像,于是被迫代替她去往敵國和親揩懒。 傳聞我的和親對象是個殘疾皇子菱肖,可洞房花燭夜當晚...
    茶點故事閱讀 43,612評論 2 350