講過(guò)了集群容錯(cuò)的原理分析春寿,介些來(lái)我們看看這些服務(wù)是如何暴露出來(lái)供消費(fèi)者使用的喊熟。
這一節(jié)我們主要看看是什么時(shí)候開(kāi)始了服務(wù)暴露污淋,在這個(gè)過(guò)程中發(fā)生了什么嫉晶。
我們從ServiceBean
講起,先看一下繼承結(jié)構(gòu):
可見(jiàn)它實(shí)現(xiàn)了許多的spring的接口敛滋,其中有一個(gè)
ApplicationListener
许布,熟悉spring的話可以知道這是一個(gè)事件監(jiān)聽(tīng)方法,當(dāng)spring發(fā)出事件的時(shí)候執(zhí)行這個(gè)方法绎晃。在ServiceBean
中蜜唾,監(jiān)聽(tīng)的是ContextRefreshedEvent
事件。
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
可以看到這里有個(gè)export
方法庶艾,要執(zhí)行此方法需要三個(gè)條件:
-
isDelay()
: 這個(gè)要注意是立即導(dǎo)出的意思袁余,不是需要延遲,通過(guò)實(shí)現(xiàn)過(guò)程就可以發(fā)現(xiàn)咱揍。
private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
delay = provider.getDelay();
} // 獲取到provider的 delay配置
return supportedApplicationListener && (delay == null || delay == -1);
}
看(delay == null || delay == -1)
就知道只有delay無(wú)效的時(shí)候才會(huì)返回true颖榜,所以這個(gè)方法好像寫(xiě)反了。
-
!isExported()
: 返回ServiceConfig
中exported
煤裙,通過(guò)查詢賦值過(guò)程可以發(fā)現(xiàn)只有在doExport
中賦值true
掩完。也就是說(shuō)在服務(wù)暴露后的一個(gè)標(biāo)記。 -
!isUnexported()
:返回ServiceConfig
中unexported
硼砰,同上可知是在服務(wù)銷(xiāo)毀時(shí)候賦值的標(biāo)記且蓬。
綜上可見(jiàn)要執(zhí)行export
方法的過(guò)程是 是否有延遲導(dǎo)出 && 是否已導(dǎo)出 && 是不是已被取消導(dǎo)出 來(lái)決定的。
繼續(xù)往下走進(jìn)入export
:
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
處理了是否需要延遲暴露题翰,需要的話放入線程池中稍后執(zhí)行doExport
缅疟;否則立即執(zhí)行。
doExport
:
protected synchronized void doExport() {
// 執(zhí)行過(guò)取消暴露的話報(bào)錯(cuò)
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
// 已經(jīng)暴露過(guò)就不繼續(xù)了
if (exported) {
return;
}
exported = true;
// 檢查服務(wù)名稱合法性
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
// 檢測(cè)provider是否為空遍愿,空的話新建并且根據(jù)系統(tǒng)變量進(jìn)行初始化。
checkDefault();
// 從其他配置中獲取配置耘斩,設(shè)置核心配置類(lèi)對(duì)象
if (provider != null) {
if (application == null) {
application = provider.getApplication();
}
if (module == null) {
module = provider.getModule();
}
if (registries == null) {
registries = provider.getRegistries();
}
if (monitor == null) {
monitor = provider.getMonitor();
}
if (protocols == null) {
protocols = provider.getProtocols();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
// 設(shè)置泛化服務(wù)類(lèi)型
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
// 獲取對(duì)應(yīng)的服務(wù)類(lèi)信息
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 對(duì) interfaceClass沼填,以及 <dubbo:method> 標(biāo)簽中的必要字段進(jìn)行檢查
checkInterfaceAndMethods(interfaceClass, methods);
// 對(duì) ref 合法性進(jìn)行檢測(cè)
checkRef();
generic = Boolean.FALSE.toString();
}
if (local != null) {
if ("true".equals(local)) {
local = interfaceName + "Local";
}
Class<?> localClass;
try {
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if (stub != null) {
if ("true".equals(stub)) {
stub = interfaceName + "Stub";
}
Class<?> stubClass;
try {
stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
// 檢查各模塊配置
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
// 導(dǎo)出服務(wù)
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
總結(jié)一下在doExport
中做的事情:
- 檢測(cè)當(dāng)前服務(wù)狀態(tài),在未暴露括授,未取消暴露的情況下執(zhí)行坞笙。
- 檢測(cè)服務(wù)名稱的合法性
- 建立Provider,初始化
- 從當(dāng)前配置中獲取核心配置并賦值荚虚。
- 檢查是否是泛化調(diào)用薛夜,非泛華調(diào)用的進(jìn)行標(biāo)簽配置與服務(wù)類(lèi)型檢查。
- 本地存根的配置與檢查版述。
- 對(duì)配置類(lèi)進(jìn)行最后的檢查梯澜。
- 執(zhí)行暴露
先不管最后兩行代碼,留個(gè)問(wèn)題渴析,先來(lái)看看doExportUrls
中做了什么事情晚伙。
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
先獲取了注冊(cè)中心的url集合吮龄,再遍歷protocols進(jìn)行暴露服務(wù)。
loadRegistries
:
protected List<URL> loadRegistries(boolean provider) {
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
if (registries != null && registries.size() > 0) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
address = Constants.ANYHOST_VALUE;
}
String sysaddress = System.getProperty("dubbo.registry.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
if (address != null && address.length() > 0
&& !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!map.containsKey("protocol")) {
if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
map.put("protocol", "remote");
} else {
map.put("protocol", "dubbo");
}
}
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
1.先從配置咆疗,環(huán)境變量獲取注冊(cè)中心的有效地址漓帚。
2.獲取到地址后,獲取應(yīng)用與注冊(cè)中心的配置信息午磁,再設(shè)置
-
path
注冊(cè)中心服務(wù)路徑尝抖, -
dubbo
dubbo版本, -
timestamp
啟動(dòng)時(shí)間戳迅皇, -
pid
應(yīng)用進(jìn)程id昧辽, -
protocol
協(xié)議信息 remote/dubbo
3.通過(guò)UrlUtils.parseURLs
組裝成Url,這里的address可以是多個(gè)喧半,對(duì)應(yīng)多個(gè)url奴迅,用|
或;
分割。
4.設(shè)置注冊(cè)中心的參數(shù)挺据,協(xié)議等信息取具,檢查有效性并加入結(jié)果中。
現(xiàn)在拿到了所有的注冊(cè)中心與協(xié)議信息扁耐,可以對(duì)協(xié)議與注冊(cè)中心的信息進(jìn)行組裝了暇检。
doExportUrlsFor1Protocol
:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
// ......
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// export service
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
這里主要是進(jìn)行一些配置參數(shù)的組裝,對(duì)泛化與非泛化服務(wù)調(diào)用的參數(shù)處理等婉称,最后組裝成com.alibaba.dubbo.common.URL
來(lái)持有服務(wù)的信息块仆。太過(guò)于細(xì)節(jié)的地方現(xiàn)在先不看,上面省略了methods配置的處理王暗,下面簡(jiǎn)單來(lái)看一下做了什么事悔据。
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
這里要對(duì)應(yīng)文檔看一下功能,處理的是這個(gè)標(biāo)簽:
<dubbo:reference interface="com.xxx.XxxService">
<dubbo:method name="findXxx" timeout="3000" retries="2">
<dubbo:argument index="0" callback="true" />
</dubbo:method>
</dubbo:reference>
對(duì)每一個(gè)method
標(biāo)簽處理俗壹,對(duì)每一個(gè)參數(shù)進(jìn)行設(shè)置科汗,對(duì)retry
與retries
進(jìn)行處理,統(tǒng)一用retries
表示绷雏,不重試的retries = 0头滔。
然后是一個(gè)很深的嵌套循環(huán),其實(shí)不難看懂涎显,兩層for循環(huán)for (ArgumentConfig argument : arguments) {
和for (int i = 0; i < methods.length; i++) {
主要是為了把標(biāo)簽的method與服務(wù)類(lèi)反射獲得的方法進(jìn)行匹配坤检,來(lái)執(zhí)行對(duì)callback的處理。
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
匹配到了方法中的參數(shù)期吓,并且類(lèi)型相同的化早歇,就把callback的配置放入map中。
如果沒(méi)有設(shè)置參數(shù)的index缺前,那就把所有的參數(shù)都配置上callback的配置蛀醉。
繼續(xù)看
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
加載對(duì)應(yīng)協(xié)議的ConfiguratorFactory拯刁,設(shè)置參數(shù)
下面終于到了暴露服務(wù)的核心了
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { // @1
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { // @2
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
首先獲取到scope
參數(shù),如果是none
就什么也不做逝段,接下來(lái)看到兩個(gè)if代碼塊
@1本地服務(wù)暴露垛玻,@2遠(yuǎn)程服務(wù)暴露
這個(gè)配置的作用就是指定暴露的位置,默認(rèn)暴露本地與遠(yuǎn)程奶躯,不是遠(yuǎn)程就是本地帚桩,不是本地就是遠(yuǎn)程。
@1中的exportLocal
單獨(dú)用一節(jié)來(lái)講嘹黔。
@2中有注冊(cè)中心時(shí)账嚎,為每一個(gè)注冊(cè)中心加載監(jiān)聽(tīng)參數(shù),最后生成invoker并導(dǎo)出儡蔓。沒(méi)有注冊(cè)中心時(shí)直接導(dǎo)出郭蕉。
在本地暴露或是遠(yuǎn)程暴露時(shí),都能看到時(shí)先用proxyFactory
創(chuàng)建一個(gè)invoker喂江,再使用protocol.export
進(jìn)行導(dǎo)出召锈,那我們先看一看invoker時(shí)怎么創(chuàng)建的。
Invoker 是實(shí)體域获询,它是 Dubbo 的核心模型涨岁,其它模型都向它靠擾,或轉(zhuǎn)換成它吉嚣,它代表一個(gè)可執(zhí)行體梢薪,可向它發(fā)起 invoke 調(diào)用,它有可能是一個(gè)本地的實(shí)現(xiàn)尝哆,也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn)沮尿,也可能一個(gè)集群實(shí)現(xiàn)。
ProxyFactory
的默認(rèn)實(shí)現(xiàn)為JavassistProxyFactory
较解,簡(jiǎn)單地看一下里面是什么樣的:
/**
* JavaassistRpcProxyFactory
*/
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
在getInvoker
方法中,先用Wrapper
為對(duì)應(yīng)的目標(biāo)類(lèi)創(chuàng)建一個(gè)wrapper赴邻,再返回一個(gè)匿名的Invoker印衔,重寫(xiě)了doInvoke
方法,將調(diào)用交給了wrapper.invokeMethod處理姥敛。
這里先簡(jiǎn)單介紹下Wrapper.getWrapper
方法奸焙,就是先從緩存中獲取目標(biāo)類(lèi)的wrapper對(duì)象,沒(méi)有的話,通過(guò)字符拼接的方式組裝方法与帆,通過(guò)Dubbo的ClassGenerator
來(lái)生成一個(gè)目標(biāo)類(lèi)的Wrapper對(duì)象了赌。細(xì)節(jié)方面在后面放一節(jié)來(lái)講,需要一下javassist知識(shí)玄糟。
Invoker的創(chuàng)建過(guò)程大概知道了勿她,下面看protocol.export
方法,通過(guò)SPI可以發(fā)現(xiàn)對(duì)應(yīng)多種實(shí)現(xiàn)阵翎,在調(diào)試過(guò)程中逢并,不能獲取到protocol的具體引用,是一個(gè)這個(gè)東西:
咱也不知道這是干啥的郭卫,這個(gè)需要了解dubbo spi的機(jī)制砍聊。關(guān)于spi的知識(shí)會(huì)在后面介紹,這里需要知道在調(diào)用InjvmProtocol或者其他的Protocol的時(shí)候贰军,需要先經(jīng)過(guò)
QosProtocolWrapper
玻蝌,ProtocolFilterWrapper
,ProtocolListenerWrapper
的export
方法词疼。先看一下本地暴露
exportLocal
:
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
首先是判斷url中的protocol
參數(shù)是否設(shè)置了injvm
俯树,設(shè)置過(guò)的話表示已經(jīng)暴露過(guò),跳過(guò)寒跳。未設(shè)置的話進(jìn)入暴露流程聘萨。
為本地暴露組裝local的URL,加上protocol = injvm, host = 127.0.0.1, port = 0參數(shù)童太。
根據(jù)ref創(chuàng)建一個(gè)本地暴露用的invoker米辐,通過(guò)protocol.export
暴露,并加入到緩存中书释。
暴露的過(guò)程在InjvmProtocol
中:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
在InjvmProtocol
維護(hù)了一個(gè)InjvmExporter
的緩存翘贮,暴露工作實(shí)際是就是創(chuàng)建一個(gè)InjvmExporter
對(duì)象。本地暴露就是這么簡(jiǎn)單爆惧,繼續(xù)分析遠(yuǎn)程暴露狸页。
遠(yuǎn)程注冊(cè)中心暴露是通過(guò)RegistryProtocol
執(zhí)行的,比本地復(fù)雜的多:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker @1
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker); // @2
//registry provider
final Registry registry = getRegistry(originInvoker); // @3
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registedProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); // @4
if (register) { // @5
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // @6
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); //@7
}
@1通過(guò)doLocalExport
對(duì)服務(wù)進(jìn)行導(dǎo)出扯再。
@2獲取注冊(cè)中心的URL芍耘。
@3獲取注冊(cè)中心,并拿到provider的URL熄阻,在ProviderConsumerRegTable注冊(cè)表中進(jìn)行provider的注冊(cè)斋竞。
@5向注冊(cè)中心注冊(cè)服務(wù),并設(shè)置provider的注冊(cè)標(biāo)記秃殉。
@6創(chuàng)建override的監(jiān)聽(tīng)坝初,目前先不看是做什么的浸剩。
@7返回一個(gè)DestroyableExporter
除了@7其他的都比較復(fù)雜,我們來(lái)一個(gè)一個(gè)分析下去鳄袍。
先來(lái)看一下第一步的doLocalExport
做了什么
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
使用雙重檢查鎖來(lái)獲得緩存中的exporter绢要。如果不存在,通過(guò)protocol.export
創(chuàng)建拗小。之前講過(guò)在這個(gè)方法前會(huì)有三個(gè)裝飾器wrapper先執(zhí)行重罪,假設(shè)我們使用的是Dubbo的Protocol,那么來(lái)看一下DubboProtocol#export
方法:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
開(kāi)頭是創(chuàng)建DubboExporter
的過(guò)程十籍,中間是關(guān)于本地存根相關(guān)的處理蛆封,這里不做分析」蠢酰看一下最后兩個(gè)方法openServer
和optimizeSerialization
是分析的重點(diǎn)惨篱。
openServer
:
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
key是url中的進(jìn)程地址與端口號(hào),每一個(gè)key維護(hù)一個(gè)ExchangeServer
围俘,沒(méi)有時(shí)進(jìn)行創(chuàng)建createServer
砸讳,有的話進(jìn)行重置server.reset(url)
。
下面分兩個(gè)分支界牡,先講一下創(chuàng)建createServer
的過(guò)程:
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @1
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @2
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler); // @3
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY); // @4
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
在前二行@1中設(shè)置了兩個(gè)參數(shù):
-
channel.readonly.sent
表示server關(guān)閉時(shí)是否發(fā)送只讀事件 -
heartbeat
心跳時(shí)間默認(rèn)1分鐘簿寂。
@2中獲取到server的類(lèi)型,并檢查是不是所支持的netty
宿亡,mina
常遂,grizzly
。
@3設(shè)置編碼解碼器參數(shù)并創(chuàng)建server挽荠。
@4檢查client
參數(shù)是不是支持的克胳。
主要步驟是檢測(cè)server參數(shù),創(chuàng)建server圈匆,檢測(cè)client參數(shù)漠另。
繼續(xù)來(lái)看Exchangers.bind
:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
看看getExchanger
做的事情:
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
獲取到了HeaderExchanger
實(shí)例,看看bind方法:
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
包含了4個(gè)邏輯跃赚,3個(gè)是構(gòu)造方法笆搓,一個(gè)是Transporters.bind
,我們主要關(guān)心下bind
方法:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
用默認(rèn)的NettyTransporter
纬傲,bind方法
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
直接創(chuàng)建了一個(gè)NettyServer
满败,繼續(xù)看它的構(gòu)造函數(shù):
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
super:
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
首先獲取到ip,port等信息賦默認(rèn)值叹括,然后執(zhí)行doOpen
方法葫录。doOpen
是一個(gè)模板方法,由子類(lèi)實(shí)現(xiàn)领猾。繼續(xù)看看NettyServer
的doOpen
:
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
創(chuàng)建boss與worker的線程池米同,創(chuàng)建Netty的ServerBootstrap
,設(shè)置 PipelineFactory摔竿,綁定ip與端口面粮。
以上就是一個(gè)server的創(chuàng)建過(guò)程,后面的都是netty相關(guān)的源碼了继低。
總結(jié)一下這個(gè)server的創(chuàng)建過(guò)程熬苍,dubbo將netty的創(chuàng)建給封裝在NettyServer
中,通過(guò)spi獲取到相關(guān)的Transpoter袁翁,獲取到創(chuàng)建server的相關(guān)參數(shù)來(lái)創(chuàng)建server柴底。
第二個(gè)分支server.reset
,具體的調(diào)用路徑不貼了粱胜,主要看下AbstractServer
的reset
方法:
@Override
public void reset(URL url) {
if (url == null) {
return;
}
try {
if (url.hasParameter(Constants.ACCEPTS_KEY)) {
int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
if (a > 0) {
this.accepts = a;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
if (t > 0) {
this.idleTimeout = t;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.THREADS_KEY)
&& executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int threads = url.getParameter(Constants.THREADS_KEY, 0);
int max = threadPoolExecutor.getMaximumPoolSize();
int core = threadPoolExecutor.getCorePoolSize();
if (threads > 0 && (threads != max || threads != core)) {
if (threads < core) {
threadPoolExecutor.setCorePoolSize(threads);
if (core == max) {
threadPoolExecutor.setMaximumPoolSize(threads);
}
} else {
threadPoolExecutor.setMaximumPoolSize(threads);
if (core == max) {
threadPoolExecutor.setCorePoolSize(threads);
}
}
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
super.setUrl(getUrl().addParameters(url.getParameters()));
}
設(shè)置一些參數(shù) 最大可連接的客戶端數(shù)量accepts
柄驻,空閑超時(shí)時(shí)間idleTimeout
,線程數(shù)threads
焙压。
在設(shè)置threads
的時(shí)候鸿脓,會(huì)把threads
的值與當(dāng)前線程池比較。
- 如果
threads
比核心線程數(shù)小涯曲,那么減小核心線程數(shù)野哭。 - 如果
threads
比核心線程數(shù)大,那么增大最大線程數(shù)幻件。
這里這么設(shè)置我想是為了不影響原來(lái)正在執(zhí)行的線程吧拨黔,同時(shí)節(jié)省資源,留個(gè)疑問(wèn)绰沥?篱蝇??
最后一行是對(duì)url的一個(gè)重置揪利,把當(dāng)前的持有的url與暴露出來(lái)的url參數(shù)進(jìn)行合并态兴,放入AbstractPeer
里面的url。
protected void setUrl(URL url) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
this.url = url;
}
AbstractPeer.url
是由volatile修飾的疟位,通過(guò)調(diào)試發(fā)現(xiàn)瞻润,這個(gè)url的基本信息path
,host
等都是不變的,變的只有parameters
甜刻∩茏玻可見(jiàn)這個(gè)url在這里的作用與具體的服務(wù)無(wú)關(guān),只是刷新參數(shù)信息得院,可能是與server相關(guān)的一些參數(shù)調(diào)整傻铣。當(dāng)然參數(shù)調(diào)整也不是任何都會(huì)調(diào)整,回過(guò)來(lái)看AbstractServer#reset
剛才講了三個(gè)參數(shù)的賦值祥绞,都是有條件的accepts
與idleTimeout
只能是單調(diào)遞增的調(diào)整非洲。這就是要把server的這些參數(shù)調(diào)整為暴露服務(wù)中的最大值以保證配置生效鸭限。
講完了createServer
和server.reset
了,整個(gè)openServer
的過(guò)程大概清楚了两踏。
然后調(diào)用optimizeSerialization(URL url)方法,將指定的序列化類(lèi)加載到緩存,通常配置kryo, fst序列化,不配置,默認(rèn)用hessian2序列化败京。
到此為止服務(wù)的遠(yuǎn)程暴露過(guò)程就結(jié)束了∶稳荆回到RegistryProtocol
赡麦,知道了doLoaclExport
后來(lái)看看是如何注冊(cè)的:
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
首先看一下注冊(cè)表的結(jié)構(gòu):
public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) {
ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl);
String serviceUniqueName = providerUrl.getServiceKey();
Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
if (invokers == null) {
providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ProviderInvokerWrapper>());
invokers = providerInvokers.get(serviceUniqueName);
}
invokers.add(wrapperInvoker);
}
通過(guò)ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> 分別維護(hù)了provider與consumer的服務(wù)映射,key為服務(wù)的group/serviceName:version組裝帕识,映射了不同的invoker泛粹。invoker由ProviderInvokerWrapper
裝飾,封裝了invoker肮疗,注冊(cè)中心url晶姊,invoker的url,provider的url族吻。同時(shí)由isReg
來(lái)標(biāo)記是否注冊(cè)成功帽借。
接下來(lái)看注冊(cè)的過(guò)程,主要分析下register
方法超歌。
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
看看AbstractRegistryFactory#getRegistry
:
@Override
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}