RabbitMQ的學(xué)習(xí)筆記
關(guān)于RabbitMQ的幾個(gè)角色如下:
關(guān)于名詞的通俗解析:
首先我們肯定知道RabbitMQ就是消息隊(duì)列的一種實(shí)現(xiàn)掩驱,那么圍繞這個(gè)阶祭,我們就可以思考一個(gè)消息隊(duì)列到底需要什么,當(dāng)然是需要隊(duì)列蛙埂,那么這個(gè)隊(duì)列就是Queue,那么其他的所有名詞都是圍繞這個(gè)Queue來(lái)拓展的。
首先抽诉,想要讓隊(duì)列不在本地運(yùn)行,而在網(wǎng)絡(luò)中運(yùn)行吐绵,肯定會(huì)有連接這個(gè)概念迹淌,所以就會(huì)有Connection,我們發(fā)一條消息連接一次拦赠,這樣很顯然是浪費(fèi)資源的巍沙,建立連接的過(guò)程也很耗時(shí),所以我們就會(huì)做一個(gè)東西讓他來(lái)管理連接荷鼠,當(dāng)我用的時(shí)候句携,直接從里邊拿出來(lái)已經(jīng)建立好的連接發(fā)信息,那么ConnectionFactory應(yīng)運(yùn)而生允乐。
接下來(lái)矮嫉,當(dāng)程序開(kāi)發(fā)時(shí)削咆,可能不止用到一個(gè)隊(duì)列,可能有訂單的隊(duì)列蠢笋、消息的隊(duì)列拨齐、任務(wù)的隊(duì)列等等,那么就需要給不同的queue發(fā)信息昨寞,那么和每一個(gè)隊(duì)列連接的這個(gè)概念瞻惋,就叫Channel
再往下來(lái),當(dāng)我們開(kāi)發(fā)的時(shí)候還有時(shí)候會(huì)用到這樣一種功能援岩,就是當(dāng)我發(fā)送一條消息歼狼,需要讓幾個(gè)queue都收到,那么怎么解決這個(gè)問(wèn)題呢享怀,難道我要給每一個(gè)queue發(fā)送一次消息羽峰?那豈不是浪費(fèi)帶寬又浪費(fèi)資源,我們能想到什么辦法呢添瓷,當(dāng)然是我們發(fā)送給RabbitMQ服務(wù)器一次梅屉,然后讓RabbitMQ服務(wù)器自己解析需要給哪個(gè)Queue發(fā),那么Exchange就是干這件事的
但是我們給Exchange發(fā)消息鳞贷,他怎么知道給哪個(gè)Queue發(fā)呢坯汤?這里就用到了RoutingKey和BindingKey
BindingKey是Exchange和Queue綁定規(guī)則的描述。Exchange接收到的消息中會(huì)帶有RoutingKey這個(gè)字段悄晃,Exchange就是根據(jù)這個(gè)RoutingKey和當(dāng)前Exchange所有綁定的BindingKey做匹配玫霎,如果滿足要求,就往BindingKey所綁定的Queue發(fā)送消息妈橄,這樣我們就解決了我們向RabbitMQ發(fā)送一次消息庶近,可以分發(fā)到不同的Queue的過(guò)程
關(guān)于RabbitMQ Server的問(wèn)題?
1問(wèn):如何申明的exchange和queue眷蚓?如何發(fā)送和接收消息鼻种?
答:
exchange 申明方法為:
/**
* exchange: exchange的名稱
* type: 四種類型fanout(分發(fā)),topic(匹配)沙热,direct(直連)叉钥,header(主題)
* durable:是否持久化;默認(rèn)為false不持久化
* autoDelete:是否自動(dòng)刪除篙贸;默認(rèn)false不刪除
* internal:默認(rèn)為false;true表示:這個(gè)exchange不接收client推送來(lái)的消息投队,只接收exchange之間通信
* Map<String, Object> arguments:
*/
exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
queue的申明方法:
/**
* queue: queue的名稱
* durable:是否持久化;默認(rèn)為false不持久化
* exclusive:排他隊(duì)列爵川;
* autoDelete:是否自動(dòng)刪除敷鸦;默認(rèn)true自動(dòng)刪除
* Map<String, Object> arguments:
*/
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
- queue: queue的名稱
- durable跟交換機(jī)方法的參數(shù)一樣,true表示做持久化,當(dāng)RabbitMQ服務(wù)重啟時(shí)扒披,隊(duì)列依然存在值依。
- exclusive是排他隊(duì)列,如果一個(gè)隊(duì)列被聲明為排他隊(duì)列碟案,那么這個(gè)隊(duì)列只能被第一次聲明他的連接所見(jiàn)愿险,并在連接斷開(kāi)的時(shí)候自動(dòng)刪除。這里有三點(diǎn)需要說(shuō)明价说,1辆亏、同一個(gè)連接的不同channel,是可以訪問(wèn)同一連接下創(chuàng)建的排他隊(duì)列的熔任。2褒链、排他隊(duì)列只能被聲明一次,其他連接不允許聲明同名的排他隊(duì)列疑苔。3、即使排他隊(duì)列是持久化的甸鸟,當(dāng)連接斷開(kāi)或者客戶端退出時(shí)惦费,排他隊(duì)列依然會(huì)被刪除。
- autoDelete是自動(dòng)刪除抢韭,為true時(shí)薪贫,當(dāng)沒(méi)有任何消費(fèi)者訂閱該隊(duì)列時(shí),隊(duì)列會(huì)被自動(dòng)刪除刻恭。
- arguments:其它參數(shù):
x-message-ttl
:創(chuàng)建queue時(shí)設(shè)置該參數(shù)可指定消息在該queue中待多久瞧省,當(dāng)消息到期時(shí),會(huì)主動(dòng)刪除(可根據(jù)x-dead-letter-routing-key和x-dead-letter-exchange生成可延遲的死信隊(duì)列)x-expires
:queue存活時(shí)間鳍贾,創(chuàng)建queue時(shí)參數(shù)arguments設(shè)置了x-expires參數(shù)鞍匾,該queue會(huì)在x-expires到期后queue消息,親身測(cè)試直接消失(哪怕里面有未消費(fèi)的消息)骑科。x-max-length
:queue消息條數(shù)限制橡淑,限制加入queue中消息的條數(shù)。先進(jìn)先出原則咆爽,超過(guò)10條后面的消息會(huì)頂替前面的消息梁棠。x-max-length-bytes
:queue消息容量限制,該參數(shù)和x-max-length目的一樣限制隊(duì)列的容量斗埂,但是這個(gè)是靠隊(duì)列大蟹(bytes)來(lái)達(dá)到限制。x-dead-letter-routing-key
與x-dead-letter-exchange
:創(chuàng)建queue時(shí)參數(shù)arguments設(shè)置了x-dead-letter-routing-key和x-dead-letter-exchange呛凶,會(huì)在x-message-ttl時(shí)間到期后把消息放到x-dead-letter-routing-key和x-dead-letter-exchange指定的隊(duì)列中達(dá)到延遲隊(duì)列的目的男娄。x-max-priority
:消息優(yōu)先級(jí)(版本限制3.5+ )創(chuàng)建queue時(shí)arguments可以使用x-max-priority參數(shù)聲明優(yōu)先級(jí)隊(duì)列的最大優(yōu)先級(jí)(整數(shù),建議0-10之間),在發(fā)布消息的時(shí)候指定該消息的優(yōu)先級(jí)沪伙, 優(yōu)先級(jí)更高(數(shù)值更大的)的消息先被消費(fèi)瓮顽。目前使用更多的優(yōu)先級(jí)將消耗更多的資源(Erlang進(jìn)程)。 設(shè)置該參數(shù)同時(shí)設(shè)置死信隊(duì)列時(shí)或造成已過(guò)期的低優(yōu)先級(jí)消息會(huì)在未過(guò)期的高優(yōu)先級(jí)消息后面執(zhí)行围橡。 該參數(shù)會(huì)造成額外的CPU消耗暖混。x-queue-mode
: Lazy;Queues: 先將消息保存到磁盤(pán)上翁授,不放在內(nèi)存中拣播,當(dāng)消費(fèi)者開(kāi)始消費(fèi)的時(shí)候才加載到內(nèi)存中
發(fā)送消息
/**
* Publish a message.
*
* Publishing to a non-existent exchange will result in a channel-level
* protocol exception, which closes the channel.
*
* Invocations of <code>Channel#basicPublish</code> will eventually block if a
* <a >resource-driven alarm</a> is in effect.
*
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @see <a >Resource-driven alarms</a>.
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param mandatory true if the 'mandatory' flag is to be set
* @param immediate true if the 'immediate' flag is to be
* set. Note that the RabbitMQ server does not support this flag.
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
- routingKey:路由鍵,#匹配0個(gè)或多個(gè)單詞收擦,*匹配一個(gè)單詞贮配,在topic exchange做消息轉(zhuǎn)發(fā)用
- mandatory:true:如果exchange根據(jù)自身類型和消息routeKey無(wú)法找到一個(gè)符合條件的queue,那么會(huì)調(diào)用basic.return方法將消息返還給生產(chǎn)者塞赂。默認(rèn)false:出現(xiàn)上述情形broker會(huì)直接將消息扔掉
- immediate:true:如果exchange在將消息route到queue(s)時(shí)發(fā)現(xiàn)對(duì)應(yīng)的queue上沒(méi)有消費(fèi)者泪勒,那么這條消息不會(huì)放入隊(duì)列中。當(dāng)與消息routeKey關(guān)聯(lián)的所有queue(一個(gè)或多個(gè))都沒(méi)有消費(fèi)者時(shí)宴猾,該消息會(huì)通過(guò)basic.return方法返還給生產(chǎn)者圆存。默認(rèn)false
- BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 這里指的是消息的持久化仇哆,配合channel(durable=true),queue(durable)可以實(shí)現(xiàn)沦辙,即使服務(wù)器宕機(jī),消息仍然保留
簡(jiǎn)單來(lái)說(shuō):mandatory標(biāo)志告訴服務(wù)器至少將該消息route到一個(gè)隊(duì)列中讹剔,否則將消息返還給生產(chǎn)者油讯;immediate標(biāo)志告訴服務(wù)器如果該消息關(guān)聯(lián)的queue上有消費(fèi)者,則馬上將消息投遞給它延欠,如果所有queue都沒(méi)有消費(fèi)者陌兑,直接把消息返還給生產(chǎn)者,不用將消息入隊(duì)列等待消費(fèi)者了衫冻。
消費(fèi)消息
在RabbitMQ中消費(fèi)者有2種方式獲取隊(duì)列中的消息:
- 通過(guò)basic.consume命令诀紊,訂閱某一個(gè)隊(duì)列中的消息,channel會(huì)自動(dòng)在處理完上一條消息之后,接收下一條消息隅俘。(同一個(gè)channel消息處理是串行的)邻奠。除非關(guān)閉channel或者取消訂閱,否則客戶端將會(huì)一直接收隊(duì)列的消息为居。
- 通過(guò)basic.get命令主動(dòng)獲取隊(duì)列中的消息碌宴,但是絕對(duì)不可以通過(guò)循環(huán)調(diào)用basic.get來(lái)代替basic.consume,這是因?yàn)閎asic.get RabbitMQ在實(shí)際執(zhí)行的時(shí)候蒙畴,是首先consume某一個(gè)隊(duì)列贰镣,然后檢索第一條消息呜象,然后再取消訂閱。如果是高吞吐率的消費(fèi)者碑隆,最好還是建議使用basic.consume恭陡。
簡(jiǎn)單總結(jié)一下就是說(shuō):
consume是只要隊(duì)列里面還有消息就一直取。
get是只取了隊(duì)列里面的第一條消息上煤。
因?yàn)間et開(kāi)銷(xiāo)大休玩,如果需要從一個(gè)隊(duì)列取消息的話,首選consume方式劫狠,慎用循環(huán)get方式拴疤。
問(wèn):什么是死信?
答:三種情況:
- 消息被拒絕(basic.reject或basic.nack)并且requeue=false.
- 消息TTL過(guò)期
- 隊(duì)列達(dá)到最大長(zhǎng)度(隊(duì)列滿了独泞,無(wú)法再添加數(shù)據(jù)到mq中)
問(wèn):什么是死信交換機(jī)呐矾?
答:在定義業(yè)務(wù)隊(duì)列的時(shí)候,要考慮指定一個(gè)死信交換機(jī)懦砂,死信交換機(jī)可以和任何一個(gè)普通的隊(duì)列進(jìn)行綁定蜒犯,然后
在業(yè)務(wù)隊(duì)列出現(xiàn)死信的時(shí)候就會(huì)將數(shù)據(jù)發(fā)送到死信隊(duì)列。
問(wèn):什么是死信隊(duì)列孕惜?
答:死信隊(duì)列實(shí)際上就是一個(gè)普通的隊(duì)列愧薛,只是這個(gè)隊(duì)列跟死信交換機(jī)進(jìn)行了綁定,用來(lái)存放死信而已
死信交換機(jī)圖解
問(wèn):什么場(chǎng)景下可以用延時(shí)隊(duì)列衫画?用RabbitMQ如何實(shí)現(xiàn)?
答:
電商系統(tǒng)中瓮栗,支付訂單未完成時(shí)削罩,30分鐘后提示用戶,可以用延時(shí)隊(duì)列來(lái)實(shí)現(xiàn)费奸。
rabbitmq本身不具有延時(shí)消息隊(duì)列的功能弥激,但是可以通過(guò)TTL(Time To Live)、DLX(Dead Letter Exchanges)特性實(shí)現(xiàn)愿阐。其原理給消息設(shè)置過(guò)期時(shí)間微服,在消息隊(duì)列上為過(guò)期消息指定轉(zhuǎn)發(fā)器,這樣消息過(guò)期后會(huì)轉(zhuǎn)發(fā)到與指定轉(zhuǎn)發(fā)器匹配的隊(duì)列上缨历,變向?qū)崿F(xiàn)延時(shí)隊(duì)列以蕴。
(rabbitmq-delayed-message-exchange 這個(gè)插件可以實(shí)現(xiàn)延遲隊(duì)列的功能)
問(wèn):設(shè)置每條消息的失效時(shí)間,先設(shè)置一條30s的失效消息發(fā)送給MQ辛孵,再設(shè)置一條10s的失效消息發(fā)給MQ丛肮,那么什么收到消息的順序是什么樣的?什么時(shí)間能收到魄缚?
答:
申明隊(duì)列隊(duì)時(shí)設(shè)置消息的失效時(shí)間宝与,方式為:
Map<String, Object> argsMap = new HashMap<String, Object>();
argsMap.put("x-message-ttl", 6000); // 消息6s后失效
argsMap.put("x-dead-letter-exchange", "some.dead.exchange.name");//設(shè)置死信交換機(jī)
argsMap.put("x-dead-letter-routing-key", "some-dead-routing-key");//設(shè)置死信routingKey
String myqueue = "myqueue2";
channel.queueDeclare(myqueue, false, false, false, argsMap);
設(shè)置每條消息的失效時(shí)間焚廊,方式為:
new AMQP.BasicProperties.Builder().deliveryMode(2).expiration("2000").build();
問(wèn):兩種設(shè)置失效方式有什么不一樣呢?
經(jīng)過(guò)試驗(yàn)得出习劫,對(duì)于第一種設(shè)置隊(duì)列TTL屬性的方法咆瘟,一旦消息過(guò)期,就會(huì)從隊(duì)列中抹去诽里,而第二種方法里袒餐,即使消息過(guò)期,也不會(huì)馬上從隊(duì)列中抹去须肆,因?yàn)槊織l消息是否過(guò)期時(shí)在即將投遞到server之前判定的匿乃,為什么兩者得處理方法不一致?因?yàn)榈谝环N方法里豌汇,隊(duì)列中已過(guò)期的消息肯定在隊(duì)列頭部幢炸,RabbitMQ只要定期從隊(duì)頭開(kāi)始掃描是否有過(guò)期消息即可,而第二種方法里拒贱,每條消息的過(guò)期時(shí)間不同宛徊,如果要?jiǎng)h除所有過(guò)期消息,勢(shì)必要掃描整個(gè)隊(duì)列逻澳,所以不如等到此消息即將被消費(fèi)時(shí)再判定是否過(guò)期闸天,如果過(guò)期,再進(jìn)行刪除斜做。
所以苞氮,會(huì)在30s之后,收到兩條消息瓤逼。
問(wèn):隊(duì)列設(shè)置了消息失效時(shí)間TTL為10s笼吟,消息1設(shè)置了失效時(shí)間5s,消息2設(shè)置了失效時(shí)間15s霸旗,當(dāng)無(wú)消費(fèi)者時(shí)贷帮,消息1和2分別多久后會(huì)出現(xiàn)在死信隊(duì)列里?
答:失效時(shí)間近的設(shè)置先生效诱告。消息1在5s后撵枢,消息2在10s后。
關(guān)于RabbitMQ連接的建立精居、傳輸?shù)膯?wèn)題
問(wèn):connection關(guān)閉后锄禽,channel是否會(huì)消失?
答:會(huì)箱蟆。
問(wèn):相同的connection沟绪、channel能否申明已經(jīng)申明過(guò)的exchange、queue空猜?
答:可以绽慈。
問(wèn):相同的connection恨旱、channel能否再次申明已經(jīng)申明過(guò)但屬性不一樣的exchange、queue坝疼,后申明的屬性是否覆蓋之前的屬性搜贤?
答:不可以反復(fù)申明不同屬性的exchange,會(huì)拋出異常钝凶。相同屬性的exchange可以反復(fù)申明仪芒。
問(wèn):新建立的connection、channel能否申明已經(jīng)被其他connection耕陷、channel申明過(guò)的exchange掂名、queue?
答:可以哟沫,但那是屬性必須和已經(jīng)申明的exchange一致饺蔑。
問(wèn):客戶端主動(dòng)斷開(kāi)connection,RabbitMQ服務(wù)端是否會(huì)刪除這個(gè)connection所申明的exchange嗜诀、queue猾警?
答:exchange默認(rèn)不會(huì)。queue默認(rèn)會(huì)隆敢。在創(chuàng)建的時(shí)候可以設(shè)置這個(gè)屬性发皿。
問(wèn):生產(chǎn)者client發(fā)送消息后,服務(wù)器宕機(jī)拂蝎,導(dǎo)致消息丟失穴墅。RabbitMQ是通過(guò)什么方式避免消息發(fā)送時(shí)丟失?
答:事務(wù)機(jī)制(性能消耗大)和confirm模式(消耗形伦浴)
事務(wù)機(jī)制
RabbitMQ提供了txSelect()封救、txCommit()和txRollback()三個(gè)方法對(duì)消息發(fā)送進(jìn)行事務(wù)管理,txSelect用于將通道channel開(kāi)啟事務(wù)模式捣作,txCommit用于提交事務(wù),txRollback用戶進(jìn)行事務(wù)回滾操作鹅士。
示例代碼:
try{
//channel開(kāi)啟事務(wù)模式
channel.txSelect();
//發(fā)送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//模擬異常
int n = 1/0;
//提交事務(wù)
channel.txCommit();
}catch(Exception e){
e.printStackTrace();
channel.txRollback();
}
假如在txCommit之前發(fā)生了異常券躁,那么就可以通過(guò)Rollback進(jìn)行回滾操作。
以上是基于AMQP協(xié)議層的事務(wù)機(jī)制掉盅,確保了數(shù)據(jù)在生產(chǎn)者與RabbitMQ服務(wù)器之間的可靠性也拜,但是性能開(kāi)銷(xiāo)較大。
Confirm模式
RabbitMQ提供了一種低消耗的事務(wù)管理方式趾痘,將channel設(shè)置成confirm模式慢哈。confirm模式的channel,通過(guò)該channel發(fā)出的消息會(huì)生成一個(gè)唯一的有序ID(從1開(kāi)始)永票,一旦消息成功發(fā)送到相應(yīng)的隊(duì)列之后卵贱,RabbitMQ服務(wù)端會(huì)發(fā)送給生產(chǎn)者一個(gè)確認(rèn)標(biāo)志滥沫,包含消息的ID,這樣生產(chǎn)者就知道該消息已經(jīng)發(fā)送成功了键俱。如果消息和隊(duì)列是持久化的兰绣,那么當(dāng)消息成功寫(xiě)入磁盤(pán)之后,生產(chǎn)者會(huì)收到確認(rèn)消息编振。此外服務(wù)端也可以設(shè)置basic.ack的mutiple域缀辩,表明是否是批量確認(rèn)的消息,即該序號(hào)之前的所有消息都已經(jīng)收到了踪央。
confirm的機(jī)制是異步的臀玄,生產(chǎn)者可以在等待的同時(shí)繼續(xù)發(fā)送下一條消息,并且異步等待回調(diào)處理畅蹂,如果消息成功發(fā)送健无,會(huì)返回ack消息供異步處理,如果消息發(fā)送失敗發(fā)生異常魁莉,也會(huì)返回nack消息睬涧。confirm的時(shí)間沒(méi)有明確說(shuō)明,并且同一個(gè)消息只會(huì)被confirm一次旗唁。
我們?cè)谏a(chǎn)者使用如下代碼開(kāi)啟channel的confirm模式畦浓,**已經(jīng)開(kāi)啟事務(wù)機(jī)制的channel是不能開(kāi)啟confirm模式的。**
channel.confirmSelect();
處理ack或者nack的方式有三種:
串行confirm:
每發(fā)送一條消息就調(diào)用waitForConfirms()方法等待服務(wù)端confirm
//開(kāi)啟confirm模式
channel.confirmSelect();
String message = "Hello World";
//發(fā)送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//判斷是否回復(fù)
if(channel.waitForConfirms()){
System.out.println("Message send success.");
}
其中waitForConfirms可以換成帶有時(shí)間參數(shù)的方法waitForConfirms(Long mills)指定等待響應(yīng)時(shí)間
批量confirm:
每發(fā)送一批次消息就調(diào)用waitForConfirms()方法等待服務(wù)端confirm
//開(kāi)啟confirm模式
channel.confirmSelect();
for(int i =0;i<1000;i++){
String message = "Hello World";
//發(fā)送消息
channel.basicPublish(EXCHANGE_NAME,
"",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
if(i%100==0){
//每發(fā)送100條判斷一次是否回復(fù)
if(channel.waitForConfirms()){
System.out.println("Message send success.");
}
}
}
批量的方法從數(shù)量級(jí)上降低了confirm的性能消耗检疫,提高了效率讶请,但是有個(gè)致命的缺陷,一旦回復(fù)確認(rèn)失敗屎媳,當(dāng)前確認(rèn)批次的消息會(huì)全部重新發(fā)送夺溢,導(dǎo)致消息重復(fù)發(fā)送。所以批量的confirm雖然性能提高了烛谊,但是消息的重復(fù)率也提高了风响。
異步confirm:
使用監(jiān)聽(tīng)方法,當(dāng)服務(wù)端confirm了一條或多條消息后丹禀,調(diào)用回調(diào)方法
//聲明一個(gè)用來(lái)記錄消息唯一ID的有序集合SortedSet
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
//開(kāi)啟confirm模式
channel.confirmSelect();
//異步監(jiān)聽(tīng)方法 處理ack與nack方法
channel.addConfirmListener(new ConfirmListener() {
//處理ack multiple 是否批量 如果是批量 則將比該條小的所有數(shù)據(jù)都移除 否則只移除該條
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
//處理nack 與ack相同
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
//獲取消息confirm的唯一ID
long nextSeqNo = channel.getNextPublishSeqNo();
String message = "Hello World.";
//發(fā)送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//將ID加入到有序集合中
confirmSet.add(nextSeqNo);
}
每一個(gè)comfirm的通道維護(hù)一個(gè)集合状勤,每發(fā)送一條數(shù)據(jù),集合增加一個(gè)元素双泪,每異步響應(yīng)一條ack或者nack的數(shù)據(jù)持搜,集合刪除一條。SortedSet是一個(gè)有序的集合焙矛,它的有序是值大小的有序葫盼,不是插入時(shí)間的有序。JDK中waitForConfirms()方法也是使用了SortedSet集合
7問(wèn):RabbitMQ如何知道消息已經(jīng)被consumer消費(fèi)村斟?
答:ack機(jī)制贫导。為了保證RabbitMQ能夠感知消費(fèi)者正確取到了消息抛猫,RabbitMQ提供了消息確認(rèn)機(jī)制,與給生產(chǎn)者回復(fù)ACK的方式類似脱盲,當(dāng)隊(duì)列發(fā)送一條消息給消費(fèi)者時(shí)邑滨,會(huì)記錄一個(gè)unack標(biāo)志,當(dāng)消費(fèi)者拿到消息之后钱反,會(huì)回復(fù)一個(gè)ack標(biāo)志掖看,從而抵消了原來(lái)的unack標(biāo)志。
8問(wèn):消費(fèi)者中默認(rèn)的ack機(jī)制是什么樣的面哥?
答:默認(rèn)情況是:當(dāng)消費(fèi)者拿到消息之后立即回復(fù)ack而不管消息是否正確被處理哎壳,就回復(fù)。這個(gè)時(shí)間很快尚卫,以至于基本看不到unack的狀態(tài)归榕。
9問(wèn):消費(fèi)者的默認(rèn)ack機(jī)制,會(huì)有什么問(wèn)題么吱涉?
答:消息在消費(fèi)者端阻塞(消息重復(fù)或者大量的消息堆積)刹泄。假如消費(fèi)者在接收消息后,業(yè)務(wù)處理的過(guò)程中發(fā)生異常crash了怎爵,那么這條消息就消失了特石,持久化也無(wú)法解決這個(gè)問(wèn)題。這里就需要我們?cè)谌粘5臉I(yè)務(wù)處理中鳖链,消費(fèi)者要手動(dòng)的確認(rèn)消息姆蘸。確認(rèn)消息包括兩種,一種是ack芙委,另一種是unack逞敷,unack是表明我這條消息處理異常了,可以設(shè)置參數(shù)告訴MQ服務(wù)器是否需要將消息重新放入到隊(duì)列中灌侣。同時(shí)推捐,如果開(kāi)啟了手動(dòng)回復(fù)確認(rèn)的消費(fèi)者,當(dāng)消費(fèi)者異常斷開(kāi)時(shí)侧啼,沒(méi)有回復(fù)的消息會(huì)被重新放入隊(duì)列供給其他消費(fèi)者使用玖姑。所以程序員必須一定要記得回復(fù)消息確認(rèn),不然會(huì)導(dǎo)致消息重復(fù)或者大量的消息堆積慨菱。
10問(wèn):如何解決消費(fèi)者的消息阻塞(消息重復(fù)或者大量的消息堆積)?
阻塞的問(wèn)題的解決方案就是設(shè)置合理的prefetch大小戴甩,處理能力快的消費(fèi)者設(shè)置數(shù)值大符喝,處理更多的消息,處理能力慢的消費(fèi)者設(shè)置數(shù)值小甜孤,少處理消息协饲,也就不會(huì)發(fā)生阻塞畏腕。假設(shè)有兩個(gè)消費(fèi)者,消費(fèi)者1處理業(yè)務(wù)時(shí)間是2s茉稠,消費(fèi)者2處理業(yè)務(wù)時(shí)間是2ms描馅,都設(shè)置prefetch大小為10,那么就不會(huì)出現(xiàn)消費(fèi)者1積累大量的unack而线,這里最多的unack數(shù)目就是兩個(gè)prefetch的大小之和20铭污,同時(shí),MQ分發(fā)消息是先塞滿10個(gè)到消費(fèi)者1膀篮,再塞滿10個(gè)到消費(fèi)者2嘹狞,塞第21個(gè)的時(shí)候,先看消費(fèi)者1的緩沖池有沒(méi)有空位誓竿,沒(méi)有的話去看消費(fèi)者2磅网,因?yàn)橄M(fèi)者2的處理速度比1快1000倍,所以1000條數(shù)據(jù)前10條塞給消費(fèi)者1之后筷屡,后邊的數(shù)據(jù)就都塞給消費(fèi)者2了涧偷。設(shè)置prefetch大小的方法:在消費(fèi)者中加入代碼:channel.basicQos(10);