Rabbitmq示例之RPC模式

上一篇 <<<Rabbitmq示例之通配符模式Topics
下一篇 >>>Rabbitmq隊列模式總結


1.核心思想

  1. RPC客戶端啟動后旱物,創(chuàng)建一個匿名澈圈、獨占的、回調的隊列
  2. RPC客戶端設置消息的2個屬性:replyTo和correlationId,然后將消息發(fā)送到隊列rpc_queue
  3. RPC服務端在隊列rpc_queue上等待消息鸡号。RPC服務端處理完收到消息后筝尾,然后將處理結果封裝成消息發(fā)送到replyTo指定的隊列上途戒,并且此消息帶上correlationId(此值為收到消息里的correlationId)
  4. RPC客戶端在隊列replyTo上等待消息命黔,當收到消息后,它會判斷收到消息的correlationId战得。如果值和自己之前發(fā)送的一樣疾层,則這個值就是RPC的處理結果

2.核心代碼

/**
 * 客戶端
 * 1、生成臨時隊列[replyQueueName]用于接收回調數(shù)據(jù)
 * 2贡避、生成唯一標識[corrId]
 * 3痛黎、將唯一標識和臨時隊列連同消息內容發(fā)送給服務器端的主隊列
 * 。刮吧。湖饱。。杀捻。阻塞隊列監(jiān)聽服務器的消息反饋井厌。。致讥。仅仆。。
 * 4垢袱、如果服務端已反饋墓拜,并且和唯一標識一致,則消費
 */
public class RpcClient {
    private static final String RPC_QUEUE_NAME = "rpc_test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

            Connection connection = RabitMQConnection.getConnection();
            final Channel channel = connection.createChannel();
            // 定義臨時隊列请契,并返回生成的隊列名稱
            String replyQueueName = channel.queueDeclare().getQueue();
            //本次請求唯一ID
            String corrId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    // 唯一標志本次請求
                    .correlationId(corrId)
                    // 設置回調隊列
                    .replyTo(replyQueueName)
                    .build();
            String message = "我是來自客戶端的請求";
        // 發(fā)送消息咳榜,發(fā)送到默認交換機
        channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));

        // 阻塞隊列夏醉,用于存儲回調結果
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (properties.getCorrelationId().equals(corrId)) {
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });
        // 獲取回調的結果
        String result = response.take();
        System.out.println(" 服務端響應數(shù)據(jù):'" + result + "'");
        channel.close();
        connection.close();
    }
}


/**
 * 服務端
 * 1、使用同步鎖機制監(jiān)聽主隊列的請求情況
 *
 * 2涌韩、如果有請求畔柔,則反饋消息到消費者的臨時隊列中
 */
public class RpcServer {

    private static final String RPC_QUEUE_NAME = "rpc_test";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = RabitMQConnection.getConnection();
        Channel channel = connection.createChannel();
        // 設置同時最多只能獲取一個消息
        channel.basicQos(1);
        System.out.println(" [RpcServer] Awaiting RPC requests");
        // 定義消息的回調處理類
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 生成返回的結果,關鍵是設置correlationId值
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                // 生成返回
                try {
                    Thread.sleep(1000 *1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String response = "我是來自服務端的反饋";
                // 回復消息臣樱,通知已經收到請求
                channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                // 對消息進行應答
                channel.basicAck(envelope.getDeliveryTag(), false);
                // 喚醒正在消費者所有的線程
                synchronized (this) {
                    this.notify();
                }
            }
        };
        // 消費消息
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

        while (true) {
            synchronized (consumer) {
                try {
                    consumer.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

推薦閱讀:
<<<消息中間件的核心思想
<<<消息中間件常見問題匯總
<<<基于Netty簡單手寫消息中間件思路
<<<消息隊列常用名詞與中間件對比
<<<Rabbitmq基礎知識
<<<Rabbitmq示例之點對點簡單隊列
<<<Rabbitmq示例之工作(公平)隊列
<<<Rabbitmq示例之發(fā)布訂閱模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq隊列模式總結
<<<Rabbitmq如何保證消息不丟失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保證冪等性
<<<Rabbitmq的重試策略
<<<Rabbitmq通過死信隊列實現(xiàn)過期監(jiān)聽
<<<Rabbitmq解決分布式事務思路
<<<Rabbitmq解決分布式事務demo
<<<Rabbitmq環(huán)境安裝
<<<Kafka中的專業(yè)術語都有哪些
<<<Kafka的設計原理介紹
<<<Kafka集群如何實現(xiàn)相互感知
<<<Kafka如何實現(xiàn)分區(qū)及指定分區(qū)消費
<<<Kafka如何保證消息順序消費
<<<Kafka如何保證高吞吐量
<<<Kafka集群環(huán)境搭建
<<<RocketMQ架構原理
<<<RocketMQ靶擦、RabbitMQ和Kafka的對比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保證順序消費demo
<<<RocketMQ如何動態(tài)擴容和縮容
<<<RocketMQ如何解決分布式事務
<<<RocketMQ單機版本安裝
<<<RocketMQ集群環(huán)境程序啟用相關知識點
<<<RocketMQ單機做主備實操
<<<RocketMQ所有配置說明

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市雇毫,隨后出現(xiàn)的幾起案子玄捕,更是在濱河造成了極大的恐慌,老刑警劉巖嘴拢,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件桩盲,死亡現(xiàn)場離奇詭異寂纪,居然都是意外死亡席吴,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進店門捞蛋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來孝冒,“玉大人,你說我怎么就攤上這事拟杉∽校” “怎么了?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵搬设,是天一觀的道長穴店。 經常有香客問我,道長拿穴,這世上最難降的妖魔是什么泣洞? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮默色,結果婚禮上球凰,老公的妹妹穿的比我還像新娘。我一直安慰自己腿宰,他們只是感情好呕诉,可當我...
    茶點故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著吃度,像睡著了一般甩挫。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上椿每,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天捶闸,我揣著相機與錄音夜畴,去河邊找鬼。 笑死删壮,一個胖子當著我的面吹牛贪绘,可吹牛的內容都是我干的。 我是一名探鬼主播央碟,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼税灌,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了亿虽?” 一聲冷哼從身側響起菱涤,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎洛勉,沒想到半個月后粘秆,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡收毫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年攻走,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片此再。...
    茶點故事閱讀 40,144評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡昔搂,死狀恐怖,靈堂內的尸體忽然破棺而出输拇,到底是詐尸還是另有隱情摘符,我是刑警寧澤,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布策吠,位于F島的核電站逛裤,受9級特大地震影響,放射性物質發(fā)生泄漏猴抹。R本人自食惡果不足惜带族,卻給世界環(huán)境...
    茶點故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望洽糟。 院中可真熱鬧炉菲,春花似錦、人聲如沸坤溃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽薪介。三九已至祠饺,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間汁政,已是汗流浹背道偷。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工缀旁, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人勺鸦。 一個月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓并巍,卻偏偏與公主長得像,于是被迫代替她去往敵國和親换途。 傳聞我的和親對象是個殘疾皇子懊渡,可洞房花燭夜當晚...
    茶點故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內容