Flink 源碼:Checkpoint 元數(shù)據(jù)詳解

本文僅為筆者平日學(xué)習(xí)記錄之用,侵刪
原文:https://mp.weixin.qq.com/s/KNVRs4k8nH8JM5sgOWd9Lw

本文是 Flink 源碼解析系列,通過閱讀本文你能 get 到以下點:

  • Flink 任務(wù)從 Checkpoint 處恢復(fù)流程概述
  • Checkpoint 元數(shù)據(jù)詳解
  • 從源碼層分析:JM 該如何合理地給每個 subtask 分配 State,讓 TM 去恢復(fù)

?
聲明:筆者的源碼分析都是基于 flink-1.9.0 release 分支尾序,其實閱讀源碼不用非常在意版本的問題,各版本的主要流程基本都是類似的。如果熟悉了某個版本的源碼潜必,之后新版本有變化,我們重點看一下變化之處即可沃但。
筆者閱讀源碼中會加很多中文注釋磁滚,對源碼感興趣且有需要的同學(xué)可以關(guān)注一下筆者的 github 倉庫:https://github.com/1996fanrui/flink/tree/feature/source-code-read-1-9-0
注釋都在 feature/source-code-read-1-9-0 分支,之后也會持續(xù)更新
?

閱讀本文之前宵晚,強烈建議閱讀《從 KeyGroup 到 Rescale》垂攘,本文講述 KeyedState 恢復(fù)時需要用到 KeyGroup 相關(guān)知識。

一淤刃、Job 從 Checkpoint 處恢復(fù)流程概述

Flink 任務(wù)從 Checkpoint 或 Savepoint 處恢復(fù)的整體流程簡單概述晒他,如下所示:

  • 首先客戶端提供 Checkpoint 或 Savepoint 的目錄
  • JM 從給定的目錄中找到 _metadata 文件(Checkpoint 的元數(shù)據(jù)文件)
  • JM 解析元數(shù)據(jù)文件,做一些校驗逸贾,將信息寫入到 zk 中陨仅,然后準備從這一次 Checkpoint 中恢復(fù)任務(wù)
  • JM 拿到所有算子對應(yīng)的 State,給各個 subtask 分配 StateHandle(狀態(tài)文件句柄)
  • TM 啟動時铝侵,也就是 StreamTask 的初始化階段會創(chuàng)建 KeyedStateBackend 和 OperatorStateBackend
  • 創(chuàng)建過程中就會根據(jù) JM 分配給自己的 StateHandle 從 dfs 上恢復(fù) State

由上述流程可知灼伤,F(xiàn)link 任務(wù)從 Checkpoint 恢復(fù)不只是說 TM 去 dfs 拉狀態(tài)文件即可,需要 JM 先給各個 TM 分配 State咪鲜,由于牽扯到修改并發(fā)颜阐,所以 JM 端給各個 subtask 分配 State 的流程也是比較復(fù)雜的披粟。本系列源碼分析會陸續(xù)分析上述所有列出的流程,東西比較多摘投。

本文從 Checkpoint 的元數(shù)據(jù)入手開始分析,同時分析一下 JM 拿到 Checkpoint 元數(shù)據(jù)后該如何合理地給每個 subtask 分配 State,讓 TM 去恢復(fù)。

二、 Checkpoint 元數(shù)據(jù)介紹

開始介紹 Checkpoint 元數(shù)據(jù)崔慧,這里是從元數(shù)據(jù)的設(shè)計角度來分析。后期在分析 Checkpoint 過程的源碼時穴墅,會詳細介紹這些元數(shù)據(jù)是如何一步步生成的惶室。

Checkpoint 完整的元數(shù)據(jù)

CompletedCheckpoint 封裝了一次 Checkpoint 完整的元數(shù)據(jù)信息,CompletedCheckpoint 類包含的屬性如下所示:

public class CompletedCheckpoint implements Serializable {
 private final JobID job;
 private final long checkpointID;
 private final long timestamp;
 private final long duration;

 /** 本次 Checkpoint 中每個算子的 ID 及算子對應(yīng) State 信息 */
 private final Map<OperatorID, OperatorState> operatorStates;
 private final CheckpointProperties props;
 private final Collection<MasterState> masterHookStates;
 // Checkpoint 存儲路徑
 private final CompletedCheckpointStorageLocation storageLocation;
 // 元數(shù)據(jù)句柄
 private final StreamStateHandle metadataHandle;
  // Checkpoint 目錄地址
 private final String externalPointer;
 private transient volatile CompletedCheckpointStats.DiscardCallback discardCallback;
}

CompletedCheckpoint 類的大部分屬性都是見名之意的玄货,重要的屬性就是本次 Checkpoint 中每個算子的 OperatorID 及算子對應(yīng) State 信息皇钞。再次強調(diào)源碼中的 OperatorState 這個類不是 Flink 中常說的 OperatorState,而是指代 Operator 算子對應(yīng)的 State 信息松捉。

算子級別的元數(shù)據(jù)

OperatorState 類的屬性如下所示:

public class OperatorState implements CompositeStateHandle {
 private final OperatorID operatorID;
  
 // checkpoint 時算子的并行度
 private final int parallelism;

 // checkpoint 時算子的 maxParallelism
 private final int maxParallelism;

 // 當(dāng)前 Operator 算子內(nèi)夹界,每個 subtask 持有的 State 信息,
 // 這里 map 對應(yīng)的 key 為 subtaskId隘世,value 為 subtask 對應(yīng)的 State,
 // OperatorState 表示一個 算子級別的可柿,OperatorSubtaskState 是 subtask 級別的。
 // 如果一個算子有 10 個并行度丙者,那么 OperatorState 有 10 個 OperatorSubtaskState
 private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;
}

OperatorState 中包含算子對應(yīng)的 OperatorID复斥,checkpoint 時算子的并行度和 maxParallelism。

?
注:這里專門強調(diào)是 Checkpoint 時的并行度和 maxParallelism械媒,因為這個 OperatorState 本來就是從 Checkpoint 中恢復(fù)出來的目锭,所以這些元數(shù)據(jù)都屬于 Checkpoint 發(fā)生時 Job 的一些屬性。
有可能新運行的 Job 調(diào)整了算子的并行度纷捞,當(dāng)然如果新 Job 的 maxParallelism 發(fā)生變化而且是人為設(shè)定痢虹,任務(wù)是無法恢復(fù)的(至于為什么無法恢復(fù),前面的源碼已經(jīng)分析過了)主儡。
?

OperatorState 中還使用一個 Map 保存當(dāng)前 Operator 算子內(nèi)每個 subtask 持有的 State 信息世分,這里 map 對應(yīng)的 key 為 subtaskId,value 為 subtask 對應(yīng)的 State缀辩。OperatorState 表示算子級別的 State 元數(shù)據(jù)信息,OperatorSubtaskState 表示 subtask 級別的 State 元數(shù)據(jù)信息踪央。如果一個算子有 10 個并行度臀玄,那么 OperatorState 內(nèi)就會包含 10 個 OperatorSubtaskState。

subtask 級別的元數(shù)據(jù)

OperatorSubtaskState 類的屬性如下所示:

public class OperatorSubtaskState implements CompositeStateHandle {

 private final StateObjectCollection<OperatorStateHandle> managedOperatorState;

 private final StateObjectCollection<OperatorStateHandle> rawOperatorState;

 private final StateObjectCollection<KeyedStateHandle> managedKeyedState;

 private final StateObjectCollection<KeyedStateHandle> rawKeyedState;

 private final long stateSize;
}

OperatorSubtaskState 類的屬性看起來非常明了畅蹂,Managed 兩種 Raw 兩種健无,Raw 這里不關(guān)注,所以這里重點關(guān)注 Managed 下的兩種 State液斜,即:managedOperatorState 和 managedKeyedState累贤。

?
因為沒用過 Raw 所以看源碼略過了叠穆,不過從源碼實現(xiàn)來看無論是 Raw 還是 Managed,他們的 Checkpoint 的元數(shù)據(jù)管理都是類似的臼膏,區(qū)別主要在于使用上硼被。
?

managedOperatorState 元數(shù)據(jù)維護在 OperatorStateHandle 中,managedKeyedState 元數(shù)據(jù)存儲維護在 KeyedStateHandle 中渗磅。所以下面重點關(guān)注 OperatorStateHandle 和 KeyedStateHandle嚷硫,這兩部分內(nèi)容較多,所以另外開了大標(biāo)題始鱼。

這里同時留一個小疑問:OperatorSubtaskState 中維護的所有狀態(tài)句柄仔掸,都是一個 Collection 集合,為什么是集合呢医清?稍后回答起暮。

三、 OperatorStateHandle 介紹

OperatorStateHandle 是個接口会烙,它只有一種實現(xiàn)负懦,即:OperatorStreamStateHandle。所以具體分析 OperatorStreamStateHandle持搜。

OperatorStreamStateHandle 相關(guān)源碼如下所示:

public class OperatorStreamStateHandle implements OperatorStateHandle {
  // map 中 key 是 StateName密似,value 是 StateMetaInfo
 // StateMetaInfo 中封裝的是當(dāng)前 State 在狀態(tài)文件所處的 offset 和 Mode
 private final Map<String, StateMetaInfo> stateNameToPartitionOffsets;
  
 // OperatorState 狀態(tài)文件句柄,可以讀出狀態(tài)數(shù)據(jù)
 private final StreamStateHandle delegateStateHandle;
}

// OperatorState 分布模式的枚舉
enum Mode {
 // 對應(yīng) getListState API
 SPLIT_DISTRIBUTE,
 // 對應(yīng) getUnionListState API
 UNION,
 // 對應(yīng) BroadcastState
 BROADCAST
}

class StateMetaInfo implements Serializable {
  // 當(dāng)前 State 在狀態(tài)文件所處的 offset 和 Mode
 private final long[] offsets;
  // OperatorState 的分布模式
 private final Mode distributionMode;
}

OperatorStreamStateHandle 維護了 OperatorState 狀態(tài)文件句柄葫盼,根據(jù) StreamStateHandle 可以讀出狀態(tài)文件的數(shù)據(jù)残腌,即當(dāng)前 subtask 可以從這個文件中讀取狀態(tài)數(shù)據(jù)。OperatorStreamStateHandle 還維護了一個 map贫导,map 中 key 是 StateName抛猫,value 是 StateMetaInfo。StateMetaInfo 中封裝的是當(dāng)前 State 在狀態(tài)文件所處的 offset 和 Mode孩灯。這里有了文件和 offset闺金,就可以讀出所有 State 的狀態(tài)數(shù)據(jù)了。

在介紹一些 Mode 這個枚舉峰档,Mode 表示 OperatorState 分布模式的枚舉败匹,有三種類型,其中前兩種 SPLIT_DISTRIBUTEUNION 都對應(yīng)的是 ListState讥巡,只不過恢復(fù)模式不同掀亩。

  • SPLIT_DISTRIBUTE 表示每個 subtask 只獲取一部分狀態(tài)數(shù)據(jù),即:所有 subtask 的狀態(tài)加起來是一份全量的欢顷。
  • UNION 表示每個 subtask 獲取一份全量的狀態(tài)數(shù)據(jù)槽棍。

Mode 還有一種類型是 BROADCAST,對應(yīng)的是 Flink 中的 BroadcastState。

思考題

這里再拋出分析 OperatorSubtaskState 源碼時留下的問題:OperatorSubtaskState 中維護的所有狀態(tài)句柄炼七,都是一個 Collection 集合缆巧,例如 managedOperatorState 的類型是 StateObjectCollection<OperatorStateHandle> ,為什么這里是集合而不直接是 OperatorStateHandle 呢豌拙?難道 OperatorStateHandle 不能把當(dāng)前 subtask 的所有 managedOperatorState 封裝起來嗎陕悬?

答:OperatorStateHandle 內(nèi)維護了一個 map,保存了 Checkpoint 時當(dāng)前 Operator 當(dāng)前 subtask 內(nèi)所有 managedOperatorState 的元數(shù)據(jù)信息姆蘸。其實這里可以不用集合墩莫,一個 OperatorStateHandle 就足以保存 managedOperatorState 的元數(shù)據(jù)信息了。OperatorSubtaskState 內(nèi)封裝的是 OperatorStateHandle 的集合逞敷,其實 Checkpoint 生成元信息構(gòu)造 OperatorSubtaskState 時狂秦,給 OperatorSubtaskState 傳遞的也不是 OperatorStateHandle 的集合,傳遞的就是一個 OperatorStateHandle推捐。只不過 OperatorSubtaskState 構(gòu)造器內(nèi)將 OperatorStateHandle 封裝成了集合裂问。

我們可以看一下 OperatorSubtaskState 構(gòu)造器源碼:

public class OperatorSubtaskState implements CompositeStateHandle {

  // 集合構(gòu)造器
 public OperatorSubtaskState(
  @Nonnull StateObjectCollection<OperatorStateHandle> managedOperatorState,
  @Nonnull StateObjectCollection<OperatorStateHandle> rawOperatorState,
  @Nonnull StateObjectCollection<KeyedStateHandle> managedKeyedState,
  @Nonnull StateObjectCollection<KeyedStateHandle> rawKeyedState) {

  this.managedOperatorState = Preconditions.checkNotNull(managedOperatorState);
  this.rawOperatorState = Preconditions.checkNotNull(rawOperatorState);
  this.managedKeyedState = Preconditions.checkNotNull(managedKeyedState);
  this.rawKeyedState = Preconditions.checkNotNull(rawKeyedState);
 }

  // 非集合構(gòu)造器
 public OperatorSubtaskState(
  @Nullable OperatorStateHandle managedOperatorState,
  @Nullable OperatorStateHandle rawOperatorState,
  @Nullable KeyedStateHandle managedKeyedState,
  @Nullable KeyedStateHandle rawKeyedState) {

  this(
   singletonOrEmptyOnNull(managedOperatorState),
   singletonOrEmptyOnNull(rawOperatorState),
   singletonOrEmptyOnNull(managedKeyedState),
   singletonOrEmptyOnNull(rawKeyedState));
 }

 private static <T extends StateObject> StateObjectCollection<T> singletonOrEmptyOnNull(T element) {
  return element != null ? StateObjectCollection.singleton(element) : 
    StateObjectCollection.empty();
 }
}

可以看到 OperatorSubtaskState 有集合構(gòu)造器和非集合構(gòu)造器,非集合的構(gòu)造器會將單個的 StateHandle 封裝成集合牛柒,再調(diào)用集合構(gòu)造器堪簿。

重點:Checkpoint 封裝元數(shù)據(jù)時調(diào)用的就是非集合構(gòu)造器,即每個 Operator 的每個 subtask 對應(yīng)一個 managedOperatorState 的 StateHandle皮壁。問題在于為什么 OperatorSubtaskState 必須把這些 StateHandle 又封裝成集合呢椭更?

這么做為了從 Checkpoint 處恢復(fù)狀態(tài)。舉個例子蛾魄,假如一個 Flink 任務(wù)并行度是 2虑瀑,現(xiàn)在要將其并行度調(diào)為 1,整個流程如下:

  • 對并行度為 2 的 Flink Job 觸發(fā) Checkpoint滴须,生成快照
  • 并行度為 1 的 Flink Job 從 Checkpoint 處恢復(fù)即可

生成的快照是兩個 subtask舌狗,會生成兩個 managedOperatorState 的 StateHandle,而新的 Job 并行度為 1扔水,即:一個 subtask 要恢復(fù)兩個 managedOperatorState 的 StateHandle痛侍。基于此所以 OperatorSubtaskState 需要將 StateHandle 封裝為集合魔市。

講到這里主届,讀者應(yīng)該對 Flink Checkpoint 的元數(shù)據(jù)有一絲感覺了,其實 JM 后續(xù)分配 State 的流程就是給每個 subtask 合理的分配上述這些 StateHandle待德。每個 subtask 接受到的 StateHandle 就是自己要讀取的狀態(tài)數(shù)據(jù)岂膳。

四、 KeyedStateHandle 介紹

KeyedStateHandle 的子類比較多磅网,如下圖所示是 KeyedStateHandle 及其子類:

KeyedStateHandle 及其子類

其中 KeyGroupsStateHandle 最為常用,應(yīng)用于 Fs 或 RocksDB 的 Full 模式筷屡。

IncrementalRemoteKeyedStateHandle 應(yīng)用于 RocksDB 的 Increment 模式涧偷。

IncrementalLocalKeyedStateHandle 和 DirectoryKeyedStateHandle 是對 RocksDB Increment 模式的優(yōu)化簸喂。RocksDB 在 Increment 模式開啟 local-recovery,可以在本地目錄存放一份 State燎潮,當(dāng)從 Checkpoint 處恢復(fù)時喻鳄,不用去 dfs 去拉,而是直接從本地目錄恢復(fù) State确封。

?
附加小彩蛋:如果之前看過 TM 端基于 RocksDB Increment Remote 模式恢復(fù)任務(wù)時除呵,就會發(fā)現(xiàn)第一步是先去 dfs 去拉狀態(tài)文件,拉完文件后狀態(tài)文件就到了本地目錄爪喘,其實就變成了 Local 模式颜曾,第二步就復(fù)用 Local 模式的 Checkpoint 恢復(fù)邏輯。后續(xù)也會有 TM 端恢復(fù) State 的源碼分析秉剑。
?

多種實現(xiàn)類泛豪,下面具體分析一下 KeyGroupsStateHandle 和 IncrementalRemoteKeyedStateHandle。Increment Local 模式本文就不分析了侦鹏,Increment Local 模式與 Remote 模式比較類似诡曙,區(qū)別在于 Local 保存的文件句柄變成了本地的一個目錄而已。

KeyGroupsStateHandle 介紹

KeyGroupsStateHandle 類相關(guān)源碼如下所示:

public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle {
 // KeyedState 狀態(tài)文件句柄略水,可以讀出狀態(tài)數(shù)據(jù)
 private final StreamStateHandle stateHandle;
 // KeyGroupRangeOffsets 封裝了當(dāng)前負責(zé)的 KeyGroupRange
  // 及 KeyGroupRange 中每個 KeyGroup 對應(yīng)的 State 在 stateHandle 的 offset 位置
 private final KeyGroupRangeOffsets groupRangeOffsets;
}

public class KeyGroupRangeOffsets implements Iterable<Tuple2<Integer, Long>> 
        , Serializable {
 // 當(dāng)前 Operator 當(dāng)前 subtask 負責(zé)的 KeyGroupRange
 private final KeyGroupRange keyGroupRange;

 // 數(shù)組保存了每個 KeyGroup 對應(yīng)的 offset价卤,
  // 所以:數(shù)組的長度 == keyGroupRange 中 KeyGroup 的數(shù)量
 private final long[] offsets;
}

KeyGroupsStateHandle 封裝了 KeyedState 狀態(tài)文件句柄,可以讀出狀態(tài)數(shù)據(jù)渊涝,同時還封裝了 KeyGroupRangeOffsets慎璧。KeyGroupRangeOffsets 封裝了當(dāng)前負責(zé)的 KeyGroupRange 及 KeyGroupRange 中每個 KeyGroup 對應(yīng)的 State 在 stateHandle 的 offset 位置。

也就是說 KeyGroupsStateHandle 可以讀取文件驶赏,也知道每個 KeyGroup 在文件中的 offset炸卑,理所當(dāng)然就可以在文件中讀到每個 KeyGroup 的狀態(tài)數(shù)據(jù)了。

在 《從 KeyGroup 到 Rescale》文章中講到 KeyedState 分發(fā)時是以 KeyGroup 為最小單元的煤傍,即:不可能將同一個 KeyGroup 的數(shù)據(jù)分到兩個 subtask 中盖文,所以恢復(fù)狀態(tài)時可以讀到每個 KeyGroup 的數(shù)據(jù)就足夠了,并不需要讀到每個 Key 的數(shù)據(jù)蚯姆。

KeyGroupsStateHandle 模式下五续,JM 該如何給各 subtask 分配 State?

這里順便可以想一下龄恋,KeyGroupsStateHandle 模式下疙驾,JM 會怎么去給 TM 分配 State 呢?

如果不修改并發(fā)郭毕,每個新的 subtask 負責(zé)的 KeyGroupRange 與之前舊的 subtask 相同它碎,所以 JM 直接把 Checkpoint 中保存的 KeyGroupsStateHandle 分配給新的給 subtask 即可。

修改并發(fā)的情況可能稍微復(fù)雜,舉個例子:現(xiàn)在當(dāng)前 subtask 負責(zé)的 KeyGroupRange(10,19) 即負責(zé)的 KeyGroupId 范圍是 10~19扳肛,那么 offsets[] 數(shù)組中就有 10 個元素傻挂,假設(shè) offsets 是 {100,110,120,130,140,150,160,170,180,190}。

新的任務(wù) subtask A 負責(zé) KeyGroupRange(10,14)挖息,subtask B 負責(zé) KeyGroupRange(15,19)金拒,那么 JM 會將這個狀態(tài)文件 KeyGroupsStateHandle 分發(fā)給 subtask A 和 subtask B,讓 subtask A 和 subtask B 都可以通過 KeyGroupsStateHandle 中的 stateHandle 讀到狀態(tài)文件套腹,但是發(fā)給 subtask A 和 subtask B 的 KeyGroupRangeOffsets 完全不一樣了绪抛。subtask A 收到的是 KeyGroupRange(10,14),offsets = {100,110,120,130,140}电禀;subtask B 收到的是 KeyGroupRange(15,19) offsets = {150,160,170,180,190}幢码。

當(dāng)然 subtask A 負責(zé)的 KeyGroupRange 不是這么規(guī)整,可能新的 subtask A 對應(yīng)的是 KeyGroupRange(8,14)鞭呕。這樣會存在:subtask A 恢復(fù)的 State 來自于舊任務(wù)的兩個 subtask蛤育。所以 subtask A 會收到兩個 KeyGroupsStateHandle。第一個 KeyGroupsStateHandle 負責(zé)的 KeyGroupRange(8,9)葫松,第二個 KeyGroupsStateHandle 負責(zé)的 KeyGroupRange(10,14)瓦糕。

其實上述過程就是 Flink 任務(wù)修改并發(fā)的情況下,狀態(tài)如何恢復(fù)腋么,如果不理解上述過程源碼是看不懂的咕娄,源碼只是對原理的一種實現(xiàn)而已,看源碼能幫助我們確認我們對知識的理解是否正確珊擂,也能吸取開源項目優(yōu)秀的設(shè)計思想圣勒。

這里也再次證實了為什么 OperatorSubtaskState 必須把 StateHandle 封裝成集合,因為一個新的 subtask 可能要恢復(fù)多個舊 subtask 的狀態(tài)數(shù)據(jù)摧扇。

IncrementalRemoteKeyedStateHandle 介紹

IncrementalRemoteKeyedStateHandle 應(yīng)用于 RocksDB 增量 Checkpoint 模式圣贸,所以在介紹 IncrementalRemoteKeyedStateHandle 中具體存儲的數(shù)據(jù)之前,先來描述一下 RocksDB 增量 Checkpoint 的實現(xiàn)原理扛稽。(之前寫過吁峻,這里直接粘過來)

RocksDB 增量 Checkpoint 實現(xiàn)原理

RocksDB 是一個基于 LSM 實現(xiàn)的 KV 數(shù)據(jù)庫。LSM 全稱 Log Structured Merge Trees在张,LSM 樹本質(zhì)是將大量的磁盤隨機寫操作轉(zhuǎn)換成磁盤的批量寫操作來極大地提升磁盤數(shù)據(jù)寫入效率用含。一般 LSM Tree 實現(xiàn)上都會有一個基于內(nèi)存的 MemTable 介質(zhì),所有的增刪改操作都是寫入到 MemTable 中帮匾,當(dāng) MemTable 足夠大以后啄骇,將 MemTable 中的數(shù)據(jù) flush 到磁盤中生成不可變且內(nèi)部有序的 ssTable(Sorted String Table)文件,全量數(shù)據(jù)保存在磁盤的多個 ssTable 文件中瘟斜。HBase 也是基于 LSM Tree 實現(xiàn)的缸夹,HBase 磁盤上的 HFile 就相當(dāng)于這里的 ssTable 文件痪寻,每次生成的 HFile 都是不可變的而且內(nèi)部有序的文件∶魑矗基于 ssTable 不可變的特性槽华,才實現(xiàn)了增量 Checkpoint,具體流程如下所示:

第一次 Checkpoint 時生成的狀態(tài)快照信息包含了兩個 sstable 文件:sstable1 和 sstable2 及 Checkpoint1 的元數(shù)據(jù)文件 MANIFEST-chk1趟妥,所以第一次 Checkpoint 時需要將 sstable1、sstable2 和 MANIFEST-chk1 上傳到外部持久化存儲中佣蓉。第二次 Checkpoint 時生成的快照信息為 sstable1披摄、sstable2、sstable3 及元數(shù)據(jù)文件 MANIFEST-chk2勇凭,由于 sstable 文件的不可變特性疚膊,所以狀態(tài)快照信息的 sstable1、sstable2 這兩個文件并沒有發(fā)生變化虾标,sstable1寓盗、sstable2 這兩個文件不需要重復(fù)上傳到外部持久化存儲中,因此第二次 Checkpoint 時璧函,只需要將 sstable3 和 MANIFEST-chk2 文件上傳到外部持久化存儲中即可傀蚌。這里只將新增的文件上傳到外部持久化存儲,也就是所謂的增量 Checkpoint蘸吓。

基于 LSM Tree 實現(xiàn)的數(shù)據(jù)庫為了提高查詢效率善炫,都需要定期對磁盤上多個 sstable 文件進行合并操作,合并時會將已刪除的库继、過期的以及舊版本的數(shù)據(jù)進行清理箩艺,從而降低 sstable 文件的總大小。圖中可以看到第三次 Checkpoint 時生成的快照信息為sstable3宪萄、sstable4艺谆、sstable5 及元數(shù)據(jù)文件 MANIFEST-chk3, 其中新增了 sstable4 文件且 sstable1 和 sstable2 文件合并成 sstable5 文件拜英,因此第三次 Checkpoint 時只需要向外部持久化存儲上傳 sstable4静汤、sstable5 及元數(shù)據(jù)文件 MANIFEST-chk3。

基于 RocksDB 的增量 Checkpoint 從本質(zhì)上來講每次 Checkpoint 時只將本次 Checkpoint 新增的快照信息上傳到外部的持久化存儲中聊记,依靠的是 LSM Tree 中 sstable 文件不可變的特性撒妈。

IncrementalRemoteKeyedStateHandle 元數(shù)據(jù)介紹

了解了基于 RocksDB 實現(xiàn)的增量 Checkpoint 原理,我們知道 Checkpoint 實際存儲的是 RocksDB 數(shù)據(jù)庫的 sst 文件和 RocksDB 數(shù)據(jù)庫的元數(shù)據(jù)文件排监。所以 IncrementalRemoteKeyedStateHandle 的設(shè)計如下所示:

public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateHandle {
  // 每個 RocksDB 數(shù)據(jù)庫的唯一 ID
 private final UUID backendIdentifier;

  // 這個 RocksDB 數(shù)據(jù)庫負責(zé)的 KeyGroupRange
 private final KeyGroupRange keyGroupRange;

 private final long checkpointId;

 // RocksDB 真正存儲數(shù)據(jù)的 sst 文件
 private final Map<StateHandleID, StreamStateHandle> sharedState;

 // RocksDB 數(shù)據(jù)庫的一些元數(shù)據(jù)
 private final Map<StateHandleID, StreamStateHandle> privateState;

 // 本次 Checkpoint 元數(shù)據(jù)的 StateHandle
 private final StreamStateHandle metaStateHandle;
}

// StateHandleID 只是簡單地對 sst 文件名做了封裝
public class StateHandleID extends StringBasedID {
 private static final long serialVersionUID = 1L;
 // keyString 為 sst 文件名
 public StateHandleID(String keyString) {
  super(keyString);
 }
}

注:StateHandleID 只是簡單地對 sst 文件名做了封裝狰右,所以后續(xù)用到 StateHandleID 的地方就直接說 sst 文件名。

IncrementalRemoteKeyedStateHandle 包含了對應(yīng)的 RocksDB 數(shù)據(jù)庫的一個標(biāo)識 ID舆床,負責(zé)的 KeyGroupRange嫁佳,本次 Checkpoint 的 ID谷暮,還有 RocksDB 的 sst 文件信息和 RocksDB 數(shù)據(jù)庫的一些元數(shù)據(jù)湿弦。

其中 sharedState 是一個 Map颊埃,Map 的 key 是 sst 文件名班利,value 為 sst 的文件句柄罗标。privateState 也是一個 map闯割,Map 的 key 是 RocksDB 數(shù)據(jù)庫元數(shù)據(jù)的文件名纽谒,value 為相對應(yīng)的文件句柄。

用上面案例來講央勒,第二次 Checkpoint崔步,即 Checkpoint Id 為 2缎谷,此時對應(yīng)的 sst 為 1.sst列林、2.sst希痴、3.sst砌创,RocksDB 元數(shù)據(jù)文件為 MANIFEST-chk2。雖然第二次 Checkpoint 時只上傳了一個 3.sst窥岩,但 sharedState 需要保存三個 sst 的信息,因為這三個 sst 都屬于本次 Checkpoint 的一部分數(shù)據(jù)文件疚鲤。

Increment 模式下,JM 該如何給各 subtask 分配 State语淘?

如果不修改并發(fā)惶翻,每個新的 subtask 負責(zé)的 KeyGroupRange 與之前舊的 subtask 相同鹅心,所以 JM 直接把 Checkpoint 中保存的 IncrementalRemoteKeyedStateHandle 分配給新的給 subtask 即可旭愧。TM 端恢復(fù)時直接把本次 Checkpoint 對應(yīng)的 sst 和 RocksDB 的元數(shù)據(jù)文件拉取到本地输枯,整個 RocksDB 數(shù)據(jù)庫就可以開始工作了桃熄。

修改并發(fā)的情況就比較復(fù)雜了瞳收,與 KeyGroupsStateHandle 模式有非常大的區(qū)別螟深。還是用修改并行度的例子來分析:

假設(shè)舊任務(wù)并發(fā)為 2:

  • subtask a 負責(zé) KeyGroupRange(0,9)
  • subtask b 負責(zé) KeyGroupRange(10,19)

新任務(wù)并發(fā)為 3:

  • subtask A 負責(zé) KeyGroupRange(0,6)
  • subtask B 負責(zé) KeyGroupRange(7,13)
  • subtask C 負責(zé) KeyGroupRange(14,19)

先分析一下新的 subtask A 和 subtask B 狀態(tài)該如何恢復(fù)卧惜,subtask C 與 subtask A 恢復(fù)流程極其相似咽瓷,讀者可以自行分析茅姜。subtask A 負責(zé)的 KeyGroupRange(0,6)钻洒,恢復(fù)時數(shù)據(jù)來源于 subtask a 負責(zé) KeyGroupRange(0,9)素标,但是可以拿著 StateHandle 只從 dfs 上讀取 KeyGroupRange(0,6) 的數(shù)據(jù)嗎寓免?

不行计维,我們看到了 IncrementalRemoteKeyedStateHandle 維護的只是 RocksDB 的一份數(shù)據(jù)快照蜈首,維護了一堆 sst欢策,實際上可能每個 sst 都有 KeyGroup 0~9 的數(shù)據(jù)猬腰,不能直接截斷 sst 去拉取 KeyGroup 0~6 的部分。而且 sst 是 RocksDB 自己生成的鼠冕,不是 Flink 序列化生成的懈费。

而 KeyGroupsStateHandle 模式可以做到只拉取部分數(shù)據(jù)是因為 KeyGroupsStateHandle 模式下是按照 KeyGroup 為單位對數(shù)據(jù)進行存儲的票罐,元數(shù)據(jù)中存儲了每個 KeyGroup 對應(yīng)的 offset 值。

那 IncrementalRemoteKeyedStateHandle 模式該如何恢復(fù)呢蚕礼?

  • 對于新的 subtask A 雖然只負責(zé) KeyGroup 0~6 的部分奠蹬,但必須將 KeyGroup 0~9 的數(shù)據(jù)全拉取到 TM 本地囤躁,基于這些數(shù)據(jù)建立出一個 RocksDB 實例割以,讀出自己想要的數(shù)據(jù)猜极。

  • 新的 subtask B 負責(zé) KeyGroupRange(7,13)跟伏,恢復(fù)時數(shù)據(jù)來源于舊 subtask a 負責(zé) KeyGroupRange(0,9) 和舊 subtask b 負責(zé) KeyGroupRange(10,19)携龟,所以需要將兩個 subtask 對應(yīng)的數(shù)據(jù)全部拉取到本地峡蟋,建立兩個 RocksDB 實例,讀取自己想要的數(shù)據(jù)蓬戚。

通過上述流程豫喧,其實發(fā)現(xiàn)了 JM 要做的就是:分析新的 subtask 與 Checkpoint 中保存的 StateHandle 負責(zé)的 KeyGroupRange 只要有重合,那么這個 StateHandle 就需要分配給新的 subtask鸟妙。例如:新的 subtask B(7~13)與舊的 (0,9) 和 (10,19)兩個 StateHandle 有重合郭厌,那么這兩個 StateHandle 都要完整地分配給 subtask B折柠。具體怎么讀取數(shù)據(jù),冗余數(shù)據(jù)的如何裁剪交給 TM 來做,在后續(xù) TM 恢復(fù)狀態(tài)部分詳細介紹(備注:裁剪的邏輯非常有意思)困乒。

以上就是 IncrementalRemoteKeyedStateHandle 模式下 JM 給 subtask 分配 StateHandle 的原理吱抚。

五、總結(jié)

本文開頭介紹了 Flink 任務(wù)從 Checkpoint 處恢復(fù)流程概述婚苹,隨后通過源碼介紹了 Checkpoint 的元數(shù)據(jù)。最后從源碼層分析了多種模式下:JM 該如何合理地給每個 subtask 分配 State谭企,讓 TM 去恢復(fù)廓译。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市债查,隨后出現(xiàn)的幾起案子非区,更是在濱河造成了極大的恐慌,老刑警劉巖盹廷,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件征绸,死亡現(xiàn)場離奇詭異,居然都是意外死亡俄占,警方通過查閱死者的電腦和手機管怠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缸榄,“玉大人渤弛,你說我怎么就攤上這事∩醮” “怎么了暮芭?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長欲低。 經(jīng)常有香客問我,道長畜晰,這世上最難降的妖魔是什么砾莱? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮凄鼻,結(jié)果婚禮上腊瑟,老公的妹妹穿的比我還像新娘。我一直安慰自己块蚌,他們只是感情好闰非,可當(dāng)我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著峭范,像睡著了一般财松。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天辆毡,我揣著相機與錄音菜秦,去河邊找鬼。 笑死舶掖,一個胖子當(dāng)著我的面吹牛球昨,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播眨攘,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼主慰,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了鲫售?” 一聲冷哼從身側(cè)響起共螺,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎龟虎,沒想到半個月后璃谨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡鲤妥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年佳吞,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片棉安。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡底扳,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出贡耽,到底是詐尸還是另有隱情衷模,我是刑警寧澤,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布蒲赂,位于F島的核電站阱冶,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏滥嘴。R本人自食惡果不足惜木蹬,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望若皱。 院中可真熱鬧镊叁,春花似錦、人聲如沸走触。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽互广。三九已至敛腌,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背迎瞧。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工夸溶, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人凶硅。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓缝裁,卻偏偏與公主長得像,于是被迫代替她去往敵國和親足绅。 傳聞我的和親對象是個殘疾皇子捷绑,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,033評論 2 355