概述
這篇文章是但不僅僅是官方文檔的中文翻譯坟瓢,還有里面每一個方法對應(yīng)的Transformation和運行時對Task的影響。
Prerequisites
- 關(guān)于算子想說的有很多,都在上一篇文章里椭员,在這篇文章中侥猬,把算子理解為包含了一個函數(shù)(Flink實現(xiàn)的或自己實現(xiàn)的,比如MapFunction欧漱,F(xiàn)ilterFunction)的持續(xù)獲得輸入并且將結(jié)果輸出出去的任務(wù)就好。
- 圖中的Task表示一個節(jié)點葬燎,或者說是一個TaskManager中一個Slot執(zhí)行的任務(wù)
- 流程圖中紅色代表這個方法在生成Transformation和實際運行時對Task產(chǎn)生的影響
DataStream
-
Map
- 消費一個元素并產(chǎn)出一個元素
- 參數(shù) MapFunction
- 返回DataStream
- 例子:
DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } });
-
Transformation: 生成一個OneInputTransformation并包含StreamMap算子
-
Runtime:
-
FlatMap
- 消費一個元素并產(chǎn)生零到多個元素
- 參數(shù) FlatMapFunction
- 返回 DataStream
- 例子:
dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } });
-
Transformation: 生成一個OneInputTransformation并包含StreamFlatMap算子
-
Runtime:
-
Filter
- 根據(jù)FliterFunction返回的布爾值來判斷是否保留元素误甚,true為保留,false則丟棄
- 參數(shù) FilterFunction
- 返回DataStream
- 例子:
dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } });
-
Transformation:生成一個OneInputTransformation并包含StreamFilter算子
-
Runtime:
-
KeyBy
- 根據(jù)指定的Key將元素發(fā)送到不同的分區(qū)谱净,相同的Key會被分到一個分區(qū)(這里分區(qū)指的就是下游算子多個并行的節(jié)點的其中一個)窑邦。keyBy()是通過哈希來分區(qū)的。
- 只能使用KeyedState(Flink做備份和容錯的狀態(tài))
- 參數(shù) String壕探,tuple的索引冈钦,覆蓋了hashCode方法的POJO,不能使數(shù)組
- 返回KeyedStream
- 例子:
dataStream.keyBy("someKey") // Key by field "someKey" dataStream.keyBy(0) // Key by the first element of a Tuple
-
Transformation: KeyBy會產(chǎn)生一個PartitionTransformation李请,并且通過KeySelector創(chuàng)建一個KeyGroupStreamPartitioner瞧筛,目的是將輸出的數(shù)據(jù)分區(qū)。此外還會把KeySelector保存到KeyedStream的屬性中导盅,在下一個Transformation創(chuàng)建時時將KeySelector注入進去较幌。
-
Runtime: 生成StreamGraph時會將PartitionTransformation中的Partitioner 注入到StreamEdge當(dāng)中,此外還會在下一個StreamNode創(chuàng)建過程中注入KeySelector用于提取元素的Key白翻。之后將Partitioner注入StreamRecordWriter中用于將上一個Task的輸出元素指定到某一個ResultSubParition中乍炉,此外KeySelector也被注入到下一個Task的算子當(dāng)中。
-
WindowAll
- 將元素按照某種特性聚集在一起(如時間:滑動窗口滤馍,翻轉(zhuǎn)窗口岛琼,會話窗口,又如出現(xiàn)次數(shù):計數(shù)窗口)
- 參數(shù) WindowAssigner
- 返回 AllWindowedStream
- 例子:
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
- Transformation:返回AllWindowedStream纪蜒,不產(chǎn)生Transformation衷恭,詳情見AllWindowedStream
- Runtime:詳情見AllWindowedStream
-
Union
- 將兩個或多個datastream合并,創(chuàng)造一個新的流包含這些datastream的所有元素
- 參數(shù)DataStream(一個或多個)
- 返回UnionStream
- 例子:
dataStream.union(otherStream1, otherStream2, ...);
-
Transformation: 從所有相關(guān)的stream中獲取Transformation并注入到UnionTransformation的inputs中
- Runtime:這些Inputs會在下一個Transformation創(chuàng)建時被作為Input來穿件StreamEdge纯续,如果上下游并行度一致則會生成ForwardPartitioner随珠,不一致則是RebalancePartitioner灭袁。由于Partitioner是在處理下游Transformation生成的,所以這里沒有圖窗看。
-
Join
- 將兩個DataStream按照key和window join在一起
- 參數(shù):1. KeySelector1 2. KeySelector2 3. DataStream 4. WindowAssigner 5. JoinFunction/FlatJoinFunction
- 返回DataStream
- 例子:
dataStream.join(otherStream) .where(<key selector>).equalTo(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new JoinFunction () {...});
- Transformation:1. 調(diào)用join方法后生成JoinedStream茸歧,JoinedStream保存了兩個input 2. 調(diào)用where方法生成一個內(nèi)部類Where對象,注入KeySelector1 3. 調(diào)用equalTo生成內(nèi)部類EqualTo對象显沈,注入KeySelector2 4. 調(diào)用window升成內(nèi)部靜態(tài)類WithWindow软瞎,并且注入WindowAssigner(在該對象中還可以注入Trigger和Evictor 5. 最后調(diào)用apply方法將(Flat)JoinFunction注入并且用一個(Flat)JoinCoGroupFunction封裝起來,而在這一步會將所有注入的對象用在coGroup上拉讯。詳情見下一個Window CoGroup的解析涤浇。
- Runtime: 與Window CoGroup相同,詳情見下一個WIndow CoGroup解析
-
Window CoGroup
- 根據(jù)Key和window將兩個DataStream的元素聚集在兩個集合中魔慷,根據(jù)CoGroupFunction來處理這兩個集合只锭,并產(chǎn)出結(jié)果
- 參數(shù) 1. DataStream 2. KeySelector1 3. KeySelector2 4. WindowAssigner 5. CoGroupFunction
- 返回DataStream
- 例子:
dataStream.coGroup(otherStream) .where(0).equalTo(1) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new CoGroupFunction () {...});
-
Transformation:生成一個TaggedUnion類型和unionKeySelector,里面分別包含了兩個流的元素類型和兩個流的KeySelector院尔。將兩個流通過map分別輸出為類型是TaggedUnion的兩個流(map詳情見StreamMap)蜻展,再Union在一起(詳情見Union),再使用合并過后的流和unionKeySelector生成一個KeyedStream(詳情見KeyBy)邀摆,最后使用KeyedStream的window方法并傳入WindowAssigner生成WindowedStream纵顾,并apply CoGroupFunction來處理(詳情見WindowedStream Apply方法)栋盹∈┯猓總體來說,F(xiàn)link對這個方法做了很多內(nèi)部的轉(zhuǎn)換贞盯,最后生成了兩個StreamMapTransformation音念,一個PartitionTransformation和一個包含了WindowOperator的OneInputTransformation沪饺。
Runtime:參考每個Transformation對應(yīng)的Runtime情況
-
Connect
- 將兩個DataStream連接在一起躏敢,使得他們之間可以共享狀態(tài)
- 參數(shù) DataStream
- 返回ConnectedStreams
- 例子:
DataStream<Integer> someStream = //... DataStream<String> otherStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
- Transformation:在這一步會生成一個包含了兩個DataStream的ConnectedStreams對象,不會有Transformation產(chǎn)生整葡。詳情見后續(xù)ConnectedStreams的API詳解件余。
-
Split
- 按照一個規(guī)則將一個流的元素產(chǎn)出到兩個或多個支流(每個元素可以發(fā)送到不止一個支流)
- 參數(shù) OutputSelector
- 返回 SplitStream
- 例子:
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } });
-
Transformation:在這一步會生成一個SplitTransformation,里面包含了OutputSelector遭居。
-
Runtime: 在生成StreamGraph時找到父Transformation啼器,并將OutputSelector注入到父StreamNode中。生成JobGraph的時候在注入到對應(yīng)的JobNode中俱萍,最后在運行時封裝到OperatorChain的OutputCollector中并且注入算子端壳。
-
Iterate
- 通過將一個算子的輸出重定向到某個輸入Operator上來創(chuàng)個一個循環(huán)。非常適合用來持續(xù)更新一個模型枪蘑。
- 過程 DataStream → IterativeStream → DataStream
- 例子:
IterativeStream<Long> iteration = initialStream.iterate(); DataStream<Long> iterationBody = iteration.map (/*do something*/); DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Integer value) throws Exception { return value > 0; } }); iteration.closeWith(feedback); DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Integer value) throws Exception { return value <= 0; } });
- Iterate不展開講解
-
ExtractTimestamps
- 從元素中提取timestamp來用作事件時間(EventTime)损谦。
- 參數(shù) TimeStampExtractor
- 返回 DataStream
- 例子:
stream.assignTimestamps (new TimeStampExtractor() {...});
-
Transformation:assignTimestamps會將TimeStampExtractor注入進剛創(chuàng)建的ExtractTimestampsOperator岖免,再通過ExtractTimestampsOperator生成一個OneInputTransformation
-
Runtime:
-
Project
- 如果元素是Tuple,直接通過index提取出Tuple中的字段組成新的Tuple照捡,并產(chǎn)出結(jié)果
- 參數(shù) Tuple中的index(int颅湘, 一個或多個)
- 返回 DataStream
- 例子:
DataStream<Tuple3<Integer, Double, String>> in = // [...] DataStream<Tuple2<String, Integer>> out = in.project(2,0);
-
Transformation:生成一個OneInputTransformation并包含StreamProjection算子
-
Runtime
-
Custom partitioning
- 通過用戶定義的流分區(qū)器(Partitioner)將每個元素傳輸?shù)街付ǖ膕ubtask
- 參數(shù) Partitioner, Tuple索引/POJO屬性名/KeySelector
- 返回 DataStream
- 例子:
dataStream.partitionCustom(partitioner, "someKey"); dataStream.partitionCustom(partitioner, 0);
-
Transformation:partitionCustom類似于KeyBy栗精,不過partitioner是由自己定制并且輸出的不是KeyedStream闯参。首先會通過KeySelector和用戶實現(xiàn)的Partitioner生成一個CustomPartitionerWrapper(StreamPartitioner),再講它注入到PartitionTransformation悲立。
-
Runtime:將Partitioner注入StreamRecordWriter中用于將上一個Task的輸出元素指定到某一個ResultSubParition中
-
Random partitioning
- 將元素按照均勻分布打散到下游
- 返回 DataStream
- 例子:
dataStream.shuffle();
- Transformation: 將partitioner換成ShufflePartitioner鹿寨,其余同上
- Runtime:同上
-
Rebalancing (Round-robin partitioning)
- 通過輪詢調(diào)度(Round-robin)將元素均勻的分配到下游
- 返回 DataStream
- 例子
dataStream.rebalance();
- Transformation: 將partitioner換成RebalancePartitioner,其余同上
- Runtime:同上
-
Rescaling
- 通過輪詢調(diào)度將元素從上游的task一個子集發(fā)送到下游task的一個子集
- 返回 DataStream
- 原理:第一個task并行度為2薪夕,第二個task并行度為6释移,第三個task并行度為2。從第一個task到第二個task寥殖,Src的子集Src1 和 Map的子集Map1玩讳,2,3對應(yīng)起來嚼贡,Src1會以輪詢調(diào)度的方式分別向Map1熏纯,2,3發(fā)送記錄粤策。從第二個task到第三個task樟澜,Map的子集1,2叮盘,3對應(yīng)Sink的子集1秩贰,這三個流的元素只會發(fā)送到Sink1。
假設(shè)我們每個TaskManager有三個Slot柔吼,并且我們開了SlotSharingGroup毒费,那么通過rescale,所有的數(shù)據(jù)傳輸都在一個TaskManager內(nèi)愈魏,不需要通過網(wǎng)絡(luò)觅玻。 - 例子
dataStream.rescale();
- Transformation: 將partitioner換成RescalePartitioner,其余同上
- Runtime:同上
-
Broadcasting
- 將元素廣播到每個分區(qū)
- 返回DataStream
- 例子:
dataStream.broadcast();
KeyedStream
-
Reduce
- 根據(jù)ReduceFunction將元素與上一個reduce后的結(jié)果合并培漏,產(chǎn)出合并之后的結(jié)果溪厘。
- 參數(shù) ReduceFunction
- 返回 DataStream
- 例子:
keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } });
-
Transformation:生成一個OneInputTransformation并包含StreamGroupedReduce算子
-
Runtime:
-
Fold
- 根據(jù)FoldFunction和初始值,將元素與上一個fold過后的結(jié)果合并牌柄,產(chǎn)出合并之后的結(jié)果畸悬。
- 參數(shù) FoldFunction
- 返回 DataStream
- 例子:
DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() { @Override public String fold(String current, Integer value) { return current + "-" + value; } });
- Transformation:將StreamGroupedReduce換成StreamGroupedFold,其余同Reduce
- Runtime:將StreamGroupedReduce換成StreamGroupedFold珊佣,其余同Reduce
-
Aggregations
- Flink實現(xiàn)的一系列聚合方法蹋宦,具體作用由方法名就可以得知
- 返回 DataStream
- 例子:
keyedStream.sum(0); keyedStream.sum("key"); keyedStream.min(0); keyedStream.min("key"); keyedStream.max(0); keyedStream.max("key"); keyedStream.minBy(0); keyedStream.minBy("key"); keyedStream.maxBy(0); keyedStream.maxBy("key");
- Transformation:StreamGroupedReduce里注入了Flink內(nèi)置的Aggregation方法實現(xiàn)闺骚,同Reduce
- Transformation:同Reduce
-
Window
- 窗口將同一個key的元素按照某種特性聚集在一起(如時間:滑動窗口,翻轉(zhuǎn)窗口妆档,會話窗口僻爽,又如出現(xiàn)次數(shù):計數(shù)窗口)
- 返回WindowedStream
- 參數(shù)WindowAssigner
- 例子:
dataStream.window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
- Transformation: 生成一個WindowedStream,不產(chǎn)生Transformation贾惦,詳情見WindowedStream詳解
- Runtime:詳情見WindowedStream
-
Interval Join
- 給定一個時間間隔胸梆,將兩個流中的元素按照key來做join
- 滿足條件e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
- 參數(shù) 1. KeyedStream 2. Time: LowerBound and UpperBound 3. boolean(optional) 4. boolean(optional) 5. IntervalJoinFunction
- 返回DataStream
- 例子:
// this will join the two streams so that // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2 keyedStream.intervalJoin(otherKeyedStream) .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound .upperBoundExclusive(true) // optional .lowerBoundExclusive(true) // optional .process(new IntervalJoinFunction() {...});
WindowedStream
-
Apply
- 使用WindowFunction對window重的元素做處理(例如聚合操作)并產(chǎn)出結(jié)果
- 參數(shù) WindowFunction
- 返回 DataStream
- 例子:
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } });
-
Transformation:
-
Runtime:
-
Reduce
- 根據(jù)ReduceFunction將窗口中的元素按照key和window合并,并產(chǎn)出結(jié)果
- 參數(shù) ReduceFunction
- 返回DataStream
- 例子
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() { public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1); } });
- Transformation:基本同上须板,將ReduceFunction注入到WindowOperator中(具體注入方式要看有沒有evictor碰镜,這邊不作贅述)。
- Runtime:同上
-
Aggregations
- Flink實現(xiàn)的一系列聚合方法习瑰,具體作用由方法名就可以得知绪颖,需要注意的是他們被分別作用在按key和window分割過后的元素集合上
- 返回 DataStream
- 例子:
windowedStream.sum(0); windowedStream.sum("key"); windowedStream.min(0); windowedStream.min("key"); windowedStream.max(0); windowedStream.max("key"); windowedStream.minBy(0); windowedStream.minBy("key"); windowedStream.maxBy(0); windowedStream.maxBy("key");
- Transformation:WindowOperator里注入了Flink內(nèi)置的Aggregation方法實現(xiàn),其余同上
- Runtime:同上
AllWindowedStream
-
Apply
- 使用WindowFunction對window重的元素做處理(例如聚合操作)并產(chǎn)出結(jié)果
- 與WindowedStream的區(qū)別在于是否有key
- 參數(shù) WindowFunction
- 返回 DataStream
- 例子
// applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() { public void apply (Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } });
- Transformation:AllWindowedStream.apply()與WindowedStream.apply()基本是一致的甜奄,只是沒有KeySelector
- Runtime:通WindowedStream.apply()
ConnectedStreams
-
CoMap, CoFlatMap
- 同時對兩個流進行Map或FlatMap操作
- 參數(shù) CoMapFunction, CoFlatMapFunction
- 返回 DataStream
- 例子:
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override public Boolean map1(Integer value) { return true; } @Override public Boolean map2(String value) { return false; } }); connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override public void flatMap1(Integer value, Collector<String> out) { out.collect(value.toString()); } @Override public void flatMap2(String value, Collector<String> out) { for (String word: value.split(" ")) { out.collect(word); } } });
-
Transformation:ConnectedStream并不會產(chǎn)生Transformation柠横,只會保存兩個Input DataStream,從inputs中的DataStream獲取父Transformation课兄,并生成一個CoStream(Flat)Map算子牍氛。KeySelector依賴于父Transformation注入(如果是PartitionTransformation的話)。
-
Runtime: Task會具體負(fù)責(zé)調(diào)用processElement1方法還是processElement2方法烟阐。
SplitStream
-
Select
- 根據(jù)SplitStream中OutputSelector設(shè)定的規(guī)則獲取一個或多個DataStream
- 參數(shù) OutputNames
- 返回 DataStream
- 例子:
SplitStream<Integer> split; DataStream<Integer> even = split.select("even"); DataStream<Integer> odd = split.select("odd"); DataStream<Integer> all = split.select("even","odd");
-
Transformation:生成SelectTransformation搬俊,里面包含了OutputSelector
-
Runtime:生成StreamGraph時會將OutputNames注入到新生成的StreamEdge中,然后注入到對應(yīng)的JobEdge中蜒茄,最后用它來生成OutputCollector中的outputMap唉擂,發(fā)送消息時根據(jù)相應(yīng)的selectedName發(fā)送到相應(yīng)的下游Task