StreamGraph Generation

Example

Using the SocketWindowWordCount example, let's analyze how the graph is built:

public static void main(String[] args) throws Exception {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts =
                text.flatMap(
                                new FlatMapFunction<String, WordWithCount>() {
                                    @Override
                                    public void flatMap(
                                            String value, Collector<WordWithCount> out) {
                                        for (String word : value.split("\\s")) {
                                            out.collect(new WordWithCount(word, 1L));
                                        }
                                    }
                                })
                        .keyBy(value -> value.word)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        /**
                        * han_pf
                        * keyBy and window do not add a transformation to the transformations list; only reduce does
                        */
                        .reduce(
                                new ReduceFunction<WordWithCount>() {
                                    @Override
                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                                        return new WordWithCount(a.word, a.count + b.count);
                                    }
                                });
        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

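For this code, the flow of the whole operator chain is shown in the figure below.
As the operators are assembled, each new transformation is created with the upstream operator's transformation as its input. Operators that only change the type of the stream, without doing any actual processing, are not added to the transformations collection. In this example, keyBy turns the DataStream into a KeyedStream and creates a PartitionTransformation that is never registered. window creates no transformation of its own and only configures the reduce that follows. Later, when StreamNodes are generated, the keyBy partition becomes a virtual partition node (stored in virtualPartitionNodes) instead of a real node. As the figure below shows, only the transformations for flatMap, reduce, and sink are added to the transformations collection.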

[Figure: transformation chain of the SocketWindowWordCount example]
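The registration rule above can be illustrated with a toy model in Java. This is a sketch under assumptions, not Flink code. The class TransformationRegistrySketch, its methods, and the operator names are hypothetical, and window() is modeled as adding nothing, as described above. The model builds the SocketWindowWordCount shape and shows that only flatMap, reduce, and sink end up in the list. The source and the keyBy partition remain reachable through the input links.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model with hypothetical names (not Flink's real classes): it only mimics
// which API calls register a transformation with the environment.
class TransformationRegistrySketch {

    // A transformation knows its name and its single upstream input (null for a source).
    static final class Transformation {
        final String name;
        final Transformation input;

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    // Stands in for the environment's transformations list.
    final List<Transformation> transformations = new ArrayList<>();

    // Source: created but not registered; it is reached later through its consumers' inputs.
    Transformation source(String name) {
        return new Transformation(name, null);
    }

    // flatMap / reduce / sink: created with the upstream as input, and registered.
    Transformation operator(String name, Transformation input) {
        Transformation t = new Transformation(name, input);
        transformations.add(t);
        return t;
    }

    // keyBy: creates a partition transformation but does not register it.
    Transformation keyBy(Transformation input) {
        return new Transformation("partition(keyBy)", input);
    }

    // Builds the SocketWindowWordCount shape; window(...) contributes nothing of its own here.
    static TransformationRegistrySketch build() {
        TransformationRegistrySketch env = new TransformationRegistrySketch();
        Transformation src = env.source("socketSource");
        Transformation flatMap = env.operator("flatMap", src);
        Transformation keyed = env.keyBy(flatMap);
        Transformation reduce = env.operator("window-reduce", keyed);
        env.operator("sink", reduce);
        return env;
    }

    // Names of the registered transformations, in registration order.
    List<String> registeredNames() {
        List<String> names = new ArrayList<>();
        for (Transformation t : transformations) {
            names.add(t.name);
        }
        return names;
    }

    // Follows input links from t back to the source; returns names in source-to-t order.
    static List<String> lineage(Transformation t) {
        List<String> names = new ArrayList<>();
        for (Transformation cur = t; cur != null; cur = cur.input) {
            names.add(0, cur.name);
        }
        return names;
    }
}
```

In this model, registeredNames() yields flatMap, window-reduce, and sink. Walking the inputs back from the sink still visits the unregistered source and keyBy partition, which is how the generator reaches them later.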

Generating the StreamGraph

Calling execute() as above invokes StreamExecutionEnvironment.getStreamGraph(), which in turn calls StreamGraphGenerator.generate() to build the graph.

    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        /**
        * han_pf
        * Focus here: this builds the StreamGraph
        */
        final StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        return execute(streamGraph);
    }
    public StreamGraph getStreamGraph(boolean clearTransformations) {
        final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
        if (clearTransformations) {
            transformations.clear();
        }
        return streamGraph;
    }

StreamGraphGenerator

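First, the generator iterates over the transformations previously added to the transformations collection and translates each one. For each transformation, it looks up the translator registered for that transformation's class in the statically initialized translatorMap and uses it to convert the operator. If no translator is registered, it falls back to legacyTransform().
The overall flow: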


[Figure: overall flow of StreamGraphGenerator.generate()]
translatorMap
static {
        @SuppressWarnings("rawtypes")
        Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
                tmp = new HashMap<>();
        tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
        tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
        tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
        tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
        tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
        tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
        tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
        tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
        tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
        tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
        tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
        tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
        tmp.put(
                TimestampsAndWatermarksTransformation.class,
                new TimestampsAndWatermarksTransformationTranslator<>());
        tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
        tmp.put(
                KeyedBroadcastStateTransformation.class,
                new KeyedBroadcastStateTransformationTranslator<>());
        translatorMap = Collections.unmodifiableMap(tmp);
    }
public StreamGraph generate() {

        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);

        streamGraph.setEnableCheckpointsAfterTasksFinish(
                configuration.get(
                        ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));

        shouldExecuteInBatchMode = shouldExecuteInBatchMode();

        configureStreamGraph(streamGraph);

        alreadyTransformed = new HashMap<>();
        /**
        * han_pf
        * Iterate over the transformations that were added earlier
        */
        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }

        streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);

        setFineGrainedGlobalStreamExchangeMode(streamGraph);

        for (StreamNode node : streamGraph.getStreamNodes()) {
            if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
                for (StreamEdge edge : node.getInEdges()) {
                    edge.setSupportsUnalignedCheckpoints(false);
                }
            }
        }

        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }

private Collection<Integer> transform(Transformation<?> transform) {
        // (some code omitted)
        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        @SuppressWarnings("unchecked")
        final TransformationTranslator<?, Transformation<?>> translator =
                (TransformationTranslator<?, Transformation<?>>)
                        translatorMap.get(transform.getClass());

        Collection<Integer> transformedIds;
        if (translator != null) {
            /**
            * han_pf
            * Translate with the translator that matches the transformation type
            */
            transformedIds = translate(translator, transform);
        } else {
            transformedIds = legacyTransform(transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        return transformedIds;
    }
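The lookup-and-fallback logic in transform() can be sketched on its own. All types here (TranslatorDispatchSketch, OneInput, Partition, Iterate, Translator) are hypothetical stand-ins. The early cache check at the top of transform() is an assumption about part of the omitted code. The later putIfAbsent mirrors the visible alreadyTransformed check.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of translator dispatch in transform(); none of these types are Flink's.
class TranslatorDispatchSketch {

    static class Transformation {
        final int id;
        Transformation(int id) { this.id = id; }
    }
    static class OneInput extends Transformation { OneInput(int id) { super(id); } }
    static class Partition extends Transformation { Partition(int id) { super(id); } }
    // No translator is registered for this type, so it takes the legacy path.
    static class Iterate extends Transformation { Iterate(int id) { super(id); } }

    // A translator returns the ids of the nodes it produced.
    interface Translator {
        Collection<Integer> translate(Transformation t);
    }

    // Keyed by exact class and made read-only after the static block, like translatorMap.
    static final Map<Class<? extends Transformation>, Translator> TRANSLATORS;
    static {
        Map<Class<? extends Transformation>, Translator> tmp = new HashMap<>();
        tmp.put(OneInput.class, t -> Collections.singletonList(t.id));
        // Pretend that a partition yields a (negative) virtual id.
        tmp.put(Partition.class, t -> Collections.singletonList(-t.id));
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    final Map<Transformation, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<String> log = new ArrayList<>();

    Collection<Integer> transform(Transformation t) {
        // Assumed early exit: reuse the result of an earlier translation.
        if (alreadyTransformed.containsKey(t)) {
            return alreadyTransformed.get(t);
        }
        Translator translator = TRANSLATORS.get(t.getClass());
        Collection<Integer> ids;
        if (translator != null) {
            log.add("translate " + t.getClass().getSimpleName());
            ids = translator.translate(t);
        } else {
            log.add("legacy " + t.getClass().getSimpleName());
            ids = legacyTransform(t);
        }
        // Same effect as "if (!alreadyTransformed.containsKey(...)) put(...)".
        alreadyTransformed.putIfAbsent(t, ids);
        return ids;
    }

    // Stand-in for legacyTransform(): just echoes the id.
    Collection<Integer> legacyTransform(Transformation t) {
        return Collections.singletonList(t.id);
    }
}
```

The lookup uses the exact runtime class (getClass()), as in the translatorMap.get(transform.getClass()) call above. As a result, a subclass of a registered type would not find a translator and would go down the legacy path.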
The core translate(translator, transform) method

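Before translating the current transformation, translate() obtains the ids of its input operators through getParentInputIds(transform.getInputs()). This method calls transform() recursively on each upstream transformation, so every upstream transformation has been translated before the current one.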

 private Collection<Integer> translate(
            final TransformationTranslator<?, Transformation<?>> translator,
            final Transformation<?> transform) {
        checkNotNull(translator);
        checkNotNull(transform);
        /**
        * han_pf
        * Recursively translates the input operators of the current operator first
        */
        final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }
        /**
        * han_pf
        * Determine whether the current operator has a slotSharingGroup set (the default group is named "default");
        * this decides whether slots are shared. Subtasks of different tasks can share a slot.
        */
        final String slotSharingGroup =
                determineSlotSharingGroup(
                        transform.getSlotSharingGroup().isPresent()
                                ? transform.getSlotSharingGroup().get().getName()
                                : null,
                        allInputIds.stream()
                                .flatMap(Collection::stream)
                                .collect(Collectors.toList()));

        final TransformationTranslator.Context context =
                new ContextImpl(this, streamGraph, slotSharingGroup, configuration);

        return shouldExecuteInBatchMode
                ? translator.translateForBatch(transform, context)
                : translator.translateForStreaming(transform, context);
    }
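The listing does not show determineSlotSharingGroup itself. As we understand it, the rule has three steps. An explicitly set group wins. Otherwise, the operator inherits its inputs' group if all inputs agree. Otherwise, it falls back to "default". The standalone sketch below encodes that understanding. A plain Map stands in for the StreamGraph lookup, and the class and parameter names are hypothetical. Treat it as an approximation, not the actual Flink method.

```java
import java.util.Collection;
import java.util.Map;

// Approximate, standalone sketch of slot sharing group selection (hypothetical names).
class SlotSharingGroupSketch {

    static final String DEFAULT_GROUP = "default";

    // groupOfNode stands in for looking up the group already assigned to each input node.
    static String determineSlotSharingGroup(
            String specifiedGroup, Collection<Integer> inputIds, Map<Integer, String> groupOfNode) {
        // 1. An explicitly set group always wins.
        if (specifiedGroup != null) {
            return specifiedGroup;
        }
        // 2. Otherwise inherit the inputs' group, but only if all inputs agree.
        String inputGroup = null;
        for (int id : inputIds) {
            String candidate = groupOfNode.get(id);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return DEFAULT_GROUP; // inputs disagree
            }
        }
        // 3. No inputs (e.g. a source) or no recorded group: use the default group.
        return inputGroup == null ? DEFAULT_GROUP : inputGroup;
    }
}
```

As a result, with no explicit groups anywhere, every operator in the example ends up in "default". Setting a group on one operator also pulls along downstream operators whose inputs all share that group.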

   private List<Collection<Integer>> getParentInputIds(
            @Nullable final Collection<Transformation<?>> parentTransformations) {
        final List<Collection<Integer>> allInputIds = new ArrayList<>();
        if (parentTransformations == null) {
            return allInputIds;
        }

        for (Transformation<?> transformation : parentTransformations) {
            allInputIds.add(transform(transformation));
        }
        return allInputIds;
    }
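Together, transform(), translate(), and getParentInputIds() amount to a memoized depth-first traversal. Inputs are translated before their consumers, and a transformation shared by several consumers is translated only once. The sketch below models this with a hypothetical Node type and a diamond-shaped graph, where one source feeds two operators that are then joined. The id numbering is invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of recursive, memoized translation; method names mirror the post, types are hypothetical.
class RecursiveTranslateSketch {

    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    final Map<Node, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final List<String> translationOrder = new ArrayList<>();
    private int nextId = 1;

    Collection<Integer> transform(Node t) {
        if (alreadyTransformed.containsKey(t)) {
            return alreadyTransformed.get(t);
        }
        Collection<Integer> ids = translate(t);
        alreadyTransformed.putIfAbsent(t, ids);
        return ids;
    }

    Collection<Integer> translate(Node t) {
        // Translate all inputs first (depth-first), as getParentInputIds does.
        List<Collection<Integer>> allInputIds = getParentInputIds(t.inputs);
        // The recursive call might have already transformed t.
        if (alreadyTransformed.containsKey(t)) {
            return alreadyTransformed.get(t);
        }
        // Record "name<-inputIds" and hand out a fresh node id.
        translationOrder.add(t.name + "<-" + allInputIds);
        return Collections.singletonList(nextId++);
    }

    List<Collection<Integer>> getParentInputIds(List<Node> parents) {
        List<Collection<Integer>> all = new ArrayList<>();
        for (Node p : parents) {
            all.add(transform(p));
        }
        return all;
    }
}
```

For the transformations list [m1, m2, join], the source is reached through m1's inputs and translated first. When m2 asks for the source again, the cached id is returned, so the order is src, m1, m2, join.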

SimpleTransformationTranslator

   @Override
    public final Collection<Integer> translateForStreaming(
            final T transformation, final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);

        final Collection<Integer> transformedIds =
                translateForStreamingInternal(transformation, context);

        configure(transformation, context);

        return transformedIds;
    }

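translateForStreamingInternal is implemented by each concrete translator. A PartitionTransformation, such as the one produced by keyBy, is handled by PartitionTransformationTranslator. That translator turns it into a virtual partition node (via streamGraph.addVirtualPartitionNode) rather than a StreamNode. Other translators, such as OneInputTransformationTranslator, convert their transformation into a StreamNode. The code is as follows: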

[Figure: code of the translateForStreamingInternal implementations]
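SimpleTransformationTranslator follows the template-method pattern. The final translateForStreaming fixes the order (translate, then configure), and each concrete translator only supplies translateForStreamingInternal. Below is a reduced sketch of that shape. The context is replaced by a log list, and OneInputTranslator is a hypothetical subclass. As far as we know, the real configure() applies per-node settings such as the uid and buffer timeout. The sketch represents that step with a single log entry.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Reduced sketch of the template-method shape; the context is just a log (hypothetical).
class TemplateTranslatorSketch {

    abstract static class SimpleTranslator<T> {
        // final: subclasses cannot change the order "translate, then configure".
        final Collection<Integer> translateForStreaming(T transformation, List<String> context) {
            Collection<Integer> ids = translateForStreamingInternal(transformation, context);
            configure(transformation, context);
            return ids;
        }

        // The only step a concrete translator has to supply.
        protected abstract Collection<Integer> translateForStreamingInternal(
                T transformation, List<String> context);

        // Shared post-processing after every translation (stand-in for the real configure()).
        private void configure(T transformation, List<String> context) {
            context.add("configure " + transformation);
        }
    }

    // Hypothetical one-input translator that "creates" a single node with id 42.
    static final class OneInputTranslator extends SimpleTranslator<String> {
        @Override
        protected Collection<Integer> translateForStreamingInternal(
                String transformation, List<String> context) {
            context.add("addOperator " + transformation);
            return Collections.singletonList(42);
        }
    }
}
```

Making the outer method final means no translator can skip or reorder the shared configuration step. The type-specific work is confined to the internal method.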

PartitionTransformationTranslator

public class PartitionTransformationTranslator<OUT>
        extends SimpleTransformationTranslator<OUT, PartitionTransformation<OUT>> {

    @Override
    protected Collection<Integer> translateForBatchInternal(
            final PartitionTransformation<OUT> transformation, final Context context) {
        return translateInternal(transformation, context);
    }

    @Override
    protected Collection<Integer> translateForStreamingInternal(
            final PartitionTransformation<OUT> transformation, final Context context) {
        return translateInternal(transformation, context);
    }

    private Collection<Integer> translateInternal(
            final PartitionTransformation<OUT> transformation, final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);

        final StreamGraph streamGraph = context.getStreamGraph();

        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        checkState(
                parentTransformations.size() == 1,
                "Expected exactly one input transformation but found "
                        + parentTransformations.size());
        final Transformation<?> input = parentTransformations.get(0);

        List<Integer> resultIds = new ArrayList<>();

        for (Integer inputId : context.getStreamNodeIds(input)) {
            final int virtualId = Transformation.getNewNodeId();
            streamGraph.addVirtualPartitionNode(
                    inputId,
                    virtualId,
                    transformation.getPartitioner(),
                    transformation.getExchangeMode());
            resultIds.add(virtualId);
        }
        return resultIds;
    }
}
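A virtual partition node has no StreamNode behind it, so it only matters once a downstream node connects to it. The sketch below assumes how that resolution works, since the edge logic is not shown in this post. A virtual id maps to (real upstream id, partitioner). Adding an edge from a virtual id rewires it to the real upstream and carries the partitioner along. VirtualPartitionSketch, the id numbering, the "hash" label (standing in for the key-based partitioner that keyBy uses), and the "forward" default are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of virtual partition nodes and how an edge resolves through them.
class VirtualPartitionSketch {

    static final class Virtual {
        final int upstreamId;
        final String partitioner;

        Virtual(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    final Map<Integer, Virtual> virtualPartitionNodes = new HashMap<>();
    final List<String> edges = new ArrayList<>();
    private int nextId = 100;

    // Like translateInternal above: one fresh virtual id per input id, no StreamNode created.
    List<Integer> addPartition(List<Integer> inputIds, String partitioner) {
        List<Integer> result = new ArrayList<>();
        for (int inputId : inputIds) {
            int virtualId = nextId++;
            virtualPartitionNodes.put(virtualId, new Virtual(inputId, partitioner));
            result.add(virtualId);
        }
        return result;
    }

    // Connects upstream -> downstream, resolving (possibly chained) virtual ids to a real node.
    void addEdge(int upstreamId, int downstreamId, String partitioner) {
        Virtual v = virtualPartitionNodes.get(upstreamId);
        if (v != null) {
            // Keep an explicitly given partitioner; otherwise take the virtual node's.
            addEdge(v.upstreamId, downstreamId, partitioner != null ? partitioner : v.partitioner);
            return;
        }
        // Simplification: a missing partitioner defaults to "forward".
        String p = partitioner == null ? "forward" : partitioner;
        edges.add(upstreamId + " -> " + downstreamId + " [" + p + "]");
    }
}
```

In the word-count example, the reduce node would connect to the keyBy virtual id. The resulting edge therefore runs from the flatMap node straight to the reduce node and carries the key partitioner, and no node for keyBy appears in the graph.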

The above covers the overall flow of StreamGraph generation. Details of the process, such as how StreamNodes and edges are generated, will be covered in a later post.
