Flink 常用API詳解

Flink常用API詳解

概述:
    Flink根據(jù)抽象程度分層,提供了3種不同的API和庫(kù),每一種API在簡(jiǎn)潔性和表達(dá)力上有著不同的側(cè)重,并且針對(duì)不同的應(yīng)用場(chǎng)景.

ProcessFunction

概述:
    ProcessFunctino是Flink所提供最底層接口. ProcessFunction可以處理一或兩條輸入數(shù)據(jù)流中的單個(gè)事件或者歸入一個(gè)特定窗口
    內(nèi)的多個(gè)事件,它提供了對(duì)于時(shí)間和狀態(tài)的細(xì)粒度控制,開發(fā)者可以在其中任意地修改狀態(tài),也能夠注冊(cè)定時(shí)器用以在未來(lái)的某一時(shí)刻觸發(fā)
    回調(diào)函數(shù).因此你可以利用ProcessFunctino實(shí)現(xiàn)許多有狀態(tài)的事件驅(qū)動(dòng)應(yīng)用所需要的基于單個(gè)事件的復(fù)雜業(yè)務(wù)邏輯.

DataStream API

概述:
    為許多通用的流處理操作提供了處理原語(yǔ)
    這些操作包括窗口、逐條記錄的的轉(zhuǎn)換操作,在處理時(shí)間時(shí)進(jìn)行外部數(shù)據(jù)庫(kù)查詢等. DataStream API支持Java 和 Scala語(yǔ)言,預(yù)先定義了
    例如map() 茅特、reduce() 娇斑、Aggregate()等函數(shù),你可以通過(guò)擴(kuò)展實(shí)現(xiàn)預(yù)定義接口或使用Java、SCala的lambda表達(dá)式實(shí)現(xiàn)自定義的函數(shù)

SQL& Table API:

概述:
    Flink支持兩種關(guān)系型的API,Table API和SQL,這兩個(gè)API都是批處理,和流處理統(tǒng)一的API,這意味著在無(wú)邊界的實(shí)時(shí)數(shù)據(jù)流和
    有邊界的歷史記錄數(shù)據(jù)流上,關(guān)系型API 會(huì)以相同的語(yǔ)義執(zhí)行查詢,并產(chǎn)生相同的結(jié)果.Table API 和SQL 接住了 Apache Calcite
    來(lái)進(jìn)行查詢的解析,校驗(yàn)以及優(yōu)化,它們可以與DataStream和DataSEt API無(wú)縫集成,并支持用戶自定義的標(biāo)量函數(shù),聚合函數(shù)以及標(biāo)志函數(shù).

復(fù)雜事件處理-CEP

概述:
    模式檢測(cè)是事件流處理中的一個(gè)非常常見(jiàn)的用例. Flink 的CEP 庫(kù)提供了API,使用戶能夠以例如正則表達(dá)式
    或狀態(tài)機(jī)的方式指定事件模式,CEP庫(kù)與Flink的DataStream API集成,以便在DataStream上評(píng)估模式.
    CEP庫(kù)的應(yīng)用包括網(wǎng)絡(luò)入侵檢測(cè),業(yè)務(wù)流程監(jiān)控和欺詐檢測(cè).

DataSet API

概述:
    DataSet API 是Flink用于批處理應(yīng)用程序的核心 API, DataSet API所提供的的基礎(chǔ)算子 包括 map吗铐、reduce东亦、(outer) join、co-group唬渗、
    iterate等.
    所有算子都有相應(yīng)的算法和數(shù)據(jù)結(jié)構(gòu)支持,對(duì)內(nèi)存中的序列化數(shù)據(jù)進(jìn)行操作. 如果數(shù)據(jù)大小超過(guò)預(yù)留內(nèi)存,則過(guò)量數(shù)據(jù)將存儲(chǔ)到磁盤.

Gallery 可擴(kuò)展的圖形處理和分析庫(kù).

DataStream 的編程模型,
概述:
    DataStream的編程模型包括四個(gè)部分: Environment,DataSource,Trasnformation,Sink
    初始化上下文環(huán)境-> 數(shù)據(jù)源-> 轉(zhuǎn)換操作,=> 數(shù)據(jù)輸出

Flink的DataSource數(shù)據(jù)源;

1. 基于文件的Source
    讀取HDFS文件系統(tǒng)的Source
    // 首先需要配置Hadoop的依賴
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    讀取HDFS上的文件:
        env.readTextFile($HDFS_PATH)
2. 基于集合的Source
    env.fromCollection()
3. 基于Kafka的Source
    首先需要配置Kafka連接器的依賴,另外更多的連接器可以查看官網(wǎng)
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>

3. Flink的Sink數(shù)據(jù)目標(biāo)

概述:
    Flink針對(duì)DataStream提供了大量的已經(jīng)實(shí)現(xiàn)的數(shù)據(jù)目標(biāo)(Sink),包括文件,Kafka,Redis,HDFS,Elasticsearch
1. 基于HDFS的Sink
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>
    Streaming File Sink能把數(shù)據(jù)寫入HDFS中,還可以支持分桶寫入,每一個(gè)分桶就對(duì)應(yīng)HDFS中的一個(gè)目錄,
    默認(rèn)按照該小時(shí)來(lái)分桶,在一個(gè)桶內(nèi)部,會(huì)進(jìn)一步將輸出基于滾動(dòng)策略切分成更小的文件.這有助于防止桶文件變得過(guò)大,
    滾動(dòng)策略也是可以配置的,默認(rèn)策略會(huì)更具文件大小和超時(shí)時(shí)間來(lái)滾動(dòng)文件,超時(shí)時(shí)間是指沒(méi)有新數(shù)據(jù)寫入部分文件(part file)的時(shí)間
2. 基于Redis的Sink
3. 基于Kafka的Sink
4. 自定義的Sink

4. DataStream 轉(zhuǎn)換算子

概述:
    即通過(guò)從一個(gè)或多個(gè)DataStream生成新的DataStream的過(guò)程被稱為Transformation操作. 在轉(zhuǎn)換過(guò)程中,每種操作類型被
    定義為不同的Operator,Flink程序都能夠?qū)⒍鄠€(gè)Transformation組成一個(gè)DataFlow的拓?fù)?
1. Map[DataStream->DataStream]
    調(diào)用用戶自定義的MapFunction對(duì)DataStream[T]數(shù)據(jù)進(jìn)行處理,形成新的DataStream[T],其中數(shù)據(jù)格式可能會(huì)發(fā)生變化,
    常用作對(duì)數(shù)據(jù)集內(nèi)數(shù)據(jù)的清洗和轉(zhuǎn)換,例如將輸入數(shù)據(jù)集中的每個(gè)數(shù)值全部加1處理,并且將數(shù)據(jù)輸出到下游數(shù)據(jù)集
2. FlatMap[DataStream->DataStream]
    該算子主要應(yīng)用處理輸入一個(gè)元素產(chǎn)生一個(gè)或者多個(gè)元素的計(jì)算場(chǎng)景,比較常見(jiàn)的是在經(jīng)典例子WordCount中,將每一行的
    文本數(shù)據(jù)切割,生成單詞序列
3. Filter[DataStream->DataStream]
    該算子將按照條件對(duì)輸入數(shù)據(jù)集進(jìn)行篩選操作,將符合條件的數(shù)據(jù)集輸出,將不符合條件的數(shù)據(jù)過(guò)濾掉.
    3.1 // 通過(guò)通配符
        val filter:DataStream[Int] = dataStream.filter{_%2==0}
    3.2 // 或者指定運(yùn)算表達(dá)式
        val filter:DataStream[Int] = dataStream.filter{x=> x%2==0}
4. KeyBy[DataStream->KeyedStream]
    該算子根據(jù)指定的key將輸入的DataStream[T]數(shù)據(jù)格式轉(zhuǎn)換為KeyedStream[T],也就是在數(shù)據(jù)集中執(zhí)行Partition操作,將
    相同的Key值的數(shù)據(jù)放置在相同的分區(qū)中.
    例如WordCount-> 將數(shù)據(jù)集中第一個(gè)參數(shù)作為Key,對(duì)數(shù)據(jù)集進(jìn)行KeyBy函數(shù)操作,形成根據(jù)Id分區(qū)的KeyedStream數(shù)據(jù)集.
    eg:
        val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
        //指定第一個(gè)字段為分區(qū)key
        val keyedStream: KeyedStream [(String,Int),Tuple] = dataStream.keyBy(0)
5. Reduce[KeyedStream->DataStream]
    該算子和MapReduce中Reduce原理基本一致,主要目的是將輸入的KeyedStream通過(guò)傳入的用戶自定義地ReduceFunction
    滾動(dòng)地進(jìn)行數(shù)據(jù)聚合處理,其中定義ReduceFunction必須滿足運(yùn)算結(jié)合律和交換律,
    eg:
        對(duì)傳入的KeyedStream數(shù)據(jù)集中相同key值的數(shù)據(jù)獨(dú)立進(jìn)行求和運(yùn)算,得到每個(gè)key所對(duì)應(yīng)的求和值.
        val dataStream = env.fromElements(("a",3),("d",4),("c",2),("a",5))
        //指定第一個(gè)字段為分區(qū)key`
        val keyedStream:KeyedStream[(String,Int),Tuple] = dataStream.keyBy(0)
        // 滾動(dòng)對(duì)第二個(gè)字段進(jìn)行reduce相加求和
        val reduceStream = keyedStream.reduce{(x1,x2)=>(x1._1,x1._2+x2._2)
6. Aggregations[KeyedStream->DataStream]
    Aggregations是KeyedDataStream接口提供的聚合算子,根據(jù)指定的字段進(jìn)行聚合操作,滾動(dòng)地產(chǎn)生一系列數(shù)據(jù)聚合結(jié)果.
    其實(shí)是將Reduce算子中的函數(shù)進(jìn)行了封裝,封裝的聚合操作有sum典阵、min奋渔、minBy、max萄喳、maxBy等,這樣就不需要用戶自己定義
    Reduce函數(shù).
    eg:
        指定數(shù)據(jù)集中第一個(gè)字段作為key,用第二個(gè)字段作為累加字段,然后滾動(dòng)地對(duì)第二個(gè)字段的數(shù)值進(jìn)行累加并輸出
        //指定第一個(gè)字段為分區(qū)key
        val keyedStream:KeyedStream[(Int,Int),Tuple] = dataStream.keyBy(0)
        // 對(duì)對(duì)第二個(gè)字段進(jìn)行sum統(tǒng)計(jì)
        val sumStream:DataStream[(Int,Int)] = keyedStream.sum(1)
        // 輸出計(jì)算結(jié)果
        sumStream.print()
7. Union[DataStream->DataStream]
    union算子主要是將兩個(gè)或者多個(gè)輸入的數(shù)據(jù)集合并成一個(gè)數(shù)據(jù)集,需要保證兩個(gè)數(shù)據(jù)集的格式一致,輸出的數(shù)據(jù)集的格式和
    輸入的數(shù)據(jù)集格式保持一致,
    code:
        //獲取flink實(shí)時(shí)流處理的環(huán)境
        val env = ExecutionEnvironment.getExecutionEnvironment
        // 創(chuàng)建不同的數(shù)據(jù)集
        val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
        val dataStream2 = env.fromElements(("a", 1), ("d", 1), ("c", 1), ("a", 1))
        // 調(diào)用union算子進(jìn)行不同的數(shù)據(jù)集合并
        dataStream.union(dataStream2).print()
8. Connect,CoMap,CoFlatMap[DataStream->ConnectedStream->DataStream](只能在Stream才可以用)
    connect算子主要是為了合并兩種或者多種不同數(shù)據(jù)類型的數(shù)據(jù)集,合并后會(huì)保留原來(lái)原來(lái)數(shù)據(jù)集的數(shù)據(jù)類型.
    例如:  dataStream1數(shù)據(jù)集為(String,Int) 元組類型,dataStream2數(shù)據(jù)集為Int類型,通過(guò)connect連接算子將兩個(gè)不同數(shù)據(jù)
    類型的流結(jié)合在一起,形成格式為ConnectedStreams的數(shù)據(jù)集,其內(nèi)部數(shù)據(jù)為[(String,Int),Int]的混合數(shù)據(jù)類型,保留了兩個(gè)
    原始數(shù)據(jù)集的數(shù)據(jù)類型
    eg:
        //獲取flink實(shí)時(shí)流處理的環(huán)境
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            // 創(chuàng)建不同的數(shù)據(jù)集
            val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
            val dataStream2 = env.fromElements(1, 2, 4, 5)
            // 連接兩個(gè)DataStream數(shù)據(jù)集
            val connectedStream = dataStream.connect(dataStream2)
            val result = connectedStream.map(
              //第一個(gè)處理函數(shù)
              t1 => {
                (t1._1, t1._2)
              },
              //第二個(gè)處理函數(shù)
              t2 => {
                (t2, 0)
              })
            result.print()
            env.execute("h")
    注意:Union和Connect區(qū)別
        1. Union之間兩個(gè)流的類型必須是一樣,Connect可以不一樣,在之后的coMap中在去調(diào)整成為一樣的.
        2. Connect只能操作兩個(gè)流,Union可以操作多個(gè)
9. Split 和 select [DataStream -> SplitStream->DataStream]
    Split算子是將一個(gè)DataStream數(shù)據(jù)集按照條件進(jìn)行拆分,形成兩個(gè)數(shù)據(jù)集的過(guò)程,也是Union算子的逆向?qū)崿F(xiàn),每個(gè)接入的數(shù)據(jù)
    都會(huì)被路由到一個(gè)或者多個(gè)輸出數(shù)據(jù)集中,
    在使用Splict函數(shù)中,需要定義split函數(shù)中的切分邏輯,通過(guò)調(diào)用split函數(shù),然后指定條件判斷函數(shù),
    例如: 如下代碼所示,將根據(jù)第二個(gè)字段的奇偶性將數(shù)據(jù)集標(biāo)記出來(lái),如果是偶數(shù)則標(biāo)記為even,如果是奇數(shù)則標(biāo)記為odd,然后通過(guò)集合將標(biāo)記返回,最終生成格式SplitStream的數(shù)據(jù)集
    code:
        
        //獲取flink實(shí)時(shí)流處理的環(huán)境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 導(dǎo)入Flink隱式轉(zhuǎn)換
        import org.apache.flink.streaming.api.scala._
        // 創(chuàng)建不同的數(shù)據(jù)集
        val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
        val splitedStream = dataStream.split(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))
    
        // Split函數(shù)本身只是對(duì)輸入數(shù)據(jù)集進(jìn)行標(biāo)記,并沒(méi)有將數(shù)據(jù)集真正的實(shí)現(xiàn)拆分,因此需要借助Select函數(shù)根據(jù)標(biāo)記將數(shù)據(jù)切分成不同的數(shù)據(jù)集,
        //篩選出偶數(shù)數(shù)據(jù)集
        val evenStream = splitedStream.select("even").print()
        //篩選出偶數(shù)數(shù)據(jù)集
        val oddStream = splitedStream.select("odd")
    
        env.execute("l ")

函數(shù)類和富函數(shù)類

概述:
    前面學(xué)過(guò)的所有算子集合都可以自定義一個(gè)函數(shù)類,富函數(shù)類作為參數(shù),因?yàn)镕link暴露了這兩種函數(shù)類的接口,常見(jiàn)的函數(shù)接口:
    1. MapFunction
    2. FlatMapFunction
    3. ReduceFunction
    富函數(shù)接口它與其他常規(guī)函數(shù)接口的不同在于:可以獲取運(yùn)行環(huán)境的上下文,在上下文環(huán)境中可以管理狀態(tài)(State),并擁有一些生命周期方法,
    所以可以實(shí)現(xiàn)更復(fù)雜的功能.富函數(shù)的接口有:
    1. RichMapFunction
    2. RichFlatMapFunction
    3. RichFilterFunction
1. 普通函數(shù)類舉例:
    按照指定的時(shí)間格式輸出每個(gè)通話的撥號(hào)時(shí)間和結(jié)束時(shí)間
    code:
        import java.text.SimpleDateFormat
        import java.util.Date
        import org.apache.flink.api.common.functions.MapFunction
        import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
        object FunctionClassTransformation {
          case class StationLog(sid: String, var callOut: String, var callInt: String, callType: String, callTime: Long, duration: Long)
          def main(args: Array[String]): Unit = {
            //獲取flink實(shí)時(shí)流處理的環(huán)境
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            env.setParallelism(1)
            // 導(dǎo)入Flink隱式轉(zhuǎn)換
            import org.apache.flink.streaming.api.scala._
            val data = env.readTextFile(getClass.getResource("station.log").getPath)
              .map { line =>
                var arr = line.split(",")
                StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
              }
            // 定義時(shí)間輸出格式
            val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
            // 過(guò)濾那些通話成功的
            data.filter(_.callType.equals("success"))
              .map(new CallMapFunction(format))
              .print()
        
            env.execute("l ")
        
          }
        
          class CallMapFunction(format: SimpleDateFormat) extends
            MapFunction[StationLog, String] {
            override def map(t: StationLog): String = {
              var startTime = t.callTime
              val endTime = t.callTime + t.duration * 1000
              "主叫號(hào)碼: " + t.callOut + " , 被叫號(hào)碼: " + t.callInt + ", 呼叫起始時(shí)間: " + format.format(new Date(startTime)) + ",呼叫結(jié)束時(shí)間: " + format.format(new Date(endTime))
            }
          }}
    result:
        主叫號(hào)碼: 18600003186 , 被叫號(hào)碼: 18900002113, 呼叫起始時(shí)間: 2019-12-23 13:54:13,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:45
        主叫號(hào)碼: 18600003794 , 被叫號(hào)碼: 18900009608, 呼叫起始時(shí)間: 2019-12-23 13:54:13,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:17
        主叫號(hào)碼: 18600000005 , 被叫號(hào)碼: 18900007729, 呼叫起始時(shí)間: 2019-12-23 13:56:43,呼叫結(jié)束時(shí)間: 2019-12-23 14:02:32
        主叫號(hào)碼: 18600005404 , 被叫號(hào)碼: 18900000558, 呼叫起始時(shí)間: 2019-12-23 13:54:17,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:22
        主叫號(hào)碼: 18600003532 , 被叫號(hào)碼: 18900008128, 呼叫起始時(shí)間: 2019-12-23 13:54:19,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:29
        主叫號(hào)碼: 18600003532 , 被叫號(hào)碼: 18900008128, 呼叫起始時(shí)間: 2019-12-23 13:54:26,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:41
        主叫號(hào)碼: 18600003502 , 被叫號(hào)碼: 18900009859, 呼叫起始時(shí)間: 2019-12-23 13:54:28,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:28
        主叫號(hào)碼: 18600003502 , 被叫號(hào)碼: 18900009859, 呼叫起始時(shí)間: 2019-12-23 13:54:28,呼叫結(jié)束時(shí)間: 2019-12-23 13:54:28            
2.  富函數(shù)類舉例:
        把呼叫成功的通話信息轉(zhuǎn)換成真實(shí)的用戶姓名,通話用戶對(duì)應(yīng)的用戶表(在MySql數(shù)據(jù)庫(kù)中),
        由于需要從數(shù)據(jù)庫(kù)中查詢數(shù)據(jù),就需要?jiǎng)?chuàng)建連接,創(chuàng)建連接的代碼必須寫在生命周期的open方法中,所以需要使用富函數(shù)類.
    Rich Function有一個(gè)生命周期的概念,典型的生命周期方法有:
        open()方法是rich function的初始化方法,當(dāng)一個(gè)算子例如map或者filter被調(diào)用之前open()會(huì)被調(diào)用.
        clsose()方法是生命周期中的最后一個(gè)調(diào)用的方法,做一些清理工作
        getRuntimeContext()方法提供了函數(shù)的RuntimeContext的一些信息,例如函數(shù)執(zhí)行的并行度,任務(wù)的名字,以及state狀態(tài)
    code:
        package com.bjsxt.flink.transformation
        
        import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
        
        import com.bjsxt.flink.source.StationLog
        import org.apache.flink.api.common.functions.RichMapFunction
        import org.apache.flink.configuration.Configuration
        import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
        
        object TestRichFunctionClass {
        
          /**
           * 把通話成功的電話號(hào)碼轉(zhuǎn)換成真是用戶姓名卒稳,用戶姓名保存在Mysql表中
           * @param args
           */
          def main(args: Array[String]): Unit = {
            val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
            import org.apache.flink.streaming.api.scala._
        
            //讀取數(shù)據(jù)源
            var filePath =getClass.getResource("/station.log").getPath
            val stream: DataStream[StationLog] = streamEnv.readTextFile(filePath)
              .map(line=>{
                var arr=line.split(",")
                new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
              })
        
            //計(jì)算:把電話號(hào)碼變成用戶姓名
            val result: DataStream[StationLog] = stream.filter(_.callType.equals("success"))
              .map(new MyRichMapFunction)
            result.print()
        
            streamEnv.execute()
          }
        
          //自定義一個(gè)富函數(shù)類
          class MyRichMapFunction extends RichMapFunction[StationLog,StationLog]{
            var conn:Connection=_
            var pst:PreparedStatement=_
            override def open(parameters: Configuration): Unit = {
              conn =DriverManager.getConnection("jdbc:mysql://localhost/test","root","123123")
              pst =conn.prepareStatement("select name from t_phone where phone_number=?")
            }
        
            override def close(): Unit = {
              pst.close()
              conn.close()
            }
        
            override def map(value: StationLog): StationLog = {
              println(getRuntimeContext.getTaskNameWithSubtasks)
              //查詢主叫號(hào)碼對(duì)應(yīng)的姓名
              pst.setString(1,value.callOut)
              val result: ResultSet = pst.executeQuery()
              if(result.next()){
                value.callOut=result.getString(1)
              }
              //查詢被叫號(hào)碼對(duì)應(yīng)的姓名
              pst.setString(1,value.callInt)
              val result2: ResultSet = pst.executeQuery()
              if(result2.next()){
                value.callInt=result2.getString(1)
              }
              value
            }
          }
        }
6. 底層 ProcessFunctionAPI
    概述:
        ProcessFunction是一個(gè)低層次的流處理操作,允許所有返回Stream的基礎(chǔ)構(gòu)建模塊:
        訪問(wèn)Event本身數(shù)據(jù)(比如:Event的時(shí)間, Event的當(dāng)前Key)
        管理狀態(tài) State(僅在keyed Stream中)
        管理定時(shí)器Timer( 包括: 注冊(cè)定時(shí)器,刪除定時(shí)器等)
    總而言之,ProcessFunction是Flink最底層的API,也是功能最強(qiáng)大的.
    例如:
        監(jiān)控每一個(gè)手機(jī),如果在5s內(nèi)呼叫它的通話都是失敗的,發(fā)出警告信息.
    注意:
        這個(gè)案例中會(huì)使用到狀態(tài)編程,請(qǐng)同學(xué)們只要知道狀態(tài)的意思,不需要掌握,
    code:
        package com.bjsxt.flink.transformation
        
        import com.bjsxt.flink.source.StationLog
        import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
        import org.apache.flink.streaming.api.functions.KeyedProcessFunction
        import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
        import org.apache.flink.util.Collector
        
        object TestProcessFunction {
        
          //監(jiān)控每一個(gè)手機(jī)號(hào)碼,如果這個(gè)號(hào)碼在5秒內(nèi)他巨,所有呼叫它的日志都是失敗的充坑,則發(fā)出告警信息
          //如果在5秒內(nèi)只要有一個(gè)呼叫不是fail則不用告警
          def main(args: Array[String]): Unit = {
        
            val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
            import org.apache.flink.streaming.api.scala._
        
            //讀取數(shù)據(jù)源
            val stream: DataStream[StationLog] = streamEnv.socketTextStream("hadoop101",8888)
              .map(line=>{
                var arr=line.split(",")
                new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
              })
        
            //計(jì)算
            val result: DataStream[String] = stream.keyBy(_.callInt)
              .process(new MonitorCallFail)
            result.print()
        
            streamEnv.execute()
          }
        
          //自定義一個(gè)底層的類
          class MonitorCallFail extends KeyedProcessFunction[String,StationLog,String]{
            //使用一個(gè)狀態(tài)對(duì)象記錄時(shí)間
            lazy val timeState :ValueState[Long] =getRuntimeContext.getState(new ValueStateDescriptor[Long]("time",classOf[Long]))
        
            override def processElement(value: StationLog, ctx: KeyedProcessFunction[String, StationLog, String]#Context, out: Collector[String]): Unit = {
              //從狀態(tài)中取得時(shí)間
              var time =timeState.value()
              if(time==0 && value.callType.equals("fail") ){ //表示第一次發(fā)現(xiàn)呼叫失敗,記錄當(dāng)前的時(shí)間
                //獲取當(dāng)前系統(tǒng)時(shí)間染突,并注冊(cè)定時(shí)器
                var nowTime =ctx.timerService().currentProcessingTime()
                //定時(shí)器在5秒后觸發(fā)
                var onTime =nowTime+8*1000L
                ctx.timerService().registerProcessingTimeTimer(onTime)
                //把觸發(fā)時(shí)間保存到狀態(tài)中
                timeState.update(onTime)
              }
              if (time!=0 && !value.callType.equals("fail")){ //表示有一次成功的呼叫,必須要?jiǎng)h除定時(shí)器
                ctx.timerService().deleteProcessingTimeTimer(time)
                timeState.clear() //清空狀態(tài)中的時(shí)間
              }
            }
        
            //時(shí)間到了捻爷,定時(shí)器執(zhí)行,
            override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext, out: Collector[String]): Unit = {
              var warnStr="觸發(fā)的時(shí)間:"+timestamp +" 手機(jī)號(hào) :"+ctx.getCurrentKey
              out.collect(warnStr)
              timeState.clear()
            }
          }
        }

7. 側(cè)輸出流 Side Output

概述:
    在Flink處理數(shù)據(jù)流時(shí),我們經(jīng)常會(huì)遇到這樣的情況: 在處理一個(gè)數(shù)據(jù)源時(shí),往往需要將該源中的不同類型的數(shù)據(jù)做分割處理,如果使用filter
    算子對(duì)數(shù)據(jù)源進(jìn)行篩選分割的話,勢(shì)必會(huì)造成數(shù)據(jù)流的多次復(fù)制,造成不必要的性能浪費(fèi);flink中的側(cè)輸出就是將數(shù)據(jù)流進(jìn)行分割,而不對(duì)流
    進(jìn)行復(fù)制的一種分流機(jī)制,
    Flink的側(cè)輸出的另一個(gè)作用就是對(duì)延時(shí)遲到的數(shù)據(jù)進(jìn)行處理,這樣就可以不必丟棄遲到的數(shù)據(jù).
案例: (根據(jù)基站的日志,請(qǐng)把呼叫成功的Stream(主流)和不成功的Stream(側(cè)流分別輸出)
code:
    package FlinkDemo.functions
    9
    import FlinkDemo.functions.FunctionClassTransformation.StationLog
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector
    
    object TestSideOutpurStream {
      // 導(dǎo)入Flink隱式轉(zhuǎn)換
      import org.apache.flink.streaming.api.scala._
    
      // 側(cè)輸出流首先需要定義一個(gè)流的標(biāo)簽
      val notSuccessTag = new OutputTag[StationLog]("not_success")
    
      def main(args: Array[String]): Unit = {
        //獲取flink實(shí)時(shí)流處理的環(huán)境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
    
        val data = env.readTextFile(getClass.getResource("station.log").getPath)
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
        val mainStream = data
          .process(new CreateSideOutputStream(notSuccessTag))
        // 得到測(cè)流
        val sideStream = mainStream.getSideOutput(notSuccessTag)
        mainStream.print("main")
        sideStream.print("sideOutput")
        env.execute()
      }
    
      class CreateSideOutputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {
        override def processElement(value: StationLog, context: ProcessFunction[StationLog, StationLog]#Context, collector: Collector[StationLog]): Unit = {
          //輸出主流
          if (value.callType.equals("success")) {
            collector.collect(value)
          }
          else {
            //輸出側(cè)流
            context.output(tag, value)}}}}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市份企,隨后出現(xiàn)的幾起案子也榄,更是在濱河造成了極大的恐慌,老刑警劉巖司志,帶你破解...
    沈念sama閱讀 217,826評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件甜紫,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡骂远,警方通過(guò)查閱死者的電腦和手機(jī)囚霸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)激才,“玉大人拓型,你說(shuō)我怎么就攤上這事∪衬眨” “怎么了劣挫?”我有些...
    開封第一講書人閱讀 164,234評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)东帅。 經(jīng)常有香客問(wèn)我压固,道長(zhǎng),這世上最難降的妖魔是什么靠闭? 我笑而不...
    開封第一講書人閱讀 58,562評(píng)論 1 293
  • 正文 為了忘掉前任邓夕,我火速辦了婚禮,結(jié)果婚禮上阎毅,老公的妹妹穿的比我還像新娘焚刚。我一直安慰自己,他們只是感情好扇调,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評(píng)論 6 392
  • 文/花漫 我一把揭開白布矿咕。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪碳柱。 梳的紋絲不亂的頭發(fā)上捡絮,一...
    開封第一講書人閱讀 51,482評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音莲镣,去河邊找鬼福稳。 笑死,一個(gè)胖子當(dāng)著我的面吹牛瑞侮,可吹牛的內(nèi)容都是我干的的圆。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼半火,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼越妈!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起钮糖,我...
    開封第一講書人閱讀 39,166評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤梅掠,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后店归,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體阎抒,經(jīng)...
    沈念sama閱讀 45,608評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評(píng)論 3 336
  • 正文 我和宋清朗相戀三年消痛,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了且叁。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,926評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡肄满,死狀恐怖谴古,靈堂內(nèi)的尸體忽然破棺而出质涛,到底是詐尸還是另有隱情稠歉,我是刑警寧澤,帶...
    沈念sama閱讀 35,644評(píng)論 5 346
  • 正文 年R本政府宣布汇陆,位于F島的核電站怒炸,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏毡代。R本人自食惡果不足惜阅羹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望教寂。 院中可真熱鬧捏鱼,春花似錦、人聲如沸酪耕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至看尼,卻和暖如春递鹉,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背藏斩。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工躏结, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人狰域。 一個(gè)月前我還...
    沈念sama閱讀 48,063評(píng)論 3 370
  • 正文 我出身青樓媳拴,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親北专。 傳聞我的和親對(duì)象是個(gè)殘疾皇子禀挫,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Flink程序是實(shí)現(xiàn)分布式集合轉(zhuǎn)換的常規(guī)程序(例如, filtering, mapping, updating s...
    大菜鳥_閱讀 2,464評(píng)論 0 1
  • 摘于博古林全集 夜晚拓颓,如今隨著我們生命進(jìn)程不斷地推進(jìn)语婴,那種自我感觸變的更加厚實(shí)沉淀起來(lái),越來(lái)越多更為實(shí)在的日常...
    博古林V雙木居士閱讀 934評(píng)論 1 4
  • 易經(jīng)是十三經(jīng)之首。 榮格的老師說(shuō)场航,讀懂了易經(jīng)缠导,你就知道如何做心理治療。 榮格接觸了易經(jīng)之后溉痢,創(chuàng)造了深度分析心理學(xué)僻造。...
    春江花月夜9999閱讀 347評(píng)論 2 2
  • 面對(duì)監(jiān)理的抱怨,辛苦的施工工人及其相關(guān)負(fù)責(zé)人都很無(wú)奈這種難以滿足的驗(yàn)收標(biāo)準(zhǔn)孩饼,由不得怨恨監(jiān)理的吹毛求疵髓削。一位剛畢業(yè)的...
    匆匆不離去閱讀 277評(píng)論 0 0
  • 感恩伙伴一早叫起,還帶的早餐 感恩店鋪伙伴認(rèn)真負(fù)責(zé)镀娶,來(lái)春裝了立膛,整理庫(kù)房 感恩今天的學(xué)習(xí),很有優(yōu)秀的分享梯码,值得學(xué)習(xí) ...
    十八菩提子閱讀 113評(píng)論 0 0