實(shí)際使用RabbitMQ過(guò)程中,如果完全不配置QoS迁央,這樣Rabbit會(huì)盡可能快速地
發(fā)送隊(duì)列中的所有消息到client端币励。因?yàn)閏onsumer在本地緩存所有的message阳仔,
從而極有可能導(dǎo)致OOM或者導(dǎo)致服務(wù)器內(nèi)存不足影響其它進(jìn)程的正常運(yùn)行攒霹。所以我們
需要通過(guò)設(shè)置Qos的prefetch count來(lái)控制consumer的流量导俘。同時(shí)設(shè)置得當(dāng)也會(huì)提高consumer的吞吐量。
prefetch與消息投遞
prefetch允許為每個(gè)consumer指定最大的unacked messages數(shù)目剔蹋。簡(jiǎn)單來(lái)說(shuō)就是用來(lái)指定一個(gè)consumer一次可以從Rabbit中獲取多少條message并緩存在client中(RabbitMQ提供的各種語(yǔ)言的client library)旅薄。一旦緩沖區(qū)滿了,Rabbit將會(huì)停止投遞新的message到該consumer中直到它發(fā)出ack泣崩。
假設(shè)prefetch值設(shè)為10少梁,共有兩個(gè)consumer。意味著每個(gè)consumer每次會(huì)從queue中預(yù)抓取 10 條消息到本地緩存著等待消費(fèi)矫付。同時(shí)該channel的unacked數(shù)變?yōu)?0凯沪。而Rabbit投遞的順序是,先為consumer1投遞滿10個(gè)message买优,再往consumer2投遞10個(gè)message妨马。如果這時(shí)有新message需要投遞,先判斷channel的unacked數(shù)是否等于20杀赢,如果是則不會(huì)將消息投遞到consumer中烘跺,message繼續(xù)呆在queue中。之后其中consumer對(duì)一條消息進(jìn)行ack脂崔,unacked此時(shí)等于19滤淳,Rabbit就判斷哪個(gè)consumer的unacked少于10,就投遞到哪個(gè)consumer中砌左。
總的來(lái)說(shuō)脖咐,consumer負(fù)責(zé)不斷處理消息,不斷ack汇歹,然后只要unacked數(shù)少于prefetch * consumer數(shù)目屁擅,broker就不斷將消息投遞過(guò)去。
如何設(shè)置
官方提供的java client可以通過(guò)channel來(lái)設(shè)置:
channel = connection.createChannel();
channel.basicQos(prefetch);
spring-amqp的話可通過(guò)配置文件來(lái)配置
<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3">
<rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>
這里需要注意的是产弹,spring-amqp中的prefetch默認(rèn)值是250派歌。
1.0 的spring-amqp默認(rèn)值是1; 2.0版本的為250取视。具體可看官方文檔
https://docs.spring.io/spring-amqp/docs/2.2.9.RELEASE/reference/html/#async-consumer
客戶端源碼剖析
官方Java客戶端提供了DefaultConsumer和QueueingConsumer兩種類來(lái)從queue中獲取消息硝皂。 其中QueueingConsumer內(nèi)部維護(hù)了一個(gè)阻塞隊(duì)列BlockingQueue,此隊(duì)列就是用來(lái)緩存從queue獲取的message作谭。當(dāng)調(diào)用 channel.basicConsume
后稽物,broker就會(huì)不斷往consumer投遞message,直到prefetch條折欠。
初始化的時(shí)候贝或,如果不指定BlockingQueue的長(zhǎng)度吼过,默認(rèn)值會(huì)設(shè)為Integer.MAX_VALUE,所以這就解釋了文章開(kāi)頭所說(shuō)的如果不設(shè)置Qos的話為什么會(huì)有可能導(dǎo)致OOM咪奖,因?yàn)榇藭r(shí)BlockingQueue會(huì)不斷膨脹盗忱,消耗內(nèi)存。所以設(shè)置了prefetch后羊赵,建議BlockingQueue的長(zhǎng)度(capacity)也初始化為prefetch趟佃。
另外需要注意的是,在調(diào)用channel.basicConsume
之后昧捷,consumer是通過(guò)異步方式來(lái)抓取message的闲昭,通過(guò)debug可以發(fā)現(xiàn)BlockingQueue的size是在異步地不斷增長(zhǎng)直到prefetch。而客戶端代碼可以通過(guò)consumer.nextDelivery()或consumer.nextDelivery(long timeout)方法來(lái)獲取message靡挥,其對(duì)應(yīng)的就是BlockingQueue的take()和poll(long timeout)方法序矩。
再來(lái)看看spring-amqp的comsumer,大致也一樣跋破。核心類BlockingQueueConsumer
public class BlockingQueueConsumer {
private final BlockingQueue<Delivery> queue;
//some code
...
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
Map<String, Object> consumerArgs, boolean exclusive, String... queues) {
//... some code
this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
}
BlockingQueueConsumer的構(gòu)造函數(shù)清楚說(shuō)明了每個(gè)消費(fèi)者內(nèi)部的隊(duì)列大小就是prefetch的大小簸淀。
吞吐量、延遲
prefetch并不是說(shuō)設(shè)置得越大越好毒返。過(guò)大可能導(dǎo)致consumer處理不過(guò)來(lái)租幕,一直在本地緩存的BlockingQueue里呆太久,這樣消息在客戶端的延遲就大大增加饿悬;而對(duì)于多個(gè)consumer的情況令蛉,則會(huì)分配不均勻,導(dǎo)致有些consumer一直在忙狡恬,有些則非常空閑蝎宇。
然而設(shè)置的過(guò)小弟劲,又會(huì)令到consumer不能充分工作,因?yàn)槲覀兛傁胨?00%的時(shí)間都是處于繁忙狀態(tài)姥芥,而這時(shí)可能會(huì)在處理完一條消息后兔乞,BlockingQueue為空,因?yàn)樾碌南⑦€未來(lái)得及到達(dá)凉唐,所以consumer就處于空閑狀態(tài)了庸追。
prefetch應(yīng)該設(shè)置多大,具體可參考這篇文章
Some queuing theory: throughput, latency and bandwidth
里面詳細(xì)論述吞吐量與prefetch之間的關(guān)系台囱。prefetch的設(shè)置與以下幾點(diǎn)有關(guān):
- 客戶端服務(wù)端之間網(wǎng)絡(luò)傳輸時(shí)間
- consumer消耗一條消息所執(zhí)行的業(yè)務(wù)邏輯的耗時(shí)
- 網(wǎng)絡(luò)狀況
【完】
如有紕漏淡溯,歡迎指出
參考資料:
RabbitMQ QOS vs. Competing Consumers