jetty-server高性能,多線程特性的源碼分析

一. 本文闡述內容的著眼點:

闡述jetty-server作為一個高性能的隐轩,吞吐量比較高的http服務它后面采用了怎樣的技術和多線程手段提升它的吞吐量和高性能的绵脯。主要分三塊:
1.server實例化:new Server();
2.啟動Server:server.start();
3.等待請求某些http的請求并處理(簡略闡述)底循;

二. new Server():

/* ------------------------------------------------------------ */
    /** Convenience constructor
     * Creates server and a {@link ServerConnector} at the passed port.
     * @param port The port of a network HTTP connector (or 0 for a randomly allocated port).
     * @see NetworkConnector#getLocalPort()
     */
    public Server(@Name("port")int port)
    {
        //1.初始化線程池
        this((ThreadPool)null);
        //2.初始化ServerConnector
        ServerConnector connector=new ServerConnector(this);
        //3.設置port
        connector.setPort(port);
        //4.關聯(lián)Server和Connector
        setConnectors(new Connector[]{connector});
    }

1.初始化線程池:this((ThreadPool)null)

像jetty這樣一個服務端的容器,后臺不會使用new 一個線程,new 一個線程不是一件很靠譜的事情莉测。一定是使用線程池的。同時jetty并沒有使用jdk的線程池唧喉,而是自己實現(xiàn)了一個線程池--QueuedThreadPool捣卤。

public Server(@Name("threadpool") ThreadPool pool)
 {
        _threadPool=pool!=null?pool:new QueuedThreadPool();
        addBean(_threadPool);
        setServer(this);
 }

QueuedThreadPool實現(xiàn)了一個SizedThreadPool。

@ManagedObject("A thread pool")
public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable

QueuedThreadPool關鍵是executor方法:
execute僅僅把job放入隊列_jobs里八孝。_jobs是一個BlockingQueue的接口董朝,保存所有要執(zhí)行的任務。BlockingQueue是一個性能不高的隊列干跛,因為這里execute執(zhí)行頻率不是很高子姜,offer和poll就不是很高。如果execute執(zhí)行頻率很高楼入,那么這里就有優(yōu)化的空間了哥捕。

public void execute(Runnable job)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("queue {}",job);
        if (!isRunning() || !_jobs.offer(job))
        {
            LOG.warn("{} rejected {}", this, job);
            throw new RejectedExecutionException(job.toString());
        }
        else
        {
            // Make sure there is at least one thread executing the job.
            if (getThreads() == 0)
                startThreads(1);
        }
    }

默認用類jetty自己實現(xiàn)的BlockingArrayQueue:
默認調用:

 public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
    {
        setMinThreads(minThreads);
        setMaxThreads(maxThreads);
        setIdleTimeout(idleTimeout);
        setStopTimeout(5000);

        if (queue==null)
        {
            int capacity=Math.max(_minThreads, 8);
            queue=new BlockingArrayQueue<>(capacity, capacity);
        }
        _jobs=queue;
        _threadGroup=threadGroup;
    }

jetty自己實現(xiàn)的BlockingArrayQueue,雖然做了一些優(yōu)化還是用的阻塞浅辙,性能不是太高:

public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>

2.初始化ServerConnector:

ServerConnector表示服務端的一個連接扭弧。處理http連接以及nio,channel,selector等。繼承AbstractConnector

@ManagedObject("HTTP connector using NIO ByteChannels and Selectors")
public class ServerConnector extends AbstractNetworkConnector

@ManagedObject("AbstractNetworkConnector")
public abstract class AbstractNetworkConnector extends AbstractConnector implements NetworkConnector

分四步:
1.初始化ScheduledExecutorScheduler
2.初始化ByteBufferPool
3.維護ConnectionFactory
4.取得可用CPU數(shù)量
5.更新acceptor數(shù)量
6.創(chuàng)建acceptor線程組
7.初始化ServerConnectorManager

public AbstractConnector(
            Server server,
            Executor executor,
            Scheduler scheduler,
            ByteBufferPool pool,
            int acceptors,
            ConnectionFactory... factories)
    {
        _server=server;
        _executor=executor!=null?executor:_server.getThreadPool();
//1.初始化ScheduledExecutorScheduler
        if (scheduler==null)
            scheduler=_server.getBean(Scheduler.class);
        _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
       //2.初始化ByteBufferPool
        if (pool==null)
            pool=_server.getBean(ByteBufferPool.class);
        _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();

        addBean(_server,false);
        addBean(_executor);
        if (executor==null)
            unmanage(_executor); // inherited from server
        addBean(_scheduler);
        addBean(_byteBufferPool);
        //3.維護ConnectionFactory
        for (ConnectionFactory factory:factories)
            addConnectionFactory(factory);
       //4.取得可用CPU數(shù)量
        int cores = Runtime.getRuntime().availableProcessors();
       //5.更新acceptor數(shù)量
        if (acceptors < 0)
            acceptors=Math.max(1, Math.min(4,cores/8));
        if (acceptors > cores)
            LOG.warn("Acceptors should be <= availableProcessors: " + this);
       //6.創(chuàng)建acceptor線程組
        _acceptors = new Thread[acceptors];
    }

1)初始化ScheduledExecutorScheduler:

jetty里有些任務是要隔一段時間執(zhí)行一次的记舆。比如隔一段時間要檢查什么東西的狀態(tài)鸽捻。

2)初始化ByteBufferPool:

當jetty sever接受到大量Http請求(底層是tcp請求),需要初始化ByteBuffer,然后讀取或放入channel中御蒲,這樣太頻繁的話會降低性能衣赶。ByteBufferPool里面放的都是ByteBuffer對象,可重用厚满,減少GC,減少對象的產生府瞄。new不會消耗性能(因為new是jvm性能非常高的操作),主要是對象的回收碘箍,特別是新生代的回收遵馆,GC太消耗jvm。
但是丰榴,不能隨便設計一個多個線程獲取資源的Pool货邓,因為Pool的特性是線程安全,如果用低效的阻塞(如synchronized)去實現(xiàn)線程安全四濒,這樣還不如直接new對象然后交給GC回收换况。用低效的阻塞(如synchronized)做的對象池的性能是非常差的。
所以要想高效設計對象pool,必須是線程安全而且無鎖的盗蟆。

ByteBufferPool和一般的對象池還是有區(qū)別的戈二,一般的對象池里的對象任何一個都一樣,比如連接池。ByteBufferPool不同,所以比普通對象池稍微復雜一些坎怪。因為請求有可能用1k的ByteBuffer,2k的ByteBuffer亏栈,10k的ByteBuffer,2M的ByteBuffer宏赘。而且不可能為每一個容量都存N份绒北。
ByteBufferPool在jetty里有很多的實現(xiàn)類,現(xiàn)在分析一下默認的實現(xiàn)類ArrayByteBufferPool:

    public ArrayByteBufferPool(int minSize, int increment, int maxSize, int maxQueue)
    {
        if (minSize<=0)
            minSize=0;
        if (increment<=0)
            increment=1024;
        if (maxSize<=0)
            maxSize=64*1024;
        if (minSize>=increment)
            throw new IllegalArgumentException("minSize >= increment");
        if ((maxSize%increment)!=0 || increment>=maxSize)
            throw new IllegalArgumentException("increment must be a divisor of maxSize");
        _min=minSize;
        _inc=increment;
//一種大小對于一個Bucket,所以需要maxSize/increment=64個   //Bucket
        //直接內存Bucket數(shù)組
        _direct=new ByteBufferPool.Bucket[maxSize/increment];
        //堆內存Bucket數(shù)組
        _indirect=new ByteBufferPool.Bucket[maxSize/increment];
        _maxQueue=maxQueue;

        int size=0;
        for (int i=0;i<_direct.length;i++)
        {
            size+=_inc;
            //創(chuàng)建每個大小的Bucket察署,并放到Bucket數(shù)組里闷游。
            _direct[i]=new ByteBufferPool.Bucket(this,size,_maxQueue);
            _indirect[i]=new ByteBufferPool.Bucket(this,size,_maxQueue);
        }
    }

minSize:最小大小
increment:每次增加的大小。
maxSize:最大大小贴汪。
例如: minSize=0;increment=1024;maxSize=64*1024;
這樣會有64種大小脐往。

存儲ByteBuffer的結構:Bucket:

class Bucket
    {
        private final Deque<ByteBuffer> _queue = new ConcurrentLinkedDeque<>();
        private final ByteBufferPool _pool;
        private final int _capacity;
        private final AtomicInteger _space;

重要的參數(shù):
_capacity,表示Bucket存放ByteBuffer個數(shù)扳埂。
_queue业簿,表示實際存放ByteBuffer的容器,ConcurrentLinkedDeque (無鎖的實現(xiàn)阳懂,并發(fā)性很好)梅尤。
初始化ArrayByteBufferPool時會創(chuàng)建每個大小的Bucket柜思,并放到Bucket數(shù)組( _direct,_indirect)里。但是_queue里的內容是延遲加載的巷燥。

acquire():向ArrayByteBufferPool池中申請ByteBuffer赡盘。

//size:需要多大的ByteBuffer; direct:是否是直接內存
 public ByteBuffer acquire(int size, boolean direct)
    {
       // _direct[]或_indirect[]在取得合適的Bucket,因為是懶加載,
       //所以有可能是空的
        ByteBufferPool.Bucket bucket = bucketFor(size,direct);
        if (bucket==null)
            return newByteBuffer(size,direct);//null缰揪,就新建
       //如果在 _direct[]或_indirect[]找到合適的bucket陨享,
       //就在bucket內部隊列中取得一個ByteBuffer
        return bucket.acquire(direct);
    }

release():

public void release(ByteBuffer buffer)
    {
        if (buffer!=null)
        {    
            //尋找合適的Bucket
            ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
            if (bucket!=null)
                //放到找到的Bucket的內部隊列中
                bucket.release(buffer);
        }
    }

例外處理:
如何一個線程想要128k的bytebuffer,按acquire()會申請一個128k的,但是release時钝腺,不會歸還回去抛姑,這樣128k不進入Bucket的內部隊列。

3)維護ConnectionFactory:

ConnectionFactory用來創(chuàng)建連接對象艳狐,如HttpConnectionFactory途戒。

4)取得可用CPU數(shù)量:

int cores = Runtime.getRuntime().availableProcessors();

5)更新acceptor線程數(shù)量:

acceptor指有多少線程去做處理客戶端請求的工作。
有幾個線程做acceptors:acceptors原則上不會超過四個僵驰。
服務端設計的經驗,

   if (acceptors < 0)
       acceptors=Math.max(1, Math.min(4,cores/8));`

6)創(chuàng)建acceptor線程組:

_acceptors = new Thread[acceptors];

7)初始化ServerConnectorManager:

ServerConnectorManager繼承SelectorManager唁毒,用來管理selector的管理器蒜茴。
selector是NIO中的選擇器,管理channel狀態(tài)的浆西。
_manager = newSelectorManager(getExecutor(), getScheduler(),selectors);

selector的線程個數(shù):
Math.max(1,Math.min(cpus/2,threads/16));

3.設置port:

設置sever的port.

4.關聯(lián)Server和Connector:

 setConnectors(new Connector[]{connector});

3.Server.start():

org.eclipse.jetty.util.component.AbstractLifeCycle:

public final void start() throws Exception
    {
        synchronized (_lock)
        {
            try
            {
                if (_state == __STARTED || _state == __STARTING)
                    return;
               //1.設置啟動狀態(tài)
                setStarting();
               //2.啟動過程doStart()
                doStart();
                //3.啟動完畢
                setStarted();
            }
            catch (Throwable e)
            {
                setFailed(e);
                throw e;
            }
        }
    }

1.設置啟動狀態(tài):

private void setStarting()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("starting {}",this);
        _state = __STARTING;
        for (Listener listener : _listeners)
            listener.lifeCycleStarting(this);
    }

2.啟動過程doStart():

包括Server在內粉私,很多jetty對象,包括Connector等近零,都實現(xiàn)了一個LifeCycle這個接口诺核。LifeCycle表示有一個生命周期的對象,包括一些方法start(),stop()等久信,一個對象的生老病死都在里面窖杀。因為像Server,Connector這樣的對象都有生老病死這樣一個周期,所以類似有聲明周期的對象都會實現(xiàn)這樣一個LifeCycle的接口裙士。

    @Override
    protected void doStart() throws Exception
    {
        // Create an error handler if there is none
        if (_errorHandler==null)
            _errorHandler=getBean(ErrorHandler.class);
        if (_errorHandler==null)
            setErrorHandler(new ErrorHandler());
        if (_errorHandler instanceof ErrorHandler.ErrorPageMapper)
            LOG.warn("ErrorPageMapper not supported for Server level Error Handling");
        _errorHandler.setServer(this);
        
        //If the Server should be stopped when the jvm exits, register
        //with the shutdown handler thread.
        if (getStopAtShutdown())
            ShutdownThread.register(this);

        //Register the Server with the handler thread for receiving
        //remote stop commands
        //1.注冊ShutdownMonitor,用來遠程停止Server
        ShutdownMonitor.register(this);

        //Start a thread waiting to receive "stop" commands.
        ShutdownMonitor.getInstance().start(); // initialize

        LOG.info("jetty-" + getVersion());
        if (!Jetty.STABLE)
        {
            LOG.warn("THIS IS NOT A STABLE RELEASE! DO NOT USE IN PRODUCTION!");
            LOG.warn("Download a stable release from http://download.eclipse.org/jetty/");
        }
        
        HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION);

        // Check that the thread pool size is enough.
        //2.拿到線程池入客,在前面實例化的時候就已經初始化完了。
        SizedThreadPool pool = getBean(SizedThreadPool.class);
        int max=pool==null?-1:pool.getMaxThreads();
        int selectors=0;
        int acceptors=0;

        for (Connector connector : _connectors)
        {
            if (connector instanceof AbstractConnector)
            {
                AbstractConnector abstractConnector = (AbstractConnector)connector;
                Executor connectorExecutor = connector.getExecutor();

                if (connectorExecutor != pool)
                {
                    // Do not count the selectors and acceptors from this connector at
                    // the server level, because the connector uses a dedicated executor.
                    continue;
                }

                acceptors += abstractConnector.getAcceptors();

                if (connector instanceof ServerConnector)
                {
                    // The SelectorManager uses 2 threads for each selector,
                    // one for the normal and one for the low priority strategies.
                    selectors += 2 * ((ServerConnector)connector).getSelectorManager().getSelectorCount();
                }
            }
        }

        int needed=1+selectors+acceptors;
        if (max>0 && needed>max)
            throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors));

        MultiException mex=new MultiException();
        try
        {
            super.doStart();
        }
        catch(Throwable e)
        {
            mex.add(e);
        }

        // start connectors last
        for (Connector connector : _connectors)
        {
            try
            {
                connector.start();
            }
            catch(Throwable e)
            {
                mex.add(e);
            }
        }

        if (isDumpAfterStart())
            dumpStdErr();

        mex.ifExceptionThrow();

        LOG.info(String.format("Started @%dms",Uptime.getUptime()));
    }

1.ShutdownMonitor:

    單啟動一個線程去支持遠程關閉Server
   ```
   //remote stop commands
    ShutdownMonitor.register(this);

    //Start a thread waiting to receive "stop" commands.
    ShutdownMonitor.getInstance().start(); // initialize

###2.拿到線程池:
        拿到線程池腿椎,在前面實例化的時候就已經初始化完了桌硫。
        jetty實現(xiàn)了自己的一套管理體系:
        ```
        // Check that the thread pool size is enough.
        SizedThreadPool pool = getBean(SizedThreadPool.class);
        int max=pool==null?-1:pool.getMaxThreads();
        int selectors=0;
        int acceptors=0;
       ```
###3.計算selector數(shù)量:

累計所有connector下的selector,累加:
最后needed=1+selectors+acceptors;  needed是一共所需線程數(shù):
但大于兩百終止程序(如果有200個線程去做selector,和acceptor了,系統(tǒng)認為跑不起來了啃炸。)

int max=pool==null?-1:pool.getMaxThreads();
int selectors=0;
int acceptors=0;

    for (Connector connector : _connectors)
    {
        if (connector instanceof AbstractConnector)
        {
            AbstractConnector abstractConnector = (AbstractConnector)connector;
            Executor connectorExecutor = connector.getExecutor();

            if (connectorExecutor != pool)
            {
                // Do not count the selectors and acceptors from this connector at
                // the server level, because the connector uses a dedicated executor.
                continue;
            }

            acceptors += abstractConnector.getAcceptors();

            if (connector instanceof ServerConnector)
            {
                // The SelectorManager uses 2 threads for each selector,
                // one for the normal and one for the low priority strategies.
                selectors += 2 * ((ServerConnector)connector).getSelectorManager().getSelectorCount();
            }
        }
    }

//共需要的線程數(shù):

int needed=1+selectors+acceptors;
大于兩百終止程序:
        if (max>0 && needed>max)
            throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors));

4.管理bean铆隘,并開始執(zhí)行:

QueuedThreadPool實現(xiàn)了LifeCycle南用,所以在這里執(zhí)行

/**
     * Starts the managed lifecycle beans in the order they were added.
     */
    @Override
    protected void doStart() throws Exception
    {
        if (_destroyed)
            throw new IllegalStateException("Destroyed container cannot be restarted");

        // indicate that we are started, so that addBean will start other beans added.
        _doStarted = true;

        // start our managed and auto beans
        for (Bean b : _beans)
        {
            if (b._bean instanceof LifeCycle)
            {
                LifeCycle l = (LifeCycle)b._bean;
                switch(b._managed)
                {
                    case MANAGED:
                        if (!l.isRunning())
                            start(l);
                        break;
                    case AUTO:
                        if (l.isRunning())
                            unmanage(b);
                        else
                        {
                            manage(b);
                            start(l);
                        }
                        break;
                }
            }
        }

        super.doStart();
    }

QueuedThreadPool的啟動:

@Override
    protected void doStart() throws Exception
    {
        super.doStart();
        _threadsStarted.set(0);
   //啟動若干線程:創(chuàng)建線程托修,設置線程的屬性砚嘴,啟動線程际长。
        startThreads(_minThreads);
    }
//開始線程
private boolean startThreads(int threadsToStart)
    {
        while (threadsToStart > 0 && isRunning())
        {
            int threads = _threadsStarted.get();
            if (threads >= _maxThreads)
                return false;

            if (!_threadsStarted.compareAndSet(threads, threads + 1))
                continue;

            boolean started = false;
            try
            {
                //創(chuàng)建線程:用的是_runnable(實現(xiàn)了Runnable接口)
                Thread thread = newThread(_runnable);
                //設置線程的屬性
                thread.setDaemon(isDaemon());
                thread.setPriority(getThreadsPriority());
                thread.setName(_name + "-" + thread.getId());
                _threads.add(thread);
                //啟動線程
                thread.start();
                started = true;
                --threadsToStart;
            }
            finally
            {
                if (!started)
                    _threadsStarted.decrementAndGet();
            }
        }
        return true;
    }

//_runnable詳情:用死循環(huán)不斷地_jobs.poll():這里是啟動線程后動_jobs里面取任務了搓彻。對于與前面的executor方法

while (job != null && isRunning())
                   {
                       if (LOG.isDebugEnabled())
                           LOG.debug("run {}",job);
                       //真正的讓線程run旭贬。
                       runJob(job);
                       if (LOG.isDebugEnabled())
                           LOG.debug("ran {}",job);
                       if (Thread.interrupted())
                       {
                           ignore=true;
                           break loop;
                       }
                       job = _jobs.poll();
                   }

5.啟動連接:

1.獲取ConnectionFactory
2.創(chuàng)建Selector并啟動程序
3.創(chuàng)建Acceptor線程扼脐。

       // start connectors last
        for (Connector connector : _connectors)
        {
            try
            {
               //開始
                connector.start();
            }
            catch(Throwable e)
            {
                mex.add(e);
            }
       }
####1.獲取ConnectionFactory
//org.eclipse.jetty.server.AbstractConnector
protected void doStart() throws Exception
    {
        if(_defaultProtocol==null)
            throw new IllegalStateException("No default protocol for "+this);
//取得ConnectionFactory:
        _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
        if(_defaultConnectionFactory==null)
            throw new IllegalStateException("No protocol factory for default protocol '"+_defaultProtocol+"' in "+this);
        SslConnectionFactory ssl = getConnectionFactory(SslConnectionFactory.class);
        if (ssl != null)
        {
            String next = ssl.getNextProtocol();
            ConnectionFactory cf = getConnectionFactory(next);
            if (cf == null)
                throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this);
        }

        super.doStart();

        _stopping=new CountDownLatch(_acceptors.length);
        for (int i = 0; i < _acceptors.length; i++)
        {
            Acceptor a = new Acceptor(i);
            addBean(a);
            getExecutor().execute(a);
        }

        LOG.info("Started {}", this);
    }

使用上層的LifeCycle佣谐,讓Connector的bean都run起來:

2創(chuàng)建selector線程并啟動狭魂。

org.eclipse.jetty.io.SelectorManager

 @Override
    protected void doStart() throws Exception
    {
        addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads),true);
        for (int i = 0; i < _selectors.length; i++)
        {
            ManagedSelector selector = newSelector(i);
            _selectors[i] = selector;
            addBean(selector);
        }
        super.doStart();
    }

3.創(chuàng)建acceptor線程并啟動坞生。

org.eclipse.jetty.server.AbstractConnector.dostart():

 _stopping=new CountDownLatch(_acceptors.length);
        for (int i = 0; i < _acceptors.length; i++)
        {
            Acceptor a = new Acceptor(i);
            addBean(a);
            getExecutor().execute(a);
        }

4.創(chuàng)建Acceptor都做了什么?

org.eclipse.jetty.server.AbstractConnector.Acceptor.run()

 @Override
        public void run()
        {
            final Thread thread = Thread.currentThread();
            String name=thread.getName();
              //設置線程名字
            _name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
              
            thread.setName(_name);
                //設置線程優(yōu)先級
            int priority=thread.getPriority();
            if (_acceptorPriorityDelta!=0)
                thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));

            synchronized (AbstractConnector.this)
            {
               //將線程放入_acceptors數(shù)組
                _acceptors[_id] = thread;
            }

            try
            {
                //監(jiān)聽端口
                while (isRunning())
                {
                    try (Locker.Lock lock = _locker.lock())
                    {
                        if (!_accepting && isRunning())
                        {
                            _setAccepting.await();
                            continue;
                        }
                    }
                    catch (InterruptedException e) 
                    {
                        continue;
                    }
                    
                    try
                    {
                         //監(jiān)聽
                        accept(_id);
                    }
                    catch (Throwable x)
                    {
                        if (!handleAcceptFailure(x))
                            break;
                    }
                }
            }
            finally
            {
                thread.setName(name);
                if (_acceptorPriorityDelta!=0)
                    thread.setPriority(priority);

                synchronized (AbstractConnector.this)
                {
                    _acceptors[_id] = null;
                }
                CountDownLatch stopping=_stopping;
                if (stopping!=null)
                    stopping.countDown();
            }
        }

監(jiān)聽客戶請求的代碼:
org.eclipse.jetty.server.ServerConnector:

 @Override
    public void accept(int acceptorID) throws IOException
    {
        ServerSocketChannel serverChannel = _acceptChannel;
        if (serverChannel != null && serverChannel.isOpen())
        {
            SocketChannel channel = serverChannel.accept();
           //Acceptor在這里阻塞等待
            accepted(channel);
        }
    }

如果acceptor數(shù)量為零摔认,沒有專門的線程進行accept,則設置為非阻塞模式电谣,如果是非零剿牺,則有專門的線程進行accept晒来,因此為阻塞模式。
org.eclipse.jetty.server.ServerConnector:

@Override
protected void doStart() throws Exception
{
super.doStart();

    if (getAcceptors()==0)
    {
         //沒有專門的線程進行accept,則設置為非阻塞模式
        _acceptChannel.configureBlocking(false);
        _acceptor.set(_manager.acceptor(_acceptChannel));
    }
}

處理Http請求:

1.Accept成功
2.請求處理

1.Accept成功

接著acceptor說:
org.eclipse.jetty.server.ServerConnector:

 @Override
    public void accept(int acceptorID) throws IOException
    {
        ServerSocketChannel serverChannel = _acceptChannel;
        if (serverChannel != null && serverChannel.isOpen())
        {
            SocketChannel channel = serverChannel.accept();
           //Acceptor在這里阻塞等待
            accepted(channel);
        }
    }

分析accepted():
org.eclipse.jetty.server.ServerConnector:

private void accepted(SocketChannel channel) throws IOException
    {
        //1.設置為非阻塞模式
        channel.configureBlocking(false);
        //2.配置socket
        Socket socket = channel.socket();
        configure(socket);
        //3.正式處理:把channel交給selectorManager
        _manager.accept(channel);
    }

正式處理的分析:
1.選擇可用的ManagedSelector線程列牺。

  1. ManagedSelector處理瞎领。

org.eclipse.jetty.io. ManagedSelector

    public void accept(SelectableChannel channel, Object attachment)
    {
        //選擇可用的ManagedSelector線程。
        final ManagedSelector selector = chooseSelector(channel);
        //提交任務
        selector.submit(selector.new Accept(channel, attachment));
    }

1.選擇可用的ManagedSelector線程驼修。
org.eclipse.jetty.io. ManagedSelector

private ManagedSelector chooseSelector(SelectableChannel channel)
    {
        // Ideally we would like to have all connections from the same client end
        // up on the same selector (to try to avoid smearing the data from a single
        // client over all cores), but because of proxies, the remote address may not
        // really be the client - so we have to hedge our bets to ensure that all
        // channels don't end up on the one selector for a proxy.
        ManagedSelector candidate1 = null;
        if (channel != null)
        {
            try
            {
                if (channel instanceof SocketChannel)
                {
                    SocketAddress remote = ((SocketChannel)channel).getRemoteAddress();
                    if (remote instanceof InetSocketAddress)
                    {
                        byte[] addr = ((InetSocketAddress)remote).getAddress().getAddress();
                        if (addr != null)
                        {
                            int s = addr[addr.length - 1] & 0xFF;
                            candidate1 = _selectors[s % getSelectorCount()];
                        }
                    }
                }
            }
            catch (IOException x)
            {
                LOG.ignore(x);
            }
        }

        // The ++ increment here is not atomic, but it does not matter,
        // so long as the value changes sometimes, then connections will
        // be distributed over the available selectors.

       //這里的_selectorIndex是成員變量并不是線程安全的幢竹,
       //沒有必要線程
       //安全蹲坷,只要_selectorIndex是變化的就可以保證分配到不同       //的selectors,如果用線程安全方法(包括原子類)性能會下降循签。
      //就是說有時候線程安全不是必要的e
        long s = _selectorIndex++;
        int index = (int)(s % getSelectorCount());
        ManagedSelector candidate2 = _selectors[index];

        if (candidate1 == null || candidate1.size() >= candidate2.size() * 2)
            return candidate2;
        return candidate1;
    }

2.提交任務:
org.eclipse.jetty.io. ManagedSelector

public void submit(Runnable change)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("Queued change {} on {}", change, this);

        Selector selector = null;
        try (Locker.Lock lock = _locker.lock())
        {
            _actions.offer(change);
            if (_selecting)
            {
                selector = _selector;
                // To avoid the extra select wakeup.
                _selecting = false;
            }
        }
        if (selector != null)
            selector.wakeup();
    }

2.請求處理:

org.eclipse.jetty.io. ManagedSelector.run()
@Override
public void run()
{
try
{
channel.register(_selector, SelectionKey.OP_CONNECT, this);
}
catch (Throwable x)
{
failed(x);
}
}

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末聚唐,一起剝皮案震驚了整個濱河市扮惦,隨后出現(xiàn)的幾起案子崖蜜,更是在濱河造成了極大的恐慌,老刑警劉巖等恐,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件郊尝,死亡現(xiàn)場離奇詭異流昏,居然都是意外死亡谚鄙,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門蚊荣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人媳叨,你說我怎么就攤上這事。” “怎么了汞舱?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵赔蒲,是天一觀的道長欢际。 經常有香客問我窒篱,道長配并,這世上最難降的妖魔是什么嫉髓? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任苫耸,我火速辦了婚禮,結果婚禮上,老公的妹妹穿的比我還像新娘笼痛。我一直安慰自己,他們只是感情好,可當我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布隶校。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天,我揣著相機與錄音,去河邊找鬼。 笑死,一個胖子當著我的面吹牛,可吹牛的內容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼柴墩,長吁一口氣:“原來是場噩夢啊……” “哼歼指!你這毒婦竟也來了惰赋?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤拒炎,失蹤者是張志新(化名)和其女友劉穎挪拟,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體击你,經...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡玉组,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了丁侄。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片惯雳。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖鸿摇,靈堂內的尸體忽然破棺而出石景,到底是詐尸還是另有隱情,我是刑警寧澤拙吉,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布潮孽,位于F島的核電站,受9級特大地震影響筷黔,放射性物質發(fā)生泄漏往史。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一佛舱、第九天 我趴在偏房一處隱蔽的房頂上張望怠堪。 院中可真熱鬧,春花似錦名眉、人聲如沸粟矿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽陌粹。三九已至,卻和暖如春福压,著一層夾襖步出監(jiān)牢的瞬間掏秩,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工荆姆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蒙幻,地道東北人。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓胆筒,卻偏偏與公主長得像邮破,于是被迫代替她去往敵國和親诈豌。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理抒和,服務發(fā)現(xiàn)矫渔,斷路器,智...
    卡卡羅2017閱讀 134,652評論 18 139
  • 從三月份找實習到現(xiàn)在摧莽,面了一些公司庙洼,掛了不少,但最終還是拿到小米镊辕、百度油够、阿里、京東征懈、新浪叠聋、CVTE、樂視家的研發(fā)崗...
    時芥藍閱讀 42,240評論 11 349
  • 1. Java基礎部分 基礎部分的順序:基本語法受裹,類相關的語法碌补,內部類的語法,繼承相關的語法棉饶,異常的語法厦章,線程的語...
    子非魚_t_閱讀 31,623評論 18 399
  • 1.Tomcat總體架構 Tomcat有Connector和Container兩大核心組件,Connector組件...
    monkey01閱讀 12,040評論 6 23
  • 打包分為兩種方式: 手動打包 自動打包 手動打包主要就是全程靠按鈕去點擊下一步下一步,如圖所示 這樣打包如果是單個...
    霧中的影子閱讀 3,323評論 1 1