本文介紹三種常用操作,基于spring-boot-starter-amqp依賴
- 手動ack
- work模式(能者多勞)
- 消息格式轉(zhuǎn)換
手動ack
消息確認(rèn)模式
在amqp協(xié)議中消息確認(rèn)有兩種模式
自動確認(rèn)模式(automatic acknowledgement model)當(dāng)消息代理將消息發(fā)送給應(yīng)用后立即刪除
顯式確認(rèn)模式(explicit acknowledgement model)待應(yīng)用發(fā)送一個確認(rèn)回執(zhí)后再刪除消息
而在spring-boot-starter-amqp,spring定義了三種
NONE 沒有ack的意思,對應(yīng)rabbitMQ的自動確認(rèn)模式
MANUAL 手動模式,對應(yīng)rabbitMQ的顯式確認(rèn)模式
AUTO 自動模式,對應(yīng)rabbitMQ的顯式確認(rèn)模式
首先注意的是spring-amqp中的自動模式與rabbit中的自動模式是不一樣的,其次,在spring-amqp中MANUAL 與 AUTO的關(guān)系有點類似于在spring中手動提交事務(wù)與自動提交事務(wù)的區(qū)別,一個是手動發(fā)送ack一個是在方法執(zhí)行完,沒有異常的情況下自動發(fā)送ack
代碼實現(xiàn)
三個步驟
設(shè)置消費者的消息確認(rèn)模式
手動確認(rèn)/拒絕消息
設(shè)置消息拒絕策略
設(shè)置消費者的消息確認(rèn)模式:
@Configuration
public class ListenerConfig {
@Bean("myListenerFactory")
public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory containerFactory=
new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
//設(shè)置消費者的消息確認(rèn)模式
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return containerFactory;
}
}
手動確認(rèn)/拒絕消息:
@Component
@RabbitListener(
containerFactory = "myListenerFactory",
bindings = @QueueBinding(
value = @Queue("myManualAckQueue"),
exchange = @Exchange(value = "myManualAckExchange", type = ExchangeTypes.DIRECT),
key = "mine.manual"))
public class MyAckListener {
@RabbitHandler
public void onMessage(@Payload String msg,
@Headers Map<String, Object> headers,
Channel channel) throws Exception{
try {
System.out.println(msg);
//消息確認(rèn),(deliveryTag,multiple是否確認(rèn)所有消息)
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
} catch (Exception e) {
//消息拒絕(deliveryTag,multiple,requeue拒絕后是否重新回到隊列)
channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);
// 拒絕一條
// channel.basicReject();
}
}
}
設(shè)置消息拒絕策略:
拒絕策略是指,當(dāng)消息被消費者拒絕時該如何處理,丟棄或者是重新回到隊列.
在MANUAL 模式下,在拒絕消息的方法中設(shè)置
//消息拒絕(deliveryTag,multiple,requeue拒絕后是否重新回到隊列)
channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);
在AUTO 模式下可通過RabbitListenerContainerFactory或是ListenerContainer設(shè)置,如
@Bean("myListenerFactory")
public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory containerFactory=
new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
//自動ack
containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
//拒絕策略,true回到隊列 false丟棄
containerFactory.setDefaultRequeueRejected(false);
return containerFactory;
}
需要注意的是,默認(rèn)的拒絕策略是回到隊列,所以,如果隊列只有一個消費者的話就會產(chǎn)生死循環(huán)
work模式-能者多勞
默認(rèn)情況下,如果有多個消費者在一個隊列上,消息是公平的分發(fā)給消費者的,一人一個輪著來,不考慮每個消費者之間的處理能力的差異,這可以通過設(shè)置預(yù)處理消息數(shù)(prefetchCount)緩解,或是使用work-能者多勞模式
work-能者多勞模式: 每個消費者的預(yù)處理消息數(shù)(prefetchCount)都設(shè)置為1,每個消費者消息確認(rèn)都為顯式確認(rèn)模式,即MANUAL,或是AUTO
如下,兩個消費者消費同一個queue上的消息,理論上consumer-one處理能力是consumer-two的兩倍
@Component
public class WorkListener {
private int one = 1;
private int two = 1;
@RabbitListener(containerFactory = "workListenerFactory",
queuesToDeclare = @Queue("workQueue"))
public void onMessageOne(String msg) throws InterruptedException {
Thread.sleep(100);
System.out.println("consumer-one 第 " + one + " 個消息 :" + msg);
one++;
}
@RabbitListener(containerFactory = "workListenerFactory",
queuesToDeclare = @Queue("workQueue"))
public void onMessageTwo(String msg) throws InterruptedException {
Thread.sleep(200);
System.out.println("consumer-two 第 " + two + " 個消息 :" + msg);
two++;
}
}
生產(chǎn)者,使用了上一篇中介紹的默認(rèn)交換機
@Autowired
private RabbitTemplate rabbitTemplate;
private void send() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("workQueue", "this is a message");
}
}
執(zhí)行結(jié)果如下,符合預(yù)期,兩個消費者幾乎同時消費完畢,且one消費的消息數(shù)是two的兩倍
......
consumer-two 第 31 個消息 :this is a message
consumer-one 第 62 個消息 :this is a message
consumer-one 第 63 個消息 :this is a message
consumer-two 第 32 個消息 :this is a message
consumer-one 第 64 個消息 :this is a message
consumer-one 第 65 個消息 :this is a message
consumer-two 第 33 個消息 :this is a message
consumer-one 第 66 個消息 :this is a message
consumer-two 第 34 個消息 :this is a message
消息格式轉(zhuǎn)換
rabbirMQ中的消息對應(yīng)到j(luò)ava中對應(yīng)的實體類是 org.springframework.amqp.core.Message,所以消息轉(zhuǎn)換接口MessageConverter 有兩個主要方法 toMessage 和 fromMessage 顧名思義,即將發(fā)送的內(nèi)容與Message的互轉(zhuǎn)
SimpleMessageConverter
spring中默認(rèn)使用的是 SimpleMessageConverter 它的兩個轉(zhuǎn)化方法如下
toMessage,根據(jù) object instanceof xxx 轉(zhuǎn)化
fromMessage,根據(jù)MessageProperties的ContentType轉(zhuǎn)換
所以你大可以自己實現(xiàn)MessageConverter 接口自己轉(zhuǎn)換,當(dāng)然spring也提供了常用的轉(zhuǎn)化,如轉(zhuǎn)json,xml
Jackson2JsonMessageConverter
常用的將object與json互轉(zhuǎn)
生產(chǎn)者
@Autowired
private RabbitTemplate rabbitTemplate;
private void send() {
//實際項目不建議這么干,spring單例模式,
// 所以最好自己構(gòu)建一個"jasonRabbitTemplate",用的使用注入
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.convertAndSend("jsonQueue", new Student("zhangSan",15,"男"));
}
消費者
@Bean("jasonTemplate")
public RabbitTemplate jasonRabbitTemplate(ConnectionFactory connectionFactory) {
Jackson2JsonMessageConverter messageConverter =
new Jackson2JsonMessageConverter();
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//設(shè)置轉(zhuǎn)化類
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
...
@Component
@RabbitListener(containerFactory = "jsonListenerFactory",
queuesToDeclare = @Queue("jsonQueue"))
public class JasonListener {
@RabbitHandler
public void onMessage(Student student) {
System.out.println(student);
}
}
消息內(nèi)容:
轉(zhuǎn)載請注明出處
系列文章