RabbitMQ基礎(chǔ)知識(shí)

RabbitMQ簡(jiǎn)介

RabbitMQ是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,用來通過普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù)冈敛,RabbitMQ是使用Erlang語言來編寫的,并且RabbitMQ是給予AMQP協(xié)議(Advanced Message Queuing Protocol 高級(jí)消息隊(duì)列協(xié)議僵刮,是具有現(xiàn)代特征的二進(jìn)制協(xié)議涂臣。是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn)羹幸,為面向消息的中間件設(shè)計(jì))的脊髓。

AMQP核心概念

  1. Server:又稱Broker,接收客戶端的連接栅受,實(shí)現(xiàn)AMQP實(shí)體服務(wù)将硝;
  2. Connection:連接,應(yīng)用程序和Broker之間的網(wǎng)絡(luò)連接窘疮;
  3. Channel:網(wǎng)絡(luò)信道袋哼,幾乎所有的操作都是在Channel中進(jìn)行的,Channel是進(jìn)行消息讀寫的通道闸衫√喂幔客戶端可以建立多個(gè)Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)蔚出,有點(diǎn)類似于數(shù)據(jù)中的session弟翘;
  4. Message:消息,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)骄酗,由Properties和Body組成稀余。Properties可以對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí)趋翻、延遲等高級(jí)特性睛琳,Body則就是消息體內(nèi)容;
  5. Virtual Host:虛擬主機(jī)踏烙,用于進(jìn)行邏輯隔離师骗,最上層的消息路由。一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue讨惩,同一個(gè)Virtual Host里面不能有相同名稱的Exchange或Queue辟癌,有點(diǎn)類似于Redis中的16個(gè)db,是邏輯層面的隔離荐捻;
  6. Exchange:交換機(jī)黍少,接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列(Producer生產(chǎn)消息后都是直接投遞到Exchange中)处面;
  7. Binding:Exchange和Queue之間的虛擬連接厂置,binding中可以包含routing key;
  8. Routing Key:一個(gè)路由規(guī)則魂角,虛擬機(jī)可以用它來確定如何路由一個(gè)特定的消息昵济;
  9. Queue:也被稱為Message Queue,消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者砸紊。

RabbitMQ架構(gòu)圖

RabbitMQ架構(gòu)圖.jpeg

Producer生產(chǎn)消息之后直接將消息投遞到Exchange中传于,在投遞的時(shí)候需要指定兩個(gè)重要的信息,一個(gè)是消息需要被投遞到哪個(gè)Exchange上醉顽,另一個(gè)是Routing Key沼溜,也就是將消息路由到哪個(gè)Message Queue上。

RabbitMQ安裝

參考官網(wǎng)的安裝游添,已經(jīng)非常詳細(xì)了系草,官網(wǎng)推薦的安裝是將RabbitMQ和Erlang一起安裝了,如果要單獨(dú)安裝的話唆涝,需要注意RabbitMQ和Erlang之間的版本需要對(duì)應(yīng)找都。
https://www.rabbitmq.com/install-rpm.html

RabbitMQ基本使用

  1. 服務(wù)的啟動(dòng):rabbitmq-server start &
  2. 服務(wù)的停止:rabbitmqctl stop_app
  3. 管理插件:rabbitmq-plugins enable rabbitmq_management(啟動(dòng)管控臺(tái)插件,方便圖形化管理rabbitmq)
  4. 訪問地址:http://localhost:15672

RabbitMQ常用命令-基礎(chǔ)操作

  1. rabbitmqctl stop_app: 關(guān)閉應(yīng)用
  2. rabbitmqctl start_app: 啟動(dòng)應(yīng)用
  3. rabbitmqctl status: 查看節(jié)點(diǎn)狀態(tài)
  4. rabbitmqctl add_user username password: 添加用戶
  5. rabbitmqctl list_users: 列出所有用戶
  6. rabbitmqctl delete_user username: 刪除用戶
  7. rabbitmqctl clear_permissions -p vhostpath username: 清除用戶權(quán)限
  8. rabbitmqctl list_user_permissions username: 列出用戶權(quán)限
  9. rabbitmqctl change_password username newpassword: 修改密碼
  10. rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*": 設(shè)置用戶權(quán)限(權(quán)限分別為configure write read廊酣,也就是可以配置能耻、可寫、可讀)
  11. rabbitmqctl add_vhost vhostpath: 創(chuàng)建虛擬主機(jī)
  12. rabbitmqctl list_vhosts: 列出所有虛擬主機(jī)
  13. rabbitmqctl list_permissions -p vhostpath: 列出虛擬主機(jī)上所有權(quán)限
  14. rabbitmqctl list_queues: 查看所有隊(duì)列信息
  15. rabbitmqctl -p vhostpath purge_queue blue: 清楚隊(duì)列中的消息

RabbitMQ常用命令-高級(jí)操作

  1. rabbitmqctl reset: 移除所有數(shù)據(jù)亡驰,要在rabbitmqctl stop_app之后使用
  2. rabbitmqctl join_cluster <clusternode> [--ram]: 組成集群命令
  3. rabbitmqctl change_cluster_node_type <clusternode> disc | ram: 修改集群節(jié)點(diǎn)的存儲(chǔ)形式晓猛,disc為磁盤存儲(chǔ),消息數(shù)據(jù)是存儲(chǔ)在磁盤上的凡辱,可靠性高戒职,但是持久化時(shí)間長,ram是內(nèi)存存儲(chǔ)透乾,消息是存儲(chǔ)在內(nèi)存中洪燥,性能好,但是可能存在丟失
  4. rabbitmqctl forget_cluster_node [--offline]: 忘記節(jié)點(diǎn)(摘除節(jié)點(diǎn))
  5. rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2]...: 修改節(jié)點(diǎn)名稱

生產(chǎn)者消費(fèi)者模型構(gòu)建

  1. 創(chuàng)建好一個(gè)SpringBoot或者Spring或者普通的Java項(xiàng)目
  2. 安裝RabbitMQ相關(guān)依賴
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>
public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory乳乌,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 通過Channel發(fā)送數(shù)據(jù)
        /*
         * basicPublish的四個(gè)參數(shù)為別為:
         * exchange: 交換機(jī)捧韵,如果為空的,routingKey的規(guī)則就是routingKey需要和消息隊(duì)列的名稱一樣钦扭,不然就發(fā)送失敗
         * routingKey: 路由規(guī)則
         * properties: 消息的額外修飾
         * body: 消息體纫版,也就是消息的主要內(nèi)容
         */
        for (int i = 0; i < 5; i++) {
            String msg = "Hello, RabbitMQ!";
            channel.basicPublish("", "test001", null, msg.getBytes());
        }

        // 5. 關(guān)閉連接
        channel.close();
        connection.close();
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory床绪,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明一個(gè)隊(duì)列
        /*
         * queueDeclare方法的五個(gè)參數(shù)
         * queue: 隊(duì)列的名稱
         * durable: 是否是持久化客情,也就是RabbitMQ服務(wù)重啟之后消息隊(duì)列是否被保存,為true就是持久化癞己,服務(wù)重啟消息隊(duì)列不會(huì)被刪除
         * exclusive: 是否獨(dú)占膀斋,有點(diǎn)類似于獨(dú)占鎖
         * autoDelete: 是否開啟自動(dòng)刪除,也就是當(dāng)該消息隊(duì)列沒有被綁定到任何一個(gè)Exchange上時(shí)是否自動(dòng)刪除
         * arguments: 額外的參數(shù)
         */
        String queueName = "test001";
        channel.queueDeclare(queueName, true, true, false, null);

        // 5. 創(chuàng)建消費(fèi)者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6. 設(shè)置Channel
        /*
         * basicConsume的三個(gè)參數(shù)的函數(shù)
         * queue: 隊(duì)列的名稱
         * autoAck: 是否自動(dòng)簽收痹雅,為true表示當(dāng)Consumer收到消息之后自動(dòng)發(fā)送ACK確定給Broker
         * callback: 指定消費(fèi)者
         */
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7. 獲取消息
        while (true) {
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消費(fèi)端:" + msg);
        }
    }
}

交換機(jī)Exchange詳解

交換機(jī)屬性

  1. Name:交換機(jī)名稱
  2. Type:交換機(jī)類型仰担,大致有direct、topic绩社、fanout摔蓝、headers四種
  3. Durability:是否需要持久化赂苗,true為持久化
  4. AutoDelete:當(dāng)最后一個(gè)綁定到Exchange上的隊(duì)列被刪除后,是否自動(dòng)刪除該Exchange
  5. Internal:當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用贮尉,默認(rèn)為false
  6. Arguments:擴(kuò)展參數(shù)拌滋,用于擴(kuò)展AMQP協(xié)議定制化使用

交換機(jī)類型 - Direct Exchange

所有發(fā)送到Direct Exchange上的消息都會(huì)被轉(zhuǎn)發(fā)到RoutingKey中指定的Queue中,在Direct模式下可以使用RabbitMQ自帶的Exchange:default Exchange猜谚,所以不需要將Exchange進(jìn)行任何綁定(binding)操作(默認(rèn)的RoutingKey就是隊(duì)列的名稱)败砂,消息傳遞時(shí),RoutingKey必須完全匹配(名稱完全一樣魏铅,不支持模糊匹配)才會(huì)被隊(duì)列接收昌犹,否則該消息會(huì)被拋棄。

public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory览芳,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange的名稱和RoutingKey
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";

        // 5. 發(fā)送消息
        String msg = "Hello RabbitMQ - Direct Exchange Message...";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

        // 6. 關(guān)閉連接
        channel.close();
        connection.close();
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory斜姥,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange、Queue沧竟、RoutingKey
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, true, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 創(chuàng)建消費(fèi)者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6. 設(shè)置Channel
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7. 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消費(fèi)端:" + msg);
        }
    }
}

交換機(jī)類型 - Topic Exchange

所有發(fā)送到Topic Exchange上的消息被轉(zhuǎn)發(fā)到所有關(guān)系RoutingKey中指定Topic的Queue中疾渴,Exchange將RoutingKey和某個(gè)Topic進(jìn)行模糊匹配,此時(shí)隊(duì)列需要綁定一個(gè)Topic屯仗。
上面這句話有點(diǎn)拗口搞坝,其實(shí)簡(jiǎn)單來說,就是當(dāng)Exchange的類型為topic時(shí)魁袜,RoutingKey是一組規(guī)則(不再僅僅表示一個(gè)規(guī)則桩撮,Direct Exchange中的RoutingKey就是一個(gè)規(guī)則,Producer傳遞的RoutingKey必須和Exchange中的RoutingKey名稱完全一致才能發(fā)送成功)峰弹,通過這組規(guī)則可以將多個(gè)RoutingKey和一個(gè)Queue進(jìn)行關(guān)聯(lián)店量,只要滿足RoutingKey的規(guī)則就會(huì)被路由到相關(guān)的隊(duì)列中(比如RoutingKey為log.#,只要符合這個(gè)規(guī)則的消息都會(huì)被路由到相關(guān)隊(duì)列中)鞠呈。
在制定RoutingKey時(shí)可以使用通配符進(jìn)行模糊匹配融师,符號(hào)#表示匹配一個(gè)或多個(gè)詞,*表示匹配一個(gè)詞(注意這里是詞蚁吝,而不是字符)旱爆,比如log.#可以匹配到log.info.oalog.*只能匹配到log.info窘茁,是匹配不到log.info.oa

public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory怀伦,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange的名稱和RoutingKey
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "log.info.oa";
        String routingKey2 = "log.error";
        String routingKey3 = "log.debug";

        // 5. 發(fā)送消息
        String msg = "Hello RabbitMQ - Topic Exchange Message...";
        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());

        // 6. 關(guān)閉連接
        channel.close();
        connection.close();
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange山林、Queue房待、RoutingKey
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        // String routingKey = "log.*";
        String routingKey = "log.#";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, true, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 創(chuàng)建消費(fèi)者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6. 設(shè)置Channel
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7. 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消費(fèi)端:" + msg);
        }
    }
}

交換機(jī)類型 - Fanout Exchange

該種交換機(jī)類型是不會(huì)處理RoutingKey的,只會(huì)簡(jiǎn)單地將隊(duì)列綁定到交換機(jī)上,發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上桑孩,F(xiàn)anout Exchange是轉(zhuǎn)發(fā)消息最快的拜鹤,因?yàn)椴粫?huì)處理路由相關(guān)的操作,即使指定了RoutingKey也不會(huì)理會(huì)

public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory流椒,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange的名稱和RoutingKey
        String exchangeName = "test_fanout_exchange";
        // 指定了RoutingKey也沒有作用
        String routingKey = "log.debug";

        // 5. 發(fā)送消息
        String msg = "Hello RabbitMQ - Fanout Exchange Message...";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

        // 6. 關(guān)閉連接
        channel.close();
        connection.close();
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory署惯,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange、Queue镣隶、RoutingKey
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "test";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, true, true, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 創(chuàng)建消費(fèi)者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6. 設(shè)置Channel
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7. 獲取消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消費(fèi)端:" + msg);
        }
    }
}

綁定极谊、隊(duì)列、消息安岂、虛擬主機(jī)詳解

綁定Binding

是指Exchange和Exchange轻猖、Exchange和Queue之間的連接關(guān)系

隊(duì)列

是指消息隊(duì)列,實(shí)際存儲(chǔ)消息數(shù)據(jù)的域那。包含一些屬性咙边,比如Durability表示是否持久化,Durable就是持久化次员,Transient表示不持久化败许;Autodelete表示當(dāng)最后一個(gè)監(jiān)聽被移除后,該Queue是否被自動(dòng)刪除淑蔚。

Message

是指服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)市殷,本質(zhì)上就是一段數(shù)據(jù),由Properties和Payload(Body)組成刹衫,也包含一些屬性醋寝,比如delivery modeheaders(自定義屬性)带迟、content_type音羞、content_encodingpriority仓犬、correlation_id嗅绰、reply_toexpiration搀继、message_id窘面、timestamptype律歼、user_id民镜、app_id啡专、cluster_id险毁。

如何發(fā)送攜帶Properties的Message呢?

public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 通過Channel發(fā)送數(shù)據(jù)

        Map<String, Object> headers = new HashMap<>();
        headers.put("name", "snow");
        headers.put("sex", "man");
        
        // 設(shè)置Properties
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .expiration("15000")
                .contentEncoding("UTF-8")
                .headers(headers)
                .build();

        for (int i = 0; i < 5; i++) {
            String msg = "Hello, RabbitMQ!";
            channel.basicPublish("", "test001", properties, msg.getBytes());
        }

        // 5. 關(guān)閉連接
        channel.close();
        connection.close();
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory畔况,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明一個(gè)隊(duì)列
        String queueName = "test001";
        channel.queueDeclare(queueName, true, false, false, null);

        // 5. 創(chuàng)建消費(fèi)者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6. 設(shè)置Channel
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7. 獲取消息
        while (true) {
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            Map<String, Object> headers = delivery.getProperties().getHeaders();
            System.out.println("消費(fèi)端:" + msg);
            System.out.println(headers.get("name"));
        }
    }
}

RabbitMQ高級(jí)特性

消息如何保證100%的投遞成功方案-1

什么是生產(chǎn)端的可靠性投遞?

  1. 保障消息的成功發(fā)出
  2. 保障MQ節(jié)點(diǎn)的成功接收
  3. 發(fā)送端收到MQ節(jié)點(diǎn)(Broker)的確認(rèn)應(yīng)答
  4. 完善的消息補(bǔ)償機(jī)制(也就是消息投遞失敗或者未收到Broker的確認(rèn)應(yīng)答的補(bǔ)償措施)

消息可靠性投遞的解決方案

  1. 消息落庫,對(duì)消息狀態(tài)進(jìn)行打標(biāo)
  2. 消息的延遲投遞季稳,做二次確認(rèn)麸俘,回調(diào)檢查

[圖片上傳失敗...(image-d40d42-1636988003182)]

  1. Producer端首先將業(yè)務(wù)信息入庫,同時(shí)創(chuàng)建一條消息入庫吵瞻,設(shè)置消息的status為0(表示消息已經(jīng)投遞)
  2. Producer端生成一條消息Message投遞到Broker
  3. Broker收到消息之后葛菇,發(fā)送確認(rèn)Confirm返回給Producer
  4. Producer收到Broker發(fā)送過來的Confirm之后,就將消息數(shù)據(jù)庫中消息的狀態(tài)為1(表示消息已經(jīng)投遞成功)
  5. 因?yàn)椴襟E2和步驟3都有可能發(fā)生故障橡羞,也就是消息投遞失敗眯停,或者網(wǎng)絡(luò)等原因造成Producer未收到Broker發(fā)送過來的Confirm消息,所以需要開啟一個(gè)分布式定時(shí)任務(wù)從消息數(shù)據(jù)庫中抓取status為0的消息
  6. 將抓取出來的status為0的消息重新投遞給Broker卿泽,重復(fù)上述動(dòng)作
  7. 因?yàn)樵跇O端狀況下有些消息可能就是會(huì)投遞失敗莺债,不能無休止地重新投遞,可以設(shè)置一個(gè)投遞上限签夭,比如最大重新投遞次數(shù)為3齐邦,如果3次投遞均失敗,就將消息數(shù)據(jù)庫中的消息狀態(tài)設(shè)置為3第租,之后再建立補(bǔ)償措施來對(duì)status為3的消息進(jìn)行處理

缺點(diǎn):由于在最開始進(jìn)行了兩次入庫的操作措拇,所以在高并發(fā)的情況下其實(shí)會(huì)有性能上的問題。

消息如何保證100%的投遞成功方案-2

[圖片上傳失敗...(image-7aeb0b-1636988003182)]

  1. Producer端首先對(duì)業(yè)務(wù)消息進(jìn)行入庫慎宾,然后同時(shí)生成兩條相同的消息儡羔,一條消息立即發(fā)出,另一條消息延遲一段時(shí)間再次發(fā)出
  2. Consumer端對(duì)消息隊(duì)列進(jìn)行監(jiān)聽璧诵,從中取出消息進(jìn)行消費(fèi)汰蜘,在消費(fèi)完一條消息之后,需要向Broker發(fā)送一個(gè)消費(fèi)確認(rèn)Confirm之宿,表示該條消息已被消費(fèi)
  3. Callback Service對(duì)Consumer端發(fā)送的消費(fèi)確認(rèn)消息進(jìn)行監(jiān)聽族操,如果收到了Consumer端發(fā)送過來的消費(fèi)確認(rèn),就將消息數(shù)據(jù)庫中的消息進(jìn)行入庫
  4. 同時(shí)Callback還會(huì)對(duì)Producer端發(fā)送的另一條延遲消息進(jìn)行監(jiān)聽比被,如果收到了Producer發(fā)送過來的延遲消息色难,就從消息數(shù)據(jù)庫中查詢?cè)摋l消息是否已被消費(fèi),如果查詢不到或者消息消費(fèi)失敗等缀,Callback Service就通知Producer進(jìn)行消息重發(fā)

優(yōu)點(diǎn):由于最開始只是進(jìn)行了一次入庫的操作枷莉,性能得到了較大的提升,而Callback Service是一個(gè)補(bǔ)償措施尺迂,對(duì)業(yè)務(wù)的性能并不會(huì)產(chǎn)生實(shí)際的影響

具體的實(shí)現(xiàn)請(qǐng)參考:RabbitMQ之消息可靠性投遞實(shí)現(xiàn)

冪等性概念及業(yè)界主流解決方案

什么是冪等性笤妙?
通俗來說冒掌,就是假如我們要對(duì)一件事進(jìn)行操作,這個(gè)操作可能重復(fù)進(jìn)行100次或者1000次蹲盘,那么無論操作多少次股毫,這些操作的結(jié)果都是一樣的,就像數(shù)據(jù)庫中的樂觀鎖機(jī)制召衔,比如我們多個(gè)線程同時(shí)更新庫存的SQL語句铃诬,不采用樂觀鎖的機(jī)制的話可能會(huì)存在線程安全問題導(dǎo)致數(shù)據(jù)不一致,update sku set count = count - 1, version = version + 1 where version = 1苍凛,加上一個(gè)樂觀鎖來保證線程安全趣席,當(dāng)然樂觀鎖的背后采用的原理是CAS(CompareAndSwap,也就是先比較然后再替換醇蝴,保證操作的原子性)吩坝。

在海量訂單產(chǎn)生的業(yè)務(wù)高峰期,如何避免消息的重復(fù)消費(fèi)問題哑蔫?
在業(yè)務(wù)高峰期钉寝,可能會(huì)存在網(wǎng)絡(luò)原因或者其他原因?qū)е翽roducer端的消息重發(fā),消費(fèi)端要實(shí)現(xiàn)冪等性闸迷,就意味著我們的消息永遠(yuǎn)不會(huì)消費(fèi)多次嵌纲,即使我們收到了多條一樣的消息,解決方案大致有兩種:

  1. 唯一ID + 指紋碼 機(jī)制腥沽,利用數(shù)據(jù)庫主鍵去重
  2. 利用Redis的原子性去實(shí)現(xiàn)

唯一ID + 指紋碼 機(jī)制

  1. 唯一ID + 指紋碼 機(jī)制逮走,利用數(shù)據(jù)庫進(jìn)行主鍵去重
  2. select count(1) from order where id = 唯一ID + 指紋碼,在消費(fèi)的時(shí)候先進(jìn)行查詢今阳,如果查詢結(jié)果為1的話就表示已經(jīng)被消費(fèi)過了就不再重復(fù)進(jìn)行消費(fèi)了师溅,沒有查詢出結(jié)果的話就說明沒有被消費(fèi),就進(jìn)行數(shù)據(jù)庫的入庫
  3. 好處:實(shí)現(xiàn)簡(jiǎn)單
  4. 壞處:高并發(fā)下有數(shù)據(jù)庫寫入的性能瓶頸
  5. 解決方案:根據(jù)ID進(jìn)行分庫分表盾舌,進(jìn)行算法路由墓臭,比如對(duì)ID進(jìn)行路由算法路由到不同的數(shù)據(jù)庫中,分?jǐn)傉麄€(gè)數(shù)據(jù)流量的壓力

利用Redis原子特性實(shí)現(xiàn)

  1. 使用Redis實(shí)現(xiàn)消費(fèi)端的冪等妖谴,有幾個(gè)需要考慮的問題
  2. 第一:是否要進(jìn)行數(shù)據(jù)庫入庫的操作窿锉,如果要入庫的話,如何使得數(shù)據(jù)庫和緩存的入庫做到原子性膝舅,也就是如何實(shí)現(xiàn)數(shù)據(jù)庫和緩存的數(shù)據(jù)一致性嗡载,因?yàn)橛锌赡艹霈F(xiàn)這樣的情況,redis中保存了該order的數(shù)據(jù)仍稀,但是在保存到數(shù)據(jù)庫的時(shí)候出現(xiàn)了問題洼滚,導(dǎo)致數(shù)據(jù)庫中沒有保存成功,然后如何保證數(shù)據(jù)準(zhǔn)確地被同時(shí)保存在數(shù)據(jù)庫中呢技潘?
  3. 第二:如果不進(jìn)行數(shù)據(jù)庫入庫的話遥巴,那么都存儲(chǔ)到緩存redis中千康,又如何設(shè)置定時(shí)同步的策略呢,因?yàn)閿?shù)據(jù)不可能一直保存在redis中挪哄,而且就算一直保存在redis中吧秕,redis服務(wù)也有可能會(huì)出現(xiàn)問題琉闪,這也是需要重點(diǎn)考慮的問題

Confirm確認(rèn)消息詳解

什么是Confirm消息確認(rèn)機(jī)制迹炼?
消息的確認(rèn),是指Producer投遞消息后颠毙,如果Broker收到消息斯入,則會(huì)給我們Producer一個(gè)應(yīng)答,Producer進(jìn)行接收應(yīng)答蛀蜜,用來確定這條消息是否正常地發(fā)送到了Broker刻两,這種方式也是消息的可靠性投遞的核心保障。

如何實(shí)現(xiàn)Confirm確認(rèn)消息滴某?

  1. 在channel上開啟確認(rèn)模式:channel.confirmSelect()
  2. 在channel上添加監(jiān)聽:addConfirmListener磅摹,監(jiān)聽成功或者失敗的返回結(jié)果,根據(jù)具體的結(jié)果對(duì)消息進(jìn)行重新發(fā)送或者日志記錄等后續(xù)處理
public class Producer {
    public static void main(String[] args) throws Exception {

        // 1. 創(chuàng)建一個(gè)ConnectionFactory霎奢,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 指定消息投遞模式:消息的確認(rèn)模式
        channel.confirmSelect();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";

        // 5. 發(fā)送消息
        String msg = "Hello RabbitMQ! Send a confirm message.";
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

        // 6. 添加一個(gè)確認(rèn)監(jiān)聽
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("------ACK!------");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("------NO ACK!------");
            }
        });
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建一個(gè)ConnectionFactory户誓,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange、Queue幕侠、RoutingKey
        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.*";
        String queueName = "test_confirm_queue";
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 創(chuàng)建消費(fèi)者消費(fèi)消息
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消費(fèi)端: " + msg);
        }

    }
}

Return返回消息詳解

什么是Return返回消息機(jī)制帝美?
ReturnListener用于處理一些不可路由的消息,Producer生產(chǎn)一條消息之后晤硕,通過指定一個(gè)Exchange和RoutingKey悼潭,將消息送達(dá)到某一個(gè)隊(duì)列中去,然后Consumer監(jiān)聽隊(duì)列舞箍,進(jìn)行消息的消費(fèi)處理操作舰褪,但是在某些情況下,Producer在投遞消息的時(shí)候疏橄,指定的Exchange不存在或者RoutingKey路由不到抵知,就說明消息投遞失敗,這個(gè)時(shí)候如果需要監(jiān)聽這種不可達(dá)的消息软族,就需要使用ReturnListener刷喜。
在使用ReturnListener的基礎(chǔ)API時(shí)有一個(gè)關(guān)鍵的配置項(xiàng)是Mandatory,該參數(shù)為true立砸,則ReturnListener會(huì)接收到路由不可達(dá)的消息掖疮,然后進(jìn)行后續(xù)的處理,如果為false颗祝,那么Broker端會(huì)自動(dòng)刪除該消息浊闪,ReturnListener是監(jiān)聽不到的恼布。

public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "return.save";
        String routingKeyError = "snow.save";

        String msg = "Hello RabbitMQ! Send a Return message.";
        boolean mandatory = true;
        channel.basicPublish(exchangeName, routingKeyError, mandatory, null, msg.getBytes());

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("----handle return----");
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
                System.out.println("properties: " + properties);
                System.out.println("body: " + new String(body));
            }
        });
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange搁宾、Queue折汞、RoutingKey
        String exchangeName = "test_return_exchange";
        String routingKey = "return.*";
        String queueName = "test_return_queue";
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 創(chuàng)建消費(fèi)者消費(fèi)消息
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消費(fèi)端: " + msg);
        }

    }
}

自定義消費(fèi)者使用

如何自定義消費(fèi)者進(jìn)行消息消費(fèi)?
在之前盖腿,我們都是采用默認(rèn)的QueueingConsumer來創(chuàng)建一個(gè)消費(fèi)者爽待,之后再使用while循環(huán)來不停地取出消息,但是這種方式不是特別好翩腐,一般我們會(huì)自定義自己的Consumer鸟款,那么要實(shí)現(xiàn)自定義的Consumer有兩種方式,一種是實(shí)現(xiàn)Consumer的接口茂卦,但是這種實(shí)現(xiàn)方式需要重寫很多方法何什,另一種是繼承DefaultConsumer,重寫其中的

public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.save";

        String msg = "Hello RabbitMQ! Send a Consumer message.";
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        }
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory等龙,并且進(jìn)行相關(guān)連接配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange处渣、Queue、RoutingKey
        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.*";
        String queueName = "test_consumer_queue";
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 創(chuàng)建消費(fèi)者消費(fèi)消息
        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}
public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("----my consumer handle delivery----");
        System.out.println("consumerTag: " + consumerTag);
        System.out.println("envelope: " + envelope);
        System.out.println("properties: " + properties);
        System.out.println("body: " + new String(body));
    }
}

消費(fèi)端的限流策略

什么是消費(fèi)端的限流蛛砰?
假設(shè)一個(gè)場(chǎng)景罐栈,就是我們的RabbitMQ服務(wù)器有上萬條未處理的消息,此時(shí)如果我們隨便打開一個(gè)消費(fèi)者客戶端暴备,會(huì)出現(xiàn)下面的情況悠瞬,就是巨量的消息瞬間全部推送過來,但是我們的單個(gè)客戶端無法同時(shí)處理這么多數(shù)據(jù)涯捻,就有可能造成服務(wù)器崩潰浅妆。
RabbitMQ提供了一種qos(Quality of Service 服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息(autoAck為false)的前提下障癌,如果一定數(shù)目的消息(通過基于Consumer或者channel設(shè)置的Qos的值)未被確認(rèn)前凌外,不進(jìn)行消費(fèi)新的消息。
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)中的prefetchSize表示單個(gè)消息的大小涛浙,為0表示不限制單個(gè)消息的大小康辑,prefetchCount會(huì)告訴RabbitMQ不要同時(shí)給一個(gè)消費(fèi)者推送超過N個(gè)消息,即一旦有N個(gè)消息還沒有Ack轿亮,則該Consumer就將block阻塞住疮薇,直到有消息被Ack,global表示是否將前兩個(gè)參數(shù)的設(shè)置應(yīng)用于channel我注,簡(jiǎn)單點(diǎn)說就是前兩個(gè)限制是channel級(jí)別還是Consumer級(jí)別的按咒,一般設(shè)置為false,表示Consumer級(jí)別(prefetchCount只在autoAck為false的情況下才會(huì)生效但骨,在自動(dòng)Ack的情況下是無效的)

public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String routingKey = "qos.save";

        String msg = "Hello RabbitMQ! Send a QOS message.";
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        }
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange励七、Queue智袭、RoutingKey
        String exchangeName = "test_qos_exchange";
        String routingKey = "qos.*";
        String queueName = "test_qos_queue";
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 5. 限流,記得將basicConsume方法中的autoAck的值設(shè)置為false
        channel.basicQos(0, 1, false);

        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}
public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("----my consumer handle delivery----");
        System.out.println("consumerTag: " + consumerTag);
        System.out.println("envelope: " + envelope);
        System.out.println("properties: " + properties);
        System.out.println("body: " + new String(body));

        // 消息的Ack確認(rèn)掠抬,basicAck的第一個(gè)參數(shù)為消息的deliveryTag吼野,第二個(gè)參數(shù)為是否批量簽收,如果限制的消息個(gè)數(shù)大于1两波,可以設(shè)置為true
        this.channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

消費(fèi)端ACK與重回隊(duì)列機(jī)制

消費(fèi)端的手工ACK和NACK為什么會(huì)存在瞳步?

  1. 消費(fèi)端在進(jìn)行消息消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄雨女,然后進(jìn)行補(bǔ)償谚攒,如果采用自動(dòng)ACK的話就達(dá)不到需求
  2. 如果由于服務(wù)器宕機(jī)等嚴(yán)重問題阳准,我們也需要手工進(jìn)行ACK來保障消費(fèi)端消費(fèi)成功氛堕,因?yàn)橄M(fèi)者宕機(jī)后,Broker收不到ACK或者NACK野蝇,就會(huì)重新發(fā)送消息給消費(fèi)端再次消費(fèi)讼稚,因?yàn)樵谧詣?dòng)ACK的機(jī)制下Broker發(fā)送消息給消費(fèi)者時(shí),自動(dòng)確認(rèn)消息被處理完畢

消費(fèi)端的重回隊(duì)列機(jī)制

  1. 消費(fèi)端重回隊(duì)列是為了將沒有處理成功的消息重新投遞給Broker
  2. 一般在實(shí)際應(yīng)用中绕沈,都會(huì)關(guān)閉重回隊(duì)列锐想,也就是將requeue設(shè)置為false
public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.save";

        for (int i = 0; i < 5; i++) {

            Map<String, Object> headers = new HashMap<>();
            headers.put("num", i);

            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();

            String msg = "Hello RabbitMQ! Send a ACK message." + i;
            channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
        }
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange、Queue乍狐、RoutingKey
        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.*";
        String queueName = "test_ack_queue";
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 將autoAck設(shè)置為false赠摇,手工Ack確認(rèn)
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}
public class MyConsumer extends DefaultConsumer {

    private final Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("----my consumer handle delivery----");
        System.out.println("body: " + new String(body));

        if ((Integer) properties.getHeaders().get("num") == 0) {
            // 第三個(gè)參數(shù)requeue表示是否重回隊(duì)列
            this.channel.basicNack(envelope.getDeliveryTag(), false, false);
        } else {
            // 消息的Ack確認(rèn),basicAck的第一個(gè)參數(shù)為消息的deliveryTag浅蚪,第二個(gè)參數(shù)為是否批量簽收藕帜,如果限制的消息個(gè)數(shù)大于1,可以設(shè)置為true
            this.channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
}

TTL消息詳解

  1. TTL 是Time To Live的縮寫惜傲,也就是生存時(shí)間
  2. RabbitMQ支持消息的過期時(shí)間洽故,在消息發(fā)送的時(shí)候可以再Properties中指定expiration過期時(shí)間
  3. RabbitMQ支持隊(duì)列的過期時(shí)間,從消息入隊(duì)列開始計(jì)算盗誊,如果超過了隊(duì)列設(shè)置的超時(shí)時(shí)間配置還沒有被消費(fèi)时甚,該消息就會(huì)被自動(dòng)清除

死信隊(duì)列詳解

死信隊(duì)列 DLX Dead-Letter-Exchange

  1. 利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信(dead message)之后哈踱,它能被重新publish到另一個(gè)Exchange荒适,這個(gè)Exchange就是DLX
  2. DLX也是一個(gè)正常的Exchange,和一般的Exchange沒有什么區(qū)別开镣,它可以在任何隊(duì)列上被指定(也就是需要設(shè)置隊(duì)列的屬性)刀诬,這樣的話只要這個(gè)隊(duì)列中有死信就會(huì)被重新發(fā)布到DLX中
  3. 當(dāng)設(shè)置了DLX的隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)將這個(gè)死信重新發(fā)布到設(shè)置的Exchange中去哑子,從而被路由到另一個(gè)隊(duì)列
  4. 可以監(jiān)聽這個(gè)隊(duì)列中的消息做相應(yīng)的處理舅列,這個(gè)特性可以彌補(bǔ)RabbitMQ3.0版本以前支持的immediate參數(shù)的功能

消息變成死信的情況

  1. 消息被拒絕或消費(fèi)失敿「睢(basicReject/basicNack)并且requeue為false(不重回隊(duì)列)
  2. 消息TTL過期
  3. 隊(duì)列達(dá)到最大長度

死信隊(duì)列的設(shè)置
首先要設(shè)置死信隊(duì)列的Exchange和Queue,然后進(jìn)行綁定

  1. Exchange: dlx.exchange(名字可以任意日室)
  2. Queue: dlx.queue(名字可以任意劝殉ā)
  3. RoutingKey: # (為#表示任何消息都可以被路由到dlx.queue中)
    然后再進(jìn)行正常的交換機(jī)、隊(duì)列聲明和綁定榨惠,只不過需要再被設(shè)置死信隊(duì)列的隊(duì)列中加上一個(gè)參數(shù):arguments.put("x-dead-letter-exchange", "dlx.exchange")奋早,這樣消息在過期、不重回隊(duì)列赠橙、隊(duì)列達(dá)到最大長度時(shí)被直接路由到死信隊(duì)列中
public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.save";

        for (int i = 0; i < 1; i++) {

            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .expiration("10000")
                    .contentEncoding("UTF-8")
                    .build();

            String msg = "Hello RabbitMQ! Send a ACK message.";
            channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
        }
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // 2. 通過連接工廠創(chuàng)建一個(gè)連接
        Connection connection = connectionFactory.newConnection();

        // 3. 通過Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        // 4. 聲明Exchange耽装、Queue、RoutingKey
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.*";
        String queueName = "test_dlx_queue";

        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "dlx.exchange");

        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueDeclare(queueName, true, false, false, arguments);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 死信隊(duì)列的聲明
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");

        // 將autoAck設(shè)置為false期揪,手工Ack確認(rèn)
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}
public class MyConsumer extends DefaultConsumer {

    private final Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("----my consumer handle delivery----");
        System.out.println("body: " + new String(body));
        // 消息的Ack確認(rèn)掉奄,basicAck的第一個(gè)參數(shù)為消息的deliveryTag,第二個(gè)參數(shù)為是否批量簽收凤薛,如果限制的消息個(gè)數(shù)大于1姓建,可以設(shè)置為true
        this.channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市缤苫,隨后出現(xiàn)的幾起案子速兔,更是在濱河造成了極大的恐慌,老刑警劉巖活玲,帶你破解...
    沈念sama閱讀 222,000評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件涣狗,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡舒憾,警方通過查閱死者的電腦和手機(jī)镀钓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來珍剑,“玉大人掸宛,你說我怎么就攤上這事≌凶荆” “怎么了唧瘾?”我有些...
    開封第一講書人閱讀 168,561評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長别凤。 經(jīng)常有香客問我饰序,道長,這世上最難降的妖魔是什么规哪? 我笑而不...
    開封第一講書人閱讀 59,782評(píng)論 1 298
  • 正文 為了忘掉前任求豫,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蝠嘉。我一直安慰自己最疆,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,798評(píng)論 6 397
  • 文/花漫 我一把揭開白布蚤告。 她就那樣靜靜地躺著努酸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪杜恰。 梳的紋絲不亂的頭發(fā)上获诈,一...
    開封第一講書人閱讀 52,394評(píng)論 1 310
  • 那天,我揣著相機(jī)與錄音心褐,去河邊找鬼舔涎。 笑死,一個(gè)胖子當(dāng)著我的面吹牛逗爹,可吹牛的內(nèi)容都是我干的亡嫌。 我是一名探鬼主播,決...
    沈念sama閱讀 40,952評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼桶至,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼昼伴!你這毒婦竟也來了匾旭?” 一聲冷哼從身側(cè)響起镣屹,我...
    開封第一講書人閱讀 39,852評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎价涝,沒想到半個(gè)月后女蜈,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,409評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡色瘩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,483評(píng)論 3 341
  • 正文 我和宋清朗相戀三年伪窖,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片居兆。...
    茶點(diǎn)故事閱讀 40,615評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡覆山,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出泥栖,到底是詐尸還是另有隱情簇宽,我是刑警寧澤,帶...
    沈念sama閱讀 36,303評(píng)論 5 350
  • 正文 年R本政府宣布吧享,位于F島的核電站魏割,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏钢颂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,979評(píng)論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望尼桶。 院中可真熱鬧锯仪,春花似錦疯汁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽溢豆。三九已至,卻和暖如春瘸羡,著一層夾襖步出監(jiān)牢的瞬間犹赖,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評(píng)論 1 272
  • 我被黑心中介騙來泰國打工麸折, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留粘昨,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,041評(píng)論 3 377
  • 正文 我出身青樓芭析,卻偏偏與公主長得像吞瞪,于是被迫代替她去往敵國和親芍秆。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,630評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容