StreamGraph生成

名詞解釋

StreamGraph

一個代碼用戶編碼結(jié)構(gòu)的拓?fù)浣Y(jié)構(gòu)(不是很準(zhǔn)確哼审,因為很多用戶編碼的算子沒有生成對應(yīng)的StreamNode,like: shuffle, split等)斩启,所以【一個還未經(jīng)過優(yōu)化處理的邏輯計劃】這樣描述會更加準(zhǔn)確。由Client端生成。(問題:StreamGraph → JobGraph這一步的轉(zhuǎn)換叫惊,優(yōu)化了什么帝洪?)

相關(guān)API

DataStream

A DataStream represents a stream of elements of the same type. A DataStream can be transformed into another DataStream by applying a transformation.

顧名思義似舵,data組成的stream;DataStream中持有Transformation<T> transformation葱峡,經(jīng)過該transformation砚哗,得到此DataStream(注意不是通過該transformation生成一個新的DataStream),意味著DataStream中持有的Transformation是該DataStream的來源砰奕;

Transformation

A Transformation represents the operation that creates a DataStream

(講述起來比較復(fù)雜)可以簡單理解成蛛芥,代碼中每個引用的算子,都是一個transformation军援;比如flatMap仅淑,split,print都有對應(yīng)的Transformation實現(xiàn)類胸哥,具體實現(xiàn)如下:


對Transformation的具體實現(xiàn)類都在org.apache.flink.streaming.api.transformations中

Operator

Transformation中持有StreamOperator的實現(xiàn)涯竟,StreamOperator封裝具體的Function來對DataStream中的record進(jìn)程處理;以StreamMap實現(xiàn)類為例烘嘱,StreamMap繼承自StreamOperator接口昆禽;

以一個例子(也是貫穿全文的例子)歸納以上三個類:

public?class?FlinkBatchDemo {

????private?static?final?Logger logger = LoggerFactory.getLogger(FlinkBatchDemo.class);

????public?static?void?main(String[] args)?throws?Exception {

????????final?StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

????????DataStream<String> text = env.fromElements(WordCountData.WORDS);

????????DataStream<Tuple2<String, Integer>> counts =

????????????????// split up the lines in pairs (2-tuples) containing: (word,1)

????????????????text.flatMap(new?Tokenizer())

????????????????????????// group by the tuple field "0" and sum up tuple field "1"

????????????????????????.keyBy(0).sum(1);

????????// emit result

????????counts.print();

//??????? System.out.println(env.getStreamGraph().getStreamingPlanAsJSON());

????????// execute program

????????env.execute("Streaming WordCount");

????}


????public?static?final?class?Tokenizer?implements?FlatMapFunction<String, Tuple2<String, Integer>> {


????????@Override

????????public?void?flatMap(String value, Collector<Tuple2<String, Integer>> out) {

????????????// normalize and split the line

????????????String[] tokens = value.toLowerCase().split("\\W+");


????????????// emit the pairs

????????????for?(String token : tokens) {

????????????????if?(token.length() >?0) {

????????????????????out.collect(new?Tuple2<>(token,?1));

????????????????}

????????????}

????????}

????}

}

示例中text.flatMap(new Tokenizer())這一行,對inputStream調(diào)用flatMap算子蝇庭,相當(dāng)于對inputStream這個DataStream進(jìn)行transform醉鳖,transform的具體實現(xiàn)類為:OneInputTransformation(生成的該OneInputTransformation的input為inputStream中的Transformation),而OneInputTransformation持有StreamFlatMap哮内,具體的處理邏輯在StreamFlatMap中 盗棵,通過調(diào)用userFunction的具體實現(xiàn)來實現(xiàn)壮韭,如下圖所示:

(該圖有誤,需要重畫)

StreamGraph

StreamGraph可視化

通過env.getStreamGraph().getStreamingPlanAsJSON()方法可以的到StreamGraph的Json:

{

??"nodes"?: [ {

????"id"?:?1,

????"type"?:?"Source: Collection Source",

????"pact"?:?"Data Source",

????"contents"?:?"Source: Collection Source",

????"parallelism"?:?1

??}, {

????"id"?:?2,

????"type"?:?"Flat Map",

????"pact"?:?"Operator",

????"contents"?:?"Flat Map",

????"parallelism"?:?1,

????"predecessors"?: [ {

??????"id"?:?1,

??????"ship_strategy"?:?"FORWARD",

??????"side"?:?"second"

????} ]

??}, {

????"id"?:?4,

????"type"?:?"Keyed Aggregation",

????"pact"?:?"Operator",

????"contents"?:?"Keyed Aggregation",

????"parallelism"?:?1,

????"predecessors"?: [ {

??????"id"?:?2,

??????"ship_strategy"?:?"HASH",

??????"side"?:?"second"

????} ]

??}, {

????"id"?:?5,

????"type"?:?"Sink: Print to Std. Out",

????"pact"?:?"Data Sink",

????"contents"?:?"Sink: Print to Std. Out",

????"parallelism"?:?1,

????"predecessors"?: [ {

??????"id"?:?4,

??????"ship_strategy"?:?"FORWARD",

??????"side"?:?"second"

????} ]

??} ]

}

通過flink提供的可視化界面:https://flink.apache.org/visualizer/得到如下所示圖形:



從圖形中可以看到纹因,缺少ID=3的節(jié)點(StreamNode)喷屋,對應(yīng)算子keyBy(0)未生成對應(yīng)的StreamNode;因此可以看出瞭恰,不是所有的算子都會產(chǎn)生對應(yīng)的StreamNode屯曹;

源碼閱讀

從StreamExecutionEnvironment類入手來看源碼是如何生成StreamGraph的,帶著問題去看:

什么樣的算子會生成StreamNode惊畏,什么樣的算子會被忽略恶耽,被忽略的算子中的function是怎樣處理的?

圖中箭頭中的FORWARD颜启、HASH代表什么偷俭?

追蹤代碼,StreamGraph是在env.execute()之后觸發(fā)生成的

-> execute(getStreamGraph(jobName));

??->?return?getStreamGraph(jobName,?true);

????-> StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();

??????->?return?new?StreamGraphGenerator(transformations, config, checkpointCfg)

生成StreamGraphGenerator需要transformations參數(shù)缰盏,追蹤transformations賦值的過程:

-> env.fromElements

?->?return?addSource(function,?"Collection Source", typeInfo).setParallelism(1);

??->?return?new?DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);

???-> DataStream.flatMap

????->?return?transform("Flat Map", outputType,?new?StreamFlatMap<>(clean(flatMapper)));

?????-> getExecutionEnvironment().addOperator(resultTransform);?// resultTransform是flatmap對應(yīng)的Transformation涌萤,其inputTransformation也就是上游??????? transformation對應(yīng)LegacySourceTransformation

??????->?this.transformations.add(transformation);

發(fā)現(xiàn)將flatmap對應(yīng)的transformation也就是OneInputTransformation加入到變量transformations中;

繼續(xù)追蹤demo中的算子口猜,最終確定transformations中存在Flat Map →?OneInputTransformation,?Keyed Aggregation?→?OneInputTransformation(上游transformation是keyBy算子對應(yīng)的PartitionTransformation),?Print to Std. Out?→?SinkTransformation(上游transformation是OneInputTransformation);

共有三個Transformation负溪;

StreamGraphGenerator是通過遍歷Transformation中的每一個成員,最終形成一個DAG圖:

for?(Transformation<?> transformation: transformations) {

???transform(transformation);

}



...

不同的Transform類型暮的,對應(yīng)不同的方法笙以;

需要注意的是(問題1):

對于Keyed Aggregation這個OneInputTransformation,上游的PartitionTransformation不會生成對應(yīng)的StreamNode冻辩,而是生成一個VirtualPartitionNode:

private?<T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {

???Transformation<T> input = partition.getInput();

???List resultIds =?new?ArrayList<>();


???Collection<Integer> transformedIds = transform(input);

???for?(Integer transformedId: transformedIds) {

??????int?virtualId = Transformation.getNewNodeId();

??????streamGraph.addVirtualPartitionNode(

????????????transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());

??????resultIds.add(virtualId);

???}


???return?resultIds;

}



// 對于VirtualPartitionNode,在addEdge中拆祈,如果上游是該Node恨闪,會繼續(xù)向上遍歷直到獲取到非虛擬節(jié)點

}?else?if?(virtualPartitionNodes.containsKey(upStreamVertexID)) {

???int?virtualId = upStreamVertexID;

???upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;

???if?(partitioner ==?null) {

??????partitioner = virtualPartitionNodes.get(virtualId).f1;

???}

???shuffleMode = virtualPartitionNodes.get(virtualId).f2;

???addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);

}

所以對于keyby算子(對應(yīng)PartitionTransformation)不會生成對應(yīng)的StreamNode與StreamEdge,生成的StreamGraph會越過這個算子直接將兩個非虛擬的Node進(jìn)行連接放坏;具體執(zhí)行的function在生成StreamGraph的代碼中沒有體現(xiàn)咙咽。

對于問題2:

生成兩個Node之間的Edge時,會對上下游的partition數(shù)目進(jìn)行判斷淤年,上下游數(shù)目一致時钧敞,選擇ForwardPartitioner:

if?(partitioner ==?null?&& upstreamNode.getParallelism() == downstreamNode.getParallelism()) {

???partitioner =?new?ForwardPartitioner<Object>();

}?else?if?(partitioner ==?null) {

???partitioner =?new?RebalancePartitioner<Object>();

}



public?class?ForwardPartitioner?extends?StreamPartitioner<T> {

???private?static?final?long?serialVersionUID = 1L;


???@Override

???public?int?selectChannel(SerializationDelegate<StreamRecord<T>> record) {

??????return?0;

???}


???public?StreamPartitioner<T> copy() {

??????return?this;

???}


???@Override

???public?String toString() {

??????return?"FORWARD";

???}

}

而KeyedStream中,用到的是KeyGroupStreamPartitioner

```public?KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {

???this(

??????dataStream,

??????new?PartitionTransformation<>(

?????????dataStream.getTransformation(),

?????????// 初始化KeyedStream時麸粮,聲明KeyGroupStreamPartitioner

?????????new?KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),

??????keySelector,

??????keyType);

}



public?class?KeyGroupStreamPartitioner?extends?StreamPartitioner?implements?ConfigurableStreamPartitioner {

???private?static?final?long?serialVersionUID = 1L;


???private?final?KeySelector<T, K> keySelector;


???private?int?maxParallelism;


???public?KeyGroupStreamPartitioner(KeySelector keySelector,?int?maxParallelism) {

??????Preconditions.checkArgument(maxParallelism >?0,?"Number of key-groups must be > 0!");

??????this.keySelector = Preconditions.checkNotNull(keySelector);

??????this.maxParallelism = maxParallelism;

???}


???public?int?getMaxParallelism() {

??????return?maxParallelism;

???}


???@Override

???public?int?selectChannel(SerializationDelegate<StreamRecord<T>> record) {

??????K key;

??????try?{

?????????key = keySelector.getKey(record.getInstance().getValue());

??????}?catch?(Exception e) {

?????????throw?new?RuntimeException("Could not extract key from "?+ record.getInstance().getValue(), e);

??????}

??????return?KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);

???}


???@Override

???public?StreamPartitioner<T> copy() {

??????return?this;

???}


???@Override

???public?String toString() {

??????return?"HASH";

???}


???@Override

???public?void?configure(int?maxParallelism) {

??????KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);

??????this.maxParallelism = maxParallelism;

???}

}```swift

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末溉苛,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子弄诲,更是在濱河造成了極大的恐慌愚战,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異寂玲,居然都是意外死亡塔插,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進(jìn)店門拓哟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來想许,“玉大人,你說我怎么就攤上這事断序∩烊校” “怎么了?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵逢倍,是天一觀的道長捧颅。 經(jīng)常有香客問我,道長较雕,這世上最難降的妖魔是什么碉哑? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮亮蒋,結(jié)果婚禮上扣典,老公的妹妹穿的比我還像新娘。我一直安慰自己慎玖,他們只是感情好贮尖,可當(dāng)我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著趁怔,像睡著了一般湿硝。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上润努,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天关斜,我揣著相機(jī)與錄音,去河邊找鬼铺浇。 笑死痢畜,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的鳍侣。 我是一名探鬼主播丁稀,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼倚聚!你這毒婦竟也來了线衫?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤秉沼,失蹤者是張志新(化名)和其女友劉穎桶雀,沒想到半個月后矿酵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡矗积,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年全肮,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片棘捣。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡辜腺,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出乍恐,到底是詐尸還是另有隱情评疗,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布茵烈,位于F島的核電站百匆,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏呜投。R本人自食惡果不足惜加匈,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望仑荐。 院中可真熱鬧雕拼,春花似錦、人聲如沸粘招。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽洒扎。三九已至辑甜,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間逊笆,已是汗流浹背栈戳。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留难裆,地道東北人。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓镊掖,卻偏偏與公主長得像乃戈,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子亩进,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容