zk源碼閱讀49:CommitProcessor源碼解析

摘要

事務(wù)提交處理器蛉谜。對于非事務(wù)請求讼庇,該處理器會直接將其交付給下一級處理器處理;對于事務(wù)請求做祝,其會等待集群內(nèi)針對Proposal的投票直到該Proposal可被提交报腔,利用CommitProcessor,每個服務(wù)器都可以很好地控制對事務(wù)請求的順序處理剖淀。

屬性

    private static final Logger LOG = LoggerFactory.getLogger(CommitProcessor.class);

    /**
     * Requests that we are holding until the commit comes in.
     */
    LinkedList<Request> queuedRequests = new LinkedList<Request>();//請求隊列

    /**
     * Requests that have been committed.
     */
    LinkedList<Request> committedRequests = new LinkedList<Request>();

    RequestProcessor nextProcessor;//下一個處理器
    ArrayList<Request> toProcess = new ArrayList<Request>();//待處理的隊列

    /**
     * This flag indicates whether we need to wait for a response to come back from the
     * leader or we just let the sync operation flow through like a read. The flag will
     * be true if the CommitProcessor is in a Leader pipeline.
     */
    boolean matchSyncs;//看sync的請求是等待leader回復(fù),還是說直接處理纤房,像讀請求一樣纵隔。對于leader是false,對于learner是true
    
    volatile boolean finished = false;

說明:

commitProcessor區(qū)分事務(wù)請求和非事務(wù)請求
matchSyncs 在leader端是false炮姨,learner端是true捌刮,因為learner端sync請求需要等待leader回復(fù),而leader端本身則不需要

函數(shù)

構(gòu)造函數(shù)

    public CommitProcessor(RequestProcessor nextProcessor, String id,
            boolean matchSyncs, ZooKeeperServerListener listener) {
        super("CommitProcessor:" + id, listener);
        this.nextProcessor = nextProcessor;
        this.matchSyncs = matchSyncs;
    }

processRequest

處理請求

    synchronized public void processRequest(Request request) {
        // request.addRQRec(">commit");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing request:: " + request);
        }
        
        if (!finished) {
            queuedRequests.add(request);//生產(chǎn)到請求隊列
            notifyAll();
        }
    }

注意上鎖

commit

提交請求請求

   synchronized public void commit(Request request) {//事務(wù)請求提交
        if (!finished) {//只要沒有結(jié)束
            if (request == null) {
                LOG.warn("Committed a null!",
                         new Exception("committing a null! "));
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Committing request:: " + request);
            }
            committedRequests.add(request);//進入已提交隊列
            notifyAll();//通知
        }
    }

shutdown

關(guān)閉

    public void shutdown() {
        LOG.info("Shutting down");
        synchronized (this) {
            finished = true;
            queuedRequests.clear();
            notifyAll();
        }
        if (nextProcessor != null) {
            nextProcessor.shutdown();
        }
    }

run

核心的線程方法舒岸,先貼代碼再分析

@Override
    public void run() {
        try {
            Request nextPending = null;//下一個未處理的事務(wù)請求(不含leader端的sync請求),只要為null绅作,都會while循環(huán)從queuedRequests里面找到第一個事務(wù)請求,或者直到隊列為空
            while (!finished) {//只要沒有shutdown
                int len = toProcess.size();
                for (int i = 0; i < len; i++) {
                    nextProcessor.processRequest(toProcess.get(i));//待處理隊列交給下個處理器,按順序處理
                }
                toProcess.clear();//隊列清空
                synchronized (this) {//注意這里上鎖蛾派,不會出現(xiàn)執(zhí)行到過程中俄认,queuedRequests的size變了
                    if ((queuedRequests.size() == 0 || nextPending != null) //這部分結(jié)合尾部的while來讀,要么 請求隊列remove干凈洪乍,要么從中找到一個事務(wù)請求眯杏,賦值給nextPending, 不允許size>0且nextPending == null的情況
                            && committedRequests.size() == 0) {//且 沒有已提交事務(wù)
                        wait();
                        continue;
                    }
                    // First check and see if the commit came in for the pending
                    // request
                    if ((queuedRequests.size() == 0 || nextPending != null)// 不允許size>0且nextPending == null的情況
                            && committedRequests.size() > 0) {//如果有 已提交的請求
                        Request r = committedRequests.remove();
                        /*
                         * We match with nextPending so that we can move to the
                         * next request when it is committed. We also want to
                         * use nextPending because it has the cnxn member set
                         * properly.
                         */
                        if (nextPending != null
                                && nextPending.sessionId == r.sessionId
                                && nextPending.cxid == r.cxid) {//如果和nextPending匹配
                            // we want to send our version of the request.
                            // the pointer to the connection in the request
                            nextPending.hdr = r.hdr;
                            nextPending.txn = r.txn;
                            nextPending.zxid = r.zxid;
                            toProcess.add(nextPending);//加入待處理隊列
                            nextPending = null;//下一個pend的請求清空
                        } else {
                            // this request came from someone else so just
                            // send the commit packet
                            toProcess.add(r);//這種情況是nextPending還沒有來的及設(shè)置,nextPending==null的情況(代碼應(yīng)該再細分一下if else),不可能出現(xiàn)nextPending!=null而走到了這里的情況(算異常)
                        }
                    }
                }

                // We haven't matched the pending requests, so go back to
                // waiting
                if (nextPending != null) {//如果還有 未處理的事務(wù)請求(不含leader端的sync請求),就continue
                    continue;
                }

                synchronized (this) {//這一段的目的是找到一個 給nextPending賦值
                    // Process the next requests in the queuedRequests
                    while (nextPending == null && queuedRequests.size() > 0) {//只要queuedRequests隊列不空壳澳,從中找到第一個 事務(wù)請求(不含leader端的sync請求),前面的其他請求全部加入待處理隊列
                        Request request = queuedRequests.remove();
                        switch (request.type) {
                        case OpCode.create:
                        case OpCode.delete:
                        case OpCode.setData:
                        case OpCode.multi:
                        case OpCode.setACL:
                        case OpCode.createSession:
                        case OpCode.closeSession:
                            nextPending = request;
                            break;//大部分事務(wù)請求直接賦給nextPending岂贩,然后break
                        case OpCode.sync:
                            if (matchSyncs) {//如果需要等leader返回,該值learner端為true
                                nextPending = request;
                            } else {
                                toProcess.add(request);//不需要的話,直接加入待處理隊列里
                            }
                            break;//leader端matchSyncs是false巷波,learner端才需要等leader回復(fù)萎津,這里也break
                        default:
                            toProcess.add(request);//非事務(wù)請求卸伞,都直接加入待處理隊列
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted exception while waiting", e);
        } catch (Throwable e) {
            LOG.error("Unexpected exception causing CommitProcessor to exit", e);
        }
        LOG.info("CommitProcessor exited loop!");
    }

注意各種上鎖控制并發(fā)

里面的代碼寫的晦澀難懂,是我看過zk代碼里面最想吐槽的代碼了★鼻現(xiàn)在最新版本的zk這個類已經(jīng)改的面目全非了荤傲。
https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

代碼可以拆成幾個部分

1,2部分
3,4部分

完全可以按照 1,4,2,3的順序來讀,

1部分:遍歷toProcess隊列(非事務(wù)請求或者已經(jīng)提交的事務(wù)請求),交給下一個處理器處理部念,清空
4部分:只要不存在pend住的事務(wù)請求并且請求隊列不為空弃酌,一直遍歷請求隊列直到出現(xiàn)第一個事務(wù)請求或者隊列遍歷完,其間所有非事務(wù)請求全部加入toProcess隊列,代表可以直接交給下一個處理器處理的
2部分:在請求隊列remove干凈或者找到了事務(wù)請求的情況下儡炼,
如果沒有提交的請求妓湘,就等待。
如果有提交的請求乌询,取出來榜贴,看和之前記錄的下一個pend的請求是否match。
  match的話妹田,進入toProcess隊列唬党,nextPending置空
  不match的話,(基本上是nextPending為null,不會出現(xiàn)不為null且不匹配的情況),進入toProcess處理
3部分:如果 nextPending非空鬼佣,就不用再去遍歷請求隊列驶拱,找到下一個事務(wù)請求(即4部分),因此continue掉

思考

事務(wù)連續(xù)性怎么保證的

《paoxs到zk》說這里保證的晶衷,對此強烈懷疑蓝纲。
事務(wù)連續(xù)性看代碼應(yīng)該是各角色機器單線程處理保證的。(refer中 新版本就多線程了晌纫,一寫多讀)
因為run方法2部分里面的else根本沒有檢測和nextPending不match的情況
因此個人理解2部分的else中税迷,基本都是nextPending為null,屬于還沒來的及找nextPending锹漱,然后commit方法就被調(diào)用了箭养,就直接處理了
完善的寫法應(yīng)該是這里寫清楚,至少做一個不為空且不match的檢查才好

else里的不合理

run方法第2部分if語句的理解

(queuedRequests.size() == 0 || nextPending != null)

這個是針對第4部分while循環(huán)的條件哥牍,缺厦凇!
就是說要么隊列清空了 要么 找到nextPending
不允許 請求隊列不為空 且不存在 nextPending的情況

run方法nextPending的意義

下一個要處理的事務(wù)請求

吐槽

run方法

這是我看zk以來最糟心的代碼嗅辣。
順序上面已經(jīng)說過了懈词,按1,4,2,3來看
然后if條件,第二部分直接把
(queuedRequests.size() == 0 || nextPending != null)
抽到上層去不行嗎辩诞,一定要寫兩遍嗎坎弯。

然后else根本沒有完成檢查,讓人一開始根本搞不清楚nextPending的意義是什么,
反正匹配不匹配抠忘,大家都進入toProcess隊列撩炊。何必要寫nextPending。

看起來像是保證事務(wù)順序的崎脉,實際上事務(wù)順序是單線程保證的拧咳,和nextPending也沒關(guān)系。

refer

http://www.reibang.com/p/68c91b42ccd8
https://github.com/apache/zookeeper/blob/master/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
《paxos到zk》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末囚灼,一起剝皮案震驚了整個濱河市骆膝,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌灶体,老刑警劉巖阅签,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蝎抽,居然都是意外死亡政钟,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進店門樟结,熙熙樓的掌柜王于貴愁眉苦臉地迎上來养交,“玉大人,你說我怎么就攤上這事瓢宦∷榱” “怎么了?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵驮履,是天一觀的道長鱼辙。 經(jīng)常有香客問我,道長疲吸,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任前鹅,我火速辦了婚禮摘悴,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘舰绘。我一直安慰自己蹂喻,他們只是感情好,可當我...
    茶點故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布捂寿。 她就那樣靜靜地躺著口四,像睡著了一般。 火紅的嫁衣襯著肌膚如雪秦陋。 梳的紋絲不亂的頭發(fā)上蔓彩,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天,我揣著相機與錄音,去河邊找鬼赤嚼。 笑死旷赖,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的更卒。 我是一名探鬼主播等孵,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蹂空!你這毒婦竟也來了俯萌?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤上枕,失蹤者是張志新(化名)和其女友劉穎咐熙,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體姿骏,經(jīng)...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡糖声,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了分瘦。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蘸泻。...
    茶點故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖嘲玫,靈堂內(nèi)的尸體忽然破棺而出悦施,到底是詐尸還是另有隱情,我是刑警寧澤去团,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布抡诞,位于F島的核電站,受9級特大地震影響土陪,放射性物質(zhì)發(fā)生泄漏昼汗。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一鬼雀、第九天 我趴在偏房一處隱蔽的房頂上張望顷窒。 院中可真熱鬧,春花似錦源哩、人聲如沸鞋吉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽谓着。三九已至,卻和暖如春坛掠,著一層夾襖步出監(jiān)牢的瞬間封寞,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工腹殿, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人矢炼。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像阿纤,于是被迫代替她去往敵國和親句灌。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理欠拾,服務(wù)發(fā)現(xiàn)胰锌,斷路器,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • 摘要 本節(jié)講解會話藐窄,會話狀態(tài)以及會話的創(chuàng)建资昧,針對源碼SessionTrackerImp進行展開,主要講解 會話 客...
    赤子心_d709閱讀 1,797評論 0 2
  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法,類相關(guān)的語法荆忍,內(nèi)部類的語法格带,繼承相關(guān)的語法,異常的語法刹枉,線程的語...
    子非魚_t_閱讀 31,598評論 18 399
  • 原創(chuàng)分享叽唱!有能力的人不一定擁有銷講技能,但是擁有銷講技能的人定一是有能力的人微宝。所以(銷講技能>能力)銷講技能就是收...
    陳卓越閱讀 340評論 1 4
  • 目前棺亭,現(xiàn)在,我應(yīng)該處在工作中蟋软,享受著工作帶來的滿足感镶摘。。岳守。凄敢。,實際呢湿痢,很累涝缝,負面情緒滿滿,哎蒙袍,這兩天確實有些受打...
    clannad月閱讀 1,342評論 0 0