Flink的State擴(kuò)容機(jī)制

何為State

為實(shí)現(xiàn)增量計(jì)算和容錯(cuò)寂汇,F(xiàn)link提出了State機(jī)制西采,本質(zhì)上State就是用來存放計(jì)算過程中各節(jié)點(diǎn)的中間結(jié)果或元數(shù)據(jù)等罐呼,并提供Exactly-Once語義鞠柄。

流計(jì)算的大部分場景均是增量計(jì)算的,數(shù)據(jù)逐條被處理嫉柴,每次當(dāng)前結(jié)果均是基于上一次計(jì)算結(jié)果之上進(jìn)行處理的厌杜,這勢必需要將上一次的計(jì)算結(jié)果進(jìn)行存儲(chǔ)持久化。

目前Flink有3種State存儲(chǔ)實(shí)現(xiàn):

  • HeapStateBackend计螺,內(nèi)存夯尽,存放數(shù)據(jù)量小侧馅,用于開發(fā)測試,生產(chǎn)環(huán)境不建議呐萌;
  • FsStateBackend,分布式文件系統(tǒng)(HDFS等)谊娇,不支持增量肺孤,可用于大State,可用于生產(chǎn)環(huán)境济欢;
  • RockDBStateBackend赠堵,RockDB,支持增量法褥,可用于大State茫叭,可用于生產(chǎn)環(huán)境。

生產(chǎn)環(huán)境下的最佳實(shí)踐為:

2.jpg

State先在本地存儲(chǔ)到RockDB半等,然后異步寫入到HDFS揍愁,即避免了HeapStateBackend的單節(jié)點(diǎn)資源限制(物理內(nèi)存、機(jī)器宕機(jī)丟失數(shù)據(jù)等)杀饵,也減少了分布式寫入帶來的網(wǎng)絡(luò)IO開銷莽囤。

State分類

從Operator和Data角度可將State分為2類:

  • OperatorState:Source Connector的實(shí)現(xiàn)中就會(huì)用OperatorState來記錄source數(shù)據(jù)讀取的offset。
  • KeyedState:groupby或partitionBy組成的內(nèi)容切距,即為key朽缎,每個(gè)key均有自己的state,且key與key之間的state不可見谜悟。

State擴(kuò)容

所謂State擴(kuò)容话肖,指的當(dāng)算子并行度發(fā)生改變時(shí),其需要進(jìn)行相應(yīng)的組織調(diào)整葡幸。

如下圖所示:

3.jpg

OperatorState擴(kuò)容

OperatorState往往以ListState<T>的形式存在最筒,如FlinkKafkaConsumerBase:

@Internal
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction {
    ... 
    /** Accessor for state in the operator state backend. */
    private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
    ...
        }

此時(shí),T對應(yīng)Tuple2<KafkaTopicPartition, Long>礼患,KafkaTopicPartition代表Kafka Topic的一個(gè)Partition是钥,Long代表當(dāng)前Partition的offset。

假設(shè)某Topic的partition數(shù)目為5缅叠,且Source的并行度=1悄泥,則對應(yīng)的State如下所示:

4.jpg

當(dāng)Source的并行度修改為2之后,Task與State的對應(yīng)關(guān)系如下:

5.jpg

KeyedState擴(kuò)容

Flink Source執(zhí)行keyBy之后肤粱,各個(gè)元素會(huì)基于key鏈接到下游不同的并行Operator上弹囚,流計(jì)算中同時(shí)會(huì)涉及到KeyedState的組織。

key的數(shù)目一般大于Operator的并行度parallelism,最直觀的做法是將key的hash值與并行度parallelism取余逗堵。

假設(shè)上游10個(gè)元素臭增,其keyHash分別為{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}瞪讼,下游Operator的并行度parallelism為2劫狠。

  • operatorIndex=0胡嘿,分配到的元素有0, 2, 4, 6, 8按傅,維護(hù)的State-0值為0, 2, 4, 6, 8
  • operatorIndex=1含末,分配到的元素有1, 3, 5, 7, 9灸异,維護(hù)的State-1值為1, 3, 5, 7, 9

假設(shè)下游Operator的并行度parallelism為修改為3府适,此時(shí):

  • operatorIndex=0,分配到的元素有0, 3, 6, 9肺樟,維護(hù)的State-0值為0, 3, 6, 9
  • operatorIndex=1檐春,分配到的元素有1, 4, 7,維護(hù)的State-1值為1, 4, 7
  • operatorIndex=2么伯,分配到的元素有2, 5, 8疟暖,維護(hù)的State-2值為2, 5, 8

假如并行度parallelism發(fā)生改變的話,則前面維護(hù)好的State也需要重新組織一遍田柔。KeyedState數(shù)據(jù)較大時(shí)俐巴,數(shù)據(jù)重新組織的代價(jià)較高。

為了解決上述問題凯楔,F(xiàn)link采用了一種KeyGroupRange的機(jī)制窜骄,基本思想是將各元素先分配到最細(xì)粒度的組中,F(xiàn)link將其稱為KeyGroup摆屯,KeyGroup也是KeyedState的最小組織單位邻遏。然后并行Operator持有各自的KeyGroup集合即可,該集合即所謂的KeyGroupRange虐骑。

public class KeyGroupRange implements KeyGroupsList, Serializable {
    ...
    private final int startKeyGroup;
    private final int endKeyGroup;
    ...
}

很明顯准验,其通過一個(gè)范圍來定義集合,范圍起點(diǎn)為startKeyGroup廷没,終點(diǎn)為endKeyGroup糊饱,左閉右閉。

我們知道颠黎,各個(gè)算子均有最大并行度maxParallelism另锋,所以可以利用key的hash值與maxParallelism進(jìn)行取模來完成KeyGroup的構(gòu)建。

public static int assignToKeyGroup(Object key, int maxParallelism) {
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

Flink沒有直接使用hashcode狭归,而是在hashcode的基礎(chǔ)上又調(diào)用了murmurHash方法夭坪,以保證盡量的散列。

現(xiàn)在有maxParallelism個(gè)KeyGroup过椎,需要將其分配到parallelism個(gè)并行算子中室梅,每個(gè)并行算子持有1個(gè)KeyGroupRange,其起終點(diǎn)的計(jì)算方式如下:

public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
    int maxParallelism,
    int parallelism,
    int operatorIndex) {

    checkParallelismPreconditions(parallelism);
    checkParallelismPreconditions(maxParallelism);

    Preconditions.checkArgument(maxParallelism >= parallelism,
        "Maximum parallelism must not be smaller than parallelism.");

    int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
    int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    return new KeyGroupRange(start, end);
}

最后,每個(gè)元素對應(yīng)的算子計(jì)算方式如下:

public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    return keyGroupId * parallelism / maxParallelism;
}

上面說的可能還是有點(diǎn)抽象亡鼠,下面用1個(gè)例子來實(shí)際說明:

1.jpg
  • 當(dāng)parallelism=2時(shí)可得到KeyGroupRange:

operatorIndex=0赏殃,則得到start=0,end=4:如圖kg-keys:0间涵,1仁热,2,3勾哩,4
operatorIndex=1股耽,則得到start=5,end=9:如圖kg-keys:5钳幅,6,7炎滞,8敢艰,9

  • 當(dāng)parallelism=3時(shí)可得到KeyGroupRange:

operatorIndex=0,則得到start=0册赛,end=3:如圖kg-keys:0钠导,1,2森瘪,3
operatorIndex=1牡属,則得到start=4,end=6:如圖kg-keys:4扼睬,5逮栅,6
operatorIndex=2,則得到start=7窗宇,end=9:如圖kg-keys:7措伐,8,9

采用KeyGroupRange機(jī)制军俊,只要Flink任務(wù)的maxParallelism配置不變侥加,無論算子的parallelism如何變化,底層的KeyedSate均不需要重新組織粪躬。

核心思想就是: 不直接操作數(shù)據(jù)担败,只操作數(shù)據(jù)的指針

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末镰官,一起剝皮案震驚了整個(gè)濱河市提前,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌朋魔,老刑警劉巖岖研,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡孙援,警方通過查閱死者的電腦和手機(jī)害淤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拓售,“玉大人窥摄,你說我怎么就攤上這事〈∮伲” “怎么了崭放?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長鸽凶。 經(jīng)常有香客問我币砂,道長,這世上最難降的妖魔是什么玻侥? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任决摧,我火速辦了婚禮,結(jié)果婚禮上凑兰,老公的妹妹穿的比我還像新娘掌桩。我一直安慰自己,他們只是感情好姑食,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布波岛。 她就那樣靜靜地躺著,像睡著了一般音半。 火紅的嫁衣襯著肌膚如雪则拷。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天曹鸠,我揣著相機(jī)與錄音隔躲,去河邊找鬼。 笑死物延,一個(gè)胖子當(dāng)著我的面吹牛宣旱,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播叛薯,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼浑吟,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了耗溜?” 一聲冷哼從身側(cè)響起组力,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎抖拴,沒想到半個(gè)月后燎字,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體腥椒,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年候衍,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了笼蛛。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,110評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蛉鹿,死狀恐怖滨砍,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情妖异,我是刑警寧澤惋戏,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站他膳,受9級特大地震影響响逢,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜棕孙,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一龄句、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧散罕,春花似錦、人聲如沸傀蓉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽葬燎。三九已至误甚,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間谱净,已是汗流浹背窑邦。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留壕探,地道東北人冈钦。 一個(gè)月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像李请,于是被迫代替她去往敵國和親瞧筛。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容