RabbitMq基本組件
一、 channel 信道:
信道是生產(chǎn)消費(fèi)者與rabbit通信的渠道,生產(chǎn)者publish或是消費(fèi)者subscribe一個(gè)隊(duì)列都是通過(guò)信道來(lái)通信的。信道是建立在TCP連接上的虛擬連接,什么意思呢?就是說(shuō)rabbitmq在一條TCP上建立成百上千個(gè)信道來(lái)達(dá)到多個(gè)線程處理熊赖,這個(gè)TCP被多個(gè)線程共享,每個(gè)線程對(duì)應(yīng)一個(gè)信道虑椎,信道在rabbit都有唯一的ID ,保證了信道私有性震鹉,對(duì)應(yīng)上唯一的線程使用。
怎么理解捆姜?和數(shù)據(jù)庫(kù)連接池的連接類似传趾。TCP連接不關(guān)閉,支持多次和mq服務(wù)端通信泥技。
疑問(wèn):為什么不建立多個(gè)TCP連接呢浆兰?原因是rabbit保證性能,系統(tǒng)為每個(gè)線程開(kāi)辟一個(gè)TCP是非常消耗性能零抬,每秒成百上千的建立銷毀TCP會(huì)嚴(yán)重消耗系統(tǒng)镊讼。所以rabbitmq選擇建立多個(gè)信道(建立在tcp的虛擬連接)連接到rabbit上。
類似概念:TCP是電纜平夜,信道就是里面的光纖,每個(gè)光纖都是獨(dú)立的卸亮,互不影響忽妒。
二、exchange 交換機(jī)和綁定routing key
exchange的作用就是類似路由器,routing key 就是路由鍵段直,服務(wù)器會(huì)根據(jù)路由鍵將消息從交換器路由到隊(duì)列上去吃溅。
exchange有多個(gè)種類:direct,fanout鸯檬,topic决侈,header(非路由鍵匹配,功能和direct類似喧务,很少用)赖歌。前三種類似集合對(duì)應(yīng)關(guān)系那樣,(direct)1:1,(fanout)1:N,(topic)N:1
direct: 1:1類似完全匹配
fanout:1:N 可以把一個(gè)消息并行發(fā)布到多個(gè)隊(duì)列上去功茴,簡(jiǎn)單的說(shuō)就是庐冯,當(dāng)多個(gè)隊(duì)列綁定到fanout的交換器,那么交換器一次性拷貝多個(gè)消息分別發(fā)送到綁定的隊(duì)列上,每個(gè)隊(duì)列有這個(gè)消息的副本坎穿。
ps:這個(gè)可以在業(yè)務(wù)上實(shí)現(xiàn)并行處理多個(gè)任務(wù)展父,比如,用戶上傳圖片功能玲昧,當(dāng)消息到達(dá)交換器上栖茉,它可以同時(shí)路由到積分增加隊(duì)列和其它隊(duì)列上,達(dá)到并行處理的目的孵延,并且易擴(kuò)展衡载,以后有什么并行任務(wù)的時(shí)候,直接綁定到fanout交換器不需求改動(dòng)之前的代碼隙袁。
topic N:1 痰娱,多個(gè)交換器可以路由消息到同一個(gè)隊(duì)列。根據(jù)模糊匹配菩收,比如一個(gè)隊(duì)列的routing key 為*.test 梨睁,那么凡是到達(dá)交換器的消息中的routing key 后綴.test都被路由到這個(gè)隊(duì)列上。
三娜饵、結(jié)合信道坡贺、交換器和路由鍵到隊(duì)列
總結(jié)幾點(diǎn)重要知識(shí):1.信道才是rabbit通信本質(zhì),生產(chǎn)者和消費(fèi)者都是通過(guò)信道完成消息生產(chǎn)消費(fèi)的箱舞。2.交換器本質(zhì)是一張路由查詢表(名稱和隊(duì)列id遍坟,類似于hash表),這是一個(gè)虛擬出來(lái)的東西晴股,并不存在真實(shí)的交換器愿伴。
消息的生命周期:生產(chǎn)者生產(chǎn)消息A 交由信道,信道通過(guò)消息(消息由載體和標(biāo)簽)的標(biāo)簽(路由鍵)放到交換器發(fā)送到隊(duì)列上(其實(shí)就是查詢匹配电湘,一旦匹配到了規(guī)則隔节,信道就直接和隊(duì)列產(chǎn)生連接鹅经,然后將消息發(fā)送過(guò)去)
mq消息可靠性分析
如何保證mq中的消息穩(wěn)定可靠地被使用消費(fèi)?
1 消息從產(chǎn)生到被消費(fèi)經(jīng)過(guò)以下幾步:
1 消息生產(chǎn)者產(chǎn)生發(fā)送到exchang
2 交換機(jī)根據(jù)路由規(guī)則將消息轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列上
3 消息在隊(duì)列上存儲(chǔ)
4 消費(fèi)者consumer訂閱隊(duì)列queue進(jìn)行消費(fèi)
如何保證上訴4步的可靠性
一消息從生產(chǎn)者發(fā)出到交換機(jī)需要后 交換機(jī)要返回確認(rèn)消息
以保證 生產(chǎn)者感知到消息是否正確發(fā)送到交換器中怎诫,如果失敗瘾晃,生產(chǎn)者可以進(jìn)一步處理重新發(fā)送。
確認(rèn)機(jī)制
1.開(kāi)啟事務(wù)
RabbiMQ客戶端與事務(wù)機(jī)制相關(guān)的方法有三個(gè):
channel.txSelect
用于將當(dāng)前信道轉(zhuǎn)換為事務(wù)模式幻妓。
channel.txCommit
用于提交事務(wù)
channel.txRollback
用于事務(wù)回滾
通過(guò)select方法開(kāi)啟事務(wù)后便可以發(fā)消息到RabbitMQ 如果事務(wù)提交成功蹦误,那么消息一定到了MQ中,如果事務(wù)提交之前異常報(bào)錯(cuò)肉津,便可以將其捕獲强胰,通過(guò)channel.txRollBack方法回滾。注意這里的事務(wù)與數(shù)據(jù)庫(kù)中事務(wù)的區(qū)別以及兩者共用時(shí)的情況阀圾,見(jiàn)后面補(bǔ)充A兩種事務(wù)哪廓。
缺點(diǎn):事務(wù)機(jī)制可以確保消息一定提交到了MQ中,或回滾重發(fā)初烘,但使用事務(wù)機(jī)制會(huì)吸干MQ的性能涡真。
2.發(fā)送方確認(rèn)機(jī)制(publisher confirm)
生產(chǎn)者將信道設(shè)置為確認(rèn)模式,一旦信道進(jìn)入confirm模式肾筐,所有在該信道發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID,一旦消息被投遞到所有匹配的隊(duì)列之后哆料,RabbitMQ就會(huì)發(fā)送一個(gè)確認(rèn)Basic.Ack給生產(chǎn)者包含消息的唯一ID,這就使得生產(chǎn)者確認(rèn)消息正確到達(dá)目的地了。
RabbitMq回傳的生產(chǎn)者確認(rèn)消息中的deliveryTag包含了確認(rèn)消息的序號(hào)吗铐,此外RabbitMQ也可以設(shè)置channel.basicAck方法中的multiple參數(shù)东亦,表示這個(gè)序列號(hào)之前的消息都得到了處理。
相比于事務(wù)機(jī)制在一條消息發(fā)送后會(huì)使得發(fā)送端阻塞唬渗,發(fā)送確認(rèn)機(jī)制最大的好處是它是異步的典阵,一旦發(fā)送一條消息,生產(chǎn)者可以在等待信道返回確認(rèn)的同時(shí)發(fā)送下一條消息镊逝,當(dāng)消息得到最終確認(rèn)后壮啊,生產(chǎn)者可以通過(guò)回調(diào)處理消息,而因?yàn)殄e(cuò)誤導(dǎo)致消息丟失撑蒜,信道會(huì)返回一條nac看(Basic.Nack)命令歹啼,生產(chǎn)者通可以通過(guò)回調(diào)處理它。
具體方法:
channel.confirmSelect
即Confirm.Select命令將信道設(shè)置為confirm模式座菠。所有被發(fā)送的消息都會(huì)被ack或nack一次狸眼,但消息被confim的快慢沒(méi)有任何保證。
異步的確認(rèn)機(jī)制雖然提高了性能浴滴,但也存在隱患拓萌,因?yàn)楫惒骄褪且环N事后發(fā)現(xiàn)機(jī)制。這一點(diǎn)見(jiàn)補(bǔ)充B異步確認(rèn)與自動(dòng)重試巡莹。
注意:
事務(wù)機(jī)制和publisher confirm機(jī)制兩者是互斥的司志,不能共存甜紫。
若將已開(kāi)啟事務(wù)的信道再設(shè)置為publisher confirm模式降宅,RabbitMQ會(huì)報(bào)錯(cuò):{amqp_error, precondition_failed, "cannot switch from tx to confirm mode", 'confirm.select'}
若將已開(kāi)啟publisher confirm模式的信道再設(shè)置事務(wù)模式的話骂远,RabbitMQ會(huì)報(bào)錯(cuò){amqp_error, precondition_failed, "cannot switch from confirm to tx mode", 'tx.select' }
事務(wù)機(jī)制和publisher confirm模式可以確保消息能夠正常的發(fā)送到RabbitMQ的交換器,但如果交換器沒(méi)有能夠正確的匹配到隊(duì)列腰根,消息也會(huì)丟失激才。
這就需要下面將要講的其他機(jī)制進(jìn)行一把騷操作來(lái)確保可靠性额嘿。
二 交換機(jī)將消息路由到隊(duì)列時(shí)的可靠性機(jī)制
路由失敗返回生產(chǎn)者
channel.basicPublish
中有兩個(gè)重要的參數(shù)mandatory(強(qiáng)制)瘸恼,和immediate(立刻),兩者都可以將不可達(dá)目的地的消息返回給生產(chǎn)者册养。
RabbitMQ去掉了對(duì)immediate的參數(shù)支持东帅,因?yàn)樗鼤?huì)影響到鏡像隊(duì)列的性能,增加代碼復(fù)雜性球拦,建議用TTL和DLX方法替代靠闭。
當(dāng)mandatory設(shè)置為true后,當(dāng)消息無(wú)法被交換器根據(jù)自身類型和路由鍵路由到某個(gè)隊(duì)列的話坎炼,RabbitMQ會(huì)調(diào)用Basic.Return命令將消息返回給生產(chǎn)者愧膀。
生產(chǎn)者可以通過(guò)調(diào)用channel.addReturnListener來(lái)添加ReturnListener的監(jiān)聽(tīng)實(shí)現(xiàn),代碼為:
channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP
.BasicProperties basicProperties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return返回的結(jié)果是:" + message);
}
});
上面代碼中生產(chǎn)者沒(méi)有成功的將消息路由到隊(duì)列谣光,此時(shí)RabbitMQ會(huì)通過(guò)Basic.Return返回“mandatory test”這條消息檩淋,之后生產(chǎn)者客戶端通過(guò)ReturnListener監(jiān)聽(tīng)到了這個(gè)事件,上面代碼的最后輸出應(yīng)該是“Basic.Return返回的結(jié)果是:mandatory test”萄金。
生產(chǎn)者拿到路由失敗的消息可以重試或進(jìn)行其他處理蟀悦。
當(dāng)mandatory設(shè)置為false后,這樣的消息則會(huì)被直接丟棄氧敢。
路由失敗存儲(chǔ)起來(lái)
此外RabbitMQ提供了備份交換器AE(Alternate Exchange)可以將不能被l路由的消息存儲(chǔ)起來(lái)而不返回給生產(chǎn)者日戈。
設(shè)置mandatory監(jiān)聽(tīng)實(shí)現(xiàn)的代碼較為復(fù)雜,可以使用備份路由器來(lái)避免消息丟失福稳。
再在需要的時(shí)候去處理這些消息涎拉。
可以在聲明交換器時(shí)調(diào)用channel.exchangeDeclare
方法時(shí)添加alternate-exchange參數(shù)來(lái)實(shí)現(xiàn),也可以通過(guò)設(shè)置策略的方式實(shí)現(xiàn)的圆,謙和的優(yōu)先級(jí)更高會(huì)覆蓋后面policy的設(shè)置鼓拧。路由策略設(shè)置見(jiàn)補(bǔ)充C。
備份交換器和普通路由器沒(méi)有太大區(qū)別越妈,被重新發(fā)送到備用路由器的路由鍵和從生產(chǎn)者發(fā)出的路由鍵相同季俩,備用交換器建議設(shè)置為fanout,且可以將一個(gè)備份路由器作為所有路由器的AE梅掠。但AE要確保綁定好了隊(duì)列酌住。如果mandatory參數(shù)和備份交換器一起使用那么mandatory失效店归。
這樣一來(lái)確保了第二部從路由器到隊(duì)列的可靠性。
三消息存入隊(duì)列之后的可靠性保障
持久化可以提高隊(duì)列的可靠性酪我,以防在異常(重啟消痛,關(guān)閉,宕機(jī))下的數(shù)據(jù)丟失都哭。
消息持久化與隊(duì)列持久化
隊(duì)列的持久化是在聲明隊(duì)列時(shí)秩伞,將durable設(shè)置為true實(shí)現(xiàn)的,若隊(duì)列不設(shè)置持久化欺矫,那么當(dāng)MQ服務(wù)重啟時(shí)纱新,相關(guān)隊(duì)列的元數(shù)據(jù)都會(huì)丟失,隊(duì)列中的數(shù)據(jù)也會(huì)丟失穆趴。
隊(duì)列的持久化能保證本身的元數(shù)據(jù)不會(huì)因?yàn)楫惓G失脸爱,但不能保證存儲(chǔ)的消息不丟失。
消息的持久化未妹,需要通過(guò)將消息的投遞模式BasicProperties中的deliveryMode屬性設(shè)置為2即可實(shí)現(xiàn)消息的持久化簿废。
隊(duì)列和消息的持久化都設(shè)置了,MQ服務(wù)重啟后消息仍然存在教寂,單獨(dú)設(shè)消息持久化捏鱼,隊(duì)列會(huì)在重啟后丟失,進(jìn)而消息丟失酪耕。
單獨(dú)設(shè)隊(duì)列持久化导梆,重啟后隊(duì)列存在但消息會(huì)丟失。
消息進(jìn)入MQ后需要一段時(shí)間才能存入磁盤中迂烁。MQ不為每條消息同步磁盤看尼,可能僅僅保存到操作系統(tǒng)緩存而不是物理緩存中,若在期間出現(xiàn)宕機(jī)盟步,重啟等異常情況藏斩,消息還沒(méi)有落盤,消息也會(huì)丟失却盘。
注意
在第一步中的事務(wù)機(jī)制或publisher confim機(jī)制狰域,服務(wù)端返回是在消息落盤之后執(zhí)行的,可以進(jìn)一步提高消息的可靠性黄橘。
這里的落盤返回并不和上面第一部說(shuō)的第一步只到exchange矛盾兆览,沒(méi)有綁定隊(duì)列就直接返回 綁定了就落盤返回 相當(dāng)于sync執(zhí)行一個(gè)方法,沒(méi)有綁定就是空方法塞关,綁定了就是實(shí)方法
鏡像隊(duì)列
鏡像隊(duì)列相當(dāng)于提供了副本抬探,即多副本保證高可用HA。當(dāng)主節(jié)點(diǎn)掛了帆赢,自動(dòng)切到子節(jié)點(diǎn)小压,除非整個(gè)集群掛了线梗,可以保證高可靠性。
四 消費(fèi)者消費(fèi)消息的可靠性
要確保消費(fèi)者完成了對(duì)消息的處理后再?gòu)年?duì)列中刪除消息怠益,隊(duì)列需要得到消費(fèi)者消費(fèi)的確認(rèn)消息仪搔,若消費(fèi)者消費(fèi)失敗則重新入隊(duì)。
消息確認(rèn)機(jī)制 message acknowledgement
消費(fèi)者在訂閱隊(duì)列時(shí)可以指定autoAck參數(shù)溉痢。
當(dāng)ack為false時(shí)僻造,RabbitMQ會(huì)等待消費(fèi)者顯示的回復(fù)了信號(hào)后才從內(nèi)存或磁盤中刪除(先打上標(biāo)記 之后再集中刪除)
當(dāng)ack為false時(shí)RabbitMq會(huì)把自動(dòng)發(fā)送出去的消息置為確認(rèn)憋他,然后刪除孩饼,不管消費(fèi)者是否真正的消費(fèi)了這些消息。
采用消息確認(rèn)機(jī)制后竹挡,RabbitMQ會(huì)一直等待持有消息的消費(fèi)者顯示的調(diào)用Basic.ACK命令為止镀娶,消費(fèi)者有足夠時(shí)間處理消息。
而此時(shí)RabbitMQ中的消息分為兩部分:一部分是等待投遞的消息揪罕,另一部分是已經(jīng)投遞給消費(fèi)者但還沒(méi)有收到確認(rèn)消息的消息梯码,若一直沒(méi)有收到確認(rèn)消息且此消息的消費(fèi)者已經(jīng)斷開(kāi)連接那么RabbitMQ會(huì)安排消息重新入隊(duì)等待投遞給笑一個(gè)消費(fèi)者。
RabbitMQ不會(huì)為未確認(rèn)的消息設(shè)置過(guò)期時(shí)間好啰,他判斷是否需要重新投遞給現(xiàn)在的唯一依據(jù)時(shí)該消息的消費(fèi)者鏈接是否已經(jīng)斷開(kāi)轩娶,未確認(rèn)的消息可以存在很久。
若消息消費(fèi)失敗可以調(diào)用Basic.Reject或Basic.Nack來(lái)拒絕當(dāng)前消息框往,若只是拒絕那么消息會(huì)丟失鳄抒,若將相應(yīng)的requeue參數(shù)設(shè)置為true,那么RabbitMQ會(huì)重新將這條消息插入隊(duì)列椰弊,重新投遞许溅;若設(shè)置requeue參數(shù)設(shè)置為false,RabbitMQ會(huì)把消息立即從隊(duì)列中移除不再發(fā)送秉版。
注意requeue的消息會(huì)存入隊(duì)列頭部贤重,可以快速的又被發(fā)送給消費(fèi)者,若消費(fèi)者又不能正常的消費(fèi)而又requeue就會(huì)加入一個(gè)無(wú)盡的循環(huán)清焕。
因此出現(xiàn)無(wú)法正常消費(fèi)的消息市不要采用requeue發(fā)生來(lái)確辈⒒龋可靠性,而是重新投遞到新的隊(duì)列中比如設(shè)置死信隊(duì)列秸妥,可以確保消息不丟失而避免死循環(huán)滚停。
對(duì)于死信隊(duì)列可以用另外的方式消費(fèi),找出問(wèn)題筛峭。