聊聊Flink DataStream的八種物理分區(qū)邏輯

這兩天正在折騰ClickHouse块请,折騰完再寫文章記錄挎袜,今天就先弄一篇關(guān)于Flink的小知識吧只厘。

DataStream分區(qū)

Spark的RDD有分區(qū)的概念乘综,F(xiàn)link的DataStream同樣也有符隙,只不過沒有RDD那么顯式而已趴捅。Flink通過流分區(qū)器StreamPartitioner來控制DataStream中的元素往下游的流向,以StreamPartitioner抽象類為中心的類圖如下所示霹疫。

在Flink的Web UI界面中拱绑,各算子之間的分區(qū)器類型會在箭頭上標(biāo)注出來,如下所示丽蝎。

StreamPartitioner繼承自ChannelSelector接口猎拨。這里的Channel概念與Netty不同,只是Flink對于數(shù)據(jù)寫入目的地的簡單抽象屠阻,我們可以直接認(rèn)為它就是下游算子的并發(fā)實例(即物理分區(qū))红省。所有StreamPartitioner的子類都要實現(xiàn)selectChannel()方法,用來選擇分區(qū)號国觉。下面分別來看看Flink提供的8種StreamPartitioner的源碼吧恃,以加深理解。

GlobalPartitioner

    // dataStream.global()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }

GlobalPartitioner只會將數(shù)據(jù)輸出到下游算子的第一個實例麻诀,簡單暴力蚜枢。

ShufflePartitioner

    private Random random = new Random();
    // dataStream.shuffle()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return random.nextInt(numberOfChannels);
    }

ShufflePartitioner會將數(shù)據(jù)隨機輸出到下游算子的并發(fā)實例缸逃。由于java.util.Random生成的隨機數(shù)符合均勻分布,故能夠近似保證平均厂抽。

RebalancePartitioner

    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }
    // dataStream.rebalance()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

RebalancePartitioner會先隨機選擇一個下游算子的實例,然后用輪詢(round-robin)的方式從該實例開始循環(huán)輸出丁眼。該方式能保證完全的下游負(fù)載均衡筷凤,所以常用來處理帶有自然傾斜的原始數(shù)據(jù)流,比如各Partition之間數(shù)據(jù)量差距比較大的Kafka Topic苞七。

KeyGroupStreamPartitioner

    private final KeySelector<T, K> keySelector;
    private int maxParallelism;
    // dataStream.keyBy()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }

    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

這就是keyBy()算子底層所采用的StreamPartitioner藐守,可見是先在key值的基礎(chǔ)上經(jīng)過了兩重哈希得到key對應(yīng)的哈希值,第一重是Java自帶的hashCode()蹂风,第二重則是MurmurHash卢厂。然后將哈希值乘以算子并行度,并除以最大并行度惠啄,得到最終的分區(qū)ID慎恒。

看官可能會覺得上面的代碼有點眼熟,其實它們在之前講解Key Group機制時出現(xiàn)過撵渡,詳情參見這篇文章融柬。

BroadcastPartitioner

    // dataStream.broadcast()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }

BroadcastPartitioner是廣播流專用的分區(qū)器。由于廣播流發(fā)揮作用必須靠DataStream.connect()方法與正常的數(shù)據(jù)流連接起來趋距,所以實際上不需要BroadcastPartitioner來選擇分區(qū)(廣播數(shù)據(jù)總會投遞給下游算子的所有并發(fā))粒氧,selectChannel()方法也就不必實現(xiàn)了。細(xì)節(jié)請參見Flink中BroadcastStream相關(guān)的源碼节腐,這里就不再列舉了外盯。

RescalePartitioner

    private int nextChannelToSendTo = -1;
    // dataStream.rescale()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

這個看起來也太簡單了,并且與RebalancePartitioner的邏輯是相同的翼雀?實際上并不是饱苟。我們看看StreamingJobGraphGenerator類,它負(fù)責(zé)把Flink執(zhí)行計劃中的StreamGraph(邏輯執(zhí)行計劃)轉(zhuǎn)換為JobGraph(優(yōu)化的邏輯執(zhí)行計劃)锅纺。其connect()方法中有如下代碼掷空。

        if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                DistributionPattern.POINTWISE,
                resultPartitionType);
        } else {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                    headVertex,
                    DistributionPattern.ALL_TO_ALL,
                    resultPartitionType);

粗略地講,如果分區(qū)邏輯是RescalePartitioner或ForwardPartitioner(下面會說)囤锉,那么采用POINTWISE模式來連接上下游的頂點坦弟,對于其他分區(qū)邏輯,都用ALL_TO_ALL模式來連接官地∧鸢看下面兩張圖會比較容易理解。

也就是說驱入,POINTWISE模式的RescalePartitioner在中間結(jié)果傳送給下游節(jié)點時赤炒,會根據(jù)并行度的比值來輪詢分配給下游算子實例的子集氯析,對TaskManager來說本地性會比較好。而ALL_TO_ALL模式的RebalancePartitioner是真正的全局輪詢分配莺褒,更加均衡掩缓,但是就會不可避免地在節(jié)點之間交換數(shù)據(jù),如果數(shù)據(jù)量大的話遵岩,造成的網(wǎng)絡(luò)流量會很可觀你辣。

ForwardPartitioner

   // dataStream.forward()
   @Override
   public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
       return 0;
   }

與GlobalPartitioner的實現(xiàn)相同。但通過上面對POINTWISE和ALL_TO_ALL連接模式的講解尘执,我們能夠知道舍哄,它會將數(shù)據(jù)輸出到本地運行的下游算子的第一個實例,而非全局誊锭。在上下游算子的并行度相同的情況下表悬,默認(rèn)就會采用ForwardPartitioner。反之丧靡,若上下游算子的并行度不同蟆沫,默認(rèn)會采用前述的RebalancePartitioner。

CustomPartitionerWrapper

    Partitioner<K> partitioner;
    KeySelector<T, K> keySelector;
    // dataStream.partitionCustom()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }

        return partitioner.partition(key, numberOfChannels);
    }

這就是自定義的分區(qū)邏輯了窘行,我們可以通過繼承Partitioner接口自己實現(xiàn)饥追,并傳入partitionCustom()方法。舉個簡單的栗子罐盔,以key的長度做分區(qū):

    sourceStream.partitionCustom(new Partitioner<String>() {
      @Override
      public int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
      }
    }, 0);

The End

明天早起搬磚但绕,民那晚安晚安。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末惶看,一起剝皮案震驚了整個濱河市捏顺,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌纬黎,老刑警劉巖幅骄,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異本今,居然都是意外死亡拆座,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進店門冠息,熙熙樓的掌柜王于貴愁眉苦臉地迎上來挪凑,“玉大人,你說我怎么就攤上這事逛艰□锾迹” “怎么了?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵散怖,是天一觀的道長菇绵。 經(jīng)常有香客問我肄渗,道長,這世上最難降的妖魔是什么咬最? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任翎嫡,我火速辦了婚禮,結(jié)果婚禮上永乌,老公的妹妹穿的比我還像新娘钝的。我一直安慰自己,他們只是感情好铆遭,可當(dāng)我...
    茶點故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著沿猜,像睡著了一般枚荣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上啼肩,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天橄妆,我揣著相機與錄音,去河邊找鬼祈坠。 笑死害碾,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的赦拘。 我是一名探鬼主播慌随,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼躺同!你這毒婦竟也來了阁猜?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤蹋艺,失蹤者是張志新(化名)和其女友劉穎剃袍,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體捎谨,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡民效,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年涛救,在試婚紗的時候發(fā)現(xiàn)自己被綠了畏邢。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡州叠,死狀恐怖棵红,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情咧栗,我是刑警寧澤逆甜,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布虱肄,位于F島的核電站,受9級特大地震影響交煞,放射性物質(zhì)發(fā)生泄漏咏窿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一素征、第九天 我趴在偏房一處隱蔽的房頂上張望集嵌。 院中可真熱鬧,春花似錦御毅、人聲如沸根欧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凤粗。三九已至,卻和暖如春今豆,著一層夾襖步出監(jiān)牢的瞬間嫌拣,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工呆躲, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留异逐,地道東北人。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓插掂,卻偏偏與公主長得像灰瞻,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子燥筷,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,527評論 2 349

推薦閱讀更多精彩內(nèi)容

  • 在Flink中箩祥,由用戶代碼生成調(diào)度層圖結(jié)構(gòu),可以分成3步走:通過Stream API編寫的用戶代碼 -> Stre...
    MaQingxiang閱讀 4,486評論 0 18
  • Data Sources 源是程序讀取輸入數(shù)據(jù)的位置肆氓∨圩妫可以使用 StreamExecutionEnvironmen...
    Alex90閱讀 2,951評論 0 1
  • 1.早上老公開店門,我做完鏡子練習(xí)谢揪,在穩(wěn)穩(wěn)下樓蕉陋,開啟新一天的工作 2.今天送貨,回來后妹妹告訴我老公與客人發(fā)生爭執(zhí)...
    藍(lán)夢多雨閱讀 241評論 0 3
  • 太宰治的《人間失格》發(fā)表于1948年拨扶。這是一部關(guān)于太宰治青年時代的自傳小說凳鬓。太宰治用細(xì)膩的筆觸,告白了自己前半生患民。...
    天羽織閱讀 2,199評論 0 0
  • 《7號房的禮物》是一部講述父愛親情的故事缩举。講述了在患有精神遲滯癥狀的父親受到莫名犯罪指控之后,其女兒在法庭上為其做...
    餃子蘸醋閱讀 258評論 0 0