Real-Time Data Warehouse with Flink and a Data Lake (Hudi)

1. Architecture

(Figure: flink-數(shù)倉.png — overall architecture)

2. Implementation Example

2.1 Join two Flink CDC tables into one view, then write the result to both the data lake (Hudi) and Kafka

(Figure: flink-hudishili.png — example data flow)

2.2 Approach

1. Create the Flink CDC source tables in Flink SQL.
2. Create a view over the join of the two tables, exposing only the columns that are needed.
3. Create the output table backed by Hudi, with automatic sync to a Hive table.
4. Query the view and insert the result into the output table; this statement runs continuously as a Flink job in the background (a condensed skeleton follows, and the full program is in section 2.4).
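
A minimal sketch of those four steps (the class name MysqlJoinHudiSkeleton is ours; connection values are copied from the full program in section 2.4, which also adds checkpoint tuning and the Kafka sink):

package com.wudl.hudi.sink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlJoinHudiSkeleton {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000L); // Hudi commits data on checkpoints
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Step 1: one Flink CDC source table per MySQL table
        tableEnv.executeSql("CREATE TABLE source_mysql (id BIGINT PRIMARY KEY NOT ENFORCED, name STRING, age INT, birthday TIMESTAMP(3), ts TIMESTAMP(3)) " +
                "WITH ('connector'='mysql-cdc','hostname'='192.168.1.162','port'='3306','username'='root','password'='123456','database-name'='wudldb','table-name'='Flink_cdc')");
        tableEnv.executeSql("CREATE TABLE source_mysql_Flink_cdd (id BIGINT PRIMARY KEY NOT ENFORCED, phone STRING, address STRING, ts TIMESTAMP(3)) " +
                "WITH ('connector'='mysql-cdc','hostname'='192.168.1.162','port'='3306','username'='root','password'='123456','database-name'='wudldb','table-name'='Flink_cdd')");

        // Step 2: register the join result as a view
        tableEnv.createTemporaryView("viewFlinkCdc", tableEnv.sqlQuery(
                "SELECT b.id, b.name, b.age, CAST(b.birthday AS STRING) birthday, a.phone, a.address, CAST(a.ts AS STRING) ts " +
                "FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id"));

        // Step 3: Hudi-backed output table
        tableEnv.executeSql("CREATE TABLE hudi_sink (id BIGINT PRIMARY KEY NOT ENFORCED, name STRING, age INT, birthday STRING, phone STRING, address STRING, ts STRING) " +
                "WITH ('connector'='hudi','path'='file:///D:/myslqjoinmysqlhudiSink','table.type'='MERGE_ON_READ','write.precombine.field'='ts')");

        // Step 4: continuous INSERT; executeSql submits the streaming job
        tableEnv.executeSql("INSERT INTO hudi_sink SELECT * FROM viewFlinkCdc");
    }
}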

2.3 Maven dependencies (pom.xml)

The maven-assembly-plugin configured below builds the jar-with-dependencies that is submitted to the cluster in section 2.9.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>wudl-hudi</artifactId>
        <groupId>wudl-hudi</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink13.5-hudi</artifactId>


    <!-- Repositories, in order: aliyun, apache, cloudera (plus spring-plugin) -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>

        <repository>
            <id>spring-plugin</id>
            <url>https://repo.spring.io/plugins-release/</url>
        </repository>

    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.13.5</flink.version>
        <hadoop.version>2.7.3</hadoop.version>
        <mysql.version>8.0.16</mysql.version>
        <flink-mysql-cdc>2.0.2</flink-mysql-cdc>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-mysql-cdc}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>


        <!-- Flink Client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Flink Table API & SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
            <version>0.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <!-- MySQL/FastJson/lombok -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <!-- slf4j & log4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2.4 Code implementation

package com.wudl.hudi.sink;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author :wudl
 * @date :Created in 2022-02-19 22:18
 * @description:
 * @modified By:
 * @version: 1.0
 */

public class MysqlJoinMysqlHuDi {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream execution environment and the table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 Enable checkpointing: Hudi commits incremental writes on Flink
        // checkpoints, so the job must have them enabled
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Retain the latest checkpoint when the job is cancelled normally
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // State backend on HDFS
        env.setStateBackend(new FsStateBackend("hdfs://192.168.1.161:8020/flink-hudi/ck"));
        // User name used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "root");

        // 2. Create the input tables, sourced from MySQL via Flink CDC
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS   source_mysql ( " +
                        "  id BIGINT  primary key NOT ENFORCED ," +
                        "  name string," +
                        "  age int ," +
                        "  birthday TIMESTAMP(3)," +
                        "  ts TIMESTAMP(3)" +
                        ") WITH ( " +
                        " 'connector' = 'mysql-cdc', " +
                        " 'hostname' = '192.168.1.162', " +
                        " 'port' = '3306', " +
                        " 'username' = 'root', " +
                        " 'password' = '123456', " +
                        " 'server-time-zone' = 'Asia/Shanghai', " +
                        " 'scan.startup.mode' = 'initial', " +
                        " 'database-name' = 'wudldb', " +
                        " 'table-name' = 'Flink_cdc' " +
                        " )");

        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS   source_mysql_Flink_cdd ( " +
                        "  id BIGINT  primary key NOT ENFORCED ," +
                        "  phone string," +
                        "  address string ," +
                        "  ts TIMESTAMP(3)" +
                        ") WITH ( " +
                        " 'connector' = 'mysql-cdc', " +
                        " 'hostname' = '192.168.1.162', " +
                        " 'port' = '3306', " +
                        " 'username' = 'root', " +
                        " 'password' = '123456', " +
                        " 'server-time-zone' = 'Asia/Shanghai', " +
                        " 'scan.startup.mode' = 'initial', " +
                        " 'database-name' = 'wudldb', " +
                        " 'table-name' = 'Flink_cdd' " +
                        " )");
        // 3. Join the two CDC tables on id
        String joinSql = "SELECT b.id id, b.name name, b.age age, CAST(b.birthday AS STRING) birthday, a.phone phone, a.address address, CAST(a.ts AS STRING) ts " +
                "FROM source_mysql_Flink_cdd a INNER JOIN source_mysql b ON a.id = b.id";
        Table tableMysqlJoin = tableEnv.sqlQuery(joinSql);


        // 4. Create the output tables: a Hudi table (table name, storage path,
        // fields, etc.) and a Kafka table; register the join result as a view first
        tableEnv.createTemporaryView("viewFlinkCdc", tableMysqlJoin);
        tableEnv.executeSql(
                "CREATE TABLE myslqjoinmysqlhudiSink (" +
                        " id BIGINT PRIMARY KEY NOT ENFORCED," +
                        " name STRING," +
                        " age INT," +
                        " birthday STRING," +
                        " phone STRING," +
                        " address STRING," +
                        " ts STRING" +
                        ")" +
                        "WITH (" +
                        " 'connector' = 'hudi'," +
                        " 'path' = 'file:///D:/myslqjoinmysqlhudiSink'," +
//                      " 'path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/order_hudi_sink' ,\n" +
                        " 'table.type' = 'MERGE_ON_READ'," +
                        " 'write.operation' = 'upsert'," +
                        " 'hoodie.datasource.write.recordkey.field'= 'id'," +
                        " 'write.precombine.field' = 'ts'," +
                        " 'write.tasks'= '1'" +
                        ")"
        );
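        // 'write.precombine.field' picks the winning row when two records share the
        // same record key in a batch; MERGE_ON_READ appends row-based log files that
        // are compacted into columnar base files asynchronously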
        TableResult kafkaSink = tableEnv.executeSql(
                "CREATE TABLE flinkCdc_kafka_Sink (" +
                        "  id BIGINT NOT NULL," +
                        "  name STRING," +
                        "  age INT," +
                        "  birthday STRING," +
                        "  phone STRING," +
                        "  address STRING," +
                        "  ts STRING" +
                        ") WITH (" +
                        "  'connector' = 'kafka'," +
                        "  'topic' = 'sinktest'," +
                        "  'scan.startup.mode' = 'earliest-offset', "+
                        "  'properties.bootstrap.servers' = '192.168.1.161:6667'," +
                        "  'format' = 'debezium-json'," +
                        "    'debezium-json.ignore-parse-errors'='true' " +
                        ")"
        );
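        // With 'format' = 'debezium-json', each changelog row is wrapped in a
        // Debezium-style envelope, schematically:
        //   {"before":null,"after":{"id":1,"name":"...","age":20,...},"op":"c"}
        // inserts carry op "c", deletes "d", and updates become a "d"/"c" pair;
        // 'scan.startup.mode' only applies if this table is later read as a source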

        // 5. Write the joined view into the Hudi output table
        tableEnv.executeSql(
                "INSERT INTO myslqjoinmysqlhudiSink " +
                        "SELECT id,name,age,birthday,phone,address, ts FROM viewFlinkCdc"
        );


            tableEnv.sqlQuery("select * from flinkCdc_kafka_Sink").printSchema();
            tableEnv.sqlQuery("select * from viewFlinkCdc").printSchema();


        tableEnv.executeSql("insert into flinkCdc_kafka_Sink  SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd  a INNER JOIN   source_mysql b ON a.id = b.id ");
      //  tableEnv.executeSql("insert into myslqjoinmysqlhudiSink  SELECT b.id id,b.name name,b.age age,CAST(b.birthday as STRING) birthday ,a.phone phone,a.address address,CAST(a.ts AS STRING) ts FROM source_mysql_Flink_cdd  a INNER JOIN   source_mysql b ON a.id = b.id ");
     //   tableEnv.executeSql("insert into flinkCdc_kafka_Sink  select id,name,age,CAST(birthday as STRING) birthday ,phone, address,CAST(ts AS STRING) ts  from myslqjoinmysqlhudiSink ");

//        tableEnv.executeSql("insert into flinkcdc_hudi_sink  select id,name,age,CAST(birthday as STRING) birthday,  CAST(ts as STRING) ts  from source_mysql ");
        System.out.println("--------------------------");


    }
}

2.5 MySQL table definitions

CREATE TABLE `Flink_cdc` (
  `id` bigint(64) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7604 DEFAULT CHARSET=utf8mb4
#*********************************************************************************
CREATE TABLE `Flink_cdd` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `phone` varchar(20) DEFAULT NULL,
  `address` varchar(200) DEFAULT NULL,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7417 DEFAULT CHARSET=utf8mb4

2.6 MySQL test-data generator

The generator below produces random rows and writes them into the two MySQL tables through the JDBC sink that follows it; a sketch of the FlinkCdcBean entity both classes depend on is shown between the two classes.

package com.wudl.hudi.source;

import com.wudl.hudi.entity.FlinkCdcBean;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @author :wudl
 * @date :Created in 2022-02-19 14:24
 * @description:
 * @modified By:
 * @version: 1.0
 */

public class GenerateMysqlFlinkCdcBean implements SourceFunction<FlinkCdcBean> {

    private boolean isRunning = true;
    String[] citys = {"北京", "廣東", "山東", "江蘇", "河南", "上海", "河北", "浙江", "香港", "山西", "陜西", "湖南", "重慶", "福建", "天津", "云南", "四川", "廣西", "安徽", "海南", "江西", "湖北", "山西", "遼寧", "內(nèi)蒙古"};

    @Override
    public void run(SourceContext<FlinkCdcBean> ctx) throws Exception {
        Random random = new Random();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        while (isRunning) {
            String name = getChineseName();

            String address = citys[random.nextInt(citys.length)];
            int age = random.nextInt(25);
            String birthday = getDate();
            String phone = getTel();
            java.sql.Timestamp ts = new java.sql.Timestamp(df.parse(getDate()).getTime());
            FlinkCdcBean flinkCdcBean = new FlinkCdcBean(name, age, birthday, ts, phone, address);
            ctx.collect(flinkCdcBean);

        }
    }

    /**
     * Get the current time as a formatted string
     *
     * @return timestamp formatted as yyyy-MM-dd HH:mm:ss:SSS
     */
    public static String getDate() throws InterruptedException {
        Calendar calendar = Calendar.getInstance();
        Date date = calendar.getTime();
        String dataStr = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(date);
        Thread.sleep(10);
        return dataStr;
    }

    public static int getNum(int start, int end) {
        return (int) (Math.random() * (end - start + 1) + start);
    }

    private static String[] telFirst = "134,135,136,137,138,139,150,151,152,157,158,159,130,131,132,155,156,133,153".split(",");

    /**
     * Generate a random 11-digit mobile phone number
     *
     * @return phone number string
     */
    private static String getTel() {
        int index = getNum(0, telFirst.length - 1);
        String first = telFirst[index];
        String second = String.valueOf(getNum(1, 888) + 10000).substring(1);
        String third = String.valueOf(getNum(1, 9100) + 10000).substring(1);
        return first + second + third;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }


    private static String firstName="趙錢孫李周吳鄭王馮陳褚衛(wèi)蔣沈韓楊朱秦尤許何呂施張孔曹嚴(yán)華金魏陶姜戚謝鄒喻柏水竇章云蘇潘葛奚范彭郎魯韋昌馬苗鳳花方俞任袁柳酆鮑史唐費(fèi)廉岑薛雷賀倪湯滕殷羅畢郝鄔安常樂于時傅皮卞齊康伍余元卜顧孟平黃和穆蕭尹姚邵湛汪祁毛禹狄米貝明臧計伏成戴談宋茅龐熊紀(jì)舒屈項(xiàng)祝董梁杜阮藍(lán)閔席季麻強(qiáng)賈路婁危江童顏郭梅盛林刁鐘徐邱駱高夏蔡田樊胡凌霍虞萬支柯咎管盧莫經(jīng)房裘繆干解應(yīng)宗宣丁賁鄧郁單杭洪包諸左石崔吉鈕龔程嵇邢滑裴陸榮翁荀羊於惠甄魏加封芮羿儲靳汲邴糜松井段富巫烏焦巴弓牧隗山谷車侯宓蓬全郗班仰秋仲伊宮寧仇欒暴甘鈄厲戎祖武符劉姜詹束龍葉幸司韶郜黎薊薄印宿白懷蒲臺從鄂索咸籍賴卓藺屠蒙池喬陰郁胥能蒼雙聞莘黨翟譚貢勞逄姬申扶堵冉宰酈雍卻璩桑桂濮牛壽通邊扈燕冀郟浦尚農(nóng)溫別莊晏柴瞿閻充慕連茹習(xí)宦艾魚容向古易慎戈廖庚終暨居衡步都耿滿弘匡國文寇廣祿闕東毆殳沃利蔚越夔隆師鞏厙聶晁勾敖融冷訾辛闞那簡饒空曾毋沙乜養(yǎng)鞠須豐巢關(guān)蒯相查后江紅游竺權(quán)逯蓋益桓公萬俟司馬上官歐陽夏侯諸葛聞人東方赫連皇甫尉遲公羊澹臺公冶宗政濮陽淳于仲孫太叔申屠公孫樂正軒轅令狐鐘離閭丘長孫慕容鮮于宇文司徒司空亓官司寇仉督子車顓孫端木巫馬公西漆雕樂正壤駟公良拓拔夾谷宰父谷粱晉楚閻法汝鄢涂欽段干百里東郭南門呼延歸海羊舌微生岳帥緱亢況后有琴梁丘左丘東門西門商牟佘佴伯賞南宮墨哈譙笪年愛陽佟第五言福百家姓續(xù)";
    private static String girl="秀娟英華慧巧美娜靜淑惠珠翠雅芝玉萍紅娥玲芬芳燕彩春菊蘭鳳潔梅琳素云蓮真環(huán)雪榮愛妹霞香月鶯媛艷瑞凡佳嘉瓊勤珍貞莉桂娣葉璧璐婭琦晶妍茜秋珊莎錦黛青倩婷姣婉嫻瑾穎露瑤怡嬋雁蓓紈儀荷丹蓉眉君琴蕊薇菁夢嵐苑婕馨瑗琰韻融園藝詠卿聰瀾純毓悅昭冰爽琬茗羽希寧欣飄育瀅馥筠柔竹靄凝曉歡霄楓蕓菲寒伊亞宜可姬舒影荔枝思麗 ";
    private static String boy="偉剛勇毅俊峰強(qiáng)軍平保東文輝力明永健世廣志義興良海山仁波寧貴福生龍?jiān)珖鴦賹W(xué)祥才發(fā)武新利清飛彬富順信子杰濤昌成康星光天達(dá)安巖中茂進(jìn)林有堅(jiān)和彪博誠先敬震振壯會思群豪心邦承樂紹功松善厚慶磊民友裕河哲江超浩亮政謙亨奇固之輪翰朗伯宏言若鳴朋斌梁棟維啟克倫翔旭鵬澤晨辰士以建家致樹炎德行時泰盛雄琛鈞冠策騰楠榕風(fēng)航弘";
    /**
     * Return a random Chinese name
     */
    private static String name_sex = "";
    private static String getChineseName() {
        int index=getNum(0, firstName.length()-1);
        String first=firstName.substring(index, index+1);
        int sex=getNum(0,1);
        String str=boy;
        int length=boy.length();
        if(sex==0){
            str=girl;
            length=girl.length();
            name_sex = "女";
        }else {
            name_sex="男";
        }
        index=getNum(0,length-1);
        String second=str.substring(index, index+1);
        int hasThird=getNum(0,1);
        String third="";
        if(hasThird==1){
            index=getNum(0,length-1);
            third=str.substring(index, index+1);
        }
        return first+second+third;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<FlinkCdcBean> addSource = env.addSource(new GenerateMysqlFlinkCdcBean());
        addSource.print();
        // Write the generated records into the two MySQL tables
        addSource.addSink(new MysqlJdbcSink());
        env.execute();
    }

}
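
The FlinkCdcBean entity used by both classes is not shown in the post. Below is a minimal Lombok-based sketch, with field names, order, and types inferred from the constructor call new FlinkCdcBean(name, age, birthday, ts, phone, address) and the getters used by the JDBC sink; a hypothetical reconstruction, not the author's original:

package com.wudl.hudi.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.sql.Timestamp;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class FlinkCdcBean {
    private String name;       // Flink_cdc.name
    private int age;           // Flink_cdc.age
    private String birthday;   // Flink_cdc.birthday, formatted string
    private Timestamp ts;      // event-time column written to both tables
    private String phone;      // Flink_cdd.phone
    private String address;    // Flink_cdd.address
}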

package com.wudl.hudi.source;

import com.wudl.hudi.entity.FlinkCdcBean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @author :wudl
 * @date :Created in 2022-02-19 15:08
 * @description:
 * @modified By:
 * @version: 1.0
 */

public class MysqlJdbcSink extends RichSinkFunction<FlinkCdcBean> {
    // Connection and one prepared statement per target table
    Connection connection = null;
    PreparedStatement insertStmtFlink_cdc = null;
    PreparedStatement insertStmtFlink_cdd = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:mysql://192.168.1.162:3306/wudldb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai", "root", "123456");
        insertStmtFlink_cdc = connection.prepareStatement("INSERT INTO `wudldb`.`Flink_cdc`(NAME,age,birthday,ts) VALUES(?,?,?,?) ");
        insertStmtFlink_cdd = connection.prepareStatement("INSERT INTO  `wudldb`.`Flink_cdd` (phone,address,ts) VALUES(?,?,?)  ");

    }

    // For each incoming record, run both prepared inserts
    @Override
    public void invoke(FlinkCdcBean fc, Context context) throws Exception {
        // Flink_cdc columns: (name, age, birthday, ts)
        insertStmtFlink_cdc.setString(1, fc.getName());
        insertStmtFlink_cdc.setInt(2, fc.getAge());
        insertStmtFlink_cdc.setString(3, fc.getBirthday());
        insertStmtFlink_cdc.setString(4, fc.getTs().toString());
        insertStmtFlink_cdc.execute();

        // Flink_cdd columns: (phone, address, ts)
        insertStmtFlink_cdd.setString(1, fc.getPhone());
        insertStmtFlink_cdd.setString(2, fc.getAddress());
        insertStmtFlink_cdd.setString(3, fc.getTs().toString());
        insertStmtFlink_cdd.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        insertStmtFlink_cdc.close();
        insertStmtFlink_cdd.close();
        connection.close();
    }
}

2.7 Reading data from Hudi

package com.wudl.hudi.sink;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author :wudl
 * @date :Created in 2022-02-19 22:18
 * @description:
 * @modified By:
 * @version: 1.0
 */

public class MysqlJoinMysqlHuDiRead {
    public static void main(String[] args) throws Exception {
        // Get the stream execution environment and the table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(
                "CREATE TABLE order_hudi(\n" +
                        " id BIGINT PRIMARY KEY NOT ENFORCED," +
                        " name STRING," +
                        " age INT," +
                        " birthday STRING," +
                        " phone STRING," +
                        " address STRING," +
                        " ts STRING" +
                        ")" +
                        "WITH (" +
                        "    'connector' = 'hudi'," +
                        "    'path' = 'file:///D:/myslqjoinmysqlhudiSink'," +
                        "    'table.type' = 'MERGE_ON_READ'," +
                        "    'read.streaming.enabled' = 'true'," +
                        "    'read.streaming.check-interval' = '4'" +
                        ")"
        );
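        // 'read.streaming.enabled' turns this source into a continuous incremental
        // read: every 'read.streaming.check-interval' seconds (4 here) Flink polls
        // the Hudi timeline for new commits, so the query below never terminates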
        tableEnv.executeSql("select * from  order_hudi ").print();


    }
}

(Figure: streaming query output after running the jar)

2.8 Changes needed when submitting to the cluster

On the cluster, the Hudi table path moves from the local file system to HDFS, and Hive sync options are added so the table is registered in Hive automatically:

 tableEnv.executeSql("CREATE TABLE myslqjoinmysqlhudiSink(\n" +
                "id bigint ,\n" +
                "name string,\n" +
                "age int,\n" +
                "birthday STRING,\n" +
                "phone STRING,\n" +
                "address STRING,\n" +
                "ts TIMESTAMP(3),\n" +
                "primary key(id) not enforced\n" +
                ")\n" +
                "with(\n" +
                "'connector'='hudi',\n" +
                "'path'= 'hdfs://192.168.1.161:8020/myslqjoinmysqlhudiSink', \n" +
                "'table.type'= 'MERGE_ON_READ',\n" +
                "'hoodie.datasource.write.recordkey.field'= 'id', \n" +
                "'write.precombine.field'= 'ts',\n" +
                "'write.tasks'= '1',\n" +
                "'write.rate.limit'= '2000', \n" +
                "'compaction.tasks'= '1', \n" +
                "'compaction.async.enabled'= 'true',\n" +
                "'compaction.trigger.strategy'= 'num_commits',\n" +
                "'compaction.delta_commits'= '1',\n" +
                "'changelog.enabled'= 'true',\n" +
                "'read.streaming.enabled'= 'true',\n" +
                "'read.streaming.check-interval'= '3',\n" +
                "'hive_sync.enable'= 'true',\n" +
                "'hive_sync.mode'= 'hms',\n" +
                "'hive_sync.metastore.uris'= 'thrift://node02.com:9083',\n" +
                "'hive_sync.jdbc_url'= 'jdbc:hive2://node02.com:10000',\n" +
                "'hive_sync.table'= 'myslqjoinmysqlhudiSink',\n" +
                "'hive_sync.db'= 'db_hive',\n" +
                "'hive_sync.username'= 'root',\n" +
                "'hive_sync.password'= '123456',\n" +
                "'hive_sync.support_timestamp'= 'true'\n" +
                ")");

2.9 Submitting the job from the command line

Two JobIDs appear below because the program issues two separate INSERT INTO statements (one into Hudi, one into Kafka), each submitted as its own job:

[root@node01 bin]# ./flink run -m 192.168.1.161:8081 -c com.wudl.hudi.sink.MysqlJoinMysqlHuDi /opt/module/jar/flink13.5-hudi-1.0-SNAPSHOT-jar-with-dependencies.jar 
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Job has been submitted with JobID 225aba756224502aa9e643d75560ddb9
Job has been submitted with JobID 63a6b9e2bb697a4ce0c3f993c720b534
--------------------------
[root@node01 bin]# 

Flink Web UI

(Figure: flink-houtai.png — the jobs as shown in the Flink Web UI)

Result

(Figure: flink-提交集群cdc-hudi.png — query output after cluster submission)