Flink on Hive構(gòu)建流批一體數(shù)倉(cāng)

Flink使用HiveCatalog可以通過(guò)或者的方式來(lái)處理Hive中的表厕倍。這就意味著Flink既可以作為Hive的一個(gè)批處理引擎赢织,也可以通過(guò)流處理的方式來(lái)讀寫(xiě)Hive中的表磷蜀,從而為實(shí)時(shí)數(shù)倉(cāng)的應(yīng)用和流批一體的落地實(shí)踐奠定了堅(jiān)實(shí)的基礎(chǔ)弥激。本文將以Flink1.12為例,介紹Flink集成Hive的另外一個(gè)非常重要的方面——Hive維表JOIN(Temporal Table Join)與Flink讀寫(xiě)Hive表的方式亲铡。以下是全文,希望本文對(duì)你有所幫助。

公眾號(hào)『大數(shù)據(jù)技術(shù)與數(shù)倉(cāng)』奖蔓,回復(fù)『資料』領(lǐng)取大數(shù)據(jù)資料包

Flink寫(xiě)入Hive表

Flink支持以批處理(Batch)和流處理(Streaming)的方式寫(xiě)入Hive表赞草。當(dāng)以批處理的方式寫(xiě)入Hive表時(shí),只有當(dāng)寫(xiě)入作業(yè)結(jié)束時(shí)吆鹤,才可以看到寫(xiě)入的數(shù)據(jù)厨疙。批處理的方式寫(xiě)入支持append模式和overwrite模式

批處理模式寫(xiě)入

  • 向非分區(qū)表寫(xiě)入數(shù)據(jù)
Flink SQL> use catalog myhive; -- 使用catalog
Flink SQL> INSERT INTO users SELECT 2,'tom';
Flink SQL> set execution.type=batch; -- 使用批處理模式
Flink SQL> INSERT OVERWRITE users SELECT 2,'tom';
  • 向分區(qū)表寫(xiě)入數(shù)據(jù)
-- 向靜態(tài)分區(qū)表寫(xiě)入數(shù)據(jù)
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
-- 向動(dòng)態(tài)分區(qū)表寫(xiě)入數(shù)據(jù)
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';

流處理模式寫(xiě)入

流式寫(xiě)入Hive表疑务,不支持**Insert overwrite **方式沾凄,否則報(bào)如下錯(cuò)誤:

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Streaming mode not support overwrite.

下面的示例是將kafka的數(shù)據(jù)流式寫(xiě)入Hive的分區(qū)表

-- 使用流處理模式
Flink SQL> set execution.type=streaming;
-- 使用Hive方言
Flink SQL> SET table.sql-dialect=hive; 
-- 創(chuàng)建一張Hive分區(qū)表
CREATE TABLE user_behavior_hive_tbl (
   `user_id` BIGINT, -- 用戶(hù)id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品類(lèi)id
    `action` STRING, -- 用戶(hù)行為
    `province` INT, -- 用戶(hù)所在的省份
    `ts` BIGINT -- 用戶(hù)行為發(fā)生的時(shí)間戳
) PARTITIONED BY (dt STRING,hr STRING,mi STRING) STORED AS parquet  TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='0S',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- 使用默認(rèn)SQL方言
Flink SQL> SET table.sql-dialect=default; 
-- 創(chuàng)建一張kafka數(shù)據(jù)源表
CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用戶(hù)id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品類(lèi)id
    `action` STRING, -- 用戶(hù)行為
    `province` INT, -- 用戶(hù)所在的省份
    `ts` BIGINT, -- 用戶(hù)行為發(fā)生的時(shí)間戳
    `proctime` AS PROCTIME(), -- 通過(guò)計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時(shí)間
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定義watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behaviors', -- kafka主題
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消費(fèi)者組
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 數(shù)據(jù)源格式為json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

關(guān)于Hive表的一些屬性解釋?zhuān)?/p>

  • partition.time-extractor.timestamp-pattern

    • 默認(rèn)值:(none)
    • 解釋?zhuān)悍謪^(qū)時(shí)間抽取器,與 DDL 中的分區(qū)字段保持一致,如果是按天分區(qū)知允,則可以是**dt**撒蟀,如果是按年(year)月(month)日(day)時(shí)(hour)進(jìn)行分區(qū),則該屬性值為:`year-month-day hour:00:00`温鸽,如果是按天時(shí)進(jìn)行分區(qū)保屯,則該屬性值為:`dt $hour:00:00`;
  • sink.partition-commit.trigger

    • 默認(rèn)值:process-time
    • 解釋?zhuān)悍謪^(qū)觸發(fā)器類(lèi)型,可選 process-time 或partition-time涤垫。
      • process-time:不需要時(shí)間提取器和水位線(xiàn)配椭,當(dāng)當(dāng)前時(shí)間大于分區(qū)創(chuàng)建時(shí)間 + sink.partition-commit.delay 中定義的時(shí)間,提交分區(qū)雹姊;
      • partition-time:需要 Source 表中定義 watermark股缸,當(dāng) watermark > 提取到的分區(qū)時(shí)間 +sink.partition-commit.delay 中定義的時(shí)間,提交分區(qū)吱雏;
  • sink.partition-commit.delay

    • 默認(rèn)值:0S
    • 解釋?zhuān)悍謪^(qū)提交的延時(shí)時(shí)間敦姻,如果是按天分區(qū),則該屬性的值為:1d歧杏,如果是按小時(shí)分區(qū)镰惦,則該屬性值為1h;
  • sink.partition-commit.policy.kind

    • 默認(rèn)值:(none)

    • 解釋?zhuān)禾峤环謪^(qū)的策略,用于通知下游的應(yīng)用該分區(qū)已經(jīng)完成了寫(xiě)入犬绒,也就是說(shuō)該分區(qū)的數(shù)據(jù)可以被訪問(wèn)讀取旺入。可選的值如下:

      • metastore:添加分區(qū)的元數(shù)據(jù)信息凯力,僅Hive表支持該值配置
      • success-file:在表的存儲(chǔ)路徑下添加一個(gè)_SUCCESS文件

      可以同時(shí)配置上面的兩個(gè)值茵瘾,比如metastore,success-file

執(zhí)行流式寫(xiě)入Hive表

-- streaming sql,將數(shù)據(jù)寫(xiě)入Hive表
INSERT INTO user_behavior_hive_tbl 
SELECT 
    user_id,
    item_id,
    cat_id,
    action,
    province,
    ts,
    FROM_UNIXTIME(ts, 'yyyy-MM-dd'),
    FROM_UNIXTIME(ts, 'HH'),
    FROM_UNIXTIME(ts, 'mm')
FROM user_behavior;

-- batch sql,查詢(xún)Hive表的分區(qū)數(shù)據(jù)
SELECT * FROM user_behavior_hive_tbl WHERE dt='2021-01-04' AND  hr='16' AND mi = '46';

同時(shí)查看Hive表的分區(qū)數(shù)據(jù):


在這里插入圖片描述

尖叫提示:

1.Flink讀取Hive表默認(rèn)使用的是batch模式,如果要使用流式讀取Hive表咐鹤,需要而外指定一些參數(shù)拗秘,見(jiàn)下文。

2.只有在完成 Checkpoint 之后祈惶,文件才會(huì)從 In-progress 狀態(tài)變成 Finish 狀態(tài)雕旨,同時(shí)生成_SUCCESS文件扮匠,所以,F(xiàn)link流式寫(xiě)入Hive表需要開(kāi)啟并配置 Checkpoint凡涩。對(duì)于Flink SQL Client而言棒搜,需要在flink-conf.yaml中開(kāi)啟CheckPoint,配置內(nèi)容為:

state.backend: filesystem
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints

Flink讀取Hive表

Flink支持以批處理(Batch)和流處理(Streaming)的方式讀取Hive中的表活箕。批處理的方式與Hive的本身查詢(xún)類(lèi)似力麸,即只在提交查詢(xún)的時(shí)刻查詢(xún)一次Hive表。流處理的方式將會(huì)持續(xù)地監(jiān)控Hive表讹蘑,并且會(huì)增量地提取新的數(shù)據(jù)末盔。默認(rèn)情況下筑舅,F(xiàn)link是以批處理的方式讀取Hive表座慰。

關(guān)于流式讀取Hive表,F(xiàn)link既支持分區(qū)表又支持非分區(qū)表翠拣。對(duì)于分區(qū)表而言版仔,F(xiàn)link將會(huì)監(jiān)控新產(chǎn)生的分區(qū)數(shù)據(jù),并以增量的方式讀取這些數(shù)據(jù)误墓。對(duì)于非分區(qū)表蛮粮,F(xiàn)link會(huì)監(jiān)控Hive表存儲(chǔ)路徑文件夾里面的新文件,并以增量的方式讀取新的數(shù)據(jù)谜慌。

Flink讀取Hive表可以配置一下參數(shù):

  • streaming-source.enable

    • 默認(rèn)值:false
    • 解釋?zhuān)菏欠耖_(kāi)啟流式讀取 Hive 表然想,默認(rèn)不開(kāi)啟。
  • streaming-source.partition.include

    • 默認(rèn)值:all
    • 解釋?zhuān)号渲米x取Hive的分區(qū)欣范,包括兩種方式:all和latest变泄。all意味著讀取所有分區(qū)的數(shù)據(jù),latest表示只讀取最新的分區(qū)數(shù)據(jù)恼琼。值得注意的是妨蛹,latest方式只能用于開(kāi)啟了流式讀取Hive表,并用于維表JOIN的場(chǎng)景晴竞。
  • streaming-source.monitor-interval

    • 默認(rèn)值:None
    • 解釋?zhuān)撼掷m(xù)監(jiān)控Hive表分區(qū)或者文件的時(shí)間間隔蛙卤。值得注意的是,當(dāng)以流的方式讀取Hive表時(shí)噩死,該參數(shù)的默認(rèn)值是1m颤难,即1分鐘。當(dāng)temporal join時(shí)已维,默認(rèn)的值是60m乐严,即1小時(shí)。另外衣摩,該參數(shù)配置不宜過(guò)短 昂验,最短是1 個(gè)小時(shí)捂敌,因?yàn)槟壳暗膶?shí)現(xiàn)是每個(gè) task 都會(huì)查詢(xún) metastore,高頻的查可能會(huì)對(duì)metastore 產(chǎn)生過(guò)大的壓力既琴。
  • streaming-source.partition-order

    • 默認(rèn)值:partition-name
    • 解釋?zhuān)簊treaming source的分區(qū)順序占婉。默認(rèn)的是partition-name,表示使用默認(rèn)分區(qū)名稱(chēng)順序加載最新分區(qū)甫恩,也是推薦使用的方式逆济。除此之外還有兩種方式,分別為:create-time和partition-time磺箕。其中create-time表示使用分區(qū)文件創(chuàng)建時(shí)間順序奖慌。partition-time表示使用分區(qū)時(shí)間順序。指的注意的是松靡,對(duì)于非分區(qū)表简僧,該參數(shù)的默認(rèn)值為:create-time
  • streaming-source.consume-start-offset

    • 默認(rèn)值:None
    • 解釋?zhuān)毫魇阶x取Hive表的起始偏移量雕欺。
  • partition.time-extractor.kind
    • 默認(rèn)值:default
    • 分區(qū)時(shí)間提取器類(lèi)型岛马。用于從分區(qū)中提取時(shí)間,支持default和自定義屠列。如果使用default啦逆,則需要通過(guò)參數(shù)partition.time-extractor.timestamp-pattern配置時(shí)間戳提取的正則表達(dá)式。

在 SQL Client 中需要顯示地開(kāi)啟 SQL Hint 功能

Flink SQL> set table.dynamic-table-options.enabled= true;  

使用SQLHint流式查詢(xún)Hive表

SELECT * FROM user_behavior_hive_tbl /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2021-01-03') */;

Hive維表JOIN

Flink 1.12 支持了 Hive 最新的分區(qū)作為時(shí)態(tài)表的功能笛洛,可以通過(guò) SQL 的方式直接關(guān)聯(lián) Hive 分區(qū)表的最新分區(qū)夏志,并且會(huì)自動(dòng)監(jiān)聽(tīng)最新的 Hive 分區(qū),當(dāng)監(jiān)控到新的分區(qū)后苛让,會(huì)自動(dòng)地做維表數(shù)據(jù)的全量替換沟蔑。

Flink支持的是processing-time的temporal join,也就是說(shuō)總是與最新版本的時(shí)態(tài)表進(jìn)行JOIN蝌诡。另外溉贿,F(xiàn)link既支持非分區(qū)表的temporal join,又支持分區(qū)表的temporal join浦旱。對(duì)于分區(qū)表而言宇色,F(xiàn)link會(huì)監(jiān)聽(tīng)Hive表的最新分區(qū)數(shù)據(jù)。值得注意的是颁湖,F(xiàn)link尚不支持 event-time temporal join宣蠕。

Temporal Join最新分區(qū)

對(duì)于一張隨著時(shí)間變化的Hive分區(qū)表,F(xiàn)link可以讀取該表的數(shù)據(jù)作為一個(gè)無(wú)界流甥捺。如果Hive分區(qū)表的每個(gè)分區(qū)都包含全量的數(shù)據(jù)抢蚀,那么每個(gè)分區(qū)將做為一個(gè)時(shí)態(tài)表的版本數(shù)據(jù),即將最新的分區(qū)數(shù)據(jù)作為一個(gè)全量維表數(shù)據(jù)镰禾。值得注意的是皿曲,該功能特點(diǎn)僅支持Flink的STREAMING模式唱逢。

使用 Hive 最新分區(qū)作為 Tempmoral table 之前,需要設(shè)置必要的兩個(gè)參數(shù):

'streaming-source.enable' = 'true',  
'streaming-source.partition.include' = 'latest'

除此之外還有一些其他的參數(shù)屋休,關(guān)于參數(shù)的解釋見(jiàn)上面的分析坞古。我們?cè)谑褂肏ive維表的時(shí)候,既可以在創(chuàng)建Hive表時(shí)指定具體的參數(shù)劫樟,也可以使用SQL Hint的方式動(dòng)態(tài)指定參數(shù)痪枫。一個(gè)Hive維表的創(chuàng)建模板如下:

-- 使用Hive的sql方言
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
  -- 方式1:按照分區(qū)名排序來(lái)識(shí)別最新分區(qū)(推薦使用該種方式)
  'streaming-source.enable' = 'true', -- 開(kāi)啟Streaming source
  'streaming-source.partition.include' = 'latest',-- 選擇最新分區(qū)
  'streaming-source.monitor-interval' = '12 h',-- 每12小時(shí)加載一次最新分區(qū)數(shù)據(jù)
  'streaming-source.partition-order' = 'partition-name',  -- 按照分區(qū)名排序

  -- 方式2:分區(qū)文件的創(chuàng)建時(shí)間排序來(lái)識(shí)別最新分區(qū)
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.partition-order' = 'create-time',-- 分區(qū)文件的創(chuàng)建時(shí)間排序
  'streaming-source.monitor-interval' = '12 h'

  -- 方式3:按照分區(qū)時(shí)間排序來(lái)識(shí)別最新分區(qū)
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-time', -- 按照分區(qū)時(shí)間排序
  'partition.time-extractor.kind' = 'default',
  'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00' 
);

有了上面的Hive維表,我們就可以使用該維表與Kafka的實(shí)時(shí)流數(shù)據(jù)進(jìn)行JOIN叠艳,得到相應(yīng)的寬表數(shù)據(jù)奶陈。

-- 使用default sql方言
SET table.sql-dialect=default;
-- kafka實(shí)時(shí)流數(shù)據(jù)表
CREATE TABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME()
) WITH (...);

-- 將流表與hive最新分區(qū)數(shù)據(jù)關(guān)聯(lián) 
SELECT *
FROM orders_table AS orders
JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim 
ON orders.product_id = dim.product_id;

除了在定義Hive維表時(shí)指定相關(guān)的參數(shù),我們還可以通過(guò)SQL Hint的方式動(dòng)態(tài)指定相關(guān)的參數(shù)附较,具體方式如下:

SELECT *
FROM orders_table AS orders
JOIN dimension_table
/*+ OPTIONS('streaming-source.enable'='true',             
    'streaming-source.partition.include' = 'latest',
    'streaming-source.monitor-interval' = '1 h',
    'streaming-source.partition-order' = 'partition-name') */
FOR SYSTEM_TIME AS OF orders.proctime AS dim -- 時(shí)態(tài)表(維表)
ON orders.product_id = dim.product_id;

Temporal Join最新表

對(duì)于Hive的非分區(qū)表吃粒,當(dāng)使用temporal join時(shí),整個(gè)Hive表會(huì)被緩存到Slot內(nèi)存中翅睛,然后根據(jù)流中的數(shù)據(jù)對(duì)應(yīng)的key與其進(jìn)行匹配声搁。使用最新的Hive表進(jìn)行temporal join不需要進(jìn)行額外的配置黑竞,我們只需要配置一個(gè)Hive表緩存的TTL時(shí)間捕发,該時(shí)間的作用是:當(dāng)緩存過(guò)期時(shí),就會(huì)重新掃描Hive表并加載最新的數(shù)據(jù)很魂。

  • lookup.join.cache.ttl

    • 默認(rèn)值:60min
    • 解釋?zhuān)罕硎揪彺鏁r(shí)間扎酷。由于 Hive 維表會(huì)把維表所有數(shù)據(jù)緩存在 TM 的內(nèi)存中,當(dāng)維表數(shù)據(jù)量很大時(shí)遏匆,很容易造成 OOM法挨。當(dāng)然TTL的時(shí)間也不能太短,因?yàn)闀?huì)頻繁地加載數(shù)據(jù)幅聘,從而影響性能凡纳。

    尖叫提示:

    當(dāng)使用此種方式時(shí),Hive表必須是有界的lookup表帝蒿,即非Streaming Source的時(shí)態(tài)表荐糜,換句話(huà)說(shuō),該表的屬性streaming-source.enable = false葛超。

    如果要使用Streaming Source的時(shí)態(tài)表暴氏,記得配置streaming-source.monitor-interval的值,即數(shù)據(jù)更新的時(shí)間間隔绣张。

-- Hive維表數(shù)據(jù)使用批處理的方式按天裝載
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...
) TBLPROPERTIES (
  'streaming-source.enable' = 'false', -- 關(guān)閉streaming source
  'streaming-source.partition.include' = 'all',  -- 讀取所有數(shù)據(jù)
  'lookup.join.cache.ttl' = '12 h'
);
-- kafka事實(shí)表
SET table.sql-dialect=default;
CREATE TABLE orders_table (
  order_id STRING,
  order_amount DOUBLE,
  product_id STRING,
  log_ts TIMESTAMP(3),
  proctime as PROCTIME()
) WITH (...);

-- Hive維表join答渔,F(xiàn)link會(huì)加載該維表的所有數(shù)據(jù)到內(nèi)存中
SELECT *
FROM orders_table AS orders
JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim
ON orders.product_id = dim.product_id;

尖叫提示:

1.每一個(gè)子任務(wù)都需要緩存一份維表的全量數(shù)據(jù),一定要確保TM的task Slot 大小能夠容納維表的數(shù)據(jù)量侥涵;

2.推薦將streaming-source.monitor-interval和lookup.join.cache.ttl的值設(shè)為一個(gè)較大的數(shù)沼撕,因?yàn)轭l繁的更新和加載數(shù)據(jù)會(huì)影響性能宋雏。

3.當(dāng)緩存的維表數(shù)據(jù)需要重新刷新時(shí),目前的做法是將整個(gè)表進(jìn)行加載务豺,因此不能夠?qū)⑿聰?shù)據(jù)與舊數(shù)據(jù)區(qū)分開(kāi)來(lái)好芭。

Hive維表JOIN示例

假設(shè)維表的數(shù)據(jù)是通過(guò)批處理的方式(比如每天)裝載至Hive中,而Kafka中的事實(shí)流數(shù)據(jù)需要與該維表進(jìn)行JOIN冲呢,從而構(gòu)建一個(gè)寬表數(shù)據(jù)舍败,這個(gè)時(shí)候就可以使用Hive的維表JOIN。

  • 創(chuàng)建一張kafka數(shù)據(jù)源表,實(shí)時(shí)流
SET table.sql-dialect=default;
CREATE TABLE fact_user_behavior ( 
    `user_id` BIGINT, -- 用戶(hù)id
    `item_id` BIGINT, -- 商品id
    `action` STRING, -- 用戶(hù)行為
    `province` INT, -- 用戶(hù)所在的省份
    `ts` BIGINT, -- 用戶(hù)行為發(fā)生的時(shí)間戳
    `proctime` AS PROCTIME(), -- 通過(guò)計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時(shí)間
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定義watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behaviors', -- kafka主題
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消費(fèi)者組
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 數(shù)據(jù)源格式為json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);
  • 創(chuàng)建一張Hive維表
SET table.sql-dialect=hive;
CREATE TABLE dim_item (
  item_id BIGINT,
  item_name STRING,
  unit_price DECIMAL(10, 4)
) PARTITIONED BY (dt STRING) TBLPROPERTIES (
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.monitor-interval' = '12 h',
  'streaming-source.partition-order' = 'partition-name'
);
  • 關(guān)聯(lián)Hive維表的最新數(shù)據(jù)
SELECT 
    fact.item_id,
    dim.item_name,
    count(*) AS buy_cnt
FROM fact_user_behavior AS fact
LEFT JOIN dim_item FOR SYSTEM_TIME AS OF fact.proctime AS dim
ON fact.item_id = dim.item_id
WHERE fact.action = 'buy'
GROUP BY fact.item_id,dim.item_name;

使用SQL Hint方式敬拓,關(guān)聯(lián)非分區(qū)的Hive維表:

set table.dynamic-table-options.enabled= true; 
SELECT 
    fact.item_id,
    dim.item_name,
    count(*) AS buy_cnt
FROM fact_user_behavior AS fact
LEFT JOIN dim_item1
/*+ OPTIONS('streaming-source.enable'='false',             
    'streaming-source.partition.include' = 'all',
    'lookup.join.cache.ttl' = '12 h') */
FOR SYSTEM_TIME AS OF fact.proctime AS dim
ON fact.item_id = dim.item_id
WHERE fact.action = 'buy'
GROUP BY fact.item_id,dim.item_name;

總結(jié)

本文以最新版本的Flink1.12為例邻薯,介紹了Flink讀寫(xiě)Hive的不同方式,并對(duì)每種方式給出了相應(yīng)的使用示例乘凸。在實(shí)際應(yīng)用中厕诡,通常有將實(shí)時(shí)數(shù)據(jù)流與 Hive 維表 join 來(lái)構(gòu)造寬表的需求,F(xiàn)link提供了Hive維表JOIN营勤,可以簡(jiǎn)化用戶(hù)使用的復(fù)雜度灵嫌。本文在最后詳細(xì)說(shuō)明了Flink進(jìn)行Hive維表JOIN的基本步驟以及使用示例,希望對(duì)你有所幫助葛作。

公眾號(hào)『大數(shù)據(jù)技術(shù)與數(shù)倉(cāng)』寿羞,回復(fù)『資料』領(lǐng)取大數(shù)據(jù)資料包

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市赂蠢,隨后出現(xiàn)的幾起案子绪穆,更是在濱河造成了極大的恐慌,老刑警劉巖虱岂,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件玖院,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡第岖,警方通過(guò)查閱死者的電腦和手機(jī)难菌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蔑滓,“玉大人郊酒,你說(shuō)我怎么就攤上這事√瘫” “怎么了猎塞?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)杠纵。 經(jīng)常有香客問(wèn)我荠耽,道長(zhǎng),這世上最難降的妖魔是什么比藻? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任铝量,我火速辦了婚禮倘屹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘慢叨。我一直安慰自己纽匙,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布拍谐。 她就那樣靜靜地躺著烛缔,像睡著了一般。 火紅的嫁衣襯著肌膚如雪轩拨。 梳的紋絲不亂的頭發(fā)上践瓷,一...
    開(kāi)封第一講書(shū)人閱讀 51,688評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音亡蓉,去河邊找鬼晕翠。 笑死,一個(gè)胖子當(dāng)著我的面吹牛砍濒,可吹牛的內(nèi)容都是我干的淋肾。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼爸邢,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼樊卓!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起甲棍,我...
    開(kāi)封第一講書(shū)人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤简识,失蹤者是張志新(化名)和其女友劉穎赶掖,沒(méi)想到半個(gè)月后感猛,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡奢赂,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年陪白,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片膳灶。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡咱士,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出轧钓,到底是詐尸還是另有隱情序厉,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布毕箍,位于F島的核電站弛房,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏而柑。R本人自食惡果不足惜文捶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一荷逞、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧粹排,春花似錦种远、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至射富,卻和暖如春常拓,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背辉浦。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工弄抬, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人宪郊。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓掂恕,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親弛槐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子懊亡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容