dubbo服務(wù)發(fā)布
dubbo服務(wù)發(fā)布只需在spring.xml中如下配置即可:
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" />
export初始化
通過2-dubbo結(jié)合spring可知洽议,<dubbo:service>
解析后封裝到ServiceBean中撮奏;ServiceBean定義如下拱她,繼承了dubbo定義的類ServiceConfig,實(shí)現(xiàn)了5個spring的接口柠辞,為了融入spring容器的啟動過程中:
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
... ...
}
ServiceBean實(shí)現(xiàn)了ApplicationListener接口绞惦,當(dāng)spring容器觸發(fā)了ContextRefreshedEvent事件時易核,就會調(diào)用ServiceConfig中的export()
方法發(fā)布<dubbo:service>
申明的dubbo服務(wù)狼电,且在dubbo的info級別日志中有相應(yīng)的日志:
public void onApplicationEvent(ApplicationEvent event) {
if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
if (isDelay() && ! isExported() && ! isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
}
info日志示例:The service ready on spring started. service: com.alibaba.dubbo.demo.DemoService
ServiceConfig.export()
ServiceConfig中export()
方法部分源碼如下,如果<dubbo:service>
中申明了delay
(例如<dubbo:service interface="com.alibaba.dubbo.demo.TestService" ref="testService" delay="3000"/>
)盆均,那么延遲調(diào)用doExport()
發(fā)布這個服務(wù)塞弊,否則直接調(diào)用doExport()
發(fā)布服務(wù):
public synchronized void export() {
... ...
if (delay != null && delay > 0) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(delay);
} catch (Throwable e) {
}
doExport();
}
});
thread.setDaemon(true);
thread.setName("DelayExportServiceThread");
thread.start();
} else {
doExport();
}
}
ServiceConfig.doExport()
的作用:
- 檢查
<dubbo:service>
中是否配置了interface, 如果為空,那么拋出異常: - 檢查xml配置中申明的interface的類型是否是java interface類型(interfaceClass.isInterface())
- 檢查xml配置中interface和ref是否匹配(interfaceClass.isInstance(ref))
- application®istry&protocol等有效性檢查泪姨;
- 有效性檢查通過后游沿,調(diào)用doExportUrls()發(fā)布dubbo服務(wù);
ServiceConfig.doExportUrls()
通過調(diào)用loadRegistries(true)得到所有registry的url地址肮砾,例如在dubbo.properties
中通過配置dubbo.registry.address=zookeeper://127.0.0.1:2181
诀黍;protocols就是將要發(fā)布服務(wù)的協(xié)議集合(dubbo服務(wù)可以同時暴露多種協(xié)議),可以在dubbo.properties
中配置唇敞,以dubbo協(xié)議為例:
dubbo.protocol.name=dubbo
dubbo.protocol.port=20880
ServiceConfig.doExportUrls()
源碼如下:
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
// 一般只配置dubbo協(xié)議蔗草,那么protocols就是:<dubbo:protocol name="dubbo" port="20880" id="dubbo" />
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
ServiceConfig.doExportUrlsFor1Protocol()
先把所有相關(guān)屬性封裝到Map中咒彤,例如protocol=dubbo疆柔,host=10.0.0.1咒精,port=20880,path=com.alibaba.dubbo.demo.TestService等旷档,然后構(gòu)造dubbo定義的統(tǒng)一數(shù)據(jù)模型URL:
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
得到的url如下所示(這個url非常重要模叙,貫穿整個dubbo服務(wù)的發(fā)布和調(diào)用過程,可以在服務(wù)發(fā)布后在dubbo-monitor中看到):
ServiceConfig.doExportUrlsFor1Protocol()
中根據(jù)scope判斷服務(wù)的發(fā)布范圍:
- 如果配置scope=none鞋屈, 那么不需要發(fā)布這個dubbo服務(wù)范咨;
- 沒有配置scope=noe且配置的scope!=remote, 那么本地暴露 這個dubbo服務(wù)厂庇;
- 沒有配置scope=noe且配置的scope!=remote且配置的scope!=local渠啊,那么遠(yuǎn)程暴露這個dubbo服務(wù)(例如遠(yuǎn)程暴露這個服務(wù)到zk上,默認(rèn)情況下scope沒有配置权旷,就是在這里發(fā)布服務(wù))替蛉;
實(shí)現(xiàn)源碼如下:
//配置為none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情況下做本地暴露 (配置為remote,則表示只暴露遠(yuǎn)程服務(wù))
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果配置不是local則暴露為遠(yuǎn)程服務(wù).(配置為local拄氯,則表示只暴露遠(yuǎn)程服務(wù))
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
// 如果注冊url地址存在躲查,例如申明了注冊的zk地址
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
// 注冊的zk地址可能是集群,那么需要遍歷這些地址一一進(jìn)行注冊
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
// 如果申明了dubbo-monitor译柏,那么再url地址上append類似monitor=monitor全地址
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// 默認(rèn)都是dubbo協(xié)議镣煮,所以調(diào)用DubboProtol.export(Invoker)
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
由這段源碼可知,如果發(fā)布dubbo服務(wù)到zookeeper上鄙麦,invoker.getUrl()的值為:
registry://10.0.53.87:2188/com.alibaba.dubbo.registry.RegistryService?application=dubbo-test&dubbo=2.0.0&export=dubbo%3A%2F%2F10.52.16.218%3A20886%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-test%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26loadbalance%3Droundrobin%26methods%3DsayHello%26owner%3Dafei%26pid%3D2380%26side%3Dprovider%26timestamp%3D1509953019382&owner=afei&pid=2380®istry=zookeeper×tamp=1509953019349
且會有兩行info級別的日志典唇;
Export dubbo service com.alibaba.dubbo.demo.DemoService to url ... ...
Register dubbo service com.alibaba.dubbo.demo.DemoService url ... ...
Protocol.export()
com.alibaba.dubbo.rpc.Protocol中暴露服務(wù)接口申明:
/**
* 暴露遠(yuǎn)程服務(wù):<br>
* 1. 協(xié)議在接收請求時,應(yīng)記錄請求來源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
* 2. export()必須是冪等的胯府,也就是暴露同一個URL的Invoker兩次介衔,和暴露一次沒有區(qū)別。<br>
* 3. export()傳入的Invoker由框架實(shí)現(xiàn)并傳入盟劫,協(xié)議不需要關(guān)心夜牡。<br>
*
* @param <T> 服務(wù)的類型
* @param invoker 服務(wù)的執(zhí)行體
* @return exporter 暴露服務(wù)的引用,用于取消暴露
* @throws RpcException 當(dāng)暴露服務(wù)出錯時拋出侣签,比如端口已占用
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
RegistryProtocol.export()
源碼如下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider塘装,根據(jù)發(fā)布的服務(wù)originInvoker得到Registry實(shí)例,由于一般都使用zookeeper為注冊中心影所,所以這里得到的是ZookeeperRegistry蹦肴;
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
// 所以這里調(diào)用ZookeeperRegistry.register(URL)把需要發(fā)布的服務(wù)注冊到zookeeper中
registry.register(registedProviderUrl);
// 訂閱override數(shù)據(jù)
// FIXME 提供者訂閱時,會影響同一JVM即暴露服務(wù)猴娩,又引用同一服務(wù)的的場景阴幌,因?yàn)閟ubscribed以服務(wù)名為緩存的key勺阐,導(dǎo)致訂閱信息覆蓋。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保證每次export都返回一個新的exporter實(shí)例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
核心調(diào)用registry.register(registedProviderUrl)
- 調(diào)用AbstractRegistry.register(URL)矛双,把這次需要注冊的URL加到Set<URL> registered中渊抽,即本地緩存新的注冊URL;
- 在ZookeeperRegistry.doRegister(URL)調(diào)用AbstractZookeeperClient.create()议忽,toUrlPath將URL形式的地址轉(zhuǎn)換成zookeeper路徑懒闷,最終在AbstractZookeeperClient中把需要發(fā)布的服務(wù)的URL保存到zookeeper中;(依賴第三方j(luò)ar包:org.I0Itec.zkclient)
- ZookeeperRegistry.doRegister(url)注冊服務(wù)如果失斦恍摇:
- 如果開啟了啟動檢查check=true愤估,那么直接拋出異常;
- 如果沒有開啟啟動檢查速址,那么將失敗的注冊請求記錄到失敗列表玩焰,定時重試;
核心調(diào)用registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener)
:
- 對發(fā)布的dubbo服務(wù)的這個url進(jìn)行監(jiān)聽, 當(dāng)服務(wù)變化有時通知重新暴露服務(wù)芍锚, 以zookeeper為例昔园,暴露服務(wù)會在zookeeper生成一個節(jié)點(diǎn),當(dāng)節(jié)點(diǎn)發(fā)生變化的時候會觸發(fā)overrideSubscribeListener的notify方法重新暴露服務(wù)闹炉;
重試機(jī)制
注冊服務(wù)失敗后蒿赢,會將url加入重試url集合中,failedRegistered.add(url);
重試任務(wù)在FailbackRegistry中實(shí)現(xiàn):
public FailbackRegistry(URL url) {
super(url);
int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// retryExecutor是一個單獨(dú)的線程池Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); 默認(rèn)重試周期是5s渣触;
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
public void run() {
// 檢測并連接注冊中心
try {
retry();
} catch (Throwable t) { // 防御性容錯
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
監(jiān)聽機(jī)制
- 訂閱并設(shè)置監(jiān)聽registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
--> FailbackRegistry.subscribe(URL url, NotifyListener listener)
--> ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)羡棵,部分實(shí)現(xiàn)源碼如下:
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
// 在準(zhǔn)備監(jiān)聽的path上添加ChildListener
List<String> children = zkClient.addChildListener(path, zkListener);
- 服務(wù)有變化時notify:
FailbackRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
以path:/dubbo/com.alibaba.dubbo.demo.TestService/providers
為例,Consumer會監(jiān)聽這個zk路徑嗅钻;
- 假設(shè)在consumer啟動時只有1個provider:dubbo://10.52.17.98:20888... 皂冰;當(dāng)再啟動一個provider:dubbo://10.52.17.98:20886后,path路徑/dubbo/com.alibaba.dubbo.demo.TestService/providers就會變化养篓,結(jié)果就觸發(fā)notify(URL url, NotifyListener listener, List<URL> urls)秃流,此時List<URL> urls中有兩個provider,即dubbo://10.52.17.98:20886...和dubbo://10.52.17.98:20888...
- 或者在consumer啟動時有2個provider:dubbo://10.52.17.98:20886... 和 dubbo://10.52.17.98:20888... 柳弄;當(dāng)關(guān)閉一個provider:dubbo://10.52.17.98:20886后舶胀,path路徑/dubbo/com.alibaba.dubbo.demo.TestService/providers也會變化,結(jié)果就觸發(fā)notify(URL url, NotifyListener listener, List<URL> urls)碧注,此時List<URL> urls中只有1個provider嚣伐,即dubbo://10.52.17.98:20888...;
--> AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
--> RegistryDirectory.notify(List<URL> urls)
--> RegistryDirectory.refreshInvoker(List<URL> invokerUrls)萍丐,這里調(diào)用toMethodInvokers(Map<String, Invoker<T>> invokersMap)
的實(shí)現(xiàn)比較重要轩端,將invokers列表轉(zhuǎn)成與方法的映射關(guān)系,且每個方法對應(yīng)的List<Invoker>
需要通過Collections.sort(methodInvokers, InvokerComparator.getComparator());
排序逝变,排序后基茵,還要將其轉(zhuǎn)為unmodifiable的map:
for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
Collections.sort(methodInvokers, InvokerComparator.getComparator());
newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
}
return Collections.unmodifiableMap(newMethodInvokerMap);
其中InvokerComparator 的定義如下奋构,即直接根據(jù)url進(jìn)行比較排序:
private static class InvokerComparator implements Comparator<Invoker<?>> {
private static final InvokerComparator comparator = new InvokerComparator();
public static InvokerComparator getComparator() {
return comparator;
}
private InvokerComparator() {}
public int compare(Invoker<?> o1, Invoker<?> o2) {
return o1.getUrl().toString().compareTo(o2.getUrl().toString());
}
}
最后刷新本地緩存的方法和List<Invoker>關(guān)系:
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
DubboProtocol.export()
dubbo協(xié)議發(fā)布服務(wù)會調(diào)用DubboProtocol.export(),
- 從Invoker中獲取URL:
URL url = invoker.getUrl(); - 根據(jù)URL得到key, 由暴露的服務(wù)接口+端口組成拱层,例如com.alibaba.dubbo.demo.DemoService:20886
String key = serviceKey(url); - 構(gòu)造DubboExporter存到Map中l(wèi)ocal cache化:
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter); - 調(diào)用DubboProtocol.openServer()開啟netty(默認(rèn))服務(wù)保持通信弥臼,并設(shè)置requestHandler處理consumer對provider的調(diào)用請求;
DubboProtocol.openServer():
key的值就是IP:Port舱呻,例如10.52.17.167:20886醋火,根據(jù)key從serverMap中如果取不到ExchangeServer悠汽,表示還沒綁定服務(wù)端口箱吕,需要調(diào)用createServer(url)-->Exchangers.bind(url, requestHandler)-->Transporters.getTransporter().bind(url, handler)(dubbo支持mina,netty柿冲,grizzly茬高,默認(rèn)實(shí)現(xiàn)是netty) --> NettyTransporter.bind(URL, ChannelHandler) --> NettyServer.open();
NettyServer.open()源碼如下:
@Override
rotected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
開啟Netty服務(wù)幾個重要的地方
- 構(gòu)造ChannelPipeline時指定了編碼&解碼假抄,其中編碼為NettyCodecAdapter.getEncoder()怎栽,解碼為NettyCodecAdapter.getDncoder();
- 指定了handler為
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
處理請求宿饱;
附dubbo官方給出的暴露服務(wù)時序圖:
https://dubbo.gitbooks.io/dubbo-dev-book/design.html