1. 關(guān)于
官方文檔
https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html
學(xué)習(xí)視頻
https://www.bilibili.com/video/BV197411M7cQ?p=8
使用目的
- 智能推薦
- ETL 數(shù)據(jù)清理
- 惡意操作判斷
- 實(shí)時推薦
- 流數(shù)據(jù)分析
- 實(shí)時報表 確保數(shù)據(jù)不丟失
- 復(fù)雜事件處理 (實(shí)時判斷用戶一系列操作滿足什么條件會發(fā)生什么)
優(yōu)勢
- 高吞吐 (每秒處理量) 低延時 (內(nèi)存保存上下文狀態(tài)沒所以低延時) 高性能
- 支持事件時間(避免因?yàn)閿?shù)據(jù)延時等 操作有誤)
- 有狀態(tài)管理
- 窗口 (支持時間 條數(shù) 會話窗口) (比如 一分鐘內(nèi)的數(shù)據(jù), 100條數(shù)據(jù), 會話中的數(shù)據(jù))
- 分布式快照(checkpoint實(shí)現(xiàn)容錯) 自動開啟
- 基于JVM實(shí)現(xiàn)獨(dú)立內(nèi)存管理 (減少JVM的GC影響 / 二進(jìn)制存儲降低存儲大小)
- savepoint 保存點(diǎn) 手動開啟, (比如停機(jī)維護(hù)時使用)
2 說明
版本
- flink 1.11.1
- scala 2.11
注意
- 本文參考了上面的視頻學(xué)習(xí)鏈接,但是視頻中的版本是1.9.1 和現(xiàn)在最新的1.11.1有一些不同的地方,所以把踩坑的記錄下來.
3 測試項目1 (流計算)
參考官方文檔 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/project-configuration.html
下面分別寫上流處理和批處理的wordcount代碼
3.1 創(chuàng)建項目
- 使用idea創(chuàng)建maven項目
- 添加pom.xml中的依賴,并刷新maven下載
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.11.0</version>
</dependency>
</dependencies>
3.2 編寫流計算的wordcount
- maven項目 目錄結(jié)構(gòu)
├── pom.xml
├── src
│ ├── main
│ │ ├── resources
│ │ └── scala
│ │ └── com
│ │ └── cowkeys
│ │ └── FlinkStreamWordCount.scala
│ └── test
│ └── scala
- FlinkStreamWordCount.scala
package com.cowkeys
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object FlinkStreamWordCount {
def main(args: Array[String]): Unit = {
// 1. 初始流計算的環(huán)境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 導(dǎo)入隱式轉(zhuǎn)換
import org.apache.flink.streaming.api.scala._
// 3. 讀取數(shù)據(jù)
val stream: DataStream[String] = streamEnv.socketTextStream("localhost", 8888)
// 4. 轉(zhuǎn)換和處理數(shù)據(jù)
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.sum(1)
// 5. 打印結(jié)果
result.print("res")
// 6. 啟動流計算程序
streamEnv.execute("wordcount")
}
}
- 控制臺使用nc 啟動一個socket服務(wù)監(jiān)聽8888端口
> nc -lk 8888
- 在項目中右鍵 Run 運(yùn)行 (這里出現(xiàn)了一些報警信息,但是不用管,程序已經(jīng)運(yùn)行了)
/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java ...
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
- 在nc終端輸入測試數(shù)據(jù)兩行
> nc -lk 8888
flink apache count
flink apache
- 項目中會有下面的信息
/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java ...
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
res:10> (flink,1)
res:11> (count,1)
res:10> (apache,1)
res:10> (flink,2)
res:10> (apache,2
3.3 編寫批計算的wordcount
- 新建scala object 在同級目錄下 FlinkBatchWordCount.scala
├── flinkTest.iml
├── pom.xml
├── src
│ ├── main
│ │ ├── resources
│ │ │ └── word.txt
│ │ └── scala
│ │ └── com
│ │ └── cowkeys
│ │ ├── FlinkBatchWordCount.scala
│ │ └── FlinkStreamWordCount.scala
│ └── test
│ └── scala
- 在resource目錄新建word.txt,隨便寫入一些重復(fù)的單詞
flink apache batch
operate word analysis
hello flink
word count core
- FlinkBatchWordCount.scala 代碼
package com.cowkeys
import java.net.URL
import org.apache.flink.api.scala.ExecutionEnvironment
/*
flink 批計算
*/
object FlinkBatchWordCount {
def main(args: Array[String]): Unit = {
// 初始化flink批處理環(huán)境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
// 導(dǎo)入隱式轉(zhuǎn)換
import org.apache.flink.api.scala._
// 獲取文件路徑,讀取文件
val dataPath: URL = getClass.getResource("/word.txt")
val data: DataSet[String] = env.readTextFile(dataPath.getPath)
// 計算
data.flatMap(_.split(" "))
.map((_,1))
.groupBy(0)
.sum(1)
.print()
}
}
- 右鍵運(yùn)行, 執(zhí)行成功
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(count,1)
(batch,1)
(apache,1)
(flink,2)
(core,1)
(analysis,1)
(word,2)
(operate,1)
(hello,1)