場景
在使用RabbitMQ時,默認預取消息隊列中的全部數據緩存在本地崇堵,導致只有一個進程能獲取資源型诚,而其他進程處于閑置狀態(tài)。而持有資源的進程消費能力有限時鸳劳,會導致消息隊列積壓俺驶。
說明
消息的輪詢分配機制和盡可能快速推送消息的機制給實際使用帶來困難。實際情況下棍辕,每個消費者處理消息的能力暮现、每個消息處理所需時間可能都是不同的,若只是機械化地順次分配楚昭,可能造成一個消費者由于處理的消息的業(yè)務復雜栖袋、處理能力低而積壓消息,另一個消費者早早處理完所有的消息抚太,處于空閑狀態(tài)塘幅,造成系統(tǒng)的處理能力的浪費。且無法加入新的消費者以提高系統(tǒng)的處理能力尿贫。
希望達到的效果是:每個消費者都根據自身處理能力合理分配消息處理任務电媳,既無擠壓也無空閑,新加入的消費者也能分擔消息處理任務庆亡,使系統(tǒng)的處理能力能夠平行擴展匾乓。
RabbitMQ
客戶端可通過Channel
類的basicQos(int prefetchCount)
設置消費者的預取數目,即消費者最大的未確認消息的數目又谋。
假設prefetchCount=10
拼缝,有兩個消費者,兩個消費者依次從隊列中抓取10條消息緩存本地彰亥,若此時有新的消息到達隊列咧七,先判斷信道中未確認的消息是否大于或等于20條,若是任斋,則不向信道中投遞消息继阻,當信道中未確認消息數小于20條后,信道中哪個消費者未確認消息小于10條,就將消息投遞給哪個消費者瘟檩。
channel.basicQos()
中設置的預取數量多少合適犬第,是一個頗有講究的問題。我們希望充分利用消費者的處理能力芒帕,因此不宜設置過小歉嗓,否則在消費者處理消息后,RabbitMQ
收到確認消息后才會投遞新的消息背蟆,導致此期間消費者處于空閑狀態(tài)鉴分,浪費消費者的處理能力;但設置過大带膀,又可能使消息積壓在消費者的緩存里志珍,我們希望對于來不及處理的消息,應保留在隊列中垛叨,便于加入新的消費者或空閑出來的消費者分擔消息處理任務伦糯。
? RabbitMQ
官網的一篇文章詳細討論了預取數量的設置問題:
? https://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/
文章大致內容如下:
假設從RabbitMQ
服務端隊列取消息、傳輸消息到消費者耗時為50ms嗽元,消費者消費消息耗時4ms敛纲,消費者傳輸確認消息到服務端耗時為50ms。若網絡狀況剂癌、消費者處理速度穩(wěn)定淤翔,則預取數量的最優(yōu)數值為:(50 + 4 + 50)/4=26個。
最初服務端將向客戶端發(fā)送26
條消息佩谷,并緩存在客戶端本地旁壮,當消費者處理好第一個消息后,向服務端發(fā)送確認消息并取本地緩存的第二個消息谐檀,確認消息由客戶端傳送到服務端耗時50ms
抡谐,服務端收到確認后發(fā)送新的消息經過50ms
又到達了客戶端,而余下的25
個消息被消費耗時為25×4=100ms
桐猬,所以當新的消息達到時麦撵,第一輪的26
個消息恰好全部處理完。依次類推课幕,之后厦坛,每當處理完一個舊有的消息時五垮,恰好會到達一個新的消息乍惊。既不會發(fā)生消息積壓,消費者也不會空閑放仗。
? 但實際情況是润绎,網絡的傳輸狀況、消費者處理消息的速度都不會是恒定的,會時快時慢莉撇,造成消息積壓或消費者空閑呢蛤,這就要求預取數量要與網絡和消費者的狀況實時改變。
總結
說白了棍郎,預取數據就是為了控制消費者在獲取/發(fā)送數據與業(yè)務邏輯之間能夠更好的銜接其障,避免某個消息積壓或過于空閑的情況出現。