Flume 實(shí)時獲取日志內(nèi)容插入MySQL

前言:

本文章適用于在Windows上使用Flume 自定義sink啼染,實(shí)時獲取日志文件內(nèi)容并輸出到Mysql表中隧枫。首先確保你的flume-ng可以啟動忧便,跳過個別步驟可自行百度。

1捏境、MySQL創(chuàng)建表:

CREATE TABLE `fruit` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `salesman` varchar(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;

2孽尽、創(chuàng)建自定義Sink:

由于Flume的sink無法連接數(shù)據(jù)庫窖壕,需要自己寫一個自定義sink來連接

2.1、打開Eclipse,新建一個maven project

勾上 Create a simple project (skip archtype selection)
Group Id:

org.flume.mysql.sink

Artifact Id:

flumedemo
2.2艇拍、配置pom.xml

<build>...</build> 方便后續(xù)maven打包插件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.flume.mysql.sink</groupId>
  <artifactId>flumedemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.5.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.7.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.25</version>
        </dependency>
        
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                   <artifactId> maven-assembly-plugin </artifactId>
                   <configuration>
                        <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                             <manifest>
                                  <mainClass></mainClass>
                             </manifest>
                        </archive>
                   </configuration>
                   <executions>
                        <execution>
                             <id>make-assembly</id>
                             <phase>package</phase>
                             <goals>
                                  <goal>single</goal>
                             </goals>
                        </execution>
                   </executions>
              </plugin>
              <plugin>  
                <groupId>org.apache.maven.plugins</groupId>  
                <artifactId>maven-compiler-plugin</artifactId>  
                <version>3.1</version>  
                <configuration>  
                    <source>1.7</source>  
                    <target>1.7</target>  
                </configuration>  
            </plugin>

        </plugins>
    </build>
</project>
2.3狐蜕、新建包和類

右擊 src/main/java 新建一個package org.flume.mysql.sink
實(shí)體類Fruit.java

package org.flume.mysql.sink;

public class Fruit {
    private String name;
    private String salesman;
    
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getSalesman() {
        return salesman;
    }
    public void setSalesman(String salesman) {
        this.salesman = salesman;
    }
    
}

自定義sink MysqlSink.java

package org.flume.mysql.sink;

import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
 
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
 
public class MysqlSink extends AbstractSink implements Configurable {

    private Logger LOG = LoggerFactory.getLogger(MysqlSink.class);
    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;

    public MysqlSink() {
        LOG.info("Start MysqlSink");
    }

    public void configure(Context context) {
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
        databaseName = context.getString("databaseName");
        Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set!!");
        batchSize = context.getInteger("batchSize", 100);
        Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");
    }

    @Override
    public void start() {
        super.start();
        try {            
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;        

        try {
            conn = DriverManager.getConnection(url, user, password);
            conn.setAutoCommit(false);
            //創(chuàng)建一個Statement對象
            preparedStatement = conn.prepareStatement("insert into " + tableName +
                    " (name,salesman) values (?,?)");

        } catch (SQLException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void stop() {
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event;
        String content;
        List<Fruit> infos = Lists.newArrayList();
        transaction.begin();        
        try {
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();     //從channel中獲取一條數(shù)據(jù)
                LOG.info("i : " + i);                
                if (event != null) {        //對事件進(jìn)行處理                    
                    content = new String(event.getBody());  //event 的 body為 "apple,wang"
                    LOG.info("content : " + content);
                    Fruit fruit=new Fruit();
                    if (content.contains(",")) {                        
                        //存儲 event 的fruit. name
                        fruit.setName(content.substring(0, content.indexOf(",")));
                        //存儲 event 的 fruit.salesman 逗號分開 ","
                        fruit.setSalesman(content.substring(content.indexOf(",") + 1));
                    }else{
                        fruit.setName(content);                       
                    }
                    infos.add(fruit);
                } else {
                    result = Status.BACKOFF;
                    //LOG.info("result : " + result);
                    break;
                }
            }

            if (infos.size() > 0) {
                preparedStatement.clearBatch();
                for (Fruit temp : infos) {
                    preparedStatement.setString(1, temp.getName());
                    preparedStatement.setString(2, temp.getSalesman());
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                conn.commit();
            }
            transaction.commit();       //執(zhí)行提交操作
        } catch (Exception e) {
            try {
                transaction.rollback(); //執(zhí)行回滾操作
                //LOG.info("------------transaction.rollback()------------");
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been" +
                        "successful.", e2);
            }
            LOG.error("Failed to commit transaction." +
                    "Transaction rolled back.", e);
            Throwables.propagate(e);
        } finally {
            transaction.close();
        }
        return result;
    }
}
LOG.info("***");    //輸出語句,調(diào)試卸夕,方便之后再flume-ng輸出看到
2.4、mvn打包*

右擊項(xiàng)目 flumedemo
1婆瓜、Maven → update project
2快集、Run As → Maven clean
3、Run As → Maven install (第一次可能比較久)
打包完成廉白!如下圖

image.png

在 target 目錄下復(fù)制 flumedemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar到flume的lib目錄下

3个初、配置conf:

D:\com\apache-flume-1.8.0-bin\logs目錄新建一個空的fruitdata.log (之后添加數(shù)據(jù))
D:\com\apache-flume-1.8.0-bin\conf目錄新建一個fruit.conf

fruit.conf

agent1.sources = source1
agent1.sinks = mysqlSink
agent1.channels = channel1

# Execsource 命令tail -f 實(shí)時獲取文件新增變化
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f D:/com/apache-flume-1.8.0-bin/logs/fruitdata.log
agent1.sources.source1.channels = channel1

# MysqlSink配置  package名.類名
agent1.sinks.mysqlSink.type = org.flume.mysql.sink.MysqlSink
agent1.sinks.mysqlSink.hostname = localhost
agent1.sinks.mysqlSink.port = 3306
agent1.sinks.mysqlSink.databaseName = flume     //數(shù)據(jù)庫名
agent1.sinks.mysqlSink.tableName = fruit        //表名字
agent1.sinks.mysqlSink.user=root                //用戶名
agent1.sinks.mysqlSink.password = ****          //密碼
agent1.sinks.mysqlSink.channel = channel1

agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapactiy = 100

由于在Windows中沒有 tail -f 的命令,找了很久感謝前人的分享
tail 下載地址見文末zip
不建議放到C:\Windows\System32內(nèi)
解壓到D:\tail 環(huán)境變量path追加;D:\tail即可

4猴蹂、實(shí)驗(yàn):

目錄D:\com\apache-flume-1.8.0-bin 右鍵+shift打開cmd

D:\com\apache-flume-1.8.0-bin>flume-ng agent -c conf -f conf/fruit.conf -n agent1 -property "flume.root.logger=INFO,console"

此時向fruitdata.log添加數(shù)據(jù) 字段之間用,隔開

apple,wang
banana,peng
kiwifruit,cnstt
lemon,bob

截圖

image.png

image.png

逐條添加數(shù)據(jù)院溺,看到數(shù)據(jù)庫表內(nèi)也增加數(shù)據(jù),因?yàn)?code>tail -f 命令可以實(shí)時讀取磅轻。

至此完成在Windows環(huán)境下使用Flume 自定義Sink獲取日志輸出到MySQL中表內(nèi)珍逸,中文會出現(xiàn)亂碼,后續(xù)繼續(xù)研究聋溜!

謝謝閱讀谆膳,有幫助的點(diǎn)個?!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末撮躁,一起剝皮案震驚了整個濱河市漱病,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌把曼,老刑警劉巖杨帽,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異嗤军,居然都是意外死亡注盈,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門型雳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來当凡,“玉大人,你說我怎么就攤上這事纠俭⊙亓浚” “怎么了?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵冤荆,是天一觀的道長朴则。 經(jīng)常有香客問我,道長钓简,這世上最難降的妖魔是什么乌妒? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任汹想,我火速辦了婚禮,結(jié)果婚禮上撤蚊,老公的妹妹穿的比我還像新娘古掏。我一直安慰自己,他們只是感情好侦啸,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布槽唾。 她就那樣靜靜地躺著,像睡著了一般光涂。 火紅的嫁衣襯著肌膚如雪庞萍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天忘闻,我揣著相機(jī)與錄音钝计,去河邊找鬼。 笑死齐佳,一個胖子當(dāng)著我的面吹牛私恬,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播重虑,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼践付,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了缺厉?” 一聲冷哼從身側(cè)響起永高,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎提针,沒想到半個月后命爬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡辐脖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年饲宛,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嗜价。...
    茶點(diǎn)故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡艇抠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出久锥,到底是詐尸還是另有隱情家淤,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布瑟由,位于F島的核電站絮重,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜青伤,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一督怜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧狠角,春花似錦号杠、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至动遭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間神得,已是汗流浹背厘惦。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留哩簿,地道東北人宵蕉。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像节榜,于是被迫代替她去往敵國和親羡玛。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容