Flume1.7+Kafka+Streaming集成開發(fā)

為什么選擇Flume 1.7版本呢痰哨?
Flume1.7有了很多新功能屯阀,而且對Kafka支持更加全面。其中一個TAILDIR source可以自動監(jiān)控目錄下所有文件變化,在我做的項目中用的就是這個TAILDIR Source的使用约巷。

1. 安裝

  • 下載地址:apache-flume-1.7.0

  • 下載完成后,在/opt/ebohailife/目錄下上傳、解壓


[ebohailife@e-bohailife-dat002 ~]$ tar -zxvf apache-flume-1.7.0-bin.tar.gz

  • 檢測安裝是否成功:/opt/ebohailife/flume/apache-flume-1.7.0-bin/bin/flume-ng version

打印以下信息,則表示安裝成功了


[ebohailife@e-bohailife-dat002 conf]$ ../bin/flume-ng version

Flume 1.7.0

Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git

Revision: 511d868555dd4d16e6ce4fedc72c2d1454546707

Compiled by bessbd on Wed Oct 12 20:51:10 CEST 2016

From source with checksum 0d21b3ffdc55a07e1d08875872c00523

2. 開發(fā)

- 更改Flume配置文件


[ebohailife@e-bohailife-dat002 conf]$ echo $JAVA_HOME 

/opt/ebohailife/jdk1.7.0_80 

[ebohailife@e-bohailife-dat002 conf]$ cp flume-env.sh.template flume-env.sh

vi flume-env.sh # 修改flume-env.sh中JAVA_HOME變量的值

- 創(chuàng)建Flume任務(wù)的配置文件 taildir_behavior.conf


[ebohailife@e-bohailife-uat002 conf]$ vi taildir_behavior.conf 

#agent命名為a1

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = TAILDIR

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /opt/ebohailife/logs/rest/.*behavior.*

a1.sources.r1.positionFile = /tmp/flume/taildir_behavior_position.json

a1.sources.r1.fileHeader = false

# Describe the sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092

a1.sinks.k1.kafka.topic = behaviorlog_r1p3

a1.sinks.k1.kafka.producer.acks= 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.flumeBatchSize = 100

# a1.sinks.k1.topic = behaviorlog_r1p3

# Kafka集群Broker列表寺董,以下屬性在1.7以上版本已棄用

# a1.sinks.k1.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092

# a1.sinks.k1.requiredAcks = 1

# a1.sinks.k1.batchSize = 100

# Use a channel which buffers events in file

a1.channels.c1.type = file

#檢查點文件存儲路徑

a1.channels.c1.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint

#消息數(shù)據(jù)存儲路徑

a1.channels.c1.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

- 創(chuàng)建Flume任務(wù)的配置文件 taildir_phoneinfo.conf


[ebohailife@e-bohailife-uat002 conf]$ vi taildir_phoneinfo.conf

#agent命名為a2

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = TAILDIR

a2.sources.r2.filegroups = f1

a2.sources.r2.filegroups.f1 = /opt/ebohailife/logs/rest/.*phoneinfo.*

a2.sources.r2.positionFile = /tmp/flume/taildir_phoneinfo_position.json

a2.sources.r2.fileHeader = false

# Describe the sink

a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink

a2.sinks.k2.kafka.bootstrap.servers = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092

a2.sinks.k2.kafka.topic = phoneinfolog_r1p3

a2.sinks.k2.kafka.producer.acks= 1

a2.sinks.k2.kafka.producer.linger.ms = 1

a2.sinks.k2.flumeBatchSize = 100

# a2.sinks.k2.topic = behaviorlog_r1p3

# Kafka集群Broker列表,以下屬性在1.7以上版本已棄用

# a2.sinks.k2.brokerList = 10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092

# a2.sinks.k2.requiredAcks = 1

# a2.sinks.k2.batchSize = 100

# Use a channel which buffers events in file

a2.channels.c2.type = file

#檢查點文件存儲路徑

a2.channels.c2.checkpointDir = /opt/ebohailife/apache-flume-1.7.0-bin/checkpoint

#消息數(shù)據(jù)存儲路徑

a2.channels.c2.dataDirs = /opt/ebohailife/apache-flume-1.7.0-bin/data

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

- 創(chuàng)建Kafka Topic


# 創(chuàng)建topic behaviorlog_r1p3 

./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic behaviorlog_r1p3 --partition 3 --replication-factor 1

# 創(chuàng)建topic phoneinfolog_r1p3

./kafka-topics.sh --zookeeper 10.104.0.227:2181 --create --topic phoneinfolog_r1p3 --partition 3 --replication-factor 1

- 查看topic


./kafka-topics.sh  --list --zookeeper 10.104.0.227:2181

- 啟動Flume NG刻剥,后臺運行


./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_behavior.conf  -n a1  >/dev/null 2>&1 &

./flume-ng agent -c /opt/ebohailife/apache-flume-1.7.0-bin/conf -f /opt/ebohailife/apache-flume-1.7.0-bin/conf/taildir_phoneinfo.conf  -n a2  >/dev/null 2>&1 & 

#  -Dflume.root.logger=INFO,console 

- 啟動Kafka Consumer遮咖,后臺運行


# 啟動behaviorlog_r1p3

./kafka-console-consumer.sh --topic behaviorlog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 &

 # 啟動phoneinfolog_r1p3

./kafka-console-consumer.sh --topic phoneinfolog_r1p3 --bootstrap-server 10.104.0.226:9092 >/dev/null 2>&1 & 

- 創(chuàng)建日志收集流


# 創(chuàng)建phoneinfo_log流

CREATE STREAM phoneinfo_log_stream(phoneinfo STRING, tmp1 STRING, ip STRING, tmp2 STRING, phone_model STRING, tmp3 STRING,  phone_version STRING, tmp4 STRING,  area STRING, tmp5 STRING, start_time TIMESTAMP, tmp6 STRING, KDDI STRING, tmp7 STRING, app_source STRING, tmp8 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' TBLPROPERTIES("topic"="phoneinfolog_r1p3" ,"kafka.zookeeper"="10.104.0.227:2181","kafka.broker.list"="10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092"); 

# 創(chuàng)建behavior_log流

CREATE STREAM behavior_log_stream(eventid STRING, tmp1 STRING, ip STRING, tmp2 STRING, user_id STRING, tmp3 STRING,  user_name STRING, tmp4 STRING,  in_time TIMESTAMP, tmp5 STRING, operate_time TIMESTAMP, tmp6 STRING, phone_unicode STRING, tmp7 STRING, trigger_count STRING, tmp8 STRING, diff_in_oper INT, tmp9 STRING, tel_no STRING, tmp10 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' TBLPROPERTIES("topic"="behaviorlog_r1p3" ,"kafka.zookeeper"="10.104.0.227:2181","kafka.broker.list"="10.104.0.226:9092,10.104.0.227:9092,10.104.0.228:9092"); 

- 創(chuàng)建日志表


# 創(chuàng)建phoneinfo_log表

CREATE TABLE phoneinfo_log_tab(phone STRING, ip STRING, phone_model STRING, phone_version STRING, area STRING, start_time TIMESTAMP, KDDI STRING, app_source STRING);

# 創(chuàng)建behavior_log表

CREATE TABLE behavior_log_tab(eventid STRING, ip STRING, user_id STRING, user_name STRING,  in_time TIMESTAMP,  operate_time TIMESTAMP,  

phone_unicode STRING, trigger_count STRING, diff_in_oper INT, tel_no STRING);

為防止小文件過多,進行以下設(shè)置:

set streamsql.enable.hdfs.batchflush = true # 打開批量flush開關(guān)
set streamsql.hdfs.batchflush.size = <num> #設(shè)置一次flush的消息個數(shù)造虏,消息量達到該參數(shù)時flush一次
set [streamsql.hdfs.batchflush.interval.ms](http://streamsql.hdfs.batchflush.interval.ms) = <num> #設(shè)置每過多長時間(單位為毫秒)flush一次 

# 需滿足 batchflush.size 和 [batchflush.interval.ms](http://batchflush.interval.ms) 其中的一個條件即會觸發(fā)一次flush

- 啟動日志流


# 觸發(fā)phoneinfo_log_stream流計算

INSERT INTO phoneinfo_log_tab SELECT phoneinfo, ip, phone_model, phone_version, area, start_time, KDDI, app_source FROM phoneinfo_log_stream; 

# 觸發(fā)behavior_log_stream流計算 

INSERT INTO behavior_log_tab SELECT eventid, ip, user_id, user_name,  in_time,  operate_time, phone_unicode, trigger_count, diff_in_oper, tel_no FROM behavior_log_stream;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末御吞,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子漓藕,更是在濱河造成了極大的恐慌陶珠,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件享钞,死亡現(xiàn)場離奇詭異揍诽,居然都是意外死亡,警方通過查閱死者的電腦和手機栗竖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進店門暑脆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人划滋,你說我怎么就攤上這事饵筑“Bǎ” “怎么了处坪?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長架专。 經(jīng)常有香客問我同窘,道長,這世上最難降的妖魔是什么部脚? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任想邦,我火速辦了婚禮,結(jié)果婚禮上委刘,老公的妹妹穿的比我還像新娘丧没。我一直安慰自己,他們只是感情好锡移,可當我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布呕童。 她就那樣靜靜地躺著,像睡著了一般淆珊。 火紅的嫁衣襯著肌膚如雪夺饲。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天,我揣著相機與錄音往声,去河邊找鬼擂找。 笑死,一個胖子當著我的面吹牛浩销,可吹牛的內(nèi)容都是我干的贯涎。 我是一名探鬼主播,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼慢洋,長吁一口氣:“原來是場噩夢啊……” “哼柬采!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起且警,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤粉捻,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后斑芜,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體肩刃,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年杏头,在試婚紗的時候發(fā)現(xiàn)自己被綠了盈包。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡醇王,死狀恐怖呢燥,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情寓娩,我是刑警寧澤叛氨,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站棘伴,受9級特大地震影響寞埠,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜焊夸,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一仁连、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧阱穗,春花似錦饭冬、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至遣钳,卻和暖如春扰魂,著一層夾襖步出監(jiān)牢的瞬間麦乞,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工劝评, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留姐直,地道東北人。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓蒋畜,卻偏偏與公主長得像声畏,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子姻成,可洞房花燭夜當晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容