RabbitMQ 問(wèn)答式總結(jié)

RabbitMQ的學(xué)習(xí)筆記

關(guān)于RabbitMQ的幾個(gè)角色如下:

image

關(guān)于名詞的通俗解析:

首先我們肯定知道RabbitMQ就是消息隊(duì)列的一種實(shí)現(xiàn)掩驱,那么圍繞這個(gè)阶祭,我們就可以思考一個(gè)消息隊(duì)列到底需要什么,當(dāng)然是需要隊(duì)列蛙埂,那么這個(gè)隊(duì)列就是Queue,那么其他的所有名詞都是圍繞這個(gè)Queue來(lái)拓展的。

首先抽诉,想要讓隊(duì)列不在本地運(yùn)行,而在網(wǎng)絡(luò)中運(yùn)行吐绵,肯定會(huì)有連接這個(gè)概念迹淌,所以就會(huì)有Connection,我們發(fā)一條消息連接一次拦赠,這樣很顯然是浪費(fèi)資源的巍沙,建立連接的過(guò)程也很耗時(shí),所以我們就會(huì)做一個(gè)東西讓他來(lái)管理連接荷鼠,當(dāng)我用的時(shí)候句携,直接從里邊拿出來(lái)已經(jīng)建立好的連接發(fā)信息,那么ConnectionFactory應(yīng)運(yùn)而生允乐。

接下來(lái)矮嫉,當(dāng)程序開(kāi)發(fā)時(shí)削咆,可能不止用到一個(gè)隊(duì)列,可能有訂單的隊(duì)列蠢笋、消息的隊(duì)列拨齐、任務(wù)的隊(duì)列等等,那么就需要給不同的queue發(fā)信息昨寞,那么和每一個(gè)隊(duì)列連接的這個(gè)概念瞻惋,就叫Channel

再往下來(lái),當(dāng)我們開(kāi)發(fā)的時(shí)候還有時(shí)候會(huì)用到這樣一種功能援岩,就是當(dāng)我發(fā)送一條消息歼狼,需要讓幾個(gè)queue都收到,那么怎么解決這個(gè)問(wèn)題呢享怀,難道我要給每一個(gè)queue發(fā)送一次消息羽峰?那豈不是浪費(fèi)帶寬又浪費(fèi)資源,我們能想到什么辦法呢添瓷,當(dāng)然是我們發(fā)送給RabbitMQ服務(wù)器一次梅屉,然后讓RabbitMQ服務(wù)器自己解析需要給哪個(gè)Queue發(fā),那么Exchange就是干這件事的

但是我們給Exchange發(fā)消息鳞贷,他怎么知道給哪個(gè)Queue發(fā)呢坯汤?這里就用到了RoutingKey和BindingKey

BindingKey是Exchange和Queue綁定規(guī)則的描述。Exchange接收到的消息中會(huì)帶有RoutingKey這個(gè)字段悄晃,Exchange就是根據(jù)這個(gè)RoutingKey和當(dāng)前Exchange所有綁定的BindingKey做匹配玫霎,如果滿足要求,就往BindingKey所綁定的Queue發(fā)送消息妈橄,這樣我們就解決了我們向RabbitMQ發(fā)送一次消息庶近,可以分發(fā)到不同的Queue的過(guò)程


關(guān)于RabbitMQ Server的問(wèn)題?

1問(wèn):如何申明的exchange和queue眷蚓?如何發(fā)送和接收消息鼻种?

答:

exchange 申明方法為:
/**
 *  exchange: exchange的名稱
 *  type: 四種類型fanout(分發(fā)),topic(匹配)沙热,direct(直連)叉钥,header(主題)
 *  durable:是否持久化;默認(rèn)為false不持久化
 *  autoDelete:是否自動(dòng)刪除篙贸;默認(rèn)false不刪除
 *  internal:默認(rèn)為false;true表示:這個(gè)exchange不接收client推送來(lái)的消息投队,只接收exchange之間通信
 *  Map<String, Object> arguments:
 */
exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
queue的申明方法:
/**
 *  queue: queue的名稱
 *  durable:是否持久化;默認(rèn)為false不持久化
 *  exclusive:排他隊(duì)列爵川;
 *  autoDelete:是否自動(dòng)刪除敷鸦;默認(rèn)true自動(dòng)刪除
 *  Map<String, Object> arguments:
 */
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
  • queue: queue的名稱
  • durable跟交換機(jī)方法的參數(shù)一樣,true表示做持久化,當(dāng)RabbitMQ服務(wù)重啟時(shí)扒披,隊(duì)列依然存在值依。
  • exclusive是排他隊(duì)列,如果一個(gè)隊(duì)列被聲明為排他隊(duì)列碟案,那么這個(gè)隊(duì)列只能被第一次聲明他的連接所見(jiàn)愿险,并在連接斷開(kāi)的時(shí)候自動(dòng)刪除。這里有三點(diǎn)需要說(shuō)明价说,1辆亏、同一個(gè)連接的不同channel,是可以訪問(wèn)同一連接下創(chuàng)建的排他隊(duì)列的熔任。2褒链、排他隊(duì)列只能被聲明一次,其他連接不允許聲明同名的排他隊(duì)列疑苔。3、即使排他隊(duì)列是持久化的甸鸟,當(dāng)連接斷開(kāi)或者客戶端退出時(shí)惦费,排他隊(duì)列依然會(huì)被刪除。
  • autoDelete是自動(dòng)刪除抢韭,為true時(shí)薪贫,當(dāng)沒(méi)有任何消費(fèi)者訂閱該隊(duì)列時(shí),隊(duì)列會(huì)被自動(dòng)刪除刻恭。
  • arguments:其它參數(shù):
    • x-message-ttl:創(chuàng)建queue時(shí)設(shè)置該參數(shù)可指定消息在該queue中待多久瞧省,當(dāng)消息到期時(shí),會(huì)主動(dòng)刪除(可根據(jù)x-dead-letter-routing-key和x-dead-letter-exchange生成可延遲的死信隊(duì)列)
    • x-expires:queue存活時(shí)間鳍贾,創(chuàng)建queue時(shí)參數(shù)arguments設(shè)置了x-expires參數(shù)鞍匾,該queue會(huì)在x-expires到期后queue消息,親身測(cè)試直接消失(哪怕里面有未消費(fèi)的消息)骑科。
    • x-max-length:queue消息條數(shù)限制橡淑,限制加入queue中消息的條數(shù)。先進(jìn)先出原則咆爽,超過(guò)10條后面的消息會(huì)頂替前面的消息梁棠。
    • x-max-length-bytes:queue消息容量限制,該參數(shù)和x-max-length目的一樣限制隊(duì)列的容量斗埂,但是這個(gè)是靠隊(duì)列大蟹(bytes)來(lái)達(dá)到限制。
    • x-dead-letter-routing-keyx-dead-letter-exchange :創(chuàng)建queue時(shí)參數(shù)arguments設(shè)置了x-dead-letter-routing-key和x-dead-letter-exchange呛凶,會(huì)在x-message-ttl時(shí)間到期后把消息放到x-dead-letter-routing-key和x-dead-letter-exchange指定的隊(duì)列中達(dá)到延遲隊(duì)列的目的男娄。
    • x-max-priority:消息優(yōu)先級(jí)(版本限制3.5+ )創(chuàng)建queue時(shí)arguments可以使用x-max-priority參數(shù)聲明優(yōu)先級(jí)隊(duì)列的最大優(yōu)先級(jí)(整數(shù),建議0-10之間),在發(fā)布消息的時(shí)候指定該消息的優(yōu)先級(jí)沪伙, 優(yōu)先級(jí)更高(數(shù)值更大的)的消息先被消費(fèi)瓮顽。目前使用更多的優(yōu)先級(jí)將消耗更多的資源(Erlang進(jìn)程)。 設(shè)置該參數(shù)同時(shí)設(shè)置死信隊(duì)列時(shí)或造成已過(guò)期的低優(yōu)先級(jí)消息會(huì)在未過(guò)期的高優(yōu)先級(jí)消息后面執(zhí)行围橡。 該參數(shù)會(huì)造成額外的CPU消耗暖混。
    • x-queue-mode: Lazy;Queues: 先將消息保存到磁盤(pán)上翁授,不放在內(nèi)存中拣播,當(dāng)消費(fèi)者開(kāi)始消費(fèi)的時(shí)候才加載到內(nèi)存中

發(fā)送消息

/**
     * Publish a message.
     *
     * Publishing to a non-existent exchange will result in a channel-level
     * protocol exception, which closes the channel.
     *
     * Invocations of <code>Channel#basicPublish</code> will eventually block if a
     * <a >resource-driven alarm</a> is in effect.
     *
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @see <a >Resource-driven alarms</a>.
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param mandatory true if the 'mandatory' flag is to be set
     * @param immediate true if the 'immediate' flag is to be
     * set. Note that the RabbitMQ server does not support this flag.
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;
  • routingKey:路由鍵,#匹配0個(gè)或多個(gè)單詞收擦,*匹配一個(gè)單詞贮配,在topic exchange做消息轉(zhuǎn)發(fā)用
  • mandatory:true:如果exchange根據(jù)自身類型和消息routeKey無(wú)法找到一個(gè)符合條件的queue,那么會(huì)調(diào)用basic.return方法將消息返還給生產(chǎn)者塞赂。默認(rèn)false:出現(xiàn)上述情形broker會(huì)直接將消息扔掉
  • immediate:true:如果exchange在將消息route到queue(s)時(shí)發(fā)現(xiàn)對(duì)應(yīng)的queue上沒(méi)有消費(fèi)者泪勒,那么這條消息不會(huì)放入隊(duì)列中。當(dāng)與消息routeKey關(guān)聯(lián)的所有queue(一個(gè)或多個(gè))都沒(méi)有消費(fèi)者時(shí)宴猾,該消息會(huì)通過(guò)basic.return方法返還給生產(chǎn)者圆存。默認(rèn)false
  • BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 這里指的是消息的持久化仇哆,配合channel(durable=true),queue(durable)可以實(shí)現(xiàn)沦辙,即使服務(wù)器宕機(jī),消息仍然保留

簡(jiǎn)單來(lái)說(shuō):mandatory標(biāo)志告訴服務(wù)器至少將該消息route到一個(gè)隊(duì)列中讹剔,否則將消息返還給生產(chǎn)者油讯;immediate標(biāo)志告訴服務(wù)器如果該消息關(guān)聯(lián)的queue上有消費(fèi)者,則馬上將消息投遞給它延欠,如果所有queue都沒(méi)有消費(fèi)者陌兑,直接把消息返還給生產(chǎn)者,不用將消息入隊(duì)列等待消費(fèi)者了衫冻。

消費(fèi)消息

在RabbitMQ中消費(fèi)者有2種方式獲取隊(duì)列中的消息:

  • 通過(guò)basic.consume命令诀紊,訂閱某一個(gè)隊(duì)列中的消息,channel會(huì)自動(dòng)在處理完上一條消息之后,接收下一條消息隅俘。(同一個(gè)channel消息處理是串行的)邻奠。除非關(guān)閉channel或者取消訂閱,否則客戶端將會(huì)一直接收隊(duì)列的消息为居。
  • 通過(guò)basic.get命令主動(dòng)獲取隊(duì)列中的消息碌宴,但是絕對(duì)不可以通過(guò)循環(huán)調(diào)用basic.get來(lái)代替basic.consume,這是因?yàn)閎asic.get RabbitMQ在實(shí)際執(zhí)行的時(shí)候蒙畴,是首先consume某一個(gè)隊(duì)列贰镣,然后檢索第一條消息呜象,然后再取消訂閱。如果是高吞吐率的消費(fèi)者碑隆,最好還是建議使用basic.consume恭陡。

簡(jiǎn)單總結(jié)一下就是說(shuō):

consume是只要隊(duì)列里面還有消息就一直取。
get是只取了隊(duì)列里面的第一條消息上煤。
因?yàn)間et開(kāi)銷(xiāo)大休玩,如果需要從一個(gè)隊(duì)列取消息的話,首選consume方式劫狠,慎用循環(huán)get方式拴疤。

問(wèn):什么是死信?

答:三種情況:

  • 消息被拒絕(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL過(guò)期
  • 隊(duì)列達(dá)到最大長(zhǎng)度(隊(duì)列滿了独泞,無(wú)法再添加數(shù)據(jù)到mq中)

問(wèn):什么是死信交換機(jī)呐矾?

答:在定義業(yè)務(wù)隊(duì)列的時(shí)候,要考慮指定一個(gè)死信交換機(jī)懦砂,死信交換機(jī)可以和任何一個(gè)普通的隊(duì)列進(jìn)行綁定蜒犯,然后
在業(yè)務(wù)隊(duì)列出現(xiàn)死信的時(shí)候就會(huì)將數(shù)據(jù)發(fā)送到死信隊(duì)列。

問(wèn):什么是死信隊(duì)列孕惜?

答:死信隊(duì)列實(shí)際上就是一個(gè)普通的隊(duì)列愧薛,只是這個(gè)隊(duì)列跟死信交換機(jī)進(jìn)行了綁定,用來(lái)存放死信而已

死信交換機(jī)圖解
死信交換機(jī)圖解

問(wèn):什么場(chǎng)景下可以用延時(shí)隊(duì)列衫画?用RabbitMQ如何實(shí)現(xiàn)?

答:
電商系統(tǒng)中瓮栗,支付訂單未完成時(shí)削罩,30分鐘后提示用戶,可以用延時(shí)隊(duì)列來(lái)實(shí)現(xiàn)费奸。
rabbitmq本身不具有延時(shí)消息隊(duì)列的功能弥激,但是可以通過(guò)TTL(Time To Live)、DLX(Dead Letter Exchanges)特性實(shí)現(xiàn)愿阐。其原理給消息設(shè)置過(guò)期時(shí)間微服,在消息隊(duì)列上為過(guò)期消息指定轉(zhuǎn)發(fā)器,這樣消息過(guò)期后會(huì)轉(zhuǎn)發(fā)到與指定轉(zhuǎn)發(fā)器匹配的隊(duì)列上缨历,變向?qū)崿F(xiàn)延時(shí)隊(duì)列以蕴。
(rabbitmq-delayed-message-exchange 這個(gè)插件可以實(shí)現(xiàn)延遲隊(duì)列的功能)

問(wèn):設(shè)置每條消息的失效時(shí)間,先設(shè)置一條30s的失效消息發(fā)送給MQ辛孵,再設(shè)置一條10s的失效消息發(fā)給MQ丛肮,那么什么收到消息的順序是什么樣的?什么時(shí)間能收到魄缚?

答:

申明隊(duì)列隊(duì)時(shí)設(shè)置消息的失效時(shí)間宝与,方式為:

Map<String, Object> argsMap = new HashMap<String, Object>();
argsMap.put("x-message-ttl", 6000); // 消息6s后失效
argsMap.put("x-dead-letter-exchange", "some.dead.exchange.name");//設(shè)置死信交換機(jī)
argsMap.put("x-dead-letter-routing-key", "some-dead-routing-key");//設(shè)置死信routingKey
String myqueue = "myqueue2";
channel.queueDeclare(myqueue, false, false, false, argsMap);

設(shè)置每條消息的失效時(shí)間焚廊,方式為:
new AMQP.BasicProperties.Builder().deliveryMode(2).expiration("2000").build();

問(wèn):兩種設(shè)置失效方式有什么不一樣呢?

經(jīng)過(guò)試驗(yàn)得出习劫,對(duì)于第一種設(shè)置隊(duì)列TTL屬性的方法咆瘟,一旦消息過(guò)期,就會(huì)從隊(duì)列中抹去诽里,而第二種方法里袒餐,即使消息過(guò)期,也不會(huì)馬上從隊(duì)列中抹去须肆,因?yàn)槊織l消息是否過(guò)期時(shí)在即將投遞到server之前判定的匿乃,為什么兩者得處理方法不一致?因?yàn)榈谝环N方法里豌汇,隊(duì)列中已過(guò)期的消息肯定在隊(duì)列頭部幢炸,RabbitMQ只要定期從隊(duì)頭開(kāi)始掃描是否有過(guò)期消息即可,而第二種方法里拒贱,每條消息的過(guò)期時(shí)間不同宛徊,如果要?jiǎng)h除所有過(guò)期消息,勢(shì)必要掃描整個(gè)隊(duì)列逻澳,所以不如等到此消息即將被消費(fèi)時(shí)再判定是否過(guò)期闸天,如果過(guò)期,再進(jìn)行刪除斜做。

所以苞氮,會(huì)在30s之后,收到兩條消息瓤逼。

問(wèn):隊(duì)列設(shè)置了消息失效時(shí)間TTL為10s笼吟,消息1設(shè)置了失效時(shí)間5s,消息2設(shè)置了失效時(shí)間15s霸旗,當(dāng)無(wú)消費(fèi)者時(shí)贷帮,消息1和2分別多久后會(huì)出現(xiàn)在死信隊(duì)列里?

答:失效時(shí)間近的設(shè)置先生效诱告。消息1在5s后撵枢,消息2在10s后。


關(guān)于RabbitMQ連接的建立精居、傳輸?shù)膯?wèn)題

問(wèn):connection關(guān)閉后锄禽,channel是否會(huì)消失?

答:會(huì)箱蟆。

問(wèn):相同的connection沟绪、channel能否申明已經(jīng)申明過(guò)的exchange、queue空猜?

答:可以绽慈。

問(wèn):相同的connection恨旱、channel能否再次申明已經(jīng)申明過(guò)但屬性不一樣的exchange、queue坝疼,后申明的屬性是否覆蓋之前的屬性搜贤?

答:不可以反復(fù)申明不同屬性的exchange,會(huì)拋出異常钝凶。相同屬性的exchange可以反復(fù)申明仪芒。

問(wèn):新建立的connection、channel能否申明已經(jīng)被其他connection耕陷、channel申明過(guò)的exchange掂名、queue?

答:可以哟沫,但那是屬性必須和已經(jīng)申明的exchange一致饺蔑。

問(wèn):客戶端主動(dòng)斷開(kāi)connection,RabbitMQ服務(wù)端是否會(huì)刪除這個(gè)connection所申明的exchange嗜诀、queue猾警?

答:exchange默認(rèn)不會(huì)。queue默認(rèn)會(huì)隆敢。在創(chuàng)建的時(shí)候可以設(shè)置這個(gè)屬性发皿。

問(wèn):生產(chǎn)者client發(fā)送消息后,服務(wù)器宕機(jī)拂蝎,導(dǎo)致消息丟失穴墅。RabbitMQ是通過(guò)什么方式避免消息發(fā)送時(shí)丟失?

答:事務(wù)機(jī)制(性能消耗大)和confirm模式(消耗形伦浴)

事務(wù)機(jī)制

RabbitMQ提供了txSelect()封救、txCommit()和txRollback()三個(gè)方法對(duì)消息發(fā)送進(jìn)行事務(wù)管理,txSelect用于將通道channel開(kāi)啟事務(wù)模式捣作,txCommit用于提交事務(wù),txRollback用戶進(jìn)行事務(wù)回滾操作鹅士。

示例代碼:

try{
   //channel開(kāi)啟事務(wù)模式
   channel.txSelect();
   //發(fā)送消息
   channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
   //模擬異常
   int n = 1/0;
   //提交事務(wù)
   channel.txCommit();
}catch(Exception e){
   e.printStackTrace();
   channel.txRollback();
}
假如在txCommit之前發(fā)生了異常券躁,那么就可以通過(guò)Rollback進(jìn)行回滾操作。
以上是基于AMQP協(xié)議層的事務(wù)機(jī)制掉盅,確保了數(shù)據(jù)在生產(chǎn)者與RabbitMQ服務(wù)器之間的可靠性也拜,但是性能開(kāi)銷(xiāo)較大。
Confirm模式
RabbitMQ提供了一種低消耗的事務(wù)管理方式趾痘,將channel設(shè)置成confirm模式慢哈。confirm模式的channel,通過(guò)該channel發(fā)出的消息會(huì)生成一個(gè)唯一的有序ID(從1開(kāi)始)永票,一旦消息成功發(fā)送到相應(yīng)的隊(duì)列之后卵贱,RabbitMQ服務(wù)端會(huì)發(fā)送給生產(chǎn)者一個(gè)確認(rèn)標(biāo)志滥沫,包含消息的ID,這樣生產(chǎn)者就知道該消息已經(jīng)發(fā)送成功了键俱。如果消息和隊(duì)列是持久化的兰绣,那么當(dāng)消息成功寫(xiě)入磁盤(pán)之后,生產(chǎn)者會(huì)收到確認(rèn)消息编振。此外服務(wù)端也可以設(shè)置basic.ack的mutiple域缀辩,表明是否是批量確認(rèn)的消息,即該序號(hào)之前的所有消息都已經(jīng)收到了踪央。
confirm的機(jī)制是異步的臀玄,生產(chǎn)者可以在等待的同時(shí)繼續(xù)發(fā)送下一條消息,并且異步等待回調(diào)處理畅蹂,如果消息成功發(fā)送健无,會(huì)返回ack消息供異步處理,如果消息發(fā)送失敗發(fā)生異常魁莉,也會(huì)返回nack消息睬涧。confirm的時(shí)間沒(méi)有明確說(shuō)明,并且同一個(gè)消息只會(huì)被confirm一次旗唁。
我們?cè)谏a(chǎn)者使用如下代碼開(kāi)啟channel的confirm模式畦浓,**已經(jīng)開(kāi)啟事務(wù)機(jī)制的channel是不能開(kāi)啟confirm模式的。**
channel.confirmSelect();

處理ack或者nack的方式有三種:

串行confirm:

每發(fā)送一條消息就調(diào)用waitForConfirms()方法等待服務(wù)端confirm

//開(kāi)啟confirm模式
channel.confirmSelect();
String message = "Hello World";
//發(fā)送消息
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
//判斷是否回復(fù)
if(channel.waitForConfirms()){
    System.out.println("Message send success."); 
 }

其中waitForConfirms可以換成帶有時(shí)間參數(shù)的方法waitForConfirms(Long mills)指定等待響應(yīng)時(shí)間

批量confirm:

每發(fā)送一批次消息就調(diào)用waitForConfirms()方法等待服務(wù)端confirm

//開(kāi)啟confirm模式

channel.confirmSelect();

for(int i =0;i<1000;i++){
    String message = "Hello World";
    //發(fā)送消息
    channel.basicPublish(EXCHANGE_NAME,
                     "",
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     message.getBytes());
    if(i%100==0){
        //每發(fā)送100條判斷一次是否回復(fù)
        if(channel.waitForConfirms()){
          System.out.println("Message send success."); 
        }
    }
}
批量的方法從數(shù)量級(jí)上降低了confirm的性能消耗检疫,提高了效率讶请,但是有個(gè)致命的缺陷,一旦回復(fù)確認(rèn)失敗屎媳,當(dāng)前確認(rèn)批次的消息會(huì)全部重新發(fā)送夺溢,導(dǎo)致消息重復(fù)發(fā)送。所以批量的confirm雖然性能提高了烛谊,但是消息的重復(fù)率也提高了风响。
異步confirm:

使用監(jiān)聽(tīng)方法,當(dāng)服務(wù)端confirm了一條或多條消息后丹禀,調(diào)用回調(diào)方法

//聲明一個(gè)用來(lái)記錄消息唯一ID的有序集合SortedSet
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
//開(kāi)啟confirm模式
channel.confirmSelect();
//異步監(jiān)聽(tīng)方法 處理ack與nack方法
channel.addConfirmListener(new ConfirmListener() {
    //處理ack multiple 是否批量 如果是批量 則將比該條小的所有數(shù)據(jù)都移除 否則只移除該條
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
    //處理nack 與ack相同
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("There is Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
});
while (true) {
    //獲取消息confirm的唯一ID
    long nextSeqNo = channel.getNextPublishSeqNo();
    String message = "Hello World.";
    //發(fā)送消息
    channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    //將ID加入到有序集合中
    confirmSet.add(nextSeqNo);
}

每一個(gè)comfirm的通道維護(hù)一個(gè)集合状勤,每發(fā)送一條數(shù)據(jù),集合增加一個(gè)元素双泪,每異步響應(yīng)一條ack或者nack的數(shù)據(jù)持搜,集合刪除一條。SortedSet是一個(gè)有序的集合焙矛,它的有序是值大小的有序葫盼,不是插入時(shí)間的有序。JDK中waitForConfirms()方法也是使用了SortedSet集合

7問(wèn):RabbitMQ如何知道消息已經(jīng)被consumer消費(fèi)村斟?

答:ack機(jī)制贫导。為了保證RabbitMQ能夠感知消費(fèi)者正確取到了消息抛猫,RabbitMQ提供了消息確認(rèn)機(jī)制,與給生產(chǎn)者回復(fù)ACK的方式類似脱盲,當(dāng)隊(duì)列發(fā)送一條消息給消費(fèi)者時(shí)邑滨,會(huì)記錄一個(gè)unack標(biāo)志,當(dāng)消費(fèi)者拿到消息之后钱反,會(huì)回復(fù)一個(gè)ack標(biāo)志掖看,從而抵消了原來(lái)的unack標(biāo)志。

8問(wèn):消費(fèi)者中默認(rèn)的ack機(jī)制是什么樣的面哥?

答:默認(rèn)情況是:當(dāng)消費(fèi)者拿到消息之后立即回復(fù)ack而不管消息是否正確被處理哎壳,就回復(fù)。這個(gè)時(shí)間很快尚卫,以至于基本看不到unack的狀態(tài)归榕。

9問(wèn):消費(fèi)者的默認(rèn)ack機(jī)制,會(huì)有什么問(wèn)題么吱涉?

答:消息在消費(fèi)者端阻塞(消息重復(fù)或者大量的消息堆積)刹泄。假如消費(fèi)者在接收消息后,業(yè)務(wù)處理的過(guò)程中發(fā)生異常crash了怎爵,那么這條消息就消失了特石,持久化也無(wú)法解決這個(gè)問(wèn)題。這里就需要我們?cè)谌粘5臉I(yè)務(wù)處理中鳖链,消費(fèi)者要手動(dòng)的確認(rèn)消息姆蘸。確認(rèn)消息包括兩種,一種是ack芙委,另一種是unack逞敷,unack是表明我這條消息處理異常了,可以設(shè)置參數(shù)告訴MQ服務(wù)器是否需要將消息重新放入到隊(duì)列中灌侣。同時(shí)推捐,如果開(kāi)啟了手動(dòng)回復(fù)確認(rèn)的消費(fèi)者,當(dāng)消費(fèi)者異常斷開(kāi)時(shí)侧啼,沒(méi)有回復(fù)的消息會(huì)被重新放入隊(duì)列供給其他消費(fèi)者使用玖姑。所以程序員必須一定要記得回復(fù)消息確認(rèn),不然會(huì)導(dǎo)致消息重復(fù)或者大量的消息堆積慨菱。

10問(wèn):如何解決消費(fèi)者的消息阻塞(消息重復(fù)或者大量的消息堆積)?

阻塞的問(wèn)題的解決方案就是設(shè)置合理的prefetch大小戴甩,處理能力快的消費(fèi)者設(shè)置數(shù)值大符喝,處理更多的消息,處理能力慢的消費(fèi)者設(shè)置數(shù)值小甜孤,少處理消息协饲,也就不會(huì)發(fā)生阻塞畏腕。假設(shè)有兩個(gè)消費(fèi)者,消費(fèi)者1處理業(yè)務(wù)時(shí)間是2s茉稠,消費(fèi)者2處理業(yè)務(wù)時(shí)間是2ms描馅,都設(shè)置prefetch大小為10,那么就不會(huì)出現(xiàn)消費(fèi)者1積累大量的unack而线,這里最多的unack數(shù)目就是兩個(gè)prefetch的大小之和20铭污,同時(shí),MQ分發(fā)消息是先塞滿10個(gè)到消費(fèi)者1膀篮,再塞滿10個(gè)到消費(fèi)者2嘹狞,塞第21個(gè)的時(shí)候,先看消費(fèi)者1的緩沖池有沒(méi)有空位誓竿,沒(méi)有的話去看消費(fèi)者2磅网,因?yàn)橄M(fèi)者2的處理速度比1快1000倍,所以1000條數(shù)據(jù)前10條塞給消費(fèi)者1之后筷屡,后邊的數(shù)據(jù)就都塞給消費(fèi)者2了涧偷。設(shè)置prefetch大小的方法:在消費(fèi)者中加入代碼:channel.basicQos(10);

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市毙死,隨后出現(xiàn)的幾起案子燎潮,更是在濱河造成了極大的恐慌,老刑警劉巖规哲,帶你破解...
    沈念sama閱讀 218,386評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件跟啤,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡唉锌,警方通過(guò)查閱死者的電腦和手機(jī)隅肥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)袄简,“玉大人腥放,你說(shuō)我怎么就攤上這事÷逃铮” “怎么了秃症?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,704評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)吕粹。 經(jīng)常有香客問(wèn)我种柑,道長(zhǎng),這世上最難降的妖魔是什么匹耕? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,702評(píng)論 1 294
  • 正文 為了忘掉前任聚请,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘驶赏。我一直安慰自己炸卑,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布煤傍。 她就那樣靜靜地躺著盖文,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蚯姆。 梳的紋絲不亂的頭發(fā)上五续,一...
    開(kāi)封第一講書(shū)人閱讀 51,573評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音蒋失,去河邊找鬼返帕。 笑死,一個(gè)胖子當(dāng)著我的面吹牛篙挽,可吹牛的內(nèi)容都是我干的荆萤。 我是一名探鬼主播,決...
    沈念sama閱讀 40,314評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼铣卡,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼链韭!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起煮落,我...
    開(kāi)封第一講書(shū)人閱讀 39,230評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤敞峭,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后蝉仇,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體旋讹,經(jīng)...
    沈念sama閱讀 45,680評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評(píng)論 3 336
  • 正文 我和宋清朗相戀三年轿衔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了沉迹。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,991評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡害驹,死狀恐怖鞭呕,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情宛官,我是刑警寧澤葫松,帶...
    沈念sama閱讀 35,706評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站底洗,受9級(jí)特大地震影響腋么,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜亥揖,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評(píng)論 3 330
  • 文/蒙蒙 一党晋、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦未玻、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,910評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至昼激,卻和暖如春庇绽,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背橙困。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,038評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工瞧掺, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人凡傅。 一個(gè)月前我還...
    沈念sama閱讀 48,158評(píng)論 3 370
  • 正文 我出身青樓辟狈,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親夏跷。 傳聞我的和親對(duì)象是個(gè)殘疾皇子哼转,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評(píng)論 2 355