摘要
事務(wù)提交處理器蛉谜。對于非事務(wù)請求讼庇,該處理器會直接將其交付給下一級處理器處理;對于事務(wù)請求做祝,其會等待集群內(nèi)針對Proposal的投票直到該Proposal可被提交报腔,利用CommitProcessor,每個服務(wù)器都可以很好地控制對事務(wù)請求的順序處理剖淀。
屬性
private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);
/**
* Requests that we are holding until the commit comes in.
*/
LinkedList<Request> queuedRequests = new LinkedList<Request>();//請求隊列
/**
* Requests that have been committed.
*/
LinkedList<Request> committedRequests = new LinkedList<Request>();
RequestProcessor nextProcessor;//下一個處理器
ArrayList<Request> toProcess = new ArrayList<Request>();//待處理的隊列
/**
* This flag indicates whether we need to wait for a response to come back from the
* leader or we just let the sync operation flow through like a read. The flag will
* be true if the CommitProcessor is in a Leader pipeline.
*/
boolean matchSyncs;//看sync的請求是等待leader回復(fù),還是說直接處理纤房,像讀請求一樣纵隔。對于leader是false,對于learner是true
volatile boolean finished = false;
說明:
commitProcessor區(qū)分事務(wù)請求和非事務(wù)請求
matchSyncs 在leader端是false炮姨,learner端是true捌刮,因為learner端sync請求需要等待leader回復(fù),而leader端本身則不需要
函數(shù)
構(gòu)造函數(shù)
public CommitProcessor(RequestProcessor nextProcessor, String id,
boolean matchSyncs, ZooKeeperServerListener listener) {
super("CommitProcessor:" + id, listener);
this.nextProcessor = nextProcessor;
this.matchSyncs = matchSyncs;
}
processRequest
處理請求
synchronized public void processRequest(Request request) {
// request.addRQRec(">commit");
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
if (!finished) {
queuedRequests.add(request);//生產(chǎn)到請求隊列
notifyAll();
}
}
注意上鎖
commit
提交請求請求
synchronized public void commit(Request request) {//事務(wù)請求提交
if (!finished) {//只要沒有結(jié)束
if (request == null) {
LOG.warn("Committed a null!",
new Exception("committing a null! "));
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Committing request:: " + request);
}
committedRequests.add(request);//進入已提交隊列
notifyAll();//通知
}
}
shutdown
關(guān)閉
public void shutdown() {
LOG.info("Shutting down");
synchronized (this) {
finished = true;
queuedRequests.clear();
notifyAll();
}
if (nextProcessor != null) {
nextProcessor.shutdown();
}
}
run
核心的線程方法舒岸,先貼代碼再分析
@Override
public void run() {
try {
Request nextPending = null;//下一個未處理的事務(wù)請求(不含leader端的sync請求),只要為null绅作,都會while循環(huán)從queuedRequests里面找到第一個事務(wù)請求,或者直到隊列為空
while (!finished) {//只要沒有shutdown
int len = toProcess.size();
for (int i = 0; i < len; i++) {
nextProcessor.processRequest(toProcess.get(i));//待處理隊列交給下個處理器,按順序處理
}
toProcess.clear();//隊列清空
synchronized (this) {//注意這里上鎖蛾派,不會出現(xiàn)執(zhí)行到過程中俄认,queuedRequests的size變了
if ((queuedRequests.size() == 0 || nextPending != null) //這部分結(jié)合尾部的while來讀,要么 請求隊列remove干凈洪乍,要么從中找到一個事務(wù)請求眯杏,賦值給nextPending, 不允許size>0且nextPending == null的情況
&& committedRequests.size() == 0) {//且 沒有已提交事務(wù)
wait();
continue;
}
// First check and see if the commit came in for the pending
// request
if ((queuedRequests.size() == 0 || nextPending != null)// 不允許size>0且nextPending == null的情況
&& committedRequests.size() > 0) {//如果有 已提交的請求
Request r = committedRequests.remove();
/*
* We match with nextPending so that we can move to the
* next request when it is committed. We also want to
* use nextPending because it has the cnxn member set
* properly.
*/
if (nextPending != null
&& nextPending.sessionId == r.sessionId
&& nextPending.cxid == r.cxid) {//如果和nextPending匹配
// we want to send our version of the request.
// the pointer to the connection in the request
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid;
toProcess.add(nextPending);//加入待處理隊列
nextPending = null;//下一個pend的請求清空
} else {
// this request came from someone else so just
// send the commit packet
toProcess.add(r);//這種情況是nextPending還沒有來的及設(shè)置,nextPending==null的情況(代碼應(yīng)該再細分一下if else),不可能出現(xiàn)nextPending!=null而走到了這里的情況(算異常)
}
}
}
// We haven't matched the pending requests, so go back to
// waiting
if (nextPending != null) {//如果還有 未處理的事務(wù)請求(不含leader端的sync請求),就continue
continue;
}
synchronized (this) {//這一段的目的是找到一個 給nextPending賦值
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) {//只要queuedRequests隊列不空壳澳,從中找到第一個 事務(wù)請求(不含leader端的sync請求),前面的其他請求全部加入待處理隊列
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request;
break;//大部分事務(wù)請求直接賦給nextPending岂贩,然后break
case OpCode.sync:
if (matchSyncs) {//如果需要等leader返回,該值learner端為true
nextPending = request;
} else {
toProcess.add(request);//不需要的話,直接加入待處理隊列里
}
break;//leader端matchSyncs是false巷波,learner端才需要等leader回復(fù)萎津,這里也break
default:
toProcess.add(request);//非事務(wù)請求卸伞,都直接加入待處理隊列
}
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
} catch (Throwable e) {
LOG.error("Unexpected exception causing CommitProcessor to exit", e);
}
LOG.info("CommitProcessor exited loop!");
}
注意各種上鎖控制并發(fā)
里面的代碼寫的晦澀難懂,是我看過zk代碼里面最想吐槽的代碼了★鼻現(xiàn)在最新版本的zk這個類已經(jīng)改的面目全非了荤傲。
https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
代碼可以拆成幾個部分
完全可以按照 1,4,2,3的順序來讀,
1部分:遍歷toProcess隊列(非事務(wù)請求或者已經(jīng)提交的事務(wù)請求),交給下一個處理器處理部念,清空
4部分:只要不存在pend住的事務(wù)請求并且請求隊列不為空弃酌,一直遍歷請求隊列直到出現(xiàn)第一個事務(wù)請求或者隊列遍歷完,其間所有非事務(wù)請求全部加入toProcess隊列,代表可以直接交給下一個處理器處理的
2部分:在請求隊列remove干凈或者找到了事務(wù)請求的情況下儡炼,
如果沒有提交的請求妓湘,就等待。
如果有提交的請求乌询,取出來榜贴,看和之前記錄的下一個pend的請求是否match。
match的話妹田,進入toProcess隊列唬党,nextPending置空
不match的話,(基本上是nextPending為null,不會出現(xiàn)不為null且不匹配的情況),進入toProcess處理
3部分:如果 nextPending非空鬼佣,就不用再去遍歷請求隊列驶拱,找到下一個事務(wù)請求(即4部分),因此continue掉
思考
事務(wù)連續(xù)性怎么保證的
《paoxs到zk》說這里保證的晶衷,對此強烈懷疑蓝纲。
事務(wù)連續(xù)性看代碼應(yīng)該是各角色機器單線程處理保證的。(refer中 新版本就多線程了晌纫,一寫多讀)
因為run方法2部分里面的else根本沒有檢測和nextPending不match的情況
因此個人理解2部分的else中税迷,基本都是nextPending為null,屬于還沒來的及找nextPending锹漱,然后commit方法就被調(diào)用了箭养,就直接處理了
完善的寫法應(yīng)該是這里寫清楚,至少做一個不為空且不match的檢查才好
run方法第2部分if語句的理解
(queuedRequests.size() == 0 || nextPending != null)
這個是針對第4部分while循環(huán)的條件哥牍,缺厦凇!
就是說要么隊列清空了 要么 找到nextPending
不允許 請求隊列不為空 且不存在 nextPending的情況
run方法nextPending的意義
下一個要處理的事務(wù)請求
吐槽
run方法
這是我看zk以來最糟心的代碼嗅辣。
順序上面已經(jīng)說過了懈词,按1,4,2,3來看
然后if條件,第二部分直接把
(queuedRequests.size() == 0 || nextPending != null)
抽到上層去不行嗎辩诞,一定要寫兩遍嗎坎弯。
然后else根本沒有完成檢查,讓人一開始根本搞不清楚nextPending的意義是什么,
反正匹配不匹配抠忘,大家都進入toProcess隊列撩炊。何必要寫nextPending。
看起來像是保證事務(wù)順序的崎脉,實際上事務(wù)順序是單線程保證的拧咳,和nextPending也沒關(guān)系。
refer
http://www.reibang.com/p/68c91b42ccd8
https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
《paxos到zk》