RabbitMQ默認(rèn)的調(diào)度機(jī)制是均勻的發(fā)送給消費(fèi)者,公平調(diào)度掂名。
但是實(shí)際情況下可能會(huì)出現(xiàn)消費(fèi)者消息能力不同的情況兰珍,這樣的話整體的消費(fèi)能力會(huì)降低藕溅。
RabbitMQ提供了prefetchcount=1來設(shè)置調(diào)度策略, 在消費(fèi)者處理并確認(rèn)前一條消息之前,不要向其發(fā)送新消息杯矩。相反栈虚,它將把它發(fā)送給下一個(gè)還不忙的消費(fèi)者。
如果是用的spring-boot-starter-amqp框架史隆,可以通過在配置文件設(shè)置"spring.rabbitmq.listener.simple.prefetch=1"
消費(fèi)者實(shí)現(xiàn)代碼:
@RabbitListener(queues = "q.test.direct")
public void exec1(Message message, @Headers Map<String, Object> headers, Channel channel) throws InterruptedException {
logger.info("exec1 message {}", new String(message.getBody()));
Thread.sleep(3000L);
}
@RabbitListener(queues = "q.test.direct")
public void exec2(Message message, @Headers Map<String, Object> headers, Channel channel) {
logger.info("exec2 message {}", new String(message.getBody()));
}
修改完成的執(zhí)行結(jié)果:
2019-09-19 17:06:17.347 INFO 79404 --- [ntContainer#0-1] com.jiang.rabbitmqdemo.Consumer : exec1 message test
2019-09-19 17:06:17.347 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.351 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.354 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.357 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.360 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.362 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.365 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.369 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test
2019-09-19 17:06:17.376 INFO 79404 --- [ntContainer#1-1] com.jiang.rabbitmqdemo.Consumer : exec2 message test