Flume 監(jiān)聽(tīng)Avro客戶端 輸出到Kafka

前言:

本文章適用于在Windows上使用Flume 監(jiān)聽(tīng)Avro Client蕊程,模擬數(shù)據(jù)庫(kù)表的增量同步到Kafka中椒袍。首先確保你的flume-ng可以啟動(dòng),跳過(guò)個(gè)別步驟可自行百度藻茂。

1驹暑、MySQL創(chuàng)建表:

DROP TABLE IF EXISTS `avro`;
CREATE TABLE `avro` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `createdt` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=latin1;

INSERT INTO `avro` VALUES ('1', 'a', '2018-11-21 04:00:00');
INSERT INTO `avro` VALUES ('2', 'b', '2018-11-22 05:00:00');
INSERT INTO `avro` VALUES ('3', 'c', '2018-11-23 06:00:00');
INSERT INTO `avro` VALUES ('4', 'd', '2018-11-24 07:00:00');
INSERT INTO `avro` VALUES ('5', 'e', '2018-11-25 08:00:00');
INSERT INTO `avro` VALUES ('6', 'f', '2018-11-26 09:00:00');
INSERT INTO `avro` VALUES ('7', 'g', '2018-11-27 10:00:00');
INSERT INTO `avro` VALUES ('8', 'h', '2018-11-28 11:00:00');
INSERT INTO `avro` VALUES ('9', 'i', '2018-11-29 12:00:00');
INSERT INTO `avro` VALUES ('10', 'j', '2018-11-30 13:56:41');

avro表如圖


image.png

2、Avro 的官網(wǎng)實(shí)例

2.1辨赐、創(chuàng)建Flume Avro Client :(Thrift 同理)

可參見(jiàn)Flume官網(wǎng)實(shí)例

打開(kāi)Eclipse 右擊
src/main/java 新建一個(gè)package org.flume.avro
新建Class MyApp.java

package org.flume.avro;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class MyApp {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        //端口與avro.conf a1.sources.r1.port一致     
        client.init("localhost", 41414);        
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 5; i++) {
          client.sendDataToFlume(sampleData+" " + i);
        }   
        System.out.println("輸出完畢");
        client.cleanUp();
      } 
    }
    
class MyRpcClientFacade {
      private RpcClient client;
      private String hostname;
      private int port;

      public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);      //創(chuàng)建avro客戶端
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);    //創(chuàng)建Thrift客戶端
      }

      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        // 調(diào)用EventBuilder重載的withBody()方法优俘。
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));        
        try {
          client.append(event);     // Send the event 發(fā)送數(shù)據(jù)
        } catch (EventDeliveryException e) {
          // clean up and recreate the client 清理并重新創(chuàng)建客戶端
          client.close();
          client = null;
          client = RpcClientFactory.getDefaultInstance(hostname, port);
          // Use the following method to create a thrift client (instead of the above line):
          // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
      }

      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }   
}

2.2、配置conf

Flume的conf目錄新建 avro.conf

a1.channels = c1
a1.sources = r1
a1.sinks = k1

a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414    //端口與MyApp.java中的port一致

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = avrosrc
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

2.3掀序、輸出到Kafka

此處省略Kafka啟動(dòng)步驟帆焕,詳見(jiàn)鏈接
新建Kafka Topic avrosrc

kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic avrosrc

查看Topic avrosrc(此時(shí)為空)

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic avrosrc --from-beginning

啟動(dòng)flume-ng

D:\com\apache-flume-1.8.0-bin>flume-ng agent -c conf -f conf/avro.conf -n a1 -property "flume.root.logger=INFO,console"

Eclipse 運(yùn)行 MyApp.java(右鍵 → Run As → Java Application)
此時(shí)觀察 Topic 有數(shù)據(jù)進(jìn)入


image.png

3、Avro 自定義

每秒隨機(jī)讀取數(shù)據(jù)庫(kù)avro表的一條數(shù)據(jù)不恭,并輸出到Kafka叶雹,模擬增量數(shù)據(jù)
修改 MyApp.java

package org.flume.avro;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;

public class MyApp {
    
    static final String DB_URL = "jdbc:mysql://localhost:3306/***";  //輸入DB名稱
    static final String USER = "***";      //DB用戶名
    static final String PASS = "***";    //DB密碼
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        MyRpcClientFacade client = new MyRpcClientFacade();     
        client.init("localhost", 41414);
        Connection conn = null;
        Statement stmt = null;    //真實(shí)場(chǎng)景使用PreparedStatement防止SQL注入
        try{            
            Class.forName("com.mysql.jdbc.Driver");                 // 注冊(cè) JDBC 驅(qū)動(dòng)               
            conn = DriverManager.getConnection(DB_URL,USER,PASS);   // 打開(kāi)鏈接   
            client.sendDataToFlume("Connect to db");         
            stmt = conn.createStatement();                          // 執(zhí)行查詢
            for(int i = 0;i < 10;i++){
                int index = (int)(Math.random()*10) + 1;
                String sql = "SELECT * FROM avro where id=" + index; 
                ResultSet rs = stmt.executeQuery(sql);              // 保存到結(jié)果集
                while(rs.next()){
                    int id  = rs.getInt("id");
                    String name = rs.getString("name");
                    Timestamp createdt = rs.getTimestamp("createdt");
                    System.out.print("ID: " + id);
                    System.out.print(", 名稱: " + name);
                    System.out.print(", 創(chuàng)建時(shí)間: " + createdt);
                    System.out.print("\n");
                     //client.sendDataToFlume 發(fā)送數(shù)據(jù)!
                    client.sendDataToFlume("id: " + id + ", name: " + name + ", createdt: " + createdt);   
                }
                rs.close();
                try {
                    Thread.sleep(1000);     //等待一秒,模擬增量場(chǎng)景
                } catch (InterruptedException e) {
                    e.printStackTrace(); 
                }
            }            
            stmt.close();
            conn.close();
        }catch(SQLException se){  // 處理 JDBC 錯(cuò)誤            
            se.printStackTrace();
        }catch(Exception e){  // 處理 Class.forName 錯(cuò)誤            
            e.printStackTrace();
        }finally{  // 關(guān)閉資源            
            try{
                if(stmt!=null) stmt.close();
            }catch(SQLException se2){
            }
            try{
                if(conn!=null) conn.close();
            }catch(SQLException se){
                se.printStackTrace();
            }
        }
        client.sendDataToFlume("avro結(jié)束");    //測(cè)試中文是否亂碼:是
        client.sendDataToFlume("avro over");
        System.out.println("avro結(jié)束");
        client.cleanUp();
      } 
    }
    
class MyRpcClientFacade {
      private RpcClient client;
      private String hostname;
      private int port;

      public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);      //創(chuàng)建avro客戶端
        // Use the following method to create a thrift client (instead of the above line):
        // this.client = RpcClientFactory.getThriftInstance(hostname, port);    //創(chuàng)建Thrift客戶端
      }

      public void sendDataToFlume(String data) {
        // Create a Flume Event object that encapsulates the sample data
        // 調(diào)用EventBuilder重載的withBody()方法换吧。
        Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));        
        try {
          client.append(event);     // Send the event 發(fā)送數(shù)據(jù)
        } catch (EventDeliveryException e) {
          // clean up and recreate the client 清理并重新創(chuàng)建客戶端
          client.close();
          client = null;
          client = RpcClientFactory.getDefaultInstance(hostname, port);
          // Use the following method to create a thrift client (instead of the above line):
          // this.client = RpcClientFactory.getThriftInstance(hostname, port);
        }
      }

      public void cleanUp() {
        // Close the RPC connection
        client.close();
      }

}

再次運(yùn)行 MyApp.java
隨機(jī)讀取表中10條數(shù)據(jù)(每秒一條)折晦,輸出到Kafka


image.png

至此完成在Windows環(huán)境下使用Flume 監(jiān)聽(tīng)Avro Client并輸出到Kafka中!

謝謝閱讀沾瓦,有幫助的點(diǎn)個(gè)?满着!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市贯莺,隨后出現(xiàn)的幾起案子风喇,更是在濱河造成了極大的恐慌,老刑警劉巖乖篷,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件响驴,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡撕蔼,警方通過(guò)查閱死者的電腦和手機(jī)豁鲤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)鲸沮,“玉大人琳骡,你說(shuō)我怎么就攤上這事∷夏纾” “怎么了楣号?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我炫狱,道長(zhǎng)藻懒,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任视译,我火速辦了婚禮嬉荆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘酷含。我一直安慰自己鄙早,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布椅亚。 她就那樣靜靜地躺著限番,像睡著了一般。 火紅的嫁衣襯著肌膚如雪呀舔。 梳的紋絲不亂的頭發(fā)上弥虐,一...
    開(kāi)封第一講書(shū)人閱讀 49,950評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音别威,去河邊找鬼躯舔。 笑死,一個(gè)胖子當(dāng)著我的面吹牛省古,可吹牛的內(nèi)容都是我干的粥庄。 我是一名探鬼主播,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼豺妓,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼惜互!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起琳拭,我...
    開(kāi)封第一講書(shū)人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤训堆,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后白嘁,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體坑鱼,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年絮缅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了鲁沥。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡耕魄,死狀恐怖画恰,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情吸奴,我是刑警寧澤允扇,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布缠局,位于F島的核電站,受9級(jí)特大地震影響考润,放射性物質(zhì)發(fā)生泄漏狭园。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一额划、第九天 我趴在偏房一處隱蔽的房頂上張望妙啃。 院中可真熱鬧,春花似錦俊戳、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至渐北,卻和暖如春阿逃,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背赃蛛。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工恃锉, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人呕臂。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓破托,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親歧蒋。 傳聞我的和親對(duì)象是個(gè)殘疾皇子土砂,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容