Preface
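Time semantics only become useful when combined with window operations. Their main purpose is to open windows and compute results over time ranges. Below we look at how the Table API and SQL use time attributes for windowing. The Table API and SQL offer two main kinds of windows: Group Windows and Over Windows (see also the companion article on time semantics).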
1. Group Windows
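Group windows aggregate rows into finite groups based on time or row-count intervals and evaluate the aggregate function once per group. In the Table API, group windows are defined with the .window(w: GroupWindow) clause and must be given an alias with the as clause. To group a table by window, the window alias must be referenced in the groupBy clause just like a regular grouping field. For example: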
val table = input
  .window(Tumble over 10.minutes on 'rowtime as 'w) // any GroupWindow (Tumble, Slide, Session) goes here
  .groupBy('w, 'a)
  .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)
The Table API provides a set of predefined Window classes with specific semantics; they are translated into window operations on the underlying DataStream or DataSet.
The Table API supports the three familiar kinds of windows: Tumbling, Sliding, and Session.
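1.1 Tumbling Windows
Tumbling windows are defined with the Tumble class, together with three methods:
- over: defines the window length
- on: the time attribute used for grouping (time intervals) or sorting (row counts)
- as: the alias, which must be referenced in the subsequent groupBy
Implementation example
- Requirement: use a 10-second tumbling window and count how many times each id appears.
- Data preparation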
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
- Code
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
import org.apache.flink.types.Row

/**
 * @Package windows
 * @File :FlinkSQLTumBlingTie.java
 * @author 大数据老哥
 * @date 2020/12/25 21:58
 * @version V1.0
 *          Tumbling window
 */
object FlinkSQLTumBlingTie {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // Read the data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Convert each line into the case class first (a simple transformation),
    // then assign event-time timestamps (seconds -> milliseconds) and watermarks
    // that allow 1 second of out-of-orderness
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
    // Turn the existing timestamp field into the event-time attribute ts
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // Register the table
    tableEnv.createTemporaryView("sensor", sensorTable)
    // Table API implementation
    val resultTable = sensorTable
      .window(Tumble over 10.seconds on 'ts as 'tw) // one result every 10 seconds: tumbling event-time window
      .groupBy('id, 'tw)
      .select('id, 'id.count, 'tw.end)
    // SQL implementation
    val sqlTable = tableEnv.sqlQuery(
      """
        |select
        |id,
        |count(id),
        |tumble_end(ts, interval '10' second)
        |from sensor
        |group by
        |id,
        |tumble(ts, interval '10' second)
        |""".stripMargin)
    /**
     * .window(Tumble over 10.minutes on 'rowtime as 'w)  (event-time attribute rowtime)
     * .window(Tumble over 10.minutes on 'proctime as 'w) (processing-time attribute proctime)
     * .window(Tumble over 10.rows on 'proctime as 'w)    (like a count window: ordered by processing time, 10 rows per group)
     */
    resultTable.toAppendStream[Row].print("table")
    sqlTable.toRetractStream[Row].print("sqlTable")

    env.execute("FlinkSQLTumBlingTie")
  }
  case class SensorReading(id: String, timestamp: Long, temperature: Double)
}
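To make the expected output easier to check, here is a minimal plain-Scala sketch that mimics 10-second tumbling windows on the sample data without Flink. It assumes epoch-aligned windows with no offset (start = ts - ts % size) and works directly on the second-based timestamps; the names TumblingSketch, windowStart and countPerWindow are hypothetical and not part of Flink.

```scala
// Plain-Scala sketch of tumbling event-time windows (no Flink dependency).
// Assumption: windows are aligned to the epoch with no offset, so the window
// containing ts is [ts - ts % size, ts - ts % size + size).
object TumblingSketch {
  final case class Reading(id: String, timestamp: Long, temperature: Double)

  // The sample data from this section (timestamps in seconds).
  val sample: Seq[Reading] = Seq(
    Reading("sensor_1", 1547718199L, 35.8),
    Reading("sensor_6", 1547718201L, 15.4),
    Reading("sensor_7", 1547718202L, 6.7),
    Reading("sensor_10", 1547718205L, 38.1),
    Reading("sensor_1", 1547718206L, 32.0),
    Reading("sensor_1", 1547718208L, 36.2),
    Reading("sensor_1", 1547718210L, 29.7),
    Reading("sensor_1", 1547718213L, 30.9)
  )

  // Start of the tumbling window that contains ts.
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // Count rows per (id, window end), like 'id.count grouped by ('id, 'tw).
  def countPerWindow(rows: Seq[Reading], size: Long): Map[(String, Long), Int] =
    rows
      .groupBy(r => (r.id, windowStart(r.timestamp, size) + size))
      .map { case (key, group) => key -> group.size }

  def main(args: Array[String]): Unit =
    countPerWindow(sample, 10L).toSeq
      .sortBy { case ((id, end), _) => (end, id) }
      .foreach { case ((id, end), n) => println(s"$id,$n,$end") }
}
```

Under these assumptions sensor_1 has 1 row in the window ending at 1547718200, 2 in the window ending at 1547718210 and 2 in the one ending at 1547718220. The Flink job prints tw.end as a timestamp value rather than epoch seconds.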
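1.2 Sliding Windows
Sliding windows are defined with the Slide class, together with four methods:
- over: defines the window length
- every: defines the slide step
- on: the time attribute used for grouping (time intervals) or sorting (row counts)
- as: the alias, which must be referenced in the subsequent groupBy
Implementation example
- Requirement: use a window size of 10 seconds with a slide of 5 seconds, and count how many times each id appears.
- Data preparation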
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
- Code
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Slide, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import windows.FlinkSQLTumBlingTie.SensorReading

/**
 * @Package windows
 * @File :FlinkSQLSlideTime.java
 * @author 大数据老哥
 * @date 2020/12/27 22:19
 * @version V1.0
 *          Sliding window
 */
object FlinkSQLSlideTime {
  def main(args: Array[String]): Unit = {
    // Build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 makes the test output easier to follow
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // use event time
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // Create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // Read the data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)
    // Convert each line into the case class first (a simple transformation)
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // Register the table
    tableEnv.createTemporaryView("sensor", sensorTable)
    // Table API implementation: 10-second windows that slide every 5 seconds
    val tableApi = sensorTable.window(Slide over 10.seconds every 5.seconds on 'ts as 'w)
      .groupBy('w, 'id)
      .select('id, 'id.count, 'w.end)
    // SQL implementation: HOP(time_attr, slide, size), so the 5-second slide comes first
    val tableSql = tableEnv.sqlQuery(
      """
        |select
        |id,
        |count(id),
        |HOP_END(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) as w
        |from sensor
        |group by
        |HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND), id
        |""".stripMargin)
    tableApi.toAppendStream[Row].print("tableApi")
    tableSql.toAppendStream[Row].print("tableSql")
    /**
     * .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)  (event-time attribute rowtime)
     * .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w) (processing-time attribute proctime)
     * .window(Slide over 10.rows every 5.rows on 'proctime as 'w)       (like a count window: ordered by processing time, 10 rows per group)
     */
    env.execute("FlinkSQLSlideTime")
  }
}
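Because a 10-second window that slides every 5 seconds overlaps its neighbours, each row is counted in two windows. The following plain-Scala sketch (hypothetical names SlidingSketch, windowStarts and countPerWindow, no Flink dependency) enumerates the windows each sample row falls into, assuming windows aligned to multiples of the slide with no offset.

```scala
// Plain-Scala sketch of sliding (HOP) event-time window assignment, no Flink.
// Assumption: windows are aligned to multiples of the slide (offset 0), and a
// row belongs to every window [start, start + size) that contains it.
object SlidingSketch {
  final case class Reading(id: String, timestamp: Long)

  // The sample data from this section (timestamps in seconds).
  val sample: Seq[Reading] = Seq(
    Reading("sensor_1", 1547718199L), Reading("sensor_6", 1547718201L),
    Reading("sensor_7", 1547718202L), Reading("sensor_10", 1547718205L),
    Reading("sensor_1", 1547718206L), Reading("sensor_1", 1547718208L),
    Reading("sensor_1", 1547718210L), Reading("sensor_1", 1547718213L)
  )

  // Starts of all windows that contain ts, latest window first.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - (ts % slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Count rows per (id, window end), like 'id.count grouped by ('w, 'id).
  def countPerWindow(rows: Seq[Reading], size: Long, slide: Long): Map[(String, Long), Int] =
    rows
      .flatMap(r => windowStarts(r.timestamp, size, slide).map(start => (r.id, start + size)))
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }

  def main(args: Array[String]): Unit =
    countPerWindow(sample, 10L, 5L).toSeq
      .sortBy { case ((id, end), _) => (end, id) }
      .foreach { case ((id, end), n) => println(s"$id,$n,$end") }
}
```

Under these assumptions sensor_1 is counted 4 times in the window ending at 1547718215 (rows at 206, 208, 210 and 213), which is a handy value to compare against the tableApi and tableSql output.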
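1.3 Session Windows
Session windows are defined with the Session class, together with three methods:
- withGap: the session gap
- on: the time attribute used for grouping (time intervals) or sorting (row counts)
- as: the alias, which must be referenced in the subsequent groupBy
Implementation example
- Requirement: use a 10-second session gap and count the records per id.
- Data preparation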
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
- Code
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Session, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import windows.FlinkSQLTumBlingTie.SensorReading

/**
 * @Package windows
 * @File :FlinkSqlSessionTime.java
 * @author 大数据老哥
 * @date 2020/12/27 22:52
 * @version V1.0
 */
object FlinkSqlSessionTime {
  def main(args: Array[String]): Unit = {
    // Build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 makes the test output easier to follow
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // use event time
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // Create the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // Read the data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)
    // Convert each line into the case class first (a simple transformation)
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    // Register the table
    tableEnv.createTemporaryView("sensor", sensorTable)
    // Table API implementation: a session closes after 10 seconds without data for an id
    val tableApi = sensorTable
      .window(Session withGap 10.seconds on 'ts as 'w)
      .groupBy('id, 'w)
      .select('id, 'id.count, 'w.end)
    // SQL implementation
    val tableSQL = tableEnv.sqlQuery(
      """
        |SELECT
        |id,
        |COUNT(id),
        |SESSION_END(ts, INTERVAL '10' SECOND) AS w
        |FROM sensor
        |GROUP BY
        |id,
        |SESSION(ts, INTERVAL '10' SECOND)
        |""".stripMargin)
    tableApi.toAppendStream[Row].print("tableApi")
    tableSQL.toAppendStream[Row].print("tableSQL")
    /**
     * .window(Session withGap 10.minutes on 'rowtime as 'w)  (event-time attribute rowtime)
     * .window(Session withGap 10.minutes on 'proctime as 'w) (processing-time attribute proctime)
     */
    env.execute("FlinkSqlSessionTime")
  }
}
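The session grouping can be traced by hand with a small plain-Scala sketch (hypothetical names SessionSketch and sessions, no Flink dependency). It assumes a row joins the current session of its id when it arrives at most gap seconds after the previous row, and that the session ends at the last row's timestamp plus the gap. A row arriving exactly gap seconds later does not occur in the sample data, so that boundary case does not affect the result here.

```scala
// Plain-Scala sketch of event-time session windows (no Flink dependency).
// Assumption: a row extends the current session of its id when it arrives at
// most `gap` seconds after the previous row; a session ends at last row + gap.
object SessionSketch {
  final case class Reading(id: String, timestamp: Long)
  final case class SessionWindow(id: String, start: Long, end: Long, count: Int)

  // The sample data from this section (timestamps in seconds).
  val sample: Seq[Reading] = Seq(
    Reading("sensor_1", 1547718199L), Reading("sensor_6", 1547718201L),
    Reading("sensor_7", 1547718202L), Reading("sensor_10", 1547718205L),
    Reading("sensor_1", 1547718206L), Reading("sensor_1", 1547718208L),
    Reading("sensor_1", 1547718210L), Reading("sensor_1", 1547718213L)
  )

  def sessions(rows: Seq[Reading], gap: Long): Seq[SessionWindow] =
    rows.groupBy(_.id).toSeq.flatMap { case (id, group) =>
      val times = group.map(_.timestamp).sorted
      // Accumulate (start, lastSeen, count) per session, newest session first.
      val merged = times.foldLeft(List.empty[(Long, Long, Int)]) {
        case ((start, last, n) :: older, t) if t - last <= gap => (start, t, n + 1) :: older
        case (acc, t)                                          => (t, t, 1) :: acc
      }
      merged.reverse.map { case (start, last, n) => SessionWindow(id, start, last + gap, n) }
    }.sortBy(w => (w.end, w.id))

  def main(args: Array[String]): Unit =
    sessions(sample, 10L).foreach(w => println(s"${w.id},${w.count},${w.end}"))
}
```

On the sample data all five sensor_1 rows fall into a single session ending at 1547718223, while sensor_6, sensor_7 and sensor_10 each form a one-row session.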
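2. Over Windows
Over-window aggregation comes from standard SQL (the OVER clause) and can be defined in the SELECT clause of a query. For every input row, an over-window aggregation computes an aggregate over a range of neighboring rows. In the Table API, over windows are defined with the .window(w: OverWindow*) clause and referenced by alias in the select() method. For example:
val table = input
  .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) // any OverWindow definition goes here
  .select('a, 'b.sum over 'w, 'c.min over 'w)
The Table API provides the Over class for configuring over-window properties. Over windows can be defined on event time or processing time, over a range given either as a time interval or as a row count.
Unbounded over windows are specified with constants: UNBOUNDED_RANGE for a time interval, or UNBOUNDED_ROW for a row-count interval. Bounded over windows are specified by the size of the interval, such as 1.minutes or 10.rows.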
2.1 Unbounded over windows
// Unbounded event-time over window (time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
// Unbounded processing-time over window (time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
// Unbounded event-time row-count over window (time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
// Unbounded processing-time row-count over window (time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
2.2 Bounded over windows
// Bounded event-time over window (time attribute "rowtime", preceding 1 minute)
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
// Bounded processing-time over window (time attribute "proctime", preceding 1 minute)
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
// Bounded event-time row-count over window (time attribute "rowtime", preceding 10 rows)
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
// Bounded processing-time row-count over window (time attribute "proctime", preceding 10 rows)
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
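The difference between unbounded, row-count and time-range frames is easiest to see on a single partition. This plain-Scala sketch (hypothetical names OverFrameSketch, frameCounts, UnboundedRows, Rows and TimeRange, no Flink dependency) computes how many rows each row's frame contains, which is what a COUNT over the window would return. It assumes the rows are already partitioned and sorted by time; the 5-second range is a hypothetical value chosen because the 1-minute and 10-row bounds above would cover every sample row.

```scala
// Plain-Scala sketch of over-window frames on one partition (no Flink).
// Assumption: the timestamps belong to one partition and are sorted ascending.
object OverFrameSketch {
  sealed trait Frame
  // Every earlier row of the partition plus the current row (like UNBOUNDED_ROW).
  case object UnboundedRows extends Frame
  // The current row plus at most n preceding rows (like `preceding n.rows`).
  final case class Rows(n: Int) extends Frame
  // Rows whose timestamp lies in [current - d, current] (a time-range bound).
  final case class TimeRange(d: Long) extends Frame

  // Number of rows in each row's frame, i.e. what COUNT over the window returns.
  def frameCounts(timestamps: Seq[Long], frame: Frame): Seq[Int] =
    timestamps.indices.map { i =>
      val current = timestamps(i)
      frame match {
        case UnboundedRows => i + 1
        case Rows(n)       => math.min(i, n) + 1
        case TimeRange(d)  => timestamps.count(t => t >= current - d && t <= current)
      }
    }

  // sensor_1 timestamps (seconds) from the sample data.
  val sensor1: Seq[Long] = Seq(1547718199L, 1547718206L, 1547718208L, 1547718210L, 1547718213L)

  def main(args: Array[String]): Unit =
    Seq[Frame](UnboundedRows, Rows(2), TimeRange(5L)).foreach { f =>
      println(s"$f -> ${frameCounts(sensor1, f).mkString(", ")}")
    }
}
```

For the sensor_1 timestamps, UnboundedRows yields 1, 2, 3, 4, 5, Rows(2) yields 1, 2, 3, 3, 3, and TimeRange(5) yields 1, 1, 2, 3, 3, because the row at 1547718206 arrives more than 5 seconds after its predecessor.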
2.3 Code exercise
Let's combine what we have learned and implement a concrete requirement in one complete program: for each sensor, compute for every record the average temperature over that record and the two rows before it.
Data preparation
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
Code:
package windows

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Over}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

/**
 * @Package windows
 * @File :FlinkSqlTumBlingOverTime.java
 * @author 大数据老哥
 * @date 2020/12/28 21:45
 * @version V1.0
 */
object FlinkSqlTumBlingOverTime {
  def main(args: Array[String]): Unit = {
    // Build the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // parallelism 1 makes the test output easier to follow
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // use event time
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    // Build the table environment
    val tableEnv = StreamTableEnvironment.create(env, settings)
    // Read the data
    val inputPath = "./data/sensor.txt"
    val inputStream = env.readTextFile(inputPath)
    // Parse each line into the case class, then assign timestamps and watermarks
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
    // Convert the stream into a table and register it as a temporary view
    val dataTable = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
    tableEnv.createTemporaryView("sensor", dataTable)
    // Table API: per id, ordered by ts, the current row plus the 2 preceding rows
    val tableRes = dataTable.window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'ow)
      .select('id, 'ts, 'id.count over 'ow, 'temperature.avg over 'ow)
    // SQL: the same over window, declared with a named WINDOW clause
    val tableSql = tableEnv.sqlQuery(
      """
        |select
        |id,
        |ts,
        |count(id) over ow,
        |avg(temperature) over ow
        |from sensor
        |window ow as (
        | partition by id
        | order by ts
        | rows between 2 preceding and current row
        |)
        |""".stripMargin)
    tableRes.toAppendStream[Row].print("tableRes")
    tableSql.toAppendStream[Row].print("tableSql")
    env.execute("FlinkSqlTumBlingOverTime")
  }
  case class SensorReading(id: String, timestamp: Long, temperature: Double)
}
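As a cross-check for the exercise, this plain-Scala sketch (hypothetical names OverAvgSketch and movingAvg, no Flink dependency) applies ROWS BETWEEN 2 PRECEDING AND CURRENT ROW to the sample data per id, ordered by timestamp, and computes the count and average temperature for each row. It illustrates the frame semantics only and is not Flink's implementation.

```scala
// Plain-Scala sketch of ROWS BETWEEN n PRECEDING AND CURRENT ROW (no Flink).
object OverAvgSketch {
  final case class Reading(id: String, timestamp: Long, temperature: Double)
  // One output row per input row: id, ts, count over the frame, avg over the frame.
  final case class Result(id: String, timestamp: Long, count: Int, avgTemp: Double)

  // The sample data from this section (timestamps in seconds).
  val sample: Seq[Reading] = Seq(
    Reading("sensor_1", 1547718199L, 35.8),
    Reading("sensor_6", 1547718201L, 15.4),
    Reading("sensor_7", 1547718202L, 6.7),
    Reading("sensor_10", 1547718205L, 38.1),
    Reading("sensor_1", 1547718206L, 32.0),
    Reading("sensor_1", 1547718208L, 36.2),
    Reading("sensor_1", 1547718210L, 29.7),
    Reading("sensor_1", 1547718213L, 30.9)
  )

  def movingAvg(rows: Seq[Reading], preceding: Int): Seq[Result] =
    rows.groupBy(_.id).toSeq.flatMap { case (id, group) =>
      val sorted = group.sortBy(_.timestamp) // partition by id, order by ts
      sorted.indices.map { i =>
        // The frame: the current row plus at most `preceding` earlier rows.
        val frame = sorted.slice(math.max(0, i - preceding), i + 1)
        Result(id, sorted(i).timestamp, frame.size, frame.map(_.temperature).sum / frame.size)
      }
    }.sortBy(r => (r.timestamp, r.id))

  def main(args: Array[String]): Unit =
    movingAvg(sample, 2).foreach(r => println(s"${r.id},${r.timestamp},${r.count},${r.avgTemp}"))
}
```

For sensor_1 the averages come out as 35.8, 33.9, about 34.67, about 32.63 and about 32.27; each is the mean of at most three consecutive readings, which is what the tableRes and tableSql streams are expected to show for that id.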
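Summary
That wraps up windows in Flink SQL. If you found this useful, a like, favorite and follow would be appreciated. Windows in Flink SQL can be used in many ways, so practice is essential. As the old saying goes, the master leads you to the door, but the training is up to you. If anything is unclear, leave a comment, or add me on WeChat so we can discuss it together. I'm 大数据老哥; see you next time~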