最近在項(xiàng)目中需要用到Flink
里初,關(guān)于Flink
的基本介紹就不啰嗦了锅睛,官方文檔傳送門 疚膊。
由于是第一次接觸义辕,我花了一些時(shí)間整理了幾個(gè)小demo(java)
當(dāng)作筆記。對(duì)Flink很多地方的理解有些片面甚至錯(cuò)誤的寓盗,路過(guò)的朋友權(quán)當(dāng)參考灌砖,不能保證說(shuō)得都對(duì)。
之前接觸過(guò)Spark
的都知道傀蚌,數(shù)據(jù)處理是在RDD
中進(jìn)行的(無(wú)論是批處理還是流處理)基显。Flink
則不同,批處理用DataSet
善炫,流處理用DataStream
撩幽,而且批處理和流處理的api
也是不一樣的销部。
先來(lái)看一下第一個(gè)demo
經(jīng)典的 word count
我筆記中的例子都是基于 JDK1.8 ,F(xiàn)link 1.6 編寫的
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.List;
/**
* 從本地文件讀取字符串酱虎,按空格分割單詞,統(tǒng)計(jì)每個(gè)分詞出現(xiàn)的次數(shù)并輸出
*/
public class Demo1 {
public static void main(String[] args) {
//獲取執(zhí)行環(huán)境 ExecutionEnvironment (批處理用這個(gè)對(duì)象)
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//加載數(shù)據(jù)源到 DataSet
DataSet<String> text = env.readTextFile("test.txt");
DataSet<Tuple2<String, Integer>> counts =
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
//s 即從文本中讀取到一行字符串,按空格分割后得到數(shù)組tokens
String[] tokens = s.toLowerCase().split("\\s+");
for (String token : tokens) {
if (token.length() > 0) {
//初始化每一個(gè)單詞杰捂,保存為元祖對(duì)象
collector.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
})
.groupBy(0) //0表示Tuple2<String, Integer> 中的第一個(gè)元素嫁佳,即分割后的單詞
.aggregate(Aggregations.SUM, 1); //同理蒿往,1表示Tuple2<String, Integer> 中的第二個(gè)元素瓤漏,即出現(xiàn)次數(shù)
try {
//從DataSet 中獲得集合蔬充,并遍歷
List<Tuple2<String,Integer>> list = counts.collect();
for (Tuple2<String,Integer> tuple2:list){
System.out.println(tuple2.f0 + ":" + tuple2.f1);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
其中娃惯,groupBy(0)
表示按照DataSet
中保存的元祖的第一個(gè)字段分組趾浅,aggregate
是聚合函數(shù)皿哨,Aggregations.SUM
指定了求和证膨,1 表示對(duì)元祖的第二個(gè)字段進(jìn)行求和計(jì)算央勒。
//test.txt
hello world
flink demo
this is a flink demo file
//控制臺(tái)輸出
demo:2
is:1
this:1
a:1
file:1
world:1
hello:1
flink:2
可以看到,Flink
程序已經(jīng)成功工作了井濒。但是有一個(gè)問(wèn)題瑞你,DataSet
中的對(duì)象使用元祖Tuple
來(lái)保存的者甲,如果字段比較多虏缸,肯定不如pojo
更加方便刀疙,所以第二個(gè)demo
我用pojo
來(lái)改造一下谦秧。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.List;
/**
* 用pojo 改造 demo1
*/
public class Demo2 {
public static void main(String[] args) {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("test.txt");
//用 WordWithCount 保存單詞和次數(shù)信息
DataSet<WordWithCount> counts =
text.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
String[] tokens = s.toLowerCase().split("\\s+");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new WordWithCount(token, 1));
}
}
}
})
.groupBy("word")//直接指定字段名稱
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount wc, WordWithCount t1) throws Exception {
return new WordWithCount(wc.word, wc.count + t1.count);
}
});
try {
List<WordWithCount> list = counts.collect();
for (WordWithCount wc: list) {
System.out.println(wc.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
// pojo
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
運(yùn)行結(jié)果和demo1
完全一致缘挑。但是你可能會(huì)注意到语淘,demo1
中的aggregate
聚合函數(shù)被替換成了reduce
姑蓝,這是因?yàn)?code>aggregate函數(shù)只接受int
來(lái)表示filed
吕粗。同時(shí)颅筋,.groupBy(0)
也相應(yīng)改成用.groupBy("word")
直接指定字段占贫。
請(qǐng)注意靶剑,如果你的pojo demo
運(yùn)行失敗桩引,你可能需要做以下檢查工作:
1坑匠、pojo
有沒(méi)有聲明為public
厘灼,如果是內(nèi)部類必須是static
的
2舰讹、有沒(méi)有為pojo
創(chuàng)建一個(gè)無(wú)參的構(gòu)造函數(shù)
3月匣、有沒(méi)有聲明pojo
的字段為public
锄开,或者生成public
的get
,set
方法
4癣诱、必須使用Flink 支持的數(shù)據(jù)類型
如果你有提供public
的 get
,set
方法享潜,比如:
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
那么疾就,.groupBy("word")
還可以用.groupBy(WordWithCount::getWord)
替換