Flink集成Hive之Hive Catalog與Hive Dialect--以Flink1.12

在上一篇分享Flink集成Hive之快速入門--以Flink1.12為例中盆色,介紹了Flink集成Hive的進本步驟。本文分享,將繼續(xù)介紹Flink集成Hive的另外兩個概念:Hive Catalog與Hive Dialect购裙。本文包括以下內(nèi)容科吭,希望對你有所幫助啄骇。

  • 什么是Hive Catalog
  • 如何使用Hive Catalog
  • 什么是Hive Dialect
  • 如何使用Hive Dialect

公眾號『大數(shù)據(jù)技術(shù)與數(shù)倉』丹诀,回復『資料』領取大數(shù)據(jù)資料包

什么是Hive Catalog

我們知道逆巍,Hive使用Hive Metastore(HMS)存儲元數(shù)據(jù)信息尺棋,使用關系型數(shù)據(jù)庫來持久化存儲這些信息封锉。所以,F(xiàn)link集成Hive需要打通Hive的metastore膘螟,去管理Flink的元數(shù)據(jù)成福,這就是Hive Catalog的功能。

Hive Catalog的主要作用是使用Hive MetaStore去管理Flink的元數(shù)據(jù)荆残。Hive Catalog可以將元數(shù)據(jù)進行持久化奴艾,這樣后續(xù)的操作就可以反復使用這些表的元數(shù)據(jù),而不用每次使用時都要重新注冊内斯。如果不去持久化catalog蕴潦,那么在每個session中取處理數(shù)據(jù),都要去重復地創(chuàng)建元數(shù)據(jù)對象俘闯,這樣是非常耗時的潭苞。

如何使用Hive Catalog

HiveCatalog是開箱即用的,所以真朗,一旦配置好Flink與Hive集成萄传,就可以使用HiveCatalog。比如,我們通過FlinkSQL 的DDL語句創(chuàng)建一張kafka的數(shù)據(jù)源表秀菱,立刻就能查看該表的元數(shù)據(jù)信息振诬。

HiveCatalog可以處理兩種類型的表:一種是Hive兼容的表,另一種是普通表(generic table)衍菱。其中Hive兼容表是以兼容Hive的方式來存儲的赶么,所以,對于Hive兼容表而言脊串,我們既可以使用Flink去操作該表辫呻,又可以使用Hive去操作該表。

普通表是對Flink而言的琼锋,當使用HiveCatalog創(chuàng)建一張普通表放闺,僅僅是使用Hive MetaStore將其元數(shù)據(jù)進行了持久化,所以可以通過Hive查看這些表的元數(shù)據(jù)信息(通過DESCRIBE FORMATTED命令)缕坎,但是不能通過Hive去處理這些表怖侦,因為語法不兼容。

對于是否是普通表谜叹,F(xiàn)link使用is_generic屬性進行標識匾寝。默認情況下,創(chuàng)建的表是普通表荷腊,即is_generic=true艳悔,如果要創(chuàng)建Hive兼容表,需要在建表屬性中指定is_generic=false女仰。

尖叫提示:

由于依賴Hive Metastore猜年,所以必須開啟Hive MetaStore服務

代碼中使用Hive Catalog

   EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        // 使用注冊的catalog
        tableEnv.useCatalog("myhive");

Flink SQLCli中使用Hive Catalog

在FlinkSQL Cli中使用Hive Catalog很簡單,只需要配置一下sql-cli-defaults.yaml文件即可疾忍。配置內(nèi)容如下:

catalogs:
   - name: myhive
     type: hive
     default-database: default
     hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
image

在FlinkSQL Cli中創(chuàng)建一張kafka表码倦,該表默認為普通表,即is_generic=true

CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用戶id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品類id
    `action` STRING, -- 用戶行為
    `province` INT, -- 用戶所在的省份
    `ts` BIGINT, -- 用戶行為發(fā)生的時間戳
    `proctime` AS PROCTIME(), -- 通過計算列產(chǎn)生一個處理時間列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定義watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior', -- kafka主題
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消費者組
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 數(shù)據(jù)源格式為json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

我們可以在Hive客戶端中查看該表的元數(shù)據(jù)信息

hive (default)> desc formatted  user_behavior;
Table Parameters:                
       ...
        is_generic              true                
      ...         

從上面的元數(shù)據(jù)信息可以看出锭碳,is_generic=true,說明該表是一張普通表勿璃,如果在Hive中去查看該表擒抛,則會報錯。

上面創(chuàng)建的表是普通表补疑,該表不能使用Hive去查詢歧沪。那么,該如何創(chuàng)建一張Hive兼容表呢莲组?我們只需要在建表的屬性中顯示指定is_generic=false即可诊胞,具體如下:

CREATE TABLE hive_compatible_tbl ( 
    `user_id` BIGINT, -- 用戶id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品類id
    `action` STRING, -- 用戶行為
    `province` INT, -- 用戶所在的省份
    `ts` BIGINT -- 用戶行為發(fā)生的時間戳
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior', -- kafka主題
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消費者組
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 數(shù)據(jù)源格式為json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false',
    'is_generic' = 'false'
);

當我們在Hive中查看該表的元數(shù)據(jù)信息時,可以看出:is_generic =false

hive (default)> desc formatted hive_compatible_tbl;
Table Parameters:                
        ...           
        is_generic              false               
        ...

我們可以使用FlinkSQL Cli或者HiveCli向該表中寫入數(shù)據(jù),然后分別通過FlinkSQL Cli和Hive Cli去查看該表數(shù)據(jù)的變化

hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;
hive (default)> select * from hive_compatible_tbl;

再在FlinkSQL Cli中查看該表撵孤,

Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
                   user_id                   item_id                    action
                      2020                      1221                       buy
    

同樣迈着,我們可以在FlinkSQL Cli中去向該表中寫入數(shù)據(jù):

Flink SQL>  insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;
Flink SQL> select user_id,item_id,action from hive_compatible_tbl;

                   user_id                   item_id                    action
                      2020                      1221                       buy
                      2020                      1222                       fav

尖叫提示:

對于Hive兼容的表,需要注意數(shù)據(jù)類型邪码,具體的數(shù)據(jù)類型對應關系以及注意點如下

Flink 數(shù)據(jù)類型 Hive 數(shù)據(jù)類型
CHAR(p) CHAR(p)
VARCHAR(p) VARCHAR(p)
STRING STRING
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT LONG
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
DATE DATE
TIMESTAMP(9) TIMESTAMP
BYTES BINARY
ARRAY<T> LIST<T>
MAP<K, V> MAP<K, V>
ROW STRUCT

注意

  • Hive CHAR(p) 類型的最大長度為255
  • Hive VARCHAR(p)類型的最大長度為65535
  • Hive MAP類型的key僅支持基本類型裕菠,而Flink’s MAP 類型的key執(zhí)行任意類型
  • Hive不支持聯(lián)合數(shù)據(jù)類型,比如STRUCT
  • Hive’s TIMESTAMP 的精度是 9 闭专, Hive UDFs函數(shù)只能處理 precision <= 9的 TIMESTAMP
  • Hive 不支持 Flink提供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET類型
  • FlinkINTERVAL 類型與 Hive INTERVAL 類型不一樣

上面介紹了普通表和Hive兼容表奴潘,那么我們該如何使用Hive的語法進行建表呢?這個時候就需要使用Hive Dialect影钉。

什么是Hive Dialect

從Flink1.11.0開始画髓,只要開啟了Hive dialect配置,用戶就可以使用HiveQL語法平委,這樣我們就可以在Flink中使用Hive的語法使用一些DDL和DML操作奈虾。

Flink目前支持兩種SQL方言(SQL dialects),分別為:default和hive。默認的SQL方言是default肆汹,如果要使用Hive的語法愚墓,需要將SQL方言切換到hive

如何使用Hive Dialect

在SQL Cli中使用Hive dialect

使用hive dialect只需要配置一個參數(shù)即可昂勉,該參數(shù)名稱為:table.sql-dialect浪册。我們就可以在sql-client-defaults.yaml配置文件中進行配置,也可以在具體的會話窗口中進行設定岗照,對于SQL dialect的切換村象,不需要進行重啟session。

execution:
  planner: blink
  type: batch
  result-mode: table

configuration:
  table.sql-dialect: hive

如果我們需要在SQL Cli中進行切換hive dialect攒至,可以使用如下命令:

Flink SQL> set table.sql-dialect=hive; -- 使用hive dialect
Flink SQL> set table.sql-dialect=default; -- 使用default dialect

尖叫提示:

一旦切換到了hive dialect厚者,就只能使用Hive的語法建表,如果嘗試使用Flink的語法建表迫吐,則會報錯

在Table API中配合dialect

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 使用hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 使用 default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

操作示例

Flink SQL> set table.sql-dialect=hive;
-- 使用Hive語法創(chuàng)建一張表
CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
  `id` int COMMENT 'id',
  `name` string COMMENT '名稱',
  `age` int COMMENT '年齡' 
)
COMMENT 'hive dialect表測試'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

進入Hive客戶端去查看該表的元數(shù)據(jù)信息

desc formatted hive_dialect_tbl;
col_name        data_type       comment
# col_name              data_type               comment             
                 
id                      int                                         
name                    string                                      
age                     int                                         
                 
# Detailed Table Information             
Database:               default                  
Owner:                  null                     
CreateTime:             Mon Dec 21 17:23:48 CST 2020     
LastAccessTime:         UNKNOWN                  
Retention:              0                        
Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl        
Table Type:             MANAGED_TABLE            
Table Parameters:                
        comment                 hive dialect表測試     
        is_generic              false               
        transient_lastDdlTime   1608542628          
                 
# Storage Information            
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       
InputFormat:            org.apache.hadoop.mapred.TextInputFormat         
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   
Compressed:             No                       
Num Buckets:            -1                       
Bucket Columns:         []                       
Sort Columns:           []                       
Storage Desc Params:             
        field.delim             ,                   
        serialization.format    ,                   

很明顯库菲,該表是一張Hive兼容表,即is_generic=false志膀。

使用FlinkSQLCli向該表中寫入一條數(shù)據(jù):

Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;

我們也可以在Hive的Cli中去操作該表

hive (default)> select * from hive_dialect_tbl;
hive (default)> insert into hive_dialect_tbl select 2,'jack',22;

以下是使用Hive方言的一些注意事項熙宇。

  • Hive dialect只能用于操作Hive表,不能用于普通表溉浙。Hive方言應與HiveCatalog一起使用烫止。
  • 雖然所有Hive版本都支持相同的語法,但是是否有特定功能仍然取決于使用的Hive版本戳稽。例如馆蠕,僅在Hive-2.4.0或更高版本中支持更新數(shù)據(jù)庫位置。
  • Hive和Calcite具有不同的保留關鍵字。例如互躬,default在Calcite中是保留關鍵字播赁,在Hive中是非保留關鍵字。所以吨铸,在使用Hive dialect時行拢,必須使用反引號(`)引用此類關鍵字,才能將其用作標識符诞吱。
  • 在Hive中不能查詢在Flink中創(chuàng)建的視圖舟奠。

當然,一旦開啟了Hive dialect房维,我們就可以按照Hive的操作方式在Flink中去處理Hive的數(shù)據(jù)了沼瘫,具體的操作與Hive一致,本文不再贅述咙俩。

總結(jié)

本文主要介紹了Hive Catalog和Hive Dialect耿戚。其中Hive Catalog的作用是持久化Flink的元數(shù)據(jù)信息,Hive Dialect是支持Hive語法的一個配置參數(shù)阿趁,這兩個概念是Flink集成Hive的關鍵膜蛔。下一篇分享將介紹如何使用Flink讀寫Hive。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末脖阵,一起剝皮案震驚了整個濱河市皂股,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌命黔,老刑警劉巖呜呐,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異悍募,居然都是意外死亡蘑辑,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進店門坠宴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來洋魂,“玉大人,你說我怎么就攤上這事喜鼓「笨常” “怎么了?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵颠通,是天一觀的道長。 經(jīng)常有香客問我膀懈,道長顿锰,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮硼控,結(jié)果婚禮上刘陶,老公的妹妹穿的比我還像新娘。我一直安慰自己牢撼,他們只是感情好匙隔,可當我...
    茶點故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著熏版,像睡著了一般纷责。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上撼短,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天再膳,我揣著相機與錄音,去河邊找鬼曲横。 笑死喂柒,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的禾嫉。 我是一名探鬼主播灾杰,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼熙参!你這毒婦竟也來了艳吠?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤尊惰,失蹤者是張志新(化名)和其女友劉穎讲竿,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體弄屡,經(jīng)...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡题禀,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了膀捷。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片迈嘹。...
    茶點故事閱讀 40,115評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖全庸,靈堂內(nèi)的尸體忽然破棺而出秀仲,到底是詐尸還是另有隱情,我是刑警寧澤壶笼,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布神僵,位于F島的核電站,受9級特大地震影響覆劈,放射性物質(zhì)發(fā)生泄漏保礼。R本人自食惡果不足惜沛励,卻給世界環(huán)境...
    茶點故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望炮障。 院中可真熱鬧目派,春花似錦、人聲如沸胁赢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽智末。三九已至谅摄,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間吹害,已是汗流浹背螟凭。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留它呀,地道東北人螺男。 一個月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像纵穿,于是被迫代替她去往敵國和親下隧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,055評論 2 355

推薦閱讀更多精彩內(nèi)容