基于Netty的高性能JAVA的RPC框架

RPC的實(shí)現(xiàn)

1. RPC客戶端?

2. RPC服務(wù)端

RPC客戶端的實(shí)現(xiàn)

RPC客戶端和RPC服務(wù)器端需要一個(gè)相同的接口類老速,RPC客戶端通過一個(gè)代理類來調(diào)用RPC服務(wù)器端的函數(shù)

RpcConsumerImpl的實(shí)現(xiàn) ...... package com.alibaba.middleware.race.rpc.api.impl; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import com.alibaba.middleware.race.rpc.aop.ConsumerHook; import com.alibaba.middleware.race.rpc.api.RpcConsumer; import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener; import com.alibaba.middleware.race.rpc.context.RpcContext; import com.alibaba.middleware.race.rpc.model.RpcRequest; import com.alibaba.middleware.race.rpc.model.RpcResponse; import com.alibaba.middleware.race.rpc.netty.RpcConnection; import com.alibaba.middleware.race.rpc.netty.RpcNettyConnection; import com.alibaba.middleware.race.rpc.tool.Tool; public class RpcConsumerImpl extends RpcConsumer implements InvocationHandler { private static AtomicLong callTimes = new AtomicLong(0L); private RpcConnection connection; private List<RpcConnection> connection_list; private Map<String,ResponseCallbackListener> asyncMethods; private Class<?> interfaceClass; private String version; private int timeout; private ConsumerHook hook; public Class<?> getInterfaceClass() { return interfaceClass; } public String getVersion() { return version; } public int getTimeout() { this.connection.setTimeOut(timeout); return timeout; } public ConsumerHook getHook() { return hook; } RpcConnection select() { //Random rd=new Random(System.currentTimeMillis()); int d=(int) (callTimes.getAndIncrement()%(connection_list.size()+1)); if(d==0) return connection; else { return connection_list.get(d-1); } } public RpcConsumerImpl() { //String ip=System.getProperty("SIP"); String ip="127.0.0.1"; this.asyncMethods=new HashMap<String,ResponseCallbackListener>(); this.connection=new RpcNettyConnection(ip,8888); this.connection.connect(); connection_list=new ArrayList<RpcConnection>(); int num=Runtime.getRuntime().availableProcessors()/3 -2; for (int i = 0; i < num; i++) { connection_list.add(new RpcNettyConnection(ip, 8888)); } for (RpcConnection conn:connection_list) { conn.connect(); } } public void destroy() throws Throwable { if (null != connection) { connection.close(); } } @SuppressWarnings("unchecked") public <T> T proxy(Class<T> interfaceClass) throws Throwable { if (!interfaceClass.isInterface()) { throw new IllegalArgumentException(interfaceClass.getName() + " is not an interface"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, this); } @Override public RpcConsumer interfaceClass(Class<?> interfaceClass) { // TODO Auto-generated method stub this.interfaceClass=interfaceClass; return this; } @Override public RpcConsumer version(String version) { // TODO Auto-generated method stub this.version=version; return this; } @Override public RpcConsumer clientTimeout(int clientTimeout) { // TODO Auto-generated method stub this.timeout=clientTimeout; return this; } @Override public RpcConsumer hook(ConsumerHook hook) { // TODO Auto-generated method stub this.hook=hook; return this; } @Override public Object instance() { // TODO Auto-generated method stub try { return proxy(this.interfaceClass); } catch (Throwable e) { e.printStackTrace(); } return null; } @Override public void asynCall(String methodName) { // TODO Auto-generated method stub asynCall(methodName, null); } @Override public <T extends ResponseCallbackListener> void asynCall( String methodName, T callbackListener) { this.asyncMethods.put(methodName, callbackListener); this.connection.setAsyncMethod(asyncMethods); for (RpcConnection conn:connection_list) { conn.setAsyncMethod(asyncMethods); } } @Override public void cancelAsyn(String methodName) { // TODO Auto-generated method stub this.asyncMethods.remove(methodName); this.connection.setAsyncMethod(asyncMethods); for (RpcConnection conn:connection_list) { conn.setAsyncMethod(asyncMethods); } } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // TODO Auto-generated method stub List<String> parameterTypes = new LinkedList<String>(); for (Class<?> parameterType : method.getParameterTypes()) { parameterTypes.add(parameterType.getName()); } RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); if(hook!=null) hook.before(request); RpcResponse response = null; try { request.setContext(RpcContext.props); response = (RpcResponse) select().Send(request,asyncMethods.containsKey(request.getMethodName())); if(hook!=null) hook.after(request); if(!asyncMethods.containsKey(request.getMethodName())&&response.getExption()!=null) { Throwable e=(Throwable) Tool.deserialize(response.getExption(),response.getClazz()); throw e.getCause(); } } catch (Throwable t) { //t.printStackTrace(); //throw new RuntimeException(t); throw t; } finally { // if(asyncMethods.containsKey(request.getMethodName())&&asyncMethods.get(request.getMethodName())!=null) // { // cancelAsyn(request.getMethodName()); // } } if(response==null) { return null; } else if (response.getErrorMsg() != null) { throw response.getErrorMsg(); } else { return response.getAppResponse(); } } }

RpcConsumer consumer; consumer = (RpcConsumer) getConsumerImplClass().newInstance(); consumer.someMethod();

因?yàn)閏onsumer對(duì)象是通過代理生成的诬像,所以當(dāng)consumer調(diào)用的時(shí)候咖熟,就會(huì)調(diào)用invoke函數(shù),我們就可以把這次本地的函數(shù)調(diào)用的信息通過網(wǎng)絡(luò)發(fā)送到RPC服務(wù)器然后等待服務(wù)器返回的信息后再返回。

服務(wù)器實(shí)現(xiàn)

RPC服務(wù)器主要是在收到RPC客戶端之后解析出RPC調(diào)用的接口名除破,函數(shù)名以及參數(shù)。

package com.alibaba.middleware.race.rpc.api.impl; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; import net.sf.cglib.reflect.FastClass; import net.sf.cglib.reflect.FastMethod; import com.alibaba.middleware.race.rpc.context.RpcContext; import com.alibaba.middleware.race.rpc.model.RpcRequest; import com.alibaba.middleware.race.rpc.model.RpcResponse; import com.alibaba.middleware.race.rpc.serializer.KryoSerialization; import com.alibaba.middleware.race.rpc.tool.ByteObjConverter; import com.alibaba.middleware.race.rpc.tool.ReflectionCache; import com.alibaba.middleware.race.rpc.tool.Tool; /** * 處理服務(wù)器收到的RPC請(qǐng)求并返回結(jié)果 * @author sei.zz * */ public class RpcRequestHandler extends ChannelInboundHandlerAdapter { //對(duì)應(yīng)每個(gè)請(qǐng)求ID和端口好 對(duì)應(yīng)一個(gè)RpcContext的Map; private static Map<String,Map<String,Object>> ThreadLocalMap=new HashMap<String, Map<String,Object>>(); //服務(wù)端接口-實(shí)現(xiàn)類的映射表 private final Map<String, Object> handlerMap; KryoSerialization kryo=new KryoSerialization(); public RpcRequestHandler(Map<String, Object> handlerMap) { this.handlerMap = handlerMap; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("active"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("disconnected"); } //更新RpcContext的類容 private void UpdateRpcContext(String host,Map<String,Object> map) { if(ThreadLocalMap.containsKey(host)) { Map<String,Object> local=ThreadLocalMap.get(host); local.putAll(map);//把客戶端的加進(jìn)來 ThreadLocalMap.put(host, local);//放回去 for(Map.Entry<String, Object> entry:map.entrySet()){ //更新變量 RpcContext.addProp(entry.getKey(), entry.getValue()); } } else { ThreadLocalMap.put(host, map); //把對(duì)應(yīng)線程的Context更新 for(Map.Entry<String, Object> entry:map.entrySet()){ RpcContext.addProp(entry.getKey(), entry.getValue()); } } } //用來緩存住需要序列化的結(jié)果 private static Object cacheName=null; private static Object cacheVaule=null; @Override public void channelRead( ChannelHandlerContext ctx, Object msg) throws Exception { RpcRequest request=(RpcRequest)msg; String host=ctx.channel().remoteAddress().toString(); //更新上下文 UpdateRpcContext(host,request.getContext()); //TODO 獲取接口名 函數(shù)名 參數(shù) 找到實(shí)現(xiàn)類 反射實(shí)現(xiàn) RpcResponse response = new RpcResponse(); response.setRequestId(request.getRequestId()); try { Object result = handle(request); if(cacheName!=null&&cacheName.equals(result)) { response.setAppResponse(cacheVaule); } else { response.setAppResponse(ByteObjConverter.ObjectToByte(result)); cacheName=result; cacheVaule=ByteObjConverter.ObjectToByte(result); } } catch (Throwable t) { //response.setErrorMsg(t); response.setExption(Tool.serialize(t)); response.setClazz(t.getClass()); } ctx.writeAndFlush(response); } /** * 運(yùn)行調(diào)用的函數(shù)返回結(jié)果 * @param request * @return * @throws Throwable */ private static RpcRequest methodCacheName=null; private static Object methodCacheValue=null; private Object handle(RpcRequest request) throws Throwable { String className = request.getClassName(); Object classimpl = handlerMap.get(className);//通過類名找到實(shí)現(xiàn)的類 Class<?> clazz = classimpl.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); // Method method = ReflectionCache.getMethod(clazz.getName(),methodName, parameterTypes); // method.setAccessible(true); //System.out.println(className+":"+methodName+":"+parameters.length); if(methodCacheName!=null&&methodCacheName.equals(request)) { return methodCacheValue; } else { try { methodCacheName=request; if(methodMap.containsKey(methodName)) { methodCacheValue= methodMap.get(methodName).invoke(classimpl, parameters); return methodCacheValue; } else { FastClass serviceFastClass = FastClass.create(clazz); FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); methodMap.put(methodName, serviceFastMethod); methodCacheValue= serviceFastMethod.invoke(classimpl, parameters); return methodCacheValue; } //return method.invoke(classimpl, parameters); } catch (Throwable e) { throw e.getCause(); } } } private Map<String,FastMethod> methodMap=new HashMap<String, FastMethod>(); @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //ctx.close(); //cause.printStackTrace(); ctx.close(); } }

handel函數(shù)通過Java的反射機(jī)制琼腔,找到要調(diào)用的接口類然后調(diào)用對(duì)應(yīng)函數(shù)然后執(zhí)行瑰枫,然后返回結(jié)果到客戶端,本次RPC調(diào)用結(jié)束丹莲。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末光坝,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子甥材,更是在濱河造成了極大的恐慌盯另,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件洲赵,死亡現(xiàn)場(chǎng)離奇詭異鸳惯,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)叠萍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門芝发,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人俭令,你說我怎么就攤上這事后德。” “怎么了抄腔?”我有些...
    開封第一講書人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵瓢湃,是天一觀的道長。 經(jīng)常有香客問我赫蛇,道長绵患,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任悟耘,我火速辦了婚禮落蝙,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘暂幼。我一直安慰自己筏勒,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開白布旺嬉。 她就那樣靜靜地躺著管行,像睡著了一般。 火紅的嫁衣襯著肌膚如雪邪媳。 梳的紋絲不亂的頭發(fā)上捐顷,一...
    開封第一講書人閱讀 49,784評(píng)論 1 290
  • 那天荡陷,我揣著相機(jī)與錄音,去河邊找鬼迅涮。 笑死废赞,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的叮姑。 我是一名探鬼主播唉地,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼戏溺!你這毒婦竟也來了渣蜗?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤旷祸,失蹤者是張志新(化名)和其女友劉穎耕拷,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體托享,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡骚烧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了闰围。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片赃绊。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖羡榴,靈堂內(nèi)的尸體忽然破棺而出碧查,到底是詐尸還是另有隱情,我是刑警寧澤校仑,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布忠售,位于F島的核電站,受9級(jí)特大地震影響迄沫,放射性物質(zhì)發(fā)生泄漏稻扬。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一羊瘩、第九天 我趴在偏房一處隱蔽的房頂上張望泰佳。 院中可真熱鬧,春花似錦尘吗、人聲如沸逝她。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽黔宛。三九已至,卻和暖如春侧戴,著一層夾襖步出監(jiān)牢的瞬間宁昭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來泰國打工酗宋, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留积仗,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓蜕猫,卻偏偏與公主長得像寂曹,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子回右,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理隆圆,服務(wù)發(fā)現(xiàn),斷路器翔烁,智...
    卡卡羅2017閱讀 134,629評(píng)論 18 139
  • RPC框架遠(yuǎn)程調(diào)用的實(shí)現(xiàn)方式在原理上是比較簡單的渺氧,即將調(diào)用的方法(接口名、方法名蹬屹、參數(shù)類型侣背、參數(shù))序列化之后發(fā)送到...
    謎碌小孩閱讀 3,093評(píng)論 0 13
  • 1、后海散步清理慨默,出汗排毒贩耐,持續(xù)和身體鏈接,提升補(bǔ)充身體能量 2厦取、感謝自己每天堅(jiān)持認(rèn)真地活在功課中潮太,把自己鉚釘在線...
    張艾雯閱讀 245評(píng)論 0 0
  • 越來越像空心的稻草人铡买,身體和靈魂同時(shí)被抽離,越來越麻木台谢,冷淡龜縮寻狂,難道這就是所謂的成長?每每想到這,腦海中便驚現(xiàn)一...
    icexu閱讀 177評(píng)論 0 0
  • 前些天纠亚,看了一部電影,然后問了身邊朋友這樣一個(gè)問題:假設(shè)你對(duì)象出軌了筋夏,你更容易接受他/她的出軌對(duì)象是男生還是女生蒂胞?...
    菠菜猩球閱讀 169評(píng)論 0 1