Nacos通過前面3篇的講解我們已經(jīng)見識到了事件幾乎充斥再各個方法里面各種事件讓人眼花繚亂,服務(wù)注冊要發(fā)布ClientRegisterServiceEvent和InstanceMetadataEvent事件, 客戶端初次要發(fā)布ClientChangedEvent事件赶袄,而且一個事件的處理完畢又會觸發(fā)另外一個事件贴谎,同時因為事件都是基于異步的處理方式所以調(diào)試起來不方便,基于Nacos事件機制的復(fù)雜性贝咙,這里我不一上來就講解事件機制。而是先根據(jù)幾個具體的事件及處理邏輯讓大家對Nacos事件有一個逐步的了解,有了一定的基礎(chǔ)后再學(xué)習(xí)Nacos復(fù)雜的事件機制
發(fā)布了哪些事件
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
...
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
...
//注冊完成發(fā)布事件
NotifyCenter.publishEvent(new
ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton,
//參考InstancePublishInfo genMetadataId() 方法
//格式是實例的 ip:port:cluster
instanceInfo.getMetadataId(), false));
}
...
}
public class AbstractClient {
....
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo
instancePublishInfo) {
// 客戶端務(wù)發(fā)布服務(wù)實例布信息
if (null == publishers.put(service, instancePublishInfo)) {
MetricsMonitor.incrementInstanceCount();
}
//統(tǒng)一事件中心:客戶端變化事件
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
return true;
....
}
}
ClientRegisterServiceEvent
//服務(wù)注冊 下線 訂閱 取消訂閱的事件處理類
public class ClientServiceIndexesManager extends SmartSubscriber {
//服務(wù)的所有發(fā)布者列表
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
//服務(wù)的所有訂閱者列表
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
//這塊涉及到Nacos的統(tǒng)一事件機制后面再解析 這邊讀者暫時只需要知道是去NotifyCenter 注冊下自己 至于為啥注冊 怎么注冊 后面的邏輯是怎么樣的 后續(xù)再講解
public ClientServiceIndexesManager() {
NotifyCenter.registerSubscriber(this,
NamingEventPublisherFactory.getInstance());
}
//得到某個服務(wù)的所有注冊客戶端
public Collection<String> getAllClientsRegisteredService(Service service) {
return publisherIndexes.containsKey(service) ? publisherIndexes.get(service) :
new ConcurrentHashSet<>();
}
//得到某個服務(wù)的所有訂閱客戶端
public Collection<String> getAllClientsSubscribeService(Service service) {
return subscriberIndexes.containsKey(service) ? subscriberIndexes.get(service) :
new ConcurrentHashSet<>();
}
//所有訂閱的服務(wù)
public Collection<Service> getSubscribedService() {
return subscriberIndexes.keySet();
}
/**
* Clear the service index without instances.
*
* @param service The service of the Nacos.
*/
public void removePublisherIndexesByEmptyService(Service service) {
if (publisherIndexes.containsKey(service) && publisherIndexes.get(service).isEmpty()) {
publisherIndexes.remove(service);
}
}
//該訂閱器訂閱哪些類型的事件
@Override
public List<Class<? extends Event>> subscribeTypes() {
List<Class<? extends Event>> result = new LinkedList<>();
result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);
result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);
result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);
result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);
result.add(ClientEvent.ClientDisconnectEvent.class);
return result;
}
//事件分2類
//上線 下線 訂閱 取消訂閱屬于 ClientOperationEvent 事件
@Override
public void onEvent(Event event) {
if (event instanceof ClientEvent.ClientDisconnectEvent) {
handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
} else if (event instanceof ClientOperationEvent) {
handleClientOperation((ClientOperationEvent) event);
}
}
//客戶端下線事件的處理
private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
Client client = event.getClient();
for (Service each : client.getAllSubscribeService()) {
removeSubscriberIndexes(each, client.getClientId());
}
for (Service each : client.getAllPublishedService()) {
removePublisherIndexes(each, client.getClientId());
}
}
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
//服務(wù)注冊
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
//服務(wù)下線
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
//服務(wù)訂閱
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent)
{
//取消訂閱
removeSubscriberIndexes(service, clientId);
}
}
//服務(wù)注冊
private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
//從發(fā)布服務(wù)列表中移除該服務(wù)的該clientid
private void removePublisherIndexes(Service service, String clientId) {
if (!publisherIndexes.containsKey(service)) {
return;
}
publisherIndexes.get(service).remove(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
private void addSubscriberIndexes(Service service, String clientId) {
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// Fix #5404, Only first time add need notify event.
if (subscriberIndexes.get(service).add(clientId)) {
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service,
clientId));
}
}
//從該服務(wù)的訂閱者列表中移除該clientid
private void removeSubscriberIndexes(Service service, String clientId) {
if (!subscriberIndexes.containsKey(service)) {
return;
}
//某個服務(wù)沒有訂閱者是key也一并刪除掉
subscriberIndexes.get(service).remove(clientId);
if (subscriberIndexes.get(service).isEmpty()) {
subscriberIndexes.remove(service);
}
}
}
上面ClientServiceIndexesManager 這個就是 服務(wù)訂閱 取消訂閱 服務(wù)上下線4個事件的訂閱者檐涝,這里面邏輯其實比較簡單。核心就是維護了 subscriberIndexes 和 publisherIndexes 兩個map法挨。維護的是服務(wù)的訂閱者信息和服務(wù)的發(fā)布者信息谁榜,其他方法也是對這2個map的元素的增加和刪除操作。
InstanceMetadataEvent
public class NamingMetadataManager extends SmartSubscriber {
//存儲過期的
private final Set<ExpiredMetadataInfo> expiredMetadataInfos;
private ConcurrentMap<Service, ServiceMetadata> serviceMetadataMap;
//以service 為key 嵌套Map 中以instance 的 metadataId為key
private ConcurrentMap<Service, ConcurrentMap<String, InstanceMetadata>>
instanceMetadataMap;
...
//該訂閱之支持3種事件類型
//實例元數(shù)據(jù)變更事件
//服務(wù)的元數(shù)據(jù)變更事件:
//觸發(fā)時機:
//1凡纳、ServiceManage 中 空服務(wù)自動清理(檢測到服務(wù)下沒有實例) EmptyServiceAutoCleanerV2
//2窃植、(v1升級v2 雙寫移除服務(wù))DoubleWriteServiceRemovalToV2Task
//客戶端斷連事件
//觸發(fā)時機在EphemeralIpPortClientManager 講解的時候有一個 5s 定時執(zhí)行一次
//ExpiredClientCleaner任務(wù)的定時器定時清理么任何發(fā)布和訂閱的客戶端且已經(jīng)到了過期時機
@Override
public List<Class<? extends Event>> subscribeTypes() {
List<Class<? extends Event>> result = new LinkedList<>();
result.add(MetadataEvent.InstanceMetadataEvent.class);
result.add(MetadataEvent.ServiceMetadataEvent.class);
result.add(ClientEvent.ClientDisconnectEvent.class);
return result;
}
//3類事件的處理入口
@Override
public void onEvent(Event event) {
if (event instanceof MetadataEvent.InstanceMetadataEvent) {
handleInstanceMetadataEvent((MetadataEvent.InstanceMetadataEvent) event);
} else if (event instanceof MetadataEvent.ServiceMetadataEvent) {
handleServiceMetadataEvent((MetadataEvent.ServiceMetadataEvent) event);
} else {
handleClientDisconnectEvent((ClientEvent.ClientDisconnectEvent) event);
}
}
private void handleClientDisconnectEvent(ClientEvent.ClientDisconnectEvent event) {
for (Service each : event.getClient().getAllPublishedService()) {
String metadataId =
event.getClient().getInstancePublishInfo(each).getMetadataId();
if (containInstanceMetadata(each, metadataId)) {
updateExpiredInfo(true,
ExpiredMetadataInfo.newExpiredInstanceMetadata(each, metadataId));
}
}
}
//檢查serviceMetadataMap中是否包含了該service
//如果包含該service且event.isExpired()為true創(chuàng)建一個只包含service 和 createTime信息的
//ExpiredMetadataInfo對象
//如果包含該service且event.isExpired()為false 則移除該ExpiredMetadataInfo 對象
//
private void handleServiceMetadataEvent(MetadataEvent.ServiceMetadataEvent event) {
Service service = event.getService();
if (containServiceMetadata(service)) {
updateExpiredInfo(event.isExpired(),
ExpiredMetadataInfo.newExpiredServiceMetadata(service));
}
}
//方法的意思如果instanceMetadataMap存儲了實例的元數(shù)據(jù)信息根據(jù)event的isExpired屬性
//確定是否要創(chuàng)建一個基于service 和 metadataId的過期對象信息
//服務(wù)注冊上線的時候 isExpired 為false
//服務(wù)主動下線 isExpired 為true
//服務(wù)端心跳檢查觸發(fā)delete ip后 isExpired 為true
private void handleInstanceMetadataEvent(MetadataEvent.InstanceMetadataEvent event)
{
Service service = event.getService();
String metadataId = event.getMetadataId();
if (containInstanceMetadata(service, metadataId)) {
updateExpiredInfo(event.isExpired(),
ExpiredMetadataInfo.newExpiredInstanceMetadata(event.getService(),
event.getMetadataId()));
}
}
//如果是過期就創(chuàng)建一個過期信息否則移除掉原有的過期信息
//留個疑問學(xué)習(xí)完后面的章節(jié)后回頭來解讀:為什么不直接刪除instanceMetadataMap里面
//metadataId對應(yīng)的數(shù)據(jù) 而是做一個看似浪費內(nèi)存和性能的操作呢?
private void updateExpiredInfo(boolean expired, ExpiredMetadataInfo
expiredMetadataInfo) {
if (expired) {
expiredMetadataInfos.add(expiredMetadataInfo);
} else {
expiredMetadataInfos.remove(expiredMetadataInfo);
}
}
//參考InstancePublishInfo genMetadataId() 方法
//格式是實例的 ip:port:cluster
public boolean containInstanceMetadata(Service service, String metadataId) {
return instanceMetadataMap.containsKey(service) &&
instanceMetadataMap.get(service).containsKey(metadataId);
}
//判斷serviceMetadataMap中是否存在該service
public boolean containServiceMetadata(Service service) {
return serviceMetadataMap.containsKey(service);
}
}
//過期元數(shù)據(jù)封裝類
public class ExpiredMetadataInfo {
//服務(wù)信息
private final Service service;
//元數(shù)據(jù)id
private final String metadataId;
//對象的創(chuàng)建時間
private final long createTime;
....
}
InstanceMetadataEvent 處理3類任務(wù)這里我們著重介紹MetadataEvent.InstanceMetadataEvent的處理邏輯簡單來說就是再服務(wù)下線(主動下線和被動下線) 的時候在 expiredMetadataInfos 信息加一個過期對象(包含service metadataId)
被動下線就是我們前面章節(jié)講的服務(wù)端心跳任務(wù)檢查到30s后服務(wù)沒有響應(yīng)就下線服務(wù)的邏輯荐糜。
這里留一個疑問:
** 為什么不直接刪除instanceMetadataMap里面巷怜,metadataId對應(yīng)的數(shù)據(jù) 而是做一個看似浪費內(nèi)存和性能的操作呢?**
服務(wù)元數(shù)據(jù)變更事件:
觸發(fā)時機:
1暴氏、ServiceManager 中 空服務(wù)自動清理(檢測到服務(wù)下沒有實例) EmptyServiceAutoCleanerV2
2延塑、(v1升級v2 雙寫移除服務(wù))DoubleWriteServiceRemovalToV2Task
這塊等后面講解到具體的場景的時候再具體解決,這里大家一筆帶過就可以答渔。客戶端斷連事件
觸發(fā)時機:EphemeralIpPortClientManager 講解的時候有一個 5s 定時執(zhí)行一次
ExpiredClientCleaner任務(wù)的定時器定時清理么任何發(fā)布和訂閱的客戶端且已經(jīng)到了過期時機讀者可以前往客戶端管理器那一篇看具體的邏輯
總結(jié)
下一篇我們繼續(xù)講解服務(wù)注冊過程中出現(xiàn)的事件及嵌套事件并做下總結(jié)