Flink狀態(tài)的縮放(rescale)與鍵組(Key Group)設(shè)計

前言

在之前那篇講解Flink Timer的文章里蛤签,我曾經(jīng)用三言兩語簡單解釋了Key Group和KeyGroupRange的概念瘟芝。實際上性宏,Key Group是Flink狀態(tài)機制中的一個重要設(shè)計踩验,值得專門探究一下涉茧。本文先介紹Flink狀態(tài)的理念赴恨,再經(jīng)由狀態(tài)——主要是Keyed State——的縮放(rescale)引出KeyGroup的細節(jié)。

再認識Flink狀態(tài)

自從開始寫關(guān)于Flink的東西以來伴栓,“狀態(tài)”這個詞被提過不下百次伦连,卻從來沒有統(tǒng)一的定義。Flink官方博客中給出的一種定義如下:

When it comes to stateful stream processing, state comprises of the information that an application or stream processing engine will remember across events and streams as more realtime (unbounded) and/or offline (bounded) data flow through the system.

根據(jù)這句話钳垮,狀態(tài)就是流處理過程中需要“記住”的那些數(shù)據(jù)的快照惑淳。而這些數(shù)據(jù)既可以包括業(yè)務(wù)數(shù)據(jù),也可以包括元數(shù)據(jù)(例如Kafka Consumer的offset)饺窿。以最常用也是最可靠的RocksDB狀態(tài)后端為例歧焦,狀態(tài)數(shù)據(jù)的流動可以抽象為3層,如下圖所示肚医。

用戶代碼產(chǎn)生的狀態(tài)實時地存儲在本地文件中绢馍,并且隨著Checkpoint的周期異步地同步到遠端的可靠分布式文件系統(tǒng)(如HDFS)向瓷。這樣就保證了100%本地性,各個Sub-Task只需要負責自己所屬的那部分狀態(tài)舰涌,不需要通過網(wǎng)絡(luò)互相傳輸狀態(tài)數(shù)據(jù)猖任,也不需要頻繁地讀寫HDFS,減少了開銷舵稠。在Flink作業(yè)重啟時超升,從HDFS取回狀態(tài)數(shù)據(jù)到本地,即可恢復現(xiàn)場哺徊。

我們已經(jīng)知道Flink的狀態(tài)分為兩類:Keyed State和Operator State室琢。前者與每個鍵相關(guān)聯(lián),后者與每個算子的并行實例(即Sub-Task)相關(guān)聯(lián)落追。下面來看看Keyed State的縮放盈滴。

Keyed State的縮放

所謂縮放,在Flink中就是指改變算子的并行度轿钠。Flink是不支持動態(tài)改變并行度的巢钓,必須先停止作業(yè),修改并行度之后再從Savepoint恢復疗垛。如果沒有狀態(tài)症汹,那么不管scale-in還是scale-out都非常簡單,只要做好數(shù)據(jù)流的重新分配就行贷腕,如下圖的例子所示背镇。

可是如果考慮狀態(tài)的話,就沒有那么簡單了:并行度改變之后泽裳,HDFS里的狀態(tài)數(shù)據(jù)該按何種規(guī)則取回給新作業(yè)里的各個Sub-Task瞒斩?下圖示出了這種困局。

按照最naive的思路考慮涮总,F(xiàn)link中的key是按照hash(key) % parallelism的規(guī)則分配到各個Sub-Task上去的胸囱,那么我們可以在縮放完成后,根據(jù)新分配的key集合從HDFS直接取回對應(yīng)的Keyed State數(shù)據(jù)瀑梗。下圖示出并行度從3增加到4后烹笔,Keyed State中各個key的重新分配。

在Checkpoint發(fā)生時抛丽,狀態(tài)數(shù)據(jù)是順序?qū)懭胛募到y(tǒng)的箕宙。但從上圖可以看出,從狀態(tài)恢復時是隨機讀的铺纽,效率非常低下柬帕。并且縮放之后各Sub-Task處理的key有可能大多都不是縮放之前的那些key,無形中降低了本地性。為了解決這兩個問題陷寝,在FLINK-3755對Keyed State專門引入了Key Group锅很,下面具體看看。

引入Key Group

如果看官有仔細讀Flink官方文檔的話凤跑,可能對這個概念已經(jīng)不陌生了爆安,原話抄錄如下:

Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.

翻譯一下,Key Group是Keyed State分配的原子單位仔引,且Flink作業(yè)內(nèi)Key Group的數(shù)量與最大并行度相同扔仓,也就是說Key Group的索引位于[0, maxParallelism - 1]的區(qū)間內(nèi)。每個Sub-Task都會處理一個到多個Key Group咖耘,在源碼中翘簇,以KeyGroupRange數(shù)據(jù)結(jié)構(gòu)來表示。

KeyGroupRange的邏輯相對簡單儿倒,部分源碼如下版保。注意startKeyGroup和endKeyGroup實際上指的是Key Group的索引,并且是閉區(qū)間夫否。

public class KeyGroupRange implements KeyGroupsList, Serializable {
    private static final long serialVersionUID = 4869121477592070607L;
    public static final KeyGroupRange EMPTY_KEY_GROUP_RANGE = new KeyGroupRange();

    private final int startKeyGroup;
    private final int endKeyGroup;

    private KeyGroupRange() {
        this.startKeyGroup = 0;
        this.endKeyGroup = -1;
    }

    public KeyGroupRange(int startKeyGroup, int endKeyGroup) {
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
    }

    @Override
    public boolean contains(int keyGroup) {
        return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;
    }

    public KeyGroupRange getIntersection(KeyGroupRange other) {
        int start = Math.max(startKeyGroup, other.startKeyGroup);
        int end = Math.min(endKeyGroup, other.endKeyGroup);
        return start <= end ? new KeyGroupRange(start, end) : EMPTY_KEY_GROUP_RANGE;
    }

    public int getNumberOfKeyGroups() {
        return 1 + endKeyGroup - startKeyGroup;
    }

    public int getStartKeyGroup() {
        return startKeyGroup;
    }

    public int getEndKeyGroup() {
        return endKeyGroup;
    }

    @Override
    public int getKeyGroupId(int idx) {
        if (idx < 0 || idx > getNumberOfKeyGroups()) {
            throw new IndexOutOfBoundsException("Key group index out of bounds: " + idx);
        }
        return startKeyGroup + idx;
    }

    public static KeyGroupRange of(int startKeyGroup, int endKeyGroup) {
        return startKeyGroup <= endKeyGroup ? new KeyGroupRange(startKeyGroup, endKeyGroup) : EMPTY_KEY_GROUP_RANGE;
    }
}

我們還有兩個問題需要解決:

  • 如何決定一個key該分配到哪個Key Group中彻犁?
  • 如何決定一個Sub-Task該處理哪些Key Group(即對應(yīng)的KeyGroupRange)?

第一個問題凰慈,相關(guān)方法位于KeyGroupRangeAssignment類:

    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

可見是對key進行兩重哈希(一次取hashCode汞幢,一次做MurmurHash)之后,再對最大并行度取余微谓,得到Key Group的索引急鳄。

第二個問題,仍然在上述類中的computeKeyGroupRangeForOperatorIndex()方法堰酿,源碼如下。

    public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
        int maxParallelism,
        int parallelism,
        int operatorIndex) {

        checkParallelismPreconditions(parallelism);
        checkParallelismPreconditions(maxParallelism);

        Preconditions.checkArgument(maxParallelism >= parallelism,
            "Maximum parallelism must not be smaller than parallelism.");

        int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupRange(start, end);
    }

可見是由并行度张足、最大并行度和算子實例(即Sub-Task)的ID共同決定的触创。根據(jù)Key Group的邏輯,上一節(jié)中Keyed State重分配的場景就會變成下圖所示(設(shè)最大并行度為10)为牍。

很明顯哼绑,將Key Group作為Keyed State的基本分配單元之后,上文所述本地性差和隨機讀的問題都部分得到了解決碉咆。當然還要注意抖韩,最大并行度對Key Group分配的影響是顯而易見的,因此不要隨意修改最大并行度的值疫铜。Flink內(nèi)部確定默認最大并行度的邏輯如下代碼所示茂浮。

    public static int computeDefaultMaxParallelism(int operatorParallelism) {
        checkParallelismPreconditions(operatorParallelism);
        return Math.min(
                Math.max(
                        MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
                        DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }

其中,下限值DEFAULT_LOWER_BOUND_MAX_PARALLELISM為128,上限值UPPER_BOUND_MAX_PARALLELISM為32768席揽。

The End

準備開啟遠程辦公模式顽馋。希望疫情快點好轉(zhuǎn)。

民那晚安幌羞。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末寸谜,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子属桦,更是在濱河造成了極大的恐慌熊痴,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件聂宾,死亡現(xiàn)場離奇詭異果善,居然都是意外死亡,警方通過查閱死者的電腦和手機亏吝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進店門岭埠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蔚鸥,你說我怎么就攤上這事惜论。” “怎么了止喷?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵馆类,是天一觀的道長。 經(jīng)常有香客問我弹谁,道長乾巧,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任预愤,我火速辦了婚禮沟于,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘植康。我一直安慰自己旷太,他們只是感情好,可當我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布销睁。 她就那樣靜靜地躺著供璧,像睡著了一般。 火紅的嫁衣襯著肌膚如雪冻记。 梳的紋絲不亂的頭發(fā)上睡毒,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天,我揣著相機與錄音冗栗,去河邊找鬼演顾。 笑死供搀,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的偶房。 我是一名探鬼主播趁曼,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼棕洋!你這毒婦竟也來了挡闰?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤掰盘,失蹤者是張志新(化名)和其女友劉穎摄悯,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體愧捕,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡奢驯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了次绘。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瘪阁。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖邮偎,靈堂內(nèi)的尸體忽然破棺而出管跺,到底是詐尸還是另有隱情,我是刑警寧澤禾进,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布豁跑,位于F島的核電站,受9級特大地震影響泻云,放射性物質(zhì)發(fā)生泄漏艇拍。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一宠纯、第九天 我趴在偏房一處隱蔽的房頂上張望卸夕。 院中可真熱鬧,春花似錦婆瓜、人聲如沸快集。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至治力,卻和暖如春蒙秒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背宵统。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工晕讲, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留覆获,地道東北人。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓瓢省,卻偏偏與公主長得像弄息,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子勤婚,可洞房花燭夜當晚...
    茶點故事閱讀 43,527評論 2 349