一、RabbitMQ簡(jiǎn)介
AMQP昆庇,即Advanced Message Queuing Protocol,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議挨稿,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)速址。
RabbitMQ辕棚,又稱為高性能分布式消息隊(duì)列,它實(shí)現(xiàn)了AMQP標(biāo)準(zhǔn)協(xié)議别凹。
分布式消息隊(duì)列有很多應(yīng)用場(chǎng)景草讶,比如異步處理、應(yīng)用解耦炉菲、流量削峰等堕战。
1、異步處理
用戶注冊(cè)后需要發(fā)送短信和郵件拍霜,傳統(tǒng)做法是先將用戶信息寫入數(shù)據(jù)庫(kù)嘱丢,然后發(fā)送短信、發(fā)送郵件沉御,都完成后返回屿讽。
如果用到消息隊(duì)列昭灵,可以先將用戶信息寫入數(shù)據(jù)庫(kù)吠裆,然后將注冊(cè)信息寫入消息隊(duì)列,發(fā)送短信烂完、發(fā)送郵件或者還有其他的業(yè)務(wù)邏輯都訂閱此消息试疙,完成發(fā)送。
2抠蚣、應(yīng)用解耦
還是上面的例子祝旷,如果在一個(gè)大型分布式網(wǎng)站中,用戶系統(tǒng)嘶窄、短信系統(tǒng)怀跛、郵件系統(tǒng)可能都是獨(dú)立的系統(tǒng)服務(wù)。
這時(shí)候柄冲,在用戶注冊(cè)成功后吻谋,你可以通過(guò)RPC遠(yuǎn)程調(diào)用不同的服務(wù)接口,但更好的做法還是通過(guò)消息隊(duì)列现横,訂閱自己感興趣的數(shù)據(jù),日后就算增加或者刪減功能舍悯,主業(yè)務(wù)都不用變動(dòng)肥荔。
3、流量削峰
一般在秒殺或者團(tuán)購(gòu)活動(dòng)中速种,流量激增,應(yīng)用面臨壓力過(guò)大低千∨湔螅可以在應(yīng)用前端加入消息隊(duì)列,通過(guò)設(shè)置隊(duì)列最大長(zhǎng)度來(lái)限制活動(dòng)人數(shù)示血。這時(shí)候闸餐,后端服務(wù)器就可以游刃有余的處理數(shù)據(jù)了。
二矾芙、消息通信
在AMQP協(xié)議中舍沙,有幾個(gè)基本概念,我們必須先搞明白剔宪。
1拂铡、Virtual host
虛擬主機(jī),每一個(gè)虛擬主機(jī)中包含所有的AMQP基本組件葱绒,用戶感帅、隊(duì)里、交換器等都是在虛擬主機(jī)里面創(chuàng)建地淀。典型的用法是失球,如果公司的多個(gè)產(chǎn)品只想用一個(gè)服務(wù)器,就可以把他們劃分到不同的虛擬主機(jī)中帮毁,里面的任何信息都是獨(dú)立存在实苞,互不干擾。
2烈疚、Connection
連接黔牵,應(yīng)用程序和服務(wù)器之間的TCP連接。
3爷肝、Channel
通道猾浦,當(dāng)你的應(yīng)用程序和服務(wù)器連接之后,就會(huì)創(chuàng)建TCP連接灯抛。一旦打開了TCP連接金赦,就可以創(chuàng)建一個(gè)Channel通道,所以說(shuō)Channel通道是一個(gè)TCP連接的內(nèi)部邏輯單元对嚼。
這是因?yàn)榧锌梗瑒?chuàng)建和銷毀TCP連接是比較昂貴的開銷,每一次訪問(wèn)都建立新的TCP連接的話猪半,不僅是巨大浪費(fèi)兔朦,而且還容易造成系統(tǒng)性能瓶頸偷线。
4、Queue
隊(duì)列沽甥,所有的消息最終都會(huì)被送到這里声邦,等待著被感興趣的人取走。
5摆舟、Exchange
交換器亥曹,消息到達(dá)服務(wù)的第一站就是交換器,然后根據(jù)分發(fā)規(guī)則恨诱,匹配路由鍵媳瞪,將消息放到對(duì)應(yīng)隊(duì)列中。值得注意的是照宝,交換器的類型不止一種蛇受。
Direct
直連交換器,只有在消息中的路由鍵和綁定關(guān)系中的鍵一致時(shí)厕鹃,交換器才把消息發(fā)到相應(yīng)隊(duì)列Fanout
廣播交換器兢仰,只要消息被發(fā)送到廣播交換器,它會(huì)將消息發(fā)到所有的隊(duì)列Topic
主題交換器剂碴,根據(jù)路由鍵把将,通配規(guī)則(*和#),將消息發(fā)到相應(yīng)隊(duì)列
6忆矛、Binding
綁定察蹲,交換器和隊(duì)列之間的綁定關(guān)系,綁定中就包含路由鍵催训,綁定信息被保存到交換器的查詢表中洽议,交換器根據(jù)它分發(fā)消息。
了解到這些組件相關(guān)概念后瞳腌,我們總結(jié)一下來(lái)看看绞铃,一條消息在RabbitMQ中是如何流轉(zhuǎn)的镜雨。
三嫂侍、持久化和發(fā)送方確認(rèn)
1、持久化
事實(shí)上荚坞,上圖所示只是一個(gè)最基本的消息流轉(zhuǎn)過(guò)程挑宠,交換器和隊(duì)列這些組件還有一個(gè)比較重要的屬性:持久化。
默認(rèn)情況下颓影,重啟RabbitMQ服務(wù)器之后各淀,我們創(chuàng)建的交換器和隊(duì)列都會(huì)消失不見,當(dāng)然了诡挂,如果里面還有未來(lái)得及消費(fèi)的數(shù)據(jù)碎浇,也將難于幸免临谱。
持久化交換器和隊(duì)列,為的是在AMQP服務(wù)器重啟之后奴璃,重新創(chuàng)建它們并綁定關(guān)系悉默,在RabbitMQ中,設(shè)置durable屬性為true即可苟穆。
不過(guò)抄课,除了這些還不夠。雖然保證了交換器和隊(duì)列是安全的雳旅,但那些還未來(lái)得及消費(fèi)的數(shù)據(jù)就變得岌岌可危跟磨。所以,我們還要設(shè)置消息的投遞模式為持久的攒盈。
這樣抵拘,如果RabbitMQ服務(wù)器重啟的話,我們的策略和相關(guān)數(shù)據(jù)才會(huì)確保無(wú)憂型豁。所以仑濒,我們說(shuō)能從AMQP服務(wù)器崩潰中恢復(fù)的消息,稱之為持久化消息偷遗。那么墩瞳,它必須保證以下三點(diǎn):
- 設(shè)置投遞模式為持久的
- 交換器為持久的
- 隊(duì)列為持久的
2、發(fā)送方確認(rèn)
到目前為止氏豌,我們已經(jīng)保證了消息的安全性喉酌。但是,還有另外一個(gè)問(wèn)題泵喘。由于發(fā)布操作是不返回任何信息給生產(chǎn)者的泪电,我們?cè)趺粗婪?wù)器是否正確接收消息并持久化到硬盤上了呢?
為此纪铺,我們可以將通道設(shè)置為事務(wù)模式相速。事務(wù)是AMQP標(biāo)準(zhǔn)中的一部分,但RabbitMQ有更好的做法鲜锚,那就是發(fā)送方確認(rèn)模式突诬,publisher confirm。如果設(shè)置了confirm模式芜繁,發(fā)布的消息會(huì)被分配一個(gè)唯一的ID號(hào)旺隙,等消息被投遞給匹配的隊(duì)列后,通道會(huì)發(fā)送一個(gè)發(fā)送方確認(rèn)模式給生產(chǎn)者(包含消息的唯一ID)骏令。
四蔬捷、與Spring整合實(shí)例
廢話了這么多,只是為了下面的代碼部分做下鋪墊榔袋。畢竟周拐,了解到上面內(nèi)容之后铡俐,代碼其實(shí)已經(jīng)快要躍然紙上了。
1妥粟、配置文件
配置文件中我們首先要聲明RabbitMQ服務(wù)器的信息高蜂,IP地址、端口號(hào)罕容、用戶名密碼等备恤,但尤為重要的是,設(shè)置發(fā)布確認(rèn)模式锦秒。
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="127.0.0.1"/>
<property name="username" value="shiqizhen"/>
<property name="password" value="shiqizhen"/>
<property name="port" value="5672"></property>
<property name="virtualHost" value="shiqizhen"></property>
<property name="publisherConfirms" value="true"></property>
<property name="publisherReturns" value="true"></property>
</bean>
接著露泊,還要聲明交換器和隊(duì)列,記得它們是持久化的哦旅择,durable為true惭笑。
<rabbit:admin connection-factory="rabbitConnectionFactory"/>
//隊(duì)列的名字、持久化生真、不要自動(dòng)刪除沉噩、不是獨(dú)享隊(duì)列
<rabbit:queue name="userInfoQueue" durable="true" auto-delete="false" exclusive="false"/>
//交換器,類型為direct柱蟀。并綁定交換器和隊(duì)列的關(guān)系川蒙,路由鍵為10086
<rabbit:direct-exchange name="user-exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="userInfoQueue" key="10086"/>
</rabbit:bindings>
</rabbit:direct-exchange>
最后,配置消費(fèi)者和消息模板
//配置消費(fèi)者 ref為bean的引用 queues指明了消費(fèi)者與隊(duì)列的關(guān)系
//重要的是acknowledge 確認(rèn)模式為手動(dòng)確認(rèn)
<rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual">
<rabbit:listener ref="consumerListener" queues="userInfoQueue" method="onMessage" />
</rabbit:listener-container>
//配置Spring RabbitMQ消息模板
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
<property name="confirmCallback" ref="publisherConfirm"></property>
<property name="returnCallback" ref="returnMsgCallBack"></property>
<property name="mandatory" value="true"></property>
</bean>
2长已、生產(chǎn)者
上面我們聲明了rabbitTemplate畜眨,直接用它的send方法發(fā)送消息即可。不過(guò)它有幾個(gè)參數(shù)必須先要了解下术瓮。
- exchange
交換器名稱康聂,消息發(fā)到哪個(gè)交換器上 - routingKey
路由鍵,交換器怎樣分發(fā)消息到對(duì)應(yīng)隊(duì)列 - Message
消息體對(duì)象胞四,它包含消息的主體和消息屬性恬汁。消息屬性包含很多附屬信息,比如消息內(nèi)容類型辜伟、消息ID氓侧、用戶ID等。 - CorrelationData
消息相關(guān)數(shù)據(jù)游昼,實(shí)際它只有一個(gè)ID的屬性甘苍。不過(guò)很重要,在發(fā)布方確認(rèn)的回調(diào)方法里烘豌,會(huì)帶有這個(gè)參數(shù)。我們可以根據(jù)它很直觀的看到哪條消息發(fā)送成功或失敗看彼。
@Controller
public class IndexController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/send_msg")
@ResponseBody
public User send_msg() {
String exchange = "user-exchange";
String routingKey = "10086";
User user = new User();
String id = IdUtil.getId();
user.setUid(id);
user.setUsername("小小沙彌");
user.setPassword("1234");
user.setCreatetime(DateUtil.getDateTime(new Date()));
CorrelationData correlation = new CorrelationData(id);
Message message = new Message(JSONObject.toJSONBytes(user, SerializerFeature.WriteNullStringAsEmpty), new MessageProperties());
logger.info("已發(fā)送消息到RabbitMQ服務(wù)器:{}",JSONObject.toJSONString(user));
rabbitTemplate.send(exchange, routingKey,message,correlation);
return user;
}
}
3廊佩、消費(fèi)者
消費(fèi)者就是上面我們配置的listener ref引用的Bean囚聚。還記得我們把確認(rèn)模式設(shè)置了手動(dòng)確認(rèn),所以在消費(fèi)者端有個(gè)很重要的動(dòng)作标锄,就是確認(rèn)消息顽铸。
- channel.basicAck(deliveryTag, false)
第一個(gè)參數(shù)是RabbitMQ內(nèi)部產(chǎn)生的消息ID,第二個(gè)參數(shù)代表是否批量確認(rèn)消息料皇。通過(guò)這個(gè)指令我們告訴生產(chǎn)者端谓松,消息已經(jīng)被正確消費(fèi)了,RabbitMQ就會(huì)將此消息在磁盤上刪除践剂。 - channel.basicReject(deliveryTag, false)
拒絕消息鬼譬。如果消費(fèi)到的消息不是我們想要的,或者處理的時(shí)候報(bào)錯(cuò)逊脯,我們可以將消息拒絕优质。但值得注意的是第二個(gè)參數(shù)。如果設(shè)置為false军洼,說(shuō)明拒絕消息并將消息從服務(wù)器上刪除巩螃;如果設(shè)置為true,說(shuō)明拒絕消息并將消息重新放回隊(duì)列匕争。如果你的消費(fèi)者只有一個(gè)避乏,最好不要把它設(shè)置為true,否則消息會(huì)一直重試甘桑,直到把消費(fèi)者端服務(wù)器搞死淑际。如果因?yàn)樘幚硎《芙^的話,最好將消息刪除扇住,同時(shí)將消息記錄到日志文件或者數(shù)據(jù)庫(kù)中春缕。
@Service
public class ConsumerListener implements ChannelAwareMessageListener{
Logger logger = LoggerFactory.getLogger(this.getClass());
public void onMessage(Message message, Channel channel) throws Exception {
logger.info("消費(fèi)者監(jiān)聽到RabbitMQ消息...");
MessageProperties properties = message.getMessageProperties();
String msg = new String(message.getBody(),"utf-8");
logger.info("交換器:{},路由鍵:{}",properties.getReceivedExchange(),properties.getReceivedRoutingKey());
logger.info("消息內(nèi)容:{}",msg);
long deliveryTag = properties.getDeliveryTag();
channel.basicAck(deliveryTag, false);//確認(rèn)信息,false為不批量確認(rèn)
//channel.basicReject(deliveryTag, true);//true為重入隊(duì)列 false為刪除消息
}
}
4、發(fā)送方確認(rèn)
我們發(fā)送消息給RabbitMQ艘蹋,第一站就是交換器锄贼。RabbitMQ是否能正確接收消息,我們就靠它來(lái)反饋女阀。這里的CorrelationData就是在生產(chǎn)者端設(shè)置的宅荤,我們可以將它當(dāng)成消息ID,也可以直接把消息寫入這里浸策。
@Component
public class PublisherConfirm implements ConfirmCallback{
Logger logger = LoggerFactory.getLogger(this.getClass());
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("消息投遞成功!");
}else {
logger.warn("消息投遞失敗,原因:{}冯键,消息ID:{}",cause,correlationData.getId());
}
}
}
如果我們把交換器的名字寫錯(cuò),那么在這里庸汗,你就會(huì)得到以下信息:
22:57:51,635 WARN PublisherConfirm:19 - 消息投遞失敗,原因:
channel error; protocol method: #method<channel.close>
(reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost 'shiqizhen', class-id=60, method-id=40)惫确,
消息ID:516387069669408768
22:57:51,638 ERROR CachingConnectionFactory:1278 - Channel shutdown:
channel error; protocol method:
#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost 'shiqizhen', class-id=60, method-id=40)
5、返回回調(diào)
除了設(shè)置RabbitMQ的發(fā)送方確認(rèn),在Spring中還有一個(gè)publisherReturns值的我們注意改化。雖然我們將消息發(fā)送到了交換器掩蛤,但交換器是否能正確將消息分發(fā)到對(duì)應(yīng)隊(duì)列,還要打個(gè)問(wèn)號(hào)陈肛。如果消息無(wú)法發(fā)送到指定的隊(duì)列揍鸟,那么publisherReturns就會(huì)發(fā)揮作用。記住句旱,如果想應(yīng)用這個(gè)特性阳藻,需要將mandatory設(shè)置為true。
@Component
public class ReturnMsgCallBack implements ReturnCallback{
Logger logger = LoggerFactory.getLogger(this.getClass());
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
logger.info("消息內(nèi)容:{}",new String(message.getBody()));
logger.info("回復(fù)文本:{},回復(fù)代碼:{}",replyText,replyCode);
logger.info("交換器名稱:{},路由鍵:{}",exchange,routingKey);
}
}
如果我們不小心寫錯(cuò)了路由鍵的名字谈撒,那就會(huì)調(diào)用到這里腥泥。
23:24:27,813 INFO ReturnMsgCallBack:16 - 消息內(nèi)容:{"createtime":"2018-11-25 23:24:24","password":"1234","role":null,"uid":"516393749815754752","username":"小小沙彌"}
23:24:27,814 INFO ReturnMsgCallBack:17 - 回復(fù)文本:NO_ROUTE,回復(fù)代碼:312
23:24:27,814 INFO ReturnMsgCallBack:18 - 交換器名稱:user-exchange,路由鍵:10086_xxx
//這里是發(fā)送方確認(rèn)打印的信息 說(shuō)投遞到交換器成功
23:24:27,814 INFO PublisherConfirm:17 - 消息投遞成功!
有個(gè)問(wèn)題,如同第一個(gè)例子港华,如果寫錯(cuò)了路由鍵的名稱道川,發(fā)送方確認(rèn)會(huì)打印ack為false的異常信息,但為什么不會(huì)調(diào)用到publisherReturns呢立宜?
如果路由鍵錯(cuò)誤冒萄,說(shuō)明消息壓根就沒(méi)有被接收到。這肯定是一個(gè)嚴(yán)重錯(cuò)誤橙数,所以RabbitMQ直接把當(dāng)前通道關(guān)閉了尊流。
Channel shutdown:
channel error; protocol method:
reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost ...
五、監(jiān)聽RabbitMQ服務(wù)器狀態(tài)
如果你的RabbitMQ服務(wù)不是一個(gè)集群灯帮,那么當(dāng)網(wǎng)絡(luò)故障或其他原因?qū)е翿abbitMQ服務(wù)停掉的時(shí)候崖技,我們?cè)趺醋瞿兀慨?dāng)然钟哥,你可以在Send方法中加入try/catch迎献,根據(jù)catch信息返回你的狀態(tài)。但有個(gè)更好的思路腻贰,可以結(jié)合使用吁恍。
在創(chuàng)建RabbitMQ服務(wù)連接的時(shí)候,我們要配置一個(gè)Bean播演,CachingConnectionFactory
它有個(gè)方法addConnectionListener
冀瓦,我們可以利用它來(lái)監(jiān)聽服務(wù)器的連接狀態(tài)。
public class RabbitMQConnectionListener implements ConnectionListener{
public void onCreate(Connection connection) {
System.out.println("服務(wù)器已啟動(dòng)...");
}
public void onClose(Connection connection) {
System.out.println("服務(wù)器已關(guān)閉...");
}
}
并在合適的位置写烤,比如Spring容器初始化方法里翼闽,加入這么一句rabbitConnectionFactory.addConnectionListener(new RabbitMQConnectionListener());
這樣,我們就可以掌握RabbitMQ服務(wù)器的連接狀態(tài)了洲炊,那么我們就可以根據(jù)此狀態(tài)感局,在生產(chǎn)者方調(diào)用send方法的時(shí)候尼啡,判斷此狀態(tài)。如果未連接蓝厌,可以先將消息保存到數(shù)據(jù)庫(kù)或者緩存中玄叠。當(dāng)連接到RabbitMQ古徒,我們先把緩存的消息拿出來(lái)發(fā)送拓提,再將此狀態(tài)重置為已連接。
六隧膘、總結(jié)
本文簡(jiǎn)單介紹了AMQP協(xié)議標(biāo)準(zhǔn)中的相關(guān)概念代态,以及RabbitMQ在Spring中如何正確配置使用持久化消息、發(fā)送方模式和返回回調(diào)等機(jī)制疹吃。并在最后蹦疑,介紹了在Spring中如何監(jiān)聽RabbitMQ的服務(wù)器連接狀態(tài)∪唬總而言之一句話歉摧,我們將要怎樣使用RabbitMQ叁温,才能保證消息不會(huì)丟失。希望本文對(duì)你使用RabbitMQ有所幫助!