作為分布式框架瓷产,最核心的功能無(wú)非是服務(wù)的暴露和服務(wù)的引用,今天我們先說(shuō)服務(wù)的暴露宅荤。
我們先從暴露服務(wù)配置說(shuō)起
@Bean
public ServiceBean logServiceServiceBean(LogService logService){
ServiceBean<LogService> serviceBean=new ServiceBean<>();
serviceBean.setInterface(LogService.class);
serviceBean.setRef(logService);
return serviceBean;
}
我們聲明了一個(gè)ServiceBean,他的接口類(lèi)型是LogService.class
,他的實(shí)現(xiàn)是注入進(jìn)來(lái)的logService
智哀。既然他被封裝成了ServiceBean拇颅,我們就有必要看看他是怎么實(shí)現(xiàn)的。ServiceBean實(shí)現(xiàn)了InitializingBean伸刃,我們就需要看看他是如何覆寫(xiě)其中的afterPropertiesSet方法的:
public void afterPropertiesSet() throws Exception {
if (getProvider() == null) {
Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
if (providerConfigMap != null && providerConfigMap.size() > 0) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
&& providerConfigMap.size() > 1) { // 兼容舊版本
List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() != null && config.isDefault().booleanValue()) {
providerConfigs.add(config);
}
}
if (providerConfigs.size() > 0) {
setProviders(providerConfigs);
}
} else {
ProviderConfig providerConfig = null;
for (ProviderConfig config : providerConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (providerConfig != null) {
throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
}
providerConfig = config;
}
}
if (providerConfig != null) {
setProvider(providerConfig);
}
}
}
}
if (getApplication() == null
&& (getProvider() == null || getProvider().getApplication() == null)) {
Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
ApplicationConfig applicationConfig = null;
for (ApplicationConfig config : applicationConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (applicationConfig != null) {
throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
}
applicationConfig = config;
}
}
if (applicationConfig != null) {
setApplication(applicationConfig);
}
}
}
if (getModule() == null
&& (getProvider() == null || getProvider().getModule() == null)) {
Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
ModuleConfig moduleConfig = null;
for (ModuleConfig config : moduleConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (moduleConfig != null) {
throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
}
moduleConfig = config;
}
}
if (moduleConfig != null) {
setModule(moduleConfig);
}
}
}
if ((getRegistries() == null || getRegistries().size() == 0)
&& (getProvider() == null || getProvider().getRegistries() == null || getProvider().getRegistries().size() == 0)
&& (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {
Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
if (registryConfigMap != null && registryConfigMap.size() > 0) {
List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
for (RegistryConfig config : registryConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
registryConfigs.add(config);
}
}
if (registryConfigs != null && registryConfigs.size() > 0) {
super.setRegistries(registryConfigs);
}
}
}
if (getMonitor() == null
&& (getProvider() == null || getProvider().getMonitor() == null)
&& (getApplication() == null || getApplication().getMonitor() == null)) {
Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
MonitorConfig monitorConfig = null;
for (MonitorConfig config : monitorConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
if (monitorConfig != null) {
throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
}
monitorConfig = config;
}
}
if (monitorConfig != null) {
setMonitor(monitorConfig);
}
}
}
if ((getProtocols() == null || getProtocols().size() == 0)
&& (getProvider() == null || getProvider().getProtocols() == null || getProvider().getProtocols().size() == 0)) {
Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
for (ProtocolConfig config : protocolConfigMap.values()) {
if (config.isDefault() == null || config.isDefault().booleanValue()) {
protocolConfigs.add(config);
}
}
if (protocolConfigs != null && protocolConfigs.size() > 0) {
super.setProtocols(protocolConfigs);
}
}
}
if (getPath() == null || getPath().length() == 0) {
if (beanName != null && beanName.length() > 0
&& getInterface() != null && getInterface().length() > 0
&& beanName.startsWith(getInterface())) {
setPath(beanName);
}
}
if (! isDelay()) {
export();
}
}
從spring容器中獲取bean独榴,填充dubbo正常使用的屬性,例如
Provider,Application,Moudle等奕枝。填充完后棺榔,有一個(gè)重要的方法就是export()
。dubbo允許通過(guò)配置來(lái)進(jìn)行延遲暴露隘道,當(dāng)然一般情況下是同步的症歇,進(jìn)入了doExport()
,里面又是一通檢查,檢查完后谭梗,進(jìn)入了我們今天要著重開(kāi)始說(shuō)明的doExportUrls()
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
獲取到注冊(cè)中心的地址忘晤,是個(gè)列表,也就是說(shuō)可以有多個(gè)注冊(cè)中心,那么我們的注冊(cè)中心是在我們的配置文件中這個(gè)Bean指定的
@Bean
public RegistryConfig registry() {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("127.0.0.1:2181");
registryConfig.setProtocol("zookeeper");
return registryConfig;
}
上面我們已經(jīng)找到了注冊(cè)中心的地址激捏,下面设塔,我們就是要把我們需要被暴露的服務(wù)的信息放到注冊(cè)中心上。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
String host = protocolConfig.getHost();
if (provider != null && (host == null || host.length() == 0)) {
host = provider.getHost();
}
boolean anyhost = false;
if (NetUtils.isInvalidLocalHost(host)) {
anyhost = true;
try {
host = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.warn(e.getMessage(), e);
}
if (NetUtils.isInvalidLocalHost(host)) {
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
try {
Socket socket = new Socket();
try {
SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
socket.connect(addr, 1000);
host = socket.getLocalAddress().getHostAddress();
break;
} finally {
try {
socket.close();
} catch (Throwable e) {}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
if (NetUtils.isInvalidLocalHost(host)) {
host = NetUtils.getLocalHost();
}
}
}
Integer port = protocolConfig.getPort();
if (provider != null && (port == null || port == 0)) {
port = provider.getPort();
}
final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
if (port == null || port == 0) {
port = defaultPort;
}
if (port == null || port <= 0) {
port = getRandomPort(name);
if (port == null || port < 0) {
port = NetUtils.getAvailablePort(defaultPort);
putRandomPort(name, port);
}
logger.warn("Use random available port(" + port + ") for protocol " + name);
}
Map<String, String> map = new HashMap<String, String>();
if (anyhost) {
map.put(Constants.ANYHOST_KEY, "true");
}
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
if (methods != null && methods.size() > 0) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && arguments.size() > 0) {
for (ArgumentConfig argument : arguments) {
//類(lèi)型自動(dòng)轉(zhuǎn)換.
if(argument.getType() != null && argument.getType().length() >0){
Method[] methods = interfaceClass.getMethods();
//遍歷所有方法
if(methods != null && methods.length > 0){
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
//匹配方法名稱(chēng)远舅,獲取方法簽名.
if(methodName.equals(method.getName())){
Class<?>[] argtypes = methods[i].getParameterTypes();
//一個(gè)方法中單個(gè)callback
if (argument.getIndex() != -1 ){
if (argtypes[argument.getIndex()].getName().equals(argument.getType())){
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
}else {
throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());
}
} else {
//一個(gè)方法中多個(gè)callback
for (int j = 0 ;j<argtypes.length ;j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())){
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j){
throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
}else if(argument.getIndex() != -1){
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
}else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
if (generic) {
map.put("generic", String.valueOf(true));
map.put("methods", Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if(methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
}
else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (! ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put("token", UUID.randomUUID().toString());
} else {
map.put("token", token);
}
}
if ("injvm".equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// 導(dǎo)出服務(wù)
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
//配置為none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情況下做本地暴露 (配置為remote闰蛔,則表示只暴露遠(yuǎn)程服務(wù))
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果配置不是local則暴露為遠(yuǎn)程服務(wù).(配置為local,則表示只暴露遠(yuǎn)程服務(wù))
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
這個(gè)方法有點(diǎn)長(zhǎng)图柏,但是沒(méi)關(guān)系序六,我們一點(diǎn)點(diǎn)來(lái),首先思考一個(gè)問(wèn)題:如果給你了注冊(cè)中心的地址蚤吹,和要暴露的服務(wù)的信息例诀,你會(huì)怎么做這個(gè)暴露?如果是我裁着,我會(huì)這么做:暴露服務(wù)繁涂,首先要讓人知道的是你這個(gè)服務(wù)的ip和端口,這樣引用者才知道怎么連接二驰,連上服務(wù)器后扔罪,我們?cè)诳紤]調(diào)用哪個(gè)服務(wù)模塊的有哪些方法可以讓引用者調(diào)用,如果確定要調(diào)用哪個(gè)服務(wù)的哪個(gè)方法诸蚕,只有將這些信息都知道步势,引用才能知道怎么發(fā)起這個(gè)調(diào)用氧猬。所以我們?cè)俦┞斗?wù)的時(shí)候,IP坏瘩,端口盅抚,服務(wù)名稱(chēng),方法名稱(chēng)倔矾,是要放入到注冊(cè)中心的妄均。因?yàn)樽罱K服務(wù)的引用者是和注冊(cè)中心進(jìn)行交互的,獲取信息的哪自。那么確實(shí)dubbo也是這么做的丰包,它將所有的數(shù)據(jù)都放到自定義的URL的實(shí)例中,然后壤巷,將注冊(cè)中心的信息和URL的信息整合得到一起邑彪,到注冊(cè)中心進(jìn)行暴露服務(wù)。也就是下面這個(gè)關(guān)鍵的步驟
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
我們看下registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())
拼成的字符串經(jīng)過(guò)decode后是什么樣的
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=log&dubbo=2.5.3&export=dubbo://192.168.1.102:20880/com.linyang.test.service.LogService?anyhost=true&application=log&default.proxy=javassist&default.retries=0&default.timeout=30000&default.version=LATEST&dubbo=2.5.3&interface=com.linyang.test.service.LogService&methods=modify,create&pid=4917&side=provider&threads=100×tamp=1525576255082&pid=4917®istry=zookeeper×tamp=1525575179509
剩下的就看會(huì)怎么利用這個(gè)字符串了胧华,利用ref這個(gè)接口的實(shí)現(xiàn)類(lèi)和接口類(lèi)型和這個(gè)URL創(chuàng)建了一個(gè)代理的invoker.根據(jù)上下文判斷是javasist類(lèi)型的ProxyFactory,他內(nèi)部創(chuàng)建了一個(gè)AbstractProxyInvoker類(lèi)型的類(lèi)的實(shí)例寄症,里面存儲(chǔ)了傳進(jìn)來(lái)的三個(gè)值。我們得到了這個(gè)invoker后矩动,就可以暴露他了有巧,暴露的對(duì)象是invoker,這是一個(gè)關(guān)鍵點(diǎn)
Exporter<?> exporter = protocol.export(invoker);
我們看到是協(xié)議protocal進(jìn)行暴露的悲没,這個(gè)協(xié)議最終調(diào)用的是registryProtocol篮迎。說(shuō)到這里大家有可能會(huì)有疑問(wèn),不是應(yīng)該是DubboProtocol嗎,其實(shí)不是這樣示姿,我們可以看下Protocol類(lèi)
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;
/**
* Protocol. (API/SPI, Singleton, ThreadSafe)
*
* @author william.liangf
*/
@SPI("dubbo")
public interface Protocol {
/**
* 獲取缺省端口甜橱,當(dāng)用戶(hù)沒(méi)有配置端口時(shí)使用。
*
* @return 缺省端口
*/
int getDefaultPort();
/**
* 暴露遠(yuǎn)程服務(wù):<br>
* 1. 協(xié)議在接收請(qǐng)求時(shí)峻凫,應(yīng)記錄請(qǐng)求來(lái)源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
* 2. export()必須是冪等的渗鬼,也就是暴露同一個(gè)URL的Invoker兩次,和暴露一次沒(méi)有區(qū)別荧琼。<br>
* 3. export()傳入的Invoker由框架實(shí)現(xiàn)并傳入,協(xié)議不需要關(guān)心差牛。<br>
*
* @param <T> 服務(wù)的類(lèi)型
* @param invoker 服務(wù)的執(zhí)行體
* @return exporter 暴露服務(wù)的引用命锄,用于取消暴露
* @throws RpcException 當(dāng)暴露服務(wù)出錯(cuò)時(shí)拋出,比如端口已占用
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* 引用遠(yuǎn)程服務(wù):<br>
* 1. 當(dāng)用戶(hù)調(diào)用refer()所返回的Invoker對(duì)象的invoke()方法時(shí)偏化,協(xié)議需相應(yīng)執(zhí)行同URL遠(yuǎn)端export()傳入的Invoker對(duì)象的invoke()方法脐恩。<br>
* 2. refer()返回的Invoker由協(xié)議實(shí)現(xiàn),協(xié)議通常需要在此Invoker中發(fā)送遠(yuǎn)程請(qǐng)求侦讨。<br>
* 3. 當(dāng)url中有設(shè)置check=false時(shí)驶冒,連接失敗不能拋出異常苟翻,并內(nèi)部自動(dòng)恢復(fù)。<br>
*
* @param <T> 服務(wù)的類(lèi)型
* @param type 服務(wù)的類(lèi)型
* @param url 遠(yuǎn)程服務(wù)的URL地址
* @return invoker 服務(wù)的本地代理
* @throws RpcException 當(dāng)連接服務(wù)提供方失敗時(shí)拋出
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* 釋放協(xié)議:<br>
* 1. 取消該協(xié)議所有已經(jīng)暴露和引用的服務(wù)骗污。<br>
* 2. 釋放協(xié)議所占用的所有資源崇猫,比如連接和端口。<br>
* 3. 協(xié)議在釋放后需忿,依然能暴露和引用新的服務(wù)诅炉。<br>
*/
void destroy();
}
在export方法上有@Adaptive注解,這個(gè)注解寫(xiě)在方法上屋厘,有個(gè)作用涕烧,可以根據(jù)傳入的URL來(lái)指定需要的協(xié)議,上面的URL中指名了是registry汗洒,所以他使用的應(yīng)該是registryProtocol協(xié)議议纯,那么我們想的那個(gè)DubboProtocol是在什么地方呢?不用急溢谤,是在RegistryProtocal里面做的痹扇。我們繼續(xù)說(shuō)。但是是不是只調(diào)了registryProtocol?當(dāng)時(shí)不是溯香,還記得在@adaptive那個(gè)注解的時(shí)候說(shuō)掃描鲫构,有提到warpper類(lèi),就是將協(xié)議進(jìn)行包裹玫坛,所以這個(gè)protocal是融合了ProtocolFilterWrapper和ProtocolListenerWrapper和DubboProtocal三者结笨,先執(zhí)行
ProtocolFilterWrapper和ProtocolListenerWrapper,他們針對(duì)registry協(xié)議沒(méi)做啥湿镀,直接進(jìn)行下一步炕吸,進(jìn)入了RegistryProtocol的export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//獲得注冊(cè)中心
final Registry registry = getRegistry(originInvoker);
//獲得要注冊(cè)的鏈接,也就是真正的要暴露的服務(wù)
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//想注冊(cè)中心注冊(cè)
registry.register(registedProviderUrl);
// 訂閱override數(shù)據(jù)
// FIXME 提供者訂閱時(shí)勉痴,會(huì)影響同一JVM即暴露服務(wù)赫模,又引用同一服務(wù)的的場(chǎng)景,因?yàn)閟ubscribed以服務(wù)名為緩存的key蒸矛,導(dǎo)致訂閱信息覆蓋瀑罗。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保證每次export都返回一個(gè)新的exporter實(shí)例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
注冊(cè)協(xié)議做的事情很簡(jiǎn)單先進(jìn)行本地暴露,獲取響應(yīng)的注冊(cè)中心雏掠,想注冊(cè)中心注冊(cè)要暴露的服務(wù)斩祭,設(shè)置訂閱,注冊(cè)中心信息發(fā)生改變后會(huì)通知服務(wù)乡话,進(jìn)行數(shù)據(jù)的更新摧玫。說(shuō)起來(lái)很容易,但是其實(shí)里面封裝了大量的內(nèi)容绑青。首先我們來(lái)看下本地暴露的方法doLocalExport
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return (ExporterChangeableWrapper<T>) exporter;
}
延續(xù)了以往的Dubbo風(fēng)格诬像,使用了大量的本地緩存屋群。根據(jù)傳入的原始的invoker,獲取到cacheKey,從本地緩存bounds中獲取,獲取不到坏挠,就開(kāi)始了創(chuàng)建的過(guò)程芍躏。首先創(chuàng)建了一個(gè)包賺對(duì)象invokerDelegete,包含了原始的invoker和providerUrl,我們可以簡(jiǎn)單看下這個(gè)providerUrl.getProviderUrl(originInvoker)
結(jié)果是
dubbo://192.168.1.102:20880/com.linyang.test.service.LogService?anyhost=true&application=log&default.proxy=javassist&default.retries=0&default.timeout=30000&default.version=LATEST&dubbo=2.5.3&interface=com.linyang.test.service.LogService&methods=modify,create&pid=19026&side=provider&threads=100×tamp=1525594853055
看一下這個(gè)provider的鏈接提供的dubbo協(xié)議癞揉,所以纸肉,當(dāng)我們本地暴露的時(shí)候(Exporter<T>)protocol.export(invokerDelegete)
會(huì)發(fā)生什么?會(huì)真正調(diào)用的是DubboProtocol喊熟,當(dāng)然我們不能忘了ProtocolFilterWrapper和ProtocolListenerWrapper柏肪。OK,那我們?cè)俅慰纯此麄兊降鬃隽耸裁唇媾疲葋?lái)看看ProtocolFilterWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
這個(gè)filter處理的時(shí)候建立了一條調(diào)用鏈InvokerChain
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
從SPI中獲取激活的Filter類(lèi)的實(shí)例烦味,在@activite的那一節(jié)也講過(guò),他的主要用法是在Filter上壁拉,其實(shí)就是說(shuō)的這里谬俄,然后將他們變成鏈?zhǔn)浇Y(jié)構(gòu),保證他們?cè)僬{(diào)用的時(shí)候弃理,一個(gè)接著一個(gè)溃论,當(dāng)然這是常用的filter的使用模式《徊看完了ProtocolFilterWrapper钥勋,我們?cè)倏聪滤南乱粋€(gè)ProtocolListenerWrapper。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
他做的就是講Exporter包裝成ListenerExporterWrapper的實(shí)例辆苔,他是原來(lái)的exporter和從spi擴(kuò)展點(diǎn)中獲取的ExporterListener的實(shí)例組成算灸。下面到了我們的有一個(gè)重頭戲:DubboProtocal的export的地方
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispaching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice){
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
if (logger.isWarnEnabled()){
logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
return exporter;
}
每次創(chuàng)建的時(shí)候都創(chuàng)建一個(gè)新的DubboExporter,并將其返回上層使用驻啤。到目前為止菲驴,我們還沒(méi)有提到本地起服務(wù),因?yàn)槟阆胱屍渌秸{(diào)用到自己骑冗,肯定是要開(kāi)一個(gè)socket赊瞬,但是目前我們還沒(méi)有看到,但是不用急沐旨,他的剩下的內(nèi)容都包含在openServer(url)
根據(jù)providerUrl進(jìn)行開(kāi)啟一個(gè)server
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一個(gè)只有server可以調(diào)用的服務(wù)森逮。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}
直擊重點(diǎn)createServer(url)
private ExchangeServer createServer(URL url) {
//默認(rèn)開(kāi)啟server關(guān)閉時(shí)發(fā)送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默認(rèn)開(kāi)啟heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
我們看到了Exchangers.bind(url, requestHandler)
Dubbo的Exchanger層,他具體的處理邏輯
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
Exchanger的具體實(shí)現(xiàn)類(lèi)是HeaderExchanger磁携,所以調(diào)用他的bind方法
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
出現(xiàn)了Dubbo的Transporter層,他的bind方法和exchange差不多良风,找具體的Transporter進(jìn)行bind
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
最后發(fā)現(xiàn)使用的是NettyTransporter谊迄,Netty大家滅有用過(guò)也應(yīng)該聽(tīng)說(shuō)過(guò)闷供,他是現(xiàn)在最為流行的Nio網(wǎng)絡(luò)框架。我們繼續(xù)
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
剩下的就是Netty的處理了统诺,起一個(gè)Server歪脏,開(kāi)啟端口等著調(diào)用者連進(jìn)來(lái)。OK這是DubboProtocol的export的過(guò)程粮呢。我們回到上面RegisterProtocol的部分
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
經(jīng)過(guò)DubboProtocol暴露后得到一個(gè)具體的Exporter,將這個(gè)exporter和原始invoker封裝到了ExporterChangeableWrapper里面婿失,進(jìn)行返回。好了這就是export的全過(guò)程啄寡。
測(cè)試源碼
預(yù)告豪硅,看這里
下一篇: Dubbo 服務(wù)引用詳解
END