RabbitMQ 高級特性
消息可靠性
我們可以從以下幾方面來保證消息的可靠性:
- 客戶端代碼中的異常捕獲猛遍,包括生產(chǎn)者和消費者
- AMQP/RabbitMQ的事務機制
- 發(fā)送端確認機制
- 消息持久化機制
- Broker端的高可用集群
- 消費者確認機制
- 消費端限流
- 消息冪等性
異常捕獲機制
先執(zhí)行業(yè)務操作育灸,業(yè)務操作成功后執(zhí)行消息發(fā)送滩字,消息發(fā)送過程通過 try catch 方式捕獲異常,在異常處理的代碼塊中執(zhí)行回滾業(yè)務操作或者執(zhí)行消息重發(fā)操作等漓藕。這是一種最大努力確保的方式撵术,并無法保證100%絕對可靠话瞧,意味這里沒有異常并不代表消息就一定投遞成功。
boolean result = doBiz();
if (result){
try{
sendMsg();
} catch (Exception e){
// 業(yè)務回滾划滋、消息重發(fā)
rollbackBiz();
}
}
AMQP/RabbitMQ事務機制
沒有捕獲到異常并不能代表消息就一定投遞成功了处坪。
一直到事務提交后都沒有異常架专,確實說明消息是投遞成功了部脚,但是,這種方式在性能方面的開銷比較大丧没,一般不推薦使用呕童。
mport com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/29
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 聲明一個交換器
channel.exchangeDeclare("ex.rb", BuiltinExchangeType.DIRECT);
// 聲明一個消息隊列
channel.queueDeclare("queue.rb", false, false, false, null);
// 消息隊列與交換器綁定
channel.queueBind("queue.rb", "ex.rb", "key.rb");
try {
// 將channel設置為事務模式
channel.txSelect();
// 發(fā)送消息到交換器
channel.basicPublish("ex.rb", "key.rb", null, "message".getBytes());
// 提交事務夺饲,只有消息成功被Broker接收了才能提交成功
channel.txCommit();
} catch (Exception e) {
// 事務回滾
channel.txRollback();
}
channel.close();
connection.close();
}
}
發(fā)送端確認機制
RabbitMQ 后來引入了一種輕量級的方式套蒂,叫發(fā)送方確認(publisher confirm)機制操刀。生產(chǎn)者將信道設置成confirm(確認)模式骨坑,一旦信道進入 confirm 模式,所有在該信道上發(fā)布的消息都會被指派一個唯一的消息ID(從1開始)且警,一旦消息被投遞到所有匹配的隊列后(如果消息和隊列都是持久化的礁遣,那么確認消息會在消息持久化后發(fā)出)祟霍,RabbitMQ 就會發(fā)送一個確認(Basic.Ack)給生產(chǎn)者(包含消息的唯一ID)溶推,這樣生產(chǎn)者就知道消息已經(jīng)正確送達了冒萄。
RabbitMQ 回傳給生產(chǎn)者的確認消息中的 deliveryTag 字段包含了確認消息的序號她渴,另外棘伴,通過設置 channel.basicAck 方法中的 mutilple 參數(shù)焊夸,表示到這個序號之前的所有消息是否都已經(jīng)得到了處理了缰犁。生產(chǎn)者投遞消息后并不需要一直阻塞這帅容,可以繼續(xù)投遞下一條消息,并通過回調方式處理ack響應遣钳。如果 RabbitMQ 因為自身內部錯誤導致消息丟失等一次情況發(fā)送蕴茴,就會響應一條nack(Basic.Nack)命令姐直,生產(chǎn)者應用程序同樣可以在回調方法中處理該 nack 命令声畏。
同步確認消息
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 同步確認消息
*
* @author jie.luo
* @since 2021/1/29
*/
public class SyncPublisherConfirmsProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
// 聲明一個消息隊列
channel.queueDeclare("queue.pc", true, false, false, null);
// 聲明一個 交換器
channel.exchangeDeclare("ex.pc", BuiltinExchangeType.DIRECT, true, false, null);
// 將消息隊列和交換器綁定,并制定綁卡鍵
channel.queueBind("queue.pc", "ex.pc", "key.pc");
String message = "hello publisher confirm";
channel.basicPublish("ex.pc", "key.pc", null, message.getBytes());
try {
channel.waitForConfirmsOrDie(5_000);
System.out.println("消息被確認:message = " + message);
} catch (IOException e) {
e.printStackTrace();
System.err.println("消息被拒絕科展! message = " + message);
} catch (InterruptedException e) {
e.printStackTrace();
System.err.println("在不是Publisher Confirms的通道上使用該方法");
} catch (TimeoutException e) {
e.printStackTrace();
System.err.println("等待消息確認超時糠雨! message = " + message);
}
channel.close();
connection.close();
}
}
同步按批次確認消息
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 同步按批次確認消息
*
* @author jie.luo
* @since 2021/1/29
*/
public class SyncPublisherConfirmsBatchProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
// 聲明一個消息隊列
channel.queueDeclare("queue.pc", true, false, false, null);
// 聲明一個 交換器
channel.exchangeDeclare("ex.pc", BuiltinExchangeType.DIRECT, true, false, null);
// 將消息隊列和交換器綁定甘邀,并制定綁卡鍵
channel.queueBind("queue.pc", "ex.pc", "key.pc");
String message = "hello-";
// 批處理的大小
int batchSize = 10;
// 用于對需要等待確認消息的計數(shù)
int outStrandingConfirms = 0;
for (int i = 0; i < 103; i++) {
channel.basicPublish("ex.pc", "key.pc", null, (message + 1).getBytes());
outStrandingConfirms++;
if (outStrandingConfirms == batchSize) {
// 此時已經(jīng)有一個批次的消息需要同步等待broker的確認消息
channel.waitForConfirmsOrDie(5_000);
System.out.println("消息已經(jīng)被確認了");
outStrandingConfirms = 0;
}
}
if (outStrandingConfirms > 0) {
channel.waitForConfirmsOrDie(5_000);
System.out.println("剩余消息已經(jīng)被確認了");
}
channel.close();
connection.close();
}
}
通過回調確認消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* 通過回調確認消息
*
* @author jie.luo
* @since 2021/1/29
*/
public class AayncPublisherConfirmsProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
// 聲明一個消息隊列
channel.queueDeclare("queue.pc", true, false, false, null);
// 聲明一個 交換器
channel.exchangeDeclare("ex.pc", BuiltinExchangeType.DIRECT, true, false, null);
// 將消息隊列和交換器綁定乎澄,并制定綁卡鍵
channel.queueBind("queue.pc", "ex.pc", "key.pc");
ConcurrentNavigableMap<Long, String> outStandingConfirms = new ConcurrentSkipListMap<>();
// 設置channel的監(jiān)聽器测摔,處理確認消息和不確認的消息
channel.addConfirmListener(new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("編號小于等于 " + deliveryTag + " 的消息都已經(jīng)被確認了");
ConcurrentNavigableMap<Long, String> headMap = outStandingConfirms.headMap(deliveryTag, true);
// 清空 outStandingConfirms 中已經(jīng)被確認的消息信息
headMap.clear();
} else {
System.out.println("編號為:" + deliveryTag + " 的消息被確認");
// 移除已經(jīng)被確認的消息
outStandingConfirms.remove(deliveryTag);
}
}
}, new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("編號小于等于 " + deliveryTag + " 的消息【不】確認");
} else {
System.out.println("編號為:" + deliveryTag + " 的消息【不】確認");
}
}
});
for (int i = 0; i < 1000; i++) {
// 獲取下一條即將發(fā)送消息的消息id
long nextPublishSeqNo = channel.getNextPublishSeqNo();
String message = "message-" + 1;
channel.basicPublish("ex.pc", "key.pc", null, message.getBytes());
System.out.println("編號為:" + nextPublishSeqNo + " 的消息還未確認");
outStandingConfirms.put(nextPublishSeqNo, message);
}
Thread.sleep(10000);
channel.close();
connection.close();
}
}
Spring Boot 案例
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @author jie.luo
* @since 2021/2/1
*/
@RestController
public class RabbitDemoController {
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息確認:" + correlationData.getId() + " " + new String(correlationData.getReturnedMessage().getBody()));
} else {
System.out.println(cause);
}
}
});
}
@GetMapping("/sendMsg")
public String sendMsg(@RequestParam int index) throws Exception {
MessageProperties properties = new MessageProperties();
properties.setCorrelationId("1234");
properties.setConsumerTag("msg-" + index);
properties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
properties.setContentEncoding("utf-8");
// 設置消息持久化
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
CorrelationData cd = new CorrelationData();
cd.setId("msg" + index);
cd.setReturnedMessage(new Message("這是msg1的響應信息".getBytes("utf-8"), null));
Message message = new Message("這是等待確認的消息".getBytes("utf-8"), properties);
this.rabbitTemplate.convertAndSend("ex.ca", "key.ca", message, cd);
return "ok";
}
}
消息持久化機制
持久化是提高 RabbitMQ 可靠性的基礎,否則當 RabbitMQ 遇到異常時(重啟挟纱、端點紊服、停機)數(shù)據(jù)將會丟失,主要從以下幾方面來保障曉得的持久性:
-
Exchange持久化
通過定義時設置 durable 參數(shù)為 ture 來保證 Exchange 相關的元數(shù)據(jù)不丟失
-
Queue持久化
通過定義時設置 durable 參數(shù)為 ture 來保證 Queue 相關的元數(shù)據(jù)不丟失
-
消息的持久化
通過將消息的投遞模式(BasicProperties 中的 deliveryMode 屬性)設置為 2 参萄,即可實現(xiàn)消息的持久化讹挎,保證消息自身不丟失吆玖。
import com.rabbitmq.client.*;
/**
* @author jie.luo
* @since 2021/1/29
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 聲明一個交換器
channel.exchangeDeclare("ex.per", BuiltinExchangeType.DIRECT, true, false, null);
// 聲明一個消息隊列
channel.queueDeclare("queue.per", true, false, false, null);
// 消息隊列與交換器綁定
channel.queueBind("queue.per", "ex.per", "key.per");
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.contentEncoding("text/plain");
// 消息持久化
builder.deliveryMode(2);
// 發(fā)送消息到交換器
channel.basicPublish("ex.per", "key.per", null, "this is persistent message".getBytes());
channel.close();
connection.close();
}
}
消費者確認機制
如何保證消息被消費者成功消費沾乘?
消息被消費者消費的過程中業(yè)務失敗了但是消息已經(jīng)出列了(被標記為已經(jīng)消費了)翅阵,我們又沒有任何重試尽爆,那么結果通消息丟失沒有什么分別。
RabbitMQ 在消費端會有 Ack 機制夭委,即消費端消息消費后募强,需要發(fā)送 Ack 確認報文給Broker端擎值,告知自己是否已經(jīng)消費完成,否則可能會一直重發(fā)消息到消息過期(AUTO模式)
這也是 最終一致性屹蚊、可恢復性 的基礎进每。
消費者消息確認模式:
-
none模式
消費的過程中自行捕獲異常田晚,引發(fā)異常后直接記錄日志并落到異常恢復表芹壕,再通過后臺定時任務掃描異程哂浚恢復表嘗試做重試動作鲫趁。如果業(yè)務不自行處理則有丟失數(shù)據(jù)的風險挨厚。
-
auto模式
自動Ack模式,不主動捕獲異常钉疫,當小費過程中出現(xiàn)異常時會將消息放回Queue中牲阁,然后消息會被重新分配到其他消費節(jié)點(如果沒有則還是選擇當前節(jié)點)重新被消費,默認會一直重發(fā)消息并指導消費完成返回Ack或一直到過期
-
manual模式
手動Ack模式备燃,消費者咨詢控制流程并手動調用channel相關的方法返回Ack
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author jie.luo
* @since 2021/1/29
*/
public class MyConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("queue.ca", false, false, false, null);
channel.basicConsume("queue.ca", false, "myConsumer", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
System.out.println(new String(body));
// 確認消息
// 第一個參數(shù)是消息的標簽
// 第二個參數(shù)標識確認多個消息還是一個消息并齐,false-確認一個消息
// channel.basicAck(envelope.getDeliveryTag(), false);
// 用于拒收多條消息
// 第一個參數(shù)是消息的標簽
// 第二個參數(shù)表示不確認多個消息還是一個消息况褪,false-不確認一個消息
// 第三個參數(shù)表示不確認的消息是否需要重新入列更耻,然后重發(fā)秧均,true-需要重新入列
// channel.basicNack(envelope.getDeliveryTag(), false, true);
// 用于拒收一條消息
// 第二個參數(shù)表示不確認的消息是否需要重新入列,然后重發(fā)疙描,true-需要重新入列
channel.basicReject(envelope.getDeliveryTag(), true);
}
});
// channel.close();
// connection.close();
}
}
消費端限流
在電商的秒殺活動中起胰,活動一開始會有大量并發(fā)寫請求到達服務端巫延,需要對消息進行削峰處理炉峰,如何削峰疼阔?
當消息投遞速度遠快于消費速度時,隨著時間積累就會出現(xiàn)“消息積壓”迅细。消息中間件本身是具備一定的緩沖能力的茵典,但這個能力是有容量限制的宾舅,如果長期運行并沒有任何處理,最終會導致Broker崩潰帆离,而分布式系統(tǒng)的故障往往會發(fā)生上下游傳遞蜻直,連鎖反應那就會很悲劇...
一概而、RabbitMQ 可以對內存和磁盤使用量設置閾值囱修,當達到閾值后破镰,生產(chǎn)者將被阻塞(block)鲜漩,直到對應項指標恢復正常。全局上可以防止超大流量踩娘、消息積壓等導致的Broker被壓垮养渴。當內存受限或磁盤可用空間受限的時候泛烙,服務器都會暫時阻止連接蔽氨,服務器將暫停從發(fā)布消息的已連接客戶端的套接字讀取數(shù)據(jù)。連接心跳監(jiān)視也將被禁用宇立。所有網(wǎng)絡連接將在rabbitmqctl和管理插件中顯示為“已阻止”泄伪,這意味著它們尚未嘗試發(fā)布匿级,因此可以繼續(xù)或被阻止,這意味著它們已發(fā)布肖粮,現(xiàn)在已暫停尔苦。兼容的客戶端被阻止時將收到通知允坚。
在 /etc/rabbitmq/rabbitmq.conf中配置磁盤可用空間大小
# 設置磁盤可用空間大小稠项,單位字節(jié)。
# 當磁盤可用空間低于這個值的時候活逆,發(fā)出磁盤警告蔗候,觸發(fā)限流埂软。
# 如果設置了相當大小仰美,則忽略此絕對大小
disk_free_limit.absolute = 50000
# 使用計量單位咖杂,從RabbitMQ 3.6.0開始有效。對vm_memory_high_watermark同樣有效
disk_free_limit.absolute = 500KB
disk_free_limit.absolute = 50mb
disk_free_limit.absolute = 50GB
# 還可以使用相對于總可用內存的相對值來設置懦尝。
# 注意:此相對值不要低于1.0陵霉!當磁盤可用空間低于總可用內存的2.0倍的時候踊挠,觸發(fā)限流
disk_free_limit.relative = 2.0
# 內存限流閾值
# 0.4 表示閾值和總可用內存的比值《米茫總可用內存表示操作系統(tǒng)給每個進程分配的大小憋沿,或實際內存大小
# 如32位Windows沪猴,系統(tǒng)給每個進程最大2GB的內存运嗜,則此比值表示閾值為820MB
# vm_memory_high_watermark.relative = 0.4
# 還可以直接通過絕對值限制可用內存的大小洗出。單位字節(jié)。
# vm_memory_high_watermark.absolute = 1073741824
# 從RabbitMQ 3.6.0 開始,絕對值支持計量單位菠镇。如果設置了相對值利耍,則忽略此絕對值隘梨。
# vm_memory_high_watermark.absolute = 2GB
# 支持的單位:
# k,kib: kibibytes (2^10 - 1,024 bytes )
# M,Mib: mebibytes (2^20 - 1,048,576 bytes)
# G,Gib: gibibytes (2^30 - 1,073,741,824 bytes)
# kb: kilobytes (10^3 - 1,000 bytes)
# MB: megabytes (10^6 - 1,000,000 bytes)
# GB: gigabytes (10^9 - 1,000,000,000 bytes)
二舷嗡、RabbitMQ還默認提供一種基于 credit flow 的 流控 機制进萄,迷香每一個連接進行流控中鼠,當單個隊列達到最大流速時,或者多個隊列達到總流速是矛渴,都會觸發(fā)流控具温。觸發(fā)單個鏈接的流控可能是因為 connection桂躏、channel、queue的某一個過程處于flow狀態(tài)蛮位,這些狀態(tài)都可以從監(jiān)控平臺看到失仁。
三萄焦、RabbitMQ中有一種QoS保證機制拂封,可以 **限制Channel上接收到的未被Ack的消息數(shù)量 **冒签,如果超過這個數(shù)量限制RabbitMQ將不會再往消費端推送消息钟病。這是一種流控手段肠阱,可以防止大量消息瞬時從Broker送達消費端造成消費端巨大壓力(甚至壓垮消費端)屹徘。 比較值得注意的是 QoS機制僅對于消費端推模式有效缘回,對拉模式無效。
而且不支持NONE Ack模式啦吧。執(zhí)行channel.basicConsume
方法之前通過 channel.basicQoS
方法可以設置該數(shù)量授滓。消息的發(fā)送是異步的般堆,消息的確認也是異步的。 在消費者消費慢的時候私沮,可以設置Qos的prefetchCount仔燕,它表示broker在向消費者發(fā)送消息的時候晰搀,一旦發(fā)送了prefetchCount個消息而沒有一個消息確認的時候办斑,就停止發(fā)送乡翅。消費者確認一個蠕蚜,broker就發(fā)送一個波势,確認兩 個就發(fā)送兩個橄维。換句話說争舞,消費者確認多少竞川,broker就發(fā)送多少,消費者等待處理的個數(shù)永遠限制在prefetchCount個床牧。
如果對于每個消息都發(fā)送確認戈咳,增加了網(wǎng)絡流量著蛙,此時可以批量確認消息踏堡。如果設置了multiple為true,消費者在確認的時候诫隅,比如說id是8的消息確認了阎肝,則在8之前的所有消息都確認了风题。
生產(chǎn)者往往是希望自己產(chǎn)生的消息能快速投遞出去嫉父,而當消息投遞太快且超過了下游的消費速度時就容易出現(xiàn)消息積壓/堆積绕辖,所以仪际,從上游來講我們應該在生產(chǎn)端應用程序中也可以加入限流树碱、應急關等控制手段,避免超過Broker端的極限承載能力或者壓垮下游消費者框舔。
再看看下游刘绣,我們期望下游消費端能盡快消費完消息纬凤,而且還要防止瞬時大量消息壓垮消費端(推模式)移斩,我們期望消費端處理速度是最快向瓷、最穩(wěn)定而且還相對均勻(比較理想化)。
**提升下游應用的吞吐量** 和 **縮短消費過程的耗時** 你稚,優(yōu)化主要以下幾種方式:
1刁赖、有害應用程序的性能宇弛,縮短響應時間(需要時間)
2枪芒、增加消費者節(jié)點實例(成本增加谁尸,而且提成數(shù)據(jù)庫操作這些也可能是瓶頸)
3良蛮、調整并發(fā)消費的線程數(shù)(線程數(shù)并非越大越好决瞳,需要大量延遲調優(yōu)至合理值)
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
// 設置 channel 并發(fā)請求最大數(shù)
factory.setRequestedChannelMax(10);
// 設置自定義的線程工廠
ThreadFactory threadFactory = Executors.defaultThreadFactory();
factory.setThreadFactory(threadFactory);
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
// SimpleRabbitListenerContainerFactory發(fā)現(xiàn)消息中有content_type有text就會默認將其
// 轉換為String類型的皮胡,沒有content_type都按byte[]類型
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 設置并發(fā)線程數(shù)
factory.setConcurrentConsumers(10);
// 設置最大并發(fā)線程數(shù)
factory.setMaxConcurrentConsumers(20);
return factory;
}
# 并行消費數(shù)
spring.rabbitmq.listener.simple.concurrency=5
# 最大并行消費數(shù)
spring.rabbitmq.listener.simple.max-concurrency=10
消息可靠性保障
消息可靠性保障:
- 消息傳輸保障
- 各種限流、應急手段
- 業(yè)務層面的一些容錯烹笔、補償谤职、異常重試等手段
消息可靠傳輸 一般是業(yè)務系統(tǒng)接入消息中間件時 首要考慮的問題亿鲜,一般消息中間件的消息傳輸保障分為三個層級:
-
At most once
最多一次。消息可能會丟失漩蟆,但絕不會重復傳輸
-
At least once
最少一次怠李。消息絕不會丟失捺癞,但可能會重復傳輸
-
Exactly once
恰好一次髓介。每條消息肯定會被傳輸一次且僅傳輸一次
其中“最少一次”投遞實現(xiàn)需要考慮以下幾個方面內容:
- 消息生產(chǎn)者需要開啟事務機制或者publisher confirm 機制唐础,以確保消息可以可靠地傳輸?shù)?RabbitMQ中彻犁。
- 消息生產(chǎn)者需要配置使用
mandatory
參數(shù)或者備份交換器來確保消息能夠從交換器路由到隊列中汞幢,進而能夠保存下來而不會被丟棄森篷。 - 消息和隊列都需要進行持久化處理豺型,以確保 RabbitMQ服務器在遇到異常情況時不會造成消息丟失姻氨。
- 消費者在消費消息的同時需要將 autoAck 設置為 false肴焊,然后通過手動確認的方式去確認已經(jīng)正確消費的消息娶眷,以避免在消費端引起不必要的消息丟失届宠。
“最多一次” 的方式就無須考慮以上那些方面,生產(chǎn)者隨意發(fā)送灯萍,消費者隨意消費竟稳,不過這樣很難確保消息不會丟失熊痴。(估計有不少公司的業(yè)務系統(tǒng)都是這樣的果善,想想都覺得可怕)
“恰好一次” 是RabbitMQ目前無法保障的巾陕。
考慮這樣一種情況鄙煤,消費者在消費完一條消息之后向RabbitMQ 發(fā)送確認Basic.Ack 命令梯刚,此時由于網(wǎng)絡斷開或者其他原因造成RabbitMQ 并沒有收到這個確認命令亡资,那么RabbitMQ 不會將此條消息標記刪除。在重新建立連接之后锥腻,消費者還是會消費到這一條消息嗦董,這就造成了重復消費。
再考慮一種情況瘦黑,生產(chǎn)者在使用publisher confirm機制的時候京革,發(fā)送完一條消息等待RabbitMQ返回確認通知,此時網(wǎng)絡斷開幸斥,生產(chǎn)者捕獲到異常情況,為了確保消息可靠性選擇重新發(fā)送睡毒,這樣RabbitMQ 中就有兩條同樣的消息,在消費的時候消費者就會重復消費冗栗。
消息冥等性處理
剛剛我們講到演顾,追求高性能就無法保證消息的順序供搀,而追求可靠性那么就可能產(chǎn)生重復消息,從而導致重復消費...真是應證了那句老話:做架構就是權衡取舍钠至。
RabbitMQ層面有實現(xiàn)“**去重機制**”來保證“**恰好一次**”嗎葛虐?答案是并沒有。而且這個在目前主流的消息中間件都沒有實現(xiàn)棉钧。
借用淘寶沈洵的一句話:最好的解決辦法就是不去解決屿脐。當為了在基礎的分布式中間件中實現(xiàn)某種相對不太通用的功能,需要犧牲到性能宪卿、可靠性的诵、擴展性時,并且會額外增加很多復雜度佑钾,最簡單的辦法就是**交給業(yè)務自己去處理**西疤。事實證明,很多業(yè)務場景下是可以容忍重復消息的休溶。例如:操作日志收集代赁,而對一些金融類的業(yè)務則要求比較嚴苛
一般解決重復消息的辦法是,在消費端讓我們消費消息的操作具備冪等性兽掰。
冪等性問題并不是消息系統(tǒng)獨有芭碍,而是(分布式)系統(tǒng)中普遍存在的問題。例如:RPC框架調用超后會重試孽尽,HTTP請求會重復發(fā)起(用戶手抖多點了幾下按鈕)
冪等(Idempotence)是一個數(shù)學上的概念窖壕,它是這樣定義的:
如果一個函數(shù)f(x) 滿足:f(f(x)) = f(x),則函數(shù)f(x) 滿足冪等性泻云。這個概念被拓展到計算機領域艇拍,被用來描述一個操作、方法或者服務宠纯。
一個冪等操作的特點是卸夕,其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。一個冪等的方法婆瓜,使用同樣的參數(shù)快集,對它進行多次調用和一次調用,對系統(tǒng)產(chǎn)生的影響是一樣的廉白。
對于冪等的方法个初,不用擔心重復執(zhí)行會對系統(tǒng)造成任何改變。
舉個簡單的例子(在不考慮并發(fā)問題的情況下):
select * from xx where id=1
delete from xx where id=1
這兩條sql語句就是天然冪等的猴蹂,它本身的重復執(zhí)行并不會引起什么改變院溺。而update就要看情況的,
update xxx set amount = 100 where id =1
這條語句執(zhí)行1次和100次都是一樣的結果(最終余額都還是100)磅轻,所以它是滿足冪等性的珍逸。
而
update xxx set amount = amount + 100 where id =1
它就不滿足冪等性的逐虚。
業(yè)界對于冪等性的一些常見做法:
借助數(shù)據(jù)庫唯一索引,重復插入直接報錯谆膳,事務回滾叭爱。還是舉經(jīng)典的轉賬的例子,為了保證不重復扣款或者重復加錢漱病,我們這邊維護一張“資金變動流水表”买雾,里面至少需要交易單號、變動賬戶杨帽、變動金額等3個字段漓穿。我們選擇交易單號和變動賬戶做聯(lián)合唯一索引(單號是上游生成的可保證唯一性),這樣如果同一筆交易發(fā)生重復請求時就會直接報索引沖突睦尽,事務直接回滾∑骶唬現(xiàn)實中,數(shù)據(jù)庫唯一索引的方式通常做為兜底保證当凡;
前置檢查機制山害。這個很容易理解,并且有幾種實現(xiàn)辦法沿量。還是引用上面轉賬的例子浪慌,當我在執(zhí)行更改賬戶余額這個動作之前,我得先檢查下資金變動流水表(或者Tair中)中是否已經(jīng)存在這筆交易相關的記錄了朴则,
select * from xxx where accountNumber=xxx andorderId=yyy
权纤,如果已經(jīng)存在,那么直接返回乌妒,否則執(zhí)行正常的更新余額的動作汹想。為了防止并發(fā)問題,我們通常需要借助“排他鎖”來完成撤蚊。在支付寶有一條鐵律叫:一鎖古掏、二判、三操作侦啸。當然槽唾,我們也可以使用樂觀鎖或CAS機制,樂觀鎖一般會使用擴展一個版本號字段做判斷條件唯一Id機制光涂,比較通用的方式庞萍。對于每條消息我們都可以生成唯一Id,消費前判斷Tair中是否存在(MsgId做Tair排他鎖的key)忘闻,消費成功后將狀態(tài)寫入Tair中钝计,這樣就可以防止重復消費了。
對于接口請求類的冪等性保證要相對更復雜,我們通常要求上游請求時傳遞一個類GUID的請求號(或TOKEN)私恬,如果我們發(fā)現(xiàn)已經(jīng)存在了并且上一次請求處理結果是成功狀態(tài)的(有時候上游的重試請求是正常訴求交播,我們不能將上一次異常/失敗的處理結果返回或者直接提示“請求異常”践付,如果這樣重試就變得沒意義了)則不繼續(xù)往下執(zhí)行,直接返回“重復請求”的提示和上次的處理結果(上游通常是由于請求超時等未知情況才發(fā)起重試的缺厉,所以直接返回上次請求的處理結果就好了)永高。如果請求ID都不存在或者上次處理結果是失敗/異常的,那就繼續(xù)處理流程提针,并最終記錄最終的處理結果命爬。這個請求序號由上游自己生成,上游通用需要根據(jù)請求參數(shù)辐脖、時間間隔等因子來生成請求ID饲宛。同樣也需要利用這個請求ID做分布式鎖的KEY實現(xiàn)排他。
TTL機制
TTL嗜价,Time to Live 的簡稱艇抠,即過期時間
RabbitMQ 可以對**消息**和**隊列**兩個維度來設置TTL
任何消息中間件的容量和堆積能力都是有限的,如果有一些消息總是被消費掉久锥,那么需要有一種過期的機制來做兜底家淤。
目前有兩種方式可以設置消息的TTL。
通過Queue屬性設置瑟由,隊列中所有消息都有相同的過期時間絮重。
-
對消息自身進行單獨設置,每條消息的TTL可以不同歹苦。
如果兩種方法一起使用青伤,則消息的TTL以兩者之間較小數(shù)字為準。通常來講殴瘦,消息在隊列中的生成時間一旦超過設置的TTL值時狠角,就會變成“死信”(Dead Message),消費者默認就無法再收到該消息痴施。當然“死信”也可以被取出來消費的擎厢。
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare("ex.ttl",BuiltinExchangeType.DIRECT);
// 創(chuàng)建隊列(實際上使用的是AMQP default這個direct類型的交換器)
// 設置隊列屬性
Map<String, Object> arguments = new HashMap<>();
// 設置隊列的TTL
arguments.put("x-message-ttl", 30000);
// 設置隊列的空閑存活時間(如該隊列根本沒有消費者,一直沒有使用辣吃,隊列可以存活多久)
arguments.put("x-expires", 10000);
channel.queueDeclare("queue.ttl", false, false, false, arguments);
channel.queueBind("queue.ttl","ex.ttl","key.ttl");
for (int i = 0; i < 1000000; i++) {
String message = "Hello World!" + i;
channel.basicPublish(
"ex.ttl",
"key.ttl",
new AMQP.BasicProperties().builder().expiration("30000").build(),
message.getBytes()
);
System.out.println(" [X] Sent '" + message + "'");
}
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
此外动遭,還可以通過命令行方式設置全局TTL,執(zhí)行如下命令:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":30000}' --apply-to queues
默認規(guī)則:
- 如果不設置TTL神得,則表示此消息不會過期
- 如果TTL設置為0厘惦,則表示除非此時可以直接將消息投遞到消費者,否則消息會被立即丟棄
注意理解 message-tt
l 、 x-expires
這兩個參數(shù)的區(qū)別宵蕉,有不同的含義酝静。但是這兩個參數(shù)屬性都遵循上面的默認規(guī)則。一般TTL相關的參數(shù)單位都是**毫秒(ms) **
死信隊列
在定義業(yè)務隊列時羡玛,可以考慮制定一個 死信交換器别智,并綁定一個死信隊列。當消息變成死信時稼稿,該消息就會發(fā)送到該死信隊列上薄榛,這樣方便我們查看消息失敗的原因。
DLX让歼、全稱:Dead-Letter-Exchange敞恋,死信交換器。消息在一個隊列中變成死信(Dead Letter)之后谋右,被重新發(fā)送到一個特殊的交換器(DLX)中硬猫,同時綁定DLX的隊列就變成 死信隊列
以下幾種情況導致消息變成死信:
- 消息被拒絕(Basic.Reject/Basic.Nack),并且設置requeue參數(shù)為false
- 消息過期
- 隊列達到最大長度
對RabbitMQ來說,DLX是一個非常有用的特性改执。它可以處理異常情況下啸蜜,消息不能夠被消費者正常消費(消費者調用了Basic.Nack
或Basic.Reject
)而被置入死信隊列中的情況,后續(xù)分析程序可以通過消費這個死信隊列的內容來分析當時所遇到的異常情況天梧,進而可以改善和優(yōu)化系統(tǒng)盔性。
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 定義一個死信交換器(也是一個普通的交換器)
channel.exchangeDeclare("exchange.dlx", "direct", true);
// 定義一個正常業(yè)務的交換器
channel.exchangeDeclare("exchange.biz", "fanout", true);
Map<String, Object> arguments = new HashMap<>();
// 設置隊列TTL
arguments.put("x-message-ttl", 10000);
// 設置該隊列所關聯(lián)的死信交換器(當隊列消息TTL到期后依然沒有消費,則加入死信隊列)
arguments.put("x-dead-letter-exchange", "exchange.dlx");
// 設置該隊列所關聯(lián)的死信交換器的routingKey呢岗,如果沒有特殊指定冕香,使用原隊列的routingKey
arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test");
channel.queueDeclare("queue.biz", true, false, false, arguments);
channel.queueBind("queue.biz", "exchange.biz", "");
channel.queueDeclare("queue.dlx", true, false, false, null);
// 死信隊列和死信交換器
channel.queueBind("queue.dlx", "exchange.dlx", "routing.key.dlx.test");
channel.basicPublish("exchange.biz", "", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx.test".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
延遲隊列
延遲消息是指的消息發(fā)送出去后并不想立即就被消費,而是需要等(指定的)一段時間后才觸發(fā)消費后豫。
例如下面的業(yè)務場景:
在支付寶上面買電影票悉尾,鎖定了一個座位后系統(tǒng)默認會幫你保留15分鐘時間,如果15分鐘后還沒付款那么不好意思系統(tǒng)會自動把座位釋放掉挫酿。怎么實現(xiàn)類似的功能呢构眯?
- 可以用定時任務每分鐘掃一次,發(fā)現(xiàn)有占座超過15分鐘還沒付款的就釋放掉早龟。但是這樣做很低效惫霸,很多時候做的都是些無用功
- 可以用分布式鎖、分布式緩存的被動過期時間葱弟,15分鐘過期后鎖也釋放了壹店,緩存key也不存在了
- 還可以用延遲隊列,鎖座成功后會發(fā)送1條延遲消息芝加,這條消息15分鐘后才會被消費硅卢,消費的過程就是檢查這個座位是否已經(jīng)是“已付款”狀態(tài)
RabbitMQ 延遲隊列可以使用 rabbitmq_delayed_message_exchange
插件來實現(xiàn)
這里和TTL方式有個很大的不同就是TTL存放消息在死信隊列(delayqueue)里,而基于插件存放消息在延時交換機里(x-delayed-message exchange)
[圖片上傳失敗...(image-103635-1612746288866)]
- 生產(chǎn)者將消息(msg)和路由鍵(route key)發(fā)送指定的延時交換器(exchange)上
- 延時交換器(exchange)存儲消息等待消息到期根據(jù)路由鍵(route key)找到綁定自己的隊列(queue)并把消息給它
- 隊列(queue)再把消息發(fā)送給監(jiān)聽它的消費者(customer)
插件安裝步驟:
下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
-
安裝插件
將插件拷貝到 rabbitmq-server的安裝路徑:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins
-
啟動插件
rabbitmq-plugins list rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-
重啟 rabbitmq-server
systemctl restart rabbitmq-server
案例
配置信息
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 延時隊列配置
*
* @author jie.luo
* @since 2021/2/1
*/
@Configuration
public class RabbitDelayConfig {
@Bean
public Queue queue() {
return new Queue("queue.delay", true, false, false, null);
}
@Bean
public Exchange exchange() {
Map<String, Object> props = new HashMap<>();
props.put("x-delayed-type", ExchangeTypes.DIRECT);
return new CustomExchange("ex.delay", "x-delayed-message", true, false, props);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("key.delay").noargs();
}
}
消息生產(chǎn)者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @author jie.luo
* @since 2021/2/1
*/
@RestController
public class RabbitDemoController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send-delay-message")
public String sendDelayMsg(@RequestParam int delayTime) throws Exception {
MessageProperties properties = new MessageProperties();
// 指定消息延時時間
properties.setHeader("x-delay", delayTime * 1000);
String msg = "這是延遲消息,延遲時間: " + delayTime + " s";
Message message = new Message(msg.getBytes("utf-8"), properties);
this.rabbitTemplate.convertAndSend("ex.delay", "key.delay", message);
return "ok";
}
}
消費者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
/**
* 通過監(jiān)聽器消費
*
* @author jie.luo
* @since 2021/2/1
*/
@Component
public class BootConsumer {
@RabbitListener(queues = "queue.delay")
public void handlerDelayMessage(Message message, Channel channel) throws IOException {
System.out.println("消費消息: " + new String(message.getBody()) + " 消費時間: " + LocalDateTime.now());
// 手動確認消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}