SpringBoot使用RabbitMQ消息隊列

消息隊列中間件是分布式系統(tǒng)中重要的組件牵祟,主要解決應(yīng)用耦合深夯,異步消息,流量削鋒等問題诺苹,實現(xiàn)高性能咕晋,高可用,可伸縮和最終一致性架構(gòu)使用較多的消息隊列有ActiveMQ收奔,RabbitMQ掌呜,Kafka,RocketMQ等筹淫,這里主要講解RabbitMQ的簡單使用

創(chuàng)建SpringBoot項目站辉,并引入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Queue方式

  • 定義配置類
/**
 * @author Gjing
 **/
@Configuration
public class RabbitMqConfiguration {

    /**
     * 聲明一個名為simple的隊列
     */
    @Bean
    public Queue testQueue() {
        return new Queue("simple");
    }
}    
  • 聲明一個生產(chǎn)者
/**
 * @author Gjing
 **/
@Component
public class Producer {

    @Resource
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String message = "hello";
        this.rabbitTemplate.convertAndSend("simple", message);
    }
}
  • 聲明消費者
/**
 * @author Gjing
 **/
@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = "simple")
    public void receive(String message) {
        log.info("消費者1收到消息:{}", message);
    }
}
  • 創(chuàng)建Controller進行調(diào)用
/**
 * @author Gjing
 **/
@RestController
public class DemoController {

    @Resource
    private Producer producer;

    @PostMapping("/message")
    public void send() {
        for (int i = 0; i < 10; i++) {
            this.producer.send();
        }
    }
}
  • 執(zhí)行結(jié)果
    [圖片上傳失敗...(image-67b764-1561538726085)]

topic exchange方式

  • 定義配置類
/**
 * @author Gjing
 **/
@Configuration
public class RabbitMqConfiguration {

    /**
     * 聲明一個名為topic.message1的隊列
     */
    @Bean
    public Queue topicQueue() {
        return new Queue("topic.message1");
    }

    /**
     * 聲明一個名為topic.message2的隊列
     */
    @Bean
    public Queue topicQueue2() {
        return new Queue("topic.message2");
    }

    /**
     * 聲明一個名為exchange的交換機
     */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    /**
     * 將topic.message1的隊列綁定到exchange交換機
     */
    @Bean
    public Binding bindMessage1() {
        return BindingBuilder.bind(topicQueue()).to(exchange()).with("topic.message1");
    }

    /**
     * 將topic.message2的隊列綁定到exchange交換機
     */
    @Bean
    public Binding bindMessage2() {
        return BindingBuilder.bind(topicQueue2()).to(exchange()).with("topic.message2");
    }
}
  • 定義生產(chǎn)者
/**
 * @author Gjing
 **/
@Component
public class TopicProducer {

    @Resource
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String message1 = "I am topic.message1";
        String message2 = "I am topic.message2";
        this.rabbitTemplate.convertAndSend("exchange", "topic.message1", message1);
        this.rabbitTemplate.convertAndSend("exchange", "topic.message2", message2);
    }
}
  • 定義消費者1
/**
 * @author Gjing
 **/
@Component
@Slf4j
public class TopicConsumer1 {

    @RabbitListener(queues = "topic.message1")
    public void receive(String message) {
        log.info("消費者1收到消息:{}", message);
    }
}
  • 定義消費者2
/**
 * @author Gjing
 **/
@Component
@Slf4j
public class TopicConusmer2 {
    @RabbitListener(queues = "topic.message2")
    public void receive(String message) {
        log.info("消費者2收到消息:{}", message);
    }
}
  • 創(chuàng)建controller進行調(diào)用
/**
 * @author Gjing
 **/
@RestController
public class TopicController {
    @Resource
    private TopicProducer topicProducer;

    @PostMapping("/message-topic")
    public void sendMessageTopic() {
        for (int i = 0; i < 10; i++) {
            this.topicProducer.send();
        }
    }
}
  • 執(zhí)行結(jié)果


    1560475964_1_

fanout方式

  • 定義配置類
/**
 * @author Gjing
 **/
@Configuration
public class RabbitMqConfiguration {

    /**
     * 聲明一個名為fanout.1的隊列
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.1");
    }
    /**
     * 聲明一個名為fanout.2的隊列
     */
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.2");
    }
    /**
     * 聲明一個名為fanout.3的隊列
     */
    @Bean
    public Queue fanoutQueue3() {
        return new Queue("fanout.3");
    }

    /**
     * 聲明一個名為fanoutExchange的轉(zhuǎn)發(fā)器
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * 將隊列fanoutQueue1綁定到fanout轉(zhuǎn)發(fā)器
     */
    @Bean
    public Binding bindFanout1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    /**
     * 將隊列fanoutQueue1綁定到fanout轉(zhuǎn)發(fā)器
     */
    @Bean
    public Binding bindFanout2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

    /**
     * 將隊列fanoutQueue1綁定到fanout轉(zhuǎn)發(fā)器
     */
    @Bean
    public Binding bindFanout3() {
        return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
    }
}
  • 定義生產(chǎn)者
/**
 * @author Gjing
 **/
@Component
public class FanoutProducer {
    @Resource
    private AmqpTemplate amqpTemplate;

    public void send() {
        String message = "hello, I am speaker";
        //這里的routingKey會被rabbitMQ忽略,如果不設(shè)置這個參數(shù)會導(dǎo)致發(fā)送消息失敗,
        //所以這里隨便寫(我給他空字符串)损姜,rabbitMQ會默認(rèn)發(fā)給所有綁定的
        this.amqpTemplate.convertAndSend("fanoutExchange","", message);
    }
}
  • 定義消費者1
/**
 * @author Gjing
 **/
@Component
@Slf4j
public class FanoutConsumer1 {

    @RabbitListener(queues = "fanout.1")
    public void receive(String message) {
        log.info("消費者1收到消息:{}", message);
    }
}
  • 定義消費者2
/**
 * @author Gjing
 **/
@Component
@Slf4j
public class FanoutConsumer2 {
    @RabbitListener(queues = "fanout.2")
    public void receive(String message) {
        log.info("消費者2收到消息:{}", message);
    }
}
  • 定義消費者3
/**
 * @author Gjing
 **/
@Component
@Slf4j
public class FanoutConsumer3 {
    @RabbitListener(queues = "fanout.3")
    public void receive(String message) {
        log.info("消費者3收到消息:{}", message);
    }
}
  • 創(chuàng)建controller調(diào)用
/**
 * @author Gjing
 **/
@RestController
public class FanoutController {
    @Resource
    private FanoutProducer fanoutProducer;

    @PostMapping("/message-fanout")
    public void sendFanout() {
        this.fanoutProducer.send();
    }
}
image.png

RabbitMQ核心概念

server:又稱Broker饰剥,接受客戶端的連接實現(xiàn)AMQP實體服務(wù);connection:與broker的連接摧阅;channel:網(wǎng)絡(luò)通道汰蓉,幾乎所有的操作都是在channel中進行;message:服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)棒卷,由properties和body組成顾孽,properties可以對消息進行修飾祝钢,比如消息的優(yōu)先級和高級特性, body為消息的內(nèi)容若厚;exchange:交換機拦英,接收消息,根據(jù)路郵件轉(zhuǎn)發(fā)消息到綁定的隊列测秸;binding:exchange和queue之間的虛擬連接疤估,可以包含routing key;Routing key:一個路由規(guī)則霎冯,虛擬機用他確定如何路由一個特定信息铃拇;Queue:也稱為message Queue,消息隊列沈撞,保存信息并將它們轉(zhuǎn)發(fā)給消費者慷荔。

Exchange類型:

  • Fanout:路由規(guī)則是把所有發(fā)送到該Exchange的消息路由到所有與她綁定的Queue中
    1

備注:生產(chǎn)者P生產(chǎn)消息1推送到Exchange,由于Exchange Type=fanout這時候會遵循fanout的規(guī)則將消息推送到所有與他綁定的Queue缠俺。

  • direct:把消息路由到那些binding key與routing key完全匹配的Queue中显晶。
    2

備注:生產(chǎn)者P發(fā)送消息時Routing key = bloking時,這時候?qū)⑾魉偷紼xchange壹士,Exchange獲取到生產(chǎn)者發(fā)送過來的消息后吧碾,會根據(jù)自身的規(guī)則進行與匹配響應(yīng)的Queue,這時候發(fā)現(xiàn)Queue1和Queue2都符合墓卦,就會將消息傳送給這兩個隊列,如果我們以Routing key = create和routing key = confirm發(fā)送消息時户敬,這時候消息只會被推送到Queue2隊列中落剪,其他的Routing key 的消息會被丟棄。

  • topic:模糊匹配尿庐,通過通配符滿足一部分規(guī)則就可以傳送忠怖,其中注意的是有兩個字符 ‘星號’ 和#號,其中 星號 用于匹配一個單詞抄瑟,#號用于匹配多個單詞(可以是0個)
    [圖片上傳失敗...(image-85b63c-1561538726082)]

備注:當(dāng)生產(chǎn)者發(fā)送消息Routing Key=F.C.E的時候凡泣,這時候只滿足Queue1,所以會被路由到Queue中皮假,如果Routing Key=A.C.E這時候會被同是路由到Queue1和Queue2中鞋拟,如果Routing Key=A.F.B時,這里只會發(fā)送一條消息到Queue2中惹资。

常見面試題

  • 什么是元數(shù)據(jù)贺纲?元數(shù)據(jù)分為哪些類型?包括哪些內(nèi)容褪测?與cluster相關(guān)的元數(shù)據(jù)有哪些猴誊?元數(shù)據(jù)是如何保存的潦刃?元數(shù)據(jù)在cluster中是如何分布的?

在非cluster模式中懈叹,元數(shù)據(jù)主要分為Queue元數(shù)據(jù)(Queue名字和屬性等)乖杠、Exchange元數(shù)據(jù)(Exchange名字、類型澄成、屬性等)胧洒、binding元數(shù)據(jù)(存放路由關(guān)系的查找表)、vhost元數(shù)據(jù)(vhost范圍內(nèi)針對前三者的名字空間約束和安全屬性設(shè)置)环揽。在cluster模式下略荡,包括cluster中node位置信息和node關(guān)系信息。元數(shù)據(jù)按照erlang node的類型確定是僅保存于RAM中歉胶,還是同時保存在RAM或者Disk上汛兜,元數(shù)據(jù)在cluster中是全node分布的

  • rabbitmq的一個Queue中存放的message是否有數(shù)量限制?

可以認(rèn)為無限制通今,限制取決于機器的內(nèi)存粥谬,但是消息過多會導(dǎo)致處理效率的下降。

  • rabbitmq如何實現(xiàn)延遲隊列辫塌?

沒有直接支持延遲隊列功能漏策,但是可以通過兩個特性來實現(xiàn)延遲隊列,①TTL:通過隊列屬性設(shè)置臼氨,隊列中的所有消息都有相同的過期時間掺喻、對消息進行單獨設(shè)置,每條消息TTL可以不同储矩。如果同時使用感耙,則消息的過期時間以兩者之間TTL較小的那個數(shù)值為準(zhǔn),消息在隊列的生存時間一旦超過設(shè)置的TTL值持隧,就稱為dead letter即硼。②DLX:Queue可以配置X-dead-letter-exchange和x-dead-letter-routing-key(可選)兩個參數(shù),如果隊列內(nèi)出現(xiàn)了dead letter屡拨,則按照這兩個參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊列只酥。

  • X-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange

  • 出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送

    • 出現(xiàn)dead letter的情況有:
    1. 消息或者隊列的TTL過期; 2. 隊列達(dá)到最大長度; 3. 消息被消費者拒絕
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市呀狼,隨后出現(xiàn)的幾起案子裂允,更是在濱河造成了極大的恐慌,老刑警劉巖赠潦,帶你破解...
    沈念sama閱讀 218,451評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件叫胖,死亡現(xiàn)場離奇詭異,居然都是意外死亡她奥,警方通過查閱死者的電腦和手機瓮增,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評論 3 394
  • 文/潘曉璐 我一進店門怎棱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人绷跑,你說我怎么就攤上這事拳恋。” “怎么了砸捏?”我有些...
    開封第一講書人閱讀 164,782評論 0 354
  • 文/不壞的土叔 我叫張陵谬运,是天一觀的道長。 經(jīng)常有香客問我垦藏,道長梆暖,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,709評論 1 294
  • 正文 為了忘掉前任掂骏,我火速辦了婚禮轰驳,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘弟灼。我一直安慰自己级解,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,733評論 6 392
  • 文/花漫 我一把揭開白布田绑。 她就那樣靜靜地躺著勤哗,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掩驱。 梳的紋絲不亂的頭發(fā)上芒划,一...
    開封第一講書人閱讀 51,578評論 1 305
  • 那天,我揣著相機與錄音欧穴,去河邊找鬼腊状。 笑死,一個胖子當(dāng)著我的面吹牛苔可,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播袋狞,決...
    沈念sama閱讀 40,320評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼焚辅,長吁一口氣:“原來是場噩夢啊……” “哼蠢箩!你這毒婦竟也來了轩端?” 一聲冷哼從身側(cè)響起遗座,我...
    開封第一講書人閱讀 39,241評論 0 276
  • 序言:老撾萬榮一對情侶失蹤粒蜈,失蹤者是張志新(化名)和其女友劉穎音念,沒想到半個月后等恐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體途样,經(jīng)...
    沈念sama閱讀 45,686評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡壹甥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,878評論 3 336
  • 正文 我和宋清朗相戀三年砌梆,在試婚紗的時候發(fā)現(xiàn)自己被綠了默责。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贬循。...
    茶點故事閱讀 39,992評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖桃序,靈堂內(nèi)的尸體忽然破棺而出杖虾,到底是詐尸還是另有隱情,我是刑警寧澤媒熊,帶...
    沈念sama閱讀 35,715評論 5 346
  • 正文 年R本政府宣布奇适,位于F島的核電站,受9級特大地震影響芦鳍,放射性物質(zhì)發(fā)生泄漏嚷往。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,336評論 3 330
  • 文/蒙蒙 一柠衅、第九天 我趴在偏房一處隱蔽的房頂上張望皮仁。 院中可真熱鬧,春花似錦茄茁、人聲如沸魂贬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽付燥。三九已至,卻和暖如春愈犹,著一層夾襖步出監(jiān)牢的瞬間键科,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評論 1 270
  • 我被黑心中介騙來泰國打工漩怎, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留勋颖,地道東北人。 一個月前我還...
    沈念sama閱讀 48,173評論 3 370
  • 正文 我出身青樓勋锤,卻偏偏與公主長得像饭玲,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子叁执,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,947評論 2 355

推薦閱讀更多精彩內(nèi)容