本文僅為筆者平日學(xué)習(xí)記錄之用予颤,侵刪
原文:https://mp.weixin.qq.com/s/eaALnpd_qHQg6fxI12fQjg
本文會(huì)詳細(xì)分析 TM 端恢復(fù)及創(chuàng)建 KeyedState 的流程蚁鳖,恢復(fù)過(guò)程會(huì)分析 RocksDB 和 Fs 兩種 StateBackend 的恢復(fù)流程娃豹,創(chuàng)建流程會(huì)介紹 Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來(lái)盘寡。
一、 RocksDBKeyedStateBackend 創(chuàng)建流程
從 RocksDBStateBackend 類的 createKeyedStateBackend 方法開始知牌,createKeyedStateBackend 方法源碼主要加載一些配置和創(chuàng)建 RocksDBKeyedStateBackend油狂,就不貼出來(lái)了。簡(jiǎn)單介紹一下 createKeyedStateBackend 方法的功能:
- 加載 RocksDB JNI library
- 初始化 RocksDB 的本地?cái)?shù)據(jù)目錄企锌,對(duì)應(yīng)的是 RocksDB
state.backend.rocksdb.localdir
參數(shù)配置的目錄 - new RocksDBKeyedStateBackendBuilder 的構(gòu)造器榆浓,所有狀態(tài)的恢復(fù)及初始化都封裝在 build 方法中。
RocksDBKeyedStateBackendBuilder 的 build 方法刪減后源碼如下所示:
@Override
public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
// 維護(hù) StateName 與 StateInfo 的映射關(guān)系
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo>
kvStateInformation = new LinkedHashMap<>();
RocksDB db = null;
AbstractRocksDBRestoreOperation restoreOperation = null;
SnapshotStrategy<K> snapshotStrategy;
try {
// 保存 CheckpointID 與 sst 的映射
SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
long lastCompletedCheckpointId = -1L;
// 準(zhǔn)備 instanceBasePath 目錄撕攒,用于本地狀態(tài)存儲(chǔ)
prepareDirectories();
// 根據(jù) restoreStateHandles 決定三種恢復(fù)模式:
// 1陡鹃、 RocksDB 無(wú)需恢復(fù),直接啟動(dòng)
// 2抖坪、 RocksDB 增量 Checkpoint 的狀態(tài)恢復(fù)
// 3萍鲸、 RocksDB 全量 Checkpoint 的狀態(tài)恢復(fù)
restoreOperation = getRocksDBRestoreOperation(XXX);
// 恢復(fù)狀態(tài),并打開 db擦俐,具體 執(zhí)行三種不同恢復(fù)流程
RocksDBRestoreResult restoreResult = restoreOperation.restore();
db = restoreResult.getDb();
// RocksDB 的增量 Checkpoint 模式脊阴,則獲取 sst 文件,
if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) {
backendUID = restoreResult.getBackendUID();
materializedSstFiles = restoreResult.getRestoredSstFiles();
lastCompletedCheckpointId = restoreResult.getLastCompletedCheckpointId();
}
// 初始化 Savepoint 和 Checkpoint 的 snapshot 策略
snapshotStrategy = initializeSavepointAndCheckpointStrategies(XXX);
} catch (Throwable e) {
// State 初始化異常蚯瞧,做一些清理操作
}
InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(
keyGroupRange,
numberOfKeyGroups
);
// kvStateInformation 會(huì)傳遞給 RocksDBKeyedStateBackend
return new RocksDBKeyedStateBackend<>(XXX);
}
build 方法中同樣定義了一個(gè) Map kvStateInformation 用于維護(hù) StateName 與 StateInfo 的映射關(guān)系嘿期。之后會(huì)準(zhǔn)備本地目錄,用于本地狀態(tài)存儲(chǔ)埋合。
getRocksDBRestoreOperation 方法會(huì)創(chuàng)建 RestoreOperation备徐,源碼如下所示:
// 根據(jù) restoreStateHandles 決定三種恢復(fù)模式:
// 1、 RocksDB 直接啟動(dòng)無(wú)需恢復(fù)
// 2甚颂、 增量 Checkpoint 的 RocksDB 恢復(fù)
// 3蜜猾、 全量 Checkpoint 的 RocksDB 恢復(fù)
private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation() {
// restoreStateHandles 為空表示沒有 State 需要恢復(fù),構(gòu)造 NoneRestoreOperation
if (restoreStateHandles.isEmpty()) {
return new RocksDBNoneRestoreOperation<>(X);
}
KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
// StateHandle 是 Increment 模式西设,則構(gòu)造 IncrementalRestoreOperation
// 否則構(gòu)造 FullRestoreOperation
if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
return new RocksDBIncrementalRestoreOperation<>(XXX);
} else {
return new RocksDBFullRestoreOperation<>(XXX);
}
}
getRocksDBRestoreOperation 方法會(huì)根據(jù) restoreStateHandles 決定三種恢復(fù)模式:
restoreStateHandles 為空表示沒有 State 需要恢復(fù)瓣铣,構(gòu)造 NoneRestoreOperation,即:不恢復(fù) State 的方式贷揽,直接啟動(dòng)
restoreStateHandles 不為空的情況下棠笑,判斷 StateHandle 的類型, StateHandle 是 Increment 模式禽绪,則構(gòu)造 IncrementalRestoreOperation蓖救;否則構(gòu)造 FullRestoreOperation
build 方法下一步就會(huì)執(zhí)行具體 RocksDBRestoreOperation 的 restore 方法了,三種 RocksDBRestoreOperation 的 restore 流程完全不同印屁,且比較復(fù)雜循捺,后續(xù)單獨(dú)介紹。后續(xù)會(huì)執(zhí)行 initializeSavepointAndCheckpointStrategies 方法初始化 Savepoint 和 Checkpoint 的 snapshot 策略雄人,方法的返回值是 SnapshotStrategy 類型从橘,SnapshotStrategy 封裝了 Checkpoint 和 Savepoint 兩個(gè)策略念赶。如果用戶觸發(fā) Checkpoint,則執(zhí)行 Checkpoint 策略恰力,觸發(fā) Savepoint叉谜,則執(zhí)行 Checkpoint 策略。
initializeSavepointAndCheckpointStrategies 方法源碼如下所示:
// SnapshotStrategy 封裝了 Checkpoint 和 Savepoint 兩個(gè)策略踩萎。
class SnapshotStrategy<K> {
final RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy;
final RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy;
}
private SnapshotStrategy<K> initializeSavepointAndCheckpointStrategies(XXX) {
// 創(chuàng)建 Savepoint 的 snapshot 類為 Full Snapshot 策略
RocksDBSnapshotStrategyBase<K> savepointSnapshotStrategy =
new RocksFullSnapshotStrategy<>(XXX);
RocksDBSnapshotStrategyBase<K> checkpointSnapshotStrategy;
// 如果開啟了增量 Checkpoint停局,
// 則 Checkpoint 的 snapshot 類為 Increment Snapshot 策略
if (enableIncrementalCheckpointing) {
checkpointSnapshotStrategy = new RocksIncrementalSnapshotStrategy<>(XXX);
} else {
// 未開始增量 Checkpoint,
// 則 Checkpoint 的 snapshot 為 Savepoint 的 snapshot 策略
checkpointSnapshotStrategy = savepointSnapshotStrategy;
}
// 封裝兩個(gè)策略到 SnapshotStrategy 中
return new SnapshotStrategy<>(checkpointSnapshotStrategy, savepointSnapshotStrategy);
}
可以看到 RocksDB Savepoint 的 snapshot 類永遠(yuǎn)為 Full Snapshot 策略香府,如果開啟增量 Checkpoint董栽,則 Checkpoint 的 snapshot 類為 Increment Snapshot 策略。未開始增量 Checkpoint企孩,則 Checkpoint 的 snapshot 為 Savepoint 的 snapshot 策略锭碳,最后封裝兩個(gè)策略到 SnapshotStrategy 中。
這塊可以得出一個(gè)結(jié)論:使用 RocksDBStateBackend 時(shí)勿璃,如果不開啟增量 Checkpoint工禾,那么觸發(fā) Savepoint 和 Checkpoint 都是相同的策略,即:都是 Full Snapshot 模式蝗柔。
build 方法中上述流程如果任何階段拋出異常,都認(rèn)為 State 初始化異常民泵,會(huì)做一些清理操作癣丧,并認(rèn)為本次任務(wù)恢復(fù)失敗。如果一切都成功栈妆,最后會(huì)構(gòu)建出 RocksDBKeyedStateBackend胁编。
build 方法結(jié)束!下面重點(diǎn)關(guān)注三種不同的 RestoreOperation 具體是怎么 restore 的鳞尔。
二嬉橙、 RocksDB 的 NoneRestoreOperation 恢復(fù)流程
NoneRestoreOperation 表示沒有 State 需要恢復(fù),直接啟動(dòng)寥假,所以該模式下 restore 流程特別簡(jiǎn)單市框。
restore 方法源碼如下所示:
@Override
public RocksDBRestoreResult restore() throws Exception {
openDB();
return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
nativeMetricMonitor, -1, null, null);
}
直接打開一個(gè)空的 RocksDB 就恢復(fù)完成返回結(jié)果。
三糕韧、 RocksDB 的 IncrementalRestoreOperation 恢復(fù)流程
RocksDBIncrementalRestoreOperation 類的 restore 方法源碼如下所示:
@Override
public RocksDBRestoreResult restore() throws Exception {
if (restoreStateHandles == null || restoreStateHandles.isEmpty()) {
return null;
}
final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();
// restoreStateHandles 數(shù)量大于 1枫振,
// 或者 恢復(fù)的 keyGroupRange 與當(dāng)前負(fù)責(zé)的 keyGroupRange 不同,
// 則使用 Rescaling 模式萤彩。如果沒有改并發(fā)粪滤,則關(guān)閉 Rescaling 模式
boolean isRescaling = (restoreStateHandles.size() > 1 ||
!Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));
if (isRescaling) {
// Rescaling 開啟的恢復(fù)模式,相當(dāng)于改并發(fā)恢復(fù)雀扶,需要依賴 KeyGroup 恢復(fù)
restoreWithRescaling(restoreStateHandles);
} else {
// Rescaling 關(guān)閉的恢復(fù)模式杖小,相當(dāng)于沒有改變并發(fā),直接恢復(fù) sst 即可
// 沒有改并發(fā)就只有一個(gè) StateHandle,所以這里只需要將 firstStateHandle 當(dāng)做參數(shù)傳遞即可
restoreWithoutRescaling(theFirstStateHandle);
}
return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
nativeMetricMonitor, lastCompletedCheckpointId, backendUID, restoredSstFiles);
}
如果 restoreStateHandles 為 null 或者集合為空予权,直接返回 null昂勉。否則開始后面的恢復(fù)流程。
恢復(fù)流程第一步檢測(cè)是否是 rescale 模式伟件,換言之:檢測(cè)是否新舊 Job 之間修改并發(fā)了硼啤。isRescaling 為 true 表示修改并發(fā)了,isRescaling 為 false 表示沒有修改并發(fā)斧账。
判斷是否修改并發(fā)的邏輯:
代碼中判斷邏輯:如果 restoreStateHandles 集合中元素?cái)?shù)量大于 1 或者恢復(fù)的 keyGroupRange 與當(dāng)前負(fù)責(zé)的 keyGroupRange 不同谴返,則開啟 Rescaling 模式。否則關(guān)閉 Rescaling 模式。
分析一波:如果不修改并發(fā)魏滚,那么新 Job 的 subtask 與舊 Job 的 subtask 是一對(duì)一的關(guān)系奸披,每個(gè) subtask 只會(huì)恢復(fù)舊 Job 對(duì)應(yīng)的那一個(gè) subtask 的 StateHandle,且新舊 subtask 負(fù)責(zé)的 KeyGroupRange 是相同的渠抹。
代碼中 restoreStateHandles 集合中元素?cái)?shù)量表示要恢復(fù)的 KeyedStateHandle 的數(shù)據(jù),數(shù)量大于 1 表示當(dāng)前 subtask 要恢復(fù)舊 Job 的多個(gè) subtask 的 KeyedStateHandle闪萄。所以 restoreStateHandles.size() > 1 必然修改了并發(fā)梧却。
代碼中拿要恢復(fù)的 StateHandle 的 KeyGroupRange 與當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange 進(jìn)行比較,兩者不同則表示新舊 subtask 負(fù)責(zé)的 KeyGroupRange 不是完全相同的败去,也可以推斷出一定修改并發(fā)了放航。
例如:舊的 subtask 0 負(fù)責(zé)的 KeyGroupRange(0,9),新的 subtask 0 負(fù)責(zé)的 KeyGroupRange(0,6)圆裕,雖然 restoreStateHandles 集合中只有一個(gè) StateHandle广鳍,但是 KeyGroupRange 變了,也可以推斷出并發(fā)改變了吓妆。
判斷完是否修改并發(fā)赊时,就會(huì)按照是否修改并發(fā),進(jìn)行兩種不同的模式開始恢復(fù)流程行拢。如源碼所示祖秒,restoreWithoutRescaling 方法表示為修改并發(fā)的恢復(fù)模式,這里有個(gè)小細(xì)節(jié)舟奠,restoreWithoutRescaling 方法的參數(shù)是 KeyedStateHandle 類型狈涮,而不用傳整個(gè)集合,因?yàn)椴恍薷牟l(fā)只會(huì)恢復(fù)一個(gè) KeyedStateHandle鸭栖。而 restoreWithRescaling 方法的參數(shù)就是 KeyedStateHandle 的集合類型歌馍。
下面詳細(xì)介紹兩種恢復(fù)模式。
未修改并發(fā)的恢復(fù)流程
restoreWithoutRescaling 方法源碼如下所示:
private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) throws Exception {
if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
// 遠(yuǎn)程的 KeyedStateHandle 恢復(fù)
IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle =
(IncrementalRemoteKeyedStateHandle) keyedStateHandle;
// 保存 StateHandle 對(duì)應(yīng)的那一次 Checkpoint 對(duì)應(yīng)的文件狀態(tài)晕鹊,
// 包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系
restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
/**
* 從遠(yuǎn)程恢復(fù) State 的過(guò)程:
* 1松却、 本地創(chuàng)建 tmp 目錄
* 2暴浦、 從遠(yuǎn)程拉取 sst 文件到本地,將 遠(yuǎn)程的 StateHandle 轉(zhuǎn)換為 本地 的 StateHandle
* 3晓锻、 調(diào)用 restoreFromLocalState 方法歌焦,從 local 恢復(fù) State
* 4、 清理 tmp 文件
*/
restoreFromRemoteState(incrementalRemoteKeyedStateHandle);
} else if (keyedStateHandle instanceof IncrementalLocalKeyedStateHandle) {
// Local 的 KeyedStateHandle 恢復(fù)
IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
(IncrementalLocalKeyedStateHandle) keyedStateHandle;
// 保存 StateHandle 對(duì)應(yīng)的那一次 Checkpoint 對(duì)應(yīng)的文件狀態(tài)砚哆,
// 包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系
restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
// 從 local 恢復(fù) State 的方法
restoreFromLocalState(incrementalLocalKeyedStateHandle);
} else {
throw new BackendBuildingException("XXX");
}
}
restoreWithoutRescaling 方法中首先會(huì)區(qū)分 KeyedStateHandle 到底是 IncrementalRemoteKeyedStateHandle 類型還是 IncrementalLocalKeyedStateHandle独撇,區(qū)別在于一個(gè)是 Remote 一個(gè)是 Local。正常從 dfs 上恢復(fù)就屬于 Remote 模式躁锁,但是 RocksDB 增量 Checkpoint 有個(gè) local-recovery 的優(yōu)化纷铣。
local-recovery 是指:Checkpoint 時(shí)會(huì)在本地目錄保留一份狀態(tài)快照,當(dāng)任務(wù)重啟時(shí)避免了從 dfs 上拉取狀態(tài)文件的過(guò)程战转,加速任務(wù)恢復(fù)搜立。
Local 模式的恢復(fù)流程:
restorePreviousIncrementalFilesStatus 方法保存 StateHandle 對(duì)應(yīng)的那一次 Checkpoint 對(duì)應(yīng)的文件狀態(tài),包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系槐秧。
然后調(diào)用 restoreFromLocalState 方法從 Local 恢復(fù) State 即可啄踊。
Remote 模式的恢復(fù)流程:
同樣也是調(diào)用 restorePreviousIncrementalFilesStatus 方法保存 StateHandle 對(duì)應(yīng)的那一次 Checkpoint 對(duì)應(yīng)的文件狀態(tài),包括 lastCompletedCheckpointId 和 chk id 與 sst 映射關(guān)系刁标。
區(qū)別在于調(diào)用 restoreFromRemoteState 方法從 Remote 恢復(fù) State颠通,但其實(shí) restoreFromRemoteState 最后還是調(diào)用的 restoreFromLocalState。restoreFromRemoteState 方法源碼如下所示:
private void restoreFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle) {
// 創(chuàng)建臨時(shí)目錄
final Path tmpRestoreInstancePath = new Path(
instanceBasePath.getAbsolutePath(),
UUID.randomUUID().toString());
try {
// 從本地恢復(fù)
restoreFromLocalState(
// 從遠(yuǎn)程拉取 State 文件到本地
transferRemoteStateToLocalDirectory(tmpRestoreInstancePath, stateHandle));
} finally {
cleanUpPathQuietly(tmpRestoreInstancePath);
}
}
遠(yuǎn)程恢復(fù) State 的過(guò)程如下所示:
- 本地創(chuàng)建 tmp 目錄
- 從遠(yuǎn)程拉取 sst 文件到本地膀懈,將 遠(yuǎn)程的 StateHandle 轉(zhuǎn)換為 本地 的 StateHandle
- 調(diào)用 restoreFromLocalState 方法蒜哀,從 local 恢復(fù) State
- 清理 tmp 文件
所以得出的結(jié)論是:Remote 模式相比 Local 模式而言,只是多了一個(gè)從 dfs 上下載文件到本地的過(guò)程吏砂,下載到本地后就轉(zhuǎn)換成了 Local 模式進(jìn)行恢復(fù)。所以先看一下 transferRemoteStateToLocalDirectory 方法是如何下載文件的乘客,之后重點(diǎn)關(guān)注 restoreFromLocalState 即可狐血。
下載狀態(tài)文件到本地流程:
transferRemoteStateToLocalDirectory 方法源碼如下所示:
private IncrementalLocalKeyedStateHandle transferRemoteStateToLocalDirectory(
Path temporaryRestoreInstancePath,
IncrementalRemoteKeyedStateHandle restoreStateHandle) throws Exception {
// try with resource 的方式創(chuàng)建 RocksDBStateDownloader
try (RocksDBStateDownloader rocksDBStateDownloader =
new RocksDBStateDownloader(numberOfTransferringThreads)) {
// 具體的從 dfs 上 Download 數(shù)據(jù)到本地
// 使用線程池,多線程拉取 所有的 sst 文件和 RocksDB 數(shù)據(jù)庫(kù)的元數(shù)據(jù)
rocksDBStateDownloader.transferAllStateDataToDirectory(
restoreStateHandle,
temporaryRestoreInstancePath,
cancelStreamRegistry);
}
// 將 Remote 的 StateHandle 重新構(gòu)建成 Local 的 StateHandle
return new IncrementalLocalKeyedStateHandle(
restoreStateHandle.getBackendIdentifier(),
restoreStateHandle.getCheckpointId(),
// 使用 DirectoryStateHandle易核,目錄就是之前創(chuàng)建的臨時(shí)數(shù)據(jù)目錄
new DirectoryStateHandle(temporaryRestoreInstancePath),
restoreStateHandle.getKeyGroupRange(),
restoreStateHandle.getMetaStateHandle(),
restoreStateHandle.getSharedState().keySet());
}
創(chuàng)建 RocksDBStateDownloader 類匈织,見名之意,用于下載 RocksDB 狀態(tài)文件的類牡直。RocksDBStateDownloader 的構(gòu)造參數(shù)是拉取文件的線程數(shù)缀匕,具體可以進(jìn)行配置的。然后使用 RocksDBStateDownloader 去 dfs 上多線程 Download 所有的 sst 文件和 RocksDB 數(shù)據(jù)庫(kù)的元數(shù)據(jù)數(shù)據(jù)到本地的 temporaryRestoreInstancePath 臨時(shí)目錄下碰逸。
拉取完成后乡小,將 Remote 的 StateHandle 重新構(gòu)建成 Local 的 StateHandle,并且使用 DirectoryStateHandle饵史,這里的目錄就是之前創(chuàng)建的臨時(shí)數(shù)據(jù)目錄满钟,剛才下載的數(shù)據(jù)也在該目錄下胜榔。然后就開始
Increment 模式不修改并發(fā),從 Local 恢復(fù) State 流程
restoreFromLocalState 方法源碼如下所示:
// 從本地 State 文件中恢復(fù)狀態(tài)
private void restoreFromLocalState(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception {
// 從 State 中獲取元數(shù)據(jù)
KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(
localKeyedStateHandle.getMetaDataState());
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = serializationProxy
.getStateMetaInfoSnapshots();
// 根據(jù) State 的元數(shù)據(jù)湃番,創(chuàng)建或注冊(cè) CF 描述符
columnFamilyDescriptors =
createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots, true);
Path restoreSourcePath = localKeyedStateHandle.getDirectoryStateHandle().getDirectory();
// 準(zhǔn)備數(shù)據(jù)目錄
restoreInstanceDirectoryFromPath(restoreSourcePath, dbPath);
// 打開 DB夭织,打開 DB 時(shí)會(huì)將 columnFamilyHandles 填滿,
// 即獲取到 columnFamilyHandles 集合
columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size() + 1);
openDB();
// 將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù) 映射信息保存到 kvStateInformation 中
registerColumnFamilyHandles(stateMetaInfoSnapshots);
}
首先從 State 中獲取元數(shù)據(jù)吠撮,根據(jù) State 的元數(shù)據(jù)尊惰,創(chuàng)建或注冊(cè) CF 描述符,然后 restoreInstanceDirectoryFromPath 方法用于準(zhǔn)備數(shù)據(jù)目錄泥兰,準(zhǔn)備數(shù)據(jù)目錄就是將那些從 Checkpoint 處拉取的文件從臨時(shí)目錄拷貝到真正的 DB 目錄弄屡。restoreInstanceDirectoryFromPath 方法在拷貝數(shù)據(jù)時(shí)做了一個(gè)優(yōu)化,以 .sst 結(jié)尾的文件不是真正拷貝逾条,而是做了一個(gè) link琢岩,其他文件真正的拷貝過(guò)去。RocksDB 真正存儲(chǔ)數(shù)據(jù)的是 .sst师脂,這些才是占用空間較大的文件担孔,向其他的元數(shù)據(jù)占用空間較小,所以直接拷貝吃警。
最后 registerColumnFamilyHandles 方法糕篇,將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù)映射信息保存到 kvStateInformation 中。kvStateInformation 是一個(gè) Map酌心,key 為 StateName拌消,value 為 RocksDbKvStateInfo 類型。
RocksDbKvStateInfo 相關(guān)類源碼如下所示:
public static class RocksDbKvStateInfo implements AutoCloseable{
public final ColumnFamilyHandle columnFamilyHandle;
public final RegisteredStateMetaInfoBase metaInfo;
}
RocksDbKvStateInfo 封裝了 ColumnFamilyHandle 和 RegisteredStateMetaInfoBase安券,ColumnFamilyHandle 表示 RocksDB CF 句柄墩崩,RegisteredStateMetaInfoBase 中維護(hù)了 State 的 name、類型侯勉、序列化信息等鹦筹。有了 kvStateInformation,就可以根據(jù) StateName 拿到當(dāng)前 State 對(duì)應(yīng)的 CF 句柄讀到數(shù)據(jù)址貌,并拿到其對(duì)應(yīng)的序列化規(guī)則對(duì)讀到的數(shù)據(jù)進(jìn)行反序列化铐拐。
到這里 RocksDB 的 Increment 模式在不改變并發(fā)的情況下,無(wú)論是 Remote 還是 Local练对,數(shù)據(jù)都正潮轶恢復(fù)了(數(shù)據(jù)在本地的 RocksDB 實(shí)例中,可以根據(jù) kvStateInformation 中維護(hù)的信息從 RocksDB 中讀取到數(shù)據(jù))螟凭。
修改并發(fā)的恢復(fù)流程
修改并發(fā)的情況虚青,新的 subtask 可能要恢復(fù)多個(gè) StateHandle 的數(shù)據(jù),也就是多個(gè) RocksDB 實(shí)例的數(shù)據(jù)螺男。最笨的方法是將多個(gè) RocksDB 的數(shù)據(jù)全拉取的本地挟憔,建立多個(gè) RocksDB 實(shí)例钟些,從中讀取出當(dāng)前 subtask 對(duì)應(yīng) KeyGroup 的數(shù)據(jù),寫入到一個(gè)新的 RocksDB 實(shí)例中绊谭。這樣存在一個(gè)問(wèn)題政恍,所有數(shù)據(jù)都是一條條從舊的 RocksDB get 出來(lái),再一條條 put 到一個(gè)新的 RocksDB 中 达传。
恢復(fù) RocksDB 優(yōu)化
FLINK-8790 基于上述方案做了一個(gè)優(yōu)化:首先從多個(gè) RocksDB 實(shí)例中選取一個(gè)最優(yōu)的 RocksDB 實(shí)例篙耗,最優(yōu)的標(biāo)準(zhǔn)是:RocksDB 負(fù)責(zé) KeyGroupRange 與當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange 的交集占 RocksDB 負(fù)責(zé)的 KeyGroupRange 的百分比最高。
舉個(gè)例子:RocksDB a 存儲(chǔ)的 KeyGroupRange(0,9) 的數(shù)據(jù)宪赶,當(dāng)前 subtask 負(fù)責(zé)的 KeyGroupRange(0,7) 的數(shù)據(jù)宗弯,那么交集就是 KeyGroupRange(0,7)。KeyGroupRange(0,7) 中包含 8 個(gè) KeyGroup搂妻,KeyGroupRange(0,9) 包含 10 個(gè) KeyGroup蒙保,8 /10 為 80%。
此時(shí)就認(rèn)為 RocksDB a 上有 80% 的數(shù)據(jù)是有效的欲主,所有要恢復(fù)的 RocksDB 都做一次上述運(yùn)算邓厕,挑選出分?jǐn)?shù)最高的 RocksDB 實(shí)例。源碼中還加了一層限制扁瓢,重疊率低于 75% 的 RocksDB 會(huì)直接被過(guò)濾掉详恼。通過(guò)上述篩選,可能會(huì)得到一個(gè)相對(duì)來(lái)講最優(yōu)的 RocksDB 做為最終的 RocksDB引几,但是要對(duì)其進(jìn)行裁剪昧互。就拿上述例子來(lái)講,RocksDB 中存儲(chǔ)的 KeyGroupRange(0,9) 的數(shù)據(jù)伟桅,但只需要 KeyGroupRange(0,7) 的數(shù)據(jù)敞掘,所以會(huì)將 KeyGroupRange(8,9) 的數(shù)據(jù)裁掉。當(dāng)然裁剪效率相對(duì)較高楣铁,RocksDB 中 key 的設(shè)計(jì)都是以 KeyGroup 開頭的玖雁,LSM Tree 的底層存儲(chǔ)都是按照 key 有序存儲(chǔ),所以直接按照前綴即可高效裁剪民褂。
篩選最優(yōu) StateHandle 的代碼,參考 RocksDBIncrementalCheckpointUtils 的 chooseTheBestStateHandleForInitial 方法和 STATE_HANDLE_EVALUATOR 函數(shù)式接口疯潭,源碼如下所示:
public static KeyedStateHandle chooseTheBestStateHandleForInitial(
@Nonnull Collection<KeyedStateHandle> restoreStateHandles,
@Nonnull KeyGroupRange targetKeyGroupRange) {
KeyedStateHandle bestStateHandle = null;
double bestScore = 0;
// 遍歷所有 KeyedStateHandle
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
// 計(jì)算分?jǐn)?shù)
double handleScore = STATE_HANDLE_EVALUATOR.apply(rawStateHandle, targetKeyGroupRange);
if (handleScore > bestScore) {
// 保存最高分 及 對(duì)應(yīng)的 KeyedStateHandle
bestStateHandle = rawStateHandle;
bestScore = handleScore;
}
}
return bestStateHandle;
}
private static final BiFunction<KeyedStateHandle, KeyGroupRange, Double>
STATE_HANDLE_EVALUATOR = (stateHandle, targetKeyGroupRange) -> {
final KeyGroupRange handleKeyGroupRange = stateHandle.getKeyGroupRange();
// 計(jì)算當(dāng)前 StateHandle 與 目標(biāo) Handle 在 KeyGroup 上的交集
final KeyGroupRange intersectGroup = handleKeyGroupRange.
getIntersection(targetKeyGroupRange);
// 計(jì)算 當(dāng)前 StateHandle 對(duì)應(yīng)的狀態(tài)文件上 有 百分之多少的數(shù)據(jù)應(yīng)該在當(dāng)前 subtask 上
final double overlapFraction = (double) intersectGroup.getNumberOfKeyGroups() /
handleKeyGroupRange.getNumberOfKeyGroups();
// 概率小于 0.75 返回 -1赊堪,意味著該 StateHandle 是肯定會(huì)被丟棄的
if (overlapFraction < OVERLAP_FRACTION_THRESHOLD) {
return -1.0;
}
return intersectGroup.getNumberOfKeyGroups()
* overlapFraction * overlapFraction;
};
修改并發(fā)的恢復(fù)主流程
restoreWithRescaling 源碼如下所示:
private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandles) {
// 選取一個(gè)最好的 StateHandle 用于數(shù)據(jù)初始化,會(huì)有一個(gè)選擇標(biāo)準(zhǔn)打分竖哩,分?jǐn)?shù)最高哭廉,則被選中
// 分?jǐn)?shù)的計(jì)算規(guī)則:主要依賴 StateHandle 的 KeyGroup 與 當(dāng)前 subtask 處理的 KeyGroup 求一個(gè)交集,看重疊率
KeyedStateHandle initialHandle = RocksDBIncrementalCheckpointUtils.
chooseTheBestStateHandleForInitial(restoreStateHandles, keyGroupRange);
// Init base DB instance
if (initialHandle != null) {
// 打開選取的認(rèn)為最好的 StateHandle 對(duì)應(yīng)的 db相叁,并對(duì)其多余的 KeyGroup 進(jìn)行裁剪
restoreStateHandles.remove(initialHandle);
initDBWithRescaling(initialHandle);
} else {
// open 一個(gè)空的 db
openDB();
}
// 將 target 的 startKey 和 endKey 轉(zhuǎn)換成 byte 形式
byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getStartKeyGroup(),
startKeyGroupPrefixBytes);
byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
RocksDBKeySerializationUtils.serializeKeyGroup(keyGroupRange.getEndKeyGroup() + 1,
stopKeyGroupPrefixBytes);
// 將所有要恢復(fù)的 StateHandle 中對(duì)應(yīng)的 RocksDB 恢復(fù)遵绰,
// 并將 target 的 startKey 和 endKey 之間的數(shù)據(jù) put 到目標(biāo) db
// 同時(shí)還需要將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù) 映射信息保存到 kvStateInformation 中
for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
XXX
}
}
首先根據(jù)分?jǐn)?shù)辽幌,挑選一個(gè)最優(yōu)的 StateHandle 作為 RocksDB 初始 DB,initDBWithRescaling 方法會(huì)對(duì)多余的 KeyGroup 進(jìn)行裁剪椿访。如果沒有挑選出來(lái)說(shuō)明都不太優(yōu)乌企,會(huì)直接創(chuàng)建一個(gè)空的 RocksDB 作為初始 DB。
根據(jù)當(dāng)前 subtask 負(fù)責(zé)的 keyGroupRange 計(jì)算出 RocksDB 的 startKey 和 endKey成玫,把其他剩余的所有 StateHandle 對(duì)應(yīng) RocksDB 數(shù)據(jù)庫(kù)一一恢復(fù)加酵,從中讀取出 key 位于 startKey 和 endKey 之間的數(shù)據(jù)插入到初始 DB 中。從 RocksDB 讀取數(shù)據(jù)時(shí)可以直接通過(guò) startKey seek 到指定位置哭当,因?yàn)槭侨钟行虻闹硗螅员闅v過(guò)程中一旦讀到 endKey 以外的數(shù)據(jù),就認(rèn)為遍歷結(jié)束了钦勘,直接退出循環(huán)陋葡。
同時(shí)在恢復(fù)過(guò)程中,需要將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù) 映射信息保存到 kvStateInformation 中彻采。
小優(yōu)化思考
后續(xù)多個(gè) RocksDB 實(shí)例恢復(fù)時(shí)的流程基本是串行操作腐缤,即:從 dfs 上拉取第一個(gè) RocksDB 數(shù)據(jù)文件、本地構(gòu)建 RocksDB 數(shù)據(jù)庫(kù)颊亮,依次讀出 startKey 和 endKey 之間的數(shù)據(jù)插入到新的 RocksDB柴梆。再拉取第二個(gè)、構(gòu)建终惑、讀數(shù)據(jù)绍在、寫數(shù)據(jù)。再拉取第三個(gè)雹有。偿渡。。
思考:所有從 dfs 上拉取 RocksDB 數(shù)據(jù)文件的過(guò)程霸奕,能不能完全異步化溜宽,即:讀寫第一個(gè) RocksDB 的過(guò)程中,就開始拉取第二個(gè)质帅、第三個(gè)等适揉。
四、 RocksDB 的 FullRestoreOperation 恢復(fù)流程
RocksDBFullRestoreOperation 類的 restore 方法的源碼如下所示:
@Override
public RocksDBRestoreResult restore()
throws IOException, StateMigrationException, RocksDBException {
// 打開空的 DB
openDB();
// 遍歷所有的 restoreStateHandles
for (KeyedStateHandle keyedStateHandle : restoreStateHandles) {
if (keyedStateHandle != null) {
// RocksDB 的 Full 模式與 Savepoint 模式保存的狀態(tài)文件都是 Flink 自己序列化好的問(wèn)題煤惩,
// 其對(duì)應(yīng)的 KeyedStateHandle 必然是 KeyGroupsStateHandle嫉嘀。
if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
throw new IllegalStateException("Unexpected state handle type, " +
"expected: " + KeyGroupsStateHandle.class +
", but found: " + keyedStateHandle.getClass());
}
this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
// 根據(jù) StateHandle 恢復(fù)
restoreKeyGroupsInStateHandle();
}
}
return new RocksDBRestoreResult(this.db, defaultColumnFamilyHandle,
nativeMetricMonitor, -1, null, null);
}
restore 方法會(huì)打開一個(gè)空的 RocksDB,遍歷所有的 restoreStateHandles魄揉,之前強(qiáng)調(diào)過(guò) Full 模式的 KeyStateHandle 對(duì)應(yīng)的是 KeyGroupsStateHandle 類型剪侮。所以這里進(jìn)行了判斷,如果不是 KeyGroupsStateHandle 類型洛退,直接拋出異常瓣俯,恢復(fù)失敗杰标。然后 restoreKeyGroupsInStateHandle 方法用于恢復(fù)當(dāng)前 keyedStateHandle 對(duì)應(yīng)的數(shù)據(jù)。
restoreKeyGroupsInStateHandle 方法源碼如下所示:
private void restoreKeyGroupsInStateHandle()
throws IOException, StateMigrationException, RocksDBException {
try {
// KeyGroupsStateHandle 的場(chǎng)景彩匕,并不會(huì)直接拉回文件腔剂,而是建立一個(gè)遠(yuǎn)程的輸入流
currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
// 注冊(cè) StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù)
// 映射信息保存到 kvStateInformation 中
restoreKVStateMetaData();
// 將當(dāng)前 StateHandle 中屬于當(dāng)前 KeyGroupRange 的數(shù)據(jù) put 到 db 中
restoreKVStateData();
} finally {
if (cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
IOUtils.closeQuietly(currentStateHandleInStream);
}
}
}
restoreKeyGroupsInStateHandle 依然會(huì)將 StateName 和 State 在 RocksDB 的 CF 句柄等元數(shù)據(jù)映射信息保存到 kvStateInformation 中,并將 StateHandle 中屬于當(dāng)前 KeyGroupRange 的數(shù)據(jù) put 到 db 中推掸。KeyGroupsStateHandle 中能拿到狀態(tài)文件的輸入流桶蝎,且有保存的每個(gè) KeyGroup 在文件中的 offset,所以可以直接讀取到數(shù)據(jù)并 put 到剛創(chuàng)建的 RocksDB 中谅畅。
小結(jié)
到這里 RocksDB 的三種模式都恢復(fù)完成登渣,RocksDB 的三種恢復(fù)模式下,都會(huì)將 StateName 與具體 State 的信息維護(hù)在 kvStateInformation 中毡泻。后期在創(chuàng)建 State 的過(guò)程中胜茧,也會(huì)通過(guò) kvStateInformation 將創(chuàng)建的 State 與 Checkpoint 中恢復(fù)的 State 進(jìn)行關(guān)聯(lián)。
下面分析 FsStateBackend 模式下的恢復(fù)流程仇味。
五呻顽、 HeapKeyedStateBackend 創(chuàng)建恢復(fù)流程
FsStateBackend 模式下,createKeyedStateBackend 方法創(chuàng)建的是 HeapKeyedStateBackend丹墨,最后調(diào)用的 HeapKeyedStateBackendBuilder 的 build 方法廊遍。
build 方法源碼如下所示:
public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
// Map of registered Key/Value states
Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<>();
// Map of registered priority queue set states
Map<String, HeapPriorityQueueSnapshotRestoreWrapper>
registeredPQStates = new HashMap<>();
HeapSnapshotStrategy<K> snapshotStrategy = initSnapshotStrategy(XXX);
InternalKeyContext<K> keyContext = new InternalKeyContextImpl<>(
keyGroupRange,
numberOfKeyGroups
);
HeapRestoreOperation<K> restoreOperation = new HeapRestoreOperation<>(XXX);
try {
// 恢復(fù)流程
restoreOperation.restore();
} catch (Exception e) {
throw new BackendBuildingException("XXX", e);
}
// 構(gòu)建 HeapKeyedStateBackend
return new HeapKeyedStateBackend<>(XXX);
}
build 方法中首先創(chuàng)建出 Map 類型的 registeredKVStates,用于保存 StateName 及對(duì)應(yīng)的 StateTable贩挣,每個(gè) State 對(duì)應(yīng)一個(gè) StateTable 存儲(chǔ)狀態(tài)數(shù)據(jù)喉前。將 registeredKVStates 傳遞給 HeapRestoreOperation 用于恢復(fù),最后再傳遞給 HeapKeyedStateBackend 用于后續(xù)使用王财。
HeapRestoreOperation 類的 restore 方法會(huì)遍歷所有的 StateHandle 恢復(fù) State 信息卵迂,維護(hù)映射關(guān)系到 registeredKVStates 中,并恢復(fù) State 信息到具體的 StateTable 中绒净。StateTable 是 Heap 模式真正存儲(chǔ) State 的集合见咒。
小結(jié)
HeapKeyedStateBackend 會(huì)將 StateName 與具體 State 的信息維護(hù)在 registeredKVStates 中。后期在創(chuàng)建 State 的過(guò)程中挂疆,也會(huì)通過(guò) registeredKVStates 將創(chuàng)建的 State 與 Checkpoint 中恢復(fù)的 State 進(jìn)行關(guān)聯(lián)改览。
下面詳細(xì)分析 KeyedState 的創(chuàng)建流程。
六缤言、 用戶定義的 KeyedState 創(chuàng)建流程
可以拿 IntervalJoinOperator 的例子來(lái)分析 KeyedState 的創(chuàng)建流程宝当,IntervalJoin 用于對(duì)兩個(gè)輸入流的數(shù)據(jù)進(jìn)行關(guān)聯(lián),兩個(gè)流先到的數(shù)據(jù)會(huì)放到 buffer 中墨闲,左右兩個(gè)流分別有各自的 buffer今妄,使用 Flink 的 MapState 充當(dāng) buffer郑口。
IntervalJoinOperator 類的 initializeState 方法源碼如下所示:
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
// 左流的 buffer
this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
LEFT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
));
// 右流的 buffer
this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
RIGHT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
));
}
initializeState 方法會(huì)創(chuàng)建兩個(gè) State鸳碧,即:LEFT_BUFFER 和 RIGHT_BUFFER盾鳞。
context.getKeyedStateStore().getMapState 實(shí)際調(diào)用 DefaultKeyedStateStore 類的 getMapState 方法,整個(gè)創(chuàng)建 State 第一階段的方法調(diào)用時(shí)序圖如下所示:
調(diào)用關(guān)系從 DefaultKeyedStateStore 類的 getMapState 方法到 KeyedStateFactory 的 createInternalState 方法瞻离。
如下圖所示腾仅,KeyedStateFactory 有兩個(gè)子類,即:HeapKeyedStateBackend 和 RocksDBKeyedStateBackend套利。
如下圖所示推励,HeapKeyedStateBackend 會(huì)對(duì)應(yīng) MemoryStateBackend 和 FsStateBackend,RocksDBKeyedStateBackend 對(duì)應(yīng) RocksDBStateBackend肉迫。
下面詳細(xì)介紹 HeapKeyedStateBackend 和 RocksDBKeyedStateBackend 的 createInternalState 方法是如何創(chuàng)建 State 的验辞。
HeapKeyedStateBackend 創(chuàng)建 State 流程
HeapKeyedStateBackend 類的 createInternalState 方法源碼如下所示:
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(XXX) {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
// 注冊(cè)和恢復(fù) StateTable
StateTable<K, N, SV> stateTable = tryRegisterStateTable(
namespaceSerializer, stateDesc,
getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
// 根據(jù) stateDesc、StateTable 和 序列化信息喊衫,創(chuàng)建具體的 State
return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
}
createInternalState 方法中首先會(huì)執(zhí)行 tryRegisterStateTable 方法「注冊(cè)和恢復(fù)」 StateTable跌造,然后根據(jù) stateDesc、StateTable 和 序列化信息族购,創(chuàng)建具體的 State壳贪。
重點(diǎn)的恢復(fù)邏輯就在 tryRegisterStateTable 方法中,tryRegisterStateTable 方法源碼如下所示:
private <N, V> StateTable<K, N, V> tryRegisterStateTable(XXX) {
// 根據(jù) StateName 從 registeredKVStates 中獲取 StateTable
StateTable<K, N, V> stateTable = (StateTable<K, N, V>)
registeredKVStates.get(stateDesc.getName());
// stateTable 不為空寝杖,表示從 Checkpoint 中恢復(fù)了當(dāng)前 State
if (stateTable != null) {
RegisteredKeyValueStateBackendMetaInfo<N, V> restoredKvMetaInfo =
stateTable.getMetaInfo();
// 主要對(duì) State 的兼容性進(jìn)行校驗(yàn)违施,校驗(yàn)包括:StateName、狀態(tài)類型瑟幕、序列化校驗(yàn)
// 如果創(chuàng)建的 State 與 Checkpoint 恢復(fù)的 State 不匹配磕蒲,
// 則拋出異常,不能成功恢復(fù)
restoredKvMetaInfo.updateSnapshotTransformFactory(snapshotTransformFactory);
TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
// 檢查 State 的 name 和 Type 是否可以匹配
restoredKvMetaInfo.checkStateMetaInfo(stateDesc);
TypeSerializerSchemaCompatibility<V> stateCompatibility =
restoredKvMetaInfo.updateStateSerializer(newStateSerializer);
if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException("XXX");
}
stateTable.setMetaInfo(restoredKvMetaInfo);
} else {
// 沒有從 Checkpoint 恢復(fù)收苏,則創(chuàng)建 StateTable亿卤,存放到 registeredKVStates 中
RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new
RegisteredKeyValueStateBackendMetaInfo<>(XXX);
stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
registeredKVStates.put(stateDesc.getName(), stateTable);
}
return stateTable;
}
tryRegisterStateTable 方法首先會(huì)根據(jù) StateName 從 registeredKVStates 中獲取 StateTable 保存到 stateTable 中。
如果 stateTable 不為空鹿霸,表示 Checkpoint 中有當(dāng)前 StateName 對(duì)應(yīng)的狀態(tài)排吴,應(yīng)該恢復(fù),此時(shí)會(huì)對(duì)新舊 Job 的 State 匹配性進(jìn)行檢測(cè)懦鼠,校驗(yàn)項(xiàng)包括:StateName钻哩、狀態(tài)類型、序列化校驗(yàn)肛冶。
否則 stateTable 為空街氢,表示當(dāng)前 StateName 不需要從 Checkpoint 恢復(fù),直接創(chuàng)建一個(gè)新的 StateTable睦袖,存放到 registeredKVStates 中珊肃。
RocksDBKeyedStateBackend 創(chuàng)建 State 流程
RocksDBKeyedStateBackend 類的 createInternalState 方法源碼如下所示:
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(XXX) {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
// 注冊(cè)和恢復(fù) State 元信息
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>
registerResult = tryRegisterKvStateInformation(
stateDesc, namespaceSerializer, snapshotTransformFactory);
// 根據(jù) stateDesc、State 元信息 和 RocksDBKeyedStateBackend,創(chuàng)建具體的 State
return stateFactory.createState(stateDesc, registerResult,
RocksDBKeyedStateBackend.this);
}
createInternalState 方法中首先會(huì)執(zhí)行 tryRegisterKvStateInformation 方法「注冊(cè)和恢復(fù)」 State 元信息伦乔,然后根據(jù) stateDesc厉亏、State 元信息 和 RocksDBKeyedStateBackend,創(chuàng)建具體的 State烈和。
tryRegisterKvStateInformation 與上述 HeapKeyedStateBackend 類的恢復(fù)邏輯類似爱只,所以不貼代碼了,tryRegisterKvStateInformation 方法的整體邏輯就是:
首先會(huì)根據(jù) StateName 從 kvStateInformation 中獲取 State 的元信息保存到 oldStateInfo 中招刹。
如果 stateTable 不為空恬试,表示 Checkpoint 中有當(dāng)前 StateName 對(duì)應(yīng)的狀態(tài),應(yīng)該恢復(fù)疯暑,此時(shí)會(huì)對(duì)新舊 Job 的 State 匹配性進(jìn)行校驗(yàn)训柴。
否則 stateTable 為空,表示當(dāng)前 StateName 不需要從 Checkpoint 恢復(fù)妇拯,直接在 RocksDB 中創(chuàng)建一個(gè)新的 ColumnFamily 存儲(chǔ)當(dāng)前 State 的數(shù)據(jù)畦粮。
小結(jié)
在恢復(fù)過(guò)程中主要依賴之前創(chuàng)建的 Map,Map 中保存的從 Checkpoint 中恢復(fù)出來(lái)的狀態(tài)數(shù)據(jù)乖阵。如果 Map 中有對(duì)應(yīng) StateName 的數(shù)據(jù)宣赔,則對(duì)其進(jìn)行校驗(yàn)并恢復(fù);如果 Map 中找不到瞪浸,則創(chuàng)建新的儒将。
七、 總結(jié)
本文首先介紹了 RocksDBKeyedStateBackend 創(chuàng)建流程对蒲,并分別介紹 RocksDB 三種模式下的 State 恢復(fù)流程钩蚊,分別是:NoneRestoreOperation、IncrementalRestoreOperation蹈矮、FullRestoreOperation 三種模式砰逻。之后介紹 HeapKeyedStateBackend 恢復(fù)流程。最后介紹了用戶定義的 KeyedState 創(chuàng)建流程泛鸟,創(chuàng)建流程會(huì)介紹 Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來(lái)蝠咆。