自定義RPC框架
分布式架構(gòu)網(wǎng)絡(luò)通信
分布式的基礎(chǔ)問題是遠程服務(wù)是怎么通訊的查吊。
java
領(lǐng)域有很多可實現(xiàn)遠程通訊的技術(shù)逻卖,例如:RMI
、Hessian
戈鲁、SOAP
、ESB
和 JMS
等婆芦。
遠程通訊技術(shù)
RMI
JDK的RMI文檔:https://docs.oracle.com/javase/8/docs/technotes/guides/rmi/
Hessian
Hessian官網(wǎng):http://hessian.caucho.com/
SOAP
WSDL WS-* are language-agnostic.
JAX-WS are Java standard to build web service.
Apache CXF and Apache Axis 2 are two implementations of JAX-WS. They also offer JAX-RS implementations so that you can build Restful services.
CXF has better integration with Spring, and Camel(camel-cxf). And Axis 2 seems not have a active release.
ESB
ESB:https://zh.wikipedia.org/wiki/%E4%BC%81%E4%B8%9A%E6%9C%8D%E5%8A%A1%E6%80%BB%E7%BA%BF
Spring Integration:https://spring.io/projects/spring-integration
Mule ESB:https://www.mulesoft.com/resources/esb/what-mule-esb
Apache Camel:https://camel.apache.org/
JMS
JMS:https://www.oracle.com/java/technologies/java-message-service.html
https://spring.io/guides/gs/messaging-jms/
- Spring JMS is the Spring abstraction over the JMS API.
- JMS is just an API; you need a physical broker to actually do messaging.
- ActiveMQ is not a framework, it is an open-source JMS broker that supports the actual persistence and delivery of messages.
- Spring JMS can be used with any JMS broker, including ActiveMQ. Each broker provides its own JMS API client jar.
- RabbitMQ is not a native JMS broker; its native protocol is AMQP 0.9.1; it does, however, provide a JMS API client that can be used with Spring JMS, but Spring AMQP is the preferred library for talking to RabbitMQ because it provides much more functionality than is available over JMS.
- There are lots of examples for using Spring JMS on the internet.
- The simplest way to get started is with Spring Boot and
@JmsListener
.
基本原理
從網(wǎng)絡(luò)通信的底層去看员帮,通信要做的事情就是把流從一臺計算機傳輸?shù)搅硗庖慌_計算機。使用傳輸協(xié)議和網(wǎng)絡(luò)IO實現(xiàn)捞高,傳輸協(xié)議比較知名的如 TCP
氯材、UDP
等渣锦。
TCP
和 UDP
都是在基于 socket
的概念上為某類應(yīng)用場景擴展出來的傳輸層協(xié)議。
網(wǎng)絡(luò) IO
主要由 bio
氢哮、nio
袋毙、aio
冗尤,所有的分布式都是基于這個原理實現(xiàn)的听盖。
什么是RPC
rpc全稱是 remote procedure call
,既遠程過程調(diào)用生闲。借助 RPC
可以做到像本地調(diào)用一樣調(diào)用遠程服務(wù)媳溺,是一種進程間的通信方式。
RPC
不是一個具體的技術(shù)碍讯,而是指整個網(wǎng)絡(luò)調(diào)用的過程悬蔽。
下面展示的是本地調(diào)用和遠程調(diào)用的示例:
例如有A服務(wù)器部署了一個應(yīng)用,B服務(wù)器也部署了一個應(yīng)用捉兴,現(xiàn)在A服務(wù)器上的應(yīng)用想要調(diào)用B服務(wù)器上的應(yīng)用的方法蝎困,由于,兩個應(yīng)用不在同一個服務(wù)器倍啥,因此不在同一個內(nèi)存空間禾乘,無法實現(xiàn)直接調(diào)用,需要通過網(wǎng)絡(luò)來表達調(diào)用的語義和傳達調(diào)用的數(shù)據(jù)虽缕。
<img src="media/16461457884893/16467573463113.jpg" style="zoom:50%;" />
<img src="media/16461457884893/16467573463123.jpg" style="zoom:50%;" />
RPC架構(gòu)
一個完整的RPC架構(gòu)包含四個完整的組件始藕,分表是Client,Client Stub氮趋,Server和Server Stub伍派。Stub可以理解為存根。
- 客戶端(Client)剩胁,服務(wù)的調(diào)用方诉植。
- 客戶端存根(Client Stub),存放服務(wù)端的地址消息昵观,將客戶端的請求打包成網(wǎng)絡(luò)消息晾腔,通過網(wǎng)絡(luò)遠程發(fā)送給服務(wù)方。
- 服務(wù)端(Server)啊犬,真正的服務(wù)端提供者灼擂。
- 服務(wù)端存根(Sever Stub),接收客戶端發(fā)送過來的消息觉至,將消息解包缤至,并調(diào)用本地方法。
<img src="media/16461457884893/16467573463136.jpg" style="zoom:50%;" />
<img src="media/16461457884893/16467573463152.jpg" style="zoom:50%;" />
1、客戶端以本地方式調(diào)用服務(wù)
2领斥、客戶端存根接收到調(diào)用之后嫉到,將方法參數(shù)組裝成能進行網(wǎng)絡(luò)傳輸?shù)南Ⅲw,消息體序列化為二進制
3月洛、客戶端通過socket
將消息發(fā)送到服務(wù)端
4何恶、服務(wù)端存根接收到消息之后進行解碼,將消息對象反序列化
5嚼黔、服務(wù)端存根根據(jù)解碼結(jié)果調(diào)用本地服務(wù)
6细层、服務(wù)處理
7、本地服務(wù)執(zhí)行并將結(jié)果返回給服務(wù)端存根
8唬涧、服務(wù)端存根將返回結(jié)果打包成消息疫赎,將結(jié)果消息對象序列化
9、服務(wù)端通過socket將消息發(fā)送到客戶端
10碎节、客戶端存根接收到消息并進行解碼捧搞,將消息對象反序列化
11、客戶端得到最終結(jié)果
RPC的目標是只保留1狮荔、6胎撇、11,將其他的細節(jié)全部封裝起來殖氏。
注意:不管是什么類型的數(shù)據(jù)晚树,在輸出過程中都要轉(zhuǎn)換成二進制流,而接收方需要將二進制流恢復(fù)為對象雅采。
Java中常見的RPC框架有Hessian爵憎、gRPC、Dubbo等婚瓜,核心模塊都是通訊和序列化
https://github.com/grpc/grpc-java
RMI
Java的RMI指的是 Remote Method Invocation
宝鼓,一種實現(xiàn)遠程過程調(diào)用(RPC)的API,能直接傳輸序列化后的Java對象闰渔。它的實現(xiàn)依賴于JVM席函,因此它能支撐一個JVM到另外一個JVM的調(diào)用铐望。
<img src="media/16461457884893/16467573463168.jpg" style="zoom:50%;" />
1冈涧、客戶端從遠程服務(wù)器的注冊表中查詢并獲取遠程對象的引用。
2正蛙、樁對象與遠程對象有相同的接口和方法列表督弓,當客戶端調(diào)用遠程對象時候,實際上是由樁對象代理完成乒验。
3愚隧、遠程引用層將樁的本地引用轉(zhuǎn)換為服務(wù)器上對象的遠程引用,再將調(diào)用層傳遞給傳輸層锻全,由傳輸層通過TCP協(xié)議發(fā)起調(diào)用狂塘。
4录煤、在服務(wù)端,傳輸層監(jiān)聽入站鏈接荞胡,收到客戶端的遠程調(diào)用之后妈踊,將引用轉(zhuǎn)發(fā)到上層的遠程引用層;
服務(wù)端的遠程引用層將客戶端發(fā)送的遠程引用轉(zhuǎn)換為本地虛擬機的引用泪漂,再將請求傳遞給骨架廊营;
骨架讀取參數(shù),將請求傳遞給服務(wù)器萝勤,由服務(wù)器進行實際的方法調(diào)用露筒。
5、如果遠程方法調(diào)用之后有返回值敌卓,服務(wù)器將結(jié)果沿著 "骨架->遠程引用層->傳輸層" 向下傳遞慎式。
6、客戶端的傳輸層接收到返回值之后假哎,又沿著 "傳輸層->遠程引用層->樁" 向上傳遞瞬捕,并最終將結(jié)果傳遞給客戶端程序。
RMI實例需求分析
1舵抹、服務(wù)端提供根據(jù)ID查詢用戶的方法
2肪虎、客戶端調(diào)用服務(wù)端方法,并返回用戶對象
3惧蛹、要求使用RMI進行遠程通訊
服務(wù)端實現(xiàn)
/**
* RMI服務(wù)端
*
* @name: RMIServer
* @author: terwer
* @date: 2022-03-06 02:01
**/
public class RMIServer {
public static void main(String[] args) {
try {
// 1.注冊Registry實例扇救,綁定端口
Registry registry = LocateRegistry.createRegistry(9998);
// 2.創(chuàng)建遠程對象
IUserService userService = new UserServiceImpl();
// 3.將遠程對象注冊到RMI服務(wù)器(既服務(wù)端注冊表)
registry.rebind("userService", userService);
System.out.println("RMI服務(wù)端啟動成功");
} catch (RemoteException e) {
e.printStackTrace();
}
}
}
客戶端實現(xiàn)
/**
* RMI客戶端
*
* @name: RMIClient
* @author: terwer
* @date: 2022-03-06 19:25
**/
public class RMIClient {
public static void main(String[] args) throws RemoteException, NotBoundException {
// 1、獲取Registry實例
Registry registry = LocateRegistry.getRegistry("127.0.0.1", 9998);
// 2香嗓、通過Registry查找遠程對象
IUserService userService = (IUserService) registry.lookup("userService");
User user = userService.getUserById(1);
System.out.println("userName = " + user.getName());
}
}
效果預(yù)覽
<img src="media/16461457884893/16467573463195.png" alt="image-20220306193226744" style="zoom:50%;" />
<img src="media/16461457884893/16467573463236.png" alt="image-20220306193244530" style="zoom:50%;" />
基于Netty實現(xiàn)RPC框架
Dubbo
底層使用 Netty
作為網(wǎng)絡(luò)通訊框架迅腔,要求使用 Netty
實現(xiàn)一個簡單的 RPC
框架,消費者和提供者約定協(xié)議和接口靠娱,消費者遠程調(diào)用提供者的服務(wù)沧烈。
1、創(chuàng)建一個接口像云,定義抽象方法锌雀,用于消費者和提供者之間的約定。
2迅诬、創(chuàng)建一個提供者腋逆,該類需要監(jiān)聽消費者的請求,并按照約定返回數(shù)據(jù)侈贷。
3惩歉、創(chuàng)建一個消費者,該類需要透明的調(diào)用自己不存在的方法,內(nèi)部需要使用 Netty
進行數(shù)據(jù)通信撑蚌。
4上遥、提供者與消費者傳輸數(shù)據(jù)使用json字符串格式。
5争涌、提供者使用 Netty
集成 Spring Boot
環(huán)境露该。
案例:客戶端調(diào)用服務(wù)端,利用ID查詢User對象的方法
需求分析
<img src="media/16461457884893/16467573463305.png" alt="image-20220306234352362" style="zoom:50%;" />
具體實現(xiàn)
需要分成三個子項目
.
├── custom-rpc-api
├── custom-rpc-consumer
├── custom-rpc-provider
└── pom.xml
主項目
主項目的 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.terewrgreen</groupId>
<artifactId>custom-rpc</artifactId>
<packaging>pom</packaging>
<version>1.0.0</version>
<modules>
<module>custom-rpc-api</module>
<module>custom-rpc-provider</module>
<module>custom-rpc-consumer</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<curator.version>4.3.0</curator.version>
</properties>
<dependencies>
<!--netty依賴 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!--json依賴 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.79</version>
</dependency>
<!--lombok依賴 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
custom-rpc-api
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>custom-rpc</artifactId>
<groupId>com.terewrgreen</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>custom-rpc-api</artifactId>
<name>custom-rpc-api</name>
<url>http://www.terwergreen.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>
</pluginManagement>
</build>
</project>
custom-rpc-consumer
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>custom-rpc</artifactId>
<groupId>com.terewrgreen</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>custom-rpc-consumer</artifactId>
<name>custom-rpc-consumer</name>
<url>http://www.terwergreen.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.terewrgreen</groupId>
<artifactId>custom-rpc-api</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>
</pluginManagement>
</build>
</project>
RpcClient
/**
* Rpc客戶端
* 1第煮、連接netty服務(wù)端
* 2解幼、提供給調(diào)用者關(guān)閉資源的方法
* 3、提供消息發(fā)送的方法
*
* @name: RpcClient
* @author: terwer
* @date: 2022-03-13 21:04
**/
public class RpcClient {
private NioEventLoopGroup group;
private Channel channel;
private String ip;
private int port;
private RpcClientHandler rpcClientHandler = new RpcClientHandler();
private ExecutorService executorService = Executors.newCachedThreadPool();
public RpcClient(String ip, int port) {
this.ip = ip;
this.port = port;
initClient();
}
/**
* 初始化客戶端包警,連接netty服務(wù)端
*/
public void initClient() {
try {
// 創(chuàng)建線程組
group = new NioEventLoopGroup();
// 創(chuàng)建啟動助手
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_TIMEOUT, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// String 編解碼器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 客戶端處理類
pipeline.addLast(rpcClientHandler);
}
});
channel = bootstrap.connect(ip, port).sync().channel();
System.out.println("===========客戶端啟動成功==========");
} catch (Exception e) {
if (channel != null) {
channel.close();
System.out.println("客戶端關(guān)閉channel");
}
if (group != null) {
group.shutdownGracefully();
System.out.println("客戶端關(guān)閉group");
}
e.printStackTrace();
}
}
public void close(){
if (channel != null) {
channel.close();
System.out.println("外部調(diào)用客戶端關(guān)閉channel");
}
if (group != null) {
group.shutdownGracefully();
System.out.println("外部調(diào)用客戶端關(guān)閉group");
}
}
public Object send(String msg) throws ExecutionException, InterruptedException {
rpcClientHandler.setRequestMessage(msg);
Future future = executorService.submit(rpcClientHandler);
return future.get();
}
}
RpcClienthandler
/**
* 客戶端處理類
* 1撵摆、發(fā)送消息
* 2、接收消息
*
* @name: RpcClientHandler
* @author: terwer
* @date: 2022-03-13 23:01
**/
public class RpcClientHandler extends SimpleChannelInboundHandler implements Callable {
private ChannelHandlerContext ctx;
// 消息
private String requestMessage;
private String responseMessage;
public String getRequestMessage() {
return requestMessage;
}
public void setRequestMessage(String requestMessage) {
this.requestMessage = requestMessage;
}
/**
* 通道讀取就緒事件
*
* @param channelHandlerContext
* @param msg
* @throws Exception
*/
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
responseMessage = (String) msg;
// 喚醒等待線程
notify();
}
/**
* 通道就緒事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
}
@Override
public synchronized Object call() throws Exception {
// 消息發(fā)送
ctx.writeAndFlush(requestMessage);
// 線程等待
wait();
return responseMessage;
}
}
RpcClientProxy
/**
* 客戶端代理類害晦,創(chuàng)建代理對象
* 1特铝、封裝request請求對象
* 2、創(chuàng)建RpcClient對象
* 3壹瘟、發(fā)送消息
* 4鲫剿、返回結(jié)果
*
* @name: RpcClientProxy
* @author: terwer
* @date: 2022-03-13 23:45
**/
public class RpcClientProxy {
public static Object createProxy(Class serviceClass) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1、封裝request請求對象
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
// 2稻轨、創(chuàng)建RpcClient對象
RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);
try {
// 3灵莲、發(fā)送消息
Object responseMessage = rpcClient.send(JSON.toJSONString(rpcRequest));
// 4、返回結(jié)果
RpcResponse response = JSON.parseObject(responseMessage.toString(), RpcResponse.class);
if (response.getError() != null) {
throw new RuntimeException(response.getError());
}
Object result = response.getResult();
Object object = JSON.parseObject(result.toString(), method.getReturnType());
return object;
} catch (Exception e) {
throw e;
} finally {
rpcClient.close();
}
}
});
}
}
ClientBoosStrap
/**
* 客戶端啟動類
*
* @name: ClientBootStrap
* @author: terwer
* @date: 2022-03-14 00:00
**/
public class ClientBootStrap {
public static void main(String[] args) {
IUSerService userService = (IUSerService) RpcClientProxy.createProxy(IUSerService.class);
User user = userService.getById(1);
System.out.println(user);
}
}
custom-rpc-provider
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>custom-rpc</artifactId>
<groupId>com.terewrgreen</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>custom-rpc-provider</artifactId>
<name>custom-rpc-provider</name>
<url>http://www.terwergreen.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.terewrgreen</groupId>
<artifactId>custom-rpc-api</artifactId>
<version>1.0.0</version>
</dependency>
<!--Spring相關(guān)依賴 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>
</pluginManagement>
</build>
</project>
UserServiceImpl
/**
* 用戶服務(wù)實現(xiàn)類
*
* @name: UserServiceImpl
* @author: terwer
* @date: 2022-03-09 23:34
**/
@RpcService
@Service
public class UserServiceImpl implements IUSerService {
Map<Object, User> userMap = new HashMap<>();
@Override
public User getById(int id) {
User user = new User();
user.setId(1);
user.setName("唐有煒");
userMap.put(user.getId(), user);
User user2 = new User();
user2.setId(2);
user2.setName("張三");
userMap.put(user2.getId(), user2);
return userMap.get(id);
}
}
RpcServer
/**
* 對外服務(wù)
*
* @name: RpcServer
* @author: terwer
* @date: 2022-03-09 23:53
**/
@Service
public class RpcServer implements DisposableBean {
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
@Autowired
private RpcServerHandler rpcServerHandler;
public void startServer(String ip, int port) {
try {
// 1殴俱、創(chuàng)建線程組
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
// 2政冻、創(chuàng)建服務(wù)端啟動助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 3、設(shè)置參數(shù)
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 添加String的編解碼器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 業(yè)務(wù)處理類
pipeline.addLast(rpcServerHandler);
}
});
// 4线欲、綁定端口
ChannelFuture sync = serverBootstrap.bind(ip, port).sync();
System.out.println("===========服務(wù)端啟動成功=============");
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
System.out.println("finally bossGroup成功關(guān)閉");
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
System.out.println("finally workerGroup成功關(guān)閉");
}
}
}
@Override
public void destroy() throws Exception {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
System.out.println("destroy bossGroup成功關(guān)閉");
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
System.out.println("destroy workerGroup成功關(guān)閉");
}
}
}
RpcServerHandler
/**
* 服務(wù)端處理類
* <p>
* 1明场、將標有@RpcService注解的類進行緩存
* 2、接收客戶端請求
* 3李丰、根據(jù)傳過來的beanName在緩存中查找對應(yīng)的bean
* 4苦锨、解析請求中的方法名、參數(shù)類型趴泌、參數(shù)信息
*
* @name: RpcServerHandler
* @author: terwer
* @date: 2022-03-10 00:22
**/
@Component
@ChannelHandler.Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {
private static final Map SERVICE_INSTANCE_MAP = new ConcurrentHashMap();
/**
* 1舟舒、將標有@RpcService注解的類進行緩存
*
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
if (serviceMap != null && serviceMap.size() > 0) {
Set<Map.Entry<String, Object>> entries = serviceMap.entrySet();
for (Map.Entry<String, Object> entry : entries) {
Object serviceBean = entry.getValue();
if (serviceBean.getClass().getInterfaces().length == 0) {
throw new RuntimeException("服務(wù)必須實現(xiàn)接口");
}
// 默認取第一個接口作為名稱
SERVICE_INSTANCE_MAP.put(serviceBean.getClass().getInterfaces()[0].getName(), serviceBean);
}
}
}
/**
* 通道讀取就緒事件
*
* @param channelHandlerContext
* @param s
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 接收客戶端請求,轉(zhuǎn)換成RpcReuest
RpcRequest rpcRequest = JSON.parseObject(msg, RpcRequest.class);
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(rpcRequest.getRequestId());
try {
Object result = handler(rpcRequest);
rpcResponse.setResult(result);
} catch (Exception e) {
rpcResponse.setError(e.getMessage());
e.printStackTrace();
}
ctx.writeAndFlush(JSON.toJSONString(rpcResponse));
}
/**
* 業(yè)務(wù)邏輯處理方法
*
* @param rpcRequest
* @return
*/
private Object handler(RpcRequest rpcRequest) throws InvocationTargetException {
// 根據(jù)傳過來的beanName在緩存中查找對應(yīng)的bean
Object serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
if(null == serviceBean){
throw new RuntimeException("根據(jù)beanName找不到服務(wù)"+rpcRequest.getClassName());
}
// 解析請求中的方法名踱讨、參數(shù)類型魏蔗、參數(shù)信息
Class<?> beanClass = serviceBean.getClass();
String methodName = rpcRequest.getMethodName();
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Object[] parameters = rpcRequest.getParameters();
// 反射調(diào)用
FastClass fastClass = FastClass.create(beanClass);
FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
Object result = fastMethod.invoke(serviceBean, parameters);
return result;
}
}
ServerBootdtrapApplication
/**
* 啟動類
*
* @name: ServerBootstrapApplication
* @author: terwer
* @date: 2022-03-09 23:46
**/
@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
@Autowired
private RpcServer rpcServer;
public static void main(String[] args) {
SpringApplication.run(ServerBootstrapApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
rpcServer.startServer("127.0.0.1", 9999);
}
}).start();
}
}
運行效果
<img src="https://raw.githubusercontent.com/terwer/upload/main/img/20220314005322.png" alt="image-20220314001934218" style="zoom:50%;" />
<img src="https://raw.githubusercontent.com/terwer/upload/main/img/20220314005333.png" alt="image-20220314002004686" style="zoom:50%;" />
錯誤解決
com.terewrgreen.rpc.provider.handler.RpcServerHandler is not a @Sharable handler, so can't be added or removed multiple times.
加上 @ChannelHandler.Sharable
注解即可砍的。