RabbitMQ筆記

?? 文章中所有的示例代碼全部上傳到自己的Github中聋庵,需要參考請(qǐng)移步RabbitMQ

消息隊(duì)列

什么是消息隊(duì)列?

在我真正開(kāi)始學(xué)習(xí)消息隊(duì)列之前就經(jīng)常聽(tīng)到這個(gè)詞,常見(jiàn)于各類(lèi)面試題和高并發(fā)場(chǎng)景孩擂,一般都會(huì)伴隨著分布式系統(tǒng)出現(xiàn),當(dāng)時(shí)自己覺(jué)得這個(gè)真的好難箱熬,好深?yuàn)W类垦,但是學(xué)習(xí)一段時(shí)間后發(fā)現(xiàn)實(shí)際上消息隊(duì)列的使用實(shí)際上并不難狈邑,難點(diǎn)在于對(duì)于業(yè)務(wù)的理解和模型的建立以及對(duì)出現(xiàn)的問(wèn)題處理這方面。

究竟什么是消息隊(duì)列?

計(jì)算機(jī)科學(xué)中蚤认,消息隊(duì)列(英語(yǔ):Message queue)是一種進(jìn)程間通信或同一進(jìn)程的不同線程間的通信方式米苹,軟件貯列用來(lái)處理一系列的輸入,通常是來(lái)自用戶砰琢。消息隊(duì)列提供了異步通信協(xié)議蘸嘶,每一個(gè)貯列中的紀(jì)錄包含詳細(xì)說(shuō)明的數(shù)據(jù),包含發(fā)生的時(shí)間陪汽,輸入設(shè)備的種類(lèi)训唱,以及特定的輸入?yún)?shù),也就是說(shuō):消息的發(fā)送者和接收者不需要同時(shí)與消息隊(duì)列交互掩缓。消息會(huì)保存在隊(duì)列中雪情,直到接收者取回它∧憷保——摘取自維基百科消息隊(duì)列

自己對(duì)于消息隊(duì)列的理解實(shí)際上可以從兩方面入手巡通,消息隊(duì)列

消息與隊(duì)列

什么是消息?香農(nóng)在《信息的數(shù)學(xué)理論》中定義能夠攜帶信息量的信息都可以稱(chēng)之為消息舍哄。簡(jiǎn)而言之有發(fā)送方有接收方并且能夠在二者之間傳遞信息的載體都可以稱(chēng)之為消息宴凉。

什么是隊(duì)列(queue)?一種經(jīng)典的先入先出的數(shù)據(jù)結(jié)構(gòu),后端插入數(shù)據(jù)表悬,前端獲取數(shù)據(jù)弥锄。

為什么需要消息隊(duì)列,這需要結(jié)合一些業(yè)務(wù)邏輯來(lái)說(shuō)蟆沫。在計(jì)算機(jī)中籽暇,硬件之間傳輸速率總是不盡相同,例如使用SATA盤(pán)向機(jī)械硬盤(pán)拷貝數(shù)據(jù)饭庞,無(wú)論SATA速率如何高戒悠,總要受到機(jī)械硬盤(pán)的限制。而軟件中同樣舟山,很多情況下绸狐,我們消息在單位時(shí)間內(nèi)獲取的速度和軟件滿足單位時(shí)間內(nèi)寫(xiě)入的速度并不相匹配,例如數(shù)據(jù)庫(kù)寫(xiě)入瓶頸累盗,這時(shí)候就需要使用到消息隊(duì)列作為緩沖寒矿,還有很多情況之后再總結(jié)。

將消息暫存在隊(duì)列中若债,保證消息到達(dá)的順序性同時(shí)還能夠保證消息的準(zhǔn)確性符相,我覺(jué)得這是消息隊(duì)列最可貴的地方。

常見(jiàn)消息隊(duì)列

RabbitMQ

RabbitMQ是使用Erlang編寫(xiě)的一個(gè)開(kāi)源的消息隊(duì)列拆座,本身支持很多的協(xié)議:AMQP主巍,XMPP, SMTP, STOMP冠息,也正因如此,它非常重量級(jí)孕索,更適合于企業(yè)級(jí)的開(kāi)發(fā)逛艰。同時(shí)實(shí)現(xiàn)了Broker構(gòu)架,這意味著消息在發(fā)送給客戶端時(shí)先在中心隊(duì)列排隊(duì)搞旭。對(duì)路由散怖,負(fù) 載均衡或者數(shù)據(jù)持久化都有很好的支持。

Redis

Redis是一個(gè)基于Key-Value對(duì)的NoSQL數(shù)據(jù)庫(kù)肄渗,開(kāi)發(fā)維護(hù)很活躍镇眷。雖然它是一個(gè)Key-Value數(shù)據(jù)庫(kù)存儲(chǔ)系統(tǒng),但它本身支持MQ功能翎嫡, 所以完全可以當(dāng)做一個(gè)輕量級(jí)的隊(duì)列服務(wù)來(lái)使用欠动。對(duì)于RabbitMQ和Redis的入隊(duì)和出隊(duì)操作,各執(zhí)行100萬(wàn)次惑申,每10萬(wàn)次記錄一次執(zhí)行時(shí)間具伍。測(cè)試 數(shù)據(jù)分為128Bytes、512Bytes圈驼、1K和10K四個(gè)不同大小的數(shù)據(jù)人芽。實(shí)驗(yàn)表明:入隊(duì)時(shí),當(dāng)數(shù)據(jù)比較小時(shí)Redis的性能要高于 RabbitMQ绩脆,而如果數(shù)據(jù)大小超過(guò)了10K萤厅,Redis則慢的無(wú)法忍受;出隊(duì)時(shí)靴迫,無(wú)論數(shù)據(jù)大小惕味,Redis都表現(xiàn)出非常好的性能,而RabbitMQ 的出隊(duì)性能則遠(yuǎn)低于Redis玉锌。

Kafka/Jafka

Kafka是Apache下的一個(gè)子項(xiàng)目赦拘,是一個(gè)高性能跨語(yǔ)言分布式Publish/Subscribe消息隊(duì)列系統(tǒng),而Jafka是在Kafka之上孵 化而來(lái)的芬沉,即Kafka的一個(gè)升級(jí)版。具有以下特性:快速持久化阁猜,可以在O(1)的系統(tǒng)開(kāi)銷(xiāo)下進(jìn)行消息持久化丸逸;高吞吐,在一臺(tái)普通的服務(wù)器上既可以達(dá)到 10W/s的吞吐速率剃袍;完全的分布式系統(tǒng)黄刚,Broker、Producer民效、Consumer都原生自動(dòng)支持分布式憔维,自動(dòng)實(shí)現(xiàn)復(fù)雜均衡涛救;支持Hadoop 數(shù)據(jù)并行加載,對(duì)于像Hadoop的一樣的日志數(shù)據(jù)和離線分析系統(tǒng)业扒,但又要求實(shí)時(shí)處理的限制检吆,這是一個(gè)可行的解決方案。Kafka通過(guò)Hadoop的并行 加載機(jī)制來(lái)統(tǒng)一了在線和離線的消息處理程储。Apache Kafka相對(duì)于ActiveMQ是一個(gè)非常輕量級(jí)的消息系統(tǒng)蹭沛,除了性能非常好之外,還是一個(gè)工作良好的分布式系統(tǒng)章鲤。

ZeroMQ

ZeroMQ號(hào)稱(chēng)最快的消息隊(duì)列系統(tǒng)摊灭,尤其針對(duì)大吞吐量的需求場(chǎng)景。ZMQ能夠?qū)崿F(xiàn)RabbitMQ不擅長(zhǎng)的高級(jí)/復(fù)雜的隊(duì)列败徊,但是開(kāi)發(fā)人員需要自己組合 多種技術(shù)框架帚呼,技術(shù)上的復(fù)雜度是對(duì)這MQ能夠應(yīng)用成功的挑戰(zhàn)。ZeroMQ具有一個(gè)獨(dú)特的非中間件的模式皱蹦,你不需要安裝和運(yùn)行一個(gè)消息服務(wù)器或中間件煤杀,因 為你的應(yīng)用程序?qū)缪萘诉@個(gè)服務(wù)角色。你只需要簡(jiǎn)單的引用ZeroMQ程序庫(kù)根欧,可以使用NuGet安裝怜珍,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了。但 是ZeroMQ僅提供非持久性的隊(duì)列凤粗,也就是說(shuō)如果down機(jī)酥泛,數(shù)據(jù)將會(huì)丟失。其中嫌拣,Twitter的Storm中默認(rèn)使用ZeroMQ作為數(shù)據(jù)流的傳 輸柔袁。

ActiveMQ

ActiveMQ是Apache下的一個(gè)子項(xiàng)目。 類(lèi)似于ZeroMQ异逐,它能夠以代理人和點(diǎn)對(duì)點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列捶索。同時(shí)類(lèi)似于RabbitMQ,它少量代碼就可以高效地實(shí)現(xiàn)高級(jí)應(yīng)用場(chǎng)景灰瞻。

消息隊(duì)列常見(jiàn)適用場(chǎng)景

首先來(lái)說(shuō)個(gè)自己之前的經(jīng)歷吧腥例,在之前的電纜監(jiān)控管理項(xiàng)目中,需要與很多網(wǎng)關(guān)進(jìn)行數(shù)據(jù)交換酝润,而這個(gè)時(shí)間節(jié)點(diǎn)是1分鐘一次燎竖,所以在59秒的時(shí)候系統(tǒng)可能會(huì)收到非常多的MODBUS數(shù)據(jù),通過(guò)解析轉(zhuǎn)化為對(duì)象后要销,這些數(shù)據(jù)大約在3000個(gè)/s构回,但是這種數(shù)據(jù)高峰只出現(xiàn)在這一秒,其他的時(shí)候就非常平緩。由于當(dāng)時(shí)的服務(wù)器并不太好纤掸,所以在用戶使用時(shí)脐供,總會(huì)在一分鐘內(nèi)卡頓一兩秒的時(shí)間,用戶體驗(yàn)極差借跪。當(dāng)時(shí)的設(shè)置的解決方案是將Netty分批次阻塞處理政己,也就是說(shuō)讓這3000個(gè)數(shù)據(jù)在1分鐘內(nèi)均勻到達(dá)。

但是這樣有一些缺點(diǎn)垦梆,可能某個(gè)數(shù)據(jù)是1分1秒發(fā)送而我們將他阻塞到了1分59秒匹颤,數(shù)據(jù)的準(zhǔn)確性可能會(huì)下降不少『啵現(xiàn)在看來(lái)這種場(chǎng)景實(shí)際上非常適合消息隊(duì)列溪北,可以先將這些數(shù)據(jù)緩存到消息隊(duì)列中辛臊,再由隊(duì)列寫(xiě)入數(shù)據(jù)庫(kù)稀余。

1.異步處理

假設(shè)現(xiàn)在用戶需要注冊(cè)一個(gè)賬號(hào)寒瓦,注冊(cè)完成后需要發(fā)送注冊(cè)郵件和短信提示被冒,如果讓整個(gè)流程按照串行化執(zhí)行适室,從注冊(cè)到系統(tǒng)響應(yīng)共計(jì)150ms货邓。

image

如果讓發(fā)送郵件和發(fā)送短信兩個(gè)步驟并行執(zhí)行公浪,需要100ms響應(yīng)他宛,顯然時(shí)間已經(jīng)縮短了1/3.

image

如果加入了消息隊(duì)列來(lái)處理。

image

與之前的并行處理方式所不同的是欠气,在加入消息隊(duì)列后厅各,發(fā)送郵件與發(fā)送短信功能變?yōu)榱水惒教幚恚簿褪钦f(shuō)當(dāng)用戶將注冊(cè)信息存入數(shù)據(jù)庫(kù)后预柒,數(shù)據(jù)庫(kù)會(huì)將這個(gè)對(duì)象存入消息隊(duì)列队塘,此時(shí)就完成了響應(yīng)。隨后發(fā)送郵件和短信的模塊會(huì)自己去消息隊(duì)列中獲取該對(duì)象進(jìn)行發(fā)送宜鸯,此時(shí)用戶響應(yīng)為60ms憔古,最為快捷。

2.應(yīng)用解耦

消息隊(duì)列同樣常見(jiàn)于應(yīng)用的解耦方面淋袖,作為一個(gè)中間件鸿市,消息隊(duì)列讓兩個(gè)強(qiáng)耦合的關(guān)系變成二者同時(shí)依賴(lài)于消息隊(duì)列,二者中任意一個(gè)都能夠不依賴(lài)另一方所進(jìn)行工作即碗。如圖(畫(huà)圖不易焰情,取圖請(qǐng)注明)

image

如果不適用消息隊(duì)列直接讓訂單系統(tǒng)依賴(lài)于庫(kù)存系統(tǒng),則會(huì)出現(xiàn)二者任意一個(gè)崩潰整個(gè)系統(tǒng)都無(wú)法使用剥懒,而在加入消息隊(duì)列后烙样,如果庫(kù)存系統(tǒng)崩潰,用戶依舊可以正常下單蕊肥,只是這些訂單將存儲(chǔ)在訂單隊(duì)列中,待庫(kù)存系統(tǒng)修復(fù)后可從隊(duì)列中獲取進(jìn)行業(yè)務(wù)處理,讓整個(gè)系統(tǒng)更加安全壁却,依賴(lài)程度更低批狱。

3.流量削峰

當(dāng)某個(gè)時(shí)間點(diǎn)流量過(guò)大,會(huì)超過(guò)服務(wù)器負(fù)載的時(shí)候需要用到流量削峰展东,例如在秒殺活動(dòng)中赔硫,可以將秒殺訂單存放進(jìn)入消息隊(duì)列,如果超過(guò)隊(duì)列上限可以丟棄盐肃,然后再進(jìn)行訂單的處理爪膊,緩解服務(wù)器壓力。

image

RabbitMQ

RabbitMQ是一套開(kāi)源(MPL)的消息隊(duì)列服務(wù)軟件砸王,是由 LShift 提供的一個(gè) Advanced Message Queuing Protocol (AMQP) 的開(kāi)源實(shí)現(xiàn)推盛,由以高性能、健壯以及可伸縮性出名的 Erlang 寫(xiě)成谦铃。

可供選擇的消息隊(duì)列很多耘成,之所以優(yōu)先學(xué)習(xí)了RabbitMQ,主要在于其性能強(qiáng)驹闰,由Erlang編寫(xiě)瘪菌,天生對(duì)于高并發(fā)類(lèi)型友好,而且其學(xué)習(xí)資源豐富嘹朗,遇到問(wèn)題更加容易排查處理师妙。

安裝RabbitMQ

如果有能力最好按照官網(wǎng)的文檔進(jìn)行安裝,任何時(shí)候官方文檔都應(yīng)該是第一選擇屹培。

安裝完成之后默穴,打開(kāi)默認(rèn)RabbitMQ-Server分配的地址localhost:15672,默認(rèn)密碼和用戶名都為guest惫谤。

image

Get Started In Java

簡(jiǎn)單隊(duì)列

image

既然是消息隊(duì)列壁顶,我們首先來(lái)用Java寫(xiě)個(gè)Demo往消息隊(duì)列中存一些MSG,示例為簡(jiǎn)單隊(duì)列溜歪,由一個(gè)消費(fèi)者與一個(gè)生產(chǎn)者組成若专。

和數(shù)據(jù)庫(kù)相同,首先要獲取鏈接蝴猪,這里抽象處理调衰,編寫(xiě)一個(gè)RabbitMQConnectionUtils

package com.magnoliaory;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工具類(lèi)自阱,類(lèi)似JDBC提供MQ的連接
 */
public class RabbitMQUtils {

    public static Connection getRabbitMQConnection
            (String host , Integer port , String virtualHost , String username , String password) {
        //定義一個(gè)工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //設(shè)置連接參數(shù)
        connectionFactory.setHost(host);
        //設(shè)置連接方式的端口嚎莉,例如amqp對(duì)應(yīng)5672
        connectionFactory.setPort(port);
        //設(shè)置Vhost , 可以看作數(shù)據(jù)庫(kù)
        connectionFactory.setVirtualHost(virtualHost);
        //設(shè)置用戶名和密碼
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        Connection connection = null;
        //獲取連接
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return connection;
    }
}

然后編寫(xiě)一個(gè)生產(chǎn)者類(lèi)來(lái)進(jìn)行消息的寫(xiě)入。

package com.magnoliaory;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class RabbitMQProduc {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 11; i++) {
            prodceMessage();
            Thread.sleep(3000);
        }
    }

    /**
     * 隊(duì)列名稱(chēng)
     */
    public final static String QUEUE_NAME = "MyProductQueue";

    public static void prodceMessage() throws Exception {
        //獲取連接
        Connection connection = RabbitMQUtils.
                getRabbitMQConnection("localhost", 5672,
                        "/magnolia", "guest", "guest");

        //獲取通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //發(fā)送消息
        String msg = "HELLO WORLD";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

        //關(guān)閉
        channel.close();
        connection.close();

    }
}

可以看到沛豌,此時(shí)消息隊(duì)列中已經(jīng)有了十一個(gè)數(shù)據(jù)準(zhǔn)備就緒趋箩,等待我們get赃额。

image

編寫(xiě)消費(fèi)者

package com.magnoliaory;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Date;

public class RabbitMQConsumer {

    public static void main(String[] args) throws IOException {
        receiveMessage();
    }

    public final static String QUEUE_NAME = "MyProductQueue";


    public static void receiveMessage() throws IOException {
        //獲取連接
        Connection connection = RabbitMQUtils.
                getRabbitMQConnection("localhost", 5672,
                        "/magnolia", "guest", "guest");

        Channel channel = connection.createChannel();
        //隊(duì)列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //事件模型
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("接收端收到信息 : "+msg + " " + new Date());
            }
        };
        //監(jiān)聽(tīng)隊(duì)列
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }

}

看下控制臺(tái)效果。

接收端收到信息 : HELLO WORLD Tue Jan 07 14:52:48 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:52:51 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:52:54 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:52:57 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:00 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:03 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:06 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:09 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:12 CST 2020
接收端收到信息 : HELLO WORLD Tue Jan 07 14:53:15 CST 2020

控制臺(tái)中能夠看到叫确,此時(shí)consumer依舊在監(jiān)聽(tīng)消息隊(duì)列跳芳。

image

工作模式(work)

工作隊(duì)列由一個(gè)生產(chǎn)者與多個(gè)消費(fèi)者組成。

之前代碼示例為簡(jiǎn)單隊(duì)列(Simple Queue)竹勉,其結(jié)構(gòu)為傳統(tǒng)的一對(duì)一類(lèi)型飞盆,但是再實(shí)際生產(chǎn)中,可能存在生產(chǎn)者生產(chǎn)速度遠(yuǎn)遠(yuǎn)高于消費(fèi)者消費(fèi)速度次乓,此時(shí)如果依舊按照一對(duì)一模式消費(fèi)吓歇,則會(huì)造成消息在隊(duì)列中積壓,所以我們可以根據(jù)業(yè)務(wù)產(chǎn)生的速率來(lái)構(gòu)建工作隊(duì)列票腰,讓消費(fèi)者和生產(chǎn)者之間速率接近平衡城看,此時(shí)的模型即為工作隊(duì)列。

image

實(shí)際上工作隊(duì)和簡(jiǎn)單隊(duì)列差異很小丧慈,更多的是一種業(yè)務(wù)邏輯上的區(qū)別析命。看下生產(chǎn)者代碼逃默。

package workQueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import sun.reflect.misc.ConstructorUtil;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * @author fanhao
 * 工作隊(duì)列消息發(fā)送者
 */
public class WorkQueueProducer {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 100; i++) {
            createMsg(i);
            System.out.println("第"+i+"消息發(fā)送完成");
            Thread.sleep(500);
        }
    }


    public static final String QUEUE_NAME = "MSG_PRODUCER";

    public static void createMsg(Integer temp) throws Exception {
        Connection connection = RabbitMQUtils.
                getRabbitMQConnection("localhost", 5672,
                        "/magnolia", "guest", "guest");
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String msg = "消息生產(chǎn)者M(jìn)SG_PRDUCER第" + temp + "次發(fā)送消息";

        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

        channel.close();
        connection.close();

    }

}

消費(fèi)者代碼鹃愤。

package workQueue;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Date;

public class WorkQueueConsumer {

    public static void main(String[] args) throws IOException {
        receiveMessage();
    }


    public final static String QUEUE_NAME = "MSG_PRODUCER";


    public static void receiveMessage() throws IOException {
        //獲取連接
        Connection connection = RabbitMQUtils.
                getRabbitMQConnection("localhost", 5672,
                        "/magnolia", "guest", "guest");

        Channel channel = connection.createChannel();
        //隊(duì)列聲明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //事件模型
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("接收端收到信息 : " + msg + "  " + new Date() + Thread.currentThread());
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        //監(jiān)聽(tīng)隊(duì)列
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

這里啟動(dòng)兩個(gè)consumer,可以看到一個(gè)奇怪的現(xiàn)象完域,兩個(gè)消費(fèi)者均分软吐,一個(gè)獲取到的是奇數(shù)消息,另一個(gè)獲取到偶數(shù)吟税,這種情況稱(chēng)之為輪詢分發(fā)凹耙。

輪詢分發(fā) : 無(wú)論Consumer負(fù)載是否相同,任務(wù)消息總是均分處理肠仪,即 K%N 處理方式肖抱。

針對(duì)這種情況可以使用公平分發(fā)的模式來(lái)處理。即使用basicQos(perfetch=1)异旧。公平分發(fā)主要區(qū)別在于意述,每次MQ都會(huì)按照f(shuō)etch數(shù)為消費(fèi)者發(fā)送消息,之后消費(fèi)者處理完成并給與RabbitMQ響應(yīng)后(手動(dòng)反饋)吮蛹,才會(huì)再次發(fā)送下一個(gè)消息荤崇,這樣就保證了性能強(qiáng)的消費(fèi)者處理更多的消息,即公平分發(fā)潮针。

首先要關(guān)閉consumer的自動(dòng)應(yīng)答术荤。

//監(jiān)聽(tīng)隊(duì)列
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck , defaultConsumer);

然后在生產(chǎn)者中添加對(duì)于公平隊(duì)列發(fā)送的通道設(shè)計(jì)即可。

//確認(rèn)響應(yīng)前只發(fā)送一個(gè)消息
channel.basicQos(1);

此時(shí)啟動(dòng)服務(wù)器每篷,發(fā)現(xiàn)二者的消費(fèi)行為就有了差距瓣戚。

image
image

消息應(yīng)答

消息應(yīng)答實(shí)際上就是一種溝通機(jī)制端圈,在消費(fèi)者和MQ之間的行為,當(dāng)消費(fèi)者獲取消息完成后带兜,會(huì)告知RabbitMQ枫笛,同時(shí)消息隊(duì)列將消息從內(nèi)存中刪除。

image

自動(dòng)應(yīng)答

使用自動(dòng)應(yīng)答autoack = true時(shí)刚照,消息將以輪詢的方式發(fā)送,此時(shí)如果一個(gè)消息已經(jīng)發(fā)送喧兄,并且目標(biāo)消費(fèi)者出現(xiàn)異常无畔,該消息將永遠(yuǎn)消失,所以這種情況下存在消息丟失的風(fēng)險(xiǎn)吠冤。

手動(dòng)應(yīng)答

如果使用手動(dòng)應(yīng)答浑彰,則在消息隊(duì)列未收到消息處理完成之前,并不會(huì)刪除該消息拯辙,如果消息發(fā)送后消費(fèi)者崩潰郭变,消息隊(duì)列將會(huì)把該消息發(fā)送給其他的消費(fèi)者。

這樣我們就解決了消費(fèi)者異常導(dǎo)致任務(wù)丟失的情況涯保,但是還存在RabbitMQ異常導(dǎo)致消息丟失的情況诉濒。所以RabbitMQ提供了消息持久化的功能保證我們能夠?qū)?nèi)存中的數(shù)據(jù)持久化到磁盤(pán)上,防止出現(xiàn)異常情況夕春。

消息持久化

在之前寫(xiě)一些簡(jiǎn)單的消息隊(duì)列時(shí)都會(huì)看到這樣一個(gè)方法未荒,注意RabbitMQ不允許重新定義不同參數(shù)的隊(duì)列,例如我們剛剛寫(xiě)好的內(nèi)容修改未true及志,將無(wú)法使用片排,只能新建一個(gè)隊(duì)列。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

我們來(lái)看看源碼中這些參數(shù)的內(nèi)容速侈。

/**
 * Declare a queue
 * @see com.rabbitmq.client.AMQP.Queue.Declare
 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 * @param queue the name of the queue
 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
 * @param arguments other properties (construction arguments) for the queue
 * @return a declaration-confirm method to indicate the queue was successfully declared
 * @throws java.io.IOException if an error is encountered
 */
   
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

可以看到the queue will survive a server restart這句話率寡,很明顯,這個(gè)durable即將消息隊(duì)列持久化到磁盤(pán)倚搬,讓隊(duì)列信息在服務(wù)器重啟后得以幸存冶共。持久化只需要將其設(shè)置為true即可。

訂閱模式(fanout)

消息被發(fā)布到通道上潭枣。訂閱者將收到其訂閱的主題上的所有消息比默,并且所有訂閱同一主題的訂閱者將接收到同樣的消息。

image

和前面兩種隊(duì)列相比較盆犁,訂閱模式有如下幾個(gè)特點(diǎn)命咐。

  • 每個(gè)消費(fèi)者擁有自己的消息隊(duì)列
  • 消息將由生產(chǎn)者發(fā)送到交換機(jī),再由交換機(jī)轉(zhuǎn)發(fā)到消息隊(duì)列中
  • 每個(gè)隊(duì)列都要被綁定在交換機(jī)上
  • 生產(chǎn)者發(fā)送的消息經(jīng)過(guò)交換機(jī)到達(dá)隊(duì)列后谐岁,一個(gè)消息可以被多個(gè)消費(fèi)者消費(fèi)

創(chuàng)建生產(chǎn)者

public class SubscripQueueProducer {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; i++) {
            createMsg();
        }
    }

    //聲明交換機(jī)的名稱(chēng)
    public static final String EXCHANGE_NAME = "SUBSCRIPT_QUEUE";

    public static void createMsg() throws Exception {
        //同之前
        Connection connection = RabbitMQUtils.
                getRabbitMQConnection("localhost", 5672,
                        "/magnolia", "guest", "guest");
        Channel channel = connection.createChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //發(fā)送消息
        String msg = "這是一條來(lái)自交換機(jī)的消息醋奠。";

        channel.basicPublish(EXCHANGE_NAME,"" , null, msg.getBytes());

        channel.close();
        connection.close();

    }
}

運(yùn)行結(jié)果榛臼,查看控制臺(tái)能夠看到在交換機(jī)一欄中消息已經(jīng)被推送進(jìn)來(lái)。

image

但是我們發(fā)現(xiàn)窜司,消息卻丟失了沛善,這里需要注意,在RabbitMQ中塞祈,只有隊(duì)列有存儲(chǔ)能力金刁,交換機(jī)無(wú)存儲(chǔ)能力,在沒(méi)有隊(duì)列綁定時(shí)议薪,數(shù)據(jù)將被直接拋棄尤蛮。

下面我們寫(xiě)一個(gè)消費(fèi)者隊(duì)列。

private static final String QUEUE_NAME = "subscriptQueueConsumer";

    public static void getMsg() throws Exception {
        //同之前
        Connection connection = RabbitMQUtils.
                getRabbitMQConnection("localhost", 5672,
                        "/magnolia", "guest", "guest");
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //將隊(duì)列綁定在交換機(jī)上
        channel.queueBind(QUEUE_NAME, SubscriptQueueProducer.EXCHANGE_NAME, "");

        //事件模型
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println(" 1接收端收到信息 : "+msg + " " + new Date());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        //監(jiān)聽(tīng)隊(duì)列
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

先啟動(dòng)consumer監(jiān)聽(tīng)斯议,然后啟動(dòng)生產(chǎn)者产捞,這時(shí)候可以看到控制臺(tái)交換機(jī)中有綁定的隊(duì)列。

image

路由模式(direct)

路由模式實(shí)際上時(shí)對(duì)于訂閱模式的一個(gè)拓展哼御。

image

訂閱模式中只要生產(chǎn)者發(fā)送消息到交換機(jī)中坯临,交換機(jī)會(huì)將他們推送給所有的消息隊(duì)列,而路由模式則會(huì)在推送消息時(shí)攜帶一個(gè)Key恋昼,只有與該Key匹配的消息隊(duì)列才能接收到該消息看靠,也就是說(shuō),路由模式將能夠決定給哪些消息隊(duì)列推送消息焰雕。

主題模式(topic)

之前我們使用過(guò)一個(gè)方法 :

channel.queueBind(QUEUE_NAME, SubscriptQueueProducer.EXCHANGE_NAME, "");

其中最后一個(gè)參數(shù)未rountingKey(路由鍵)衷笋,而路由模式則和此值有關(guān)。在消費(fèi)者和生產(chǎn)者中只需要增加對(duì)于自身routingKey的綁定矩屁,同時(shí)修改Exchange的type為direct辟宗。

簡(jiǎn)單來(lái)講,主題模式類(lèi)似于正則表達(dá)式或者通配符吝秕。

image

如圖泊脐,其中以CH開(kāi)頭的消息都會(huì)發(fā)送到CH.#消息隊(duì)列中,以.News結(jié)尾都會(huì)發(fā)送到#.News消息隊(duì)列中烁峭,以此類(lèi)推容客。

下面就直接上代碼,生產(chǎn)者 :

public class TopicPouducer {

    //聲明交換機(jī)的名稱(chēng)
    public static final String EXCHANGE_NAME = "Topic";

    public static void createMsg() throws Exception {
        //同之前
        Connection connection = RabbitMQUtils.
                getRabbitMQConnection("localhost", 5672,
                        "/magnolia", "guest", "guest");
        Channel channel = connection.createChannel();

        //聲明交換機(jī)
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        //發(fā)送消息
        String msg = "路由信息";
      
        channel.basicPublish(EXCHANGE_NAME,"goods.car" , null, msg.getBytes());

        channel.close();
        connection.close();
    }
}

消費(fèi)者

public class TopicConsumer {

    private static final String QUEUE_NAME = "TopicConsumer";

    public static void getMsg() throws Exception {
        //同之前
        Connection connection = RabbitMQUtils.
                getRabbitMQConnection("localhost", 5672,
                        "/magnolia", "guest", "guest");
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //將隊(duì)列綁定在交換機(jī)上
        //隊(duì)列接收一切以goods.開(kāi)頭的消息
        channel.queueBind(QUEUE_NAME, SubscriptQueueProducer.EXCHANGE_NAME, "goods.#");

        //事件模型
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println(" 1接收端收到信息 : "+msg + " " + new Date());
            }
        };
        //監(jiān)聽(tīng)隊(duì)列
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

消息確認(rèn)機(jī)制

之前消息應(yīng)答保證了消費(fèi)者與RabbitMQ之間的消息存儲(chǔ)安全约郁,但是在生產(chǎn)者一方缩挑,我們無(wú)法知道我們的消息是否準(zhǔn)確發(fā)送到了RabbitMQ消息隊(duì)列中。這里總結(jié)兩種方式 :

  • AMQP事務(wù)機(jī)制
  • Confirm模式

AMQP

try {
      channel.txSelect();
      channel.basicPublish(EXCHANGE_NAME,"" , null, msg.getBytes());
      channel.txCommit();
 } catch (IOException e) {
      channel.txRollback();
 }

和數(shù)據(jù)庫(kù)常用的事務(wù)機(jī)制類(lèi)似鬓梅,都是以commit和rollback為操作標(biāo)準(zhǔn)供置。

confirm

在confirm模式中,一旦channel使用該模式绽快,則其發(fā)送的每一條消息都將會(huì)附帶一個(gè)唯一ID值芥丧,當(dāng)RabbitMQ收到該消息時(shí)紧阔,將返回一個(gè)確認(rèn)給生產(chǎn)者,以告知消息安全到達(dá)续担。

由于confirm模式所使用的是異步處理擅耽,消息發(fā)送與ID確認(rèn)在不同線程處理,結(jié)果通過(guò)回調(diào)處理物遇,能夠有效提高性能乖仇。

//開(kāi)啟confim
channel.confirmSelect();

三種編程模式 :

  • 普通模式(串行) : 發(fā)送一條confirm信息
  • 批量模式(串行) : 發(fā)送一批confirm信息
  • 異步模式 : 提供回調(diào)方法

普通模式

if (!channel.waitForConfirms()) {
    System.out.println("消息發(fā)送失敗");
}

批量模式

for (int i = 0; i < 10; i++) {
     channel.basicPublish(EXCHANGE_NAME,"routingKey" , null, msg.getBytes());
}
if (!channel.waitForConfirms()) {
    System.out.println("消息發(fā)送失敗");
}

批量實(shí)際上就是讓所有信息發(fā)送完成后再確認(rèn)即可,大部分情況下批量模式性能更好询兴,但是如果批量模式出現(xiàn)問(wèn)題这敬,這一批數(shù)據(jù)都會(huì)重新發(fā)送,具體使用根據(jù)場(chǎng)景而定蕉朵。

異步模式

在異步模式中,生產(chǎn)者每發(fā)送一條消息都會(huì)將該消息的ID值放入一個(gè)集合當(dāng)中阳掐,然后由另一條線程根據(jù)集合進(jìn)行發(fā)送確認(rèn)始衅。

由于是集合,可能會(huì)有多個(gè)消息確認(rèn)收到缭保,這個(gè)multiple為true時(shí)就是說(shuō)明有多個(gè)符合要求汛闸。

package confirm;

public class ConfirmTest {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 100; i++) {
            createMsg();
            System.out.println("發(fā)送完成");
        }
    }

    //聲明交換機(jī)的名稱(chēng)
    public static final String EXCHANGE_NAME = "SUBSCRIPT_QUEUE";

    public static void createMsg() throws Exception {
        //同之前
        Connection connection = RabbitMQUtils.
                getRabbitMQConnection("localhost", 5672,
                        "/magnolia", "guest", "guest");
        Channel channel = connection.createChannel();

        //未確認(rèn)的消息集合
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        channel.addConfirmListener(new ConfirmListener() {
            //沒(méi)有問(wèn)題
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    confirmSet.headSet(deliveryTag + 1).clear();
                }else {
                    confirmSet.remove(deliveryTag);
                }
            }
            //消息沒(méi)有確認(rèn)
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    confirmSet.headSet(deliveryTag + 1).clear();
                }else {
                    confirmSet.remove(deliveryTag);
                }
            }
        });

    }
}

感謝您百忙之中能夠看完本篇文章,祝您擁有美好的一天艺骂。

參考文章

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末诸老,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子钳恕,更是在濱河造成了極大的恐慌别伏,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,635評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件忧额,死亡現(xiàn)場(chǎng)離奇詭異厘肮,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)睦番,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)类茂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人托嚣,你說(shuō)我怎么就攤上這事巩检。” “怎么了示启?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,083評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵兢哭,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我丑搔,道長(zhǎng)厦瓢,這世上最難降的妖魔是什么提揍? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,640評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮煮仇,結(jié)果婚禮上劳跃,老公的妹妹穿的比我還像新娘。我一直安慰自己浙垫,他們只是感情好刨仑,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著夹姥,像睡著了一般杉武。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上辙售,一...
    開(kāi)封第一講書(shū)人閱讀 52,262評(píng)論 1 308
  • 那天轻抱,我揣著相機(jī)與錄音,去河邊找鬼旦部。 笑死祈搜,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的士八。 我是一名探鬼主播容燕,決...
    沈念sama閱讀 40,833評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼婚度!你這毒婦竟也來(lái)了蘸秘?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,736評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蝗茁,失蹤者是張志新(化名)和其女友劉穎醋虏,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體评甜,經(jīng)...
    沈念sama閱讀 46,280評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡灰粮,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了忍坷。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片粘舟。...
    茶點(diǎn)故事閱讀 40,503評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖佩研,靈堂內(nèi)的尸體忽然破棺而出柑肴,到底是詐尸還是另有隱情,我是刑警寧澤旬薯,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布晰骑,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏硕舆。R本人自食惡果不足惜秽荞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望抚官。 院中可真熱鬧扬跋,春花似錦、人聲如沸凌节。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,340評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)倍奢。三九已至朴上,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間卒煞,已是汗流浹背痪宰。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,460評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留畔裕,地道東北人酵镜。 一個(gè)月前我還...
    沈念sama閱讀 48,909評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像柴钻,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子垢粮,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容