前言
前面我們講到的服務(wù)暴露流程中,沒(méi)有涉及Protocol相關(guān)操作歹撒,本篇我們來(lái)看Protocol的實(shí)現(xiàn)。 本文將重點(diǎn)分析Protocol的核心功能秋柄,以及Protocol的設(shè)計(jì)與實(shí)現(xiàn)原理庸队。從Protocol接口開(kāi)始积蜻,逐個(gè)分析具體Protocol的實(shí)現(xiàn)闯割。Protocol是dubbo中協(xié)議的抽象,負(fù)責(zé)服務(wù)的暴露竿拆、引用宙拉,在dubbo整個(gè)框架設(shè)計(jì)中位于protocol層(遠(yuǎn)程調(diào)用層),在exchange層(信息交換層)之上(各層間依賴(lài)關(guān)系:protocol -> exchange -> transport -> serialize)如输。Protocol接口支持SPI擴(kuò)展鼓黔,默認(rèn)SPI實(shí)現(xiàn)是DubboProtocol,同時(shí)支持方法級(jí)SPI。接口核心方法包括export(服務(wù)暴露)不见、refer(服務(wù)引用)澳化、destroy(協(xié)議銷(xiāo)毀)。AbstractProtocol作為基類(lèi)實(shí)現(xiàn)稳吮,提供默認(rèn)destroy方法實(shí)現(xiàn)缎谷。方便理解起見(jiàn),我把Protocol的SPI擴(kuò)展實(shí)現(xiàn)分為兩種灶似,第一種直接實(shí)現(xiàn)Protocol接口列林,這里暫且稱(chēng)之為Protocol代理,包括QosProtocolWrapper酪惭、RegistryProtocol希痴、ProtocolFilterWrapper、ProtocolListenerWrapper春感;第二種繼承AbstractProtocol基類(lèi)砌创,具體實(shí)現(xiàn)export、refer方法功能鲫懒,最具代表性的是DubboProtocol嫩实,但是RedisProtocol、MemcachedProtocol窥岩、MockProtocol僅提供服務(wù)引用功能(僅實(shí)現(xiàn)refer接口)甲献,不支持服務(wù)暴露;另外颂翼,有一部分Protocol通過(guò)繼承AbstractProxyProtocol(當(dāng)然晃洒,AbstractProxyProtocol繼承了AbstractProtocol)實(shí)現(xiàn),AbstractProxyProtocol內(nèi)部引入ProxyFactory朦乏,借助Hessian锥累、Spring的httpInvoker、Spring的RMI集歇、apache的cxf(webService框架)等Rpc桶略、RMI、Service框架實(shí)現(xiàn)服務(wù)的暴露和引用。類(lèi)之間的繼承關(guān)系如下
一际歼、Protocol接口
上面提到惶翻,Protocol對(duì)外暴露的核心接口有export(服務(wù)暴露)、refer(服務(wù)引用)鹅心、destroy(協(xié)議銷(xiāo)毀)吕粗,dubbo默認(rèn)采用的協(xié)議是DubboProtocol,其中服務(wù)暴露和服務(wù)引用支持方法級(jí)SPI旭愧。核心方法介紹如下:
1.1颅筋、 服務(wù)暴露
服務(wù)暴露,即對(duì)外暴露可用服務(wù)输枯,其本質(zhì)是創(chuàng)建socket連接议泵,并綁定url,對(duì)外暴露服務(wù)端口
export方法有以下幾點(diǎn)需要注意
1桃熄、收到請(qǐng)求后先口,會(huì)記錄請(qǐng)求源地址到 RpcContext,RpcContext.getContext().setRemoteAddress();
2瞳收、export方法要求冪等碉京,即對(duì)于同一個(gè)URL來(lái)說(shuō)調(diào)用一次與調(diào)用n次的結(jié)果一致;
3螟深、參數(shù)Invoker由框架傳遞谐宙,protocol無(wú)需關(guān)注。
1.2界弧、服務(wù)引用
refer方法有以下幾點(diǎn)需要注意
1卧惜、用戶調(diào)用refer方法返回的Invoker對(duì)象的invoke方法時(shí),protocol需要相應(yīng)的執(zhí)行Invoker的invoke方法夹纫;
2、由protocol負(fù)責(zé)refer方法返回的Invoker的具體實(shí)現(xiàn);
3设凹、當(dāng)URL中設(shè)置了check=false時(shí)舰讹,連接失敗時(shí)protocol應(yīng)該嘗試恢復(fù)而不是拋異常。
1.3 闪朱、協(xié)議銷(xiāo)毀
destroy方法有以下幾點(diǎn)需要注意
1月匣、銷(xiāo)毀方法需要取消當(dāng)前協(xié)議暴露和引用的所有服務(wù);
2、釋放所有占用的資源奋姿,比如連接锄开、端口等;
3称诗、當(dāng)被銷(xiāo)毀之后萍悴,protocol可以繼續(xù)暴露或者引用新服務(wù)。
一、Protocol代理
Protocol的代理實(shí)現(xiàn)主要有QosProtocolWrapper癣诱、ProtocolFilterWrapper计维、ProtocolListenerWrapper、RegistryProtocol撕予。其中OosProtocolWrapper提供服務(wù)的Qos鲫惶,即為dubbo服務(wù)提供Qos保障(Qos 指利用各種基礎(chǔ)技術(shù),為指定的網(wǎng)絡(luò)通信提供更好的服務(wù)能力实抡,是網(wǎng)絡(luò)的一種安全機(jī)制欠母, 用來(lái)解決網(wǎng)絡(luò)延遲和阻塞等問(wèn)題,通常Oos的關(guān)鍵指標(biāo)包括可用性吆寨、吞吐量赏淌、時(shí)延、時(shí)延變化(包括抖動(dòng)和漂移)和丟失)鸟废。嚴(yán)格來(lái)講猜敢,RegistryProtocol并非Protocol的代理實(shí)現(xiàn)(dubbo中對(duì)代理類(lèi)的定義是,構(gòu)造方法有且僅有一個(gè)參數(shù)盒延,即被代理類(lèi)對(duì)象)缩擂,但RegistryProtocol的服務(wù)暴露、引用均由內(nèi)部的protocol引用實(shí)現(xiàn)添寺,所以將其歸類(lèi)到代理胯盯,一并進(jìn)行解析。
1计露、QosProtocolWrapper
QosProtocolWrapper是Protocol的代理實(shí)現(xiàn)博脑,提供服務(wù)的Qos保障,需要注意的是票罐,dubbo默認(rèn)開(kāi)啟Qos叉趣,即默認(rèn)qos.enable=true;另外该押,只有協(xié)議采用registry時(shí)疗杉,才會(huì)啟動(dòng)Qos;Qos的實(shí)現(xiàn)借助Netty蚕礼,打開(kāi)本地端口(*默認(rèn)端口22222*)烟具,創(chuàng)建并啟動(dòng)Server。支持telnet使用奠蹬,格式:telnet ip port朝聋。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// protocol使用registry時(shí),才會(huì)啟動(dòng)Qos服務(wù)囤躁,否則直接使用代理的protocol實(shí)現(xiàn)服務(wù)暴露
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// protocol使用registry時(shí)冀痕,才會(huì)啟動(dòng)Qos荔睹,復(fù)雜直接使用代理的protocol實(shí)現(xiàn)服務(wù)引用
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
startQosServer(url);
return protocol.refer(type, url);
}
return protocol.refer(type, url);
}
來(lái)看Qos服務(wù)啟動(dòng)的邏輯,核心邏輯是創(chuàng)建Server并啟動(dòng)(這里的Server是個(gè)單獨(dú)的工具類(lèi)),比較簡(jiǎn)單金度,直接來(lái)看Server的啟動(dòng)邏輯
public void start() throws Throwable {
if (!started.compareAndSet(false, true)) {
return;
}
boss = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-boss", true));
worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
// 創(chuàng)建NettyServer綁定端口
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
}
});
try {
serverBootstrap.bind(port).sync();
logger.info("qos-server bind localhost:" + port);
} catch (Throwable throwable) {
logger.error("qos-server can not bind localhost:" + port, throwable);
throw throwable;
}
}
2应媚、ProtocolFilterWrapper
ProtocolFilterWrapper的核心邏輯就是buildInvokerChain,用于構(gòu)建invoker的調(diào)用鏈猜极,前面介紹buildInvokerChain方法時(shí)中姜,做過(guò)相關(guān)介紹,這里不過(guò)多解析跟伏。
3丢胚、ProtocolListenerWrapper
ProtocolListenerWrapper同樣是Protocol的代理實(shí)現(xiàn),借助ExporterListener(支持SPI擴(kuò)展,無(wú)實(shí)質(zhì)邏輯)受扳、InvokerListener(支持SPI擴(kuò)展携龟,無(wú)實(shí)質(zhì)邏輯),用于提供服務(wù)暴露勘高、引用實(shí)例的代理峡蟋,只有protocol非registry時(shí)才會(huì)返回實(shí)例代理。僅需關(guān)注核心方法export华望、refer蕊蝗,邏輯也比較簡(jiǎn)單
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 協(xié)議值為registry時(shí),直接執(zhí)行服務(wù)暴露,否則返回服務(wù)暴露后的代理實(shí)例
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 協(xié)議值為registry時(shí)赖舟,直接執(zhí)行服務(wù)的引用,否則返回的代理實(shí)例
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
上面的代碼中蓬戚,ListenerExporterWrapper與ListenerInvokerWrapper的邏輯比較簡(jiǎn)單,構(gòu)造方法中分別執(zhí)行ExporterListener.export宾抓、InvokerListener.invoke方法的代理邏輯子漩。
4、RegistryProtocol
RegistryProtocol石洗,即注冊(cè)中心協(xié)議實(shí)現(xiàn)幢泼,服務(wù)暴露多了創(chuàng)建注冊(cè)中心和服務(wù)注冊(cè)邏輯。當(dāng)URL中protocol=registry時(shí)讲衫,采用該協(xié)議缕棵。分析RegisgtryProtocol過(guò)程中,Registry的實(shí)現(xiàn)我們以全部以Registry的默認(rèn)SPI實(shí)現(xiàn)即DubboRegistry為例(后續(xù)會(huì)對(duì)Registry專(zhuān)門(mén)開(kāi)篇解析)焦人。RegistryProtocol的服務(wù)暴露包括:Url構(gòu)建、服務(wù)本地暴露重父、注冊(cè)中心構(gòu)建花椭、服務(wù)注冊(cè)幾個(gè)步驟
4.1 服務(wù)暴露
1、構(gòu)建URL
第一步房午,構(gòu)建registryUrl矿辽、providerUrl、overrideSubscribeUrl,以當(dāng)前Invoker中的URL(暫且叫做originUrl)為參照袋倔,按照原型模式創(chuàng)建雕蔽。registryUrl,即將originUrl中procotol值替換為registry值宾娜,從下面的示例可以清楚看出URL構(gòu)建的過(guò)程及結(jié)果批狐。
// 當(dāng)前Invoker中url值
originUrl = "registry://127.0.0.1:9090?export=dubbo://127.0.0.1:9453/org.apache.dubbo.registry.protocol.DemoService:1.0.0?notify=true&methods=test1,test2&side=con&side=consumer"
// registryUrl,即將protocol值由registry變更為dubbo(默認(rèn)注冊(cè)中心實(shí)現(xiàn)DubboRegistry)
registryUrl = "dubbo://127.0.0.1:9090?export=dubbo://127.0.0.1:9453/org.apache.dubbo.registry.protocol.DemoService:1.0.0?notify=true&methods=test1,test2&side=con&side=consumer"
// providerUrl,即originUrl中export值
providerUrl = "dubbo://127.0.0.1:9453/org.apache.dubbo.registry.protocol.DemoService:1.0.0?methods=test1,test2¬ify=true&side=consumer"
// overrideSubscribeUrl,即將originUrl中protocol值替換為provider,
//并新增category參數(shù)前塔,值為configurators嚣艇,為后面providerUrl的參數(shù)融合做準(zhǔn)備
overrideSubscribeUrl = "provider://127.0.0.1:9453/org.apache.dubbo.registry.protocol.DemoService:1.0.0?category=configurators&check=false&methods=test1,test2¬ify=true&side=consumer"
完成相關(guān)URL構(gòu)建之后,進(jìn)行URL參數(shù)融合华弓,核心代碼如下食零,重點(diǎn)關(guān)注overrideUrlWithConfig方法。providerConfigurationListener寂屏、serviceConfigurationListener構(gòu)建過(guò)程中贰谣,會(huì)加載動(dòng)態(tài)配置(關(guān)于配置的解析詳見(jiàn)dubbo之配置(Configuration))到對(duì)應(yīng)listener的configurators,通過(guò)listener的overrideUrl方法將configurators配置融合到providerUrl參數(shù)
// 構(gòu)建信息融合listener
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 動(dòng)態(tài)配置信息融合到provider
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// 配置信息融合
private URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) {
// 應(yīng)用級(jí)配置信息融合到providerUrl
providerUrl = providerConfigurationListener.overrideUrl(providerUrl);
ServiceConfigurationListener serviceConfigurationListener = new ServiceConfigurationListener(providerUrl, listener);
// 服務(wù)級(jí)配置信息融合到providerUrl
serviceConfigurationListeners.put(providerUrl.getServiceKey(), serviceConfigurationListener);
return serviceConfigurationListener.overrideUrl(providerUrl);
}
有一點(diǎn)需要注意迁霎,ServiceConfigurationListener初始化過(guò)程中會(huì)從動(dòng)態(tài)配置拉取最新配置信息吱抚,然后執(zhí)行reExport(重新暴露),入口在process方法,邏輯如下:
protected final void initWith(String key) {
//動(dòng)態(tài)配置
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
// 如果是nopDynamicConfiguration欧引,那么這里什么都不會(huì)做
dynamicConfiguration.addListener(key, this);
String rawConfig = dynamicConfiguration.getConfig(key);
if (!StringUtils.isEmpty(rawConfig)) {
//直接根據(jù)字符串配置频伤,賦值configurators.并執(zhí)行notifyOverride模板方法
process(new ConfigChangeEvent(key, rawConfig));
}
}
configurators的初始化不再過(guò)多解析,重點(diǎn)關(guān)注process方法中調(diào)用的notifyOverrides模板方法芝此,以ProviderConfigurationListener為例憋肖,notifyOverrides實(shí)現(xiàn)如下:
@Override
protected void notifyOverrides() {
overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
}
繼續(xù)看方法內(nèi)部,執(zhí)行OverrideListener的doOverrideIfNecessary方法婚苹,邏輯比較簡(jiǎn)單岸更,對(duì)比配置覆蓋前后的url,若不一致膊升,則重新暴露
public synchronized void doOverrideIfNecessary() {
final Invoker<?> invoker;
// 原始invoker或者invoker代理
if (originInvoker instanceof InvokerDelegate) {
invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
} else {
invoker = originInvoker;
}
//The origin invoker怎炊,從Invoker中解析出原始URL(非注冊(cè)中心url)
URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<?> exporter = bounds.get(key);
if (exporter == null) {
logger.warn(new IllegalStateException("error state, exporter should not be null"));
return;
}
//The current, may have been merged many times
URL currentUrl = exporter.getInvoker().getUrl();
//Merged with this configuration,所有configurator參數(shù)合并到原始URL,生成新url
URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
.getConfigurators(), newUrl);
newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
// 如果當(dāng)前url與合并后的新url不一致廓译,那么评肆,重新暴露新的url
if (!currentUrl.equals(newUrl)) {
RegistryProtocol.this.reExport(originInvoker, newUrl);
logger.info("exported provider url changed, origin url: " + originUrl +
", old export url: " + currentUrl + ", new export url: " + newUrl);
}
}
重新暴露即reExport方法分為兩部分,服務(wù)重新暴露和服務(wù)重新注冊(cè)
public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
// update local exporter非区,本地exporter更新,檢查本地Exporter瓜挽,通過(guò)代理protocol重新暴露
ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl);
// update registry
URL registryUrl = getRegistryUrl(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(newInvokerUrl, registryUrl);
//decide if we need to re-publish
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.getProviderWrapper(registeredProviderUrl, originInvoker);
ProviderInvokerWrapper<T> newProviderInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
/**
* Only if the new url going to Registry is different with the previous one should we do unregister and register.
* 原始provider已注冊(cè),且原始providerUrl與新的providerUrl不一致征绸,執(zhí)行重新注冊(cè)操作(注銷(xiāo)久橙、重新注冊(cè))
*/
if (providerInvokerWrapper.isReg() && !registeredProviderUrl.equals(providerInvokerWrapper.getProviderUrl())) {
unregister(registryUrl, providerInvokerWrapper.getProviderUrl());
register(registryUrl, registeredProviderUrl);
newProviderInvokerWrapper.setReg(true);
}
//更新注冊(cè)的url
exporter.setRegisterUrl(registeredProviderUrl);
}
2俄占、服務(wù)本地暴露
服務(wù)本地暴露,先從緩存中取之前的暴露結(jié)果淆衷,取通過(guò)protocol引用執(zhí)行服務(wù)暴露缸榄。這里實(shí)際執(zhí)行的是RegistryProtocol中通過(guò)IOC注入的protocol(默認(rèn)是DubboProtocol),也就是說(shuō)實(shí)際執(zhí)行的是DubboProtocol的export祝拯,執(zhí)行完成之后甚带,包裝結(jié)果Invoker到ExporterChangeableWrapper,并返回鹿驼。來(lái)看代碼:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
//本地暴露欲低,即暴露invoker并緩存到bounds
final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
// 生成代理類(lèi)exporter
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
3、構(gòu)建注冊(cè)中心
注冊(cè)中心通過(guò)RegistryFactory構(gòu)建畜晰,簡(jiǎn)單介紹一下RegistryFactory砾莱,RegistryFactory接口支持SPI,默認(rèn)SPI實(shí)現(xiàn)是DubboRegistryFactory凄鼻,也就是說(shuō)除非URL中指定了registry值腊瑟,否則默認(rèn)采用DubboRegistry。我們以DubboRegistry為例來(lái)解析注冊(cè)中心的構(gòu)建過(guò)程:
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
getRegistry方法由RegistryFactory的基類(lèi)AbstractRegistryFactory實(shí)現(xiàn)块蚌,基類(lèi)內(nèi)部定義了模板方法createRegistry闰非,由子類(lèi)實(shí)現(xiàn)。直接來(lái)看DubboRegistryFactory的createRegistry邏輯峭范〔扑桑基類(lèi)方法getRegistry中,dubbo會(huì)將當(dāng)前registryUrl中的path參數(shù)替換成org.apache.dubbo.registry.RegistryService纱控,同時(shí)補(bǔ)全methods參數(shù)值辆毡,即RegistryService聲明的方法lookup,unsubscribe,subscribe,unregister,register。那么創(chuàng)建Registry時(shí)甜害,url參數(shù)值為:
dubbo://127.0.0.1:9090/org.apache.dubbo.registry.RegistryService?callbacks=10000&connect.timeout=10000&interface=org.apache.dubbo.registry.RegistryService&lazy=true&methods=lookup,unsubscribe,subscribe,unregister,register&reconnect=false&sticky=true&subscribe.1.callback=true&timeout=10000&unsubscribe.1.callback=false
接著舶掖,根據(jù)backup參數(shù)值,url被拆分為多個(gè)url存放于url列表尔店,列表中url除地址外眨攘,其他參數(shù)完全相同。然后嚣州,借助RegistryDirectory鲫售,完成注冊(cè)中心的創(chuàng)建
@Override
public Registry createRegistry(URL url) {
// 補(bǔ)全RegistryService相關(guān)參數(shù)
url = getRegistryURL(url);
List<URL> urls = new ArrayList<>();
urls.add(url.removeParameter(Constants.BACKUP_KEY));
String backup = url.getParameter(Constants.BACKUP_KEY);
// 根據(jù)backUp地址,拆分成多個(gè)url该肴,url列表作為RegistryDirectory的notify參數(shù)
if (backup != null && backup.length() > 0) {
String[] addresses = Constants.COMMA_SPLIT_PATTERN.split(backup);
for (String address : addresses) {
urls.add(url.setAddress(address));
}
}
// 創(chuàng)建RegistryDirectory
RegistryDirectory<RegistryService> directory = new RegistryDirectory<>(RegistryService.class, url.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(Constants.REFER_KEY, url.toParameterString()));
// join生成虛擬Invoke
Invoker<RegistryService> registryInvoker = cluster.join(directory);
// 創(chuàng)建RegistryService代理對(duì)象
RegistryService registryService = proxyFactory.getProxy(registryInvoker);
// 創(chuàng)建注冊(cè)中心
DubboRegistry registry = new DubboRegistry(registryInvoker, registryService);
directory.setRegistry(registry);
// 這里的protocol就是通過(guò)ExtensionLoader的spi注入的
directory.setProtocol(protocol);
directory.setRouterChain(RouterChain.buildChain(url));
// 刷新Directory內(nèi)invokers緩存
directory.notify(urls);
// 訂閱consumer數(shù)據(jù)情竹,刷新內(nèi)部configurators,調(diào)用registry代理的subscribe()
directory.subscribe(new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, RegistryService.class.getName(), url.getParameters()));
return registry;
}
4沙庐、服務(wù)注冊(cè)
注冊(cè)中心創(chuàng)建完成后鲤妥,執(zhí)行服務(wù)注冊(cè),邏輯比較簡(jiǎn)單拱雏,直接看代碼
// 注冊(cè)中心服務(wù)URL與服務(wù)提供者URL參數(shù)合并.
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// 服務(wù)提供者注冊(cè)到注冊(cè)表
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish,服務(wù)是否需要注冊(cè)發(fā)布棉安,默認(rèn)true
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 執(zhí)行服務(wù)注冊(cè),核心邏輯即執(zhí)行registry的register铸抑,以DubboRegistry為例贡耽,將服務(wù)url放入registered緩存,然后再執(zhí)行RegistryService代理對(duì)象的register方法鹊汛。
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
最后蒲赂,創(chuàng)建DestroyableExporter,并返回。
4.2 服務(wù)引用
服務(wù)引用過(guò)程包括創(chuàng)建注冊(cè)中心、創(chuàng)建注冊(cè)中心目錄(RegistryDirectory)撬呢、聚合Invoker幾部分
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)).removeParameter(REGISTRY_KEY);
Registry registry = registryFactory.getRegistry(url);
// type為RegistryService破加,則直接返回當(dāng)前Invoker代理
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*",多個(gè)group情況下,使用mergableCluster
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 真正的引用邏輯
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 創(chuàng)建RegistryDirectory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 這里為什么會(huì)有服務(wù)的注冊(cè)?看代碼日志贸人,是為了解決issue#3295,具體什么問(wèn)題已經(jīng)看不到了,屬實(shí)不理解
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
// cluster聚合走触,
Invoker invoker = cluster.join(directory);
// 注冊(cè)表注冊(cè)
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
這里有個(gè)問(wèn)題需要注意,在doRefer方法中有這么一段代碼疤苹,git log顯示是為了解決issue#3295做的修改互广,看不到3295具體是什么問(wèn)題,如果有知道的朋友卧土,勞煩告知惫皱,我及時(shí)修正說(shuō)明。
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
// 這里為什么會(huì)有服務(wù)的注冊(cè)?看代碼日志夸溶,是為了解決issue#3295逸吵,具體什么問(wèn)題已經(jīng)看不到了,屬實(shí)不理解
registry.register(directory.getRegisteredConsumerUrl());
}
4.3缝裁、協(xié)議銷(xiāo)毀
銷(xiāo)毀比較容易理解扫皱,注銷(xiāo)所有exporter、清理所有exporter緩存捷绑、移除所有配置監(jiān)聽(tīng)器韩脑。
@Override
public void destroy() {
// 注銷(xiāo)所有exporter
List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
for (Exporter<?> exporter : exporters) {
exporter.unexport();
}
// 清除所有exporter緩存
bounds.clear();
// 移除所有監(jiān)聽(tīng)器
DynamicConfiguration.getDynamicConfiguration()
.removeListener(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX, providerConfigurationListener);
}
二、Protocol實(shí)現(xiàn)
Protocol的直接實(shí)現(xiàn)類(lèi)包括常見(jiàn)的DubboProtocol粹污、HessianProtocol等段多。下面從AbstractProtocol開(kāi)始,逐個(gè)分析.
1壮吩、AbstractProtocol
AbstractProtocol基類(lèi)并沒(méi)有對(duì)Protocol接口做過(guò)多實(shí)現(xiàn)进苍,僅實(shí)現(xiàn)了destroy方法加缘,邏輯也比較簡(jiǎn)單,銷(xiāo)毀所有的Exporter和Invoker觉啊,具體代碼如下:
@Override
public void destroy() {
// 注意順序:先銷(xiāo)毀所有invoker拣宏,再銷(xiāo)毀所有exporter
for (Invoker<?> invoker : invokers) {
if (invoker != null) {
invokers.remove(invoker);
// 中間日志打印省略
invoker.destroy();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
for (String key : new ArrayList<String>(exporterMap.keySet())) {
Exporter<?> exporter = exporterMap.remove(key);
if (exporter != null) {
// 日志打印省略
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
}
2、DubboProtocol
DubboProtocol是Protocol的默認(rèn)SPI實(shí)現(xiàn)杠人,默認(rèn)端口20880勋乾;除了transporter依賴(lài)netty之外,其他所有邏輯都是dubbo自己實(shí)現(xiàn)嗡善。開(kāi)文提到辑莫,protocol位于exchange層之上(protocol直接依賴(lài)exchange),所以這里也會(huì)對(duì)exchange層的部分實(shí)現(xiàn)做相關(guān)分析罩引。下面各吨,我們?nèi)砸詄xport、refer袁铐、destroy方法為入口绅你,開(kāi)始解析DubboProtocol。
2.1 export(服務(wù)暴露)
先來(lái)看export方法昭躺,核心邏輯分為兩部分(stubService的邏輯處理比較簡(jiǎn)單)忌锯,創(chuàng)建Exporter并緩存、綁定并啟動(dòng)NettyServer(實(shí)際上是綁定ip端口领炫,建立socket連接)偶垮;exporter的創(chuàng)建和緩存比較簡(jiǎn)單:
// export service.創(chuàng)建exporter并緩存
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// 對(duì)stubService的處理省略
// 綁定server
openServer(url);
// 無(wú)實(shí)質(zhì)邏輯,可以忽略
optimizeSerialization(url);
重點(diǎn)關(guān)注openServer方法帝洪。首先似舵,dubbo會(huì)先從serverMap緩存里取server實(shí)例,取到則會(huì)執(zhí)行reset操作葱峡,具體邏輯在HeaderExchangeServer#reset方法砚哗;取不到則直接創(chuàng)建,核心邏輯在createServer方法砰奕。createServer方法內(nèi)部有幾個(gè)細(xì)節(jié)需要注意: 1蛛芥、server關(guān)閉發(fā)送只讀事件,開(kāi)關(guān)默認(rèn)開(kāi)啟军援;2仅淑、默認(rèn)心跳間隔60s;3胸哥、默認(rèn)采用NettyTransporter涯竟;4、編解碼器Codec,默認(rèn)采用DubboCodec庐船。createServer方法實(shí)際上通過(guò)Exchanger接口的bind方法實(shí)現(xiàn)银酬,以HeaderExchanger(Exchanger的默認(rèn)SPI實(shí)現(xiàn))為例,核心邏輯如下:
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
可以看到筐钟,bind方法又借助Transporter的bind實(shí)現(xiàn)捡硅,創(chuàng)建Server實(shí)例。以NettyTransporter(Transporter接口的默認(rèn)SPI實(shí)現(xiàn))為例盗棵,直接創(chuàng)建NettyServer:
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
Server實(shí)例化邏輯大部分由父類(lèi)AbstractServer構(gòu)造方法完成。另外北发,AbstractServer定義模板方法doOpen纹因,并在其構(gòu)造方法中調(diào)用,doOpen方法邏輯由子類(lèi)Server實(shí)現(xiàn)琳拨,也就說(shuō)在實(shí)例化NettyServer時(shí)會(huì)執(zhí)行NettyServer.doOpen的動(dòng)作瞭恰,具體代碼如下:
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
// 參數(shù)初始化階段
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = Constants.ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
// 執(zhí)行doOpen模板方法,由具體的Server實(shí)現(xiàn)
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method狱庇,初始化線程池
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
NettyServer的doOpen方法:
// 綁定ip與端口
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// boss惊畏、worker線程池
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind,最終調(diào)用netty的bootstrap.bind方法建立channel通道
channel = bootstrap.bind(getBindAddress());
}
到這里,createServer的動(dòng)作就完成了密任,其本質(zhì)是借助netty創(chuàng)建服務(wù)端socket連接颜启,并綁定ip和端口。再來(lái)看浪讳,若serverMap緩存中server已存在缰盏,則執(zhí)行reset方法。核心邏輯是淹遵,若當(dāng)前心跳值與url心跳值不一致或者當(dāng)前空閑超時(shí)時(shí)間與url的空閑超時(shí)時(shí)間不一致口猜,取消關(guān)閉任務(wù)(CloseTimerTask)并重啟空閑連接檢查任務(wù)(借助前面提到的HashedWheelTimer定時(shí)器實(shí)現(xiàn))。代碼如下:
@Override
public void reset(URL url) {
server.reset(url);
try {
int currHeartbeat = UrlUtils.getHeartbeat(getUrl());
int currIdleTimeout = UrlUtils.getIdleTimeout(getUrl());
int heartbeat = UrlUtils.getHeartbeat(url);
int idleTimeout = UrlUtils.getIdleTimeout(url);
// 當(dāng)前心跳值與url心跳值不一致或者當(dāng)前空閑超時(shí)時(shí)間與url的空閑超時(shí)時(shí)間不一致透揣,取消關(guān)閉任務(wù)并重啟空閑檢查任務(wù)济炎。
if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) {
cancelCloseTask();
startIdleCheckTask(url);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
2.2 refer(服務(wù)引用)
DubboProtocol中服務(wù)引用過(guò)程比較簡(jiǎn)單,大致分為連接Server辐真、創(chuàng)建client數(shù)組须尚,初始化Invoker,緩存至Invoker列表等幾個(gè)步驟侍咱,最后后返回創(chuàng)建的Invoker恨闪。
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// 無(wú)有效邏輯,可以忽略
optimizeSerialization(url);
//初始化Invoker,重點(diǎn)關(guān)注getClients
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
// 加入invokers列表緩存
invokers.add(invoker);
return invoker;
}
創(chuàng)建Client數(shù)據(jù)的過(guò)程比較有意思放坏,首先咙咽,dubbo默認(rèn)是不共享service連接的,即一個(gè)service一個(gè)連接淤年,我們按照共享和不共享連接兩種情況分析:
1钧敞、共享連接
a蜡豹、當(dāng)用戶沒(méi)有自定義連接數(shù)時(shí),dubbo會(huì)默認(rèn)共享連接溉苛,且連接數(shù)默認(rèn)為1镜廉;
b、構(gòu)建sharedClient列表愚战,此時(shí)會(huì)優(yōu)先從緩存map中取娇唯,若緩存的sharedClient列表內(nèi)client均可用則直接返回;若緩存結(jié)果為空寂玲,則初始化sharedClient列表并緩存到map塔插,返回sharedClient列表結(jié)果;若緩存結(jié)果非空拓哟,即緩存的sharedClient列表內(nèi)部分client不可用的情況想许,則會(huì)新建client替換列表中不可用的client。注意:這里創(chuàng)建的是ReferenceCountExchangeClient實(shí)例
c断序、將sharedClient列表內(nèi)client復(fù)制到client數(shù)組流纹,作為最終結(jié)果返回。
2违诗、不共享連接
直接執(zhí)行client數(shù)組的初始化(*initClient方法*)
核心代碼如下:
private ExchangeClient[] getClients(URL url) {
// 默認(rèn)不共享連接
boolean useShareConnect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// 若用戶沒(méi)有自定義連接數(shù)漱凝,則dubbo認(rèn)為共享連接,否則每個(gè)服務(wù)獨(dú)享一個(gè)連接
if (connections == 0) {
useShareConnect = true;
/**
* The xml configuration should have a higher priority than properties.
*/
String shareConnectionsStr = url.getParameter(Constants.SHARE_CONNECTIONS_KEY, (String) null);
// 共享連接數(shù)默認(rèn)是1
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(Constants.SHARE_CONNECTIONS_KEY,Constants.DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
// 構(gòu)建共享client列表
shareClients = getSharedClient(url, connections);
}
// clients.size >= 1
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
構(gòu)建共享client列表的邏輯在getSharedClient方法诸迟,篇幅原因碉哑,這里略去。在構(gòu)建client列表過(guò)程中亮蒋,都會(huì)涉及client的初始化扣典,核心邏輯在initClient。initClient方法代碼如下慎玖,其中屬性值配置部分略過(guò)(屬性值配置,同樣默認(rèn)采用NettyTransporter贮尖,默認(rèn)心跳間隔60s,編解碼默認(rèn)采用DubboCodec):
private ExchangeClient initClient(URL url) {
// 屬性值配置及參數(shù)校驗(yàn)省略
ExchangeClient client;
try {
// 延遲連接趁怔,即延遲到真正使用client時(shí)才會(huì)進(jìn)行初始化
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 重點(diǎn)關(guān)注湿硝,借助org.apache.dubbo.remoting.exchange.Exchanger的connect方法,創(chuàng)建client
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
與2.1中export方法類(lèi)似润努,初始化client方法借助Exchanger接口connect方法實(shí)現(xiàn)关斜,同樣以HeadExchanger為例,
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
可以看到铺浇,connect方法通過(guò)Transporter的connect方法實(shí)現(xiàn)痢畜,以NettyTransporter為例,直接實(shí)例化NettyClient并返回
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
NettyClient實(shí)例化過(guò)程與NettyServer類(lèi)似,也是借助抽象基類(lèi)(AbstractClient)構(gòu)造方法完成丁稀。抽象基類(lèi)AbstractClient定義了doOpen吼拥、doConnect、doClose线衫、doDisconnect等模板方法凿可,留給具體子類(lèi)實(shí)現(xiàn)。在其構(gòu)造方法中授账,除了部分屬性初始化邏輯之外枯跑,會(huì)依次執(zhí)行doOpen、connect白热。
// 篇幅原因敛助,方法內(nèi)異常處理、日志打印均省略
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// 是否需要重連棘捣,dubbo默認(rèn)無(wú)需重連
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
// 打開(kāi)Channel
doOpen();
// 執(zhí)行連接 connect.邏輯比較簡(jiǎn)單,內(nèi)部實(shí)際還是執(zhí)行具體子類(lèi)的doConnect方法
connect();
// 初始化消費(fèi)者端線程池,然后移除緩存中線程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
以NettyClient為例休建,來(lái)看doOpen與doConnect方法內(nèi)部究竟做了什么:
@Override
protected void doOpen() throws Throwable {
// 日志配置
NettyHelper.setNettyLoggerFactory();
// 初始化bootStrap
bootstrap = new ClientBootstrap(channelFactory);
// config乍恐,完善bootStrap參數(shù)配置
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getConnectTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
NettyClient的doOpen方法,僅僅是對(duì)bootStrap做了初始化和部分屬性的完善测砂,再來(lái)看doConnect方法:
// 省略異常處理與日志打印
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
// 通過(guò)netty建立連接茵烈,拿到NettyChannel
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
// 非中斷標(biāo)志
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// Close old channel,關(guān)閉老的channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
oldChannel.close();
} finally {
// 老的channel從緩存中刪除
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
// 若當(dāng)前通道處于關(guān)閉狀態(tài),那么新建的通道也保持關(guān)閉狀態(tài)
if (NettyClient.this.isClosed()) {
try {
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
// 原本通道處于開(kāi)啟狀態(tài)砌些,那么當(dāng)前通道直接替換為新建的channel
NettyClient.this.channel = newChannel;
}
}
} else if (future.getCause() != null) {
// 異常包裝后拋出
} finally {
// 連接失敗呜投,則取消future
if (!isConnected()) {
future.cancel();
}
}
}
doConnect借助doOpen初始化后的bootStrap創(chuàng)建新的netty通道。有個(gè)細(xì)節(jié)需要注意存璃,每次連接都會(huì)新建Channel仑荐,而且新建的Channel要與連接前的channel狀態(tài)保持一致。到這里為止纵东,采用HeaderExchangerClient粘招、NettyTransporter的DubboProtocol的服務(wù)引用流程結(jié)束。
2.3 destroy(協(xié)議銷(xiāo)毀)
destroy主要用作資源的回收偎球,比如server洒扎、client的關(guān)閉等,直接來(lái)看代碼:
public void destroy() {
// 關(guān)閉serverMap緩存內(nèi)所有server
for (String key : new ArrayList<>(serverMap.keySet())) {
ExchangeServer server = serverMap.remove(key);
if (server == null) {
continue;
}
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo server: " + server.getLocalAddress());
}
server.close(ConfigurationUtils.getServerShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
// 關(guān)閉clientMap緩存內(nèi)所有client
for (String key : new ArrayList<>(referenceClientMap.keySet())) {
List<ReferenceCountExchangeClient> clients = referenceClientMap.remove(key);
if (CollectionUtils.isEmpty(clients)) {
continue;
}
for (ReferenceCountExchangeClient client : clients) {
closeReferenceCountExchangeClient(client);
}
}
// stubService方法緩存清空
stubServiceMethodsMap.clear();
// 父類(lèi)destroy方法衰絮,參考AbstractProtocol的destroy方法
super.destroy();
}
3袍冷、InjvmProtocol
InjvmProtocol即Protocol本地協(xié)議實(shí)現(xiàn),僅支持服務(wù)本地暴露和引用猫牡,默認(rèn)端口0胡诗。InjvmProtocol對(duì)export、refer非常簡(jiǎn)單
// export方法,新建InjvmExporter乃戈,放入父類(lèi)的exporterMap
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
// 順便來(lái)看InjvmExporter的構(gòu)造方法
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
exporterMap.put(key, this);
}
// refer方法褂痰,新建InjvmInvoker,直接返回
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}
除此之外症虑,有兩個(gè)地方需要注意缩歪,InjvmProtocol對(duì)外提供單例實(shí)現(xiàn),線程安全由ExtensionLoader.getExtesion保證(內(nèi)部DCL鎖保證線程安全)代碼如下:
private static InjvmProtocol INSTANCE;
public InjvmProtocol() {
INSTANCE = this;
}
public static InjvmProtocol getInjvmProtocol() {
if (INSTANCE == null) {
//注意谍憔,這里如果直接new InjvmProtocol則是非線程安全的單例實(shí)現(xiàn)匪蝙;
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(InjvmProtocol.NAME); // load
}
return INSTANCE;
}
另一個(gè)需要注意的地方是對(duì)本地引用的判斷,如果url中的參數(shù)scope=local习贫,或者參數(shù)injvm=true會(huì)被判斷為本地引用逛球;另外,若本地有服務(wù)暴露苫昌,則也會(huì)通過(guò)本地引用颤绕,具體代碼如下:
public boolean isInjvmRefer(URL url) {
String scope = url.getParameter(Constants.SCOPE_KEY);
// 對(duì)于本地引用來(lái)說(shuō),scope = local 與 injvm = true是完全等價(jià)的
if (Constants.SCOPE_LOCAL.equals(scope) || (url.getParameter(Constants.LOCAL_PROTOCOL, false))) {
return true;
} else if (Constants.SCOPE_REMOTE.equals(scope)) {
// 遠(yuǎn)程引用
return false;
} else if (url.getParameter(Constants.GENERIC_KEY, false)) {
// 通用invocation非本地引用
return false;
} else if (getExporter(exporterMap, url) != null) {
// 默認(rèn)情況下祟身,如果有本地服務(wù)暴露奥务,則通過(guò)本地引用
return true;
} else {
return false;
}
}
4、RedisProtocol
RedisProtocol僅支持refer操作袜硫,協(xié)議默認(rèn)端口6379氯葬,內(nèi)部借助jedis實(shí)現(xiàn),將url中g(shù)et、set婉陷、delete參數(shù)值(默認(rèn)值分別為get帚称、set、delete)分別與會(huì)話域(Invocation)中的的方法名秽澳、參數(shù)(有且僅有一個(gè)參數(shù))匹配闯睹,若匹配成功,則返回一個(gè)Invoker實(shí)現(xiàn)(invoke方法內(nèi)部會(huì)執(zhí)行redis的get(或者set担神、delete方法瞻坝,并返回操作結(jié)果)。核心代碼如下:
// 其他參數(shù)設(shè)置代碼省略
final String get = url.getParameter("get", "get");
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
return new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
// get方法名取自u(píng)rl中的get參數(shù),該方法有且只能有一個(gè)參數(shù)
if (get.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 執(zhí)行redis get操作
byte[] value = jedis.get(String.valueOf(invocation.getArguments()[0]).getBytes());
if (value == null) {
return new RpcResult();
}
ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
return new RpcResult(oin.readObject());
} else if (set.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 2) {
throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes();
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutput value = getSerialization(url).serialize(url, output);
value.writeObject(invocation.getArguments()[1]);
// 執(zhí)行redis的set操作
jedis.set(key, output.toByteArray());
if (expiry > 1000) {
jedis.expire(key, expiry / 1000);
}
return new RpcResult();
} else if (delete.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 執(zhí)行redis的delete操作
jedis.del(String.valueOf(invocation.getArguments()[0]).getBytes());
return new RpcResult();
} else {
throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service.");
}
} catch (Throwable t) {
RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
re.setCode(RpcException.TIMEOUT_EXCEPTION);
} else if (t instanceof JedisConnectionException || t instanceof IOException) {
re.setCode(RpcException.NETWORK_EXCEPTION);
} else if (t instanceof JedisDataException) {
re.setCode(RpcException.SERIALIZATION_EXCEPTION);
}
throw re;
} finally {
if (jedis != null) {
try {
// 關(guān)閉redis
jedis.close();
} catch (Throwable t) {
logger.warn("returnResource error: " + t.getMessage(), t);
}
}
}
}
@Override
public void destroy() {
super.destroy();
try {
// 銷(xiāo)毀jedis連接池
jedisPool.destroy();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
5杏瞻、MemcachedProtocol
MemcachedProtocol同樣僅支持refer操作所刀,默認(rèn)端口 11211,內(nèi)部借助google的工具xmemcached實(shí)現(xiàn)(有興趣的同學(xué)可以了解xmemcached的使用捞挥,jar包版本如下),邏輯與RedisProcotol的refer方法類(lèi)似浮创,url中g(shù)et、set砌函、delete參數(shù)值(默認(rèn)值分別為get斩披、set溜族、delete)分別與會(huì)話域中的的方法名、參數(shù)(有且僅有一個(gè)參數(shù))匹配垦沉,若匹配上返回一個(gè)Invoker實(shí)現(xiàn)(Invoke方法會(huì)執(zhí)行memcache的get(或者set煌抒、delete方法),并返回操作結(jié)果)厕倍。
<dependency>
<groupId>com.googlecode.xmemcached</groupId>
<artifactId>xmemcached</artifactId>
<version>1.3.6</version>
</dependency>
核心代碼如下:
// 其他參數(shù)設(shè)置類(lèi)代碼省略
final int expiry = url.getParameter("expiry", 0);
final String get = url.getParameter("get", "get");
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
return new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
if (get.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The memcached get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 執(zhí)行memecache的get操作
return new RpcResult(memcachedClient.get(String.valueOf(invocation.getArguments()[0])));
} else if (set.equals(invocation.getMethodName())) {
// 參數(shù)長(zhǎng)度非法
if (invocation.getArguments().length != 2) {
throw new IllegalArgumentException("The memcached set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 執(zhí)行memcache的set操作
memcachedClient.set(String.valueOf(invocation.getArguments()[0]), expiry, invocation.getArguments()[1]);
return new RpcResult();
} else if (delete.equals(invocation.getMethodName())) {
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The memcached delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
// 執(zhí)行memcache的delete操作
memcachedClient.delete(String.valueOf(invocation.getArguments()[0]));
return new RpcResult();
} else {
throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in memcached service.");
}
} catch (Throwable t) {
RpcException re = new RpcException("Failed to invoke memcached service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
re.setCode(RpcException.TIMEOUT_EXCEPTION);
} else if (t instanceof MemcachedException || t instanceof IOException) {
re.setCode(RpcException.NETWORK_EXCEPTION);
}
throw re;
}
}
@Override
public void destroy() {
super.destroy();
try {
// memcache關(guān)閉操作
memcachedClient.shutdown();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
小結(jié)
上面介紹的幾種procotol直接繼承基類(lèi)AbstractProtocol寡壮,都比較容易理解。其中DubboProtocol比較復(fù)雜讹弯,完全由dubbo自己實(shí)現(xiàn)况既,僅在tansport層(網(wǎng)絡(luò)傳輸層)依賴(lài)了netty的網(wǎng)絡(luò)通信能力,相關(guān)的分析參考DubboProtocol章節(jié)组民。除了上面介紹的幾種Protocol棒仍,還有一類(lèi)protocol具備代理能力,即下面章節(jié)將要介紹的AbstractProxyProtocol臭胜。
6莫其、AbstractProxyProtocol
6.1、基礎(chǔ)方法實(shí)現(xiàn)
AbstractProxyProtocol在AbstractProtocol基類(lèi)基礎(chǔ)上引入ProxyFactory耸三,同樣實(shí)現(xiàn)了export乱陡、refer方法(內(nèi)部主要借助定義的doExport、doRefer模板方法實(shí)現(xiàn)吕晌,模板方法由具體子類(lèi)實(shí)現(xiàn))蛋褥。AbstractProxyProtocol的子類(lèi)實(shí)現(xiàn)有HessianProtocol临燃、HttpProtocol睛驳、RestProtocol、RmiProtocol膜廊、WebServiceProtocol乏沸。先來(lái)看export、refer方法:
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
final String uri = serviceKey(invoker.getUrl());
// 優(yōu)先從緩存取
Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
if (exporter != null) {
// exporter的url信息不一致時(shí)爪瓜,重新執(zhí)行暴露邏輯;url信息一致則直接返回
if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
return exporter;
}
}
// doExport方法由具體子類(lèi)實(shí)現(xiàn)蹬跃,這里會(huì)借助proxyFacotry生成代理實(shí)例
final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
// 匿名類(lèi)實(shí)現(xiàn),初始化exporter的Invoker铆铆,重寫(xiě)unExport方法
exporter = new AbstractExporter<T>(invoker) {
@Override
public void unexport() {
super.unexport();
exporterMap.remove(uri);
if (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
};
// exporter放入exporterMap緩存
exporterMap.put(uri, exporter);
return exporter;
}
再來(lái)看refer方法:
@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
// 借助proxyFactory生成Invoker代理實(shí)例蝶缀,內(nèi)部調(diào)用doRefer方法
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
// 具體代理Invoker對(duì)象執(zhí)行invoke邏輯
Result result = target.invoke(invocation);
// 有異常,則包裝為rpcException之后薄货,重新拋出
Throwable e = result.getException();
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
if (rpcException.isAssignableFrom(e.getClass())) {
throw getRpcException(type, url, invocation, e);
}
}
}
return result;
} catch (RpcException e) {
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
e.setCode(getErrorCode(e.getCause()));
}
throw e;
} catch (Throwable e) {
throw getRpcException(type, url, invocation, e);
}
}
};
// 保存到invokers緩存列表
invokers.add(invoker);
return invoker;
}
邏輯比較簡(jiǎn)單翁都,內(nèi)部核心的doExport、doRefer方法由具體子類(lèi)實(shí)現(xiàn)(ProxyFactory部分后面會(huì)開(kāi)篇進(jìn)行分析),來(lái)看這兩個(gè)模板方法的定義:
// 模板方法 doExport
protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;
// 模板方法 doRefer
protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;
6.2谅猾、核心輔助接口
了解完AbstractProxyProtocol內(nèi)部實(shí)現(xiàn)柄慰,介紹具體子類(lèi)實(shí)現(xiàn)之前鳍悠,先來(lái)看幾個(gè)輔助接口:HttpServer、HttpBinder坐搔、HttpHandler藏研。HttpBinder用于綁定HttpServer的具體執(zhí)行動(dòng)作(由HttpHandler實(shí)現(xiàn)),HttpServer則是所有HttpServer的抽象概行。其中蠢挡,HttpBinder支持SPI,默認(rèn)SPI實(shí)現(xiàn)是JettyHttpBinder占锯。
6.2.1袒哥、HttpServer
HttpServer接口繼承Resetable,即支持reset功能消略。除此之外堡称,提供基本的server關(guān)閉等功能。子類(lèi)通過(guò)繼承抽象基類(lèi)AbstractHttpServer艺演,實(shí)現(xiàn)類(lèi)包括JettyHttpServer却紧、ServletHttpServer、TomcatHttpServer胎撤,從名字可以看出晓殊,主要是借助Jetty、Tomcat巫俺、Servlet容器為dubbo的服務(wù)暴露提供支持。下面按照順序依次進(jìn)行分析,核心邏輯都在構(gòu)造方法叹卷,先來(lái)看JettyHttpServer。
6.2.1.1 JettyHttpServer
JettyHttpServer的構(gòu)造方法主要完成JettyServer的初始化和啟動(dòng):1、創(chuàng)建并初始化JettyServer墨技;2扣汪、啟動(dòng)JettyServer:
public JettyHttpServer(URL url, final HttpHandler handler) {
super(url, handler);
this.url = url;
// TODO we should leave this setting to slf4j
// we must disable the debug logging for production use
Log.setLog(new StdErrLog());
Log.getLog().setDebugEnabled(false);
// handler交給dispatcherServlet統(tǒng)一托管
DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), handler);
// 默認(rèn)線程數(shù)200,使用jetty的隊(duì)列線程池
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setDaemon(true);
threadPool.setMaxThreads(threads);
threadPool.setMinThreads(threads);
// 創(chuàng)建并初始化nettyServer,指定JettyServer線程池大小、connector赫段、以及Handler
server = new Server(threadPool);
ServerConnector connector = new ServerConnector(server);
String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
if (!url.isAnyHost() && NetUtils.isValidLocalHost(bindIp)) {
connector.setHost(bindIp);
}
connector.setPort(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()));
server.addConnector(connector);
// DispatcherServlet 托管給ServletHandler
ServletHandler servletHandler = new ServletHandler();
ServletHolder servletHolder = servletHandler.addServletWithMapping(DispatcherServlet.class, "/*");
servletHolder.setInitOrder(2);
// 設(shè)置JettyServer的handler 為ServletContextHandler
ServletContextHandler context = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
context.setServletHandler(servletHandler);
ServletManager.getInstance().addServletContext(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), context.getServletContext());
try {
// 啟動(dòng)nettyServer
server.start();
} catch (Exception e) {
throw new IllegalStateException("Failed to start jetty server on " + url.getParameter(Constants.BIND_IP_KEY) + ":" + url.getParameter(Constants.BIND_PORT_KEY) + ", cause: "
+ e.getMessage(), e);
}
}
這塊代碼看起來(lái)比較好理解撩银,但是有許多細(xì)節(jié)需要注意够庙,比如自定義的Handler邏輯如何執(zhí)行暮屡?什么時(shí)候執(zhí)行准夷?首先,代碼注釋中做了說(shuō)明结闸,自定義Handler會(huì)交給DispatcherServlet管理;然后,創(chuàng)建ServletHandler實(shí)例图甜,調(diào)用ServertHandler.addServletWithMapping方法將DispatcherServlet交給Jetty的ServeltHandler钦讳;再接著匪凡,創(chuàng)建ServletContextHandler,并將該ServletHandler傳遞給該實(shí)例,最后將JettyServer(父類(lèi)HandlerWrapper的屬性)的_handler引用指向創(chuàng)建的ServletContextHandler實(shí)例,自定義的Handler就完全托管給JettyServer了狮惜,JettyServer的啟動(dòng)流程如下:
DispatcherServlet.addHttpHandler
-> new QueueThreadPool()
-> 新建JettyServer實(shí)例筏餐,new Server()
-> 同步執(zhí)行ServletHandler.addServletWithMapping
-> new ServletContextHandler()當(dāng)前handler托管給JettyServer
-> 啟動(dòng)JettyServer惠呼,JettyServer.start
下一個(gè)問(wèn)題是handler的handle邏輯什么時(shí)候執(zhí)行呢,可以肯定的一點(diǎn)是,handle一定是通過(guò)DispatcherServlet的service方法來(lái)執(zhí)行,來(lái)看邏輯:
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// 根據(jù)端口獲取具體使用的handler
HttpHandler handler = handlers.get(request.getLocalPort());
if (handler == null) {// service not found.
response.sendError(HttpServletResponse.SC_NOT_FOUND, "Service not found.");
} else {
// 具體的handle邏輯在這里執(zhí)行
handler.handle(request, response);
}
}
這么簡(jiǎn)單嗎?當(dāng)然不是,下著來(lái)看樟遣,執(zhí)行流程如下:
JettyServer初始化過(guò)程中,初始化線程池
new QueuedThreadPool() -> _runnable = new Runnable()
-> jetty線程池異步調(diào)度執(zhí)行 -> runJob()-> ChannelEndPoint._runFillalbe -> FillInterest.fillable()
-> AbstractConnection.ReadCallback.succeeded() -> HttpConnection.onFillable()
-> HttpChannelOverHttp.handle()
-> Server.handle()(JettyServer構(gòu)造過(guò)程中脱篙,會(huì)把傳入的handler塞到Server中)
-> 執(zhí)行具體Server子類(lèi)的Handler的handle方法
詳細(xì)流程有興趣的話可以參考Jetty的QueueThreadPool實(shí)現(xiàn)适刀。
6.2.1.2 、TomcatHttpServer
與JettyHttpServer類(lèi)似吧彪,TomcatHttpServer為dubbo提供web容器能力崩侠,核心邏輯在構(gòu)造方法,同樣包括兩部分售淡,Tomcat容器初始化和容器的啟動(dòng),代碼如下:
public TomcatHttpServer(URL url, final HttpHandler handler) {
super(url, handler);
this.url = url;
// 同樣的揍堕,自定義handler托管給DispatcherServlet
DispatcherServlet.addHttpHandler(url.getPort(), handler);
String baseDir = new File(System.getProperty("java.io.tmpdir")).getAbsolutePath();
// tomcat屬性配置楞慈,與server.xml中配置項(xiàng)等同
tomcat = new Tomcat();
tomcat.setBaseDir(baseDir);
tomcat.setPort(url.getPort());
tomcat.getConnector().setProperty("maxThreads", String.valueOf(url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS)));
tomcat.getConnector().setProperty(
"maxConnections", String.valueOf(url.getParameter(Constants.ACCEPTS_KEY, -1)));
tomcat.getConnector().setProperty("URIEncoding", "UTF-8");
tomcat.getConnector().setProperty("connectionTimeout", "60000");
tomcat.getConnector().setProperty("maxKeepAliveRequests", "-1");
tomcat.getConnector().setProtocol("org.apache.coyote.http11.Http11NioProtocol");
// DisptatcherServlet交給Tomcat的context管理
Context context = tomcat.addContext("/", baseDir);
Tomcat.addServlet(context, "dispatcher", new DispatcherServlet());
context.addServletMapping("/*", "dispatcher");
ServletManager.getInstance().addServletContext(url.getPort(), context.getServletContext());
try {
// 啟動(dòng)tomcat容器
tomcat.start();
} catch (LifecycleException e) {
throw new IllegalStateException("Failed to start tomcat server at " + url.getAddress(), e);
}
}
同樣的,我們來(lái)看自定義的handler的執(zhí)行流程运杭,DispatcherServlet入口就不再介紹了,與JettyHttpServer一樣通過(guò)線程池異步執(zhí)行腕巡,直接來(lái)看Tomcat的調(diào)度流程,有興趣的同學(xué)可以自己研究下Tomcat的工作流程
線程池內(nèi)工作線程隊(duì)列,SocketProcessorBase.run() -> NioEndpoint$SocketProcessor.doRun() -> AbstractProtocol.process()
-> AbstractProcessorLight.process() -> Http11Processor.service()
-> CoyoteAdapter.service() -> StandardEngineValve.invoke()
-> ErrorReportValve.invoke() -> StandardHostValve.invoke()
-> AuthenticatorBase.invoke() -> StandardContextValve.invoke()
-> StandardWrapperValve.invoke() -> ApplicationFilterChain.doFilter()
-> ApplicationFilterChain.internalDoFilter() -> HttpServlet.service() -> DispatcherServlet.service()
-> 執(zhí)行具體的handle邏輯
6.2.1.3 表伦、ServletHttpServer
ServletHttpServer比較簡(jiǎn)單翔怎,直接使用HttpServlet作為web容器容握,代碼也比較簡(jiǎn)單,不做過(guò)多解析令花。
public ServletHttpServer(URL url, HttpHandler handler) {
super(url, handler);
DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, 8080), handler);
}
綜上趟章,HttpServer接口主要提供web容器北戏,借助HttpBinder將dubbo服務(wù)暴露URL與web容器綁定,由web容器統(tǒng)一管理消費(fèi)者請(qǐng)求搂赋。
6.2.2轰豆、HttpBinder
HttpBinder負(fù)責(zé)dubbo服務(wù)與web容器的綁定,接口支持SPI擴(kuò)展糙置,默認(rèn)實(shí)現(xiàn)是JettyHttpBinder亡容,即默認(rèn)使用Jetty作為web容器龟糕。當(dāng)然,可以通過(guò)URL參數(shù)指定容器類(lèi)型舶治,比如 &server=tomcat指定使用Tomcat坛悉。同時(shí)支持自定義Handler荡澎,用于web容器對(duì)綁定URL的處理衔彻。HttpBinder的邏輯非常簡(jiǎn)單祖搓,這里以JettyHttpBinder為例该贾,代碼如下:
@Override
public HttpServer bind(URL url, HttpHandler handler) {
// 綁定URL與web容器寇荧,同時(shí)指定handler
return new JettyHttpServer(url, handler);
}
6.2.3刃泌、HttpHandler
HttpHandler接口只有一個(gè)通用方法hanlde,在dubbo請(qǐng)求過(guò)程中,對(duì)請(qǐng)求做處理母截。
void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException;
HttpHandler主要實(shí)現(xiàn)類(lèi)有HessianProtocol.HessianHandler到忽、HttpProtocol.InternalHandler、WebServiceProtocol.WebServiceHandler清寇,分別位于對(duì)應(yīng)的Protocol實(shí)現(xiàn)中喘漏,下面從HessianHandler開(kāi)始,逐一解析颗管。HessianHandler核心邏輯是借助HessianSkeleton完成單次rpc請(qǐng)求陷遮,代碼如下:
private class HessianHandler implements HttpHandler {
// 借助HessianSkeleton,實(shí)現(xiàn)rpc請(qǐng)求
// 什么時(shí)候執(zhí)行該handler垦江?
// 對(duì)于jetty來(lái)說(shuō)帽馋,構(gòu)建QueuedThreadPool時(shí),會(huì)從自己的任務(wù)隊(duì)列取出任務(wù)比吭,調(diào)用自己的runJob方法绽族,執(zhí)行Runnable邏輯(實(shí)際執(zhí)行的是ChannelEndPoint的_runFillable的run方法)
// 大致流程: QueuedThreadPool -> QueuedThreadPool.runJob() -> ChannelEndPoint._runFillalbe -> FillInterest.fillable() -> AbstractConnection.ReadCallback.succeeded() -> HttpConnection.onFillable() -> HttpChannelOverHttp.handle() -> Server.handle()(JettyServer構(gòu)造過(guò)程中,會(huì)把傳入的handler塞到Server中) -> 具體Server子類(lèi)的Handler的handle方法
@Override
public void handle(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
String uri = request.getRequestURI();
HessianSkeleton skeleton = skeletonMap.get(uri);
//僅支持post方法衩藤,貌似2.7.1以上版本支持其他類(lèi)型請(qǐng)求
if (!request.getMethod().equalsIgnoreCase("POST")) {
response.setStatus(500);
} else {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
Enumeration<String> enumeration = request.getHeaderNames();
while (enumeration.hasMoreElements()) {
String key = enumeration.nextElement();
if (key.startsWith(Constants.DEFAULT_EXCHANGER)) {
RpcContext.getContext().setAttachment(key.substring(Constants.DEFAULT_EXCHANGER.length()),
request.getHeader(key));
}
}
try {
// 調(diào)用Hesssian的invoke方法
skeleton.invoke(request.getInputStream(), response.getOutputStream());
} catch (Throwable e) {
throw new ServletException(e);
}
}
}
}
再來(lái)看WebServiceHandler吧慢,借助apache的cxf,使用ServletController 完成rpc請(qǐng)求過(guò)程赏表,代碼如下:
private class WebServiceHandler implements HttpHandler {
private volatile ServletController servletController;
@Override
public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
if (servletController == null) {
HttpServlet httpServlet = DispatcherServlet.getInstance();
if (httpServlet == null) {
response.sendError(500, "No such DispatcherServlet instance.");
return;
}
synchronized (this) {
if (servletController == null) {
servletController = new ServletController(transportFactory.getRegistry(), httpServlet.getServletConfig(), httpServlet);
}
}
}
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
servletController.invoke(request, response);
}
}
HttpProtocol對(duì)應(yīng)的Handler實(shí)現(xiàn)即InternalHandler检诗,利用spring的httpinvoker包中的HttpInvokerServiceExporter實(shí)現(xiàn)對(duì)請(qǐng)求的處理匈仗。代碼如下
private class InternalHandler implements HttpHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
String uri = request.getRequestURI();
HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
// 同樣僅支持post請(qǐng)求
if (!request.getMethod().equalsIgnoreCase("POST")) {
response.setStatus(500);
} else {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
try {
skeleton.handleRequest(request, response);
} catch (Throwable e) {
throw new ServletException(e);
}
}
}
}
7、HessianProtocol
在介紹完AbstractProxyProtocol及相關(guān)輔助接口之后逢慌,我們來(lái)看HessianProtocol悠轩,重點(diǎn)關(guān)注doExport、doRefer方法,也是HessianProtocol服務(wù)暴露與服務(wù)引用的主要邏輯所在攻泼。Hessian的doExport核心邏輯火架,可以概括為兩部分,創(chuàng)建HttpServer和創(chuàng)建Exporter并返回忙菠。創(chuàng)建HttpServer的過(guò)程借助HttpBinder實(shí)現(xiàn)何鸡,即調(diào)用HttpBinder的bind方法生成對(duì)應(yīng)server,默認(rèn)的實(shí)現(xiàn)是JettyHttpServer牛欢。生成Exporter則更為簡(jiǎn)單骡男,只是創(chuàng)建了一個(gè)Runnable,用于實(shí)現(xiàn)Exporter的unExport方法氢惋,下面來(lái)看代碼:
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
String addr = getAddr(url);
// 先從server緩存取
HttpServer server = serverMap.get(addr);
if (server == null) {
// 取不到則新建JettyHttpServer洞翩,并啟動(dòng)
server = httpBinder.bind(url, new HessianHandler());
serverMap.put(addr, server);
}
final String path = url.getAbsolutePath();
final HessianSkeleton skeleton = new HessianSkeleton(impl, type);
skeletonMap.put(path, skeleton);
// 通用服務(wù)
final String genericPath = path + "/" + Constants.GENERIC_KEY;
skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class));
return new Runnable() {
@Override
public void run() {
skeletonMap.remove(path);
skeletonMap.remove(genericPath);
}
};
}
再來(lái)看doRefer(),核心邏輯同樣可以分為兩部分稽犁,創(chuàng)建HessianProxyFactory焰望、利用HessianProxyFactory生成接口的代理實(shí)現(xiàn)并返回
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
String generic = url.getParameter(Constants.GENERIC_KEY);
boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
if (isGeneric) {
RpcContext.getContext().setAttachment(Constants.GENERIC_KEY, generic);
url = url.setPath(url.getPath() + "/" + Constants.GENERIC_KEY);
}
HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
boolean isHessian2Request = url.getParameter(Constants.HESSIAN2_REQUEST_KEY, Constants.DEFAULT_HESSIAN2_REQUEST);
hessianProxyFactory.setHessian2Request(isHessian2Request);
boolean isOverloadEnabled = url.getParameter(Constants.HESSIAN_OVERLOAD_METHOD_KEY, Constants.DEFAULT_HESSIAN_OVERLOAD_METHOD);
hessianProxyFactory.setOverloadEnabled(isOverloadEnabled);
String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT);
// 客戶端連接方式,httpclient,默認(rèn)值jdk
if ("httpclient".equals(client)) {
HessianConnectionFactory factory = new HttpClientConnectionFactory();
factory.setHessianProxyFactory(hessianProxyFactory);
hessianProxyFactory.setConnectionFactory(factory);
} else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
// 非默認(rèn)值,則直接拋異常
throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
} else {
// 默認(rèn)采用Hessian連接
HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
factory.setHessianProxyFactory(hessianProxyFactory);
hessianProxyFactory.setConnectionFactory(factory);
}
int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
hessianProxyFactory.setConnectTimeout(timeout);
hessianProxyFactory.setReadTimeout(timeout);
// 利用hessianProxyFactory生成引用服務(wù)實(shí)例已亥,實(shí)際上是采用jdk的動(dòng)態(tài)代理熊赖,生成serviceType實(shí)例
// 前面部分完成HessianProxyFactory的各項(xiàng)初始化工作,執(zhí)行create操作時(shí)虑椎,方法內(nèi)部創(chuàng)建HessianProxy(實(shí)現(xiàn)JDK動(dòng)態(tài)代理的InvocationHandler接口)實(shí)例震鹉,HessianProxy的_factory引用了當(dāng)前的HessianProxyFactory對(duì)象,用于創(chuàng)建hessian連接時(shí)開(kāi)啟連接捆姜。整個(gè)hessian連接開(kāi)啟流程可以表示為:serviceType$Proxy.method -> HessianProxy.invoke -> HessianProxy.sendRequest -> HessianProxyFactory.getConnectionFactory -> HessianConnectionFactory.open -> AbstractHessianOutput.call(執(zhí)行被代理的方法,結(jié)果存放connection的outputStream) -> invoke方法調(diào)用結(jié)束
// 這里有個(gè)疑問(wèn)传趾,每次調(diào)用結(jié)束之后HessianConnection都會(huì)直接關(guān)閉,下次請(qǐng)求過(guò)來(lái)再重新開(kāi)啟泥技,如何保證性能浆兰?
return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
}
8、HttpProtocol
HttpProtocol借助Spring的httpinvoker以及HttpInvokerProxyFactoryBean實(shí)現(xiàn)服務(wù)的暴露和引用珊豹。服務(wù)暴露過(guò)程與HessianProtocol類(lèi)似簸呈,包括兩個(gè)步驟:創(chuàng)建Server、創(chuàng)建Exporter并返回店茶,其中Server的創(chuàng)建過(guò)程同樣借助前面提到的HttpBinder實(shí)現(xiàn)蜕便;服務(wù)引用則借助Spring的FactoryBean實(shí)現(xiàn),即HttpInvokerProxyFactoryBean贩幻,最終返回的引用實(shí)例是FactoryBean.getObject轿腺。代碼比較簡(jiǎn)單两嘴,這里就省略了。
9族壳、RestProtocol
RestProtocol的doExport溶诞、doRefer方法核心邏輯與HttpProtocol大同小異,不同之處在于决侈,1螺垢、dubbo為RestProtocol獨(dú)立抽象出一個(gè)RestServer接口,也就是說(shuō)在doExport過(guò)程中,創(chuàng)建的server是RestServer赖歌;2枉圃、服務(wù)引用過(guò)程借助于resteasy工具實(shí)現(xiàn),最終采用ResteasyWebTarget生成代理服務(wù)實(shí)例(內(nèi)部實(shí)際上還是JDK的動(dòng)態(tài)代理實(shí)現(xiàn))庐冯。這里我們?cè)敿?xì)介紹一下RestServer以及RestProtocol的服務(wù)引用的過(guò)程孽亲,先來(lái)看RestServer接口。
RestServer定義了REST服務(wù)的啟動(dòng)(start)展父、部署(deploy)返劲、解除部署(undeploy)、停止(stop)等方法栖茉,其中篮绿,啟動(dòng)、部署吕漂、解除部署由基類(lèi)BaseRestServer實(shí)現(xiàn)亲配,stop方法則由具體子類(lèi)實(shí)現(xiàn)。另外惶凝,BaseRestServer定義模板方法doStart吼虎、getDeployment,由子類(lèi)具體實(shí)現(xiàn)苍鲜。來(lái)看子類(lèi)DubboHttpServer(名字取得是不是容易讓人誤解)思灰、NettyServer。
9.1混滔、DubboHttpServer
重點(diǎn)關(guān)注DubboHttpServer的doStrart方法洒疚,核心邏輯是創(chuàng)建HttpServer,初始化dispatcher遍坟。借助了resteasy的HttpServletDispatcher和ResteasyDeployment
@Override
protected void doStart(URL url) {
// TODO jetty will by default enable keepAlive so the xml config has no effect now
// 這里也借助了dubbo抽象的HttpBinder
httpServer = httpBinder.bind(url, new RestHandler());
ServletContext servletContext = ServletManager.getInstance().getServletContext(url.getPort());
if (servletContext == null) {
servletContext = ServletManager.getInstance().getServletContext(ServletManager.EXTERNAL_SERVER_PORT);
}
if (servletContext == null) {
throw new RpcException("No servlet context found. If you are using server='servlet', " +
"make sure that you've configured " + BootstrapListener.class.getName() + " in web.xml");
}
servletContext.setAttribute(ResteasyDeployment.class.getName(), deployment);
try {
// 初始化dispatcher
dispatcher.init(new SimpleServletConfig(servletContext));
} catch (ServletException e) {
throw new RpcException(e);
}
}
9.2拳亿、NettyServer
NettyServer比較簡(jiǎn)單,核心邏輯再doStart方法愿伴,負(fù)責(zé)初始化server(*resteasy的NettyJaxrsServer實(shí)例*)肺魁,并啟動(dòng):
@Override
protected void doStart(URL url) {
String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
if (!url.isAnyHost() && NetUtils.isValidLocalHost(bindIp)) {
server.setHostname(bindIp);
}
// NettyJaxrsServer實(shí)例與ip地址、端口綁定隔节,核心參數(shù)初始化
server.setPort(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()));
Map<ChannelOption, Object> channelOption = new HashMap<ChannelOption, Object>();
channelOption.put(ChannelOption.SO_KEEPALIVE, url.getParameter(Constants.KEEP_ALIVE_KEY, Constants.DEFAULT_KEEP_ALIVE));
server.setChildChannelOptions(channelOption);
server.setExecutorThreadCount(url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS));
server.setIoWorkerCount(url.getParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
server.setMaxRequestSize(url.getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD));
// 啟動(dòng)serfer
server.start();
}
來(lái)看doRefer過(guò)程鹅经,核心邏輯是初始化ResteasyClient寂呛,并將client加入list緩存,然后瘾晃,構(gòu)建ResteasyWebTarget贷痪,再通過(guò)ResteasyWebTarget的代理方法生成引用實(shí)例的代理,并返回蹦误,這里省略代碼劫拢。
10、RmiProtocol
RmiProtocol的doExport邏輯强胰,通過(guò)spring的RmiServiceExporter實(shí)現(xiàn)
@Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
//初始化RmiServiceExporter舱沧,設(shè)置相關(guān)參數(shù)
final RmiServiceExporter rmiServiceExporter = new RmiServiceExporter();
rmiServiceExporter.setRegistryPort(url.getPort());
rmiServiceExporter.setServiceName(url.getPath());
rmiServiceExporter.setServiceInterface(type);
rmiServiceExporter.setService(impl);
try {
// spring擴(kuò)展邏輯
rmiServiceExporter.afterPropertiesSet();
} catch (RemoteException e) {
throw new RpcException(e.getMessage(), e);
}
return new Runnable() {
@Override
public void run() {
try {
rmiServiceExporter.destroy();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
}
doRefer方法則借助spring的RmiProxyFactoryBean實(shí)現(xiàn),兼容2.7.0以下版本邏輯偶洋;方法最終返回RmiProxyFactoryBean所代理的bean實(shí)例熟吏。
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
// 初始化RmiProxyFactoryBean
final RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean();
if (isRelease270OrHigher(url.getParameter(Constants.RELEASE_KEY))) {
rmiProxyFactoryBean.setRemoteInvocationFactory(RmiRemoteInvocation::new);
} else if (isRelease263OrHigher(url.getParameter(Constants.DUBBO_VERSION_KEY))) {
rmiProxyFactoryBean.setRemoteInvocationFactory(com.alibaba.dubbo.rpc.protocol.rmi.RmiRemoteInvocation::new);
}
rmiProxyFactoryBean.setServiceUrl(url.toIdentityString());
rmiProxyFactoryBean.setServiceInterface(serviceType);
rmiProxyFactoryBean.setCacheStub(true);
rmiProxyFactoryBean.setLookupStubOnStartup(true);
rmiProxyFactoryBean.setRefreshStubOnConnectFailure(true);
rmiProxyFactoryBean.afterPropertiesSet();
// 返回factoryBean代理對(duì)象實(shí)例
return (T) rmiProxyFactoryBean.getObject();
}
11、WebServiceProtocol
WebServiceProtocol的doExport方法玄窝,借助apache的cxf(webService框架)工具包牵寺,通過(guò)ServerFactoryBean完成服務(wù)與實(shí)現(xiàn)的綁定:
@Override
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
String addr = getAddr(url);
HttpServer httpServer = serverMap.get(addr);
// 借助HttpBinder創(chuàng)建HttpServer
if (httpServer == null) {
httpServer = httpBinder.bind(url, new WebServiceHandler());
serverMap.put(addr, httpServer);
}
final ServerFactoryBean serverFactoryBean = new ServerFactoryBean();
serverFactoryBean.setAddress(url.getAbsolutePath());
serverFactoryBean.setServiceClass(type);
serverFactoryBean.setServiceBean(impl);
serverFactoryBean.setBus(bus);
serverFactoryBean.setDestinationFactory(transportFactory);
serverFactoryBean.create();
// Exporter匿名類(lèi),內(nèi)部邏輯實(shí)現(xiàn)
return new Runnable() {
@Override
public void run() {
if(serverFactoryBean.getServer()!= null) {
serverFactoryBean.getServer().destroy();
}
if(serverFactoryBean.getBus()!=null) {
serverFactoryBean.getBus().shutdown(true);
}
}
};
}
doRefer方法也是借助apache的cxf(webService框架)恩脂,通過(guò)ClientProxyFactoryBean完成應(yīng)用服務(wù)的實(shí)例化帽氓,并返回該實(shí)例。
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
// 創(chuàng)建ClientProxyFactoryBean實(shí)例
ClientProxyFactoryBean proxyFactoryBean = new ClientProxyFactoryBean();
proxyFactoryBean.setAddress(url.setProtocol("http").toIdentityString());
proxyFactoryBean.setServiceClass(serviceType);
proxyFactoryBean.setBus(bus);
// 動(dòng)態(tài)代理創(chuàng)建服務(wù)引用實(shí)例
T ref = (T) proxyFactoryBean.create();
Client proxy = ClientProxy.getClient(ref);
HTTPConduit conduit = (HTTPConduit) proxy.getConduit();
HTTPClientPolicy policy = new HTTPClientPolicy();
policy.setConnectionTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
policy.setReceiveTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
conduit.setClient(policy);
return ref;
}
總結(jié)
Protocol在dubbo中的位置非常重要东亦,本文按照接口杏节、代理實(shí)現(xiàn)、直接實(shí)現(xiàn)三個(gè)部分對(duì)Protocol做了解析典阵。總的來(lái)講镊逝,Dubbo中Protocol的實(shí)現(xiàn)分為兩種壮啊,1、代理實(shí)現(xiàn)撑蒜,比如RegistryProtocol歹啼,并不直接實(shí)現(xiàn)Protocol,而是借助內(nèi)部引用實(shí)例完成服務(wù)暴露座菠、引用狸眼;2、直接實(shí)現(xiàn)浴滴,比如DubboProtocol拓萌,Server -> Exchanger -> Transporter -> 利用Netty建立socket連接,執(zhí)行具體的服務(wù)暴露升略、引用微王;直接實(shí)現(xiàn)中還有一類(lèi)屡限,即借助web容器比如Jetty、tomcat炕倘、servlet或者三方框架如apache的cxf實(shí)現(xiàn)server的創(chuàng)建和啟動(dòng),然后將dubbo服務(wù)URL钧大、端口與server綁定,完成服務(wù)的暴露罩旋。
注:源碼版本2.7.1啊央,歡迎指正。