什么是消息系統(tǒng)续膳?
早期兩個(gè)應(yīng)用程序間進(jìn)行消息傳遞需要保證兩個(gè)應(yīng)用程序同時(shí)在線题山,并且耦合度很高坯临。為了解決應(yīng)用程序不在線的情況下業(yè)務(wù)正常運(yùn)轉(zhuǎn)焊唬,就產(chǎn)生了消息系統(tǒng),消費(fèi)發(fā)送者(生產(chǎn)者)將消息發(fā)送至消息系統(tǒng)看靠,消息接受者(消費(fèi)者)從消息系統(tǒng)中獲取消息赶促。
提到消息系統(tǒng),不得不說(shuō)一下JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口挟炬。是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件的API鸥滨。用于在兩個(gè)應(yīng)用程序之間或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信谤祖。Java消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的API婿滓。
通常消息傳遞有兩種類(lèi)型的消息模式可用一種是點(diǎn)對(duì)點(diǎn)queue隊(duì)列模式(p2p),另一種是topic發(fā)布-訂閱模式(public-subscribe)泊脐。
點(diǎn)對(duì)點(diǎn)消息系統(tǒng)
在點(diǎn)對(duì)點(diǎn)系統(tǒng)中空幻,消息被保留在隊(duì)列中烁峭。 一個(gè)或多個(gè)消費(fèi)者可以消耗隊(duì)列中的消息容客,但是特定消息只能由最多一個(gè)消費(fèi)者消費(fèi)。一旦消費(fèi)者讀取隊(duì)列中的消息约郁,它就從該隊(duì)列中消失缩挑。該系統(tǒng)的典型示例是訂單處理系統(tǒng),其中每個(gè)訂單將由一個(gè)訂單處理器處理鬓梅,但多個(gè)訂單處理器也可以同時(shí)工作供置。下圖描述了結(jié)構(gòu)。
發(fā)布 - 訂閱消息系統(tǒng)
在發(fā)布-訂閱系統(tǒng)中绽快,消息被保留在主題中芥丧。與點(diǎn)對(duì)點(diǎn)系統(tǒng)不同紧阔,消費(fèi)者可以訂閱一個(gè)或多個(gè)主題并使用該主題中的所有消息。 在發(fā)布 - 訂閱系統(tǒng)中续担,消息生產(chǎn)者稱為發(fā)布者擅耽,消息使用者稱為訂閱者。一個(gè)現(xiàn)實(shí)生活的例子是Dish電視物遇,它發(fā)布不同的渠道乖仇,如運(yùn)動(dòng),電影询兴,音樂(lè)等乃沙,任何人都可以訂閱自己的頻道集,并獲得他們訂閱的頻道時(shí)可用诗舰。
MQ消息隊(duì)列對(duì)比
下面針對(duì)RabbitMQ與kafka進(jìn)行對(duì)比
應(yīng)用場(chǎng)景上
RabbitMQ:遵循AMQP(Advanced Message Queuing Protocol)協(xié)議警儒,由內(nèi)在高并發(fā)的erlanng語(yǔ)言開(kāi)發(fā),用在實(shí)時(shí)的對(duì)可靠性要求比較高的消息傳遞上眶根。
kafka:是Linkedin于2010年12月份開(kāi)源的消息發(fā)布訂閱系統(tǒng),它主要用于處理活躍的流式數(shù)據(jù),大數(shù)據(jù)量的數(shù)據(jù)處理上冷蚂。
在吞吐量上
RabbitMQ在吞吐量方面稍遜于kafka,他們的出發(fā)點(diǎn)不一樣汛闸,rabbitMQ支持對(duì)消息的可靠的傳遞蝙茶,支持事務(wù),不支持批量的操作诸老;基于存儲(chǔ)的可靠性的要求存儲(chǔ)可以采用內(nèi)存或者硬盤(pán)隆夯。
kafka具有高的吞吐量,內(nèi)部采用消息的批量處理别伏,數(shù)據(jù)的存儲(chǔ)和獲取是本地磁盤(pán)順序批量操作蹄衷,消息處理的效率很高。
在集群負(fù)載均衡上
RabbitMQ的負(fù)載均衡需要單獨(dú)的loadbalancer進(jìn)行支持厘肮。
kafka采用zookeeper對(duì)集群中的broker愧口、consumer進(jìn)行協(xié)調(diào)管理。
什么是Kafka类茂?
Apache Kafka是一個(gè)分布式發(fā)布-訂閱消息系統(tǒng)和一個(gè)強(qiáng)大的隊(duì)列耍属,實(shí)際上就是JMS的一個(gè)變形,可以處理大量的數(shù)據(jù)巩检,并使您能夠?qū)⑾囊粋€(gè)端點(diǎn)傳遞到另一個(gè)端點(diǎn)厚骗。Kafka適合離線和在線消息消費(fèi)。Kafka消息保留在磁盤(pán)上兢哭,并在群集內(nèi)復(fù)制以防止數(shù)據(jù)丟失领舰。Kafka構(gòu)建在ZooKeeper同步服務(wù)之上。
Kafka的特性
以下是Kafka的幾個(gè)好處
高吞吐量、低延遲:kafka每秒可以處理幾十萬(wàn)條消息冲秽,它的延遲最低只有幾毫秒舍咖,每個(gè)topic可以分多個(gè)partition, consumer group 對(duì)partition進(jìn)行消費(fèi)操作。
可擴(kuò)展性:kafka集群支持熱擴(kuò)展
持久性锉桑、可靠性:消息被持久化到本地磁盤(pán)谎仲,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
容錯(cuò)性:允許集群中節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失斉俾亍)
高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫(xiě)
應(yīng)用場(chǎng)景
Kafka可以在許多用例中使用郑诺。 其中一些列出如下:
日志收集:一個(gè)公司可以用Kafka可以收集各種服務(wù)的log,通過(guò)kafka以統(tǒng)一接口服務(wù)的方式開(kāi)放給各種consumer杉武,例如hadoop辙诞、Hbase、Solr等轻抱。
消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者飞涂、緩存消息等。
用戶活動(dòng)跟蹤:Kafka經(jīng)常被用來(lái)記錄web用戶或者app用戶的各種活動(dòng)祈搜,如瀏覽網(wǎng)頁(yè)较店、搜索、點(diǎn)擊等活動(dòng)容燕,這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka 的topic中梁呈,然后訂閱者通過(guò)訂閱這些topic來(lái)做實(shí)時(shí)的監(jiān)控分析,或者裝載到hadoop蘸秘、數(shù)據(jù)倉(cāng)庫(kù)中做離線分析和挖掘官卡。
運(yùn)營(yíng)指標(biāo):Kafka也經(jīng)常用來(lái)記錄運(yùn)營(yíng)監(jiān)控?cái)?shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù)醋虏,生產(chǎn)各種操作的集中反饋寻咒,比如報(bào)警和報(bào)告。
流式處理:比如spark streaming和storm
事件源
Kafka基本概念
Kafka中發(fā)布訂閱的對(duì)象是topic颈嚼。我們可以為每類(lèi)數(shù)據(jù)創(chuàng)建一個(gè)topic毛秘,把向topic發(fā)布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer阻课。Producers和consumers可以同時(shí)從多個(gè)topic讀寫(xiě)數(shù)據(jù)叫挟。一個(gè)kafka集群由一個(gè)或多個(gè)broker服務(wù)器組成,它負(fù)責(zé)持久化和備份具體的kafka消息柑肴。
Broker(經(jīng)紀(jì)人):Kafka節(jié)點(diǎn)霞揉,一個(gè)Kafka節(jié)點(diǎn)就是一個(gè)broker旬薯,多個(gè)broker可以組成一個(gè)Kafka集群晰骑。
Topic(主題):一類(lèi)消息,消息存放的目錄即主題,例如page view日志硕舆、click日志等都可以以topic的形式存在秽荞,Kafka集群能夠同時(shí)負(fù)責(zé)多個(gè)topic的分發(fā)。
Partition:topic物理上的分組抚官,一個(gè)topic可以分為多個(gè)partition扬跋,每個(gè)partition是一個(gè)有序的隊(duì)列。
Segment:partition物理上由多個(gè)segment組成凌节,每個(gè)Segment存著message信息钦听。
offset:一條消息在消息系統(tǒng)中的偏移量。
Producer : 生產(chǎn)message發(fā)送到topic倍奢。
Consumer : 訂閱topic消費(fèi)message, consumer作為一個(gè)線程來(lái)消費(fèi)朴上。
ConsumerGroup:一個(gè)ConsumerGroup包含多個(gè)consumer,這個(gè)是預(yù)先在配置文件中配置好的。各個(gè)consumer(consumer線程)可以組成一個(gè)組(Consumer group)卒煞,partition中的每個(gè)message只能被組(Consumer group) 中的一個(gè)consumer(consumer 線程)消費(fèi)痪宰,如果一個(gè)message可以被多個(gè)consumer(consumer 線程) 消費(fèi)的話,那么這些consumer必須在不同的組畔裕。Kafka不支持一個(gè)partition中的message由兩個(gè)或兩個(gè)以上的consumer thread來(lái)處理衣撬,即便是來(lái)自不同的consumer group的也不行。它不能像AMQ那樣可以多個(gè)BET作為consumer去處理message扮饶,這是因?yàn)槎鄠€(gè)BET去消費(fèi)一個(gè)Queue中的數(shù)據(jù)的時(shí)候具练,由于要保證不能多個(gè)線程拿同一條message,所以就需要行級(jí)別悲觀鎖(for update),這就導(dǎo)致了consume的性能下降甜无,吞吐量不夠靠粪。而kafka為了保證吞吐量,只允許一個(gè)consumer線程去訪問(wèn)一個(gè)partition毫蚓。如果覺(jué)得效率不高的時(shí)候占键,可以加partition的數(shù)量來(lái)橫向擴(kuò)展,那么再加新的consumer thread去消費(fèi)元潘。這樣沒(méi)有鎖競(jìng)爭(zhēng)畔乙,充分發(fā)揮了橫向的擴(kuò)展性,吞吐量極高翩概。這也就形成了分布式消費(fèi)的概念牲距。
生產(chǎn)者和消費(fèi)者
針對(duì)生產(chǎn)者和消費(fèi)者,需要注意以下幾點(diǎn)
分區(qū)在producer端進(jìn)行
一個(gè)分區(qū)只會(huì)由消費(fèi)者組內(nèi)的一個(gè)consumer消費(fèi)钥庇,kafka會(huì)通過(guò)負(fù)載均衡機(jī)制自動(dòng)分配
offset由consumer端進(jìn)行維護(hù)牍鞠,一般交給zookeeper進(jìn)行維護(hù)
只能保證一個(gè)分區(qū)內(nèi)的數(shù)據(jù)是有序的
二、 Apache Kafka - 安裝步驟
注:安裝kafka前需要提前安裝JDK與zookeeper
Step 1: 下載Kafka并解壓
> tar -xzfkafka_2.9.2-0.8.1.1.tgz
> cdkafka_2.9.2-0.8.1.1
Step 2: 配置環(huán)境變量(可選)
vi/etc/profile
KAFKA_HOME=/opt/kafka_2.9.2-0.8.1.1
PATH=$PATH:$KAFKA_HOME/bin Step 3: 修改配置文件中的以下內(nèi)容
cd /opt/kafka_2.9.2-0.8.1.1/config
viserver.properties
broker.id=0 //為依次增長(zhǎng)的:0评姨、1难述、2、3、4胁后,集群中唯一id
log.dirs=/opt/kafka_2.9.2-0.8.1.1/logs //日志地址
zookeeper.connect=localhost:2181 //zookeeperServers列表店读,各節(jié)點(diǎn)以逗號(hào)分開(kāi)
cd /opt/kafka_2.9.2-0.8.1.1/config
vi zookeeper.properties
dataDir=/usr/local/kafka/zookeeper
dataLogDir=/usr/local/kafka/log/zookeeper
Step 4: 啟動(dòng)單節(jié)點(diǎn)服務(wù)
在kafka的bin中存在很多sh文件,其中包含對(duì)zookeeper的啟動(dòng)與停止攀芯。首先啟動(dòng)zookeeper再啟動(dòng)kafka的broker屯断。
./bin/zookeeper-server-start.shconfig/zookeeper.properties &
./bin/kafka-server-start.shconfig/server.properties &
Step 5: 創(chuàng)建topic
./bin/kafka-topics.sh --create --zookeeper192.168.2.105:2181 --replication-factor 1 --partitions 1 --topic testlzy
列出所有topic
./bin/kafka-topics.sh --zookeeper 192.168.2.105:2181--list
Step 5: 創(chuàng)建生產(chǎn)者
./bin/kafka-console-producer.sh--broker-list 192.168.2.105:9093 --topic testlzy
Step 6: 創(chuàng)建消費(fèi)者
./bin/kafka-console-consumer.sh --zookeeperlocalhost:2181 --topic testlzy --from-beginning
此時(shí)如果在生產(chǎn)者控制臺(tái)中發(fā)布消息,消費(fèi)者端能接收到侣诺,就算成功了殖演。
kafka常用命令
以下是kafka常用命令行總結(jié):
0. 查看有哪些主題:
./kafka-topics.sh --list --zookeeper192.168.0.201:2181
1. 查看topic的詳細(xì)信息
./kafka-topics.sh -zookeeper 127.0.0.1:2181-describe -topic testKJ1
2. 為topic增加副本
./kafka-reassign-partitions.sh -zookeeper127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute
3. 創(chuàng)建topic
./kafka-topics.sh --create --zookeeperlocalhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1
4. 為topic增加partition
./bin/kafka-topics.sh –zookeeper127.0.0.1:2181 –alter –partitions 20 –topic testKJ1
5. kafka生產(chǎn)者客戶端命令
./kafka-console-producer.sh --broker-listlocalhost:9092 --topic testKJ1
6. kafka消費(fèi)者客戶端命令
./kafka-console-consumer.sh -zookeeperlocalhost:2181 --from-beginning --topic testKJ1
7. kafka服務(wù)啟動(dòng)
./kafka-server-start.sh -daemon../config/server.properties
8. 下線broker
./kafka-run-class.shkafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId#--num.retries 3 --retry.interval.ms 60
shutdown broker
9. 刪除topic
./kafka-run-class.shkafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181
./kafka-topics.sh --zookeeperlocalhost:2181 --delete --topic testKJ1
10. 查看consumer組內(nèi)消費(fèi)的offset
./kafka-run-class.shkafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test--topic testKJ1
./kafka-consumer-offset-checker.sh --zookeeper192.168.0.201:2181 --group group1 --topic group1
三、 Apache Kafka – 核心原理
負(fù)載均衡
負(fù)載均衡的兩種策略(消費(fèi)端配置)
partition.assignment.strategy=range|round-robin
在kafka中partition分發(fā)消息給消費(fèi)者不是已消費(fèi)為力度進(jìn)行分配的年鸳,是以消費(fèi)者線程為力度進(jìn)行分配的剃氧。
Range
kafka中的每個(gè)topic的分區(qū)是獨(dú)立進(jìn)行分配的,topic間不受到任何影響阻星。
topic中先是對(duì)partition進(jìn)行數(shù)字排序朋鞍,線程按照字典排序。
接下來(lái)用分區(qū)的數(shù)量除以線程數(shù)量就是每個(gè)線程能夠分到的消息數(shù)量
partition_per_thread= 分區(qū)數(shù)量/線程數(shù)量
如果整除了妥箕,那么每個(gè)線程依次分配partition_per_thread個(gè)分區(qū)
如果不整除滥酥,低位的幾個(gè)thread會(huì)多消費(fèi)分區(qū)
如果分區(qū)個(gè)數(shù)少于線程數(shù)量,就會(huì)出現(xiàn)線程空閑的時(shí)候畦幢,因?yàn)閗afka會(huì)保證一個(gè)分區(qū)只能被一個(gè)消費(fèi)者進(jìn)行消費(fèi)坎吻。所以建議在配置的時(shí)候分區(qū)數(shù)量和消費(fèi)者線程數(shù)量相等最好。
Round-robin
在kafka中一個(gè)消費(fèi)者組是可以訂閱多個(gè)topic的宇葱。當(dāng)訂閱了多個(gè)topic后瘦真,他內(nèi)部會(huì)把所有topic進(jìn)行混亂以后再按照range策略走一遍,他會(huì)保證每個(gè)topic在consumer中的線程數(shù)量必須相等黍瞧。
備注
一般應(yīng)用range的比較多诸尽,如果consumer組中有個(gè)線程shutdown了,那么kafka會(huì)自動(dòng)的重新進(jìn)行負(fù)載均衡的分配印颤。這個(gè)負(fù)載均衡增加了下游的消費(fèi)能力您机。而且非常方便的進(jìn)行消費(fèi)者的擴(kuò)展。當(dāng)然kafka也可以去除這樣的負(fù)載均衡策略年局,默認(rèn)消費(fèi)端分為high level的客戶端(啟用負(fù)載均衡機(jī)制)和simple的客戶端(不啟用負(fù)載均衡际看,需要自己決定消費(fèi)哪個(gè)分區(qū)的消息)。
主從及副本分布
kafka的主從主要提供了分區(qū)容錯(cuò)的能力矢否,可以配置一個(gè)leader和若干follower仲闽,leader是處理消息,而follower只是leader的一個(gè)備份僵朗,平常的連接都是連在leader上的赖欣。當(dāng)leader宕機(jī)以后屑彻,kafka會(huì)從follower中選舉一臺(tái)leader來(lái)進(jìn)行服務(wù)。
對(duì)于第R個(gè)副本畏鼓,先隨機(jī)取一個(gè)broker放分區(qū)0酱酬,然后順序放其他分區(qū)壶谒。這樣保證了leader和follower均勻的分布在了每個(gè)broker上云矫。
通過(guò)-topic命令可以查看指定topic的分區(qū)和副本的分布情況
topic:topic名稱
partition:分區(qū)名稱
leader:此分區(qū)的leader在哪個(gè)broker上
replicas:所有的副本分布在哪個(gè)broker上
isr:replicas中所有in-sync的節(jié)點(diǎn)
對(duì)于in-sync
節(jié)點(diǎn)必須可以維護(hù)和zookeeper的連接,zookeeper通過(guò)心跳機(jī)制檢查每個(gè)節(jié)點(diǎn)的連接汗菜。
如果節(jié)點(diǎn)是個(gè)follower让禀。他必須能及時(shí)的同步leader的寫(xiě)操作,延時(shí)不能太久陨界。
設(shè)置方式
replica.lag.max.messages:落后的消息數(shù)量
replica.lag.time.max.ms:卡住的時(shí)間
kafka是通過(guò)這兩個(gè)參數(shù)去判斷是不是一個(gè)有效的副本follower巡揍。當(dāng)leader宕機(jī)以后,是從這些有效副本中進(jìn)行選舉的菌瘪。無(wú)效的是不參加選舉的腮敌。
kafka的持久化
消息格式
kafka的消息格式如圖
文件系統(tǒng)
kafka會(huì)將消息組織到硬盤(pán)上,在broker的數(shù)據(jù)目錄中會(huì)有以topic名稱-分區(qū)號(hào)命名的文件夾俏扩,
在文件夾中存在成對(duì)出現(xiàn)的文件糜工。kafka不是將所有消息放到一個(gè)大文件里,而是根據(jù)消息的offset進(jìn)行了分段录淡。每一個(gè)段內(nèi)放多少消息是可以配置的捌木。文件名字代表此文件中的第一個(gè)數(shù)據(jù)的offset。index為索引文件嫉戚,log為數(shù)據(jù)文件刨裆,存放的消息格式見(jiàn)上圖。對(duì)于index文件維護(hù)的是一個(gè)稀疏索引彬檀,由消息的編號(hào)指向物理偏移帆啃,運(yùn)行時(shí)會(huì)被加載到內(nèi)存。
過(guò)期數(shù)據(jù)清理
kafka既然支持了持久化窍帝,他對(duì)磁盤(pán)空間是有要求的链瓦。對(duì)于刪除過(guò)期數(shù)據(jù)kafka提供了兩種策略
1、默認(rèn)策略為直接刪除
l 超過(guò)指定的時(shí)間的消息:
log.retention.hours=168
l 超過(guò)指定大小后盯桦,刪除舊的消息:
log.retention.bytes=1073741824
2慈俯、壓縮(只在特定的業(yè)務(wù)場(chǎng)景下有意義)
全局:log.cleaner.enable=true
在特定的topic上:log.cleanup.policy=compact
保留每個(gè)key最后一個(gè)版本的信息,若最后一個(gè)版本消息內(nèi)容為空拥峦,這個(gè)key被刪除
在互聯(lián)網(wǎng)公司面試中贴膘,架構(gòu)的底層一定是面試官會(huì)問(wèn)問(wèn)的問(wèn)題,針對(duì)面試官一般會(huì)提到的問(wèn)題略号,我錄制了一些分布式刑峡,微服務(wù)洋闽,性能優(yōu)化等技術(shù)點(diǎn)底層原理的錄像視頻,加群617434785可以免費(fèi)獲取這些錄像突梦,里面還有些分布式诫舅,微服務(wù),性能優(yōu)化宫患,春天設(shè)計(jì)時(shí)刊懈,MyBatis的等源碼知識(shí)點(diǎn)的錄像視頻