文章目錄
一、概述
二碰镜、Window分類
1萝喘、TimeWindow與CountWindow
2匙隔、TimeWindow子類型
- Tumble Window(翻轉(zhuǎn)窗口)
- Hop Window(滑動(dòng)窗口)
- Session Window(會(huì)話窗口)
三、Window分類及整體流程
四涌韩、創(chuàng)建WindowOperator算子
五畔柔、WindowOperator處理數(shù)據(jù)圖解
六、WindowOperator源碼調(diào)試
1臣樱、StreamExecGroupWindowAggregate#createWindowOperator()創(chuàng)建算子
2靶擦、WindowOperator#processElement()處理數(shù)據(jù),注冊(cè)Timer
3雇毫、Timer觸發(fā)
- InternalTimerServiceImpl#advanceWatermark()
- WindwOperator#onEventTime()
- emitWindowResult()提交結(jié)果
七玄捕、Emit(Trigger)觸發(fā)器
- 1、Emit策略
- 2棚放、用途
- 3枚粘、語(yǔ)法
- 4、示例
- 5飘蚯、Trigger類和結(jié)構(gòu)關(guān)系
概述
窗口是無(wú)限流上一種核心機(jī)制馍迄,可以流分割為有限大小的“窗口”福也,同時(shí),在窗口內(nèi)進(jìn)行聚合攀圈,從而把源源不斷產(chǎn)生的數(shù)據(jù)根據(jù)不同的條件劃分成一段一段有邊界的數(shù)據(jù)區(qū)間暴凑,使用戶能夠利用窗口功能實(shí)現(xiàn)很多復(fù)雜的統(tǒng)計(jì)分析需求。
本文內(nèi)容:
- Flink SQL WINDOW功能介紹
- 底層實(shí)現(xiàn)源碼分析:StreamExecGroupWindowAggregate創(chuàng)建WindowOperator
- 底層實(shí)現(xiàn)源碼分析:WindowOperator算子處理數(shù)據(jù)這兩個(gè)地方源碼分析赘来。
Window分類
1现喳、TimeWindow與CountWindow
Flink Window可以是時(shí)間驅(qū)動(dòng)的(TimeWindow),也可以是數(shù)據(jù)驅(qū)動(dòng)的(CountWindow)犬辰。
由于flink-planner-blink SQL中目前只支持TimeWindow相應(yīng)的表達(dá)語(yǔ)句(TUMBLE拿穴、HOP、SESSION)忧风,因此默色,本文主要介紹TimeWindow SQL示例和邏輯,CountWindow感興趣的讀者可自行分析狮腿。
2腿宰、TimeWindow子類型
Flink TimeWindow有滑動(dòng)窗口(HOP)、滾動(dòng)窗口(TUMBLE)以及會(huì)話窗口(SESSION)三種缘厢,所選取的字段時(shí)間吃度,可以是系統(tǒng)時(shí)間(PROCTIME)或事件時(shí)間(EVENT TIME)兩種,接來(lái)下依次介紹贴硫。
- Tumble Window(翻轉(zhuǎn)窗口)
翻轉(zhuǎn)窗口Assigner將每個(gè)元素分配給具有指定大小的窗口椿每。翻轉(zhuǎn)窗口的大小是固定的,且不會(huì)重疊英遭。例如间护,指定一個(gè)大小為5分鐘的翻滾窗口,并每5分鐘啟動(dòng)一個(gè)新窗口挖诸,如下圖所示:
TUMBLE ROWTIME語(yǔ)法示例:
CREATE TABLE sessionOrderTableRowtime (
ctime TIMESTAMP,
categoryName VARCHAR,
shopName VARCHAR,
itemName VARCHAR,
userId VARCHAR,
price FLOAT,
action BIGINT,
WATERMARK FOR ctime AS withOffset(ctime, 1000),
proc AS PROCTIME()
) with (
`type` = 'kafka',
format = 'json',
updateMode = 'append',
`group.id` = 'groupId',
bootstrap.servers = 'xxxxx:9092',
version = '0.10',
`zookeeper.connect` = 'xxxxx:2181',
startingOffsets = 'latest',
topic = 'sessionsourceproctime'
);
CREATE TABLE popwindowsink (
countA BIGINT,
ctime_start TIMESTAMP,
ctime_end VARCHAR,
ctime_rowtime VARCHAR,
categoryName VARCHAR,
price_sum FLOAT
) with (
format = 'json',
updateMode = 'append',
bootstrap.servers = 'xxxxx:9092',
version = '0.10',
topic = 'sessionsinkproctime',
`type` = 'kafka'
);
INSERT INTO popwindowsink
(SELECT
COUNT(*),
TUMBLE_START(ctime, INTERVAL '5' MINUTE),
DATE_FORMAT(TUMBLE_END(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --將TUMBLE_END轉(zhuǎn)為可視化的日期
DATE_FORMAT(TUMBLE_ROWTIME(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --這里TUMBLE_ROWTIME為TUMBLE_END-1ms汁尺,一般用于后續(xù)窗口級(jí)聯(lián)聚合
categoryName,
SUM(price)
FROM sessionOrderTableRowtime
GROUP BY TUMBLE(ctime, INTERVAL '5' MINUTE), categoryName)
TUMBLEP ROCTIME語(yǔ)法示例:
INSERT INTO popwindowsink
(SELECT
COUNT(*),
TUMBLE_START(proc, INTERVAL '5' MINUTE),
DATE_FORMAT(TUMBLE_END(proc, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'),
DATE_FORMAT(TUMBLE_PROCTIME(proc, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --注意這里proc字段即Source DDL中指定的PROCTIME
categoryName,
SUM(price)
FROM sessionOrderTableRowtime
GROUP BY TUMBLE(proc, INTERVAL '5' MINUTE), categoryName)
ROWTIME與PROCTIME區(qū)別:
- 在使用上: 主要是填入的ctime、proc關(guān)鍵字的區(qū)別多律,這兩個(gè)字段在Source DDL中指定方式不一樣.
- 在實(shí)現(xiàn)原理上: ROWTIME模式痴突,根據(jù)ctime對(duì)應(yīng)的值,去確定窗口的start狼荞、end辽装;PROCTIME模式,在WindowOperator處理數(shù)據(jù)時(shí)相味,獲取本地系統(tǒng)時(shí)間拾积,去確定窗口的start、end.
由于生產(chǎn)系統(tǒng)中,主要使用ROWTIME來(lái)計(jì)算殷勘、聚合此再、統(tǒng)計(jì),PROCTIME一般用于測(cè)試或?qū)y(tǒng)計(jì)精度要求不高的場(chǎng)景玲销,本文后續(xù)都主要以ROWTIME進(jìn)行分析输拇。
- Hop Window(滑動(dòng)窗口)
滑動(dòng)窗口Assigner將元素分配給多個(gè)固定長(zhǎng)度的窗口。類似于滾動(dòng)窗口分配程序贤斜,窗口的大小由窗口大小參數(shù)配置策吠。因此,如果滑動(dòng)窗口小于窗口大小瘩绒,則滑動(dòng)窗口可以重疊猴抹。在這種情況下,元素被分配到多個(gè)窗口锁荔。其實(shí)蟀给,滾動(dòng)窗口TUMBLE是滑動(dòng)窗口的一個(gè)特例。
例子阳堕,設(shè)置一個(gè)10分鐘長(zhǎng)度的窗口跋理,以5分鐘間隔滑動(dòng)。這樣恬总,每5分鐘就會(huì)出現(xiàn)一個(gè)窗口前普,其中包含最近10分鐘內(nèi)到達(dá)的事件,如下圖:
HOP ROWTIME語(yǔ)法示例:
INSERT INTO popwindowsink
(SELECT
COUNT(*),
HOP_START(ctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),
DATE_FORMAT(HOP_END(ctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'),
DATE_FORMAT(HOP_ROWTIME(ctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --注意這里ctime字段即Source DDL中指定的ROWTIME
categoryName,
SUM(price)
FROM sessionOrderTableRowtime
GROUP BY HOP(ctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), categoryName)
- Session Window(會(huì)話窗口)
會(huì)話窗口Assigner根據(jù)活動(dòng)會(huì)話對(duì)元素進(jìn)行分組壹堰。與翻滾窗口和滑動(dòng)窗口相比拭卿,會(huì)話窗口不會(huì)重疊,也沒有固定的開始和結(jié)束時(shí)間贱纠。相反峻厚,會(huì)話窗口在一段時(shí)間內(nèi)不接收元素時(shí)關(guān)閉,即并巍,當(dāng)一段不活躍的間隙發(fā)生時(shí)目木,當(dāng)前會(huì)話關(guān)閉,隨后的元素被分配給新的會(huì)話懊渡。
SESSION ROWTIME語(yǔ)法示例:
INSERT INTO popwindowsink
(SELECT
COUNT(*),
SESSION_START(ctime, INTERVAL '5' MINUTE),
DATE_FORMAT(SESSION_END(ctime, INTERVAL '5' MINUTE, 'yyyy-MM-dd-HH-mm-ss:SSS'),
DATE_FORMAT(SESSION_ROWTIME(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --注意這里ctime字段即Source DDL中指定的ROWTIME
categoryName,
SUM(price)
FROM sessionOrderTableRowtime
GROUP BY SESSION(ctime, INTERVAL '5' MINUTE), categoryName)
Window分類及整體流程
上圖內(nèi)部流程分析:
應(yīng)用層SQL:
1.1 window分類及配置,包括滑動(dòng)军拟、翻轉(zhuǎn)剃执、會(huì)話類型窗口
1.2 window時(shí)間類型配置,默認(rèn)待字段名的EventTime懈息,也可以通過(guò)PROCTIME()配置為ProcessingTime
Calcite解析引擎:
2.1 Calcite SQL解析肾档,包括邏輯、優(yōu)化、物理計(jì)劃和算子綁定(#translateToPlanInternal)怒见,在本文特指StreamExecGroupWindowAggregateRule和StreamExecGroupWindowAggregate物理計(jì)劃
WindowOperator算子創(chuàng)建相關(guān):
3.1 StreamExecGroupWindowAggregate#createWindowOperator創(chuàng)建算子
3.2 WindowAssigner的創(chuàng)建俗慈,根據(jù)輸入的數(shù)據(jù),和窗口類型遣耍,生成多個(gè)窗口
3.3 processElement()真實(shí)處理數(shù)據(jù)闺阱,包括聚合運(yùn)算,生成窗口舵变,更新緩存酣溃,提交數(shù)據(jù)等功能
3.4 Trigger根據(jù)數(shù)據(jù)或時(shí)間,來(lái)決定窗口觸發(fā)
創(chuàng)建WindowOperator算子
由于window語(yǔ)法主要是在group by語(yǔ)句中使用纪隙,calcite創(chuàng)建WindowOperator算子伴隨著聚合策略的實(shí)現(xiàn)赊豌,包括聚合規(guī)則匹配(StreamExecGroupWindowAggregateRule),以及生成聚合physical算子StreamExecGroupWindowAggregate兩個(gè)子流程:
上圖內(nèi)部流程分析:
a. StreamExecGroupWindowAggregateRule會(huì)對(duì)window進(jìn)行提前匹配绵咱,
生成的WindowEmitStrategy內(nèi)部具有:是否為EventTime表標(biāo)識(shí)碘饼、是否為SessionWindow、early fire和late fire配置悲伶、延遲毫秒數(shù)(窗口結(jié)束時(shí)間加上這個(gè)毫秒數(shù)即數(shù)據(jù)清理時(shí)間)
b. StreamExecGroupWindowAggregateRule會(huì)獲取聚合邏輯計(jì)劃中派昧,window配置的時(shí)間字段,記錄時(shí)間字段index信息拢切,window的觸發(fā)和清理都會(huì)用到這個(gè)時(shí)間
c. StreamExecGroupWindowAggregate入口即為translateToPlanInternal蒂萎,它的實(shí)現(xiàn)方式與spark比較類似,會(huì)先循環(huán)調(diào)用child子節(jié)點(diǎn)translateToPlan方法淮椰,生成inputtranform信息作為輸入
d.創(chuàng)建aggregateHandler是一個(gè)代碼生成的過(guò)程五慈,其生成的創(chuàng)建的class實(shí)現(xiàn)了accumulate、retract主穗、merge泻拦、update方法,這個(gè)handler最后也傳遞給了WindowOperater忽媒,處理數(shù)據(jù)時(shí)争拐,可以進(jìn)行聚合温圆、回撤并輸出最新數(shù)據(jù)給下游
e. StreamExecGroupWindowAggregate與window相關(guān)的最后一步就是調(diào)用#createWindowOperator創(chuàng)建算子仆抵,其內(nèi)部先創(chuàng)建了一個(gè)WindowOperatorBuilder,設(shè)置window類型抵赢、retract標(biāo)識(shí)闹瞧、trigger(window觸發(fā)條件)绑雄、聚合函數(shù)句柄等,最后創(chuàng)建WindowOperator
WindowOperator處理數(shù)據(jù)圖解
在上一小節(jié)奥邮,已經(jīng)完成了WindowOperator參數(shù)的設(shè)定万牺,并創(chuàng)建實(shí)例罗珍,接下來(lái)我們主要分析WindowOperator真實(shí)處理數(shù)據(jù)的流程(起點(diǎn)在WindowOperator#processElement方法):
processElement處理數(shù)據(jù)流程:
a、 獲取當(dāng)前record具有的事件時(shí)間脚粟,如果是Processing Time模式覆旱,從時(shí)間服務(wù)Service里面獲取時(shí)間即可
b、使用上一步獲取的時(shí)間核无,接著調(diào)用windowFunction.assignWindow生成窗口扣唱,其內(nèi)部實(shí)際上是調(diào)用各類型的WindowAssigner生成窗口,windowFunction有三大類厕宗,分別是Paned(滑動(dòng))画舌、Merge(會(huì)話)、General(前兩種以外的)已慢,WindowAssigner類型大致有5類曲聂,分別是Tumbling(翻轉(zhuǎn))、Sliding(滑動(dòng))佑惠、Session(會(huì)話)朋腋、CountTumbling 、CountSlide這幾類,根據(jù)輸入的一條數(shù)據(jù)和時(shí)間膜楷,可以生成1到多個(gè)窗口
c旭咽、接下來(lái)是遍歷涉及的窗口進(jìn)行聚合,包括從windowState獲取聚合前值赌厅、使用句柄進(jìn)行聚合穷绵、更新狀態(tài)至windowState,將當(dāng)前轉(zhuǎn)態(tài)
d特愿、上一步聚合完成后仲墨,就可以遍歷窗口,使用TriggerContext(其實(shí)就是不同類型窗口Trigger觸發(fā)器的代理)揍障,綜合early fire目养、late fire、水印時(shí)間與窗口結(jié)束時(shí)間毒嫡,綜合判斷是否觸發(fā)窗口寫出
e癌蚁、如果TriggerContext判斷出觸發(fā)條件為true,則調(diào)用emitWindowResult寫出兜畸,其內(nèi)部有retract判斷努释,更新當(dāng)前state及previous state,寫出數(shù)據(jù)等操作
f膳叨、如果TriggerContext判斷出觸發(fā)條件為false洽洁,則觸發(fā)需要注冊(cè)cleanupTimer,到達(dá)指定時(shí)間后,觸發(fā)onEventTime或onProcessingTime
g菲嘴、onEventTime或onProcessingTime功能十分類似,首先會(huì)觸發(fā)emitWindowResult提交結(jié)果,另外會(huì)判斷窗口結(jié)束時(shí)間+Lateness和當(dāng)前時(shí)間是否相等龄坪,相等則表示可以清除窗口數(shù)據(jù)昭雌、當(dāng)前state及previous state、窗口對(duì)應(yīng)trigger健田。
WindowOperator源碼調(diào)試
為了更直觀的理解Window內(nèi)部運(yùn)行原理烛卧,這里我們引入一個(gè)Flink源碼中已有的SQL Window測(cè)試用例,并進(jìn)行了簡(jiǎn)單的修改(即修改為使用HOP滑動(dòng)窗口)
class WindowJoinITCase{
@Test
def testRowTimeInnerJoinWithWindowAggregateOnFirstTime(): Unit = {
val sqlQuery =
"""
|SELECT t1.key, HOP_END(t1.rowtime, INTERVAL '4' SECOND, INTERVAL '20' SECOND), COUNT(t1.key)
|FROM T1 AS t1
|GROUP BY HOP(t1.rowtime, INTERVAL '4' SECOND, INTERVAL '20' SECOND), t1.key
|""".stripMargin
val data1 = new mutable.MutableList[(String, String, Long)]
data1.+=(("A", "L-1", 1000L))
data1.+=(("A", "L-2", 2000L))
data1.+=(("A", "L-3", 3000L))
//data1.+=(("B", "L-8", 2000L))
data1.+=(("B", "L-4", 4000L))
data1.+=(("C", "L-5", 2100L))
data1.+=(("A", "L-6", 10000L))
data1.+=(("A", "L-7", 13000L))
val t1 = env.fromCollection(data1)
.assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
.toTable(tEnv, 'key, 'id, 'rowtime)
tEnv.registerTable("T1", t1)
val sink = new TestingAppendSink
val t_r = tEnv.sqlQuery(sqlQuery)
val result = t_r.toAppendStream[Row]
result.addSink(sink)
env.execute()
}
}
1妓局、StreamExecGroupWindowAggregate#createWindowOperator()創(chuàng)建算子
StreamExecGroupWindowAggregate#createWindowOperator()是創(chuàng)建WindowOperator算子的地方总放,對(duì)應(yīng)的代碼和注釋:
class StreamExecGroupWindowAggregate{
private def createWindowOperator(
config: TableConfig,
aggsHandler: GeneratedNamespaceAggsHandleFunction[_],
recordEqualiser: GeneratedRecordEqualiser,
accTypes: Array[LogicalType],
windowPropertyTypes: Array[LogicalType],
aggValueTypes: Array[LogicalType],
inputFields: Seq[LogicalType],
timeIdx: Int): WindowOperator[_, _] = {
val builder = WindowOperatorBuilder
.builder()
.withInputFields(inputFields.toArray)
val timeZoneOffset = -config.getTimeZone.getOffset(Calendar.ZONE_OFFSET)
// 設(shè)置WindowOperatorBuilder,最后通過(guò)Builder創(chuàng)建WindowOperator
val newBuilder = window match {
case TumblingGroupWindow(_, timeField, size) //Tumble PROCTIME模式好爬,內(nèi)部設(shè)置Assiger
if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
builder.tumble(toDuration(size), timeZoneOffset).withProcessingTime()
case TumblingGroupWindow(_, timeField, size) //Tumble ROWTIME模式局雄,內(nèi)部設(shè)置Assiger
if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) =>
builder.tumble(toDuration(size), timeZoneOffset).withEventTime(timeIdx)
case SlidingGroupWindow(_, timeField, size, slide) //HOP PROCTIME模式,內(nèi)部設(shè)置Assiger
if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
builder.sliding(toDuration(size), toDuration(slide), timeZoneOffset)
.withProcessingTime()
.....
case SessionGroupWindow(_, timeField, gap)
if isRowtimeAttribute(timeField) =>
builder.session(toDuration(gap)).withEventTime(timeIdx)
}
// Retraction和Trigger設(shè)置
//默認(rèn)是no retract和EventTime.afterEndOfWindow
if (emitStrategy.produceUpdates) {
// mark this operator will send retraction and set new trigger
newBuilder
.withSendRetraction()
.triggering(emitStrategy.getTrigger)
}
newBuilder
.aggregate(aggsHandler, recordEqualiser, accTypes, aggValueTypes, windowPropertyTypes)
.withAllowedLateness(Duration.ofMillis(emitStrategy.getAllowLateness))
.build()
}
}
2存炮、WindowOperator#processElement()處理數(shù)據(jù)炬搭,注冊(cè)Timer
public class WindowOperator{
public void processElement(StreamRecord<BaseRow> record) throws Exception {
BaseRow inputRow = record.getValue();
long timestamp;
// 獲取時(shí)間戳(數(shù)據(jù)時(shí)間或系統(tǒng)時(shí)間),這個(gè)時(shí)間是后續(xù)邏輯劃分窗口的依據(jù)
// 例如獲取的timestamp為10000L
if (windowAssigner.isEventTime()) {
timestamp = inputRow.getLong(rowtimeIndex);
} else {
timestamp = internalTimerService.currentProcessingTime();
}
// 計(jì)算當(dāng)前數(shù)據(jù)所屬于的窗口穆桂,注意滑動(dòng)窗口這里計(jì)算出來(lái)也只有一個(gè)affected窗口(見調(diào)試數(shù)據(jù))宫盔,在這個(gè)窗口內(nèi)進(jìn)行聚合
Collection<W> affectedWindows = windowFunction.assignStateNamespace(inputRow, timestamp);
boolean isElementDropped = true;
for (W window : affectedWindows) {
isElementDropped = false;
// 設(shè)置ValueState命名空間,例如TimeWindow{start=8000, end=12000}
windowState.setCurrentNamespace(window);
// 從windowState獲取上次聚合值
BaseRow acc = windowState.value();
if (acc == null) {
acc = windowAggregator.createAccumulators();
}
windowAggregator.setAccumulators(window, acc);
// 默認(rèn)進(jìn)行聚合
if (BaseRowUtil.isAccumulateMsg(inputRow)) {
windowAggregator.accumulate(inputRow);
} else {
windowAggregator.retract(inputRow);
}
acc = windowAggregator.getAccumulators();
// 更新TimeWindow{start=8000, end=12000}對(duì)應(yīng)聚合值
windowState.update(acc);
}
// 對(duì)應(yīng)的實(shí)際窗口享完,例如輸入Timestamp為10000L灼芭,且執(zhí)行HOP(t1.rowtime, INTERVAL '4' SECOND, INTERVAL '20' SECOND),拆分出實(shí)際的窗口為:
// TimeWindow{start=-8000, end=12000}
// TimeWindow{start=-4000, end=16000}
// TimeWindow{start=0, end=20000}
// TimeWindow{start=4000, end=24000}
// TimeWindow{start=8000, end=28000}
Collection<W> actualWindows = windowFunction.assignActualWindows(inputRow, timestamp);
for (W window : actualWindows) {
isElementDropped = false;
triggerContext.window = window;
// 判斷窗口是否立即觸發(fā)般又,例如earliy fire模式彼绷,默認(rèn)這里是不觸發(fā)的,交給onEventTime()或onProcessingTime()來(lái)觸發(fā)
boolean triggerResult = triggerContext.onElement(inputRow, timestamp);
if (triggerResult) {
emitWindowResult(window);
}
// 注冊(cè)清理時(shí)間倒源,根據(jù)時(shí)間模式苛预,分別對(duì)應(yīng)到Event Time對(duì)應(yīng)Timer或Processing Time對(duì)應(yīng)Timer
// Event Time對(duì)應(yīng)Timer通過(guò)全局watermark來(lái)觸發(fā),實(shí)現(xiàn)代碼在InternalTimerServiceImpl#advanceWatermark()
// watermark是一個(gè)遞增的邏輯笋熬,后面代碼解析
registerCleanupTimer(window);
}
if (isElementDropped) {
// markEvent will increase numLateRecordsDropped
lateRecordsDroppedRate.markEvent();
}
}
}
運(yùn)行數(shù)據(jù):
3热某、Timer觸發(fā)
I、InternalTimerServiceImpl#advanceWatermark()
WindowOperator#onEventTime()的調(diào)用前胳螟,可以先看其上層調(diào)用:InternalTimerServiceImpl#advanceWatermark()
當(dāng)獲取的watermark為9999L時(shí)昔馋,把eventTimeTimerQueue隊(duì)列中所有小于這個(gè)值的timer poll出來(lái),調(diào)用WindowOperator.onEnventTime(timer)
II糖耸、WindwOperator#onEventTime()
WindwOperator#onEventTime()方法比較清晰秘遏,主要是window的觸發(fā)和window的清理兩段邏輯:
public class WindowOperator{
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
setCurrentKey(timer.getKey());
triggerContext.window = timer.getNamespace();
if (triggerContext.onEventTime(timer.getTimestamp())) {
// fire
emitWindowResult(triggerContext.window);
}
if (windowAssigner.isEventTime()) {
windowFunction.cleanWindowIfNeeded(triggerContext.window, timer.getTimestamp());
}
}
}
III、emitWindowResult()提交結(jié)果
emitWindowResult()重點(diǎn)關(guān)注下其第一行代碼:
BaseRow aggResult = windowFunction.getWindowAggregationResult(window);
這個(gè)表示根據(jù)具體的TimeWindow{start=4000, end=24000}嘉竟,去獲取聚合數(shù)據(jù)邦危,如果是滑動(dòng)窗口洋侨,需要將4000, 8000 ,12000,16000 , 20000, 24000這幾段affect窗口里面的聚合值合并起來(lái)倦蚪,內(nèi)部邏輯:
public class PanedWindowProcessFunction{
public BaseRow getWindowAggregationResult(W window) throws Exception {
Iterable<W> panes = windowAssigner.splitIntoPanes(window);
BaseRow acc = windowAggregator.createAccumulators();
// null namespace means use heap data views
windowAggregator.setAccumulators(null, acc);
for (W pane : panes) {
BaseRow paneAcc = ctx.getWindowAccumulators(pane);
if (paneAcc != null) {
windowAggregator.merge(pane, paneAcc);
}
}
return windowAggregator.getValue(window);
}
}
Emit(Trigger)觸發(fā)器
- 配置方式指定Trigger:Flink1.9.0目前支持通過(guò)TableConifg配置earlyFireInterval希坚、lateFireInterval毫秒數(shù),來(lái)指定窗口結(jié)束之前陵且、窗口結(jié)束之后的觸發(fā)策略(默認(rèn)是watermark超過(guò)窗口結(jié)束后觸發(fā)一次)裁僧,策略的解析在WindowEmitStrategy,在StreamExecGroupWindowAggregateRule就會(huì)創(chuàng)建和解析這個(gè)策略
- SQL方式指定Trigger:Flink1.9.0代碼中calcite部分已有SqlEmit相關(guān)的實(shí)現(xiàn)慕购,后續(xù)可以支持SQL 語(yǔ)句(INSERT INTO)中配置EMIT觸發(fā)器
本文Emit和Trigger都是觸發(fā)器這一個(gè)概念聊疲,只是使用的方式不一樣
1、Emit策略
Emit 策略是指在Flink SQL 中沪悲,query的輸出策略(如能忍受的延遲)可能在不同的場(chǎng)景有不同的需求获洲,而這部分需求,傳統(tǒng)的 ANSI SQL 并沒有對(duì)應(yīng)的語(yǔ)法支持可训。比如用戶需求:1小時(shí)的時(shí)間窗口昌妹,窗口觸發(fā)之前希望每分鐘都能看到最新的結(jié)果,窗口觸發(fā)之后希望不丟失遲到一天內(nèi)的數(shù)據(jù)握截。針對(duì)這類需求飞崖,抽象出了EMIT語(yǔ)法,并擴(kuò)展到了SQL語(yǔ)法谨胞。
2固歪、用途
EMIT語(yǔ)法的用途目前總結(jié)起來(lái)主要提供了:控制延遲、數(shù)據(jù)精確性胯努,兩方面的功能牢裳。
- 控制延遲。針對(duì)大窗口叶沛,設(shè)置窗口觸發(fā)之前的EMIT輸出頻率蒲讯,減少用戶看到結(jié)果的延遲(WITH| WITHOUT DELAY)。
- 數(shù)據(jù)精確性灰署。不丟棄窗口觸發(fā)之后的遲到的數(shù)據(jù)判帮,修正輸出結(jié)果(minIdleStateRetentionTime,在WindowEmitStrategy中生成allowLateness)溉箕。
在選擇EMIT策略時(shí)晦墙,還需要與處理開銷進(jìn)行權(quán)衡。因?yàn)樵降偷妮敵鲅舆t肴茄、越高的數(shù)據(jù)精確性晌畅,都會(huì)帶來(lái)越高的計(jì)算開銷。
3寡痰、語(yǔ)法
EMIT 語(yǔ)法是用來(lái)定義輸出的策略抗楔,即是定義在輸出(INSERT INTO)上的動(dòng)作棋凳。當(dāng)未配置時(shí),保持原有默認(rèn)行為谓谦,即 window 只在 watermark 觸發(fā)時(shí) EMIT 一個(gè)結(jié)果贫橙。
語(yǔ)法:
INSERT INTO tableName
query
EMIT strategy [, strategy]*
strategy ::= {WITH DELAY timeInterval | WITHOUT DELAY}
[BEFORE WATERMARK |AFTER WATERMARK]
timeInterval ::=‘string’ timeUnit
WITH DELAY:聲明能忍受的結(jié)果延遲贪婉,即按指定 interval 進(jìn)行間隔輸出反粥。
WITHOUT DELAY:聲明不忍受延遲,即每來(lái)一條數(shù)據(jù)就進(jìn)行輸出疲迂。
BEFORE WATERMARK:窗口結(jié)束之前的策略配置才顿,即watermark 觸發(fā)之前。
AFTER WATERMARK:窗口結(jié)束之后的策略配置尤蒿,即watermark 觸發(fā)之后郑气。
注:
- 其中 strategy可以定義多個(gè),同時(shí)定義before和after的策略腰池。 但不能同時(shí)定義兩個(gè) before 或 兩個(gè)after 的策略尾组。
- 若配置了AFTER WATERMARK 策略,需要顯式地在TableConfig中配置minIdleStateRetentionTime標(biāo)識(shí)能忍受的最大遲到時(shí)間示弓。
- minIdleStateRetentionTime在window中只影響窗口何時(shí)清除讳侨,不直接影響窗口何時(shí)觸發(fā), 例如配置為3600000奏属,最多容忍1小時(shí)的遲到數(shù)據(jù)跨跨,超過(guò)這個(gè)時(shí)間的數(shù)據(jù)會(huì)直接丟棄
4、示例
如果我們已經(jīng)有一個(gè)TUMBLE(ctime, INTERVAL ‘1’ HOUR)的窗口囱皿,tumble_window 的輸出是需要等到一小時(shí)結(jié)束才能看到結(jié)果勇婴,我們希望能盡早能看到窗口的結(jié)果(即使是不完整的結(jié)果)。例如嘱腥,我們希望每分鐘看到最新的窗口結(jié)果:
INSERT INTO result
SELECT * FROM tumble_window
EMIT WITH DELAY ‘1’ MINUTE BEFORE WATERMARK – 窗口結(jié)束之前耕渴,每隔1分鐘輸出一次更新結(jié)果
tumble_window 會(huì)忽略并丟棄窗口結(jié)束后到達(dá)的數(shù)據(jù),而這部分?jǐn)?shù)據(jù)對(duì)我們來(lái)說(shuō)很重要齿兔,希望能統(tǒng)計(jì)進(jìn)最終的結(jié)果里橱脸。而且我們知道我們的遲到數(shù)據(jù)不會(huì)太多,且遲到時(shí)間不會(huì)超過(guò)一天以上愧驱,并且希望收到遲到的數(shù)據(jù)立刻就更新結(jié)果:
INSERT INTO result
SELECT * FROM tumble_window
EMIT WITH DELAY ‘1’ MINUTE BEFORE WATERMARK,
WITHOUT DELAY AFTER WATERMARK --窗口結(jié)束之后慰技,每條到達(dá)的數(shù)據(jù)都輸出
tEnv.getConfig.setIdleStateRetentionTime(Time.days(1), Time.days(2))//min、max组砚,只有Time.days(1)這個(gè)參數(shù)直接對(duì)window生效
補(bǔ)充一下WITH DELAY '1’這種配置的周期觸發(fā)策略(即DELAY大于0)吻商,最后都是由ProcessingTime系統(tǒng)時(shí)間觸發(fā):
class WindowEmitStrategy{
private def createTriggerFromInterval(
enableDelayEmit: Boolean,
interval: Long): Option[Trigger[TimeWindow]] = {
if (!enableDelayEmit) {
None
} else {
if (interval > 0) {
// 系統(tǒng)時(shí)間觸發(fā),小于wm的所有timer都執(zhí)行onProcessingTime()
Some(ProcessingTimeTriggers.every(Duration.ofMillis(interval)))
} else {
// 為0則每條都觸發(fā)
Some(ElementTriggers.every())
}
}
}
}
5糟红、Trigger類和結(jié)構(gòu)關(guān)系
在源碼中艾帐,Window Trigger的實(shí)現(xiàn)子類有10個(gè)左右乌叶,需要結(jié)合上一個(gè)小節(jié)的EMIT SQL能更容易理清他們之間的關(guān)系,這里簡(jiǎn)單介紹下:
- AfterEndOfWindow:這個(gè)就是沒配置任何EMIT策略時(shí)柒爸,默認(rèn)的EvenTime准浴、ProcTime
- Window觸發(fā)策略(即窗口結(jié)束后觸發(fā)一次)
- EveryElement:即delay=0,在processElement()時(shí)直接觸發(fā)捎稚,無(wú)論是在窗口結(jié)束之前或者窗口結(jié)束之后都觸發(fā)乐横,且不再注冊(cè)timer
- AfterEndOfWindowNoLate:對(duì)應(yīng)EMIT WITHOUT DELAY AFTER WATERMARK,窗口結(jié)束之前不輸出今野,窗口結(jié)束之后無(wú)延遲輸出
- AfterFirstElementPeriodic:對(duì)應(yīng)WITH DELAY ‘1’ MINUTE BEFORE| AFTER WATERMARK葡公,即按系統(tǒng)時(shí)間周期執(zhí)行,由ProcessingTime系統(tǒng)時(shí)間周期觸發(fā)
關(guān)注我的公眾號(hào)条霜,后臺(tái)回復(fù)【JAVAPDF】獲取200頁(yè)面試題催什!
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路,不來(lái)了解一下嗎宰睡?
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路蒲凶,真的不來(lái)了解一下嗎?
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路拆内,確定真的不來(lái)了解一下嗎旋圆?