Spring Boot 之 RabbitMQ 集成之路

最近集成了RabbitMQ消息隊(duì)列展父,試著研究一下RabbitMQ的幾種模式孤钦,參考官方文檔
新建項(xiàng)目什么的不講了。
配置pom.xml文件,用到了springboot對于AMQP(高級消息隊(duì)列協(xié)議,即面向消息的中間件的設(shè)計)

<!-- 添加springboot對amqp的支持 -->
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>

創(chuàng)建個rabbit目錄皂冰,然后新建三個類吠裆,sender,receiver够掠,config

Work模式

@Configuration
public class MQConfig {
    
    public static final String QUEUE = "queue";
    
    /**
     * Direct模式 交換機(jī)Exchange
     * */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE, true);
    }
}

config里配置好一個最簡單的queue
然后sender里使用AmqpTemplate去發(fā)送消息

@Service
public class MQSender {

    private static Logger log = LoggerFactory.getLogger(MQSender.class);
    
    @Autowired
    AmqpTemplate amqpTemplate ;
    
    public void send(Object message) {
        String msg = RedisService.beanToString(message);//這里只是我其他類里一個Object對象轉(zhuǎn)String對象民褂,你們可以自己實(shí)現(xiàn)
        log.info("send message:"+msg);
        amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
    }
}

然后在 receiver里接收消息

@Service
public class MQReceiver {

    private static Logger log = LoggerFactory.getLogger(MQReceiver.class);  

    @RabbitListener(queues=MQConfig.QUEUE)
    public void receive(String message) {
        log.info("receive message:"+message);
    }
}

在Controller調(diào)用send方法運(yùn)行之后控制臺會輸出如下結(jié)果

2019-02-18 11:31:33.377 INFO 2356 --- [nio-8080-exec-1] com.imooc.miaosha.rabbitmq.MQSender : send message:hello,world
2019-02-18 11:31:33.514 INFO 2356 --- [cTaskExecutor-1] com.imooc.miaosha.rabbitmq.MQReceiver : receive message:hello,world

公平派遣與循環(huán)派遣

默認(rèn)情況下,RabbitMQ將按順序?qū)⒚織l消息發(fā)送給下一個消費(fèi)者疯潭。平均而言赊堪,每個消費(fèi)者將獲得相同數(shù)量的消息。這種分發(fā)消息的方式稱為循環(huán)法竖哩。在這種模式下哭廉,調(diào)度不一定完全符合我們的要求。例如相叁,在有兩個工人的情況下遵绰,當(dāng)所有奇怪的消息都很重,甚至消息很輕時增淹,一個工人將經(jīng)常忙椿访,而另一個工作人員幾乎不會做任何工作。那么虑润,RabbitMQ對此一無所知成玫,仍然會均勻地發(fā)送消息。

發(fā)生這種情況是因?yàn)镽abbitMQ只是在消息進(jìn)入隊(duì)列時調(diào)度消息。它不會查看消費(fèi)者未確認(rèn)消息的數(shù)量哭当。它只是盲目地向第n個消費(fèi)者發(fā)送每個第n個消息猪腕。

Fanout 交換機(jī)模式

正如您可能從名稱中猜到的那樣,它只是將收到的所有消息廣播到它知道的所有隊(duì)列中钦勘。而這正是我們傳播信息所需要的陋葡。


參照官方實(shí)例的實(shí)現(xiàn),代碼如下
config彻采,里面的自動刪除隊(duì)列

@Configuration
public class MQConfig {
    /**
     * Fanout模式 交換機(jī)Exchange
     * */
    @Bean
    public Queue autoDeleteQueue1()  {
        return  new AnonymousQueue();
    }
    @Bean
    public Queue autoDeleteQueue2 ()  {
        return  new AnonymousQueue();
    }
    @Bean
    public FanoutExchange fanoutExchage(){
        return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    public Binding FanoutBinding1() {
        return BindingBuilder.bind(autoDeleteQueue1()).to(fanoutExchage());
    }
    @Bean
    public Binding FanoutBinding2() {
        return BindingBuilder.bind(autoDeleteQueue2()).to(fanoutExchage());
    }
}

臨時隊(duì)列 (AnonymousQueue)

您可能還記得以前我們使用的是具有特定名稱的隊(duì)列脖岛。能夠命名隊(duì)列對我們來說至關(guān)重要 - 我們需要將工作人員指向同一個隊(duì)列。當(dāng)您想要在生產(chǎn)者和消費(fèi)者之間共享隊(duì)列時颊亮,為隊(duì)列命名非常重要。

但是我們的Fanout示例并非如此陨溅。我們希望了解所有消息终惑,而不僅僅是其中的一部分消息。我們也只對目前流動的消息感興趣门扇,而不是舊消息雹有。要解決這個問題,我們需要兩件事臼寄。

首先霸奕,每當(dāng)我們連接到Rabbit時,我們都需要一個新的空隊(duì)列吉拳。為此质帅,我們可以使用隨機(jī)名稱創(chuàng)建隊(duì)列,或者更好 - 讓服務(wù)器為我們選擇隨機(jī)隊(duì)列名稱留攒。

其次煤惩,一旦我們斷開消費(fèi)者,就應(yīng)該自動刪除隊(duì)列炼邀。為了使用Spring AMQP客戶端執(zhí)行此操作魄揉,我們定義了AnonymousQueue,它使用生成的名稱創(chuàng)建一個非持久的拭宁,獨(dú)占的自動刪除隊(duì)列

sender里添加代碼:

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void sendFanout() {
        StringBuilder builder = new StringBuilder("Hello");
        if (dots.incrementAndGet() == 3) {
            dots.set(1);
        }
        for (int i = 0; i < dots.get(); i++) {
            builder.append('.');
        }
        builder.append(count.incrementAndGet());
        String message = builder.toString();
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", message);
        log.info("sendFanout [x] Sent '" + message + "'");
    }

receiver里添加代碼:

        @RabbitListener(queues="#{autoDeleteQueue1.name}")
    public void receiveFanout1(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverFanout 1 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverFanout 1 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    @RabbitListener(queues="#{autoDeleteQueue2.name}")
    public void receiveFanout2(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverFanout 2 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverFanout 2 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
        }

運(yùn)行之后控制臺可以看到如下信息洛退,可見exchange把消息廣播給了所有的receiver。

適用于需要廣播的操作

Direct 交換機(jī)模式

我們會將消息發(fā)送給特定的隊(duì)列杰标,而不是廣播兵怯。我們使用一個key作為路由鍵。這樣接收程序?qū)⒛軌蜻x擇它想要接收(或訂閱)的key



config代碼里添加:

    /**
     * Direct模式 交換機(jī)Exchange
     * */

    @Bean
    public DirectExchange direct(){
        return new DirectExchange(DIRECT_EXCHANGE);
    }
    @Bean
    public Binding DirectBinding1() {
        return BindingBuilder.bind(autoDeleteQueue1())
                .to(direct())
                .with("orange");
    }
    @Bean
    public Binding DirectBinding2() {
        return BindingBuilder.bind(autoDeleteQueue1())
                .to(direct())
                .with("black");
    }
    @Bean
    public Binding DirectBinding3() {
        return BindingBuilder.bind(autoDeleteQueue2())
                .to(direct())
                .with("green");
    }
    @Bean
    public Binding DirectBinding4() {
        return BindingBuilder.bind(autoDeleteQueue2())
                .to(direct())
                .with("black");
    }

sender代碼里添加:

    private final String[] keys = {"orange", "black", "green"};


    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void sendDirect() {
        StringBuilder builder = new StringBuilder("Hello to ");
        if (dots.incrementAndGet() == 3) {
            dots.set(0);
        }
        String key = keys[this.dots.get()];
        builder.append(key).append(' ');
        builder.append(this.count.get());
        String message = builder.toString();
        amqpTemplate.convertAndSend(MQConfig.DIRECT_EXCHANGE, key, message);
        log.info("sendDirect [x] Sent '" + message + "'");
    }

receiver代碼添加:

    @RabbitListener(queues="#{autoDeleteQueue1.name}")
    public void receiveFanout1(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverDirect 1 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverDirect 1 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    @RabbitListener(queues="#{autoDeleteQueue2.name}")
    public void receiveFanout2(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverDirect 2 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverDirect 2 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

運(yùn)行后結(jié)果如下在旱,可見摇零,兩個receiver都能收到black信息,而receiver1只能收到green信息而receiver2只能收到orange信息。

Topic 交換機(jī)模式

主題交流
發(fā)送到主題交換的消息不能具有任意的 routing_key - 它必須是由點(diǎn)分隔的單詞列表驻仅。單詞可以是任何內(nèi)容谅畅,但通常它們指定與消息相關(guān)的一些功能。一些有效的路由鍵示例:“ stock.usd.nyse ”噪服,“ nyse.vmw ”毡泻,“ quick.orange.rabbit ”。路由密鑰中可以包含任意數(shù)量的單詞粘优,最多可達(dá)255個字節(jié)仇味。

綁定密鑰也必須采用相同的形式。主題交換背后的邏輯 類似于直接交換- 使用特定路由密鑰發(fā)送的消息將被傳遞到與匹配綁定密鑰綁定的所有隊(duì)列雹顺。但是綁定鍵有兩個重要的特殊情況:

  • *(星號)可以替代一個單詞丹墨。
  • #(hash)可以替換零個或多個單詞。



    config添加代碼如下:

    /**
     * Topic模式 交換機(jī)Exchange
     * */
    @Bean
    public TopicExchange topicExchage(){
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(autoDeleteQueue1()).to(topicExchage()).with("*.orange.*");
    }
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(autoDeleteQueue1()).to(topicExchage()).with("*.*.rabbit");
    }
    @Bean
    public Binding topicBinding3() {
        return BindingBuilder.bind(autoDeleteQueue2()).to(topicExchage()).with("lazy.#");
    }

sender添加如下代碼:

    private final String[] topickeys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void sendTopic() {
        StringBuilder builder = new StringBuilder("Hello to ");
        if (this.dots.incrementAndGet() == topickeys.length) {
            this.dots.set(0);
        }
        String key = topickeys[this.dots.get()];
        builder.append(key).append(' ');
        builder.append(this.count.incrementAndGet());
        String message = builder.toString();
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, key, message);
        log.info(" [x] Sent '" + message + "'");
    }

receiver代碼添加如下:

    @RabbitListener(queues="#{autoDeleteQueue1.name}")
    public void receiveTopic1(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverTopic 1 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverTopic 1 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

    @RabbitListener(queues="#{autoDeleteQueue2.name}")
    public void receiveTopic2(String in) throws InterruptedException{
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("receiverTopic 2 " + " [x] Received '" + in + "'");
        doWork(in);
        watch.stop();
        log.info("receiverTopic 2 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
    }

運(yùn)行后得到如下結(jié)果嬉愧,路由密鑰設(shè)置為“ quick.orange.rabbit ”的消息將傳遞到兩個隊(duì)列贩挣。消息“ lazy.orange.elephant ”也將同時發(fā)送給他們。另一方面没酣,“ quick.orange.fox ”只會轉(zhuǎn)到第一個隊(duì)列王财,而“ lazy.brown.fox ”只會轉(zhuǎn)到第二個隊(duì)列≡1悖“ lazy.pink.rabbit ”將僅傳遞到第二個隊(duì)列一次绒净,即使它匹配兩個綁定〕ニィ“ quick.brown.fox ”與任何綁定都不匹配挂疆,因此它將被丟棄。

如果我們違反合同并發(fā)送帶有一個或四個單詞的消息哎垦,例如“ orange ”或“ quick.orange.male.rabbit ”囱嫩,會發(fā)生什么?好吧漏设,這些消息將不匹配任何綁定墨闲,并將丟失。

另一方面郑口,“ lazy.orange.male.rabbit ”鸳碧,即使它有四個單詞,也會匹配最后一個綁定犬性,并將被傳遞到第二個隊(duì)列瞻离。

可見,topic模式能夠?qū)崿F(xiàn)更加靈活的控制信息流向
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末乒裆,一起剝皮案震驚了整個濱河市套利,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖肉迫,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件验辞,死亡現(xiàn)場離奇詭異,居然都是意外死亡喊衫,警方通過查閱死者的電腦和手機(jī)跌造,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來族购,“玉大人壳贪,你說我怎么就攤上這事∏拚龋” “怎么了违施?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長瑟幕。 經(jīng)常有香客問我醉拓,道長,這世上最難降的妖魔是什么收苏? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮愤兵,結(jié)果婚禮上鹿霸,老公的妹妹穿的比我還像新娘。我一直安慰自己秆乳,他們只是感情好懦鼠,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著屹堰,像睡著了一般肛冶。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上扯键,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天睦袖,我揣著相機(jī)與錄音,去河邊找鬼荣刑。 笑死馅笙,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的厉亏。 我是一名探鬼主播董习,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼爱只!你這毒婦竟也來了皿淋?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎窝趣,沒想到半個月后疯暑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡高帖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年缰儿,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片散址。...
    茶點(diǎn)故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡乖阵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出预麸,到底是詐尸還是另有隱情瞪浸,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布吏祸,位于F島的核電站对蒲,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏贡翘。R本人自食惡果不足惜蹈矮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望鸣驱。 院中可真熱鬧泛鸟,春花似錦、人聲如沸踊东。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽闸翅。三九已至再芋,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間坚冀,已是汗流浹背济赎。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留记某,地道東北人联喘。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像辙纬,于是被迫代替她去往敵國和親豁遭。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容