1. Architecture diagram
flink-數倉.png
2. Implementation example
2.1 Merge two tables captured with Flink CDC into a single view, and write the result to both the data lake (Hudi) and Kafka
flink-hudishili.png
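2.2 Approach
1. Create the Flink CDC tables in Flink SQL.
2. Create a view: join the two tables and expose the required columns as a single view.
3. Create the output table, map it to a Hudi table, and sync it automatically to a Hive table.
4. Query the view and insert the rows into the output table; Flink runs this continuously in the background.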
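To make step 2 concrete: an INNER JOIN on `id` only produces a row once both CDC tables contain a row with the same id. The sketch below is a plain-Java model of that behavior, not Flink's join operator (which keeps both inputs in state and also emits retractions when an already-joined row changes). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of
//   source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id
// It only shows WHEN a joined row becomes visible; Flink's real join also
// emits retractions when an already-joined row is updated.
public class CdcInnerJoinSketch {
    // Latest row per id from Flink_cdc (name) and from Flink_cdd (phone)
    private final Map<Long, String> names = new HashMap<>();
    private final Map<Long, String> phones = new HashMap<>();
    // Joined rows emitted so far, encoded as "id,name,phone"
    private final List<String> output = new ArrayList<>();

    // A change arrives from Flink_cdc: update its state, then probe the other side
    public void onFlinkCdc(long id, String name) {
        names.put(id, name);
        String phone = phones.get(id);
        if (phone != null) {
            output.add(id + "," + name + "," + phone);
        }
    }

    // A change arrives from Flink_cdd: update its state, then probe the other side
    public void onFlinkCdd(long id, String phone) {
        phones.put(id, phone);
        String name = names.get(id);
        if (name != null) {
            output.add(id + "," + name + "," + phone);
        }
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        CdcInnerJoinSketch join = new CdcInnerJoinSketch();
        join.onFlinkCdc(1L, "zhangsan");     // no Flink_cdd row with id 1 yet: nothing emitted
        join.onFlinkCdd(2L, "13800000000");  // no Flink_cdc row with id 2: nothing emitted
        join.onFlinkCdd(1L, "13900000000");  // id 1 now exists on both sides: one joined row
        System.out.println(join.output());   // prints [1,zhangsan,13900000000]
    }
}
```

This is also why the two generated tables only join where their auto-increment ids happen to line up.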
2.3 Dependencies required in the pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>wudl-hudi</artifactId>
<groupId>wudl-hudi</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink13.5-hudi</artifactId>
<!-- Repository locations, in order: aliyun, apache, cloudera and spring-plugin -->
<repositories>
<repository>
<id>aliyun</id>
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>apache</id>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>spring-plugin</id>
<url>https://repo.spring.io/plugins-release/</url>
</repository>
</repositories>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<flink.version>1.13.5</flink.version>
<hadoop.version>2.7.3</hadoop.version>
<mysql.version>8.0.16</mysql.version>
<flink-mysql-cdc>2.0.2</flink-mysql-cdc>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink-mysql-cdc}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<!-- Flink Client -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API & SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
<version>0.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<!-- MySQL/FastJson/lombok -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<!-- slf4j and log4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2.4 Code implementation
package com.wudl.hudi.sink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author :wudl
* @date :Created in 2022-02-19 22:18
* @description:
* @modified By:
* @version: 1.0
*/
public class MysqlJoinMysqlHuDi {
public static void main(String[] args) throws Exception {
// 1- Get the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Hudi commits the incrementally written data when a checkpoint completes, so Flink checkpointing must be enabled
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1.1 Enable checkpointing every 5 s
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointTimeout(10000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Keep the last checkpoint when the job is cancelled normally
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Restart strategy: up to 3 attempts, 5 s apart
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
// State backend: checkpoints are stored on HDFS
env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/flink-hudi/ck"));
// User name used to access HDFS
System.setProperty("HADOOP_USER_NAME", "root");
// 2- Create the source tables: read change data from MySQL with Flink CDC (mysql-cdc connector)
tableEnv.executeSql(
"CREATE TABLE IF NOT EXISTS source_mysql ( " +
" id BIGINT primary key NOT ENFORCED ," +
" name string," +
" age int ," +
" birthday TIMESTAMP(3)," +
" ts TIMESTAMP(3)" +
") WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = '192.168.1.162', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = '123456', " +
" 'server-time-zone' = 'Asia/Shanghai', " +
" 'scan.startup.mode' = 'initial', " +
" 'database-name' = 'wudldb', " +
" 'table-name' = 'Flink_cdc' " +
" )");
tableEnv.executeSql(
"CREATE TABLE IF NOT EXISTS source_mysql_Flink_cdd ( " +
" id BIGINT primary key NOT ENFORCED ," +
" phone string," +
" address string ," +
" ts TIMESTAMP(3)" +
") WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = '192.168.1.162', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = '123456', " +
" 'server-time-zone' = 'Asia/Shanghai', " +
" 'scan.startup.mode' = 'initial', " +
" 'database-name' = 'wudldb', " +
" 'table-name' = 'Flink_cdd' " +
" )");
String joinSql = "SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id";
Table tableMysqlJoin = tableEnv.sqlQuery(joinSql);
// 3- Register the join result as a temporary view
tableEnv.createTemporaryView("viewFlinkCdc",tableMysqlJoin);
// 4- Create the output table mapped to a Hudi table: table name, storage path, columns, etc.
tableEnv.executeSql(
"CREATE TABLE myslqjoinmysqlhudiSink (" +
" id BIGINT PRIMARY KEY NOT ENFORCED," +
" name STRING," +
" age INT," +
" birthday STRING," +
" phone STRING," +
" address STRING," +
" ts STRING" +
")" +
"WITH (" +
" 'connector' = 'hudi'," +
" 'path' = 'file:///D:/myslqjoinmysqlhudiSink'," +
// " 'path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/order_hudi_sink' ,\n" +
" 'table.type' = 'MERGE_ON_READ'," +
" 'write.operation' = 'upsert'," +
" 'hoodie.datasource.write.recordkey.field'= 'id'," +
" 'write.precombine.field' = 'ts'," +
" 'write.tasks'= '1'" +
")"
);
// 4.1- Create the Kafka sink table; the changelog is encoded as debezium-json
TableResult kafkaSink = tableEnv.executeSql(
"CREATE TABLE flinkCdc_kafka_Sink (" +
" id BIGINT NOT NULL," +
" name STRING," +
" age INT," +
" birthday STRING," +
" phone STRING," +
" address STRING," +
" ts STRING" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'sinktest'," +
" 'scan.startup.mode' = 'earliest-offset', "+
" 'properties.bootstrap.servers' = '192.168.1.161:6667'," +
" 'format' = 'debezium-json'," +
" 'debezium-json.ignore-parse-errors'='true' " +
")"
);
// 5- Write the view into the Hudi table with INSERT ... SELECT (each executeSql INSERT is submitted as a separate Flink job)
tableEnv.executeSql(
"INSERT INTO myslqjoinmysqlhudiSink " +
"SELECT id,name,age,birthday,phone,address, ts FROM viewFlinkCdc"
);
// Print both schemas to check that the Kafka sink columns match the view
tableEnv.sqlQuery("select * from flinkCdc_kafka_Sink").printSchema();
tableEnv.sqlQuery("select * from viewFlinkCdc").printSchema();
// 6- Write the same join result to Kafka (submitted as a second job)
tableEnv.executeSql("insert into flinkCdc_kafka_Sink SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id ");
// tableEnv.executeSql("insert into myslqjoinmysqlhudiSink SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id ");
// tableEnv.executeSql("insert into flinkCdc_kafka_Sink select id,name,age,CAST(birthday as STRING) birthday ,phone, address,CAST(ts AS STRING) ts from myslqjoinmysqlhudiSink ");
// tableEnv.executeSql("insert into flinkcdc_hudi_sink select id,name,age,CAST(birthday as STRING) birthday, CAST(ts as STRING) ts from source_mysql ");
System.out.println("--------------------------");
}
}
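The Hudi sink above sets `'write.precombine.field' = 'ts'`. Hudi describes this option as: when two records have the same key, the one with the largest precombine value (by `compareTo`) is picked. The plain-Java sketch below models only that rule for records keyed by `id`; it is not Hudi code, and the class name is hypothetical. Because the view casts `ts` to STRING, the comparison is lexicographic, which follows time order only as long as every value uses the same fixed-width `yyyy-MM-dd HH:mm:ss...` layout.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the rule behind 'write.precombine.field' = 'ts':
// among records with the same key (id), keep the one with the largest ts
// according to compareTo. This is not Hudi's implementation.
public class PrecombineSketch {
    public static final class Rec {
        public final long id;
        public final String ts;    // STRING, as produced by CAST(a.ts AS STRING) in the view
        public final String phone;

        public Rec(long id, String ts, String phone) {
            this.id = id;
            this.ts = ts;
            this.phone = phone;
        }
    }

    // Collapse a batch to one record per id; on equal ts the later record wins in this sketch
    public static Map<Long, Rec> precombine(List<Rec> batch) {
        Map<Long, Rec> byId = new LinkedHashMap<>();
        for (Rec r : batch) {
            Rec kept = byId.get(r.id);
            if (kept == null || r.ts.compareTo(kept.ts) >= 0) {
                byId.put(r.id, r);
            }
        }
        return byId;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
                new Rec(1L, "2022-02-19 22:18:05", "13900000000"),
                new Rec(1L, "2022-02-19 22:18:01", "13800000000"), // older ts for id 1: dropped
                new Rec(2L, "2022-02-19 22:18:02", "13700000000"));
        for (Rec r : precombine(batch).values()) {
            System.out.println(r.id + " " + r.ts + " " + r.phone);
        }
    }
}
```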
2.5 MySQL table schemas
CREATE TABLE `Flink_cdc` (
`id` bigint(64) NOT NULL AUTO_INCREMENT,
`name` varchar(64) DEFAULT NULL,
`age` int(20) DEFAULT NULL,
`birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7604 DEFAULT CHARSET=utf8mb4;
#*********************************************************************************
CREATE TABLE `Flink_cdd` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`phone` varchar(20) DEFAULT NULL,
`address` varchar(200) DEFAULT NULL,
`ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7417 DEFAULT CHARSET=utf8mb4;
2.6 Code that writes test data into MySQL
package com.wudl.hudi.source;
import com.wudl.hudi.entity.FlinkCdcBean;
import com.wudl.hudi.entity.Order;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* @author :wudl
* @date :Created in 2022-02-19 14:24
* @description:
* @modified By:
* @version: 1.0
*/
public class GenerateMysqlFlinkCdcBean implements SourceFunction<FlinkCdcBean> {
// volatile: cancel() is called from a different thread than run()
private volatile boolean isRunning = true;
String[] citys = {"北京", "廣東", "山東", "江蘇", "河南", "上海", "河北", "浙江", "香港", "山西", "陜西", "湖南", "重慶", "福建", "天津", "云南", "四川", "廣西", "安徽", "海南", "江西", "湖北", "山西", "遼寧", "內蒙古"};
Integer i = 0;
List<Order> list = new ArrayList<>();
@Override
public void run(SourceContext<FlinkCdcBean> ctx) throws Exception {
Random random = new Random();
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
while (isRunning) {
int number = random.nextInt(4) + 1;
String name = getChineseName();
String address = citys[random.nextInt(citys.length)];
int age = random.nextInt(25);
String birthday = getDate();
String phone = getTel();
java.sql.Timestamp ts = new java.sql.Timestamp(df.parse(getDate()).getTime());
FlinkCdcBean flinkCdcBean = new FlinkCdcBean(name, age, birthday, ts, phone, address);
ctx.collect(flinkCdcBean);
}
}
/**
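* Returns the current time formatted as "yyyy-MM-dd HH:mm:ss:SSS", then sleeps 10 ms (this throttles the generator)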
*
* @return
*/
public static String getDate() throws InterruptedException {
Calendar calendar = Calendar.getInstance();
Date date = calendar.getTime();
String dataStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(date);
Thread.sleep(10);
return dataStr;
}
public static int getNum(int start, int end) {
return (int) (Math.random() * (end - start + 1) + start);
}
private static String[] telFirst = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");
/**
* Generates a random 11-digit mobile number: a 3-digit prefix followed by two 4-digit groups
*
* @return
*/
private static String getTel() {
int index = getNum(0, telFirst.length - 1);
String first = telFirst[index];
String second = String.valueOf(getNum(1, 888) + 10000).substring(1);
String third = String.valueOf(getNum(1, 9100) + 10000).substring(1);
return first + second + third;
}
@Override
public void cancel() {
isRunning = false;
}
private static String firstName="趙錢孫李周吳鄭王馮陳褚衛蔣沈韓楊朱秦尤許何呂施張孔曹嚴華金魏陶姜戚謝鄒喻柏水竇章云蘇潘葛奚范彭郎魯韋昌馬苗鳳花方俞任袁柳酆鮑史唐費廉岑薛雷賀倪湯滕殷羅畢郝鄔安常樂于時傅皮卞齊康伍余元卜顧孟平黃和穆蕭尹姚邵湛汪祁毛禹狄米貝明臧計伏成戴談宋茅龐熊紀舒屈項祝董梁杜阮藍閔席季麻強賈路婁危江童顏郭梅盛林刁鐘徐邱駱高夏蔡田樊胡凌霍虞萬支柯咎管盧莫經房裘繆干解應宗宣丁賁鄧郁單杭洪包諸左石崔吉鈕龔程嵇邢滑裴陸榮翁荀羊於惠甄魏加封芮羿儲靳汲邴糜松井段富巫烏焦巴弓牧隗山谷車侯宓蓬全郗班仰秋仲伊宮寧仇欒暴甘鈄厲戎祖武符劉姜詹束龍葉幸司韶郜黎薊薄印宿白懷蒲臺從鄂索咸籍賴卓藺屠蒙池喬陰郁胥能蒼雙聞莘黨翟譚貢勞逄姬申扶堵冉宰酈雍卻璩桑桂濮牛壽通邊扈燕冀郟浦尚農溫別莊晏柴瞿閻充慕連茹習宦艾魚容向古易慎戈廖庚終暨居衡步都耿滿弘匡國文寇廣祿闕東毆殳沃利蔚越夔隆師鞏厙聶晁勾敖融冷訾辛闞那簡饒空曾毋沙乜養鞠須豐巢關蒯相查后江紅游竺權逯蓋益桓公萬俟司馬上官歐陽夏侯諸葛聞人東方赫連皇甫尉遲公羊澹臺公冶宗政濮陽淳于仲孫太叔申屠公孫樂正軒轅令狐鐘離閭丘長孫慕容鮮于宇文司徒司空亓官司寇仉督子車顓孫端木巫馬公西漆雕樂正壤駟公良拓拔夾谷宰父谷粱晉楚閻法汝鄢涂欽段干百里東郭南門呼延歸海羊舌微生岳帥緱亢況后有琴梁丘左丘東門西門商牟佘佴伯賞南宮墨哈譙笪年愛陽佟第五言福百家姓續";
private static String girl="秀娟英華慧巧美娜靜淑惠珠翠雅芝玉萍紅娥玲芬芳燕彩春菊蘭鳳潔梅琳素云蓮真環雪榮愛妹霞香月鶯媛艷瑞凡佳嘉瓊勤珍貞莉桂娣葉璧璐婭琦晶妍茜秋珊莎錦黛青倩婷姣婉嫻瑾穎露瑤怡嬋雁蓓紈儀荷丹蓉眉君琴蕊薇菁夢嵐苑婕馨瑗琰韻融園藝詠卿聰瀾純毓悅昭冰爽琬茗羽希寧欣飄育瀅馥筠柔竹靄凝曉歡霄楓蕓菲寒伊亞宜可姬舒影荔枝思麗";
private static String boy="偉剛勇毅俊峰強軍平保東文輝力明永健世廣志義興良海山仁波寧貴福生龍元全國勝學祥才發武新利清飛彬富順信子杰濤昌成康星光天達安巖中茂進林有堅和彪博誠先敬震振壯會思群豪心邦承樂紹功松善厚慶磊民友裕河哲江超浩亮政謙亨奇固之輪翰朗伯宏言若鳴朋斌梁棟維啟克倫翔旭鵬澤晨辰士以建家致樹炎德行時泰盛雄琛鈞冠策騰楠榕風航弘";
/**
* Returns a random Chinese name: a one-character surname plus one or two given-name characters
*/
private static String name_sex = "";
private static String getChineseName() {
int index=getNum(0, firstName.length()-1);
String first=firstName.substring(index, index+1);
int sex=getNum(0,1);
String str=boy;
int length=boy.length();
if(sex==0){
str=girl;
length=girl.length();
name_sex = "女";
}else {
name_sex="男";
}
index=getNum(0,length-1);
String second=str.substring(index, index+1);
int hasThird=getNum(0,1);
String third="";
if(hasThird==1){
index=getNum(0,length-1);
third=str.substring(index, index+1);
}
return first+second+third;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<FlinkCdcBean> addSource = env.addSource(new GenerateMysqlFlinkCdcBean());
addSource.print();
Thread.sleep(5000);
addSource.addSink(new MysqlJdbcSink());
env.execute();
}
}
package com.wudl.hudi.source;
import com.wudl.hudi.entity.FlinkCdcBean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* @author :wudl
* @date :Created in 2022-02-19 15:08
* @description:
* @modified By:
* @version: 1.0
*/
public class MysqlJdbcSink extends RichSinkFunction<FlinkCdcBean> {
// Connection and prepared statements for the two tables
Connection connection = null;
PreparedStatement insertStmtFlink_cdc = null;
PreparedStatement insertStmtFlink_cdd = null;
@Override
public void open(Configuration parameters) throws Exception {
connection = DriverManager.getConnection("jdbc:mysql://192.168.1.162:3306/wudldb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
insertStmtFlink_cdc = connection.prepareStatement("INSERT INTO `wudldb`.`Flink_cdc`(NAME,age,birthday,ts) VALUES(?,?,?,?) ");
insertStmtFlink_cdd = connection.prepareStatement("INSERT INTO `wudldb`.`Flink_cdd` (phone,address,ts) VALUES(?,?,?) ");
}
// Called once per record: insert one row into each table
@Override
public void invoke(FlinkCdcBean fc, Context context) throws Exception {
// Flink_cdc row (note: the birthday column is filled with the ts value)
insertStmtFlink_cdc.setString(1, fc.getName());
insertStmtFlink_cdc.setInt(2, fc.getAge());
insertStmtFlink_cdc.setString(3, fc.getTs().toString());
insertStmtFlink_cdc.setString(4, fc.getTs().toString());
insertStmtFlink_cdc.execute();
insertStmtFlink_cdd.setString(1, fc.getPhone());
insertStmtFlink_cdd.setString(2, fc.getAddress());
insertStmtFlink_cdd.setString(3, fc.getTs().toString());
insertStmtFlink_cdd.executeUpdate();
}
@Override
public void close() throws Exception {
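// Null checks: open() may have failed before all resources were created
if (insertStmtFlink_cdc != null) insertStmtFlink_cdc.close();
if (insertStmtFlink_cdd != null) insertStmtFlink_cdd.close();
if (connection != null) connection.close();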
}
}
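The generator builds `ts` by formatting the current time as `yyyy-MM-dd HH:mm:ss:SSS` and re-parsing it with `yyyy-MM-dd HH:mm:ss`; this works because `SimpleDateFormat.parse` does not have to consume the whole string, so the trailing `:SSS` is ignored. The sink then writes `ts.toString()` into the timestamp columns. Below is a sketch of the same two steps with `java.time`, assuming the intent is a second-precision `Timestamp` plus a millisecond string that MySQL can parse (MySQL only accepts a `.` before fractional seconds). The class and method names are hypothetical.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical java.time version of the generator's time handling.
public class GeneratorTimeSketch {
    // '.' before the milliseconds (the original pattern uses ':'), so MySQL can parse fractional seconds
    private static final DateTimeFormatter MILLIS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Millisecond string, e.g. for a birthday value
    public static String format(LocalDateTime t) {
        return MILLIS.format(t);
    }

    // Keeps second precision, like re-parsing getDate() with "yyyy-MM-dd HH:mm:ss"
    public static Timestamp toSecondTimestamp(LocalDateTime t) {
        return Timestamp.valueOf(t.truncatedTo(ChronoUnit.SECONDS));
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println(format(now));
        // Timestamp.toString() is the form MysqlJdbcSink passes to setString(...)
        System.out.println(toSecondTimestamp(now));
    }
}
```

`DateTimeFormatter` is immutable and thread-safe, unlike `SimpleDateFormat`, so a single static instance can be shared.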
2.7 Reading the Hudi data
package com.wudl.hudi.sink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author :wudl
* @date :Created in 2022-02-19 22:18
* @description:
* @modified By:
* @version: 1.0
*/
public class MysqlJoinMysqlHuDiRead {
public static void main(String[] args) throws Exception {
// 1- Get the stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// This job only reads the Hudi table in streaming mode (polling every 4 s), so no checkpointing is configured
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(
"CREATE TABLE order_hudi(\n" +
" id BIGINT PRIMARY KEY NOT ENFORCED," +
" name STRING," +
" age INT," +
" birthday STRING," +
" phone STRING," +
" address STRING," +
" ts STRING" +
")" +
"WITH (" +
" 'connector' = 'hudi'," +
" 'path' = 'file:///D:/myslqjoinmysqlhudiSink'," +
" 'table.type' = 'MERGE_ON_READ'," +
" 'read.streaming.enabled' = 'true'," +
" 'read.streaming.check-interval' = '4'" +
")"
);
tableEnv.executeSql("select * from order_hudi ").print();
}
}
Running the jar on the cluster
2.8 Sink DDL for the cluster: the Hudi path must point to HDFS
tableEnv.executeSql("CREATE TABLE myslqjoinmysqlhudiSink(\n" +
"id bigint ,\n" +
"name string,\n" +
"age int,\n" +
"birthday STRING,\n" +
"phone STRING,\n" +
"address STRING,\n" +
"ts TIMESTAMP(3),\n" +
"primary key(id) not enforced\n" +
")\n" +
"with(\n" +
"'connector'='hudi',\n" +
"'path'= 'hdfs://192.168.1.161:8020/myslqjoinmysqlhudiSink', \n" +
"'table.type'= 'MERGE_ON_READ',\n" +
"'hoodie.datasource.write.recordkey.field'= 'id', \n" +
"'write.precombine.field'= 'ts',\n" +
"'write.tasks'= '1',\n" +
"'write.rate.limit'= '2000', \n" +
"'compaction.tasks'= '1', \n" +
"'compaction.async.enabled'= 'true',\n" +
"'compaction.trigger.strategy'= 'num_commits',\n" +
"'compaction.delta_commits'= '1',\n" +
"'changelog.enabled'= 'true',\n" +
"'read.streaming.enabled'= 'true',\n" +
"'read.streaming.check-interval'= '3',\n" +
"'hive_sync.enable'= 'true',\n" +
"'hive_sync.mode'= 'hms',\n" +
"'hive_sync.metastore.uris'= 'thrift://node02.com:9083',\n" +
"'hive_sync.jdbc_url'= 'jdbc:hive2://node02.com:10000',\n" +
"'hive_sync.table'= 'myslqjoinmysqlhudiSink',\n" +
"'hive_sync.db'= 'db_hive',\n" +
"'hive_sync.username'= 'root',\n" +
"'hive_sync.password'= '123456',\n" +
"'hive_sync.support_timestamp'= 'true'\n" +
")");
2.9 Submitting from the command line
[root@node01 bin]# ./flink run -m 192.168.1.161:8081 -c com.wudl.hudi.sink.MysqlJoinMysqlHuDi /opt/module/jar/flink13.5-hudi-1.0-SNAPSHOT-jar-with-dependencies.jar
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Job has been submitted with JobID 225aba756224502aa9e643d75560ddb9
Job has been submitted with JobID 63a6b9e2bb697a4ce0c3f993c720b534
--------------------------
[root@node01 bin]#
Flink web UI
flink-houtai.png
Result
flink-提交集群cdc-hudi.png