springboot筆記——整合消息中間件

消息中間件

在消息中間件中有 2 個(gè)重要的概念:消息代理和目的地。當(dāng)消息發(fā)送者發(fā)送消息后球昨,消息就被消息代理接管尔店,消息代理保證消息傳遞到指定目的地。

我們常用的消息代理有 JMS 和 AMQP 規(guī)范主慰。對(duì)應(yīng)地连锯,它們常見(jiàn)的實(shí)現(xiàn)分別是 ActiveMQ 和 RabbitMQ住拭。

整合 ActiveMQ

添加依賴(lài)

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

<dependency>

<groupId>org.apache.activemq</groupId> 

<artifactId>activemq-pool</artifactId> 

</dependency>

添加配置

activemq 配置

spring.activemq.broker-url=tcp://192.168.2.12:61616

spring.activemq.user=admin

spring.activemq.password=admin

spring.activemq.pool.enabled=false

spring.activemq.pool.max-connections=50

使用發(fā)布/訂閱模式時(shí),下邊配置需要設(shè)置成 true

spring.jms.pub-sub-domain=false

此處 spring.activemq.pool.enabled=false盆繁,表示關(guān)閉連接池盯孙。

編碼

配置類(lèi):

@Configuration

public class JmsConfirguration {

public static final String QUEUE_NAME = "activemq_queue";

public static final String TOPIC_NAME = "activemq_topic";

@Bean

public Queue queue() {

    return new ActiveMQQueue(QUEUE_NAME);

}

@Bean

public Topic topic() {

    return new ActiveMQTopic(TOPIC_NAME);

}

}

負(fù)責(zé)創(chuàng)建隊(duì)列和主題调卑。

消息生產(chǎn)者:

@Component

public class JmsSender {

@Autowired

private Queue queue;

@Autowired

private Topic topic;

@Autowired

private JmsMessagingTemplate jmsTemplate;

public void sendByQueue(String message) {

    this.jmsTemplate.convertAndSend(queue, message);

}

public void sendByTopic(String message) {

    this.jmsTemplate.convertAndSend(topic, message);

}

}

消息消費(fèi)者:

@Component

public class JmsReceiver {

@JmsListener(destination = JmsConfirguration.QUEUE_NAME)

public void receiveByQueue(String message) {

    System.out.println("接收隊(duì)列消息:" + message);

}

@JmsListener(destination = JmsConfirguration.TOPIC_NAME)

public void receiveByTopic(String message) {

    System.out.println("接收主題消息:" + message);

}

}

消息消費(fèi)者使用 @JmsListener 注解監(jiān)聽(tīng)消息莉兰。

測(cè)試

@RunWith(SpringRunner.class)

@SpringBootTest

public class JmsTest {

@Autowired

private JmsSender sender;

@Test

public void testSendByQueue() {

    for (int i = 1; i < 6; i++) {

        this.sender.sendByQueue("hello activemq queue " + i);

    }

}

@Test

public void testSendByTopic() {

    for (int i = 1; i < 6; i++) {

        this.sender.sendByTopic("hello activemq topic " + i);

    }

}

}

打印結(jié)果:

接收隊(duì)列消息:hello activemq queue 1

接收隊(duì)列消息:hello activemq queue 2

接收隊(duì)列消息:hello activemq queue 3

接收隊(duì)列消息:hello activemq queue 4

接收隊(duì)列消息:hello activemq queue 5

測(cè)試發(fā)布/訂閱模式時(shí),設(shè)置 spring.jms.pub-sub-domain=true

接收主題消息:hello activemq topic 1

接收主題消息:hello activemq topic 2

接收主題消息:hello activemq topic 3

接收主題消息:hello activemq topic 4

接收主題消息:hello activemq topic 5

整合 RabbitMQ

添加依賴(lài)

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

添加配置

spring.rabbitmq.host=192.168.2.30

spring.rabbitmq.port=5672

spring.rabbitmq.username=light

spring.rabbitmq.password=light

spring.rabbitmq.virtual-host=/test

編碼

配置類(lèi):

@Configuration

public class AmqpConfirguration {

//=============簡(jiǎn)單秦效、工作隊(duì)列模式===============

public static final String SIMPLE_QUEUE = "simple_queue";

@Bean

public Queue queue() {

    return new Queue(SIMPLE_QUEUE, true);

}

//===============發(fā)布/訂閱模式============

public static final String PS_QUEUE_1 = "ps_queue_1";

public static final String PS_QUEUE_2 = "ps_queue_2";

public static final String FANOUT_EXCHANGE = "fanout_exchange";

@Bean

public Queue psQueue1() {

    return new Queue(PS_QUEUE_1, true);

}

@Bean

public Queue psQueue2() {

    return new Queue(PS_QUEUE_2, true);

}

@Bean

public FanoutExchange fanoutExchange() {

    return new FanoutExchange(FANOUT_EXCHANGE);

}

@Bean

public Binding fanoutBinding1() {

    return BindingBuilder.bind(psQueue1()).to(fanoutExchange());

}

@Bean

public Binding fanoutBinding2() {

    return BindingBuilder.bind(psQueue2()).to(fanoutExchange());

}

//===============路由模式============

public static final String ROUTING_QUEUE_1 = "routing_queue_1";

public static final String ROUTING_QUEUE_2 = "routing_queue_2";

public static final String DIRECT_EXCHANGE = "direct_exchange";

@Bean

public Queue routingQueue1() {

    return new Queue(ROUTING_QUEUE_1, true);

}

@Bean

public Queue routingQueue2() {

    return new Queue(ROUTING_QUEUE_2, true);

}

@Bean

public DirectExchange directExchange() {

    return new DirectExchange(DIRECT_EXCHANGE);

}

@Bean

public Binding directBinding1() {

    return BindingBuilder.bind(routingQueue1()).to(directExchange()).with("user");

}

@Bean

public Binding directBinding2() {

    return BindingBuilder.bind(routingQueue2()).to(directExchange()).with("order");

}

//===============主題模式============

public static final String TOPIC_QUEUE_1 = "topic_queue_1";

public static final String TOPIC_QUEUE_2 = "topic_queue_2";

public static final String TOPIC_EXCHANGE = "topic_exchange";

@Bean

public Queue topicQueue1() {

    return new Queue(TOPIC_QUEUE_1, true);

}

@Bean

public Queue topicQueue2() {

    return new Queue(TOPIC_QUEUE_2, true);

}

@Bean

public TopicExchange topicExchange() {

    return new TopicExchange(TOPIC_EXCHANGE);

}

@Bean

public Binding topicBinding1() {

    return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add");

}

@Bean

public Binding topicBinding2() {

    return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");

}

}

RabbitMQ 有多種工作模式,因此配置比較多拱雏。想了解相關(guān)內(nèi)容的讀者可以查看本站的《RabbitMQ 工作模式介紹》或者自行百度相關(guān)資料棉安。

消息生產(chǎn)者:

@Component

public class AmqpSender {

@Autowired

private AmqpTemplate amqpTemplate;

/**

 * 簡(jiǎn)單模式發(fā)送

 *

 * @param message

 */

public void simpleSend(String message) {

    this.amqpTemplate.convertAndSend(AmqpConfirguration.SIMPLE_QUEUE, message);

}

/**

 * 發(fā)布/訂閱模式發(fā)送

 *

 * @param message

 */

public void psSend(String message) {

    this.amqpTemplate.convertAndSend(AmqpConfirguration.FANOUT_EXCHANGE, "", message);

}

/**

 * 路由模式發(fā)送

 *

 * @param message

 */

public void routingSend(String routingKey, String message) {

    this.amqpTemplate.convertAndSend(AmqpConfirguration.DIRECT_EXCHANGE, routingKey, message);

}

/**

 * 主題模式發(fā)送

 *

 * @param routingKey

 * @param message

 */

public void topicSend(String routingKey, String message) {

    this.amqpTemplate.convertAndSend(AmqpConfirguration.TOPIC_EXCHANGE, routingKey, message);

}

}

消息消費(fèi)者:

@Component

public class AmqpReceiver {

/**

 * 簡(jiǎn)單模式接收

 *

 * @param message

 */

@RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE)

public void simpleReceive(String message) {

    System.out.println("接收消息:" + message);

}

/**

 * 發(fā)布/訂閱模式接收

 *

 * @param message

 */

@RabbitListener(queues = AmqpConfirguration.PS_QUEUE_1)

public void psReceive1(String message) {

    System.out.println(AmqpConfirguration.PS_QUEUE_1 + "接收消息:" + message);

}

@RabbitListener(queues = AmqpConfirguration.PS_QUEUE_2)

public void psReceive2(String message) {

    System.out.println(AmqpConfirguration.PS_QUEUE_2 + "接收消息:" + message);

}

/**

 * 路由模式接收

 *

 * @param message

 */

@RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_1)

public void routingReceive1(String message) {

    System.out.println(AmqpConfirguration.ROUTING_QUEUE_1 + "接收消息:" + message);

}

@RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_2)

public void routingReceive2(String message) {

    System.out.println(AmqpConfirguration.ROUTING_QUEUE_2 + "接收消息:" + message);

}

/**

 * 主題模式接收

 *

 * @param message

 */

@RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_1)

public void topicReceive1(String message) {

    System.out.println(AmqpConfirguration.TOPIC_QUEUE_1 + "接收消息:" + message);

}

@RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_2)

public void topicReceive2(String message) {

    System.out.println(AmqpConfirguration.TOPIC_QUEUE_2 + "接收消息:" + message);

}

}

測(cè)試

@RunWith(SpringRunner.class)

@SpringBootTest

public class AmqpTest {

@Autowired

private AmqpSender sender;

@Test

public void testSimpleSend() {

    for (int i = 1; i < 6; i++) {

        this.sender.simpleSend("test simpleSend " + i);

    }

}

@Test

public void testPsSend() {

    for (int i = 1; i < 6; i++) {

        this.sender.psSend("test psSend " + i);

    }

}

@Test

public void testRoutingSend() {

    for (int i = 1; i < 6; i++) {

        this.sender.routingSend("order", "test routingSend " + i);

    }

}

@Test

public void testTopicSend() {

    for (int i = 1; i < 6; i++) {

        this.sender.topicSend("user.add", "test topicSend " + i);

    }

}

}

測(cè)試結(jié)果略過(guò)底扳。铸抑。。

踩坑提醒1:ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN

解決方案:

  1. 請(qǐng)確保用戶(hù)名和密碼是否正確衷模,需要注意的是用戶(hù)名和密碼的值是否包含空格或制表符(筆者測(cè)試時(shí)就是因?yàn)槊艽a多了一個(gè)制表符導(dǎo)致認(rèn)證失斎笛础)。

  2. 如果測(cè)試賬戶(hù)使用的是 guest阱冶,需要修改 rabbitmq.conf 文件刁憋。在該文件中添加 “l(fā)oopback_users = none” 配置。

踩坑提醒2:Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it

解決方案:

我們可以登陸 RabbitMQ 的管理界面木蹬,在 Queue 選項(xiàng)中手動(dòng)添加對(duì)應(yīng)的隊(duì)列至耻。

參考資料

消息中間件簡(jiǎn)單介紹

Spring Boot 官方文檔

Rabbit MQ 訪(fǎng)問(wèn)控制相關(guān)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市镊叁,隨后出現(xiàn)的幾起案子尘颓,更是在濱河造成了極大的恐慌,老刑警劉巖晦譬,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件疤苹,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡敛腌,警方通過(guò)查閱死者的電腦和手機(jī)卧土,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)像樊,“玉大人尤莺,你說(shuō)我怎么就攤上這事∩鳎” “怎么了颤霎?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)足绅。 經(jīng)常有香客問(wèn)我捷绑,道長(zhǎng),這世上最難降的妖魔是什么氢妈? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任粹污,我火速辦了婚禮,結(jié)果婚禮上首量,老公的妹妹穿的比我還像新娘壮吩。我一直安慰自己进苍,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布鸭叙。 她就那樣靜靜地躺著觉啊,像睡著了一般。 火紅的嫁衣襯著肌膚如雪沈贝。 梳的紋絲不亂的頭發(fā)上杠人,一...
    開(kāi)封第一講書(shū)人閱讀 51,208評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音宋下,去河邊找鬼嗡善。 笑死,一個(gè)胖子當(dāng)著我的面吹牛学歧,可吹牛的內(nèi)容都是我干的罩引。 我是一名探鬼主播,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼枝笨,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼袁铐!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起横浑,我...
    開(kāi)封第一講書(shū)人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤剔桨,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后伪嫁,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體领炫,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年张咳,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了帝洪。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡脚猾,死狀恐怖葱峡,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情龙助,我是刑警寧澤砰奕,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布,位于F島的核電站提鸟,受9級(jí)特大地震影響军援,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜称勋,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一胸哥、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧赡鲜,春花似錦空厌、人聲如沸庐船。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)筐钟。三九已至,卻和暖如春赋朦,著一層夾襖步出監(jiān)牢的瞬間篓冲,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工北发, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留纹因,地道東北人喷屋。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓琳拨,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親屯曹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子狱庇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,908評(píng)論 2 11
  • 轉(zhuǎn)載2017年11月01日 09:54:03 2595 RabbitMQ 即一個(gè)消息隊(duì)列,主要是用來(lái)實(shí)現(xiàn)應(yīng)用程序的...
    楊傳池chris閱讀 6,350評(píng)論 1 0
  • 中間件在中大型的系統(tǒng)中應(yīng)用較為廣泛恶耽,主要用來(lái)解決系統(tǒng)模塊之間的強(qiáng)耦合關(guān)系密任;也就是說(shuō)消息中間件不需要同步返回結(jié)果,也...
    帥可兒妞閱讀 306評(píng)論 0 0
  • 在介紹NATS之前先了解下什么是分布式系統(tǒng)和消息中間件 對(duì)于分布式系統(tǒng)的定義偷俭,一直以來(lái)我都沒(méi)有找到或者想到特別簡(jiǎn)練...
    Java大生閱讀 2,412評(píng)論 0 0
  • 姓名:董子恒 公司:浙江駿鴻貿(mào)易有限公司 101期樂(lè)觀(guān)1組學(xué)員浪讳,509期反省2組志工,522期謙虛3組志工涌萤,540...
    董子恒閱讀 280評(píng)論 0 1