SQL 簡述
SQL 是 Structured Query Language 的縮寫,最初是由美國計算機科學(xué)家Donald D. Chamberlin和 Raymond F. Boyce 在 20 世紀(jì) 70 年代早期從 Early History of SQL 中了解關(guān)系模型后在 IBM 開發(fā)的亩进。該版本最初稱為[SEQUEL: A Structured English Query Language](結(jié)構(gòu)化英語查詢語言)蘸拔,旨在操縱和檢索存儲在 IBM 原始準(zhǔn)關(guān)系數(shù)據(jù)庫管理系統(tǒng) System R 中的數(shù)據(jù)宅此。SEQUEL 后來改為 SQL胯府,因為“SEQUEL”是英國 Hawker Siddeley 飛機公司的商標(biāo)。我們看看這款用于特技飛行的英國皇家空軍豪客 Siddeley Hawk T.1A (Looks great):
第一款 SQL 數(shù)據(jù)庫
在 20 世紀(jì) 70 年代后期,Oracle 公司(當(dāng)時叫 Relational Software薯演,Inc.)開發(fā)了基于 SQL 的 RDBMS,并希望將其出售給美國海軍秧了,Central Intelligence 代理商和其他美國政府機構(gòu)跨扮。 1979 年 6 月,Oracle 公司為 VAX 計算機推出了第一個商業(yè)化的 SQL 實現(xiàn)验毡,即 Oracle V2衡创。
ANSI-SQL 標(biāo)準(zhǔn)的采用
直到 1986 年,ANSI 和 ISO 標(biāo)準(zhǔn)組正式采用了標(biāo)準(zhǔn)的"數(shù)據(jù)庫語言 SQL"語言定義晶通。該標(biāo)準(zhǔn)的新版本發(fā)布于 1989,1992,1996,1999,2003,2006,2008,2011璃氢,以及最近的 2016。Apache Flink SQL 核心算子的語義設(shè)計也參考了1992 狮辽、2011等 ANSI-SQL 標(biāo)準(zhǔn)一也。
SQL 操作及擴展
SQL 是專為查詢包含在關(guān)系數(shù)據(jù)庫中的數(shù)據(jù)而設(shè)計的,是一種基于 SET 操作的聲明性編程語言喉脖,而不是像 C 語言一樣的命令式編程語言椰苟。但是,各大關(guān)系數(shù)據(jù)庫廠商在遵循 ANSI-SQL 標(biāo)準(zhǔn)的同時又對標(biāo)準(zhǔn) SQL 進行擴展树叽,由基于 SET(無重復(fù)元素)的操作擴展到基于 BAG(有重復(fù)元素)的操作舆蝴,并且添加了過程編程語言功能,如:Oracle 的 PL/SQL, DB2 的 SQL PL菱皆,MySQL - SQL/PSM 以及 SQL Server 的 T-SQL 等等须误。
隨著時間的推移 ANSI-SQL 規(guī)范不斷完善,所涉及的功能不斷豐富仇轻,比如在 ANSI-2011 中又增加了 Temporal Table 的標(biāo)準(zhǔn)定義京痢,Temporal Table 的標(biāo)準(zhǔn)在結(jié)構(gòu)化關(guān)系數(shù)據(jù)存儲上添加了時間維度信息,這使得關(guān)系數(shù)據(jù)庫中不僅可以對當(dāng)前數(shù)據(jù)進行查詢操作篷店,根據(jù)時間版本信息也可以對歷史數(shù)據(jù)進行操作祭椰。這些不斷豐富的功能極大增強了 SQL 的應(yīng)用領(lǐng)域。
大數(shù)據(jù)計算領(lǐng)域?qū)?SQL 的應(yīng)用
離線計算(批計算)
提及大數(shù)據(jù)計算領(lǐng)域不得不說 MapReduce 計算模型疲陕,MapReduce 最早是由 Google 公司研究提出的一種面向大規(guī)模數(shù)據(jù)處理的并行計算模型和方法方淤,并發(fā)于 2004 年發(fā)表了論文Simplified Data Processing on Large Clusters。
論文發(fā)表之后 Apache 開源社區(qū)參考 Google MapReduce蹄殃,基于 Java 設(shè)計開發(fā)了一個稱為 Hadoop 的開源 MapReduce 并行計算框架携茂。很快得到了全球?qū)W術(shù)界和工業(yè)界的普遍關(guān)注,并得到推廣和普及應(yīng)用诅岩。
但利用 Hadoop 進行 MapReduce 的開發(fā)讳苦,需要開發(fā)人員精通 Java 語言带膜,并了解 MapReduce 的運行原理,這樣在一定程度上提高了 MapReduce 的開發(fā)門檻鸳谜,所以在開源社區(qū)又不斷涌現(xiàn)了一些為了簡化 MapReduce 開發(fā)的開源框架膝藕,其中 Hive 就是典型的代表。HSQL 可以讓用戶以類 SQL 的方式描述 MapReduce 計算咐扭,比如原本需要幾十行芭挽,甚至上百行才能完成的 wordCount,用戶一條 SQL 語句就能完成了蝗肪,這樣極大的降低了 MapReduce 的開發(fā)門檻袜爪,進而也成功的將 SQL 應(yīng)用到了大數(shù)據(jù)計算領(lǐng)域當(dāng)中來。
實時計算(流計算)
SQL 不僅僅被成功的應(yīng)用到了離線計算薛闪,SQL 的易用性也吸引了流計算產(chǎn)品饿敲,目前最熱的 Spark,F(xiàn)link 也紛紛支持了 SQL逛绵,尤其是 Flink 支持的更加徹底,集成了 Calcite倔韭,完全遵循 ANSI-SQL 標(biāo)準(zhǔn)术浪。Apache Flink 在 low-level API 上面用 DataSet 支持批計算,用 DataStream 支持流計算寿酌,但在 High-Level API 上面利用 SQL 將流與批進行了統(tǒng)一胰苏,使得用戶編寫一次 SQL 既可以在流計算中使用,又可以在批計算中使用醇疼,為既有流計算業(yè)務(wù)硕并,又有批計算業(yè)務(wù)的用戶節(jié)省了大量開發(fā)成本。
SQL 高性能與簡潔性
性能
SQL 經(jīng)過傳統(tǒng)數(shù)據(jù)庫領(lǐng)域幾十年的不斷打磨秧荆,查詢優(yōu)化器已經(jīng)能夠極大的優(yōu)化 SQL 的查詢性能倔毙,Apache Flink 應(yīng)用 Calcite 進行查詢優(yōu)化,復(fù)用了大量數(shù)據(jù)庫查詢優(yōu)化規(guī)則乙濒,在性能上不斷追求極致陕赃,能夠讓用戶關(guān)心但不用擔(dān)心性能問題。如下圖(Alibaba 對 Apache Flink 進行架構(gòu)優(yōu)化后的組件棧)
相對于 DataStream 而言颁股,SQL 會經(jīng)過 Optimization 模塊透明的為用戶進行查詢優(yōu)化么库,用戶專心編寫自己的業(yè)務(wù)邏輯,不用擔(dān)心性能甘有,卻能得到最優(yōu)的查詢性能!
簡潔
就簡潔性而言诉儒,SQL 與 DataSet 和 DataStream 相比具有很大的優(yōu)越性,我們先用一個 WordCount 示例來直觀的查看用戶的代碼量:
- DataStream/DataSetAPI
... //省略初始化代碼
// 核心邏輯
text.flatMap(new WordCount.Tokenizer()).keyBy(new int[]{0}).sum(1);
// flatmap 代碼定義
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public Tokenizer() {
}
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
String[] var4 = tokens;
int var5 = tokens.length;
for(int var6 = 0; var6 < var5; ++var6) {
String token = var4[var6];
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}
}
}
- SQL
//省略初始化代碼
SELECT word, COUNT(word) FROM tab GROUP BY word;
我們直觀的體會到相同的統(tǒng)計功能使用 SQL 的簡潔性亏掀。
Flink SQL Job 的組成
我們做任何數(shù)據(jù)計算都離不開讀取原始數(shù)據(jù)忱反,計算邏輯和寫入計算結(jié)果數(shù)據(jù)三部分泛释,當(dāng)然基于 Apache Flink SQL 編寫的計算 Job 也離不開這三個部分,如下所示:
如上所示缭受,一個完整的 Apache Flink SQL Job 由如下三部分:
- Source Operator - Soruce operator 是對外部數(shù)據(jù)源的抽象, 目前 Apache Flink 內(nèi)置了很多常用的數(shù)據(jù)源實現(xiàn)胁澳,比如上圖提到的 Kafka。
- Query Operators - 查詢算子主要完成如圖的 Query Logic米者,目前支持了 Union韭畸,Join,Projection,Difference, Intersection 以及 window 等大多數(shù)傳統(tǒng)數(shù)據(jù)庫支持的操作蔓搞。
- Sink Operator - Sink operator 是對外結(jié)果表的抽象胰丁,目前 Apache Flink 也內(nèi)置了很多常用的結(jié)果表的抽象,比如上圖提到的 Kafka喂分。
Flink SQL 核心算子
目前 Flink SQL 支持 Union锦庸,Join,Projection,Difference, Intersection 以及 Window 等大多數(shù)傳統(tǒng)數(shù)據(jù)庫支持的操作蒲祈,接下來為大家分別進行簡單直觀的介紹甘萧。
環(huán)境
為了很好的體驗和理解 Apache Flink SQL 算子我們需要先準(zhǔn)備一下測試環(huán)境,我們選擇IDEA梆掸,以 ITCase 測試方式來進行體驗扬卷。IDEA 安裝這里不占篇幅介紹了,相信大家能輕松搞定酸钦!我們進行功能體驗有兩種方式怪得,具體如下:
源碼方式
對于開源愛好者可能更喜歡源代碼方式理解和體驗 Apache Flink SQL 功能,那么我們需要下載源代碼并導(dǎo)入到 IDEA 中:
- 下載源碼:
// 下載源代碼
git clone https://github.com/apache/flink.git study
// 進入源碼目錄
cd study
// 拉取穩(wěn)定版release-1.6
git fetch origin release-1.6:release-1.6
//切換到穩(wěn)定版
git checkout release-1.6
//將依賴安裝到本地mvn倉庫卑硫,耐心等待需要一段時間
mvn clean install -DskipTests
- 導(dǎo)入到 IDEA
將 Flink 源碼導(dǎo)入到 IDEA 過程這里不再占用篇幅徒恋,導(dǎo)入后確保在 IDEA 中可以運行org.apache.flink.table.runtime.stream.sql.SqlITCase
并測試全部通過,即證明體驗環(huán)境已經(jīng)完成欢伏。如下圖所示:
如上圖運行測試后顯示測試通過入挣,我們就可以繼續(xù)下面的 Apache Flink SQL 功能體驗了。
依賴 Flink 包方式
我們還有一種更簡單直接的方式硝拧,就是新建一個 mvn 項目财岔,并在 pom 中添加如下依賴:
<properties>
<table.version>1.6-SNAPSHOT</table.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${table.version}</version>
</dependency>
<dependency>
<groupId>JUnit</groupId>
<artifactId>JUnit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
完成環(huán)境準(zhǔn)備后,我們開始準(zhǔn)備測試數(shù)據(jù)和寫一個簡單的測試類河爹。
示例數(shù)據(jù)及測試類
測試數(shù)據(jù)
- customer_tab 表 - 客戶表保存客戶 id匠璧,客戶姓名和客戶描述信息。字段及測試數(shù)據(jù)如下:
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
- order_tab 表 - 訂單表保存客戶購買的訂單信息咸这,包括訂單 id夷恍,訂單時間和訂單描述信息。 字段節(jié)測試數(shù)據(jù)如下:
o_id | c_id | o_time | o_desc |
---|---|---|---|
o_oo1 | c_002 | 2018-11-05 10:01:01 | iphone |
o_002 | c_001 | 2018-11-05 10:01:55 | ipad |
o_003 | c_001 | 2018-11-05 10:03:44 | flink book |
- Item_tab
商品表, 攜帶商品 id,商品類型酿雪,出售時間遏暴,價格等信息,具體如下:
itemID | itemType | onSellTime | price |
---|---|---|---|
ITEM001 | Electronic | 2017-11-11 10:01:00 | 20 |
ITEM002 | Electronic | 2017-11-11 10:02:00 | 50 |
ITEM003 | Electronic | 2017-11-11 10:03:00 | 30 |
ITEM004 | Electronic | 2017-11-11 10:03:00 | 60 |
ITEM005 | Electronic | 2017-11-11 10:05:00 | 40 |
ITEM006 | Electronic | 2017-11-11 10:06:00 | 20 |
ITEM007 | Electronic | 2017-11-11 10:07:00 | 70 |
ITEM008 | Clothes | 2017-11-11 10:08:00 | 20 |
- PageAccess_tab
頁面訪問表指黎,包含用戶 ID朋凉,訪問時間,用戶所在地域信息醋安,具體數(shù)據(jù)如下:
region | userId | accessTime |
---|---|---|
ShangHai | U0010 | 2017-11-11 10:01:00 |
BeiJing | U1001 | 2017-11-11 10:01:00 |
BeiJing | U2032 | 2017-11-11 10:10:00 |
BeiJing | U1100 | 2017-11-11 10:11:00 |
ShangHai | U0011 | 2017-11-11 12:10:00 |
- PageAccessCount_tab
頁面訪問表杂彭,訪問量,訪問時間吓揪,用戶所在地域信息亲怠,具體數(shù)據(jù)如下:
region | userCount | accessTime |
---|---|---|
ShangHai | 100 | 2017.11.11 10:01:00 |
BeiJing | 86 | 2017.11.11 10:01:00 |
BeiJing | 210 | 2017.11.11 10:06:00 |
BeiJing | 33 | 2017.11.11 10:10:00 |
ShangHai | 129 | 2017.11.11 12:10:00 |
- PageAccessSession_tab
頁面訪問表,訪問量柠辞,訪問時間团秽,用戶所在地域信息,具體數(shù)據(jù)如下:
region | userId | accessTime |
---|---|---|
ShangHai | U0011 | 2017-11-11 10:01:00 |
ShangHai | U0012 | 2017-11-11 10:02:00 |
ShangHai | U0013 | 2017-11-11 10:03:00 |
ShangHai | U0015 | 2017-11-11 10:05:00 |
ShangHai | U0011 | 2017-11-11 10:10:00 |
BeiJing | U0110 | 2017-11-11 10:10:00 |
ShangHai | U2010 | 2017-11-11 10:11:00 |
ShangHai | U0410 | 2017-11-11 12:16:00 |
測試類
我們創(chuàng)建一個SqlOverviewITCase.scala
用于接下來介紹 Flink SQL 算子的功能體驗叭首。代碼如下:
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
class SqlOverviewITCase {
val _tempFolder = new TemporaryFolder
@Rule
def tempFolder: TemporaryFolder = _tempFolder
def getStateBackend: StateBackend = {
new MemoryStateBackend()
}
// 客戶表數(shù)據(jù)
val customer_data = new mutable.MutableList[(String, String, String)]
customer_data.+=(("c_001", "Kevin", "from JinLin"))
customer_data.+=(("c_002", "Sunny", "from JinLin"))
customer_data.+=(("c_003", "JinCheng", "from HeBei"))
// 訂單表數(shù)據(jù)
val order_data = new mutable.MutableList[(String, String, String, String)]
order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))
// 商品銷售表數(shù)據(jù)
val item_data = Seq(
Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
Right((1510365660000L)),
Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
Right((1510365720000L)),
Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
Right((1510365780000L)),
Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
Right((1510365900000L)),
Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
Right((1510365960000L)),
Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
Right((1510366020000L)),
Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
Right((151036608000L)))
// 頁面訪問表數(shù)據(jù)
val pageAccess_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
Right((1510365660000L)),
Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
Right((1510365660000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
Right((1510366200000L)),
Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
Right((1510366260000L)),
Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
Right((1510373400000L)))
// 頁面訪問量表數(shù)據(jù)2
val pageAccessCount_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
Right((1510365660000L)),
Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
Right((1510365660000L)),
Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
Right((1510366200000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
Right((1510366200000L)),
Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
Right((1510373400000L)))
// 頁面訪問表數(shù)據(jù)3
val pageAccessSession_data = Seq(
Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
Right((1510365660000L)),
Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
Right((1510365720000L)),
Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
Right((1510365720000L)),
Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
Right((1510365900000L)),
Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
Right((1510366200000L)),
Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
Right((1510366200000L)),
Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
Right((1510366260000L)),
Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
Right((1510373760000L)))
def procTimePrint(sql: String): Unit = {
// Streaming 環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// 將order_tab, customer_tab 注冊到catalog
val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc)
val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc)
tEnv.registerTable("order_tab", order)
tEnv.registerTable("customer_tab", customer)
val result = tEnv.sqlQuery(sql).toRetractStream[Row]
val sink = new RetractingSink
result.addSink(sink)
env.execute()
}
def rowTimePrint(sql: String): Unit = {
// Streaming 環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(getStateBackend)
env.setParallelism(1)
val tEnv = TableEnvironment.getTableEnvironment(env)
// 將item_tab, pageAccess_tab 注冊到catalog
val item =
env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data))
.toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)
val pageAccess =
env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
.toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
val pageAccessCount =
env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
.toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)
val pageAccessSession =
env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
.toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
tEnv.registerTable("item_tab", item)
tEnv.registerTable("pageAccess_tab", pageAccess)
tEnv.registerTable("pageAccessCount_tab", pageAccessCount)
tEnv.registerTable("pageAccessSession_tab", pageAccessSession)
val result = tEnv.sqlQuery(sql).toRetractStream[Row]
val sink = new RetractingSink
result.addSink(sink)
env.execute()
}
@Test
def testSelect(): Unit = {
val sql = "替換想要測試的SQL"
// 非window 相關(guān)用 procTimePrint(sql)
// Window 相關(guān)用 rowTimePrint(sql)
}
}
// 自定義Sink
final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
def invoke(v: (Boolean, Row)) {
retractedResults.synchronized {
val value = v._2.toString
if (v._1) {
retractedResults += value
} else {
val idx = retractedResults.indexOf(value)
if (idx >= 0) {
retractedResults.remove(idx)
} else {
throw new RuntimeException("Tried to retract a value that wasn't added first. " +
"This is probably an incorrectly implemented test. " +
"Try to set the parallelism of the sink to 1.")
}
}
}
retractedResults.sorted.foreach(println(_))
}
}
// Water mark 生成器
class EventTimeSourceFunction[T](
dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
override def run(ctx: SourceContext[T]): Unit = {
dataWithTimestampList.foreach {
case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
case Right(w) => ctx.emitWatermark(new Watermark(w))
}
}
override def cancel(): Unit = ???
}
Select
SELECT 用于從數(shù)據(jù)集/流中選擇數(shù)據(jù)习勤,語法遵循 ANSI-SQL 標(biāo)準(zhǔn),語義是關(guān)系代數(shù)中的投影(Projection),對關(guān)系進行垂直分割焙格,消去某些列, 如下圖所示:
SQL 示例
從customer_tab
選擇用戶姓名姻报,并用內(nèi)置的 CONCAT 函數(shù)拼接客戶信息,如下:
SELECT c_name, CONCAT(c_name, ' come ', c_desc) as desc FROM customer_tab;
Result
c_name | desc |
---|---|
Kevin | Kevin come from JinLin |
Sunny | Sunny come from JinLin |
Jincheng | Jincheng come from HeBei |
特別說明
大家看到在 SELECT
不僅可以使用普通的字段選擇间螟,還可以使用ScalarFunction
,當(dāng)然也包括User-Defined Function
,同時還可以進行字段的alias
設(shè)置损肛。其實SELECT
可以結(jié)合聚合厢破,在 GROUPBY 部分會進行介紹,一個比較特殊的使用場景是攜帶 DISTINCT
關(guān)鍵字,示例如下:
SQL 示例
在訂單表查詢所有的客戶 id治拿,消除重復(fù)客戶 id, 如下:
SELECT DISTINCT c_id FROM order_tab;
Result
c_id |
---|
c_001 |
c_002 |
WHERE
WHERE 用于從數(shù)據(jù)集/流中過濾數(shù)據(jù)摩泪,與 SELECT 一起使用,語法遵循 ANSI-SQL 標(biāo)準(zhǔn)劫谅,語義是關(guān)系代數(shù)的 Selection见坑,根據(jù)某些條件對關(guān)系做水平分割,即選擇符合條件的記錄捏检,如下所示:
SQL 示例
在customer_tab
查詢客戶 id 為c_001
和c_003
的客戶信息荞驴,如下:
SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id = 'c_001' OR c_id = 'c_003';
Result
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_003 | JinCheng | from HeBei |
特別說明
我們發(fā)現(xiàn)WHERE
是對滿足一定條件的數(shù)據(jù)進行過濾,WHERE
支持=, <, >, <>, >=, <=以及AND
贯城, OR
等表達(dá)式的組合熊楼,最終滿足過濾條件的數(shù)據(jù)會被選擇出來。并且 WHERE
可以結(jié)合IN
,NOT IN
聯(lián)合使用能犯,具體如下:
SQL 示例 (IN 常量)
使用 IN
在customer_tab
查詢客戶 id 為c_001
和c_003
的客戶信息鲫骗,如下:
SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN ('c_001', 'c_003');
Result
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_003 | JinCheng | from HeBei |
SQL 示例 (IN 子查詢)
使用 IN
和 子查詢 在customer_tab
查詢已經(jīng)下過訂單的客戶信息犬耻,如下:
SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN (SELECT c_id FROM order_tab);
Result
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
IN/NOT IN 與關(guān)系代數(shù)
如上介紹 IN 是關(guān)系代數(shù)中的 Intersection, NOT IN 是關(guān)系代數(shù)的 Difference执泰, 如下圖示意:
- IN(Intersection)
- NOT IN(Difference)
GROUP BY
GROUP BY 是對數(shù)據(jù)進行分組的操作枕磁,比如我需要分別計算一下一個學(xué)生表里面女生和男生的人數(shù)分別是多少,如下:
SQL 示例
將 order_tab 信息按 customer_tab 分組統(tǒng)計訂單數(shù)量术吝,簡單示例如下:
SELECT c_id, count(o_id) as o_count FROM order_tab GROUP BY c_id;
Result
c_id | o_count |
---|---|
c_001 | 2 |
c_002 | 1 |
特別說明
在實際的業(yè)務(wù)場景中计济,GROUP BY 除了按業(yè)務(wù)字段進行分組外,很多時候用戶也可以用時間來進行分組(相當(dāng)于劃分窗口)顿苇,比如統(tǒng)計每分鐘的訂單數(shù)量:
SQL 示例
按時間進行分組峭咒,查詢每分鐘的訂單數(shù)量,如下:
SELECT SUBSTRING(o_time, 1, 16) AS o_time_min, count(o_id) AS o_count FROM order_tab GROUP BY SUBSTRING(o_time, 1, 16)
Result
o_time_min | o_count |
---|---|
2018-11-05 10:01 | 2 |
2018-11-05 10:03 | 1 |
說明:如果我們時間字段是 timestamp 類型纪岁,建議使用內(nèi)置的 DATE_FORMAT
函數(shù)凑队。
UNION ALL
UNION ALL 將兩個表合并起來,要求兩個表的字段完全一致幔翰,包括字段類型漩氨、字段順序,語義對應(yīng)關(guān)系代數(shù)的 Union,只是關(guān)系代數(shù)是 Set 集合操作遗增,會有去重復(fù)操作叫惊,UNION ALL 不進行去重,如下所示:
SQL 示例
我們簡單的將customer_tab
查詢 2 次做修,將查詢結(jié)果合并起來霍狰,如下:
SELECT c_id, c_name, c_desc FROM customer_tab UNION ALL SELECT c_id, c_name, c_desc FROM customer_tab
Result
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
特別說明
UNION ALL 對結(jié)果數(shù)據(jù)不進行去重,如果想對結(jié)果數(shù)據(jù)進行去重饰及,傳統(tǒng)數(shù)據(jù)庫需要進行 UNION 操作蔗坯。
UNION
UNION 將兩個流給合并起來,要求兩個流的字段完全一致燎含,包括字段類型宾濒、字段順序,并其 UNION 不同于 UNION ALL屏箍,UNION 會對結(jié)果數(shù)據(jù)去重,與關(guān)系代數(shù)的 Union 語義一致绘梦,如下:
[圖片上傳失敗...(image-a1f92a-1592206042833)]
SQL 示例
我們簡單的將customer_tab
查詢 2 次,將查詢結(jié)果合并起來赴魁,如下:
SELECT c_id, c_name, c_desc FROM customer_tab UNION SELECT c_id, c_name, c_desc FROM customer_tab
我們發(fā)現(xiàn)完全一樣的表數(shù)據(jù)進行 UNION
之后卸奉,數(shù)據(jù)是被去重的,UNION
之后的數(shù)據(jù)并沒有增加颖御。
Result
c_id | c_name | c_desc |
---|---|---|
c_001 | Kevin | from JinLin |
c_002 | Sunny | from JinLin |
c_003 | JinCheng | from HeBei |
特別說明
UNION 對結(jié)果數(shù)據(jù)進行去重择卦,在實際的實現(xiàn)過程需要對數(shù)據(jù)進行排序操作,所以非必要去重情況請使用 UNION ALL 操作。
JOIN
JOIN 用于把來自兩個表的行聯(lián)合起來形成一個寬表秉继,Apache Flink 支持的 JOIN 類型:
- JOIN - INNER JOIN
- LEFT JOIN - LEFT OUTER JOIN
- RIGHT JOIN - RIGHT OUTER JOIN
- FULL JOIN - FULL OUTER JOIN
JOIN 與關(guān)系代數(shù)的 Join 語義相同祈噪,具體如下:
SQL 示例 (JOIN)
INNER JOIN
只選擇滿足ON
條件的記錄,我們查詢customer_tab
和 order_tab
表尚辑,將有訂單的客戶和訂單信息選擇出來辑鲤,如下:
SELECT * FROM customer_tab AS c JOIN order_tab AS o ON o.c_id = c.c_id
Result
c_id | c_name | c_desc | o_id | c_id | o_time | o_desc |
---|---|---|---|---|---|---|
c_001 | Kevin from | JinLin | o_002 | c_001 | 2018-11-05 10:01:55 | ipad |
c_001 | Kevin from | JinLin | o_003 | c_001 | 2018-11-05 10:03:44 | flink book |
c_002 | Sunny from | JinLin | o_oo1 | c_002 | 2018-11-05 10:01:01 | iphone |
SQL 示例 (LEFT JOIN)
LEFT JOIN
與INNER JOIN
的區(qū)別是當(dāng)右表沒有與左邊相 JOIN 的數(shù)據(jù)時候,右邊對應(yīng)的字段補NULL
輸出杠茬,語義如下:
[圖片上傳失敗...(image-5939b9-1592206042833)]
對應(yīng)的 SQL 語句如下(LEFT JOIN):
SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;
- 細(xì)心的讀者可能發(fā)現(xiàn)上面 T2.ColC 是添加了前綴 T2 了月褥,這里需要說明一下,當(dāng)兩張表有字段名字一樣的時候瓢喉,我需要指定是從那個表里面投影的宁赤。
我們查詢customer_tab
和 order_tab
表,將客戶和訂單信息選擇出來如下:
SELECT * FROM customer_tab AS c LEFT JOIN order_tab AS o ON o.c_id = c.c_id
Result
c_id | c_name | c_desc | o_id | c_id | o_time | o_desc |
---|---|---|---|---|---|---|
c_001 | Kevin | from JinLin | o_002 | c_001 | 2018-11-05 10:01:55 | ipad |
c_001 | Kevin | from JinLin | o_003 | c_001 | 2018-11-05 10:03:44 | flink book |
c_002 | Sunny | from JinLin | o_oo1 | c_002 | 2018-11-05 10:01:01 | iphone |
c_003 | JinCheng | from HeBei | NULL | NULL | NULL | NULL |
特別說明
RIGHT JOIN
相當(dāng)于 LEFT JOIN
左右兩個表交互一下位置栓票。FULL JOIN
相當(dāng)于 RIGHT JOIN
和 LEFT JOIN
之后進行UNION ALL
操作决左。
Window
在 Apache Flink 中有 2 種類型的 Window,一種是 OverWindow走贪,即傳統(tǒng)數(shù)據(jù)庫的標(biāo)準(zhǔn)開窗佛猛,每一個元素都對應(yīng)一個窗口。一種是 GroupWindow坠狡,目前在 SQL 中 GroupWindow 都是基于時間進行窗口劃分的继找。
Over Window
Apache Flink 中對 OVER Window 的定義遵循標(biāo)準(zhǔn) SQL 的定義語法。
按 ROWS 和 RANGE 分類是傳統(tǒng)數(shù)據(jù)庫的標(biāo)準(zhǔn)分類方法逃沿,在 Apache Flink 中還可以根據(jù)時間類型(ProcTime/EventTime)和窗口的有限和無限(Bounded/UnBounded)進行分類婴渡,共計 8 種類型。為了避免大家對過細(xì)分類造成困擾凯亮,我們按照確定當(dāng)前行的不同方式將 OVER Window 分成兩大類進行介紹边臼,如下:
- ROWS OVER Window - 每一行元素都視為新的計算行,即触幼,每一行都是一個新的窗口。
- RANGE OVER Window - 具有相同時間值的所有元素行視為同一計算行究飞,即置谦,具有相同時間值的所有行都是同一個窗口。
Bounded ROWS OVER Window
Bounded ROWS OVER Window 每一行元素都視為新的計算行亿傅,即媒峡,每一行都是一個新的窗口。
語義
我們以 3 個元素(2 PRECEDING)的窗口為例葵擎,如下圖:
上圖所示窗口 user 1 的 w5 和 w6谅阿, user 2 的 窗口 w2 和 w3,雖然有元素都是同一時刻到達(dá),但是他們?nèi)匀皇窃诓煌拇翱谇┎停@一點有別于 RANGE OVER Window寓涨。
語法
Bounded ROWS OVER Window 語法如下:
SELECT
agg1(col1) OVER(
[PARTITION BY (value_expression1,..., value_expressionN)]
ORDER BY timeCol
ROWS
BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1
- value_expression - 進行分區(qū)的字表達(dá)式;
- timeCol - 用于元素排序的時間字段氯檐;
- rowCount - 是定義根據(jù)當(dāng)前行開始向前追溯幾行元素戒良。
SQL 示例
利用item_tab
測試數(shù)據(jù),我們統(tǒng)計同類商品中當(dāng)前和當(dāng)前商品之前 2 個商品中的最高價格冠摄。
SELECT
itemID,
itemType,
onSellTime,
price,
MAX(price) OVER (
PARTITION BY itemType
ORDER BY onSellTime
ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice
FROM item_tab
Result
itemID | itemType | onSellTime | price | maxPrice |
---|---|---|---|---|
ITEM001 | Electronic | 2017-11-11 10:01:00 | 20 | 20 |
ITEM002 | Electronic | 2017-11-11 10:02:00 | 50 | 50 |
ITEM003 | Electronic | 2017-11-11 10:03:00 | 30 | 50 |
ITEM004 | Electronic | 2017-11-11 10:03:00 | 60 | 60 |
ITEM005 | Electronic | 2017-11-11 10:05:00 | 40 | 60 |
ITEM006 | Electronic | 2017-11-11 10:06:00 | 20 | 60 |
ITEM007 | Electronic | 2017-11-11 10:07:00 | 70 | 70 |
ITEM008 | Clothes | 2017-11-11 10:08:00 | 20 | 20 |
Bounded RANGE OVER Window
Bounded RANGE OVER Window 具有相同時間值的所有元素行視為同一計算行糯崎,即,具有相同時間值的所有行都是同一個窗口河泳。
語義
我們以 3 秒中數(shù)據(jù)(INTERVAL '2' SECOND)的窗口為例沃呢,如下圖:
注意: 上圖所示窗口 user 1 的 w6, user 2 的 窗口 w3拆挥,元素都是同一時刻到達(dá),他們是在同一個窗口薄霜,這一點有別于 ROWS OVER Window。
語法
Bounded RANGE OVER Window 的語法如下:
SELECT
agg1(col1) OVER(
[PARTITION BY (value_expression1,..., value_expressionN)]
ORDER BY timeCol
RANGE
BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1
- value_expression - 進行分區(qū)的字表達(dá)式竿刁;
- timeCol - 用于元素排序的時間字段黄锤;
- timeInterval - 是定義根據(jù)當(dāng)前行開始向前追溯指定時間的元素行;
SQL 示例
我們統(tǒng)計同類商品中當(dāng)前和當(dāng)前商品之前 2 分鐘商品中的最高價格食拜。
SELECT
itemID,
itemType,
onSellTime,
price,
MAX(price) OVER (
PARTITION BY itemType
ORDER BY rowtime
RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice
FROM item_tab
Result(Bounded RANGE OVER Window)
itemID | itemType | onSellTime | price | maxPrice |
---|---|---|---|---|
ITEM001 | Electronic | 2017-11-11 10:01:00 | 20 | 20 |
ITEM002 | Electronic | 2017-11-11 10:02:00 | 50 | 50 |
ITEM003 | Electronic | 2017-11-11 10:03:00 | 30 | 60 |
ITEM004 | Electronic | 2017-11-11 10:03:00 | 60 | 60 |
ITEM005 | Electronic | 2017-11-11 10:05:00 | 40 | 60 |
ITEM006 | Electronic | 2017-11-11 10:06:00 | 20 | 40 |
ITEM007 | Electronic | 2017-11-11 10:07:00 | 70 | 70 |
ITEM008 | Clothes | 2017-11-11 10:08:00 | 20 | 20 |
特別說明
OverWindow 最重要是要理解每一行數(shù)據(jù)都確定一個窗口鸵熟,同時目前在 Apache Flink 中只支持按時間字段排序。并且 OverWindow 開窗與 GroupBy 方式數(shù)據(jù)分組最大的不同在于负甸,GroupBy 數(shù)據(jù)分組統(tǒng)計時候流强,在SELECT
中除了 GROUP BY 的 key,不能直接選擇其他非 key 的字段呻待,但是 OverWindow 沒有這個限制打月,SELECT
可以選擇任何字段。比如一張表 table(a,b,c,d)4 個字段蚕捉,如果按 d 分組求 c 的最大值奏篙,兩種寫完如下:
- GROUP BY -
SELECT d, MAX(c) FROM table GROUP BY d
- OVER Window =
SELECT a, b, c, d, MAX(c) OVER(PARTITION BY d, ORDER BY ProcTime())
如上 OVER Window 雖然 PARTITION BY d,但 SELECT 中仍然可以選擇 a,b,c 字段。但在 GROUPBY 中迫淹,SELECT 只能選擇 d 字段秘通。
Group Window
根據(jù)窗口數(shù)據(jù)劃分的不同,目前 Apache Flink 有如下 3 種 Bounded Winodw:
- Tumble - 滾動窗口敛熬,窗口數(shù)據(jù)有固定的大小肺稀,窗口數(shù)據(jù)無疊加;
- Hop - 滑動窗口应民,窗口數(shù)據(jù)有固定大小话原,并且有固定的窗口重建頻率夕吻,窗口數(shù)據(jù)有疊加;
- Session - 會話窗口繁仁,窗口數(shù)據(jù)沒有固定的大小涉馅,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無疊加改备。
說明: Aapche Flink 還支持 UnBounded 的 Group Window控漠,也就是全局 Window,流上所有數(shù)據(jù)都在一個窗口里面悬钳,語義非常簡單盐捷,這里不做詳細(xì)介紹了。
Tumble
語義
Tumble 滾動窗口有固定 size默勾,窗口數(shù)據(jù)不重疊,具體語義如下:
語法
Tumble 滾動窗口對應(yīng)的語法如下:
SELECT
[gk],
[TUMBLE_START(timeCol, size)],
[TUMBLE_END(timeCol, size)],
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)
- [gk] - 決定了流是 Keyed 還是/Non-Keyed;
- TUMBLE_START - 窗口開始時間;
- TUMBLE_END - 窗口結(jié)束時間;
- timeCol - 是流表中表示時間字段碉渡;
- size - 表示窗口的大小,如 秒母剥,分鐘滞诺,小時,天环疼。
SQL 示例
利用pageAccess_tab
測試數(shù)據(jù)习霹,我們需要按不同地域統(tǒng)計每 2 分鐘的淘寶首頁的訪問量(PV)。
SELECT
region,
TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS winStart,
TUMBLE_END(rowtime, INTERVAL '2' MINUTE) AS winEnd,
COUNT(region) AS pv
FROM pageAccess_tab
GROUP BY region, TUMBLE(rowtime, INTERVAL '2' MINUTE)
Result
region | winStart | winEnd | pv |
---|---|---|---|
BeiJing | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
BeiJing | 2017-11-11 02:10:00.0 | 2017-11-11 02:12:00.0 | 2 |
ShangHai | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
ShangHai | 2017-11-11 04:10:00.0 | 2017-11-11 04:12:00.0 | 1 |
Hop
Hop 滑動窗口和滾動窗口類似炫隶,窗口有固定的 size淋叶,與滾動窗口不同的是滑動窗口可以通過 slide 參數(shù)控制滑動窗口的新建頻率。因此當(dāng) slide 值小于窗口 size 的值的時候多個滑動窗口會重疊伪阶。
語義
Hop 滑動窗口語義如下所示:
語法
Hop 滑動窗口對應(yīng)語法如下:
SELECT
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
- [gk] 決定了流是 Keyed 還是/Non-Keyed;
- HOP_START - 窗口開始時間;
- HOP_END - 窗口結(jié)束時間;
- timeCol - 是流表中表示時間字段煞檩;
- slide - 是滑動步伐的大小栅贴;
- size - 是窗口的大小斟湃,如 秒,分鐘檐薯,小時凝赛,天;
SQL 示例
利用pageAccessCount_tab
測試數(shù)據(jù)坛缕,我們需要每 5 分鐘統(tǒng)計近 10 分鐘的頁面訪問量(PV).
SELECT
HOP_START(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winStart,
HOP_END(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winEnd,
SUM(accessCount) AS accessCount
FROM pageAccessCount_tab
GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)
Result
winStart | winEnd | accessCount |
---|---|---|
2017-11-11 01:55:00.0 | 2017-11-11 02:05:00.0 | 186 |
2017-11-11 02:00:00.0 | 2017-11-11 02:10:00.0 | 396 |
2017-11-11 02:05:00.0 | 2017-11-11 02:15:00.0 | 243 |
2017-11-11 02:10:00.0 | 2017-11-11 02:20:00.0 | 33 |
2017-11-11 04:05:00.0 | 2017-11-11 04:15:00.0 | 129 |
2017-11-11 04:10:00.0 | 2017-11-11 04:20:00.0 | 129 |
Session
Seeeion 會話窗口 是沒有固定大小的窗口墓猎,通過 session 的活躍度分組元素。不同于滾動窗口和滑動窗口祷膳,會話窗口不重疊,也沒有固定的起止時間陶衅。一個會話窗口在一段時間內(nèi)沒有接收到元素時,即當(dāng)出現(xiàn)非活躍間隙時關(guān)閉。一個會話窗口 分配器通過配置 session gap 來指定非活躍周期的時長.
語義
Session 會話窗口語義如下所示:
語法
Seeeion 會話窗口對應(yīng)語法如下:
SELECT
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
- [gk] 決定了流是 Keyed 還是/Non-Keyed;
- SESSION_START - 窗口開始時間趁啸;
- SESSION_END - 窗口結(jié)束時間怕轿;
- timeCol - 是流表中表示時間字段熙侍;
- gap - 是窗口數(shù)據(jù)非活躍周期的時長弛针;
SQL 示例
利用pageAccessSession_tab
測試數(shù)據(jù)裳朋,我們按地域統(tǒng)計連續(xù)的兩個訪問用戶之間的訪問時間間隔不超過 3 分鐘的的頁面訪問量(PV).
SELECT
region,
SESSION_START(rowtime, INTERVAL '3' MINUTE) AS winStart,
SESSION_END(rowtime, INTERVAL '3' MINUTE) AS winEnd,
COUNT(region) AS pv
FROM pageAccessSession_tab
GROUP BY region, SESSION(rowtime, INTERVAL '3' MINUTE)
Result
region | winStart | winEnd | pv |
---|---|---|---|
BeiJing | 2017-11-11 02:10:00.0 | 2017-11-11 02:13:00.0 | 1 |
ShangHai | 2017-11-11 02:01:00.0 | 2017-11-11 02:08:00.0 | 4 |
ShangHai | 2017-11-11 02:10:00.0 | 2017-11-11 02:14:00.0 | 2 |
ShangHai | 2017-11-11 04:16:00.0 | 2017-11-11 04:19:00.0 | 1 |
UDX
Apache Flink 除了提供了大部分 ANSI-SQL 的核心算子匪蝙,也為用戶提供了自己編寫業(yè)務(wù)代碼的機會敛摘,那就是 User-Defined Function,目前支持如下三種 User-Defined Function:
- UDF - User-Defined Scalar Function
- UDTF - User-Defined Table Function
- UDAF - User-Defined Aggregate Funciton
UDX 都是用戶自定義的函數(shù)门烂,那么 Apache Flink 框架為啥將自定義的函數(shù)分成三類呢?是根據(jù)什么劃分的呢兄淫?Apache Flink 對自定義函數(shù)進行分類的依據(jù)是根據(jù)函數(shù)語義的不同屯远,函數(shù)的輸入和輸出不同來分類的,具體如下:
UDX | INPUT | OUTPUT | INPUT:OUTPUT |
---|---|---|---|
UDF | 單行中的N(N>=0)列 | 單行中的1列 | 1:1 |
UDTF | 單行中的N(N>=0)列 | M(M>=0)行 | 1:N(N>=0) |
UDAF | M(M>=0)行中的每行的N(N>=0)列 | 單行中的1列 | M:1(M>=0) |
UDF
- 定義
用戶想自己編寫一個字符串聯(lián)接的 UDF捕虽,我們只需要實現(xiàn)ScalarFunction#eval()
方法即可慨丐,簡單實現(xiàn)如下:
object MyConnect extends ScalarFunction {
@varargs
def eval(args: String*): String = {
val sb = new StringBuilder
var i = 0
while (i < args.length) {
if (args(i) == null) {
return null
}
sb.append(args(i))
i += 1
}
sb.toString
}
}
- 使用
val fun = MyConnect
tEnv.registerFunction("myConnect", fun)
val sql = "SELECT myConnect(a, b) as str FROM tab"
UDTF
- 定義
用戶想自己編寫一個字符串切分的 UDTF,我們只需要實現(xiàn)TableFunction#eval()
方法即可泄私,簡單實現(xiàn)如下:
ScalarFunction#eval()
class MySplit extends TableFunction[String] {
def eval(str: String): Unit = {
if (str.contains("#")){
str.split("#").foreach(collect)
}
}
def eval(str: String, prefix: String): Unit = {
if (str.contains("#")) {
str.split("#").foreach(s => collect(prefix + s))
}
}
}
- 使用
val fun = new MySplit()
tEnv.registerFunction("mySplit", fun)
val sql = "SELECT c, s FROM MyTable, LATERAL TABLE(mySplit(c)) AS T(s)"
UDAF
- 定義
UDAF 要實現(xiàn)的接口比較多房揭,我們以一個簡單的 CountAGG 為例,做簡單實現(xiàn)如下:
/** The initial accumulator for count aggregate function */
class CountAccumulator extends JTuple1[Long] {
f0 = 0L //count
}
/**
* User-defined count aggregate function
*/
class MyCount
extends AggregateFunction[JLong, CountAccumulator] {
// process argument is optimized by Calcite.
// For instance count(42) or count(*) will be optimized to count().
def accumulate(acc: CountAccumulator): Unit = {
acc.f0 += 1L
}
// process argument is optimized by Calcite.
// For instance count(42) or count(*) will be optimized to count().
def retract(acc: CountAccumulator): Unit = {
acc.f0 -= 1L
}
def accumulate(acc: CountAccumulator, value: Any): Unit = {
if (value != null) {
acc.f0 += 1L
}
}
def retract(acc: CountAccumulator, value: Any): Unit = {
if (value != null) {
acc.f0 -= 1L
}
}
override def getValue(acc: CountAccumulator): JLong = {
acc.f0
}
def merge(acc: CountAccumulator, its: JIterable[CountAccumulator]): Unit = {
val iter = its.iterator()
while (iter.hasNext) {
acc.f0 += iter.next().f0
}
}
override def createAccumulator(): CountAccumulator = {
new CountAccumulator
}
def resetAccumulator(acc: CountAccumulator): Unit = {
acc.f0 = 0L
}
override def getAccumulatorType: TypeInformation[CountAccumulator] = {
new TupleTypeInfo(classOf[CountAccumulator], BasicTypeInfo.LONG_TYPE_INFO)
}
override def getResultType: TypeInformation[JLong] =
BasicTypeInfo.LONG_TYPE_INFO
}
- 使用
val fun = new MyCount()
tEnv.registerFunction("myCount", fun)
val sql = "SELECT myCount(c) FROM MyTable GROUP BY a"
Source&Sink
上面我們介紹了 Apache Flink SQL 核心算子的語法及語義晌端,這部分將選取 Bounded EventTime Tumble Window 為例為大家編寫一個完整的包括 Source 和 Sink 定義的 Apache Flink SQL Job捅暴。假設(shè)有一張?zhí)詫氻撁嬖L問表(PageAccess_tab),有地域咧纠,用戶 ID 和訪問時間蓬痒。我們需要按不同地域統(tǒng)計每 2 分鐘的淘寶首頁的訪問量(PV). 具體數(shù)據(jù)如下:
region | userId | accessTime |
---|---|---|
ShangHai | U0010 | 2017-11-11 10:01:00 |
BeiJing | U1001 | 2017-11-11 10:01:00 |
BeiJing | U2032 | 2017-11-11 10:10:00 |
BeiJing | U1100 | 2017-11-11 10:11:00 |
ShangHai | U0011 | 2017-11-11 12:10:00 |
Source 定義
自定義 Apache Flink Stream Source 需要實現(xiàn)StreamTableSource
, StreamTableSource
中通過StreamExecutionEnvironment
的addSource
方法獲取DataStream
, 所以我們需要自定義一個 SourceFunction
, 并且要支持產(chǎn)生 WaterMark,也就是要實現(xiàn)DefinedRowtimeAttributes
接口惧盹。
Source Function 定義
支持接收攜帶 EventTime 的數(shù)據(jù)集合乳幸,Either 的數(shù)據(jù)結(jié)構(gòu),Right 表示 WaterMark 和 Left 表示數(shù)據(jù):
class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
extends SourceFunction[T] {
override def run(ctx: SourceContext[T]): Unit = {
dataWithTimestampList.foreach {
case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
case Right(w) => ctx.emitWatermark(new Watermark(w))
}
}
override def cancel(): Unit = ???
}
定義 StreamTableSource
我們自定義的 Source 要攜帶我們測試的數(shù)據(jù)钧椰,以及對應(yīng)的 WaterMark 數(shù)據(jù)粹断,具體如下:
class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
val fieldNames = Array("accessTime", "region", "userId")
val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
val rowType = new RowTypeInfo(
Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
fieldNames)
// 頁面訪問表數(shù)據(jù) rows with timestamps and watermarks
val data = Seq(
Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
Right(1510365660000L),
Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
Right(1510365660000L),
Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
Right(1510366200000L),
Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
Right(1510366260000L),
Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
Right(1510373400000L)
)
override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
Collections.singletonList(new RowtimeAttributeDescriptor(
"accessTime",
new ExistingField("accessTime"),
PreserveWatermarks.INSTANCE))
}
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
}
override def getReturnType: TypeInformation[Row] = rowType
override def getTableSchema: TableSchema = schema
}
Sink 定義
我們簡單的將計算結(jié)果寫入到 Apache Flink 內(nèi)置支持的 CSVSink 中,定義 Sink 如下:
def getCsvTableSink: TableSink[Row] = {
val tempFile = File.createTempFile("csv_sink_", "tem")
// 打印sink的文件路徑嫡霞,方便我們查看運行結(jié)果
println("Sink path : " + tempFile)
if (tempFile.exists()) {
tempFile.delete()
}
new CsvTableSink(tempFile.getAbsolutePath).configure(
Array[String]("region", "winStart", "winEnd", "pv"),
Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}
構(gòu)建主程序
主程序包括執(zhí)行環(huán)境的定義瓶埋,Source/Sink 的注冊以及統(tǒng)計查 SQL 的執(zhí)行,具體如下:
def main(args: Array[String]): Unit = {
// Streaming 環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// 設(shè)置EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//方便我們查出輸出數(shù)據(jù)
env.setParallelism(1)
val sourceTableName = "mySource"
// 創(chuàng)建自定義source數(shù)據(jù)結(jié)構(gòu)
val tableSource = new MyTableSource
val sinkTableName = "csvSink"
// 創(chuàng)建CSV sink 數(shù)據(jù)結(jié)構(gòu)
val tableSink = getCsvTableSink
// 注冊source
tEnv.registerTableSource(sourceTableName, tableSource)
// 注冊sink
tEnv.registerTableSink(sinkTableName, tableSink)
val sql =
"SELECT " +
" region, " +
" TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +
" TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +
" FROM mySource " +
" GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"
tEnv.sqlQuery(sql).insertInto(sinkTableName);
env.execute()
}
執(zhí)行并查看運行結(jié)果
執(zhí)行主程序后我們會在控制臺得到 Sink 的文件路徑诊沪,如下:
Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
Cat 方式查看計算結(jié)果养筒,如下:
jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1
表格化如上結(jié)果:
region | winStart | winEnd | pv |
---|---|---|---|
BeiJing | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
BeiJing | 2017-11-11 02:10:00.0 | 2017-11-11 02:12:00.0 | 2 |
ShangHai | 2017-11-11 02:00:00.0 | 2017-11-11 02:02:00.0 | 1 |
ShangHai | 2017-11-11 04:10:00.0 | 2017-11-11 04:12:00.0 | 1 |
上面這個端到端的完整示例也可以應(yīng)用到本篇前面介紹的其他算子示例中,只是大家根據(jù) Source 和 Sink 的 Schema 不同來進行相應(yīng)的構(gòu)建即可端姚!
總結(jié)
本篇概要的向大家介紹了 SQL 的由來晕粪,Apache Flink SQL 大部分核心功能,并附帶了具體的測試數(shù)據(jù)和測試程序渐裸,最后以一個 End-to-End 的示例展示了如何編寫 Apache Flink SQL 的 Job 收尾巫湘。本篇著重向大家介紹 Apache Flink SQL 的使用装悲,后續(xù)我們再繼續(xù)探究每個算子的實現(xiàn)原理。
附錄
- [1] Early History of SQL
- [2] SEQUEL: A Structured English Query Language
- [3] Source Code - EventTimeTumbleWindowDemo.scala
關(guān)于點贊和評論
本系列文章難免有很多缺陷和不足尚氛,真誠希望讀者對有收獲的篇章給予點贊鼓勵诀诊,對有不足的篇章給予反饋和建議,先行感謝大家阅嘶!