Why choose Flume 1.7?
Flume 1.7 brings many new features and much more complete Kafka support. One of them is the TAILDIR source, which automatically monitors all files under a directory for changes; that is the source used in this project.
1. Installation
Download: apache-flume-1.7.0
After downloading, upload the archive to the /opt/ebohailife/ directory and extract it:
[ebohailife@e-bohailife-dat002 ~]$ tar -zxvf apache-flume-1.7.0-bin.tar.gz
- Verify the installation: /opt/ebohailife/apache-flume-1.7.0-bin/bin/flume-ng version
If the following is printed, the installation succeeded:
[ebohailife@e-bohailife-dat002 conf]$ ../bin/flume-ng version
Flume 1.7.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707
Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016
From source with checksum 0d21b3ffdc55a07e1d08875872c00523
2. Development
- Edit the Flume environment file
[ebohailife@e-bohailife-dat002 conf]$ echo $JAVA_HOME
/opt/ebohailife/jdk1.7.0_80
[ebohailife@e-bohailife-dat002 conf]$ cp flume-env.sh.template flume-env.sh
vi flume-env.sh # set the JAVA_HOME variable in flume-env.sh
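After the edit, flume-env.sh should point JAVA_HOME at the JDK shown by echo $JAVA_HOME above (adjust the path for your environment):
export JAVA_HOME=/opt/ebohailife/jdk1.7.0_80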
- Create the Flume job configuration file taildir_behavior.conf
[ebohailife@e-bohailife-uat002 conf]$ vi taildir_behavior.conf
# Name the agent a1
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/ebohailife/logs/rest/.*behavior.*
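# The position file records the read offset of each tailed file, so tailing
# resumes where it left off after an agent restart (note: /tmp may be cleared on reboot)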
a1.sources.r1.positionFile = /tmp/flume/taildir_behavior_position.json
a1.sources.r1.fileHeader = false
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
a1.sinks.k1.kafka.topic = behaviorlog_r1p3
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.flumeBatchSize = 100
# a1.sinks.k1.topic = behaviorlog_r1p3
# Kafka broker list; the following properties are deprecated in Flume 1.7 and later
# a1.sinks.k1.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
# a1.sinks.k1.requiredAcks = 1
# a1.sinks.k1.batchSize = 100
# Use a channel which buffers events in file
a1.channels.c1.type = file
# Checkpoint file directory
a1.channels.c1.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint
# Event data directory
a1.channels.c1.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
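To sanity-check this agent once the topic is created and the agent is started (both steps below), append a line to any file matching the f1 pattern and confirm the position file advances. The file name test_behavior.log is hypothetical; any file matching .*behavior.* under the watched directory works:
echo "smoke test event" >> /opt/ebohailife/logs/rest/test_behavior.log  # hypothetical file name
cat /tmp/flume/taildir_behavior_position.json  # the recorded "pos" offset should grow once the event is read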
- Create the Flume job configuration file taildir_phoneinfo.conf
[ebohailife@e-bohailife-uat002 conf]$ vi taildir_phoneinfo.conf
# Name the agent a2
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = TAILDIR
a2.sources.r2.filegroups = f1
a2.sources.r2.filegroups.f1 = /opt/ebohailife/logs/rest/.*phoneinfo.*
a2.sources.r2.positionFile = /tmp/flume/taildir_phoneinfo_position.json
a2.sources.r2.fileHeader = false
# Describe the sink
a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k2.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
a2.sinks.k2.kafka.topic = phoneinfolog_r1p3
a2.sinks.k2.kafka.producer.acks = 1
a2.sinks.k2.kafka.producer.linger.ms = 1
a2.sinks.k2.flumeBatchSize = 100
# a2.sinks.k2.topic = phoneinfolog_r1p3
# Kafka broker list; the following properties are deprecated in Flume 1.7 and later
# a2.sinks.k2.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092
# a2.sinks.k2.requiredAcks = 1
# a2.sinks.k2.batchSize = 100
# Use a channel which buffers events in file
a2.channels.c2.type = file
# Checkpoint file directory: a file channel's checkpoint and data directories must not be
# shared with another channel, so a2 gets its own (directory names here are one possible choice)
a2.channels.c2.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint_phoneinfo
# Event data directory, likewise kept separate from a1's
a2.channels.c2.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data_phoneinfo
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
- Create the Kafka topics
# Create topic behaviorlog_r1p3
./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic behaviorlog_r1p3 --partitions 3 --replication-factor 1
# Create topic phoneinfolog_r1p3
./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic phoneinfolog_r1p3 --partitions 3 --replication-factor 1
- List the topics
./kafka-topics.sh --list --zookeeper 10.104.0.227:2181
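kafka-topics.sh also supports --describe, which shows partition and replica assignments and is a quick way to confirm the --partitions 3 setting took effect:
./kafka-topics.sh --describe --zookeeper 10.104.0.227:2181 --topic behaviorlog_r1p3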
- Start the Flume NG agents in the background
./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_behavior.conf -n a1 >/dev/null 2>&1 &
./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_phoneinfo.conf -n a2 >/dev/null 2>&1 &
# For foreground debugging, append -Dflume.root.logger=INFO,console to the command above
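Because stdout is discarded, verify that the agents actually came up by checking the processes; if the default conf/log4j.properties shipped with Flume is in use, the agent also writes flume.log under a logs/ directory relative to where it was launched (the path below assumes that default):
ps -ef | grep flume | grep -v grep
tail -f logs/flume.log  # assumes the default log4j.properties layout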
- Start the Kafka consumers in the background
# Consume behaviorlog_r1p3
./kafka-console-consumer.sh --topic behaviorlog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 &
# Consume phoneinfolog_r1p3
./kafka-console-consumer.sh --topic phoneinfolog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 &
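These background consumers only exercise the pipeline; to actually watch messages arrive while testing, run a consumer in the foreground instead:
./kafka-console-consumer.sh --topic behaviorlog_r1p3 --bootstrap-server 10.104.0.226:9092 --from-beginning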
- Create the log collection streams
# Create the phoneinfo_log stream
CREATE STREAM phoneinfo_log_stream(
    phoneinfo STRING, tmp1 STRING, ip STRING, tmp2 STRING,
    phone_model STRING, tmp3 STRING, phone_version STRING, tmp4 STRING,
    area STRING, tmp5 STRING, start_time TIMESTAMP, tmp6 STRING,
    KDDI STRING, tmp7 STRING, app_source STRING, tmp8 STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
TBLPROPERTIES("topic" = "phoneinfolog_r1p3", "kafka.zookeeper" = "10.104.0.227:2181",
    "kafka.broker.list" = "10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092");
# Create the behavior_log stream
CREATE STREAM behavior_log_stream(
    eventid STRING, tmp1 STRING, ip STRING, tmp2 STRING,
    user_id STRING, tmp3 STRING, user_name STRING, tmp4 STRING,
    in_time TIMESTAMP, tmp5 STRING, operate_time TIMESTAMP, tmp6 STRING,
    phone_unicode STRING, tmp7 STRING, trigger_count STRING, tmp8 STRING,
    diff_in_oper INT, tmp9 STRING, tel_no STRING, tmp10 STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
TBLPROPERTIES("topic" = "behaviorlog_r1p3", "kafka.zookeeper" = "10.104.0.227:2181",
    "kafka.broker.list" = "10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092");
- Create the log tables
# Create the phoneinfo_log table
CREATE TABLE phoneinfo_log_tab(phone STRING, ip STRING, phone_model STRING, phone_version STRING, area STRING, start_time TIMESTAMP, KDDI STRING, app_source STRING);
# Create the behavior_log table
CREATE TABLE behavior_log_tab(eventid STRING, ip STRING, user_id STRING, user_name STRING, in_time TIMESTAMP, operate_time TIMESTAMP,
phone_unicode STRING, trigger_count STRING, diff_in_oper INT, tel_no STRING);
To keep the job from producing too many small files, apply the following settings:
set streamsql.enable.hdfs.batchflush = true # enable batched flushing
set streamsql.hdfs.batchflush.size = <num> # flush once this many messages have accumulated
set streamsql.hdfs.batchflush.interval.ms = <num> # flush once this many milliseconds have passed
# A flush is triggered as soon as either batchflush.size or batchflush.interval.ms is satisfied
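For example, to flush every 10,000 messages or every 60 seconds, whichever comes first (illustrative values to show the syntax, not tuned recommendations):
set streamsql.enable.hdfs.batchflush = true
set streamsql.hdfs.batchflush.size = 10000
set streamsql.hdfs.batchflush.interval.ms = 60000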
- Start the log streams
# Trigger the phoneinfo_log_stream computation
INSERT INTO phoneinfo_log_tab SELECT phoneinfo, ip, phone_model, phone_version, area, start_time, KDDI, app_source FROM phoneinfo_log_stream;
# Trigger the behavior_log_stream computation
INSERT INTO behavior_log_tab SELECT eventid, ip, user_id, user_name, in_time, operate_time, phone_unicode, trigger_count, diff_in_oper, tel_no FROM behavior_log_stream;
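Once both INSERT ... SELECT jobs are running, a quick count on the target tables confirms that records are landing (assuming the engine allows ad-hoc queries on these tables):
SELECT COUNT(*) FROM phoneinfo_log_tab;
SELECT COUNT(*) FROM behavior_log_tab;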