在流計算中,事件會持續(xù)不斷的產(chǎn)生,如果每次計算都是相互獨立的嘹履,不依賴于上下游事件,則稱為無狀態(tài) (stateless) 計算债热;如果每次的計算依賴于之前或者后續(xù)的事件砾嫉,則稱為有狀態(tài) (stateful) 計算。Flink 中的狀態(tài)用 State 抽象窒篱,用來保存中間計算結(jié)果或者緩存數(shù)據(jù)焕刮,State 是 Flink 內(nèi)部實現(xiàn) Exactly-Once的基礎。
狀態(tài)類型
和 redis 類似墙杯,F(xiàn)link 按照數(shù)據(jù)類型的不同配并,定義了多種 State 接口,具體如下:
-
ValueState<T>
單值狀態(tài)高镐,與數(shù)據(jù)的 key 綁定溉旋;提供了 update(T value) 方法更新值,value() 方法獲取狀態(tài)值嫉髓。
-
ListState<T>
Key 上的狀態(tài)值為一個 List观腊;提供了 add 邑闲,get 方法來分別增加和獲取數(shù)據(jù)。
-
MapState<UK, UV>
Key 上的狀態(tài)值為一個 Map梧油;提供了 put监憎,putAll, get 方法來增加和獲取數(shù)據(jù)。
-
ReducingState<T>
實現(xiàn)的 ReduceFunction 中使用的 state, 在 reduce 方法之前婶溯,會先調(diào)用 ReducingState 的 add 方法,reduce 方法中的第一個參數(shù)就是狀態(tài)值偷霉。
-
AggregatingState<IN, OUT>
聚合 state; 在 AggregateFunction 的 add 方法調(diào)用之前迄委,會先調(diào)用 AggregatingState 的 add 方法,傳入 acc类少。
按照算子是否有 key, Flink State 又被劃分為 KeyedState 和 OperatorState叙身。
類型 | state |
---|---|
KeyedState | ValueState<br/ >ListState<br />ReducingState<br />AggregationState<br />MapState<br /> |
OperatorState | ListState |
今天我們重點講 KeyedState 里的最簡單的 ValueState 的實現(xiàn)。KeyedState硫狞,顧名思義信轿,要與 key 綁定,即只能用在 KeyedStream 流之后残吩;每一個 key 對應一個 State 值财忽;Key 除了有分區(qū)的作用,在狀態(tài)管理當中泣侮,它還用于計算 keyGroupIndex:
// AbstractStreamOperator.java line:463
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
if (selector != null) {
Object key = selector.getKey(record.getValue());
setCurrentKey(key);
}
}
// StreamOperatorStateHandler.java line:281
public void setCurrentKey(Object key) {
if (keyedStateBackend != null) {
try {
// need to work around type restrictions
@SuppressWarnings("unchecked,rawtypes")
AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
rawBackend.setCurrentKey(key);
} catch (Exception e) {
throw new RuntimeException("Exception occurred while setting the current key context.", e);
}
}
}
// 接著看 AbstractKeyedStateBackend.java line:172
public void setCurrentKey(K newKey) {
notifyKeySelected(newKey);
this.keyContext.setCurrentKey(newKey); this.keyContext.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(newKey,numberOfKeyGroups));
}
從上面可以看出, 先設置 key, 再設置 keyGroupIndex即彪,具體 keyGroupIndex 的作用是什么,我們后面會講活尊。
狀態(tài)描述
State 是暴露給用戶的接口隶校,那么就需要指定狀態(tài)的一些屬性,如 name, type, ttl 等蛹锰。Flink 中用 StateDescriptor 來描述一個狀態(tài)深胳,在對應的 StateBackend (狀態(tài)后端) 中,調(diào)用 create 方法得到對應的 State 對象铜犬。下面以一個簡單的 demo 來演示舞终。
private transient ValueState<Tuple2<Integer, Integer>> state;
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Tuple2<Integer, Integer>> descriptor =
new ValueStateDescriptor<>("average", TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));
state = getRuntimeContext().getState(descriptor);
}
我們順著方法調(diào)用鏈走下去,跳過中間一些無關的代碼癣猾,直接看 TtlStateFactory 的 createStateAndWrapWithTtlIfEnabled 方法:
public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc,
KeyedStateBackend<K> stateBackend,
TtlTimeProvider timeProvider) throws Exception {
...
// 是否為狀態(tài)設置了 ttl权埠,如果設置 ttl,使用TtlStateFactory創(chuàng)建state,如果沒有煎谍,直接調(diào)用 stateBackend創(chuàng)建state
return stateDesc.getTtlConfig().isEnabled() ?
new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
namespaceSerializer, stateDesc, stateBackend, timeProvider)
.createState() :
stateBackend.createInternalState(namespaceSerializer, stateDesc);
}
上面兩個創(chuàng)建 state 的不同分支區(qū)別就是是否設置了 ttl攘蔽,其它的基本一樣。接著看 stateBackend.createInternalState 方法的實現(xiàn)
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
if (stateFactory == null) {
String message = String.format("State %s is not supported by %s",
stateDesc.getClass(), this.getClass());
throw new FlinkRuntimeException(message);
}
// 這里是狀態(tài)實現(xiàn)原理的重點, 狀態(tài)的獲取/更新都是通過 statetable 操作的呐粘,我們重點看下里面的實現(xiàn)
StateTable<K, N, SV> stateTable = tryRegisterStateTable(
namespaceSerializer, stateDesc, getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
// 根據(jù)狀態(tài)描述的類型满俗,調(diào)用對應的構造方法转捕。
return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
}
StateTable 是狀態(tài)實現(xiàn)原理的重點, 狀態(tài)的獲取/更新都是通過 statetable 操作的,我們重點看下里面的實現(xiàn)唆垃。
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<?, V> stateDesc,
@Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory) throws StateMigrationException {
@SuppressWarnings("unchecked")
StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());
TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();
if (stateTable != null) {
// ...
} else {
RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
stateDesc.getType(),
stateDesc.getName(),
namespaceSerializer,
newStateSerializer,
snapshotTransformFactory);
// 得到 statetable對象五芝。啟用 statetable的構造方法。初始化size為128(默認最大并行度)的 StateMap數(shù)組
stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
registeredKVStates.put(stateDesc.getName(), stateTable);
}
return stateTable;
}
StateTable 里提供的 get, put 操作實現(xiàn)了對狀態(tài)值的 獲取和更新辕万,StateTable是一個抽象類枢步,如下:
NestedMapsStateTable 使用兩層嵌套的 HashMap 保存狀態(tài)數(shù)據(jù),支持同步快照渐尿;CopyOnWriteStateTable 使用 CopyOnWriteStateMap 來保存狀態(tài)數(shù)據(jù)醉途,支持異步快照。下面是 CopyOnWriteStateTable里的 put 和 get 方法的實現(xiàn)
private S get(K key, int keyGroupIndex, N namespace) {
checkKeyNamespacePreconditions(key, namespace);
// stateMap數(shù)組默認size為128砖茸,keyGroupIndex為key的hash與128取模得到
// StateMap內(nèi)部封裝了MapEntry隘擎,類似于 HashMap,基于鏈表加數(shù)組的實現(xiàn)
StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
if (stateMap == null) {
return null;
}
// 從map中得到state value
return stateMap.get(key, namespace);
}
public void put(K key, int keyGroup, N namespace, S state) {
checkKeyNamespacePreconditions(key, namespace);
// stateMap數(shù)組默認size為128凉夯,keyGroupIndex為key的hash與128取模得到
StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroup);
stateMap.put(key, namespace, state);
}
上面代碼比較簡單明了货葬,也就不用多做解釋。
總結(jié)
- StateDescribe 持有狀態(tài)的描述劲够, StateBackend 通過它來創(chuàng)建 State 對象
- State 對象里封裝了 StateTable震桶,StateTable 負責對State 做 snapshot 到對應的 StateBackend。
- StateTable 里封裝了 StateMap征绎,為存儲 state 的內(nèi)存介質(zhì)尼夺,負責狀態(tài)的更新/新增/獲取 (基于內(nèi)存)。
- StateMap 在 StateTable 中為一個數(shù)組炒瘸,默認 size 為最大并行度 128淤堵,所以會存在不同 key 對應一個StateMap對象。里面的實現(xiàn)類似于 HashMap顷扩,為鏈表加數(shù)組的數(shù)據(jù)結(jié)構拐邪。