上一篇 <<<Rabbitmq示例之通配符模式Topics
下一篇 >>>Rabbitmq隊列模式總結
1.核心思想
- RPC客戶端啟動后旱物,創(chuàng)建一個匿名澈圈、獨占的、回調的隊列
- RPC客戶端設置消息的2個屬性:replyTo和correlationId,然后將消息發(fā)送到隊列rpc_queue
- RPC服務端在隊列rpc_queue上等待消息鸡号。RPC服務端處理完收到消息后筝尾,然后將處理結果封裝成消息發(fā)送到replyTo指定的隊列上途戒,并且此消息帶上correlationId(此值為收到消息里的correlationId)
- RPC客戶端在隊列replyTo上等待消息命黔,當收到消息后,它會判斷收到消息的correlationId战得。如果值和自己之前發(fā)送的一樣疾层,則這個值就是RPC的處理結果
2.核心代碼
/**
* 客戶端
* 1、生成臨時隊列[replyQueueName]用于接收回調數(shù)據(jù)
* 2贡避、生成唯一標識[corrId]
* 3痛黎、將唯一標識和臨時隊列連同消息內容發(fā)送給服務器端的主隊列
* 。刮吧。湖饱。。杀捻。阻塞隊列監(jiān)聽服務器的消息反饋井厌。。致讥。仅仆。。
* 4垢袱、如果服務端已反饋墓拜,并且和唯一標識一致,則消費
*/
public class RpcClient {
private static final String RPC_QUEUE_NAME = "rpc_test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = RabitMQConnection.getConnection();
final Channel channel = connection.createChannel();
// 定義臨時隊列请契,并返回生成的隊列名稱
String replyQueueName = channel.queueDeclare().getQueue();
//本次請求唯一ID
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
// 唯一標志本次請求
.correlationId(corrId)
// 設置回調隊列
.replyTo(replyQueueName)
.build();
String message = "我是來自客戶端的請求";
// 發(fā)送消息咳榜,發(fā)送到默認交換機
channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
// 阻塞隊列夏醉,用于存儲回調結果
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
});
// 獲取回調的結果
String result = response.take();
System.out.println(" 服務端響應數(shù)據(jù):'" + result + "'");
channel.close();
connection.close();
}
}
/**
* 服務端
* 1、使用同步鎖機制監(jiān)聽主隊列的請求情況
*
* 2涌韩、如果有請求畔柔,則反饋消息到消費者的臨時隊列中
*/
public class RpcServer {
private static final String RPC_QUEUE_NAME = "rpc_test";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 設置同時最多只能獲取一個消息
channel.basicQos(1);
System.out.println(" [RpcServer] Awaiting RPC requests");
// 定義消息的回調處理類
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 生成返回的結果,關鍵是設置correlationId值
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
// 生成返回
try {
Thread.sleep(1000 *1);
} catch (InterruptedException e) {
e.printStackTrace();
}
String response = "我是來自服務端的反饋";
// 回復消息臣樱,通知已經收到請求
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
// 對消息進行應答
channel.basicAck(envelope.getDeliveryTag(), false);
// 喚醒正在消費者所有的線程
synchronized (this) {
this.notify();
}
}
};
// 消費消息
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
while (true) {
synchronized (consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
推薦閱讀:
<<<消息中間件的核心思想
<<<消息中間件常見問題匯總
<<<基于Netty簡單手寫消息中間件思路
<<<消息隊列常用名詞與中間件對比
<<<Rabbitmq基礎知識
<<<Rabbitmq示例之點對點簡單隊列
<<<Rabbitmq示例之工作(公平)隊列
<<<Rabbitmq示例之發(fā)布訂閱模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq隊列模式總結
<<<Rabbitmq如何保證消息不丟失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保證冪等性
<<<Rabbitmq的重試策略
<<<Rabbitmq通過死信隊列實現(xiàn)過期監(jiān)聽
<<<Rabbitmq解決分布式事務思路
<<<Rabbitmq解決分布式事務demo
<<<Rabbitmq環(huán)境安裝
<<<Kafka中的專業(yè)術語都有哪些
<<<Kafka的設計原理介紹
<<<Kafka集群如何實現(xiàn)相互感知
<<<Kafka如何實現(xiàn)分區(qū)及指定分區(qū)消費
<<<Kafka如何保證消息順序消費
<<<Kafka如何保證高吞吐量
<<<Kafka集群環(huán)境搭建
<<<RocketMQ架構原理
<<<RocketMQ靶擦、RabbitMQ和Kafka的對比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保證順序消費demo
<<<RocketMQ如何動態(tài)擴容和縮容
<<<RocketMQ如何解決分布式事務
<<<RocketMQ單機版本安裝
<<<RocketMQ集群環(huán)境程序啟用相關知識點
<<<RocketMQ單機做主備實操
<<<RocketMQ所有配置說明