先思考下俐末,如果要實(shí)現(xiàn)服務(wù)發(fā)布和注冊(cè),需要做哪些事情?
- 配置文件解析或注解解析
- 啟動(dòng) netty 服務(wù)實(shí)現(xiàn)遠(yuǎn)程監(jiān)聽
- 服務(wù)注冊(cè)
服務(wù)端無外乎就這3件事,接下來我們用源碼分析來論證我們的猜想
Dubbo集成Spring
Spring 的標(biāo)簽擴(kuò)展
在 spring 中定義了兩個(gè)接口
1.NamespaceHandler: 注冊(cè)一堆 BeanDefinitionParser角钩,利用他們來進(jìn)行解析配置
2.BeanDefinitionParser:用于解析每個(gè) element 的內(nèi)容
Spring 默認(rèn)會(huì)加載 jar 包下的 META-INF/spring.handlers 文件尋找對(duì)應(yīng)的 NamespaceHandler。 Dubbo-config 模塊下的 dubbo-config-spring
Dubbo 的接入實(shí)現(xiàn)
Dubbo 中 spring 擴(kuò)展就是使用 spring 的自定義類型,所以同樣也有 NamespaceHandler递礼、BeanDefinitionParser惨险。而NamespaceHandler 是 DubboNamespaceHandler
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
@Override
public void init() {
//BeanDefinitionParser接口全部都使用了 DubboBeanDefinitionParser實(shí)現(xiàn)
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
}
BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,如果我們想看 dubbo:service 的配置脊髓,就直接看DubboBeanDefinitionParser(ServiceBean.class,true)
init()方法就是把不同的配置分別轉(zhuǎn)化成 spring 容器中的 bean 對(duì)象
application 對(duì)應(yīng) ApplicationConfig
registry 對(duì)應(yīng) RegistryConfig
monitor 對(duì)應(yīng) MonitorConfig
provider 對(duì)應(yīng) ProviderConfig
consumer 對(duì)應(yīng) ConsumerConfig
我們仔細(xì)看辫愉,發(fā)現(xiàn)涉及到服務(wù)發(fā)布和服務(wù)調(diào)用的兩個(gè)配置的解析,使用的是 ServiceBean 和 referenceBean将硝。并不是 config 結(jié)尾的恭朗,這兩個(gè)類稍微特殊些,當(dāng)然他同時(shí)也繼承了 ServiceConfig 和 ReferenceConfig依疼。
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
DubboBeanDefinitionParser
這里面是實(shí)現(xiàn)具體配置文件解析的入口痰腮,它重寫了 parse 方法,對(duì) spring 的配置進(jìn)行解析律罢。
public class DubboBeanDefinitionParser implements BeanDefinitionParser {
@SuppressWarnings("unchecked")
private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
//省略其他代碼...
else if (ServiceBean.class.equals(beanClass)) {
String className = element.getAttribute("class");
if (className != null && className.length() > 0) {
RootBeanDefinition classDefinition = new RootBeanDefinition();
classDefinition.setBeanClass(ReflectUtils.forName(className));
classDefinition.setLazyInit(false);
parseProperties(element.getChildNodes(), classDefinition);
beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
}
}
//省略其他代碼...
}
}
我們關(guān)注一下 ServiceBean 的解析膀值,實(shí)際就是解析 dubbo:service 這個(gè)標(biāo)簽中對(duì)應(yīng)的屬性
ServiceBean 的實(shí)現(xiàn)
ServiceBean 這個(gè)類,分別實(shí)現(xiàn)了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware, ApplicationEventPublisherAware
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
ApplicationEventPublisherAware {
-
InitializingBean
接口為 bean 提供了初始化方法的方式误辑,它只包括 afterPropertiesSet 方法沧踏,凡是繼承該接口的類,在初始化 bean 的時(shí)候會(huì)執(zhí)行該方法巾钉,被重寫的方法為 afterPropertiesSet翘狱。 - DisposableBean
被重寫的方法為 destroy,bean 被銷毀的時(shí)候,spring 容器會(huì)自動(dòng)執(zhí)行 destory 方法睛琳,比如釋放資源 - ApplicationContextAware
實(shí)現(xiàn)了這個(gè)接口的 bean盒蟆,當(dāng) spring 容器初始化的時(shí)候,會(huì)自動(dòng)的將 ApplicationContext 注入進(jìn)來 -
ApplicationListener
ApplicationEvent 事件監(jiān)聽师骗,spring 容器啟動(dòng)后會(huì)發(fā)一個(gè)事件通知历等。被重寫的方法為:onApplicationEvent,onApplicationEvent方法傳入的對(duì)象是 ContextRefreshedEvent辟癌。這個(gè)對(duì)象是當(dāng) Spring 的上下文被刷新或者加載完畢的時(shí)候觸發(fā)的寒屯。因此服務(wù)就是在Spring 的上下文刷新后進(jìn)行導(dǎo)出操作的。 - BeanNameAware
獲得自身初始化時(shí)黍少,本身的 bean 的 id 屬性寡夹,被重寫的方法為 setBeanName - ApplicationEventPublisherAware
這個(gè)是一個(gè)異步事件發(fā)送器。被重寫的方法為 setApplicationEventPublisher,簡(jiǎn)單來說厂置,在 spring 里面提供了類似于消息隊(duì)列的異步事件解耦功能菩掏。(典型的觀察者模式的應(yīng)用)
spring 事件發(fā)送監(jiān)聽由 3 個(gè)部分組成:
1.ApplicationEvent:表示事件本身,自定義事件需要繼承該類
2.ApplicationEventPublisherAware:事件發(fā)送器昵济,需要實(shí)現(xiàn)該接口
3.ApplicationListener:事件監(jiān)聽器接口
ServiceBean 中服務(wù)暴露過程
在 ServiceBean 中智绸,我們暫且只需要關(guān)注兩個(gè)方法野揪,分別是:
1.在初始化 bean 的時(shí)候會(huì)執(zhí)行該方法 afterPropertiesSet
2.spring 容器啟動(dòng)后會(huì)發(fā)一個(gè)事件通知 onApplicationEvent
afterPropertiesSet
我們發(fā)現(xiàn)這個(gè)方法里面,就是把 dubbo 中配置的 application瞧栗、registry斯稳、service、protocol 等配置信息迹恐,加載到對(duì)應(yīng)的 config實(shí)體中挣惰,便于后續(xù)使用。大家可以自行看代碼殴边。
onApplicationEvent
spring 容器啟動(dòng)之后憎茂,會(huì)收到一個(gè)這樣的事件通知,這里面做了兩個(gè)事情:
? 1.判斷服務(wù)是否已經(jīng)發(fā)布過
? 2.如果沒有發(fā)布找都,則調(diào)用調(diào)用 export 進(jìn)行服務(wù)發(fā)布的流程(這里就是入口)
//監(jiān)聽 spring上下文被刷新或者加載的時(shí)候觸發(fā)
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export(); //導(dǎo)出唇辨、發(fā)布
}
}
export
@Override
public void export() {
super.export();
// Publish ServiceBeanExportedEvent
publishExportEvent();
}
serviceBean 中,重寫了 export 方法能耻,實(shí)現(xiàn)了 一個(gè)事件的發(fā)布赏枚。并且調(diào)用了 super.export() ,也就是會(huì)調(diào)用serviceBean 的父類ServiceConfig的 export 方法晓猛。
先整體來看一下這個(gè)父類ServiceConfig的作用饿幅,從名字來看,它應(yīng)該和其他所有 config 類一樣去實(shí)現(xiàn)對(duì)配置文件中 service 的配置信息的存儲(chǔ)戒职。
實(shí)際上這個(gè)類并不單純栗恩,所有的配置它都放在了一個(gè) AbstractServiceConfig 的抽象類,自己實(shí)現(xiàn)了很多對(duì)于服務(wù)發(fā)布之前要做的操作邏輯
ServiceConfig 配置類
我們接分析super.export();
export()
public synchronized void export() {
checkAndUpdateSubConfigs(); //檢查并且更新配置信息
if (!shouldExport()) { //當(dāng)前的服務(wù)是否需要發(fā)布, 通過配置實(shí)現(xiàn):@Service(export = false)
return;
}
if (shouldDelay()) {//檢查是否需要延時(shí)發(fā)布洪燥,通過配置@Service(delay = 1000)實(shí)現(xiàn)磕秤,單位毫秒
//這里的延時(shí)是通過定時(shí)器來實(shí)現(xiàn)
delayExportExecutor.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport(); //如果沒有配置 delay,則直接調(diào)用 doExport 進(jìn)行發(fā)布
}
}
doExport
這里仍然還是在實(shí)現(xiàn)發(fā)布前的各種判斷捧韵,比如判斷
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {//服務(wù)是否已經(jīng)發(fā)布過了
return;
}
exported = true;//設(shè)置發(fā)布狀態(tài)
if (StringUtils.isEmpty(path)) { //path 表示服務(wù)路徑市咆,默認(rèn)使用 interfaceName
path = interfaceName;
}
doExportUrls();
}
doExportUrls
- 記載所有配置的注冊(cè)中心地址
- 遍歷所有配置的協(xié)議,protocols
- 針對(duì)每種協(xié)議發(fā)布一個(gè)對(duì)應(yīng)協(xié)議的服務(wù)
private void doExportUrls() {
//加載所有配置的注冊(cè)中心的地址再来,組裝成一個(gè) URL
//URL(來驅(qū)動(dòng)流程的執(zhí)行)->[ registry://192.168.1.102:2181/org.apache.dubbo.registry.RegsitryService/....]
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
//group 跟 version 組成一個(gè) pathKey(serviceName)
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
//applicationModel 用來存儲(chǔ) ProviderModel蒙兰,發(fā)布的服務(wù)的元數(shù)據(jù),后續(xù)會(huì)用到
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
doExportUrlsFor1Protocol
發(fā)布指定協(xié)議的服務(wù)芒篷,我們以 Dubbo 服務(wù)為例搜变,由于代碼太多,就不全部貼出來
- 前面的一大串 if else 代碼是為了把當(dāng)前服務(wù)下所配置的<dubbo:method>參數(shù)進(jìn)行解析针炉,保存到map集合中
- 獲得當(dāng)前服務(wù)需要暴露的 ip 和端口
- 把解析到的所有數(shù)據(jù)挠他,組裝成一個(gè) URL,大概應(yīng)該是:
dubbo://192.168.1.102:20881/com.wei.ISayHelloService
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//省略一大串 ifelse 代碼,用于解析<dubbo:method> 配置
//省略解析<dubbo:service>中配置參數(shù)的代碼篡帕,比如 token绩社、比如 service 中的 method 名稱等存儲(chǔ)在 map 中
//獲得當(dāng)前服務(wù)要發(fā)布的目標(biāo) ip 和 port
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
//組裝 URL
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
//這里是通過 ConfiguratorFactory 去實(shí)現(xiàn)動(dòng)態(tài)改變配置的功能摔蓝,這里暫時(shí)不涉及后續(xù)再分析
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
//url已經(jīng)組裝好了,接下來只需要發(fā)布愉耙。(context->url)
//scope 選擇服務(wù)發(fā)布的范圍(local/remote)
//同一個(gè)jvm里面調(diào)用,沒必要走遠(yuǎn)程通信 拌滋; injvm://ip:port..
//remote : dubbo://ip:port
//默認(rèn)情況下朴沿,如果是配置remote(registry),默認(rèn)發(fā)布遠(yuǎn)程和本地
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// 如果是本地發(fā)布败砂,則直接調(diào)用exportLocal
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url); //TODO
}
// export to remote if the config is not local (export to local only when config is local)
//發(fā)布遠(yuǎn)程服務(wù)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
下面remote代碼
//...
}
- local
服務(wù)只是 injvm 的服務(wù)赌渣,提供一種消費(fèi)者和提供者都在一個(gè) jvm 內(nèi)的調(diào)用方式。使用了 Injvm 協(xié)議昌犹,是一個(gè)偽協(xié)議坚芜,它不開啟端口,不發(fā)起遠(yuǎn)程調(diào)用斜姥,只在 JVM 內(nèi)直接關(guān)聯(lián)鸿竖,(通過集合的方式保存了發(fā)布的服務(wù)信息),但執(zhí)行 Dubbo 的 Filter 鏈铸敏。簡(jiǎn)單來說缚忧,就是你本地的 dubbo 服務(wù)調(diào)用,都依托于 dubbo 的標(biāo)準(zhǔn)來進(jìn)行杈笔。這樣可以享受到 dubbo 的一些配置服務(wù)闪水。 - remote
for (URL registryURL : registryURLs) {
//省略部分代碼...
//這個(gè)invoker具體是什么,下面會(huì)單獨(dú)開一個(gè)模塊分析
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
表示根據(jù)根據(jù)配置的注冊(cè)中心進(jìn)行遠(yuǎn)程發(fā)布蒙具。 遍歷多個(gè)注冊(cè)中心球榆,進(jìn)行協(xié)議的發(fā)布
- Invoker 是一個(gè)代理類,它是 Dubbo 的核心模型禁筏,其它模型都向它靠擾持钉,或轉(zhuǎn)換成它,它代表一個(gè)可執(zhí)行體融师,可向它發(fā)起invoke 調(diào)用右钾,它有可能是一個(gè)本地的實(shí)現(xiàn),也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn)旱爆,也可能一個(gè)集群實(shí)現(xiàn)舀射。(很重要,后續(xù)單獨(dú)分析)
- DelegateProviderMetaDataInvoker怀伦,因?yàn)?2.7 引入了元數(shù)據(jù)脆烟,所以這里對(duì) invoker 做了委托,把 invoker 交給DelegateProviderMetaDataInvoker 來處理
- 調(diào)用 protocol.export(invoker)來發(fā)布這個(gè)代理
- 添加到 exporters 集合
protocol.export
protocol.export房待,這個(gè) protocol 是什么呢邢羔?找到定義處發(fā)現(xiàn)它是一個(gè)自適應(yīng)擴(kuò)展點(diǎn)驼抹,打開 Protocol 這個(gè)擴(kuò)展點(diǎn),又可以看到它是一個(gè)在方法層面上的自適應(yīng)擴(kuò)展拜鹤,意味著它實(shí)現(xiàn)了對(duì)于 export 這個(gè)方法的適配框冀。也就意味著這個(gè) Protocol 是一個(gè)動(dòng)態(tài)代理類,Protocol$Adaptive
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
這個(gè)動(dòng)態(tài)代理類敏簿,會(huì)根據(jù) url 中配置的 protocol name 來實(shí)現(xiàn)對(duì)應(yīng)協(xié)議的適配
Protocol$Adaptive
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy()of interface org.apache.dubbo.rpc.Protocol is not adaptive method !");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort()of interface org.apache.dubbo.rpc.Protocol is not adaptive method !");
public org.apache.dubbo.rpc.Exporter export (org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys ([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public org.apache.dubbo.rpc.Invoker refer (java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys ([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
}
那么在當(dāng)前的場(chǎng)景中明也,protocol 會(huì)是調(diào)用誰呢?目前發(fā)布的 invoker(URL)惯裕,實(shí)際上是一個(gè) registry://協(xié)議温数,所以Protocol$Adaptive,會(huì)通過 getExtension(extName)得到一個(gè) RegistryProtocol蜻势。
RegistryProtocol.export
很明顯撑刺,這個(gè) RegistryProtocol 是用來實(shí)現(xiàn)服務(wù)注冊(cè)的,這里面會(huì)有很多處理邏輯:
? 實(shí)現(xiàn)對(duì)應(yīng)協(xié)議的服務(wù)發(fā)布
? 實(shí)現(xiàn)服務(wù)注冊(cè)
? 訂閱服務(wù)重寫
//實(shí)現(xiàn)服務(wù)的注冊(cè)和發(fā)布
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//這里獲得的是 zookeeper 注冊(cè)中心的 url: zookeeper://ip:port
URL registryUrl = getRegistryUrl(originInvoker);
//這里是獲得服務(wù)提供者的 url, dubbo://ip:port...
URL providerUrl = getProviderUrl(originInvoker);
//訂閱 override 數(shù)據(jù)握玛。在 admin 控制臺(tái)可以針對(duì)服務(wù)進(jìn)行治理够傍,比如修改權(quán)重,修改路由機(jī)制等败许,當(dāng)注冊(cè)中心有此服務(wù)的覆蓋配置注冊(cè)進(jìn)來時(shí)王带,推送消息給提供者,重新暴露服務(wù)
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
/***********************************/
//(很重要)1.doLocalExport這里就是真正的服務(wù)發(fā)布邏輯
//這里就交給了具體的協(xié)議去暴露服務(wù)( 本質(zhì)就是去啟動(dòng)一個(gè)netty服務(wù))
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
//很重要)2.getRegistry這里就是真正的服務(wù)注冊(cè)邏輯
// 根據(jù) invoker 中的 url 獲取 Registry 實(shí)例: zookeeperRegistry
final Registry registry = getRegistry(originInvoker);
//獲取要注冊(cè)到注冊(cè)中心的 URL: dubbo://ip:port
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {{//是否配置了注冊(cè)中心市殷,如果是愕撰, 則需要注冊(cè)
//注冊(cè)到注冊(cè)中心的 URL
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
//設(shè)置注冊(cè)中心的訂閱
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//保證每次 export 都返回一個(gè)新的 exporter 實(shí)例
return new DestroyableExporter<>(exporter);
}
doLocalExport
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
//bounds -chm ->computeIfAbsent if(map.get(key)==null){map.put()}
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
//對(duì)原有的 invoker,委托給了 InvokerDelegate,這個(gè)invoker具體是什么,下面會(huì)單獨(dú)分析
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
//將 invoker 轉(zhuǎn)換為 exporter 并啟動(dòng) netty 服務(wù)
//protocol.export -> DubboProtocol.export(本質(zhì)上就是 暴露一個(gè) 20880的端口)
//protocol- >Protocol$Apaptive ->QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol(invoker))))
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
先通過 doLocalExport 來暴露一個(gè)服務(wù)醋寝,本質(zhì)上應(yīng)該是啟動(dòng)一個(gè)通信服務(wù),主要的步驟是將本地 ip 和 20880 端口打開黑毅,進(jìn)行監(jiān)聽
originInvoker: 應(yīng)該是 registry://ip:port/com.alibaba.dubbo.registry.RegistryService
key: 從 originInvoker 中獲得發(fā)布協(xié)議的 url: dubbo://ip:port/...
bounds: 一個(gè) prviderUrl 服務(wù) export 之后充甚,緩存到 bounds 中命锄,所以一個(gè) providerUrl 只會(huì)對(duì)應(yīng)一個(gè) exporter
computeIfAbsent 就相當(dāng)于, java8 的語法
if(bounds.get(key)==null){
bounds.put(key,s->{})
}
InvokerDelegete: 是 RegistryProtocol 的一個(gè)靜態(tài)內(nèi)部類十办,該類是一個(gè) originInvoker 的委托類,該類存儲(chǔ)了 originInvoker嗅绰,其父類 InvokerWrapper 還會(huì)存儲(chǔ) providerUrl舍肠,InvokerWrapper 會(huì)調(diào)用 originInvoker 的 invoke 方法,也會(huì)銷毀 invoker窘面〈溆铮可以管理 invoker 的生命周期。
protocol.export
public class RegistryProtocol implements Protocol {
private Protocol protocol;
//set方法設(shè)置一個(gè)依賴擴(kuò)展點(diǎn)(DubboProtocol) 包裝
public void setProtocol(Protocol protocol) {
this.protocol = protocol;
}
}
基于動(dòng)態(tài)代理的適配财边,很自然的就過渡到了 DubboProtocol 這個(gè)協(xié)議類中肌括,但是實(shí)際上是 DubboProtocol 嗎?
這里并不是獲得一個(gè)單純的 DubboProtocol 擴(kuò)展點(diǎn)酣难,而是會(huì)通過 Wrapper 對(duì) Protocol 進(jìn)行裝飾谍夭,裝飾器分別為:
QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper/DubboProtocol
為什么是這樣黑滴?我們?cè)賮砜纯?spi 的代碼
DubboProtocol.export
Wrapper 包裝
在 ExtensionLoader.loadClass 這個(gè)方法中,有一段這樣的判斷紧索,如果當(dāng)前這個(gè)類是一個(gè) wrapper 包裝類袁辈,也就是這個(gè) wrapper中有構(gòu)造方法,參數(shù)是當(dāng)前被加載的擴(kuò)展點(diǎn)的類型齐板,則把這個(gè) wrapper 類加入到 cacheWrapperClass 緩存中吵瞻。
else if (isWrapperClass(clazz)) {
cacheWrapperClass(clazz);
}
private boolean isWrapperClass(Class<?> clazz) {
try {
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}
我們可以在 dubbo 的配置文件(dubbo-qos/dubbo-rpc-api)中找到三個(gè) Wrapper
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
- QosprotocolWrapper, 如果當(dāng)前配置了注冊(cè)中心甘磨,則會(huì)啟動(dòng)一個(gè) Qos server.qos 是 dubbo 的在線運(yùn)維命令,dubbo2.5.8 新版本重構(gòu)了 telnet 模塊眯停,提供了新的 telnet 命令支持济舆,新版本的 telnet 端口與 dubbo 協(xié)議的端口是不同的端口,默認(rèn)為 22222
- ProtocolFilterWrapper莺债,對(duì) invoker 進(jìn)行 filter 的包裝滋觉,實(shí)現(xiàn)請(qǐng)求的過濾
- ProtocolListenerWrapper, 用于服務(wù) export 時(shí)候插入監(jiān)聽機(jī)制齐邦,暫未實(shí)現(xiàn)
接著椎侠,在 getExtension->createExtension 方法中,會(huì)對(duì) cacheWrapperClass 集合進(jìn)行判斷措拇,如果集合不為空我纪,則進(jìn)行包裝
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
ProtocolFilterWrapper
這個(gè)是一個(gè)過濾器的包裝,使用責(zé)任鏈模式丐吓,對(duì) invoker 進(jìn)行了包裝
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
//構(gòu)建責(zé)任鏈浅悉,基于激活擴(kuò)展點(diǎn)
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.g
etUrl(), key, group);
}
我們看如下文件:
/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter
默認(rèn)提供了非常多的過濾器。 然后基于條件激活擴(kuò)展點(diǎn)券犁,來對(duì) invoker 進(jìn)行包裝术健,從而在實(shí)現(xiàn)遠(yuǎn)程調(diào)用的時(shí)候,會(huì)經(jīng)過這些filter 進(jìn)行過濾粘衬。
export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
//獲取服務(wù)標(biāo)識(shí)荞估,理解成服務(wù)坐標(biāo)也行。由服務(wù)組名稚新,服務(wù)名勘伺,服務(wù)版本號(hào)以及端口組成。比如
//${group}/copm.wei.ISayHelloService:${version}:20880
String key = serviceKey(url);
//創(chuàng)建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 將 <key, exporter> 鍵值對(duì)放入緩存中,可以猜測(cè)到客戶端調(diào)用時(shí)枷莉,會(huì)用到這個(gè)map娇昙,獲取對(duì)應(yīng)的invoker進(jìn)行服務(wù)調(diào)用,這個(gè)invoker具體是什么,下面會(huì)單獨(dú)分析
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url); //openServer(url) 開啟一個(gè)服務(wù) 笤妙,暴露20880端口
optimizeSerialization(url); //優(yōu)化序列化
return exporter;
}
openServer
去開啟一個(gè)服務(wù)冒掌,并且放入到緩存中->在同一臺(tái)機(jī)器上(單網(wǎng)卡)噪裕,同一個(gè)端口上僅允許啟動(dòng)一個(gè)服務(wù)器實(shí)例
private void openServer(URL url) {
// 獲取 host:port,并將其作為服務(wù)器實(shí)例的 key股毫,用于標(biāo)識(shí)當(dāng)前的服務(wù)器實(shí)例
String key = url.getAddress();
////client 也可以暴露一個(gè)只有 server 可以調(diào)用的服務(wù)
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
//是否在 serverMap 中緩存了
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 創(chuàng)建服務(wù)器實(shí)例
serverMap.put(key, createServer(url));
}
}
} else {
// 服務(wù)器已創(chuàng)建膳音,則根據(jù) url 中的配置重置服務(wù)器
server.reset(url);
}
}
}
createServer
創(chuàng)建服務(wù),開啟心跳檢測(cè),默認(rèn)使用 netty铃诬。組裝 url
private ExchangeServer createServer(URL url) {
//組裝 url祭陷,在 url 中添加心跳時(shí)間、編解碼參數(shù)
url = URLBuilder.from(url)
// 當(dāng)服務(wù)關(guān)閉以后趣席,發(fā)送一個(gè)只讀的事件兵志,默認(rèn)是開啟狀態(tài)
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// 啟動(dòng)心跳配置
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
//獲得當(dāng)前應(yīng)該采用什么樣的方式來發(fā)布服務(wù), netty3, netty4, mina , grizzy,
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
//通過 SPI 檢測(cè)是否存在 server 參數(shù)所代表的 Transporter 拓展宣肚,不存在則拋出異常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
//創(chuàng)建 ExchangeServer.
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
Exchangers.bind
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
//獲取 Exchanger想罕,默認(rèn)為 HeaderExchanger。
//調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實(shí)例
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//HeaderExchanger.
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
靜態(tài)SPI的方式可知霉涨,是調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實(shí)例
headerExchanger.bind
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
/**
* new DecodeHandler(new HeaderExchangeHandler())
* Transporters.bind
* new HeaderExchangeServer
*/
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
這里面包含多個(gè)邏輯
? new DecodeHandler(new HeaderExchangeHandler(handler))
? Transporters.bind
? new HeaderExchangeServer
目前我們只需要關(guān)心 transporters.bind 方法即可
Transporters.bind
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素?cái)?shù)量大于 1按价,則創(chuàng)建 ChannelHandler 分發(fā)器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取自適應(yīng) Transporter 實(shí)例,并調(diào)用實(shí)例方法
//getTransporter() -> ExtensionLoader.getExtension(Transport.class).getExtension("netty");
return getTransporter().bind(url, handler);
}
getTransporter
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
getTransporter 是一個(gè)自適應(yīng)擴(kuò)展點(diǎn)笙瑟,它針對(duì) bind 方法添加了自適應(yīng)注解楼镐,意味著,bing 方法的具體實(shí)現(xiàn)往枷,會(huì)基于Transporter$Adaptive 方法進(jìn)行適配框产,那么在這里面默認(rèn)的通信協(xié)議是 netty,所以它會(huì)采用 netty4 的實(shí)現(xiàn)师溅,也就是
org.apache.dubbo.remoting.transport.netty4.NettyTransporter
NettyTransporter.bind
創(chuàng)建一個(gè) nettyserver
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
進(jìn)入super方法
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 獲取 ip 和端口
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
doOpen();// 調(diào)用模板方法 doOpen 啟動(dòng)服務(wù)器
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
初始化一個(gè) nettyserver茅信,并且從 url 中獲得相應(yīng)的 ip/ port。然后調(diào)用 doOpen();
doOpen
開啟 netty 服務(wù)墓臭,都是netty相關(guān)的代碼蘸鲸,大家應(yīng)該都很熟悉了
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
要注意的是它這里用到了一個(gè) handler 來處理客戶端傳遞過來的請(qǐng)求:
nettyServerHandler
NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
這個(gè) handler 是一個(gè)鏈路,它的正確組成應(yīng)該是
MultiMessageHandler(heartbeatHandler(AllChannelHandler(DecodeHandler(HeaderExchangeHeadler(dubboProtocol
后續(xù)接收到的請(qǐng)求窿锉,會(huì)一層一層的處理酌摇,比較繁瑣。
Invoker 是什么
之前埋的伏筆嗡载,這里單獨(dú)分析下invoker
從前面的分析來看窑多,服務(wù)的發(fā)布分三個(gè)階段:
第一個(gè)階段會(huì)創(chuàng)造一個(gè) invoker
第二個(gè)階段會(huì)把經(jīng)歷過一系列處理的 invoker(各種包裝),在 DubboProtocol 中保存到 exporterMap 中
第三個(gè)階段把 dubbo 協(xié)議的 url 地址注冊(cè)到注冊(cè)中心上
前面沒有分析 Invoker洼滚,我們來簡(jiǎn)單看看 Invoker 到底是一個(gè)啥東西埂息。
Invoker 是 Dubbo 領(lǐng)域模型中非常重要的一個(gè)概念, 和 ExtensionLoader 的重要性是一樣的,如果 Invoker 沒有搞懂,那么不算是看懂了 Dubbo 的源碼千康。
我們繼續(xù)回到 ServiceConfig 中 export 的代碼享幽,這段代碼是還沒有分析過的。
//這個(gè)invoker具體是什么拾弃,下面會(huì)單獨(dú)開一個(gè)模塊分析
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
以這個(gè)作為入口來分析我們前面 export 出去的 invoker 到底是啥東西
ProxyFacotory.getInvoker
這個(gè)是一個(gè)代理工廠值桩,用來生成 invoker,從它的定義來看豪椿,它是一個(gè)自適應(yīng)擴(kuò)展點(diǎn)奔坟,看到這樣的擴(kuò)展點(diǎn),我們幾乎可以不假思索的想到它會(huì)存在一個(gè)動(dòng)態(tài)適配器類
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
ProxyFactory
@SPI("javassist")
public interface ProxyFactory {
/**
* create proxy.
*
* @param invoker
* @return proxy
*/
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
/**
* create proxy.
*
* @param invoker
* @return proxy
*/
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
/**
* create invoker.
*
* @param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
這個(gè)方法的簡(jiǎn)單解讀為: 它是一個(gè) spi 擴(kuò)展點(diǎn)搭盾,并且默認(rèn)的擴(kuò)展實(shí)現(xiàn)是 javassit, 這個(gè)接口中有三個(gè)方法咳秉,并且都是加了@Adaptive 的自適應(yīng)擴(kuò)展點(diǎn)。所以如果調(diào)用 getInvoker 方法鸯隅,應(yīng)該會(huì)返回一個(gè) ProxyFactory$Adaptive
ProxyFactory$Adaptive
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys ([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys ([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0, arg1);
}
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys ([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
這個(gè)自適應(yīng)擴(kuò)展點(diǎn)滴某,做了三件事情
? 通過 ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName)獲取了一個(gè)指定名稱的擴(kuò)展點(diǎn),
? 在 dubbo-rpc-api/resources/META-INF/com.alibaba.dubbo.rpc.ProxyFactory 中,定義了 javassis=JavassisProxyFactory
? 調(diào)用 JavassisProxyFactory 的 getInvoker 方法
JavassistProxyFactory.getInvoker
先回顧我們自己定義的接口類
package com.wei;
public interface ISayHelloService {
String sayHello(String content);
}
和實(shí)現(xiàn)類
package com.wei;
import com.wei.ISayHelloService;
import org.apache.dubbo.config.annotation.Service;
@Service
public class SayHelloServiceImpl implements ISayHelloService {
@Override
public String sayHello(String content) {
return "Hello :"+content;
}
}
javassist 是一個(gè)動(dòng)態(tài)類庫滋迈,用來實(shí)現(xiàn)動(dòng)態(tài)代理的。
proxy:接口的實(shí)現(xiàn): com.wei.SayHelloServiceImpl
type:接口全稱 com.wei.ISayHelloService
url:協(xié)議地址:registry://...
public class JavassistProxyFactory extends AbstractProxyFactory {
...
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
//javassist 生成的動(dòng)態(tài)代理代碼户誓,構(gòu)建一個(gè)代理類
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//構(gòu)建好了代理類之后饼灿,返回一個(gè) AbstractproxyInvoker,并且它實(shí)現(xiàn)了 doInvoke 方法
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
javassist 生成的動(dòng)態(tài)代理代碼
先進(jìn)入Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);方法,這里相當(dāng)于
Wrapper.getWrapper(com.wei.ISayHelloService.class)
type= com.wei.ISayHelloService
public static Wrapper getWrapper(Class<?> c) {
while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
{
c = c.getSuperclass();
}
if (c == Object.class) {
return OBJECT_WRAPPER;
}
Wrapper ret = WRAPPER_MAP.get(c);
if (ret == null) {
//c=com.wei.ISayHelloService.class
ret = makeWrapper(c);
WRAPPER_MAP.put(c, ret);
}
return ret;
}
再進(jìn)入makeWrapper方法
private static Wrapper makeWrapper(Class<?> c) {
if (c.isPrimitive()) {
throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
}
//這個(gè)name = com.wei.ISayHelloService
String name = c.getName();
ClassLoader cl = ClassUtils.getClassLoader(c);
StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
//省略下面的拼接字符串代碼
通過斷點(diǎn)的方式帝美,在 Wrapper.getWrapper 中的 makeWrapper碍彭,會(huì)創(chuàng)建一個(gè)動(dòng)態(tài)代理類,核心的方法invokeMethod 拼接好的字符串的代碼如下:
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
com.wei.ISayHelloService w;
try {
w = ((com.wei.ISayHelloServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("sayHello".equals($2) && $3.length == 1) {
//從這個(gè)生成的代碼可知悼潭,dubbo的服務(wù)調(diào)用并不是通過反射庇忌,還是直接生成好方法給客戶端調(diào)用
return ($w) w.sayHello((java.lang.String) $4[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.wei.ISayHelloService.");
}
構(gòu)建好了代理類之后,返回一個(gè) AbstractproxyInvoker,并且它實(shí)現(xiàn)了 doInvoke 方法舰褪,這個(gè)地方似乎看到了 dubbo 消費(fèi)者調(diào)用過來的時(shí)候觸發(fā)的影子皆疹,因?yàn)?wrapper.invokeMethod 本質(zhì)上就是觸發(fā)上面動(dòng)態(tài)代理類的方法 invokeMethod。
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
//javassist 生成的動(dòng)態(tài)代理代碼占拍,構(gòu)建一個(gè)代理類
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//構(gòu)建好了代理類之后略就,返回一個(gè) AbstractproxyInvoker,并且它實(shí)現(xiàn)了 doInvoke 方法,dubbo消費(fèi)端調(diào)用方法晃酒,就會(huì)觸發(fā)doInvoke方法
return new AbstractProxyInvoker<T>(proxy, type, url) {
//所以可以猜想得出表牢,假設(shè)客戶端調(diào)用ISayHelloService.sayHello方法,最終會(huì)調(diào)用此doInvoke方法
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
//而oInvoke方法贝次,最終就是調(diào)用上面動(dòng)態(tài)代理類生成好的方法 invokeMethod
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
從這段代碼也解密得出崔兴, 假設(shè)客戶端調(diào)用ISayHelloService.sayHello方法,最終會(huì)調(diào)用此doInvoke方法,而oInvoke方法敲茄,最終就是調(diào)用上面動(dòng)態(tài)代理類生成好的方法 invokeMethod位谋,而invokeMethod方法就是生成好的寫死的調(diào)用實(shí)現(xiàn)類SayHelloServiceImpl.sayHello方法(客戶端源碼分析在下篇文章)
所以,簡(jiǎn)單總結(jié)一下 Invoke 本質(zhì)上應(yīng)該是一個(gè)代理折汞,經(jīng)過層層包裝最終進(jìn)行了發(fā)布倔幼。當(dāng)消費(fèi)者發(fā)起請(qǐng)求的時(shí)候,會(huì)獲得這個(gè)invoker 進(jìn)行調(diào)用爽待。
最終發(fā)布出去的 invoker, 也不是單純的一個(gè)代理损同,也是經(jīng)過多層包裝
InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))
到這里,dubbo服務(wù)端的發(fā)布就正式完成了鸟款,我們也可用點(diǎn)對(duì)點(diǎn)直連的方式進(jìn)行調(diào)用服務(wù)端了膏燃,但是一般我們都是使用zk做注冊(cè)中心,所以還需要分析服務(wù)的注冊(cè)邏輯何什。
服務(wù)注冊(cè)流程
我們?cè)倩氐?RegistryProtocol 這個(gè)類的export方法中
//實(shí)現(xiàn)服務(wù)的注冊(cè)和發(fā)布
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//這里獲得的是 zookeeper 注冊(cè)中心的 url: zookeeper://ip:port
URL registryUrl = getRegistryUrl(originInvoker);
//這里是獲得服務(wù)提供者的 url, dubbo://ip:port...
URL providerUrl = getProviderUrl(originInvoker);
//訂閱 override 數(shù)據(jù)组哩。在 admin 控制臺(tái)可以針對(duì)服務(wù)進(jìn)行治理,比如修改權(quán)重处渣,修改路由機(jī)制等伶贰,當(dāng)注冊(cè)中心有此服務(wù)的覆蓋配置注冊(cè)進(jìn)來時(shí),推送消息給提供者罐栈,重新暴露服務(wù)
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
/***********************************/
//(很重要)1.doLocalExport這里就是真正的服務(wù)發(fā)布邏輯
//這里就交給了具體的協(xié)議去暴露服務(wù)( 本質(zhì)就是去啟動(dòng)一個(gè)netty服務(wù))
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
//很重要)2.getRegistry這里就是真正的服務(wù)注冊(cè)邏輯
// 根據(jù) invoker 中的 url 獲取 Registry 實(shí)例: zookeeperRegistry
final Registry registry = getRegistry(originInvoker);
//獲取要注冊(cè)到注冊(cè)中心的 URL: dubbo://ip:port
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {{//是否配置了注冊(cè)中心黍衙,如果是, 則需要注冊(cè)
//注冊(cè)到注冊(cè)中心的 URL
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
//設(shè)置注冊(cè)中心的訂閱
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//保證每次 export 都返回一個(gè)新的 exporter 實(shí)例
return new DestroyableExporter<>(exporter);
}
1.doLocalExport這里就是真正的服務(wù)發(fā)布邏輯
我們現(xiàn)在從
2.getRegistry這里就是真正的服務(wù)注冊(cè)邏輯
這里開始繼續(xù)分析服務(wù)注冊(cè)邏輯
getRegistry
private Registry getRegistry(final Invoker<?> originInvoker) {
//把 url 轉(zhuǎn)化為配置的具體協(xié)議荠诬,比如 zookeeper://ip:port. 這樣后續(xù)獲得的注冊(cè)中心就會(huì)是基于 zk 的實(shí)現(xiàn)
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
- 把 url 轉(zhuǎn)化為對(duì)應(yīng)配置的注冊(cè)中心的具體協(xié)議
- 根據(jù)具體協(xié)議琅翻,從 registryFactory 中獲得指定的注冊(cè)中心實(shí)現(xiàn),那么這個(gè) registryFactory具體是怎么賦值的呢柑贞?
在 RegistryProtocol 中存在一段這樣的代碼方椎,很明顯這是通過依賴注入來實(shí)現(xiàn)的擴(kuò)展點(diǎn)。
private RegistryFactory registryFactory;
public void setRegistryFactory(RegistryFactory registryFactory) {
this.registryFactory = registryFactory;
}
按照擴(kuò)展點(diǎn)的加載規(guī)則钧嘶,我們可以先看看各個(gè)/META-INF/dubbo/internal 路徑下找到 RegistryFactory 的配置文件.這個(gè) factory 有多個(gè)擴(kuò)展點(diǎn)的實(shí)現(xiàn)棠众。
dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactory
multicast=org.apache.dubbo.registry.multicast.MulticastRegistryFactory
zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory
redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory
etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory
接著,找到 RegistryFactory 的實(shí)現(xiàn), 發(fā)現(xiàn)它里面有一個(gè)自適應(yīng)的方法康辑,根據(jù) url 中 protocol 傳入的值進(jìn)行適配
@SPI("dubbo")
public interface RegistryFactory {
/**
* Connect to the registry
* <p>
* Connecting the registry needs to support the contract: <br>
* 1. When the check=false is set, the connection is not checked, otherwise the exception is thrown when disconnection <br>
* 2. Support username:password authority authentication on URL.<br>
* 3. Support the backup=10.20.153.10 candidate registry cluster address.<br>
* 4. Support file=registry.cache local disk file cache.<br>
* 5. Support the timeout=1000 request timeout setting.<br>
* 6. Support session=60000 session timeout or expiration settings.<br>
*
* @param url Registry address, is not allowed to be empty
* @return Registry reference, never return empty value
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
RegistryFactory$Adaptive
public class RegistryFactory$Adaptive implements org.apache.dubbo.registry.RegistryFactory {
public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg0;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys ([protocol])");
org.apache.dubbo.registry.RegistryFactory extension = (org.apache.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);
return extension.getRegistry(arg0);
}
}
由于在前面的代碼中摄欲,url 中的 protocol 已經(jīng)改成了 zookeeper,那么這個(gè)時(shí)候根據(jù) zookeeper 獲得的 spi 擴(kuò)展點(diǎn)應(yīng)該是ZookeeperRegistryFactory疮薇,然后直接去找ZookeeperRegistryFactory.getRegistry
ZookeeperRegistryFactory
這個(gè)方法中并沒有 getRegistry 方法胸墙,而是在父類 AbstractRegistryFactory中找到getRegistry方法
public abstract class AbstractRegistryFactory implements RegistryFactory {
@Override
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
//創(chuàng)建注冊(cè)中心
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
protected abstract Registry createRegistry(URL url);
}
邏輯很簡(jiǎn)單,
1.從緩存 REGISTRIES 中按咒,根據(jù) key 獲得對(duì)應(yīng)的 Registry
2.如果不存在迟隅,則創(chuàng)建 Registry
createRegistry
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
/**
* Invisible injection of zookeeper client via IOC/SPI
* @param zookeeperTransporter
*/
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
創(chuàng)建一個(gè) zookeeperRegistry但骨,把 url 和 zookeepertransporter 作為參數(shù)傳入。
zookeeperTransporter 這個(gè)屬性也是基于依賴注入來賦值的智袭,具體的流程就不再分析了奔缠,這個(gè)的值應(yīng)該是CuratorZookeeperTransporter 表示具體使用什么框架來和 zk 產(chǎn)生連接
ZookeeperRegistry
這個(gè)方法中使用了 CuratorZookeeperTransport 來實(shí)現(xiàn) zk 的連接
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
//獲得 group 名稱
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
//產(chǎn)生一個(gè) zookeeper 連接
zkClient = zookeeperTransporter.connect(url);
//添加 zookeeper 狀態(tài)變化事件
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
再回到RegistryProtocol.export方法,接著往下分析
// 根據(jù) invoker 中的 url 獲取 Registry 實(shí)例: zookeeperRegistry
final Registry registry = getRegistry(originInvoker);
//獲取要注冊(cè)到注冊(cè)中心的 URL: dubbo://ip:port
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {{//是否配置了注冊(cè)中心吼野,如果是校哎, 則需要注冊(cè)
//注冊(cè)到注冊(cè)中心的 URL
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
繼續(xù)分析 register(registryUrl, registeredProviderUrl); 方法
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
registry.register(registedProviderUrl);
繼續(xù)往下分析,會(huì)調(diào)用 registry.register 去講 dubbo://的協(xié)議地址注冊(cè)到 zookeeper 上
這個(gè)方法會(huì)調(diào)用 FailbackRegistry 類中的 register. 為什么呢瞳步?因?yàn)?ZookeeperRegistry 這個(gè)類中并沒有 register 這個(gè)方法闷哆,但是他的父類 FailbackRegistry 中存在 register 方法,而這個(gè)類又重寫了 AbstractRegistry 類中的 register 方法单起。所以我們可以直接定位到 FailbackRegistry 這個(gè)類中的 register 方法中
FailbackRegistry.register
@Override
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
// 調(diào)用子類實(shí)現(xiàn)真正的服務(wù)注冊(cè)抱怔,把 url 注冊(cè)到 zk 上
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
// 如果開啟了啟動(dòng)時(shí)檢測(cè),則直接拋出異常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
// 將失敗的注冊(cè)請(qǐng)求記錄到失敗列表嘀倒,定時(shí)重試
addFailedRegistered(url);
}
}
1.FailbackRegistry屈留,從名字上來看,是一個(gè)失敗重試機(jī)制
2.調(diào)用父類的 register 方法测蘑,講當(dāng)前 url 添加到緩存集合中
調(diào)用 doRegister 方法灌危,這個(gè)方法很明顯,是一個(gè)抽象方法碳胳,會(huì)由 ZookeeperRegistry 子類實(shí)現(xiàn)乍狐。
ZookeeperRegistry.doRegister
最終調(diào)用 curator 的客戶端把服務(wù)地址注冊(cè)到 zk
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
至此服務(wù)注冊(cè)也分析完成。
——學(xué)自咕泡學(xué)院