? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 一巫延、概述
? ? ? ? ? ? 四摔笤、SpringBoot+RabbitMQ整合
- 使用docker安裝RabbitMQ
1)消略、使用docker pull 下載rabbitmq
[root@localhost ~]# docker pull registry.docker-cn.com/library/rabbitmq:3-management
3-management: Pulling from library/rabbitmq
683abbb4ea60: Already exists
30a58d97bcb5: Pull complete
...
- 啟動rabbitMQ
[root@localhost ~]# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
registry.docker-cn.com/library/redis latest 71a81cb279e3 9 days ago 83.4MB
registry.docker-cn.com/library/rabbitmq 3-management 500d74765467 9 days ago 149MB
mysql 5.7 66bc0f66b7af 9 days ago 372MB
[root@localhost ~]# docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 500d74765467
7599303175cb42287d0f58c0b9d0db67070199670cad4f680f6348e41d6e2240
-
在瀏覽器輸入:http://主機地址:15672 進入rabbitMQ登錄頁面
-
輸入默認用戶名:guest/guest 進入管理界面
- 搭建springboot+rabbit工程
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
引入spring-boot-starter-amqp包后springboot幫我們自動配置:
- 執(zhí)行RabbitAutoConfiguration
- 自動配置類會幫我們自動配置連接工廠虚缎、RabbitTemplate(給RabbitMQ發(fā)送和接受消息)守谓、AmqpAdmin(RabbitMQ系統(tǒng)管理功能組件)
@Configuration
@ConditionalOnMissingBean({ConnectionFactory.class})
protected static class RabbitConnectionFactoryCreator {
protected RabbitConnectionFactoryCreator() {
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config) throws Exception {
RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
if(config.determineHost() != null) {
factory.setHost(config.determineHost());
}
factory.setPort(config.determinePort());
if(config.determineUsername() != null) {
factory.setUsername(config.determineUsername());
}
if(config.determinePassword() != null) {
factory.setPassword(config.determinePassword());
}
if(config.determineVirtualHost() != null) {
factory.setVirtualHost(config.determineVirtualHost());
}
if(config.getRequestedHeartbeat() != null) {
factory.setRequestedHeartbeat(config.getRequestedHeartbeat().intValue());
}
Ssl ssl = config.getSsl();
if(ssl.isEnabled()) {
factory.setUseSSL(true);
if(ssl.getAlgorithm() != null) {
factory.setSslAlgorithm(ssl.getAlgorithm());
}
factory.setKeyStore(ssl.getKeyStore());
factory.setKeyStorePassphrase(ssl.getKeyStorePassword());
factory.setTrustStore(ssl.getTrustStore());
factory.setTrustStorePassphrase(ssl.getTrustStorePassword());
}
if(config.getConnectionTimeout() != null) {
factory.setConnectionTimeout(config.getConnectionTimeout().intValue());
}
factory.afterPropertiesSet();
CachingConnectionFactory connectionFactory = new CachingConnectionFactory((com.rabbitmq.client.ConnectionFactory)factory.getObject());
connectionFactory.setAddresses(config.determineAddresses());
connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
connectionFactory.setPublisherReturns(config.isPublisherReturns());
if(config.getCache().getChannel().getSize() != null) {
connectionFactory.setChannelCacheSize(config.getCache().getChannel().getSize().intValue());
}
if(config.getCache().getConnection().getMode() != null) {
connectionFactory.setCacheMode(config.getCache().getConnection().getMode());
}
if(config.getCache().getConnection().getSize() != null) {
connectionFactory.setConnectionCacheSize(config.getCache().getConnection().getSize().intValue());
}
if(config.getCache().getChannel().getCheckoutTimeout() != null) {
connectionFactory.setChannelCheckoutTimeout(config.getCache().getChannel().getCheckoutTimeout().longValue());
}
return connectionFactory;
}
}
-------------------------------------------------------------------------------------
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean({RabbitTemplate.class})
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
if(messageConverter != null) {
rabbitTemplate.setMessageConverter(messageConverter);
}
rabbitTemplate.setMandatory(this.determineMandatoryFlag());
Template templateProperties = this.properties.getTemplate();
Retry retryProperties = templateProperties.getRetry();
if(retryProperties.isEnabled()) {
rabbitTemplate.setRetryTemplate(this.createRetryTemplate(retryProperties));
}
if(templateProperties.getReceiveTimeout() != null) {
rabbitTemplate.setReceiveTimeout(templateProperties.getReceiveTimeout().longValue());
}
if(templateProperties.getReplyTimeout() != null) {
rabbitTemplate.setReplyTimeout(templateProperties.getReplyTimeout().longValue());
}
return rabbitTemplate;
}
------------------------------------------------------------------------------------------------
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(
prefix = "spring.rabbitmq",
name = {"dynamic"},
matchIfMissing = true
)
@ConditionalOnMissingBean({AmqpAdmin.class})
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
- RabbitProperties 封裝了RabbitMQ的所有配置
@ConfigurationProperties(
prefix = "spring.rabbitmq"
)
public class RabbitProperties {
private String host = "localhost";
private int port = 5672;
private String username;
private String password;
private final RabbitProperties.Ssl ssl = new RabbitProperties.Ssl();
private String virtualHost;
private String addresses;
private Integer requestedHeartbeat;
private boolean publisherConfirms;
private boolean publisherReturns;
private Integer connectionTimeout;
private final RabbitProperties.Cache cache = new RabbitProperties.Cache();
private final RabbitProperties.Listener listener = new RabbitProperties.Listener();
private final RabbitProperties.Template template = new RabbitProperties.Template();
private List<RabbitProperties.Address> parsedAddresses;
...
- application.properties配置
spring.rabbitmq.host=192.168.43.53
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
- 編寫測試類
- 自定義MessageConverter
package com.pyy.rabbitmq.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created by Administrator on 2018/7/7 0007.
*/
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
- Exchange.direct:點對點(單播模式)
發(fā)送消息:
/**
* 1穿铆、單播(點對點)
*/
@Test
public void direct() {
//Message需要自定構(gòu)造一個:定義消息體內(nèi)容和消息頭:
//rabbitTemplate.send(exchange, routeKey, message);
// object默認當成消息體,只需要傳入需要發(fā)送的內(nèi)容斋荞,自動序列化發(fā)送給rabbitmq
// rabbitTemplate.convertAndSend(exchange, routeKey, object);
User user = new User();
user.setUsername("張三");
user.setPassword("12312321");
// msg 對象被默認(SimpleMessageConverter -- byte[])序列化后發(fā)送出去
// 通過自定義:MessageConverter 實現(xiàn)JSON序列化
rabbitTemplate.convertAndSend("exchange.direct", "pyy.news", user);
}
只有Queue:pyy.news 該消息隊列接收到消息荞雏,其他隊列不能接收到該消息。
消息內(nèi)容如下:
接受消息:
/**
* 接受數(shù)據(jù)
*/
@Test
public void receve() {
Object obj = rabbitTemplate.receiveAndConvert("pyy.news");
System.out.println(obj.getClass());
System.out.println(obj);
if(obj instanceof User){
User u = (User) obj;
System.out.println(u.getUsername());
System.out.println(u.getPassword());
}
}
結(jié)果:
class com.pyy.rabbitmq.User
com.pyy.rabbitmq.User@4c27d39d
張三
12312321
- Exchange.fanout:廣播模式
/**
* 2平酿、廣播
*/
@Test
public void fanout() {
//Message需要自定構(gòu)造一個:定義消息體內(nèi)容和消息頭:
//rabbitTemplate.send(exchange, routeKey, message);
// object默認當成消息體凤优,只需要傳入需要發(fā)送的內(nèi)容,自動序列化發(fā)送給rabbitmq
// rabbitTemplate.convertAndSend(exchange, routeKey, object);
User user = new User();
user.setUsername("李四");
user.setPassword("123456");
// fanout模式路由鍵不用指定染服,所有綁定到這個交換機的消息隊列都能接受到該消息
rabbitTemplate.convertAndSend("exchange.fanout", "", user);
}
注意:fanout模式路由鍵不用指定别洪,所有綁定到這個交換機的消息隊列都能接受到該消息
- Exchange.topic 主題模式
/**
* 3、topic
*/
@Test
public void topic() {
//Message需要自定構(gòu)造一個:定義消息體內(nèi)容和消息頭:
//rabbitTemplate.send(exchange, routeKey, message);
// object默認當成消息體柳刮,只需要傳入需要發(fā)送的內(nèi)容挖垛,自動序列化發(fā)送給rabbitmq
// rabbitTemplate.convertAndSend(exchange, routeKey, object);
User user = new User();
user.setUsername("王五");
user.setPassword("123456");
// topic模式路由鍵只有和exchange綁定的路由鍵規(guī)則匹配,對應(yīng)的消息隊列就能收到消息
rabbitTemplate.convertAndSend("exchange.topic", "*.news", user);
}
SpringBoot高級-消息-@RabbitListener&@EnableRabbit監(jiān)聽
- 開啟RabbitListener注解支持
@EnableRabbit // 開啟基于注解的RabbitMQ
@SpringBootApplication
public class Springboot02RabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(Springboot02RabbitmqApplication.class, args);
}
}
- 使用@RabbitListener注解完成消息接收
package com.pyy.rabbitmq.service;
import com.pyy.rabbitmq.model.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* Created by Administrator on 2018/7/8 0008.
*/
@Service
public class UserService {
/**
* 自動監(jiān)聽消息隊列:pyy.news
* @param user
*/
@RabbitListener(queues = {"pyy.news"})
public void receive(User user) {
System.out.println("收到消息:"+user);
}
//結(jié)果: 收到消息:User{username='張三', password='12312321'}
/**
* 自動監(jiān)聽消息隊列:pyy.news
* @param message
*/
@RabbitListener(queues = {"pyy"})
public void receive(Message message) {
System.out.println("消息頭:"+message.getMessageProperties());
System.out.println("消息體:"+message.getBody());
}
}
系統(tǒng)會自動監(jiān)聽指定名稱的消息隊列秉颗,只有有消息自動消費痢毒。
AmqpAdmin 管理rabbitmq相關(guān)操作:
package com.pyy.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02RabbitmqApplicationTests1 {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
AmqpAdmin amqpAdmin;
/**
* 創(chuàng)建交換機:exchange
*/
@Test
public void createExchange() {
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.DirectExchange"));
amqpAdmin.declareExchange(new FanoutExchange("amqpadmin.FanoutExchange"));
amqpAdmin.declareExchange(new TopicExchange("amqpadmin.TopicExchange"));
System.out.println("創(chuàng)建完畢");
}
/**
* 創(chuàng)建隊列:queye
*/
@Test
public void createQueue() {
amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));
System.out.println("創(chuàng)建完畢");
}
/**
* 創(chuàng)建綁定規(guī)則:binding
*/
@Test
public void createBinding() {
/**
String destination: 目的地
String exchange: 交換機
String routingKey: 路由鍵
Map<String, Object> arguments:
Binding.DestinationType destinationType:目的地類型
*/
amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.DirectExchange", "amqp.haha", null));
System.out.println("創(chuàng)建完畢");
}
}