Flink-CDC 同步Mysql數(shù)據(jù)到S3 Hudi

軟件版本

Mysql: 5.7
Hadoop: 3.1.3
Flink: 1.12.2
Hudi: 0.9.0
Hive: 2.3.7

1.Mysql建表并開啟bin_log

create table users(
    id bigint auto_increment primary key,
    name varchar(20) null,
    birthday timestamp default CURRENT_TIMESTAMP not null,
    ts timestamp default CURRENT_TIMESTAMP not null
);

2.安裝Hadoop

(1)解壓hadoop安裝包:tar -zxvf hadoop-3.1.3.tar.gz
(2)配置環(huán)境變量

export HADOOP_HOME=/Users/xxx/hadoop/hadoop-3.1.3
export HADOOP_COMMON_HOME=$HADOOP_HOME
export PATH=$HADOOP_HOME/bin:$PATH

#添加hadoop classpath
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

3.下載安裝Flink

(1)在Flink官網(wǎng)下載flink軟件包:https://flink.apache.org/downloads.html
(2)解壓:tar -zxvf flink-1.12.2-bin-scala_2.11.tgz
(3)配置flink(vim conf/flink-conf.yaml)癌蓖,開啟checkpoint(flink-cdc需要開啟checkpoint才能生成hudi commit科阎,提交數(shù)據(jù))

state.backend: filesystem
execution.checkpointing.interval: 10000
state.checkpoints.dir: file:///Users/xxx/flink/flink-1.12.2/hudi/flink-checkpoints
state.savepoints.dir: file:///Users/xxx/flink/flink-1.12.2/hudi/flink-savepoints

(4)配置flink(vim conf/flink-conf.yaml),增加slot數(shù)

taskmanager.numberOfTaskSlots: 4
vim workers
  1 localhost
  2 localhost
  3 localhost
  4 localhost

(4)啟動Flink:bin/start-cluster.sh

4.編譯Hudi业栅,拷貝jar包

(1)下載Hudi源碼:git clone https://github.com/apache/hudi.git
(2)切換到0.9.0分支:git checkout origin release-0.9.0
(3)編譯:mvn clean package -DskipTests
(4)編譯完成后,會在packaging/hudi-flink-bundle/target目錄下生成對應的jar包(hudi-flink-bundle_2.11-0.9.0.jar)间影,將此jar包拷貝至flink的lib目錄中:

cp hudi-flink-bundle_2.11-0.9.0.jar ~/flink/lib

5.將其他相關jar包拷貝至flink/lib目錄下

(1)flink-sql-connector-mysql-cdc-1.2.0.jar:用于連接mysql
(2)aws-java-sdk-bundle-1.11.874.jar/hadoop-aws-3.1.3.jar:用于連接aws s3

6.啟動sql-client

1.bin/sql-client.sh embedded
2.建立mysql 映射表
create table mysql_users(
    id bigint primary key not enforced,
    name string,
    birthday timestamp(3),
    ts timestamp(3)
) with (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test_cdc',
    'table-name' = 'users'
);

3.建立hudi映射表
create table hudi_users(
    id bigint primary key not enforced,
    name string,
    birthday timestamp(3),
    ts timestamp(3),
    `partition` varchar(20)
) partitioned by (`partition`) with (
    'connector' = 'hudi',
    'table.type' = 'COPY_ON_WRITE',
    'path' = 's3a://xxx/yyy/hudi_users',
    'read.streaming.enabled' = 'true',
    'read.streaming.check-interval' = '1'
);

4.創(chuàng)建任務
insert into hudi_users select *, date_format(birthday, 'yyyyMMdd') from mysql_users;

檢查s3上是否生成了數(shù)據(jù)窒朋;

7.Hive建立external table

1.通過beeline連接hive
!connect jdbc:hive2://[ELB-DEV-Presto-hs2-s0000e2c5-06a22927ec8bb2f6.elb.us-east-1.amazonaws.com:10000/default;auth=noSasl](http://elb-dev-presto-hs2-s0000e2c5-06a22927ec8bb2f6.elb.us-east-1.amazonaws.com:10000/default;auth=noSasl)


CREATE EXTERNAL TABLE `hudi_user_mor`(               
   `_hoodie_commit_time` string,                    
   `_hoodie_commit_seqno` string,                   
   `_hoodie_record_key` string,                     
   `_hoodie_partition_path` string,                 
   `_hoodie_file_name` string,                      
   `id` bigint,                                     
   `name` string,                                   
   `birthday` bigint,                               
   `ts` bigint)                                     
 PARTITIONED BY (                                   
   `partition` string)                              
 ROW FORMAT SERDE                                   
   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  
 STORED AS INPUTFORMAT                              
   'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' 
 OUTPUTFORMAT                                       
   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
 LOCATION                                           
   's3a://xxx/yyy/hudi_users';

添加分區(qū):
alter table hudi_user_mor add if not exists partition(`partition`='par1') location 's3a://fw-itf/DFMOD-c34db792/target_table/par1';

8.通過presto查詢數(shù)據(jù)

1.進入presto
./presto-cli-0.248-executable.jar --server ELB-DEV-Presto-master-s0000eca1-efaff1be86b6ffa3.elb.us-east-1.amazonaws.com:9106 --catalog db

2.查詢數(shù)據(jù)
select * from hudi_user_mor where partition = 'par1' limit 5;

8.測試同步

在mysql中執(zhí)行增、刪艘绍、改語句,并在Hive或presto中進行查詢秫筏,可以實時的查詢到改動诱鞠。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市这敬,隨后出現(xiàn)的幾起案子航夺,更是在濱河造成了極大的恐慌,老刑警劉巖崔涂,帶你破解...
    沈念sama閱讀 212,686評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件阳掐,死亡現(xiàn)場離奇詭異,居然都是意外死亡冷蚂,警方通過查閱死者的電腦和手機缭保,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,668評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蝙茶,“玉大人艺骂,你說我怎么就攤上這事÷『唬” “怎么了钳恕?”我有些...
    開封第一講書人閱讀 158,160評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蹄衷。 經(jīng)常有香客問我忧额,道長,這世上最難降的妖魔是什么愧口? 我笑而不...
    開封第一講書人閱讀 56,736評論 1 284
  • 正文 為了忘掉前任睦番,我火速辦了婚禮,結果婚禮上调卑,老公的妹妹穿的比我還像新娘抡砂。我一直安慰自己,他們只是感情好恬涧,可當我...
    茶點故事閱讀 65,847評論 6 386
  • 文/花漫 我一把揭開白布注益。 她就那樣靜靜地躺著,像睡著了一般溯捆。 火紅的嫁衣襯著肌膚如雪丑搔。 梳的紋絲不亂的頭發(fā)上厦瓢,一...
    開封第一講書人閱讀 50,043評論 1 291
  • 那天,我揣著相機與錄音啤月,去河邊找鬼煮仇。 笑死,一個胖子當著我的面吹牛谎仲,可吹牛的內(nèi)容都是我干的浙垫。 我是一名探鬼主播,決...
    沈念sama閱讀 39,129評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼郑诺,長吁一口氣:“原來是場噩夢啊……” “哼夹姥!你這毒婦竟也來了?” 一聲冷哼從身側響起辙诞,我...
    開封第一講書人閱讀 37,872評論 0 268
  • 序言:老撾萬榮一對情侶失蹤辙售,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后飞涂,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體旦部,經(jīng)...
    沈念sama閱讀 44,318評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,645評論 2 327
  • 正文 我和宋清朗相戀三年较店,在試婚紗的時候發(fā)現(xiàn)自己被綠了士八。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,777評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡泽西,死狀恐怖曹铃,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情捧杉,我是刑警寧澤,帶...
    沈念sama閱讀 34,470評論 4 333
  • 正文 年R本政府宣布秘血,位于F島的核電站味抖,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏灰粮。R本人自食惡果不足惜仔涩,卻給世界環(huán)境...
    茶點故事閱讀 40,126評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望粘舟。 院中可真熱鬧熔脂,春花似錦、人聲如沸柑肴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,861評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽晰骑。三九已至适秩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背秽荞。 一陣腳步聲響...
    開封第一講書人閱讀 32,095評論 1 267
  • 我被黑心中介騙來泰國打工骤公, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人扬跋。 一個月前我還...
    沈念sama閱讀 46,589評論 2 362
  • 正文 我出身青樓阶捆,卻偏偏與公主長得像,于是被迫代替她去往敵國和親钦听。 傳聞我的和親對象是個殘疾皇子洒试,可洞房花燭夜當晚...
    茶點故事閱讀 43,687評論 2 351

推薦閱讀更多精彩內(nèi)容

  • 1.根據(jù)網(wǎng)上文章,客戶端使用flink1.11.4+iceberg-flink-runtime-0.11.1.ja...
    不喜歡代碼的帥戴戴閱讀 3,007評論 0 0
  • 一彪见、架構 二儡司、框架部署 2.1 準備 準備三臺虛擬機,操作系統(tǒng)為CentOS 7.x余指,每臺內(nèi)存至少8G以上捕犬。 步驟...
    CJ21閱讀 1,090評論 0 3
  • 16宿命:用概率思維提高你的勝算 以前的我是風險厭惡者,不喜歡去冒險酵镜,但是人生放棄了冒險碉碉,也就放棄了無數(shù)的可能。 ...
    yichen大刀閱讀 6,041評論 0 4
  • 公元:2019年11月28日19時42分農(nóng)歷:二零一九年 十一月 初三日 戌時干支:己亥乙亥己巳甲戌當月節(jié)氣:立冬...
    石放閱讀 6,876評論 0 2