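Summary
The previous sections covered the watch data structures and how they are managed on the client and on the server.
This section explains how the watch mechanism itself works. The series was meant to cover ZooKeeper's data structures first, but having described the watch data structures, it would be odd not to explain the mechanism behind them.
So this section also gives a brief look at how the zk client and server interact.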
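This section covers:
Registering a watcher on the client
  How the client registers and manages watchers
  How a client-side watcher shows up in the network request
  How the watcher is registered when the client receives the server's reply
Handling watchers on the server
  Registering the ServerCnxn when a client request arrives
  When an event fires, using WatchManager to find the matching Watcher (ServerCnxn) and notify the clients interested in that event
Watcher callbacks on the client
  The client receives the server's notification and calls queueEvent to put it on the waitingEvents queue
  ClientCnxn.EventThread#run calls ClientCnxn.EventThread#processEvent, which consumes waitingEvents and calls back watcher.process()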
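Registering a watcher on the client
Ways to register
There are two main ways to register, both in the ZooKeeper class. The relevant methods are listed below.
1. Registering the default watcher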
public synchronized void register(Watcher watcher)
the ZooKeeper constructors
2. Registering a custom Watcher in getData, getChildren, and exists requests
public Stat exists(final String path, Watcher watcher)
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
public byte[] getData(final String path, Watcher watcher, Stat stat)
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
public List<String> getChildren(final String path, Watcher watcher)
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
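The first kind, the default watcher, is passed in when the client constructs the ZooKeeper object and is recorded in ZooKeeper.ZKWatchManager#defaultWatcher. It can be thought of as a "pseudo" registration (my own term): the default watcher is merely recorded and is not necessarily attached to any request.
The second kind is when the client registers a specific watcher with a specific request. This is a "real" registration: as long as the response comes back without error, a corresponding record is stored in ZooKeeper.ZKWatchManager.
For the ZK API, though, the underlying registration logic is the same whether or not the default watcher is used.
For example, getData with the default watcher looks like this in the source:
public byte[] getData(String path, boolean watch, Stat stat)
        throws KeeperException, InterruptedException {
    // simply passes on the recorded defaultWatcher
    return getData(path, watch ? watchManager.defaultWatcher : null, stat);
}
It ends up calling public byte[] getData(final String path, Watcher watcher, Stat stat), so the underlying path is identical to the one taken when a custom watcher is passed.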
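The delegation can be illustrated with a stripped-down model. This is only a sketch: DefaultWatcherSketch, its nested Watcher interface, and the lastRegistered field are hypothetical stand-ins, not the real ZooKeeper classes, and no network call is made.

```java
// Hypothetical stand-in for the client; not the real ZooKeeper class.
class DefaultWatcherSketch {
    interface Watcher { void process(String event); }

    private final Watcher defaultWatcher; // recorded once, at construction time
    Watcher lastRegistered;               // what the watcher overload actually received

    DefaultWatcherSketch(Watcher defaultWatcher) {
        this.defaultWatcher = defaultWatcher;
    }

    // boolean overload: only decides which watcher object to pass on
    byte[] getData(String path, boolean watch) {
        return getData(path, watch ? defaultWatcher : null);
    }

    // watcher overload: the single place where the request would be built
    byte[] getData(String path, Watcher watcher) {
        lastRegistered = watcher;
        return new byte[0]; // placeholder payload, no server involved
    }
}
```

Calling getData("/a", true) hands the default watcher to the second overload, while getData("/a", false) hands it null, so everything downstream sees only "a watcher or none".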
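Registering and managing watchers in the client
Management goes through ZooKeeper.WatchRegistration and its subclasses.
[Figure: class diagram of WatchRegistration and its subclasses]
The source is as follows: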
abstract class WatchRegistration { // manages watch registration on the client
    private Watcher watcher;   // the watcher being registered
    private String clientPath; // path of the watched znode
    public WatchRegistration(Watcher watcher, String clientPath)
    {
        this.watcher = watcher;
        this.clientPath = clientPath;
    }

    // returns the path -> watchers map to register into, chosen by the response's result code
    abstract protected Map<String, Set<Watcher>> getWatches(int rc);

    /**
     * Register the watcher with the set of watches on path.
     * @param rc the result code of the operation that attempted to
     * add the watch on the path.
     */
    public void register(int rc) { // registers the watcher on the path, depending on the result code
        if (shouldAddWatch(rc)) { // only if the watch may be added
            Map<String, Set<Watcher>> watches = getWatches(rc); // the map to register into
            synchronized(watches) {
                Set<Watcher> watchers = watches.get(clientPath); // existing watchers on this path
                if (watchers == null) {
                    watchers = new HashSet<Watcher>();
                    watches.put(clientPath, watchers);
                }
                watchers.add(watcher); // add the current watcher
            }
        }
    }

    /**
     * Determine whether the watch should be added based on return code.
     * @param rc the result code of the operation that attempted to add the
     * watch on the node
     * @return true if the watch should be added, otw false
     */
    protected boolean shouldAddWatch(int rc) { // decides from the result code whether to add the watch
        return rc == 0;
    }
}
The fields clientPath and watcher are the path of the znode being watched and the watcher that handles its events.
The registration logic checks the response's result code to decide whether the watch may be added; if so, it adds an entry to the Map<String, Set<Watcher>>.
A natural question is how abstract protected Map<String, Set<Watcher>> getWatches(int rc) is implemented:
DataWatchRegistration: getWatches returns ZooKeeper.ZKWatchManager#dataWatches
ExistsWatchRegistration: getWatches returns
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
    return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
}
ChildWatchRegistration: getWatches returns ZooKeeper.ZKWatchManager#childWatches
All of these involve the three maps in ZKWatchManager: dataWatches, existWatches, and childWatches. See the client storage part of the first watch section for details.
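The exists case is the only one where the result code picks the target map, and a small model makes that easier to see. The sketch below is an assumption-laden simplification: ExistsRegistrationSketch is a made-up class, watchers are represented by plain strings, and the rule that a NONODE reply (code -101) still records the watch reflects my reading of ExistsWatchRegistration rather than anything quoted above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of how an exists() watch picks its map; not ZooKeeper code.
class ExistsRegistrationSketch {
    static final int OK = 0;
    static final int NONODE = -101; // assumed value of KeeperException.Code.NONODE

    final Map<String, Set<String>> dataWatches = new HashMap<>();
    final Map<String, Set<String>> existWatches = new HashMap<>();

    // node present -> dataWatches, node absent -> existWatches, other errors -> nothing
    void register(String path, String watcherName, int rc) {
        if (rc != OK && rc != NONODE) {
            return; // any other error: the watch is not recorded
        }
        Map<String, Set<String>> target = (rc == OK) ? dataWatches : existWatches;
        target.computeIfAbsent(path, p -> new HashSet<>()).add(watcherName);
    }
}
```

The point of the split is that a watch on a node that does not exist yet waits for its creation, whereas a watch on an existing node behaves like a data watch.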
The watcher reaches the server as a flag in the request
Continuing with the getData function above:
public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath);
    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) { // a watcher was supplied, so prepare its registration
        wcb = new DataWatchRegistration(watcher, clientPath); // registration object for a data watch
    }
    final String serverPath = prependChroot(clientPath);
    RequestHeader h = new RequestHeader(); // build the request header
    h.setType(ZooDefs.OpCode.getData); // request type is getData
    GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    request.setWatch(watcher != null); // the flag: whether this request carries a watch
    GetDataResponse response = new GetDataResponse();
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); // client submits the request
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}
In the function above, only the commented lines, i.e. the watch-related code, need attention.
It calls ClientCnxn#submitRequest, whose source is:
// submit a request
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader(); // build the reply header
    Packet packet = queuePacket(h, r, request, response, null, null, null,
            null, watchRegistration);
    synchronized (packet) {
        while (!packet.finished) { // block until the packet has been processed
            packet.wait();
        }
    }
    return r;
}
This in turn calls ClientCnxn#queuePacket.
In ZooKeeper, a Packet is the smallest unit of the communication protocol, i.e. a data packet.
A Packet is used for network transfer between client and server; any object that needs to be transmitted is wrapped in a Packet.
In ClientCnxn the WatchRegistration is also wrapped into the Packet, and queuePacket places the Packet on outgoingQueue, the send queue (producing the packet).
The SendThread then calls doTransport, consumes Packets from outgoingQueue, and the client sends them.
queuePacket acts as the producer; its code calls
outgoingQueue.add(packet);
ClientCnxn.SendThread then acts as the consumer; its run method calls ClientCnxnSocket#doTransport.
See the implementation class ClientCnxnSocketNIO#doTransport,
which calls ClientCnxnSocketNIO#doIO.
Since this is the send path, doIO calls ClientCnxn.Packet#createBB:
public void createBB() {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len"); // We'll fill this in later
        if (requestHeader != null) {
            requestHeader.serialize(boa, "header"); // serialize the request header (xid and type)
        }
        if (request instanceof ConnectRequest) {
            request.serialize(boa, "connect");
            // append "am-I-allowed-to-be-readonly" flag
            boa.writeBool(readOnly, "readOnly");
        } else if (request != null) {
            // serialize the request; for types such as GetDataRequest this includes the watch flag
            request.serialize(boa, "request");
        }
        baos.close();
        this.bb = ByteBuffer.wrap(baos.toByteArray());
        this.bb.putInt(this.bb.capacity() - 4);
        this.bb.rewind();
    } catch (IOException e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}
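One point is worth noting here:
The client's watcher is never sent over the network, so the server has no idea what the watcher's process function will do when it fires.
For certain request types such as GetDataRequest, however, serialization includes a watch flag that says whether to watch.
When handling the request, the server only knows whether the client is watching a given path.
At the time the request is sent, the watcher has not yet been registered on the client; that waits for the server's reply.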
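To make the "only a flag goes on the wire" point concrete, here is a minimal sketch of length-prefixed framing in the style of createBB. It is an illustration under assumptions: WatchFlagFramingSketch and frame are hypothetical names, the layout (length, path length, path bytes, one flag byte) is simplified and omits the request header, and DataOutputStream stands in for the jute BinaryOutputArchive.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical framing helper; the real client uses jute serialization.
class WatchFlagFramingSketch {
    // Produces [len][path length][path bytes][watch flag]; no watcher object is written.
    static ByteBuffer frame(String path, boolean watch) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);                    // placeholder for the length
            byte[] p = path.getBytes(StandardCharsets.UTF_8);
            out.writeInt(p.length);
            out.write(p);
            out.writeBoolean(watch);             // the only trace of the watcher
            out.flush();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4);        // back-fill the real length
            bb.rewind();
            return bb;
        } catch (IOException e) {
            throw new IllegalStateException(e);  // not expected for in-memory streams
        }
    }
}
```

Whatever the watcher's process method does, the frame for a given path differs by exactly one byte between the watched and unwatched case.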
After the reply arrives, the watcher is registered on the client
ClientCnxn.SendThread reads the server's reply through this call chain:
  ClientCnxnSocketNIO#doTransport
  ClientCnxnSocketNIO#doIO
  ClientCnxn.SendThread#readResponse
  ClientCnxn#finishPacket
private void finishPacket(Packet p) { // called once the request was sent and its reply received and processed
    if (p.watchRegistration != null) { // there is a watchRegistration to register
        p.watchRegistration.register(p.replyHeader.getErr()); // register according to the response code
    }
    if (p.cb == null) { // no asynchronous callback: wake the waiting caller
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    } else { // asynchronous callback: hand the packet to the event thread
        p.finished = true;
        eventThread.queuePacket(p);
    }
}
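The only thing to note is that watchRegistration is finally registered here, through one of the three WatchRegistration implementations.
Summary of client-side watcher registration
In my own words, the flow is:
1. The client sends a getData, getChildren, or exists request, passing either a custom watcher or the default Watcher from the ZooKeeper constructor.
2. The request is wrapped in a Packet. The request object (for example GetDataRequest), not the RequestHeader, records whether a watch is wanted. The Packet is placed on the send queue ClientCnxn#outgoingQueue.
3. ClientCnxn.SendThread consumes outgoingQueue:
  ClientCnxnSocketNIO#doTransport
  ClientCnxnSocketNIO#doIO
  ClientCnxn.Packet#createBB
  During serialization the request is written out, including a flag saying whether a watch is attached (the watcher object itself is not included).
4. The server processes the request and replies.
  See how FinalRequestProcessor#processRequest handles a getData request.
  It uses getDataRequest.getWatch() to see whether the client wants a watch and, if so, registers it in the DataTree's WatchManager, as described below.
5. ClientCnxn.SendThread reads the reply:
  ClientCnxnSocketNIO#doTransport
  ClientCnxnSocketNIO#doIO
  ClientCnxn.SendThread#readResponse
  ClientCnxn#finishPacket
  The response code is used to register the watcher, through the matching ZooKeeper.WatchRegistration implementation.
[Figure: diagram of the client-side registration flow]
The next part describes how the server handles the client's watch registration request.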
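The handshake between submitRequest and finishPacket is a plain wait/notifyAll pair, with registration happening on the I/O side before the caller wakes up. The sketch below models just that ordering. Everything in it is hypothetical: PacketHandshakeSketch, the string-based registered set, and the simulatedErr parameter that stands in for a server reply.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the submit/finish handshake; not the real ClientCnxn.
class PacketHandshakeSketch {
    static class Packet {
        final String path;
        final String watcherName; // null means no watch was requested
        boolean finished;
        int err;
        Packet(String path, String watcherName) {
            this.path = path;
            this.watcherName = watcherName;
        }
    }

    final Set<String> registered = new HashSet<>();

    // I/O side: register the watch only on success, then wake the caller.
    void finishPacket(Packet p, int err) {
        p.err = err;
        if (p.watcherName != null && err == 0) {
            synchronized (registered) {
                registered.add(p.path + "->" + p.watcherName);
            }
        }
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    }

    // Caller side: blocks until the reply has been processed, then returns the error code.
    int submit(Packet p, int simulatedErr) {
        new Thread(() -> finishPacket(p, simulatedErr)).start(); // stands in for SendThread
        synchronized (p) {
            while (!p.finished) {
                try {
                    p.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return Integer.MIN_VALUE; // sketch-only marker for "interrupted"
                }
            }
        }
        return p.err;
    }
}
```

Because the registration is done before notifyAll, a caller that returns from submit with error code 0 can rely on the watch already being recorded locally.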
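Handling watchers on the server
Registering a watcher on the server
So far we have only covered the client sending the registration request and registering the watcher based on the server's response, not how the server handles that request. That is covered here.
[Figure: sequence diagram of server-side watcher registration]
The earlier part of the server-side call chain is not expanded here; see the linked reference if interested.
We start from FinalRequestProcessor#processRequest.
A getData request is handled as follows: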
case OpCode.getData: { // getData request
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
            getDataRequest); // deserialize the GetDataRequest
    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
    if (n == null) { // check that the node for this path exists
        throw new KeeperException.NoNodeException();
    }
    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
            ZooDefs.Perms.READ,
            request.authInfo); // check ACL permissions
    Stat stat = new Stat();
    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
            getDataRequest.getWatch() ? cnxn : null); // if the watch flag is set, cnxn is passed as the Watcher
    rsp = new GetDataResponse(b, stat);
    break;
}
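Note that this depends on GetDataRequest#getWatch(), i.e. whether the client wants a watcher on this path; if so, cnxn is registered.
cnxn is a ServerCnxn object. What is ServerCnxn?
[Figure: ServerCnxn class diagram]
ServerCnxn is the connection interface between a ZooKeeper client and server; it represents one client-server connection.
It implements the Watcher interface and has two subclasses (NIOServerCnxn and NettyServerCnxn).
Its role is this:
as a Watcher implementation it holds the connection between client and server, so when it is called back it can send a response directly to the client to say that an event has fired.
This is described in detail below, where the server triggers a Watcher.
The FinalRequestProcessor#processRequest call above goes into
ZKDatabase#getData
DataTree#getData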
public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
        if (watcher != null) {
            dataWatches.addWatch(path, watcher); // register the watcher in dataWatches
        }
        return n.data; // the node's byte[] payload
    }
}
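This jumps ahead slightly to DataTree, but it is not hard to follow because WatchManager was covered earlier.
WatchManager is responsible for triggering Watcher events; the name is used here as a general term.
On the server, DataTree holds two WatchManagers, dataWatches and childWatches,
for data-change watchers and child-change watchers respectively.
Combined with the sequence diagram above, this shows how the request passes through ZKDatabase to DataTree and is finally recorded in WatchManager, which completes the watcher's registration on the server.
How the server returns the getData() result to the client can be read off the sequence diagram and is not repeated here.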
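Server-side registration boils down to "if the flag is set, store the connection under the path". A minimal sketch, assuming a two-way index like the watchTable and watch2Paths fields that appear in triggerWatch; ServerWatchTableSketch, Connection, and handleGetData are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of server-side watch registration; not the real WatchManager.
class ServerWatchTableSketch {
    // Stand-in for ServerCnxn: one object per client connection.
    static class Connection {
        final long sessionId;
        Connection(long sessionId) { this.sessionId = sessionId; }
    }

    final Map<String, Set<Connection>> watchTable = new HashMap<>();  // path -> connections
    final Map<Connection, Set<String>> watch2Paths = new HashMap<>(); // connection -> paths

    synchronized void addWatch(String path, Connection cnxn) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(cnxn);
        watch2Paths.computeIfAbsent(cnxn, c -> new HashSet<>()).add(path);
    }

    // Mirrors the getData branch: register only when the request carried watch=true.
    synchronized void handleGetData(String path, boolean watchFlag, Connection cnxn) {
        if (watchFlag) {
            addWatch(path, cnxn);
        }
    }
}
```

Keeping both directions lets the server answer "who watches this path" when an event fires and "what does this connection watch" when a connection closes.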
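Triggering a watch on the server
The watch above was set by getData. Suppose a setData request now arrives for the znode at the same path.
(Ignoring the earlier call chain,)
execution reaches DataTree#setData,
which calls WatchManager#triggerWatch,
covered in the WatchManager section: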
// Triggers the watchers registered on path, skipping any that appear in the supress set
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    // build the WatchedEvent
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null; // watchTable has no entry for this path
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w); // drop the matching [watcher, path] entry from watch2Paths
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) { // watchers in supress are not called
            continue;
        }
        w.process(e); // call back every watcher that was not suppressed
    }
    return watchers; // the watchers that were removed for this path
}
The only thing left to work out is what w is when w.process(e) is called.
As covered in the server-side registration above, it is a ServerCnxn object. Taking the NIOServerCnxn subclass as an example:
@Override
synchronized public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0); // xid -1 marks this as a notification
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                "Deliver event " + event + " to 0x"
                + Long.toHexString(this.sessionId)
                + " through " + this);
    }
    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper(); // wrap as a WatcherEvent for network transfer
    sendResponse(h, e, "notification"); // send a response to the client announcing the WatchedEvent
}
In other words, when the server triggers a watcher, the process callback simply tells the watching client that a WatcherEvent has occurred.
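Two properties of the trigger path are easy to miss: the watchers are removed before they are called, so each registration fires at most once, and the callback only emits a notification. The sketch below shows both under simplifying assumptions; OneShotTriggerSketch and its Watcher interface are hypothetical, and the suppress set and watch2Paths bookkeeping are left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of one-shot triggering; not the real WatchManager.
class OneShotTriggerSketch {
    interface Watcher { void process(String type, String path); }

    final Map<String, Set<Watcher>> watchTable = new HashMap<>();

    synchronized void addWatch(String path, Watcher w) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(w);
    }

    // Removes the path's watchers before calling them; returns how many were called.
    int triggerWatch(String path, String type) {
        Set<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
        }
        if (watchers == null) {
            return 0;
        }
        for (Watcher w : watchers) {
            w.process(type, path); // on a real server this would send an xid=-1 notification
        }
        return watchers.size();
    }
}
```

A second triggerWatch on the same path finds nothing to call, which is why a client that wants continued updates has to set the watch again with its next read.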
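Summary of server-side watcher handling
At registration time the watcher is a ServerCnxn, which holds the session with the client. Only if the watch flag in the client's request is true does the server register that connection in WatchManager (otherwise the server knows the client is not interested in this path and will not notify it the next time the path changes).
When a watcher fires, the server uses WatchManager to find the watchers for the path, i.e. the ServerCnxn objects, and tells each connected client that a WatcherEvent occurred; the client handles the rest.
Watcher callbacks on the client
ClientCnxn.SendThread reads the reply through this call chain:
  ClientCnxnSocketNIO#doTransport
  ClientCnxnSocketNIO#doIO
  ClientCnxn.SendThread#readResponse
The part of readResponse that handles event notifications: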
if (replyHdr.getXid() == -1) {
    // -1 means notification, i.e. a WatcherEvent
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got notification sessionid:0x"
                + Long.toHexString(sessionId));
    }
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response"); // deserialize the WatcherEvent
    // convert from a server path to a client path
    if (chrootPath != null) {
        String serverPath = event.getPath();
        if(serverPath.compareTo(chrootPath)==0)
            event.setPath("/");
        else if (serverPath.length() > chrootPath.length())
            event.setPath(serverPath.substring(chrootPath.length()));
        else {
            LOG.warn("Got server path " + event.getPath()
                    + " which is too short for chroot path "
                    + chrootPath);
        }
    }
    WatchedEvent we = new WatchedEvent(event); // turn the WatcherEvent back into a WatchedEvent
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got " + we + " for sessionid 0x"
                + Long.toHexString(sessionId));
    }
    eventThread.queueEvent( we ); // put it on the event queue
    return;
}
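The chroot handling in the middle of that block is a small pure function in disguise. Pulled out as a sketch (ChrootPathSketch and toClientPath are hypothetical names, and the warning branch is reduced to returning the path unchanged), it looks like this:

```java
// Hypothetical extraction of the server-path to client-path conversion.
class ChrootPathSketch {
    // Returns the client-visible path; leaves the input unchanged when it is too short.
    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) {
            return serverPath;            // no chroot configured
        }
        if (serverPath.equals(chrootPath)) {
            return "/";                   // the chroot itself is the client's root
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        return serverPath;                // the original only logs a warning here
    }
}
```

With a chroot of /app, a server path of /app/node is reported to the watcher as /node, so client code never sees the prefix it did not ask for.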
Here the WatchedEvent is put on the producer side of the queue by calling ClientCnxn.EventThread#queueEvent:
public void queueEvent(WatchedEvent event) { // queue a WatchedEvent for the event thread
    // drop a session event (type None) whose state equals the current session state
    if (event.getType() == EventType.None
            && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    // materialize the watchers based on the event
    WatcherSetEventPair pair = new WatcherSetEventPair(
            watcher.materialize(event.getState(), event.getType(),
                    event.getPath()),
            event); // WatcherSetEventPair bundles the watchers with the WatchedEvent
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair); // add to the queue
}
waitingEvents is consumed in ClientCnxn.EventThread#run,
which ends up calling ClientCnxn.EventThread#processEvent.
The block that handles the callback is:
if (event instanceof WatcherSetEventPair) { // a notification event
    // each watcher will process the event
    WatcherSetEventPair pair = (WatcherSetEventPair) event;
    for (Watcher watcher : pair.watchers) { // take the watchers and the event out of the wrapper
        try {
            watcher.process(pair.event);
        } catch (Throwable t) {
            LOG.error("Error while calling watcher ", t);
        }
    }
}
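One question remains: where are the client's watch records used, with the corresponding get and remove operations?
The answer is that ClientCnxn.EventThread#queueEvent calls ClientWatchManager#materialize; see the earlier section on ClientWatchManager.
This completes the watcher callback on the client.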
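The client-side half can be modeled as "materialize, enqueue, drain". The sketch below is an approximation under stated assumptions: ClientEventQueueSketch, Pair, and drain are hypothetical, only a single dataWatches map is modeled, events are plain strings, and drain empties the queue once instead of looping on a blocking take as an event thread would.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the client-side event queue; not the real EventThread.
class ClientEventQueueSketch {
    interface Watcher { void process(String event); }

    static class Pair {
        final Set<Watcher> watchers;
        final String event;
        Pair(Set<Watcher> watchers, String event) {
            this.watchers = watchers;
            this.event = event;
        }
    }

    final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
    final LinkedBlockingQueue<Pair> waitingEvents = new LinkedBlockingQueue<>();

    void register(String path, Watcher w) {
        synchronized (dataWatches) {
            dataWatches.computeIfAbsent(path, p -> new HashSet<>()).add(w);
        }
    }

    // materialize: look up AND remove the watchers recorded for the path
    Set<Watcher> materialize(String path) {
        synchronized (dataWatches) {
            Set<Watcher> ws = dataWatches.remove(path);
            return ws == null ? Collections.<Watcher>emptySet() : ws;
        }
    }

    // producer side: bundle the watchers with the event and enqueue the pair
    void queueEvent(String path, String event) {
        waitingEvents.add(new Pair(materialize(path), event));
    }

    // consumer side: call every watcher of every queued pair; returns the number of calls
    int drain() {
        int calls = 0;
        Pair pair;
        while ((pair = waitingEvents.poll()) != null) {
            for (Watcher w : pair.watchers) {
                try {
                    w.process(pair.event);
                    calls++;
                } catch (Throwable t) {
                    // a failing watcher must not stop the others
                }
            }
        }
        return calls;
    }
}
```

Since materialize removes the entry, a second notification for the same path is queued with an empty watcher set and reaches nobody.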
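Reflections
Watchers and client-server network transfer
The client wraps a WatchRegistration in the Packet, but the watcher object is not transmitted when the Packet is sent to the server.
The Request object inside the Packet only carries a watch flag.
The server acts on that flag: if it is set, the server records a ServerCnxn.
See the usages of the getter and setter for GetDataRequest#watch.
This is what makes Watchers lightweight.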
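Watcher registration on the server
The watcher registered on the server is a subclass of ServerCnxn, which holds the session between client and server.
Its process callback sends the client a notification saying which WatchedEvent occurred.
The client then runs the matching process logic of the watcher it registered locally.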
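How do the watcher registered on the client and the one registered on the server differ?
They differ in role and in type.
A client-side watcher can be of any type; its role is to define what the client does once it observes some event, for example re-reading the data.
A server-side watcher is always a ServerCnxn; its role is just to tell the corresponding client that some WatchedEvent occurred.
Because the watcher is never sent over the network, the two are not the same object.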
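How does the server know which clients to notify when a WatchedEvent fires?
The server-side watch is a ServerCnxn, which holds the connection to the client, so calling its process method is enough.
It all comes down to ServerCnxn implementing Watcher.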
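What is the trigger granularity: (path) or (path, EventType)?
It is the path. A watch is set on a path; the watcher then handles different EventTypes differently, or ignores them.
Even if the watcher ignores every EventType, the server still tells the client when an event occurs on a path it recorded.
See WatchManager#triggerWatch.
The granularity is (path).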
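A small sketch of what path-level granularity means for watcher code: the callback is invoked for whatever event type occurs on the path, and any filtering is the watcher's own business. PathGranularitySketch, DataOnlyWatcher, and the two-value EventType enum are hypothetical and far smaller than the real event model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of client-side filtering by event type.
class PathGranularitySketch {
    enum EventType { NodeDataChanged, NodeDeleted }

    // A watcher that only cares about data changes still gets called for every type.
    static class DataOnlyWatcher {
        int delivered;                                 // every notification that reached us
        final List<String> handled = new ArrayList<>(); // the ones we chose to act on

        void process(EventType type, String path) {
            delivered++;
            if (type == EventType.NodeDataChanged) {
                handled.add(path);                     // filtering happens on the client
            }
        }
    }
}
```

The delivered counter grows for both event types while handled only records data changes, which mirrors a server that notifies per path and a client that decides what matters.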
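Open questions
How does ServerCnxn store the client's connection and session information?
  I still need to read the source for this.
ClientCnxn.EventThread#queueEvent
  What the state check in it is for; to be revisited when studying how requests are sent and responses handled.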
References
From Paxos to ZooKeeper (《从Paxos到Zookeeper》), Chapter 7
https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/index.html
http://www.cnblogs.com/leesf456/p/6291004.html
http://blog.csdn.net/u012291108/article/details/59698624
http://blog.csdn.net/qianshangding0708/article/details/50084155