ZooKeeper's watcher mechanism

1 First, what is a watcher, and what does its model look like?

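A watcher is like a listener in real life. The server is like a meeting room: a place every client (person) can access. I don't need to know what everyone else is doing. I only listen for the content I care about (a DataNode). A change to a DataNode's data, the creation of a DataNode, a change to its child DataNodes, and similar events all interest me, and when one of them happens the listener pushes a notification to me.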

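The watcher model consists of three parts. The first and most important is the callback: the process method. The other two are the Event (its EventType) and KeeperState, the state of the connection to the server when the event occurred.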
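The following is a minimal Java sketch of this model. The types `SimpleWatcher`, `SimpleWatchedEvent` and `RecordingWatcher`, along with the trimmed-down `EventType` and `KeeperState` enums, are hypothetical stand-ins and not the real `org.apache.zookeeper` classes. They mirror only the shape described above: an event carries its type, the connection state and the path, and a watcher reacts to it in `process`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of ZooKeeper's watcher types (not the real
// org.apache.zookeeper classes). Only a subset of the real constants is listed.
enum EventType { None, NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

enum KeeperState { Disconnected, SyncConnected, Expired }

// What happened, the connection state at that moment, and the affected path.
final class SimpleWatchedEvent {
    final EventType type;
    final KeeperState state;
    final String path;

    SimpleWatchedEvent(EventType type, KeeperState state, String path) {
        this.type = type;
        this.state = state;
        this.path = path;
    }
}

// The callback contract: a watcher is anything with a process(event) method.
interface SimpleWatcher {
    void process(SimpleWatchedEvent event);
}

// Example watcher that records every event it is handed.
final class RecordingWatcher implements SimpleWatcher {
    final List<String> seen = new ArrayList<>();

    @Override
    public void process(SimpleWatchedEvent event) {
        seen.add(event.type + " " + event.path + " (" + event.state + ")");
    }
}
```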



2 What is the watcher mechanism good for?

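1 Combined with ephemeral znodes in ZooKeeper's data model, watchers can implement service registration and discovery, dynamic cluster configuration updates, and similar features.
2 Combined with ephemeral sequential znodes, they can implement distributed locks and queues.
3 From a design-pattern point of view, registering and triggering a watcher is publish/subscribe for distributed messages, that is, the observer pattern.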
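Point 2 relies on the usual lock recipe. Each client creates an ephemeral sequential child under a lock node. The child with the lowest sequence number holds the lock, and every other client watches (typically with exists()) only the child just before its own. The sketch below covers only that decision step. It assumes child names end in ZooKeeper's 10-digit sequence suffix. `LockRecipe` and `nodeToWatch` are hypothetical names, and no ZooKeeper client calls are made.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper for the decision step of the lock recipe; no ZooKeeper calls.
final class LockRecipe {
    // Sequential znode names end in a 10-digit counter, e.g. "lock-0000000007".
    static long sequenceOf(String name) {
        return Long.parseLong(name.substring(name.length() - 10));
    }

    // Empty result: we hold the lock. Otherwise: the one child we should watch.
    static Optional<String> nodeToWatch(List<String> children, String myNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(LockRecipe::sequenceOf));
        int idx = sorted.indexOf(myNode);
        if (idx < 0) {
            throw new IllegalArgumentException(myNode + " is not among the children");
        }
        if (idx == 0) {
            return Optional.empty(); // lowest sequence number holds the lock
        }
        // Watch only the immediate predecessor so a release wakes a single waiter.
        return Optional.of(sorted.get(idx - 1));
    }
}
```

Because each client watches only its predecessor, releasing the lock wakes a single waiter rather than all of them.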

3 How watchers work

Consider this scenario: zk.getChildren(path, childrenWatcher), where childrenWatcher is a Watcher object that overrides process(WatchedEvent event).

3.1 How the client registers the watcher
ZooKeeper#getChildren(String, Watcher)
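public List<String> getChildren(final String path, Watcher watcher)
        throws KeeperException, InterruptedException
    {
          // ... (n lines omitted: validate path, compute clientPath and serverPath with the chroot applied)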
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new ChildWatchRegistration(watcher, clientPath);
        }
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getChildren);
        GetChildrenRequest request = new GetChildrenRequest();
        request.setPath(serverPath);
        // Only the path and the boolean watch flag travel over the network; the Watcher object stays on the client
        request.setWatch(watcher != null);
        GetChildrenResponse response = new GetChildrenResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        return response.getChildren();
    }

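After the request object is built, cnxn.submitRequest wraps it in a Packet via ClientCnxn#queuePacket and adds it to outgoingQueue. There it waits for SendThread's run loop to send it, while submitRequest waits on the packet until it is marked finished.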

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null; 
        synchronized (outgoingQueue) {
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }
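The queueing step can be sketched as follows. This is a simplified model and not the real `ClientCnxn`. The names `SimplePacket` and `SimpleClientQueue` and the op-code value are hypothetical. The sketch keeps the two points that matter here. First, the watch registration rides along on the client-side packet object. Second, once the connection is dead or closing, a packet is failed immediately instead of queued.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified packet: only the fields this step needs.
final class SimplePacket {
    final int opCode;
    // Kept on the client-side packet object only; it is never serialized.
    final Object watchRegistration;
    boolean connectionLost;

    SimplePacket(int opCode, Object watchRegistration) {
        this.opCode = opCode;
        this.watchRegistration = watchRegistration;
    }
}

// Hypothetical model of the enqueue step of ClientCnxn#queuePacket.
final class SimpleClientQueue {
    static final int OP_CLOSE_SESSION = -11; // illustrative op-code value

    final Deque<SimplePacket> outgoingQueue = new ArrayDeque<>();
    boolean alive = true;
    boolean closing = false;

    SimplePacket queuePacket(int opCode, Object watchRegistration) {
        SimplePacket packet = new SimplePacket(opCode, watchRegistration);
        synchronized (outgoingQueue) {
            if (!alive || closing) {
                // Fail fast instead of queueing onto a dead or closing connection.
                packet.connectionLost = true;
            } else {
                if (opCode == OP_CLOSE_SESSION) {
                    closing = true; // requests queued after this one are rejected
                }
                outgoingQueue.add(packet);
            }
        }
        // The real client then wakes the selector so SendThread writes promptly.
        return packet;
    }
}
```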
ClientCnxn.SendThread#run
while (state.isAlive()) {
                try {
                    // ... (n lines omitted)
                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
                    // ... (n lines omitted)
                }
                // ... (catch blocks omitted)
            }
ClientCnxnSocketNIO#doIO
if (sockKey.isWritable()) {
            synchronized(outgoingQueue) {
                // Take the next packet to send from outgoingQueue
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());

                if (p != null) {
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
                    if (p.bb == null) {
                        if ((p.requestHeader != null) &&
                                (p.requestHeader.getType() != OpCode.ping) &&
                                (p.requestHeader.getType() != OpCode.auth)) {
                            p.requestHeader.setXid(cnxn.getXid());
                        }
                        // createBB() serializes only requestHeader and request into p.bb (a ByteBuffer); watchRegistration is not sent
                        p.createBB();
                    }
                    sock.write(p.bb);
                    if (!p.bb.hasRemaining()) {
                        sentCount++;
                        outgoingQueue.removeFirstOccurrence(p);
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                // A reply is expected (not ping/auth), so park the packet in pendingQueue until it arrives
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
                if (outgoingQueue.isEmpty()) {
                    disableWrite();
                } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                    disableWrite();
                } else {
                    enableWrite();
                }
            }
        }
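The write branch above comes down to a small state transition. A packet leaves outgoingQueue once it is fully written. It moves to pendingQueue only if a reply is expected, so ping and auth packets are excluded. The sketch below models just that transition with hypothetical names (`WirePacket`, `SendLoopSketch`) and illustrative op-code constants. Serialization and socket I/O are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical packet with just an op code and the xid assigned at send time.
final class WirePacket {
    final int opCode;
    int xid;

    WirePacket(int opCode) {
        this.opCode = opCode;
    }
}

// Hypothetical model of the writable branch of ClientCnxnSocketNIO#doIO.
final class SendLoopSketch {
    static final int OP_PING = 11;  // illustrative op-code values
    static final int OP_AUTH = 100;

    final Deque<WirePacket> outgoingQueue = new ArrayDeque<>();
    final Deque<WirePacket> pendingQueue = new ArrayDeque<>();
    private int nextXid = 1;

    // Sends the head packet (assumed to be written in full) and files it for its reply.
    void writeOne() {
        WirePacket p = outgoingQueue.pollFirst();
        if (p == null) {
            return;
        }
        boolean expectsReply = p.opCode != OP_PING && p.opCode != OP_AUTH;
        if (expectsReply) {
            p.xid = nextXid++; // lets the reply be matched to this request
        }
        // Serialization and sock.write(...) are omitted here.
        if (expectsReply) {
            pendingQueue.addLast(p);
        }
    }
}
```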

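At this point the client has not yet registered the watcher.
In fact, the server completes its registration first. We will look at the server side later. For now, assume the server returned the getChildren response normally.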

We are still in ClientCnxnSocketNIO#doIO, now on the read side:

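if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            // ... (end-of-stream check omitted)
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    // First the 4-byte length prefix; readLength() then sizes incomingBuffer for the body
                    recvCount++;
                    readLength();
                } else if (!initialized) {
                    // ... (reading the connect response omitted)
                } else {
                    // Read the response data returned by the server
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }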
org.apache.zookeeper.ClientCnxn.SendThread#readResponse
Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got "
                            + replyHdr.getXid());
                }
                // Take the packet that was parked in pendingQueue earlier
                packet = pendingQueue.remove();
            }
            try {
                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    packet.response.deserialize(bbia, "response");
                }
            } finally {
                // finishPacket is where the client actually registers the watcher
                finishPacket(packet);
            }
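All replies come back over one TCP connection in request order. That is why readResponse can take the head of pendingQueue and treat it as the request being answered. Below is a hedged sketch of that matching using the hypothetical classes `ReplyMatcher` and `PendingRequest`. It adds an explicit xid consistency check. It also treats xid -1 as a notification, which is how section 3.4 identifies watch events.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical record of a request awaiting its reply.
final class PendingRequest {
    final int xid;
    Integer err; // filled in when the reply arrives

    PendingRequest(int xid) {
        this.xid = xid;
    }
}

// Hypothetical model of how a reply is matched to the head of pendingQueue.
final class ReplyMatcher {
    static final int NOTIFICATION_XID = -1;
    final Deque<PendingRequest> pendingQueue = new ArrayDeque<>();

    // Returns the completed request, or null when the reply is a watch notification.
    PendingRequest onReply(int replyXid, int err) {
        if (replyXid == NOTIFICATION_XID) {
            return null; // notifications do not answer any pending request
        }
        PendingRequest p = pendingQueue.pollFirst();
        if (p == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + replyXid);
        }
        // Replies arrive in request order, so the head must carry the same xid.
        if (p.xid != replyXid) {
            throw new IllegalStateException("Xid out of order: got " + replyXid + ", expected " + p.xid);
        }
        p.err = err;
        return p; // at this point the real client calls finishPacket
    }
}
```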

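finishPacket is the key method. It is where the client-side watcher registration actually happens, and only after the server's reply has arrived.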

ClientCnxn#finishPacket
private void finishPacket(Packet p) {
        if (p.watchRegistration != null) {
            // Here p.watchRegistration is actually a ChildWatchRegistration
            p.watchRegistration.register(p.replyHeader.getErr());
        }

        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }
ZooKeeper.WatchRegistration#register
public void register(int rc) {
            if (shouldAddWatch(rc)) {
                // ChildWatchRegistration overrides getWatches(rc) to return watchManager.childWatches;
                // watchManager is the client's ZKWatchManager
                Map<String, Set<Watcher>> watches = getWatches(rc);
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
        }
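The registration rule is small enough to model directly. This sketch uses the hypothetical classes `ClientWatchTables` and `ChildRegistration`, with watchers typed as `Object`. A watcher is added to the child-watch table only when the server's result code is 0. A failed getChildren (for example NONODE, code -101) therefore leaves no watcher behind. In the real client, ExistsWatchRegistration also registers on NONODE, which is what lets exists() watch for a node that has not been created yet.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for two of the client's watch tables (ZKWatchManager-like).
final class ClientWatchTables {
    final Map<String, Set<Object>> dataWatches = new HashMap<>();
    final Map<String, Set<Object>> childWatches = new HashMap<>();
}

// Hypothetical model of a child-watch registration that runs when the reply arrives.
final class ChildRegistration {
    private final Object watcher;
    private final String clientPath;
    private final ClientWatchTables tables;

    ChildRegistration(Object watcher, String clientPath, ClientWatchTables tables) {
        this.watcher = watcher;
        this.clientPath = clientPath;
        this.tables = tables;
    }

    // Only a successful call (rc == 0) leaves a watcher behind.
    boolean shouldAddWatch(int rc) {
        return rc == 0;
    }

    // A child registration always targets the child-watch table.
    Map<String, Set<Object>> getWatches(int rc) {
        return tables.childWatches;
    }

    void register(int rc) {
        if (!shouldAddWatch(rc)) {
            return;
        }
        Map<String, Set<Object>> watches = getWatches(rc);
        synchronized (watches) {
            watches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
        }
    }
}
```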

That completes the client-side registration.
The flow chart and class diagram follow:


[Figure: ZooKeeper watch notification mechanism]
[Figure: overall flow of watcher registration and triggering]
3.2 How does the server register the watcher?

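As covered in the earlier article [How a ZooKeeper cluster processes distributed transaction requests], the request processor chain is determined by the server's role. Here we run a standalone server, so our getChildren request goes through the chain PrepRequestProcessor ==> SyncRequestProcessor ==> FinalRequestProcessor.
For a getChildren request that carries a watch, PrepRequestProcessor only calls checkSession to verify that the session has not expired or moved.
SyncRequestProcessor handles persisting the transaction log. A read request like this needs no persistence, so it is passed on to FinalRequestProcessor, where the following code runs: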

FinalRequestProcessor#processRequest
case OpCode.getChildren: {
                lastOp = "GETC";
                GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getChildrenRequest);
                // Look up the node for the path in the in-memory DataTree
                DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }
                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ,
                        request.authInfo);
                // If the request's watch flag is true, pass in cnxn: ServerCnxn implements
                // Watcher, so the server can call it back later
                List<String> children = zks.getZKDatabase().getChildren(
                        getChildrenRequest.getPath(), null, getChildrenRequest
                                .getWatch() ? cnxn : null);
                rsp = new GetChildrenResponse(children);
                break;
            }
DataTree#getChildren (ZKDatabase#getChildren delegates here)
public List<String> getChildren(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            if (stat != null) {
                n.copyStat(stat);
            }
            List<String> children = new ArrayList<String>(n.getChildren());

            if (watcher != null) {
                // Register the cnxn object in childWatches (a WatchManager)
                childWatches.addWatch(path, watcher);
            }
            return children;
        }
    }
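On the server, WatchManager keeps two indexes: watchTable (path to watchers) and watch2Paths (watcher to paths). Below is a minimal model of that bookkeeping. It uses a generic watcher type `W` in place of `ServerCnxn`, and the class name `ServerWatchIndex` is hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of WatchManager's two indexes; W stands in for ServerCnxn.
final class ServerWatchIndex<W> {
    final Map<String, Set<W>> watchTable = new HashMap<>();   // path -> watchers
    final Map<W, Set<String>> watch2Paths = new HashMap<>();  // watcher -> paths

    synchronized void addWatch(String path, W watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    // Drops every watch held by one watcher, e.g. when its connection closes.
    synchronized void removeWatcher(W watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<W> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(path);
                }
            }
        }
    }
}
```

The reverse index lets all of a connection's watches be dropped in one pass when that connection closes, without scanning every path.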
3.3 How the watcher fires on the server

When we create a child node under path, the NodeChildrenChanged event is triggered.
The create request first enters PrepRequestProcessor:

PrepRequestProcessor#pRequest
case OpCode.create:
                CreateRequest createRequest = new CreateRequest();
                // Take the next zxid for this create and turn it into a transaction; mainly this adds a ChangeRecord
                pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
                break;

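Next comes SyncRequestProcessor. It appends the transaction to the transaction log (txnLog; the log's output streams are tracked in the streamsToFlush list) and then flushes it to disk.
After that comes FinalRequestProcessor: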

FinalRequestProcessor#processRequest
synchronized (zks.outstandingChanges) {
            while (!zks.outstandingChanges.isEmpty()
                    && zks.outstandingChanges.get(0).zxid <= request.zxid) {
                ChangeRecord cr = zks.outstandingChanges.remove(0);
                if (cr.zxid < request.zxid) {
                    LOG.warn("Zxid outstanding "
                            + cr.zxid
                            + " is less than current " + request.zxid);
                }
                if (zks.outstandingChangesForPath.get(cr.path) == cr) {
                    zks.outstandingChangesForPath.remove(cr.path);
                }
            }
            if (request.hdr != null) {
                TxnHeader hdr = request.hdr;
                Record txn = request.txn;
                // Apply the transaction to the in-memory database (DataTree)
                rc = zks.processTxn(hdr, txn);
            }
            // do not add non quorum packets to the queue.
            if (Request.isQuorum(request.type)) {
                zks.getZKDatabase().addCommittedProposal(request);
            }
        }
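DataTree#createNode (reached via zks.processTxn -> ZKDatabase#processTxn -> DataTree#processTxn)
public String createNode(String path, byte data[], List<ACL> acl,
            long ephemeralOwner, int parentCVersion, long zxid, long time)
            throws KeeperException.NoNodeException,
            KeeperException.NodeExistsException {
         // ... (n lines omitted, including computing parentName)
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        // This is the child watcher we registered earlier on the parent path; its callback fires now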
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                Event.EventType.NodeChildrenChanged);
        return path;
    }
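Note which tables each event lands in. Creating /a/b fires NodeCreated on /a/b in dataWatches and NodeChildrenChanged on the parent /a in childWatches. The parent of a top-level node is the root "/". The sketch below reproduces only that parent computation and the resulting (table, path, event) entries. `CreateTriggers` and its string encoding are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing which watch tables a create touches.
final class CreateTriggers {
    // Parent of a znode path, computed like createNode does; "/a" has parent "/".
    static String parentOf(String path) {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        return parentName.equals("") ? "/" : parentName;
    }

    // Returns "table:path:event" entries fired by creating `path`.
    static List<String> firedBy(String path) {
        List<String> fired = new ArrayList<>();
        fired.add("dataWatches:" + path + ":NodeCreated");
        fired.add("childWatches:" + parentOf(path) + ":NodeChildrenChanged");
        return fired;
    }
}
```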
WatchManager#triggerWatch
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            // This is why watches are one-shot: the path's entry is removed as it fires, so to keep listening you must register the watcher again
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            // Fire the callback. What we registered earlier was the ServerCnxn object, so this jumps into ServerCnxn's process()
            w.process(e);
        }
        return watchers;
    }
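The one-shot behaviour comes from removing the path's entry before notifying anyone. Below is a reduced sketch of that logic using a hypothetical generic class `OneShotWatches`. Unlike the original, it returns an empty set instead of null and takes the notification action as a parameter. The `suppress` set plays the role of `supress`. For example, deleteNode passes in the watchers already notified through dataWatches, so childWatches does not notify them a second time for the same NodeDeleted event.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical, reduced model of WatchManager's one-shot trigger.
final class OneShotWatches<W> {
    private final Map<String, Set<W>> watchTable = new HashMap<>();

    synchronized void addWatch(String path, W watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
    }

    // Detach the path's watchers under the lock, then notify them outside it.
    Set<W> triggerWatch(String path, Set<W> suppress, BiConsumer<W, String> notify) {
        Set<W> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path); // removal is what makes watches one-shot
        }
        if (watchers == null || watchers.isEmpty()) {
            return Collections.emptySet();
        }
        for (W w : watchers) {
            if (suppress != null && suppress.contains(w)) {
                continue; // already notified for this event elsewhere
            }
            notify.accept(w, path);
        }
        return watchers;
    }
}
```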
NIOServerCnxn#process
synchronized public void process(WatchedEvent event) {
        // Note xid = -1 here (zxid is -1 as well); the client relies on xid == -1 to recognize a notification
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();
        // Only the event type, state and path are sent; the client rebuilds a WatchedEvent from them
        sendResponse(h, e, "notification");
    }
3.4 Finally, the callback on the client
ClientCnxn.SendThread#readResponse
if (replyHdr.getXid() == -1) {
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath()
                                + " which is too short for chroot path "
                                + chrootPath);
                    }
                }

                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                // Put the received event into the waitingEvents queue
                eventThread.queueEvent( we );
                return;
            }
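The chroot branch above maps server paths back to what the client sees. A chroot is set by appending a path to the connect string (for example `host:2181/myapp`). After that, the client addresses `/myapp/svc` as `/svc`. The sketch below isolates that conversion as a pure function. `ChrootPaths.toClientPath` is a hypothetical name.

```java
// Hypothetical helper isolating the chroot conversion done in readResponse.
final class ChrootPaths {
    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) {
            return serverPath; // no chroot: paths are identical on both sides
        }
        if (serverPath.equals(chrootPath)) {
            return "/"; // the chroot itself is the client's root
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length()); // strip the chroot prefix
        }
        // Too short to contain the chroot: the original logs a warning and leaves the path unchanged.
        return serverPath;
    }
}
```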
ClientCnxn.EventThread#queueEvent
public void queueEvent(WatchedEvent event) {
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
            sessionState = event.getState();

            // materialize the watchers based on the event
            // Based on the event, look up the watcher set in the ClientWatchManager (i.e. ZKWatchManager).
            // We triggered NodeChildrenChanged, so the Set<Watcher> comes from
            // Map<String, Set<Watcher>> childWatches
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
        }
ZKWatchManager#materialize
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) {
               // ... (n lines omitted)
            case NodeChildrenChanged:
                synchronized (childWatches) {
                    // Remove the path's watchers from childWatches; every removed watcher goes into result
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default:
                String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
            }

            return result;
        }
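materialize decides which client-side tables an event drains. Below is a hedged sketch of that mapping. It represents watchers as plain strings and uses the hypothetical names `ClientMaterializer` and `ClientEventType`. The None (connection-state) case, which in the real client involves the default watcher, is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical subset of event types relevant to node watches.
enum ClientEventType { NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

// Hypothetical model of the client-side tables and ZKWatchManager#materialize.
final class ClientMaterializer {
    final Map<String, Set<String>> dataWatches = new HashMap<>();
    final Map<String, Set<String>> existWatches = new HashMap<>();
    final Map<String, Set<String>> childWatches = new HashMap<>();

    // Remove the path's entry from one table and move its watchers into `out`.
    private static void drain(Map<String, Set<String>> table, String path, Set<String> out) {
        synchronized (table) {
            Set<String> removed = table.remove(path);
            if (removed != null) {
                out.addAll(removed);
            }
        }
    }

    Set<String> materialize(ClientEventType type, String path) {
        Set<String> result = new HashSet<>();
        switch (type) {
            case NodeCreated:
            case NodeDataChanged:
                drain(dataWatches, path, result);
                drain(existWatches, path, result);
                break;
            case NodeChildrenChanged:
                drain(childWatches, path, result);
                break;
            case NodeDeleted:
                drain(dataWatches, path, result);
                drain(existWatches, path, result);
                drain(childWatches, path, result);
                break;
        }
        return result;
    }
}
```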
ClientCnxn.EventThread#run
public void run() {
           try {
              isRunning = true;
              while (true) {
                 Object event = waitingEvents.take();
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    // Client-side callback
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } catch (InterruptedException e) {
              LOG.error("Event thread exiting due to interruption", e);
           }

            LOG.info("EventThread shut down for session: 0x{}",
                     Long.toHexString(getSessionId()));
        }
ClientCnxn.EventThread#processEvent
private void processEvent(Object event) {
          try {
             if (event instanceof WatcherSetEventPair) {
                  // each watcher will process the event
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
                  for (Watcher watcher : pair.watchers) {
                      try {
                          // Client-side callback: this is where childrenWatcher.process() finally runs
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
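              } else {
                  // ... (handling of async-callback packets omitted)
              }
          } catch (Throwable t) {
              // ... (error logging omitted)
          }
       }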
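EventThread is a single consumer draining a blocking queue. It stops via a poison pill (eventOfDeath), but only after the events queued ahead of the pill have been delivered. The sketch below reproduces that loop with a `LinkedBlockingQueue`. `EventLoopSketch` is hypothetical, and recording a string stands in for calling `watcher.process(...)`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of ClientCnxn.EventThread's run loop.
final class EventLoopSketch implements Runnable {
    private static final Object EVENT_OF_DEATH = new Object(); // poison pill
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    delivered.add((String) event); // stands in for watcher.process(...)
                }
                // After the pill, keep draining until nothing is left, then stop.
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A single thread runs every callback in order. A slow process() therefore delays all other watcher notifications for that session, so long-running work belongs on another thread.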
© Copyright belongs to the author. For reprints or content cooperation, please contact the author.