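1 What is the watcher in the watcher mechanism, and what is its model?
A watcher is like a listener in real life. The server is like a meeting room: a place that every client (person) can access. I don't need to know what everyone else did; I only listen for the content I am interested in (a DataNode). A change to a DataNode's content, the creation of a DataNode, or a change to its child DataNodes all catch my interest, and the listener then pushes a notification to me.
The watcher model consists of, most importantly, the process callback method, plus the Event and the KeeperState, which is the state of the server at the moment the event occurred.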
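The three parts of the model (callback, event, state) can be sketched in plain Java as below. This is a simplified stand-in and not the real org.apache.zookeeper classes: the names WatcherModelSketch, SketchWatcher and SketchEvent are made up for illustration, and the enums list only a subset of the real values.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the watcher model; NOT the real ZooKeeper classes.
class WatcherModelSketch {
    // A subset of the event types a watcher can be told about.
    enum EventType { None, NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged }

    // A subset of the connection states delivered together with an event.
    enum KeeperState { Disconnected, SyncConnected, Expired }

    // What happened, in which state, and on which path.
    static final class SketchEvent {
        final EventType type;
        final KeeperState state;
        final String path;

        SketchEvent(EventType type, KeeperState state, String path) {
            this.type = type;
            this.state = state;
            this.path = path;
        }
    }

    // The callback: the single method a watcher implements.
    interface SketchWatcher {
        void process(SketchEvent event);
    }

    // Delivers one event to one watcher and returns what the watcher recorded.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        SketchWatcher childrenWatcher = event -> seen.add(event.type + "@" + event.path);
        childrenWatcher.process(new SketchEvent(
                EventType.NodeChildrenChanged, KeeperState.SyncConnected, "/app"));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The watcher only ever sees the event object; it does not need to know which client caused the change.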
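2 What is the watcher mechanism used for?
1 Combined with ephemeral znodes in the ZooKeeper data model, it can implement service registration and discovery, dynamic updates of cluster configuration, and similar features.
2 Combined with ephemeral sequential znodes in the ZooKeeper data model, it can implement distributed locks and queues.
3 From a design-pattern point of view, registering and triggering watchers is the publish/subscribe pattern for distributed messages, in other words the observer pattern.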
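As an illustration of item 2, the decision step of the commonly described lock recipe can be sketched as a pure function: the client whose ephemeral sequential node sorts first holds the lock, and every other client watches only the node directly in front of its own. The class and method names here are hypothetical, and the sketch assumes all children share one prefix followed by a zero-padded sequence number, so that plain string sorting matches creation order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for the lock recipe; it does not talk to a server.
class LockRecipeSketch {
    // Returns null if myNode holds the lock, otherwise the node it should watch.
    static String nodeToWatch(List<String> children, String myNode) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted); // valid only for same-prefix, zero-padded names
        int index = sorted.indexOf(myNode);
        if (index < 0) {
            throw new IllegalArgumentException(myNode + " is not among the children");
        }
        // Lowest sequence number owns the lock; others watch their predecessor.
        return index == 0 ? null : sorted.get(index - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "lock-0000000003", "lock-0000000001", "lock-0000000002");
        System.out.println(nodeToWatch(children, "lock-0000000001"));
        System.out.println(nodeToWatch(children, "lock-0000000003"));
    }
}
```

Watching only the predecessor, rather than the parent, means that releasing the lock wakes up a single waiter instead of all of them.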
3 How the watcher mechanism works
Consider this scenario: zk.getChildren(path, childrenWatcher), where childrenWatcher is a Watcher object that overrides the process(WatchedEvent event) method.
3.1 How the client side completes the registration
ZooKeeper#getChildren(String, Watcher)
public List<String> getChildren(final String path, Watcher watcher)
{
// ... n lines omitted ...
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getChildren);
GetChildrenRequest request = new GetChildrenRequest();
request.setPath(serverPath);
// Only the path and the boolean watch flag are actually sent over the network
request.setWatch(watcher != null);
GetChildrenResponse response = new GetChildrenResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getChildren();
}
Once the request object has been built, it is put into outgoingQueue, where it waits for the run method of SendThread to send it.
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
ClientCnxn.SendThread#run
while (state.isAlive()) {
try {
// ... n lines omitted ...
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
// ... n lines omitted ...
}
ClientCnxnSocketNIO#doIO
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
// Take the next packet to send from outgoingQueue
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
// createBB() serializes only the requestHeader and request fields into the Packet's bb field (a ByteBuffer)
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
// As noted above, requestHeader was serialized and is non-null, so the packet is added to pendingQueue
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else {
enableWrite();
}
}
}
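The hand-off between the two queues can be sketched without any networking. The class below is a hypothetical model, not ClientCnxn itself: a packet waits in outgoingQueue, receives its xid when it is "written", moves to pendingQueue, and is matched with the reply in FIFO order. The xid order check is an extra safety check in this sketch and is not part of the excerpts shown here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the outgoingQueue -> pendingQueue hand-off.
class RequestPipelineSketch {
    static final class Packet {
        final String path;
        int xid;       // assigned just before the packet is written
        String reply;  // filled in when the matching reply arrives

        Packet(String path) {
            this.path = path;
        }
    }

    private final Deque<Packet> outgoingQueue = new ArrayDeque<>();
    private final Deque<Packet> pendingQueue = new ArrayDeque<>();
    private int nextXid = 1;

    // Step 1: the caller queues a request.
    synchronized Packet queuePacket(String path) {
        Packet p = new Packet(path);
        outgoingQueue.add(p);
        return p;
    }

    // Step 2: the send loop writes one packet and parks it until the reply comes.
    synchronized Packet sendOne() {
        Packet p = outgoingQueue.poll();
        if (p == null) {
            return null; // nothing to write
        }
        p.xid = nextXid++;
        pendingQueue.add(p);
        return p;
    }

    // Step 3: a reply is matched with the oldest pending packet.
    synchronized Packet readResponse(int xid, String reply) {
        Packet p = pendingQueue.poll();
        if (p == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + xid);
        }
        if (p.xid != xid) {
            throw new IllegalStateException("Reply " + xid + " does not match " + p.xid);
        }
        p.reply = reply;
        return p;
    }

    synchronized int pendingCount() {
        return pendingQueue.size();
    }
}
```

Because replies on one connection are matched in order, the packet at the head of pendingQueue is the one the reply belongs to, and that packet still carries its watch registration.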
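At this point the watcher registration on the client is actually not complete yet.
In fact the server completes its registration first. We will look at the server-side registration later; for now, assume that the server returned the data for the getChildrenRequest normally.
At this stage we are still inside the ClientCnxnSocketNIO#doIO method.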
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else {
// Read the data returned by the server
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
org.apache.zookeeper.ClientCnxn.SendThread#readResponse
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
// Take the packet that was placed in pendingQueue earlier
packet = pendingQueue.remove();
}
try {
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
} finally {
// This call is the most important one: it is where watcher registration actually completes
finishPacket(packet);
}
This method is the most important one: it is where the watcher registration actually completes.
ClientCnxn#finishPacket
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
// p.watchRegistration here is actually a ChildWatchRegistration
p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
}
public void register(int rc) {
if (shouldAddWatch(rc)) {
// ChildWatchRegistration overrides getWatches to return watchManager.childWatches;
// watchManager is in fact the ZKWatchManager
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
This completes the registration on the client side.
(The original post attached a flow chart and a class diagram here.)
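The key point of the client side, that the watcher is stored only after a successful reply, can be reduced to a few lines. This is a sketch under assumptions: Runnable stands in for Watcher, the class name is hypothetical, and the rule "register only when the error code is 0" is the assumed behaviour of shouldAddWatch for a children watch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical reduction of the client-side registration step.
class ClientRegistrationSketch {
    // path -> watchers, standing in for ZKWatchManager.childWatches
    final Map<String, Set<Runnable>> childWatches = new HashMap<>();

    // Called when the reply arrives; rc is the error code from the reply header.
    void register(String clientPath, Runnable watcher, int rc) {
        if (rc != 0) {
            return; // the request failed, so there is nothing to watch
        }
        synchronized (childWatches) {
            // A Set keeps the same watcher from being stored twice for one path.
            childWatches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
        }
    }
}
```

Deferring the registration until the reply arrives keeps the client from holding a watcher for a path on which the server never set a watch.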
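3.2 How does the server side register the watcher?
In the earlier post [How a ZooKeeper cluster processes distributed transaction requests] we explained that the processor chain is determined by the role of the server. Here we are running in standalone mode, so our getChildren request ends up in the server's chain PrepRequestProcessor ===> SyncRequestProcessor ===> FinalRequestProcessor.
When we send a getChildren request object that carries a watcher, PrepRequestProcessor only checks whether the session has expired or has moved.
SyncRequestProcessor deals with persisting the transaction log. If nothing needs to be persisted, the request moves on to FinalRequestProcessor. In this class: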
FinalRequestProcessor#processRequest
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getChildrenRequest);
// Look up the node for this path in the in-memory dataTree
DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
// If the watch flag in getChildrenRequest is true, pass in the cnxn object;
// cnxn implements the Watcher interface, which makes the later callback convenient
List<String> children = zks.getZKDatabase().getChildren(
getChildrenRequest.getPath(), null, getChildrenRequest
.getWatch() ? cnxn : null);
rsp = new GetChildrenResponse(children);
break;
}
public List<String> getChildren(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
if (stat != null) {
n.copyStat(stat);
}
List<String> children = new ArrayList<String>(n.getChildren());
if (watcher != null) {
// Register the cnxn object in the WatchManager childWatches
childWatches.addWatch(path, watcher);
}
return children;
}
}
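What addWatch has to maintain on the server can be sketched as two maps, named after the watchTable and watch2Paths fields that appear in the triggerWatch excerpt of section 3.3. The class itself, the generic parameter W (standing in for the connection object) and the removeWatcher and watcherCount helpers are assumptions made for this illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical two-way index: path -> watchers and watcher -> paths.
class ServerWatchTableSketch<W> {
    private final Map<String, Set<W>> watchTable = new HashMap<>();
    private final Map<W, Set<String>> watch2Paths = new HashMap<>();

    // Record the watch in both directions.
    synchronized void addWatch(String path, W watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    // Drop every watch held by one watcher, e.g. when its connection closes.
    synchronized void removeWatcher(W watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<W> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(path);
                }
            }
        }
    }

    synchronized int watcherCount(String path) {
        Set<W> watchers = watchTable.get(path);
        return watchers == null ? 0 : watchers.size();
    }
}
```

The reverse map is what makes cleanup cheap: without it, removing one connection would require scanning every path in the table.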
3.3 The watcher callback on the server side
When we create a new child node under path, the NodeChildrenChanged event is triggered.
The request first enters PrepRequestProcessor.
PrepRequestProcessor#pRequest
case OpCode.create:
CreateRequest createRequest = new CreateRequest();
// Obtain the zxid for the node creation and convert the request into a transaction request, mainly by adding a ChangeRecord
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
break;
Next comes SyncRequestProcessor, which appends the transaction log entry (txnLog) to the streamsToFlush list
and then flushes it to disk.
After that comes FinalRequestProcessor.
FinalRequestProcessor#processRequest
synchronized (zks.outstandingChanges) {
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= request.zxid) {
ChangeRecord cr = zks.outstandingChanges.remove(0);
if (cr.zxid < request.zxid) {
LOG.warn("Zxid outstanding "
+ cr.zxid
+ " is less than current " + request.zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
// Process the transaction request
rc = zks.processTxn(hdr, txn);
}
// do not add non quorum packets to the queue.
if (Request.isQuorum(request.type)) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
public String createNode(String path, byte data[], List<ACL> acl,
long ephemeralOwner, int parentCVersion, long zxid, long time)
throws KeeperException.NoNodeException,
KeeperException.NodeExistsException {
// ... N lines omitted ...
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
// This is where the watcher we registered earlier gets its callback triggered
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
Event.EventType.NodeChildrenChanged);
return path;
}
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
// This explains why a watcher registration is one-shot: to keep listening, the watcher has to be registered again
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// The callback is triggered here. What we stored earlier was the ServerCnxn object, so we jump to the ServerCnxn code
w.process(e);
}
return watchers;
}
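The one-shot behaviour of triggerWatch, and the need to register again, can be reproduced with a small stand-alone model. Everything here is hypothetical (class name, PathWatcher interface, counters). In real client code, re-registering usually means calling getChildren(path, watcher) again from inside process, which also fetches the fresh child list.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of one-shot watches; not the real WatchManager.
class OneShotWatchSketch {
    interface PathWatcher {
        void process(String path);
    }

    private final Map<String, Set<PathWatcher>> watchTable = new HashMap<>();

    synchronized void addWatch(String path, PathWatcher watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
    }

    // Removes the watchers for the path BEFORE calling them: each fires at most once.
    int triggerWatch(String path) {
        Set<PathWatcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
        }
        if (watchers == null) {
            return 0;
        }
        for (PathWatcher w : watchers) {
            w.process(path);
        }
        return watchers.size();
    }

    // calls[0]: watcher that never re-registers; calls[1]: watcher that does.
    static int[] demo() {
        OneShotWatchSketch server = new OneShotWatchSketch();
        int[] calls = new int[2];
        server.addWatch("/app", path -> calls[0]++);
        PathWatcher reRegistering = new PathWatcher() {
            @Override
            public void process(String path) {
                calls[1]++;
                server.addWatch(path, this); // register again to keep listening
            }
        };
        server.addWatch("/app", reRegistering);
        server.triggerWatch("/app"); // both watchers fire
        server.triggerWatch("/app"); // only the re-registered one fires
        return calls;
    }
}
```

After two triggers the plain watcher has been called once and the re-registering watcher twice, which is the difference the remove call produces.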
NIOServerCnxn#process
synchronized public void process(WatchedEvent event) {
// Note that the xid here is -1 (first argument; the zxid is -1 as well). The client checks the xid to recognize a notification
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"Deliver event " + event + " to 0x"
+ Long.toHexString(this.sessionId)
+ " through " + this);
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
// What we send is the watcher event: WatcherEvent is the form that can be transmitted over the network
sendResponse(h, e, "notification");
}
3.4 Finally, the callback on the client side
ClientCnxn.SendThread#readResponse
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
// Put the received event into the waitingEvents queue
eventThread.queueEvent( we );
return;
}
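The two decisions made in this branch, recognizing a notification by its xid and translating the server path back into the client's chroot view, can be isolated as pure functions. The class and method names below are hypothetical; the path logic mirrors the excerpt above, including the fact that it compares lengths only and does not verify that the server path starts with the chroot.

```java
// Hypothetical helpers mirroring the notification branch of readResponse.
class ChrootPathSketch {
    // A reply header with xid -1 marks a watch notification, not a reply.
    static boolean isNotification(int xid) {
        return xid == -1;
    }

    // Convert a server-side path into the path the client sees under its chroot.
    static String toClientPath(String chrootPath, String serverPath) {
        if (chrootPath == null) {
            return serverPath; // no chroot configured
        }
        if (serverPath.equals(chrootPath)) {
            return "/"; // the chroot itself is the client's root
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        // Too short for the chroot: the excerpt only logs a warning here,
        // so the path is left unchanged.
        return serverPath;
    }

    public static void main(String[] args) {
        System.out.println(toClientPath("/app", "/app/workers"));
    }
}
```

With chroot /app, a notification for /app/workers reaches the watcher as /workers, which matches the client path under which the watcher was stored.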
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
// Based on the event, look up the watcher set in ClientWatchManager (that is, ZKWatchManager).
// We triggered NodeChildrenChanged earlier, so the Set<Watcher> is taken from
// the Map<String, Set<Watcher>> childWatches
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
// ... n lines omitted ...
case NodeChildrenChanged:
synchronized (childWatches) {
// Remove the entry from childWatches; the removed watchers are all added to the result
addTo(childWatches.remove(clientPath), result);
}
break;
default:
String msg = "Unhandled watch event type " + type
+ " with state " + state + " on path " + clientPath;
LOG.error(msg);
throw new RuntimeException(msg);
}
return result;
}
ClientCnxn.EventThread#run
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// The client-side callback
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
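The structure of this loop, one thread draining a blocking queue until a sentinel object arrives, can be tried out in isolation. The sketch below is a simplified model rather than the real EventThread: callbacks are plain Runnable objects, the names are hypothetical, and the loop exits as soon as it takes the sentinel, which is enough here because the sentinel is queued last.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical single-threaded event dispatcher with a shutdown sentinel.
class EventThreadSketch {
    private static final Object EVENT_OF_DEATH = new Object();

    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> log = new CopyOnWriteArrayList<>();
    private final Thread worker = new Thread(this::run, "event-thread-sketch");

    void start() {
        worker.start();
    }

    void queueEvent(Runnable callback) {
        waitingEvents.add(callback);
    }

    // Queue the sentinel after all events, then wait for the thread to finish.
    void shutdownAndWait() {
        waitingEvents.add(EVENT_OF_DEATH);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void run() {
        try {
            while (true) {
                Object event = waitingEvents.take();
                if (event == EVENT_OF_DEATH) {
                    break;
                }
                try {
                    ((Runnable) event).run(); // the callback
                } catch (Throwable t) {
                    // A failing callback must not stop the ones queued after it.
                    log.add("error: " + t.getMessage());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> demo() {
        EventThreadSketch events = new EventThreadSketch();
        events.start();
        events.queueEvent(() -> events.log.add("first"));
        events.queueEvent(() -> { throw new IllegalStateException("boom"); });
        events.queueEvent(() -> events.log.add("third"));
        events.shutdownAndWait();
        return events.log;
    }
}
```

Running all callbacks on one thread keeps them in the order the events arrived, at the price that a slow callback delays every callback behind it.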
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
// The client-side callback
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
}