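Consumer acknowledgements
Message acknowledgement between the broker and a consumer is called consumer acknowledgements. The mechanism addresses reliable delivery between consumers and the RabbitMQ server: once the consumer has processed a message successfully, it notifies the broker, and the broker then deletes that message.
Implementing message acknowledgement with the RabbitMQ Java Client
Automatic acknowledgement
Send a message to the zhihao.miao.order queue.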
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.concurrent.TimeUnit;
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
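/**
* The second argument of basicConsume is a boolean: true means a message is acknowledged automatically as soon as it is delivered, false means you must acknowledge it yourself.
* Automatic acknowledgement can lose messages: if the consumer logic throws an exception, the message was not processed successfully, yet it is already gone, so it is effectively lost.
* With false, the message is acknowledged only when the consumer explicitly confirms to the broker after processing it successfully.
* An open question here: if a bug in the programmer's own code throws an exception under manual acknowledgement, will the message be requeued and redelivered over and over?
*/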
String consumerTag = channel.basicConsume("zhihao.miao.order",true,new SimpleConsumer(channel));
System.out.println(consumerTag);
TimeUnit.SECONDS.sleep(30);
channel.close();
connection.close();
}
}
Consumer logic
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class SimpleConsumer extends DefaultConsumer {
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println("-----message received---------------");
System.out.println("message properties: "+properties);
System.out.println("message body: "+new String(body));
try
{
int i = 1/0;
System.out.println(i);
}catch (Exception e){
e.printStackTrace();
}
}
}
Console output:
amq.ctag-6_GQmh1tMeooWSiuqUmz0Q
java.lang.ArithmeticException: / by zero
-----message received---------------
at com.zhihao.test.day04.SimpleConsumer.handleDelivery(SimpleConsumer.java:29)
amq.ctag-6_GQmh1tMeooWSiuqUmz0Q
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
message properties: #contentHeader<basic>(content-type=json, content-encoding=null, headers={}, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
message body: {"orderId":"abba05db-050e-4b1a-97f1-c469b23ca27b","createTime":"2017-10-22T21:02:41.861","price":100.0}
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The consumer threw an exception, yet the message has already been consumed and removed from the queue. If the consumer logic is managed by Spring, the consumer's business logic is rolled back, so the message is lost in practice.
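Manual acknowledgement
Automatic acknowledgement can cause de facto message loss.
Change the second argument of basicConsume to false to acknowledge messages manually. While the consumer is listening on the queue, a delivered message moves to the Unacked state; if the consumer shuts down, the message goes back to Ready. This shows that the server deletes a message only after it has been acked.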
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.concurrent.TimeUnit;
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// manual acknowledgement (autoAck=false)
String consumerTag = channel.basicConsume("zhihao.miao.order",false,new SimpleConsumer(channel));
System.out.println(consumerTag);
TimeUnit.SECONDS.sleep(30);
channel.close();
connection.close();
}
}
Consumer logic
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class SimpleConsumer extends DefaultConsumer {
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
System.out.println(consumerTag);
System.out.println("-----message received--------------");
System.out.println("message properties: "+properties);
System.out.println("message body: "+new String(body));
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.getChannel().basicAck(envelope.getDeliveryTag(),false);
System.out.println("message consumed successfully");
}
}
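The Ready and Unacked transitions described above can be pictured with a small in-memory sketch. This is a toy model, not RabbitMQ code: the class AckStateModel and all of its methods are hypothetical names chosen for illustration, and it assumes a single consumer on the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy in-memory model of a queue's Ready and Unacked sets (not RabbitMQ code). */
class AckStateModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextDeliveryTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    /** Delivers the next Ready message; returns its delivery tag, or -1 if none. */
    long deliver() {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextDeliveryTag++;
        unacked.put(tag, body); // Ready -> Unacked
        return tag;
    }

    /** ack: the message is removed for good. */
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    /** The consumer went away without acking: Unacked messages become Ready again. */
    void consumerClosed() {
        // Put them back at the head of the queue, preserving their original order.
        Deque<String> back = new ArrayDeque<>(unacked.values());
        while (!back.isEmpty()) {
            ready.addFirst(back.pollLast());
        }
        unacked.clear();
    }

    int readyCount() { return ready.size(); }
    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        AckStateModel queue = new AckStateModel();
        queue.publish("order-1");
        long tag = queue.deliver();
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
        queue.consumerClosed();
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
        tag = queue.deliver();
        queue.ack(tag);
        System.out.println("ready=" + queue.readyCount() + " unacked=" + queue.unackedCount());
    }
}
```

Only the ack call removes a message from the model; a consumer that disappears returns everything it was holding to Ready, which is the behavior observed in the management console.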
Send a message whose headers contain an error property.
Modified consumer logic:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class SimpleConsumer extends DefaultConsumer {
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
System.out.println(consumerTag);
System.out.println("-----message received--------------");
// getHeaders() may return null when the message carries no headers
if(properties.getHeaders() != null && properties.getHeaders().get("error") != null){
try {
TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("nack");
// multiple=false, requeue=true
this.getChannel().basicNack(envelope.getDeliveryTag(),false,true);
return;
}
System.out.println("message properties: "+properties);
System.out.println("message body: "+new String(body));
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.getChannel().basicAck(envelope.getDeliveryTag(),false);
System.out.println("message consumed successfully");
}
}
Console output, showing that the message keeps being requeued and consumed again:
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----message received--------------
nack
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----message received--------------
nack
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----message received--------------
nack
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----message received--------------
nack
The consumer can also reject a message:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class SimpleConsumer extends DefaultConsumer {
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
System.out.println(consumerTag);
System.out.println("-----message received--------------");
// getHeaders() may return null when the message carries no headers
if(properties.getHeaders() != null && properties.getHeaders().get("error") != null){
try {
TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
e.printStackTrace();
}
// basicReject also refuses a message; the second argument says whether to requeue it
this.getChannel().basicReject(envelope.getDeliveryTag(),false);
System.out.println("message cannot be consumed, rejecting it");
return;
}
System.out.println("message properties: "+properties);
System.out.println("message body: "+new String(body));
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.getChannel().basicAck(envelope.getDeliveryTag(),false);
System.out.println("message consumed successfully");
}
}
Console output; because requeue is set to false, the message is not redelivered:
amq.ctag-kiy_49AkC3f4qRkqCMujrw
amq.ctag-kiy_49AkC3f4qRkqCMujrw
-----message received--------------
message cannot be consumed, rejecting it
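Summary
Consumer-side message acknowledgement takes two steps:
- Specify manual acknowledgement in channel.basicConsume.
- Decide from the business logic when to ack and when to nack (and, for a nack, whether to requeue).
One open question: when a nack or reject requeues the message and a bug in the business code makes it fail every time, the message is redelivered endlessly. How can the number of redeliveries be capped?
My idea is to keep an incrementing redelivery counter keyed by message id and act on it, or to record the redelivery count in Redis, Memcached, or some other store.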
How do I set a number of retry attempts in RabbitMQ?
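The counter idea above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: RetryTracker and its methods are hypothetical names, the counts live in process memory rather than Redis or Memcached, and every message is assumed to carry a unique message id (the sample message printed earlier has message-id=null, so the producer would have to set one).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Counts failed attempts per message id and decides whether to requeue. */
class RetryTracker {
    private final int maxAttempts;
    private final Map<String, Integer> attempts = new ConcurrentHashMap<>();

    RetryTracker(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Records one failed attempt for the message.
     * @return true if the message should be requeued, false once the limit is reached
     */
    boolean shouldRequeueAfterFailure(String messageId) {
        int count = attempts.merge(messageId, 1, Integer::sum);
        if (count >= maxAttempts) {
            attempts.remove(messageId); // give up and stop tracking this message
            return false;
        }
        return true;
    }

    /** Call after a successful ack so the entry does not linger. */
    void clear(String messageId) {
        attempts.remove(messageId);
    }

    public static void main(String[] args) {
        RetryTracker tracker = new RetryTracker(3);
        for (int i = 1; i <= 3; i++) {
            boolean requeue = tracker.shouldRequeueAfterFailure("order-42");
            System.out.println("attempt " + i + " -> requeue=" + requeue);
        }
    }
}
```

In a consumer, the returned boolean could be passed as the requeue argument of basicNack or basicReject, so the message is discarded (or dead-lettered, if the queue is configured for that) once the limit is hit. An in-memory map is lost on restart and is not shared between consumers, which is why an external store may be the better fit in practice.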
Implementing consumer-side message acknowledgement with Spring AMQP
Automatic acknowledgement
Configuration class
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
/**
* The problem with automatic acknowledgement: if an exception is thrown while consuming a message, processing has failed, yet the server has already deleted the message because it was acknowledged automatically.
* NONE means automatic acknowledgement
*/
container.setAcknowledgeMode(AcknowledgeMode.NONE);
container.setMessageListener((MessageListener) message -> {
System.out.println("====message received=====");
System.out.println(new String(message.getBody()));
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
// simulates the consumer's own business logic throwing an error
throw new NullPointerException("consumer fail");
});
return container;
}
}
Application startup class
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
TimeUnit.SECONDS.sleep(100);
System.out.println("message container startup");
context.close();
}
}
Console output:
====message received=====
{"orderId":"d232eea5-35ae-4534-80f4-cfb31f49178f","createTime":"2017-10-22T22:11:34.239","price":100.0}
Oct 22, 2017 10:11:58 PM org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler handleError
WARNING: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
The web console also shows the message as acknowledged. The problem is the same as before: automatic acknowledgement causes de facto message loss.
Manual acknowledgement
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
System.out.println("====message received=====");
System.out.println(new String(message.getBody()));
TimeUnit.SECONDS.sleep(10);
if(message.getMessageProperties().getHeaders().get("error") == null){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("message acknowledged");
}else {
//channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("message rejected");
}
});
return container;
}
}
Summary
AcknowledgeMode.NONE: automatic acknowledgement, equivalent to autoAck=true.
AcknowledgeMode.MANUAL: manual acknowledgement, equivalent to autoAck=false. To send ack and nack responses in this mode, handle messages with a ChannelAwareMessageListener.
Using AcknowledgeMode.AUTO
Besides the constants AcknowledgeMode.NONE and AcknowledgeMode.MANUAL, AcknowledgeMode has one more constant, AcknowledgeMode.AUTO.
Configuration class
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setMessageListener((MessageListener) (message) -> {
System.out.println("====message received=====");
System.out.println(new String(message.getBody()));
// throwing NullPointerException requeues the message
//throw new NullPointerException("message consumption failed");
// throwing AmqpRejectAndDontRequeueException rejects the message with requeue=false
//throw new AmqpRejectAndDontRequeueException("message consumption failed");
// throwing ImmediateAcknowledgeAmqpException acknowledges the message
throw new ImmediateAcknowledgeAmqpException("message consumption failed");
});
return container;
}
}
Application startup class
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
TimeUnit.SECONDS.sleep(100);
System.out.println("message container startup");
context.close();
}
}
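AcknowledgeMode.AUTO decides whether to acknowledge or reject (and whether to requeue) based on how the listener method finishes:
- If the message is consumed successfully (meaning no exception is thrown during consumption), it is acknowledged automatically.
1) If AmqpRejectAndDontRequeueException is thrown, the message is rejected with requeue=false (not requeued).
2) If ImmediateAcknowledgeAmqpException is thrown, the message is acknowledged.
3) For any other exception, the message is rejected with requeue=true. If only one consumer is listening on the queue, this risks an endless loop, and even with several consumers it wastes a great deal of resources, so it must be avoided during development. The requeue behavior can be changed with setDefaultRequeueRejected (default true).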
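The rules above can be condensed into one small decision function. The sketch below is a simplified model, not Spring AMQP code: RejectDontRequeue and ImmediateAck are hypothetical local stand-ins for AmqpRejectAndDontRequeueException and ImmediateAcknowledgeAmqpException so that the example compiles without Spring on the classpath, and Outcome and AutoAckDecision are names invented for illustration. It also assumes no transaction manager is configured.

```java
/** Hypothetical stand-in for AmqpRejectAndDontRequeueException. */
class RejectDontRequeue extends RuntimeException {
    RejectDontRequeue(String message) { super(message); }
}

/** Hypothetical stand-in for ImmediateAcknowledgeAmqpException. */
class ImmediateAck extends RuntimeException {
    ImmediateAck(String message) { super(message); }
}

enum Outcome { ACK, REJECT_REQUEUE, REJECT_NO_REQUEUE }

class AutoAckDecision {
    /**
     * @param failure the exception thrown by the listener, or null on success
     * @param defaultRequeueRejected mirrors setDefaultRequeueRejected (default true)
     */
    static Outcome decide(Throwable failure, boolean defaultRequeueRejected) {
        if (failure == null) {
            return Outcome.ACK; // listener finished normally
        }
        boolean requeue = defaultRequeueRejected;
        // Walk the cause chain, since the exception may be wrapped.
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof ImmediateAck) {
                return Outcome.ACK;
            }
            if (t instanceof RejectDontRequeue) {
                requeue = false;
            }
        }
        return requeue ? Outcome.REJECT_REQUEUE : Outcome.REJECT_NO_REQUEUE;
    }

    public static void main(String[] args) {
        System.out.println(decide(null, true));
        System.out.println(decide(new NullPointerException("boom"), true));
        System.out.println(decide(new RejectDontRequeue("bad message"), true));
        System.out.println(decide(new ImmediateAck("skip"), true));
        System.out.println(decide(new NullPointerException("boom"), false));
    }
}
```

The last call shows the effect of setDefaultRequeueRejected(false): an ordinary exception then leads to a reject without requeue, which removes the endless-redelivery risk described in case 3.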
Source code analysis
The doReceiveAndExecute method of org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer:
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR
Channel channel = consumer.getChannel();
for (int i = 0; i < this.txSize; i++) {
logger.trace("Waiting for message from consumer.");
Message message = consumer.nextMessage(this.receiveTimeout);
if (message == null) {
break;
}
try {
// the actual work: invoke the listener
executeListener(channel, message);
}
// on ImmediateAcknowledgeAmqpException, log and break out of the loop
catch (ImmediateAcknowledgeAmqpException e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery: "
+ message.getMessageProperties().getDeliveryTag());
}
break;
}
catch (Throwable ex) { //NOSONAR
if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery: "
+ message.getMessageProperties().getDeliveryTag());
}
break;
}
if (this.transactionManager != null) {
if (this.transactionAttribute.rollbackOn(ex)) {
RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
.getResource(getConnectionFactory());
if (resourceHolder != null) {
consumer.clearDeliveryTags();
}
else {
/*
* If we don't actually have a transaction, we have to roll back
* manually. See prepareHolderForRollback().
*/
consumer.rollbackOnExceptionIfNecessary(ex);
}
throw ex; // encompassing transaction will handle the rollback.
}
else {
if (this.logger.isDebugEnabled()) {
this.logger.debug("No rollback for " + ex);
}
break;
}
}
else {
// this branch is taken when no transaction manager is configured
consumer.rollbackOnExceptionIfNecessary(ex);
throw ex;
}
}
}
return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));
}
Stepping into the rollbackOnExceptionIfNecessary method:
public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {
// true when the acknowledge mode is AUTO
boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
try {
if (this.transactional) {
if (logger.isDebugEnabled()) {
logger.debug("Initiating transaction rollback on application exception: " + ex);
}
RabbitUtils.rollbackIfNecessary(this.channel);
}
if (ackRequired) {
// whether to requeue; shouldRequeue makes the actual requeue / no-requeue decision
boolean shouldRequeue = RabbitUtils.shouldRequeue(this.defaultRequeuRejected, ex, logger);
for (Long deliveryTag : this.deliveryTags) {
// With newer RabbitMQ brokers could use basicNack here...
// reject the delivery
this.channel.basicReject(deliveryTag, shouldRequeue);
}
if (this.transactional) {
// Need to commit the reject (=nack)
RabbitUtils.commitIfNecessary(this.channel);
}
}
}
catch (Exception e) {
logger.error("Application exception overridden by rollback exception", ex);
throw e;
}
finally {
this.deliveryTags.clear();
}
}
The requeue decision (shouldRequeue):
public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable throwable, Log logger) {
boolean shouldRequeue = defaultRequeueRejected ||
throwable instanceof MessageRejectedWhileStoppingException;
Throwable t = throwable;
while (shouldRequeue && t != null) {
// AmqpRejectAndDontRequeueException anywhere in the cause chain means: do not requeue
if (t instanceof AmqpRejectAndDontRequeueException) {
shouldRequeue = false;
}
t = t.getCause();
}
if (logger.isDebugEnabled()) {
logger.debug("Rejecting messages (requeue=" + shouldRequeue + ")");
}
return shouldRequeue;
}
If you call container.setDefaultRequeueRejected(false), the message is not requeued; it is rejected only once.
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("zhihao.miao.order");
// AUTO: the container acks or rejects depending on whether the listener throws
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
// rejected messages are not requeued
container.setDefaultRequeueRejected(false);
container.setMessageListener((MessageListener) (message) -> {
System.out.println("====message received=====");
System.out.println(new String(message.getBody()));
throw new NullPointerException("message consumption failed");
//throw new AmqpRejectAndDontRequeueException("message consumption failed");
//throw new ImmediateAcknowledgeAmqpException("message consumption failed");
});
return container;
}
Listening on a queue with the @RabbitListener annotation
The acknowledgement mode is set by calling setAcknowledgeMode on the RabbitListenerContainerFactory instance registered in the container.
Configuration:
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// the default acknowledge mode is AcknowledgeMode.AUTO
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
Handler:
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
@Component
public class MessageHandler {
@RabbitListener(queues ="zhihao.miao.order")
public void handleMessage(byte[] bytes, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println("====consuming message===handleMessage");
System.out.println(new String(bytes));
channel.basicAck(tag,false);
}
}
Application startup class:
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@EnableRabbit
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("rabbit service startup");
TimeUnit.SECONDS.sleep(3000);
context.close();
}
}
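Reliable messaging summary
Real-world uses of MQ: periodically sending SMS messages (new product announcements) to customers who order breakfast regularly.
Login SMS messages (also sent through the message broker).
When an order is placed, a message is sent to the delivery system through the message broker (this message must not be lost).
To make sure messages are not lost, we need reliable messaging, which comes down to two things:
I. Persistence
1: the exchange must be durable
2: the queue must be durable
3: the message must be persistent
II. Message acknowledgement
1: enable message returns (a ReturnListener lets the producer know which messages were not delivered)
2: message acknowledgement between the producer and the server (broker)
3: message acknowledgement between the consumer and the server (broker)
For important messages, combine all of this with a local message table before going to production.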
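One way to picture the local message table is the sketch below. It is an assumption-laden illustration rather than a recommended design: LocalMessageTable, its Status values, and the relay method are hypothetical names, the "table" is an in-memory map where a real system would use a database table written in the same transaction as the business data, and the publisher callback stands in for a publish that returns true only after the broker has confirmed the message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** In-memory stand-in for a local message table (a real one would be a database table). */
class LocalMessageTable {
    enum Status { PENDING, SENT }

    private final Map<String, String> bodies = new LinkedHashMap<>();
    private final Map<String, Status> statuses = new LinkedHashMap<>();

    /** Record the message alongside the business write, before any publish attempt. */
    void record(String messageId, String body) {
        bodies.put(messageId, body);
        statuses.put(messageId, Status.PENDING);
    }

    /**
     * Tries to publish every PENDING message. The publisher returns true only
     * when the broker has confirmed the message; unconfirmed ones stay PENDING
     * and are picked up again by the next pass.
     * @return ids that were confirmed in this pass
     */
    List<String> relay(Predicate<String> publisher) {
        List<String> confirmed = new ArrayList<>();
        for (Map.Entry<String, Status> entry : statuses.entrySet()) {
            if (entry.getValue() == Status.PENDING
                    && publisher.test(bodies.get(entry.getKey()))) {
                entry.setValue(Status.SENT);
                confirmed.add(entry.getKey());
            }
        }
        return confirmed;
    }

    Status statusOf(String messageId) {
        return statuses.get(messageId);
    }

    public static void main(String[] args) {
        LocalMessageTable table = new LocalMessageTable();
        table.record("order-1", "{\"price\":100.0}");
        table.relay(body -> false); // broker unreachable: nothing is confirmed
        System.out.println(table.statusOf("order-1"));
        table.relay(body -> true);  // a later pass succeeds
        System.out.println(table.statusOf("order-1"));
    }
}
```

Because a message can be published more than once before its confirmation is recorded, this approach delivers at least once, so consumers still need to handle duplicates.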