zookeeper watcher機制源碼解析

系統(tǒng)模型

zk的視圖結(jié)構(gòu)和標準的unix文件系統(tǒng)非常類似膳音,但沒有引入傳統(tǒng)文件系統(tǒng)中目錄和文件等概念,而是使用了其獨有的”數(shù)據(jù)節(jié)點“概念苍凛,稱之為ZNode兵志,Znode是Zookeeper中數(shù)據(jù)的最小單元想罕, 每個Znode上都可以保存數(shù)據(jù),同時還可以掛載子節(jié)點惭适,這樣構(gòu)成了一個層次化的命名空間楼镐,我們稱之為樹框产。


image.png

事務: 在zk中,事務是指能夠改變Zookeeper服務器狀態(tài)的操作盾舌,也叫事務操作或更新操作蘸鲸,一般包括數(shù)據(jù)節(jié)點創(chuàng)建與刪除酌摇、數(shù)據(jù)節(jié)點內(nèi)容更新和客戶端會話創(chuàng)建與失效等操作。對于每一個事務請求仍稀,zk都會為其分配一個全局唯一的事務ID,用ZXID來表示埂息,通常64位,每一個ZXID對應一次更新操作铲掐。

**節(jié)點類型:在zk中值桩,每個數(shù)據(jù)節(jié)點都是有生命周期的奔坟,其生命周期的長短取決于數(shù)據(jù)節(jié)點的節(jié)點類型:

  • 持久節(jié)點: 該數(shù)據(jù)節(jié)點被創(chuàng)建后,就會一直存在于ZooKeeper服務器上婉支,直到有刪除操作來清除這個節(jié)點澜建。
  • 持久順序節(jié)點: 在持久性的基礎(chǔ)上增加了順序性霎奢,zk會根據(jù)節(jié)點創(chuàng)建的先后順序饼灿,自動為給定節(jié)點加上一個數(shù)字后綴,
  • 臨時節(jié)點: 臨時節(jié)點的生命周期是和客戶端的會話綁定在一起晤硕,客戶端會話失效舞箍,這個節(jié)點就會被刪除
  • 臨時順序節(jié)點: 在臨時節(jié)點的基礎(chǔ)上有了順序性這個特性皆疹。

Watcher機制

image.png

zk的watcher由客戶端略就、客戶端watchManager和zk服務器組成,zk客戶端向zk服務器注冊watcher的同時窄绒,會將watcher對象存儲在客戶端的watcherManager彰导,zk服務器觸發(fā)watcher事件后,會向客戶端發(fā)送通知山析,客戶端線程從watcher manager中取出對應的watcher對象倔幼,執(zhí)行相應的回調(diào)邏輯损同。

image.png

上圖中的小紅旗是一個watcher,當小紅旗被創(chuàng)建并注冊到node1節(jié)點后茂卦,就會監(jiān)聽node1+ node_a + node_b或者node_a + node_b,這里兩種情況是因為在創(chuàng)建watcher注冊時有多種途徑等龙,并且watcher不能監(jiān)聽到孫子節(jié)點伶贰,此外黍衙,watcher設置后是一次性的,觸發(fā)一次后就失效位仁,如果想一直監(jiān)聽方椎,需要在process回調(diào)函數(shù)里重新注冊相同的watcher
在zk中接口類Watcher定義了事件通知相關(guān)的邏輯棠众,包含了KeeperState和EventType兩個枚舉類,代表通知狀態(tài)和事件類型轿亮,

image.png

Watcher接口擁有process函數(shù)我注,用于處理回調(diào)
內(nèi)部類Event又包含內(nèi)部類KeeperState以及EventType
KeeperState用于記錄Event發(fā)生時的zk狀態(tài)(通知狀態(tài))
EventType用于記錄Event的類型

方法process

//回調(diào)函數(shù)實現(xiàn)該函數(shù)迟隅,表示根據(jù)event執(zhí)行的行為
abstract public void process(WatchedEvent event);

內(nèi)部類Event
包含KeeperState和EventType兩個內(nèi)部類,通過枚舉類實現(xiàn)
方法很簡單掠抬,就是int值與對應枚舉類型的轉(zhuǎn)換
兩者的枚舉類型以及兩者之間的關(guān)系校哎,觸發(fā)條件可以參考《paxos到zk》中的圖

image.png

WatchedEvent 和 WatcherEvent

WatchedEvent :代表zk上一個Watcher能夠回應的變化,包含了變化事件的類型闷哆,zk狀態(tài)以及變化影響的znode的path
WatcherEvent : 是WatchedEvent用于網(wǎng)絡傳輸?shù)姆庋b類


image.png

三個成員變量很好的解釋了WatchedEvent的意義抱怔,即事件的類型,zk狀態(tài)以及變化影響的znode的path

WatcherEvent有一個getWrapper方法局冰,

/**
     *  Convert WatchedEvent to type that can be sent over network
     */
    //轉(zhuǎn)化成可供網(wǎng)絡傳輸灌危,序列化的WatcherEvent
    public WatcherEvent getWrapper() {
        return new WatcherEvent(eventType.getIntValue(), 
                                keeperState.getIntValue(), 
                                path);
    }
}

WatcherEvent實現(xiàn)了Record接口勇蝙,可以理解為WatchedEvent用于網(wǎng)絡傳輸?shù)姆庋b類

ClientWatchManager接口和實現(xiàn)類ZKWatchManager

ClientWatchManager接口: 用戶根據(jù)Event得到需要通知的watcher
ZKWatchManager為ClientWatchManager的實現(xiàn),ClientWatchManager接口只有一個函數(shù),

//ClientWatchManager負責根據(jù)Event得到需要通知哪些watcher
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);

默認實現(xiàn)類是Zookeeper的內(nèi)部類ZKWatchManager,

private static class ZKWatchManager implements ClientWatchManager {
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();//針對內(nèi)容的watch
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();//針對exist API相關(guān)的watch
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();//針對getChildren API相關(guān)的watch

        private volatile Watcher defaultWatcher;//client傳遞的,默認的watcher實現(xiàn)

        final private void addTo(Set<Watcher> from, Set<Watcher> to) {
            if (from != null) {
                to.addAll(from);
            }
        }

        /* (non-Javadoc)
         * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, 
         *                                                        Event.EventType, java.lang.String)
         */
        @Override
        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) {
            case None://eventType是null
                // 則所有dataWatches,existWatches,childWatches都需要被通知
                result.add(defaultWatcher);//添加默認watcher
                boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                        state != Watcher.Event.KeeperState.SyncConnected;//獲取clear標記

                synchronized(dataWatches) {
                    for(Set<Watcher> ws: dataWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) {
                    for(Set<Watcher> ws: existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) {
                    for(Set<Watcher> ws: childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }

                return result;
            case NodeDataChanged:
            case NodeCreated:
                //如果節(jié)點內(nèi)容變化或者創(chuàng)建
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);//從dataWatches中移除,并且添加到result中
                }
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);//從existWatches中移除盗誊,并且添加到result中
                }
                break;
            case NodeChildrenChanged:
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            case NodeDeleted:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                // XXX This shouldn't be needed, but just in case
                synchronized (existWatches) {
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        addTo(existWatches.remove(clientPath), result);
                        LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                    }
                }
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default://默認處理
                String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
            }
            //返回結(jié)果
            return result;
        }
    }

該方法在事件發(fā)生后哈踱,返回需要被通知的Watcher集合开镣。是根據(jù)已經(jīng)注冊的watches(分為三類,data,children,exist)咽扇,根據(jù)path找到對應的watches陕壹,得到一個result集合進行返回

** WatcherSetEventPair ** : 將Event以及對應需要觸發(fā)的watches集合進行組合綁定,放到waitingEvents隊列

  private static class WatcherSetEventPair {
        private final Set<Watcher> watchers;//事件觸發(fā)需要被通知的watches集合
        private final WatchedEvent event;//事件

        public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
            this.watchers = watchers;
            this.event = event;
        }
    }

watcher注冊過程
創(chuàng)建zk客戶端對象實例時注冊:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean 
canBeReadOnly)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

通過這種方式注冊的watcher將會作為整個zk會話期間默認的watcher糠馆,會一直被保存在客戶端zk watchManager的defaultWatcher中又碌。
其他注冊watcher的API:

getChildren(String path, Watcher watcher)
getChildren(String path, boolean watch)
exists(String path, Watcher watcher)
getData(String path, boolean watch, Stat stat)
...

Boolean watch表示是否使用上下文默認的watcher绊袋,即創(chuàng)建zk時設置的watcher

客戶端注冊過程

我們以getData這個接口為例說明:
watcher在請求中,通過標志位發(fā)送給server

   public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException
     {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {//如果有watcher愤炸,就注冊
            wcb = new DataWatchRegistration(watcher, clientPath);//生成一個DataWatchRegistration,即Data的watch的注冊
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();//生成請求頭
        h.setType(ZooDefs.OpCode.getData);//設置請求類型為getData
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);//設置標志位,是否函數(shù)watch
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);//client端提交請求
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
}
    public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
        return this.getData(path, watch?this.watchManager.defaultWatcher:null, stat);
    }

里面調(diào)用了ClientCnxn#submitRequestsubmitRequest 源碼如下

//提交請求
    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();//生成回復頭
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {//如果packet沒有處理完,就一直等著
                packet.wait();
            }
        }
        return r;
    }

里面調(diào)用了ClientCnxn#queuePacket函數(shù), queuePacket函數(shù)作為生產(chǎn)者,代碼中調(diào)用

outgoingQueue.add(packet);

在 ZooKeeper 中缤苫,Packet 是一個最小的通信協(xié)議單元墅拭,即數(shù)據(jù)包。
Pakcet 用于進行客戶端與服務端之間的網(wǎng)絡傳輸舒憾,任何需要傳輸?shù)膶ο蠖夹枰b成一個 Packet 對象穗熬。
在 ClientCnxn 中 WatchRegistration 也會被封裝到 Packet 中,調(diào)用 queuePacket放入outgoingQueue即發(fā)送隊列中(生產(chǎn)packet)
然后SendThread 線程調(diào)用doTransport方法,從outgoingQueue中消費Packet,客戶端發(fā)送, 參考實現(xiàn)類ClientCnxnSocketNIO#doTransport里面調(diào)用了ClientCnxnSocketNIO#doIO此時是發(fā)送請求唤蔗,調(diào)用了ClientCnxn.Packet#createBB

 public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");//序列化請求頭妓柜,包含xid和type
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");//序列化request(對于特定請求如GetDataRequest,包含了是否存在watcher的標志位)
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

client的watcher并沒有進行網(wǎng)絡傳輸,server并不知道client的watcher觸發(fā)時process函數(shù)要怎么執(zhí)行
但是對于特定請求類型比如GetDataRequest,序列化的時候會傳遞一個標志位watch藏雏,表示是否watch
server在處理的時候作煌,只知道client是否watch某個path

上面getData兩個接口都可以進行Watcher的注冊,第二個接口通過一個Boolean參數(shù)來標識是否使用默認的Watcher來進行注冊蚤告,然后還是調(diào)用第一個方法來完成注冊邏輯服爷,在第一個方法中仍源,客戶端使用this.cnxn.submitRequest(h, request, response, wcb) 方法向服務器發(fā)送這個注冊請求,完成請求發(fā)送后逗爹,
發(fā)送請求的時候嚎于,watcher還并沒有注冊在client端,要等server的返回,
ClientCnxn.SendThread中袍睡,讀取server的回復
調(diào)用了ClientCnxnSocketNIO#doTransport
調(diào)用了ClientCnxnSocketNIO#doIO
調(diào)用了ClientCnxn.SendThread#readResponse
調(diào)用了ClientCnxn#finishPacket
finishPacket方法斑胜,它會從Packet中取出對應的Watcher并注冊到zkWatchManager中去嫌吠,也即請求回復后辫诅,watcher才在client端注冊,

private void finishPacket(Packet p) {//Packet請求發(fā)送簇宽,收到回復吧享,進行處理之后
        if (p.watchRegistration != null) {//如果有要注冊的watchRegistration
            p.watchRegistration.register(p.replyHeader.getErr());//根據(jù)response code進行注冊
        }

        if (p.cb == null) {//如果沒有異步回調(diào)
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {//如果有異步回調(diào)
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }
image.png

watchRegistration.register方法就是把 WatchRegistration 子類里面的 Watcher 實例放到 ZKWatchManager 的 dataWatches 中存儲起來

   abstract class WatchRegistration {//client中管理watch注冊的類
        private Watcher watcher;//注冊的watcher
        private String clientPath;//監(jiān)聽的znode path
        public WatchRegistration(Watcher watcher, String clientPath)
        {
            this.watcher = watcher;
            this.clientPath = clientPath;
        }
        //根據(jù)response的resultCode來獲取所有注冊的path以及對應的watcher集合
        abstract protected Map<String, Set<Watcher>> getWatches(int rc);

        /**
         * Register the watcher with the set of watches on path.
         * @param rc the result code of the operation that attempted to
         * add the watch on the path.
         */
        public void register(int rc) {//根據(jù)response的resultCode來注冊watcher到一個path
            if (shouldAddWatch(rc)) {//如果可以添加
                Map<String, Set<Watcher>> watches = getWatches(rc);//獲取所有注冊的path以及對應的watcher集合
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);//找到該path
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);//添加當前watcher
                }
            }
        }
        /**
         * Determine whether the watch should be added based on return code.
         * @param rc the result code of the operation that attempted to add the
         * watch on the node
         * @return true if the watch should be added, otw false
         */
        protected boolean shouldAddWatch(int rc) {//根據(jù)resultCode判斷是否可以添加watch
            return rc == 0;
        }
    }

屬性clientPath和watcher分別是監(jiān)聽關(guān)注的znode的path和對應處理的watcher
注冊邏輯就是根據(jù)response的resultCode,判斷是否可以添加watch殊鞭,可以添加的話,就在Map<String, Set<Watcher>>添加記錄.

簡單來說锯仪,就是當使用ZooKeeper 構(gòu)造方法或者使用 getData趾盐、exists 和 getChildren 三個接口來向 ZooKeeper 服務器注冊 Watcher 的時候救鲤,首先將此消息傳遞給服務端,傳遞成功后斥扛,服務端會通知客戶端丹锹,然后客戶端將該路徑和Watcher對應關(guān)系存儲起來備用。

client注冊watcher的小結(jié)
1.client發(fā)送getData,getChildren,exist請求時峻村,傳入自定義的watcher粘昨,或利用ZooKeeper構(gòu)造函數(shù)的默認Watcher
2.將請求封裝為Packet,在RequestHeader記錄是否需要watcher,記錄放入生產(chǎn)者隊列ClientCnxn#outgoingQueue
3.ClientCnxn.SendThread消費outgoingQueue
  調(diào)用ClientCnxnSocketNIO#doTransport
  調(diào)用ClientCnxnSocketNIO#doIO
  調(diào)用ClientCnxn.Packet#createBB
  序列化的時候窜锯,將request記性序列化,里面包含一個是否帶有watch的標志位(不包含watcher對象)
4.server進行相應的處理,之后進行回復
  可以參考FinalRequestProcessor#processRequest中對于getData的請求處理
  利用getDataRequest.getWatch())吞瞪,看是否client需要watch芍秆,進而注冊到DataTree的WatchManager中翠勉,下面會講的
5.ClientCnxn.SendThread讀取回復
  調(diào)用ClientCnxnSocketNIO#doTransport
  調(diào)用ClientCnxnSocketNIO#doIO
  調(diào)用ClientCnxn.SendThread#readResponse
  調(diào)用ClientCnxn#finishPacket
  利用response code,進行watcher的注冊对碌,記錄在ZooKeeper.WatchRegistration對應的實現(xiàn)類中
服務端處理watcher
image.png

server前面的調(diào)用鏈這里不展開,從FinalRequestProcessor 類接收到客戶端請求后诉位,會調(diào)用 processRequest 方法進行處理菜枷,會進一步轉(zhuǎn)向 ZooKeeperServer 的 processRequest 進行進一步處理,處理結(jié)由 ZKDatabase 類返回


image.png

對于注冊 Watcher 請求椿息,F(xiàn)inalRequestProcessor 的 ProcessRequest 方法會判斷當前請求是否需要注冊 Watcher寝优,如果為 true枫耳,就會將當前的 ServerCnxn 對象和數(shù)據(jù)節(jié)點路徑傳入 getData 方法中去迁杨。ServerCnxn 是一個 ZooKeeper 客戶端和服務器之間的連接接口,代表了一個客戶端和服務器的連接.
實現(xiàn)了Watcher接口捷沸,這個Watcher的實現(xiàn)類記錄了client和server的連接狐史,回調(diào)的時候骏全,可以直接發(fā)送response告訴client,有事件觸發(fā)了试吁,數(shù)據(jù)節(jié)點的節(jié)點路徑和 ServerCnxn 最終會被存儲在 WatchManager 的 watchTable 和 watch2Paths 中楼咳。

case OpCode.getData: {//getData請求
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getDataRequest);//反序列化出getDataRequest
                DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
                if (n == null) {
                    throw new KeeperException.NoNodeException();
                }//驗證path對應的node是否存在
                PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                        ZooDefs.Perms.READ,
                        request.authInfo);//驗證ACL權(quán)限
                Stat stat = new Stat();
                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null);//如果有watch標志位母怜,Watcher就傳cnxn
                rsp = new GetDataResponse(b, stat);
                break;
            }

上面的FinalRequestProcessor#processRequest調(diào)用會進入
ZKDatabase#getData
DataTree#getData

public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
                dataWatches.addWatch(path, watcher);//注冊watcher到dataWatches
            }
            return n.data;//返回byte[]
        }
    }

WatchManager 負責 Watcher 事件的觸發(fā)糙申,它是一個統(tǒng)稱,在服務端 DataTree 會托管兩個 WatchManager缕陕,分別是 dataWatches 和 childWatches疙挺,分別對應數(shù)據(jù)變更 Watcher 和子節(jié)點變更 Watcher铐然。
.WatchManger 兩個隊列

private final HashMap<String, HashSet<Watcher>> watchTable =
 new HashMap<String, HashSet<Watcher>>();
 
 private final HashMap<Watcher, HashSet<String>> watch2Paths =
 new HashMap<Watcher, HashSet<String>>();
  public synchronized void addWatch(String path, Watcher watcher) {
        HashSet<Watcher> list = watchTable.get(path);
        if (list == null) {
            // don't waste memory if there are few watches on a node
            // rehash when the 4th entry is added, doubling size thereafter
            // seems like a good compromise
            list = new HashSet<Watcher>(4);
            watchTable.put(path, list);
        }
        list.add(watcher);

        HashSet<String> paths = watch2Paths.get(watcher);
        if (paths == null) {
            // cnxns typically have many watches, so use default cap here
            paths = new HashSet<String>();
            watch2Paths.put(watcher, paths);
        }
        paths.add(path);
    }

結(jié)合上面的時序圖,就可以理解請求是如何經(jīng)過ZKdatabase到DataTree最終記錄在WatchManager,這里就完成了watcher在服務端的注冊沥阳。

server觸發(fā)watch

當發(fā)生 Create桐罕、Delete桂敛、NodeChange(數(shù)據(jù)變更)這樣的事件后术唬,DataTree 會調(diào)用相應方法去觸發(fā) WatchManager 的 triggerWatch 方法,該方法返回 ZNODE 的信息粗仓,自此進入到回調(diào)本地 process 的序列借浊,這里以setdata為例:
processTxn 代碼

public ProcessTxnResult processTxn(TxnHeader header, Record txn)
 {
 ProcessTxnResult rc = new ProcessTxnResult();
 
 try {
switch (header.getType()) {
case OpCode.setData:
 SetDataTxn setDataTxn = (SetDataTxn) txn;
 rc.path = setDataTxn.getPath();
 rc.stat = setData(setDataTxn.getPath(), setDataTxn
 .getData(), setDataTxn.getVersion(), header
 .getZxid(), header.getTime());
 break;

setData 代碼

public Stat setData(String path, byte data[], int version, long zxid,
 long time) throws KeeperException.NoNodeException {
 Stat s = new Stat();
 DataNodeV1 n = nodes.get(path);
 if (n == null) {
 throw new KeeperException.NoNodeException();
 }
 synchronized (n) {
 n.data = data;
 n.stat.setMtime(time);
 n.stat.setMzxid(zxid);
 n.stat.setVersion(version);
 n.copyStat(s);
 }
 dataWatches.triggerWatch(path, EventType.NodeDataChanged);
 return s;
 }

triggerWatch 代碼

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
 KeeperState.SyncConnected, path);
//將事件類型(EventType)巴碗、通知狀態(tài)(WatchedEvent)、節(jié)點路徑封裝成一個 WatchedEvent 對象
 HashSet<Watcher> watchers;
 synchronized (this) {
//根據(jù)數(shù)據(jù)節(jié)點的節(jié)點路徑從 watchTable 里面取出對應的 Watcher召噩。如果沒有找到 Watcher 對象逸爵,
說明沒有任何客戶端在該數(shù)據(jù)節(jié)點上注冊過 Watcher师倔,直接退出。如果找打了 Watcher 就將其提取出來疲恢,
同時會直接從 watchTable 和 watch2Paths 里刪除 Watcher,即 Watcher 是一次性的棚愤,觸發(fā)一次就失效了杂数。
 watchers = watchTable.remove(path);
for (Watcher w : watchers) {
 HashSet<String> paths = watch2Paths.get(w);
 }
 }
 for (Watcher w : watchers) {
 if (supress != null && supress.contains(w)) {
 continue;
 }
//對于需要注冊 Watcher 的請求揍移,ZooKeeper 會把請求對應的 ServerCnxn 作為一個 Watcher 存儲,
所以這里調(diào)用的 process 方法實質(zhì)上是 ServerCnxn 的對應方法
 w.process(e);
 }
 return watchers;
}

從上面的代碼我們可以總結(jié)出斯够,如果想要處理一個 Watcher读规,需要執(zhí)行的步驟如下所示:

  1. 將事件類型(EventType)燃少、通知狀態(tài)(WatchedEvent)、節(jié)點路徑封裝成一個 WatchedEvent 對象碍遍。

  2. 根據(jù)數(shù)據(jù)節(jié)點的節(jié)點路徑從 watchTable 里面取出對應的 Watcher阳液。如果沒有找到 Watcher 對象帘皿,說明沒有任何客戶端在該數(shù)據(jù)節(jié)點上注冊過 Watcher,直接退出虽填。如果找到了 Watcher 就將其提取出來曹动,同時會直接從 watchTable 和 watch2Paths 里刪除 Watcher墓陈,即 Watcher 是一次性的第献,觸發(fā)一次就失效了庸毫。

  3. 對于需要注冊 Watcher 的請求押框,ZooKeeper 會把請求對應的 ServerCnxn 作為一個 Watcher 存儲橡伞,所以這里調(diào)用的 process 方法實質(zhì)上是 ServerCnxn 的對應方法晋被,在請求頭標記“-1”表示當前是一個通知,將 WatchedEvent 包裝成 WatcherEvent 用于網(wǎng)絡傳輸序列化挂脑,向客戶端發(fā)送通知

  @Override
    synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);//xid為-1表示為通知
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();//包裝為WatcherEvent來提供網(wǎng)絡傳輸

        sendResponse(h, e, "notification");//給client發(fā)送請求,通知WatchedEvent的發(fā)生
    }

也就是說崭闲,server觸發(fā)watcher,回調(diào)process函數(shù)其實就是告訴需要watch的client威蕉,WatcherEvent 發(fā)生了

服務端處理watcher小結(jié)

注冊時watcher是ServerCnxn類型韧涨,保存了和client的會話,如果client發(fā)送請求的時候,request的標志位watch為true如孝,server才會將這個會話注冊到WatchManager(否則server知道client對這個path不感興趣第晰,下次這個path變化了也不通知你)
觸發(fā)watcher時彬祖,就利用watchManager,找到path對應的watcher即ServerCnxn腹躁,告訴連接的client方南蓬,發(fā)生了WatcherEvent,client自己再處理

client回調(diào)watcher

客戶端收到消息后,會調(diào)用 ClientCnxn 的 SendThread.readResponse 方法來進行統(tǒng)一處理弱左,如清單所示炕淮。如果響應頭 replyHdr 中標識的 Xid 為 02,表示是 ping涂圆,如果為-4润歉,表示是驗證包,如果是-1嚼鹉,表示這是一個通知類型的響應驱富,然后進行反序列化、處理 chrootPath褐鸥、還原 WatchedEvent宴树、回調(diào) Watcher 等步驟,其中回調(diào) Watcher 步驟將 WacthedEvent 對象交給 EventThread 線程晶疼,在下一個輪詢周期中進行 Watcher 回調(diào)酒贬。

客戶端回調(diào)watcher

服務端會通過使用 ServerCnxn 對應的 TCP 連接來向客戶端發(fā)送一個 WatcherEvent 事件。ClientCnxn.SendThread讀取回復
調(diào)用ClientCnxnSocketNIO#doTransport
調(diào)用ClientCnxnSocketNIO#doIO
調(diào)用ClientCnxn.SendThread#readResponse
里面處理事件通知的代碼段

            if (replyHdr.getXid() == -1) {//-1代表通知類型 即WatcherEvent

                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");//反序列化WatcherEvent

                // convert from a server path to a client path
                if (chrootPath != null) {//把serverPath轉(zhuǎn)化成clientPath
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath()
                                + " which is too short for chroot path "
                                + chrootPath);
                    }
                }

                WatchedEvent we = new WatchedEvent(event);//WatcherEvent還原成WatchedEvent
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                eventThread.queueEvent( we );//加入隊列
                return;
            }

對于一個來自服務端的響應翠霍,都是經(jīng)過一堆的 NIO 處理類到達客戶端,然后由 SendThread.readResponse(ByteBuffer incomingBuffer) 方法來統(tǒng)一進行處理的寒匙。如果響應頭 replyHdr 中標識了 xid 為 -1零如,表面這是一個通知類型的響應,對其的處理大體上分為如下步驟锄弱。

  • 反序列化 packet2.deserialize(bbia, “response”);將字節(jié)流轉(zhuǎn)換成 WatcherEvent 對象考蕾。
  • 還原WatchedEvent, WatchedEvent we1 = new WatchedEvent(packet2);
  • 回調(diào) Watcher : ClientCnxn.this.eventThread.queueEvent(we1); 最后將 WatchedEvent 對象交給 eventThread 線程会宪,在下一個輪詢周期中進行回調(diào)肖卧。

下面來看一下eventThread.queueEvent(we1)里面的邏輯:

     public void queueEvent(WatchedEvent event) {//將WatchedEvent加入隊列
           if (event.getType() == EventType.None
                   && sessionState == event.getState()) {
               return;
           }
           sessionState = event.getState();

           // materialize the watchers based on the event
           WatcherSetEventPair pair = new WatcherSetEventPair(
                   watcher.materialize(event.getState(), event.getType(),
                           event.getPath()),
                           event);//用WatcherSetEventPair封裝watchers和watchedEvent
           // queue the pair (watch set & event) for later processing
           waitingEvents.add(pair);//加入隊列
       }

對于這個方法,首先使用該 event 來生成一個 WatcherSetEventPair 類型的pari掸鹅,這個pari只是把 event 加了一個殼塞帐,然后附加上了 這個節(jié)點上所有的 Watcher :

    private static class WatcherSetEventPair {
        private final Set<Watcher> watchers;
        private final WatchedEvent event;

那么是如何獲取到注冊該節(jié)點的所有watcher呢拦赠?看一下上面的 ClientCnxn.this.watcher.materialize(event.getState(), event.getType(), event.getPath()) 這個方法,以 NodeCreated 事件為例:

        public Set<Watcher> materialize(KeeperState state, EventType type, String clientPath) {
            HashSet result = new HashSet();
            Map msg;
            switch(null.$SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[type.ordinal()]) {
            ...
            case NodeDataChanged:
            case NodeCreated:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
            case NodeChildrenChanged:
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
                ...

客戶端在識別出事件類型 EventType 后葵姥,會從相應的 Watcher 存儲(即 dataWatches荷鼠、existWatches 或 childWatches 中的一個或多個,本例中就是從 dataWatches 和 existWatches 兩個存儲中獲取榔幸,因為允乐,節(jié)點創(chuàng)建事件不會在 childWatches 中存儲)中去除對應的 Watcher。需要注意的是削咆,這里使用的是 remove 接口牍疏,因此也表明了客戶端的 Watcher 機制同樣是一次性的,即一旦被觸發(fā)后态辛,該 Watcher 就失效了麸澜。

取到所有的 Watcher 后挺尿,放到 pari 的 Set 里面奏黑,然后再把這個 pari 放到 waitingEvents 里面,而 waitingEvents 是啥玩意兒呢编矾?

private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue();

        public void run() {
            try {
                this.isRunning = true;

                while(true) {
                    Object e = this.waitingEvents.take();//循環(huán)取pari
                    if(e == ClientCnxn.this.eventOfDeath) {
                        this.wasKilled = true;
                    } else {
                        this.processEvent(e);//進行處理
                    }

                    if(this.wasKilled) {
                        LinkedBlockingQueue var2 = this.waitingEvents;
                        synchronized(this.waitingEvents) {
                            if(this.waitingEvents.isEmpty()) {
                                this.isRunning = false;
                                break;
                            }
                        }
                    }
                }
            } catch (InterruptedException var5) {
                ClientCnxn.LOG.error("Event thread exiting due to interruption", var5);
            }

        }

waitingEvents 是一個待處理 Watcher 的隊列熟史,waitingEvents的消費在ClientCnxn.EventThread#run中,EventThread 的 run() 方法會不斷從隊列中取數(shù)據(jù)窄俏,交由 processEvent 方法處理:

        private void processEvent(Object event) {
            try {
                if(event instanceof ClientCnxn.WatcherSetEventPair) {
                    ClientCnxn.WatcherSetEventPair t = (ClientCnxn.WatcherSetEventPair)event;
                    Iterator rc = t.watchers.iterator();

                    while(rc.hasNext()) {
                        Watcher clientPath = (Watcher)rc.next();

                        try {
                            clientPath.process(t.event);
                        } catch (Throwable var11) {
                            ClientCnxn.LOG.error("Error while calling watcher ", var11);
                        }
                    }
                } else {

OK蹂匹,針對于本次事件,取出所有的 Watcher 類型的對象凹蜈,遍歷運行process方法限寞,進行串行同步處理伍宦。此處 processEvent 方法中的 Watcher 才是之前客戶端真正注冊的 Watcher诬乞,調(diào)用其 process 方法就可以實現(xiàn) Watcher 的回調(diào)了喝峦。客戶端只能收到服務器發(fā)過來的相關(guān)事件通知氢卡,并不能獲取到對應數(shù)據(jù)節(jié)點的原始數(shù)據(jù)和變更后的數(shù)據(jù)炭剪,如果需要知道變更前或者變更后的數(shù)據(jù)靡菇,需要調(diào)用相關(guān)接口獲取新的數(shù)據(jù)鸽斟。

思考
  1. client注冊的watcher和server注冊的watcher有什么區(qū)別
    作用和類型有區(qū)別
    client注冊的watcher類型沒有限制,作用就是說client監(jiān)控到xx事件后干的事情挚冤,比如重新獲取數(shù)據(jù)
    server注冊的watcher都是ServerCnxn類型妈橄,作用就是告訴對應client 發(fā)生了xx WatchedEvent就行
    由于watcher并沒有直接在網(wǎng)絡進行傳輸庶近,所以兩者并不一樣

  2. server怎么知道一個WatchedEvent觸發(fā),要通知哪些client
    server的watch是ServerCnxn眷蚓,保持了和Client的對話鼻种,直接回調(diào)process就行了
    都是ServerCnxn(實現(xiàn)了Watcher)的功勞

watcher 特性總結(jié)
  • 輕量
    WatcherEvent 是 ZooKeeper 整個 Watcher 通知機制的最小通知單元,這個數(shù)據(jù)結(jié)構(gòu)中只包含三部分內(nèi)容:通知狀態(tài)沙热、事件類型和節(jié)點路徑普舆。也就是說恬口,Watcher 通知非常簡單,只會告訴客戶端發(fā)生了事件沼侣,而不會說明事件的具體內(nèi)容祖能。例如針對 NodeDataChanged 事件,ZooKeeper 的Watcher 只會通知客戶端指定數(shù)據(jù)節(jié)點的數(shù)據(jù)內(nèi)容發(fā)生了變更蛾洛,而對于原始數(shù)據(jù)以及變更后的新數(shù)據(jù)都無法從這個事件中直接獲取到养铸,而是需要客戶端主動重新去獲取數(shù)據(jù)——這也是 ZooKeeper 的 Watcher 機制的一個非常重要的特性≡欤客戶端向服務端注冊 Watcher 的時候钞螟,并不會把客戶端真實的 Watcher 對象傳遞到服務端,僅僅只是在客戶端請求中使用 boolean 類型屬性進行了標記谎碍,同時服務端也僅僅只是保存了當前連接的 ServerCnxn 對象鳞滨。這樣輕量級的 Watcher 機制設計,在網(wǎng)絡開銷和服務端內(nèi)存開銷上都是非常廉價的蟆淀。

  • 一次性
    無論是服務端還是客戶端拯啦,一旦一個 Watcher 被觸發(fā),ZooKeeper 都會將其從相應的存儲中移除熔任。因此褒链,在 Watcher 的使用上,需要反復注冊疑苔。這樣的設計有效地減輕了服務端的壓力,如果注冊一個 Watcher 之后一直有效甫匹,那么針對那些更新非常頻繁的節(jié)點,服務端會不斷地向客戶端發(fā)送事件通知惦费,這無論對于網(wǎng)絡還是服務端性能的影響都非常大兵迅。

  • 客戶端串行執(zhí)行
    客戶端 Watcher 回調(diào)的過程是一個串行同步的過程,這為我們保證了順序薪贫,

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末恍箭,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子后雷,更是在濱河造成了極大的恐慌季惯,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件臀突,死亡現(xiàn)場離奇詭異勉抓,居然都是意外死亡,警方通過查閱死者的電腦和手機候学,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進店門藕筋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人梳码,你說我怎么就攤上這事隐圾∥橄疲” “怎么了?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵暇藏,是天一觀的道長蜜笤。 經(jīng)常有香客問我,道長盐碱,這世上最難降的妖魔是什么把兔? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮瓮顽,結(jié)果婚禮上县好,老公的妹妹穿的比我還像新娘。我一直安慰自己暖混,他們只是感情好缕贡,可當我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著拣播,像睡著了一般晾咪。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上诫尽,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天禀酱,我揣著相機與錄音炬守,去河邊找鬼牧嫉。 笑死,一個胖子當著我的面吹牛减途,可吹牛的內(nèi)容都是我干的酣藻。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼鳍置,長吁一口氣:“原來是場噩夢啊……” “哼辽剧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起税产,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤怕轿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后辟拷,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體撞羽,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年衫冻,在試婚紗的時候發(fā)現(xiàn)自己被綠了诀紊。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡隅俘,死狀恐怖邻奠,靈堂內(nèi)的尸體忽然破棺而出笤喳,到底是詐尸還是另有隱情,我是刑警寧澤碌宴,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布杀狡,位于F島的核電站,受9級特大地震影響贰镣,放射性物質(zhì)發(fā)生泄漏捣卤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一八孝、第九天 我趴在偏房一處隱蔽的房頂上張望董朝。 院中可真熱鬧,春花似錦干跛、人聲如沸子姜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽哥捕。三九已至,卻和暖如春嘉熊,著一層夾襖步出監(jiān)牢的瞬間遥赚,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工阐肤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留凫佛,地道東北人。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓孕惜,卻偏偏與公主長得像愧薛,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子衫画,可洞房花燭夜當晚...
    茶點故事閱讀 43,472評論 2 348