ZooKeeper是用來協(xié)調(同步)分布式進程的服務,多個分布式進程通過ZooKeeper提供的API來操作共享的ZooKeeper內存數(shù)據(jù)對象ZNode來達成某種一致的行為或結果骄酗,這種模式本質上是基于狀態(tài)共享的并發(fā)模型
。ZooKeeper實現(xiàn)這些分布式進程的狀態(tài)(ZNode的Data盒蟆、Children)共享時,基于性能的考慮采用了類似的異步非阻塞的主動通知模式即Watch機制讨惩,使得分布式進程之間的“共享狀態(tài)通信”更加實時高效荐捻。注意处面,這種共享也需要zookeeper使得分布式進程能夠順序執(zhí)行鸳君,保證結果的正確性或颊,Zab協(xié)議使得ZooKeeper的內部修改狀態(tài)操作直接是有序串行的囱挑。在此不討論zab協(xié)議平挑。
Zookeeper Watcher架構
Zookeeper Watcher流程
是客戶端向服務端的某個節(jié)點路徑上注冊一個watcher唆涝,客戶端同時會在本地watcherManager中存儲特定的watcher廊酣,當發(fā)生節(jié)點數(shù)據(jù)或者節(jié)點子節(jié)點變化時亡驰,服務端會通知客戶端節(jié)點變化信息,然后客戶端收到通知后透乾,會調用回調函數(shù)续徽。
實現(xiàn)原理
Watcher接口
:客戶端用來接收從 ZooKeeper 服務端發(fā)過來的消息并且同步地處理這些消息,如果要處理這個消息亲澡,需要為客戶端注冊一個 CallBack(回調)watcher對象。設計如下:
在 Watcher 接口里面纫版,除了回調函數(shù) process 以外床绪,還包含 KeeperState 和 EventType 兩個枚舉類,分別代表了事件發(fā)生時ZooKeeper連接狀態(tài)和事件的類型其弊。
根據(jù)特定的事件癞己,調用
process(WatchedEvent event)
方法對事件進行處理。
WatchedEvent
和 WatcherEvent
都表示的是同一個事物梭伐,都是對一個服務端事件的封裝痹雅。不同的是,WatchedEvent 是一個邏輯事件糊识,用于服務端和客戶端程序執(zhí)行過程中所需的邏輯對象,而 WatcherEvent 因為實現(xiàn)了序列化接口朴沿,因此可以用于網(wǎng)絡傳輸。
服務端在線程 WatchedEvent 事件之后览芳,會調用 getWrapper 方法將自己包裝成一個可序列化的 WatcherEvent 事屯仗,以便通過網(wǎng)絡傳輸?shù)娇蛻舳恕?蛻舳嗽诮邮盏椒斩说倪@個事件對象后,首先會將 WatcherEvent 事件還原成一個 WatchedEvent 事件,并傳遞給 process 方法處理,回調方法 process 根據(jù)入?yún)⒕湍軌蚪馕龀鐾暾姆斩耸录恕?/p>
客戶端注冊Watcher
涉及接口:
//創(chuàng)建zk客戶端對象實例時注冊,這個 Watcher 將作為整個 ZooKeeper 會話期間的默認 Watcher山林,
//會一直被保存在客戶端 ZKWatchManager 的 defaultWatcher 里面,
//如果這個被創(chuàng)建的節(jié)點在其它時候被創(chuàng)建watcher并注冊砂蔽,則這個默認的watcher會被覆蓋
//watcher觸發(fā)一次就會失效诡右,不管是創(chuàng)建節(jié)點時的 watcher 還是以后創(chuàng)建的 watcher.因為服務端每次觸發(fā)之后就會刪掉服務端的watcher
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
getChildren(String path, Watcher watcher)
//Boolean watch表示是否使用上下文中默認的watcher,即創(chuàng)建zk實例時設置的watcher
getChildren(String path, boolean watch)
getData(String path, boolean watch, Stat stat)
getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)
exists(String path, boolean watch)
exists(String path, Watcher watcher)
在 ZooKeeper 中淑蔚,Packet 是一個最小的通信協(xié)議單元,即數(shù)據(jù)包。Pakcet 用于進行客戶端與服務端之間的網(wǎng)絡傳輸仓犬,任何需要傳輸?shù)膶ο蠖夹枰b成一個 Packet 對象。在 ClientCnxn 中 WatchRegistration 也會被封裝到 Pakcet 中,然后由 SendThread 線程調用 queuePacke 方法把 Packet 放入發(fā)送隊列中等待客戶端發(fā)送,這又是一個異步過程慧库,分布式系統(tǒng)采用異步通信是一個普遍認同的觀念橡羞。隨后滋觉,SendThread 線程會通過 readResponse 方法接收來自服務端的響應肺蔚,異步地調用 finishPacket 方法從 Packet 中取出對應的 Watcher 并注冊到 ZKWatchManager 中去仇冯。
WatcherRegistation 除了 Header 和 request 兩個屬性被傳遞到了服務端等缀,其他都沒有到服務端冒掌,否則服務端就容易出現(xiàn)內存緊張甚至溢出的危險铃诬,因為數(shù)據(jù)量太大了毫深。這就是 ZooKeeper 為什么適用于分布式環(huán)境的原因,它在網(wǎng)絡中傳輸?shù)氖窍ⅲ皇菙?shù)據(jù)包實體今阳。
服務端處理 Watcher 流程
對于注冊 Watcher 請求
膝舅,F(xiàn)inalRequestProcessor 的 ProcessRequest 方法會判斷當前請求是否需要注冊 Watcher耿芹,如果為 true,就會將當前的 ServerCnxn 對象和數(shù)據(jù)節(jié)點路徑傳入 getData 方法中去增蹭。ServerCnxn 是一個 ZooKeeper 客戶端和服務器之間的連接接口,代表了一個客戶端和服務器的連接庇忌,我們后面講到的 process 回調方法,實際上也是從這里回調的初茶,所以可以把 ServerCnxn 看作是一個 Watcher 對象爽待。數(shù)據(jù)節(jié)點的節(jié)點路徑和 ServerCnxn 最終會被存儲在 WatchManager 的 watchTable 和 watch2Paths 中等龙。WatchManager 負責 Watcher 事件的觸發(fā)涯捻,它是一個統(tǒng)稱摄欲,在服務端 DataTree 會托管兩個 WatchManager,分別是watchTable和 watch2Paths吼野,分別對應數(shù)據(jù)變更 Watcher 和節(jié)點變更 Watcher馏臭。
當DataTree中節(jié)點數(shù)據(jù)內容或版本發(fā)生變化或節(jié)點變更時
固逗,會調用相應方法去觸發(fā) WatchManager 的 triggerWatch 方法隘弊,該方法返回 ZNODE 的信息,自此進入到回調本地 process 的序列弥奸。
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
//將事件類型(EventType)缓溅、通知狀態(tài)(WatchedEvent)墅拭、節(jié)點路徑封裝成一個 WatchedEvent 對象
HashSet<Watcher> watchers;
synchronized (this) {
//根據(jù)數(shù)據(jù)節(jié)點的節(jié)點路徑從 watchTable 里面取出對應的 Watcher招拙。如果沒有找到 Watcher 對象塌衰,
//說明沒有任何客戶端在該數(shù)據(jù)節(jié)點上注冊過 Watcher服爷,直接退出匾旭。如果找打了 Watcher 就將其提取出來色瘩,
//同時會直接從 watchTable 和 watch2Paths 里刪除 Watcher,即 Watcher 是一次性的逸寓,觸發(fā)一次就失效了居兆。
watchers = watchTable.remove(path);
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
//對于需要注冊 Watcher 的請求,ZooKeeper 會把請求對應的ServerCnxn 作為一個 Watcher 存儲竹伸,
//所以這里調用的 process 方法實質上是 ServerCnxn 的對應方法
w.process(e);
}
return watchers;
}
ServerCnxn 類代碼
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification");
}
客戶端收到消息后泥栖,會調用 ClientCnxn 的 SendThread.readResponse 方法來進行統(tǒng)一處理,如清單所示勋篓。如果響應頭 replyHdr 中標識的 Xid 為 02吧享,表示是 ping,如果為-4譬嚣,表示是驗證包钢颂,如果是-1,表示這是一個通知類型的響應拜银,然后進行反序列化殊鞭、處理 chrootPath、還原 WatchedEvent盐股、回調 Watcher 等步驟钱豁,其中回調 Watcher 步驟將 WacthedEvent 對象交給 EventThread 線程,在下一個輪詢周期中進行 Watcher 回調疯汁。
Zookeeper Watcher特點
注冊只能確保一次消費
無論是服務端還是客戶端牲尺,一旦一個 Watcher 被觸發(fā),ZooKeeper 都會將其從相應的存儲中移除。因此谤碳,開發(fā)人員在 Watcher 的使用上要記住的一點是需要反復注冊溃卡。這樣的設計有效地減輕了服務端的壓力。如果注冊一個 Watcher 之后一直有效蜒简,那么針對那些更新非常頻繁的節(jié)點瘸羡,服務端會不斷地向客戶端發(fā)送事件通知,這無論對于網(wǎng)絡還是服務端性能的影響都非常大搓茬。
持久Watcher需要每次收到通知事件后重復注冊犹赖。
客戶端串行執(zhí)行
客戶端 Watcher 回調的過程是一個串行同步的過程,這為我們保證了順序卷仑,同時峻村,需要開發(fā)人員注意的一點是,千萬不要因為一個 Watcher 的處理邏輯影響了整個客戶端的 Watcher 回調锡凝。
輕量級設計
WatchedEvent 是 ZooKeeper 整個 Watcher 通知機制的最小通知單元粘昨,這個數(shù)據(jù)結構中只包含三部分的內容:通知狀態(tài)、事件類型和節(jié)點路徑窜锯。也就是說张肾,Watcher 通知非常簡單,只會告訴客戶端發(fā)生了事件锚扎,而不會說明事件的具體內容吞瞪。例如針對 NodeDataChanged 事件,ZooKeeper 的 Watcher 只會通知客戶指定數(shù)據(jù)節(jié)點的數(shù)據(jù)內容發(fā)生了變更工秩,而對于原始數(shù)據(jù)以及變更后的新數(shù)據(jù)都無法從這個事件中直接獲取到尸饺,而是需要客戶端主動重新去獲取數(shù)據(jù),這也是 ZooKeeper 的 Watcher 機制的一個非常重要的特性助币。另外浪听,客戶端向服務端注冊 Watcher 的時候,并不會把客戶端真實的 Watcher 對象傳遞到服務端眉菱,僅僅只是在客戶端請求中使用 boolean 類型屬性進行了標記迹栓,同時服務端也僅僅只是保存了當前連接的 ServerCnxn 對象。這樣輕量級的 Watcher 機制設計俭缓,在網(wǎng)絡開銷和服務端內存開銷上都是非常廉價的克伊。
參考資料:
ZooKeeper Watcher機制
Apache ZooKeeper Watcher 機制源碼解釋
品味ZooKeeper之Watcher機制