1叁丧、簡介
RabbitMQ 即一個(gè)消息隊(duì)列,主要是用來實(shí)現(xiàn)應(yīng)用程序的異步和解耦冈止,同時(shí)也能起到消息緩沖狂票,消息分發(fā)的作用。
2熙暴、創(chuàng)建一個(gè)springboot的項(xiàng)目
3闺属、添加RabbitMQ依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4慌盯、在application.yml中配置RabbitMQ
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirms: true
virtual-host: /
5、創(chuàng)建一個(gè)rabbitMQ配置類(這個(gè)一定要看明白)
/**
* 說明:〈該類初始化創(chuàng)建隊(duì)列掂器、轉(zhuǎn)發(fā)器亚皂,并把隊(duì)列綁定到轉(zhuǎn)發(fā)器〉
*/
@Configuration
public class ApplicationConfig {
private static Logger log = LoggerFactory.getLogger(ApplicationConfig.class);
@Autowired
private CachingConnectionFactory connectionFactory;
final static String queueName = "helloQuery";
@Bean
public Queue helloQueue() {
return new Queue(queueName);
}
@Bean
public Queue userQueue() {
return new Queue("user");
}
@Bean
public Queue dirQueue() {
return new Queue("direct");
}
//===============以下是驗(yàn)證topic Exchange的隊(duì)列==========
// Bean默認(rèn)的name是方法名
@Bean(name="message")
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queueMessages() {
return new Queue("topic.messages");
}
//===============以上是驗(yàn)證topic Exchange的隊(duì)列===========
//===============以下是驗(yàn)證Fanout Exchange的隊(duì)列==========
@Bean(name="AMessage")
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
//===============以上是驗(yàn)證Fanout Exchange的隊(duì)列==========
/**
* exchange是交換機(jī)交換機(jī)的主要作用是接收相應(yīng)的消息并且綁定到指定的隊(duì)列.交換機(jī)有四種類型,分別為Direct,topic,headers,Fanout.
*
* Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡單的模式.即創(chuàng)建消息隊(duì)列的時(shí)候,指定一個(gè)BindingKey.當(dāng)發(fā)送者發(fā)送消息的時(shí)候,指定對應(yīng)的Key.當(dāng)Key和消息隊(duì)列的BindingKey一致的時(shí)候,消息將會(huì)被發(fā)送到該消息隊(duì)列中.
*
* topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊(duì)列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時(shí)候,只有指定的Key和該模式相匹配的時(shí)候,消息才會(huì)被發(fā)送到該消息隊(duì)列中.
*
* headers也是根據(jù)一個(gè)規(guī)則進(jìn)行匹配,在消息隊(duì)列和交換機(jī)綁定的時(shí)候會(huì)指定一組鍵值對規(guī)則,而發(fā)送消息的時(shí)候也會(huì)指定一組鍵值對規(guī)則,當(dāng)兩組鍵值對規(guī)則相匹配的時(shí)候,消息會(huì)被發(fā)送到匹配的消息隊(duì)列中.
*
* Fanout是路由廣播的形式,將會(huì)把消息發(fā)給綁定它的全部隊(duì)列,即便設(shè)置了key,也會(huì)被忽略.
*/
@Bean
DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
@Bean
TopicExchange exchange() {
// 參數(shù)1為交換機(jī)的名稱
return new TopicExchange("exchange");
}
/**
* //配置廣播路由器
* @return FanoutExchange
*/
@Bean
FanoutExchange fanoutExchange() {
// 參數(shù)1為交換機(jī)的名稱
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue, DirectExchange directExchange){
return BindingBuilder.bind(dirQueue).to(directExchange).with("direct");
}
/**
* 將隊(duì)列topic.message與exchange綁定,routing_key為topic.message,就是完全匹配
* @param queueMessage
* @param exchange
* @return
*/
@Bean
// 如果參數(shù)名和上面用到方法名稱一樣国瓮,可以不用寫@Qualifier
Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
/**
* 將隊(duì)列topic.messages與exchange綁定灭必,routing_key為topic.#,模糊匹配
* @param queueMessages
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
@Bean
Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
/**
* rabbitTemplate是thread safe的,主要是channel不能共用乃摹,但是在rabbitTemplate源碼里channel是threadlocal的禁漓,所以singleton沒問題。
* 但是rabbitTemplate要設(shè)置回調(diào)類孵睬,如果是singleton播歼,回調(diào)類就只能有一個(gè),所以如果想要設(shè)置不同的回調(diào)類掰读,就要設(shè)置為prototype的scope秘狞。
* @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(){
//若使用confirm-callback或return-callback,必須要配置publisherConfirms或publisherReturns為true
//每個(gè)rabbitTemplate只能有一個(gè)confirm-callback和return-callback蹈集,如果這里配置了烁试,那么寫生產(chǎn)者的時(shí)候不能再寫confirm-callback和return-callback
//使用return-callback時(shí)必須設(shè)置mandatory為true,或者在配置中設(shè)置mandatory-expression的值為true
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
//將對象序列化為json串
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
/**
* 如果消息沒有到exchange,則confirm回調(diào),ack=false
* 如果消息到達(dá)exchange,則confirm回調(diào),ack=true
* exchange到queue成功,則不回調(diào)return
* exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回回調(diào),消息就丟了)
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
log.info("消息發(fā)送成功: correlationData:{}, ack{}, cause:{}", correlationData, ack, cause);
}else{
log.error("消息發(fā)送失敗: correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause);
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息丟失: exchange:{}, route:{}, replyCode:{}, replyText:{}, message:{}", exchange, routingKey, replyCode, replyText, new String(message.getBody()));
}
});
return rabbitTemplate;
}
}
rabbitMQ配置類大約就這些內(nèi)容雾狈,里面我基本上都做了注釋廓潜。
下面我們就開始寫rabbitMQ的用法了
6、單生產(chǎn)者和單消費(fèi)者
6.1善榛、生產(chǎn)者
@Component
public class HelloSender1 {
/**
* AmqpTemplate可以說是RabbitTemplate父類,RabbitTemplate實(shí)現(xiàn)類RabbitOperations接口呻畸,RabbitOperations繼承了AmqpTemplate接口
*/
@Autowired
private AmqpTemplate rabbitTemplate;
@Autowired
private RabbitTemplate rabbitTemplate1;
/**
* 用于單生產(chǎn)者-》單消費(fèi)者測試
*/
public void send() {
String sendMsg = "hello1 " + new Date();
System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate1.convertAndSend("helloQueue", sendMsg);
}
}
名為helloQueue的隊(duì)列在配置類創(chuàng)建好了移盆,項(xiàng)目啟動(dòng)的時(shí)候會(huì)自動(dòng)創(chuàng)建
6.2、消費(fèi)者
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1 : " + hello);
}
}
@RabbitListener注解是監(jiān)聽隊(duì)列的伤为,當(dāng)隊(duì)列有消息的時(shí)候咒循,它會(huì)自動(dòng)獲取。
@RabbitListener 標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候绞愚,就交給 @RabbitHandler 的方法處理叙甸,具體使用哪個(gè)方法處理,根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類型
注意
消息處理方法參數(shù)是由 MessageConverter 轉(zhuǎn)化位衩,若使用自定義 MessageConverter 則需要在 RabbitListenerContainerFactory 實(shí)例中去設(shè)置(默認(rèn) Spring 使用的實(shí)現(xiàn)是 SimpleRabbitListenerContainerFactory)
消息的 content_type 屬性表示消息 body 數(shù)據(jù)以什么數(shù)據(jù)格式存儲(chǔ)裆蒸,接收消息除了使用 Message 對象接收消息(包含消息屬性等信息)之外,還可直接使用對應(yīng)類型接收消息 body 內(nèi)容糖驴,但若方法參數(shù)類型不正確會(huì)拋異常:
application/octet-stream:二進(jìn)制字節(jié)數(shù)組存儲(chǔ)僚祷,使用 byte[]
application/x-java-serialized-object:java 對象序列化格式存儲(chǔ)佛致,使用 Object、相應(yīng)類型(反序列化時(shí)類型應(yīng)該同包同名辙谜,否者會(huì)拋出找不到類異常)
text/plain:文本數(shù)據(jù)類型存儲(chǔ)俺榆,使用 String
application/json:JSON 格式,使用 Object装哆、相應(yīng)類型
6.3罐脊、controller
/**
* 最簡單的hello生產(chǎn)和消費(fèi)實(shí)現(xiàn)(單生產(chǎn)者和單消費(fèi)者)
*/
@RequestMapping("/hello")
public void hello() {
helloSender1.send();
}
6.4、結(jié)果
控制臺(tái)的結(jié)果:
Sender1 : hello1 Mon Feb 18 10:13:35 CST 2019
2019-02-18 10:13:35,831 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
Receiver1 : hello1 Mon Feb 18 10:13:35 CST 2019
7蜕琴、單生產(chǎn)者對多消費(fèi)者
7.1萍桌、生產(chǎn)者
/**
* 用于單/多生產(chǎn)者-》多消費(fèi)者測試
*/
public void send(String msg) {
String sendMsg = msg + new Date();
System.out.println("Sender1 : " + sendMsg); this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}
7.2、消費(fèi)者
消費(fèi)者1
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1 : " + hello);
}
}
消費(fèi)者2
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {
@RabbitHandler public void process(String hello) {
System.out.println("Receiver2 : " + hello);
}
}
7.3奸绷、controller
/**
* 單生產(chǎn)者-多消費(fèi)者
*/
@RequestMapping("/oneToMany")
public void oneToMany() {
for(int i=0;i<10;i++){
helloSender1.send("hellomsg:"+i);
}
}
7.4梗夸、結(jié)果:
Sender1 : hellomsg:0Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:1Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:2Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:3Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:4Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:5Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:6Mon Feb 18 10:19:09 CST 2019
Sender1 : hellomsg:7Mon Feb 18 10:19:10 CST 2019
Sender1 : hellomsg:8Mon Feb 18 10:19:10 CST 2019
Sender1 : hellomsg:9Mon Feb 18 10:19:10 CST 2019
Receiver2 : hellomsg:0Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:2Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:4Mon Feb 18 10:19:09 CST 2019
Receiver1 : hellomsg:1Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:6Mon Feb 18 10:19:09 CST 2019
Receiver1 : hellomsg:3Mon Feb 18 10:19:09 CST 2019
Receiver2 : hellomsg:8Mon Feb 18 10:19:10 CST 2019
Receiver1 : hellomsg:5Mon Feb 18 10:19:09 CST 2019
Receiver1 : hellomsg:7Mon Feb 18 10:19:10 CST 2019
Receiver1 : hellomsg:9Mon Feb 18 10:19:10 CST 2019
2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,041 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,042 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,044 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:19:10,045 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
8、實(shí)體類的傳輸号醉,必須格式化
8.1反症、實(shí)體類
public class User implements Serializable {
private String name;
private String pass;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPass() {
return pass;
}
public void setPass(String pass) {
this.pass = pass;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", pass='" + pass + '\'' +
'}';
}
}
8.2、生產(chǎn)者
/**
* 實(shí)體類的傳輸(springboot完美的支持對象的發(fā)送和接收畔派,不需要格外的配置铅碍。實(shí)體類必須序列化)
* @param user
*/
public void send(User user) {
System.out.println("user send : " + user.getName()+"/"+user.getPass());
this.rabbitTemplate.convertAndSend("userQueue", user);
}
8.3、消費(fèi)者
@Component
@RabbitListener(queues = "userQueue")
public class HelloReceiver3 {
@RabbitHandler
public void process(User user){
System.out.println("user receive : " + user.getName()+"/"+user.getPass());
}
}
8.4线椰、controller
/**
* 實(shí)體列的傳輸
*/
@RequestMapping("/userTest")
public void userTest(){
User user=new User();
user.setName("黃義波");
user.setPass("123456");
userSender.send(user);
}
8.5胞谈、結(jié)果
user send : 黃義波/123456
2019-02-18 10:24:24,251 INFO (Application.java:169)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
user receive : 黃義波/123456
9、directExchange
Direct是RabbitMQ默認(rèn)的交換機(jī)模式,也是最簡單的模式.即創(chuàng)建消息隊(duì)列的時(shí)候,指定一個(gè)BindingKey.當(dāng)發(fā)送者發(fā)送消息的時(shí)候,指定對應(yīng)的Key.當(dāng)Key和消息隊(duì)列的BindingKey一致的時(shí)候,消息將會(huì)被發(fā)送到該消息隊(duì)列中.
9.1憨愉、在rabbitMQ配置類中添加內(nèi)容
@Bean
public Queue dirQueue() {
return new Queue("direct");
}
@Bean
DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
/**
* 將隊(duì)列dirQueue與directExchange交換機(jī)綁定烦绳,routing_key為direct
* @param dirQueue
* @param directExchange
* @return
*/
@Bean
Binding bindingExchangeDirect(@Qualifier("dirQueue")Queue dirQueue,DirectExchange directExchange){
return BindingBuilder.bind(dirQueue).to(directExchange).with("direct");
}
9.2、生產(chǎn)者
@Component
public class DirectSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msgString="directSender :hello i am hzb";
System.out.println(msgString);
this.rabbitTemplate.convertAndSend("direct", msgString);
}
}
9.3配紫、消費(fèi)者
@Component
@RabbitListener(queues = "direct")
public class DirectReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("directReceiver : " + msg);
}
}
9.4径密、controller
@RequestMapping("/directTest")
public void directTest() {
directSender.send();
}
9.5、結(jié)果
directSender :hello i am hyb
directReceiver : directSender :hello i am hyb
2019-02-18 10:33:25,974 INFO (Application.java:175)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
10躺孝、topicExchange
topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊(duì)列和交換機(jī)的綁定主要是依據(jù)一種模式(通配符+字符串),而當(dāng)發(fā)送消息的時(shí)候,只有指定的Key和該模式相匹配的時(shí)候,消息才會(huì)被發(fā)送到該消息隊(duì)列中.
10.1享扔、在rabbitMQ配置類中添加內(nèi)容
// Bean默認(rèn)的name是方法名
@Bean(name="message")
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queueMessages() {
return new Queue("topic.messages");
}
@Bean
TopicExchange exchange() {
// 參數(shù)1為交換機(jī)的名稱
return new TopicExchange("exchange");
}
/**
* 將隊(duì)列topic.message與exchange綁定,routing_key為topic.message,就是完全匹配
* @param queueMessage
* @param exchange
* @return
*/
@Bean
// 如果參數(shù)名和上面用到方法名稱一樣植袍,可以不用寫@Qualifier
Binding bindingExchangeMessage(@Qualifier("message")Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
/**
* 將隊(duì)列topic.messages與exchange綁定惧眠,routing_key為topic.#,模糊匹配
* @param queueMessages
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessages(@Qualifier("messages")Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
10.2、生產(chǎn)者
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msg1 = "I am topic.mesaage msg======";
System.out.println("sender1 : " + msg1);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);
String msg2 = "I am topic.mesaages msg########";
System.out.println("sender2 : " + msg2);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
}
}
10.3于个、消費(fèi)者
消費(fèi)者1
@Component
@RabbitListener(queues = "topic.message")
public class TopicMessageReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("topicMessageReceiver : " +msg);
}
}
消費(fèi)者2
@Component
@RabbitListener(queues = "topic.messages")
public class TopicMessagesReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("topicMessagesReceiver : " +msg);
}
}
10.4氛魁、controller
/**
* topic exchange類型rabbitmq測試
*/
@RequestMapping("/topicTest")
public void topicTest() {
topicSender.send();
}
10.5、結(jié)果
sender1 : I am topic.mesaage msg======
sender2 : I am topic.mesaages msg########
topicMessageReceiver : I am topic.mesaage msg======
topicMessagesReceiver : I am topic.mesaage msg======
topicMessagesReceiver : I am topic.mesaages msg########
2019-02-18 10:39:46,150 INFO (Application.java:175)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
2019-02-18 10:39:46,206 INFO (Application.java:175)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
11、fanoutExchange
Fanout是路由廣播的形式,將會(huì)把消息發(fā)給綁定它的全部隊(duì)列,即便設(shè)置了key,也會(huì)被忽略.
11.1呆盖、在rabbitMQ配置類中添加內(nèi)容
//===============以下是驗(yàn)證Fanout Exchange的隊(duì)列==========
@Bean(name="AMessage")
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
// 參數(shù)1為交換機(jī)的名稱
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(@Qualifier("AMessage")Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
11.2拖云、生產(chǎn)者
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msgString="fanoutSender :hello i am hzb";
System.out.println(msgString); // 參數(shù)2被忽略
this.rabbitTemplate.convertAndSend("fanoutExchange","", msgString);
}
}
11.3、消費(fèi)者
消費(fèi)者A
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverA : " + msg);
}
}
消費(fèi)者B
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverB : " + msg);
}
}
消費(fèi)者C
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverC : " + msg);
}
}
11.4应又、controller
/**
* fanout exchange類型rabbitmq測試
*/
@RequestMapping("/fanoutTest")
public void fanoutTest() {
fanoutSender.send();
}
11.5宙项、結(jié)果
fanoutSender :hello i am hzb
FanoutReceiverA : fanoutSender :hello i am hyb
FanoutReceiverC : fanoutSender :hello i am hyb
FanoutReceiverB : fanoutSender :hello i am hyb
2019-02-18 10:45:38,760 INFO (Application.java:175)- 消息發(fā)送成功:correlationData(null),ack(true),cause(null)
12、配置類中的rabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(){
//若使用confirm-callback或return-callback株扛,必須要配置publisherConfirms或publisherReturns為true
//每個(gè)rabbitTemplate只能有一個(gè)confirm-callback和return-callback尤筐,如果這里配置了,那么寫生產(chǎn)者的時(shí)候不能再寫confirm-callback和return-callback
//使用return-callback時(shí)必須設(shè)置mandatory為true洞就,或者在配置中設(shè)置mandatory-expression的值為true
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
/**
* 如果消息沒有到exchange,則confirm回調(diào),ack=false
* 如果消息到達(dá)exchange,則confirm回調(diào),ack=true
* exchange到queue成功,則不回調(diào)return
* exchange到queue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回回調(diào),消息就丟了)
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}else{
log.info("消息發(fā)送失敗:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
好好看看注釋
13盆繁、不在配置類中配置callback
方法一:
13.1、配置一個(gè)接口
/**
* 說明:〈定義一個(gè)名為SendMessageService 的接口旬蟋,這個(gè)接口繼承了RabbitTemplate.ConfirmCallback油昂,
* ConfirmCallback接口是用來回調(diào)消息發(fā)送成功后的方法,當(dāng)一個(gè)消息被成功寫入到RabbitMQ服務(wù)端時(shí)倾贰,
* 會(huì)自動(dòng)的回調(diào)RabbitTemplate.ConfirmCallback接口內(nèi)的confirm方法完成通知
*/
public interface SendMessageService extends RabbitTemplate.ConfirmCallback{
void sendMessage(String exchange,String routekey,Object message);
}
13.2冕碟、實(shí)現(xiàn)這個(gè)接口
/**
* 說明:〈該類注入了RabbitTemplate,RabbitTemplate封裝了發(fā)送消息的方法匆浙,我們直接使用即可安寺。
* 可以看到我們構(gòu)建了一個(gè)回調(diào)返回的數(shù)據(jù),并使用convertAndSend方法發(fā)送了消息首尼。同時(shí)實(shí)現(xiàn)了confirm回調(diào)方法味赃,
* 通過判斷isSendSuccess可以知道消息是否發(fā)送成功脐帝,這樣我們就可以進(jìn)行進(jìn)一步處理厅瞎。
*/
@Service
public class SendMessageServiceImpl implements SendMessageService{
private static Logger logger = LoggerFactory.getLogger(SendMessageServiceImpl.class);
@Autowired private RabbitTemplate rabbitTemplate;
@Override
public void sendMessage(String exchange,String routekey,Object message) {
//設(shè)置回調(diào)對象
//rabbitTemplate.setConfirmCallback(this);
//rabbitTemplate.setMandatory(true);
//構(gòu)建回調(diào)返回的數(shù)據(jù)
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData);
rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData);
logger.info("SendMessageServiceImpl() >>> 發(fā)送消息到RabbitMQ, 消息內(nèi)容: " + message);
}
/**
* 消息回調(diào)確認(rèn)方法
* @param correlationData 回調(diào)數(shù)據(jù)
* @param isSendSuccess 是否發(fā)送成功
* @param
*/
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) {
logger.info("confirm回調(diào)方法>>>>>>>>>>>>>回調(diào)消息ID為: " + correlationData.getId());
if (isSendSuccess) {
logger.info("confirm回調(diào)方法>>>>>>>>>>>>>消息發(fā)送成功");
} else {
logger.info("confirm回調(diào)方法>>>>>>>>>>>>>消息發(fā)送失敗" + s);
}
}
}
方法二:
直接在生產(chǎn)者發(fā)送信息的時(shí)候修改rabbitTemplate
@Service
public class SendMessage1 {
private static Logger log = LoggerFactory.getLogger(SendMessage1.class);
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routekey, Object message) {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
} else {
log.info("消息發(fā)送失敗:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
}
}
13慌闭、有時(shí)候消費(fèi)者出現(xiàn)錯(cuò)誤,需要人工處理
//構(gòu)建回調(diào)返回的數(shù)據(jù)
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//rabbitTemplate.convertAndSend(Constants.SAVE_USER_EXCHANGE_NAME, Constants.SAVE_USER_QUEUE_ROUTE_KEY, message, correlationData);
//rabbitTemplate.convertAndSend(exchange, routekey, message, correlationData);
// 將 CorrelationData的id 與 Message的correlationId綁定查排,然后關(guān)系保存起來,例如放到緩存中,然后人工處理
// 當(dāng)confirm或return回調(diào)時(shí),根據(jù)ack類別等,分別處理. 例如return或者ack=false則說明有問題,報(bào)警, 如果ack=true則刪除關(guān)系
// (因?yàn)閞eturn在confirm前,所以一條消息在return后又ack=true的情況也是按return處理)
Message message1 = MessageBuilder.withBody(message.toString().getBytes()).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).setCorrelationId(correlationData.getId()).build();
rabbitTemplate.send(exchange, routekey, message1, correlationData);
將 CorrelationData的id 與 Message的correlationId綁定破加,然后關(guān)系保存起來,例如放到緩存中,然后人工處理
我們可以看到,這兩條消息關(guān)聯(lián)起來了雹嗦。
14、事務(wù)消息
消息確認(rèn)機(jī)制
RabbitMQ提供了transaction合是、confirm兩種消息確認(rèn)機(jī)制了罪。transaction即事務(wù)機(jī)制,手動(dòng)提交和回滾聪全;confirm機(jī)制提供了Confirmlistener和waitForConfirms兩種方式泊藕。confirm機(jī)制效率明顯會(huì)高于transaction機(jī)制,但后者的優(yōu)勢在于強(qiáng)一致性难礼。如果沒有特別的要求娃圆,建議使用confrim機(jī)制玫锋。
- 1、從實(shí)驗(yàn)來看讼呢,消息的確認(rèn)機(jī)制只是確認(rèn)publisher發(fā)送消息到broker撩鹿,由broker進(jìn)行應(yīng)答,不能確認(rèn)消息是否有效消費(fèi)悦屏。
- 2节沦、而為了確認(rèn)消息是否被發(fā)送給queue,應(yīng)該在發(fā)送消息中啟用參數(shù)mandatory=true础爬,使用ReturnListener接收未被發(fā)送成功的消息甫贯。
- 3、接下來就需要確認(rèn)消息是否被有效消費(fèi)看蚜。publisher端目前并沒有提供監(jiān)聽事件叫搁,但提供了應(yīng)答機(jī)制來保證消息被成功消費(fèi),應(yīng)答方式:
- basicAck:成功消費(fèi)供炎,消息從隊(duì)列中刪除
- basicNack:requeue=true渴逻,消息重新進(jìn)入隊(duì)列,false被刪除
- basicReject:等同于basicNack
- basicRecover:消息重入隊(duì)列碱茁,requeue=true裸卫,發(fā)送給新的consumer,false發(fā)送給相同的consumer
RabbitMQ是基于AMQP協(xié)議實(shí)現(xiàn)的纽竣,該協(xié)議實(shí)現(xiàn)了事務(wù)機(jī)制墓贿,因此RabbitMQ也支持事務(wù)機(jī)制。RabbitMQ中蜓氨,與事務(wù)機(jī)制有關(guān)的方法有三個(gè):
- txSelect():將當(dāng)前channel設(shè)置成transaction模式聋袋。
- txCommit():提交事務(wù)。
- txRollback():回滾事務(wù)穴吹。
- channel.basicPublish:發(fā)送消息幽勒,可以是多條,可以是消費(fèi)消息提交ack港令。
- autoAck=false啥容,手動(dòng)提交ack,以事務(wù)提交或回滾為準(zhǔn)顷霹。
- autoAck=true咪惠,不支持事務(wù)的,也就是說你即使在收到消息之后在回滾事務(wù)也是于事無補(bǔ)的淋淀,隊(duì)列已經(jīng)把消息移除了遥昧。
RabbitConfig
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
@Data
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
}
RabbitUtil
- 通過@PostConstruct注解,在依賴注入之后,初始化ConnectionFactory炭臭。
- 提供創(chuàng)建ConnectionFactory的方法和創(chuàng)建Connection的方法永脓。
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Component
public final class RabbitUtil {
@Resource
private RabbitConfig rabbitConfig;
private ConnectionFactory factory;
@PostConstruct
public void init() {
factory = new ConnectionFactory();
factory.setHost(rabbitConfig.getHost());
factory.setPort(rabbitConfig.getPort());
factory.setUsername(rabbitConfig.getUsername());
factory.setPassword(rabbitConfig.getPassword());
factory.setVirtualHost(rabbitConfig.getVirtualHost());
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(60000L);
}
public ConnectionFactory newConnectionFactory() {
return factory;
}
public Connection newConnection() throws IOException, TimeoutException {
return factory.newConnection();
}
}
TransactionProducer
import com.panda.rabbitmq.config.RabbitUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
@Component
public class TransactionProducer {
private final static String EXCHANGE_NAME = "publisherconfirm-exchange";
@Resource
private RabbitUtil rabbitUtil;
public void send(String routingKey, String message) throws IOException, TimeoutException {
Channel channel = null;
try {
Connection connection = rabbitUtil.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 聲明一個(gè)direct交換機(jī)
// 開啟事務(wù)
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
//int i = 1 / 0;
// 提交事務(wù)
channel.txCommit();
} catch (Exception e) {
// 回滾事務(wù)
if (channel != null) {
channel.txRollback();
}
} finally {
if (channel != null) {
channel.close();
}
}
}
}
https://zhuanlan.zhihu.com/p/582787505