文章內(nèi)容:
- 第1-4章:發(fā)送方確認(rèn)機(jī)制(Publisher Confirms)暴区,推薦使用異步確認(rèn)(ConfirmCallback接口)。
- 第5章:Alternate Exchange及老。
- 第6章:實(shí)現(xiàn)ReturnCallback接口势似,用來接收沒有Queue的退回消息。
關(guān)于如何確保發(fā)送方數(shù)據(jù)安全的問題稚新,官網(wǎng)也作了詳細(xì)的解釋:
- Publisher-side data safety topics (connection recovery, publisher confirms) :https://www.rabbitmq.com/publishers.html#data-safety
- Publisher Confirms:https://www.rabbitmq.com/confirms.html#publisher-confirms
Using standard AMQP 0-9-1, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional then for each message or set of messages publish, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced.
AMQP協(xié)議提供的一個(gè)事務(wù)機(jī)制襟沮,雖然還能確保消息正確送達(dá)锥惋,但比較笨重(性能沒有很好)昌腰,在此基礎(chǔ)上引入了發(fā)送方確認(rèn)機(jī)制。
1. 那么如何實(shí)現(xiàn)發(fā)送方確認(rèn)機(jī)制膀跌?
- 首先要將信道(channel)設(shè)成confirm模式(事務(wù)信道不能設(shè)成confirm模式遭商,而conform模式的信道不具有事務(wù)性)。
- 一旦一個(gè)channel設(shè)成confirm模式后捅伤,Broker和其Producer都開始計(jì)數(shù)(從1開始計(jì)數(shù))株婴。
- Broker在收到消息后進(jìn)行消息確認(rèn)——在這個(gè)信道中給生產(chǎn)者發(fā)送一個(gè)確認(rèn)(basic.act)——消息確認(rèn)包含內(nèi)容:delivery-tag(即計(jì)數(shù))和multiple field。
2. 發(fā)送方確認(rèn)機(jī)制(publisher confirm)有三種方式:
a. 串行confirm模式(Publishing Messages Individually)
b. 批量confirm模式(Publishing Messages in Batches)
c. 異步confirm模式(Handling Publisher Confirms Asynchronously)
3. 如何用代碼實(shí)現(xiàn)
官網(wǎng)文章參考:https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
官網(wǎng)基于原始的amqp-client.jar寫的代碼:https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/PublisherConfirms.java
3.1 首先是串行模式(Publishing Messages Individually)
- 配置:
publisher-confirm-type默認(rèn)是NONE暑认,也就是確認(rèn)機(jī)制是disabled困介。這里我們要把它set為SIMPLE模式。
Publisher確認(rèn)機(jī)制的方式是simple蘸际,意味著Producer發(fā)布一條消息后座哩,需要同步等待Broker的basic.act,官網(wǎng)例子用的是amqp-client.jar粮彤,我這里用的是Spring Boot集成RabbitMQ后的方式根穷。
spring:
rabbitmq:
port: 5672
host: localhost
virtual-host: spring-boot-test
publisher-confirm-type: simple
- Producer端代碼:
Producer發(fā)送一條消息,然后使用方法waitForConfirms(ms)等待导坟,這個(gè)方法會(huì)阻塞等待到Broker的消息確認(rèn)屿良。如果在規(guī)定時(shí)間內(nèi)沒有確認(rèn),就會(huì)報(bào)錯(cuò)惫周。
- boolean waitForConfirms(long timeout) throws AmqpException;
- void waitForConfirmsOrDie(long timeout) throws AmqpException;
上述方法區(qū)別:http://www.yayihouse.com/yayishuwu/chapter/2341
值得一提的是尘惧,如果Producer向一個(gè)不存在的exchange中發(fā)送消息,那么在執(zhí)行rabbitOperations. waitForConfirms的時(shí)候不會(huì)拋AmqpTimeoutException錯(cuò)誤递递,而是會(huì)拋出異常:com.rabbitmq.client.ShutdownSignalException: channel error;
@Slf4j
@SpringBootTest
public class ProducerConfirmServiceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void singleConfirm() {
try {
rabbitTemplate.invoke(rabbitOperations -> {
rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "hello, i am direct message!");
// 等待Broker確認(rèn)時(shí)間:1ms喷橙,超過1ms報(bào)錯(cuò)
return rabbitOperations.waitForConfirms(1);
});
} catch (AmqpTimeoutException e) {
log.error("met timeout exception: ", e);
}
}
}
也可以用waitForConfirmsOrDie(ms)來確認(rèn):
rabbitTemplate.invoke(rabbitOperations -> {
rabbitOperations.convertAndSend("direct.exchange", "direct-routing-key", "hello, i am direct message!");
rabbitOperations.waitForConfirmsOrDie(100000);
return true;
});
由于是發(fā)布者確認(rèn)機(jī)制(發(fā)生在Publisher和Broker之間),消費(fèi)端的代碼沒有改動(dòng)登舞,這里就不貼了贰逾,詳細(xì)看 【RabbitMQ的那點(diǎn)事】與Spring boot集成:http://www.reibang.com/p/4a21a7fce14c
上述方法測試結(jié)果會(huì)報(bào)錯(cuò)(1ms太短了,Broker來不及確認(rèn)):
也可以通過Thread name=main看出是同步(阻塞等待)的菠秒,這里始終是主線程在執(zhí)行疙剑。另外雖然Broker確認(rèn)失敗了,因?yàn)锽roker其實(shí)是好的践叠,只是我們設(shè)的等待時(shí)間太短了言缤,所以消息依然是發(fā)送出去了。
2022-05-07 17:43:30.104 ERROR 63048 --- [ main] ProducerConfirmServiceTest : met timeout exception:
org.springframework.amqp.AmqpTimeoutException: java.util.concurrent.TimeoutException at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:73) ~[spring-rabbit-2.3.12.jar:2.3.12] at org.springframework.amqp.rabbit.core.RabbitTemplate.waitForConfirms(RabbitTemplate.java:2320) ~[spring-rabbit-2.3.12.jar:2.3.12]
...
3.2 其次是批量confirm確認(rèn)(Publishing Messages in Batches)
在#3.1示例是單條publish后Producer就開始等待Broker的確認(rèn)酵熙,當(dāng)然我們也可以在發(fā)布一定數(shù)量的消息后再開始確認(rèn)轧简,比如100條驰坊。
這樣做的好處是可以提高吞吐量匾二。缺點(diǎn)是如果收不到Broker的確認(rèn),我們不知道這一批中哪一個(gè)消息開始出了問題,所以可能需要將這100條都重新發(fā)送察藐,可能會(huì)造成重復(fù)發(fā)的情況皮璧。
@Slf4j
@SpringBootTest
public class ProducerConfirmServiceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void batchConfirm() {
try {
rabbitTemplate.invoke(rabbitOperations -> {
for (int i = 0; i < 10; i ++) {
rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "message - " + i);
}
return rabbitOperations.waitForConfirms(10000);
});
} catch (AmqpTimeoutException e) {
log.error("met timeout exception: ", e);
}
}
}
3.3 最后是異步confirm確認(rèn)(Handling Publisher Confirms Asynchronously)
同步確認(rèn)的配置是publisher-confirm-type: simple
publisher-confirm-type: 還有另外一個(gè)配置項(xiàng)即:correlated,如果使用該配置項(xiàng)分飞,說明發(fā)送方也需要消息確認(rèn)悴务,并且可以通過CorrelationData來回傳額外的信息。這個(gè)分類方法與串行或批量無關(guān)譬猫,只是confirm能否回傳數(shù)據(jù)的分類方式讯檐。
以下是示例:
- 配置:publisher-confirm-type: correlated
spring:
rabbitmq:
port: 5672
host: localhost
virtual-host: spring-boot-test
publisher-confirm-type: correlated
- 新建一個(gè)ConfirmCallBack類,需要實(shí)現(xiàn)RabbitTemplate.ConfirmCallback接口 染服,重寫其confirm()方法别洪,方法內(nèi)有三個(gè)參數(shù)correlationData、ack柳刮、cause:
- correlationData:對象內(nèi)部只有一個(gè) id 屬性挖垛,用來表示當(dāng)前消息的唯一性。
- ack:消息投遞到broker 的狀態(tài)秉颗,true表示成功痢毒。
- cause:表示投遞失敗的原因。
@Slf4j
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {
log.info("MsgSendConfirmCallBack , 回調(diào)id: {}", correlationData);
if(ack) {
log.info("消息發(fā)送成功");
} else {
log.info("消息發(fā)送失敗: {}", cause);
}
}
}
- Producer類:
在發(fā)送消息前需要先set一個(gè)ConfirmCallback蚕甥,發(fā)送消息的時(shí)候可以帶上CorrelationData哪替,在callback中可以接收該data:
@Slf4j
@SpringBootTest
public class ProducerConfirmServiceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void confirmAsync() {
try {
rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());
CorrelationData correlationData = new CorrelationData();
log.info("開始發(fā)送消息");
correlationData.setId("100");
rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "hello, i am direct message!", correlationData);
} catch (AmqpTimeoutException e) {
log.error("met timeout exception: ", e);
}
}
}
測試結(jié)果:可以看出回調(diào)方法用的是自己的線程,即異步菇怀。并且能收到發(fā)送時(shí)帶的CorrelationData類:
2022-05-07 18:41:32.112 INFO 75420 --- [ main] ProducerConfirmServiceTest : 開始發(fā)送消息
2022-05-07 18:41:32.133 INFO 75420 --- [nectionFactory1] MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調(diào)id: CorrelationData [id=100]
2022-05-07 18:41:32.135 INFO 75420 --- [nectionFactory1] MsgSendConfirmCallBack : 消息發(fā)送成功
也可以發(fā)送到錯(cuò)誤的exchange上來測試發(fā)送callback:
rabbitTemplate.convertAndSend("wrong.exchange", "direct-routing-key", "hello, i am direct message!", correlationData);
測試結(jié)果:callback會(huì)檢測到錯(cuò)誤夷家,也就是說使用ConfirmCallBack無論消息是否正確送到Broker,都會(huì)進(jìn)入該回調(diào)函數(shù)類中敏释。
2022-05-07 19:03:37.792 INFO 80121 --- [ main] ProducerConfirmServiceTest : 開始發(fā)送消息
2022-05-07 19:03:37.802 ERROR 80121 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'wrong.exchange' in vhost 'spring-boot-test', class-id=60, method-id=40)
2022-05-07 19:03:37.804 INFO 80121 --- [nectionFactory2] MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回調(diào)id: CorrelationData [id=100]
2022-05-07 19:03:37.806 INFO 80121 --- [nectionFactory2] MsgSendConfirmCallBack : 消息發(fā)送失敗: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'wrong.exchange' in vhost 'spring-boot-test', class-id=60, method-id=40)
4. 總結(jié)
發(fā)送方確認(rèn)機(jī)制是保證消息可靠環(huán)節(jié)的第1步库快。三種方式總結(jié)如下:
a. 發(fā)送消息串行(逐條)確認(rèn)——同步等待,簡單钥顽,但會(huì)限制吞吐量义屏。
b. 批量發(fā)送消息后再確認(rèn)——同步等待,簡單蜂大,能提高吞吐量闽铐,但極端情況下會(huì)造成消息的重復(fù)發(fā)送(無法精確定位到單條錯(cuò)誤消息)。
c. 發(fā)送消息后異步等待確認(rèn)奶浦,效率高兄墅,但需要正確的實(shí)現(xiàn)接口方法。
以下是官方的例子測評結(jié)果(官網(wǎng)代碼在#3一開始有貼)澳叉,發(fā)送消息的總條數(shù)都是50000條隙咸,Brokder和producer都在同一臺(tái)機(jī)器上:
測試用例 | 花費(fèi)時(shí)間 |
---|---|
串行確認(rèn)(同步逐條確認(rèn)) | 5,549 ms |
批量(按100條一批次沐悦,同步確認(rèn)) | 2,331 ms |
異步確認(rèn) | 4,054 ms |
生產(chǎn)環(huán)境往往Broker是單獨(dú)的機(jī)器,所以官網(wǎng)又做了以下的測試五督,同樣是發(fā)送50000條消息藏否,但這次是遠(yuǎn)程發(fā)送:
測試用例 | 花費(fèi)時(shí)間 |
---|---|
串行確認(rèn)(同步逐條確認(rèn)) | 231,541 ms |
批量(按100條一批次,同步確認(rèn)) | 7,232 ms |
異步確認(rèn) | 6,332 ms |
可以看到逐條發(fā)送后確認(rèn)的效率是驚人的低充包。批量確認(rèn)和異步確認(rèn)的效率差不太多副签。批量確認(rèn)的代碼容易實(shí)現(xiàn),而異步確認(rèn)的實(shí)現(xiàn)會(huì)比較復(fù)雜一些基矮。
結(jié)束了嗎淆储?還沒有!<医健遏考!
上述串行、批量確認(rèn)以及異步確認(rèn)蓝谨,都是為了解決:讓Producer知道信息有沒有成功的發(fā)送到Broker的Exchange交換機(jī)上灌具,但如果消息從Exchange 到 Queue投遞失敗(或者Exchange沒有匹配的Queue的話)譬巫,那么消息也會(huì)丟失咖楣,這時(shí)候要怎么辦?
- 當(dāng)發(fā)布者發(fā)布消息到Exchange上芦昔,但Exchange沒有綁定的Queue時(shí)诱贿,默認(rèn)情況下發(fā)布的消息會(huì)丟掉。當(dāng)然這時(shí)候我們也可以啟用Alternate Exchange咕缎,將沒有目的地的消息統(tǒng)一轉(zhuǎn)到這個(gè)Alternative Exchange上來珠十。
- 或者在發(fā)送消息的時(shí)候,將參數(shù)mandatory置為true凭豪,那么message就會(huì)退回到Producer方焙蹭,Producer方需要實(shí)現(xiàn)ReturnCallback接口(https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ReturnCallback.html),也能將退回的消息取到嫂伞。
針對上述兩種方式孔厉,具體來演示:
5. Alternate Exchange
關(guān)于Alternate Exchange, 參見官網(wǎng):https://www.rabbitmq.com/ae.html
以下是具體思路:
- 首先創(chuàng)建一個(gè)Exchange(Fanout類型),叫backup.exchange帖努。
- 再創(chuàng)建一個(gè)Queue撰豺,叫noBinding.queue,并綁定到backup.exchange上(因?yàn)樵揺xchange是fanout type拼余,所以routingKey為空)污桦。
- 在創(chuàng)建正常要使用的Exchange時(shí)(比如叫direct.exchange),可以將backup.exchange作為參數(shù)名為alternate-exchange的值匙监,傳入direct.exchange中凡橱。
以下是代碼示例:
先是Alternate Exchange的創(chuàng)建:
@Configuration
public class AlternateExchangeConfig {
@Bean
public Queue noRoutedQueue() {
return new Queue("noBinding.queue", true);
}
@Bean
public FanoutExchange backupExchange() {
return new FanoutExchange("backup.exchange");
}
@Bean
public Binding noBinding(Queue noRoutedQueue, FanoutExchange backupExchange) {
return BindingBuilder.bind(noRoutedQueue).to(backupExchange);
}
@RabbitListener(queues = "noBinding.queue")
public void listen(String in) {
System.out.println("[noBinding.queue]: " + in);
}
}
再是正常業(yè)務(wù)處理的Exchange
可以看到這里有新加arguments小作,key是alternate-exchange, value是上述創(chuàng)建的backup.exchange:
@Configuration
public class DirectExchangeConfig {
@Bean
public Queue directqueue() {
return new Queue("direct.queue", true);
}
@Bean
public DirectExchange directExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", "backup.exchange");
DirectExchange directExchange = new DirectExchange("direct.exchange", true, false, arguments);
return directExchange;
}
@Bean
public Binding directBinding(Queue directqueue, DirectExchange directExchange) {
return BindingBuilder.bind(directqueue).to(directExchange).with("direct-routing-key");
}
@RabbitListener(queues = "direct.queue")
public void listen(String in) {
System.out.println("Direct Message Listener: " + in);
}
}
測試
將消息發(fā)送到direct.exchange,但是routingKey是錯(cuò)誤的梭纹,也就是這個(gè)消息沒有目的地:
@Slf4j
@SpringBootTest
public class ProducerServiceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessageToDirectExchangeWrongly() {
rabbitTemplate.convertAndSend("direct.exchange", "wrong.routing-key", "hello, i am direct message!");
}
}
測試印機(jī)結(jié)果:[noBinding.queue]: hello, i am direct message!
也就是由于routingKey是錯(cuò)的躲惰,消息并沒有從direct.exchange正確的發(fā)送到direct.queue上致份,而是轉(zhuǎn)發(fā)到了backup.exchange上变抽,通過廣播模式被noBinding.queue監(jiān)聽到。
6. 實(shí)現(xiàn)ReturnCallback接口來接收退回的消息
在rabbitmq原生的API中氮块,需要在發(fā)送的時(shí)候?qū)?shù)mandatory置為true绍载,然后通過實(shí)現(xiàn)ReturnCallback接口來接收退回的消息。
如果是和Spring Boot結(jié)合滔蝉,以下是示例:
配置:
首先需要先設(shè)置publisher-returns = true
spring:
rabbitmq:
port: 5672
host: localhost
virtual-host: spring-boot-test
publisher-returns: true
Producer類
嘗試往錯(cuò)誤的routingKey中發(fā)消息击儡,即topic.exchange通過a.wrong,找不到正確的Queue蝠引,由于publisher-returns為true阳谍,所以消息就被ReturnCallback捕捉到了。
在高版本的RabbitTemplate中的ReturnCallback是@Deprecated螃概,理由是提倡我們使用lamda表達(dá)式去實(shí)現(xiàn)矫夯,取而代之的是FunctionalInterface ReturnsCallback,這個(gè)接口其實(shí)就是ReturnCallback的子接口吊洼。
所以我們不需要單獨(dú)創(chuàng)建類训貌,而是在rabbitTemplate setReturnsCallback的時(shí)候直接使用lamda表達(dá)式,一般里面的實(shí)現(xiàn)可以是發(fā)送郵件等冒窍。
@Slf4j
@SpringBootTest
public class ReturnCallbackServiceTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void returnCallback() {
rabbitTemplate.setReturnsCallback((message) -> {
log.info("getMessage: {}", message.getMessage());
log.info("getRoutingKey: {}", message.getRoutingKey());
log.info("getExchange: {}", message.getExchange());
log.info("getReplyCode: {}", message.getReplyCode());
log.info("getReplyText: {}", message.getReplyText());
});
rabbitTemplate.convertAndSend("topic.exchange", "a.wrong", "important message!");
log.info("Finished for sending message...");
}
}
測試結(jié)果:
2022-05-10 12:46:50.280 INFO 58740 --- [ main] ReturnCallbackServiceTest : Finished for sending message...
2022-05-10 12:46:50.282 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getMessage: (Body:'important message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2022-05-10 12:46:50.284 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getRoutingKey: a.wrong
2022-05-10 12:46:50.285 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getExchange: topic.exchange
2022-05-10 12:46:50.285 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getReplyCode: 312
2022-05-10 12:46:50.285 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getReplyText: NO_ROUTE
- 關(guān)于Alternate Exchange和ReturnCallback的優(yōu)先級:如果同時(shí)設(shè)置了,那么Alternate Exchange的優(yōu)先級更高谬莹,也就是退回的消息會(huì)首先轉(zhuǎn)到設(shè)置的Alternate Exchange中樱调,從而不會(huì)調(diào)用ReturnCallback了。
- 如果是延遲隊(duì)列(delayed exchange)届良,那么ReturnCallback會(huì)一直報(bào)ReplyCode=312的錯(cuò)笆凌,也就是延遲隊(duì)列不適合使用ReturnCallback功能。