rabbit mq 學(xué)習(xí)

% rabbitMQ learn
% qijun
% 19/01/2018

mq 的一些概念

  • mq: mq 是一個message broker (消息中介)
  • AMQP (Advanced Message Queue ) 一個標(biāo)準(zhǔn)的消息隊列標(biāo)準(zhǔn)
  • RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue )的開源實現(xiàn)

rabbit mq 的一些概念

rabbit mq 的適用場景架構(gòu)圖

image.png
  • Client A &Client B 為消息的producer 消息由payload 和 label 組成晰洒,label是exchange的名字或者說是一個tag闺金,它描述了payload湃崩,而且RabbitMQ也是通過這個label來決定把這個Message發(fā)給哪個Consumer
  • client 1 & client2 & client3 消息的consumer, 消息的接受者 接收到的消息是去除label 的消息艇纺,緊包含消息的內(nèi)容苔严,消費者通過訂閱隊列獲取消息刀荒。
  • 中間是的 rabbit server 由 交換器,routingKey 和queue 組成首懈,交換器和queue 通過routingKey 綁定看成,消息通過交換器和routingKey 路由到相應(yīng)的queue
  • Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的鹏秋。程序的起始處就是建立這個TCP連接尊蚁。
  • Channels: 虛擬連接。它建立在上述的TCP連接中侣夷。數(shù)據(jù)流動都是在Channel中進(jìn)行的横朋。也就是說,一般情況是程序起始建立TCP連接百拓,第二步就是建立這個Channel琴锭。

四種交換器

由上面可知,消息通過交換器衙传,通過對應(yīng)的routekey 路由到queue, 交換器的類型一共有三種

  1. direct 如果 routing key 匹配, 那么Message就會被傳遞到相應(yīng)的queue中
  2. fanout 廣播到所有綁定的queue(假設(shè)你有一個消息需要發(fā)送給a和b,如果現(xiàn)在還需要發(fā)送給c决帖,使用fanout 交換器,只需要在c的代碼中創(chuàng)建一個隊列蓖捶,然后綁定到fanout 交換器即可)
  3. topic 對key進(jìn)行模式匹配地回,比如ab.1,ab.2都可以傳遞到所有routingkey 為ab.*的queue
    基于topic類型交換器的routing key不是唯一的,而是一系列詞俊鱼,基于點區(qū)分刻像。
    例如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"
    binding key也是。*表示只匹配一個關(guān)鍵字 #可以匹配0或者多個關(guān)鍵字并闲。
    比如*.a.b的隊列接受1.a.b 或者2.a.b等等
  4. header header交換器和 direct幾乎一樣细睡,性能更差,基本不會用到

匿名交換器(默認(rèn))

事實上焙蚓,你在代碼中不創(chuàng)建交換器也是可以通過rabbit mq 發(fā)送消息的纹冤,因為rabbit 提供了默認(rèn)的交換器。

image.png

如圖中空白字符串名字的交換器為默認(rèn)的交換器购公,類型為direct
本質(zhì)上所有的消息發(fā)送都要送往exchange(可以沒有隊列萌京,但不能沒有交換機(jī),沒有隊列時消息直接被丟棄)宏浩。
RabbitMQ提供了一種直接向Queue發(fā)送消息的快捷方法:直接使用未命名的exchange知残,不用綁定routing_key,直接用它指定隊列名比庄。

  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 發(fā)送消息
        String message = "Hello World!";
        // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        // 參數(shù)1 exchange :交換器
        // 參數(shù)2 routingKey : 路由鍵
        // 參數(shù)3 props : 消息的其他參數(shù)
        // 參數(shù)4 body : 消息體
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

消息的確認(rèn)和拒絕

使用ack確認(rèn)Message的正確傳遞
默認(rèn)情況下求妹,如果Message 已經(jīng)被某個Consumer正確的接收到了,那么該Message就會被從queue中移除佳窑。當(dāng)然也可以讓同一個Message發(fā)送到很多的Consumer
如果一個queue沒被任何的Consumer Subscribe(訂閱)制恍,那么础废,如果這個queue有數(shù)據(jù)到達(dá)韧掩,那么這個數(shù)據(jù)會被cache,不會被丟棄铅乡。當(dāng)有Consumer時,這個數(shù)據(jù)會被立即發(fā)送到這個Consumer鹃唯,這個數(shù)據(jù)被Consumer正確收到時爱榕,這個數(shù)據(jù)就被從queue中刪除。
那么什么是正確收到呢坡慌?通過ack黔酥。每個Message都要被acknowledged(確認(rèn),ack)洪橘。我們可以顯示的在程序中去ack跪者,也可以自動的ack。
如果在收到數(shù)據(jù)后處理數(shù)據(jù)時程序發(fā)生錯誤梨树,無法正確處理數(shù)據(jù)坑夯,而是被reject岖寞。reject 參數(shù)設(shè)為true時RabbitMQ Server會把這個信息發(fā)送到下一個Consumer抡四,設(shè)為false也可以從隊列中把這條消息刪除。
如果這個app有bug仗谆,忘記了ack指巡,那么RabbitMQ Server不會再發(fā)送數(shù)據(jù)給它,因為Server認(rèn)為這個Consumer處理能力有限隶垮。
而且ack的機(jī)制可以起到限流的作用(Benefitto throttling):在Consumer處理完成數(shù)據(jù)后發(fā)送ack藻雪,甚至在額外的延時后發(fā)送ack,將有效的balance Consumer的load狸吞。

在什么地方創(chuàng)建queue

Consumer和Procuder都可以通過 queue.declare 創(chuàng)建queue勉耀。對于某個Channel來說,Consumer不能declare一個queue蹋偏,卻訂閱其他的queue便斥。當(dāng)然也可以創(chuàng)建私有的queue。這樣只有app本身才可以使用這個queue威始。queue也可以自動刪除枢纠,被標(biāo)為auto-delete的queue在最后一個Consumer unsubscribe后就會被自動刪除。那么如果是創(chuàng)建一個已經(jīng)存在的queue呢黎棠?那么不會有任何的影響晋渺。需要注意的是沒有任何的影響,也就是說第二次創(chuàng)建如果參數(shù)和第一次不一樣脓斩,那么該操作雖然成功木西,但是queue的屬性并不會被修改。

那么誰應(yīng)該負(fù)責(zé)創(chuàng)建這個queue呢随静?是Consumer八千,還是Producer?

如果queue不存在,當(dāng)然Consumer不會得到任何的Message叼丑。但是如果queue不存在关翎,那么Producer Publish的Message會被丟棄。所以鸠信,還是為了數(shù)據(jù)不丟失纵寝,Consumer和Producer都try to create the queue!反正不管怎么樣星立,這個接口都不會出問題爽茴。
queue對load balance的處理是完美的。對于多個Consumer來說绰垂,RabbitMQ 使用循環(huán)的方式(round-robin)的方式均衡的發(fā)送給不同的Consumer室奏。

VirtualHost

在RabbitMQ中可以虛擬消息服務(wù)器VirtualHost,每個VirtualHost相當(dāng)月一個相對獨立的RabbitMQ服務(wù)器劲装,每個VirtualHost之間是相互隔離的胧沫。exchange、queue占业、message不能互通绒怨。
在RabbitMQ中無法通過AMQP創(chuàng)建VirtualHost,可以通過以下命令來創(chuàng)建谦疾。
rabbitmqctl add_vhost [vhostname]

windows下如何安裝rabbit mq

  1. rabbit mq 運行于erlang之上南蹂,需要先安裝erlang http://www.erlang.org/downloads 下載,并使用管理員運行安裝
  2. 安裝rabbit mq https://www.rabbitmq.com/download.html
  3. 新增環(huán)境變量 ERLANG_HOME= C:\Program Files\erl9.2
    RABBITMQ_SERVER = C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.2
    配置環(huán)境變量
    Path=%ERLANG_HOME%\bin;%RABBITMQ_SERVER%\sbin
  4. 替換 erlang cookie
    拷貝C:\WINDOWS 下的.erlang.cookie (還有可能在C:\Windows\System32\config\systemprofile)文件替換 C:\Users%USERNAME%.erlang.cookie 或者 C:\Documents and
    Settings%USERNAME%.erlang.cookie
  5. 通過startMenu 啟動erlang 服務(wù) 和停止 rabbit mq 可以以服務(wù)的方式和按進(jìn)程的方式啟動念恍,建議使用服務(wù)方式啟動,然后在rabbit mq的命令行(RabbitMQ Command Prompt 開始菜單中) 執(zhí)行 rabbitmq-plugins enable rabbitmq_management
    最后就可以通過 http://localhost:15672/ 賬號guest 密碼guest 訪問rabbit mq的控制臺 /是默認(rèn)的VirtualHost

常用命令

停止 broker
查詢 broker 狀態(tài) rabbitmqctl status
更多的命令請查閱 https://www.rabbitmq.com/man/rabbitmqctl.8.html

實戰(zhàn)

下面會通過兩個例子六剥,演示如何使用rabbitmq,第一個原生的java api 使用direct 交換器演示 routing,第二個例子使用topic 交換器峰伙。spring mvc疗疟,spring boot 中的注解和接口本質(zhì)上是對原生接口的包裝,spring 會隱藏一些操作词爬,對理解rabbit mq的工作流程會造成阻礙秃嗜,先使用原生api做演示一般的工作流程,而后結(jié)合springboot 演示在項目中如何使用rabbit mq顿膨。

rabbitmq 消費者和生產(chǎn)者兩端的在處理消息時經(jīng)歷的步驟

  1. 創(chuàng)建連接工廠ConnectionFactory
  2. 通過連接獲取通信通道Channel
  3. 聲明交換機(jī)Exchange(可選)
  4. 申明隊列(可選)
  5. 綁定交換機(jī)和隊列(可選)
    之后生產(chǎn)者通過channel發(fā)送消息锅锨,消費者獲取并處理消息

rabbitmq comsumer 消息獲取方式

rabbitMQ中consumer通過建立到queue的連接,創(chuàng)建channel對象恋沃,通過channel通道獲取message,
Consumer可以聲明式的以API輪詢poll的方式主動從queue的獲取消息必搞,也可以通過訂閱的方式被動的從Queue中消費消息。

使用原生rabbitmq api 的例子

代碼發(fā)送三種類型的日志到交換器囊咏,交換器通過routingkey 分發(fā)到不同的queue

maven 依賴

   <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.3</version>
        </dependency>

消息發(fā)送

public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"};

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置 RabbitMQ 的主機(jī)名
        factory.setHost("localhost");
        // 創(chuàng)建一個連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建一個通道
        Channel channel = connection.createChannel();
        // 指定一個交換器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 發(fā)送消息
        for (int i = 0; i < 10; i++)  {
            int rand = new Random().nextInt(3);
            String severity  = LOG_LEVEL_ARR[rand];
            String message = "Qijun-MSG log : [" +severity+ "]" + UUID.randomUUID().toString();
            // 發(fā)布消息至交換器
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
        // 關(guān)閉頻道和連接
        channel.close();
        connection.close();
    }
}

消息接收

public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String[] LOG_LEVEL_ARR = {"debug", "info", "error"};

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置 RabbitMQ 的主機(jī)名
        factory.setHost("localhost");
        // 創(chuàng)建一個連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建一個通道
        Channel channel = connection.createChannel();
        // 指定一個交換器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 設(shè)置日志級別
        int rand = new Random().nextInt(3);

        // 創(chuàng)建三個非持久的恕洲、唯一的塔橡、自動刪除的隊列,分別接收不同的日志信息
        String debugQueueName = channel.queueDeclare().getQueue();
        String InfoQueueName = channel.queueDeclare().getQueue();
        String ErrorQueueName = channel.queueDeclare().getQueue();
        // 綁定交換器和隊列
        // queueBind(String queue, String exchange, String routingKey)
        // 參數(shù)1 queue :隊列名
        // 參數(shù)2 exchange :交換器名
        // 參數(shù)3 routingKey :路由鍵名
        channel.queueBind(debugQueueName, EXCHANGE_NAME, LOG_LEVEL_ARR[0]);
        channel.queueBind(InfoQueueName, EXCHANGE_NAME, LOG_LEVEL_ARR[1]);
        channel.queueBind(ErrorQueueName, EXCHANGE_NAME, LOG_LEVEL_ARR[2]);

        // rabbit mq 消息的推送支持poll 也支持訂閱霜第,先創(chuàng)建一個poll 方式的comsumer
        QueueingConsumer pollConsumer = new QueueingConsumer(channel);
        channel.basicConsume(ErrorQueueName, true, pollConsumer);

        // 創(chuàng)建訂閱類型的消費者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received '" + message + "' from "+envelope.getRoutingKey()+ " by subscribe" );
            }
        };
        channel.basicConsume(debugQueueName, true, consumer);
        channel.basicConsume(InfoQueueName, true, consumer);

        // 通過 循環(huán)poll 獲取隊列中的所有消息  
        while (true) {
            QueueingConsumer.Delivery delivery = null;
            try {
                delivery = pollConsumer.nextDelivery();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();


            System.out.println("Received '" + message + "' from "+routingKey +" by poll");
        }

    }
}

源碼

springboot 中使用rabbit mq 的例子

maven 依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

ConnectionFactory配置

// 項目中可通過配置文件讀取來獲取 connect 參數(shù)
 @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost("localhost");
        cachingConnectionFactory.setPort(5672);
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }

CachingConnectionFactory 內(nèi)部通過com.rabbitmq.client.ConnectionFactory 去設(shè)置 connect的參數(shù)

public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware {
    private static final String BAD_URI = "setUri() was passed an invalid URI; it is ignored";
    protected final Log logger = LogFactory.getLog(this.getClass());
    private final com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory;

通過 RabbitAdmin 配置隊列葛家,交換機(jī)和binding

   public static final String  ROUTER_KEY_1 = "*.orange.*";
 @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory());
       //申明一個 一個topic類型的交換機(jī),routingkey 使用通配符
        TopicExchange topicExchange =(TopicExchange)ExchangeBuilder.topicExchange(QUEUE_EXCHANGE_NAME).durable(true).build();
        rabbitAdmin.declareExchange(topicExchange);
        Queue firstQueue = new Queue(QUEUE_NAME);
        rabbitAdmin.declareQueue(firstQueue);
        rabbitAdmin.declareBinding(BindingBuilder.bind(firstQueue).to(topicExchange).with(ROUTER_KEY_1));
        return rabbitAdmin;
    }

消息消費的兩種方法(推薦使用第二種泌类,更靈活)

  1. 通過SimpleMessageListenerContainer 綁定特定的messageListener
@Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receive2");
    }
 @Bean
    SimpleMessageListenerContainer container(MessageListenerAdapter messageListenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueNames(QUEUE_NAME);
        container.setMessageListener(messageListenerAdapter);
        return container;
    }
@Service
public class Receiver {

    public void receiveMessage(String message) {
        System.out.println("Received<" + message + ">");
    }

    public void receive2(String in) throws InterruptedException {
        System.out.println("in message"+in);
    }
}
  1. 使用 SimpleRabbitListenerContainerFactory 和 @RabbitListener 方式接收mq 的消息
  @Bean
    public SimpleRabbitListenerContainerFactory myContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //設(shè)置了每個消費者再不回ack的情況下最大可接收消息的條數(shù)
        factory.setPrefetchCount(100);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
/**
 * @author 祁軍
 * 使用 SimpleRabbitListenerContainerFactory 和 @RabbitListener 方式接收mq 的消息
 */
@Service
public class Receiver1 {
    @RabbitListener(queues = "${rabbitConfiguration.queue}", containerFactory = "myContainerFactory")
    public void processMessage(String msg){
        System.out.println("Receiver1 got message" + msg);
    }
}

sender

@Service
public class Sender {
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public Sender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void send() {
       // 發(fā)送兩次routing key不同 由于 是topic exchange routing key 為通配符可達(dá)到同一隊列
        System.out.println("sender is sending message");
        rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_EXCHANGE_NAME,"aaa.orange.bbb", "hello,world1");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        rabbitTemplate.convertAndSend(RabbitMQConfig.QUEUE_EXCHANGE_NAME,"aaa.orange.ccc", "hello,world2");
    }
}

測試

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitMQTest {

    @Autowired
    private Sender sender;

    @Test
    public void send() throws Exception {
        sender.send();
    }

}

源碼

rabbit mq 的其他應(yīng)用場景

working queue

當(dāng)有Consumer需要大量的運算時癞谒,RabbitMQ Server需要一定的分發(fā)機(jī)制來balance每個Consumer的load。試想一下刃榨,對于web application來說弹砚,在一個很多的HTTP request里是沒有時間來處理復(fù)雜的運算的,只能通過后臺的一些工作線程來完成枢希。應(yīng)用場景就是RabbitMQ Server會將queue的Message分發(fā)給不同的Consumer以處理計算密集型的任務(wù)桌吃。

image.png

RPC

MQ本身是基于異步的消息處理,前面的示例中所有的生產(chǎn)者(P)將消息發(fā)送到RabbitMQ后不會知道消費者(C)處理成功或者失敯巍(甚至連有沒有消費者來處理這條消息都不知道)茅诱。
但實際的應(yīng)用場景中,我們很可能需要一些同步處理呕屎,需要同步等待服務(wù)端將我的消息處理完成后再進(jìn)行下一步處理让簿。這相當(dāng)于RPC(Remote Procedure Call敬察,遠(yuǎn)程過程調(diào)用)秀睛。在RabbitMQ中也支持RPC。

image.png

RabbitMQ中實現(xiàn)RPC的機(jī)制是:

  1. 客戶端發(fā)送請求(消息)時莲祸,在消息的屬性(MessageProperties蹂安,在AMQP協(xié)議中定義了14中properties,這些屬性會隨著消息一起發(fā)送)中設(shè)置兩個值replyTo(一個Queue名稱锐帜,用于告訴服務(wù)器處理完成后將通知我的消息發(fā)送到這個Queue中)和correlationId(此次請求的標(biāo)識號田盈,服務(wù)器處理完成后需要將此屬性返還,客戶端將根據(jù)這個id了解哪條請求被成功執(zhí)行了或執(zhí)行失斀裳帧)
  2. 服務(wù)器端收到消息并處理
  3. 服務(wù)器端處理完消息后允瞧,將生成一條應(yīng)答消息到replyTo指定的Queue,同時帶上correlationId屬性
  4. 客戶端之前已訂閱replyTo指定的Queue蛮拔,從中收到服務(wù)器的應(yīng)答消息后述暂,根據(jù)其中的correlationId屬性分析哪條請求被執(zhí)行了,根據(jù)執(zhí)行結(jié)果進(jìn)行后續(xù)業(yè)務(wù)處理

rabbitmq 消息的可靠性

  1. 發(fā)送端的comfirm 機(jī)制建炫,通過注冊回調(diào)畦韭,我們可以知道消息是否已經(jīng)發(fā)送到exchange 或者queue,如果沒有正確發(fā)送,我們可以通過replycode來判斷進(jìn)行后續(xù)什么操作肛跌,然后根據(jù)業(yè)務(wù)場景
    比如發(fā)送告警艺配,或者重發(fā)來應(yīng)對察郁。
  2. 消息的持久化,通過交換機(jī)转唉,隊列和消息的持久化來實現(xiàn)
  3. rabbitmq 從queue 發(fā)消息給消費者皮钠,如果消費者選擇no ack 則queue每發(fā)一條消息,rabbitmq 就會把消息刪除赠法,如果cosumer 由于某種問題消費消息出錯鳞芙,rabbitmq也會把消息刪除。
    我們需要在comsumer 關(guān)閉自動ack期虾,使用basic ack 手工應(yīng)答保證消息被正確消費原朝,如果消費失敗,basic nack 可以刪除隊列消息或者重新入原隊列镶苞,可能導(dǎo)致死循環(huán)
    如果不希望把有問題的消息刪除或者重新入原來的隊列喳坠,可以指定一個死信隊列,錯誤的消息重新入死信對列茂蚓,然后再次被消費壕鹉。

發(fā)送端的ack

rabbitmq提供了確認(rèn)ack機(jī)制,可以用來確認(rèn)消息是否到broker 或者queue聋涨。

/**confirmcallback用來確認(rèn)消息是否到達(dá)broker*/     
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (!ack) {
        //log error
    } else {
        //maybe delete msg in db
    }
});
 /**若消息不能正確的達(dá)到指定的隊列會調(diào)用 */
rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {
    log.info("send message failed: " + replyCode + " " + replyText);
    // resend message
   
});

消息的持久化

// 交換機(jī)的持久化
// 參數(shù)1 name :交互器名
// 參數(shù)2 durable :是否持久化
// 參數(shù)3 autoDelete :當(dāng)所有消費客戶端連接斷開后晾浴,是否自動刪除隊列
new TopicExchange(name, durable, autoDelete)

// 隊列是持久化
// 參數(shù)1 name :隊列名
// 參數(shù)2 durable :是否持久化
// 參數(shù)3 exclusive :僅創(chuàng)建者可以使用的私有隊列,斷開后自動刪除
// 參數(shù)4 autoDelete : 當(dāng)所有消費客戶端連接斷開后牍白,是否自動刪除隊列
new Queue(name, durable, exclusive, autoDelete);

springAMQP  的消息持久化是默認(rèn)的

消費者端的手工確認(rèn)

如果一直不回ack脊凰,mq會block 這個消費者

      @Bean
    SimpleMessageListenerContainer container() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueNames(QUEUE_NAME);
        //設(shè)定單次可分發(fā)給消費則的消息個數(shù)
        container.setPrefetchCount(1);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new ChannelAwareMessageListener() {

            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                try {
                    log.info("receive msg: " + new String(body));
                    //do something
                } catch (Exception e) {
                } finally {
//                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認(rèn)消息成功消費
                }

            }

        });
        return container;
    }

springAMQP 提供的確認(rèn)方式

很明顯上述代碼提供的手工確認(rèn)方式(使用ChannelAwareMessageListener)很不優(yōu)雅,你需要創(chuàng)建多個bean 然后綁定queue茂腥。
當(dāng)setDefaultRequeueRejected(true) (默認(rèn)情況下)狸涌,如果消息被正常消費,container 會ack最岗,然后隊列刪除消息帕胆,如果消費者拋出異常,container會reject這個消息般渡,然后這個消息會requeue到原來的消息隊列懒豹,如果業(yè)務(wù)一直處在這個異常情況下,requeue的消息會再次回到消費者驯用,然后死循環(huán)脸秽,這種情況很顯然不行,spring AMQP 提供的替代方式:listener拋出AmqpRejectAndDontRequeueException晨汹,則這個消息會被拋棄豹储,或者進(jìn)入死信隊列,Listener拋出AmqpRejectAndDontRequeueException還可以通過配置factory 的ErrorHandler 把你拋出的異常 轉(zhuǎn)換為AmqpRejectAndDontRequeueException,如下式例淘这,如果你的listener 拋出了XMLException 則這個消息會被discard(在沒有配置死信隊列的情況下)剥扣。

factory.setErrorHandler(new ConditionalRejectingErrorHandler(
                t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof XMLException));

factory.setDefaultRequeueRejected(false); 則只要listener 拋出異常巩剖,message就會被discard或者轉(zhuǎn)入死信隊列,如果需要針對不同的異常(比如可短時間內(nèi)恢復(fù)的異常钠怯,需要重入原隊列佳魔,不可恢復(fù)的異常discard 或者入死信隊列)建議設(shè)置成true,然后配置ErrorHandler 如上

springAMQP 如何配置死信隊列

當(dāng)然你可以通過創(chuàng)建一個死信隊列晦炊,然后在listener端消費時重新發(fā)送到死信隊列鞠鲜,但springAMQP 提供了更好的方式如下

@Bean
TopicExchange exchange()
{
    return new TopicExchange(DEFAULT_EXCHANGE);
}

@Bean
Queue deadLetterQueue()
{
    return new Queue(DEAD_LETTER_QUEUE,true);
}

@Bean
Queue queue()
{
    // 通過args參數(shù)為當(dāng)前隊列綁定一個死信隊列
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange", DEFAULT_EXCHANGE);
    args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE);
    return new Queue(WORKORDER_QUEUE,true,false,false,args);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange)
{
    return BindingBuilder.bind(queue).to(exchange).with(WORKORDER_QUEUE);
}

@Bean
Binding bindingDeadLetter(Queue deadLetterQueue, TopicExchange exchange)
{
    return BindingBuilder.bind(deadLetterQueue).to(exchange).with(DEAD_LETTER_QUEUE);
}



消費者拋出AmqpRejectAndDontRequeueException 異常時則會進(jìn)入死信隊列


  @RabbitListener(queues = RabbitConfig.WORKORDER_QUEUE)
    public void processMessage(String msg) throws Exception
    {
        
            throw new AmqpRejectAndDontRequeueException("to dead-letter");
        
    }

死信隊列的消費者

@Service
public class ErrorHandler {
    @RabbitListener(queues = "dead_queue", containerFactory = "myContainerFactory")
    public void handleError(Object message){
        System.out.println("XXXXXXX"+message);
    }
}

其他高級主題

rabbit mq的消息確認(rèn)機(jī)制(包括producer到broker 和broker 到 consumer的確認(rèn)),集群等等。

參考

https://www.rabbitmq.com/getstarted.html
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/spring-amqp
https://docs.spring.io/spring-amqp/reference/html/
http://blog.720ui.com/2017/springboot_06_mq_rabbitmq/
http://www.cnblogs.com/xingzc/p/5945030.html
https://www.cnblogs.com/diegodu/p/4971586.html
http://blog.csdn.net/column/details/rabbitmq.html
http://blog.csdn.net/u013256816/article/category/6532725/1

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末断国,一起剝皮案震驚了整個濱河市贤姆,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌稳衬,老刑警劉巖霞捡,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異薄疚,居然都是意外死亡碧信,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門街夭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來砰碴,“玉大人,你說我怎么就攤上這事板丽〕释鳎” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵檐什,是天一觀的道長碴卧。 經(jīng)常有香客問我,道長乃正,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任婶博,我火速辦了婚禮瓮具,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘凡人。我一直安慰自己名党,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布挠轴。 她就那樣靜靜地躺著传睹,像睡著了一般。 火紅的嫁衣襯著肌膚如雪岸晦。 梳的紋絲不亂的頭發(fā)上欧啤,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天睛藻,我揣著相機(jī)與錄音,去河邊找鬼邢隧。 笑死店印,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的倒慧。 我是一名探鬼主播按摘,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼纫谅!你這毒婦竟也來了炫贤?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤付秕,失蹤者是張志新(化名)和其女友劉穎照激,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體盹牧,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡俩垃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了汰寓。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片口柳。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖有滑,靈堂內(nèi)的尸體忽然破棺而出跃闹,到底是詐尸還是另有隱情,我是刑警寧澤毛好,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布望艺,位于F島的核電站,受9級特大地震影響肌访,放射性物質(zhì)發(fā)生泄漏找默。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一吼驶、第九天 我趴在偏房一處隱蔽的房頂上張望惩激。 院中可真熱鬧,春花似錦蟹演、人聲如沸风钻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽骡技。三九已至,卻和暖如春羞反,著一層夾襖步出監(jiān)牢的瞬間布朦,已是汗流浹背囤萤。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留喝滞,地道東北人阁将。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像右遭,于是被迫代替她去往敵國和親做盅。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,979評論 2 355