一、什么是消息隊列
? ? MQ (Message Quene) : 翻譯為 消息隊列 ,通過典型的 ?產(chǎn)者 和 消費者 模型,?產(chǎn)者不斷向消息隊列中?產(chǎn)消息,消費者不斷的從隊列中獲取消息腻惠。因為消息的?產(chǎn)和消費都是異步的,?且只關?消息的發(fā)送和接收哎壳,沒有業(yè)務邏輯的侵?,輕松的實現(xiàn)系統(tǒng)間解耦扎酷。別名為 消息中間件 通過利??效可靠的消息傳遞機制進?平臺?關的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進?分布式系統(tǒng)的集成拂到。
二痪署、為什么要使用MQ
1.解耦
現(xiàn)在我有一個系統(tǒng)A,系統(tǒng)A可以產(chǎn)生一個userId,然后兄旬,現(xiàn)在有系統(tǒng)B和系統(tǒng)C都需要這個userId去做相關的操作,可以寫成如下操作
如果有一天惠桃,系統(tǒng)B的負責人告訴系統(tǒng)A的負責人,現(xiàn)在系統(tǒng)B的SystemBNeed2do(String userId)這個接口不再使用了辖试,讓系統(tǒng)A別去調(diào)它了辜王。那么就需要從代碼的基礎上去修改了。這樣緊密的耦合關系會導致很多麻煩罐孝,如果使用消息中間件就不會出現(xiàn)以上問題呐馆。
2.異步
我們再來看看下面這種情況:系統(tǒng)A還是直接調(diào)用系統(tǒng)B、C莲兢、D
假設系統(tǒng)A運算出userId具體的值需要50ms汹来,調(diào)用系統(tǒng)B的接口需要300ms,調(diào)用系統(tǒng)C的接口需要300ms改艇,調(diào)用系統(tǒng)D的接口需要300ms收班。那么這次請求就需要50+300+300+300=950ms
并且我們得知,系統(tǒng)A做的是主要的業(yè)務谒兄,而系統(tǒng)B摔桦、C、D是非主要的業(yè)務。比如系統(tǒng)A處理的是訂單下單邻耕,而系統(tǒng)B是訂單下單成功了鸥咖,那發(fā)送一條短信告訴具體的用戶此訂單已成功,而系統(tǒng)C和系統(tǒng)D也是處理一些小事而已兄世。
那么此時啼辣,為了提高用戶體驗和吞吐量,其實可以異步地調(diào)用系統(tǒng)B御滩、C鸥拧、D的接口。所以削解,我們可以弄成是這樣的:
系統(tǒng)A執(zhí)行完了以后住涉,將userId寫到消息隊列中,然后就直接返回了(至于其他的操作钠绍,則異步處理)舆声。
本來整個請求需要用950ms(同步)
現(xiàn)在將調(diào)用其他系統(tǒng)接口異步化,只需要100ms(異步)
3柳爽、削峰/限流
假設現(xiàn)在我們每個月要搞一次大促媳握,大促期間的并發(fā)可能會很高的,比如每秒3000個請求磷脯。假設我們現(xiàn)在有兩臺機器處理請求蛾找,并且每臺機器只能每次處理1000個請求。
那多出來的1000個請求赵誓,可能就把我們整個系統(tǒng)給搞崩了...所以打毛,有一種辦法,我們可以寫到消息隊列中:
系統(tǒng)B和系統(tǒng)C根據(jù)自己的能夠處理的請求數(shù)去消息隊列中拿數(shù)據(jù)俩功,這樣即便有每秒有8000個請求幻枉,那只是把請求放在消息隊列中,去拿消息隊列的消息由系統(tǒng)自己去控制诡蜓,這樣就不會把整個系統(tǒng)給搞崩熬甫。
三、常用的消息中間件
當今市?上有很多主流的消息中間件蔓罚,如?牌的 ActiveMQ 椿肩、 RabbitMQ ,炙?可熱的Kafka 豺谈,阿?巴巴?主開發(fā) RocketMQ 等郑象。
#1.ActiveMQ
ActiveMQ 是Apache出品,最流?的茬末,能?強勁的開源消息總線厂榛。它是?個完全?持JMS規(guī)范的的消息中間件。豐富的API,多種集群架構(gòu)模式讓ActiveMQ在業(yè)界成為?牌的消息中間件,在中?型企業(yè)頗受歡迎!
# 2.Kafka
Kafka是LinkedIn開源的分布式發(fā)布-訂閱消息系統(tǒng),?前歸屬于Apache頂級項?噪沙。Kafka主要特點是基于Pull的模式來處理消息消費,追求?吞吐量吐根,?開始的?的就是?于?志收集和傳輸正歼。0.8版本開始?持復制,不?持事務拷橘,對消息的重復局义、丟失、錯誤沒有嚴格要求冗疮,適合產(chǎn)??量數(shù)據(jù)的互聯(lián)?服務的數(shù)據(jù)收集業(yè)務萄唇。
# 3.RocketMQ
RocketMQ是阿?開源的消息中間件,它是純Java開發(fā)术幔,具有?吞吐量另萤、?可?性、適合?規(guī)模分
布式系統(tǒng)應?的特點诅挑。RocketMQ思路起源于Kafka四敞,但并不是Kafka的?個Copy,它
對消息的可靠傳輸及事務性做了優(yōu)化拔妥,?前在阿?集團被?泛應?于交易忿危、充值、流計算没龙、消
息推送铺厨、?志流式處理、binglog分發(fā)等場景硬纤。
# 4.RabbitMQ
RabbitMQ?Kafka可靠解滓,Kafka更適合IO?吞吐的處理,?般應?在?數(shù)據(jù)?志處理或?qū)崟r性(少量延遲)筝家,可靠性(少量丟數(shù)據(jù))要求稍低的場景使?伐蒂,?如ELK?志收集。
四肛鹏、RabbitMQ
基于 AMQP 協(xié)議逸邦,erlang語?開發(fā),是部署最?泛的開源消息中間件,是最受歡迎的開源消息中間件之?在扰。
官?教程 :https://www.rabbitmq.com/#getstarted
1.AMQP 協(xié)議
AMQP(advanced message queuingprotocol)`在2003年時被提出缕减,最早?于解決?融領不同平臺之間的消息傳遞交互問題。顧名思義芒珠,AMQP是?種協(xié)議桥狡,更準確的說是?種binary wire-level protocol(鏈接協(xié)議)。這是其和JMS的本質(zhì)差別,AMQP不從API層進?限定裹芝,?是直接定義?
絡交換的數(shù)據(jù)格式部逮。這使得實現(xiàn)了AMQP的provider天然性就是跨平臺的。以下是AMQP協(xié)議模型:
2.RabbitMQ 的安裝
下載地址:https://www.rabbitmq.com/download.html
注意:處理要下載rabbitmq的按轉(zhuǎn)包外還要下載ellong環(huán)境
3.安裝步驟
先安裝安裝Erlang嫂易,再安裝rabbitmq.exe,直接下一步兄朋,傻瓜式安裝,安裝完成點擊開始會出現(xiàn)這里會出現(xiàn)啟動怜械、停止颅和、重新安裝等
1.點擊
2.輸入命令:
rabbitmq-plugins enable rabbitmq_management
3.打開瀏覽器控制臺
默認賬號guest guest
如果不能訪問,
文件夾為隱藏 需要再文件夾選項中把隱藏文件夾打開顯示
C:\Users\XYH\AppData\Roaming\RabbitMQ\db里面的數(shù)據(jù)刪除再次安裝一下Rabbitmq.exe
然后執(zhí)行
rabbitmq-plugins enable rabbitmq_management
就可以訪問到了
五、web管理界?介紹
1.簡介
connections:?論?產(chǎn)者還是消費者缕允,都需要與RabbitMQ建?連接后才可以完成消息的?產(chǎn)和消費峡扩,在這?可以查看連接情況
channels:通道,建?連接后障本,會形成通道教届,消息的投遞獲取依賴通道。
Exchanges:交換機驾霜,?來實現(xiàn)消息的路由
Queues:隊列巍佑,即消息隊列,消息存放在隊列中寄悯,等待消費萤衰,消費后被移除隊列。
admin:用戶
2.Admin?戶和虛擬主機管理
a.添加用戶
上?的Tags選項猜旬,其實是指定?戶的??脆栋,可選的有以下?個:
==》超級管理員(administrator):可登陸管理控制臺,可查看所有的信息洒擦,并且可以對?戶椿争,策略(policy)進?操作。
==》監(jiān)控者(monitoring):可登陸管理控制臺熟嫩,同時可以查看rabbitmq節(jié)點的相關信息(進程數(shù)秦踪,內(nèi)存
使?情況,磁盤使?情況等)
==》策略制定者(policymaker):可登陸管理控制臺, 同時可以對policy進?管理掸茅。但?法查看節(jié)點的相關信息(上圖紅框標識的部分)椅邓。
==》普通管理者(management):僅可登陸管理控制臺,?法看到節(jié)點信息昧狮,也?法對策略進?理景馁。
==》其他:?法登陸管理控制臺,通常就是普通的?產(chǎn)者和消費者逗鸣。
b.創(chuàng)建虛擬主機
虛擬主機:為了讓各個?戶可以互不?擾的?作合住,RabbitMQ添加了虛擬主機(Virtual Hosts)的概念绰精。其實就是?個獨?的訪問路徑,不同?戶使?不同路徑透葛,各?有??的隊列笨使、交換機,互相不會影響僚害。
c硫椰、綁定虛擬主機
六、RabbitMQ 的第?個程序
a.AMQP協(xié)議
b贡珊、RabbitMQ?持的消息模型
c.引入依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
# 第?種模型(直連)
在上圖的模型中最爬,有以下概念:
P:?產(chǎn)者涉馁,也就是要發(fā)送消息的程序
C:消費者:消息的接受者门岔,會?直等待消息到來。
queue:消息隊列烤送,圖中紅?部分寒随。類似?個郵箱,可以緩存消息帮坚;?產(chǎn)者向其中投遞消息妻往,消費者從其中取出消息。
開發(fā)?產(chǎn)者
//創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//設置端ip號
connectionFactory.setHost("127.0.0.1");
//設置端口號
connectionFactory.setPort(5672);
//設置用戶名和密碼
connectionFactory.setUsername("xyh");
connectionFactory.setPassword("123");
//訪問的虛擬通道
connectionFactory.setVirtualHost("/vhost");
//連接
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//通道綁定對應的消息隊列
//參數(shù)1: 是否持久化 參數(shù)2:是否獨占隊列 參
//數(shù)3:是否?動刪除 參數(shù)4:其他屬性
channel.queueDeclare("hello",false,false,false,null);
//發(fā)布消息
channel.basicPublish("","hello",null,"hello".getBytes());
channel.close();
connection.close();
消費者
//創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//設置端ip號
connectionFactory.setHost("127.0.0.1");
//設置端口號
connectionFactory.setPort(5672);
//設置用戶名和密碼
connectionFactory.setUsername("xyh");
connectionFactory.setPassword("123");
//訪問的虛擬通道
connectionFactory.setVirtualHost("/vhost");
//連接
Connection connection = connectionFactory.newConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
//消費消息
channel.basicConsume("hello",true,new DefaultConsumer(channel){
? ? @Override
? ? public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
? ? ? ? System.out.println(new String(body));
? ? }
});
七试和、springboot實現(xiàn)簡單的rabbitmq
1.導入依賴
<dependency>
? ? <groupId>org.springframework.boot</groupId>
? ? <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置rabbitmq的配置信息
spring.application.name=rabbitmq-springboot
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=dt87
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/dt87
3.消息生產(chǎn)者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
? ? //直接獲取rabbitmq的模板對象
? ? rabbitTemplate.convertAndSend("hello","hello world");
}
4讯泣、消息的消費者
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class Consumer {
? ? //消費消息的方法
? ? @RabbitHandler
? ? public void test(String message){
? ? ? ? System.out.println(message);
? ? }
}
}