Flink State機制源碼解析

在流計算中,事件會持續(xù)不斷的產(chǎn)生,如果每次計算都是相互獨立的嘹履,不依賴于上下游事件,則稱為無狀態(tài) (stateless) 計算债热;如果每次的計算依賴于之前或者后續(xù)的事件砾嫉,則稱為有狀態(tài) (stateful) 計算。Flink 中的狀態(tài)用 State 抽象窒篱,用來保存中間計算結(jié)果或者緩存數(shù)據(jù)焕刮,State 是 Flink 內(nèi)部實現(xiàn) Exactly-Once的基礎。

狀態(tài)類型

和 redis 類似墙杯,F(xiàn)link 按照數(shù)據(jù)類型的不同配并,定義了多種 State 接口,具體如下:

  1. ValueState<T>

    單值狀態(tài)高镐,與數(shù)據(jù)的 key 綁定溉旋;提供了 update(T value) 方法更新值,value() 方法獲取狀態(tài)值嫉髓。

  2. ListState<T>

    Key 上的狀態(tài)值為一個 List观腊;提供了 add 邑闲,get 方法來分別增加和獲取數(shù)據(jù)。

  3. MapState<UK, UV>

    Key 上的狀態(tài)值為一個 Map梧油;提供了 put监憎,putAll, get 方法來增加和獲取數(shù)據(jù)。

  4. ReducingState<T>

    實現(xiàn)的 ReduceFunction 中使用的 state, 在 reduce 方法之前婶溯,會先調(diào)用 ReducingState 的 add 方法,reduce 方法中的第一個參數(shù)就是狀態(tài)值偷霉。

  5. AggregatingState<IN, OUT>

    聚合 state; 在 AggregateFunction 的 add 方法調(diào)用之前迄委,會先調(diào)用 AggregatingState 的 add 方法,傳入 acc类少。

按照算子是否有 key, Flink State 又被劃分為 KeyedState 和 OperatorState叙身。

類型 state
KeyedState ValueState<br/ >ListState<br />ReducingState<br />AggregationState<br />MapState<br />
OperatorState ListState

今天我們重點講 KeyedState 里的最簡單的 ValueState 的實現(xiàn)。KeyedState硫狞,顧名思義信轿,要與 key 綁定,即只能用在 KeyedStream 流之后残吩;每一個 key 對應一個 State 值财忽;Key 除了有分區(qū)的作用,在狀態(tài)管理當中泣侮,它還用于計算 keyGroupIndex:

// AbstractStreamOperator.java line:463
private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
    if (selector != null) {
        Object key = selector.getKey(record.getValue());
        setCurrentKey(key);
    }
}

// StreamOperatorStateHandler.java line:281
public void setCurrentKey(Object key) {
    if (keyedStateBackend != null) {
        try {
            // need to work around type restrictions
            @SuppressWarnings("unchecked,rawtypes")
            AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;

            rawBackend.setCurrentKey(key);
        } catch (Exception e) {
            throw new RuntimeException("Exception occurred while setting the current key context.", e);
        }
    }
}

// 接著看 AbstractKeyedStateBackend.java line:172
public void setCurrentKey(K newKey) {
    notifyKeySelected(newKey);
    this.keyContext.setCurrentKey(newKey);      this.keyContext.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(newKey,numberOfKeyGroups));
}

從上面可以看出, 先設置 key, 再設置 keyGroupIndex即彪,具體 keyGroupIndex 的作用是什么,我們后面會講活尊。

狀態(tài)描述

State 是暴露給用戶的接口隶校,那么就需要指定狀態(tài)的一些屬性,如 name, type, ttl 等蛹锰。Flink 中用 StateDescriptor 來描述一個狀態(tài)深胳,在對應的 StateBackend (狀態(tài)后端) 中,調(diào)用 create 方法得到對應的 State 對象铜犬。下面以一個簡單的 demo 來演示舞终。

private transient ValueState<Tuple2<Integer, Integer>> state;
public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor<Tuple2<Integer, Integer>> descriptor =
                    new ValueStateDescriptor<>("average", TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {}));
    state = getRuntimeContext().getState(descriptor);
}

我們順著方法調(diào)用鏈走下去,跳過中間一些無關的代碼癣猾,直接看 TtlStateFactory 的 createStateAndWrapWithTtlIfEnabled 方法:

public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateBackend<K> stateBackend,
        TtlTimeProvider timeProvider) throws Exception {
        ...
        // 是否為狀態(tài)設置了 ttl权埠,如果設置 ttl,使用TtlStateFactory創(chuàng)建state,如果沒有煎谍,直接調(diào)用 stateBackend創(chuàng)建state
        return  stateDesc.getTtlConfig().isEnabled() ?
            new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
                namespaceSerializer, stateDesc, stateBackend, timeProvider)
                .createState() :
            stateBackend.createInternalState(namespaceSerializer, stateDesc);
    }

上面兩個創(chuàng)建 state 的不同分支區(qū)別就是是否設置了 ttl攘蔽,其它的基本一樣。接著看 stateBackend.createInternalState 方法的實現(xiàn)

public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
        @Nonnull TypeSerializer<N> namespaceSerializer,
        @Nonnull StateDescriptor<S, SV> stateDesc,
        @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s",
                stateDesc.getClass(), this.getClass());
            throw new FlinkRuntimeException(message);
        }
        // 這里是狀態(tài)實現(xiàn)原理的重點, 狀態(tài)的獲取/更新都是通過 statetable 操作的呐粘,我們重點看下里面的實現(xiàn)
        StateTable<K, N, SV> stateTable = tryRegisterStateTable(
            namespaceSerializer, stateDesc, getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
        // 根據(jù)狀態(tài)描述的類型满俗,調(diào)用對應的構造方法转捕。
        return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
    }

StateTable 是狀態(tài)實現(xiàn)原理的重點, 狀態(tài)的獲取/更新都是通過 statetable 操作的,我們重點看下里面的實現(xiàn)唆垃。

private <N, V> StateTable<K, N, V> tryRegisterStateTable(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<?, V> stateDesc,
        @Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory) throws StateMigrationException {

        @SuppressWarnings("unchecked")
        StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());

        TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();

        if (stateTable != null) {
            // ...
        } else {
            RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
                stateDesc.getType(),
                stateDesc.getName(),
                namespaceSerializer,
                newStateSerializer,
                snapshotTransformFactory);
            // 得到 statetable對象五芝。啟用 statetable的構造方法。初始化size為128(默認最大并行度)的 StateMap數(shù)組
            stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
            registeredKVStates.put(stateDesc.getName(), stateTable);
        }

        return stateTable;
    }

StateTable 里提供的 get, put 操作實現(xiàn)了對狀態(tài)值的 獲取和更新辕万,StateTable是一個抽象類枢步,如下:

StateTable.java

NestedMapsStateTable 使用兩層嵌套的 HashMap 保存狀態(tài)數(shù)據(jù),支持同步快照渐尿;CopyOnWriteStateTable 使用 CopyOnWriteStateMap 來保存狀態(tài)數(shù)據(jù)醉途,支持異步快照。下面是 CopyOnWriteStateTable里的 put 和 get 方法的實現(xiàn)

private S get(K key, int keyGroupIndex, N namespace) {
        checkKeyNamespacePreconditions(key, namespace);
        // stateMap數(shù)組默認size為128砖茸,keyGroupIndex為key的hash與128取模得到
 // StateMap內(nèi)部封裝了MapEntry隘擎,類似于 HashMap,基于鏈表加數(shù)組的實現(xiàn)
        StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroupIndex);
        if (stateMap == null) {
            return null;
        }
        // 從map中得到state value
        return stateMap.get(key, namespace);
}

public void put(K key, int keyGroup, N namespace, S state) {
        checkKeyNamespacePreconditions(key, namespace);
        // stateMap數(shù)組默認size為128凉夯,keyGroupIndex為key的hash與128取模得到
        StateMap<K, N, S> stateMap = getMapForKeyGroup(keyGroup);
        stateMap.put(key, namespace, state);
}

上面代碼比較簡單明了货葬,也就不用多做解釋。

總結(jié)

  1. StateDescribe 持有狀態(tài)的描述劲够, StateBackend 通過它來創(chuàng)建 State 對象
  2. State 對象里封裝了 StateTable震桶,StateTable 負責對State 做 snapshot 到對應的 StateBackend。
  3. StateTable 里封裝了 StateMap征绎,為存儲 state 的內(nèi)存介質(zhì)尼夺,負責狀態(tài)的更新/新增/獲取 (基于內(nèi)存)。
  4. StateMap 在 StateTable 中為一個數(shù)組炒瘸,默認 size 為最大并行度 128淤堵,所以會存在不同 key 對應一個StateMap對象。里面的實現(xiàn)類似于 HashMap顷扩,為鏈表加數(shù)組的數(shù)據(jù)結(jié)構拐邪。
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市隘截,隨后出現(xiàn)的幾起案子扎阶,更是在濱河造成了極大的恐慌,老刑警劉巖婶芭,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件东臀,死亡現(xiàn)場離奇詭異,居然都是意外死亡犀农,警方通過查閱死者的電腦和手機惰赋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人赁濒,你說我怎么就攤上這事轨奄。” “怎么了拒炎?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵挪拟,是天一觀的道長。 經(jīng)常有香客問我击你,道長玉组,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任丁侄,我火速辦了婚禮惯雳,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘绒障。我一直安慰自己,他們只是感情好捍歪,可當我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布户辱。 她就那樣靜靜地躺著,像睡著了一般糙臼。 火紅的嫁衣襯著肌膚如雪庐镐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天变逃,我揣著相機與錄音必逆,去河邊找鬼畜普。 笑死糠涛,一個胖子當著我的面吹牛鞋吉,可吹牛的內(nèi)容都是我干的产场。 我是一名探鬼主播端辱,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼档桃,長吁一口氣:“原來是場噩夢啊……” “哼行冰!你這毒婦竟也來了被碗?” 一聲冷哼從身側(cè)響起撒犀,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤福压,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后或舞,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體荆姆,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年映凳,在試婚紗的時候發(fā)現(xiàn)自己被綠了胆筒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡诈豌,死狀恐怖腐泻,靈堂內(nèi)的尸體忽然破棺而出决乎,到底是詐尸還是另有隱情,我是刑警寧澤派桩,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布构诚,位于F島的核電站,受9級特大地震影響铆惑,放射性物質(zhì)發(fā)生泄漏范嘱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一员魏、第九天 我趴在偏房一處隱蔽的房頂上張望丑蛤。 院中可真熱鬧,春花似錦撕阎、人聲如沸受裹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽棉饶。三九已至,卻和暖如春镇匀,著一層夾襖步出監(jiān)牢的瞬間照藻,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工汗侵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留幸缕,地道東北人。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓晰韵,卻偏偏與公主長得像发乔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子雪猪,可洞房花燭夜當晚...
    茶點故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容