Operator Chains的生成

目錄

Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency.

上面是官方對Operator Chains的解釋厌小,以及示意圖,那么看到這個圖的時候會產(chǎn)生一個疑問扑浸,什么場景什么樣的算子會產(chǎn)生Operator Chains?本文將詳細解答這個疑問。

Flink job執(zhí)行作業(yè)過程

下面是一段樣例代碼贝淤,傳送門

public class TaskDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        env.addSource(new DataSource())
                .map(new MyMapFunction())
                .keyBy(0)
                .process(new MyKeyedProcessFunction())
                .addSink(new DataSink()).setParallelism(1).name("Custom Sink");

        env.execute();
    }
}

從這樣一段Program到flink真正的執(zhí)行系羞,會經(jīng)過如下一個流程
Program-> StreamGraph-> JobGraph-> ExecutionGraph加缘,那么生成Operator Chains就是在StreamGraph-> JobGraph這個階段。

Operator Chains生成規(guī)則

StreamingJobGraphGenerator這個類觉啊,是負責(zé)將StreamGraph轉(zhuǎn)成JobGraph。其中isChainable方法就是用來判斷operator之間是否可以構(gòu)成chain沈贝。

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
  StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
  StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

  StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
  StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();

  return downStreamVertex.getInEdges().size() == 1
    && outOperator != null
    && headOperator != null
    && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
    && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
    && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
        headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
    && (edge.getPartitioner() instanceof ForwardPartitioner)
    && edge.getShuffleMode() != ShuffleMode.BATCH
    && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
    && streamGraph.isChainingEnabled();
}

可以看到是否可以組成chain判斷了10個條件杠人,有任何一個不滿足,兩個operator就不能構(gòu)成一個chain宋下。在分析這10個條件之前嗡善,讀者需要明白,在flink中把程序轉(zhuǎn)化成了一個有向圖学歧,這個圖的頂點就是每個operator罩引,邊作為operator之間的連接關(guān)系有很多的屬性,比如outputPartitioner等枝笨。
1. downStreamVertex.getInEdges().size() == 1下游算子的入邊只有一個袁铐,也就是說它的上游的算子只能有一個。比如上面的樣例程序横浑,map的上游算子是source剔桨,而且只有一個source,就滿足這個條件徙融。如果是下面的程序洒缀,map的上游有兩個source,即downStreamVertex.getInEdges()=2欺冀,這樣sourcemap兩個算子就不能構(gòu)成一個chain树绩。

public class UnionDemo {
    public static void main(String[] args) throws Exception{
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource("orangeStream"));
        DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new DataSource("greenStream"));

        orangeStream.union(greenStream).map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                return value;
            }
        }).print("union");
        env.execute("Union Demo");
    }
}

2.outOperator != null表示下游頂點的算子不能為null,例如TaskDemo樣例中map頂點的算子就是StreamMap隐轩。

3.headOperator != null表示上游頂點的算子不能為null饺饭,例如TaskDemo樣例中source頂點的算子就是StreamSource。

4.upStreamVertex.isSameSlotSharingGroup(downStreamVertex)這個條件的意思是兩個頂點在同一個槽位共享組中职车。在StreamGraphGenerator#determineSlotSharingGroup中確定了槽位共享組

/**
     * Determines the slot sharing group for an operation based on the slot sharing group set by
     * the user and the slot sharing groups of the inputs.
     *
     * <p>If the user specifies a group name, this is taken as is. If nothing is specified and
     * the input operations all have the same group name then this name is taken. Otherwise the
     * default group is chosen.
     *
     * @param specifiedGroup The group specified by the user.
     * @param inputIds The IDs of the input operations.
     */
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
  if (!isSlotSharingEnabled) {
    return null;
  }

  if (specifiedGroup != null) {
    return specifiedGroup;
  } else {
    String inputGroup = null;
    for (int id: inputIds) {
      String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
      if (inputGroup == null) {
        inputGroup = inputGroupCandidate;
      } else if (!inputGroup.equals(inputGroupCandidate)) {
        return DEFAULT_SLOT_SHARING_GROUP;
      }
    }
    return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
  }
}

官方對槽位共享組的描述如下:

Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default").

5.outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS下游頂點的算子連接策略是ALWAYS砰奕,默認是ALWAYS,如果算子調(diào)用了disableChaining()提鸟,會設(shè)置為NEVER军援;如果算子調(diào)用了startNewChain(),會設(shè)置為HEAD称勋。

6.(headOperator.getChainingStrategy()==ChainingStrategy.HEAD || headOperator.getChainingStrategy()==ChainingStrategy.ALWAYS)胸哥,和上一條一樣,上游頂點的算子連接策略必須是HEAD或者ALWAYS赡鲜。

7.(edge.getPartitioner() instanceof ForwardPartitioner)這個表示邊的Partitioner必須是ForwardPartitioner空厌,這個是在生成StreamGraph的時候庐船,在StreamGraph#addEdgeInternal中確定的,詳細的過程可以參見這個方法嘲更。

8.edge.getShuffleMode() != ShuffleMode.BATCH筐钟,shuffle模式不能是BATCH,一共有3種模式赋朦,shuffle模式的確定也是在StreamGraph#addEdgeInternal中篓冲。

/**
 * The shuffle mode defines the data exchange mode between operators.
 */
@PublicEvolving
public enum ShuffleMode {
    /**
     * Producer and consumer are online at the same time.
     * Produced data is received by consumer immediately.
     */
    PIPELINED,

    /**
     * The producer first produces its entire result and finishes.
     * After that, the consumer is started and may consume the data.
     */
    BATCH,

    /**
     * The shuffle mode is undefined. It leaves it up to the framework to decide the shuffle mode.
     * The framework will pick one of {@link ShuffleMode#BATCH} or {@link ShuffleMode#PIPELINED} in
     * the end.
     */
    UNDEFINED
}

9.upStreamVertex.getParallelism() == downStreamVertex.getParallelism()上游頂點的并行度和下游的要一樣。例如上面TaskDemo代碼中source和map的并行度都是1宠哄,所以可以構(gòu)成一個chain壹将,如果將map設(shè)置為2,那么他們就不能構(gòu)成一個chain毛嫉。

10.streamGraph.isChainingEnabled()默認這個值是true诽俯,如果調(diào)用了StreamExecutionEnvironment.disableOperatorChaining()那么streamGraph.isChainingEnabled()返回值就是false。

總結(jié)

本文詳細的分析了算子之間可以生成operator chain的條件承粤,對于Partitioner和ShuffleMode并沒有展開說明暴区,后續(xù)文章會對這部分進行補充。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末辛臊,一起剝皮案震驚了整個濱河市颜启,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌浪讳,老刑警劉巖缰盏,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異淹遵,居然都是意外死亡口猜,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門透揣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來济炎,“玉大人,你說我怎么就攤上這事辐真⌒肷校” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵侍咱,是天一觀的道長耐床。 經(jīng)常有香客問我,道長楔脯,這世上最難降的妖魔是什么撩轰? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上堪嫂,老公的妹妹穿的比我還像新娘偎箫。我一直安慰自己,他們只是感情好皆串,可當(dāng)我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布淹办。 她就那樣靜靜地躺著,像睡著了一般恶复。 火紅的嫁衣襯著肌膚如雪怜森。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天寂玲,我揣著相機與錄音,去河邊找鬼梗摇。 笑死拓哟,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的伶授。 我是一名探鬼主播断序,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼糜烹!你這毒婦竟也來了违诗?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤疮蹦,失蹤者是張志新(化名)和其女友劉穎诸迟,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體愕乎,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡阵苇,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了感论。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片绅项。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖比肄,靈堂內(nèi)的尸體忽然破棺而出快耿,到底是詐尸還是另有隱情,我是刑警寧澤芳绩,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布掀亥,位于F島的核電站,受9級特大地震影響妥色,放射性物質(zhì)發(fā)生泄漏铺浇。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望鳍侣。 院中可真熱鬧丁稀,春花似錦、人聲如沸倚聚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽惑折。三九已至授账,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間惨驶,已是汗流浹背白热。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留粗卜,地道東北人屋确。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像续扔,于是被迫代替她去往敵國和親攻臀。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,722評論 2 345