最近集成了RabbitMQ消息隊(duì)列展父,試著研究一下RabbitMQ的幾種模式孤钦,參考官方文檔
新建項(xiàng)目什么的不講了。
配置pom.xml文件,用到了springboot對于AMQP(高級消息隊(duì)列協(xié)議,即面向消息的中間件的設(shè)計)
<!-- 添加springboot對amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
創(chuàng)建個rabbit目錄皂冰,然后新建三個類吠裆,sender,receiver够掠,config
Work模式
@Configuration
public class MQConfig {
public static final String QUEUE = "queue";
/**
* Direct模式 交換機(jī)Exchange
* */
@Bean
public Queue queue() {
return new Queue(QUEUE, true);
}
}
config里配置好一個最簡單的queue
然后sender里使用AmqpTemplate去發(fā)送消息
@Service
public class MQSender {
private static Logger log = LoggerFactory.getLogger(MQSender.class);
@Autowired
AmqpTemplate amqpTemplate ;
public void send(Object message) {
String msg = RedisService.beanToString(message);//這里只是我其他類里一個Object對象轉(zhuǎn)String對象民褂,你們可以自己實(shí)現(xiàn)
log.info("send message:"+msg);
amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
}
}
然后在 receiver里接收消息
@Service
public class MQReceiver {
private static Logger log = LoggerFactory.getLogger(MQReceiver.class);
@RabbitListener(queues=MQConfig.QUEUE)
public void receive(String message) {
log.info("receive message:"+message);
}
}
在Controller調(diào)用send方法運(yùn)行之后控制臺會輸出如下結(jié)果
2019-02-18 11:31:33.377 INFO 2356 --- [nio-8080-exec-1] com.imooc.miaosha.rabbitmq.MQSender : send message:hello,world
2019-02-18 11:31:33.514 INFO 2356 --- [cTaskExecutor-1] com.imooc.miaosha.rabbitmq.MQReceiver : receive message:hello,world
公平派遣與循環(huán)派遣
默認(rèn)情況下,RabbitMQ將按順序?qū)⒚織l消息發(fā)送給下一個消費(fèi)者疯潭。平均而言赊堪,每個消費(fèi)者將獲得相同數(shù)量的消息。這種分發(fā)消息的方式稱為循環(huán)法竖哩。在這種模式下哭廉,調(diào)度不一定完全符合我們的要求。例如相叁,在有兩個工人的情況下遵绰,當(dāng)所有奇怪的消息都很重,甚至消息很輕時增淹,一個工人將經(jīng)常忙椿访,而另一個工作人員幾乎不會做任何工作。那么虑润,RabbitMQ對此一無所知成玫,仍然會均勻地發(fā)送消息。
發(fā)生這種情況是因?yàn)镽abbitMQ只是在消息進(jìn)入隊(duì)列時調(diào)度消息。它不會查看消費(fèi)者未確認(rèn)消息的數(shù)量哭当。它只是盲目地向第n個消費(fèi)者發(fā)送每個第n個消息猪腕。
Fanout 交換機(jī)模式
正如您可能從名稱中猜到的那樣,它只是將收到的所有消息廣播到它知道的所有隊(duì)列中钦勘。而這正是我們傳播信息所需要的陋葡。
參照官方實(shí)例的實(shí)現(xiàn),代碼如下
config彻采,里面的自動刪除隊(duì)列
@Configuration
public class MQConfig {
/**
* Fanout模式 交換機(jī)Exchange
* */
@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue autoDeleteQueue2 () {
return new AnonymousQueue();
}
@Bean
public FanoutExchange fanoutExchage(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding FanoutBinding1() {
return BindingBuilder.bind(autoDeleteQueue1()).to(fanoutExchage());
}
@Bean
public Binding FanoutBinding2() {
return BindingBuilder.bind(autoDeleteQueue2()).to(fanoutExchage());
}
}
臨時隊(duì)列 (AnonymousQueue)
您可能還記得以前我們使用的是具有特定名稱的隊(duì)列脖岛。能夠命名隊(duì)列對我們來說至關(guān)重要 - 我們需要將工作人員指向同一個隊(duì)列。當(dāng)您想要在生產(chǎn)者和消費(fèi)者之間共享隊(duì)列時颊亮,為隊(duì)列命名非常重要。
但是我們的Fanout示例并非如此陨溅。我們希望了解所有消息终惑,而不僅僅是其中的一部分消息。我們也只對目前流動的消息感興趣门扇,而不是舊消息雹有。要解決這個問題,我們需要兩件事臼寄。
首先霸奕,每當(dāng)我們連接到Rabbit時,我們都需要一個新的空隊(duì)列吉拳。為此质帅,我們可以使用隨機(jī)名稱創(chuàng)建隊(duì)列,或者更好 - 讓服務(wù)器為我們選擇隨機(jī)隊(duì)列名稱留攒。
其次煤惩,一旦我們斷開消費(fèi)者,就應(yīng)該自動刪除隊(duì)列炼邀。為了使用Spring AMQP客戶端執(zhí)行此操作魄揉,我們定義了AnonymousQueue,它使用生成的名稱創(chuàng)建一個非持久的拭宁,獨(dú)占的自動刪除隊(duì)列
sender里添加代碼:
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void sendFanout() {
StringBuilder builder = new StringBuilder("Hello");
if (dots.incrementAndGet() == 3) {
dots.set(1);
}
for (int i = 0; i < dots.get(); i++) {
builder.append('.');
}
builder.append(count.incrementAndGet());
String message = builder.toString();
amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", message);
log.info("sendFanout [x] Sent '" + message + "'");
}
receiver里添加代碼:
@RabbitListener(queues="#{autoDeleteQueue1.name}")
public void receiveFanout1(String in) throws InterruptedException{
StopWatch watch = new StopWatch();
watch.start();
log.info("receiverFanout 1 " + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
log.info("receiverFanout 1 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
@RabbitListener(queues="#{autoDeleteQueue2.name}")
public void receiveFanout2(String in) throws InterruptedException{
StopWatch watch = new StopWatch();
watch.start();
log.info("receiverFanout 2 " + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
log.info("receiverFanout 2 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
運(yùn)行之后控制臺可以看到如下信息洛退,可見exchange把消息廣播給了所有的receiver。適用于需要廣播的操作
Direct 交換機(jī)模式
我們會將消息發(fā)送給特定的隊(duì)列杰标,而不是廣播兵怯。我們使用一個key作為路由鍵。這樣接收程序?qū)⒛軌蜻x擇它想要接收(或訂閱)的key
config代碼里添加:
/**
* Direct模式 交換機(jī)Exchange
* */
@Bean
public DirectExchange direct(){
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
public Binding DirectBinding1() {
return BindingBuilder.bind(autoDeleteQueue1())
.to(direct())
.with("orange");
}
@Bean
public Binding DirectBinding2() {
return BindingBuilder.bind(autoDeleteQueue1())
.to(direct())
.with("black");
}
@Bean
public Binding DirectBinding3() {
return BindingBuilder.bind(autoDeleteQueue2())
.to(direct())
.with("green");
}
@Bean
public Binding DirectBinding4() {
return BindingBuilder.bind(autoDeleteQueue2())
.to(direct())
.with("black");
}
sender代碼里添加:
private final String[] keys = {"orange", "black", "green"};
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void sendDirect() {
StringBuilder builder = new StringBuilder("Hello to ");
if (dots.incrementAndGet() == 3) {
dots.set(0);
}
String key = keys[this.dots.get()];
builder.append(key).append(' ');
builder.append(this.count.get());
String message = builder.toString();
amqpTemplate.convertAndSend(MQConfig.DIRECT_EXCHANGE, key, message);
log.info("sendDirect [x] Sent '" + message + "'");
}
receiver代碼添加:
@RabbitListener(queues="#{autoDeleteQueue1.name}")
public void receiveFanout1(String in) throws InterruptedException{
StopWatch watch = new StopWatch();
watch.start();
log.info("receiverDirect 1 " + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
log.info("receiverDirect 1 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
@RabbitListener(queues="#{autoDeleteQueue2.name}")
public void receiveFanout2(String in) throws InterruptedException{
StopWatch watch = new StopWatch();
watch.start();
log.info("receiverDirect 2 " + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
log.info("receiverDirect 2 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
運(yùn)行后結(jié)果如下在旱,可見摇零,兩個receiver都能收到black信息,而receiver1只能收到green信息而receiver2只能收到orange信息。Topic 交換機(jī)模式
主題交流
發(fā)送到主題交換的消息不能具有任意的 routing_key - 它必須是由點(diǎn)分隔的單詞列表驻仅。單詞可以是任何內(nèi)容谅畅,但通常它們指定與消息相關(guān)的一些功能。一些有效的路由鍵示例:“ stock.usd.nyse ”噪服,“ nyse.vmw ”毡泻,“ quick.orange.rabbit ”。路由密鑰中可以包含任意數(shù)量的單詞粘优,最多可達(dá)255個字節(jié)仇味。
綁定密鑰也必須采用相同的形式。主題交換背后的邏輯 類似于直接交換- 使用特定路由密鑰發(fā)送的消息將被傳遞到與匹配綁定密鑰綁定的所有隊(duì)列雹顺。但是綁定鍵有兩個重要的特殊情況:
- *(星號)可以替代一個單詞丹墨。
-
#(hash)可以替換零個或多個單詞。
config添加代碼如下:
/**
* Topic模式 交換機(jī)Exchange
* */
@Bean
public TopicExchange topicExchage(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(autoDeleteQueue1()).to(topicExchage()).with("*.orange.*");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(autoDeleteQueue1()).to(topicExchage()).with("*.*.rabbit");
}
@Bean
public Binding topicBinding3() {
return BindingBuilder.bind(autoDeleteQueue2()).to(topicExchage()).with("lazy.#");
}
sender添加如下代碼:
private final String[] topickeys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
"lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void sendTopic() {
StringBuilder builder = new StringBuilder("Hello to ");
if (this.dots.incrementAndGet() == topickeys.length) {
this.dots.set(0);
}
String key = topickeys[this.dots.get()];
builder.append(key).append(' ');
builder.append(this.count.incrementAndGet());
String message = builder.toString();
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, key, message);
log.info(" [x] Sent '" + message + "'");
}
receiver代碼添加如下:
@RabbitListener(queues="#{autoDeleteQueue1.name}")
public void receiveTopic1(String in) throws InterruptedException{
StopWatch watch = new StopWatch();
watch.start();
log.info("receiverTopic 1 " + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
log.info("receiverTopic 1 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
@RabbitListener(queues="#{autoDeleteQueue2.name}")
public void receiveTopic2(String in) throws InterruptedException{
StopWatch watch = new StopWatch();
watch.start();
log.info("receiverTopic 2 " + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
log.info("receiverTopic 2 " + " [x] Done in " + watch.getTotalTimeSeconds() + "s");
}
運(yùn)行后得到如下結(jié)果嬉愧,路由密鑰設(shè)置為“ quick.orange.rabbit ”的消息將傳遞到兩個隊(duì)列贩挣。消息“ lazy.orange.elephant ”也將同時發(fā)送給他們。另一方面没酣,“ quick.orange.fox ”只會轉(zhuǎn)到第一個隊(duì)列王财,而“ lazy.brown.fox ”只會轉(zhuǎn)到第二個隊(duì)列≡1悖“ lazy.pink.rabbit ”將僅傳遞到第二個隊(duì)列一次绒净,即使它匹配兩個綁定〕ニィ“ quick.brown.fox ”與任何綁定都不匹配挂疆,因此它將被丟棄。
如果我們違反合同并發(fā)送帶有一個或四個單詞的消息哎垦,例如“ orange ”或“ quick.orange.male.rabbit ”囱嫩,會發(fā)生什么?好吧漏设,這些消息將不匹配任何綁定墨闲,并將丟失。
另一方面郑口,“ lazy.orange.male.rabbit ”鸳碧,即使它有四個單詞,也會匹配最后一個綁定犬性,并將被傳遞到第二個隊(duì)列瞻离。
可見,topic模式能夠?qū)崿F(xiàn)更加靈活的控制信息流向