來源: https://www.cnblogs.com/f1194361820/p/5519227.html
https://www.cnblogs.com/duanxz/p/3783266.html
zookeeper結(jié)構(gòu)圖
創(chuàng)建ZooKeeper對象時性雄,應(yīng)會創(chuàng)建一個ClientCnxn(代表了客戶端連接對象)。與此同時啟動了兩個線程:SendThread、EventThread妙蔗。兩個隊列:outgoingQueue和pendingQueue愉择。
模塊:
我們可以認為ZK的Client由三個主要模塊組成:Zookeeper, WatcherManager, ClientCnxn
Zookeeper是ZK Client端的真正接口碍庵,用戶可以操作的最主要的類了赌,當(dāng)用戶創(chuàng)建一個Zookeeper實例以后巩那,幾乎所有的操作都被這個實例包辦了吏夯,用戶不用關(guān)心怎么連接到Server,Watcher什么時候被觸發(fā)等等令人傷神的問題即横。
WatcherManager噪生,顧名思義,它是用來管理Watcher的东囚,Watcher是ZK的一大特色功能跺嗽,允許多個Client對一個或多個ZNode進行監(jiān)控,當(dāng)ZNode有變化時能夠通知到監(jiān)控這個ZNode的各個Client。我們把一個ZK Client簡單看成一個Zookeeper實例桨嫁,那么這個實例內(nèi)部的WatcherManager就管理了ZK Client綁定的所有Watcher植兰。
ClientCnxn是管理所有網(wǎng)絡(luò)IO的模塊,所有和ZK Server交互的信息和數(shù)據(jù)都經(jīng)過這個模塊璃吧,包括給ZK Server發(fā)送Request楣导,從ZK Server接受Response,以及從ZK Server接受Watcher Event畜挨。ClientCnxn完全管理了網(wǎng)絡(luò)筒繁,從外部看來網(wǎng)絡(luò)操作是透明的。
線程:
每當(dāng)我們創(chuàng)建一個Zookeeper實例的時候巴元,會有兩個線程被創(chuàng)建:SendThread和EventThread毡咏。所以當(dāng)我們使用ZK Client端的時候應(yīng)該盡量只創(chuàng)建一個Zookeeper實例并反復(fù)使用。大量的創(chuàng)建銷毀Zookeeper實例不僅會反復(fù)的創(chuàng)建和銷毀線程逮刨,而且會在Server端創(chuàng)建大量的Session呕缭。
SendThread是真正處理網(wǎng)絡(luò)IO的線程,所有通過網(wǎng)絡(luò)發(fā)送和接受的數(shù)據(jù)包都在這個線程中處理修己。這個線程的主體是一個while循環(huán):
while (zooKeeper.state.isAlive()) {
try {
if (sockKey == null) {
// don’t re-establish connection if we are closing
if (closing) {
break;
}
startConnect();
lastSend = now;
lastHeard = now;
}
… ….
selector.select(to);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
now = System.currentTimeMillis();
for (SelectionKey k : selected) {
… …
if (doIO()) {
lastHeard = now;
}
… …
}
}
catch() {
… …
}
}
這里用了java的nio功能恢总,當(dāng)selector偵測到事件發(fā)生的時候就會觸發(fā)一次循環(huán),主要的操作會在doIO()里面完成:
boolean doIO() throws InterruptedException, IOException {
boolean packetReceived = false;
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException(“Socket is null!”);
}
if (sockKey.isReadable()) {
… …
}
if (sockKey.isWritable()) {
… …
}
if (outgoingQueue.isEmpty()) {
disableWrite();
} else {
enableWrite();
}
return packetReceived;
}
這個過程大概是這樣的:
如果有數(shù)據(jù)可讀箩退,則讀取數(shù)據(jù)包离熏,如果數(shù)據(jù)包是先前發(fā)出去的Request的Response佳谦,那么這個數(shù)據(jù)包一定在Pending Queue里面戴涝。將它從Pending Queue里面移走,并將此信息添加到Waiting Event Queue 里面钻蔑,如果數(shù)據(jù)包是一個Watcher Event啥刻,將此信息添加到Waiting Event Queue里面。
如果OutgoingQueue里面有數(shù)據(jù)需要發(fā)送咪笑,則發(fā)送數(shù)據(jù)包并把數(shù)據(jù)包從Outgoing Queue移至Pending Queue可帽,意思是數(shù)據(jù)我已經(jīng)發(fā)出去了,但還要等待Server端的回復(fù)窗怒,所以這個請求現(xiàn)在是Pending 的狀態(tài)映跟。
另外一個線程EventThread是用來處理Event的。前面提到SendThread從Server收到數(shù)據(jù)的時候會把一些信息添加到Event Thread里面扬虚,比如Finish Event和Watcher Event努隙。EventThread就是專門用來處理這些Event的,收到Finish Event的時候會把相對應(yīng)的Package置成Finish狀態(tài)辜昵,這樣等待結(jié)果的Client函數(shù)就能得以返回荸镊。收到Watcher Event的時候會聯(lián)系WatcherManager找到相對應(yīng)的Watcher,從WatcherManager里面移除這個Watcher(因為每個Watcher只會被通知一次) 并回調(diào)Watcher的process函數(shù)。所以所有Watcher的process函數(shù)是運行在EventThread里面的躬存。
保持連接:
到目前為止應(yīng)該已經(jīng)大概介紹了ZK Client端的大致結(jié)構(gòu)和處理流程张惹。還剩下一個問題就是當(dāng)網(wǎng)絡(luò)出問題時ZK Client是如何處理的。其實這個過程并不復(fù)雜岭洲,大概是執(zhí)行以下步驟:
網(wǎng)絡(luò)發(fā)生故障宛逗,網(wǎng)絡(luò)操作拋出的異常被捕獲。
確認網(wǎng)絡(luò)操作失敗钦椭,清除當(dāng)前與Server相關(guān)的網(wǎng)絡(luò)資源拧额,包括Socket等等。
在Server列表中逐個嘗試鏈接Server彪腔。
這個過程從外界看來是透明的侥锦,外界并不會覺察到ZK Client已經(jīng)悄悄地更換了一個連接的Server。
注意點德挣, SendThread 處理流程
版本 zk3.5.5
public void run() {
....
// don't re-establish connection if we are closing
// determine whether we need to send an AuthFailed event.
// An authentication error occurred during authentication with the Zookeeper Server.
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); // 主要是這里恭垦,這里會發(fā)送
...
}
clientCnxnSocket 是一個借口, 有兩個實現(xiàn)類ClientCnxnSocketNIO 和 ClientCnxnSocketNetty
如何選擇NIO的格嗅,(注意這里默認選擇NIO)
ClientCnxnSocketNIO .doTransport()
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut); // nio的selector 選擇器選擇
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
selected.clear();
}
ClientCnxnSocketNetty,
ClientCnxnSocketNetty implements ClientCnxnSocket abstract methods.
It's responsible for connecting to server, reading/writing network traffic and
being a layer between network data and higher level packets.
這是負責(zé)為聯(lián)機閱讀/寫作到服務(wù)器番挺,和網(wǎng)絡(luò)交流數(shù)據(jù)包。
@Override
void doTransport(int waitTimeOut,
List<Packet> pendingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
try {
if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
Packet head = null;
if (needSasl.get()) {
if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
} else {
head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
}
// check if being waken up on closing.
if (!sendThread.getZkState().isAlive()) {
// adding back the packet to notify of failure in conLossPacket().
addBack(head);
return;
}
// channel disconnection happened
if (disconnected.get()) {
addBack(head);
throw new EndOfStreamException("channel for sessionid 0x"
+ Long.toHexString(sessionId)
+ " is lost");
}
if (head != null) {
doWrite(pendingQueue, head, cnxn);
}
} finally {
updateNow();
}
}
同步調(diào)用:
同步調(diào)用屯掖,就是客戶端成功發(fā)送請求后玄柏,才繼續(xù)執(zhí)行。例如:zk.create(path,data,acl,createMode)贴铜;
這行代碼會發(fā)起一個同步調(diào)用粪摘。一個線程A執(zhí)行這個create時,會創(chuàng)建一個表示create動作的packet绍坝,放到數(shù)據(jù)發(fā)送隊列outgoingQueue徘意。之后當(dāng)線程A就開始等待,直到SendThread線程從outgoingQueue隊列取出該packet轩褐,并將其成功發(fā)送(已收到服務(wù)端的回應(yīng)為準)椎咧。然后線程A才繼續(xù)執(zhí)行。
異步調(diào)用:
異步調(diào)用把介,就是客戶端不會管請求是否發(fā)送成功勤讽,都會繼續(xù)執(zhí)行。例如:zk.create(path,data,acl,createMode,stringCallback)拗踢;
這行代碼會發(fā)起一個異步調(diào)用脚牍。一個線程A執(zhí)行這個create時,會創(chuàng)建一個表示create動作的packet秒拔,放到數(shù)據(jù)發(fā)送隊列outgoingQueue莫矗。線程A接著就去執(zhí)行下一行代碼了飒硅,而不會去管數(shù)據(jù)packet是否由SendThread線程發(fā)送到服務(wù)端了。
SendThread的職責(zé):
1:創(chuàng)建一個長連接作谚,用于會話保持
通過周期性的發(fā)送ping packet到當(dāng)前連接的ZooKeeper服務(wù)器實例三娩。這個過程,我們通常稱為心跳妹懒。每當(dāng)客戶端與服務(wù)端的連接斷開后雀监,會自動重新連接到下一個服務(wù)器。如果斷開的是最后與一個服務(wù)器的連接眨唬,那么會重新連接到第一個服務(wù)器会前。
2:使用這個長連接與服務(wù)器通信
1)發(fā)送客戶調(diào)用
不斷的從outgoingQueue取出packet發(fā)給服務(wù)端。當(dāng)發(fā)送的是Client的同步調(diào)用的packet匾竿,則在發(fā)送packet后瓦宜,立即通知客戶端同步調(diào)用線程繼續(xù)執(zhí)行。當(dāng)發(fā)送的是Client的異步調(diào)用岭妖,則會將packet發(fā)給服務(wù)端临庇,并保存到pendingQueue。當(dāng)從服務(wù)端發(fā)回響應(yīng)后昵慌,生成一個packet 完成事件交給EventThread假夺,由Event執(zhí)行CallBack調(diào)用。
2)處理服務(wù)端響應(yīng)
對服務(wù)端響應(yīng)反序列化后斋攀,根據(jù)響應(yīng)分類進行處理如下:
Ping的響應(yīng):不做處理
認證失敗的響應(yīng):創(chuàng)建認證失敗的WatchedEvent已卷,并將event交給EventThread處理。
服務(wù)端的數(shù)據(jù)變更通知:生成相應(yīng)的數(shù)據(jù)變更WatchedEvent淳蔼,并將event交給EventThread處理侧蘸。
-
服務(wù)端對調(diào)用的回應(yīng):不論是同步調(diào)用還是異步調(diào)用,服務(wù)端都會給出回應(yīng)肖方。收到此類回應(yīng)后闺魏,先是將watcher放到watcherManager中未状。然后對同步俯画、異步做后續(xù)處理。
如果是同步調(diào)用司草,則通知發(fā)起調(diào)用的線程繼續(xù)處理艰垂。 如果是異步調(diào)用,則將該packet交由EventThread來處理埋虹。例如對create猜憎、delete、exists搔课、getData胰柑、getChildren方法調(diào)用的響應(yīng)。
EventThread做了什么事呢?
從上述描述中,也可以看出EventThread用于對接收到的packet或者event進行處理:
- 如果是event柬讨,則從WatcherManager中取出相應(yīng)的Watcher進行處理崩瓤。
- 如果是packet,則執(zhí)行相關(guān)聯(lián)的AsyncCallback踩官。
通過源碼的閱讀却桶,知道在使用ZooKeeper客戶端時要注意以下兩點:
- 在進行Watcher回調(diào)時,會從WatchManager取出與相關(guān)path關(guān)聯(lián)的多個Watcher(此時WatchManager中就不會再有這個path相關(guān)的Watcher了)蔗牡,然后串行的調(diào)用這多個Watcher#process方法颖系。所以在編程時,會根據(jù)業(yè)務(wù)的需要辩越,有可能會反復(fù)注冊Watcher嘁扼。
- 另外因為多個Watcher的調(diào)用是串行的,所以不要因為一個Watcher的處理邏輯影響了整個客戶端的Watcher回調(diào)黔攒。
好了偷拔,對于ZK Client的介紹大概就這么多了,希望這樣的介紹對于大家學(xué)習(xí)和使用Zookeeper有一些幫助亏钩。對于文章中沒有介紹或者沒有說清楚的地方需要進一步查看源碼來解決莲绰。
PS: 若你覺得可以、還行姑丑、過得去蛤签、甚至不太差的話,可以“關(guān)注”或者“點贊”一下栅哀,就此謝過!