Flink Window窗口詳解

Flink Window(窗口) 詳解

概述:
    windows 計算是流式計算中非常常用的數(shù)據(jù)計算方式之一.通過按照固定時間或長度將數(shù)據(jù)流切分成不同窗口,
    然后對數(shù)據(jù)進行相應的聚合運算,從而得到一定時間范圍內(nèi)的統(tǒng)計結果,
eg:
    例如統(tǒng)計最近5min 內(nèi)某基站的呼叫數(shù),此時基站的數(shù)據(jù)在不斷地產(chǎn)生,但是通過5min中的窗口將數(shù)據(jù)限定在固定
    時間范圍內(nèi),就可以對該范圍內(nèi)的有界數(shù)據(jù)執(zhí)行聚合處理,得出最近5min的基站的呼叫數(shù)量.

1. Window 分類

1. Global Window 和 keyed Window
概述:
    在運用窗口計算時,Flink根據(jù)上游數(shù)據(jù)集是否為KeyedStream類型,對應的Window也會有所不同.
Keyed Window :
    上游數(shù)據(jù)集如果是KeyedStream類型,則調(diào)用DataStream API 的Window()方法,數(shù)據(jù)會根據(jù)Key在不同的Task實例
    中并行分別計算,最后得出針對每個Key統(tǒng)計的結果.
Global Window:
    如果是Non-Keyey類型,則調(diào)用WindowsAll()方法,所有的數(shù)據(jù)都會在窗口算子中匯到一個Task中計算,并得出全局
    統(tǒng)計結果
eg:
    // Global Window
    data.windowAll(自定義的WindowAssigner)
    //KeyedWindow
    data.keyBy(_.sid)
    .window(自定義的WindowAssigner)
2. Time Window 和Count Window
概述:
    基于業(yè)務數(shù)據(jù)的方面考慮,Flink又支持兩種類型的窗口,一種是基于時間的窗口叫Time Window,
    還有一種基于輸入數(shù)據(jù)量的窗口叫Count Window
3. Time Window(時間窗口)
概述:
    根據(jù)不同的業(yè)務場景,Time Window也可以分為三種類型,分別是滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口叫Count Window
1. 滾動窗口(Tumbling Window)
概述:
    滾動窗口是根據(jù)固定時間進行切分,且窗口和窗口之間的元素互不重疊,
    這種類型的窗口最大特點是比較簡單,只需要指定一個窗口長度(window size)
eg:
    // 每個5s統(tǒng)計每個基站的日志數(shù)量
    data.map((_.sid,1))
    .keyBy(_._1)
    .timeWindow(Time.seconds(5))
    //window(TumblingEventTImeWindows.of(Time.seconds(5)))
    .sum(1)//聚合
    其中時間間隔可以是Time.milliseconds(x),Time.seconds(x)或Time.minutes(x)
2. 滑動窗口(Sliding Window)
概述:
    滑動窗口也是一種比較常見的窗口類型,其特點是在滾動窗口基礎上增加了窗口滑動時間(Slide TIme),且允許窗口
    數(shù)據(jù)發(fā)生重疊,當Windows size固定之后,窗口并不像滾動窗口按照windows Size向前移動,而是根據(jù)設定的Slide Time
    向前滑動.
    窗口之間的數(shù)據(jù)重疊大小根據(jù)Windows Size和Slide Time決定,當SlideTime小于Windows size 便會發(fā)生窗口重疊
eg: 
    //每隔3s計算最近5s內(nèi),每個基站的日志數(shù)量
    data.map((_,1))
    .keyBy(_._1)
    .timeWindow(Time.seconds(5),Time.seconds(3))
    .sum(1)
3. 會話窗口(Session Window)
概述:
    會話窗口主要是將某段時間內(nèi)活躍度較高的數(shù)據(jù)聚合成一個窗口進行計算,窗口的觸發(fā)的條件是Session Gap,是指在規(guī)定
    的時間內(nèi)如果沒有數(shù)據(jù)活躍接入,則認為窗口結束,然后觸發(fā)窗口計算結果,
注意:
    需要注意的是如果數(shù)據(jù)一直不間斷地進入窗口,也會導致窗口始終不觸發(fā)的情況.
    與滑動窗口不同的是,Session Windows不需要有固定Window size和slide tinme ,
    需要定義SEssion gap,來規(guī)定不活躍數(shù)據(jù)的時間上限即可
eg:
    // 3s內(nèi)如果沒有數(shù)據(jù)接入,則計算每個基站的日志數(shù)量
    data.map((_.sid,1))
    .keyBy(_._1)
    .window(EventTimeSessionWindows.withGap(Time.seconds(3))))
    .sum(1)
4. Count Window(數(shù)量窗口)
概述:
    Count Window 也有滾動窗口豪筝、滑動窗口等.使用較少

Window 的 API

概述:
    在實際案例中Keyed Window 使用最多,所以我們需要掌握Keyed Window的算子,在每個窗口算子中包含了 
    Windows Assigner属铁、Windows Trigger(窗口觸發(fā)器)黍少、Evictor(數(shù)據(jù)剔除器)眷茁、Lateness(時延設定)、
    Output (輸出標簽)以及Windows Function,其中Windows Assigner和Windows Functions是所有窗口算子
    必須指定的屬性,其余的屬性都是根據(jù)實際情況選擇指定.
code:
    stream.keyBy(...)是Keyed類型數(shù)據(jù)集
    .window(...)//指定窗口分配器類型
    [.trigger(...)]//指定觸發(fā)器類型(可選)
    [.evictor(...)] // 指定evictor或者不指定(可選)
    [.allowedLateness(...)] //指定是否延遲處理數(shù)據(jù)(可選)
    [.sideOutputLateData(...)] // 指定Output lag(可選)
    .reduce/aggregate/fold/apply() //指定窗口計算函數(shù)
    [.getSideOutput(...)] //根據(jù)Tag輸出數(shù)據(jù)(可選)
intro:
    Windows Assigner : 指定窗口的類型,定義如何將數(shù)據(jù)流分配到一個或多個窗口
    Windows Trigger : 指定窗口觸發(fā)的時機,定義窗口滿足什么樣的條件觸發(fā)計算
    Evictor : 用于數(shù)據(jù)剔除
    allowedLateness : 標記是否處理遲到數(shù)據(jù),當遲到數(shù)據(jù)達到窗口是否觸發(fā)計算
    Output Tag: 標記輸出標簽,然后在通過getSideOutput將窗口中的數(shù)據(jù)根據(jù)標簽輸出
    Windows Function: 定義窗口上數(shù)據(jù)處理的邏輯,例如對數(shù)據(jù)進行Sum操作

窗口聚合函數(shù)

概述:
    如果定義了Window Assigner ,下一步就可以定義窗口內(nèi)數(shù)據(jù)的計算邏輯,這也就是Window Function的定義,
    Flink提供了四種類型的Window Function,分別為 ReduceFunction暑始、AggregateFunction以及ProcessWindowFunction,
    (sum和max)等.
    前3種類型的Window Function按照計算原理的不同可以分為兩大類
1. 一類是增量聚合函數(shù): 對應有ReduceFunction淋袖、AggregateFunction;
2. 另一類是全量窗口函數(shù),對應有ProcessWindowFunction(WindowFunction)
差異:
    1. 增量聚合函數(shù)計算性能較高,占用內(nèi)存空間少,主要因為基于中間狀態(tài)的計算結果,窗口中只維護中間結果狀態(tài)值,
    不需要緩存原始數(shù)據(jù).
    2. 全量窗口函數(shù)使用的代價相對較高,性能比較弱,主要因為此時算子需要對所有屬于該窗口的接入數(shù)據(jù)進行緩存,
    然后等到窗口觸發(fā)的時候,對所有的原始數(shù)據(jù)進行匯總計算.
1. ReduceFunction
概述
    ReduceFunction 定義了對輸入的兩個相同類型的數(shù)據(jù)元素按照指定的計算方法進行聚合的邏輯,
    然后輸出類型相同的一個結果元素
code:
    // 每隔5s統(tǒng)計每個基站的日志數(shù)量
    data.map((_.sid,1))
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(TIme.seconds(5)))
    .reduce((v1,v2)=>(v1._1,v1._2+v2._2))
2. AggregateFunction
概述:
    和ReduceFunction 相似,AggregateFunction也是基于中間狀態(tài)計算結果的增量計算函數(shù),但AggregateFunctino在窗口
    計算上更加通用,AggregateFunction接口相對ReduceFunction更加靈活.實現(xiàn)復雜度也相對較高. 
    AggregateFunction接口中定義了三個需要復寫的方法,其中
    add()定義數(shù)據(jù)添加的邏輯,
    getResult()定義了根據(jù)accmulator計算結果的邏輯,
    merge()方法定義合并accumulator的邏輯
code:
    //每隔3s內(nèi)計算最近5s內(nèi),每個基站的日志數(shù)量
        val data = env.readTextFile("D:\\Workspace\\IdeaProjects\\F1Demo\\src\\FlinkDemo\\functions\\station.log")
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
        val result = data.map(stationLog => (stationLog.sid, 1))
          .keyBy(_._1)
          .timeWindow(Time.seconds(5), Time.seconds(3))
          // new AggregateFunction[In,Acc,Out]
          .aggregate(new AggregateFunction[(String, Int), (String, Long), (String, Long)] {
          override def createAccumulator(): (String, Long) = ("", 0)
    
          override def add(in: (String, Int), acc: (String, Long)): (String, Long) = {
            (in._1, acc._2 + in._2)
          }
    
          override def getResult(acc: (String, Long)): (String, Long) = {
            print(acc)
            acc
          }
    
          override def merge(acc: (String, Long), acc1: (String, Long)): (String, Long) = {
            (acc._1, acc1._2 + acc._2)
          }
        })
    
        env.execute()
      }
3. ProcessWindowFunction
概念:
    前面提到的ReduceFunction和AggregateFunction都是基于中間狀態(tài)實現(xiàn)增量計算的窗口函數(shù),雖然已經(jīng)滿足絕大
    多數(shù)場景,但在某些情況下,統(tǒng)計更復雜的指標可能需要依賴于窗口中所有的數(shù)據(jù)元素,或需要操作窗口中的狀態(tài)數(shù)據(jù)
    和窗口元數(shù)據(jù),這時就需要使用到ProcessWindowsFunction,ProcessWindowsFunction能夠更加靈活地支持基于窗口
    全部數(shù)據(jù)元素的結果計算,例如對整個窗口數(shù)據(jù)排序取TopN,這樣的需要就必須使用ProcessWindowFunction.
code:
    val result = data.map(stationLog => ((stationLog.sid, 1)))
  .keyBy(_._1)
  //.timeWindow(Time.seconds(5))
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  // ProcessWindowFunction[In,Out,Key,Window]
  .process(new ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] {
  //一個窗口結束的時候調(diào)用一次(一個分組執(zhí)行一次)
  override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = {
    print("----")
    //注意:整個窗口的數(shù)據(jù)保存到Iterable,里面有很多行數(shù)據(jù)节值。Iterable的size就是日志的總條數(shù)
    out.collect((key, elements.size))
  }
}).print()
env.execute()

案例:

1. AggregateFunction
    import FlinkDemo.functions.FunctionClassTransformation.StationLog
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    object WindowDemos {
    
      // 導入Flink隱式轉(zhuǎn)換
      import org.apache.flink.streaming.api.scala._
    
      def main(args: Array[String]): Unit = {
        //獲取flink實時流處理的環(huán)境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val data = env.socketTextStream("localhost", 9999)
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
        val result = data.map(stationLog => (stationLog.sid, 1))
          .keyBy(_._1)
          // timeWindow(t1,t2) t1表示窗口大小,t2表示滑動窗口大小
          .timeWindow(Time.seconds(10), Time.seconds(3))
          // new AggregateFunction[In,Acc,Out]
          .aggregate(new MyAggregateFunction, new MyWindowFunction)
          .print()
    
        env.execute()
      }
    
      /**
        * AggregateFunction<IN, ACC, OUT>
        *   1. In 表示輸入?yún)?shù)類型
        *   2. ACC 表示累加器類型
        *   3. Out 表示輸出值類型
        * add => 表示來一條數(shù)據(jù)執(zhí)行一次
        * getResult => 表示在窗口結束的時候執(zhí)行一次
        */
      class MyAggregateFunction extends AggregateFunction[(String, Int), (String, Long), (String, Long)] {
        // 初始化一個累加器,開始的時候為0
        override def createAccumulator(): (String, Long) = ("", 0)
    
        override def add(in: (String, Int), acc: (String, Long)): (String, Long) = {
          (in._1, acc._2 + in._2)
        }
    
        override def getResult(acc: (String, Long)): (String, Long) = {
          print(acc)
          acc
        }
        //合并統(tǒng)計的值
        override def merge(acc: (String, Long), acc1: (String, Long)): (String, Long) = {
          (acc._1, acc1._2 + acc._2)
        }
      }
    
      /**
        * WindowFunction[IN, OUT, KEY, W <: Window]
        * 1. In 表示輸入?yún)?shù)類型
        * 2. OUT 表示輸出參數(shù)類型
        * 3. key表示 key的類型
        * 4. W 表示window類型時間窗口
        * 輸入數(shù)據(jù)來自于AggregateFunction,在窗口結束的時候先執(zhí)行AggregateFunction對象的getResult,
        * 然后在執(zhí)行apply()
        */
      class MyWindowFunction extends WindowFunction[(String, Long), (String, Long), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {
          // 獲取迭代器的第一個(迭代器中只有一個值)
          out.collect((key, input.iterator.next()._2))
        }
      }
    
    }
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末徙硅,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子搞疗,更是在濱河造成了極大的恐慌嗓蘑,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件匿乃,死亡現(xiàn)場離奇詭異桩皿,居然都是意外死亡,警方通過查閱死者的電腦和手機幢炸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門泄隔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人宛徊,你說我怎么就攤上這事佛嬉。” “怎么了闸天?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵暖呕,是天一觀的道長。 經(jīng)常有香客問我苞氮,道長湾揽,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任葱淳,我火速辦了婚禮钝腺,結果婚禮上,老公的妹妹穿的比我還像新娘赞厕。我一直安慰自己艳狐,他們只是感情好,可當我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布皿桑。 她就那樣靜靜地躺著毫目,像睡著了一般蔬啡。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上镀虐,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天箱蟆,我揣著相機與錄音,去河邊找鬼刮便。 笑死空猜,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的恨旱。 我是一名探鬼主播辈毯,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼搜贤!你這毒婦竟也來了谆沃?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤仪芒,失蹤者是張志新(化名)和其女友劉穎唁影,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體掂名,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡据沈,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了铆隘。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片卓舵。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡南用,死狀恐怖膀钠,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情裹虫,我是刑警寧澤肿嘲,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站筑公,受9級特大地震影響雳窟,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜匣屡,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一封救、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧捣作,春花似錦誉结、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽掉盅。三九已至,卻和暖如春以舒,著一層夾襖步出監(jiān)牢的瞬間趾痘,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工蔓钟, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留永票,地道東北人。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓滥沫,卻偏偏與公主長得像瓦侮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子佣谐,可洞房花燭夜當晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 原文連接 https://ci.apache.org/projects/flink/flink-docs-rele...
    Alex90閱讀 3,458評論 0 5
  • 說明:本文為《Flink大數(shù)據(jù)項目實戰(zhàn)》學習筆記肚吏,想通過視頻系統(tǒng)學習Flink這個最火爆的大數(shù)據(jù)計算框架的同學,推...
    大數(shù)據(jù)研習社閱讀 2,132評論 0 0
  • 摘要 Flink 認為 Batch 是 Streaming 的一個特例狭魂,所以 Flink 底層引擎是一個流式引擎罚攀,...
    尼小摩閱讀 5,091評論 0 13
  • 概述 Apache Flink 是一個為生產(chǎn)環(huán)境而生的流處理器斋泄,具有易于使用的 API,可以用于定義高級流分析程序...
    木戎閱讀 4,559評論 0 2
  • 在這條逐夢的路上太久,經(jīng)歷得越多睬涧,傷害也越多募胃,我們總會在麻木中不經(jīng)意地丟掉夢想,偏離了離夢想最近的那條軌道畦浓,往往就...
    上官冒冒閱讀 200評論 0 0