時間語義
上圖是數(shù)據(jù)流式處理過程胀溺,涉及到兩個重要的時間點(diǎn):事件時間(Event Time)和處理時間(Processing Time)介杆。
- 事件時間(Event Time):即數(shù)據(jù)產(chǎn)生的時間;
- 處理時間(Processing Time):即數(shù)據(jù)真正被處理的時刻龙誊;
我們在處理數(shù)據(jù)時抚垃,以哪種時間作為衡量標(biāo)準(zhǔn),就是所謂的時間語義問題(Notions of Time)载迄。由于分布式系統(tǒng)中網(wǎng)絡(luò)傳輸?shù)难舆t和時鐘漂移讯柔,處理時間相對事件發(fā)生的時間會有滯后。在這種情況下护昧,就不能簡單地把數(shù)據(jù)自帶的時間戳當(dāng)作時鐘了魂迄,而需要用另外的標(biāo)志來表示事件時間進(jìn)展,在 Flink 中把它叫作事件時間的“水位線”(Watermarks)惋耙。
水位線(Watermark)
我們把時鐘也數(shù)據(jù)的形式傳遞出去捣炬,告訴下游任務(wù)當(dāng)前時間的進(jìn)展熊昌;而且這個時鐘的傳遞不會因?yàn)榇翱诰酆现惖倪\(yùn)算而停滯。一種簡單的想法是湿酸,在數(shù)據(jù)流中加入一個時鐘標(biāo)記婿屹,記錄當(dāng)前的事件時間;這個標(biāo)記可以直接到廣播下游推溃,當(dāng)下游任務(wù)收到這個標(biāo)記昂利,就可以更新自己的時鐘了。由于類似流水中用來當(dāng)做標(biāo)志的幾號铁坎,在Flink中蜂奸,這種用來衡量事件時間(Event Time)進(jìn)展的標(biāo)記,就被稱作水位線(Watermark)硬萍。
水位線可以看作一條特殊的數(shù)據(jù)記錄扩所,它是插入到數(shù)據(jù)流中的一個標(biāo)記點(diǎn),主要內(nèi)容就是一個時間戳朴乖,用來指示當(dāng)前的事件時間祖屏。而他插入流中的位置,就應(yīng)該是在某個數(shù)據(jù)到來之后买羞;這樣就可以從這個數(shù)據(jù)中提取時間戳袁勺,作為當(dāng)前水位線的時間戳了。
理想的水位線是有序的哩都,但是現(xiàn)實(shí)中由于不可控因素常常會有少量亂序的數(shù)據(jù)魁兼。
水位線代表當(dāng)前事件時間時鐘漠嵌,而且可以在數(shù)據(jù)的時間戳基礎(chǔ)上加一些延遲來保證不丟失數(shù)據(jù)咐汞,這一點(diǎn)對于亂序流的正確處理非常重要。水位線的特性:
- 水位線是插入到數(shù)據(jù)流中的一個標(biāo)記儒鹿,可以認(rèn)為是一個特殊的數(shù)據(jù)化撕;
- 水位線主要的內(nèi)容是一個時間戳,用來表示當(dāng)前事件時間的進(jìn)展约炎;
- 水位顯示基于數(shù)據(jù)的時間戳生成的植阴;
- 水位線的時間戳必須單調(diào)遞增,以確保任務(wù)的事件時間時鐘一直向前推進(jìn)圾浅;
- 水位線可以通過設(shè)置延遲掠手,以保證正確處理亂序數(shù)據(jù);
- 一個水位線(Watermark)t表示在當(dāng)前流中事件時間已經(jīng)到達(dá)了時間戳t狸捕,這導(dǎo)表t之前的所有數(shù)據(jù)都到齊了喷鸽,之后流中不會出現(xiàn)時間戳t'<t的數(shù)據(jù);
水位線是Flink流處理中保證結(jié)果正確性的核心機(jī)制灸拍,它往往會跟窗口一起配合做祝,完成對亂序數(shù)據(jù)的正確處理砾省。
如何生成水位線
在生成水位線的時候,如果希望計(jì)算結(jié)果更準(zhǔn)確混槐,可以將水位線延遲設(shè)置得更高一些编兄,等待時間越長,越不容易漏掉數(shù)據(jù)声登,但是這樣時效性降低了狠鸳。而如果將等待時間設(shè)置過短則會遺漏掉部分?jǐn)?shù)據(jù),雖然Flink提供了處理遲到數(shù)據(jù)的方法捌刮,但是需要分開處理碰煌。因此如何設(shè)置延遲是一個需要根據(jù)實(shí)際情況權(quán)衡的問題舒岸。
在Flink的DataStream API中绅作,有一個單獨(dú)用于生成水位線的方法:assignTimestampAndWatermarks()
,他主要用來為流中的數(shù)據(jù)分配時間戳蛾派,并生成水位線來顯示時間俄认。該方法需要傳入一個WatermarkStrategy
作為參數(shù),WatermarkStrategy 中包含了一個“時間戳分配器”TimestampAssigner和一個“水位線生成器”WatermarkGenerator:
-
TimestampAssigner
, 主要負(fù)責(zé)從流中數(shù)據(jù)元素的某個字段中提取時間戳洪乍,并分配給元素眯杏。時間戳的分配是生成水位線的基礎(chǔ); -
WatermarkGenerator
, 主要負(fù)責(zé)按照既定的方式壳澳,基于時間戳生成水位線岂贩。在WatermarkGenerator
中主要有兩個方法:onEvent和onPeriodicEmit;-
onEvent
, 每個事件(數(shù)據(jù))到來都會調(diào)用的方法,他的參數(shù)有當(dāng)前事件巷波、時間戳萎津,以及允許發(fā)出水位線的一個WatermarkOutput,可以基于事件做各種操作抹镊; -
onPeriodiEmit
, 周期性調(diào)用的方法锉屈,可以由WatermarkOutput
發(fā)出水位線。周期時間為處理時間垮耳,可以調(diào)用環(huán)境配置的setAutoWatermarkInterval()方法來設(shè)置颈渊,默認(rèn)為200ms;
-
代碼
Flink提供了內(nèi)置的水位線生成器:
- 有序流终佛,對于有序流俊嗽,主要特點(diǎn)就是時間戳單調(diào)增長(Monotonously Increasing Timestamps),所以永遠(yuǎn)不會出現(xiàn)遲到數(shù)據(jù)的問題铃彰。這是周期性生成水位線的最簡單的場景绍豁,直接調(diào)用WatermarkStrategy.forMonotonousTimestamps()方法就可以實(shí)現(xiàn)。簡單來說豌研,就是直接拿當(dāng)前最大的時間戳作為水位線就可以了妹田。
- 亂序流唬党,由于亂序流中需要等待遲到數(shù)據(jù)到齊,所以必須設(shè)置一個固定量的時間延遲(Fixed Amount of Lateness)鬼佣。這時生成水位線的時間戳驶拱,就是當(dāng)前數(shù)據(jù)流中最大的時間戳減去延遲的結(jié)果,相當(dāng)于把表調(diào)慢晶衷,當(dāng)前時鐘會滯后于數(shù)據(jù)的最大時間戳蓝纲。調(diào)用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以實(shí)現(xiàn)。這個方法需要傳入一個 maxOutOfOrderness 參數(shù)晌纫,表示“最大亂序程度”税迷,它表示數(shù)據(jù)流中亂序數(shù)據(jù)時間戳的最大差值;如果我們能確定亂序程度锹漱,那么設(shè)置對應(yīng)時間長度的延遲箭养,就可以等到所有的亂序數(shù)據(jù)了。
代碼
自定義水位線
在WatermarkStrategy中哥牍,時間戳分配器TimestampAssigner都是大同小異的毕泌,指定字段提取時間戳就可以了。不同策略的關(guān)鍵在于WatermarkGenerator的實(shí)現(xiàn)嗅辣。整體來說撼泛,F(xiàn)link有兩種不同的生產(chǎn)水位線的方式:一種是周期性的(Periodic),另一種是斷點(diǎn)式的(Punctuated):
- 周期性水位線生成器(Periodic Generator)澡谭,周期性生成器一般是通過 onEvent()觀察判斷輸入的事件愿题,而在 onPeriodicEmit()里發(fā)出水位線;
- 斷點(diǎn)式水位線生成器(Punctuated Generator)蛙奖,斷點(diǎn)式生成器會不停地檢測 onEvent()中的事件潘酗,當(dāng)發(fā)現(xiàn)帶有水位線信息的特殊事件時,就立即發(fā)出水位線外永。一般來說崎脉,斷點(diǎn)式生成器不會通過 onPeriodicEmit()發(fā)出水位線。
此外伯顶,也可以在自定義數(shù)據(jù)源中發(fā)送水位線囚灼,但是這樣就不能使用assignTimestampsAndWatermarks 方法來生成水位線了,兩者只能二選一祭衩。
在“重分區(qū)”(redistributing)的傳輸模式下灶体,一個任務(wù)有可能會收到來自不同分區(qū)上游子任務(wù)的數(shù)據(jù)。而不同分區(qū)的子任務(wù)時鐘并不同步掐暮,所以同一時刻發(fā)給下游任務(wù)的水位線可能并不相同蝎抽。這說明上游各個分區(qū)處理得有快有慢,進(jìn)度各不相同,這時我們應(yīng)該以最慢的那個時鐘樟结,也就是最小的那個水位線為準(zhǔn)养交。
窗口(Window)
Flink是一種流式計(jì)算引擎,主要是用來處理無界數(shù)據(jù)流的瓢宦。想要更加方便的處理無界流碎连,一種方式就是將無限數(shù)據(jù)切割成有限的“數(shù)據(jù)塊”進(jìn)行處理,這就是所謂的“窗口”(window)驮履。在Flink中鱼辙,窗口就是用來處理無界流的核心。
由于存在遲到數(shù)據(jù)的問題玫镐,將窗口視為一個框可能并不是最合適的倒戏。我們可以把它理解成一個“桶”(bucket):每個數(shù)據(jù)都會分發(fā)到對應(yīng)的桶中,當(dāng)?shù)竭_(dá)窗口的結(jié)束時間時恐似,就對每個桶中收集的數(shù)據(jù)進(jìn)行計(jì)算處理杜跷。
窗口的分類
- 按照驅(qū)動類型分類:
- 1)時間窗口,時間窗口以時間點(diǎn)來定義窗口的開始(start)和結(jié)束(end)蹂喻,所以截取出的就是某一時間段的數(shù)據(jù)葱椭;
- 2)計(jì)數(shù)窗口,計(jì)數(shù)窗口基于元素的個數(shù)來截取數(shù)據(jù)口四,到達(dá)固定的個數(shù)時就觸發(fā)計(jì)算并關(guān)閉窗口。每個窗口截取數(shù)據(jù)的個數(shù)秦陋,就是窗口的大小蔓彩。
-
按照窗口分配數(shù)據(jù)的規(guī)則分類:1)滾動窗口,2)滑動窗口驳概,3)會話窗口赤嚼,4)全局窗口;
窗口API概覽
在定義窗口操作之前顺又,需要先確定到底是基于按鍵分區(qū)的數(shù)據(jù)流KeyedStream還是在沒有按鍵分區(qū)的DataStream上面開窗更卒。也即調(diào)用窗口算子之前是否有keyBy操作。
而在API上面的區(qū)別也是非常兄烧铡:
// 按鍵分區(qū)
stream.keyBy(...).window(...)
// 非按鍵分區(qū)
stream.windowAll(...)
窗口分配器(Window Assigner)
定義窗口分配器(Window Assigner)是構(gòu)建窗口算子的第一步蹂空,他的作用就是定義數(shù)據(jù)應(yīng)該被分配到哪個窗口。通過向上一節(jié)中的window/windowAll函數(shù)中傳入WindowAssigner參數(shù)果录,返回WindowStream上枕。
不同窗口類型有不同的窗口分配器。
1弱恒、時間窗口
// 滾動處理時間窗口
stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))
.aggregate(...)
// 滑動處理時間窗口
stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))
.aggregate(...)
// 處理時間會話窗口
stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)
// 滾動事件時間窗口
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)
// 滑動事件時間窗口
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)
// 事件時間會話窗口
stream.keyBy(...)
.window(EventTimeSessionWindows.WithGap(Time.seconds(10)))
.aggregate(...)
// 2辨萍、計(jì)數(shù)窗口
// 滾動計(jì)數(shù)窗口, 定義一個長度為10的滾動計(jì)數(shù)窗口
stream.keyBy(...)
.countWindow(10)
// 滑動計(jì)數(shù)窗口,長度為10返弹,步長為3
stream.keyBy(...)
.countWindow(10, 3)
// 3锈玉、全局窗口, 全局窗口必須自行定義觸發(fā)器才能實(shí)現(xiàn)窗口計(jì)算爪飘,否則起不到任何作用
stream.keyBy(...)
.window(GlobalWindows.create())
窗口函數(shù)(Window Functions)
在上面定義了窗口分配器,我們只是知道了數(shù)據(jù)屬于哪個窗口拉背,而本節(jié)介紹的窗口函數(shù)則是如何將這些窗口中的數(shù)據(jù)收集起來悦施,即如何處理。
窗口函數(shù)是作用在windowStream上面的去团,返回的是DataStream抡诞。各種stream間的轉(zhuǎn)換如下:
1、增量聚合函數(shù)
為了提高實(shí)時性土陪,我們可以像 DataStream 的簡單聚合一樣昼汗,每來一條數(shù)據(jù)就立即進(jìn)行計(jì)算,中間只要保持一個簡單的聚合狀態(tài)就可以了鬼雀;區(qū)別只是在于不立即輸出結(jié)果顷窒,而是要等到窗口結(jié)束時間。等到窗口到了結(jié)束時間需要輸出計(jì)算結(jié)果的時候源哩,我們只需要拿出之前聚合的狀態(tài)直接輸出鞋吉,這無疑就大大提高了程序運(yùn)行的效率和實(shí)時性。
典型的增量聚合函數(shù)有兩個:ReduceFunction和AggregateFunction励烦。
ReduceFunction:
package com.whu.chapter06
import com.whu.chapter05.{ClickSource, Event}
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
object WindowFunctionDemo {
def main(args:Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.addSource(new ClickSource())
// 數(shù)據(jù)源中的時間戳是單調(diào)遞增的谓着,所以使用下面的方法,只需要抽取時間戳就好了
// 等同于最大延遲時間是0毫秒
.assignAscendingTimestamps(_.timeStamp)
.map(r => (r.user, 1L))
// 使用用戶名對數(shù)據(jù)流進(jìn)行分組
.keyBy(_._1)
// 設(shè)置5秒鐘的滾動事件時間窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 保留第一個字段坛掠,針對第二個字段進(jìn)行聚合
.reduce((r1, r2) => (r1._1, r1._2+r2._2))
.print()
env.execute()
}
}
AggregateFunction
ReduceFunction 可以解決大多數(shù)歸約聚合的問題赊锚,但是這個接口有一個限制,就是聚合狀態(tài)的類型屉栓、輸出結(jié)果的類型都必須和輸入數(shù)據(jù)類型一樣舷蒲。
AggregateFunction 在源碼中的定義如下:
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
{
// 創(chuàng)建一個累加器,這就是為聚合創(chuàng)建了一個初始狀態(tài)友多,每個聚合任務(wù)只會調(diào)用一次
ACC createAccumulator();
// 將輸入的元素添加到累加器中牲平。這就是基于聚合狀態(tài),對新來的數(shù)據(jù)進(jìn)行進(jìn)一步聚合的過程
ACC add(IN value, ACC accumulator);
// getResult():從累加器中提取聚合的輸出結(jié)果域滥。也就是說纵柿,我們可以定義多個狀態(tài),然后再基于這些聚合的狀態(tài)計(jì)算出一個結(jié)果進(jìn)行輸出
OUT getResult(ACC accumulator);
// 合并兩個累加器骗绕,并將合并后的狀態(tài)作為一個累加器返回藐窄。這個方法只在需要合并窗口的場景下才會被調(diào)用
ACC merge(ACC a, ACC b);
}
AggregateFunction接受3個數(shù)據(jù)類型:輸入類型(IN)、累加器類型(ACC)和輸出類型(OUT)荆忍。輸入類型 IN 就是輸入流中元素的數(shù)據(jù)類型;累加器類型 ACC 則是我們進(jìn)行聚合的中間狀態(tài)類型;而輸出類型當(dāng)然就是最終計(jì)算結(jié)果的類型了刹枉。
env.addSource(new ClickSource())
.assignAscendingTimestamps(_.timeStamp)
// 通過為每條數(shù)據(jù)分配相同的key叽唱,來將數(shù)據(jù)發(fā)送到同一個分區(qū)
.keyBy(_ => "key")
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
.aggregate(new AvgPv)
env.execute()
class AvgPv extends AggregateFunction[Event,(Set[String], Double), Double] {
// 創(chuàng)建空累加器,類型是元組微宝,元組的第一個元素類型為Set數(shù)據(jù)結(jié)構(gòu)棺亭,用來對用戶名去重
// 第二個元素用來累加pv操作,也就是沒來一條數(shù)據(jù)就加一
override def createAccumulator(): (Set[String], Double) = (Set[String](), 0L)
// 累加規(guī)則
override def add(in: Event, acc: (Set[String], Double)): (Set[String], Double) = {
(acc._1+in.user, acc._2+1)
}
// 獲取窗口關(guān)閉時向下游發(fā)送的結(jié)果
override def getResult(acc: (Set[String], Double)): Double = {
acc._2/(acc._1.size.toDouble)
}
// merge方法只有在事件時間的會話窗口時蟋软,才需要實(shí)現(xiàn)镶摘,這里無需實(shí)現(xiàn)
override def merge(acc: (Set[String], Double), acc1: (Set[String], Double)): (Set[String], Double) = ???
}
全窗口函數(shù)(Full Window Functions)
窗口操作中的另一大類就是全窗口函數(shù),與增量聚合函數(shù)不同岳守,全窗口函數(shù)需要先收集窗口中的數(shù)據(jù)凄敢,并在內(nèi)部緩存起來,等到窗口要輸出結(jié)果的時候再取出數(shù)據(jù)進(jìn)行計(jì)算湿痢。
Flink中的全窗口函數(shù)有兩種:WindowFunction和ProcessWindowFunction涝缝。
// 窗口函數(shù)
stream.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
處理窗口函數(shù)ProcessWindowFunction是Window API中最底層的通用窗口函數(shù)接口。除了可以拿到窗口中的所有數(shù)據(jù)之外譬重,ProcessWindowFunction還可以獲取到一個“上下文對象(context)”拒逮。這個上下文對象不僅有窗口信息,還可以訪問當(dāng)前的時間和狀態(tài)信息臀规。這里的時間包括了處理時間(process time)和事件時間水位線(event time watermark)滩援。這使得ProcessWindowFunction更加靈活、功能更加豐富以现,可以認(rèn)為是一個增強(qiáng)版的WindowFunction狠怨。
// Full WindowFunction
env.addSource(new ClickSource())
.assignAscendingTimestamps(_.timeStamp)
// 為所有數(shù)據(jù)都指定同一個key,可以將所有數(shù)據(jù)發(fā)送到同一個分區(qū)
.keyBy(_ => "key")
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new UvCountByWindow)
.print()
env.execute()
// 自定義窗口處理函數(shù)
class UvCountByWindow extends ProcessWindowFunction[Event, String, String, TimeWindow]{
//
override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
// 初始化一個Set數(shù)據(jù)結(jié)構(gòu)邑遏,用來對用戶名進(jìn)行去重
var userSet = Set[String]()
// 將所有用戶進(jìn)行去重
elements.foreach(userSet += _.user)
// 結(jié)合窗口信息,包裝輸出內(nèi)容
val windowStart = context.window.getStart
val windowEnd = context.window.getEnd
out.collect(" 窗口:"+ new Timestamp(windowStart) + " ~ "+ new Timestamp(windowEnd) + " 獨(dú)立訪客數(shù)為:" + userSet.size)
}
增量和聚合函數(shù)結(jié)合使用
增量聚合相當(dāng)于把計(jì)算量“均攤”到了窗口收集數(shù)據(jù)的過程中恰矩,自然就會比全窗口聚合更加高效记盒、輸出更加實(shí)時。而全窗口函數(shù)的優(yōu)勢在于提供了更多的信息外傅,可以認(rèn)為是更加“通用”的窗口操作纪吮,窗口計(jì)算更加靈活,功能更加強(qiáng)大萎胰。所以在實(shí)際應(yīng)用中碾盟,我們往往希望兼具這兩者的優(yōu)點(diǎn),把它們結(jié)合在一起使用技竟。Flink 的Window API 就給我們實(shí)現(xiàn)了這樣的用法冰肴。
這樣調(diào)用的處理機(jī)制是:基于第一個參數(shù)(增量聚合函數(shù))來處理窗口數(shù)據(jù),每來一個數(shù)
據(jù)就做一次聚合;等到窗口需要觸發(fā)計(jì)算時熙尉,則調(diào)用第二個參數(shù)(全窗口函數(shù))的處理邏輯輸出結(jié)果联逻。
// 全窗口函數(shù)和聚合函數(shù)結(jié)合使用
env.addSource(new ClickSource())
.assignAscendingTimestamps(_.timeStamp)
// 使用url作為key對數(shù)據(jù)進(jìn)行分區(qū)
.keyBy(_.url)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
// 注意這里調(diào)用的是aggregate方法
// 增量聚合函數(shù)和全窗口聚合函數(shù)結(jié)合使用
.aggregate(new UrlViewCountAgg, new UrlViewCountResult)
.print()
class UrlViewCountAgg extends AggregateFunction[Event, Long, Long] {
override def createAccumulator(): Long = 0L
// 每來一個事件就加1
override def add(in: Event, acc: Long): Long = acc + 1L
// 窗口閉合時發(fā)送的計(jì)算結(jié)果
override def getResult(acc: Long): Long = acc
override def merge(acc: Long, acc1: Long): Long = ???
}
case class UrlViewCount(url: String, count: Long, windowStart: Long, windowEnd: Long)
class UrlViewCountResult extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {
// 迭代器中只有一個元素,是增量聚合函數(shù)在窗口閉合時發(fā)送過來的計(jì)算結(jié)果
override def process(key: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
out.collect(UrlViewCount(key, elements.iterator.next(), context.window.getStart, context.window.getEnd))
}
}
其它API
- 觸發(fā)器(Trigger):觸發(fā)器主要是用來控制窗口什么時候觸發(fā)計(jì)算检痰。所謂的“觸發(fā)計(jì)算”包归,本質(zhì)上就是執(zhí)行窗口函數(shù),所以可以認(rèn)為是計(jì)算得到結(jié)果并輸出的過程铅歼;Trigger 是窗口算子的內(nèi)部屬性公壤,每個窗口分配器(WindowAssigner)都會對應(yīng)一個默認(rèn)的觸發(fā)器;對于 Flink 內(nèi)置的窗口類型椎椰,它們的觸發(fā)器都已經(jīng)做了實(shí)現(xiàn)厦幅;
- 移除器(Evictor):移除器主要用來定義移除某些數(shù)據(jù)的邏輯;
- 允許延遲(Allowed Lateness):為了解決遲到數(shù)據(jù)的問題俭识,F(xiàn)link 提供了一個特殊的接口慨削,可以為窗口算子設(shè)置一個“允許的最大延遲”(Allowed Lateness)。也就是說套媚,我們可以設(shè)定允許延遲一段時間缚态,在這段時間內(nèi),窗口不會銷毀堤瘤,繼續(xù)到來的數(shù)據(jù)依然可以進(jìn)入窗口中并觸發(fā)計(jì)算玫芦;
- 將遲到的數(shù)據(jù)放入側(cè)輸出流:Flink 還提供了另外一種方式處理遲到數(shù)據(jù)。我們可以將未收入窗口的遲到數(shù)據(jù)本辐,放入“側(cè)輸出流”(side output)進(jìn)行另外的處理桥帆。所謂的側(cè)輸出流,相當(dāng)于是數(shù)據(jù)流的一個“分支”慎皱,這個流中單獨(dú)放置那些本該被丟棄的數(shù)據(jù)老虫。
窗口的生命周期
熟悉了窗口 API 的使用,這里梳理一下窗口本身的生命周期茫多,這也是對窗口所有操作的一個總結(jié):
- 窗口的創(chuàng)建祈匙;
- 窗口計(jì)算的觸發(fā);
-
窗口的銷毀天揖;
遲到數(shù)據(jù)的處理
所謂的“遲到數(shù)據(jù)”(late data)夺欲,是指某個水位線之后到來的數(shù)據(jù),它的時間戳其實(shí)是在水位線之前的今膊。所以只有在事件時間語義下些阅,討論遲到數(shù)據(jù)的處理才是有意義的。
- 設(shè)置水位線延遲時間斑唬;
- 允許窗口處理遲到數(shù)據(jù)市埋;
- 將遲到數(shù)據(jù)放入窗口側(cè)輸出流黎泣;
package com.whu.chapter06
import com.whu.chapter05.{ClickSource, Event}
import com.whu.chapter06.WindowFunctionDemo.{UrlViewCountAgg, UrlViewCountResult}
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.sql.Timestamp
object ProcessLateDataDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 為方便測試,讀取socket文本流進(jìn)行處理
val stream = env.socketTextStream("localhost", 7777)
.map(data => {
val fields = data.split(",")
Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
})
// 方式1:設(shè)置Watermark延遲時間 2秒鐘
val res1 = stream.assignTimestampsAndWatermarks(WatermarkStrategy
// 最大延遲時間設(shè)置為5秒鐘
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
.withTimestampAssigner( new SerializableTimestampAssigner[Event] {
override def extractTimestamp(t: Event, l: Long): Long = t.timeStamp
})
)
// 定義側(cè)輸出流標(biāo)簽
val outputTag = OutputTag[Event]("late")
val res2 = stream
.keyBy(_.url)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 方式二:允許窗口處理遲到數(shù)據(jù)腰素,設(shè)置1分鐘的等待時間
.allowedLateness(Time.minutes(1))
// 方式三:將最后的遲到數(shù)據(jù)輸出到側(cè)輸出流
.sideOutputLateData(outputTag)
.aggregate(new UrlViewCountAgg, new UrlViewCountResult)
res2.print()
res2.getSideOutput(outputTag).print("late")
// 為方便觀察聘裁,可以將原始數(shù)據(jù)也輸出
stream.print("input")
}
}