Flink源碼閱讀(一)--- StreamGraph 的生成

本文內(nèi)容是基于Flink 1.9來講解缸沃。在執(zhí)行Flink任務(wù)的時候,會涉及到三個Graph修械,分別是StreamGraph趾牧,JobGraph,ExecutionGraph肯污。其中StreamGraph和JobGraph是在client端生成的武氓,ExecutionGraph是在JobMaster中執(zhí)行的。

  • StreamGraph是根據(jù)用戶代碼生成的最原始執(zhí)行圖仇箱,也就是直接翻譯用戶邏輯得到的圖
  • JobGraph是對StreamGraph進(jìn)行優(yōu)化,比如設(shè)置哪些算子可以chain东羹,減少網(wǎng)絡(luò)開銷
  • ExecutionGraph是用于作業(yè)調(diào)度的執(zhí)行圖剂桥,對JobGraph加了并行度的概念

本篇文章首先介紹下StreamGraph的生成

1. transformations生成

Flink引擎有很多算子,比如map, flatMap, join等属提,這些算子都會生成一個transformation权逗。比如對于flatMap算子美尸,咱們跟下源碼,看下DataStream#flatMap方法

    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
                getType(), Utils.getCallLocationName(), true);

        return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));

    }
  • 先獲取返回值類型相關(guān)信息
  • transform算子斟薇,跟進(jìn)去源碼繼續(xù)看的話师坎,會首先構(gòu)建一個OneInputTransformation對象,然后把該對象加入StreamExecutionEnvironment 的 transformations對象中

2. StreamGraph生成入口 StreamGraphGenerator#generate()方法

    public StreamGraph generate() {
        streamGraph = new StreamGraph(executionConfig, checkpointConfig);
        streamGraph.setStateBackend(stateBackend);
        streamGraph.setChaining(chaining);
        streamGraph.setScheduleMode(scheduleMode);
        streamGraph.setUserArtifacts(userArtifacts);
        streamGraph.setTimeCharacteristic(timeCharacteristic);
        streamGraph.setJobName(jobName);
        streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);

        alreadyTransformed = new HashMap<>();

        for (Transformation<?> transformation: transformations) {
            transform(transformation);
        }

        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }

這個generate方法會對所有的transformations進(jìn)行轉(zhuǎn)換堪滨,咱們接著看下transform邏輯

    /**
     * Transforms one {@code Transformation}.
     *
     * <p>This checks whether we already transformed it and exits early in that case. If not it
     * delegates to one of the transformation specific methods.
     */
    private Collection<Integer> transform(Transformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
            streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
        }

        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
            if (transform instanceof PhysicalTransformation &&
                    transform.getUserProvidedNodeHash() == null &&
                    transform.getUid() == null) {
                throw new IllegalStateException("Auto generated UIDs have been disabled " +
                    "but no UID or hash has been assigned to operator " + transform.getName());
            }
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

從源碼可以看出胯陋,transform在構(gòu)建的時候,會有多種類型袱箱,比如分為Source, Sink, OneInput, Split等遏乔。比如flatMap,就屬于OneInputTransformation发笔,接下來以比較常見的transformOneInputTransform進(jìn)行介紹盟萨。

    /**
     * Transforms a {@code OneInputTransformation}.
     *
     * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
     * wired the inputs to this new node.
     */
    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getCoLocationGroupKey(),
                transform.getOperatorFactory(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
            transform.getParallelism() : executionConfig.getParallelism();
        streamGraph.setParallelism(transform.getId(), parallelism);
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }
  • 遞歸調(diào)用該算子所有的input節(jié)點(diǎn)
  • 把該Operator加入streamGraph中,實(shí)際會生成一個StreamNode對象加入streamGraph了讨。
    ?? 1. 首先調(diào)用streamGraph.addOperator
    ?? 2. 然后調(diào)用addNode方法
    protected StreamNode addNode(Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperatorFactory<?> operatorFactory,
        String operatorName) {

        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }

        StreamNode vertex = new StreamNode(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorFactory,
            operatorName,
            new ArrayList<OutputSelector<?>>(),
            vertexClass);

        streamNodes.put(vertexID, vertex);

        return vertex;
    }

這個addNode方法捻激,會把該operator轉(zhuǎn)換成一個StreamNode,然后加到StreamGraph中前计,vertexID對應(yīng)transform.getId()

  • 為該Operator的所有輸入與該Operator之間加上StreamEdge

這樣通過 StreamNode 和 SteamEdge胞谭,就構(gòu)建出了 DAG 中的所有節(jié)點(diǎn)和邊,以及它們之間的連接關(guān)系残炮,拓?fù)浣Y(jié)構(gòu)也就建立了韭赘。

3. 小結(jié)

StreamGraph其實(shí)就是由用戶代碼中涉及到transformations轉(zhuǎn)換來的,SteamEdge用來表示transformation之間的連接關(guān)系势就,StreamNode用來表示具體的operator泉瞻。

  • 從sink節(jié)點(diǎn)開始遍歷
  • 每個transformation,會在StreamGraph中新創(chuàng)建一個StreamNode苞冯,并且把新創(chuàng)建的StreamNode和它所有的input之間添加SteamEdge袖牙。
  • Partitioning, split/select 和 union 并不會在StreamNode中增加一個真實(shí)的StreamNode,而是創(chuàng)建一個具有特殊屬性的虛擬節(jié)點(diǎn)舅锄,比如partitioning, selector等鞭达,也就是在邊上加了屬性信息。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末皇忿,一起剝皮案震驚了整個濱河市畴蹭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌鳍烁,老刑警劉巖叨襟,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異幔荒,居然都是意外死亡糊闽,警方通過查閱死者的電腦和手機(jī)梳玫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來右犹,“玉大人提澎,你說我怎么就攤上這事∧盍矗” “怎么了盼忌?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長钓账。 經(jīng)常有香客問我碴犬,道長,這世上最難降的妖魔是什么梆暮? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任服协,我火速辦了婚禮,結(jié)果婚禮上啦粹,老公的妹妹穿的比我還像新娘偿荷。我一直安慰自己,他們只是感情好唠椭,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布跳纳。 她就那樣靜靜地躺著,像睡著了一般贪嫂。 火紅的嫁衣襯著肌膚如雪寺庄。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天力崇,我揣著相機(jī)與錄音斗塘,去河邊找鬼。 笑死亮靴,一個胖子當(dāng)著我的面吹牛馍盟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播茧吊,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼贞岭,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了搓侄?” 一聲冷哼從身側(cè)響起瞄桨,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎讶踪,沒想到半個月后讲婚,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡俊柔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年筹麸,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雏婶。...
    茶點(diǎn)故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡物赶,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出留晚,到底是詐尸還是另有隱情酵紫,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布错维,位于F島的核電站奖地,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏赋焕。R本人自食惡果不足惜参歹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望隆判。 院中可真熱鬧犬庇,春花似錦、人聲如沸侨嘀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽咬腕。三九已至欢峰,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間涨共,已是汗流浹背纽帖。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留煞赢,地道東北人抛计。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像照筑,于是被迫代替她去往敵國和親吹截。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容