系統(tǒng)模型
zk的視圖結(jié)構(gòu)和標準的unix文件系統(tǒng)非常類似膳音,但沒有引入傳統(tǒng)文件系統(tǒng)中目錄和文件等概念,而是使用了其獨有的”數(shù)據(jù)節(jié)點“概念苍凛,稱之為ZNode兵志,Znode是Zookeeper中數(shù)據(jù)的最小單元想罕, 每個Znode上都可以保存數(shù)據(jù),同時還可以掛載子節(jié)點惭适,這樣構(gòu)成了一個層次化的命名空間楼镐,我們稱之為樹框产。
事務: 在zk中,事務是指能夠改變Zookeeper服務器狀態(tài)的操作盾舌,也叫事務操作或更新操作蘸鲸,一般包括數(shù)據(jù)節(jié)點創(chuàng)建與刪除酌摇、數(shù)據(jù)節(jié)點內(nèi)容更新和客戶端會話創(chuàng)建與失效等操作。對于每一個事務請求仍稀,zk都會為其分配一個全局唯一的事務ID,用ZXID來表示埂息,通常64位,每一個ZXID對應一次更新操作铲掐。
**節(jié)點類型:在zk中值桩,每個數(shù)據(jù)節(jié)點都是有生命周期的奔坟,其生命周期的長短取決于數(shù)據(jù)節(jié)點的節(jié)點類型:
- 持久節(jié)點: 該數(shù)據(jù)節(jié)點被創(chuàng)建后,就會一直存在于ZooKeeper服務器上婉支,直到有刪除操作來清除這個節(jié)點澜建。
- 持久順序節(jié)點: 在持久性的基礎(chǔ)上增加了順序性霎奢,zk會根據(jù)節(jié)點創(chuàng)建的先后順序饼灿,自動為給定節(jié)點加上一個數(shù)字后綴,
- 臨時節(jié)點: 臨時節(jié)點的生命周期是和客戶端的會話綁定在一起晤硕,客戶端會話失效舞箍,這個節(jié)點就會被刪除
- 臨時順序節(jié)點: 在臨時節(jié)點的基礎(chǔ)上有了順序性這個特性皆疹。
Watcher機制
zk的watcher由客戶端略就、客戶端watchManager和zk服務器組成,zk客戶端向zk服務器注冊watcher的同時窄绒,會將watcher對象存儲在客戶端的watcherManager彰导,zk服務器觸發(fā)watcher事件后,會向客戶端發(fā)送通知山析,客戶端線程從watcher manager中取出對應的watcher對象倔幼,執(zhí)行相應的回調(diào)邏輯损同。
上圖中的小紅旗是一個watcher,當小紅旗被創(chuàng)建并注冊到node1節(jié)點后茂卦,就會監(jiān)聽node1+ node_a + node_b或者node_a + node_b,這里兩種情況是因為在創(chuàng)建watcher注冊時有多種途徑等龙,并且watcher不能監(jiān)聽到孫子節(jié)點伶贰,此外黍衙,watcher設置后是一次性的,觸發(fā)一次后就失效位仁,如果想一直監(jiān)聽方椎,需要在process回調(diào)函數(shù)里重新注冊相同的watcher
在zk中接口類Watcher定義了事件通知相關(guān)的邏輯棠众,包含了KeeperState和EventType兩個枚舉類,代表通知狀態(tài)和事件類型轿亮,
Watcher接口擁有process函數(shù)我注,用于處理回調(diào)
內(nèi)部類Event又包含內(nèi)部類KeeperState以及EventType
KeeperState用于記錄Event發(fā)生時的zk狀態(tài)(通知狀態(tài))
EventType用于記錄Event的類型
方法process
//回調(diào)函數(shù)實現(xiàn)該函數(shù)迟隅,表示根據(jù)event執(zhí)行的行為
abstract public void process(WatchedEvent event);
內(nèi)部類Event
包含KeeperState和EventType兩個內(nèi)部類,通過枚舉類實現(xiàn)
方法很簡單掠抬,就是int值與對應枚舉類型的轉(zhuǎn)換
兩者的枚舉類型以及兩者之間的關(guān)系校哎,觸發(fā)條件可以參考《paxos到zk》中的圖
WatchedEvent 和 WatcherEvent
WatchedEvent :代表zk上一個Watcher能夠回應的變化,包含了變化事件的類型闷哆,zk狀態(tài)以及變化影響的znode的path
WatcherEvent : 是WatchedEvent用于網(wǎng)絡傳輸?shù)姆庋b類
三個成員變量很好的解釋了WatchedEvent的意義抱怔,即事件的類型,zk狀態(tài)以及變化影響的znode的path
WatcherEvent有一個getWrapper方法局冰,
/**
* Convert WatchedEvent to type that can be sent over network
*/
//轉(zhuǎn)化成可供網(wǎng)絡傳輸灌危,序列化的WatcherEvent
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(),
keeperState.getIntValue(),
path);
}
}
WatcherEvent實現(xiàn)了Record接口勇蝙,可以理解為WatchedEvent用于網(wǎng)絡傳輸?shù)姆庋b類
ClientWatchManager接口和實現(xiàn)類ZKWatchManager
ClientWatchManager接口: 用戶根據(jù)Event得到需要通知的watcher
ZKWatchManager為ClientWatchManager的實現(xiàn),ClientWatchManager接口只有一個函數(shù),
//ClientWatchManager負責根據(jù)Event得到需要通知哪些watcher
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type, String path);
默認實現(xiàn)類是Zookeeper的內(nèi)部類ZKWatchManager,
private static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();//針對內(nèi)容的watch
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();//針對exist API相關(guān)的watch
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();//針對getChildren API相關(guān)的watch
private volatile Watcher defaultWatcher;//client傳遞的,默認的watcher實現(xiàn)
final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}
/* (non-Javadoc)
* @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState,
* Event.EventType, java.lang.String)
*/
@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
case None://eventType是null
// 則所有dataWatches,existWatches,childWatches都需要被通知
result.add(defaultWatcher);//添加默認watcher
boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
state != Watcher.Event.KeeperState.SyncConnected;//獲取clear標記
synchronized(dataWatches) {
for(Set<Watcher> ws: dataWatches.values()) {
result.addAll(ws);
}
if (clear) {
dataWatches.clear();
}
}
synchronized(existWatches) {
for(Set<Watcher> ws: existWatches.values()) {
result.addAll(ws);
}
if (clear) {
existWatches.clear();
}
}
synchronized(childWatches) {
for(Set<Watcher> ws: childWatches.values()) {
result.addAll(ws);
}
if (clear) {
childWatches.clear();
}
}
return result;
case NodeDataChanged:
case NodeCreated:
//如果節(jié)點內(nèi)容變化或者創(chuàng)建
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);//從dataWatches中移除,并且添加到result中
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);//從existWatches中移除盗誊,并且添加到result中
}
break;
case NodeChildrenChanged:
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
case NodeDeleted:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
// XXX This shouldn't be needed, but just in case
synchronized (existWatches) {
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(existWatches.remove(clientPath), result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
default://默認處理
String msg = "Unhandled watch event type " + type
+ " with state " + state + " on path " + clientPath;
LOG.error(msg);
throw new RuntimeException(msg);
}
//返回結(jié)果
return result;
}
}
該方法在事件發(fā)生后哈踱,返回需要被通知的Watcher集合开镣。是根據(jù)已經(jīng)注冊的watches(分為三類,data,children,exist)咽扇,根據(jù)path找到對應的watches陕壹,得到一個result集合進行返回
** WatcherSetEventPair ** : 將Event以及對應需要觸發(fā)的watches集合進行組合綁定,放到waitingEvents隊列
private static class WatcherSetEventPair {
private final Set<Watcher> watchers;//事件觸發(fā)需要被通知的watches集合
private final WatchedEvent event;//事件
public WatcherSetEventPair(Set<Watcher> watchers, WatchedEvent event) {
this.watchers = watchers;
this.event = event;
}
}
watcher注冊過程
創(chuàng)建zk客戶端對象實例時注冊:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean
canBeReadOnly)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
通過這種方式注冊的watcher將會作為整個zk會話期間默認的watcher糠馆,會一直被保存在客戶端zk watchManager的defaultWatcher中又碌。
其他注冊watcher的API:
getChildren(String path, Watcher watcher)
getChildren(String path, boolean watch)
exists(String path, Watcher watcher)
getData(String path, boolean watch, Stat stat)
...
Boolean watch表示是否使用上下文默認的watcher绊袋,即創(chuàng)建zk時設置的watcher
客戶端注冊過程
我們以getData這個接口為例說明:
watcher在請求中,通過標志位發(fā)送給server
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {//如果有watcher愤炸,就注冊
wcb = new DataWatchRegistration(watcher, clientPath);//生成一個DataWatchRegistration,即Data的watch的注冊
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();//生成請求頭
h.setType(ZooDefs.OpCode.getData);//設置請求類型為getData
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);//設置標志位,是否函數(shù)watch
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);//client端提交請求
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
return this.getData(path, watch?this.watchManager.defaultWatcher:null, stat);
}
里面調(diào)用了ClientCnxn#submitRequestsubmitRequest 源碼如下
//提交請求
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();//生成回復頭
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {//如果packet沒有處理完,就一直等著
packet.wait();
}
}
return r;
}
里面調(diào)用了ClientCnxn#queuePacket函數(shù), queuePacket函數(shù)作為生產(chǎn)者,代碼中調(diào)用
outgoingQueue.add(packet);
在 ZooKeeper 中缤苫,Packet 是一個最小的通信協(xié)議單元墅拭,即數(shù)據(jù)包。
Pakcet 用于進行客戶端與服務端之間的網(wǎng)絡傳輸舒憾,任何需要傳輸?shù)膶ο蠖夹枰b成一個 Packet 對象穗熬。
在 ClientCnxn 中 WatchRegistration 也會被封裝到 Packet 中,調(diào)用 queuePacket放入outgoingQueue即發(fā)送隊列中(生產(chǎn)packet)
然后SendThread 線程調(diào)用doTransport方法,從outgoingQueue中消費Packet,客戶端發(fā)送, 參考實現(xiàn)類ClientCnxnSocketNIO#doTransport里面調(diào)用了ClientCnxnSocketNIO#doIO此時是發(fā)送請求唤蔗,調(diào)用了ClientCnxn.Packet#createBB
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");//序列化請求頭妓柜,包含xid和type
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");//序列化request(對于特定請求如GetDataRequest,包含了是否存在watcher的標志位)
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
client的watcher并沒有進行網(wǎng)絡傳輸,server并不知道client的watcher觸發(fā)時process函數(shù)要怎么執(zhí)行
但是對于特定請求類型比如GetDataRequest,序列化的時候會傳遞一個標志位watch藏雏,表示是否watch
server在處理的時候作煌,只知道client是否watch某個path
上面getData兩個接口都可以進行Watcher的注冊,第二個接口通過一個Boolean參數(shù)來標識是否使用默認的Watcher來進行注冊蚤告,然后還是調(diào)用第一個方法來完成注冊邏輯服爷,在第一個方法中仍源,客戶端使用this.cnxn.submitRequest(h, request, response, wcb) 方法向服務器發(fā)送這個注冊請求,完成請求發(fā)送后逗爹,
發(fā)送請求的時候嚎于,watcher還并沒有注冊在client端,要等server的返回,
ClientCnxn.SendThread中袍睡,讀取server的回復
調(diào)用了ClientCnxnSocketNIO#doTransport
調(diào)用了ClientCnxnSocketNIO#doIO
調(diào)用了ClientCnxn.SendThread#readResponse
調(diào)用了ClientCnxn#finishPacket
finishPacket方法斑胜,它會從Packet中取出對應的Watcher并注冊到zkWatchManager中去嫌吠,也即請求回復后辫诅,watcher才在client端注冊,
private void finishPacket(Packet p) {//Packet請求發(fā)送簇宽,收到回復吧享,進行處理之后
if (p.watchRegistration != null) {//如果有要注冊的watchRegistration
p.watchRegistration.register(p.replyHeader.getErr());//根據(jù)response code進行注冊
}
if (p.cb == null) {//如果沒有異步回調(diào)
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {//如果有異步回調(diào)
p.finished = true;
eventThread.queuePacket(p);
}
}
watchRegistration.register方法就是把 WatchRegistration 子類里面的 Watcher 實例放到 ZKWatchManager 的 dataWatches 中存儲起來
abstract class WatchRegistration {//client中管理watch注冊的類
private Watcher watcher;//注冊的watcher
private String clientPath;//監(jiān)聽的znode path
public WatchRegistration(Watcher watcher, String clientPath)
{
this.watcher = watcher;
this.clientPath = clientPath;
}
//根據(jù)response的resultCode來獲取所有注冊的path以及對應的watcher集合
abstract protected Map<String, Set<Watcher>> getWatches(int rc);
/**
* Register the watcher with the set of watches on path.
* @param rc the result code of the operation that attempted to
* add the watch on the path.
*/
public void register(int rc) {//根據(jù)response的resultCode來注冊watcher到一個path
if (shouldAddWatch(rc)) {//如果可以添加
Map<String, Set<Watcher>> watches = getWatches(rc);//獲取所有注冊的path以及對應的watcher集合
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);//找到該path
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);//添加當前watcher
}
}
}
/**
* Determine whether the watch should be added based on return code.
* @param rc the result code of the operation that attempted to add the
* watch on the node
* @return true if the watch should be added, otw false
*/
protected boolean shouldAddWatch(int rc) {//根據(jù)resultCode判斷是否可以添加watch
return rc == 0;
}
}
屬性clientPath和watcher分別是監(jiān)聽關(guān)注的znode的path和對應處理的watcher
注冊邏輯就是根據(jù)response的resultCode,判斷是否可以添加watch殊鞭,可以添加的話,就在Map<String, Set<Watcher>>添加記錄.
簡單來說锯仪,就是當使用ZooKeeper 構(gòu)造方法或者使用 getData趾盐、exists 和 getChildren 三個接口來向 ZooKeeper 服務器注冊 Watcher 的時候救鲤,首先將此消息傳遞給服務端,傳遞成功后斥扛,服務端會通知客戶端丹锹,然后客戶端將該路徑和Watcher對應關(guān)系存儲起來備用。
client注冊watcher的小結(jié)
1.client發(fā)送getData,getChildren,exist請求時峻村,傳入自定義的watcher粘昨,或利用ZooKeeper構(gòu)造函數(shù)的默認Watcher
2.將請求封裝為Packet,在RequestHeader記錄是否需要watcher,記錄放入生產(chǎn)者隊列ClientCnxn#outgoingQueue
3.ClientCnxn.SendThread消費outgoingQueue
調(diào)用ClientCnxnSocketNIO#doTransport
調(diào)用ClientCnxnSocketNIO#doIO
調(diào)用ClientCnxn.Packet#createBB
序列化的時候窜锯,將request記性序列化,里面包含一個是否帶有watch的標志位(不包含watcher對象)
4.server進行相應的處理,之后進行回復
可以參考FinalRequestProcessor#processRequest中對于getData的請求處理
利用getDataRequest.getWatch())吞瞪,看是否client需要watch芍秆,進而注冊到DataTree的WatchManager中翠勉,下面會講的
5.ClientCnxn.SendThread讀取回復
調(diào)用ClientCnxnSocketNIO#doTransport
調(diào)用ClientCnxnSocketNIO#doIO
調(diào)用ClientCnxn.SendThread#readResponse
調(diào)用ClientCnxn#finishPacket
利用response code,進行watcher的注冊对碌,記錄在ZooKeeper.WatchRegistration對應的實現(xiàn)類中
服務端處理watcher
server前面的調(diào)用鏈這里不展開,從FinalRequestProcessor 類接收到客戶端請求后诉位,會調(diào)用 processRequest 方法進行處理菜枷,會進一步轉(zhuǎn)向 ZooKeeperServer 的 processRequest 進行進一步處理,處理結(jié)由 ZKDatabase 類返回
對于注冊 Watcher 請求椿息,F(xiàn)inalRequestProcessor 的 ProcessRequest 方法會判斷當前請求是否需要注冊 Watcher寝优,如果為 true枫耳,就會將當前的 ServerCnxn 對象和數(shù)據(jù)節(jié)點路徑傳入 getData 方法中去迁杨。ServerCnxn 是一個 ZooKeeper 客戶端和服務器之間的連接接口,代表了一個客戶端和服務器的連接.
實現(xiàn)了Watcher接口捷沸,這個Watcher的實現(xiàn)類記錄了client和server的連接狐史,回調(diào)的時候骏全,可以直接發(fā)送response告訴client,有事件觸發(fā)了试吁,數(shù)據(jù)節(jié)點的節(jié)點路徑和 ServerCnxn 最終會被存儲在 WatchManager 的 watchTable 和 watch2Paths 中楼咳。
case OpCode.getData: {//getData請求
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);//反序列化出getDataRequest
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}//驗證path對應的node是否存在
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);//驗證ACL權(quán)限
Stat stat = new Stat();
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);//如果有watch標志位母怜,Watcher就傳cnxn
rsp = new GetDataResponse(b, stat);
break;
}
上面的FinalRequestProcessor#processRequest調(diào)用會進入
ZKDatabase#getData
DataTree#getData
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
dataWatches.addWatch(path, watcher);//注冊watcher到dataWatches
}
return n.data;//返回byte[]
}
}
WatchManager 負責 Watcher 事件的觸發(fā)糙申,它是一個統(tǒng)稱,在服務端 DataTree 會托管兩個 WatchManager缕陕,分別是 dataWatches 和 childWatches疙挺,分別對應數(shù)據(jù)變更 Watcher 和子節(jié)點變更 Watcher铐然。
.WatchManger 兩個隊列
private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<String, HashSet<Watcher>>();
private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();
public synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
// don't waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);
HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
結(jié)合上面的時序圖,就可以理解請求是如何經(jīng)過ZKdatabase到DataTree最終記錄在WatchManager,這里就完成了watcher在服務端的注冊沥阳。
server觸發(fā)watch
當發(fā)生 Create桐罕、Delete桂敛、NodeChange(數(shù)據(jù)變更)這樣的事件后术唬,DataTree 會調(diào)用相應方法去觸發(fā) WatchManager 的 triggerWatch 方法,該方法返回 ZNODE 的信息粗仓,自此進入到回調(diào)本地 process 的序列借浊,這里以setdata為例:
processTxn 代碼
public ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
ProcessTxnResult rc = new ProcessTxnResult();
try {
switch (header.getType()) {
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(setDataTxn.getPath(), setDataTxn
.getData(), setDataTxn.getVersion(), header
.getZxid(), header.getTime());
break;
setData 代碼
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNodeV1 n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
triggerWatch 代碼
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
//將事件類型(EventType)巴碗、通知狀態(tài)(WatchedEvent)、節(jié)點路徑封裝成一個 WatchedEvent 對象
HashSet<Watcher> watchers;
synchronized (this) {
//根據(jù)數(shù)據(jù)節(jié)點的節(jié)點路徑從 watchTable 里面取出對應的 Watcher召噩。如果沒有找到 Watcher 對象逸爵,
說明沒有任何客戶端在該數(shù)據(jù)節(jié)點上注冊過 Watcher师倔,直接退出。如果找打了 Watcher 就將其提取出來疲恢,
同時會直接從 watchTable 和 watch2Paths 里刪除 Watcher,即 Watcher 是一次性的棚愤,觸發(fā)一次就失效了杂数。
watchers = watchTable.remove(path);
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
//對于需要注冊 Watcher 的請求揍移,ZooKeeper 會把請求對應的 ServerCnxn 作為一個 Watcher 存儲,
所以這里調(diào)用的 process 方法實質(zhì)上是 ServerCnxn 的對應方法
w.process(e);
}
return watchers;
}
從上面的代碼我們可以總結(jié)出斯够,如果想要處理一個 Watcher读规,需要執(zhí)行的步驟如下所示:
將事件類型(EventType)燃少、通知狀態(tài)(WatchedEvent)、節(jié)點路徑封裝成一個 WatchedEvent 對象碍遍。
根據(jù)數(shù)據(jù)節(jié)點的節(jié)點路徑從 watchTable 里面取出對應的 Watcher阳液。如果沒有找到 Watcher 對象帘皿,說明沒有任何客戶端在該數(shù)據(jù)節(jié)點上注冊過 Watcher,直接退出虽填。如果找到了 Watcher 就將其提取出來曹动,同時會直接從 watchTable 和 watch2Paths 里刪除 Watcher墓陈,即 Watcher 是一次性的第献,觸發(fā)一次就失效了庸毫。
對于需要注冊 Watcher 的請求押框,ZooKeeper 會把請求對應的 ServerCnxn 作為一個 Watcher 存儲橡伞,所以這里調(diào)用的 process 方法實質(zhì)上是 ServerCnxn 的對應方法晋被,在請求頭標記“-1”表示當前是一個通知,將 WatchedEvent 包裝成 WatcherEvent 用于網(wǎng)絡傳輸序列化挂脑,向客戶端發(fā)送通知
@Override
synchronized public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);//xid為-1表示為通知
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();//包裝為WatcherEvent來提供網(wǎng)絡傳輸
sendResponse(h, e, "notification");//給client發(fā)送請求,通知WatchedEvent的發(fā)生
}
也就是說崭闲,server觸發(fā)watcher,回調(diào)process函數(shù)其實就是告訴需要watch的client威蕉,WatcherEvent 發(fā)生了
服務端處理watcher小結(jié)
注冊時watcher是ServerCnxn類型韧涨,保存了和client的會話,如果client發(fā)送請求的時候,request的標志位watch為true如孝,server才會將這個會話注冊到WatchManager(否則server知道client對這個path不感興趣第晰,下次這個path變化了也不通知你)
觸發(fā)watcher時彬祖,就利用watchManager,找到path對應的watcher即ServerCnxn腹躁,告訴連接的client方南蓬,發(fā)生了WatcherEvent,client自己再處理
client回調(diào)watcher
客戶端收到消息后,會調(diào)用 ClientCnxn 的 SendThread.readResponse 方法來進行統(tǒng)一處理弱左,如清單所示炕淮。如果響應頭 replyHdr 中標識的 Xid 為 02,表示是 ping涂圆,如果為-4润歉,表示是驗證包,如果是-1嚼鹉,表示這是一個通知類型的響應驱富,然后進行反序列化、處理 chrootPath褐鸥、還原 WatchedEvent宴树、回調(diào) Watcher 等步驟,其中回調(diào) Watcher 步驟將 WacthedEvent 對象交給 EventThread 線程晶疼,在下一個輪詢周期中進行 Watcher 回調(diào)酒贬。
客戶端回調(diào)watcher
服務端會通過使用 ServerCnxn 對應的 TCP 連接來向客戶端發(fā)送一個 WatcherEvent 事件。ClientCnxn.SendThread讀取回復
調(diào)用ClientCnxnSocketNIO#doTransport
調(diào)用ClientCnxnSocketNIO#doIO
調(diào)用ClientCnxn.SendThread#readResponse
里面處理事件通知的代碼段
if (replyHdr.getXid() == -1) {//-1代表通知類型 即WatcherEvent
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");//反序列化WatcherEvent
// convert from a server path to a client path
if (chrootPath != null) {//把serverPath轉(zhuǎn)化成clientPath
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);//WatcherEvent還原成WatchedEvent
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we );//加入隊列
return;
}
對于一個來自服務端的響應翠霍,都是經(jīng)過一堆的 NIO 處理類到達客戶端,然后由 SendThread.readResponse(ByteBuffer incomingBuffer) 方法來統(tǒng)一進行處理的寒匙。如果響應頭 replyHdr 中標識了 xid 為 -1零如,表面這是一個通知類型的響應,對其的處理大體上分為如下步驟锄弱。
- 反序列化 packet2.deserialize(bbia, “response”);將字節(jié)流轉(zhuǎn)換成 WatcherEvent 對象考蕾。
- 還原WatchedEvent, WatchedEvent we1 = new WatchedEvent(packet2);
- 回調(diào) Watcher : ClientCnxn.this.eventThread.queueEvent(we1); 最后將 WatchedEvent 對象交給 eventThread 線程会宪,在下一個輪詢周期中進行回調(diào)肖卧。
下面來看一下eventThread.queueEvent(we1)里面的邏輯:
public void queueEvent(WatchedEvent event) {//將WatchedEvent加入隊列
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);//用WatcherSetEventPair封裝watchers和watchedEvent
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);//加入隊列
}
對于這個方法,首先使用該 event 來生成一個 WatcherSetEventPair 類型的pari掸鹅,這個pari只是把 event 加了一個殼塞帐,然后附加上了 這個節(jié)點上所有的 Watcher :
private static class WatcherSetEventPair {
private final Set<Watcher> watchers;
private final WatchedEvent event;
那么是如何獲取到注冊該節(jié)點的所有watcher呢拦赠?看一下上面的 ClientCnxn.this.watcher.materialize(event.getState(), event.getType(), event.getPath()) 這個方法,以 NodeCreated 事件為例:
public Set<Watcher> materialize(KeeperState state, EventType type, String clientPath) {
HashSet result = new HashSet();
Map msg;
switch(null.$SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[type.ordinal()]) {
...
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
case NodeChildrenChanged:
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
...
客戶端在識別出事件類型 EventType 后葵姥,會從相應的 Watcher 存儲(即 dataWatches荷鼠、existWatches 或 childWatches 中的一個或多個,本例中就是從 dataWatches 和 existWatches 兩個存儲中獲取榔幸,因為允乐,節(jié)點創(chuàng)建事件不會在 childWatches 中存儲)中去除對應的 Watcher。需要注意的是削咆,這里使用的是 remove 接口牍疏,因此也表明了客戶端的 Watcher 機制同樣是一次性的,即一旦被觸發(fā)后态辛,該 Watcher 就失效了麸澜。
取到所有的 Watcher 后挺尿,放到 pari 的 Set 里面奏黑,然后再把這個 pari 放到 waitingEvents 里面,而 waitingEvents 是啥玩意兒呢编矾?
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue();
public void run() {
try {
this.isRunning = true;
while(true) {
Object e = this.waitingEvents.take();//循環(huán)取pari
if(e == ClientCnxn.this.eventOfDeath) {
this.wasKilled = true;
} else {
this.processEvent(e);//進行處理
}
if(this.wasKilled) {
LinkedBlockingQueue var2 = this.waitingEvents;
synchronized(this.waitingEvents) {
if(this.waitingEvents.isEmpty()) {
this.isRunning = false;
break;
}
}
}
}
} catch (InterruptedException var5) {
ClientCnxn.LOG.error("Event thread exiting due to interruption", var5);
}
}
waitingEvents 是一個待處理 Watcher 的隊列熟史,waitingEvents的消費在ClientCnxn.EventThread#run中,EventThread 的 run() 方法會不斷從隊列中取數(shù)據(jù)窄俏,交由 processEvent 方法處理:
private void processEvent(Object event) {
try {
if(event instanceof ClientCnxn.WatcherSetEventPair) {
ClientCnxn.WatcherSetEventPair t = (ClientCnxn.WatcherSetEventPair)event;
Iterator rc = t.watchers.iterator();
while(rc.hasNext()) {
Watcher clientPath = (Watcher)rc.next();
try {
clientPath.process(t.event);
} catch (Throwable var11) {
ClientCnxn.LOG.error("Error while calling watcher ", var11);
}
}
} else {
OK蹂匹,針對于本次事件,取出所有的 Watcher 類型的對象凹蜈,遍歷運行process方法限寞,進行串行同步處理伍宦。此處 processEvent 方法中的 Watcher 才是之前客戶端真正注冊的 Watcher诬乞,調(diào)用其 process 方法就可以實現(xiàn) Watcher 的回調(diào)了喝峦。客戶端只能收到服務器發(fā)過來的相關(guān)事件通知氢卡,并不能獲取到對應數(shù)據(jù)節(jié)點的原始數(shù)據(jù)和變更后的數(shù)據(jù)炭剪,如果需要知道變更前或者變更后的數(shù)據(jù)靡菇,需要調(diào)用相關(guān)接口獲取新的數(shù)據(jù)鸽斟。
思考
client注冊的watcher和server注冊的watcher有什么區(qū)別
作用和類型有區(qū)別
client注冊的watcher類型沒有限制,作用就是說client監(jiān)控到xx事件后干的事情挚冤,比如重新獲取數(shù)據(jù)
server注冊的watcher都是ServerCnxn類型妈橄,作用就是告訴對應client 發(fā)生了xx WatchedEvent就行
由于watcher并沒有直接在網(wǎng)絡進行傳輸庶近,所以兩者并不一樣server怎么知道一個WatchedEvent觸發(fā),要通知哪些client
server的watch是ServerCnxn眷蚓,保持了和Client的對話鼻种,直接回調(diào)process就行了
都是ServerCnxn(實現(xiàn)了Watcher)的功勞
watcher 特性總結(jié)
輕量
WatcherEvent 是 ZooKeeper 整個 Watcher 通知機制的最小通知單元,這個數(shù)據(jù)結(jié)構(gòu)中只包含三部分內(nèi)容:通知狀態(tài)沙热、事件類型和節(jié)點路徑普舆。也就是說恬口,Watcher 通知非常簡單,只會告訴客戶端發(fā)生了事件沼侣,而不會說明事件的具體內(nèi)容祖能。例如針對 NodeDataChanged 事件,ZooKeeper 的Watcher 只會通知客戶端指定數(shù)據(jù)節(jié)點的數(shù)據(jù)內(nèi)容發(fā)生了變更蛾洛,而對于原始數(shù)據(jù)以及變更后的新數(shù)據(jù)都無法從這個事件中直接獲取到养铸,而是需要客戶端主動重新去獲取數(shù)據(jù)——這也是 ZooKeeper 的 Watcher 機制的一個非常重要的特性≡欤客戶端向服務端注冊 Watcher 的時候钞螟,并不會把客戶端真實的 Watcher 對象傳遞到服務端,僅僅只是在客戶端請求中使用 boolean 類型屬性進行了標記谎碍,同時服務端也僅僅只是保存了當前連接的 ServerCnxn 對象鳞滨。這樣輕量級的 Watcher 機制設計,在網(wǎng)絡開銷和服務端內(nèi)存開銷上都是非常廉價的蟆淀。一次性
無論是服務端還是客戶端拯啦,一旦一個 Watcher 被觸發(fā),ZooKeeper 都會將其從相應的存儲中移除熔任。因此褒链,在 Watcher 的使用上,需要反復注冊疑苔。這樣的設計有效地減輕了服務端的壓力,如果注冊一個 Watcher 之后一直有效甫匹,那么針對那些更新非常頻繁的節(jié)點,服務端會不斷地向客戶端發(fā)送事件通知惦费,這無論對于網(wǎng)絡還是服務端性能的影響都非常大兵迅。客戶端串行執(zhí)行
客戶端 Watcher 回調(diào)的過程是一個串行同步的過程,這為我們保證了順序薪贫,