Flink 使用之 Hudi 編譯部署、配置和使用

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

前言

因業(yè)務要求對采集來的數(shù)據(jù)進行統(tǒng)一存儲霹崎,因此引入了Flink CDC - Hudi方案。Flink CDC在本人之前的博客已有介紹(參見Flink 使用之 MySQL CDC
)冶忱。本篇重點介紹Flink SQL結(jié)合Hudi的使用方法尾菇。Hudi表使用Flink SQL操作,為了便于業(yè)務人員使用囚枪,我們?yōu)槠涮峁㈱eppelin派诬,能夠以可視化的方式編寫并執(zhí)行Flink作業(yè),同時還可以圖形化展示數(shù)據(jù)分析結(jié)果链沼。

軟件版本

我們使用的軟件和版本如下所示:

  • Flink:1.13.2
  • Hudi:0.11.1
  • Zeppelin: 0.10.0

首先我們配置Flink Hudi環(huán)境茅诱。

下載編譯Hudi

找一臺已經(jīng)安裝了maven的服務器捶惜。執(zhí)行:

git clone https://github.com/apache/hudi.git

源代碼clone成功之后敦腔,切換分支到origin/release-0.11.1潜秋。接著執(zhí)行編譯命令:

mvn clean package -Dflink1.13 -Dscala2.11 -DskipTests -T 4

等待編譯完成。

編譯完成之后疾捍,F(xiàn)link hudi bundle的編譯輸出在hudi/packaging/hudi-flink-bundle/target奈辰,F(xiàn)link SQL支持Hudi所需jar包就在這個目錄,將其復制走備用乱豆。

使用Flink SQL Client的方法執(zhí)行Hudi SQL

在這一步我們使用Flink on yarn的方式啟動Flink SQL Client奖恰,然后通過它操作Hudi表。

首先我們下載Flink 1.13.2并解壓咙鞍。

wget https://archive.apache.org/dist/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.11.tgz
tar -zxvf flink-1.13.2-bin-scala_2.11.tgz

配置Flink啟用checkpoint房官。編輯$FLINK_HOME/conf/flink-conf.yaml文件,添加或修改如下配置续滋。

# 啟用checkpoint,間隔時間為5000毫秒
execution.checkpointing.interval: 5000
# 使用rocksdb狀態(tài)后端
state.backend: rocksdb
# 配置狀態(tài)后端保存checkpoint的路徑
state.checkpoints.dir: hdfs://test24/flink-checkpoints
# 啟用增量checkpoint孵奶,配置rocksdb的時候建議開啟
state.backend.incremental: true

注意:務必啟用Flink checkpoint疲酌。否則Hudi流式作業(yè)會遇到數(shù)據(jù)無法插入等問題。

然后配置HADOOP的環(huán)境變量:

export HADOOP_CLASSPATH=`hadoop classpath`

Flink SQL client支持運行于standalone集群和Yarn集群上。

運行于standalone集群

編輯conf/flink-conf.yaml朗恳,修改如下內(nèi)容:

taskmanager.numberOfTaskSlots: 4

然后修改conf/worker湿颅,如下所示:

localhost
localhost
localhost
localhost

這樣子配置,啟動standlone集群時粥诫,會在本地啟動4個TaskManager油航。

下面的操作需要使用具有HDFS讀寫權(quán)限的用戶執(zhí)行。

啟動standalone集群:

./start-cluster.sh

接下來啟動Flink SQL Client:

./sql-client.sh embedded -j /path/to/hudi-flink-bundle_xxxxxx.jar

注意怀浆,-j后面是Hudi Flink bundle jar包谊囚。如果將Hudi包復制到了Flink安裝路徑的lib目錄,啟動sql-client的時候無需添加-j 參數(shù)执赡。

運行于Yarn集群

下面的操作需要使用具有HDFS讀寫權(quán)限和Yarn隊列提交權(quán)限的用戶執(zhí)行镰踏。

啟動Flink的yarn-session:

./yarn-session.sh -d -s 4 -jm 1024 -tm 1024

記得記下啟動的yarn-session對應的application id∩澈希可以通過Yarn ResourceManager UI來查看application id奠伪。

接下來啟動Flink SQL Client:

./sql-client.sh embedded -s yarn-session -j /path/to/hudi-flink-bundle_xxxxxx.jar

注意,-j后面是Hudi Flink bundle jar包首懈。如果將Hudi包復制到了Flink安裝路徑的lib目錄绊率,啟動sql-client的時候無需添加-j 參數(shù)。

操作Hudi表

成功進入Flink SQL Client之后究履,我們執(zhí)行插入測試數(shù)據(jù)的SQL:

CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///zy/hudi/',
  'table.type' = 'MERGE_ON_READ'
);

-- insert data using values
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

稍等一段時間后執(zhí)行如下SQL即舌,檢查插入的數(shù)據(jù):

select * from t1;

如果能查詢出剛才插入的數(shù)據(jù),說明Flink Hudi運行環(huán)境配置無誤挎袜。

訪問元數(shù)據(jù)字段

除了用戶指定的字段外顽聂,Hudi表為了管理需要和實現(xiàn)自身的特性,引入了一些元數(shù)據(jù)字段盯仪。這些字段默認存在于所有Hudi表中紊搪。

元數(shù)據(jù)字段名稱和作用如下所示:

  • _hoodie_commit_time:對應數(shù)據(jù)的提交時間(存入Hudi表的時間,生成commit instant的時間)全景。
  • _hoodie_commit_seqno:每條數(shù)據(jù)都對應唯一的提交序列編號耀石。
  • _hoodie_record_key:Hudi記錄的主鍵,對應SQL中的primary key爸黄。
  • _hoodie_file_name:存儲該條數(shù)據(jù)的文件名滞伟。

在Flink SQL create語句中如果不指定元數(shù)據(jù)字段,則無法查詢這些字段的內(nèi)容炕贵。Spark沒有這個限制梆奈。如果需要在Flink中訪問這些元數(shù)據(jù)字段,需要在create table的時候指定称开。例如:

CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  _hoodie_commit_time varchar(20),
  _hoodie_commit_seqno varchar(20),
  _hoodie_record_key varchar(20),
  _hoodie_partition_path varchar(20),
  _hoodie_file_name varchar(100),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///zy/hudi/',
  'table.type' = 'MERGE_ON_READ'
);

可以通過如下語句查詢元數(shù)據(jù)字段:

select _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_file_name from t1;

Streaming增量讀

增量讀的含義是查詢一個給定的時刻(commit timestamp)之后更新或者新增的數(shù)據(jù)亩钟。Streaming讀查詢生成一個數(shù)據(jù)流乓梨,隨著數(shù)據(jù)源的插入或更新實時輸出結(jié)果。

首先變更一條數(shù)據(jù):

insert into t1 values
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

然后創(chuàng)建表t2清酥,使用相同的數(shù)據(jù)目錄:

CREATE TABLE t2(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///zy/hudi/',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',  -- this option enable the streaming read
  'read.streaming.start-commit' = '20220620164900', -- specifies the start commit instant time
  'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);

-- Then query the table in stream mode
select * from t2;

我們發(fā)現(xiàn)新增加的數(shù)據(jù)被查詢出來了扶镀。

read.streaming.start-commit的格式為yyyyMMDDhhmmss。

注意:此處可能會遇到錯誤:

java.lang.ClassNotFoundException: org.apache.hadoop.mapred.FileInputFormat
java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

解決辦法為復制集群的hadoop-mapreduce-client-core.jarhive-exec-3.1.0.3.0.1.0-187.jar到Flink lib中焰轻。

常用配置項:

  • read.streaming.enabled: 是否啟用流式讀取臭觉。默認為false
  • read.start-commit: 從哪個commit time開始讀取。格式為yyyyMMDDhhmmss辱志。
  • read.streaming.skip_compaction: 是否跳過數(shù)據(jù)壓縮的commit蝠筑。有兩個目的:防止從compaction instant上讀取重復的數(shù)據(jù),啟用changelog模式的時候讀取壓縮前的數(shù)據(jù)荸频。默認為false菱肖。
  • clean.retain_commits: 保留超過多少個commit會被清理。多用于changelog模式旭从。默認為10稳强。

參考鏈接:https://hudi.apache.org/docs/hoodie_deltastreamer#streaming-query

Flink Hudi作業(yè)savepoint和恢復

我們使用一個場景(Flink Kafka數(shù)據(jù)入Hudi表)來模擬作業(yè)savepoint恢復。

前提條件:

  1. Flink 配置了checkpoint和悦。
  2. Flink Yarn session已啟動退疫。

下面開始啟動Kafka入Hudi表作業(yè)。

  1. 啟動Kafka console producer鸽素,用來輸入實驗數(shù)據(jù)褒繁。
./kafka-console-producer.sh --broker-list broker1:9092,broker2:9092,broker3:9092 --topic demo_topic
  1. 啟動Flink SQL client,創(chuàng)建Kafka數(shù)據(jù)源表馍忽。
CREATE TABLE kafka_t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'broker1:9092,broker2:9092,broker3:9092',
'topic' = 'demo_topic',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
);
  1. 創(chuàng)建Hudi表棒坏。
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///path/to/hudi_kafka/',
  'table.type' = 'MERGE_ON_READ'
);
  1. 啟動Kafka寫入Hudi表任務。
insert into t1 select * from kafka_t1;

到此位置Flink從Kafka如Hudi作業(yè)能夠正常運行遭笋“用幔可以在Kafka console consumer插入CSV數(shù)據(jù)驗證:

id1,Danny,23,1970-01-01 00:00:01,par1
id2,Stephen,33,1970-01-01 00:00:02,par1
id3,Julian,53,1970-01-01 00:00:03,par2
id4,Fabian,31,1970-01-01 00:00:04,par2
id5,Sophia,18,1970-01-01 00:00:05,par3
id6,Emma,20,1970-01-01 00:00:06,par3
id7,Bob,44,1970-01-01 00:00:07,par4
id8,Han,56,1970-01-01 00:00:08,par4

接下來將作業(yè)保存savepoint并退出。

flink stop --savepointPath hdfs://ip:9000/path/to/savepoint/ jobID -yid application_id

其中:

  • hdfs://ip:9000/path/to/savepoint/為Flink savepoint保存目錄瓦呼。
  • JobIDinsert into語句創(chuàng)建的入Hudi數(shù)據(jù)湖任務喂窟。JobID可通過Flink Dashboard的Running Jobs頁面查到。
  • application_id為Flink yarn session的application ID央串。

執(zhí)行Flink stop的時候我們可以看到如下日志;

Suspending job "74c7fc1f08ad533071a137b813429de8" with a savepoint.
...
Savepoint completed. Path: hdfs://test24/savepoint/savepoint-74c7fc-babb7853052b

從中我們可以得知savepoint全路徑為hdfs://test24/savepoint/savepoint-74c7fc-babb7853052b磨澡,下面恢復作業(yè)的時候需要用到。

到現(xiàn)在Flink作業(yè)已經(jīng)停機质和。我們可以開始作業(yè)恢復過程稳摄。

首先啟動Flink SQL client。然后執(zhí)行SET execution.savepoint.path語句侦另,例如:

SET execution.savepoint.path=hdfs://test24/savepoint/savepoint-74c7fc-babb7853052b;

注意秩命,這種方式既可以從savepoint又可以從checkpoint恢復數(shù)據(jù)尉共。從checkpoint恢復數(shù)據(jù)需要路徑需要精確到chk-xxx目錄褒傅,例如:
/flink-checkpoints/015c2ed83fdbc66b3fe3193d09fed915/chk-1803弃锐。
其中015c2ed83fdbc66b3fe3193d09fed915為JobID,chk-1803為停機時候最后一次成功的checkpoint id殿托。

最后我們重新執(zhí)行create tableinsert into語句霹菊,確保和之前作業(yè)的相同。作業(yè)即可從savepoint/checkpoint恢復支竹。

Flink Hudi參數(shù)

詳細的配置請參考鏈接:https://hudi.apache.org/docs/flink_configuration/

Flink級別常用的配置如下:

  • taskmanager.numberOfTaskSlots: Task Manager的slot個數(shù)旋廷。如果使用yarn session,通過yarn session啟動命令控制礼搁。
  • parallelism.default: 默認的并行度饶碘。
  • execution.checkpointing.interval: checkpoint間隔時間,單位毫秒馒吴。
  • state.backend: 使用什么狀態(tài)后端扎运。Hudi建議使用rocksdb狀態(tài)后端。
  • state.backend.rocksdb.localdir: TM本地存放rocksdb文件的位置饮戳。
  • state.checkpoints.dir: checkpoint數(shù)據(jù)存儲目錄豪治,要求所有的Flink節(jié)點都能夠訪問到,建議使用HDFS等分布式文件系統(tǒng)扯罐。
  • state.backend.incremental: 是否啟用增量的checkpoint负拟。增量的checkpoint每次只保存和上一次的差異信息,減少數(shù)據(jù)存儲量和checkpoint耗時歹河。如果使用rocksdb為狀態(tài)后端掩浙,建議開啟增量checkpoint。

除了Flink常用的配置外秸歧,還有些表級別的參數(shù)可用于Hudi微調(diào)厨姚。表級別參數(shù)需要添加到SQL的with語句中。

  • write.precombine.field: 版本字段寥茫,基于此字段的大小來判斷消息是否進行更新遣蚀。默認為ts
  • write.task.max.size: 寫任務的最大內(nèi)存限制纱耻,默認為1024MB芭梯。如果超出會強制flush最大的bucket(批量數(shù)據(jù))。
  • write.rate.limit: 限制寫入速率弄喘。默認為0不加任何限制玖喘。
  • write.tasks: 寫入任務并行度。默認是4蘑志。
  • write.merge.max_memory: COP類型表需要合并base file和增量數(shù)據(jù)累奈,增量數(shù)據(jù)會被緩存然后溢寫到磁盤贬派。該配置項為此過程最大可使用的堆內(nèi)存大小。默認為100MB澎媒,內(nèi)存充裕時建議增大搞乏。
  • read.tasks: 讀取操作并發(fā)度。默認是4戒努。
  • compaction.tasks: MOR表online compaction的并行度请敦。默認是4。Online compaction會占用寫操作的資源储玫。建議使用offline compaction侍筛。
bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle_2.11-0.11.1.jar --path hdfs://xxx:9000/table --schedule
  • compaction.schedule.enabled: 是否啟用周期性online compaction。默認開啟撒穷。
  • compaction.async.enabled: MOR表是否啟用異步compaction匣椰。默認開啟。
  • compaction.trigger.strategy: 觸發(fā)壓縮的策略端礼∏菪Γ可配置num_commits(多少次提交后觸發(fā)),time_elapsed(多久之后觸發(fā)齐媒,單位秒)蒲每,num_and_time(提交數(shù)和時間都要滿足)或者num_or_time(提交數(shù)或時間滿足一個條件)。默認為num_commits喻括。
  • compaction.delta_commits: 上面配置中提到的提交數(shù)邀杏。默認為5。
  • compaction.delta_seconds: 上面配置中提到的compaction間隔時間唬血。單位為秒望蜡。默認值是3600拷恨。
  • compaction.max_memory: 壓縮使用的最大內(nèi)存數(shù)脖律。默認為100MB冕杠,內(nèi)存充裕時建議增大。
  • hoodie.cleaner.commits.retained: COW表最多保留最近多少個commit配喳。默認為24酪穿。避免數(shù)據(jù)的無限膨脹。

MOR 表的增量log和base file合并通過compaction相關(guān)配置項來配置晴裹。
COW 表的歷史版本清除(避免數(shù)據(jù)膨脹)可參考https://blog.csdn.net/bluishglc/article/details/115531015

Bulk Insert

Bulk Insert(批量插入)適合導入大量數(shù)據(jù)到Hudi中被济。比正常寫入數(shù)據(jù)的方式快很多。使用batch運行模式進行bulk insert效率更高息拜。Bulk insert沒有序列化和數(shù)據(jù)合并去重過程溉潭。

涉及到的配置項有:

  • write.operation: 默認是upsert净响,可以設(shè)置為bulk_insert用來啟用bulk insert模式少欺。
  • write.tasks: 寫入任務并行度。默認為4馋贤。
  • write.bulk_insert.shuffle_input: 寫入之前是否shuffle數(shù)據(jù)赞别。可以減少小文件數(shù)量配乓,但是增加了數(shù)據(jù)傾斜的風險仿滔。默認為true。
  • write.bulk_insert.sort_input: 寫入之前是否排序犹芹。寫入多個分區(qū)的時候可以減少小文件數(shù)量崎页。默認為true。
  • write.sort.memory: 排序使用的內(nèi)存大小腰埂。默認為128MB飒焦。

參考鏈接:https://hudi.apache.org/docs/hoodie_deltastreamer#bulk-insert

Index Bootstrap

Index Bootstrap在創(chuàng)建表的時候讀取表的索引到Flink的state。這樣執(zhí)行upsert操作的時候就可以直接從state讀取索引屿笼,從而判斷新數(shù)據(jù)是新增還是修改牺荠。加載索引的過程可能會非常耗時。

涉及到的配置項有:

  • index.bootstrap.enabled: 啟用index bootstrap功能時驴一,會將Hudi表的索引一次性加載到Flink狀態(tài)中休雌。默認為false。
  • index.partition.regex: 決定加載哪些分區(qū)的索引到state中肝断。默認為*杈曲,即加載所有分區(qū)。

使用建議:

  1. 可以配置flink-conf.yaml中的execution.checkpointing.tolerable-failed-checkpoints = n配置項胸懈,確定能容忍的checkpoint失敗次數(shù)担扑。這個取決于Flink checkpoint的周期。如果Index bootstrap耗時長箫荡,建議調(diào)大這個值魁亦。
  2. Flink第一次checkpoint成功,說明Hudi index bootstrap過程結(jié)束羔挡。結(jié)束之后用戶可以退出或保存savepoint洁奈。下次啟動的時候可以直接加載savepoint间唉,無需再重新index bootstrap。

使用flink stop --savepointPath hdfs://ip:9000/path/to/savepoint/ JobID -yid application-id命令保存savepoint并退出利术。
恢復作業(yè)在sql client中執(zhí)行SET execution.savepoint.path=hdfs://ip:9000/path/to/savepoint/xxx;呈野,然后重新建表執(zhí)行insert重新啟動任務(需要和之前任務的SQL語句相同)。

  1. 再次啟動作業(yè)的時候印叁,設(shè)置index.bootstrap.enabledfalse被冒。

參考鏈接:https://hudi.apache.org/docs/hoodie_deltastreamer#index-bootstrap

Changelog 模式

Hudi可以記錄數(shù)據(jù)的中間狀態(tài)(I / -U / U / D) ,類似于Flink的changelog stream轮蜕。Hudi MOR表以行的形式存儲昨悼,支持保留變更狀態(tài)信息。

啟用changelog模式需要在表中開啟changelog.enabled=true配置項跃洛。開啟之后數(shù)據(jù)變更的中間結(jié)果都會被保留下來率触。

注意:

  1. 批量讀方式任然會合并中間結(jié)果,無論是否啟用changelog汇竭。
  2. 啟用changelog模式Hudi也只是盡力去保留中間變更數(shù)據(jù)葱蝗。異步壓縮會將changelog數(shù)據(jù)合并為最終結(jié)果。所以說如果數(shù)據(jù)沒有被及時消費掉细燎,那么這條數(shù)據(jù)只能讀取到它的最終狀態(tài)两曼。為了緩解這種情況,可以配合設(shè)置compaction.delta_commits 和/或 compaction.delta_seconds玻驻,讓compaction間隔時間加大悼凑,從而增加中間變更數(shù)據(jù)的保留時間。

參考鏈接:https://hudi.apache.org/docs/hoodie_deltastreamer#changelog-mode

Append 模式

如果insert操作用于攝入數(shù)據(jù)击狮,COW表默認不會合并小文件佛析。MOR表會追加數(shù)據(jù)到delta log中,然后定期執(zhí)行壓縮彪蓬。

小文件合并策略會導致性能下降寸莫。可以通過write.insert.cluster來控制COW表寫入數(shù)據(jù)時是否合并小文件档冬。默認為false膘茎,即不啟用。該選項僅僅對COW表生效酷誓。

參考鏈接:https://hudi.apache.org/docs/hoodie_deltastreamer#append-mode

通過Zeppelin使用Flink Hudi

Flink SQL Client可以讓用戶直接通過SQL方式創(chuàng)建流處理作業(yè)披坏,不再需要編寫Java/Scala代碼,但是它仍然基于命令行盐数,對業(yè)務人員不夠友好棒拂。為了方便業(yè)務人員使用,我們引入了Zeppelin。Zeppelin提供了交互數(shù)據(jù)分析和可視化功能帚屉,易用性能夠滿足業(yè)務人員的需求谜诫。

注意:Zeppelin,F(xiàn)link和Hudi三者之間存在版本兼容問題攻旦。本人目前驗證了Zeppelin 0.10.0喻旷,F(xiàn)link 1.13.2和Hudi 0.11.1。試用其他版本過程遇到了些奇怪的問題牢屋。所以說其他版本組件請謹慎操作且预。

下面我們開始部署和使用Zeppelin。

安裝和配置Zeppelin

在之前安裝Flink的服務器上烙无,下載Zeppelin 0.10.0版本:

wget https://dlcdn.apache.org/zeppelin/zeppelin-0.10.0/zeppelin-0.10.0-bin-all.tgz

下載完畢后將其解壓锋谐,由于Zeppelin必須使用Java8 151版本之后的JDK,如果系統(tǒng)自帶的JDK不滿足要求皱炉,需要專門為Zeppelin指定JDK怀估。JDK滿足要求的可以略過此步驟。

cd zeppelin-0.10.0-bin-all/conf/
cp zeppelin-env.sh.template zeppelin-env.sh

然后編輯zeppelin-env.sh合搅,加入一行:

export JAVA_HOME=/path/to/jdk8u302-b08

指定Zeppelin專屬的JDK。

Zeppelin默認的端口號是8080歧蕉,如果需要修改的話灾部,先創(chuàng)建一個zeppelin-site.xml文件:

cd zeppelin-0.10.0-bin-all/conf/
cp zeppelin-site.xml.template zeppelin-site.xml

修改如下內(nèi)容:

<property>
  <name>zeppelin.server.addr</name>
  <value>0.0.0.0</value>
  <description>Server binding address</description>
</property>

<property>
  <name>zeppelin.server.port</name>
  <value>9999</value>
  <description>Server port.</description>
</property>

例子中我們沒有綁定IP,端口號修改成了9999惯退。

最后我們通過如下命令啟動Zeppelin服務:

cd zeppelin-0.10.0-bin-all/bin
./zeppelin-daemon.sh start

緊接著打開瀏覽器輸入http://目標IP:9999赌髓,如果能夠打開Zeppelin頁面,說明配置無誤催跪。如果無法打開锁蠕,說明Zeppelin配置或者環(huán)境出現(xiàn)了問題“谜簦可以查看Zeppelin的運行日志荣倾,Zeppelin的運行日志位于zeppelin-0.10.0-bin-all/logs目錄。

Zeppelin主界面

配置Zeppelin的interpreter

Zeppelin具有非常多的interpreter骑丸。interpreter為Zeppelin的插件舌仍,用于支持各種各樣的編程語言和數(shù)據(jù)處理后端,例如Hive通危,Spark和Flink等铸豁。

在Zeppelin中使用Flink,就必須依賴Flink interpreter菊碟,自然也離不開配置节芥。我們重點關(guān)注幾個核心的配置項。點擊Zeppelin右上角的菜單逆害,選擇interpreter头镊,在新頁面的搜索框處輸入flink增炭,可以很方便的找到flink interpreter和他的配置。

flink interpreter 配置頁面

最為重要的幾個配置項為:

  • FLINK_HOME:Flink的安裝目錄拧晕,必須要配置隙姿。
  • HADOOP_CONF_DIR:Hadoop配置文件目錄,如果Flink作業(yè)運行使用yarn模式厂捞,或者是使用HDFS输玷,必須配置此項。
  • flink.execution.mode:Flink的運行模式靡馁,可以選擇local欲鹏,remote或者yarn。local為本地運行臭墨,remote需要連接遠端集群(需要配置flink.execution.remote.hostflink.execution.remote.port赔嚎,即遠端Flink集群JobManager所在的host和port),yarn為提交作業(yè)到y(tǒng)arn集群胧弛。本例子中我們使用yarn模式尤误。
  • flink.execution.jars:執(zhí)行Flink作業(yè)依賴的其他jar包,可以配置本地文件路徑或者是HDFS上的路徑结缚,多個文件使用逗號分隔损晤。在這個例子中我們需要使用hudi,因此需要配置hudi-flink-bundle_xxxxxx.jar的全路徑红竭。

配置完畢之后尤勋,我們點擊flink interpreter欄右上方的saverestart,使配置項生效茵宪。

驗證Zeppelin Flink interpreter是否配置正確

接下來是驗證步驟最冰,我們打開Notebook菜單,選擇Flink Tutorial -> Flink Basics稀火。找到下方的Batch WordCount暖哨,點擊右側(cè)的運行按鈕。如果下方能看到WordCount運行結(jié)果憾股,說明Flink interpreter配置無誤鹿蜀。

Flink WordCount運行成功頁面

創(chuàng)建Note并通過Flink SQL操作Hudi表

點擊Notebook菜單,選擇Create new note服球,在彈出的對話框中填寫note的名稱茴恰,選擇默認的interpreter為Flink,點擊create按鈕斩熊。

在編寫Flink SQl之前往枣,我們需要先寫提示符,用來告訴Zeppelin需要怎樣解析我們的編程腳本,提示符共有以下5種:

  • %flink - 創(chuàng)建ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironment 并且提供Scala環(huán)境
  • %flink.pyflink - 提供Python環(huán)境
  • %flink.ipyflink - 提供ipython環(huán)境
  • %flink.ssql - 提供流處理SQL環(huán)境
  • %flink.bsql - 提供批處理SQL環(huán)境

這里我們使用%flink.ssql提示符分冈,填寫入前面Hudi的測試SQL并執(zhí)行圾另,如果能夠正常創(chuàng)建Hudi表,插入數(shù)據(jù)并查詢出雕沉。說明配置無誤集乔。Zeppelin可以正常使用。

Flink 操作Hudi表

使用Hive metastore

前面例子中表的元數(shù)據(jù)是在內(nèi)存中保存的坡椒,如果Flink yarn session退出扰路,表的元數(shù)據(jù)會丟失。下次使用的時候需要再次創(chuàng)建表倔叼,非常不便于使用汗唱。在這一節(jié)我們打算使用Hive的metastore作為元數(shù)據(jù)容器。表元數(shù)據(jù)保存在Hive的metastore中是一種方便的多的方案丈攒。表元數(shù)據(jù)不會因為Flink session的停止而丟失哩罪。

首先我們需要檢查配合使用的hive的版本。在Flink安裝目錄的lib中添加對應的依賴巡验。Hive版本和對應依賴請參考官方文檔:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/hive/overview/

本例子中我們使用Hive 3.1.0 配合Flink 1.13.2使用际插。需要準備如下文件:

  • flink-connector-hive_2.11-1.13.2.jar
  • hive-exec-3.1.0.jar
  • libfb303-0.9.3.jar
  • antlr-runtime-3.5.2.jar

第一個文件我們可以從中央倉庫下載,后面3個文件在hive安裝目錄能夠找到深碱。

接下來我們將這4個文件放置在Flink的lib目錄中:

cd /path/to/flink-1.13.2/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.13.2/flink-connector-hive_2.11-1.13.2.jar

cd /path/to/hive/lib
cp hive-exec-3.1.0.jar /path/to/flink-1.13.2/lib/
cp libfb303-0.9.3.jar /path/to/flink-1.13.2/lib/
cp antlr-runtime-3.5.2.jar /path/to/flink-1.13.2/lib/

然后重啟Zeppelin的Flink interpreter腹鹉。

接下來我們創(chuàng)建Hive catalog。打開前一節(jié)創(chuàng)建的Flink note敷硅,執(zhí)行如下SQL語句:

%flink.ssql
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/path/to/hive/conf'
);

這條SQL語句創(chuàng)建出一個Hive catalog,具體使用Hive的哪個database我們可以提前使用beeline查詢好愉阎。其中hive-conf-dir最為重要绞蹦,必須要指定Hive配置文件所在的目錄(一般是Hive安裝路徑下的conf目錄)。創(chuàng)建Hive Catalog成功的截圖如下:

創(chuàng)建Hive Catalog

然后我們執(zhí)行下面的SQL榜旦,測試下Flink能否獲取到Hive中default數(shù)據(jù)庫下的表幽七。

%flink.ssql
show catalogs;
use catalog myhive;
show tables;

執(zhí)行成功的輸出如下圖所示(table部分未截圖):


Show Catalogs

如果到這一步能夠列出Hive的catalog和管理的tables,說明前面步驟操作無誤溅呢,可以進行下一步澡屡,使用將Hudi表交給Hive catalog管理。

我們再次執(zhí)行第一節(jié)中的測試SQL咐旧。觀察Zeppelin的輸出驶鹉。


創(chuàng)建Hudi表

雖然這里執(zhí)行成功了,但是本人清理環(huán)境后反復測試這條SQL的時候遇到了錯誤:


錯誤信息

比較詭異铣墨。本人使用Flink SQL client執(zhí)行均無問題室埋,檢查Zeppelin Flink Session的classpath,發(fā)現(xiàn)hive-exec包已經(jīng)加載,應該不存在問題才對姚淆。懷疑是Zeppelin和Flink版本兼容存在問題孕蝉。待得到初步解決方案后本人將更新此博客。

更新內(nèi)容:作者已調(diào)通Zeppelin Flink Hive Hudi環(huán)境腌逢,參見博客:Flink Zeppelin Hudi Hive 整合環(huán)境配置和使用

使用 Hive sync

Hive sync模式Flink會使用Hive的metastore降淮,同時還保持同步,通過Flink維護的Hudi表也能夠通過Hive查詢搏讶。

啟用Hive sync需要重新編譯Hudi佳鳖,因為Hudi默認編譯參數(shù)是不包含Hive相關(guān)依賴的。在編譯之前我們必須要確定配合使用的Hive的版本窍蓝。這里以Hive3.1.0為例腋颠。修改hudi/packaging/hudi-flink-bundle/pom.xml文件,找到如下部分:

<profile>
  <id>flink-bundle-shade-hive3</id>
  <properties>
    <hive.version>3.1.0</hive.version>
    <flink.bundle.hive.scope>compile</flink.bundle.hive.scope>
  </properties>
  <dependencies>
    <dependency>
      <groupId>${hive.groupid}</groupId>
      <artifactId>hive-service-rpc</artifactId>
      <version>${hive.version}</version>
      <scope>${flink.bundle.hive.scope}</scope>
    </dependency>
  </dependencies>
</profile>

在這一段中吓笙,修改hive的版本為實際使用的版本淑玫。

然后進入hudi項目根目錄,執(zhí)行如下命令編譯

mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -Pinclude-flink-sql-connector-hive

最后復制編譯輸出到Flink的lib目錄:

cp /opt/zy/hudi/packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.11.1.jar /flink-1.13.2/lib/

注意:如果Flink目錄中已經(jīng)有flink-connector-hive的jar包面睛,請務必移除絮蒿,否則會出現(xiàn)依賴沖突。

整理好的Flink lib目錄如下所示:

flink-connector-kafka_2.11-1.13.2.jar
flink-csv-1.13.2.jar
flink-dist_2.11-1.13.2.jar
flink-json-1.13.2.jar
flink-shaded-zookeeper-3.4.14.jar
flink-table_2.11-1.13.2.jar
flink-table-blink_2.11-1.13.2.jar
hadoop-mapreduce-client-core-3.1.1.3.0.1.0-187.jar
hive-exec-3.1.0.3.0.1.0-187.jar
hudi-flink1.13-bundle_2.11-0.11.1.jar
kafka-clients-2.4.1.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar

接著我們需要處理Hive的依賴。這一步如果忘了做,后面使用Hive查詢Hudi表的時候遮咖,會報如下錯誤:

Error: Error while compiling statement: FAILED: RuntimeException java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat (state=42000,code=40000)

我們進入Hudi的packaging/hudi-hadoop-mr-bundle/target目錄军熏,復制hudi-hadoop-mr-bundle-0.11.1.jar到Hive安裝目錄的auxlib下。記得重啟Hive所有服務艾恼。

注意:如果使用HDP,請務必復制hudi-hadoop-mr-bundle-0.11.1.jar到hiveserver2所在機器的auxlib目錄,否則仍然會報ClassNotFoundException蜡饵。

到這一步Hive sync已經(jīng)配置完畢,接下來我們驗證Hive sync的功能胳施。

首先進入Flink SQL client溯祸,啟動之前記得執(zhí)行下面腳本:

export HADOOP_CLASSPATH=`hadoop classpath`

然后執(zhí)行如下SQL:

CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///zy/hudi',
  'table.type' = 'COPY_ON_WRITE',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://sizu02:9083',
  'hive_sync.table'='t1', 
  'hive_sync.db'='default'
);

增加的幾個重要配置項的解釋如下:

  • table.type: 測試中我們使用COPY_ON_WRITE。如果使用MERGE_ON_READ舞肆,在生成parquet文件之前焦辅,Hive查詢不到數(shù)據(jù)
  • hive_sync.enable: 是否啟用hive同步
  • hive_sync.mode: hive同步模式,包含hms和jdbc兩種椿胯,這里使用hms模式
  • hive_sync.metastore.uris: 配置hive metastore的URI
  • hive_sync.table: 同步到hive中的表名稱
  • hive_sync.db: 同步到hive的哪個數(shù)據(jù)庫中

然后插入測試數(shù)據(jù):

INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

然后執(zhí)行

select * from t1;

可以成功查詢出數(shù)據(jù)筷登。

最后我們測試下通過Hive的beeline查詢數(shù)據(jù):

set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

select * from t1;

結(jié)果如下:

+-------------------------+--------------------------+------------------------+----------------------------+----------------------------------------------------+----------+----------+---------+--------+---------------+
| t1._hoodie_commit_time  | t1._hoodie_commit_seqno  | t1._hoodie_record_key  | t1._hoodie_partition_path  |                t1._hoodie_file_name                | t1.uuid  | t1.name  | t1.age  | t1.ts  | t1.partition  |
+-------------------------+--------------------------+------------------------+----------------------------+----------------------------------------------------+----------+----------+---------+--------+---------------+
| 20211019164113          | 20211019164113_3_1       | id1                    | par1                       | 4c10d680-7b18-40c9-952e-75435101cb55_3-4-0_20211019164113.parquet | id1      | Danny    | 23      | 1000   | par1          |
| 20211019164113          | 20211019164113_3_2       | id2                    | par1                       | 4c10d680-7b18-40c9-952e-75435101cb55_3-4-0_20211019164113.parquet | id2      | Stephen  | 33      | 2000   | par1          |
| 20211019164113          | 20211019164113_1_3       | id3                    | par2                       | 9e94999f-0ce5-4741-b579-3aa0d5ac5f1b_1-4-0_20211019164113.parquet | id3      | Julian   | 53      | 3000   | par2          |
| 20211019164113          | 20211019164113_1_4       | id4                    | par2                       | 9e94999f-0ce5-4741-b579-3aa0d5ac5f1b_1-4-0_20211019164113.parquet | id4      | Fabian   | 31      | 4000   | par2          |
| 20211019164113          | 20211019164113_0_1       | id5                    | par3                       | fa2c57a6-6573-477d-af06-6b8f3a34f8da_0-4-0_20211019164113.parquet | id5      | Sophia   | 18      | 5000   | par3          |
| 20211019164113          | 20211019164113_0_2       | id6                    | par3                       | fa2c57a6-6573-477d-af06-6b8f3a34f8da_0-4-0_20211019164113.parquet | id6      | Emma     | 20      | 6000   | par3          |
| 20211019164113          | 20211019164113_2_1       | id7                    | par4                       | 2162e217-8d2e-4f2c-bd8b-7d76e213a1f1_2-4-0_20211019164113.parquet | id7      | Bob      | 44      | 7000   | par4          |
| 20211019164113          | 20211019164113_2_2       | id8                    | par4                       | 2162e217-8d2e-4f2c-bd8b-7d76e213a1f1_2-4-0_20211019164113.parquet | id8      | Han      | 56      | 8000   | par4          |
+-------------------------+--------------------------+------------------------+----------------------------+----------------------------------------------------+----------+----------+---------+--------+---------------+

在Zeppelin中使用Flink Hudi Hive Sync

上面的環(huán)境如果直接在Zeppelin中運行會出現(xiàn)很多報錯,首先需要處理依賴問題压状。復制hadoop mapreduce的相關(guān)jar包到flink的lib目錄中仆抵。這里以HDP的為例:

cp hadoop-mapreduce-client-jobclient-3.1.1.3.0.1.0-187.jar hadoop-mapreduce-client-core-3.1.1.3.0.1.0-187.jar hadoop-mapreduce-client-common-3.1.1.3.0.1.0-187.jar /path/to/flink-1.13.2/lib/

然后配置Zeppelin的flink interpreter跟继,不要勾選zeppelin.flink.module.enableHive配置項,否則會出現(xiàn)插入的數(shù)據(jù)無法在Hive查詢到的問題镣丑。意思也就是說舔糖,不需在flink interpreter中配置任何hive相關(guān)的內(nèi)容。接下來按照上一節(jié)的操作莺匠,可以在Zeppelin中完美使用金吗。

Hudi表權(quán)限管理

權(quán)限的分配和鑒權(quán)我們使用Ranger組件。Ranger組件支持Hive數(shù)據(jù)庫趣竣,表摇庙,甚至是字段級別的權(quán)限控制。 Spark和Flink SQL可以使用Hive的Metastore遥缕。那么是否意味著可以通過Ranger控制Hive權(quán)限的方法去控制Spark/Flink操作Hudi表的權(quán)限呢卫袒?答案是否定的。Ranger的Hive插件只能控制用戶通過Hive操作表的權(quán)限单匣,Spark/Flink完全可以繞過這些權(quán)限控制夕凝。測試表明,Ranger Hive權(quán)限可以控制用戶使用beeline查詢户秤。用戶看不到自己無權(quán)操作的數(shù)據(jù)庫和表码秉。但是通過Spark/Flink SQL操作,Ranger Hive表權(quán)限無法控制鸡号。用戶可以操作Hive metastore中任意表转砖。

如果我們使用Spark/Flink操作Hudi表,權(quán)限問題應該怎么處理呢鲸伴?

答案是使用Ranger HDFS數(shù)據(jù)目錄讀寫權(quán)限和Yarn隊列提交任務權(quán)限結(jié)合配置府蔗。我們可以使用Ranger HDFS控制目標用戶對于表數(shù)據(jù)存儲目錄的權(quán)限,使用Ranger Yarn插件控制目標用戶使用Spark/Flink提交任務到Y(jié)arn指定隊列的權(quán)限汞窗。從而實現(xiàn)對Hudi表權(quán)限的管理礁竞。

注意:Yarn中有一個配置項叫做ranger.add-yarn-authorization。如果啟用杉辙,Yarn隊列的acl和Ranger acl會同時生效。多數(shù)情況下我們僅希望通過Ranger來管理Yarn隊列權(quán)限捶朵。在這種情況下蜘矢,需要關(guān)閉該配置。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末综看,一起剝皮案震驚了整個濱河市品腹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌红碑,老刑警劉巖舞吭,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件泡垃,死亡現(xiàn)場離奇詭異,居然都是意外死亡羡鸥,警方通過查閱死者的電腦和手機蔑穴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來惧浴,“玉大人存和,你說我怎么就攤上這事≈月茫” “怎么了捐腿?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長柿顶。 經(jīng)常有香客問我茄袖,道長,這世上最難降的妖魔是什么嘁锯? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任宪祥,我火速辦了婚禮,結(jié)果婚禮上猪钮,老公的妹妹穿的比我還像新娘品山。我一直安慰自己,他們只是感情好烤低,可當我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布肘交。 她就那樣靜靜地躺著,像睡著了一般扑馁。 火紅的嫁衣襯著肌膚如雪涯呻。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天腻要,我揣著相機與錄音复罐,去河邊找鬼。 笑死雄家,一個胖子當著我的面吹牛效诅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播趟济,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼乱投,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了顷编?” 一聲冷哼從身側(cè)響起戚炫,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎媳纬,沒想到半個月后双肤,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體施掏,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年茅糜,在試婚紗的時候發(fā)現(xiàn)自己被綠了七芭。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡限匣,死狀恐怖抖苦,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情米死,我是刑警寧澤锌历,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站峦筒,受9級特大地震影響究西,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜物喷,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一卤材、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧峦失,春花似錦扇丛、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至隧魄,卻和暖如春卓练,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背购啄。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工襟企, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人狮含。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓顽悼,卻偏偏與公主長得像,于是被迫代替她去往敵國和親几迄。 傳聞我的和親對象是個殘疾皇子表蝙,可洞房花燭夜當晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容