flink學習之八-keyby&reduce

上文學習了簡單的map铃彰、flatmap早敬、filter,在這里開始繼續(xù)看keyBy及reduce

keyBy

先看定義屁置,通過keyBy,DataStream→KeyedStream仁连。

邏輯上將流分區(qū)為不相交的分區(qū)蓝角。具有相同Keys的所有記錄都分配給同一分區(qū)。在內(nèi)部饭冬,keyBy()是使用散列分區(qū)實現(xiàn)的使鹅。指定鍵有不同的方法。

此轉換返回KeyedStream昌抠,其中包括使用被Keys化狀態(tài)所需的KeyedStream患朱。

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple(數(shù)組)
    

注意 如果出現(xiàn)以下情況,則類型不能成為關鍵

  1. 它是POJO類型但不覆蓋hashCode()方法并依賴于Object.hashCode()實現(xiàn)扰魂。
  2. 它是任何類型的數(shù)組麦乞。

看段代碼:

public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以數(shù)組的第一個元素作為key
                .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                .print();

        env.execute("execute");
    }
}

運行后,結果如下:

3> key:1,value:5
3> key:1,value:7
3> key:1,value:2
4> key:2,value:3
4> key:2,value:4

可以看到劝评,前面的 3> 和 4> 輸出 本身是個分組,而且順序是從先輸出key=1的tuple數(shù)組倦淀,再輸出key=2的數(shù)組蒋畜。

也就是說,keyby類似于sql中的group by撞叽,將數(shù)據(jù)進行了分組姻成。后面基于keyedSteam的操作,都是組內(nèi)操作愿棋。

斷點看了下keyedStream的結構:

keyedStream.png

可以看到科展,包含了keyType、keySelector糠雨,以及轉換后的PartitionTransformation才睹,也就是已經(jīng)做了分區(qū)了。后續(xù)的所有操作都是按照分區(qū)內(nèi)數(shù)據(jù)來處理的甘邀。

reduce

reduce表示將數(shù)據(jù)合并成一個新的數(shù)據(jù)琅攘,返回單個的結果值,并且 reduce 操作每處理一個元素總是創(chuàng)建一個新值松邪。而且reduce方法不能直接應用于SingleOutputStreamOperator對象坞琴,也好理解,因為這個對象是個無限的流逗抑,對無限的數(shù)據(jù)做合并剧辐,沒有任何意義哈寒亥!

所以reduce需要針對分組或者一個window(窗口)來執(zhí)行,也就是分別對應于keyBy荧关、window/timeWindow 處理后的數(shù)據(jù)溉奕,根據(jù)ReduceFunction將元素與上一個reduce后的結果合并,產(chǎn)出合并之后的結果羞酗。

在上面代碼的基礎上修改:

public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以數(shù)組的第一個元素作為key
                .reduce((ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0, t2.f1 + t1.f1)) // value做累加
                .print();

        env.execute("execute");
    }
}
3> (1,5)
3> (1,12)
3> (1,14)
4> (2,3)
4> (2,7)

可以看到腐宋,分組后,每次有一個數(shù)組進來檀轨,都會產(chǎn)生新的數(shù)據(jù)胸竞,依然是按照分組來輸出的。

如果改下reduce中的實現(xiàn):

ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0 + t2.f0, t2.f1 + t1.f1)

那么輸出就是:

2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214).
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) [FINISHED]
4> (2,3)
4> (4,7)

...

2019-01-22 12:04:56.118 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.122 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (1/4) (0fdc49eb18050efa3acec361978f3e93) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.125 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (4/4) (1607b502ab2791f2f567c61da214bd82) switched from RUNNING to FINISHED.
3> (1,5)
3> (2,12)
3> (3,14)

可以看到輸出結果参萄,一方面是是key-reduce的狀態(tài)卫枝,從RUNNING遷移到FINISHED;另一方面是按組輸出了最終的reduce值讹挎。

聚合

KeyedStream→DataStream

在被Keys化數(shù)據(jù)流上滾動聚合校赤。min和minBy之間的差異是min返回最小值,而minBy返回該字段中具有最小值的數(shù)據(jù)元(max和maxBy類似)筒溃。

---TODO 這里存疑马篮,因為返回的數(shù)據(jù)始終是數(shù)據(jù)源,難道是我寫錯了什么怜奖?SingleOutputStreamOperator<Tuple2>改成SingleOutputStreamOperator<Long> 也是一樣的結果浑测,等待后續(xù)繼續(xù)驗證。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

繼續(xù)在上面代碼的基礎上做實驗:

sum
public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        KeyedStream keyedStream =  env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以數(shù)組的第一個元素作為key
                ;

        SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.sum(0);
        sumStream.addSink(new PrintSinkFunction<>());

        env.execute("execute");
    }

對第一個元素(位置0)做sum歪玲,結果如下:

3> (1,5)
3> (2,5)
3> (3,5)
...
4> (2,3)
2019-01-22 21:27:07.401 [flink-akka.actor.default-dispatcher-3] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (f3368fedb9805b1e59f4443252a2fb2b) switched from RUNNING to FINISHED.
4> (4,3)

可以看到迁央,對第一個數(shù)據(jù)(也就是key)做了累加,然后value以第一個進來的數(shù)據(jù)為準滥崩。

如過改成keyedStream.sum(1); 也就是針對第二個元素求和岖圈,得到的結果如下:

4> (2,3)
4> (2,7)
...
3> (1,5)
3> (1,12)
2019-01-23 10:50:47.498 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (df09751c6722a5942b058a1300ae9fb3) switched from RUNNING to FINISHED.
3> (1,14)
min
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.min(1);

得到的輸出結果是:

3> (1,5)  -- 第一組 第一個數(shù)據(jù)到的結果
3> (1,5)  -- 第一組 第二個數(shù)據(jù)到的結果
4> (2,3)  -- 第二組 第一個數(shù)據(jù)到的結果
4> (2,3)  -- 第二組 第二個數(shù)據(jù)到的結果
3> (1,2)  -- 第一組 第三個數(shù)據(jù)到的結果

這里順序有點亂,不過沒問題钙皮,數(shù)據(jù)按照順序一個一個的過來蜂科,然后計算當前數(shù)據(jù)過來時有最小value的數(shù)據(jù)。

minBy
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.minBy(1);
3> (1,5)
3> (1,5)
4> (2,3)
3> (1,2)
4> (2,3)

類似的株灸,只是組間打印的順序有區(qū)別而已崇摄。

max
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.max(1);
3> (1,5)
4> (2,3)
3> (1,7)
4> (2,4)
3> (1,7)

按照順序,取最大的數(shù)據(jù)

maxBy
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.maxBy(1);

3> (1,5)
4> (2,3)
3> (1,7)
4> (2,4)
3> (1,7)

有一點要牢記慌烧,數(shù)據(jù)是一直流過來的逐抑,這些聚合方法都是在每次收到新的數(shù)據(jù)之后,重新計算/比較得出來的結果屹蚊,而不是只有一個最終結果厕氨。

PS:有人評論說哪兒來的f0进每、f1,只能這里貼個圖了...


image.png
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末命斧,一起剝皮案震驚了整個濱河市田晚,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌国葬,老刑警劉巖贤徒,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異汇四,居然都是意外死亡接奈,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門通孽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來序宦,“玉大人,你說我怎么就攤上這事背苦』グ疲” “怎么了?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵行剂,是天一觀的道長互拾。 經(jīng)常有香客問我九昧,道長领跛,這世上最難降的妖魔是什么夺衍? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任泡态,我火速辦了婚禮叠萍,結果婚禮上夭禽,老公的妹妹穿的比我還像新娘贡茅。我一直安慰自己城菊,他們只是感情好备燃,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著凌唬,像睡著了一般并齐。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上客税,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天况褪,我揣著相機與錄音,去河邊找鬼更耻。 笑死测垛,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的秧均。 我是一名探鬼主播食侮,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼号涯,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了锯七?” 一聲冷哼從身側響起链快,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎眉尸,沒想到半個月后域蜗,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡噪猾,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年霉祸,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片畏妖。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡脉执,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出戒劫,到底是詐尸還是另有隱情半夷,我是刑警寧澤,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布迅细,位于F島的核電站巫橄,受9級特大地震影響,放射性物質發(fā)生泄漏茵典。R本人自食惡果不足惜湘换,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望统阿。 院中可真熱鬧彩倚,春花似錦、人聲如沸扶平。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽结澄。三九已至哥谷,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間麻献,已是汗流浹背们妥。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留勉吻,地道東北人监婶。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像餐曼,于是被迫代替她去往敵國和親压储。 傳聞我的和親對象是個殘疾皇子鲜漩,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容