消息隊列中間件是分布式系統(tǒng)中重要的組件牵祟,主要解決應(yīng)用耦合深夯,異步消息,流量削鋒等問題诺苹,實現(xiàn)高性能咕晋,高可用,可伸縮和最終一致性架構(gòu)使用較多的消息隊列有ActiveMQ收奔,RabbitMQ掌呜,Kafka,RocketMQ等筹淫,這里主要講解RabbitMQ的簡單使用
創(chuàng)建SpringBoot項目站辉,并引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Queue方式
- 定義配置類
/**
* @author Gjing
**/
@Configuration
public class RabbitMqConfiguration {
/**
* 聲明一個名為simple的隊列
*/
@Bean
public Queue testQueue() {
return new Queue("simple");
}
}
- 聲明一個生產(chǎn)者
/**
* @author Gjing
**/
@Component
public class Producer {
@Resource
private AmqpTemplate rabbitTemplate;
public void send() {
String message = "hello";
this.rabbitTemplate.convertAndSend("simple", message);
}
}
- 聲明消費者
/**
* @author Gjing
**/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = "simple")
public void receive(String message) {
log.info("消費者1收到消息:{}", message);
}
}
- 創(chuàng)建Controller進行調(diào)用
/**
* @author Gjing
**/
@RestController
public class DemoController {
@Resource
private Producer producer;
@PostMapping("/message")
public void send() {
for (int i = 0; i < 10; i++) {
this.producer.send();
}
}
}
- 執(zhí)行結(jié)果
[圖片上傳失敗...(image-67b764-1561538726085)]
topic exchange方式
- 定義配置類
/**
* @author Gjing
**/
@Configuration
public class RabbitMqConfiguration {
/**
* 聲明一個名為topic.message1的隊列
*/
@Bean
public Queue topicQueue() {
return new Queue("topic.message1");
}
/**
* 聲明一個名為topic.message2的隊列
*/
@Bean
public Queue topicQueue2() {
return new Queue("topic.message2");
}
/**
* 聲明一個名為exchange的交換機
*/
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchange");
}
/**
* 將topic.message1的隊列綁定到exchange交換機
*/
@Bean
public Binding bindMessage1() {
return BindingBuilder.bind(topicQueue()).to(exchange()).with("topic.message1");
}
/**
* 將topic.message2的隊列綁定到exchange交換機
*/
@Bean
public Binding bindMessage2() {
return BindingBuilder.bind(topicQueue2()).to(exchange()).with("topic.message2");
}
}
- 定義生產(chǎn)者
/**
* @author Gjing
**/
@Component
public class TopicProducer {
@Resource
private AmqpTemplate rabbitTemplate;
public void send() {
String message1 = "I am topic.message1";
String message2 = "I am topic.message2";
this.rabbitTemplate.convertAndSend("exchange", "topic.message1", message1);
this.rabbitTemplate.convertAndSend("exchange", "topic.message2", message2);
}
}
- 定義消費者1
/**
* @author Gjing
**/
@Component
@Slf4j
public class TopicConsumer1 {
@RabbitListener(queues = "topic.message1")
public void receive(String message) {
log.info("消費者1收到消息:{}", message);
}
}
- 定義消費者2
/**
* @author Gjing
**/
@Component
@Slf4j
public class TopicConusmer2 {
@RabbitListener(queues = "topic.message2")
public void receive(String message) {
log.info("消費者2收到消息:{}", message);
}
}
- 創(chuàng)建controller進行調(diào)用
/**
* @author Gjing
**/
@RestController
public class TopicController {
@Resource
private TopicProducer topicProducer;
@PostMapping("/message-topic")
public void sendMessageTopic() {
for (int i = 0; i < 10; i++) {
this.topicProducer.send();
}
}
}
-
執(zhí)行結(jié)果
1560475964_1_
fanout方式
- 定義配置類
/**
* @author Gjing
**/
@Configuration
public class RabbitMqConfiguration {
/**
* 聲明一個名為fanout.1的隊列
*/
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.1");
}
/**
* 聲明一個名為fanout.2的隊列
*/
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.2");
}
/**
* 聲明一個名為fanout.3的隊列
*/
@Bean
public Queue fanoutQueue3() {
return new Queue("fanout.3");
}
/**
* 聲明一個名為fanoutExchange的轉(zhuǎn)發(fā)器
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
/**
* 將隊列fanoutQueue1綁定到fanout轉(zhuǎn)發(fā)器
*/
@Bean
public Binding bindFanout1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
/**
* 將隊列fanoutQueue1綁定到fanout轉(zhuǎn)發(fā)器
*/
@Bean
public Binding bindFanout2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
/**
* 將隊列fanoutQueue1綁定到fanout轉(zhuǎn)發(fā)器
*/
@Bean
public Binding bindFanout3() {
return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}
}
- 定義生產(chǎn)者
/**
* @author Gjing
**/
@Component
public class FanoutProducer {
@Resource
private AmqpTemplate amqpTemplate;
public void send() {
String message = "hello, I am speaker";
//這里的routingKey會被rabbitMQ忽略,如果不設(shè)置這個參數(shù)會導(dǎo)致發(fā)送消息失敗,
//所以這里隨便寫(我給他空字符串)损姜,rabbitMQ會默認(rèn)發(fā)給所有綁定的
this.amqpTemplate.convertAndSend("fanoutExchange","", message);
}
}
- 定義消費者1
/**
* @author Gjing
**/
@Component
@Slf4j
public class FanoutConsumer1 {
@RabbitListener(queues = "fanout.1")
public void receive(String message) {
log.info("消費者1收到消息:{}", message);
}
}
- 定義消費者2
/**
* @author Gjing
**/
@Component
@Slf4j
public class FanoutConsumer2 {
@RabbitListener(queues = "fanout.2")
public void receive(String message) {
log.info("消費者2收到消息:{}", message);
}
}
- 定義消費者3
/**
* @author Gjing
**/
@Component
@Slf4j
public class FanoutConsumer3 {
@RabbitListener(queues = "fanout.3")
public void receive(String message) {
log.info("消費者3收到消息:{}", message);
}
}
- 創(chuàng)建controller調(diào)用
/**
* @author Gjing
**/
@RestController
public class FanoutController {
@Resource
private FanoutProducer fanoutProducer;
@PostMapping("/message-fanout")
public void sendFanout() {
this.fanoutProducer.send();
}
}
RabbitMQ核心概念
server:又稱Broker饰剥,接受客戶端的連接實現(xiàn)AMQP實體服務(wù);connection:與broker的連接摧阅;channel:網(wǎng)絡(luò)通道汰蓉,幾乎所有的操作都是在channel中進行;message:服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)棒卷,由properties和body組成顾孽,properties可以對消息進行修飾祝钢,比如消息的優(yōu)先級和高級特性, body為消息的內(nèi)容若厚;exchange:交換機拦英,接收消息,根據(jù)路郵件轉(zhuǎn)發(fā)消息到綁定的隊列测秸;binding:exchange和queue之間的虛擬連接疤估,可以包含routing key;Routing key:一個路由規(guī)則霎冯,虛擬機用他確定如何路由一個特定信息铃拇;Queue:也稱為message Queue,消息隊列沈撞,保存信息并將它們轉(zhuǎn)發(fā)給消費者慷荔。
Exchange類型:
-
Fanout:路由規(guī)則是把所有發(fā)送到該Exchange的消息路由到所有與她綁定的Queue中
1
備注:生產(chǎn)者P生產(chǎn)消息1推送到Exchange,由于Exchange Type=fanout這時候會遵循fanout的規(guī)則將消息推送到所有與他綁定的Queue缠俺。
-
direct:把消息路由到那些binding key與routing key完全匹配的Queue中显晶。
2
備注:生產(chǎn)者P發(fā)送消息時Routing key = bloking時,這時候?qū)⑾魉偷紼xchange壹士,Exchange獲取到生產(chǎn)者發(fā)送過來的消息后吧碾,會根據(jù)自身的規(guī)則進行與匹配響應(yīng)的Queue,這時候發(fā)現(xiàn)Queue1和Queue2都符合墓卦,就會將消息傳送給這兩個隊列,如果我們以Routing key = create和routing key = confirm發(fā)送消息時户敬,這時候消息只會被推送到Queue2隊列中落剪,其他的Routing key 的消息會被丟棄。
-
topic:模糊匹配尿庐,通過通配符滿足一部分規(guī)則就可以傳送忠怖,其中注意的是有兩個字符 ‘星號’ 和#號,其中 星號 用于匹配一個單詞抄瑟,#號用于匹配多個單詞(可以是0個)
[圖片上傳失敗...(image-85b63c-1561538726082)]
備注:當(dāng)生產(chǎn)者發(fā)送消息Routing Key=F.C.E的時候凡泣,這時候只滿足Queue1,所以會被路由到Queue中皮假,如果Routing Key=A.C.E這時候會被同是路由到Queue1和Queue2中鞋拟,如果Routing Key=A.F.B時,這里只會發(fā)送一條消息到Queue2中惹资。
常見面試題
- 什么是元數(shù)據(jù)贺纲?元數(shù)據(jù)分為哪些類型?包括哪些內(nèi)容褪测?與cluster相關(guān)的元數(shù)據(jù)有哪些猴誊?元數(shù)據(jù)是如何保存的潦刃?元數(shù)據(jù)在cluster中是如何分布的?
在非cluster模式中懈叹,元數(shù)據(jù)主要分為Queue元數(shù)據(jù)(Queue名字和屬性等)乖杠、Exchange元數(shù)據(jù)(Exchange名字、類型澄成、屬性等)胧洒、binding元數(shù)據(jù)(存放路由關(guān)系的查找表)、vhost元數(shù)據(jù)(vhost范圍內(nèi)針對前三者的名字空間約束和安全屬性設(shè)置)环揽。在cluster模式下略荡,包括cluster中node位置信息和node關(guān)系信息。元數(shù)據(jù)按照erlang node的類型確定是僅保存于RAM中歉胶,還是同時保存在RAM或者Disk上汛兜,元數(shù)據(jù)在cluster中是全node分布的
- rabbitmq的一個Queue中存放的message是否有數(shù)量限制?
可以認(rèn)為無限制通今,限制取決于機器的內(nèi)存粥谬,但是消息過多會導(dǎo)致處理效率的下降。
- rabbitmq如何實現(xiàn)延遲隊列辫塌?
沒有直接支持延遲隊列功能漏策,但是可以通過兩個特性來實現(xiàn)延遲隊列,①TTL:通過隊列屬性設(shè)置臼氨,隊列中的所有消息都有相同的過期時間掺喻、對消息進行單獨設(shè)置,每條消息TTL可以不同储矩。如果同時使用感耙,則消息的過期時間以兩者之間TTL較小的那個數(shù)值為準(zhǔn),消息在隊列的生存時間一旦超過設(shè)置的TTL值持隧,就稱為dead letter即硼。②DLX:Queue可以配置X-dead-letter-exchange和x-dead-letter-routing-key(可選)兩個參數(shù),如果隊列內(nèi)出現(xiàn)了dead letter屡拨,則按照這兩個參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊列只酥。
X-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
-
出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送
- 出現(xiàn)dead letter的情況有:
- 消息或者隊列的TTL過期; 2. 隊列達(dá)到最大長度; 3. 消息被消費者拒絕