分布式消息中間件
RabbitMQ是用Erlang語(yǔ)言編寫的分布式消息中間件佩伤,常常用在大型網(wǎng)站中作為消息隊(duì)列來(lái)使用,主要目的是各個(gè)子系統(tǒng)之間的解耦和異步處理率翅。消息中間件的基本模型是典型的生產(chǎn)者-消費(fèi)者模型,生產(chǎn)者發(fā)送消息到消息隊(duì)列,消費(fèi)者監(jiān)聽(tīng)消息隊(duì)列念秧,收到消息后消費(fèi)處理。
在使用RabbitMQ做消息分發(fā)時(shí)布疼,主要有三個(gè)概念要注意:Exchange摊趾,RoutingKey,Queue游两。
Exchange可以理解為交換器砾层,RoutingKey可以理解為路由,Queue作為真實(shí)存儲(chǔ)消息的隊(duì)列和某個(gè)Exchange綁定贱案,具體如何路由到感興趣的Queue則由Exchange的三種模式?jīng)Q定:
- fanout
- topic
- direct
Exchange為fanout時(shí)肛炮,生產(chǎn)者往此Exchange發(fā)送的消息會(huì)發(fā)給每個(gè)和其綁定的Queue,此時(shí)RoutingKey并不起作用;Exchange為topic時(shí)侨糟,生產(chǎn)者可以指定一個(gè)支持通配符的RoutingKey(如demo.*)發(fā)向此Exchange碍扔,凡是Exchange上RoutingKey滿足此通配符的Queue就會(huì)收到消息;direct類型的Exchange是最直接最簡(jiǎn)單的秕重,生產(chǎn)者指定Exchange和RoutingKey不同,然后往其發(fā)送消息,消息只能被綁定的滿足RoutingKey的Queue接受消息溶耘。(通常如果不指定RoutingKey的具體名字二拐,那么默認(rèn)的名字其實(shí)是Queue的名字)
Concurrency與Prefetch
在通常的使用中(Java項(xiàng)目),我們一般會(huì)結(jié)合spring-amqp框架來(lái)使用RabbitMQ汰具,spring-amqp底層調(diào)用RabbitMQ的java client來(lái)和Broker交互卓鹿,比如我們會(huì)用如下配置來(lái)建立RabbitMQ的連接池、聲明Queue以及指明監(jiān)聽(tīng)者的監(jiān)聽(tīng)行為:
<rabbit:connection-factory id="connectionFactory" />
<!-- template非必須留荔,主要用于生產(chǎn)者發(fā)送消息-->
<rabbit:template id="template" connection-factory="connectionFactory" />
<rabbit:queue name="remoting.queue" />
<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3">
<rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>
listener-container可以設(shè)置消費(fèi)者在監(jiān)聽(tīng)Queue的時(shí)候的各種參數(shù)吟孙,其中concurrency和prefetch是本篇文章比較關(guān)心的兩個(gè)參數(shù),以下是spring-amqp文檔的解釋:
prefetchCount(prefetch)
The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode
is NONE. This will be increased, if necessary, to match the txSize
concurrentConsumers(concurrency)
The number of concurrent consumers to initially start for each listener.
簡(jiǎn)單解釋下就是concurrency設(shè)置的是對(duì)每個(gè)listener在初始化的時(shí)候設(shè)置的并發(fā)消費(fèi)者的個(gè)數(shù)聚蝶,prefetch是每次從一次性從broker里面取的待消費(fèi)的消息的個(gè)數(shù)杰妓,上面的配置在監(jiān)控后臺(tái)看到的效果如下:
圖中可以看出有兩個(gè)消費(fèi)者同時(shí)監(jiān)聽(tīng)Queue,但是注意這里的消息只有被一個(gè)消費(fèi)者消費(fèi)掉就會(huì)自動(dòng)ack碘勉,另外一個(gè)消費(fèi)者就不會(huì)再獲取到此消息巷挥,Prefetch Count為配置設(shè)置的值3,意味著每個(gè)消費(fèi)者每次會(huì)預(yù)取3個(gè)消息準(zhǔn)備消費(fèi)验靡。每個(gè)消費(fèi)者對(duì)應(yīng)的listener有個(gè)Exclusive參數(shù)倍宾,默認(rèn)為false, 如果設(shè)置為true,concurrency就必須設(shè)置為1胜嗓,即只能單個(gè)消費(fèi)者消費(fèi)隊(duì)列里的消息高职,適用于必須嚴(yán)格執(zhí)行消息隊(duì)列的消費(fèi)順序(先進(jìn)先出)。
源碼剖析
這里concurrency的實(shí)現(xiàn)方式不看源碼也能猜到辞州,肯定是用多線程的方式來(lái)實(shí)現(xiàn)的怔锌,此時(shí)同一進(jìn)程下打開的本地端口都是56278.下面看看listener-contaner對(duì)應(yīng)的org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer的源碼:
protected int initializeConsumers() {
int count = 0;
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
this.cancellationLock.reset();
this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers);
for (int i = 0; i < this.concurrentConsumers; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.put(consumer, true);
count++;
}
}
}
return count;
}
container啟動(dòng)的時(shí)候會(huì)根據(jù)設(shè)置的concurrency的值(同時(shí)不超過(guò)最大值)創(chuàng)建n個(gè)BlockingQueueConsumer。
protected void doStart() throws Exception {
//some code
synchronized (this.consumersMonitor) {
int newConsumers = initializeConsumers();
//some code
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers.keySet()) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
this.taskExecutor.execute(processor);
}
//some code
}
}
在doStart()方法中調(diào)用initializeConsumers來(lái)初始化所有的消費(fèi)者变过,AsyncMessageProcessingConsumer作為真實(shí)的處理器包裝了BlockingQueueConsumer埃元,而AsyncMessageProcessingConsumer其實(shí)實(shí)現(xiàn)了Runnable接口,由this.taskExecutor.execute(processor)來(lái)啟動(dòng)消費(fèi)者線程媚狰。
private final class AsyncMessageProcessingConsumer implements Runnable {
private final BlockingQueueConsumer consumer;
private final CountDownLatch start;
private volatile FatalListenerStartupException startupException;
private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
this.consumer = consumer;
this.start = new CountDownLatch(1);
}
//some code
@Override
public void run() {
//some code
}
}
那么prefetch的值意味著什么呢岛杀?其實(shí)從名字上大致能看出,BlockingQueueConsumer內(nèi)部應(yīng)該維護(hù)了一個(gè)阻塞隊(duì)列BlockingQueue崭孤,prefetch應(yīng)該是這個(gè)阻塞隊(duì)列的長(zhǎng)度类嗤,看下BlockingQueueConsumer內(nèi)部有個(gè)queue衫生,這個(gè)queue不是對(duì)應(yīng)RabbitMQ的隊(duì)列,而是Consumer自己維護(hù)的內(nèi)存級(jí)別的隊(duì)列土浸,用來(lái)暫時(shí)存儲(chǔ)從RabbitMQ中取出來(lái)的消息:
private final BlockingQueue<Delivery> queue;
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
Map<String, Object> consumerArgs, boolean exclusive, String... queues) {
//some code
this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
}
BlockingQueueConsumer的構(gòu)造函數(shù)清楚說(shuō)明了每個(gè)消費(fèi)者內(nèi)部的隊(duì)列大小就是prefetch的大小。
業(yè)務(wù)問(wèn)題
前面說(shuō)過(guò)彭羹,設(shè)置并發(fā)的時(shí)候黄伊,要考慮具體的業(yè)務(wù)場(chǎng)景,對(duì)那種對(duì)消息的順序有苛刻要求的場(chǎng)景不適合并發(fā)消費(fèi)派殷,而對(duì)于其他場(chǎng)景还最,比如用戶注冊(cè)后給用戶發(fā)個(gè)提示短信,是不太在意哪個(gè)消息先被消費(fèi)毡惜,哪個(gè)消息后被消費(fèi)拓轻,因?yàn)槊總€(gè)消息是相對(duì)獨(dú)立的,后注冊(cè)的用戶先收到短信也并沒(méi)有太大影響经伙。
設(shè)置并發(fā)消費(fèi)除了能提高消費(fèi)的速度扶叉,還有另外一個(gè)好處:當(dāng)某個(gè)消費(fèi)者長(zhǎng)期阻塞,此時(shí)在當(dāng)前消費(fèi)者內(nèi)部的BlockingQueue的消息也會(huì)被一直阻塞帕膜,但是新來(lái)的消息仍然可以投遞給其他消費(fèi)者消費(fèi)枣氧,這種情況頂多會(huì)導(dǎo)致prefetch個(gè)數(shù)目的消息消費(fèi)有問(wèn)題,而不至于單消費(fèi)者情況下整個(gè)RabbitMQ的隊(duì)列會(huì)因?yàn)橐粋€(gè)消息有問(wèn)題而全部堵死垮刹。所有在合適的業(yè)務(wù)場(chǎng)景下达吞,需要合理設(shè)置concurrency和prefetch值。