本文章是在網(wǎng)易云課堂的課程學(xué)習(xí)中編寫,部分圖片從網(wǎng)易云課堂ppt引用
一形纺、RabbitMQ簡介
是一個開源的AMQP實現(xiàn)
二逐样、RabbitMQ安裝運行
1、安裝依賴環(huán)境
在 http://www.rabbitmq.com/which-erlang.html 頁面查看安裝rabbitmq需要安裝erlang對應(yīng)的版本
在 https://github.com/rabbitmq/erlang-rpm/releases 頁面找到需要下載的erlang版本挪捕,
erlang-*.centos.x86_64.rpm
就是centos版本的级零。-
復(fù)制下載地址后奏纪,使用wget命令下載
wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
-
安裝 Erlang
sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
-
安裝 socat
sudo yum install -y socat
2序调、安裝RabbitMQ
-
在官方下載頁面找到CentOS7版本的下載鏈接发绢,下載rpm安裝包
wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
提示:可以在
https://github.com/rabbitmq/rabbitmq-server/tags下載歷史版本
-
安裝RabbitMQ
sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
3边酒、啟動和關(guān)閉
- 啟動服務(wù)
sudo systemctl start rabbitmq-server
若啟動報錯狸窘,可查看日志信息
-
查看狀態(tài)
sudo systemctl status rabbitmq-server
-
停止服務(wù)
sudo systemctl stop rabbitmq-server
-
設(shè)置開機啟動
sudo systemctl enable rabbitmq-server
4朦前、RabbitMQ基本配置
RabbitMQ有一套默認(rèn)的配置韭寸,一般能滿足日常開發(fā)需求恩伺。若需要修改椰拒,需要自己創(chuàng)建一個配置文件
touch /etc/rabbitmq/rabbitmq.conf
官網(wǎng)配置項說明:
https://www.rabbitmq.com/configure.html
5燃观、RabbitMQ管理界面
RabbitMQ安裝包中帶有管理插件缆毁,但要手動激活
-
開啟插件
rabbitmq-plugins enable rabbitmq_management
說明:rabbitmq有一個默認(rèn)的guest用戶,但只能通過localhost訪問践啄,所以需要添加一個能夠遠程訪問的用戶沉御。
-
添加用戶
rabbitmqctl add_user admin admin
-
為用戶分配操作權(quán)限
rabbitmqctl set_user_tags admin administrator
在一個RabbitMQ中吠裆,可以劃分多個虛擬主機
- 為用戶分配資源權(quán)限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
配置完畢后试疙,可以在瀏覽器打開 15672 端口的控制臺頁面效斑,賬號密碼是 admin/admin
6缓屠、RabbitMQ端口
RabbitMQ會綁定一些端口,安裝完后并啟動服務(wù)后储耐,還不能進行外部通信什湘,需要將這些端口添加至防火墻闽撤。
-
添加端口
sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
-
重啟防火墻
sudo firewall-cmd --reload
下面對一些端口做一下介紹:
4369
是Erlang的端口/結(jié)點名稱映射程序哟旗,用來跟蹤節(jié)點名稱監(jiān)聽地址闸餐,在集群中起到一個類似DNS的作用舍沙。5672,5671
一般使用的是5672端口剔宪,沒有使用SSL。使用SSL的話格遭,是用5671端口拒迅。25672
可理解為是用于管理的端口璧微,用于RabbitMQ節(jié)點間和CLI工具通信硬梁,配合4369使用荧止。15672
HTTP管理端口跃巡,通過這個端口打開web可視化的管理頁面素邪,用于管理RabbitMQ兔朦,需要啟用management插件。61613, 61614
插件相關(guān)的端口声邦,當(dāng)STOMP插件啟用的時候打開翔忽,作為STOMP客戶端端口(根據(jù)是否使用TLS選擇)1883, 8883
插件相關(guān)的端口,當(dāng)MQTT插件啟用的時候打開胡野,作為MQTT客戶端端口(根據(jù)是否使用TLS選擇) 硫豆。默認(rèn)使用的是188315674
基于WebSocket的STOMP客戶端端口(當(dāng)插件Web STOMP啟用的時候打開)15675
基于WebSocket的MQTT客戶端端口(當(dāng)插件Web MQTT啟用的時候打開)
7熊响、RabbitMQ角色
none
不能訪問management插件management
查看自己的virtual hosts中的queues汗茄、exchanges、bindings等資源policymaker
比management角色多了些功能,專門用來管理相關(guān)的策略瞳腌。比如查看嫂侍、創(chuàng)建和刪除自己的virtual hosts所屬的policies和parametersmonitoring
比management角色多了些功能挑宠,主要用來監(jiān)控痹栖【景ⅲ可查看所有virtual hosts南捂,其他用戶的connections溺健、channels,節(jié)點級別的數(shù)據(jù)(比如clustering剖膳、memory情況)等administrator
權(quán)限最大的角色
三吱晒、RabbitMQ的簡單使用
1仑濒、maven依賴
- 在Java中使用RabbitMQ
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
</dependencies>
- 在Spring中使用RabbitMQ
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2驼壶、隊列生產(chǎn)者
public class Producer {
public static void main(String[] args) {
// 1辅柴、創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2瞭吃、設(shè)置連接屬性
factory.setHost("192.168.100.242");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3股冗、從連接工廠獲取連接
connection = factory.newConnection("生產(chǎn)者");
// 4止状、從鏈接中創(chuàng)建通道攒霹。一個連接可以創(chuàng)建多個channel
channel = connection.createChannel();
/**
* 5催束、聲明(創(chuàng)建)隊列
* 如果隊列不存在抠刺,才會創(chuàng)建
* RabbitMQ 不允許聲明兩個隊列名相同速妖,屬性不同的隊列罕容,否則會報錯
*
* queueDeclare參數(shù)說明:
* @param queue 隊列名稱
* @param durable 隊列是否持久化
* @param exclusive 是否排他锦秒,即是否為私有的脂崔,如果為true,會對當(dāng)前隊列加鎖砌左,其它通道不能訪問汇歹,并且在連接關(guān)閉時會自動刪除产弹,不受持久化和自動刪除的屬性控制
* @param autoDelete 是否自動刪除痰哨,當(dāng)最后一個消費者斷開連接之后是否自動刪除
* @param arguments 隊列參數(shù)斤斧,設(shè)置隊列的有效期撬讽、消息最大長度游昼、隊列中所有消息的生命周期等等
*/
channel.queueDeclare("queue1", false, false, false, null);
// 消息內(nèi)容
String message = "Hello World!";
// 6烘豌、發(fā)送消息
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息已發(fā)送廊佩!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7罐寨、關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8鸯绿、關(guān)閉連接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3瓶蝴、隊列消費者
public class Consumer {
public static void main(String[] args) {
// 1拧簸、創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2盆赤、設(shè)置連接屬性
factory.setHost("192.168.100.242");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3牺六、從連接工廠獲取連接
connection = factory.newConnection("消費者");
// 4淑际、從鏈接中創(chuàng)建通道
channel = connection.createChannel();
/**
* 5春缕、聲明(創(chuàng)建)隊列
* 如果隊列不存在锄贼,才會創(chuàng)建
* RabbitMQ 不允許聲明兩個隊列名相同咱娶,屬性不同的隊列膘侮,否則會報錯
*
* queueDeclare參數(shù)說明:
* @param queue 隊列名稱
* @param durable 隊列是否持久化
* @param exclusive 是否排他琼了,即是否為私有的雕薪,如果為true,會對當(dāng)前隊列加鎖所袁,其它通道不能訪問燥爷,
* 并且在連接關(guān)閉時會自動刪除前翎,不受持久化和自動刪除的屬性控制港华。
* 一般在隊列和交換器綁定時使用
* @param autoDelete 是否自動刪除,當(dāng)最后一個消費者斷開連接之后是否自動刪除
* @param arguments 隊列參數(shù)臊岸,設(shè)置隊列的有效期商模、消息最大長度施流、隊列中所有消息的生命周期等等
*/
channel.queueDeclare("queue1", false, false, false, null);
// 6、定義收到消息后的回調(diào)
DeliverCallback callback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 7瞪醋、監(jiān)聽隊列
channel.basicConsume("queue1", true, callback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println("開始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 8忿晕、關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 9、關(guān)閉連接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
四银受、AMQP協(xié)議
AMQP ( Advanced Message Queuing Protocol) 高級消息隊列協(xié)議践盼,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計宾巍」净茫可基于實現(xiàn)AMQP顶霞,來實現(xiàn)消息中間件肄程。
1、AMQP結(jié)構(gòu)
2选浑、流轉(zhuǎn)過程
AMQP生產(chǎn)者流轉(zhuǎn)過程
connection - channel - publish - close
AMQP消費者流轉(zhuǎn)過程
五蓝厌、RabbitMQ核心概念
核心概念
- Producer:生產(chǎn)者,創(chuàng)建消息發(fā)布到RabbitMQ中
- Consumer:消費者古徒,獲取消息體
- Broker:消息中間件的服務(wù)節(jié)點
- 虛擬主機:每個broker可定義多個虛擬主機拓提,像mysql-server可以定義多個db。RabbitMQ默認(rèn)的vhost(虛擬主機)是 /
- connection:連接隧膘,一個connection可以創(chuàng)建任意個channel
- channel:建立在connection上的通道
- queue:隊列代态,用于存儲消息
- RoutingKey:路由鍵,需要與交換類型和綁定鍵(BindingKey)結(jié)合使用舀寓。生產(chǎn)者發(fā)送消息給交換器時胆数,會指定一個RoutingKey见秤,指定路由規(guī)則
- Binding:綁定淑蔚,將交換器與隊列關(guān)聯(lián)起來
-
exchange:交換器,將生產(chǎn)者發(fā)來的消息路由到一個或多個隊列抒倚。若路由不到,則根據(jù)生產(chǎn)者的屬性配置判莉,返回給生產(chǎn)者或直接丟棄豆挽。
exchange有四種模式,fanout券盅、direct帮哈、topic、headers模式锰镀,若不指定交換機娘侍,則使用默認(rèn)交換機,根據(jù)消息中指定的 queue 的名稱泳炉,匹配到對應(yīng)的queue憾筏。
fanout模式:綁定了的所有queue都會收到消息;
direct模式:將消息路由到BindingKey和Routing Key完全匹配的隊列花鹅;
topic模式:與direct類似氧腰,但可通過通配符進行模糊匹配。* 代表一個單詞刨肃,# 代表多個單詞
headers模式:根據(jù)消息中的 header 屬性匹配古拴。
整體運轉(zhuǎn)流程:
使用 exchange
1、生產(chǎn)者將消息發(fā)送到topic類型的交換器上真友,和routing的用法類似黄痪,都是通過routingKey路由,但topic類型交換器的routingKey支持通配符
// 路由關(guān)系如下:com.# --> queue-1 *.order.* ---> queue-2
// 消息內(nèi)容
String message = "Hello";
// 發(fā)送消息到topic_test交換器上
channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes());
System.out.println("消息 " + message + " 已發(fā)送锻狗!");
2满力、消費者通過一個臨時隊列和交換器綁定,接收發(fā)送到交換器上的消息
final String queueName = Thread.currentThread().getName();
try {
// 3轻纪、從連接工廠獲取連接
connection = factory.newConnection("消費者");
// 4油额、從鏈接中創(chuàng)建通道
channel = connection.createChannel();
// 定義消息接收回調(diào)對象
DeliverCallback callback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 監(jiān)聽隊列
channel.basicConsume(queueName, true, callback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(queueName + " 開始接收消息");
System.in.read();
3、使用 fanout 型交換器實現(xiàn) 發(fā)布訂閱模式刻帚。
啟動Consumer 類會開啟兩個消費者潦嘶,Producer 類運行后,兩個消費者都能接收到消息
【注意】利用臨時隊列崇众,可隨時添加一個queue掂僵,且不會互相影響。比如可以啟多個消費者服務(wù)顷歌,則exchange可以綁定多個臨時隊列锰蓬,從而收到發(fā)往exchange的消息。即發(fā)布-訂閱思想
1)消費者:通過一個臨時隊列和交換器綁定眯漩,接收發(fā)送到交換器上的消息
public class Consumer {
private static Runnable receive = new Runnable() {
public void run() {
// 1芹扭、創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2麻顶、設(shè)置連接屬性
factory.setHost("192.168.100.242");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
final String clientName = Thread.currentThread().getName();
try {
// 3、從連接工廠獲取連接
connection = factory.newConnection("消費者");
// 4舱卡、從鏈接中創(chuàng)建通道
channel = connection.createChannel();
// 代碼定義交換器辅肾,不管是生產(chǎn)者或消費者都可以定義
channel.exchangeDeclare("ps_test", "fanout");
// 還可以定義一個臨時隊列,連接關(guān)閉后會自動刪除轮锥,此隊列是一個排他隊列
String queueName = channel.queueDeclare().getQueue();
// 將隊列和交換器綁定
channel.queueBind(queueName, "ps_test", "");
// 定義消息接收回調(diào)對象
DeliverCallback callback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(clientName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 監(jiān)聽隊列
channel.basicConsume(queueName, true, callback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(clientName + " 開始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 8矫钓、關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 9、關(guān)閉連接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
new Thread(receive, "c1").start();
}
}
2)生產(chǎn)者:將消息發(fā)送到fanout類型的交換器上
public class Producer {
public static void main(String[] args) {
// 1舍杜、創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2新娜、設(shè)置連接屬性
factory.setHost("192.168.100.242");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3、從連接工廠獲取連接
connection = factory.newConnection("生產(chǎn)者");
// 4蝴簇、從鏈接中創(chuàng)建通道
channel = connection.createChannel();
// 定義fanout類型的交換器
channel.exchangeDeclare("ps_test", "fanout");
// 消息內(nèi)容
String message = "Hello Publish";
// 發(fā)送消息到ps_test交換器上
channel.basicPublish("ps_test", "", null, message.getBytes());
System.out.println("消息 " + message + " 已發(fā)送杯活!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7匆帚、關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8熬词、關(guān)閉連接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
4、實際使用中吸重,若往隊列queue1中發(fā)送多條消息互拾,queue1中堆積了大量消息,要如何加入消息的處理嚎幸?
——可以創(chuàng)建消費者集群
basicQos參數(shù)
- 控制推送消息的大小颜矿,提前預(yù)處理機制,有助于提高數(shù)據(jù)處理效率嫉晶。
- 在并發(fā)低于 1w 情況下骑疆,一般使用默認(rèn)值即可,也可根據(jù)業(yè)務(wù)情況調(diào)整替废。若設(shè)置的太小箍铭,操作太頻繁,不好椎镣。若設(shè)置的太大诈火,某個消費者堆積處理太多消息,其他消費者分不到消息處理状答。
// 同一時刻冷守,服務(wù)器會發(fā)送10條消息給消費者
channel.basicQos(10);