RabbitMQ消費(fèi)者的幾個(gè)參數(shù)

分布式消息中間件

RabbitMQ是用Erlang語(yǔ)言編寫的分布式消息中間件佩伤,常常用在大型網(wǎng)站中作為消息隊(duì)列來(lái)使用,主要目的是各個(gè)子系統(tǒng)之間的解耦和異步處理率翅。消息中間件的基本模型是典型的生產(chǎn)者-消費(fèi)者模型,生產(chǎn)者發(fā)送消息到消息隊(duì)列,消費(fèi)者監(jiān)聽(tīng)消息隊(duì)列念秧,收到消息后消費(fèi)處理。

在使用RabbitMQ做消息分發(fā)時(shí)布疼,主要有三個(gè)概念要注意:Exchange摊趾,RoutingKey,Queue游两。

Exchange可以理解為交換器砾层,RoutingKey可以理解為路由,Queue作為真實(shí)存儲(chǔ)消息的隊(duì)列和某個(gè)Exchange綁定贱案,具體如何路由到感興趣的Queue則由Exchange的三種模式?jīng)Q定:

  • fanout
  • topic
  • direct

Exchange為fanout時(shí)肛炮,生產(chǎn)者往此Exchange發(fā)送的消息會(huì)發(fā)給每個(gè)和其綁定的Queue,此時(shí)RoutingKey并不起作用;Exchange為topic時(shí)侨糟,生產(chǎn)者可以指定一個(gè)支持通配符的RoutingKey(如demo.*)發(fā)向此Exchange碍扔,凡是Exchange上RoutingKey滿足此通配符的Queue就會(huì)收到消息;direct類型的Exchange是最直接最簡(jiǎn)單的秕重,生產(chǎn)者指定Exchange和RoutingKey不同,然后往其發(fā)送消息,消息只能被綁定的滿足RoutingKey的Queue接受消息溶耘。(通常如果不指定RoutingKey的具體名字二拐,那么默認(rèn)的名字其實(shí)是Queue的名字)

Concurrency與Prefetch

在通常的使用中(Java項(xiàng)目),我們一般會(huì)結(jié)合spring-amqp框架來(lái)使用RabbitMQ汰具,spring-amqp底層調(diào)用RabbitMQ的java client來(lái)和Broker交互卓鹿,比如我們會(huì)用如下配置來(lái)建立RabbitMQ的連接池、聲明Queue以及指明監(jiān)聽(tīng)者的監(jiān)聽(tīng)行為:

<rabbit:connection-factory id="connectionFactory" />

<!-- template非必須留荔,主要用于生產(chǎn)者發(fā)送消息-->
<rabbit:template id="template" connection-factory="connectionFactory" />

<rabbit:queue name="remoting.queue" />
<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3">
    <rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>

listener-container可以設(shè)置消費(fèi)者在監(jiān)聽(tīng)Queue的時(shí)候的各種參數(shù)吟孙,其中concurrency和prefetch是本篇文章比較關(guān)心的兩個(gè)參數(shù),以下是spring-amqp文檔的解釋:

prefetchCount(prefetch)
The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode
is NONE. This will be increased, if necessary, to match the txSize

concurrentConsumers(concurrency)

The number of concurrent consumers to initially start for each listener.

簡(jiǎn)單解釋下就是concurrency設(shè)置的是對(duì)每個(gè)listener在初始化的時(shí)候設(shè)置的并發(fā)消費(fèi)者的個(gè)數(shù)聚蝶,prefetch是每次從一次性從broker里面取的待消費(fèi)的消息的個(gè)數(shù)杰妓,上面的配置在監(jiān)控后臺(tái)看到的效果如下:


圖中可以看出有兩個(gè)消費(fèi)者同時(shí)監(jiān)聽(tīng)Queue,但是注意這里的消息只有被一個(gè)消費(fèi)者消費(fèi)掉就會(huì)自動(dòng)ack碘勉,另外一個(gè)消費(fèi)者就不會(huì)再獲取到此消息巷挥,Prefetch Count為配置設(shè)置的值3,意味著每個(gè)消費(fèi)者每次會(huì)預(yù)取3個(gè)消息準(zhǔn)備消費(fèi)验靡。每個(gè)消費(fèi)者對(duì)應(yīng)的listener有個(gè)Exclusive參數(shù)倍宾,默認(rèn)為false, 如果設(shè)置為true,concurrency就必須設(shè)置為1胜嗓,即只能單個(gè)消費(fèi)者消費(fèi)隊(duì)列里的消息高职,適用于必須嚴(yán)格執(zhí)行消息隊(duì)列的消費(fèi)順序(先進(jìn)先出)。

源碼剖析

這里concurrency的實(shí)現(xiàn)方式不看源碼也能猜到辞州,肯定是用多線程的方式來(lái)實(shí)現(xiàn)的怔锌,此時(shí)同一進(jìn)程下打開的本地端口都是56278.下面看看listener-contaner對(duì)應(yīng)的org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer的源碼:

protected int initializeConsumers() {
  int count = 0;
  synchronized (this.consumersMonitor) {
    if (this.consumers == null) {
        this.cancellationLock.reset();
        this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers);
        for (int i = 0; i < this.concurrentConsumers; i++) {
            BlockingQueueConsumer consumer = createBlockingQueueConsumer();
            this.consumers.put(consumer, true);
            count++;
        }
    }
  }
  return count;
}

container啟動(dòng)的時(shí)候會(huì)根據(jù)設(shè)置的concurrency的值(同時(shí)不超過(guò)最大值)創(chuàng)建n個(gè)BlockingQueueConsumer。

protected void doStart() throws Exception {
  //some code
  synchronized (this.consumersMonitor) {
    int newConsumers = initializeConsumers();

    //some code
    Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
    for (BlockingQueueConsumer consumer : this.consumers.keySet()) {
        AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
        processors.add(processor);
        this.taskExecutor.execute(processor);
    }
    //some code
  }
}

在doStart()方法中調(diào)用initializeConsumers來(lái)初始化所有的消費(fèi)者变过,AsyncMessageProcessingConsumer作為真實(shí)的處理器包裝了BlockingQueueConsumer埃元,而AsyncMessageProcessingConsumer其實(shí)實(shí)現(xiàn)了Runnable接口,由this.taskExecutor.execute(processor)來(lái)啟動(dòng)消費(fèi)者線程媚狰。

private final class AsyncMessageProcessingConsumer implements Runnable {
  private final BlockingQueueConsumer consumer;
  private final CountDownLatch start;
  private volatile FatalListenerStartupException startupException;

  private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
    this.consumer = consumer;
    this.start = new CountDownLatch(1);
  }

  //some code
  @Override
  public void run() {
     //some code
  }
}

那么prefetch的值意味著什么呢岛杀?其實(shí)從名字上大致能看出,BlockingQueueConsumer內(nèi)部應(yīng)該維護(hù)了一個(gè)阻塞隊(duì)列BlockingQueue崭孤,prefetch應(yīng)該是這個(gè)阻塞隊(duì)列的長(zhǎng)度类嗤,看下BlockingQueueConsumer內(nèi)部有個(gè)queue衫生,這個(gè)queue不是對(duì)應(yīng)RabbitMQ的隊(duì)列,而是Consumer自己維護(hù)的內(nèi)存級(jí)別的隊(duì)列土浸,用來(lái)暫時(shí)存儲(chǔ)從RabbitMQ中取出來(lái)的消息:

private final BlockingQueue<Delivery> queue;

public BlockingQueueConsumer(ConnectionFactory connectionFactory,
            MessagePropertiesConverter messagePropertiesConverter,
            ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
            boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
            Map<String, Object> consumerArgs, boolean exclusive, String... queues) {
  //some code
  this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
}

BlockingQueueConsumer的構(gòu)造函數(shù)清楚說(shuō)明了每個(gè)消費(fèi)者內(nèi)部的隊(duì)列大小就是prefetch的大小。

業(yè)務(wù)問(wèn)題

前面說(shuō)過(guò)彭羹,設(shè)置并發(fā)的時(shí)候黄伊,要考慮具體的業(yè)務(wù)場(chǎng)景,對(duì)那種對(duì)消息的順序有苛刻要求的場(chǎng)景不適合并發(fā)消費(fèi)派殷,而對(duì)于其他場(chǎng)景还最,比如用戶注冊(cè)后給用戶發(fā)個(gè)提示短信,是不太在意哪個(gè)消息先被消費(fèi)毡惜,哪個(gè)消息后被消費(fèi)拓轻,因?yàn)槊總€(gè)消息是相對(duì)獨(dú)立的,后注冊(cè)的用戶先收到短信也并沒(méi)有太大影響经伙。

設(shè)置并發(fā)消費(fèi)除了能提高消費(fèi)的速度扶叉,還有另外一個(gè)好處:當(dāng)某個(gè)消費(fèi)者長(zhǎng)期阻塞,此時(shí)在當(dāng)前消費(fèi)者內(nèi)部的BlockingQueue的消息也會(huì)被一直阻塞帕膜,但是新來(lái)的消息仍然可以投遞給其他消費(fèi)者消費(fèi)枣氧,這種情況頂多會(huì)導(dǎo)致prefetch個(gè)數(shù)目的消息消費(fèi)有問(wèn)題,而不至于單消費(fèi)者情況下整個(gè)RabbitMQ的隊(duì)列會(huì)因?yàn)橐粋€(gè)消息有問(wèn)題而全部堵死垮刹。所有在合適的業(yè)務(wù)場(chǎng)景下达吞,需要合理設(shè)置concurrency和prefetch值。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末荒典,一起剝皮案震驚了整個(gè)濱河市酪劫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌寺董,老刑警劉巖覆糟,帶你破解...
    沈念sama閱讀 206,723評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異螃征,居然都是意外死亡搪桂,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門盯滚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)踢械,“玉大人,你說(shuō)我怎么就攤上這事魄藕∧诹校” “怎么了?”我有些...
    開封第一講書人閱讀 152,998評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵背率,是天一觀的道長(zhǎng)话瞧。 經(jīng)常有香客問(wèn)我嫩与,道長(zhǎng),這世上最難降的妖魔是什么交排? 我笑而不...
    開封第一講書人閱讀 55,323評(píng)論 1 279
  • 正文 為了忘掉前任划滋,我火速辦了婚禮,結(jié)果婚禮上埃篓,老公的妹妹穿的比我還像新娘处坪。我一直安慰自己,他們只是感情好架专,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評(píng)論 5 374
  • 文/花漫 我一把揭開白布同窘。 她就那樣靜靜地躺著,像睡著了一般部脚。 火紅的嫁衣襯著肌膚如雪想邦。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,079評(píng)論 1 285
  • 那天委刘,我揣著相機(jī)與錄音丧没,去河邊找鬼。 笑死钱雷,一個(gè)胖子當(dāng)著我的面吹牛骂铁,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播罩抗,決...
    沈念sama閱讀 38,389評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼拉庵,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了套蒂?” 一聲冷哼從身側(cè)響起钞支,我...
    開封第一講書人閱讀 37,019評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎操刀,沒(méi)想到半個(gè)月后烁挟,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,519評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡骨坑,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評(píng)論 2 325
  • 正文 我和宋清朗相戀三年撼嗓,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片欢唾。...
    茶點(diǎn)故事閱讀 38,100評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡且警,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出礁遣,到底是詐尸還是另有隱情斑芜,我是刑警寧澤,帶...
    沈念sama閱讀 33,738評(píng)論 4 324
  • 正文 年R本政府宣布祟霍,位于F島的核電站杏头,受9級(jí)特大地震影響盈包,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜醇王,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評(píng)論 3 307
  • 文/蒙蒙 一呢燥、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧寓娩,春花似錦疮茄、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)徙邻。三九已至排嫌,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間缰犁,已是汗流浹背淳地。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留帅容,地道東北人颇象。 一個(gè)月前我還...
    沈念sama閱讀 45,547評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像并徘,于是被迫代替她去往敵國(guó)和親遣钳。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容