Tomcat學(xué)習(xí)筆記之NIO處理分析(二)

前言

前面已經(jīng)初步分析請(qǐng)求流程,下面我們繼續(xù)洗出。

Poller流程處理

從上一篇直到Acceptor接受到請(qǐng)求并注冊(cè)到Poller中的events緩存棧中静檬,下面來(lái)想起看一下Poller的處理流程宽档。

        public void run() {
            // Loop until destroy() is called
            while (true) {

                boolean hasEvents = false;

                try {
                    if (!close) {
                        //1. 更新PollerEvent隊(duì)列尉姨,主要執(zhí)行PollerEvent的run方法來(lái)更新selector感興趣的事件
                        hasEvents = events();
                        //2. 將wakeupCounter設(shè)置為-1,如果oldvalue>0雌贱,做非阻塞select;否則做超時(shí)的阻塞select啊送。其中wakeupCounter在#addEvent()時(shí)會(huì)加1
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            // If we are here, means we have other stuff to do
                            // Do a non blocking select
                            keyCount = selector.selectNow();
                        } else {
                            keyCount = selector.select(selectorTimeout);
                        }
                        //3. wakeupCounter設(shè)置為0
                        wakeupCounter.set(0);
                    }
                    if (close) {
                        events();
                        timeout(0, false);
                        try {
                            selector.close();
                        } catch (IOException ioe) {
                            log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                        }
                        break;
                    }
                } catch (Throwable x) {
                    ExceptionUtils.handleThrowable(x);
                    log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
                    continue;
                }
                //4. keyCount為0偿短,說(shuō)明沒有事件到來(lái),再執(zhí)行一次#events()
                if (keyCount == 0) hasEvents = (hasEvents | events());
                //5. 遍歷SelectionKey(事件已到來(lái)馋没,進(jìn)行后續(xù)處理)昔逗,進(jìn)行讀寫處理
                Iterator<SelectionKey> iterator =
                        keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (attachment == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        //6. 提交到線程池中進(jìn)行處理處理
                        processKey(sk, attachment);
                    }
                }

                // Process timeouts
                timeout(keyCount, hasEvents);
            }

            getStopLatch().countDown();
        }
       public boolean events() {
            boolean result = false;

            PollerEvent pe = null;
            for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++) {
                result = true;
                try {
                    pe.run();
                    pe.reset();
                    if (running && !paused) {
                        eventCache.push(pe);
                    }
                } catch (Throwable x) {
                    log.error(sm.getString("endpoint.nio.pollerEventError"), x);
                }
            }

            return result;
        }

主要流程如下:

  • 調(diào)用#event()方法,更新selector感興趣的事件篷朵;
  • 執(zhí)行#selector.select()方法勾怒,檢測(cè)是否有事件到來(lái);
  • 將到來(lái)的事件提交至線程池進(jìn)行下一步處理声旺。
        protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
            try {
                if (close) {
                    cancelledKey(sk);
                } else if (sk.isValid() && socketWrapper != null) {
                    if (sk.isReadable() || sk.isWritable()) {
                        if (socketWrapper.getSendfileData() != null) {
                            processSendfile(sk, socketWrapper, false);
                        } else {
                            //1. 在通道上注銷對(duì)已經(jīng)發(fā)生事件的關(guān)注
                            unreg(sk, socketWrapper, sk.readyOps());
                            boolean closeSocket = false;
                            // Read goes before write
                            if (sk.isReadable()) {
                                //2. 進(jìn)行異步IO的處理或者交給SocketProcessor處理讀操作
                                if (socketWrapper.readOperation != null) {
                                    getExecutor().execute(socketWrapper.readOperation);
                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (!closeSocket && sk.isWritable()) {
                                //3. 進(jìn)行異步IO的處理或者交給SocketProcessor處理寫操作
                                if (socketWrapper.writeOperation != null) {
                                    getExecutor().execute(socketWrapper.writeOperation);
                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (closeSocket) {
                                cancelledKey(sk);
                            }
                        }
                    }
                } else {
                    // Invalid key
                    cancelledKey(sk);
                }
            } catch (CancelledKeyException ckx) {
                cancelledKey(sk);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
            }
        }
        //防止了通道對(duì)同一個(gè)事件不斷select的問(wèn)題
        protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper, int readyOps) {
            // This is a must, so that we don't have multiple threads messing with the socket
            reg(sk, socketWrapper, sk.interestOps() & (~readyOps));
        }
        
        protected void reg(SelectionKey sk, NioSocketWrapper socketWrapper, int intops) {
            sk.interestOps(intops);
            socketWrapper.interestOps(intops);
        }

這里主要注銷了對(duì)已經(jīng)發(fā)生事件的關(guān)注笔链,然后將具體的處理邏輯交給SocketProcessor來(lái)處理,后面會(huì)介紹腮猖。

工作線程流程處理

從上面了解到鉴扫,最后會(huì)調(diào)用#Poller.processSocket()方法,將處理邏輯交給SocketProcessor類澈缺,我們來(lái)看下:

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        try {
            if (socketWrapper == null) {
                return false;
            }
            //1. 從緩存棧中獲取SocketProcessor坪创,無(wú)則創(chuàng)建否則重置SocketProcessor對(duì)象
            SocketProcessorBase<S> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            //2. 獲取線程池,如果配置了線程池則將SocketProcessor提交到線程池中執(zhí)行姐赡,否則直接執(zhí)行SocketProcessor的run方法莱预。
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        } catch (RejectedExecutionException ree) {
            getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            getLog().error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }

如果配置了線程池,則提交SocketProcessor線程至線程池中執(zhí)行项滑,否則直接執(zhí)行#SocketProcessor.run()依沮。下面來(lái)看下SocketProcessor類:

public abstract class SocketProcessorBase<S> implements Runnable {

    protected SocketWrapperBase<S> socketWrapper;
    //socket事件狀態(tài)
    protected SocketEvent event;

    public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        reset(socketWrapper, event);
    }


    public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        Objects.requireNonNull(event);
        this.socketWrapper = socketWrapper;
        this.event = event;
    }


    @Override
    public final void run() {
        synchronized (socketWrapper) {
            // It is possible that processing may be triggered for read and
            // write at the same time. The sync above makes sure that processing
            // does not occur in parallel. The test below ensures that if the
            // first event to be processed results in the socket being closed,
            // the subsequent events are not processed.
            if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }
    //子類實(shí)現(xiàn)
    protected abstract void doRun();
}
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

        public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
            super(socketWrapper, event);
        }

        @Override
        protected void doRun() {
            NioChannel socket = socketWrapper.getSocket();
            SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

            try {
                int handshake = -1;

                try {
                    if (key != null) {
                        //NioChannel默認(rèn)返回true,SecureNioChannel這里才需要處理
                        if (socket.isHandshakeComplete()) {
                            // No TLS handshaking required. Let the handler
                            // process this socket / event combination.
                            handshake = 0;
                        } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                                event == SocketEvent.ERROR) {
                            // Unable to complete the TLS handshake. Treat it as
                            // if the handshake failed.
                            handshake = -1;
                        } else {
                            //具體處理
                            handshake = socket.handshake(key.isReadable(), key.isWritable());
                            // The handshake process reads/writes from/to the
                            // socket. status may therefore be OPEN_WRITE once
                            // the handshake completes. However, the handshake
                            // happens when the socket is opened so the status
                            // must always be OPEN_READ after it completes. It
                            // is OK to always set this as it is only used if
                            // the handshake completes.
                            event = SocketEvent.OPEN_READ;
                        }
                    }
                } catch (IOException x) {
                    handshake = -1;
                    if (log.isDebugEnabled()) log.debug("Error during SSL handshake", x);
                } catch (CancelledKeyException ckx) {
                    handshake = -1;
                }
                if (handshake == 0) {
                    SocketState state = SocketState.OPEN;
                    //最關(guān)鍵的代碼枪狂,交給handler處理socket
                    if (event == null) {
                        state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                    } else {
                        state = getHandler().process(socketWrapper, event);
                    }
                    if (state == SocketState.CLOSED) {
                        close(socket, key);
                    }
                } else if (handshake == -1) {
                    close(socket, key);
                //如果handshake返回的是SelectionKey.OP_READ危喉,注冊(cè)讀事件到Poller;如果返回的是SelectionKey.OP_WRITE摘完,注冊(cè)寫事件到Poller姥饰,進(jìn)行后續(xù)處理
                } else if (handshake == SelectionKey.OP_READ) {
                    socketWrapper.registerReadInterest();
                } else if (handshake == SelectionKey.OP_WRITE) {
                    socketWrapper.registerWriteInterest();
                }
            } catch (CancelledKeyException cx) {
                socket.getPoller().cancelledKey(key);
            } catch (VirtualMachineError vme) {
                ExceptionUtils.handleThrowable(vme);
            } catch (Throwable t) {
                log.error(sm.getString("endpoint.processing.fail"), t);
                socket.getPoller().cancelledKey(key);
            } finally {
                socketWrapper = null;
                event = null;
                //return to cache
                if (running && !paused) {
                    //處理結(jié)束后傻谁,將SocketProcessor重新放入緩存棧中
                    processorCache.push(this);
                }
            }
        }
    }

這里SocketEvent有OPEN_READ孝治、OPEN_WRITE、STOP审磁、TIMEOUT谈飒、DISCONNECT、ERROR六中狀態(tài)态蒂。SocketProcessor對(duì)象主要將socket交給Handler來(lái)處理請(qǐng)求杭措。

總結(jié)

到這里NIO的整個(gè)處理流程就大致清楚了,整體流程如下:


NIO整體處理流程

下面深入Handler來(lái)看一下Socket請(qǐng)求是如何轉(zhuǎn)換為Request對(duì)象钾恢,以及如何調(diào)用Servlet中的方法手素。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末鸳址,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子泉懦,更是在濱河造成了極大的恐慌稿黍,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,036評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件崩哩,死亡現(xiàn)場(chǎng)離奇詭異巡球,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)邓嘹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門酣栈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人汹押,你說(shuō)我怎么就攤上這事矿筝。” “怎么了棚贾?”我有些...
    開封第一講書人閱讀 164,411評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵跋涣,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我鸟悴,道長(zhǎng)陈辱,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,622評(píng)論 1 293
  • 正文 為了忘掉前任细诸,我火速辦了婚禮沛贪,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘震贵。我一直安慰自己利赋,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評(píng)論 6 392
  • 文/花漫 我一把揭開白布猩系。 她就那樣靜靜地躺著媚送,像睡著了一般。 火紅的嫁衣襯著肌膚如雪寇甸。 梳的紋絲不亂的頭發(fā)上塘偎,一...
    開封第一講書人閱讀 51,521評(píng)論 1 304
  • 那天,我揣著相機(jī)與錄音拿霉,去河邊找鬼吟秩。 笑死,一個(gè)胖子當(dāng)著我的面吹牛绽淘,可吹牛的內(nèi)容都是我干的涵防。 我是一名探鬼主播,決...
    沈念sama閱讀 40,288評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼沪铭,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼壮池!你這毒婦竟也來(lái)了偏瓤?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,200評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤椰憋,失蹤者是張志新(化名)和其女友劉穎硼补,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體熏矿,經(jīng)...
    沈念sama閱讀 45,644評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡已骇,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了票编。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片褪储。...
    茶點(diǎn)故事閱讀 39,953評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖慧域,靈堂內(nèi)的尸體忽然破棺而出鲤竹,到底是詐尸還是另有隱情,我是刑警寧澤昔榴,帶...
    沈念sama閱讀 35,673評(píng)論 5 346
  • 正文 年R本政府宣布辛藻,位于F島的核電站,受9級(jí)特大地震影響互订,放射性物質(zhì)發(fā)生泄漏吱肌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評(píng)論 3 329
  • 文/蒙蒙 一仰禽、第九天 我趴在偏房一處隱蔽的房頂上張望氮墨。 院中可真熱鬧,春花似錦吐葵、人聲如沸规揪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)猛铅。三九已至,卻和暖如春凤藏,著一層夾襖步出監(jiān)牢的瞬間奸忽,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工清笨, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留月杉,地道東北人刃跛。 一個(gè)月前我還...
    沈念sama閱讀 48,119評(píng)論 3 370
  • 正文 我出身青樓抠艾,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親桨昙。 傳聞我的和親對(duì)象是個(gè)殘疾皇子检号,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容