flink解析之state

一。 概述

在流計(jì)算場(chǎng)景中,數(shù)據(jù)沒(méi)有邊界源源不斷的流入的忠蝗,每條數(shù)據(jù)流入都可能會(huì)觸發(fā)計(jì)算增淹,比如在進(jìn)行count或sum這些操作椿访,是選擇每次觸發(fā)計(jì)算將所有流入的歷史數(shù)據(jù)重新計(jì)算一邊還是每次計(jì)算都基于上次計(jì)算結(jié)果進(jìn)行增量計(jì)算呢? 從綜合考慮角度虑润,很多人都會(huì) 選擇增量計(jì)算成玫,那么問(wèn)題就產(chǎn)生了:上一次的中間計(jì)算結(jié)果保存在哪里??jī)?nèi)存拳喻?這其中會(huì)由于本身的網(wǎng)絡(luò)哭当,硬件或軟件等問(wèn)題造成某個(gè)計(jì)算節(jié)點(diǎn)失敗,對(duì)應(yīng)的上次計(jì)算結(jié)果就會(huì)丟失冗澈,在節(jié)點(diǎn)恢復(fù)時(shí)钦勘,是需要將所有歷史數(shù)據(jù)重新計(jì)算一遍的,對(duì)于這樣的結(jié)果大家是很難接受的亚亲。

二彻采。flink中state

而在flink中提出了state用來(lái)存放計(jì)算過(guò)程的節(jié)點(diǎn)中間結(jié)果或元數(shù)據(jù)等腐缤,并提供Exactly-Once語(yǔ)義,例如:執(zhí)行aggregation時(shí)在state中記錄中間聚合結(jié)果肛响,再如從kafka中攝取記錄時(shí)岭粤,是需要記錄對(duì)應(yīng)的partition的offset,而這些state數(shù)據(jù)在計(jì)算過(guò)程中會(huì)進(jìn)行持久化的特笋。state就變成了與時(shí)間相關(guān)的是對(duì)flink任務(wù)內(nèi)部數(shù)據(jù)的快照剃浇。
由于流計(jì)算大多數(shù)場(chǎng)景下都是增量計(jì)算的,數(shù)據(jù)逐條被處理猎物,每次當(dāng)前結(jié)果都是基于上一次計(jì)算結(jié)果之上進(jìn)行處理的偿渡,這也勢(shì)必要將上一次的計(jì)算結(jié)果進(jìn)行存儲(chǔ)持久化,無(wú)論是機(jī)器霸奕,網(wǎng)絡(luò)溜宽,臟數(shù)據(jù)等原因?qū)е碌某绦蝈e(cuò)誤,都能在job進(jìn)行任務(wù)恢復(fù)時(shí)提供支持质帅∈嗜啵基于這些已被持久化的state,而非將歷史的數(shù)據(jù)重新計(jì)算一遍煤惩。
在flink內(nèi)部提供三種state存儲(chǔ)實(shí)現(xiàn)

  • 內(nèi)存HeapStateBackend:存放數(shù)據(jù)量小嫉嘀,用于開(kāi)發(fā)測(cè)試使用;生產(chǎn)不建議使用
  • HDFS的FsStateBackend :分布式文件持久化魄揉,每次都會(huì)產(chǎn)生網(wǎng)絡(luò)io剪侮,可用于大state,不支持增量洛退;可用于生產(chǎn)
  • RocksDB的RocksDBStateBackend:本地文件 + 異步hdfs持久化瓣俯,也可用于大state數(shù)據(jù)量,唯一支持增量兵怯,可用于生產(chǎn)彩匕;
    比如使用RocksDB + HDFS進(jìn)行state存儲(chǔ):首先state先在本地存儲(chǔ)到RocksDB,然后異步寫入到HDFS中媒区,這樣可以突破HeapStateBackend受單節(jié)點(diǎn)資源限制(物理內(nèi)存驼仪,機(jī)器故障數(shù)據(jù)丟失等),也減少了分布式過(guò)程寫入帶來(lái)的網(wǎng)絡(luò)io開(kāi)銷袜漩。
    (state詳情)https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html
    state

三绪爸。operator state與keyedstate相關(guān)

3.1 state 分類

從operator和data兩個(gè)角度可將state劃分為2類

  • KeyedState:一般groupby或PartitionBy組成的內(nèi)容,即為key宙攻,每個(gè)key都有自己的state奠货,并且key與key間的state是不可見(jiàn)的
  • OperatorState:Source Connector的實(shí)現(xiàn)中就會(huì)用OperatorState來(lái)記錄source數(shù)據(jù)讀取的offset。

3.2 state擴(kuò)容重新分配

在flink中每一個(gè)并行運(yùn)算操作實(shí)例就是一個(gè)獨(dú)立的任務(wù)粘优,可以在機(jī)器上調(diào)度到網(wǎng)絡(luò)中其他的機(jī)器仇味;并且flink能夠進(jìn)行大規(guī)模的有狀態(tài)流處理呻顽,在邏輯上將這些分割成不同operator graph,同時(shí)operator也將被物理分解成多個(gè)操作實(shí)例丹墨。在flink的DAG圖中隨著data流向廊遍,垂直方向存在網(wǎng)絡(luò)io,而水平方向的stateful節(jié)點(diǎn)間是沒(méi)有網(wǎng)絡(luò)通信的贩挣,這樣每個(gè)operator維護(hù)一份自己本地的state喉前,并保存在本地磁盤。
比如source有5個(gè)partition王财,將source并行度1->2,中間stateful operator并行度2->3,結(jié)果如下圖:


擴(kuò)容分布圖

分析結(jié)果如下:在flink中不同的state有不同的擴(kuò)容方法

  • 關(guān)于operatorstate的擴(kuò)容
    這種state分配結(jié)合operator對(duì)應(yīng)并行度parallelism有關(guān)聯(lián)卵迂,比如FlinkKafkaConsumerBase的定義
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction

看到在代碼內(nèi)部使用ListState:ListState<Tuple2<KafkaTopicPartition, Long>>

@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {
    void update(List<T> values) throws Exception;

    void addAll(List<T> values) throws Exception;
}

通過(guò)源碼可以看到ListState具體定義,T是Tuple2<KafkaTopicPartition,Long>說(shuō)明了state存儲(chǔ)了當(dāng)前partition及其offset信息的列表绒净,KafkaTopicPartition代表一個(gè)partition见咒,Long代表當(dāng)前partition的offset,
當(dāng)source并行度=1挂疆,代表所有的partition都在同一個(gè)線程中讀取改览,對(duì)應(yīng)所有的partition的state也在同一個(gè)state維護(hù):如下圖


state存儲(chǔ)信息

當(dāng)把source并行度=2,對(duì)應(yīng)的operator并行度=3缤言,先看下parition與subtask之間的映射方法:
首先根據(jù)topic的hash值得到當(dāng)前的index開(kāi)始點(diǎn)宝当,進(jìn)行對(duì)齊,接著對(duì)當(dāng)前operator的subtasks進(jìn)行取模胆萧,得到的結(jié)果即為當(dāng)前partition分配的subtask的index

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the start index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }
state擴(kuò)容后

可以發(fā)現(xiàn)通過(guò)Operator使用List<T>作為state的存儲(chǔ)結(jié)構(gòu)庆揩,是很容易解決這類state擴(kuò)容的,不過(guò)有一點(diǎn)source擴(kuò)容后的parallelism是否可以超過(guò)Source物理存儲(chǔ)上的partition個(gè)數(shù)跌穗?這樣會(huì)造成資源的浪費(fèi)订晌,超過(guò)partition的并發(fā)永遠(yuǎn)分配不到待處理的partition。

  • KeyedState擴(kuò)容處理
    在flink相對(duì)OperatorState的大小來(lái)說(shuō)瞻离,KeyedState還是比較大的腾仅,如果采用OperatorState進(jìn)行取模方式可能會(huì)帶來(lái)網(wǎng)絡(luò)拉取的成本較大,flink直接采用key-Groups(類似range的方式分配)
    1.Key-Groups:flink對(duì)keyedstate按照key進(jìn)行分組的方式套利,每個(gè)key-group會(huì)包含N>0個(gè)key,是keystate分配的原子單元鹤耍,在flink使用KeyGroupRange代表一個(gè)key-group肉迫。
public class KeyGroupRange implements KeyGroupsList, Serializable {
        ...
        ...
        private final int startKeyGroup;
        private final int endKeyGroup;
        ...
        ...
}

在KeyGroupRange中:startKeyGroup和endKeyGroup用來(lái)定義Operator關(guān)聯(lián)的Key-Group個(gè)數(shù)。
不過(guò)參考flink源碼可看到key-group在job啟動(dòng)之前對(duì)應(yīng)的數(shù)量是需要確定并且運(yùn)行中是不可變的稿黄。本身Operator的最大并行度<= key-group個(gè)數(shù)喊衫,每個(gè)Operator實(shí)例都會(huì)有自己的state,每個(gè)state關(guān)聯(lián)至少一個(gè)key-group

       ...
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }
        ...
    ...
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }
        ...

其實(shí)從分配key到key-group利用key的hash值與maxParallelism進(jìn)行取模來(lái)完成的杆怕。比如parallelism=2 maxParallelism=10


key-state

如上圖比如key=a對(duì)應(yīng)hash(a)=97族购, hash(a) % 10 = 7則分配到KG-7壳贪,其他的以此類推。
flink源碼中針對(duì)分配到key-group的操作如下:

public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
        int maxParallelism,
        int parallelism,
        int operatorIndex) {

        checkParallelismPreconditions(parallelism);
        checkParallelismPreconditions(maxParallelism);

        Preconditions.checkArgument(maxParallelism >= parallelism,
            "Maximum parallelism must not be smaller than parallelism.");
                //  當(dāng)前operator實(shí)例
        int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
                // 當(dāng)前operator下一個(gè)實(shí)例的位置
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupRange(start, end);
    }

如上代碼就是用于將operator(已指定parallelism和maxparallelism)對(duì)應(yīng)的key-groups進(jìn)行分配寝杖,計(jì)算當(dāng)前keygroup的start违施,end:先計(jì)算每個(gè)Operator實(shí)例至少分配的Key-Group個(gè)數(shù),將不能整除的部分N個(gè)瑟幕,平均分給前N個(gè)實(shí)例磕蒲。最終每個(gè)Operator實(shí)例管理的Key-Groups會(huì)在GroupRange中表示,本質(zhì)是一個(gè)區(qū)間值只盹;實(shí)例圖如下:


樣例解析圖

1.當(dāng)parallelism=2時(shí)可得到KeyGroupRange:
operatorIndex=0辣往,則得到start=0, end=4:如圖kg-keys:0殖卑,1站削,2,3孵稽,4
operatorIndex=1许起,則得到start=5,end=9:如圖kg-keys:5肛冶,6街氢,7,8睦袖,9

2.當(dāng)parallelism=3時(shí)可得到KeyGroupRange:
operatorIndex=0珊肃,則得到start=0, end=3:如圖kg-keys:0馅笙,1伦乔,2,3
operatorIndex=1董习,則得到start=4烈和,end=6:如圖kg-keys:4,5皿淋,6
operatorIndex=2招刹,則得到start=7, end=9:如圖kg-keys:7窝趣,8疯暑,9

一旦當(dāng)job修改了maxParallelism的值那么會(huì)直接影響到Key-Groups的數(shù)量和key的分配,也會(huì)打亂所有的Key-Group的分配哑舒,目前在Apache Flink系統(tǒng)中統(tǒng)一將maxParallelism的默認(rèn)值調(diào)整到4096妇拯,最大程度的避免無(wú)法擴(kuò)容的情況發(fā)生。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末洗鸵,一起剝皮案震驚了整個(gè)濱河市越锈,隨后出現(xiàn)的幾起案子仗嗦,更是在濱河造成了極大的恐慌,老刑警劉巖甘凭,帶你破解...
    沈念sama閱讀 218,036評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件稀拐,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡对蒲,警方通過(guò)查閱死者的電腦和手機(jī)钩蚊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蹈矮,“玉大人砰逻,你說(shuō)我怎么就攤上這事》耗瘢” “怎么了蝠咆?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,411評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)北滥。 經(jīng)常有香客問(wèn)我刚操,道長(zhǎng),這世上最難降的妖魔是什么再芋? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,622評(píng)論 1 293
  • 正文 為了忘掉前任菊霜,我火速辦了婚禮,結(jié)果婚禮上济赎,老公的妹妹穿的比我還像新娘鉴逞。我一直安慰自己,他們只是感情好司训,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布构捡。 她就那樣靜靜地躺著,像睡著了一般壳猜。 火紅的嫁衣襯著肌膚如雪勾徽。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,521評(píng)論 1 304
  • 那天统扳,我揣著相機(jī)與錄音喘帚,去河邊找鬼。 笑死咒钟,一個(gè)胖子當(dāng)著我的面吹牛啥辨,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播盯腌,決...
    沈念sama閱讀 40,288評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼陨瘩!你這毒婦竟也來(lái)了腕够?” 一聲冷哼從身側(cè)響起级乍,我...
    開(kāi)封第一講書(shū)人閱讀 39,200評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎帚湘,沒(méi)想到半個(gè)月后玫荣,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,644評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡大诸,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評(píng)論 3 336
  • 正文 我和宋清朗相戀三年捅厂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片资柔。...
    茶點(diǎn)故事閱讀 39,953評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡焙贷,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出贿堰,到底是詐尸還是另有隱情辙芍,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評(píng)論 5 346
  • 正文 年R本政府宣布羹与,位于F島的核電站故硅,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏纵搁。R本人自食惡果不足惜吃衅,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望腾誉。 院中可真熱鬧徘层,春花似錦、人聲如沸妄辩。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,889評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)眼耀。三九已至英支,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間哮伟,已是汗流浹背干花。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,011評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留楞黄,地道東北人池凄。 一個(gè)月前我還...
    沈念sama閱讀 48,119評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像鬼廓,于是被迫代替她去往敵國(guó)和親肿仑。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評(píng)論 2 355