1.概述
RPC
作為分布式系統(tǒng)中不可或缺的中間件,在業(yè)界已經(jīng)具有相當(dāng)成熟的技術(shù)實現(xiàn)攒钳,其中Dubbo
應(yīng)用得特別廣泛,本文將對Dubbo
服務(wù)暴露的流程進行介紹。在正式進入Dubbo
原理探究之前中狂,需要先弄清楚RPC
的基本模型:
consumer
代表服務(wù)調(diào)用方,provider
代表服務(wù)提供方扑毡,registry
代表注冊中心吃型。當(dāng)服務(wù)提供方啟動時會將自己的信息(服務(wù)ip,port等)記錄在注冊中心僚楞,這樣在調(diào)用方調(diào)用的時候,會先從注冊中心獲取到提供方的基本信息枉层,然后發(fā)送網(wǎng)絡(luò)請求給provider
完成調(diào)用泉褐;同時consumer
在啟動的時候,會向注冊中心訂閱消息鸟蜡,這樣就能在provider
發(fā)生變更的時候獲取到最新的信息膜赃,保證請求路由到正確的provider
。
2.Dubbo
服務(wù)暴露概覽
Dubbo
服務(wù)暴露就是指provider
向registry
注冊的過程揉忘,以zookeeper作為注冊中心跳座,netty作為通訊框架,本文基于2.6.x
版本代碼分析泣矛。使用Dubbo
官方提供的demo運行疲眷,dubbo-demo-provider
配置文件如下:
<!-- provider's application name, used for tracing dependency relationship -->
<dubbo:application name="demo-provider"/>
<!-- use multicast registry center to export service -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- use dubbo protocol to export service on port 20880 -->
<dubbo:protocol name="dubbo" port="20880"/>
<!-- service implementation, as same as regular local bean -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<!-- declare the service interface to be exported -->
<dubbo:service proxy="jdk" interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
服務(wù)暴露的整個過程大致可以分為四個階段:
- 創(chuàng)建invoker對象并對其加工
- 開啟netty服務(wù),用于后續(xù)接收消費方發(fā)起的網(wǎng)絡(luò)請求
- 獲取zookeeper連接并將服務(wù)注冊成為zookeeper上的節(jié)點(服務(wù)引用的時候通過獲取節(jié)點則能夠找到provider然后建立網(wǎng)絡(luò)連接)
- 返回
Exporter
對象并放入ServiceConfig
中的exporters
變量中存儲
3.創(chuàng)建invoker對象并對其加工
在ServiceConfig
類中會先調(diào)用exportLocal
完成本地暴露您朽,然后調(diào)用proxyFactory.getInvoker
獲取invoker對象狂丝,先關(guān)注下該類中的靜態(tài)變量proxyFactory
,它會調(diào)用ExtensionLoader
中的getAdaptiveExtension
方法完成初始化,該類是Dubbo
框架自身實現(xiàn)的一種SPI
機制几颜,能夠根據(jù)配置文件靈活選擇接口的具體實現(xiàn)倍试。
package com.alibaba.dubbo.config;
/**
* ServiceConfig
*
* @export
*/
public class ServiceConfig<T> extends AbstractServiceConfig {
private static final long serialVersionUID = 3033787999037024738L;
/**根據(jù)dubbo spi機制加載**/
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
/**根據(jù)dubbo spi機制加載**/
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();
private Class<?> interfaceClass;
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
/**本地暴露**/
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
/**獲取invoker對象**/
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this)
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
}
}
proxyFactory
在初始化的時候會被賦值為ProxyFactory$Adaptive
,該類是通過字節(jié)碼增強實現(xiàn)的
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
/**獲取url中的proxy參數(shù)蛋哭,默認為javassist**/
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
/**根據(jù)proxy參數(shù)獲取對應(yīng)的ProxyFactory接口的實現(xiàn)**/
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
Dubbo
針對每一個需要根據(jù)配置決定具體實現(xiàn)的接口都會在運行時生成以$Adaptive
后綴結(jié)尾的類县习,目的是能夠根據(jù)配置文件,在運行時靈活地選擇接口的具體實現(xiàn)谆趾,例如:String extName = url.getParameter("proxy", "javassist")
這行代碼躁愿,如果url
中的proxy
屬性為空即沒有在配置文件中指定,則默認使用javassist
作為extName
的值棺妓,加載出JavassistProxyFactory
類作為ProxyFactory
接口的缺省實現(xiàn)攘已。因為這里配置<dubbo:service proxy="jdk" interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
,所以會加載JdkProxyFactory
怜跑。但通過debug發(fā)現(xiàn)样勃,com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName)
返回的是StubProxyFactoryWrapper
,原因是ProxyFactory
接口默認有一個包裝類實現(xiàn)性芬,無論是JavassistProxyFactory
還是JdkProxyFactory
都是在StubProxyFactoryWrapper
中通過構(gòu)造函數(shù)完成proxyFactory
屬性的設(shè)置峡眶,當(dāng)ProxyFactory$Adaptive
中執(zhí)行extension.getInvoker(arg0, arg1, arg2)
的時候,先會調(diào)用到StubProxyFactoryWrapper
的getInvoker
方法植锉,該方法的實現(xiàn)為proxyFactory.getInvoker(proxy, type, url)
辫樱,所以最后會調(diào)用到JdkProxyFactory
的getInvoker
方法。對于這種有包裝類實現(xiàn)的接口俊庇,$Adaptive
會優(yōu)先加載出包裝類狮暑,而根據(jù)配置所對應(yīng)的具體實現(xiàn)則是通過構(gòu)造函數(shù)的形式作為包裝類的屬性被注入,在調(diào)用的時候先調(diào)用包裝類從而間接調(diào)用到配置所對應(yīng)的實現(xiàn)辉饱,這里使用了裝飾器模式搬男,可以在調(diào)用之間增加額外的處理。整個流程可以歸納如下:
-
ProxyFactory$Adaptive
獲取extension
- 初始化
StubProxyFactoryWrapper
彭沼,構(gòu)造函數(shù)設(shè)置proxyFactory
屬性為JdkProxyFactory
public class StubProxyFactoryWrapper implements ProxyFactory {
private final ProxyFactory proxyFactory;
/**構(gòu)造函數(shù)中設(shè)置proxyFactory值為配置所對應(yīng)的接口實現(xiàn)**/
public StubProxyFactoryWrapper(ProxyFactory proxyFactory) {
this.proxyFactory = proxyFactory;
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
return proxyFactory.getInvoker(proxy, type, url);
}
}
-
ProxyFactory$Adaptive
調(diào)用extension.getInvoker
等同于StubProxyFactoryWrapper
調(diào)用getInvoker
對應(yīng)時序圖如下所示:
返回invoker
對象后缔逛,進入到protocol.export
方法,protocol
初始化為Protocol$Adaptive
姓惑,Protocol
接口有兩個包裝類ProtocolListenerWrapper
與ProtocolFilterWrapper
褐奴,顧名思義監(jiān)聽器與過濾器,該過程會先調(diào)用ProtocolListenerWrapper
于毙,該類的protocol
屬性在初始化的時候會被設(shè)置為ProtocolFilterWrapper
敦冬,在ServiceConfig
中調(diào)用protocol.export
時會直接進入到ProtocolListenerWrapper
中的protocol.export(invoker)
方法,接著進入ProtocolFilterWrapper
中的protocol.export(invoker)
方法唯沮,ProtocolFilterWrapper
初始化時會設(shè)置protocol
屬性為RegistryProtocol
匪补,因此該過程最終會調(diào)用到RegistryProtocol
的export
方法
/**
* ListenerProtocol
*/
public class ProtocolListenerWrapper implements Protocol {
private final Protocol protocol;
public ProtocolListenerWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
/**ServiceConfig調(diào)用protocol.export直接進入到該方法**/
return protocol.export(invoker);
}
/**RegistryProtocol調(diào)用protocol.export時候進入**/
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
}
根據(jù)ProtocolFilterWrapper
類的命名伞辛,可以看出它擁有與Spring Filter類似的功能,本質(zhì)上是一個過濾器
/**
* ListenerProtocol
*/
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
/**ServiceConfig調(diào)用protocol.export直接進入到該方法**/
return protocol.export(invoker);
}
/**RegistryProtocol調(diào)用protocol.export時候進入**/
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
}
因為設(shè)置<dubbo:protocol name="dubbo" port="20880"/>
夯缺,RegistryProtocol
中的protocol
會被設(shè)置為DubboRegistry
蚤氏,當(dāng)調(diào)用protocol.export(invokerDelegete)
的時候,會再次進入到ProtocolListenerWrapper
包裝類中踊兜,這次會執(zhí)行new ListenerExporterWrapper<T>
/**
* ListenerExporter
*/
public class ListenerExporterWrapper<T> implements Exporter<T> {
private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);
private final Exporter<T> exporter;
private final List<ExporterListener> listeners;
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
if (exporter == null) {
throw new IllegalArgumentException("exporter == null");
}
this.exporter = exporter;
this.listeners = listeners;
if (listeners != null && !listeners.isEmpty()) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
/**調(diào)用listener的exported方法竿滨,作為hook函數(shù)**/
listener.exported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
@Override
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
@Override
public void unexport() {
try {
exporter.unexport();
} finally {
if (listeners != null && !listeners.isEmpty()) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
listener.unexported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
}
重點關(guān)注ListenerExporterWrapper
的構(gòu)造函數(shù),該函數(shù)主要完成兩件事捏境,一是賦值exporter
變量于游,將外部傳入的Exporter
對象保存,二是遍歷外部傳入的ExporterListener
列表并調(diào)用其exported
方法垫言。此處為擴展點贰剥,Dubbo
對于ExporterListener
接口只給出了ExporterListenerAdapter
這一實現(xiàn)且exported
方法實現(xiàn)為空,開發(fā)者可以自己實現(xiàn)該方法以達到擴展的目的筷频,當(dāng)然也可以自己實現(xiàn)ExporterListener
接口蚌成,依據(jù)Dubbo SPI機制加載執(zhí)行自定義的業(yè)務(wù)邏輯。除構(gòu)造函數(shù)以外凛捏,unexport
也擁有類似的功能担忧,當(dāng)調(diào)用unexport
的時候會遍歷之前保存的listeners
并調(diào)用其unexport
方法。
繼續(xù)回到ProtocolFilterWrapper
坯癣,當(dāng)調(diào)用return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER))
時瓶盛,會觸發(fā)buildInvokerChain
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
/**包裝invoker對象**/
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
遍歷通過SPI加載出來的Filter
來包裝invoker
對象,當(dāng)invoker
調(diào)用invoke
方法時示罗,就會觸發(fā)Filter
中的實現(xiàn)邏輯惩猫,此處使用責(zé)任鏈設(shè)計模式,當(dāng)然開發(fā)者也可以實現(xiàn)自己的Filter
來對此處邏輯進行擴展蚜点。整個流程時序圖如下所示:
4.開啟netty服務(wù)
RegistryProtocol
中調(diào)用exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)
進入到DubboProtocol
類中的export
方法帆锋,
/**
* dubbo protocol support.
*/
public class DubboProtocol extends AbstractProtocol {
public static final String NAME = "dubbo";
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
/**獲取key,com.alibaba.dubbo.demo.DemoService:20880**/
String key = serviceKey(url);
/**創(chuàng)建exporter對象**/
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
/**保存exporter對象**/
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
/**創(chuàng)建netty server**/
openServer(url);
/**初始化序列化器**/
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// find server.
/**ip+port**/
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
/**從緩存中獲取server對象**/
ExchangeServer server = serverMap.get(key);
if (server == null) {
/**緩存中沒有則創(chuàng)建server**/
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
/**設(shè)置傳輸協(xié)議為dubbo**/
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
/**netty Server初始化**/
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
}
首先會創(chuàng)建DubboExporter
對象禽额,并把該對象保存到exporterMap
中,其中類:端口
會被作為key皮官,例如:com.alibaba.dubbo.demo.DemoService:20880脯倒,openServer
中是獲取netty Server的具體邏輯,serverMap
中會存放創(chuàng)建好的server
捺氢,key是ip:port
藻丢,如果在serverMap
中沒有存儲server
,則調(diào)用createServer
方法創(chuàng)建摄乒。
5.獲取zookeeper連接并將服務(wù)注冊成為zookeeper上的節(jié)點
完成Netty Server啟動后悠反,通過RegistryProtocol
中的getRegistry
方法創(chuàng)建ZookeeperRegistry
對象残黑,該對象的構(gòu)造函數(shù)會進行zookeeper的連接。
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
通過返回的ZookeeperRegistry
對象斋否,調(diào)用subscribe
方法便將服務(wù)注冊成為了zookeeper的節(jié)點梨水,/dubbo/com.alibaba.dubbo.demo.DemoService/providers
,值為dubbo://ip:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1644&proxy=jdk&side=provider×tamp=1603026176171
茵臭,里面包含服務(wù)的ip疫诽、端口、接口名旦委、接口中包含的方法奇徒、代理模式等等。
6.總結(jié)
整個服務(wù)暴露的過程就是服務(wù)向注冊中心注冊的過程缨硝,除了基本的實現(xiàn)以外摩钙,Dubbo
在該過程中還提供了Listener
和Filter
這兩個擴展點方便開發(fā)者進行定制化的實現(xiàn)。