RabbitMQ
簡(jiǎn)介:
AMQP(Adcanced Message Queuing Protocol)
AMQP,即Advanced Message Queuing Protocol贪嫂,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議盆犁,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn)背伴,為面向消息的中間件設(shè)計(jì)宫纬⊙得玻基于此協(xié)議的客戶端與消息中間件可傳遞消息既棺,并不受客戶端/中間件不同產(chǎn)品尺栖,不同的開發(fā)語言等條件的限制抽碌。Erlang中的實(shí)現(xiàn)有RabbitMQ等。
JMS
JMS即Java消息服務(wù)(JavaMessage Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API货徙,用于在兩個(gè)應(yīng)用程序之間左权,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信痴颊。
JMS是JavaEE規(guī)范中的一種赏迟,類比JDBC
很多消息中間件都實(shí)現(xiàn)了JMS規(guī)范,例如:ActiveMQ.RabbitMQ官方?jīng)]有提供JMS實(shí)現(xiàn)包蠢棱,但是開源社區(qū)有
AMQP 與 JMS 區(qū)別
JMS是定義了統(tǒng)一的接口锌杀,來對(duì)消息操作進(jìn)行統(tǒng)一;AMQP是通過規(guī)定協(xié)議來統(tǒng)一數(shù)據(jù)交互的格式
JMS限定了必須使用Java語言泻仙;AMQP只是協(xié)議糕再,不規(guī)定實(shí)現(xiàn)方式,因此是跨語言的玉转。
JMS規(guī)定了兩種消息模式突想;而AMQP的消息模式更加豐富
RabbitMQ相關(guān)概念介紹
概述
RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級(jí)消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列究抓,它是一種應(yīng)用程序之間的通信方法猾担,消息隊(duì)列在分布式系統(tǒng)開發(fā)中應(yīng)用非常廣泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6種模式:簡(jiǎn)單模式刺下,work模式绑嘹,Publish/Subscribe發(fā)布與訂閱模式,Routing路由模式橘茉,Topics主題模式工腋,RPC遠(yuǎn)程調(diào)用模式(遠(yuǎn)程調(diào)用,不太算MQ畅卓;暫不作介紹)擅腰;
官網(wǎng)對(duì)應(yīng)模式介紹:https://www.rabbitmq.com/getstarted.html
相關(guān)部件簡(jiǎn)介
Message
消息,消息是不具名的髓介,它由消息頭和消息體組成。消息體是不透明的筋现,而消息頭則由一系列的可選屬性組成唐础,這些屬性包括routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先權(quán))矾飞、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等一膨。
Publisher
消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序洒沦。
Exchange
交換器豹绪,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。message到達(dá)broker的第一站,根據(jù)查詢表中的routing key瞒津,分發(fā)到消息隊(duì)列中去蝉衣。
Binding
綁定,用于消息隊(duì)列和交換器之間的關(guān)聯(lián)巷蚪。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來的路由規(guī)則病毡,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表屁柏。
Queue
消息隊(duì)列啦膜,用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器淌喻,也是消息的終點(diǎn)僧家。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面裸删,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走八拱。
Connection
網(wǎng)絡(luò)連接,比如一個(gè)TCP連接烁落,連接Producer/Consumer與Broker乘粒。
Channel
信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道伤塌。信道是建立在真實(shí)的TCP連接內(nèi)地虛擬連接灯萍,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息每聪、訂閱隊(duì)列還是接收消息旦棉,這些動(dòng)作都是通過信道完成。因?yàn)閷?duì)于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷药薯,所以引入了信道的概念绑洛,以復(fù)用一條 TCP 連接。
Consumer
消息的消費(fèi)者童本,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序真屯。
Virtual Host
出于多租戶和安全因素設(shè)計(jì),虛擬主機(jī)穷娱,表示一批交換器绑蔫、消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域泵额。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器配深,擁有自己的隊(duì)列、交換器嫁盲、綁定和權(quán)限機(jī)制篓叶。vhost 是 AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的 vhost 是 / 缸托。
Broker
接受和分發(fā)消息的應(yīng)用左敌,表示消息隊(duì)列服務(wù)器實(shí)體,RabbitMQ Server 就是Message Broker嗦董。
RabbitMQ安裝(以下步驟基于centos7)
1. 安裝依賴環(huán)境
在線安裝依賴環(huán)境:
shell
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
2. 安裝Erlang
到官網(wǎng)下載以下文件
erlang-18.3-1.el7.centos.x86_64.rpm
socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5-1.noarch.rpm
安裝命令
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
出錯(cuò)情況
如果出現(xiàn)如下錯(cuò)誤
說明gblic 版本太低母谎。我們可以查看當(dāng)前機(jī)器的gblic 版本
strings /lib64/libc.so.6 | grep GLIBC
當(dāng)前最高版本2.12,需要2.15.所以需要升級(jí)glibc
- 使用yum更新安裝依賴
sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make -y
-** 下載rpm包**
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-utils-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-static-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-common-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-devel-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/glibc-headers-2.17-55.el6.x86_64.rpm &
wget http://copr-be.cloud.fedoraproject.org/results/mosquito/myrepo-el6/epel-6-x86_64/glibc-2.17-55.fc20/nscd-2.17-55.el6.x86_64.rpm &
-** 安裝rpm包**
sudo rpm -Uvh *-2.17-55.el6.x86_64.rpm --force --nodeps
- 安裝完畢后再查看glibc版本,發(fā)現(xiàn)glibc版本已經(jīng)到2.17了
strings /lib/libc.so.6 | grep GLIBC
3. 安裝RabbitMQ
# 安裝
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
# 安裝
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
4.linux相關(guān)命令介紹
啟動(dòng)
service rabbitmq-server start # 啟動(dòng)服務(wù)
service rabbitmq-server stop # 停止服務(wù)
service rabbitmq-server restart # 重啟服務(wù)
配置文件介紹
cd /usr/share/doc/rabbitmq-server-3.6.5/
cp rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
RabbitMQ管理界面安裝
開啟管理界面
rabbitmq-plugins enable rabbitmq_management
修改默認(rèn)配置信息
vim /usr/lib/ra bbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
訪問管控臺(tái)
http://IP地址:15672
管控臺(tái)基本信息介紹
用戶的身份級(jí)別
1京革、 超級(jí)管理員(administrator)
可登陸管理控制臺(tái)奇唤,可查看所有的信息,并且可以對(duì)用戶匹摇,策略(policy)進(jìn)行操作咬扇。
2、 監(jiān)控者(monitoring)
可登陸管理控制臺(tái)廊勃,同時(shí)可以查看rabbitmq節(jié)點(diǎn)的相關(guān)信息(進(jìn)程數(shù)懈贺,內(nèi)存使用情況,磁盤使用情況等)
3坡垫、 策略制定者(policymaker)
可登陸管理控制臺(tái), 同時(shí)可以對(duì)policy進(jìn)行管理梭灿。但無法查看節(jié)點(diǎn)的相關(guān)信息(上圖紅框標(biāo)識(shí)的部分)。
4冰悠、 普通管理者(management)
僅可登陸管理控制臺(tái)堡妒,無法看到節(jié)點(diǎn)信息,也無法對(duì)策略進(jìn)行管理溉卓。
5皮迟、 其他
無法登陸管理控制臺(tái),通常就是普通的生產(chǎn)者和消費(fèi)者
添加用戶
Virtual Hosts配置
概念
像mysql擁有數(shù)據(jù)庫的概念并且可以指定用戶對(duì)庫和表等操作的權(quán)限桑寨。RabbitMQ也有類似的權(quán)限管理伏尼;在RabbitMQ中可以虛擬消息服務(wù)器Virtual Host,每個(gè)Virtual Hosts相當(dāng)于一個(gè)相對(duì)獨(dú)立的RabbitMQ服務(wù)器尉尾,每個(gè)VirtualHost之間是相互隔離的爆阶。exchange、queue沙咏、message不能互通辨图。 相當(dāng)于mysql的db。Virtual Name一般以/開頭芭碍。
創(chuàng)建Virtual Hosts
設(shè)置Virtual Hosts權(quán)限
**
RabbitMQ快速入門
基本步驟介紹
1.創(chuàng)建工程
2.添加對(duì)應(yīng)的依賴
3.編寫生產(chǎn)者發(fā)送消息
4.編寫消費(fèi)者接受消息
1.創(chuàng)建工程
2.添加對(duì)應(yīng)的依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
3.編寫生產(chǎn)者發(fā)送消息
package com.pjh;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
/*創(chuàng)建連接工廠*/
ConnectionFactory connectionFactory = new ConnectionFactory();
/*設(shè)置RabbitMQ的主機(jī)地址,默認(rèn)為localhost*/
connectionFactory.setHost("121.196.111.120");
/*連接端口*/
connectionFactory.setPort(5672);
/*虛擬主機(jī)名稱徒役,默認(rèn)為/*/
connectionFactory.setVirtualHost("/demo");
/*連接用戶名*/
connectionFactory.setUsername("guest");
/*連接密碼*/
connectionFactory.setPassword("guest");
/*創(chuàng)建連接*/
Connection connection = connectionFactory.newConnection();
/*創(chuàng)建頻道*/
Channel channel = connection.createChannel();
/*聲明創(chuàng)建隊(duì)列*/
/*
* 參數(shù)介紹
* 參數(shù)1:隊(duì)列名稱孽尽,沒有改隊(duì)列就創(chuàng)建一個(gè)
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
* */
channel.queueDeclare("demo",true,false,false,null);
/*要發(fā)送的消息*/
String message="Hello Word=押尽!!";
/*
* 參數(shù)1:交換機(jī)名稱瞻讽,如果沒有指定則使用默認(rèn)Default Exchage
* 參數(shù)2:路由key,簡(jiǎn)單模式可以傳遞隊(duì)列名稱
* 參數(shù)3:消息其它屬性
* 參數(shù)4:消息內(nèi)容
*/
channel.basicPublish("","demo",null,message.getBytes());
System.out.println("已發(fā)送如下消息:");
System.out.println(message);
/*關(guān)閉資源*/
channel.close();
connection.close();
}
}
4.編寫消費(fèi)者接受消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
/*獲取連接*/
Connection connection = getConnection();
/*創(chuàng)建頻道*/
Channel channel = connection.createChannel();
/*
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
* */
channel.queueDeclare("demo",true, false, false, null);
/*創(chuàng)建消費(fèi)者鸳吸,并設(shè)置消息處理*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
* envelope 消息包的內(nèi)容速勇,可從中獲取消息id晌砾,消息routingkey,交換機(jī)烦磁,消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("接收到的消息為:" + new String(body, "utf-8"));
}
};
//監(jiān)聽消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動(dòng)確認(rèn)养匈,設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息都伪,設(shè)置為false則需要手動(dòng)確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
String demo = channel.basicConsume("demo", true, defaultConsumer);
System.out.println(demo);
//不關(guān)閉資源呕乎,應(yīng)該一直監(jiān)聽消息
channel.close();
connection.close();
}
public static Connection getConnection() throws IOException, TimeoutException {
/*創(chuàng)建連接工廠*/
ConnectionFactory connectionFactory = new ConnectionFactory();
/*設(shè)置主機(jī)地址,默認(rèn)為本機(jī)地址*/
connectionFactory.setHost("121.196.111.120");
/*設(shè)置連接端口號(hào)*/
connectionFactory.setPort(5672);
/*設(shè)置虛擬主機(jī)名稱*/
connectionFactory.setVirtualHost("/demo");
/*設(shè)置連接用戶名*/
connectionFactory.setUsername("guest");
/*設(shè)置連接密碼*/
connectionFactory.setPassword("guest");
/*創(chuàng)建連接*/
return connectionFactory.newConnection();
}
}
相關(guān)方法參數(shù)介紹
channel.queueDeclare
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
1. queue:
隊(duì)列的名稱 ;
**
2. durable:
是否持久化 陨晶;
- 當(dāng)
durable = false
時(shí)猬仁,隊(duì)列非持久化。因?yàn)殛?duì)列是存放在內(nèi)存中的先誉,所以當(dāng)RabbitMQ重啟或者服務(wù)器重啟時(shí)該隊(duì)列就會(huì)丟失 湿刽; - 當(dāng)
durable = true
時(shí),隊(duì)列持久化褐耳。當(dāng)RabbitMQ重啟后隊(duì)列不會(huì)丟失诈闺。RabbitMQ退出時(shí)它會(huì)將隊(duì)列信息保存到 Erlang自帶的Mnesia數(shù)據(jù)庫 中,當(dāng)RabbitMQ重啟之后會(huì)讀取該數(shù)據(jù)庫 漱病;
**
3. exclusive:
是否排外的 买雾;
- 當(dāng)
exclusive = true
則設(shè)置隊(duì)列為排他的。如果一個(gè)隊(duì)列被聲明為排他隊(duì)列杨帽,該隊(duì)列 僅對(duì)首次聲明它的連接(Connection)可見漓穿,是該Connection私有的,類似于加鎖注盈,并在連接斷開connection.close()
時(shí)自動(dòng)刪除 晃危; - 當(dāng)
exclusive = false
則設(shè)置隊(duì)列為非排他的,此時(shí)不同連接(Connection)的管道Channel可以使用該隊(duì)列 老客;
注意2點(diǎn):
**
- 排他隊(duì)列是 基于連接(Connection) 可見的僚饭,同個(gè)連接(Connection)的不同管道 (Channel) 是可以同時(shí)訪問同一連接創(chuàng)建的排他隊(duì)列 。其他連接是訪問不了的 胧砰,強(qiáng)制訪問將報(bào)錯(cuò):
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'hello-testExclusice' in vhost '/'.
鳍鸵;以下聲明是沒問題的:
Channel channel = connection.createChannel();
Channel channel2 = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, true, false, null);
channel2.queueDeclare(QUEUE_NAME, false, true, false, null);
=》如果是不同的 connection 創(chuàng)建的 channel 和 channel2,那么以上的
=》channel2.queueDeclare()是會(huì)報(bào)錯(cuò)的!!!!!!
-
"首次" 是指如果某個(gè)連接(Connection)已經(jīng)聲明了排他隊(duì)列,其他連接是不允許建立同名的排他隊(duì)列的尉间。這個(gè)與普通隊(duì)列不同:即使該隊(duì)列是持久化的(
durable = true
)偿乖,一旦連接關(guān)閉或者客戶端退出击罪,該排他隊(duì)列都會(huì)被自動(dòng)刪除,這種隊(duì)列適用于一個(gè)客戶端同時(shí)發(fā)送和讀取消息的應(yīng)用場(chǎng)景贪薪。
**
**
4. autoDelete:
是否自動(dòng)刪除 媳禁;如果autoDelete = true
,當(dāng)所有消費(fèi)者都與這個(gè)隊(duì)列斷開連接時(shí)画切,這個(gè)隊(duì)列會(huì)自動(dòng)刪除竣稽。注意: 不是說該隊(duì)列沒有消費(fèi)者連接時(shí)該隊(duì)列就會(huì)自動(dòng)刪除,因?yàn)楫?dāng)生產(chǎn)者聲明了該隊(duì)列且沒有消費(fèi)者連接消費(fèi)時(shí)霍弹,該隊(duì)列是不會(huì)自動(dòng)刪除的毫别。
**
**
5. arguments:
設(shè)置隊(duì)列的其他一些參數(shù),如 x-rnessage-ttl 典格、x-expires 拧烦、x-rnax-length 、x-rnax-length-bytes钝计、 x-dead-letter-exchange恋博、 x-deadletter-routing-key 、 x-rnax-priority 等私恬。