什么是MQ骡苞?
MQ全稱為Message Queue, 消息隊列(MQ)是應(yīng)用程序“對”應(yīng)用程序的通信方法。
MQ:生產(chǎn)者者往消息隊列中寫消息楷扬,消費可以讀取隊列中的消息解幽。
簡單隊列
1、生產(chǎn)者烘苹、隊列躲株、消費者
隊列是RabbitMQ的內(nèi)部對象,用于存儲消息镣衡。生產(chǎn)者(下圖中的P)生產(chǎn)消息并投遞到隊列中霜定,消費者(下圖中的C)可以從隊列中獲取消息并消費档悠。
生產(chǎn)者:生產(chǎn)消息發(fā)送到消息隊列中
package com.zhy.rabbit._01;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send{
//隊列名稱
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws java.io.IOException
{
/**
* 創(chuàng)建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置MabbitMQ所在主機ip或者主機名
factory.setHost("localhost");
//創(chuàng)建一個連接
Connection connection = factory.newConnection();
//創(chuàng)建一個頻道
Channel channel = connection.createChannel();
//指定一個隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//發(fā)送的消息
String message = "hello world!";
//往隊列中發(fā)出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
消費者:消費消息從消息隊列中取消息
package com.zhy.rabbit._01;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Recv
{
//隊列名稱
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException{
//打開連接和創(chuàng)建頻道,與發(fā)送端一樣
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明隊列望浩,主要為了防止消息接收者先運行此程序辖所,隊列還不存在時創(chuàng)建隊列吗浩。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//創(chuàng)建隊列消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消費隊列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
//nextDelivery是一個阻塞方法(內(nèi)部實現(xiàn)其實是阻塞隊列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
事件監(jiān)聽方式
工作者(消費者)Worker.java
public class Customer {
private final static String QUEUE_NAME = "rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException {
// 創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置RabbitMQ地址
factory.setHost("localhost");
//創(chuàng)建一個新的連接
Connection connection = factory.newConnection();
//創(chuàng)建一個通道
Channel channel = connection.createChannel();
//聲明要關(guān)注的隊列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
System.out.println("Customer Waiting Received messages");
//DefaultConsumer類實現(xiàn)了Consumer接口疲恢,通過傳入一個頻道穆咐,
// 告訴服務(wù)器我們需要那個頻道的消息芥映,如果頻道中有消息映砖,就會執(zhí)行事件觸發(fā)回調(diào)函數(shù)handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
}
};
//自動回復(fù)隊列應(yīng)答 -- RabbitMQ中的消息確認(rèn)機制
channel.basicConsume(QUEUE_NAME, true, consumer);
}
Round-robin(輪詢分發(fā))
使用任務(wù)隊列的優(yōu)點之一就是可以輕易的并行工作琳骡。如果我們積壓了好多工作弯淘,我們可以通過增加工作者(消費者)來解決這一問題颜武,使得系統(tǒng)的伸縮性更加容易您觉。
修改一下NewTask拙寡,使用for循環(huán)模擬多次發(fā)送消息的過程:
for (int i = 0; i < 5; i++) {
// 發(fā)送的消息
String message = "Hello World"+Strings.repeat(".", i);
// 往隊列中發(fā)出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
我們先啟動1個生產(chǎn)者實例,2個工作者實例顾犹,看一下如何執(zhí)行:
WorkQueue工作隊列
多個消費者可以訂閱同一個隊列倒庵,這時隊列中的消息會被平均分?jǐn)偨o多個消費者進行處理,而不是每個消費者都收到所有的消息并處理炫刷。
工作隊列(又名:任務(wù)隊列)的主要任務(wù)是為了避免立即做一個資源密集型的卻又必須等待完成的任務(wù)擎宝。相反的,我們進行任務(wù)調(diào)度:將任務(wù)封裝為消息并發(fā)給隊列浑玛。在后臺運行的工作者(consumer)將其取出绍申,然后最終執(zhí)行。當(dāng)你運行多個工作者(consumer)顾彰,隊列中的任務(wù)被工作進行共享執(zhí)行极阅。
準(zhǔn)備
使用Thread.Sleep()方法來模擬耗時。采用小數(shù)點的數(shù)量來表示任務(wù)的復(fù)雜性涨享。每一個點將住哪用1s的“工作”筋搏。例如,Hello... 處理完需要3s的時間厕隧。
發(fā)送端(生產(chǎn)者):NewTask.java
public class NewTask {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
/**
* 創(chuàng)建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置MabbitMQ所在主機ip或者主機名
factory.setHost("127.0.0.1");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個頻道
Channel channel = connection.createChannel();
// 指定一個隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 發(fā)送的消息
String message = "Hello World...";
// 往隊列中發(fā)出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
工作者(消費者)Worker.java
public class Worker {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打開連接和創(chuàng)建頻道奔脐,與發(fā)送端一樣
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明隊列,主要為了防止消息接收者先運行此程序吁讨,隊列還不存在時創(chuàng)建隊列髓迎。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 創(chuàng)建隊列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());
try {
for (char ch: message.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" [x] Done! at " +new Date().toLocaleString());
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
運行結(jié)果如下:
任務(wù)分發(fā)機制
正主來了。建丧。排龄。下面開始介紹各種任務(wù)分發(fā)機制。
Round-robin(輪詢分發(fā))
使用任務(wù)隊列的優(yōu)點之一就是可以輕易的并行工作翎朱。如果我們積壓了好多工作橄维,我們可以通過增加工作者(消費者)來解決這一問題尺铣,使得系統(tǒng)的伸縮性更加容易。
修改一下NewTask争舞,使用for循環(huán)模擬多次發(fā)送消息的過程:
for (int i = 0; i < 5; i++) {
// 發(fā)送的消息
String message = "Hello World"+Strings.repeat(".", i);
// 往隊列中發(fā)出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
我們先啟動1個生產(chǎn)者實例迄埃,2個工作者實例,看一下如何執(zhí)行:
Fair dispatch(公平分發(fā))
您可能已經(jīng)注意到兑障,任務(wù)分發(fā)仍然沒有完全按照我們想要的那樣。比如:現(xiàn)在有2個消費者蕉汪,所有的奇數(shù)的消息都是繁忙的流译,而偶數(shù)則是輕松的。按照輪詢的方式者疤,奇數(shù)的任務(wù)交給了第一個消費者福澡,所以一直在忙個不停。偶數(shù)的任務(wù)交給另一個消費者驹马,則立即完成任務(wù)革砸,然后閑得不行。而RabbitMQ則是不了解這些的糯累。
這是因為當(dāng)消息進入隊列算利,RabbitMQ就會分派消息。它不看消費者為應(yīng)答的數(shù)目泳姐,只是盲目的將第n條消息發(fā)給第n個消費者效拭。
為了解決這個問題,我們使用basicQos( prefetchCount = 1)方法胖秒,來限制RabbitMQ只發(fā)不超過1條的消息給同一個消費者缎患。當(dāng)消息處理完畢后,有了反饋阎肝,才會進行第二次發(fā)送挤渔。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注:如果所有的工作者都處于繁忙狀態(tài),你的隊列有可能被填充滿风题。你可能會觀察隊列的使用情況判导,然后增加工作者,或者使用別的什么策略俯邓。
還有一點需要注意骡楼,使用公平分發(fā),必須關(guān)閉自動應(yīng)答稽鞭,改為手動應(yīng)答鸟整。這些內(nèi)容會在下篇博文中講述。
整體代碼如下:生產(chǎn)者NewTask.java
public class NewTask {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
/**
* 創(chuàng)建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 設(shè)置MabbitMQ所在主機ip或者主機名
factory.setHost("127.0.0.1");
// 創(chuàng)建一個連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個頻道
Channel channel = connection.createChannel();
// 指定一個隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int prefetchCount = 1;
//限制發(fā)給同一個消費者不得超過1條消息
channel.basicQos(prefetchCount);
for (int i = 0; i < 5; i++) {
// 發(fā)送的消息
String message = "Hello World"+Strings.repeat(".",5-i)+(5-i);
// 往隊列中發(fā)出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
// 關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
消費者Worker.java
public class Worker {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打開連接和創(chuàng)建頻道朦蕴,與發(fā)送端一樣
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 聲明隊列篮条,主要為了防止消息接收者先運行此程序弟头,隊列還不存在時創(chuàng)建隊列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);//保證一次只分發(fā)一個
// 創(chuàng)建隊列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
for (char ch: message.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" [x] Done! at " +new Date().toLocaleString());
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
能者多勞涉茧,消息分發(fā)赴恨;
3.消息應(yīng)答
3.消息持久化
當(dāng)已經(jīng)存在的消息隊列不能持久化,如果想持久化伴栓,需要刪除當(dāng)前隊列伦连,從新定義。
訂閱模式
我們之前學(xué)習(xí)的都是一個消息只能被一個消費者消費,那么如果我想發(fā)一個消息 能被多個消費者消費,這時候怎么辦? 這時候我們就得用到了消息中的發(fā)布訂閱模型
- 在前面的教程中钳垮,我們創(chuàng)建了一個工作隊列惑淳,都是一個任務(wù)只交給一個消費者。
- 這次我們做 將消息發(fā)送給多個消費者饺窿。這種模式叫做“發(fā)布/訂閱”歧焦。
列:
類似微信訂閱號 發(fā)布文章消息 就可以廣播給所有的接收者。(訂閱者)
那么咱們來看一下圖,我們學(xué)過前兩種有一些不一樣,work 模式 是不是同一個隊列 多個消費者,而 ps 這種模式呢,是一個隊列對應(yīng)一個消費者,發(fā)布/訂閱模式還多了一個 X(交換機 轉(zhuǎn)發(fā)器) ,這時候我們要獲取消息 就需要隊列綁定到交換機上,交換機把消息發(fā)送到隊列 , 消費者才能獲取隊列的消息
- 發(fā)布/訂閱模式解讀:
1肚医、1 個生產(chǎn)者绢馍,多個消費者
2、每一個消費者都有自己的一個隊列
3肠套、生產(chǎn)者沒有將消息直接發(fā)送到隊列舰涌,而是發(fā)送到了交換機(轉(zhuǎn)發(fā)器)
4、每個隊列都要綁定到交換機
5你稚、生產(chǎn)者發(fā)送的消息舵稠,經(jīng)過交換機,到達(dá)隊列入宦,實現(xiàn)哺徊,一個消息被多個消費者獲取的目的
產(chǎn)者
后臺注冊 ->郵件?->短信
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 聲明exchange 交換機 轉(zhuǎn)發(fā)器
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //fanout 分裂
// 消息內(nèi)容
String message = "Hello PB";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
那么先看一下控制臺 是不是有這個交換機
但是這個發(fā)送的消息到哪了呢? 消息丟失了!!!因為交換機沒有存儲消息的能力,在 rabbitmq 中只有隊列存儲消息的能力.因為這時還沒有隊列,所以就會丟失;
小結(jié):消息發(fā)送到了一個沒有綁定隊列的交換機時,消息就會丟失!
那么我們再來寫消費者
消費者 1
郵件發(fā)送系統(tǒng)
public class Recv {
private final static String QUEUE_NAME = "test_queue_fanout_email";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//------------下面邏輯和work模式一樣-----
// 同一時刻服務(wù)器只會發(fā)一條消息給消費者
channel.basicQos(1);
// 定義一個消費者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到達(dá) 觸發(fā)這個方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done ");
// 手動回執(zhí)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消費者 2
類似短信發(fā)送系統(tǒng)
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_fanout_2";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一時刻服務(wù)器只會發(fā)一條消息給消費者
// 定義一個消費者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到達(dá) 觸發(fā)這個方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] Recv msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done ");
// 手動回執(zhí)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
測試
一個消息 可以被多個消費者獲