簡書 杭州_mina
《tomcat 8.x NioEndpoint核心組件淺析1》
1. Acceptor 淺析
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
protected class Acceptor extends AbstractEndpoint.Acceptor {
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
// 一直循環(huán)直到接收到關(guān)閉命令
while (running) {
// paused 只有在unbind的時候會設(shè)置
while (paused && running) {
//變更Acceptor的狀態(tài)
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
// Acceptor 設(shè)置成running
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
//連接數(shù)達(dá)到最大旨巷,暫停線程士骤。
//這里用到的是connectionLimitLatch鎖饿凛,可以理解為一個閉鎖
//我理解connectionLimitLatch和coundownlatch類似
//補(bǔ)充一點(diǎn)這個是先增加計數(shù)器南吮、如果超過最大連接數(shù)則減少計時器涨岁、然后線程暫停,你這個計時器就是當(dāng)前連接數(shù)
//程序的下面我會簡單的講一下這個方法的實(shí)現(xiàn)
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
//調(diào)用serversocket的accept方法
socket = serverSock.accept();
} catch (IOException ioe) {
// We didn't get a socket
// 出錯了要減去連接數(shù)
countDownConnection();
if (running) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (running && !paused) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
// 把socket扔到poller中
if (!setSocketOptions(socket)) {
// 如果加入poller失敗關(guān)閉連接
closeSocket(socket);
}
} else {
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
}
下面來看setSocketOptions方法中如何把SocketChannel加入到poller中
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false); //設(shè)置成非阻塞
//獲取socket
Socket sock = socket.socket();
//配置socket信息
socketProperties.setProperties(sock);
//創(chuàng)建一個NioChannel 他封裝了SocketChannel
NioChannel channel = nioChannels.pop();
if (channel == null) {
//如果為null 創(chuàng)建一個NioChannel 這里使用系統(tǒng)內(nèi)存
//使用系統(tǒng)內(nèi)存可以省去一步從系統(tǒng)內(nèi)存拷貝到堆內(nèi)存的動作猜煮、性能上會有很大的提升拗胜,nioChannels初始化默認(rèn)為128個
//當(dāng)socket 關(guān)閉的重新清理NioChannel而不是銷毀這個對象可以達(dá)到對象復(fù)用的效果、因為申請系統(tǒng)內(nèi)存的開銷比申請堆內(nèi)存的開銷要大很多
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
//如果不為null設(shè)置SocketChannel
channel.setIOChannel(socket);
//將channle復(fù)位 以上就是重置系統(tǒng)內(nèi)存把指針重新定位到buff的開始位置
channel.reset();
}
//丟入poller隊列中
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
最后一步register方法
public void register(final NioChannel socket) {
//設(shè)置poller 對象
socket.setPoller(this);
//設(shè)置一個附件待會把這個NioSocketWrapper注冊到poller對象中的selector上去
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);//獲取
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getSocketProperties().getSoTimeout());
ka.setWriteTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
ka.setReadTimeout(getSoTimeout());
ka.setWriteTimeout(getSoTimeout());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
//生成一個PollerEvent對象 這個對象繼承了Runnable
//這個對象的主要作用就是把NioSocketWrapper注冊到poller對象中的selector上去
//還有就是獲取SelectionKey 該對象是用于跟蹤這些被注冊事件的句柄
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
//把pollerEvent放入到poller的隊列中
addEvent(r);
}
2. LimitLatch 淺析
- 2.1 AbstractEndpoint 封裝了LimitLatch一些方法
//默認(rèn)最大連接數(shù)為10000
private int maxConnections = 10000;
//獲取最大連接數(shù)
public int getMaxConnections() {
return this.maxConnections;
}
//初始化閉鎖
protected LimitLatch initializeConnectionLatch() {
if (maxConnections==-1) return null;
if (connectionLimitLatch==null) {
//
connectionLimitLatch = new LimitLatch(getMaxConnections());
}
return connectionLimitLatch;
}
//將會釋放所有的線程
protected void releaseConnectionLatch() {
LimitLatch latch = connectionLimitLatch;
if (latch!=null) latch.releaseAll();
connectionLimitLatch = null;
}
//增加計數(shù)辆沦,如果太大昼捍,那么等待
protected void countUpOrAwaitConnection() throws InterruptedException {
if (maxConnections==-1) return;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) latch.countUpOrAwait();
}
//減少計數(shù)
protected long countDownConnection() {
if (maxConnections==-1) return -1;
LimitLatch latch = connectionLimitLatch;
if (latch!=null) {
long result = latch.countDown();
if (result<0) {
getLog().warn(sm.getString("endpoint.warn.incorrectConnectionCount"));
}
return result;
} else return -1
}
- 2.2 LimitLatch 源碼淺析
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.util.threads;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* Shared latch that allows the latch to be acquired a limited number of times
* after which all subsequent requests to acquire the latch will be placed in a
* FIFO queue until one of the shares is returned.
*/
public class LimitLatch {
private static final Log log = LogFactory.getLog(LimitLatch.class);
//構(gòu)建sync對象、主要用來同步肢扯、阻塞兩個功能
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
@Override
protected int tryAcquireShared(int ignored) {
//增加計數(shù)器
long newCount = count.incrementAndGet();
//如果計數(shù)器大于最大limit 則計數(shù)器減一 返回-1 否則返回 1
//這里的limit 其實(shí)就是maxConnections
if (!released && newCount > limit) {
// Limit exceeded
count.decrementAndGet();
return -1;
} else {
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
//計數(shù)器減一
count.decrementAndGet();
return true;
}
}
private final Sync sync;
//計數(shù)器
private final AtomicLong count;
//最大連接數(shù)
private volatile long limit;
//是否全部釋放
private volatile boolean released = false;
/**
* Instantiates a LimitLatch object with an initial limit.
*
* @param limit - maximum number of concurrent acquisitions of this latch
*/
public LimitLatch(long limit) {
this.limit = limit;
this.count = new AtomicLong(0);
this.sync = new Sync();
}
/**
* Returns the current count for the latch
*
* @return the current count for latch
*/
//獲取當(dāng)前計數(shù)器
public long getCount() {
return count.get();
}
/**
* Obtain the current limit.
*
* @return the limit
*/
//獲取最大連接數(shù)
public long getLimit() {
return limit;
}
/**
* Sets a new limit. If the limit is decreased there may be a period where
* more shares of the latch are acquired than the limit. In this case no
* more shares of the latch will be issued until sufficient shares have been
* returned to reduce the number of acquired shares of the latch to below
* the new limit. If the limit is increased, threads currently in the queue
* may not be issued one of the newly available shares until the next
* request is made for a latch.
*
* @param limit The new limit
*/
//設(shè)置最大連接數(shù)
public void setLimit(long limit) {
this.limit = limit;
}
/**
* Acquires a shared latch if one is available or waits for one if no shared
* latch is current available.
*
* @throws InterruptedException If the current thread is interrupted
*/
//增加計數(shù)器如果超過最大連接數(shù)妒茬、則等待并且計數(shù)器減一
public void countUpOrAwait() throws InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Counting up[" + Thread.currentThread().getName() + "] latch=" + getCount());
}
sync.acquireSharedInterruptibly(1);
}
/**
* Releases a shared latch, making it available for another thread to use.
*
* @return the previous counter value
*/
//減少計數(shù)器其實(shí)就是減少連接數(shù)
public long countDown() {
sync.releaseShared(0);
long result = getCount();
if (log.isDebugEnabled()) {
log.debug("Counting down[" + Thread.currentThread().getName() + "] latch=" + result);
}
return result;
}
/**
* Releases all waiting threads and causes the {@link #limit} to be ignored
* until {@link #reset()} is called.
*
* @return <code>true</code> if release was done
*/
//釋放所有線程
public boolean releaseAll() {
released = true;
return sync.releaseShared(0);
}
/**
* Resets the latch and initializes the shared acquisition counter to zero.
*
* @see #releaseAll()
*/
//重置計數(shù)器
public void reset() {
this.count.set(0);
released = false;
}
/**
* Returns <code>true</code> if there is at least one thread waiting to
* acquire the shared lock, otherwise returns <code>false</code>.
*
* @return <code>true</code> if threads are waiting
*/
//是否有等待線程
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* Provide access to the list of threads waiting to acquire this limited
* shared latch.
*
* @return a collection of threads
*/
//獲取所有等待線程
public Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
}
3.總結(jié)Acceptor流程
雖然看上去代碼有點(diǎn)復(fù)雜,但是實(shí)際上就是一句話概括蔚晨。獲取socket乍钻、并且對socket進(jìn)行封裝肛循,扔到Poller的隊列中。