1. zkClient 介紹
zkClient 是基于 原生 zookeeper包下開發(fā)的客戶端钓简,解決原生API出現(xiàn)的超時重連(session失效重連)逗余、重復(fù)注冊watcher、異常處理.
該zkClient 已經(jīng)用在Dubbo, Kafka, Helix中闲擦。
2. zkClient 的類圖
3. zkClient 組件說明
zkClient: 利用該類的構(gòu)造函數(shù)對其進行連接zk宣羊。 使用過zkClient 連接舷嗡,會發(fā)現(xiàn)其底層使用了zookeeper原生的api 對zk進行連接的扫外。
IZKConnection的實現(xiàn)類ZkConnection, 就是設(shè)置一些基本的屬性廓脆, 比如
_zk : ZooKeeper 對象
_zookeeperLock: Lock對象
_servers: 連接字符ip:port
_sessionTimeOut :
前面有一篇文章中筛谚,已經(jīng)說了,使用ZooKeeper客戶端來注冊watcher有幾種方法:
1停忿、創(chuàng)建ZooKeeper對象時指定默認的Watcher
2驾讲、exists()
3、getData()
4席赂、getchildren
其中g(shù)etdata,exists注冊的是某個節(jié)點的事件處理器(watcher)吮铭,getchildren注冊的是子節(jié)點的事件處理器(watcher)。
而在ZKClient中颅停,根據(jù)事件類型谓晌,分為了節(jié)點事件(數(shù)據(jù)事件)、子節(jié)點事件癞揉。
對應(yīng)的事件處理器則是IZKDataListener和IZKChildListener纸肉。另外加入了Session相關(guān)的事件和事件處理器。
4. 啟動ZKClient流程
在創(chuàng)建ZKClient對象時喊熟,就完成了到ZooKeeper服務(wù)器連接的建立柏肪。具體過程是這樣的:
1、 啟動時芥牌,指定好connection string烦味,連接超時時間,序列化工具等胳泉。
2拐叉、 創(chuàng)建并啟動eventThread岩遗,用于接收事件扇商,并調(diào)度事件監(jiān)聽器Listener的執(zhí)行。
3宿礁、 連接到zookeeper服務(wù)器案铺,同時將ZKClient自身作為默認的Watcher。
5. 代碼流程
構(gòu)造zkClient 對象梆靖, 其中會運行connect方法控汉,
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer, long operationRetryTimeout) {
this._childListener = new ConcurrentHashMap();
this._dataListener = new ConcurrentHashMap();
this._stateListener = new CopyOnWriteArraySet();
this._zkEventLock = new ZkLock();
if (zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
} else {
this._connection = zkConnection;
this._zkSerializer = zkSerializer;
this._operationRetryTimeoutInMillis = operationRetryTimeout;
this._isZkSaslEnabled = this.isZkSaslEnabled();
this.connect((long)connectionTimeout, this);
}
}
// 啟動連接
public void connect(long maxMsToWaitUntilConnected, Watcher watcher) throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
boolean started = false;
this.acquireEventLock();
try {
this.setShutdownTrigger(false);
this._eventThread = new ZkEventThread(this._connection.getServers());
this._eventThread.start(); // ZkEventThread 是用來處理事件的線程
this._connection.connect(watcher); // 啟動連接
LOG.debug("Awaiting connection to Zookeeper server");
boolean waitSuccessful = this.waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS);
// 等待連接,連接成功之后會返回狀態(tài)
if (!waitSuccessful) {
throw new ZkTimeoutException("Unable to connect to zookeeper server '" + this._connection.getServers() + "' with timeout of " + maxMsToWaitUntilConnected + " ms");
}
started = true;
} finally {
this.getEventLock().unlock();
if (!started) {
this.close();
}
}
}
ZkEventThread.start
public void run() {
LOG.info("Starting ZkClient event thread.");
int eventId;
try {
for(; !this.isInterrupted(); LOG.debug("Delivering event #" + eventId + " done")) {
ZkEventThread.ZkEvent zkEvent = (ZkEventThread.ZkEvent)this._events.take(); // 獲取相應(yīng)的事件返吻, 這些事件是一些ZkEventThread.Event的實現(xiàn)類, 比如 childChangeEvent, newSessionEvent, DataChangeEvent事件
eventId = _eventId.incrementAndGet();
LOG.debug("Delivering event #" + eventId + " " + zkEvent);
try {
zkEvent.run();
} catch (InterruptedException var4) {
this.interrupt();
} catch (ZkInterruptedException var5) {
this.interrupt();
} catch (Throwable var6) {
LOG.error("Error handling event " + zkEvent, var6);
}
}
} catch (InterruptedException var7) {
LOG.info("Terminate ZkClient event thread.");
}
}
6. 為節(jié)點注冊Watcher
ZooKeeper的三個方法:getData姑子、getChildren、exists测僵,ZKClient都提供了相應(yīng)的代理方法街佑。就拿exists來看:
可以看到谢翎,是否注冊watcher,由hasListeners(path)來決定的沐旨。
hasListeners就是看有沒有與該數(shù)據(jù)節(jié)點綁定的listener森逮。
所以呢,默認情況下都會自動的為指定的path注冊watcher磁携,并且是默認的watcher(ZKClient)褒侧。怎么才能讓hasListeners判定值為true呢,也就是怎么才能為path綁定Listener呢谊迄?
ZKClient提供了訂閱功能:
一個新建的會話闷供,只需要在取得響應(yīng)的數(shù)據(jù)節(jié)點后,調(diào)用subscribteXxx就可以訂閱上相應(yīng)的事件了统诺。
zkClient 是使用listener來為節(jié)點注冊watcher的这吻, 這個watcher一般是zkClient本身。zkClient 是利用subscribteXxx來為path 訂閱listener事件的篙议。
7. ZooKeeper的變更操作
Zookeeper中提供的變更操作有:節(jié)點的創(chuàng)建唾糯、刪除,節(jié)點數(shù)據(jù)的修改鬼贱。
-
創(chuàng)建操作移怯,數(shù)據(jù)節(jié)點分為四種,ZKClient分別為他們提供了相應(yīng)的代理:
1 -
刪除節(jié)點的操作:
2 -
修改節(jié)點數(shù)據(jù)的操作:
3
writeDataReturnStat():寫數(shù)據(jù)并返回數(shù)據(jù)的狀態(tài)这难。
updateDataSerialized():修改已序列化的數(shù)據(jù)舟误。執(zhí)行過程是:先讀取數(shù)據(jù),然后使用DataUpdater對數(shù)據(jù)修改姻乓,最后調(diào)用writeData將修改后的數(shù)據(jù)發(fā)送給服務(wù)端嵌溢。
8.客戶端處理變更
前面已經(jīng)知道,ZKClient是默認的Watcher蹋岩,并且在為各個數(shù)據(jù)節(jié)點注冊的Watcher都是這個默認的Watcher赖草。
那么該是如何將各種事件通知給相應(yīng)的Listener呢?
處理過程大致可以概括為下面的步驟:
1剪个、判斷變更類型:變更類型分為State變更秧骑、ChildNode變更(創(chuàng)建子節(jié)點、刪除子節(jié)點扣囊、修改子節(jié)點數(shù)據(jù))乎折、
NodeData變更(創(chuàng)建指定node,刪除節(jié)點侵歇,節(jié)點數(shù)據(jù)變更)骂澄。
2、取出與path關(guān)聯(lián)的Listeners惕虑,并為每一個Listener創(chuàng)建一個ZKEvent坟冲,將ZkEvent交給ZkEventThread處理士修。
3、ZkEventThread線程樱衷,拿到ZkEvent后棋嘲,只需要調(diào)用ZkEvent的run方法進行處理。
從這里也可以知道矩桂,具體的怎么如何調(diào)用Listener卷玉,還要依賴于ZkEvent的run()實現(xiàn)了策肝。
9.序列化處理
ZooKeeper中察迟,會涉及到序列化戈咳、反序列化的操作有兩種:getData、setData癞蚕。
在ZKClient中蕊爵,分別用readData、writeData來替代了桦山。
對于readData:先調(diào)用zookeeper的getData攒射,然后進行使用ZKSerializer進行反序列化工作。
對于writeData:先使用ZKSerializer將對象序列化后恒水,再調(diào)用zookeeper的setData会放。
10.ZkClient如何解決使用ZooKeeper客戶端遇到的問題的呢?
- Watcher自動重注冊:這個要是依賴于hasListeners()的判斷钉凌,來決定是否再次注冊咧最。
有就將watcher注冊到path 路勁下 - Session失效重連:如果發(fā)現(xiàn)會話過期,就先關(guān)閉已有連接御雕,再重新建立連接矢沿。
- 異常處理:對比ZooKeeper和ZKClient,就可以發(fā)現(xiàn)ZooKeeper的所有操作都是拋異常的酸纲,
而ZKClient的所有操作捣鲸,都不會拋異常的。在發(fā)生異常時福青,它或做日志摄狱,或返回空,或做相應(yīng)的Listener調(diào)用无午。
相比于ZooKeeper官方客戶端,使用ZKClient時祝谚,只需要關(guān)注實際的Listener實現(xiàn)即可宪迟。
所以這個客戶端,還是推薦大家使用的交惯。
log: 遇到的問題:
1. Unable to connect to zookeeper server xxx:port with timeout of 10000
解決:
本地虛擬機次泽,以standlone模式啟動穿仪, 連接時,出現(xiàn)這個問題意荤,說明連接超時啊片,將connect time 設(shè)置大點就可以了
PS: 若你覺得可以、還行玖像、過得去紫谷、甚至不太差的話,可以“關(guān)注”或者“點贊”一下捐寥,就此謝過!