Zookeeper全解析--內(nèi)部工作機制-源碼解析

來源: https://www.cnblogs.com/f1194361820/p/5519227.html
https://www.cnblogs.com/duanxz/p/3783266.html

zookeeper結(jié)構(gòu)圖

1

創(chuàng)建ZooKeeper對象時性雄,應(yīng)會創(chuàng)建一個ClientCnxn(代表了客戶端連接對象)。與此同時啟動了兩個線程:SendThread、EventThread妙蔗。兩個隊列:outgoingQueue和pendingQueue愉择。

模塊:

我們可以認為ZK的Client由三個主要模塊組成:Zookeeper, WatcherManager, ClientCnxn

Zookeeper是ZK Client端的真正接口碍庵,用戶可以操作的最主要的類了赌,當(dāng)用戶創(chuàng)建一個Zookeeper實例以后巩那,幾乎所有的操作都被這個實例包辦了吏夯,用戶不用關(guān)心怎么連接到Server,Watcher什么時候被觸發(fā)等等令人傷神的問題即横。

WatcherManager噪生,顧名思義,它是用來管理Watcher的东囚,Watcher是ZK的一大特色功能跺嗽,允許多個Client對一個或多個ZNode進行監(jiān)控,當(dāng)ZNode有變化時能夠通知到監(jiān)控這個ZNode的各個Client。我們把一個ZK Client簡單看成一個Zookeeper實例桨嫁,那么這個實例內(nèi)部的WatcherManager就管理了ZK Client綁定的所有Watcher植兰。

ClientCnxn是管理所有網(wǎng)絡(luò)IO的模塊,所有和ZK Server交互的信息和數(shù)據(jù)都經(jīng)過這個模塊璃吧,包括給ZK Server發(fā)送Request楣导,從ZK Server接受Response,以及從ZK Server接受Watcher Event畜挨。ClientCnxn完全管理了網(wǎng)絡(luò)筒繁,從外部看來網(wǎng)絡(luò)操作是透明的。

線程:

每當(dāng)我們創(chuàng)建一個Zookeeper實例的時候巴元,會有兩個線程被創(chuàng)建:SendThread和EventThread毡咏。所以當(dāng)我們使用ZK Client端的時候應(yīng)該盡量只創(chuàng)建一個Zookeeper實例并反復(fù)使用。大量的創(chuàng)建銷毀Zookeeper實例不僅會反復(fù)的創(chuàng)建和銷毀線程逮刨,而且會在Server端創(chuàng)建大量的Session呕缭。

SendThread是真正處理網(wǎng)絡(luò)IO的線程,所有通過網(wǎng)絡(luò)發(fā)送和接受的數(shù)據(jù)包都在這個線程中處理修己。這個線程的主體是一個while循環(huán):

while (zooKeeper.state.isAlive()) {
        try {
            if (sockKey == null) {
            // don’t re-establish connection if we are closing
                if (closing) {
                    break;
                }
                startConnect();
                lastSend = now;
                lastHeard = now;
            }
            … ….
            selector.select(to);
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys();
            }
            // Everything below and until we get back to the select is
            // non blocking, so time is effectively a constant. That is
            // Why we just have to do this once, here
            now = System.currentTimeMillis();
            for (SelectionKey k : selected) {
                … …
                if (doIO()) {
                    lastHeard = now;
                }
                … …
            }
        }
        catch() {
            … …
        }
    }

這里用了java的nio功能恢总,當(dāng)selector偵測到事件發(fā)生的時候就會觸發(fā)一次循環(huán),主要的操作會在doIO()里面完成:

boolean doIO() throws InterruptedException, IOException {
        boolean packetReceived = false;
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException(“Socket is null!”);
        }
        if (sockKey.isReadable()) {
            … …
        }
         
        if (sockKey.isWritable()) {
        … …
        }

        if (outgoingQueue.isEmpty()) {
            disableWrite();
        } else {
            enableWrite();
        }
        return packetReceived;
    }

這個過程大概是這樣的:

  1. 如果有數(shù)據(jù)可讀箩退,則讀取數(shù)據(jù)包离熏,如果數(shù)據(jù)包是先前發(fā)出去的Request的Response佳谦,那么這個數(shù)據(jù)包一定在Pending Queue里面戴涝。將它從Pending Queue里面移走,并將此信息添加到Waiting Event Queue 里面钻蔑,如果數(shù)據(jù)包是一個Watcher Event啥刻,將此信息添加到Waiting Event Queue里面。

  2. 如果OutgoingQueue里面有數(shù)據(jù)需要發(fā)送咪笑,則發(fā)送數(shù)據(jù)包并把數(shù)據(jù)包從Outgoing Queue移至Pending Queue可帽,意思是數(shù)據(jù)我已經(jīng)發(fā)出去了,但還要等待Server端的回復(fù)窗怒,所以這個請求現(xiàn)在是Pending 的狀態(tài)映跟。

另外一個線程EventThread是用來處理Event的。前面提到SendThread從Server收到數(shù)據(jù)的時候會把一些信息添加到Event Thread里面扬虚,比如Finish Event和Watcher Event努隙。EventThread就是專門用來處理這些Event的,收到Finish Event的時候會把相對應(yīng)的Package置成Finish狀態(tài)辜昵,這樣等待結(jié)果的Client函數(shù)就能得以返回荸镊。收到Watcher Event的時候會聯(lián)系WatcherManager找到相對應(yīng)的Watcher,從WatcherManager里面移除這個Watcher(因為每個Watcher只會被通知一次) 并回調(diào)Watcher的process函數(shù)。所以所有Watcher的process函數(shù)是運行在EventThread里面的躬存。

保持連接:

到目前為止應(yīng)該已經(jīng)大概介紹了ZK Client端的大致結(jié)構(gòu)和處理流程张惹。還剩下一個問題就是當(dāng)網(wǎng)絡(luò)出問題時ZK Client是如何處理的。其實這個過程并不復(fù)雜岭洲,大概是執(zhí)行以下步驟:

  1. 網(wǎng)絡(luò)發(fā)生故障宛逗,網(wǎng)絡(luò)操作拋出的異常被捕獲。

  2. 確認網(wǎng)絡(luò)操作失敗钦椭,清除當(dāng)前與Server相關(guān)的網(wǎng)絡(luò)資源拧额,包括Socket等等。

  3. 在Server列表中逐個嘗試鏈接Server彪腔。

這個過程從外界看來是透明的侥锦,外界并不會覺察到ZK Client已經(jīng)悄悄地更換了一個連接的Server。

注意點德挣, SendThread 處理流程

版本 zk3.5.5


public void run() {
....
// don't re-establish connection if we are closing
// determine whether we need to send an AuthFailed event.
// An authentication error occurred during authentication with the Zookeeper Server.
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); // 主要是這里恭垦,這里會發(fā)送
...
}

clientCnxnSocket 是一個借口, 有兩個實現(xiàn)類ClientCnxnSocketNIO 和 ClientCnxnSocketNetty

如何選擇NIO的格嗅,(注意這里默認選擇NIO)
ClientCnxnSocketNIO .doTransport()

void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
            throws IOException, InterruptedException {
        selector.select(waitTimeOut);   // nio的selector 選擇器選擇
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            if (findSendablePacket(outgoingQueue,
                    sendThread.tunnelAuthInProgress()) != null) {
                enableWrite();
            }
        }
        selected.clear();
    }

ClientCnxnSocketNetty,

ClientCnxnSocketNetty implements ClientCnxnSocket abstract methods.
It's responsible for connecting to server, reading/writing network traffic and
being a layer between network data and higher level packets.
這是負責(zé)為聯(lián)機閱讀/寫作到服務(wù)器番挺,和網(wǎng)絡(luò)交流數(shù)據(jù)包。

@Override
    void doTransport(int waitTimeOut,
                     List<Packet> pendingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        try {
            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
            }
            // check if being waken up on closing.
            if (!sendThread.getZkState().isAlive()) {
                // adding back the packet to notify of failure in conLossPacket().
                addBack(head);
                return;
            }
            // channel disconnection happened
            if (disconnected.get()) {
                addBack(head);
                throw new EndOfStreamException("channel for sessionid 0x"
                        + Long.toHexString(sessionId)
                        + " is lost");
            }
            if (head != null) {
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }

同步調(diào)用:

同步調(diào)用屯掖,就是客戶端成功發(fā)送請求后玄柏,才繼續(xù)執(zhí)行。例如:zk.create(path,data,acl,createMode)贴铜;

這行代碼會發(fā)起一個同步調(diào)用粪摘。一個線程A執(zhí)行這個create時,會創(chuàng)建一個表示create動作的packet绍坝,放到數(shù)據(jù)發(fā)送隊列outgoingQueue徘意。之后當(dāng)線程A就開始等待,直到SendThread線程從outgoingQueue隊列取出該packet轩褐,并將其成功發(fā)送(已收到服務(wù)端的回應(yīng)為準)椎咧。然后線程A才繼續(xù)執(zhí)行。

異步調(diào)用:

異步調(diào)用把介,就是客戶端不會管請求是否發(fā)送成功勤讽,都會繼續(xù)執(zhí)行。例如:zk.create(path,data,acl,createMode,stringCallback)拗踢;

這行代碼會發(fā)起一個異步調(diào)用脚牍。一個線程A執(zhí)行這個create時,會創(chuàng)建一個表示create動作的packet秒拔,放到數(shù)據(jù)發(fā)送隊列outgoingQueue莫矗。線程A接著就去執(zhí)行下一行代碼了飒硅,而不會去管數(shù)據(jù)packet是否由SendThread線程發(fā)送到服務(wù)端了。

SendThread的職責(zé):

1:創(chuàng)建一個長連接作谚,用于會話保持

通過周期性的發(fā)送ping packet到當(dāng)前連接的ZooKeeper服務(wù)器實例三娩。這個過程,我們通常稱為心跳妹懒。每當(dāng)客戶端與服務(wù)端的連接斷開后雀监,會自動重新連接到下一個服務(wù)器。如果斷開的是最后與一個服務(wù)器的連接眨唬,那么會重新連接到第一個服務(wù)器会前。

2:使用這個長連接與服務(wù)器通信

1)發(fā)送客戶調(diào)用

不斷的從outgoingQueue取出packet發(fā)給服務(wù)端。當(dāng)發(fā)送的是Client的同步調(diào)用的packet匾竿,則在發(fā)送packet后瓦宜,立即通知客戶端同步調(diào)用線程繼續(xù)執(zhí)行。當(dāng)發(fā)送的是Client的異步調(diào)用岭妖,則會將packet發(fā)給服務(wù)端临庇,并保存到pendingQueue。當(dāng)從服務(wù)端發(fā)回響應(yīng)后昵慌,生成一個packet 完成事件交給EventThread假夺,由Event執(zhí)行CallBack調(diào)用。

2)處理服務(wù)端響應(yīng)

對服務(wù)端響應(yīng)反序列化后斋攀,根據(jù)響應(yīng)分類進行處理如下:

  • Ping的響應(yīng):不做處理

  • 認證失敗的響應(yīng):創(chuàng)建認證失敗的WatchedEvent已卷,并將event交給EventThread處理。

  • 服務(wù)端的數(shù)據(jù)變更通知:生成相應(yīng)的數(shù)據(jù)變更WatchedEvent淳蔼,并將event交給EventThread處理侧蘸。

  • 服務(wù)端對調(diào)用的回應(yīng):不論是同步調(diào)用還是異步調(diào)用,服務(wù)端都會給出回應(yīng)肖方。收到此類回應(yīng)后闺魏,先是將watcher放到watcherManager中未状。然后對同步俯画、異步做后續(xù)處理。

     如果是同步調(diào)用司草,則通知發(fā)起調(diào)用的線程繼續(xù)處理艰垂。
     如果是異步調(diào)用,則將該packet交由EventThread來處理埋虹。例如對create猜憎、delete、exists搔课、getData胰柑、getChildren方法調(diào)用的響應(yīng)。
    

EventThread做了什么事呢?

從上述描述中,也可以看出EventThread用于對接收到的packet或者event進行處理:

  • 如果是event柬讨,則從WatcherManager中取出相應(yīng)的Watcher進行處理崩瓤。
  • 如果是packet,則執(zhí)行相關(guān)聯(lián)的AsyncCallback踩官。

通過源碼的閱讀却桶,知道在使用ZooKeeper客戶端時要注意以下兩點:

  • 在進行Watcher回調(diào)時,會從WatchManager取出與相關(guān)path關(guān)聯(lián)的多個Watcher(此時WatchManager中就不會再有這個path相關(guān)的Watcher了)蔗牡,然后串行的調(diào)用這多個Watcher#process方法颖系。所以在編程時,會根據(jù)業(yè)務(wù)的需要辩越,有可能會反復(fù)注冊Watcher嘁扼。
  • 另外因為多個Watcher的調(diào)用是串行的,所以不要因為一個Watcher的處理邏輯影響了整個客戶端的Watcher回調(diào)黔攒。

好了偷拔,對于ZK Client的介紹大概就這么多了,希望這樣的介紹對于大家學(xué)習(xí)和使用Zookeeper有一些幫助亏钩。對于文章中沒有介紹或者沒有說清楚的地方需要進一步查看源碼來解決莲绰。

PS: 若你覺得可以、還行姑丑、過得去蛤签、甚至不太差的話,可以“關(guān)注”或者“點贊”一下栅哀,就此謝過!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末震肮,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子留拾,更是在濱河造成了極大的恐慌戳晌,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痴柔,死亡現(xiàn)場離奇詭異沦偎,居然都是意外死亡,警方通過查閱死者的電腦和手機咳蔚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進店門豪嚎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人谈火,你說我怎么就攤上這事侈询。” “怎么了糯耍?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵扔字,是天一觀的道長囊嘉。 經(jīng)常有香客問我,道長革为,這世上最難降的妖魔是什么哗伯? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮篷角,結(jié)果婚禮上焊刹,老公的妹妹穿的比我還像新娘。我一直安慰自己恳蹲,他們只是感情好虐块,可當(dāng)我...
    茶點故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著嘉蕾,像睡著了一般贺奠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上错忱,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天儡率,我揣著相機與錄音,去河邊找鬼以清。 笑死儿普,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的掷倔。 我是一名探鬼主播眉孩,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼勒葱!你這毒婦竟也來了浪汪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤凛虽,失蹤者是張志新(化名)和其女友劉穎死遭,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體凯旋,經(jīng)...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡呀潭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了瓦阐。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蜗侈。...
    茶點故事閱讀 40,115評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡篷牌,死狀恐怖睡蟋,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情枷颊,我是刑警寧澤戳杀,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布该面,位于F島的核電站,受9級特大地震影響信卡,放射性物質(zhì)發(fā)生泄漏隔缀。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一傍菇、第九天 我趴在偏房一處隱蔽的房頂上張望猾瘸。 院中可真熱鬧,春花似錦丢习、人聲如沸牵触。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽揽思。三九已至,卻和暖如春见擦,著一層夾襖步出監(jiān)牢的瞬間钉汗,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工鲤屡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留损痰,地道東北人。 一個月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓酒来,卻偏偏與公主長得像徐钠,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,055評論 2 355

推薦閱讀更多精彩內(nèi)容