Apache Flink 學(xué)習(xí)筆記(一)

最近在項(xiàng)目中需要用到Flink里初,關(guān)于Flink的基本介紹就不啰嗦了锅睛,官方文檔傳送門 疚膊。

由于是第一次接觸义辕,我花了一些時(shí)間整理了幾個(gè)小demo(java)當(dāng)作筆記。對(duì)Flink很多地方的理解有些片面甚至錯(cuò)誤的寓盗,路過(guò)的朋友權(quán)當(dāng)參考灌砖,不能保證說(shuō)得都對(duì)。

之前接觸過(guò)Spark的都知道傀蚌,數(shù)據(jù)處理是在RDD中進(jìn)行的(無(wú)論是批處理還是流處理)基显。Flink則不同,批處理用DataSet善炫,流處理用DataStream撩幽,而且批處理和流處理的api也是不一樣的销部。

先來(lái)看一下第一個(gè)demo 經(jīng)典的 word count

我筆記中的例子都是基于 JDK1.8 ,F(xiàn)link 1.6 編寫的

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * 從本地文件讀取字符串酱虎,按空格分割單詞,統(tǒng)計(jì)每個(gè)分詞出現(xiàn)的次數(shù)并輸出
 */
public class Demo1 {
    public static void main(String[] args) {
        //獲取執(zhí)行環(huán)境 ExecutionEnvironment (批處理用這個(gè)對(duì)象)
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        //加載數(shù)據(jù)源到 DataSet
        DataSet<String> text = env.readTextFile("test.txt");
        DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        //s 即從文本中讀取到一行字符串,按空格分割后得到數(shù)組tokens
                        String[] tokens = s.toLowerCase().split("\\s+");
                        for (String token : tokens) {
                            if (token.length() > 0) {
                                //初始化每一個(gè)單詞杰捂,保存為元祖對(duì)象
                                collector.collect(new Tuple2<String, Integer>(token, 1));
                            }
                        }
                    }
                })
                        .groupBy(0) //0表示Tuple2<String, Integer> 中的第一個(gè)元素嫁佳,即分割后的單詞
                        .aggregate(Aggregations.SUM, 1); //同理蒿往,1表示Tuple2<String, Integer> 中的第二個(gè)元素瓤漏,即出現(xiàn)次數(shù)

        try {
            //從DataSet 中獲得集合蔬充,并遍歷
            List<Tuple2<String,Integer>> list = counts.collect();
            for (Tuple2<String,Integer> tuple2:list){
                System.out.println(tuple2.f0 + ":" + tuple2.f1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

其中娃惯,groupBy(0) 表示按照DataSet中保存的元祖的第一個(gè)字段分組趾浅,aggregate 是聚合函數(shù)皿哨,Aggregations.SUM 指定了求和证膨,1 表示對(duì)元祖的第二個(gè)字段進(jìn)行求和計(jì)算央勒。

//test.txt 
hello world
flink demo
this is a flink demo file
//控制臺(tái)輸出
demo:2
is:1
this:1
a:1
file:1
world:1
hello:1
flink:2

可以看到,Flink程序已經(jīng)成功工作了井濒。但是有一個(gè)問(wèn)題瑞你,DataSet中的對(duì)象使用元祖Tuple來(lái)保存的者甲,如果字段比較多虏缸,肯定不如pojo 更加方便刀疙,所以第二個(gè)demo 我用pojo來(lái)改造一下谦秧。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * 用pojo 改造 demo1
 */
public class Demo2 {
    public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.readTextFile("test.txt");
        //用 WordWithCount 保存單詞和次數(shù)信息
        DataSet<WordWithCount> counts =
                text.flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
                        String[] tokens = s.toLowerCase().split("\\s+");
                        for (String token : tokens) {
                            if (token.length() > 0) {
                                collector.collect(new WordWithCount(token, 1));
                            }
                        }
                    }
                })
                        .groupBy("word")//直接指定字段名稱
                        .reduce(new ReduceFunction<WordWithCount>() {
                            @Override
                            public WordWithCount reduce(WordWithCount wc, WordWithCount t1) throws Exception {
                                  return new WordWithCount(wc.word, wc.count + t1.count);
                            }
                        });
        try {
            List<WordWithCount> list = counts.collect();
            for (WordWithCount wc: list) {
                System.out.println(wc.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // pojo
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

運(yùn)行結(jié)果和demo1完全一致缘挑。但是你可能會(huì)注意到语淘,demo1中的aggregate聚合函數(shù)被替換成了reduce姑蓝,這是因?yàn)?code>aggregate函數(shù)只接受int來(lái)表示filed吕粗。同時(shí)颅筋,.groupBy(0) 也相應(yīng)改成用.groupBy("word")直接指定字段占贫。

請(qǐng)注意靶剑,如果你的pojo demo 運(yùn)行失敗桩引,你可能需要做以下檢查工作:
1坑匠、pojo 有沒(méi)有聲明為public厘灼,如果是內(nèi)部類必須是static
2舰讹、有沒(méi)有為pojo創(chuàng)建一個(gè)無(wú)參的構(gòu)造函數(shù)
3月匣、有沒(méi)有聲明pojo的字段為public锄开,或者生成publicgetset方法
4癣诱、必須使用Flink 支持的數(shù)據(jù)類型

如果你有提供publicget,set 方法享潜,比如:

public String getWord() {
    return word;
}

public void setWord(String word) {
    this.word = word;
}

那么疾就,.groupBy("word") 還可以用.groupBy(WordWithCount::getWord)替換

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子鼠冕,更是在濱河造成了極大的恐慌,老刑警劉巖博脑,帶你破解...
    沈念sama閱讀 216,651評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異沈善,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)罩润,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門应媚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)消玄,“玉大人,你說(shuō)我怎么就攤上這事携龟。” “怎么了仅乓?”我有些...
    開(kāi)封第一講書人閱讀 162,931評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵裕偿,是天一觀的道長(zhǎng)劲腿。 經(jīng)常有香客問(wèn)我,道長(zhǎng)鸟妙,這世上最難降的妖魔是什么焦人? 我笑而不...
    開(kāi)封第一講書人閱讀 58,218評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮重父,結(jié)果婚禮上花椭,老公的妹妹穿的比我還像新娘。我一直安慰自己房午,他們只是感情好矿辽,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,234評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著郭厌,像睡著了一般袋倔。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上折柠,一...
    開(kāi)封第一講書人閱讀 51,198評(píng)論 1 299
  • 那天宾娜,我揣著相機(jī)與錄音,去河邊找鬼扇售。 笑死前塔,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的缘眶。 我是一名探鬼主播嘱根,決...
    沈念sama閱讀 40,084評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼巷懈!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起慌洪,我...
    開(kāi)封第一講書人閱讀 38,926評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤顶燕,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后冈爹,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體涌攻,經(jīng)...
    沈念sama閱讀 45,341評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,563評(píng)論 2 333
  • 正文 我和宋清朗相戀三年频伤,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了恳谎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,731評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖因痛,靈堂內(nèi)的尸體忽然破棺而出婚苹,到底是詐尸還是另有隱情,我是刑警寧澤鸵膏,帶...
    沈念sama閱讀 35,430評(píng)論 5 343
  • 正文 年R本政府宣布膊升,位于F島的核電站,受9級(jí)特大地震影響谭企,放射性物質(zhì)發(fā)生泄漏廓译。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,036評(píng)論 3 326
  • 文/蒙蒙 一债查、第九天 我趴在偏房一處隱蔽的房頂上張望非区。 院中可真熱鬧,春花似錦盹廷、人聲如沸院仿。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,676評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)歹垫。三九已至,卻和暖如春颠放,著一層夾襖步出監(jiān)牢的瞬間排惨,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,829評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工碰凶, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留暮芭,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,743評(píng)論 2 368
  • 正文 我出身青樓欲低,卻偏偏與公主長(zhǎng)得像辕宏,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子砾莱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,629評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容