RMQ可靠性投遞
一.什么是RMQ的可靠性投遞
1.保障消息的成功發(fā)出
2.保障MQ節(jié)點的成功接收
3.發(fā)送端收到MQ節(jié)點(broker)確認應答
4.完善的消息補償機制
在實際生產中,很難保障前三點的完全可靠欧募,比如在極端的環(huán)境中,生產者發(fā)送消息失敗了处坪,發(fā)送端在接受確認應答時突然發(fā)生網絡閃斷等等情況,很難保障可靠性投遞颤枪,所以就需要有第四點完善的消息補償機制寿烟。
二虽抄、大廠的解決方案
第一種:消息落庫
流程的示意圖如上所示:
step1:比如我下單成功了走搁,對我的業(yè)務數據進行入庫,同時對消息數據入庫迈窟;
setp2:發(fā)送消息到MQ服務上私植;
step3: 按照正常的流程就是消費者監(jiān)聽到該消息,就根據唯一id修改該消息的狀態(tài)為已消費车酣,并給一個確認應答ack到Listener曲稼。
step4: 修改消息的投遞狀態(tài);
step5: 如果出現意外情況湖员,消費者未接收到或者Listener接收確認時發(fā)生網絡閃斷贫悄,接收不到,這時候就需要用到我們的分布式定時任務來從msg數據庫抓取那些超時了還未被消費的消息;
step6: 對超時任務重新發(fā)送一遍娘摔;
step7: 重試機制里面要設置重試次數限制窄坦,因為一些外部的原因導致一直發(fā)送失敗的,不能重試太多次凳寺,要不然會拖垮整個服務鸭津。
例如重試三次還是失敗的,就把消息的status設置成2肠缨,然后通過補償機制逆趋,人工去處理。實際生產中晒奕,這種情況還是比較少的闻书,但是你不能沒有這個補償機制,要不然就做不到可靠性了脑慧。
優(yōu)點:
架構實現清晰魄眉,流程簡單;
缺點:
一次業(yè)務操作2次入庫漾橙,高并發(fā)場景下影響性能杆融。
有人可能就想到采用分布式事務來保證數據的一致性,但是在大型互聯網中霜运,基本很少采用事務脾歇,都是采用補償機制蒋腮。
第二種:延遲投遞,做二次確認藕各,回調檢查池摧。
既然第一種方案在高并發(fā)場景下不適合,這時候需要我們的第二種方案激况,流程圖如下作彤。
step1:完成業(yè)務操作落庫后,發(fā)送消息出去乌逐;
step2: 業(yè)務消息發(fā)送后竭讳,延遲3分鐘或5分鐘再發(fā)送消息數據;
step3: 消費者訂閱消息進行消費;
step4: 完成后重新組裝一個確認消費的消息浙踢;
step5: 補償服務監(jiān)聽消費者發(fā)送到特定隊列的確認消息绢慢,修改消息數據庫的信息;
step6: 補償服務延遲檢查生產者發(fā)出的延遲檢查的消息洛波,去消息數據庫判斷胰舆,如果已經消費成功,檢查通過蹬挤;
step7: 如果檢查沒通過缚窿,則向生產者發(fā)送重新發(fā)送消息的命令。
雖然第二種方案也是無法做到100%的可靠傳遞焰扳,在特別極端的情況倦零,還是需要定時任務和補償機制進行輔助的。但是第二種方案的核心是減少數據庫操作蓝翰,在高并發(fā)場景下光绕,我們考慮的不是百分百的可靠性了,而是考慮可用性畜份,性能能否扛得住這個流量诞帐,所以我能減少一次數據庫操作就減少一次。我上游服務減少了一次數據庫操作爆雹,我的服務性能相對而言就提高了一些停蕉,而且又能把異步callback Server補償服務解耦出來。
confirm機制
- 消息的確認钙态,是指生產投遞消息后慧起,如果broker收到消息,會給生產者一個應答
- 生產者接受應答册倒,用來確認這條消息是否正常發(fā)送到broker
如何實現消息的確認機制蚓挤?
第一步在channe開啟確認模式,channel.confirmSelect();
第二步在channel添加監(jiān)聽,addConfirmListener,根據監(jiān)聽的結果對消息進行重新發(fā)送或記錄日志等處理灿意。
代碼如下:
Producer類
package com.bfxy.rabbitmq.api.confirm;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.136.198.52");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創(chuàng)建一個新的Channel
Channel channel = connection.createChannel();
//4 指定我們的消息投遞模式: 消息的確認模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5 發(fā)送一條消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//6 添加一個確認監(jiān)聽
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
}
}
Consumer類
package com.bfxy.rabbitmq.api.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class Consumer {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.136.198.52");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創(chuàng)建一個新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";
//4 聲明交換機和隊列 然后進行綁定設置, 最后制定路由Key
channel.exchangeDeclare(exchangeName,"topic",true);
channel.queueDeclare(queueName,true,false,false,null); channel.queueBind(queueName,exchangeName,routingKey);
//5 創(chuàng)建消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6 設置信道 channel.basicConsume(queueName,true,queueingConsumer);
//7 消費消息內容
while (true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消費端:"+ msg);
}
}
}
先啟動消費者估灿,再啟動生產者: 消費者打印出消費成功的信息,然后重新發(fā)送消息缤剧,生產者接收到確認消費成功的消息后馅袁,打印出確認信息。
首先消費者控制臺如下
然后生產者控制臺打印信息如下: