如圖:在ZooKeeper中绣檬,使用Watcher機(jī)制來實(shí)現(xiàn)通知功能。ZooKeeper允許客戶端向服務(wù)端注冊一個Watcher監(jiān)聽崩泡∨ⅲ客戶端將watcher對象存儲在WatchManager的同時,會向Zookeeper服務(wù)器注冊Watcher。當(dāng)ZooKeeper服務(wù)器端觸發(fā)Watcher事件后哲鸳,會向客戶端發(fā)送通知臣疑,客戶端線程從WatchManager中取出對應(yīng)的Watcher對象來執(zhí)行回調(diào)邏輯。
事件類型
NodeCreated? Watcher監(jiān)聽的節(jié)點(diǎn)被創(chuàng)建
NodeDeleted?Watcher監(jiān)聽的節(jié)點(diǎn)被刪除
NodeDataChanged?Watcher監(jiān)聽的節(jié)點(diǎn)內(nèi)容變化
NodeChildChanged?Watcher監(jiān)聽的節(jié)點(diǎn)的子節(jié)點(diǎn)內(nèi)容變化
Watcher接口
定義了時間通知的相關(guān)邏輯:通知狀態(tài)KeeperState徙菠、事件類型EventType讯沈、回調(diào)方法:process(WatchedEvent event)。
process方法的參數(shù)WatchedEvent包含了事件的三個基本屬性:通知狀態(tài)(keeperState)婿奔,事件類型(EventType)和節(jié)點(diǎn)路徑(path)缺狠。ZooKeeper使用WatchedEvent對象來封裝服務(wù)端事件并傳遞給Watcher,從而方便回調(diào)方法process對服務(wù)端事件進(jìn)行處理萍摊。
WatchedEvent和WatcherEvent表示其實(shí)的是同一個事物挤茄,作用都是對一個服務(wù)端事件的封裝。不同的是冰木,WatchedEvent是一個邏輯事件穷劈,其作用是描述服務(wù)端和客戶端程序執(zhí)行過程中所需的邏輯對象,而WatcherEvent實(shí)現(xiàn)了序列化接口片酝,用于網(wǎng)絡(luò)傳輸。
服務(wù)端在生成WatchedEvent事件之后挖腰,會調(diào)用getWrapper()方法將自己包裝成一個可序列化的WatcherEvent事件雕沿,以便通過網(wǎng)絡(luò)傳輸?shù)娇蛻舳恕猴仑?蛻舳嗽诮邮盏椒?wù)端的這個事件對象后审轮,首先會將WatcherEvent還原成一個WatchedEvent事件,并傳遞給process方法處理辽俗,回調(diào)方法process根據(jù)入?yún)⒕湍軌蚪馕龀鐾暾姆?wù)端事件了疾渣。
可以看到,WatchedEvent和WatcherEvent對ZooKeeper服務(wù)端事件的封裝都是很簡單的崖飘。服務(wù)端只會發(fā)送給客戶端一個事件類型榴捡,通知狀態(tài)和節(jié)點(diǎn)路徑,客戶端無法直接從該事件中獲取到對應(yīng)數(shù)據(jù)節(jié)點(diǎn)的原始數(shù)據(jù)內(nèi)容以及變更后的新數(shù)據(jù)內(nèi)容朱浴,也就是說需要客戶端在此主動去重新獲取數(shù)據(jù)吊圾。
工作機(jī)制
ZooKeeper的watcher機(jī)制,可以概括為以下三個過程:客戶單注冊Watcher翰蠢,服務(wù)端處理Watcher和客戶端回調(diào)Watcher
客戶端注冊Watch
public ZooKeeper(String connectString项乒,int sessionTimeout,Watcher watcher);
在創(chuàng)建一個ZooKeeper客戶單的實(shí)例時可以向構(gòu)造方法中傳入一個默認(rèn)的Watcher,這個Watcher將作為這個ZooKeeper會話期間的默認(rèn)Watcher梁沧,會一直被保存在客戶端ZKWatchManager的defaultWatcher中檀何。另外,ZooKeeper客戶端也可以通過getData,getChildren和exist三個接口來向ZooKeeper服務(wù)器注冊Watcher频鉴,無論使用哪種方式栓辜,注冊Watcher的工作原理都是一致的,這里我們以getData這個接口為例來說明砚殿。getData接口用于獲取指定節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容啃憎,主要有兩個方法:
public byte[] getData(String path,boolean watch,Stat stat)
public byte[] getData(final String path,Watcher watcher,Stat stat)
這兩個接口上都可以進(jìn)行Watch的注冊,第一個接口通過一個boolean參數(shù)來標(biāo)識是否使用上文提到的默認(rèn)Watcher來進(jìn)行注冊似炎,具體的注冊邏輯和第二個接口是一致的辛萍。
在向getData接口注冊Watcher后,客戶端首先會對當(dāng)前客戶端請求request進(jìn)行標(biāo)記羡藐,將其設(shè)置為“使用Watcher監(jiān)聽”贩毕,同時會封裝一個Watcher的注冊信息WatchRegistration對象,用于暫時保存數(shù)據(jù)節(jié)點(diǎn)的路徑和Watcher的對應(yīng)關(guān)系仆嗦,具體的邏輯代碼如下:
? ? ?其中的cnxn是clientcnxn對象辉阶,幾個重要參數(shù)需要解釋一下,h是存放的類型瘩扼,比如這里是getdata谆甜,調(diào)用exist的情況下這里就是exist,request封裝了getdata方法中傳遞的path(節(jié)點(diǎn)路徑)和watcher集绰。在ZooKeeper中规辱,Packet數(shù)據(jù)包是最小的通信協(xié)議單元,用于進(jìn)行客戶端與服務(wù)端之間的網(wǎng)絡(luò)傳輸栽燕,任何需要傳輸?shù)念~對應(yīng)都需要包裝成一個Packet對象罕袋。因此,在ClientCnxn中WatchRegistration又會被封裝到Packet中碍岔,然后放入發(fā)送隊(duì)列(outgoingquen)中等待客戶端發(fā)送浴讯。
? ? ?隨后,ZooKeeper客戶端向服務(wù)端發(fā)送這個請求蔼啦,同時等待請求的返回榆纽。完成請求發(fā)送后,會由客戶端SendThread線程的readResponse方法負(fù)責(zé)接收來自服務(wù)端的響應(yīng)捏肢,finishPacket方法會從Packet中取出對應(yīng)的Watcher并注冊到ZkWatchManager中去掠河。
客戶端將Watcher對象給ZKWatchManager,最終保存到dataWatches中去猛计。ZKWatchManager.dataWatches是一個Map<String,Set<Watcher>>類型的數(shù)據(jù)結(jié)構(gòu)唠摹。這樣就完成了客戶端的注冊。主要做了兩件事情奉瘤,1.封裝發(fā)送到服務(wù)端的注冊Watcher請求勾拉,并且發(fā)送煮甥。2.請求發(fā)送后,客戶端的SendThread線程的readResponse方法接收到服務(wù)端的響應(yīng)后注冊Watcher到客戶端的ZkWatchManager中藕赞。
服務(wù)端處理Watcher
服務(wù)端接收Watcher后的存儲過程:
case OpCode.getData:{
? ? ...
? ? byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch()?cnxn:null);
? ? ? rsp = new GetDataResponse(b,stat);
? ? ? break;
}
從getData請求的處理邏輯中成肘,我們可以看到,當(dāng)getDataRequest.getWatch()為true的時候斧蜕,ZooKeeper就認(rèn)為當(dāng)前客戶端請求需要進(jìn)行Watcher注冊双霍,于是就會將當(dāng)前的ServerCnxn對象和數(shù)據(jù)節(jié)點(diǎn)路徑傳入getData方法中去。那么為什么要傳入ServerCnxn呢批销?ServerCnxn是一個ZooKeeper客戶端和服務(wù)器之間的連接接口洒闸,代表了一個客戶端和服務(wù)器的連接。ServerCnxn接口的默認(rèn)實(shí)現(xiàn)是NIOServerCnxn均芽,同時從3.4.0版本開始丘逸,引入了基于Netty的實(shí)現(xiàn):NettyServerCnxn。無論采用哪種實(shí)現(xiàn)方式掀宋,都實(shí)現(xiàn)了Watcher的process接口深纲,因此我們可以把ServerCnxn看作是一個Watcher對象。數(shù)據(jù)節(jié)點(diǎn)的節(jié)點(diǎn)路徑和ServerCnxn最終會被存儲在WatcherManager的watchTable和watch2Paths中劲妙。
WatchManager是ZooKeeper服務(wù)端Watcher的管理者湃鹊,其內(nèi)部管理的watchTable和watch2Pashs兩個存儲結(jié)構(gòu),分別從兩個維度對Watcher進(jìn)行存儲镣奋。
watchTable是從數(shù)據(jù)節(jié)點(diǎn)路徑的粒度來托管Watcher
watch2Paths是從Watcher的粒度來控制事件觸發(fā)需要觸發(fā)的數(shù)據(jù)節(jié)點(diǎn)币呵。
同時,WatchManager還負(fù)責(zé)Watcher事件的觸發(fā)唆途,并移除那些已經(jīng)被觸發(fā)的Watcher富雅。注意掸驱,WatchManager只是一個統(tǒng)稱肛搬,在服務(wù)端,DataTree中會托管兩個WatchManager毕贼,分別是dataWatches和childWatches温赔,分別對應(yīng)數(shù)據(jù)變更Watcher和子節(jié)點(diǎn)變更Watcher。在本例中鬼癣,因?yàn)槭莋etData接口陶贼,因此會被存儲在dataWatches中。
事件觸發(fā)
setdata()會觸發(fā)NodeDataChanged事件待秃。其中調(diào)用了triggerWatch方法拜秧,我們來看triggerWatch方法中又做了什么:
這個方法主要是做了事件的觸發(fā)。其過程如下:
第一步:封裝WatchedEvent
第二步:檢測watchTable中是否有對應(yīng)的Watcher
根據(jù)數(shù)據(jù)節(jié)點(diǎn)路徑從watchTable中取出對應(yīng)的Watcher章郁。如果沒有枉氮,說明沒有任何客戶端在該數(shù)據(jù)節(jié)點(diǎn)上注冊過Watcher志衍,直接退出。而如果找到了這個Watcher聊替,會將其提取出來楼肪,同時會直接從watchTable和watch2Paths中將其刪除——從這里我們也可以看出,Watcher在服務(wù)端是一次性的惹悄,即觸發(fā)一次就失效了春叫。
第三步.調(diào)用process方法來觸發(fā)Watcher。
在這一步中泣港,會逐個依次地調(diào)用從步驟2中找出的所有Water的process方法暂殖。那么這里的process方法究竟做了什么呢?在上文中我們已經(jīng)提到爷速,對于需要注冊Watcher的請求央星,ZooKeeper會把當(dāng)前請求對應(yīng)的ServerCnxn作為一個Watcher進(jìn)行存儲,因此惫东,這里的process方法莉给,事實(shí)上就是ServerCnxn的對應(yīng)方法:
標(biāo)記“-1”,標(biāo)識當(dāng)前是一個通知廉沮。
將WawtchedEvent包裝成WatcherEvent颓遏。
通過sendResponse向客戶端發(fā)送通知。
客戶端回調(diào)Watcher
客戶端收到服務(wù)端的響應(yīng)滞时,通過SendThread.readResponse()處理叁幢,解析到replyheader為-1,就知道這是一個通知了坪稽,通知事件主要是通過EventThread來處理的曼玩,所以這里再去調(diào)用EventThread.queueEvent方法。類似于服務(wù)端窒百,queueEvent方法首先會根據(jù)該通知事件黍判,從ZKWatchManager中取出所有相關(guān)的Watcher。
獲取到相關(guān)的額所有Watcher后篙梢,會將其放入waitingEvents這個隊(duì)列中去顷帖。WaitingEvents是一個待處理Watcher隊(duì)列,EventThread的run方法會不斷對該隊(duì)列進(jìn)行處理渤滞。