?dubbo服務(wù)暴露就是一個(gè)遠(yuǎn)程代理隧甚,打開網(wǎng)絡(luò)監(jiān)聽,接受服務(wù)調(diào)用請(qǐng)求渡冻,將服務(wù)接口名呻逆,IP,port發(fā)布到注冊(cè)中心的過程菩帝。通過《dubbo啟動(dòng)過程分析》可以了解到,在spring容器啟動(dòng)時(shí)會(huì)將容器中所有的bean初始化成單實(shí)例(默認(rèn)),如果bean繼承相應(yīng)的接口呼奢,在實(shí)例初始化完成后宜雀,會(huì)調(diào)用實(shí)現(xiàn)類中某些接口方法。dubbo的初始化也是通過這樣一個(gè)過程完成的握础。
ServiceConfig.export->doExport()-> doExportUrls()->doExportUrlsFor1Protocol()
//根據(jù)url暴露
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();//獲取服務(wù)提供者協(xié)議名稱
if (name == null || name.length() == 0) {
name = "dubbo";//默認(rèn)是dubbo
}
//獲取服務(wù)主機(jī)名辐董,為空則自動(dòng)查找本機(jī)IP
if (NetUtils.isInvalidLocalHost(host)) {
anyhost = true;
try {
host = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.warn(e.getMessage(), e);
}
-----------------------------------------------
}
//SPI加載協(xié)議實(shí)現(xiàn)類中的默認(rèn)端口常量
final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
if (port == null || port == 0) {//protocol沒有配置port
port = defaultPort;
}
if (port == null || port <= 0) {
port = getRandomPort(name);//使用本機(jī)隨機(jī)可用端口
if (port == null || port < 0) {
port = NetUtils.getAvailablePort(defaultPort);
putRandomPort(name, port);
}
logger.warn("Use random available port(" + port + ") for protocol " + name);
}
//組織參數(shù)
Map<String, String> map = new HashMap<String, String>();
..............................................................
//形成類似dubbo://的統(tǒng)一URL
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
String scope = url.getParameter(Constants.SCOPE_KEY);//獲取是scope屬性
//配置為none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情況下做本地暴露 (配置為remote,則表示只暴露遠(yuǎn)程服務(wù))
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果配置不是local則暴露為遠(yuǎn)程服務(wù).(配置為local禀综,則表示只暴露本地服務(wù))
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
//導(dǎo)入監(jiān)控中心信息
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
//首先將URL中dubbo替換為registry简烘,創(chuàng)建遠(yuǎn)程代理Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//此處protocol為spi加載的適配類,會(huì)根據(jù)invoker中的protocol不同
//調(diào)用不同的具體實(shí)現(xiàn)類定枷,此處在SPI分析中已經(jīng)說明
//Protocol$Adaptor.export()-->dubbofilterwrapper.export()-->dubbolistenerwrapper.export()-->dubboprotocol.export()
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {//本地服務(wù)
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
接下來將創(chuàng)建接口實(shí)現(xiàn)類的包裝類孤澎,也就是服務(wù)的包裝類,調(diào)用過程如下:
proxyfactory$adpative.getInvoker()--> StubProxyFactoryWrapper.getInvoker()->JavassistProxyFactory.getInvoker()
調(diào)用Javaassit創(chuàng)建服務(wù)包裝類欠窒,生成的類如下覆旭,重點(diǎn)看invokeMethod()這個(gè)方法,后面invoker.invoke()會(huì)掉到此方法去執(zhí)行業(yè)務(wù)邏輯岖妄。
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.demo.provider.DemoServiceImpl;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
public class Wrapper1
extends Wrapper
implements ClassGenerator.DC
{
...........................................................
public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)
throws InvocationTargetException
{
DemoServiceImpl localDemoServiceImpl;
try
{
localDemoServiceImpl = (DemoServiceImpl)paramObject;
}
catch (Throwable localThrowable1)
{
throw new IllegalArgumentException(localThrowable1);
}
try
{
if ((!"sayHello".equals(paramString)) || (paramArrayOfClass.length == 1)) {
return localDemoServiceImpl.sayHello((String)paramArrayOfObject[0]);
}
}
catch (Throwable localThrowable2)
{
throw new InvocationTargetException(localThrowable2);
}
}
}
最后返回AbstractProxyInvoker的實(shí)現(xiàn)型将,此實(shí)現(xiàn)類是一個(gè)非常重要的類,包含了服務(wù)實(shí)現(xiàn)類荐虐,服務(wù)接口七兜,url,還有剛才生成的服務(wù)包裝類的引用福扬。
然后執(zhí)行到如下的方法:
Exporter<?> exporter = protocol.export(invoker);
此處調(diào)用的過程為protocol$Adaptive.export()-->ProtocolFilterWrapper.export()--->ProtocolListenerWrapper.export()腕铸,兩個(gè)wrapper什么都沒有做,直接放行忧换,最終調(diào)到RegistryProtocol.export()
參數(shù)為剛才封裝好的AbstractProxyInvoker實(shí)現(xiàn)類恬惯,然后:
ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
下面來分析這個(gè)方法,這個(gè)方法非常重要亚茬,就在此方法中調(diào)用DubboProtocol完成服務(wù)的發(fā)布
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>
//此處打開網(wǎng)絡(luò)監(jiān)聽
((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return (ExporterChangeableWrapper<T>) exporter;
}
在doLocalExport()中酪耳,將provider url與AbstractProxyInvoker實(shí)現(xiàn)類封裝到InvokerDelegete對(duì)象中,然后執(zhí)行protocol.export(invokerDelegete)方法刹缝,此處后面會(huì)重點(diǎn)分析碗暗,此方法返回一個(gè)DubboExporter對(duì)象,將此對(duì)象與AbstractProxyInvoker實(shí)現(xiàn)類封裝在ExporterChangeableWrapper對(duì)象中梢夯,并存儲(chǔ)在RegistryProtocol這個(gè)類的bounds屬性言疗,這個(gè)屬性是一個(gè)線程安全的map,以便以后服務(wù)調(diào)用使用颂砸。下面分析調(diào)用dubbo協(xié)議打開網(wǎng)絡(luò)監(jiān)聽的過程噪奄。依然是SPI機(jī)制死姚,經(jīng)過ProtocolFilterWrapper.export(),完成對(duì)Invoker的包裝勤篮,Invoker中加入了Filter調(diào)用鏈都毒。
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
//SPI機(jī)制加載所有Filter擴(kuò)展
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
//加入Filter執(zhí)行鏈
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
.......................................................
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
.......................................................
};
}
}
return last;
}
然后經(jīng)過ProtocolListenerWrapper.export()調(diào)到DubboProtocol.export(),將結(jié)果封裝成ListenerExporterWrapper返回碰缔。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
...................................................
openServer(url);
return exporter;
}
將前面?zhèn)鬟f過來的invoker,key账劲,exporterMap封裝到DubboExporter中,打開server此處是后續(xù)調(diào)用netty打開網(wǎng)絡(luò)監(jiān)聽,最終返回到DubboProtocol處的是HeaderExchangeServer金抡,此對(duì)象持有nettyserver瀑焦,在nettyserver構(gòu)造方法中,有兩個(gè)構(gòu)造參數(shù)一個(gè)是url梗肝,另一個(gè)是DecodeHandler榛瓮,此對(duì)象里又封裝了HeaderExchangeHandler,又封裝了ExchangeHandler统捶。初始化了nettyserver的基本參數(shù)如:ip,port,timeout等等榆芦。具體調(diào)用流程如下:
openServer(url)-->createServer(url)-->Exchangers.bind(url, requestHandler)-->HeaderExchanger.bind()-->NettyTransporter.bind()-->nettyserver.doopen()
在doopen()方法中,就是對(duì)netty的初始化操作喘鸟,設(shè)置線程池匆绣,綁定decoder、encoder什黑、handler崎淳,然后打開端口,進(jìn)行監(jiān)聽愕把。
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
channel = bootstrap.bind(getBindAddress());
}
打開網(wǎng)絡(luò)監(jiān)聽后拣凹,再次回到RegistryProtocol類中,下面就開始調(diào)用注冊(cè)中心進(jìn)行服務(wù)注冊(cè)和訂閱恨豁。首先調(diào)用getRegistry(originInvoker)嚣镜,spi機(jī)制,初始化時(shí)注入的是registryFactory適配類橘蜜,根據(jù)url中注冊(cè)中心參數(shù)獲取具體的實(shí)現(xiàn)類菊匿,此處是ZookeeperRegistryFactory
private Registry getRegistry(final Invoker<?> originInvoker){
URL registryUrl = originInvoker.getUrl();
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryFactory.getRegistry(registryUrl);
}
調(diào)用到AbstractRegistryFactory.getRegistry()>ZookeeperRegistryFactory.createRegistry(),最終返回一個(gè) ZookeeperRegistry對(duì)象
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
//此處是spi機(jī)制注入的適配類
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
返回到RegistryProtocol.export(),獲得到了 ZookeeperRegistry计福,然后調(diào)用registry.register(registedProviderUrl)進(jìn)行注冊(cè)跌捆,進(jìn)一步跟蹤,調(diào)用鏈路為:FailbackRegistry.register()-->AbstractRegistry.register()-->ZookeeperRegistry.doRegister()象颖,在zk上創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)佩厚,注冊(cè)完成
protected void doRegister(URL url) {
try {
//在zk上創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
回到RegistryProtocol,注冊(cè)完成之后说订,下面開始訂閱抄瓦,需要感知zk node的變化潮瓶,此處使用的zk的watcher機(jī)制,首先初始化一個(gè)NotifyListener闺鲸,后面監(jiān)聽變化調(diào)用到此對(duì)象中的notify方法筋讨。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
調(diào)用鏈路:FailbackRegistry.subscribe()-->AbstractRegistry.subscribe()-->ZookeeperRegistry.doSubscribe(),主要是注冊(cè)主節(jié)點(diǎn)和子節(jié)點(diǎn)監(jiān)聽
protected void doSubscribe(final URL url, final NotifyListener listener) {
.......................
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
//zkListeners根據(jù)key存儲(chǔ)了主節(jié)點(diǎn)和子節(jié)點(diǎn)監(jiān)聽
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {//沒有 初始化
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
//根據(jù)主節(jié)點(diǎn)監(jiān)聽獲取子節(jié)點(diǎn)監(jiān)聽
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {//沒有,new一個(gè)
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
//節(jié)點(diǎn)刪除摸恍,更新會(huì)觸發(fā)notify方法
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);//創(chuàng)造一個(gè)永久節(jié)點(diǎn)?赤屋?
//添加子節(jié)點(diǎn)監(jiān)聽器
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
.......................
}
notify(url, listener, urls)調(diào)用鏈比較復(fù)雜立镶,鏈路為:
FailbackRegistry.notify()-->FailbackRegistry.doNotify()-->AbstractRegistry.notify()-->listener.notify(),此處的listener為剛才registryProtocol傳遞過來的OverrideListener,然后調(diào)用OverrideListener.notify()-->RegistryProtocol.doChangeLocalExport(),對(duì)修改了url的invoker重新export,至此整個(gè)發(fā)布過程全部完成类早。
private <T> void doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl){
String key = getCacheKey(originInvoker);
final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null){
logger.warn(new IllegalStateException("error state, exporter should not be null"));
return ;//不存在是異常場景 直接返回
} else {
final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl);
//調(diào)用dubboprotocol.export()重新發(fā)布
exporter.setExporter(protocol.export(invokerDelegete));
}
}