源碼解讀Dubbo分層設計思想

如圖描述Dubbo實現(xiàn)的RPC整體分10層:service、config昌罩、proxy哭懈、registry、cluster茎用、monitor遣总、protocol睬罗、exchange、transport旭斥、serialize容达。

service:使用方定義的接口和實現(xiàn)類;

config:負責解析Dubbo定義的配置垂券,比如注解和xml配置花盐,各種參數(shù);

proxy:主要負責生成消費者和提供者的代理對象菇爪,加載框架功能算芯,比如提供者過濾器鏈,擴展點凳宙;

registry:負責注冊服務的定義和實現(xiàn)類的裝載熙揍;

cluster:只有消費者有這么一層,負責包裝多個服務提供者成一個‘大提供者’氏涩,加載負載均衡诈嘿、路有等擴展點;

monitor:定義監(jiān)控服務削葱,加載監(jiān)控實現(xiàn)提供者奖亚;

protocol:封裝RPC調(diào)用接口,管理調(diào)用實體的生命周期析砸;

exchange:封裝請求響應模式昔字,同步轉(zhuǎn)異步;

transport:抽象傳輸層模型首繁,兼容netty作郭、mina、grizzly等通訊框架弦疮;

serialize:抽象序列化模型夹攒,兼容多種序列化框架,包括:fastjson胁塞、fst咏尝、hessian2、kryo啸罢、kryo2编检、protobuf等,通過序列化支持跨語言的方式扰才,支持跨語言的rpc調(diào)用允懂;

Dubbo這么分層的目的在于實現(xiàn)層與層之間的解耦,每一層都定義了接口規(guī)范衩匣,也可以根據(jù)不同的業(yè)務需求定制蕾总、加載不同的實現(xiàn)粥航,具有極高的擴展性。

1生百,RPC調(diào)用過程

接下來結(jié)合上圖簡單描述一次完整的rpc調(diào)用過程:

從Dubbo分層的角度看递雀,詳細時序圖如下,藍色部分是服務消費端置侍,淺綠色部分是服務提供端映之,時序圖從消費端一次Dubbo方法調(diào)用開始拦焚,到服務端本地方法執(zhí)行結(jié)束蜡坊。


從Dubbo核心領域?qū)ο蟮慕嵌瓤矗覀円?a target="_blank">Dubbo官方文檔說明赎败,如下圖所示秕衙。Dubbo核心領域?qū)ο笫荌nvoker,消費端代理對象是proxy僵刮,包裝了Invoker的調(diào)用据忘;服務端代理對象是一個Invoker,他通過exporter包裝搞糕,當服務端接收到調(diào)用請求后勇吊,通過exporter找到Invoker,Invoker去實際執(zhí)行用戶的業(yè)務邏輯窍仰。


2汉规,Dubbo服務的注冊和發(fā)現(xiàn)流程

主要流程是:從注冊中心訂閱服務提供者,然后啟動tcp服務連接遠端提供者驹吮,將多個服務提供者合并成一個Invoker针史,用這個Invoker創(chuàng)建代理對象。


下圖出自開發(fā)指南-框架設計-暴露服務時序碟狞,主要流程是:創(chuàng)建本地服務的代理Invoker啄枕,啟動tcp服務暴露服務,然后將服務注冊到注冊中心族沃。


結(jié)合Dubbo服務的注冊和發(fā)現(xiàn)频祝,從配置層開始解釋每一層的作用和原理。

示例服務接口定義如下:

public interface CouponServiceViewFacade {

? ? /**

? ? * 查詢單張優(yōu)惠券

? ? */

? ? CouponViewDTO query(String code);

}



二:配置層:

1脆淹,做什么

配置層提供配置處理工具類智润,在容器啟動的時候,通過ServiceConfig.export實例化服務提供者未辆,ReferenceConfig.get實例化服務消費者對象窟绷。

Dubbo應用使用spring容器啟動時,Dubbo服務提供者配置處理器通過ServiceConfig.export啟動Dubbo遠程服務暴露本地服務咐柜。Dubbo服務消費者配置處理器通過ReferenceConfig.get實例化一個代理對象兼蜈,并通過注冊中心服務發(fā)現(xiàn)攘残,連接遠端服務提供者。

Dubbo配置可以使用注解和xml兩種形式为狸,本文采用注解的形式進行說明歼郭。


2,怎么做辐棒?

2.1服務消費端的解析

Spring容器啟動過程中病曾,填充bean屬性時,對含有Dubbo引用注解的屬性使用org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor進行初始化漾根。如下是ReferenceAnnotationBeanPostProcessor的構(gòu)造方法泰涂,Dubbo服務消費者注解處理器處理以下三個注解:DubboReference.class、Reference.class辐怕、com.alibaba.dubbo.config.annotation.Reference.class修飾的類逼蒙。

ReferenceAnnotationBeanPostProcessor類定義:

public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements

? ? ? ? ApplicationContextAware {

? ? public ReferenceAnnotationBeanPostProcessor() {

? ? ? ? super(DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class);

? ? }

}


Dubbo服務發(fā)現(xiàn)到這一層,Dubbo即將開始構(gòu)建服務消費者的代理對象寄疏,CouponServiceViewFacade接口的代理實現(xiàn)類是牢。


2.2服務提供端的解析

Spring容器啟動的時候,加載注解@org.apache.dubbo.config.spring.context.annotation.DubboComponentScan指定范圍的類陕截,并初始化驳棱;初始化使用dubbo實現(xiàn)的擴展點org.apache.dubbo.config.spring.beans.factory.annotation.ServiceClassPostProcessor。

ServiceClassPostProcessor處理的注解類有DubboService.class,Service.class,com.alibaba.dubbo.config.annotation.Service.class农曲。

如下是ServiceClassPostProcessor類定義:


public class ServiceClassPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,

? ? ? ? ResourceLoaderAware, BeanClassLoaderAware {

? ? private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList(

? ? ? ? ? ? DubboService.class,Service.class,com.alibaba.dubbo.config.annotation.Service.class

? ? );

社搅。。朋蔫。

}


等待Spring容器ContextRefreshedEvent事件罚渐,啟動Dubbo應用服務監(jiān)聽端口,暴露本地服務驯妄。

Dubbo服務注冊到這一層荷并,Dubbo即將開始構(gòu)建服務提供者的代理對象,CouponServiceViewFacade實現(xiàn)類的反射代理類青扔。



三:代理層:

為服務消費者生成代理實現(xiàn)實例源织,為服務提供者生成反射代理實例。

CouponServiceViewFacade的代理實現(xiàn)實例微猖,消費端在調(diào)用query方法的時候谈息,實際上是調(diào)用代理實現(xiàn)實例的query方法,通過他調(diào)用遠程服務凛剥。

//

// Source code recreated from a .class file by IntelliJ IDEA

// (powered by Fernflower decompiler)

//

package org.apache.dubbo.common.bytecode;

public class proxy1 implements DC, Destroyable, CouponServiceViewFacade, EchoService {

? ? public static Method[] methods;

? ? private InvocationHandler handler;

? ? public proxy1(InvocationHandler var1) {

? ? ? ? this.handler = var1;

? ? }

? ? public proxy1() {

? ? }

? ? public CouponViewDTO query(String var1) {

? ? ? ? Object[] var2 = new Object[]{var1};

? ? ? ? Object var3 = this.handler.invoke(this, methods[0], var2);

? ? ? ? return (CouponViewDTO)var3;

? ? }

}

CouponServiceViewFacade的反射代理實例侠仇,服務端接收到請求后,通過該實例的Invoke方法最終執(zhí)行本地方法query。

/**

* InvokerWrapper

*/

public class AbstractProxyInvoker<CouponServiceViewFacade> implements Invoker<CouponServiceViewFacade> {

? ? ? ? // 逻炊。互亮。。

? ? public AbstractProxyInvoker(CouponServiceViewFacade proxy, Class<CouponServiceViewFacade> type, URL url) {

? ? ? ? //余素。豹休。。

? ? ? ? this.proxy = proxy;

? ? ? ? this.type = type;

? ? ? ? this.url = url;

? ? }

? ? @Override

? ? public Result invoke(Invocation invocation) throws RpcException {

? ? ? ? //桨吊。威根。。

? ? ? ? Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());

? ? ? ? //视乐。洛搀。。

? ? }

? ? protected Object doInvoke(CouponServiceViewFacade proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable{

? ? ? ? //炊林。姥卢。卷要。

? ? ? ? return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);

? ? }

}

2渣聚,怎么做?

Dubbo代理工廠接口定義如下僧叉,定義了服務提供者和服務消費者的代理對象工廠方法奕枝。服務提供者代理對象和服務消費者代理對象都是通過工廠方法創(chuàng)建,工廠實現(xiàn)類可以通過SPI自定義擴展瓶堕。


@SPI("javassist")

public interface ProxyFactory {

? ? // 生成服務消費者代理對象

? ? @Adaptive({PROXY_KEY})

? ? <T> T getProxy(Invoker<T> invoker) throws RpcException;

? ? // 生成服務消費者代理對象

? ? @Adaptive({PROXY_KEY})

? ? <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;


? ? // 生成服務提供者代理對象

? ? @Adaptive({PROXY_KEY})

? ? <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

2.1服務消費者

2.1.1 創(chuàng)建服務消費者代理類

默認采用Javaassist代理工廠實現(xiàn)隘道,Proxy.getProxy(interfaces)創(chuàng)建代理工廠類,newInstance創(chuàng)建具體代理對象郎笆。

public class JavassistProxyFactory extends AbstractProxyFactory {

? ? @Override

? ? @SuppressWarnings("unchecked")

? ? public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {

? ? ? ? return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));

? ? }

? ? 谭梗。。宛蚓。

}


2.1.2 服務消費者代理

Dubbo為每個服務消費者生成兩個代理類:代理工廠類激捏,接口代理類。

CouponServiceViewFacade代理工廠類:

public class Proxy1 extends Proxy implements DC {

? ? public Proxy1() {

? ? }

? ? public Object newInstance(InvocationHandler var1) {

? ? ? ? return new proxy1(var1);

? ? }

}


最終生成的CouponServiceViewFacade的代理對象如下凄吏,其中handler的實現(xiàn)類是InvokerInvocationHandler远舅,this.handler.invoke方法發(fā)起Dubbo調(diào)用。

//

// Source code recreated from a .class file by IntelliJ IDEA

// (powered by Fernflower decompiler)

//

package org.apache.dubbo.common.bytecode;

public class proxy1 implements DC, Destroyable, CouponServiceViewFacade, EchoService {

? ? public static Method[] methods;

? ? private InvocationHandler handler;

? ? public proxy1(InvocationHandler var1) {

? ? ? ? this.handler = var1;

? ? }

? ? public proxy1() {

? ? }

? ? public CouponViewDTO query(String var1) {

? ? ? ? Object[] var2 = new Object[]{var1};

? ? ? ? Object var3 = this.handler.invoke(this, methods[0], var2);

? ? ? ? return (CouponViewDTO)var3;

? ? }

}


2.2 服務提供者

2.2.1 創(chuàng)建服務提供者代理類

默認Javaassist代理工廠實現(xiàn)痕钢,使用Wrapper包裝本地服務提供者图柏。proxy是實際的服務提供者實例,即CouponServiceViewFacade的本地實現(xiàn)類任连,type是接口類定義蚤吹,URL是injvm協(xié)議URL。

public class JavassistProxyFactory extends AbstractProxyFactory {

? ? 随抠。裁着。余佃。

? ? @Override

? ? public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {

? ? ? ? // 代理包裝類,包裝了本地的服務提供者

? ? ? ? final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);

? ? ? ? // 代理類入口

? ? ? ? return new AbstractProxyInvoker<T>(proxy, type, url) {

? ? ? ? ? ? @Override

? ? ? ? ? ? protected Object doInvoke(T proxy, String methodName,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Class<?>[] parameterTypes,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Object[] arguments) throws Throwable {

? ? ? ? ? ? ? ? return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);

? ? ? ? ? ? }

? ? ? ? };

? ? }

}


2.2.2 Wrapper包裝類

Dubbo為每個服務提供者的本地實現(xiàn)生成一個Wrapper代理類跨算,抽象Wrapper類定義如下:

public abstract class Wrapper {

? ? 爆土。。诸蚕。

? ? abstract public Object invokeMethod(Object instance, String mn, Class<?>[] types, Object[] args) throws NoSuchMethodException, InvocationTargetException;

}

具體Wrapper代理類使用字節(jié)碼技術(shù)動態(tài)生成步势,本地服務CouponServiceViewFacade的代理包裝類舉例:

//

// Source code recreated from a .class file by IntelliJ IDEA

// (powered by Fernflower decompiler)

//

package org.apache.dubbo.common.bytecode;

import com.xxx.CouponServiceViewFacade;

import java.lang.reflect.InvocationTargetException;

import java.util.Map;

import org.apache.dubbo.common.bytecode.ClassGenerator.DC;

public class Wrapper25 extends Wrapper implements DC {

? 。背犯。坏瘩。

? ? public Wrapper25() {

? ? }

? ? public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {

? ? ? ? CouponServiceViewFacade var5;

? ? ? ? try {

? ? ? ? ? ? var5 = (CouponServiceViewFacade)var1;

? ? ? ? } catch (Throwable var8) {

? ? ? ? ? ? throw new IllegalArgumentException(var8);

? ? ? ? }

? ? ? ? try {

? ? ? ? ? ? if ("query".equals(var2) && var3.length == 1) {

? ? ? ? ? ? ? ? return var5.query((String)var4[0]);

? ? ? ? ? ? }

? ? ? ? } catch (Throwable var9) {

? ? ? ? ? ? throw new InvocationTargetException(var9);

? ? ? ? }

? ? ? ? throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.xxx.CouponServiceViewFacade.");

? ? }

。漠魏。倔矾。

}

在服務初始化流程中,服務消費者代理對象生成后初始化就完成了柱锹,服務消費端的初始化順序:ReferenceConfig.get->從注冊中心訂閱服務->啟動客戶端->創(chuàng)建DubboInvoker->構(gòu)建ClusterInvoker→創(chuàng)建服務代理對象哪自;

而服務提供端的初始化才剛開始,服務提供端的初始化順序:ServiceConfig.export->創(chuàng)建AbstractProxyInvoker禁熏,通過Injvm協(xié)議關(guān)聯(lián)本地服務->啟動服務端→注冊服務到注冊中心壤巷。


四、注冊層

4.1 做什么

封裝服務地址的注冊與發(fā)現(xiàn)瞧毙,以服務 URL 為配置中心胧华。服務提供者本地服務啟動成功后,監(jiān)聽Dubbo端口成功后宙彪,通過注冊協(xié)議發(fā)布到注冊中心矩动;服務消費者通過注冊協(xié)議訂閱服務,啟動本地應用連接遠程服務释漆。

注冊協(xié)議URL舉例:

zookeeper://xxx/org.apache.dubbo.registry.RegistryService?application=xxx&...

4.2 怎么做

注冊服務工廠接口定義如下悲没,注冊服務實現(xiàn)通過SPI擴展,默認是zk作為注冊中心灵汪。

@SPI("dubbo")

public interface RegistryFactory {

? ? @Adaptive({"protocol"})

? ? Registry getRegistry(URL url);

}

注冊服務接口定義檀训;

public interface RegistryService {


? ? void register(URL url);


? ? void unregister(URL url);


? ? void subscribe(URL url, NotifyListener listener);


? ? void unsubscribe(URL url, NotifyListener listener);


? ? List<URL> lookup(URL url);

}



五、集群層

5.1 做什么

服務消費方從注冊中心訂閱服務提供者后享言,將多個提供者包裝成一個提供者峻凫,并且封裝路由及負載均衡策略;并橋接注冊中心览露,以 Invoker 為中心荧琼,擴展接口為 Cluster, Directory, Router, LoadBalance;

服務提供端不存在集群層。

5.2 怎么做

5.2.1 Cluster

集群領域主要負責將多個服務提供者包裝成一個ClusterInvoker,注入路由處理器鏈和負載均衡策略。主要策略有:failover炭分、failfast交煞、failsafe喊巍、failback、forking、available、mergeable苟翻、broadcast、zone-aware骗污。

集群接口定義如下崇猫,只有一個方法:從服務目錄中的多個服務提供者構(gòu)建一個ClusterInvoker。

作用是對上層-代理層屏蔽集群層的邏輯需忿;代理層調(diào)用服務方法只需執(zhí)行Invoker.invoke诅炉,然后通過ClusterInvoker內(nèi)部的路由策略和負載均衡策略計算具體執(zhí)行哪個遠端服務提供者。

@SPI(Cluster.DEFAULT)

public interface Cluster {

? ? String DEFAULT = FailoverCluster.NAME;

? ? @Adaptive

? ? <T> Invoker<T> join(Directory<T> directory) throws RpcException;

? 屋厘。涕烧。。

}

ClusterInvoker執(zhí)行邏輯擅这,先路由策略過濾澈魄,然后負載均衡策略選擇最終的遠端服務提供者景鼠。示例代理如下:

public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {

仲翎。。铛漓。

? ? @Override

? ? public Result invoke(final Invocation invocation) throws RpcException {

? ? ? ? checkWhetherDestroyed();

? ? ? ? // binding attachments into invocation.

? ? ? ? Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();

? ? ? ? if (contextAttachments != null && contextAttachments.size() != 0) {

? ? ? ? ? ? ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);

? ? ? ? }

? ? ? ? // 集群invoker執(zhí)行時溯香,先使用路由鏈過濾服務提供者

? ? ? ? List<Invoker<T>> invokers = list(invocation);

? ? ? ? LoadBalance loadbalance = initLoadBalance(invokers, invocation);

? ? ? ? RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

? ? ? ? return doInvoke(invocation, invokers, loadbalance);

? ? }

。浓恶。玫坛。

}


5.2.2 Directory

服務目錄接口定義如下,Dubbo方法接口調(diào)用時包晰,將方法信息包裝成invocation湿镀,通過Directory.list過濾可執(zhí)行的遠端服務。

通過org.apache.dubbo.registry.integration.RegistryDirectory橋接注冊中心伐憾,監(jiān)聽注冊中心的路由配置修改勉痴、服務治理等事件。

public interface Directory<T> extends Node {


? ? Class<T> getInterface();

? ? List<Invoker<T>> list(Invocation invocation) throws RpcException;

? ? List<Invoker<T>> getAllInvokers();

? ? URL getConsumerUrl();

}


5.2.3 Router

從已知的所有服務提供者中根據(jù)路由規(guī)則刷選服務提供者树肃。

服務訂閱的時候初始化路由處理器鏈蒸矛,調(diào)用遠程服務的時候先使用路由鏈過濾服務提供者,再通過負載均衡選擇具體的服務節(jié)點。

路由處理器鏈工具類雏掠,提供路由篩選服務斩祭,監(jiān)聽更新服務提供者。

public class RouterChain<T> {

乡话。摧玫。。


? ? public List<Invoker<T>> route(URL url, Invocation invocation) {

? ? ? ? List<Invoker<T>> finalInvokers = invokers;

? ? ? ? for (Router router : routers) {

? ? ? ? ? ? finalInvokers = router.route(finalInvokers, url, invocation);

? ? ? ? }

? ? ? ? return finalInvokers;

? ? }

? ? /**

? ? * Notify router chain of the initial addresses from registry at the first time.

? ? * Notify whenever addresses in registry change.

? ? */

? ? public void setInvokers(List<Invoker<T>> invokers) {

? ? ? ? //路由鏈監(jiān)聽更新服務提供者

? ? ? ? this.invokers = (invokers == null ? Collections.emptyList() : invokers);

? ? ? ? routers.forEach(router -> router.notify(this.invokers));

? ? }

}


訂閱服務的時候绑青,將路由鏈注入到RegistryDirectory中席赂;

public class RegistryProtocol implements Protocol {

? ? 。时迫。颅停。

? ? private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {

? ? ? ? 。掠拳。癞揉。

? ? ? ? // 服務目錄初始化路由鏈

? ? ? ? directory.buildRouterChain(subscribeUrl);

? ? ? ? directory.subscribe(toSubscribeUrl(subscribeUrl));

? ? ? ? 。溺欧。喊熟。

? ? ? ? return registryInvokerWrapper;

? ? }

? ? 。姐刁。芥牌。

}

5.2.4 LoadBalance

根據(jù)不同的負載均衡策略從可使用的遠端服務實例中選擇一個,負責均衡接口定義如下:

@SPI(RandomLoadBalance.NAME)

public interface LoadBalance {

? ? @Adaptive("loadbalance")

? ? <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}


六聂使、監(jiān)控層

6.1 做什么

監(jiān)控RPC調(diào)用次數(shù)和調(diào)用時間壁拉,以Statistics為中心,擴展接口為 MonitorFactory, Monitor, MonitorService柏靶。


6.2 怎么做

監(jiān)控工廠接口定義弃理,通過SPI方式進行擴展;

@SPI("dubbo")

public interface MonitorFactory {


? ? @Adaptive("protocol")

? ? Monitor getMonitor(URL url);

}

@Adaptive("protocol")

Monitor getMonitor(URL url);

監(jiān)控服務接口定義如下屎蜓,定義了一些默認的監(jiān)控維度和指標項痘昌;

public interface MonitorService {

? ? // 監(jiān)控維度

? ? String APPLICATION = "application";

? ? String INTERFACE = "interface";

? ? String METHOD = "method";

? ? String GROUP = "group";

? ? String VERSION = "version";

? ? String CONSUMER = "consumer";

? ? String PROVIDER = "provider";

? ? String TIMESTAMP = "timestamp";

? ? //監(jiān)控指標項

? ? String SUCCESS = "success";

? ? String FAILURE = "failure";

? ? String INPUT = INPUT_KEY;

? ? String OUTPUT = OUTPUT_KEY;

? ? String ELAPSED = "elapsed";

? ? String CONCURRENT = "concurrent";

? ? String MAX_INPUT = "max.input";

? ? String MAX_OUTPUT = "max.output";

? ? String MAX_ELAPSED = "max.elapsed";

? ? String MAX_CONCURRENT = "max.concurrent";

? ? void collect(URL statistics);

? ? List<URL> lookup(URL query);

}

6.2.1 MonitorFilter

通過過濾器的方式收集服務的調(diào)用次數(shù)和調(diào)用時間,默認實現(xiàn):

org.apache.dubbo.monitor.dubbo.DubboMonitor炬转。


七辆苔、協(xié)議層

7.1 做什么

封裝 RPC 調(diào)用,以 Invocation, Result 為中心扼劈,擴展接口為 Protocol, Invoker, Exporter驻啤。

接下來介紹Dubbo RPC過程中的常用概念:

1)Invocation是請求會話領域模型,每次請求有相應的Invocation實例测僵,負責包裝dubbo方法信息為請求參數(shù)街佑;

2)Result是請求結(jié)果領域模型谢翎,每次請求都有相應的Result實例,負責包裝dubbo方法響應沐旨;

3)Invoker是實體域森逮,代表一個可執(zhí)行實體,有本地磁携、遠程褒侧、集群三類;

4)Exporter服務提供者Invoker管理實體谊迄;

5)Protocol是服務域闷供,管理Invoker的生命周期,提供服務的暴露和引用入口统诺;

服務初始化流程中歪脏,從這一層開始進行遠程服務的暴露和連接引用。

對于CouponServiceViewFacade服務來說粮呢,服務提供端會監(jiān)聽Dubbo端口啟動tcp服務婿失;服務消費端通過注冊中心發(fā)現(xiàn)服務提供者信息,啟動tcp服務連接遠端提供者啄寡。


7.2 怎么做

協(xié)議接口定義如下豪硅,統(tǒng)一抽象了不同協(xié)議的服務暴露和引用模型,比如InjvmProtocol只需將Exporter挺物,Invoker關(guān)聯(lián)本地實現(xiàn)懒浮。DubboProtocol暴露服務的時候,需要監(jiān)控本地端口啟動服務识藤;引用服務的時候砚著,需要連接遠端服務。


@SPI("dubbo")

public interface Protocol {


? ? int getDefaultPort();


? ? @Adaptive

? ? <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;


? ? @Adaptive

? ? <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;


? ? void destroy();


? ? default List<ProtocolServer> getServers() {

? ? ? ? return Collections.emptyList();

? ? }

}


Invoker接口定義

Invocation是RPC調(diào)用的會話對象蹋岩,負責包裝請求參數(shù)赖草;Result是RPC調(diào)用的結(jié)果對象,負責包裝RPC調(diào)用的結(jié)果對象剪个,包括異常類信息;


public interface Invoker<T> extends Node {


? ? Class<T> getInterface();


? ? Result invoke(Invocation invocation) throws RpcException;

}

7.2.1 服務的暴露和引用

服務暴露的時候版确,開啟RPC服務端扣囊;引用服務的時候,開啟RPC客戶端绒疗。

public class DubboProtocol extends AbstractProtocol {

侵歇。。吓蘑。

? ? @Override

? ? public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

? ? ? ? 惕虑。坟冲。。

? ? ? ? // 開啟rpc服務端

? ? ? ? openServer(url);

? ? ? ? optimizeSerialization(url);

? ? ? ? return exporter;

? ? }

? ? @Override

? ? public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {

? ? ? ? optimizeSerialization(url);

? ? ? ? // 創(chuàng)建dubbo invoker,開啟rpc客戶端

? ? ? ? DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);

? ? ? ? invokers.add(invoker);

? ? ? ? return invoker;

? ? }

溃蔫。健提。。

}

7.2.2 服務端響應請求

接收響應請求伟叛;

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

? ? ? ? @Override

? ? ? ? public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

? ? ? ? ? ? ? ? ? ? ? ? ? 私痹。。统刮。

? ? ? ? ? ? Invocation inv = (Invocation) message;

? ? ? ? ? ? Invoker<?> invoker = getInvoker(channel, inv);

? ? ? ? ? ? RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());

? ? ? ? ? ? //調(diào)用本地服務

? ? ? ? ? ? Result result = invoker.invoke(inv);

? ? ? ? ? ? return result.thenApply(Function.identity());

? ? ? ? }

? ? ? ? 紊遵。。侥蒙。

? ? };


7.2.3 客戶端發(fā)送請求

調(diào)用遠程服務暗膜;

public class DubboInvoker<T> extends AbstractInvoker<T> {

? ? 。鞭衩。桦山。

? ? @Override

? ? protected Result doInvoke(final Invocation invocation) throws Throwable {

? ? ? ? 。醋旦。恒水。

? ? ? ? ? ? boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

? ? ? ? ? ? int timeout = calculateTimeout(invocation, methodName);

? ? ? ? ? ? if (isOneway) {

? ? ? ? ? ? ? ? boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);

? ? ? ? ? ? ? ? currentClient.send(inv, isSent);

? ? ? ? ? ? ? ? return AsyncRpcResult.newDefaultAsyncResult(invocation);

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ExecutorService executor = getCallbackExecutor(getUrl(), inv);

? ? ? ? ? ? ? ? CompletableFuture<AppResponse> appResponseFuture =

? ? ? ? ? ? ? ? ? ? ? ? currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);

? ? ? ? ? ? ? ? FutureContext.getContext().setCompatibleFuture(appResponseFuture);

? ? ? ? ? ? ? ? AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);

? ? ? ? ? ? ? ? result.setExecutor(executor);

? ? ? ? ? ? ? ? return result;

? ? ? ? ? ? }

? ? }

}


八、交換層

8.1 做什么

封裝請求響應模式饲齐,同步轉(zhuǎn)異步钉凌,以 Request, Response 為中心,擴展接口為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer捂人。

使用request包裝Invocation作為完整的請求對象御雕,使用response包裝result作為完整的響應對象;Request滥搭、Response相比Invocation酸纲、Result添加了Dubbo的協(xié)議頭。

8.2 怎么做

交換器對象接口定義瑟匆,定義了遠程服務的綁定和連接闽坡,使用SPI方式進行擴展;

@SPI(HeaderExchanger.NAME)

public interface Exchanger {


? ? @Adaptive({Constants.EXCHANGER_KEY})

? ? ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;


? ? @Adaptive({Constants.EXCHANGER_KEY})

? ? ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

@Adaptive({Constants.EXCHANGER_KEY})

ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

@Adaptive({Constants.EXCHANGER_KEY})

ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;


8.2.1 服務提供者

服務提供端接收到請求后愁溜,本地執(zhí)行疾嗅,發(fā)送響應結(jié)果;

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

? 冕象。代承。。

? ? void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {

? ? ? //封裝響應

? ? ? ? Response res = new Response(req.getId(), req.getVersion());

? 渐扮。论悴。掖棉。

? ? ? ? Object msg = req.getData();

? ? ? ? try {

? ? ? ? ? ? CompletionStage<Object> future = handler.reply(channel, msg);

? ? ? ? ? ? future.whenComplete((appResult, t) -> {

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? if (t == null) {

? ? ? ? ? ? ? ? ? ? ? ? res.setStatus(Response.OK);

? ? ? ? ? ? ? ? ? ? ? ? res.setResult(appResult);

? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? res.setStatus(Response.SERVICE_ERROR);

? ? ? ? ? ? ? ? ? ? ? ? res.setErrorMessage(StringUtils.toString(t));

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? channel.send(res);

? ? ? ? ? ? ? ? } catch (RemotingException e) {

? ? ? ? ? ? ? ? ? ? logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? });

? ? ? ? } catch (Throwable e) {

? ? ? ? ? ? res.setStatus(Response.SERVICE_ERROR);

? ? ? ? ? ? res.setErrorMessage(StringUtils.toString(e));

? ? ? ? ? ? channel.send(res);

? ? ? ? }

? ? }

。膀估。幔亥。

}

8.2.2 服務消費者

服務消費端發(fā)起請求的封裝,方法執(zhí)行成功后玖像,返回一個future紫谷;

final class HeaderExchangeChannel implements ExchangeChannel {

。捐寥。笤昨。

? //封裝請求實體

? ? @Override

? ? public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {

? ? ? 。握恳。瞒窒。

? ? ? ? // create request.

? ? ? ? Request req = new Request();

? ? ? ? req.setVersion(Version.getProtocolVersion());

? ? ? ? req.setTwoWay(true);

? ? ? ? //RpcInvocation

? ? ? ? req.setData(request);

? ? ? ? DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);

? ? ? ? try {

? ? ? ? ? ? channel.send(req);

? ? ? ? } catch (RemotingException e) {

? ? ? ? ? ? future.cancel();

? ? ? ? ? ? throw e;

? ? ? ? }

? ? ? ? return future;

? ? }

。乡洼。崇裁。

}


九、傳輸層

9.1 做什么

抽象傳輸層模型束昵,兼容netty拔稳、mina、grizzly等通訊框架锹雏。

9.2 怎么做

傳輸器接口定義如下,它與交換器Exchanger接口定義相似巴比,區(qū)別在于Exchanger是圍繞Dubbo的Request和Response封裝的操作門面接口,而Transporter更加的底層礁遵,Exchanger用于隔離Dubbo協(xié)議層和通訊層轻绞。

@SPI("netty")

public interface Transporter {


? ? @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})

? ? RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;


? ? @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})

? ? Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

自定義傳輸層模型


通過SPI的方式,動態(tài)選擇具體的傳輸框架佣耐,默認是netty政勃;

public class Transporters {

? ? 。兼砖。奸远。

? ? public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {

? ? ? ? 。掖鱼。然走。

? ? ? ? return getTransporter().bind(url, handler);

? ? }

? ? public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {

? ? ? ? 。戏挡。。

? ? ? ? return getTransporter().connect(url, handler);

? ? }

? ? public static Transporter getTransporter() {

? ? ? ? return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();

? ? }

}

netty框架的channel適配如下晨仑,采用裝飾模式褐墅,使用netty框架的channel作為Dubbo自定義的channel做實現(xiàn)拆檬;

final class NettyChannel extends AbstractChannel {

? ? private NettyChannel(Channel channel, URL url, ChannelHandler handler) {

? ? ? ? super(url, handler);

? ? ? ? if (channel == null) {

? ? ? ? ? ? throw new IllegalArgumentException("netty channel == null;");

? ? ? ? }

? ? ? ? this.channel = channel;

? ? }

}


十、序列化

10.1 做什么

抽象序列化模型妥凳,兼容多種序列化框架竟贯,包括:fastjson、fst逝钥、hessian2屑那、kryo、kryo2艘款、protobuf等持际,通過序列化支持跨語言的方式,支持跨語言的RPC調(diào)用哗咆。

10.2 怎么做

定義Serialization擴展點蜘欲,默認hessian2,支持跨語言晌柬。Serialization接口實際是一個工廠接口姥份,通過SPI擴展;實際序列化和反序列化工作由ObjectOutput年碘,ObjectInput完成澈歉,通過裝飾模式讓hessian2完成實際工作。


@SPI("hessian2")

public interface Serialization {


? ? byte getContentTypeId();


? ? String getContentType();

? ? @Adaptive

? ? ObjectOutput serialize(URL url, OutputStream output) throws IOException;


? ? @Adaptive

? ? ObjectInput deserialize(URL url, InputStream input) throws IOException;

}

10.2.1 通訊協(xié)議設計

下圖出自開發(fā)指南-實現(xiàn)細節(jié)-遠程通訊細節(jié)屿衅,描述Dubbo協(xié)議頭設計埃难;


0-15bit表示Dubbo協(xié)議魔法數(shù)字,值:0xdabb傲诵;

16bit請求響應標記凯砍,Request - 1; Response - 0;

17bit請求模式標記拴竹,只有請求消息才會有悟衩,1表示需要服務端返回響應;

18bit是事件消息標記栓拜,1表示該消息是事件消息座泳,比如心跳消息;

19-23bit是序列化類型標記幕与,hessian序列化id是2挑势,fastjson是6,詳見org.apache.dubbo.common.serialize.Constants啦鸣;

24-31bit表示狀態(tài)潮饱,只有響應消息才有用;

32-64bit是RPC請求ID诫给;

96-128bit是會話數(shù)據(jù)長度香拉;

128是消息體字節(jié)序列啦扬;

Dubbo將RPC整個過程分成核心的代理層、注冊層凫碌、集群層扑毡、協(xié)議層、傳輸層等盛险,層與層之間的職責邊界明確瞄摊;核心層都通過接口定義,不依賴具體實現(xiàn)苦掘,這些接口串聯(lián)起來形成了Dubbo的骨架换帜;這個骨架也可以看作是Dubbo的內(nèi)核,內(nèi)核使用SPI 機制加載插件(擴展點)鸟蜡,達到高度可擴展膜赃。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市揉忘,隨后出現(xiàn)的幾起案子跳座,更是在濱河造成了極大的恐慌,老刑警劉巖泣矛,帶你破解...
    沈念sama閱讀 212,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件疲眷,死亡現(xiàn)場離奇詭異,居然都是意外死亡您朽,警方通過查閱死者的電腦和手機狂丝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來哗总,“玉大人几颜,你說我怎么就攤上這事⊙肚” “怎么了蛋哭?”我有些...
    開封第一講書人閱讀 158,369評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長涮母。 經(jīng)常有香客問我谆趾,道長,這世上最難降的妖魔是什么叛本? 我笑而不...
    開封第一講書人閱讀 56,799評論 1 285
  • 正文 為了忘掉前任沪蓬,我火速辦了婚禮,結(jié)果婚禮上来候,老公的妹妹穿的比我還像新娘跷叉。我一直安慰自己,他們只是感情好,可當我...
    茶點故事閱讀 65,910評論 6 386
  • 文/花漫 我一把揭開白布性芬。 她就那樣靜靜地躺著峡眶,像睡著了一般剧防。 火紅的嫁衣襯著肌膚如雪植锉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,096評論 1 291
  • 那天峭拘,我揣著相機與錄音俊庇,去河邊找鬼。 笑死鸡挠,一個胖子當著我的面吹牛辉饱,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播拣展,決...
    沈念sama閱讀 39,159評論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼彭沼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了备埃?” 一聲冷哼從身側(cè)響起姓惑,我...
    開封第一講書人閱讀 37,917評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎按脚,沒想到半個月后于毙,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,360評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡辅搬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,673評論 2 327
  • 正文 我和宋清朗相戀三年唯沮,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片堪遂。...
    茶點故事閱讀 38,814評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡介蛉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出溶褪,到底是詐尸還是另有隱情币旧,我是刑警寧澤,帶...
    沈念sama閱讀 34,509評論 4 334
  • 正文 年R本政府宣布竿滨,位于F島的核電站佳恬,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏于游。R本人自食惡果不足惜毁葱,卻給世界環(huán)境...
    茶點故事閱讀 40,156評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望贰剥。 院中可真熱鬧倾剿,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至芹缔,卻和暖如春坯癣,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背最欠。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評論 1 267
  • 我被黑心中介騙來泰國打工示罗, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人芝硬。 一個月前我還...
    沈念sama閱讀 46,641評論 2 362
  • 正文 我出身青樓蚜点,卻偏偏與公主長得像,于是被迫代替她去往敵國和親拌阴。 傳聞我的和親對象是個殘疾皇子绍绘,可洞房花燭夜當晚...
    茶點故事閱讀 43,728評論 2 351

推薦閱讀更多精彩內(nèi)容