RabbitMQ筆記十六:消息確認(rèn)之二(Consumer Acknowledgements)

消費確認(rèn)(comsumer acknowledgements)

broker與消費者之間的消息確認(rèn)稱為comsumer acknowledgements九昧,comsumer acknowledgements機制用于解決消費者與Rabbitmq服務(wù)器之間消息可靠傳輸蔼卡,它是在消費端消費成功之后通知broker消費端消費消息成功從而broker刪除這個消息。

RabbitMQ Java Client 實現(xiàn)消息確認(rèn)

自動確認(rèn)

zhihao.miao.order隊列中發(fā)送一條消息

web管控臺查看

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.TimeUnit;

public class Consumer {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        /**
         * basicConsume方法的第二個參數(shù)是boolean類型,true表示消息一旦投遞出去就自動確認(rèn)示弓,而false表示需要自己手動去確認(rèn)
         * 自動確認(rèn)有丟消息的可能档桃,因為如果消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息堪嫂,那么就相當(dāng)于丟失了消息
         * 設(shè)置了false,表示需要人為手動的去確定消息木柬,只有消費者將消息消費成功之后給與broker人為確定才進行消息確認(rèn)
         * 這邊也有個問題就是如果由于程序員自己的代碼的原因造成人為的拋出異常皆串,人工確認(rèn)那么消息就會一直重新入隊列,一直重發(fā)眉枕?
         */

        String consumerTag = channel.basicConsume("zhihao.miao.order",true,new SimpleConsumer(channel));
        System.out.println(consumerTag);

        TimeUnit.SECONDS.sleep(30);

        channel.close();
        connection.close();
    }
}

消費具體邏輯

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(consumerTag);
        System.out.println("-----收到消息了---------------");
        System.out.println("消息屬性為:"+properties);
        System.out.println("消息內(nèi)容為:"+new String(body));
        try
        {
            int i = 1/0;
            System.out.println(i);
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

控制臺打佣窀础:

amq.ctag-6_GQmh1tMeooWSiuqUmz0Q
java.lang.ArithmeticException: / by zero
-----收到消息了---------------
    at com.zhihao.test.day04.SimpleConsumer.handleDelivery(SimpleConsumer.java:29)
amq.ctag-6_GQmh1tMeooWSiuqUmz0Q
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
消息屬性為:#contentHeader<basic>(content-type=json, content-encoding=null, headers={}, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
消息內(nèi)容為:{"orderId":"abba05db-050e-4b1a-97f1-c469b23ca27b","createTime":"2017-10-22T21:02:41.861","price":100.0}
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

此時可以看到消費端拋出了異常,但是我們發(fā)現(xiàn)這條消息也已經(jīng)消費掉了速挑,此時如果消費端消費邏輯使用spring進行管理的話消費端業(yè)務(wù)邏輯會進行回滾谤牡,這也就造成了實際意義的消息丟失。

web管控臺

手動確認(rèn)

自動確認(rèn)會造成實際意義上的消息丟失姥宝。

將basicConsume方法的第二個參數(shù)改為false翅萤,表示人工的進行消息確認(rèn),如果消費者正在監(jiān)聽隊列腊满,那么此時消息進入Unacked套么,而如果消費者停掉服務(wù),那么消息的狀態(tài)又變成Ready了碳蛋。這個機制表明了消息必須是ack確認(rèn)之后才會在server中刪除掉违诗。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.TimeUnit;

public class Consumer {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //手動確認(rèn)
        String consumerTag = channel.basicConsume("zhihao.miao.order",false,new SimpleConsumer(channel));
        System.out.println(consumerTag);

        TimeUnit.SECONDS.sleep(30);

        channel.close();
        connection.close();
    }
}

消費具體邏輯

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        System.out.println(consumerTag);
        System.out.println("-----收到消息了--------------");

        System.out.println("消息屬性為:"+properties);
        System.out.println("消息內(nèi)容為:"+new String(body));

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.getChannel().basicAck(envelope.getDeliveryTag(),false);
        System.out.println("消息消費成功");
    }
}
此時消息已經(jīng)發(fā)送給消費者,但是消費者還沒有進行手動確認(rèn)

發(fā)送一個header中包含error屬性的消息疮蹦,

發(fā)送一個header中包含error屬性的消息

改造消費邏輯

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        System.out.println(consumerTag);
        System.out.println("-----收到消息了--------------");

        if(properties.getHeaders().get("error") != null){
            try {
                TimeUnit.SECONDS.sleep(15);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
          
            System.out.println("nack");
            this.getChannel().basicNack(envelope.getDeliveryTag(),false,true);

            return;
        }
        System.out.println("消息屬性為:"+properties);
        System.out.println("消息內(nèi)容為:"+new String(body));


        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.getChannel().basicAck(envelope.getDeliveryTag(),false);
        System.out.println("消息消費成功");
    }
}

控制臺打印诸迟,說明該消息一直重新入隊列然后一直重新消費

amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----收到消息了--------------
nack
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----收到消息了--------------
nack
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----收到消息了--------------
nack
amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
-----收到消息了--------------
nack

消費端也可以拒絕消息,

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
        System.out.println(consumerTag);
        System.out.println("-----收到消息了--------------");

        if(properties.getHeaders().get("error") != null){
            try {
                TimeUnit.SECONDS.sleep(15);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
           
            //這個api也支持拒絕消息消費愕乎,第二個參數(shù)表示是否重新入隊列
            this.getChannel().basicReject(envelope.getDeliveryTag(),false);
            System.out.println("消息無法消費阵苇,拒絕消息");
            return;
        }
        System.out.println("消息屬性為:"+properties);
        System.out.println("消息內(nèi)容為:"+new String(body));


        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.getChannel().basicAck(envelope.getDeliveryTag(),false);
        System.out.println("消息消費成功");
    }
}

控制臺打印,因為設(shè)置了不重新入隊列感论,所以不再重新發(fā)消息了:

amq.ctag-kiy_49AkC3f4qRkqCMujrw
amq.ctag-kiy_49AkC3f4qRkqCMujrw
-----收到消息了--------------
消息無法消費绅项,拒絕消息

總結(jié)
消費端的消息確認(rèn)分為二個步驟,

  • 在channel.basicConsume指定為手動確認(rèn)比肄。
  • 具體根據(jù)業(yè)務(wù)邏輯來進行判斷什么是ack什么時候nack(又分為要不要重新requeue)

這邊有個問題就是nack時候或者reject時候重新入隊列如果業(yè)務(wù)端因為代碼邏輯問題一直重發(fā)怎樣去設(shè)置一個次數(shù)值快耿?
我的設(shè)想就是設(shè)置一個重新發(fā)送的遞增值囊陡,這個值與消息id對應(yīng),去處理解決它掀亥∽卜矗或者在redis或者memcache等其他保存方式然后記錄這個重發(fā)次數(shù)。
How do I set a number of retry attempts in RabbitMQ?

Spring AMQP消費端實現(xiàn)消息確認(rèn)

自動確認(rèn)

配置類

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");
        /**
         * 自動確認(rèn)涉及到一個問題就是如果在消息消息的時候拋出異常搪花,消息處理失敗遏片,但是因為自動確認(rèn)而server將該消息刪除了。
         * NONE表示自動確認(rèn)
         */
        container.setAcknowledgeMode(AcknowledgeMode.NONE);
        container.setMessageListener((MessageListener) message -> {
            System.out.println("====接收到消息=====");
            System.out.println(new String(message.getBody()));

            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //相當(dāng)于自己的一些消費邏輯拋錯誤
            throw new NullPointerException("consumer fail");

        });
        return container;
    }
}

應(yīng)用啟動類

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {

    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        TimeUnit.SECONDS.sleep(100);
        System.out.println("message container startup");

        context.close();
    }
}

控制臺打哟楦汀:

====接收到消息=====
{"orderId":"d232eea5-35ae-4534-80f4-cfb31f49178f","createTime":"2017-10-22T22:11:34.239","price":100.0}
十月 22, 2017 10:11:58 下午 org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler handleError
警告: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception

Web控制臺上顯示消息消費確認(rèn)也成功吮便。問題還是自動確認(rèn)會造成事實上的消息丟失。

手動確認(rèn)

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");

        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            System.out.println("====接收到消息=====");
            System.out.println(new String(message.getBody()));
            TimeUnit.SECONDS.sleep(10);
            if(message.getMessageProperties().getHeaders().get("error") == null){
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                System.out.println("消息已經(jīng)確認(rèn)");
            }else {
                //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                System.out.println("消息拒絕");
            }

        });
        return container;
    }
}

總結(jié)

AcknowledgeMode.NONE:自動確認(rèn)幢踏,等效于autoAck=true
AcknowledgeMode.MANUAL:手動確認(rèn)髓需,等效于autoAck=false,此時如果要實現(xiàn)ack和nack回執(zhí)的話房蝉,使用ChannelAwareMessageListener監(jiān)聽器處理授账。

AcknowledgeMode.AUTO的使用

我們發(fā)現(xiàn)AcknowledgeMode除了AcknowledgeMode.NONEAcknowledgeMode.MANUAL常量值之外還有一個AcknowledgeMode.AUTO的常量。

配置類

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener((MessageListener) (message) -> {
            System.out.println("====接收到消息=====");
            System.out.println(new String(message.getBody()));
            //拋出NullPointerException異常則重新入隊列
            //throw new NullPointerException("消息消費失敗");
            //當(dāng)拋出的異常是AmqpRejectAndDontRequeueException異常的時候惨驶,則消息會被拒絕,且requeue=false
            //throw new AmqpRejectAndDontRequeueException("消息消費失敗");
            //當(dāng)拋出ImmediateAcknowledgeAmqpException異常敛助,則消費者會被確認(rèn)
            throw new ImmediateAcknowledgeAmqpException("消息消費失敗");

        });
        return container;
    }
}

應(yīng)用啟動類

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {

    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        TimeUnit.SECONDS.sleep(100);
        System.out.println("message container startup");

        context.close();
    }
}

AcknowledgeMode.AUTO 根據(jù)方法的執(zhí)行情況來決定是否確認(rèn)還是拒絕(是否重新入queue)

  • 如果消息成功被消費(成功的意思就是在消費的過程中沒有拋出異常)粗卜,則自動確認(rèn)。

1)當(dāng)拋出AmqpRejectAndDontRequeueException異常的時候纳击,則消息會被拒絕续扔,且requeue=false(不重新入隊列)
2)當(dāng)拋出ImmediateAcknowledgeAmqpException異常,則消費者會被確認(rèn)
3)其他的異常焕数,則消息會被拒絕纱昧,且requeue=true(如果此時只有一個消費者監(jiān)聽該隊列,則有發(fā)生死循環(huán)的風(fēng)險堡赔,多消費端也會造成資源的極大浪費识脆,這個在開發(fā)過程中一定要避免的)∩埔眩可以通過setDefaultRequeueRejected(默認(rèn)是true)去設(shè)置灼捂,

源碼分析

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainerdoReceiveAndExecute方法,

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR

    Channel channel = consumer.getChannel();

    for (int i = 0; i < this.txSize; i++) {

        logger.trace("Waiting for message from consumer.");
        Message message = consumer.nextMessage(this.receiveTimeout);
        if (message == null) {
            break;
        }
        try {
           //具體的邏輯换团,具體執(zhí)行Listener
            executeListener(channel, message);
        }
        //當(dāng)ImmediateAcknowledgeAmqpException異常的時候打印日志然后直接break
        catch (ImmediateAcknowledgeAmqpException e) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("User requested ack for failed delivery: "
                        + message.getMessageProperties().getDeliveryTag());
            }
            break;
        }
        catch (Throwable ex) { //NOSONAR
            if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("User requested ack for failed delivery: "
                            + message.getMessageProperties().getDeliveryTag());
                }
                break;
            }
            if (this.transactionManager != null) {
                if (this.transactionAttribute.rollbackOn(ex)) {
                    RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
                            .getResource(getConnectionFactory());
                    if (resourceHolder != null) {
                        consumer.clearDeliveryTags();
                    }
                    else {
                        /*
                         * If we don't actually have a transaction, we have to roll back
                         * manually. See prepareHolderForRollback().
                         */
                        consumer.rollbackOnExceptionIfNecessary(ex);
                    }
                    throw ex; // encompassing transaction will handle the rollback.
                }
                else {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("No rollback for " + ex);
                    }
                    break;
                }
            }
            else {
               //進入這邊
                consumer.rollbackOnExceptionIfNecessary(ex);
                throw ex;
            }
        }
    }

    return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));

}

進入rollbackOnExceptionIfNecessary方法

public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {

  //當(dāng)ack機制為AUTO的時候
    boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
    try {
        if (this.transactional) {
            if (logger.isDebugEnabled()) {
                logger.debug("Initiating transaction rollback on application exception: " + ex);
            }
            RabbitUtils.rollbackIfNecessary(this.channel);
        }
        if (ackRequired) {
           //是否入隊列悉稠,shouldRequeue就是具體的入隊列和不入隊列的判斷
            boolean shouldRequeue = RabbitUtils.shouldRequeue(this.defaultRequeuRejected, ex, logger);
            for (Long deliveryTag : this.deliveryTags) {
                // With newer RabbitMQ brokers could use basicNack here...
                //執(zhí)行拒絕策略
                this.channel.basicReject(deliveryTag, shouldRequeue);
            }
            if (this.transactional) {
                // Need to commit the reject (=nack)
                RabbitUtils.commitIfNecessary(this.channel);
            }
        }
    }
    catch (Exception e) {
        logger.error("Application exception overridden by rollback exception", ex);
        throw e;
    }
    finally {
        this.deliveryTags.clear();
    }
}

是否入隊列的判斷(shouldRequeue

public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable throwable, Log logger) {
    boolean shouldRequeue = defaultRequeueRejected ||
            throwable instanceof MessageRejectedWhileStoppingException;
    Throwable t = throwable;
    while (shouldRequeue && t != null) {
       //如果拋出的異常是AmqpRejectAndDontRequeueException的時候,不入隊列
        if (t instanceof AmqpRejectAndDontRequeueException) {
            shouldRequeue = false;
        }
        t = t.getCause();
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Rejecting messages (requeue=" + shouldRequeue + ")");
    }
    return shouldRequeue;
}

container.setDefaultRequeueRejected(false);艘包,那么消息就不會重新入隊列的猛,只會拒絕一次耀盗。

@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("zhihao.miao.order");
    //自動確認(rèn)涉及到一個問題就是如果在消息消息的時候拋出異常,消息處理失敗卦尊,但是因為自動確認(rèn)而server將該消息刪除了叛拷。
    //NONE表示自動確認(rèn)
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setDefaultRequeueRejected(false);
    container.setMessageListener((MessageListener) (message) -> {
        System.out.println("====接收到消息=====");
        System.out.println(new String(message.getBody()));
        throw new NullPointerException("消息消費失敗");
        //throw new AmqpRejectAndDontRequeueException("消息消費失敗");
        //throw new ImmediateAcknowledgeAmqpException("消息消費失敗");

    });
    return container;
}

使用@RabbitListener注解監(jiān)聽隊列

設(shè)置確認(rèn)模式是通過在容器中設(shè)置RabbitListenerContainerFactory實例的setAcknowledgeMode方法來設(shè)定。

配置:

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        return factory;
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //默認(rèn)的確認(rèn)模式是AcknowledgeMode.AUTO
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

處理器:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

@Component
public class MessageHandler {

    @RabbitListener(queues ="zhihao.miao.order")
    public void handleMessage(byte[] bytes, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("====消費消息===handleMessage");
        System.out.println(new String(bytes));
        channel.basicAck(tag,false);
    }
}

應(yīng)用啟動類:

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.util.concurrent.TimeUnit;

@EnableRabbit
@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        System.out.println("rabbit service startup");
        TimeUnit.SECONDS.sleep(3000);
        context.close();
    }
}

可靠消息總結(jié)

實際使用mq的實例猫牡,每段時間定期的給經(jīng)常訂早餐的推送短信(上新品)胡诗。
登錄短信(也是使用消息中間件)
下單的時候,使用消息中間件發(fā)送到配送系統(tǒng)(消息不能丟失)淌友。

做到消息不能丟失煌恢,我們就要實現(xiàn)可靠消息,做到這一點震庭,我們要做到下面二點:

一:持久化
1: exchange要持久化
2: queue要持久化
3: message要持久化
二:消息確認(rèn)
1: 啟動消費返回(@ReturnList注解瑰抵,生產(chǎn)者就可以知道哪些消息沒有發(fā)出去)
2:生產(chǎn)者和Server(broker)之間的消息確認(rèn)。
3: 消費者和Server(broker)之間的消息確認(rèn)器联。

對于重要的消息二汛,要結(jié)合本地的消息表才能上生產(chǎn)。

參考資料Consumer Acknowledgements and Publisher Confirms

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末拨拓,一起剝皮案震驚了整個濱河市肴颊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌渣磷,老刑警劉巖婿着,帶你破解...
    沈念sama閱讀 223,002評論 6 519
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異醋界,居然都是意外死亡竟宋,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,357評論 3 400
  • 文/潘曉璐 我一進店門形纺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來丘侠,“玉大人,你說我怎么就攤上這事逐样∥献郑” “怎么了?”我有些...
    開封第一講書人閱讀 169,787評論 0 365
  • 文/不壞的土叔 我叫張陵脂新,是天一觀的道長秽澳。 經(jīng)常有香客問我,道長戏羽,這世上最難降的妖魔是什么担神? 我笑而不...
    開封第一講書人閱讀 60,237評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮始花,結(jié)果婚禮上妄讯,老公的妹妹穿的比我還像新娘孩锡。我一直安慰自己,他們只是感情好亥贸,可當(dāng)我...
    茶點故事閱讀 69,237評論 6 398
  • 文/花漫 我一把揭開白布躬窜。 她就那樣靜靜地躺著,像睡著了一般炕置。 火紅的嫁衣襯著肌膚如雪荣挨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,821評論 1 314
  • 那天朴摊,我揣著相機與錄音默垄,去河邊找鬼。 笑死甚纲,一個胖子當(dāng)著我的面吹牛口锭,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播介杆,決...
    沈念sama閱讀 41,236評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼鹃操,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了春哨?” 一聲冷哼從身側(cè)響起荆隘,我...
    開封第一講書人閱讀 40,196評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎赴背,沒想到半個月后椰拒,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,716評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡癞尚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,794評論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了乱陡。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片浇揩。...
    茶點故事閱讀 40,928評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖憨颠,靈堂內(nèi)的尸體忽然破棺而出胳徽,到底是詐尸還是另有隱情,我是刑警寧澤爽彤,帶...
    沈念sama閱讀 36,583評論 5 351
  • 正文 年R本政府宣布养盗,位于F島的核電站,受9級特大地震影響适篙,放射性物質(zhì)發(fā)生泄漏往核。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,264評論 3 336
  • 文/蒙蒙 一嚷节、第九天 我趴在偏房一處隱蔽的房頂上張望聂儒。 院中可真熱鬧虎锚,春花似錦、人聲如沸衩婚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,755評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽非春。三九已至柱徙,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間奇昙,已是汗流浹背护侮。 一陣腳步聲響...
    開封第一講書人閱讀 33,869評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留敬矩,地道東北人概行。 一個月前我還...
    沈念sama閱讀 49,378評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像弧岳,于是被迫代替她去往敵國和親凳忙。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,937評論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理禽炬,服務(wù)發(fā)現(xiàn)涧卵,斷路器,智...
    卡卡羅2017閱讀 134,720評論 18 139
  • 背景介紹 Kafka簡介 Kafka是一種分布式的腹尖,基于發(fā)布/訂閱的消息系統(tǒng)柳恐。主要設(shè)計目標(biāo)如下: 以時間復(fù)雜度為O...
    高廣超閱讀 12,843評論 8 167
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化热幔、事務(wù)乐设、擁塞控...
    jiangmo閱讀 10,369評論 2 34
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,476評論 0 34
  • 1.什么是消息隊列 消息隊列允許應(yīng)用間通過消息的發(fā)送與接收的方式進行通信,當(dāng)消息接收方服務(wù)忙或不可用時绎巨,其提供了一...
    zhuke閱讀 4,476評論 0 12