九并级、soul源碼學習-http長輪訓數(shù)據(jù)同步機制詳解

上一節(jié)講了數(shù)據(jù)持久化后拂檩,發(fā)送事件后,Spring監(jiān)聽到事件后死遭,做了什么事,并看到現(xiàn)有四種數(shù)據(jù)同步機制凯旋。這節(jié)具體加一下http長輪訓

org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener http長輪訓數(shù)據(jù)監(jiān)聽器

先看下構造器:在構造器中呀潭,構造了一個1024長度的阻塞隊列钉迷,以及一個ScheduledThreadPoolExecutor,并初始化HttpSyncProperties钠署,

/**
     * Blocked client.
     */
    private final BlockingQueue<LongPollingClient> clients;

    private final ScheduledExecutorService scheduler;

    private final HttpSyncProperties httpSyncProperties;

    /**
     * Instantiates a new Http long polling data changed listener.
     * @param httpSyncProperties the HttpSyncProperties
     */
    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
        this.clients = new ArrayBlockingQueue<>(1024);
        this.scheduler = new ScheduledThreadPoolExecutor(1,
                SoulThreadFactory.create("long-polling", true));
        this.httpSyncProperties = httpSyncProperties;
    }

HttpSyncProperties主要是http同步的配置

@Getter
@Setter
@ConfigurationProperties(prefix = "soul.sync.http")
public class HttpSyncProperties {

    /**
     * Whether enabled http sync strategy, default: true.
     */
    private boolean enabled = true;

    /**
     * Periodically refresh the config data interval from the database, default: 5 minutes.
     */
    private Duration refreshInterval = Duration.ofMinutes(5);

}

主要定義了http同步開關以及刷新周期糠聪。 類初始化之后,更新各種數(shù)據(jù)緩存,然后執(zhí)行了一個定時任務谐鼎,每次調用refreshLocalCache刷新本地緩存

@Override
public final void afterPropertiesSet() {
  updateAppAuthCache();
  updatePluginCache();
  updateRuleCache();
  updateSelectorCache();
  updateMetaDataCache();
  afterInitialize();
}

@Override
protected void afterInitialize() {
  long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
  // Periodically check the data for changes and update the cache
  scheduler.scheduleWithFixedDelay(() -> {
    log.info("http sync strategy refresh config start.");
    try {
      this.refreshLocalCache();
      log.info("http sync strategy refresh config success.");
    } catch (Exception e) {
      log.error("http sync strategy refresh config error!", e);
    }
  }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
  log.info("http sync strategy refresh interval: {}ms", syncInterval);
}
private void refreshLocalCache() {
  this.updateAppAuthCache();
  this.updatePluginCache();
  this.updateRuleCache();
  this.updateSelectorCache();
  this.updateMetaDataCache();
}

這些方法要做的事情都很類似舰蟆,就是從數(shù)據(jù)庫拿到對應ConfigGroup的所有配置,并更新本地緩存狸棍,比如看updateAppAuthCache身害,就是將當前數(shù)據(jù)庫的配置更新到本地緩存,至于具體為什么要更新到本地緩存草戈,我們后面分曉塌鸯。

//org.dromara.soul.admin.listener.AbstractDataChangedListener
protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
/**
     * Update app auth cache.
     */
protected void updateAppAuthCache() {
  this.updateCache(ConfigGroupEnum.APP_AUTH, appAuthService.listAll());
}

protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
  String json = GsonUtils.getInstance().toJson(data);
  ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
  ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
  log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}

之前我們看到,當數(shù)據(jù)事件變化監(jiān)聽器分發(fā)者唐片,監(jiān)聽到事件后丙猬,會調用各個監(jiān)聽器的對應方法:

//org.dromara.soul.admin.listener.DataChangedEventDispatcher
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
  for (DataChangedListener listener : listeners) {
    switch (event.getGroupKey()) {
      case APP_AUTH:
        listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
        break;
      case PLUGIN:
        listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
        break;
      case RULE:
        listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
        break;
      case SELECTOR:
        listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
        break;
      case META_DATA:
        listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
        break;
      default:
        throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
    }
  }
}

那么在長輪訓機制下,主要做了如下事情费韭,還拿AppAuth看下

@Override
public void onAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
  if (CollectionUtils.isEmpty(changed)) {
    return;
  }
  this.updateAppAuthCache();
  this.afterAppAuthChanged(changed, eventType);
}

接收到變更數(shù)據(jù)后茧球,會先更新下對應的內存緩存,然后再做數(shù)據(jù)變更星持。

//org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#afterAppAuthChanged
@Override
protected void afterAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
  scheduler.execute(new DataChangeTask(ConfigGroupEnum.APP_AUTH));
}

這里看到抢埋,通過線程池執(zhí)行一個數(shù)據(jù)變化任務

        /**
     * When a group's data changes, the thread is created to notify the client asynchronously.
     */
    class DataChangeTask implements Runnable {

        /**
         * The Group where the data has changed.
         */
        private final ConfigGroupEnum groupKey;

        /**
         * The Change time.
         */
        private final long changeTime = System.currentTimeMillis();

        /**
         * Instantiates a new Data change task.
         *
         * @param groupKey the group key
         */
        DataChangeTask(final ConfigGroupEnum groupKey) {
            this.groupKey = groupKey;
        }

        @Override
        public void run() {
          //循環(huán)所有的LongPollingClient,并調用了sendResponse
            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
                LongPollingClient client = iter.next();
                iter.remove();
                client.sendResponse(Collections.singletonList(groupKey));
                log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
            }
        }
    }

我們先來看下LongPollingClient是個什么東東钉汗,它主要有以下幾個屬性,一個異步的上下文羹令,ip,超時時間和異步結果Future损痰。LongPollingClient本身實現(xiàn)了一個Runnable接口

class LongPollingClient implements Runnable {           
                /**
         * The Async context.
         */
        private final AsyncContext asyncContext;

        /**
         * The Ip.
         */
        private final String ip;

        /**
         * The Timeout time.
         */
        private final long timeoutTime;

        /**
         * The Async timeout future.
         */
        private Future<?> asyncTimeoutFuture;

我們再來看下run方法:這方法較難看懂福侈。

@Override
public void run() {
  //通過org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener的ScheduledExecutorService scheduler延遲執(zhí)行一個一次性的動作,延遲時間是timeoutTime毫秒卢未,當延遲動作開始執(zhí)行時肪凛,將當前的LongPollingClient對象從clients中移除
  this.asyncTimeoutFuture = scheduler.schedule(() -> {
    clients.remove(LongPollingClient.this);
    //1.1
    List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
    //1.2
    sendResponse(changedGroups);
  }, timeoutTime, TimeUnit.MILLISECONDS);
  //將當前對象加入到clients中
  clients.add(this);
}

這里LongPollingClient.this之前沒有見到過,主要是當我們在一個類的內部類中辽社,如果需要訪問外部類的方法或者成員域的時候伟墙,如果直接使用 this.成員域(與 內部類.this.成員域 沒有分別) 調用的顯然是內部類的域 , 如果我們想要訪問外部類的域的時候滴铅,就要必須使用 外部類.this.成員域

package com.test;
public class TestA 
{    
    public void tn()
    {          
        System.out.println("外部類tn");         
    }  
    Thread thread = new Thread(){     
          public void tn(){System.out.println("inner tn");}        
          public void run(){           
                 System.out.println("內部類run");        
                 TestA.this.tn();//調用外部類的tn方法戳葵。          
                 this.tn();//調用內部類的tn方法           
             }    
     };          
     public static void main(String aaa[])
     {new TestA().thread.start();}
}

1.1 compareChangedGroup具體做了什么,先不要關注HttpServletRequest是從哪來的汉匙,這里也看出了我們本地Cache的作用是什么

private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
  List<ConfigGroupEnum> changedGroup = new ArrayList<>(4);
  for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
    // 針對每一個group獲取的對應的參數(shù)
    String[] params = StringUtils.split(request.getParameter(group.name()), ',');
    if (params == null || params.length != 2) {
      throw new SoulException("group param invalid:" + request.getParameter(group.name()));
    }
    //參數(shù)第一位時client端的Md5值拱烁, 第二位時client端的修改時間戳
    String clientMd5 = params[0];
    long clientModifyTime = NumberUtils.toLong(params[1]);
    //獲取本地緩存的配置
    ConfigDataCache serverCache = CACHE.get(group.name());
    // 檢查是否需要更新服務器的緩存配置
    //1.1.1
    if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
      changedGroup.add(group);
    }
  }
  return changedGroup;
}

1.1.1 checkCacheDelayAndUpdate

private boolean checkCacheDelayAndUpdate(final ConfigDataCache serverCache, final String clientMd5, final long clientModifyTime) {

  // 如果md5相等生蚁,說明配置相同,不需要更新
  if (StringUtils.equals(clientMd5, serverCache.getMd5())) {
    return false;
  }

  // 如果md5值不等戏自,說明服務器的配置和客戶端的緩存不一致
  long lastModifyTime = serverCache.getLastModifyTime();
  //在比對下服務器配置是否比客戶端的更新
  if (lastModifyTime >= clientModifyTime) {
    // 如果更新邦投,說明客戶端的配置是舊的,需要更新
    return true;
  }

  // 如果服務端的緩存配置擅笔,比客戶端的配置還要老志衣,那么說明,服務端的緩存配置需要更新了
  // 這里soul考慮到并發(fā)問題猛们,如果多個client都來soul拉取最新配置念脯,而當前的soul-admin配置因為都會走到這里,那么如果我們不加鎖的話阅懦,會導致和二,同時走到后面的refreshLocalCache,而refreshLocalCache我們前面看到是需要查詢數(shù)據(jù)庫并更新到本地緩存的耳胎,那么會導致大量的sql查詢給數(shù)據(jù)庫帶來壓力惯吕,所以這里加了一個鎖,并設置了超時時間
  boolean locked = false;
  try {
    locked = LOCK.tryLock(5, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return true;
  }
  if (locked) {
    try {
      //這里在拿到鎖以后怕午,先去本地緩存再拿一遍最新的緩存配置废登,與剛才獲取到的配置做下對比,如果發(fā)現(xiàn)不相等郁惜,說明之前獲取到鎖之前已經(jīng)有數(shù)據(jù)更新到緩存堡距,
      ConfigDataCache latest = CACHE.get(serverCache.getGroup());
      if (latest != serverCache) {
        // 在判斷當前的最新配置和客戶端配置的Md5是否一致.
        return !StringUtils.equals(clientMd5, latest.getMd5());
      }
      // 更新緩存數(shù)據(jù)
      this.refreshLocalCache();
      //拿到最新的配置
      latest = CACHE.get(serverCache.getGroup());
      //比對
      return !StringUtils.equals(clientMd5, latest.getMd5());
    } finally {
      LOCK.unlock();
    }
  }
  // 沒有獲取到鎖,默認當成需要更新處理
  return true;

}

上面的代碼兆蕉,看出了soul設計的代碼的精妙之處

接著上面代碼羽戒,1.1之后,會調用sendResponse(changedGroups);

void sendResponse(final List<ConfigGroupEnum> changedGroups) {
  // 這里邏輯場景就是上面我們剛開始跟過來的DataChangeTask執(zhí)行的run里面虎韵,對所有client的主動觸發(fā)的場景易稠,這里是想取消掉client的run執(zhí)行時候的延遲動作,防止重復運行包蓝,具體原因還需要在往后看
  if (null != asyncTimeoutFuture) {
    asyncTimeoutFuture.cancel(false);
  }
  //生成response驶社,aysncContext完成
  generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
  asyncContext.complete();
}

通過上面的源碼分析。我們現(xiàn)在主要有幾個疑惑點:

  1. AsyncContext到底是干嘛的测萎?
  2. 為什么是直接生成的Response返回亡电?
  3. Client是什么時候添加到org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#clients里面的

帶著這幾個問題,我們在看下一篇文章

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末硅瞧,一起剝皮案震驚了整個濱河市份乒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖或辖,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拇勃,死亡現(xiàn)場離奇詭異,居然都是意外死亡孝凌,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進店門月腋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蟀架,“玉大人,你說我怎么就攤上這事榆骚∑模” “怎么了?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵妓肢,是天一觀的道長捌省。 經(jīng)常有香客問我,道長碉钠,這世上最難降的妖魔是什么纲缓? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮喊废,結果婚禮上祝高,老公的妹妹穿的比我還像新娘。我一直安慰自己污筷,他們只是感情好工闺,可當我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著瓣蛀,像睡著了一般陆蟆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上惋增,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天叠殷,我揣著相機與錄音,去河邊找鬼器腋。 笑死溪猿,一個胖子當著我的面吹牛,可吹牛的內容都是我干的纫塌。 我是一名探鬼主播诊县,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼措左!你這毒婦竟也來了依痊?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎胸嘁,沒想到半個月后瓶摆,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡性宏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年群井,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片毫胜。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡书斜,死狀恐怖,靈堂內的尸體忽然破棺而出酵使,到底是詐尸還是另有隱情荐吉,我是刑警寧澤,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布口渔,位于F島的核電站样屠,受9級特大地震影響,放射性物質發(fā)生泄漏缺脉。R本人自食惡果不足惜痪欲,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望攻礼。 院中可真熱鬧勤揩,春花似錦、人聲如沸秘蛔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽深员。三九已至负蠕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間倦畅,已是汗流浹背遮糖。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留叠赐,地道東北人欲账。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像芭概,于是被迫代替她去往敵國和親赛不。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內容