Flink常用API詳解
概述:
Flink根據(jù)抽象程度分層,提供了3種不同的API和庫(kù),每一種API在簡(jiǎn)潔性和表達(dá)力上有著不同的側(cè)重,并且針對(duì)不同的應(yīng)用場(chǎng)景.
ProcessFunction
概述:
ProcessFunctino是Flink所提供最底層接口. ProcessFunction可以處理一或兩條輸入數(shù)據(jù)流中的單個(gè)事件或者歸入一個(gè)特定窗口
內(nèi)的多個(gè)事件,它提供了對(duì)于時(shí)間和狀態(tài)的細(xì)粒度控制,開發(fā)者可以在其中任意地修改狀態(tài),也能夠注冊(cè)定時(shí)器用以在未來(lái)的某一時(shí)刻觸發(fā)
回調(diào)函數(shù).因此你可以利用ProcessFunctino實(shí)現(xiàn)許多有狀態(tài)的事件驅(qū)動(dòng)應(yīng)用所需要的基于單個(gè)事件的復(fù)雜業(yè)務(wù)邏輯.
DataStream API
概述:
為許多通用的流處理操作提供了處理原語(yǔ)
這些操作包括窗口、逐條記錄的的轉(zhuǎn)換操作,在處理時(shí)間時(shí)進(jìn)行外部數(shù)據(jù)庫(kù)查詢等. DataStream API支持Java 和 Scala語(yǔ)言,預(yù)先定義了
例如map() 茅特、reduce() 娇斑、Aggregate()等函數(shù),你可以通過(guò)擴(kuò)展實(shí)現(xiàn)預(yù)定義接口或使用Java、SCala的lambda表達(dá)式實(shí)現(xiàn)自定義的函數(shù)
SQL& Table API:
概述:
Flink支持兩種關(guān)系型的API,Table API和SQL,這兩個(gè)API都是批處理,和流處理統(tǒng)一的API,這意味著在無(wú)邊界的實(shí)時(shí)數(shù)據(jù)流和
有邊界的歷史記錄數(shù)據(jù)流上,關(guān)系型API 會(huì)以相同的語(yǔ)義執(zhí)行查詢,并產(chǎn)生相同的結(jié)果.Table API 和SQL 接住了 Apache Calcite
來(lái)進(jìn)行查詢的解析,校驗(yàn)以及優(yōu)化,它們可以與DataStream和DataSEt API無(wú)縫集成,并支持用戶自定義的標(biāo)量函數(shù),聚合函數(shù)以及標(biāo)志函數(shù).
復(fù)雜事件處理-CEP
概述:
模式檢測(cè)是事件流處理中的一個(gè)非常常見(jiàn)的用例. Flink 的CEP 庫(kù)提供了API,使用戶能夠以例如正則表達(dá)式
或狀態(tài)機(jī)的方式指定事件模式,CEP庫(kù)與Flink的DataStream API集成,以便在DataStream上評(píng)估模式.
CEP庫(kù)的應(yīng)用包括網(wǎng)絡(luò)入侵檢測(cè),業(yè)務(wù)流程監(jiān)控和欺詐檢測(cè).
DataSet API
概述:
DataSet API 是Flink用于批處理應(yīng)用程序的核心 API, DataSet API所提供的的基礎(chǔ)算子 包括 map吗铐、reduce东亦、(outer) join、co-group唬渗、
iterate等.
所有算子都有相應(yīng)的算法和數(shù)據(jù)結(jié)構(gòu)支持,對(duì)內(nèi)存中的序列化數(shù)據(jù)進(jìn)行操作. 如果數(shù)據(jù)大小超過(guò)預(yù)留內(nèi)存,則過(guò)量數(shù)據(jù)將存儲(chǔ)到磁盤.
Gallery 可擴(kuò)展的圖形處理和分析庫(kù).
DataStream 的編程模型,
概述:
DataStream的編程模型包括四個(gè)部分: Environment,DataSource,Trasnformation,Sink
初始化上下文環(huán)境-> 數(shù)據(jù)源-> 轉(zhuǎn)換操作,=> 數(shù)據(jù)輸出
Flink的DataSource數(shù)據(jù)源;
1. 基于文件的Source
讀取HDFS文件系統(tǒng)的Source
// 首先需要配置Hadoop的依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
讀取HDFS上的文件:
env.readTextFile($HDFS_PATH)
2. 基于集合的Source
env.fromCollection()
3. 基于Kafka的Source
首先需要配置Kafka連接器的依賴,另外更多的連接器可以查看官網(wǎng)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.1</version>
</dependency>
3. Flink的Sink數(shù)據(jù)目標(biāo)
概述:
Flink針對(duì)DataStream提供了大量的已經(jīng)實(shí)現(xiàn)的數(shù)據(jù)目標(biāo)(Sink),包括文件,Kafka,Redis,HDFS,Elasticsearch
1. 基于HDFS的Sink
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.9.1</version>
</dependency>
Streaming File Sink能把數(shù)據(jù)寫入HDFS中,還可以支持分桶寫入,每一個(gè)分桶就對(duì)應(yīng)HDFS中的一個(gè)目錄,
默認(rèn)按照該小時(shí)來(lái)分桶,在一個(gè)桶內(nèi)部,會(huì)進(jìn)一步將輸出基于滾動(dòng)策略切分成更小的文件.這有助于防止桶文件變得過(guò)大,
滾動(dòng)策略也是可以配置的,默認(rèn)策略會(huì)更具文件大小和超時(shí)時(shí)間來(lái)滾動(dòng)文件,超時(shí)時(shí)間是指沒(méi)有新數(shù)據(jù)寫入部分文件(part file)的時(shí)間
2. 基于Redis的Sink
3. 基于Kafka的Sink
4. 自定義的Sink
4. DataStream 轉(zhuǎn)換算子
概述:
即通過(guò)從一個(gè)或多個(gè)DataStream生成新的DataStream的過(guò)程被稱為Transformation操作. 在轉(zhuǎn)換過(guò)程中,每種操作類型被
定義為不同的Operator,Flink程序都能夠?qū)⒍鄠€(gè)Transformation組成一個(gè)DataFlow的拓?fù)?
1. Map[DataStream->DataStream]
調(diào)用用戶自定義的MapFunction對(duì)DataStream[T]數(shù)據(jù)進(jìn)行處理,形成新的DataStream[T],其中數(shù)據(jù)格式可能會(huì)發(fā)生變化,
常用作對(duì)數(shù)據(jù)集內(nèi)數(shù)據(jù)的清洗和轉(zhuǎn)換,例如將輸入數(shù)據(jù)集中的每個(gè)數(shù)值全部加1處理,并且將數(shù)據(jù)輸出到下游數(shù)據(jù)集
2. FlatMap[DataStream->DataStream]
該算子主要應(yīng)用處理輸入一個(gè)元素產(chǎn)生一個(gè)或者多個(gè)元素的計(jì)算場(chǎng)景,比較常見(jiàn)的是在經(jīng)典例子WordCount中,將每一行的
文本數(shù)據(jù)切割,生成單詞序列
3. Filter[DataStream->DataStream]
該算子將按照條件對(duì)輸入數(shù)據(jù)集進(jìn)行篩選操作,將符合條件的數(shù)據(jù)集輸出,將不符合條件的數(shù)據(jù)過(guò)濾掉.
3.1 // 通過(guò)通配符
val filter:DataStream[Int] = dataStream.filter{_%2==0}
3.2 // 或者指定運(yùn)算表達(dá)式
val filter:DataStream[Int] = dataStream.filter{x=> x%2==0}
4. KeyBy[DataStream->KeyedStream]
該算子根據(jù)指定的key將輸入的DataStream[T]數(shù)據(jù)格式轉(zhuǎn)換為KeyedStream[T],也就是在數(shù)據(jù)集中執(zhí)行Partition操作,將
相同的Key值的數(shù)據(jù)放置在相同的分區(qū)中.
例如WordCount-> 將數(shù)據(jù)集中第一個(gè)參數(shù)作為Key,對(duì)數(shù)據(jù)集進(jìn)行KeyBy函數(shù)操作,形成根據(jù)Id分區(qū)的KeyedStream數(shù)據(jù)集.
eg:
val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
//指定第一個(gè)字段為分區(qū)key
val keyedStream: KeyedStream [(String,Int),Tuple] = dataStream.keyBy(0)
5. Reduce[KeyedStream->DataStream]
該算子和MapReduce中Reduce原理基本一致,主要目的是將輸入的KeyedStream通過(guò)傳入的用戶自定義地ReduceFunction
滾動(dòng)地進(jìn)行數(shù)據(jù)聚合處理,其中定義ReduceFunction必須滿足運(yùn)算結(jié)合律和交換律,
eg:
對(duì)傳入的KeyedStream數(shù)據(jù)集中相同key值的數(shù)據(jù)獨(dú)立進(jìn)行求和運(yùn)算,得到每個(gè)key所對(duì)應(yīng)的求和值.
val dataStream = env.fromElements(("a",3),("d",4),("c",2),("a",5))
//指定第一個(gè)字段為分區(qū)key`
val keyedStream:KeyedStream[(String,Int),Tuple] = dataStream.keyBy(0)
// 滾動(dòng)對(duì)第二個(gè)字段進(jìn)行reduce相加求和
val reduceStream = keyedStream.reduce{(x1,x2)=>(x1._1,x1._2+x2._2)
6. Aggregations[KeyedStream->DataStream]
Aggregations是KeyedDataStream接口提供的聚合算子,根據(jù)指定的字段進(jìn)行聚合操作,滾動(dòng)地產(chǎn)生一系列數(shù)據(jù)聚合結(jié)果.
其實(shí)是將Reduce算子中的函數(shù)進(jìn)行了封裝,封裝的聚合操作有sum典阵、min奋渔、minBy、max萄喳、maxBy等,這樣就不需要用戶自己定義
Reduce函數(shù).
eg:
指定數(shù)據(jù)集中第一個(gè)字段作為key,用第二個(gè)字段作為累加字段,然后滾動(dòng)地對(duì)第二個(gè)字段的數(shù)值進(jìn)行累加并輸出
//指定第一個(gè)字段為分區(qū)key
val keyedStream:KeyedStream[(Int,Int),Tuple] = dataStream.keyBy(0)
// 對(duì)對(duì)第二個(gè)字段進(jìn)行sum統(tǒng)計(jì)
val sumStream:DataStream[(Int,Int)] = keyedStream.sum(1)
// 輸出計(jì)算結(jié)果
sumStream.print()
7. Union[DataStream->DataStream]
union算子主要是將兩個(gè)或者多個(gè)輸入的數(shù)據(jù)集合并成一個(gè)數(shù)據(jù)集,需要保證兩個(gè)數(shù)據(jù)集的格式一致,輸出的數(shù)據(jù)集的格式和
輸入的數(shù)據(jù)集格式保持一致,
code:
//獲取flink實(shí)時(shí)流處理的環(huán)境
val env = ExecutionEnvironment.getExecutionEnvironment
// 創(chuàng)建不同的數(shù)據(jù)集
val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
val dataStream2 = env.fromElements(("a", 1), ("d", 1), ("c", 1), ("a", 1))
// 調(diào)用union算子進(jìn)行不同的數(shù)據(jù)集合并
dataStream.union(dataStream2).print()
8. Connect,CoMap,CoFlatMap[DataStream->ConnectedStream->DataStream](只能在Stream才可以用)
connect算子主要是為了合并兩種或者多種不同數(shù)據(jù)類型的數(shù)據(jù)集,合并后會(huì)保留原來(lái)原來(lái)數(shù)據(jù)集的數(shù)據(jù)類型.
例如: dataStream1數(shù)據(jù)集為(String,Int) 元組類型,dataStream2數(shù)據(jù)集為Int類型,通過(guò)connect連接算子將兩個(gè)不同數(shù)據(jù)
類型的流結(jié)合在一起,形成格式為ConnectedStreams的數(shù)據(jù)集,其內(nèi)部數(shù)據(jù)為[(String,Int),Int]的混合數(shù)據(jù)類型,保留了兩個(gè)
原始數(shù)據(jù)集的數(shù)據(jù)類型
eg:
//獲取flink實(shí)時(shí)流處理的環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 創(chuàng)建不同的數(shù)據(jù)集
val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
val dataStream2 = env.fromElements(1, 2, 4, 5)
// 連接兩個(gè)DataStream數(shù)據(jù)集
val connectedStream = dataStream.connect(dataStream2)
val result = connectedStream.map(
//第一個(gè)處理函數(shù)
t1 => {
(t1._1, t1._2)
},
//第二個(gè)處理函數(shù)
t2 => {
(t2, 0)
})
result.print()
env.execute("h")
注意:Union和Connect區(qū)別
1. Union之間兩個(gè)流的類型必須是一樣,Connect可以不一樣,在之后的coMap中在去調(diào)整成為一樣的.
2. Connect只能操作兩個(gè)流,Union可以操作多個(gè)
9. Split 和 select [DataStream -> SplitStream->DataStream]
Split算子是將一個(gè)DataStream數(shù)據(jù)集按照條件進(jìn)行拆分,形成兩個(gè)數(shù)據(jù)集的過(guò)程,也是Union算子的逆向?qū)崿F(xiàn),每個(gè)接入的數(shù)據(jù)
都會(huì)被路由到一個(gè)或者多個(gè)輸出數(shù)據(jù)集中,
在使用Splict函數(shù)中,需要定義split函數(shù)中的切分邏輯,通過(guò)調(diào)用split函數(shù),然后指定條件判斷函數(shù),
例如: 如下代碼所示,將根據(jù)第二個(gè)字段的奇偶性將數(shù)據(jù)集標(biāo)記出來(lái),如果是偶數(shù)則標(biāo)記為even,如果是奇數(shù)則標(biāo)記為odd,然后通過(guò)集合將標(biāo)記返回,最終生成格式SplitStream的數(shù)據(jù)集
code:
//獲取flink實(shí)時(shí)流處理的環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 導(dǎo)入Flink隱式轉(zhuǎn)換
import org.apache.flink.streaming.api.scala._
// 創(chuàng)建不同的數(shù)據(jù)集
val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
val splitedStream = dataStream.split(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))
// Split函數(shù)本身只是對(duì)輸入數(shù)據(jù)集進(jìn)行標(biāo)記,并沒(méi)有將數(shù)據(jù)集真正的實(shí)現(xiàn)拆分,因此需要借助Select函數(shù)根據(jù)標(biāo)記將數(shù)據(jù)切分成不同的數(shù)據(jù)集,
//篩選出偶數(shù)數(shù)據(jù)集
val evenStream = splitedStream.select("even").print()
//篩選出偶數(shù)數(shù)據(jù)集
val oddStream = splitedStream.select("odd")
env.execute("l ")
函數(shù)類和富函數(shù)類
概述:
前面學(xué)過(guò)的所有算子集合都可以自定義一個(gè)函數(shù)類,富函數(shù)類作為參數(shù),因?yàn)镕link暴露了這兩種函數(shù)類的接口,常見(jiàn)的函數(shù)接口:
1. MapFunction
2. FlatMapFunction
3. ReduceFunction
富函數(shù)接口它與其他常規(guī)函數(shù)接口的不同在于:可以獲取運(yùn)行環(huán)境的上下文,在上下文環(huán)境中可以管理狀態(tài)(State),并擁有一些生命周期方法,
所以可以實(shí)現(xiàn)更復(fù)雜的功能.富函數(shù)的接口有:
1. RichMapFunction
2. RichFlatMapFunction
3. RichFilterFunction
1. 普通函數(shù)類舉例:
按照指定的時(shí)間格式輸出每個(gè)通話的撥號(hào)時(shí)間和結(jié)束時(shí)間
code:
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object FunctionClassTransformation {
case class StationLog(sid: String, var callOut: String, var callInt: String, callType: String, callTime: Long, duration: Long)
def main(args: Array[String]): Unit = {
//獲取flink實(shí)時(shí)流處理的環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 導(dǎo)入Flink隱式轉(zhuǎn)換
import org.apache.flink.streaming.api.scala._
val data = env.readTextFile(getClass.getResource("station.log").getPath)
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
// 定義時(shí)間輸出格式
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// 過(guò)濾那些通話成功的
data.filter(_.callType.equals("success"))
.map(new CallMapFunction(format))
.print()
env.execute("l ")
}
class CallMapFunction(format: SimpleDateFormat) extends
MapFunction[StationLog, String] {
override def map(t: StationLog): String = {
var startTime = t.callTime
val endTime = t.callTime + t.duration * 1000
"主叫號(hào)碼: " + t.callOut + " , 被叫號(hào)碼: " + t.callInt + ", 呼叫起始時(shí)間: " + format.format(new Date(startTime)) + ",呼叫結(jié)束時(shí)間: " + format.format(new Date(endTime))
}
}}
result:
主叫號(hào)碼: 18600003186 , 被叫號(hào)碼: 18900002113, 呼叫起始時(shí)間: 2019-12-23 13:54:13,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:45
主叫號(hào)碼: 18600003794 , 被叫號(hào)碼: 18900009608, 呼叫起始時(shí)間: 2019-12-23 13:54:13,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:17
主叫號(hào)碼: 18600000005 , 被叫號(hào)碼: 18900007729, 呼叫起始時(shí)間: 2019-12-23 13:56:43,呼叫結(jié)束時(shí)間: 2019-12-23 14:02:32
主叫號(hào)碼: 18600005404 , 被叫號(hào)碼: 18900000558, 呼叫起始時(shí)間: 2019-12-23 13:54:17,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:22
主叫號(hào)碼: 18600003532 , 被叫號(hào)碼: 18900008128, 呼叫起始時(shí)間: 2019-12-23 13:54:19,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:29
主叫號(hào)碼: 18600003532 , 被叫號(hào)碼: 18900008128, 呼叫起始時(shí)間: 2019-12-23 13:54:26,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:41
主叫號(hào)碼: 18600003502 , 被叫號(hào)碼: 18900009859, 呼叫起始時(shí)間: 2019-12-23 13:54:28,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:28
主叫號(hào)碼: 18600003502 , 被叫號(hào)碼: 18900009859, 呼叫起始時(shí)間: 2019-12-23 13:54:28,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:28
2. 富函數(shù)類舉例:
把呼叫成功的通話信息轉(zhuǎn)換成真實(shí)的用戶姓名,通話用戶對(duì)應(yīng)的用戶表(在MySql數(shù)據(jù)庫(kù)中),
由于需要從數(shù)據(jù)庫(kù)中查詢數(shù)據(jù),就需要?jiǎng)?chuàng)建連接,創(chuàng)建連接的代碼必須寫在生命周期的open方法中,所以需要使用富函數(shù)類.
Rich Function有一個(gè)生命周期的概念,典型的生命周期方法有:
open()方法是rich function的初始化方法,當(dāng)一個(gè)算子例如map或者filter被調(diào)用之前open()會(huì)被調(diào)用.
clsose()方法是生命周期中的最后一個(gè)調(diào)用的方法,做一些清理工作
getRuntimeContext()方法提供了函數(shù)的RuntimeContext的一些信息,例如函數(shù)執(zhí)行的并行度,任務(wù)的名字,以及state狀態(tài)
code:
package com.bjsxt.flink.transformation
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import com.bjsxt.flink.source.StationLog
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TestRichFunctionClass {
/**
* 把通話成功的電話號(hào)碼轉(zhuǎn)換成真是用戶姓名卒稳,用戶姓名保存在Mysql表中
* @param args
*/
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//讀取數(shù)據(jù)源
var filePath =getClass.getResource("/station.log").getPath
val stream: DataStream[StationLog] = streamEnv.readTextFile(filePath)
.map(line=>{
var arr=line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
})
//計(jì)算:把電話號(hào)碼變成用戶姓名
val result: DataStream[StationLog] = stream.filter(_.callType.equals("success"))
.map(new MyRichMapFunction)
result.print()
streamEnv.execute()
}
//自定義一個(gè)富函數(shù)類
class MyRichMapFunction extends RichMapFunction[StationLog,StationLog]{
var conn:Connection=_
var pst:PreparedStatement=_
override def open(parameters: Configuration): Unit = {
conn =DriverManager.getConnection("jdbc:mysql://localhost/test","root","123123")
pst =conn.prepareStatement("select name from t_phone where phone_number=?")
}
override def close(): Unit = {
pst.close()
conn.close()
}
override def map(value: StationLog): StationLog = {
println(getRuntimeContext.getTaskNameWithSubtasks)
//查詢主叫號(hào)碼對(duì)應(yīng)的姓名
pst.setString(1,value.callOut)
val result: ResultSet = pst.executeQuery()
if(result.next()){
value.callOut=result.getString(1)
}
//查詢被叫號(hào)碼對(duì)應(yīng)的姓名
pst.setString(1,value.callInt)
val result2: ResultSet = pst.executeQuery()
if(result2.next()){
value.callInt=result2.getString(1)
}
value
}
}
}
6. 底層 ProcessFunctionAPI
概述:
ProcessFunction是一個(gè)低層次的流處理操作,允許所有返回Stream的基礎(chǔ)構(gòu)建模塊:
訪問(wèn)Event本身數(shù)據(jù)(比如:Event的時(shí)間, Event的當(dāng)前Key)
管理狀態(tài) State(僅在keyed Stream中)
管理定時(shí)器Timer( 包括: 注冊(cè)定時(shí)器,刪除定時(shí)器等)
總而言之,ProcessFunction是Flink最底層的API,也是功能最強(qiáng)大的.
例如:
監(jiān)控每一個(gè)手機(jī),如果在5s內(nèi)呼叫它的通話都是失敗的,發(fā)出警告信息.
注意:
這個(gè)案例中會(huì)使用到狀態(tài)編程,請(qǐng)同學(xué)們只要知道狀態(tài)的意思,不需要掌握,
code:
package com.bjsxt.flink.transformation
import com.bjsxt.flink.source.StationLog
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
object TestProcessFunction {
//監(jiān)控每一個(gè)手機(jī)號(hào)碼,如果這個(gè)號(hào)碼在5秒內(nèi)他巨,所有呼叫它的日志都是失敗的充坑,則發(fā)出告警信息
//如果在5秒內(nèi)只要有一個(gè)呼叫不是fail則不用告警
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//讀取數(shù)據(jù)源
val stream: DataStream[StationLog] = streamEnv.socketTextStream("hadoop101",8888)
.map(line=>{
var arr=line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
})
//計(jì)算
val result: DataStream[String] = stream.keyBy(_.callInt)
.process(new MonitorCallFail)
result.print()
streamEnv.execute()
}
//自定義一個(gè)底層的類
class MonitorCallFail extends KeyedProcessFunction[String,StationLog,String]{
//使用一個(gè)狀態(tài)對(duì)象記錄時(shí)間
lazy val timeState :ValueState[Long] =getRuntimeContext.getState(new ValueStateDescriptor[Long]("time",classOf[Long]))
override def processElement(value: StationLog, ctx: KeyedProcessFunction[String, StationLog, String]#Context, out: Collector[String]): Unit = {
//從狀態(tài)中取得時(shí)間
var time =timeState.value()
if(time==0 && value.callType.equals("fail") ){ //表示第一次發(fā)現(xiàn)呼叫失敗,記錄當(dāng)前的時(shí)間
//獲取當(dāng)前系統(tǒng)時(shí)間染突,并注冊(cè)定時(shí)器
var nowTime =ctx.timerService().currentProcessingTime()
//定時(shí)器在5秒后觸發(fā)
var onTime =nowTime+8*1000L
ctx.timerService().registerProcessingTimeTimer(onTime)
//把觸發(fā)時(shí)間保存到狀態(tài)中
timeState.update(onTime)
}
if (time!=0 && !value.callType.equals("fail")){ //表示有一次成功的呼叫,必須要?jiǎng)h除定時(shí)器
ctx.timerService().deleteProcessingTimeTimer(time)
timeState.clear() //清空狀態(tài)中的時(shí)間
}
}
//時(shí)間到了捻爷,定時(shí)器執(zhí)行,
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext, out: Collector[String]): Unit = {
var warnStr="觸發(fā)的時(shí)間:"+timestamp +" 手機(jī)號(hào) :"+ctx.getCurrentKey
out.collect(warnStr)
timeState.clear()
}
}
}
7. 側(cè)輸出流 Side Output
概述:
在Flink處理數(shù)據(jù)流時(shí),我們經(jīng)常會(huì)遇到這樣的情況: 在處理一個(gè)數(shù)據(jù)源時(shí),往往需要將該源中的不同類型的數(shù)據(jù)做分割處理,如果使用filter
算子對(duì)數(shù)據(jù)源進(jìn)行篩選分割的話,勢(shì)必會(huì)造成數(shù)據(jù)流的多次復(fù)制,造成不必要的性能浪費(fèi);flink中的側(cè)輸出就是將數(shù)據(jù)流進(jìn)行分割,而不對(duì)流
進(jìn)行復(fù)制的一種分流機(jī)制,
Flink的側(cè)輸出的另一個(gè)作用就是對(duì)延時(shí)遲到的數(shù)據(jù)進(jìn)行處理,這樣就可以不必丟棄遲到的數(shù)據(jù).
案例: (根據(jù)基站的日志,請(qǐng)把呼叫成功的Stream(主流)和不成功的Stream(側(cè)流分別輸出)
code:
package FlinkDemo.functions
9
import FlinkDemo.functions.FunctionClassTransformation.StationLog
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
object TestSideOutpurStream {
// 導(dǎo)入Flink隱式轉(zhuǎn)換
import org.apache.flink.streaming.api.scala._
// 側(cè)輸出流首先需要定義一個(gè)流的標(biāo)簽
val notSuccessTag = new OutputTag[StationLog]("not_success")
def main(args: Array[String]): Unit = {
//獲取flink實(shí)時(shí)流處理的環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val data = env.readTextFile(getClass.getResource("station.log").getPath)
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
val mainStream = data
.process(new CreateSideOutputStream(notSuccessTag))
// 得到測(cè)流
val sideStream = mainStream.getSideOutput(notSuccessTag)
mainStream.print("main")
sideStream.print("sideOutput")
env.execute()
}
class CreateSideOutputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {
override def processElement(value: StationLog, context: ProcessFunction[StationLog, StationLog]#Context, collector: Collector[StationLog]): Unit = {
//輸出主流
if (value.callType.equals("success")) {
collector.collect(value)
}
else {
//輸出側(cè)流
context.output(tag, value)}}}}