In the previous post we looked at the simple map, flatMap, and filter operators; here we continue with keyBy and reduce.
keyBy
Start with the definition: keyBy transforms a DataStream into a KeyedStream.
It logically partitions the stream into disjoint partitions: all records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.
This transformation returns a KeyedStream, which is required in order to use keyed state.
dataStream.keyBy("someKey") // key by field "someKey"
dataStream.keyBy(0) // key by the first element of a Tuple
Note: a type cannot be used as a key if:
- it is a POJO type that does not override hashCode() and relies on the Object.hashCode() implementation;
- it is an array of any type.
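To make the hashCode() requirement concrete, here is a minimal plain-Java sketch of hash partitioning. It is a simplification for illustration only (Flink actually hashes keys into key groups rather than taking a plain modulo), and the class and method names are invented:

```java
import java.util.Arrays;

public class HashPartitionSketch {

    // Map a key to one of `parallelism` partitions via its hashCode();
    // a simplified stand-in for what keyBy() does internally.
    static int partition(Object key, int parallelism) {
        // mask the sign bit so the index is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % parallelism;
    }

    public static void main(String[] args) {
        // equal keys always land in the same partition
        System.out.println(partition(1L, 4) == partition(1L, 4)); // true

        // arrays inherit the identity hashCode from Object, so two
        // content-equal arrays usually hash differently
        long[] a = {1L};
        long[] b = {1L};
        System.out.println(Arrays.equals(a, b)); // true: same contents
        System.out.println(a.hashCode() == b.hashCode()); // almost always false
    }
}
```

Because an array only has the identity hashCode() it inherits from Object, two arrays with equal contents would be routed to different partitions, which is exactly why arrays are excluded as keys.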
Let's look at some code:
public class KeyByTestJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // key by the first field of the Tuple
                .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                .print();
        env.execute("execute");
    }
}
After running, the output is:
3> key:1,value:5
3> key:1,value:7
3> key:1,value:2
4> key:2,value:3
4> key:2,value:4
Notice the 3> and 4> prefixes: print() prefixes each line with the index of the parallel subtask that produced it, so each prefix corresponds to one key group; here all key=1 tuples are printed first, followed by the key=2 tuples.
In other words, keyBy is similar to GROUP BY in SQL: it groups the data, and all subsequent operations on the KeyedStream are performed within a group.
Setting a breakpoint to inspect the structure of the KeyedStream:
You can see it contains the keyType, the keySelector, and the resulting PartitionTransformation; in other words, the partitioning has already been set up, and all subsequent operations process data within their own partition.
reduce
reduce merges data into a new value: it returns a single result, and every element processed produces a new value. Also, reduce cannot be applied directly to a SingleOutputStreamOperator, which makes sense: that object is an unbounded stream, and merging unbounded data into one value is meaningless.
So reduce has to run against a grouping or a window, i.e. against data produced by keyBy or window/timeWindow. Following the ReduceFunction, each element is merged with the previous reduce result, and the merged result is emitted.
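The rolling, per-key behavior described above can be simulated without Flink. The following sketch is my own illustration (the class name and the use of long[] pairs in place of Tuple2 are assumptions), fed with the same (key, value) records as the examples in this post:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingReduceSketch {

    // Rolling reduce per key: each incoming {key, value} pair is merged
    // with the previous result for that key and emitted immediately.
    static List<long[]> rollingSum(List<long[]> input) {
        Map<Long, long[]> lastByKey = new HashMap<>();
        List<long[]> emitted = new ArrayList<>();
        for (long[] record : input) {
            long[] merged = lastByKey.merge(record[0], record,
                    (prev, cur) -> new long[]{cur[0], prev[1] + cur[1]});
            emitted.add(merged); // one output per input, not one final result
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<long[]> input = Arrays.asList(
                new long[]{2, 3}, new long[]{1, 5}, new long[]{1, 7},
                new long[]{2, 4}, new long[]{1, 2});
        for (long[] r : rollingSum(input)) {
            System.out.println("(" + r[0] + "," + r[1] + ")");
        }
        // prints (2,3) (1,5) (1,12) (2,7) (1,14): running sums per key
    }
}
```

Note that one result is emitted per input record; the merge function only ever sees the previous partial result for the same key.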
Modifying the code above:
public class KeyByTestJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // key by the first field of the Tuple
                .reduce((ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0, t2.f1 + t1.f1)) // accumulate the values
                .print();
        env.execute("execute");
    }
}
3> (1,5)
3> (1,12)
3> (1,14)
4> (2,3)
4> (2,7)
As you can see, after grouping, every incoming tuple produces a new result, still emitted per group.
If we change the reduce implementation to:
(ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0 + t2.f0, t2.f1 + t1.f1)
then the output becomes:
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO org.apache.flink.runtime.taskmanager.Task - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214).
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) [FINISHED]
4> (2,3)
4> (4,7)
...
2019-01-22 12:04:56.118 [flink-akka.actor.default-dispatcher-4] INFO o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.122 [flink-akka.actor.default-dispatcher-4] INFO o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (1/4) (0fdc49eb18050efa3acec361978f3e93) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.125 [flink-akka.actor.default-dispatcher-4] INFO o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (4/4) (1607b502ab2791f2f567c61da214bd82) switched from RUNNING to FINISHED.
3> (1,5)
3> (2,12)
3> (3,14)
From the output you can see, on the one hand, the keyed-reduce tasks switching from RUNNING to FINISHED; on the other hand, the final reduce results emitted per group.
Aggregations
KeyedStream→DataStream
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in that field (max and maxBy behave likewise).
--- TODO: this part is questionable, because the returned data always seems to be the source element itself. Did I write something wrong? Changing SingleOutputStreamOperator&lt;Tuple2&gt; to SingleOutputStreamOperator&lt;Long&gt; gives the same result. To be verified later.
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
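The min/minBy distinction only shows up when records within a key group differ in fields other than the aggregated one. Here is a plain-Java sketch of the two merge rules (class and method names invented; this mimics the documented semantics, not Flink's internals):

```java
import java.util.Arrays;

public class MinVsMinBySketch {

    // min(1)-style merge: keep the running record's other fields,
    // only take the smaller value of field 1
    static long[] min1(long[] prev, long[] cur) {
        return new long[]{prev[0], Math.min(prev[1], cur[1])};
    }

    // minBy(1)-style merge: return whichever whole record
    // holds the smaller field 1
    static long[] minBy1(long[] prev, long[] cur) {
        return cur[1] < prev[1] ? cur : prev;
    }

    public static void main(String[] args) {
        long[] first = {1, 5};
        long[] next = {3, 2}; // different field 0, smaller field 1
        System.out.println(Arrays.toString(min1(first, next)));   // [1, 2]
        System.out.println(Arrays.toString(minBy1(first, next))); // [3, 2]
    }
}
```

In this post's sample data, tuples within one key group differ only in field 1, so min(1) and minBy(1) produce identical output; that may be why the TODO above observes no difference.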
Continuing the experiments on the code above:
sum
public class KeyByTestJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KeyedStream keyedStream = env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0); // key by the first field of the Tuple

        SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.sum(0);
        sumStream.addSink(new PrintSinkFunction<>());
        env.execute("execute");
    }
}
Summing the first element (position 0) gives:
3> (1,5)
3> (2,5)
3> (3,5)
...
4> (2,3)
2019-01-22 21:27:07.401 [flink-akka.actor.default-dispatcher-3] INFO o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (f3368fedb9805b1e59f4443252a2fb2b) switched from RUNNING to FINISHED.
4> (4,3)
可以看到迁央,對第一個數(shù)據(jù)(也就是key)做了累加,然后value以第一個進來的數(shù)據(jù)為準滥崩。
If we change it to keyedStream.sum(1), i.e. sum the second element, the result is:
4> (2,3)
4> (2,7)
...
3> (1,5)
3> (1,12)
2019-01-23 10:50:47.498 [flink-akka.actor.default-dispatcher-5] INFO o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (df09751c6722a5942b058a1300ae9fb3) switched from RUNNING to FINISHED.
3> (1,14)
min
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.min(1);
The output is:
3> (1,5) -- group 1, result after the 1st record
3> (1,5) -- group 1, result after the 2nd record
4> (2,3) -- group 2, result after the 1st record
4> (2,3) -- group 2, result after the 2nd record
3> (1,2) -- group 1, result after the 3rd record
The ordering looks a bit jumbled, but that's fine: records arrive one at a time, and on each arrival the record with the smallest value seen so far is computed.
minBy
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.minBy(1);
3> (1,5)
3> (1,5)
4> (2,3)
3> (1,2)
4> (2,3)
Much the same; only the interleaving of the two groups' output differs.
max
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.max(1);
3> (1,5)
4> (2,3)
3> (1,7)
4> (2,4)
3> (1,7)
In arrival order, the largest value seen so far is emitted.
maxBy
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.maxBy(1);
3> (1,5)
4> (2,3)
3> (1,7)
4> (2,4)
3> (1,7)
One thing to keep in mind: the data keeps flowing in, and these aggregations recompute/compare on every newly received record, emitting an updated result each time; there is no single final result.
PS: someone in the comments asked where f0 and f1 come from; they are simply the public fields of Flink's Tuple2 class. I can only post a screenshot here...