Flink源碼分析之StreamGraph

StreamGraph是什么皿桑?

StreamGraph顧名思義是流圖详拙,它描述的是一個(gè)數(shù)據(jù)流的拓補(bǔ)結(jié)構(gòu)叹阔,包含了創(chuàng)建JobGraph的所有必要信息质蕉。StreamGraph由StreamNode和StreamEdge組成势篡,StreamNode描述了流程序中的一個(gè)操作符和相關(guān)的屬性翩肌,StreamEdge是連接兩個(gè)StreamNode的邊,代表的是一個(gè)數(shù)據(jù)流禁悠。


為什么需要StreamGraph念祭?

Flink是一個(gè)流式計(jì)算引擎,它支持兩種類型的執(zhí)行模式:流式計(jì)算和批處理碍侦。但是在運(yùn)行時(shí)粱坤,這兩種模式是被統(tǒng)一,并沒有區(qū)別對(duì)待瓷产,都是使用JobGraph進(jìn)行描述站玄。為了描述這兩種執(zhí)行模式,分別定義了不同的數(shù)據(jù)結(jié)構(gòu)濒旦,那就是StreamGraph和Plan株旷,基于這兩種數(shù)據(jù)結(jié)構(gòu)生成相應(yīng)的JobGraph。

如何創(chuàng)建StreamGraph尔邓?

在介紹如何創(chuàng)建StreamGraph之前晾剖,我們先介紹一些跟StreamGraph相關(guān)的和新概念和數(shù)據(jù)結(jié)構(gòu)。

StreamExecutionEnvironment

流的執(zhí)行環(huán)境梯嗽,包含流的ExecutionConfig齿尽,CheckpointConfig,StreamTransformation列表和StateBackend等灯节。它是生成StreamGraph的入口:

    public StreamGraph getStreamGraph() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return StreamGraphGenerator.generate(this, transformations);
    }

StreamGraphGenerator

顧名思義雕什,StreamGraph生成器,真正執(zhí)行和生成StreamGraph的類显晶。它根據(jù)創(chuàng)建流拓補(bǔ)結(jié)構(gòu)過程中生成的StreamTransformation贷岸,解析生成相應(yīng)StreamNode和StreamEdge,組合生成StreamGraph磷雇。

    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> transformation: transformations) {
            transform(transformation);
        }
        return streamGraph;
    }

StreamTransformation

StreamTransformation代表了一個(gè)創(chuàng)建DataStream的操作偿警,每個(gè)DataStream都有一個(gè)底層的StreamTransformation。但是并不是每一個(gè)StreamTransformation都對(duì)應(yīng)一個(gè)運(yùn)行時(shí)的物理操作唯笙。例如:



上圖左邊對(duì)應(yīng)的是創(chuàng)建流拓補(bǔ)結(jié)構(gòu)是對(duì)應(yīng)的流圖螟蒸,而右邊則是運(yùn)行時(shí)的操作圖。
StreamTransformation是一個(gè)抽象類型崩掘,具體對(duì)應(yīng)的實(shí)現(xiàn)由很多種:

  • SourceTransformation
  • OneInputTransformation
  • SideOutputTransformation
  • SelectTransformation
  • PartitionTransformation
  • CoFeedbackTransformation
  • FeedbackTransformation
  • UnionTransformation
  • SplitTransformation
  • TwoInputTransformation
  • SinkTransformation

每種類型的StreamTransformation都對(duì)應(yīng)相應(yīng)的operator七嫌,通過解析具體的StreamTransformation,創(chuàng)建StreamNode和StreamEdge苞慢。當(dāng)然前面已經(jīng)說過诵原,并不是每種類型的StreamTransformation都會(huì)創(chuàng)建StreamNode和StreamEdge,下面看下具體的生成過程:

    private Collection<Integer> transform(StreamTransformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from theExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() > 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

以O(shè)neInputTransformation為例,看下是如何創(chuàng)建StreamNode和StreamEdge绍赛。

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getOperator(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        streamGraph.setParallelism(transform.getId(), transform.getParallelism());
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }

第一步:解析OneInputTransformation的輸入源(也是一個(gè)StreamTransformation蔓纠,其實(shí)這是一個(gè)遞歸的過程,從最后一個(gè)StreamTransformation一直解析到第一個(gè)StreamTransformation)吗蚌,得到輸入源的節(jié)點(diǎn)id腿倚。

        Collection<Integer> inputIds = transform(transform.getInput());

第二步:創(chuàng)建StreamNode,并添加到StreamGraph中蚯妇。

   streamGraph.addOperator(transform.getId(),
               slotSharingGroup,
               transform.getOperator(),
               transform.getInputType(),
               transform.getOutputType(),
               transform.getName());

第三步:往StreamGraph中添加關(guān)聯(lián)輸入源和節(jié)點(diǎn)的邊敷燎。

    for (Integer inputId: inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

當(dāng)遍歷完所有的StreamTransformation,則相應(yīng)的StreamGraph也已經(jīng)生成箩言。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末懈叹,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子分扎,更是在濱河造成了極大的恐慌澄成,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件畏吓,死亡現(xiàn)場(chǎng)離奇詭異墨状,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)菲饼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門肾砂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人宏悦,你說我怎么就攤上這事镐确。” “怎么了饼煞?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵源葫,是天一觀的道長。 經(jīng)常有香客問我砖瞧,道長息堂,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任块促,我火速辦了婚禮荣堰,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘竭翠。我一直安慰自己振坚,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布斋扰。 她就那樣靜靜地躺著渡八,像睡著了一般啃洋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上呀狼,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音损离,去河邊找鬼哥艇。 笑死,一個(gè)胖子當(dāng)著我的面吹牛僻澎,可吹牛的內(nèi)容都是我干的貌踏。 我是一名探鬼主播,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼窟勃,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼祖乳!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起秉氧,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤眷昆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后汁咏,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體亚斋,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年攘滩,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了帅刊。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡漂问,死狀恐怖赖瞒,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蚤假,我是刑警寧澤栏饮,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站磷仰,受9級(jí)特大地震影響抡爹,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜芒划,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一冬竟、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧民逼,春花似錦泵殴、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽调缨。三九已至,卻和暖如春吆你,著一層夾襖步出監(jiān)牢的瞬間弦叶,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來泰國打工妇多, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留伤哺,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓者祖,卻偏偏與公主長得像立莉,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子七问,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容