Flink源碼分析系列文檔目錄
請點擊:Flink 源碼分析系列文檔目錄
JobGraph
相比StreamGraph灰蛙,JobGraph在生成的時候做出了一項優(yōu)化:將盡可能多的operator組合到同一個task中仗处,形成operator chain廓推。這樣以來黔帕,同一個chain中的operator運行在同一個線程中,可以顯著降低線程切換的性能開銷奋刽,并且能增大吞吐量和降低延遲砌溺。
入口方法
StreamGraph的getJobGraph
方法
@Override
public JobGraph getJobGraph(@Nullable JobID jobID) {
return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}
生成JobGraph的邏輯在StreamingJobGraphGenerator
類中
StreamingJobGraphGenerator的createJobGraph方法如下所示:
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}
繼續(xù)跟蹤阁将,發(fā)現(xiàn)創(chuàng)建JobGraph的主要邏輯如下所示:
private JobGraph createJobGraph() {
// 進行一些校驗工作
preValidate();
// make sure that all vertices start immediately
// 設(shè)置JobGraph的調(diào)度模式
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
// 這里是重點潦刃,JobGraph的頂點和邊在這個方法中創(chuàng)建,并且嘗試將盡可能多的StreamNode聚合在一個JobGraph節(jié)點中懈叹。聚合條件稍后分析
setChaining(hashes, legacyHashes, chainedOperatorHashes);
// 設(shè)置物理邊界
setPhysicalEdges();
// 設(shè)置slot共享和coLocation乖杠。同一個coLocationGroup的task需要在同一個slot中運行
setSlotSharingAndCoLocation();
// 配置檢查點
configureCheckpointing();
JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
// 設(shè)置運行時配置
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
其中最為重要的是setChaining方法。該方法為StreamGraph中的每個source節(jié)點生成Job Vertex(chain)澄成。
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
}
}
Chain的概念:JobGraph最為重要的優(yōu)化方式為創(chuàng)建OperatorChain胧洒,可以盡可能的多整合一些操作在同一個節(jié)點中完成,避免不必要的線程切換和網(wǎng)絡(luò)通信墨状。
createChain方法的主要邏輯:
- 如果stream具有多個sources卫漫,遍歷每一個sources瘩例,調(diào)用createChain方法蜓斧。
- createChain方法的兩個參數(shù)startNodeId和currentNodeId,如果這兩個參數(shù)形同骇窍,意味著一個新chain的創(chuàng)建镐确。如果這兩個參數(shù)不相同包吝,則將startNode和currentNode構(gòu)造在同一個chain中饼煞。
- 使用一個變量builtVertices保證各個StreamNode沒有被重復(fù)處理。
- 處理流程將各個節(jié)點的出邊(out edge)分類诗越。分類的依據(jù)為isChainable函數(shù)砖瞧。
- 出邊分為3類,可以被chain和不可以被chain的嚷狞,還有一種(transitiveOutEdges)是在遞歸調(diào)用createChain的時候加入块促,目的是存放整個chain所有的出邊(在構(gòu)造chain的時候,遇到一個無法被chain的節(jié)點床未,則意味著該chain已經(jīng)結(jié)束竭翠,這個無法被chain的StreamEdge就是這個chain的出邊)。
- createChain方法會遞歸調(diào)用即硼。如果某個StreamNode的出邊可以chain逃片,則調(diào)用createChain方法連接這個節(jié)點(chain的起始節(jié)點)和這個節(jié)點可以被chain的出邊指向的節(jié)點,一直遞歸到出邊不可chain為止只酥。
- 遇到不可chain的節(jié)點褥实,會創(chuàng)建一個job vertex。
- 同一個chain中的start node和chain內(nèi)的節(jié)點之間operator的關(guān)系在chainedOperatorHashes變量中保存裂允,結(jié)構(gòu)為
Map<startNodeID, List<Tuple2<StartNodeHash, currentNodeHash>>>
- 每一個Stream Node(無論有沒有對應(yīng)的job vertex)的配置信息在config變量中损离。
setVertexConfig
方法負責設(shè)置config變量。 - 通過
ChainedConfig
變量來保存chain的起始節(jié)點和chain內(nèi)各個節(jié)點配置的對應(yīng)關(guān)系绝编。ChainedConfig
結(jié)構(gòu)為Map<startNodeID, Map<currentNodeID, Config>>
僻澎。 - 調(diào)用connect方法將每個job vertex(chain)和下一個連接起來。比如節(jié)點A和B相連十饥,會現(xiàn)在A后追加一個Intermediate DateSet窟勃,然后是Job Edge,最后連接到B節(jié)點逗堵。
createChain代碼如下所示:
private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
// builtVertices存放了已經(jīng)被構(gòu)建了的StreamNode ID秉氧,避免重復(fù)操作
if (!builtVertices.contains(startNodeId)) {
// 存儲整個chain所有的出邊
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
// 存儲可以被chain的StreamEdge
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
// 存儲可以不可以被chain的StreamEdge
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
// 獲取當前處理node
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
// 分類可以被chain的edge和不可被chain的edge,使用isChainable的方法判斷
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
for (StreamEdge chainable : chainableOutputs) {
// 如果是可被chain的StreamEdge蜒秤,遞歸調(diào)用createChain
// 注意currentNode是chainable.getTargetId()
// 遞歸直到currentNode的out edge為不可chain的edge汁咏,會執(zhí)行下一段for循環(huán),不可chain的邊被加入transitiveOutEdges作媚,最終返回到遞歸最外層
// 這樣以來攘滩,transitiveOutEdges收集齊了整個chain所有的出邊
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
for (StreamEdge nonChainable : nonChainableOutputs) {
// 如果是不可被chain的StreamEdge,添加到transitiveOutEdges集合中
transitiveOutEdges.add(nonChainable);
// 調(diào)用createChain纸泡,構(gòu)建新的chain
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId);
OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
// 設(shè)置chain的名字
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
// 設(shè)置chain的最小資源
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
// 設(shè)置chain的最小資源
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
// 如果currentNodeId和startNodeId相等漂问,說明需要創(chuàng)建一個新的chain,會生成一個JobVertex
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
// 設(shè)置的頂點屬性到config中
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
if (currentNodeId.equals(startNodeId)) {
// 意味著一個新chain的開始
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
// 對于每一個chain,把它和指向下一個chain的出邊連接起來
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
// 獲取到被chain的節(jié)點
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
// 關(guān)聯(lián)chain內(nèi)節(jié)點的配置信息到chain的起始節(jié)點上
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
isChainable方法级解,這個方法很重要冒黑。用于判斷某個邊兩頭連接的StreamNode的node是否可以組成OperatorChain。方法如下所示:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperator<?> headOperator = upStreamVertex.getOperator();
StreamOperator<?> outOperator = downStreamVertex.getOperator();
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}
總結(jié)起來勤哗,可以chain的條件如下(都必須滿足):
- 下游節(jié)點的前置節(jié)點有且只能有1個抡爹。
- 該Edge的上游和下游節(jié)點必須存在。
- 上游節(jié)點和下游節(jié)點位于同一個SlotSharingGroup中芒划。
- 下游的chain策略為ChainingStrategy.ALWAYS冬竟。
- 上游的chain策略為ChainingStrategy.ALWAYS或ChainingStrategy.HEAD。
- 使用ForwardPartitoner及其子類民逼。
- 上游和下游節(jié)點的并行度一致泵殴。
- chaining被啟用。
接下來是setPhysicalEdges方法拼苍。該方法負責設(shè)置job vertex的物理邊界笑诅。執(zhí)行步驟總結(jié)如下:
- 遍歷
physicalEdgesInOrder
對象,該對象包含了所有的不可被chain的出邊(在調(diào)用connect方法的時候edge被加入該集合)疮鲫。 -
physicalInEdgesInOrder
結(jié)構(gòu)為Map<不可chain的edge指向的下游節(jié)點,List<不可chain的edge>>
吆你。 - 找到這些不可chain的edge指向的下游節(jié)點,設(shè)置物理邊界(該節(jié)點的入邊)
private void setPhysicalEdges() {
Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
for (StreamEdge edge : physicalEdgesInOrder) {
int target = edge.getTargetId();
List<StreamEdge> inEdges = physicalInEdgesInOrder.computeIfAbsent(target, k -> new ArrayList<>());
inEdges.add(edge);
}
for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
int vertex = inEdges.getKey();
List<StreamEdge> edgeList = inEdges.getValue();
vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
}
}
其余的方法對生成JobGraph過程的理解不是很重要俊犯,暫時不分析妇多,留在以后補充。
示例圖
注意StreamGraph的window和sink兩個節(jié)點被chain到了一起燕侠。