基于netty的rpc框架
[TOC]
如果你已經(jīng)對以下東東有所了解,那么你就可以完成一個rpc框架了
- Java的反射技術(shù)
- java的動態(tài)代理機制
- 基于nio的框架netty
- 全世界最好的框架-spring
- java的序列化
神馬是rpc第租?
-
在這個大數(shù)據(jù)時代人乓,很多公司的服務(wù)器都是以集群的方式存在的瘪弓。在我們傳統(tǒng)的mvc后臺開發(fā)中濒募,我們就需要把不同層的服務(wù)部署到不同的服務(wù)器上面懈糯,這個每個服務(wù)器的的壓力就會比較小了奢方。
但是這樣也會帶來一個問題——我在這臺機器上如何才能調(diào)用到另一臺機器的代碼呢润樱?這是個問題渣触。
我們先來舉個栗子:
比如我們在一個傳統(tǒng)mvc項目中,我們有一個UserController處理用戶的請求壹若,假如它長的這個樣子:
@RestController @RequestMapping(value = "/user") public class UserController { @Autowired UserService userService; @RequestMapping(value = "/current") public AjaxResponse login(String name,String password) { return AjaxResponse.success(userService.login(name,password)); } }
正常情況下嗅钻,這個UserService的實現(xiàn)肯定是在同一個項目或者是本地的皂冰,早就已經(jīng)被加入到spring容器中了,不過加入我們?yōu)榱藴p少服務(wù)器的壓力养篓,我們將UserService的實現(xiàn)放到另一臺服務(wù)器上秃流,加入我們有一個膜法,可以在本地的Controller像調(diào)用本地方法一樣調(diào)用另一臺的userServiceImpl就好了柳弄,Rpc就是這樣一種技術(shù)舶胀。
實現(xiàn)思路
-
表面上看這是一個很難完成的任務(wù),本機怎么可能可以調(diào)用到遠程的方法呢碧注?不過如果我們這個任務(wù)拆分開來嚣伐,就會發(fā)現(xiàn)只要一步一步來,其實還是挺簡單的萍丐。
我們可以換一種思路纤控,既然直接調(diào)用不行,我們可以曲線救國呀碉纺,我們只要把調(diào)用方法的對象的名稱,方法的名字刻撒,方法的參數(shù)與方法的類型都通過網(wǎng)絡(luò)發(fā)送到另一臺機器上骨田,另一條機器接收到之后根據(jù)請求信息調(diào)用該對象的方法,然后在把執(zhí)行結(jié)果通過網(wǎng)路直接返回回來不就ok了声怔。其實态贤,rpc框架的大體思路就是如此。
-
大體實現(xiàn)流程如下:
- 通過java的動態(tài)代理機制為我們UserService創(chuàng)建代理對象醋火,在代理對象執(zhí)行方法的時候?qū)嶋H上已經(jīng)被我們定制的方法攔截悠汽。
- 在攔截的邏輯里面,我們在獲取到調(diào)用的方法的所有接口芥驳,方法名柿冲,參數(shù)集合,參數(shù)類型集合后封裝到一個JavaBean——request中去兆旬,然后我們將這個對象序列化之后通過網(wǎng)絡(luò)傳輸?shù)搅硪慌_機器上假抄。
- 另一臺機器接受到這個網(wǎng)絡(luò)請求后,將數(shù)據(jù)反序列化為Request對象丽猬,從而了解我們請求的是具體是什么對象的什么方法宿饱,然后服務(wù)器通過反射的方式調(diào)用,并將執(zhí)行結(jié)果通過另一個JavaBean——Response返回脚祟。
- 本機收到服務(wù)端的返回谬以。整個rpc調(diào)用就完成了由桌。
-
如下圖所示为黎,由于畫圖水平有限,不過大致就是這個意思:
代碼具體實現(xiàn)
-
首先我們需要為我們的網(wǎng)絡(luò)請求分裝兩個JavaBean碍舍,分別為Request與Response.柠座。
//在Request中應(yīng)有的屬性 private String requestId; private String className; private String methodName; private Class<?>[] parameterTypes; private Object[] parameters; //在response應(yīng)該有的屬性 private String requestId; private Throwable error; private Object result;
-
創(chuàng)建RpcClient片橡,封裝我們的網(wǎng)絡(luò)請求流程妈经。其中最重要的是這個方法:
public Response send(Request request) throws InterruptedException { ClientBootstrap bootstrap = new ClientBootstrap(); ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService work = Executors.newCachedThreadPool(); bootstrap.setFactory(new NioClientSocketChannelFactory(boss,work)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder",new ResponseDecoder()); pipeline.addLast("encoder",new RequestEncoder()); pipeline.addLast("handler",RpcClient.this); return pipeline; } }); ChannelFuture connect = bootstrap.connect(new InetSocketAddress(address, port)).sync(); connect.getChannel().write(request).sync(); //阻塞線程直到完成請求或者請求失敗 synchronized (obj){ obj.wait(); } connect.getChannel().close().sync(); return this.response; }
這里用netty3進行的網(wǎng)咯請求捧书,這里
ResponseDecoder
與RequestEncoder
是對Response與Request進行的序列化與反序列化吹泡,采用的谷歌的Protostuff序列化框架實現(xiàn)(為啥不用java自帶的序列化工具呢?因為java自定的序列化附帶了很多其他信息经瓷,序列化的字節(jié)長度比谷歌的長好幾倍爆哑,所以是為了節(jié)約帶寬,同時Protostuff的序列化支持多種編程語言) -
創(chuàng)建代理的工具類舆吮,返回代理對象揭朝。
public <T>T proxy(Class<?> clazz){ return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[] { clazz }, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Request request = new Request(); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameters(args); request.setRequestId(UUID.randomUUID().toString()); request.setParameterTypes(method.getParameterTypes()); RpcClient client =new RpcClient(address,port); //通過封裝的網(wǎng)絡(luò)框架進行網(wǎng)絡(luò)請求 Response response = client.send(request); if (response.getError()!=null){ throw response.getError(); } else{ return response; } } }); }
-
服務(wù)端在開啟服務(wù)的時候就需要通過spring掃描所有的service實現(xiàn)類,將其裝進spring的容器中色冀。
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RPCService.class); for(Map.Entry<String,Object> entry :beansWithAnnotation.entrySet()){ String interfaceName = entry.getValue().getClass() .getAnnotation(RPCService.class).value().getName(); serviceMap.put(interfaceName,entry.getValue()); } startServer(); }
需要發(fā)布的服務(wù)類都需要使用
@RPCService
注解潭袱,這是一個自定義的注解。 -
在服務(wù)端收到客戶端的網(wǎng)絡(luò)請求之后锋恬,我們就需要從spring容器中找到請求的服務(wù)類完成調(diào)用并返回執(zhí)行結(jié)果屯换。
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception { Request request = (Request) event.getMessage(); Response response = new Response(); //調(diào)用請求類的請求方法執(zhí)行并返回執(zhí)行結(jié)果 Object invoke = null; try { Object requestBean = serviceMap.get(request.getClassName()); Class<?> requestClazz = Class.forName(request.getClassName()); Method method = requestClazz.getMethod(request.getMethodName(), request.getParameterTypes()); invoke = method.invoke(requestBean, request.getParameters()); response.setRequestId(UUID.randomUUID().toString()); response.setResult(invoke); } catch (Exception e) { response.setError(e); response.setRequestId(UUID.randomUUID().toString()); } System.out.println(request+""+response); //返回執(zhí)行結(jié)果 ctx.getChannel().write(response); }
總結(jié)
- 整體的流程還是比較簡單的,就是具體實現(xiàn)的時候會有一些細節(jié)問題需要好好處理与学。雖然是第一次寫這種輪子程序彤悔,不過感覺還是不錯的。完整代碼已上傳到我的GitHub倉庫里面索守,有興趣的小伙伴可以去看看晕窑。