Flink 1.11 新特性:流批一體的 Hive 數(shù)倉

本文僅為筆者平日學習記錄之用,侵刪
原文:https://mp.weixin.qq.com/s/kX3ZmJARRWOl2WwF8W6Nyw

Table/SQL 的 blink planner 成為默認 Planner。

Flink 1.11 中流計算結(jié)合 Hive 批處理數(shù)倉,給離線數(shù)倉帶來 Flink 流處理實時且 Exactly-once 的能力套硼。另外锻梳,F(xiàn)link 1.11 完善了 Flink 自身的 Filesystem connector衫画,大大提高了 Flink 的易用性睡榆。

數(shù)倉架構(gòu)

離線數(shù)倉

傳統(tǒng)的離線數(shù)倉是由 Hive 加上 HDFS 的方案场航,Hive 數(shù)倉有著成熟和穩(wěn)定的大數(shù)據(jù)分析能力缠导,結(jié)合調(diào)度和上下游工具,構(gòu)建一個完整的數(shù)據(jù)處理分析平臺溉痢,流程如下:

  • Flume 把數(shù)據(jù)導入 Hive 數(shù)倉

  • 調(diào)度工具僻造,調(diào)度 ETL 作業(yè)進行數(shù)據(jù)處理

  • 在 Hive 數(shù)倉的表上,可以進行靈活的 Ad-hoc 查詢

  • 調(diào)度工具孩饼,調(diào)度聚合作業(yè)輸出到BI層的數(shù)據(jù)庫中

這個流程下的問題是:

  • 導入過程不夠靈活髓削,這應該是一個靈活 SQL 流計算的過程

  • 基于調(diào)度作業(yè)的級聯(lián)計算,實時性太差

  • ETL 不能有流式的增量計算

實時數(shù)倉

針對離線數(shù)倉的特點镀娶,隨著實時計算的流行立膛,越來越多的公司引入實時數(shù)倉,實時數(shù)倉基于 Kafka + Flink streaming梯码,定義全流程的流計算作業(yè)宝泵,有著秒級甚至毫秒的實時性。

但是轩娶,實時數(shù)倉的一個問題是歷史數(shù)據(jù)只有 3-15 天儿奶,無法在其上做 Ad-hoc 的查詢。如果搭建 Lambda 的離線+實時的架構(gòu)罢坝,維護成本廓握、計算存儲成本、一致性保證嘁酿、重復的開發(fā)會帶來很大的負擔隙券。

Hive 實時化

Flink 1.11 為解決離線數(shù)倉的問題,給 Hive 數(shù)倉帶來了實時化的能力闹司,加強各環(huán)節(jié)的實時性的同時娱仔,又不會給架構(gòu)造成太大的負擔。

Hive streaming sink

實時數(shù)據(jù)導入 Hive 數(shù)倉游桩,你是怎么做的牲迫?Flume、Spark Streaming 還是 Flink Datastream借卧?千呼萬喚盹憎,Table / SQL 層的 streaming file sink 來啦,Flink 1.11 支持 Filesystem connector[1] 和 Hive connector 的 streaming sink[2]铐刘。

(注:圖中 StreamingFileSink 的 Bucket 概念就是 Table/SQL 中的 Partition)Table/SQL 層的 streaming sink 不僅:

  • 帶來 Flink streaming 的實時/準實時的能力
  • 支持 Filesystem connector 的全部 formats(csv,json,avro,parquet,orc)
  • 支持 Hive table 的所有 formats
  • 繼承 Datastream StreamingFileSink 的所有特性:Exactly-once陪每、支持HDFS, S3。

而且引入了新的機制:Partition commit。一個合理的數(shù)倉的數(shù)據(jù)導入檩禾,它不止包含數(shù)據(jù)文件的寫入挂签,也包含了 Partition 的可見性提交。當某個 Partition 完成寫入時盼产,需要通知 Hive metastore 或者在文件夾內(nèi)添加 SUCCESS 文件饵婆。Flink 1.11 的 Partition commit 機制可以讓你:

  • Trigger:控制Partition提交的時機,可以根據(jù)Watermark加上從Partition中提取的時間來判斷戏售,也可以通過Processing time來判斷侨核。你可以控制:是想先盡快看到?jīng)]寫完的Partition;還是保證寫完Partition之后蜈项,再讓下游看到它芹关。
  • Policy:提交策略,內(nèi)置支持SUCCESS文件和Metastore的提交紧卒,你也可以擴展提交的實現(xiàn)侥衬,比如在提交階段觸發(fā)Hive的analysis來生成統(tǒng)計信息,或者進行小文件的合并等等跑芳。

一個例子:

-- 結(jié)合Hive dialect使用Hive DDL語法
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (
  dt STRING,
  hour STRING
) STORED AS PARQUET TBLPROPERTIES (
  -- 使用partition中抽取時間轴总,加上watermark決定partiton commit的時機
  'sink.partition-commit.trigger'='partition-time',
  -- 配置hour級別的partition時間抽取策略,這個例子中dt字段是yyyy-MM-dd格式的天博个,hour是0-23的小時怀樟,timestamp-pattern定義了如何從這兩個partition字段推出完整的timestamp
  'partition.time-extractor.timestamp-pattern'=’$dt $hour:00:00’,
  -- 配置dalay為小時級,當 watermark > partition時間 + 1小時盆佣,會commit這個partition
  'sink.partition-commit.delay'='1 h',
  -- partitiion commit的策略是:先更新metastore(addPartition)往堡,再寫SUCCESS文件
  'sink.partition-commit.policy.kind’='metastore,success-file'
)
 
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
)
 
-- 可以結(jié)合Table Hints動態(tài)指定table properties [3]
INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;

Hive streaming source

Hive 數(shù)倉中存在大量的 ETL 任務,這些任務往往是通過調(diào)度工具來周期性的運行共耍,這樣做主要有兩個問題:

  1. 實時性不強虑灰,往往調(diào)度最小是小時級。
  2. 流程復雜痹兜,組件多穆咐,容易出現(xiàn)問題。

針對這些離線的 ETL 作業(yè)字旭,Flink 1.11 為此開發(fā)了實時化的 Hive 流讀对湃,支持:

  • Partition 表,監(jiān)控 Partition 的生成遗淳,增量讀取新的 Partition拍柒。
  • 非 Partition 表,監(jiān)控文件夾內(nèi)新文件的生成屈暗,增量讀取新的文件拆讯。

你甚至可以使用10分鐘級別的分區(qū)策略剧包,使用 Flink 的 Hive streaming source 和Hive streaming sink 可以大大提高 Hive 數(shù)倉的實時性到準實時分鐘級 [4][5],在實時化的同時往果,也支持針對 Table 全量的 Ad-hoc 查詢,提高靈活性一铅。

SELECT * FROM hive_table
/*+ OPTIONS('streaming-source.enable'=’true’,
'streaming-source.consume-start-offset'='2020-05-20') */;

實時數(shù)據(jù)關聯(lián) Hive 表

在 Flink 與 Hive 集成的功能發(fā)布以后陕贮,我們收到最多的用戶反饋之一就是希望能夠?qū)?Flink 的實時數(shù)據(jù)與離線的 Hive 表進行關聯(lián)。因此潘飘,在 Flink 1.11 中肮之,我們支持將實時表與 Hive 表進行 temporal join [6]。沿用 Flink 官方文檔中的例子卜录,假定 Orders 是實時表戈擒,而 LatestRates 是一張 Hive 表,用戶可以通過以下語句進行temporal join:

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

與 Hive 表進行 temporal join 目前只支持 processing time艰毒,我們會把 Hive 表的數(shù)據(jù)緩存到內(nèi)存中筐高,并按照固定的時間間隔去更新緩存的數(shù)據(jù)。用戶可以通過參數(shù)“l(fā)ookup.join.cache.ttl” 來控制緩存更新的間隔丑瞧,默認間隔為一個小時柑土。“l(fā)ookup.join.cache.ttl” 需要配置到 Hive 表的 property 當中绊汹,因此每張表可以有不同的配置稽屏。另外,由于需要將整張 Hive 表加載到內(nèi)存中西乖,因此目前只適用于 Hive 表較小的場景狐榔。

Hive 增強

Hive Dialect 語法兼容

Flink on Hive 用戶并不能很好的使用 DDL,主要是因為:

  • Flink 1.10 中進一步完善了 DDL获雕,但由于 Flink 與 Hive 在元數(shù)據(jù)語義上的差異薄腻,通過 Flink DDL 來操作 Hive 元數(shù)據(jù)的可用性比較差,僅能覆蓋很少的應用場景典鸡。
  • 使用 Flink 對接 Hive 的用戶經(jīng)常需要切換到 Hive CLI 來執(zhí)行 DDL被廓。

針對上述兩個問題,我們提出了 FLIP-123 [7]萝玷,通過 Hive Dialect 為用戶提供 Hive語法兼容嫁乘。該功能的最終目標,是為用戶提供近似 Hive CLI/Beeline 的使用體驗球碉,讓用戶無需在 Flink 和 Hive 的 CLI 之間進行切換蜓斧,甚至可以直接遷移部分 Hive 腳本到 Flink 中執(zhí)行。 在 Flink 1.11中睁冬,Hive Dialect 可以支持大部分常用的 DDL挎春,比如 CREATE/ALTER TABLE看疙、CHANGE/REPLACE COLUMN、ADD/DROP PARTITION 等等直奋。為此能庆,我們?yōu)?Hive Dialect 實現(xiàn)了一個獨立的 parser,F(xiàn)link 會根據(jù)用戶指定的 Dialect 決定使用哪個 parser 來解析 SQL 語句脚线。用戶可以通過配置項“ table.sql-dialect ” 來指定使用的 SQL Dialect搁胆。它的默認值為 “default”,即 Flink 原生的 Dialect邮绿,而將其設置為 “hive” 時就開啟了 Hive Dialect渠旁。對于 SQL 用戶,可以在 yaml 文件中設置“table.sql-dialect” 來指定 session 的初始 Dialect船逮,也可以通過 set 命令來動態(tài)調(diào)整需要使用的 Dialect顾腊,而無需重啟 session。Hive Dialect 目前所支持的具體功能可以參考 FLIP-123 或 Flink 的官方文檔挖胃。另外杂靶,該功能的一些設計原則和使用注意事項如下:

  1. Hive Dialect 只能用于操作 Hive 表,而不是 Flink 原生的表(如 Kafka冠骄、ES 的表)伪煤,這也意味著 Hive Dialect 需要配合 HiveCatalog 使用。
  2. 使用 Hive Dialect 時凛辣,原有的 Flink 的一些語法可能會無法使用(例如 Flink 定義的類型別名)抱既,在需要使用 Flink 語法時可以動態(tài)切換到默認的 Dialect。
  3. Hive Dialect 的 DDL 語法定義基于 Hive 的官方文檔扁誓,而不同 Hive 版本之間語法可能會有輕微的差異防泵,需要用戶進行一定的調(diào)整。
  4. Hive Dialect 的語法實現(xiàn)基于 Calcite蝗敢,而 Calcite 與 Hive 有不同的保留關鍵字捷泞。因此,某些在 Hive 中可以直接作為標識符的關鍵字(如 “default” )寿谴,在Hive Dialect 中可能需要用“`”進行轉(zhuǎn)義锁右。

向量化讀取

Flink 1.10中,F(xiàn)link 已經(jīng)支持了 ORC (Hive 2+) 的向量化讀取支持讶泰,但是這很局限咏瑟,為此,F(xiàn)link 1.11 增加了更多的向量化支持:

  • ORC for Hive 1.x [8]
  • Parquet for Hive 1,2,3 [9]

也就是說已經(jīng)補全了所有版本的 Parquet 和 ORC 向量化支持痪署,默認是開啟的码泞,提供開關。

簡化 Hive 依賴

Flink 1.10 中狼犯,F(xiàn)link 文檔中列出了所需的 Hive 相關依賴余寥,推薦用戶自行下載领铐。但是這仍然稍顯麻煩,所以在1.11 中宋舷,F(xiàn)link 提供了內(nèi)置的依賴支持[10]:

  • flink-sql-connector-hive-1.2.2_2.11-1.11.jar:Hive 1 的依賴版本绪撵。
  • flink-sql-connector-hive-2.2.0_2.11-1.11.jar:Hive 2.0 - 2.2 的依賴版本。
  • flink-sql-connector-hive-2.3.6_2.11-1.11.jar:Hive 2.3 的依賴版本祝蝠。
  • flink-sql-connector-hive-3.1.2_2.11-1.11.jar:Hive 3 的依賴版本莲兢。

現(xiàn)在,你只需要單獨下一個包续膳,再搞定 HADOOP_CLASSPATH,即可運行 Flink on Hive收班。

Flink 增強

除了 Hive 相關的 features坟岔,F(xiàn)link 1.11 也完成了大量其它關于流批一體的增強。

Flink Filesystem connector

Flink table 在長久以來只支持一個 csv 的 file system table摔桦,而且它還不支持Partition社付,行為上在某些方面也有些不符合大數(shù)據(jù)計算的直覺。在 Flink 1.11邻耕,重構(gòu)了整個 Filesystem connector 的實現(xiàn) [1]:

  • 結(jié)合 Partition鸥咖,現(xiàn)在,F(xiàn)ilesystem connector 支持 SQL 中 Partition 的所有語義兄世,支持 Partition 的 DDL啼辣,支持 Partition Pruning,支持靜態(tài)/動態(tài) Partition 的插入御滩,支持 overwrite 的插入鸥拧。

  • 支持各種 Formats:

  • CSV

  • JSON

  • Aparch AVRO

  • Apache Parquet

  • Apache ORC.

  • 支持 Batch 的讀寫。

  • 支持 Streaming sink削解,也支持上述 Hive 支持的 Partition commit富弦,支持寫Success 文件。

例子:

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  hour STRING
) PARTITIONED BY (dt, hour) WITH (
  ’connector’=’filesystem’,
  ’path’=’...’,
  ’format’=’parquet’,
  'partition.time-extractor.timestamp-pattern'=’$dt $hour:00:00’,
  'sink.partition-commit.delay'='1 h',
  ‘sink.partition-commit.policy.kind’='success-file')
)

-- stream environment or batch environment
INSERT INTO TABLE fs_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;

-- 通過 Partition 查詢
SELECT * FROM fs_table WHERE dt=’2020-05-20’ and hour=’12’;

引入 Max Slot

Yarn perJob 或者 session 模式在 1.11 之前是無限擴張的氛驮,沒有辦法限制它的資源使用腕柜,只能用 Yarn queue 等方式來限制。但是傳統(tǒng)的批作業(yè)其實都是大并發(fā)矫废,運行在局限的資源上盏缤,一部分一部分階段性的運行,為此磷脯,F(xiàn)link 1.11 引入 Max Slot 的配置[11]蛾找,限制 Yarn application 的資源使用。

slotmanager.number-of-slots.max

定義 Flink 集群分配的最大 Slot 數(shù)赵誓。此配置選項用于限制批處理工作負載的資源消耗打毛。不建議為流作業(yè)配置此選項柿赊,如果沒有足夠的 Slot,則流作業(yè)可能會失敗幻枉。

結(jié) 語

Flink 1.11 也是一個大版本碰声,社區(qū)做了大量的 Features 和 Improvements,F(xiàn)link 的大目標是幫助業(yè)務構(gòu)建流批一體的數(shù)倉熬甫,提供完善胰挑、順滑、高性能的一體式數(shù)倉椿肩。希望大家多多參與社區(qū)瞻颂,積極反饋問題和想法,甚至參與社區(qū)的討論和開發(fā)郑象,一起把 Flink 做得越來越好贡这!

參考資料:

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
[2] https://issues.apache.org/jira/browse/FLINK-14255
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
[4] https://issues.apache.org/jira/browse/FLINK-17434
[5] https://issues.apache.org/jira/browse/FLINK-17435
[6] https://issues.apache.org/jira/browse/FLINK-17387
[7] https://cwiki.apache.org/confluence/display/FLINK/FLIP-123%3A+DDL+and+DML+compatibility+for+Hive+connector
[8] https://issues.apache.org/jira/browse/FLINK-14802
[9] https://issues.apache.org/jira/browse/FLINK-16450
[10] https://issues.apache.org/jira/browse/FLINK-16455
[11] https://issues.apache.org/jira/browse/FLINK-16605

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市厂榛,隨后出現(xiàn)的幾起案子盖矫,更是在濱河造成了極大的恐慌,老刑警劉巖击奶,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件辈双,死亡現(xiàn)場離奇詭異,居然都是意外死亡柜砾,警方通過查閱死者的電腦和手機湃望,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來痰驱,“玉大人喜爷,你說我怎么就攤上這事√汛剑” “怎么了檩帐?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長另萤。 經(jīng)常有香客問我湃密,道長,這世上最難降的妖魔是什么四敞? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任泛源,我火速辦了婚禮,結(jié)果婚禮上忿危,老公的妹妹穿的比我還像新娘达箍。我一直安慰自己,他們只是感情好铺厨,可當我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布缎玫。 她就那樣靜靜地躺著硬纤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪赃磨。 梳的紋絲不亂的頭發(fā)上筝家,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天,我揣著相機與錄音邻辉,去河邊找鬼溪王。 笑死,一個胖子當著我的面吹牛值骇,可吹牛的內(nèi)容都是我干的莹菱。 我是一名探鬼主播,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼吱瘩,長吁一口氣:“原來是場噩夢啊……” “哼芒珠!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起搅裙,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎裹芝,沒想到半個月后部逮,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡嫂易,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年兄朋,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片怜械。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡颅和,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出缕允,到底是詐尸還是另有隱情峡扩,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布障本,位于F島的核電站教届,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏驾霜。R本人自食惡果不足惜案训,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望粪糙。 院中可真熱鬧强霎,春花似錦、人聲如沸蓉冈。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至椿争,卻和暖如春怕膛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背秦踪。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工褐捻, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人椅邓。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓柠逞,卻偏偏與公主長得像,于是被迫代替她去往敵國和親景馁。 傳聞我的和親對象是個殘疾皇子板壮,可洞房花燭夜當晚...
    茶點故事閱讀 43,446評論 2 348