何為State
為實(shí)現(xiàn)增量計(jì)算和容錯(cuò)寂汇,F(xiàn)link提出了State機(jī)制西采,本質(zhì)上State就是用來存放計(jì)算過程中各節(jié)點(diǎn)的中間結(jié)果或元數(shù)據(jù)等罐呼,并提供Exactly-Once語義鞠柄。
流計(jì)算的大部分場景均是增量計(jì)算的,數(shù)據(jù)逐條被處理嫉柴,每次當(dāng)前結(jié)果均是基于上一次計(jì)算結(jié)果之上進(jìn)行處理的厌杜,這勢必需要將上一次的計(jì)算結(jié)果進(jìn)行存儲(chǔ)持久化。
目前Flink有3種State存儲(chǔ)實(shí)現(xiàn):
- HeapStateBackend计螺,內(nèi)存夯尽,存放數(shù)據(jù)量小侧馅,用于開發(fā)測試,生產(chǎn)環(huán)境不建議呐萌;
- FsStateBackend,分布式文件系統(tǒng)(HDFS等)谊娇,不支持增量肺孤,可用于大State,可用于生產(chǎn)環(huán)境济欢;
- RockDBStateBackend赠堵,RockDB,支持增量法褥,可用于大State茫叭,可用于生產(chǎn)環(huán)境。
生產(chǎn)環(huán)境下的最佳實(shí)踐為:
State先在本地存儲(chǔ)到RockDB半等,然后異步寫入到HDFS揍愁,即避免了HeapStateBackend的單節(jié)點(diǎn)資源限制(物理內(nèi)存、機(jī)器宕機(jī)丟失數(shù)據(jù)等)杀饵,也減少了分布式寫入帶來的網(wǎng)絡(luò)IO開銷莽囤。
State分類
從Operator和Data角度可將State分為2類:
- OperatorState:Source Connector的實(shí)現(xiàn)中就會(huì)用OperatorState來記錄source數(shù)據(jù)讀取的offset。
- KeyedState:groupby或partitionBy組成的內(nèi)容切距,即為key朽缎,每個(gè)key均有自己的state,且key與key之間的state不可見谜悟。
State擴(kuò)容
所謂State擴(kuò)容话肖,指的當(dāng)算子并行度發(fā)生改變時(shí),其需要進(jìn)行相應(yīng)的組織調(diào)整葡幸。
如下圖所示:
OperatorState擴(kuò)容
OperatorState往往以ListState<T>的形式存在最筒,如FlinkKafkaConsumerBase:
@Internal
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
CheckpointListener,
ResultTypeQueryable<T>,
CheckpointedFunction {
...
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
...
}
此時(shí),T對應(yīng)Tuple2<KafkaTopicPartition, Long>礼患,KafkaTopicPartition代表Kafka Topic的一個(gè)Partition是钥,Long代表當(dāng)前Partition的offset。
假設(shè)某Topic的partition數(shù)目為5缅叠,且Source的并行度=1悄泥,則對應(yīng)的State如下所示:
當(dāng)Source的并行度修改為2之后,Task與State的對應(yīng)關(guān)系如下:
KeyedState擴(kuò)容
Flink Source執(zhí)行keyBy之后肤粱,各個(gè)元素會(huì)基于key鏈接到下游不同的并行Operator上弹囚,流計(jì)算中同時(shí)會(huì)涉及到KeyedState的組織。
key的數(shù)目一般大于Operator的并行度parallelism,最直觀的做法是將key的hash值與并行度parallelism取余逗堵。
假設(shè)上游10個(gè)元素臭增,其keyHash分別為{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}瞪讼,下游Operator的并行度parallelism為2劫狠。
- operatorIndex=0胡嘿,分配到的元素有0, 2, 4, 6, 8按傅,維護(hù)的State-0值為0, 2, 4, 6, 8
- operatorIndex=1含末,分配到的元素有1, 3, 5, 7, 9灸异,維護(hù)的State-1值為1, 3, 5, 7, 9
假設(shè)下游Operator的并行度parallelism為修改為3府适,此時(shí):
- operatorIndex=0,分配到的元素有0, 3, 6, 9肺樟,維護(hù)的State-0值為0, 3, 6, 9
- operatorIndex=1檐春,分配到的元素有1, 4, 7,維護(hù)的State-1值為1, 4, 7
- operatorIndex=2么伯,分配到的元素有2, 5, 8疟暖,維護(hù)的State-2值為2, 5, 8
假如并行度parallelism發(fā)生改變的話,則前面維護(hù)好的State也需要重新組織一遍田柔。KeyedState數(shù)據(jù)較大時(shí)俐巴,數(shù)據(jù)重新組織的代價(jià)較高。
為了解決上述問題凯楔,F(xiàn)link采用了一種KeyGroupRange的機(jī)制窜骄,基本思想是將各元素先分配到最細(xì)粒度的組中,F(xiàn)link將其稱為KeyGroup摆屯,KeyGroup也是KeyedState的最小組織單位邻遏。然后并行Operator持有各自的KeyGroup集合即可,該集合即所謂的KeyGroupRange虐骑。
public class KeyGroupRange implements KeyGroupsList, Serializable {
...
private final int startKeyGroup;
private final int endKeyGroup;
...
}
很明顯准验,其通過一個(gè)范圍來定義集合,范圍起點(diǎn)為startKeyGroup廷没,終點(diǎn)為endKeyGroup糊饱,左閉右閉。
我們知道颠黎,各個(gè)算子均有最大并行度maxParallelism另锋,所以可以利用key的hash值與maxParallelism進(jìn)行取模來完成KeyGroup的構(gòu)建。
public static int assignToKeyGroup(Object key, int maxParallelism) {
return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
return MathUtils.murmurHash(keyHash) % maxParallelism;
}
Flink沒有直接使用hashcode狭归,而是在hashcode的基礎(chǔ)上又調(diào)用了murmurHash方法夭坪,以保證盡量的散列。
現(xiàn)在有maxParallelism個(gè)KeyGroup过椎,需要將其分配到parallelism個(gè)并行算子中室梅,每個(gè)并行算子持有1個(gè)KeyGroupRange,其起終點(diǎn)的計(jì)算方式如下:
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
int maxParallelism,
int parallelism,
int operatorIndex) {
checkParallelismPreconditions(parallelism);
checkParallelismPreconditions(maxParallelism);
Preconditions.checkArgument(maxParallelism >= parallelism,
"Maximum parallelism must not be smaller than parallelism.");
int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
return new KeyGroupRange(start, end);
}
最后,每個(gè)元素對應(yīng)的算子計(jì)算方式如下:
public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
return keyGroupId * parallelism / maxParallelism;
}
上面說的可能還是有點(diǎn)抽象亡鼠,下面用1個(gè)例子來實(shí)際說明:
- 當(dāng)parallelism=2時(shí)可得到KeyGroupRange:
operatorIndex=0赏殃,則得到start=0,end=4:如圖kg-keys:0间涵,1仁热,2,3勾哩,4
operatorIndex=1股耽,則得到start=5,end=9:如圖kg-keys:5钳幅,6,7炎滞,8敢艰,9
- 當(dāng)parallelism=3時(shí)可得到KeyGroupRange:
operatorIndex=0,則得到start=0册赛,end=3:如圖kg-keys:0钠导,1,2森瘪,3
operatorIndex=1牡属,則得到start=4,end=6:如圖kg-keys:4扼睬,5逮栅,6
operatorIndex=2,則得到start=7窗宇,end=9:如圖kg-keys:7措伐,8,9
采用KeyGroupRange機(jī)制军俊,只要Flink任務(wù)的maxParallelism配置不變侥加,無論算子的parallelism如何變化,底層的KeyedSate均不需要重新組織粪躬。
核心思想就是: 不直接操作數(shù)據(jù)担败,只操作數(shù)據(jù)的指針。