Dubbo的分支: 3.0
Dubbo的服務(wù)提供者會(huì)將RPC服務(wù)的調(diào)用說明,導(dǎo)出到配置中心唱较。然后服務(wù)的消費(fèi)者向配置中心訂閱這些服務(wù)天梧,也就是引用這些服務(wù)戚丸。
服務(wù)端-服務(wù)提供方-暴露/導(dǎo)出服務(wù)
-
dubbo根據(jù)spring的擴(kuò)展api,增加了dubbo命名空間裸违,以及各種xml元素掖桦,用來配置服務(wù)。
- 關(guān)于dubbo和spring的集成供汛,以及spring對自定義的命名空間解析枪汪,詳見NamespaceHandler,NamespaceHandlerSupport的使用怔昨。
spring boot啟動(dòng)雀久,解析dubbo框架xml配置文件,裝載實(shí)例趁舀,放入spring容器岸啡,到這相當(dāng)于實(shí)例化bean完成。
-
dubbo增加了應(yīng)用上下文監(jiān)聽類DubboBootstrapApplicationListener赫编,類內(nèi)覆蓋onApplicationContextEvent(ApplicationContextEvent event)方法,啟動(dòng)dubbo奋隶。
@Override public void onApplicationContextEvent(ApplicationContextEvent event) { //使用事件監(jiān)聽器的方式與spring集成 if (event instanceof ContextRefreshedEvent) { onContextRefreshedEvent((ContextRefreshedEvent) event); } else if (event instanceof ContextClosedEvent) { onContextClosedEvent((ContextClosedEvent) event); } } private void onContextRefreshedEvent(ContextRefreshedEvent event) { //啟動(dòng)dubbo dubboBootstrap.start(); }
-
org.apache.dubbo.config.bootstrap.DubboBootstrap#start
-
如果是第一次啟動(dòng)的話擂送,調(diào)用initialize()完成spi加載、配置中心初始化唯欣、事件監(jiān)聽等初始化嘹吨。
public DubboBootstrap start() { if (started.compareAndSet(false, true)) { startup.set(false); initialize(); //start方法內(nèi)有兩行關(guān)鍵的代碼,一個(gè)是服務(wù)提供方開始暴露/導(dǎo)出所有服務(wù)境氢,一個(gè)是消費(fèi)方開始發(fā)現(xiàn)/引用服務(wù) // 1. 導(dǎo)出dubbo所有服務(wù) exportServices(); // Not only provider register if (!isOnlyRegisterProvider() || hasExportedServices()) { // 2. export MetadataService exportMetadataService(); //3. Register the local ServiceInstance if required registerServiceInstance(); } //消費(fèi)者引用服務(wù) referServices(); //省略... } return this; }
-
調(diào)用exportServices暴露/導(dǎo)出服務(wù)蟀拷。
private void exportServices() { //從配置中心拿到配置的所有service bean,依次導(dǎo)出萍聊。 configManager.getServices().forEach(sc -> { ServiceConfig serviceConfig = (ServiceConfig) sc; serviceConfig.setBootstrap(this); if (exportAsync) { //異步導(dǎo)出 ExecutorService executor = executorRepository.getServiceExporterExecutor(); Future<?> future = executor.submit(() -> { sc.export(); exportedServices.add(sc); }); asyncExportingFutures.add(future); } else { //同步導(dǎo)出 sc.export(); exportedServices.add(sc); } }); }
-
遍歷所有服務(wù)问芬,依次調(diào)用org.apache.dubbo.config.ServiceConfig#export,導(dǎo)出服務(wù)寿桨。最終調(diào)用org.apache.dubbo.config.ServiceConfig#doExportUrls此衅,對該服務(wù)配置的暴露協(xié)議,每個(gè)協(xié)議暴露一份服務(wù)亭螟。
private void doExportUrls() { //repository.services存有所有的服務(wù)描述對象挡鞍,包括默認(rèn)加載的隱含服務(wù), //如EchoService,GenericService,MonitorService,MetricsService //repository.providers存有所有的服務(wù)提供者對象 //repository.consumers存有所有的服務(wù)消費(fèi)者對象 ServiceRepository repository = ApplicationModel.getServiceRepository(); //此處省略部分代碼...... //獲取向配置中心注冊的服務(wù)url格式预烙,即registry://xxxx List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true); for (ProtocolConfig protocolConfig : protocols) { //此處省略部分代碼...... //因?yàn)橐粋€(gè)服務(wù)可以使用多種通訊協(xié)議墨微,所以此處是每個(gè)協(xié)議導(dǎo)出一份服務(wù)配置,如dubbo/http/injvm協(xié)議各一份 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
-
導(dǎo)出的核心方法:org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
- 因?yàn)閐ubbo設(shè)計(jì)的是使用url作為配置總線扁掸,代表最終的導(dǎo)出格式翘县。這個(gè)方法會(huì)先補(bǔ)充url信息最域,如失敗重試retry、token炼蹦、Generic服務(wù)等數(shù)據(jù)羡宙。
- 判斷一個(gè)服務(wù)只要不是配置了只能暴露給遠(yuǎn)程的話,就會(huì)導(dǎo)出一份本地injvm服務(wù)掐隐,即調(diào)用exportLocal(url)狗热。
- 暴露給遠(yuǎn)程。
-
單獨(dú)說下導(dǎo)出一個(gè)服務(wù)的主要邏輯
-
這里需要先解釋下dubbo設(shè)計(jì)的幾個(gè)類:Protocol/Exporter/Invoker虑省,說下這三個(gè)類的關(guān)系匿刮。
- Protocol是協(xié)議的意思,比如Dubbo探颈、Injvm協(xié)議熟丸。一個(gè)服務(wù)要以某個(gè)協(xié)議的方式暴露出來,即可以用這個(gè)協(xié)議來訪問這個(gè)服務(wù)伪节,如HelloService以http協(xié)議的方式暴露出來光羞,也就是以后可以用http的方式訪問這個(gè)服務(wù)。
- Dubbo抽象出來一個(gè)Exporter的概念怀大,負(fù)責(zé)如何暴露服務(wù)纱兑,如何取消服務(wù)暴露。 一個(gè)服務(wù)被暴露出來化借,需要知道它該怎么調(diào)用潜慎,就有了Invoker。Invoker負(fù)責(zé)實(shí)際的調(diào)用服務(wù)返回響應(yīng)蓖康。
- 綜上铐炫,1個(gè)Protocol包含多個(gè)服務(wù)的Exporter,1個(gè)Exporter在大部分情況下只包含1個(gè)Invoker蒜焊。
- 從代碼來看三者的關(guān)系:
public abstract class AbstractProtocol implements Protocol { //該協(xié)議的所有導(dǎo)出者倒信,即暴露的所有服務(wù) //"greeting/org.apache.dubbo.demo.GreetingService:1.0.0:20880" -> DubboExporter對象 //"greeting/org.apache.dubbo.demo.GreetingService:1.0.0" -> InjvmExporter對象 protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>(); //省略...... } public abstract class AbstractExporter<T> implements Exporter<T> { //每個(gè)AbstractExporter子類都會(huì)至少包含一個(gè)Invoker private final Invoker<T> invoker; //省略...... }
-
再說下這三者的組裝過程:
public class ServiceConfig<T> extends ServiceConfigBase<T> { //自適應(yīng)Protocol,默認(rèn)以DubboProtocol為基礎(chǔ)泳梆,按優(yōu)先級在外層包裝各種包裝類堤结,如:ProtocolFilterWrapper,ProtocolListenerWrapper鸭丛,QosProtocolWrapper private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); //Invoker動(dòng)態(tài)代理工廠竞穷,默認(rèn)為JavassistProxyFactory private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); //所有導(dǎo)出的服務(wù) private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(); private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { //省略... if (CollectionUtils.isNotEmpty(registryURLs)) { for (URL registryURL : registryURLs) { //省略... //使用JavassistProxyFactory動(dòng)態(tài)生成、加載invoker字節(jié)碼鳞溉。以GreetingService舉例 //ref=greetingServiceImpl瘾带,interfaceClass=GreetingService.class,第三個(gè)參數(shù)是url=injvm://127.0.0.1/org.apache.dubbo.demo.GreetingService ?anyhost=true&application=demo-provider&bind.ip=192.168.2.3&bind.port=20880 &deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting &interface=org.apache.dubbo.demo.GreetingService&mapping-type=metadata &mapping.type=metadata&metadata-type=remote&methods=hello&pid=25062&qos.port=22222 &release=&revision=1.0.0&side=provider&timeout=5000×tamp=1615949370199&version=1.0.0 //JavassistProxyFactory.getInvoker方法內(nèi)部會(huì)先生成invoker wrapper包裝類熟菲,目的是統(tǒng)一bean的方法調(diào)用看政。生成的wrapper類詳見下面Wrapper1朴恳。 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); //使用JavassistProxyFactory動(dòng)態(tài)生成、加載invoker字節(jié)碼允蚣。以GreetingService舉例 //ref=greetingServiceImpl于颖,interfaceClass=GreetingService.class, //第三個(gè)參數(shù)是url=injvm://127.0.0.1/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider // &bind.ip=192.168.2.3&bind.port=20880 &deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting // &interface=org.apache.dubbo.demo.GreetingService&mapping-type=metadata &mapping.type=metadata // &metadata-type=remote&methods=hello&pid=25062&qos.port=22222 &release=&revision=1.0.0&side=provider // &timeout=5000×tamp=1615949370199&version=1.0.0 //JavassistProxyFactory.getInvoker方法內(nèi)部會(huì)先生成invoker wrapper包裝類嚷兔。生成的wrapper類詳見下面Wrapper1森渐。 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); //將invoker以某個(gè)協(xié)議暴露出去,返回exporter //在protocol的多層包裝對象中冒晰,有一個(gè)ProtocolFilterWrapper同衣,調(diào)用export方法時(shí),它會(huì)先判斷是否為registry協(xié)議壶运,如果不是耐齐, //需要先構(gòu)造invoker的過濾器鏈,增強(qiáng)invoker功能蒋情,如MonitorFilter,TimeoutFilter,TraceFilter,ExceptionFilter等埠况,再繼續(xù)導(dǎo)出。 Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); } } else { //省略棵癣,基本同上 Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); } } }
-
服務(wù)如何暴露給遠(yuǎn)程询枚,即向公共配置中心注冊服務(wù)
上面說過我們在xml中配置一個(gè)服務(wù),可以指定它以哪些協(xié)議的方式暴露出來浙巫,比如dubbo,injvm刷后,http的畴,grpc,hessian等
dubbo會(huì)遍歷所有協(xié)議尝胆,每一個(gè)協(xié)議都會(huì)調(diào)用doExportUrlsFor1Protocol()來導(dǎo)出對應(yīng)格式的服務(wù)丧裁。
dubbo會(huì)判斷如果這個(gè)協(xié)議配置了暴露給遠(yuǎn)程,那么會(huì)根據(jù)當(dāng)前協(xié)議的url含衔,創(chuàng)建一個(gè)用于注冊registry的url煎娇。如,當(dāng)前是http協(xié)議贪染,框架判斷需要暴露給遠(yuǎn)程缓呛,即需要注冊到公共配置中心,那么會(huì)根據(jù)http://xxx杭隙,生成一個(gè)registry://xxx的url哟绊,用于注冊服務(wù)。
-
服務(wù)導(dǎo)出是由協(xié)議對象調(diào)用export()觸發(fā)痰憎,Exporter<?> exporter = PROTOCOL.export(wrapperInvoker)票髓。其中registry://協(xié)議是由RegistryProtocol.export負(fù)責(zé)導(dǎo)出攀涵。
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //orginInvoker的url=registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?xxx //得到registryUrl=zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?xxx URL registryUrl = getRegistryUrl(originInvoker); //得到providerUrl=dubbo://192.168.2.3:20880/org.apache.dubbo.demo.GreetingService?xxx URL providerUrl = getProviderUrl(originInvoker); //省略... // decide if we need to delay publish boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { //立即向配置中心如zk,注冊服務(wù) register(registryUrl, registeredProviderUrl); } //省略... //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); } private void register(URL registryUrl, URL registeredProviderUrl) { //根據(jù)url協(xié)議頭洽沟,獲取配置中心注冊器對象以故,得到ZookeeperRegistry Registry registry = registryFactory.getRegistry(registryUrl); //調(diào)用ZookeeperRegistry的注冊方法,內(nèi)部為使用CuratorZookeeperClient向配置中心寫服務(wù)數(shù)據(jù) //到這里也就完成了將服務(wù)注冊到公共配置中心裆操,暴露給遠(yuǎn)程了 registry.register(registeredProviderUrl); }
到此服務(wù)暴露完畢怒详。
客戶端-服務(wù)消費(fèi)方-發(fā)現(xiàn)/引用服務(wù)
先需要理解消費(fèi)方的幾個(gè)重要概念
- 先舉個(gè)例子,借著例子來理解跷车。假設(shè)在北京和上海分別有一個(gè)注冊中心zk-beijing棘利,zk-shanghai。在北京有三臺(tái)機(jī)器朽缴,ip分別為1.1.1.1/2.2.2.2/3.3.3.3善玫。其中1.1.1.1為消費(fèi)者,2.2.2.2和3.3.3.3為服務(wù)提供者密强。2.2.2.2提供ServiceA,ServiceB兩個(gè)服務(wù)茅郎,3.3.3.3提供ServiceA,ServiceC兩個(gè)服務(wù)。
-
我們先簡化環(huán)境或渤,看下只有一個(gè)注冊中心的情況系冗,即只向zk-beijing注冊。在2.2.2.2/3.3.3.3啟動(dòng)初始化時(shí)薪鹦,會(huì)向注冊中心注冊暴露服務(wù)掌敬,路徑為:
//zookeeper路徑結(jié)構(gòu) /dubbo /ServiceA /providers /URL.encode(dubbo://2.2.2.2:20880/ServiceA?anyhost=true&application=demo-provider&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=ServiceA&mapping-type=metadata&mapping.type=metadata&metadata-type=remote&methods=sayHello,sayHelloAsync&pid=37004&release=&side=provider&timeout=3000×tamp=1617773391401) /URL.encode(dubbo://3.3.3.3:20880/ServiceA?anyhost=true&application=demo-provider&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=ServiceA&mapping-type=metadata&mapping.type=metadata&metadata-type=remote&methods=sayHello,sayHelloAsync&pid=35111&release=&side=provider&timeout=3000×tamp=1617773391507) /ServiceB /providers /URL.encode(rest://2.2.2.2:80/ServiceB?anyhost=true&application=demo-provider&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=ServiceB&mapping-type=metadata&mapping.type=metadata&metadata-type=remote&methods=buy,sell&pid=37004&release=&revision=1.0.0&side=provider&timeout=5000×tamp=1617773396526&version=1.0.0) /ServiceC /providers /URL.encode(dubbo://3.3.3.3:20880/ServiceC?anyhost=true&application=demo-provider&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=ServiceC&mapping-type=metadata&mapping.type=metadata&metadata-type=remote&methods=go,back&pid=351111&release=&side=provider&timeout=3000×tamp=1617773391507)
從路徑可以看出,dubbo不是以一個(gè)機(jī)器為基本單位池磁,暴露該機(jī)器下有哪些服務(wù)的奔害,如:/dubbo/2.2.2.2/我的服務(wù)xxx。dubbo是以服務(wù)為單位來暴露地熄。因?yàn)槲覀円枚际且媚膫€(gè)服務(wù)华临,暴露也應(yīng)該是暴露哪個(gè)服務(wù),服務(wù)下標(biāo)明哪些機(jī)器上有該服務(wù)端考。你需要引用哪些服務(wù)就可以隨意的引用哪些雅潭,不需要引用整個(gè)機(jī)器暴露出來的所有服務(wù)。假如一個(gè)服務(wù)暴露在多臺(tái)機(jī)器上却特,就更是個(gè)問題了扶供。
再看多注冊中心,假如有另外幾臺(tái)機(jī)器裂明,向shanghai注冊诚欠,情況和beijing差不多。這種多注冊中心存在時(shí),可以看做有兩個(gè)異地服務(wù)集群轰绵,比如HelloService有北京和上海兩個(gè)服務(wù)集群粉寞。
-
概念一:Invoker
- Invoker是什么:從zk樹上看,/providers下的每一個(gè)服務(wù)url在消費(fèi)方都會(huì)對應(yīng)一個(gè)Invoker左腔。如果1.1.1.1引用了這三個(gè)服務(wù)唧垦,那就會(huì)創(chuàng)建4個(gè)對應(yīng)的Invoker。一個(gè)Invoker表示一個(gè)消費(fèi)方持有的液样,對一個(gè)提供方某一個(gè)服務(wù)的引用振亮。Invoker內(nèi)包含連接池屬性字段ExchangeClient[] clients。因?yàn)镃lusterInvoker的存在鞭莽,為了區(qū)分坊秸,Invoker也叫做普通Invoker。
概念二:Directory
- Directory就是多個(gè)Invoker的集合澎怒,實(shí)現(xiàn)類一般會(huì)有一個(gè)List<Invoker<T>>屬性字段褒搔,存儲(chǔ)Invoker。為了方便表示以及操作多Invoker喷面,抽象出來的概念星瘾。
概念三:Router
- Router路由。在實(shí)際使用中惧辈,可能由于我們部署的物理機(jī)性能有差別琳状,有幾臺(tái)機(jī)器性能特別好,或者說為了測試某幾臺(tái)機(jī)器上的服務(wù)盒齿,要求請求更多的或者全部路由到某幾臺(tái)機(jī)器念逞,Router負(fù)責(zé)的就是這個(gè)路由邏輯。Router會(huì)從Directory中遍歷選出與路由規(guī)則匹配的invoker子集边翁,然后交給負(fù)載均衡器翎承。
概念四:LoadBalance
- LoadBalance(負(fù)載均衡器)接收Router篩選出來的可以使用的invoker集合,從中選出一個(gè)普通invoker倒彰,用于發(fā)起實(shí)際調(diào)用。
概念五:ClusterInvoker
- ClusterInvoker就是實(shí)際的某一個(gè)物理集群了莱睁,如HelloService服務(wù)的北京集群待讳。
概念六:Cluster
- Cluster是對實(shí)際物理集群抽象出的最上層概念,即某一個(gè)服務(wù)的所有物理集群的集合叫做Cluster仰剿。如HelloService的北京和上海兩個(gè)異地物理集群合起來叫做Cluster创淡。
總結(jié)
- HelloService服務(wù)部署在兩個(gè)異地集群,一個(gè)北京南吮,一個(gè)上海琳彩。兩地各自都有一個(gè)注冊中心zk。
- 自下而上的組裝過程:
- dubbo會(huì)從北京的注冊中心zk讀取HelloService的所有提供者,一個(gè)提供者封裝成一個(gè)普通Invoker露乏。
- 將多個(gè)普通Invoker存儲(chǔ)到一個(gè)Directory中碧浊。
- 根據(jù)業(yè)務(wù)特性,創(chuàng)建一個(gè)對應(yīng)的集群執(zhí)行器對象ClusterInvoker瘟仿。選擇適合的Router和LoadBalance箱锐,將Directory傳入。到此北京集群的invoker封裝好了劳较。
- 重復(fù)上述步驟驹止,將上海集群也封裝成一個(gè)ClusterInvoker。
- 將北京观蜗、上海ClusterInvoker封裝到多注冊中心Cluster對象中臊恋。
- 服務(wù)以Cluster為入口,對外提供服務(wù)墓捻。
- 自上而下的調(diào)用過程:
Cluster會(huì)根據(jù)訂閱的各個(gè)注冊中心的配置抖仅、與服務(wù)消費(fèi)方是否處于同一個(gè)物理集群、權(quán)重等先選出來一個(gè)調(diào)用的ClusterInvoker毙替,即先選出來一個(gè)物理集群岸售。假如選出來的是北京集群。
然后在北京集群ClusterInvoker執(zhí)行Router路由厂画,選出匹配的普通Invoker集合凸丸。
在路由后的Invoker集合上,執(zhí)行負(fù)載均衡袱院,由LoadBalance選出一個(gè)最終用來執(zhí)行RPC的普通Invoker屎慢。
-
執(zhí)行Invoker.invoke(Invocation invocation),發(fā)起調(diào)用忽洛。
//從代碼看下調(diào)用過程 public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> { public Result invoke(final Invocation invocation) throws RpcException { //省略 //使用路由器Router從ClusterInvoker.directory存儲(chǔ)的所有的invoker中腻惠,路由出符合路由規(guī)則的一批invoker。 List<Invoker<T>> invokers = list(invocation); //上面選出了方向正確的一批invoker欲虚,現(xiàn)在需要負(fù)載均衡選一個(gè)執(zhí)行了集灌。此處初始化負(fù)載均衡器LoadBalance。 LoadBalance loadbalance = initLoadBalance(invokers, invocation); //為異步執(zhí)行附著調(diào)用id RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); //使用負(fù)載均衡器選一個(gè)invoker复哆,發(fā)起調(diào)用欣喧,返回響應(yīng)。 return doInvoke(invocation, invokers, loadbalance); } }
消費(fèi)方發(fā)現(xiàn)/引用服務(wù)的過程
- Dubbo是嚴(yán)重依賴Spring框架的梯找,我們在xml配置了java interface > HelloService的Bean聲明和引用關(guān)系后唆阿,如何在服務(wù)消費(fèi)方生成、加載該接口的實(shí)現(xiàn)類锈锤,創(chuàng)建實(shí)例對象驯鳖,納入到Spring容器管理闲询,在需要的地方注入。
- Dubbo是由FactoryBean入手的浅辙,過程與MyBatis的Mapper類生成過程類似扭弧。(注:3.0分支中,只有延遲加載的代理對象是使用FactoryBean功能生成的摔握,普通代理對象是使用ReferenceConfig.get()生成的寄狼。而在master分支中,所有代理對象都是使用FactoryBean生成氨淌,底層實(shí)現(xiàn)也是調(diào)用的ReferenceConfig.get())
- 下面說的是3.0分支普通代理對象的生成過程泊愧,沒有使用FactoryBean。
-
與服務(wù)暴露一樣盛正,服務(wù)引用也是使用事件監(jiān)聽的方式與spring集成删咱,都會(huì)調(diào)用的org.apache.dubbo.config.bootstrap.DubboBootstrap#start方法,該方法上面說過,有兩個(gè)關(guān)鍵的點(diǎn),一個(gè)是exportServices():提供方開始暴露/導(dǎo)出所有服務(wù)愕够,另一個(gè)是referServices():消費(fèi)方開始發(fā)現(xiàn)/引用服務(wù)映跟。
public DubboBootstrap start() { if (started.compareAndSet(false, true)) { startup.set(false); initialize(); //start方法內(nèi)有兩行關(guān)鍵的代碼燕侠,一個(gè)是服務(wù)提供方開始暴露/導(dǎo)出所有服務(wù),一個(gè)是消費(fèi)方開始發(fā)現(xiàn)/引用服務(wù) // 1. 導(dǎo)出dubbo所有服務(wù) exportServices(); // Not only provider register if (!isOnlyRegisterProvider() || hasExportedServices()) { // 2. export MetadataService exportMetadataService(); //3. Register the local ServiceInstance if required registerServiceInstance(); } //消費(fèi)者引用服務(wù) referServices(); //省略... } return this; }
-
referServices()中會(huì)觸發(fā)緩存數(shù)據(jù)的初始化、加載。
private void referServices() { //configManager.getReferences()拿到的是對xml中配置的所有<dubbo:reference xxx/> //解析后封裝的ReferenceConfig對象 configManager.getReferences().forEach(rc -> { //省略... //cache為ReferenceConfigCache類型多艇,底層是一個(gè)ConcurrentHashMap, //調(diào)用get會(huì)初始化對應(yīng)的ReferenceConfig對象像吻,放入緩存中 cache.get(rc); }); }
-
ReferenceConfigCache.get(rc)峻黍,從緩存拿消費(fèi)方引用的服務(wù)代理對象,如果沒有觸發(fā)實(shí)現(xiàn)類的生成拨匆、加載姆涩,然后創(chuàng)建代理類放入cache。
public <T> T get(ReferenceConfigBase<T> referenceConfig) { //key=org.apache.dubbo.demo.DemoService String key = generator.generateKey(referenceConfig); //type=DemoService.class Class<?> type = referenceConfig.getInterfaceClass(); //proxies結(jié)構(gòu)=< DemoService.class, <"org.apache.dubbo.demo.DemoService", DemoService實(shí)現(xiàn)類代理對象> > proxies.computeIfAbsent(type, _t -> new ConcurrentHashMap<>()); //proxiesOfType結(jié)構(gòu)=<"org.apache.dubbo.demo.DemoService", DemoService實(shí)現(xiàn)類代理對象> ConcurrentMap<String, Object> proxiesOfType = proxies.get(type); proxiesOfType.computeIfAbsent(key, _k -> { //由ReferenceConfig配置對象觸發(fā)對應(yīng)服務(wù)接口實(shí)現(xiàn)類的創(chuàng)建惭每、加載骨饿,以及代理對象的創(chuàng)建。 //proxy就是在消費(fèi)方引用的服務(wù)代理對象 Object proxy = referenceConfig.get(); referredReferences.put(key, referenceConfig); return proxy; }); return (T) proxiesOfType.get(key); }
-
關(guān)鍵的來了台腥,ReferenceConfig類包含了生成宏赘、加載實(shí)現(xiàn)類,創(chuàng)建代理對象的主要邏輯览爵。
public class ReferenceConfig<T> extends ReferenceConfigBase<T> { //缺省FailoverCluster private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension(); //缺省JavassistProxyFactory private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); //根據(jù)url協(xié)議類型決定置鼻,url=registry://xxx镇饮,對應(yīng)RegistryProtocol蜓竹。url=dubbo://xxx,對應(yīng)DubboProtocol。url=injvm://xxx俱济,對應(yīng)InjvmProtocol嘶是。 private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); //ClusterInvoker類型 private transient volatile Invoker<?> invoker; //直連提供者地址或者注冊中心地址 protected final List<URL> urls = new ArrayList<URL>(); //引用的代理對象 private transient volatile T ref; //1.生成的代理對象會(huì)set到屬性字段中,為null的話蛛碌,開始引用初始化聂喇。 public synchronized T get() { if (ref == null) { //觸發(fā)引用初始化 init(); } return ref; } //2. 初始化 public synchronized void init() { if (initialized) return; //解析各種消費(fèi)方配置,放入map //省略... //map={"mapping-type":"metadata","init":"false","side":"consumer","register.ip":"192.168.2.3", // "release":"","methods":"sayHello,sayHelloAsync","qos.port":"33333","provided-by":"demo-provider", // "dubbo":"2.0.2","pid":"35116","check":"true","interface":"org.apache.dubbo.demo.DemoService", // "enable.auto.migration":"true","mapping.type":"metadata","metadata-type":"remote", // "application":"demo-consumer","sticky":"false","timestamp":"1617676272015","enable-auto-migration":"true"} //為org.apache.dubbo.demo.DemoService接口創(chuàng)建消費(fèi)方代理對象 ref = createProxy(map); //分發(fā)ReferenceConfigInitializedEvent事件 dispatch(new ReferenceConfigInitializedEvent(this, invoker)); } //3.創(chuàng)建動(dòng)態(tài)代理對象 private T createProxy(Map<String, String> map) { //從配置中找出是否應(yīng)該引用同一JVM中的服務(wù)蔚携。默認(rèn)行為為true if (shouldJvmRefer(map)) { //injvm本地引用 URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); //InjvmProtocol.refer返回InjvmInvoker類型 invoker = REF_PROTOCOL.refer(interfaceClass, url); } else { //遠(yuǎn)程引用 //url為配置的peer-to-peer調(diào)用地址希太,或者注冊中心地址 //假如服務(wù)集群有5臺(tái)機(jī)器,你只想測試調(diào)用其中的A,B兩臺(tái)機(jī)器酝蜒,配置的就是直連提供者AB的地址 //<dubbo:reference id="as" interface="AService" url="dubbo://1.1.1.1:20890;第二提供者地址"/> if (urls.size() == 1) { //一個(gè)服務(wù)只有一個(gè)url誊辉,有兩種情況:1.只有一個(gè)注冊中心;2.直連一個(gè)提供者亡脑。 //這里返回的invoker一定是ClusterInvoker子類型堕澄,默認(rèn)為FailoverClusterInvoker, //注冊中心的提供者或者直連url都會(huì)轉(zhuǎn)為普通invoker->directory霉咨,再set到ClusterInvoker中返回 //如果url.protocol=service-discovery-registry蛙紫,對應(yīng)的protocol為:RegistryProtocol invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0)); } else { //一個(gè)服務(wù)多個(gè)url,可能有三種情況:1.多個(gè)注冊中心途戒;2.直連多個(gè)提供者坑傅。3.兩者共存,即url配了注冊中心地址和直連提供者地址棺滞,這種情況不確定可不可以裁蚁,沒試出來。 //如果url中有注冊協(xié)議继准,即從注冊中心訂閱服務(wù)枉证,返回的invoker為ZoneAwareClusterInvoker類型 //無注冊協(xié)議時(shí),返回FailoverClusterInvoker if (registryURL != null) { String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME); invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers)); } else { String cluster = "省略..."; invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers)); } } } //JavassistProxyFactory.getProxy return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic)); } }
到這就完成了消費(fèi)者引用服務(wù)代理對象移必,其中Cluster和Invoker的組裝過程室谚,就是上面提到的。
-
補(bǔ)充下master分支中崔泵,改版的使用FactoryBean生成代理對象邏輯:
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean { @Override public Object getObject() { //調(diào)用ReferenceConfig.get()生成代理對象 return get(); } @Override public Class<?> getObjectType() { //返回服務(wù)接口class return getInterfaceClass(); } }