萬(wàn)萬(wàn)沒(méi)想到一個(gè)xxl-job源碼分析吟策,竟然能引發(fā)這么多血案!(上)

前言

XxlJob 2.0.1版本源碼的止,其實(shí)在去年這個(gè)時(shí)候我已經(jīng)看完了檩坚,并且做了很詳細(xì)的注釋。但是由于自己太懶了诅福,沒(méi)有寫成博客分享匾委。俗話說(shuō)好記性不如爛筆頭,于是乎我挑出幾個(gè)源碼實(shí)現(xiàn)中我認(rèn)為不錯(cuò)的知識(shí)點(diǎn)并且結(jié)合自己的見解氓润,來(lái)分享一波赂乐。

image.png

因?yàn)槠^(guò)于長(zhǎng),簡(jiǎn)書不讓發(fā)咖气,要分成上下節(jié)挨措!


源碼思考

  • 1.executor端是開啟一個(gè)jettyServer,其中配置了JettyServerHandler。將請(qǐng)求交給XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);

詳細(xì)可以看JettyServerHandler中的handle函數(shù)

    @Override
    public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {

        if ("/services".equals(target)) {   // services mapping

            StringBuffer stringBuffer = new StringBuffer("<ui>");
            for (String serviceKey: xxlRpcProviderFactory.getServiceData().keySet()) {
                stringBuffer.append("<li>").append(serviceKey).append(": ").append(xxlRpcProviderFactory.getServiceData().get(serviceKey)).append("</li>");
            }
            stringBuffer.append("</ui>");

            writeResponse(baseRequest, response, stringBuffer.toString().getBytes());
            return;
        } else {    // default remoting mapping

            // request parse
            XxlRpcRequest xxlRpcRequest = null;
            try {

                xxlRpcRequest = parseRequest(request);
            } catch (Exception e) {
                writeResponse(baseRequest, response, ThrowableUtil.toString(e).getBytes());
                return;
            }

            // invoke
            XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);

            // response-serialize + response-write
            byte[] responseBytes = xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);
            writeResponse(baseRequest, response, responseBytes);
        }

    }

executor端通過(guò)上文中xxlRpcProviderFactory.invokeService(xxlRpcRequest);函數(shù)執(zhí)行本地暴露的服務(wù)方法

    /**
     * invoke service
     *
     * @param xxlRpcRequest
     * @return
     */
    public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {

        //  make response
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());

        // match service bean
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        Object serviceBean = serviceData.get(serviceKey);

        // valid
        if (serviceBean == null) {
            xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
            return xxlRpcResponse;
        }

        if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
            xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
            return xxlRpcResponse;
        }
        if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
            xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
            return xxlRpcResponse;
        }

        // invoke
        try {
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            Object result = method.invoke(serviceBean, parameters);

            /*FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            Object result = serviceFastMethod.invoke(serviceBean, parameters);*/

            xxlRpcResponse.setResult(result);
        } catch (Throwable t) {
            logger.error("xxl-rpc provider invokeService error.", t);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
        }

        return xxlRpcResponse;
    }

executor端服務(wù)提供者維護(hù)在serviceData中, serviceDatamap結(jié)構(gòu)崩溪。key是服務(wù)提供者className拼接版本號(hào), value則是服務(wù)提供者本身實(shí)例浅役。

    // ---------------------- server invoke ----------------------

    /**
     * init local rpc service map
     */
    private Map<String, Object> serviceData = new HashMap<String, Object>();
    public Map<String, Object> getServiceData() {
        return serviceData;
    }

admin端其實(shí)是沒(méi)有開啟jettyServer服務(wù)器,可以通過(guò)XxlJobDynamicScheduler
initRpcProvider()一看究竟伶唯。

    // ---------------------- init + destroy ----------------------
    public void start() throws Exception {
        // valid
        Assert.notNull(scheduler, "quartz scheduler is null");

        // init i18n
        initI18n();

        // admin registry monitor run
        /*
         啟動(dòng)自動(dòng)注冊(cè)線程觉既,獲取類型為自動(dòng)注冊(cè)的執(zhí)行器信息,完成機(jī)器的自動(dòng)注冊(cè)與發(fā)現(xiàn)
         */
        JobRegistryMonitorHelper.getInstance().start();

        // admin monitor run
        /**
         * 啟動(dòng)失敗日志監(jiān)控線程
         */
        JobFailMonitorHelper.getInstance().start();

        // admin-server
        /**
         * 暴露AdminBiz服務(wù),并設(shè)置jettyServerHandler
         */
        initRpcProvider();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
    // ---------------------- admin rpc provider (no server version) ----------------------
    private static JettyServerHandler jettyServerHandler;
    private void initRpcProvider(){
        // init
        XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
        xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), null, 0, XxlJobAdminConfig.getAdminConfig().getAccessToken(), null, null);

        // add services
        xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());

        // jetty handler
        jettyServerHandler = new JettyServerHandler(xxlRpcProviderFactory);
    }
    private void stopRpcProvider() throws Exception {
        new XxlRpcInvokerFactory().stop();
    }
    public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
        jettyServerHandler.handle(null, new Request(null, null), request, response);
    }

admin端通過(guò)API服務(wù)暴露出自己的服務(wù)抵怎。比如執(zhí)行器注冊(cè)服務(wù)奋救,任務(wù)結(jié)果回調(diào)服務(wù)。admin端服務(wù)暴露都是通過(guò)JobApiController實(shí)現(xiàn)的反惕,來(lái)達(dá)到和executor端類似的效果,請(qǐng)求交給JettyServerHandler處理, 然后通過(guò)xxlRpcProviderFacotry調(diào)用本地方法尝艘。

@Controller
public class JobApiController implements InitializingBean {


    @Override
    public void afterPropertiesSet() throws Exception {

    }

    @RequestMapping(AdminBiz.MAPPING)
    @PermessionLimit(limit=false)
    public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
        XxlJobDynamicScheduler.invokeAdminService(request, response);
    }
}

XxlRpcReferenceBean這個(gè)factoryBean中的getObject方法用于創(chuàng)建代理對(duì)象,
代理邏輯中過(guò)濾掉了非業(yè)務(wù)方法,也就是Object類中的方法姿染。只是將目標(biāo)類的方法名背亥,參數(shù),類名等信息包裝成XxlRpcRequest,通過(guò)JettyClient發(fā)送給調(diào)度中心悬赏。調(diào)度中心的接口地址為"admin端的ip/api"狡汉。調(diào)度中心的API接口拿到請(qǐng)求之后通過(guò)參數(shù)里面的類名,方法,參數(shù),版本號(hào)等信息反射出來(lái)一個(gè)服務(wù)實(shí)例,調(diào)用invoke執(zhí)行方法,也就是上文中提到的JobApiController闽颇。

每個(gè)XxlRpcReferenceBean對(duì)象中都會(huì)初始化一個(gè)JettyClient對(duì)象盾戴。感覺(jué)這樣做有點(diǎn)耗性能,需要優(yōu)化啊兵多。畢竟創(chuàng)建一個(gè)JettyClient對(duì)象開銷并不小尖啡,也許你使用操作不恰當(dāng)會(huì)造成OOM橄仆。之前有一位朋友就是對(duì)每一個(gè)請(qǐng)求都創(chuàng)建了一個(gè)HttpClient,這樣由于創(chuàng)建每一個(gè)HttpClient實(shí)例的時(shí)候都會(huì)調(diào)用evictExpireConnections衅斩,造成有多少請(qǐng)求就會(huì)創(chuàng)建多少個(gè)定時(shí)線程盆顾,最后造成系統(tǒng)OOM。所以建議這里最好采用單例或者將JettyClient緩存起來(lái)畏梆。


    // ---------------------- initClient ----------------------

    Client client = null;

    private void initClient() {
        try {
            client = netType.clientClass.newInstance();
            client.init(this);
        } catch (InstantiationException | IllegalAccessException e) {
            throw new XxlRpcException(e);
        }
    }

創(chuàng)建發(fā)起RPC請(qǐng)求的代理對(duì)象

public Object getObject() {
        return Proxy.newProxyInstance(Thread.currentThread()
                .getContextClassLoader(), new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        String className = method.getDeclaringClass().getName();

                        // filter method like "Object.toString()"
                        if (Object.class.getName().equals(className)) {
                            logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
                            throw new XxlRpcException("xxl-rpc proxy class-method not support");
                        }

                        // address
                        String address = routeAddress();
                        if (address==null || address.trim().length()==0) {
                            throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
                        }

                        // request
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(method.getName());
                        xxlRpcRequest.setParameterTypes(method.getParameterTypes());
                        xxlRpcRequest.setParameters(args);
                        
                        // send
                        if (CallType.SYNC == callType) {
                            try {
                                // future set
                                XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);

                                // do invoke
                                client.asyncSend(address, xxlRpcRequest);

                                // future get
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }
                                return xxlRpcResponse.getResult();
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            } finally{
                                // remove-InvokerFuture
                                XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
                            }
                        } else if (CallType.FUTURE == callType) {

                            // thread future set
                            XxlRpcInvokeFuture invokeFuture = null;
                            try {
                                // future set
                                invokeFuture = new XxlRpcInvokeFuture(new XxlRpcFutureResponse(xxlRpcRequest, null));
                                XxlRpcInvokeFuture.setFuture(invokeFuture);

                                // do invoke
                                client.asyncSend(address, xxlRpcRequest);

                                return null;
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);

                                // remove-InvokerFuture
                                invokeFuture.stop();

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            }

                        } else if (CallType.CALLBACK == callType) {

                            // get callback
                            XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
                            XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
                            if (threadInvokeCallback != null) {
                                finalInvokeCallback = threadInvokeCallback;
                            }
                            if (finalInvokeCallback == null) {
                                throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
                            }

                            try {
                                // future set
                                XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, finalInvokeCallback);

                                client.asyncSend(address, xxlRpcRequest);
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);

                                // future remove
                                XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            }

                            return null;
                        } else if (CallType.ONEWAY == callType) {
                            client.asyncSend(address, xxlRpcRequest);
                            return null;
                        } else {
                            throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
                        }

                    }
                });
    }

這里只講下SYNC模式的請(qǐng)求您宪,XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);創(chuàng)建XxlRpcFutureResponse對(duì)象(Future任務(wù)執(zhí)行的結(jié)果), 這里的invokeCallback回調(diào)是null

    public XxlRpcFutureResponse(XxlRpcRequest request, XxlRpcInvokeCallback invokeCallback) {
        this.request = request;
        this.invokeCallback = invokeCallback;

        // set-InvokerFuture
        XxlRpcFutureResponseFactory.setInvokerFuture(request.getRequestId(), this);
    }

當(dāng)我們獲取Future執(zhí)行結(jié)果時(shí), XxlRpcFutureResponse中的done變量如果是false, 一直阻塞線程(還有超時(shí)機(jī)制), 除非有調(diào)用setResponse(XxlRpcResponse response)方法

使done變量為true, 并獲取鎖,調(diào)用lock.notifyAll(), 喚醒線程,并返回執(zhí)行結(jié)果奠涌。

JettyClient中發(fā)送請(qǐng)求宪巨,調(diào)用asyncSend方法

@Override
public void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception {
    // do invoke
    postRequestAsync(address, xxlRpcRequest);
}

postRequestAsync方法拿到RPC請(qǐng)求執(zhí)行結(jié)果,我們應(yīng)該通知future铣猩,喚醒等待揖铜。

// deserialize response
XxlRpcResponse xxlRpcResponse = (XxlRpcResponse) xxlRpcReferenceBean.getSerializer().deserialize(responseBytes, XxlRpcResponse.class);
// notify response
XxlRpcFutureResponseFactory.notifyInvokerFuture(xxlRpcResponse.getRequestId(), xxlRpcResponse);

在初始化XxlRpcFutureResponse中, 我們有調(diào)用setInvokerFuture方法將消息和XxlRpcFutureResponse結(jié)果維護(hù)起來(lái)。

當(dāng)在JettyClient執(zhí)行完請(qǐng)求獲取結(jié)果時(shí), 調(diào)用notifyInvokerFuture方法設(shè)置XxlRpcFutureResponse中的xxlRpcResponse屬性
也就是真實(shí)的執(zhí)行結(jié)果达皿。

JettyClient方法請(qǐng)求雖然是異步的, 但是這里還是同步阻塞獲取執(zhí)行結(jié)果。

public class XxlRpcFutureResponseFactory {

    private static ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();

    public static void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){
        // TODO贿肩,running future method-isolation and limit
        futureResponsePool.put(requestId, futureResponse);
    }

    public static void removeInvokerFuture(String requestId){
        futureResponsePool.remove(requestId);
    }

    public static void notifyInvokerFuture(String requestId, XxlRpcResponse xxlRpcResponse){
        XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
        if (futureResponse != null) {
            futureResponse.setResponse(xxlRpcResponse);
            futureResponsePool.remove(requestId);
        }
    }

}
  • 2.XxlJob的核心包括XxlRpc,上文大致提到過(guò)其簡(jiǎn)單的流程峦椰。如果我們想要實(shí)現(xiàn)一個(gè)RPC,應(yīng)該要做些什么呢汰规?比如序列化汤功,壓縮算法,協(xié)議溜哮,動(dòng)態(tài)代理滔金,服務(wù)注冊(cè),加密茂嗓,網(wǎng)絡(luò)編碼餐茵,連接管理,健康檢測(cè)述吸,負(fù)載均衡忿族,優(yōu)雅啟停機(jī),異常重試蝌矛,業(yè)務(wù)分組以及熔斷限流等等道批。由于之前寫過(guò)一個(gè)基于Netty簡(jiǎn)單的RPC框架,因此可以通過(guò)和XxlRpc對(duì)比來(lái)查漏補(bǔ)缺入撒。(這里不會(huì)講Netty隆豹,只會(huì)粗略講一下Rpc代理)

netty-rpc-client端掃描需要代理服務(wù)的接口并且修改BeanDefinition初始化的方式。在Spring容器中實(shí)例化一個(gè)對(duì)象的方式有:Supplier茅逮,FactoryBean璃赡,指定FactoryMethodNameFactoryBeanName,Constructor等等簿煌。

@Slf4j
public class ClassPathRpcScanner extends ClassPathBeanDefinitionScanner {

    private RpcFactoryBean<?> rpcFactoryBean = new RpcFactoryBean<Object>();

    private Class<? extends Annotation> annotationClass;

    public void setAnnotationClass(Class<? extends Annotation> annotationClass) {
        this.annotationClass = annotationClass;
    }

    public ClassPathRpcScanner(BeanDefinitionRegistry registry) {
        super(registry);
    }

    @Override
    protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
        Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);
        if (CollectionUtils.isEmpty(beanDefinitions)) {
            logger.warn("No RPC mapper was found in '"
                + Arrays.toString(basePackages)
                + "' package. Please check your configuration");
        } else {
            processBeanDefinitions(beanDefinitions);
        }
        return beanDefinitions;
    }

    public void registerFilter() {
        boolean acceptAllInterfaces = true;
        if (this.annotationClass != null) {
            addIncludeFilter(new AnnotationTypeFilter(this.annotationClass));
            acceptAllInterfaces = false;
        }

        if (acceptAllInterfaces) {
            addIncludeFilter(new TypeFilter() {
                @Override
                public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
                    return true;
                }
            });
        }

        // exclude package-info.java
        addExcludeFilter(new TypeFilter() {
            @Override
            public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
                String className = metadataReader.getClassMetadata().getClassName();
                return className.endsWith("package-info");
            }
        });
    }

    private void processBeanDefinitions(Set<BeanDefinitionHolder> beanDefinitionHolders) {
        GenericBeanDefinition genericBeanDefinition = null;

        for (BeanDefinitionHolder holder : beanDefinitionHolders) {
            genericBeanDefinition = (GenericBeanDefinition) holder.getBeanDefinition();
            // genericBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(genericBeanDefinition.getBeanClassName());
            // genericBeanDefinition.setBeanClass(this.rpcFactoryBean.getClass());

            genericBeanDefinition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);


            /**
             * Bean的初始化就可以通過(guò)Supplier.get() 和 設(shè)置factoryBean.getObject 有異曲同工之妙
             */

//            try {
//                genericBeanDefinition.setInstanceSupplier(new RpcSupplier<>(Class.forName(genericBeanDefinition.getBeanClassName())));
//            } catch (Exception ex) {
//                throw new RuntimeException(ex);
//            }


            /**
             *
             * 指定factoryMethodName,FactoryBeanName
             * 當(dāng)我們能找到無(wú)參 就先執(zhí)行無(wú)參方法,最后執(zhí)行有參的方法。方法的參數(shù)來(lái)源于ConstructorArgumentValue
             *
             * 這里設(shè)置獨(dú)一份的鉴吹。 我們?nèi)绻O(shè)置了FactoryMethodName, 要注入的類型要和Method ReturnType要匹配起來(lái)
             * 不然會(huì)報(bào)錯(cuò)姨伟。這個(gè)customFactoryBean不能設(shè)置成泛型
             */

            /**
             * DefaultListableBeanFactory:findAutowireCandidates
             * DefaultListableBeanFactory:doGetBeanNamesForType
             * AbstractAutowiredCapableBeanFactory:determineTargetType
             * AbstractAutowiredCapableBeanFactory:getTypeForFactoryMethod
             */
            genericBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(genericBeanDefinition.getBeanClassName());
            genericBeanDefinition.setFactoryMethodName("getObject");
            genericBeanDefinition.setFactoryBeanName("customFactoryBean");


            // genericBeanDefinition.setFactoryBeanName();
            // genericBeanDefinition.setFactoryMethodName();
            log.info("ClassPathRpcScanner設(shè)置GenericBeanDefinition:{}", genericBeanDefinition);
            log.info("ClassPathRpcScanner設(shè)置BeanDefinitionHolder:{}", holder);
        }
    }

    @Override
    protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
        return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
    }
}

指定FactoryBeanName和FactoryMethodName創(chuàng)建代理對(duì)象

public class CustomFactoryBean<T> {

    @Autowired
    private RpcFactory<T> rpcFactory;

    public <T> T getObject(Class<T> rpcInterface) {
        return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {rpcInterface}, this.rpcFactory);
    }


    public static void main(String[] args) {
        Class factoryClass = ClassUtils.getUserClass(CustomFactoryBean.class);
        Method[] candidates = ReflectionUtils.getUniqueDeclaredMethods(factoryClass);
        Method getObject = null;
        for (Method method : candidates) {
            if ("getObject".equalsIgnoreCase(method.getName())) {
                getObject = method;
            }
        }
        System.out.println(getObject);
        /**
         * 測(cè)試泛型
         */
        System.out.println(getObject.getTypeParameters().length);
        System.out.println(getObject.getTypeParameters());
        System.out.println(Arrays.asList(getObject.getParameterTypes()));
    }


}

FactoryBean創(chuàng)建代理對(duì)象

public class RpcFactoryBean<T> implements FactoryBean<T> {

    private Class<T> rpcInterface;

    @Autowired
    private RpcFactory<T> rpcFactory;

    public RpcFactoryBean() {

    }

    public RpcFactoryBean(Class<T> rpcInterface) {
        this.rpcInterface = rpcInterface;
    }

    @Nullable
    @Override
    public T getObject() throws Exception {
        return getRpc();
    }

    @Nullable
    @Override
    public Class<?> getObjectType() {
        return this.rpcInterface;
    }

    @Override
    public boolean isSingleton() {
        return (3&1) == 1;
    }

    public <T> T getRpc() {
        /**
         * 這一步不要寫成了rpcInterface.getInterfaces()
         */
        return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {this.rpcInterface}, this.rpcFactory);
    }

    public static void main(String[] args) {
        System.out.println(InfoUserService.class.getInterfaces());
    }
}

Supplier方式創(chuàng)建代理對(duì)象

public class RpcSupplier<T> implements Supplier<T> {

    private Class<T> rpcInterface;

    public RpcSupplier(Class<T> rpcInterface) {
        this.rpcInterface = rpcInterface;
    }

    /**
     * 這里不用注入的方式, 因?yàn)镽pcSupplier沒(méi)有被Spring容器托管
     */
    private RpcFactory<T> rpcFactory = null;

    @Override
    public T get() {
        ApplicationContext context = SpringApplicationContextUtil.getApplicationContext();

        if (context != null) {
            rpcFactory = context.getBean(RpcFactory.class);
        }

        return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {this.rpcInterface}, this.rpcFactory);
    }

}

InvocationHandler

@Component
@Slf4j
public class RpcFactory<T> implements InvocationHandler {

    @Autowired(required = false)
    private NettyClient nettyClient = new NettyClient();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request request = new Request();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setParameterType(method.getParameterTypes());
        request.setId(IdUtil.getId());

        Object result = nettyClient.send(request);
        Class<?> returnType = method.getReturnType();

        Response response = JSON.parseObject(result.toString(), Response.class);

        if (response.getCode() == 1) {
            throw new RuntimeException(response.getErrorMsg());
        }

        if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)) {
            return response.getData();
        } else if (Collection.class.isAssignableFrom(returnType)) {
            return JSONArray.parseArray(response.getData().toString(), Object.class);
        } else if (Map.class.isAssignableFrom(returnType)) {
            return JSON.parseObject(response.getData().toString(), Map.class);
        } else {
            Object data = response.getData();
            if (data != null) {
                return JSON.parseObject(data.toString(), returnType);
            } else {
                return null;
            }
        }

    }
}

Rpc掃描配置,指定要掃描的路徑和服務(wù)接口豆励。

@Component
public class RpcScannerConfigurer implements BeanDefinitionRegistryPostProcessor {

    private static final String BASE_PACKAGE = "com.cmazxiaoma.springcloud.netty.service";

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
        ClassPathRpcScanner classPathRpcScanner = new ClassPathRpcScanner(registry);
        classPathRpcScanner.setAnnotationClass(RpcService.class);
        classPathRpcScanner.registerFilter();
        classPathRpcScanner.scan(StringUtils.tokenizeToStringArray(BASE_PACKAGE, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS));
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

    }

    public static void main(String[] args) {
        System.out.println(Arrays.asList(StringUtils.tokenizeToStringArray("com.cmazxiaoma.springcloud.netty.service",  ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS)));
    }
}

在我們netty-rpc-client, 我們掃描所有被@RpcService注解的接口, 為這些接口
創(chuàng)建BeanDefinition, 指定其BeanClass是我們的factory類, 指定其接口的類型夺荒。

我們?cè)谶@個(gè)factory類,給服務(wù)接口做動(dòng)態(tài)代理,其InvocationHandler中的invoke函數(shù)就會(huì)發(fā)起rpc調(diào)用良蒸。

那我們?cè)趺匆梅?wù)者提供者接口呢?

Controller類中通過(guò)@Autowired注入服務(wù)提供者接口即可技扼。

再看XxlJob的實(shí)現(xiàn)

比如我們?cè)?code>Controller中 把服務(wù)接口打上@XxlRpcReference注解, 可以設(shè)置timeout,version,序列化算法等等等屬性

然后Spring容器在這個(gè)服務(wù)接口依賴的bean實(shí)例化(postProcessAfterInstantiation)的時(shí)候, 會(huì)為被@XxlRpcReference注解的服務(wù)接口字段,創(chuàng)建XxlRpcReferenceBean, 同時(shí)把引用賦值給這些服務(wù)接口嫩痰。

XxlRpcReferenceBean也是一個(gè)工廠類, 內(nèi)部也做了動(dòng)態(tài)代理剿吻。

我覺(jué)得這些XxlRpcReferenceBean最好緩存起來(lái),不然10次引用服務(wù)接口,就要?jiǎng)?chuàng)建10次對(duì)象(內(nèi)部還要?jiǎng)?chuàng)建JettyClient對(duì)象)。

public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean,DisposableBean, BeanFactoryAware {
    private Logger logger = LoggerFactory.getLogger(XxlRpcSpringInvokerFactory.class);

    // ---------------------- config ----------------------

    private Class<? extends ServiceRegistry> serviceRegistryClass;          // class.forname
    private Map<String, String> serviceRegistryParam;


    public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
        this.serviceRegistryClass = serviceRegistryClass;
    }

    public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
        this.serviceRegistryParam = serviceRegistryParam;
    }


    // ---------------------- util ----------------------

    private XxlRpcInvokerFactory xxlRpcInvokerFactory;

    @Override
    public void afterPropertiesSet() throws Exception {
        // start invoker factory
        xxlRpcInvokerFactory = new XxlRpcInvokerFactory(serviceRegistryClass, serviceRegistryParam);
        xxlRpcInvokerFactory.start();
    }

    @Override
    public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {

        ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
            @Override
            public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                if (field.isAnnotationPresent(XxlRpcReference.class)) {
                    // valid
                    Class iface = field.getType();
                    if (!iface.isInterface()) {
                        throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface.");
                    }

                    XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);

                    // init reference bean
                    XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(
                            rpcReference.netType(),
                            rpcReference.serializer().getSerializer(),
                            rpcReference.callType(),
                            iface,
                            rpcReference.version(),
                            rpcReference.timeout(),
                            rpcReference.address(),
                            rpcReference.accessToken(),
                            null
                    );

                    Object serviceProxy = referenceBean.getObject();

                    // set bean
                    field.setAccessible(true);
                    field.set(bean, serviceProxy);

                    logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}",
                            XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName());
                }
            }
        });

        return super.postProcessAfterInstantiation(bean, beanName);
    }


    @Override
    public void destroy() throws Exception {

        // stop invoker factory
        xxlRpcInvokerFactory.stop();
    }

    private BeanFactory beanFactory;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }
}

在我們netty-rpc-server端, 掃描@RpcService維護(hù)到一個(gè)map里面, key是接口類型,value是服務(wù)提供者實(shí)現(xiàn)類串纺。當(dāng)消費(fèi)者發(fā)起rpc調(diào)用時(shí), 需要調(diào)用服務(wù)提供者實(shí)現(xiàn)類的method時(shí), 就減少了反射服務(wù)提供者實(shí)現(xiàn)類的性能開銷丽旅。這里是不是可以把服務(wù)實(shí)現(xiàn)類的所有method都可以緩存起來(lái)? 我覺(jué)得有利有弊,可以通過(guò)CPU纺棺,內(nèi)存榄笙,維護(hù)緩存的成本這3個(gè)維度決策。綜合考慮祷蝌,還是不要緩存起來(lái)較好茅撞。一個(gè)method反射次數(shù)超過(guò)15次,會(huì)交給MethodAccessorImpl處理巨朦,會(huì)在內(nèi)存中生成對(duì)應(yīng)的字節(jié)碼米丘,并調(diào)用ClassDefiner.defineClass創(chuàng)建對(duì)應(yīng)的class對(duì)象,性能會(huì)得到一定的提升糊啡。

Java 反射效率低的原因:

  • Method#invoke 方法會(huì)對(duì)參數(shù)做封裝和解封操作:我們可以看到拄查,invoke 方法的參數(shù)是 Object[] 類型,也就是說(shuō)悔橄,如果方法參數(shù)是簡(jiǎn)單類型的話靶累,需要在此轉(zhuǎn)化成 Object 類型,例如 long ,在 javac compile 的時(shí)候 用了Long.valueOf() 轉(zhuǎn)型癣疟,也就大量了生成了Long 的 Object, 同時(shí) 傳入的參數(shù)是Object[]數(shù)值,那還需要額外封裝object數(shù)組挣柬。而在上面 MethodAccessorGenerator#emitInvoke 方法里我們看到,生成的字節(jié)碼時(shí)睛挚,會(huì)把參數(shù)數(shù)組拆解開來(lái)邪蛔,把參數(shù)恢復(fù)到?jīng)]有被 Object[] 包裝前的樣子,同時(shí)還要對(duì)參數(shù)做校驗(yàn)扎狱,這里就涉及到了解封操作侧到。因此勃教,在反射調(diào)用的時(shí)候,因?yàn)榉庋b和解封匠抗,產(chǎn)生了額外的不必要的內(nèi)存浪費(fèi)故源,當(dāng)調(diào)用次數(shù)達(dá)到一定量的時(shí)候,還會(huì)導(dǎo)致 GC汞贸。

  • 需要檢查方法可見性绳军。我們會(huì)發(fā)現(xiàn),反射時(shí)每次調(diào)用都必須檢查方法的可見性(在 Method.invoke 里)

  • 需要校驗(yàn)參數(shù):反射時(shí)也必須檢查每個(gè)實(shí)際參數(shù)與形式參數(shù)的類型匹配性(在NativeMethodAccessorImpl.invoke0 里或者生成的 Java 版 MethodAccessor.invoke 里)

  • 反射方法難以內(nèi)聯(lián):Method invoke 就像是個(gè)獨(dú)木橋一樣矢腻,各處的反射調(diào)用都要擠過(guò)去门驾,在調(diào)用點(diǎn)上收集到的類型信息就會(huì)很亂,影響內(nèi)聯(lián)程序的判斷多柑,使得 Method.invoke() 自身難以被內(nèi)聯(lián)到調(diào)用方奶是。(方法內(nèi)聯(lián)指的是在即時(shí)編譯過(guò)程中遇到方法調(diào)用時(shí),直接編譯目標(biāo)方法的方法體,并替換原方法調(diào)用。這樣就不再需要像調(diào)用方法那樣的壓棧竣灌,出棧聂沙,傳參了)

  • JIT 無(wú)法優(yōu)化:因?yàn)榉瓷渖婕暗絼?dòng)態(tài)加載的類型,所以無(wú)法進(jìn)行優(yōu)化帐偎。

回到正軌逐纬,我們這一套實(shí)現(xiàn)和Xxl-Rpc的實(shí)現(xiàn)方式大同小異。

public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {

    // ---------------------- config ----------------------

    private String netType = NetEnum.JETTY.name();
    private String serialize = Serializer.SerializeEnum.HESSIAN.name();

    private String ip = IpUtil.getIp();             // for registry
    private int port = 7080;                        // default port
    private String accessToken;

    private Class<? extends ServiceRegistry> serviceRegistryClass;                          // class.forname
    private Map<String, String> serviceRegistryParam;


    // set
    public void setNetType(String netType) {
        this.netType = netType;
    }

    public void setSerialize(String serialize) {
        this.serialize = serialize;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
        this.serviceRegistryClass = serviceRegistryClass;
    }

    public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
        this.serviceRegistryParam = serviceRegistryParam;
    }


    // util
    private void prepareConfig(){

        // prepare config
        NetEnum netTypeEnum = NetEnum.autoMatch(netType, null);
        Serializer.SerializeEnum serializeEnum = Serializer.SerializeEnum.match(serialize, null);
        Serializer serializer = serializeEnum!=null?serializeEnum.getSerializer():null;

        if (port <= 0) {
            throw new XxlRpcException("xxl-rpc provider port["+ port +"] is unvalid.");
        }
        if (NetUtil.isPortUsed(port)) {
            throw new XxlRpcException("xxl-rpc provider port["+ port +"] is used.");
        }

        // init config
        super.initConfig(netTypeEnum, serializer, ip, port, accessToken, serviceRegistryClass, serviceRegistryParam);
    }


    // ---------------------- util ----------------------

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
        if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                // valid
                if (serviceBean.getClass().getInterfaces().length ==0) {
                    throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                }
                // add service
                XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);

                String iface = serviceBean.getClass().getInterfaces()[0].getName();
                String version = xxlRpcService.version();

                super.addService(iface, version, serviceBean);
            }
        }

        // TODO削樊,addServices by api + prop

    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.prepareConfig();
        super.start();
    }

    @Override
    public void destroy() throws Exception {
        super.stop();
    }

}

  • 3.XxlJob對(duì)Groovy的支持,同時(shí)還支持注入Spring中的bean.(畫外音:Zuul使用Grovvy定義動(dòng)態(tài)過(guò)濾器的時(shí)候兔毒,刪除Grovvy文件并不能從當(dāng)前運(yùn)行的api網(wǎng)關(guān)中移除這個(gè)過(guò)濾器漫贞,只能將shouldFilter返回false。目前過(guò)濾器無(wú)法注入Spring中的bean)育叁。
public class GlueFactory {


    private static GlueFactory glueFactory = new GlueFactory();
    public static GlueFactory getInstance(){
        return glueFactory;
    }
    public static void refreshInstance(int type){
        if (type == 0) {
            glueFactory = new GlueFactory();
        } else if (type == 1) {
            glueFactory = new SpringGlueFactory();
        }
    }


    /**
     * groovy class loader
     */
    private GroovyClassLoader groovyClassLoader = new GroovyClassLoader();


    /**
     * load new instance, prototype
     *
     * @param codeSource
     * @return
     * @throws Exception
     */
    public IJobHandler loadNewInstance(String codeSource) throws Exception{
        if (codeSource!=null && codeSource.trim().length()>0) {
            Class<?> clazz = groovyClassLoader.parseClass(codeSource);
            if (clazz != null) {
                Object instance = clazz.newInstance();
                if (instance!=null) {
                    if (instance instanceof IJobHandler) {
                        this.injectService(instance);
                        return (IJobHandler) instance;
                    } else {
                        throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, "
                                + "cannot convert from instance["+ instance.getClass() +"] to IJobHandler");
                    }
                }
            }
        }
        throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null");
    }

    /**
     * inject service of bean field
     *
     * @param instance
     */
    public void injectService(Object instance) {
        // do something
    }

}
public class SpringGlueFactory extends GlueFactory {
    private static Logger logger = LoggerFactory.getLogger(SpringGlueFactory.class);


    /**
     * inject action of spring
     * @param instance
     */
    @Override
    public void injectService(Object instance){
        if (instance==null) {
            return;
        }

        if (XxlJobSpringExecutor.getApplicationContext() == null) {
            return;
        }

        Field[] fields = instance.getClass().getDeclaredFields();
        for (Field field : fields) {
            if (Modifier.isStatic(field.getModifiers())) {
                continue;
            }

            Object fieldBean = null;
            // with bean-id, bean could be found by both @Resource and @Autowired, or bean could only be found by @Autowired

            if (AnnotationUtils.getAnnotation(field, Resource.class) != null) {
                try {
                    Resource resource = AnnotationUtils.getAnnotation(field, Resource.class);
                    if (resource.name()!=null && resource.name().length()>0){
                        fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(resource.name());
                    } else {
                        fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getName());
                    }
                } catch (Exception e) {
                }
                if (fieldBean==null ) {
                    fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
                }
            } else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) {
                Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class);
                if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) {
                    fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(qualifier.value());
                } else {
                    fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
                }
            }

            if (fieldBean!=null) {
                field.setAccessible(true);
                try {
                    field.set(instance, fieldBean);
                } catch (IllegalArgumentException e) {
                    logger.error(e.getMessage(), e);
                } catch (IllegalAccessException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }

}

  • 4.Shiro通過(guò)實(shí)現(xiàn)DestructionAwareBeanPostProcessor完成對(duì)Bean生命周期的掌握迅脐,異曲同工之妙的還有ApplicationListenerDetector等等等。

我覺(jué)得xxl-job可以用這個(gè)更加優(yōu)雅的方式來(lái)完成對(duì)bean的生命周期的托管豪嗽。

public class LifecycleBeanPostProcessor implements DestructionAwareBeanPostProcessor, PriorityOrdered {

    /**
     * Private internal class log instance.
     */
    private static final Logger log = LoggerFactory.getLogger(LifecycleBeanPostProcessor.class);

    /**
     * Order value of this BeanPostProcessor.
     */
    private int order;

    /**
     * Default Constructor.
     */
    public LifecycleBeanPostProcessor() {
        this(LOWEST_PRECEDENCE);
    }

    /**
     * Constructor with definable {@link #getOrder() order value}.
     *
     * @param order order value of this BeanPostProcessor.
     */
    public LifecycleBeanPostProcessor(int order) {
        this.order = order;
    }

    /**
     * Calls the <tt>init()</tt> methods on the bean if it implements {@link org.apache.shiro.util.Initializable}
     *
     * @param object the object being initialized.
     * @param name   the name of the bean being initialized.
     * @return the initialized bean.
     * @throws BeansException if any exception is thrown during initialization.
     */
    public Object postProcessBeforeInitialization(Object object, String name) throws BeansException {
        if (object instanceof Initializable) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Initializing bean [" + name + "]...");
                }

                ((Initializable) object).init();
            } catch (Exception e) {
                throw new FatalBeanException("Error initializing bean [" + name + "]", e);
            }
        }
        return object;
    }


    /**
     * Does nothing - merely returns the object argument immediately.
     */
    public Object postProcessAfterInitialization(Object object, String name) throws BeansException {
        // Does nothing after initialization
        return object;
    }


    /**
     * Calls the <tt>destroy()</tt> methods on the bean if it implements {@link org.apache.shiro.util.Destroyable}
     *
     * @param object the object being initialized.
     * @param name   the name of the bean being initialized.
     * @throws BeansException if any exception is thrown during initialization.
     */
    public void postProcessBeforeDestruction(Object object, String name) throws BeansException {
        if (object instanceof Destroyable) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Destroying bean [" + name + "]...");
                }

                ((Destroyable) object).destroy();
            } catch (Exception e) {
                throw new FatalBeanException("Error destroying bean [" + name + "]", e);
            }
        }
    }

    /**
     * Order value of this BeanPostProcessor.
     *
     * @return order value.
     */
    public int getOrder() {
        // LifecycleBeanPostProcessor needs Order. See https://issues.apache.org/jira/browse/SHIRO-222
        return order;
    }

    /**
     * Return true only if <code>bean</code> implements Destroyable.
     * @param bean bean to check if requires destruction.
     * @return true only if <code>bean</code> implements Destroyable.
     * @since 1.4
     */
    @SuppressWarnings("unused")
    public boolean requiresDestruction(Object bean) {
        return (bean instanceof Destroyable);
    }
}
    1. Dubbo中谴蔑,Invoke是一個(gè)關(guān)鍵組件。在服務(wù)者和消費(fèi)者之間都充當(dāng)服務(wù)調(diào)用的一個(gè)職責(zé)龟梦,有點(diǎn)像xxl-job中的XxlRpcProviderFactory隐锭。

Dubbo中的ProxyFactorygetInvoker方法用于在服務(wù)提供者端计贰,將服務(wù)的具體實(shí)現(xiàn)類轉(zhuǎn)換成Invoker钦睡。getProxy方法用于在消費(fèi)者端,將invoker轉(zhuǎn)換成客戶端需要的接口躁倒。在服務(wù)提供者ServiceConfig和消費(fèi)者ReferenceConfig中荞怒,都會(huì)對(duì)proxyFactory通過(guò)ExtensionLoader擴(kuò)展機(jī)制生成適配類ProxyFactory$Adaptive洒琢。這個(gè)適配類會(huì)根據(jù)URLProxyFactory參數(shù)選擇對(duì)應(yīng)的實(shí)現(xiàn)類進(jìn)行操作。

/**
 * ProxyFactory. (API/SPI, Singleton, ThreadSafe)
 */
@SPI("javassist")
public interface ProxyFactory {

    /**
     * create proxy.
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /**
     * create invoker.
     *
     * @param <T>
     * @param proxy
     * @param type
     * @param url
     * @return invoker
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

ReferenceBean創(chuàng)建的代理對(duì)象褐桌,就要經(jīng)過(guò)InvokerInovationHandler處理衰抑。首先會(huì)經(jīng)過(guò)AbstractInvoker中的public Result invoke(Invocation inv) throws RpcException,然后再交給其子類doInvoke(invocation)實(shí)現(xiàn)荧嵌。

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

可以看看DubboInvoker中的doInvoke實(shí)現(xiàn)呛踊,發(fā)起遠(yuǎn)程調(diào)用。

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

服務(wù)提供者收到請(qǐng)求時(shí)完丽,會(huì)回調(diào)此方法恋技。最終會(huì)通過(guò)AbstractProxyInvoker調(diào)用到上文提到過(guò) ProxyFactory生成的Invoker對(duì)象。

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:"
                                + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

AbstractProxyInvoker最終會(huì)調(diào)用服務(wù)提供者類中的方法逻族。

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

我分析的比較簡(jiǎn)陋蜻底,畢竟這篇文章可不能讓Dubbo喧賓奪主。對(duì)于xxl-Rpc在消費(fèi)者掃描服務(wù)接口做動(dòng)態(tài)代理聘鳞,在提供者掃描服務(wù)實(shí)例并維護(hù)的實(shí)現(xiàn)薄辅,我們對(duì)此可以看下Dubbo在這一塊是怎么實(shí)現(xiàn)的。(在Dubbo中抠璃,服務(wù)實(shí)例維護(hù)主要依托Exporter站楚。它是服務(wù)暴露數(shù)據(jù)結(jié)構(gòu),規(guī)定了通常暴露某個(gè)協(xié)議的服務(wù)時(shí)搏嗡,exporter具有獲取該服務(wù)可執(zhí)行對(duì)象Invoker的能力窿春,以及取消暴露的能力。Invoker轉(zhuǎn)化為Exporter時(shí)服務(wù)暴露的關(guān)鍵采盒, 將該協(xié)議下的Exporter維護(hù)到exporterMap中將完成整個(gè)服務(wù)的暴露旧乞,往往每一種協(xié)議都會(huì)生成與之對(duì)應(yīng)的ExporterExporter本文不會(huì)講到磅氨,有興趣的可以看下DubboProtocol,RegistryProtocol)

欲知后事如何,且聽下回分解
萬(wàn)萬(wàn)沒(méi)想到一個(gè)xxl-job源碼分析尺栖,竟然能引發(fā)這么多血案!(下)


尾言

萬(wàn)萬(wàn)沒(méi)想到烦租,一個(gè)知識(shí)點(diǎn)竟然能引發(fā)這么多血案延赌!溜了溜了,俯臥撐搞起叉橱。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末挫以,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子赏迟,更是在濱河造成了極大的恐慌屡贺,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異甩栈,居然都是意外死亡泻仙,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門量没,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)玉转,“玉大人,你說(shuō)我怎么就攤上這事殴蹄【孔ィ” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵袭灯,是天一觀的道長(zhǎng)刺下。 經(jīng)常有香客問(wèn)我,道長(zhǎng)稽荧,這世上最難降的妖魔是什么橘茉? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮姨丈,結(jié)果婚禮上畅卓,老公的妹妹穿的比我還像新娘。我一直安慰自己蟋恬,他們只是感情好翁潘,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著歼争,像睡著了一般拜马。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上沐绒,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天一膨,我揣著相機(jī)與錄音,去河邊找鬼洒沦。 笑死,一個(gè)胖子當(dāng)著我的面吹牛价淌,可吹牛的內(nèi)容都是我干的申眼。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼蝉衣,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼括尸!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起病毡,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤濒翻,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體有送,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡淌喻,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了雀摘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片裸删。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖阵赠,靈堂內(nèi)的尸體忽然破棺而出涯塔,到底是詐尸還是另有隱情,我是刑警寧澤清蚀,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布匕荸,位于F島的核電站,受9級(jí)特大地震影響枷邪,放射性物質(zhì)發(fā)生泄漏榛搔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一齿风、第九天 我趴在偏房一處隱蔽的房頂上張望药薯。 院中可真熱鬧,春花似錦救斑、人聲如沸童本。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)穷娱。三九已至,卻和暖如春运沦,著一層夾襖步出監(jiān)牢的瞬間泵额,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工携添, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留嫁盲,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親控淡。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 先看官網(wǎng)兩張圖【引用來(lái)自官網(wǎng)】:image.png 官網(wǎng)說(shuō)明: 1.首先 ServiceConfig 類拿到對(duì)外提...
    致慮閱讀 1,434評(píng)論 1 4
  • 我準(zhǔn)備戰(zhàn)斗到最后瘾蛋,不是因?yàn)槲矣赂遥俏蚁胍娮C一切矫限。 ——雙雪濤《獵人》 [TOC]Thinking 一個(gè)技術(shù)...
    小安的大情調(diào)閱讀 2,001評(píng)論 0 0
  • 時(shí)序圖 在講解源碼前哺哼,先看下官方文檔提供的時(shí)序圖佩抹,后面的講解基本是這個(gè)路線,但是會(huì)更細(xì)節(jié)化 大致邏輯 首先服務(wù)的實(shí)...
    土豆肉絲蓋澆飯閱讀 2,891評(píng)論 2 3
  • Pomelo中游戲服務(wù)器是一個(gè)多進(jìn)程相互協(xié)作的環(huán)境取董,各個(gè)進(jìn)程之間通信采用RPC(遠(yuǎn)程過(guò)程調(diào)用)的形式完成棍苹,通過(guò)底層...
    JunChow520閱讀 1,948評(píng)論 0 1
  • 1.服務(wù)發(fā)布概述 Dubbo 服務(wù)導(dǎo)出過(guò)程始于 Spring 容器發(fā)布刷新事件[dubbo:service -->...
    喂豬喝拿鐵閱讀 285評(píng)論 0 0