源碼閱讀筆記:分布式服務(wù)框架XXL-RPC(基于1.4.1)todo

前言:接上篇若治,看完了注冊中心破加,該看看RPC框架了——《分布式服務(wù)框架XXL-RPC》

老樣子,想看看它自己怎么吹的

1.1 概述>

XXL-RPC 是一個分布式服務(wù)框架武学,提供穩(wěn)定高性能的RPC遠程服務(wù)調(diào)用功能绳泉。擁有"高性能逊抡、分布式、注冊中心零酪、負載均衡冒嫡、服務(wù)治理"等特性。現(xiàn)已開放源代碼四苇,開箱即用孝凌。>

1.2 特性>

  • 1、快速接入:接入步驟非常簡潔月腋,兩分鐘即可上手蟀架;
  • 2、服務(wù)透明:系統(tǒng)完整的封裝了底層通信細節(jié)榆骚,開發(fā)時調(diào)用遠程服務(wù)就像調(diào)用本地服務(wù)片拍,在提供遠程調(diào)用能力時不損失本地調(diào)用的語義簡潔性;
  • 3寨躁、多調(diào)用方案:支持 SYNC穆碎、ONEWAY、FUTURE职恳、CALLBACK 等方案;
  • 4方面、多通訊方案:支持 TCP 和 HTTP 兩種通訊方式進行服務(wù)調(diào)用放钦;其中 TCP 提供可選方案 NETTY 或 MINA ,HTTP 提供可選方案 NETTY_HTTP 或 Jetty恭金;
  • 5操禀、多序列化方案:支持 HESSIAN、HESSIAN1横腿、PROTOSTUFF颓屑、KRYO斤寂、JACKSON 等方案;
  • 6揪惦、負載均衡/軟負載:提供豐富的負載均衡策略遍搞,包括:輪詢、隨機器腋、LRU溪猿、LFU、一致性HASH等纫塌;
  • 7诊县、注冊中心:可選組件,支持服務(wù)注冊并動態(tài)發(fā)現(xiàn)措左;可選擇不啟用依痊,直接指定服務(wù)提供方機器地址通訊;選擇啟用時怎披,內(nèi)置可選方案:“XXL-REGISTRY 輕量級注冊中心”(推薦)抗悍、“ZK注冊中心”、“Local注冊中心”等钳枕;
  • 8缴渊、服務(wù)治理:提供服務(wù)治理中心,可在線管理注冊的服務(wù)信息鱼炒,如服務(wù)鎖定衔沼、禁用等;
  • 9昔瞧、服務(wù)監(jiān)控:可在線監(jiān)控服務(wù)調(diào)用統(tǒng)計信息以及服務(wù)健康狀況等(計劃中)指蚁;
  • 10、容錯:服務(wù)提供方集群注冊時自晰,某個服務(wù)節(jié)點不可用時將會自動摘除凝化,同時消費方將會移除失效節(jié)點將流量分發(fā)到其余節(jié)點,提高系統(tǒng)容錯能力酬荞。
  • 11搓劫、解決1+1問題:傳統(tǒng)分布式通訊一般通過nginx或f5做集群服務(wù)的流量負載均衡,每次請求在到達目標服務(wù)機器之前都需要經(jīng)過負載均衡機器混巧,即1+1枪向,這將會把流量放大一倍。而XXL-RPC將會從消費方直達服務(wù)提供方咧党,每次請求直達目標機器秘蛔,從而可以避免上述問題;
  • 12、高兼容性:得益于優(yōu)良的兼容性與模塊化設(shè)計深员,不限制外部框架负蠕;除 spring/springboot 環(huán)境之外,理論上支持運行在任何Java代碼中倦畅,甚至main方法直接啟動運行遮糖;
  • 13、泛化調(diào)用:服務(wù)調(diào)用方不依賴服務(wù)方提供的API滔迈;

還是老套路止吁,直接代碼下下來,跑個demo看看(ps:注冊中心它自己推薦的XXL-REGISTRY)
先看看provider的代碼燎悍。provider提供了一個簡單的sayHi方法敬惦,代碼如下。

@XxlRpcService
@Service
public class DemoServiceImpl implements DemoService {
    private static Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);

    @Override
    public UserDTO sayHi(String name) {

        String word = MessageFormat.format("Hi {0}, from {1} as {2}",
                name, DemoServiceImpl.class.getName(), String.valueOf(System.currentTimeMillis()));

        if ("error".equalsIgnoreCase(name)) throw new RuntimeException("test exception.");

        UserDTO userDTO = new UserDTO(name, word);
        logger.info(userDTO.toString());

        return userDTO;
    }

}

結(jié)著按照說明稍微配置一下注冊中心谈山,provider走你俄删!
成功啟動后可以在注冊中心看到剛剛啟動的provider


注冊中心

點開編輯看看


編輯

很好,看起來非常美好奏路,注冊key就是接口畴椰,注冊信息就是provider的url,看起來非常ok鸽粉。
那么我們再把consumer跑起來看看斜脂,配置同一個注冊中心,走你触机!
控制臺表示成功運行起來了帚戳。那么我們看看怎么測試一下遠程調(diào)用。
翻一翻consumer代碼儡首,看到一個controller片任,然后在controller里面會調(diào)用遠程方法。

@Controller
public class IndexController {
    
    @XxlRpcReference
    private DemoService demoService;


    @RequestMapping("")
    @ResponseBody
    public UserDTO http(String name) {

        try {
            return demoService.sayHi(name);
        } catch (Exception e) {
            e.printStackTrace();
            return new UserDTO(null, e.getMessage());
        }
    }

}

ok蔬胯,讓我們打開瀏覽器試一試对供。和預(yù)期的一樣,得到了provider的response


瀏覽器直接調(diào)用

這樣一個最簡單的遠程調(diào)用就完成了氛濒。
那么接下來产场,就該看看這些功能是怎么實現(xiàn)的了。先帖個框架自己對rpc的描述

4.4 RPC工作原理剖析

rpc原理

概念
1泼橘、serialization:序列化涝动,通訊數(shù)據(jù)需要經(jīng)過序列化,從而支持在網(wǎng)絡(luò)中傳輸;
2、deserialization:反序列化尝偎,服務(wù)接受到序列化的請求數(shù)據(jù),需要序列化為底層原始數(shù)據(jù)米愿;
3、stub:體現(xiàn)在XXL-RPC為服務(wù)的api接口鼻吮;
4育苟、skeleton:體現(xiàn)在XXL-RPC為服務(wù)的實現(xiàn)api接口的具體服務(wù);
5椎木、proxy:根據(jù)遠程服務(wù)的stub生成的代理服務(wù)违柏,對開發(fā)人員透明;
6香椎、provider:遠程服務(wù)的提供方漱竖;
7、consumer:遠程服務(wù)的消費方畜伐;

RPC通訊馍惹,可大致劃分為四個步驟,可參考上圖進行理解:(XXL-RPC提供了多種調(diào)用方案玛界,此處以 “SYNC” 方案為例講解万矾;)>
1、consumer發(fā)起請求:consumer會根據(jù)遠程服務(wù)的stub實例化遠程服務(wù)的代理服務(wù)慎框,在發(fā)起請求時良狈,代理服務(wù)會封裝本次請求相關(guān)底層數(shù)據(jù),如服務(wù)iface笨枯、methos薪丁、params等等,然后將數(shù)據(jù)經(jīng)過serialization之后發(fā)送給provider猎醇;
2窥突、provider接收請求:provider接收到請求數(shù)據(jù),首先會deserialization獲取原始請求數(shù)據(jù)硫嘶,然后根據(jù)stub匹配目標服務(wù)并調(diào)用阻问;
3、provider響應(yīng)請求:provider在調(diào)用目標服務(wù)后沦疾,封裝服務(wù)返回數(shù)據(jù)并進行serialization称近,然后把數(shù)據(jù)傳輸給consumer;
4哮塞、consumer接收響應(yīng):consumer接受到相應(yīng)數(shù)據(jù)后刨秆,首先會deserialization獲取原始數(shù)據(jù),然后根據(jù)stub生成調(diào)用返回結(jié)果忆畅,返回給請求調(diào)用處衡未。結(jié)束。

其實已經(jīng)講得比較清楚,在官方提供的demo里缓醋,

  1. consumer調(diào)用sayHi方法
  2. 通過注冊中心找到provider
  3. 代理類封裝請求并序列化后發(fā)送給provider
  4. provider反序列化數(shù)據(jù)如失,發(fā)現(xiàn)調(diào)用的是sayHi方法
  5. 把調(diào)用結(jié)果序列化返回給consumer
  6. consumer反序列化返回結(jié)果

接下來回到代碼本身,看看這一系列過程是怎么實現(xiàn)的送粱。
先從provider的demo入手吧褪贵。
先看看配置,只配置了一個XxlRpcSpringProviderFactorybean
從配置代碼來看抗俄,配置了provider的端口脆丁,注冊中心的類型(xxl-registry or zookeeper or local,這里是xxl-registry) 动雹,已經(jīng)注冊中心的一些參數(shù)(這里是對應(yīng)注冊中心xxl-registry需要的配置:注冊中心地址槽卫,環(huán)境,token)

@Configuration
public class XxlRpcProviderConfig {
    private Logger logger = LoggerFactory.getLogger(XxlRpcProviderConfig.class);

    @Value("${xxl-rpc.remoting.port}")
    private int port;

    @Value("${xxl-rpc.registry.xxlregistry.address}")
    private String address;

    @Value("${xxl-rpc.registry.xxlregistry.env}")
    private String env;

    @Value("${xxl-rpc.registry.xxlregistry.token}")
    private String token;

    @Bean
    public XxlRpcSpringProviderFactory xxlRpcSpringProviderFactory() {

        XxlRpcSpringProviderFactory providerFactory = new XxlRpcSpringProviderFactory();
        providerFactory.setPort(port);
        providerFactory.setServiceRegistryClass(XxlRegistryServiceRegistry.class);
        providerFactory.setServiceRegistryParam(new HashMap<String, String>(){{
            put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, address);
            put(XxlRegistryServiceRegistry.ENV, env);
            put(XxlRegistryServiceRegistry.ACCESS_TOKEN,token);
        }});

        logger.info(">>>>>>>>>>> xxl-rpc provider config init finish.");
        return providerFactory;
    }

}

ok洽胶,那我們在看看XxlRpcSpringProviderFactory有什么花頭晒夹。
實現(xiàn)了3個接口
implements ApplicationContextAware, InitializingBean,DisposableBean
,并繼承自XxlRpcProviderFactory

ps:這幾個接口都是一些spring bean的一些擴展姊氓,詳細的可以自行搜索丐怯, 下面給出一些簡單的描述

InitialingBean是一個接口,提供了一個唯一的方法afterPropertiesSet()翔横。
DisposableBean也是一個接口读跷,提供了一個唯一的方法destory()
前者顧名思義在Bean屬性都設(shè)置完畢后調(diào)用afterPropertiesSet()方法做一些初始化的工作禾唁,后者在Bean生命周期結(jié)束前調(diào)用destory()方法做一些收尾工作

實現(xiàn)ApplicationContextAware接口的Bean效览,在Bean加載的過程中可以獲取到Spring的ApplicationContext,這個尤其重要荡短,ApplicationContext是Spring應(yīng)用上下文丐枉,從ApplicationContext中可以獲取包括任意的Bean在內(nèi)的大量Spring容器內(nèi)容和信息

public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {

這里比較好理解,因為XxlRpcSpringProviderFactory是針對spring的客戶端掘托,所需需要額外實現(xiàn)幾個接口瘦锹,主要的邏輯是在它的父類XxlRpcProviderFactory里面。
先看看第一段

// ---------------------- config ----------------------

里面的內(nèi)容
netType定義了網(wǎng)絡(luò)通信協(xié)議(NETTY闪盔,NETTY_HTTP弯院,MINA,JETTY)
Serializer定義了序列化方式
ip,port,accessToken還不知道干嘛泪掀,留著
serviceRegistryClassserviceRegistryParam定義注冊中心類和參數(shù)

    private NetEnum netType;
    private Serializer serializer;

    private String ip;                  // for registry
    private int port;                   // default port
    private String accessToken;

    private Class<? extends ServiceRegistry> serviceRegistryClass;
    private Map<String, String> serviceRegistryParam;

再往下看听绳,這些屬性的設(shè)置全是在initConfig方法中設(shè)值的。
通過這段代碼异赫,就可以知道椅挣,ip其實是給consumer使用的本地ip(會注冊到注冊中心的ip)头岔,port其實就是用來和consumer通信的端口號(比如剛剛demo里面的7080)

public void initConfig(NetEnum netType,
                          Serializer serializer,
                          String ip,
                          int port,
                          String accessToken,
                           Class<? extends ServiceRegistry> serviceRegistryClass,
                          Map<String, String> serviceRegistryParam) {

        // init
        this.netType = netType;
        this.serializer = serializer;
        this.ip = ip;
        this.port = port;
        this.accessToken = accessToken;
        this.serviceRegistryClass = serviceRegistryClass;
        this.serviceRegistryParam = serviceRegistryParam;

        // valid
        if (this.netType==null) {
            throw new XxlRpcException("xxl-rpc provider netType missing.");
        }
        if (this.serializer==null) {
            throw new XxlRpcException("xxl-rpc provider serializer missing.");
        }
        if (this.ip == null) {
            this.ip = IpUtil.getIp();
        }
        if (this.port <= 0) {
            this.port = 7080;
        }
        if (NetUtil.isPortUsed(this.port)) {
            throw new XxlRpcException("xxl-rpc provider port["+ this.port +"] is used.");
        }
        if (this.serviceRegistryClass != null) {
            if (this.serviceRegistryParam == null) {
                throw new XxlRpcException("xxl-rpc provider serviceRegistryParam is missing.");
            }
        }

    }

這段代碼其實就是一些基礎(chǔ)參數(shù)的config。比如用什么通信協(xié)議贴妻,開房什么端口切油,用什么注冊中心等等蝙斜。
再接著往下看

// ---------------------- start / stop ----------------------

看看start和stop代碼有什么

    private Server server;
    private ServiceRegistry serviceRegistry;
    private String serviceAddress;

先往下看名惩,定義了start和stop方法,仔細一看是針對server的startstop孕荠,server是netType的一個instance娩鹉。
那么就比較好理解了,server就是負責(zé)通信的實例(demo里是netty)稚伍。
首先拿到server的實例弯予,然后設(shè)置了setStartedCallbacksetStopedCallback,并調(diào)用了start方法个曙。

public void start() throws Exception {
        // start server
        serviceAddress = IpUtil.getIpPort(this.ip, port);
        server = netType.serverClass.newInstance();
        server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
            @Override
            public void run() throws Exception {
                // start registry
                if (serviceRegistryClass != null) {
                    serviceRegistry = serviceRegistryClass.newInstance();
                    serviceRegistry.start(serviceRegistryParam);
                    if (serviceData.size() > 0) {
                        serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                    }
                }
            }
        });
        server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
            @Override
            public void run() {
                // stop registry
                if (serviceRegistry != null) {
                    if (serviceData.size() > 0) {
                        serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                    }
                    serviceRegistry.stop();
                    serviceRegistry = null;
                }
            }
        });
        server.start(this);
    }

    public void  stop() throws Exception {
        // stop server
        server.stop();
    }

那我們再深入看看server這些callbackstart方法都干了什么吧锈嫩。
server是一個抽象類(nettyServer會繼承這個類),定義了BaseCallback類型的startedCallbackstopedCallback垦搬,根據(jù)名字猜測是通信server調(diào)用start后呼寸,會調(diào)用startedCallback.run方法,server調(diào)用stop之后會調(diào)用stopedCallback.run方法猴贰。好像比較抽象对雪,畢竟是抽象類。

public abstract class Server {
    protected static final Logger logger = LoggerFactory.getLogger(Server.class);


    private BaseCallback startedCallback;
    private BaseCallback stopedCallback;

    public void setStartedCallback(BaseCallback startedCallback) {
        this.startedCallback = startedCallback;
    }

    public void setStopedCallback(BaseCallback stopedCallback) {
        this.stopedCallback = stopedCallback;
    }


    /**
     * start server
     *
     * @param xxlRpcProviderFactory
     * @throws Exception
     */
    public abstract void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception;

    /**
     * callback when started
     */
    public void onStarted() {
        if (startedCallback != null) {
            try {
                startedCallback.run();
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-rpc, server startedCallback error.", e);
            }
        }
    }

    /**
     * stop server
     *
     * @throws Exception
     */
    public abstract void stop() throws Exception;

    /**
     * callback when stoped
     */
    public void onStoped() {
        if (stopedCallback != null) {
            try {
                stopedCallback.run();
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-rpc, server stopedCallback error.", e);
            }
        }
    }

}

那我們直接進入繼承他的NettyServer看看米绕,這樣應(yīng)該就比較清晰了瑟捣。
start方法里面直接開了一個守護線程,線程做的事情非常簡單:配置并開啟netty服務(wù)栅干,并調(diào)用onStarted方法
stop方法更簡單迈套,直接interrupt,并調(diào)用onStoped方法碱鳞。

public class NettyServer extends Server {

    private Thread thread;

    @Override
    public void start(final XxlRpcProviderFactory xxlRpcProviderFactory) throws Exception {

        thread = new Thread(new Runnable() {
            @Override
            public void run() {

                // param
                final ThreadPoolExecutor serverHandlerPool = ThreadPoolUtil.makeServerThreadPool(NettyServer.class.getSimpleName());
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();

                try {
                    // start server
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            .addLast(new IdleStateHandler(0,0,10, TimeUnit.MINUTES))
                                            .addLast(new NettyDecoder(XxlRpcRequest.class, xxlRpcProviderFactory.getSerializer()))
                                            .addLast(new NettyEncoder(XxlRpcResponse.class, xxlRpcProviderFactory.getSerializer()))
                                            .addLast(new NettyServerHandler(xxlRpcProviderFactory, serverHandlerPool));
                                }
                            })
                            .childOption(ChannelOption.TCP_NODELAY, true)
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    // bind
                    ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();

                    logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyServer.class.getName(), xxlRpcProviderFactory.getPort());
                    onStarted();

                    // wait util stop
                    future.channel().closeFuture().sync();

                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        logger.info(">>>>>>>>>>> xxl-rpc remoting server stop.");
                    } else {
                        logger.error(">>>>>>>>>>> xxl-rpc remoting server error.", e);
                    }
                } finally {

                    // stop
                    try {
                        serverHandlerPool.shutdown();    // shutdownNow
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                    try {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }

                }
            }
        });
        thread.setDaemon(true);
        thread.start();

    }

    @Override
    public void stop() throws Exception {

        // destroy server thread
        if (thread != null && thread.isAlive()) {
            thread.interrupt();
        }

        // on stop
        onStoped();
        logger.info(">>>>>>>>>>> xxl-rpc remoting server destroy success.");
    }

}

讓我們再回到剛才的代碼桑李,startedCallback方法就是在netty服務(wù)啟動完成之后,把provider的信息注冊到注冊中心

ServiceRegistry是個注冊中心的抽象劫笙,demo里面用的是XxlRegistryServiceRegistry芙扎,其實就是對注冊中心操作的一些封裝,代碼非常簡單填大,不懂可以參考上一篇源碼閱讀:分布式服務(wù)注冊中心XXL-REGISTRY(基于1.0.2)

server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
            @Override
            public void run() throws Exception {
                // start registry
                if (serviceRegistryClass != null) {
                    serviceRegistry = serviceRegistryClass.newInstance();
                    serviceRegistry.start(serviceRegistryParam);
                    if (serviceData.size() > 0) {
                        serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                    }
                }
            }
        });

stopedCallback也比較好理解了戒洼,就是在netty服務(wù)關(guān)閉之后,從注冊中心移除自己允华。

server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
            @Override
            public void run() {
                // stop registry
                if (serviceRegistry != null) {
                    if (serviceData.size() > 0) {
                        serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                    }
                    serviceRegistry.stop();
                    serviceRegistry = null;
                }
            }
        });

最后再看看server invoke里面有什么吧
看起來像是用來記rpc service的圈浇,先不管他寥掐,接著往下看

// ---------------------- server invoke ----------------------
/**
     * init local rpc service map
     */
    private Map<String, Object> serviceData = new HashMap<String, Object>();
    public Map<String, Object> getServiceData() {
        return serviceData;
    }

好像就是字符串的拼接,不知道干嘛的磷蜀,先往下看

    /**
     * make service key
     *
     * @param iface
     * @param version
     * @return
     */
    public static String makeServiceKey(String iface, String version){
        String serviceKey = iface;
        if (version!=null && version.trim().length()>0) {
            serviceKey += "#".concat(version);
        }
        return serviceKey;
    }

addService用到了makeServiceKey召耘,看來這個是用來做唯一主鍵的。
根據(jù)名字推測褐隆,應(yīng)該是往前面的serviceData里面把serviceBean放進去污它。

/**
     * add service
     *
     * @param iface
     * @param version
     * @param serviceBean
     */
    public void addService(String iface, String version, Object serviceBean){
        String serviceKey = makeServiceKey(iface, version);
        serviceData.put(serviceKey, serviceBean);

        logger.info(">>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = {}, serviceBean = {}", serviceKey, serviceBean.getClass());
    }

通過IDE看看哪里用了addService方法
發(fā)現(xiàn)在剛才的XxlRpcSpringProviderFactory就有用到!
看下代碼庶弃,其實很簡單:

  • 從spring上下文找到加了XxlRpcService注解的bean
  • 接口名+版本號作為唯一主鍵把bean放入serviceData里面
@Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
        if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                // valid
                if (serviceBean.getClass().getInterfaces().length ==0) {
                    throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                }
                // add service
                XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);

                String iface = serviceBean.getClass().getInterfaces()[0].getName();
                String version = xxlRpcService.version();

                super.addService(iface, version, serviceBean);
            }
        }

        // TODO衫贬,addServices by api + prop

    }

最后一個方法,從方法名就能看出來歇攻,調(diào)用service固惯,接受一個xxlRpcRequest參數(shù)
serviceData里面取出request里面要調(diào)用的bean
通過反射調(diào)用方法并返回response

/**
     * invoke service
     *
     * @param xxlRpcRequest
     * @return
     */
    public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {

        //  make response
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());

        // match service bean
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        Object serviceBean = serviceData.get(serviceKey);

        // valid
        if (serviceBean == null) {
            xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
            return xxlRpcResponse;
        }

        if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
            xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
            return xxlRpcResponse;
        }
        if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
            xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
            return xxlRpcResponse;
        }

        try {
            // invoke
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            Object result = method.invoke(serviceBean, parameters);

            /*FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            Object result = serviceFastMethod.invoke(serviceBean, parameters);*/

            xxlRpcResponse.setResult(result);
        } catch (Throwable t) {
            // catch error
            logger.error("xxl-rpc provider invokeService error.", t);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
        }

        return xxlRpcResponse;
    }

ok,到這里為止缴守,XxlRpcProviderFactory的代碼看完了葬毫,來總結(jié)一下它究竟能干什么事情

  • 配置通信協(xié)議,序列化方式屡穗,注冊中心
  • 開啟通信server
  • serviceData里所有的provider服務(wù)注冊到注冊中心
  • 通過反射機制贴捡,提供調(diào)用服務(wù)的(invokeService)方法
    看完了XxlRpcProviderFactory,我們再回到XxlRpcSpringProviderFactory
    與父類不同的是鸡捐,提供了netType默認使用netty栈暇,序列化默認使用hessian
    // ---------------------- config ----------------------

    private String netType = NetEnum.NETTY.name();
    private String serialize = Serializer.SerializeEnum.HESSIAN.name();

再看看必須實現(xiàn)的幾個接口
這個方法前面已經(jīng)看到過,這里是把所有帶有XxlRpcService注解的bean放到serverData這個map里面

@Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
        if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                // valid
                if (serviceBean.getClass().getInterfaces().length ==0) {
                    throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                }
                // add service
                XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);

                String iface = serviceBean.getClass().getInterfaces()[0].getName();
                String version = xxlRpcService.version();

                super.addService(iface, version, serviceBean);
            }
        }

        // TODO箍镜,addServices by api + prop

    }

最后兩個方法源祈,很簡單,就是配置一下基本參數(shù)色迂,以及調(diào)用父類的方法香缺。

    @Override
    public void afterPropertiesSet() throws Exception {
        this.prepareConfig();
        super.start();
    }

    @Override
    public void destroy() throws Exception {
        super.stop();
    }

這樣一來,XxlRpcSpringProviderFactory就全部閱讀完了歇僧。我們重新梳理一遍流程XxlRpcInvokerConfig干的事情

  1. 配置通信框架(netty)图张,序列化框架(hessian),注冊中心(xxl-registry)
  2. 把使用了XxlRpcService注解的bean全部put到Map<String,Object> serviceData里面诈悍,key為bean繼承的接口名+版本號祸轮,Object為service的bean本身
  3. 啟動通信框架(netty),啟動成功后把serviceData里面的bean注冊到注冊中心
    這樣侥钳,provider就已經(jīng)完全啟動完成了适袜,一切準備就緒,就等客戶端調(diào)用了舷夺!
    那么苦酱,我們再看看客戶端的代碼售貌!
    老套路,從配置開始看起疫萤,和provider的配置差不多颂跨,就不在贅述
@Configuration
public class XxlRpcInvokerConfig {
    private Logger logger = LoggerFactory.getLogger(XxlRpcInvokerConfig.class);


    @Value("${xxl-rpc.registry.xxlregistry.address}")
    private String address;

    @Value("${xxl-rpc.registry.xxlregistry.env}")
    private String env;

    @Value("${xxl-rpc.registry.xxlregistry.token}")
    private String token;


    @Bean
    public XxlRpcSpringInvokerFactory xxlJobExecutor() {

        XxlRpcSpringInvokerFactory invokerFactory = new XxlRpcSpringInvokerFactory();
        invokerFactory.setServiceRegistryClass(XxlRegistryServiceRegistry.class);
        invokerFactory.setServiceRegistryParam(new HashMap<String, String>(){{
            put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, address);
            put(XxlRegistryServiceRegistry.ENV, env);
            put(XxlRegistryServiceRegistry.ACCESS_TOKEN,token);
        }});

        logger.info(">>>>>>>>>>> xxl-rpc invoker config init finish.");
        return invokerFactory;
    }

}

直接去XxlRpcSpringInvokerFactory里面看看吧,InitializingBean,DisposableBean不再贅述

實現(xiàn)BeanFactoryAware接口的Bean扯饶,在Bean加載的過程中可以獲取到加載該Bean的BeanFactory

InstantiationAwareBeanPostProcessor作用的是Bean實例化前后恒削,即:
1、Bean構(gòu)造出來之前調(diào)用postProcessBeforeInstantiation()方法
2帝际、Bean構(gòu)造出來之后調(diào)用postProcessAfterInstantiation()方法

public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean,DisposableBean, BeanFactoryAware {

ok蔓同,看看具體類里面的代碼,先看第一段config相關(guān)
這段代碼似曾相識蹲诀,在ProviderFactory里面也有:注冊中心配置

// ---------------------- config ----------------------

    private Class<? extends ServiceRegistry> serviceRegistryClass;          // class.forname
    private Map<String, String> serviceRegistryParam;


    public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
        this.serviceRegistryClass = serviceRegistryClass;
    }

    public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
        this.serviceRegistryParam = serviceRegistryParam;
    }

接著往下看,定義了一個 XxlRpcInvokerFactory

// ---------------------- util ----------------------

    private XxlRpcInvokerFactory xxlRpcInvokerFactory;

進到XxlRpcInvokerFactory 里面看看吧
首先很明顯弃揽,這是一個單例模式
然后也配置了注冊中心

public class XxlRpcInvokerFactory {
    private static Logger logger = LoggerFactory.getLogger(XxlRpcInvokerFactory.class);

    // ---------------------- default instance ----------------------

    private static volatile XxlRpcInvokerFactory instance = new XxlRpcInvokerFactory(LocalServiceRegistry.class, null);
    public static XxlRpcInvokerFactory getInstance() {
        return instance;
    }


    // ---------------------- config ----------------------

    private Class<? extends ServiceRegistry> serviceRegistryClass;          // class.forname
    private Map<String, String> serviceRegistryParam;


    public XxlRpcInvokerFactory() {
    }
    public XxlRpcInvokerFactory(Class<? extends ServiceRegistry> serviceRegistryClass, Map<String, String> serviceRegistryParam) {
        this.serviceRegistryClass = serviceRegistryClass;
        this.serviceRegistryParam = serviceRegistryParam;
    }
    // 略
}

再往下看脯爪,似曾相識的代碼
start方法是開始想注冊中心注冊
stop方法先從注冊中心移除,然后在吧stopCallbackList里面還沒執(zhí)行的方法執(zhí)行了
那什么時候調(diào)用addStopCallBackstopCallBack加進list呢矿微?用IDE搜一搜痕慢,
發(fā)現(xiàn)在JettyClientConnectClient的時候用到了,好像暫時和我們demo的代碼沒什么關(guān)系涌矢,先放一放
最后把responseCallbackThreadPool 線程池shutDown了掖举。

// ---------------------- start / stop ----------------------

    public void start() throws Exception {
        // start registry
        if (serviceRegistryClass != null) {
            serviceRegistry = serviceRegistryClass.newInstance();
            serviceRegistry.start(serviceRegistryParam);
        }
    }

    public void  stop() throws Exception {
        // stop registry
        if (serviceRegistry != null) {
            serviceRegistry.stop();
        }

        // stop callback
        if (stopCallbackList.size() > 0) {
            for (BaseCallback callback: stopCallbackList) {
                try {
                    callback.run();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

        // stop CallbackThreadPool
        stopCallbackThreadPool();
    }

// ---------------------- service registry ----------------------

    private ServiceRegistry serviceRegistry;
    public ServiceRegistry getServiceRegistry() {
        return serviceRegistry;
    }


    // ---------------------- service registry ----------------------

    private List<BaseCallback> stopCallbackList = new ArrayList<BaseCallback>();

    public void addStopCallBack(BaseCallback callback){
        stopCallbackList.add(callback);
    }


responseCallbackThreadPool線程池是用來干什么的?用IDE搜一搜娜庇,就在stopCallbackThreadPool上面
executeResponseCallback接受一個Runnable對象塔次,并初始化線程池,并放入線程池
那么executeResponseCallback什么時候會被用到名秀?

// ---------------------- response callback ThreadPool ----------------------

    private ThreadPoolExecutor responseCallbackThreadPool = null;
    public void executeResponseCallback(Runnable runnable){

        if (responseCallbackThreadPool == null) {
            synchronized (this) {
                if (responseCallbackThreadPool == null) {
                    responseCallbackThreadPool = new ThreadPoolExecutor(
                            10,
                            100,
                            60L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingQueue<Runnable>(1000),
                            new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    return new Thread(r, "xxl-rpc, XxlRpcInvokerFactory-responseCallbackThreadPool-" + r.hashCode());
                                }
                            },
                            new RejectedExecutionHandler() {
                                @Override
                                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                    throw new XxlRpcException("xxl-rpc Invoke Callback Thread pool is EXHAUSTED!");
                                }
                            });     // default maxThreads 300, minThreads 60
                }
            }
        }
        responseCallbackThreadPool.execute(runnable);
    }
    public void stopCallbackThreadPool() {
        if (responseCallbackThreadPool != null) {
            responseCallbackThreadPool.shutdown();
        }
    }

其實就在上面
定義了一個concurrentMap励负,參數(shù)是一個string和一個XxlRpcFutureResponse
根據(jù)future這個名字,可以猜一下應(yīng)該是個future(多線程)操作相關(guān)的
set和remove方法看起來就是對map進行一些關(guān)于request的操作
notifyInvokerFuture方法匕得,從futureResponsePool根據(jù)requestId取出一個XxlRpcFutureResponse對象
然后判斷Response的狀態(tài)继榆,并做一些設(shè)置,然后從futureResponsePool移出這個requestId
到這里汁掠,還比較蒙逼略吨,大概只能看出,利用了future考阱,對response做一些處理翠忠。
那么問題來了,response是什么時候生產(chǎn)的羔砾?為什么會有一堆callback方法负间?這樣做的目的是什么偶妖?
因為沒有看到調(diào)用遠程服務(wù)的代碼,看不懂很正常政溃!

 // ---------------------- future-response pool ----------------------
    // XxlRpcFutureResponseFactory

    private ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();
    public void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){
        futureResponsePool.put(requestId, futureResponse);
    }
    public void removeInvokerFuture(String requestId){
        futureResponsePool.remove(requestId);
    }
    public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse){

        // get
        final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
        if (futureResponse == null) {
            return;
        }

        // notify
        if (futureResponse.getInvokeCallback()!=null) {

            // callback type
            try {
                executeResponseCallback(new Runnable() {
                    @Override
                    public void run() {
                        if (xxlRpcResponse.getErrorMsg() != null) {
                            futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                        } else {
                            futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                        }
                    }
                });
            }catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        } else {

            // other nomal type
            futureResponse.setResponse(xxlRpcResponse);
        }

        // do remove
        futureResponsePool.remove(requestId);

    }

讓我們回到XxlRpcSpringInvokerFactory趾访,還剩最后一個方法postProcessAfterInstantiation
看看都干了些什么吧
取出加了XxlRpcReference注解的字段(field)
組裝成XxlRpcReferenceBean,并根據(jù)名字猜測董虱,通過這個bean得到一個service的代理對象扼鞋!

@Override
    public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {

        // collection
        final Set<String> serviceKeyList = new HashSet<>();

        // parse XxlRpcReferenceBean
        ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
            @Override
            public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                if (field.isAnnotationPresent(XxlRpcReference.class)) {
                    // valid
                    Class iface = field.getType();
                    if (!iface.isInterface()) {
                        throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface.");
                    }

                    XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);

                    // init reference bean
                    XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(
                            rpcReference.netType(),
                            rpcReference.serializer().getSerializer(),
                            rpcReference.callType(),
                            rpcReference.loadBalance(),
                            iface,
                            rpcReference.version(),
                            rpcReference.timeout(),
                            rpcReference.address(),
                            rpcReference.accessToken(),
                            null,
                            xxlRpcInvokerFactory
                    );

                    Object serviceProxy = referenceBean.getObject();

                    // set bean
                    field.setAccessible(true);
                    field.set(bean, serviceProxy);

                    logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}",
                            XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName());

                    // collection
                    String serviceKey = XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version());
                    serviceKeyList.add(serviceKey);

                }
            }
        });

        // mult discovery
        if (xxlRpcInvokerFactory.getServiceRegistry() != null) {
            try {
                xxlRpcInvokerFactory.getServiceRegistry().discovery(serviceKeyList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }

        return super.postProcessAfterInstantiation(bean, beanName);
    }

看看getObject做了什么吧

  • 配置一個動態(tài)代理(猜測就是調(diào)用遠程服務(wù)用的)
  • 根據(jù)XxlRpcInvokerFactory配置的注冊中心,查找provider地址
  • 通過通信框架愤诱,把數(shù)據(jù)發(fā)送到provider機器
  • NettyClientHandler獲得響應(yīng)的時候云头, 會調(diào)用futureResponse.setResponse(xxlRpcResponse);把拿到的response放進futureResponse里面
  • 再通過XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);拿到response(同步調(diào)用)
// ---------------------- util ----------------------

    public Object getObject() {
        return Proxy.newProxyInstance(Thread.currentThread()
                .getContextClassLoader(), new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                        // method param
                        String className = method.getDeclaringClass().getName();    // iface.getName()
                        String varsion_ = version;
                        String methodName = method.getName();
                        Class<?>[] parameterTypes = method.getParameterTypes();
                        Object[] parameters = args;

                        // filter for generic
                        if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {

                            Class<?>[] paramTypes = null;
                            if (args[3]!=null) {
                                String[] paramTypes_str = (String[]) args[3];
                                if (paramTypes_str.length > 0) {
                                    paramTypes = new Class[paramTypes_str.length];
                                    for (int i = 0; i < paramTypes_str.length; i++) {
                                        paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);
                                    }
                                }
                            }

                            className = (String) args[0];
                            varsion_ = (String) args[1];
                            methodName = (String) args[2];
                            parameterTypes = paramTypes;
                            parameters = (Object[]) args[4];
                        }

                        // filter method like "Object.toString()"
                        if (className.equals(Object.class.getName())) {
                            logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);
                            throw new XxlRpcException("xxl-rpc proxy class-method not support");
                        }

                        // address
                        String finalAddress = address;
                        if (finalAddress==null || finalAddress.trim().length()==0) {
                            if (invokerFactory!=null && invokerFactory.getServiceRegistry()!=null) {
                                // discovery
                                String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
                                TreeSet<String> addressSet = invokerFactory.getServiceRegistry().discovery(serviceKey);
                                // load balance
                                if (addressSet==null || addressSet.size()==0) {
                                    // pass
                                } else if (addressSet.size()==1) {
                                    finalAddress = addressSet.first();
                                } else {
                                    finalAddress = loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
                                }

                            }
                        }
                        if (finalAddress==null || finalAddress.trim().length()==0) {
                            throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
                        }

                        // request
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(methodName);
                        xxlRpcRequest.setParameterTypes(parameterTypes);
                        xxlRpcRequest.setParameters(parameters);
                        
                        // send
                        if (CallType.SYNC == callType) {
                            // future-response set
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
                            try {
                                // do invoke
                                client.asyncSend(finalAddress, xxlRpcRequest);

                                // future get
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }
                                return xxlRpcResponse.getResult();
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            } finally{
                                // future-response remove
                                futureResponse.removeInvokerFuture();
                            }
                        } else if (CallType.FUTURE == callType) {
                            // future-response set
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
                            try {
                                // invoke future set
                                XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
                                XxlRpcInvokeFuture.setFuture(invokeFuture);

                                // do invoke
                                client.asyncSend(finalAddress, xxlRpcRequest);

                                return null;
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

                                // future-response remove
                                futureResponse.removeInvokerFuture();

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            }

                        } else if (CallType.CALLBACK == callType) {

                            // get callback
                            XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
                            XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
                            if (threadInvokeCallback != null) {
                                finalInvokeCallback = threadInvokeCallback;
                            }
                            if (finalInvokeCallback == null) {
                                throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
                            }

                            // future-response set
                            XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
                            try {
                                client.asyncSend(finalAddress, xxlRpcRequest);
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);

                                // future-response remove
                                futureResponse.removeInvokerFuture();

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            }

                            return null;
                        } else if (CallType.ONEWAY == callType) {
                            client.asyncSend(finalAddress, xxlRpcRequest);
                            return null;
                        } else {
                            throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
                        }

                    }
                });
    }

回到前面的代碼
getObject的代理之后,為field設(shè)置了代理淫半,并把serviceKey(接口名+版本號)放在里一個set里面溃槐。

現(xiàn)先總結(jié)一下XxlRpcInvokerConfig干的事情吧

  1. 初始化一個XxlRpcSpringInvokerFactory 的bean
  2. 配置注冊中心,通信框架科吭,序列化框架
  3. 向注冊中心注冊
  4. @XxlRpcReference注解的field配置代理

是不是似曾相識昏滴,沒錯,和provider的config其實幾乎一樣对人。
那我們從consumer角度看看谣殊,一次調(diào)用是怎么完成的吧!看完了牺弄,前面的疑問應(yīng)該都能解決了姻几,吧
看看調(diào)用代碼,用@XxlRpcReference注解了provider提供的接口
然后直接通過接口調(diào)用方法势告。
ok蛇捌,那么所有貓膩都在@XxlRpcReference這個注解里面了

@Controller
public class IndexController {
    
    @XxlRpcReference
    private DemoService demoService;


    @RequestMapping("")
    @ResponseBody
    public UserDTO http(String name) {

        try {
            return demoService.sayHi(name);
        } catch (Exception e) {
            e.printStackTrace();
            return new UserDTO(null, e.getMessage());
        }
    }

}

先看看這個注解本身吧
給了幾個默認值:默認使用netty,使用HESSIAN培慌,使用同步調(diào)用豁陆,負責(zé)均衡方式是輪詢

@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlRpcReference {

    NetEnum netType() default NetEnum.NETTY;
    Serializer.SerializeEnum serializer() default Serializer.SerializeEnum.HESSIAN;
    CallType callType() default CallType.SYNC;
    LoadBalance loadBalance() default LoadBalance.ROUND;

    //Class<?> iface;
    String version() default "";

    long timeout() default 1000;

    String address() default "";
    String accessToken() default "";

    //XxlRpcInvokeCallback invokeCallback() ;

}

搜一搜那里對這個注解做了處理吧
似曾相識,就是前面提到過的XxlRpcSpringInvokerFactorypostProcessAfterInstantiation方法吵护!
所以整個調(diào)用過程應(yīng)該就是調(diào)用代理方法的過程盒音。
這樣整個客戶端調(diào)用過程就比較清晰了

  • 初始化的時候,配置客戶端的通信框架馅而,序列化框架祥诽,注冊中心
  • 通過掃描@XxlRpcReference注解,初始化provider提供的接口的代理
  • 進行遠程調(diào)用的時候瓮恭,實際是代理調(diào)用
  • 通過代理通信協(xié)議客戶端和注冊中心雄坪,向provider請求數(shù)據(jù)

發(fā)一次請求,下個斷點看看
果然進到這個代理調(diào)用里面了


看看服務(wù)端是怎么接收請求的吧屯蹦,有了前面的服務(wù)端代碼閱讀和客戶端調(diào)用代碼閱讀维哈,其實服務(wù)端的就比較簡單了
長話短說:
在裝載XxlRpcSpringProviderFactory的時候會掃描@XxlRpcService注解的類绳姨,并把它作為service放進(put)服務(wù)map里面

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
        if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                // valid
                if (serviceBean.getClass().getInterfaces().length ==0) {
                    throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                }
                // add service
                XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);

                String iface = serviceBean.getClass().getInterfaces()[0].getName();
                String version = xxlRpcService.version();

                super.addService(iface, version, serviceBean);
            }
        }

        // TODO,addServices by api + prop

    }

當發(fā)生遠程調(diào)用的時候阔挠,會調(diào)用invokeService方法飘庄,主要的代碼就是這段通過反射來獲取真正需要調(diào)用的方法

        try {
            // invoke
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            Object result = method.invoke(serviceBean, parameters);

            /*FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            Object result = serviceFastMethod.invoke(serviceBean, parameters);*/

            xxlRpcResponse.setResult(result);
        } catch (Throwable t) {
            // catch error
            logger.error("xxl-rpc provider invokeService error.", t);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
        }

ok,到這里整個同步調(diào)用的流程就比較清楚了购撼。我們在回顧一下它吹噓的功能跪削。

  • 1、快速接入:接入步驟非常簡潔迂求,兩分鐘即可上手碾盐;

確實還行

  • 2、服務(wù)透明:系統(tǒng)完整的封裝了底層通信細節(jié)揩局,開發(fā)時調(diào)用遠程服務(wù)就像調(diào)用本地服務(wù)毫玖,在提供遠程調(diào)用能力時不損失本地調(diào)用的語義簡潔性;

通過掃描注解和反射的方式谐腰,做到了本地調(diào)用的效果

  • 3孕豹、多調(diào)用方案:支持 SYNC、ONEWAY十气、FUTURE、CALLBACK 等方案春霍;

現(xiàn)在我們只試過同步調(diào)用(SYNC)砸西,接下來看看其他調(diào)用方式吧

ONEWAY

用ONEWAY模式拿到的返回值是null。
provider的代碼如下址儒,其實就是發(fā)起了一個不需要結(jié)果的調(diào)用

else if (CallType.ONEWAY == callType) {
    client.asyncSend(finalAddress, xxlRpcRequest);
    return null;
}

FUTURE

直接返回null芹枷,拿到結(jié)果要從future里面get

else if (CallType.FUTURE == callType) {
    // future-response set
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
    try {
        // invoke future set
        XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
        XxlRpcInvokeFuture.setFuture(invokeFuture);
        // do invoke
        client.asyncSend(finalAddress, xxlRpcRequest);
        return null;
    } catch (Exception e) {
        logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        // future-response remove
        futureResponse.removeInvokerFuture();
        throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
    }
}
    @RequestMapping("")
    @ResponseBody
    public UserDTO http(String name) {

        try {
            // dto is null
            UserDTO dto =  demoService.sayHi(name);
            Future<UserDTO> future = XxlRpcInvokeFuture.getFuture(UserDTO.class);
            return future.get();
        } catch (Exception e) {
            e.printStackTrace();
            return new UserDTO(null, e.getMessage());
        }
    }

callback

調(diào)用完成后會調(diào)用onSuccessonFailure方法

else if (CallType.CALLBACK == callType) {
    // get callback
    XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
    XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
    if (threadInvokeCallback != null) {
        finalInvokeCallback = threadInvokeCallback;
    }
    if (finalInvokeCallback == null) {
        throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType=" + CallType.CALLBACK.name() + ") cannot be null.");
    }
    // future-response set
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, finalInvokeCallback);
    try {
        client.asyncSend(finalAddress, xxlRpcRequest);
    } catch (Exception e) {
        logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        // future-response remove
        futureResponse.removeInvokerFuture();
        throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
    }
    return null;
}
XxlRpcInvokeCallback.setCallback(new XxlRpcInvokeCallback<UserDTO>() {
    @Override
    public void onSuccess(UserDTO result) {
        System.out.println(result);
    }
    @Override
    public void onFailure(Throwable exception) {
        exception.printStackTrace();
    }
});
demoService.sayHi("[CALLBACK]jack");

ps:這部分有很多值得深度閱讀的地方,暫時todo

  • 4莲趣、多通訊方案:支持 TCP 和 HTTP 兩種通訊方式進行服務(wù)調(diào)用鸳慈;其中 TCP 提供可選方案 NETTY 或
    MINA ,HTTP 提供可選方案 NETTY_HTTP 或 Jetty喧伞;

通過繼承同一接口來完成調(diào)用方的細節(jié)隱藏

  • 5走芋、多序列化方案:支持 HESSIAN、HESSIAN1潘鲫、PROTOSTUFF翁逞、KRYO、JACKSON 等方案溉仑;

和通信的方式差不多

  • 6挖函、負載均衡/軟負載:提供豐富的負載均衡策略,包括:輪詢浊竟、隨機怨喘、LRU津畸、LFU、一致性HASH等必怜;

這部分比較簡單肉拓,不在贅述

  • 7、注冊中心:可選組件棚赔,支持服務(wù)注冊并動態(tài)發(fā)現(xiàn)帝簇;可選擇不啟用,直接指定服務(wù)提供方機器地址通訊靠益;選擇啟用時丧肴,內(nèi)置可選方案:“XXL-REGISTRY 輕量級注冊中心”(推薦)、“ZK注冊中心”胧后、“Local注冊中心”等芋浮;

前面已經(jīng)提到

  • 8、服務(wù)治理:提供服務(wù)治理中心壳快,可在線管理注冊的服務(wù)信息纸巷,如服務(wù)鎖定、禁用等眶痰;

通過注冊中心實現(xiàn)

  • 9瘤旨、服務(wù)監(jiān)控:可在線監(jiān)控服務(wù)調(diào)用統(tǒng)計信息以及服務(wù)健康狀況等(計劃中);

pass

  • 10竖伯、容錯:服務(wù)提供方集群注冊時存哲,某個服務(wù)節(jié)點不可用時將會自動摘除,同時消費方將會移除失效節(jié)點將流量分發(fā)到其余節(jié)點七婴,提高系統(tǒng)容錯能力祟偷。

通過注冊中心實現(xiàn)

  • 11、解決1+1問題:傳統(tǒng)分布式通訊一般通過nginx或f5做集群服務(wù)的流量負載均衡打厘,每次請求在到達目標服務(wù)機器之前都需要經(jīng)過負載均衡機器修肠,即1+1,這將會把流量放大一倍户盯。而XXL-RPC將會從消費方直達
    服務(wù)提供方嵌施,每次請求直達目標機器,從而可以避免上述問題先舷;

通過注冊中心實現(xiàn)

  • 12艰管、高兼容性:得益于優(yōu)良的兼容性與模塊化設(shè)計,不限制外部框架蒋川;除 spring/springboot 環(huán)境之外牲芋,理論上支持運行在任何Java代碼中,甚至main方法直接啟動運行;

demo里面有缸浦,比較簡單夕冲,就是不通過反射,手動創(chuàng)建對象裂逐。

  • 13歹鱼、泛化調(diào)用:服務(wù)調(diào)用方不依賴服務(wù)方提供的API;

同上

ok卜高,就剩下一個todo了弥姻,那就是各種調(diào)用方案的具體實現(xiàn)
先todo了...

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市掺涛,隨后出現(xiàn)的幾起案子庭敦,更是在濱河造成了極大的恐慌,老刑警劉巖薪缆,帶你破解...
    沈念sama閱讀 212,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件秧廉,死亡現(xiàn)場離奇詭異,居然都是意外死亡拣帽,警方通過查閱死者的電腦和手機疼电,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來减拭,“玉大人蔽豺,你說我怎么就攤上這事∨》啵” “怎么了茫虽?”我有些...
    開封第一講書人閱讀 158,369評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長既们。 經(jīng)常有香客問我,道長正什,這世上最難降的妖魔是什么啥纸? 我笑而不...
    開封第一講書人閱讀 56,799評論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮婴氮,結(jié)果婚禮上斯棒,老公的妹妹穿的比我還像新娘。我一直安慰自己主经,他們只是感情好荣暮,可當我...
    茶點故事閱讀 65,910評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著罩驻,像睡著了一般穗酥。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,096評論 1 291
  • 那天砾跃,我揣著相機與錄音寓免,去河邊找鬼脉课。 笑死,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的吆寨。 我是一名探鬼主播,決...
    沈念sama閱讀 39,159評論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼驱闷,長吁一口氣:“原來是場噩夢啊……” “哼核偿!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起碳竟,我...
    開封第一講書人閱讀 37,917評論 0 268
  • 序言:老撾萬榮一對情侶失蹤草丧,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后瞭亮,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體方仿,經(jīng)...
    沈念sama閱讀 44,360評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,673評論 2 327
  • 正文 我和宋清朗相戀三年统翩,在試婚紗的時候發(fā)現(xiàn)自己被綠了仙蚜。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,814評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡厂汗,死狀恐怖委粉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情娶桦,我是刑警寧澤贾节,帶...
    沈念sama閱讀 34,509評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站衷畦,受9級特大地震影響栗涂,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜祈争,卻給世界環(huán)境...
    茶點故事閱讀 40,156評論 3 317
  • 文/蒙蒙 一斤程、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧菩混,春花似錦忿墅、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至邢疙,卻和暖如春棍弄,著一層夾襖步出監(jiān)牢的瞬間望薄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評論 1 267
  • 我被黑心中介騙來泰國打工照卦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留式矫,地道東北人。 一個月前我還...
    沈念sama閱讀 46,641評論 2 362
  • 正文 我出身青樓役耕,卻偏偏與公主長得像采转,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子瞬痘,可洞房花燭夜當晚...
    茶點故事閱讀 43,728評論 2 351

推薦閱讀更多精彩內(nèi)容