Flink1.12新特性之Flink SQL中時(shí)態(tài)表(Temporal Tables)說明和總結(jié)

前言

Flink 1.12正式發(fā)布后执俩,帶來了很多新的特性,本文重點(diǎn)學(xué)習(xí)和總結(jié)一下Flink 1.11和 Flink1.12中時(shí)態(tài)表的使用和自己的一個(gè)小總結(jié)慎框,文章如有問題膜毁,請(qǐng)大家留言交流討論,我會(huì)及時(shí)改正张足。

本文主要將在Flink1.12中新的時(shí)態(tài)表的一些新的概念和注意事項(xiàng)触创,如何在Join中使用會(huì)在之后另一個(gè)篇文章中具體討論。

Flink中的時(shí)態(tài)表的設(shè)計(jì)初衷

首先为牍,大家需要明確一個(gè)概念哼绑,就是傳統(tǒng)SQL中表一般表示的都是有界的數(shù)據(jù)岩馍,而直接套用于流計(jì)算這樣源源不斷的數(shù)據(jù)上是存在問題的,所以在Flink SQL中抖韩,提出了一種叫做動(dòng)態(tài)表的概念蛀恩,這一點(diǎn)官網(wǎng)已經(jīng)有了明確的說明。

詳情請(qǐng)看鏈接:動(dòng)態(tài)表

不過這里還是補(bǔ)充一些說明:

  • 與靜態(tài)表相比茂浮,動(dòng)態(tài)表隨時(shí)間而變化双谆。將SQL查詢作用與動(dòng)態(tài)表,查詢會(huì)持續(xù)執(zhí)行而不會(huì)終止席揽,是一個(gè)連續(xù)的查詢顽馋。

  • 因?yàn)閿?shù)據(jù)會(huì)持續(xù)產(chǎn)生沒有盡頭,所以連續(xù)查詢不會(huì)給出一個(gè)最終而不變的結(jié)果幌羞,流上的SQL實(shí)際上給出的總是中間結(jié)果寸谜。

  • 連續(xù)的查詢不會(huì)終止且會(huì)根據(jù)其輸入表(動(dòng)態(tài)表)上的數(shù)據(jù)變化,持續(xù)計(jì)算并將變化反應(yīng)到其結(jié)果表中属桦。

在明確了上面的3個(gè)概念后熊痴,我們來看看時(shí)態(tài)表的設(shè)計(jì)初衷。

在業(yè)務(wù)中地啰,我們會(huì)遇到維度表是在時(shí)刻更新的愁拭,正常來說,我們只能獲取到最近一個(gè)時(shí)間的維度表數(shù)據(jù)亏吝,但是在業(yè)務(wù)中岭埠,我們往往最關(guān)心的是當(dāng)某時(shí)間發(fā)生時(shí),該事件的事件時(shí)間對(duì)應(yīng)的維度應(yīng)該是怎樣的蔚鸥,結(jié)合官網(wǎng)的一個(gè)例子惜论,解釋說明一下:

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

我們有一張匯率表,從上數(shù)據(jù)來看止喷,其是根據(jù)時(shí)間不斷的變化的馆类,例如,當(dāng)一個(gè)訂單在9:00 到來是時(shí)弹谁,對(duì)應(yīng)的維度結(jié)果應(yīng)該是:

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1

而當(dāng)一個(gè)訂單在12:00 到來是時(shí)乾巧,對(duì)應(yīng)的維度結(jié)果應(yīng)該是:

rowtime currency   rate
======= ======== ======
09:00   US Dollar   102
09:00   Yen           1
11:15   Euro        119 (Euro 更新)
11:49   Pounds      108 (Pounds新增)

在Flink SQL1.11的時(shí)候,SQL的DDL上只支持處理時(shí)間語義的時(shí)態(tài)表join预愤,如果我們想達(dá)到事件時(shí)間語義的效果沟于,只能使用時(shí)態(tài)表函數(shù)來實(shí)現(xiàn),例如:

log.info("注冊(cè)訂單表完場");
tEnv.createTemporaryView("RatesHistory", ratesHistory);
log.info("注冊(cè)匯率表完成");

// 創(chuàng)建和注冊(cè)時(shí)態(tài)表函數(shù)
// 指定 "r_proctime" 為時(shí)間屬性植康,指定 "r_currency" 為主鍵
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
$("rowtime"), // <==== (1)指定時(shí)態(tài)表函數(shù)的時(shí)間屬性
$("currency")); // <==== (2) 指定時(shí)態(tài)表函數(shù)的主鍵
log.info("創(chuàng)建時(shí)態(tài)表函數(shù)完成");
tEnv.createTemporarySystemFunction("Rates", rates);
log.info("注冊(cè)失態(tài)表函數(shù)完成");

String dml = "SELECT * FROM Orders AS o , LATERAL TABLE (Rates(o.time)) AS r WHERE r.currency = o.currency";

這里要注意的是:如果要傳入TemporalTableFunction事件時(shí)間屬性旷太,那么定義TemporalTableFunction時(shí),也需要定義成事件時(shí)間,否則會(huì)報(bào)錯(cuò):Non processing timeAttribute [TIME ATTRIBUTE(ROWTIME)] passed as the argument to TemporalTableFunction供璧。

而在Flink1.12中存崖,完善了1.11中的不足,在DDL直接支持事件時(shí)間和處理時(shí)間兩種語義睡毒,也引出了版本表(1.12)来惧,版本視圖(1.12),普通表(1.12)吕嘀,時(shí)態(tài)表函數(shù)(1.11)等概念违寞。

Flink1.12中時(shí)態(tài)表的類型

時(shí)態(tài)表可以劃分成一系列帶版本的表快照集合,表快照中的版本代表了快照中所有記錄的有效區(qū)間偶房,有效區(qū)間的開始時(shí)間和結(jié)束時(shí)間可以通過用戶指定趁曼,根據(jù)時(shí)態(tài)表是否可以追蹤自身的歷史版本與否,時(shí)態(tài)表可以分為 版本表普通表棕洋。

版本表:

  • 什么是版本挡闰、版本表?

    版本是不同時(shí)間段上反應(yīng)表數(shù)據(jù)的一種形態(tài)掰盘。比如我們上面舉到的匯率的例子摄悯,在9:00 和12:00 就是匯率表的兩個(gè)版本。

    版本表則是表在不同時(shí)間段版本的一個(gè)集合愧捕,我們可以追蹤和并訪問它的歷史版本奢驯。

    而在Flink1.12中,對(duì)于任何其基礎(chǔ)源或格式直接定義變更日志的表次绘,都將隱式定義版本化表瘪阁。包括upsert Kafka源以及數(shù)據(jù)庫changelog日志格式,例如debeziumcanal邮偎。如上所述管跺,唯一的附加要求是CREATE表語句必須包含PRIMARY KEY和事件時(shí)間屬性。定義了主鍵約束和事件時(shí)間屬性的表就是版本表禾进。

  • 如何定義版本表:

    使用主鍵約束和事件時(shí)間來定義一張版本表豁跑,因?yàn)橹麈I和時(shí)間事件可以唯一確定一條維度數(shù)據(jù)。

-- 定義一張版本表
CREATE TABLE product_changelog (
  product_id STRING,
  product_name STRING,
  product_price DECIMAL(10, 4),
  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) 定義主鍵約束
  WATERMARK FOR update_time AS update_time   -- (2) 通過 watermark 定義事件時(shí)間              
) WITH (
  'connector' = 'kafka',
  'topic' = 'products',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.format' = 'debezium-json'
);

? 以上的DML在 (1) 為表 product_changelog 定義了主鍵, (2)update_time 定義為表 product_changelog 的事件時(shí)間泻云,因此 product_changelog 是一張版本表艇拍。METADATA FROM 'value.source.timestamp' VIRTUAL 語法的意思是從每條 changelog 中抽取 changelog 對(duì)應(yīng)的數(shù)據(jù)庫表中操作的執(zhí)行時(shí)間。

  • 使用版本表Join時(shí)需要注意的事項(xiàng):
    • 如果是基于事件時(shí)間的時(shí)態(tài)表 Join 的 join key 必須包含時(shí)態(tài)表的主鍵宠纯,例如:表 product_changelog 的主鍵 P.product_id 必須包含在 join 條件 O.product_id = P.product_id 中淑倾。這個(gè)好理解,主鍵和事件時(shí)間唯一確定一條數(shù)據(jù)征椒。

    • watermark的設(shè)置:

      基于事件時(shí)間的時(shí)態(tài)表 Join 是通過左右兩側(cè)的 watermark 觸發(fā),請(qǐng)確保為 join 兩側(cè)的表設(shè)置了合適的 watermark湃累。事件時(shí)間的一個(gè)重要概念就是watermark勃救,這個(gè)沒必要解釋了碍讨。

    • 官網(wǎng)強(qiáng)烈推薦使用數(shù)據(jù)庫表中操作的執(zhí)行時(shí)間作為事件時(shí)間,否則通過時(shí)間抽取的版本可能和數(shù)據(jù)庫中的版本不匹配蒙秒。

版本視圖

  • 什么是視圖勃黍,視圖表?

    視圖就是是已經(jīng)編譯好的SQL語句晕讲,視圖表就是通過已經(jīng)編譯好的SQL語句產(chǎn)生的虛擬表覆获。

  • 為什么要有視圖表?

在流上瓢省,我們往往得到的是一個(gè)append-only流弄息,這意味著我們無法定義PRIMARY KEY,但是勤婚,我們很清楚該表具有定義版本表的所有必要信息摹量,所以我們可以通過Flink SQL提供的DISTINCT做去重處理,去重查詢可以產(chǎn)出一個(gè)有序的 changelog 流馒胆。

  • 如何定義視圖表:

    去重查詢能夠推斷主鍵并保留原始數(shù)據(jù)流的事件時(shí)間屬性,如下:

SELECT * FROM RatesHistory;

currency_time currency  rate
============= ========= ====
09:00:00      US Dollar 102
09:00:00      Euro      114
09:00:00      Yen       1
10:45:00      Euro      116
11:15:00      Euro      119
11:49:00      Pounds    108

-- 視圖中的去重 query 會(huì)被 Flink 優(yōu)化并高效地產(chǎn)出 changelog stream, 產(chǎn)出的 changelog 保留了主鍵約束和事件時(shí)間缨称。
CREATE VIEW versioned_rates AS              
SELECT currency, rate, currency_time            -- (1) `currency_time` 保留了事件時(shí)間
  FROM (
      SELECT *,
      ROW_NUMBER() OVER (PARTITION BY currency  -- (2) `currency` 是去重 query 的 unique key,可以作為主鍵
         ORDER BY currency_time DESC) AS rowNum 
      FROM RatesHistory )
WHERE rowNum = 1; 

-- 視圖 `versioned_rates` 將會(huì)產(chǎn)出如下的 changelog:

(changelog kind) currency_time currency   rate
================ ============= =========  ====
+(INSERT)        09:00:00      US Dollar  102
+(INSERT)        09:00:00      Euro       114
+(INSERT)        09:00:00      Yen        1
+(UPDATE_AFTER)  10:45:00      Euro       116
+(UPDATE_AFTER)  11:15:00      Euro       119
+(INSERT)        11:49:00      Pounds     108
  • 使用是視圖表需要注意的事項(xiàng):

    視圖表首先需要一個(gè)append_only流祝迂,這樣我們可以使用DISTINCT操作睦尽,通過事件的變化產(chǎn)出一張changlog流。而Flink SQL1.12會(huì)自動(dòng)推斷主鍵并保留原始數(shù)據(jù)流的事件時(shí)間型雳。

普通表

  • 什么是普通表当凡?

    版本表保留了表在各個(gè)時(shí)間段的版本,而普通表則只保留該表最新的一份數(shù)據(jù)四啰。

  • 如何定義普通表:

    普通表的特性就和他名稱一樣宁玫,就是Flink中的一個(gè)普通表,其聲明和 Flink 建表 DDL一致柑晒,如下:

-- 用 DDL 定義一張 HBase 表欧瘪,然后我們可以在 SQL 中將其當(dāng)作一張時(shí)態(tài)表使用
-- 'currency' 列是 HBase 表中的 rowKey
 CREATE TABLE LatestRates (   
     currency STRING,   
     fam1 ROW<rate DOUBLE>   
 ) WITH (   
    'connector' = 'hbase-1.4',   
    'table-name' = 'rates',   
    'zookeeper.quorum' = 'localhost:2181'   
 );
  • 使用普通表需要注意的事項(xiàng):

    理論上講任意都能用作時(shí)態(tài)表并在基于處理時(shí)間的時(shí)態(tài)表 Join 中使用,但當(dāng)前支持作為時(shí)態(tài)表的普通表必須實(shí)現(xiàn)接口 LookupableTableSource匙赞。接口 LookupableTableSource 的實(shí)例只能作為時(shí)態(tài)表用于基于處理時(shí)間的時(shí)態(tài) Join 佛掖。

    通過 LookupableTableSource 定義的表意味著該表具備了在運(yùn)行時(shí)通過一個(gè)或多個(gè) key 去查詢外部存儲(chǔ)系統(tǒng)的能力,當(dāng)前支持在 基于處理時(shí)間的時(shí)態(tài)表 join 中使用的表包括 JDBC, HBaseHive涌庭。

    在基于處理時(shí)間的時(shí)態(tài)表 Join 中支持任意表作為時(shí)態(tài)表會(huì)在不遠(yuǎn)的將來支持芥被。

時(shí)態(tài)表函數(shù)

時(shí)態(tài)表函數(shù)在本文的第二部分已經(jīng)有了說明,需要注意的就是

  • 在join時(shí)左表(左輸入/探針側(cè))去關(guān)聯(lián)一個(gè)時(shí)態(tài)表(右輸入/構(gòu)建側(cè))坐榆,兩邊的時(shí)間語義必須相同拴魄,否則會(huì)拋出類似的異常:Non processing timeAttribute [TIME ATTRIBUTE(ROWTIME)] passed as the argument to TemporalTableFunction。

  • 基于處理時(shí)間的時(shí)態(tài) Join 中, 如果右側(cè)表不是可以直接查詢外部系統(tǒng)的表而是普通的數(shù)據(jù)流匹中,時(shí)態(tài)表函數(shù) Join 和 時(shí)態(tài)表 Join 的語義都有問題夏漱,時(shí)態(tài)表函數(shù) Join 仍然允許使用,但是時(shí)態(tài)表 Join 禁用了該功能顶捷。 語義問題的原因是 Join 算子沒辦法知道右側(cè)時(shí)態(tài)表(構(gòu)建側(cè))的完整快照是否到齊挂绰,這可能導(dǎo)致左側(cè)的流在啟動(dòng)時(shí)關(guān)聯(lián)不到用戶期待的數(shù)據(jù), 在生產(chǎn)環(huán)境中可能誤導(dǎo)用戶。

  • 在處理時(shí)間語義下的時(shí)態(tài)表函數(shù)的只會(huì)保留最新的一份數(shù)據(jù)服赎,時(shí)間事件語義下則會(huì)保留每個(gè)水位對(duì)應(yīng)的動(dòng)態(tài)表葵蒂。

總結(jié)

本文總結(jié)了Flink1.11 時(shí)態(tài)關(guān)聯(lián)的不足和Flink1.12中時(shí)態(tài)表設(shè)計(jì)的一些新的概念和一些基本的定義表的方法和注意事項(xiàng)。后續(xù)會(huì)寫一個(gè)Join篇章來進(jìn)行時(shí)態(tài)表重虑,時(shí)態(tài)函數(shù)的使用補(bǔ)充践付。

-- by 倆只猴

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市嚎尤,隨后出現(xiàn)的幾起案子荔仁,更是在濱河造成了極大的恐慌,老刑警劉巖芽死,帶你破解...
    沈念sama閱讀 212,718評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件乏梁,死亡現(xiàn)場離奇詭異,居然都是意外死亡关贵,警方通過查閱死者的電腦和手機(jī)遇骑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來揖曾,“玉大人落萎,你說我怎么就攤上這事√考簦” “怎么了练链?”我有些...
    開封第一講書人閱讀 158,207評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長奴拦。 經(jīng)常有香客問我媒鼓,道長,這世上最難降的妖魔是什么错妖? 我笑而不...
    開封第一講書人閱讀 56,755評(píng)論 1 284
  • 正文 為了忘掉前任绿鸣,我火速辦了婚禮,結(jié)果婚禮上暂氯,老公的妹妹穿的比我還像新娘潮模。我一直安慰自己,他們只是感情好痴施,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評(píng)論 6 386
  • 文/花漫 我一把揭開白布擎厢。 她就那樣靜靜地躺著究流,像睡著了一般。 火紅的嫁衣襯著肌膚如雪锉矢。 梳的紋絲不亂的頭發(fā)上梯嗽,一...
    開封第一講書人閱讀 50,050評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音沽损,去河邊找鬼。 笑死循头,一個(gè)胖子當(dāng)著我的面吹牛绵估,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播卡骂,決...
    沈念sama閱讀 39,136評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼国裳,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了全跨?” 一聲冷哼從身側(cè)響起缝左,我...
    開封第一講書人閱讀 37,882評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎浓若,沒想到半個(gè)月后渺杉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,330評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡挪钓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評(píng)論 2 327
  • 正文 我和宋清朗相戀三年是越,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片碌上。...
    茶點(diǎn)故事閱讀 38,789評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡倚评,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出馏予,到底是詐尸還是另有隱情天梧,我是刑警寧澤,帶...
    沈念sama閱讀 34,477評(píng)論 4 333
  • 正文 年R本政府宣布霞丧,位于F島的核電站呢岗,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蚯妇。R本人自食惡果不足惜敷燎,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望箩言。 院中可真熱鬧硬贯,春花似錦、人聲如沸陨收。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至拄衰,卻和暖如春它褪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背翘悉。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評(píng)論 1 267
  • 我被黑心中介騙來泰國打工茫打, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人妖混。 一個(gè)月前我還...
    沈念sama閱讀 46,598評(píng)論 2 362
  • 正文 我出身青樓老赤,卻偏偏與公主長得像,于是被迫代替她去往敵國和親制市。 傳聞我的和親對(duì)象是個(gè)殘疾皇子抬旺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容