按照dubbo官網(wǎng)的介紹九默,如下
Apache Dubbo 是一款高性能董栽、輕量級(jí)的開(kāi)源 Java 服務(wù)框架
記得最開(kāi)始蒿往,dubbo是把自己定位成一款高性能的rpc框架掸掸,我們現(xiàn)在還是按照rpc的定位進(jìn)行分析,dubbo的整個(gè)對(duì)外的框架非常的簡(jiǎn)單對(duì)稱(chēng)美拳锚,如下
但是內(nèi)部的實(shí)現(xiàn)非常的復(fù)雜假栓,如下
優(yōu)秀的框架都是相似的寻行,整體對(duì)外的框架非常的簡(jiǎn)單霍掺,但是內(nèi)部設(shè)計(jì)非常的復(fù)雜,將簡(jiǎn)單留給客戶(hù)拌蜘,將復(fù)雜封裝給自己杆烁。
Make It Simple
按照上圖的介紹,除了service和config層简卧,其它的層都是spi兔魂,所謂spi就是支持客戶(hù)自定義的替換,等于說(shuō)整個(gè)duubo可以認(rèn)為是個(gè)戴高樂(lè)積木举娩,而每一層的實(shí)現(xiàn)都是可以替換成用戶(hù)自己的技術(shù)棧析校。這樣也方便各個(gè)公司在引入dubbo的時(shí)候進(jìn)行定制化的改造。
整個(gè)調(diào)用鏈從上往下分為十層铜涉,下面依次簡(jiǎn)單的介紹
1 config層智玻,如下圖,dubbo提供了對(duì)模塊的配置能力芙代,最重要的是ServiceConfig(provide端)與ReferenceConfig(consumer端)的配置能力吊奢,當(dāng)然還有MonitorConfig(監(jiān)控中心的配置),ApplicationConfig(全局的應(yīng)用配置)纹烹,RegistryConfig(注冊(cè)中心的配置)页滚,ProtocolConfig(協(xié)議配置)等召边,dubbo中提供的配置類(lèi)圖依賴(lài)如下
2 proxy代理層,主要是為了服務(wù)接口的透明代理裹驰,對(duì)外提供方便和透明的引用隧熙,生成服務(wù)的客戶(hù)端的stub和服務(wù)端的Skeleton,dubbo中提供的類(lèi)圖依賴(lài)如下
3 registry 注冊(cè)中心層:封裝服務(wù)地址的注冊(cè)與發(fā)現(xiàn)幻林,以服務(wù) URL 為中心贱鼻,擴(kuò)展接口為 RegistryFactory, Registry, RegistryService,dubbo支持多注冊(cè)中心滋将,dubbo中針對(duì)RegistryFactory的類(lèi)依賴(lài)圖如下
4 cluster 路由層:封裝多個(gè)提供者的路由及負(fù)載均衡邻悬,并橋接注冊(cè)中心,以 Invoker 為中心随闽,擴(kuò)展接口為 Cluster, Directory, Router, LoadBalance父丰,針對(duì)Cluster的實(shí)現(xiàn)類(lèi)圖如下
默認(rèn)為FailOverCluster(失敗重試),當(dāng)然我們可以配置自己的策略
5monitor 監(jiān)控層 略
6 protocol 遠(yuǎn)程調(diào)用層:封裝 RPC 調(diào)用掘宪,以 Invocation, Result 為中心蛾扇,擴(kuò)展接口為 Protocol, Invoker, Exporter,我們代碼的大部分都在這一層進(jìn)行分析魏滚,Protocol的類(lèi)圖依賴(lài)關(guān)系如下
7 exchange 信息交換層:封裝請(qǐng)求響應(yīng)模式镀首,同步轉(zhuǎn)異步,以 Request, Response 為中心鼠次,擴(kuò)展接口為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer更哄,主要是對(duì)transport層的request和response的封裝。
8 transport 網(wǎng)絡(luò)傳輸層:抽象 mina 和 netty 為統(tǒng)一接口腥寇,以 Message 為中心成翩,擴(kuò)展接口為 Channel, Transporter, Client, Server, Codec
9 serialize 數(shù)據(jù)序列化層
針對(duì)上面的類(lèi)圖的依賴(lài),我們可以發(fā)現(xiàn)依賴(lài)圖大多都是矮胖的赦役,等于說(shuō)dubbo給我們默認(rèn)的實(shí)現(xiàn)了很多的策略麻敌,我們只需要按需取用即可(策略模式)
整個(gè)源碼包的模塊如下
。
dubbo主要是服務(wù)的暴露和發(fā)現(xiàn)調(diào)用掂摔,整個(gè)服務(wù)的暴露時(shí)序圖如下
而服務(wù)的發(fā)現(xiàn)引用調(diào)用時(shí)序圖如下
DDD
在 Dubbo 的核心領(lǐng)域模型中:
- Protocol 是服務(wù)域术羔,它是 Invoker 暴露和引用的主功能入口,它負(fù)責(zé) Invoker 的生命周期管理乙漓〖独可以認(rèn)為Protocol實(shí)現(xiàn)了對(duì)Invoke的封裝
- Invoker 是實(shí)體域,它是 Dubbo 的核心模型簇秒,其它模型都向它靠擾鱼喉,或轉(zhuǎn)換成它,它代表一個(gè)可執(zhí)行體,可向它發(fā)起 invoke 調(diào)用扛禽,它有可能是一個(gè)本地的實(shí)現(xiàn)锋边,也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn),也可能一個(gè)集群實(shí)現(xiàn)编曼。
- Invocation 是會(huì)話(huà)域豆巨,它持有調(diào)用過(guò)程中的變量,比如方法名掐场,參數(shù)等往扔。
我們的代碼的分析也主要集中在Protocol和Invoker上面。
當(dāng)我們希望將某個(gè)service暴露成dubbo接口的時(shí)候熊户,只需要使用dubbo的注解@service即可萍膛,然后該service會(huì)被注冊(cè)到Spring里面成為一個(gè)bean,同時(shí)也會(huì)生成一個(gè)ServiceBean嚷堡,其refer這個(gè)bean蝗罗,所以我們的分析也重點(diǎn)在ServiceBean,ServiceBean的繼承關(guān)系如下
由于ServiceBean實(shí)現(xiàn)了InitializingBean蝌戒,所以在其屬性填充完畢之后串塑,執(zhí)行afterPropertiesSet(),在afterPropertiesSet主要是對(duì)各種配置進(jìn)行檢查填充和校驗(yàn)。
而又由于ServiceBean實(shí)現(xiàn)了ApplicationListener<ContextRefreshedEvent>北苟,所以在監(jiān)聽(tīng)到ContextRefreshedEvent之后桩匪,如下
Class ServiceBean
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();//暴露和注冊(cè)服務(wù)
}
}
可以看到export方法是其最核心的方法。我們接下來(lái)對(duì)export方法進(jìn)行進(jìn)一步的分析友鼻。
export是在ServiceBean的父類(lèi)ServiceConfig里面實(shí)現(xiàn)的傻昙,如下
Class ServiceConfig
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
//是否需要延遲暴露,如果需要的話(huà)使用線程池異步線程實(shí)現(xiàn)暴露
//如果在啟動(dòng)的時(shí)候比較慢桃移,可以設(shè)置延遲暴露的方式
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
protected synchronized void doExport() {
****
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStub(interfaceClass);
checkMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
//核心
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
在doExport方法里面又做了很多的校驗(yàn)屋匕,而最重要的方法就是doExportUrls。
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
如上我們可以看到借杰,dubbo是支持多注冊(cè)中心和多protocol的,針對(duì)每一個(gè)注冊(cè)地址进泼,每一個(gè)protocol蔗衡,都會(huì)調(diào)用一次 doExportUrlsFor1Protocol(protocolConfig, registryURLs)。
一般來(lái)來(lái)說(shuō)乳绕,不是非常特殊的場(chǎng)景绞惦,我們一般都是單注冊(cè)中心(默認(rèn)是zk),單Protocol(dubbo洋措,注意這里是rpc協(xié)議)济蝉,如下如,我的電腦就是配置的單注冊(cè)中心,單rpc協(xié)議(dubbo)
而接下來(lái)的doExportUrlsFor1Protocol方法寫(xiě)的比較混亂王滤,代碼比較長(zhǎng)贺嫂,我們一行行看一下。
Class ServiceConfig
doExportUrlsFor1Protocol 分段分析如下
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application); //收集application配置信息
appendParameters(map, module); //收集module配置信息
appendParameters(map, provider, Constants.DEFAULT_KEY); //收集provider配置信息
appendParameters(map, protocolConfig); //收集protocolConfig配置信息
appendParameters(map, this); //收集ServiceConfig配置信息
可以看到依次將 application --->module--->provider--->protocolConfig--->ServiceConfig
的信息收集到map里面來(lái)雁乡,如果存在重復(fù)的配置信息第喳,后面的配置會(huì)覆蓋前面的配置,所以針對(duì)一些全局的缺省配置我們可以配置在前面踱稍,而一些很細(xì)節(jié)的配置曲饱,我們配置在后面即可。
在ServiceConfig里面可以針對(duì)具體的method進(jìn)行進(jìn)一步的配置珠月,如使用注解配置扩淀,樣例如下
@Service(methods = [@Method(name = "orderCancel",retries = 2)])
在進(jìn)行服務(wù)暴露的過(guò)程中,有一段代碼如下
exportLocal(url);
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
其中
protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
會(huì)將ref就是真實(shí)處理的bean啤挎,stub interfaceClass 和local url進(jìn)行封裝成一個(gè)Invoker就行暴露引矩,而在dubbo中最重要對(duì)象就是這個(gè)Invoker。
而在暴露Invoker的時(shí)候侵浸,首先要拿到proxyFactory旺韭,其中proxyFactory的初始化語(yǔ)句如下
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
這個(gè)proxyFactory是動(dòng)態(tài)生成的(也是真牛)手動(dòng)的生成java文件,然后編譯加載掏觉,這種使用代碼的方式來(lái)動(dòng)態(tài)生成代碼的方式更加的靈活区端,在我的電腦上,動(dòng)態(tài)生成的這個(gè)proxyFactory的代碼如下
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0, boolean arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
從上面的代碼我們可以認(rèn)為 ProxyFactory$Adaptive就是對(duì)JavassistProxyFactory的簡(jiǎn)單的代理澳腹。
而在拿到Invoke之后织盼,使用protocol對(duì)其生命周期進(jìn)行管理,我們使用同樣的方法酱塔,看protocol動(dòng)態(tài)生成的代碼如下
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
有上面的代碼我們可以知道沥邻,最終會(huì)調(diào)用InjvmProtocol的export方法,分析到最后我們發(fā)現(xiàn)其實(shí)這個(gè)方法將該service在當(dāng)前的jvm中暴露出來(lái)羊娃。
最重要的是接下的在注冊(cè)中心的暴露唐全,源碼如下
Class ServiceConfig
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
在我 的本機(jī)上registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())的結(jié)果是
registry://***:2181/com.alibaba.dubbo.registry.RegistryService?application=contract-logistics-wms&backup=****:2181,***:2181&dubbo=2.0.2&export=dubbo%3A%2F%2F10.10.131.127%3A20880%2Fcom.**.wms.dubbo.service.OrderCancelService%3Fanyhost%3Dtrue%26application%3Dcontract-logistics-wms%26bean.name%3DServiceBean%3Acom.zto.wms.dubbo.service.OrderCancelService%26bind.ip%3D10.10.131.127%26bind.port%3D20880%26default.delay%3D-1%26default.retries%3D2%26default.service.filter%3DCatTransaction%26delay%3D-1%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.zto.wms.dubbo.service.OrderCancelService%26logger%3Dslf4j%26methods%3DorderCancel%26orderCancel.retries%3D2%26orderCancel.return%3Dtrue%26pid%3D24140%26side%3Dprovider%26timestamp%3D1618222873762&logger=slf4j&pid=24140®ister=true®istry=zookeeper&subscribe=true×tamp=1618222873751
最終拿到的invoker如下圖
第二句
DelegateProviderMetaDataInvoker(invoker, this);
其實(shí)就是將當(dāng)前的serviceConfig最為metadata跟invoke一起封裝起來(lái)(典型的裝飾模式)
第三句
Exporter<?> exporter = protocol.export(wrapperInvoker)
根據(jù)前面的分析,最終會(huì)調(diào)到RegistryProtocol的export方法蕊玷,其代碼如下
Class RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
在該方法中主要分兩步
第一步 將服務(wù)在本地暴露出來(lái)(如果不暴露出來(lái)邮利,別人無(wú)法調(diào)用)
第二步 將服務(wù)注冊(cè)到zk(如果不注冊(cè),別人不知道有這個(gè)服務(wù))
先看第一步
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
//這一步是關(guān)鍵垃帅,由于invokerDelegete.getUrl.getProtol = "dubbo"
//所以最終的調(diào)用了DubboProtocol的export方法
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
我們跟到DubboProtocol的export的方法里面去延届,代碼如下
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//最重要的是這句
openServer(url);
optimizeSerialization(url);
return exporter;
}
我們跟到openServer(url)里面去
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
首先看當(dāng)前ip:port下是否開(kāi)啟了本地服務(wù),如果沒(méi)有那么調(diào)用createServer贸诚,
在dubbo中默認(rèn)使用的是netty server方庭,代碼如下
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
//這句綁定url(包含ip和port)和請(qǐng)求處理器
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
Class Exchangers
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
Class HeaderExchanger
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
而getTransporter()最終會(huì)返回netty4 的NettyTransporter
如下
Class NettyTransporter
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
Class NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
Class AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
//這句最重要
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
如果你能看到這里厕吉,真的很佩服你,終于看到了我們熟悉的netty代碼械念,后面我們會(huì)專(zhuān)門(mén)的針對(duì)netty做介紹(現(xiàn)在我還了解的不深入)
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
通過(guò)如上的操作之后头朱,在本地起了個(gè)netty服務(wù),理論上我們現(xiàn)在就可以接受rpc請(qǐng)求了订讼,但是我們還需要回到開(kāi)頭髓窜,將服務(wù)注冊(cè)到zk上面去,回到開(kāi)頭欺殿。
@Class RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker 前面講的一大堆就是講的這個(gè)
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//找到注冊(cè)的zk地址
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
最終在ZookeeperRegistry調(diào)用doRegister進(jìn)行注冊(cè)
代碼如下
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
經(jīng)過(guò)上面的操作之后寄纵,最終會(huì)在
/dubbo/com.***.wms.dubbo.service.OrderCancelService/providers 永久節(jié)點(diǎn)下注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn)信息,巧妙的利用了zk的永久節(jié)點(diǎn)和臨時(shí)節(jié)點(diǎn)的特征脖苏,可以動(dòng)態(tài)的增減注冊(cè)信息程拭。
然后在此path下注冊(cè)監(jiān)聽(tīng)器防止暴露的url被重寫(xiě)(這個(gè)邏輯可以先忽略,我也沒(méi)搞清楚)
最終返回一個(gè)DestroyableExporter棍潘,可以在返回的時(shí)候取消所有注冊(cè)信息恃鞋。
如上就是一個(gè)dubbo service暴露的全過(guò)程,后面我們接著介紹一個(gè)service unexport的全過(guò)程亦歉。