上一節(jié)講了數(shù)據(jù)持久化后拂檩,發(fā)送事件后,Spring監(jiān)聽到事件后死遭,做了什么事,并看到現(xiàn)有四種數(shù)據(jù)同步機制凯旋。這節(jié)具體加一下http長輪訓
org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener http長輪訓數(shù)據(jù)監(jiān)聽器
先看下構造器:在構造器中呀潭,構造了一個1024長度的阻塞隊列钉迷,以及一個ScheduledThreadPoolExecutor,并初始化HttpSyncProperties钠署,
/**
* Blocked client.
*/
private final BlockingQueue<LongPollingClient> clients;
private final ScheduledExecutorService scheduler;
private final HttpSyncProperties httpSyncProperties;
/**
* Instantiates a new Http long polling data changed listener.
* @param httpSyncProperties the HttpSyncProperties
*/
public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
this.clients = new ArrayBlockingQueue<>(1024);
this.scheduler = new ScheduledThreadPoolExecutor(1,
SoulThreadFactory.create("long-polling", true));
this.httpSyncProperties = httpSyncProperties;
}
HttpSyncProperties主要是http同步的配置
@Getter
@Setter
@ConfigurationProperties(prefix = "soul.sync.http")
public class HttpSyncProperties {
/**
* Whether enabled http sync strategy, default: true.
*/
private boolean enabled = true;
/**
* Periodically refresh the config data interval from the database, default: 5 minutes.
*/
private Duration refreshInterval = Duration.ofMinutes(5);
}
主要定義了http同步開關以及刷新周期糠聪。 類初始化之后,更新各種數(shù)據(jù)緩存,然后執(zhí)行了一個定時任務谐鼎,每次調用refreshLocalCache刷新本地緩存
@Override
public final void afterPropertiesSet() {
updateAppAuthCache();
updatePluginCache();
updateRuleCache();
updateSelectorCache();
updateMetaDataCache();
afterInitialize();
}
@Override
protected void afterInitialize() {
long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
// Periodically check the data for changes and update the cache
scheduler.scheduleWithFixedDelay(() -> {
log.info("http sync strategy refresh config start.");
try {
this.refreshLocalCache();
log.info("http sync strategy refresh config success.");
} catch (Exception e) {
log.error("http sync strategy refresh config error!", e);
}
}, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
log.info("http sync strategy refresh interval: {}ms", syncInterval);
}
private void refreshLocalCache() {
this.updateAppAuthCache();
this.updatePluginCache();
this.updateRuleCache();
this.updateSelectorCache();
this.updateMetaDataCache();
}
這些方法要做的事情都很類似舰蟆,就是從數(shù)據(jù)庫拿到對應ConfigGroup的所有配置,并更新本地緩存狸棍,比如看updateAppAuthCache身害,就是將當前數(shù)據(jù)庫的配置更新到本地緩存,至于具體為什么要更新到本地緩存草戈,我們后面分曉塌鸯。
//org.dromara.soul.admin.listener.AbstractDataChangedListener
protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
/**
* Update app auth cache.
*/
protected void updateAppAuthCache() {
this.updateCache(ConfigGroupEnum.APP_AUTH, appAuthService.listAll());
}
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
String json = GsonUtils.getInstance().toJson(data);
ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}
之前我們看到,當數(shù)據(jù)事件變化監(jiān)聽器分發(fā)者唐片,監(jiān)聽到事件后丙猬,會調用各個監(jiān)聽器的對應方法:
//org.dromara.soul.admin.listener.DataChangedEventDispatcher
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
那么在長輪訓機制下,主要做了如下事情费韭,還拿AppAuth看下
@Override
public void onAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
return;
}
this.updateAppAuthCache();
this.afterAppAuthChanged(changed, eventType);
}
接收到變更數(shù)據(jù)后茧球,會先更新下對應的內存緩存,然后再做數(shù)據(jù)變更星持。
//org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#afterAppAuthChanged
@Override
protected void afterAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
scheduler.execute(new DataChangeTask(ConfigGroupEnum.APP_AUTH));
}
這里看到抢埋,通過線程池執(zhí)行一個數(shù)據(jù)變化任務
/**
* When a group's data changes, the thread is created to notify the client asynchronously.
*/
class DataChangeTask implements Runnable {
/**
* The Group where the data has changed.
*/
private final ConfigGroupEnum groupKey;
/**
* The Change time.
*/
private final long changeTime = System.currentTimeMillis();
/**
* Instantiates a new Data change task.
*
* @param groupKey the group key
*/
DataChangeTask(final ConfigGroupEnum groupKey) {
this.groupKey = groupKey;
}
@Override
public void run() {
//循環(huán)所有的LongPollingClient,并調用了sendResponse
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
LongPollingClient client = iter.next();
iter.remove();
client.sendResponse(Collections.singletonList(groupKey));
log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
}
}
}
我們先來看下LongPollingClient是個什么東東钉汗,它主要有以下幾個屬性,一個異步的上下文羹令,ip,超時時間和異步結果Future损痰。LongPollingClient本身實現(xiàn)了一個Runnable接口
class LongPollingClient implements Runnable {
/**
* The Async context.
*/
private final AsyncContext asyncContext;
/**
* The Ip.
*/
private final String ip;
/**
* The Timeout time.
*/
private final long timeoutTime;
/**
* The Async timeout future.
*/
private Future<?> asyncTimeoutFuture;
我們再來看下run方法:這方法較難看懂福侈。
@Override
public void run() {
//通過org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener的ScheduledExecutorService scheduler延遲執(zhí)行一個一次性的動作,延遲時間是timeoutTime毫秒卢未,當延遲動作開始執(zhí)行時肪凛,將當前的LongPollingClient對象從clients中移除
this.asyncTimeoutFuture = scheduler.schedule(() -> {
clients.remove(LongPollingClient.this);
//1.1
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
//1.2
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);
//將當前對象加入到clients中
clients.add(this);
}
這里LongPollingClient.this之前沒有見到過,主要是當我們在一個類的內部類中辽社,如果需要訪問外部類的方法或者成員域的時候伟墙,如果直接使用 this.成員域(與 內部類.this.成員域 沒有分別) 調用的顯然是內部類的域 , 如果我們想要訪問外部類的域的時候滴铅,就要必須使用 外部類.this.成員域
package com.test;
public class TestA
{
public void tn()
{
System.out.println("外部類tn");
}
Thread thread = new Thread(){
public void tn(){System.out.println("inner tn");}
public void run(){
System.out.println("內部類run");
TestA.this.tn();//調用外部類的tn方法戳葵。
this.tn();//調用內部類的tn方法
}
};
public static void main(String aaa[])
{new TestA().thread.start();}
}
1.1 compareChangedGroup具體做了什么,先不要關注HttpServletRequest是從哪來的汉匙,這里也看出了我們本地Cache的作用是什么
private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
List<ConfigGroupEnum> changedGroup = new ArrayList<>(4);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
// 針對每一個group獲取的對應的參數(shù)
String[] params = StringUtils.split(request.getParameter(group.name()), ',');
if (params == null || params.length != 2) {
throw new SoulException("group param invalid:" + request.getParameter(group.name()));
}
//參數(shù)第一位時client端的Md5值拱烁, 第二位時client端的修改時間戳
String clientMd5 = params[0];
long clientModifyTime = NumberUtils.toLong(params[1]);
//獲取本地緩存的配置
ConfigDataCache serverCache = CACHE.get(group.name());
// 檢查是否需要更新服務器的緩存配置
//1.1.1
if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
changedGroup.add(group);
}
}
return changedGroup;
}
1.1.1 checkCacheDelayAndUpdate
private boolean checkCacheDelayAndUpdate(final ConfigDataCache serverCache, final String clientMd5, final long clientModifyTime) {
// 如果md5相等生蚁,說明配置相同,不需要更新
if (StringUtils.equals(clientMd5, serverCache.getMd5())) {
return false;
}
// 如果md5值不等戏自,說明服務器的配置和客戶端的緩存不一致
long lastModifyTime = serverCache.getLastModifyTime();
//在比對下服務器配置是否比客戶端的更新
if (lastModifyTime >= clientModifyTime) {
// 如果更新邦投,說明客戶端的配置是舊的,需要更新
return true;
}
// 如果服務端的緩存配置擅笔,比客戶端的配置還要老志衣,那么說明,服務端的緩存配置需要更新了
// 這里soul考慮到并發(fā)問題猛们,如果多個client都來soul拉取最新配置念脯,而當前的soul-admin配置因為都會走到這里,那么如果我們不加鎖的話阅懦,會導致和二,同時走到后面的refreshLocalCache,而refreshLocalCache我們前面看到是需要查詢數(shù)據(jù)庫并更新到本地緩存的耳胎,那么會導致大量的sql查詢給數(shù)據(jù)庫帶來壓力惯吕,所以這里加了一個鎖,并設置了超時時間
boolean locked = false;
try {
locked = LOCK.tryLock(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return true;
}
if (locked) {
try {
//這里在拿到鎖以后怕午,先去本地緩存再拿一遍最新的緩存配置废登,與剛才獲取到的配置做下對比,如果發(fā)現(xiàn)不相等郁惜,說明之前獲取到鎖之前已經(jīng)有數(shù)據(jù)更新到緩存堡距,
ConfigDataCache latest = CACHE.get(serverCache.getGroup());
if (latest != serverCache) {
// 在判斷當前的最新配置和客戶端配置的Md5是否一致.
return !StringUtils.equals(clientMd5, latest.getMd5());
}
// 更新緩存數(shù)據(jù)
this.refreshLocalCache();
//拿到最新的配置
latest = CACHE.get(serverCache.getGroup());
//比對
return !StringUtils.equals(clientMd5, latest.getMd5());
} finally {
LOCK.unlock();
}
}
// 沒有獲取到鎖,默認當成需要更新處理
return true;
}
上面的代碼兆蕉,看出了soul設計的代碼的精妙之處
接著上面代碼羽戒,1.1之后,會調用sendResponse(changedGroups);
void sendResponse(final List<ConfigGroupEnum> changedGroups) {
// 這里邏輯場景就是上面我們剛開始跟過來的DataChangeTask執(zhí)行的run里面虎韵,對所有client的主動觸發(fā)的場景易稠,這里是想取消掉client的run執(zhí)行時候的延遲動作,防止重復運行包蓝,具體原因還需要在往后看
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
//生成response驶社,aysncContext完成
generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
asyncContext.complete();
}
通過上面的源碼分析。我們現(xiàn)在主要有幾個疑惑點:
- AsyncContext到底是干嘛的测萎?
- 為什么是直接生成的Response返回亡电?
- Client是什么時候添加到org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#clients里面的
帶著這幾個問題,我們在看下一篇文章