Flink 原理架構(gòu)
下圖是官網(wǎng)的一個架構(gòu)圖,有以下特點:
1:數(shù)據(jù)源有實時數(shù)據(jù)和非實時數(shù)據(jù)后专,比如數(shù)據(jù)庫褪迟、文件系統(tǒng)等
2:
用來做什么
-
處理有邊界和無邊界的數(shù)據(jù)
image.png
1:無邊界數(shù)據(jù)流(Unbounded streams)
2:有邊界數(shù)據(jù)流(bounded Streams)
部署
可以和各種流行的分布式資源管理工具整合食磕,比如: Hadoop YARN, Apache Mesos, and Kubernetes
利用內(nèi)存計算
Flink 主要使用內(nèi)存來計算尽棕。
因此延遲低,
Flink 周期性的異步的把內(nèi)存中的狀態(tài)持久化彬伦,來保證出現(xiàn)故障的時候能夠保證 exactly-onece 的效果滔悉。
Flink的一些入門操作
Flink是什么
Flink 作為新一代流式大數(shù)據(jù)處理框架,已獲得阿里单绑、美團等諸多大廠的青睞
流處理并不是一個新概念氧敢,但是要做好并不是一件容易的事情。提到流處理询张,我們最先想到的可能是金融交易孙乖、信號檢測以及地圖導(dǎo)航等領(lǐng)域的應(yīng)用。但是近年來隨著信息技術(shù)的發(fā)展份氧,除了前面提到的三個領(lǐng)域唯袄,其它方向?qū)?shù)據(jù)時效性的要求也越來越高。隨著 Hadoop 生態(tài)的崛起蜗帜,Storm恋拷、Spark Streaming、Samza厅缺、MillWheel 等一眾流處理技術(shù)開始走入大眾視野蔬顾,但是我們最熟悉的應(yīng)該還是 Storm 和 Spark Steaming宴偿。
“高吞吐”、“低延遲”和”exactly-once“是衡量一個流處理框架的重要指標诀豁。 Storm 雖然提供了低延遲的流處理窄刘,但是在高吞吐方面的表現(xiàn)并不算佳,可以說基本滿足不了日益暴漲的數(shù)據(jù)量舷胜,而且也沒辦法保證精準一次消費娩践。Spark Streaming 中通過微批次的批處理來模擬流處理,只要當批處理的批次分的足夠小烹骨,那么從宏觀上來看就是流處理翻伺,這也是 Spark Steaming 的核心思想。通過微觀批處理的方式沮焕,Spark Streaming 也實現(xiàn)了高吞吐和 exactly-once 語義吨岭,時效性也有了大幅提升,在很長一段時間里占據(jù)流處理榜首峦树。但是受限于其實現(xiàn)方式辣辫,依然存在幾秒的延遲,對于那些實時性要求較高的領(lǐng)域來說依然不夠完美空入。 在這樣的背景下络它,F(xiàn)link 應(yīng)用而生族檬。
Apache Flink 是為分布式歪赢、高性能、隨時可用以及準確的流處理應(yīng)用程序打造的開源流處理框架单料,用于對無界和有界數(shù)據(jù)流進行有狀態(tài)計算埋凯。Flink 最早起源于在 2010 ~ 2014 年,由 3 所地處柏林的大學(xué)和歐洲的一些其它大學(xué)共同進行研究的名為 Stratosphere 的項目扫尖。2014 年 4 月 Stratosphere 將其捐贈給 Apache 軟件基 金會白对, 初始成員是 Stratosphere 系統(tǒng)的核心開發(fā)人員,2014 年 12 月换怖,F(xiàn)link 一躍成為 Apache 軟件基金會的頂級項目甩恼。在 2015 年,阿里也加入到了 Flink 的開發(fā)工作中沉颂,并貢獻了至少 150 萬行代碼条摸。
Flink 一詞在德語中有著“靈巧”、“快速”的意思铸屉,它的 logo 原型也是柏林常見的一種松鼠钉蒲,以身材嬌小、靈活著稱彻坛,為該項目取這樣的名字和選定這樣的 logo 也正好符合 Flink 的特點和愿景顷啼。
注意踏枣,雖然我們說 Flink 是一個流處理框架,但是它同樣可以進行批處理钙蒙。因為在 Flink 的世界觀里茵瀑,批處理是流處理的一種特殊形式,這和 Spark 不同仪搔,在 Spark 中瘾婿,流處理是通過大批量的微批處理實現(xiàn)的。
Flink入門的一些知識點
Flink本地安裝及啟動和任務(wù)提交
下載flink烤咧,到flink官網(wǎng)下載
然后:
$ tar -xzf flink-1.13.2-bin-scala_2.11.tgz
$ cd flink-1.13.2-bin-scala_2.11
啟動flink
$ ./bin/start-cluster.sh
提交任務(wù)
$ ./bin/flink run examples/streaming/WordCount.jar
$ tail log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)
關(guān)閉集群
$ ./bin/stop-cluster.sh
Flink界面
http://localhost:8081/
搭建Flink開發(fā)環(huán)境
建立IDEA maven項目
在Java同級目錄下新建scala目錄偏陪,并設(shè)置為source folder
右鍵點擊項目,增加Scala支持
接下來建立兩個 Scala的 Object:
然后配置maven以來和build內(nèi)容:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kenian.test</groupId>
<artifactId>FlinkLearning</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.11.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 該插件用于將 Scala 代碼編譯成 class 文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
注意煮嫌,高版本的flink笛谦,需要增加下面的依賴才能夠在本地啟動
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.0</version>
</dependency>
注意,如果要連接kafka昌阿,可以加上
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.9.1</version>
</dependency>
編寫批處理代碼并啟動
package test.kenian
import org.apache.flink.api.scala._
object BatchWordCount {
def main(args: Array[String]): Unit = {
// 創(chuàng)建執(zhí)行環(huán)境
val env = ExecutionEnvironment.getExecutionEnvironment
// 從文本讀取數(shù)據(jù)
val inputPath = "D:/ubuntu_linux_base/words.txt"
val inputDS: DataSet[String] = env.readTextFile(inputPath)
// 計算邏輯
val wordCountDS: AggregateDataSet[(String, Int)] = inputDS
.flatMap(_.split(" "))
.map((_, 1))
.groupBy(0)
.sum(1)
// 打印輸出
wordCountDS.print()
}
}
直接IDEA啟動饥脑,得到下面的結(jié)果
編寫流代碼并且啟動
package test.kenian
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 創(chuàng)建執(zhí)行環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 監(jiān)控Socket數(shù)據(jù)
val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)
// 導(dǎo)入隱式轉(zhuǎn)換
import org.apache.flink.api.scala._
// 計算邏輯
val dataStream: DataStream[(String, Int)] = textDstream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
// 設(shè)置并行度
dataStream.print().setParallelism(1)
// 執(zhí)行
env.execute("Socket stream word count")
}
}
STEP1
執(zhí)行 下面命令:
nc -l -p 9999
STEP2:
啟動Steaming代碼
STEP3
在nc 命令下數(shù)據(jù)數(shù)據(jù),然后查看結(jié)果
hello world
hello flink
hello spark
hello java
輸出結(jié)果如下:
(hello,1)
(flink,1)
(hello,2)
(spark,1)
(java,1)
(hello,3)
Flink算子
基礎(chǔ)算子
Map
Filter
Flatmap
package test.kenian.operator
import org.apache.flink.api.scala._
object BaseOperator {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val data = env.fromCollection(List(1, 2, 3, 4, 5))
println("--------- map --------------")
//map 算子
val data2 = data.map( x => x * 10)
data2.print()
println("--------- filter --------------")
// filter 算子
val data3 = data.filter( x => x >= 3)
data3.print()
// flatmap 算子
println("--------- flatmap --------------")
// flatmap 最終會導(dǎo)致數(shù)據(jù)行數(shù)的改變懦冰,但是如果是map不會導(dǎo)致這兒問題
val dataStr = env.fromCollection(List("a b","c d","e f"))
val dataStr2 = dataStr.flatMap(_.split(" "))
dataStr2.print()
println("--------- map --------------")
val dataStr1 = dataStr.map(_.split(" "))
dataStr1.print()
}
}
結(jié)果如下:
--------- map --------------
10
20
30
40
50
--------- filter --------------
3
4
5
--------- flatmap --------------
a
b
c
d
e
f
--------- map --------------
[Ljava.lang.String;@5ec46cdd
[Ljava.lang.String;@2324bfe7
[Ljava.lang.String;@112d1c8e
基于Key的算子
基于Key的算子分為三個大類:
? KeyBy
? Rolling Aggregation
o sum
o min
o max
o minBy
o maxBy
? reduce
KeyBy算子
該算子和其他算子一般一起使用灶轰,比如和Sum,看下面代碼
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object KeyByOperator {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data: DataStream[String] = env.readTextFile("D:/ubuntu_linux_base/userlog.txt")
val userLogStream: DataStream[UserLog] = data.map(fun = log => {
val arr: Array[String] = log.split(" ")
UserLog(arr(0), arr(1), arr(2).toInt)
})
userLogStream
.keyBy("city")
.sum("duration")
.print()
env.execute("Key By Operator")
}
case class UserLog(name: String, city: String, duration: Int)
}
執(zhí)行結(jié)果如下:
6> UserLog(Jack,Beijing,100)
16> UserLog(Joker,Shanghai,200)
18> UserLog(William,Chengdu,600)
18> UserLog(William,Chengdu,900)
6> UserLog(Jack,Beijing,500)
16> UserLog(Joker,Shanghai,400)
原始數(shù)據(jù)如下:
Jack Beijing 100
Bob Chengdu 300
William Chengdu 600
Lily Shanghai 200
Loius Beijing 400
Joker Shanghai 200
你可能會疑惑刷钢,輸出的不應(yīng)該是有兩列笋颤,city 和 sum(duration)嗎?請注意内地,我們這里的計算是流處理伴澄,而不是離線的批處理,我們創(chuàng)建的環(huán)境是 StreamExecutionEnvironment阱缓。程序從上往下一行一行讀取文本非凌,然后按照 city 字段分組,當執(zhí)行到第一行的時候荆针,只有它自己敞嗡,所以輸出自己本身。當執(zhí)行到第二行的時候航背,city為 Chengdu 的也只有一行喉悴,所以也輸出了它自己。當程序執(zhí)行到第三行的時候沃粗,第二個 Chengdu 出現(xiàn)了粥惧,所以 sum 的結(jié)果是 900(300 + 600)。當程序執(zhí)行到第四行的時候最盅,Shanghai 第一次出現(xiàn)突雪,所以也只有它自己起惕。當程序執(zhí)行到第 5 行的時候,第二個 Beijing 出現(xiàn)了咏删,所以輸出的是 500(100 + 400)惹想。當程序執(zhí)行到第 6 行,第二個 Shanghai 出現(xiàn)了督函,所以輸出的是 400(200 + 200)嘀粱。
maxBy算子
和max的區(qū)別是,max只會展示對應(yīng)值的最大值辰狡。但是maxBy會替換整行的值锋叨。
有點類似于,max是屬于開窗宛篇,獨立出來一個值用于存儲最大值娃磺。
MaxBy 是直接找到最大的那行并保留下來。
Reduce算子
代碼如下:
userLogStream
.keyBy("city")
.reduce((x, y) => {
UserLog(y.name, y.city, x.duration + y.duration)
}).print
可以對數(shù)據(jù)做出聚合的效果叫倍。
多流轉(zhuǎn)換算子
Union 合并數(shù)據(jù)流
作用是把多個流的數(shù)據(jù)合并到一起偷卧,比如從兩個系統(tǒng)取的同一種交易信息,則可以合并到一起進行處理吆倦。
樣例代碼听诸,健康兩個端口 9999 9998 ,使用nc命令發(fā)送消息蚕泽,然后把消息展示出來晌梨。查看效果
ackage test.kenian.operator
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object UnionOperator {
def main(args: Array[String]): Unit = {
// 創(chuàng)建執(zhí)行環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 接收淘寶的訂單信息
val taobaoOrder: DataStream[String] = env.socketTextStream("localhost", 9998)
// 接收天貓的訂單信息
val tianmaoOrder: DataStream[String] = env.socketTextStream("localhost", 9999)
// 把兩個環(huán)境的信息合并
val dataStream: DataStream[String] = taobaoOrder.union(tianmaoOrder)
// 設(shè)置并行度
// 設(shè)置并行度
dataStream.print().setParallelism(1)
// 執(zhí)行
env.execute("Socket stream word count")
}
}
Split select 分離數(shù)據(jù)流
Split 和 Select是完全想法的一種做法,
Split 可以把現(xiàn)有的一個數(shù)據(jù)流分為多個赛糟,然后通過select 可以單獨的查詢分裂出來的某個數(shù)據(jù)流派任。
這里有個小DEMO砸逊,就是根據(jù)溫度把不同的人放到不同的數(shù)據(jù)流里面璧南,比如說檢查出高危新冠人群。
package test.kenian.operator
import org.apache.flink.streaming.api.scala._
object SelectOperator {
def main(args: Array[String]): Unit = {
// 創(chuàng)建執(zhí)行環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 接收天貓的訂單信息
val dataStream: DataStream[String] = env.socketTextStream("localhost", 9999)
val splitStream = dataStream.map(line => {
val arr: Array[String] = line.split(" ")
People(arr(0), arr(1).toFloat)
}).split(people => {
if (people.temperature > 36) Seq("fever") else Seq("normal")
})
val normal: DataStream[People] = splitStream.select("normal")
val fever: DataStream[People] = splitStream.select("fever")
// normal.print()
fever.print()
// 執(zhí)行
env.execute("Socket Stream Split and Select")
}
case class People(name: String, temperature: Float)
}
以上代碼师逸,使用nc發(fā)送的數(shù)據(jù)個人的溫度大于36都會被上報
Source 和 Sink
Source
Flink自帶常見source有:
? env.readTextFile()
? env.socketTextStream()
? env.fromCollection()
? env.fromElement()
? env.generateSequence()
自定義source
但是實際工作中司倚,更加常見的是使用一些第三方的工具作為source,比如kafka
Kafka其實是自定義source的一種篓像。自定義source的原理如下:
package test.kenian.source
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
object Source {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.addSource(new MySource())
data.setParallelism(1).print()
env.execute()
}
/**
* 自定義source动知,該source實現(xiàn)兩個方法
* 分別是斷開方法和搜集數(shù)據(jù)方法
*
*/
class MySource extends SourceFunction[Integer] {
var flag = true
override def run(ctx: SourceFunction.SourceContext[Integer]): Unit = {
var count = 0
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
while (flag) {
for (elem <- list) {
ctx.collect(elem) // 如果把數(shù)據(jù)導(dǎo)入到flink中,這是個橋梁员辩。如果自定義source盒粮,需要在這個地方進行控制,如何把數(shù)據(jù)放入到 sourceContext中
count += 1
if (count == 300) cancel()
}
}
}
override def cancel(): Unit = {
flag = false
}
}
}
下面代碼中自定義了一個source奠滑,該source被flink流式處理丹皱。一個會發(fā)送300個數(shù)字過去妒穴。
然后在代碼中使用了該source。通過該source接收數(shù)據(jù)摊崭。
Kafka source
啟動kafka
到kafka的安裝目錄
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.log 2>&1 &
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
在kafka上建立topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic shiyanlou
在topic中寫入數(shù)據(jù)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic shiyanlou
然后可以手寫的方式隨便寫一些
實驗是否可以通過命令讀取數(shù)據(jù)
bin/kafka-console-consumer.sh --topic shiyanlou --from-beginning --bootstrap-server localhost:9092
通過Java代碼讀取kafka
這里需要注意一個非常重要的點讼油,因為我的本地沒有配置遠程機器的別名什么的,導(dǎo)致沒法連接上他們的環(huán)境呢簸。因此需要配置kafka的這兩個參數(shù):
advertised.host.name=127.0.0.1
advertised.port=9092
這兩個參數(shù)要根據(jù)自己的實際情況重新配置矮台,我因為是本地因此配置的就是127.0.0.1
Java代碼如下:
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.kafka.common.serialization.StringDeserializer
object KafkaSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
val prop = new Properties()
prop.setProperty("bootstrap.servers", "localhost:9092")
prop.setProperty("zookeeper.connect", "localhost:2181")
prop.setProperty("key.deserializer", classOf[StringDeserializer].getName)
prop.setProperty("value.deserializer", classOf[StringDeserializer].getName)
prop.setProperty("auto.offset.reset", "latest")
// 添加 Kafka Source
val data = env.addSource(new FlinkKafkaConsumer[String]("shiyanlou", new SimpleStringSchema(), prop))
data.print()
env.execute()
}
}
Sink
常見flink自帶sink
? writeAsText
? writeAsCsv
? writeToSocket
注意,生成文件的個數(shù)和并行度分區(qū)數(shù)相關(guān)
自定義sink
代碼如下根时,類似source瘦赫,flink和sink對象的溝通通過的是 SinkFunction.Context
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object FlinkSink {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.readTextFile("D:/ubuntu_linux_base/score.txt")
data.addSink(new MySink())
env.execute()
}
class MySink extends SinkFunction[String] {
override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
val time = context.currentProcessingTime()
val waterMark = context.currentWatermark()
println(s"$value : $time : $waterMark")
}
}
}
我們可以從該對象中取出對應(yīng)的結(jié)果值,然后打印出來
Flink狀態(tài)管理
什么叫flink的狀態(tài)
有狀態(tài)的計算是流處理框架中的重要功能之一蛤迎,因為很多復(fù)雜的業(yè)務(wù)場景都會涉及到數(shù)據(jù)前后狀態(tài)耸彪。Flink 本身也是帶狀態(tài)的流處理引擎,如果你在此之前有看過 Flink 的官方文檔忘苛,應(yīng)該有注意到Stateful這個關(guān)鍵詞蝉娜。本節(jié)實驗我們就重點學(xué)習(xí) Flink 中的狀態(tài)(State)相關(guān)知識點。
狀態(tài)相關(guān)知識點
- State 分類
- Keyed State
- Operator State
- Checkpoint
- StateBackend
State的兩種類型:Keyed State 和 Operate State
Operator State:Operator State 可以作用在所有算子上扎唾,每個算子中并行的 Task 都可以共享一個狀態(tài)召川,或者說同?個算?中的多個 Task 的狀態(tài)是相同的。但是請注意胸遇,算子狀態(tài)不能由相同或不同算子的另一個實例訪問荧呐。Operator State 支持三種基本數(shù)據(jù)結(jié)構(gòu),分別是:
o ListState:存儲列表類型的狀態(tài)纸镊。
o UnionListState:存儲列表類型的狀態(tài)倍阐。和 ListState 的區(qū)別是,如果發(fā)生故障逗威,ListState 會將該算子的所有并發(fā)的狀態(tài)實例進行匯總峰搪,然后均分給新的 Task;而 UnionListState 只是將所有并發(fā)的狀態(tài)實例匯總起來凯旭,具體的劃分行為則由用戶進行定義概耻。
BroadcastState:用于廣播的算子狀態(tài)。如果一個算子有多項任務(wù)罐呼,并且它的每項任務(wù)狀態(tài)又都相同鞠柄,這種情況就可以使用廣播狀態(tài)。
? Keyed State:Keyed State 是作用在 KeyedStream 上的嫉柴。從名稱中就可以看出來厌杜,它的特點是和 Key 強相關(guān)的。在任務(wù)處理中计螺,F(xiàn)link 為每個 Key 維護一個狀態(tài)實例夯尽,而且相同 Key 所對應(yīng)的數(shù)據(jù)都會被分配到同一個任務(wù)中執(zhí)行侧馅。Keyed State 支持五種基本數(shù)據(jù)結(jié)構(gòu),分別是:
o ValueState:保存單個 Value呐萌,可以針對該 Value 進行 get/set 操作馁痴。
o ListState:保存一個列表,列表中可以存儲多個 Value肺孤÷拊危可以針對列表進行 add、get赠堵、update 操作小渊。
o MapState:保存 Key-Value 類型的值∶0龋可以針對其進行 get酬屉、put、remove 操作揍愁,還可以使用 contains 判斷某個 key 是否存在呐萨。
o ReducingState:保存一個單一值,該值是添加到狀態(tài)的所有值聚合的結(jié)果莽囤。
o AggregatingState:保存一個單一值谬擦,該值是添加到狀態(tài)的所有值聚合的結(jié)果。與 ReducingState 有些不同朽缎,聚合類型可能不同于添加到狀態(tài)的元素的類型惨远。接口和 ListState 相同,但是使用 add(IN)添加的元素本質(zhì)是通過使用指定的 AggregateFunction 進行聚合话肖。
Flink 架構(gòu)
主要進程
JobManager 作業(yè)管理器
控制一個應(yīng)用程序執(zhí)行的主進程北秽,也就是說,每個應(yīng)用程序都會被一個不同的 JobManager 所控制執(zhí)行最筒。JobManager 會先接收到要執(zhí)行的應(yīng)用程序贺氓,這個應(yīng)用程序會包括:作業(yè)圖(JobGraph)、邏輯數(shù)據(jù)流圖(logical dataflow graph)和打包了所有的類是钥、庫和其它資源的 JAR 包掠归。JobManager 會把 JobGraph 轉(zhuǎn)換成一個物理層面的數(shù)據(jù)流圖缅叠,這個圖被叫做“執(zhí)行圖”(ExecutionGraph)悄泥,包含了所有可以并發(fā)執(zhí)行的任務(wù)。JobManager 會向資源管理器(ResourceManager)請求執(zhí)行任務(wù)必要的資源肤粱,也就是任務(wù)管理器(TaskManager)上的插槽(slot)弹囚。一旦它獲取到了足夠的資源,就會將執(zhí)行圖分發(fā)到真正運行它們的 TaskManager 上领曼。而在運行過程中鸥鹉,JobManager 會負責(zé)所有需要中央?yún)f(xié)調(diào)的操作蛮穿,比如說檢查點(checkpoints)的協(xié)調(diào)。
ResourceManager 資源管理器
主要負責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot)毁渗,TaskManger 插槽是 Flink 中定義的處理資源單元践磅。Flink 為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如 YARN灸异、Mesos府适、K8s,以及 standalone 部署肺樟。當 JobManager 申請插槽資源時檐春,ResourceManager 會將有空閑插槽的 TaskManager 分配給 JobManager。如果 ResourceManager 沒有足夠的插槽來滿足 JobManager 的請求么伯,它還可以向資源提供平臺發(fā)起會話疟暖,以提供啟動 TaskManager 進程的容器。另外田柔,ResourceManager 還負責(zé)終止空閑的 TaskManager俐巴,釋放計算資源。
TaskManager 任務(wù)管理器
Flink 中的工作進程硬爆。通常在 Flink 中會有多個 TaskManager 運行窜骄,每一個 TaskManager 都包含了一定數(shù)量的插槽(slots)。插槽的數(shù)量限制了 TaskManager 能夠執(zhí)行的任務(wù)數(shù)量摆屯。啟動之后邻遏,TaskManager 會向資源管理器注冊它的插槽;收到資源管理器的指令后虐骑,TaskManager 就會將一個或者多個插槽提供給 JobManager 調(diào)用准验。JobManager 就可以向插槽分配任務(wù)(tasks)來執(zhí)行了。在執(zhí)行過程中廷没,一個 TaskManager 可以跟其它運行同一應(yīng)用程序的 TaskManager 交換數(shù)據(jù)糊饱。
Dispatcher 以及分發(fā)器
可以跨作業(yè)運行,它為應(yīng)用提交提供了 REST 接口颠黎。當一個應(yīng)用被提交執(zhí)行時另锋,分發(fā)器就會啟動并將應(yīng)用移交給一個 JobManager。由于是 REST 接口狭归,所以 Dispatcher 可以作為集群的一個 HTTP 接入點夭坪,這樣就能夠不受防火墻阻擋。Dispatcher 也會啟動一個 Web UI过椎,用來方便地展示和監(jiān)控作業(yè)執(zhí)行的信息室梅。Dispatcher 在架構(gòu)中可能并不是必需的,這取決于應(yīng)用提交運行的方式。
Slots
每?個 TaskManager(worker)是?個 JVM 進程亡鼠,它可能會在獨?的線程上執(zhí)??個或多個 subtask赏殃。為了控制?個 worker 能接收多少個 task,worker 通過 task slot 來進?控制(?個 worker ?少有?個 task slot)间涵。每個 task slot 表示 TaskManager 擁有資源的?個固定??的?集仁热。假如?個 TaskManager 有三個 slot,那么它會將其管理的內(nèi)存分成三份給各個 slot勾哩。資源 slot 化意味著?個 subtask 將不需要跟來?其 他 job 的 subtask 競爭被管理的內(nèi)存股耽,取?代之的是它將擁有?定數(shù)量的內(nèi)存儲備。需要注意的是钳幅,這? 不會涉及到 CPU 的隔離物蝙,slot ?前僅僅?來隔離 task 的受管理的內(nèi)存。
通過調(diào)整 task slot 的數(shù)量敢艰,允許?戶定義 subtask 之間如何互相隔離诬乞。如果?個 TaskManager ?個 slot,那將意味著每個 task group 運?在獨?的 JVM 中(該 JVM 可能是通過?個特定的容器啟動的)钠导,? ?個 TaskManager 多個 slot 意味著更多的 subtask 可以共享同?個 JVM震嫉。?在同?個 JVM 進程中的 task 將 共享 TCP 連接(基于多路復(fù)?)和?跳消息。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu)牡属,因此這減少了每個 task 的負載票堵。
Task Slot 是靜態(tài)的概念,是指 TaskManager 具有的并發(fā)執(zhí)?能?逮栅,可以通過參數(shù) taskmanager.numberOfTaskSlots 進?配置悴势,?并?度 parallelism 是動態(tài)概念,即 TaskManager 運? 程序時實際使?的并發(fā)能?措伐,可以通過參數(shù) parallelism.default 進?配置特纤。也就是說,假設(shè)?共有 3 個 TaskManager侥加,每?個 TaskManager 中分配 3 個 TaskSlot捧存,也就是每個 TaskManager 可以接收 3 個 task,?共 9 個 TaskSlot担败,如果我們設(shè)置 parallelism.default=1昔穴,即運?程序默認的并?度為 1,9 個 TaskSlot 只?了 1 個提前,有 8 個空閑吗货,因此,設(shè)置合適的并?度才能提?效率岖研。
窗口
Window分類
根據(jù)上游數(shù)據(jù)集的類型可以分為:
Keyed Window
Global Window
根據(jù)業(yè)務(wù)場景來分卿操,又可以分為:
Count Window
Time Window警检。
TIME WINDOW
滾動窗口(Tumbling Window)
滾動窗口是按照固定時間進行切分孙援,而且所有窗口之間的數(shù)據(jù)不會重疊害淤,使用時只需要指定一個窗口長度即可。
代碼樣例:
滾動窗口的窗口大型厥邸(window size)是固定的窥摄,而且相鄰窗口之間是連續(xù)的。現(xiàn)在有這樣的業(yè)務(wù)場:某公司要求每 10 秒統(tǒng)計一次最近 10 秒內(nèi)各個電商平臺的訂單數(shù)量并輸出到大屏幕础淤,這時候就需要用到滾動窗口了崭放,我們只需要將窗口大小設(shè)置為 10 秒就可以。我們使用 netcat 發(fā)送 Socket 數(shù)據(jù)來模擬訂單流量鸽凶。
// 監(jiān)控Socket數(shù)據(jù)
val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)
val dataStream: DataStream[(String, Int)] = textDstream
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.timeWindow(Time.seconds(10))
.sum(1)
滑動窗口(Sliding Window)
滑動窗口(Sliding Window):滑動窗口有兩個參數(shù)桌硫,分別是窗口大小和窗口滑動時間言津,它是允許不同窗口的元素重疊的(同一個元素可以出現(xiàn)在不同的窗口中)。窗口大小指定數(shù)據(jù)統(tǒng)計的時間跨度,而滑動時間指定的是相鄰兩個窗口時間的時間偏移量融击。當滑動時間小于窗口大小的時候,數(shù)據(jù)會發(fā)生重疊闹啦;當滑動窗口大于窗口大小的時候遣蚀,窗口會出現(xiàn)不連續(xù)的情況(部分元素不會納入統(tǒng)計);當滑動時間和窗口大小相等的時候姑食,滑動窗口就是滾動窗口波岛,從這個角度來看,滾動窗口是滑動窗口的一個特殊存在音半。
簡單來說则拷,會有數(shù)據(jù)的重復(fù)統(tǒng)計。不同的窗口里面曹鸠,會有交叉的數(shù)據(jù)隔躲。
滾動窗口屬于特殊滑動窗口
代碼和滾動窗口類似:多了個滑動時間
// 監(jiān)控Socket數(shù)據(jù)
val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)
val dataStream: DataStream[(String, Int)] = textDstream
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
會話窗口(Session Window)
與前滾動窗口和滑動窗口不同的是,會話窗口沒有固定的滑動時間和窗口大小物延,而是通過一個 session gap 來指定窗口間隔宣旱。如果在 session gap 規(guī)定的時間內(nèi)沒有活躍數(shù)據(jù)進入的話,則認為當前窗口結(jié)束叛薯,下一個窗口開始浑吟。session gap 可以理解為相鄰元素的最大時間差。
簡單理解下耗溜,類似Tomcat的session组力,只要在當前session下有數(shù)據(jù)過來,則session繼續(xù)保持抖拴;超過session規(guī)定等待時間燎字,則session失效腥椒;當前session失效后,如果有新的數(shù)據(jù)進來候衍,則建立新的session笼蛛。
把一個session內(nèi)的數(shù)據(jù)進行統(tǒng)計
代碼如下:
val dataStream: DataStream[(String, Int)] = textDstream
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.sum(1)
CountWindow
基于輸入數(shù)據(jù)量定義,與時間無關(guān)蛉鹿。Count Window 也可以細分為滾動窗口和滑動窗口滨砍,邏輯和 Time Window 中的滾動窗口和滑動窗口的邏輯類似,只是窗口大小和觸發(fā)條件由時間換成了相同 Key 元素的數(shù)量妖异。窗口大小是由相同 Key 元素的數(shù)量來觸發(fā)執(zhí)行惋戏,執(zhí)行時只計算元素數(shù)量達到窗口大小的 key 對應(yīng)的結(jié)果。
簡單來說他膳,key重復(fù)次數(shù)到達閾值响逢,則觸發(fā)計算。
適合業(yè)務(wù)那種規(guī)定某個消息出現(xiàn)多少次則計算或者上報一次
val dataStream: DataStream[(String, Int)] = textDstream
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
// .countWindow(3) // 一個key出現(xiàn)了三次棕孙,則會觸發(fā)一次 針對該key的 計算舔亭。且只計算這3個key
.countWindow(3,2) // 一個key出現(xiàn)了兩次,則觸發(fā)一次針對該key的計算散罕。且會計算該key最近3個key分歇。也就是說,會有重復(fù)統(tǒng)計的情況
.sum(1)
Time和WaterMark
在流式數(shù)據(jù)處理中欧漱,如何保證數(shù)據(jù)的全局有序和 Exactly Once(精準一次消費)是非常重要的职抡。雖然數(shù)據(jù)在上游產(chǎn)生的時候是唯一并且有序的,但是數(shù)據(jù)從產(chǎn)生到進入 Flink 的過程中误甚,中間可能會由于負載均衡缚甩、網(wǎng)絡(luò)傳輸、分區(qū)等等原因造成數(shù)據(jù)亂序窑邦,為了應(yīng)對這種情況擅威,所以我們引入了時間語義和 WaterMark 的概念
內(nèi)容范圍
? Time
o Event Time (事件生成時間)
o Ingestion Time (事件接入時間)
o Processing Time (事件處理時間)
? Watermark
o Watermark 的概念
o Watermark 的使用
TIME時間語義
? Event Time:指的是事件產(chǎn)生的時間。通常由事件中的某個時間戳字段來表示冈钦,比如用戶登錄日志中所攜帶的時間字段郊丛、天氣信號檢測系統(tǒng)中采集到的天氣數(shù)據(jù)中所攜帶的時間字段。
? Ingestion Time:指的是事件進入 Flink 中的時間瞧筛。
? Processing Time:指的是時間被處理時的當前系統(tǒng)時間厉熟。比如某條數(shù)據(jù)進入 Flink 之后,我們運行到某個算子時的系統(tǒng)時間较幌,就是 Processing Time揍瑟。Processing Time 是 Flink 中的默認時間屬性。
也就是說乍炉,我們窗口默認處理的時間其實是 Processing Time绢片。
但是有時候希望是 Ingestion Time滤馍。。底循。巢株。。而且從業(yè)務(wù)角度來說此叠,我們更加想使用 Event Time 纯续。随珠。灭袁。。窗看。茸歧。。显沈。软瞎。。拉讯。涤浇,可以參考下面代碼:
// 設(shè)置使用EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 設(shè)置使用IngestionTime
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
Watermark 原理
一般業(yè)務(wù)中以Event Time作為處理時間,但是魔慷,Event 產(chǎn)生的比較早只锭,到達flink的時間有可能比較遲。這個時候需要flink進行等待院尔。比如12:00:00 的event Time蜻展,F(xiàn)link等到12:00:05
WaterMark 代碼樣例
// 指定使用waterMark的方式處理處理時間,也就是指定EventTime邀摆,該時間是事件的業(yè)務(wù)日期
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val data: DataStream[UserLog] = env.socketTextStream("localhost", 9999)
.map(line => {
val arr = line.split(",")
UserLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toLong)
})
// 指定DataStream的WaterMark字段纵顾,并且以該字段作為基礎(chǔ)來處理數(shù)據(jù)
// 指定WaterMark字段以后,同時指定具體的等待時間
data.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[UserLog](Time.seconds(2)) {
override def extractTimestamp(element: UserLog): Long = {
element.time
}
}).print()
Table API 和 SQL
首先是TableAPI栋盹,其實就是一種聲明式編程的方式施逾,類似于Spark的DataFrame和DataSet。
這是SQL的底層例获。
其實還是要加一些POM的依賴汉额,當然具體的版本根據(jù)自己的情況來
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.11.0</version>
</dependency>
TableAPI
看下面代碼樣例,在Flink批處理中躏敢,使用對Table的一些簡單SELECT 操作闷愤。
//初始化Table API的上下文環(huán)境
val tableEnv = BatchTableEnvironment.create(env)
tableEnv
.connect(new FileSystem().path("D:/ubuntu_linux_base/userlog.log"))
.withFormat(new OldCsv())
.withSchema(new Schema()
.field("time", DataTypes.BIGINT())
.field("action", DataTypes.STRING())
.field("city", DataTypes.STRING())
.field("IP", DataTypes.STRING())
.field("user_id", DataTypes.BIGINT())
)
.createTemporaryTable("temp_userlog")
tableEnv.from("temp_userlog").printSchema()
val res = tableEnv.from("temp_userlog").select("city")
tableEnv.toDataSet[Row](res).print();
這里面讀取以逗號分隔的文件,且最終定義了多個列件余。
然后讀取文件的schema信息讥脐,和展示文件的某個列的數(shù)據(jù)遭居。
如果是流式處理,其實代碼差不多旬渠,看下面代碼:
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types._
object StreamTableTest {
def main(args: Array[String]): Unit = {
import org.apache.flink.streaming.api.scala._
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv)
val data = streamEnv.socketTextStream("localhost", 9999)
.map(line => {
val arr = line.split(",")
UserLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toLong)
})
val table: Table = tableEnv.fromDataStream(data)
table.printSchema()
val res = table
.where('city === "北京" || 'city === "成都")
.groupBy('city)
.select('city, 'user_id.count as 'cnt )
tableEnv
.toRetractStream[Row](res)
.print()
streamEnv.execute()
}
case class UserLog(time: Long, action: String, city: String, ip: String, user_id: Long)
區(qū)別不是很大俱萍,TableAPI 聲明式編程適合批處理和流式處理兩種情況。且處理的過程中告丢,可以使用各種查詢條件等枪蘑,非常方便。
Flink SQL
創(chuàng)建 MyFlinkSql object岖免。還是針對”過濾出城市為北京和成都的用戶岳颇,并分別統(tǒng)計這兩個城市中的用戶數(shù)量“這個業(yè)務(wù)邏輯,對應(yīng)到 Flink SQL 中的語法為:
object MyFlinkSql {
def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv)
val data = streamEnv.socketTextStream("localhost", 9999)
.map(line => {
val arr = line.split(",")
UserLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
})
val table = tableEnv.fromDataStream(data)
//tableEnv.registerDataStream("temp_userlog", data) // 也可以直接在SQL中使用指定臨時表的名字的方式
val res = tableEnv.sqlQuery(
s"""
|select
| city, count(user_id) as cnt
|from
| $table
|where
| city = '北京' or city = '成都'
|group by
| city
|""".stripMargin)
tableEnv
.toRetractStream[Row](res)
.filter(_._1 == true)
.print()
tableEnv.execute("Table API")
}
case class UserLog(time: Long, action: String, city: String, ip: String, user_id: String)
}
可以通過sql的方式對流式的數(shù)據(jù)進行統(tǒng)計颅湘。
輸入下面數(shù)據(jù):
20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006
20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006
20210301121533,login,北京,118.128.11.31,0001
20210301121536,login,上海,10.90.113.150,0002
20210301121544,login,成都,112.112.31.33,0003
20210301121559,login,成都,101.132.93.24,0004
20210301121612,login,上海,189.112.89.78,0005
20210301121638,login,北京,113.52.101.50,0006
Flink處理結(jié)果如下:
13> (true,成都,1)
16> (true,北京,1)
13> (false,成都,1)
13> (true,成都,2)
16> (false,北京,1)
16> (true,北京,2)
16> (false,北京,2)
16> (true,北京,3)
13> (false,成都,2)
13> (true,成都,3)
13> (false,成都,3)
13> (true,成都,4)
16> (false,北京,3)
16> (true,北京,4)
16> (false,北京,4)
16> (true,北京,5)
13> (false,成都,4)
13> (true,成都,5)
13> (false,成都,5)
13> (true,成都,6)
16> (false,北京,5)
16> (true,北京,6)