手寫 RPC 框架

前言

在微服務(wù)當(dāng)?shù)赖慕裉煳雒植际较到y(tǒng)越來越重要,實(shí)現(xiàn)服務(wù)化首先就要考慮服務(wù)之間的通信問題逃默。這里面涉及序列化鹃愤、反序列化、尋址完域、連接等等問題软吐。不過,有了 RPC 框架吟税,我們就無需苦惱凹耙。

一、什么是 RPC?

RPC(Remote Procedure Call)— 遠(yuǎn)程過程調(diào)用肠仪,是一個(gè)計(jì)算機(jī)通信協(xié)議肖抱。該協(xié)議允許運(yùn)行于一臺(tái)計(jì)算機(jī)的程序調(diào)用另一臺(tái)計(jì)算機(jī)的程序,而程序員無需額外地為這個(gè)交互作用編程异旧。
值得注意是意述,兩個(gè)或多個(gè)應(yīng)用程序都分布在不同的服務(wù)器上,它們之間的調(diào)用都像是本地方法調(diào)用一樣。


RPC框架有很多荤崇,比較知名的如阿里的 Dubbo镐依、google 的 gRPC、Go 語言的 rpcx天试、Apache 的 thrift槐壳。當(dāng)然了,還有Spring Cloud喜每,不過對(duì)于 Spring Cloud 來說务唐,RPC 只是它的一個(gè)功能模塊。

如果要實(shí)現(xiàn)一個(gè)基本功能带兜、簡(jiǎn)單的 RPC枫笛,要涉及哪些東西呢?

  • 動(dòng)態(tài)代理
  • 反射
  • 序列化刚照、反序列化
  • 網(wǎng)絡(luò)通信
  • 編解碼
  • 服務(wù)發(fā)現(xiàn)與注冊(cè)
  • 心跳與鏈路檢測(cè)
  • ......

下面刑巧,我們一起通過代碼來分析,怎么把技術(shù)點(diǎn)串到一起无畔,實(shí)現(xiàn)我們自己的 RPC啊楚。

二、環(huán)境準(zhǔn)備

在開始之前浑彰,筆者先介紹一下所用到的軟件環(huán)境恭理。

SpringBoot、Netty郭变、zookeeper颜价、zkclient、fastjson

  • SpringBoot 項(xiàng)目的基礎(chǔ)框架
  • Netty 通信服務(wù)器
  • zookeeper 服務(wù)發(fā)現(xiàn)與注冊(cè)
  • zkclient zookeeper客戶端
  • fastjson 序列化诉濒、反序列化

三周伦、RPC 生產(chǎn)者

1、服務(wù)接口API

整個(gè) RPC 系統(tǒng)未荒,我們分為生成者和消費(fèi)者专挪。首先他們有一個(gè)共同的服務(wù)接口 API。在這里茄猫,我們搞一個(gè)操作用戶信息的 service 接口狈蚤。

public interface InfoUserService {
    List<InfoUser> insertInfoUser(InfoUser infoUser);
    InfoUser getInfoUserById(String id);
    void deleteInfoUserById(String id);
    String getNameById(String id);
    Map<String,InfoUser> getAllUser();
}

2困肩、服務(wù)類實(shí)現(xiàn)

作為生產(chǎn)者划纽,它當(dāng)然要有實(shí)現(xiàn)類,我們創(chuàng)建InfoUserServiceImpl實(shí)現(xiàn)類锌畸,并用注解把它標(biāo)注為 RPC 的服務(wù)勇劣,然后注冊(cè)到 Srping 的 Bean 容器中。在這里,我們把infoUserMap當(dāng)做數(shù)據(jù)庫比默,存儲(chǔ)用戶信息幻捏。

package com.viewscenes.netsupervisor.service.impl;

@RpcService
public class InfoUserServiceImpl implements InfoUserService {

    Logger logger = LoggerFactory.getLogger(this.getClass());
    //當(dāng)做數(shù)據(jù)庫,存儲(chǔ)用戶信息
    Map<String,InfoUser> infoUserMap = new HashMap<>();

    public List<InfoUser> insertInfoUser(InfoUser infoUser) {
        logger.info("新增用戶信息:{}", JSONObject.toJSONString(infoUser));
        infoUserMap.put(infoUser.getId(),infoUser);
        return getInfoUserList();
    }
    public InfoUser getInfoUserById(String id) {
        InfoUser infoUser = infoUserMap.get(id);
        logger.info("查詢用戶ID:{}",id);
        return infoUser;
    }

    public List<InfoUser> getInfoUserList() {
        List<InfoUser> userList = new ArrayList<>();
        Iterator<Map.Entry<String, InfoUser>> iterator = infoUserMap.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, InfoUser> next = iterator.next();
            userList.add(next.getValue());
        }
        logger.info("返回用戶信息記錄數(shù):{}",userList.size());
        return userList;
    }
    public void deleteInfoUserById(String id) {
        logger.info("刪除用戶信息:{}",JSONObject.toJSONString(infoUserMap.remove(id)));
    }
    public String getNameById(String id){
        logger.info("根據(jù)ID查詢用戶名稱:{}",id);
        return infoUserMap.get(id).getName();
    }
    public Map<String,InfoUser> getAllUser(){
        logger.info("查詢所有用戶信息{}",infoUserMap.keySet().size());
        return infoUserMap;
    }
}

注解@RpcService定義如下:

package com.viewscenes.netsupervisor.annotation;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {}

3命咐、請(qǐng)求信息和返回信息

所有的請(qǐng)求信息和返回信息篡九,我們用兩個(gè) JavaBean 來表示。其中的重點(diǎn)是醋奠,返回信息要帶有請(qǐng)求信息的ID榛臼。

package com.viewscenes.netsupervisor.entity;
public class Request {
    private String id;
    private String className;// 類名
    private String methodName;// 函數(shù)名稱
    private Class<?>[] parameterTypes;// 參數(shù)類型
    private Object[] parameters;// 參數(shù)列表
    get/set ...
}
package com.viewscenes.netsupervisor.entity;
public class Response {
    private String requestId;
    private int code;
    private String error_msg;
    private Object data;
    get/set ...
}

4、Netty 服務(wù)端

Netty 作為高性能的 NIO 通信框架窜司,在很多 RPC 框架中都有它的身影沛善。我們也采用它當(dāng)做通信服務(wù)器。說到這塞祈,我們先看個(gè)配置文件金刁,重點(diǎn)有兩個(gè),zookeeper 的注冊(cè)地址和 Netty 通信服務(wù)器的地址议薪。

#TOMCAT端口
server.port=8001
#zookeeper注冊(cè)地址
registry.address=192.168.174.10:2181
#RPC服務(wù)提供者地址
rpc.server.address=192.168.210.81:18868

為了方便管理尤蛮,我們把它也注冊(cè)成 Bean,同時(shí)實(shí)現(xiàn) ApplicationContextAware 接口斯议,把上面 @RpcService 注解的服務(wù)類撈出來抵屿,緩存起來,供消費(fèi)者調(diào)用捅位。同時(shí)轧葛,作為服務(wù)器,還要對(duì)客戶端的鏈路進(jìn)行心跳檢測(cè)艇搀,超過60秒未讀寫數(shù)據(jù)尿扯,關(guān)閉此連接。

package com.viewscenes.netsupervisor.netty.server;

import com.viewscenes.netsupervisor.annotation.RpcService;
import com.viewscenes.netsupervisor.netty.codec.json.JSONDecoder;
import com.viewscenes.netsupervisor.netty.codec.json.JSONEncoder;
import com.viewscenes.netsupervisor.registry.ServiceRegistry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

@Component
public class NettyServer implements ApplicationContextAware,InitializingBean{

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(4);

    private Map<String, Object> serviceMap = new HashMap<>();

    @Value("${rpc.server.address}")
    private String serverAddress;

    @Autowired
    ServiceRegistry registry;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcService.class);
        for(Object serviceBean:beans.values()){

            Class<?> clazz = serviceBean.getClass();

            Class<?>[] interfaces = clazz.getInterfaces();

            for (Class<?> inter : interfaces){
                String interfaceName = inter.getName();
                logger.info("加載服務(wù)類: {}", interfaceName);
                serviceMap.put(interfaceName, serviceBean);
            }
        }
        logger.info("已加載全部服務(wù)接口:{}", serviceMap);
    }

    public void afterPropertiesSet() throws Exception {
        start();
    }

    public void start(){

        final NettyServerHandler handler = new NettyServerHandler(serviceMap);

        new Thread(() -> {
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup,workerGroup).
                        channel(NioServerSocketChannel.class).
                        option(ChannelOption.SO_BACKLOG,1024).
                        childOption(ChannelOption.SO_KEEPALIVE,true).
                        childOption(ChannelOption.TCP_NODELAY,true).
                        childHandler(new ChannelInitializer<SocketChannel>() {
                            //創(chuàng)建NIOSocketChannel成功后焰雕,在進(jìn)行初始化時(shí)衷笋,將它的ChannelHandler設(shè)置到ChannelPipeline中,用于處理網(wǎng)絡(luò)IO事件
                            protected void initChannel(SocketChannel channel) throws Exception {
                                ChannelPipeline pipeline = channel.pipeline();
                                pipeline.addLast(new IdleStateHandler(0, 0, 60));
                                pipeline.addLast(new JSONEncoder());
                                pipeline.addLast(new JSONDecoder());
                                pipeline.addLast(new HeartBeatHandler());
                                pipeline.addLast(handler);
                            }
                        });

                String[] array = serverAddress.split(":");
                String host = array[0];
                int port = Integer.parseInt(array[1]);
                ChannelFuture cf = bootstrap.bind(host,port).sync();
                logger.info("RPC 服務(wù)器啟動(dòng).監(jiān)聽端口:"+port);
                registry.register(serverAddress);
                //等待服務(wù)端監(jiān)聽端口關(guān)閉
                cf.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }).start();
    }
}

setApplicationContext方法矩屁,將被 @RpcService 注解的服務(wù)類辟宗,存儲(chǔ)在 serviceMap 中。start方法吝秕,啟動(dòng) Netty 服務(wù)端泊脐。new IdleStateHandler(0, 0, 60)檢測(cè)心跳機(jī)制,表示 60s 內(nèi)如果沒有接收到客戶端的讀寫請(qǐng)求烁峭,將走ChannelInboundHandlerAdapter.userEventTriggered方法容客。于是我們自定義HeartBeatHandler心跳處理器秕铛,來重寫userEventTriggered方法,將連接關(guān)閉缩挑。

package com.viewscenes.netsupervisor.netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 用于檢測(cè)channel的心跳handler
 * 繼承ChannelInboundHandlerAdapter但两,從而不需要實(shí)現(xiàn)channelRead0 方法
 * @author K. L. Mao
 * @create 2019/2/22
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    private final Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.ALL_IDLE){
                logger.info("客戶端已超過60秒未讀寫數(shù)據(jù),關(guān)閉連接.{}",ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        }
    }
}

在處理器中的構(gòu)造函數(shù)中,我們先把服務(wù) Bean 的 serviceMap 傳進(jìn)來供置,所有的處理要基于這個(gè) serviceMap 才能找到對(duì)應(yīng)的實(shí)現(xiàn)類谨湘。在channelRead中,獲取請(qǐng)求方法的信息芥丧,然后通過反射調(diào)用方法獲取返回值悲关。

package com.viewscenes.netsupervisor.netty.server;

import com.alibaba.fastjson.JSON;
import com.viewscenes.netsupervisor.entity.Request;
import com.viewscenes.netsupervisor.entity.Response;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.Map;

/**
 * @program: rpc-provider
 * @description: ${description}
 * @author: shiqizhen
 * @create: 2018-11-30 17:27
 **/
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    private final Map<String, Object> serviceMap;

    public NettyServerHandler(Map<String, Object> serviceMap) {
        this.serviceMap = serviceMap;
    }

    public void channelActive(ChannelHandlerContext ctx)   {
        logger.info("客戶端連接成功!"+ctx.channel().remoteAddress());
    }

    public void channelInactive(ChannelHandlerContext ctx)   {
        logger.info("客戶端斷開連接!{}",ctx.channel().remoteAddress());
        ctx.channel().close();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg)   {
        Request request = JSON.parseObject(msg.toString(),Request.class);

        if ("heartBeat".equals(request.getMethodName())) {
            logger.info("客戶端心跳信息..."+ctx.channel().remoteAddress());
        }else{
            logger.info("RPC客戶端請(qǐng)求接口:"+request.getClassName()+"   方法名:"+request.getMethodName());
            Response response = new Response();
            response.setRequestId(request.getId());
            try {
                Object result = this.handler(request);
                response.setData(result);
            } catch (Throwable e) {
                e.printStackTrace();
                response.setCode(1);
                response.setError_msg(e.toString());
                logger.error("RPC Server handle request error",e);
            }
            ctx.writeAndFlush(response);
        }
    }

    /**
     * 通過反射,執(zhí)行本地方法
     * @param request
     * @return
     * @throws Throwable
     */
    private Object handler(Request request) throws Throwable{
        String className = request.getClassName();
        Object serviceBean = serviceMap.get(className);

        if (serviceBean!=null){
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = request.getMethodName();
            Class<?>[] parameterTypes = request.getParameterTypes();
            Object[] parameters = request.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            return method.invoke(serviceBean, getParameters(parameterTypes,parameters));
        }else{
            throw new Exception("未找到服務(wù)接口,請(qǐng)檢查配置!:"+className+"#"+request.getMethodName());
        }
    }

    /**
     * 獲取參數(shù)列表
     * @param parameterTypes
     * @param parameters
     * @return
     */
    private Object[] getParameters(Class<?>[] parameterTypes,Object[] parameters){
        if (parameters==null || parameters.length==0){
            return parameters;
        }else{
            Object[] new_parameters = new Object[parameters.length];
            for(int i=0;i<parameters.length;i++){
                new_parameters[i] = JSON.parseObject(parameters[i].toString(),parameterTypes[i]);
            }
            return new_parameters;
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)   {
        logger.info(cause.getMessage());
        ctx.close();
    }
}

4娄柳、服務(wù)注冊(cè)

我們啟動(dòng)了 Netty 通信服務(wù)器寓辱,并且把服務(wù)實(shí)現(xiàn)類加載到緩存,等待請(qǐng)求時(shí)調(diào)用赤拒。這一步秫筏,我們要進(jìn)行服務(wù)注冊(cè)。為了簡(jiǎn)單化處理挎挖,我們只注冊(cè)通信服務(wù)器的監(jiān)聽地址即可这敬。
在上面代碼中,bind 之后我們執(zhí)行了registry.register(serverAddress);它的作用就是蕉朵,將 Netty 監(jiān)聽的 IP 端口注冊(cè)到 zookeeper崔涂。

package com.viewscenes.netsupervisor.registry;
@Component
public class ServiceRegistry {
    Logger logger = LoggerFactory.getLogger(this.getClass());
    @Value("${registry.address}")
    private String registryAddress;
    private static final String ZK_REGISTRY_PATH = "/rpc";

    public void register(String data) {
        if (data != null) {
            ZkClient client = connectServer();
            if (client != null) {
                AddRootNode(client);
                createNode(client, data);
            }
        }
    }
    //連接zookeeper
    private ZkClient connectServer() {
        ZkClient client = new ZkClient(registryAddress,20000,20000);
        return client;
    }
    //創(chuàng)建根目錄/rpc
    private void AddRootNode(ZkClient client){
        boolean exists = client.exists(ZK_REGISTRY_PATH);
        if (!exists){
            client.createPersistent(ZK_REGISTRY_PATH);
            logger.info("創(chuàng)建zookeeper主節(jié)點(diǎn) {}",ZK_REGISTRY_PATH);
        }
    }
    //在/rpc根目錄下,創(chuàng)建臨時(shí)順序子節(jié)點(diǎn)
    private void createNode(ZkClient client, String data) {
        String path = client.create(ZK_REGISTRY_PATH + "/provider", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("創(chuàng)建zookeeper數(shù)據(jù)節(jié)點(diǎn) ({} => {})", path, data);
    }
}

有一點(diǎn)需要注意始衅,子節(jié)點(diǎn)必須是臨時(shí)節(jié)點(diǎn)冷蚂。這樣,生產(chǎn)者端停掉之后汛闸,才能通知到消費(fèi)者蝙茶,把此服務(wù)從服務(wù)列表中剔除。到此為止诸老,生產(chǎn)者端已經(jīng)完成隆夯。我們看一下它的啟動(dòng)日志:

加載服務(wù)類: com.viewscenes.netsupervisor.service.InfoUserService
已加載全部服務(wù)接口:{com.viewscenes.netsupervisor.service.InfoUserService=com.viewscenes.netsupervisor.service.impl.InfoUserServiceImpl@46cc127b}
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 8001 (http) with context path ''
Started RpcProviderApplication in 2.003 seconds (JVM running for 3.1)
RPC 服務(wù)器啟動(dòng).監(jiān)聽端口:18868
Starting ZkClient event thread.
Socket connection established to node1/192.168.174.10:2181, initiating session
Session establishment complete on server node1/192.168.174.10:2181, sessionid = 0x367835b48970010, negotiated timeout = 4000
zookeeper state changed (SyncConnected)
創(chuàng)建zookeeper主節(jié)點(diǎn) /rpc
創(chuàng)建zookeeper數(shù)據(jù)節(jié)點(diǎn) (/rpc/provider0000000000 => 192.168.210.81:18868)

四、RPC 消費(fèi)者

首先别伏,我們需要把生產(chǎn)者端的服務(wù)接口API蹄衷,即InfoUserService。以相同的目錄放到消費(fèi)者端厘肮。路徑不同愧口,調(diào)用會(huì)找不到的哦(實(shí)際項(xiàng)目上,是通過依賴 jar 包實(shí)現(xiàn)的)轴脐。

1调卑、代理

RPC的目標(biāo)其中有一條抡砂,程序員無需額外地為這個(gè)交互作用編程大咱。所以恬涧,我們?cè)谡{(diào)用的時(shí)候,就像調(diào)用本地方法一樣碴巾。就像下面這樣:

@Controller
public class IndexController {  
    @Autowired
    private InfoUserService userService;
    
    @RequestMapping("getById")
    @ResponseBody
    public InfoUser getById(String id){
        logger.info("根據(jù)ID查詢用戶信息:{}",id);
        return userService.getInfoUserById(id);
    }
}

那么溯捆,問題來了。消費(fèi)者端并沒有此接口的實(shí)現(xiàn)厦瓢,怎么調(diào)用到的呢提揍?這里,首先就是代理煮仇。這里用的是 Spring 的工廠 Bean 機(jī)制創(chuàng)建的代理對(duì)象(JDK 動(dòng)態(tài)代理)劳跃,類似于 MyBatis 中的 Mapper 接口的調(diào)用。

首先浙垫,創(chuàng)建代理類(必須實(shí)現(xiàn)InvocationHandler):

package com.viewscenes.netsupervisor.configurer.rpc;

@Component
public class RpcFactory implements InvocationHandler {

    @Autowired
    private NettyClient client;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request request = new Request();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setParameterTypes(method.getParameterTypes());
        request.setId(IdUtil.getId());

        Object result = client.send(request);
        Class<?> returnType = method.getReturnType();

        Response response = JSON.parseObject(result.toString(), Response.class);
        if (response.getCode()==1){
            throw new Exception(response.getError_msg());
        }
        if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)){
            return response.getData();
        }else if (Collection.class.isAssignableFrom(returnType)){
            return JSONArray.parseArray(response.getData().toString(),Object.class);
        }else if(Map.class.isAssignableFrom(returnType)){
            return JSON.parseObject(response.getData().toString(),Map.class);
        }else{
            Object data = response.getData();
            return JSONObject.parseObject(data.toString(), returnType);
        }
    }
}

這個(gè)代理類的invoke方法刨仑,會(huì)將客戶端的Request發(fā)送給Netty 服務(wù)端,并接收服務(wù)端的返回值夹姥。

定義一個(gè) RPC 工廠 Bean:

package com.viewscenes.netsupervisor.configurer.rpc;

import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.AbstractBeanFactory;
import org.springframework.context.support.AbstractApplicationContext;

import java.lang.reflect.Proxy;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
public class RpcFactoryBean<T> implements FactoryBean<T> {

    private Class<T> rpcInterface;

    @Autowired
    private RpcFactory<T> factory;

    /**
     * {@link AbstractApplicationContext#registerBeanPostProcessors(org.springframework.beans.factory.config.ConfigurableListableBeanFactory)}
     * 會(huì)通過 rpcInterface 實(shí)例化 RpcFactoryBean
     * @param rpcInterface
     */
    public RpcFactoryBean(Class<T> rpcInterface) {
        this.rpcInterface = rpcInterface;
    }

    /**
     * 把 Bean 的定義 GenericBeanDefinition 放到了容器之后杉武,就需要初始化這些 Bean,
     * 而 Bean 的初始化時(shí)機(jī)有2個(gè):
     *      1、在程序第一個(gè)主動(dòng)調(diào)用 getBean 的時(shí)候
     *      2辙售、在完成容器初始化的時(shí)候會(huì)初始化 lazy-init 配置為 false 的Bean(默認(rèn)為false)
     * 在這里轻抱,由于 RpcFactoryBean 未設(shè)置懶加載,故初始化的時(shí)機(jī)是第二種旦部。上面兩種初始化的過程都是一樣的祈搜,
     * 都會(huì)調(diào)用 {@link AbstractBeanFactory#doGetBean(java.lang.String, java.lang.Class, java.lang.Object[], boolean) 方法,
     * 里面有個(gè)方法 getObjectForBeanInstance士八,會(huì)判斷當(dāng)前的 Bean 是否實(shí)現(xiàn)了 {@link FactoryBean}夭问。
     * 如果該 Bean 未實(shí)現(xiàn) FactoryBean 接口,則直接返回該Bean實(shí)例曹铃;
     * 如果該 Bean 實(shí)現(xiàn)了 FactoryBean 接口缰趋,則會(huì)返回的實(shí)例是 getObject() 返回值。
     * @return
     * @throws Exception
     */
    public T getObject() throws Exception {
        return getRpc();
    }

    public Class<?> getObjectType() {
        return this.rpcInterface;
    }

    public boolean isSingleton() {
        return true;
    }

    /**
     * JDK 動(dòng)態(tài)代理陕见,當(dāng)調(diào)用 rpcInterface 接口的方法時(shí)秘血,會(huì)走 factory 的 invoke 方法
     * @param <T>
     * @return
     */
    public <T> T getRpc() {
        return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] { rpcInterface },factory);
    }
}

該類實(shí)現(xiàn)了FactoryBean接口,表示這個(gè)類是工廠 Bean评甜,它在 Spring 容器存放的實(shí)例不是類本身灰粮,而是getObject的返回值。這里聲明了一個(gè)參數(shù)構(gòu)造器忍坷,目的是在程序啟動(dòng)過程中粘舟,調(diào)用AbstractApplicationContext.refresh方法熔脂,refresh方法里面會(huì)走registerBeanPostProcessors方法,該方法會(huì)通過反射柑肴,把rpcInterface傳過來實(shí)例化 RpcFactoryBean霞揉。

接下來,我們就要定義一個(gè)路徑掃描類晰骑,來掃描指定路徑下的接口适秩,生成 Bean 的定義 BeanDefinition,并放進(jìn)容器硕舆。

package com.viewscenes.netsupervisor.configurer.rpc;

import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import org.springframework.core.type.classreading.MetadataReader;
import org.springframework.core.type.classreading.MetadataReaderFactory;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.core.type.filter.TypeFilter;

import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.Set;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
public class ClassPathRpcScanner extends ClassPathBeanDefinitionScanner{

    public ClassPathRpcScanner(BeanDefinitionRegistry registry) {
        super(registry);
    }

    public Set<BeanDefinitionHolder> doScan(String... basePackages) {
        // 獲取指定路徑下的 beanDefinitions
        Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);

        if (beanDefinitions.isEmpty()) {
            logger.warn("No RPC mapper was found in '"
                    + Arrays.toString(basePackages)
                    + "' package. Please check your configuration.");
        } else {
            // 對(duì) beanDefinitions 進(jìn)行注冊(cè)
            processBeanDefinitions(beanDefinitions);
        }

        return beanDefinitions;
    }

    /**
     * 方法會(huì)根據(jù)配置的屬性生成對(duì)應(yīng)的過濾器秽荞,然后這些過濾器在掃描的時(shí)候會(huì)起作用。
     */
    public void registerFilters() {
        // default include filter that accepts all classes 接收所有接口
        addIncludeFilter((metadataReader, metadataReaderFactory) ->
                true);

        // exclude package-info.java
        addExcludeFilter((metadataReader, metadataReaderFactory) -> {
            String className = metadataReader.getClassMetadata()
                    .getClassName();
            return className.endsWith("package-info");
        });
    }

    private void processBeanDefinitions(
            Set<BeanDefinitionHolder> beanDefinitions) {

        GenericBeanDefinition definition;

        for (BeanDefinitionHolder holder : beanDefinitions) {

            definition = (GenericBeanDefinition) holder.getBeanDefinition();
            definition.getConstructorArgumentValues().addGenericArgumentValue(definition.getBeanClassName());
            // 設(shè)置接口的 beanClass 都為 RpcFactoryBean<?>抚官,因?yàn)?RpcFactoryBean 實(shí)現(xiàn)了 FactoryBean 接口扬跋,這樣初始化 Bean 時(shí)就會(huì)調(diào)用 getObject 方法
            definition.setBeanClass(RpcFactoryBean.class);
            // 設(shè)置BeanDefinition自動(dòng)注入類型,這樣就能被 Spring 管理了
            definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
            System.out.println(holder);
        }
    }

    @Override
    protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
        return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
    }
}

doScan方法凌节,是掃描出指定路徑下的BeanDefinitionHolder钦听,然后執(zhí)行processBeanDefinitions(beanDefinitions),對(duì)BeanDefinitionHolder進(jìn)行注冊(cè)(放入容器管理)刊咳。
registerFilters方法的目的是彪见,根據(jù)配置的屬性生成對(duì)應(yīng)的過濾器,然后這些過濾器在掃描的時(shí)候會(huì)起作用娱挨。本案例中由于沒有配置任何屬性余指,故生成接收所有接口的過濾器。

接下來就是要將掃描器ClassPathRpcScanner放入配置類跷坝,讓其生效了酵镜。

package com.viewscenes.netsupervisor.configurer.rpc;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
@Component
public class RpcScannerConfigurer implements BeanDefinitionRegistryPostProcessor {

    // 這個(gè)可以存在配置文件
    String basePackage = "com.viewscenes.netsupervisor.service";

    /**
     * 掃描指定路徑下的接口,生成 Bean 的定義 GenericBeanDefinition柴钻,并放到了容器
     * @param beanDefinitionRegistry
     * @throws BeansException
     */
    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
        ClassPathRpcScanner scanner = new ClassPathRpcScanner(beanDefinitionRegistry);

        scanner.registerFilters();

        scanner.scan(StringUtils.tokenizeToStringArray(this.basePackage, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS));
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {

    }
}

該配置類實(shí)現(xiàn)BeanDefinitionRegistryPostProcessor淮韭,重寫postProcessBeanDefinitionRegistry方法。然后引入ClassPathRpcScanner贴届,調(diào)用其registerFiltersscan方法進(jìn)行BeanDefinition的注冊(cè)靠粪。

注冊(cè)完成之后,只是把 Bean 的定義BeanDefinition放到了容器毫蚓,還沒有初始化這些 Bean占键,而 Bean 的初始化時(shí)機(jī)有2個(gè):

  • 1、在程序第一個(gè)主動(dòng)調(diào)用getBean的時(shí)候
  • 2元潘、在完成容器初始化的時(shí)候會(huì)初始化 lazy-init 配置為 false 的Bean(默認(rèn)為false)

由于RpcFactoryBean未設(shè)置懶加載畔乙,故容器初始化完成的時(shí)候就會(huì)初始化RpcFactoryBean。初始化的過程中翩概,會(huì)調(diào)用AbstractBeanFactory.getBean方法牲距,這就涉及到 Spring IOC 中返咱,創(chuàng)建 Bean 的過程,直接上源碼:

protected <T> T doGetBean(
            final String name, final Class<T> requiredType, final Object[] args, boolean typeCheckOnly)
            throws BeansException {
         // 轉(zhuǎn)化beanName牍鞠,因?yàn)?FactoryBean 是以“&”前綴的咖摹,需要去掉
        final String beanName = transformedBeanName(name);
        Object bean;

        // 創(chuàng)建單例的 Bean
        Object sharedInstance = getSingleton(beanName);
        if (sharedInstance != null && args == null) {
            if (logger.isDebugEnabled()) {
                if (isSingletonCurrentlyInCreation(beanName)) {
                    logger.debug("Returning eagerly cached instance of singleton bean '" + beanName +
                            "' that is not fully initialized yet - a consequence of a circular reference");
                }
                else {
                    logger.debug("Returning cached instance of singleton bean '" + beanName + "'");
                }
            }
            // 根據(jù)實(shí)例信息獲取真正的實(shí)例,因?yàn)?FactotyBean 的實(shí)例不是 sharedInstance皮服,而是其 getObect() 的返回值
            bean = getObjectForBeanInstance(sharedInstance, name, beanName, null);
        }
        ......

        // 校驗(yàn)類型與實(shí)例是否匹配
        if (requiredType != null && bean != null && !requiredType.isInstance(bean)) {
            try {
                return getTypeConverter().convertIfNecessary(bean, requiredType);
            }
            catch (TypeMismatchException ex) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Failed to convert bean '" + name + "' to required type '" +
                            ClassUtils.getQualifiedName(requiredType) + "'", ex);
                }
                throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
            }
        }
        return (T) bean;
    }

doGetBean里面有個(gè)方法 getObjectForBeanInstance楞艾,會(huì)判斷當(dāng)前的 Bean 是否實(shí)現(xiàn)了FactoryBean参咙。
如果該 Bean 未實(shí)現(xiàn)FactoryBean接口龄广,則直接返回該 Bean 實(shí)例;
如果該 Bean 實(shí)現(xiàn)了FactoryBean接口蕴侧,則會(huì)返回的實(shí)例是getObject()返回值择同。

protected Object getObjectForBeanInstance(
            Object beanInstance, String name, String beanName, RootBeanDefinition mbd) {

        // 非 FactoryBean 不能以“&”為前綴
        if (BeanFactoryUtils.isFactoryDereference(name) && !(beanInstance instanceof FactoryBean)) {
            throw new BeanIsNotAFactoryException(transformedBeanName(name), beanInstance.getClass());
        }

        // 驗(yàn)證是否是 FactoryBean,如果不是净宵,直接返回實(shí)例
        if (!(beanInstance instanceof FactoryBean) || BeanFactoryUtils.isFactoryDereference(name)) {
            return beanInstance;
        }

        Object object = null;
        if (mbd == null) {
            object = getCachedObjectForFactoryBean(beanName);
        }
        if (object == null) {
            // Return bean instance from factory.
            FactoryBean<?> factory = (FactoryBean<?>) beanInstance;
            // Caches object obtained from FactoryBean if it is a singleton.
            if (mbd == null && containsBeanDefinition(beanName)) {
                mbd = getMergedLocalBeanDefinition(beanName);
            }
            boolean synthetic = (mbd != null && mbd.isSynthetic());
            // 返回 FactoryBean 實(shí)例
            object = getObjectFromFactoryBean(factory, beanName, !synthetic);
        }
        return object;
    }

由于我們的RpcFactoryBean實(shí)現(xiàn)了FactoryBean敲才,故其初始化的過程中,返回的實(shí)例是getObject()的返回值择葡。我們可以看到紧武,getObject()的實(shí)現(xiàn)使用了 JDK 動(dòng)態(tài)代理(T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] { rpcInterface },factory),返回值為被代理對(duì)象rpcInterface的實(shí)例敏储。于是阻星,當(dāng)rpcInterface接口調(diào)用其方法時(shí),就會(huì)走RpcFactory.invoke方法已添。在這里妥箕,封裝請(qǐng)求信息,然后調(diào)用 Netty 的客戶端方法發(fā)送消息更舞。然后根據(jù)方法返回值類型畦幢,轉(zhuǎn)成相應(yīng)的對(duì)象返回。

2缆蝉、服務(wù)發(fā)現(xiàn)

在生產(chǎn)者端宇葱,我們把服務(wù) IP、端口都注冊(cè)到 zookeeper 中刊头,所以這里黍瞧,我們要去拿到服務(wù)地址,然后通過 Netty 連接芽偏。重要的是雷逆,還要對(duì)根目錄進(jìn)行監(jiān)聽子節(jié)點(diǎn)數(shù)據(jù)變化,這樣隨著生產(chǎn)者的上線和下線污尉,消費(fèi)者端可以及時(shí)感知膀哲。

package com.viewscenes.netsupervisor.connection;

@Component
public class ServiceDiscovery {

    @Value("${registry.address}")
    private String registryAddress;
    @Autowired
    ConnectManage connectManage;

    // 服務(wù)地址列表
    private volatile List<String> addressList = new ArrayList<>();
    private static final String ZK_REGISTRY_PATH = "/rpc";
    private ZkClient client;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @PostConstruct
    public void init(){
        client = connectServer();
        if (client != null) {
            watchNode(client);
        }
    }
    
    //連接zookeeper
    private ZkClient connectServer() {
        ZkClient client = new ZkClient(registryAddress,30000,30000);
        return client;
    }
    //監(jiān)聽子節(jié)點(diǎn)變化(子節(jié)點(diǎn)的增加和刪除)
    private void watchNode(final ZkClient client) {
        List<String> nodeList = client.subscribeChildChanges(ZK_REGISTRY_PATH, (s, nodes) -> {
            logger.info("監(jiān)聽到子節(jié)點(diǎn)變化{}",JSONObject.toJSONString(nodes));
            addressList.clear();
            getNodeData(nodes);
            updateConnectedServer();
        });
        getNodeData(nodeList);
        logger.info("已發(fā)現(xiàn)服務(wù)列表...{}", JSONObject.toJSONString(addressList));
        updateConnectedServer();
    }
    //連接生產(chǎn)者端服務(wù)
    private void updateConnectedServer(){
        connectManage.updateConnectServer(addressList);
    }

    private void getNodeData(List<String> nodes){
        logger.info("/rpc子節(jié)點(diǎn)數(shù)據(jù)為:{}", JSONObject.toJSONString(nodes));
        for(String node:nodes){
            String address = client.readData(ZK_REGISTRY_PATH+"/"+node);
            addressList.add(address);
        }
    }
}

其中往产,connectManage.updateConnectServer(addressList);就是根據(jù)服務(wù)地址,去連接生產(chǎn)者端的 Netty 服務(wù)某宪。然后創(chuàng)建一個(gè) Channel 列表仿村,在發(fā)送消息的時(shí)候,從中選取一個(gè) Channel 和生產(chǎn)者端進(jìn)行通信兴喂。

3蔼囊、Netty 客戶端

Netty 客戶端有兩個(gè)方法比較重要,一個(gè)是doConnect:根據(jù)IP衣迷、端口連接服務(wù)器畏鼓,返回Channel,加入到連接管理器壶谒;一個(gè)是send:用 Channel 發(fā)送請(qǐng)求數(shù)據(jù)云矫。同時(shí),作為客戶端汗菜,空閑的時(shí)候還要往服務(wù)端發(fā)送心跳信息让禀。

package com.viewscenes.netsupervisor.netty.client;

import com.alibaba.fastjson.JSONArray;
import com.viewscenes.netsupervisor.connection.ConnectManage;
import com.viewscenes.netsupervisor.entity.Request;
import com.viewscenes.netsupervisor.entity.Response;
import com.viewscenes.netsupervisor.netty.codec.json.JSONDecoder;
import com.viewscenes.netsupervisor.netty.codec.json.JSONEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
@Component
public class NettyClient {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    private EventLoopGroup group = new NioEventLoopGroup(1);
    private Bootstrap bootstrap = new Bootstrap();

    @Autowired
    NettyClientHandler clientHandler;

    @Autowired
    ConnectManage connectManage;


    public NettyClient(){
        bootstrap.group(group).
                channel(NioSocketChannel.class).
                option(ChannelOption.TCP_NODELAY, true).
                option(ChannelOption.SO_KEEPALIVE,true).
                handler(new ChannelInitializer<SocketChannel>() {
                    //創(chuàng)建NIOSocketChannel成功后,在進(jìn)行初始化時(shí)陨界,將它的ChannelHandler設(shè)置到ChannelPipeline中巡揍,用于處理網(wǎng)絡(luò)IO事件
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new IdleStateHandler(0, 0, 30));
                        pipeline.addLast(new JSONEncoder());
                        pipeline.addLast(new JSONDecoder());
                        pipeline.addLast(new HeartBeatHandler());
                        pipeline.addLast(clientHandler);
                    }
                });
    }

    @PreDestroy
    public void destroy(){
        logger.info("RPC客戶端退出,釋放資源!");
        group.shutdownGracefully();
    }

    public Object send(Request request) throws InterruptedException{

        Channel channel = connectManage.chooseChannel();
        if (channel!=null && channel.isActive()) {
            SynchronousQueue<Object> queue = clientHandler.sendRequest(request,channel);
            Object result = queue.take();
            return JSONArray.toJSONString(result);
        }else{
            Response res = new Response();
            res.setCode(1);
            res.setError_msg("未正確連接到服務(wù)器.請(qǐng)檢查相關(guān)配置信息!");
            return JSONArray.toJSONString(res);
        }
    }
    public Channel doConnect(SocketAddress address) throws InterruptedException {
        ChannelFuture future = bootstrap.connect(address);
        Channel channel = future.sync().channel();
        return channel;
    }
}

這里,我們依然定義了一個(gè)心跳機(jī)制處理器HeartBeatHandler菌瘪,目的是在new IdleStateHandler(0, 0, 30)約定的30s內(nèi)客戶端未與服務(wù)端發(fā)生通信腮敌,為了告訴服務(wù)端該客戶端依然正常工作(因?yàn)榉?wù)端心跳檢測(cè)是60s),則客戶端需要發(fā)送心跳包給服務(wù)端麻车。

package com.viewscenes.netsupervisor.netty.client;

import com.viewscenes.netsupervisor.entity.Request;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 用于檢測(cè)channel的心跳handler
 * 繼承ChannelInboundHandlerAdapter缀皱,從而不需要實(shí)現(xiàn)channelRead0 方法
 * @author K. L. Mao
 * @create 2019/2/22
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    private final Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        logger.info("已超過30秒未與RPC服務(wù)器進(jìn)行讀寫操作!將發(fā)送心跳消息...");
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.ALL_IDLE){
                Request request = new Request();
                request.setMethodName("heartBeat");
                ctx.channel().writeAndFlush(request);
            }
        }
    }
}

我們必須重點(diǎn)關(guān)注send方法,它是在代理對(duì)象invoke方法調(diào)用到的动猬。首先從連接器中輪詢選擇一個(gè) Channel啤斗,然后發(fā)送數(shù)據(jù)。但是赁咙,Netty 是異步操作钮莲,我們還要轉(zhuǎn)為同步,就是說要等待生產(chǎn)者端返回?cái)?shù)據(jù)才往下執(zhí)行彼水。筆者在這里用的是同步隊(duì)列SynchronousQueue崔拥,它的take方法會(huì)阻塞在這里,直到里面有數(shù)據(jù)可讀凤覆。然后在處理器中链瓦,拿到返回信息寫到隊(duì)列中,take方法返回。

package com.viewscenes.netsupervisor.netty.client;

import com.alibaba.fastjson.JSON;
import com.viewscenes.netsupervisor.connection.ConnectManage;
import com.viewscenes.netsupervisor.entity.Request;
import com.viewscenes.netsupervisor.entity.Response;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    NettyClient client;

    @Autowired
    ConnectManage connectManage;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    private ConcurrentHashMap<String,SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();

    public void channelActive(ChannelHandlerContext ctx) {
        logger.info("已連接到RPC服務(wù)器.{}",ctx.channel().remoteAddress());
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        InetSocketAddress address =(InetSocketAddress) ctx.channel().remoteAddress();
        logger.info("與RPC服務(wù)器斷開連接."+address);
        ctx.channel().close();
        connectManage.removeChannel(ctx.channel());
    }

    /**
     * 接收服務(wù)端返回信息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
        Response response = JSON.parseObject(msg.toString(),Response.class);
        String requestId = response.getRequestId();
        SynchronousQueue<Object> queue = queueMap.get(requestId);
        queue.put(response);
        queueMap.remove(requestId);
    }

    public SynchronousQueue<Object> sendRequest(Request request,Channel channel) {
        SynchronousQueue<Object> queue = new SynchronousQueue<>();
        queueMap.put(request.getId(), queue);
        channel.writeAndFlush(request);
        return queue;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        logger.info("RPC通信服務(wù)器發(fā)生異常.{}",cause);
        ctx.channel().close();
    }
}

至此慈俯,消費(fèi)者端也基本完成兔朦。同樣的蜡感,我們先看一下啟動(dòng)日志:

Waiting for keeper state SyncConnected
Opening socket connection to server 192.168.174.10:2181. Will not attempt to authenticate using SASL (unknown error)
Socket connection established to 192.168.174.10:2181, initiating session
Session establishment complete on server 192.168.174.10:2181, sessionid = 0x100000273ba002c, negotiated timeout = 20000
zookeeper state changed (SyncConnected)
/rpc子節(jié)點(diǎn)數(shù)據(jù)為:["provider0000000015"]
已發(fā)現(xiàn)服務(wù)列表...["192.168.210.81:18868"]
加入Channel到連接管理器./192.168.100.74:18868
已連接到RPC服務(wù)器./192.168.210.81:18868
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 7002 (http) with context path ''
Started RpcConsumerApplication in 4.218 seconds (JVM running for 5.569)

五、總結(jié)

本文簡(jiǎn)單介紹了 RPC 的整個(gè)流程是晨,如果你正在學(xué)習(xí) RPC 的相關(guān)知識(shí)媳瞪,可以根據(jù)文中的例子缝裤,自己實(shí)現(xiàn)一遍刻肄。相信寫完之后阐污,你會(huì)對(duì) RPC 會(huì)有更深一些的認(rèn)識(shí)。

生產(chǎn)者端流程:

  • 加載服務(wù)突梦,并緩存
  • 啟動(dòng)通訊服務(wù)器(Netty)
  • 服務(wù)注冊(cè)(把通訊地址放入 zookeeper诫舅,也可以把加載到的服務(wù)也放進(jìn)去)
  • 反射,本地調(diào)用

消費(fèi)者端流程:

  • 代理服務(wù)接口
  • 服務(wù)發(fā)現(xiàn)(連接 zookeeper阳似,拿到服務(wù)地址列表)
  • 遠(yuǎn)程調(diào)用(輪詢生產(chǎn)者服務(wù)列表骚勘,發(fā)送消息)

消費(fèi)端調(diào)用服務(wù)流程:由于消費(fèi)端使用了 JDK 動(dòng)態(tài)代理(默認(rèn)是使用 javassist 生成字節(jié)碼包做的代理)铐伴,代理了服務(wù)接口撮奏,于是當(dāng)調(diào)用服務(wù)接口時(shí),會(huì)走代理類的invoke方法当宴,invoke方法會(huì)將接口信息通過 Netty 發(fā)送給服務(wù)端畜吊,服務(wù)端首先通過接口名找到其實(shí)現(xiàn)類(內(nèi)部保存了映射關(guān)系),然后通過反射執(zhí)行本地實(shí)現(xiàn)類的方法户矢,最后將返回結(jié)果通過 Netty 發(fā)送給消費(fèi)端玲献。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市梯浪,隨后出現(xiàn)的幾起案子捌年,更是在濱河造成了極大的恐慌,老刑警劉巖挂洛,帶你破解...
    沈念sama閱讀 206,482評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件礼预,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡虏劲,警方通過查閱死者的電腦和手機(jī)托酸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來柒巫,“玉大人励堡,你說我怎么就攤上這事”ぬ停” “怎么了应结?”我有些...
    開封第一講書人閱讀 152,762評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)泉唁。 經(jīng)常有香客問我鹅龄,道長(zhǎng)币狠,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,273評(píng)論 1 279
  • 正文 為了忘掉前任砾层,我火速辦了婚禮漩绵,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘肛炮。我一直安慰自己止吐,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,289評(píng)論 5 373
  • 文/花漫 我一把揭開白布侨糟。 她就那樣靜靜地躺著碍扔,像睡著了一般。 火紅的嫁衣襯著肌膚如雪秕重。 梳的紋絲不亂的頭發(fā)上不同,一...
    開封第一講書人閱讀 49,046評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音溶耘,去河邊找鬼二拐。 笑死,一個(gè)胖子當(dāng)著我的面吹牛凳兵,可吹牛的內(nèi)容都是我干的百新。 我是一名探鬼主播,決...
    沈念sama閱讀 38,351評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼庐扫,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼饭望!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起形庭,我...
    開封第一講書人閱讀 36,988評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤铅辞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后萨醒,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體斟珊,經(jīng)...
    沈念sama閱讀 43,476評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,948評(píng)論 2 324
  • 正文 我和宋清朗相戀三年验靡,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了倍宾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,064評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡胜嗓,死狀恐怖高职,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情辞州,我是刑警寧澤怔锌,帶...
    沈念sama閱讀 33,712評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站,受9級(jí)特大地震影響埃元,放射性物質(zhì)發(fā)生泄漏涝涤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,261評(píng)論 3 307
  • 文/蒙蒙 一岛杀、第九天 我趴在偏房一處隱蔽的房頂上張望阔拳。 院中可真熱鬧,春花似錦类嗤、人聲如沸糊肠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽货裹。三九已至,卻和暖如春精偿,著一層夾襖步出監(jiān)牢的瞬間弧圆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評(píng)論 1 262
  • 我被黑心中介騙來泰國打工笔咽, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留搔预,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,511評(píng)論 2 354
  • 正文 我出身青樓拓轻,卻偏偏與公主長(zhǎng)得像斯撮,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子扶叉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,802評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 在分析RMI原理一文中,我們知道RMI是通過底層封裝TCP網(wǎng)絡(luò)通信實(shí)現(xiàn)帕膜≡嫜酰基于此思路本文從以下切入點(diǎn)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的R...
    匠丶閱讀 2,921評(píng)論 1 4
  • 在分布式服務(wù)框架中,一個(gè)最基礎(chǔ)的問題就是遠(yuǎn)程服務(wù)是怎么通訊的垮刹,在Java領(lǐng)域中有很多可實(shí)現(xiàn)遠(yuǎn)程通訊的技術(shù)达吞,例如:R...
    java菜閱讀 980評(píng)論 0 2
  • 八期行動(dòng)星球:第12天。QQ 115 目標(biāo):1.步行5000步 完成程度:目標(biāo)已完成荒典。
    靜心小站閱讀 145評(píng)論 0 0
  • 學(xué)校是我們學(xué)習(xí)和生活的地方酪劫,學(xué)校又成為校園是美麗的。 走進(jìn)學(xué)校大門寺董,首先映入眼簾的是正面的一棵棵高大的松樹覆糟,像忠實(shí)...
    許雅晶閱讀 315評(píng)論 0 0
  • 1.付出不亞于任何人的努力 2.要謙虛,不要驕傲 3.要每天反省 4.活著遮咖,就要感謝 5.積善行滩字,思利他 6.不要...
    六項(xiàng)精進(jìn)阿晉閱讀 194評(píng)論 0 0