如何調(diào)用他人的遠程服務(wù)?
由于各服務(wù)部署在不同機器,服務(wù)間的調(diào)用免不了網(wǎng)絡(luò)通信過程漱凝,服務(wù)消費方每調(diào)用一個服務(wù)都要寫一坨網(wǎng)絡(luò)通信相關(guān)的代碼,不僅復(fù)雜而且極易出錯揽咕。要讓網(wǎng)絡(luò)通信細節(jié)對使用者透明鞋邑,我們需要對通信細節(jié)進行封裝,我們先看下一個RPC調(diào)用的流程涉及到哪些通信細節(jié):
- 服務(wù)消費方(client)調(diào)用以本地調(diào)用方式調(diào)用服務(wù)察藐;
- client stub接收到調(diào)用后負責將方法埂息、參數(shù)等組裝成能夠進行網(wǎng)絡(luò)傳輸?shù)南Ⅲw技潘;
- client stub找到服務(wù)地址,并將消息發(fā)送到服務(wù)端千康;
- server stub收到消息后進行解碼享幽;
- server stub根據(jù)解碼結(jié)果調(diào)用本地的服務(wù);
- 本地服務(wù)執(zhí)行并將結(jié)果返回給server stub拾弃;
- server stub將返回結(jié)果打包成消息并發(fā)送至消費方值桩;
- client stub接收到消息,并進行解碼砸彬;
- 服務(wù)消費方得到最終結(jié)果颠毙。
RPC的目標就是要2~8這些步驟都封裝起來,讓用戶對這些細節(jié)透明砂碉。
1. 怎么做到透明化遠程服務(wù)調(diào)用蛀蜜?
怎么封裝通信細節(jié)才能讓用戶像以本地調(diào)用方式調(diào)用遠程服務(wù)呢?對java來說就是使用代理增蹭!java代理有兩種方式:1) jdk 動態(tài)代理滴某;2)字節(jié)碼生成。盡管字節(jié)碼生成方式實現(xiàn)的代理更為強大和高效滋迈,但代碼維護不易霎奢,大部分公司實現(xiàn)RPC框架時還是選擇動態(tài)代理方式。我們這個最簡易版的自然也是采用動態(tài)代理的方式饼灿。
2. 怎么對消息進行編碼和解碼幕侠?
2.1 確定消息數(shù)據(jù)結(jié)構(gòu)
- 接口名稱:在我們的例子里接口名是“HelloWorldService”,如果不傳碍彭,服務(wù)端就不知道調(diào)用哪個接口了晤硕;
- 方法名:一個接口內(nèi)可能有很多方法悼潭,如果不傳方法名服務(wù)端也就不知道調(diào)用哪個方法;
- 參數(shù)類型&參數(shù)值參數(shù)類型有很多舞箍,比如有bool舰褪、int、long疏橄、double占拍、string、map捎迫、list晃酒,甚至如struct(class)以及相應(yīng)的參數(shù)值;超時時間
2.2 序列化
從RPC的角度上看立砸,主要看三點:1)通用性掖疮,比如是否能支持Map等復(fù)雜的數(shù)據(jù)結(jié)構(gòu);2)性能颗祝,包括時間復(fù)雜度和空間復(fù)雜度,由于RPC框架將會被公司幾乎所有服務(wù)使用恼布,如果序列化上能節(jié)約一點時間螺戳,對整個公司的收益都將非常可觀折汞,同理如果序列化上能節(jié)約一點內(nèi)存倔幼,網(wǎng)絡(luò)帶寬也能省下不少;3)可擴展性爽待,對互聯(lián)網(wǎng)公司而言损同,業(yè)務(wù)變化飛快,如果序列化協(xié)議具有良好的可擴展性鸟款,支持自動增加新的業(yè)務(wù)字段膏燃,而不影響老的服務(wù),這將大大提供系統(tǒng)的靈活度何什。我們的是最簡易版所以就采用了jdk序列化的方式來處理组哩。
開始擼代碼
-
初始化工程
首先創(chuàng)建2個項目分別是server和client;server項目下兩個模塊分別是rpc-server-api和rpc-server-provider处渣。
為什么server項目要創(chuàng)建兩個模塊伶贰?
client在調(diào)用服務(wù)端的服務(wù)時需要知道服務(wù)端的一些信息,client可以依賴于這個模塊罐栈。我們的項目中SDK和契約包就是提供了這個功能黍衙。而真正的實現(xiàn)是放在rpc-server-provider中。
- rpc-server-api
public interface IHelloService {
String sayHello(String content);
String saveUser(User user);
}
請求參數(shù)類
private String className;
private String methodName;
private Object[] parameters;
-
rpc-server-provider
首先rpc-server-provider是依賴rpc-server-api的荠诬。我們寫一個實現(xiàn)類琅翻,來實現(xiàn)api中定義的接口位仁。
public class HelloServiceImpl implements IHelloService{ @Override public String sayHello(String content) { System.out.println("request in sayHello:"+content); return "Say Hello:"+content; } }
我這么寫好了實現(xiàn)遠程要怎么才能調(diào)用的到呢?我們還需把服務(wù)暴露出去望迎,那就需要一個服務(wù)暴露的方法障癌。這里就是不斷去接受請求,每一個socket交給一個processorHandler來處理辩尊。
public class RpcProxyServer {
ExecutorService executorService = Executors.newCachedThreadPool();
public void publisher(Object service, int port) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (true) {//不斷接受請求
Socket socket = serverSocket.accept();//BIO
//每一個socket 交給一個processorHandler來處理
executorService.execute(new ProcessorHandler(socket, service));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
具體processorHandler的代碼涛浙,從socket中獲取請求對象,并是使用請求對象調(diào)用本服務(wù)方法摄欲,同時返回方法執(zhí)行結(jié)果轿亮,將返回結(jié)果寫入socket中。
public class ProcessorHandler implements Runnable {
private Socket socket;
private Object service;
public ProcessorHandler(Socket socket, Object service) {
this.socket = socket;
this.service = service;
}
@Override
public void run() {
try (InputStream inputStream = socket.getInputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
//輸入流中應(yīng)該有什么東西胸墙?
//請求哪個類我注,方法名稱、參數(shù)
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = invoke(rpcRequest); //反射調(diào)用本地服務(wù)
objectOutputStream.writeObject(result);
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}
private Object invoke(RpcRequest request) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//反射調(diào)用
Object[] args = request.getParameters(); //拿到客戶端請求的參數(shù)
Class<?>[] types = new Class[args.length]; //獲得每個參數(shù)的類型
for (int i = 0; i < args.length; i++) {
types[i] = args[i].getClass();
}
Class clazz = Class.forName(request.getClassName()); //跟去請求的類進行加載
Method method = clazz.getMethod(request.getMethodName(), types); //sayHello, saveUser找到這個類中的方法
return method.invoke(service, args);
}
}
大功告成迟隅,把服務(wù)發(fā)布出去但骨。
/**
* Hello world!
*
*/
public class App {
public static void main( String[] args ){
IHelloService helloService=new HelloServiceImpl();
RpcProxyServer proxyServer=new RpcProxyServer();
// 發(fā)布到8080端口
proxyServer.publisher(helloService,8080);
}
}
-
客戶端代碼開擼。我們現(xiàn)在在客戶端依賴了服務(wù)端的api(SDK智袭、契約包)如何才能實現(xiàn)調(diào)用遠程方法呢奔缠?類似于服務(wù)端代理類。
public class RpcProxyClient { public <T> T clientProxy(final Class<T> interfaceCls,final String host,final int port){ return (T)Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls},new RemoteInvocationHandler(host,port)); } }
public class RemoteInvocationHandler implements InvocationHandler { private String host; private int port; public RemoteInvocationHandler(String host, int port) { this.host = host; this.port = port; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //請求數(shù)據(jù)的包裝 RpcRequest rpcRequest=new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(args); //遠程通信 RpcNetTransport netTransport=new RpcNetTransport(host,port); Object result=netTransport.send(rpcRequest); return result; } }
處理網(wǎng)絡(luò)傳輸?shù)念?/p>
public class RpcNetTransport { private String host; private int port; public RpcNetTransport(String host, int port) { this.host = host; this.port = port; } public Object send(RpcRequest request) { Object result = null; try (//建立連接 Socket socket = new Socket(host, port); //網(wǎng)絡(luò)socket ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) { outputStream.writeObject(request); //序列化() outputStream.flush(); result = inputStream.readObject(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return result; }
最后使用這個遠程調(diào)用:
public class App { public static void main(String[] args) { RpcProxyClient rpcProxyClient = new RpcProxyClient(); IHelloService iHelloService = rpcProxyClient.clientProxy(IHelloService.class,"localhost",8080); } }
最后畫一張來總結(jié)一下整個流程