常用消息中間件對(duì)比
- Kafka是LinkedIn開源的分布式發(fā)布-訂閱消息系統(tǒng),目前歸屬于Apache定級(jí)項(xiàng)目丧没。Kafka主要特點(diǎn)是基于Pull的模式來(lái)處理消息消費(fèi)嘴脾,追求高吞吐量,一開始的目的就是用于日志收集和傳輸索绪。0.8版本開始支持復(fù)制待锈,不支持事務(wù)漠其,對(duì)消息的重復(fù)、丟失、錯(cuò)誤沒(méi)有嚴(yán)格要求和屎,適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)拴驮。消息的消費(fèi)者主動(dòng)拉取消息。
- RabbitMQ是使用Erlang語(yǔ)言開發(fā)的開源消息隊(duì)列系統(tǒng)柴信,基于AMQP協(xié)議來(lái)實(shí)現(xiàn)套啤。AMQP的主要特征是面向消息、隊(duì)列随常、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)潜沦、可靠性、安全绪氛。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi)唆鸡,對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場(chǎng)景枣察,對(duì)性能和吞吐量的要求還在其次争占。消息的消費(fèi)者被動(dòng)拉取(rabbitMQ 推送消息給消費(fèi)者)序目。
- RocketMQ是阿里開源的消息中間件臂痕,它是純Java開發(fā),具有高吞吐量猿涨、高可用性握童、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)。RocketMQ思路起源于Kafka嘿辟,但并不是Kafka的一個(gè)Copy舆瘪,它對(duì)消息的可靠傳輸及事務(wù)性做了優(yōu)化片效,目前在阿里集團(tuán)被廣泛應(yīng)用于交易红伦、充值、流計(jì)算淀衣、消息推送昙读、日志流式處理、binglog分發(fā)等場(chǎng)景膨桥。
kafka
In a very basic structure, a producer publishes messages to a Kafka topic (synonymous with “messaging queue”). A topic is also considered as a message category or feed name to which messages are published. Kafka topics are created on a Kafka broker acting as a Kafka server. Kafka brokers also store the messages if required. Consumers then subscribe to the Kafka topic (one or more) to get the messages. Here, brokers and consumers use Zookeeper to get the state information and to track message offsets, respectivelyapache kafka 是一種分布式的蛮浑,基于發(fā)布/訂閱的消息系統(tǒng),由Scala語(yǔ)言編寫而成只嚣,它具備快速沮稚,可擴(kuò)展,可持久化的特點(diǎn)册舞,其與如下幾個(gè)主要關(guān)鍵特性有關(guān):
- Kafka具有近乎實(shí)時(shí)性的消息處理能力蕴掏,即使面對(duì)海量消息也能夠高效地存儲(chǔ)消息和查詢消息。Kafka將消息保存在磁盤中,在其設(shè)計(jì)理念中并不懼怕磁盤操作盛杰,它以順序讀寫的方式訪問(wèn)磁盤挽荡,從而避免了隨機(jī)讀取磁盤導(dǎo)致的性能瓶頸。Kafka把數(shù)據(jù)以消息的形式持久化到磁盤即供,即使Kafka出現(xiàn)宕機(jī)定拟,也可以保證數(shù)據(jù)不會(huì)丟失,為了避免磁盤上的數(shù)據(jù)不斷增長(zhǎng)逗嫡,Kafka提供了日志清理青自,日志壓縮等功能,對(duì)過(guò)時(shí)的祸穷,已經(jīng)處理完成的數(shù)據(jù)進(jìn)行清除性穿。在磁盤操作中,耗時(shí)最長(zhǎng)的就是尋道時(shí)間雷滚,這是導(dǎo)致磁盤的隨機(jī)I/O性能很差的主要原因需曾。
- Kafka支持批量讀寫消息,并且會(huì)對(duì)消息進(jìn)行批量壓縮祈远,這樣既能提高了網(wǎng)絡(luò)的利用率呆万,也提高了壓縮效率。
- Kafka支持消息分區(qū)车份,每個(gè)分區(qū)中的消息保證順序傳輸谋减,而分區(qū)之間則可以并發(fā)操作,這樣就提高了Kafka的并發(fā)能力扫沼。Kafka保證一個(gè)Partition內(nèi)消息的有序性出爹,但是不保證多個(gè)Partition之間的數(shù)據(jù)有順序。
- Kafka支持在線增加分區(qū)缎除,支持在線水平擴(kuò)展严就。
- Kafka支持為每個(gè)分區(qū)創(chuàng)建多個(gè)副本,其中只有一個(gè)Leader副本負(fù)責(zé)讀寫器罐,其他副本只負(fù)責(zé)與Leader副本進(jìn)行同步梢为,這種方式提高了數(shù)據(jù)的容災(zāi)能力,Kafka會(huì)將Leader副本均勻地分布在集群的服務(wù)器上轰坊,實(shí)現(xiàn)性能最大化铸董。Kafka的每個(gè)Topic(主題)都可以分為多個(gè)partition(分區(qū)),每個(gè)分區(qū)都有多個(gè)Replica(副本)肴沫,實(shí)現(xiàn)消息冗余備份粟害。每個(gè)分區(qū)中的消息是不同的,這類似于數(shù)據(jù)庫(kù)中水平切分的思想颤芬,提高了并發(fā)讀寫的能力悲幅。而同一分區(qū)的不同副本中保存的是相同的消息孽文,副本之間是一主多從的關(guān)系,其中Leader副本負(fù)責(zé)處理讀寫請(qǐng)求夺艰,F(xiàn)ollower副本則只與Leader副本進(jìn)行消息同步芋哭,當(dāng)Leader副本出現(xiàn)故障時(shí),則從Follower副本中重新選舉Leader副本對(duì)外提供服務(wù)郁副。
- Kafka Consumer:Consumer使用pull方式從服務(wù)器端拉取消息减牺,并且在Consumer端保存消費(fèi)的具體位置(position),當(dāng)消費(fèi)者宕機(jī)后恢復(fù)上線存谎,可以根據(jù)自己保存的消費(fèi)位置重新拉取需要的消息進(jìn)行消費(fèi)拔疚,這就不會(huì)造成消息丟失,也就是說(shuō)Kafka不決定何時(shí)既荚,如何消費(fèi)消息稚失,而是Consumer自己決定何時(shí),如何消費(fèi)消息恰聘。
- Consumer Group:可以將多個(gè)Consumer加入一個(gè)Consumer Group(消費(fèi)組)句各,在一個(gè)Consumer Group中,每個(gè)分區(qū)只能分配給一個(gè)Consumer消費(fèi)晴叨,當(dāng)Kafka服務(wù)端通過(guò)增加分區(qū)數(shù)量進(jìn)行水平擴(kuò)展后凿宾,可以向Consumer Group增加新的Consumer來(lái)提高整個(gè)Consumer Group的消費(fèi)能力。當(dāng)Consumer Group中的一個(gè)Consumer出現(xiàn)故障下線時(shí)兼蕊,會(huì)通過(guò)Rebalance操作將下線Consumer負(fù)責(zé)處理的分區(qū)分配給其他Consumer進(jìn)行處理初厚;當(dāng)下線Consumer重新上線加入Consumer Group時(shí),會(huì)再進(jìn)行一次Rebalance操作孙技,重新分配分區(qū)产禾。當(dāng)然一個(gè)Consumer Group可以訂閱多個(gè)不同Topic,每一個(gè)Consumer可以同時(shí)處理多個(gè)分區(qū)牵啦。
kafka 相關(guān)概念
消息: 消息是Kafka中最基本的數(shù)據(jù)單元亚情,消息由一串字節(jié)構(gòu)成,其中主要由key和value構(gòu)成蕾久,key和value也都是byte數(shù)組势似。key的主要作用是根據(jù)一定的策略拌夏,將此消息路由到指定的分區(qū)中僧著,這樣就可以保證包含同一個(gè)key的消息全部寫入同一分區(qū)中,key可以是null障簿。消息的真正有效負(fù)載是value部分的數(shù)據(jù)盹愚,為了提高網(wǎng)絡(luò)和存儲(chǔ)的利用率,生產(chǎn)者會(huì)批量發(fā)送消息到Kafka站故,并在發(fā)送之前對(duì)消息進(jìn)行壓縮皆怕。
-
Topic: A topic is a category or feed name to which messages are published by the message producers. In Kafka, topics are partitioned and each partition is represented by the ordered immutable sequence of messages. A Kafka cluster maintains the partitioned log for each topic. Each message in the partition is assigned a unique sequential ID called the offset.Topic是用于存儲(chǔ)消息的邏輯概念毅舆,可以看做一個(gè)消息集合,每個(gè)Topic可以有多個(gè)生產(chǎn)者和消費(fèi)者愈腾。每個(gè)Topic可以劃分為多個(gè)分區(qū)(至少一個(gè)分區(qū))憋活,每個(gè)消息在被添加到分區(qū)時(shí),都會(huì)被分配一個(gè)offset虱黄,它是消息在此分區(qū)中的唯一編號(hào)悦即,Kafka通過(guò)offset保證消息在分區(qū)內(nèi)的順序,offset的順序性不跨分區(qū)橱乱。
Log: 分區(qū)在邏輯上對(duì)應(yīng)著一個(gè)Log辜梳,當(dāng)生產(chǎn)者將消息寫入分區(qū)時(shí),實(shí)際上是寫入到分區(qū)對(duì)應(yīng)的Log中泳叠,Log是一個(gè)邏輯概念作瞄,可以對(duì)應(yīng)到磁盤上的一個(gè)文件夾,Log由多個(gè)Segment組成危纫,每個(gè)Segment對(duì)應(yīng)一個(gè)日志文件和索引文件宗挥,在面對(duì)海量數(shù)據(jù)時(shí),為避免出現(xiàn)超大文件种蝶,每個(gè)日志文件的大小是有限制的属韧,當(dāng)超出限制后則會(huì)創(chuàng)建新的Segment,繼續(xù)對(duì)外提供服務(wù)蛤吓。這里要注意宵喂,因?yàn)镵afka采用順序IO,所以只能向最新的Segment追加數(shù)據(jù)会傲。為了權(quán)衡文件大小锅棕,索引速度,占用內(nèi)存大小等多方面因素淌山,索引文件采用稀疏索引的方式裸燎,大小并不會(huì)很大,在運(yùn)行時(shí)會(huì)將其內(nèi)容映射到內(nèi)存中泼疑,提高索引速度德绿。
保留策略Retention Policy: 無(wú)論消費(fèi)者是否已經(jīng)消費(fèi)了消息,Kafka都會(huì)一直保存這些消息退渗,但并不會(huì)像數(shù)據(jù)庫(kù)那樣長(zhǎng)期保存移稳。為了避免磁盤被占滿,Kafka都會(huì)配置相應(yīng)的保留策略会油,以實(shí)現(xiàn)周期性地刪除陳舊的消息个粱,有如下兩種保留策略
(1)根據(jù)消息保留時(shí)間:當(dāng)消息在Kafka中保存的時(shí)間超過(guò)了指定時(shí)間,就可以被刪除翻翩;
(2)根據(jù)Topic存儲(chǔ)數(shù)據(jù)的大卸夹怼:當(dāng)Topic所占的日志文件大小大于一個(gè)閾值稻薇,則可以開始刪除最舊的消息。
Kafka會(huì)啟動(dòng)一個(gè)后臺(tái)線程胶征,定期檢查是否存在可以刪除的消息塞椎。保留策略的配置既可以全局配置也可以在Topic級(jí)別進(jìn)行配置。-
日志壓縮Log Compaction: Kafka會(huì)在后臺(tái)啟動(dòng)一個(gè)線程睛低,定期將相同key的消息進(jìn)行合并忱屑,只保留最新的value值,如下圖所示:
Broker: A Kafka cluster consists of one or more servers where each one may have one or more server processes running and is called the broker. Topics are created within the context of broker processes. 一個(gè)單獨(dú)的Kafka server就是一個(gè)Broker暇昂,Broker的主要工作就是接收生產(chǎn)者發(fā)過(guò)來(lái)的消息莺戒,分配offset,之后保存到磁盤中急波;同時(shí)接收消費(fèi)者从铲,其他Broker的請(qǐng)求,根據(jù)請(qǐng)求類型進(jìn)行相應(yīng)處理并返回響應(yīng)澄暮。
-
副本 Replica:
-
ISR(IN-Sync Replica):
HW(high watermark): HW標(biāo)記了一個(gè)特殊的offset名段,當(dāng)消費(fèi)者處理消息的時(shí)候,只能拉取到HW之前的消息泣懊,HW之后的消息對(duì)消費(fèi)者來(lái)說(shuō)是不可見的伸辟,與ISR集合類似,HW也是由Leader副本管理的馍刮,當(dāng)ISR集合中全部的Follower副本都拉取HW指定消息進(jìn)行同步后信夫,Leader副本會(huì)遞增HW的值。Kafka官網(wǎng)將HW之前的消息狀態(tài)稱為“commit”卡啰,其含義是這些消息在多個(gè)副本中同時(shí)存在静稻,即使此時(shí)Leader副本損壞,也不會(huì)出現(xiàn)數(shù)據(jù)丟失匈辱。
-
LEO(Log End Offset): LEO是所有的副本都會(huì)有的一個(gè)offset標(biāo)記振湾,它指向追加到當(dāng)前副本的最后一個(gè)消息的offset,當(dāng)生產(chǎn)者向Leader副本追加消息的時(shí)候亡脸,Leader副本的LEO標(biāo)記會(huì)遞增押搪;當(dāng)Follower副本成功從Leader副本拉取消息并更新到本地的時(shí)候,F(xiàn)ollower副本的LEO就會(huì)增加浅碾。
同步復(fù)制Synchronous replication: In synchronous replication, a producer first identifies the lead replica from ZooKeeper and publishes the message. As soon as the message is published, it is written to the log of the lead replica and all the followers of the lead start pulling the message; by using a single channel, the order of messages is ensured. Each follower replica sends an acknowledgement to the lead replica once the message is written to its respective logs. Once replications are complete and all expected acknowledgements are received, the lead replica sends an acknowledgement to the producer. On the consumer’s side, all the pulling of messages is done from the lead replica.
-
異步復(fù)制Asynchronous replication: The only difference in this mode is that, as soon as a lead replica writes the message to its local log, it sends the acknowledgement to the message client and does not wait for acknowledgements from follower replicas. But, as a downside, this mode does not ensure message delivery in case of a broker failure.
- Zookeeper: ZooKeeper serves as the coordination interface between the Kafka broker and consumers. The ZooKeeper overview given on the Hadoop Wiki site is as follows (http://wiki.apache.org/hadoop/ZooKeeper/ProjectDescription):
ZooKeeper allows distributed processes to coordinate with each other through a shared hierarchical name space of data registers (we call these registers znodes), much like a file system.
The main differences between ZooKeeper and standard filesystems are that every znode can have data associated with it and znodes are limited to the amount of data that they can have. ZooKeeper was designed to store coordination data: status information, configuration, location information, and so on.
- Producers: Producers publish data to the topics by choosing the appropriate partition within the topic. For load balancing, the allocation of messages to the topic partition can be done in a round-robin fashion or using a custom defined function.生產(chǎn)者的主要工作是生產(chǎn)消息大州,并將消息按照一定的規(guī)則推送到Topic的分區(qū)中,其中規(guī)則可以是根據(jù)消息的key的hash值選擇分區(qū)及穗,或按序輪詢?nèi)糠謪^(qū)的方式等摧茴。
-
Consumer: Consumers are the applications or processes that subscribe to topics and process the feed of published messages
- Cluster和Controller: 多個(gè)Broker可以做成一個(gè)Cluster(集群)對(duì)外提供服務(wù)绵载,每個(gè)Cluster當(dāng)中會(huì)選舉出一個(gè)Broker來(lái)?yè)?dān)任Controller埂陆,Controller是kafka集群的指揮中心苛白,而其他Broker則聽從Controller指揮實(shí)現(xiàn)相應(yīng)的功能,Controller負(fù)責(zé)管理分區(qū)的狀態(tài)焚虱,管理每個(gè)分區(qū)的副本狀態(tài)购裙,監(jiān)聽Zookeeper中數(shù)據(jù)的變化等工作。Controller也是一主多從的實(shí)現(xiàn)鹃栽,所有Broker都會(huì)監(jiān)聽Controller Leader的狀態(tài)躏率,當(dāng)Leader Controller出現(xiàn)故障的時(shí)則重新選舉新的Controller Leader.
kafka 設(shè)計(jì)
kafka自定義了一套網(wǎng)絡(luò)協(xié)議,只要遵守這套協(xié)議格式民鼓,就可以向Kafka發(fā)送消息薇芝,也可以從Kafka拉取消息。
kafka producer
kafka server
kafka 日志存儲(chǔ)
??kafka使用日志文件的方式保存生產(chǎn)者發(fā)送的消息丰嘉,每條消息都有一個(gè)offset值來(lái)表示它在分區(qū)中的偏移量夯到,這個(gè)offset值是邏輯值,并不是消息實(shí)際存儲(chǔ)的物理地址饮亏。offset值類似于數(shù)據(jù)庫(kù)表中的主鍵耍贾,主鍵唯一確定了數(shù)據(jù)庫(kù)表中的一條記錄,offset唯一確定了分區(qū)中的一條消息路幸,Kafka對(duì)應(yīng)的邏輯存儲(chǔ)機(jī)制如下圖:
?Kafka中存儲(chǔ)的一般是海量消息數(shù)據(jù)赫粥,為了避免日志文件過(guò)大,Log并不是直接對(duì)應(yīng)于磁盤上的一個(gè)日志文件予借,而是磁盤上的一個(gè)目錄越平,這個(gè)命令的命名規(guī)則是<topic_name>_<partition_id>,Log與分區(qū)之間的關(guān)系是一一對(duì)應(yīng)的灵迫,對(duì)應(yīng)分區(qū)中的全部消息都存儲(chǔ)在此目錄下的日志文件中秦叛。
?Kafka通過(guò)分段的方式將Log分為多個(gè)LogSegment,LogSegment是一個(gè)邏輯上的概念瀑粥,一個(gè)LogSegment對(duì)應(yīng)磁盤上的一個(gè)日志文件和一個(gè)索引文件挣跋,其中日志文件用于記錄消息,索引文件用于保存消息的索引狞换;隨著消息的不斷寫入避咆,日志文件的大小到達(dá)一個(gè)閾值時(shí)舟肉,就創(chuàng)建新的日志文件和索引文件繼續(xù)寫入后續(xù)的消息和索引信息,日志文件的命名規(guī)則是[baseOffset].log查库,baseOffset是日志文件中第一條消息的offset路媚,如下圖所示:
?為了提高查詢消息的效率,每個(gè)日志文件都對(duì)應(yīng)一個(gè)索引文件樊销,這個(gè)索引文件并沒(méi)有為每天消息都建立索引項(xiàng)整慎,而是使用稀疏索引方式為日志文件匯總的部分消息建立索引,如下圖所示:
kafka命令行工具
- kafka代理啟動(dòng):
>>>bin/kafka-server-start.sh config/server.properties
- 創(chuàng)建kafka Topic:
>>>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatopic
- 生產(chǎn)者投遞消息:
>>>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic
- 消費(fèi)者消費(fèi)消息:
>>>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
參考
- Apache kafka源碼書籍
- Learing apache kafka 書籍