spring boot集成RabbitMQ

RabbitMQ作為AMQP的代表性產(chǎn)品酸役,在項目中大量使用栗竖。結(jié)合現(xiàn)在主流的spring boot癣漆,極大簡化了開發(fā)過程中所涉及到的消息通信問題维咸。

首先正確的安裝RabbitMQ及運行正常。

RabbitMQ需啊erlang環(huán)境扑媚,所以首先安裝對應(yīng)版本的erlang腰湾,可在RabbitMQ官網(wǎng)下載

# rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm 

使用yum安裝RabbitMQ,避免缺少依賴包引起的安裝失敗

# yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm

啟動RabbitMQ

# /sbin/service rabbitmq-server start

由于RabbitMQ默認(rèn)提供的guest用戶只能本地訪問,所以額外創(chuàng)建用戶用于測試

# /sbin/rabbitmqctl add_user test test123
用戶名:test疆股,密碼:test123

開啟web管理插件

# rabbitmq-plugins enable rabbitmq_management

并使用之前創(chuàng)建的用戶登錄,并設(shè)置該用戶為administrator,虛擬主機(jī)地址為/

spring boot 引入相關(guān)依賴

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

消息生產(chǎn)者

application.properties添加一下配置
spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring boot配置類费坊,作用為指定隊列,交換器類型及綁定操作
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    
    //聲明隊列
    @Bean
    public Queue queue1() {
        return new Queue("hello.queue1", true); // true表示持久化該隊列
    }
    
    @Bean
    public Queue queue2() {
        return new Queue("hello.queue2", true);
    }
    
    //聲明交互器
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    //綁定
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
    }
    
    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
    }
    
}

共聲明了2個隊列旬痹,分別是hello.queue1附井,hello.queue2,交換器類型為TopicExchange,并與hello.queue1两残,hello.queue2隊列分別綁定永毅。

生產(chǎn)者類
import java.util.UUID;

import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {  
            System.out.println("消息發(fā)送成功:" + correlationData);  
        } else {  
            System.out.println("消息發(fā)送失敗:" + cause);  
        }  
        
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println(message.getMessageProperties().getCorrelationIdString() + " 發(fā)送失敗");
        
    }

    //發(fā)送消息,不需要實現(xiàn)任何接口人弓,供外部調(diào)用沼死。
    public void send(String msg){
        
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        
        System.out.println("開始發(fā)送消息 : " + msg.toLowerCase());
        String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString();
        System.out.println("結(jié)束發(fā)送消息 : " + msg.toLowerCase());
        System.out.println("消費者響應(yīng) : " + response + " 消息處理完成");
    }
}

要點:

1.注入RabbitTemplate

2.實現(xiàn)RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback接口(非必須)。
ConfirmCallback接口用于實現(xiàn)消息發(fā)送到RabbitMQ交換器后接收ack回調(diào)崔赌。ReturnCallback接口用于實現(xiàn)消息發(fā)送到RabbitMQ交換器意蛀,但無相應(yīng)隊列與交換器綁定時的回調(diào)。

3.實現(xiàn)消息發(fā)送方法健芭。調(diào)用rabbitTemplate相應(yīng)的方法即可县钥。rabbitTemplate常用發(fā)送方法有

rabbitTemplate.send(message);   //發(fā)消息,參數(shù)類型為org.springframework.amqp.core.Message
rabbitTemplate.convertAndSend(object); //轉(zhuǎn)換并發(fā)送消息慈迈。 將參數(shù)對象轉(zhuǎn)換為org.springframework.amqp.core.Message后發(fā)送
rabbitTemplate.convertSendAndReceive(message) //轉(zhuǎn)換并發(fā)送消息,且等待消息者返回響應(yīng)消息若贮。   

針對業(yè)務(wù)場景選擇合適的消息發(fā)送方式即可。

消息消費者
application.properties添加一下配置
spring.rabbitmq.host=192.168.1.107
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test123

spring.rabbitmq.listener.concurrency=2  //最小消息監(jiān)聽線程數(shù)
spring.rabbitmq.listener.max-concurrency=2 //最大消息監(jiān)聽線程數(shù)
消費者類
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    
    @RabbitListener(queues = "hello.queue1")
    public String processMessage1(String msg) {
        System.out.println(Thread.currentThread().getName() + " 接收到來自hello.queue1隊列的消息:" + msg);
        return msg.toUpperCase();
    }
    
    @RabbitListener(queues = "hello.queue2")
    public void processMessage2(String msg) {
        System.out.println(Thread.currentThread().getName() + " 接收到來自hello.queue2隊列的消息:" + msg);
    }
}

由于定義了2個隊列,所以分別定義不同的監(jiān)聽器監(jiān)聽不同的隊列谴麦。由于最小消息監(jiān)聽線程數(shù)和最大消息監(jiān)聽線程數(shù)都是2蠢沿,所以每個監(jiān)聽器各有2個線程實現(xiàn)監(jiān)聽功能。

要點:

1.監(jiān)聽器參數(shù)類型與消息實際類型匹配细移。在生產(chǎn)者中發(fā)送的消息實際類型是String搏予,所以這里監(jiān)聽器參數(shù)類型也是String。

2.如果監(jiān)聽器需要有響應(yīng)返回給生產(chǎn)者弧轧,直接在監(jiān)聽方法中return即可雪侥。

運行測試

import java.util.Date;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.sam.demo.rabbitmq.Application;
import com.sam.demo.rabbitmq.sender.Sender;

@RunWith(value=SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class RabbitTests {
    
    @Autowired
    private Sender sender;
    
    @Test
    public void sendTest() throws Exception {
        while(true){
            String msg = new Date().toString();
            sender.send(msg);
            Thread.sleep(1000);
        }
    }
}

輸出:

開始發(fā)送消息 : wed mar 29 23:20:52 cst 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue2隊列的消息:Wed Mar 29 23:20:52 CST 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue1隊列的消息:Wed Mar 29 23:20:52 CST 2017
結(jié)束發(fā)送消息 : wed mar 29 23:20:52 cst 2017
消費者響應(yīng) : WED MAR 29 23:20:52 CST 2017 消息處理完成
------------------------------------------------
消息發(fā)送成功:CorrelationData [id=340d14e6-cfcc-4653-9f95-29b37d50f886]
開始發(fā)送消息 : wed mar 29 23:20:53 cst 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue1隊列的消息:Wed Mar 29 23:20:53 CST 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue2隊列的消息:Wed Mar 29 23:20:53 CST 2017
結(jié)束發(fā)送消息 : wed mar 29 23:20:53 cst 2017
消費者響應(yīng) : WED MAR 29 23:20:53 CST 2017 消息處理完成
------------------------------------------------
消息發(fā)送成功:CorrelationData [id=e4e01f89-d0d4-405e-80f0-85bb20238f34]
開始發(fā)送消息 : wed mar 29 23:20:54 cst 2017
SimpleAsyncTaskExecutor-2 接收到來自hello.queue1隊列的消息:Wed Mar 29 23:20:54 CST 2017
SimpleAsyncTaskExecutor-1 接收到來自hello.queue2隊列的消息:Wed Mar 29 23:20:54 CST 2017
結(jié)束發(fā)送消息 : wed mar 29 23:20:54 cst 2017
消費者響應(yīng) : WED MAR 29 23:20:54 CST 2017 消息處理完成
------------------------------------------------

如果需要使用的其他的交換器類型潭辈,spring中都已提供實現(xiàn)栋荸,所有的交換器均實現(xiàn)org.springframework.amqp.core.AbstractExchange接口撬腾。

常用交換器類型如下:

Direct(DirectExchange):direct 類型的行為是"先匹配, 再投送". 即在綁定時設(shè)定一個 routing_key, 消息的routing_key完全匹配時, 才會被交換器投送到綁定的隊列中去考榨。

Topic(TopicExchange):按規(guī)則轉(zhuǎn)發(fā)消息(最靈活)馏锡。

Headers(HeadersExchange):設(shè)置header attribute參數(shù)類型的交換機(jī)耻涛。

Fanout(FanoutExchange):轉(zhuǎn)發(fā)消息到所有綁定隊列优训。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末呢燥,一起剝皮案震驚了整個濱河市搁吓,隨后出現(xiàn)的幾起案子原茅,更是在濱河造成了極大的恐慌,老刑警劉巖堕仔,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件擂橘,死亡現(xiàn)場離奇詭異,居然都是意外死亡摩骨,警方通過查閱死者的電腦和手機(jī)通贞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來恼五,“玉大人昌罩,你說我怎么就攤上這事≡致” “怎么了茎用?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長睬罗。 經(jīng)常有香客問我轨功,道長,這世上最難降的妖魔是什么傅物? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任夯辖,我火速辦了婚禮琉预,結(jié)果婚禮上董饰,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好卒暂,可當(dāng)我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布啄栓。 她就那樣靜靜地躺著,像睡著了一般也祠。 火紅的嫁衣襯著肌膚如雪昙楚。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天诈嘿,我揣著相機(jī)與錄音堪旧,去河邊找鬼。 笑死奖亚,一個胖子當(dāng)著我的面吹牛淳梦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播昔字,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼爆袍,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了作郭?” 一聲冷哼從身側(cè)響起陨囊,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎夹攒,沒想到半個月后蜘醋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡芹助,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年堂湖,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片状土。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡无蜂,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蒙谓,到底是詐尸還是另有隱情斥季,我是刑警寧澤,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布累驮,位于F島的核電站酣倾,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏谤专。R本人自食惡果不足惜躁锡,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望置侍。 院中可真熱鬧映之,春花似錦拦焚、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至蠢甲,卻和暖如春僵刮,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背鹦牛。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工搞糕, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人曼追。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓寞宫,卻偏偏與公主長得像,于是被迫代替她去往敵國和親拉鹃。 傳聞我的和親對象是個殘疾皇子辈赋,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)膏燕,斷路器钥屈,智...
    卡卡羅2017閱讀 134,651評論 18 139
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 46,804評論 6 342
  • 安裝下面是有Windows和Linux不同系統(tǒng)下的詳細(xì)安裝步驟,這了不再做介紹安裝步驟這里需要注意RabbitMQ...
    RalapHao閱讀 1,533評論 0 3
  • RabbitMQ簡介 RabbitMQ是一個在AMQP基礎(chǔ)上完整的坝辫,可復(fù)用的企業(yè)消息系統(tǒng)MQ全稱為Message ...
    Raye閱讀 5,232評論 7 13
  • RabbitMQ 即一個消息隊列篷就,主要是用來實現(xiàn)應(yīng)用程序的異步和解耦,同時也能起到消息緩沖近忙,消息分發(fā)的作用竭业。 消息...
    極樂君閱讀 1,179評論 0 13