tomcat 8.x NioEndpoint之Acceptor組件淺析2

簡書 杭州_mina

tomcat 8.x NioEndpoint核心組件淺析1

1. Acceptor 淺析

   /**
     * The background thread that listens for incoming TCP/IP connections and
     * hands them off to an appropriate processor.
     */
    protected class Acceptor extends AbstractEndpoint.Acceptor {

        @Override
        public void run() {

            int errorDelay = 0;

            // Loop until we receive a shutdown command
            // 一直循環(huán)直到接收到關(guān)閉命令
            while (running) {

                // paused 只有在unbind的時候會設(shè)置
                while (paused && running) {
                    //變更Acceptor的狀態(tài)
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                
                if (!running) {
                    break;
                }
                // Acceptor 設(shè)置成running 
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    //連接數(shù)達(dá)到最大旨巷,暫停線程士骤。
                    //這里用到的是connectionLimitLatch鎖饿凛,可以理解為一個閉鎖
                    //我理解connectionLimitLatch和coundownlatch類似
                    //補(bǔ)充一點(diǎn)這個是先增加計數(shù)器南吮、如果超過最大連接數(shù)則減少計時器涨岁、然后線程暫停,你這個計時器就是當(dāng)前連接數(shù)
                    //程序的下面我會簡單的講一下這個方法的實(shí)現(xiàn)
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        //調(diào)用serversocket的accept方法
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        // We didn't get a socket
                        // 出錯了要減去連接數(shù)
                        countDownConnection();
                        if (running) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused) {
                        // setSocketOptions() will hand the socket off to
                        // an appropriate processor if successful
                        // 把socket扔到poller中
                        if (!setSocketOptions(socket)) {
                            // 如果加入poller失敗關(guān)閉連接
                            closeSocket(socket);
                        }
                    } else {
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }
    }

下面來看setSocketOptions方法中如何把SocketChannel加入到poller中

protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //disable blocking, APR style, we are gonna be polling it
            socket.configureBlocking(false); //設(shè)置成非阻塞
            //獲取socket
            Socket sock = socket.socket();
           //配置socket信息
            socketProperties.setProperties(sock);
            //創(chuàng)建一個NioChannel 他封裝了SocketChannel
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                //如果為null 創(chuàng)建一個NioChannel 這里使用系統(tǒng)內(nèi)存
               //使用系統(tǒng)內(nèi)存可以省去一步從系統(tǒng)內(nèi)存拷貝到堆內(nèi)存的動作猜煮、性能上會有很大的提升拗胜,nioChannels初始化默認(rèn)為128個 
               //當(dāng)socket 關(guān)閉的重新清理NioChannel而不是銷毀這個對象可以達(dá)到對象復(fù)用的效果、因為申請系統(tǒng)內(nèi)存的開銷比申請堆內(nèi)存的開銷要大很多
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                //如果不為null設(shè)置SocketChannel
                channel.setIOChannel(socket);
                //將channle復(fù)位 以上就是重置系統(tǒng)內(nèi)存把指針重新定位到buff的開始位置
                channel.reset();
            }
            //丟入poller隊列中
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

最后一步register方法

 public void register(final NioChannel socket) {
             //設(shè)置poller 對象
            socket.setPoller(this); 
            //設(shè)置一個附件待會把這個NioSocketWrapper注冊到poller對象中的selector上去           
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);//獲取
            socket.setSocketWrapper(ka);
            ka.setPoller(this);
            ka.setReadTimeout(getSocketProperties().getSoTimeout());
            ka.setWriteTimeout(getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            ka.setReadTimeout(getSoTimeout());
            ka.setWriteTimeout(getSoTimeout());
            PollerEvent r = eventCache.pop();
            ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            //生成一個PollerEvent對象 這個對象繼承了Runnable
            //這個對象的主要作用就是把NioSocketWrapper注冊到poller對象中的selector上去           
            //還有就是獲取SelectionKey 該對象是用于跟蹤這些被注冊事件的句柄
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            //把pollerEvent放入到poller的隊列中
           addEvent(r);
        }

2. LimitLatch 淺析

  • 2.1 AbstractEndpoint 封裝了LimitLatch一些方法
    //默認(rèn)最大連接數(shù)為10000
    private int maxConnections = 10000;
    //獲取最大連接數(shù)
    public int  getMaxConnections() {
        return this.maxConnections;
    }
    //初始化閉鎖
    protected LimitLatch initializeConnectionLatch() {
        if (maxConnections==-1) return null;
        if (connectionLimitLatch==null) {
            //
            connectionLimitLatch = new LimitLatch(getMaxConnections());
        }
        return connectionLimitLatch;
    }
    //將會釋放所有的線程
    protected void releaseConnectionLatch() {
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) latch.releaseAll();
        connectionLimitLatch = null;
    }
    //增加計數(shù)辆沦,如果太大昼捍,那么等待
    protected void countUpOrAwaitConnection() throws InterruptedException {
        if (maxConnections==-1) return;
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) latch.countUpOrAwait();
    }
    //減少計數(shù)
    protected long countDownConnection() {
        if (maxConnections==-1) return -1;
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) {
            long result = latch.countDown();
            if (result<0) {
                getLog().warn(sm.getString("endpoint.warn.incorrectConnectionCount"));
            }
            return result;
        } else return -1
    }
  • 2.2 LimitLatch 源碼淺析
/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.tomcat.util.threads;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * Shared latch that allows the latch to be acquired a limited number of times
 * after which all subsequent requests to acquire the latch will be placed in a
 * FIFO queue until one of the shares is returned.
 */
public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);
     //構(gòu)建sync對象、主要用來同步肢扯、阻塞兩個功能
    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }
        @Override
        protected int tryAcquireShared(int ignored) {
             //增加計數(shù)器
            long newCount = count.incrementAndGet();
            //如果計數(shù)器大于最大limit 則計數(shù)器減一 返回-1 否則返回 1 
            //這里的limit 其實(shí)就是maxConnections
            if (!released && newCount > limit) {
                // Limit exceeded
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            //計數(shù)器減一
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    //計數(shù)器
    private final AtomicLong count;
    //最大連接數(shù)
    private volatile long limit;
    //是否全部釋放
    private volatile boolean released = false;

    /**
     * Instantiates a LimitLatch object with an initial limit.
     *
     * @param limit - maximum number of concurrent acquisitions of this latch
     */
    public LimitLatch(long limit) {
        this.limit = limit;
        this.count = new AtomicLong(0);
        this.sync = new Sync();
    }

    /**
     * Returns the current count for the latch
     *
     * @return the current count for latch
     */
    //獲取當(dāng)前計數(shù)器
    public long getCount() {
        return count.get();
    }

    /**
     * Obtain the current limit.
     *
     * @return the limit
     */
   //獲取最大連接數(shù)
    public long getLimit() {
        return limit;
    }


    /**
     * Sets a new limit. If the limit is decreased there may be a period where
     * more shares of the latch are acquired than the limit. In this case no
     * more shares of the latch will be issued until sufficient shares have been
     * returned to reduce the number of acquired shares of the latch to below
     * the new limit. If the limit is increased, threads currently in the queue
     * may not be issued one of the newly available shares until the next
     * request is made for a latch.
     *
     * @param limit The new limit
     */
   //設(shè)置最大連接數(shù)
    public void setLimit(long limit) {
        this.limit = limit;
    }


    /**
     * Acquires a shared latch if one is available or waits for one if no shared
     * latch is current available.
     *
     * @throws InterruptedException If the current thread is interrupted
     */
    //增加計數(shù)器如果超過最大連接數(shù)妒茬、則等待并且計數(shù)器減一
    public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up[" + Thread.currentThread().getName() + "] latch=" + getCount());
        }
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Releases a shared latch, making it available for another thread to use.
     *
     * @return the previous counter value
     */
    //減少計數(shù)器其實(shí)就是減少連接數(shù)
    public long countDown() {
        sync.releaseShared(0);
        long result = getCount();
        if (log.isDebugEnabled()) {
            log.debug("Counting down[" + Thread.currentThread().getName() + "] latch=" + result);
        }
        return result;
    }

    /**
     * Releases all waiting threads and causes the {@link #limit} to be ignored
     * until {@link #reset()} is called.
     *
     * @return <code>true</code> if release was done
     */
    //釋放所有線程
    public boolean releaseAll() {
        released = true;
        return sync.releaseShared(0);
    }

    /**
     * Resets the latch and initializes the shared acquisition counter to zero.
     *
     * @see #releaseAll()
     */
    //重置計數(shù)器
    public void reset() {
        this.count.set(0);
        released = false;
    }

    /**
     * Returns <code>true</code> if there is at least one thread waiting to
     * acquire the shared lock, otherwise returns <code>false</code>.
     *
     * @return <code>true</code> if threads are waiting
     */
    //是否有等待線程
    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * Provide access to the list of threads waiting to acquire this limited
     * shared latch.
     *
     * @return a collection of threads
     */
    //獲取所有等待線程
    public Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }
}

3.總結(jié)Acceptor流程

雖然看上去代碼有點(diǎn)復(fù)雜,但是實(shí)際上就是一句話概括蔚晨。獲取socket乍钻、并且對socket進(jìn)行封裝肛循,扔到Poller的隊列中。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末银择,一起剝皮案震驚了整個濱河市多糠,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌浩考,老刑警劉巖夹孔,帶你破解...
    沈念sama閱讀 212,542評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異析孽,居然都是意外死亡搭伤,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,596評論 3 385
  • 文/潘曉璐 我一進(jìn)店門袜瞬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來怜俐,“玉大人,你說我怎么就攤上這事邓尤∨睦穑” “怎么了?”我有些...
    開封第一講書人閱讀 158,021評論 0 348
  • 文/不壞的土叔 我叫張陵汞扎,是天一觀的道長季稳。 經(jīng)常有香客問我,道長佩捞,這世上最難降的妖魔是什么绞幌? 我笑而不...
    開封第一講書人閱讀 56,682評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮一忱,結(jié)果婚禮上莲蜘,老公的妹妹穿的比我還像新娘。我一直安慰自己帘营,他們只是感情好票渠,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,792評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著芬迄,像睡著了一般问顷。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上禀梳,一...
    開封第一講書人閱讀 49,985評論 1 291
  • 那天杜窄,我揣著相機(jī)與錄音,去河邊找鬼算途。 笑死塞耕,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的嘴瓤。 我是一名探鬼主播扫外,決...
    沈念sama閱讀 39,107評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼莉钙,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了筛谚?” 一聲冷哼從身側(cè)響起磁玉,我...
    開封第一講書人閱讀 37,845評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎驾讲,沒想到半個月后蚊伞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,299評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蝎毡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,612評論 2 327
  • 正文 我和宋清朗相戀三年厚柳,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沐兵。...
    茶點(diǎn)故事閱讀 38,747評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖便监,靈堂內(nèi)的尸體忽然破棺而出扎谎,到底是詐尸還是另有隱情,我是刑警寧澤烧董,帶...
    沈念sama閱讀 34,441評論 4 333
  • 正文 年R本政府宣布毁靶,位于F島的核電站,受9級特大地震影響逊移,放射性物質(zhì)發(fā)生泄漏预吆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,072評論 3 317
  • 文/蒙蒙 一胳泉、第九天 我趴在偏房一處隱蔽的房頂上張望拐叉。 院中可真熱鬧,春花似錦扇商、人聲如沸凤瘦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,828評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蔬芥。三九已至,卻和暖如春控汉,著一層夾襖步出監(jiān)牢的瞬間笔诵,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,069評論 1 267
  • 我被黑心中介騙來泰國打工姑子, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留乎婿,地道東北人。 一個月前我還...
    沈念sama閱讀 46,545評論 2 362
  • 正文 我出身青樓壁酬,卻偏偏與公主長得像次酌,于是被迫代替她去往敵國和親恨课。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,658評論 2 350

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理岳服,服務(wù)發(fā)現(xiàn)剂公,斷路器,智...
    卡卡羅2017閱讀 134,637評論 18 139
  • 說明本次redis集群安裝在rhel6.8 64位機(jī)器上吊宋,redis版本為3.2.8纲辽,redis的gem文件版本為...
    讀或?qū)?/span>閱讀 14,660評論 3 9
  • ¥開啟¥ 【iAPP實(shí)現(xiàn)進(jìn)入界面執(zhí)行逐一顯】 〖2017-08-25 15:22:14〗 《//首先開一個線程,因...
    小菜c閱讀 6,375評論 0 17
  • 先來說說我昨天做的夢吧璃搜,昨天晚上一直做夢坐電梯拖吼,騰云駕霧,飛來飛去这吻,毫無約束吊档。突然嗖的一下,我的眼睛就睜開了唾糯。打開...
    懷瑾姑娘閱讀 417評論 0 1
  • 今天離寶寶出生還有27天怠硼,三伏天持續(xù)高溫,腹中又揣著小火爐移怯,簡直就是太痛苦了香璃,不開空調(diào)睡不著,長時間開空調(diào)又由于空...
    小七碎碎念閱讀 131評論 0 0