內(nèi)容來自:RabbitMQ Tutorials Java版
遠(yuǎn)程過程調(diào)用(RPC)
在第二個(gè)教程中据忘,我們學(xué)會(huì)了如何使用工作隊(duì)列將耗時(shí)的任務(wù)分發(fā)給多個(gè)工作者明肮。
但假如我們想調(diào)用遠(yuǎn)程電腦上的一個(gè)函數(shù)(或方法)并等待函數(shù)執(zhí)行的結(jié)果栅干,這時(shí)候該怎么辦呢邦蜜?好吧字管,這是一個(gè)不同的故事独泞。這種模式通常稱為遠(yuǎn)程過程調(diào)用RPC(Remote Procedure Call
)呐矾。
在今天的教程中,我們將會(huì)使用RabbitMQ來建立一個(gè)RPC系統(tǒng):一個(gè)客戶端和一個(gè)可擴(kuò)展的RPC服務(wù)端懦砂。因?yàn)槲覀儧]有任何現(xiàn)成的耗時(shí)任務(wù)蜒犯,我們將會(huì)創(chuàng)建一個(gè)假的RPC服務(wù),它將返回斐波那契數(shù)(Fibonacci numbers
)荞膘。
客戶端接口(Client interface)
為了演示如何使用RPC服務(wù)罚随,我們將創(chuàng)建一個(gè)簡單的客戶端類。它負(fù)責(zé)暴露一個(gè)名為call
的方法羽资,該方法將發(fā)送一個(gè)RPC請(qǐng)求并阻塞淘菩,直到接收到回答。
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
關(guān)于RPC
盡管在計(jì)算領(lǐng)域RPC這種模式很普遍屠升,但它仍備受批評(píng)潮改。當(dāng)程序員不清楚一個(gè)方法到底是本地的還是一個(gè)在遠(yuǎn)程機(jī)器上執(zhí)行,問題就來了弥激。此類疑惑通常給調(diào)試帶來不必要的復(fù)雜性进陡。相比簡單的軟件,不恰當(dāng)?shù)腞PC使用會(huì)導(dǎo)致產(chǎn)生不可維護(hù)的面條代碼(spaghetti code)微服。
</br>將上面的話記在腦子里趾疚,并考慮一下建議:
①確保讓哪個(gè)函數(shù)調(diào)用是本地調(diào)用哪個(gè)是遠(yuǎn)程調(diào)用看起來很明顯。
②為系統(tǒng)寫文檔以蕴,清楚地表述組件間的依賴關(guān)系糙麦。
③處理錯(cuò)誤,比如當(dāng)RPC服務(wù)很久沒有反應(yīng)丛肮,客戶端應(yīng)該怎么辦赡磅。
</br>盡量避免RPC。如果可能宝与,你可以使用異步管道來代替RPC焚廊,像阻塞,結(jié)果將會(huì)異步地推送到下一個(gè)計(jì)算階段习劫。
回調(diào)隊(duì)列(Callback queue)
使用RabbitMQ來做RPC很容易咆瘟。客戶端發(fā)送一個(gè)請(qǐng)求消息诽里,服務(wù)端以一個(gè)響應(yīng)消息回應(yīng)袒餐。為了可以接收到響應(yīng),需要與請(qǐng)求(消息)一起,發(fā)送一個(gè)回調(diào)的隊(duì)列灸眼。我們使用默認(rèn)的隊(duì)列(Java獨(dú)有的):
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
消息屬性
AMPQ 0-9-1協(xié)議預(yù)定義了消息的14種屬性卧檐。大部分屬性都很少用到,除了下面的幾種:
①deliveryMode
:標(biāo)記一個(gè)消息是持久的(值為2)還是短暫的(2以外的任何值)焰宣,你可能還記得我們的第二個(gè)教程中用到過這個(gè)屬性霉囚。
②contentType
:描述編碼的mime-type
(mime-type of the encoding
)。比如最常使用JSON
格式匕积,就可以將該屬性設(shè)置為application/json
佛嬉。
③replyTo
:通常用來命名一個(gè)回調(diào)隊(duì)列。
④correlationId
:用來關(guān)聯(lián)RPC的響應(yīng)和請(qǐng)求闸天。
我們需要引入一個(gè)新的類:
import com.rabbitmq.client.AMQP.BasicProperties;
關(guān)聯(lián)標(biāo)識(shí)(Correlation Id)
在上面的方法中,我們?yōu)槊恳粋€(gè)RPC請(qǐng)求都創(chuàng)建了一個(gè)新的回調(diào)隊(duì)列斜做。這樣做顯然很低效苞氮,但幸好我們有更好的方式:讓我們?yōu)槊恳粋€(gè)客戶端創(chuàng)建一個(gè)回調(diào)隊(duì)列。
這樣做又引入了一個(gè)新的問題瓤逼,在回調(diào)隊(duì)列中收到響應(yīng)后不知道到底是屬于哪個(gè)請(qǐng)求的笼吟。這時(shí)候,Correlation Id
就可以派上用場了霸旗。對(duì)每一個(gè)請(qǐng)求贷帮,我們都創(chuàng)建一個(gè)唯一性的值作為Correlation Id
。之后诱告,當(dāng)我們從回調(diào)隊(duì)列中收到消息的時(shí)候撵枢,就可以查找這個(gè)屬性,基于這一點(diǎn)精居,我們就可以將一個(gè)響應(yīng)和一個(gè)請(qǐng)求進(jìn)行關(guān)聯(lián)锄禽。如果我們看到一個(gè)不知道的Correlation Id
值,我們就可以安全地丟棄該消息靴姿,因?yàn)樗粚儆谖覀兊恼?qǐng)求沃但。
你可能會(huì)問,為什么要忽視回調(diào)隊(duì)列中的不知道的消息佛吓,而不是直接以一個(gè)錯(cuò)誤失斚怼(failing with an error)。這是由于服務(wù)端可能存在的競爭條件维雇。盡管不會(huì)淤刃,但這種情況仍有可能發(fā)生:RPC服務(wù)端在發(fā)給我們答案之后就掛掉了,還沒來得及為請(qǐng)求發(fā)送一個(gè)確認(rèn)信息谆沃。如果發(fā)生這種情況钝凶,重啟后的RPC服務(wù)端將會(huì)重新處理該請(qǐng)求(因?yàn)闆]有給RabbitMQ發(fā)送確認(rèn)消息,RabbitMQ會(huì)重新發(fā)送消息給RPC服務(wù))。這就是為什么我們要在客戶端優(yōu)雅地處理重復(fù)響應(yīng)耕陷,并且理想情況下掂名,RPC服務(wù)要是冪等的。
總結(jié)
我們的RPC系統(tǒng)的工作流程如下:
當(dāng)客戶端啟動(dòng)后,它會(huì)創(chuàng)建一個(gè)異步的獨(dú)特的回調(diào)隊(duì)列。對(duì)于一個(gè)RPC請(qǐng)求砚哆,客戶端將會(huì)發(fā)送一個(gè)配置了兩個(gè)屬性的消息:一個(gè)是replyTo
屬性世落,設(shè)置為這個(gè)回調(diào)隊(duì)列;另一個(gè)是correlation id
屬性略吨,每一個(gè)請(qǐng)求都會(huì)設(shè)置為一個(gè)具有唯一性的值。這個(gè)請(qǐng)求將會(huì)發(fā)送到rpc_queue
隊(duì)列。
RPC工作者(即圖中的server
)將會(huì)等待rpc_queue
隊(duì)列的請(qǐng)求发皿。當(dāng)有請(qǐng)求到來時(shí),它就會(huì)開始干活(計(jì)算斐波那契數(shù))并將結(jié)果通過發(fā)送消息來返回拂蝎,該返回消息發(fā)送到replyTo
指定的隊(duì)列穴墅。
客戶端將等待回調(diào)隊(duì)列返回?cái)?shù)據(jù)。當(dāng)返回的消息到達(dá)時(shí)温自,它將檢查correlation id
屬性玄货。如果該屬性值和請(qǐng)求匹配,就將響應(yīng)返回給程序悼泌。
放在一塊
計(jì)算斐波那契數(shù)的任務(wù)如下:
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
我們定義了斐波那契函數(shù)松捉,它假設(shè)只會(huì)輸入正整數(shù)(不要期望該函數(shù)在輸入很大的數(shù)的時(shí)候可以好好工作,它可能是最慢的遞歸實(shí)現(xiàn))馆里。
RPC服務(wù)RPCServer.java
的代碼如下:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
//模擬的耗時(shí)任務(wù)隘世,即計(jì)算斐波那契數(shù)
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
//創(chuàng)建連接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
try {
connection = factory.newConnection();
final Channel channel = connection.createChannel();
//聲明隊(duì)列
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
//一次只從隊(duì)列中取出一個(gè)消息
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
//監(jiān)聽消息(即RPC請(qǐng)求)
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
//收到RPC請(qǐng)求后開始處理
String response = "";
try {
String message = new String(body, "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
//處理完之后,返回響應(yīng)(即發(fā)布消息)
System.out.println("[server current time] : " + System.currentTimeMillis());
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
//loop to prevent reaching finally block
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException _ignore) {
}
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
if (connection != null)
try {
connection.close();
} catch (IOException _ignore) {
}
}
}
}
RPC服務(wù)的代碼很直白:
通常我們開始先建立連接鸠踪、通道并聲明隊(duì)列以舒。
我們可能會(huì)運(yùn)行多個(gè)服務(wù)進(jìn)程。為了負(fù)載均衡我們通過設(shè)置prefetchCount =1
將任務(wù)分發(fā)給多個(gè)服務(wù)進(jìn)程慢哈。
我們使用了basicConsume
來連接隊(duì)列蔓钟,并通過一個(gè)DefaultConsumer
對(duì)象提供回調(diào)。這個(gè)DefaultConsumer
對(duì)象將進(jìn)行工作并返回響應(yīng)卵贱。
我們的RPC客戶端RPCClient
代碼如下:
package com.maxwell.rabbitdemo;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
//定義一個(gè)RPC客戶端
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
}
//真正地請(qǐng)求
public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
System.out.println("[client current time] : " + System.currentTimeMillis());
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
//關(guān)閉連接
public void close() throws IOException {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
//創(chuàng)建一個(gè)RPC客戶端
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
//RPC客戶端發(fā)送調(diào)用請(qǐng)求滥沫,并等待影響,直到接收到
response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
//關(guān)閉RPC客戶的連接
fibonacciRpc.close();
} catch (IOException _ignore) {
}
}
}
}
}
客戶端代碼看起來有一些復(fù)雜:
我們建立連接和通道键俱,并聲明了一個(gè)獨(dú)特的回調(diào)隊(duì)列兰绣。
我們訂閱這個(gè)回調(diào)隊(duì)列,所以我們可以接收RPC響應(yīng)编振。
我們的call方法執(zhí)行RPC請(qǐng)求缀辩。在call方法中,我們首先生成一個(gè)具有唯一性的correlationId
值并存在變量corrId
中。我們的DefaultConsumer
中的實(shí)現(xiàn)方法handleDelivery
會(huì)使用這個(gè)值來獲取爭取的響應(yīng)臀玄。然后瓢阴,我們發(fā)布了這個(gè)請(qǐng)求消息,并設(shè)置了replyTo
和correlationId
這兩個(gè)屬性健无。好了荣恐,現(xiàn)在我們可以坐下來耐心等待響應(yīng)到來了。
由于我們的消費(fèi)者處理(指handleDelivery
方法)是在子線程進(jìn)行的累贤,因此我們需要在響應(yīng)到來之前暫停主線程(否則主線程結(jié)束了叠穆,子線程接收到了影響傳給誰啊)臼膏。使用BlockingQueue
是一種解決方案硼被。在這里我們創(chuàng)建了一個(gè)阻塞隊(duì)列ArrayBlockingQueue
并將它的容量設(shè)為1,因?yàn)槲覀冎恍枰邮芤粋€(gè)響應(yīng)就可以啦渗磅。
handleDelivery
方法所做的很簡單祷嘶,當(dāng)有響應(yīng)來的時(shí)候,就檢查是不是和correlationId
匹配夺溢,匹配的話就放到阻塞隊(duì)列ArrayBlockingQueue
中。
同時(shí)烛谊,主線程正等待影響风响。
最終我們就可以將影響返回給用戶了。
現(xiàn)在丹禀,可以動(dòng)手實(shí)驗(yàn)了状勤。
首先,執(zhí)行RPC服務(wù)端双泪,讓它等待請(qǐng)求的到來持搜。
[x] Awaiting RPC requests
然后,執(zhí)行RPC客戶端焙矛,即RPCClient
中的main
方法葫盼,發(fā)起請(qǐng)求:
[x] Requesting fib(30)
[client current time] : 1500474305838
[.] Got '832040'
可以看到,客戶端很快就接受到了請(qǐng)求村斟,回頭看RPC服務(wù)端的時(shí)間:
[.] fib(30)
[server current time] : 1500474305835
上面這種設(shè)計(jì)并不是RPC服務(wù)端的唯一實(shí)現(xiàn)贫导,但是它有以下幾個(gè)重要的優(yōu)勢(shì):
①如果RPC服務(wù)端很慢,你可以通過運(yùn)行多個(gè)實(shí)例就可以實(shí)現(xiàn)擴(kuò)展蟆盹。
②在RPC客戶端孩灯,RPC要求發(fā)送和接受一個(gè)消息。非同步的方法queueDeclare
是必須的逾滥。這樣峰档,RPC客戶端只需要為一個(gè)RPC請(qǐng)求只進(jìn)行一次網(wǎng)絡(luò)往返。
但我們的代碼仍然太簡單,并沒有處理更復(fù)雜但也非常重要的問題讥巡,像:
①如果沒有服務(wù)端在運(yùn)行掀亩,客戶端該怎么辦
②客戶端應(yīng)該為一次RPC設(shè)置超時(shí)嗎
③如果服務(wù)端發(fā)生故障并拋出異常,它還應(yīng)該返回給客戶端嗎尚卫?
④在處理消息前归榕,先通過邊界檢查、類型判斷等手段過濾掉無效的消息等
說明
①與原文略有出入吱涉,如有疑問刹泄,請(qǐng)參閱原文
②原文均是編譯后通過javacp命令直接運(yùn)行程序,我是在IDE中進(jìn)行的怎爵,相應(yīng)的操作做了修改特石。
③添加了客戶端和服務(wù)端執(zhí)行時(shí)間。