一個簡單的rpc demo
最近在網(wǎng)上看到阿里巴巴2015年的中間件性能挑戰(zhàn)賽的一個題目圣蝎,實現(xiàn)一個簡單的RPC框架刹悴,于是乎有一種沖動實現(xiàn)一個簡單的rpc,要求基本按照競賽題目的要求蛹稍,具體如下:
1.要成為框架:對于框架的使用者,隱藏RPC實現(xiàn)扒吁。
2.網(wǎng)絡模塊可以自己編寫,如果要使用IO框架室囊,要求使用netty-4.0.23.Final。
3.能夠傳輸基本類型魁索、自定義業(yè)務類型融撞、異常類型(要在客戶端拋出)。
4.支持異步調用粗蔚,提供future尝偎、callback的能力。
5.要處理超時場景鹏控,服務端處理時間較長時致扯,客戶端在指定時間內跳出本次調用。
6.提供RPC上下文当辐,客戶端可以透傳數(shù)據(jù)給服務端抖僵。
7.提供Hook,讓開發(fā)人員進行RPC層面的AOP缘揪。
最終預期的框架結構:
- ConsumerService耍群、ProviderService是提供給client和server端使用的api接口。
- InterceptorChain:提供了RPC層面的AOP功能找筝。日志蹈垢、白名單過濾、權限認證等等袖裕。
- RpcContext:提供上線文曹抬,雙端可以透明傳輸數(shù)據(jù)。
- Connector急鳄、Acceptor:網(wǎng)絡模塊谤民,第一步自己用JavaSocket實現(xiàn)。
要求有了攒岛,下面第一步先整一個能跑起來的赖临!
第一步先跑起來
先把我們預期能實現(xiàn)的功能擺出來:
基本調用鏈路暢通,能夠傳輸基本類型灾锯、自定義業(yè)務類型兢榨、異常類型(要在客戶端拋出)。
測試用的業(yè)務接口UserService:
/**
* 測試用業(yè)務接口
*
* @author wqx
*
*/
public interface UserService {
/**
* 基本鏈路測試
*
* @return
*/
public String test();
/**
* 自定義業(yè)務類型測試
*
* @param userId
* @return
*/
public User queryUserById(int userId);
/**
* 異常測試
*
* @throws IOException
*/
public Object exceptionTest() throws RpcException;
}
業(yè)務實現(xiàn)UserServiceImpl類:
/**
* 測試業(yè)務接口實現(xiàn)類
*
* @author wqx
*
*/
public class UserServiceImpl implements UserService {
public String test() {
return "hello client, this is rpc server.";
}
public User queryUserById(int userId) {
User parent = new User(100,"小明爸爸");
User child = new User(101,"小明同學");
parent.addChild(child);
return parent;
}
public Object exceptionTest() throws RpcException {
throw new RpcException("exception occur in server!3炒稀凌那!");
}
}
測試用的自定義業(yè)務類型User
/**
* 測試用的自定義業(yè)務類型
*
* @author wqx
*
*/
public class User implements java.io.Serializable{
private static final long serialVersionUID = 493399440916323966L;
private Integer id;
private String name;
private List<User> childs;
public void addChild(User child){
if(childs == null){
childs = new ArrayList<User>();
}
childs.add(child);
}
//。吟逝。帽蝶。getter setter
}
需求明確。块攒。励稳。鍵盤飛起。囱井。驹尼。
基本步驟很簡單,通過Proxy對客戶端方法調用進行攔截庞呕,在代理對象的回調方法中新翎,發(fā)起遠程調用,網(wǎng)絡模塊先采用簡單的Java提供的SocketAPI吧住练,對象的序列化和反序列化也是用JDK自帶的功能實現(xiàn)地啰。一切從簡從速!=补洹亏吝!
基本流程如下圖:
- invokeMethod:客戶端調用目標對象的方法,被代理對象攔截盏混。
- wrap Request:封裝調用參數(shù)(方法名顺呕,方法參數(shù)等),實現(xiàn)RpcRequest對象括饶,并序列化株茶。
- network transport:網(wǎng)絡傳輸,將序列化的參數(shù)對象傳輸?shù)侥繕朔斩恕?/li>
- unwrap request:對接收到的請求參數(shù)進行反序列化過程图焰。
- invokeMethod:通過反射機制method.invoke(obj,args)調用目標方法启盛。
- 將結果包裝在RpcResponse對象中,進行序列化技羔,為返回做準備僵闯。
- 和步驟3一樣,服務端通過網(wǎng)絡將結果返回給客戶端藤滥。
- unwrap Response:反序列化鳖粟,得到RpcResponse對象,從中獲取結果retVal拙绊,并返回客戶端向图。
過程很簡單泳秀,下面開始實現(xiàn)第一個組件RpcBuilder,主要功能用來生成client端和server端的代理對象榄攀。
/**
* rpc服務類
*
* @author wqx
*
*/
public final class RpcBuilder {
//構建客戶端的代理對象
public static Object buildRpcClient(final Class<?> interfaces,final String host,final int port){
if(interfaces == null){
throw new IllegalArgumentException("interfaces can not be null");
}
return Proxy.newProxyInstance(RpcBuilder.class.getClassLoader(), new Class<?>[]{interfaces},
new InvocationHandler(){
//攔截目標方法->序列化method對象->發(fā)起socket連接
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {
//創(chuàng)建連接,獲取輸入輸出流
Socket socket = new Socket(host,port);
Object retVal = null;
try{
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
try{
//構造請求參數(shù)對象
RpcRequest request = new RpcRequest(method.getName(), method.getParameterTypes(),args);
//發(fā)送
out.writeObject(request);
//接受server端的返回信息---阻塞
Object response = in.readObject();
if(response instanceof RpcResponse){
RpcResponse rpcResp = (RpcResponse)response;
if(!rpcResp.isError()){
retVal = rpcResp.getResponseBody();
}else{
return new Throwable(rpcResp.getErrorMsg());
}
}
}finally{
out.close();
in.close();
}
}finally{
socket.close();
}
return retVal;
}
});
}
private static int nThreads = Runtime.getRuntime().availableProcessors() * 2;
private static ExecutorService handlerPool = Executors.newFixedThreadPool(nThreads);
public static void buildRpcServer(final Object service, final int port) throws IOException{
if (service == null)
throw new IllegalArgumentException("service can not be null.");
ServerSocket server = new ServerSocket(port);
System.out.println("server started!!!");
while(true){
Socket socket = server.accept();//監(jiān)聽請求--阻塞
//交由線程池異步處理
handlerPool.submit(new Handler(service,socket));
}
}
static class Handler implements Runnable{
private Object service;
private Socket socket;
public Handler(Object service,Socket socket){
this.service = service;
this.socket = socket;
}
public void run() {
try{
ObjectInputStream in = null;
ObjectOutputStream out = null;
RpcResponse response = new RpcResponse();
try {
in = new ObjectInputStream(socket.getInputStream());
out = new ObjectOutputStream(socket.getOutputStream());
Object req = in.readObject();
if(req instanceof RpcRequest){
RpcRequest rpcRequest = (RpcRequest)req;
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object retVal = method.invoke(service, rpcRequest.getArgs());
response.setResponseBody(retVal);
out.writeObject(response);
}else{
throw new IllegalArgumentException("bad request!");
}
} catch (Exception e) {
response.setErrorMsg(e.getMessage());
response.setResponseBody(e);
out.writeObject(response);
}finally{
in.close();
out.close();
}
}catch(Exception e){}
}
}
}
其中每次發(fā)送請求包含的信息(方法名嗜傅、參數(shù)名),將封裝在RpcRequest中檩赢,實現(xiàn)如下:
/**
* 封裝請求參數(shù)
*
* @author wqx
*
*/
public class RpcRequest implements Serializable
{
/**
*
*/
private static final long serialVersionUID = -7102839100899303105L;
//方法名
private String methodName;
//參數(shù)類型
private Class<?>[] parameterTypes;
//參數(shù)列表
private Object[] args;
public RpcRequest(String methodName,Class<?>[] parameterTypes,Object[] args)
{
this.methodName = methodName;
this.parameterTypes = parameterTypes;
this.args = args;
}
//getter and sette
服務端返回的執(zhí)行結果封裝在RpcResponse中吕嘀,如下所示:
/**
* 響應對象
*
* @author wqx
*
*/
public class RpcResponse implements Serializable{
static private final long serialVersionUID = -4364536436151723421L;
//響應實體
private Object responseBody;
//錯誤信息
private String errorMsg;
public boolean isError(){
return errorMsg == null ? false:true;
}
//getter and setter
}
自定義的業(yè)務異常RpcException:
/**
* 自定義異常
*
* @author wqx
*
*/
public class RpcException extends RuntimeException {
private static final long serialVersionUID = -2157872157006208360L;
public RpcException(String msg){
super(msg);
}
}
client端測試代碼:
// client端
public class ClientTest {
private static String host = "127.0.0.1";
private static int port = 8888;
public static void main(String[] args) {
UserService userService = (UserService) RpcBuilder.buildRpcClient(UserService.class, host, port);
Object msg = null;
try{
msg = userService.test();//測試基本鏈路是否暢通
// msg = userService.exceptionTest();//異常測試
// msg = userService.queryUserById(0);//傳輸自定義業(yè)務類型
// if(msg instanceof User){
// System.out.println("parent:" + ((User)msg).getName());
// System.out.println("child:" + ((User)msg).getChilds().get(0).getName());
// }
System.out.println("msg:" + msg);
}catch(Exception e){
System.out.println("errorMsg:" + e);
}
}
}
server端的測試代碼:
public class ServerTest {
private static int port = 8888;
public static void main(String[] args) throws IOException {
UserService userService = new UserServiceImpl();
//暴露服務
RpcBuilder.buildRpcServer(userService,port);
}
}
測試test方法:預期輸出:
msg : hello client, this is rpc server.
測試exceptionTest方法:
輸出:
errorMsg:edu.ouc.rpc.RpcException: exception occur in server!U曷鳌偶房!
測試queryUserById方法:
輸出:
parent:小明爸爸
child:小明同學
done!>蝴悉!
上述源碼托管在github上
實現(xiàn)簡單但問題多多
序列化方案(jdk原生序列化方案性能太差了)
網(wǎng)絡傳輸方案(Socket BIO需要)
服務端方法執(zhí)行時間很長怎么辦?
異步調用如何實現(xiàn)瘾敢?
。尿这。簇抵。
一個一個get!I渲凇碟摆!