自定義sourceAndsink

連接kafka獲取數(shù)據(jù)

//flink流處理
StreamExecutionEnvironment exe = StreamExecutionEnvironment.getExecutionEnvironment();
        //將Properties創(chuàng)建出來(lái)
        Properties properties = new Properties();
        //將配置信息set到Properties
        properties.setProperty("bootstrap.servers","node4:9092");
        properties.setProperty("group.id","1");
        //使用FlinkKafkaConsumer011(自己的kafka版本[參數(shù)](z'ji't'y))
        FlinkKafkaConsumer011<String> jk = new FlinkKafkaConsumer011<>("jk", new SimpleStringSchema(),properties);
        //
        DataStreamSource<String> stringDataStreamSource = exe.addSource(jk);

MySQLsource

                      使用RichSourceFunction自定義source
public class MySqlSource extends RichSourceFunction<Tuple4<String, String, Double, Long>> {

    Connection conn;
    Statement stat;
    boolean flag = true;


    @Override
    //open內(nèi)寫(xiě)連接方法
    public void open(Configuration parameters) throws Exception {
        //類(lèi)反射mysqlDriver
        Class.forName("com.mysql.jdbc.Driver");
        //mysql的連接方式用戶(hù)密碼
        conn = DriverManager.getConnection("jdbc:mysql://node4:3306/1704e", "root", "123456");
        stat = conn.createStatement();
    }

    @Override
    //run內(nèi)寫(xiě)具體的邏輯
    public void run(SourceContext<Tuple4<String, String, Double, Long>> ctx) throws Exception {
        while (flag) {
            ResultSet resultSet = stat.executeQuery("select * from t_dev where dev_state = 0");
            StringBuffer sb = new StringBuffer("(");
            int count = 0;
            while (resultSet.next()){
                count ++;
                long id = resultSet.getLong(1);
                String devId = resultSet.getString(2);
                String metric = resultSet.getString(3);
                double value = resultSet.getDouble(4);
                long timestamp = resultSet.getLong(5);
                sb.append(id+",");
                ctx.collect(Tuple4.of(devId, metric, value,timestamp));
            }
            String ids = sb.toString();
            ids = ids.substring(0, ids.length() - 1);
            ids = ids + ")";
            if(count != 0){
                String updateSql = "update t_dev set dev_state = 1 where id in "+ids;
                System.out.println(updateSql);
                stat.execute(updateSql);
            }
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        if(conn != null){
            conn.close();
        }
    }
}

HBaseSink

                          //繼承RichSinkFunction
public class HBaseSink extends RichSinkFunction<Tuple6<String, String, Double, Double, Double, Long>> {

    private org.apache.hadoop.conf.Configuration conf;
    private Connection conn;
    private Table table;
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");


    @Override
    //open內(nèi)寫(xiě)連接方法
    public void open(Configuration parameters) throws Exception {
        //創(chuàng)建Hbase連接
        conf = HBaseConfiguration.create();
        //配置信息
        conf.set("hbase.zookeeper.quorum","node4:2181");
        System.out.println("long...........");
        conn = ConnectionFactory.createConnection(conf);
        System.out.println("ok....................");
        //獲取表
        table = conn.getTable(TableName.valueOf("ns1:t_dev_data"));
    }

    @Override
    //invoke 具有邏輯實(shí)現(xiàn)
    public void invoke(Tuple6<String, String, Double, Double, Double, Long> value, Context context) throws Exception {
        String format = sdf.format(value.f5);
        String rowKeyStr = value.f0 + value.f1 + format;

        Put put = new Put(Bytes.toBytes(rowKeyStr));
        put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("max"),Bytes.toBytes(value.f2));
        put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("min"),Bytes.toBytes(value.f3));
        put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("avg"),Bytes.toBytes(value.f4));

        table.put(put);

    }

    @Override
    public void close() throws Exception {
        if(table != null) {
            table.close();
        }
        if(conn != null){
            conn.close();
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末饺律,一起剝皮案震驚了整個(gè)濱河市撕彤,隨后出現(xiàn)的幾起案子讹躯,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,542評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異扩借,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)缤至,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,596評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)潮罪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人领斥,你說(shuō)我怎么就攤上這事嫉到。” “怎么了月洛?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,021評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵何恶,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我膊存,道長(zhǎng)导而,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,682評(píng)論 1 284
  • 正文 為了忘掉前任隔崎,我火速辦了婚禮今艺,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘爵卒。我一直安慰自己虚缎,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,792評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著实牡,像睡著了一般陌僵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上创坞,一...
    開(kāi)封第一講書(shū)人閱讀 49,985評(píng)論 1 291
  • 那天碗短,我揣著相機(jī)與錄音,去河邊找鬼题涨。 笑死偎谁,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的纲堵。 我是一名探鬼主播巡雨,決...
    沈念sama閱讀 39,107評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼席函!你這毒婦竟也來(lái)了铐望?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,845評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤茂附,失蹤者是張志新(化名)和其女友劉穎正蛙,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體何之,經(jīng)...
    沈念sama閱讀 44,299評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡跟畅,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,612評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片顾犹。...
    茶點(diǎn)故事閱讀 38,747評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡拆讯,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出趁俊,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 34,441評(píng)論 4 333
  • 正文 年R本政府宣布辐赞,位于F島的核電站,受9級(jí)特大地震影響硝训,放射性物質(zhì)發(fā)生泄漏响委。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,072評(píng)論 3 317
  • 文/蒙蒙 一窖梁、第九天 我趴在偏房一處隱蔽的房頂上張望赘风。 院中可真熱鬧,春花似錦纵刘、人聲如沸邀窃。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,828評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)瞬捕。三九已至鞍历,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間肪虎,已是汗流浹背劣砍。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,069評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留扇救,地道東北人秆剪。 一個(gè)月前我還...
    沈念sama閱讀 46,545評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像爵政,于是被迫代替她去往敵國(guó)和親仅讽。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,658評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容