前言
Flink 1.12正式發(fā)布后执俩,帶來了很多新的特性,本文重點(diǎn)學(xué)習(xí)和總結(jié)一下Flink 1.11和 Flink1.12中時(shí)態(tài)表的使用和自己的一個(gè)小總結(jié)慎框,文章如有問題膜毁,請(qǐng)大家留言交流討論,我會(huì)及時(shí)改正张足。
本文主要將在Flink1.12中新的時(shí)態(tài)表的一些新的概念和注意事項(xiàng)触创,如何在Join中使用會(huì)在之后另一個(gè)篇文章中具體討論。
Flink中的時(shí)態(tài)表的設(shè)計(jì)初衷
首先为牍,大家需要明確一個(gè)概念哼绑,就是傳統(tǒng)SQL中表一般表示的都是有界的數(shù)據(jù)岩馍,而直接套用于流計(jì)算這樣源源不斷的數(shù)據(jù)上是存在問題的,所以在Flink SQL中抖韩,提出了一種叫做動(dòng)態(tài)表的概念蛀恩,這一點(diǎn)官網(wǎng)已經(jīng)有了明確的說明。
詳情請(qǐng)看鏈接:動(dòng)態(tài)表
不過這里還是補(bǔ)充一些說明:
與靜態(tài)表相比茂浮,動(dòng)態(tài)表隨時(shí)間而變化双谆。將SQL查詢作用與動(dòng)態(tài)表,查詢會(huì)持續(xù)執(zhí)行而不會(huì)終止席揽,是一個(gè)
連續(xù)的查詢
顽馋。因?yàn)閿?shù)據(jù)會(huì)持續(xù)產(chǎn)生沒有盡頭,所以連續(xù)查詢不會(huì)給出一個(gè)最終而不變的結(jié)果幌羞,
流上的SQL實(shí)際上給出的總是中間結(jié)果
寸谜。連續(xù)的查詢不會(huì)終止且會(huì)根據(jù)其輸入表(動(dòng)態(tài)表)上的數(shù)據(jù)變化,持續(xù)計(jì)算并將變化反應(yīng)到其結(jié)果表中属桦。
在明確了上面的3個(gè)概念后熊痴,我們來看看時(shí)態(tài)表的設(shè)計(jì)初衷。
在業(yè)務(wù)中地啰,我們會(huì)遇到維度表是在時(shí)刻更新的愁拭,正常來說,我們只能獲取到最近一個(gè)時(shí)間的維度表數(shù)據(jù)亏吝,但是在業(yè)務(wù)中岭埠,我們往往最關(guān)心的是當(dāng)某時(shí)間發(fā)生時(shí),該事件的事件時(shí)間對(duì)應(yīng)的維度應(yīng)該是怎樣的蔚鸥,結(jié)合官網(wǎng)的一個(gè)例子惜论,解釋說明一下:
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
10:45 Euro 116
11:15 Euro 119
11:49 Pounds 108
我們有一張匯率表,從上數(shù)據(jù)來看止喷,其是根據(jù)時(shí)間不斷的變化的馆类,例如,當(dāng)一個(gè)訂單在9:00 到來是時(shí)弹谁,對(duì)應(yīng)的維度結(jié)果應(yīng)該是:
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
而當(dāng)一個(gè)訂單在12:00 到來是時(shí)乾巧,對(duì)應(yīng)的維度結(jié)果應(yīng)該是:
rowtime currency rate
======= ======== ======
09:00 US Dollar 102
09:00 Yen 1
11:15 Euro 119 (Euro 更新)
11:49 Pounds 108 (Pounds新增)
在Flink SQL1.11的時(shí)候,SQL的DDL上只支持處理時(shí)間語義的時(shí)態(tài)表join预愤,如果我們想達(dá)到事件時(shí)間語義的效果沟于,只能使用時(shí)態(tài)表函數(shù)來實(shí)現(xiàn),例如:
log.info("注冊(cè)訂單表完場");
tEnv.createTemporaryView("RatesHistory", ratesHistory);
log.info("注冊(cè)匯率表完成");
// 創(chuàng)建和注冊(cè)時(shí)態(tài)表函數(shù)
// 指定 "r_proctime" 為時(shí)間屬性植康,指定 "r_currency" 為主鍵
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
$("rowtime"), // <==== (1)指定時(shí)態(tài)表函數(shù)的時(shí)間屬性
$("currency")); // <==== (2) 指定時(shí)態(tài)表函數(shù)的主鍵
log.info("創(chuàng)建時(shí)態(tài)表函數(shù)完成");
tEnv.createTemporarySystemFunction("Rates", rates);
log.info("注冊(cè)失態(tài)表函數(shù)完成");
String dml = "SELECT * FROM Orders AS o , LATERAL TABLE (Rates(o.time)) AS r WHERE r.currency = o.currency";
這里要注意的是:如果要傳入TemporalTableFunction事件時(shí)間屬性旷太,那么定義TemporalTableFunction時(shí),也需要定義成事件時(shí)間,否則會(huì)報(bào)錯(cuò):Non processing timeAttribute [TIME ATTRIBUTE(ROWTIME)] passed as the argument to TemporalTableFunction供璧。
而在Flink1.12中存崖,完善了1.11中的不足,在DDL直接支持事件時(shí)間和處理時(shí)間兩種語義睡毒,也引出了版本表(1.12)来惧,版本視圖(1.12),普通表(1.12)吕嘀,時(shí)態(tài)表函數(shù)(1.11)等概念违寞。
Flink1.12中時(shí)態(tài)表的類型
時(shí)態(tài)表可以劃分成一系列帶版本的表快照集合,表快照中的版本代表了快照中所有記錄的有效區(qū)間偶房,有效區(qū)間的開始時(shí)間和結(jié)束時(shí)間可以通過用戶指定趁曼,根據(jù)時(shí)態(tài)表是否可以追蹤自身的歷史版本與否,時(shí)態(tài)表可以分為 版本表
和 普通表
棕洋。
版本表:
-
什么是版本挡闰、版本表?
版本是不同時(shí)間段上反應(yīng)表數(shù)據(jù)的一種形態(tài)掰盘。比如我們上面舉到的匯率的例子摄悯,在9:00 和12:00 就是匯率表的兩個(gè)版本。
版本表則是表在不同時(shí)間段版本的一個(gè)集合愧捕,我們可以追蹤和并訪問它的歷史版本奢驯。
而在Flink1.12中,對(duì)于任何其基礎(chǔ)源或格式直接定義變更日志的表次绘,都將隱式定義版本化表瘪阁。包括upsert Kafka源以及數(shù)據(jù)庫changelog日志格式,例如debezium和canal邮偎。如上所述管跺,唯一的附加要求是
CREATE
表語句必須包含PRIMARY KEY
和事件時(shí)間屬性。定義了主鍵約束和事件時(shí)間屬性的表就是版本表禾进。 -
如何定義版本表:
使用主鍵約束和事件時(shí)間來定義一張版本表豁跑,因?yàn)橹麈I和時(shí)間事件可以唯一確定一條維度數(shù)據(jù)。
-- 定義一張版本表
CREATE TABLE product_changelog (
product_id STRING,
product_name STRING,
product_price DECIMAL(10, 4),
update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY(product_id) NOT ENFORCED, -- (1) 定義主鍵約束
WATERMARK FOR update_time AS update_time -- (2) 通過 watermark 定義事件時(shí)間
) WITH (
'connector' = 'kafka',
'topic' = 'products',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'value.format' = 'debezium-json'
);
? 以上的DML在 (1)
為表 product_changelog
定義了主鍵, (2)
把 update_time
定義為表 product_changelog
的事件時(shí)間泻云,因此 product_changelog
是一張版本表艇拍。METADATA FROM 'value.source.timestamp' VIRTUAL
語法的意思是從每條 changelog 中抽取 changelog 對(duì)應(yīng)的數(shù)據(jù)庫表中操作的執(zhí)行時(shí)間。
- 使用版本表Join時(shí)需要注意的事項(xiàng):
如果是基于事件時(shí)間的時(shí)態(tài)表 Join 的 join key 必須包含時(shí)態(tài)表的主鍵宠纯,例如:表
product_changelog
的主鍵P.product_id
必須包含在 join 條件O.product_id = P.product_id
中淑倾。這個(gè)好理解,主鍵和事件時(shí)間唯一確定一條數(shù)據(jù)征椒。-
watermark的設(shè)置:
基于事件時(shí)間的時(shí)態(tài)表 Join 是通過左右兩側(cè)的 watermark 觸發(fā),請(qǐng)確保為 join 兩側(cè)的表設(shè)置了合適的 watermark湃累。事件時(shí)間的一個(gè)重要概念就是watermark勃救,這個(gè)沒必要解釋了碍讨。
官網(wǎng)強(qiáng)烈推薦使用數(shù)據(jù)庫表中操作的執(zhí)行時(shí)間作為事件時(shí)間,否則通過時(shí)間抽取的版本可能和數(shù)據(jù)庫中的版本不匹配蒙秒。
版本視圖
-
什么是視圖勃黍,視圖表?
視圖就是是已經(jīng)編譯好的SQL語句晕讲,視圖表就是通過已經(jīng)編譯好的SQL語句產(chǎn)生的虛擬表覆获。
為什么要有視圖表?
在流上瓢省,我們往往得到的是一個(gè)append-only流弄息,這意味著我們無法定義PRIMARY KEY
,但是勤婚,我們很清楚該表具有定義版本表的所有必要信息摹量,所以我們可以通過Flink SQL提供的DISTINCT做去重處理,去重查詢可以產(chǎn)出一個(gè)有序的 changelog 流馒胆。
-
如何定義視圖表:
去重查詢能夠推斷主鍵并保留原始數(shù)據(jù)流的事件時(shí)間屬性,如下:
SELECT * FROM RatesHistory;
currency_time currency rate
============= ========= ====
09:00:00 US Dollar 102
09:00:00 Euro 114
09:00:00 Yen 1
10:45:00 Euro 116
11:15:00 Euro 119
11:49:00 Pounds 108
-- 視圖中的去重 query 會(huì)被 Flink 優(yōu)化并高效地產(chǎn)出 changelog stream, 產(chǎn)出的 changelog 保留了主鍵約束和事件時(shí)間缨称。
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time -- (1) `currency_time` 保留了事件時(shí)間
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- (2) `currency` 是去重 query 的 unique key,可以作為主鍵
ORDER BY currency_time DESC) AS rowNum
FROM RatesHistory )
WHERE rowNum = 1;
-- 視圖 `versioned_rates` 將會(huì)產(chǎn)出如下的 changelog:
(changelog kind) currency_time currency rate
================ ============= ========= ====
+(INSERT) 09:00:00 US Dollar 102
+(INSERT) 09:00:00 Euro 114
+(INSERT) 09:00:00 Yen 1
+(UPDATE_AFTER) 10:45:00 Euro 116
+(UPDATE_AFTER) 11:15:00 Euro 119
+(INSERT) 11:49:00 Pounds 108
-
使用是視圖表需要注意的事項(xiàng):
視圖表首先需要一個(gè)append_only流祝迂,這樣我們可以使用DISTINCT操作睦尽,通過事件的變化產(chǎn)出一張changlog流。而Flink SQL1.12會(huì)自動(dòng)推斷主鍵并保留原始數(shù)據(jù)流的事件時(shí)間型雳。
普通表
-
什么是普通表当凡?
版本表保留了表在各個(gè)時(shí)間段的版本,而普通表則只保留該表最新的一份數(shù)據(jù)四啰。
-
如何定義普通表:
普通表的特性就和他名稱一樣宁玫,就是Flink中的一個(gè)普通表,其聲明和 Flink 建表 DDL一致柑晒,如下:
-- 用 DDL 定義一張 HBase 表欧瘪,然后我們可以在 SQL 中將其當(dāng)作一張時(shí)態(tài)表使用
-- 'currency' 列是 HBase 表中的 rowKey
CREATE TABLE LatestRates (
currency STRING,
fam1 ROW<rate DOUBLE>
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'rates',
'zookeeper.quorum' = 'localhost:2181'
);
-
使用普通表需要注意的事項(xiàng):
理論上講任意都能用作時(shí)態(tài)表并在基于處理時(shí)間的時(shí)態(tài)表 Join 中使用,但當(dāng)前支持作為時(shí)態(tài)表的普通表必須實(shí)現(xiàn)接口
LookupableTableSource
匙赞。接口LookupableTableSource
的實(shí)例只能作為時(shí)態(tài)表用于基于處理時(shí)間的時(shí)態(tài) Join 佛掖。通過
LookupableTableSource
定義的表意味著該表具備了在運(yùn)行時(shí)通過一個(gè)或多個(gè) key 去查詢外部存儲(chǔ)系統(tǒng)的能力,當(dāng)前支持在 基于處理時(shí)間的時(shí)態(tài)表 join 中使用的表包括 JDBC, HBase 和 Hive涌庭。在基于處理時(shí)間的時(shí)態(tài)表 Join 中支持任意表作為時(shí)態(tài)表會(huì)在不遠(yuǎn)的將來支持芥被。
時(shí)態(tài)表函數(shù)
時(shí)態(tài)表函數(shù)在本文的第二部分已經(jīng)有了說明,需要注意的就是
在join時(shí)左表(左輸入/探針側(cè))去關(guān)聯(lián)一個(gè)時(shí)態(tài)表(右輸入/構(gòu)建側(cè))坐榆,兩邊的時(shí)間語義必須相同拴魄,否則會(huì)拋出類似的異常:Non processing timeAttribute [TIME ATTRIBUTE(ROWTIME)] passed as the argument to TemporalTableFunction。
基于處理時(shí)間的時(shí)態(tài) Join 中, 如果右側(cè)表不是可以直接查詢外部系統(tǒng)的表而是普通的數(shù)據(jù)流匹中,時(shí)態(tài)表函數(shù) Join 和 時(shí)態(tài)表 Join 的語義都有問題夏漱,時(shí)態(tài)表函數(shù) Join 仍然允許使用,但是時(shí)態(tài)表 Join 禁用了該功能顶捷。 語義問題的原因是 Join 算子沒辦法知道右側(cè)時(shí)態(tài)表(構(gòu)建側(cè))的完整快照是否到齊挂绰,這可能導(dǎo)致左側(cè)的流在啟動(dòng)時(shí)關(guān)聯(lián)不到用戶期待的數(shù)據(jù), 在生產(chǎn)環(huán)境中可能誤導(dǎo)用戶。
在處理時(shí)間語義下的時(shí)態(tài)表函數(shù)的只會(huì)保留最新的一份數(shù)據(jù)服赎,時(shí)間事件語義下則會(huì)保留每個(gè)水位對(duì)應(yīng)的動(dòng)態(tài)表葵蒂。
總結(jié)
本文總結(jié)了Flink1.11 時(shí)態(tài)關(guān)聯(lián)的不足和Flink1.12中時(shí)態(tài)表設(shè)計(jì)的一些新的概念和一些基本的定義表的方法和注意事項(xiàng)。后續(xù)會(huì)寫一個(gè)Join篇章來進(jìn)行時(shí)態(tài)表重虑,時(shí)態(tài)函數(shù)的使用補(bǔ)充践付。
-- by 倆只猴