本文僅為筆者平日學(xué)習(xí)記錄之用间坐,侵刪
原文:https://mp.weixin.qq.com/s/6Oi_1tP-7Jns3ZguMW7wLg
在之前《StreamTask 初始化流程》的文章中,省略掉了 TM 端恢復(fù) State 的詳細(xì)過程拔妥,本文主要講述:
- OperatorState 的恢復(fù)和創(chuàng)建流程
- Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來(lái)
一造垛、 TM 端恢復(fù) OperatorState 的流程
StateBackend 創(chuàng)建 OperatorStateBackend 時(shí) TM 端會(huì)恢復(fù) OperatorState会喝。目前 Flink 支持的三種 StateBackend 都對(duì)應(yīng)同一種 OperatorStateBackend俊卤,即:DefaultOperatorStateBackend,具體 new DefaultOperatorStateBackend 的過程由建造器 DefaultOperatorStateBackendBuilder 完成浸赫。
三種 StateBackend 的 createOperatorStateBackend 方法非常相似,源碼如下:
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
@Nonnull Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry) throws Exception {
return new DefaultOperatorStateBackendBuilder(
env.getUserClassLoader(),
env.getExecutionConfig(),
isUsingAsynchronousSnapshots(),
stateHandles,
cancelStreamRegistry).build();
}
所有的初始化流程都在 DefaultOperatorStateBackendBuilder 類的 build 方法中赃绊,build 方法源碼如下所示:
@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
new DefaultOperatorStateBackendSnapshotStrategy(XXX);
OperatorStateRestoreOperation restoreOperation =
new OperatorStateRestoreOperation(XXX);
try {
// OperatorState 恢復(fù)流程
restoreOperation.restore();
} catch (Exception e) {
IOUtils.closeQuietly(cancelStreamRegistryForBackend);
throw new BackendBuildingException("XXX", e);
}
return new DefaultOperatorStateBackend(XXX);
}
build 方法中除了構(gòu)造了幾個(gè)對(duì)象以外既峡,重點(diǎn)執(zhí)行了 OperatorStateRestoreOperation 的 restore 方法,restore 方法就是恢復(fù)流程凭戴。
先介紹 OperatorStateRestoreOperation 類中兩個(gè)重要的 Map:
- registeredOperatorStates 用于保存 StateName 和 ListState 的映射關(guān)系涧狮;
- registeredBroadcastStates 用于保存 StateName 和 BroadcastState 的映射關(guān)系
restore 源碼如下所示:
// OperatorStateRestoreOperation 類中兩個(gè)重要的 Map
// 保存 StateName 和 ListState 的映射關(guān)系
private final Map<String, PartitionableListState<?>> registeredOperatorStates;
// 保存 StateName 和 BroadcastState 的映射關(guān)系
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
// Operator State 真正的 restore 流程
@Override
public Void restore() throws Exception {
// stateHandles 為空炕矮,表示沒有要恢復(fù)的 State
if (stateHandles.isEmpty()) {
return null;
}
// 遍歷所有 stateHandles
for (OperatorStateHandle stateHandle : stateHandles) {
// 通過 stateHandle 可以獲取 InputStream 讀取數(shù)據(jù)
FSDataInputStream in = stateHandle.openInputStream();
try {
List<StateMetaInfoSnapshot> restoredOperatorMetaInfoSnapshots =
backendSerializationProxy.getOperatorStateMetaInfoSnapshots();
// 從元數(shù)據(jù)中創(chuàng)建 PartitionableListStates么夫,并沒有恢復(fù)真正的 State
for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {
final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
// registeredOperatorStates 中維護(hù)的 StateName 與 ListState 的映射關(guān)系
PartitionableListState<?> listState = registeredOperatorStates
.get(restoredSnapshot.getName());
// listState == null 表示當(dāng)前 State 還未創(chuàng)建,則創(chuàng)建肤视,并保存到 map 中
if (null == listState) {
// 這里只是依賴 MetaInfo 創(chuàng)建了 PartitionableListState档痪,并沒有恢復(fù)真正的 State 數(shù)據(jù)
listState = new PartitionableListState<>(restoredMetaInfo);
// 創(chuàng)建出的 State 數(shù)據(jù) put 到 registeredOperatorStates 中
registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
}
}
// 真正恢復(fù) State 的操作
for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets :
stateHandle.getStateNameToPartitionOffsets().entrySet()) {
final String stateName = nameToOffsets.getKey();
// 通過 StateName 從 registeredOperatorStates 中獲取 ListState
// 因?yàn)橹耙呀?jīng)根據(jù)元數(shù)據(jù)創(chuàng)建了 State,
// 所以這里 get 不到邢滑,只能是因?yàn)楫?dāng)前的 StateName 屬于 BroadcastState
PartitionableListState<?> listStateForName =
registeredOperatorStates.get(stateName);
// listState 為 null腐螟,表示恢復(fù) Broadcast 相關(guān)的 State
if (listStateForName == null) {
BackendWritableBroadcastState<?, ?> broadcastStateForName =
registeredBroadcastStates.get(stateName);
deserializeBroadcastStateValues(broadcastStateForName,
in, nameToOffsets.getValue());
} else {
// 恢復(fù) ListState,將恢復(fù)出來(lái)的元素 add 到 ListState 中
deserializeOperatorStateValues(listStateForName,
in, nameToOffsets.getValue());
}
}
} finally {
Thread.currentThread().setContextClassLoader(restoreClassLoader);
if (closeStreamOnCancelRegistry.unregisterCloseable(in)) {
IOUtils.closeQuietly(in);
}
}
}
return null;
}
restore 方法中拿到的就是 JM 分配給當(dāng)前 subtask 的 stateHandles困后,如果 stateHandles 為空表示沒有要恢復(fù)的 State 則直接返回 null乐纸,可能是因?yàn)槿蝿?wù)是直接啟動(dòng),而不是從 Checkpoint 處恢復(fù)摇予。否則 stateHandles 不為空的情況汽绢,就遍歷一個(gè)個(gè) OperatorStateHandle,通過 stateHandle 可以獲取 InputStream 讀取數(shù)據(jù)侧戴。
首先讀出元數(shù)據(jù)宁昭,用于創(chuàng)建 PartitionableListState,并沒有真正恢復(fù) State 數(shù)據(jù)酗宋,PartitionableListState 是 OperatorState 對(duì) ListState 的具體實(shí)現(xiàn)积仗。ListState 維護(hù)在 registeredOperatorStates 這個(gè) Map 中,通過 StateName 從 registeredOperatorStates 中 get蜕猫,get 不到時(shí)寂曹,通過元數(shù)據(jù)創(chuàng)建 State,并存放在 registeredOperatorStates 中回右。
代碼中省略了 BroadcastState 的創(chuàng)建流程隆圆,整體流程與 ListState 流程類似,只不過 BroadcastState 維護(hù)在 registeredBroadcastStates 中楣黍。
最后真正的恢復(fù) State 數(shù)據(jù)匾灶,對(duì)于 ListState 而言將恢復(fù)出來(lái)的元素 add 到 ListState 中∽馄恢復(fù) State 數(shù)據(jù)的過程其實(shí)用反序列化器對(duì)狀態(tài)數(shù)據(jù)反序列化生成對(duì)象的過程阶女。反序列化器維護(hù)在 PartitionableListState 的元數(shù)據(jù)中颊糜。
到這里 OperatorState 就恢復(fù)完成,此時(shí)映射關(guān)系已經(jīng)保存到 OperatorStateRestoreOperation 類的兩個(gè) Map 集合中⊥翰龋現(xiàn)在又回到 DefaultOperatorStateBackendBuilder 類的 build 方法中衬鱼,就會(huì)發(fā)現(xiàn)其實(shí)這兩個(gè) Map 是好多地方共享的。這里再貼一下 build 方法的完整源碼憔杨,重點(diǎn)關(guān)注兩個(gè) Map:
@Override
public DefaultOperatorStateBackend build() throws BackendBuildingException {
// 保存 StateName 和 ListState 的映射關(guān)系
Map<String, PartitionableListState<?>> registeredOperatorStates = new HashMap<>();
// 保存 StateName 和 BroadcastState 的映射關(guān)系
Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates =
new HashMap<>();
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
OperatorStateRestoreOperation restoreOperation = new OperatorStateRestoreOperation(
cancelStreamRegistry,
userClassloader,
// 將兩個(gè) Map 傳遞進(jìn)去鸟赫,即:restore 過程中,映射關(guān)系會(huì)存儲(chǔ)在這兩個(gè) Map 中
registeredOperatorStates,
registeredBroadcastStates,
restoreStateHandles
);
try {
// OperatorState 恢復(fù)流程
restoreOperation.restore();
} catch (Exception e) {
IOUtils.closeQuietly(cancelStreamRegistryForBackend);
throw new BackendBuildingException("XXX", e);
}
AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy =
new DefaultOperatorStateBackendSnapshotStrategy(
userClassloader,
asynchronousSnapshots,
// 將兩個(gè) Map 傳遞給 DefaultOperatorStateBackendSnapshotStrategy
registeredOperatorStates,
registeredBroadcastStates,
cancelStreamRegistryForBackend);
return new DefaultOperatorStateBackend(
executionConfig,
cancelStreamRegistryForBackend,
// 再將兩個(gè) Map 傳遞給 DefaultOperatorStateBackend
registeredOperatorStates,
registeredBroadcastStates,
new HashMap<>(),
new HashMap<>(),
snapshotStrategy
);
}
可以看到 build 方法剛開始會(huì) new 兩個(gè) Map消别,然后傳遞給了 OperatorStateRestoreOperation抛蚤,之后 OperatorStateRestoreOperation 的 restore 流程(也就是上述分析的恢復(fù)流程)實(shí)際上將 Checkpoint 中恢復(fù)出來(lái)的映射關(guān)系保存到了這兩個(gè) Map 中。之后兩個(gè) Map 又傳遞給了 DefaultOperatorStateBackendSnapshotStrategy 和 DefaultOperatorStateBackend寻狂。
所以得出結(jié)論:DefaultOperatorStateBackend 中持有從 Checkpoint 處恢復(fù)出來(lái)的 StateName 與具體 State 的映射關(guān)系岁经。
到這里 DefaultOperatorStateBackend 就創(chuàng)建完成了,同時(shí)留兩個(gè)問題:
上面流程雖然將 OperatorState 從 Checkpoint 中恢復(fù)了蛇券,但用戶在算子中創(chuàng)建的 State 如何與 Checkpoint 中恢復(fù)的 OperatorState 關(guān)聯(lián)起來(lái)呢缀壤?
另外對(duì)于直接啟動(dòng),不從 Checkpoint 處恢復(fù)的任務(wù)纠亚,OperatorState 又是如何創(chuàng)建出來(lái)的塘慕?
帶著這兩個(gè)問題閱讀下面流程。
二蒂胞、 用戶定義的 OperatorState 創(chuàng)建流程
Flink 源碼中最典型的使用 OperatorState 的場(chǎng)景就是 FlinkConsumer 使用 ListState 去維護(hù) Kafka 的 offset 信息图呢,所以本文就從這塊源碼入手,看一下這個(gè) ListState 創(chuàng)建流程啤誊。
FlinkKafkaConsumerBase 類的 initializeState 方法中用到了 getUnionListState 創(chuàng)建一個(gè) ListState岳瞭,簡(jiǎn)潔版源碼如下所示:
@Override
public final void initializeState(FunctionInitializationContext context) {
OperatorStateStore stateStore = context.getOperatorStateStore();
unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
}
這里調(diào)用的 OperatorStateStore 的 getUnionListState 方法。OperatorStateStore 是個(gè)接口蚊锹,它只有一個(gè)實(shí)現(xiàn)類瞳筏,就是前面創(chuàng)建出來(lái)的 DefaultOperatorStateBackend。所以這里會(huì)調(diào)用 DefaultOperatorStateBackend 類的 getUnionListState 方法牡昆。不過 DefaultOperatorStateBackend 中還有一個(gè) getListState(ListStateDescriptor stateDescriptor) 方法姚炕,這也就是 OperatorState 類型的 ListState 兩種獲取方式《妫可以看一下這兩個(gè)方法的源碼:
@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) {
return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}
@Override
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) {
return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
}
源碼中可以看到柱宦,無(wú)論業(yè)務(wù)使用的是 getListState 還是 getUnionListState 方法獲取 ListState ,最后都會(huì)調(diào)用同一個(gè)方法播瞳,即:getListState(ListStateDescriptor stateDescriptor, OperatorStateHandle.Mode mode)掸刊。加了一個(gè)參數(shù) OperatorStateHandle.Mode 用于區(qū)分 OperatorState 的模式:
- getListState 對(duì)應(yīng) SPLIT_DISTRIBUTE 模式
- getUnionListState 對(duì)應(yīng) UNION 模式
getListState(stateDescriptor, mode) 方法源碼如下所示:
// 無(wú)論是 getListState 還是 getUnionListState 方法都會(huì)調(diào)用這里,
// 只不過傳遞的 Mode 參數(shù)不同而已
private <S> ListState<S> getListState(
ListStateDescriptor<S> stateDescriptor,
OperatorStateHandle.Mode mode) throws StateMigrationException {
String name = Preconditions.checkNotNull(stateDescriptor.getName());
TypeSerializer<S> partitionStateSerializer =
Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
PartitionableListState<S> partitionableListState = (PartitionableListState<S>)
registeredOperatorStates.get(name);
// registeredOperatorStates 中維護(hù)的是 Checkpoint 中恢復(fù)的 StateName 和 ListState 的映射關(guān)系
// 如果 partitionableListState == null 表示從 Checkpoint 中沒有恢復(fù)出這個(gè) State赢乓,
// 即:這是一個(gè)新的 State忧侧,則新建一個(gè) PartitionableListState石窑,并維護(hù)在 Map 中
if (null == partitionableListState) {
partitionableListState = new PartitionableListState<>(
new RegisteredOperatorStateBackendMetaInfo<>(
name,
partitionStateSerializer,
mode));
registeredOperatorStates.put(name, partitionableListState);
} else {
// State 已經(jīng)從 Checkpoint 中恢復(fù)了,檢查兼容性問題
// 這里會(huì)檢查 StateName 和 AssignmentMode 是否可以匹配
checkStateNameAndMode(
partitionableListState.getStateMetaInfo().getName(),
name,
partitionableListState.getStateMetaInfo().getAssignmentMode(),
mode);
RegisteredOperatorStateBackendMetaInfo<S> restoredPartitionableListStateMetaInfo =
partitionableListState.getStateMetaInfo();
// 檢查 序列化是否兼容
TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
TypeSerializerSchemaCompatibility<S> stateCompatibility =
restoredPartitionableListStateMetaInfo.
updatePartitionStateSerializer(newPartitionStateSerializer);
// 不兼容蚓炬,則拋出異常
if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException("XXX.");
}
partitionableListState.setStateMetaInfo(restoredPartitionableListStateMetaInfo);
}
accessedStatesByName.put(name, partitionableListState);
// 返回 State
return partitionableListState;
}
getListState(stateDescriptor, mode) 方法首先通過 name 從 registeredOperatorStates 中 get 對(duì)應(yīng)的 ListState 保存到 partitionableListState 中松逊,registeredOperatorStates 維護(hù)的是 Checkpoint 中恢復(fù)的 StateName 和 ListState 的映射關(guān)系。所以 partitionableListState == null 表示從 Checkpoint 中沒有恢復(fù)出這個(gè) State肯夏,即:這是一個(gè)新的 State经宏,所以新建一個(gè) PartitionableListState,并保存在 registeredOperatorStates 中驯击。
反之烁兰,partitionableListState != null 表示 State 已經(jīng)從 Checkpoint 中恢復(fù)了,開始檢查兼容性余耽,首先會(huì)檢查 Checkpoint 中恢復(fù)的 State 和用戶新申請(qǐng)的 StateName 和 AssignmentMode 是否可以匹配缚柏。
- StateName 和 name 肯定是匹配的,因?yàn)?partitionableListState 是根據(jù) name get 出來(lái)的碟贾。
- AssignmentMode 枚舉用于區(qū)分應(yīng)用層使用的 getListState 恢復(fù)還是 getUnionListState 恢復(fù),getListState 表示 SPLIT_DISTRIBUTE 模式轨域,getUnionListState 表示 UNION 模式袱耽。如果 State 中存儲(chǔ)的是 SPLIT_DISTRIBUTE 模式,但任務(wù)恢復(fù)時(shí)干发,代碼改成了 getUnionListState朱巨,實(shí)際上 State 不能正常恢復(fù)的枉长。
StateName 和 AssignmentMode 檢查完畢后冀续,會(huì)檢查序列化是否兼容,不兼容必峰,則拋出異常洪唐。兼容則會(huì)返回 State。
上述流程就回答了最開始提的兩個(gè)問題:
-
OperatorState 從 Checkpoint 中恢復(fù)后吼蚁,用戶在算子中創(chuàng)建的 State 如何與 Checkpoint 中恢復(fù)的 OperatorState 關(guān)聯(lián)起來(lái)呢凭需?
答:依賴 registeredOperatorStates 這個(gè) Map 維護(hù)了 StateName 和 ListState 的映射關(guān)系,用戶創(chuàng)建 State 是通過 StateName 從 registeredOperatorStates 中查找肝匆,如果能找到粒蜈,對(duì)其進(jìn)行兼容性檢查,檢查通過就會(huì)返回從 Checkpoint 中恢復(fù)的 ListState旗国,從而完成了關(guān)聯(lián)枯怖。
-
對(duì)于直接啟動(dòng),不從 Checkpoint 處恢復(fù)的任務(wù)能曾,OperatorState 又是如何創(chuàng)建出來(lái)的度硝?
答:對(duì)于直接啟動(dòng)的任務(wù)设捐,registeredOperatorStates 肯定是空的。創(chuàng)建 State 時(shí)塘淑,從 registeredOperatorStates 中 get 不到萝招,所以就創(chuàng)建一個(gè)新的 PartitionableListState,并保存在 registeredOperatorStates 中存捺。
到這里槐沼,OperatorState 就完成了恢復(fù),且用戶的 State 也正常的創(chuàng)建出來(lái)了捌治。
三岗钩、總結(jié)
文中首先介紹了 OperatorState 的恢復(fù)和創(chuàng)建流程,并介紹從 Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來(lái)的肖油。后續(xù)將會(huì)詳細(xì)介紹 KeyedState 的恢復(fù)創(chuàng)建流程以及如何將 Checkpoint 處恢復(fù)的 State 如何與代碼中創(chuàng)建的 State 關(guān)聯(lián)起來(lái)兼吓。