ZooKeeper源碼學(xué)習(xí)筆記(1)--client端解析

前言

ZooKeeper是一個相對簡單的分布式協(xié)調(diào)服務(wù)弯洗,通過閱讀源碼我們能夠更進一步的清楚分布式的原理但汞。

環(huán)境

ZooKeeper 3.4.9

入口函數(shù)

bin/zkCli.sh中排监,我們看到client端的真實入口其實是一個org.apache.zookeeper.ZooKeeperMain的Java類

"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
     -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
     org.apache.zookeeper.ZooKeeperMain "$@"

通過源碼走讀贬堵,看到在ZooKeeperMain中主要由兩部分構(gòu)成

connectToZK(cl.getOption("server"));

while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
  executeLine(line);
}
  1. 構(gòu)造一個ZooKeeper對象招驴,同ZooKeeperServer進行建立通信連接
  2. 通過反射調(diào)用jline.ConsoleReader類腹鹉,對終端輸入進行讀取,然后通過解析單行命令怔蚌,調(diào)用ZooKeeper接口巩步。

如上所述,client端其實是對 zookeeper.jar 的簡單封裝桦踊,在構(gòu)造出一個ZooKeeper對象后椅野,通過解析用戶輸入,調(diào)用 ZooKeeper 接口和 Server 進行交互。

ZooKeeper 類

剛才我們看到 client 端同 ZooKeeper Server 之間的交互其實是通過 ZooKeeper 對象進行的竟闪,接下來我們詳細深入到 ZooKeeper 類中离福,看看其和服務(wù)端的交互邏輯。

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException 
{
  ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
  HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses());
  cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
  hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
  cnxn.start();
}

在 ZooKeeper的構(gòu)造方法中,可以看到 ZooKeeper 中使用 Server 的服務(wù)器地址構(gòu)建了一個 ClientCnxn 類炼蛤,在這個類中妖爷,系統(tǒng)新建了兩個線程

sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();

其中,SendThread 負責將ZooKeeper的請求信息封裝成一個Packet理朋,發(fā)送給 Server ,并維持同Server的心跳絮识,EventThread負責解析通過通過SendThread得到的Response,之后發(fā)送給Watcher::processEvent進行詳細的事件處理嗽上。

Client 時序圖

如上圖所示次舌,Client中在終端輸入指令后,會被封裝成一個Request請求炸裆,通過submitRequest垃它,進一步被封裝成Packet包鲜屏,提交給SendThread處理烹看。

SendThread通過doTransportPacket發(fā)送給Server,并通過readResponse獲取結(jié)果,解析成一個Event洛史,再將Event加入EventThread的隊列中等待執(zhí)行惯殊。

EventThread通過processEvent消費隊列中的Event事件。

SendThread

SendThread 的主要作用除了將Packet包發(fā)送給Server之外也殖,還負責維持Client和Server之間的心跳土思,確保 session 存活。

現(xiàn)在讓我們從源碼出發(fā)忆嗜,看看SendThread究竟是如何運行的己儒。

SendThread是一個線程類,因此我們進入其run()方法捆毫,看看他的啟動流程闪湾。

while (state.isAlive()) {
  if (!clientCnxnSocket.isConnected()) {
    // 啟動和server的socket鏈接
    startConnect();
  }
  // 根據(jù)上次的連接時間,判斷是否超時
  if (state.isConnected()) {
    to = readTimeout - clientCnxnSocket.getIdleRecv();
  } else {
    to = connectTimeout - clientCnxnSocket.getIdleRecv();
  }
  if (to <= 0) {
    throw new SessionTimeoutException(warnInfo);
  }
  // 發(fā)送心跳包
  if (state.isConnected()) {
    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
      sendPing();
      clientCnxnSocket.updateLastSend();
    }
  }
  // 將指令信息發(fā)送給 Server
  clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
}

從上面的代碼中绩卤,可以看出SendThread的主要任務(wù)如下:

  1. 創(chuàng)建同 Server 之間的 socket 鏈接
  2. 判斷鏈接是否超時
  3. 定時發(fā)送心跳任務(wù)
  4. 將ZooKeeper指令發(fā)送給Server

與 Server 的長鏈接

ZooKeeper通過獲取ZOOKEEPER_CLIENT_CNXN_SOCKET變量構(gòu)造了一個ClientCnxnSocket對象途样,默認情況下是ClientCnxnSocketNIO

String clientCnxnSocketName = System
                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
  clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}

ClientCnxnSocketNIO::connect中我們可以看到這里同Server之間創(chuàng)建了一個socket鏈接。

SocketChannel sock = createSock();
registerAndConnect(sock, addr);

超時與心跳

SendThread::run中濒憋,可以看到針對鏈接是否建立分別有readTimeoutconnetTimeout 兩種超時時間何暇,一旦發(fā)現(xiàn)鏈接超時,則拋出異常凛驮,終止 SendThread裆站。

在沒有超時的情況下,如果判斷距離上次心跳時間超過了1/2個超時時間,會再次發(fā)送心跳數(shù)據(jù)遏插,避免訪問超時捂贿。

發(fā)送 ZooKeeper 指令

在時序圖中,我們看到從終端輸入指令后胳嘲,我們會將其解析成一個Packet 包厂僧,等待SendThread進行發(fā)送。

ZooKeeper::create為例

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
    throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
ReplyHeader r = cnxn.submitRequest(h, request, response, null);

在這里create指令了牛,被封裝成了一個 CreateRequest颜屠,通過submitRequest被轉(zhuǎn)成了一個Packet

public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);
    synchronized (packet) {
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration) {
    Packet packet = null;
    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}

submitRequest中,我們進一步看到Request被封裝成一個Packet包鹰祸,并加入SendThread::outgoingQueue隊列中甫窟,等待執(zhí)行。

Note:在這里我們還看到蛙婴,ZooKeeper方法中所謂的同步方法其實就是在Packet被提交到SendThread之后粗井,陷入一個while循環(huán),等待處理完成后再跳出的過程

SendThread::runwhile循環(huán)中街图,ZooKeeper通過doTransport將存放在outgoingQueue中的Packet包發(fā)送給 Server浇衬。

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) {
    if (sockKey.isReadable()) {
        // 讀取response信息
        sendThread.readResponse(incomingBuffer);
    }
    if (sockKey.isWritable()) {
        Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
        sock.write(p.bb);
    }
}

doIO發(fā)送socket信息之前,先從socket中獲取返回數(shù)據(jù)餐济,通過readResonse進行處理耘擂。

void readResponse(ByteBuffer incomingBuffer) throws IOException {
     ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
     BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
     ReplyHeader replyHdr = new ReplyHeader();
     replyHdr.deserialize(bbia, "header");
     if (replyHdr.getXid() == -1) {
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
        WatchedEvent we = new WatchedEvent(event);
        eventThread.queueEvent( we );
     }
}

readReponse中,通過解析數(shù)據(jù),我們可以得到WatchedEvent對象絮姆,并將其壓入EventThread的消息隊列醉冤,等待分發(fā)

EventThread

public void run() {
    while (true) {
        Object event = waitingEvents.take();
        if (event == eventOfDeath) {
            wasKilled = true;
        } else {
            processEvent(event);
        }
}

EventThread中通過processEvent對隊列中的事件進行消費,并分發(fā)給不同的Watcher

watch事件注冊和分發(fā)

通常在ZooKeeper中篙悯,我們會為指定節(jié)點添加一個Watcher蚁阳,用于監(jiān)聽節(jié)點變化情況,以ZooKeeper:exist為例

// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
    wcb = new ExistsWatchRegistration(watcher, clientPath);
}

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

代碼的大致邏輯和create類似鸽照,但是對wathcer做了一層ExistWatchRegistration的包裝螺捐,當packet對象完成請求之后,調(diào)用register方法移宅,根據(jù)不同包裝的WatchRegistration將watch注冊到不同watch列表中归粉,等待回調(diào)。

if (p.watchRegistration != null) {
    p.watchRegistration.register(p.replyHeader.getErr());
}

在 ZooKeeper 中一共有三種類型的WatchRegistration漏峰,分別對應(yīng)DataWatchRegistration,ChildWatchRegistration,ExistWatchRegistration糠悼。 并在ZKWatchManager類中根據(jù)每種類型的WatchRegistration,分別有一張map表負責存放。

private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();

EventThread::processEvent 時浅乔,根據(jù)event的所屬路徑倔喂,從三張map中獲取對應(yīng)的watch列表進行消息通知及處理铝条。

總結(jié)

client 端的源碼分析就到此為止了。

ZooKeeper Client 的源碼很簡單席噩,擁有三個獨立線程分別對命令進行處理班缰,分發(fā)和響應(yīng)操作,在保證各個線程相互獨立的基礎(chǔ)上悼枢,盡可能避免了多線程操作中出現(xiàn)鎖的情況埠忘。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市馒索,隨后出現(xiàn)的幾起案子莹妒,更是在濱河造成了極大的恐慌,老刑警劉巖绰上,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件旨怠,死亡現(xiàn)場離奇詭異,居然都是意外死亡蜈块,警方通過查閱死者的電腦和手機鉴腻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來百揭,“玉大人爽哎,你說我怎么就攤上這事⌒啪” “怎么了倦青?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵瓮床,是天一觀的道長盹舞。 經(jīng)常有香客問我,道長隘庄,這世上最難降的妖魔是什么踢步? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮丑掺,結(jié)果婚禮上获印,老公的妹妹穿的比我還像新娘。我一直安慰自己街州,他們只是感情好兼丰,可當我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著唆缴,像睡著了一般鳍征。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上面徽,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天艳丛,我揣著相機與錄音匣掸,去河邊找鬼。 笑死氮双,一個胖子當著我的面吹牛碰酝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播戴差,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼送爸,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了暖释?” 一聲冷哼從身側(cè)響起碱璃,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎饭入,沒想到半個月后嵌器,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡谐丢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年爽航,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片乾忱。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡讥珍,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出窄瘟,到底是詐尸還是另有隱情衷佃,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布蹄葱,位于F島的核電站氏义,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏图云。R本人自食惡果不足惜惯悠,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望竣况。 院中可真熱鬧克婶,春花似錦、人聲如沸丹泉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽摹恨。三九已至筋岛,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間睬塌,已是汗流浹背泉蝌。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工歇万, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人勋陪。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓贪磺,卻偏偏與公主長得像,于是被迫代替她去往敵國和親诅愚。 傳聞我的和親對象是個殘疾皇子寒锚,可洞房花燭夜當晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容