從零開發(fā)flink-01: 編寫wordcount

1. 關(guān)于

官方文檔
https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html
學(xué)習(xí)視頻
https://www.bilibili.com/video/BV197411M7cQ?p=8

使用目的

  • 智能推薦
  • ETL 數(shù)據(jù)清理
  • 惡意操作判斷
  • 實(shí)時推薦
  • 流數(shù)據(jù)分析
  • 實(shí)時報表 確保數(shù)據(jù)不丟失
  • 復(fù)雜事件處理 (實(shí)時判斷用戶一系列操作滿足什么條件會發(fā)生什么)

優(yōu)勢

  • 高吞吐 (每秒處理量) 低延時 (內(nèi)存保存上下文狀態(tài)沒所以低延時) 高性能
  • 支持事件時間(避免因?yàn)閿?shù)據(jù)延時等 操作有誤)
  • 有狀態(tài)管理
  • 窗口 (支持時間 條數(shù) 會話窗口) (比如 一分鐘內(nèi)的數(shù)據(jù), 100條數(shù)據(jù), 會話中的數(shù)據(jù))
  • 分布式快照(checkpoint實(shí)現(xiàn)容錯) 自動開啟
  • 基于JVM實(shí)現(xiàn)獨(dú)立內(nèi)存管理 (減少JVM的GC影響 / 二進(jìn)制存儲降低存儲大小)
  • savepoint 保存點(diǎn) 手動開啟, (比如停機(jī)維護(hù)時使用)

2 說明

版本

  • flink 1.11.1
  • scala 2.11

注意

  • 本文參考了上面的視頻學(xué)習(xí)鏈接,但是視頻中的版本是1.9.1 和現(xiàn)在最新的1.11.1有一些不同的地方,所以把踩坑的記錄下來.

3 測試項目1 (流計算)

參考官方文檔 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/project-configuration.html

下面分別寫上流處理和批處理的wordcount代碼
3.1 創(chuàng)建項目
  • 使用idea創(chuàng)建maven項目
  • 添加pom.xml中的依賴,并刷新maven下載
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.11.0</version>
        </dependency>
    </dependencies>
3.2 編寫流計算的wordcount
  • maven項目 目錄結(jié)構(gòu)
├── pom.xml
├── src
│   ├── main
│   │   ├── resources
│   │   └── scala
│   │       └── com
│   │           └── cowkeys
│   │               └── FlinkStreamWordCount.scala
│   └── test
│       └── scala
  • FlinkStreamWordCount.scala
package com.cowkeys

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object FlinkStreamWordCount {
  def main(args: Array[String]): Unit = {
    // 1. 初始流計算的環(huán)境
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. 導(dǎo)入隱式轉(zhuǎn)換
    import org.apache.flink.streaming.api.scala._
    // 3. 讀取數(shù)據(jù)
    val stream: DataStream[String] = streamEnv.socketTextStream("localhost", 8888)
    // 4. 轉(zhuǎn)換和處理數(shù)據(jù)
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    // 5. 打印結(jié)果
    result.print("res")
    // 6. 啟動流計算程序
    streamEnv.execute("wordcount")
  }
}
  • 控制臺使用nc 啟動一個socket服務(wù)監(jiān)聽8888端口
> nc -lk 8888
  • 在項目中右鍵 Run 運(yùn)行 (這里出現(xiàn)了一些報警信息,但是不用管,程序已經(jīng)運(yùn)行了)
/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java ...
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  • 在nc終端輸入測試數(shù)據(jù)兩行
> nc -lk 8888
flink apache count
flink apache
  • 項目中會有下面的信息
/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java ...
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
res:10> (flink,1)
res:11> (count,1)
res:10> (apache,1)
res:10> (flink,2)
res:10> (apache,2

3.3 編寫批計算的wordcount

  • 新建scala object 在同級目錄下 FlinkBatchWordCount.scala
├── flinkTest.iml
├── pom.xml
├── src
│   ├── main
│   │   ├── resources
│   │   │   └── word.txt
│   │   └── scala
│   │       └── com
│   │           └── cowkeys
│   │               ├── FlinkBatchWordCount.scala
│   │               └── FlinkStreamWordCount.scala
│   └── test
│       └── scala
  • 在resource目錄新建word.txt,隨便寫入一些重復(fù)的單詞
flink apache batch
operate word analysis
hello flink
word count core
  • FlinkBatchWordCount.scala 代碼
package com.cowkeys
import java.net.URL
import org.apache.flink.api.scala.ExecutionEnvironment
/*
flink 批計算
*/
object FlinkBatchWordCount {
  def main(args: Array[String]): Unit = {
    // 初始化flink批處理環(huán)境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // 導(dǎo)入隱式轉(zhuǎn)換
    import org.apache.flink.api.scala._

    // 獲取文件路徑,讀取文件
    val dataPath: URL = getClass.getResource("/word.txt")
    val data: DataSet[String] = env.readTextFile(dataPath.getPath)

    // 計算
    data.flatMap(_.split(" "))
      .map((_,1))
      .groupBy(0)
      .sum(1)
      .print()
  }
}
  • 右鍵運(yùn)行, 執(zhí)行成功
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(count,1)
(batch,1)
(apache,1)
(flink,2)
(core,1)
(analysis,1)
(word,2)
(operate,1)
(hello,1)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末摧找,一起剝皮案震驚了整個濱河市儒士,隨后出現(xiàn)的幾起案子嚎杨,更是在濱河造成了極大的恐慌袒炉,老刑警劉巖疙赠,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件付材,死亡現(xiàn)場離奇詭異,居然都是意外死亡圃阳,警方通過查閱死者的電腦和手機(jī)厌衔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來捍岳,“玉大人富寿,你說我怎么就攤上這事÷嗉校” “怎么了页徐?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長银萍。 經(jīng)常有香客問我变勇,道長,這世上最難降的妖魔是什么贴唇? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任搀绣,我火速辦了婚禮,結(jié)果婚禮上戳气,老公的妹妹穿的比我還像新娘豌熄。我一直安慰自己,他們只是感情好物咳,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布锣险。 她就那樣靜靜地躺著,像睡著了一般览闰。 火紅的嫁衣襯著肌膚如雪芯肤。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天压鉴,我揣著相機(jī)與錄音崖咨,去河邊找鬼。 笑死油吭,一個胖子當(dāng)著我的面吹牛击蹲,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播婉宰,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼歌豺,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了心包?” 一聲冷哼從身側(cè)響起类咧,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后痕惋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體区宇,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年值戳,在試婚紗的時候發(fā)現(xiàn)自己被綠了议谷。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,688評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡堕虹,死狀恐怖卧晓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鲫凶,我是刑警寧澤禀崖,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站螟炫,受9級特大地震影響波附,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜昼钻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一掸屡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧然评,春花似錦仅财、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽饥努。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間欠拾,已是汗流浹背菠秒。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工恶阴, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留淌山,地道東北人。 一個月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓竟趾,卻偏偏與公主長得像憔购,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子岔帽,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評論 2 353