Flink Window(窗口) 詳解
概述:
windows 計算是流式計算中非常常用的數(shù)據(jù)計算方式之一.通過按照固定時間或長度將數(shù)據(jù)流切分成不同窗口,
然后對數(shù)據(jù)進行相應的聚合運算,從而得到一定時間范圍內(nèi)的統(tǒng)計結果,
eg:
例如統(tǒng)計最近5min 內(nèi)某基站的呼叫數(shù),此時基站的數(shù)據(jù)在不斷地產(chǎn)生,但是通過5min中的窗口將數(shù)據(jù)限定在固定
時間范圍內(nèi),就可以對該范圍內(nèi)的有界數(shù)據(jù)執(zhí)行聚合處理,得出最近5min的基站的呼叫數(shù)量.
1. Window 分類
1. Global Window 和 keyed Window
概述:
在運用窗口計算時,Flink根據(jù)上游數(shù)據(jù)集是否為KeyedStream類型,對應的Window也會有所不同.
Keyed Window :
上游數(shù)據(jù)集如果是KeyedStream類型,則調(diào)用DataStream API 的Window()方法,數(shù)據(jù)會根據(jù)Key在不同的Task實例
中并行分別計算,最后得出針對每個Key統(tǒng)計的結果.
Global Window:
如果是Non-Keyey類型,則調(diào)用WindowsAll()方法,所有的數(shù)據(jù)都會在窗口算子中匯到一個Task中計算,并得出全局
統(tǒng)計結果
eg:
// Global Window
data.windowAll(自定義的WindowAssigner)
//KeyedWindow
data.keyBy(_.sid)
.window(自定義的WindowAssigner)
2. Time Window 和Count Window
概述:
基于業(yè)務數(shù)據(jù)的方面考慮,Flink又支持兩種類型的窗口,一種是基于時間的窗口叫Time Window,
還有一種基于輸入數(shù)據(jù)量的窗口叫Count Window
3. Time Window(時間窗口)
概述:
根據(jù)不同的業(yè)務場景,Time Window也可以分為三種類型,分別是滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口叫Count Window
1. 滾動窗口(Tumbling Window)
概述:
滾動窗口是根據(jù)固定時間進行切分,且窗口和窗口之間的元素互不重疊,
這種類型的窗口最大特點是比較簡單,只需要指定一個窗口長度(window size)
eg:
// 每個5s統(tǒng)計每個基站的日志數(shù)量
data.map((_.sid,1))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
//window(TumblingEventTImeWindows.of(Time.seconds(5)))
.sum(1)//聚合
其中時間間隔可以是Time.milliseconds(x),Time.seconds(x)或Time.minutes(x)
2. 滑動窗口(Sliding Window)
概述:
滑動窗口也是一種比較常見的窗口類型,其特點是在滾動窗口基礎上增加了窗口滑動時間(Slide TIme),且允許窗口
數(shù)據(jù)發(fā)生重疊,當Windows size固定之后,窗口并不像滾動窗口按照windows Size向前移動,而是根據(jù)設定的Slide Time
向前滑動.
窗口之間的數(shù)據(jù)重疊大小根據(jù)Windows Size和Slide Time決定,當SlideTime小于Windows size 便會發(fā)生窗口重疊
eg:
//每隔3s計算最近5s內(nèi),每個基站的日志數(shù)量
data.map((_,1))
.keyBy(_._1)
.timeWindow(Time.seconds(5),Time.seconds(3))
.sum(1)
3. 會話窗口(Session Window)
概述:
會話窗口主要是將某段時間內(nèi)活躍度較高的數(shù)據(jù)聚合成一個窗口進行計算,窗口的觸發(fā)的條件是Session Gap,是指在規(guī)定
的時間內(nèi)如果沒有數(shù)據(jù)活躍接入,則認為窗口結束,然后觸發(fā)窗口計算結果,
注意:
需要注意的是如果數(shù)據(jù)一直不間斷地進入窗口,也會導致窗口始終不觸發(fā)的情況.
與滑動窗口不同的是,Session Windows不需要有固定Window size和slide tinme ,
需要定義SEssion gap,來規(guī)定不活躍數(shù)據(jù)的時間上限即可
eg:
// 3s內(nèi)如果沒有數(shù)據(jù)接入,則計算每個基站的日志數(shù)量
data.map((_.sid,1))
.keyBy(_._1)
.window(EventTimeSessionWindows.withGap(Time.seconds(3))))
.sum(1)
4. Count Window(數(shù)量窗口)
概述:
Count Window 也有滾動窗口豪筝、滑動窗口等.使用較少
Window 的 API
概述:
在實際案例中Keyed Window 使用最多,所以我們需要掌握Keyed Window的算子,在每個窗口算子中包含了
Windows Assigner属铁、Windows Trigger(窗口觸發(fā)器)黍少、Evictor(數(shù)據(jù)剔除器)眷茁、Lateness(時延設定)、
Output (輸出標簽)以及Windows Function,其中Windows Assigner和Windows Functions是所有窗口算子
必須指定的屬性,其余的屬性都是根據(jù)實際情況選擇指定.
code:
stream.keyBy(...)是Keyed類型數(shù)據(jù)集
.window(...)//指定窗口分配器類型
[.trigger(...)]//指定觸發(fā)器類型(可選)
[.evictor(...)] // 指定evictor或者不指定(可選)
[.allowedLateness(...)] //指定是否延遲處理數(shù)據(jù)(可選)
[.sideOutputLateData(...)] // 指定Output lag(可選)
.reduce/aggregate/fold/apply() //指定窗口計算函數(shù)
[.getSideOutput(...)] //根據(jù)Tag輸出數(shù)據(jù)(可選)
intro:
Windows Assigner : 指定窗口的類型,定義如何將數(shù)據(jù)流分配到一個或多個窗口
Windows Trigger : 指定窗口觸發(fā)的時機,定義窗口滿足什么樣的條件觸發(fā)計算
Evictor : 用于數(shù)據(jù)剔除
allowedLateness : 標記是否處理遲到數(shù)據(jù),當遲到數(shù)據(jù)達到窗口是否觸發(fā)計算
Output Tag: 標記輸出標簽,然后在通過getSideOutput將窗口中的數(shù)據(jù)根據(jù)標簽輸出
Windows Function: 定義窗口上數(shù)據(jù)處理的邏輯,例如對數(shù)據(jù)進行Sum操作
窗口聚合函數(shù)
概述:
如果定義了Window Assigner ,下一步就可以定義窗口內(nèi)數(shù)據(jù)的計算邏輯,這也就是Window Function的定義,
Flink提供了四種類型的Window Function,分別為 ReduceFunction暑始、AggregateFunction以及ProcessWindowFunction,
(sum和max)等.
前3種類型的Window Function按照計算原理的不同可以分為兩大類
1. 一類是增量聚合函數(shù): 對應有ReduceFunction淋袖、AggregateFunction;
2. 另一類是全量窗口函數(shù),對應有ProcessWindowFunction(WindowFunction)
差異:
1. 增量聚合函數(shù)計算性能較高,占用內(nèi)存空間少,主要因為基于中間狀態(tài)的計算結果,窗口中只維護中間結果狀態(tài)值,
不需要緩存原始數(shù)據(jù).
2. 全量窗口函數(shù)使用的代價相對較高,性能比較弱,主要因為此時算子需要對所有屬于該窗口的接入數(shù)據(jù)進行緩存,
然后等到窗口觸發(fā)的時候,對所有的原始數(shù)據(jù)進行匯總計算.
1. ReduceFunction
概述
ReduceFunction 定義了對輸入的兩個相同類型的數(shù)據(jù)元素按照指定的計算方法進行聚合的邏輯,
然后輸出類型相同的一個結果元素
code:
// 每隔5s統(tǒng)計每個基站的日志數(shù)量
data.map((_.sid,1))
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(TIme.seconds(5)))
.reduce((v1,v2)=>(v1._1,v1._2+v2._2))
2. AggregateFunction
概述:
和ReduceFunction 相似,AggregateFunction也是基于中間狀態(tài)計算結果的增量計算函數(shù),但AggregateFunctino在窗口
計算上更加通用,AggregateFunction接口相對ReduceFunction更加靈活.實現(xiàn)復雜度也相對較高.
AggregateFunction接口中定義了三個需要復寫的方法,其中
add()定義數(shù)據(jù)添加的邏輯,
getResult()定義了根據(jù)accmulator計算結果的邏輯,
merge()方法定義合并accumulator的邏輯
code:
//每隔3s內(nèi)計算最近5s內(nèi),每個基站的日志數(shù)量
val data = env.readTextFile("D:\\Workspace\\IdeaProjects\\F1Demo\\src\\FlinkDemo\\functions\\station.log")
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
val result = data.map(stationLog => (stationLog.sid, 1))
.keyBy(_._1)
.timeWindow(Time.seconds(5), Time.seconds(3))
// new AggregateFunction[In,Acc,Out]
.aggregate(new AggregateFunction[(String, Int), (String, Long), (String, Long)] {
override def createAccumulator(): (String, Long) = ("", 0)
override def add(in: (String, Int), acc: (String, Long)): (String, Long) = {
(in._1, acc._2 + in._2)
}
override def getResult(acc: (String, Long)): (String, Long) = {
print(acc)
acc
}
override def merge(acc: (String, Long), acc1: (String, Long)): (String, Long) = {
(acc._1, acc1._2 + acc._2)
}
})
env.execute()
}
3. ProcessWindowFunction
概念:
前面提到的ReduceFunction和AggregateFunction都是基于中間狀態(tài)實現(xiàn)增量計算的窗口函數(shù),雖然已經(jīng)滿足絕大
多數(shù)場景,但在某些情況下,統(tǒng)計更復雜的指標可能需要依賴于窗口中所有的數(shù)據(jù)元素,或需要操作窗口中的狀態(tài)數(shù)據(jù)
和窗口元數(shù)據(jù),這時就需要使用到ProcessWindowsFunction,ProcessWindowsFunction能夠更加靈活地支持基于窗口
全部數(shù)據(jù)元素的結果計算,例如對整個窗口數(shù)據(jù)排序取TopN,這樣的需要就必須使用ProcessWindowFunction.
code:
val result = data.map(stationLog => ((stationLog.sid, 1)))
.keyBy(_._1)
//.timeWindow(Time.seconds(5))
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// ProcessWindowFunction[In,Out,Key,Window]
.process(new ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] {
//一個窗口結束的時候調(diào)用一次(一個分組執(zhí)行一次)
override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = {
print("----")
//注意:整個窗口的數(shù)據(jù)保存到Iterable,里面有很多行數(shù)據(jù)节值。Iterable的size就是日志的總條數(shù)
out.collect((key, elements.size))
}
}).print()
env.execute()
案例:
1. AggregateFunction
import FlinkDemo.functions.FunctionClassTransformation.StationLog
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object WindowDemos {
// 導入Flink隱式轉(zhuǎn)換
import org.apache.flink.streaming.api.scala._
def main(args: Array[String]): Unit = {
//獲取flink實時流處理的環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val data = env.socketTextStream("localhost", 9999)
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
val result = data.map(stationLog => (stationLog.sid, 1))
.keyBy(_._1)
// timeWindow(t1,t2) t1表示窗口大小,t2表示滑動窗口大小
.timeWindow(Time.seconds(10), Time.seconds(3))
// new AggregateFunction[In,Acc,Out]
.aggregate(new MyAggregateFunction, new MyWindowFunction)
.print()
env.execute()
}
/**
* AggregateFunction<IN, ACC, OUT>
* 1. In 表示輸入?yún)?shù)類型
* 2. ACC 表示累加器類型
* 3. Out 表示輸出值類型
* add => 表示來一條數(shù)據(jù)執(zhí)行一次
* getResult => 表示在窗口結束的時候執(zhí)行一次
*/
class MyAggregateFunction extends AggregateFunction[(String, Int), (String, Long), (String, Long)] {
// 初始化一個累加器,開始的時候為0
override def createAccumulator(): (String, Long) = ("", 0)
override def add(in: (String, Int), acc: (String, Long)): (String, Long) = {
(in._1, acc._2 + in._2)
}
override def getResult(acc: (String, Long)): (String, Long) = {
print(acc)
acc
}
//合并統(tǒng)計的值
override def merge(acc: (String, Long), acc1: (String, Long)): (String, Long) = {
(acc._1, acc1._2 + acc._2)
}
}
/**
* WindowFunction[IN, OUT, KEY, W <: Window]
* 1. In 表示輸入?yún)?shù)類型
* 2. OUT 表示輸出參數(shù)類型
* 3. key表示 key的類型
* 4. W 表示window類型時間窗口
* 輸入數(shù)據(jù)來自于AggregateFunction,在窗口結束的時候先執(zhí)行AggregateFunction對象的getResult,
* 然后在執(zhí)行apply()
*/
class MyWindowFunction extends WindowFunction[(String, Long), (String, Long), String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, Long)]): Unit = {
// 獲取迭代器的第一個(迭代器中只有一個值)
out.collect((key, input.iterator.next()._2))
}
}
}