本文內(nèi)容是基于Flink 1.9來講解纱意。Flink使用checkpoint檢查點來保證Exactly-Once語義的琅绅,這篇文章會從源碼角度分析下checkpoint怎么觸發(fā)的厦幅。首先說下checkpoint原理契讲,可以直接參考數(shù)據(jù)流容錯官方文檔唬党。
概述
Flink分布式快照的一個核心概念就是barrier,barrier會和數(shù)據(jù)記錄一起下發(fā)到stream中橙垢,是一種特殊的消息,非常輕量伦糯。一個barrier就能把數(shù)據(jù)記錄分到不同的snapshot中柜某,不同snapshot對應(yīng)的barrier可能同時在stream中嗽元,這也就意味著可以同時制作多個snapshot。
下面這兩個圖給出了barrier分割snapshot的原理喂击,以及checkpoint制作過程
checkpoint制作
1. 源碼入口 ExecutionGraphBuilder#buildGraph
在生成ExecutionGraph的時候剂癌,如果設(shè)置了開啟checkpoint,那在buildGraph方法中會調(diào)用executionGraph.enableCheckpointing方法翰绊,咱們接著看下這個方法做了什么珍手。
- 構(gòu)建CheckpointCoordinator對象。CheckpointCoordinator主要負責(zé)觸發(fā)checkpoint制作并接收task的ack信息辞做,收集并維護task發(fā)送的ack state全局視圖琳要。
- 設(shè)置checkpoint狀態(tài)追蹤器
- 創(chuàng)建CheckpointCoordinatorDeActivator對象,該對象會監(jiān)控JobStatus是activates/deactivates來啟動/停止checkpoint scheduler
2. CheckpointCoordinatorDeActivator是怎么觸發(fā)checkpoint制作的
2.1 看下CheckpointCoordinatorDeActivator#jobStatusChanges方法
@Override
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
if (newJobStatus == JobStatus.RUNNING) {
// start the checkpoint scheduler
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();
}
}
如果監(jiān)控到JobStatus變成RUNNING秤茅,就會調(diào)用CheckpointCoordinator的startCheckpointScheduler方法稚补,最終會調(diào)用CheckpointCoordinator#triggerCheckpoint方法,triggerCheckpoint主要做了以下幾件事情
- 制作checkpoint之前框喳,會做一些檢查
?? - 排隊checkpoint數(shù)是否超過設(shè)定的允許最大并行checkpoint數(shù)
?? - 兩次checkpoint制作間隔是否滿足要求 - check需要觸發(fā)checkpoint所有source task是不是都是running狀態(tài)
- check需要對checkpoint進行ack的所有task(包括source+非source)是不是都是running狀態(tài)
- 對checkpointID加1
- 初始化CheckpointStorageLocation
- 往pendingCheckpoints這個map中新增一個PendingCheckpoint课幕, key是checkpointID。 PendingCheckpoint表示checkpoint制作已經(jīng)開始五垮,不過部分task還沒有進行ack乍惊,一旦所有的task都對這次checkpoint進行了ack,PendingCheckpoint就變成了CompletedCheckpoint放仗。
- 注冊cancellerHandle润绎,比如checkpoint超時就cancel掉
- 對于所有的source task,調(diào)用execution.triggerCheckpoint方法
從全局看了checkpoint怎么觸發(fā)的诞挨,然后再看下具體Task粒度怎么做checkpoint的莉撇,入口就是上面提到的execution.triggerCheckpoint,真正調(diào)用是Task#triggerCheckpointBarrier方法
2.2 SourceTask怎么觸發(fā)checkpoint的
Task#triggerCheckpointBarrier方法會再去調(diào)用StreamTask#performCheckpoint方法惶傻,接著看StreamTask#performCheckpoint方法源碼
if (isRunning) {
if (checkpointOptions.getCheckpointType().isSynchronous()) {
syncSavepointLatch.setCheckpointId(checkpointId);
if (advanceToEndOfTime) {
advanceToEndOfEventTime();
}
}
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
operatorChain.prepareSnapshotPreBarrier(checkpointId);
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastCheckpointBarrier(
checkpointId,
checkpointMetaData.getTimestamp(),
checkpointOptions);
// Step (3): Take the state snapshot. This should be largely asynchronous, to not
// impact progress of the streaming topology
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
return true;
}
從源碼可以看出棍郎,checkpoint的制作主流程是異步來執(zhí)行的,主要分為
- checkpoint制作前的準(zhǔn)備银室,允許operators可以做一些前置工作涂佃,這個前置工作最好是沒有,也可以有很小一部分
- 往下游發(fā)barrier(向該Source Task的所有輸出蜈敢,以廣播的方式發(fā)送barrier辜荠,CheckpointBarrier有三個對象組成checkpointId+timestamp+CheckpointOptions(CheckpointType和CheckpointStorageLocationReference))
- 開始做checkpoint,這個步驟應(yīng)該盡可能的異步扶认,以免影響job執(zhí)行
2.3 SourceTask怎么制作checkpoint的
checkpoint制作跟進去看下StreamTask#executeCheckpointing代碼
startSyncPartNano = System.nanoTime();
try {
// 1. 同步執(zhí)行侨拦,會依次調(diào)用每一個operator的StreamOperator.snapshotState,返回結(jié)果是一個runnable future辐宾。
// 根據(jù)checkpoint配置成同步模式和異步模式的區(qū)別狱从,這個future可能處于完成狀態(tài)膨蛮,也可能處于未完成狀態(tài)
for (StreamOperator<?> op : allOperators) {
checkpointStreamOperator(op);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
checkpointMetaData.getCheckpointId(), owner.getName());
}
startAsyncPartNano = System.nanoTime();
checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
// 2. 異步執(zhí)行,如果是checkpoint配置成同步執(zhí)行季研,這里實際上所有的runnable future都是已經(jīng)完成的狀態(tài)
// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
owner,
operatorSnapshotsInProgress,
checkpointMetaData,
checkpointMetrics,
startAsyncPartNano);
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
if (LOG.isDebugEnabled()) {
LOG.debug("{} - finished synchronous part of checkpoint {}. " +
"Alignment duration: {} ms, snapshot duration {} ms",
owner.getName(), checkpointMetaData.getCheckpointId(),
checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
checkpointMetrics.getSyncDurationMillis());
}
}
其實checkpoint制作分為兩部分
- 同步checkpoint
依次調(diào)用每一個operator的StreamOperator.snapshotState敞葛,返回一個OperatorSnapshotFutures,并將其放入operatorSnapshotsInProgress与涡。各operator的snapshotState方法的實現(xiàn)是一樣的惹谐,可以看下AbstractStreamOperator#snapshotState,這個方法首先創(chuàng)建了一個OperatorSnapshotFutures驼卖,并且最終會把OperatorSnapshotFutures return出去氨肌,看下OperatorSnapshotFutures的成員變量,來大體了解下snapshotState主要是存什么內(nèi)容
@Nonnull
private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;
@Nonnull
private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;
@Nonnull
private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;
@Nonnull
private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
從成員變量就可以看出酌畜,snapshot中主要存儲四部分內(nèi)容怎囚,分別為keyed manage state, keyed raw state, operator manage state 和 operator raw state。
然后具體看下DefaultOperatorStateBackendSnapshotStrategy#snapshot中都做了什么桥胞?
- 把注冊的所有OperatorState(就是ListState) 和 BroadcastState 做深度拷貝
- 將實際的寫入操作封裝在一個異步的 FutureTask中恳守,如果不啟用異步checkpoint模式,那么這個 FutureTask 在同步階段就會立刻執(zhí)行贩虾。這個 FutureTask 的主要工作包括:
?? 1. 獲取注冊的operator state 以及 broadcast operator state 信息
?? 2. 把上面的state寫入checkpoint中
?? 3. 把狀態(tài)元信息寫入writtenStatesMetaData這個map中
?? 4. 關(guān)閉輸出流催烘,返回狀態(tài)句柄OperatorStreamStateHandle
- 異步checkpoint
同步checkpoint完成之后,會將具體的信息寫入異步實現(xiàn)缎罢,該流程可以看下AsyncCheckpointRunnable#run方法
- 調(diào)用new OperatorSnapshotFinalizer(snapshotInProgress);方法等待各future執(zhí)行完成
- 調(diào)用reportCompletedSnapshotStates方法完成checkpoint狀態(tài)上報JobManager伊群,也就是對checkpoint進行ack
2.4 非SourceTask是怎么觸發(fā)checkpoint的
對于SourceTask,checkpint的執(zhí)行是由JM觸發(fā)屁使,對于非SourceTask在岂,則是依靠上游Task的checkpointBarrier消息觸發(fā)。上面也提到了蛮寂,在SourceTask進行checkpoint時,會向下游發(fā)送CheckpointBarrier消息易茬,而下游的task正是拿到該消息后酬蹋,進行checkpoint操作。
我們知道所有的Task都是繼承StreamTask抽莱,在Task執(zhí)行的時候范抓,會去調(diào)用StreamTask#run方法,下面看下run方法邏輯
/**
* Runs the stream-tasks main processing loop.
*/
private void run() throws Exception {
final ActionContext actionContext = new ActionContext();
while (true) {
if (mailbox.hasMail()) {
Optional<Runnable> maybeLetter;
while ((maybeLetter = mailbox.tryTakeMail()).isPresent()) {
Runnable letter = maybeLetter.get();
if (letter == POISON_LETTER) {
return;
}
letter.run();
}
}
processInput(actionContext);
}
}
processInput是用來處理數(shù)據(jù)的食铐,StreamInputProcessor是處理數(shù)據(jù)記錄的接口匕垫,它有三個實現(xiàn)類,分別為StreamOneInputProcessor虐呻,StreamTwoInputProcessor和StreamTwoInputSelectableProcessor象泵。我們以StreamOneInputProcessor#processInput進行分析怎么處理barrier消息的
@Override
public boolean processInput() throws Exception {
initializeNumRecordsIn();
StreamElement recordOrMark = input.pollNextNullable();
if (recordOrMark == null) {
input.isAvailable().get();
return !checkFinished();
}
int channel = input.getLastChannel();
checkState(channel != StreamTaskInput.UNSPECIFIED);
processElement(recordOrMark, channel);
return true;
}
關(guān)于barrier的調(diào)用鏈?zhǔn)牵篿nput.pollNextNullable() --> StreamTaskNetworkInput#pollNextNullable --> checkpointedInputGate.pollNext() --> barrierHandler.processBarrier寞秃,CheckpointBarrierHandler有兩個實現(xiàn)類
- CheckpointBarrierAligner,對應(yīng)EXACTLY_ONCE語義
- CheckpointBarrierTracker偶惠,對應(yīng)AT_LEAST_ONCE語義
接下來看下CheckpointBarrierAligner#processBarrier方法春寿,涉及到以下處理流程
- 如果下游Task的InputChannels為1(消費的partition數(shù)為1),直接不需要對齊忽孽,return
- 第一個barrier到達Task绑改,調(diào)用beginNewAlignment方法開始checkpoint,然后調(diào)用onBarrier方法兄一,將numBarriersReceived加1厘线,把對應(yīng)的inputchannel的狀態(tài)blockedChannels[channelIndex]置為true,開始barrier對齊操作
- 不斷調(diào)用onBarrier方法出革,直到所有的barrier都到齊造壮。
- 等所有的barrier到齊之后,會進行 (1) 調(diào)用releaseBlocksAndResetBarriers蹋盆,該方法把blockedChannels的狀態(tài)都設(shè)置成false费薄,并且把numBarriersReceived設(shè)置成0,方便下一次checkpoint制作栖雾,barrier對齊結(jié)束楞抡。 (2) 調(diào)用notifyCheckpoint方法,開始該Task 的checkpoint制作析藕,checkpoint制作還是調(diào)用StreamTask#performCheckpoint方法召廷,涉及到 i): snapshot前準(zhǔn)備工作 ii:) 往下游廣播barrier iii:) 做快照
至此,checkpoint的觸發(fā)以及制作原理基本介紹完了
小結(jié)
本篇文章簡單介紹了下checkpoint是怎么被調(diào)用账胧,以及SourceTask怎么從開始被觸發(fā)竞慢,以及非SourceTask怎么觸發(fā)和制作checkpoint的。
- barrier是以Task也就是operatorChain為單位下發(fā)的
- snapshot中的state是以operator為單位制作的
- 非SourceTask只有把它消費的所有partition的barrier都接收到之后治泥,才會進行checkpoint的制作
推薦文章列表
checkpoint總結(jié)不錯的文章 Flink CheckPoint詳細過程