消息隊列中間件是分布式系統(tǒng)中重要的組件拼缝,主要解決應(yīng)用耦合,異步消息了嚎,流量削鋒等問題泪漂,實現(xiàn)高性能,高可用歪泳,可伸縮和最終一致性架構(gòu)使用較多的消息隊列有ActiveMQ萝勤,RabbitMQ,Kafka呐伞,RocketMQ等敌卓,這里主要講解RabbitMQ的簡單使用
一、 創(chuàng)建SpringBoot項目伶氢,并引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二趟径、 Queue方式
1、定義配置類
/**
* @author Gjing
**/
@Configuration
public class RabbitMqConfiguration {
/**
* 聲明一個名為simple的隊列
*/
@Bean
public Queue testQueue() {
return new Queue("simple");
}
}
2癣防、聲明一個生產(chǎn)者
/**
* @author Gjing
**/
@Component
public class Producer {
@Resource
private AmqpTemplate rabbitTemplate;
public void send() {
String message = "hello";
this.rabbitTemplate.convertAndSend("simple", message);
}
}
3蜗巧、聲明消費者
/**
* @author Gjing
**/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = "simple")
public void receive(String message) {
log.info("消費者1收到消息:{}", message);
}
}
4、創(chuàng)建Controller進(jìn)行調(diào)用
/**
* @author Gjing
**/
@RestController
public class DemoController {
@Resource
private Producer producer;
@PostMapping("/message")
public void send() {
for (int i = 0; i < 10; i++) {
this.producer.send();
}
}
}
執(zhí)行結(jié)果
三蕾盯、topic exchange方式
1幕屹、定義配置類
/**
* @author Gjing
**/
@Configuration
public class RabbitMqConfiguration {
/**
* 聲明一個名為topic.message1的隊列
*/
@Bean
public Queue topicQueue() {
return new Queue("topic.message1");
}
/**
* 聲明一個名為topic.message2的隊列
*/
@Bean
public Queue topicQueue2() {
return new Queue("topic.message2");
}
/**
* 聲明一個名為exchange的交換機(jī)
*/
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchange");
}
/**
* 將topic.message1的隊列綁定到exchange交換機(jī)
*/
@Bean
public Binding bindMessage1() {
return BindingBuilder.bind(topicQueue()).to(exchange()).with("topic.message1");
}
/**
* 將topic.message2的隊列綁定到exchange交換機(jī)
*/
@Bean
public Binding bindMessage2() {
return BindingBuilder.bind(topicQueue2()).to(exchange()).with("topic.message2");
}
}
2、定義生產(chǎn)者
/**
* @author Gjing
**/
@Component
public class TopicProducer {
@Resource
private AmqpTemplate rabbitTemplate;
public void send() {
String message1 = "I am topic.message1";
String message2 = "I am topic.message2";
this.rabbitTemplate.convertAndSend("exchange", "topic.message1", message1);
this.rabbitTemplate.convertAndSend("exchange", "topic.message2", message2);
}
}
3、定義消費者1
/**
* @author Gjing
**/
@Component
@Slf4j
public class TopicConsumer1 {
@RabbitListener(queues = "topic.message1")
public void receive(String message) {
log.info("消費者1收到消息:{}", message);
}
}
4望拖、定義消費者2
/**
* @author Gjing
**/
@Component
@Slf4j
public class TopicConusmer2 {
@RabbitListener(queues = "topic.message2")
public void receive(String message) {
log.info("消費者2收到消息:{}", message);
}
}
5迅腔、創(chuàng)建controller進(jìn)行調(diào)用
/**
* @author Gjing
**/
@RestController
public class TopicController {
@Resource
private TopicProducer topicProducer;
@PostMapping("/message-topic")
public void sendMessageTopic() {
for (int i = 0; i < 10; i++) {
this.topicProducer.send();
}
}
}
執(zhí)行結(jié)果
四、fanout方式
1靠娱、定義配置類
/**
* @author Gjing
**/
@Configuration
public class RabbitMqConfiguration {
/**
* 聲明一個名為fanout.1的隊列
*/
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.1");
}
/**
* 聲明一個名為fanout.2的隊列
*/
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.2");
}
/**
* 聲明一個名為fanout.3的隊列
*/
@Bean
public Queue fanoutQueue3() {
return new Queue("fanout.3");
}
/**
* 聲明一個名為fanoutExchange的轉(zhuǎn)發(fā)器
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
/**
* 將隊列fanoutQueue1綁定到fanout轉(zhuǎn)發(fā)器
*/
@Bean
public Binding bindFanout1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
/**
* 將隊列fanoutQueue1綁定到fanout轉(zhuǎn)發(fā)器
*/
@Bean
public Binding bindFanout2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
/**
* 將隊列fanoutQueue1綁定到fanout轉(zhuǎn)發(fā)器
*/
@Bean
public Binding bindFanout3() {
return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}
}
2、定義生產(chǎn)者
/**
* @author Gjing
**/
@Component
public class FanoutProducer {
@Resource
private AmqpTemplate amqpTemplate;
public void send() {
String message = "hello, I am speaker";
//這里的routingKey會被rabbitMQ忽略掠兄,如果不設(shè)置這個參數(shù)會導(dǎo)致發(fā)送消息失敗,
//所以這里隨便寫(我給他空字符串)像云,rabbitMQ會默認(rèn)發(fā)給所有綁定的
this.amqpTemplate.convertAndSend("fanoutExchange","", message);
}
}
3、定義消費者1
/**
* @author Gjing
**/
@Component
@Slf4j
public class FanoutConsumer1 {
@RabbitListener(queues = "fanout.1")
public void receive(String message) {
log.info("消費者1收到消息:{}", message);
}
}
4蚂夕、定義消費者2
/**
* @author Gjing
**/
@Component
@Slf4j
public class FanoutConsumer2 {
@RabbitListener(queues = "fanout.2")
public void receive(String message) {
log.info("消費者2收到消息:{}", message);
}
}
5迅诬、定義消費者3
/**
* @author Gjing
**/
@Component
@Slf4j
public class FanoutConsumer3 {
@RabbitListener(queues = "fanout.3")
public void receive(String message) {
log.info("消費者3收到消息:{}", message);
}
}
6、創(chuàng)建controller調(diào)用
/**
* @author Gjing
**/
@RestController
public class FanoutController {
@Resource
private FanoutProducer fanoutProducer;
@PostMapping("/message-fanout")
public void sendFanout() {
this.fanoutProducer.send();
}
}
執(zhí)行結(jié)果
五婿牍、RabbitMQ核心概念
server:又稱Broker侈贷,接受客戶端的連接實現(xiàn)AMQP實體服務(wù);connection:與broker的連接等脂;channel:網(wǎng)絡(luò)通道俏蛮,幾乎所有的操作都是在channel中進(jìn)行;message:服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)上遥,由properties和body組成搏屑,properties可以對消息進(jìn)行修飾,比如消息的優(yōu)先級和高級特性粉楚, body為消息的內(nèi)容辣恋;exchange:交換機(jī),接收消息模软,根據(jù)路郵件轉(zhuǎn)發(fā)消息到綁定的隊列伟骨;binding:exchange和queue之間的虛擬連接,可以包含routing key燃异;Routing key:一個路由規(guī)則携狭,虛擬機(jī)用他確定如何路由一個特定信息;Queue:也稱為message Queue回俐,消息隊列暑中,保存信息并將它們轉(zhuǎn)發(fā)給消費者。
Exchange類型:
-
Fanout:路由規(guī)則是把所有發(fā)送到該Exchange的消息路由到所有與她綁定的Queue中
備注:生產(chǎn)者P生產(chǎn)消息1推送到Exchange鲫剿,由于Exchange Type=fanout這時候會遵循fanout的規(guī)則將消息推送到所有與他綁定的Queue鳄逾。 -
direct:把消息路由到那些binding key與routing key完全匹配的Queue中。
備注:生產(chǎn)者P發(fā)送消息時Routing key = booking時灵莲,這時候?qū)⑾魉偷紼xchange雕凹,Exchange獲取到生產(chǎn)者發(fā)送過來的消息后,會根據(jù)自身的規(guī)則進(jìn)行與匹配響應(yīng)的Queue,這時候發(fā)現(xiàn)Queue1和Queue2都符合枚抵,就會將消息傳送給這兩個隊列线欲,如果我們以Routing key = create和routing key = confirm發(fā)送消息時,這時候消息只會被推送到Queue2隊列中汽摹,其他的Routing key 的消息會被丟棄李丰。 -
topic:模糊匹配,通過通配符滿足一部分規(guī)則就可以傳送逼泣,其中注意的是有兩個字符 ‘星號’ 和#號趴泌,其中 星號 用于匹配一個單詞,#號用于匹配多個單詞(可以是0個)
備注:當(dāng)生產(chǎn)者發(fā)送消息Routing Key=F.C.E的時候拉庶,這時候只滿足Queue1嗜憔,所以會被路由到Queue中,如果Routing Key=A.C.E這時候會被同是路由到Queue1和Queue2中氏仗,如果Routing Key=A.F.B時吉捶,這里只會發(fā)送一條消息到Queue2中。
常見面試題
- 什么是元數(shù)據(jù)皆尔?元數(shù)據(jù)分為哪些類型呐舔?包括哪些內(nèi)容?與cluster相關(guān)的元數(shù)據(jù)有哪些慷蠕?元數(shù)據(jù)是如何保存的滋早?元數(shù)據(jù)在cluster中是如何分布的?
在非cluster模式中砌们,元數(shù)據(jù)主要分為Queue元數(shù)據(jù)(Queue名字和屬性等)杆麸、Exchange元數(shù)據(jù)(Exchange名字、類型浪感、屬性等)昔头、binding元數(shù)據(jù)(存放路由關(guān)系的查找表)、vhost元數(shù)據(jù)(vhost范圍內(nèi)針對前三者的名字空間約束和安全屬性設(shè)置)影兽。在cluster模式下揭斧,包括cluster中node位置信息和node關(guān)系信息。元數(shù)據(jù)按照erlang node的類型確定是僅保存于RAM中峻堰,還是同時保存在RAM或者Disk上讹开,元數(shù)據(jù)在cluster中是全node分布的
- rabbitmq的一個Queue中存放的message是否有數(shù)量限制?
可以認(rèn)為無限制捐名,限制取決于機(jī)器的內(nèi)存旦万,但是消息過多會導(dǎo)致處理效率的下降。
- rabbitmq如何實現(xiàn)延遲隊列镶蹋?
沒有直接支持延遲隊列功能成艘,但是可以通過兩個特性來實現(xiàn)延遲隊列赏半,①TTL:通過隊列屬性設(shè)置,隊列中的所有消息都有相同的過期時間淆两、對消息進(jìn)行單獨設(shè)置断箫,每條消息TTL可以不同。如果同時使用秋冰,則消息的過期時間以兩者之間TTL較小的那個數(shù)值為準(zhǔn)仲义,消息在隊列的生存時間一旦超過設(shè)置的TTL值,就稱為dead letter剑勾。②DLX:Queue可以配置X-dead-letter-exchange和x-dead-letter-routing-key(可選)兩個參數(shù)埃撵,如果隊列內(nèi)出現(xiàn)了dead letter,則按照這兩個參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊列甥材。
- X-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
- 出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送
- 出現(xiàn)dead letter的情況有:
- 消息或者隊列的TTL過期; 2. 隊列達(dá)到最大長度; 3. 消息被消費者拒絕
- 出現(xiàn)dead letter的情況有:
前往第二章:SpringBoot使用RabbitMQ(二)
以上為個人見解,如有誤歡迎各位指正