flume-ng+Kafka+Storm+HDFS 實(shí)時(shí)系統(tǒng)搭建

問(wèn)題導(dǎo)讀:

flume和kafka整合需要什么組件膳音?

flume-conf.properties需要做哪些修改堂淡?

kafka和storm的整合需要經(jīng)過(guò)哪些步驟幽崩?

一直以來(lái)都想接觸Storm實(shí)時(shí)計(jì)算這塊的東西,之前在弄這個(gè)的時(shí)候坎匿,跟群里的一些人討論過(guò),有的人說(shuō)雷激,直接用storm不就可以做實(shí)時(shí)處理了替蔬,用不著那么麻煩;其實(shí)不然屎暇,做軟件開(kāi)發(fā)的都知道模塊化思想承桥,這樣設(shè)計(jì)的原因有兩方面:

一方面是可以模塊化,功能劃分更加清晰根悼,從“數(shù)據(jù)采集--數(shù)據(jù)接入--流失計(jì)算--數(shù)據(jù)輸出/存儲(chǔ)”

1).數(shù)據(jù)采集

負(fù)責(zé)從各節(jié)點(diǎn)上實(shí)時(shí)采集數(shù)據(jù)凶异,選用cloudera的flume來(lái)實(shí)現(xiàn)

2).數(shù)據(jù)接入

由于采集數(shù)據(jù)的速度和數(shù)據(jù)處理的速度不一定同步,因此添加一個(gè)消息中間件來(lái)作為緩沖挤巡,選用apache的kafka

3).流式計(jì)算

對(duì)采集到的數(shù)據(jù)進(jìn)行實(shí)時(shí)分析剩彬,選用apache的storm

4).數(shù)據(jù)輸出

對(duì)分析后的結(jié)果持久化,暫定用mysql

另一方面是模塊化之后矿卑,加入當(dāng)Storm掛掉了之后喉恋,數(shù)據(jù)采集和數(shù)據(jù)接入還是繼續(xù)在跑著,數(shù)據(jù)不會(huì)丟失,storm起來(lái)之后可以繼續(xù)進(jìn)行流式計(jì)算轻黑;

那么接下來(lái)我們來(lái)看下整體的架構(gòu)圖

詳細(xì)介紹各個(gè)組件及安裝配置:

操作系統(tǒng):ubuntu

Flume

Flume是Cloudera提供的一個(gè)分布式糊肤、可靠、和高可用的海量日志采集苔悦、聚合和傳輸?shù)娜罩臼占到y(tǒng)轩褐,支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時(shí)玖详,F(xiàn)lume提供對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理把介,并寫(xiě)到各種數(shù)據(jù)接受方(可定制)的能力。

下圖為flume典型的體系結(jié)構(gòu):

Flume數(shù)據(jù)源以及輸出方式:

Flume提供了從console(控制臺(tái))蟋座、RPC(Thrift-RPC)拗踢、text(文件)、tail(UNIX tail)向臀、syslog(syslog日志系統(tǒng)巢墅,支持TCP和UDP等2種模式),exec(命令執(zhí)行)等數(shù)據(jù)源上收集數(shù)據(jù)的能力,在我們的系統(tǒng)中目前使用exec方式進(jìn)行日志采集券膀。

Flume的數(shù)據(jù)接受方君纫,可以是console(控制臺(tái))、text(文件)芹彬、dfs(HDFS文件)蓄髓、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系統(tǒng))等。在我們系統(tǒng)中由kafka來(lái)接收舒帮。

Flume下載及文檔:

http://flume.apache.org/

Flume安裝:

$tar zxvf apache-flume-1.4.0-bin.tar.gz/usr/local

復(fù)制代碼

Flume啟動(dòng)命令:

$bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console

復(fù)制代碼

Kafka

kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)会喝,她有如下特性:

通過(guò)O(1)的磁盤(pán)數(shù)據(jù)結(jié)構(gòu)提供消息的持久化,這種結(jié)構(gòu)對(duì)于即使數(shù)以TB的消息存儲(chǔ)也能夠保持長(zhǎng)時(shí)間的穩(wěn)定性能玩郊。

高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數(shù)十萬(wàn)的消息肢执。

支持通過(guò)kafka服務(wù)器和消費(fèi)機(jī)集群來(lái)分區(qū)消息。

支持Hadoop并行數(shù)據(jù)加載译红。

kafka的目的是提供一個(gè)發(fā)布訂閱解決方案预茄,它可以處理消費(fèi)者規(guī)模的網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 這種動(dòng)作(網(wǎng)頁(yè)瀏覽侦厚,搜索和其他用戶的行動(dòng))是在現(xiàn)代網(wǎng)絡(luò)上的許多社會(huì)功能的一個(gè)關(guān)鍵因素反璃。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過(guò)處理日志和日志聚合來(lái)解決。 對(duì)于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng)假夺,但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案斋攀。kafka的目的是通過(guò)Hadoop的并行加載機(jī)制來(lái)統(tǒng)一線上和離線的消息處理已卷,也是為了通過(guò)集群機(jī)來(lái)提供實(shí)時(shí)的消費(fèi)。

kafka分布式訂閱架構(gòu)如下圖:--取自Kafka官網(wǎng)

羅寶兄弟文章上的架構(gòu)圖是這樣的

其實(shí)兩者沒(méi)有太大區(qū)別淳蔼,官網(wǎng)的架構(gòu)圖只是把Kafka簡(jiǎn)潔的表示成一個(gè)Kafka Cluster侧蘸,而羅寶兄弟的架構(gòu)圖就相對(duì)詳細(xì)一些裁眯;

Kafka版本:0.8.0

Kafka下載及文檔:http://kafka.apache.org/

Kafka安裝:

> tar xzf kafka-.tgz

> cd kafka-

> ./sbt update

> ./sbt package

> ./sbt assembly-package-dependency

復(fù)制代碼

啟動(dòng)及測(cè)試命令:

(1) start server

> bin/zookeeper-server-start.shconfig/zookeeper.properties

> bin/kafka-server-start.shconfig/server.properties

復(fù)制代碼

這里是官網(wǎng)上的教程,kafka本身有內(nèi)置zookeeper讳癌,但是我自己在實(shí)際部署中是使用單獨(dú)的zookeeper集群穿稳,所以第一行命令我就沒(méi)執(zhí)行,這里只是些出來(lái)給大家看下晌坤。

配置獨(dú)立的zookeeper集群需要配置server.properties文件逢艘,講zookeeper.connect修改為獨(dú)立集群的IP和端口

zookeeper.connect=nutch1:2181

復(fù)制代碼

(2)Create a topic

> bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test

> bin/kafka-list-topic.sh --zookeeperlocalhost:2181

復(fù)制代碼

(3)Send some messages

> bin/kafka-console-producer.sh--broker-list localhost:9092 --topic test

復(fù)制代碼

(4)Start a consumer

> bin/kafka-console-consumer.sh--zookeeper localhost:2181 --topic test --from-beginning

復(fù)制代碼

kafka-console-producer.sh和kafka-console-cousumer.sh只是系統(tǒng)提供的命令行工具。這里啟動(dòng)是為了測(cè)試是否能正常生產(chǎn)消費(fèi)骤菠;驗(yàn)證流程正確性

在實(shí)際開(kāi)發(fā)中還是要自行開(kāi)發(fā)自己的生產(chǎn)者與消費(fèi)者它改;

kafka的安裝也可以參考我之前寫(xiě)的文章:http://blog.csdn.net/weijonathan/article/details/18075967

Storm

Twitter將Storm正式開(kāi)源了,這是一個(gè)分布式的商乎、容錯(cuò)的實(shí)時(shí)計(jì)算系統(tǒng)央拖,它被托管在GitHub上,遵循 Eclipse Public License 1.0鹉戚。Storm是由BackType開(kāi)發(fā)的實(shí)時(shí)處理系統(tǒng)鲜戒,BackType現(xiàn)在已在Twitter麾下。GitHub上的最新版本是Storm 0.5.2抹凳,基本是用Clojure寫(xiě)的遏餐。

Storm的主要特點(diǎn)如下:

簡(jiǎn)單的編程模型。類似于MapReduce降低了并行批處理復(fù)雜性却桶,Storm降低了進(jìn)行實(shí)時(shí)處理的復(fù)雜性境输。

可以使用各種編程語(yǔ)言。你可以在Storm之上使用各種編程語(yǔ)言颖系。默認(rèn)支持Clojure嗅剖、Java、Ruby和Python嘁扼。要增加對(duì)其他語(yǔ)言的支持信粮,只需實(shí)現(xiàn)一個(gè)簡(jiǎn)單的Storm通信協(xié)議即可。

容錯(cuò)性趁啸。Storm會(huì)管理工作進(jìn)程和節(jié)點(diǎn)的故障强缘。

水平擴(kuò)展。計(jì)算是在多個(gè)線程不傅、進(jìn)程和服務(wù)器之間并行進(jìn)行的旅掂。

可靠的消息處理。Storm保證每個(gè)消息至少能得到一次完整處理访娶。任務(wù)失敗時(shí)商虐,它會(huì)負(fù)責(zé)從消息源重試消息。

快速。系統(tǒng)的設(shè)計(jì)保證了消息能得到快速的處理秘车,使用?MQ作為其底層消息隊(duì)列典勇。(0.9.0.1版本支持?MQ和netty兩種模式)

本地模式。Storm有一個(gè)“本地模式”叮趴,可以在處理過(guò)程中完全模擬Storm集群割笙。這讓你可以快速進(jìn)行開(kāi)發(fā)和單元測(cè)試。

由于篇幅問(wèn)題眯亦,具體的安裝步驟可以參考我之前寫(xiě)的文章:http://blog.csdn.net/weijonathan/article/details/17762477

接下來(lái)重頭戲開(kāi)始拉伤溉!那就是框架之間的整合啦

flume和kafka整合

1.下載

flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin

2.提取插件中的flume-conf.properties文件

修改該文件:#source section

producer.sources.s.type = exec

producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log

producer.sources.s.channels = c

修改所有topic的值改為test

將改后的配置文件放進(jìn)flume/conf目錄下

在該項(xiàng)目中提取以下jar包放入環(huán)境中flume的lib下:

注:這里的flumeng-kafka-plugin.jar這個(gè)包,后面在github項(xiàng)目中已經(jīng)移動(dòng)到package目錄了搔驼。找不到的童鞋可以到package目錄獲取谈火。

完成上面的步驟之后,我們來(lái)測(cè)試下flume+kafka這個(gè)流程有沒(méi)有走通舌涨;

我們先啟動(dòng)flume糯耍,然后再啟動(dòng)kafka,啟動(dòng)步驟按之前的步驟執(zhí)行囊嘉;接下來(lái)我們使用kafka的kafka-console-consumer.sh腳本查看是否有flume有沒(méi)有往Kafka傳輸數(shù)據(jù)温技;

以上這個(gè)是我的test.log文件通過(guò)flume抓取傳到kafka的數(shù)據(jù);說(shuō)明我們的flume和kafka流程走通了扭粱;

大家還記得剛開(kāi)始我們的流程圖么舵鳞,其中有一步是通過(guò)flume到kafka,還有一步是到hdfs的琢蛤;而我們這邊還沒(méi)有提到如何存入kafka且同時(shí)存如hdfs蜓堕;

flume是支持?jǐn)?shù)據(jù)同步復(fù)制,同步復(fù)制流程圖如下博其,取自于flume官網(wǎng)套才,官網(wǎng)用戶指南地址:http://flume.apache.org/FlumeUserGuide.html

怎么設(shè)置同步復(fù)制呢,看下面的配置:

#2個(gè)channel和2個(gè)sink的配置文件 這里我們可以設(shè)置兩個(gè)sink慕淡,一個(gè)是kafka的背伴,一個(gè)是hdfs的;

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

復(fù)制代碼

具體配置大伙根據(jù)自己的需求去設(shè)置峰髓,這里就不具體舉例了

kafka和storm的整合

1.下載kafka-storm0.8插件:https://github.com/wurstmeister/storm-kafka-0.8-plus

2.使用maven package進(jìn)行編譯傻寂,得到storm-kafka-0.8-plus-0.3.0-SNAPSHOT.jar包??--有轉(zhuǎn)載的童鞋注意下,這里的包名之前寫(xiě)錯(cuò)了携兵,現(xiàn)在改正確了疾掰!不好意思!

3.將該jar包及kafka_2.9.2-0.8.0-beta1.jar徐紧、metrics-core-2.2.0.jar个绍、scala-library-2.9.2.jar (這三個(gè)jar包在kafka項(xiàng)目中能找到)

備注:如果開(kāi)發(fā)的項(xiàng)目需要其他jar勒葱,記得也要放進(jìn)storm的Lib中比如用到了mysql就要添加mysql-connector-java-5.1.22-bin.jar到storm的lib下

那么接下來(lái)我們把storm也重啟下;

完成以上步驟之后巴柿,我們還有一件事情要做,就是使用kafka-storm0.8插件死遭,寫(xiě)一個(gè)自己的Storm程序广恢;

這里我給大伙附上一個(gè)我弄的storm程序,百度網(wǎng)盤(pán)分享地址:鏈接:http://pan.baidu.com/s/1dD28mDr密碼: 44r3

先稍微看下程序的創(chuàng)建Topology代碼

數(shù)據(jù)操作主要在WordCounter類中呀潭,這里只是使用簡(jiǎn)單JDBC進(jìn)行插入處理

這里只需要輸入一個(gè)參數(shù)作為T(mén)opology名稱就可以了钉迷!我們這里使用本地模式,所以不輸入?yún)?shù)钠署,直接看流程是否走通糠聪;

storm-0.9.0.1/bin/storm jar storm-start-demo-0.0.1-SNAPSHOT.jar com.storm.topology.MyTopology

先看下日志,這里打印出來(lái)了往數(shù)據(jù)庫(kù)里面插入數(shù)據(jù)了

然后我們查看下數(shù)據(jù)庫(kù)谐鼎;插入成功了舰蟆!

到這里我們的整個(gè)整合就完成了!

但是這里還有一個(gè)問(wèn)題狸棍,不知道大伙有沒(méi)有發(fā)現(xiàn)身害。

由于我們使用storm進(jìn)行分布式流式計(jì)算,那么分布式最需要注意的是數(shù)據(jù)一致性以及避免臟數(shù)據(jù)的產(chǎn)生草戈;所以我提供的測(cè)試項(xiàng)目只能用于測(cè)試塌鸯,正式開(kāi)發(fā)不能這樣處理;

同時(shí)給的建議是建立一個(gè)zookeeper的分布式全局鎖唐片,保證數(shù)據(jù)一致性丙猬,避免臟數(shù)據(jù)錄入!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末费韭,一起剝皮案震驚了整個(gè)濱河市茧球,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌揽思,老刑警劉巖袜腥,帶你破解...
    沈念sama閱讀 218,858評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異钉汗,居然都是意外死亡羹令,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)损痰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)福侈,“玉大人,你說(shuō)我怎么就攤上這事卢未》玖荩” “怎么了堰汉?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,282評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)伟墙。 經(jīng)常有香客問(wèn)我翘鸭,道長(zhǎng),這世上最難降的妖魔是什么戳葵? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,842評(píng)論 1 295
  • 正文 為了忘掉前任就乓,我火速辦了婚禮,結(jié)果婚禮上拱烁,老公的妹妹穿的比我還像新娘生蚁。我一直安慰自己,他們只是感情好戏自,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布邦投。 她就那樣靜靜地躺著,像睡著了一般擅笔。 火紅的嫁衣襯著肌膚如雪志衣。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,679評(píng)論 1 305
  • 那天剂娄,我揣著相機(jī)與錄音蠢涝,去河邊找鬼。 笑死阅懦,一個(gè)胖子當(dāng)著我的面吹牛和二,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播耳胎,決...
    沈念sama閱讀 40,406評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼惯吕,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了怕午?” 一聲冷哼從身側(cè)響起穷娱,我...
    開(kāi)封第一講書(shū)人閱讀 39,311評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤毕荐,失蹤者是張志新(化名)和其女友劉穎郁妈,沒(méi)想到半個(gè)月后猩系,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡兆蕉,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年羽戒,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片虎韵。...
    茶點(diǎn)故事閱讀 40,090評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡易稠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出包蓝,到底是詐尸還是另有隱情驶社,我是刑警寧澤企量,帶...
    沈念sama閱讀 35,785評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站亡电,受9級(jí)特大地震影響届巩,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜逊抡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評(píng)論 3 331
  • 文/蒙蒙 一姆泻、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧冒嫡,春花似錦、人聲如沸四苇。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,988評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)月腋。三九已至蟀架,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間榆骚,已是汗流浹背片拍。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,101評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留妓肢,地道東北人捌省。 一個(gè)月前我還...
    沈念sama閱讀 48,298評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像碉钠,于是被迫代替她去往敵國(guó)和親纲缓。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容