Terminology
StreamGraph
A topology of the user's coded structure (not entirely accurate, because many user-coded operators do not generate a corresponding StreamNode, e.g. shuffle, split), so describing it as "a logical plan that has not yet been optimized" is more precise. It is generated on the Client side. (Question: what does the StreamGraph → JobGraph conversion step actually optimize?)
Related APIs
DataStream
A DataStream represents a stream of elements of the same type. A DataStream can be transformed into another DataStream by applying a transformation.
As the name suggests, a stream made up of data. A DataStream holds a Transformation<T> transformation, and applying that transformation is what produced this DataStream (note: it is not that this transformation creates a new DataStream from it). In other words, the Transformation held by a DataStream records that DataStream's origin.
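A minimal sketch of this relationship, assuming a Flink version where DataStream#getTransformation() returns a Transformation<T> (as in the sources quoted later in this article); the anonymous MapFunction is only there for illustration:

// imports assumed: org.apache.flink.api.common.functions.MapFunction,
//                  org.apache.flink.api.dag.Transformation (package may differ in older versions)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> words = env.fromElements("hello", "world");

DataStream<String> upper = words.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) {
        return value.toUpperCase();
    }
});

// The Transformation held by `upper` is the one that produced `upper` itself,
// i.e. it records this DataStream's origin (here a OneInputTransformation wrapping a StreamMap).
Transformation<String> origin = upper.getTransformation();
System.out.println(origin.getClass().getSimpleName() + " <- " + origin.getName());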
Transformation
A Transformation represents the operation that creates a DataStream
(This is somewhat involved to explain.) Put simply, every operator invoked in the code corresponds to a Transformation; for example flatMap, split, and print each have a corresponding Transformation implementation class.
The concrete implementation classes of Transformation all live in the package org.apache.flink.streaming.api.transformations.
Operator
A Transformation holds a StreamOperator implementation, and the StreamOperator wraps the concrete Function that processes the records of the DataStream. Taking StreamMap as an example, StreamMap ultimately implements the StreamOperator interface.
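As a reference point, StreamMap (paraphrased from the Flink source; imports and some details omitted) is little more than a thin wrapper that applies the user's MapFunction to each record:

// Paraphrased from org.apache.flink.streaming.api.operators.StreamMap
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);                      // userFunction = mapper
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // apply the user function to the record's value and emit the result downstream
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}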
The following example (used throughout this article) ties the three classes above together:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// WordCountData ships with the Flink streaming examples; the package may vary by version
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkBatchDemo {

    private static final Logger logger = LoggerFactory.getLogger(FlinkBatchDemo.class);

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements(WordCountData.WORDS);

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0).sum(1);

        // emit result
        counts.print();
//        System.out.println(env.getStreamGraph().getStreamingPlanAsJSON());

        // execute program
        env.execute("Streaming WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
In the example, the line text.flatMap(new Tokenizer()) applies the flatMap operator to the input DataStream, which amounts to transforming it. The concrete implementation class produced by this transform is OneInputTransformation (its input is the Transformation held by the input DataStream). The OneInputTransformation holds a StreamFlatMap operator, and the actual processing logic lives in StreamFlatMap, which delegates to the concrete userFunction implementation, as shown in the figure below:
(The figure is inaccurate and needs to be redrawn.)
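Until the figure is redrawn, a simplified excerpt of StreamFlatMap (paraphrased from the Flink source; exact fields and superclasses may differ slightly by version) shows how the operator merely forwards each record to the user's FlatMapFunction:

// Paraphrased from org.apache.flink.streaming.api.operators.StreamFlatMap
public class StreamFlatMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);                       // userFunction = flatMapper (the Tokenizer in our demo)
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        // the operator only hands the record to the user-provided function
        userFunction.flatMap(element.getValue(), collector);
    }
}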
StreamGraph
Visualizing the StreamGraph
Calling env.getStreamGraph().getStreamingPlanAsJSON() returns the StreamGraph as JSON:
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Collection Source",
    "pact" : "Data Source",
    "contents" : "Source: Collection Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Flat Map",
    "pact" : "Operator",
    "contents" : "Flat Map",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "Keyed Aggregation",
    "pact" : "Operator",
    "contents" : "Keyed Aggregation",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
Pasting this JSON into the visualizer provided by Flink (https://flink.apache.org/visualizer/) renders the plan as a diagram.
Notice that the node (StreamNode) with ID=3 is missing: the keyBy(0) operator did not generate a StreamNode. This shows that not every operator produces a corresponding StreamNode.
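One way to confirm this from code; getStreamNodes() and getOperatorName() belong to Flink's internal StreamGraph/StreamNode API, so treat this as a throwaway debugging sketch:

// import org.apache.flink.streaming.api.graph.StreamNode;
// Run this before env.execute(); getStreamGraph() may clear the registered transformations.
// Prints every StreamNode actually present in the graph: no node with id 3 (the keyBy) shows up.
for (StreamNode node : env.getStreamGraph().getStreamNodes()) {
    System.out.println(node.getId() + " -> " + node.getOperatorName());
}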
Reading the Source Code
Starting from StreamExecutionEnvironment, let's see how the StreamGraph is generated, keeping two questions in mind:
1. Which operators generate a StreamNode, which ones are skipped, and how is the function inside a skipped operator handled?
2. What do the FORWARD and HASH labels on the arrows in the diagram mean?
Tracing the code, StreamGraph generation is triggered by env.execute():
-> execute(getStreamGraph(jobName));
  -> return getStreamGraph(jobName, true);
    -> StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
      -> return new StreamGraphGenerator(transformations, config, checkpointCfg)
Constructing the StreamGraphGenerator requires the transformations list, so let's trace how transformations gets populated:
-> env.fromElements
 -> return addSource(function, "Collection Source", typeInfo).setParallelism(1);
  -> return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
   -> DataStream.flatMap
    -> return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
     -> getExecutionEnvironment().addOperator(resultTransform); // resultTransform is the Transformation for flatMap; its input is the upstream LegacySourceTransformation
      -> this.transformations.add(transformation);
This shows that the Transformation corresponding to flatMap, a OneInputTransformation, is appended to the transformations list (see the paraphrased source below).
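The relevant part of DataStream#transform, paraphrased from the Flink sources of roughly this version (newer releases split this into transform/doTransform and wrap the operator in a SimpleOperatorFactory, but the shape is the same): the current DataStream's own transformation becomes the input of the new OneInputTransformation, and the result is registered with the environment.

// Paraphrased from DataStream#transform
public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,             // upstream Transformation (the source, in the flatMap case)
            operatorName,                    // "Flat Map"
            operator,                        // StreamFlatMap wrapping the Tokenizer
            outTypeInfo,
            environment.getParallelism());

    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator<>(environment, resultTransform);

    // the call traced above: the new Transformation is appended to env.transformations
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}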
Tracing the remaining operators of the demo the same way, transformations ends up holding: Flat Map → OneInputTransformation, Keyed Aggregation → OneInputTransformation (its input is the PartitionTransformation produced by the keyBy operator), and Print to Std. Out → SinkTransformation (its input is the Keyed Aggregation OneInputTransformation).
Three Transformations in total.
StreamGraphGenerator builds the DAG by iterating over every member of transformations:
for (Transformation<?> transformation: transformations) {
    transform(transformation);
}
...
Each Transformation type is handled by its own transform method, as sketched below.
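A condensed sketch of the dispatch inside StreamGraphGenerator#transform; only the branches relevant to this demo are kept, and branch order and method names may differ slightly between Flink versions, so treat it as an outline rather than the exact source:

// Condensed from StreamGraphGenerator#transform (other subtypes such as SourceTransformation,
// UnionTransformation, SideOutputTransformation, ... have their own transformXxx branches)
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
    transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
    transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
    transformedIds = transformSink((SinkTransformation<?>) transform);
} else {
    throw new IllegalStateException("Unknown transformation: " + transform);
}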
Note, regarding question 1:
For the Keyed Aggregation OneInputTransformation, its upstream PartitionTransformation does not get a StreamNode of its own; instead, a virtual partition node is registered:
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
    Transformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();

    // make sure the upstream (non-virtual) transformation has been transformed first
    Collection<Integer> transformedIds = transform(input);

    for (Integer transformedId: transformedIds) {
        int virtualId = Transformation.getNewNodeId();
        // record a virtual node instead of adding a real StreamNode for the partitioning
        streamGraph.addVirtualPartitionNode(
                transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());
        resultIds.add(virtualId);
    }

    return resultIds;
}
// For a virtual partition node, addEdgeInternal keeps walking upstream until it reaches a
// non-virtual node, carrying along the partitioner and shuffle mode recorded on the virtual node
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
    int virtualId = upStreamVertexID;
    upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
    if (partitioner == null) {
        partitioner = virtualPartitionNodes.get(virtualId).f1;
    }
    shuffleMode = virtualPartitionNodes.get(virtualId).f2;
    addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
}
So the keyBy operator (its PartitionTransformation) produces neither a StreamNode nor a StreamEdge of its own; the generated StreamGraph skips over it and connects the two non-virtual nodes directly. The function it executes does not show up anywhere in the StreamGraph generation code itself (it travels inside the partitioner attached to the edge).
As for question 2:
When the edge between two nodes is created, the parallelism of the upstream and downstream nodes is compared; if no partitioner has been set and the parallelisms match, a ForwardPartitioner is chosen, otherwise a RebalancePartitioner:
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // always send to the single local channel
        return 0;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "FORWARD";
    }
}
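For contrast, the RebalancePartitioner chosen when the parallelisms differ simply round-robins over all downstream channels (paraphrased from the Flink source; details such as the random starting channel vary by version):

// Paraphrased from org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        // start at a random channel so that parallel senders do not all hit channel 0 first
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // plain round-robin over all downstream channels
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "REBALANCE";
    }
}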
A KeyedStream, on the other hand, uses the KeyGroupStreamPartitioner:
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
    this(
        dataStream,
        new PartitionTransformation<>(
            dataStream.getTransformation(),
            // a KeyGroupStreamPartitioner is created when the KeyedStream is constructed
            new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
        keySelector,
        keyType);
}
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
    private static final long serialVersionUID = 1L;

    private final KeySelector<T, K> keySelector;

    private int maxParallelism;

    public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {
        Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");
        this.keySelector = Preconditions.checkNotNull(keySelector);
        this.maxParallelism = maxParallelism;
    }

    public int getMaxParallelism() {
        return maxParallelism;
    }

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        // map the key to a key group, and the key group to a downstream channel
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "HASH";
    }

    @Override
    public void configure(int maxParallelism) {
        KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
        this.maxParallelism = maxParallelism;
    }
}
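For completeness, the HASH strategy resolves a concrete channel through KeyGroupRangeAssignment (paraphrased from the Flink source): the key's hashCode is murmur-hashed into a key group, and the key group is then mapped onto an operator index.

// Paraphrased from org.apache.flink.runtime.state.KeyGroupRangeAssignment
// (MathUtils.murmurHash is org.apache.flink.util.MathUtils)

public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    // key -> key group -> index of the downstream parallel operator instance (i.e. the channel)
    return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

public static int assignToKeyGroup(Object key, int maxParallelism) {
    return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}

public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
    // murmur-hash the key's hashCode and fold it into [0, maxParallelism)
    return MathUtils.murmurHash(keyHash) % maxParallelism;
}

public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    // key groups are spread proportionally over the actual parallelism
    return keyGroupId * parallelism / maxParallelism;
}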