關(guān)于使用Flink RocksDB狀態(tài)后端時(shí)一定要寫(xiě)MapState而非ValueState<Map>這檔事(以及解決方法)

前言

抱歉起這種爛大街的日本輕小說(shuō)風(fēng)格標(biāo)題來(lái)吸引注意力于游。原本我認(rèn)為這是常識(shí),不需要專(zhuān)門(mén)寫(xiě)一篇文章來(lái)講解如此細(xì)碎的點(diǎn)孵稽。但是在最近工作巡檢中發(fā)現(xiàn)了越來(lái)越多如同ValueState<Map>的狀態(tài)用法(當(dāng)然大部分是歷史遺留)咙好,部分Flink作業(yè)深受性能問(wèn)題困擾,所以還是抽出點(diǎn)時(shí)間快速聊一聊术吝,順便給出不算優(yōu)雅但還算有效的挽救方案。

基于RocksDB的狀態(tài)序列化

我們已經(jīng)知道茸苇,RocksDB是基于二進(jìn)制流的內(nèi)嵌K-V存儲(chǔ)排苍,所以Flink任務(wù)使用RocksDB狀態(tài)后端時(shí),寫(xiě)/讀操作的狀態(tài)數(shù)據(jù)都需要經(jīng)過(guò)序列化和反序列化学密,從而利用TaskManager本地磁盤(pán)實(shí)現(xiàn)海量的狀態(tài)存儲(chǔ)淘衙。

舉個(gè)栗子,RocksDBValueState的取值和更新方法如下:

class RocksDBValueState<K, N, V> extends AbstractRocksDBState<K, N, V>
       implements InternalValueState<K, N, V> {
   @Override
   public TypeSerializer<K> getKeySerializer() {
       return backend.getKeySerializer();
   }

   @Override
   public TypeSerializer<N> getNamespaceSerializer() {
       return namespaceSerializer;
   }

   @Override
   public TypeSerializer<V> getValueSerializer() {
       return valueSerializer;
   }

   @Override
   public V value() {
       try {
           byte[] valueBytes =
                   backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());

           if (valueBytes == null) {
               return getDefaultValue();
           }
           dataInputView.setBuffer(valueBytes);
           return valueSerializer.deserialize(dataInputView);
       } catch (IOException | RocksDBException e) {
           throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
       }
   }

   @Override
   public void update(V value) {
       if (value == null) {
           clear();
           return;
       }

       try {
           backend.db.put(
                   columnFamily,
                   writeOptions,
                   serializeCurrentKeyWithGroupAndNamespace(),
                   serializeValue(value));
       } catch (Exception e) {
           throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
       }
   }
}

可見(jiàn)key和value都需要經(jīng)過(guò)對(duì)應(yīng)類(lèi)型的TypeSerializer的處理腻暮,即如果將狀態(tài)聲明為ValueState<Map<K, V>>彤守,那么將由MapSerializer<K, V>負(fù)責(zé)值的正反序列化毯侦。特別注意,serializeCurrentKeyWithGroupAndNamespace()方法中具垫,key需要加上它所對(duì)應(yīng)的KeyGroup編號(hào)和對(duì)應(yīng)的Namespace(Namespace是窗口信息)侈离,形成一個(gè)復(fù)合key,即:CompositeKey(KG, K, NS)筝蚕,RocksDB實(shí)際存儲(chǔ)的狀態(tài)數(shù)據(jù)的key都類(lèi)似如此卦碾。具體可參看SerializedCompositeKeyBuilder類(lèi),不再贅述起宽。

接下來(lái)再看一下RocksDBMapState的部分實(shí)現(xiàn)洲胖。

class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>>
        implements InternalMapState<K, N, UK, UV> {
    @Override
    public TypeSerializer<K> getKeySerializer() {
        return backend.getKeySerializer();
    }

    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return namespaceSerializer;
    }

    @Override
    public TypeSerializer<Map<UK, UV>> getValueSerializer() {
        return valueSerializer;
    }

    @Override
    public UV get(UK userKey) throws IOException, RocksDBException {
        byte[] rawKeyBytes =
                serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
        byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

        return (rawValueBytes == null
                ? null
                : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
    }

    @Override
    public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

        byte[] rawKeyBytes =
                serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
        byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);

        backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
    }

    @Override
    public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
        if (map == null) {
            return;
        }

        try (RocksDBWriteBatchWrapper writeBatchWrapper =
                new RocksDBWriteBatchWrapper(
                        backend.db, writeOptions, backend.getWriteBatchSize())) {
            for (Map.Entry<UK, UV> entry : map.entrySet()) {
                byte[] rawKeyBytes =
                        serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
                                entry.getKey(), userKeySerializer);
                byte[] rawValueBytes =
                        serializeValueNullSensitive(entry.getValue(), userValueSerializer);
                writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
            }
        }
    }

    @Override
    public Iterable<Map.Entry<UK, UV>> entries() {
        return this::iterator;
    }

    @Override
    public Iterator<Map.Entry<UK, UV>> iterator() {
        final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();

        return new RocksDBMapIterator<Map.Entry<UK, UV>>(
                backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
            @Override
            public Map.Entry<UK, UV> next() {
                return nextEntry();
            }
        };
    }

由于MapState的本身有用戶(hù)定義的key UK,所以RocksDB存儲(chǔ)它時(shí)坯沪,會(huì)在上文所述的復(fù)合key后面绿映,再加上UK的值,即:CompositeKey(KG, K, NS) :: UK屏箍。這樣绘梦,同屬于一個(gè)KeyContext的所有用戶(hù)鍵值對(duì)就存在一個(gè)連續(xù)的存儲(chǔ)空間內(nèi),可以通過(guò)RocksDB WriteBatch機(jī)制攢批赴魁,實(shí)現(xiàn)批量寫(xiě)(putAll()方法),也可以通過(guò)RocksDB Iterator機(jī)制做前綴掃描钝诚,實(shí)現(xiàn)批量讀(entries()方法)颖御。

問(wèn)題的癥結(jié)

代碼讀完了。假設(shè)我們?cè)谀硞€(gè)key下有5條數(shù)據(jù)的狀態(tài)凝颇,若使用ValueState<Map<String, String>>來(lái)存儲(chǔ)潘拱,按照MapSerializer的序列化方式,其存儲(chǔ)可以記為:

(1, k, VoidNamespace) -> [5, k1, false, v1, k2, false, v2, k3, true, k4, false, v4, k5, false, v5]

注意對(duì)于無(wú)窗口上下文的狀態(tài)拧略,NS為VoidNamespace芦岂。且序列化Map時(shí),會(huì)加上Map的大小垫蛆,以及表示每個(gè)value是否為NULL的標(biāo)記禽最。

如果使用MapState<String, String>存儲(chǔ),可以記為:

(1, k, VoidNamespace) :: k1 -> v1
(1, k, VoidNamespace) :: k2 -> v2
(1, k, VoidNamespace) :: k3 -> NULL
(1, k, VoidNamespace) :: k4 -> v4
(1, k, VoidNamespace) :: k5 -> v5

如果我們獲取或修改一條狀態(tài)數(shù)據(jù)袱饭,前者需要將所有數(shù)據(jù)做一遍序列化和反序列化川无,而后者只需要處理一條。在Map比較小的情況下可能沒(méi)有明顯的性能差異虑乖,但是如果Map有幾十個(gè)甚至上百個(gè)鍵值對(duì)懦趋,或者某些value的長(zhǎng)度很長(zhǎng)(如各類(lèi)打標(biāo)標(biāo)記串等),ValueState<Map>的性能退化就會(huì)非常嚴(yán)重疹味,造成反壓仅叫。

有的同學(xué)可能會(huì)問(wèn):我對(duì)狀態(tài)數(shù)據(jù)的操作基本都是“整存整取”(即讀/寫(xiě)整個(gè)Map)帜篇,也不建議使用ValueState<Map>嗎?答案仍然是不建議诫咱。除了前面提到的WriteBatch和Iterator為MapState帶來(lái)的優(yōu)化之外坠狡,RocksDB更可以利用多線(xiàn)程進(jìn)行讀寫(xiě),而單個(gè)大value不僅不能享受這個(gè)便利遂跟,還會(huì)擠占Block Cache空間逃沿,在出現(xiàn)數(shù)據(jù)傾斜等場(chǎng)景時(shí),磁盤(pán)I/O可能會(huì)打到瓶頸幻锁。所以凯亮,我們?cè)陂_(kāi)始編寫(xiě)作業(yè)時(shí)就應(yīng)該正確使用MapState

平滑遷移

為了消除此類(lèi)狀態(tài)誤用的影響哄尔,常見(jiàn)的重構(gòu)方式是將ValueState<Map>修改為MapState假消,重置位點(diǎn)后消費(fèi)歷史數(shù)據(jù),積攢狀態(tài)岭接,并替換掉舊任務(wù)富拗。但是對(duì)于狀態(tài)TTL較長(zhǎng)、size較大的場(chǎng)景(例如物流監(jiān)控場(chǎng)景經(jīng)常有30天TTL鸣戴、十幾TB大的State)啃沪,這樣顯然非常不方便,下面提供一種簡(jiǎn)單的平滑遷移方式窄锅。

假設(shè)原本誤用的狀態(tài)為mainState创千,我們聲明兩個(gè)新的狀態(tài),一個(gè)是新的MapState newMainState入偷,一個(gè)是布爾型ValueState isMigratedState追驴,表示該key對(duì)應(yīng)的狀態(tài)是否已經(jīng)遷移成了新的,即:

    private transient ValueState<Map<String, String>> mainState;

    private transient ValueState<Boolean> isMigratedState;
    private transient MapState<String, String> newMainState;    

當(dāng)然疏之,它們的TTL等參數(shù)要完全相同殿雪。

寫(xiě)兩個(gè)新的方法,負(fù)責(zé)在讀寫(xiě)mainState時(shí)將其遷移成newMainState锋爪,并做上相應(yīng)的標(biāo)記丙曙。不存在歷史狀態(tài)的,直接以新格式存儲(chǔ)几缭。再?gòu)?qiáng)調(diào)一遍河泳,newMainState.entries()newMainState.putAll()的性能很不錯(cuò),不必過(guò)于擔(dān)心年栓。

    private Map<String, String> wrapGetMainState() throws Exception {
        Boolean isMigrated = isMigratedState.value();

        if (isMigrated == null || !isMigrated) {
            Map<String, String> oldStateData = mainState.value();
            if (oldStateData != null) {
                newMainState.putAll(mainState.value());
            }
            isMigratedState.update(true);
            mainState.clear();
        }

        Map<String, String> result = new HashMap<>();
        for (Entry<String, String> e : newMainState.entries()) {
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    private void wrapUpdateMainState(Map<String, String> data) throws Exception {
        Boolean isMigrated = isMigratedState.value();

        if (isMigrated == null || !isMigrated) {
            Map<String, String> oldStateData = mainState.value();
            if (oldStateData != null) {
                newMainState.putAll(mainState.value());
            }
            isMigratedState.update(true);
            mainState.clear();
        }

        newMainState.putAll(data);
    }

再將歷史代碼中的狀態(tài)訪問(wèn)全部替換成wrapGetMainState()wrapUpdateMainState()方法的調(diào)用即可拆挥。表面上看是由一個(gè)狀態(tài)句柄變成了兩個(gè)狀態(tài)句柄,但是標(biāo)記狀態(tài)的訪問(wèn)十分輕量級(jí),且隨著程序的運(yùn)行纸兔,舊狀態(tài)的數(shù)據(jù)漸進(jìn)式地替換完畢之后惰瓜,就可以安全地刪除mainStateisMigratedState了。當(dāng)然汉矿,托管內(nèi)存的設(shè)置要科學(xué)崎坊,并添加一些有利于RocksDB狀態(tài)吞吐量的參數(shù),如:

state.backend.rocksdb.predefined-options  SPINNING_DISK_OPTIMIZED_HIGH_MEM
state.backend.rocksdb.memory.partitioned-index-filters  true

基于堆的狀態(tài)呢洲拇?

與RocksDB相反奈揍,基于堆的JobManager和FileSystem狀態(tài)后端無(wú)需序列化和反序列化,當(dāng)然狀態(tài)的大小就要受制于TaskManager內(nèi)存赋续。不過(guò)男翰,如果我們采用這兩種狀態(tài)后端,ValueState<Map>MapState也就沒(méi)有明顯的性能差別了纽乱,因?yàn)?code>HeapValueState和HeapMapState的底層都是相同的蛾绎,即CopyOnWriteStateTable,本質(zhì)上是內(nèi)存中的狀態(tài)映射表鸦列。讀者有興趣可以自行參考對(duì)應(yīng)的Flink源碼租冠,這里不再啰嗦了。

ValueState薯嗤、ListState顽爹、MapState三者在RocksDB狀態(tài)后端和基于堆的狀態(tài)后端中的異同點(diǎn)可以概括成下表。

The End

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末应民,一起剝皮案震驚了整個(gè)濱河市话原,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌诲锹,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件涉馅,死亡現(xiàn)場(chǎng)離奇詭異归园,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)稚矿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)庸诱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人晤揣,你說(shuō)我怎么就攤上這事桥爽。” “怎么了昧识?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵钠四,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我,道長(zhǎng)缀去,這世上最難降的妖魔是什么侣灶? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮缕碎,結(jié)果婚禮上褥影,老公的妹妹穿的比我還像新娘。我一直安慰自己咏雌,他們只是感情好凡怎,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著赊抖,像睡著了一般统倒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上熏迹,一...
    開(kāi)封第一講書(shū)人閱讀 51,146評(píng)論 1 297
  • 那天檐薯,我揣著相機(jī)與錄音,去河邊找鬼注暗。 笑死坛缕,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的捆昏。 我是一名探鬼主播赚楚,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼骗卜!你這毒婦竟也來(lái)了宠页?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤寇仓,失蹤者是張志新(化名)和其女友劉穎举户,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體遍烦,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡俭嘁,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了服猪。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片供填。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖罢猪,靈堂內(nèi)的尸體忽然破棺而出近她,到底是詐尸還是另有隱情,我是刑警寧澤膳帕,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布粘捎,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏晌端。R本人自食惡果不足惜捅暴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望咧纠。 院中可真熱鬧蓬痒,春花似錦、人聲如沸漆羔。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)演痒。三九已至亲轨,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間鸟顺,已是汗流浹背惦蚊。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留讯嫂,地道東北人蹦锋。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像欧芽,于是被迫代替她去往敵國(guó)和親莉掂。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容