Flink SQL Window源碼全解析

文章目錄

一、概述

二碰镜、Window分類

1萝喘、TimeWindow與CountWindow

2匙隔、TimeWindow子類型

  • Tumble Window(翻轉(zhuǎn)窗口)
  • Hop Window(滑動(dòng)窗口)
  • Session Window(會(huì)話窗口)

三、Window分類及整體流程

四涌韩、創(chuàng)建WindowOperator算子

五畔柔、WindowOperator處理數(shù)據(jù)圖解

六、WindowOperator源碼調(diào)試

1臣樱、StreamExecGroupWindowAggregate#createWindowOperator()創(chuàng)建算子

2靶擦、WindowOperator#processElement()處理數(shù)據(jù),注冊(cè)Timer

3雇毫、Timer觸發(fā)

  • InternalTimerServiceImpl#advanceWatermark()
  • WindwOperator#onEventTime()
  • emitWindowResult()提交結(jié)果

七玄捕、Emit(Trigger)觸發(fā)器

  • 1、Emit策略
  • 2棚放、用途
  • 3枚粘、語(yǔ)法
  • 4、示例
  • 5飘蚯、Trigger類和結(jié)構(gòu)關(guān)系

概述

窗口是無(wú)限流上一種核心機(jī)制馍迄,可以流分割為有限大小的“窗口”福也,同時(shí),在窗口內(nèi)進(jìn)行聚合攀圈,從而把源源不斷產(chǎn)生的數(shù)據(jù)根據(jù)不同的條件劃分成一段一段有邊界的數(shù)據(jù)區(qū)間暴凑,使用戶能夠利用窗口功能實(shí)現(xiàn)很多復(fù)雜的統(tǒng)計(jì)分析需求。

本文內(nèi)容:

  • Flink SQL WINDOW功能介紹
  • 底層實(shí)現(xiàn)源碼分析:StreamExecGroupWindowAggregate創(chuàng)建WindowOperator
  • 底層實(shí)現(xiàn)源碼分析:WindowOperator算子處理數(shù)據(jù)這兩個(gè)地方源碼分析赘来。

Window分類

1现喳、TimeWindow與CountWindow
Flink Window可以是時(shí)間驅(qū)動(dòng)的(TimeWindow),也可以是數(shù)據(jù)驅(qū)動(dòng)的(CountWindow)犬辰。
由于flink-planner-blink SQL中目前只支持TimeWindow相應(yīng)的表達(dá)語(yǔ)句(TUMBLE拿穴、HOP、SESSION)忧风,因此默色,本文主要介紹TimeWindow SQL示例和邏輯,CountWindow感興趣的讀者可自行分析狮腿。

2腿宰、TimeWindow子類型
Flink TimeWindow有滑動(dòng)窗口(HOP)、滾動(dòng)窗口(TUMBLE)以及會(huì)話窗口(SESSION)三種缘厢,所選取的字段時(shí)間吃度,可以是系統(tǒng)時(shí)間(PROCTIME)或事件時(shí)間(EVENT TIME)兩種,接來(lái)下依次介紹贴硫。

  • Tumble Window(翻轉(zhuǎn)窗口)

翻轉(zhuǎn)窗口Assigner將每個(gè)元素分配給具有指定大小的窗口椿每。翻轉(zhuǎn)窗口的大小是固定的,且不會(huì)重疊英遭。例如间护,指定一個(gè)大小為5分鐘的翻滾窗口,并每5分鐘啟動(dòng)一個(gè)新窗口挖诸,如下圖所示:

file

TUMBLE ROWTIME語(yǔ)法示例:

CREATE TABLE sessionOrderTableRowtime (
    ctime TIMESTAMP,
    categoryName VARCHAR,
    shopName VARCHAR,
    itemName VARCHAR,
    userId VARCHAR,
    price FLOAT,
    action BIGINT,
    WATERMARK FOR ctime AS withOffset(ctime, 1000),
    proc AS PROCTIME()
) with (
    `type` = 'kafka',
    format = 'json',
    updateMode = 'append',
    `group.id` = 'groupId',
    bootstrap.servers = 'xxxxx:9092',
    version = '0.10',
    `zookeeper.connect` = 'xxxxx:2181',
    startingOffsets = 'latest',
    topic = 'sessionsourceproctime'
);


CREATE TABLE popwindowsink (
    countA BIGINT,
    ctime_start TIMESTAMP,
    ctime_end VARCHAR,
    ctime_rowtime VARCHAR,
    categoryName VARCHAR,
    price_sum FLOAT
) with (
    format = 'json',
    updateMode = 'append',
    bootstrap.servers = 'xxxxx:9092',
    version = '0.10',
    topic = 'sessionsinkproctime',
    `type` = 'kafka'
);

INSERT INTO popwindowsink
(SELECT
COUNT(*),
TUMBLE_START(ctime, INTERVAL '5' MINUTE),
DATE_FORMAT(TUMBLE_END(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --將TUMBLE_END轉(zhuǎn)為可視化的日期
DATE_FORMAT(TUMBLE_ROWTIME(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --這里TUMBLE_ROWTIME為TUMBLE_END-1ms汁尺,一般用于后續(xù)窗口級(jí)聯(lián)聚合
categoryName,
SUM(price)
FROM sessionOrderTableRowtime
GROUP BY TUMBLE(ctime, INTERVAL '5' MINUTE), categoryName)


TUMBLEP ROCTIME語(yǔ)法示例:

INSERT INTO popwindowsink
(SELECT
COUNT(*),
TUMBLE_START(proc, INTERVAL '5' MINUTE),
DATE_FORMAT(TUMBLE_END(proc, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'),
DATE_FORMAT(TUMBLE_PROCTIME(proc, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --注意這里proc字段即Source DDL中指定的PROCTIME
categoryName,
SUM(price)
FROM sessionOrderTableRowtime
GROUP BY TUMBLE(proc, INTERVAL '5' MINUTE), categoryName)

ROWTIME與PROCTIME區(qū)別:

  • 在使用上: 主要是填入的ctime、proc關(guān)鍵字的區(qū)別多律,這兩個(gè)字段在Source DDL中指定方式不一樣.
  • 在實(shí)現(xiàn)原理上: ROWTIME模式痴突,根據(jù)ctime對(duì)應(yīng)的值,去確定窗口的start狼荞、end辽装;PROCTIME模式,在WindowOperator處理數(shù)據(jù)時(shí)相味,獲取本地系統(tǒng)時(shí)間拾积,去確定窗口的start、end.

由于生產(chǎn)系統(tǒng)中,主要使用ROWTIME來(lái)計(jì)算殷勘、聚合此再、統(tǒng)計(jì),PROCTIME一般用于測(cè)試或?qū)y(tǒng)計(jì)精度要求不高的場(chǎng)景玲销,本文后續(xù)都主要以ROWTIME進(jìn)行分析输拇。

  • Hop Window(滑動(dòng)窗口)

滑動(dòng)窗口Assigner將元素分配給多個(gè)固定長(zhǎng)度的窗口。類似于滾動(dòng)窗口分配程序贤斜,窗口的大小由窗口大小參數(shù)配置策吠。因此,如果滑動(dòng)窗口小于窗口大小瘩绒,則滑動(dòng)窗口可以重疊猴抹。在這種情況下,元素被分配到多個(gè)窗口锁荔。其實(shí)蟀给,滾動(dòng)窗口TUMBLE是滑動(dòng)窗口的一個(gè)特例。
例子阳堕,設(shè)置一個(gè)10分鐘長(zhǎng)度的窗口跋理,以5分鐘間隔滑動(dòng)。這樣恬总,每5分鐘就會(huì)出現(xiàn)一個(gè)窗口前普,其中包含最近10分鐘內(nèi)到達(dá)的事件,如下圖:

file

HOP ROWTIME語(yǔ)法示例:

INSERT INTO popwindowsink
(SELECT
COUNT(*),
HOP_START(ctime, INTERVAL '5' MINUTE,  INTERVAL '10' MINUTE),
DATE_FORMAT(HOP_END(ctime, INTERVAL '5' MINUTE,  INTERVAL '10' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'),
DATE_FORMAT(HOP_ROWTIME(ctime, INTERVAL '5' MINUTE,  INTERVAL '10' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --注意這里ctime字段即Source DDL中指定的ROWTIME
categoryName,
SUM(price)
FROM sessionOrderTableRowtime
GROUP BY HOP(ctime, INTERVAL '5' MINUTE,  INTERVAL '10' MINUTE), categoryName)

  • Session Window(會(huì)話窗口)
會(huì)話窗口Assigner根據(jù)活動(dòng)會(huì)話對(duì)元素進(jìn)行分組壹堰。與翻滾窗口和滑動(dòng)窗口相比拭卿,會(huì)話窗口不會(huì)重疊,也沒有固定的開始和結(jié)束時(shí)間贱纠。相反峻厚,會(huì)話窗口在一段時(shí)間內(nèi)不接收元素時(shí)關(guān)閉,即并巍,當(dāng)一段不活躍的間隙發(fā)生時(shí)目木,當(dāng)前會(huì)話關(guān)閉,隨后的元素被分配給新的會(huì)話懊渡。

file

SESSION ROWTIME語(yǔ)法示例:

INSERT INTO popwindowsink
(SELECT
COUNT(*),
SESSION_START(ctime, INTERVAL '5' MINUTE),
DATE_FORMAT(SESSION_END(ctime, INTERVAL '5' MINUTE, 'yyyy-MM-dd-HH-mm-ss:SSS'),
DATE_FORMAT(SESSION_ROWTIME(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --注意這里ctime字段即Source DDL中指定的ROWTIME
categoryName,
SUM(price)
FROM sessionOrderTableRowtime
GROUP BY SESSION(ctime, INTERVAL '5' MINUTE), categoryName)

Window分類及整體流程

file
上圖內(nèi)部流程分析:

應(yīng)用層SQL:
1.1 window分類及配置,包括滑動(dòng)军拟、翻轉(zhuǎn)剃执、會(huì)話類型窗口
1.2 window時(shí)間類型配置,默認(rèn)待字段名的EventTime懈息,也可以通過(guò)PROCTIME()配置為ProcessingTime
Calcite解析引擎:
2.1 Calcite SQL解析肾档,包括邏輯、優(yōu)化、物理計(jì)劃和算子綁定(#translateToPlanInternal)怒见,在本文特指StreamExecGroupWindowAggregateRule和StreamExecGroupWindowAggregate物理計(jì)劃
WindowOperator算子創(chuàng)建相關(guān):
3.1 StreamExecGroupWindowAggregate#createWindowOperator創(chuàng)建算子
3.2 WindowAssigner的創(chuàng)建俗慈,根據(jù)輸入的數(shù)據(jù),和窗口類型遣耍,生成多個(gè)窗口
3.3 processElement()真實(shí)處理數(shù)據(jù)闺阱,包括聚合運(yùn)算,生成窗口舵变,更新緩存酣溃,提交數(shù)據(jù)等功能
3.4 Trigger根據(jù)數(shù)據(jù)或時(shí)間,來(lái)決定窗口觸發(fā)

創(chuàng)建WindowOperator算子

由于window語(yǔ)法主要是在group by語(yǔ)句中使用纪隙,calcite創(chuàng)建WindowOperator算子伴隨著聚合策略的實(shí)現(xiàn)赊豌,包括聚合規(guī)則匹配(StreamExecGroupWindowAggregateRule),以及生成聚合physical算子StreamExecGroupWindowAggregate兩個(gè)子流程:

file

上圖內(nèi)部流程分析:

a. StreamExecGroupWindowAggregateRule會(huì)對(duì)window進(jìn)行提前匹配绵咱,
生成的WindowEmitStrategy內(nèi)部具有:是否為EventTime表標(biāo)識(shí)碘饼、是否為SessionWindow、early fire和late fire配置悲伶、延遲毫秒數(shù)(窗口結(jié)束時(shí)間加上這個(gè)毫秒數(shù)即數(shù)據(jù)清理時(shí)間)
b. StreamExecGroupWindowAggregateRule會(huì)獲取聚合邏輯計(jì)劃中派昧,window配置的時(shí)間字段,記錄時(shí)間字段index信息拢切,window的觸發(fā)和清理都會(huì)用到這個(gè)時(shí)間
c. StreamExecGroupWindowAggregate入口即為translateToPlanInternal蒂萎,它的實(shí)現(xiàn)方式與spark比較類似,會(huì)先循環(huán)調(diào)用child子節(jié)點(diǎn)translateToPlan方法淮椰,生成inputtranform信息作為輸入
d.創(chuàng)建aggregateHandler是一個(gè)代碼生成的過(guò)程五慈,其生成的創(chuàng)建的class實(shí)現(xiàn)了accumulate、retract主穗、merge泻拦、update方法,這個(gè)handler最后也傳遞給了WindowOperater忽媒,處理數(shù)據(jù)時(shí)争拐,可以進(jìn)行聚合温圆、回撤并輸出最新數(shù)據(jù)給下游
e. StreamExecGroupWindowAggregate與window相關(guān)的最后一步就是調(diào)用#createWindowOperator創(chuàng)建算子仆抵,其內(nèi)部先創(chuàng)建了一個(gè)WindowOperatorBuilder,設(shè)置window類型抵赢、retract標(biāo)識(shí)闹瞧、trigger(window觸發(fā)條件)绑雄、聚合函數(shù)句柄等,最后創(chuàng)建WindowOperator

WindowOperator處理數(shù)據(jù)圖解

在上一小節(jié)奥邮,已經(jīng)完成了WindowOperator參數(shù)的設(shè)定万牺,并創(chuàng)建實(shí)例罗珍,接下來(lái)我們主要分析WindowOperator真實(shí)處理數(shù)據(jù)的流程(起點(diǎn)在WindowOperator#processElement方法):

file
processElement處理數(shù)據(jù)流程:

a、 獲取當(dāng)前record具有的事件時(shí)間脚粟,如果是Processing Time模式覆旱,從時(shí)間服務(wù)Service里面獲取時(shí)間即可
b、使用上一步獲取的時(shí)間核无,接著調(diào)用windowFunction.assignWindow生成窗口扣唱,其內(nèi)部實(shí)際上是調(diào)用各類型的WindowAssigner生成窗口,windowFunction有三大類厕宗,分別是Paned(滑動(dòng))画舌、Merge(會(huì)話)、General(前兩種以外的)已慢,WindowAssigner類型大致有5類曲聂,分別是Tumbling(翻轉(zhuǎn))、Sliding(滑動(dòng))佑惠、Session(會(huì)話)朋腋、CountTumbling 、CountSlide這幾類,根據(jù)輸入的一條數(shù)據(jù)和時(shí)間膜楷,可以生成1到多個(gè)窗口
c旭咽、接下來(lái)是遍歷涉及的窗口進(jìn)行聚合,包括從windowState獲取聚合前值赌厅、使用句柄進(jìn)行聚合穷绵、更新狀態(tài)至windowState,將當(dāng)前轉(zhuǎn)態(tài)
d特愿、上一步聚合完成后仲墨,就可以遍歷窗口,使用TriggerContext(其實(shí)就是不同類型窗口Trigger觸發(fā)器的代理)揍障,綜合early fire目养、late fire、水印時(shí)間與窗口結(jié)束時(shí)間毒嫡,綜合判斷是否觸發(fā)窗口寫出
e癌蚁、如果TriggerContext判斷出觸發(fā)條件為true,則調(diào)用emitWindowResult寫出兜畸,其內(nèi)部有retract判斷努释,更新當(dāng)前state及previous state,寫出數(shù)據(jù)等操作
f膳叨、如果TriggerContext判斷出觸發(fā)條件為false洽洁,則觸發(fā)需要注冊(cè)cleanupTimer,到達(dá)指定時(shí)間后,觸發(fā)onEventTime或onProcessingTime
g菲嘴、onEventTime或onProcessingTime功能十分類似,首先會(huì)觸發(fā)emitWindowResult提交結(jié)果,另外會(huì)判斷窗口結(jié)束時(shí)間+Lateness和當(dāng)前時(shí)間是否相等龄坪,相等則表示可以清除窗口數(shù)據(jù)昭雌、當(dāng)前state及previous state、窗口對(duì)應(yīng)trigger健田。

WindowOperator源碼調(diào)試

為了更直觀的理解Window內(nèi)部運(yùn)行原理烛卧,這里我們引入一個(gè)Flink源碼中已有的SQL Window測(cè)試用例,并進(jìn)行了簡(jiǎn)單的修改(即修改為使用HOP滑動(dòng)窗口)

class WindowJoinITCase{
  @Test
  def testRowTimeInnerJoinWithWindowAggregateOnFirstTime(): Unit = {
    val sqlQuery =
      """
        |SELECT t1.key, HOP_END(t1.rowtime, INTERVAL '4' SECOND, INTERVAL '20' SECOND), COUNT(t1.key)
        |FROM T1 AS t1
        |GROUP BY HOP(t1.rowtime, INTERVAL '4' SECOND, INTERVAL '20' SECOND), t1.key
        |""".stripMargin

    val data1 = new mutable.MutableList[(String, String, Long)]
    data1.+=(("A", "L-1", 1000L))
    data1.+=(("A", "L-2", 2000L))
    data1.+=(("A", "L-3", 3000L))
    //data1.+=(("B", "L-8", 2000L))
    data1.+=(("B", "L-4", 4000L)) 
    data1.+=(("C", "L-5", 2100L))
    data1.+=(("A", "L-6", 10000L)) 
    data1.+=(("A", "L-7", 13000L))

    val t1 = env.fromCollection(data1)
      .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
      .toTable(tEnv, 'key, 'id, 'rowtime)

    tEnv.registerTable("T1", t1)

    val sink = new TestingAppendSink
    val t_r = tEnv.sqlQuery(sqlQuery)
    val result = t_r.toAppendStream[Row]
    result.addSink(sink)
    env.execute()
  }
}

1妓局、StreamExecGroupWindowAggregate#createWindowOperator()創(chuàng)建算子

StreamExecGroupWindowAggregate#createWindowOperator()是創(chuàng)建WindowOperator算子的地方总放,對(duì)應(yīng)的代碼和注釋:

class StreamExecGroupWindowAggregate{
  private def createWindowOperator(
      config: TableConfig,
      aggsHandler: GeneratedNamespaceAggsHandleFunction[_],
      recordEqualiser: GeneratedRecordEqualiser,
      accTypes: Array[LogicalType],
      windowPropertyTypes: Array[LogicalType],
      aggValueTypes: Array[LogicalType],
      inputFields: Seq[LogicalType],
      timeIdx: Int): WindowOperator[_, _] = {

    val builder = WindowOperatorBuilder
      .builder()
      .withInputFields(inputFields.toArray)
    val timeZoneOffset = -config.getTimeZone.getOffset(Calendar.ZONE_OFFSET)
    
    // 設(shè)置WindowOperatorBuilder,最后通過(guò)Builder創(chuàng)建WindowOperator
    val newBuilder = window match {
      case TumblingGroupWindow(_, timeField, size) //Tumble PROCTIME模式好爬,內(nèi)部設(shè)置Assiger
          if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
        builder.tumble(toDuration(size), timeZoneOffset).withProcessingTime()

      case TumblingGroupWindow(_, timeField, size) //Tumble ROWTIME模式局雄,內(nèi)部設(shè)置Assiger
          if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) =>
        builder.tumble(toDuration(size), timeZoneOffset).withEventTime(timeIdx)

      case SlidingGroupWindow(_, timeField, size, slide) //HOP PROCTIME模式,內(nèi)部設(shè)置Assiger
          if isProctimeAttribute(timeField) && hasTimeIntervalType(size) =>
        builder.sliding(toDuration(size), toDuration(slide), timeZoneOffset)
          .withProcessingTime()
       .....
      case SessionGroupWindow(_, timeField, gap)
          if isRowtimeAttribute(timeField) =>
        builder.session(toDuration(gap)).withEventTime(timeIdx)
    }

    // Retraction和Trigger設(shè)置
    //默認(rèn)是no retract和EventTime.afterEndOfWindow
    if (emitStrategy.produceUpdates) {
      // mark this operator will send retraction and set new trigger
      newBuilder
        .withSendRetraction()
        .triggering(emitStrategy.getTrigger)
    }

    newBuilder
      .aggregate(aggsHandler, recordEqualiser, accTypes, aggValueTypes, windowPropertyTypes)
      .withAllowedLateness(Duration.ofMillis(emitStrategy.getAllowLateness))
      .build()
  }
}


2存炮、WindowOperator#processElement()處理數(shù)據(jù)炬搭,注冊(cè)Timer


public class WindowOperator{
    public void processElement(StreamRecord<BaseRow> record) throws Exception {
        BaseRow inputRow = record.getValue();
        long timestamp;
        // 獲取時(shí)間戳(數(shù)據(jù)時(shí)間或系統(tǒng)時(shí)間),這個(gè)時(shí)間是后續(xù)邏輯劃分窗口的依據(jù)
        // 例如獲取的timestamp為10000L
        if (windowAssigner.isEventTime()) {
            timestamp = inputRow.getLong(rowtimeIndex);
        } else {
            timestamp = internalTimerService.currentProcessingTime();
        }

        // 計(jì)算當(dāng)前數(shù)據(jù)所屬于的窗口穆桂,注意滑動(dòng)窗口這里計(jì)算出來(lái)也只有一個(gè)affected窗口(見調(diào)試數(shù)據(jù))宫盔,在這個(gè)窗口內(nèi)進(jìn)行聚合
        Collection<W> affectedWindows = windowFunction.assignStateNamespace(inputRow, timestamp);
        boolean isElementDropped = true;
        for (W window : affectedWindows) {
            isElementDropped = false;
            // 設(shè)置ValueState命名空間,例如TimeWindow{start=8000, end=12000}
            windowState.setCurrentNamespace(window);
            // 從windowState獲取上次聚合值
            BaseRow acc = windowState.value();
            if (acc == null) {
                acc = windowAggregator.createAccumulators();
            }
            windowAggregator.setAccumulators(window, acc);
            // 默認(rèn)進(jìn)行聚合
            if (BaseRowUtil.isAccumulateMsg(inputRow)) {
                windowAggregator.accumulate(inputRow);
            } else {
                windowAggregator.retract(inputRow);
            }
            acc = windowAggregator.getAccumulators();
            // 更新TimeWindow{start=8000, end=12000}對(duì)應(yīng)聚合值
            windowState.update(acc);
        }

        // 對(duì)應(yīng)的實(shí)際窗口享完,例如輸入Timestamp為10000L灼芭,且執(zhí)行HOP(t1.rowtime, INTERVAL '4' SECOND, INTERVAL '20' SECOND),拆分出實(shí)際的窗口為:
        // TimeWindow{start=-8000, end=12000}
        // TimeWindow{start=-4000, end=16000}
        // TimeWindow{start=0, end=20000}
        // TimeWindow{start=4000, end=24000}
        // TimeWindow{start=8000, end=28000}
        Collection<W> actualWindows = windowFunction.assignActualWindows(inputRow, timestamp);
        for (W window : actualWindows) {
            isElementDropped = false;
            triggerContext.window = window;
            // 判斷窗口是否立即觸發(fā)般又,例如earliy fire模式彼绷,默認(rèn)這里是不觸發(fā)的,交給onEventTime()或onProcessingTime()來(lái)觸發(fā)
            boolean triggerResult = triggerContext.onElement(inputRow, timestamp);
            if (triggerResult) {
                emitWindowResult(window);
            }
            // 注冊(cè)清理時(shí)間倒源,根據(jù)時(shí)間模式苛预,分別對(duì)應(yīng)到Event Time對(duì)應(yīng)Timer或Processing Time對(duì)應(yīng)Timer
            // Event Time對(duì)應(yīng)Timer通過(guò)全局watermark來(lái)觸發(fā),實(shí)現(xiàn)代碼在InternalTimerServiceImpl#advanceWatermark()
            // watermark是一個(gè)遞增的邏輯笋熬,后面代碼解析
            registerCleanupTimer(window);
        }

        if (isElementDropped) {
            // markEvent will increase numLateRecordsDropped
            lateRecordsDroppedRate.markEvent();
        }
    }
}

運(yùn)行數(shù)據(jù):

file

3热某、Timer觸發(fā)
I、InternalTimerServiceImpl#advanceWatermark()

WindowOperator#onEventTime()的調(diào)用前胳螟,可以先看其上層調(diào)用:InternalTimerServiceImpl#advanceWatermark()

file

當(dāng)獲取的watermark為9999L時(shí)昔馋,把eventTimeTimerQueue隊(duì)列中所有小于這個(gè)值的timer poll出來(lái),調(diào)用WindowOperator.onEnventTime(timer)

II糖耸、WindwOperator#onEventTime()

WindwOperator#onEventTime()方法比較清晰秘遏,主要是window的觸發(fā)和window的清理兩段邏輯:


public class WindowOperator{
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        setCurrentKey(timer.getKey());

        triggerContext.window = timer.getNamespace();
        if (triggerContext.onEventTime(timer.getTimestamp())) {
            // fire
            emitWindowResult(triggerContext.window);
        }

        if (windowAssigner.isEventTime()) {
            windowFunction.cleanWindowIfNeeded(triggerContext.window, timer.getTimestamp());
        }
    }
}


III、emitWindowResult()提交結(jié)果

emitWindowResult()重點(diǎn)關(guān)注下其第一行代碼:

BaseRow aggResult = windowFunction.getWindowAggregationResult(window);
這個(gè)表示根據(jù)具體的TimeWindow{start=4000, end=24000}嘉竟,去獲取聚合數(shù)據(jù)邦危,如果是滑動(dòng)窗口洋侨,需要將4000, 8000 ,12000,16000 , 20000, 24000這幾段affect窗口里面的聚合值合并起來(lái)倦蚪,內(nèi)部邏輯:


public class PanedWindowProcessFunction{
    public BaseRow getWindowAggregationResult(W window) throws Exception {
        Iterable<W> panes = windowAssigner.splitIntoPanes(window);
        BaseRow acc = windowAggregator.createAccumulators();
        // null namespace means use heap data views
        windowAggregator.setAccumulators(null, acc);
        for (W pane : panes) {
            BaseRow paneAcc = ctx.getWindowAccumulators(pane);
            if (paneAcc != null) {
                windowAggregator.merge(pane, paneAcc);
            }
        }
        return windowAggregator.getValue(window);
    }
}


file

Emit(Trigger)觸發(fā)器

  • 配置方式指定Trigger:Flink1.9.0目前支持通過(guò)TableConifg配置earlyFireInterval希坚、lateFireInterval毫秒數(shù),來(lái)指定窗口結(jié)束之前陵且、窗口結(jié)束之后的觸發(fā)策略(默認(rèn)是watermark超過(guò)窗口結(jié)束后觸發(fā)一次)裁僧,策略的解析在WindowEmitStrategy,在StreamExecGroupWindowAggregateRule就會(huì)創(chuàng)建和解析這個(gè)策略
  • SQL方式指定Trigger:Flink1.9.0代碼中calcite部分已有SqlEmit相關(guān)的實(shí)現(xiàn)慕购,后續(xù)可以支持SQL 語(yǔ)句(INSERT INTO)中配置EMIT觸發(fā)器

本文Emit和Trigger都是觸發(fā)器這一個(gè)概念聊疲,只是使用的方式不一樣

1、Emit策略
Emit 策略是指在Flink SQL 中沪悲,query的輸出策略(如能忍受的延遲)可能在不同的場(chǎng)景有不同的需求获洲,而這部分需求,傳統(tǒng)的 ANSI SQL 并沒有對(duì)應(yīng)的語(yǔ)法支持可训。比如用戶需求:1小時(shí)的時(shí)間窗口昌妹,窗口觸發(fā)之前希望每分鐘都能看到最新的結(jié)果,窗口觸發(fā)之后希望不丟失遲到一天內(nèi)的數(shù)據(jù)握截。針對(duì)這類需求飞崖,抽象出了EMIT語(yǔ)法,并擴(kuò)展到了SQL語(yǔ)法谨胞。

2固歪、用途
EMIT語(yǔ)法的用途目前總結(jié)起來(lái)主要提供了:控制延遲、數(shù)據(jù)精確性胯努,兩方面的功能牢裳。

  • 控制延遲。針對(duì)大窗口叶沛,設(shè)置窗口觸發(fā)之前的EMIT輸出頻率蒲讯,減少用戶看到結(jié)果的延遲(WITH| WITHOUT DELAY)。
  • 數(shù)據(jù)精確性灰署。不丟棄窗口觸發(fā)之后的遲到的數(shù)據(jù)判帮,修正輸出結(jié)果(minIdleStateRetentionTime,在WindowEmitStrategy中生成allowLateness)溉箕。

在選擇EMIT策略時(shí)晦墙,還需要與處理開銷進(jìn)行權(quán)衡。因?yàn)樵降偷妮敵鲅舆t肴茄、越高的數(shù)據(jù)精確性晌畅,都會(huì)帶來(lái)越高的計(jì)算開銷。

3寡痰、語(yǔ)法
EMIT 語(yǔ)法是用來(lái)定義輸出的策略抗楔,即是定義在輸出(INSERT INTO)上的動(dòng)作棋凳。當(dāng)未配置時(shí),保持原有默認(rèn)行為谓谦,即 window 只在 watermark 觸發(fā)時(shí) EMIT 一個(gè)結(jié)果贫橙。

語(yǔ)法:
INSERT INTO tableName
query
EMIT strategy [, strategy]*

strategy ::= {WITH DELAY timeInterval | WITHOUT DELAY}
[BEFORE WATERMARK |AFTER WATERMARK]

timeInterval ::=‘string’ timeUnit

WITH DELAY:聲明能忍受的結(jié)果延遲贪婉,即按指定 interval 進(jìn)行間隔輸出反粥。
WITHOUT DELAY:聲明不忍受延遲,即每來(lái)一條數(shù)據(jù)就進(jìn)行輸出疲迂。
BEFORE WATERMARK:窗口結(jié)束之前的策略配置才顿,即watermark 觸發(fā)之前。
AFTER WATERMARK:窗口結(jié)束之后的策略配置尤蒿,即watermark 觸發(fā)之后郑气。
注:

  • 其中 strategy可以定義多個(gè),同時(shí)定義before和after的策略腰池。 但不能同時(shí)定義兩個(gè) before 或 兩個(gè)after 的策略尾组。
  • 若配置了AFTER WATERMARK 策略,需要顯式地在TableConfig中配置minIdleStateRetentionTime標(biāo)識(shí)能忍受的最大遲到時(shí)間示弓。
  • minIdleStateRetentionTime在window中只影響窗口何時(shí)清除讳侨,不直接影響窗口何時(shí)觸發(fā), 例如配置為3600000奏属,最多容忍1小時(shí)的遲到數(shù)據(jù)跨跨,超過(guò)這個(gè)時(shí)間的數(shù)據(jù)會(huì)直接丟棄

4、示例
如果我們已經(jīng)有一個(gè)TUMBLE(ctime, INTERVAL ‘1’ HOUR)的窗口囱皿,tumble_window 的輸出是需要等到一小時(shí)結(jié)束才能看到結(jié)果勇婴,我們希望能盡早能看到窗口的結(jié)果(即使是不完整的結(jié)果)。例如嘱腥,我們希望每分鐘看到最新的窗口結(jié)果:
INSERT INTO result
SELECT * FROM tumble_window
EMIT WITH DELAY ‘1’ MINUTE BEFORE WATERMARK – 窗口結(jié)束之前耕渴,每隔1分鐘輸出一次更新結(jié)果

tumble_window 會(huì)忽略并丟棄窗口結(jié)束后到達(dá)的數(shù)據(jù),而這部分?jǐn)?shù)據(jù)對(duì)我們來(lái)說(shuō)很重要齿兔,希望能統(tǒng)計(jì)進(jìn)最終的結(jié)果里橱脸。而且我們知道我們的遲到數(shù)據(jù)不會(huì)太多,且遲到時(shí)間不會(huì)超過(guò)一天以上愧驱,并且希望收到遲到的數(shù)據(jù)立刻就更新結(jié)果:
INSERT INTO result
SELECT * FROM tumble_window
EMIT WITH DELAY ‘1’ MINUTE BEFORE WATERMARK,
WITHOUT DELAY AFTER WATERMARK --窗口結(jié)束之后慰技,每條到達(dá)的數(shù)據(jù)都輸出

tEnv.getConfig.setIdleStateRetentionTime(Time.days(1), Time.days(2))//min、max组砚,只有Time.days(1)這個(gè)參數(shù)直接對(duì)window生效

補(bǔ)充一下WITH DELAY '1’這種配置的周期觸發(fā)策略(即DELAY大于0)吻商,最后都是由ProcessingTime系統(tǒng)時(shí)間觸發(fā):


class WindowEmitStrategy{
  private def createTriggerFromInterval(
      enableDelayEmit: Boolean,
      interval: Long): Option[Trigger[TimeWindow]] = {
    if (!enableDelayEmit) {
      None
    } else {
      if (interval > 0) {
       // 系統(tǒng)時(shí)間觸發(fā),小于wm的所有timer都執(zhí)行onProcessingTime()
        Some(ProcessingTimeTriggers.every(Duration.ofMillis(interval)))
      } else {
       // 為0則每條都觸發(fā)
        Some(ElementTriggers.every())
      }
    }
  }
}


5糟红、Trigger類和結(jié)構(gòu)關(guān)系
在源碼中艾帐,Window Trigger的實(shí)現(xiàn)子類有10個(gè)左右乌叶,需要結(jié)合上一個(gè)小節(jié)的EMIT SQL能更容易理清他們之間的關(guān)系,這里簡(jiǎn)單介紹下:

file
  • AfterEndOfWindow:這個(gè)就是沒配置任何EMIT策略時(shí)柒爸,默認(rèn)的EvenTime准浴、ProcTime
  • Window觸發(fā)策略(即窗口結(jié)束后觸發(fā)一次)
  • EveryElement:即delay=0,在processElement()時(shí)直接觸發(fā)捎稚,無(wú)論是在窗口結(jié)束之前或者窗口結(jié)束之后都觸發(fā)乐横,且不再注冊(cè)timer
  • AfterEndOfWindowNoLate:對(duì)應(yīng)EMIT WITHOUT DELAY AFTER WATERMARK,窗口結(jié)束之前不輸出今野,窗口結(jié)束之后無(wú)延遲輸出
  • AfterFirstElementPeriodic:對(duì)應(yīng)WITH DELAY ‘1’ MINUTE BEFORE| AFTER WATERMARK葡公,即按系統(tǒng)時(shí)間周期執(zhí)行,由ProcessingTime系統(tǒng)時(shí)間周期觸發(fā)

關(guān)注我的公眾號(hào)条霜,后臺(tái)回復(fù)【JAVAPDF】獲取200頁(yè)面試題催什!
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路,不來(lái)了解一下嗎宰睡?
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路蒲凶,真的不來(lái)了解一下嗎?
5萬(wàn)人關(guān)注的大數(shù)據(jù)成神之路拆内,確定真的不來(lái)了解一下嗎旋圆?

歡迎您關(guān)注《大數(shù)據(jù)成神之路》
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市矛纹,隨后出現(xiàn)的幾起案子臂聋,更是在濱河造成了極大的恐慌,老刑警劉巖或南,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件孩等,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡采够,警方通過(guò)查閱死者的電腦和手機(jī)肄方,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蹬癌,“玉大人权她,你說(shuō)我怎么就攤上這事∈判剑” “怎么了隅要?”我有些...
    開封第一講書人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)董济。 經(jīng)常有香客問(wèn)我步清,道長(zhǎng),這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任廓啊,我火速辦了婚禮欢搜,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘谴轮。我一直安慰自己炒瘟,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開白布第步。 她就那樣靜靜地躺著疮装,像睡著了一般。 火紅的嫁衣襯著肌膚如雪雌续。 梳的紋絲不亂的頭發(fā)上斩个,一...
    開封第一講書人閱讀 52,268評(píng)論 1 309
  • 那天,我揣著相機(jī)與錄音驯杜,去河邊找鬼。 笑死做个,一個(gè)胖子當(dāng)著我的面吹牛鸽心,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播居暖,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼顽频,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了太闺?” 一聲冷哼從身側(cè)響起糯景,我...
    開封第一講書人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎省骂,沒想到半個(gè)月后蟀淮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡钞澳,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年怠惶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片轧粟。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖兰吟,靈堂內(nèi)的尸體忽然破棺而出通惫,到底是詐尸還是另有隱情,我是刑警寧澤混蔼,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布履腋,位于F島的核電站,受9級(jí)特大地震影響拄丰,放射性物質(zhì)發(fā)生泄漏府树。R本人自食惡果不足惜俐末,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望奄侠。 院中可真熱鬧卓箫,春花似錦、人聲如沸垄潮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)弯洗。三九已至旅急,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間牡整,已是汗流浹背藐吮。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留逃贝,地道東北人谣辞。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像沐扳,于是被迫代替她去往敵國(guó)和親泥从。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容