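Installing and Using Flink

Local Deployment

Installation

  1. Download Flink from the official website and extract it into /usr/local (it is renamed to /usr/local/flink in the next step)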

    • sudo tar -zxf flink-1.6.2-bin-hadoop27-scala_2.11.tgz -C /usr/local
      cd /usr/local
      
  2. Rename the extracted directory and set its ownership

    • sudo mv ./flink-*/ ./flink
      sudo chown -R hadoop:hadoop ./flink
      

Editing the Configuration File

  • Flink works out of the box in local mode. To change the Java runtime it uses, set env.java.home in conf/flink-conf.yaml to the absolute path of the local Java installation.
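If you are unsure which absolute path to use for env.java.home, one option is to ask the JVM itself. The following is a minimal sketch, not part of Flink: the class name PrintJavaHome is made up for illustration, and it only reports the home of whichever JVM runs it (on Java 8 and earlier this is typically the jre subdirectory of a JDK).

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: prints the home directory of the running JVM
// in the form of a flink-conf.yaml entry.
final class PrintJavaHome {

    // Returns the absolute installation path of the JVM executing this code.
    static Path javaHome() {
        return Paths.get(System.getProperty("java.home")).toAbsolutePath();
    }

    public static void main(String[] args) {
        // The printed line can be pasted into conf/flink-conf.yaml.
        System.out.println("env.java.home: " + javaHome());
    }
}
```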

Adding Environment Variables

vim ~/.bashrc
export FLINK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH

Starting Flink

start-cluster.sh
  • You can check whether the system is running by inspecting the logs in the log directory:
tail log/flink-*-jobmanager-*.log

The log output shows that Flink has started normally.

Running the Examples

Create a Flink project with Maven and add the following dependencies to pom.xml:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.2</version>
        </dependency>
    </dependencies>

Running WordCount as a Batch Job

Official Example

You can run the WordCount program directly from /usr/local/flink/examples/batch, which also contains further examples.


Run:

flink run WordCount.jar 

Code

WordCountData

Provides the raw input data.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class WordCountData {
    public static final String[] WORDS=new String[]{"To be, or not to be,--that is the question:--", "Whether \'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,", "And by opposing end them?--To die,--to sleep,--", "No more; and by a sleep to say we end", "The heartache, and the thousand natural shocks", "That flesh is heir to,--\'tis a consummation", "Devoutly to be wish\'d. To die,--to sleep;--", "To sleep! perchance to dream:--ay, there\'s the rub;", "For in that sleep of death what dreams may come,", "When we have shuffled off this mortal coil,", "Must give us pause: there\'s the respect", "That makes calamity of so long life;", "For who would bear the whips and scorns of time,", "The oppressor\'s wrong, the proud man\'s contumely,", "The pangs of despis\'d love, the law\'s delay,", "The insolence of office, and the spurns", "That patient merit of the unworthy takes,", "When he himself might his quietus make", "With a bare bodkin? who would these fardels bear,", "To grunt and sweat under a weary life,", "But that the dread of something after death,--", "The undiscover\'d country, from whose bourn", "No traveller returns,--puzzles the will,", "And makes us rather bear those ills we have", "Than fly to others that we know not of?", "Thus conscience does make cowards of us all;", "And thus the native hue of resolution", "Is sicklied o\'er with the pale cast of thought;", "And enterprises of great pith and moment,", "With this regard, their currents turn awry,", "And lose the name of action.--Soft you now!", "The fair Ophelia!--Nymph, in thy orisons", "Be all my sins remember\'d."};
    public WordCountData() {
    }
    public static DataSet<String> getDefaultTextLineDataset(ExecutionEnvironment env){
        return env.fromElements(WORDS);
    }
}

WordCountTokenizer

Splits each sentence into words.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class WordCountTokenizer implements FlatMapFunction<String, Tuple2<String,Integer>>{

    public WordCountTokenizer(){}


    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] tokens = value.toLowerCase().split("\\W+");
        int len = tokens.length;

        for(int i = 0; i<len;i++){
            String tmp = tokens[i];
            if(tmp.length()>0){
                out.collect(new Tuple2<String, Integer>(tmp,Integer.valueOf(1)));
            }
        }
    }
}
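
To see what the tokenizer does without starting Flink, the same splitting logic can be tried in plain Java. This is only an illustrative sketch: TokenizeSketch and tokenize are made-up names, and a List stands in for Flink's Collector. It also shows why the length check is needed: a line that starts with punctuation yields an empty first token.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for WordCountTokenizer.flatMap (no Flink dependency).
final class TokenizeSketch {

    // Lowercases the line, splits on runs of non-word characters,
    // and drops the empty token produced by leading punctuation.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // prints [to, be, or, not, to, be, that, is, the, question]
        System.out.println(tokenize("To be, or not to be,--that is the question:--"));
        // prints [soft, you, now]
        System.out.println(tokenize("--Soft you now!"));
    }
}
```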

WordCount

The main program.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;


public class WordCount {

    public WordCount(){}

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Make the command-line parameters visible in the web UI
        env.getConfig().setGlobalJobParameters(params);
        DataSet<String> text;
        // If no input path is given, fall back to the data provided by WordCountData
        if(params.has("input")){
            text = env.readTextFile(params.get("input"));
        }else{
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataset(env);
        }

        // Split lines into (word, 1) pairs, group by the word (field 0), and sum the counts (field 1)
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new WordCountTokenizer()).groupBy(0).sum(1);
        // If no output path is given, print the result to the console
        if(params.has("output")){
            counts.writeAsCsv(params.get("output"),"\n", " ");
            env.execute();
        }else{
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

    }
}

First package the project as a JAR. When submitting it, use -c to specify the class that contains the main function:

flink run -c WordCount WordCount.jar
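
To make the groupBy(0).sum(1) step in the job above concrete, here is a rough single-machine equivalent in plain Java. It is a sketch of the semantics only, not of how Flink executes the job: GroupSumSketch and countWords are made-up names, and a TreeMap is used merely so the printed order is predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of "group by word, then sum the 1s".
final class GroupSumSketch {

    // Each word contributes a count of 1; equal words are merged by addition.
    static Map<String, Integer> countWords(List<String> words) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {be=2, not=1, or=1, to=2}
        System.out.println(countWords(Arrays.asList("to", "be", "or", "not", "to", "be")));
    }
}
```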

Running WordCount as a Streaming Job

Official Example

You can run the WordCount program directly from /usr/local/flink/examples/streaming, which also contains further examples.


Code

SocketWindowWordCount

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
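
The call timeWindow(Time.seconds(5), Time.seconds(1)) in the job above defines a 5-second window that slides every second, so each incoming word is counted in five overlapping windows. The sketch below illustrates that assignment in plain Java. It is a simplified model, not Flink's code: SlidingWindowSketch and windowStarts are made-up names, and it assumes windows aligned to multiples of the slide with no offset.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of sliding-window assignment (all times in milliseconds).
final class SlidingWindowSketch {

    // Returns the start times of every window [start, start + sizeMs)
    // that contains the given timestamp, newest window first.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element arriving at t = 7.3 s with a 5 s window sliding every 1 s:
        // prints [7000, 6000, 5000, 4000, 3000]
        System.out.println(windowStarts(7300L, 5000L, 1000L));
    }
}
```

This is why the same word usually shows up several times in the printed results: one line per window it falls into.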

First package the project as a JAR, then start netcat:

nc -l 9000

The terminal running netcat serves as the input stream for the job.
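
On a machine without netcat, a small Java program can play the same role of listening on a port and forwarding typed lines to whoever connects. The following is a minimal sketch under that assumption, not part of Flink: LineServer and serve are made-up names, the default port 9000 is chosen only to match the command above, and it serves a single client.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical netcat substitute: forwards lines from a reader to one socket client.
final class LineServer {

    // Waits for one client, then sends every line read from `in`, terminated by "\n".
    static void serve(ServerSocket server, BufferedReader in) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                out.print(line);
                out.print('\n'); // explicit "\n" to match the delimiter passed to socketTextStream
                out.flush();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9000;
        try (ServerSocket server = new ServerSocket(port);
             BufferedReader stdin = new BufferedReader(
                     new InputStreamReader(System.in, StandardCharsets.UTF_8))) {
            serve(server, stdin);
        }
    }
}
```

Like nc -l 9000, it has to be started before the job is submitted, because the job connects to the port as a client.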

Submit the JAR:

flink run -c SocketWindowWordCount WordCountSteaming.jar --port 9000

The terminal that submitted the job now keeps waiting for input from netcat.


Type some text into netcat.


You can view the results in the web UI.
