什么是RPC
RPC(全稱為 Remote Procedure Call)即遠(yuǎn)程過程調(diào)用
在維基百科中這樣解釋
遠(yuǎn)程過程調(diào)用(英語:Remote Procedure Call邑彪,縮寫為 RPC)是一個(gè)計(jì)算機(jī)通信協(xié)議漂羊。該協(xié)議允許運(yùn)行于一臺(tái)計(jì)算機(jī)的程序調(diào)用另一臺(tái)計(jì)算機(jī)的子程序搬卒,而程序員無需額外地為這個(gè)交互作用編程召嘶。
RPC解決的事情
- 進(jìn)程間通訊
- 提供和本地方法調(diào)用一樣的調(diào)用機(jī)制
- 屏蔽程序員對(duì)遠(yuǎn)程調(diào)用的細(xì)節(jié)實(shí)現(xiàn)
簡(jiǎn)單的RPC結(jié)構(gòu)
簡(jiǎn)單的RPC可以分為五個(gè)部分:
- client
- client-stub
- network-service
- server-stub
- server
這5個(gè)部分的關(guān)系及基本流程如下圖
關(guān)系及基本流程圖
RPC基本過程
- 服務(wù)消費(fèi)方(client)以本地調(diào)用方式調(diào)用服務(wù)胡诗;
- client stub接收到調(diào)用后負(fù)責(zé)將方法摊趾、參數(shù)等組裝成能夠進(jìn)行網(wǎng)絡(luò)傳輸?shù)南Ⅲw顽频;
- client stub找到服務(wù)地址肛著,并將消息發(fā)送到服務(wù)端圆兵;
- server stub收到消息后進(jìn)行解碼;
- server stub根據(jù)解碼結(jié)果通過反射調(diào)用本地的服務(wù)枢贿;
- 本地服務(wù)執(zhí)行并將結(jié)果返回給server stub殉农;
- server stub將返回結(jié)果打包成消息并發(fā)送至消費(fèi)方;
- client stub接收到消息局荚,并進(jìn)行解碼超凳;
- 服務(wù)消費(fèi)方得到最終結(jié)果愈污。
一般情況下我們的RPC就是對(duì)我們的上面過程的2~8進(jìn)行封裝然后,給client的程序員感覺就像調(diào)用本地的方法一樣轮傍。
簡(jiǎn)單實(shí)現(xiàn)
首先是Client端的應(yīng)用層怎么發(fā)起RPC暂雹,CallerApp
public class CallerApp {
public static void main(String[] args) {
Calculator calculator = new CalculatorRemoteImpl();
int result = calculator.add(3, 3);
System.out.println(result);
}
}
通過一個(gè)CalculatorRemoteImpl,我們把RPC的邏輯封裝進(jìn)去了创夜,客戶端調(diào)用時(shí)感知不到遠(yuǎn)程調(diào)用的麻煩杭跪。將上面的2、3驰吓、4進(jìn)行封裝
public class CalculatorRemoteImpl implements Calculator {
public static final int PORT = 9090;
private static Logger log = LoggerFactory.getLogger(CalculatorRemoteImpl.class);
public int add(int a, int b) {
List<String> addressList = lookupProviders("Calculator.add");
String address = chooseTarget(addressList);
try {
Socket socket = new Socket(address, PORT);
// 將請(qǐng)求序列化
CalculateRpcRequest calculateRpcRequest = generateRequest(a, b);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
// 將請(qǐng)求發(fā)給服務(wù)提供方
objectOutputStream.writeObject(calculateRpcRequest);
// 將響應(yīng)體反序列化
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
Object response = objectInputStream.readObject();
log.info("response is {}", response);
if (response instanceof Integer) {
return (Integer) response;
} else {
throw new InternalError();
}
} catch (Exception e) {
log.error("fail", e);
throw new InternalError();
}
}
private CalculateRpcRequest generateRequest(int a, int b) {
CalculateRpcRequest calculateRpcRequest = new CalculateRpcRequest();
calculateRpcRequest.setA(a);
calculateRpcRequest.setB(b);
calculateRpcRequest.setMethod("add");
return calculateRpcRequest;
}
private String chooseTarget(List<String> providers) {
if (null == providers || providers.size() == 0) {
throw new IllegalArgumentException();
}
return providers.get(0);
}
public static List<String> lookupProviders(String name) {
List<String> strings = new ArrayList();
strings.add("127.0.0.1");
return strings;
}
}
這里用到了Socket來進(jìn)行遠(yuǎn)程通訊涧尿,同時(shí)利用ObjectOutputStream的writeObject和ObjectInputStream的readObject,來實(shí)現(xiàn)序列化和反序列化棚瘟。
下面是service端的實(shí)現(xiàn)
public class CalleeApp {
private static Logger log = LoggerFactory.getLogger(CalleeApp.class);
private Calculator calculator = new CalculatorImpl();
public static void main(String[] args) throws IOException {
new CalleeApp().run();
}
private void run() throws IOException {
ServerSocket listener = new ServerSocket(9090);
try {
while (true) {
Socket socket = listener.accept();
try {
// 將請(qǐng)求反序列化
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
Object object = objectInputStream.readObject();
log.info("request is {}", object);
// 調(diào)用服務(wù)
int result = 0;
if (object instanceof CalculateRpcRequest) {
CalculateRpcRequest calculateRpcRequest = (CalculateRpcRequest) object;
if ("add".equals(calculateRpcRequest.getMethod())) {
result = calculator.add(calculateRpcRequest.getA(), calculateRpcRequest.getB());
} else {
throw new UnsupportedOperationException();
}
}
// 返回結(jié)果
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(new Integer(result));
} catch (Exception e) {
log.error("fail", e);
} finally {
socket.close();
}
}
} finally {
listener.close();
}
}
}
示例代碼在GitHub上下載