RabbitMQ之Qos prefetch

實(shí)際使用RabbitMQ過(guò)程中,如果完全不配置QoS迁央,這樣Rabbit會(huì)盡可能快速地
發(fā)送隊(duì)列中的所有消息到client端币励。因?yàn)閏onsumer在本地緩存所有的message阳仔,
從而極有可能導(dǎo)致OOM或者導(dǎo)致服務(wù)器內(nèi)存不足影響其它進(jìn)程的正常運(yùn)行攒霹。所以我們
需要通過(guò)設(shè)置Qos的prefetch count來(lái)控制consumer的流量导俘。同時(shí)設(shè)置得當(dāng)也會(huì)提高consumer的吞吐量。

prefetch與消息投遞

prefetch允許為每個(gè)consumer指定最大的unacked messages數(shù)目剔蹋。簡(jiǎn)單來(lái)說(shuō)就是用來(lái)指定一個(gè)consumer一次可以從Rabbit中獲取多少條message并緩存在client中(RabbitMQ提供的各種語(yǔ)言的client library)旅薄。一旦緩沖區(qū)滿了,Rabbit將會(huì)停止投遞新的message到該consumer中直到它發(fā)出ack泣崩。

假設(shè)prefetch值設(shè)為10少梁,共有兩個(gè)consumer。意味著每個(gè)consumer每次會(huì)從queue中預(yù)抓取 10 條消息到本地緩存著等待消費(fèi)矫付。同時(shí)該channel的unacked數(shù)變?yōu)?0凯沪。而Rabbit投遞的順序是,先為consumer1投遞滿10個(gè)message买优,再往consumer2投遞10個(gè)message妨马。如果這時(shí)有新message需要投遞,先判斷channel的unacked數(shù)是否等于20杀赢,如果是則不會(huì)將消息投遞到consumer中烘跺,message繼續(xù)呆在queue中。之后其中consumer對(duì)一條消息進(jìn)行ack脂崔,unacked此時(shí)等于19滤淳,Rabbit就判斷哪個(gè)consumer的unacked少于10,就投遞到哪個(gè)consumer中砌左。

總的來(lái)說(shuō)脖咐,consumer負(fù)責(zé)不斷處理消息,不斷ack汇歹,然后只要unacked數(shù)少于prefetch * consumer數(shù)目屁擅,broker就不斷將消息投遞過(guò)去。

如何設(shè)置

官方提供的java client可以通過(guò)channel來(lái)設(shè)置:

channel = connection.createChannel();
channel.basicQos(prefetch);

spring-amqp的話可通過(guò)配置文件來(lái)配置

<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3">
    <rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>

這里需要注意的是产弹,spring-amqp中的prefetch默認(rèn)值是250派歌。

1.0 的spring-amqp默認(rèn)值是1; 2.0版本的為250取视。具體可看官方文檔
https://docs.spring.io/spring-amqp/docs/2.2.9.RELEASE/reference/html/#async-consumer

客戶端源碼剖析

官方Java客戶端提供了DefaultConsumer和QueueingConsumer兩種類來(lái)從queue中獲取消息硝皂。 其中QueueingConsumer內(nèi)部維護(hù)了一個(gè)阻塞隊(duì)列BlockingQueue,此隊(duì)列就是用來(lái)緩存從queue獲取的message作谭。當(dāng)調(diào)用 channel.basicConsume后稽物,broker就會(huì)不斷往consumer投遞message,直到prefetch條折欠。

初始化的時(shí)候贝或,如果不指定BlockingQueue的長(zhǎng)度吼过,默認(rèn)值會(huì)設(shè)為Integer.MAX_VALUE,所以這就解釋了文章開(kāi)頭所說(shuō)的如果不設(shè)置Qos的話為什么會(huì)有可能導(dǎo)致OOM咪奖,因?yàn)榇藭r(shí)BlockingQueue會(huì)不斷膨脹盗忱,消耗內(nèi)存。所以設(shè)置了prefetch后羊赵,建議BlockingQueue的長(zhǎng)度(capacity)也初始化為prefetch趟佃。

另外需要注意的是,在調(diào)用channel.basicConsume之后昧捷,consumer是通過(guò)異步方式來(lái)抓取message的闲昭,通過(guò)debug可以發(fā)現(xiàn)BlockingQueue的size是在異步地不斷增長(zhǎng)直到prefetch。而客戶端代碼可以通過(guò)consumer.nextDelivery()或consumer.nextDelivery(long timeout)方法來(lái)獲取message靡挥,其對(duì)應(yīng)的就是BlockingQueue的take()和poll(long timeout)方法序矩。

再來(lái)看看spring-amqp的comsumer,大致也一樣跋破。核心類BlockingQueueConsumer

public class BlockingQueueConsumer {
    
    private final BlockingQueue<Delivery> queue;

    //some code
    ...

public BlockingQueueConsumer(ConnectionFactory connectionFactory,
            MessagePropertiesConverter messagePropertiesConverter,
            ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
            boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
            Map<String, Object> consumerArgs, boolean exclusive, String... queues) {

        //... some code

        this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
    }


BlockingQueueConsumer的構(gòu)造函數(shù)清楚說(shuō)明了每個(gè)消費(fèi)者內(nèi)部的隊(duì)列大小就是prefetch的大小簸淀。

吞吐量、延遲

prefetch并不是說(shuō)設(shè)置得越大越好毒返。過(guò)大可能導(dǎo)致consumer處理不過(guò)來(lái)租幕,一直在本地緩存的BlockingQueue里呆太久,這樣消息在客戶端的延遲就大大增加饿悬;而對(duì)于多個(gè)consumer的情況令蛉,則會(huì)分配不均勻,導(dǎo)致有些consumer一直在忙狡恬,有些則非常空閑蝎宇。

然而設(shè)置的過(guò)小弟劲,又會(huì)令到consumer不能充分工作,因?yàn)槲覀兛傁胨?00%的時(shí)間都是處于繁忙狀態(tài)姥芥,而這時(shí)可能會(huì)在處理完一條消息后兔乞,BlockingQueue為空,因?yàn)樾碌南⑦€未來(lái)得及到達(dá)凉唐,所以consumer就處于空閑狀態(tài)了庸追。

prefetch應(yīng)該設(shè)置多大,具體可參考這篇文章

Some queuing theory: throughput, latency and bandwidth

里面詳細(xì)論述吞吐量與prefetch之間的關(guān)系台囱。prefetch的設(shè)置與以下幾點(diǎn)有關(guān):

  1. 客戶端服務(wù)端之間網(wǎng)絡(luò)傳輸時(shí)間
  2. consumer消耗一條消息所執(zhí)行的業(yè)務(wù)邏輯的耗時(shí)
  3. 網(wǎng)絡(luò)狀況

【完】

如有紕漏淡溯,歡迎指出

參考資料:

RabbitMQ QOS vs. Competing Consumers

RabbitMQ消費(fèi)者的幾個(gè)參數(shù)

rabbitmq——prefetch count

AMQP: acknowledgement and prefetching

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市簿训,隨后出現(xiàn)的幾起案子咱娶,更是在濱河造成了極大的恐慌米间,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件膘侮,死亡現(xiàn)場(chǎng)離奇詭異屈糊,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)琼了,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門(mén)逻锐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人雕薪,你說(shuō)我怎么就攤上這事昧诱。” “怎么了蹦哼?”我有些...
    開(kāi)封第一講書(shū)人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵鳄哭,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我纲熏,道長(zhǎng)妆丘,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任局劲,我火速辦了婚禮勺拣,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鱼填。我一直安慰自己药有,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布苹丸。 她就那樣靜靜地躺著愤惰,像睡著了一般。 火紅的嫁衣襯著肌膚如雪赘理。 梳的紋絲不亂的頭發(fā)上宦言,一...
    開(kāi)封第一講書(shū)人閱讀 52,158評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音商模,去河邊找鬼奠旺。 笑死,一個(gè)胖子當(dāng)著我的面吹牛施流,可吹牛的內(nèi)容都是我干的响疚。 我是一名探鬼主播,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼瞪醋,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼忿晕!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起趟章,我...
    開(kāi)封第一講書(shū)人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤杏糙,失蹤者是張志新(化名)和其女友劉穎慎王,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體宏侍,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡赖淤,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了谅河。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片咱旱。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖绷耍,靈堂內(nèi)的尸體忽然破棺而出吐限,到底是詐尸還是另有隱情,我是刑警寧澤褂始,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布诸典,位于F島的核電站,受9級(jí)特大地震影響崎苗,放射性物質(zhì)發(fā)生泄漏狐粱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一胆数、第九天 我趴在偏房一處隱蔽的房頂上張望肌蜻。 院中可真熱鬧,春花似錦必尼、人聲如沸蒋搜。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)豆挽。三九已至,卻和暖如春券盅,著一層夾襖步出監(jiān)牢的瞬間祷杈,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工渗饮, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人宿刮。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓互站,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親僵缺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子胡桃,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)磕潮,斷路器翠胰,智...
    卡卡羅2017閱讀 134,693評(píng)論 18 139
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器容贝。支持消息的持久化、事務(wù)之景、擁塞控...
    jiangmo閱讀 10,367評(píng)論 2 34
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開(kāi)發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,096評(píng)論 3 51
  • 關(guān)于消息隊(duì)列斤富,從前年開(kāi)始斷斷續(xù)續(xù)看了些資料,想寫(xiě)很久了锻狗,但一直沒(méi)騰出空满力,近來(lái)分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 584,869評(píng)論 51 786
  • 為了一些初學(xué)習(xí)者更好理解我就從簡(jiǎn)單的解釋一下Rabbitmq的原理吧?轻纪,首先你可以這樣想RabbitMq就是一個(gè)隊(duì)...
    螃蟹和駱駝先生Yvan閱讀 7,408評(píng)論 6 4