http://blog.csdn.net/u011493599/article/details/62892490
引入jar包
[java]?view plain?copy
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1. 在resources/rabbitmq目录下创建rabbitmq.properties（与下文配置类中的 classpath:rabbitmq/rabbitmq.properties 对应）
[java]?view plain?copy
# 这里是访问端口5672，不是15672；15672是API和管理界面的端口
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# 如果要进行消息回调，则这里必须要设置为true
spring.rabbitmq.publisherconfirms=true
2. 创建rabbitmq配置对象RabbitMq
[java]?view plain?copy
package?com.demo.model;??
import?lombok.Getter;??
import?lombok.Setter;??
import?org.springframework.boot.context.properties.ConfigurationProperties;??
import?org.springframework.context.annotation.Configuration;??
/**
?*?Created?by?huguoju?on?2017/3/2.
?*?rabbitmq配置文件
?*/??
@Configuration??
@Getter??
@Setter??
@ConfigurationProperties(locations?=?"classpath:rabbitmq/rabbitmq.properties",prefix?=?"spring.rabbitmq")??
public?class?RabbitMq{??
private?String?addresses;??
private?String?username;??
private?String?password;??
private?Boolean?publisherconfirms;??
}??
3. 生产者配置
3.1 通用性基础配置
[java]?view plain?copy
package?com.demo.rabbitmq.sender;??
import?com.demo.model.RabbitMq;??
import?lombok.extern.slf4j.Slf4j;??
import?org.springframework.amqp.core.Message;??
import?org.springframework.amqp.core.MessageListener;??
import?org.springframework.amqp.core.Queue;??
import?org.springframework.amqp.rabbit.connection.CachingConnectionFactory;??
import?org.springframework.amqp.rabbit.connection.ConnectionFactory;??
import?org.springframework.amqp.rabbit.core.RabbitAdmin;??
import?org.springframework.amqp.rabbit.core.RabbitTemplate;??
import?org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;??
import?org.springframework.amqp.rabbit.support.CorrelationData;??
import?org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;??
import?org.springframework.beans.factory.annotation.Autowired;??
import?org.springframework.beans.factory.config.ConfigurableBeanFactory;??
import?org.springframework.context.annotation.Bean;??
import?org.springframework.context.annotation.Configuration;??
import?org.springframework.context.annotation.Scope;??
import?org.springframework.messaging.converter.MappingJackson2MessageConverter;??
/**
?*?Created?by?huguoju?on?2017/3/2.
?*?創(chuàng)建消息生產(chǎn)者
?*/??
@Configuration??
@Slf4j??
public?class?AmqpConfig?{??
@Autowired??
private?RabbitMq?rabbitMq;??
/**
?????*?連接rabbitmq
?????*?@return
?????*/??
@Bean??
public?ConnectionFactory?connectionFactory(){??
CachingConnectionFactory?connectionFactory=new?CachingConnectionFactory();??
????????connectionFactory.setAddresses(rabbitMq.getAddresses());??
????????connectionFactory.setUsername(rabbitMq.getUsername());??
????????connectionFactory.setPassword(rabbitMq.getPassword());??
/**
?????????*?對(duì)于每一個(gè)RabbitTemplate只支持一個(gè)ReturnCallback。
?????????*?對(duì)于返回消息笋额,模板的mandatory屬性必須被設(shè)定為true元暴,
?????????*?它同樣要求CachingConnectionFactory的publisherReturns屬性被設(shè)定為true。
?????????*?如果客戶端通過調(diào)用setReturnCallback(ReturnCallback?callback)注冊(cè)了RabbitTemplate.ReturnCallback兄猩,那么返回將被發(fā)送到客戶端茉盏。
?????????*?這個(gè)回調(diào)函數(shù)必須實(shí)現(xiàn)下列方法:
?????????*void?returnedMessage(Message?message,?intreplyCode,?String?replyText,String?exchange,?String?routingKey);
?????????*/??
//?connectionFactory.setPublisherReturns(true);??
/**
?????????*?同樣一個(gè)RabbitTemplate只支持一個(gè)ConfirmCallback。
?????????*?對(duì)于發(fā)布確認(rèn)枢冤,template要求CachingConnectionFactory的publisherConfirms屬性設(shè)置為true鸠姨。
?????????*?如果客戶端通過setConfirmCallback(ConfirmCallback?callback)注冊(cè)了RabbitTemplate.ConfirmCallback,那么確認(rèn)消息將被發(fā)送到客戶端淹真。
?????????*?這個(gè)回調(diào)函數(shù)必須實(shí)現(xiàn)以下方法:
?????????*?void?confirm(CorrelationData?correlationData,?booleanack);
?????????*/??
????????connectionFactory.setPublisherConfirms(rabbitMq.getPublisherconfirms());??
return?connectionFactory;??
????}??
/**
?????*?rabbitAdmin代理類
?????*?@return
?????*/??
@Bean??
public?RabbitAdmin?rabbitAdmin(ConnectionFactory?connectionFactory){??
return?new?RabbitAdmin(connectionFactory);??
????}??
/**
?????*?創(chuàng)建rabbitTemplate?消息模板類
?????*?prototype原型模式:每次獲取Bean的時(shí)候會(huì)有一個(gè)新的實(shí)例
?????*??因?yàn)橐O(shè)置回調(diào)類讶迁,所以應(yīng)是prototype類型,如果是singleton類型趟咆,則回調(diào)類為最后一次設(shè)置
?????*?@return
?????*/??
@Bean??
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)??
public?RabbitTemplate?rabbitTemplate(){??
RabbitTemplate?rabbitTemplate=new?RabbitTemplate(connectionFactory());??
//?rabbitTemplate.setMandatory(true);//返回消息必須設(shè)置為true??
rabbitTemplate.setMessageConverter(new?Jackson2JsonMessageConverter());//數(shù)據(jù)轉(zhuǎn)換為json存入消息隊(duì)列??
//??rabbitTemplate.setReplyAddress(replyQueue().getName());??
//??rabbitTemplate.setReplyTimeout(100000000);??
//發(fā)布確認(rèn)??
rabbitTemplate.setConfirmCallback(new?RabbitTemplate.ConfirmCallback()?{??
//消息發(fā)送到queue時(shí)就執(zhí)行??
@Override??
public?void?confirm(CorrelationData?correlationData,?boolean?b,?String?s)?{??
log.debug(correlationData+"http://////");??
if?(!b){??
log.debug("發(fā)送到queue失敗");??
throw?new?RuntimeException("send?error?"?+?s);??
????????????????}??
????????????}??
????????});??
return?rabbitTemplate;??
????}??
}??
3.2 创建exchange
[java]?view plain?copy
package?com.demo.rabbitmq.sender;??
/**
 * Created by huguoju on 2017/3/2.
 * <p>
 * Exchange configuration: the names of the exchanges used by the demo.
 * Interface fields are implicitly {@code public static final}, so no modifier is needed.
 */
public interface RabbitMqExchange {

    /** Name of the fanout exchange. */
    String CONTRACT_FANOUT = "CONTRACT_FANOUT";

    /** Name of the topic exchange. */
    String CONTRACT_TOPIC = "CONTRACT_TOPIC";

    /** Name of the direct exchange. */
    String CONTRACT_DIRECT = "CONTRACT_DIRECT";
}
3.3 创建queue
[java]?view plain?copy
package?com.demo.rabbitmq.sender;??
/**
 * Created by huguoju on 2017/3/2.
 * <p>
 * Message queue configuration: the names of the queues used by the demo.
 * Interface fields are implicitly {@code public static final}, so no modifier is needed.
 * <p>
 * NOTE(review): the constant names spell "CONTRACE" while the values spell "CONTRACT".
 * The names are kept because other classes reference them.
 */
public interface RabbitMqQueue {

    /** Name of the queue carrying contract messages. */
    String CONTRACE_SELF = "CONTRACT_SELF";

    /** Name of the queue carrying tenant messages. */
    String CONTRACE_TENANT = "CONTRACT_TENANT";
}
3.4 针对rabbitmq服务性的配置：配置queue和交换机并绑定
[java]?view plain?copy
package?com.demo.rabbitmq.sender;??
import?org.springframework.amqp.core.*;??
import?org.springframework.amqp.rabbit.core.RabbitAdmin;??
import?org.springframework.beans.factory.annotation.Autowired;??
import?org.springframework.beans.factory.annotation.Qualifier;??
import?org.springframework.context.annotation.Bean;??
import?org.springframework.context.annotation.Configuration;??
/**
?*?Created?by?huguoju?on?2017/3/2.
?*?交換機(jī)配置并綁定queue
?*/??
@Configuration??
public?class?ContractExchangeConfig?{??
@Autowired??
private?RabbitAdmin?rabbitAdmin;??
/**
?????*?不處理路由鍵。你只需要簡單的將隊(duì)列綁定到交換機(jī)上值纱。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上鳞贷。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息虐唠。Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的搀愧。
?????*?@return
?????*/??
//????@Bean??
//????FanoutExchange?contractFanoutExchange(){??
//????????FanoutExchange?fanoutExchange=new?FanoutExchange(RabbitMqExchange.CONTRACT_FANOUT);??
//????????rabbitAdmin.declareExchange(fanoutExchange);??
//????????return?fanoutExchange;??
//????}??
/**
?????*??將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上。符號(hào)“#”匹配一個(gè)或多個(gè)詞咱筛,符號(hào)“*”匹配不多不少一個(gè)詞搓幌。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*”?只會(huì)匹配到“audit.irs”
?????*??默認(rèn):,?durable?=?true,?autoDelete?=?false
?????*?@return
?????*/??
@Bean??
????TopicExchange?contractTopicExchangeDurable(){??
TopicExchange?contractTopicExchange=new?TopicExchange(RabbitMqExchange.CONTRACT_TOPIC);??
????????rabbitAdmin.declareExchange(contractTopicExchange);??
return?contractTopicExchange;??
????}??
/**
?????*??處理路由鍵迅箩。需要將一個(gè)隊(duì)列綁定到交換機(jī)上溉愁,要求該消息與一個(gè)特定的路由鍵完全匹配。這是一個(gè)完整的匹配饲趋。如果一個(gè)隊(duì)列綁定到該交換機(jī)上要求路由鍵?“dog”拐揭,則只有被標(biāo)記為“dog”的消息才被轉(zhuǎn)發(fā),不會(huì)轉(zhuǎn)發(fā)dog.puppy奕塑,也不會(huì)轉(zhuǎn)發(fā)dog.guard堂污,只會(huì)轉(zhuǎn)發(fā)dog
?????*?@return
?????*/??
@Bean??
????DirectExchange?contractDirectExchange(){??
DirectExchange?contractDirectExchange=new?DirectExchange(RabbitMqExchange.CONTRACT_DIRECT);??
????????rabbitAdmin.declareExchange(contractDirectExchange);??
return?contractDirectExchange;??
????}??
@Bean??
????Queue?queueContract(){??
Queue?queue=new?Queue(RabbitMqQueue.CONTRACE_SELF,true);??
????????rabbitAdmin.declareQueue(queue);??
return?queue;??
????}??
@Bean??
????Queue?queueTenant(){??
Queue?queue=new?Queue(RabbitMqQueue.CONTRACE_TENANT,true);??
????????rabbitAdmin.declareQueue(queue);??
return?queue;??
????}??
//????@Bean??
//????Binding?bindingExchangeContract(Queue?queueContract,FanoutExchange?exchange){??
//????????Binding?binding=BindingBuilder.bind(queueContract).to(exchange);??
//????????rabbitAdmin.declareBinding(binding);??
//????????return?binding;??
//????}??
@Bean??
????Binding?bindingExchangeContract(Queue?queueContract,TopicExchange?exchange){??
????????Binding?binding=BindingBuilder.bind(queueContract).to(exchange).with(RabbitMqQueue.CONTRACE_SELF);??
????????rabbitAdmin.declareBinding(binding);??
return?binding;??
????}??
//????@Bean??
//????Binding?bindingExchangeContract(Queue?queueContract,DirectExchange?exchange){??
//????????Binding?binding=BindingBuilder.bind(queueContract).to(exchange).with(RabbitMqQueue.CONTRACE_SELF);??
//????????rabbitAdmin.declareBinding(binding);??
//????????return?binding;??
//????}??
@Bean??
????Binding?bindingExchangeTenant(Queue?queueTenant,?TopicExchange?exchange)?{??
????????Binding?binding?=?BindingBuilder.bind(queueTenant).to(exchange).with(RabbitMqQueue.CONTRACE_TENANT);??
????????rabbitAdmin.declareBinding(binding);??
return?binding;??
????}??
//????@Bean??
//????Binding?bindingExchangeTenant(Queue?queueTenant,?DirectExchange?exchange)?{??
//????????Binding?binding?=?BindingBuilder.bind(queueTenant).to(exchange).with(RabbitMqQueue.CONTRACE_TENANT);??
//????????rabbitAdmin.declareBinding(binding);??
//????????return?binding;??
//????}??
}??
3.5 创建消息体
[java]?view plain?copy
package?com.demo.rabbitmq.sender;??
import?lombok.Builder;??
import?lombok.Data;??
import?lombok.Getter;??
import?java.util.Date;??
import?java.util.List;??
/**??
*?Created?by?huguoju?on2017/3/3.??
[java]?view plain?copy
?*不能用@Builder,因?yàn)閖son反編譯的時(shí)候需要set方法龄砰,builder沒有set方法??
?*?合同消息載體??
?*/??
//@Builder??
//@Getter??
@Data??
public?class?ContractRabbitMq?{??
private?String?id;??
private?String?name;??
private?List?testList;??
private?Date?createDate;??
}??
[java]?view plain?copy
package?com.demo.rabbitmq.sender;??
import?lombok.Builder;??
import?lombok.Getter;??
/**
?*?Created?by?huguoju?on?2017/3/3.
?*?tenant消息載體
?*/??
@Builder??
@Getter??
public?class?TenantRabbitMq?{??
private?String?id;??
private?String?name;??
}??
4. 消费者配置。实际使用时应该和生产者不在一个项目里，这里只是演示，所以放在了一个项目里；很多公用的文件在实际开发中可以打成jar包使用。
4.1 消息监听的代理类
[java]?view plain?copy
package?com.demo.rabbitmq.consumer;??
import?com.demo.model.RabbitMq;??
import?com.demo.rabbitmq.sender.RabbitMqExchange;??
import?com.demo.rabbitmq.sender.RabbitMqQueue;??
import?org.springframework.amqp.core.*;??
import?org.springframework.amqp.rabbit.annotation.EnableRabbit;??
import?org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;??
import?org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;??
import?org.springframework.amqp.rabbit.connection.CachingConnectionFactory;??
import?org.springframework.amqp.rabbit.connection.ConnectionFactory;??
import?org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;??
import?org.springframework.beans.factory.annotation.Autowired;??
import?org.springframework.context.annotation.Bean;??
import?org.springframework.context.annotation.Configuration;??
import?org.springframework.messaging.converter.MappingJackson2MessageConverter;??
import?org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;??
/**
?*?Created?by?huguoju?on?2017/3/3.
?*?接收方配置
?*?消息的監(jiān)聽的代理類
?*/??
@Configuration??
@EnableRabbit??
public?class?ConsumerConfig?implements?RabbitListenerConfigurer?{??
@Autowired??
????ReceiverService?receiverService;??
@Autowired??
private?RabbitMq?rabbitMq;??
@Autowired??
private?ConnectionFactory?connectionFactory;??
@Bean??
public?DefaultMessageHandlerMethodFactory?handlerMethodFactory(){??
DefaultMessageHandlerMethodFactory?factory=new?DefaultMessageHandlerMethodFactory();??
factory.setMessageConverter(new?MappingJackson2MessageConverter());??
return?factory;??
????}??
//????@Bean??
//????public?SimpleMessageListenerContainer?messageContainer()?{??
//????????SimpleMessageListenerContainer?container?=?new?SimpleMessageListenerContainer(connectionFactory());??
//????????container.setQueues(queueContract());??
//??????//??container.setExposeListenerChannel(true);??
//????????container.setMaxConcurrentConsumers(1);??
//????????container.setConcurrentConsumers(1);??
//????????container.setAcknowledgeMode(AcknowledgeMode.MANUAL);?//設(shè)置確認(rèn)模式手工確認(rèn)??
//????????container.setMessageListener(new?MessageListener()?{??
//??
//????????????@Override??
//????????????public?void?onMessage(Message?message)?{??
//????????????????byte[]?body?=?message.getBody();??
//????????????????System.out.println("receive?msg?:?"?+?new?String(body));??
//???????????????//?channel.basicAck(message.getMessageProperties().getDeliveryTag(),?false);?//確認(rèn)消息成功消費(fèi)??
//????????????}??
//????????});??
//????????return?container;??
//????}??
@Bean??
public?SimpleRabbitListenerContainerFactory?rabbitListenerContainerFactory(){??
SimpleRabbitListenerContainerFactory?factory=new?SimpleRabbitListenerContainerFactory();??
????????factory.setConnectionFactory(connectionFactory);??
????????factory.setAcknowledgeMode(AcknowledgeMode.AUTO);??
return?factory;??
????}??
@Override??
public?void?configureRabbitListeners(RabbitListenerEndpointRegistrar?rabbitListenerEndpointRegistrar)?{??
????????rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());??
????}??
}??
4.2 消费者监听
[java]?view plain?copy
package?com.demo.rabbitmq.consumer;??
import?com.demo.rabbitmq.sender.ContractRabbitMq;??
import?com.demo.rabbitmq.sender.RabbitMqQueue;??
import?com.demo.rabbitmq.sender.TenantRabbitMq;??
import?com.fasterxml.jackson.databind.ObjectMapper;??
import?org.springframework.amqp.rabbit.annotation.RabbitHandler;??
import?org.springframework.amqp.rabbit.annotation.RabbitListener;??
import?org.springframework.stereotype.Component;??
import?java.io.IOException;??
/**
?*?Created?by?huguoju?on?2017/3/3.
?*/??
@Component??
public?class?ReceiverService?{??
@RabbitListener(queues?=?RabbitMqQueue.CONTRACE_SELF)??
@RabbitHandler??
public?void?receiveContractQueue(ContractRabbitMq?contract)?{??
ObjectMapper?objectMapper=new?ObjectMapper();??
try?{??
System.out.println("Received?contract<"?+?objectMapper.writeValueAsString(contract)?+?">");??
}catch?(IOException?e)?{??
????????????e.printStackTrace();??
????????}??
????}??
@RabbitListener(queues?=?RabbitMqQueue.CONTRACE_TENANT)??
public?void?receiveTenantQueue(TenantRabbitMq?tenant)?{??
ObjectMapper?objectMapper=new?ObjectMapper();??
try?{??
System.out.println("Received?contract<"?+?objectMapper.writeValueAsString(tenant)?+?">");??
}catch?(IOException?e)?{??
????????????e.printStackTrace();??
????????}??
????}??
}??
以上就完成了。
创建测试controller
[java]?view plain?copy
package?com.demo.controller;??
import?com.demo.rabbitmq.sender.ContractRabbitMq;??
import?com.demo.service.rabbitMq.ContractRabbitmqService;??
import?com.google.common.collect.Lists;??
import?io.swagger.annotations.Api;??
import?org.springframework.beans.factory.annotation.Autowired;??
import?org.springframework.web.bind.annotation.RequestMapping;??
import?org.springframework.web.bind.annotation.RequestMethod;??
import?org.springframework.web.bind.annotation.RestController;??
import?java.util.Date;??
/**
?*?Created?by?huguoju?on?2017/3/6.
?*/??
@RestController??
@RequestMapping("rabbitmq")??
@Api(value?=?"測(cè)試rabbitmq",tags?=?"測(cè)試rabbitmq")??
public?class?RabbitMqController?{??
@Autowired??
public?ContractRabbitmqService?contractRabbitmqService;??
@RequestMapping(value?=?"contract/topic",method?=?{RequestMethod.POST,RequestMethod.GET})??
public?void?contractTopic(){??
ContractRabbitMq?mq=new?ContractRabbitMq();??
mq.setId("15");??
mq.setName("測(cè)試");??
mq.setTestList(Lists.newArrayList("111","222"));??
mq.setCreateDate(new?Date());??
????????contractRabbitmqService.sendContractRabbitmqTopic(mq);??
????}??
}??