Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency.
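The above is the official explanation of Operator Chains, together with a diagram. Looking at the diagram raises a question: in which scenarios, and for which operators, are Operator Chains formed? This article answers that question in detail.
How a Flink job is executed
Below is a sample program: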
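import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// DataSource, MyMapFunction, MyKeyedProcessFunction and DataSink are user-defined classes not shown here.
public class TaskDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        env.addSource(new DataSource())
            .map(new MyMapFunction())
            .keyBy(0) // key by the first tuple field
            .process(new MyKeyedProcessFunction())
            .addSink(new DataSink()).setParallelism(1).name("Custom Sink");
        env.execute();
    }
}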
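Between a program like this and its actual execution in Flink, the job passes through the following stages:
Program -> StreamGraph -> JobGraph -> ExecutionGraph. Operator Chains are generated in the StreamGraph -> JobGraph stage.
Rules for generating Operator Chains
The StreamingJobGraphGenerator class is responsible for converting a StreamGraph into a JobGraph. Its isChainable method decides whether two operators can be combined into a chain.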
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();
    return downStreamVertex.getInEdges().size() == 1
        && outOperator != null
        && headOperator != null
        && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
        && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
        && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
            headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
        && (edge.getPartitioner() instanceof ForwardPartitioner)
        && edge.getShuffleMode() != ShuffleMode.BATCH
        && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
        && streamGraph.isChainingEnabled();
}
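As the code shows, ten conditions are checked to decide whether a chain can be formed. If any one of them fails, the two operators cannot be chained. Before going through the ten conditions, note that Flink turns the program into a directed graph. The vertices of this graph are the operators, and the edges, which represent the connections between operators, carry many attributes, such as outputPartitioner.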
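To make the ten conditions easier to experiment with, here is a minimal standalone sketch that mirrors the shape of isChainable without depending on Flink. The Node and Edge classes, the enum values, and the choice of HEAD as the source's chaining strategy are all assumptions made for illustration; they are simplified stand-ins, not Flink's actual StreamNode and StreamEdge.

```java
import java.util.Objects;

/** Simplified, standalone model of the chaining check. Not Flink's own classes. */
final class ChainabilitySketch {

    enum Strategy { ALWAYS, HEAD, NEVER }
    enum Partitioner { FORWARD, HASH, REBALANCE }
    enum ShuffleMode { PIPELINED, BATCH, UNDEFINED }

    /** Hypothetical stand-in for a graph vertex; a null strategy models a missing operator. */
    static final class Node {
        final String name;
        final Strategy strategy;
        final String slotGroup;
        final int parallelism;
        final int inEdgeCount;

        Node(String name, Strategy strategy, String slotGroup, int parallelism, int inEdgeCount) {
            this.name = name;
            this.strategy = strategy;
            this.slotGroup = slotGroup;
            this.parallelism = parallelism;
            this.inEdgeCount = inEdgeCount;
        }
    }

    /** Hypothetical stand-in for a graph edge between two vertices. */
    static final class Edge {
        final Node up;
        final Node down;
        final Partitioner partitioner;
        final ShuffleMode shuffleMode;

        Edge(Node up, Node down, Partitioner partitioner, ShuffleMode shuffleMode) {
            this.up = up;
            this.down = down;
            this.partitioner = partitioner;
            this.shuffleMode = shuffleMode;
        }
    }

    /** The ten conditions, in the same order as in the article. */
    static boolean isChainable(Edge edge, boolean chainingEnabled) {
        Node up = edge.up;
        Node down = edge.down;
        return down.inEdgeCount == 1                                            // 1
            && down.strategy != null                                            // 2
            && up.strategy != null                                              // 3
            && Objects.equals(up.slotGroup, down.slotGroup)                     // 4
            && down.strategy == Strategy.ALWAYS                                 // 5
            && (up.strategy == Strategy.HEAD || up.strategy == Strategy.ALWAYS) // 6
            && edge.partitioner == Partitioner.FORWARD                          // 7
            && edge.shuffleMode != ShuffleMode.BATCH                            // 8
            && up.parallelism == down.parallelism                               // 9
            && chainingEnabled;                                                 // 10
    }

    public static void main(String[] args) {
        // Assumption: the source uses HEAD, the other operators use ALWAYS.
        Node source = new Node("source", Strategy.HEAD, "default", 1, 0);
        Node map = new Node("map", Strategy.ALWAYS, "default", 1, 1);
        Node process = new Node("process", Strategy.ALWAYS, "default", 1, 1);

        Edge sourceToMap = new Edge(source, map, Partitioner.FORWARD, ShuffleMode.UNDEFINED);
        // keyBy is modelled as a hash-partitioned edge, which fails condition 7.
        Edge mapToProcess = new Edge(map, process, Partitioner.HASH, ShuffleMode.UNDEFINED);

        System.out.println("source -> map: " + isChainable(sourceToMap, true));        // true
        System.out.println("map -> process: " + isChainable(mapToProcess, true));      // false
        System.out.println("source -> map, chaining disabled: "
            + isChainable(sourceToMap, false));                                        // false
    }
}
```

Changing a single field (for example the parallelism of map) is enough to see one condition flip the result while the other nine still hold.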
1. downStreamVertex.getInEdges().size() == 1
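The downstream operator has exactly one incoming edge, which means it has only one upstream operator. In the sample program above, the upstream operator of map is source, and there is only one source, so the condition holds. In the program below, map has two upstream sources, i.e. downStreamVertex.getInEdges().size() == 2, so source and map cannot form a chain.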
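import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// DataSource is a user-defined source class not shown here.
public class UnionDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        DataStream<Tuple2<String, Integer>> orangeStream = env.addSource(new DataSource("orangeStream"));
        DataStream<Tuple2<String, Integer>> greenStream = env.addSource(new DataSource("greenStream"));
        // After the union, map has two incoming edges, one from each source.
        orangeStream.union(greenStream).map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                return value;
            }
        }).print("union");
        env.execute("Union Demo");
    }
}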
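2. outOperator != null
The operator of the downstream vertex must not be null. For example, in the TaskDemo sample the operator of the map vertex is StreamMap.
3. headOperator != null
The operator of the upstream vertex must not be null. For example, in the TaskDemo sample the operator of the source vertex is StreamSource.
4. upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
This condition requires the two vertices to be in the same slot sharing group. The slot sharing group is determined in StreamGraphGenerator#determineSlotSharingGroup: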
/**
 * Determines the slot sharing group for an operation based on the slot sharing group set by
 * the user and the slot sharing groups of the inputs.
 *
 * <p>If the user specifies a group name, this is taken as is. If nothing is specified and
 * the input operations all have the same group name then this name is taken. Otherwise the
 * default group is chosen.
 *
 * @param specifiedGroup The group specified by the user.
 * @param inputIds The IDs of the input operations.
 */
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
    if (!isSlotSharingEnabled) {
        return null;
    }
    if (specifiedGroup != null) {
        return specifiedGroup;
    } else {
        String inputGroup = null;
        for (int id: inputIds) {
            String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
            if (inputGroup == null) {
                inputGroup = inputGroupCandidate;
            } else if (!inputGroup.equals(inputGroupCandidate)) {
                return DEFAULT_SLOT_SHARING_GROUP;
            }
        }
        return inputGroup == null ? DEFAULT_SLOT_SHARING_GROUP : inputGroup;
    }
}
The official description of slot sharing groups is as follows:
Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default").
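The resolution rule described above (an explicit group wins, otherwise inherit when all inputs agree, otherwise fall back to "default") can be tried out with a small standalone sketch. It follows the logic of determineSlotSharingGroup but leaves out the isSlotSharingEnabled branch, and it replaces the StreamGraph lookup with a plain map; the class name, the resolve method, and the group names "red" and "blue" are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Standalone model of slot sharing group resolution. Not Flink's own class. */
final class SlotSharingGroupSketch {

    static final String DEFAULT_GROUP = "default";

    /** Hypothetical lookup table: operation id -> already resolved group. */
    private final Map<Integer, String> groupById = new HashMap<>();

    /** Resolves the group for one operation and remembers it for later operations. */
    String resolve(int id, String specifiedGroup, Collection<Integer> inputIds) {
        String group = determine(specifiedGroup, inputIds);
        groupById.put(id, group);
        return group;
    }

    private String determine(String specifiedGroup, Collection<Integer> inputIds) {
        if (specifiedGroup != null) {
            return specifiedGroup; // an explicit user setting is taken as is
        }
        String inputGroup = null;
        for (int inputId : inputIds) {
            String candidate = groupById.get(inputId);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return DEFAULT_GROUP; // inputs disagree
            }
        }
        return inputGroup == null ? DEFAULT_GROUP : inputGroup;
    }

    public static void main(String[] args) {
        SlotSharingGroupSketch sketch = new SlotSharingGroupSketch();
        sketch.resolve(1, "red", Collections.<Integer>emptyList());
        sketch.resolve(2, "blue", Collections.<Integer>emptyList());

        // One input in "red": the group is inherited.
        System.out.println(sketch.resolve(3, null, Arrays.asList(1)));    // red
        // Inputs in "red" and "blue": falls back to the default group.
        System.out.println(sketch.resolve(4, null, Arrays.asList(1, 2))); // default
        // No inputs and nothing specified: default group.
        System.out.println(sketch.resolve(5, null, Collections.<Integer>emptyList())); // default
    }
}
```

For condition 4 this means that an operator placed explicitly in another group is in a different group from its upstream operator, so the two cannot be chained.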
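5. outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
The chaining strategy of the downstream vertex's operator must be ALWAYS, which is the default for typical operators such as map. If disableChaining() is called on the operator, the strategy is set to NEVER; if startNewChain() is called, it is set to HEAD.
6. (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
Similarly, the chaining strategy of the upstream vertex's operator must be HEAD or ALWAYS.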
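Conditions 5 and 6 together restrict which pairs of strategies can be chained. The sketch below enumerates all nine upstream/downstream combinations. It is a standalone illustration; the Strategy enum and the method name strategiesAllowChaining are hypothetical stand-ins for the corresponding part of isChainable.

```java
/** Enumerates which chaining strategy pairs satisfy conditions 5 and 6. */
final class ChainingStrategySketch {

    enum Strategy { ALWAYS, HEAD, NEVER }

    /** Mirrors only the two strategy checks of isChainable. */
    static boolean strategiesAllowChaining(Strategy upstream, Strategy downstream) {
        return downstream == Strategy.ALWAYS
            && (upstream == Strategy.HEAD || upstream == Strategy.ALWAYS);
    }

    public static void main(String[] args) {
        for (Strategy upstream : Strategy.values()) {
            for (Strategy downstream : Strategy.values()) {
                System.out.println("upstream=" + upstream
                    + ", downstream=" + downstream
                    + " -> " + strategiesAllowChaining(upstream, downstream));
            }
        }
    }
}
```

Only two of the nine combinations pass: ALWAYS followed by ALWAYS, and HEAD followed by ALWAYS. An operator marked HEAD can therefore begin a chain but can never be appended to its predecessor, and an operator marked NEVER can be neither the upstream nor the downstream side of a chained edge.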
7. (edge.getPartitioner() instanceof ForwardPartitioner)
The partitioner of the edge must be a ForwardPartitioner. The partitioner is determined in StreamGraph#addEdgeInternal when the StreamGraph is generated; see that method for the details.
8. edge.getShuffleMode() != ShuffleMode.BATCH
The shuffle mode must not be BATCH. There are three modes in total, and the shuffle mode is also determined in StreamGraph#addEdgeInternal.
/**
 * The shuffle mode defines the data exchange mode between operators.
 */
@PublicEvolving
public enum ShuffleMode {
    /**
     * Producer and consumer are online at the same time.
     * Produced data is received by consumer immediately.
     */
    PIPELINED,
    /**
     * The producer first produces its entire result and finishes.
     * After that, the consumer is started and may consume the data.
     */
    BATCH,
    /**
     * The shuffle mode is undefined. It leaves it up to the framework to decide the shuffle mode.
     * The framework will pick one of {@link ShuffleMode#BATCH} or {@link ShuffleMode#PIPELINED} in
     * the end.
     */
    UNDEFINED
}
9. upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
The upstream and downstream vertices must have the same parallelism. In the TaskDemo code above, source and map both have a parallelism of 1, so they can form a chain; if the parallelism of map is set to 2, they cannot.
10. streamGraph.isChainingEnabled()
This value is true by default. If StreamExecutionEnvironment.disableOperatorChaining() is called, streamGraph.isChainingEnabled() returns false.
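To see what the conditions mean for the TaskDemo program as a whole, the following standalone sketch groups a linear pipeline into chains. It is a reduced model that checks only conditions 7, 9 and 10 and assumes the remaining conditions hold; the Op class, its forwardFromUpstream flag, and the operator names are hypothetical, and keyBy is modelled simply as a non-forward edge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Groups a linear pipeline into chains using a reduced chaining check. */
final class ChainGroupingSketch {

    /** Hypothetical operator description for a linear pipeline. */
    static final class Op {
        final String name;
        final int parallelism;
        final boolean forwardFromUpstream; // true if the incoming edge uses forward partitioning

        Op(String name, int parallelism, boolean forwardFromUpstream) {
            this.name = name;
            this.parallelism = parallelism;
            this.forwardFromUpstream = forwardFromUpstream;
        }
    }

    static List<List<String>> groupIntoChains(List<Op> pipeline, boolean chainingEnabled) {
        List<List<String>> chains = new ArrayList<>();
        List<String> current = null;
        Op previous = null;
        for (Op op : pipeline) {
            boolean chainable = previous != null
                && chainingEnabled
                && op.forwardFromUpstream
                && op.parallelism == previous.parallelism;
            if (!chainable) {
                current = new ArrayList<>(); // this operator starts a new chain
                chains.add(current);
            }
            current.add(op.name);
            previous = op;
        }
        return chains;
    }

    public static void main(String[] args) {
        List<Op> taskDemo = Arrays.asList(
            new Op("Source", 1, false),
            new Op("Map", 1, true),
            new Op("KeyedProcess", 1, false), // keyBy: the incoming edge is not forward
            new Op("Sink", 1, true));

        System.out.println(groupIntoChains(taskDemo, true));
        // [[Source, Map], [KeyedProcess, Sink]]
        System.out.println(groupIntoChains(taskDemo, false));
        // [[Source], [Map], [KeyedProcess], [Sink]]
    }
}
```

Under these assumptions TaskDemo splits into two chains at the keyBy boundary, and disabling chaining leaves every operator on its own.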
Summary
This article analysed in detail the conditions under which operators can form an operator chain. Partitioner and ShuffleMode were not covered in depth; a follow-up article will cover them.