Tomcat7線程模型
tomcat 的nio 線程模型也是reactor 模型篷就,由accept 線程負(fù)責(zé)接受連接請求兼蜈,把請求轉(zhuǎn)發(fā)給其中一個Poller
線程,去注冊讀事件,Poller 線程就負(fù)責(zé)該連接的讀和寫,交給后面的線程池去處理驹沿,從讀報文,觸發(fā)后面的servlet請求都由線程池的線程完成。
Accept線程
backlog = 100; 默認(rèn)是100盲憎,也就是tcp的accept 隊列為100医窿,默認(rèn)還是比較少的。
最大連接數(shù)
maxConnections = 10000; 如果連接數(shù)超過了maxConnections,則等待連接釋放,其實(shí)這里底層TCP 鏈接是還可以建立的捞烟,只有內(nèi)核的accept 隊列沒有滿竞思,假如tomcat的鏈接數(shù)達(dá)到了10000跨算,accept線程就不從accept的隊列取出鏈接泉哈,這樣就很容易導(dǎo)致不能建立鏈接了。
核心代碼Run 方法如下:
int errorDelay = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!running) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
countUpOrAwaitConnection();
SocketChannel socket = null;
try {
// Accept the next incoming connection from the server
// socket
socket = serverSock.accept();
} catch (IOException ioe) {
// We didn't get a socket
countDownConnection();
if (running) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (running && !paused) {
// setSocketOptions() will hand the socket off to
//這里把sock 分發(fā)到poller 線程
// an appropriate processor if successful
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
closeSocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
Poller 線程
Poller線程負(fù)責(zé)輪詢注冊在對應(yīng)selector 上連接的讀寫請求事件破讨。因?yàn)锳ccept接收到鏈接請求后丛晦,回封裝成一個event,放到Poller的事件隊列提陶,poller 回從里面取出事件獲取socket烫沙。
Poller 線程個數(shù) pollerThreadCount默認(rèn)2個
pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());
Accept 選擇poller是Round Robin,所以兩個poller線程負(fù)責(zé)的socket 各占一半
同步讀
poller io 線程還有點(diǎn)和reactor 模型不一樣的是,poller 線程不負(fù)責(zé)具體的讀http 消息隙笆,而是有可讀事件時锌蓄,分配給 SocketProcessor 來處理,SocketProcessor 是一個task撑柔,具體由tomcat的工作線程池來執(zhí)行瘸爽,所以一個連接上的http 請求數(shù)據(jù)報的讀取和poller 的線程是異步的,正是因?yàn)檫@樣铅忿,poller 在分配一個讀事件給SocketProcessor 后剪决,就取消了可讀事件的監(jiān)聽,下面是poller worker線程的processKey 方法檀训,用來分配讀寫事件柑潦。
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
//先取消讀事件,意思是防止讀
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
//創(chuàng)建socketprocessor來讀http 請求包和業(yè)務(wù)邏輯的執(zhí)行
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}
注意上面的unreg方法如下
protected void unreg(SelectionKey sk, NioSocketWrapper attachment, int readyOps) {
//this is a must, so that we don't have multiple threads messing with the socket
reg(sk,attachment,sk.interestOps()& (~readyOps));
}
官方解釋是說防止多個線程同時讀一個socket峻凫,也就是一個請求連接的數(shù)據(jù)渗鬼。想象一種場景,如果一個hSocketProcess ttp請求的包只來了一部分荧琼,也就是SocketProcess 在等待后面一部分譬胎,后面部分來的時候差牛,觸發(fā)讀事件,重新創(chuàng)建一個SocketProcessor银择,這樣會導(dǎo)致兩個processor 同時處理一個socket數(shù)據(jù)多糠,會導(dǎo)致混亂。
何時重新注冊讀事件
- 1 上次請求處理完成浩考,會重新注冊讀事件夹孔,因?yàn)檫B接是持久keeplivve的
- 2 處理半包的情況,需要重新注冊讀事件
//狀態(tài)為LONG時析孽,代表半包的狀態(tài)搭伤,沒有讀完,需要等待,并重新注冊可讀事件袜瞬,
//而且socket 關(guān)聯(lián)的process 不能從connectionsremove掉
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
//longPoll 如果不是異步請求怜俐,會注冊讀事件
longPoll(wrapper, processor);
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
//處理完成的請求,可以remove掉process邓尤,因?yàn)椴恢老麓握埱笫裁磿r候來拍鲤,
// 同時也需要重新注冊讀事件
connections.remove(socket);
getLog().info("Tomcat process finish start to release process "+processor.getRequest().toString());
release(processor);
getLog().info("Tomcat release process "+processor.getRequest().toString()+ "start to register read event for next read!!!");
wrapper.registerReadInterest();
}
所以從上面的分析可以得出結(jié)論,tomcat nio 模型讀不同于netty的reactor 模型汞扎,io 讀寫由io 線程負(fù)責(zé)季稳,讀完了就交給業(yè)務(wù)線程支持,繼續(xù)讀后面的請求數(shù)據(jù)澈魄。但是tomcat是一個請求讀完景鼠,處理完業(yè)務(wù)邏輯,再繼續(xù)讀下一個請求的數(shù)據(jù)痹扇,這對http 這種獨(dú)占的協(xié)議無可厚非铛漓,如果想在http協(xié)議上實(shí)現(xiàn)類似rpc 自定義協(xié)議的連接復(fù)用時,即發(fā)請求可以不用等當(dāng)前請求返回鲫构,就可以繼續(xù)發(fā)浓恶,對發(fā)送多可以實(shí)現(xiàn)少量的連接發(fā)送大量的請求,但是由于服務(wù)端不能并發(fā)的讀结笨,必然會導(dǎo)致讀緩沖區(qū)瞬間滿了问顷,不能被讀走的請求,由于tcp 滑動窗口因子禀梳,也會導(dǎo)致發(fā)送方停止下來
工作線程池executor
執(zhí)行請求的線程池
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
- 隊列:無界隊列 TaskQueue杜窄,
- 最小線 程minSpareThreads = 10
- 最大線程 maxThreads = 200
TaskQueue
taskQueue 對 offer方法做了些手腳,就是讓exeecutor的核心線程池達(dá)到最大值算途,如果按正常的邏輯塞耕,當(dāng)線程超過CoreSize 時,任務(wù)回往offer到TaskQueue 中嘴瓤,而tomcat的TaskQueue 是無界的隊列扫外,所以默認(rèn)的話tomcat都只有core size個線程在跑莉钙,這樣估計吞吐量不夠,所以tomcat的TaskQueue修改了offer方法筛谚,如下:
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
//關(guān)鍵點(diǎn)在這里磁玉,只要工作線程小于最大值,就返回false驾讲,這時線程池會去創(chuàng)建新的線程蚊伞。
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
如果用來tomcat sever.xml 指定的 exector ,即把Executor 啟用
<Executor name="tomcatThreadPool" namePrefix="catalina-exec-"
maxThreads="150" minSpareThreads="4" maxQueueSize="1000"/>
則創(chuàng)建的是Tomcat 自己實(shí)現(xiàn)的StandardThreadExecutor,該線程池唯一不同的是吮铭,可以指定隊列容量的大小时迫,默認(rèn)是Integer.MAX_VALUE,相當(dāng)于無界l谓晌。
可以通過maxQueueSize
屬性指定,代碼如下:
@Override
protected void startInternal() throws LifecycleException {
taskqueue = new TaskQueue(maxQueueSize);
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
executor.setThreadRenewalDelay(threadRenewalDelay);
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
taskqueue.setParent(executor);
setState(LifecycleState.STARTING);
}
Tomcat 異步處理
Servlet3.0 支持了異步掠拳,tomcat7 對異步也要支持,在tomcat的工作線程處理完后纸肉,如果時異步的話溺欧,不能結(jié)束掉當(dāng)前這個請求,要等待業(yè)務(wù)線程觸發(fā)了asyncContext.complete() 方法柏肪,執(zhí)行這個complete時胧奔,tomcat 會把該請求對于的socketprocess 獲取到,再教給上面說的executor 去執(zhí)行预吆。所以我們在通過request.startAsynce()時,最好不要用asyncContext.start()方法去執(zhí)行一些操作胳泉,這樣的話拐叉,這個異步處理還是需要tomcat的線程,來執(zhí)行扇商,就沒有意義了凤瘦。
Tomcat 異步寫
tomcat 的 response flush時,是阻塞的案铺,如果寫緩沖區(qū)不可用蔬芥,則會阻塞住flush的線程,如果想要異步flush控汉。則需要給response的outputStream 添加一個writerListener,有了writerListener tomcat就異步寫笔诵,不會阻塞。但是需要注意的是姑子,必須用tomcat的ServletOutputStream 才支持乎婿,默認(rèn)的servlet api 下的ServletOutputStream是沒有該方法的。
public abstract voidsetWriteListener(javax.servlet.WriteListener listener);
// If we know that the request is bad this early, add the
// Connection: close header.
if (keepAlive && statusDropsConnection(statusCode)) {
keepAlive = false;
}
if (!keepAlive) {
// Avoid adding the close header twice
if (!connectionClosePresent) {
headers.addValue(Constants.CONNECTION).setString(
Constants.CLOSE);
}
} else if (!http11 && !getErrorState().isError()) {
headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
}