一. 本文闡述內容的著眼點:
闡述jetty-server作為一個高性能的隐轩,吞吐量比較高的http服務它后面采用了怎樣的技術和多線程手段提升它的吞吐量和高性能的绵脯。主要分三塊:
1.server實例化:new Server();
2.啟動Server:server.start();
3.等待請求某些http的請求并處理(簡略闡述)底循;
二. new Server():
/* ------------------------------------------------------------ */
/** Convenience constructor
* Creates server and a {@link ServerConnector} at the passed port.
* @param port The port of a network HTTP connector (or 0 for a randomly allocated port).
* @see NetworkConnector#getLocalPort()
*/
public Server(@Name("port")int port)
{
//1.初始化線程池
this((ThreadPool)null);
//2.初始化ServerConnector
ServerConnector connector=new ServerConnector(this);
//3.設置port
connector.setPort(port);
//4.關聯(lián)Server和Connector
setConnectors(new Connector[]{connector});
}
1.初始化線程池:this((ThreadPool)null)
像jetty這樣一個服務端的容器,后臺不會使用new 一個線程,new 一個線程不是一件很靠譜的事情莉测。一定是使用線程池的。同時jetty并沒有使用jdk的線程池唧喉,而是自己實現(xiàn)了一個線程池--QueuedThreadPool捣卤。
public Server(@Name("threadpool") ThreadPool pool)
{
_threadPool=pool!=null?pool:new QueuedThreadPool();
addBean(_threadPool);
setServer(this);
}
QueuedThreadPool實現(xiàn)了一個SizedThreadPool。
@ManagedObject("A thread pool")
public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Dumpable
QueuedThreadPool關鍵是executor方法:
execute僅僅把job放入隊列_jobs里八孝。_jobs是一個BlockingQueue的接口董朝,保存所有要執(zhí)行的任務。BlockingQueue是一個性能不高的隊列干跛,因為這里execute執(zhí)行頻率不是很高子姜,offer和poll就不是很高。如果execute執(zhí)行頻率很高楼入,那么這里就有優(yōu)化的空間了哥捕。
public void execute(Runnable job)
{
if (LOG.isDebugEnabled())
LOG.debug("queue {}",job);
if (!isRunning() || !_jobs.offer(job))
{
LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString());
}
else
{
// Make sure there is at least one thread executing the job.
if (getThreads() == 0)
startThreads(1);
}
}
默認用類jetty自己實現(xiàn)的BlockingArrayQueue:
默認調用:
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
{
setMinThreads(minThreads);
setMaxThreads(maxThreads);
setIdleTimeout(idleTimeout);
setStopTimeout(5000);
if (queue==null)
{
int capacity=Math.max(_minThreads, 8);
queue=new BlockingArrayQueue<>(capacity, capacity);
}
_jobs=queue;
_threadGroup=threadGroup;
}
jetty自己實現(xiàn)的BlockingArrayQueue,雖然做了一些優(yōu)化還是用的阻塞浅辙,性能不是太高:
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
2.初始化ServerConnector:
ServerConnector表示服務端的一個連接扭弧。處理http連接以及nio,channel,selector等。繼承AbstractConnector
@ManagedObject("HTTP connector using NIO ByteChannels and Selectors")
public class ServerConnector extends AbstractNetworkConnector
@ManagedObject("AbstractNetworkConnector")
public abstract class AbstractNetworkConnector extends AbstractConnector implements NetworkConnector
分四步:
1.初始化ScheduledExecutorScheduler
2.初始化ByteBufferPool
3.維護ConnectionFactory
4.取得可用CPU數(shù)量
5.更新acceptor數(shù)量
6.創(chuàng)建acceptor線程組
7.初始化ServerConnectorManager
public AbstractConnector(
Server server,
Executor executor,
Scheduler scheduler,
ByteBufferPool pool,
int acceptors,
ConnectionFactory... factories)
{
_server=server;
_executor=executor!=null?executor:_server.getThreadPool();
//1.初始化ScheduledExecutorScheduler
if (scheduler==null)
scheduler=_server.getBean(Scheduler.class);
_scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
//2.初始化ByteBufferPool
if (pool==null)
pool=_server.getBean(ByteBufferPool.class);
_byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();
addBean(_server,false);
addBean(_executor);
if (executor==null)
unmanage(_executor); // inherited from server
addBean(_scheduler);
addBean(_byteBufferPool);
//3.維護ConnectionFactory
for (ConnectionFactory factory:factories)
addConnectionFactory(factory);
//4.取得可用CPU數(shù)量
int cores = Runtime.getRuntime().availableProcessors();
//5.更新acceptor數(shù)量
if (acceptors < 0)
acceptors=Math.max(1, Math.min(4,cores/8));
if (acceptors > cores)
LOG.warn("Acceptors should be <= availableProcessors: " + this);
//6.創(chuàng)建acceptor線程組
_acceptors = new Thread[acceptors];
}
1)初始化ScheduledExecutorScheduler:
jetty里有些任務是要隔一段時間執(zhí)行一次的记舆。比如隔一段時間要檢查什么東西的狀態(tài)鸽捻。
2)初始化ByteBufferPool:
當jetty sever接受到大量Http請求(底層是tcp請求),需要初始化ByteBuffer,然后讀取或放入channel中御蒲,這樣太頻繁的話會降低性能衣赶。ByteBufferPool里面放的都是ByteBuffer對象,可重用厚满,減少GC,減少對象的產生府瞄。new不會消耗性能(因為new是jvm性能非常高的操作),主要是對象的回收碘箍,特別是新生代的回收遵馆,GC太消耗jvm。
但是丰榴,不能隨便設計一個多個線程獲取資源的Pool货邓,因為Pool的特性是線程安全,如果用低效的阻塞(如synchronized)去實現(xiàn)線程安全四濒,這樣還不如直接new對象然后交給GC回收换况。用低效的阻塞(如synchronized)做的對象池的性能是非常差的。
所以要想高效設計對象pool,必須是線程安全而且無鎖的盗蟆。
ByteBufferPool和一般的對象池還是有區(qū)別的戈二,一般的對象池里的對象任何一個都一樣,比如連接池。ByteBufferPool不同,所以比普通對象池稍微復雜一些坎怪。因為請求有可能用1k的ByteBuffer,2k的ByteBuffer亏栈,10k的ByteBuffer,2M的ByteBuffer宏赘。而且不可能為每一個容量都存N份绒北。
ByteBufferPool在jetty里有很多的實現(xiàn)類,現(xiàn)在分析一下默認的實現(xiàn)類ArrayByteBufferPool:
public ArrayByteBufferPool(int minSize, int increment, int maxSize, int maxQueue)
{
if (minSize<=0)
minSize=0;
if (increment<=0)
increment=1024;
if (maxSize<=0)
maxSize=64*1024;
if (minSize>=increment)
throw new IllegalArgumentException("minSize >= increment");
if ((maxSize%increment)!=0 || increment>=maxSize)
throw new IllegalArgumentException("increment must be a divisor of maxSize");
_min=minSize;
_inc=increment;
//一種大小對于一個Bucket,所以需要maxSize/increment=64個 //Bucket
//直接內存Bucket數(shù)組
_direct=new ByteBufferPool.Bucket[maxSize/increment];
//堆內存Bucket數(shù)組
_indirect=new ByteBufferPool.Bucket[maxSize/increment];
_maxQueue=maxQueue;
int size=0;
for (int i=0;i<_direct.length;i++)
{
size+=_inc;
//創(chuàng)建每個大小的Bucket察署,并放到Bucket數(shù)組里闷游。
_direct[i]=new ByteBufferPool.Bucket(this,size,_maxQueue);
_indirect[i]=new ByteBufferPool.Bucket(this,size,_maxQueue);
}
}
minSize:最小大小
increment:每次增加的大小。
maxSize:最大大小贴汪。
例如: minSize=0;increment=1024;maxSize=64*1024;
這樣會有64種大小脐往。
存儲ByteBuffer的結構:Bucket:
class Bucket
{
private final Deque<ByteBuffer> _queue = new ConcurrentLinkedDeque<>();
private final ByteBufferPool _pool;
private final int _capacity;
private final AtomicInteger _space;
重要的參數(shù):
_capacity,表示Bucket存放ByteBuffer個數(shù)扳埂。
_queue业簿,表示實際存放ByteBuffer的容器,ConcurrentLinkedDeque (無鎖的實現(xiàn)阳懂,并發(fā)性很好)梅尤。
初始化ArrayByteBufferPool時會創(chuàng)建每個大小的Bucket柜思,并放到Bucket數(shù)組( _direct,_indirect)里。但是_queue里的內容是延遲加載的巷燥。
acquire():向ArrayByteBufferPool池中申請ByteBuffer赡盘。
//size:需要多大的ByteBuffer; direct:是否是直接內存
public ByteBuffer acquire(int size, boolean direct)
{
// _direct[]或_indirect[]在取得合適的Bucket,因為是懶加載,
//所以有可能是空的
ByteBufferPool.Bucket bucket = bucketFor(size,direct);
if (bucket==null)
return newByteBuffer(size,direct);//null缰揪,就新建
//如果在 _direct[]或_indirect[]找到合適的bucket陨享,
//就在bucket內部隊列中取得一個ByteBuffer
return bucket.acquire(direct);
}
release():
public void release(ByteBuffer buffer)
{
if (buffer!=null)
{
//尋找合適的Bucket
ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
if (bucket!=null)
//放到找到的Bucket的內部隊列中
bucket.release(buffer);
}
}
例外處理:
如何一個線程想要128k的bytebuffer,按acquire()會申請一個128k的,但是release時钝腺,不會歸還回去抛姑,這樣128k不進入Bucket的內部隊列。
3)維護ConnectionFactory:
ConnectionFactory用來創(chuàng)建連接對象艳狐,如HttpConnectionFactory途戒。
4)取得可用CPU數(shù)量:
int cores = Runtime.getRuntime().availableProcessors();
5)更新acceptor線程數(shù)量:
acceptor指有多少線程去做處理客戶端請求的工作。
有幾個線程做acceptors:acceptors原則上不會超過四個僵驰。
服務端設計的經驗,
if (acceptors < 0)
acceptors=Math.max(1, Math.min(4,cores/8));`
6)創(chuàng)建acceptor線程組:
_acceptors = new Thread[acceptors];
7)初始化ServerConnectorManager:
ServerConnectorManager繼承SelectorManager唁毒,用來管理selector的管理器蒜茴。
selector是NIO中的選擇器,管理channel狀態(tài)的浆西。
_manager = newSelectorManager(getExecutor(), getScheduler(),selectors);
selector的線程個數(shù):
Math.max(1,Math.min(cpus/2,threads/16));
3.設置port:
設置sever的port.
4.關聯(lián)Server和Connector:
setConnectors(new Connector[]{connector});
3.Server.start():
org.eclipse.jetty.util.component.AbstractLifeCycle:
public final void start() throws Exception
{
synchronized (_lock)
{
try
{
if (_state == __STARTED || _state == __STARTING)
return;
//1.設置啟動狀態(tài)
setStarting();
//2.啟動過程doStart()
doStart();
//3.啟動完畢
setStarted();
}
catch (Throwable e)
{
setFailed(e);
throw e;
}
}
}
1.設置啟動狀態(tài):
private void setStarting()
{
if (LOG.isDebugEnabled())
LOG.debug("starting {}",this);
_state = __STARTING;
for (Listener listener : _listeners)
listener.lifeCycleStarting(this);
}
2.啟動過程doStart():
包括Server在內粉私,很多jetty對象,包括Connector等近零,都實現(xiàn)了一個LifeCycle這個接口诺核。LifeCycle表示有一個生命周期的對象,包括一些方法start(),stop()等久信,一個對象的生老病死都在里面窖杀。因為像Server,Connector這樣的對象都有生老病死這樣一個周期,所以類似有聲明周期的對象都會實現(xiàn)這樣一個LifeCycle的接口裙士。
@Override
protected void doStart() throws Exception
{
// Create an error handler if there is none
if (_errorHandler==null)
_errorHandler=getBean(ErrorHandler.class);
if (_errorHandler==null)
setErrorHandler(new ErrorHandler());
if (_errorHandler instanceof ErrorHandler.ErrorPageMapper)
LOG.warn("ErrorPageMapper not supported for Server level Error Handling");
_errorHandler.setServer(this);
//If the Server should be stopped when the jvm exits, register
//with the shutdown handler thread.
if (getStopAtShutdown())
ShutdownThread.register(this);
//Register the Server with the handler thread for receiving
//remote stop commands
//1.注冊ShutdownMonitor,用來遠程停止Server
ShutdownMonitor.register(this);
//Start a thread waiting to receive "stop" commands.
ShutdownMonitor.getInstance().start(); // initialize
LOG.info("jetty-" + getVersion());
if (!Jetty.STABLE)
{
LOG.warn("THIS IS NOT A STABLE RELEASE! DO NOT USE IN PRODUCTION!");
LOG.warn("Download a stable release from http://download.eclipse.org/jetty/");
}
HttpGenerator.setJettyVersion(HttpConfiguration.SERVER_VERSION);
// Check that the thread pool size is enough.
//2.拿到線程池入客,在前面實例化的時候就已經初始化完了。
SizedThreadPool pool = getBean(SizedThreadPool.class);
int max=pool==null?-1:pool.getMaxThreads();
int selectors=0;
int acceptors=0;
for (Connector connector : _connectors)
{
if (connector instanceof AbstractConnector)
{
AbstractConnector abstractConnector = (AbstractConnector)connector;
Executor connectorExecutor = connector.getExecutor();
if (connectorExecutor != pool)
{
// Do not count the selectors and acceptors from this connector at
// the server level, because the connector uses a dedicated executor.
continue;
}
acceptors += abstractConnector.getAcceptors();
if (connector instanceof ServerConnector)
{
// The SelectorManager uses 2 threads for each selector,
// one for the normal and one for the low priority strategies.
selectors += 2 * ((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
}
}
int needed=1+selectors+acceptors;
if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors));
MultiException mex=new MultiException();
try
{
super.doStart();
}
catch(Throwable e)
{
mex.add(e);
}
// start connectors last
for (Connector connector : _connectors)
{
try
{
connector.start();
}
catch(Throwable e)
{
mex.add(e);
}
}
if (isDumpAfterStart())
dumpStdErr();
mex.ifExceptionThrow();
LOG.info(String.format("Started @%dms",Uptime.getUptime()));
}
1.ShutdownMonitor:
單啟動一個線程去支持遠程關閉Server
```
//remote stop commands
ShutdownMonitor.register(this);
//Start a thread waiting to receive "stop" commands.
ShutdownMonitor.getInstance().start(); // initialize
###2.拿到線程池:
拿到線程池腿椎,在前面實例化的時候就已經初始化完了桌硫。
jetty實現(xiàn)了自己的一套管理體系:
```
// Check that the thread pool size is enough.
SizedThreadPool pool = getBean(SizedThreadPool.class);
int max=pool==null?-1:pool.getMaxThreads();
int selectors=0;
int acceptors=0;
```
###3.計算selector數(shù)量:
累計所有connector下的selector,累加:
最后needed=1+selectors+acceptors; needed是一共所需線程數(shù):
但大于兩百終止程序(如果有200個線程去做selector,和acceptor了,系統(tǒng)認為跑不起來了啃炸。)
int max=pool==null?-1:pool.getMaxThreads();
int selectors=0;
int acceptors=0;
for (Connector connector : _connectors)
{
if (connector instanceof AbstractConnector)
{
AbstractConnector abstractConnector = (AbstractConnector)connector;
Executor connectorExecutor = connector.getExecutor();
if (connectorExecutor != pool)
{
// Do not count the selectors and acceptors from this connector at
// the server level, because the connector uses a dedicated executor.
continue;
}
acceptors += abstractConnector.getAcceptors();
if (connector instanceof ServerConnector)
{
// The SelectorManager uses 2 threads for each selector,
// one for the normal and one for the low priority strategies.
selectors += 2 * ((ServerConnector)connector).getSelectorManager().getSelectorCount();
}
}
}
//共需要的線程數(shù):
int needed=1+selectors+acceptors;
大于兩百終止程序:
if (max>0 && needed>max)
throw new IllegalStateException(String.format("Insufficient threads: max=%d < needed(acceptors=%d + selectors=%d + request=1)",max,acceptors,selectors));
4.管理bean铆隘,并開始執(zhí)行:
QueuedThreadPool實現(xiàn)了LifeCycle南用,所以在這里執(zhí)行
/**
* Starts the managed lifecycle beans in the order they were added.
*/
@Override
protected void doStart() throws Exception
{
if (_destroyed)
throw new IllegalStateException("Destroyed container cannot be restarted");
// indicate that we are started, so that addBean will start other beans added.
_doStarted = true;
// start our managed and auto beans
for (Bean b : _beans)
{
if (b._bean instanceof LifeCycle)
{
LifeCycle l = (LifeCycle)b._bean;
switch(b._managed)
{
case MANAGED:
if (!l.isRunning())
start(l);
break;
case AUTO:
if (l.isRunning())
unmanage(b);
else
{
manage(b);
start(l);
}
break;
}
}
}
super.doStart();
}
QueuedThreadPool的啟動:
@Override
protected void doStart() throws Exception
{
super.doStart();
_threadsStarted.set(0);
//啟動若干線程:創(chuàng)建線程托修,設置線程的屬性砚嘴,啟動線程际长。
startThreads(_minThreads);
}
//開始線程
private boolean startThreads(int threadsToStart)
{
while (threadsToStart > 0 && isRunning())
{
int threads = _threadsStarted.get();
if (threads >= _maxThreads)
return false;
if (!_threadsStarted.compareAndSet(threads, threads + 1))
continue;
boolean started = false;
try
{
//創(chuàng)建線程:用的是_runnable(實現(xiàn)了Runnable接口)
Thread thread = newThread(_runnable);
//設置線程的屬性
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
_threads.add(thread);
//啟動線程
thread.start();
started = true;
--threadsToStart;
}
finally
{
if (!started)
_threadsStarted.decrementAndGet();
}
}
return true;
}
//_runnable詳情:用死循環(huán)不斷地_jobs.poll():這里是啟動線程后動_jobs里面取任務了搓彻。對于與前面的executor方法
while (job != null && isRunning())
{
if (LOG.isDebugEnabled())
LOG.debug("run {}",job);
//真正的讓線程run旭贬。
runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {}",job);
if (Thread.interrupted())
{
ignore=true;
break loop;
}
job = _jobs.poll();
}
5.啟動連接:
1.獲取ConnectionFactory
2.創(chuàng)建Selector并啟動程序
3.創(chuàng)建Acceptor線程扼脐。
// start connectors last
for (Connector connector : _connectors)
{
try
{
//開始
connector.start();
}
catch(Throwable e)
{
mex.add(e);
}
}
####1.獲取ConnectionFactory
//org.eclipse.jetty.server.AbstractConnector
protected void doStart() throws Exception
{
if(_defaultProtocol==null)
throw new IllegalStateException("No default protocol for "+this);
//取得ConnectionFactory:
_defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
if(_defaultConnectionFactory==null)
throw new IllegalStateException("No protocol factory for default protocol '"+_defaultProtocol+"' in "+this);
SslConnectionFactory ssl = getConnectionFactory(SslConnectionFactory.class);
if (ssl != null)
{
String next = ssl.getNextProtocol();
ConnectionFactory cf = getConnectionFactory(next);
if (cf == null)
throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this);
}
super.doStart();
_stopping=new CountDownLatch(_acceptors.length);
for (int i = 0; i < _acceptors.length; i++)
{
Acceptor a = new Acceptor(i);
addBean(a);
getExecutor().execute(a);
}
LOG.info("Started {}", this);
}
使用上層的LifeCycle佣谐,讓Connector的bean都run起來:
2創(chuàng)建selector線程并啟動狭魂。
org.eclipse.jetty.io.SelectorManager
@Override
protected void doStart() throws Exception
{
addBean(new ReservedThreadExecutor(getExecutor(),_reservedThreads),true);
for (int i = 0; i < _selectors.length; i++)
{
ManagedSelector selector = newSelector(i);
_selectors[i] = selector;
addBean(selector);
}
super.doStart();
}
3.創(chuàng)建acceptor線程并啟動坞生。
org.eclipse.jetty.server.AbstractConnector.dostart():
_stopping=new CountDownLatch(_acceptors.length);
for (int i = 0; i < _acceptors.length; i++)
{
Acceptor a = new Acceptor(i);
addBean(a);
getExecutor().execute(a);
}
4.創(chuàng)建Acceptor都做了什么?
org.eclipse.jetty.server.AbstractConnector.Acceptor.run()
@Override
public void run()
{
final Thread thread = Thread.currentThread();
String name=thread.getName();
//設置線程名字
_name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
thread.setName(_name);
//設置線程優(yōu)先級
int priority=thread.getPriority();
if (_acceptorPriorityDelta!=0)
thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));
synchronized (AbstractConnector.this)
{
//將線程放入_acceptors數(shù)組
_acceptors[_id] = thread;
}
try
{
//監(jiān)聽端口
while (isRunning())
{
try (Locker.Lock lock = _locker.lock())
{
if (!_accepting && isRunning())
{
_setAccepting.await();
continue;
}
}
catch (InterruptedException e)
{
continue;
}
try
{
//監(jiān)聽
accept(_id);
}
catch (Throwable x)
{
if (!handleAcceptFailure(x))
break;
}
}
}
finally
{
thread.setName(name);
if (_acceptorPriorityDelta!=0)
thread.setPriority(priority);
synchronized (AbstractConnector.this)
{
_acceptors[_id] = null;
}
CountDownLatch stopping=_stopping;
if (stopping!=null)
stopping.countDown();
}
}
監(jiān)聽客戶請求的代碼:
org.eclipse.jetty.server.ServerConnector:
@Override
public void accept(int acceptorID) throws IOException
{
ServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
SocketChannel channel = serverChannel.accept();
//Acceptor在這里阻塞等待
accepted(channel);
}
}
如果acceptor數(shù)量為零摔认,沒有專門的線程進行accept,則設置為非阻塞模式电谣,如果是非零剿牺,則有專門的線程進行accept晒来,因此為阻塞模式。
org.eclipse.jetty.server.ServerConnector:
@Override
protected void doStart() throws Exception
{
super.doStart();
if (getAcceptors()==0)
{
//沒有專門的線程進行accept,則設置為非阻塞模式
_acceptChannel.configureBlocking(false);
_acceptor.set(_manager.acceptor(_acceptChannel));
}
}
處理Http請求:
1.Accept成功
2.請求處理
1.Accept成功
接著acceptor說:
org.eclipse.jetty.server.ServerConnector:
@Override
public void accept(int acceptorID) throws IOException
{
ServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
SocketChannel channel = serverChannel.accept();
//Acceptor在這里阻塞等待
accepted(channel);
}
}
分析accepted():
org.eclipse.jetty.server.ServerConnector:
private void accepted(SocketChannel channel) throws IOException
{
//1.設置為非阻塞模式
channel.configureBlocking(false);
//2.配置socket
Socket socket = channel.socket();
configure(socket);
//3.正式處理:把channel交給selectorManager
_manager.accept(channel);
}
正式處理的分析:
1.選擇可用的ManagedSelector線程列牺。
- ManagedSelector處理瞎领。
org.eclipse.jetty.io. ManagedSelector
public void accept(SelectableChannel channel, Object attachment)
{
//選擇可用的ManagedSelector線程。
final ManagedSelector selector = chooseSelector(channel);
//提交任務
selector.submit(selector.new Accept(channel, attachment));
}
1.選擇可用的ManagedSelector線程驼修。
org.eclipse.jetty.io. ManagedSelector
private ManagedSelector chooseSelector(SelectableChannel channel)
{
// Ideally we would like to have all connections from the same client end
// up on the same selector (to try to avoid smearing the data from a single
// client over all cores), but because of proxies, the remote address may not
// really be the client - so we have to hedge our bets to ensure that all
// channels don't end up on the one selector for a proxy.
ManagedSelector candidate1 = null;
if (channel != null)
{
try
{
if (channel instanceof SocketChannel)
{
SocketAddress remote = ((SocketChannel)channel).getRemoteAddress();
if (remote instanceof InetSocketAddress)
{
byte[] addr = ((InetSocketAddress)remote).getAddress().getAddress();
if (addr != null)
{
int s = addr[addr.length - 1] & 0xFF;
candidate1 = _selectors[s % getSelectorCount()];
}
}
}
}
catch (IOException x)
{
LOG.ignore(x);
}
}
// The ++ increment here is not atomic, but it does not matter,
// so long as the value changes sometimes, then connections will
// be distributed over the available selectors.
//這里的_selectorIndex是成員變量并不是線程安全的幢竹,
//沒有必要線程
//安全蹲坷,只要_selectorIndex是變化的就可以保證分配到不同 //的selectors,如果用線程安全方法(包括原子類)性能會下降循签。
//就是說有時候線程安全不是必要的e
long s = _selectorIndex++;
int index = (int)(s % getSelectorCount());
ManagedSelector candidate2 = _selectors[index];
if (candidate1 == null || candidate1.size() >= candidate2.size() * 2)
return candidate2;
return candidate1;
}
2.提交任務:
org.eclipse.jetty.io. ManagedSelector
public void submit(Runnable change)
{
if (LOG.isDebugEnabled())
LOG.debug("Queued change {} on {}", change, this);
Selector selector = null;
try (Locker.Lock lock = _locker.lock())
{
_actions.offer(change);
if (_selecting)
{
selector = _selector;
// To avoid the extra select wakeup.
_selecting = false;
}
}
if (selector != null)
selector.wakeup();
}
2.請求處理:
org.eclipse.jetty.io. ManagedSelector.run()
@Override
public void run()
{
try
{
channel.register(_selector, SelectionKey.OP_CONNECT, this);
}
catch (Throwable x)
{
failed(x);
}
}