Jianshu: 亞武de小文 [Original work: please credit the source when reposting]
RabbitMQ Integration
1. Configuration
- Import the Maven dependency
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.1.6.RELEASE</version>
    </dependency>
Alternatively, select the RabbitMQ dependency directly when creating the new project.
- Add the basic RabbitMQ connection settings to the application.yml file
    spring:
      # Basic RabbitMQ connection settings
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: guest
        password: guest
2. Writing the Code
Define a RabbitConfig class that configures the Exchange, the Queues, and the bindings between the queues and the exchange.
- RabbitConfig.java
    package com.yawu.xiaowen.springboot.config;

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitConfig {

        public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
        public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
        public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

        /**
         * Exchange configuration, using a topic exchange as the example.
         * ExchangeBuilder supports the fanout, direct, topic, and headers exchange types.
         *
         * @return the exchange
         */
        @Bean(EXCHANGE_TOPICS_INFORM)
        public Exchange EXCHANGE_TOPICS_INFORM() {
            // durable(true): the exchange still exists after the message broker restarts
            return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
        }

        // Declare the SMS queue
        @Bean(QUEUE_INFORM_SMS)
        public Queue QUEUE_INFORM_SMS() {
            return new Queue(QUEUE_INFORM_SMS);
        }

        // Declare the email queue
        @Bean(QUEUE_INFORM_EMAIL)
        public Queue QUEUE_INFORM_EMAIL() {
            return new Queue(QUEUE_INFORM_EMAIL);
        }

        /**
         * Bind the SMS queue to the exchange. Corresponds to the raw client call
         * channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
         *
         * @param queue    the queue
         * @param exchange the exchange
         * @return the binding
         */
        @Bean
        public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                                @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
        }

        // Bind the email queue to the exchange
        @Bean
        public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                                  @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
        }
    }
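To make the two binding patterns easier to reason about, here is a minimal sketch of topic-style matching written with the JDK only. It is not RabbitMQ's or Spring AMQP's implementation; the class name `TopicMatchSketch` and its `matches` method are hypothetical helpers for illustration. It assumes the usual topic-exchange rules: words are separated by `.`, `*` stands for exactly one word, and `#` stands for zero or more words.

```java
// Illustrative only: a hypothetical helper, not part of RabbitMQ or Spring AMQP.
class TopicMatchSketch {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Recursive word-by-word comparison of pattern words p[i..] against key words k[j..].
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible continuation point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' consumes exactly one word; a literal must be equal to the key word.
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String smsPattern = "inform.#.sms.#";
        String emailPattern = "inform.#.email.#";
        String[] keys = {"inform.sms.email", "inform.sms", "inform.email", "inform.other"};
        for (String key : keys) {
            System.out.println(key
                    + " -> sms=" + matches(smsPattern, key)
                    + ", email=" + matches(emailPattern, key));
        }
    }
}
```

Under these rules a key such as `inform.sms.email` satisfies both patterns, so a message published with it would be routed to both the SMS queue and the email queue, while `inform.sms` would reach only the SMS queue.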
Producer
- Send messages with RabbitTemplate
- SpProducer.java
    package com.yawu.xiaowen.springboot.pcdemo;

    import com.yawu.xiaowen.springboot.config.RabbitConfig;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    /**
     * Spring Boot producer
     *
     * @author yawu
     * @date 2019.07.02
     */
    @Component
    public class SpProducer {

        @Autowired
        RabbitTemplate rabbitTemplate;

        public void sendByTopics() {
            for (int i = 0; i < 5; i++) {
                String message = "sms email inform to user" + i;
                // The routing key "inform.sms.email" matches both binding patterns in RabbitConfig
                rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TOPICS_INFORM, "inform.sms.email", message);
                System.out.println("Sent message: '" + message + "'");
            }
        }
    }
Consumer
- SpConsumer.java
    package com.yawu.xiaowen.springboot.pcdemo;

    import com.rabbitmq.client.Channel;
    import com.yawu.xiaowen.springboot.config.RabbitConfig;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    /**
     * Spring Boot consumer
     *
     * @author yawu
     * @date 2019.07.02
     */
    @Component
    public class SpConsumer {

        // Listen on the email queue
        @RabbitListener(queues = {RabbitConfig.QUEUE_INFORM_EMAIL})
        public void receive_email(String msg, Message message, Channel channel) {
            System.out.println("Received email: " + msg);
        }

        // Listen on the SMS queue
        @RabbitListener(queues = {RabbitConfig.QUEUE_INFORM_SMS})
        public void receive_sms(String msg, Message message, Channel channel) {
            System.out.println("Received sms: " + msg);
        }
    }
Test Run
- XiaowenApplicationTests.java
    package com.yawu.xiaowen;

    import com.yawu.xiaowen.springboot.pcdemo.SpConsumer;
    import com.yawu.xiaowen.springboot.pcdemo.SpProducer;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;

    /**
     * Spring Boot startup test class
     *
     * @author yawu
     * @date 2019.07.02
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = XiaowenApplication.class, value = "spring.profiles.active=boot")
    public class XiaowenApplicationTests {

        @Autowired
        SpProducer spProducer;

        @Autowired
        SpConsumer spConsumer;

        @Test
        public void testSendMsg() {
            spProducer.sendByTopics();
            try {
                // Keep the test alive for 10 seconds so the listeners have time to consume the messages
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before reporting the interruption
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
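The test keeps itself alive with a fixed 10-second sleep so that the listeners can run. One possible alternative, sketched here with the JDK only and not wired into Spring or RabbitMQ, is to wait on a `CountDownLatch` that the consuming side counts down once per message. The class `LatchWaitSketch`, its `sendAndAwait` method, and the single-thread executor standing in for the listener are all hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: the executor below merely simulates an asynchronous consumer.
class LatchWaitSketch {

    /**
     * "Sends" count messages to a simulated consumer and waits until all of them
     * have been handled or the timeout expires. Returns true if all were handled in time.
     */
    static boolean sendAndAwait(int count, long timeoutSeconds) {
        CountDownLatch received = new CountDownLatch(count);
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < count; i++) {
                String message = "sms email inform to user" + i;
                // Assumption: in a real test the listener method would call countDown() instead
                consumer.submit(() -> {
                    System.out.println("received: " + message);
                    received.countDown();
                });
            }
            // Returns as soon as the latch reaches zero, instead of always sleeping the full time
            return received.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the wait did not complete
            Thread.currentThread().interrupt();
            return false;
        } finally {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("all received: " + sendAndAwait(5, 10));
    }
}
```

The design trade-off is that the wait ends as soon as the expected number of messages has arrived, while the timeout still bounds the test if something goes wrong.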
[Figure: output of running the RabbitMQ integration test]