Java版本的Api主要涉及到兩個包:org.apache.zookeeper和org.apache.zookeeper.data
其余的包要么是Zookeeper內部使用盖桥,要么是Zookeeper服務實現(xiàn)的一部分亚斋。Java Client主要使用ZooKeeper這個類劲适,這個類有兩個構造方法碾篡,主要區(qū)別在于可以選擇是否傳入會話id和password铡羡。Zookeeper支持跨進程的會話id恢復機制拭嫁。有時一個Java應用程序會把會話id和password放在穩(wěn)定存儲服務中,當程序重啟時可以用于恢復早期的會話浮入。當創(chuàng)建好一個Zookeeper對象以后龙优,會新建兩個線程,一個IO線程和一個event線程事秀。IO線程基于Java Nio實現(xiàn)陋率,所有的輸入輸入操作都是通過此線程來完成。而所有的事件回調通過even線程來完成秽晚。會話保持比如重連、發(fā)送心跳由IO線程來完成筒愚。同步方法的響應也在IO線程里處理赴蝇。異步方法回調和監(jiān)聽事件的響應都是通過event線程來完成的。有幾個需要注意的點:
異步方法執(zhí)行和監(jiān)聽事件回調將按照完成順序有序執(zhí)行巢掺。調用者可以對回調事件進行任何處理句伶,但是同一時間不會處理到其他回調。
回調不會阻塞IO線程或者同步調用的執(zhí)行陆淀。
同步調用可能不會按照正確的順序返回考余。舉個栗子:假設客戶端發(fā)起一個異步讀/a請求,并且啟動/a的監(jiān)聽器轧苫,當成功讀取數(shù)據(jù)并且執(zhí)行異步回調的時候楚堤,在執(zhí)行邏輯里又發(fā)起了一個同步去/a的請求(雖然這種做法不推薦,但是代碼邏輯是沒有問題的),當兩次讀操作之間身冬,如果/a數(shù)據(jù)發(fā)生了變化衅胀,客戶端在獲取到同步讀結果之前,會收到/a發(fā)生變化的通知酥筝,但是由于事件隊列被異步回調阻塞住了(客戶端同一時間不會處理其他回調)滚躯,導致在變更event被執(zhí)行之前,同步讀操作會獲取到/a變化后的值嘿歌。
最后掸掏,來看下客戶端關閉邏輯。一旦Zookeeper被關閉或者收到了比如會話過期宙帝、授權失敗等事件丧凤,則Zookeeper對象會失效。在結束時茄唐,兩個線程關閉息裸,此時不應該對zookeeper句柄再進行任何操作。
1沪编、new ZooKeeper(String connectString,int sessionTimeout, Watcher watcher)發(fā)生的主要動作:
2呼盆、zookeeper.exists(String path,boolean watch, StatCallback cb, Object ctx)添加監(jiān)聽器的主要邏輯:
看源碼需要具備的知識儲備:java NIO,可以參考https://ifeve.com/overview/
3、代碼分析
基本上所有操作入口都在org.apache.zookeeper.Zookeeper
重點看一下ClientCnxn:創(chuàng)建了SendThread和EventThread兩個線程蚁廓,用于處理I/O和event访圃,
然后我們看下start()方法,就是啟動線程
再看下sendTread的run()方法相嵌,state的默認值是NOT_CONNECTED腿时,所以肯定能進到while里,并且由于當前沒有連接饭宾,所以會進入到startConnect()方法里批糟。
看下startConnect()方法,首先創(chuàng)建一個socketChanel看铆,然后調用registerAndConnect()將通道和地址綁定徽鼎。
接著看下registerAndConnect,首先像selector注冊socket連接事件弹惦,然后和addr建立連接否淤,最后primeConnection創(chuàng)建會話
看一下連接信息的填充方法primeConnection:主要是往outgoingQueue里寫和服務端的交互指令。這里包括重新注冊監(jiān)聽器棠隐,權限校驗信息寫入石抡。最后通過enableReadWriteOnly打開讀寫開關
再回過頭來看sendTread的run()方法。socket連接建立助泽,并且填充好基本的數(shù)據(jù)以后啰扛,接下來就是和服務端的數(shù)據(jù)傳輸了嚎京,主要是doTransport方法,涉及到zookeeper通信協(xié)議侠讯,這里暫時不展開
再來看下數(shù)據(jù)操作挖藏,我們以create為例:主要封裝在submitRequest里,最終會將數(shù)據(jù)存在outgoingQueue里
最后看下事件監(jiān)聽的部分:
首先通過exists(znode,true,watcher,Object)來啟用服務端的監(jiān)聽機制厢漩。第一步先通過ExistsWatchRegistration將watcher和znode綁定膜眠,然后將查找指令放到outgoingQueue傳送給服務端。
然后SendThread的readResponse用于處理服務端返回溜嗜,里面會根據(jù)path和事件類型宵膨,將新的event入隊。
void readResponse(ByteBuffer incomingBuffer)throws IOException {
WatchedEventwe =new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we +" for sessionid 0x"
? ? ? ? ? ? + Long.toHexString(sessionId));
}
eventThread.queueEvent( we );//最后會將事件
}
接下來EventThread里run方法炸宵,通過processEvent(event)來處理event
可以看到processEvent方法里辟躏,最終會根據(jù)事件類型決定是否調用watcher.process呢還是調用statCallback.processResult()。
上述主要分析了client和server端會話建立的過程土全,以及具體的通信邏輯捎琐,還有event回調機制。缺少的一部分是客戶端如何獲取到服務端觸發(fā)的事件裹匙,這個和通信協(xié)議有關瑞凑,放到后面單獨來看。
參考文章:http://www.reibang.com/p/06e859181cc0
參考文章:https://blog.csdn.net/quhongwei_zhanqiu/article/details/45825975