前一篇文章簡(jiǎn)單介紹了下RPC的基本原理,同時(shí)附上了一個(gè)小的demo蒲稳,但是這個(gè)小的demo并不能在生產(chǎn)上使用,因?yàn)樯a(chǎn)上的RPC還需要考慮很多因素伍派,比如接入注冊(cè)中心江耀、高性能的網(wǎng)絡(luò)通信、高性能的序列化和反序列化诉植、自動(dòng)路由祥国、容錯(cuò)處理等等。要實(shí)現(xiàn)生產(chǎn)上使用的先不談晾腔,我們先來實(shí)現(xiàn)一個(gè)稍微復(fù)雜的RPC舌稀,借助這個(gè)RPC例子來更深刻的理解RPC原理,為后續(xù)Dubbo源碼的分析做準(zhǔn)備建车。
一扩借、簡(jiǎn)單RPC架構(gòu)設(shè)計(jì)
回顧下RPC原理圖:
如果要自己設(shè)計(jì)實(shí)現(xiàn)稍微簡(jiǎn)單的一個(gè)rpc框架,應(yīng)該需要考慮注冊(cè)中心缤至、網(wǎng)絡(luò)通信潮罪、序列化等內(nèi)容,因?yàn)榭梢栽O(shè)計(jì)出如下鍵略的RPC架構(gòu)圖:
二领斥、項(xiàng)目起步說明
1嫉到、本RCP項(xiàng)目涉及的功能點(diǎn)
動(dòng)態(tài)代理、反射月洛、序列化何恶、反序列化、網(wǎng)絡(luò)通信嚼黔、編解碼细层、服務(wù)發(fā)現(xiàn)和注冊(cè)惜辑、心跳與鏈路檢測(cè)等
2、本小節(jié)內(nèi)容說明
設(shè)計(jì)的技術(shù)點(diǎn):動(dòng)態(tài)代理技術(shù)疫赎、反射(有關(guān)動(dòng)態(tài)代理可以看我的另一篇博文:代理模式)
實(shí)現(xiàn)的功能:簡(jiǎn)易的基于socket的rpc
3盛撑、本小節(jié)項(xiàng)目總體框架圖
三、編碼實(shí)現(xiàn)
1捧搞、api模塊
創(chuàng)建用于測(cè)試的接口
public interface HelloService {
String sayHello(String name);
}
2抵卫、common模塊
request請(qǐng)求實(shí)體類
public class Request implements Serializable {
private static final long serialVersionUID = 7929047349488932740L;
/**
* 請(qǐng)求表示id
*/
private String requestId;
/**
* 請(qǐng)求服務(wù)類型
*/
private String className;
/**
* 請(qǐng)求方法名稱
*/
private String methodName;
/**
* 請(qǐng)求方法參數(shù)類型數(shù)組
*/
private Class<?>[] parameterTypes;
/**
* 請(qǐng)求參數(shù)列表
*/
private Object[] args;
......省略getter/setter
}
response響應(yīng)實(shí)體類:
public class Response {
private static final long serialVersionUID = -1023480952777229650L;
private String requestId;
/**
* 響應(yīng)狀態(tài)嗎
*/
private int code;
/**
* 響應(yīng)消息說明
*/
private String msg;
/**
* 相應(yīng)數(shù)據(jù)
*/
private Object data;
......省略getter/setter
3、provider模塊
服務(wù)的暴露(包好服務(wù)的注冊(cè)和服務(wù)的發(fā)布)胎撇,服務(wù)端基本流程是
服務(wù)注冊(cè)->服務(wù)發(fā)布->服務(wù)啟動(dòng)監(jiān)聽請(qǐng)求(socket)->處理請(qǐng)求
//rpc代理服務(wù)介粘,用于暴露服務(wù)
public class RpcProxyServer {
/**
* 創(chuàng)建一個(gè)線程池
*/
ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 端口號(hào)
*/
private int port;
/**
* 1、服務(wù)注冊(cè)
* @param serviceInterface
* @param impClass
* @return
*/
public RpcProxyServer register(Class serviceInterface, Class impClass) {
//注冊(cè)服務(wù)(接口名:實(shí)現(xiàn)類名)
ProcessorHandler.register(serviceInterface, impClass);
return this;
}
public RpcProxyServer(int port) {
this.port = port;
}
/**
*2晚树、 啟動(dòng)發(fā)布(啟動(dòng))
*/
public void start() {
System.out.println("服務(wù)啟動(dòng)====");
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (true) {//3姻采、通過循環(huán)不斷接受請(qǐng)求
Socket socket = serverSocket.accept();//監(jiān)聽客戶端的請(qǐng)求
//4、每一個(gè)socket交給一個(gè)processorhandler處理题涨,這里的target就是真正的業(yè)務(wù)類
executorService.execute(new ProcessorHandler(socket));//處理客戶端的請(qǐng)求
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
具體處理請(qǐng)求的handler
//服務(wù)端接受請(qǐng)求處理線程
public class ProcessorHandler implements Runnable {
private static final HashMap<String, Class<?>> serviceRegistry = new HashMap<String, Class<?>>();
/**
* socket
*/
private Socket socket;
public static void register(Class serviceInterface, Class impClass) {
//注冊(cè)服務(wù)(接口名:實(shí)現(xiàn)類名)
serviceRegistry.put(serviceInterface.getName(), impClass);
}
public ProcessorHandler(Socket socket) {
this.socket = socket;
}
public void run() {
//用于定義輸入流和輸出流
ObjectInputStream objectInputStream = null;
ObjectOutputStream objectOutputStream = null;
try {
objectInputStream = new ObjectInputStream(socket.getInputStream());
//從socket中讀取請(qǐng)求流對(duì)象
Request rpcRequest = (Request) objectInputStream.readObject();
//調(diào)用正真的處理方法
Object result = invoke(rpcRequest);
Response response = new Response();
response.setRequestId(rpcRequest.getRequestId());
response.setData(result);
response.setMsg(ResponseCodeEnum.SUCCESS.getMsg());
response.setCode(ResponseCodeEnum.SUCCESS.getCode());
//將結(jié)果通過socket輸出
objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(result);
objectOutputStream.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
closeOpenSource(objectInputStream, objectOutputStream);
}
}
private void closeOpenSource(ObjectInputStream objectInputStream, ObjectOutputStream objectOutputStream) {
if (objectInputStream != null) {
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (objectOutputStream != null) {
try {
objectOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 利用反射技術(shù)執(zhí)行正真的方法(這里只是簡(jiǎn)單的實(shí)現(xiàn)偎谁,沒有容錯(cuò)處理)
*
* @param rpcRequest
* @return
*/
private Object invoke(Request rpcRequest) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
//獲取目標(biāo)對(duì)象并執(zhí)行目標(biāo)方法(也就是獲取注冊(cè)后的接口實(shí)現(xiàn)類對(duì)象)
Class<?> targetClass = serviceRegistry.get(rpcRequest.getClassName());
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Method method = targetClass.getMethod(rpcRequest.getMethodName(), parameterTypes);
Object[] args = rpcRequest.getArgs();
return method.invoke(targetClass.newInstance(), args);
}
}
用到的枚舉
public enum ResponseCodeEnum {
SUCCESS(0, "success"),
FAIL(1, "fail");
....省略
rpc接口實(shí)現(xiàn)類:
public class HelloServiceImpl implements HelloService {
public String sayHello(String name) {
return "hello " + name;
}
}
啟動(dòng)服務(wù)main方法:
public class Demo1Main {
public static void main(String[] args) {
//創(chuàng)建代理服務(wù)
RpcProxyServer rpcProxyServer = new RpcProxyServer(8888);
//注冊(cè)服務(wù)
rpcProxyServer.register(HelloService.class, HelloServiceImpl.class);
//啟動(dòng)服務(wù)
rpcProxyServer.start();
}
}
4、consumer模塊
消費(fèi)端模塊主要是通過jdk動(dòng)態(tài)代理的方式實(shí)現(xiàn)rpc接口代理請(qǐng)求遠(yuǎn)程纲堵,基本流程
client->創(chuàng)建代理對(duì)象->通過代理對(duì)象請(qǐng)求遠(yuǎn)程服務(wù)->接受返回的信息
public class ClientProxy<T> {
/**
* 服務(wù)端代理接口
*/
private Class<T> serverInstance;
/**
* 服務(wù)端地址
*/
private InetSocketAddress address;
public ClientProxy(Class<T> serverInstance, String ip, Integer port) {
this.address = new InetSocketAddress(ip, port);
this.serverInstance = serverInstance;
}
/**
* 獲取客戶端代理對(duì)象
*
* @return
*/
public T getClientInstance() {
return (T) Proxy.newProxyInstance(serverInstance.getClassLoader(), new Class<?>[]{serverInstance}, new RemoteInvocationHandler(address));
}
}
具體遠(yuǎn)程調(diào)用invoke方法(jdk動(dòng)態(tài)代理InvocationHandler)
public class RemoteInvocationHandler implements InvocationHandler {
/**
* 服務(wù)端地址
*/
private InetSocketAddress address;
public RemoteInvocationHandler(InetSocketAddress address) {
this.address=address;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request rpcRequest = new Request();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setArgs(args);
//通過網(wǎng)絡(luò)發(fā)送正式請(qǐng)求
RpcNetTransport netTransport = new RpcNetTransport(address.getPort(), address.getHostName());
Object result = (Object) netTransport.send(rpcRequest);
return result;//返回收到的結(jié)果
}
}
具體的rpc網(wǎng)絡(luò)請(qǐng)求(socket)
//網(wǎng)絡(luò)傳送
public class RpcNetTransport {
private int port;
private String host;
public RpcNetTransport(int port, String host) {
this.port = port;
this.host = host;
}
/**
* 發(fā)送請(qǐng)求
*
* @param request
*/
public Object send(Request request) throws IOException, ClassNotFoundException {
Socket socket = null;
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
// 1.創(chuàng)建Socket客戶端巡雨,根據(jù)指定地址連接遠(yuǎn)程服務(wù)提供者
socket = new Socket(host, port);
//2、將遠(yuǎn)程服務(wù)調(diào)用所需的接口類席函、方法名铐望、參數(shù)列表等編碼后發(fā)送給服務(wù)提供者
outputStream=new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(request);
//3、同步阻塞等待服務(wù)器返回應(yīng)答茂附,獲取應(yīng)答后返回
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} finally {
if (socket != null) {
socket.close();
}
if (outputStream != null) {
outputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
}
}
}
消費(fèi)端消費(fèi)服務(wù)main方法:
public class Demo1Main {
public static void main(String[] args) {
ClientProxy clientProxy = new ClientProxy(HelloService.class, "127.0.0.1", 8888);
HelloService helloService = (HelloService) clientProxy.getClientInstance();
String result = helloService.sayHello("嘿嘿嘿");
System.out.println(result);
}
}
三正蛙、總結(jié)與思考
總結(jié):本節(jié)實(shí)現(xiàn)了一個(gè)非常簡(jiǎn)單的rpc原型項(xiàng)目,包含了服務(wù)注冊(cè)营曼、采用BIO的網(wǎng)絡(luò)通信模型傳送數(shù)據(jù)乒验、采用jdk原生代理模式進(jìn)行服務(wù)代理、采用jdk原生的序列化方式進(jìn)行序列化和反序列化等蒂阱。后續(xù)將會(huì)針對(duì)該原型項(xiàng)目不斷的改進(jìn)锻全,不斷的引入新的“武器”,來豐富整個(gè)rpc項(xiàng)目录煤。
后期預(yù)熱:引入注冊(cè)中心(解決服務(wù)治理問題)鳄厌、引入多種高效的序列化機(jī)制、引入NIO的網(wǎng)絡(luò)通信模型妈踊、引入軟負(fù)載均衡機(jī)制了嚎、引入spi擴(kuò)展機(jī)制、接入spring等等,敬請(qǐng)期待歪泳。