RabbitMQ簡(jiǎn)單使用

RabbitMQ是一個(gè)支持AMQP(高級(jí)消息協(xié)議)協(xié)議的消息中間件饺谬。

RabbitMQ的消息通信模型如下所示:

RabbitMQ邏輯圖

消息中間件主要就是用來(lái)發(fā)送和消費(fèi)消息。在RabbitMQ中物赶,消息通過(guò)Publisher發(fā)送者將消息出去培遵,由Consumer接收者接收到消息,并執(zhí)行對(duì)應(yīng)的業(yè)務(wù)邏輯蹂风。但Publisher并不是直接將消息發(fā)送給Consumer,而是先將發(fā)送到一個(gè)Exchange交換機(jī)乾蓬,每一個(gè)交換機(jī)都會(huì)Binding綁定一個(gè)Queue隊(duì)列惠啄,所以Publisher將消息發(fā)送到Exchange交換機(jī)后路由到對(duì)應(yīng)的Queue,將消息存放在Queue中任内,等待Consumer來(lái)拉取數(shù)據(jù)撵渡。

消息的發(fā)布與消費(fèi)簡(jiǎn)單流程如下:

  • Publisher通過(guò)Channel信道將消息發(fā)送到RabbitMQ服務(wù)中的Exchange;
  • Exchange通過(guò)Binding綁定規(guī)則找到對(duì)應(yīng)的Queue并將消息發(fā)送到對(duì)應(yīng)的Queue隊(duì)列中;
  • Consumer消費(fèi)者通過(guò)Channel信道從Queue中拉取數(shù)據(jù)死嗦;

Exchange消息分發(fā)策略

Publisher發(fā)送消息的時(shí)候會(huì)指定Exchange交換機(jī)的名稱趋距,并指定一個(gè)路由key,該路由key會(huì)將消息發(fā)送到具體綁定了這個(gè)路由key對(duì)應(yīng)的Queue中越除。Exchange交換機(jī)將消息發(fā)送到Queue有幾種分發(fā)策略:

  • direct:當(dāng)指定direct分發(fā)策略時(shí)霞捡,如果消息的路由key與隊(duì)列綁定的路由key相同時(shí),交換器就會(huì)將消息發(fā)送到該隊(duì)列中俄占。例如發(fā)送消息是指定路由key為 rk1 监署,那么如果隊(duì)列綁定的路由key也是 rk1,那么交換機(jī)會(huì)將消息發(fā)送到該隊(duì)列;
  • fanout:當(dāng)指定fanout分發(fā)策略時(shí),交換機(jī)不會(huì)處理路由key,交換機(jī)會(huì)將消息發(fā)送到所有綁定了在該交換機(jī)的隊(duì)列上箱熬。
  • topic:當(dāng)指定topic分發(fā)策略時(shí),交換器會(huì)通過(guò)模式匹配分發(fā)消息狈邑,如果路由key與某個(gè)模式匹配時(shí)城须,交換機(jī)就會(huì)將消息發(fā)送到與該模式匹配的隊(duì)列中。例如某個(gè)隊(duì)列 queue 綁定的路由key的模式為 a.# 官地,當(dāng) publisher 發(fā)送消息時(shí)酿傍,如果指定發(fā)送的路由key為 a.b 或者是 a.c 時(shí),該隊(duì)列將會(huì)收到路由器發(fā)送的消息驱入。
  • headers: 這種分發(fā)策略幾乎不用了赤炒。

引入pox.xml文件

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

消費(fèi)生產(chǎn)者Producer

/**
 * 消息的生產(chǎn)者
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設(shè)置用戶名與密碼
        factory.setUsername("guest");
        factory.setPassword("guest");
        //設(shè)置 RabbitMQ 地址
        factory.setHost("localhost");

        //獲取連接,連接到代理服務(wù)器
        Connection connection = factory.newConnection();
        //獲取通信信道
        Channel channel = connection.createChannel();

        //設(shè)置exchange交換器的名稱
        String exchangeName = "greeting-exchange";
        //聲明exchange交換器
        //第二個(gè)參數(shù)聲明綁定的方式為direct
        //第三個(gè)參數(shù)為是否持久化,設(shè)置為true表示持久化,反之是非持久化,
        // 持久化的可以將交換器存盤,在服務(wù)器重啟的時(shí)候不會(huì)丟失信息
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);

        //指定路由的key
        String routingKey = "xsg";
        //發(fā)送的消息體
        byte[] messageBodyBytes = "hello, how are you!".getBytes();
        //發(fā)布信息亏较,也就是將消息發(fā)布到greeting-exchange的交換器
        // 并指定一個(gè)路由key莺褒,表示要路由到哪一個(gè)隊(duì)列
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        //關(guān)閉信道
        channel.close();
        //關(guān)閉連接
        connection.close();

    }
}

具體步驟:

  • 通過(guò)ConnectFactory獲取連接工廠,設(shè)置RabbitMQ服務(wù)器的用戶名雪情、密碼還有服務(wù)器地址遵岩;
  • 調(diào)用ConnectFactory的newConnection方法獲取一個(gè)連接Connection,連接到代理服務(wù)器巡通;
  • 通過(guò)Connection的createChannel獲取到通信通道尘执,消息將通過(guò)該通信通道發(fā)布出去;
  • 定義Exchange交換機(jī)的名稱宴凉,也就是要將消息發(fā)送到哪個(gè)交換機(jī)誊锭;
  • 通過(guò)exchangeDeclare聲明一個(gè)交換機(jī),需要指定這個(gè)交換機(jī)分發(fā)消息的策略弥锄,這里指定為direct丧靡,表示將消息路由到路由key一致的隊(duì)列中;
  • 通過(guò)管道channel的basicPublish方法發(fā)布消息籽暇,需要指定交換機(jī)的名稱温治,表示要將消息發(fā)送到哪個(gè)交換機(jī);指定路由key戒悠,表現(xiàn)交換機(jī)要將消息發(fā)送到哪個(gè)隊(duì)列中熬荆;
  • 發(fā)送成功后關(guān)閉管道與連接;

消息的消費(fèi)者Consumer

/**
 * 消息的消費(fèi)者
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設(shè)置用戶名與密碼
        factory.setUsername("guest");
        factory.setPassword("guest");
        //設(shè)置 RabbitMQ 地址
        factory.setHost("localhost");

        //獲取連接绸狐,連接到代理服務(wù)器
        Connection connection = factory.newConnection();
        //獲取通信信道
        final Channel channel = connection.createChannel();

        //設(shè)置exchange交換器的名稱
        String exchangeName = "greeting-exchange";
        //聲明exchange交換器
        //第二個(gè)參數(shù)聲明綁定的方式為direct
        //第三個(gè)參數(shù)為是否持久化,設(shè)置為true表示持久化,反之是非持久化,
        // 持久化的可以將交換器存盤,在服務(wù)器重啟的時(shí)候不會(huì)丟失信息
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);

        //獲取隊(duì)列的名稱
        String queueName = channel.queueDeclare().getQueue();
        //指定路由的key
        String routingKey = "xsg";

        //交換器與隊(duì)列通過(guò)路由鍵判定起來(lái)
        channel.queueBind(queueName, exchangeName, routingKey);

        while (true) {
            //是否自動(dòng)應(yīng)答
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    System.out.println("消息的路由鍵:" + routingKey);
                    //deliveryTag為條消息的一個(gè)唯一id惶看,用來(lái)告訴RabbitMQ哪條消息被消費(fèi)了
                    long deliveryTag = envelope.getDeliveryTag();
                    //第二個(gè)參數(shù)說(shuō)明如何處理這個(gè)失敗消息捏顺,為 true 表示該消息重新放回隊(duì)列頭六孵,值為 false 表示放棄這條消息
                    channel.basicAck(deliveryTag, false);
                    System.out.println("消費(fèi)的消息體內(nèi)容:" + new String(body, "utf-8"));
                }
            });

        }
    }

}

具體步驟:

  • 通過(guò)ConnectFactory獲取連接工廠纬黎,設(shè)置RabbitMQ服務(wù)器的用戶名、密碼還有服務(wù)器地址劫窒;
  • 調(diào)用ConnectFactory的newConnection方法獲取一個(gè)連接Connection本今,連接到代理服務(wù)器;
  • 通過(guò)Connection的createChannel獲取到通信通道主巍,消費(fèi)者將通過(guò)該通信通道獲取消息冠息;
  • 定義Exchange交換機(jī)的名稱,也就是要將消息發(fā)送到哪個(gè)交換機(jī)孕索;
  • 通過(guò)exchangeDeclare聲明一個(gè)交換機(jī)逛艰,需要指定這個(gè)交換機(jī)分發(fā)消息的策略,這里指定為direct搞旭,表示將消息路由到路由key一致的隊(duì)列中散怖;
  • 通過(guò)channel.queueDeclare().getQueue() 獲取隊(duì)列名稱,并定義一個(gè)路由key肄渗;
  • 通過(guò)channel.queueBind 將交換機(jī)與隊(duì)列綁定起來(lái)镇眷,需要傳入隊(duì)列Queue的名稱、交換機(jī)的名稱翎嫡、路由Key欠动,表示要將某個(gè)交換機(jī)通過(guò)路由key綁定到某個(gè)隊(duì)列;
  • 通過(guò) channel.basicConsume 獲取消息惑申,需要傳入隊(duì)列名稱具伍,表示獲取哪個(gè)隊(duì)列的消息;
  • 處理消息后關(guān)閉管道與連接圈驼;

運(yùn)行結(jié)果:

首先運(yùn)行Consumer人芽,然后運(yùn)行Product,可以看到Consumer控制臺(tái)輸出以下信息:

消息的路由鍵:xsg
消費(fèi)的消息體內(nèi)容:hello, how are you!
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末碗脊,一起剝皮案震驚了整個(gè)濱河市啼肩,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌衙伶,老刑警劉巖祈坠,帶你破解...
    沈念sama閱讀 212,599評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異矢劲,居然都是意外死亡赦拘,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門芬沉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)躺同,“玉大人阁猜,你說(shuō)我怎么就攤上這事√R眨” “怎么了剃袍?”我有些...
    開(kāi)封第一講書人閱讀 158,084評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)捎谨。 經(jīng)常有香客問(wèn)我民效,道長(zhǎng),這世上最難降的妖魔是什么涛救? 我笑而不...
    開(kāi)封第一講書人閱讀 56,708評(píng)論 1 284
  • 正文 為了忘掉前任畏邢,我火速辦了婚禮,結(jié)果婚禮上检吆,老公的妹妹穿的比我還像新娘舒萎。我一直安慰自己,他們只是感情好蹭沛,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,813評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布臂寝。 她就那樣靜靜地躺著,像睡著了一般致板。 火紅的嫁衣襯著肌膚如雪交煞。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 50,021評(píng)論 1 291
  • 那天斟或,我揣著相機(jī)與錄音素征,去河邊找鬼。 笑死萝挤,一個(gè)胖子當(dāng)著我的面吹牛御毅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播怜珍,決...
    沈念sama閱讀 39,120評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼端蛆,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了酥泛?” 一聲冷哼從身側(cè)響起今豆,我...
    開(kāi)封第一講書人閱讀 37,866評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎柔袁,沒(méi)想到半個(gè)月后呆躲,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,308評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡捶索,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,633評(píng)論 2 327
  • 正文 我和宋清朗相戀三年插掂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,768評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡辅甥,死狀恐怖酝润,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情璃弄,我是刑警寧澤要销,帶...
    沈念sama閱讀 34,461評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站谢揪,受9級(jí)特大地震影響蕉陋,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拨扶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,094評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望茁肠。 院中可真熱鬧患民,春花似錦、人聲如沸垦梆。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,850評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)托猩。三九已至印蓖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間京腥,已是汗流浹背赦肃。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,082評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留公浪,地道東北人他宛。 一個(gè)月前我還...
    沈念sama閱讀 46,571評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像欠气,于是被迫代替她去往敵國(guó)和親厅各。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,666評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容