注:這是RabbitMQ-java版Client的指導(dǎo)教程翻譯系列文章,歡迎大家批評指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介紹隊列的使用
第三篇Publish/Subscribe介紹轉(zhuǎn)換器以及其中fanout類型
第四篇Routing介紹direct類型轉(zhuǎn)換器
第五篇Topics介紹topic類型轉(zhuǎn)換器
第六篇RPC介紹遠(yuǎn)程調(diào)用
遠(yuǎn)程過程調(diào)用(Remote procedure call )
在第二篇指導(dǎo)教程中溺忧,我們學(xué)會在多個消費(fèi)者之間使用工作隊列循環(huán)分發(fā)任務(wù)昧诱。但是如果我們需要在遠(yuǎn)程電腦上運(yùn)行一個程序并且需要返回結(jié)果呢。這就是另一個不同的事情,這個模型就是聞名的遠(yuǎn)程過程調(diào)用,簡稱RPC。
在這篇指導(dǎo)教程中调塌,我們將會使用RabbitMQ去創(chuàng)建一個RPC系統(tǒng):一個客戶端和一個可測試的遠(yuǎn)程服務(wù)。因為我們沒有按時間消費(fèi)的任務(wù)去分配惠猿,所以將會創(chuàng)建一個仿制的RPC服務(wù)羔砾,用于返回斐波那契數(shù)列。
客戶端接口(Client interface)
為了解釋RPC服務(wù)是如何使用的偶妖,我們將創(chuàng)建一個簡單的客戶端類姜凄,它將會暴露出一個call的方法,用于發(fā)送RPC請求趾访,并且阻塞住直到結(jié)果被返回:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
RPC注意
盡管RPC在計算中是一個很常見的模式态秧,但是它仍有很多不足的地點(diǎn)。問題產(chǎn)生的原因程序員沒有意識到這個功能正在本地調(diào)用扼鞋,還是服務(wù)端反應(yīng)慢申鱼。對這樣不可預(yù)料的系統(tǒng)結(jié)果就會有困惑,就會去增加一些不必要的復(fù)雜的調(diào)試云头。濫用RPC可能會導(dǎo)致不可維護(hù)捐友,難于理解的代碼,而不是簡化軟件溃槐。
請注意匣砖,考慮下面的一些建議:
功能顯而易見在本地調(diào)用還是遠(yuǎn)程調(diào)用。
用文檔記錄你的系統(tǒng)昏滴,確保各組件之間依賴清晰猴鲫。
處理錯誤情況。當(dāng)RPC服務(wù)端好久沒有反應(yīng)谣殊,客戶端如何響應(yīng)拂共?
處于困境的時候避免使用RPC。如果可以使用姻几,你應(yīng)該使用異步請求方式宜狐,而不是RPC阻塞的方式。異步返回結(jié)果會被推送到另一個計算階段鲜棠。
返回隊列(Callback queue)
一般來說使用RabbitMQ來進(jìn)行RPC是簡單的肌厨,一個客戶端發(fā)出請求消息和一個服務(wù)端返回相應(yīng)消息培慌。為了能夠接受到響應(yīng)豁陆,我們需要在請求中發(fā)送一個callback隊列的地址,我們使用默認(rèn)的隊列(客戶端唯一的隊列),試試:
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
消息屬性
AMQP 0-9-1協(xié)議預(yù)先定義了14個屬性吵护,大部分的屬性很少使用盒音,下面有一些說明:
deliveryMode:標(biāo)記這個消息可以持久化(值為2)表鳍,或者短暫保留(值為其它的),在第二章里面有提到過這個屬性
contentType:用于描述文本的類型。例如經(jīng)常使用JSON的編碼方式祥诽,這是經(jīng)常設(shè)置的屬性:application/json
replyTo:經(jīng)常用于存儲返回隊列的名字
correlationId:對請求的RPC相應(yīng)是有用的
我們使用一個新的引用:
import com.rabbitmq.client.AMQP.BasicProperties;
相關(guān)聯(lián)的Id(Correlation Id)
按照目前上述的方法譬圣,我們需要為每一個RPC請求都創(chuàng)建一個callBack隊列,顯然不夠高效雄坪。幸運(yùn)的是這里有一種更好的方式厘熟,一個客戶端我們只需要創(chuàng)建一個callback隊列。
這會導(dǎo)致一個新的問題维哈,在隊列中接收的響應(yīng)不知道是哪一個請求的绳姨,這就需要使用到correlationId屬性,我們?yōu)槊恳粋€請求都設(shè)置唯一correlationId的值阔挠,基于這個值我們就能都找到匹配請求的相應(yīng)飘庄。如果我們有一個不匹配correlationId的值,或許可以刪除這條消息购撼,因為它不屬于我們的請求跪削。
你可能會問:在返回隊列中我們?yōu)槭裁磻?yīng)該忽略掉不知道的消息,而不是當(dāng)做一個錯誤迂求?在服務(wù)端有一種可能碾盐,在發(fā)送我們一條消息的之后,RPC服務(wù)死了揩局,但是反饋信息已經(jīng)發(fā)出去了廓旬。如果發(fā)生這種情況,重新啟動的RPC服務(wù)端將會再次處理這個消息谐腰,這就是為什么我們必須在客戶端處理這個多余的相應(yīng)孕豹。對于RPC也是這樣理解的(這一段亂七八糟)。
總結(jié)
我們RPC就像上圖這樣工作:
當(dāng)客戶端創(chuàng)建十气,它創(chuàng)建一個異步唯一的callback隊列
對于一個RPC請求來說励背,客戶端發(fā)送帶有兩個屬相的消息:replyTo,被設(shè)置callback隊列的名稱砸西;correlationId被設(shè)置為唯一請求的值叶眉。
這個請求被發(fā)送到rpc-queue隊列
在隊列中RPC工作者等待著請求,當(dāng)請求來的時候芹枷,它處理工作衅疙,把帶有反饋隊列的消息發(fā)送回客戶端,使用replyTo中的返回隊列鸳慈。
客戶端在callback隊列中等待返回數(shù)據(jù)饱溢,當(dāng)消息來得時候,它會檢查correlationId的屬性走芋,如果它匹配請求中的correlationId的值绩郎,就會返回相應(yīng)給應(yīng)用潘鲫。
綜合
斐波那契數(shù)列任務(wù):
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
我們聲明了斐波那契數(shù)列的方法,它表明只有一個有效積極的數(shù)輸入(不要期望去處理很多的數(shù)字肋杖,實(shí)現(xiàn)起來會很慢很慢的)
下面是RPCServer.javade daima 溉仑,這里下載:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] argv) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
String response = "";
try {
String message = new String(body,"UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e){
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
//...
}}}
服務(wù)端代碼是非常直觀明了
跟以往一樣,先建立連接状植,創(chuàng)建通道浊竟,聲明隊列。我們可能想要運(yùn)行多個服務(wù)進(jìn)程津畸,為了創(chuàng)建更多的服務(wù)者逐沙,我們需要在channel.basicQos方法中設(shè)置prefetchCount的值。
我們使用bacisConsume去連接隊列洼畅,隊列中我們提供一個一個返回表單的對象吩案,用于工作并且返回相應(yīng)。
下面是RPCClient.java的源代碼帝簇,這里下載:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
}
public String call(String message) throws IOException, InterruptedException {
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue response = new ArrayBlockingQueue(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
public void close() throws IOException {
connection.close();
}
//...
}
客戶端代碼有一些調(diào)用:
我們創(chuàng)建連接徘郭,通道,聲明了唯一作為響應(yīng)的返回隊列丧肴,我們訂閱了callback的隊列残揉,這樣我們就可以接受到RPC的響應(yīng)。
我們的call方法將會調(diào)用RPC的請求芋浮。
這里抱环,我們第一次定義了唯一的correlationId值并且保存它。在DefaultConsumer中實(shí)現(xiàn)的handleDelivery的方法將會使用該值去和獲取到響應(yīng)的correlationId比較纸巷。
下一步镇草,我們發(fā)布一個請求的消息,并且?guī)в袃蓚€屬性:replyTo和correlationId.
這個時候瘤旨,我們可以停下來梯啤,等待合適的反饋。
消費(fèi)者開啟另一個線程中處理消息存哲,在響應(yīng)前我們應(yīng)該先擱置主線程因宇。使用BlockingQueue來處理,這就是我們創(chuàng)建只有一個容器ArrayblockingQueue祟偷,正如我們只需要去等待一個響應(yīng)察滑。
這個handleDelivery方法在做一些簡單的工作,對于每一個響應(yīng)消息它都會檢查correlationId是否是我們想要的那個修肠,然后把結(jié)果發(fā)送到blockQueue中贺辰。
同時主線程將會等待從BloakingQueue中取出消息。
最后我們返回結(jié)果給使用者。
客戶端發(fā)送請求:
RPCClient fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
fibonacciRpc.close();
現(xiàn)在希望好好看一下我們例子中的源代碼魂爪。
跟以前的指導(dǎo)文件中一樣編譯:
javac -cp $CP RPCClient.java RPCServer.java
我們RPC服務(wù)已經(jīng)準(zhǔn)備好了,現(xiàn)在開啟服務(wù):
java -cp $CP RPCServer
# => [x] Awaiting RPC requests
運(yùn)行想要獲取斐波那契數(shù)列的客戶端:
java -cp $CP RPCClient
# => [x] Requesting fib(30)
目前的設(shè)計不僅僅只是實(shí)現(xiàn)RPC服務(wù)的接口艰管,它還有一些其它的重要優(yōu)勢:如果RPC服務(wù)太慢滓侍,你可以按比例增加運(yùn)行一個RPC,在一個新的控制臺運(yùn)行第二個RPC服務(wù)牲芋。
在客戶端撩笆,RPC只能需要發(fā)送和接收一條消息,同步請求像queueDeclare是必須的缸浦,因此RPC客戶端的結(jié)果需要連接網(wǎng)絡(luò)才能獲取到一個簡單的RPC請求夕冲。(亂七八糟)
我們的代碼依然是非常的簡單,并沒有解決很復(fù)雜的問題裂逐,像:
- 如果沒有運(yùn)行服務(wù)端歹鱼,客戶端如何響應(yīng)?
- 客戶端應(yīng)該對RPC設(shè)置超時操作么卜高?
- 如果服務(wù)端發(fā)生故障了并且爆出了異常弥姻,應(yīng)該把異常發(fā)送給客戶端么?
- 在進(jìn)行處理之前掺涛,保護(hù)無效的消息么(檢查綁定和類型)庭敦?
第六節(jié)的內(nèi)容大致翻譯完了,這里是原文鏈接薪缆。
終篇是我對RabbitMQ使用理解的總結(jié)文章秧廉,歡迎討教。
--謝謝--