此文章參考:http://blog.csdn.net/zl18310999566/article/details/54341057
發(fā)送端服務(wù)器
配置文件信息
@Configuration
public class RabbitConfig {
/*此處?kù)o態(tài)常量薄榛,在原作中為了保證名稱統(tǒng)一使用裂明,可以不聲明*/
public static final String FOO_EXCHANGE = "callback.exchange.foo";
public static final String FOO_EXCHANGE_TOPIC = "callback.topic.exchange.foo";
public static final String FOO_ROUTINGKEY = "callback.routingkey.foo";
public static final String FOO_QUEUE = "callback.queue.foo";
/*end*/
/*顯式使用配置文件信息*/
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private Integer port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean publisherConfirms;
/*end*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
/** 如果要進(jìn)行消息回調(diào)攻柠,則這里必須要設(shè)置為true */
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
@Bean
/** 因?yàn)橐O(shè)置回調(diào)類,所以應(yīng)是prototype類型杯聚,如果是singleton類型瘤运,則回調(diào) 類為最后一次設(shè)置 */
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
/*
@Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE)這個(gè)是說在每次注入的時(shí)候回自動(dòng)創(chuàng)建一個(gè)新的bean實(shí)例
@Scope(value=ConfigurableBeanFactory.SCOPE_SINGLETON)單例模式峰伙,在整個(gè)應(yīng)用中只能創(chuàng)建一個(gè)實(shí)例
@Scope(value=WebApplicationContext.SCOPE_GLOBAL_SESSION)全局session中的一般不常用
@Scope(value=WebApplicationContext.SCOPE_APPLICATION)在一個(gè)web應(yīng)用中只創(chuàng)建一個(gè)實(shí)例
@Scope(value=WebApplicationContext.SCOPE_REQUEST)在一個(gè)請(qǐng)求中創(chuàng)建一個(gè)實(shí)例
@Scope(value=WebApplicationContext.SCOPE_SESSION)每次創(chuàng)建一個(gè)會(huì)話中創(chuàng)建一個(gè)實(shí)例
*/
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory()); //如果上面未使用顯示調(diào)用聲明 ConnectionFactory,此處運(yùn)行會(huì)報(bào)錯(cuò):ConnectionFactory is null
return template;
}
消息發(fā)送
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback{
private RabbitTemplate rabbitTemplate;
@Autowired
public MessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
}
public void send(String content) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(
RabbitConfig.FOO_EXCHANGE_TOPIC, //交換機(jī)
RabbitConfig.FOO_ROUTINGKEY,//路由
content, //內(nèi)容
correlationData
);
}
public void sendTopic(String content){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(
RabbitConfig.FOO_EXCHANGE_TOPIC, //交換機(jī)
RabbitConfig.FOO_ROUTINGKEY + ".topic",//路由
content, //內(nèi)容
correlationData
);
}
/**
* 回調(diào)方法
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
}
接收端服務(wù)器
@Component
@Configuration
@RabbitListener(queues = RabbitMQFinal.FOO_QUEUE)
public class MessageReceiver {
/**
* 設(shè)置交換機(jī)
*
* @return
*/
@Bean
public DirectExchange directExchange() {
/**
* DirectExchange : 按照routingkey分發(fā)到指定隊(duì)列
* TopicExchange : 多關(guān)鍵字匹配
* FanoutExchange : 將消息分發(fā)到所有的綁定隊(duì)列浸卦,無routingkey的概念
* HeadersExchange : 通過添加屬性key-value匹配
*/
return new DirectExchange(RabbitMQFinal.FOO_EXCHANGE);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(RabbitMQFinal.FOO_EXCHANGE_TOPIC);
}
/**
* 注冊(cè)隊(duì)列
*
* @return
*/
@Bean
public Queue queue() {
return new Queue(RabbitMQFinal.FOO_QUEUE);
}
/**
* 將隊(duì)列綁定至交換機(jī)
*
* @return
*/
@Bean
public Binding binding() {
//綁定隊(duì)列
return BindingBuilder.bind(queue()) //綁定隊(duì)列
.to(topicExchange()) //設(shè)置交換機(jī)
.with(RabbitMQFinal.FOO_ROUTINGKEY + ".#");//設(shè)置 routingkey topic采用模糊匹配
}
@RabbitHandler
public void process(@Payload String content) {
System.out.println("Receiver Value : " + content);
}
}
測(cè)試類
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = WebClientCoreApplication.class)
@WebAppConfiguration
public class RabbitTest {
@Autowired
MessageSender messageSender;
@Test
public void testRabbit(){
String content = "hello " + new Date();
System.out.println("Sender-message : " + content);
messageSender.send(content);
}
}
//WebClientCoreApplication 為當(dāng)前服務(wù)的啟動(dòng)類
測(cè)試過程
- 啟動(dòng)接收服務(wù)器
- 執(zhí)行測(cè)試類
期望值
- 發(fā)送服務(wù)器正常打印
- 接收服務(wù)器正常打印
- RabbitMQ 管理端可以正常查閱發(fā)送記錄
如果接收端未配置可用消息隊(duì)列,接收端會(huì)循環(huán)檢測(cè)如失。
寫在后面(可忽略)
通過查看RabbitMQ 管理端頁(yè)面
消息通過 ROUTINGKEY 完成綁定
生產(chǎn)者者不需要再聲明隊(duì)列绊诲,只需在消息中綁定 ROUTINGKEY 即可。
交換機(jī)設(shè)置褪贵,隊(duì)列處理交由消息監(jiān)聽者處理掂之。