Apache Flink - SQL概覽

原文來自“金竹”原載于云棲社區(qū)

SQL 簡述

SQL 是 Structured Query Language 的縮寫,最初是由美國計算機科學(xué)家Donald D. Chamberlin和 Raymond F. Boyce 在 20 世紀(jì) 70 年代早期從 Early History of SQL 中了解關(guān)系模型后在 IBM 開發(fā)的亩进。該版本最初稱為[SEQUEL: A Structured English Query Language](結(jié)構(gòu)化英語查詢語言)蘸拔,旨在操縱和檢索存儲在 IBM 原始準(zhǔn)關(guān)系數(shù)據(jù)庫管理系統(tǒng) System R 中的數(shù)據(jù)宅此。SEQUEL 后來改為 SQL胯府,因為“SEQUEL”是英國 Hawker Siddeley 飛機公司的商標(biāo)。我們看看這款用于特技飛行的英國皇家空軍豪客 Siddeley Hawk T.1A (Looks great):

image.png

第一款 SQL 數(shù)據(jù)庫

在 20 世紀(jì) 70 年代后期,Oracle 公司(當(dāng)時叫 Relational Software薯演,Inc.)開發(fā)了基于 SQL 的 RDBMS,并希望將其出售給美國海軍秧了,Central Intelligence 代理商和其他美國政府機構(gòu)跨扮。 1979 年 6 月,Oracle 公司為 VAX 計算機推出了第一個商業(yè)化的 SQL 實現(xiàn)验毡,即 Oracle V2衡创。

ANSI-SQL 標(biāo)準(zhǔn)的采用

直到 1986 年,ANSI 和 ISO 標(biāo)準(zhǔn)組正式采用了標(biāo)準(zhǔn)的"數(shù)據(jù)庫語言 SQL"語言定義晶通。該標(biāo)準(zhǔn)的新版本發(fā)布于 1989,1992,1996,1999,2003,2006,2008,2011璃氢,以及最近的 2016。Apache Flink SQL 核心算子的語義設(shè)計也參考了1992 狮辽、2011等 ANSI-SQL 標(biāo)準(zhǔn)一也。

SQL 操作及擴展

SQL 是專為查詢包含在關(guān)系數(shù)據(jù)庫中的數(shù)據(jù)而設(shè)計的,是一種基于 SET 操作的聲明性編程語言喉脖,而不是像 C 語言一樣的命令式編程語言椰苟。但是,各大關(guān)系數(shù)據(jù)庫廠商在遵循 ANSI-SQL 標(biāo)準(zhǔn)的同時又對標(biāo)準(zhǔn) SQL 進行擴展树叽,由基于 SET(無重復(fù)元素)的操作擴展到基于 BAG(有重復(fù)元素)的操作舆蝴,并且添加了過程編程語言功能,如:Oracle 的 PL/SQL, DB2 的 SQL PL菱皆,MySQL - SQL/PSM 以及 SQL Server 的 T-SQL 等等须误。
隨著時間的推移 ANSI-SQL 規(guī)范不斷完善,所涉及的功能不斷豐富仇轻,比如在 ANSI-2011 中又增加了 Temporal Table 的標(biāo)準(zhǔn)定義京痢,Temporal Table 的標(biāo)準(zhǔn)在結(jié)構(gòu)化關(guān)系數(shù)據(jù)存儲上添加了時間維度信息,這使得關(guān)系數(shù)據(jù)庫中不僅可以對當(dāng)前數(shù)據(jù)進行查詢操作篷店,根據(jù)時間版本信息也可以對歷史數(shù)據(jù)進行操作祭椰。這些不斷豐富的功能極大增強了 SQL 的應(yīng)用領(lǐng)域。

大數(shù)據(jù)計算領(lǐng)域?qū)?SQL 的應(yīng)用

離線計算(批計算)

提及大數(shù)據(jù)計算領(lǐng)域不得不說 MapReduce 計算模型疲陕,MapReduce 最早是由 Google 公司研究提出的一種面向大規(guī)模數(shù)據(jù)處理的并行計算模型和方法方淤,并發(fā)于 2004 年發(fā)表了論文Simplified Data Processing on Large Clusters
論文發(fā)表之后 Apache 開源社區(qū)參考 Google MapReduce蹄殃,基于 Java 設(shè)計開發(fā)了一個稱為 Hadoop 的開源 MapReduce 并行計算框架携茂。很快得到了全球?qū)W術(shù)界和工業(yè)界的普遍關(guān)注,并得到推廣和普及應(yīng)用诅岩。
但利用 Hadoop 進行 MapReduce 的開發(fā)讳苦,需要開發(fā)人員精通 Java 語言带膜,并了解 MapReduce 的運行原理,這樣在一定程度上提高了 MapReduce 的開發(fā)門檻鸳谜,所以在開源社區(qū)又不斷涌現(xiàn)了一些為了簡化 MapReduce 開發(fā)的開源框架膝藕,其中 Hive 就是典型的代表。HSQL 可以讓用戶以類 SQL 的方式描述 MapReduce 計算咐扭,比如原本需要幾十行芭挽,甚至上百行才能完成的 wordCount,用戶一條 SQL 語句就能完成了蝗肪,這樣極大的降低了 MapReduce 的開發(fā)門檻袜爪,進而也成功的將 SQL 應(yīng)用到了大數(shù)據(jù)計算領(lǐng)域當(dāng)中來。

實時計算(流計算)

SQL 不僅僅被成功的應(yīng)用到了離線計算薛闪,SQL 的易用性也吸引了流計算產(chǎn)品饿敲,目前最熱的 Spark,F(xiàn)link 也紛紛支持了 SQL逛绵,尤其是 Flink 支持的更加徹底,集成了 Calcite倔韭,完全遵循 ANSI-SQL 標(biāo)準(zhǔn)术浪。Apache Flink 在 low-level API 上面用 DataSet 支持批計算,用 DataStream 支持流計算寿酌,但在 High-Level API 上面利用 SQL 將流與批進行了統(tǒng)一胰苏,使得用戶編寫一次 SQL 既可以在流計算中使用,又可以在批計算中使用醇疼,為既有流計算業(yè)務(wù)硕并,又有批計算業(yè)務(wù)的用戶節(jié)省了大量開發(fā)成本。

SQL 高性能與簡潔性

性能

SQL 經(jīng)過傳統(tǒng)數(shù)據(jù)庫領(lǐng)域幾十年的不斷打磨秧荆,查詢優(yōu)化器已經(jīng)能夠極大的優(yōu)化 SQL 的查詢性能倔毙,Apache Flink 應(yīng)用 Calcite 進行查詢優(yōu)化,復(fù)用了大量數(shù)據(jù)庫查詢優(yōu)化規(guī)則乙濒,在性能上不斷追求極致陕赃,能夠讓用戶關(guān)心但不用擔(dān)心性能問題。如下圖(Alibaba 對 Apache Flink 進行架構(gòu)優(yōu)化后的組件棧)

相對于 DataStream 而言颁股,SQL 會經(jīng)過 Optimization 模塊透明的為用戶進行查詢優(yōu)化么库,用戶專心編寫自己的業(yè)務(wù)邏輯,不用擔(dān)心性能甘有,卻能得到最優(yōu)的查詢性能!

簡潔

就簡潔性而言诉儒,SQL 與 DataSet 和 DataStream 相比具有很大的優(yōu)越性,我們先用一個 WordCount 示例來直觀的查看用戶的代碼量:

  • DataStream/DataSetAPI
... //省略初始化代碼
// 核心邏輯
text.flatMap(new WordCount.Tokenizer()).keyBy(new int[]{0}).sum(1);

// flatmap 代碼定義
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public Tokenizer() {
        }

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            String[] var4 = tokens;
            int var5 = tokens.length;

            for(int var6 = 0; var6 < var5; ++var6) {
                String token = var4[var6];
                if (token.length() > 0) {
                    out.collect(new Tuple2(token, 1));
                }
            }

        }
    }
  • SQL
//省略初始化代碼 
SELECT word, COUNT(word) FROM tab GROUP BY word;

我們直觀的體會到相同的統(tǒng)計功能使用 SQL 的簡潔性亏掀。

Flink SQL Job 的組成

我們做任何數(shù)據(jù)計算都離不開讀取原始數(shù)據(jù)忱反,計算邏輯和寫入計算結(jié)果數(shù)據(jù)三部分泛释,當(dāng)然基于 Apache Flink SQL 編寫的計算 Job 也離不開這三個部分,如下所示:

image.png

如上所示缭受,一個完整的 Apache Flink SQL Job 由如下三部分:

  • Source Operator - Soruce operator 是對外部數(shù)據(jù)源的抽象, 目前 Apache Flink 內(nèi)置了很多常用的數(shù)據(jù)源實現(xiàn)胁澳,比如上圖提到的 Kafka。
  • Query Operators - 查詢算子主要完成如圖的 Query Logic米者,目前支持了 Union韭畸,Join,Projection,Difference, Intersection 以及 window 等大多數(shù)傳統(tǒng)數(shù)據(jù)庫支持的操作蔓搞。
  • Sink Operator - Sink operator 是對外結(jié)果表的抽象胰丁,目前 Apache Flink 也內(nèi)置了很多常用的結(jié)果表的抽象,比如上圖提到的 Kafka喂分。

Flink SQL 核心算子

目前 Flink SQL 支持 Union锦庸,Join,Projection,Difference, Intersection 以及 Window 等大多數(shù)傳統(tǒng)數(shù)據(jù)庫支持的操作蒲祈,接下來為大家分別進行簡單直觀的介紹甘萧。

環(huán)境

為了很好的體驗和理解 Apache Flink SQL 算子我們需要先準(zhǔn)備一下測試環(huán)境,我們選擇IDEA梆掸,以 ITCase 測試方式來進行體驗扬卷。IDEA 安裝這里不占篇幅介紹了,相信大家能輕松搞定酸钦!我們進行功能體驗有兩種方式怪得,具體如下:

源碼方式

對于開源愛好者可能更喜歡源代碼方式理解和體驗 Apache Flink SQL 功能,那么我們需要下載源代碼并導(dǎo)入到 IDEA 中:

  • 下載源碼:
// 下載源代碼
git clone https://github.com/apache/flink.git study
// 進入源碼目錄
cd study
// 拉取穩(wěn)定版release-1.6
git fetch origin release-1.6:release-1.6
//切換到穩(wěn)定版
git checkout release-1.6
//將依賴安裝到本地mvn倉庫卑硫,耐心等待需要一段時間
mvn clean install -DskipTests
  • 導(dǎo)入到 IDEA
    將 Flink 源碼導(dǎo)入到 IDEA 過程這里不再占用篇幅徒恋,導(dǎo)入后確保在 IDEA 中可以運行 org.apache.flink.table.runtime.stream.sql.SqlITCase 并測試全部通過,即證明體驗環(huán)境已經(jīng)完成欢伏。如下圖所示:
image.png

如上圖運行測試后顯示測試通過入挣,我們就可以繼續(xù)下面的 Apache Flink SQL 功能體驗了。

依賴 Flink 包方式

我們還有一種更簡單直接的方式硝拧,就是新建一個 mvn 項目财岔,并在 pom 中添加如下依賴:

<properties>
    <table.version>1.6-SNAPSHOT</table.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>JUnit</groupId>
      <artifactId>JUnit</artifactId>
      <version>4.12</version>
    </dependency>

  </dependencies>

完成環(huán)境準(zhǔn)備后,我們開始準(zhǔn)備測試數(shù)據(jù)和寫一個簡單的測試類河爹。

示例數(shù)據(jù)及測試類

測試數(shù)據(jù)

  • customer_tab 表 - 客戶表保存客戶 id匠璧,客戶姓名和客戶描述信息。字段及測試數(shù)據(jù)如下:
c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei
  • order_tab 表 - 訂單表保存客戶購買的訂單信息咸这,包括訂單 id夷恍,訂單時間和訂單描述信息。 字段節(jié)測試數(shù)據(jù)如下:
o_id c_id o_time o_desc
o_oo1 c_002 2018-11-05 10:01:01 iphone
o_002 c_001 2018-11-05 10:01:55 ipad
o_003 c_001 2018-11-05 10:03:44 flink book
  • Item_tab
    商品表, 攜帶商品 id,商品類型酿雪,出售時間遏暴,價格等信息,具體如下:
itemID itemType onSellTime price
ITEM001 Electronic 2017-11-11 10:01:00 20
ITEM002 Electronic 2017-11-11 10:02:00 50
ITEM003 Electronic 2017-11-11 10:03:00 30
ITEM004 Electronic 2017-11-11 10:03:00 60
ITEM005 Electronic 2017-11-11 10:05:00 40
ITEM006 Electronic 2017-11-11 10:06:00 20
ITEM007 Electronic 2017-11-11 10:07:00 70
ITEM008 Clothes 2017-11-11 10:08:00 20
  • PageAccess_tab
    頁面訪問表指黎,包含用戶 ID朋凉,訪問時間,用戶所在地域信息醋安,具體數(shù)據(jù)如下:
region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00
  • PageAccessCount_tab
    頁面訪問表杂彭,訪問量,訪問時間吓揪,用戶所在地域信息亲怠,具體數(shù)據(jù)如下:
region userCount accessTime
ShangHai 100 2017.11.11 10:01:00
BeiJing 86 2017.11.11 10:01:00
BeiJing 210 2017.11.11 10:06:00
BeiJing 33 2017.11.11 10:10:00
ShangHai 129 2017.11.11 12:10:00
  • PageAccessSession_tab
    頁面訪問表,訪問量柠辞,訪問時間团秽,用戶所在地域信息,具體數(shù)據(jù)如下:
region userId accessTime
ShangHai U0011 2017-11-11 10:01:00
ShangHai U0012 2017-11-11 10:02:00
ShangHai U0013 2017-11-11 10:03:00
ShangHai U0015 2017-11-11 10:05:00
ShangHai U0011 2017-11-11 10:10:00
BeiJing U0110 2017-11-11 10:10:00
ShangHai U2010 2017-11-11 10:11:00
ShangHai U0410 2017-11-11 12:16:00

測試類

我們創(chuàng)建一個SqlOverviewITCase.scala 用于接下來介紹 Flink SQL 算子的功能體驗叭首。代碼如下:

import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class SqlOverviewITCase {
  val _tempFolder = new TemporaryFolder

  @Rule
  def tempFolder: TemporaryFolder = _tempFolder

  def getStateBackend: StateBackend = {
    new MemoryStateBackend()
  }

  // 客戶表數(shù)據(jù)
  val customer_data = new mutable.MutableList[(String, String, String)]
  customer_data.+=(("c_001", "Kevin", "from JinLin"))
  customer_data.+=(("c_002", "Sunny", "from JinLin"))
  customer_data.+=(("c_003", "JinCheng", "from HeBei"))


  // 訂單表數(shù)據(jù)
  val order_data = new mutable.MutableList[(String, String, String, String)]
  order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
  order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
  order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))

  // 商品銷售表數(shù)據(jù)
  val item_data = Seq(
    Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
    Right((1510365660000L)),
    Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
    Right((1510365720000L)),
    Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
    Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
    Right((1510365780000L)),
    Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
    Right((1510365900000L)),
    Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
    Right((1510365960000L)),
    Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
    Right((1510366020000L)),
    Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
    Right((151036608000L)))

  // 頁面訪問表數(shù)據(jù)
  val pageAccess_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
    Right((1510365660000L)),
    Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
    Right((1510365660000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
    Right((1510366200000L)),
    Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
    Right((1510366260000L)),
    Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
    Right((1510373400000L)))

  // 頁面訪問量表數(shù)據(jù)2
  val pageAccessCount_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
    Right((1510365660000L)),
    Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
    Right((1510365660000L)),
    Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
    Right((1510366200000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
    Right((1510366200000L)),
    Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
    Right((1510373400000L)))

  // 頁面訪問表數(shù)據(jù)3
  val pageAccessSession_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
    Right((1510365660000L)),
    Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
    Right((1510365720000L)),
    Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
    Right((1510365720000L)),
    Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
    Right((1510365900000L)),
    Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
    Right((1510366200000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
    Right((1510366200000L)),
    Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
    Right((1510366260000L)),
    Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
    Right((1510373760000L)))

  def procTimePrint(sql: String): Unit = {
    // Streaming 環(huán)境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 將order_tab, customer_tab 注冊到catalog
    val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc)
    val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc)

    tEnv.registerTable("order_tab", order)
    tEnv.registerTable("customer_tab", customer)

    val result = tEnv.sqlQuery(sql).toRetractStream[Row]
    val sink = new RetractingSink
    result.addSink(sink)
    env.execute()
  }

  def rowTimePrint(sql: String): Unit = {
    // Streaming 環(huán)境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(getStateBackend)
    env.setParallelism(1)
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 將item_tab, pageAccess_tab 注冊到catalog
    val item =
      env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data))
      .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)

    val pageAccess =
      env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
      .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)

    val pageAccessCount =
      env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
      .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)

    val pageAccessSession =
      env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
      .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)

    tEnv.registerTable("item_tab", item)
    tEnv.registerTable("pageAccess_tab", pageAccess)
    tEnv.registerTable("pageAccessCount_tab", pageAccessCount)
    tEnv.registerTable("pageAccessSession_tab", pageAccessSession)

    val result = tEnv.sqlQuery(sql).toRetractStream[Row]
    val sink = new RetractingSink
    result.addSink(sink)
    env.execute()

  }

  @Test
  def testSelect(): Unit = {
    val sql = "替換想要測試的SQL"
    // 非window 相關(guān)用 procTimePrint(sql)
    // Window 相關(guān)用 rowTimePrint(sql)
  }

}

// 自定義Sink
final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
  var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  def invoke(v: (Boolean, Row)) {
    retractedResults.synchronized {
      val value = v._2.toString
      if (v._1) {
        retractedResults += value
      } else {
        val idx = retractedResults.indexOf(value)
        if (idx >= 0) {
          retractedResults.remove(idx)
        } else {
          throw new RuntimeException("Tried to retract a value that wasn't added first. " +
                                       "This is probably an incorrectly implemented test. " +
                                       "Try to set the parallelism of the sink to 1.")
        }
      }
    }
    retractedResults.sorted.foreach(println(_))
  }
}

// Water mark 生成器
class EventTimeSourceFunction[T](
  dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }

  override def cancel(): Unit = ???
}

Select

SELECT 用于從數(shù)據(jù)集/流中選擇數(shù)據(jù)习勤,語法遵循 ANSI-SQL 標(biāo)準(zhǔn),語義是關(guān)系代數(shù)中的投影(Projection),對關(guān)系進行垂直分割焙格,消去某些列, 如下圖所示:

image.png

SQL 示例

customer_tab選擇用戶姓名姻报,并用內(nèi)置的 CONCAT 函數(shù)拼接客戶信息,如下:

SELECT c_name, CONCAT(c_name, ' come ', c_desc) as desc  FROM customer_tab;

Result

c_name desc
Kevin Kevin come from JinLin
Sunny Sunny come from JinLin
Jincheng Jincheng come from HeBei

特別說明

大家看到在 SELECT 不僅可以使用普通的字段選擇间螟,還可以使用ScalarFunction,當(dāng)然也包括User-Defined Function,同時還可以進行字段的alias設(shè)置损肛。其實SELECT可以結(jié)合聚合厢破,在 GROUPBY 部分會進行介紹,一個比較特殊的使用場景是攜帶 DISTINCT 關(guān)鍵字,示例如下:

SQL 示例

在訂單表查詢所有的客戶 id治拿,消除重復(fù)客戶 id, 如下:

SELECT DISTINCT c_id FROM order_tab;

Result

c_id
c_001
c_002

WHERE

WHERE 用于從數(shù)據(jù)集/流中過濾數(shù)據(jù)摩泪,與 SELECT 一起使用,語法遵循 ANSI-SQL 標(biāo)準(zhǔn)劫谅,語義是關(guān)系代數(shù)的 Selection见坑,根據(jù)某些條件對關(guān)系做水平分割,即選擇符合條件的記錄捏检,如下所示:

image.png

SQL 示例

customer_tab查詢客戶 id 為c_001c_003的客戶信息荞驴,如下:

SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id = 'c_001' OR c_id = 'c_003';

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_003 JinCheng from HeBei

特別說明

我們發(fā)現(xiàn)WHERE是對滿足一定條件的數(shù)據(jù)進行過濾,WHERE支持=, <, >, <>, >=, <=以及AND贯城, OR等表達(dá)式的組合熊楼,最終滿足過濾條件的數(shù)據(jù)會被選擇出來。并且 WHERE 可以結(jié)合IN,NOT IN聯(lián)合使用能犯,具體如下:

SQL 示例 (IN 常量)

使用 INcustomer_tab查詢客戶 id 為c_001c_003的客戶信息鲫骗,如下:

SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN ('c_001', 'c_003');

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_003 JinCheng from HeBei

SQL 示例 (IN 子查詢)

使用 IN和 子查詢 在customer_tab查詢已經(jīng)下過訂單的客戶信息犬耻,如下:

SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN (SELECT c_id FROM order_tab);

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin

IN/NOT IN 與關(guān)系代數(shù)

如上介紹 IN 是關(guān)系代數(shù)中的 Intersection, NOT IN 是關(guān)系代數(shù)的 Difference执泰, 如下圖示意:

  • IN(Intersection)
    image.png
  • NOT IN(Difference)
    image.png

GROUP BY

GROUP BY 是對數(shù)據(jù)進行分組的操作枕磁,比如我需要分別計算一下一個學(xué)生表里面女生和男生的人數(shù)分別是多少,如下:

image.png

SQL 示例

將 order_tab 信息按 customer_tab 分組統(tǒng)計訂單數(shù)量术吝,簡單示例如下:

SELECT c_id, count(o_id) as o_count FROM order_tab GROUP BY c_id;

Result

c_id o_count
c_001 2
c_002 1

特別說明

在實際的業(yè)務(wù)場景中计济,GROUP BY 除了按業(yè)務(wù)字段進行分組外,很多時候用戶也可以用時間來進行分組(相當(dāng)于劃分窗口)顿苇,比如統(tǒng)計每分鐘的訂單數(shù)量:

SQL 示例

按時間進行分組峭咒,查詢每分鐘的訂單數(shù)量,如下:

   SELECT SUBSTRING(o_time, 1, 16) AS o_time_min, count(o_id) AS o_count FROM order_tab GROUP BY SUBSTRING(o_time, 1, 16)

Result

o_time_min o_count
2018-11-05 10:01 2
2018-11-05 10:03 1

說明:如果我們時間字段是 timestamp 類型纪岁,建議使用內(nèi)置的 DATE_FORMAT 函數(shù)凑队。

UNION ALL

UNION ALL 將兩個表合并起來,要求兩個表的字段完全一致幔翰,包括字段類型漩氨、字段順序,語義對應(yīng)關(guān)系代數(shù)的 Union,只是關(guān)系代數(shù)是 Set 集合操作遗增,會有去重復(fù)操作叫惊,UNION ALL 不進行去重,如下所示:

image.png

SQL 示例

我們簡單的將customer_tab查詢 2 次做修,將查詢結(jié)果合并起來霍狰,如下:

SELECT c_id, c_name, c_desc  FROM customer_tab UNION ALL SELECT c_id, c_name, c_desc  FROM customer_tab

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei

特別說明

UNION ALL 對結(jié)果數(shù)據(jù)不進行去重,如果想對結(jié)果數(shù)據(jù)進行去重饰及,傳統(tǒng)數(shù)據(jù)庫需要進行 UNION 操作蔗坯。

UNION

UNION 將兩個流給合并起來,要求兩個流的字段完全一致燎含,包括字段類型宾濒、字段順序,并其 UNION 不同于 UNION ALL屏箍,UNION 會對結(jié)果數(shù)據(jù)去重,與關(guān)系代數(shù)的 Union 語義一致绘梦,如下:
[圖片上傳失敗...(image-a1f92a-1592206042833)]

SQL 示例

我們簡單的將customer_tab查詢 2 次,將查詢結(jié)果合并起來赴魁,如下:

SELECT c_id, c_name, c_desc  FROM customer_tab UNION SELECT c_id, c_name, c_desc  FROM customer_tab

我們發(fā)現(xiàn)完全一樣的表數(shù)據(jù)進行 UNION之后卸奉,數(shù)據(jù)是被去重的,UNION之后的數(shù)據(jù)并沒有增加颖御。

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei

特別說明

UNION 對結(jié)果數(shù)據(jù)進行去重择卦,在實際的實現(xiàn)過程需要對數(shù)據(jù)進行排序操作,所以非必要去重情況請使用 UNION ALL 操作。

JOIN

JOIN 用于把來自兩個表的行聯(lián)合起來形成一個寬表秉继,Apache Flink 支持的 JOIN 類型:

  • JOIN - INNER JOIN
  • LEFT JOIN - LEFT OUTER JOIN
  • RIGHT JOIN - RIGHT OUTER JOIN
  • FULL JOIN - FULL OUTER JOIN

JOIN 與關(guān)系代數(shù)的 Join 語義相同祈噪,具體如下:

image.png

SQL 示例 (JOIN)

INNER JOIN只選擇滿足ON條件的記錄,我們查詢customer_taborder_tab表尚辑,將有訂單的客戶和訂單信息選擇出來辑鲤,如下:

SELECT * FROM customer_tab AS c JOIN order_tab AS o ON o.c_id = c.c_id

Result

c_id c_name c_desc o_id c_id o_time o_desc
c_001 Kevin from JinLin o_002 c_001 2018-11-05 10:01:55 ipad
c_001 Kevin from JinLin o_003 c_001 2018-11-05 10:03:44 flink book
c_002 Sunny from JinLin o_oo1 c_002 2018-11-05 10:01:01 iphone

SQL 示例 (LEFT JOIN)

LEFT JOININNER JOIN的區(qū)別是當(dāng)右表沒有與左邊相 JOIN 的數(shù)據(jù)時候,右邊對應(yīng)的字段補NULL輸出杠茬,語義如下:

[圖片上傳失敗...(image-5939b9-1592206042833)]

對應(yīng)的 SQL 語句如下(LEFT JOIN):

SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;
  • 細(xì)心的讀者可能發(fā)現(xiàn)上面 T2.ColC 是添加了前綴 T2 了月褥,這里需要說明一下,當(dāng)兩張表有字段名字一樣的時候瓢喉,我需要指定是從那個表里面投影的宁赤。

我們查詢customer_taborder_tab表,將客戶和訂單信息選擇出來如下:

SELECT * FROM customer_tab AS c LEFT JOIN order_tab AS o ON o.c_id = c.c_id

Result

c_id c_name c_desc o_id c_id o_time o_desc
c_001 Kevin from JinLin o_002 c_001 2018-11-05 10:01:55 ipad
c_001 Kevin from JinLin o_003 c_001 2018-11-05 10:03:44 flink book
c_002 Sunny from JinLin o_oo1 c_002 2018-11-05 10:01:01 iphone
c_003 JinCheng from HeBei NULL NULL NULL NULL

特別說明

RIGHT JOIN 相當(dāng)于 LEFT JOIN 左右兩個表交互一下位置栓票。FULL JOIN相當(dāng)于 RIGHT JOINLEFT JOIN 之后進行UNION ALL操作决左。

Window

在 Apache Flink 中有 2 種類型的 Window,一種是 OverWindow走贪,即傳統(tǒng)數(shù)據(jù)庫的標(biāo)準(zhǔn)開窗佛猛,每一個元素都對應(yīng)一個窗口。一種是 GroupWindow坠狡,目前在 SQL 中 GroupWindow 都是基于時間進行窗口劃分的继找。

Over Window

Apache Flink 中對 OVER Window 的定義遵循標(biāo)準(zhǔn) SQL 的定義語法。
按 ROWS 和 RANGE 分類是傳統(tǒng)數(shù)據(jù)庫的標(biāo)準(zhǔn)分類方法逃沿,在 Apache Flink 中還可以根據(jù)時間類型(ProcTime/EventTime)和窗口的有限和無限(Bounded/UnBounded)進行分類婴渡,共計 8 種類型。為了避免大家對過細(xì)分類造成困擾凯亮,我們按照確定當(dāng)前行的不同方式將 OVER Window 分成兩大類進行介紹边臼,如下:

  • ROWS OVER Window - 每一行元素都視為新的計算行,即触幼,每一行都是一個新的窗口。
  • RANGE OVER Window - 具有相同時間值的所有元素行視為同一計算行究飞,即置谦,具有相同時間值的所有行都是同一個窗口。

Bounded ROWS OVER Window

Bounded ROWS OVER Window 每一行元素都視為新的計算行亿傅,即媒峡,每一行都是一個新的窗口。

語義

我們以 3 個元素(2 PRECEDING)的窗口為例葵擎,如下圖:

image.png

上圖所示窗口 user 1 的 w5 和 w6谅阿, user 2 的 窗口 w2 和 w3,雖然有元素都是同一時刻到達(dá),但是他們?nèi)匀皇窃诓煌拇翱谇┎停@一點有別于 RANGE OVER Window寓涨。

語法

Bounded ROWS OVER Window 語法如下:

SELECT 
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)] 
     ORDER BY timeCol
     ROWS 
     BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, 
... 
FROM Tab1
  • value_expression - 進行分區(qū)的字表達(dá)式;
  • timeCol - 用于元素排序的時間字段氯檐;
  • rowCount - 是定義根據(jù)當(dāng)前行開始向前追溯幾行元素戒良。
SQL 示例

利用item_tab測試數(shù)據(jù),我們統(tǒng)計同類商品中當(dāng)前和當(dāng)前商品之前 2 個商品中的最高價格冠摄。

SELECT  
    itemID,
    itemType, 
    onSellTime, 
    price,  
    MAX(price) OVER (
        PARTITION BY itemType 
        ORDER BY onSellTime 
        ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice
  FROM item_tab

Result

itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 50
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 60
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

Bounded RANGE OVER Window

Bounded RANGE OVER Window 具有相同時間值的所有元素行視為同一計算行糯崎,即,具有相同時間值的所有行都是同一個窗口河泳。

語義

我們以 3 秒中數(shù)據(jù)(INTERVAL '2' SECOND)的窗口為例沃呢,如下圖:

image.png

注意: 上圖所示窗口 user 1 的 w6, user 2 的 窗口 w3拆挥,元素都是同一時刻到達(dá),他們是在同一個窗口薄霜,這一點有別于 ROWS OVER Window。

語法

Bounded RANGE OVER Window 的語法如下:

SELECT 
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)] 
     ORDER BY timeCol
     RANGE 
     BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, 
... 
FROM Tab1
  • value_expression - 進行分區(qū)的字表達(dá)式竿刁;
  • timeCol - 用于元素排序的時間字段黄锤;
  • timeInterval - 是定義根據(jù)當(dāng)前行開始向前追溯指定時間的元素行;
SQL 示例

我們統(tǒng)計同類商品中當(dāng)前和當(dāng)前商品之前 2 分鐘商品中的最高價格食拜。

SELECT  
    itemID,
    itemType, 
    onSellTime, 
    price,  
    MAX(price) OVER (
        PARTITION BY itemType 
        ORDER BY rowtime 
        RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice
  FROM item_tab
Result(Bounded RANGE OVER Window)
itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 60
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 40
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

特別說明

OverWindow 最重要是要理解每一行數(shù)據(jù)都確定一個窗口鸵熟,同時目前在 Apache Flink 中只支持按時間字段排序。并且 OverWindow 開窗與 GroupBy 方式數(shù)據(jù)分組最大的不同在于负甸,GroupBy 數(shù)據(jù)分組統(tǒng)計時候流强,在SELECT中除了 GROUP BY 的 key,不能直接選擇其他非 key 的字段呻待,但是 OverWindow 沒有這個限制打月,SELECT可以選擇任何字段。比如一張表 table(a,b,c,d)4 個字段蚕捉,如果按 d 分組求 c 的最大值奏篙,兩種寫完如下:

  • GROUP BY - SELECT d, MAX(c) FROM table GROUP BY d
  • OVER Window = SELECT a, b, c, d, MAX(c) OVER(PARTITION BY d, ORDER BY ProcTime())
    如上 OVER Window 雖然 PARTITION BY d,但 SELECT 中仍然可以選擇 a,b,c 字段。但在 GROUPBY 中迫淹,SELECT 只能選擇 d 字段秘通。

Group Window

根據(jù)窗口數(shù)據(jù)劃分的不同,目前 Apache Flink 有如下 3 種 Bounded Winodw:

  • Tumble - 滾動窗口敛熬,窗口數(shù)據(jù)有固定的大小肺稀,窗口數(shù)據(jù)無疊加;
  • Hop - 滑動窗口应民,窗口數(shù)據(jù)有固定大小话原,并且有固定的窗口重建頻率夕吻,窗口數(shù)據(jù)有疊加;
  • Session - 會話窗口繁仁,窗口數(shù)據(jù)沒有固定的大小涉馅,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無疊加改备。

說明: Aapche Flink 還支持 UnBounded 的 Group Window控漠,也就是全局 Window,流上所有數(shù)據(jù)都在一個窗口里面悬钳,語義非常簡單盐捷,這里不做詳細(xì)介紹了。

Tumble

語義

Tumble 滾動窗口有固定 size默勾,窗口數(shù)據(jù)不重疊,具體語義如下:

image.png

語法

Tumble 滾動窗口對應(yīng)的語法如下:

SELECT 
    [gk],
    [TUMBLE_START(timeCol, size)], 
    [TUMBLE_END(timeCol, size)], 
    agg1(col1), 
    ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)
  • [gk] - 決定了流是 Keyed 還是/Non-Keyed;
  • TUMBLE_START - 窗口開始時間;
  • TUMBLE_END - 窗口結(jié)束時間;
  • timeCol - 是流表中表示時間字段碉渡;
  • size - 表示窗口的大小,如 秒母剥,分鐘滞诺,小時,天环疼。
SQL 示例

利用pageAccess_tab測試數(shù)據(jù)习霹,我們需要按不同地域統(tǒng)計每 2 分鐘的淘寶首頁的訪問量(PV)。

SELECT  
    region,
    TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS winStart,  
    TUMBLE_END(rowtime, INTERVAL '2' MINUTE) AS winEnd,  
    COUNT(region) AS pv
FROM pageAccess_tab 
GROUP BY region, TUMBLE(rowtime, INTERVAL '2' MINUTE)
Result
region winStart winEnd pv
BeiJing 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:12:00.0 2
ShangHai 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
ShangHai 2017-11-11 04:10:00.0 2017-11-11 04:12:00.0 1

Hop

Hop 滑動窗口和滾動窗口類似炫隶,窗口有固定的 size淋叶,與滾動窗口不同的是滑動窗口可以通過 slide 參數(shù)控制滑動窗口的新建頻率。因此當(dāng) slide 值小于窗口 size 的值的時候多個滑動窗口會重疊伪阶。

語義

Hop 滑動窗口語義如下所示:

image.png

語法

Hop 滑動窗口對應(yīng)語法如下:

SELECT 
    [gk], 
    [HOP_START(timeCol, slide, size)] ,  
    [HOP_END(timeCol, slide, size)],
    agg1(col1), 
    ... 
    aggN(colN) 
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
  • [gk] 決定了流是 Keyed 還是/Non-Keyed;
  • HOP_START - 窗口開始時間;
  • HOP_END - 窗口結(jié)束時間;
  • timeCol - 是流表中表示時間字段煞檩;
  • slide - 是滑動步伐的大小栅贴;
  • size - 是窗口的大小斟湃,如 秒,分鐘檐薯,小時凝赛,天;
SQL 示例

利用pageAccessCount_tab測試數(shù)據(jù)坛缕,我們需要每 5 分鐘統(tǒng)計近 10 分鐘的頁面訪問量(PV).

SELECT  
  HOP_START(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winStart,  
  HOP_END(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winEnd,  
  SUM(accessCount) AS accessCount  
FROM pageAccessCount_tab 
GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)
Result
winStart winEnd accessCount
2017-11-11 01:55:00.0 2017-11-11 02:05:00.0 186
2017-11-11 02:00:00.0 2017-11-11 02:10:00.0 396
2017-11-11 02:05:00.0 2017-11-11 02:15:00.0 243
2017-11-11 02:10:00.0 2017-11-11 02:20:00.0 33
2017-11-11 04:05:00.0 2017-11-11 04:15:00.0 129
2017-11-11 04:10:00.0 2017-11-11 04:20:00.0 129

Session

Seeeion 會話窗口 是沒有固定大小的窗口墓猎,通過 session 的活躍度分組元素。不同于滾動窗口和滑動窗口祷膳,會話窗口不重疊,也沒有固定的起止時間陶衅。一個會話窗口在一段時間內(nèi)沒有接收到元素時,即當(dāng)出現(xiàn)非活躍間隙時關(guān)閉。一個會話窗口 分配器通過配置 session gap 來指定非活躍周期的時長.

語義

Session 會話窗口語義如下所示:

image.png
語法

Seeeion 會話窗口對應(yīng)語法如下:

SELECT 
    [gk], 
    SESSION_START(timeCol, gap) AS winStart,  
    SESSION_END(timeCol, gap) AS winEnd,
    agg1(col1),
     ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
  • [gk] 決定了流是 Keyed 還是/Non-Keyed;
  • SESSION_START - 窗口開始時間趁啸;
  • SESSION_END - 窗口結(jié)束時間怕轿;
  • timeCol - 是流表中表示時間字段熙侍;
  • gap - 是窗口數(shù)據(jù)非活躍周期的時長弛针;
SQL 示例

利用pageAccessSession_tab測試數(shù)據(jù)裳朋,我們按地域統(tǒng)計連續(xù)的兩個訪問用戶之間的訪問時間間隔不超過 3 分鐘的的頁面訪問量(PV).

SELECT  
    region, 
    SESSION_START(rowtime, INTERVAL '3' MINUTE) AS winStart,  
    SESSION_END(rowtime, INTERVAL '3' MINUTE) AS winEnd, 
    COUNT(region) AS pv  
FROM pageAccessSession_tab
GROUP BY region, SESSION(rowtime, INTERVAL '3' MINUTE)
Result
region winStart winEnd pv
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:13:00.0 1
ShangHai 2017-11-11 02:01:00.0 2017-11-11 02:08:00.0 4
ShangHai 2017-11-11 02:10:00.0 2017-11-11 02:14:00.0 2
ShangHai 2017-11-11 04:16:00.0 2017-11-11 04:19:00.0 1

UDX

Apache Flink 除了提供了大部分 ANSI-SQL 的核心算子匪蝙,也為用戶提供了自己編寫業(yè)務(wù)代碼的機會敛摘,那就是 User-Defined Function,目前支持如下三種 User-Defined Function:

  • UDF - User-Defined Scalar Function
  • UDTF - User-Defined Table Function
  • UDAF - User-Defined Aggregate Funciton

UDX 都是用戶自定義的函數(shù)门烂,那么 Apache Flink 框架為啥將自定義的函數(shù)分成三類呢?是根據(jù)什么劃分的呢兄淫?Apache Flink 對自定義函數(shù)進行分類的依據(jù)是根據(jù)函數(shù)語義的不同屯远,函數(shù)的輸入和輸出不同來分類的,具體如下:

UDX INPUT OUTPUT INPUT:OUTPUT
UDF 單行中的N(N>=0)列 單行中的1列 1:1
UDTF 單行中的N(N>=0)列 M(M>=0)行 1:N(N>=0)
UDAF M(M>=0)行中的每行的N(N>=0)列 單行中的1列 M:1(M>=0)

UDF

  • 定義
    用戶想自己編寫一個字符串聯(lián)接的 UDF捕虽,我們只需要實現(xiàn)ScalarFunction#eval()方法即可慨丐,簡單實現(xiàn)如下:
object MyConnect extends ScalarFunction {
  @varargs
  def eval(args: String*): String = {
    val sb = new StringBuilder
    var i = 0
    while (i < args.length) {
      if (args(i) == null) {
        return null
      }
      sb.append(args(i))
      i += 1
    }
    sb.toString
  }
}
  • 使用
 val fun = MyConnect
 tEnv.registerFunction("myConnect", fun)
 val sql = "SELECT myConnect(a, b) as str FROM tab"

UDTF

  • 定義
    用戶想自己編寫一個字符串切分的 UDTF,我們只需要實現(xiàn)TableFunction#eval()方法即可泄私,簡單實現(xiàn)如下:

ScalarFunction#eval()

class MySplit extends TableFunction[String] {
  def eval(str: String): Unit = {
    if (str.contains("#")){
      str.split("#").foreach(collect)
    }
  }

  def eval(str: String, prefix: String): Unit = {
    if (str.contains("#")) {
      str.split("#").foreach(s => collect(prefix + s))
    }
  }
}
  • 使用
val fun = new MySplit()
tEnv.registerFunction("mySplit", fun)
val sql = "SELECT c, s FROM MyTable, LATERAL TABLE(mySplit(c)) AS T(s)"

UDAF

  • 定義
    UDAF 要實現(xiàn)的接口比較多房揭,我們以一個簡單的 CountAGG 為例,做簡單實現(xiàn)如下:
/** The initial accumulator for count aggregate function */
class CountAccumulator extends JTuple1[Long] {
  f0 = 0L //count
}

/**
  * User-defined count aggregate function
  */
class MyCount
  extends AggregateFunction[JLong, CountAccumulator] {

  // process argument is optimized by Calcite.
  // For instance count(42) or count(*) will be optimized to count().
  def accumulate(acc: CountAccumulator): Unit = {
    acc.f0 += 1L
  }

  // process argument is optimized by Calcite.
  // For instance count(42) or count(*) will be optimized to count().
  def retract(acc: CountAccumulator): Unit = {
    acc.f0 -= 1L
  }

  def accumulate(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 += 1L
    }
  }

  def retract(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 -= 1L
    }
  }

  override def getValue(acc: CountAccumulator): JLong = {
    acc.f0
  }

  def merge(acc: CountAccumulator, its: JIterable[CountAccumulator]): Unit = {
    val iter = its.iterator()
    while (iter.hasNext) {
      acc.f0 += iter.next().f0
    }
  }

  override def createAccumulator(): CountAccumulator = {
    new CountAccumulator
  }

  def resetAccumulator(acc: CountAccumulator): Unit = {
    acc.f0 = 0L
  }

  override def getAccumulatorType: TypeInformation[CountAccumulator] = {
    new TupleTypeInfo(classOf[CountAccumulator], BasicTypeInfo.LONG_TYPE_INFO)
  }

  override def getResultType: TypeInformation[JLong] =
    BasicTypeInfo.LONG_TYPE_INFO
}
  • 使用
val fun = new MyCount()
tEnv.registerFunction("myCount", fun)
val sql = "SELECT myCount(c) FROM MyTable GROUP BY  a"

Source&Sink

上面我們介紹了 Apache Flink SQL 核心算子的語法及語義晌端,這部分將選取 Bounded EventTime Tumble Window 為例為大家編寫一個完整的包括 Source 和 Sink 定義的 Apache Flink SQL Job捅暴。假設(shè)有一張?zhí)詫氻撁嬖L問表(PageAccess_tab),有地域咧纠,用戶 ID 和訪問時間蓬痒。我們需要按不同地域統(tǒng)計每 2 分鐘的淘寶首頁的訪問量(PV). 具體數(shù)據(jù)如下:

region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00

Source 定義

自定義 Apache Flink Stream Source 需要實現(xiàn)StreamTableSource, StreamTableSource中通過StreamExecutionEnvironmentaddSource方法獲取DataStream, 所以我們需要自定義一個 SourceFunction, 并且要支持產(chǎn)生 WaterMark,也就是要實現(xiàn)DefinedRowtimeAttributes接口惧盹。

Source Function 定義

支持接收攜帶 EventTime 的數(shù)據(jù)集合乳幸,Either 的數(shù)據(jù)結(jié)構(gòu),Right 表示 WaterMark 和 Left 表示數(shù)據(jù):

class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) 
  extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }
  override def cancel(): Unit = ???
}

定義 StreamTableSource

我們自定義的 Source 要攜帶我們測試的數(shù)據(jù)钧椰,以及對應(yīng)的 WaterMark 數(shù)據(jù)粹断,具體如下:

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  val fieldNames = Array("accessTime", "region", "userId")
  val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  val rowType = new RowTypeInfo(
    Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
    fieldNames)

  // 頁面訪問表數(shù)據(jù) rows with timestamps and watermarks
  val data = Seq(
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
    Right(1510365660000L),
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
    Right(1510365660000L),
    Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
    Right(1510366200000L),
    Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
    Right(1510366260000L),
    Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
    Right(1510373400000L)
  )

  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "accessTime",
      new ExistingField("accessTime"),
      PreserveWatermarks.INSTANCE))
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
  }

  override def getReturnType: TypeInformation[Row] = rowType

  override def getTableSchema: TableSchema = schema

}

Sink 定義

我們簡單的將計算結(jié)果寫入到 Apache Flink 內(nèi)置支持的 CSVSink 中,定義 Sink 如下:

def getCsvTableSink: TableSink[Row] = {
    val tempFile = File.createTempFile("csv_sink_", "tem")
    // 打印sink的文件路徑嫡霞,方便我們查看運行結(jié)果
    println("Sink path : " + tempFile)
    if (tempFile.exists()) {
      tempFile.delete()
    }
    new CsvTableSink(tempFile.getAbsolutePath).configure(
      Array[String]("region", "winStart", "winEnd", "pv"),
      Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
  }

構(gòu)建主程序

主程序包括執(zhí)行環(huán)境的定義瓶埋,Source/Sink 的注冊以及統(tǒng)計查 SQL 的執(zhí)行,具體如下:

def main(args: Array[String]): Unit = {
    // Streaming 環(huán)境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 設(shè)置EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //方便我們查出輸出數(shù)據(jù)
    env.setParallelism(1)

    val sourceTableName = "mySource"
    // 創(chuàng)建自定義source數(shù)據(jù)結(jié)構(gòu)
    val tableSource = new MyTableSource

    val sinkTableName = "csvSink"
    // 創(chuàng)建CSV sink 數(shù)據(jù)結(jié)構(gòu)
    val tableSink = getCsvTableSink

    // 注冊source
    tEnv.registerTableSource(sourceTableName, tableSource)
    // 注冊sink
    tEnv.registerTableSink(sinkTableName, tableSink)

    val sql =
      "SELECT  " +
      "  region, " +
      "  TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +
      "  TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +
      " FROM mySource " +
      " GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"

    tEnv.sqlQuery(sql).insertInto(sinkTableName);
    env.execute()
  }

執(zhí)行并查看運行結(jié)果

執(zhí)行主程序后我們會在控制臺得到 Sink 的文件路徑诊沪,如下:

Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem

Cat 方式查看計算結(jié)果养筒,如下:

jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1

表格化如上結(jié)果:

region winStart winEnd pv
BeiJing 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:12:00.0 2
ShangHai 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
ShangHai 2017-11-11 04:10:00.0 2017-11-11 04:12:00.0 1

上面這個端到端的完整示例也可以應(yīng)用到本篇前面介紹的其他算子示例中,只是大家根據(jù) Source 和 Sink 的 Schema 不同來進行相應(yīng)的構(gòu)建即可端姚!

總結(jié)

本篇概要的向大家介紹了 SQL 的由來晕粪,Apache Flink SQL 大部分核心功能,并附帶了具體的測試數(shù)據(jù)和測試程序渐裸,最后以一個 End-to-End 的示例展示了如何編寫 Apache Flink SQL 的 Job 收尾巫湘。本篇著重向大家介紹 Apache Flink SQL 的使用装悲,后續(xù)我們再繼續(xù)探究每個算子的實現(xiàn)原理。

附錄

關(guān)于點贊和評論

本系列文章難免有很多缺陷和不足尚氛,真誠希望讀者對有收獲的篇章給予點贊鼓勵诀诊,對有不足的篇章給予反饋和建議,先行感謝大家阅嘶!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末属瓣,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子讯柔,更是在濱河造成了極大的恐慌抡蛙,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件魂迄,死亡現(xiàn)場離奇詭異溜畅,居然都是意外死亡,警方通過查閱死者的電腦和手機极祸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進店門慈格,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人遥金,你說我怎么就攤上這事浴捆。” “怎么了稿械?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵选泻,是天一觀的道長。 經(jīng)常有香客問我美莫,道長页眯,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任厢呵,我火速辦了婚禮窝撵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘襟铭。我一直安慰自己碌奉,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布寒砖。 她就那樣靜靜地躺著赐劣,像睡著了一般。 火紅的嫁衣襯著肌膚如雪哩都。 梳的紋絲不亂的頭發(fā)上魁兼,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天,我揣著相機與錄音漠嵌,去河邊找鬼咐汞。 笑死判哥,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的碉考。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼挺身,長吁一口氣:“原來是場噩夢啊……” “哼侯谁!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起章钾,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤墙贱,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后贱傀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體惨撇,經(jīng)...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年府寒,在試婚紗的時候發(fā)現(xiàn)自己被綠了魁衙。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡株搔,死狀恐怖剖淀,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情纤房,我是刑警寧澤纵隔,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站炮姨,受9級特大地震影響捌刮,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜舒岸,卻給世界環(huán)境...
    茶點故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一绅作、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蛾派,春花似錦棚蓄、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至典尾,卻和暖如春役拴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背钾埂。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工河闰, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留科平,地道東北人。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓姜性,卻偏偏與公主長得像瞪慧,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子部念,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內(nèi)容