前言
ZooKeeper是一個相對簡單的分布式協(xié)調(diào)服務(wù)弯洗,通過閱讀源碼我們能夠更進一步的清楚分布式的原理但汞。
環(huán)境
ZooKeeper 3.4.9
入口函數(shù)
在bin/zkCli.sh
中排监,我們看到client端的真實入口其實是一個org.apache.zookeeper.ZooKeeperMain
的Java類
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
org.apache.zookeeper.ZooKeeperMain "$@"
通過源碼走讀贬堵,看到在ZooKeeperMain
中主要由兩部分構(gòu)成
connectToZK(cl.getOption("server"));
while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
executeLine(line);
}
- 構(gòu)造一個
ZooKeeper
對象招驴,同ZooKeeperServer進行建立通信連接 - 通過反射調(diào)用
jline.ConsoleReader
類腹鹉,對終端輸入進行讀取,然后通過解析單行命令怔蚌,調(diào)用ZooKeeper
接口巩步。
如上所述,client端其實是對 zookeeper.jar 的簡單封裝桦踊,在構(gòu)造出一個ZooKeeper對象后椅野,通過解析用戶輸入,調(diào)用 ZooKeeper 接口和 Server 進行交互。
ZooKeeper 類
剛才我們看到 client 端同 ZooKeeper Server 之間的交互其實是通過 ZooKeeper 對象進行的竟闪,接下來我們詳細深入到 ZooKeeper 類中离福,看看其和服務(wù)端的交互邏輯。
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
在 ZooKeeper的構(gòu)造方法中,可以看到 ZooKeeper 中使用 Server 的服務(wù)器地址構(gòu)建了一個 ClientCnxn
類炼蛤,在這個類中妖爷,系統(tǒng)新建了兩個線程
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
其中,SendThread
負責將ZooKeeper
的請求信息封裝成一個Packet
理朋,發(fā)送給 Server ,并維持同Server的心跳絮识,EventThread
負責解析通過通過SendThread
得到的Response
,之后發(fā)送給Watcher::processEvent
進行詳細的事件處理嗽上。
如上圖所示次舌,Client中在終端輸入指令后,會被封裝成一個Request
請求炸裆,通過submitRequest
垃它,進一步被封裝成Packet
包鲜屏,提交給SendThread
處理烹看。
SendThread
通過doTransport
將Packet
發(fā)送給Server,并通過readResponse
獲取結(jié)果,解析成一個Event
洛史,再將Event
加入EventThread
的隊列中等待執(zhí)行惯殊。
EventThread
通過processEvent
消費隊列中的Event
事件。
SendThread
SendThread
的主要作用除了將Packet
包發(fā)送給Server之外也殖,還負責維持Client和Server之間的心跳土思,確保 session 存活。
現(xiàn)在讓我們從源碼出發(fā)忆嗜,看看SendThread
究竟是如何運行的己儒。
SendThread
是一個線程類,因此我們進入其run()
方法捆毫,看看他的啟動流程闪湾。
while (state.isAlive()) {
if (!clientCnxnSocket.isConnected()) {
// 啟動和server的socket鏈接
startConnect();
}
// 根據(jù)上次的連接時間,判斷是否超時
if (state.isConnected()) {
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
throw new SessionTimeoutException(warnInfo);
}
// 發(fā)送心跳包
if (state.isConnected()) {
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
}
}
// 將指令信息發(fā)送給 Server
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
}
從上面的代碼中绩卤,可以看出SendThread
的主要任務(wù)如下:
- 創(chuàng)建同 Server 之間的 socket 鏈接
- 判斷鏈接是否超時
- 定時發(fā)送心跳任務(wù)
- 將ZooKeeper指令發(fā)送給Server
與 Server 的長鏈接
ZooKeeper
通過獲取ZOOKEEPER_CLIENT_CNXN_SOCKET
變量構(gòu)造了一個ClientCnxnSocket
對象途样,默認情況下是ClientCnxnSocketNIO
類
String clientCnxnSocketName = System
.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
在ClientCnxnSocketNIO::connect
中我們可以看到這里同Server之間創(chuàng)建了一個socket鏈接。
SocketChannel sock = createSock();
registerAndConnect(sock, addr);
超時與心跳
在SendThread::run
中濒憋,可以看到針對鏈接是否建立分別有readTimeout
和connetTimeout
兩種超時時間何暇,一旦發(fā)現(xiàn)鏈接超時,則拋出異常凛驮,終止 SendThread
裆站。
在沒有超時的情況下,如果判斷距離上次心跳時間超過了1/2個超時時間,會再次發(fā)送心跳數(shù)據(jù)遏插,避免訪問超時捂贿。
發(fā)送 ZooKeeper 指令
在時序圖中,我們看到從終端輸入指令后胳嘲,我們會將其解析成一個Packet
包厂僧,等待SendThread
進行發(fā)送。
以ZooKeeper::create
為例
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
在這里create指令了牛,被封裝成了一個 CreateRequest
颜屠,通過submitRequest
被轉(zhuǎn)成了一個Packet
包
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
在submitRequest
中,我們進一步看到Request
被封裝成一個Packet
包鹰祸,并加入SendThread::outgoingQueue
隊列中甫窟,等待執(zhí)行。
Note:在這里我們還看到蛙婴,ZooKeeper方法中所謂的同步方法其實就是在Packet
被提交到SendThread
之后粗井,陷入一個while
循環(huán),等待處理完成后再跳出的過程
在SendThread::run
的while
循環(huán)中街图,ZooKeeper通過doTransport
將存放在outgoingQueue
中的Packet
包發(fā)送給 Server浇衬。
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) {
if (sockKey.isReadable()) {
// 讀取response信息
sendThread.readResponse(incomingBuffer);
}
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
sock.write(p.bb);
}
}
在doIO
發(fā)送socket信息之前,先從socket中獲取返回數(shù)據(jù)餐济,通過readResonse
進行處理耘擂。
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -1) {
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
WatchedEvent we = new WatchedEvent(event);
eventThread.queueEvent( we );
}
}
在readReponse
中,通過解析數(shù)據(jù),我們可以得到WatchedEvent
對象絮姆,并將其壓入EventThread
的消息隊列醉冤,等待分發(fā)
EventThread
public void run() {
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
}
在EventThread
中通過processEvent
對隊列中的事件進行消費,并分發(fā)給不同的Watcher
watch事件注冊和分發(fā)
通常在ZooKeeper中篙悯,我們會為指定節(jié)點添加一個Watcher
蚁阳,用于監(jiān)聽節(jié)點變化情況,以ZooKeeper:exist
為例
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
代碼的大致邏輯和create
類似鸽照,但是對wathcer做了一層ExistWatchRegistration
的包裝螺捐,當packet
對象完成請求之后,調(diào)用register
方法移宅,根據(jù)不同包裝的WatchRegistration
將watch注冊到不同watch列表中归粉,等待回調(diào)。
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
在 ZooKeeper 中一共有三種類型的WatchRegistration
漏峰,分別對應(yīng)DataWatchRegistration
,ChildWatchRegistration
,ExistWatchRegistration
糠悼。 并在ZKWatchManager
類中根據(jù)每種類型的WatchRegistration
,分別有一張map表負責存放。
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
當 EventThread::processEvent
時浅乔,根據(jù)event
的所屬路徑倔喂,從三張map中獲取對應(yīng)的watch列表進行消息通知及處理铝条。
總結(jié)
client 端的源碼分析就到此為止了。
ZooKeeper Client 的源碼很簡單席噩,擁有三個獨立線程分別對命令進行處理班缰,分發(fā)和響應(yīng)操作,在保證各個線程相互獨立的基礎(chǔ)上悼枢,盡可能避免了多線程操作中出現(xiàn)鎖的情況埠忘。