【手擼RPC框架】SpringBoot+Netty4實現(xiàn)RPC框架

【手擼RPC框架】SpringBoot+Netty4實現(xiàn)RPC框架

線程模型

Netty高性能架構設計

簡單了解React線程模型押赊,參考文章【五分鐘快速理解 Reactor 模型】

舉例說明:Reactor的三種線程模型

線程模型1:傳統(tǒng)阻塞 I/O 服務模型

模型特點:

  • 采用阻塞IO模式獲取輸入的數(shù)據(jù)
  • 每個鏈接都需要獨立的線程完成數(shù)據(jù)的輸入疼约,業(yè)務處理、數(shù)據(jù)返回蹦锋。

問題分析:

  • 當并發(fā)數(shù)很大炕置,就會創(chuàng)建大量的線程荣挨,占用很大系統(tǒng)資源
  • 連接創(chuàng)建后男韧,如果當前線程暫時沒有數(shù)據(jù)可讀,該線程會阻塞在read操作默垄,造成線程資源浪費此虑。

線程模型2:Reactor 模式

針對傳統(tǒng)阻塞I/O服務模型的2個缺點,解決方案如下:

  • 基于 I/O 復用模型:多個連接共用一個阻塞對象厕倍,應用程序只需要在一個阻塞對象等待寡壮,無需阻塞等待所有連接。當某個連接有新的數(shù)據(jù)可以處理時讹弯,操作系統(tǒng)通知應用程序况既,線程從阻塞狀態(tài)返回,開始進行業(yè)務處理组民。Reactor對應的叫法: 1. 反應器模式 2. 分發(fā)者模式(Dispatcher) 3. 通知者模式(notifier)
  • 基于線程池復用線程資源:不必再為每個連接創(chuàng)建線程棒仍,將連接完成后的業(yè)務處理任務分配給線程進行處理,一個線程可以處理多個連接的業(yè)務臭胜。

單 Reactor 單線程

模型分析

  • 優(yōu)點:模型簡單莫其,沒有多線程、進程通信耸三、競爭的問題乱陡,全部都在一個線程中完成
  • 缺點:性能問題,只有一個線程仪壮,無法完全發(fā)揮多核 CPU 的性能憨颠。Handler 在處理某個連接上的業(yè)務時,整個進程無法處理其他連接事件积锅,很容易導致性能瓶頸
  • 缺點:可靠性問題爽彤,線程意外終止,或者進入死循環(huán)缚陷,會導致整個系統(tǒng)通信模塊不可用适篙,不能接收和處理外部消息,造成節(jié)點故障
  • 使用場景:客戶端的數(shù)量有限箫爷,業(yè)務處理非橙陆冢快速,比如 Redis在業(yè)務處理的時間復雜度 O(1) 的情況

單 Reactor 多線程

模型分析

  • 優(yōu)點:可以充分的利用多核cpu 的處理能力
  • 缺點:多線程數(shù)據(jù)共享和訪問比較復雜虎锚, reactor 處理所有的事件的監(jiān)聽和響應硫痰,在單線程運行, 在高并發(fā)場景容易出現(xiàn)性能瓶頸.

主從 Reactor 多線程

模型分析

  • 優(yōu)點:父線程與子線程的數(shù)據(jù)交互簡單職責明確翁都,父線程只需要接收新連接碍论,子線程完成后續(xù)的業(yè)務處理谅猾。
  • 優(yōu)點:父線程與子線程的數(shù)據(jù)交互簡單柄慰,Reactor 主線程只需要把新連接傳給子線程鳍悠,子線程無需返回數(shù)據(jù)
  • 缺點:編程復雜度較高
  • 結合實例:這種模型在許多項目中廣泛使用,包括 Nginx 主從 Reactor 多進程模型坐搔,Memcached 主從多線程藏研,Netty 主從多線程模型的支持

先實現(xiàn)簡單的Netty通信

服務端示例

public static void main(String[] args) {
    //創(chuàng)建連接線程組,線程數(shù)為1概行。只負責處理連接請求
    NioEventLoopGroup boss = new NioEventLoopGroup(1);
    //創(chuàng)建工作線程組蠢挡,線程數(shù)默認為cpu核數(shù)*2。處理與客戶端的業(yè)務處理
    NioEventLoopGroup worker = new NioEventLoopGroup();
    //創(chuàng)建Server端的啟動對象
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    //配置線程組
    serverBootstrap.group(boss, worker)
        //使用 NioServerSocketChannel 作為服務器的通道實現(xiàn)
        .channel(NioServerSocketChannel.class)
        //給worker線程組初始化處理器
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                    //添加字符串的編解碼器
                    .addLast(new StringDecoder())
                    .addLast(new StringEncoder())
                    //添加對象的編解碼器凳忙,ClassResolvers.weakCachingConcurrentResolver設置弱引用WeakReferenceMap緩存類加載器业踏,防止內存溢出
                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                    .addLast(new ObjectEncoder())
                    //添加自定義的業(yè)務處理器
                    .addLast(new SimpleChannelInboundHandler<Object>() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.info("客戶端連接啦。涧卵。勤家。客戶端地址:{}", ctx.channel().remoteAddress());
                        }
                        @Override
                        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
                            log.info("服務端接收到的數(shù)據(jù):{}", o.toString());
                            //價值1個億的AI代碼
                            String str = o.toString();
                            str = str.replace("嗎", "");
                            str = str.replace("柳恐?", "伐脖!");
                            str = str.replace("? ", "乐设! ");
                            channelHandlerContext.writeAndFlush(str);
                        }
                    });
            }
        });
    //啟動并且監(jiān)聽
    ChannelFuture channelFuture = serverBootstrap.bind(8888).syncUninterruptibly();
    //監(jiān)聽關閉通道
    channelFuture.channel().closeFuture();
}

客戶端示例

public static void main(String[] args) {
    //設置客戶端工作線程
    NioEventLoopGroup worker = new NioEventLoopGroup();
    //創(chuàng)建客戶端啟動對象
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(worker)
        //通道連接者
        .channel(NioSocketChannel.class)
        //給worker線程組初始化處理器
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                    //添加字符串的編解碼器
                    .addLast(new StringDecoder())
                    .addLast(new StringEncoder())
                    //添加對象的編解碼器讼庇,ClassResolvers.weakCachingConcurrentResolver設置弱引用WeakReferenceMap緩存類加載器,防止內存溢出
                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                    .addLast(new ObjectEncoder())
                    //添加自定義的業(yè)務處理器
                    .addLast(new SimpleChannelInboundHandler<Object>() {

                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ctx.writeAndFlush("哈哈哈");
                        }

                        @Override
                        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
                            log.info("客戶端接收到的數(shù)據(jù):{}", o.toString());
                        }
                    });
            }
        });

    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).syncUninterruptibly();
    //客戶端需要輸入信息近尚,創(chuàng)建一個掃描器
    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNextLine()) {
        String msg = scanner.nextLine();
        //通過channel發(fā)送到服務器端
        channel.writeAndFlush(msg + "\r\n");
    }
    channelFuture.channel().closeFuture();
}

快啟動試試看把蠕啄,不過需要注意的是,得先啟動服務端哦~

SpringBoot + Netty4實現(xiàn)rpc框架

好了肿男,接下來就讓我們進入正題介汹,讓我們利用我們所學的知識去實現(xiàn)自己一個簡單的rpc框架吧

簡單說下RPC(Remote Procedure Call)遠程過程調用,簡單的理解是一個節(jié)點請求另一個節(jié)點提供的服務舶沛。讓兩個服務之間調用就像調用本地方法一樣嘹承。

RPC時序圖:

QQ截圖20210421170511.png

RPC流程:

  1. 【客戶端】發(fā)起調用
  2. 【客戶端】數(shù)據(jù)編碼
  3. 【客戶端】發(fā)送編碼后的數(shù)據(jù)到服務端
  4. 【服務端】接收客戶端發(fā)送的數(shù)據(jù)
  5. 【服務端】對數(shù)據(jù)進行解碼
  6. 【服務端】處理消息業(yè)務并返回結果值
  7. 【服務端】對結果值編碼
  8. 【服務端】將編碼后的結果值回傳給客戶端
  9. 【客戶端】接收結果值
  10. 【客戶端】解碼結果值
  11. 【客戶端】處理返回數(shù)據(jù)業(yè)務

引入依賴

<dependencies>
    <!-- SpringBoot依賴 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Spring容器上下文 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
    </dependency>
    <!-- Spring配置 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Netty4 -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.58.Final</version>
    </dependency>
    <!-- 工具 -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.5.8</version>
    </dependency>
</dependencies>

編寫服務端

自定義消息協(xié)議:

/**
 * @author zc
 * @date 2021/3/1 17:43
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcMessage implements Serializable {
    private static final long serialVersionUID = 430507739718447406L;
    /**
     * interface接口名
     */
    private String name;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 參數(shù)類型
     */
    private Class<?>[] parTypes;
    /**
     * 參數(shù)
     */
    private Object[] pars;
    /**
     * 結果值
     */
    private Object result;
}

自定義Rpc注解:

/**
 * @author zc
 * @date 2021/3/2 15:36
 */
@Target(value = {ElementType.TYPE, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RpcServer {
}

定義ServerHandle業(yè)務處理器:

/**
 * Netty Server端Handle處理類,消息體RpcMessage
 * 實現(xiàn)ApplicationContextAware接口:該接口可以加載獲取到所有的 spring bean如庭。
 * 實現(xiàn)了這個接口的bean叹卷,當spring容器初始化的時候,會自動的將ApplicationContext注入進來
 *
 * @author ZC
 * @date 2021/3/1 22:15
 */
@Slf4j
@ChannelHandler.Sharable
public class ServerHandle extends SimpleChannelInboundHandler<RpcMessage> implements ApplicationContextAware {
    private Map<String, Object> serviceMap;

    /**
     * 在類被Spring容器加載時會自動執(zhí)行setApplicationAware
     *
     * @param applicationContext Spring上下文
     * @throws BeansException 異常信息
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //從Spring容器中獲取到所有擁有@RpcServer注解的Beans集合坪它,Map<Name(對象類型骤竹,對象全路徑名),實例對象>
        Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RpcServer.class);
        log.info("被@RpcServer注解加載的Bean: {}", beansWithAnnotation);
        if (beansWithAnnotation.size() > 0) {
            Map<String, Object> map = new ConcurrentHashMap<>(16);
            for (Object o : beansWithAnnotation.values()) {
                //獲取該實例對象實現(xiàn)的接口Class
                Class<?> anInterface = o.getClass().getInterfaces()[0];
                //獲取該接口類名,作為Key往毡,實例對象作為Value
                map.put(anInterface.getName(), o);
            }
            //使用變量接住map
            serviceMap = map;
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客戶端連接了: {}", ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("異常信息");
        cause.printStackTrace();
        super.exceptionCaught(ctx, cause);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
        log.info("客戶端發(fā)送的消息:{}", rpcMessage);
        //從Map中獲取實例對象
        Object service = serviceMap.get(rpcMessage.getName());
        //獲取調用方法
        Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParTypes());
        method.setAccessible(true);
        //反射調用實例對象方法蒙揣,獲取返回值
        Object result = method.invoke(service, rpcMessage.getPars());
        rpcMessage.setResult(JSONUtil.toJsonStr(result));
        log.info("回給客戶端的消息:{}", rpcMessage);
        //Netty服務端將數(shù)據(jù)寫會Channel并發(fā)送給客戶端,同時添加一個監(jiān)聽器开瞭,當所有數(shù)據(jù)包發(fā)送完成后懒震,關閉通道
        channelHandlerContext.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE);
    }
}

定義NettyServer端:

/**
 * Netty服務端
 *
 * @author zc
 * @date 2021/2/24 13:23
 **/
@Slf4j
public class NettyServer {

    /**
     * server端處理器
     */
    private final ServerHandle serverHandle;
    /**
     * 服務端通道
     */
    private Channel channel;

    /**
     * 構造器
     *
     * @param serverHandle server處理器
     */
    public NettyServer(ServerHandle serverHandle) {
        this.serverHandle = serverHandle;
    }

    /**
     * 啟動
     *
     * @param port 啟動端口
     */
    public void start(int port) {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                                    .addLast(new ObjectEncoder())
                                    .addLast(serverHandle);
                        }
                    });

            final ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
            log.info("服務端啟動-端口: {}", port);
            channel = channelFuture.channel();
            channel.closeFuture().syncUninterruptibly();
        } catch (Exception e) {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    /**
     * 關閉當前通道
     */
    public void stop() {
        channel.close();
    }
}

自定義rpc配置屬性類:

/**
 * @author zc
 * @date 2021/3/4 23:38
 */
@Component
@ConfigurationProperties(prefix = "netty")
@Data
public class NettyRpcProperties {
    private int serverPort;
}`

創(chuàng)建Server端啟動配置類:

/**
 * NettyServer服務端配置類
 *
 * @author zc
 * @date 2021/3/1 18:24
 */
@Slf4j
@Configuration
@EnableConfigurationProperties(NettyRpcProperties.class)
public class ServerBeanConfig {

    private final NettyRpcProperties nettyRpcProperties;

    @Autowired
    public ServerBeanConfig(NettyRpcProperties nettyRpcProperties) {
        this.nettyRpcProperties = nettyRpcProperties;
    }

    /**
     * 配置ServerHandle
     *
     * @return ServerHandle處理類
     */
    @Bean
    public ServerHandle serverHandle() {
        return new ServerHandle();
    }

    /**
     * 配置NettyServer
     *
     * @param handle ServerHandle處理類
     * @return NettyServer
     */
    @Bean
    public NettyServer nettyServer(ServerHandle handle) {
        NettyServer nettyServer = new NettyServer(handle);
//        nettyServer.start(nettyRpcProperties.getServerPort());
        return nettyServer;
    }

    /**
     * 解決SpringBoot端口無法監(jiān)聽問題
     */
    @Component
    static class NettyServerStart implements ApplicationRunner {
        private final NettyServer nettyServer;
        private final NettyRpcProperties properties;

        @Autowired
        NettyServerStart(NettyServer nettyServer, NettyRpcProperties properties) {
            this.nettyServer = nettyServer;
            this.properties = properties;
        }

        @Override
        public void run(ApplicationArguments args) throws Exception {
            log.info("===============ApplicationRunner");
            if (nettyServer != null) {
                nettyServer.start(properties.getServerPort());
            }
        }
    }
}

注入Spring容器

此時有兩種方式讓該配置自動注入Spring容器生效:

  1. 自動注入

    在resource目錄下創(chuàng)建META-INF目錄罩息,創(chuàng)建spring.factories文件

    在該文件里寫上

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=${包路徑:xxx.xxx.xxx}.${配置類:ServerBeanConfig}

    配置好之后,在SpringBoot啟動時會自動加載該配置類个扰。

  2. 通過注解注入

    /**
     * 自定義SpringBoot啟動注解
     * 注入ServerBeanConfig配置類
     *
     * @author ZC
     * @date 2021/3/1 23:48
     */
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @ImportAutoConfiguration({ServerBeanConfig.class})
    public @interface EnableNettyServer {
    }
    

編寫客戶端


創(chuàng)建客戶端處理器`ClientHandle

/**
 * @author zc
 * @date 2021/3/2 15:19
 */
@Slf4j
@ChannelHandler.Sharable
public class ClientHandle extends SimpleChannelInboundHandler<RpcMessage> {
    /**
     * 定義消息Map瓷炮,將連接通道Channel作為key,消息返回值作為value
     */
    private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap;

    public ClientHandle(ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap) {
        this.rpcMessageConcurrentMap = rpcMessageConcurrentMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
        log.info("客戶端收到服務端消息:{}", rpcMessage);
        rpcMessageConcurrentMap.put(channelHandlerContext.channel(), rpcMessage);
    }
}

創(chuàng)建客戶端啟動類NettyClient

/**
 * @author ZC
 * @date 2021/3/1 23:30
 */
@Slf4j
public class NettyClient {

    private Channel channel;
    /**
     * 存放請求編號與響應對象的映射關系
     */
    private final ConcurrentMap<Channel, RpcMessage> rpcMessageConcurrentMap = new ConcurrentHashMap<>();

    public RpcMessage send(int port, final RpcMessage rpcMessage) {
        //客戶端需要一個事件循環(huán)組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
                                    .addLast(new ObjectEncoder())
                                    .addLast(new ClientHandle(rpcMessageConcurrentMap));
                        }
                    });
            final ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", port).syncUninterruptibly();
            log.info("連接服務端成功: " + channelFuture.channel().remoteAddress());
            channel = channelFuture.channel();
            channel.writeAndFlush(rpcMessage);
            log.info("發(fā)送數(shù)據(jù)成功:{}", rpcMessage);
            channel.closeFuture().syncUninterruptibly();
            return rpcMessageConcurrentMap.get(channel);
        } catch (Exception e) {
            log.error("client exception", e);
            return null;
        } finally {
            group.shutdownGracefully();
            //移除請求編號和響應對象直接的映射關系
            rpcMessageConcurrentMap.remove(channel);
        }
    }

    public void stop() {
        channel.close();
    }
}

定義Netty客戶端Bean后置處理器

/**
 * Netty客戶端Bean后置處理器
 * 實現(xiàn)Spring后置處理器接口:BeanPostProcessor
 * 在Bean對象在實例化和依賴注入完畢后递宅,在顯示調用初始化方法的前后添加我們自己的邏輯娘香。注意是Bean實例化完畢后及依賴注入完成后觸發(fā)的
 *
 * @author ZC
 * @date 2021/3/2 23:00
 */
@Slf4j
public class NettyClientBeanPostProcessor implements BeanPostProcessor {

    private final NettyClient nettyClient;

    public NettyClientBeanPostProcessor(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    /**
     * 實例化、依賴注入完畢办龄,在調用顯示的初始化之前完成一些定制的初始化任務
     * 注意:方法返回值不能為null
     * 如果返回null那么在后續(xù)初始化方法將報空指針異澈嬲溃或者通過getBean()方法獲取不到Bean實例對象
     * 因為后置處理器從Spring IoC容器中取出bean實例對象沒有再次放回IoC容器中
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, @Nullable String beanName) throws BeansException {
        //獲取實例Class
        Class<?> beanClass = bean.getClass();
        do {
            //獲取該類所有字段
            Field[] fields = beanClass.getDeclaredFields();
            for (Field field : fields) {
                //判斷該字段是否擁有@RpcServer
                if (field.getAnnotation(RpcServer.class) != null) {
                    field.setAccessible(true);
                    try {
                        //通過JDK動態(tài)代理獲取該類的代理對象
                        Object o = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class[]{field.getType()}, new ClientInvocationHandle(nettyClient));
                        //將代理類注入該字段
                        field.set(bean, o);
                        log.info("創(chuàng)建代理類 ===>>> {}", beanName);
                    } catch (IllegalAccessException e) {
                        log.error(e.getMessage());
                    }
                }
            }
        } while ((beanClass = beanClass.getSuperclass()) != null);
        return bean;
    }

    /**
     * 實例化、依賴注入俐填、初始化完畢時執(zhí)行
     * 注意:方法返回值不能為null
     * 如果返回null那么在后續(xù)初始化方法將報空指針異尘饕Γ或者通過getBean()方法獲取不到Bean實例對象
     * 因為后置處理器從Spring IoC容器中取出bean實例對象沒有再次放回IoC容器中
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 可以根據(jù)beanName不同執(zhí)行不同的處理操作
        return bean;
    }

    /**
     * JDK動態(tài)代理處理器
     */
    static class ClientInvocationHandle implements InvocationHandler {
        private final NettyClient nettyClient;

        public ClientInvocationHandle(NettyClient nettyClient) {
            this.nettyClient = nettyClient;
        }

        /**
         * 代理方法調用
         *
         * @param proxy  代理類
         * @param method 方法
         * @param args   參數(shù)
         * @return 返回值
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            //組裝Netty參數(shù)
            RpcMessage rpcMessage = RpcMessage.builder()
                    .name(method.getDeclaringClass().getName())
                    .methodName(method.getName())
                    .parTypes(method.getParameterTypes())
                    .pars(args)
                    .build();
            //調用Netty,發(fā)送數(shù)據(jù)
            RpcMessage send = nettyClient.send(1111, rpcMessage);
            log.info("接收到服務端數(shù)據(jù):{}, 返回結果值 ====》》》》{}", send, send.getResult());
            return send.getResult();
        }
    }
}

定義客戶端配置類

/**
 * @author zc
 * @date 2021/3/1 18:24
 */
@Configuration
public class ClientBeanConfig {

    @Bean
    public NettyClient nettyClient() {
        return new NettyClient();
    }

    @Bean
    public NettyClientBeanPostProcessor nettyClientBeanPostProcessor(NettyClient nettyClient) {
        return new NettyClientBeanPostProcessor(nettyClient);
    }
}

最后和服務端一樣玷禽,注入Spring容器

/**
 * @author ZC
 * @date 2021/3/1 23:48
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@ImportAutoConfiguration({ClientBeanConfig.class})
public @interface EnableNettyClient {
}

至此我們的SpringBoot + Netty4的就已經(jīng)實現(xiàn)了最最簡單的rpc框架模式了赫段;然后我們就可以引用我們自己的rpc依賴了。

最后再執(zhí)行一下maven命令

mvn install

netty-rpc-examples例子

接口服務

pom里啥也沒有矢赁。糯笙。。

定義一個接口

/**
 * @author zc
 * @date 2021/3/1 17:55
 */
public interface Test1Api {

    void test();

    void test(int id, String name);

    String testStr(int id);

    Object testObj();
}

rpc-server服務端

正常的SpringBoot工程

引入pom

<!-- 自定義rpc依賴 -->
<dependency>
    <groupId>cn.happyloves.rpc</groupId>
    <artifactId>netty-rpc</artifactId>
    <version>0.0.1</version>
</dependency>
<!-- 接口依賴 -->
<dependency>
    <groupId>cn.happyloves.netty.rpc.examples.api</groupId>
    <artifactId>rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

配置屬性

# 應用名稱
spring.application.name=rpc-server
# 應用服務 WEB 訪問端口
server.port=8080
netty.server-port=1111

創(chuàng)建一個實體類

/**
 * @author ZC
 * @date 2021/3/2 23:59
 */
@Data
public class Account implements Serializable {
    private static final long serialVersionUID = 667178018106218163L;
    private Integer id;

    private String name;
    private String username;
    private String password;
}

創(chuàng)建Server實現(xiàn)Test1Api接口

/**
 * @author ZC
 * @date 2021/3/2 23:59
 */
@Slf4j
@Service
@RpcServer
public class TestServiceImpl implements Test1Api {
    @Override
    public void test() {
        log.info("111111111");
    }

    @Override
    public void test(int id, String name) {
        log.info("222222222,{},{}", id, name);
    }

    @Override
    public String testStr(int id) {
        log.info("33333333333333333,{}", id);
        return "33333333333333333 " + id;
    }

    @Override
    public Object testObj() {
        log.info("444444444444444444");
        Account account = new Account();
        account.setName("張三");
        return account;
    }
}

最后在SpringBoot啟動類上加上@EnableNettyServer

/**
 * @author ZC
 * @date 2021/3/2 23:55
 */
@EnableNettyServer
@SpringBootApplication
public class RpcServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(RpcServerApplication.class, args);
    }
}

rpc-server客戶端

引入pom依賴

<dependency>
    <groupId>cn.happyloves.rpc</groupId>
    <artifactId>netty-rpc</artifactId>
    <version>0.0.1</version>
</dependency>
<dependency>
    <groupId>cn.happyloves.netty.rpc.examples.api</groupId>
    <artifactId>rpc-api</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

創(chuàng)建Controller

/**
 * @author ZC
 * @date 2021/3/3 0:04
 */
@RestController
public class ClientController {
    @RpcServer
    private Test1Api testServiceImpl;

    @GetMapping("/test1")
    public void test() {
        testServiceImpl.test();
    }

    @GetMapping("/test2")
    public void test(int id, String name) {
        testServiceImpl.test(id, name);
    }

    @GetMapping("/test3")
    public String testStr(int id) {
        return testServiceImpl.testStr(id);
    }

    @GetMapping("/test4")
    public Object testObj() {
        return testServiceImpl.testObj();
    }
}

最后在啟動類上加上注解@EnableNettyClient

@EnableNettyClient
@SpringBootApplication
public class RpcClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(RpcClientApplication.class, args);
    }
}

先運行服務端撩银,在運行客戶端给涕,然后在調用客戶端接口就可以看到服務端能夠接收到客戶端發(fā)來的消息,然后服務端處理并返回额获,客戶端接收并返回够庙。。抄邀。

至此耘眨,一個小demo就完成了。

當然啦剔难,后續(xù)還有很多需求需要處理的偶宫,比方說當前demo中客戶端每次通信都需要創(chuàng)建一個實例去連接、服務的注冊吵冒、客戶端和服務端是同一個應用等等桦锄,這個后面再慢慢完善吧
趙小胖個人博客:https://zc.happyloves.cn:4443/wordpress/

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末图甜,一起剝皮案震驚了整個濱河市黑毅,隨后出現(xiàn)的幾起案子矿瘦,更是在濱河造成了極大的恐慌缚去,老刑警劉巖琼开,帶你破解...
    沈念sama閱讀 221,273評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件搞动,死亡現(xiàn)場離奇詭異鹦肿,居然都是意外死亡,警方通過查閱死者的電腦和手機碾篡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評論 3 398
  • 文/潘曉璐 我一進店門魁瞪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人剔蹋,你說我怎么就攤上這事辅髓∑溃” “怎么了洛口?”我有些...
    開封第一講書人閱讀 167,709評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長买优。 經(jīng)常有香客問我,道長挺举,這世上最難降的妖魔是什么杀赢? 我笑而不...
    開封第一講書人閱讀 59,520評論 1 296
  • 正文 為了忘掉前任湘纵,我火速辦了婚禮瞻佛,結果婚禮上,老公的妹妹穿的比我還像新娘适刀。我一直安慰自己取视,他們只是感情好,可當我...
    茶點故事閱讀 68,515評論 6 397
  • 文/花漫 我一把揭開白布常挚。 她就那樣靜靜地躺著作谭,像睡著了一般。 火紅的嫁衣襯著肌膚如雪奄毡。 梳的紋絲不亂的頭發(fā)上折欠,一...
    開封第一講書人閱讀 52,158評論 1 308
  • 那天,我揣著相機與錄音,去河邊找鬼锐秦。 笑死咪奖,一個胖子當著我的面吹牛,可吹牛的內容都是我干的酱床。 我是一名探鬼主播羊赵,決...
    沈念sama閱讀 40,755評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼扇谣!你這毒婦竟也來了昧捷?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,660評論 0 276
  • 序言:老撾萬榮一對情侶失蹤揍堕,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后汤纸,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體衩茸,經(jīng)...
    沈念sama閱讀 46,203評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,287評論 3 340
  • 正文 我和宋清朗相戀三年贮泞,在試婚紗的時候發(fā)現(xiàn)自己被綠了楞慈。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,427評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡啃擦,死狀恐怖囊蓝,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情令蛉,我是刑警寧澤聚霜,帶...
    沈念sama閱讀 36,122評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站珠叔,受9級特大地震影響蝎宇,放射性物質發(fā)生泄漏。R本人自食惡果不足惜祷安,卻給世界環(huán)境...
    茶點故事閱讀 41,801評論 3 333
  • 文/蒙蒙 一姥芥、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧汇鞭,春花似錦凉唐、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至读整,卻和暖如春玄坦,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工煎楣, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留豺总,地道東北人。 一個月前我還...
    沈念sama閱讀 48,808評論 3 376
  • 正文 我出身青樓择懂,卻偏偏與公主長得像喻喳,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子困曙,可洞房花燭夜當晚...
    茶點故事閱讀 45,440評論 2 359

推薦閱讀更多精彩內容