Flink for java之三:算子串燒Operators

前言

如上圖勿她,算子术奖,也叫operation或transformation腰奋,是編寫業(yè)務(wù)邏輯的核心單元蝠引,類似于Spark中的RDD阳谍。

本文會(huì)參考Flink Operators?通過(guò)實(shí)例的方式蛀柴,以DataStream API來(lái)講解,如有不對(duì)的地方矫夯,請(qǐng)給我留言鸽疾。

注:流處理和批處理的一個(gè)重要區(qū)別是,流處理是“rolling”训貌,也就是說(shuō)數(shù)據(jù)會(huì)不斷的源源流入制肮,不像批處理,是一次性獲取所有數(shù)據(jù)旺订,然后再一起做處理弄企。

一、Map

DataStream --> DataStream:輸入一個(gè)參數(shù)產(chǎn)生一個(gè)參數(shù)区拳,map的功能是對(duì)輸入的參數(shù)進(jìn)行轉(zhuǎn)換操作拘领。

Map算子是一進(jìn)一出,如下圖所示


代碼:input:[0,1,2,3,4,5,6,7,8,9,10]樱调,

output:[100,101,102,103,104,105,106,107,108,109,110]

public class TestMap {

public static void main(String[] args)throws Exception{

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? DataStream input = env.generateSequence(0,10);

? ? ? ? DataStream plusOne = input.map(new MapFunction() {

@Override

? ? ? ? ? ? public Longmap(Long value)throws Exception {

System.out.println("-----------" + value);

? ? ? ? ? ? ? ? return value+100;

? ? ? ? ? ? }

});

? ? ? ? plusOne.print();

? ? ? ? env.execute();

? ? }

}


二约素、FlatMap

DataStream --> DataStream:輸入一個(gè)參數(shù),產(chǎn)生0笆凌、1或者多個(gè)輸出圣猎,這個(gè)多用于拆分操作。

flatMap是一進(jìn)多出乞而,最常見(jiàn)的例子就是wordcount:

public static void main(String[] args)throws Exception{

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? DataStream input = env.fromElements(WORDS);

? ? DataStream wordStream = input.flatMap(new FlatMapFunction() {

@Override

? ? ? ? public void flatMap(String value, Collector out)throws Exception {

String[] tokens = value.toLowerCase().split("\\W+");

? ? ? ? ? ? for(String word : tokens){

if(word.length()>0) {

out.collect(word);

? ? ? ? ? ? ? ? }

}

}

});

? ? wordStream.print();

? ? env.execute("w");

}


三送悔、Filter

DataStream --> DataStream:結(jié)算每個(gè)元素的布爾值,并返回為true的元素



public class TestFilter {

public static void main(String[] args)throws Exception{

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? DataStream input = env.generateSequence(-5,5);

? ? ? ? input.filter(new FilterFunction() {

@Override

? ? ? ? ? ? public boolean filter(Long value)throws Exception {

return value>0;

? ? ? ? ? ? }

}).print();

? ? ? ? //input.print();

? ? ? ? env.execute();

? ? }

}

四爪模、KeyBy

DataSteam --> DataStream:邏輯地將一個(gè)流拆分成不相交的分區(qū)欠啤,每個(gè)分區(qū)包含具有相同key的元素,在內(nèi)部以hash的形式實(shí)現(xiàn)的屋灌。以key來(lái)分組洁段。

注意:以下類型無(wú)法作為key

1. POJO類,且沒(méi)有實(shí)現(xiàn)hashCode函數(shù)

2. 任意形式的數(shù)組類型

key的創(chuàng)建參考:Keyed DataStream

KeyBy是根據(jù)key來(lái)進(jìn)行分類共郭,類似SQL中的groupBy祠丝,分類之后就可以求最大值、最小值除嘹、平均值写半、求和等

public class TestKeyBy {

? ? public static void main(String[] args) throws Exception{

? ? ? ? final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? DataStream<Tuple4<String,String,String,Integer>> input = env.fromElements(TRANSCRIPT);

? ? ? ? System.out.println("----------" + input.getParallelism());

? ? ? ? KeyedStream<Tuple4<String,String,String,Integer>,Tuple> keyedStream = input.keyBy("f0");

? ? ? ? keyedStream.maxBy("f3").print();

? ? ? ? env.execute();

? ? }

? ? public static final Tuple4[] TRANSCRIPT = new Tuple4[]{

? ? ? ? ? ? Tuple4.of("class1","張三","語(yǔ)文",100),

? ? ? ? ? ? Tuple4.of("class1","李四","語(yǔ)文",78),

? ? ? ? ? ? Tuple4.of("class1","王五","語(yǔ)文",99),

? ? ? ? ? ? Tuple4.of("class2","趙六","語(yǔ)文",81),

? ? ? ? ? ? Tuple4.of("class2","錢七","語(yǔ)文",59),

? ? ? ? ? ? Tuple4.of("class2","馬二","語(yǔ)文",97)

? ? };

}

五、reduce

KeyedStream --> DataStream:滾動(dòng)合并操作尉咕,合并當(dāng)前元素和上一次合并的元素結(jié)果污朽。

實(shí)例:

輸出:將f3列的成績(jī)疊加


六、fold

KeyedStream --> DataStream:用一個(gè)初始的一個(gè)值龙考,與其每個(gè)元素進(jìn)行滾動(dòng)合并操作蟆肆。相當(dāng)于是一次折疊操作,這個(gè)算子在新的API中已經(jīng)去除晦款,比較雞肋炎功。

public class TestFold {

public static void main(String[] args)throws Exception{

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? DataStream> input = env.fromElements(TRANSCRIPT);

? ? ? ? DataStream result = input.keyBy(0).fold("start", new FoldFunction, String>() {

@Override

? ? ? ? ? ? public Stringfold(String accumulator, Tuple4 value)throws Exception {

return accumulator +"=" + value.f1;

? ? ? ? ? ? }

});

? ? ? ? result.print();

? ? ? ? env.execute();

? ? }

public static final Tuple4[]TRANSCRIPT =new Tuple4[]{

Tuple4.of("class1","張三","語(yǔ)文",100),

? ? ? ? ? ? Tuple4.of("class1","李四","語(yǔ)文",78),

? ? ? ? ? ? Tuple4.of("class1","王五","語(yǔ)文",99),

? ? ? ? ? ? Tuple4.of("class2","趙六","語(yǔ)文",81),

? ? ? ? ? ? Tuple4.of("class2","錢七","語(yǔ)文",59),

? ? ? ? ? ? Tuple4.of("class2","馬二","語(yǔ)文",97)

};

}

輸出:


七、aggregation

KeyedStream --> DataStream:分組流數(shù)據(jù)的滾動(dòng)聚合操作:min和minBy的區(qū)別是min返回的是一個(gè)最小值缓溅,而minBy返回的是其字段中包含的最小值的元素(同樣元原理適用于max和maxBy)


八蛇损、iterate

DataStream --> IterativeStream --> DataStream:在流程中創(chuàng)建一個(gè)反饋循環(huán)哆姻,將一個(gè)操作的輸出重定向到之前的操作业栅,這對(duì)于定義持續(xù)更新模型的算法來(lái)說(shuō)很有意義的。



九涣狗、aggregation on windows

WindowedStream --> DataStream:對(duì)window的元素做聚合操作袜匿,min和minBy的區(qū)別是min返回的是最小值更啄,而minBy返回的是包含最小值字段的元素。(同樣原理適用于max和maxBy)


十居灯、connect 和union的區(qū)別

DataStream祭务,DataStream --> ConnectedStreams:連接兩個(gè)保持她們類型的數(shù)據(jù)流,各自分析怪嫌,并且雙流之間可以共享狀態(tài)(比如計(jì)數(shù))义锥,這在第一個(gè)流的輸入會(huì)影響第二個(gè)流時(shí),非常有用岩灭。


DataStream*??-->?DataStream:連接兩個(gè)及以上相同的數(shù)據(jù)流拌倍,合并多個(gè)流,新的流包含所有輸入的流噪径。

注意:如果將一個(gè)DataStream和自己做union操作柱恤,在新的DataStream中,將看到每個(gè)元素重復(fù)兩次

使用的算子是coMap熄云、coFlatMap膨更,類似于Map、FlatMap缴允,只不過(guò)作用在ConnectedStreams荚守。

public class TestConnect {

public static void main(String[] args)throws Exception{

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? DataStream somestream = env.generateSequence(0,10);

? ? ? ? DataStream otherstream = env.fromElements(WORDS);

? ? ? ? ConnectedStreams connectedStreams = somestream.connect(otherstream);

? ? ? ? DataStream result = connectedStreams.flatMap(new CoFlatMapFunction() {

@Override

? ? ? ? ? ? public void flatMap1(Long value, Collector out)throws Exception {

out.collect(value.toString());

? ? ? ? ? ? }

@Override

? ? ? ? ? ? public void flatMap2(String value, Collector out)throws Exception {

for(String word : value.split("\\W+")){

if(word.length()>0){

out.collect(word);

? ? ? ? ? ? ? ? ? ? }

}

}

});

? ? ? ? result.print();

? ? ? ? env.execute();

? ? }

public static final String[]WORDS =new String[]{

"To be, or not to be,--that is the question:--",

? ? ? ? ? ? "Whether 'tis nobler in the mind to suffer",

? ? ? ? ? ? "The slings and arrows of outrageous fortune",

? ? ? ? ? ? "Or to take arms against a sea of troubles,",

? ? ? ? ? ? "And by opposing end them?--To die,--to sleep,--",

? ? ? ? ? ? "No more; and by a sleep to say we end",

? ? ? ? ? ? "The heartache, and the thousand natural shocks",

? ? ? ? ? ? "That flesh is heir to,--'tis a consummation",

? ? ? ? ? ? "Devoutly to be wish'd. To die,--to sleep;--",

? ? ? ? ? ? "To sleep! perchance to dream:--ay, there's the rub;",

? ? ? ? ? ? "For in that sleep of death what dreams may come,",

? ? ? ? ? ? "When we have shuffled off this mortal coil,",

? ? ? ? ? ? "Must give us pause: there's the respect",

? ? ? ? ? ? "That makes calamity of so long life;",

? ? ? ? ? ? "For who would bear the whips and scorns of time,",

? ? ? ? ? ? "The oppressor's wrong, the proud man's contumely,",

? ? ? ? ? ? "The pangs of despis'd love, the law's delay,",

? ? ? ? ? ? "The insolence of office, and the spurns",

? ? ? ? ? ? "That patient merit of the unworthy takes,",

? ? ? ? ? ? "When he himself might his quietus make",

? ? ? ? ? ? "With a bare bodkin? who would these fardels bear,",

? ? ? ? ? ? "To grunt and sweat under a weary life,",

? ? ? ? ? ? "But that the dread of something after death,--",

? ? ? ? ? ? "The undiscover'd country, from whose bourn",

? ? ? ? ? ? "No traveller returns,--puzzles the will,",

? ? ? ? ? ? "And makes us rather bear those ills we have",

? ? ? ? ? ? "Than fly to others that we know not of?",

? ? ? ? ? ? "Thus conscience does make cowards of us all;",

? ? ? ? ? ? "And thus the native hue of resolution",

? ? ? ? ? ? "Is sicklied o'er with the pale cast of thought;",

? ? ? ? ? ? "And enterprises of great pith and moment,",

? ? ? ? ? ? "With this regard, their currents turn awry,",

? ? ? ? ? ? "And lose the name of action.--Soft you now!",

? ? ? ? ? ? "The fair Ophelia!--Nymph, in thy orisons",

? ? ? ? ? ? "Be all my sins remember'd."

? ? };

}

輸出:

第十一、split和select

split:DataStream --> SplitStream练般,按照指定標(biāo)準(zhǔn)將指定的DataStream拆分成多個(gè)SplitStream矗漾。

select:SplitStream?-->?DataStream,跟split搭配使用薄料,從SplitStream中選擇一個(gè)或多個(gè)流敞贡。

public class TestSplitAndSelect {

public static void main(String[] args)throws Exception{

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? DataStream input = env.generateSequence(0,10);

? ? ? ? SplitStream splitStream = input.split(new OutputSelector() {

@Override

? ? ? ? ? ? public Iterableselect(Long value) {

List output =new ArrayList();

? ? ? ? ? ? ? ? if(value %2 ==0){

output.add("even");

? ? ? ? ? ? ? ? }else {

output.add("odd");

? ? ? ? ? ? ? ? }

return output;

? ? ? ? ? ? }

});

? ? ? ? //splitStream.print();

? ? ? ? DataStream even = splitStream.select("even");

? ? ? ? DataStream odd? = splitStream.select("odd");

? ? ? ? DataStream all? = splitStream.select("even","odd");

? ? ? ? even.print();

? ? ? ? //odd.print();

????????//all.print();

? ? ? ? env.execute();

? ? }

}

打印偶數(shù):

打印奇數(shù):

打印全部(奇偶數(shù)):

第十二、project

從Tuple中選擇屬性的子集摄职,即僅限event數(shù)據(jù)類型為Tuple的DataStream

注意:只有java API

使用場(chǎng)景:ETL時(shí)刪減計(jì)算過(guò)程中不需要的字段

public class TestProject {

public static void main(String[] args)throws Exception{

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? DataStream> input = env.fromElements(TRANSCRIPT);

? ? ? ? DataStream> out = input.project(1,3);

? ? ? ? out.print();

? ? ? ? env.execute();

? ? }

public static final Tuple4[]TRANSCRIPT =new Tuple4[]{

Tuple4.of("class1","張三","語(yǔ)文",100),

? ? ? ? ? ? Tuple4.of("class1","李四","語(yǔ)文",78),

? ? ? ? ? ? Tuple4.of("class1","王五","語(yǔ)文",99),

? ? ? ? ? ? Tuple4.of("class2","趙六","語(yǔ)文",81),

? ? ? ? ? ? Tuple4.of("class2","錢七","語(yǔ)文",59),

? ? ? ? ? ? Tuple4.of("class2","馬二","語(yǔ)文",97)

};

}

輸出:

第十三誊役、MapPartition

類似Map获列,一次僅處理一個(gè)分區(qū)的數(shù)據(jù)

并行度為2:

并行度為4:


第十四、Distinct蛔垢,去重

返回?cái)?shù)據(jù)中不相同的元素击孩,可以指定去重所依據(jù)的字段

根據(jù)第一個(gè)元素去重:

不指定去重字段的話,就是全元素匹配:

第十五鹏漆、SortPartition巩梢,分區(qū)內(nèi)排序

分區(qū)和分組是兩個(gè)不同的概念,不要混淆艺玲。

下面的例子是先在第一個(gè)字段升序括蝠,如果第一個(gè)字段相同,則根據(jù)第二個(gè)字段降序

輸出:

第十六饭聚、Join(Default/Inner Join)

場(chǎng)景一:默認(rèn)是等值連接忌警,就是inner join

輸出:

場(chǎng)景二:按照自定義類,格式化輸出若治,如(用戶id慨蓝,用戶名,城市名)

輸出:

第十七端幼、Outer Join

場(chǎng)景一礼烈、left outer join


輸出:

場(chǎng)景二、right outer join

輸出:

場(chǎng)景三婆跑、full outer join

輸出:

第十八此熬、笛卡爾積Cross

第十九、union


Flink源碼:https://github.com/apache/flink

Flink官網(wǎng):https://ci.apache.org/projects/flink/flink-docs-release-1.12/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末滑进,一起剝皮案震驚了整個(gè)濱河市犀忱,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌扶关,老刑警劉巖阴汇,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異节槐,居然都是意外死亡搀庶,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門铜异,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)哥倔,“玉大人,你說(shuō)我怎么就攤上這事揍庄∨剌铮” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)沃测。 經(jīng)常有香客問(wèn)我缭黔,道長(zhǎng),這世上最難降的妖魔是什么蒂破? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任试浙,我火速辦了婚禮,結(jié)果婚禮上寞蚌,老公的妹妹穿的比我還像新娘。我一直安慰自己钠糊,他們只是感情好挟秤,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著抄伍,像睡著了一般艘刚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上截珍,一...
    開(kāi)封第一講書(shū)人閱讀 51,763評(píng)論 1 307
  • 那天攀甚,我揣著相機(jī)與錄音,去河邊找鬼岗喉。 笑死秋度,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的钱床。 我是一名探鬼主播荚斯,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼查牌!你這毒婦竟也來(lái)了事期?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤纸颜,失蹤者是張志新(化名)和其女友劉穎兽泣,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體胁孙,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡唠倦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了浊洞。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片牵敷。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖法希,靈堂內(nèi)的尸體忽然破棺而出枷餐,到底是詐尸還是另有隱情,我是刑警寧澤苫亦,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布毛肋,位于F島的核電站怨咪,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏润匙。R本人自食惡果不足惜诗眨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望孕讳。 院中可真熱鬧匠楚,春花似錦、人聲如沸厂财。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)璃饱。三九已至与斤,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間荚恶,已是汗流浹背撩穿。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留谒撼,地道東北人食寡。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像嗤栓,于是被迫代替她去往敵國(guó)和親冻河。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容