前言
本文只是簡單實(shí)現(xiàn)了一次RPC調(diào)用示例槽卫,以理解其調(diào)用原理叠洗。一些主流RPC框架的其他功能并沒有實(shí)現(xiàn)。(如服務(wù)自動(dòng)注冊與發(fā)現(xiàn)拦键,流控,動(dòng)態(tài)配置等)
PRC調(diào)用核心
像調(diào)用本地代碼一樣調(diào)用遠(yuǎn)程服務(wù)檩淋。
調(diào)用方只需調(diào)用服務(wù)方所提供的接口芬为,通過Java動(dòng)態(tài)代理,代理方法內(nèi)狼钮,與服務(wù)方進(jìn)行網(wǎng)絡(luò)交互碳柱,得到服務(wù)方返回結(jié)果捡絮“疚撸基于上述,調(diào)用方只需依賴服務(wù)方所提供的接口福稳。在使用時(shí)的感覺就像是涎拉,調(diào)用了本地代碼一樣。其實(shí)是用代理模式屏蔽了底層的網(wǎng)絡(luò)交互的圆。
簡單的RPC調(diào)用所涉及的底層技術(shù)
Java動(dòng)態(tài)代理
Java反射
Java序列化
NIO網(wǎng)絡(luò)模型(Netty)
Netty是什么
- Netty 是 JBoss 公司用 Java 寫的一個(gè) Jar 包(庫)鼓拧,目的是快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序
- Netty 提供異步越妈、無阻塞季俩、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具
- Netty 是目前公認(rèn)的網(wǎng)絡(luò)編程最好的框架,官網(wǎng)地址:http://netty.io/
- GitHub 托管地址:https://github.com/netty/netty
- Netty 底層封裝的也是 Java 的NIO梅掠,所以也叫NIO框架酌住,常用于開發(fā)分布式系統(tǒng)
推薦該博主的關(guān)于netty的文章:
https://blog.csdn.net/wangmx1993328/article/details/83035760
動(dòng)手實(shí)現(xiàn)
調(diào)用時(shí)序圖
我們先來看一次PRC調(diào)用的時(shí)序圖:
- Client:服務(wù)調(diào)用方
- Proxy : 調(diào)用方動(dòng)態(tài)代理組件
- Netty_C:調(diào)用方Netty客戶端
- 注冊中心:服務(wù)自動(dòng)注冊與發(fā)現(xiàn)店归,可以是ZooKeeper
- Netty_S : 提供方Netty服務(wù)端
- Server: 服務(wù)提供方
注:上圖紅色虛線框中的功能,本示例沒有涵蓋酪我。本示例通過API方式配置消痛,直連服務(wù)節(jié)點(diǎn)。
類圖
- MyRpcServiceContainer : 調(diào)用方入口都哭,主要用于獲取代理服務(wù)秩伞,存儲服務(wù)節(jié)點(diǎn)的信息。
- MyRpcServiceGroup : 服務(wù)節(jié)點(diǎn)的集合欺矫,并提供負(fù)載均衡策略(未實(shí)現(xiàn))纱新。
- MyRpcServiceNode : 單個(gè)服務(wù)節(jié)點(diǎn)的信息。
- MyRpcClientProxy : 調(diào)用方動(dòng)態(tài)代理實(shí)現(xiàn)類穆趴。
- MyRpcClient : Netty客戶端怒炸,對連接信息進(jìn)行配置,如序列化反序列化Handler和異步處理返回結(jié)果的Handler毡代。
- MyRpcClientHandler : Netty客戶端異步處理的Handler阅羹。主要用于發(fā)送請求信息等。
- MyRpcRequest : 請求對象封裝教寂,包含請求接口捏鱼,請求方法,請求參數(shù)等酪耕。
- MyRpcResponse : 請求結(jié)果封裝导梆,包含方法返回結(jié)果。
- MyRpcServer : 提供方入口迂烁,主要用于暴露服務(wù)看尼。
- MyRpcServerConfig : 提供方服務(wù)的集合,以及一些配置信息盟步。
- MyRpcServiceImplProxy : 提供方服務(wù)代理藏斩,代理服務(wù)的方法具體實(shí)現(xiàn),并提供流控等功能却盘。
- MyRpcFlowControl : 流控計(jì)數(shù)器狰域,針對接口、方法維度黄橘,提供流控計(jì)數(shù)功能兆览。
- MyRpcServerHandler : Netty服務(wù)端異步處理的Handler。主要用于發(fā)送執(zhí)行結(jié)果等塞关。
代碼講解
TODO
測試
服務(wù)提供方
public class MyRpcServerTest {
@Test
public void testName() {
MyRpcServer myRpcServer = new MyRpcServer();
MyRpcServerConfig config = new MyRpcServerConfig();
config.setPort(8888);
//注冊服務(wù)抬探,此處為api方式
MyRpcServiceImplProxy implProxy = new MyRpcServiceImplProxy(DemoRpcService.class);
config.addService(DemoPrcInterface.class, implProxy);
myRpcServer.config(config).start();
}
}
十一月 13, 2019 5:24:09 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xea1be79c] REGISTERED
十一月 13, 2019 5:24:09 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xea1be79c] BIND: 0.0.0.0/0.0.0.0:8888
十一月 13, 2019 5:24:09 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] ACTIVE
main,服務(wù)器開始監(jiān)聽端口,等待客戶端連接.........
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xac188c6d, L:/127.0.0.1:8888 - R:/127.0.0.1:61210]
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x46558bc0, L:/127.0.0.1:8888 - R:/127.0.0.1:61208]
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x7b6f1d71, L:/127.0.0.1:8888 - R:/127.0.0.1:61209]
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
服務(wù)調(diào)用方
public class MyRpcServiceContainerTest {
private CountDownLatch countDownLatch = new CountDownLatch(3);
@Test
public void testName() throws InterruptedException {
Map<Class, MyRpcServiceGroup> serviceInfoMap = new HashMap<Class, MyRpcServiceGroup>();
MyRpcServiceGroup serviceInfo = new MyRpcServiceGroup();
serviceInfo.addNode(new MyRpcServiceNode("127.0.0.1",8888));
serviceInfoMap.put(DemoPrcInterface.class, serviceInfo);
MyRpcServiceContainer container = new MyRpcServiceContainer(serviceInfoMap);
//上面為服務(wù)發(fā)現(xiàn),此處為api方式
final DemoPrcInterface intf = container.getService(DemoPrcInterface.class);
//System.out.println(intf.helloWithName("zxm"));
final DemoReq req = new DemoReq();
req.setUuid("123456");
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
public void run() {
try {
System.out.println(intf.callRemoteService(req,"yyxl").getUuid());
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
}
}).start();
}
countDownLatch.await();
}
}
Thread-2,客戶端發(fā)起異步連接..........
Thread-1,客戶端發(fā)起異步連接..........
Thread-0,客戶端發(fā)起異步連接..........
nioEventLoopGroup-3-1,Server return Message:com.yyxl.myrpc.service.dto.MyRpcResponse@6585879b
nioEventLoopGroup-4-1,Server return Message:com.yyxl.myrpc.service.dto.MyRpcResponse@74997d68
java.lang.reflect.UndeclaredThrowableException
at com.sun.proxy.$Proxy3.callRemoteService(Unknown Source)
at com.yyxl.myrpc.service.consumer.MyRpcServiceContainerTest$1.run(MyRpcServiceContainerTest.java:32)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: 流控
at com.yyxl.myrpc.service.provider.MyRpcServiceImplProxy.call(MyRpcServiceImplProxy.java:57)
at com.yyxl.myrpc.service.provider.MyRpcServerHandler.channelRead(MyRpcServerHandler.java:26)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
java.lang.reflect.UndeclaredThrowableException
at com.sun.proxy.$Proxy3.callRemoteService(Unknown Source)
at com.yyxl.myrpc.service.consumer.MyRpcServiceContainerTest$1.run(MyRpcServiceContainerTest.java:32)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: 流控
at com.yyxl.myrpc.service.provider.MyRpcServiceImplProxy.call(MyRpcServiceImplProxy.java:57)
at com.yyxl.myrpc.service.provider.MyRpcServerHandler.channelRead(MyRpcServerHandler.java:26)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
nioEventLoopGroup-2-1,Server return Message:com.yyxl.myrpc.service.dto.MyRpcResponse@686c50af
123456##yyxl
我們可以看到帆赢,默認(rèn)流控閥值為1小压,前2個(gè)請求直接返回流控異常砰左,第3個(gè)返回正常調(diào)用結(jié)果。
參考
徹底理解Netty场航,這一篇文章就夠了
Netty 入門示例詳解
Netty之傳輸POJO(使用Java自帶的序列化方式)
動(dòng)態(tài)代理
【Java 筆記】Java 反射相關(guān)整理