一雹姊、消息隊(duì)列Message Queue
兩種模式
- 點(diǎn)對(duì)點(diǎn)模式
點(diǎn)對(duì)點(diǎn)模式是一個(gè)基于拉取或輪詢的消息傳送模型,由消費(fèi)者主動(dòng)拉取數(shù)據(jù)果元,客戶端需要實(shí)時(shí)開啟一個(gè)線程監(jiān)控隊(duì)列中是否有數(shù)據(jù)。 - 發(fā)布/訂閱模式
發(fā)布/訂閱模式是一個(gè)基于推送的消息傳送模型,由MQ主動(dòng)推送消息給所有訂閱者贱鄙,即使當(dāng)前訂閱者不可用。
優(yōu)點(diǎn)
- 解耦
允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程姨谷,只要確保它們遵循同樣的接口約束逗宁。 - 冗余
MQ把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)梦湘。 - 擴(kuò)展
因?yàn)镸Q解耦了你的處理過程瞎颗,所以增大消息入隊(duì)和處理的頻率是很容易的件甥,只要另外增加處理過程即可,提高了靈活性和峰值處理能力哼拔。 - 可恢復(fù)性
消息隊(duì)列降低了進(jìn)程間的耦合度引有,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理倦逐。 - 順序保證
隊(duì)列本身就遵循FIFO的原則轿曙,保證了數(shù)據(jù)的處理順序。 - 緩沖
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度僻孝,解決生產(chǎn)者和消費(fèi)者處理速度不一致的問題导帝。 - 異步通信
MQ提供了異步處理機(jī)制,允許用戶把消息放入隊(duì)列但不立即處理它穿铆,直到需要時(shí)再去處理您单。
二、Kafka簡(jiǎn)介
- Apache Kafka是一個(gè)開源消息系統(tǒng)荞雏,由Scala寫成虐秦。是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源消息系統(tǒng)項(xiàng)目。
- Kafka最初是由LinkedIn公司開發(fā)凤优,并于2011年初開源悦陋。2012年10月從Apache Incubator畢業(yè)。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一筑辨、高通量俺驶、低等待的平臺(tái)。
- Kafka是一個(gè)Distributed Message Queue棍辕。Kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類暮现,發(fā)送消息者稱為Producer,消息接收者稱為Consumer楚昭,此外Kafka集群有多個(gè)Kafka實(shí)例組成栖袋,每個(gè)實(shí)例稱為Broker。
- Kafka需要依賴于Zookeeper保存一些Meta信息抚太,來保證系統(tǒng)可用性塘幅。
- 在流式計(jì)算中,Kafka一般用來緩存數(shù)據(jù)尿贫,Storm通過消費(fèi)Kafka的數(shù)據(jù)進(jìn)行計(jì)算电媳。
三、Kafka架構(gòu)
No. | 組件 | 說明 |
---|---|---|
1 | Producer | 消息生產(chǎn)者帅霜,向Kafka中發(fā)消息的客戶端匆背。 |
2 | Consumer | 消息消費(fèi)者,從Kafka中取消息的客戶端身冀。 |
3 | Broker | 一臺(tái)Kafka服務(wù)器就是一個(gè)Broker钝尸,一個(gè)集群由多個(gè)Broker組成,一個(gè)Broker可以容納多個(gè)Topic搂根。 |
4 | Topic | 屬于特定類別的消息流稱為Topic珍促, 數(shù)據(jù)存儲(chǔ)在Topic中,可以理解為一個(gè)隊(duì)列剩愧。 |
5 | Partition | 為了實(shí)現(xiàn)負(fù)載均衡和高并發(fā)猪叙,一個(gè)非常大的Topic可以通過分為多個(gè)Partition分布到多個(gè)Broker上,每個(gè)Partition都是一個(gè)有序的隊(duì)列仁卷。 |
6 | Consumer Group(CG) | 這是Kafka用來實(shí)現(xiàn)一個(gè)消息的廣播(發(fā)給所有Consumer)和單播(發(fā)給任意一個(gè)Consumer)的手段穴翩。一個(gè)Topic可以有多個(gè)CG,Topic中的消息會(huì)復(fù)制(不是真的復(fù)制锦积,是概念上的)到所有的CG芒帕,但每個(gè)Partition只會(huì)把消息發(fā)給該CG中的一個(gè)Consumer。如果需要實(shí)現(xiàn)廣播丰介,只要每個(gè)Consumer都有一個(gè)獨(dú)立的CG即可背蟆,需要實(shí)現(xiàn)單播,只要所有Consumer都在同一個(gè)CG即可哮幢。用CG還可以將Consumer進(jìn)行自由分組而不需要重復(fù)發(fā)送消息到不同的Topic带膀。 |
7 | Offset | Partition中的每條消息都會(huì)被分配一個(gè)有序的id(Offset)。Kafka只保證每個(gè)Partition中的順序橙垢,不保證多個(gè)Partition的順序垛叨。 |
8 | Leader | 負(fù)責(zé)給定Partition的所有讀取和寫入的節(jié)點(diǎn)。 |
9 | Follower | 跟隨Leader指令的節(jié)點(diǎn)被稱為Follower柜某。 如果Leader節(jié)點(diǎn)宕機(jī)点额,其中一個(gè)Follower將通過選舉自動(dòng)成為新的Leader。 |
四莺琳、Kafka工作流程
Producer生產(chǎn)過程
- 寫入方式
Producer采用Push模式將消息發(fā)布到Broker还棱,每條消息都被追加(append)到Partition中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高惭等,保障Kafka吞吐率)珍手。 -
分區(qū)(Partition)
消息發(fā)送時(shí)都被發(fā)送到一個(gè)topic,其本質(zhì)就是一個(gè)目錄辞做,而topic是由一些Partition Logs(分區(qū)日志)組成琳要,其組織結(jié)構(gòu)如下圖所示:
我們可以看到,每個(gè)Partition中的消息都是有序的秤茅,生產(chǎn)的消息被不斷追加到Partition log上稚补,其中的每一個(gè)消息都被賦予了一個(gè)唯一的offset值。
- 分區(qū)的原因
- 負(fù)載均衡框喳,方便在集群中擴(kuò)展课幕,每個(gè)Partition可以通過調(diào)整以適應(yīng)它所在的機(jī)器厦坛,而一個(gè)Topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了乍惊。
- 可以提高并發(fā)杜秸,因?yàn)榭梢砸訮artition為單位讀寫了。
- 分區(qū)的原則
- 已指定Partition润绎,則直接使用該P(yáng)artition撬碟。
- 未指定Partition但指定了Key,則通過對(duì)Key進(jìn)行哈希計(jì)算得出一個(gè)Partition莉撇。
- Partition和Key都未指定呢蛤,則輪詢選出一個(gè)Partition。
- 分區(qū)的原因
- 副本(Replication)
同一個(gè)Partition可能會(huì)有多個(gè)Replication(對(duì)應(yīng)server.properties
配置中的default.replication.factor=N
)棍郎。沒有Replication的情況下其障,一旦Broker宕機(jī),其上所有Partition的數(shù)據(jù)都不可被Consumer消費(fèi)坝撑,同時(shí)Producer也不能再將數(shù)據(jù)存于其上的Partition静秆。引入Replication之后,同一個(gè)Partition可能會(huì)有多個(gè)Replication巡李,而這時(shí)需要在這些Replication之間選出一個(gè)Leader抚笔,Producer和Consumer只與這個(gè)Leader交互,其它Replication作為Follower從Leader中復(fù)制數(shù)據(jù)侨拦。 -
寫入流程
Broker保存過程
- 存儲(chǔ)方式
物理上把Topic分成一個(gè)或多個(gè)Partition(對(duì)應(yīng)server.properties
配置中的num.partitions=3
)殊橙,每個(gè)Partition物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該P(yáng)artition的所有消息和索引文件),如下:
[atguigu@hadoop102 logs]$ ll
drwxrwxr-x. 2 atguigu atguigu 4096 8月 6 14:37 first-0
drwxrwxr-x. 2 atguigu atguigu 4096 8月 6 14:35 first-1
drwxrwxr-x. 2 atguigu atguigu 4096 8月 6 14:37 first-2
[atguigu@hadoop102 logs]$ cd first-0
[atguigu@hadoop102 first-0]$ ll
-rw-rw-r–. 1 atguigu atguigu 10485760 8月 6 14:33 00000000000000000000.index
-rw-rw-r–. 1 atguigu atguigu 219 8月 6 15:07 00000000000000000000.log
-rw-rw-r–. 1 atguigu atguigu 10485756 8月 6 14:33 00000000000000000000.timeindex
-rw-rw-r–. 1 atguigu atguigu 8 8月 6 14:37 leader-epoch-checkpoint
-
存儲(chǔ)策略
無論消息是否被消費(fèi)狱从,Kafka都會(huì)保留所有消息膨蛮。有兩種策略可以刪除舊數(shù)據(jù):- 基于時(shí)間:
log.retention.hours=168
- 基于大小:
log.retention.bytes=1073741824
需要注意的是季研,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1)敞葛,即與文件大小無關(guān),所以這里刪除過期文件與提高Kafka性能無關(guān)与涡。
- 基于時(shí)間:
-
Zookeeper存儲(chǔ)結(jié)構(gòu)
注意:只有Broker和Consumer在Zookeeper中注冊(cè)惹谐,Producer不在Zookeeper中注冊(cè)。
Consumer消費(fèi)過程
-
消費(fèi)者組
Consumer是以Consumer Group的方式工作驼卖,由一個(gè)或者多個(gè)Consumer組成一個(gè)Group氨肌,共同消費(fèi)一個(gè)Topic。每個(gè)Partition在同一時(shí)間只能由Group中的一個(gè)Consumer讀取酌畜,但是多個(gè)Group可以同時(shí)消費(fèi)同一個(gè)Partition怎囚。在圖中,有一個(gè)由三個(gè)Consumer組成的Group桥胞,有一個(gè)Consumer讀取Topic中的兩個(gè)Partition恳守,另外兩個(gè)Consumer分別讀取一個(gè)Partition考婴。某個(gè)Consumer讀取某個(gè)Partition,也可以叫做某個(gè)Consumer是某個(gè)Partition的擁有者井誉。
在這種情況下蕉扮,Consumer可以通過水平擴(kuò)展的方式同時(shí)讀取大量的消息整胃。另外颗圣,如果一個(gè)Consumer失敗了,那么Group中其它的Consumer會(huì)自動(dòng)負(fù)載均衡讀取之前失敗的Consumer讀取的Partition屁使。 - 消費(fèi)方式
Consumer采用Pull模式從Broker中讀取數(shù)據(jù)在岂。
Push模式很難適應(yīng)消費(fèi)速率不同的Consumer,因?yàn)橄l(fā)送速率是由Broker決定的蛮寂。它的目標(biāo)是盡可能以最快速度傳遞消息蔽午,但是這樣很容易造成Consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞酬蹋。而Pull模式則可以根據(jù)Consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息及老。
對(duì)于Kafka而言,Pull模式更合適范抓,它可簡(jiǎn)化Broker的設(shè)計(jì)骄恶,Consumer可自主控制消費(fèi)消息的速率,同時(shí)Consumer可以自己控制消費(fèi)方式——即可以批量消費(fèi)也可以逐條消費(fèi)匕垫,同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語義僧鲁。
Pull模式不足之處是,如果Kafka沒有數(shù)據(jù)象泵,消費(fèi)者可能會(huì)陷入循環(huán)中寞秃,一直等待數(shù)據(jù)到達(dá)。為了避免這種情況偶惠,可以Pull請(qǐng)求中設(shè)置參數(shù)春寿,允許Consumer請(qǐng)求在等待數(shù)據(jù)到達(dá)的“長(zhǎng)輪詢”中進(jìn)行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大泻瞿酢)绑改。
五、Kafka案例
生產(chǎn)者
package com.fas.kafka.service;
import org.apache.log4j.Logger;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import com.fas.kafka.util.KafkaUtil;
@Service
@Log4j
public class KafkaProducerService {
private KafkaTemplate<String, String> kafkaTemplate = KafkaUtil.getKafkaTemplate();
public void send(String topic, String data) {
ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, data);
listenableFuture.addCallback(result -> log.info("Send data to kafka [success] - " + topic + " - " + data), result -> log.info("Send data to kafka [error] - " + topic + " - " + data));
}
}
消費(fèi)者
package com.fas.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
import com.alibaba.fastjson.JSONObject;
import com.fas.kafka.entity.DeviceOnlineData;
import lombok.extern.log4j.Log4j;
@Log4j
public class KafkaConsumer implements MessageListener<String, String> {
public static final BlockingQueue<DeviceOnlineData> DATA_ACCESS_BLOCKING_QUEUE = new LinkedBlockingQueue<>();
public static final String KAFKA_DATA_ACCESS_TOPIC = PropertiesUtil.KAFKA_PROPERTIES.getProperty("kafka.data_access.topic");
public static final String KAFKA_TEST_TOPIC = PropertiesUtil.KAFKA_PROPERTIES.getProperty("kafka.test.topic");
@Override
public void onMessage(ConsumerRecord<String, String> consumerRecord) {
String topic = consumerRecord.topic();
log.info("topic: " + topic);
String value = consumerRecord.value();
log.info("value: " + value);
if (KAFKA_DATA_ACCESS_TOPIC.equals(topic)) {
JSONObject.parseArray(value, DeviceOnlineData.class).stream().forEach(deviceOnlineData -> {
try {
DATA_ACCESS_BLOCKING_QUEUE.put(deviceOnlineData);
log.info("Put into blocking queue");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} else if (KAFKA_TEST_TOPIC.equals(topic)) {
log.info("Test success");
} else {
log.info("Unknown topic");
}
}
}
六扒腕、常用命令
- 查看所有Topic
./bin/kafka-topics.sh --list --zookeeper zk01:2181
- 創(chuàng)建Topic
./bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
- 刪除Topic
./bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
需要server.properties
中設(shè)置delete.topic.enable=true
绢淀,否則只是標(biāo)記刪除,或者直接重啟 - 通過Shell命令發(fā)送消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- 通過Shell消費(fèi)消息
./bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test
- 查看消費(fèi)位置
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
- 查看某個(gè)Topic的詳情
./bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181
- 對(duì)分區(qū)數(shù)進(jìn)行修改
./bin/kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic