RabbitMQ的基本概念

有問題請聯(lián)系我QQ:273206491

RabbitMQ是基于Erlang語言(俗稱:二郎神)對AMQP協(xié)議的實(shí)現(xiàn)捂襟。

1檩禾、各個模塊之間的一覽圖

image.png

2刹勃、連接

這里以Java的客戶端進(jìn)行說明柄错,客戶端與RabbitMQ服務(wù)器之間是基于TCP連接的剩蟀,而TCP連接的創(chuàng)建和銷毀都非常耗費(fèi)資源催蝗,因此RabbitMQ使用連接復(fù)用模式,也就是我們常用的Channel育特,一個TCP連接可以創(chuàng)建多個Channel丙号,不同Channel之間是互相獨(dú)立的,一個線程使用一個Channel是安全的缰冤,不會出現(xiàn)多線程共享同一個連接的問題(線程共享一個資源很容易出現(xiàn)線程安全的問題)犬缨。

2.1、連接創(chuàng)建示例

//連接工廠的初始化
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("rabbitmq服務(wù)器的地址");
connectionFactory.setUsername("用戶名");
connectionFactory.setPassword("密碼");
//通過連接工廠與rabbitmq之間建立一個tcp連接
Connection connection =connectionFactory.newConnection();
//通過連接創(chuàng)建信道棉浸,多個線程之間不要共享同一個信道
Channel channel = connection.createChannel();

2.2怀薛、資源釋放問題

在連接不使用的時(shí)候及時(shí)關(guān)閉連接是非常重要的步驟,可能你在本地開發(fā)時(shí)不釋放連接不會有什么問題迷郑,但一旦程序上線后枝恋,連接不釋放很有很多導(dǎo)致RabbitMQ的連接因耗盡而無法接受新的連接請求或者其他什么問題。
釋放連接的方式也非常簡單嗡害,直接調(diào)用Connection對象的close方法可以釋放一個連接焚碌,同時(shí)也會釋放這個連接下的所有Channel資源。

3霸妹、生產(chǎn)者

用于生產(chǎn)消息十电,使用basicPublish向RabbitMQ發(fā)送消息

upChannel.basicPublish("msgExchange", "message", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
  • exchange 交換器的名稱
  • routingKey 路由key(當(dāng)交換器的類型是fanout時(shí),路由key是不生效的)
  • props 用來設(shè)置消息的相關(guān)屬性
///下面這兩種方式是等價(jià)的
MessageProperties.PERSISTENT_TEXT_PLAIN
或者
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2);
properties.contentType("text/plain");
properties.headers(null);//用于設(shè)置自己的header屬性抑堡,交換器的類型中就有一種為headers(但不推薦使用)摆出,當(dāng)我們設(shè)置的headers屬性值和交換器綁定的值一致是就能夠路由到響應(yīng)隊(duì)列中
  • body 消息體
  • mandatory

4、交換器

交換器可以認(rèn)為是一個消息中轉(zhuǎn)站首妖,他通過和隊(duì)列進(jìn)行綁定偎漫,我們把消息發(fā)送到交換器中,交換器根據(jù)路由key再決定將消息投遞給哪個隊(duì)列(交換器的類型為fanout時(shí)有缆,路由key是無效的)象踊。

4.1温亲、類型

  • fanout 會把消息給綁定到這個交換器的所有隊(duì)列都發(fā)生一遍
  • direct 只將消息發(fā)生給路由匹配的隊(duì)列
  • topic 將消息發(fā)生獲取路由匹配的對象,但是這里的匹配支持模糊匹配杯矩,rabbitmq的路由key使用點(diǎn)"."來分隔一個單詞栈虚,"*"匹配一個單詞,"#"匹配一個或者多個單詞史隆。舉個例子:路由key:com.pingwazi.rabbitmq魂务,綁定key1:com.#,綁定key2:com.*.*是匹配的泌射。
  • headers 發(fā)生消息的headers屬性完全匹配是則認(rèn)為匹配粘姜,這個模式不常用,并且性能也很差熔酷。

4.2孤紧、創(chuàng)建

//聲明一個交換器,如果存在就不創(chuàng)建拒秘,如果存在的交換器參數(shù)與聲明交換器參數(shù)不匹配就會報(bào)錯号显,如果不存在就會創(chuàng)建
//這方式什么是同步的
upChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
//這種方式聲明式異步的,但是不推薦使用躺酒,因?yàn)榭赡苣阍谡{(diào)用了之后就去使用的時(shí)候RabbitMQ服務(wù)器還沒有創(chuàng)建好押蚤。
upChannel.exchangeDeclareNoWait();
  • exchange 交換器的名字
  • type 交換器的類型
  • durable 指定這個交換器是持久化的
  • autoDelete 是否自動刪除,true則表示自動刪除阴颖,但是有一個前提條件活喊,那就是曾經(jīng)至少要有一個隊(duì)列或者交換器與之綁定,但是現(xiàn)在都已經(jīng)接觸綁定了量愧。這時(shí)RabbitMQ服務(wù)器才會自動的吧這個交換器給刪除掉钾菊,也就是說如果這個交換器創(chuàng)建出來后,沒有任何隊(duì)列或者交換器與之綁定的話偎肃,RabbitMQ是不會自動刪除的煞烫,即使我們設(shè)置了自動刪除。
  • internal 設(shè)置是否是內(nèi)置的交換器累颂,如果設(shè)置為true滞详,則表示是內(nèi)置的,那么客戶端程序?qū)o法直接向這個交換器發(fā)送消息紊馏,只能通過交換器路由到交換器的方式料饥。
  • argument 其他可選參數(shù)

4.3、判斷交換器是否存在

//判斷指定交換器是否存在朱监,如果不存在就會報(bào)404異常岸啡。
upChannel.exchangeDeclarePassive();

4.4、刪除交換器

// 同步刪除
upChannel.exchangeDelete("",true);
//異步刪除(不需要等待刪除完成)
upChannel.exchangeDeleteNoWait("",true);
  • exchange 交換器的名稱
  • ifUnused 是否只刪除沒有被使用的交換器

5赫编、綁定路由key

//將交換器與隊(duì)列進(jìn)行綁定通過message進(jìn)行綁定
upChannel.queueBind("msgQueue","msgExchange","message");
//將交換器與隊(duì)列進(jìn)行解綁
upChannel.queueUnbind("","","")
//將交換器與交換器綁定在一起
upChannel.exchangeBind( "","","")
//將交換器與交換器解綁
upChannel.exchangeUnbind( "","","")

綁定路由key是在綁定交換器和隊(duì)列時(shí)指定的一個key巡蘸,其中message就是masQueue隊(duì)列與msgExchange交換器的綁定路由key奋隶。

路由key是發(fā)送消息的時(shí)候指定的key,交換器類型為direct或者topic時(shí)悦荒,路由key與綁定路由key后才會將消息發(fā)送到對應(yīng)的隊(duì)列中唯欣。

6、隊(duì)列

隊(duì)列是RabbitMQ實(shí)際存儲消息的地方

//聲明一個名字為msgQueue搬味、持久化境氢、不排他、不自動刪除的隊(duì)列碰纬,如果此隊(duì)列已經(jīng)存在就不再創(chuàng)建
upChannel.queueDeclare("msgQueue", true, false, false, null);
//異步聲明一個隊(duì)列产还,不推薦使用,原因同交換器一樣
upChannel.queueDeclareNoWait();
  • queue 隊(duì)列的名稱
  • durable 設(shè)置隊(duì)列是否為持久化隊(duì)列嘀趟,如果為true這表示是持久化隊(duì)列
  • exclusive 是否為排它隊(duì)列,值為true則表示為排他隊(duì)列愈诚,如果一個隊(duì)列是排它隊(duì)列她按,那么除了創(chuàng)建他的連接(注意:這里說的是連接,不是channel)能夠使用之外炕柔,其他連接都不能使用酌泰,并且在創(chuàng)建它的連接斷開時(shí),這個隊(duì)列會自動刪除匕累,即使設(shè)置為持久化隊(duì)列也是如此陵刹。
  • autoDelete 是否自動刪除,值為true則表示自動刪除欢嘿,自動刪除有一個前提條件衰琐,那就是曾經(jīng)至少有一個消費(fèi)者使用了這個隊(duì)列,并且現(xiàn)在已經(jīng)沒有任何消費(fèi)與這個隊(duì)列建立了連接炼蹦,這時(shí)候RabbitMQ才會自動刪除這個隊(duì)列羡宙。也就是說如果這個隊(duì)列還沒有任何消費(fèi)與建立過連接,那么RabbitMQ是不會自動刪除的掐隐。
  • arguments 其他的一些參數(shù)設(shè)置

6.1狗热、判斷隊(duì)列是否存在

//判斷隊(duì)列是否存在
upChannel.queueDeclarePassive();

6.2、刪除隊(duì)列

//同步刪除
upChannel.queueDelete("",true,true);
//異步刪除
upChannel.queueDeleteNoWait("",true,true);
  • queue 隊(duì)列名稱
  • ifUnused 是否只刪除沒有被使用過的隊(duì)列
  • ifEmpty 是否只刪除空的隊(duì)列

7虑省、消費(fèi)者

消息的消費(fèi)者有兩種常用的模式匿刮,即推模式和拉模式。兩種模式的實(shí)現(xiàn)方式完全不同探颈,推模式是只要隊(duì)列中有消息了就會推送給消費(fèi)者(當(dāng)然了也要受未確認(rèn)消息數(shù)的限制)熟丸,而拉模式則是消費(fèi)者需要的時(shí)候再去RabbitMQ中獲取,而他一次也只能獲取獲取一條消息膝擂。

7.1虑啤、拉模式

image.png

從圖中可以看出隙弛,在單線程的情況下,消息的處理速度是比較慢的狞山,當(dāng)然了這里也可以使用多線程不斷的從Rabbitmq中去獲取全闷,但這樣就需要手動實(shí)現(xiàn)獲取算法了。不廢話萍启,先上代碼总珠!

private void receiveGetMessage()
    {
        try
        {
            Channel downChannel=connection.createChannel();
            //交換器類型:fanout、direct勘纯、topic
            //聲明一個名字為msgExchange局服、類型為direct并且持久化的交換器,如果交換器已經(jīng)存在就不再創(chuàng)建
            downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
            //聲明一個名字為msgQueue驳遵、持久化淫奔、不排他、不自動刪除的隊(duì)列堤结,如果此隊(duì)列已經(jīng)存在就不再創(chuàng)建
            downChannel.queueDeclare("msgQueue", true, false, false, null);
            //將交換器與隊(duì)列進(jìn)行綁定通過message進(jìn)行綁定
            downChannel.queueBind("msgQueue","msgExchange","message");
            //消息未確認(rèn)消息的數(shù)量
            downChannel.basicQos(1);//在非自動確認(rèn)的模式下唆迁,限制最多允許未確認(rèn)的消息數(shù)量
            boolean isBreak=false;
            while (!isBreak)
            {
                //消費(fèi)消息
                GetResponse msgData = downChannel.basicGet("", false);
                String msgBody=new String(msgData.getBody(), "utf-8");
                System.out.println(Thread.currentThread().getId()+"RabbitMQ拉模式消費(fèi)者收到消息: " + msgBody);
                //回復(fù)確認(rèn)消息
                downChannel.basicAck(msgData.getEnvelope().getDeliveryTag(),false);
                if(StringUtils.isEmpty(msgBody))
                    isBreak=true;
            }
            downChannel.close();
        }
        catch (ShutdownSignalException ex)
        {
            //連接異常關(guān)閉了,這里要進(jìn)行檢查竞穷,并嘗試重新建立連接
            ex.printStackTrace();
        }
        catch (IOException ex)
        {
            //發(fā)生io異常需要進(jìn)行處理唐责,對應(yīng)channel可能關(guān)閉了
            ex.printStackTrace();
        } catch (TimeoutException e) {
            //信道資源釋放超時(shí),可能對應(yīng)的channel關(guān)閉了
            e.printStackTrace();
        }
    }

以上代碼我是通過但線程循環(huán)的方式從RabbitMQ中拉取代碼瘾带,這種模式處理速度較慢鼠哥,在不是用多線程進(jìn)行處理的情況下,這中模式適合用于處理單個消息比較耗時(shí)的場景看政。

7.2朴恳、推模式

image.png

通過運(yùn)行如下代碼可以看得出,推模式實(shí)際上是使用了多線程的在進(jìn)行處理的允蚣。但是他的吞吐量是默認(rèn)拉模式的好幾倍菜皂,這中模式適合于處理每個消息的時(shí)間比較短的場景。

private void receivePushMessage()
    {
        try
        {
            Channel downChannel=connection.createChannel();
            //交換器類型:fanout厉萝、direct恍飘、topic
            //聲明一個名字為msgExchange、類型為direct并且持久化的交換器谴垫,如果交換器已經(jīng)存在就不再創(chuàng)建
            downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
            //聲明一個名字為msgQueue章母、持久化、不排他翩剪、不自動刪除的隊(duì)列乳怎,如果此隊(duì)列已經(jīng)存在就不再創(chuàng)建
            downChannel.queueDeclare("msgQueue", true, false, false, null);
            //將交換器與隊(duì)列進(jìn)行綁定通過message進(jìn)行綁定
            downChannel.queueBind("msgQueue","msgExchange","message");
            //消息未確認(rèn)消息的數(shù)量
            downChannel.basicQos(10000);//在非自動確認(rèn)的模式下,限制最多允許未確認(rèn)的消息數(shù)量
            //消費(fèi)消息
            downChannel.basicConsume("msgQueue",createConsumer(downChannel));
            System.out.println("RabbitMQ消費(fèi)者正在運(yùn)行中...");
            //不能釋放信道資源!!!
            //因?yàn)檫@里的消費(fèi)者是用的推模式前弯,如果關(guān)閉了信道蚪缀,后面在進(jìn)行消息消費(fèi)的時(shí)候會報(bào)錯
            //downChannel.close();
        }
        catch (ShutdownSignalException ex)
        {
            //連接異常關(guān)閉了秫逝,這里要進(jìn)行檢查,并嘗試重新建立連接
            ex.printStackTrace();
        }
        catch (IOException ex)
        {
            //發(fā)生io異常需要進(jìn)行處理询枚,對應(yīng)channel可能關(guān)閉了
            ex.printStackTrace();
        }
    }

    /**
     * 創(chuàng)建消費(fèi)對象
     * @param channel
     * @return
     */
    private Consumer createConsumer(Channel channel)
    {
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String message = new String(body);
                System.out.println(Thread.currentThread().getId()+"RabbitMQ推模式消費(fèi)者收到消息: " + message);
                // 消息確認(rèn)
                try {
                    channel.basicAck(envelope.getDeliveryTag(), false);//手動確認(rèn)消息
                } catch (IOException e) {
                    //發(fā)生io異常需要進(jìn)行處理违帆,對應(yīng)channel可能關(guān)閉了
                    e.printStackTrace();
                }
            }
        };
        return consumer;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市金蜀,隨后出現(xiàn)的幾起案子刷后,更是在濱河造成了極大的恐慌,老刑警劉巖渊抄,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件尝胆,死亡現(xiàn)場離奇詭異,居然都是意外死亡护桦,警方通過查閱死者的電腦和手機(jī)含衔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來二庵,“玉大人抱慌,你說我怎么就攤上這事≌A裕” “怎么了?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵强经,是天一觀的道長睡陪。 經(jīng)常有香客問我,道長匿情,這世上最難降的妖魔是什么兰迫? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮炬称,結(jié)果婚禮上汁果,老公的妹妹穿的比我還像新娘。我一直安慰自己玲躯,他們只是感情好据德,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著跷车,像睡著了一般棘利。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上朽缴,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天善玫,我揣著相機(jī)與錄音,去河邊找鬼密强。 笑死茅郎,一個胖子當(dāng)著我的面吹牛蜗元,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播系冗,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼奕扣,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了毕谴?” 一聲冷哼從身側(cè)響起成畦,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎涝开,沒想到半個月后循帐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡舀武,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年拄养,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片银舱。...
    茶點(diǎn)故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡瘪匿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出寻馏,到底是詐尸還是另有隱情棋弥,我是刑警寧澤,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布诚欠,位于F島的核電站顽染,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏轰绵。R本人自食惡果不足惜粉寞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望左腔。 院中可真熱鬧唧垦,春花似錦、人聲如沸液样。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽鞭莽。三九已至双炕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間撮抓,已是汗流浹背妇斤。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人站超。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓荸恕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親死相。 傳聞我的和親對象是個殘疾皇子融求,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評論 2 354