1 消息隊(duì)列中間件簡介
消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息,流量削鋒等問題實(shí)現(xiàn)高性能阁将,高可用,可伸縮和最終一致性[架構(gòu)] 使用較多的消息隊(duì)列有ActiveMQ右遭,RabbitMQ做盅,ZeroMQ,Kafka窘哈,MetaMQ吹榴,RocketMQ
以下介紹消息隊(duì)列在實(shí)際應(yīng)用中常用的使用場景:異步處理,應(yīng)用解耦宵距,流量削鋒和消息通訊四個(gè)場景
1.2什么是RabbitMQ
RabbitMQ 是一個(gè)由 Erlang 語言開發(fā)的AMQP 的開源實(shí)現(xiàn)腊尚。
AMQP :Advanced Message Queue,高級(jí)消息隊(duì)列協(xié)議满哪。它是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn)婿斥,為面向消息的中間件設(shè)計(jì),基于此協(xié)議的客戶端與消息中間件可傳遞消息哨鸭,并不受產(chǎn)品民宿、開發(fā)語言等條件的限制。
RabbitMQ 最初起源于金融系統(tǒng)像鸡,用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息活鹰,在易用性、擴(kuò)展性只估、高可用性等方面表現(xiàn)不俗志群。具體特點(diǎn)包括:
1.可靠性(Reliability)
RabbitMQ 使用一些機(jī)制來保證可靠性,如持久化蛔钙、傳輸確認(rèn)锌云、發(fā)布確認(rèn)。
2.靈活的路由(Flexible Routing)
在消息進(jìn)入隊(duì)列之前吁脱,通過Exchange 來路由消息的桑涎。對(duì)于典型的路由功能彬向,RabbitMQ已經(jīng)提供了一些內(nèi)置的Exchange 來實(shí)現(xiàn)。針對(duì)更復(fù)雜的路由功能攻冷,可以將多個(gè)Exchange 綁定在一起娃胆,也通過插件機(jī)制實(shí)現(xiàn)自己的Exchange 。
3.消息集群(Clustering)
多個(gè)RabbitMQ 服務(wù)器可以組成一個(gè)集群等曼,形成一個(gè)邏輯Broker 里烦。
4.高可用(Highly Available Queues)
隊(duì)列可以在集群中的機(jī)器上進(jìn)行鏡像,使得在部分節(jié)點(diǎn)出問題的情況下隊(duì)列仍然可用禁谦。
5.多種協(xié)議(Multi-protocol)
RabbitMQ 支持多種消息隊(duì)列協(xié)議招驴,比如STOMP、MQTT 等等枷畏。
6.多語言客戶端(Many Clients)
RabbitMQ 幾乎支持所有常用語言,比如Java虱饿、.NET拥诡、Ruby 等等。
7.管理界面(Management UI)
RabbitMQ 提供了一個(gè)易用的用戶界面氮发,使得用戶可以監(jiān)控和管理消息Broker 的許多方面渴肉。
8.跟蹤機(jī)制(Tracing)
如果消息異常,RabbitMQ 提供了消息跟蹤機(jī)制爽冕,使用者可以找出發(fā)生了什么仇祭。
9.插件機(jī)制(Plugin System)
RabbitMQ 提供了許多插件,來從多方面進(jìn)行擴(kuò)展颈畸,也可以編寫自己的插件乌奇。
1.3.2?主要概念
RabbitMQ Server:也叫broker server,它是一種傳輸服務(wù)眯娱。他的角色就是維護(hù)一條從Producer到Consumer的路線礁苗,保證數(shù)據(jù)能夠按照指定的方式進(jìn)行傳輸。
Producer:消息生產(chǎn)者徙缴,如圖A试伙、B、C于样,數(shù)據(jù)的發(fā)送方疏叨。消息生產(chǎn)者連接RabbitMQ服務(wù)器然后將消息投遞到Exchange。
Consumer:消息消費(fèi)者穿剖,如圖1蚤蔓、2、3携御,數(shù)據(jù)的接收方昌粤。消息消費(fèi)者訂閱隊(duì)列既绕,RabbitMQ將Queue中的消息發(fā)送到消息消費(fèi)者。
Exchange:生產(chǎn)者將消息發(fā)送到Exchange(交換器)涮坐,由Exchange將消息路由到一個(gè)或多個(gè)Queue中(或者丟棄)凄贩。Exchange并不存儲(chǔ)消息。RabbitMQ中的Exchange有direct袱讹、fanout疲扎、topic、headers四種類型捷雕,每種類型對(duì)應(yīng)不同的路由規(guī)則椒丧。
Queue:(隊(duì)列)是RabbitMQ的內(nèi)部對(duì)象,用于存儲(chǔ)消息救巷。消息消費(fèi)者就是通過訂閱隊(duì)列來獲取消息的壶熏,RabbitMQ中的消息都只能存儲(chǔ)在Queue中,生產(chǎn)者生產(chǎn)消息并最終投遞到Queue中浦译,消費(fèi)者可以從Queue中獲取消息并消費(fèi)棒假。多個(gè)消費(fèi)者可以訂閱同一個(gè)Queue,這時(shí)Queue中的消息會(huì)被平均分?jǐn)偨o多個(gè)消費(fèi)者進(jìn)行處理精盅,而不是每個(gè)消費(fèi)者都收到所有的消息并處理帽哑。
RoutingKey:生產(chǎn)者在將消息發(fā)送給Exchange的時(shí)候,一般會(huì)指定一個(gè)routing key叹俏,來指定這個(gè)消息的路由規(guī)則妻枕,而這個(gè)routing key需要與Exchange Type及binding key聯(lián)合使用才能最終生效。在Exchange Type與binding key固定的情況下(在正常使用時(shí)一般這些內(nèi)容都是固定配置好的)粘驰,我們的生產(chǎn)者就可以在發(fā)送消息給Exchange時(shí)屡谐,通過指定routing key來決定消息流向哪里。RabbitMQ為routing key設(shè)定的長度限制為255bytes晴氨。
Connection:(連接):Producer和Consumer都是通過TCP連接到RabbitMQ Server的康嘉。以后我們可以看到,程序的起始處就是建立這個(gè)TCP連接籽前。
Channels:(信道):它建立在上述的TCP連接中亭珍。數(shù)據(jù)流動(dòng)都是在Channel中進(jìn)行的。也就是說枝哄,一般情況是程序起始建立TCP連接肄梨,第二步就是建立這個(gè)Channel。
VirtualHost:權(quán)限控制的基本單位挠锥,一個(gè)VirtualHost里面有若干Exchange和MessageQueue众羡,以及指定被哪些user使用
3.3.2代碼實(shí)現(xiàn)
[if !supportLists](1)[endif]在UserService中新增方法,用于發(fā)送短信驗(yàn)證碼
@Autowired?
private?RedisTemplate?redisTemplate;
@Autowired?
private?RabbitTemplate rabbitTemplate;
/**
?* 發(fā)送短信驗(yàn)證碼
?* @param?mobile 手機(jī)號(hào)
?*/?
public?void?sendSms(String mobile){
//1.生成6位短信驗(yàn)證碼
Random random=new?Random();
int?max=999999;//最大數(shù)
int?min=100000;//最小數(shù)
int?code?= random.nextInt(max);//隨機(jī)生成
if(code?< min){
code=code+min;
}
System.out.println(mobile+"收到驗(yàn)證碼是:"+code);
//2.將驗(yàn)證碼放入redis?
redisTemplate.opsForValue().set("smscode_"+mobile, code+""?,5, TimeUnit.MINUTES?);//五分鐘過期
//3.將驗(yàn)證碼和手機(jī)號(hào)發(fā)動(dòng)到rabbitMQ中
Map<String,String> map=new?HashMap();
map.put("mobile",mobile);
map.put("code",code+"");
rabbitTemplate.convertAndSend("sms",map);
}
[if !supportLists](2)[endif]UserController新增方法
/**
* 發(fā)送短信驗(yàn)證碼
* @param mobile
*/
@RequestMapping(value="/sendsms/{mobile}",method=RequestMethod.POST)
public Result sendsms(@PathVariable String mobile ){
userService.sendSms(mobile);
return new Result(true,StatusCode.OK,"發(fā)送成功");
}