mac電腦端安裝flink并運(yùn)行example

本文介紹如何在mac電腦端安裝flink、運(yùn)行flink自帶exmaple团滥。

唯一的前置條件為電腦端安裝Java 8.x 托酸。
mac電腦端安裝flink命令:

brew install apache-flink

查看flink安裝位置,啟動(dòng)flink

brew info apache-flink
/usr/local/Cellar/apache-flink/1.8.1/libexec/bin

cd /usr/local/Cellar/apache-flink/1.8.1/libexec/bin;./start-cluster.sh
//輸出:
//Starting cluster.
//Starting standalonesession daemon on host MacBook-Pro.local.
//Starting taskexecutor daemon on host MacBook-Pro.local.

查看flink控制臺(tái)
http://localhost:8081/#/overview

創(chuàng)建一個(gè)flink的maven項(xiàng)目安皱,這里使用flink-quickstart-scala模版生成項(xiàng)目,同時(shí)可以使用scala與java代碼編寫flink應(yīng)用

 mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.8.1 -DgroupId=com.galaxy.flink -DartifactId=galaxyFlink -Dversion=1.0-SNAPSHOT -Dpackage=com.galaxy.flink -DinteractiveMode=false

從github的flink項(xiàng)目獲得代碼爷贫,類名:org.apache.flink.streaming.examples.socket.SocketWindowWordCount

package com.galaxy.flink.examples.socket;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket.
 * The easiest way to try this out is to open a text server (at port 12345)
 * using the <i>netcat</i> tool via
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows or Mac
 * </pre>
 * and run this example with the hostname and the port as arguments.
 */
@SuppressWarnings("serial")
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                    "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })

                .keyBy("word")
                .timeWindow(Time.seconds(5))

                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

添加README.md文件與.gitignore文件认然,并將本地項(xiàng)目提交到github,方便預(yù)研代碼托管漫萄。


image.png

使用 netcat 啟動(dòng)一個(gè)本地server:[注意:在mac端卷员,要使用p參數(shù)]

nc -l -p 9000
//隨機(jī)敲入字符
a d
w e

運(yùn)行example:
有兩種方式啟動(dòng)代碼:

  • 直接在IDEA中啟動(dòng)代碼類,任務(wù)將在本地內(nèi)嵌的Flink環(huán)境中運(yùn)行
//從控制臺(tái)日志中可以看到
15:01:38,218 INFO  org.apache.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
15:01:38,659 INFO  org.apache.flink.runtime.minicluster.MiniCluster              - Starting Flink Mini Cluster

控制臺(tái)中會(huì)打印出統(tǒng)計(jì)信息腾务。

  • 將代碼打成jar包子刮,提交到本地flink集群環(huán)境中運(yùn)行
//提交到本地flink集群命令
/usr/local/Cellar/apache-flink/1.8.1/libexec/bin/flink run -c com.galaxy.flink.examples.socket.SocketWindowWordCount /Users/baozhiwang/local_dir/codes/galaxyFlink/target/galaxyFlink-1.0-SNAPSHOT.jar --hostname localhost --port 9000

查看日志:

tailf /usr/local/Cellar/apache-flink/1.8.1/libexec/log/flink-baozhiwang-taskexecutor-0-baozhideMacBook-Pro.local.out
/**
a : 1
d : 1
w : 1
e : 1
*/
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市窑睁,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌葵孤,老刑警劉巖担钮,帶你破解...
    沈念sama閱讀 211,423評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異尤仍,居然都是意外死亡箫津,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,147評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來苏遥,“玉大人饼拍,你說我怎么就攤上這事√锾浚” “怎么了师抄?”我有些...
    開封第一講書人閱讀 157,019評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長教硫。 經(jīng)常有香客問我叨吮,道長,這世上最難降的妖魔是什么瞬矩? 我笑而不...
    開封第一講書人閱讀 56,443評(píng)論 1 283
  • 正文 為了忘掉前任茶鉴,我火速辦了婚禮,結(jié)果婚禮上景用,老公的妹妹穿的比我還像新娘涵叮。我一直安慰自己,他們只是感情好伞插,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,535評(píng)論 6 385
  • 文/花漫 我一把揭開白布割粮。 她就那樣靜靜地躺著,像睡著了一般蜂怎。 火紅的嫁衣襯著肌膚如雪穆刻。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,798評(píng)論 1 290
  • 那天杠步,我揣著相機(jī)與錄音氢伟,去河邊找鬼。 笑死幽歼,一個(gè)胖子當(dāng)著我的面吹牛朵锣,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播甸私,決...
    沈念sama閱讀 38,941評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼诚些,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了皇型?” 一聲冷哼從身側(cè)響起诬烹,我...
    開封第一講書人閱讀 37,704評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎弃鸦,沒想到半個(gè)月后绞吁,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,152評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡唬格,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,494評(píng)論 2 327
  • 正文 我和宋清朗相戀三年家破,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了颜说。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,629評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡汰聋,死狀恐怖门粪,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情烹困,我是刑警寧澤玄妈,帶...
    沈念sama閱讀 34,295評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站韭邓,受9級(jí)特大地震影響措近,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜女淑,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,901評(píng)論 3 313
  • 文/蒙蒙 一瞭郑、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧鸭你,春花似錦屈张、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至愉老,卻和暖如春场绿,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背嫉入。 一陣腳步聲響...
    開封第一講書人閱讀 31,978評(píng)論 1 266
  • 我被黑心中介騙來泰國打工焰盗, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人咒林。 一個(gè)月前我還...
    沈念sama閱讀 46,333評(píng)論 2 360
  • 正文 我出身青樓熬拒,卻偏偏與公主長得像,于是被迫代替她去往敵國和親垫竞。 傳聞我的和親對(duì)象是個(gè)殘疾皇子澎粟,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,499評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容