Nacos源碼學(xué)習(xí)系列服務(wù)端第4篇服務(wù)注冊之事件機制之一

Nacos通過前面3篇的講解我們已經(jīng)見識到了事件幾乎充斥再各個方法里面各種事件讓人眼花繚亂,服務(wù)注冊要發(fā)布ClientRegisterServiceEvent和InstanceMetadataEvent事件, 客戶端初次要發(fā)布ClientChangedEvent事件赶袄,而且一個事件的處理完畢又會觸發(fā)另外一個事件贴谎,同時因為事件都是基于異步的處理方式所以調(diào)試起來不方便,基于Nacos事件機制的復(fù)雜性贝咙,這里我不一上來就講解事件機制。而是先根據(jù)幾個具體的事件及處理邏輯讓大家對Nacos事件有一個逐步的了解,有了一定的基礎(chǔ)后再學(xué)習(xí)Nacos復(fù)雜的事件機制

發(fā)布了哪些事件

public class EphemeralClientOperationServiceImpl implements ClientOperationService {
    ...
    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        ...
        //注冊完成發(fā)布事件
        NotifyCenter.publishEvent(new 
             ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, 
        //參考InstancePublishInfo genMetadataId() 方法
        //格式是實例的 ip:port:cluster
        instanceInfo.getMetadataId(), false));
    }
    ...
} 

public class AbstractClient {
    .... 

    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo 
              instancePublishInfo) {
        //  客戶端務(wù)發(fā)布服務(wù)實例布信息
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
        //統(tǒng)一事件中心:客戶端變化事件  
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        return true;
    ....
    }
}
image.gif

ClientRegisterServiceEvent


//服務(wù)注冊 下線 訂閱 取消訂閱的事件處理類
public class ClientServiceIndexesManager extends SmartSubscriber {
    //服務(wù)的所有發(fā)布者列表
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

    //服務(wù)的所有訂閱者列表
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

   //這塊涉及到Nacos的統(tǒng)一事件機制后面再解析 這邊讀者暫時只需要知道是去NotifyCenter 注冊下自己 至于為啥注冊 怎么注冊 后面的邏輯是怎么樣的 后續(xù)再講解
    public ClientServiceIndexesManager() {
        NotifyCenter.registerSubscriber(this, 
                  NamingEventPublisherFactory.getInstance());
    }

    //得到某個服務(wù)的所有注冊客戶端
    public Collection<String> getAllClientsRegisteredService(Service service) {
        return publisherIndexes.containsKey(service) ? publisherIndexes.get(service) : 
             new ConcurrentHashSet<>();
    }

    //得到某個服務(wù)的所有訂閱客戶端
    public Collection<String> getAllClientsSubscribeService(Service service) {
        return subscriberIndexes.containsKey(service) ? subscriberIndexes.get(service) : 
             new ConcurrentHashSet<>();
    }
    //所有訂閱的服務(wù)
    public Collection<Service> getSubscribedService() {
        return subscriberIndexes.keySet();
    }

    /**
     * Clear the service index without instances.
     *
     * @param service The service of the Nacos.
     */
    public void removePublisherIndexesByEmptyService(Service service) {
        if (publisherIndexes.containsKey(service) && publisherIndexes.get(service).isEmpty()) {
            publisherIndexes.remove(service);
        }
    }

    //該訂閱器訂閱哪些類型的事件
    @Override
    public List<Class<? extends Event>> subscribeTypes() {
        List<Class<? extends Event>> result = new LinkedList<>();
        result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);
        result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);
        result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);
        result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);
        result.add(ClientEvent.ClientDisconnectEvent.class);
        return result;
    }

   //事件分2類
   //上線 下線 訂閱 取消訂閱屬于 ClientOperationEvent 事件
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }

    //客戶端下線事件的處理 
    private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
        Client client = event.getClient();
        for (Service each : client.getAllSubscribeService()) {
            removeSubscriberIndexes(each, client.getClientId());
        }
        for (Service each : client.getAllPublishedService()) {
            removePublisherIndexes(each, client.getClientId());
        }
    }

    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        //服務(wù)注冊
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
         //服務(wù)下線  
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
         //服務(wù)訂閱
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) 
           {
         //取消訂閱
            removeSubscriberIndexes(service, clientId);
        }
    }

    //服務(wù)注冊
    private void addPublisherIndexes(Service service, String clientId) {
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }

    //從發(fā)布服務(wù)列表中移除該服務(wù)的該clientid
    private void removePublisherIndexes(Service service, String clientId) {
        if (!publisherIndexes.containsKey(service)) {
            return;
        }
        publisherIndexes.get(service).remove(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }

    private void addSubscriberIndexes(Service service, String clientId) {
        subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        // Fix #5404, Only first time add need notify event.
        if (subscriberIndexes.get(service).add(clientId)) {
            NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, 
                clientId));
        }
    }

   //從該服務(wù)的訂閱者列表中移除該clientid
    private void removeSubscriberIndexes(Service service, String clientId) {
        if (!subscriberIndexes.containsKey(service)) {
            return;
        }
        //某個服務(wù)沒有訂閱者是key也一并刪除掉
        subscriberIndexes.get(service).remove(clientId);
        if (subscriberIndexes.get(service).isEmpty()) {
            subscriberIndexes.remove(service);
        }
    }
}

image.gif

上面ClientServiceIndexesManager 這個就是 服務(wù)訂閱 取消訂閱 服務(wù)上下線4個事件的訂閱者檐涝,這里面邏輯其實比較簡單。核心就是維護了 subscriberIndexes 和 publisherIndexes 兩個map法挨。維護的是服務(wù)的訂閱者信息和服務(wù)的發(fā)布者信息谁榜,其他方法也是對這2個map的元素的增加和刪除操作。

InstanceMetadataEvent

public class NamingMetadataManager extends SmartSubscriber {

    //存儲過期的
    private final Set<ExpiredMetadataInfo> expiredMetadataInfos;

    private ConcurrentMap<Service, ServiceMetadata> serviceMetadataMap;

    //以service 為key 嵌套Map 中以instance 的 metadataId為key
    private ConcurrentMap<Service, ConcurrentMap<String, InstanceMetadata>> 
        instanceMetadataMap;

   ...

   //該訂閱之支持3種事件類型
   //實例元數(shù)據(jù)變更事件
   //服務(wù)的元數(shù)據(jù)變更事件:
   //觸發(fā)時機:
   //1凡纳、ServiceManage 中 空服務(wù)自動清理(檢測到服務(wù)下沒有實例) EmptyServiceAutoCleanerV2
   //2窃植、(v1升級v2 雙寫移除服務(wù))DoubleWriteServiceRemovalToV2Task
   //客戶端斷連事件
   //觸發(fā)時機在EphemeralIpPortClientManager 講解的時候有一個 5s 定時執(zhí)行一次 
   //ExpiredClientCleaner任務(wù)的定時器定時清理么任何發(fā)布和訂閱的客戶端且已經(jīng)到了過期時機 
   @Override
    public List<Class<? extends Event>> subscribeTypes() {
        List<Class<? extends Event>> result = new LinkedList<>();
        result.add(MetadataEvent.InstanceMetadataEvent.class);
        result.add(MetadataEvent.ServiceMetadataEvent.class);
        result.add(ClientEvent.ClientDisconnectEvent.class);
        return result;
    }

    //3類事件的處理入口
    @Override
    public void onEvent(Event event) {
        if (event instanceof MetadataEvent.InstanceMetadataEvent) {
            handleInstanceMetadataEvent((MetadataEvent.InstanceMetadataEvent) event);
        } else if (event instanceof MetadataEvent.ServiceMetadataEvent) {
            handleServiceMetadataEvent((MetadataEvent.ServiceMetadataEvent) event);
        } else {
            handleClientDisconnectEvent((ClientEvent.ClientDisconnectEvent) event);
        }
    }

    private void handleClientDisconnectEvent(ClientEvent.ClientDisconnectEvent event) {
        for (Service each : event.getClient().getAllPublishedService()) {
            String metadataId = 
            event.getClient().getInstancePublishInfo(each).getMetadataId();
            if (containInstanceMetadata(each, metadataId)) {
                updateExpiredInfo(true, 
                ExpiredMetadataInfo.newExpiredInstanceMetadata(each, metadataId));
            }
        }
    }

    //檢查serviceMetadataMap中是否包含了該service
    //如果包含該service且event.isExpired()為true創(chuàng)建一個只包含service 和 createTime信息的 
    //ExpiredMetadataInfo對象
    //如果包含該service且event.isExpired()為false 則移除該ExpiredMetadataInfo 對象
    //
    private void handleServiceMetadataEvent(MetadataEvent.ServiceMetadataEvent event) {
        Service service = event.getService();
        if (containServiceMetadata(service)) {
            updateExpiredInfo(event.isExpired(), 
                 ExpiredMetadataInfo.newExpiredServiceMetadata(service));
        }
    }

    //方法的意思如果instanceMetadataMap存儲了實例的元數(shù)據(jù)信息根據(jù)event的isExpired屬性
    //確定是否要創(chuàng)建一個基于service 和 metadataId的過期對象信息
    //服務(wù)注冊上線的時候 isExpired 為false
    //服務(wù)主動下線 isExpired 為true
    //服務(wù)端心跳檢查觸發(fā)delete ip后 isExpired 為true 
    private void handleInstanceMetadataEvent(MetadataEvent.InstanceMetadataEvent event) 
    {
        Service service = event.getService();
        String metadataId = event.getMetadataId();
        if (containInstanceMetadata(service, metadataId)) {
            updateExpiredInfo(event.isExpired(),
                    ExpiredMetadataInfo.newExpiredInstanceMetadata(event.getService(), 
                      event.getMetadataId()));
        }
    }

    //如果是過期就創(chuàng)建一個過期信息否則移除掉原有的過期信息
    //留個疑問學(xué)習(xí)完后面的章節(jié)后回頭來解讀:為什么不直接刪除instanceMetadataMap里面 
    //metadataId對應(yīng)的數(shù)據(jù) 而是做一個看似浪費內(nèi)存和性能的操作呢?
    private void updateExpiredInfo(boolean expired, ExpiredMetadataInfo 
         expiredMetadataInfo) {
        if (expired) {
            expiredMetadataInfos.add(expiredMetadataInfo);
        } else {
            expiredMetadataInfos.remove(expiredMetadataInfo);
        }
    }

    //參考InstancePublishInfo genMetadataId() 方法
    //格式是實例的 ip:port:cluster
    public boolean containInstanceMetadata(Service service, String metadataId) {
        return instanceMetadataMap.containsKey(service) && 
             instanceMetadataMap.get(service).containsKey(metadataId);
    }

    //判斷serviceMetadataMap中是否存在該service 
    public boolean containServiceMetadata(Service service) {
        return serviceMetadataMap.containsKey(service);
    }
}

//過期元數(shù)據(jù)封裝類
public class ExpiredMetadataInfo {
    //服務(wù)信息
    private final Service service;
    //元數(shù)據(jù)id
    private final String metadataId;
    //對象的創(chuàng)建時間
    private final long createTime;
    ....
}

InstanceMetadataEvent 處理3類任務(wù)這里我們著重介紹MetadataEvent.InstanceMetadataEvent的處理邏輯簡單來說就是再服務(wù)下線(主動下線和被動下線) 的時候在 expiredMetadataInfos 信息加一個過期對象(包含service metadataId)

被動下線就是我們前面章節(jié)講的服務(wù)端心跳任務(wù)檢查到30s后服務(wù)沒有響應(yīng)就下線服務(wù)的邏輯荐糜。

這里留一個疑問:

** 為什么不直接刪除instanceMetadataMap里面巷怜,metadataId對應(yīng)的數(shù)據(jù) 而是做一個看似浪費內(nèi)存和性能的操作呢?**

服務(wù)元數(shù)據(jù)變更事件:
觸發(fā)時機:
1暴氏、ServiceManager 中 空服務(wù)自動清理(檢測到服務(wù)下沒有實例) EmptyServiceAutoCleanerV2
2延塑、(v1升級v2 雙寫移除服務(wù))DoubleWriteServiceRemovalToV2Task
這塊等后面講解到具體的場景的時候再具體解決,這里大家一筆帶過就可以答渔。

客戶端斷連事件
觸發(fā)時機:

EphemeralIpPortClientManager 講解的時候有一個 5s 定時執(zhí)行一次
ExpiredClientCleaner任務(wù)的定時器定時清理么任何發(fā)布和訂閱的客戶端且已經(jīng)到了過期時機讀者可以前往客戶端管理器那一篇看具體的邏輯

總結(jié)


下一篇我們繼續(xù)講解服務(wù)注冊過程中出現(xiàn)的事件及嵌套事件并做下總結(jié)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末页畦,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子研儒,更是在濱河造成了極大的恐慌豫缨,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件端朵,死亡現(xiàn)場離奇詭異好芭,居然都是意外死亡,警方通過查閱死者的電腦和手機冲呢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門舍败,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人敬拓,你說我怎么就攤上這事邻薯。” “怎么了乘凸?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵厕诡,是天一觀的道長。 經(jīng)常有香客問我营勤,道長灵嫌,這世上最難降的妖魔是什么壹罚? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮寿羞,結(jié)果婚禮上猖凛,老公的妹妹穿的比我還像新娘。我一直安慰自己绪穆,他們只是感情好辨泳,可當(dāng)我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著玖院,像睡著了一般漠吻。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上司恳,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天途乃,我揣著相機與錄音,去河邊找鬼扔傅。 笑死耍共,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的猎塞。 我是一名探鬼主播试读,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼荠耽!你這毒婦竟也來了钩骇?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤铝量,失蹤者是張志新(化名)和其女友劉穎倘屹,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體慢叨,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡纽匙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了拍谐。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片烛缔。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖轩拨,靈堂內(nèi)的尸體忽然破棺而出践瓷,到底是詐尸還是另有隱情,我是刑警寧澤亡蓉,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布晕翠,位于F島的核電站,受9級特大地震影響寸宵,放射性物質(zhì)發(fā)生泄漏崖面。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一梯影、第九天 我趴在偏房一處隱蔽的房頂上張望巫员。 院中可真熱鬧,春花似錦甲棍、人聲如沸简识。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽七扰。三九已至,卻和暖如春陪白,著一層夾襖步出監(jiān)牢的瞬間颈走,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工咱士, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留立由,地道東北人。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓序厉,卻偏偏與公主長得像锐膜,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子弛房,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內(nèi)容