Looking at the constructor of LongPollingClient, we can see that the class is instantiated in only one place:
//org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#doLongPolling
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
// check whether any group configuration has changed relative to what this request reports
List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
String clientIp = getRemoteIp(request);
// if any group has changed, return the result immediately
if (CollectionUtils.isNotEmpty(changedGroup)) {
this.generateResponse(response, changedGroup);
log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
return;
}
// otherwise switch to asynchronous processing
final AsyncContext asyncContext = request.startAsync();
// AsyncContext.settimeout() does not timeout properly, so you have to control it yourself
// the container timeout is disabled here (0 means no timeout), so the hold time has to be controlled by our own code
asyncContext.setTimeout(0L);
// create the LongPollingClient (its constructor was covered in the previous article) and hand it to the scheduler
scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
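Here we can see that the method accepts a request and decides whether the changed groups need to be returned immediately. If no configuration change is detected right away, it falls back to asynchronous processing.
Next, let's see where this method is called. It turns out to be ConfigController: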
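The respond-now-or-hold decision can be sketched without the servlet API. The following is a minimal illustration, not the Soul implementation: the class name LongPollSketch, the poll method, and the choice to answer a held request with an empty list once the hold time elapses are all assumptions made for this example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the admin-side decision; not the Soul classes.
class LongPollSketch {
    // Daemon thread so that a pending hold never keeps the JVM alive.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "long-poll-sketch");
                thread.setDaemon(true);
                return thread;
            });

    // Answers at once when something changed; otherwise holds the request and
    // completes it with an empty list after holdMillis (an assumed policy).
    static CompletableFuture<List<String>> poll(final List<String> changedGroups, final long holdMillis) {
        if (!changedGroups.isEmpty()) {
            return CompletableFuture.completedFuture(changedGroups);
        }
        CompletableFuture<List<String>> held = new CompletableFuture<>();
        SCHEDULER.schedule(() -> held.complete(Collections.<String>emptyList()),
                holdMillis, TimeUnit.MILLISECONDS);
        return held;
    }

    public static void main(String[] args) {
        // Changed groups are returned immediately.
        System.out.println(poll(Arrays.asList("PLUGIN"), 1000L).join());
        // No change: the caller is held for about 50 ms, then gets an empty list.
        System.out.println(poll(Collections.<String>emptyList(), 50L).join());
    }
}
```

In the real listener the held request is represented by the AsyncContext, and the scheduler decides when to complete it.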
public class ConfigController {
@Resource
private HttpLongPollingDataChangedListener longPollingListener;
/**
* Fetch configs soul result.
*
* @param groupKeys the group keys
* @return the soul result
*/
@GetMapping("/fetch")
public SoulAdminResult fetchConfigs(@NotNull final String[] groupKeys) {
Map<String, ConfigData<?>> result = Maps.newHashMap();
for (String groupKey : groupKeys) {
ConfigData<?> data = longPollingListener.fetchConfig(ConfigGroupEnum.valueOf(groupKey));
result.put(groupKey, data);
}
return SoulAdminResult.success(SoulResultMessage.SUCCESS, result);
}
/**
* Listener.
*
* @param request the request
* @param response the response
*/
@PostMapping(value = "/listener")
public void listener(final HttpServletRequest request, final HttpServletResponse response) {
longPollingListener.doLongPolling(request, response);
}
}
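Here we can see that two mechanisms are provided. The first is an active pull, fetch.
The fetch implementation is straightforward: it pulls the configuration of the requested groups.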
public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
ConfigDataCache config = CACHE.get(groupKey.name());
switch (groupKey) {
case APP_AUTH:
List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList);
case PLUGIN:
List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList);
case RULE:
List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);
case SELECTOR:
List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
case META_DATA:
List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
}.getType());
return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList);
default:
throw new IllegalStateException("Unexpected groupKey: " + groupKey);
}
}
The second is listening, which is the caller of org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#doLongPolling.
Let's look at where each of the two is called.
Start with fetch. A global search for the controller's HTTP path leads to HttpSyncDataService in the soul-sync-data-http package. This class is registered in the Spring container by HttpSyncDataConfiguration, which relies on a Spring feature, ObjectProvider.
//org.dromara.soul.spring.boot.starter.sync.data.http.HttpSyncDataConfiguration
@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.http", name = "url")
@Slf4j
public class HttpSyncDataConfiguration {
@Bean
public SyncDataService httpSyncDataService(final ObjectProvider<HttpConfig> httpConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use http long pull sync soul data");
return new HttpSyncDataService(Objects.requireNonNull(httpConfig.getIfAvailable()), Objects.requireNonNull(pluginSubscriber.getIfAvailable()),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
/**
* Http config http config.
*
* @return the http config
*/
@Bean
@ConfigurationProperties(prefix = "soul.sync.http")
public HttpConfig httpConfig() {
return new HttpConfig();
}
}
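HttpSyncDataConfiguration registers HttpConfig and SyncDataService, but both beans are only created when the condition @ConditionalOnProperty(prefix = "soul.sync.http", name = "url") holds, that is, when the soul.sync.http.url property is configured.
Many of Soul's configuration classes are conditional: the related beans are registered only when the condition is met, which greatly reduces the number of unused beans Spring has to manage.
To enable HTTP long-polling data sync on the gateway, just configure soul.sync.http and set url to the admin address, for example http://localhost:9095.
Now back to HttpSyncDataService.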
//org.dromara.soul.sync.data.http.HttpSyncDataService
@Slf4j
public class HttpSyncDataService implements SyncDataService, AutoCloseable {
private static final AtomicBoolean RUNNING = new AtomicBoolean(false);
private static final Gson GSON = new Gson();
/**
* default: 10s.
*/
private Duration connectionTimeout = Duration.ofSeconds(10);
/**
* only use for http long polling.
*/
private RestTemplate httpClient;
private ExecutorService executor;
private HttpConfig httpConfig;
// list of soul-admin HTTP servers
private List<String> serverList;
// data refresh factory
private DataRefreshFactory factory;
public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
this.httpConfig = httpConfig;
this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
this.httpClient = createRestTemplate();
this.start();
}
private RestTemplate createRestTemplate() {
OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
return new RestTemplate(factory);
}
}
Once the constructor has initialized the fields, it calls start:
//org.dromara.soul.sync.data.http.HttpSyncDataService#start
private void start() {
// make sure initialization happens only once
if (RUNNING.compareAndSet(false, true)) {
// 1.1 fetch all configurations
this.fetchGroupConfig(ConfigGroupEnum.values());
int threadSize = serverList.size();
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
SoulThreadFactory.create("http-long-polling", true));
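// 1.2 submit one HTTP long-polling task per admin server to the thread pool (the 60s above is the pool's keep-alive time, not a schedule interval)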
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
log.info("soul http long polling was started, executor=[{}]", executor);
}
}
1.1 First, let's see how fetchGroupConfig is implemented
//org.dromara.soul.sync.data.http.HttpSyncDataService#fetchGroupConfig
// fetch from the servers one by one; the first successful fetch ends the loop, and if every server fails the exception is thrown
private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
for (int index = 0; index < this.serverList.size(); index++) {
String server = serverList.get(index);
try {
//1.1.1
this.doFetchGroupConfig(server, groups);
break;
} catch (SoulException e) {
// no available server, throw exception.
if (index >= serverList.size() - 1) {
throw e;
}
log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
}
}
}
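The failover loop above can be reduced to a small generic helper. This is a sketch under assumptions: FailoverSketch, firstSuccessful, and the server addresses are made-up names for illustration, and RuntimeException stands in for SoulException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper illustrating "try each server until one succeeds".
class FailoverSketch {
    // Tries each server in order and returns the first successful result.
    // If every server fails, the last failure is rethrown.
    static <R> R firstSuccessful(final List<String> servers, final Function<String, R> call) {
        RuntimeException last = new IllegalStateException("no servers configured");
        for (String server : servers) {
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                // remember the failure and move on to the next server
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        // Made-up addresses: the first server fails, the second one answers.
        List<String> servers = Arrays.asList("http://admin-1:9095", "http://admin-2:9095");
        String result = firstSuccessful(servers, server -> {
            if (server.contains("admin-1")) {
                throw new IllegalStateException("admin-1 is down");
            }
            return "configs from " + server;
        });
        System.out.println(result);
    }
}
```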
1.1.1 Here we finally find the place where /configs/fetch is called
//org.dromara.soul.sync.data.http.HttpSyncDataService#doFetchGroupConfig
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
StringBuilder params = new StringBuilder();
for (ConfigGroupEnum groupKey : groups) {
params.append("groupKeys").append("=").append(groupKey.name()).append("&");
}
String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
log.info("request configs: [{}]", url);
String json = null;
try {
// call the admin through httpClient and get the JSON response
json = this.httpClient.getForObject(url, String.class);
} catch (RestClientException e) {
String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
log.warn(message);
throw new SoulException(message, e);
}
// 1.1.1.1 update the local cache
boolean updated = this.updateCacheWithJson(json);
if (updated) {
log.info("get latest configs: [{}]", json);
return;
}
// not updated. it is likely that the current config server has not been updated yet. wait a moment.
log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}
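For reference, the request URL built at the top of doFetchGroupConfig repeats the groupKeys parameter once per group. A minimal sketch of the same construction with streams, where FetchUrlSketch and its local Group enum are stand-ins invented for this example rather than Soul types:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical illustration of the query string sent to /configs/fetch.
class FetchUrlSketch {
    // Local stand-in for ConfigGroupEnum.
    enum Group { APP_AUTH, PLUGIN, RULE, SELECTOR, META_DATA }

    // Joins one "groupKeys=<name>" pair per group with '&'.
    static String fetchUrl(final String server, final Group... groups) {
        String query = Arrays.stream(groups)
                .map(group -> "groupKeys=" + group.name())
                .collect(Collectors.joining("&"));
        return server + "/configs/fetch?" + query;
    }

    public static void main(String[] args) {
        // prints http://localhost:9095/configs/fetch?groupKeys=PLUGIN&groupKeys=RULE
        System.out.println(fetchUrl("http://localhost:9095", Group.PLUGIN, Group.RULE));
    }
}
```

On the admin side, Spring binds the repeated groupKeys parameters to the String[] argument of fetchConfigs.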
1.1.1.1 The update logic is executed through DataRefreshFactory factory
//org.dromara.soul.sync.data.http.HttpSyncDataService#updateCacheWithJson
private boolean updateCacheWithJson(final String json) {
JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
JsonObject data = jsonObject.getAsJsonObject("data");
// if the config cache will be updated?
return factory.executor(data);
}
DataRefreshFactory is the data refresh factory created earlier in the HttpSyncDataService constructor. It encapsulates the update strategy for each kind of data; this is where the factory method design pattern is used.
//org.dromara.soul.sync.data.http.refresh.DataRefreshFactory
public final class DataRefreshFactory {
private static final EnumMap<ConfigGroupEnum, DataRefresh> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
/**
* The factory holds a ConfigGroupEnum -> DataRefresh mapping, so each ConfigGroup is handled by its own data refresh implementation
*/
public DataRefreshFactory(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataRefresh(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataRefresh(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataRefresh(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AppAuthDataRefresh(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataRefresh(metaDataSubscribers));
}
/**
* Refresh every kind of data
*/
public boolean executor(final JsonObject data) {
final boolean[] success = {false};
ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
return success[0];
}
}
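One detail of executor is worth noting: every refresher writes its own result into success[0] from a parallel stream, so the returned value is whichever write happened last rather than "did anything update". The sketch below shows one possible way to aggregate the results instead. It is an illustration only, not the project's code; RefreshAggregationSketch and anyUpdated are hypothetical names, and Supplier<Boolean> stands in for DataRefresh.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical alternative aggregation for a set of refreshers.
class RefreshAggregationSketch {
    // Runs every refresher (no short-circuiting, so all of them execute)
    // and reports whether at least one of them changed something.
    static boolean anyUpdated(final List<Supplier<Boolean>> refreshers) {
        List<Boolean> results = refreshers.parallelStream()
                .map(Supplier::get)
                .collect(Collectors.toList());
        return results.contains(Boolean.TRUE);
    }

    public static void main(String[] args) {
        List<Supplier<Boolean>> refreshers =
                Arrays.<Supplier<Boolean>>asList(() -> true, () -> false, () -> false);
        // prints true regardless of the order in which the refreshers run
        System.out.println(anyUpdated(refreshers));
    }
}
```

Collecting first and checking afterwards keeps the parallel execution while avoiding shared mutable state.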
DataRefresh is an interface that defines the data refresh behavior
//org.dromara.soul.sync.data.http.refresh.DataRefresh
public interface DataRefresh {
/**
* Refresh the data and return whether the refresh took effect
*/
Boolean refresh(JsonObject data);
/**
* Get the configuration held in the cache
*/
ConfigData<?> cacheConfigData();
}
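DataRefresh has one abstract implementation, plus a concrete implementation for each kind of data.
AbstractDataRefresh defines the common template and declares a generic type parameter. Since what we pass in is a JsonObject, the generic parameter lets each subclass convert it into its own data class. This technique is worth learning. Take refresh as an example: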
// org.dromara.soul.sync.data.http.refresh.AbstractDataRefresh
/**
*/
protected abstract void refresh(List<T> data);
/**
The conversion from JsonObject to the matching ConfigData<T> is abstracted here and left to each subclass to implement
*/
@Override
public Boolean refresh(final JsonObject data) {
boolean updated = false;
// convert the JsonObject
JsonObject jsonObject = convert(data);
if (null != jsonObject) {
// convert the JsonObject into ConfigData
ConfigData<T> result = fromJson(jsonObject);
// check whether the cache needs to be updated
if (this.updateCacheIfNeed(result)) {
updated = true;
// refresh with the new data
refresh(result.getData());
}
}
return updated;
}
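The combination of a template method and a generic type parameter can be shown in isolation. The sketch below is a simplified illustration, not the Soul classes: TypedRefreshSketch, PortListRefresh, parse and apply are hypothetical names, a plain String replaces the JsonObject, and an equality check replaces the md5 comparison.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical template: the base class fixes the flow, subclasses supply the type.
abstract class TypedRefreshSketch<T> {
    private T cached;

    // Subclass hook: turn the raw payload into the typed value.
    protected abstract T parse(String raw);

    // Subclass hook: react to a typed value that actually changed.
    protected abstract void apply(T data);

    // Template method: parse, compare with the cache, apply only on change.
    final boolean refresh(final String raw) {
        if (raw == null) {
            return false;
        }
        T parsed = parse(raw);
        if (Objects.equals(cached, parsed)) {
            return false;
        }
        cached = parsed;
        apply(parsed);
        return true;
    }

    // Example subclass: a comma-separated list of ports.
    static final class PortListRefresh extends TypedRefreshSketch<List<Integer>> {
        final List<Integer> applied = new ArrayList<>();

        @Override
        protected List<Integer> parse(final String raw) {
            List<Integer> ports = new ArrayList<>();
            for (String part : raw.split(",")) {
                ports.add(Integer.parseInt(part.trim()));
            }
            return ports;
        }

        @Override
        protected void apply(final List<Integer> data) {
            applied.clear();
            applied.addAll(data);
        }
    }

    public static void main(String[] args) {
        PortListRefresh refresh = new PortListRefresh();
        System.out.println(refresh.refresh("8080, 9095")); // first payload: applied
        System.out.println(refresh.refresh("8080,9095"));  // same content: skipped
        System.out.println(refresh.applied);
    }
}
```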
Let's pick PluginDataRefresh and look at the concrete logic
//org.dromara.soul.sync.data.http.refresh.PluginDataRefresh
// delegates to the parent's updateCacheIfNeed to decide whether an update is needed
@Override
protected boolean updateCacheIfNeed(final ConfigData<PluginData> result) {
return updateCacheIfNeed(result, ConfigGroupEnum.PLUGIN);
}
//org.dromara.soul.sync.data.http.refresh.AbstractDataRefresh#updateCacheIfNeed(org.dromara.soul.common.dto.ConfigData<T>, org.dromara.soul.common.enums.ConfigGroupEnum)
protected boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
// first initialization
if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
return true;
}
ResultHolder holder = new ResultHolder(false);
GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
// compare md5 and last-modified time; if the incoming data is newer, store it in GROUP_CACHE, otherwise keep the old value
if (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5()) && oldVal.getLastModifyTime() < newVal.getLastModifyTime()) {
log.info("update {} config: {}", groupEnum, newVal);
holder.result = true;
return newVal;
}
log.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
return oldVal;
});
// return the update result; the holder exists only to carry the update flag out of the merge lambda
return holder.result;
}
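The putIfAbsent-then-merge idiom can be tried out on its own. In the sketch below, VersionedCacheSketch, Entry and updateIfNewer are hypothetical names chosen for illustration, and an AtomicBoolean plays the role of ResultHolder.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical cache that only accepts entries that differ and are newer.
class VersionedCacheSketch {
    static final class Entry {
        final String md5;
        final long lastModifyTime;

        Entry(final String md5, final long lastModifyTime) {
            this.md5 = md5;
            this.lastModifyTime = lastModifyTime;
        }
    }

    private final ConcurrentMap<String, Entry> cache = new ConcurrentHashMap<>();

    boolean updateIfNewer(final String group, final Entry candidate) {
        // First value for this group: store it and report an update.
        if (cache.putIfAbsent(group, candidate) == null) {
            return true;
        }
        // The flag carries the decision out of the merge lambda.
        AtomicBoolean updated = new AtomicBoolean(false);
        cache.merge(group, candidate, (oldVal, newVal) -> {
            if (!oldVal.md5.equals(newVal.md5) && oldVal.lastModifyTime < newVal.lastModifyTime) {
                updated.set(true);
                return newVal;
            }
            return oldVal;
        });
        return updated.get();
    }

    public static void main(String[] args) {
        VersionedCacheSketch cache = new VersionedCacheSketch();
        System.out.println(cache.updateIfNewer("PLUGIN", new Entry("a", 1L))); // true: first value
        System.out.println(cache.updateIfNewer("PLUGIN", new Entry("a", 2L))); // false: same md5
        System.out.println(cache.updateIfNewer("PLUGIN", new Entry("b", 3L))); // true: changed and newer
        System.out.println(cache.updateIfNewer("PLUGIN", new Entry("c", 2L))); // false: older timestamp
    }
}
```

Because both conditions must hold, a payload with a different md5 but an older timestamp is rejected, which protects the gateway from an admin node that lags behind.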
If an update is needed and the local cache has been updated, refresh is executed
//org.dromara.soul.sync.data.http.refresh.PluginDataRefresh#refresh
@Override
protected void refresh(final List<PluginData> data) {
// empty data means the cache should be cleared, so call refreshPluginDataAll
if (CollectionUtils.isEmpty(data)) {
log.info("clear all plugin data cache");
pluginDataSubscriber.refreshPluginDataAll();
} else {
// open question: why is refreshPluginDataAll called once more here?
pluginDataSubscriber.refreshPluginDataAll();
// hand every item to pluginDataSubscriber
data.forEach(pluginDataSubscriber::onSubscribe);
}
}
Now look at PluginDataSubscriber. It has many default method implementations and only one implementing class, CommonPluginDataSubscriber. We focus on the refreshPluginDataAll and onSubscribe methods of CommonPluginDataSubscriber.
//org.dromara.soul.plugin.base.cache.CommonPluginDataSubscriber
// this delegates to BaseDataCache to clear the cached plugin data
@Override
public void refreshPluginDataAll() {
BaseDataCache.getInstance().cleanPluginData();
}
@Override
public void onSubscribe(final PluginData pluginData) {
subscribeDataHandler(pluginData, DataEventTypeEnum.UPDATE);
}
/**
Handles the data differently depending on the event type and the data type
*/
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
if (data instanceof PluginData) {
PluginData pluginData = (PluginData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
// cache the data
BaseDataCache.getInstance().cachePluginData(pluginData);
// let the handler registered for this plugin process the pluginData
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removePluginData(pluginData);
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
}
} else if (data instanceof SelectorData) {
SelectorData selectorData = (SelectorData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cacheSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removeSelectData(selectorData);
Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
}
} else if (data instanceof RuleData) {
RuleData ruleData = (RuleData) data;
if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cacheRuleData(ruleData);
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removeRuleData(ruleData);
Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
}
}
});
}
Next, let's see what BaseDataCache is. It uses the singleton pattern, holds the different caches, and wraps the operations on them.
//org.dromara.soul.plugin.base.cache.BaseDataCache
/**
* pluginName -> PluginData.
*/
private static final ConcurrentMap<String, PluginData> PLUGIN_MAP = Maps.newConcurrentMap();
/**
* pluginName -> SelectorData.
*/
private static final ConcurrentMap<String, List<SelectorData>> SELECTOR_MAP = Maps.newConcurrentMap();
/**
* selectorId -> RuleData.
*/
private static final ConcurrentMap<String, List<RuleData>> RULE_MAP = Maps.newConcurrentMap();
/**
* Clean plugin data.
*/
public void cleanPluginData() {
PLUGIN_MAP.clear();
}
/**
* Cache plugin data.
*
* @param pluginData the plugin data
*/
public void cachePluginData(final PluginData pluginData) {
Optional.ofNullable(pluginData).ifPresent(data -> PLUGIN_MAP.put(data.getName(), data));
}
We just saw that after the cache is refreshed, a handler is invoked as well. handlerMap is a Map built in the constructor of CommonPluginDataSubscriber.
//org.dromara.soul.plugin.base.cache.CommonPluginDataSubscriber
private final Map<String, PluginDataHandler> handlerMap;
/**
* Instantiates a new Common plugin data subscriber.
*
* @param pluginDataHandlerList the plugin data handler list
*/
public CommonPluginDataSubscriber(final List<PluginDataHandler> pluginDataHandlerList) {
this.handlerMap = pluginDataHandlerList.stream().collect(Collectors.toConcurrentMap(PluginDataHandler::pluginNamed, e -> e));
}
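The registry built in this constructor, together with the Optional-based lookup seen earlier in subscribeDataHandler, can be sketched as follows. HandlerRegistrySketch, its Handler interface, and the named factory are hypothetical stand-ins, not Soul types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical registry: handlers are indexed by the plugin name they report.
class HandlerRegistrySketch {
    interface Handler {
        String pluginNamed();

        String handle(String payload);
    }

    private final Map<String, Handler> handlerMap;

    HandlerRegistrySketch(final List<Handler> handlers) {
        // toConcurrentMap throws IllegalStateException if two handlers share a name.
        this.handlerMap = handlers.stream()
                .collect(Collectors.toConcurrentMap(Handler::pluginNamed, Function.identity()));
    }

    // Dispatches to the handler for pluginName, or returns empty if none is registered.
    Optional<String> dispatch(final String pluginName, final String payload) {
        return Optional.ofNullable(handlerMap.get(pluginName))
                .map(handler -> handler.handle(payload));
    }

    // Small factory so the example does not need separate handler classes.
    static Handler named(final String name) {
        return new Handler() {
            @Override
            public String pluginNamed() {
                return name;
            }

            @Override
            public String handle(final String payload) {
                return name + " handled " + payload;
            }
        };
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry =
                new HandlerRegistrySketch(Arrays.asList(named("dubbo"), named("divide")));
        System.out.println(registry.dispatch("dubbo", "pluginData")); // handler found
        System.out.println(registry.dispatch("sofa", "pluginData"));  // no handler registered
    }
}
```

A plugin without a registered handler is simply skipped, which matches how the subscriber treats plugins that need no extra processing.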
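Next, let's see what PluginDataHandler does. PluginDataHandler is an interface, and every plugin has its own implementing class. We recognize some familiar names here: the Divide plugin, the Sofa plugin, the Dubbo plugin, and so on.
Since we are following the handling of PluginData, let's look at one implementation of handlerPlugin, ApacheDubboPluginDataHandler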
//org.dromara.soul.plugin.apache.dubbo.handler.ApacheDubboPluginDataHandler
public class ApacheDubboPluginDataHandler implements PluginDataHandler {
@Override
public void handlerPlugin(final PluginData pluginData) {
// check whether the plugin is enabled
if (null != pluginData && pluginData.getEnabled()) {
// parse the Dubbo registry configuration
DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);
// Singleton.INST is a singleton holder; it is simple enough to read on your own
DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);
// return immediately if there is no registry configuration
if (Objects.isNull(dubboRegisterConfig)) {
return;
}
// nothing is stored yet, or the stored config differs from the new one
if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {
// initialize with the new configuration and invalidate the current cache
ApplicationConfigCache.getInstance().init(dubboRegisterConfig);
ApplicationConfigCache.getInstance().invalidateAll();
}
Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);
}
}
@Override
public String pluginNamed() {
return PluginEnum.DUBBO.getName();
}
}
This article focuses on the data update part. The plugins themselves will be covered in detail in a separate plugin series.
At this point we have sorted out the data fetch flow.