Flink使用HiveCatalog可以通過(guò)批或者流的方式來(lái)處理Hive中的表厕倍。這就意味著Flink既可以作為Hive的一個(gè)批處理引擎赢织,也可以通過(guò)流處理的方式來(lái)讀寫(xiě)Hive中的表磷蜀,從而為實(shí)時(shí)數(shù)倉(cāng)的應(yīng)用和流批一體的落地實(shí)踐奠定了堅(jiān)實(shí)的基礎(chǔ)弥激。本文將以Flink1.12為例,介紹Flink集成Hive的另外一個(gè)非常重要的方面——Hive維表JOIN(Temporal Table Join)與Flink讀寫(xiě)Hive表的方式亲铡。以下是全文,希望本文對(duì)你有所幫助。
公眾號(hào)『大數(shù)據(jù)技術(shù)與數(shù)倉(cāng)』奖蔓,回復(fù)『資料』領(lǐng)取大數(shù)據(jù)資料包
Flink寫(xiě)入Hive表
Flink支持以批處理(Batch)和流處理(Streaming)的方式寫(xiě)入Hive表赞草。當(dāng)以批處理的方式寫(xiě)入Hive表時(shí),只有當(dāng)寫(xiě)入作業(yè)結(jié)束時(shí)吆鹤,才可以看到寫(xiě)入的數(shù)據(jù)厨疙。批處理的方式寫(xiě)入支持append模式和overwrite模式。
批處理模式寫(xiě)入
- 向非分區(qū)表寫(xiě)入數(shù)據(jù)
Flink SQL> use catalog myhive; -- 使用catalog
Flink SQL> INSERT INTO users SELECT 2,'tom';
Flink SQL> set execution.type=batch; -- 使用批處理模式
Flink SQL> INSERT OVERWRITE users SELECT 2,'tom';
- 向分區(qū)表寫(xiě)入數(shù)據(jù)
-- 向靜態(tài)分區(qū)表寫(xiě)入數(shù)據(jù)
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
-- 向動(dòng)態(tài)分區(qū)表寫(xiě)入數(shù)據(jù)
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
流處理模式寫(xiě)入
流式寫(xiě)入Hive表疑务,不支持**Insert overwrite **方式沾凄,否則報(bào)如下錯(cuò)誤:
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.
下面的示例是將kafka的數(shù)據(jù)流式寫(xiě)入Hive的分區(qū)表
-- 使用流處理模式
Flink SQL> set execution.type=streaming;
-- 使用Hive方言
Flink SQL> SET table.sql-dialect=hive;
-- 創(chuàng)建一張Hive分區(qū)表
CREATE TABLE user_behavior_hive_tbl (
`user_id` BIGINT, -- 用戶(hù)id
`item_id` BIGINT, -- 商品id
`cat_id` BIGINT, -- 品類(lèi)id
`action` STRING, -- 用戶(hù)行為
`province` INT, -- 用戶(hù)所在的省份
`ts` BIGINT -- 用戶(hù)行為發(fā)生的時(shí)間戳
) PARTITIONED BY (dt STRING,hr STRING,mi STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='0S',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
-- 使用默認(rèn)SQL方言
Flink SQL> SET table.sql-dialect=default;
-- 創(chuàng)建一張kafka數(shù)據(jù)源表
CREATE TABLE user_behavior (
`user_id` BIGINT, -- 用戶(hù)id
`item_id` BIGINT, -- 商品id
`cat_id` BIGINT, -- 品類(lèi)id
`action` STRING, -- 用戶(hù)行為
`province` INT, -- 用戶(hù)所在的省份
`ts` BIGINT, -- 用戶(hù)行為發(fā)生的時(shí)間戳
`proctime` AS PROCTIME(), -- 通過(guò)計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
`eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時(shí)間
WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定義watermark
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behaviors', -- kafka主題
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消費(fèi)者組
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'format' = 'json', -- 數(shù)據(jù)源格式為json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);
關(guān)于Hive表的一些屬性解釋?zhuān)?/p>
-
partition.time-extractor.timestamp-pattern
- 默認(rèn)值:(none)
- 解釋?zhuān)悍謪^(qū)時(shí)間抽取器,與 DDL 中的分區(qū)字段保持一致,如果是按天分區(qū)知允,則可以是**
year-
day
dt $hour:00:00`;
-
sink.partition-commit.trigger
- 默認(rèn)值:process-time
- 解釋?zhuān)悍謪^(qū)觸發(fā)器類(lèi)型,可選 process-time 或partition-time涤垫。
- process-time:不需要時(shí)間提取器和水位線(xiàn)配椭,當(dāng)當(dāng)前時(shí)間大于分區(qū)創(chuàng)建時(shí)間 + sink.partition-commit.delay 中定義的時(shí)間,提交分區(qū)雹姊;
- partition-time:需要 Source 表中定義 watermark股缸,當(dāng) watermark > 提取到的分區(qū)時(shí)間 +sink.partition-commit.delay 中定義的時(shí)間,提交分區(qū)吱雏;
-
sink.partition-commit.delay
- 默認(rèn)值:0S
- 解釋?zhuān)悍謪^(qū)提交的延時(shí)時(shí)間敦姻,如果是按天分區(qū),則該屬性的值為:1d歧杏,如果是按小時(shí)分區(qū)镰惦,則該屬性值為1h;
-
sink.partition-commit.policy.kind
默認(rèn)值:(none)
-
解釋?zhuān)禾峤环謪^(qū)的策略,用于通知下游的應(yīng)用該分區(qū)已經(jīng)完成了寫(xiě)入犬绒,也就是說(shuō)該分區(qū)的數(shù)據(jù)可以被訪問(wèn)讀取旺入。可選的值如下:
- metastore:添加分區(qū)的元數(shù)據(jù)信息凯力,僅Hive表支持該值配置
-
success-file:在表的存儲(chǔ)路徑下添加一個(gè)
_SUCCESS
文件
可以同時(shí)配置上面的兩個(gè)值茵瘾,比如metastore,success-file
執(zhí)行流式寫(xiě)入Hive表
-- streaming sql,將數(shù)據(jù)寫(xiě)入Hive表
INSERT INTO user_behavior_hive_tbl
SELECT
user_id,
item_id,
cat_id,
action,
province,
ts,
FROM_UNIXTIME(ts, 'yyyy-MM-dd'),
FROM_UNIXTIME(ts, 'HH'),
FROM_UNIXTIME(ts, 'mm')
FROM user_behavior;
-- batch sql,查詢(xún)Hive表的分區(qū)數(shù)據(jù)
SELECT * FROM user_behavior_hive_tbl WHERE dt='2021-01-04' AND hr='16' AND mi = '46';
同時(shí)查看Hive表的分區(qū)數(shù)據(jù):
尖叫提示:
1.Flink讀取Hive表默認(rèn)使用的是batch模式,如果要使用流式讀取Hive表咐鹤,需要而外指定一些參數(shù)拗秘,見(jiàn)下文。
2.只有在完成 Checkpoint 之后祈惶,文件才會(huì)從 In-progress 狀態(tài)變成 Finish 狀態(tài)雕旨,同時(shí)生成
_SUCCESS
文件扮匠,所以,F(xiàn)link流式寫(xiě)入Hive表需要開(kāi)啟并配置 Checkpoint凡涩。對(duì)于Flink SQL Client而言棒搜,需要在flink-conf.yaml中開(kāi)啟CheckPoint,配置內(nèi)容為:state.backend: filesystem
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints
Flink讀取Hive表
Flink支持以批處理(Batch)和流處理(Streaming)的方式讀取Hive中的表活箕。批處理的方式與Hive的本身查詢(xún)類(lèi)似力麸,即只在提交查詢(xún)的時(shí)刻查詢(xún)一次Hive表。流處理的方式將會(huì)持續(xù)地監(jiān)控Hive表讹蘑,并且會(huì)增量地提取新的數(shù)據(jù)末盔。默認(rèn)情況下筑舅,F(xiàn)link是以批處理的方式讀取Hive表座慰。
關(guān)于流式讀取Hive表,F(xiàn)link既支持分區(qū)表又支持非分區(qū)表翠拣。對(duì)于分區(qū)表而言版仔,F(xiàn)link將會(huì)監(jiān)控新產(chǎn)生的分區(qū)數(shù)據(jù),并以增量的方式讀取這些數(shù)據(jù)误墓。對(duì)于非分區(qū)表蛮粮,F(xiàn)link會(huì)監(jiān)控Hive表存儲(chǔ)路徑文件夾里面的新文件,并以增量的方式讀取新的數(shù)據(jù)谜慌。
Flink讀取Hive表可以配置一下參數(shù):
-
streaming-source.enable
- 默認(rèn)值:false
- 解釋?zhuān)菏欠耖_(kāi)啟流式讀取 Hive 表然想,默認(rèn)不開(kāi)啟。
-
streaming-source.partition.include
- 默認(rèn)值:all
- 解釋?zhuān)号渲米x取Hive的分區(qū)欣范,包括兩種方式:all和latest变泄。all意味著讀取所有分區(qū)的數(shù)據(jù),latest表示只讀取最新的分區(qū)數(shù)據(jù)恼琼。值得注意的是妨蛹,latest方式只能用于開(kāi)啟了流式讀取Hive表,并用于維表JOIN的場(chǎng)景晴竞。
-
streaming-source.monitor-interval
- 默認(rèn)值:None
- 解釋?zhuān)撼掷m(xù)監(jiān)控Hive表分區(qū)或者文件的時(shí)間間隔蛙卤。值得注意的是,當(dāng)以流的方式讀取Hive表時(shí)噩死,該參數(shù)的默認(rèn)值是1m颤难,即1分鐘。當(dāng)temporal join時(shí)已维,默認(rèn)的值是60m乐严,即1小時(shí)。另外衣摩,該參數(shù)配置不宜過(guò)短 昂验,最短是1 個(gè)小時(shí)捂敌,因?yàn)槟壳暗膶?shí)現(xiàn)是每個(gè) task 都會(huì)查詢(xún) metastore,高頻的查可能會(huì)對(duì)metastore 產(chǎn)生過(guò)大的壓力既琴。
-
streaming-source.partition-order
- 默認(rèn)值:partition-name
- 解釋?zhuān)簊treaming source的分區(qū)順序占婉。默認(rèn)的是partition-name,表示使用默認(rèn)分區(qū)名稱(chēng)順序加載最新分區(qū)甫恩,也是推薦使用的方式逆济。除此之外還有兩種方式,分別為:create-time和partition-time磺箕。其中create-time表示使用分區(qū)文件創(chuàng)建時(shí)間順序奖慌。partition-time表示使用分區(qū)時(shí)間順序。指的注意的是松靡,對(duì)于非分區(qū)表简僧,該參數(shù)的默認(rèn)值為:create-time。
-
streaming-source.consume-start-offset
- 默認(rèn)值:None
- 解釋?zhuān)毫魇阶x取Hive表的起始偏移量雕欺。
-
partition.time-extractor.kind
- 默認(rèn)值:default
- 分區(qū)時(shí)間提取器類(lèi)型岛马。用于從分區(qū)中提取時(shí)間,支持default和自定義屠列。如果使用default啦逆,則需要通過(guò)參數(shù)
partition.time-extractor.timestamp-pattern
配置時(shí)間戳提取的正則表達(dá)式。
在 SQL Client 中需要顯示地開(kāi)啟 SQL Hint 功能
Flink SQL> set table.dynamic-table-options.enabled= true;
使用SQLHint流式查詢(xún)Hive表
SELECT * FROM user_behavior_hive_tbl /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2021-01-03') */;
Hive維表JOIN
Flink 1.12 支持了 Hive 最新的分區(qū)作為時(shí)態(tài)表的功能笛洛,可以通過(guò) SQL 的方式直接關(guān)聯(lián) Hive 分區(qū)表的最新分區(qū)夏志,并且會(huì)自動(dòng)監(jiān)聽(tīng)最新的 Hive 分區(qū),當(dāng)監(jiān)控到新的分區(qū)后苛让,會(huì)自動(dòng)地做維表數(shù)據(jù)的全量替換沟蔑。
Flink支持的是processing-time的temporal join,也就是說(shuō)總是與最新版本的時(shí)態(tài)表進(jìn)行JOIN蝌诡。另外溉贿,F(xiàn)link既支持非分區(qū)表的temporal join,又支持分區(qū)表的temporal join浦旱。對(duì)于分區(qū)表而言宇色,F(xiàn)link會(huì)監(jiān)聽(tīng)Hive表的最新分區(qū)數(shù)據(jù)。值得注意的是颁湖,F(xiàn)link尚不支持 event-time temporal join宣蠕。
Temporal Join最新分區(qū)
對(duì)于一張隨著時(shí)間變化的Hive分區(qū)表,F(xiàn)link可以讀取該表的數(shù)據(jù)作為一個(gè)無(wú)界流甥捺。如果Hive分區(qū)表的每個(gè)分區(qū)都包含全量的數(shù)據(jù)抢蚀,那么每個(gè)分區(qū)將做為一個(gè)時(shí)態(tài)表的版本數(shù)據(jù),即將最新的分區(qū)數(shù)據(jù)作為一個(gè)全量維表數(shù)據(jù)镰禾。值得注意的是皿曲,該功能特點(diǎn)僅支持Flink的STREAMING模式唱逢。
使用 Hive 最新分區(qū)作為 Tempmoral table 之前,需要設(shè)置必要的兩個(gè)參數(shù):
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest'
除此之外還有一些其他的參數(shù)屋休,關(guān)于參數(shù)的解釋見(jiàn)上面的分析坞古。我們?cè)谑褂肏ive維表的時(shí)候,既可以在創(chuàng)建Hive表時(shí)指定具體的參數(shù)劫樟,也可以使用SQL Hint的方式動(dòng)態(tài)指定參數(shù)痪枫。一個(gè)Hive維表的創(chuàng)建模板如下:
-- 使用Hive的sql方言
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
product_id STRING,
product_name STRING,
unit_price DECIMAL(10, 4),
pv_count BIGINT,
like_count BIGINT,
comment_count BIGINT,
update_time TIMESTAMP(3),
update_user STRING,
...
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
-- 方式1:按照分區(qū)名排序來(lái)識(shí)別最新分區(qū)(推薦使用該種方式)
'streaming-source.enable' = 'true', -- 開(kāi)啟Streaming source
'streaming-source.partition.include' = 'latest',-- 選擇最新分區(qū)
'streaming-source.monitor-interval' = '12 h',-- 每12小時(shí)加載一次最新分區(qū)數(shù)據(jù)
'streaming-source.partition-order' = 'partition-name', -- 按照分區(qū)名排序
-- 方式2:分區(qū)文件的創(chuàng)建時(shí)間排序來(lái)識(shí)別最新分區(qū)
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.partition-order' = 'create-time',-- 分區(qū)文件的創(chuàng)建時(shí)間排序
'streaming-source.monitor-interval' = '12 h'
-- 方式3:按照分區(qū)時(shí)間排序來(lái)識(shí)別最新分區(qū)
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-time', -- 按照分區(qū)時(shí)間排序
'partition.time-extractor.kind' = 'default',
'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'
);
有了上面的Hive維表,我們就可以使用該維表與Kafka的實(shí)時(shí)流數(shù)據(jù)進(jìn)行JOIN叠艳,得到相應(yīng)的寬表數(shù)據(jù)奶陈。
-- 使用default sql方言
SET table.sql-dialect=default;
-- kafka實(shí)時(shí)流數(shù)據(jù)表
CREATE TABLE orders_table (
order_id STRING,
order_amount DOUBLE,
product_id STRING,
log_ts TIMESTAMP(3),
proctime as PROCTIME()
) WITH (...);
-- 將流表與hive最新分區(qū)數(shù)據(jù)關(guān)聯(lián)
SELECT *
FROM orders_table AS orders
JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim
ON orders.product_id = dim.product_id;
除了在定義Hive維表時(shí)指定相關(guān)的參數(shù),我們還可以通過(guò)SQL Hint的方式動(dòng)態(tài)指定相關(guān)的參數(shù)附较,具體方式如下:
SELECT *
FROM orders_table AS orders
JOIN dimension_table
/*+ OPTIONS('streaming-source.enable'='true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '1 h',
'streaming-source.partition-order' = 'partition-name') */
FOR SYSTEM_TIME AS OF orders.proctime AS dim -- 時(shí)態(tài)表(維表)
ON orders.product_id = dim.product_id;
Temporal Join最新表
對(duì)于Hive的非分區(qū)表吃粒,當(dāng)使用temporal join時(shí),整個(gè)Hive表會(huì)被緩存到Slot內(nèi)存中翅睛,然后根據(jù)流中的數(shù)據(jù)對(duì)應(yīng)的key與其進(jìn)行匹配声搁。使用最新的Hive表進(jìn)行temporal join不需要進(jìn)行額外的配置黑竞,我們只需要配置一個(gè)Hive表緩存的TTL時(shí)間捕发,該時(shí)間的作用是:當(dāng)緩存過(guò)期時(shí),就會(huì)重新掃描Hive表并加載最新的數(shù)據(jù)很魂。
-
lookup.join.cache.ttl
- 默認(rèn)值:60min
- 解釋?zhuān)罕硎揪彺鏁r(shí)間扎酷。由于 Hive 維表會(huì)把維表所有數(shù)據(jù)緩存在 TM 的內(nèi)存中,當(dāng)維表數(shù)據(jù)量很大時(shí)遏匆,很容易造成 OOM法挨。當(dāng)然TTL的時(shí)間也不能太短,因?yàn)闀?huì)頻繁地加載數(shù)據(jù)幅聘,從而影響性能凡纳。
尖叫提示:
當(dāng)使用此種方式時(shí),Hive表必須是有界的lookup表帝蒿,即非Streaming Source的時(shí)態(tài)表荐糜,換句話(huà)說(shuō),該表的屬性streaming-source.enable = false葛超。
如果要使用Streaming Source的時(shí)態(tài)表暴氏,記得配置streaming-source.monitor-interval的值,即數(shù)據(jù)更新的時(shí)間間隔绣张。
-- Hive維表數(shù)據(jù)使用批處理的方式按天裝載
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
product_id STRING,
product_name STRING,
unit_price DECIMAL(10, 4),
pv_count BIGINT,
like_count BIGINT,
comment_count BIGINT,
update_time TIMESTAMP(3),
update_user STRING,
...
) TBLPROPERTIES (
'streaming-source.enable' = 'false', -- 關(guān)閉streaming source
'streaming-source.partition.include' = 'all', -- 讀取所有數(shù)據(jù)
'lookup.join.cache.ttl' = '12 h'
);
-- kafka事實(shí)表
SET table.sql-dialect=default;
CREATE TABLE orders_table (
order_id STRING,
order_amount DOUBLE,
product_id STRING,
log_ts TIMESTAMP(3),
proctime as PROCTIME()
) WITH (...);
-- Hive維表join答渔,F(xiàn)link會(huì)加載該維表的所有數(shù)據(jù)到內(nèi)存中
SELECT *
FROM orders_table AS orders
JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim
ON orders.product_id = dim.product_id;
尖叫提示:
1.每一個(gè)子任務(wù)都需要緩存一份維表的全量數(shù)據(jù),一定要確保TM的task Slot 大小能夠容納維表的數(shù)據(jù)量侥涵;
2.推薦將streaming-source.monitor-interval和lookup.join.cache.ttl的值設(shè)為一個(gè)較大的數(shù)沼撕,因?yàn)轭l繁的更新和加載數(shù)據(jù)會(huì)影響性能宋雏。
3.當(dāng)緩存的維表數(shù)據(jù)需要重新刷新時(shí),目前的做法是將整個(gè)表進(jìn)行加載务豺,因此不能夠?qū)⑿聰?shù)據(jù)與舊數(shù)據(jù)區(qū)分開(kāi)來(lái)好芭。
Hive維表JOIN示例
假設(shè)維表的數(shù)據(jù)是通過(guò)批處理的方式(比如每天)裝載至Hive中,而Kafka中的事實(shí)流數(shù)據(jù)需要與該維表進(jìn)行JOIN冲呢,從而構(gòu)建一個(gè)寬表數(shù)據(jù)舍败,這個(gè)時(shí)候就可以使用Hive的維表JOIN。
- 創(chuàng)建一張kafka數(shù)據(jù)源表,實(shí)時(shí)流
SET table.sql-dialect=default;
CREATE TABLE fact_user_behavior (
`user_id` BIGINT, -- 用戶(hù)id
`item_id` BIGINT, -- 商品id
`action` STRING, -- 用戶(hù)行為
`province` INT, -- 用戶(hù)所在的省份
`ts` BIGINT, -- 用戶(hù)行為發(fā)生的時(shí)間戳
`proctime` AS PROCTIME(), -- 通過(guò)計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
`eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時(shí)間
WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定義watermark
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behaviors', -- kafka主題
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消費(fèi)者組
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'format' = 'json', -- 數(shù)據(jù)源格式為json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);
- 創(chuàng)建一張Hive維表
SET table.sql-dialect=hive;
CREATE TABLE dim_item (
item_id BIGINT,
item_name STRING,
unit_price DECIMAL(10, 4)
) PARTITIONED BY (dt STRING) TBLPROPERTIES (
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '12 h',
'streaming-source.partition-order' = 'partition-name'
);
- 關(guān)聯(lián)Hive維表的最新數(shù)據(jù)
SELECT
fact.item_id,
dim.item_name,
count(*) AS buy_cnt
FROM fact_user_behavior AS fact
LEFT JOIN dim_item FOR SYSTEM_TIME AS OF fact.proctime AS dim
ON fact.item_id = dim.item_id
WHERE fact.action = 'buy'
GROUP BY fact.item_id,dim.item_name;
使用SQL Hint方式敬拓,關(guān)聯(lián)非分區(qū)的Hive維表:
set table.dynamic-table-options.enabled= true;
SELECT
fact.item_id,
dim.item_name,
count(*) AS buy_cnt
FROM fact_user_behavior AS fact
LEFT JOIN dim_item1
/*+ OPTIONS('streaming-source.enable'='false',
'streaming-source.partition.include' = 'all',
'lookup.join.cache.ttl' = '12 h') */
FOR SYSTEM_TIME AS OF fact.proctime AS dim
ON fact.item_id = dim.item_id
WHERE fact.action = 'buy'
GROUP BY fact.item_id,dim.item_name;
總結(jié)
本文以最新版本的Flink1.12為例邻薯,介紹了Flink讀寫(xiě)Hive的不同方式,并對(duì)每種方式給出了相應(yīng)的使用示例乘凸。在實(shí)際應(yīng)用中厕诡,通常有將實(shí)時(shí)數(shù)據(jù)流與 Hive 維表 join 來(lái)構(gòu)造寬表的需求,F(xiàn)link提供了Hive維表JOIN营勤,可以簡(jiǎn)化用戶(hù)使用的復(fù)雜度灵嫌。本文在最后詳細(xì)說(shuō)明了Flink進(jìn)行Hive維表JOIN的基本步驟以及使用示例,希望對(duì)你有所幫助葛作。
公眾號(hào)『大數(shù)據(jù)技術(shù)與數(shù)倉(cāng)』寿羞,回復(fù)『資料』領(lǐng)取大數(shù)據(jù)資料包