spring cloud gateway 二次開發(fā)之 ServerListUpdater 服務(wù)列表更新

年后不久換了部門,一直在改Bug和優(yōu)化。僚害。。終于有了點(diǎn)時(shí)間繁调,把之前漏下沒(méi)記錄的點(diǎn)慢慢補(bǔ)上

gateway使用ribbon作為服務(wù)調(diào)用的負(fù)載均衡中間件萨蚕,根據(jù)配置的 IRule 對(duì)拉取到的服務(wù)列表進(jìn)行負(fù)載

而這些真正提供服務(wù)的實(shí)例是有動(dòng)態(tài)上下線的情況存在的,為了保證輪詢到的服務(wù)實(shí)例能正常訪問(wèn)蹄胰,ribbon中有一個(gè)接口

ServerListUpdater 會(huì)定期對(duì)服務(wù)列表進(jìn)行更新

在使用 Eureka 作為注冊(cè)中心的時(shí)候岳遥,ServerListUpdater有兩個(gè)實(shí)現(xiàn)類:

  • PollingServerListUpdater :定時(shí)從注冊(cè)中心拉取服務(wù)列表,如果沒(méi)有配置裕寨,默認(rèn)為30秒

  • EurekaNotificationServerListUpdater :注冊(cè)中心中的服務(wù)有變動(dòng)時(shí)浩蓉,通知客戶端派继,EurekaNotificationServerListUpdater是通過(guò)添加了一個(gè)監(jiān)聽器,當(dāng)收到注冊(cè)中心的通知后捻艳,做出相應(yīng)的動(dòng)作

PollingServerListUpdater 也是 默認(rèn)的 ServerListUpdater 配置

分別看一下它們的代碼實(shí)現(xiàn)

PollingServerListUpdater

// 從注冊(cè)中心拉取服務(wù)列表的間隔
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;


 @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {

            // 這里封裝了一個(gè)進(jìn)行服務(wù)拉取的任務(wù)
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        //這里是進(jìn)行服務(wù)列表更新的動(dòng)作
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
           
            // 使用 scheduled 進(jìn)行定時(shí)拉取
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,//這個(gè)就是剛才配置的拉取間隔
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }


EurekaNotificationServerListUpdater

 @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            this.updateListener = new EurekaEventListener() {

               // 監(jiān)聽 Eureka發(fā)布的事件驾窟,然后拉取最新的列表
                @Override
                public void onEvent(EurekaEvent event) {
                    if (event instanceof CacheRefreshedEvent) {
                        if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                            logger.info("an update action is already queued, returning as no-op");
                            return;
                        }

                        if (!refreshExecutor.isShutdown()) {
                            try {
                                refreshExecutor.submit(new Runnable() {
                                    @Override
                                    public void run() {
                                        try {
                                            //這里是進(jìn)行服務(wù)列表更新的動(dòng)作
                                            updateAction.doUpdate();
                                            lastUpdated.set(System.currentTimeMillis());
                                        } catch (Exception e) {
                                            logger.warn("Failed to update serverList", e);
                                        } finally {
                                            updateQueued.set(false);
                                        }
                                    }
                                });  // fire and forget
                            } catch (Exception e) {
                                logger.warn("Error submitting update task to executor, skipping one round of updates", e);
                                updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                            }
                        }
                        else {
                            logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
                            stop();
                        }
                    }
                }
            };
        
      ...以下代碼省略

        } else {
            logger.info("Update listener already registered, no-op");
        }
    }

可以看到,這兩個(gè) ServerListUpdater 實(shí)現(xiàn)類在更新服務(wù)列表的時(shí)候认轨,都做了同一個(gè)動(dòng)作updateAction.doUpdate()
進(jìn)入這個(gè)方法绅络,發(fā)現(xiàn)它是一個(gè)定義在ServerListUpdater 中的一個(gè)接口

public interface ServerListUpdater {

    /**
     * an interface for the updateAction that actually executes a server list update
     */
    public interface UpdateAction {
        void doUpdate();
    }
 ...以下代碼省略

}

而它的方法實(shí)現(xiàn)是在 負(fù)載均衡器 DynamicServerListLoadBalancer 中定義的

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);

   ......代碼省略......

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            // UpdateAction的方法實(shí)現(xiàn),調(diào)用了另一個(gè)方法
            updateListOfServers();
        }
    };

    
    
    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            //真正進(jìn)行列表更新的地方
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

......代碼省略......

}

可以看到嘁字,這個(gè) serverListImpl 就是在負(fù)載均衡器的ServerList屬性昨稼,使用這個(gè)接口的getUpdatedListOfServers方法進(jìn)行列表更新,因?yàn)槲业捻?xiàng)目里使用的是自己的注冊(cè)中心拳锚,沒(méi)有用Eureka,所以也寫了一個(gè)實(shí)現(xiàn)類寻行,參照了使用Eureka的情況下的默認(rèn)類 DiscoveryEnabledNIWSServerList霍掺,讓我們看看這個(gè)類的代碼

 @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        //其實(shí)在更新的時(shí)候,調(diào)用的也是從注冊(cè)中心拉取列表的方法
        return obtainServersViaDiscovery();
    }
    
   //從Eureka拉取服務(wù)
    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        ......代碼省略......

        return serverList;
    }

由此我們知道拌蜘,無(wú)論配置哪個(gè) ServerListUpdater杆烁,在更新服務(wù)列表的時(shí)候,都是調(diào)用ServerList接口進(jìn)行一次服務(wù)拉取简卧,然后更新本地的列表兔魂,只是觸發(fā)的時(shí)機(jī)不同:

  • PollingServerListUpdater 30秒拉取一次(時(shí)間可以修改)
  • EurekaNotificationServerListUpdater 當(dāng)服務(wù)更新時(shí),通知客戶端

那么這兩種方法分別有什么弊端呢举娩?

先說(shuō)說(shuō)PollingServerListUpdater析校,如果在拉取的間隔中,有服務(wù)下線了铜涉,極端情況下智玻,原來(lái)所有的實(shí)例都不可用,換成了新的一批實(shí)例芙代,要等到下一次拉取的時(shí)間點(diǎn)才會(huì)更新吊奢,這樣會(huì)造成最久30秒的時(shí)間服務(wù)不可用。 比如:原來(lái)5個(gè)實(shí)例 A纹烹、B页滚、C、D铺呵、E變成了 F裹驰、G、H片挂、I邦马、J,但是由于并沒(méi)有到更新的時(shí)間點(diǎn),ribbon保存的還是老的服務(wù)實(shí)例滋将,而這時(shí)它們都已下線邻悬,無(wú)法提供服務(wù)。

那如果我們換成随闽,一旦服務(wù)變更(這里是上下線都會(huì)通知)就通知客戶端的 EurekaNotificationServerListUpdater 會(huì)怎么樣呢父丰,這樣好像可以實(shí)時(shí)的替換成最新的可用實(shí)例,保證服務(wù)不會(huì)打到失效的實(shí)例上掘宪《晟龋可是這會(huì)有另一個(gè)問(wèn)題,也是在極端情況下魏滚,如果這次通知由于網(wǎng)絡(luò)問(wèn)題镀首,沒(méi)有通知到客戶端,那么這次變動(dòng)過(guò)后鼠次,如果一直沒(méi)有服務(wù)變更更哄,客戶端就再也不會(huì)進(jìn)行服務(wù)的拉取,這個(gè)時(shí)候造成不可用的時(shí)間就難以預(yù)估了腥寇。比如:在9:00的時(shí)候成翩,A、B赦役、C麻敌、D、E服務(wù)全部下線掂摔,F(xiàn)术羔、G、H乙漓、I聂示、J上線,Eureka通知客戶端簇秒,但網(wǎng)絡(luò)抖動(dòng)鱼喉,客戶端沒(méi)有收到,或者客戶端收到了趋观,拉取時(shí)候失敗扛禽,并沒(méi)有更新本地的列表,這樣只有等到下次收到通知時(shí)才會(huì)去拉取皱坛,假設(shè)接下來(lái)服務(wù)很穩(wěn)定编曼,12:00的時(shí)候才有一次更新,這樣就有3個(gè)小時(shí)的服務(wù)不可用剩辟。

那么有沒(méi)有什么辦法既可以擁有2個(gè)實(shí)現(xiàn)類的優(yōu)勢(shì)掐场,又摒棄它們的弊端呢往扔?有的!
小孩子才做選擇熊户,我們大人是全都要萍膛!那就是自己寫一個(gè)ServerListUpdater的實(shí)現(xiàn)類,然后:

  • 1 啟動(dòng)一個(gè)定時(shí)任務(wù)嚷堡,定時(shí)從注冊(cè)中心拉取蝗罗,而這個(gè)拉取間隔也可以根據(jù)自己的預(yù)估進(jìn)行修改。
  • 2 注冊(cè)一個(gè)監(jiān)聽器蝌戒,可以收到注冊(cè)中心的服務(wù)變更通知
  • 3 在配置中串塑,把默認(rèn)的 ServerListUpdater改成自己寫的 ServerListUpdater,這樣ribbonConfig類在看到有ServerListUpdater的實(shí)現(xiàn)類的情況下北苟,就不會(huì)加載 Eureka的 ServerListUpdater了桩匪。

實(shí)現(xiàn)類的代碼太長(zhǎng),就不貼了友鼻,邏輯很清晰傻昙,可以參考PollingServerListUpdater 和 EurekaNotificationServerListUpdater,把他們的代碼照著寫就行

接下來(lái)就是配置桃移,在配置類中添加上自己的實(shí)現(xiàn)類

/**
     * 服務(wù)更新通知機(jī)制
     * @param notificationService
     * @param clientConfig
     * @return
     */
    @Bean
    public ServerListUpdater ribbonServerListUpdater(NotificationServiceImpl notificationService, IClientConfig clientConfig) {
        return new PollingNotificationServerListUpdater(notificationService, clientConfig);
    }

這樣就實(shí)現(xiàn)了輪詢和通知并存的形式。
打完收工葛碧!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末借杰,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子进泼,更是在濱河造成了極大的恐慌蔗衡,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,376評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件乳绕,死亡現(xiàn)場(chǎng)離奇詭異绞惦,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)洋措,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門济蝉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人菠发,你說(shuō)我怎么就攤上這事王滤。” “怎么了滓鸠?”我有些...
    開封第一講書人閱讀 156,966評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵雁乡,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我糜俗,道長(zhǎng)踱稍,這世上最難降的妖魔是什么曲饱? 我笑而不...
    開封第一講書人閱讀 56,432評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮珠月,結(jié)果婚禮上扩淀,老公的妹妹穿的比我還像新娘。我一直安慰自己桥温,他們只是感情好引矩,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,519評(píng)論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著侵浸,像睡著了一般旺韭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上掏觉,一...
    開封第一講書人閱讀 49,792評(píng)論 1 290
  • 那天区端,我揣著相機(jī)與錄音,去河邊找鬼澳腹。 笑死织盼,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的酱塔。 我是一名探鬼主播沥邻,決...
    沈念sama閱讀 38,933評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼羊娃!你這毒婦竟也來(lái)了唐全?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,701評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蕊玷,失蹤者是張志新(化名)和其女友劉穎邮利,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體垃帅,經(jīng)...
    沈念sama閱讀 44,143評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡延届,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,488評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了贸诚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片方庭。...
    茶點(diǎn)故事閱讀 38,626評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖酱固,靈堂內(nèi)的尸體忽然破棺而出二鳄,到底是詐尸還是另有隱情,我是刑警寧澤媒怯,帶...
    沈念sama閱讀 34,292評(píng)論 4 329
  • 正文 年R本政府宣布订讼,位于F島的核電站,受9級(jí)特大地震影響扇苞,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,896評(píng)論 3 313
  • 文/蒙蒙 一碗降、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧程拭,春花似錦、人聲如沸棍潘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)亦歉。三九已至恤浪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間肴楷,已是汗流浹背水由。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留赛蔫,地道東北人砂客。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像呵恢,于是被迫代替她去往敵國(guó)和親鞠值。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,494評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容