11-flink讀寫MySQL

一、讀MySQL

1、通過(guò)JDBC方式定義MySQLDataSource類

1.1首先加入JDBC依賴
1.2定義JDBCInputFormat
1.3獲取Row類型的DataStreamSource
1.4轉(zhuǎn)化DataStream<Row>為DataStream<Student>

public class MysqlDataSource {

    private static final Logger log = LoggerFactory.getLogger(MySQLDataSource.class);

    public static DataStream<Student> readFromDb(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        //final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.定義field 類型
        TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
        //2.定義field name
        String[] fieldNames = new String[]{"id", "name", "password", "age"};
        //3.定義Row類型
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
        
        String jdbcUrl = parameterTool.get(PropertiesConstants.MYSQL_JDBC_URL);
        log.info(jdbcUrl);

        //4.定義JDBCInputFormat
        JDBCInputFormat jdbcInputFormat = JDBCInputFormat
                .buildJDBCInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl(jdbcUrl)
                .setUsername(parameterTool.get(PropertiesConstants.MYSQL_USERNAME))
                .setPassword(parameterTool.get(PropertiesConstants.MYSQL_PASSWORD))
                .setQuery("select id, name, password, age from student")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        //5.以JDBCInputFormat形式讀取MySQL DB數(shù)據(jù)
        DataStreamSource<Row> dataStreamSourceRow = streamExecutionEnvironment.createInput(jdbcInputFormat);

        //階段性驗(yàn)證可以正確讀取
        dataStreamSourceRow.print();

        //6.將Row類型Stream轉(zhuǎn)化為Entity類型
        DataStream<Student> dataStream = dataStreamSourceRow.map(new RichMapFunction<Row, Student>() {
            @Override
            public Student map(Row value) throws Exception {
                Student s = new Student();
                s.setId((Integer) value.getField(0));
                s.setName((String) value.getField(1));
                s.setPassword((String) value.getField(2));
                s.setAge((Integer) value.getField(3));
                return s;
            }
        });

        log.info("read datasource end");
        return dataStream;
}

2鹅很、通過(guò)自定義DataSource方式

  • 實(shí)現(xiàn)RichSourceFunction<T>接口,T設(shè)置DataStream數(shù)據(jù)類型
  • 使用模板
    • open()方法初始化全局使用數(shù)據(jù)(比如PrepareStatement等,可類比構(gòu)造函數(shù)或者junit的的@Before這些)
    • run()方法
      • 一般使用while循環(huán)不斷獲取數(shù)據(jù)
      • while獲取的數(shù)據(jù)需要以流的形式發(fā)送出去麸锉,使用SourceContext.collect(yourData)就好
      • 這里sourceContext收集(collect)的數(shù)據(jù)可以是單條(一條Student)也可是List<Student>集合流昏,使用集合要把RichSourceFunction<T>泛型設(shè)為L(zhǎng)ist<Student>
    • cancel()方法用于停止while循環(huán),即停止獲取數(shù)據(jù)
/**
 * 通過(guò)RichSourceFunction 返回DataStream<Student>類型數(shù)據(jù)流鬓梅,且每隔10s讀取一次MySQL DB
 */
public class JdbcReader2 extends RichSourceFunction<Student> {
    private static final Logger logger = LoggerFactory.getLogger(JdbcReader2.class);

    private Connection connection = null;
    private PreparedStatement ps = null;
    private volatile boolean isRunning = true;

    //該方法主要用于打開數(shù)據(jù)庫(kù)連接供置,下面的ConfigKeys類是獲取配置的類
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        DriverManager.registerDriver(new Driver());
        ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;
        String jdbcUrl = parameterTool.get(PropertiesConstants.MYSQL_JDBC_URL);
        connection = DriverManager.getConnection(jdbcUrl, "root", "abc123456");//獲取連接
        ps = connection.prepareStatement("select id, name, password, age, flag from student where flag='true'");
    }

    //執(zhí)行查詢并獲取結(jié)果
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {

        // List<Student> students = new ArrayList<>();

        try {
            while (isRunning) {
                ResultSet resultSet = ps.executeQuery();
                while (resultSet.next()) {

                    Student student = new Student();
                    student.setId(resultSet.getInt(1));
                    student.setName(resultSet.getString(2));
                    student.setPassword(resultSet.getString(3));
                    student.setAge(resultSet.getInt(4));

                    String flag = resultSet.getString(5);
                    student.setFlag(flag);

                    if (Boolean.parseBoolean(flag)) {
                        //students.add(student);
                        //以單個(gè)Student為單位發(fā)送數(shù)據(jù)
                        ctx.collect(student);//發(fā)送結(jié)果
                        logger.info("student >>>>>>" + student);
                    }
                }

                Thread.sleep(1000 * 5);
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
    }

    //關(guān)閉數(shù)據(jù)庫(kù)連接
    @Override
    public void cancel() {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
        isRunning = false;
    }
}

3、兩種方式對(duì)比

  • JDBC需要引入單獨(dú)的依賴绽快,自定義DataSource方式無(wú)特殊依賴
  • JDBC不論讀還是寫只能處理批數(shù)據(jù)芥丧,RichSourceFunction還是付接口SourceFunction都是流式接口

二、寫MySQL

1坊罢、通過(guò)JDBC方式

Table API提供通過(guò)JDBC寫MySQL的方式

  • 獲取Table(可以通過(guò)DataStream轉(zhuǎn)化而來(lái))-table
  • 將table注冊(cè)到Environment(作為臨時(shí)view)-tempView
  • 創(chuàng)建inner-dest-table->out-dest-table映射(inner-dest-table是flink內(nèi)部表续担,通過(guò)insert數(shù)據(jù)到inner-dest-table 將數(shù)據(jù)插入到out-dest-table中
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //1.添加數(shù)據(jù)源
        DataStream<Student> studentDataStream = env.addSource(new JdbcReader2());


        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                //.useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, settings);

        //2.從DataStream獲取數(shù)據(jù)
        Table table = streamTableEnvironment.fromDataStream(studentDataStream);

        streamTableEnvironment.createTemporaryView("temp_table", table);

        //3.創(chuàng)建sink內(nèi)部Table
        String destSql = FileUtil.readSourceFile("destination.sql");
        streamTableEnvironment.sqlUpdate(destSql);

        //4.將內(nèi)部Table插入到outer system
        String insertSql = FileUtil.readSourceFile("insert.sql");

        streamTableEnvironment.sqlUpdate(insertSql);


        env.execute("sort-streaming-data");

        log.info("end");
  • sql文件,保存到resources目錄,并用FileUtils讀然詈ⅰ(僅僅是外置SQL而已也可直接寫到代碼中)
#destination.sql
CREATE TABLE student_dest (
                              id INT,
                              name VARCHAR,
                              password VARCHAR,
                              age INT
) WITH (
    'connector.type' = 'jdbc', -- 使用 jdbc connector
    'connector.url' = 'jdbc:mysql://localhost:3306/flink_demo', -- jdbc url
    'connector.table' = 'student_2', -- 表名
    'connector.username' = 'root', -- 用戶名
    'connector.password' = 'abc123456', -- 密碼
    'connector.write.flush.max-rows' = '1' -- 默認(rèn)5000條物遇,為了演示改為1條
)

#insert.sql, temp_table為代碼臨時(shí)table
INSERT INTO student_dest
SELECT
    id,
    name,
    password,
    age
FROM temp_table

2、通過(guò)自定義Sink方式

  • 通過(guò)實(shí)行RichSinkFunction接口
  • 步驟(同RichSourceFunction是一致的)
    • open() 初始化數(shù)據(jù)
    • invoke() 每獲取一次數(shù)據(jù)將其處理存儲(chǔ)到outer system
    • close() 清理及關(guān)閉資源
public class MySQLSink extends RichSinkFunction<Student> {

    private static final Logger log = LoggerFactory.getLogger(MySQLSink.class);

    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

    /**
     * open() 方法中建立連接憾儒,這樣不用每次 invoke 的時(shí)候都要建立連接和釋放連接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into student_2(id, name, password, age) values(?, ?, ?, ?);";
        if (connection != null) {
            ps = this.connection.prepareStatement(sql);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        //關(guān)閉連接和釋放資源
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * 每條數(shù)據(jù)的插入都要調(diào)用一次 invoke() 方法
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        if (ps == null) {
            return;
        }
        //遍歷數(shù)據(jù)集合
        Student student = value;
        //for (Student student : value) {
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPassword());
            ps.setInt(4, student.getAge());
            ps.addBatch();
        //}
        int[] count = ps.executeBatch();//批量后執(zhí)行
        log.info("成功了插入了 {} 行數(shù)據(jù)", count.length);
    }


    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        //注意询兴,替換成自己本地的 mysql 數(shù)據(jù)庫(kù)地址和用戶名、密碼
        dataSource.setUrl("jdbc:mysql://localhost:3306/flink_demo");
        dataSource.setUsername("root");
        dataSource.setPassword("abc123456");
        //設(shè)置連接池的一些參數(shù)
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            log.info("創(chuàng)建連接池:{}", con);
        } catch (Exception e) {
            log.error("-----------mysql get connection has exception , msg = {}", e.getMessage());
        }
        return con;
    }
}

3航夺、jdbc和Sink方式對(duì)比

  • JDBC是用sqlQuery()和sqlUpdate()來(lái)執(zhí)行所有查詢和insert/update更新操作蕉朵,只需要將創(chuàng)建table,insert語(yǔ)句用SQL整理到一起即可,通過(guò)insert語(yǔ)句插入out system
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末阳掐,一起剝皮案震驚了整個(gè)濱河市始衅,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌缭保,老刑警劉巖汛闸,帶你破解...
    沈念sama閱讀 221,548評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異艺骂,居然都是意外死亡诸老,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門钳恕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)别伏,“玉大人蹄衷,你說(shuō)我怎么就攤上這事±灏梗” “怎么了愧口?”我有些...
    開封第一講書人閱讀 167,990評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)类茂。 經(jīng)常有香客問(wèn)我耍属,道長(zhǎng),這世上最難降的妖魔是什么巩检? 我笑而不...
    開封第一講書人閱讀 59,618評(píng)論 1 296
  • 正文 為了忘掉前任厚骗,我火速辦了婚禮,結(jié)果婚禮上兢哭,老公的妹妹穿的比我還像新娘领舰。我一直安慰自己,他們只是感情好迟螺,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,618評(píng)論 6 397
  • 文/花漫 我一把揭開白布提揍。 她就那樣靜靜地躺著,像睡著了一般煮仇。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上谎仲,一...
    開封第一講書人閱讀 52,246評(píng)論 1 308
  • 那天浙垫,我揣著相機(jī)與錄音,去河邊找鬼郑诺。 笑死夹姥,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的辙诞。 我是一名探鬼主播辙售,決...
    沈念sama閱讀 40,819評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼飞涂!你這毒婦竟也來(lái)了旦部?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,725評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤较店,失蹤者是張志新(化名)和其女友劉穎士八,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體梁呈,經(jīng)...
    沈念sama閱讀 46,268評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡婚度,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,356評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了官卡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蝗茁。...
    茶點(diǎn)故事閱讀 40,488評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡醋虏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出哮翘,到底是詐尸還是另有隱情颈嚼,我是刑警寧澤,帶...
    沈念sama閱讀 36,181評(píng)論 5 350
  • 正文 年R本政府宣布忍坷,位于F島的核電站粘舟,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏佩研。R本人自食惡果不足惜柑肴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,862評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望旬薯。 院中可真熱鬧晰骑,春花似錦、人聲如沸绊序。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,331評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)骤公。三九已至抚官,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間阶捆,已是汗流浹背凌节。 一陣腳步聲響...
    開封第一講書人閱讀 33,445評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留洒试,地道東北人倍奢。 一個(gè)月前我還...
    沈念sama閱讀 48,897評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像垒棋,于是被迫代替她去往敵國(guó)和親卒煞。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,500評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • pyspark.sql模塊 模塊上下文 Spark SQL和DataFrames的重要類: pyspark.sql...
    mpro閱讀 9,464評(píng)論 0 13
  • Table API和SQL通過(guò)join API集成在一起叼架,這個(gè)join API的核心概念是Table畔裕,Table可...
    寫B(tài)ug的張小天閱讀 16,772評(píng)論 0 15
  • 一. Java基礎(chǔ)部分.................................................
    wy_sure閱讀 3,813評(píng)論 0 11
  • 1. 簡(jiǎn)介 1.1 什么是 MyBatis ? MyBatis 是支持定制化 SQL碉碉、存儲(chǔ)過(guò)程以及高級(jí)映射的優(yōu)秀的...
    笨鳥慢飛閱讀 5,527評(píng)論 0 4
  • 「你最富有垢粮、最值錢的東西是什么贴届?」 今天看到這個(gè)問(wèn)題的時(shí)候,我停頓了三秒鐘,我在思考的是我最值錢的是什么毫蚓?是我的腎...
    許先森在這里閱讀 785評(píng)論 0 2