RabbitMQ 簡介
RabbitMQ是一個在AMQP(Advanced Message Queuing Protocol )基礎(chǔ)上實(shí)現(xiàn)的,可復(fù)用的企業(yè)消息系統(tǒng)躺彬。它可以用于大型軟件系統(tǒng)各個模塊之間的高效通信煤墙,支持高并發(fā),支持可擴(kuò)展宪拥。
AMQP
AMQP仿野,即Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計∷基于此協(xié)議的客戶端與消息中間件可傳遞消息脚作,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制缔刹。
消息隊列
MQ 全稱為Message Queue, 消息隊列球涛。是一種應(yīng)用程序?qū)?yīng)用程序的通信方法。應(yīng)用程序通過讀寫出入隊列的消息(針對應(yīng)用程序的數(shù)據(jù))來通信桨螺,而無需專用連接來鏈接它們宾符。
消息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進(jìn)行通信,而不是通過直接調(diào)用彼此來通信灭翔。隊列的使用除去了接收和發(fā)送應(yīng)用程序同時執(zhí)行的要求魏烫。
在項目中,將一些無需即時返回且耗時的操作提取出來肝箱,進(jìn)行了異步處理哄褒,而這種異步處理的方式大大的節(jié)省了服務(wù)器的請求響應(yīng)時間,從而提高了系統(tǒng)的吞吐量煌张。
消息隊列的使用場景是怎樣的呐赡?小紅和小明讀書的例子
RabbitMQ 應(yīng)用場景
對于一個大型的軟件系統(tǒng)來說,它會有很多的組件或者說模塊或者說子系統(tǒng)或者(subsystem or Component or submodule)骏融。那么這些模塊的如何通信链嘀?這和傳統(tǒng)的IPC有很大的區(qū)別。傳統(tǒng)的IPC很多都是在單一系統(tǒng)上的档玻,模塊耦合性很大怀泊,不適合擴(kuò)展(Scalability);如果使用socket那么不同的模塊的確可以部署到不同的機(jī)器上误趴,但是還是有很多問題需要解決霹琼。比如:
1)信息的發(fā)送者和接收者如何維持這個連接,如果一方的連接中斷,這期間的數(shù)據(jù)如何方式丟失枣申?
2)如何降低發(fā)送者和接收者的耦合度售葡?
3)如何讓Priority高的接收者先接到數(shù)據(jù)?
4)如何做到load balance忠藤?有效均衡接收者的負(fù)載挟伙?
5)如何有效的將數(shù)據(jù)發(fā)送到相關(guān)的接收者?也就是說將接收者subscribe 不同的數(shù)據(jù)熄驼,如何做有效的filter像寒。
6)如何做到可擴(kuò)展,甚至將這個通信模塊發(fā)到cluster上瓜贾?
7)如何保證接收者接收到了完整诺祸,正確的數(shù)據(jù)?
AMDQ協(xié)議解決了以上的問題祭芦,而RabbitMQ實(shí)現(xiàn)了AMQP筷笨。
概念介紹
- Broker:簡單來說就是消息隊列服務(wù)器實(shí)體。
- Exchange:消息交換機(jī)龟劲,它指定消息按什么規(guī)則胃夏,路由到哪個隊列。
- Queue:消息隊列載體昌跌,每個消息都會被投入到一個或多個隊列仰禀。
- Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來蚕愤。
- Routing Key:路由關(guān)鍵字答恶,exchange根據(jù)這個關(guān)鍵字進(jìn)行消息投遞。
- vhost:虛擬主機(jī)萍诱,一個broker里可以開設(shè)多個vhost悬嗓,用作不同用戶的權(quán)限分離。
- producer:消息生產(chǎn)者裕坊,就是投遞消息的程序包竹。
- consumer:消息消費(fèi)者,就是接受消息的程序籍凝。
- channel:消息通道周瞎,在客戶端的每個連接里,可建立多個channel饵蒂,每個channel代表一個會話任務(wù)声诸。
RabbitMQ使用流程
AMQP模型中,消息在producer中產(chǎn)生苹享,發(fā)送到MQ的exchange上双絮,exchange根據(jù)配置的路由方式發(fā)到相應(yīng)的Queue上,Queue又將消息發(fā)送給consumer得问,消息從queue到consumer有push和pull兩種方式囤攀。 消息隊列的使用過程大概如下:
- 客戶端連接到消息隊列服務(wù)器,打開一個channel宫纬。
- 客戶端聲明一個exchange焚挠,并設(shè)置相關(guān)屬性。
- 客戶端聲明一個queue漓骚,并設(shè)置相關(guān)屬性蝌衔。
- 客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系蝌蹂。
- 客戶端投遞消息到exchange噩斟。
exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding孤个,進(jìn)行消息路由剃允,將消息投遞到一個或多個隊列里。 exchange也有幾個類型齐鲤,完全根據(jù)key進(jìn)行投遞的叫做Direct交換機(jī)斥废,例如,綁定時設(shè)置了routing key為”abc”给郊,那么客戶端提交的消息牡肉,只有設(shè)置了key為”abc”的才會投遞到隊列。
RabbitMQ安裝教程
- Windows Linux安裝教程
-
Mac 安裝教程
安裝成功后打開瀏覽器淆九,訪問 http://localhost:15672
rabbitMQ常用的命令
啟動監(jiān)控管理器:rabbitmq-plugins enable rabbitmq_management
關(guān)閉監(jiān)控管理器:rabbitmq-plugins disable rabbitmq_management
啟動rabbitmq:rabbitmq-service start
關(guān)閉rabbitmq:rabbitmq-service stop
查看所有的隊列:rabbitmqctl list_queues
清除所有的隊列:rabbitmqctl reset
關(guān)閉應(yīng)用:rabbitmqctl stop_app
啟動應(yīng)用:rabbitmqctl start_app
用戶和權(quán)限設(shè)置
添加用戶:rabbitmqctl add_user username password
分配角色:rabbitmqctl set_user_tags username administrator
新增虛擬主機(jī):rabbitmqctl add_vhost vhost_name
將新虛擬主機(jī)授權(quán)給新用戶:rabbitmqctl set_permissions -p vhost_name username “.*” “.*” “.*”
(后面三個”*”代表用戶擁有配置统锤、寫、讀全部權(quán)限)
角色說明
- 超級管理員(administrator)
可登陸管理控制臺吩屹,可查看所有的信息跪另,并且可以對用戶,策略(policy)進(jìn)行操作煤搜。 - 監(jiān)控者(monitoring)
可登陸管理控制臺免绿,同時可以查看rabbitmq節(jié)點(diǎn)的相關(guān)信息(進(jìn)程數(shù),內(nèi)存使用情況擦盾,磁盤使用情況等) - 策略制定者(policymaker)
可登陸管理控制臺, 同時可以對policy進(jìn)行管理嘲驾。但無法查看節(jié)點(diǎn)的相關(guān)信息(上圖紅框標(biāo)識的部分)。 - 普通管理者(management)
僅可登陸管理控制臺迹卢,無法看到節(jié)點(diǎn)信息辽故,也無法對策略進(jìn)行管理。 - 其他
無法登陸管理控制臺腐碱,通常就是普通的生產(chǎn)者和消費(fèi)者誊垢。
Java入門實(shí)例(Helloworld)
一個producer發(fā)送消息掉弛,一個接收者接收消息,并在控制臺打印出來喂走。如下圖:
Java客戶端配置
下面是Java客戶端的maven依賴的配置殃饿。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.0.0</version>
</dependency>
發(fā)送端:Send.java 連接到RabbitMQ(此時服務(wù)需要啟動),發(fā)送一條數(shù)據(jù)芋肠,然后退出乎芳。
package cn.buyforyou;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send
{
//隊列名稱
private final static String QUEUE_NAME = "helloMQ";
public static void main(String[] argv) throws java.io.IOException, TimeoutException
{
/**
* 創(chuàng)建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置MabbitMQ所在主機(jī)ip或者主機(jī)名
factory.setHost("localhost");
//創(chuàng)建一個連接
Connection connection = factory.newConnection();
//創(chuàng)建一個頻道
Channel channel = connection.createChannel();
//指定一個隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//發(fā)送的消息
String message = "hello world!";
//往隊列中發(fā)出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
值得注意的是隊列只會在它不存在的時候創(chuàng)建,多次聲明并不會重復(fù)創(chuàng)建帖池。信息的內(nèi)容是字節(jié)數(shù)組奈惑,也就意味著你可以傳遞任何數(shù)據(jù)。
接收端:Recv.java 不斷等待服務(wù)器推送消息睡汹,然后在控制臺輸出肴甸。
package cn.buyforyou;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
// 隊列名稱
private final static String QUEUE_NAME = "helloMQ";
public static void main(String[] argv) throws Exception {
// 打開連接和創(chuàng)建頻道,與發(fā)送端一樣
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明隊列囚巴,主要為了防止消息接收者先運(yùn)行此程序雷滋,隊列還不存在時創(chuàng)建隊列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//創(chuàng)建消費(fèi)者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
RabbitMQ工作隊列-Work Queues(Java實(shí)例)
創(chuàng)建一個工作隊列用來在工作者(consumer)間分發(fā)耗時任務(wù)文兢。
工作隊列的主要任務(wù)是:避免立刻執(zhí)行資源密集型任務(wù)晤斩,然后必須等待其完成。相反地姆坚,我們進(jìn)行任務(wù)調(diào)度:我們把任務(wù)封裝為消息發(fā)送給隊列澳泵。工作進(jìn)行在后臺運(yùn)行并不斷的從隊列中取出任務(wù)然后執(zhí)行。當(dāng)你運(yùn)行了多個工作進(jìn)程時兼呵,任務(wù)隊列中的任務(wù)將會被工作進(jìn)程共享執(zhí)行兔辅。
這樣的概念在web應(yīng)用中極其有用,當(dāng)在很短的HTTP請求間需要執(zhí)行復(fù)雜的任務(wù)击喂。
準(zhǔn)備
我們使用Thread.sleep來模擬耗時的任務(wù)维苔。我們在發(fā)送到隊列的消息的末尾添加一定數(shù)量的點(diǎn),每個點(diǎn)代表在工作線程中需要耗時1秒懂昂,例如hello…將會需要等待3秒介时。
發(fā)送端:
NewTask.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
private static String getMessage(String[] strings) {
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
接收端:
Work.java
import com.rabbitmq.client.*;
import java.io.IOException;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
循環(huán)調(diào)度
使用任務(wù)隊列的好處是能夠很容易的并行工作。如果我們積壓了很多工作凌彬,我們僅僅通過增加更多的工作者就可以解決問題沸柔,使系統(tǒng)的伸縮性更加容易。
消息確認(rèn)
執(zhí)行一個任務(wù)需要花費(fèi)幾秒鐘铲敛。你可能會擔(dān)心當(dāng)一個工作者在執(zhí)行任務(wù)時發(fā)生中斷褐澎。我們上面的代碼,一旦RabbItMQ交付了一個信息給消費(fèi)者伐蒋,會馬上從內(nèi)存中移除這個信息工三。在這種情況下迁酸,如果殺死正在執(zhí)行任務(wù)的某個工作者,我們會丟失它正在處理的信息俭正。我們也會丟失已經(jīng)轉(zhuǎn)發(fā)給這個工作者且它還未執(zhí)行的消息胁出。
boolean ack = false ; //打開應(yīng)答機(jī)制
channel.basicConsume(QUEUE_NAME, ack, consumer);
//另外需要在每次處理完成一個消息后,手動發(fā)送一次應(yīng)答段审。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
消息的持久性
我們已經(jīng)學(xué)習(xí)了即使消費(fèi)者被殺死,消息也不會被丟失闹蒜。但是如果此時RabbitMQ服務(wù)被停止寺枉,我們的消息仍然會丟失。
當(dāng)RabbitMQ退出或者異常退出绷落,將會丟失所有的隊列和信息姥闪,除非你告訴它不要丟失。我們需要做兩件事來確保信息不會被丟失:我們需要給所有的隊列和消息設(shè)置持久化的標(biāo)志砌烁。
第一筐喳, 我們需要確認(rèn)RabbitMQ永遠(yuǎn)不會丟失我們的隊列。為了這樣函喉,我們需要聲明它為持久化的避归。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
注:RabbitMQ不允許使用不同的參數(shù)重新定義一個隊列,所以已經(jīng)存在的隊列管呵,我們無法修改其屬性梳毙。
第二, 我們需要標(biāo)識我們的信息為持久化的捐下。通過設(shè)置MessageProperties(implements BasicProperties)值為PERSISTENT_TEXT_PLAIN账锹。
channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
現(xiàn)在你可以執(zhí)行一個發(fā)送消息的程序,然后關(guān)閉服務(wù)坷襟,再重新啟動服務(wù)奸柬,運(yùn)行消費(fèi)者程序做下實(shí)驗(yàn)。
公平的分配
或許會發(fā)現(xiàn)婴程,目前的消息轉(zhuǎn)發(fā)機(jī)制(Round-robin)并非是我們想要的廓奕。例如,這樣一種情況档叔,對于兩個消費(fèi)者懂从,有一系列的任務(wù),奇數(shù)任務(wù)特別耗時蹲蒲,而偶數(shù)任務(wù)卻很輕松番甩,這樣造成一個消費(fèi)者一直繁忙,另一個消費(fèi)者卻很快執(zhí)行完任務(wù)后等待届搁。
造成這樣的原因是因?yàn)镽abbitMQ僅僅是當(dāng)消息到達(dá)隊列進(jìn)行轉(zhuǎn)發(fā)消息缘薛。并不在乎有多少任務(wù)消費(fèi)者并未傳遞一個應(yīng)答給RabbitMQ窍育。僅僅盲目轉(zhuǎn)發(fā)所有的奇數(shù)給一個消費(fèi)者,偶數(shù)給另一個消費(fèi)者宴胧。
為了解決這樣的問題漱抓,我們可以使用basicQos方法,傳遞參數(shù)為prefetchCount = 1恕齐。這樣告訴RabbitMQ不要在同一時間給一個消費(fèi)者超過一條消息乞娄。換句話說,只有在消費(fèi)者空閑的時候會發(fā)送下一條信息显歧。
int prefetchCount = 1;
channel.basicQos(prefetchCount);