Flink CDC + the Hudi data lake, synced to Hive

1. Versions

Component  Version
Hudi       0.10.0
Flink      1.13.5
Hive       3.1.0

2. Goal: use Flink CDC to ingest MySQL changes into Hudi and sync them on to Hive

Related posts:
Flink CDC explained
Flink CDC 1.2 example
Flink CDC 2.0 example

3. Jars required in Flink's lib directory

The key package is flink-connector-mysql-cdc-2.0.2.jar:

-rw-r--r-- 1 root root   7802399 2月  16 00:36 doris-flink-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root    249571 2月  16 00:36 flink-connector-jdbc_2.12-1.13.5.jar
-rw-r--r-- 1 root root    359138 2月  16 00:36 flink-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root  30087268 2月  17 22:12 flink-connector-mysql-cdc-2.0.2.jar
-rw-r--r-- 1 root root     92315 2月  16 00:36 flink-csv-1.13.5.jar
-rw-r--r-- 1 root root 106535830 2月  16 00:36 flink-dist_2.12-1.13.5.jar
-rw-r--r-- 1 root root    148127 2月  16 00:36 flink-json-1.13.5.jar
-rw-r--r-- 1 root root  43317025 2月  16 00:36 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 root root   7709740 2月  16 00:36 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root   3674116 2月  16 00:36 flink-sql-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root  35051557 2月  16 00:35 flink-table_2.12-1.13.5.jar
-rw-r--r-- 1 root root  38613344 2月  16 00:36 flink-table-blink_2.12-1.13.5.jar
-rw-r--r-- 1 root root  62447468 2月  16 00:36 hudi-flink-bundle_2.12-0.10.0.jar
-rw-r--r-- 1 root root  17276348 2月  16 00:36 hudi-hadoop-mr-bundle-0.10.0.jar
-rw-r--r-- 1 root root    207909 2月  16 00:36 log4j-1.2-api-2.16.0.jar
-rw-r--r-- 1 root root    301892 2月  16 00:36 log4j-api-2.16.0.jar
-rw-r--r-- 1 root root   1789565 2月  16 00:36 log4j-core-2.16.0.jar
-rw-r--r-- 1 root root     24258 2月  16 00:36 log4j-slf4j-impl-2.16.0.jar
-rw-r--r-- 1 root root    724213 2月  16 00:36 mysql-connector-java-5.1.9.jar
[root@node01 lib]# pwd
/opt/module/flink/flink-1.13.5/lib
[root@node01 lib]# 
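Before starting the SQL client it is worth confirming the critical jars are actually in place; a minimal sketch, where check_jars and the three-jar shortlist are illustrative helpers (the full listing above has more jars, but these three are the ones this walkthrough cannot run without):

```shell
# check_jars DIR -> prints any critical jar missing from DIR and
# returns non-zero if at least one is absent.
check_jars() {
  dir="$1"
  missing=0
  for jar in flink-connector-mysql-cdc-2.0.2.jar \
             hudi-flink-bundle_2.12-0.10.0.jar \
             flink-shaded-hadoop-2-uber-2.8.3-10.0.jar; do
    if [ ! -f "$dir/$jar" ]; then
      echo "MISSING: $jar"
      missing=1
    fi
  done
  return "$missing"
}

# Usage on the node above:
# check_jars /opt/module/flink/flink-1.13.5/lib && echo "all required jars present"
```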

4. Scenario

(architecture diagram omitted)

5. Steps

1. Create the MySQL table and enable the binlog
2. Create the Flink CDC source table in the Flink SQL client
3. Create a view over the source table
4. Create the sink table backed by Hudi, with automatic sync to Hive
5. Query the view and insert into the sink table (runs continuously as a Flink job)

5.1 Enable the MySQL binlog

Add the following under the [mysqld] section of my.cnf:

server-id=162
log-bin=mysql-bin
#sync-binlog=1
# databases excluded from the binlog
binlog-ignore-db=information_schema
binlog-ignore-db=performance_schema
binlog-ignore-db=sys
binlog-ignore-db=mysql
binlog_format=ROW
expire_logs_days=30
binlog_row_image=full
# databases included in the binlog (optional)
#binlog-do-db=test
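Before restarting, the edited config can be sanity-checked with a small helper; a sketch, where check_binlog_conf is an illustrative name (not a MySQL tool) and the my.cnf path in the usage line is an assumption:

```shell
# check_binlog_conf FILE -> succeeds only if FILE enables a row-format
# binlog, i.e. contains a log-bin line and binlog_format=ROW.
check_binlog_conf() {
  grep -q '^log-bin' "$1" && grep -qi '^binlog_format=ROW' "$1"
}

# Usage (common default path, adjust for your install):
# check_binlog_conf /etc/my.cnf && echo "binlog config looks good"
```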

Restart MySQL for the changes to take effect: service mysqld restart

5.2 Create the MySQL table

CREATE TABLE `Flink_cdc` (
  `id` BIGINT(64) AUTO_INCREMENT PRIMARY KEY,
  `name` VARCHAR(64) NULL,
  `age` INT(20) NULL,
  `birthday` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
  `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
INSERT INTO `wudldb`.`Flink_cdc` (name, age) VALUES ('flink', 18);

5.3 Create the Flink CDC source table in the Flink SQL client

Flink SQL> CREATE TABLE source_mysql (
   id BIGINT PRIMARY KEY NOT ENFORCED,
   name STRING,
   age INT,
   birthday TIMESTAMP(3),
   ts TIMESTAMP(3)
 ) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '192.168.1.162',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'server-time-zone' = 'Asia/Shanghai',
 'debezium.snapshot.mode' = 'initial',
 'database-name' = 'wudldb',
 'table-name' = 'Flink_cdc'
 );
[INFO] Execute statement succeed.

5.4 Create a view over the CDC table in Flink SQL

Flink SQL> create view view_source_flinkcdc_mysql 
> AS 
> SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM source_mysql;
[INFO] Execute statement succeed.
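The part column computed by DATE_FORMAT(birthday, 'yyyyMMdd') is a day-granularity string such as 20220218, which Hudi will use as the partition path below. For reference, the equivalent pattern in shell:

```shell
# Today's date in the same 'yyyyMMdd' shape Flink emits for the part column:
date +%Y%m%d
```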

5.5 Create the sink table backed by Hudi, with automatic sync to Hive

Flink SQL> CREATE TABLE flink_cdc_sink_hudi_hive(
> id bigint ,
> name string,
> age int,
> birthday TIMESTAMP(3),
> ts TIMESTAMP(3),
> part VARCHAR(20),
> primary key(id) not enforced
> )
> PARTITIONED BY (part)
> with(
> 'connector'='hudi',
> 'path'= 'hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive', 
> 'table.type'= 'MERGE_ON_READ',
> 'hoodie.datasource.write.recordkey.field'= 'id', 
> 'write.precombine.field'= 'ts',
> 'write.tasks'= '1',
> 'write.rate.limit'= '2000', 
> 'compaction.tasks'= '1', 
> 'compaction.async.enabled'= 'true',
> 'compaction.trigger.strategy'= 'num_commits',
> 'compaction.delta_commits'= '1',
> 'changelog.enabled'= 'true',
> 'read.streaming.enabled'= 'true',
> 'read.streaming.check-interval'= '3',
> 'hive_sync.enable'= 'true',
> 'hive_sync.mode'= 'hms',
> 'hive_sync.metastore.uris'= 'thrift://node02.com:9083',
> 'hive_sync.jdbc_url'= 'jdbc:hive2://node02.com:10000',
> 'hive_sync.table'= 'flink_cdc_sink_hudi_hive',
> 'hive_sync.db'= 'db_hive',
> 'hive_sync.username'= 'root',
> 'hive_sync.password'= '123456',
> 'hive_sync.support_timestamp'= 'true'
> );
[INFO] Execute statement succeed.

5.6 Query the view and insert into the sink table

Flink SQL> INSERT INTO flink_cdc_sink_hudi_hive SELECT id, name,age,birthday, ts, part FROM view_source_flinkcdc_mysql ;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c618c9f528b9793adf4418640bb2a0fc

5.7 Check the running Flink job

(Flink web UI screenshot omitted)

6. Integrating Hudi with Hive

Copy hudi-hadoop-mr-bundle-0.10.0.jar into Hive's lib directory, then restart the Hive services.
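The copy step can be scripted; a minimal sketch, where install_hudi_bundle and both paths in the usage line are assumptions to adapt to your layout:

```shell
# install_hudi_bundle SRC_DIR HIVE_LIB_DIR
# Copies the Hudi MR bundle jar into Hive's lib directory so that
# HiveServer2 can read Hudi tables.
install_hudi_bundle() {
  cp "$1/hudi-hadoop-mr-bundle-0.10.0.jar" "$2/"
}

# Usage on the nodes from this walkthrough (then restart the metastore
# and HiveServer2 so the jar is picked up):
# install_hudi_bundle /opt/module/flink/flink-1.13.5/lib /opt/module/hive/lib
```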

6.1 Connect to Hive and inspect the tables synced by Hudi

Because the table type is MERGE_ON_READ, Hudi registers two Hive tables: a read-optimized view (suffix _ro) and a real-time view (suffix _rt).

0: jdbc:hive2://node01.com:2181,node02.com:21> show tables;
INFO  : Compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.016 seconds
INFO  : Executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.012 seconds
INFO  : OK
+------------------------------+
|           tab_name           |
+------------------------------+
| flink_cdc_sink_hudi_hive_ro  |
| flink_cdc_sink_hudi_hive_rt  |
+------------------------------+

6.2 Query the read-optimized table

0: jdbc:hive2://node01.com:2181,node02.com:21> select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro;
INFO  : Compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:id, type:bigint, comment:null), FieldSchema(name:name, type:string, comment:null), FieldSchema(name:age, type:int, comment:null), FieldSchema(name:birthday, type:bigint, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.124 seconds
INFO  : Executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id ,name , age , birthday from flink_cdc_sink_hudi_hive_ro
INFO  : Completed executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.029 seconds
INFO  : OK
+-----+--------+------+----------------+
| id  |  name  | age  |    birthday    |
+-----+--------+------+----------------+
| 1   | flink  | 18   | 1645142397000  |
+-----+--------+------+----------------+
1 row selected (0.278 seconds)
0: jdbc:hive2://node01.com:2181,node02.com:21> 

Overall result

(screenshot omitted)

Pitfall hit along the way

Flink CDC needs the flink-connector-mysql-cdc-2.0.2.jar package here, not flink-sql-connector-mysql-cdc-2.0.2.jar; otherwise you will hit the error below:

Flink SQL> select * from users_source_mysql;


Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/connect/data/Schema
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.getDeclaredMethod(Class.java:2128)
    at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
    at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:597)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:457)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:378)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:179)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:117)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:934)
    at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
    at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
    at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:67)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:795)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
    at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
    at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
    at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
    at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
    at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.data.Schema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 69 more

Shutting down the session...
done.
[root@node01 bin]# 

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.