RabbitMQ進階攻略抹蚀,分析RabbitMQ的典型應用場景,詳解RabbitMQ的運作流程
0企垦、引言
上次課我們僅說了RabbitMQ是一種消息中間件环壤,可以異步的處理客戶端發(fā)送的消息,但是我們并沒有將什么是消息中間件钞诡,使用消息中間件會給我們的系統帶來怎樣的性能提升郑现?這次課我們就來說一下什么是消息中間件,我們通過一個案例來引出消息中間件荧降,說明消息中間件在我們這個系統中扮演什么樣重要的角色接箫?
1、什么是消息中間件
上節(jié)課我們僅說了什么是RabbitMQ朵诫,但并沒有說什么是消息中間件辛友,為什么要使用消息中間件?
1)問題引入:
假設我們現在需要設計這樣一個用戶注冊系統:用戶注冊完成后剪返,需要給用戶發(fā)送激活郵件瞎领,開通用戶賬號,記錄用戶IP随夸、用戶設備、時間等信息震放。
起初的設計:
2)但存在的問題是:
由于多個系統強耦合在一起宾毒,用戶注冊響應會非常慢,嚴重影響了用戶的體驗殿遂,當流量大的時候诈铛,性能會更差乙各。
3)引入消息中間件:
為了解決上述問題,我們引入消息中間件幢竹,來實現系統的解耦耳峦,多個系統間通過消息中間件進行異步通信,最終的設計圖如下:
即實現了系統解耦焕毫,又提升了系統響應的速度
4)消息中間件介紹:
消息中間件(Message Queue Middleware蹲坷,簡稱MQ)又稱為消息隊列,是指利用高效可靠的消息傳遞機制進行與平臺無關的數據交流邑飒,并基于數據通信來進行分布式系統的構建循签。
2、應用場景分析
1)異步通信
在很多時候疙咸,為了加快應用系統整體運轉速度县匠,并不需要立即響應某些請求,消息中間件提供了異步處理機制撒轮,允許將一些請求信息放入消息中間件中乞旦,但并不立即處理它,而是慢慢處理题山。在有限資源下兰粉,使用消息中間件能夠使系統性能從容倍增!
如:用戶注冊成功的郵件通知臀蛛;用戶購物下單的信息通知亲桦;大數據日志收集處理
2)削峰
以防突發(fā)劇增流量瞬間沖垮系統,使用消息中間件可以支撐突發(fā)訪問壓力
3)業(yè)務系統解耦
系統間的耦合關系太強浊仆,會對系統的設計產生束縛客峭,也會增加系統的復雜性,通過消息中間件可以更好的設計系統抡柿,是一個系統完成指定的功能舔琅,而不是將所有的功能融合在同一個系統中。
3洲劣、回顧下上節(jié)課的內容
關于上節(jié)課說的binding key备蚓、routing key,我這里再解釋下囱稽,有點難以理解
其實兩者是一樣的郊尝,只不過是不同的叫法而已,我們可以統一叫它routing key
4战惊、詳解RabbitMQ運作流程
相對于上節(jié)課講的概念流昏,又增加了virtual host、connection、channel
Virtual Host:?每個virtual host本質上都是一個RabbitMQ Server(但是一個server中可以有多個virtual host)况凉,擁有它自己若干的個Exchange谚鄙、Queue和bings rule等等。其實這是一個虛擬概念刁绒,類似于權限控制組闷营。Virtual Host是權限控制的最小粒度。
Connection:??就是一個TCP的連接知市。Producer和Consumer都是通過TCP連接到RabbitMQ Server的傻盟。接下來的實踐案例中我們就可以看到,producer和consumer與exchange的通信的前提是先建立TCP連接初狰。
僅僅創(chuàng)建了TCP連接莫杈,producer和consumer與exchange還是不能通信的。我們還需要為每一個Connection創(chuàng)建Channel奢入。
Channel:??它是建立在上述TCP連接之上的虛擬連接筝闹。數據傳輸都是在Channel中進行的。AMQP協議規(guī)定只有通過Channel才能執(zhí)行AMQP的命令腥光。一個Connection可以包含多個Channel关顷。
有人要問了,為什么要使用Channel呢武福,直接用TCP連接不就好了么议双?
對于一個消息服務器來說,它的任務是處理海量的消息捉片,當有很多線程需要從RabbitMQ中消費消息或者生產消息平痰,那么必須建立很多個connection,也就是許多個TCP連接伍纫。然而對于操作系統而言宗雇,建立和關閉TCP連接是非常昂貴的開銷,而且TCP的連接數也有限制莹规,頻繁的建立關閉TCP連接對于系統的性能有很大的影響赔蒲,如果遇到高峰,性能瓶頸也隨之顯現良漱。RabbitMQ采用類似NIO的做法舞虱,選擇TCP連接服用,不僅可以減少性能開銷母市,同時也便于管理矾兜。在TCP連接中建立Channel是沒有上述代價的,可以復用TCP連接患久。對于Producer或者Consumer來說椅寺,可以并發(fā)的使用多個Channel進行Publish或者Receive舶沿。
有實驗表明,在Channel中配并,1秒可以Publish10K的數據包。對于普通的Consumer或者Producer來說高镐,這已經足夠了溉旋。除非有非常大的流量時,一個connection可能會產生性能瓶頸嫉髓,此時就需要開辟多個connection观腊。
5、實戰(zhàn)演練生產算行、消費梧油,串講整個運作流程
代碼倉庫:https://gitee.com/jikeh/JiKeHCN-RELEASE.git
項目名:rabbitmqdemo
項目代碼中有詳細的運轉流程注釋,這里就不細講了……
查看Tcp連接:
查看信道連接:
6州邢、拓展:使用默認交換器完成最簡單的案例
代碼倉庫:https://gitee.com/jikeh/JiKeHCN-RELEASE.git
項目名:
生產者(producer)把消息發(fā)送到一個名為“hello”的隊列中儡陨。
消費者(consumer)從這個隊列中獲取消息。
(1)引入jar包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>LATEST</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.6</version>
</dependency>
(2)生產者發(fā)送消息
0)聲明常量參數
privatestaticfinalStringQUUE_NAME="hello";
privatestaticfinalStringIP_ADDRESS="localhost";
privatestaticfinalintPORT=5672;//默認端口號為5672
?
privatestaticfinalStringUSER_NAME="guest";
privatestaticfinalStringUSER_PWD="guest";
1)創(chuàng)建和RabbitMQ的連接
//初始化TCP連接參數
ConnectionFactoryfactory=newConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USER_NAME);
factory.setPassword(USER_PWD);
factory.setVirtualHost("/");
?
//創(chuàng)建連接
Connectionconnection=factory.newConnection();
?
//創(chuàng)建信道
Channelchannel=connection.createChannel();
2)聲明隊列
/**
* 語法格式:queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 創(chuàng)建一個持久化量淌、非排他的骗村、非自動刪除的隊列
* exclusive:排他的 由該客戶端獨占,一般設為false
* autoDelete:消息消費完呀枢,該隊列就會被刪除胚股,一般為false
*/
channel.queueDeclare(QUUE_NAME,true,false,false,null);
3)發(fā)送消息
//發(fā)送一條持久化的消息
//MessageProperties.PERSISTENT_TEXT_PLAIN:是以純文本格式發(fā)送
Stringmessage="Hello World!";
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
4)關閉連接,信道
//關閉信道
channel.close();
?
//關閉連接
connection.close();
(3)消費者獲取消息
0)聲明常量參數
privatestaticfinalStringQUUE_NAME="hello";
privatestaticfinalStringIP_ADDRESS="localhost";
privatestaticfinalintPORT=5672;//默認端口號為5672
?
privatestaticfinalStringUSER_NAME="guest";
privatestaticfinalStringUSER_PWD="guest";
1)創(chuàng)建和RabbitMQ的連接裙秋,并且聲明隊列
//初始化TCP連接參數
ConnectionFactoryfactory=newConnectionFactory();
factory.setUsername(USER_NAME);
factory.setPassword(USER_PWD);
?
Address[]addresses=newAddress[]{
newAddress(IP_ADDRESS,PORT)
};
?
//創(chuàng)建連接
Connectionconnection=factory.newConnection(addresses);
?
//創(chuàng)建信道
finalChannelchannel=connection.createChannel();
2)需要為隊列定義一個回調(callback)函數琅拌。當我們獲取到消息的時候,就會調用此回調函數
Consumerconsumer=newDefaultConsumer(channel) {
@Override
publicvoidhandleDelivery(StringconsumerTag,Envelopeenvelope,AMQP.BasicPropertiesproperties,byte[]body)throwsIOException{
System.out.printf("receive message:"+newString(body));
?
/**
* true to acknowledge all messages up to and including the supplied delivery tag; false to acknowledge just the supplied delivery tag.
* true:確認包括傳輸標記之前的所有信息摘刑;
* false:僅確認該傳輸標記的消息
*/
channel.basicAck(envelope.getDeliveryTag(),false);
?? }
};
3)我們需要告訴RabbitMQ這個回調函數將會從名為"hello"的隊列中接收消息
//channel.basicConsume(QUEUE_NAME, true, consumer);//自動確認消息
channel.basicConsume(QUUE_NAME,consumer);
4)關閉連接进宝,信道
channel.close();
connection.close();
多資料分享下載,請關注:
極客慧:http://www.jikeh.cn
QQ群:375412858
默認交換器泣侮,默認綁定關系:
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
參看文章:https://www.cloudamqp.com/blog/2015-05-18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html