RabbitMQ是流行的開源消息隊列系統(tǒng)懒豹,用erlang語言開發(fā)糠亩。RabbitMQ是AMQP(高級消息隊列協(xié)議)的標準實現(xiàn)复亏。
RabbitMQ的結(jié)構(gòu)圖如下:
幾個概念說明:
Broker:簡單來說就是消息隊列服務器實體类腮。
Exchange:消息交換機做入,它指定消息按什么規(guī)則冒晰,路由到哪個隊列。
Queue:消息隊列載體竟块,每個消息都會被投入到一個 或多個隊列壶运。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來浪秘。
Routing Key:路由關(guān)鍵字蒋情,exchange根據(jù)這個關(guān)鍵字進行消息投遞埠况。
vhost:虛擬主機,一個broker里可以開設多個vhost棵癣,用作不同用戶的權(quán)限分離辕翰。
producer:消息生產(chǎn)者,就是投遞消息的程序狈谊。
consumer:消息消費者喜命,就是接受消息的程序。
channel:消息通道河劝,在客戶端的每個連接里壁榕,可建立多個channel,每個channel代表一個會話任務丧裁。
消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務器护桦,打開一個channel。
(2)客戶端聲明一個exchange煎娇,并設置相關(guān)屬性二庵。
(3)客戶端聲明一個queue,并設置相關(guān)屬性缓呛。
(4)客戶端使用routing key催享,在exchange和queue之間建立好綁定關(guān)系。
(5)客戶端投遞消息到exchange哟绊。
exchange接收到消息后因妙,就根據(jù)消息的key和已經(jīng)設置的binding,進行消息路由票髓,將消息投遞到一個或多個隊列里攀涵。
exchange也有幾個類型,完全根據(jù)key進行投遞的叫做Direct交換機洽沟,例如,綁定時設置了routing key為”abc”裆操,那么客戶端提交的消息怒详,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機踪区,符號”#”匹配一個或多個詞昆烁,符號””匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”缎岗,”abc.”只匹配”abc.def”静尼。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式鼠渺,一個消息進來時蜗元,投遞到與該交換機綁定的所有隊列。
RabbitMQ支持消息的持久化系冗,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮薪鹦,我想大多數(shù)用戶都會選擇持久化掌敬。消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化池磁,在聲明時指定durable => 1
(3)消息持久化奔害,在投遞時指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的地熄。如果exchange和queue兩者之間有一個持久化华临,一個非持久化,就不允許建立綁定端考。
以上內(nèi)容來自百度百科rabbitmq
安裝方式:
在安裝RabbitMQ前雅潭,首先需要安裝 Erlang環(huán)境
官網(wǎng):
http://www.erlang.org/
Windows版下載地址:http://www.erlang.org/download
Linux版: 使用yum安裝
#wget http://www.rabbitmq.com/releases/rabbitmq-server
/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rp
#yum install rabbitmq-server-3.6.5-1.noarch.rpm
具體安裝細節(jié)和配置大家可以看這篇文章:
RabbitMQ安裝教程(Windows/Linux都有)
pom.xml添加依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
創(chuàng)建兩個Maven工程,一個作為發(fā)送端却特,一個作為接受端扶供。
配置文件
Sender端:
server.port=8083
spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Receiver端:
server.port=8082
spring.application.name=spirng-boot-rabbitmq-receiver
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
注入Queue
rabbitMq五種消息模式:
1.直接模式Direct
2.工作隊列模式Work Queue
3.發(fā)布/訂閱模式Publish/Subscribe
4.路由模式Routing
5.通配符模式Topics
今天給大家實現(xiàn)最簡單的直接模式(Direct)
注入名字為"redirect"的對列,發(fā)送端和接受端要保證隊列的名字完全一致才能確保通信的成功裂明。
@Configuration
public class SenderConfig {
@Bean
public Queue queue() {
return new Queue("redirect");
}
}
發(fā)送消息的方法椿浓,為了便于觀察,這里使用了定時任務闽晦,每5秒鐘發(fā)送一條數(shù)據(jù)扳碍,有關(guān)定時任務的使用方法,大家可以看我之前寫的博客仙蛉,特別簡單笋敞。
@Component
public class Sender {
@Autowired
private AmqpTemplate template;
@Scheduled(cron="0/5 * * * * ? ") //每5秒執(zhí)行一次
public void send() {
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
template.convertAndSend("redirect",sdf.format(new Date())+"消息來啦!");
}
}
啟動方法捅儒,開啟定時任務的注解液样。
@SpringBootApplication
@EnableScheduling
public class SpringBootRabbitMqSenderApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootRabbitMqSenderApplication.class, args);
}
}
Receiver端只需配置個Rabbit監(jiān)聽器,監(jiān)聽指定名字的隊列巧还,收到消息后鞭莽,直接控制臺打印出來。
@Component
public class Receive {
@RabbitListener(queues="redirect") //監(jiān)聽器監(jiān)聽指定的redirect
public void processC(Object obj) {
System.out.println("receiver:"+obj.toString());
}
}
然后分別啟動Sender端和Receiver端麸祷,發(fā)現(xiàn)Receiver端控制臺不斷輸出內(nèi)容澎怒。
一對多發(fā)送:
上面測試的是一個生產(chǎn)者對應一個消費者,下面我們看一下一個生產(chǎn)者對應多個消費者,,接收端新增一個方法喷面,發(fā)送端不變星瘾,隊列名字不變。
@Component
public class Receive2 {
//消費者1
@RabbitListener(queues="direct")
public void processC(String obj) {
System.out.println("Receiver1:"+obj);
}
//消費者2
@RabbitListener(queues="direct")
public void processC2(String obj) {
System.out.println("Receiver2:"+obj);
}
}
結(jié)論:
一個生產(chǎn)者惧辈,多個消費者琳状,消息會平均分配到每個消費者中!
多對多發(fā)送
再來看一下盒齿,多個生產(chǎn)者對應多個消費者念逞,會產(chǎn)生怎樣的情況?發(fā)送端復制一分發(fā)送的方法边翁。
@Component
public class Sender {
@Autowired
private AmqpTemplate template;
@Scheduled(cron="0/5 * * * * ? ") //每5秒執(zhí)行一次
public void send() {
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//第一個參數(shù) 交換機名稱 第二個參數(shù)是 隊列的key 第三個參數(shù)是發(fā)送的內(nèi)容
template.convertAndSend("direct",sdf.format(new Date()));
}
@Scheduled(cron="0/5 * * * * ? ") //每5秒執(zhí)行一次
public void send2() {
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//第一個參數(shù) 交換機名稱 第二個參數(shù)是 隊列的key 第三個參數(shù)是發(fā)送的內(nèi)容
template.convertAndSend("direct",sdf.format(new Date()));
}
}
結(jié)論
多個生產(chǎn)者翎承,多個消費者,消息同樣會平均分配到每個消費者中符匾!
發(fā)送對象:
需要注意的是實體類要實現(xiàn)序列化叨咖。
public class User implements Serializable{
private static final long serialVersionUID = 1L;
private int id;
private String name;
private String gender;
public User(int id, String name, String gender) {
super();
this.id = id;
this.name = name;
this.gender = gender;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGender() {
return gender;
}
public void setGender(String gender) {
this.gender = gender;
}
@Override
public String toString() {
return "User [id=" + id + ", " + (name != null ? "name=" + name + ", " : "")
+ (gender != null ? "gender=" + gender : "") + "]";
}
}
發(fā)送內(nèi)容改為對象
@Component
public class Sender {
@Autowired
private AmqpTemplate template;
//@Scheduled(cron="0/5 * * * * ? ") //每5秒執(zhí)行一次
public void send(User user) {
template.convertAndSend("direct",user);
}
}
接收內(nèi)容變?yōu)閷ο?/p>
@Component
public class Receive2 {
//消費者
@RabbitListener(queues="direct")
public void processC(User user) {
System.out.println("Receive:"+user);
}
}
開始測試
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootRabbitMqSenderApplicationTests {
@Autowired
private Sender helloSender;
//Direct模式
@Test
public void send() {
helloSender.send(new User(1, "張三", "男"));
}
}
測試結(jié)果如下:
具體的數(shù)據(jù)傳輸信息可以瀏覽器訪問http://localhost:15672獲取隊列實時狀態(tài)和相關(guān)信息。
到這里啊胶,rabbitMqDirect模式已經(jīng)整合完成甸各,日后我會繼續(xù)將其他幾種消息模式整合完畢,敬請期待创淡!