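Preface
I didn't finish this post last night, so I'm completing and publishing it tonight.
Introduction to Flink operator chains
“Why does the Web UI show only a single box for my Flink job, with both Records Sent and Records Received at 0? Is something wrong with my program?”
I often see questions like this in the Flink community chat groups. In almost every case the program is fine. The cause is Flink's operator chain mechanism: in the execution plan of the submitted job, the parallel instances (sub-tasks) of all operators met certain conditions and were chained into a single unit for execution, so there is no data traffic between operators left to observe.
That is a special case, of course. More commonly only some of the operators are optimized by chaining, as in the figure below, which appears many times in the official documentation; note the Source and map() operators.
The benefits of operator chaining are obvious: all sub-tasks chained together run in the same thread (within a single TaskManager slot), which cuts unnecessary data exchange, serialization, and context switching, and so improves the job's execution efficiency.
With that groundwork laid, let's walk through the source code to see the conditions under which an operator chain is formed and how it is implemented in the Flink runtime.
Operator chains in the logical plan
Readers with some familiarity with the Flink runtime will know that a Flink job's execution plan is represented by three layers of graphs:
- StreamGraph: the original logical execution plan
- JobGraph: the optimized logical execution plan (this is what the Web UI shows)
- ExecutionGraph: the physical execution plan
Operator chains are added when the logical plan is optimized, that is, while the JobGraph is generated from the StreamGraph. So let's go to the class responsible for generating the JobGraph, o.a.f.streaming.api.graph.StreamingJobGraphGenerator, and look at the source of its core method createJobGraph().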
private JobGraph createJobGraph() {
    // make sure that all vertices start immediately
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());
    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }
    Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
    setChaining(hashes, legacyHashes, chainedOperatorHashes);
    setPhysicalEdges();
    // remainder omitted ...
    return jobGraph;
}
As shown, the method first computes a hash for every node in the StreamGraph to serve as its unique identifier, creates an empty Map to hold the hashes of the operators that are about to be chained together, and then calls setChaining(), shown below.
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
    }
}
It simply iterates over the Source nodes of the StreamGraph and calls createChain() for each one. createChain() is the core method that builds operator chains at the logical-plan level. Its full source follows; it is a bit long.
private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex,
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    if (!builtVertices.contains(startNodeId)) {
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
        }
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
        }
        List<Tuple2<byte[], byte[]>> operatorHashes =
            chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
        byte[] primaryHashBytes = hashes.get(currentNodeId);
        OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
        }
        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }
        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }
        StreamConfig config = currentNodeId.equals(startNodeId)
            ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
            : new StreamConfig(new Configuration());
        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
        if (currentNodeId.equals(startNodeId)) {
            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNodeId, edge);
            }
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
        } else {
            chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }
        config.setOperatorID(currentOperatorId);
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        return transitiveOutEdges;
    } else {
        return new ArrayList<>();
    }
}
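First, the three Lists created at the top of the method:
- transitiveOutEdges: the out-edges of the current operator chain in the JobGraph, which is also the final return value of createChain();
- chainableOutputs: the StreamGraph edges that can currently be chained;
- nonChainableOutputs: the StreamGraph edges that currently cannot be chained.
Next, starting from the Source, the method iterates over all out-edges of the current StreamGraph node and calls isChainable() to decide whether each can be chained (the decision logic is covered shortly). Chainable edges go into chainableOutputs; the rest go into nonChainableOutputs.
For each edge in chainableOutputs, createChain() is called recursively with the edge's direct downstream node as the current node, extending the chain. For each edge in nonChainableOutputs, the current chain cannot extend any further, so createChain() is called recursively with the "break point" as a new start node, in an attempt to build a new chain. In other words, the whole process of building operator chains in the logical plan is recursive, which means the calls actually return starting from the Sink end.
The method then checks whether the current node is the head of the chain. If it is, createJobVertex() is called to create a JobVertex (a node in the JobGraph) for the chain, which produces the JobGraph we see in the Web UI:
Finally, each node's chain data is written into its own StreamConfig; the head node of the chain additionally stores transitiveOutEdges. StreamConfig is used again in the physical execution phase described later.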
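The recursion is easier to follow on a toy graph. The sketch below is a simplified model, not Flink code: the ChainBuilderSketch class, its Edge type, and the boolean chainable flag (standing in for the result of isChainable()) are all assumptions made for illustration, and it handles chainable and non-chainable edges in one loop instead of two lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of the recursion in createChain(); none of these types are Flink classes. */
final class ChainBuilderSketch {

    /** A directed edge; the chainable flag stands in for the result of isChainable(). */
    static final class Edge {
        final int source;
        final int target;
        final boolean chainable;

        Edge(int source, int target, boolean chainable) {
            this.source = source;
            this.target = target;
            this.chainable = chainable;
        }
    }

    private final Map<Integer, List<Edge>> outEdges = new HashMap<>();
    private final Set<Integer> builtHeads = new HashSet<>();

    /** Chain head id -> node ids of that chain, in the order the recursion returns them. */
    final Map<Integer, List<Integer>> chains = new HashMap<>();

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>()).add(new Edge(source, target, chainable));
    }

    /** Returns the edges that leave the chain starting at startId. */
    List<Edge> createChain(int startId, int currentId) {
        if (builtHeads.contains(startId)) {
            return new ArrayList<>();
        }
        List<Edge> transitiveOutEdges = new ArrayList<>();
        for (Edge edge : outEdges.getOrDefault(currentId, new ArrayList<>())) {
            if (edge.chainable) {
                // Extend the current chain: same start node, next node downstream.
                transitiveOutEdges.addAll(createChain(startId, edge.target));
            } else {
                // The chain ends on this edge; its target becomes the head of a new chain.
                transitiveOutEdges.add(edge);
                createChain(edge.target, edge.target);
            }
        }
        // Registration happens after the recursive calls, so downstream nodes come first.
        chains.computeIfAbsent(startId, k -> new ArrayList<>()).add(currentId);
        if (currentId == startId) {
            builtHeads.add(startId); // plays the role of createJobVertex()
        }
        return transitiveOutEdges;
    }

    public static void main(String[] args) {
        ChainBuilderSketch sketch = new ChainBuilderSketch();
        sketch.addEdge(1, 2, true);   // source -> map, chainable
        sketch.addEdge(2, 3, false);  // map -> aggregation, not chainable
        sketch.addEdge(3, 4, true);   // aggregation -> sink, chainable
        List<Edge> leaving = sketch.createChain(1, 1);
        System.out.println(sketch.chains + ", edges leaving chain 1: " + leaving.size());
    }
}
```

For the four-node graph in main(), the chain headed by node 1 is recorded as [2, 1] and the one headed by node 3 as [4, 3]: within each chain the tail is registered first, which reflects the recursion unwinding from the downstream end.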
Conditions for forming an operator chain
Let's look at the code of isChainable().
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
    return downStreamVertex.getInEdges().size() == 1
        && outOperator != null
        && headOperator != null
        && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
        && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
        && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
            headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
        && (edge.getPartitioner() instanceof ForwardPartitioner)
        && edge.getShuffleMode() != ShuffleMode.BATCH
        && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
        && streamGraph.isChainingEnabled();
}
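So the conditions under which an upstream and a downstream operator can be chained are quite strict (an old refrain by now). They are:
- The downstream node has exactly one input edge, and neither operator is null (the first three checks in the code above);
- The upstream and downstream operator instances are in the same SlotSharingGroup (more on this later);
- The downstream operator's chaining strategy (ChainingStrategy) is ALWAYS, meaning it can chain with both its upstream and its downstream. The familiar map(), filter(), and similar operators belong here;
- The upstream operator's chaining strategy is HEAD or ALWAYS. HEAD means the operator can only chain with its downstream; under normal circumstances this is exclusive to Source operators;
- The physical partitioning between the two operators is ForwardPartitioner; see my earlier post "A Chat About the Eight Physical Partitioning Strategies of Flink DataStream";
- The shuffle mode between the two operators is not batch mode;
- The upstream and downstream operator instances have the same parallelism;
- Operator chaining has not been disabled.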
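To make the conjunction concrete, here is a minimal restatement of the check over plain values. It is only a sketch: the Node class, the Strategy enum, and the three boolean parameters are hypothetical stand-ins for StreamNode, ChainingStrategy, and the edge and graph properties, and the null checks on the operator factories are left out.

```java
/** Toy restatement of the chaining conditions; these types are hypothetical, not Flink's. */
final class ChainableCheckSketch {

    enum Strategy { ALWAYS, HEAD, NEVER }

    /** The handful of node properties the check looks at. */
    static final class Node {
        final Strategy strategy;
        final int parallelism;
        final String slotSharingGroup;
        final int inEdgeCount;

        Node(Strategy strategy, int parallelism, String slotSharingGroup, int inEdgeCount) {
            this.strategy = strategy;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
            this.inEdgeCount = inEdgeCount;
        }
    }

    /** Every condition must hold; a single failing one breaks the chain on this edge. */
    static boolean isChainable(Node up, Node down, boolean forwardPartitioner,
                               boolean batchShuffle, boolean chainingEnabled) {
        return down.inEdgeCount == 1
                && up.slotSharingGroup.equals(down.slotSharingGroup)
                && down.strategy == Strategy.ALWAYS
                && (up.strategy == Strategy.HEAD || up.strategy == Strategy.ALWAYS)
                && forwardPartitioner
                && !batchShuffle
                && up.parallelism == down.parallelism
                && chainingEnabled;
    }

    public static void main(String[] args) {
        Node source = new Node(Strategy.HEAD, 4, "default", 0);
        Node map = new Node(Strategy.ALWAYS, 4, "default", 1);
        Node widerMap = new Node(Strategy.ALWAYS, 8, "default", 1);
        System.out.println(isChainable(source, map, true, false, true));      // all conditions hold
        System.out.println(isChainable(source, widerMap, true, false, true)); // parallelism differs
        System.out.println(isChainable(source, map, false, false, true));     // not a forward edge
    }
}
```

The three calls in main() print true, false, and false: changing the parallelism of one side or replacing the forward partitioning is enough to split the two operators into separate JobVertex nodes.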
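Disabling operator chains
Users can call startNewChain() on an operator to force it to start a new chain, or call disableChaining() to keep it out of chaining altogether. Both methods live in the SingleOutputStreamOperator class and work by changing the operator's chaining strategy.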
@PublicEvolving
public SingleOutputStreamOperator<T> disableChaining() {
    return setChainingStrategy(ChainingStrategy.NEVER);
}
@PublicEvolving
public SingleOutputStreamOperator<T> startNewChain() {
    return setChainingStrategy(ChainingStrategy.HEAD);
}
To disable operator chaining for the entire execution environment, call StreamExecutionEnvironment.disableOperatorChaining().
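The effect of these switches can be sketched on a linear pipeline. This is a toy model under stated assumptions, not Flink code: the Op class and split() method are hypothetical, every chaining condition other than the strategy and the global switch is assumed to hold, and the strategies are set by hand to mimic what startNewChain() (HEAD) and disableChaining() (NEVER) would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model: how per-operator chaining strategies split a linear pipeline into chains. */
final class ChainingStrategySketch {

    enum Strategy { ALWAYS, HEAD, NEVER }

    static final class Op {
        final String name;
        final Strategy strategy;

        Op(String name, Strategy strategy) {
            this.name = name;
            this.strategy = strategy;
        }
    }

    /** Groups consecutive operators; all non-strategy conditions are assumed to be satisfied. */
    static List<List<String>> split(List<Op> pipeline, boolean chainingEnabled) {
        List<List<String>> chains = new ArrayList<>();
        Op previous = null;
        for (Op op : pipeline) {
            // Downstream must be ALWAYS; upstream may be HEAD or ALWAYS, but not NEVER.
            boolean joinsPrevious = previous != null
                    && chainingEnabled
                    && op.strategy == Strategy.ALWAYS
                    && previous.strategy != Strategy.NEVER;
            if (!joinsPrevious) {
                chains.add(new ArrayList<>());
            }
            chains.get(chains.size() - 1).add(op.name);
            previous = op;
        }
        return chains;
    }

    public static void main(String[] args) {
        List<Op> pipeline = Arrays.asList(
                new Op("source", Strategy.HEAD),
                new Op("map", Strategy.ALWAYS),
                new Op("filter", Strategy.HEAD),     // as if startNewChain() had been called
                new Op("flatMap", Strategy.ALWAYS),
                new Op("process", Strategy.NEVER));  // as if disableChaining() had been called
        System.out.println(split(pipeline, true));
        System.out.println(split(pipeline, false));  // as if chaining were disabled globally
    }
}
```

With chaining enabled the pipeline falls into three groups, [source, map], [filter, flatMap], and [process]; with the global switch off, every operator ends up alone.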
Operator chains in the physical plan
After the JobGraph has been converted into an ExecutionGraph and handed to the TaskManagers for execution, the basic unit of scheduled execution is created: the StreamTask, which runs the actual StreamOperator logic. In StreamTask.invoke(), after the state backend, checkpoint storage, and timer service have been initialized, we find:
operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();
This constructs an OperatorChain instance, which is the form an operator chain takes at execution time. Here are the main fields of OperatorChain.
private final StreamOperator<?>[] allOperators;
private final RecordWriterOutput<?>[] streamOutputs;
private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint;
private final OP headOperator;
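- headOperator: the first operator of the chain, corresponding to the chain's head node in the JobGraph;
- allOperators: all operators in the chain, in reverse order, so headOperator sits at the end of the array;
- streamOutputs: the outputs of the chain; there can be more than one;
- chainEntryPoint: the "entry point" of the chain; its meaning is explained later.
As the above shows, every StreamTask creates an OperatorChain. An operator that cannot join a chain still forms an OperatorChain of its own, containing only a headOperator.
The core code in the OperatorChain constructor is as follows.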
for (int i = 0; i < outEdgesInOrder.size(); i++) {
    StreamEdge outEdge = outEdgesInOrder.get(i);
    RecordWriterOutput<?> streamOutput = createStreamOutput(
        recordWriters.get(i),
        outEdge,
        chainedConfigs.get(outEdge.getSourceId()),
        containingTask.getEnvironment());
    this.streamOutputs[i] = streamOutput;
    streamOutputMap.put(outEdge, streamOutput);
}
// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
    containingTask,
    configuration,
    chainedConfigs,
    userCodeClassloader,
    streamOutputMap,
    allOps);
if (operatorFactory != null) {
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> output = getChainEntryPoint();
    headOperator = operatorFactory.createStreamOperator(containingTask, configuration, output);
    headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, output.getWatermarkGauge());
} else {
    headOperator = null;
}
// add head operator to end of chain
allOps.add(headOperator);
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
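It first iterates over all out-edges of the chain as a whole and calls createStreamOutput() to create the corresponding downstream output, a RecordWriterOutput, for each. It then calls createOutputCollector() to build the physical operator chain and return chainEntryPoint. This method is important; part of its code follows.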
private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators) {
    List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);
    // create collectors for the network outputs
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }
    // Create collectors for the chained outputs
    for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
        int outputId = outputEdge.getTargetId();
        StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
        WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(
            containingTask,
            chainedOpConfig,
            chainedConfigs,
            userCodeClassloader,
            streamOutputs,
            allOperators,
            outputEdge.getOutputTag());
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }
    // remainder omitted ...
}
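The method reads the out-edges and the chained edges from the StreamConfig mentioned in the previous section and creates an Output for each. The Output of an out-edge is the RecordWriterOutput that sends data downstream, outside the chain; the output of a chained edge is produced by createChainedOperator().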
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
        StreamTask<?, ?> containingTask,
        StreamConfig operatorConfig,
        Map<Integer, StreamConfig> chainedConfigs,
        ClassLoader userCodeClassloader,
        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
        List<StreamOperator<?>> allOperators,
        OutputTag<IN> outputTag) {
    // create the output that the operator writes to first. this may recursively create more operators
    WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(
        containingTask,
        operatorConfig,
        chainedConfigs,
        userCodeClassloader,
        streamOutputs,
        allOperators);
    // now create the operator and give it the output collector to write its output to
    StreamOperatorFactory<OUT> chainedOperatorFactory = operatorConfig.getStreamOperatorFactory(userCodeClassloader);
    OneInputStreamOperator<IN, OUT> chainedOperator = chainedOperatorFactory.createStreamOperator(
        containingTask, operatorConfig, chainedOperatorOutput);
    allOperators.add(chainedOperator);
    WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;
    if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
        currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
    }
    else {
        TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
        currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
    }
    // wrap watermark gauges since registered metrics must be unique
    chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
    chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
    return currentOperatorOutput;
}
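At a glance we can see that this method recursively calls createOutputCollector() above. As in the logical-plan phase, it produces the chainedOperators (the operators in the chain other than headOperator) by extending the Outputs step by step and returns them in reverse order, which is why the operators in the allOperators array are in reverse order.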
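The reverse ordering falls out of the recursion alone, which a few lines can show. The sketch below is a deliberately stripped-down model (operator names instead of operators, a single linear chain, hypothetical class and method names); it only mirrors the fact that each operator is added to the list after everything downstream of it has been created, and that the head is appended last.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of why recursion leaves the head operator at the end of the list. */
final class ReverseOrderSketch {

    /** Creates everything downstream of position index first, then registers the operator itself. */
    static void createChainedOperator(List<String> names, int index, List<String> allOperators) {
        if (index + 1 < names.size()) {
            createChainedOperator(names, index + 1, allOperators);
        }
        allOperators.add(names.get(index));
    }

    /** names lists the chain in data-flow order and must contain at least the head. */
    static List<String> build(List<String> names) {
        List<String> allOperators = new ArrayList<>();
        if (names.size() > 1) {
            createChainedOperator(names, 1, allOperators); // the chained (non-head) operators
        }
        allOperators.add(names.get(0)); // the head operator is appended last
        return allOperators;
    }

    public static void main(String[] args) {
        System.out.println(build(Arrays.asList("source", "map", "filter")));
    }
}
```

For the chain source, map, filter the resulting list is [filter, map, source], matching the description of allOperators with headOperator at the end.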
Once the chainedOperators exist, they are connected through ChainingOutput, forming the structure shown in the figure below.
Finally, let's see how ChainingOutput.collect() emits the data stream.
@Override
public void collect(StreamRecord<T> record) {
    if (this.outputTag != null) {
        // we are only responsible for emitting to the main input
        return;
    }
    pushToOperator(record);
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
        // we are only responsible for emitting to the side-output specified by our
        // OutputTag.
        return;
    }
    pushToOperator(record);
}
protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;
        numRecordsIn.inc();
        operator.setKeyContextElement1(castRecord);
        operator.processElement(castRecord);
    }
    catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}
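As shown, the data is pushed straight to the downstream operator by calling the chained operator's processElement() method. In other words, an OperatorChain can be treated as a single operator made up of headOperator and streamOutputs. The chainedOperators and ChainingOutputs inside it are hidden as if in a black box, and records move between them by plain method calls, with no network transfer or serialization (when object reuse is disabled, CopyingChainingOutput still copies each record using the input serializer, as the createChainedOperator() code above shows).
Having worked through the operator chain logic at the execution layer, readers should now see what chainEntryPoint means. Because it sits at the final return point of the recursion, it is the first Output leading into the chain, that is, the RecordWriterOutput pointing at headOperator in the figure above.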
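The direct hand-off between chained operators can be imitated in a few lines. This is a sketch with simplified stand-ins rather than Flink's classes: the Output interface, MapOperator, and this ChainingOutput are hypothetical and carry plain values instead of StreamRecord, and side outputs, metrics, and key context are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Toy model of chained outputs; the types are simplified stand-ins, not Flink's. */
final class ChainingOutputSketch {

    interface Output<T> {
        void collect(T record);
    }

    /** A one-input operator that transforms a record and emits the result to its output. */
    static final class MapOperator<I, O> {
        private final Function<I, O> fn;
        private final Output<O> output;

        MapOperator(Function<I, O> fn, Output<O> output) {
            this.fn = fn;
            this.output = output;
        }

        void processElement(I record) {
            output.collect(fn.apply(record));
        }
    }

    /** Hands each record to the next operator with a plain method call: no buffer, no network. */
    static final class ChainingOutput<T> implements Output<T> {
        private final MapOperator<T, ?> operator;

        ChainingOutput(MapOperator<T, ?> operator) {
            this.operator = operator;
        }

        @Override
        public void collect(T record) {
            operator.processElement(record);
        }
    }

    static List<String> run(List<Integer> input) {
        List<String> sink = new ArrayList<>();
        Output<String> chainExit = sink::add; // stands in for the output that leaves the chain
        // Built tail-first: an operator needs its output before it can be constructed.
        MapOperator<Integer, String> toText = new MapOperator<>(i -> "v" + i, chainExit);
        MapOperator<Integer, Integer> doubler = new MapOperator<>(i -> i * 2, new ChainingOutput<>(toText));
        for (Integer value : input) {
            doubler.processElement(value); // the head operator drives the whole chain
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3)));
    }
}
```

Each call to doubler.processElement() runs the entire chain on the caller's stack before returning, which is the property that lets the chain behave like one operator from the outside.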
The End
The first half of the year is over; I hope the second half brings more good things.
Good night, everyone.