個(gè)人觀點(diǎn):大數(shù)據(jù)我們都知道hadoop,但并不都是hadoop.我們?cè)撊绾螛?gòu)建大數(shù)據(jù)庫項(xiàng)目寞缝。對(duì)于離線處理派敷,hadoop還是比較適合的蛹批,但是對(duì)于實(shí)時(shí)性比較強(qiáng)的撰洗,數(shù)據(jù)量比較大的篮愉,我們可以采用Storm,那么Storm和什么技術(shù)搭配差导,才能夠做一個(gè)適合自己的項(xiàng)目试躏。下面給大家可以參考。
可以帶著下面問題來閱讀本文章:
1.一個(gè)好的項(xiàng)目架構(gòu)應(yīng)該具備什么特點(diǎn)设褐?
2.本項(xiàng)目架構(gòu)是如何保證數(shù)據(jù)準(zhǔn)確性的颠蕴?
3.什么是Kafka?
4.flume+kafka如何整合助析?
5.使用什么腳本可以查看flume有沒有往Kafka傳輸數(shù)據(jù)
做軟件開發(fā)的都知道模塊化思想犀被,這樣設(shè)計(jì)的原因有兩方面:
一方面是可以模塊化,功能劃分更加清晰外冀,從“數(shù)據(jù)采集--數(shù)據(jù)接入--流失計(jì)算--數(shù)據(jù)輸出/存儲(chǔ)”
1).數(shù)據(jù)采集
負(fù)責(zé)從各節(jié)點(diǎn)上實(shí)時(shí)采集數(shù)據(jù)寡键,選用cloudera的flume來實(shí)現(xiàn)
2).數(shù)據(jù)接入
由于采集數(shù)據(jù)的速度和數(shù)據(jù)處理的速度不一定同步,因此添加一個(gè)消息中間件來作為緩沖雪隧,選用apache的kafka
3).流式計(jì)算
對(duì)采集到的數(shù)據(jù)進(jìn)行實(shí)時(shí)分析西轩,選用apache的storm
4).數(shù)據(jù)輸出
對(duì)分析后的結(jié)果持久化员舵,暫定用mysql
另一方面是模塊化之后,假如當(dāng)Storm掛掉了之后藕畔,數(shù)據(jù)采集和數(shù)據(jù)接入還是繼續(xù)在跑著马僻,數(shù)據(jù)不會(huì)丟失,storm起來之后可以繼續(xù)進(jìn)行流式計(jì)算注服;
那么接下來我們來看下整體的架構(gòu)圖
詳細(xì)介紹各個(gè)組件及安裝配置:
操作系統(tǒng):ubuntu
Flume
Flume是Cloudera提供的一個(gè)分布式韭邓、可靠、和高可用的海量日志采集祠汇、聚合和傳輸?shù)娜罩臼占到y(tǒng)仍秤,支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù);同時(shí)可很,F(xiàn)lume提供對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理诗力,并寫到各種數(shù)據(jù)接受方(可定制)的能力。
下圖為flume典型的體系結(jié)構(gòu):
Flume數(shù)據(jù)源以及輸出方式:
Flume提供了從console(控制臺(tái))我抠、RPC(Thrift-RPC)苇本、text(文件)、tail(UNIX tail)菜拓、syslog(syslog日志系統(tǒng)瓣窄,支持TCP和UDP等2種模式),exec(命令執(zhí)行)等數(shù)據(jù)源上收集數(shù)據(jù)的能力,在我們的系統(tǒng)中目前使用exec方式進(jìn)行日志采集纳鼎。
Flume的數(shù)據(jù)接受方俺夕,可以是console(控制臺(tái))、text(文件)贱鄙、dfs(HDFS文件)劝贸、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系統(tǒng))等。在我們系統(tǒng)中由kafka來接收逗宁。
Flume下載及文檔:
http://flume.apache.org/
Flume安裝:
$tar zxvf apache-flume-1.4.0-bin.tar.gz/usr/local
復(fù)制代碼
Flume啟動(dòng)命令:
$bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
復(fù)制代碼
Kafka
kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)映九,她有如下特性:
通過O(1)的磁盤數(shù)據(jù)結(jié)構(gòu)提供消息的持久化,這種結(jié)構(gòu)對(duì)于即使數(shù)以TB的消息存儲(chǔ)也能夠保持長(zhǎng)時(shí)間的穩(wěn)定性能瞎颗。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數(shù)十萬的消息件甥。
支持通過kafka服務(wù)器和消費(fèi)機(jī)集群來分區(qū)消息。
支持Hadoop并行數(shù)據(jù)加載哼拔。
kafka的目的是提供一個(gè)發(fā)布訂閱解決方案引有,它可以處理消費(fèi)者規(guī)模的網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 這種動(dòng)作(網(wǎng)頁瀏覽倦逐,搜索和其他用戶的行動(dòng))是在現(xiàn)代網(wǎng)絡(luò)上的許多社會(huì)功能的一個(gè)關(guān)鍵因素譬正。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對(duì)于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實(shí)時(shí)處理的限制导帝,這是一個(gè)可行的解決方案守谓。kafka的目的是通過Hadoop的并行加載機(jī)制來統(tǒng)一線上和離線的消息處理,也是為了通過集群機(jī)來提供實(shí)時(shí)的消費(fèi)您单。
kafka分布式訂閱架構(gòu)如下圖:--取自Kafka官網(wǎng)
羅寶兄弟文章上的架構(gòu)圖是這樣的
其實(shí)兩者沒有太大區(qū)別斋荞,官網(wǎng)的架構(gòu)圖只是把Kafka簡(jiǎn)潔的表示成一個(gè)Kafka Cluster,而上面架構(gòu)圖就相對(duì)詳細(xì)一些虐秦;
Kafka版本:0.8.0
Kafka下載及文檔:http://kafka.apache.org/
Kafka安裝:
> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency
復(fù)制代碼
啟動(dòng)及測(cè)試命令:
(1) start server
> bin/zookeeper-server-start.shconfig/zookeeper.properties
> bin/kafka-server-start.shconfig/server.properties
復(fù)制代碼
這里是官網(wǎng)上的教程平酿,kafka本身有內(nèi)置zookeeper,但是我自己在實(shí)際部署中是使用單獨(dú)的zookeeper集群悦陋,所以第一行命令我就沒執(zhí)行蜈彼,這里只是些出來給大家看下。
配置獨(dú)立的zookeeper集群需要配置server.properties文件俺驶,講zookeeper.connect修改為獨(dú)立集群的IP和端口
zookeeper.connect=nutch1:2181
復(fù)制代碼
(2)Create a topic
> bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
> bin/kafka-list-topic.sh --zookeeperlocalhost:2181
復(fù)制代碼
(3)Send some messages
> bin/kafka-console-producer.sh--broker-list localhost:9092 --topic test
復(fù)制代碼
(4)Start a consumer
> bin/kafka-console-consumer.sh--zookeeper localhost:2181 --topic test --from-beginning
復(fù)制代碼
kafka-console-producer.sh和kafka-console-cousumer.sh只是系統(tǒng)提供的命令行工具幸逆。這里啟動(dòng)是為了測(cè)試是否能正常生產(chǎn)消費(fèi);驗(yàn)證流程正確性
在實(shí)際開發(fā)中還是要自行開發(fā)自己的生產(chǎn)者與消費(fèi)者暮现;
kafka的安裝也可以參考我之前寫的文章:http://blog.csdn.net/weijonathan/article/details/18075967
Storm
Twitter將Storm正式開源了还绘,這是一個(gè)分布式的、容錯(cuò)的實(shí)時(shí)計(jì)算系統(tǒng)栖袋,它被托管在GitHub上拍顷,遵循??Eclipse Public License 1.0。Storm是由BackType開發(fā)的實(shí)時(shí)處理系統(tǒng)塘幅,BackType現(xiàn)在已在Twitter麾下昔案。GitHub上的最新版本是Storm 0.5.2,基本是用Clojure寫的电媳。
Storm的主要特點(diǎn)如下:
簡(jiǎn)單的編程模型踏揣。類似于MapReduce降低了并行批處理復(fù)雜性,Storm降低了進(jìn)行實(shí)時(shí)處理的復(fù)雜性匆背。
可以使用各種編程語言呼伸。你可以在Storm之上使用各種編程語言身冀。默認(rèn)支持Clojure钝尸、Java、Ruby和Python搂根。要增加對(duì)其他語言的支持珍促,只需實(shí)現(xiàn)一個(gè)簡(jiǎn)單的Storm通信協(xié)議即可。
容錯(cuò)性剩愧。Storm會(huì)管理工作進(jìn)程和節(jié)點(diǎn)的故障猪叙。
水平擴(kuò)展。計(jì)算是在多個(gè)線程、進(jìn)程和服務(wù)器之間并行進(jìn)行的穴翩。
可靠的消息處理犬第。Storm保證每個(gè)消息至少能得到一次完整處理。任務(wù)失敗時(shí)芒帕,它會(huì)負(fù)責(zé)從消息源重試消息歉嗓。
快速。系統(tǒng)的設(shè)計(jì)保證了消息能得到快速的處理背蟆,使用?MQ作為其底層消息隊(duì)列鉴分。(0.9.0.1版本支持?MQ和netty兩種模式)
本地模式。Storm有一個(gè)“本地模式”带膀,可以在處理過程中完全模擬Storm集群志珍。這讓你可以快速進(jìn)行開發(fā)和單元測(cè)試。
由于篇幅問題垛叨,具體的安裝步驟可以參考:Storm-0.9.0.1安裝部署 指導(dǎo)
接下來重頭戲開始拉伦糯!那就是框架之間的整合啦
flume和kafka整合
1.下載flume-kafka-plus:https://github.com/beyondj2ee/flumeng-kafka-plugin
2.提取插件中的flume-conf.properties文件
修改該文件:#source section
producer.sources.s.type = exec
producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c
修改所有topic的值改為test
將改后的配置文件放進(jìn)flume/conf目錄下
在該項(xiàng)目中提取以下jar包放入環(huán)境中flume的lib下:
注:這里的flumeng-kafka-plugin.jar這個(gè)包,后面在github項(xiàng)目中已經(jīng)移動(dòng)到package目錄了嗽元。找不到的童鞋可以到package目錄獲取舔株。
完成上面的步驟之后,我們來測(cè)試下flume+kafka這個(gè)流程有沒有走通还棱;
我們先啟動(dòng)flume载慈,然后再啟動(dòng)kafka,啟動(dòng)步驟按之前的步驟執(zhí)行珍手;接下來我們使用kafka的kafka-console-consumer.sh腳本查看是否有flume有沒有往Kafka傳輸數(shù)據(jù)办铡;
以上這個(gè)是我的test.log文件通過flume抓取傳到kafka的數(shù)據(jù);說明我們的flume和kafka流程走通了琳要;
大家還記得剛開始我們的流程圖么寡具,其中有一步是通過flume到kafka,還有一步是到hdfs的稚补;而我們這邊還沒有提到如何存入kafka且同時(shí)存如hdfs童叠;
flume是支持?jǐn)?shù)據(jù)同步復(fù)制,同步復(fù)制流程圖如下课幕,取自于flume官網(wǎng)厦坛,官網(wǎng)用戶指南地址:http://flume.apache.org/FlumeUserGuide.html
怎么設(shè)置同步復(fù)制呢,看下面的配置:
#2個(gè)channel和2個(gè)sink的配置文件??這里我們可以設(shè)置兩個(gè)sink乍惊,一個(gè)是kafka的杜秸,一個(gè)是hdfs的;
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
復(fù)制代碼
具體配置大伙根據(jù)自己的需求去設(shè)置润绎,這里就不具體舉例了
kafka和storm的整合
1.下載kafka-storm0.8插件:https://github.com/wurstmeister/storm-kafka-0.8-plus
2.使用maven package進(jìn)行編譯撬碟,得到storm-kafka-0.8-plus-0.3.0-SNAPSHOT.jar包--有轉(zhuǎn)載的童鞋注意下诞挨,這里的包名之前寫錯(cuò)了,現(xiàn)在改正確了呢蛤!不好意思惶傻!
3.將該jar包及kafka_2.9.2-0.8.0-beta1.jar、metrics-core-2.2.0.jar其障、scala-library-2.9.2.jar (這三個(gè)jar包在kafka項(xiàng)目中能找到)
備注:如果開發(fā)的項(xiàng)目需要其他jar达罗,記得也要放進(jìn)storm的Lib中比如用到了mysql就要添加mysql-connector-java-5.1.22-bin.jar到storm的lib下
那么接下來我們把storm也重啟下;
完成以上步驟之后静秆,我們還有一件事情要做粮揉,就是使用kafka-storm0.8插件,寫一個(gè)自己的Storm程序抚笔;
這里我給大伙附上一個(gè)我弄的storm程序扶认,百度網(wǎng)盤分享地址:鏈接:?http://pan.baidu.com/s/1jGBp99W?密碼: 9arq
先稍微看下程序的創(chuàng)建Topology代碼
數(shù)據(jù)操作主要在WordCounter類中,這里只是使用簡(jiǎn)單JDBC進(jìn)行插入處理
這里只需要輸入一個(gè)參數(shù)作為Topology名稱就可以了殊橙!我們這里使用本地模式辐宾,所以不輸入?yún)?shù),直接看流程是否走通膨蛮;
storm-0.9.0.1/bin/storm jar storm-start-demo-0.0.1-SNAPSHOT.jar com.storm.topology.MyTopology
復(fù)制代碼
先看下日志叠纹,這里打印出來了往數(shù)據(jù)庫里面插入數(shù)據(jù)了
然后我們查看下數(shù)據(jù)庫;插入成功了敞葛!
到這里我們的整個(gè)整合就完成了誉察!
但是這里還有一個(gè)問題,不知道大伙有沒有發(fā)現(xiàn)惹谐。
由于我們使用storm進(jìn)行分布式流式計(jì)算持偏,那么分布式最需要注意的是數(shù)據(jù)一致性以及避免臟數(shù)據(jù)的產(chǎn)生;所以我提供的測(cè)試項(xiàng)目只能用于測(cè)試氨肌,正式開發(fā)不能這樣處理鸿秆;
這里給的建議是建立一個(gè)zookeeper的分布式全局鎖,保證數(shù)據(jù)一致性怎囚,避免臟數(shù)據(jù)錄入卿叽!
zookeeper客戶端框架大伙可以使用Netflix Curator來完成。