Version 01
動(dòng)態(tài)代理通過(guò)jdk動(dòng)態(tài)代理方式實(shí)現(xiàn)通殃,網(wǎng)絡(luò)通信通過(guò)Socket實(shí)現(xiàn)编曼,服務(wù)注冊(cè)和獲取通過(guò)HashMap實(shí)現(xiàn),對(duì)象的序列化通過(guò)jdk 序列化方式實(shí)現(xiàn)松却。
此版本的代碼在:Version 01代碼
RPC框架會(huì)逐步進(jìn)行完善旭寿,目前是簡(jiǎn)單的第一版本警绩。
Module:rpc-framework-common
用于搭載請(qǐng)求信息和失敗信息。
dto包
-
RpcRequest
此模塊主要編寫rpcClient和rpcServer之間網(wǎng)絡(luò)傳輸時(shí)候的媒介:RpcRequest
其作為接口名盅称、方法名房蝉、參數(shù)僚匆、參數(shù)類型的載體。
因?yàn)橐ㄟ^(guò)網(wǎng)絡(luò)進(jìn)行傳輸搭幻,所以需要實(shí)現(xiàn)序列化接口咧擂。
這里使用lombok注解了@Builder和@Data
-
RpcResponse
用來(lái)做響應(yīng)的RpcResponse,主要搭載RpcServer向RpcClient返回的信息檀蹋,包括狀態(tài)碼松申,狀態(tài)信息以及返回的數(shù)據(jù)。
提供兩個(gè)狀態(tài)的設(shè)置數(shù)據(jù)的方法俯逾,success傳參為返回的數(shù)據(jù)贸桶,因?yàn)闋顟B(tài)碼和狀態(tài)信息均為成功。
fail需要傳入枚舉類RpcResponseCode桌肴,因?yàn)槭〉脑蚴怯卸喾N的皇筛,需要由上游決定。
enumeration包
枚舉類型
-
RpcResponseCode
枚舉類型坠七。為響應(yīng)狀態(tài)碼水醋,與RpcResponse協(xié)同工作
-
RpcErrorMessageEnum
枚舉類型。為異常信息彪置,與RpcException協(xié)同工作
exception包
-
RpcException
自定義異常拄踪,使用到了RpcErrorMessageEnum
Module:rpc-framework-simple
此模塊主要編寫RPC客戶端和RPC服務(wù)端
registry包
-
ServiceRegistry
定義接口,其功能為服務(wù)的獲取以及服務(wù)的注冊(cè)拳魁。
注冊(cè)服務(wù)需要傳入指定的服務(wù)惶桐;
獲取服務(wù)需要傳入服務(wù)名。
-
DefaultServiceRegistry
實(shí)現(xiàn)了上述接口
在注冊(cè)服務(wù)中潘懊,服務(wù)名為接口名姚糊,比如
com.wyb.neu.HelloServiceImpl
,并且保存需要注冊(cè)的服務(wù)是通過(guò)HashMap來(lái)保存的授舟,HashMap的key為接口名救恨,value為實(shí)現(xiàn)的類。添加服務(wù)的過(guò)程會(huì)將當(dāng)前服務(wù)實(shí)現(xiàn)的所有接口都添加進(jìn)去岂却。
@Override public <T> void register(T service) { //獲取服務(wù)名 String serviceName = service.getClass().getCanonicalName(); //是否已經(jīng)注冊(cè)過(guò) if (registeredService.contains(serviceName)) { return; } //先添加到注冊(cè)過(guò)的set集合中 registeredService.add(serviceName); //獲取接口集合 Class[] interfaces = service.getClass().getInterfaces(); //如果接口集合是0 if (interfaces.length == 0) { throw new RpcException(RpcErrorMessageEnum.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE); } //將所有接口都添加進(jìn)去 for (Class i : interfaces) { serviceMap.put(i.getCanonicalName(), service); } logger.info("Add service: {} and interfaces:{}", serviceName, service.getClass().getInterfaces()); }``` 那么是如何獲取服務(wù)的呢? 傳入服務(wù)名裙椭,判斷HashMap里面有沒(méi)有就可以了躏哩。 ```java @Override public Object getService(String serviceName) { Object service = serviceMap.get(serviceName); if(null == service) throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND); return service; }
remoting包
-
RpcClient
使用jdk seriable以及Socket完成RpcRequest的發(fā)送,并且通過(guò)Socket以及序列化再接收返回信息揉燃,進(jìn)行判斷扫尺。
主要代碼:
public Object sendRpcRequest(RpcRequest rpcRequest, String host, int port)
{
//創(chuàng)建socket對(duì)象,根據(jù)指定的主機(jī)和端口
try(Socket socket = new Socket(host,port)){
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(rpcRequest);
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
// return objectInputStream.readObject();
// Server端的信息都封裝到了RpcServer中炊汤,后續(xù)操作是獲取RpcResponse正驻,然后判斷是否正確執(zhí)行
RpcResponse rpcResponse = (RpcResponse)objectInputStream.readObject();
if(rpcResponse == null)
{
logger.error("Invoke Service Fail , Service Name is : {}",rpcRequest.getInterfaceName());
}
if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCode.SUCCESS.getCode())) {
logger.error("調(diào)用服務(wù)失敗,serviceName:{},RpcResponse:{}", rpcRequest.getInterfaceName(), rpcResponse);
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, "interfaceName:" + rpcRequest.getInterfaceName());
}
return rpcResponse.getData();
}catch (IOException | ClassNotFoundException e)
{
logger.error("occur exception ", e);
}
return null;
}
-
RpcClientProxy
使用jdk動(dòng)態(tài)代理弊攘,jdkProxy
主要代碼:
public <T>T getProxy(Class<T> clazz){
// 參數(shù)1:被代理類的類加載器 參數(shù)2:被代理類的接口
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
logger.info("Call \"invoke method\" and invoked method: {}", method.getName());
RpcRequest rpcRequest = RpcRequest.builder().methodName(method.getName())
.parameters(args)
.interfaceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.build();
RpcClient rpcClient = new RpcClient();
return rpcClient.sendRpcRequest(rpcRequest,host,port);
}
-
RpcServer
主要使用到ExecutorService線程池、Socket
主要功能為注冊(cè)服務(wù)姑曙,并且不斷監(jiān)聽(tīng)客戶端襟交,如果收到客戶端的請(qǐng)求,將請(qǐng)求執(zhí)行的任務(wù)加入到線程池中執(zhí)行伤靠。
主要的實(shí)現(xiàn)邏輯:
public void start(int port) {
try (ServerSocket server = new ServerSocket(port);) {
logger.info("server starts...");
Socket socket;
while ((socket = server.accept()) != null) {
logger.info("client connected");
threadPool.execute(new RpcRequestHandlerRunnable(socket, rpcRequestHandler, serviceRegistry));
}
threadPool.shutdown();
} catch (IOException e) {
logger.error("occur IOException:", e);
}
}
-
RpcRequestHandlerRunnable
實(shí)現(xiàn)了Runnable接口捣域,主要編寫線程執(zhí)行的主體run方法,其里面完成服務(wù)的獲取宴合,并調(diào)用RpcRequestHandler完成方法的執(zhí)行焕梅,并將結(jié)果寫回
@Override public void run() { try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) { RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();//獲取Client傳入的RpcRequest String interfaceName = rpcRequest.getInterfaceName();//獲取要執(zhí)行的接口名 Object service = serviceRegistry.getService(interfaceName);//根據(jù)接口名去注冊(cè)中心查找Service Object result = rpcRequestHandler.handle(rpcRequest, service);//交到RpcRequestHandler中執(zhí)行 objectOutputStream.writeObject(RpcResponse.success(result)); objectOutputStream.flush(); } catch (IOException | ClassNotFoundException e) { logger.error("occur exception:", e); } }
-
RpcRequestHandler
利用反射,完成方法的調(diào)用卦洽,所執(zhí)行的方法再rpcRequest中保存著贞言,service的話,在線程里面已經(jīng)完成了獲取
public Object handle(RpcRequest rpcRequest, Object service) { Object result = null; try { result = invokeTargetMethod(rpcRequest, service); logger.info("service:{} successful invoke method:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName()); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { logger.error("occur exception", e); } return result; } private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes()); if (null == method) { return RpcResponse.fail(RpcResponseCode.NOT_FOUND_METHOD); } return method.invoke(service, rpcRequest.getParameters()); }
Module:hello-service-api
HelloService
接口阀蒂,服務(wù)器提供的方法接口该窗,服務(wù)器提供的服務(wù)
public interface HelloService {
String hello(Hello hello);
}
Hello
實(shí)現(xiàn)了序列化,因?yàn)檫@個(gè)類也需要進(jìn)行網(wǎng)絡(luò)傳輸脂新,是HelloService的參數(shù)
public class Hello implements Serializable {
private String message;
private String description;
//get & set
public Hello(String message, String description) {
this.message = message;
this.description = description;
}
}
example-server
HelloServiceImpl
實(shí)現(xiàn)了HelloService中的Hello方法挪捕,服務(wù)器中的實(shí)現(xiàn)類
public class HelloServiceImpl implements HelloService{
private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl.class);
public String sayHello(Hello hello) {
logger.info("HelloServiceImpl Get: {}",hello.getMessage());
String results = "Hello description is "+hello.getDescription();
logger.info("HelloServiceImpl Return: {}",results);
return results;
}
}
RpcFrameworkSimpleServerMain
啟動(dòng)服務(wù)器,注冊(cè)服務(wù)争便,傳入需要注冊(cè)的服務(wù)以及端口號(hào)
public class RpcFrameworkSimpleMain {
public static void main(String[] args) {
HelloServiceImpl helloService = new HelloServiceImpl();
DefaultServiceRegistry defaultServiceRegistry = new DefaultServiceRegistry();
// 手動(dòng)注冊(cè)
defaultServiceRegistry.register(helloService);
RpcServer rpcServer = new RpcServer(defaultServiceRegistry);
rpcServer.start(9999);
}
}
example-client
RpcFrameworkSimpleClientMain
啟動(dòng)客戶端级零,獲取服務(wù)
public class RpcFrameworkSimpleClientMain {
public static void main(String[] args) {
RpcClientProxy rpcClientProxy = new RpcClientProxy("127.0.0.1",9999);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.sayHello(new Hello("Hello","Hello World"));
System.out.println(hello);
}
}
運(yùn)行實(shí)例
1:?jiǎn)?dòng)Server
2:?jiǎn)?dòng)Client
此時(shí)的Client:
此時(shí)的Server: