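Continuing from the previous chapter: once the local export of the service has finished, the remote export begins. The remote export flow is similar to the local one: ref is converted into an invoker, and the invoker is then converted into an exporter. The code is as follows: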
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
As you can see, converting ref into an invoker works much like the conversion in exportLocal from the previous chapter: both call the getInvoker method of JavassistProxyFactory, which returns a new AbstractProxyInvoker object.
The resulting AbstractProxyInvoker is then wrapped in a DelegateProviderMetaDataInvoker.
Next, let's look at the step Exporter<?> exporter = protocol.export(wrapperInvoker).
Execution steps:
1. Export the previously created DelegateProviderMetaDataInvoker through protocol, converting the invoker into an exporter
2. Convert the invoker into an exporter --> start netty --> register the service with zookeeper --> subscribe --> return a new exporter instance
3. Put the exporter into the cache object
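The order of these steps can be sketched in a few lines of plain Java. This is only an illustration of the sequence, not Dubbo code: the class RemoteExportFlowSketch, its nested Invoker and Exporter interfaces, and the step labels are all hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: records the order of the export steps, nothing more.
class RemoteExportFlowSketch {
    interface Invoker { String url(); }
    interface Exporter { Invoker getInvoker(); }

    final List<String> steps = new ArrayList<>();        // order in which the steps ran
    final List<Exporter> exporters = new ArrayList<>();  // stands in for the exporter cache

    Exporter export(Invoker invoker) {
        steps.add("invoker -> exporter");
        steps.add("start server");
        steps.add("register with registry");
        steps.add("subscribe to overrides");
        Exporter exporter = () -> invoker;  // a new exporter instance on every call
        exporters.add(exporter);            // step 3: keep the exporter in the cache
        return exporter;
    }

    public static void main(String[] args) {
        RemoteExportFlowSketch flow = new RemoteExportFlowSketch();
        flow.export(() -> "dubbo://127.0.0.1:20880/DemoService");
        flow.steps.forEach(System.out::println);
    }
}
```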
The protocol here, like proxyFactory, is a class generated by javassist, "protocol$Adaptive". The relevant part of its export method is as follows:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());//registry
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
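The generated class does little more than pick an extension by the protocol part of the URL and delegate to it. Below is a minimal sketch of that idea, assuming a plain map in place of Dubbo's ExtensionLoader. AdaptiveDispatchSketch, protocolOf and the two lambda "extensions" are hypothetical stand-ins, not Dubbo APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the generated adaptive class; a map replaces ExtensionLoader.
class AdaptiveDispatchSketch {
    interface Protocol { String export(String url); }

    private static final Map<String, Protocol> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("registry", url -> "RegistryProtocol exported " + url);
        EXTENSIONS.put("dubbo", url -> "DubboProtocol exported " + url);
    }

    // Returns the scheme before "://", or null when the URL has none.
    static String protocolOf(String url) {
        int i = url.indexOf("://");
        return i < 0 ? null : url.substring(0, i);
    }

    static String export(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String name = protocolOf(url);
        if (name == null) {
            name = "dubbo"; // same fallback as the generated code
        }
        Protocol extension = EXTENSIONS.get(name);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + name);
        }
        return extension.export(url); // delegate to the selected extension
    }

    public static void main(String[] args) {
        System.out.println(export("registry://127.0.0.1:2181/RegistryService"));
        System.out.println(export("dubbo://127.0.0.1:20880/DemoService"));
    }
}
```

With a registry:// URL the lookup key is "registry", which is why the call ends up in the registry protocol rather than the dubbo protocol.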
From this code we can see that what is finally executed is RegistryProtocol.export(final Invoker<T> originInvoker):
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);// perform the actual export
URL registryUrl = getRegistryUrl(originInvoker);
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registedProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
register(registryUrl, registedProviderUrl);// register the url path with zookeeper
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
Let's focus on the doLocalExport method:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);// this step converts the invoker into an exporter
bounds.put(key, exporter);
}
}
}
return exporter;
}
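The caching in doLocalExport is a double-checked lookup: read the map without a lock, and only if the entry is missing take the lock, check again, and create it. A minimal sketch of that pattern follows, assuming the cache is a ConcurrentHashMap. DoubleCheckedCacheSketch, getOrExport and the creations counter are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of the check / lock / check-again pattern used by doLocalExport.
class DoubleCheckedCacheSketch {
    private final Map<String, Object> bounds = new ConcurrentHashMap<>();
    final AtomicInteger creations = new AtomicInteger(); // counts real exports, for the demo

    Object getOrExport(String key, Supplier<Object> exportAction) {
        Object exporter = bounds.get(key);          // first check, no lock taken
        if (exporter == null) {
            synchronized (bounds) {
                exporter = bounds.get(key);         // second check, under the lock
                if (exporter == null) {
                    exporter = exportAction.get();  // the expensive export runs once per key
                    creations.incrementAndGet();
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

    public static void main(String[] args) {
        DoubleCheckedCacheSketch cache = new DoubleCheckedCacheSketch();
        Object first = cache.getOrExport("DemoService:20880", Object::new);
        Object second = cache.getOrExport("DemoService:20880", Object::new);
        System.out.println("same instance: " + (first == second));
        System.out.println("exports performed: " + cache.creations.get());
    }
}
```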
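The doLocalExport method first checks whether a matching exporter already exists in the cache. If not, it exports through protocol and creates a new ExporterChangeableWrapper. Debugging shows that the protocol invoked here is DubboProtocol, whose export(Invoker<T> invoker) method mainly does two things:
1. Convert the invoker into an exporter
2. Start netty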
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();// the invoker argument is the wrapper object produced in ProtocolFilterWrapper
// export service.
String key = serviceKey(url);//key = com.alibaba.dubbo.demo.DemoService:20880
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);// todo whether stub events are supported
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);// todo whether this is a callback service
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
return exporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));// createServer builds a server from the url; the server is then cached in the map
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
/**
* Starts the server whose name is given by the server key; the default is netty
* @param url
* @return
*/
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));// heartbeat is on by default, using the default interval
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);// get the server name (netty by default)
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);// requestHandler is a field created by default when DubboProtocol is instantiated
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
/** Start netty */
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);// getExchanger(url) resolves HeaderExchanger from the url parameters, then its bind method is called
}
The ExchangeHandler passed in is:
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {// invoked later, when requests arrive
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
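The callback branch of reply only checks whether the invoked method name appears in the comma-separated methods parameter of the url. That check can be isolated as below. CallbackMethodCheckSketch and hasMethod are hypothetical names; the logic mirrors the branch in the handler above.

```java
// Hypothetical sketch of the "is this method declared on the callback interface" check.
class CallbackMethodCheckSketch {
    static boolean hasMethod(String methodsParam, String methodName) {
        // No comma: the parameter holds a single method name, or is missing entirely.
        if (methodsParam == null || methodsParam.indexOf(',') == -1) {
            return methodName.equals(methodsParam);
        }
        // Otherwise compare against each declared method name.
        for (String method : methodsParam.split(",")) {
            if (methodName.equals(method)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasMethod("sayHello,ping", "ping"));
        System.out.println(hasMethod("sayHello", "ping"));
    }
}
```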
The steps executed in HeaderExchanger are as follows:
1. Wrap the ExchangeHandler created in dubbo in a HeaderExchangeHandler (HeaderExchangeHandler re-wraps the connect, disconnect and other methods of ExchangeHandler), then wrap the HeaderExchangeHandler in a DecodeHandler (DecodeHandler provides a wrapped received method, which calls the internal decode method)
2. Call the bind method of Transporters, which ultimately uses NettyTransporter to create a NettyServer
3. Return the newly created HeaderExchangeServer
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
public class NettyTransporter implements Transporter {
public static final String NAME = "netty4";
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
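The nesting new DecodeHandler(new HeaderExchangeHandler(handler)) is a decorator chain: each layer does its own work and then hands the message to the layer it wraps. A minimal sketch of that structure is below. All class names are hypothetical, and the "decoding" is just a trim() standing in for the real work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a handler decorator chain; not the Dubbo handler classes.
class HandlerChainSketch {
    interface ChannelHandler { void received(String message, List<String> trace); }

    // Innermost handler: stands in for the protocol's request handler.
    static class BusinessHandler implements ChannelHandler {
        public void received(String message, List<String> trace) {
            trace.add("business:" + message);
        }
    }

    // Middle layer: stands in for request/response bookkeeping.
    static class ExchangeLayer implements ChannelHandler {
        private final ChannelHandler delegate;
        ExchangeLayer(ChannelHandler delegate) { this.delegate = delegate; }
        public void received(String message, List<String> trace) {
            trace.add("exchange");
            delegate.received(message, trace);
        }
    }

    // Outermost layer: "decodes" the message before passing it inward.
    static class DecodeLayer implements ChannelHandler {
        private final ChannelHandler delegate;
        DecodeLayer(ChannelHandler delegate) { this.delegate = delegate; }
        public void received(String message, List<String> trace) {
            trace.add("decode");
            delegate.received(message.trim(), trace);
        }
    }

    static List<String> dispatch(String raw) {
        ChannelHandler chain = new DecodeLayer(new ExchangeLayer(new BusinessHandler()));
        List<String> trace = new ArrayList<>();
        chain.received(raw, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(dispatch("  hello  "));
    }
}
```

The outermost wrapper sees the message first, so the order of construction is the reverse of the order of processing.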
Netty itself is not covered in detail here; interested readers can explore it on their own.
OK, once this has finished we return to the export method of RegistryProtocol. After doLocalExport(originInvoker) completes, final Registry registry = getRegistry(originInvoker); is executed to create the zkClient. The code is as follows:
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
/**
* Because the registry parameter in the url is zookeeper, what is actually called here is ZookeeperRegistryFactory
*/
return registryFactory.getRegistry(registryUrl);
}
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();// key = zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService (based on the demo)
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
registry = createRegistry(url);// create the registry, which connects to zookeeper
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(new StateListener() {
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
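Two details of the code above are easy to miss: the registry is created at most once per key under a lock, and the group is normalized into a root path that starts with a separator. Both are sketched below under simplifying assumptions. RegistryCacheSketch, its nested Registry class and normalizeRoot are hypothetical, and a string key stands in for the URL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: one registry instance per key, created under a lock.
class RegistryCacheSketch {
    static class Registry {
        final String key;
        Registry(String key) { this.key = key; }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, Registry> registries = new HashMap<>();

    Registry getRegistry(String key) {
        lock.lock();
        try {
            Registry registry = registries.get(key);
            if (registry != null) {
                return registry;            // reuse the existing instance
            }
            registry = new Registry(key);   // stands in for createRegistry(url)
            registries.put(key, registry);
            return registry;
        } finally {
            lock.unlock();                  // always release the lock
        }
    }

    // Mirrors the group handling in the constructor: ensure a leading separator.
    static String normalizeRoot(String group) {
        return group.startsWith("/") ? group : "/" + group;
    }

    public static void main(String[] args) {
        RegistryCacheSketch factory = new RegistryCacheSketch();
        Registry a = factory.getRegistry("zookeeper://127.0.0.1:2181/RegistryService");
        Registry b = factory.getRegistry("zookeeper://127.0.0.1:2181/RegistryService");
        System.out.println("same instance: " + (a == b));
        System.out.println(normalizeRoot("dubbo"));
    }
}
```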
Once that is done, register(registryUrl, registedProviderUrl) is called to register the url with zookeeper. Finally, the code returns a new Exporter object.
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
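Note that unexport wraps each cleanup action in its own try/catch, so a failure in one step (for example, the registry being unreachable) does not prevent the remaining steps from running. A minimal sketch of that design follows. UnexportSketch and its list of Runnable steps are hypothetical; collecting messages into a list stands in for logger.warn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: run every cleanup step even if an earlier one throws.
class UnexportSketch {
    static List<String> unexport(List<Runnable> cleanupSteps) {
        List<String> warnings = new ArrayList<>();
        for (Runnable step : cleanupSteps) {
            try {
                step.run();
            } catch (Throwable t) {
                // Record the failure and continue with the next step.
                warnings.add(String.valueOf(t.getMessage()));
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        Runnable closeExporter = () -> System.out.println("exporter closed");
        Runnable unregister = () -> { throw new IllegalStateException("registry unreachable"); };
        Runnable unsubscribe = () -> System.out.println("listener removed");
        List<String> warnings = unexport(Arrays.asList(closeExporter, unregister, unsubscribe));
        System.out.println("warnings: " + warnings);
    }
}
```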