轉自:https://www.cnblogs.com/qingyunzong/category/1212387.html
按照自己理解加粗了重點
一、前言,所謂消息隊列
一個消息系統(tǒng)負責將數(shù)據(jù)從一個應用傳遞到另外一個應用逮诲,應用只需關注于數(shù)據(jù),無需關注數(shù)據(jù)在兩個或多個應用間是如何傳遞的础废。
有兩種主要的消息傳遞模式:點對點傳遞模式汛骂、發(fā)布-訂閱模式。
點對點消息傳遞模式
在點對點消息系統(tǒng)中评腺,消息持久化到一個隊列中帘瞭。
此時,將有一個或多個消費者消費隊列中的數(shù)據(jù)蒿讥。
但是一條消息只能被消費一次蝶念。
當一個消費者消費了隊列中的某條數(shù)據(jù)之后,該條數(shù)據(jù)則從消息隊列中刪除芋绸。
生產(chǎn)者發(fā)送一條消息到queue媒殉,只有一個消費者能收到。
該模式即使有多個消費者同時消費數(shù)據(jù)摔敛,也能保證數(shù)據(jù)處理的順序廷蓉。這種架構描述示意圖如下:
發(fā)布-訂閱消息傳遞模式
在發(fā)布-訂閱消息系統(tǒng)中,消息被持久化到一個topic中马昙。
與點對點消息系統(tǒng)不同的是桃犬,消費者可以訂閱一個或多個topic,
消費者可以消費該topic中所有的數(shù)據(jù)行楞,同一條數(shù)據(jù)可以被多個消費者消費攒暇,
數(shù)據(jù)被消費后不會立馬刪除。
發(fā)布者發(fā)送到topic的消息子房,只有訂閱了topic的訂閱者才會收到消息形用。
在發(fā)布-訂閱消息系統(tǒng)中,消息的生產(chǎn)者稱為發(fā)布者证杭,消費者稱為訂閱者田度。該模式的示例圖如下:
二、消息隊列的優(yōu)點
-
解耦
在項目啟動之初來預測將來項目會碰到什么需求解愤,是極其困難的每币。
消息系統(tǒng)在處理過程中間插入了一個隱含的、基于數(shù)據(jù)的接口層琢歇,兩邊的處理過程都要實現(xiàn)這一接口兰怠。
這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束李茫。 -
冗余(副本)
有些情況下揭保,處理數(shù)據(jù)的過程會失敗。除非數(shù)據(jù)被持久化魄宏,否則將造成丟失秸侣。
消息隊列把數(shù)據(jù)進行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風險宠互。
許多消息隊列所采用的"插入-獲取-刪除"范式中味榛,在把一個消息從隊列中刪除之前,
需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢予跌,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢搏色。 -
擴展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的券册,
只要另外增加處理過程即可频轿。
不需要改變代碼、不需要調節(jié)參數(shù)烁焙。擴展就像調大電力按鈕一樣簡單航邢。 -
靈活性&峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續(xù)發(fā)揮作用骄蝇,但是這樣的突發(fā)流量并不常見膳殷;
如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。
使用消息隊列能夠使關鍵組件頂住突發(fā)的訪問壓力九火,而不會因為突發(fā)的超負荷的請求而完全崩潰赚窃。 -
可恢復性
系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)吃既。
消息隊列降低了進程間的耦合度考榨,
所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復后被處理鹦倚。 -
順序保證
在大多使用場景下河质,數(shù)據(jù)處理的順序都很重要。
大部分消息隊列本來就是排序的震叙,并且能保證數(shù)據(jù)會按照特定的順序來處理掀鹅。
Kafka保證一個Partition內的消息的有序性。 -
緩沖
在任何重要的系統(tǒng)中媒楼,都會有需要不同的處理時間的元素乐尊。
例如,加載一張圖片比應用過濾器花費更少的時間划址。
消息隊列通過一個緩沖層來幫助任務最高效率的執(zhí)行———寫入隊列的處理會盡可能的快速扔嵌。
該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度限府。 -
異步通信
很多時候,用戶不想也不需要立即處理消息痢缎。
消息隊列提供了異步處理機制胁勺,允許用戶把一個消息放入隊列,但并不立即處理它独旷。
想向隊列中放入多少消息就放多少署穗,然后在需要的時候再去處理它們。
三嵌洼、常用Message Queue對比
RabbitMQ
略
Redis
Redis是一個基于Key-Value對的NoSQL數(shù)據(jù)庫案疲,開發(fā)維護很活躍。
雖然它是一個Key-Value數(shù)據(jù)庫存儲系統(tǒng)麻养,但它本身支持MQ功能褐啡,所以完全可以當做一個輕量級的隊列服務來使用。
對于RabbitMQ和Redis的入隊和出隊操作回溺,各執(zhí)行100萬次春贸,每10萬次記錄一次執(zhí)行時間。
測試數(shù)據(jù)分為128Bytes遗遵、512Bytes萍恕、1K和10K四個不同大小的數(shù)據(jù)。
實驗表明:
入隊時车要,當數(shù)據(jù)比較小時Redis的性能要高于RabbitMQ允粤,而如果數(shù)據(jù)大小超過了10K,Redis則慢的無法忍受翼岁;
出隊時类垫,無論數(shù)據(jù)大小,Redis都表現(xiàn)出非常好的性能琅坡,而RabbitMQ的出隊性能則遠低于Redis悉患。
ZeroMQ
略
ActiveMQ
ActiveMQ是Apache下的一個子項目。
類似于ZeroMQ榆俺,它能夠以代理人和點對點的技術實現(xiàn)隊列售躁。
同時類似于RabbitMQ,它少量代碼就可以高效地實現(xiàn)高級應用場景茴晋。
Kafka/Jafka
Kafka是Apache下的一個子項目陪捷,是一個高性能跨語言分布式發(fā)布/訂閱消息隊列系統(tǒng),而Jafka是在Kafka之上孵化而來的诺擅,即Kafka的一個升級版市袖。
具有以下特性:
- 快速持久化,可以在O(1)的系統(tǒng)開銷下進行消息持久化烁涌;
- 高吞吐苍碟,在一臺普通的服務器上既可以達到10W/s的吞吐速率酒觅;
- 完全的分布式系統(tǒng),Broker微峰、Producer阐滩、Consumer都原生自動支持分布式,自動實現(xiàn)負載均衡县忌;
- 支持Hadoop數(shù)據(jù)并行加載,對于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng)继效,但又要求實時處理的限制症杏,這是一個可行的解決方案。Kafka通過Hadoop的并行加載機制統(tǒng)一了在線和離線的消息處理瑞信。
Apache Kafka相對于ActiveMQ是一個非常輕量級的消息系統(tǒng)厉颤,除了性能非常好之外,還是一個工作良好的分布式系統(tǒng)凡简。
四逼友、Kafka中的術語解釋
在深入理解Kafka之前,先介紹一下Kafka中的術語秤涩。下圖展示了Kafka的相關術語以及之間的關系:
上圖中一個topic配置了3個partition。
Partition1有兩個offset:0和1筐眷。
Partition2有4個offset黎烈。
Partition3有1個offset。
副本的id和副本所在的機器的id恰好相同匀谣。
如果一個topic的副本數(shù)為3照棋,那么Kafka將在集群中為每個partition創(chuàng)建3個相同的副本。
集群中的每個broker存儲一個或多個partition武翎。多個producer和consumer可同時生產(chǎn)和消費數(shù)據(jù)烈炭。
Broker
Kafka 集群包含一個或多個服務器,服務器節(jié)點稱為broker宝恶。
broker存儲topic的數(shù)據(jù)符隙。
- 如果某topic有N個partition,集群有N個broker卑惜,那么每個broker存儲該topic的一個partition膏执。
- 如果某topic有N個partition,集群有(N+M)個broker露久,那么其中有N個broker存儲該topic的一個partition更米,剩下的M個broker不存儲該topic的partition數(shù)據(jù)。
- 如果某topic有N個partition毫痕,集群中broker數(shù)目少于N個征峦,那么一個broker存儲該topic的一個或多個partition迟几。在實際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生栏笆,這種情況容易導致Kafka集群數(shù)據(jù)不均衡类腮。
Topic
每條發(fā)布到Kafka集群的消息都有一個類別,這個類別被稱為Topic蛉加。
(物理上不同Topic的消息分開存儲蚜枢,邏輯上一個Topic的消息雖然保存于一個或多個broker上,
但用戶只需指定消息的Topic即可生產(chǎn)或消費數(shù)據(jù)而不必關心數(shù)據(jù)存于何處)
類似于數(shù)據(jù)庫的表名
Partition
topic中的數(shù)據(jù)分割為一個或多個partition针饥。
每個topic至少有一個partition厂抽。
(多個partition的情況下,一條消息存儲在topic下的其中一個partition中)
每個partition中的數(shù)據(jù)使用多個segment文件存儲丁眼。
partition中的數(shù)據(jù)是有序的筷凤,不同partition間的數(shù)據(jù)丟失了數(shù)據(jù)的順序。
如果topic有多個partition苞七,消費數(shù)據(jù)時就不能保證數(shù)據(jù)的順序藐守。
在需要嚴格保證消息的消費順序的場景下,需要將partition數(shù)目設為1蹂风。
Producer
生產(chǎn)者即數(shù)據(jù)的發(fā)布者卢厂,該角色將消息發(fā)布到Kafka的topic中。
broker接收到生產(chǎn)者發(fā)送的消息后硫眨,broker將該消息追加到當前用于追加數(shù)據(jù)的segment文件中足淆。
生產(chǎn)者發(fā)送的消息,存儲到一個partition中礁阁,生產(chǎn)者也可以指定數(shù)據(jù)存儲的partition巧号。
Consumer
消費者可以從broker中讀取數(shù)據(jù)。消費者可以消費多個topic中的數(shù)據(jù)姥闭。
Consumer Group
每個Consumer屬于一個特定的Consumer Group
(可為每個Consumer指定group name丹鸿,若不指定group name則屬于默認的group)。
Leader
,其中有且僅有一個作為Leader铜跑,
Leader是當前負責數(shù)據(jù)的讀寫的partition门怪。
Follower
Follower跟隨Leader,所有寫請求都通過Leader路由锅纺,
數(shù)據(jù)變更會廣播給所有Follower掷空,F(xiàn)ollower與Leader保持數(shù)據(jù)同步。
如果Leader失效,則從Follower中選舉出一個新的Leader坦弟。
當Follower與Leader掛掉护锤、卡住或者同步太慢,
leader會把這個follower從“in sync replicas”(ISR)列表中刪除酿傍,
重新創(chuàng)建一個Follower烙懦。
總結
五、Kafka的架構
如上圖所示赤炒,
一個典型的Kafka集群中包含:
- 若干Producer(可以是web前端產(chǎn)生的Page View氯析,或者是服務器日志,系統(tǒng)CPU莺褒、Memory等)
- 若干broker(Kafka支持水平擴展魄鸦,一般broker數(shù)量越多,集群吞吐率越高)
- 若干Consumer Group
- 一個Zookeeper集群
Kafka通過Zookeeper管理集群配置癣朗,選舉leader,以及在Consumer Group發(fā)生變化時進行rebalance旺罢。
Producer使用push模式將消息發(fā)布到broker旷余,
Consumer使用pull模式從broker訂閱并消費消息。
六扁达、Topics和Partition
Topic在邏輯上可以被認為是一個queue正卧,每條消費都必須指定它的Topic,
可以簡單理解為必須指明把這條消息放進哪個queue里跪解。
為了使得Kafka的吞吐率可以線性提高炉旷,物理上把Topic分成一個或多個Partition,
每個Partition在物理上對應一個文件夾叉讥,該文件夾下存儲這個Partition的所有消息和索引文件窘行。
創(chuàng)建一個topic時,同時可以指定分區(qū)數(shù)目图仓,分區(qū)數(shù)越多罐盔,其吞吐量也越大,
但是需要的資源也越多救崔,同時也會導致更高的不可用性惶看,
kafka在接收到生產(chǎn)者發(fā)送的消息之后,會根據(jù)均衡策略將消息存儲到不同的分區(qū)中六孵。
因為每條消息都被append到該Partition中纬黎,屬于順序寫磁盤,因此效率非常高
(經(jīng)驗證劫窒,順序寫磁盤效率比隨機寫內存還要高本今,這是Kafka高吞吐率的一個很重要的保證)。
- 對于傳統(tǒng)的message queue而言,一般會刪除已經(jīng)被消費的消息诈泼,
- 而Kafka集群會保留所有的消息懂拾,無論其被消費與否。
當然铐达,因為磁盤限制岖赋,不可能永久保留所有數(shù)據(jù)(實際上也沒必要),
因此Kafka提供兩種策略刪除舊數(shù)據(jù)瓮孙。
- 基于時間:例如可以通過配置$KAFKA_HOME/config/server.properties唐断,讓Kafka刪除一周前的數(shù)據(jù)
- 基于Partition文件大小:在Partition文件超過1GB時刪除舊數(shù)據(jù)
配置如下所示:
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
因為Kafka讀取特定消息的時間復雜度為O(1)杭抠,即與文件大小無關脸甘,
所以這里刪除過期文件與提高Kafka性能無關。
選擇怎樣的刪除策略只與磁盤以及具體的需求有關偏灿。
另外丹诀,
Kafka會為每一個Consumer Group保留一些metadata信息——當前消費的消息的position,也即offset翁垂。
這個offset由Consumer控制铆遭。正常情況下Consumer會在消費完一條消息后遞增該offset击困。
當然虽惭,Consumer也可將offset設成一個較小的值,重新消費一些消息边翁。
因為offet由Consumer控制啼肩,所以Kafka broker是無狀態(tài)的橄妆,它不需要標記哪些消息被哪些消費過,
也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條消息祈坠,
因此也就不需要鎖機制害碾,這也為Kafka的高吞吐率提供了有力保障。
七赦拘、Producer消息路由
Producer發(fā)送消息到broker時蛮原,會根據(jù)Paritition機制選擇將其存儲到哪一個Partition。
如果Partition機制設置合理另绩,所有消息可以均勻分布到不同的Partition里儒陨,這樣就實現(xiàn)了負載均衡。
如果一個Topic對應一個文件笋籽,那這個文件所在的機器I/O將會成為這個Topic的性能瓶頸蹦漠,
而有了Partition后,不同的消息可以并行寫入不同broker的不同Partition里车海,極大的提高了吞吐率笛园。
可以通過以下途徑來指定Topic的默認Partition數(shù)量:
- (新建)在$KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定
- (新建)在創(chuàng)建Topic時通過參數(shù)指定
- (創(chuàng)建之后)同時也可以在Topic創(chuàng)建之后通過Kafka提供的工具修改
在發(fā)送一條消息時隘击,可以指定這條消息的key,
Producer根據(jù)這個key和Partition機制來判斷應該將這條消息發(fā)送到哪個Parition研铆。
Paritition機制可以通過指定Producer的paritition.class這一參數(shù)來指定埋同,
該class必須實現(xiàn)kafka.producer.Partitioner接口。
八棵红、Consumer Group
使用Consumer high level API時凶赁,
同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,
但多個Consumer Group可同時消費這一消息逆甜。
這是Kafka用來實現(xiàn)一個Topic消息的廣播(發(fā)給所有的Consumer)和單播(發(fā)給某一個Consumer)的手段虱肄。
一個Topic可以對應多個Consumer Group。
如果需要實現(xiàn)廣播交煞,只要每個Consumer有一個獨立的Group就可以了咏窿。
要實現(xiàn)單播只要所有的Consumer在同一個Group里。
用Consumer Group還可以將Consumer進行自由的分組而不需要多次發(fā)送消息到不同的Topic素征。
實際上集嵌,Kafka的設計理念之一就是同時提供離線處理和實時處理。
根據(jù)這一特性御毅,
- 可以使用Storm這種實時流處理系統(tǒng)對消息進行實時在線處理纸淮,
- 同時使用Hadoop這種批處理系統(tǒng)進行離線處理,
- 還可以同時將數(shù)據(jù)實時備份到另一個數(shù)據(jù)中心亚享,
只需要保證這三個操作所使用的Consumer屬于不同的Consumer Group即可。
九绘面、Push vs. Pull
作為一個消息系統(tǒng)欺税,Kafka遵循了傳統(tǒng)的方式,
選擇由Producer向broker push消息揭璃,
由Consumer從broker pull消息晚凿。
一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume瘦馍,采用push模式歼秽。
事實上,push模式和pull模式各有優(yōu)劣情组。
- push模式很難適應消費速率不同的消費者燥筷,因為消息發(fā)送速率是由broker決定的。
push模式的目標是盡可能以最快速度傳遞消息院崇,但是這樣很容易造成Consumer來不及處理消息肆氓,
典型的表現(xiàn)就是拒絕服務以及網(wǎng)絡擁塞。 - 而pull模式則可以根據(jù)Consumer的消費能力以適當?shù)乃俾氏M消息底瓣。
對于Kafka而言谢揪,pull模式更合適。
pull模式可簡化broker的設計,Consumer可自主控制消費消息的速率拨扶,
同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費凳鬓,
同時還能選擇不同的提交方式從而實現(xiàn)不同的傳輸語義。
十患民、Kafka delivery guarantee
有這么幾種可能的delivery guarantee:
At most once 消息可能會丟缩举,但絕不會重復傳輸
At least one 消息絕不會丟,但可能會重復傳輸
Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次酒奶,很多時候這是用戶所想要的蚁孔。
當Producer向broker發(fā)送消息時,一旦這條消息被commit惋嚎,因數(shù)replication的存在杠氢,它就不會丟。
但是如果Producer發(fā)送數(shù)據(jù)給broker后另伍,遇到網(wǎng)絡問題而造成通信中斷鼻百,
那Producer就無法判斷該條消息是否已經(jīng)commit。
雖然Kafka無法確定網(wǎng)絡故障期間發(fā)生了什么摆尝,但是Producer可以生成一種類似于主鍵的東西温艇,發(fā)生故障時冪等性的重試多次,這樣就做到了Exactly once堕汞。
接下來討論的是消息從broker到Consumer的delivery guarantee語義勺爱。(僅針對Kafka consumer high level API)。Consumer在從broker讀取消息后讯检,可以選擇commit琐鲁,
該操作會在Zookeeper中保存該Consumer在該Partition中讀取的消息的offset。
該Consumer下一次再讀該Partition時會從下一條開始讀取人灼。
如未commit围段,下一次讀取的開始位置會跟上一次commit之后的開始位置相同。
當然可以將Consumer設置為autocommit投放,即Consumer一旦讀到數(shù)據(jù)立即自動commit奈泪。
如果只討論這一讀取消息的過程,那Kafka是確保了Exactly once灸芳。
但實際使用中應用程序并非在Consumer讀取完數(shù)據(jù)就結束了涝桅,
而是要進行進一步處理,而數(shù)據(jù)處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic烙样。
Kafka默認保證At least once苹支,并且
允許通過設置Producer異步提交來實現(xiàn)At most once。而
Exactly once要求與外部存儲系統(tǒng)協(xié)作误阻,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式债蜜。
十一晴埂、下載、安裝與配置
- 下載地址:
http://kafka.apache.org/downloads.html
http://mirrors.hust.edu.cn/apache/ - 安裝前提(安裝ZK):
參考http://www.cnblogs.com/qingyunzong/p/8634335.html#_label4_0 - 安裝(以版本kafka_2.11-0.8.2.0.tgz為例)
tar -zxvf kafka_2.11-0.8.2.0.tgz -C apps
cd apps/
ln -s kafka_2.11-0.8.2.0/ kafka
- 配置(部分配置為0.8之前的配置)
cd apps/kafka/config/
vi server.properties
//當前機器在集群中的唯一標識寻定,和zookeeper的myid性質一樣
broker.id=0
//當前kafka對外提供服務的端口默認是9092
port=9092
//這個參數(shù)默認是關閉的儒洛,在0.8.1有個bug,DNS解析問題狼速,失敗率的問題琅锻。
host.name=hadoop1
//這個是borker進行網(wǎng)絡處理的線程數(shù)
num.network.threads=3
//這個是borker進行I/O處理的線程數(shù)
num.io.threads=8
//發(fā)送緩沖區(qū)buffer大小,數(shù)據(jù)不是一下子就發(fā)送的向胡,先回存儲到緩沖區(qū)了到達一定的大小后在發(fā)送恼蓬,能提高性能
socket.send.buffer.bytes=102400
//kafka接收緩沖區(qū)大小,當數(shù)據(jù)到達一定大小后在序列化到磁盤
socket.receive.buffer.bytes=102400
//這個參數(shù)是向kafka請求消息或者向kafka發(fā)送消息的請請求的最大數(shù)僵芹,這個值不能超過java的堆棧大小
socket.request.max.bytes=104857600
//消息存放的目錄处硬,這個目錄可以配置為“,”逗號分割的表達式拇派,上面的num.io.threads要大于這個目錄的個數(shù)這個目錄荷辕,
//如果配置多個目錄,新創(chuàng)建的topic他把消息持久化的地方是件豌,當前以逗號分割的目錄中疮方,那個分區(qū)數(shù)最少就放那一個
log.dirs=/home/hadoop/log/kafka-logs
//默認的分區(qū)數(shù),一個topic默認1個分區(qū)數(shù)
num.partitions=1
//每個數(shù)據(jù)目錄用來日志恢復的線程數(shù)目
num.recovery.threads.per.data.dir=1
//默認消息的最大持久化時間茧彤,168小時骡显,7天
log.retention.hours=168
//這個參數(shù)是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候曾掂,kafka會新起一個文件
log.segment.bytes=1073741824
//每隔300000毫秒去檢查上面配置的log失效時間
log.retention.check.interval.ms=300000
//是否啟用log壓縮惫谤,一般不用啟用,啟用的話可以提高性能
log.cleaner.enable=false
//設置zookeeper的連接端口
zookeeper.connect=192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181
//設置zookeeper的連接超時時間
zookeeper.connection.timeout.ms=6000
vi producer.properties
bootstrap.servers=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
#bootstrap.servers= 是0.9之后的配置參數(shù)遭殉,0.8及之前用metadata.broker.list=
vi consumer.properties
bootstrap.servers=192.168.1.4:9092,192.168.1.5:9092,192.168.1.6:9092
#bootstrap.servers= 是0.9之后的配置參數(shù),0.8及之前用zookeeper.connect=
- 將kafka的安裝包分發(fā)到其他節(jié)點
scp -r kafka_2.11-0.8.2.0/ hadoop2:$PWD
scp -r kafka_2.11-0.8.2.0/ hadoop3:$PWD
scp -r kafka_2.11-0.8.2.0/ hadoop4:$PWD
- 建軟聯(lián)
ln -s kafka_2.11-0.8.2.0/ kafka
- 修改環(huán)境變量
vi .bashrc
#Kafka
export KAFKA_HOME=/home/hadoop/apps/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source ~/.bashrc
十二博助、啟動
- 首先啟動zookeeper集群
所有zookeeper節(jié)點都需要執(zhí)行
zkServer.sh start
- 啟動Kafka集群服務(所有险污,分別到各個集群(hadoop1~4)啟動)
bin/kafka-server-start.sh config/server.properties
十三、創(chuàng)建的topic
- 創(chuàng)建
例:
通過kafka-topics.sh腳本來創(chuàng)建一個名為topic-test1并且副本數(shù)為3富岳、分區(qū)數(shù)為3的topic
bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 3 --partitions 3 --topic topic-test1
- 查看topic副本信息
bin/kafka-topics.sh --describe --zookeeper hadoop1:2181 --topic topic-test1
- 查看已經(jīng)創(chuàng)建的topic信息
bin/kafka-topics.sh --list --zookeeper hadoop1:2181
- 生產(chǎn)者發(fā)送消息
bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic topic-test1
hadoop1顯示接收到消息
- 消費者消費消息
在hadoop2上消費消息
bin/kafka-console-consumer.sh --zookeeper hadoop1:2181 --from-beginning --topic topic-test1