Flink 源碼之JobGraph生成

Flink源碼分析系列文檔目錄

請點擊:Flink 源碼分析系列文檔目錄

JobGraph

相比StreamGraph灰蛙,JobGraph在生成的時候做出了一項優(yōu)化:將盡可能多的operator組合到同一個task中仗处,形成operator chain廓推。這樣以來黔帕,同一個chain中的operator運行在同一個線程中,可以顯著降低線程切換的性能開銷奋刽,并且能增大吞吐量和降低延遲砌溺。

Operator Chain

入口方法

StreamGraph的getJobGraph方法

@Override
public JobGraph getJobGraph(@Nullable JobID jobID) {
    return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}

生成JobGraph的邏輯在StreamingJobGraphGenerator類中

StreamingJobGraphGenerator的createJobGraph方法如下所示:

public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}

繼續(xù)跟蹤阁将,發(fā)現(xiàn)創(chuàng)建JobGraph的主要邏輯如下所示:

private JobGraph createJobGraph() {
    // 進行一些校驗工作
    preValidate();

    // make sure that all vertices start immediately
    // 設(shè)置JobGraph的調(diào)度模式
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

    // 這里是重點潦刃,JobGraph的頂點和邊在這個方法中創(chuàng)建,并且嘗試將盡可能多的StreamNode聚合在一個JobGraph節(jié)點中懈叹。聚合條件稍后分析
    setChaining(hashes, legacyHashes, chainedOperatorHashes);

    // 設(shè)置物理邊界
    setPhysicalEdges();

    // 設(shè)置slot共享和coLocation乖杠。同一個coLocationGroup的task需要在同一個slot中運行
    setSlotSharingAndCoLocation();

    // 配置檢查點
    configureCheckpointing();

    JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

    // set the ExecutionConfig last when it has been finalized
    try {
        // 設(shè)置運行時配置
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    }
    catch (IOException e) {
        throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                "This indicates that non-serializable types (like custom serializers) were registered");
    }

    return jobGraph;
}

其中最為重要的是setChaining方法。該方法為StreamGraph中的每個source節(jié)點生成Job Vertex(chain)澄成。

private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
    }
}

Chain的概念:JobGraph最為重要的優(yōu)化方式為創(chuàng)建OperatorChain胧洒,可以盡可能的多整合一些操作在同一個節(jié)點中完成,避免不必要的線程切換和網(wǎng)絡(luò)通信墨状。

createChain方法的主要邏輯:

  1. 如果stream具有多個sources卫漫,遍歷每一個sources瘩例,調(diào)用createChain方法蜓斧。
  2. createChain方法的兩個參數(shù)startNodeId和currentNodeId,如果這兩個參數(shù)形同骇窍,意味著一個新chain的創(chuàng)建镐确。如果這兩個參數(shù)不相同包吝,則將startNode和currentNode構(gòu)造在同一個chain中饼煞。
  3. 使用一個變量builtVertices保證各個StreamNode沒有被重復(fù)處理。
  4. 處理流程將各個節(jié)點的出邊(out edge)分類诗越。分類的依據(jù)為isChainable函數(shù)砖瞧。
  5. 出邊分為3類,可以被chain和不可以被chain的嚷狞,還有一種(transitiveOutEdges)是在遞歸調(diào)用createChain的時候加入块促,目的是存放整個chain所有的出邊(在構(gòu)造chain的時候,遇到一個無法被chain的節(jié)點床未,則意味著該chain已經(jīng)結(jié)束竭翠,這個無法被chain的StreamEdge就是這個chain的出邊)。
  6. createChain方法會遞歸調(diào)用即硼。如果某個StreamNode的出邊可以chain逃片,則調(diào)用createChain方法連接這個節(jié)點(chain的起始節(jié)點)和這個節(jié)點可以被chain的出邊指向的節(jié)點,一直遞歸到出邊不可chain為止只酥。
  7. 遇到不可chain的節(jié)點褥实,會創(chuàng)建一個job vertex。
  8. 同一個chain中的start node和chain內(nèi)的節(jié)點之間operator的關(guān)系在chainedOperatorHashes變量中保存裂允,結(jié)構(gòu)為Map<startNodeID, List<Tuple2<StartNodeHash, currentNodeHash>>>
  9. 每一個Stream Node(無論有沒有對應(yīng)的job vertex)的配置信息在config變量中损离。setVertexConfig方法負責設(shè)置config變量。
  10. 通過ChainedConfig變量來保存chain的起始節(jié)點和chain內(nèi)各個節(jié)點配置的對應(yīng)關(guān)系绝编。ChainedConfig結(jié)構(gòu)為Map<startNodeID, Map<currentNodeID, Config>>僻澎。
  11. 調(diào)用connect方法將每個job vertex(chain)和下一個連接起來。比如節(jié)點A和B相連十饥,會現(xiàn)在A后追加一個Intermediate DateSet窟勃,然后是Job Edge,最后連接到B節(jié)點逗堵。

createChain代碼如下所示:

private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex,
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

    // builtVertices存放了已經(jīng)被構(gòu)建了的StreamNode ID秉氧,避免重復(fù)操作
    if (!builtVertices.contains(startNodeId)) {

        // 存儲整個chain所有的出邊
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        // 存儲可以被chain的StreamEdge
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        // 存儲可以不可以被chain的StreamEdge
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        // 獲取當前處理node
        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

        // 分類可以被chain的edge和不可被chain的edge,使用isChainable的方法判斷
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        for (StreamEdge chainable : chainableOutputs) {
            // 如果是可被chain的StreamEdge蜒秤,遞歸調(diào)用createChain
            // 注意currentNode是chainable.getTargetId()
            // 遞歸直到currentNode的out edge為不可chain的edge汁咏,會執(zhí)行下一段for循環(huán),不可chain的邊被加入transitiveOutEdges作媚,最終返回到遞歸最外層
            // 這樣以來攘滩,transitiveOutEdges收集齊了整個chain所有的出邊
            transitiveOutEdges.addAll(
                    createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) {
            // 如果是不可被chain的StreamEdge,添加到transitiveOutEdges集合中
            transitiveOutEdges.add(nonChainable);
            // 調(diào)用createChain纸泡,構(gòu)建新的chain
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
        }

        List<Tuple2<byte[], byte[]>> operatorHashes =
            chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

        byte[] primaryHashBytes = hashes.get(currentNodeId);
        OperatorID currentOperatorId = new OperatorID(primaryHashBytes);

        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
        }

        // 設(shè)置chain的名字
        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        // 設(shè)置chain的最小資源
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        // 設(shè)置chain的最小資源
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }

        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        // 如果currentNodeId和startNodeId相等漂问,說明需要創(chuàng)建一個新的chain,會生成一個JobVertex
        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());

        // 設(shè)置的頂點屬性到config中
        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {

            // 意味著一個新chain的開始
            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

            // 對于每一個chain,把它和指向下一個chain的出邊連接起來
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }

            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

            config.setChainIndex(chainIndex);
            // 獲取到被chain的節(jié)點
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            // 關(guān)聯(lián)chain內(nèi)節(jié)點的配置信息到chain的起始節(jié)點上
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

isChainable方法级解,這個方法很重要冒黑。用于判斷某個邊兩頭連接的StreamNode的node是否可以組成OperatorChain。方法如下所示:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperator<?> headOperator = upStreamVertex.getOperator();
    StreamOperator<?> outOperator = downStreamVertex.getOperator();

    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}

總結(jié)起來勤哗,可以chain的條件如下(都必須滿足):

  1. 下游節(jié)點的前置節(jié)點有且只能有1個抡爹。
  2. 該Edge的上游和下游節(jié)點必須存在。
  3. 上游節(jié)點和下游節(jié)點位于同一個SlotSharingGroup中芒划。
  4. 下游的chain策略為ChainingStrategy.ALWAYS冬竟。
  5. 上游的chain策略為ChainingStrategy.ALWAYS或ChainingStrategy.HEAD。
  6. 使用ForwardPartitoner及其子類民逼。
  7. 上游和下游節(jié)點的并行度一致泵殴。
  8. chaining被啟用。

接下來是setPhysicalEdges方法拼苍。該方法負責設(shè)置job vertex的物理邊界笑诅。執(zhí)行步驟總結(jié)如下:

  1. 遍歷physicalEdgesInOrder對象,該對象包含了所有的不可被chain的出邊(在調(diào)用connect方法的時候edge被加入該集合)疮鲫。
  2. physicalInEdgesInOrder結(jié)構(gòu)為Map<不可chain的edge指向的下游節(jié)點,List<不可chain的edge>>吆你。
  3. 找到這些不可chain的edge指向的下游節(jié)點,設(shè)置物理邊界(該節(jié)點的入邊)
private void setPhysicalEdges() {
    Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();

    for (StreamEdge edge : physicalEdgesInOrder) {
        int target = edge.getTargetId();

        List<StreamEdge> inEdges = physicalInEdgesInOrder.computeIfAbsent(target, k -> new ArrayList<>());

        inEdges.add(edge);
    }

    for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
        int vertex = inEdges.getKey();
        List<StreamEdge> edgeList = inEdges.getValue();

        vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
    }
}

其余的方法對生成JobGraph過程的理解不是很重要俊犯,暫時不分析妇多,留在以后補充。

示例圖

JobGraph示意圖

注意StreamGraph的window和sink兩個節(jié)點被chain到了一起燕侠。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末者祖,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子绢彤,更是在濱河造成了極大的恐慌七问,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件茫舶,死亡現(xiàn)場離奇詭異烂瘫,居然都是意外死亡,警方通過查閱死者的電腦和手機奇适,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來芦鳍,“玉大人嚷往,你說我怎么就攤上這事∧疲” “怎么了皮仁?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我贷祈,道長趋急,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任势誊,我火速辦了婚禮呜达,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘粟耻。我一直安慰自己查近,他們只是感情好,可當我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布挤忙。 她就那樣靜靜地躺著霜威,像睡著了一般。 火紅的嫁衣襯著肌膚如雪册烈。 梳的紋絲不亂的頭發(fā)上戈泼,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天,我揣著相機與錄音赏僧,去河邊找鬼大猛。 笑死,一個胖子當著我的面吹牛次哈,可吹牛的內(nèi)容都是我干的胎署。 我是一名探鬼主播,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼窑滞,長吁一口氣:“原來是場噩夢啊……” “哼琼牧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起哀卫,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤巨坊,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后此改,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體趾撵,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年共啃,在試婚紗的時候發(fā)現(xiàn)自己被綠了占调。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡移剪,死狀恐怖究珊,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情纵苛,我是刑警寧澤剿涮,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布言津,位于F島的核電站,受9級特大地震影響取试,放射性物質(zhì)發(fā)生泄漏悬槽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一瞬浓、第九天 我趴在偏房一處隱蔽的房頂上張望初婆。 院中可真熱鬧,春花似錦瑟蜈、人聲如沸烟逊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宪躯。三九已至,卻和暖如春位迂,著一層夾襖步出監(jiān)牢的瞬間访雪,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工掂林, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留臣缀,地道東北人。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓泻帮,卻偏偏與公主長得像精置,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子锣杂,可洞房花燭夜當晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 1. 主要內(nèi)容 本文主要是將用戶寫的java程序如何生成Flink JobGraph的過程與邏輯追蹤了一下脂倦,歡迎有...
    ni_d58f閱讀 1,312評論 0 1
  • 在Flink中,由用戶代碼生成調(diào)度層圖結(jié)構(gòu)元莫,可以分成3步走:通過Stream API編寫的用戶代碼 -> Stre...
    MaQingxiang閱讀 4,486評論 0 18
  • 看到心愛的電視劇時真希望它永遠不要結(jié)束或者就停在他風華正茂的年紀里這樣就可以一直看下去了赖阻,仿佛你永遠跟他們活在一起...
    LIEYUWUJI閱讀 269評論 0 0
  • 有時候想到一些東西,但是因為某些原因踱蠢,某些東西可能發(fā)表不出來火欧,不管是微信朋友圈,抑或是微博的公眾平臺茎截,多多...
    木木胥叔閱讀 162評論 0 1
  • 今天是什么日子 起床:5:45 天氣:多云苇侵。 任務(wù)清單 習慣養(yǎng)成:今天衛(wèi)生大檢查,鋪子關(guān)門一天企锌。 昨天衛(wèi)生大掃除衅檀,...
    宋會兵閱讀 582評論 0 2