上節(jié)我們簡單了解了springboot2.x整合consul的入門教程,在最后的時候的時候遺留了整合RabbitMQ的過程,接下來我們來看詳細的過程
什么是RabbitMQ?
RabbitMQ是消息隊列的一種,其目的也是為了程序的異步和解耦,同樣也是為了消息的緩存和分發(fā)的作用.
RabbitMQ是實現(xiàn)了高級消息隊列協(xié)議(AMQP)的開源消息代理軟件,用在分布式系統(tǒng)來存儲轉發(fā)消息,其服務器端用Erlang語言編寫剃允,支持多種客戶端,如:Python、Ruby、.NET、Java奔则、JMS、C、PHP煌寇、ActionScript、XMPP逾雄、STOMP 等阀溶,支持 AJAX.在易用性、擴展性鸦泳、高可用性等方面表現(xiàn)不俗.
RabbitMQ相關組件介紹
通常我們在說到消息隊列時,都會談到生產者. 隊列 . 消費者,而RabbitMQ在這之上多了一層抽象,在生產者和隊列之間加入了交換機(Exchange)的概念,使得生產者和隊列沒有直接聯(lián)系,而是將之前的消息轉發(fā)的方式變?yōu)橄仁寝D發(fā)到交換機(Exchange),在由交換機(Exchange)轉發(fā)到消息隊列中.如圖:
我們來看看上圖每一部分都代表著什么?
- 上圖左側P代表生產者.
- 中間的部分為RabbitMQ,其中包含了交換機和隊列.
- 右側C代表消息的消費者.
在上述的組件中,其中最為重要的是虛擬機,交換機,隊列和綁定,分別來看:
- 虛擬主機:一個虛擬主機包括一般持有一組交換機.隊列和綁定,為什么需要多個虛擬主機呢银锻?很簡單, RabbitMQ 當中做鹰,用戶只能在虛擬主機的粒度進行權限控制击纬。 因此,如果需要禁止A組訪問B組的交換機/隊列/綁定钾麸,必須為A和B分別創(chuàng)建一個虛擬主機更振。每一個 RabbitMQ 服務器都有一個默認的虛擬主機''/''.
Exchange
交換機(Exchange)只用來消息的轉發(fā),不做消息的存儲,如果此時沒有隊列(Queue)綁定到交換機,會直接丟掉生產者(Producer)發(fā)送的消息,這里涉及到一個路由鍵的概念,消息交換到交換機的時候,交換機會找到對應的隊列(Queue)中,就是通過路由鍵來決定.
再者交換機(Exchange)的本質只是用作消息的轉發(fā)到綁定的隊列中,常見的交換機有四種模式,分別是:Direct,topic,Headers,Fanout.
- Direct:該類型的消息推送規(guī)則是先匹配,然后在把消息推送到由路由鍵(routingKey)指定的隊列中,如圖所示:
- Topic:按規(guī)則轉發(fā)消息
-
Headers:需要設置headers的屬性(attribute)類型的交換機即可.
-
Fanout:以廣播的方式發(fā)布消息
上述為簡單介紹了交換機(Exchange)的四種模式,圖是從別人那里接的,接下來我們來看重點springboot整合RabbitMQ的過程.
整合RabbitMQ過程
在我們之前搭建的服務(consul-producer)生產者中進行改造,首先是在pom文件中進行RabbitMQ依賴的添加,代碼如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
接著我們來看RabbitMQ來如何配置隊列以及交換機和綁定的過程,代碼如下:
package com.consul.mq;
import org.springframework.context.annotation.Configuration;
/**
* @author cb
*/
@Configuration
public class TopicRabbitConfig {
}
接著來看RabbitMQ的配置文件,代碼如下:
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#配置虛擬主機
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
上述配置中我們看到了username和password,其中guest為RabbitMQ安裝的初始用戶名和初始密碼,想改的話自己可以去改.
具體的創(chuàng)建隊列和交換機以及綁定,我們只需要在RabbitMQ的管理界面手動創(chuàng)建即可,可能大家在看別的文章時,都是通過代碼的方式來創(chuàng)建隊列等,來看如何創(chuàng)建的過程,前期是大家安裝了RabbitMQ,首先來啟動RabbitMQ,cmd到RabbitMQ安裝路徑的sbin目錄下,執(zhí)行:
rabbitmq-plugins enable rabbitmq_management
如圖:
該命令主要是來下載RabbitMQ的一些相關插件,此時我們的服務起來了,接著訪問http://localhost:15672,來到該頁面,輸入用戶名和密碼(均為guest)
我們來看如何創(chuàng)建Exchange和queue,如圖:
上圖為添加Exchange的過程,我們來看添加隊列的過程,如圖:
接著我們來看隊列和交換機如何綁定,如圖:
點擊我們創(chuàng)建的交換機(Exchange),來到上圖的界面,輸入隊列名和以及路由鍵(這里根據(jù)交換機的類型來輸入,有的交換機不需要路由鍵),所有的都準備妥當,我們需要創(chuàng)建消息的生產者和消費者,來看代碼:
package com.consul.send;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @author cb
*/
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send1(){
Date date = new Date();
for (int i=0;i<5;i++){
String msg = "在時間為:"+ date+"生產了一個消息";
System.out.println("producer:"+msg);
rabbitTemplate.convertAndSend("topic.exchange","topic.routingKey",msg);
}
}
//fanout類型的
public void fanoutSend(){
String content = "類型為Fanout的交換機:"+System.currentTimeMillis();
System.out.println("FanoutProducer:"+content);
rabbitTemplate.convertAndSend("fanout.exchange","",content);
}
在來看消費者代碼:
package com.consul.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author cb
*/
@Component
public class MessageConsumer {
@RabbitListener(queues = "topic.queueA")
@RabbitHandler
public void receive(String message){
System.out.println("Consumer:"+message);
}
@RabbitListener(queues = "fanout.queue")
@RabbitHandler
public void fanoutReceive(String content){
System.out.println("Consumer:"+content);
}
我們通過@RabbitListener來監(jiān)聽我們創(chuàng)建的隊列,如代碼中的topic.queueA,來看運行結果:
這是我們的后臺消費者端的打印輸出,我們再來看,RabbitMQ管理臺queue中的消息總數(shù),如圖:
消息被我們消費完了,這就是RabbitMQ的簡單使用過程