我們之前學(xué)習(xí)的都是一個(gè)消息只能被一個(gè)消費(fèi)者消費(fèi),那么如果我想發(fā)一個(gè)消息 能被多個(gè)消費(fèi)者消費(fèi),這時(shí)候怎么辦? 這時(shí)候我們就得用到了消息中的發(fā)布訂閱模型膀斋。
1速缆、解讀
在前面的教程中,我們創(chuàng)建了一個(gè)工作隊(duì)列拆魏,都是一個(gè)任務(wù)只交給一個(gè)消費(fèi)者。 這次我們做 將消息發(fā)送給多個(gè)消費(fèi)者。這種模式叫做“發(fā)布/訂閱”嘹害。
類似微信訂閱號(hào) 發(fā)布文章消息 就可以廣播給所有的接收者。(訂閱者)
那么咱們來(lái)看一下圖,我們學(xué)過(guò)前兩種有一些不一樣,work模式 是不是同一個(gè)隊(duì)列 多個(gè)消費(fèi)者,而ps這種模式呢,是一個(gè)隊(duì)列對(duì)應(yīng)一個(gè)消費(fèi)者,Publish模式還多了一個(gè)exchange(交換機(jī) 轉(zhuǎn)發(fā)器) ,這時(shí)候我們要獲取消息 就需要隊(duì)列綁定到交換機(jī)上,交換機(jī)把消息發(fā)送到隊(duì)列 , 消費(fèi)者才能獲取隊(duì)列的消息
解讀:
1吮便、1個(gè)生產(chǎn)者笔呀,多個(gè)消費(fèi)者
2、每一個(gè)消費(fèi)者都有自己的一個(gè)隊(duì)列
3髓需、生產(chǎn)者沒(méi)有將消息直接發(fā)送到隊(duì)列许师,而是發(fā)送到了交換機(jī)(轉(zhuǎn)發(fā)器)
4、每個(gè)隊(duì)列都要綁定到交換機(jī)
5僚匆、生產(chǎn)者發(fā)送的消息微渠,經(jīng)過(guò)交換機(jī),到達(dá)隊(duì)列咧擂,實(shí)現(xiàn)逞盆,一個(gè)消息被多個(gè)消費(fèi)者獲取的目的
2、生產(chǎn)者
package com.hrabbit.rabbitmq.publish.send;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: hrabbit
* @Date: 2018-06-28 下午2:40
* @Description:
*/
public class Send {
//交換機(jī)名稱
private final static String EXCHANGE_NAME = "hrabbit_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//聲明一個(gè)交換機(jī)松申,一個(gè)參數(shù)為交換機(jī)名稱云芦,第二個(gè)參數(shù)為模式
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //fanout 分裂
// 消息內(nèi)容
String message = "hello rabbitMQ!";
//發(fā)送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("Send '" + message + "'");
channel.close();
connection.close();
}
}
當(dāng)我們執(zhí)行發(fā)送消息之后,打開RabbitMQ的后臺(tái)管理界面贸桶,我們可以看到舅逸,交換機(jī)已經(jīng)存在了,但是這時(shí)候皇筛,大家思考一個(gè)問(wèn)題琉历,消息內(nèi)容跑到哪里去了呢旷痕?
這是因?yàn)榻粨Q機(jī)沒(méi)有存儲(chǔ)消息的能力,在rabbitmq中只有隊(duì)列存儲(chǔ)消息的能力.因?yàn)檫@時(shí)還沒(méi)有隊(duì)列,所以就會(huì)丟失;
所以,消息發(fā)送到了一個(gè)沒(méi)有綁定隊(duì)列的交換機(jī)時(shí),消息就會(huì)丟失!
3蹂析、消費(fèi)者1
聲明隊(duì)列為hrabbit_queue_fanout_phone
,將隊(duì)列也綁定到交換機(jī)hrabbit_exchange_fanout
,代碼如下:
package com.hrabbit.rabbitmq.publish.recove;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: hrabbit
* @Date: 2018-06-28 下午2:41
* @Description:
*/
public class Recover {
private final static String QUEUE_NAME = "hrabbit_queue_fanout_phone";
private final static String EXCHANGE_NAME = "hrabbit_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//------------下面邏輯和work模式一樣-----
// 同一時(shí)刻服務(wù)器只會(huì)發(fā)一條消息給消費(fèi)者
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消息到達(dá) 觸發(fā)這個(gè)方法
String msg = new String(body, "utf-8");
System.out.println("消費(fèi)者1號(hào):" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("消費(fèi)者1號(hào)執(zhí)行完畢翔始!");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
4斩箫、消費(fèi)者2
聲明隊(duì)列為hrabbit_queue_fanout_email
,并且將隊(duì)列也綁定到交換機(jī)hrabbit_exchange_fanout
,代碼如下:
package com.hrabbit.rabbitmq.publish.recove;
import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Auther: hrabbit
* @Date: 2018-06-28 下午2:41
* @Description:
*/
public class Recove2 {
private final static String QUEUE_NAME = "hrabbit_queue_fanout_email";
private final static String EXCHANGE_NAME = "hrabbit_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取到連接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 綁定隊(duì)列到交換機(jī)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//------------下面邏輯和work模式一樣-----
// 同一時(shí)刻服務(wù)器只會(huì)發(fā)一條消息給消費(fèi)者
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 消息到達(dá) 觸發(fā)這個(gè)方法
String msg = new String(body, "utf-8");
System.out.println("消費(fèi)者2號(hào):" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("消費(fèi)者2號(hào)執(zhí)行完畢!");
// 手動(dòng)回執(zhí)
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}
5. 測(cè)試結(jié)果
消費(fèi)者1輸出:
消費(fèi)者2輸出:
從上圖中我們可以看到换团,消費(fèi)者1和消費(fèi)者2都可以接收到生產(chǎn)者發(fā)送過(guò)來(lái)的消息悉稠。
而從這張圖中我們可以發(fā)現(xiàn)
hrabbit_queue_fanout_email
和hrabbit_queue_fanout_phone
都綁定到這個(gè)交換機(jī)上了,因此都可以同時(shí)接收到消息艘包。系列文章:
RabbitMQ:RabbitMQ-理論基礎(chǔ)
RabbitMQ:快速入門hello word
RabbitMQ:RabbitMQ:work queues 工作隊(duì)列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息應(yīng)答與消息持久化
RabbitMQ:路由Routing
RabbitMQ:Topic類型的exchange
RabbitMQ:RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)
RabbitMQ:spring整合RabbitMQ