自定義RPC框架

自定義RPC框架

分布式架構(gòu)網(wǎng)絡(luò)通信

分布式的基礎(chǔ)問題是遠程服務(wù)是怎么通訊的查吊。

java 領(lǐng)域有很多可實現(xiàn)遠程通訊的技術(shù)逻卖,例如:RMIHessian 戈鲁、SOAPESBJMS 等婆芦。

遠程通訊技術(shù)

RMI

JDK的RMI文檔:https://docs.oracle.com/javase/8/docs/technotes/guides/rmi/

Hessian

Hessian官網(wǎng):http://hessian.caucho.com/

SOAP

SOAP:https://zh.wikipedia.org/wiki/%E7%AE%80%E5%8D%95%E5%AF%B9%E8%B1%A1%E8%AE%BF%E9%97%AE%E5%8D%8F%E8%AE%AE

ESB

ESB:https://zh.wikipedia.org/wiki/%E4%BC%81%E4%B8%9A%E6%9C%8D%E5%8A%A1%E6%80%BB%E7%BA%BF

JMS

JMS:https://www.oracle.com/java/technologies/java-message-service.html

https://spring.io/guides/gs/messaging-jms/

  • Spring JMS is the Spring abstraction over the JMS API.
  • JMS is just an API; you need a physical broker to actually do messaging.
  • ActiveMQ is not a framework, it is an open-source JMS broker that supports the actual persistence and delivery of messages.
  • Spring JMS can be used with any JMS broker, including ActiveMQ. Each broker provides its own JMS API client jar.
  • RabbitMQ is not a native JMS broker; its native protocol is AMQP 0.9.1; it does, however, provide a JMS API client that can be used with Spring JMS, but Spring AMQP is the preferred library for talking to RabbitMQ because it provides much more functionality than is available over JMS.
  • There are lots of examples for using Spring JMS on the internet.
  • The simplest way to get started is with Spring Boot and @JmsListener.

基本原理

從網(wǎng)絡(luò)通信的底層去看员帮,通信要做的事情就是把流從一臺計算機傳輸?shù)搅硗庖慌_計算機。使用傳輸協(xié)議和網(wǎng)絡(luò)IO實現(xiàn)捞高,傳輸協(xié)議比較知名的如 TCP 氯材、UDP 等渣锦。

TCPUDP 都是在基于 socket 的概念上為某類應(yīng)用場景擴展出來的傳輸層協(xié)議。

網(wǎng)絡(luò) IO 主要由 bio 氢哮、nio袋毙、aio冗尤,所有的分布式都是基于這個原理實現(xiàn)的听盖。

什么是RPC

rpc全稱是 remote procedure call ,既遠程過程調(diào)用生闲。借助 RPC 可以做到像本地調(diào)用一樣調(diào)用遠程服務(wù)媳溺,是一種進程間的通信方式。

RPC 不是一個具體的技術(shù)碍讯,而是指整個網(wǎng)絡(luò)調(diào)用的過程悬蔽。

下面展示的是本地調(diào)用和遠程調(diào)用的示例:

例如有A服務(wù)器部署了一個應(yīng)用,B服務(wù)器也部署了一個應(yīng)用捉兴,現(xiàn)在A服務(wù)器上的應(yīng)用想要調(diào)用B服務(wù)器上的應(yīng)用的方法蝎困,由于,兩個應(yīng)用不在同一個服務(wù)器倍啥,因此不在同一個內(nèi)存空間禾乘,無法實現(xiàn)直接調(diào)用,需要通過網(wǎng)絡(luò)來表達調(diào)用的語義和傳達調(diào)用的數(shù)據(jù)虽缕。

<img src="media/16461457884893/16467573463113.jpg" style="zoom:50%;" />

<img src="media/16461457884893/16467573463123.jpg" style="zoom:50%;" />

RPC架構(gòu)

一個完整的RPC架構(gòu)包含四個完整的組件始藕,分表是Client,Client Stub氮趋,Server和Server Stub伍派。Stub可以理解為存根。

  • 客戶端(Client)剩胁,服務(wù)的調(diào)用方诉植。
  • 客戶端存根(Client Stub),存放服務(wù)端的地址消息昵观,將客戶端的請求打包成網(wǎng)絡(luò)消息晾腔,通過網(wǎng)絡(luò)遠程發(fā)送給服務(wù)方。
  • 服務(wù)端(Server)啊犬,真正的服務(wù)端提供者灼擂。
  • 服務(wù)端存根(Sever Stub),接收客戶端發(fā)送過來的消息觉至,將消息解包缤至,并調(diào)用本地方法。

<img src="media/16461457884893/16467573463136.jpg" style="zoom:50%;" />

<img src="media/16461457884893/16467573463152.jpg" style="zoom:50%;" />

1、客戶端以本地方式調(diào)用服務(wù)

2领斥、客戶端存根接收到調(diào)用之后嫉到,將方法參數(shù)組裝成能進行網(wǎng)絡(luò)傳輸?shù)南Ⅲw,消息體序列化為二進制

3月洛、客戶端通過socket將消息發(fā)送到服務(wù)端

4何恶、服務(wù)端存根接收到消息之后進行解碼,將消息對象反序列化

5嚼黔、服務(wù)端存根根據(jù)解碼結(jié)果調(diào)用本地服務(wù)

6细层、服務(wù)處理

7、本地服務(wù)執(zhí)行并將結(jié)果返回給服務(wù)端存根

8唬涧、服務(wù)端存根將返回結(jié)果打包成消息疫赎,將結(jié)果消息對象序列化

9、服務(wù)端通過socket將消息發(fā)送到客戶端

10碎节、客戶端存根接收到消息并進行解碼捧搞,將消息對象反序列化

11、客戶端得到最終結(jié)果

RPC的目標是只保留1狮荔、6胎撇、11,將其他的細節(jié)全部封裝起來殖氏。

注意:不管是什么類型的數(shù)據(jù)晚树,在輸出過程中都要轉(zhuǎn)換成二進制流,而接收方需要將二進制流恢復(fù)為對象雅采。

Java中常見的RPC框架有Hessian爵憎、gRPC、Dubbo等婚瓜,核心模塊都是通訊和序列化

https://github.com/grpc/grpc-java

RMI

Java的RMI指的是 Remote Method Invocation宝鼓,一種實現(xiàn)遠程過程調(diào)用(RPC)的API,能直接傳輸序列化后的Java對象闰渔。它的實現(xiàn)依賴于JVM席函,因此它能支撐一個JVM到另外一個JVM的調(diào)用铐望。

<img src="media/16461457884893/16467573463168.jpg" style="zoom:50%;" />

1冈涧、客戶端從遠程服務(wù)器的注冊表中查詢并獲取遠程對象的引用。

2正蛙、樁對象與遠程對象有相同的接口和方法列表督弓,當客戶端調(diào)用遠程對象時候,實際上是由樁對象代理完成乒验。

3愚隧、遠程引用層將樁的本地引用轉(zhuǎn)換為服務(wù)器上對象的遠程引用,再將調(diào)用層傳遞給傳輸層锻全,由傳輸層通過TCP協(xié)議發(fā)起調(diào)用狂塘。

4录煤、在服務(wù)端,傳輸層監(jiān)聽入站鏈接荞胡,收到客戶端的遠程調(diào)用之后妈踊,將引用轉(zhuǎn)發(fā)到上層的遠程引用層;

服務(wù)端的遠程引用層將客戶端發(fā)送的遠程引用轉(zhuǎn)換為本地虛擬機的引用泪漂,再將請求傳遞給骨架廊营;

骨架讀取參數(shù),將請求傳遞給服務(wù)器萝勤,由服務(wù)器進行實際的方法調(diào)用露筒。

5、如果遠程方法調(diào)用之后有返回值敌卓,服務(wù)器將結(jié)果沿著 "骨架->遠程引用層->傳輸層" 向下傳遞慎式。

6、客戶端的傳輸層接收到返回值之后假哎,又沿著 "傳輸層->遠程引用層->樁" 向上傳遞瞬捕,并最終將結(jié)果傳遞給客戶端程序。

RMI實例需求分析

1舵抹、服務(wù)端提供根據(jù)ID查詢用戶的方法

2肪虎、客戶端調(diào)用服務(wù)端方法,并返回用戶對象

3惧蛹、要求使用RMI進行遠程通訊

服務(wù)端實現(xiàn)

/**
 * RMI服務(wù)端
 *
 * @name: RMIServer
 * @author: terwer
 * @date: 2022-03-06 02:01
 **/
public class RMIServer {
    public static void main(String[] args) {
        try {
            // 1.注冊Registry實例扇救,綁定端口
            Registry registry = LocateRegistry.createRegistry(9998);
            // 2.創(chuàng)建遠程對象
            IUserService userService = new UserServiceImpl();
            // 3.將遠程對象注冊到RMI服務(wù)器(既服務(wù)端注冊表)
            registry.rebind("userService", userService);

            System.out.println("RMI服務(wù)端啟動成功");
        } catch (RemoteException e) {
            e.printStackTrace();
        }
    }
}

客戶端實現(xiàn)

/**
 * RMI客戶端
 *
 * @name: RMIClient
 * @author: terwer
 * @date: 2022-03-06 19:25
 **/
public class RMIClient {
    public static void main(String[] args) throws RemoteException, NotBoundException {
        // 1、獲取Registry實例
        Registry registry = LocateRegistry.getRegistry("127.0.0.1", 9998);
        // 2香嗓、通過Registry查找遠程對象
        IUserService userService = (IUserService) registry.lookup("userService");
        User user = userService.getUserById(1);
        System.out.println("userName = " + user.getName());
    }
}

效果預(yù)覽

<img src="media/16461457884893/16467573463195.png" alt="image-20220306193226744" style="zoom:50%;" />

<img src="media/16461457884893/16467573463236.png" alt="image-20220306193244530" style="zoom:50%;" />

基于Netty實現(xiàn)RPC框架

Dubbo 底層使用 Netty 作為網(wǎng)絡(luò)通訊框架迅腔,要求使用 Netty 實現(xiàn)一個簡單的 RPC 框架,消費者和提供者約定協(xié)議和接口靠娱,消費者遠程調(diào)用提供者的服務(wù)沧烈。

1、創(chuàng)建一個接口像云,定義抽象方法锌雀,用于消費者和提供者之間的約定。

2迅诬、創(chuàng)建一個提供者腋逆,該類需要監(jiān)聽消費者的請求,并按照約定返回數(shù)據(jù)侈贷。

3惩歉、創(chuàng)建一個消費者,該類需要透明的調(diào)用自己不存在的方法,內(nèi)部需要使用 Netty 進行數(shù)據(jù)通信撑蚌。

4上遥、提供者與消費者傳輸數(shù)據(jù)使用json字符串格式。

5争涌、提供者使用 Netty 集成 Spring Boot 環(huán)境露该。

案例:客戶端調(diào)用服務(wù)端,利用ID查詢User對象的方法

需求分析

<img src="media/16461457884893/16467573463305.png" alt="image-20220306234352362" style="zoom:50%;" />

具體實現(xiàn)

需要分成三個子項目

.
├── custom-rpc-api
├── custom-rpc-consumer
├── custom-rpc-provider
└── pom.xml

主項目

主項目的 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.terewrgreen</groupId>
    <artifactId>custom-rpc</artifactId>
    <packaging>pom</packaging>
    <version>1.0.0</version>

    <modules>
        <module>custom-rpc-api</module>
        <module>custom-rpc-provider</module>
        <module>custom-rpc-consumer</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <curator.version>4.3.0</curator.version>
    </properties>

    <dependencies>
        <!--netty依賴 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <!--json依賴 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.79</version>
        </dependency>
        <!--lombok依賴 -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>

custom-rpc-api

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>custom-rpc</artifactId>
        <groupId>com.terewrgreen</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>custom-rpc-api</artifactId>

    <name>custom-rpc-api</name>
    <url>http://www.terwergreen.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
        </pluginManagement>
    </build>
</project>

custom-rpc-consumer

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>custom-rpc</artifactId>
        <groupId>com.terewrgreen</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>custom-rpc-consumer</artifactId>

    <name>custom-rpc-consumer</name>
    <url>http://www.terwergreen.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.terewrgreen</groupId>
            <artifactId>custom-rpc-api</artifactId>
            <version>1.0.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
        </pluginManagement>
    </build>
</project>

RpcClient

/**
 * Rpc客戶端
 * 1第煮、連接netty服務(wù)端
 * 2解幼、提供給調(diào)用者關(guān)閉資源的方法
 * 3、提供消息發(fā)送的方法
 *
 * @name: RpcClient
 * @author: terwer
 * @date: 2022-03-13 21:04
 **/
public class RpcClient {

    private NioEventLoopGroup group;
    private Channel channel;

    private String ip;
    private int port;

    private RpcClientHandler rpcClientHandler = new RpcClientHandler();
    private ExecutorService executorService = Executors.newCachedThreadPool();

    public RpcClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
        initClient();
    }

    /**
     * 初始化客戶端包警,連接netty服務(wù)端
     */
    public void initClient() {

        try {
            // 創(chuàng)建線程組
            group = new NioEventLoopGroup();
            //  創(chuàng)建啟動助手
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.SO_TIMEOUT, 3000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            // String 編解碼器
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // 客戶端處理類
                            pipeline.addLast(rpcClientHandler);
                        }
                    });
            channel = bootstrap.connect(ip, port).sync().channel();
            System.out.println("===========客戶端啟動成功==========");
        } catch (Exception e) {
            if (channel != null) {
                channel.close();
                System.out.println("客戶端關(guān)閉channel");
            }
            if (group != null) {
                group.shutdownGracefully();
                System.out.println("客戶端關(guān)閉group");
            }
            e.printStackTrace();
        }
    }

    public void close(){
        if (channel != null) {
            channel.close();
            System.out.println("外部調(diào)用客戶端關(guān)閉channel");
        }
        if (group != null) {
            group.shutdownGracefully();
            System.out.println("外部調(diào)用客戶端關(guān)閉group");
        }
    }

    public Object send(String msg) throws ExecutionException, InterruptedException {
        rpcClientHandler.setRequestMessage(msg);
        Future future = executorService.submit(rpcClientHandler);
        return future.get();
    }
}

RpcClienthandler

/**
 * 客戶端處理類
 * 1撵摆、發(fā)送消息
 * 2、接收消息
 *
 * @name: RpcClientHandler
 * @author: terwer
 * @date: 2022-03-13 23:01
 **/
public class RpcClientHandler extends SimpleChannelInboundHandler implements Callable {

    private ChannelHandlerContext ctx;
    // 消息
    private String requestMessage;
    private String responseMessage;

    public String getRequestMessage() {
        return requestMessage;
    }

    public void setRequestMessage(String requestMessage) {
        this.requestMessage = requestMessage;
    }

    /**
     * 通道讀取就緒事件
     *
     * @param channelHandlerContext
     * @param msg
     * @throws Exception
     */
    @Override
    protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
        responseMessage = (String) msg;
        // 喚醒等待線程
        notify();
    }

    /**
     * 通道就緒事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
    }

    @Override
    public synchronized Object call() throws Exception {
        // 消息發(fā)送
        ctx.writeAndFlush(requestMessage);
        // 線程等待
        wait();
        return responseMessage;
    }
}

RpcClientProxy

/**
 * 客戶端代理類害晦,創(chuàng)建代理對象
 * 1特铝、封裝request請求對象
 * 2、創(chuàng)建RpcClient對象
 * 3壹瘟、發(fā)送消息
 * 4鲫剿、返回結(jié)果
 *
 * @name: RpcClientProxy
 * @author: terwer
 * @date: 2022-03-13 23:45
 **/
public class RpcClientProxy {
    public static Object createProxy(Class serviceClass) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 1、封裝request請求對象
                RpcRequest rpcRequest = new RpcRequest();
                rpcRequest.setRequestId(UUID.randomUUID().toString());
                rpcRequest.setClassName(method.getDeclaringClass().getName());
                rpcRequest.setMethodName(method.getName());
                rpcRequest.setParameterTypes(method.getParameterTypes());
                rpcRequest.setParameters(args);

                // 2稻轨、創(chuàng)建RpcClient對象
                RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);

                try {
                    // 3灵莲、發(fā)送消息
                    Object responseMessage = rpcClient.send(JSON.toJSONString(rpcRequest));

                    // 4、返回結(jié)果
                    RpcResponse response = JSON.parseObject(responseMessage.toString(), RpcResponse.class);
                    if (response.getError() != null) {
                        throw new RuntimeException(response.getError());
                    }
                    Object result = response.getResult();
                    Object object = JSON.parseObject(result.toString(), method.getReturnType());
                    return object;
                } catch (Exception e) {
                    throw e;
                } finally {
                    rpcClient.close();
                }


            }
        });
    }
}

ClientBoosStrap

/**
 * 客戶端啟動類
 *
 * @name: ClientBootStrap
 * @author: terwer
 * @date: 2022-03-14 00:00
 **/
public class ClientBootStrap {
    public static void main(String[] args) {
        IUSerService userService = (IUSerService) RpcClientProxy.createProxy(IUSerService.class);
        User user = userService.getById(1);
        System.out.println(user);
    }
}

custom-rpc-provider

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>custom-rpc</artifactId>
        <groupId>com.terewrgreen</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>custom-rpc-provider</artifactId>

    <name>custom-rpc-provider</name>
    <url>http://www.terwergreen.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.terewrgreen</groupId>
            <artifactId>custom-rpc-api</artifactId>
            <version>1.0.0</version>
        </dependency>
        <!--Spring相關(guān)依賴 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
        </pluginManagement>
    </build>
</project>

UserServiceImpl

/**
 * 用戶服務(wù)實現(xiàn)類
 *
 * @name: UserServiceImpl
 * @author: terwer
 * @date: 2022-03-09 23:34
 **/
@RpcService
@Service
public class UserServiceImpl implements IUSerService {
    Map<Object, User> userMap = new HashMap<>();

    @Override
    public User getById(int id) {
        User user = new User();
        user.setId(1);
        user.setName("唐有煒");
        userMap.put(user.getId(), user);

        User user2 = new User();
        user2.setId(2);
        user2.setName("張三");
        userMap.put(user2.getId(), user2);

        return userMap.get(id);
    }
}

RpcServer

/**
 * 對外服務(wù)
 *
 * @name: RpcServer
 * @author: terwer
 * @date: 2022-03-09 23:53
 **/
@Service
public class RpcServer implements DisposableBean {
    private NioEventLoopGroup bossGroup;
    private NioEventLoopGroup workerGroup;

    @Autowired
    private RpcServerHandler rpcServerHandler;

    public void startServer(String ip, int port) {
        try {
            // 1殴俱、創(chuàng)建線程組
            bossGroup = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup();

            // 2政冻、創(chuàng)建服務(wù)端啟動助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            // 3、設(shè)置參數(shù)
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            // 添加String的編解碼器
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // 業(yè)務(wù)處理類
                            pipeline.addLast(rpcServerHandler);
                        }
                    });

            // 4线欲、綁定端口
            ChannelFuture sync = serverBootstrap.bind(ip, port).sync();
            System.out.println("===========服務(wù)端啟動成功=============");
            sync.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (bossGroup != null) {
                bossGroup.shutdownGracefully();
                System.out.println("finally bossGroup成功關(guān)閉");
            }
            if (workerGroup != null) {
                workerGroup.shutdownGracefully();
                System.out.println("finally workerGroup成功關(guān)閉");
            }
        }
    }

    @Override
    public void destroy() throws Exception {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
            System.out.println("destroy bossGroup成功關(guān)閉");
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
            System.out.println("destroy workerGroup成功關(guān)閉");
        }
    }
}

RpcServerHandler

/**
 * 服務(wù)端處理類
 * <p>
 * 1明场、將標有@RpcService注解的類進行緩存
 * 2、接收客戶端請求
 * 3李丰、根據(jù)傳過來的beanName在緩存中查找對應(yīng)的bean
 * 4苦锨、解析請求中的方法名、參數(shù)類型趴泌、參數(shù)信息
 *
 * @name: RpcServerHandler
 * @author: terwer
 * @date: 2022-03-10 00:22
 **/
@Component
@ChannelHandler.Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {
    private static final Map SERVICE_INSTANCE_MAP = new ConcurrentHashMap();

    /**
     * 1舟舒、將標有@RpcService注解的類進行緩存
     *
     * @param applicationContext
     * @throws BeansException
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(RpcService.class);
        if (serviceMap != null && serviceMap.size() > 0) {
            Set<Map.Entry<String, Object>> entries = serviceMap.entrySet();
            for (Map.Entry<String, Object> entry : entries) {
                Object serviceBean = entry.getValue();
                if (serviceBean.getClass().getInterfaces().length == 0) {
                    throw new RuntimeException("服務(wù)必須實現(xiàn)接口");
                }

                // 默認取第一個接口作為名稱
                SERVICE_INSTANCE_MAP.put(serviceBean.getClass().getInterfaces()[0].getName(), serviceBean);
            }

        }
    }

    /**
     * 通道讀取就緒事件
     *
     * @param channelHandlerContext
     * @param s
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 接收客戶端請求,轉(zhuǎn)換成RpcReuest
        RpcRequest rpcRequest = JSON.parseObject(msg, RpcRequest.class);
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(rpcRequest.getRequestId());

        try {
            Object result = handler(rpcRequest);
            rpcResponse.setResult(result);
        } catch (Exception e) {
            rpcResponse.setError(e.getMessage());
            e.printStackTrace();
        }

        ctx.writeAndFlush(JSON.toJSONString(rpcResponse));
    }

    /**
     * 業(yè)務(wù)邏輯處理方法
     *
     * @param rpcRequest
     * @return
     */
    private Object handler(RpcRequest rpcRequest) throws InvocationTargetException {
        // 根據(jù)傳過來的beanName在緩存中查找對應(yīng)的bean
        Object serviceBean = SERVICE_INSTANCE_MAP.get(rpcRequest.getClassName());
        if(null == serviceBean){
            throw new RuntimeException("根據(jù)beanName找不到服務(wù)"+rpcRequest.getClassName());
        }

        // 解析請求中的方法名踱讨、參數(shù)類型魏蔗、參數(shù)信息
        Class<?> beanClass = serviceBean.getClass();
        String methodName = rpcRequest.getMethodName();
        Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
        Object[] parameters = rpcRequest.getParameters();

        // 反射調(diào)用
        FastClass fastClass = FastClass.create(beanClass);
        FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
        Object result = fastMethod.invoke(serviceBean, parameters);

        return result;
    }
}

ServerBootdtrapApplication

/**
 * 啟動類
 *
 * @name: ServerBootstrapApplication
 * @author: terwer
 * @date: 2022-03-09 23:46
 **/
@SpringBootApplication
public class ServerBootstrapApplication implements CommandLineRunner {
    @Autowired
    private RpcServer rpcServer;

    public static void main(String[] args) {
        SpringApplication.run(ServerBootstrapApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                rpcServer.startServer("127.0.0.1", 9999);
            }
        }).start();
    }
}

運行效果

<img src="https://raw.githubusercontent.com/terwer/upload/main/img/20220314005322.png" alt="image-20220314001934218" style="zoom:50%;" />

<img src="https://raw.githubusercontent.com/terwer/upload/main/img/20220314005333.png" alt="image-20220314002004686" style="zoom:50%;" />

錯誤解決

com.terewrgreen.rpc.provider.handler.RpcServerHandler is not a @Sharable handler, so can't be added or removed multiple times.

加上 @ChannelHandler.Sharable 注解即可砍的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末痹筛,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌帚稠,老刑警劉巖谣旁,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異滋早,居然都是意外死亡榄审,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進店門杆麸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來搁进,“玉大人,你說我怎么就攤上這事昔头”剩” “怎么了?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵揭斧,是天一觀的道長莱革。 經(jīng)常有香客問我,道長讹开,這世上最難降的妖魔是什么盅视? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮旦万,結(jié)果婚禮上闹击,老公的妹妹穿的比我還像新娘。我一直安慰自己成艘,他們只是感情好拇砰,可當我...
    茶點故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著狰腌,像睡著了一般除破。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上琼腔,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天瑰枫,我揣著相機與錄音,去河邊找鬼丹莲。 笑死光坝,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的甥材。 我是一名探鬼主播盯另,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼洲赵!你這毒婦竟也來了鸳惯?” 一聲冷哼從身側(cè)響起商蕴,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎芝发,沒想到半個月后绪商,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡辅鲸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年格郁,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片独悴。...
    茶點故事閱讀 38,716評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡例书,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出刻炒,到底是詐尸還是另有隱情雾叭,我是刑警寧澤,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布落蝙,位于F島的核電站织狐,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏筏勒。R本人自食惡果不足惜移迫,卻給世界環(huán)境...
    茶點故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望管行。 院中可真熱鬧厨埋,春花似錦、人聲如沸捐顷。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽迅涮。三九已至废赞,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間叮姑,已是汗流浹背唉地。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留传透,地道東北人耘沼。 一個月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓,卻偏偏與公主長得像朱盐,于是被迫代替她去往敵國和親群嗤。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,612評論 2 350

推薦閱讀更多精彩內(nèi)容