dubbo發布服務的流程
1、具體的服務轉為invoker: ServiceConfig類通過ProxyFactory類的getInvoker方法,將服務提供類ref生成invoker。
2、Invoker轉換成Exporter:打開通信端口,接聽來自客戶端的申請。
具體解析
1、當Spring容器實例化bean完成,ServiceBean會執行onApplicationEvent方法,該方法調用ServiceConfig的export方法。
2、ServiceBean的父類ServiceConfig在初始化時,會率先完成protocol和proxyFactory的spi擴展:
// Adaptive Protocol extension, obtained once from the ExtensionLoader for Protocol.class.
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// Adaptive ProxyFactory extension, obtained once from the ExtensionLoader for ProxyFactory.class.
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
其中,protocol 是協議的擴展,proxyFactory 是代理擴展(用于生成invoker)。
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
/**
 * Adaptive {@code Protocol}: picks the concrete {@code Protocol} extension by the protocol name
 * carried in the URL (falling back to "dubbo") and delegates {@code refer}/{@code export} to it.
 * {@code destroy} and {@code getDefaultPort} are not adaptive and always throw.
 */
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {

    /**
     * Delegates {@code refer} to the Protocol extension named by the URL's protocol.
     *
     * @param arg0 the service type to refer
     * @param arg1 the URL; must not be null
     * @return the invoker produced by the selected extension
     * @throws IllegalArgumentException if {@code arg1} is null
     */
    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) {
            throw new IllegalArgumentException("url == null");
        }
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        }
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

    /**
     * Delegates {@code export} to the Protocol extension named by the invoker URL's protocol.
     *
     * @param arg0 the invoker to export; it and its URL must not be null
     * @return the exporter produced by the selected extension
     * @throws IllegalArgumentException if {@code arg0} or its URL is null
     */
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        }
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        // Pick the Protocol name from the URL; the default is "dubbo".
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        }
        // Look up the Protocol implementation by name.
        // Author's note: while obtaining it, the Protocol first gets dependency injection, then Wrapper
        // decoration, and the decorated Protocol is what is returned.
        // Author's note: the wrapping passes through ProtocolFilterWrapper, ProtocolListenerWrapper, RegistryProtocol.
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    /** Not an adaptive method; always throws {@link UnsupportedOperationException}. */
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    /** Not an adaptive method; always throws {@link UnsupportedOperationException}. */
    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
}
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
/**
 * Adaptive {@code ProxyFactory}: picks the concrete {@code ProxyFactory} extension by the URL's
 * "proxy" parameter (falling back to "javassist") and delegates to it.
 */
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {

    /**
     * Delegates {@code getInvoker} to the ProxyFactory extension named by the URL's "proxy" parameter.
     *
     * @param arg0 the object to wrap as an invoker
     * @param arg1 the service type
     * @param arg2 the URL; must not be null
     * @return the invoker produced by the selected extension
     * @throws IllegalArgumentException if {@code arg2} is null
     * @throws IllegalStateException if no extension name can be read from the URL
     */
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) {
            throw new IllegalArgumentException("url == null");
        }
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        }
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }

    /**
     * Delegates {@code getProxy} to the ProxyFactory extension named by the invoker URL's "proxy" parameter.
     *
     * @param arg0 the invoker to proxy; it and its URL must not be null
     * @return the proxy produced by the selected extension
     * @throws IllegalArgumentException if {@code arg0} or its URL is null
     * @throws IllegalStateException if no extension name can be read from the URL
     */
    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        }
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        }
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }
}
關鍵點
// Convert ref into an Invoker via proxyFactory; the URL passed is registryURL with the full service URL added under Constants.EXPORT_KEY
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// Convert the Invoker into an Exporter
Exporter<?> exporter = protocol.export(invoker);
ref轉為invoker 過程
/**
 * Delegates {@code getInvoker} to the ProxyFactory extension named by the URL's "proxy" parameter,
 * falling back to "javassist" when the parameter is absent.
 *
 * @param arg0 the object to wrap as an invoker
 * @param arg1 the service type
 * @param arg2 the URL; must not be null
 * @return the invoker produced by the selected extension
 * @throws IllegalArgumentException if {@code arg2} is null
 * @throws IllegalStateException if no extension name can be read from the URL
 */
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
    if (arg2 == null) {
        throw new IllegalArgumentException("url == null");
    }
    com.alibaba.dubbo.common.URL url = arg2;
    String extName = url.getParameter("proxy", "javassist");
    if (extName == null) {
        throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
    }
    com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
    return extension.getInvoker(arg0, arg1, arg2);
}
dubbo默認情況下用javassist動態代理方式,將ref轉為invoker。
invoker的定義:
/**
 * An invocable handle for a service of type {@code T}; extends {@code Node}.
 */
public interface Invoker<T> extends Node {
/**
* Gets the service interface.
*
* @return the service interface class
*/
Class<T> getInterface();
/**
* Performs an invocation.
*
* @param invocation the invocation to perform
* @return the result of the invocation
* @throws RpcException declared by the contract; the failure conditions are not visible in this excerpt
*/
Result invoke(Invocation invocation) throws RpcException;
}
Invocation 包含了ref類的相關方法名,參數等。invoker可以根據這個invocation得到對應的結果值。????
/**
 * Builds an {@code Invoker} around {@code proxy} whose calls are dispatched through a {@code Wrapper}.
 *
 * @param proxy the object that method calls are ultimately dispatched to
 * @param type  the service interface class
 * @param url   the URL handed to the created invoker
 * @return an {@code AbstractProxyInvoker} whose {@code doInvoke} forwards to {@code Wrapper.invokeMethod}
 */
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // Wrap the concrete class, unless its name contains '$' -- then wrap the interface type instead.
    final Class<?> targetClass;
    if (proxy.getClass().getName().indexOf('$') >= 0) {
        targetClass = type;
    } else {
        targetClass = proxy.getClass();
    }
    final Wrapper methodDispatcher = Wrapper.getWrapper(targetClass);

    // The returned invoker forwards every call to the wrapper's invokeMethod.
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments)
                throws Throwable {
            return methodDispatcher.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
這里,就將服務類實例ref轉為了invoker。
invoker轉為exporter
// Hand the invoker to protocol.export, which returns the Exporter for it.
Exporter<?> exporter = protocol.export(invoker);
轉為exporter的過程中,主要做了兩個工作:打開指定的端口號,監聽來自客戶端的申請;向Zookeeper等注冊中心注冊、訂閱服務;
向注冊中心注冊,訂閱服務
// Obtain the Registry and the provider URL for originInvoker, then register that URL with the registry.
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// Subscribe to override data
// FIXME When a provider subscribes, this affects the scenario where the same JVM both exports and references the same service, because subscribed uses the service name as the cache key, so subscription information gets overwritten.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
// OverrideListener is an inner class of RegistryProtocol
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// Subscribe to override data
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
final Registry registry = getRegistry(originInvoker) 是根據originInvoker的url獲取注冊中心的地址,生成注冊中心的客戶端。其中 originInvoker的url是
registry://192.168.25.128:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-test-service&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.86.1%3A20880%2Fcn.andy.dubbo.DataService%3Fanyhost%3Dtrue%26application%3Ddubbo-test-service%26dispatcher%3Dall%26dubbo%3D2.5.3%26interface%3Dcn.andy.dubbo.DataService%26methods%3DdubboTest2%2CdubboTest%2CgetStringData%26mock%3Dtrue%26pid%3D47756%26retries%3D0%26service.filter%3DandyFilter%26side%3Dprovider%26threadpool%3Dfixed%26threads%3D100%26timeout%3D60000%26timestamp%3D1543287819642%26token%3D1234567&pid=47756&registry=zookeeper&timestamp=1543287819603
cn.andy.dubbo.impl.DataServiceImpl@4f071df8
com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory@4de41af9
interface cn.andy.dubbo.DataService
registry://192.168.25.128:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-test-service&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.86.1%3A20880%2Fcn.andy.dubbo.DataService%3Fanyhost%3Dtrue%26application%3Ddubbo-test-service%26dispatcher%3Dall%26dubbo%3D2.5.3%26interface%3Dcn.andy.dubbo.DataService%26methods%3DdubboTest2%2CdubboTest%2CgetStringData%26mock%3Dtrue%26pid%3D47756%26retries%3D0%26service.filter%3DandyFilter%26side%3Dprovider%26threadpool%3Dfixed%26threads%3D100%26timeout%3D60000%26timestamp%3D1543287819642%26token%3D1234567&pid=47756&registry=zookeeper&timestamp=1543287819603
registry.register(registedProviderUrl)是將自己(url)注冊到注冊中心。registedProviderUrl的地址是:
dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=true&pid=47756&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000&timestamp=1543287819642&token=1234567
而 overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl)的地址添加了category=configurators&check=false這兩項,添加了configurators(配置規則)的寫入,為后續的監聽做準備
provider://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&category=configurators&check=false&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=true&pid=47756&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000&timestamp=1543287819642&token=1234567
// Subscribe to override data
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
這里是指,服務發布之后,當我們通過監控中心或者治理中心或者直接通過代碼向注冊中心寫入配置規則時,注冊中心會通知dubbo,重新發布添加了配置規則的這個服務。
打開監聽服務
// Export originInvoker locally; doLocalExport returns the ExporterChangeableWrapper for it.
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
上面這句就是完成了invoker轉為exporter。
最終,會轉為調用DubboProtocol的export方法:
/**
 * Exports an invoker: records a {@code DubboExporter} under the service key, registers stub event
 * methods when stub events are enabled for a non-callback service, and opens the server for the URL.
 *
 * @param invoker the invoker to export
 * @return the exporter stored in {@code exporterMap}
 * @throws RpcException declared by the contract; not thrown directly in this body
 */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    final URL url = invoker.getUrl();

    // export service.
    final String serviceKey = serviceKey(url);
    final DubboExporter<T> dubboExporter = new DubboExporter<T>(invoker, serviceKey, exporterMap);
    exporterMap.put(serviceKey, dubboExporter);

    // export a stub service for dispatching events
    final boolean stubEventEnabled = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    final boolean callbackService = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (stubEventEnabled && !callbackService) {
        final String stubMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        final boolean hasStubMethods = stubMethods != null && stubMethods.length() != 0;
        if (hasStubMethods) {
            stubServiceMethodsMap.put(url.getServiceKey(), stubMethods);
        } else if (logger.isWarnEnabled()) {
            // Stub events were requested but no stub methods are configured: warn only.
            logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY)
                    + "], has set stubproxy support event ,but no stub methods founded."));
        }
    }

    openServer(url);
    return dubboExporter;
}
openServer(url)就是打開url所對應的netty的服務器端,進行監聽。
/**
 * Ensures a server exists for the URL's address: creates and caches one when none is cached,
 * otherwise resets the cached server with the URL. Does nothing when the URL's
 * {@code Constants.IS_SERVER_KEY} parameter is false.
 *
 * @param url the URL whose address identifies the server
 */
private void openServer(URL url) {
    // find server.
    final String address = url.getAddress();

    // A client may also export a service that only the server can invoke.
    if (!url.getParameter(Constants.IS_SERVER_KEY, true)) {
        return;
    }

    final ExchangeServer existing = serverMap.get(address);
    if (existing != null) {
        // The server supports reset; used together with the override feature.
        existing.reset(url);
        return;
    }
    serverMap.put(address, createServer(url));
}
/**
 * Binds a new {@code ExchangeServer} for the URL after filling in default parameters.
 *
 * @param url the URL describing the server to create
 * @return the bound server
 * @throws RpcException if the configured server or client transporter type is unsupported, or binding fails
 */
private ExchangeServer createServer(URL url) {
    // By default, send a readonly event when the server closes.
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // Heartbeat is enabled by default.
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    final String serverType = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    final boolean serverTypeGiven = serverType != null && serverType.length() > 0;
    if (serverTypeGiven && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(serverType)) {
        throw new RpcException("Unsupported server type: " + serverType + ", url: " + url);
    }

    final String codecName = Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME;
    url = url.addParameter(Constants.CODEC_KEY, codecName);

    final ExchangeServer boundServer;
    try {
        boundServer = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    // The client transporter type is validated only after the server has been bound.
    final String clientType = url.getParameter(Constants.CLIENT_KEY);
    if (clientType != null && clientType.length() > 0) {
        final Set<String> supported = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supported.contains(clientType)) {
            throw new RpcException("Unsupported client type: " + clientType);
        }
    }
    return boundServer;
}
其中, server = Exchangers.bind(url, requestHandler)里的requestHandler就包含了我們的invoker。
/**
 * Handler passed to {@code Exchangers.bind}. For an incoming {@code Invocation} it looks up the
 * invoker via {@code getInvoker(channel, inv)} and invokes it. On connect/disconnect it invokes the
 * method named by the channel URL's corresponding parameter, if one is configured.
 */
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    /**
     * Dispatches an {@code Invocation} to its invoker.
     *
     * @param channel the channel the message arrived on
     * @param message the received message; only {@code Invocation} is supported
     * @return the invocation result, or null when a callback invocation names a method absent from
     *         the invoker URL's "methods" parameter
     * @throws RemotingException if {@code message} is not an {@code Invocation} (including null)
     */
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // For a callback, handle a newer-version caller invoking an older-version interface.
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            return invoker.invoke(inv);
        }
        // The conditional is parenthesized: '+' binds tighter than '==', so without the parentheses the
        // null test compared the concatenated string, dropped the prefix, and threw NPE on a null message.
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    /** Routes {@code Invocation} messages to {@link #reply}; everything else goes to the superclass. */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            reply((ExchangeChannel) channel, message);
        } else {
            super.received(channel, message);
        }
    }

    /** Invokes the method configured under {@code Constants.ON_CONNECT_KEY}, if any. */
    @Override
    public void connected(Channel channel) throws RemotingException {
        invoke(channel, Constants.ON_CONNECT_KEY);
    }

    /** Logs the disconnect, then invokes the method configured under {@code Constants.ON_DISCONNECT_KEY}, if any. */
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        if (logger.isInfoEnabled()) {
            logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl());
        }
        invoke(channel, Constants.ON_DISCONNECT_KEY);
    }

    /** Builds the event invocation for {@code methodKey} and feeds it to {@link #received}; failures are logged, not propagated. */
    private void invoke(Channel channel, String methodKey) {
        Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
        if (invocation != null) {
            try {
                received(channel, invocation);
            } catch (Throwable t) {
                logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
            }
        }
    }

    /**
     * Creates a no-argument invocation for the method named by the URL parameter {@code methodKey}.
     *
     * @return the invocation, or null when the parameter is absent or empty
     */
    private Invocation createInvocation(Channel channel, URL url, String methodKey) {
        String method = url.getParameter(methodKey);
        if (method == null || method.length() == 0) {
            return null;
        }
        RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
        invocation.setAttachment(Constants.PATH_KEY, url.getPath());
        invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
        invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
        invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
        if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
            invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
        }
        return invocation;
    }
};
在received事件中調用reply方法,reply里有invoker.invoke(inv),里面有對方法的最終調用。