前言
初學(xué) Zookeeper 會(huì)發(fā)現(xiàn)客戶端有兩種回調(diào)方式: Watcher 和 AsyncCallback艘包,而 Zookeeper 的使用是離不開這兩種方式的,搞清楚它們之間的區(qū)別與實(shí)現(xiàn)顯得尤為重要爹凹。本文將圍繞下面幾個(gè)方面展開
- Watcher 和 AsyncCallback 的區(qū)別
- Watcher 的回調(diào)實(shí)現(xiàn)
- AsyncCallback 的回調(diào)實(shí)現(xiàn)
- IO 與事件處理
Watcher 和 AsyncCallback 的區(qū)別
我們先通過(guò)一個(gè)例子來(lái)感受一下:
zooKeeper.getData(root, new Watcher() {
public void process(WatchedEvent event) {
}
}, new AsyncCallback.DataCallback() {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
}
}, null);
可以看到,getData
方法可以同時(shí)設(shè)置兩個(gè)回調(diào):Watcher 和 AsyncCallback,同樣是回調(diào)硼补,它們的區(qū)別是什么呢?要解決這個(gè)問題沛鸵,我們就得從這兩個(gè)接口的功能入手括勺。
-
Watcher
:Watcher
是用于監(jiān)聽節(jié)點(diǎn),session 狀態(tài)的曲掰,比如getData
對(duì)數(shù)據(jù)節(jié)點(diǎn)a
設(shè)置了watcher
疾捍,那么當(dāng)a
的數(shù)據(jù)內(nèi)容發(fā)生改變時(shí),客戶端會(huì)收到NodeDataChanged
通知栏妖,然后進(jìn)行watcher
的回調(diào)乱豆。 -
AsyncCallback
:AsyncCallback
是在以異步方式使用 ZooKeeper API 時(shí),用于處理返回結(jié)果的吊趾。例如:getData
同步調(diào)用的版本是:byte[] getData(String path, boolean watch,Stat stat)
宛裕,異步調(diào)用的版本是:void getData(String path,Watcher watcher,AsyncCallback.DataCallback cb,Object ctx)
,可以看到论泛,前者是直接返回獲取的結(jié)果揩尸,后者是通過(guò)AsyncCallback
回調(diào)處理結(jié)果的。
Watcher
Watcher 主要是通過(guò)ClientWatchManager
進(jìn)行管理的屁奏。下面是 Watcher 相關(guān)類圖
添加 Watcher 的流程如下:
Watcher 的類型
ClientWatchManager
中有四種Watcher
-
defaultWatcher
:創(chuàng)建Zookeeper
連接時(shí)傳入的Watcher
岩榆,用于監(jiān)聽 session 狀態(tài) -
dataWatches
:存放getData
傳入的Watcher
-
existWatches
:存放exists
傳入的Watcher
,如果節(jié)點(diǎn)已存在,則Watcher
會(huì)被添加到dataWatches
-
childWatches
:存放getChildren
傳入的Watcher
從代碼上可以發(fā)現(xiàn)勇边,監(jiān)聽器是存在HashMap
中的犹撒,key
是節(jié)點(diǎn)名稱path
,value
是Set<Watcher>
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
private volatile Watcher defaultWatcher;
通知的狀態(tài)類型與事件類型
在Watcher
接口中,已經(jīng)定義了所有的狀態(tài)類型和事件類型
-
KeeperState.Disconnected(0)
此時(shí)客戶端處于斷開連接狀態(tài),和ZK集群都沒有建立連接奕删。
-
EventType.None(-1)
觸發(fā)條件:一般是在與服務(wù)器斷開連接的時(shí)候,客戶端會(huì)收到這個(gè)事件祥款。
-
-
KeeperState. SyncConnected(3)
此時(shí)客戶端處于連接狀態(tài)
-
EventType.None(-1)
觸發(fā)條件:客戶端與服務(wù)器成功建立會(huì)話之后,會(huì)收到這個(gè)通知月杉。
-
EventType. NodeCreated (1)
觸發(fā)條件:所關(guān)注的節(jié)點(diǎn)被創(chuàng)建镰踏。
-
EventType. NodeDeleted (2)
觸發(fā)條件:所關(guān)注的節(jié)點(diǎn)被刪除。
-
EventType. NodeDataChanged (3)
觸發(fā)條件:所關(guān)注的節(jié)點(diǎn)的內(nèi)容有更新沙合。注意奠伪,這個(gè)地方說(shuō)的內(nèi)容是指數(shù)據(jù)的版本
號(hào)dataVersion
。因此首懈,即使使用相同的數(shù)據(jù)內(nèi)容來(lái)更新绊率,還是會(huì)收到這個(gè)事件通知的。無(wú)論如何究履,調(diào)用了更新接口滤否,就一定會(huì)更新dataVersion
的。 -
EventType. NodeChildrenChanged (4)
觸發(fā)條件:所關(guān)注的節(jié)點(diǎn)的子節(jié)點(diǎn)有變化最仑。這里說(shuō)的變化是指子節(jié)點(diǎn)的個(gè)數(shù)和組成藐俺,具體到子節(jié)點(diǎn)內(nèi)容的變化是不會(huì)通知的。
-
-
KeeperState. AuthFailed(4)
認(rèn)證失敗
- EventType.None(-1)
-
KeeperState. Expired(-112)
session 超時(shí)
- EventType.None(-1)
materialize 方法
ClientWatchManager
只有一個(gè)方法泥彤,那就是materialize
欲芹,它根據(jù)事件類型type
和path
返回監(jiān)聽該節(jié)點(diǎn)的特定類型的Watcher
。
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type, String path);
核心邏輯如下:
-
type == None
:返回所有Watcher
吟吝,也就是說(shuō)所有的Watcher
都會(huì)被觸發(fā)菱父。如果disableAutoWatchReset == true
且當(dāng)前state != SyncConnected
,那么還會(huì)清空Watcher
剑逃,意味著移除所有在節(jié)點(diǎn)上的Watcher
浙宜。 -
type == NodeDataChanged | NodeCreated
:返回監(jiān)聽path
節(jié)點(diǎn)的dataWatches & existWatches
-
type == NodeChildrenChanged
:返回監(jiān)聽path
節(jié)點(diǎn)的childWatches
-
type == NodeDeleted
:返回監(jiān)聽path
節(jié)點(diǎn)的dataWatches | childWatches
每次返回都會(huì)從HashMap
中移除節(jié)點(diǎn)對(duì)應(yīng)的Watcher
,例如:addTo(dataWatches.remove(clientPath), result);
蛹磺,這就是為什么Watcher
是一次性的原因(defaultWatcher
除外)粟瞬。值得注意的是,由于使用的是HashSet
存儲(chǔ)Watcher
萤捆,重復(fù)添加同一個(gè)實(shí)例的Watcher
也只會(huì)被觸發(fā)一次裙品。
AsyncCallback
Zookeeper 的exists
,getData
,getChildren
方法都有異步的版本乓梨,它們與同步方法的區(qū)別僅僅在于是否等待響應(yīng),底層發(fā)送都是通過(guò)sendThread
異步發(fā)送的清酥。下面我們用一幅圖來(lái)說(shuō)明:
上面的圖展示了同步/異步調(diào)用
getData
的流程,其他方法也是類似的蕴侣。
IO 與事件處理
Zookeeper 客戶端會(huì)啟動(dòng)兩個(gè)常駐線程
-
SendThread
:負(fù)責(zé) IO 操作焰轻,包括發(fā)送,接受響應(yīng)昆雀,發(fā)送 ping 等辱志。 -
EventThread
:負(fù)責(zé)處理事件,執(zhí)行回調(diào)函數(shù)狞膘。
readResponse
readResponse
是SendThread
處理響應(yīng)的核心函數(shù)揩懒,核心邏輯如下:
接受服務(wù)器的響應(yīng),并反序列化出
ReplyHeader
: 有一個(gè)單獨(dú)的線程SendThread
挽封,負(fù)責(zé)接收服務(wù)器端的響應(yīng)已球。假設(shè)接受到的服務(wù)器傳遞過(guò)來(lái)的字節(jié)流是incomingBuffer
,那么就將這個(gè)incomingBuffer
反序列化為ReplyHeader
辅愿。-
判斷響應(yīng)類型:判斷
ReplyHeader
是Watcher
響應(yīng)還是AsyncCallback
響應(yīng):ReplyHeader.getXid()
存儲(chǔ)了響應(yīng)類型智亮。- 如果是
Watcher
類型響應(yīng):從ReplyHeader
中創(chuàng)建WatchedEvent
,WatchedEvent
里面存儲(chǔ)了節(jié)點(diǎn)的路徑点待,然后去WatcherManager
中找到和這個(gè)節(jié)點(diǎn)相關(guān)聯(lián)的所有Watcher
阔蛉,將他們寫入到EventThread
的waitingEvents
中。 - 如果是
AsyncCallback
類型響應(yīng):從ReplyHeader
中讀取response
癞埠,這個(gè)response
描述了是Exists状原,setData,getData苗踪,getChildren颠区,create.....
中的哪一個(gè)異步回調(diào)。從pendingQueue
中拿到Packet
通铲,Packet
中的cb
存儲(chǔ)了AsyncCallback
瓦呼,也就是異步 API 的結(jié)果回調(diào)。最后將Packet
寫入到EventThread
的waitingEvents
中测暗。
- 如果是
processEvent
processEvent
是EventThread
處理事件核心函數(shù)央串,核心邏輯如下:
- 如果
event instanceof WatcherSetEventPair
,取出pair
中的Watchers
碗啄,逐個(gè)調(diào)用watcher.process(pair.event)
- 否則
event
為AsyncCallback
质和,根據(jù)p.response
判斷為哪種響應(yīng)類型,執(zhí)行響應(yīng)的回調(diào)processResult
稚字。
可見饲宿,Watcher
和AsyncCallback
都是由EventThread
處理的厦酬,通過(guò)processEvent
進(jìn)行區(qū)分處理。
總結(jié)
Zookeeper 客戶端中Watcher
和AsyncCallback
都是異步回調(diào)的方式瘫想,但它們回調(diào)的時(shí)機(jī)是不一樣的仗阅,前者是由服務(wù)器發(fā)送事件觸發(fā)客戶端回調(diào),后者是在執(zhí)行了請(qǐng)求后得到響應(yīng)后客戶端主動(dòng)觸發(fā)的国夜。它們的共同點(diǎn)在于都需要在獲取了服務(wù)器響應(yīng)之后减噪,由SendThread
寫入EventThread
的waitingEvents
中,然后由EventThread
逐個(gè)從事件隊(duì)列中獲取并處理车吹。
參考資料
ZooKeeper個(gè)人筆記客戶端watcher和AsycCallback回調(diào)
【ZooKeeper Notes 13】ZooKeeper Watcher的事件通知類型