在上一篇分享Flink集成Hive之快速入門--以Flink1.12為例中盆色,介紹了Flink集成Hive的進本步驟。本文分享,將繼續(xù)介紹Flink集成Hive的另外兩個概念:Hive Catalog與Hive Dialect购裙。本文包括以下內(nèi)容科吭,希望對你有所幫助啄骇。
- 什么是Hive Catalog
- 如何使用Hive Catalog
- 什么是Hive Dialect
- 如何使用Hive Dialect
公眾號『大數(shù)據(jù)技術(shù)與數(shù)倉』丹诀,回復『資料』領取大數(shù)據(jù)資料包
什么是Hive Catalog
我們知道逆巍,Hive使用Hive Metastore(HMS)存儲元數(shù)據(jù)信息尺棋,使用關系型數(shù)據(jù)庫來持久化存儲這些信息封锉。所以,F(xiàn)link集成Hive需要打通Hive的metastore膘螟,去管理Flink的元數(shù)據(jù)成福,這就是Hive Catalog的功能。
Hive Catalog的主要作用是使用Hive MetaStore去管理Flink的元數(shù)據(jù)荆残。Hive Catalog可以將元數(shù)據(jù)進行持久化奴艾,這樣后續(xù)的操作就可以反復使用這些表的元數(shù)據(jù),而不用每次使用時都要重新注冊内斯。如果不去持久化catalog蕴潦,那么在每個session中取處理數(shù)據(jù),都要去重復地創(chuàng)建元數(shù)據(jù)對象俘闯,這樣是非常耗時的潭苞。
如何使用Hive Catalog
HiveCatalog是開箱即用的,所以真朗,一旦配置好Flink與Hive集成萄传,就可以使用HiveCatalog。比如,我們通過FlinkSQL 的DDL語句創(chuàng)建一張kafka的數(shù)據(jù)源表秀菱,立刻就能查看該表的元數(shù)據(jù)信息振诬。
HiveCatalog可以處理兩種類型的表:一種是Hive兼容的表,另一種是普通表(generic table)衍菱。其中Hive兼容表是以兼容Hive的方式來存儲的赶么,所以,對于Hive兼容表而言脊串,我們既可以使用Flink去操作該表辫呻,又可以使用Hive去操作該表。
普通表是對Flink而言的琼锋,當使用HiveCatalog創(chuàng)建一張普通表放闺,僅僅是使用Hive MetaStore將其元數(shù)據(jù)進行了持久化,所以可以通過Hive查看這些表的元數(shù)據(jù)信息(通過DESCRIBE FORMATTED命令)缕坎,但是不能通過Hive去處理這些表怖侦,因為語法不兼容。
對于是否是普通表谜叹,F(xiàn)link使用is_generic屬性進行標識匾寝。默認情況下,創(chuàng)建的表是普通表荷腊,即is_generic=true艳悔,如果要創(chuàng)建Hive兼容表,需要在建表屬性中指定is_generic=false女仰。
尖叫提示:
由于依賴Hive Metastore猜年,所以必須開啟Hive MetaStore服務
代碼中使用Hive Catalog
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// 使用注冊的catalog
tableEnv.useCatalog("myhive");
Flink SQLCli中使用Hive Catalog
在FlinkSQL Cli中使用Hive Catalog很簡單,只需要配置一下sql-cli-defaults.yaml文件即可疾忍。配置內(nèi)容如下:
catalogs:
- name: myhive
type: hive
default-database: default
hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
在FlinkSQL Cli中創(chuàng)建一張kafka表码倦,該表默認為普通表,即is_generic=true
CREATE TABLE user_behavior (
`user_id` BIGINT, -- 用戶id
`item_id` BIGINT, -- 商品id
`cat_id` BIGINT, -- 品類id
`action` STRING, -- 用戶行為
`province` INT, -- 用戶所在的省份
`ts` BIGINT, -- 用戶行為發(fā)生的時間戳
`proctime` AS PROCTIME(), -- 通過計算列產(chǎn)生一個處理時間列
`eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間
WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定義watermark
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka主題
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消費者組
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'format' = 'json', -- 數(shù)據(jù)源格式為json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);
我們可以在Hive客戶端中查看該表的元數(shù)據(jù)信息
hive (default)> desc formatted user_behavior;
Table Parameters:
...
is_generic true
...
從上面的元數(shù)據(jù)信息可以看出锭碳,is_generic=true,說明該表是一張普通表勿璃,如果在Hive中去查看該表擒抛,則會報錯。
上面創(chuàng)建的表是普通表补疑,該表不能使用Hive去查詢歧沪。那么,該如何創(chuàng)建一張Hive兼容表呢莲组?我們只需要在建表的屬性中顯示指定is_generic=false即可诊胞,具體如下:
CREATE TABLE hive_compatible_tbl (
`user_id` BIGINT, -- 用戶id
`item_id` BIGINT, -- 商品id
`cat_id` BIGINT, -- 品類id
`action` STRING, -- 用戶行為
`province` INT, -- 用戶所在的省份
`ts` BIGINT -- 用戶行為發(fā)生的時間戳
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka主題
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消費者組
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'format' = 'json', -- 數(shù)據(jù)源格式為json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false',
'is_generic' = 'false'
);
當我們在Hive中查看該表的元數(shù)據(jù)信息時,可以看出:is_generic =false
hive (default)> desc formatted hive_compatible_tbl;
Table Parameters:
...
is_generic false
...
我們可以使用FlinkSQL Cli或者HiveCli向該表中寫入數(shù)據(jù),然后分別通過FlinkSQL Cli和Hive Cli去查看該表數(shù)據(jù)的變化
hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;
hive (default)> select * from hive_compatible_tbl;
再在FlinkSQL Cli中查看該表撵孤,
Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
user_id item_id action
2020 1221 buy
同樣迈着,我們可以在FlinkSQL Cli中去向該表中寫入數(shù)據(jù):
Flink SQL> insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;
Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
user_id item_id action
2020 1221 buy
2020 1222 fav
尖叫提示:
對于Hive兼容的表,需要注意數(shù)據(jù)類型邪码,具體的數(shù)據(jù)類型對應關系以及注意點如下
Flink 數(shù)據(jù)類型 | Hive 數(shù)據(jù)類型 |
---|---|
CHAR(p) | CHAR(p) |
VARCHAR(p) | VARCHAR(p) |
STRING | STRING |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) | DECIMAL(p, s) |
DATE | DATE |
TIMESTAMP(9) | TIMESTAMP |
BYTES | BINARY |
ARRAY<T> | LIST<T> |
MAP<K, V> | MAP<K, V> |
ROW | STRUCT |
注意:
- Hive
CHAR(p)
類型的最大長度為255 - Hive
VARCHAR(p)
類型的最大長度為65535 - Hive
MAP
類型的key僅支持基本類型裕菠,而Flink’sMAP
類型的key執(zhí)行任意類型 - Hive不支持聯(lián)合數(shù)據(jù)類型,比如STRUCT
- Hive’s
TIMESTAMP
的精度是 9 闭专, Hive UDFs函數(shù)只能處理 precision <= 9的TIMESTAMP
值 - Hive 不支持 Flink提供的
TIMESTAMP_WITH_TIME_ZONE
,TIMESTAMP_WITH_LOCAL_TIME_ZONE
, 及MULTISET
類型 - Flink
INTERVAL
類型與 HiveINTERVAL
類型不一樣
上面介紹了普通表和Hive兼容表奴潘,那么我們該如何使用Hive的語法進行建表呢?這個時候就需要使用Hive Dialect影钉。
什么是Hive Dialect
從Flink1.11.0開始画髓,只要開啟了Hive dialect配置,用戶就可以使用HiveQL語法平委,這樣我們就可以在Flink中使用Hive的語法使用一些DDL和DML操作奈虾。
Flink目前支持兩種SQL方言(SQL dialects),分別為:default和hive。默認的SQL方言是default肆汹,如果要使用Hive的語法愚墓,需要將SQL方言切換到hive。
如何使用Hive Dialect
在SQL Cli中使用Hive dialect
使用hive dialect只需要配置一個參數(shù)即可昂勉,該參數(shù)名稱為:table.sql-dialect浪册。我們就可以在sql-client-defaults.yaml配置文件中進行配置,也可以在具體的會話窗口中進行設定岗照,對于SQL dialect的切換村象,不需要進行重啟session。
execution:
planner: blink
type: batch
result-mode: table
configuration:
table.sql-dialect: hive
如果我們需要在SQL Cli中進行切換hive dialect攒至,可以使用如下命令:
Flink SQL> set table.sql-dialect=hive; -- 使用hive dialect
Flink SQL> set table.sql-dialect=default; -- 使用default dialect
尖叫提示:
一旦切換到了hive dialect厚者,就只能使用Hive的語法建表,如果嘗試使用Flink的語法建表迫吐,則會報錯
在Table API中配合dialect
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 使用hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 使用 default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
操作示例
Flink SQL> set table.sql-dialect=hive;
-- 使用Hive語法創(chuàng)建一張表
CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
`id` int COMMENT 'id',
`name` string COMMENT '名稱',
`age` int COMMENT '年齡'
)
COMMENT 'hive dialect表測試'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
進入Hive客戶端去查看該表的元數(shù)據(jù)信息
desc formatted hive_dialect_tbl;
col_name data_type comment
# col_name data_type comment
id int
name string
age int
# Detailed Table Information
Database: default
Owner: null
CreateTime: Mon Dec 21 17:23:48 CST 2020
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl
Table Type: MANAGED_TABLE
Table Parameters:
comment hive dialect表測試
is_generic false
transient_lastDdlTime 1608542628
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
field.delim ,
serialization.format ,
很明顯库菲,該表是一張Hive兼容表,即is_generic=false志膀。
使用FlinkSQLCli向該表中寫入一條數(shù)據(jù):
Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;
我們也可以在Hive的Cli中去操作該表
hive (default)> select * from hive_dialect_tbl;
hive (default)> insert into hive_dialect_tbl select 2,'jack',22;
以下是使用Hive方言的一些注意事項熙宇。
- Hive dialect只能用于操作Hive表,不能用于普通表溉浙。Hive方言應與HiveCatalog一起使用烫止。
- 雖然所有Hive版本都支持相同的語法,但是是否有特定功能仍然取決于使用的Hive版本戳稽。例如馆蠕,僅在Hive-2.4.0或更高版本中支持更新數(shù)據(jù)庫位置。
- Hive和Calcite具有不同的保留關鍵字。例如互躬,
default
在Calcite中是保留關鍵字播赁,在Hive中是非保留關鍵字。所以吨铸,在使用Hive dialect時行拢,必須使用反引號(`)引用此類關鍵字,才能將其用作標識符诞吱。 - 在Hive中不能查詢在Flink中創(chuàng)建的視圖舟奠。
當然,一旦開啟了Hive dialect房维,我們就可以按照Hive的操作方式在Flink中去處理Hive的數(shù)據(jù)了沼瘫,具體的操作與Hive一致,本文不再贅述咙俩。
總結(jié)
本文主要介紹了Hive Catalog和Hive Dialect耿戚。其中Hive Catalog的作用是持久化Flink的元數(shù)據(jù)信息,Hive Dialect是支持Hive語法的一個配置參數(shù)阿趁,這兩個概念是Flink集成Hive的關鍵膜蛔。下一篇分享將介紹如何使用Flink讀寫Hive。