kafka與RabbitMQ的區(qū)別?
1错览、確認機制不同
在RabbitMQ中纫雁,消息確認是指生產(chǎn)者發(fā)送消息到RabbitMQ后,等待RabbitMQ返回確認信息倾哺,確認消息已經(jīng)被正確地接收并處理轧邪。通常情況下,【消息確認是同步進行的】即必須等到 【消息隊列】 接收到和處理了消息后羞海,才會返回確認信息忌愚。這樣可以確保消息隊列完全處理了消息,避免消息的丟失却邓,保證了數(shù)據(jù)的一致性和可靠性硕糊。
而在Kafka中,為了保證高吞吐率腊徙,沒有提供消息確認機制(at-most-once和at-least-once除外)简十,數(shù)據(jù)是異步推送到broker中的,一旦生產(chǎn)者把消息推送到broker后撬腾,就不會再對該消息進行確認處理螟蝙,消費者需要自己對消息的處理情況進行確認。在Kafka中民傻,也有一些同步的模式(例如producer.send()方法中的同步調(diào)用)胰默,可以確保發(fā)送的消息被同步復(fù)制到Kafka的所有副本,并在接收到確認后才會返回饰潜。
2初坠、實施部署不同
RabbitMQ 穩(wěn)定可靠,數(shù)據(jù)?致,?持多協(xié)議,有消息確認,基于erlang語?
Kafka ?吞吐,?性能,快速持久化,?消息確認,?消息遺漏,可能會有有重復(fù)消息,依賴于zookeeper,成本高和簸,“成本高”的含義是指 Kafka 部署和運維所需要的成本相比 RabbitMQ更高需要更加復(fù)雜的基礎(chǔ)設(shè)施來支持它的運行彭雾,這些基礎(chǔ)設(shè)施包括:
一、 集群環(huán)境成本高:Kafka 集群需要依賴 ZooKeeper 來實現(xiàn)節(jié)點的選舉和協(xié)調(diào)工作锁保,而 ZooKeeper 集群的部署和配置都比 RabbitMQ 更加復(fù)雜薯酝,需要更多的服務(wù)器資源和維護成本
二. 數(shù)據(jù)持久化成本高:Kafka 依賴磁盤的高速讀寫特性來實現(xiàn)其快速的消息持久化半沽,因此需要更高的成本來維護磁盤的健康狀態(tài)和數(shù)據(jù)備份和恢復(fù)等工作。
總的來說吴菠,Kafka 的高吞吐者填、高性能的特點是與高成本密切相關(guān)的,如果一個應(yīng)用場景對性能要求很高做葵,且具備一定的技術(shù)架構(gòu)能力占哟,那么可以選擇使用 Kafka。而如果應(yīng)用場景對可靠性和穩(wěn)定性要求較高酿矢,對【性能要求不太高榨乎,且業(yè)務(wù)規(guī)模不太大 ,那么選擇 RabbitMQ 更為合適瘫筐。
邏輯結(jié)構(gòu):
用一棟大樓類比說明
1蜜暑、Broker 是指 RabbitMQ 服務(wù)器實例,并根據(jù)預(yù)先定義的規(guī)則將消息發(fā)送到合適的隊列或 Exchange
2策肝、exchange是 RabbitMQ 中用于接收和分發(fā)消息的中間組件肛捍,它負責(zé)接收來自生產(chǎn)者的消息,并將其路由到與之相關(guān)的1個或多個隊列 (生產(chǎn)者到交換機)
3之众、隊列:房間是存放消息的地方(每個房間)
4拙毫、綁定:綁定規(guī)定了交換機和隊列之間的連接(樓層之間的通道)
5疯淫、虛擬主機:虛擬主機則是 RabbitMQ 中用于【邏輯隔離】的重要概念符衔,它類似于一個命名空間(每層樓)
可以在一個 Broker 內(nèi)部創(chuàng)建多個不同的虛擬主機车要,每個虛擬主機都有自己的命名空間疗杉,并且可以獨立配置和管理其中Exchange蒂破、Queue度苔、Binding音诫、用戶權(quán)限等
在 RabbitMQ 中是可以設(shè)置多個
交換機的攻泼。通過設(shè)置不同類型的交換機以及它們之間的綁定關(guān)系竣付,可以實現(xiàn)復(fù)雜的消息路由和處理邏輯
交換機類型和工作模式都涉及消息在 RabbitMQ 中的傳遞和處理诡延,但它們的焦點和作用有所不同。交換機類型主要關(guān)注消息的路由和分發(fā)古胆,決定消息如何被發(fā)送到隊列
中肆良;而工作模式主要關(guān)注消費者對消息的接收和處理方式,決定消息如何被消費
消息發(fā)送模式:
1逸绎、簡單模式:一個隊列只有一個消費者
2惹恃、工作模式:多個消費者監(jiān)聽同一個隊列,但消費者中只有一個消費者會成功消費消息(負載均衡)
3棺牧、發(fā)布/訂閱模式:一個交換機綁定多個隊列巫糙,消息同時被所有隊列消費(集體通知)
4、路由模式:它通過在交換機和隊列之間建立綁定關(guān)系颊乘,并定義路由鍵來實現(xiàn)消息的有選擇性
地路由傳遞(訂閱廣告)
有一個即時消息系統(tǒng)或者新聞訂閱系統(tǒng)需要將消息或新聞推送給多個用戶参淹,這時可以采用發(fā)布/訂閱模式醉锄。生產(chǎn)者將消息發(fā)送到交換機 (exchange),交換機將消息廣播給多個訂閱者浙值,訂閱者通過綁定交換機來接收消息恳不。在這種模式下,每個訂閱者都會接收到相同的消息
开呐,可以采用不同的隊列來對訂閱者進行分組烟勋,以實現(xiàn)訂閱者之間的隔離和不同級別的消息推送
工作模式和發(fā)布訂閱模式的區(qū)別?
在工作模式中筐付,多個消費者可以并行地處理來自同一個隊列的任務(wù)
每個消息只能由一個消費者處理神妹,但是多個消息可以同時被多個消費者處理
在發(fā)布訂閱模式中,雖然消息會被廣播到多個訂閱者家妆,但是每個訂閱者獨立地接收和處理消息鸵荠。因此,不同的訂閱者可以并行地處理自己接收到的消息路由模式和發(fā)布訂閱模式的區(qū)別伤极?
發(fā)布訂閱Fanout(廣播)模式:在 Fanout 模式中蛹找,交換機會將消息廣播到綁定的所有隊列。無論隊列是否有不同的路由鍵或其他綁定條件哨坪,交換機都會將消息傳遞給所有綁定的隊列庸疾。
路由Redirect(重定向)模式:在 Redirect 模式中,交換機會將消息發(fā)送到1個特定的隊列而不是廣播到所有綁定的隊列当编。這種模式適用于需要將消息定向發(fā)送到1個特定的隊列
創(chuàng)建用戶和創(chuàng)建交換機
創(chuàng)建用戶:創(chuàng)建用戶名届慈、密碼
創(chuàng)建交換機:
1、選擇虛擬主機
2忿偷、選擇交換機類型
3金顿、持久性……
在創(chuàng)建交換機時,durability
是一個可選的參數(shù)鲤桥,用于配置交換機的持久性揍拆。持久性指的是當消息代理(如 RabbitMQ)重新啟動
時,交換機是否仍然存在茶凳。如果將 durability 設(shè)置為 true嫂拴,交換機將被標記為持久的,即使消息代理重新啟動贮喧,交換機也將保留
筒狠。這意味著在消息代理恢復(fù)正常運行后,交換機將保持不變
如果你的應(yīng)用程序需要頻繁地更改或刪除交換機箱沦,并且你不希望在每次更改時手動刪除或重新創(chuàng)建交換機辩恼,那么可以選擇非持久化交換機
4、交換機綁定隊列
發(fā)布消息并設(shè)置消息屬性
在 RabbitMQ 的 Java 客戶端 API 中,可以在發(fā)布消息時設(shè)置這些消息屬性运挫。
下面是一個示例代碼,展示了如何設(shè)置消息的持久性套耕、優(yōu)先級和過期時間
:
// 創(chuàng)建連接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 創(chuàng)建消息屬性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 設(shè)置消息的持久性
.priority(5) // 設(shè)置消息的優(yōu)先級
.expiration("10000") // 設(shè)置消息的過期時間
.build();
// 發(fā)布消息
String exchangeName = "exchange1";
String routingKey = "routingKey1";
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
// 關(guān)閉連接和通道
channel.close();
connection.close();
在上述代碼中谁帕,首先創(chuàng)建了連接和通道。然后使用 AMQP.BasicProperties.Builder
類來創(chuàng)建消息屬性對象冯袍,通過鏈式調(diào)用方法來設(shè)置屬性匈挖,例如使用 deliveryMode
方法設(shè)置消息的持久性(1非持久2持久),使用 priority
方法設(shè)置消息的優(yōu)先級康愤,使用 expiration
方法設(shè)置消息的過期時間儡循。最后,調(diào)用 basicPublish
方法發(fā)布消息時征冷,將消息屬性對象作為參數(shù)傳遞择膝,即可在消息中包含相應(yīng)的屬性。
使用RabbitMQ傳遞對象
發(fā)送和接收的都是字符串/字節(jié)數(shù)組類型的消息
- 使用
序列化
對象
消息提供者
@Service
public class MQService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods){
//消息隊列可以發(fā)送 字符串检激、字節(jié)數(shù)組肴捉、序列化對象
byte[] bytes = SerializationUtils.serialize(goods);
amqpTemplate.convertAndSend("","queue1",bytes);
}
}
消息消費者
@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {
@RabbitHandler
public void receiveMsg(byte[] bs){
Goods goods = (Goods) SerializationUtils.deserialize(bs);
System.out.println("byte[]---"+goods);
}
}
PS:為什么要實現(xiàn)序列化接口?
1、易于傳輸和存儲:將 Java 對象序列化為字節(jié)數(shù)組
后叔收,可以將其轉(zhuǎn)換為不同的格式齿穗,例如 JSON 或 XML,用于傳輸?shù)讲煌南到y(tǒng)之間饺律。字節(jié)數(shù)組還可以在數(shù)據(jù)庫中進行持久化存儲窃页,以便以后檢索和使用。
2复濒、平臺無關(guān)性:通過使用序列化對象脖卖,可以將 Java 對象轉(zhuǎn)換為平臺無關(guān)的字節(jié)序列,這樣它們可以在不同的系統(tǒng)
之間進行傳輸和存儲巧颈,包括跨語言和跨平臺的場景胚嘲。
3、可擴展性:可以在對象中添加新字段和屬性洛二,并保證可以與舊版本兼容馋劈,以便在不同版本的應(yīng)用程序之間進行傳輸和存儲。這可以通過維護對象的序列化版本來實現(xiàn)晾嘶。
4妓雾、高效性:通過序列化對象,可以減少在網(wǎng)絡(luò)或磁盤 I/O 期間傳輸數(shù)據(jù)的大小垒迂,從而提高傳輸效率并減少網(wǎng)絡(luò)流量械姻。這是因為序列化過程通常會去除對象中的一些額外信息或元數(shù)據(jù),只保留必要的數(shù)據(jù)以及用于恢復(fù)對象的必要信息机断。
- 使用
JSON
字符串傳遞
消息提供者
@Service
public class MQService {
@Resource
private AmqpTemplate amqpTemplate;
public void sendGoodsToMq(Goods goods) throws
JsonProcessingException {
//消息隊列可以發(fā)送 字符串楷拳、字節(jié)數(shù)組绣夺、序列化對象
ObjectMapper objectMapper = new ObjectMapper();
String msg = objectMapper.writeValueAsString(goods);
amqpTemplate.convertAndSend("","queue1",msg);
}
}
RabbitMQ事務(wù)、消息確認和return機制欢揖、手動ACK
RabbitMQ事務(wù)指的是基于客戶端實現(xiàn)的事務(wù)管理陶耍,當在消息發(fā)送過程中添加了事務(wù),處理效率降低
幾十倍甚至上百倍
Connection connection = RabbitMQUtil.getConnection(); //connection 表
示與 host1的連接
Channel channel = connection.createChannel();
channel.txSelect(); //開啟事務(wù)
try{
channel.basicPublish("ex4", "k1", null, msg.getBytes());
channel.txCommit(); //提交事務(wù)
}catch (Exception e){
channel.txRollback(); //事務(wù)回滾
}finally{
channel.close();
connection.close();
}
同步等待:為了實現(xiàn)事務(wù)的原子性
她混,事務(wù)提交是一個同步操作烈钞,即發(fā)送端會等待事務(wù)提交的結(jié)果,只有在返回提交成功的響應(yīng)后才會繼續(xù)發(fā)送下一條消息坤按。
重復(fù)操作:如果發(fā)送端在發(fā)送消息的過程中出現(xiàn)錯誤或異常毯欣,事務(wù)會回滾
并且消息會重發(fā)
,這種方式可以保證消息的可靠性傳遞
所以臭脓,在實際場景中酗钞,如果對消息的實時性要求較高或?qū)ο⑻幚淼耐掏铝坑休^高要求,建議盡量避免使用事務(wù)
PS:消息吞吐量(Message Throughput):消息吞吐量指的是系統(tǒng)在單位時間
內(nèi)處理的消息量来累,對于需要高吞吐量的應(yīng)用來說算吩,高效地傳遞和處理消息是非常重要的。使用事務(wù)機制會對消息發(fā)送的性能造成一定的影響佃扼,因為事務(wù)機制需要等待事務(wù)提交的確認
偎巢,會增加發(fā)送消息的延遲。
事務(wù)如何確保消息發(fā)送的可靠性?
消息的可靠性:從 生產(chǎn)者發(fā)送消息 —— 消息隊列存儲消息 —— 消費者消費消息的整個過程中消息的安全性及可控性
RabbitMQ提供了消息確認機制及return機制
意義:消息確認機制可以幫助發(fā)送方確保消息成功發(fā)送到交換機兼耀,并可以跟蹤
消息是否被一個或多個消費者成功消費
压昼。Return 機制則是確保了消息成功從交換機分發(fā)到隊列的過程,當消息無法
成功路由到隊列時瘤运,消息會被 Return
給消息提供者
- 消息確認機制:當消息提供者將消息發(fā)送到交換機時窍霞,交換機會對消息提供者進行反饋(到達交換機)
- return機制:當消息達到交換機之后,交換機會將消息分發(fā)到隊列拯坟,MQ會將分發(fā)的結(jié)果也會反饋給消息提供者(到達隊列)
在 RabbitMQ 中使用消息確認機制可以確保消息被成功發(fā)送到交換機但金,并被一個或多個消費者成功消費。
RabbitMQ 提供了同步和異步兩種消息確認方式
同步消息確認指的是郁季,在消息發(fā)送之后冷溃,發(fā)送方會阻塞等待確認結(jié)果。具體來說梦裂,發(fā)送方通過 waitForConfirms()
方法等待 RabbitMQ 發(fā)送確認結(jié)果似枕,直到接收到確認結(jié)果或者超時時間到達。如果確認結(jié)果為消息成功發(fā)送到交換機并被一個或多個消費者消費年柠,則 waitForConfirms()
方法返回 true凿歼,否則返回 false。
異步消息確認指的是,在消息發(fā)送之后答憔,發(fā)送方不會阻塞等待確認結(jié)果味赃。相反,它通過添加ConfirmListener
來異步處理確認回調(diào)
虐拓。確認回調(diào)會在消息成功發(fā)送到交換機并被一個或多個消費者消費或者發(fā)送失敗時觸發(fā)心俗。
異步消息確認相較于同步消息確認,其對消息發(fā)送方更加友好侯嘀。使用異步確認的優(yōu)點是可以提高發(fā)送消息的吞吐量,因為發(fā)送方能夠繼續(xù)
發(fā)送下一條
消息谱轨,而不需要等待每條消息的確認結(jié)果戒幔。另外,異步確認還可以通過監(jiān)聽確認回調(diào)在消息發(fā)送失敗
時及時發(fā)現(xiàn)并處理這種情況土童,而同步確認則只能通過超時等待
的方式判斷消息是否發(fā)送成功诗茎。
普通maven項目的消息確認機制
下面是 RabbitMQ 中 Java 客戶端使用消息確認和批量發(fā)送消息的示例代碼,以及使用確認監(jiān)聽器異步處理確認結(jié)果:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MessageSender {
private static final String EXCHANGE_NAME = "ex1";
private static final String ROUTING_KEY = "a";
private static final String MESSAGE = "Hello, RabbitMQ!";
private static final int MESSAGE_COUNT = 10;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//開啟消息確Z
channel.confirmSelect();
//批量發(fā)送消息
for (int i=0 ; i<MESSAGE_COUNT ; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
}献汗、
//假如發(fā)送消息需要10s敢订,waitForConfirms會進?阻塞狀態(tài)(同步)
//boolean b = channel.waitForConfirms();
//使用監(jiān)聽器異步處理確認結(jié)果
channel.addConfirmListener(new ConfirmListener() {
//參數(shù)1:long deliveryTag 返回消息的標識
//參數(shù)2:boolean multiple 是否為批量confirm
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("~~~~~消息成功發(fā)送到交換機");
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("~~~~~消息發(fā)送到交換機失敗");
}
});
}
}
當調(diào)用 channel.waitForConfirms()
時,它會阻塞當前線程罢吃,直到 RabbitMQ 確認消息是否成功發(fā)送到隊列或交換機楚午。
換句話說,在以下情況下尿招,waitForConfirms()
會阻塞程序繼續(xù)執(zhí)行:
- 當你發(fā)送一條消息后矾柜,還沒有收到 RabbitMQ 的確認消息;
- 當發(fā)送消息時發(fā)生異常就谜,如網(wǎng)絡(luò)中斷或發(fā)送超時怪蔑;
- 當你設(shè)置了批量發(fā)送消息的模式,并且尚未收到足夠數(shù)量的消息確認丧荐,或者
超過
了設(shè)置的超時時間
缆瓣。
簡而言之,waitForConfirms()
會等待確認消息的返回虹统,確保消息成功發(fā)送到 RabbitMQ弓坞。
消息確認機制不光監(jiān)聽成功
的消息同時也監(jiān)聽失敗
的消息
spring項目的return機制
@Component
public class MyReturnListener implements RabbitTemplate.ReturnsCallback {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息從交換機分發(fā)到隊列失敗");
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
String msg = returnedMessage.getMessage().toString();
amqpTemplate.convertAndSend(exchange,routingKey,msg);
}
}
在上述代碼中,returnedMessage
方法是用于處理消息從交換機分發(fā)到隊列失敗的回調(diào)方法车荔。當消息無法路由到指定的隊列時昼丑,會觸發(fā)該方法。
下面是代碼中各個參數(shù)的含義:
returnedMessage
對象:表示被退回的消息的相關(guān)信息夸赫。
getExchange()
:獲取退回消息所使用的交換機名稱菩帝。
getRoutingKey()
:獲取退回消息時所使用的路由鍵。
getMessage()
:獲取退回的消息對象,可以通過toString()
方法將其轉(zhuǎn)換為字符串呼奢。
在該方法中宜雀,它首先打印了一條消息提示,表示消息從交換機分發(fā)到隊列失敗握础。然后辐董,通過 amqpTemplate.convertAndSend(exchange,routingKey, msg)
方法將消息重新發(fā)送到指定的交換機和路由鍵,以便重新分發(fā)消息
RabbitMQ消費者手動應(yīng)答(手動ACK)
@Component
@RabbitListener(queues = "queue01")
public class Consumer1 {
@RabbitHandler
public void process(String msg, Channel channel, Message message) throws IOException {
try {
System.out.println("成功接收到消息 msg = " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.err.println("接收消息失敗 msg = " + msg);
}
}
}
在上述代碼中禀综,channel.basicAck
是用于手動確認消息的方法简烘,用于告知 RabbitMQ 已經(jīng)成功處理(消費)了一個消息。下面是 channel.basicAck
方法的各個參數(shù)的詳細說明:
deliveryTag
:這是一個消息的唯一標識符定枷。每個消息都有一個獨特的deliveryTag
孤澎,用于標識消息的順序。通過message.getMessageProperties().getDeliveryTag()
可以獲取當前處理消息的deliveryTag
欠窒。
PS:每條消息在 RabbitMQ 中都會被分配一個唯一
的deliveryTag
覆旭,用于標識消息的順序。deliveryTag 是一個連續(xù)遞增的正整數(shù)
岖妄,每次消費一條消息時型将,deliveryTag 會被分配并隨消息一起傳遞給消費者。消費者在處理完一條消息后荐虐,需要對該消息進行手動 ACK 才能告知 RabbitMQ 已經(jīng)成功處理了該消息七兜,此時會將該消息的 deliveryTag 返回給 RabbitMQ 作為確認-
multiple
:這是一個布爾值,用于指定是否確認多個消息福扬。對于簡單地確認單個消息惊搏,可以將其設(shè)置為false
。如果要確認多個消息忧换,可以將其設(shè)置為true
恬惯,這將確認所有比當前deliveryTag
小(包括當前deliveryTag
)的未確認消息亚茬。
PS:在消息隊列系統(tǒng)中酪耳,通常會有多個消息同時被發(fā)送到隊列中等待消費。當消費者處理消息時刹缝,通常會以一批消息的方式進行確認碗暗,而非逐條確認。因此梢夯,multiple參數(shù)的存在是為了提供一種批量確認
消息的機制在這種情況下言疗,如果設(shè)置為
true
,則表示當前消息之前的所有消息也被認為已經(jīng)處理完成颂砸,全部移除噪奄。如果設(shè)置為false
死姚,則僅確認當前deliveryTag
的消息,然后將其從隊列中移除勤篮。通常設(shè)置為false都毒。 requeue
:這是一個布爾值,用于指定當確認消息時碰缔,是否將消息重新加入到隊列账劲。如果設(shè)置為false
,則代表不重新加入隊列金抡,即認為消息已被完全處理瀑焦。如果設(shè)置為true
,則代表將消息重新加入隊列梗肝,以便重新被消費榛瓮。通常情況下,如果消息處理成功统捶,則設(shè)置為false
榆芦,如果消息處理失敗柄粹,則設(shè)置為true
喘鸟。
在上述代碼中,第一個 channel.basicAck
用于確認消息已經(jīng)成功處理驻右,同時使用了 message.getMessageProperties().getDeliveryTag()
來獲取當前消息的 deliveryTag
值什黑。
而第二個 channel.basicAck
是在處理消息出現(xiàn)異常時的回退機制。它使用與第一個 channel.basicAck
相同的參數(shù)堪夭,以確認消息處理失敗愕把,然后將消息重新加入隊列(requeue
設(shè)置為 true
),以便重新消費森爽。同時恨豁,使用了 message.getMessageProperties().getDeliveryTag()
獲取當前消息的 deliveryTag
值。
手動ACK的意義體現(xiàn)在以下幾個方面:
1爬迟、確遍倜郏可靠性:消費者在處理消息時,可能會發(fā)生異掣杜唬或出現(xiàn)錯誤的情況计福。如果沒有手動ACK機制,當消費者在處理消息過程中出現(xiàn)異常時徽职,消息隊列將無法得知消息是否被成功處理象颖,可能會導(dǎo)致消息的丟失
或重復(fù)
消費。手動ACK可以確保消費者在處理消息成功后再發(fā)送確認姆钉,從而確保消息的可靠性说订。
2抄瓦、提高消息處理效率:手動ACK機制可以確保消息隊列在收到消費者的ACK確認后,【將該消息從隊列中移除
】克蚂,從而減少隊列的負荷
闺鲸。通過手動ACK,消費者可以在處理完一條消息后立即發(fā)送確認埃叭,減少了消息在隊列中滯留的時間摸恍,提高了消息的處理效率
。
手動ACK與消息確認機制和return機制之間的區(qū)別赤屋?
手動 ACK 與消息確認機制:手動 ACK 是消費者在處理完一條消息后立镶,向 RabbitMQ 發(fā)送確認信號,告知 RabbitMQ 已成功處理該消息的機制类早。通過手動 ACK媚媒,消費者
可以明確地通知 RabbitMQ 消息已經(jīng)被處理,避免消息丟失
或重復(fù)消費
的情況發(fā)生涩僻,手動ACK是消費端
的機制缭召。消息確認機制則是生產(chǎn)者端
的機制,用于確認消息已經(jīng)被發(fā)送到交換機并被消費者接收逆日,確保消息的可靠傳遞嵌巷。手動 ACK 和消息確認機制共同保證了消息在生產(chǎn)者和消費者之間的可靠
傳遞。
手動 ACK 與消息返回機制:消息返回機制會在消息無法被路由到隊列時將消息返回給生產(chǎn)者室抽。在這種情況下搪哪,消費者可能無法接收到消息,因此消費者不會發(fā)送 ACK坪圾。手動 ACK 可以確保消費者在成功處理消息后再發(fā)送確認信號晓折,而不是直接丟棄未處理的消息。通過手動 ACK兽泄,消費者能夠在實際處理消息后再確認消息的處理情況漓概,提高了消息傳遞的完整性和可靠性
消息的冪等性問題
要在 RabbitMQ 中實現(xiàn)冪等性,可以使用 Redis 的 SETNX`(Set if Not Exists)命令來進行輔助病梢。
- 意義:
確保消息被處理一次而不會重復(fù)處理
確保消息處理的唯一性和正確性
SETNX 是 Redis 提供的一種原子性命令
胃珍,用于設(shè)置指定鍵的值,僅當該鍵不存在時才會設(shè)置成功飘千。在 RabbitMQ 的消息處理中堂鲜,可以將消息的唯一標識
或關(guān)鍵信息作為鍵,將已處理消息的標記作為值存儲在 Redis 中护奈,并利用 SETNX 來保證只有第一次
處理消息時才會成功設(shè)置標記缔莲,從而保證消息的冪等性
下面是一個示例代碼:
@Component
@RabbitListener(queues = "queue01")
public class Consumer1 {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private final String LOCK_PREFIX = "messageLock:";
private final long LOCK_EXPIRE_TIME = 60 * 1000; // 鎖的過期時間,單位毫秒
@RabbitHandler
public void process(String msg, Channel channel, Message message) throws IOException {
// 使用消息的唯一標識作為鎖的 key
String lockKey = LOCK_PREFIX + message.getMessageProperties().getDeliveryTag();
Boolean isLocked = false;
try {
// 嘗試獲取鎖
isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, "");
// 設(shè)置鎖的過期時間
redisTemplate.expire(lockKey, LOCK_EXPIRE_TIME, TimeUnit.MILLISECONDS);
// 如果成功獲取到了鎖霉旗,則處理消息
if (isLocked) {
System.out.println("成功獲取到鎖并處理消息 msg = " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("未獲取到鎖痴奏,消息已被其他消費者處理 msg = " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.err.println("處理消息失敗 msg = " + msg);
} finally {
// 釋放鎖
if (isLocked) {
redisTemplate.delete(lockKey);
}
}
}
}
- 添加了
RedisTemplate<String, Object>
對象的自動裝配蛀骇,該對象用于與 Redis 進行交互。 - 聲明了一個以
messageLock:
作為前綴的鎖定鍵读拆,使用消息的唯一標識作為鎖的唯一標識擅憔。 - 使用
RedisTemplate
的opsForValue().setIfAbsent()
方法嘗試獲取鎖,如果成功獲取到鎖則繼續(xù)處理該消息檐晕,否則說明該消息已被其他消費者處理暑诸。 - 使用
redisTemplate.expire()
設(shè)置鎖的過期時間,避免死鎖辟灰。 - 在
finally
塊中个榕,如果成功獲取到了鎖,則釋放鎖芥喇。
延遲機制
未完待續(xù)