6.RPC#前山翻譯

注:這是RabbitMQ-java版Client的指導(dǎo)教程翻譯系列文章,歡迎大家批評指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介紹隊列的使用
第三篇Publish/Subscribe介紹轉(zhuǎn)換器以及其中fanout類型
第四篇Routing介紹direct類型轉(zhuǎn)換器
第五篇Topics介紹topic類型轉(zhuǎn)換器
第六篇RPC介紹遠(yuǎn)程調(diào)用

遠(yuǎn)程過程調(diào)用(Remote procedure call )

在第二篇指導(dǎo)教程中溺忧,我們學(xué)會在多個消費(fèi)者之間使用工作隊列循環(huán)分發(fā)任務(wù)昧诱。但是如果我們需要在遠(yuǎn)程電腦上運(yùn)行一個程序并且需要返回結(jié)果呢。這就是另一個不同的事情,這個模型就是聞名的遠(yuǎn)程過程調(diào)用,簡稱RPC。

在這篇指導(dǎo)教程中调塌,我們將會使用RabbitMQ去創(chuàng)建一個RPC系統(tǒng):一個客戶端和一個可測試的遠(yuǎn)程服務(wù)。因為我們沒有按時間消費(fèi)的任務(wù)去分配惠猿,所以將會創(chuàng)建一個仿制的RPC服務(wù)羔砾,用于返回斐波那契數(shù)列。

客戶端接口(Client interface)

為了解釋RPC服務(wù)是如何使用的偶妖,我們將創(chuàng)建一個簡單的客戶端類姜凄,它將會暴露出一個call的方法,用于發(fā)送RPC請求趾访,并且阻塞住直到結(jié)果被返回:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();

String result = fibonacciRpc.call("4");

System.out.println( "fib(4) is " + result);  

RPC注意

盡管RPC在計算中是一個很常見的模式态秧,但是它仍有很多不足的地點(diǎn)。問題產(chǎn)生的原因程序員沒有意識到這個功能正在本地調(diào)用扼鞋,還是服務(wù)端反應(yīng)慢申鱼。對這樣不可預(yù)料的系統(tǒng)結(jié)果就會有困惑,就會去增加一些不必要的復(fù)雜的調(diào)試云头。濫用RPC可能會導(dǎo)致不可維護(hù)捐友,難于理解的代碼,而不是簡化軟件溃槐。

請注意匣砖,考慮下面的一些建議:

  • 功能顯而易見在本地調(diào)用還是遠(yuǎn)程調(diào)用。

  • 用文檔記錄你的系統(tǒng)昏滴,確保各組件之間依賴清晰猴鲫。

  • 處理錯誤情況。當(dāng)RPC服務(wù)端好久沒有反應(yīng)谣殊,客戶端如何響應(yīng)拂共?

處于困境的時候避免使用RPC。如果可以使用姻几,你應(yīng)該使用異步請求方式宜狐,而不是RPC阻塞的方式。異步返回結(jié)果會被推送到另一個計算階段鲜棠。

返回隊列(Callback queue)

一般來說使用RabbitMQ來進(jìn)行RPC是簡單的肌厨,一個客戶端發(fā)出請求消息和一個服務(wù)端返回相應(yīng)消息培慌。為了能夠接受到響應(yīng)豁陆,我們需要在請求中發(fā)送一個callback隊列的地址,我們使用默認(rèn)的隊列(客戶端唯一的隊列),試試:

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties

.Builder()

.replyTo(callbackQueueName)

.build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

消息屬性

AMQP 0-9-1協(xié)議預(yù)先定義了14個屬性吵护,大部分的屬性很少使用盒音,下面有一些說明:

deliveryMode:標(biāo)記這個消息可以持久化(值為2)表鳍,或者短暫保留(值為其它的),在第二章里面有提到過這個屬性

contentType:用于描述文本的類型。例如經(jīng)常使用JSON的編碼方式祥诽,這是經(jīng)常設(shè)置的屬性:application/json

replyTo:經(jīng)常用于存儲返回隊列的名字

correlationId:對請求的RPC相應(yīng)是有用的

我們使用一個新的引用:

 import com.rabbitmq.client.AMQP.BasicProperties;

相關(guān)聯(lián)的Id(Correlation Id)

按照目前上述的方法譬圣,我們需要為每一個RPC請求都創(chuàng)建一個callBack隊列,顯然不夠高效雄坪。幸運(yùn)的是這里有一種更好的方式厘熟,一個客戶端我們只需要創(chuàng)建一個callback隊列。

這會導(dǎo)致一個新的問題维哈,在隊列中接收的響應(yīng)不知道是哪一個請求的绳姨,這就需要使用到correlationId屬性,我們?yōu)槊恳粋€請求都設(shè)置唯一correlationId的值阔挠,基于這個值我們就能都找到匹配請求的相應(yīng)飘庄。如果我們有一個不匹配correlationId的值,或許可以刪除這條消息购撼,因為它不屬于我們的請求跪削。

你可能會問:在返回隊列中我們?yōu)槭裁磻?yīng)該忽略掉不知道的消息,而不是當(dāng)做一個錯誤迂求?在服務(wù)端有一種可能碾盐,在發(fā)送我們一條消息的之后,RPC服務(wù)死了揩局,但是反饋信息已經(jīng)發(fā)出去了廓旬。如果發(fā)生這種情況,重新啟動的RPC服務(wù)端將會再次處理這個消息谐腰,這就是為什么我們必須在客戶端處理這個多余的相應(yīng)孕豹。對于RPC也是這樣理解的(這一段亂七八糟)。

總結(jié)

python-six.png

我們RPC就像上圖這樣工作:

當(dāng)客戶端創(chuàng)建十气,它創(chuàng)建一個異步唯一的callback隊列

對于一個RPC請求來說励背,客戶端發(fā)送帶有兩個屬相的消息:replyTo,被設(shè)置callback隊列的名稱砸西;correlationId被設(shè)置為唯一請求的值叶眉。

這個請求被發(fā)送到rpc-queue隊列

在隊列中RPC工作者等待著請求,當(dāng)請求來的時候芹枷,它處理工作衅疙,把帶有反饋隊列的消息發(fā)送回客戶端,使用replyTo中的返回隊列鸳慈。

客戶端在callback隊列中等待返回數(shù)據(jù)饱溢,當(dāng)消息來得時候,它會檢查correlationId的屬性走芋,如果它匹配請求中的correlationId的值绩郎,就會返回相應(yīng)給應(yīng)用潘鲫。

綜合

斐波那契數(shù)列任務(wù):

private static int fib(int n) {

      if (n == 0) return 0;

      if (n == 1) return 1;

      return fib(n-1) + fib(n-2);

    }

我們聲明了斐波那契數(shù)列的方法,它表明只有一個有效積極的數(shù)輸入(不要期望去處理很多的數(shù)字肋杖,實(shí)現(xiàn)起來會很慢很慢的)
下面是RPCServer.javade daima 溉仑,這里下載

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] argv) {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = null;

        try {

            connection = factory.newConnection();

            Channel channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Consumer consumer = new DefaultConsumer(channel) {

                @Override

                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();

                   String response = "";

                    try {

                        String message = new String(body,"UTF-8");

                         int n = Integer.parseInt(message);

                         System.out.println(" [.] fib(" + message + ")");

                         response += fib(n);

                     }  catch (RuntimeException e){

                          System.out.println(" [.] " + e.toString());

                     }  finally {

                             channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

                            channel.basicAck(envelope.getDeliveryTag(), false);

                 }

              }

         };

         channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

       //...

}}}

服務(wù)端代碼是非常直觀明了
跟以往一樣,先建立連接状植,創(chuàng)建通道浊竟,聲明隊列。我們可能想要運(yùn)行多個服務(wù)進(jìn)程津畸,為了創(chuàng)建更多的服務(wù)者逐沙,我們需要在channel.basicQos方法中設(shè)置prefetchCount的值。
我們使用bacisConsume去連接隊列洼畅,隊列中我們提供一個一個返回表單的對象吩案,用于工作并且返回相應(yīng)。
下面是RPCClient.java的源代碼帝簇,這里下載

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.UUID;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.TimeoutException;

public class RPCClient {

    private Connection connection;

    private Channel channel;

    private String requestQueueName = "rpc_queue";

    private String replyQueueName;

    public RPCClient() throws IOException, TimeoutException {

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost("localhost");

    connection = factory.newConnection();

    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue();

}

    public String call(String message) throws IOException, InterruptedException {

    String corrId = UUID.randomUUID().toString();

    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

    final BlockingQueue response = new ArrayBlockingQueue(1);

    channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {

        @Override

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

        if (properties.getCorrelationId().equals(corrId)) {

             response.offer(new String(body, "UTF-8"));

        }

       }

      });

     return response.take();

    }

  public void close() throws IOException {

   connection.close();

   }

//...

}

客戶端代碼有一些調(diào)用:

我們創(chuàng)建連接徘郭,通道,聲明了唯一作為響應(yīng)的返回隊列丧肴,我們訂閱了callback的隊列残揉,這樣我們就可以接受到RPC的響應(yīng)。

我們的call方法將會調(diào)用RPC的請求芋浮。

這里抱环,我們第一次定義了唯一的correlationId值并且保存它。在DefaultConsumer中實(shí)現(xiàn)的handleDelivery的方法將會使用該值去和獲取到響應(yīng)的correlationId比較纸巷。

下一步镇草,我們發(fā)布一個請求的消息,并且?guī)в袃蓚€屬性:replyTo和correlationId.

這個時候瘤旨,我們可以停下來梯啤,等待合適的反饋。

消費(fèi)者開啟另一個線程中處理消息存哲,在響應(yīng)前我們應(yīng)該先擱置主線程因宇。使用BlockingQueue來處理,這就是我們創(chuàng)建只有一個容器ArrayblockingQueue祟偷,正如我們只需要去等待一個響應(yīng)察滑。

這個handleDelivery方法在做一些簡單的工作,對于每一個響應(yīng)消息它都會檢查correlationId是否是我們想要的那個修肠,然后把結(jié)果發(fā)送到blockQueue中贺辰。

同時主線程將會等待從BloakingQueue中取出消息。

最后我們返回結(jié)果給使用者。
客戶端發(fā)送請求:

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");

String response = fibonacciRpc.call("30");

System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();

現(xiàn)在希望好好看一下我們例子中的源代碼魂爪。

跟以前的指導(dǎo)文件中一樣編譯:

 javac -cp $CP RPCClient.java RPCServer.java

我們RPC服務(wù)已經(jīng)準(zhǔn)備好了,現(xiàn)在開啟服務(wù):

java -cp $CP RPCServer

# => [x] Awaiting RPC requests

運(yùn)行想要獲取斐波那契數(shù)列的客戶端:

java -cp $CP RPCClient

# => [x] Requesting fib(30)

目前的設(shè)計不僅僅只是實(shí)現(xiàn)RPC服務(wù)的接口艰管,它還有一些其它的重要優(yōu)勢:如果RPC服務(wù)太慢滓侍,你可以按比例增加運(yùn)行一個RPC,在一個新的控制臺運(yùn)行第二個RPC服務(wù)牲芋。

在客戶端撩笆,RPC只能需要發(fā)送和接收一條消息,同步請求像queueDeclare是必須的缸浦,因此RPC客戶端的結(jié)果需要連接網(wǎng)絡(luò)才能獲取到一個簡單的RPC請求夕冲。(亂七八糟)

我們的代碼依然是非常的簡單,并沒有解決很復(fù)雜的問題裂逐,像:

  • 如果沒有運(yùn)行服務(wù)端歹鱼,客戶端如何響應(yīng)?
  • 客戶端應(yīng)該對RPC設(shè)置超時操作么卜高?
  • 如果服務(wù)端發(fā)生故障了并且爆出了異常弥姻,應(yīng)該把異常發(fā)送給客戶端么?
  • 在進(jìn)行處理之前掺涛,保護(hù)無效的消息么(檢查綁定和類型)庭敦?

第六節(jié)的內(nèi)容大致翻譯完了,這里是原文鏈接薪缆。

終篇是我對RabbitMQ使用理解的總結(jié)文章秧廉,歡迎討教。
--謝謝--

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末拣帽,一起剝皮案震驚了整個濱河市疼电,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌减拭,老刑警劉巖澜沟,帶你破解...
    沈念sama閱讀 222,865評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異峡谊,居然都是意外死亡茫虽,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,296評論 3 399
  • 文/潘曉璐 我一進(jìn)店門既们,熙熙樓的掌柜王于貴愁眉苦臉地迎上來濒析,“玉大人,你說我怎么就攤上這事啥纸『判樱” “怎么了?”我有些...
    開封第一講書人閱讀 169,631評論 0 364
  • 文/不壞的土叔 我叫張陵,是天一觀的道長盾致。 經(jīng)常有香客問我主经,道長,這世上最難降的妖魔是什么庭惜? 我笑而不...
    開封第一講書人閱讀 60,199評論 1 300
  • 正文 為了忘掉前任罩驻,我火速辦了婚禮,結(jié)果婚禮上护赊,老公的妹妹穿的比我還像新娘惠遏。我一直安慰自己,他們只是感情好骏啰,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,196評論 6 398
  • 文/花漫 我一把揭開白布节吮。 她就那樣靜靜地躺著,像睡著了一般判耕。 火紅的嫁衣襯著肌膚如雪透绩。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,793評論 1 314
  • 那天壁熄,我揣著相機(jī)與錄音渺贤,去河邊找鬼。 笑死请毛,一個胖子當(dāng)著我的面吹牛志鞍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播方仿,決...
    沈念sama閱讀 41,221評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼固棚,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了仙蚜?” 一聲冷哼從身側(cè)響起此洲,我...
    開封第一講書人閱讀 40,174評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎委粉,沒想到半個月后呜师,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,699評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡贾节,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,770評論 3 343
  • 正文 我和宋清朗相戀三年汁汗,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片栗涂。...
    茶點(diǎn)故事閱讀 40,918評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡知牌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出斤程,到底是詐尸還是另有隱情角寸,我是刑警寧澤,帶...
    沈念sama閱讀 36,573評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站扁藕,受9級特大地震影響沮峡,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜亿柑,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,255評論 3 336
  • 文/蒙蒙 一邢疙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧橄杨,春花似錦秘症、人聲如沸照卦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,749評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽役耕。三九已至采转,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間瞬痘,已是汗流浹背故慈。 一陣腳步聲響...
    開封第一講書人閱讀 33,862評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留框全,地道東北人察绷。 一個月前我還...
    沈念sama閱讀 49,364評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像津辩,于是被迫代替她去往敵國和親拆撼。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,926評論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理喘沿,服務(wù)發(fā)現(xiàn)闸度,斷路器,智...
    卡卡羅2017閱讀 134,719評論 18 139
  • “ 消息隊列已經(jīng)逐漸成為企業(yè)IT系統(tǒng)內(nèi)部通信的核心手段蚜印。它具有低耦合莺禁、可靠投遞、廣播窄赋、流量控制哟冬、最終一致性等一系列...
    落羽成霜丶閱讀 3,993評論 1 41
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化忆绰、事務(wù)柒傻、擁塞控...
    jiangmo閱讀 10,369評論 2 34
  • kafka的定義:是一個分布式消息系統(tǒng),由LinkedIn使用Scala編寫较木,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,328評論 1 15
  • 親愛的愛π: 你好爸驴!未來已經(jīng)來到了萎馅,你實(shí)現(xiàn)了當(dāng)初你曾經(jīng)有過的夢想嗎双戳? 你還記得自己曾經(jīng)的夢想嗎? ...
    美生活閱讀 146評論 0 1