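Preface
Operators, also called operations or transformations, are the core units for writing business logic, much like the transformations on an RDD in Spark.
This article follows the Flink Operators documentation and explains each operator through examples written against the DataStream API. If you spot a mistake, please leave me a comment.
Note: an important difference between stream processing and batch processing is that stream processing is "rolling": data keeps flowing in continuously, whereas batch processing obtains all the data at once and then processes it together.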
1. Map
DataStream --> DataStream: takes one element and produces one element. map applies a transformation to each input element.
The Map operator is one-in, one-out.
Code: input: [0,1,2,3,4,5,6,7,8,9,10],
output: [100,101,102,103,104,105,106,107,108,109,110]
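import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestMap {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Emits the numbers 0..10 (inclusive).
        DataStream<Long> input = env.generateSequence(0, 10);
        // One element in, one element out: add 100 to every value.
        DataStream<Long> plusHundred = input.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("-----------" + value);
                return value + 100;
            }
        });
        plusHundred.print();
        env.execute();
    }
}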
2. FlatMap
DataStream --> DataStream: takes one element and produces zero, one, or more elements. It is mostly used for splitting.
flatMap is one-in, many-out; the most common example is word count:
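import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// The class name is illustrative; the original listing shows only the main method.
public class TestFlatMap {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // WORDS is the sample String[] listed with the TestConnect example later in this article.
        DataStream<String> input = env.fromElements(TestConnect.WORDS);
        DataStream<String> wordStream = input.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // Split each line on non-word characters and emit every non-empty token.
                String[] tokens = value.toLowerCase().split("\\W+");
                for (String word : tokens) {
                    if (word.length() > 0) {
                        out.collect(word);
                    }
                }
            }
        });
        wordStream.print();
        env.execute("w");
    }
}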
3. Filter
DataStream --> DataStream: evaluates a boolean function for each element and keeps the elements for which it returns true.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestFilter {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> input = env.generateSequence(-5, 5);
        // Keep only the strictly positive values.
        input.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value > 0;
            }
        }).print();
        //input.print();
        env.execute();
    }
}
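4. KeyBy
DataStream --> KeyedStream: logically partitions a stream into disjoint partitions, each containing the elements that share the same key. Internally this is implemented with hash partitioning. In other words, the stream is grouped by key.
Note: the following types cannot be used as a key:
1. A POJO class that does not override hashCode()
2. An array of any type
For how to specify keys, see: Keyed DataStream
KeyBy classifies elements by key, similar to GROUP BY in SQL. Once the stream is grouped, you can compute the maximum, minimum, average, sum, and so on.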
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestKeyBy {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple4<String, String, String, Integer>> input = env.fromElements(TRANSCRIPT);
        System.out.println("----------" + input.getParallelism());
        // Group by class (f0), then keep the record with the highest score (f3) per class.
        KeyedStream<Tuple4<String, String, String, Integer>, Tuple> keyedStream = input.keyBy("f0");
        keyedStream.maxBy("f3").print();
        env.execute();
    }
    // Fields: class, student name, subject, score.
    public static final Tuple4[] TRANSCRIPT = new Tuple4[]{
            Tuple4.of("class1", "Zhang San", "Chinese", 100),
            Tuple4.of("class1", "Li Si", "Chinese", 78),
            Tuple4.of("class1", "Wang Wu", "Chinese", 99),
            Tuple4.of("class2", "Zhao Liu", "Chinese", 81),
            Tuple4.of("class2", "Qian Qi", "Chinese", 59),
            Tuple4.of("class2", "Ma Er", "Chinese", 97)
    };
}
5. reduce
KeyedStream --> DataStream: a rolling reduce that combines the current element with the result of the previous reduce.
Example output: the scores in field f3 are accumulated.
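The rolling behaviour can be modelled without a Flink cluster. What follows is a minimal plain-Java sketch, not Flink code: it assumes records that carry only a grouping key and a score, keeps the last reduced value per key, and emits the updated sum after every element. The class, record type, and method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyBy + reduce; no Flink classes are involved.
class RollingReduceSketch {

    // Hypothetical record type: a grouping key plus a score.
    static final class Score {
        final String key;
        final int value;

        Score(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    // Returns one "key=runningSum" entry per input record, in arrival order.
    static List<String> rollingSum(List<Score> records) {
        Map<String, Integer> lastResult = new HashMap<>(); // last reduced value per key
        List<String> emitted = new ArrayList<>();
        for (Score record : records) {
            // Combine the previous result for this key with the current element.
            int sum = lastResult.merge(record.key, record.value, Integer::sum);
            emitted.add(record.key + "=" + sum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Score> records = Arrays.asList(
                new Score("class1", 100),
                new Score("class1", 78),
                new Score("class2", 81),
                new Score("class1", 99),
                new Score("class2", 59));
        // prints [class1=100, class1=178, class2=81, class1=277, class2=140]
        System.out.println(rollingSum(records));
    }
}
```

Note that every input record produces an output record: a rolling reduce emits intermediate results, not just one final total per key.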
6. fold
KeyedStream --> DataStream: a rolling fold that starts from an initial value and combines it with each element in turn. This operator has been removed from the newer API and is of little practical use.
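import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Requires an older Flink release that still provides KeyedStream.fold.
public class TestFold {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple4<String, String, String, Integer>> input = env.fromElements(TRANSCRIPT);
        // Per class (field 0), start from "start" and append each student's name (f1).
        DataStream<String> result = input.keyBy(0).fold("start",
                new FoldFunction<Tuple4<String, String, String, Integer>, String>() {
            @Override
            public String fold(String accumulator, Tuple4<String, String, String, Integer> value) throws Exception {
                return accumulator + "=" + value.f1;
            }
        });
        result.print();
        env.execute();
    }
    // Fields: class, student name, subject, score.
    public static final Tuple4[] TRANSCRIPT = new Tuple4[]{
            Tuple4.of("class1", "Zhang San", "Chinese", 100),
            Tuple4.of("class1", "Li Si", "Chinese", 78),
            Tuple4.of("class1", "Wang Wu", "Chinese", 99),
            Tuple4.of("class2", "Zhao Liu", "Chinese", 81),
            Tuple4.of("class2", "Qian Qi", "Chinese", 59),
            Tuple4.of("class2", "Ma Er", "Chinese", 97)
    };
}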
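7. aggregation
KeyedStream --> DataStream: rolling aggregations on a keyed stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that holds the minimum value in the given field (the same applies to max and maxBy).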
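To make the min/minBy distinction concrete, here is a small plain-Java model (not Flink code). It assumes a single key and a hypothetical record type with a name and a score: the min variant emits only the smallest score seen so far, and the minBy variant emits the whole record that holds it. Keeping the earlier record on ties is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of rolling min versus minBy for one key.
class MinVsMinBySketch {

    // Hypothetical record type: student name plus score.
    static final class Score {
        final String name;
        final int value;

        Score(String name, int value) {
            this.name = name;
            this.value = value;
        }

        @Override
        public String toString() {
            return name + ":" + value;
        }
    }

    // Models min: after each element, emit only the smallest field value so far.
    static List<Integer> rollingMin(List<Score> records) {
        List<Integer> emitted = new ArrayList<>();
        Integer min = null;
        for (Score record : records) {
            if (min == null || record.value < min) {
                min = record.value;
            }
            emitted.add(min);
        }
        return emitted;
    }

    // Models minBy: after each element, emit the whole record holding the smallest value.
    static List<Score> rollingMinBy(List<Score> records) {
        List<Score> emitted = new ArrayList<>();
        Score best = null;
        for (Score record : records) {
            if (best == null || record.value < best.value) {
                best = record;
            }
            emitted.add(best);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Score> records = Arrays.asList(
                new Score("Zhao Liu", 81),
                new Score("Qian Qi", 59),
                new Score("Ma Er", 97));
        System.out.println(rollingMin(records));   // prints [81, 59, 59]
        System.out.println(rollingMinBy(records)); // prints [Zhao Liu:81, Qian Qi:59, Qian Qi:59]
    }
}
```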
8. iterate
DataStream --> IterativeStream --> DataStream: creates a feedback loop in the flow by redirecting the output of one operator to an earlier operator. This is especially useful for algorithms that continuously update a model.
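The feedback loop can be pictured with a queue. The following plain-Java sketch is only a model of the idea, not the Flink iterate API: the loop body subtracts one from each value, values that are still positive are fed back to the start of the loop, and the rest leave it. The loop body and the exit condition are assumptions chosen for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model of a feedback loop in a stream.
class IterateSketch {

    // Returns a trace with one entry per pass through the loop body.
    static List<String> run(List<Long> input) {
        Deque<Long> pending = new ArrayDeque<>(input); // elements waiting to enter the loop body
        List<String> trace = new ArrayList<>();
        while (!pending.isEmpty()) {
            long value = pending.poll();
            long next = value - 1;                     // the loop body
            if (next > 0) {
                pending.add(next);                     // feedback edge: back to the loop input
                trace.add(value + " -> " + next + " fed back");
            } else {
                trace.add(value + " -> " + next + " emitted"); // leaves the loop
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        // prints [3 -> 2 fed back, 1 -> 0 emitted, 2 -> 1 fed back, 1 -> 0 emitted]
        System.out.println(run(Arrays.asList(3L, 1L)));
    }
}
```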
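9. aggregation on windows
WindowedStream --> DataStream: aggregates the elements of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that holds the minimum value in the given field (the same applies to max and maxBy).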
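Unlike a rolling aggregation, a window aggregation yields one result per window. The plain-Java sketch below models this with tumbling count windows; it is not Flink code, and both the window type and the choice to drop a trailing incomplete window are assumptions of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of min over tumbling count windows.
class WindowMinSketch {

    // Emits one minimum per complete window of `size` consecutive elements.
    static List<Integer> minPerCountWindow(List<Integer> values, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        List<Integer> result = new ArrayList<>();
        for (int start = 0; start + size <= values.size(); start += size) {
            int min = values.get(start);
            for (int i = start + 1; i < start + size; i++) {
                min = Math.min(min, values.get(i));
            }
            result.add(min);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> scores = Arrays.asList(100, 78, 99, 81, 59, 97, 42);
        // Windows: [100, 78, 99] and [81, 59, 97]; the trailing 42 never fills a window.
        System.out.println(minPerCountWindow(scores, 3)); // prints [78, 59]
    }
}
```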
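10. connect vs. union
DataStream, DataStream --> ConnectedStreams: connects two data streams while each keeps its own type. The two streams are processed separately but can share state (a counter, for example), which is very useful when the input of the first stream affects how the second one is processed.
DataStream* --> DataStream: unions two or more data streams of the same type into a new stream that contains all elements of all input streams.
Note: if you union a DataStream with itself, every element appears twice in the resulting DataStream.
ConnectedStreams is processed with CoMap and CoFlatMap, which are analogous to Map and FlatMap but operate on ConnectedStreams.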
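import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class TestConnect {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> somestream = env.generateSequence(0, 10);
        DataStream<String> otherstream = env.fromElements(WORDS);
        // The two inputs keep their own types: Long and String.
        ConnectedStreams<Long, String> connectedStreams = somestream.connect(otherstream);
        DataStream<String> result = connectedStreams.flatMap(new CoFlatMapFunction<Long, String, String>() {
            // Called for elements of the first stream.
            @Override
            public void flatMap1(Long value, Collector<String> out) throws Exception {
                out.collect(value.toString());
            }
            // Called for elements of the second stream.
            @Override
            public void flatMap2(String value, Collector<String> out) throws Exception {
                for (String word : value.split("\\W+")) {
                    if (word.length() > 0) {
                        out.collect(word);
                    }
                }
            }
        });
        result.print();
        env.execute();
    }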
    public static final String[] WORDS = new String[]{
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,",
            "And by opposing end them?--To die,--to sleep,--",
            "No more; and by a sleep to say we end",
            "The heartache, and the thousand natural shocks",
            "That flesh is heir to,--'tis a consummation",
            "Devoutly to be wish'd. To die,--to sleep;--",
            "To sleep! perchance to dream:--ay, there's the rub;",
            "For in that sleep of death what dreams may come,",
            "When we have shuffled off this mortal coil,",
            "Must give us pause: there's the respect",
            "That makes calamity of so long life;",
            "For who would bear the whips and scorns of time,",
            "The oppressor's wrong, the proud man's contumely,",
            "The pangs of despis'd love, the law's delay,",
            "The insolence of office, and the spurns",
            "That patient merit of the unworthy takes,",
            "When he himself might his quietus make",
            "With a bare bodkin? who would these fardels bear,",
            "To grunt and sweat under a weary life,",
            "But that the dread of something after death,--",
            "The undiscover'd country, from whose bourn",
            "No traveller returns,--puzzles the will,",
            "And makes us rather bear those ills we have",
            "Than fly to others that we know not of?",
            "Thus conscience does make cowards of us all;",
            "And thus the native hue of resolution",
            "Is sicklied o'er with the pale cast of thought;",
            "And enterprises of great pith and moment,",
            "With this regard, their currents turn awry,",
            "And lose the name of action.--Soft you now!",
            "The fair Ophelia!--Nymph, in thy orisons",
            "Be all my sins remember'd."
    };
}
11. split and select
split: DataStream --> SplitStream: splits the given DataStream into multiple named streams, returned as a SplitStream, according to a given criterion.
select: SplitStream --> DataStream: used together with split to select one or more streams from a SplitStream.
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// split/select belongs to older Flink releases; newer releases offer side outputs instead.
public class TestSplitAndSelect {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> input = env.generateSequence(0, 10);
        SplitStream<Long> splitStream = input.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                // Tag every element with the name of the output it belongs to.
                List<String> output = new ArrayList<String>();
                if (value % 2 == 0) {
                    output.add("even");
                } else {
                    output.add("odd");
                }
                return output;
            }
        });
        //splitStream.print();
        DataStream<Long> even = splitStream.select("even");
        DataStream<Long> odd = splitStream.select("odd");
        DataStream<Long> all = splitStream.select("even", "odd");
        even.print();
        //odd.print();
        //all.print();
        env.execute();
    }
}
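Depending on which stream is printed, the job outputs the even numbers (even), the odd numbers (odd), or all numbers, even and odd (all).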
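12. project
Selects a subset of fields from a Tuple; it applies only to DataStreams whose event type is a Tuple.
Note: available in the Java API only.
Use case: dropping fields that later computation does not need during ETL.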
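import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestProject {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple4<String, String, String, Integer>> input = env.fromElements(TRANSCRIPT);
        // Keep only field 1 (student name) and field 3 (score).
        DataStream<Tuple2<String, Integer>> out = input.project(1, 3);
        out.print();
        env.execute();
    }
    // Fields: class, student name, subject, score.
    public static final Tuple4[] TRANSCRIPT = new Tuple4[]{
            Tuple4.of("class1", "Zhang San", "Chinese", 100),
            Tuple4.of("class1", "Li Si", "Chinese", 78),
            Tuple4.of("class1", "Wang Wu", "Chinese", 99),
            Tuple4.of("class2", "Zhao Liu", "Chinese", 81),
            Tuple4.of("class2", "Qian Qi", "Chinese", 59),
            Tuple4.of("class2", "Ma Er", "Chinese", 97)
    };
}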
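13. MapPartition
Similar to Map, but the function processes the data of one whole partition at a time.
The result therefore depends on the parallelism, for example 2 versus 4.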
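The effect of the parallelism can be sketched in plain Java (this is a model, not the Flink API). The per-partition function here simply counts the elements it receives, so the number of results equals the number of partitions. Spreading the elements round-robin is an assumption of the sketch; Flink decides the real distribution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a per-partition function.
class MapPartitionSketch {

    // Spreads the elements round-robin over `parallelism` partitions (assumed distribution).
    static List<List<Integer>> partition(List<Integer> elements, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<Integer>> partitions = new ArrayList<List<Integer>>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<Integer>());
        }
        for (int i = 0; i < elements.size(); i++) {
            partitions.get(i % parallelism).add(elements.get(i));
        }
        return partitions;
    }

    // The function is applied once per partition and emits a single count for it.
    static List<Integer> countPerPartition(List<Integer> elements, int parallelism) {
        List<Integer> counts = new ArrayList<Integer>();
        for (List<Integer> onePartition : partition(elements, parallelism)) {
            counts.add(onePartition.size());
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Integer> elements = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        System.out.println(countPerPartition(elements, 2)); // prints [5, 5]
        System.out.println(countPerPartition(elements, 4)); // prints [3, 3, 2, 2]
    }
}
```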
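14. Distinct (deduplication)
Returns the distinct elements of the data; you can specify the field on which to deduplicate.
Deduplicating on the first field keeps one element per value of that field.
If no field is specified, the whole element is compared.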
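The two variants can be compared with a small plain-Java model (not Flink code). Rows are represented as lists of strings, and keeping the first row seen for each key is an assumption of the sketch; which duplicate survives in a real job is not specified here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of distinct on one field versus on the whole element.
class DistinctSketch {

    // Keeps the first row seen for each value of the chosen field.
    static List<List<String>> distinctByField(List<List<String>> rows, int field) {
        Set<String> seen = new HashSet<String>();
        List<List<String>> result = new ArrayList<List<String>>();
        for (List<String> row : rows) {
            if (seen.add(row.get(field))) { // add() returns false for a value already seen
                result.add(row);
            }
        }
        return result;
    }

    // Compares whole rows: two rows are duplicates only if every field matches.
    static List<List<String>> distinctWhole(List<List<String>> rows) {
        return new ArrayList<List<String>>(new LinkedHashSet<List<String>>(rows));
    }

    public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("1", "a"),
                Arrays.asList("1", "b"),
                Arrays.asList("2", "a"),
                Arrays.asList("1", "a"));
        System.out.println(distinctByField(rows, 0)); // prints [[1, a], [2, a]]
        System.out.println(distinctWhole(rows));      // prints [[1, a], [1, b], [2, a]]
    }
}
```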
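15. SortPartition (sorting within a partition)
Partitioning and grouping are two different concepts; do not confuse them.
For example, sort on the first field in ascending order and, where the first field is equal, on the second field in descending order.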
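The ordering just described (first field ascending, second field descending on ties) can be written as a comparator. This is a plain-Java sketch of the ordering applied to a single partition, not the Flink API; the Row type is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of sorting the rows of one partition.
class SortPartitionSketch {

    // Hypothetical two-field row.
    static final class Row {
        final int first;
        final String second;

        Row(int first, String second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public String toString() {
            return "(" + first + "," + second + ")";
        }
    }

    // First field ascending; ties broken by the second field descending.
    static List<Row> sortPartition(List<Row> partition) {
        List<Row> sorted = new ArrayList<Row>(partition);
        sorted.sort(Comparator.comparingInt((Row r) -> r.first)
                .thenComparing((Row r) -> r.second, Comparator.<String>reverseOrder()));
        return sorted;
    }

    public static void main(String[] args) {
        List<Row> partition = Arrays.asList(
                new Row(2, "a"), new Row(1, "b"), new Row(1, "c"), new Row(2, "d"));
        System.out.println(sortPartition(partition)); // prints [(1,c), (1,b), (2,d), (2,a)]
    }
}
```

Only rows inside the same partition are ordered relative to each other; with more than one partition there is no global order.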
16. Join (Default/Inner Join)
Scenario 1: the default join is an equi-join, that is, an inner join.
Scenario 2: format the output with a custom class, for example (user id, user name, city name).
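An inner equi-join that produces (user id, user name, city name) can be modelled in plain Java as follows. This is not Flink code: the sample users and cities are invented, and each id is assumed to occur at most once per side, which lets a map stand in for each data set.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of an inner equi-join on the user id.
class InnerJoinSketch {

    // Emits "(id,userName,cityName)" only for ids present on both sides.
    static List<String> innerJoin(Map<Integer, String> users, Map<Integer, String> cities) {
        List<String> joined = new ArrayList<String>();
        for (Map.Entry<Integer, String> user : users.entrySet()) {
            String city = cities.get(user.getKey());
            if (city != null) { // the key matched on the other side
                joined.add("(" + user.getKey() + "," + user.getValue() + "," + city + ")");
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, String> users = new LinkedHashMap<Integer, String>();
        users.put(1, "Tom");
        users.put(2, "Jack");
        users.put(3, "Rose");
        Map<Integer, String> cities = new LinkedHashMap<Integer, String>();
        cities.put(1, "Beijing");
        cities.put(2, "Shanghai");
        cities.put(4, "Guangzhou");
        // User 3 and city 4 have no partner and are dropped.
        System.out.println(innerJoin(users, cities)); // prints [(1,Tom,Beijing), (2,Jack,Shanghai)]
    }
}
```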
17. Outer Join
Scenario 1: left outer join
Scenario 2: right outer join
Scenario 3: full outer join
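The three outer-join variants differ only in which unmatched side is kept. The plain-Java sketch below models that with two flags; it is not Flink code, the sample data is invented, each id is assumed to occur at most once per side, and a missing partner is shown as null.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java model of left, right, and full outer joins on an id.
class OuterJoinSketch {

    // keepLeft / keepRight decide whether rows without a partner survive.
    static List<String> outerJoin(Map<Integer, String> left, Map<Integer, String> right,
                                  boolean keepLeft, boolean keepRight) {
        Set<Integer> keys = new TreeSet<Integer>(left.keySet());
        keys.addAll(right.keySet());
        List<String> joined = new ArrayList<String>();
        for (Integer key : keys) {
            boolean inLeft = left.containsKey(key);
            boolean inRight = right.containsKey(key);
            if ((inLeft && inRight) || (inLeft && keepLeft) || (inRight && keepRight)) {
                // Map.get returns null for the missing side.
                joined.add("(" + key + "," + left.get(key) + "," + right.get(key) + ")");
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, String> users = new LinkedHashMap<Integer, String>();
        users.put(1, "Tom");
        users.put(2, "Jack");
        users.put(3, "Rose");
        Map<Integer, String> cities = new LinkedHashMap<Integer, String>();
        cities.put(1, "Beijing");
        cities.put(2, "Shanghai");
        cities.put(4, "Guangzhou");
        // left:  [(1,Tom,Beijing), (2,Jack,Shanghai), (3,Rose,null)]
        System.out.println(outerJoin(users, cities, true, false));
        // right: [(1,Tom,Beijing), (2,Jack,Shanghai), (4,null,Guangzhou)]
        System.out.println(outerJoin(users, cities, false, true));
        // full:  [(1,Tom,Beijing), (2,Jack,Shanghai), (3,Rose,null), (4,null,Guangzhou)]
        System.out.println(outerJoin(users, cities, true, true));
    }
}
```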
18. Cross (Cartesian product)
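A Cartesian product pairs every element of one input with every element of the other. The plain-Java sketch below models that pairing; it is not Flink code, and the element types and the output format are assumptions for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a Cartesian product of two inputs.
class CrossSketch {

    // Emits one "(left,right)" pair for every combination of elements.
    static List<String> cross(List<Integer> left, List<String> right) {
        List<String> pairs = new ArrayList<String>();
        for (Integer l : left) {
            for (String r : right) {
                pairs.add("(" + l + "," + r + ")");
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // prints [(1,a), (1,b), (2,a), (2,b)]
        System.out.println(cross(Arrays.asList(1, 2), Arrays.asList("a", "b")));
    }
}
```

The result size is the product of the input sizes, so a cross quickly becomes expensive on large inputs.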
19. union
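As noted in the comparison with connect, union merges inputs of the same type, and a union of an input with itself contains every element twice. A plain-Java model of that behaviour follows; it is not Flink code, and the concatenation order is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of union: all elements of all inputs, duplicates included.
class UnionSketch {

    @SafeVarargs
    static <T> List<T> union(List<T>... inputs) {
        List<T> merged = new ArrayList<T>();
        for (List<T> input : inputs) {
            merged.addAll(input); // nothing is deduplicated
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2);
        List<Integer> b = Arrays.asList(3);
        System.out.println(union(a, b)); // prints [1, 2, 3]
        System.out.println(union(a, a)); // prints [1, 2, 1, 2]
    }
}
```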
Flink source code: https://github.com/apache/flink
Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.12/