Flink 源碼:TM 端恢復(fù)及創(chuàng)建 KeyedState 的流程

本文僅為筆者平日學(xué)習(xí)記錄之用予颤,侵刪
原文:https://mp.weixin.qq.com/s/eaALnpd_qHQg6fxI12fQjg

本文會(huì)詳細(xì)分析 TM 端恢復(fù)及創(chuàng)建 KeyedState 的流程蚁鳖,恢復(fù)過(guò)程會(huì)分析 RocksDB 和 Fs 兩種 StateBackend 的恢復(fù)流程娃豹,創(chuàng)建流程會(huì)介紹 Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來(lái)盘寡。

一、 RocksDBKeyedStateBackend 創(chuàng)建流程

從 RocksDBStateBackend 類的 createKeyedStateBackend 方法開始知牌,createKeyedStateBackend 方法源碼主要加載一些配置和創(chuàng)建 RocksDBKeyedStateBackend油狂,就不貼出來(lái)了。簡(jiǎn)單介紹一下 createKeyedStateBackend 方法的功能:

  • 加載 RocksDB JNI library
  • 初始化 RocksDB 的本地?cái)?shù)據(jù)目錄企锌,對(duì)應(yīng)的是 RocksDB state.backend.rocksdb.localdir 參數(shù)配置的目錄
  • new RocksDBKeyedStateBackendBuilder 的構(gòu)造器榆浓,所有狀態(tài)的恢復(fù)及初始化都封裝在 build 方法中。

RocksDBKeyedStateBackendBuilder 的 build 方法刪減后源碼如下所示:

@Override
public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException { 
 // 維護(hù) StateName 與 StateInfo 的映射關(guān)系
 LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> 
    kvStateInformation = new LinkedHashMap<>();
 RocksDB db = null;
 AbstractRocksDBRestoreOperation restoreOperation = null;

 SnapshotStrategy<K> snapshotStrategy;
 try {
  // 保存 CheckpointID 與 sst 的映射
  SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
  long lastCompletedCheckpointId = -1L;

  // 準(zhǔn)備 instanceBasePath 目錄撕攒,用于本地狀態(tài)存儲(chǔ)
  prepareDirectories();

  // 根據(jù) restoreStateHandles 決定三種恢復(fù)模式:
  // 1陡鹃、 RocksDB 無(wú)需恢復(fù),直接啟動(dòng)
  // 2抖坪、 RocksDB 增量 Checkpoint 的狀態(tài)恢復(fù)
  // 3萍鲸、 RocksDB 全量 Checkpoint 的狀態(tài)恢復(fù)
  restoreOperation = getRocksDBRestoreOperation(XXX);

  // 恢復(fù)狀態(tài),并打開 db擦俐,具體 執(zhí)行三種不同恢復(fù)流程
  RocksDBRestoreResult restoreResult = restoreOperation.restore();
  db = restoreResult.getDb();

  // RocksDB 的增量 Checkpoint 模式脊阴,則獲取 sst 文件,
  if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) {
   backendUID = restoreResult.getBackendUID();
   materializedSstFiles = restoreResult.getRestoredSstFiles();
   lastCompletedCheckpointId = restoreResult.getLastCompletedCheckpointId();
  }

  // 初始化 Savepoint 和 Checkpoint 的 snapshot 策略
  snapshotStrategy = initializeSavepointAndCheckpointStrategies(XXX);
 } catch (Throwable e) {
  // State 初始化異常蚯瞧,做一些清理操作
 }
 InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(
  keyGroupRange,
  numberOfKeyGroups
 );
  // kvStateInformation 會(huì)傳遞給 RocksDBKeyedStateBackend
 return new RocksDBKeyedStateBackend<>(XXX);
}

build 方法中同樣定義了一個(gè) Map kvStateInformation 用于維護(hù) StateName 與 StateInfo 的映射關(guān)系嘿期。之后會(huì)準(zhǔn)備本地目錄,用于本地狀態(tài)存儲(chǔ)埋合。

getRocksDBRestoreOperation 方法會(huì)創(chuàng)建 RestoreOperation备徐,源碼如下所示:

// 根據(jù) restoreStateHandles 決定三種恢復(fù)模式:
// 1、 RocksDB 直接啟動(dòng)無(wú)需恢復(fù)
// 2甚颂、 增量 Checkpoint 的 RocksDB 恢復(fù)
// 3蜜猾、 全量 Checkpoint 的 RocksDB 恢復(fù)
private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation() {
  // restoreStateHandles 為空表示沒有 State 需要恢復(fù),構(gòu)造 NoneRestoreOperation
 if (restoreStateHandles.isEmpty()) {
  return new RocksDBNoneRestoreOperation<>(X);
 }

 KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
  // StateHandle 是 Increment 模式西设,則構(gòu)造 IncrementalRestoreOperation
  // 否則構(gòu)造 FullRestoreOperation
 if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
  return new RocksDBIncrementalRestoreOperation<>(XXX);
 } else {
  return new RocksDBFullRestoreOperation<>(XXX);
 }
}

getRocksDBRestoreOperation 方法會(huì)根據(jù) restoreStateHandles 決定三種恢復(fù)模式:

  • restoreStateHandles 為空表示沒有 State 需要恢復(fù)瓣铣,構(gòu)造 NoneRestoreOperation,即:不恢復(fù) State 的方式贷揽,直接啟動(dòng)

  • restoreStateHandles 不為空的情況下棠笑,判斷 StateHandle 的類型, StateHandle 是 Increment 模式禽绪,則構(gòu)造 IncrementalRestoreOperation蓖救;否則構(gòu)造 FullRestoreOperation

build 方法下一步就會(huì)執(zhí)行具體 RocksDBRestoreOperation 的 restore 方法了,三種 RocksDBRestoreOperation 的 restore 流程完全不同印屁,且比較復(fù)雜循捺,后續(xù)單獨(dú)介紹。后續(xù)會(huì)執(zhí)行 initializeSavepointAndCheckpointStrategies 方法初始化 Savepoint 和 Checkpoint 的 snapshot 策略雄人,方法的返回值是 SnapshotStrategy 類型从橘,SnapshotStrategy 封裝了 Checkpoint 和 Savepoint 兩個(gè)策略念赶。如果用戶觸發(fā) Checkpoint,則執(zhí)行 Checkpoint 策略恰力,觸發(fā) Savepoint叉谜,則執(zhí)行 Checkpoint 策略。

initializeSavepointAndCheckpointStrategies 方法源碼如下所示:

// SnapshotStrategy 封裝了 Checkpoint 和 Savepoint 兩個(gè)策略踩萎。
class SnapshotStrategy<K> {
 final RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy;
 final RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy;
}

private SnapshotStrategy<K> initializeSavepointAndCheckpointStrategies(XXX) {
 // 創(chuàng)建 Savepoint 的 snapshot 類為 Full Snapshot 策略
 RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy = 
    new RocksFullSnapshotStrategy<>(XXX);

 RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy;
 // 如果開啟了增量 Checkpoint停局,
  // 則 Checkpoint 的 snapshot 類為 Increment Snapshot 策略
 if (enableIncrementalCheckpointing) {
  checkpointSnapshotStrategy = new RocksIncrementalSnapshotStrategy<>(XXX);
 } else {
  // 未開始增量 Checkpoint,
    // 則 Checkpoint 的 snapshot 為 Savepoint 的 snapshot 策略 
  checkpointSnapshotStrategy = savepointSnapshotStrategy;
 }
  // 封裝兩個(gè)策略到 SnapshotStrategy 中
 return new SnapshotStrategy<>(checkpointSnapshotStrategy, savepointSnapshotStrategy);
}

可以看到 RocksDB Savepoint 的 snapshot 類永遠(yuǎn)為 Full Snapshot 策略香府,如果開啟增量 Checkpoint董栽,則 Checkpoint 的 snapshot 類為 Increment Snapshot 策略。未開始增量 Checkpoint企孩,則 Checkpoint 的 snapshot 為 Savepoint 的 snapshot 策略锭碳,最后封裝兩個(gè)策略到 SnapshotStrategy 中。

這塊可以得出一個(gè)結(jié)論:使用 RocksDBStateBackend 時(shí)勿璃,如果不開啟增量 Checkpoint工禾,那么觸發(fā) Savepoint 和 Checkpoint 都是相同的策略,即:都是 Full Snapshot 模式蝗柔。

build 方法中上述流程如果任何階段拋出異常,都認(rèn)為 State 初始化異常民泵,會(huì)做一些清理操作癣丧,并認(rèn)為本次任務(wù)恢復(fù)失敗。如果一切都成功栈妆,最后會(huì)構(gòu)建出 RocksDBKeyedStateBackend胁编。

build 方法結(jié)束!下面重點(diǎn)關(guān)注三種不同的 RestoreOperation 具體是怎么 restore 的鳞尔。

二嬉橙、 RocksDB 的 NoneRestoreOperation 恢復(fù)流程

NoneRestoreOperation 表示沒有 State 需要恢復(fù),直接啟動(dòng)寥假,所以該模式下 restore 流程特別簡(jiǎn)單市框。

restore 方法源碼如下所示:

@Override
public RocksDBRestoreResult restore() throws Exception {
 openDB();
 return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle, 
                                  nativeMetricMonitor, -1, null, null);
}

直接打開一個(gè)空的 RocksDB 就恢復(fù)完成返回結(jié)果。

三糕韧、 RocksDB 的 IncrementalRestoreOperation 恢復(fù)流程

RocksDBIncrementalRestoreOperation 類的 restore 方法源碼如下所示:

@Override
public RocksDBRestoreResult restore() throws Exception {

 if (restoreStateHandles == null || restoreStateHandles.isEmpty()) {
  return null;
 }

 final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();

 // restoreStateHandles 數(shù)量大于 1枫振,
 // 或者 恢復(fù)的 keyGroupRange 與當(dāng)前負(fù)責(zé)的 keyGroupRange 不同,
 // 則使用 Rescaling 模式萤彩。如果沒有改并發(fā)粪滤,則關(guān)閉 Rescaling 模式
 boolean isRescaling = (restoreStateHandles.size() > 1 ||
  !Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));

 if (isRescaling) {
  // Rescaling 開啟的恢復(fù)模式,相當(dāng)于改并發(fā)恢復(fù)雀扶,需要依賴 KeyGroup 恢復(fù)
  restoreWithRescaling(restoreStateHandles);
 } else {
  // Rescaling 關(guān)閉的恢復(fù)模式杖小,相當(dāng)于沒有改變并發(fā),直接恢復(fù) sst 即可
  // 沒有改并發(fā)就只有一個(gè) StateHandle,所以這里只需要將 firstStateHandle 當(dāng)做參數(shù)傳遞即可
  restoreWithoutRescaling(theFirstStateHandle);
 }
 return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
  nativeMetricMonitor, lastCompletedCheckpointId, backendUID, restoredSstFiles);
}

如果 restoreStateHandles 為 null 或者集合為空予权,直接返回 null昂勉。否則開始后面的恢復(fù)流程。

恢復(fù)流程第一步檢測(cè)是否是 rescale 模式伟件,換言之:檢測(cè)是否新舊 Job 之間修改并發(fā)了硼啤。isRescaling 為 true 表示修改并發(fā)了,isRescaling 為 false 表示沒有修改并發(fā)斧账。

判斷是否修改并發(fā)的邏輯:

代碼中判斷邏輯:如果 restoreStateHandles 集合中元素?cái)?shù)量大于 1 或者恢復(fù)的 keyGroupRange 與當(dāng)前負(fù)責(zé)的 keyGroupRange 不同谴返,則開啟 Rescaling 模式。否則關(guān)閉 Rescaling 模式。

分析一波:如果不修改并發(fā)魏滚,那么新 Job 的 subtask 與舊 Job 的 subtask 是一對(duì)一的關(guān)系奸披,每個(gè) subtask 只會(huì)恢復(fù)舊 Job 對(duì)應(yīng)的那一個(gè) subtask 的 StateHandle,且新舊 subtask 負(fù)責(zé)的 KeyGroupRange 是相同的渠抹。

代碼中 restoreStateHandles 集合中元素?cái)?shù)量表示要恢復(fù)的 KeyedStateHandle 的數(shù)據(jù),數(shù)量大于 1 表示當(dāng)前 subtask 要恢復(fù)舊 Job 的多個(gè) subtask 的 KeyedStateHandle闪萄。所以 restoreStateHandles.size() > 1 必然修改了并發(fā)梧却。

代碼中拿要恢復(fù)的 StateHandle 的 KeyGroupRange 與當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange 進(jìn)行比較,兩者不同則表示新舊 subtask 負(fù)責(zé)的 KeyGroupRange 不是完全相同的败去,也可以推斷出一定修改并發(fā)了放航。

例如:舊的 subtask 0 負(fù)責(zé)的 KeyGroupRange(0,9),新的 subtask 0 負(fù)責(zé)的 KeyGroupRange(0,6)圆裕,雖然 restoreStateHandles 集合中只有一個(gè) StateHandle广鳍,但是 KeyGroupRange 變了,也可以推斷出并發(fā)改變了吓妆。

判斷完是否修改并發(fā)赊时,就會(huì)按照是否修改并發(fā),進(jìn)行兩種不同的模式開始恢復(fù)流程行拢。如源碼所示祖秒,restoreWithoutRescaling 方法表示為修改并發(fā)的恢復(fù)模式,這里有個(gè)小細(xì)節(jié)舟奠,restoreWithoutRescaling 方法的參數(shù)是 KeyedStateHandle 類型狈涮,而不用傳整個(gè)集合,因?yàn)椴恍薷牟l(fā)只會(huì)恢復(fù)一個(gè) KeyedStateHandle鸭栖。而 restoreWithRescaling 方法的參數(shù)就是 KeyedStateHandle 的集合類型歌馍。

下面詳細(xì)介紹兩種恢復(fù)模式。

未修改并發(fā)的恢復(fù)流程

restoreWithoutRescaling 方法源碼如下所示:

private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
 if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
  // 遠(yuǎn)程的 KeyedStateHandle 恢復(fù)
  IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
   (IncrementalRemoteKeyedStateHandle) keyedStateHandle;

  // 保存 StateHandle 對(duì)應(yīng)的那一次 Checkpoint 對(duì)應(yīng)的文件狀態(tài)晕鹊,
  // 包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系
  restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);

  /**
   * 從遠(yuǎn)程恢復(fù) State 的過(guò)程:
   *  1松却、 本地創(chuàng)建 tmp 目錄
   *  2暴浦、 從遠(yuǎn)程拉取 sst 文件到本地,將 遠(yuǎn)程的 StateHandle 轉(zhuǎn)換為 本地 的 StateHandle
   *  3晓锻、 調(diào)用 restoreFromLocalState 方法歌焦,從 local 恢復(fù) State
   *  4、 清理 tmp 文件
   */
  restoreFromRemoteState(incrementalRemoteKeyedStateHandle);

 } else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
  // Local 的 KeyedStateHandle 恢復(fù)
  IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
   (IncrementalLocalKeyedStateHandle) keyedStateHandle;

  // 保存 StateHandle 對(duì)應(yīng)的那一次 Checkpoint 對(duì)應(yīng)的文件狀態(tài)砚哆,
  // 包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系
  restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);

  // 從 local 恢復(fù) State 的方法
  restoreFromLocalState(incrementalLocalKeyedStateHandle);
 } else {
  throw new BackendBuildingException("XXX");
 }
}

restoreWithoutRescaling 方法中首先會(huì)區(qū)分 KeyedStateHandle 到底是 IncrementalRemoteKeyedStateHandle 類型還是 IncrementalLocalKeyedStateHandle独撇,區(qū)別在于一個(gè)是 Remote 一個(gè)是 Local。正常從 dfs 上恢復(fù)就屬于 Remote 模式躁锁,但是 RocksDB 增量 Checkpoint 有個(gè) local-recovery 的優(yōu)化纷铣。

local-recovery 是指:Checkpoint 時(shí)會(huì)在本地目錄保留一份狀態(tài)快照,當(dāng)任務(wù)重啟時(shí)避免了從 dfs 上拉取狀態(tài)文件的過(guò)程战转,加速任務(wù)恢復(fù)搜立。

Local 模式的恢復(fù)流程:

restorePreviousIncrementalFilesStatus 方法保存 StateHandle 對(duì)應(yīng)的那一次 Checkpoint 對(duì)應(yīng)的文件狀態(tài),包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系槐秧。

然后調(diào)用 restoreFromLocalState 方法從 Local 恢復(fù) State 即可啄踊。

Remote 模式的恢復(fù)流程:

同樣也是調(diào)用 restorePreviousIncrementalFilesStatus 方法保存 StateHandle 對(duì)應(yīng)的那一次 Checkpoint 對(duì)應(yīng)的文件狀態(tài),包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系刁标。

區(qū)別在于調(diào)用 restoreFromRemoteState 方法從 Remote 恢復(fù) State颠通,但其實(shí) restoreFromRemoteState 最后還是調(diào)用的 restoreFromLocalState。restoreFromRemoteState 方法源碼如下所示:

private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) {
 // 創(chuàng)建臨時(shí)目錄
 final Path tmpRestoreInstancePath = new Path(
  instanceBasePath.getAbsolutePath(),
  UUID.randomUUID().toString());
 try {
  // 從本地恢復(fù)
  restoreFromLocalState(
   // 從遠(yuǎn)程拉取 State 文件到本地
   transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
 } finally {
  cleanUpPathQuietly(tmpRestoreInstancePath);
 }
}

遠(yuǎn)程恢復(fù) State 的過(guò)程如下所示:

  1. 本地創(chuàng)建 tmp 目錄
  2. 從遠(yuǎn)程拉取 sst 文件到本地膀懈,將 遠(yuǎn)程的 StateHandle 轉(zhuǎn)換為 本地 的 StateHandle
  3. 調(diào)用 restoreFromLocalState 方法蒜哀,從 local 恢復(fù) State
  4. 清理 tmp 文件

所以得出的結(jié)論是:Remote 模式相比 Local 模式而言,只是多了一個(gè)從 dfs 上下載文件到本地的過(guò)程吏砂,下載到本地后就轉(zhuǎn)換成了 Local 模式進(jìn)行恢復(fù)。所以先看一下 transferRemoteStateToLocalDirectory 方法是如何下載文件的乘客,之后重點(diǎn)關(guān)注 restoreFromLocalState 即可狐血。

下載狀態(tài)文件到本地流程:

transferRemoteStateToLocalDirectory 方法源碼如下所示:

private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(
 Path temporaryRestoreInstancePath,
 IncrementalRemoteKeyedStateHandle restoreStateHandle) throws Exception {

 // try with resource 的方式創(chuàng)建 RocksDBStateDownloader
 try (RocksDBStateDownloader rocksDBStateDownloader = 
       new RocksDBStateDownloader(numberOfTransferringThreads)) {
  // 具體的從 dfs 上 Download 數(shù)據(jù)到本地
    // 使用線程池,多線程拉取 所有的 sst 文件和 RocksDB 數(shù)據(jù)庫(kù)的元數(shù)據(jù)
  rocksDBStateDownloader.transferAllStateDataToDirectory(
   restoreStateHandle,
   temporaryRestoreInstancePath,
   cancelStreamRegistry);
 }

 // 將 Remote 的 StateHandle 重新構(gòu)建成 Local 的 StateHandle
 return new IncrementalLocalKeyedStateHandle(
  restoreStateHandle.getBackendIdentifier(),
  restoreStateHandle.getCheckpointId(),
    // 使用 DirectoryStateHandle易核,目錄就是之前創(chuàng)建的臨時(shí)數(shù)據(jù)目錄
  new DirectoryStateHandle(temporaryRestoreInstancePath),
  restoreStateHandle.getKeyGroupRange(),
  restoreStateHandle.getMetaStateHandle(),
  restoreStateHandle.getSharedState().keySet());
}

創(chuàng)建 RocksDBStateDownloader 類匈织,見名之意,用于下載 RocksDB 狀態(tài)文件的類牡直。RocksDBStateDownloader 的構(gòu)造參數(shù)是拉取文件的線程數(shù)缀匕,具體可以進(jìn)行配置的。然后使用 RocksDBStateDownloader 去 dfs 上多線程 Download 所有的 sst 文件和 RocksDB 數(shù)據(jù)庫(kù)的元數(shù)據(jù)數(shù)據(jù)到本地的 temporaryRestoreInstancePath 臨時(shí)目錄下碰逸。

拉取完成后乡小,將 Remote 的 StateHandle 重新構(gòu)建成 Local 的 StateHandle,并且使用 DirectoryStateHandle饵史,這里的目錄就是之前創(chuàng)建的臨時(shí)數(shù)據(jù)目錄满钟,剛才下載的數(shù)據(jù)也在該目錄下胜榔。然后就開始

Increment 模式不修改并發(fā),從 Local 恢復(fù) State 流程

restoreFromLocalState 方法源碼如下所示:

// 從本地 State 文件中恢復(fù)狀態(tài)
private void restoreFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
 // 從 State 中獲取元數(shù)據(jù)
 KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(
    localKeyedStateHandle.getMetaDataState());
 List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy
    .getStateMetaInfoSnapshots();

 // 根據(jù) State 的元數(shù)據(jù)湃番,創(chuàng)建或注冊(cè) CF 描述符
 columnFamilyDescriptors = 
    createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);

 Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();

 // 準(zhǔn)備數(shù)據(jù)目錄
 restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);

 // 打開 DB夭织,打開 DB 時(shí)會(huì)將 columnFamilyHandles 填滿,
  // 即獲取到 columnFamilyHandles 集合
 columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
 openDB();

  // 將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù) 映射信息保存到 kvStateInformation 中
 registerColumnFamilyHandles(stateMetaInfoSnapshots);
}

首先從 State 中獲取元數(shù)據(jù)吠撮,根據(jù) State 的元數(shù)據(jù)尊惰,創(chuàng)建或注冊(cè) CF 描述符,然后 restoreInstanceDirectoryFromPath 方法用于準(zhǔn)備數(shù)據(jù)目錄泥兰,準(zhǔn)備數(shù)據(jù)目錄就是將那些從 Checkpoint 處拉取的文件從臨時(shí)目錄拷貝到真正的 DB 目錄弄屡。restoreInstanceDirectoryFromPath 方法在拷貝數(shù)據(jù)時(shí)做了一個(gè)優(yōu)化,以 .sst 結(jié)尾的文件不是真正拷貝逾条,而是做了一個(gè) link琢岩,其他文件真正的拷貝過(guò)去。RocksDB 真正存儲(chǔ)數(shù)據(jù)的是 .sst师脂,這些才是占用空間較大的文件担孔,向其他的元數(shù)據(jù)占用空間較小,所以直接拷貝吃警。

最后 registerColumnFamilyHandles 方法糕篇,將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù)映射信息保存到 kvStateInformation 中。kvStateInformation 是一個(gè) Map酌心,key 為 StateName拌消,value 為 RocksDbKvStateInfo 類型。

RocksDbKvStateInfo 相關(guān)類源碼如下所示:

public static class RocksDbKvStateInfo implements AutoCloseable{
        public final ColumnFamilyHandle columnFamilyHandle;
        public final RegisteredStateMetaInfoBase metaInfo;
}

RocksDbKvStateInfo 封裝了 ColumnFamilyHandle 和 RegisteredStateMetaInfoBase安券,ColumnFamilyHandle 表示 RocksDB CF 句柄墩崩,RegisteredStateMetaInfoBase 中維護(hù)了 State 的 name、類型侯勉、序列化信息等鹦筹。有了 kvStateInformation,就可以根據(jù) StateName 拿到當(dāng)前 State 對(duì)應(yīng)的 CF 句柄讀到數(shù)據(jù)址貌,并拿到其對(duì)應(yīng)的序列化規(guī)則對(duì)讀到的數(shù)據(jù)進(jìn)行反序列化铐拐。

到這里 RocksDB 的 Increment 模式在不改變并發(fā)的情況下,無(wú)論是 Remote 還是 Local练对,數(shù)據(jù)都正潮轶恢復(fù)了(數(shù)據(jù)在本地的 RocksDB 實(shí)例中,可以根據(jù) kvStateInformation 中維護(hù)的信息從 RocksDB 中讀取到數(shù)據(jù))螟凭。

修改并發(fā)的恢復(fù)流程

修改并發(fā)的情況虚青,新的 subtask 可能要恢復(fù)多個(gè) StateHandle 的數(shù)據(jù),也就是多個(gè) RocksDB 實(shí)例的數(shù)據(jù)螺男。最笨的方法是將多個(gè) RocksDB 的數(shù)據(jù)全拉取的本地挟憔,建立多個(gè) RocksDB 實(shí)例钟些,從中讀取出當(dāng)前 subtask 對(duì)應(yīng) KeyGroup 的數(shù)據(jù),寫入到一個(gè)新的 RocksDB 實(shí)例中绊谭。這樣存在一個(gè)問(wèn)題政恍,所有數(shù)據(jù)都是一條條從舊的 RocksDB get 出來(lái),再一條條 put 到一個(gè)新的 RocksDB 中 达传。

恢復(fù) RocksDB 優(yōu)化

FLINK-8790 基于上述方案做了一個(gè)優(yōu)化:首先從多個(gè) RocksDB 實(shí)例中選取一個(gè)最優(yōu)的 RocksDB 實(shí)例篙耗,最優(yōu)的標(biāo)準(zhǔn)是:RocksDB 負(fù)責(zé) KeyGroupRange 與當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange 的交集占 RocksDB 負(fù)責(zé)的 KeyGroupRange 的百分比最高。

舉個(gè)例子:RocksDB a 存儲(chǔ)的 KeyGroupRange(0,9) 的數(shù)據(jù)宪赶,當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange(0,7) 的數(shù)據(jù)宗弯,那么交集就是 KeyGroupRange(0,7)。KeyGroupRange(0,7) 中包含 8 個(gè) KeyGroup搂妻,KeyGroupRange(0,9) 包含 10 個(gè) KeyGroup蒙保,8 /10 為 80%。

此時(shí)就認(rèn)為 RocksDB a 上有 80% 的數(shù)據(jù)是有效的欲主,所有要恢復(fù)的 RocksDB 都做一次上述運(yùn)算邓厕,挑選出分?jǐn)?shù)最高的 RocksDB 實(shí)例。源碼中還加了一層限制扁瓢,重疊率低于 75% 的 RocksDB 會(huì)直接被過(guò)濾掉详恼。通過(guò)上述篩選,可能會(huì)得到一個(gè)相對(duì)來(lái)講最優(yōu)的 RocksDB 做為最終的 RocksDB引几,但是要對(duì)其進(jìn)行裁剪昧互。就拿上述例子來(lái)講,RocksDB 中存儲(chǔ)的 KeyGroupRange(0,9) 的數(shù)據(jù)伟桅,但只需要 KeyGroupRange(0,7) 的數(shù)據(jù)敞掘,所以會(huì)將 KeyGroupRange(8,9) 的數(shù)據(jù)裁掉。當(dāng)然裁剪效率相對(duì)較高楣铁,RocksDB 中 key 的設(shè)計(jì)都是以 KeyGroup 開頭的玖雁,LSM Tree 的底層存儲(chǔ)都是按照 key 有序存儲(chǔ),所以直接按照前綴即可高效裁剪民褂。

篩選最優(yōu) StateHandle 的代碼,參考 RocksDBIncrementalCheckpointUtils 的 chooseTheBestStateHandleForInitial 方法和 STATE_HANDLE_EVALUATOR 函數(shù)式接口疯潭,源碼如下所示:

public static KeyedStateHandle chooseTheBestStateHandleForInitial(
 @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
 @Nonnull KeyGroupRange targetKeyGroupRange) {

 KeyedStateHandle bestStateHandle = null;
 double bestScore = 0;
 // 遍歷所有 KeyedStateHandle
 for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
  // 計(jì)算分?jǐn)?shù)
  double handleScore = STATE_HANDLE_EVALUATOR.apply(rawStateHandle, targetKeyGroupRange);
  if (handleScore > bestScore) {
   // 保存最高分 及 對(duì)應(yīng)的 KeyedStateHandle
   bestStateHandle = rawStateHandle;
   bestScore = handleScore;
  }
 }

 return bestStateHandle;
}

private static final BiFunction<KeyedStateHandle, KeyGroupRange, Double> 
  STATE_HANDLE_EVALUATOR = (stateHandle, targetKeyGroupRange) -> {

 final KeyGroupRange handleKeyGroupRange = stateHandle.getKeyGroupRange();
 // 計(jì)算當(dāng)前 StateHandle 與 目標(biāo) Handle 在 KeyGroup 上的交集
 final KeyGroupRange intersectGroup = handleKeyGroupRange.
    getIntersection(targetKeyGroupRange);

 // 計(jì)算 當(dāng)前 StateHandle 對(duì)應(yīng)的狀態(tài)文件上 有 百分之多少的數(shù)據(jù)應(yīng)該在當(dāng)前 subtask 上
 final double overlapFraction = (double) intersectGroup.getNumberOfKeyGroups() / 
    handleKeyGroupRange.getNumberOfKeyGroups();

 // 概率小于 0.75 返回 -1赊堪,意味著該 StateHandle 是肯定會(huì)被丟棄的
 if (overlapFraction < OVERLAP_FRACTION_THRESHOLD) {
  return -1.0;
 }

 return intersectGroup.getNumberOfKeyGroups() 
    * overlapFraction * overlapFraction;
};

修改并發(fā)的恢復(fù)主流程

restoreWithRescaling 源碼如下所示:

private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) {

 // 選取一個(gè)最好的 StateHandle 用于數(shù)據(jù)初始化,會(huì)有一個(gè)選擇標(biāo)準(zhǔn)打分竖哩,分?jǐn)?shù)最高哭廉,則被選中
 // 分?jǐn)?shù)的計(jì)算規(guī)則:主要依賴 StateHandle 的 KeyGroup 與 當(dāng)前 subtask 處理的 KeyGroup 求一個(gè)交集,看重疊率
 KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.
    chooseTheBestStateHandleForInitial(restoreStateHandles, keyGroupRange);

 // Init base DB instance
 if (initialHandle != null) {
  // 打開選取的認(rèn)為最好的 StateHandle 對(duì)應(yīng)的 db相叁,并對(duì)其多余的 KeyGroup 進(jìn)行裁剪
  restoreStateHandles.remove(initialHandle);
  initDBWithRescaling(initialHandle);
 } else {
  // open 一個(gè)空的 db
  openDB();
 }

 // 將 target 的 startKey 和 endKey 轉(zhuǎn)換成 byte 形式
 byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
 RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(), 
                                                 startKeyGroupPrefixBytes);

 byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
 RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1,
                                                 stopKeyGroupPrefixBytes);

 // 將所有要恢復(fù)的 StateHandle 中對(duì)應(yīng)的 RocksDB 恢復(fù)遵绰,
 // 并將 target 的 startKey 和 endKey 之間的數(shù)據(jù) put 到目標(biāo) db
  // 同時(shí)還需要將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù) 映射信息保存到 kvStateInformation 中
 for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
  XXX
 }
}

首先根據(jù)分?jǐn)?shù)辽幌,挑選一個(gè)最優(yōu)的 StateHandle 作為 RocksDB 初始 DB,initDBWithRescaling 方法會(huì)對(duì)多余的 KeyGroup 進(jìn)行裁剪椿访。如果沒有挑選出來(lái)說(shuō)明都不太優(yōu)乌企,會(huì)直接創(chuàng)建一個(gè)空的 RocksDB 作為初始 DB。

根據(jù)當(dāng)前 subtask 負(fù)責(zé)的 keyGroupRange 計(jì)算出 RocksDB 的 startKey 和 endKey成玫,把其他剩余的所有 StateHandle 對(duì)應(yīng) RocksDB 數(shù)據(jù)庫(kù)一一恢復(fù)加酵,從中讀取出 key 位于 startKey 和 endKey 之間的數(shù)據(jù)插入到初始 DB 中。從 RocksDB 讀取數(shù)據(jù)時(shí)可以直接通過(guò) startKey seek 到指定位置哭当,因?yàn)槭侨钟行虻闹硗螅员闅v過(guò)程中一旦讀到 endKey 以外的數(shù)據(jù),就認(rèn)為遍歷結(jié)束了钦勘,直接退出循環(huán)陋葡。

同時(shí)在恢復(fù)過(guò)程中,需要將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù) 映射信息保存到 kvStateInformation 中彻采。

小優(yōu)化思考

后續(xù)多個(gè) RocksDB 實(shí)例恢復(fù)時(shí)的流程基本是串行操作腐缤,即:從 dfs 上拉取第一個(gè) RocksDB 數(shù)據(jù)文件、本地構(gòu)建 RocksDB 數(shù)據(jù)庫(kù)颊亮,依次讀出 startKey 和 endKey 之間的數(shù)據(jù)插入到新的 RocksDB柴梆。再拉取第二個(gè)、構(gòu)建终惑、讀數(shù)據(jù)绍在、寫數(shù)據(jù)。再拉取第三個(gè)雹有。偿渡。。

思考:所有從 dfs 上拉取 RocksDB 數(shù)據(jù)文件的過(guò)程霸奕,能不能完全異步化溜宽,即:讀寫第一個(gè) RocksDB 的過(guò)程中,就開始拉取第二個(gè)质帅、第三個(gè)等适揉。

四、 RocksDB 的 FullRestoreOperation 恢復(fù)流程

RocksDBFullRestoreOperation 類的 restore 方法的源碼如下所示:

@Override
public RocksDBRestoreResult restore()
 throws IOException, StateMigrationException, RocksDBException {
 // 打開空的 DB
 openDB();
 // 遍歷所有的 restoreStateHandles
 for (KeyedStateHandle keyedStateHandle : restoreStateHandles) {
  if (keyedStateHandle != null) {

   // RocksDB 的 Full 模式與 Savepoint 模式保存的狀態(tài)文件都是 Flink 自己序列化好的問(wèn)題煤惩,
   // 其對(duì)應(yīng)的 KeyedStateHandle 必然是 KeyGroupsStateHandle嫉嘀。
   if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
    throw new IllegalStateException("Unexpected state handle type, " +
     "expected: " + KeyGroupsStateHandle.class +
     ", but found: " + keyedStateHandle.getClass());
   }
   this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
   // 根據(jù) StateHandle 恢復(fù)
   restoreKeyGroupsInStateHandle();
  }
 }
 return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle, 
                                  nativeMetricMonitor, -1, null, null);
}

restore 方法會(huì)打開一個(gè)空的 RocksDB,遍歷所有的 restoreStateHandles魄揉,之前強(qiáng)調(diào)過(guò) Full 模式的 KeyStateHandle 對(duì)應(yīng)的是 KeyGroupsStateHandle 類型剪侮。所以這里進(jìn)行了判斷,如果不是 KeyGroupsStateHandle 類型洛退,直接拋出異常瓣俯,恢復(fù)失敗杰标。然后 restoreKeyGroupsInStateHandle 方法用于恢復(fù)當(dāng)前 keyedStateHandle 對(duì)應(yīng)的數(shù)據(jù)。

restoreKeyGroupsInStateHandle 方法源碼如下所示:

private void restoreKeyGroupsInStateHandle()
 throws IOException, StateMigrationException, RocksDBException {
 try {
  // KeyGroupsStateHandle 的場(chǎng)景彩匕,并不會(huì)直接拉回文件腔剂,而是建立一個(gè)遠(yuǎn)程的輸入流
  currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
  cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
  currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
  // 注冊(cè) StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù) 
    // 映射信息保存到 kvStateInformation 中
  restoreKVStateMetaData();
  // 將當(dāng)前 StateHandle 中屬于當(dāng)前 KeyGroupRange 的數(shù)據(jù) put 到 db 中
  restoreKVStateData();
 } finally {
  if (cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
   IOUtils.closeQuietly(currentStateHandleInStream);
  }
 }
}

restoreKeyGroupsInStateHandle 依然會(huì)將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù)映射信息保存到 kvStateInformation 中,并將 StateHandle 中屬于當(dāng)前 KeyGroupRange 的數(shù)據(jù) put 到 db 中推掸。KeyGroupsStateHandle 中能拿到狀態(tài)文件的輸入流桶蝎,且有保存的每個(gè) KeyGroup 在文件中的 offset,所以可以直接讀取到數(shù)據(jù)并 put 到剛創(chuàng)建的 RocksDB 中谅畅。

小結(jié)

到這里 RocksDB 的三種模式都恢復(fù)完成登渣,RocksDB 的三種恢復(fù)模式下,都會(huì)將 StateName 與具體 State 的信息維護(hù)在 kvStateInformation 中毡泻。后期在創(chuàng)建 State 的過(guò)程中胜茧,也會(huì)通過(guò) kvStateInformation 將創(chuàng)建的 State 與 Checkpoint 中恢復(fù)的 State 進(jìn)行關(guān)聯(lián)。

下面分析 FsStateBackend 模式下的恢復(fù)流程仇味。

五呻顽、 HeapKeyedStateBackend 創(chuàng)建恢復(fù)流程

FsStateBackend 模式下,createKeyedStateBackend 方法創(chuàng)建的是 HeapKeyedStateBackend丹墨,最后調(diào)用的 HeapKeyedStateBackendBuilder 的 build 方法廊遍。

build 方法源碼如下所示:

public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
 // Map of registered Key/Value states
 Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<>();
 // Map of registered priority queue set states
 Map<String, HeapPriorityQueueSnapshotRestoreWrapper> 
    registeredPQStates = new HashMap<>();

 HeapSnapshotStrategy<K> snapshotStrategy = initSnapshotStrategy(XXX);
 InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(
  keyGroupRange,
  numberOfKeyGroups
 );
 HeapRestoreOperation<K> restoreOperation = new HeapRestoreOperation<>(XXX);
 try {
    // 恢復(fù)流程
  restoreOperation.restore();
 } catch (Exception e) {
  throw new BackendBuildingException("XXX", e);
 }
  // 構(gòu)建 HeapKeyedStateBackend
 return new HeapKeyedStateBackend<>(XXX);
}

build 方法中首先創(chuàng)建出 Map 類型的 registeredKVStates,用于保存 StateName 及對(duì)應(yīng)的 StateTable贩挣,每個(gè) State 對(duì)應(yīng)一個(gè) StateTable 存儲(chǔ)狀態(tài)數(shù)據(jù)喉前。將 registeredKVStates 傳遞給 HeapRestoreOperation 用于恢復(fù),最后再傳遞給 HeapKeyedStateBackend 用于后續(xù)使用王财。

HeapRestoreOperation 類的 restore 方法會(huì)遍歷所有的 StateHandle 恢復(fù) State 信息卵迂,維護(hù)映射關(guān)系到 registeredKVStates 中,并恢復(fù) State 信息到具體的 StateTable 中绒净。StateTable 是 Heap 模式真正存儲(chǔ) State 的集合见咒。

小結(jié)

HeapKeyedStateBackend 會(huì)將 StateName 與具體 State 的信息維護(hù)在 registeredKVStates 中。后期在創(chuàng)建 State 的過(guò)程中挂疆,也會(huì)通過(guò) registeredKVStates 將創(chuàng)建的 State 與 Checkpoint 中恢復(fù)的 State 進(jìn)行關(guān)聯(lián)改览。

下面詳細(xì)分析 KeyedState 的創(chuàng)建流程。

六缤言、 用戶定義的 KeyedState 創(chuàng)建流程

可以拿 IntervalJoinOperator 的例子來(lái)分析 KeyedState 的創(chuàng)建流程宝当,IntervalJoin 用于對(duì)兩個(gè)輸入流的數(shù)據(jù)進(jìn)行關(guān)聯(lián),兩個(gè)流先到的數(shù)據(jù)會(huì)放到 buffer 中墨闲,左右兩個(gè)流分別有各自的 buffer今妄,使用 Flink 的 MapState 充當(dāng) buffer郑口。

IntervalJoinOperator 類的 initializeState 方法源碼如下所示:

public void initializeState(StateInitializationContext context) throws Exception {
 super.initializeState(context);

 // 左流的 buffer
 this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
  LEFT_BUFFER,
  LongSerializer.INSTANCE,
  new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
 ));

 // 右流的 buffer
 this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
  RIGHT_BUFFER,
  LongSerializer.INSTANCE,
  new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
 ));
}

initializeState 方法會(huì)創(chuàng)建兩個(gè) State鸳碧,即:LEFT_BUFFER 和 RIGHT_BUFFER盾鳞。

context.getKeyedStateStore().getMapState 實(shí)際調(diào)用 DefaultKeyedStateStore 類的 getMapState 方法,整個(gè)創(chuàng)建 State 第一階段的方法調(diào)用時(shí)序圖如下所示:

調(diào)用關(guān)系從 DefaultKeyedStateStore 類的 getMapState 方法到 KeyedStateFactory 的 createInternalState 方法瞻离。

如下圖所示腾仅,KeyedStateFactory 有兩個(gè)子類,即:HeapKeyedStateBackend 和 RocksDBKeyedStateBackend套利。

如下圖所示推励,HeapKeyedStateBackend 會(huì)對(duì)應(yīng) MemoryStateBackend 和 FsStateBackend,RocksDBKeyedStateBackend 對(duì)應(yīng) RocksDBStateBackend肉迫。

StateBackend 與 keyedStateBackend 以及 operatorStateBackend 的映射關(guān)系

下面詳細(xì)介紹 HeapKeyedStateBackend 和 RocksDBKeyedStateBackend 的 createInternalState 方法是如何創(chuàng)建 State 的验辞。

HeapKeyedStateBackend 創(chuàng)建 State 流程

HeapKeyedStateBackend 類的 createInternalState 方法源碼如下所示:

public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(XXX) {
 StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
 // 注冊(cè)和恢復(fù) StateTable
 StateTable<K, N, SV> stateTable = tryRegisterStateTable(
  namespaceSerializer, stateDesc, 
    getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
 // 根據(jù) stateDesc、StateTable 和 序列化信息喊衫,創(chuàng)建具體的 State
 return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
}

createInternalState 方法中首先會(huì)執(zhí)行 tryRegisterStateTable 方法「注冊(cè)和恢復(fù)」 StateTable跌造,然后根據(jù) stateDesc、StateTable 和 序列化信息族购,創(chuàng)建具體的 State壳贪。

重點(diǎn)的恢復(fù)邏輯就在 tryRegisterStateTable 方法中,tryRegisterStateTable 方法源碼如下所示:

private <N, V> StateTable<K, N, V> tryRegisterStateTable(XXX) {

  // 根據(jù) StateName 從 registeredKVStates 中獲取 StateTable
 StateTable<K, N, V> stateTable = (StateTable<K, N, V>) 
    registeredKVStates.get(stateDesc.getName());
 // stateTable 不為空寝杖,表示從 Checkpoint 中恢復(fù)了當(dāng)前 State
 if (stateTable != null) {
  RegisteredKeyValueStateBackendMetaInfo<N, V> restoredKvMetaInfo = 
      stateTable.getMetaInfo();

  // 主要對(duì) State 的兼容性進(jìn)行校驗(yàn)违施,校驗(yàn)包括:StateName、狀態(tài)類型瑟幕、序列化校驗(yàn)
  // 如果創(chuàng)建的 State 與 Checkpoint 恢復(fù)的 State 不匹配磕蒲,
  // 則拋出異常,不能成功恢復(fù)
  restoredKvMetaInfo.updateSnapshotTransformFactory(snapshotTransformFactory);

  TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
   restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);

  // 檢查 State 的 name 和 Type 是否可以匹配
  restoredKvMetaInfo.checkStateMetaInfo(stateDesc);

  TypeSerializerSchemaCompatibility<V> stateCompatibility =
   restoredKvMetaInfo.updateStateSerializer(newStateSerializer);

  if (stateCompatibility.isIncompatible()) {
   throw new StateMigrationException("XXX");
  }

  stateTable.setMetaInfo(restoredKvMetaInfo);
 } else {
  // 沒有從 Checkpoint 恢復(fù)收苏,則創(chuàng)建 StateTable亿卤,存放到 registeredKVStates 中
  RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new 
      RegisteredKeyValueStateBackendMetaInfo<>(XXX);

  stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
  registeredKVStates.put(stateDesc.getName(), stateTable);
 }

 return stateTable;
}

tryRegisterStateTable 方法首先會(huì)根據(jù) StateName 從 registeredKVStates 中獲取 StateTable 保存到 stateTable 中。

如果 stateTable 不為空鹿霸,表示 Checkpoint 中有當(dāng)前 StateName 對(duì)應(yīng)的狀態(tài)排吴,應(yīng)該恢復(fù),此時(shí)會(huì)對(duì)新舊 Job 的 State 匹配性進(jìn)行檢測(cè)懦鼠,校驗(yàn)項(xiàng)包括:StateName钻哩、狀態(tài)類型、序列化校驗(yàn)肛冶。

否則 stateTable 為空街氢,表示當(dāng)前 StateName 不需要從 Checkpoint 恢復(fù),直接創(chuàng)建一個(gè)新的 StateTable睦袖,存放到 registeredKVStates 中珊肃。

RocksDBKeyedStateBackend 創(chuàng)建 State 流程

RocksDBKeyedStateBackend 類的 createInternalState 方法源碼如下所示:

public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(XXX) {
 StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
 // 注冊(cè)和恢復(fù) State 元信息
 Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> 
  registerResult = tryRegisterKvStateInformation(
  stateDesc, namespaceSerializer, snapshotTransformFactory);
 // 根據(jù) stateDesc、State 元信息 和 RocksDBKeyedStateBackend,創(chuàng)建具體的 State
 return stateFactory.createState(stateDesc, registerResult, 
                                  RocksDBKeyedStateBackend.this);
}

createInternalState 方法中首先會(huì)執(zhí)行 tryRegisterKvStateInformation 方法「注冊(cè)和恢復(fù)」 State 元信息伦乔,然后根據(jù) stateDesc厉亏、State 元信息 和 RocksDBKeyedStateBackend,創(chuàng)建具體的 State烈和。

tryRegisterKvStateInformation 與上述 HeapKeyedStateBackend 類的恢復(fù)邏輯類似爱只,所以不貼代碼了,tryRegisterKvStateInformation 方法的整體邏輯就是:

  1. 首先會(huì)根據(jù) StateName 從 kvStateInformation 中獲取 State 的元信息保存到 oldStateInfo 中招刹。

  2. 如果 stateTable 不為空恬试,表示 Checkpoint 中有當(dāng)前 StateName 對(duì)應(yīng)的狀態(tài),應(yīng)該恢復(fù)疯暑,此時(shí)會(huì)對(duì)新舊 Job 的 State 匹配性進(jìn)行校驗(yàn)训柴。

  3. 否則 stateTable 為空,表示當(dāng)前 StateName 不需要從 Checkpoint 恢復(fù)妇拯,直接在 RocksDB 中創(chuàng)建一個(gè)新的 ColumnFamily 存儲(chǔ)當(dāng)前 State 的數(shù)據(jù)畦粮。

小結(jié)

在恢復(fù)過(guò)程中主要依賴之前創(chuàng)建的 Map,Map 中保存的從 Checkpoint 中恢復(fù)出來(lái)的狀態(tài)數(shù)據(jù)乖阵。如果 Map 中有對(duì)應(yīng) StateName 的數(shù)據(jù)宣赔,則對(duì)其進(jìn)行校驗(yàn)并恢復(fù);如果 Map 中找不到瞪浸,則創(chuàng)建新的儒将。

七、 總結(jié)

本文首先介紹了 RocksDBKeyedStateBackend 創(chuàng)建流程对蒲,并分別介紹 RocksDB 三種模式下的 State 恢復(fù)流程钩蚊,分別是:NoneRestoreOperation、IncrementalRestoreOperation蹈矮、FullRestoreOperation 三種模式砰逻。之后介紹 HeapKeyedStateBackend 恢復(fù)流程。最后介紹了用戶定義的 KeyedState 創(chuàng)建流程泛鸟,創(chuàng)建流程會(huì)介紹 Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來(lái)蝠咆。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市北滥,隨后出現(xiàn)的幾起案子刚操,更是在濱河造成了極大的恐慌,老刑警劉巖再芋,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件菊霜,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡济赎,警方通過(guò)查閱死者的電腦和手機(jī)鉴逞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門记某,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人构捡,你說(shuō)我怎么就攤上這事辙纬。” “怎么了叭喜?”我有些...
    開封第一講書人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)蓖谢。 經(jīng)常有香客問(wèn)我捂蕴,道長(zhǎng),這世上最難降的妖魔是什么闪幽? 我笑而不...
    開封第一講書人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任啥辨,我火速辦了婚禮,結(jié)果婚禮上盯腌,老公的妹妹穿的比我還像新娘溉知。我一直安慰自己,他們只是感情好腕够,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開白布级乍。 她就那樣靜靜地躺著,像睡著了一般帚湘。 火紅的嫁衣襯著肌膚如雪玫荣。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,741評(píng)論 1 289
  • 那天大诸,我揣著相機(jī)與錄音捅厂,去河邊找鬼。 笑死资柔,一個(gè)胖子當(dāng)著我的面吹牛焙贷,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播贿堰,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼辙芍,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了羹与?” 一聲冷哼從身側(cè)響起沸手,我...
    開封第一講書人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎注簿,沒想到半個(gè)月后契吉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡诡渴,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年捐晶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了菲语。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡惑灵,死狀恐怖山上,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情英支,我是刑警寧澤佩憾,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布,位于F島的核電站干花,受9級(jí)特大地震影響妄帘,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜池凄,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一抡驼、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧肿仑,春花似錦致盟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至伟端,卻和暖如春眷篇,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背荔泳。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工蕉饼, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人玛歌。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓昧港,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親支子。 傳聞我的和親對(duì)象是個(gè)殘疾皇子创肥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348