1.消息模型
根據(jù)官方文檔得知,RabbitMQ有七種消息模型:
1.1 Hello World消息模型
1.1.1 介紹
翻譯成中文如下:
RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息囤屹。你可以把它想象成一個(gè)郵局:當(dāng)你把你想寄出的郵件放進(jìn)一個(gè)郵箱里時(shí)觅廓,你可以確信郵件的收件人最終會(huì)收到郵件漓柑。在這個(gè)類比中竟宋,RabbitMQ是一個(gè)郵箱膳帕、一個(gè)郵局和一個(gè)郵遞員粘捎。
RabbitMQ與郵局的主要區(qū)別在于,它不處理紙張备闲,而是接受晌端、存儲(chǔ)和轉(zhuǎn)發(fā)二進(jìn)制的數(shù)據(jù)信息塊。
1.1.2 模型圖
-
P (Produce) 生產(chǎn)者恬砂,主要是生產(chǎn)消息咧纠,以及發(fā)送消息。說(shuō)白了就是一個(gè)發(fā)送消息的應(yīng)用程序
- 隊(duì)列 (queue) 隊(duì)列是RabbitMQ中的郵箱的名稱泻骤。盡管消息流經(jīng)RabbitMQ和您的應(yīng)用程序漆羔,但它們只能存儲(chǔ)在隊(duì)列中。隊(duì)列只受主機(jī)的內(nèi)存和磁盤限制的約束狱掂,它本質(zhì)上是一個(gè)大的消息緩沖區(qū)演痒。許多生產(chǎn)者可以將消息發(fā)送到一個(gè)隊(duì)列,而許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)
-
C (consumer) 消費(fèi)和接受有著相似的含義趋惨。消費(fèi)者者是一個(gè)主要等待接收消息的程序
注意:生產(chǎn)者鸟顺、消費(fèi)者和代理不必駐留在同一主機(jī)上;事實(shí)上器虾,在大多數(shù)應(yīng)用程序中讯嫂,它們不必駐留在同一主機(jī)上。應(yīng)用程序也可以同時(shí)是生產(chǎn)者和消費(fèi)者兆沙。
1.1.3 代碼實(shí)現(xiàn)
接下來(lái)我們采用java語(yǔ)言編寫生產(chǎn)者程序欧芽,以及消費(fèi)者程序來(lái)感受一下其魅力。
生者者:Send
消費(fèi)者:Consumer
-
生產(chǎn)者代碼實(shí)現(xiàn):
-
模型圖:
-
由圖中可知葛圃,生產(chǎn)者不僅要生產(chǎn)消息千扔,還要將消息發(fā)送到指定隊(duì)列:
-
創(chuàng)建 springboot項(xiàng)目(wangzh-rabbitmq)
-
導(dǎo)入amqp依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
編寫工具類,用來(lái)獲取連接
package com.mq.rabbit.util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws Exception { // 1. 創(chuàng)建連接工廠库正,用來(lái)獲取連接 ConnectionFactory connectionFactory = new ConnectionFactory(); // 2. 設(shè)置基本信息 // 設(shè)置rabbitmq所在地址 connectionFactory.setHost("192.168.169.130"); // 設(shè)置用戶名曲楚,我們先前創(chuàng)建了一個(gè)wangzh的用戶 connectionFactory.setUsername("wangzh"); // 設(shè)置密碼 connectionFactory.setPassword("wangzh"); // 設(shè)置端口 這個(gè)端口是amqp協(xié)議的端口 connectionFactory.setPort(5672); return connectionFactory.newConnection(); } }
-
編寫發(fā)送端程序
package com.mq.rabbit.helloworld; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { /** * 隊(duì)列名字 */ private static final String QUEUE_NAME="hello_word"; public static void main(String[] args) { try { // 1. 獲取連接 Connection connection = ConnectionUtil.getConnection(); /* * 2. 創(chuàng)建通道 * 生產(chǎn)者發(fā)送消息到隊(duì)列中需要借助通道 */ Channel channel = connection.createChannel(); /** * 3. (創(chuàng)建)聲明隊(duì)列 * 如果名字所對(duì)應(yīng)的隊(duì)列存在,那么就不存創(chuàng)建隊(duì)列褥符,而是去時(shí)使用現(xiàn)成對(duì)的隊(duì)列 * 如果名字對(duì)應(yīng)的對(duì)應(yīng)不存在洞渤,那么就去創(chuàng)建隊(duì)列 * 第一個(gè)參數(shù): 隊(duì)列的名字 * 第二個(gè)參數(shù): 是否聲明一個(gè)持久化隊(duì)列,true表示會(huì)將消息持久化 * 第三個(gè)參數(shù): 是否聲明一個(gè)獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列属瓣,斷開后自動(dòng)刪除), true表示聲明成獨(dú)占隊(duì)列 * 第四個(gè)參數(shù): 是否聲明一個(gè)刪除隊(duì)列(消費(fèi)者客戶端連接斷開時(shí)是否自動(dòng)刪除隊(duì)列),true表示聲明成刪除隊(duì)列 * 第五個(gè)參數(shù): 隊(duì)列其他參數(shù) */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 消息 String msg = "I am OK"; /** * 將消息存入隊(duì)列中 * 第一個(gè)參數(shù):使交換機(jī)的名字 我們后面再將交換機(jī) * 第二個(gè)參數(shù):隊(duì)列映射的路由key,我們后面再講 * 第三個(gè)參數(shù): 隊(duì)列消息其他屬性 * 第四個(gè)參數(shù): 發(fā)送消息的主體 */ channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("發(fā)送成功"); // 關(guān)閉資源 channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
-
執(zhí)行程序
-
查看管理頁(yè)面
通過(guò)上圖我們可以看到當(dāng)生產(chǎn)者發(fā)送消息到隊(duì)列中時(shí)载迄,管理界面就能看到這個(gè)隊(duì)列,以及隊(duì)列里面的消息數(shù)抡蛙。
注意:我們只是在控制臺(tái)看到消息护昧,并不會(huì)去消費(fèi)這個(gè)消息。
-
編寫消費(fèi)者
package com.mq.rabbit.helloworld; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { private static final String QUEUE_NAME="hello_word"; public static void main(String[] args) throws Exception { // 1.獲取連接 Connection connection = ConnectionUtil.getConnection(); // 2.創(chuàng)建通道粗截,消費(fèi)者從隊(duì)列中獲取消息也是借助通道 Channel channel = connection.createChannel(); /* * 3.聲明隊(duì)列 * 如果隊(duì)列不存在就會(huì)創(chuàng)建隊(duì)列 * 由于我們?cè)谏a(chǎn)者者那邊已經(jīng)創(chuàng)建好了隊(duì)列 * 那么消費(fèi)者這邊就不會(huì)創(chuàng)建隊(duì)列 */ channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 4. 監(jiān)聽隊(duì)列惋耙,如果隊(duì)列中有消息,就直接拿過(guò)來(lái) * 第一個(gè)參數(shù):隊(duì)列名字 * 第二個(gè)參數(shù):是否進(jìn)行消息自動(dòng)確認(rèn)熊昌,后面我們講ack參數(shù)時(shí)再說(shuō) * 第三個(gè)參數(shù):回調(diào)對(duì)象绽榛,從隊(duì)列中主動(dòng)獲取消息 */ channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ /* * consumerTag:消費(fèi)者標(biāo)簽與消費(fèi)者相關(guān) * envelope:消息的打包數(shù)據(jù) * properties:消息的頭部數(shù)據(jù) * body:消息主體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(consumerTag); System.out.println(envelope); System.out.println(properties); System.out.println("消費(fèi)的消息:" + new String(body)); } }); } }
-
執(zhí)行結(jié)果
由上圖可知:當(dāng)消息被消費(fèi)后,隊(duì)列里面就沒有這一條消息了婿屹。同時(shí)消費(fèi)者的應(yīng)用程序并沒有停止灭美,而是一致在運(yùn)行著,一致在監(jiān)聽隊(duì)列昂利。
自此我們一個(gè)簡(jiǎn)單的Helloword消息模型就寫完了
1.1.4 ACK 機(jī)制
我們來(lái)思考一下有沒有上述的例子什么問(wèn)題届腐??蜂奸?
1.消費(fèi)者當(dāng)消費(fèi)消息后犁苏,MQ就會(huì)把隊(duì)列中的消息刪除,那么MQ怎么就知道消息被消費(fèi)了呢扩所?
2.當(dāng)消費(fèi)者領(lǐng)取消息后围详,還沒有消費(fèi)就掛掉了,或者是發(fā)生異常祖屏,那么MQ就無(wú)法得知消息有沒有被消費(fèi)掉助赞。
為了解決上述問(wèn)題,RabbitMQ提供了一個(gè)消息確認(rèn)機(jī)制(ACK機(jī)制)赐劣,當(dāng)消費(fèi)者把隊(duì)列中的消息消費(fèi)以后嫉拐,會(huì)向Rabbi發(fā)送一個(gè)ACK
,告訴MQ消息已經(jīng)被消費(fèi)了,你可以把消息刪除了魁兼。
不過(guò)這種發(fā)送ACK
有兩種方式:
-
自動(dòng)發(fā)送ACK:消息一旦被接收婉徘,自動(dòng)向MQ發(fā)送ACK
-
代碼實(shí)現(xiàn):
如圖,當(dāng)設(shè)置為true
時(shí)咐汞,就會(huì)當(dāng)消費(fèi)者消費(fèi)完消息盖呼,自動(dòng)的向發(fā)送ACK -
缺陷:
為了演示我先向MQ中發(fā)送一條消息:
接下來(lái)修改我們消費(fèi)者的代碼:
-
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
int a = 10 / 0;
System.out.println("消費(fèi)的消息:" + new String(body));
}
運(yùn)行結(jié)果:
我們發(fā)現(xiàn)在消費(fèi)消息之前,拋出了異常化撕,也就是說(shuō)我們消息還沒有被消費(fèi)几晤,此時(shí)MQ就把隊(duì)列中的消息給刪除了。說(shuō)明消息丟失了植阴。
-
手動(dòng)ACK:消息接收后蟹瘾,不會(huì)自動(dòng)發(fā)送ACK圾浅,需要手動(dòng)發(fā)送
-
準(zhǔn)備工作
為了演示,我們向MQ中發(fā)送一條消息
-
修改消費(fèi)者代碼
package com.mq.rabbit.helloworld; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { private static final String QUEUE_NAME = "hello_word"; public static void main(String[] args) throws Exception { // 1.獲取連接 Connection connection = ConnectionUtil.getConnection(); // 2.創(chuàng)建通道憾朴,消費(fèi)者從隊(duì)列中獲取消息也是借助通道 Channel channel = connection.createChannel(); /* * 3.聲明隊(duì)列 * 如果隊(duì)列不存在就會(huì)創(chuàng)建隊(duì)列 * 由于我們?cè)谏a(chǎn)者者那邊已經(jīng)創(chuàng)建好了隊(duì)列 * 那么消費(fèi)者這邊就不會(huì)創(chuàng)建隊(duì)列 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); /* * 4. 監(jiān)聽隊(duì)列狸捕,如果隊(duì)列中有消息,就直接拿過(guò)來(lái) * 第一個(gè)參數(shù):隊(duì)列名字 * 第二個(gè)參數(shù):是否進(jìn)行消息自動(dòng)確認(rèn),false代表不再向MQ發(fā)送ACK * 第三個(gè)參數(shù):回調(diào)對(duì)象众雷,從隊(duì)列中主動(dòng)獲取消息 */ channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) { /* * consumerTag:消費(fèi)者標(biāo)簽與消費(fèi)者相關(guān) * envelope:消息的打包數(shù)據(jù) * properties:消息的頭部數(shù)據(jù) * body:消息主體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)的消息:" + new String(body)); /* * 1. 第一個(gè)參數(shù)是 傳輸?shù)臉?biāo)簽 * 2. 是否要確認(rèn)所有的消息 true:確認(rèn)所有信息灸拍,包括提供的傳輸標(biāo)簽 * false: 僅確認(rèn)提供的傳輸標(biāo)簽 */ channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
這樣就實(shí)現(xiàn)了手動(dòng)發(fā)送
ACK
-
-
對(duì)比
上述可知,兩種發(fā)送ACK的方式砾省。那么我們到底用哪種方式:
- 如果消息不是特別重要鸡岗,即使丟失了對(duì)系統(tǒng)沒有什么影響,那么采用ACK比較方便
- 如果消息非常重要编兄,不允許丟失轩性,那么最好選擇手動(dòng)發(fā)送ACK。
2.work模型
work模型稱為:工作隊(duì)列模式
2.1介紹
2.1.1 模型圖
2.1.2 官方介紹
大概意思如下:
在第一個(gè)教程中翻诉,我們編寫了從命名隊(duì)列發(fā)送和接收消息的程序炮姨。在本例中,我們將創(chuàng)建一個(gè)工作隊(duì)列碰煌,用于在多個(gè)工人之間分發(fā)耗時(shí)的任務(wù)舒岸。
工作隊(duì)列(也稱為任務(wù)隊(duì)列)背后的主要思想是避免立即執(zhí)行資源密集型任務(wù),而必須等待它完成芦圾。相反蛾派,我們把任務(wù)安排在以后完成。我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列个少。在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)洪乍。當(dāng)您運(yùn)行許多工作人員時(shí),任務(wù)將在他們之間共享夜焦。
這個(gè)概念在web應(yīng)用程序中特別有用壳澳,因?yàn)樵诙痰腍TTP請(qǐng)求窗口中無(wú)法處理復(fù)雜的任務(wù)。
接下來(lái)我們用java代碼去模擬這個(gè)過(guò)程:
P 生產(chǎn)者: 發(fā)布任務(wù)(生產(chǎn)消息)
C1 消費(fèi)者1: 獲取任務(wù)并完成任務(wù)
C2 消費(fèi)者2: 獲取任務(wù)并完成任務(wù)
2.2 編碼實(shí)現(xiàn)
2.2.1 生產(chǎn)者
-
代碼
package com.mq.rabbit.work; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "hello_work"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 生產(chǎn)者發(fā)布20個(gè)任務(wù) */ for (int i = 1; i <= 20; i++) { String msg = "hello work: " + i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } channel.close(); connection.close(); } }
2.2.2 消費(fèi)者1
-
代碼
package com.mq.rabbit.work; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { private static final String QUEUE_NAME = "hello_work"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { System.out.println("消費(fèi)者1:" + new String(body)); // 耗時(shí)操作 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
2.2.3 消費(fèi)者2
-
代碼
package com.mq.rabbit.work; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { private static final String QUEUE_NAME = "hello_work"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者2:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
2.2.4 結(jié)果分析
-
先執(zhí)行消費(fèi)者1和消費(fèi)者2茫经,然后再執(zhí)行生產(chǎn)者
生產(chǎn)者總共發(fā)布了20條消息巷波,其中消費(fèi)者1和消費(fèi)者2分別消費(fèi)了10條。這就是工作隊(duì)列機(jī)制卸伞,將消息數(shù)平分給不同的消費(fèi)者去消費(fèi)抹镊。
2.2.5 存在的問(wèn)題
通過(guò)上述例子發(fā)現(xiàn)以下幾個(gè)問(wèn)題:
消費(fèi)者1去處理消息比較耗時(shí),消費(fèi)者2處理的消息比較快荤傲。但是他們處理的消息量是一樣垮耳。
當(dāng)消費(fèi)者2處理完成以后,一直處于空閑狀態(tài),而消息1卻一直在忙碌
這明顯是不合理的终佛。按照正確的做法應(yīng)該是消費(fèi)者2處理消息快俊嗽,多分配一些消息去處理。消費(fèi)者1處理消息慢就少分配一些消息铃彰,能者多勞乌询。那么該怎么去實(shí)現(xiàn)呢?
RabbitMQ中提供了一個(gè)basicQos
方法以及 prefetchCount=1
設(shè)置豌研。其功能就是告訴MQ一次不要向消費(fèi)者發(fā)送多條消息,等消息者把消息處理并確認(rèn)完成唬党。才會(huì)再次發(fā)送下一條消息鹃共。相反,如果消費(fèi)者還是處于忙率中驶拱,那么MQ就會(huì)把消息分派給不是很忙碌的消費(fèi)者霜浴。
2.2.6 改造消費(fèi)者
-
代碼
package com.mq.rabbit.work;
import com.mq.rabbit.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
private static final String QUEUE_NAME = "hello_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
System.out.println("消費(fèi)者1:" + new String(body));
// 耗時(shí)操作
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
-
啟動(dòng)測(cè)試
這樣我們就實(shí)現(xiàn)了能者多勞
3.發(fā)布/訂閱模型
3.1 思考
通過(guò)上述模型我們可以指導(dǎo),同一條消息只能發(fā)送給一個(gè)消費(fèi)者蓝纲,但如果說(shuō)我想要把一個(gè)消息發(fā)給多個(gè)消費(fèi)者阴孟,這又該怎么做呢?
3.2 介紹
3.2.1 官方介紹
大體意思如下:
在之前的模式中税迷,我們創(chuàng)建了一個(gè)工作隊(duì)列永丝。工作隊(duì)列背后的假設(shè)是:每個(gè)任務(wù)都被精確地傳遞給一個(gè)工人。在這一部分中箭养,我們將做一些完全不同的事情——我們將向多個(gè)消費(fèi)者傳遞一條消息慕嚷。這種模式稱為“發(fā)布/訂閱”。
3.2.2 模型圖
生產(chǎn)者把消息發(fā)送給交換機(jī)X(圖中藍(lán)紫色部分)毕泌,交換機(jī)X將消息轉(zhuǎn)發(fā)到不同的隊(duì)列中喝检。
- 1個(gè)生產(chǎn)者,多個(gè)消費(fèi)者
- 每一個(gè)消費(fèi)者都有自己的隊(duì)列
- 生產(chǎn)者是將消息發(fā)送到交換機(jī)撼泛,交換機(jī)把消息轉(zhuǎn)發(fā)到了隊(duì)列
- 每一個(gè)隊(duì)列都需要綁定交換機(jī)
- 一條消息被多個(gè)消費(fèi)者消費(fèi)
3.2.3 交換機(jī)
-
介紹
大體意思如下:
交換機(jī)
在之前的模型中挠说,我們直接向隊(duì)列發(fā)送和接收消息。現(xiàn)在是時(shí)候在Rabbit中引入完整的消息傳遞模型了愿题。
生產(chǎn)者是發(fā)送消息的用戶應(yīng)用程序损俭。
隊(duì)列是存儲(chǔ)消息的緩沖區(qū)。
消費(fèi)者是接收消息的用戶應(yīng)用程序抠忘。
RabbitMQ消息傳遞模型的核心思想是撩炊,生產(chǎn)者從不將任何消息直接發(fā)送到隊(duì)列。實(shí)際上崎脉,生產(chǎn)者常常根本不知道消息是否會(huì)被傳遞到任何隊(duì)列拧咳。
相反,生產(chǎn)者只能向交換機(jī)發(fā)送消息囚灼。一方面交換機(jī)接收來(lái)自生產(chǎn)者的消息骆膝,另一方面它將它們推送到隊(duì)列中祭衩。
3.2.4 交換機(jī)類型
- Fanout 廣播,將消息轉(zhuǎn)發(fā)到所有綁定交換機(jī)的隊(duì)列上
- Direct 定向阅签,將消息轉(zhuǎn)發(fā)到符合指定
routing key
的隊(duì)列上 - Topic 通配符掐暮, 把 消息轉(zhuǎn)發(fā)符合
routing pattern(路由模式)
的隊(duì)列
注意:Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力政钟,因此如果沒有任何隊(duì)列與Exchange綁定路克,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失养交!
3.2.5 發(fā)布/訂閱模型-Fanout
-
介紹
Fanout類型也稱為廣播類型精算,這種類型有以下特點(diǎn):
-
每個(gè)隊(duì)列都要綁定到交換機(jī),且生產(chǎn)者發(fā)送的消息只能發(fā)送到交換機(jī)碎连,由交換機(jī)決定將消息發(fā)送到哪個(gè)隊(duì)列灰羽。生產(chǎn)者無(wú)法決定,甚至生產(chǎn)者都不知道消息被轉(zhuǎn)發(fā)到了哪個(gè)隊(duì)列上
-
每一個(gè)消費(fèi)者都需要有自己的隊(duì)列鱼辙,可以有多個(gè)消費(fèi)者
-
交換機(jī)會(huì)把所有消息轉(zhuǎn)發(fā)到每一個(gè)綁定到交換機(jī)上的隊(duì)列
-
-
編碼實(shí)現(xiàn)
-
生產(chǎn)者
- 生產(chǎn)者跟隊(duì)列沒有關(guān)系廉嚼,只跟交換機(jī)有關(guān)系
- 發(fā)送消息發(fā)送到交換機(jī),不是隊(duì)列上
package com.mq.rabbit.fanout; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.springframework.amqp.core.ExchangeTypes; public class Producer { private static final String EXCHANGE_NAME = "amq.fanout"; public static void main(String[] args) throws Exception { // 1.獲取連接 Connection connection = ConnectionUtil.getConnection(); // 2. 創(chuàng)建通道 Channel channel = connection.createChannel(); /* * 3.聲明交換機(jī) * 第一個(gè)參數(shù):交換機(jī)名字 * 第二個(gè)參數(shù): 交換機(jī)類型 */ channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT); String msg = "hello exchange"; channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); channel.close(); connection.close(); } }
-
消費(fèi)者1
1.消費(fèi)者需要綁定到隊(duì)列上倒戏,每一個(gè)消費(fèi)者有自己的隊(duì)列
package com.mq.rabbit.fanout; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { private static final String QUEUE_NAME = "consumer_queue_1"; private static final String EXCHANGE_NAME = "amq.fanout"; public static void main(String[] args) throws Exception { // 1.獲取連接 Connection connection = ConnectionUtil.getConnection(); // 2.創(chuàng)建通道 Channel channel = connection.createChannel(); // 3.聲明隊(duì)列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 4.將隊(duì)列綁定到交換機(jī) * 第一個(gè)參數(shù) 隊(duì)列名字 * 第二個(gè)參數(shù) 交換機(jī)名字 * 第三個(gè)參數(shù) 路由key 后面再說(shuō)這個(gè) */ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); // 5. 監(jiān)聽隊(duì)列獲取消息 channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者1:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
-
消費(fèi)者2
package com.mq.rabbit.fanout; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { private static final String QUEUE_NAME = "consumer_queue_2"; private static final String EXCHANGE_NAME = "amq.fanout"; public static void main(String[] args) throws Exception { // 1.獲取連接 Connection connection = ConnectionUtil.getConnection(); // 2.創(chuàng)建通道 Channel channel = connection.createChannel(); // 3.聲明隊(duì)列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 4.將隊(duì)列綁定到交換機(jī) * 第一個(gè)參數(shù) 隊(duì)列名字 * 第二個(gè)參數(shù) 交換機(jī)名字 * 第三個(gè)參數(shù) 路由key 后面再說(shuō)這個(gè) */ channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); // 5. 監(jiān)聽隊(duì)列獲取消息 channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者2:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
-
啟動(dòng)測(cè)試:
如果先啟動(dòng)生產(chǎn)者怠噪,那么就會(huì)創(chuàng)建一個(gè)交換機(jī),并且給交換機(jī)發(fā)送消息峭梳,但是我們此時(shí)還沒有啟動(dòng)消費(fèi)者舰绘,所以交換機(jī)里面的消息也會(huì)丟失
-
如果先啟動(dòng)消費(fèi)者,那么隊(duì)列綁定的交換機(jī)并不存在葱椭,所以也沒法綁定捂寿,從而拋出異常
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'fanout2_exchange' in vhost '/', class-id=50, method-id=20) at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:522) at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182) at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114) at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:672) at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48) at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:599) at java.lang.Thread.run(Thread.java:745)
-
解決辦法
先啟動(dòng)一次生產(chǎn)者,創(chuàng)建交換機(jī)孵运,創(chuàng)建交換機(jī)秦陋,由于交換機(jī)不能存儲(chǔ)消息。所以消息就會(huì)丟失
-
再啟動(dòng)消費(fèi)者
我們可以看到消費(fèi)者并沒有消費(fèi)消息治笨,因?yàn)榻粨Q機(jī)里面已經(jīng)沒有消息了驳概。
交換機(jī)也也綁定了隊(duì)列。此時(shí)我們?cè)賳?dòng)一次生產(chǎn)者旷赖,由于交換機(jī)已經(jīng)存在顺又,所以就會(huì)往交換機(jī)里發(fā)送消息
當(dāng)然如果不想這么麻煩,也可以使用MQ提供的交換機(jī)等孵。如下:
4.Routing模型
4.1 介紹
Routing模型(路由模型)其實(shí)也是屬于發(fā)布/訂閱模型稚照。只不過(guò)是交換機(jī)類型不一樣,這里我們將學(xué)習(xí)Direct
交換機(jī)模型。這個(gè)類型于Fanout類型不同的是果录,F(xiàn)anout類型是給每一個(gè)綁定到交換機(jī)上的隊(duì)列發(fā)消息上枕,而Direct
則是可以向指定的隊(duì)列發(fā)送消息,通過(guò)RoutingKey(路由key)
官方介紹如下:
大體意思如下:
在Fanout類型中弱恒,生產(chǎn)者發(fā)布消息辨萍,所有消費(fèi)者都可以獲取所有消息。
在路由模型中返弹,我們將添加一個(gè)功能 - 我們將只能訂閱一部分消息锈玉。 例如,我們只能將重要的錯(cuò)誤消息引導(dǎo)到日志文件(以節(jié)省磁盤空間)义起,同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息嘲玫。
但是,在某些場(chǎng)景下并扇,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類型的Exchange抡诞。在Direct模型下穷蛹,隊(duì)列與交換機(jī)的綁定,不能是任意綁定了昼汗,而是要指定一個(gè)RoutingKey(路由key)肴熏,生產(chǎn)者在向Exchange發(fā)送消息時(shí),也必須指定消息的routing key顷窒。
簡(jiǎn)而言之就是生產(chǎn)者需要告訴交換機(jī)要將消息發(fā)送到指定的隊(duì)列中蛙吏,怎么告訴就是通過(guò)RoutingKey(路由key)
4.2 模型圖
從圖中我們可以看出:
- P (生產(chǎn)者) 向 X(交換機(jī))發(fā)送消息時(shí)會(huì)指定 路由key,
- 由于交換機(jī)類型為
direct
,該交換機(jī)就根據(jù)不同的路由key將orange
消息轉(zhuǎn)發(fā)到了Q1
隊(duì)列,將消息black
,green
消息轉(zhuǎn)發(fā)到了Q2
隊(duì)列鞋吉,然后被彼此綁定的消費(fèi)者所消費(fèi)鸦做。
接下來(lái)我們將使用Java代碼模擬圖中過(guò)程。
4.3 代碼
-
生產(chǎn)者
package com.mq.rabbit.routting; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.springframework.amqp.core.ExchangeTypes; public class Producer { public static final String EXCHANGE_NAME = "hello_exchange_direct"; public static void main(String[] args) throws Exception { // 1.創(chuàng)建連接 Connection connection = ConnectionUtil.getConnection(); // 2.創(chuàng)建通道 Channel channel = connection.createChannel(); //3. 創(chuàng)建交換機(jī) channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT); // 4. 發(fā)送消息 String orange = "hello orange"; /* * 5.發(fā)送消息 * 第一個(gè)參數(shù):交換機(jī)名字 * 第二個(gè)參數(shù):路由key * 第三個(gè)參數(shù):消息其他參數(shù) * 第四個(gè)參數(shù): 消息 */ channel.basicPublish(EXCHANGE_NAME,"q1",null,orange.getBytes()); String black = "hello black"; channel.basicPublish(EXCHANGE_NAME,"q2",null,black.getBytes()); String green = "hello green"; channel.basicPublish(EXCHANGE_NAME,"q2",null,green.getBytes()); channel.close(); connection.close(); } }
上述代碼可知:
- 生產(chǎn)者發(fā)送了三條消息
hello orange
,hello black
,hello green
-
orange
消息的路由key為q1
,到時(shí)候發(fā)送到q1
隊(duì)列上谓着,并消費(fèi)者1消費(fèi) -
black
,green
消息的路由可以為q2
,到時(shí)候發(fā)送到q2
隊(duì)列上泼诱,并被消費(fèi)者2消費(fèi)
- 生產(chǎn)者發(fā)送了三條消息
-
消費(fèi)者1
package com.mq.rabbit.routting; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static final String QUEUE_NAME = "consumer1"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊(duì)列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); /* * 綁定交換機(jī) * 第一個(gè)參數(shù)為隊(duì)列名字 * 第二個(gè)參數(shù)為交換機(jī)名字 * 第三個(gè)參數(shù)為路由key */ channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q1"); channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
-
消費(fèi)者2
package com.mq.rabbit.routting; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static final String QUEUE_NAME = "consumer2"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊(duì)列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q1"); channel.queueBind(QUEUE_NAME,Producer.EXCHANGE_NAME,"q2"); channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
4.4 測(cè)試
-
生產(chǎn)者測(cè)試
-
消費(fèi)者測(cè)試
效果滿足我們想要的,這就是MQ中的路由模型赊锚。
5.Topics模型
MQ中的Top模型其實(shí)也是屬于發(fā)布/訂閱中模型的一種治筒,只不過(guò)交換機(jī)模型換成了 Topic
5.1 介紹
大體意思如下:
路由key由一個(gè)或者多個(gè)參數(shù)組成,如果是多個(gè)單詞必須以 . 號(hào)隔開 例如:category.update
Topic
類型的交換機(jī)與Direct
相比舷蒲,都是可以根據(jù)RoutingKey
把消息路由到不同的隊(duì)列耸袜。只不過(guò)Topic
類型Exchange
可以讓隊(duì)列在綁定Routing key
的時(shí)候使用通配符
*
只能匹配一個(gè)單詞#
匹配一個(gè)或者多個(gè)單詞- 例如:
product.*
product.insert
能夠匹配到,product.insert.dd
就匹配不到- product.#
product.insert
,product.insert.dd
都能匹配到
5.2 模型圖
我們將發(fā)送所有描述動(dòng)物的消息。消息將用路由key發(fā)送牲平,路由key由三個(gè)字(兩個(gè)點(diǎn))組成堤框。路由key中的第一個(gè)詞將描述一種快速性、第二種顏色和第三種a物種:“<celerity><colour><species>”。
我們創(chuàng)建了三個(gè)綁定:Q1用綁定鍵*.orange.*
綁定胰锌,Q2用*.rabbit
和lazy.#
綁定骗绕。
這些綁定可以概括為:
Q1匹配的橙色動(dòng)物
Q2匹配兔子和懶惰動(dòng)物
例如:
? quick.orange.rabbit
就會(huì) 被 Q2隊(duì)列匹配到
? lazy.orange.elephant
就會(huì)被 Q1 Q2隊(duì)列匹配到
? lazy.pink.rabbit
就會(huì)被Q2隊(duì)列匹配到
quick.brown.fox
不會(huì)被任何隊(duì)列匹配到
5.3 代碼實(shí)現(xiàn)
-
生產(chǎn)者
使用topic 類型交換機(jī),路由key為:
lazy.pink.rabbit
lazy.orange.elephant
quick.orange.rabbit``package com.mq.rabbit.topic; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.springframework.amqp.core.ExchangeTypes; public class Producer { public static final String EXCHNAGE_NAME = "hello_exchange_topic"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHNAGE_NAME, ExchangeTypes.TOPIC); String msg = "hello lazy.pink.rabbit"; channel.basicPublish(EXCHNAGE_NAME,"lazy.pink.rabbit",null,msg.getBytes()); msg = "hello lazy.orange.elephant"; channel.basicPublish(EXCHNAGE_NAME,"lazy.orange.elephant",null,msg.getBytes()); msg = "hello quick.orange.rabbit"; channel.basicPublish(EXCHNAGE_NAME,"quick.orange.rabbit",null,msg.getBytes()); channel.close(); connection.close(); } }
-
消費(fèi)者1
package com.mq.rabbit.topic; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static final String QUEUE_NAME = "Q1"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"*.orange.*"); channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者1:" + new String(body)); } }); } }
-
消費(fèi)者2
package com.mq.rabbit.topic; import com.mq.rabbit.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static final String QUEUE_NAME = "Q2"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"*.*.rabbit"); channel.queueBind(QUEUE_NAME,Producer.EXCHNAGE_NAME,"lazy.#"); channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者2:" + new String(body)); } }); } }
-
測(cè)試
-
測(cè)試生產(chǎn)者
-
測(cè)試消費(fèi)者
以上就是我們的topic類型
-
6.消息堆積&丟失問(wèn)題
6.1 堆積
如何避免消息對(duì)接問(wèn)題:
- 在消費(fèi)者一方啟用多線程去消費(fèi)
- 使用work模型去分擔(dān)消息资昧,注意酬土,發(fā)布/訂閱模型可以和work模型結(jié)合使用
6.2 丟失
如何避免消息丟失
- 消費(fèi)端使用手動(dòng)ACK機(jī)制(如何消費(fèi)者在消費(fèi)消息之前,MQ就掛掉格带,那么這個(gè)操作無(wú)用)
- 將消息持久化
消息要想持久化撤缴,那么前提條件就是 交換機(jī),隊(duì)列都需要持久化
6.2.1 交換機(jī)持久化
6.2.2 隊(duì)列持久化
7. RPC模型
rpc 模型其實(shí)是屬于遠(yuǎn)程調(diào)用叽唱,不屬于消息模型屈呕,所以這里不說(shuō)明,如果對(duì)rpc感興趣棺亭,可以去了解一下dubbo