RPC框架實(shí)現(xiàn)之路01-極簡(jiǎn)的RPC框架實(shí)現(xiàn)

Version 01

動(dòng)態(tài)代理通過(guò)jdk動(dòng)態(tài)代理方式實(shí)現(xiàn)通殃,網(wǎng)絡(luò)通信通過(guò)Socket實(shí)現(xiàn)编曼,服務(wù)注冊(cè)和獲取通過(guò)HashMap實(shí)現(xiàn),對(duì)象的序列化通過(guò)jdk 序列化方式實(shí)現(xiàn)松却。
此版本的代碼在:Version 01代碼
RPC框架會(huì)逐步進(jìn)行完善旭寿,目前是簡(jiǎn)單的第一版本警绩。

Module:rpc-framework-common

用于搭載請(qǐng)求信息和失敗信息。

dto包

  • RpcRequest

    此模塊主要編寫rpcClient和rpcServer之間網(wǎng)絡(luò)傳輸時(shí)候的媒介:RpcRequest

    其作為接口名盅称、方法名房蝉、參數(shù)僚匆、參數(shù)類型的載體。

    因?yàn)橐ㄟ^(guò)網(wǎng)絡(luò)進(jìn)行傳輸搭幻,所以需要實(shí)現(xiàn)序列化接口咧擂。

    這里使用lombok注解了@Builder和@Data

  • RpcResponse

    用來(lái)做響應(yīng)的RpcResponse,主要搭載RpcServer向RpcClient返回的信息檀蹋,包括狀態(tài)碼松申,狀態(tài)信息以及返回的數(shù)據(jù)。

    提供兩個(gè)狀態(tài)的設(shè)置數(shù)據(jù)的方法俯逾,success傳參為返回的數(shù)據(jù)贸桶,因?yàn)闋顟B(tài)碼和狀態(tài)信息均為成功。

    fail需要傳入枚舉類RpcResponseCode桌肴,因?yàn)槭〉脑蚴怯卸喾N的皇筛,需要由上游決定。

enumeration包

枚舉類型

  • RpcResponseCode

    枚舉類型坠七。為響應(yīng)狀態(tài)碼水醋,與RpcResponse協(xié)同工作

  • RpcErrorMessageEnum

    枚舉類型。為異常信息彪置,與RpcException協(xié)同工作

exception包

  • RpcException

    自定義異常拄踪,使用到了RpcErrorMessageEnum

Module:rpc-framework-simple

此模塊主要編寫RPC客戶端和RPC服務(wù)端

registry包

  • ServiceRegistry

    定義接口,其功能為服務(wù)的獲取以及服務(wù)的注冊(cè)拳魁。

    注冊(cè)服務(wù)需要傳入指定的服務(wù)惶桐;

    獲取服務(wù)需要傳入服務(wù)名。

  • DefaultServiceRegistry

    實(shí)現(xiàn)了上述接口

    在注冊(cè)服務(wù)中潘懊,服務(wù)名為接口名姚糊,比如com.wyb.neu.HelloServiceImpl,并且保存需要注冊(cè)的服務(wù)是通過(guò)HashMap來(lái)保存的授舟,HashMap的key為接口名救恨,value為實(shí)現(xiàn)的類。

    添加服務(wù)的過(guò)程會(huì)將當(dāng)前服務(wù)實(shí)現(xiàn)的所有接口都添加進(jìn)去岂却。

     @Override
     public <T> void register(T service) {
     //獲取服務(wù)名
     String serviceName = service.getClass().getCanonicalName();
     //是否已經(jīng)注冊(cè)過(guò)
     if (registeredService.contains(serviceName)) {
     return;
     }
     //先添加到注冊(cè)過(guò)的set集合中
     registeredService.add(serviceName);
     //獲取接口集合
     Class[] interfaces = service.getClass().getInterfaces();
     //如果接口集合是0
     if (interfaces.length == 0) {
     throw new RpcException(RpcErrorMessageEnum.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
     }
     //將所有接口都添加進(jìn)去
     for (Class i : interfaces) {
     serviceMap.put(i.getCanonicalName(), service);
     }
     logger.info("Add service: {} and interfaces:{}", serviceName, service.getClass().getInterfaces());
     }```
    
    那么是如何獲取服務(wù)的呢?
    
    傳入服務(wù)名裙椭,判斷HashMap里面有沒(méi)有就可以了躏哩。
    
     ```java
     @Override
     public Object getService(String serviceName) {
     Object service = serviceMap.get(serviceName);
     if(null == service)
     throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND);
     return service;
     }
    

remoting包

  • RpcClient

    使用jdk seriable以及Socket完成RpcRequest的發(fā)送,并且通過(guò)Socket以及序列化再接收返回信息揉燃,進(jìn)行判斷扫尺。

    主要代碼:

  public Object sendRpcRequest(RpcRequest rpcRequest, String host, int port)
     {
     //創(chuàng)建socket對(duì)象,根據(jù)指定的主機(jī)和端口
     try(Socket socket = new Socket(host,port)){
     ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
     objectOutputStream.writeObject(rpcRequest);
     ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
    //            return objectInputStream.readObject();
    //            Server端的信息都封裝到了RpcServer中炊汤,后續(xù)操作是獲取RpcResponse正驻,然后判斷是否正確執(zhí)行
     RpcResponse rpcResponse = (RpcResponse)objectInputStream.readObject();
     if(rpcResponse == null)
     {
     logger.error("Invoke Service Fail , Service Name is : {}",rpcRequest.getInterfaceName());
     }
     if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCode.SUCCESS.getCode())) {
     logger.error("調(diào)用服務(wù)失敗,serviceName:{},RpcResponse:{}", rpcRequest.getInterfaceName(), rpcResponse);
     throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, "interfaceName:" + rpcRequest.getInterfaceName());
     }
     return rpcResponse.getData();
     }catch (IOException | ClassNotFoundException e)
     {
     logger.error("occur exception ", e);
     }
     return null;
     }
  • RpcClientProxy

    使用jdk動(dòng)態(tài)代理弊攘,jdkProxy

    主要代碼:

 public <T>T getProxy(Class<T> clazz){
  // 參數(shù)1:被代理類的類加載器 參數(shù)2:被代理類的接口
  return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
 }
 @Override
 public Object invoke(Object proxy, Method method, Object[] args)  {
  logger.info("Call \"invoke method\" and invoked method: {}", method.getName());
  RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
  .parameters(args)
  .interfaceName(method.getDeclaringClass().getName())
  .paramTypes(method.getParameterTypes())
  .build();
  RpcClient rpcClient = new RpcClient();
  return rpcClient.sendRpcRequest(rpcRequest,host,port);
 }
  • RpcServer

    主要使用到ExecutorService線程池、Socket

    主要功能為注冊(cè)服務(wù)姑曙,并且不斷監(jiān)聽(tīng)客戶端襟交,如果收到客戶端的請(qǐng)求,將請(qǐng)求執(zhí)行的任務(wù)加入到線程池中執(zhí)行伤靠。

    主要的實(shí)現(xiàn)邏輯:

  public void start(int port) {

   try (ServerSocket server = new ServerSocket(port);) {
   logger.info("server starts...");
   Socket socket;
   while ((socket = server.accept()) != null) {
   logger.info("client connected");
   threadPool.execute(new RpcRequestHandlerRunnable(socket, rpcRequestHandler, serviceRegistry));
   }
   threadPool.shutdown();
   } catch (IOException e) {
   logger.error("occur IOException:", e);
   }
  }
  • RpcRequestHandlerRunnable

    實(shí)現(xiàn)了Runnable接口捣域,主要編寫線程執(zhí)行的主體run方法,其里面完成服務(wù)的獲取宴合,并調(diào)用RpcRequestHandler完成方法的執(zhí)行焕梅,并將結(jié)果寫回

    @Override
    public void run() {
    try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
    ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
    RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();//獲取Client傳入的RpcRequest
    String interfaceName = rpcRequest.getInterfaceName();//獲取要執(zhí)行的接口名
    Object service = serviceRegistry.getService(interfaceName);//根據(jù)接口名去注冊(cè)中心查找Service
    Object result = rpcRequestHandler.handle(rpcRequest, service);//交到RpcRequestHandler中執(zhí)行
    objectOutputStream.writeObject(RpcResponse.success(result));
    objectOutputStream.flush();
    } catch (IOException | ClassNotFoundException e) {
    logger.error("occur exception:", e);
    }
    }
    
  • RpcRequestHandler

    利用反射,完成方法的調(diào)用卦洽,所執(zhí)行的方法再rpcRequest中保存著贞言,service的話,在線程里面已經(jīng)完成了獲取

     public Object handle(RpcRequest rpcRequest, Object service) {
     Object result = null;
     try {
     result = invokeTargetMethod(rpcRequest, service);
     logger.info("service:{} successful invoke method:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
     } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
     logger.error("occur exception", e);
     }
     return result;
     }
    
     private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
     Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
     if (null == method) {
     return RpcResponse.fail(RpcResponseCode.NOT_FOUND_METHOD);
     }
     return method.invoke(service, rpcRequest.getParameters());
     }
    

Module:hello-service-api

HelloService

接口阀蒂,服務(wù)器提供的方法接口该窗,服務(wù)器提供的服務(wù)

public interface HelloService {
 String hello(Hello hello);
}

Hello

實(shí)現(xiàn)了序列化,因?yàn)檫@個(gè)類也需要進(jìn)行網(wǎng)絡(luò)傳輸脂新,是HelloService的參數(shù)

public class Hello implements Serializable {
    private String message;
    private String description;
    //get & set

    public Hello(String message, String description) {
        this.message = message;
        this.description = description;
    }
}

example-server

HelloServiceImpl

實(shí)現(xiàn)了HelloService中的Hello方法挪捕,服務(wù)器中的實(shí)現(xiàn)類

public class HelloServiceImpl implements HelloService{
    private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);

    public String sayHello(Hello hello) {
        logger.info("HelloServiceImpl Get: {}",hello.getMessage());
        String results = "Hello description is "+hello.getDescription();
        logger.info("HelloServiceImpl Return: {}",results);
        return results;
    }
}

RpcFrameworkSimpleServerMain

啟動(dòng)服務(wù)器,注冊(cè)服務(wù)争便,傳入需要注冊(cè)的服務(wù)以及端口號(hào)

public class RpcFrameworkSimpleMain {
    public static void main(String[] args) {
        HelloServiceImpl helloService = new HelloServiceImpl();
        DefaultServiceRegistry defaultServiceRegistry = new DefaultServiceRegistry();
        // 手動(dòng)注冊(cè)
        defaultServiceRegistry.register(helloService);
        RpcServer rpcServer = new RpcServer(defaultServiceRegistry);
        rpcServer.start(9999);
    }
}

example-client

RpcFrameworkSimpleClientMain

啟動(dòng)客戶端级零,獲取服務(wù)

public class RpcFrameworkSimpleClientMain {
    public static void main(String[] args) {
        RpcClientProxy rpcClientProxy = new RpcClientProxy("127.0.0.1",9999);
        HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
        String hello = helloService.sayHello(new Hello("Hello","Hello World"));
        System.out.println(hello);
    }
}

運(yùn)行實(shí)例

1:?jiǎn)?dòng)Server

Start Server

2:?jiǎn)?dòng)Client

此時(shí)的Client:

Start Client

此時(shí)的Server:

Server when Start Client
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市滞乙,隨后出現(xiàn)的幾起案子奏纪,更是在濱河造成了極大的恐慌,老刑警劉巖斩启,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件序调,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡兔簇,警方通過(guò)查閱死者的電腦和手機(jī)发绢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)垄琐,“玉大人边酒,你說(shuō)我怎么就攤上這事±昃剑” “怎么了墩朦?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)翻擒。 經(jīng)常有香客問(wèn)我氓涣,道長(zhǎng)牛哺,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任劳吠,我火速辦了婚禮引润,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘赴背。我一直安慰自己椰拒,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布凰荚。 她就那樣靜靜地躺著燃观,像睡著了一般。 火紅的嫁衣襯著肌膚如雪便瑟。 梳的紋絲不亂的頭發(fā)上缆毁,一...
    開(kāi)封第一講書(shū)人閱讀 49,784評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音到涂,去河邊找鬼脊框。 笑死,一個(gè)胖子當(dāng)著我的面吹牛践啄,可吹牛的內(nèi)容都是我干的浇雹。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼屿讽,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼昭灵!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起伐谈,我...
    開(kāi)封第一講書(shū)人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤烂完,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后诵棵,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體抠蚣,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年履澳,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嘶窄。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡距贷,死狀恐怖柄冲,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情储耐,我是刑警寧澤羊初,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布滨溉,位于F島的核電站什湘,受9級(jí)特大地震影響长赞,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜闽撤,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一得哆、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧哟旗,春花似錦贩据、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至舍沙,卻和暖如春近上,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背拂铡。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工壹无, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人感帅。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓斗锭,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親失球。 傳聞我的和親對(duì)象是個(gè)殘疾皇子岖是,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容