(6)FlinkSQL將kafka數(shù)據寫入到mysql方式一

1.png

這里不展開zookeeper帐萎、kafka安裝配置
(1)首先需要啟動zookeeper和kafka


1.1.png

(2)定義一個kafka生產者

package com.producers;

import com.alibaba.fastjson.JSONObject;
import com.pojo.Event;
import com.pojo.WaterSensor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

/**
 * Created by lj on 2022-07-09.
 */
public class Kafaka_Producer {
    public final static String bootstrapServers = "127.0.0.1:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        //設置Kafka服務器地址
        props.put("bootstrap.servers", bootstrapServers);
        //設置數(shù)據key的序列化處理類
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //設置數(shù)據value的序列化處理類
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
            int i = 0;
            Random r=new Random();   //不傳入種子
            String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};

            while(true) {
                Thread.sleep(2000);
                WaterSensor waterSensor = new WaterSensor(lang[r.nextInt(lang.length)],i,i);
                i++;

                String msg = JSONObject.toJSONString(waterSensor);
                System.out.println(msg);
                RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("kafka_data_waterSensor", null, null,  msg)).get();
//                System.out.println("recordMetadata: {"+ recordMetadata +"}");
            }

        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

(3)定義一個消息對象

package com.pojo;

import java.io.Serializable;

/**
 * Created by lj on 2022-07-05.
 */
public class WaterSensor implements Serializable {
    private String id;
    private long ts;
    private int vc;

    public WaterSensor(){

    }

    public WaterSensor(String id,long ts,int vc){
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
        return vc;
    }

    public void setVc(int vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }
}

(4)從kafka接入數(shù)據叫潦,并寫入到mysql

public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //讀取kafka的數(shù)據
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> streamSource = env.addSource(
                new FlinkKafkaConsumer<String>(
                        "kafka_waterSensor",
                        new SimpleStringSchema(),
                        properties)
        );

        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                JSONObject json  = (JSONObject)JSONObject.parse(s);
                return new WaterSensor(json.getString("id"),json.getLong("ts"),json.getInteger("vc"));
            }
        });

        // 將流轉化為表
        Table table = tableEnv.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                $("pt").proctime());

        tableEnv.createTemporaryView("EventTable", table);


        tableEnv.executeSql("CREATE TABLE flinksink (" +
                "componentname STRING," +
                "componentcount BIGINT NOT NULL," +
                "componentsum BIGINT" +
                ") WITH (" +
                "'connector.type' = 'jdbc'," +
                "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
                "'connector.table' = 'flinksink'," +
                "'connector.driver' =  'com.mysql.cj.jdbc.Driver'," +
                "'connector.username' = 'root'," +
                "'connector.password' = 'root'," +
                "'connector.write.flush.max-rows'='3'\r\n" +
                ")"
        );
        Table mysql_user = tableEnv.from("flinksink");
        mysql_user.printSchema();

        Table result = tableEnv.sqlQuery(
                "SELECT " +
                        "id as componentname, " +                //window_start, window_end,
                        "COUNT(ts) as componentcount ,SUM(ts) as componentsum " +
                        "FROM TABLE( " +
                        "TUMBLE( TABLE EventTable , " +
                        "DESCRIPTOR(pt), " +
                        "INTERVAL '10' SECOND)) " +
                        "GROUP BY id , window_start, window_end"
        );

        //方式一:寫入數(shù)據庫
//        result.executeInsert("flinksink").print(); //;.insertInto("flinksink");

        //方式二:寫入數(shù)據庫
        tableEnv.createTemporaryView("ResultTable", result);
        tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();

//        tableEnv.toAppendStream(result, Row.class).print("toAppendStream");           //追加模式
        env.execute();

    }

(5)效果演示


2.png
3.png
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末绘雁,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子胚鸯,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件伙菊,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機镜硕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進店門运翼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人兴枯,你說我怎么就攤上這事血淌。” “怎么了财剖?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵江兢,是天一觀的道長。 經常有香客問我健蕊,道長程帕,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任瞳氓,我火速辦了婚禮策彤,結果婚禮上,老公的妹妹穿的比我還像新娘匣摘。我一直安慰自己店诗,他們只是感情好,可當我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布音榜。 她就那樣靜靜地躺著庞瘸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪赠叼。 梳的紋絲不亂的頭發(fā)上擦囊,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天,我揣著相機與錄音嘴办,去河邊找鬼瞬场。 笑死,一個胖子當著我的面吹牛涧郊,可吹牛的內容都是我干的贯被。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼妆艘,長吁一口氣:“原來是場噩夢啊……” “哼彤灶!你這毒婦竟也來了?” 一聲冷哼從身側響起批旺,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤幌陕,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后汽煮,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體搏熄,經...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡茅诱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了搬卒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瑟俭。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖契邀,靈堂內的尸體忽然破棺而出摆寄,到底是詐尸還是另有隱情,我是刑警寧澤坯门,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布微饥,位于F島的核電站,受9級特大地震影響古戴,放射性物質發(fā)生泄漏欠橘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一现恼、第九天 我趴在偏房一處隱蔽的房頂上張望肃续。 院中可真熱鬧,春花似錦叉袍、人聲如沸始锚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瞧捌。三九已至,卻和暖如春润文,著一層夾襖步出監(jiān)牢的瞬間姐呐,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工典蝌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留曙砂,地道東北人。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓赠法,卻偏偏與公主長得像麦轰,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子砖织,可洞房花燭夜當晚...
    茶點故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內容