1. 介紹
采用ZooKeeper+Kafka的方式建立集群玫恳,主要支持了消息傳遞和負(fù)載均衡,常見的一些文檔优俘,有的是概念較多京办,沒有實(shí)際操作;有的只有命令帆焕,不知其原理惭婿。這里就結(jié)合集群礦池來說說它的具體應(yīng)用場(chǎng)景,原理叶雹,以及具體實(shí)現(xiàn)财饥。
2. 舉例
先舉個(gè)例子,比如一個(gè)理發(fā)店:
一開始只有一個(gè)理發(fā)師折晦,洗剪吹都由他一個(gè)人負(fù)責(zé)佑力,那么只能一個(gè)客人做完全套(同一臺(tái)機(jī)器上操作各個(gè)步驟),再給下一個(gè)客人服務(wù)筋遭。
之后, 又來了幾個(gè)理發(fā)師,每個(gè)人負(fù)責(zé)接待一個(gè)客人(多臺(tái)服務(wù)器,每臺(tái)負(fù)責(zé)單個(gè)流程)漓滔。
再后來理發(fā)店又?jǐn)U大编饺,人多了(多臺(tái)不同用途的機(jī)器),老板發(fā)現(xiàn)如果把工作細(xì)分為:洗(A)剪(B)吹(C)三部分响驴,這三部分的工作量和難度都不同透且,可以安排專人專職,以節(jié)約人力成本豁鲤。三個(gè)模塊工作量不同秽誊,需要的人數(shù)也不同,如上圖所示琳骡,洗2人(2臺(tái)機(jī)器)锅论,剪3人(3臺(tái)機(jī)器),吹2人(2臺(tái)機(jī)器)楣号。Data為庫(kù)房(數(shù)據(jù)服務(wù)器)最易,每個(gè)人都可能去庫(kù)房存取工具。
顯然炫狱,A與B并不是一一對(duì)應(yīng)的關(guān)系藻懒。洗發(fā)之后,應(yīng)該分配剪發(fā)員執(zhí)行下一步驟视译。讓洗發(fā)員找每個(gè)剪發(fā)員查詢狀態(tài)顯然不是明智的作法嬉荆,而且,此時(shí)可能每個(gè)剪發(fā)員都不空閑酷含,A還有接下來的工作鄙早,也不能一接等在那兒。
于是就引入了另一個(gè)角色:接待員broker第美,當(dāng)某個(gè)A的工作做完全后蝶锋,則向broker的topic發(fā)一個(gè)消息,告訴它本層已完成什往,需要下一層處理扳缕,此時(shí)A是發(fā)送者producer,中介是broker(它也位于一臺(tái)機(jī)器上别威,這里沒用方框畫出來)躯舔,如果某個(gè)B空閑時(shí),也會(huì)連接到topic省古,等待下一個(gè)工作(如上圖所示)粥庄,此時(shí)B是接受者customer。B和C的通訊也是一樣豺妓,不過它們間通訊時(shí)B是producer惜互,C是customer布讹。
Broker是Kafka的一個(gè)實(shí)例,broker按功能不同又可建立多個(gè)topic训堆,topic里又可包含多個(gè)partition描验,每個(gè)partition都可保證先入先出,但partition之間不保證順序坑鱼。多個(gè)broker又可以做成Kafka群組膘流。綜上,Kafka是傳送消息的工具鲁沥,或者說是一套機(jī)制呼股,它實(shí)現(xiàn)了消息創(chuàng)建,存取等工作画恰。
在producer彭谁,customer,broker的背后又有著ZooKeeper的支持阐枣,它就好像理發(fā)店的排班員马靠,在這里主要負(fù)責(zé)負(fù)載均衡,將工作平均分配給customer蔼两,避免產(chǎn)生閑的閑死忙的忙死這種問題甩鳄。ZooKeeper也可以有多個(gè),即使某一個(gè)死掉额划,整個(gè)系統(tǒng)仍可正常運(yùn)轉(zhuǎn)妙啃。
從此處可見Kafka和Zookeeper上跑的不是執(zhí)行具體工作的程序(洗剪吹),它們的角色是提供通訊和負(fù)載均衡的協(xié)調(diào)者俊戳。具體洗剪吹運(yùn)行在哪個(gè)服務(wù)器上是另外的工作.
3. 配置過程
1) 配置ZooKeeper
在一臺(tái)或幾臺(tái)機(jī)器上配置和運(yùn)行ZooKeeper服務(wù)揖赴。
2) 配置Kafka
配置Kafka的 broker
創(chuàng)建topic及partition。
3) 修改程序
在程序中分別加入對(duì)producer和consumer的調(diào)用抑胎,以傳遞信息燥滑。
4) 說明
通過以上步驟,程序就可以在不同的機(jī)器通過簡(jiǎn)單地調(diào)用函數(shù)傳遞消息了阿逃。上述的ZooKeeper铭拧,Kafka,程序可以運(yùn)行在同一臺(tái)機(jī)器上恃锉,可以分別在運(yùn)行在不同機(jī)器上搀菩。
4. ZooKeeper
1) 介紹
ZooKeeper是一個(gè)快速、高可靠破托、容錯(cuò)肪跋、分布式的協(xié)調(diào)服務(wù)。
2) 配置
$ cp conf/zoo_sample.cfg conf/zoo.cfg
$ vi conf/zoo.cfg #修改配置文件
配置文件中server*是zookeeper服務(wù)器之間交互用的土砂,它使得一臺(tái)服務(wù)器死機(jī)時(shí)州既,另外一臺(tái)谜洽,能快速轉(zhuǎn)換為主zookeeper服務(wù)器
dataDir是數(shù)據(jù)目錄,clientPort為ZooKepper端口號(hào)易桃,一般不用改
$ ./startserver.sh start # 啟動(dòng)zooKeeper服務(wù)
3) 調(diào)試工具
$ bin/zkCli.sh -server 127.0.0.1:2181 #client端連上看一下
$ jps #查看進(jìn)程, 其中QuorumPeerMain是zooKeeper的主進(jìn)程
$ netstat -nap|grep 2181 # 查看配置文件中設(shè)置的端口是否打開
正常顯示為:
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 10737/java
5. Kafka
1) 介紹
Kafka是一個(gè)分布式的流數(shù)據(jù)處理平臺(tái)褥琐。
2) 配置文件
$ vi config/server.properties
這是默認(rèn)的配置文件,以下幾項(xiàng)比較重要
broker.id=1 # broker的標(biāo)識(shí)晤郑,具有唯一性
port=9092 # kafka服務(wù)的端口號(hào)
log.dirs=/tmp/kafka-logs # 存儲(chǔ)log的目錄,交互的信息就存在該目錄下
zookeeper.connect=127.0.0.1:2181 # zookeeper服務(wù)器的IP和端口,多個(gè)用逗號(hào)分開
host.name=127.0.0.1 # 運(yùn)行本程序機(jī)器的IP地址贸宏,在btcpool的cfg中指定
delete.topic.enable=true #建議加這句造寝,否則刪topic時(shí)候特別麻煩
message.max.bytes=20000000 # 支持大數(shù)據(jù)msg,否則傳輸時(shí)將丟棄大數(shù)據(jù)塊
replica.fetch.max.bytes=30000000 # 同上
3) 啟動(dòng)服務(wù)
$ bin/kafka-server-start.sh config/server.properties
4) 創(chuàng)建供btcpool使用的topic
需要先啟后臺(tái)服務(wù)吭练,再創(chuàng)建topic诫龙,我這里使用了一個(gè)本機(jī)的zookeeper,如果有多個(gè)鲫咽,請(qǐng)使用逗號(hào)分隔签赃。
$ ./bin/kafka-topics.sh --create --topic test --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1
5) 相關(guān)工具
i. 查看topic
$ ./bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 #查看topic
ii. 查看kafka服務(wù)
$ netstat -nap|grep 9092
正常運(yùn)行時(shí)返回
tcp 0 0 127.0.0.1:9092 0.0.0.0:* LISTEN 24961/java
6) 刪除topic
先刪除kafka存儲(chǔ)目錄(server.properties文件log.dirs配置,默認(rèn)為"/tmp/kafka-logs")相關(guān)topic目錄分尸。注意partition的數(shù)據(jù)都存在這個(gè)目錄中锦聊。
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic test -delete
如果在server.properties中未指定delete.topic.enable=true,則只會(huì)做一個(gè)刪除標(biāo)記箩绍,不會(huì)真正刪除孔庭,刪除需要在zookeeper服務(wù)器上,通過zkCli.sh連進(jìn)去刪除材蛛。
7) 常出現(xiàn)的錯(cuò)誤
i. kafka-server-start.sh報(bào)錯(cuò)找不到主機(jī)圆到,然后直接退出
解決方法,在配置文件中設(shè)置主機(jī)
host.name=主機(jī)名
主機(jī)名通過命令hostname查看
ii. kafka-server-start.sh雖不退出卑吭,但不停報(bào)錯(cuò)java.lang.NoClassDefFoundError: Could not initialize class kafka.network.RequestChannel$
這也是主機(jī)名相關(guān)的問題芽淡,在/etc/hosts的最后填加一句
127.0.0.1 主機(jī)名
主機(jī)名通過命令hostname查看
8) 測(cè)試kafka
正常情況下,producer上發(fā)的豆赏,consumer都應(yīng)該能收到
i. 創(chuàng)建
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
ii. 接收
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
iii. 發(fā)送
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
如果topic沒建立挣菲,producer會(huì)自動(dòng)建立top,但參數(shù)可能與你想要的不同.
6. C程序調(diào)用kafka發(fā)消息
一般使用librdkafka庫(kù)河绽,下載庫(kù)源碼編譯己单,其中example目錄中的程序也可以用于測(cè)試。
7. 參考
1) Apache Kafka 分布式消息隊(duì)列中間件安裝與配置
http://blog.csdn.net/wangjia184/article/details/37921183
2) Kafka&Zookeeper原理與應(yīng)用場(chǎng)景介紹
https://wenku.baidu.com/view/a47389116ad97f192279168884868762caaebbd4.html