A cluster handles two kinds of requests: transactional and non-transactional. Non-transactional requests are processed much as in standalone mode; the node can serve the data locally. Transactional requests must be handed to the Leader, which proposes the change as a vote, waits for acknowledgments from a majority (more than half) of the Followers, and only returns the result to the client once the quorum has synchronized.
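The "majority" here is the usual quorum rule: a proposal commits once strictly more than half of the voting ensemble has acknowledged it. Below is a minimal sketch of the check, in the spirit of what QuorumMaj#containsQuorum does (the class and method below are hypothetical stand-ins, not the real ZooKeeper code):

import java.util.Set;

public class QuorumDemo {
    // true once strictly more than half of the ensemble has acked
    static boolean containsQuorum(Set<Long> ackSet, int ensembleSize) {
        return ackSet.size() > ensembleSize / 2;
    }

    public static void main(String[] args) {
        System.out.println(containsQuorum(Set.of(1L, 2L), 3)); // true: 2 of 3
        System.out.println(containsQuorum(Set.of(1L, 2L), 5)); // false: 2 of 5
    }
}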
這里裸卫,無論什么模式蕾额、節(jié)點類型,處理客戶端請求的都是ServerCnxnFactory的子類彼城,默認為NIOServerCnxnFactory诅蝶,只是其內(nèi)部處理調(diào)用鏈的zkServer實例不同,單機模式為ZooKeeperServer的實例募壕,其他類型的節(jié)點使用ZooKeeperServer類的子類. ZooKeeperServer的子類UML類圖如下:
1 With so many ZooKeeperServer subclasses, which one handles a transaction request, and through which methods?
1.1 The role is determined first, in org.apache.zookeeper.server.quorum.QuorumPeer#run.
while (running) {
    switch (getPeerState()) {
    case LOOKING:
        // ... lines omitted ...
        // While the peer state is LOOKING, run leader election to pick a leader
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;
    case OBSERVING:
        try {
            LOG.info("OBSERVING");
            // If the peer leaves the election in the OBSERVING state,
            // an ObserverZooKeeperServer is instantiated
            setObserver(makeObserver(logFactory));
            // Sync with the leader
            observer.observeLeader();
        }
        // ... lines omitted ...
        break;
    case FOLLOWING:
        try {
            LOG.info("FOLLOWING");
            // If the peer leaves the election in the FOLLOWING state,
            // a FollowerZooKeeperServer is instantiated
            setFollower(makeFollower(logFactory));
            // Sync with the leader
            follower.followLeader();
        }
        // ... lines omitted ...
        break;
    case LEADING:
        LOG.info("LEADING");
        try {
            setLeader(makeLeader(logFactory));
            leader.lead();
            setLeader(null);
        }
        // ... lines omitted ...
        break;
    }
}
As the code above shows, once the election completes, each server's state is fixed, and with it which ZooKeeperServer subclass gets instantiated.
1.2 Each ZooKeeperServer instance wires up a different processing chain.
The chain is built by the setupRequestProcessors method.
Take the FollowerZooKeeperServer instance, for example:
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true,
            getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this,
            new SendAckRequestProcessor((Learner) getFollower()));
    syncProcessor.start();
}
firstProcessor is a FollowerRequestProcessor; the next processor is a CommitProcessor, and after that a FinalRequestProcessor. In addition, the server aggregates a separate SyncRequestProcessor, whose next processor is a SendAckRequestProcessor. A minimal sketch of this chain pattern follows.
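To make the wiring concrete, here is a minimal, self-contained sketch of the chain-of-responsibility pattern these processors follow (the names below are simplified stand-ins, not the real ZooKeeper classes):

// Simplified stand-ins for ZooKeeper's RequestProcessor chain (hypothetical names)
interface Processor {
    void processRequest(String request);
}

class FinalStage implements Processor {
    public void processRequest(String request) {
        System.out.println("apply to in-memory database: " + request);
    }
}

class CommitStage implements Processor {
    private final Processor next;
    CommitStage(Processor next) { this.next = next; }
    public void processRequest(String request) {
        // the real CommitProcessor would park the request here until the
        // quorum commit arrives
        next.processRequest(request);
    }
}

public class ChainDemo {
    public static void main(String[] args) {
        Processor firstProcessor = new CommitStage(new FinalStage());
        firstProcessor.processRequest("create /foo");
    }
}

Each stage only knows its successor, so every server type can assemble its own chain in setupRequestProcessors.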
1.3 So when is this processing chain built?
case OBSERVING:
    try {
        // Instantiate ObserverZooKeeperServer
        setObserver(makeObserver(logFactory));
        // Sync with the leader; this also completes the initialization of
        // ObserverZooKeeperServer, including building the processor chain
        observer.observeLeader();
    }
    // ... lines omitted ...
    break;
case FOLLOWING:
    try {
        LOG.info("FOLLOWING");
        // Instantiate FollowerZooKeeperServer
        setFollower(makeFollower(logFactory));
        // Sync with the leader; this also completes the initialization of
        // FollowerZooKeeperServer, including building the processor chain
        follower.followLeader();
    }
    // ... lines omitted ...
    break;
case LEADING:
    LOG.info("LEADING");
    try {
        setLeader(makeLeader(logFactory));
        leader.lead();
        setLeader(null);
    }
    break;
}
zk.startup() is called at line 469 of org.apache.zookeeper.server.quorum.Learner#syncWithLeader, and startZkServer() at line 431 of org.apache.zookeeper.server.quorum.Leader#lead.
Both paths end up calling the setupRequestProcessors method below, which builds the chain.
Because each server type extends ZooKeeperServer (see the class hierarchy above) and overrides setupRequestProcessors,
the call dispatches to the setupRequestProcessors implementation of the concrete ZooKeeperServer subclass.
public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();
    registerJMX();
    setState(State.RUNNING);
    notifyAll();
}
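startup() is effectively a template method: the base class fixes the startup sequence, and each subclass supplies its own setupRequestProcessors. A minimal sketch of that dispatch, with simplified, hypothetical names:

abstract class Server {
    final synchronized void startup() {
        setupRequestProcessors(); // hook overridden by each subclass
        System.out.println("state = RUNNING");
    }
    protected abstract void setupRequestProcessors();
}

class FollowerServer extends Server {
    protected void setupRequestProcessors() {
        System.out.println("wire Follower chain");
    }
}

public class TemplateDemo {
    public static void main(String[] args) {
        // prints "wire Follower chain", then "state = RUNNING"
        new FollowerServer().startup();
    }
}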
2 Where do requests enter the server?
"org.apache.zookeeper.server.NIOServerCnxnFactory#run"
當(dāng)讀寫事件就緒時傲醉,NIOServerCnxn對象進行IO任務(wù)蝇闭。
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
}
"org.apache.zookeeper.server.ZooKeeperServer#processPacket"
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
submitRequest(si);
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
"丟給對應(yīng)的firstProcessor去處理,事物邏輯硬毕。
而對于不同的角色呻引,比如說leader,對應(yīng)的是LeaderZooKeeperServer
,而 follower對應(yīng)的是FollowerZookeeperServer"
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
}
After submitRequest, the request has been handed to the concrete processor chain, and different roles (that is, different ZooKeeperServer instances) have different firstProcessors.
Roughly: client request -> NIOServerCnxnFactory selector thread -> NIOServerCnxn#doIO -> ZooKeeperServer#processPacket -> submitRequest -> firstProcessor -> ... -> FinalRequestProcessor.
3 When a client request reaches the leader, what does the transaction flow look like?
The leader's firstProcessor: LeaderZooKeeperServer#setupRequestProcessors shows that it is a PrepRequestProcessor.
Its processRequest method only adds the request to the submittedRequests blocking queue; the actual work happens in its run method (see the producer/consumer sketch after the next excerpt).
3.1 PrepRequestProcessor: attach a transaction header to transactional requests; for non-transactional requests, just checkSession
while (true) {
    Request request = submittedRequests.take();
    pRequest(request);
}
pRequest then dispatches on the request type:

switch (request.type) {
case OpCode.create:
    CreateRequest createRequest = new CreateRequest();
    // Requests that change the in-memory database (zkDatabase), such as
    // create, delete, and setData, are converted into transaction requests
    pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
    break;
// ... lines omitted ...
//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
    // All transaction requests go through the leader, so the cluster-wide
    // unique id only needs to be unique on the leader side: zks.getNextZxid().
    // It is an AtomicLong, which keeps id generation thread-safe under
    // concurrent transaction requests
    pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
    break;
// Requests that do not change zkDatabase only need a checkSession
//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
    zks.sessionTracker.checkSession(request.sessionId,
            request.getOwner());
    break;
default:
    LOG.warn("unknown type " + request.type);
    break;
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
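Since zxids drive the whole commit path, it helps to recall their layout: the high 32 bits carry the leader epoch and the low 32 bits a per-epoch counter (this is the encoding ZooKeeper's ZxidUtils implements). A small illustration:

// Illustrative zxid decomposition: high 32 bits = epoch, low 32 bits = counter
public class ZxidDemo {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 42);
        System.out.println("epoch   = " + (zxid >> 32));         // 3
        System.out.println("counter = " + (zxid & 0xffffffffL)); // 42
        System.out.println("zxid    = 0x" + Long.toHexString(zxid));
    }
}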
3.2 ProposalRequestProcessor: split transactional from non-transactional requests
ProposalRequestProcessor#processRequest
if (request instanceof LearnerSyncRequest) {
    zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
    // Non-transactional requests simply move on to the next processor,
    // CommitProcessor
    nextProcessor.processRequest(request);
    // A non-null hdr marks a transaction request: delegate it to the leader,
    // which sends a PROPOSAL message to the followers
    if (request.hdr != null) {
        // We need to sync and get consensus on any transactions
        try {
            zks.getLeader().propose(request);
        } catch (XidRolloverException e) {
            throw new RequestProcessorException(e.getMessage(), e);
        }
        // ... and persist the transaction to the local txnLog first
        syncProcessor.processRequest(request);
    }
}
3.2.2 First, how a transaction is flushed to the txnLog: SyncRequestProcessor persists the transaction log.
public void run() {
    try {
        int logCount = 0;
        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        setRandRoll(r.nextInt(snapCount / 2));
        while (true) {
            Request si = null;
            // On the first pass toFlush is empty, so take() blocks and we
            // then fall into the si != null branch below
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                // FileTxnLog#append appends the transaction to the log file
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        setRandRoll(r.nextInt(snapCount / 2));
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    }
                                }
                            };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                // At the end of the first pass the request is added to toFlush;
                // the second pass flushes it, i.e. it finally hits the disk
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}
The flush method:
private void flush(LinkedList<Request> toFlush)
        throws IOException, RequestProcessorException
{
    if (toFlush.isEmpty())
        return;
    // This commit is what actually hits the disk; the earlier append only
    // added the stream to the group-commit list
    // (LinkedList<FileOutputStream> streamsToFlush)
    zks.getZKDatabase().commit();
    while (!toFlush.isEmpty()) {
        Request i = toFlush.remove();
        if (nextProcessor != null) {
            // From here the request enters AckRequestProcessor
            nextProcessor.processRequest(i);
        }
    }
    if (nextProcessor != null && nextProcessor instanceof Flushable) {
        ((Flushable) nextProcessor).flush();
    }
}
tip:
A server may only ACK a proposal after the transaction log has been forced to disk. More concretely, the server calls ZKDatabase's commit method, which ultimately calls FileChannel.force; this guarantees the transaction is durable before the ACK goes out. One caveat: modern disks have a write cache that buffers data on its way to the platters. If the write cache is enabled, force() returning does not guarantee the data is on disk; it may only have reached the cache. To guarantee that data is on disk when FileChannel.force() returns, the disk write cache must be disabled; operating systems offer various ways to do so.
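A minimal sketch of the append-then-force pattern described above, using a plain FileChannel rather than ZooKeeper's FileTxnLog:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ForceDemo {
    public static void main(String[] args) throws IOException {
        try (FileChannel log = FileChannel.open(
                Paths.get("demo-txn.log"),
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.APPEND)) {
            // "append": a buffered write; the bytes may still sit in the OS page cache
            log.write(ByteBuffer.wrap("txn-record\n".getBytes(StandardCharsets.UTF_8)));
            // "commit": force the file content to the storage device; only after
            // this returns is it safe to ACK (modulo the disk write cache caveat)
            log.force(false);
        }
    }
}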
3.2.1 Sending the PROPOSAL message
Leader#propose
public Proposal propose(Request request) throws XidRolloverException {
    // ... lines omitted ...
    // packetType = PROPOSAL
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
            baos.toByteArray(), null);
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    synchronized (this) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }
        lastProposed = p.packet.getZxid();
        // outstandingProposals (ConcurrentMap<Long, Proposal>) tracks
        // proposals that have been sent out but not yet committed
        outstandingProposals.put(lastProposed, p);
        sendPacket(pp);
    }
    return p;
}
void sendPacket(QuorumPacket qp) {
    // forwardingFollowers is a HashSet<LearnerHandler>;
    // send the PROPOSAL to every follower.
    // LearnerHandler is the communication link between the leader and a learner
    synchronized (forwardingFollowers) {
        for (LearnerHandler f : forwardingFollowers) {
            f.queuePacket(qp);
        }
    }
}
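queuePacket does not write to the socket directly: each LearnerHandler drains its own queue on a sender thread, so a slow follower does not block the others. A minimal sketch of that per-peer queue pattern (simplified, hypothetical names):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class BroadcastDemo {
    // Hypothetical, stripped-down LearnerHandler: one queue and one sender
    // thread per follower
    static class PeerHandler {
        final LinkedBlockingQueue<String> queuedPackets = new LinkedBlockingQueue<>();

        PeerHandler(String name) {
            Thread sender = new Thread(() -> {
                try {
                    while (true) {
                        // in ZooKeeper this would be a socket write to the follower
                        System.out.println(name + " <- " + queuedPackets.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            sender.setDaemon(true);
            sender.start();
        }

        void queuePacket(String packet) { queuedPackets.add(packet); }
    }

    public static void main(String[] args) throws InterruptedException {
        List<PeerHandler> forwardingFollowers = new ArrayList<>();
        forwardingFollowers.add(new PeerHandler("follower-1"));
        forwardingFollowers.add(new PeerHandler("follower-2"));
        for (PeerHandler f : forwardingFollowers) {
            f.queuePacket("PROPOSAL zxid=0x100000001");
        }
        Thread.sleep(100); // let the daemon senders drain before the demo exits
    }
}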
3.3 AckRequestProcessor: check whether ackSet holds a quorum; if so, COMMIT and INFORM messages are sent
public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null)
        leader.processAck(self.getId(), request.zxid, null);
    else
        LOG.error("Null QuorumPeer");
}
Leader#processAck
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    // ... lines omitted ...
    // ProposalRequestProcessor already put the proposal into outstandingProposals
    Proposal p = outstandingProposals.get(zxid);
    // The leader adds itself to the ackSet; it always acks its own proposal
    // (in a three-node ensemble, a single additional ACK then reaches a quorum)
    p.ackSet.add(sid);
    // Check whether a quorum has been reached
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
        if (zxid != lastCommitted + 1) {
            LOG.warn("Commiting zxid 0x{} from {} not first!",
                    Long.toHexString(zxid), followerAddr);
            LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
        }
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            toBeApplied.add(p);
        }
        if (p.request == null) {
            LOG.warn("Going to commmit null request for proposal: {}", p);
        }
        commit(zxid);
        inform(p);
        zk.commitProcessor.commit(p.request);
        if (pendingSyncs.containsKey(zxid)) {
            for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
                sendSync(r);
            }
        }
    }
}
A question arises here: what if the quorum check runs before a majority of ACKs has arrived? Is the proposal simply declared failed? No: processAck runs once per incoming ACK, and when containsQuorum is false the proposal just stays in outstandingProposals waiting for more ACKs. (If the leader permanently loses contact with a quorum of followers, it abandons leadership and a new election is held.)
3.4 While follower.followLeader runs, the follower receives the PROPOSAL message
Follower#followLeader
while (this.isRunning()) {
    readPacket(qp);
    processPacket(qp);
}
Follower#processPacket
"處理PROPOSAL消息"
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
"進入FollowerZooKeeperServer#logRequest方法,進入syncProcessor"
"然后進入SyncRequestProcessor#flush方法若债,進入下一個Processor"
fzk.logRequest(hdr, txn);
break;
3.5 SendAckRequestProcessor: the follower sends an ACK back to the leader (note that observers do not send one)
public void processRequest(Request si) {
    if (si.type != OpCode.sync) {
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null,
                null);
        try {
            learner.writePacket(qp, false);
        } catch (IOException e) {
            LOG.warn("Closing connection to leader, exception during packet send", e);
            try {
                if (!learner.sock.isClosed()) {
                    learner.sock.close();
                }
            } catch (IOException e1) {
                // Nothing to do, we are shutting things down, so an exception here is irrelevant
                LOG.debug("Ignoring error closing the connection", e1);
            }
        }
    }
}
3.6 LearnerHandler receives the ACK
LearnerHandler#run, line 576
switch (qp.getType()) {
case Leader.ACK:
    if (this.learnerType == LearnerType.OBSERVER) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received ACK from Observer " + this.sid);
        }
    }
    syncLimitCheck.updateAck(qp.getZxid());
    // The sid here identifies the follower that sent this ACK
    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
    break;
Leader#processAck then adds that follower's sid to the corresponding Proposal's ackSet.
Leader#processAck
"過半之后"
if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
if (zxid != lastCommitted+1) {
LOG.warn("Commiting zxid 0x{} from {} not first!",
Long.toHexString(zxid), followerAddr);
LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
outstandingProposals.remove(zxid);
if (p.request != null) {
"將要被使用的proposal"
toBeApplied.add(p);
}
if (p.request == null) {
LOG.warn("Going to commmit null request for proposal: {}", p);
}
"給follower發(fā)送COMMIT 消息,給observer發(fā)送INFORM消息"
commit(zxid);
inform(p);
zk.commitProcessor.commit(p.request);
if(pendingSyncs.containsKey(zxid)){
for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
}
3.6.1 The follower receives the COMMIT message and commits
Follower#processPacket
case Leader.COMMIT:
    fzk.commit(qp.getZxid());

This hands the committed zxid to CommitProcessor.
CommitProcessor#run
public void run() {
    try {
        Request nextPending = null;
        while (!finished) {
            // On the first pass toProcess is empty and committedRequests
            // holds one element
            int len = toProcess.size();
            for (int i = 0; i < len; i++) {
                nextProcessor.processRequest(toProcess.get(i));
            }
            toProcess.clear();
            synchronized (this) {
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() == 0) {
                    wait();
                    continue;
                }
                // First check and see if the commit came in for the pending
                // request
                // (first pass: toProcess is empty, committedRequests has one
                // element, so this branch adds the request to toProcess)
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() > 0) {
                    Request r = committedRequests.remove();
                    /*
                     * We match with nextPending so that we can move to the
                     * next request when it is committed. We also want to
                     * use nextPending because it has the cnxn member set
                     * properly.
                     */
                    if (nextPending != null
                            && nextPending.sessionId == r.sessionId
                            && nextPending.cxid == r.cxid) {
                        // we want to send our version of the request.
                        // the pointer to the connection in the request
                        nextPending.hdr = r.hdr;
                        nextPending.txn = r.txn;
                        nextPending.zxid = r.zxid;
                        toProcess.add(nextPending);
                        nextPending = null;
                    } else {
                        // this request came from someone else so just
                        // send the commit packet
                        toProcess.add(r);
                    }
                }
            }
            // We haven't matched the pending requests, so go back to
            // waiting
            if (nextPending != null) {
                continue;
            }
            synchronized (this) {
                // Process the next requests in the queuedRequests
                while (nextPending == null && queuedRequests.size() > 0) {
                    Request request = queuedRequests.remove();
                    switch (request.type) {
                    case OpCode.create:
                    case OpCode.delete:
                    case OpCode.setData:
                    case OpCode.multi:
                    case OpCode.setACL:
                    case OpCode.createSession:
                    case OpCode.closeSession:
                        nextPending = request;
                        break;
                    case OpCode.sync:
                        if (matchSyncs) {
                            nextPending = request;
                        } else {
                            toProcess.add(request);
                        }
                        break;
                    default:
                        toProcess.add(request);
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted exception while waiting", e);
    } catch (Throwable e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
    LOG.info("CommitProcessor exited loop!");
}
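The subtle part above is the matching: a write that originated at this server is parked as nextPending (it carries the client connection), and when the corresponding commit arrives it is matched by (sessionId, cxid) so that the response goes out on the right connection. A stripped-down sketch of just that matching rule, with hypothetical types:

public class CommitMatchDemo {
    static class Req {
        final long sessionId; final int cxid; final String origin;
        Req(long sessionId, int cxid, String origin) {
            this.sessionId = sessionId; this.cxid = cxid; this.origin = origin;
        }
    }

    static boolean matches(Req pending, Req committed) {
        return pending != null
                && pending.sessionId == committed.sessionId
                && pending.cxid == committed.cxid;
    }

    public static void main(String[] args) {
        Req nextPending = new Req(0xABCL, 7, "local client cnxn");
        Req committed = new Req(0xABCL, 7, "COMMIT from leader");
        // Use the pending copy: it is the one holding the cnxn for the response
        System.out.println(matches(nextPending, committed)
                ? "send response via " + nextPending.origin
                : "commit originated elsewhere; just apply it");
    }
}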
3.6.2 Then the follower's FinalRequestProcessor takes over
synchronized (zks.outstandingChanges) {
    while (!zks.outstandingChanges.isEmpty()
            && zks.outstandingChanges.get(0).zxid <= request.zxid) {
        ChangeRecord cr = zks.outstandingChanges.remove(0);
        if (cr.zxid < request.zxid) {
            LOG.warn("Zxid outstanding "
                    + cr.zxid
                    + " is less than current " + request.zxid);
        }
        if (zks.outstandingChangesForPath.get(cr.path) == cr) {
            zks.outstandingChangesForPath.remove(cr.path);
        }
    }
    if (request.hdr != null) {
        TxnHeader hdr = request.hdr;
        Record txn = request.txn;
        // Apply the txn to the in-memory database (ZKDatabase)
        rc = zks.processTxn(hdr, txn);
    }
    // do not add non quorum packets to the queue.
    if (Request.isQuorum(request.type)) {
        zks.getZKDatabase().addCommittedProposal(request);
    }
}
It then builds the response object and returns it to the client.
The leader side works the same way.
4 If the request lands on a follower or an observer, a REQUEST message is sent to the leader so that the leader handles it
FollowerRequestProcessor#run
while (!finished) {
    Request request = queuedRequests.take();
    if (LOG.isTraceEnabled()) {
        ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                'F', request, "");
    }
    if (request == Request.requestOfDeath) {
        break;
    }
    // We want to queue the request to be processed before we submit
    // the request to the leader so that we are ready to receive
    // the response
    nextProcessor.processRequest(request);
    // We now ship the request to the leader. As with all
    // other quorum operations, sync also follows this code
    // path, but different from others, we need to keep track
    // of the sync operations this follower has pending, so we
    // add it to pendingSyncs.
    switch (request.type) {
    case OpCode.sync:
        zks.pendingSyncs.add(request);
        // Send a REQUEST message to the leader
        zks.getFollower().request(request);
        break;
    case OpCode.create:
    case OpCode.delete:
    case OpCode.setData:
    case OpCode.setACL:
    case OpCode.createSession:
    case OpCode.closeSession:
    case OpCode.multi:
        // Send a REQUEST message to the leader
        zks.getFollower().request(request);
        break;
    }
}