前言
前面已經初步分析了請求流程，下面我們繼續。
Poller流程處理
從上一篇可知，Acceptor接收到請求後會將其註冊到Poller的events緩存隊列中，下面來詳細看一下Poller的處理流程。
/**
 * Poller main loop.
 *
 * <p>Each pass runs the queued poller events, performs a select on the
 * selector, hands every ready key to {@code processKey} and finally runs
 * the timeout check. Once {@code close} is observed the selector is closed,
 * the loop ends and the stop latch is counted down.
 */
public void run() {
    // Keep looping until the close flag makes us break out
    while (true) {
        boolean hasEvents = false;

        try {
            if (!close) {
                // 1. Run the queued PollerEvents; their run() methods update
                //    the operations the selector is interested in.
                hasEvents = events();
                // 2. Reset wakeupCounter to -1. If the previous value was > 0
                //    there is other work waiting, so do a non-blocking select;
                //    otherwise do a blocking select bounded by selectorTimeout.
                //    (Original note: wakeupCounter is incremented by addEvent(),
                //    which is not shown in this excerpt.)
                if (wakeupCounter.getAndSet(-1) > 0) {
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                // 3. Put wakeupCounter back to 0
                wakeupCounter.set(0);
            }
            if (close) {
                // Shutting down: flush remaining events, run the timeout
                // check one last time and release the selector.
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException closeFailure) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), closeFailure);
                }
                break;
            }
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            log.error(sm.getString("endpoint.nio.selectorLoopError"), t);
            continue;
        }

        // 4. Nothing was selected: run events() once more
        if (keyCount == 0) {
            hasEvents |= events();
        }

        // 5. Walk through the ready keys and dispatch any active event
        if (keyCount > 0) {
            Iterator<SelectionKey> readyKeys = selector.selectedKeys().iterator();
            while (readyKeys.hasNext()) {
                SelectionKey sk = readyKeys.next();
                NioSocketWrapper attachment = (NioSocketWrapper) sk.attachment();
                // The key is removed from the selected set in every case
                readyKeys.remove();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment != null) {
                    // 6. Hand the key over for read/write processing
                    processKey(sk, attachment);
                }
            }
        }

        // Process timeouts
        timeout(keyCount, hasEvents);
    }

    getStopLatch().countDown();
}
/**
 * Runs the poller events currently waiting in the {@code events} queue.
 *
 * <p>At most as many events are taken as were queued when the call began.
 *
 * @return {@code true} if at least one event was taken from the queue
 */
public boolean events() {
    boolean processedAny = false;
    // Size is sampled once so the pass is bounded
    int pending = events.size();

    for (int handled = 0; handled < pending; handled++) {
        PollerEvent pe = events.poll();
        if (pe == null) {
            // Queue drained earlier than the sampled size suggested
            break;
        }
        processedAny = true;
        try {
            pe.run();
            pe.reset();
            if (running && !paused) {
                // Keep the reset event object around for reuse
                eventCache.push(pe);
            }
        } catch (Throwable x) {
            log.error(sm.getString("endpoint.nio.pollerEventError"), x);
        }
    }

    return processedAny;
}
主要流程如下:
- 調用#events()方法，更新selector感興趣的事件；
- 執行#selector.select()方法，檢測是否有事件到來；
- 將到來的事件提交至線程池進行下一步處理。
/**
 * Handles one ready selection key.
 *
 * <p>The key is cancelled when the poller is closing, when the key is
 * invalid or has no wrapper, or when dispatching a read/write fails.
 * Otherwise the ready operations are removed from the interest set and the
 * read and/or write work is handed off.
 *
 * @param sk            the ready selection key
 * @param socketWrapper the wrapper attached to the key
 */
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
    try {
        if (close) {
            cancelledKey(sk);
            return;
        }
        if (!sk.isValid() || socketWrapper == null) {
            // Invalid key
            cancelledKey(sk);
            return;
        }
        if (!sk.isReadable() && !sk.isWritable()) {
            // Neither read nor write is ready: nothing to do
            return;
        }
        if (socketWrapper.getSendfileData() != null) {
            processSendfile(sk, socketWrapper, false);
            return;
        }

        // 1. Stop watching the operations that have just fired
        unreg(sk, socketWrapper, sk.readyOps());

        // Stays true until a processSocket() call reports failure
        boolean dispatched = true;

        // Read goes before write
        if (sk.isReadable()) {
            // 2. Run the pending read operation if there is one, otherwise
            //    let a SocketProcessor handle the read
            if (socketWrapper.readOperation != null) {
                getExecutor().execute(socketWrapper.readOperation);
            } else {
                dispatched = processSocket(socketWrapper, SocketEvent.OPEN_READ, true);
            }
        }
        if (dispatched && sk.isWritable()) {
            // 3. Same for the write side
            if (socketWrapper.writeOperation != null) {
                getExecutor().execute(socketWrapper.writeOperation);
            } else {
                dispatched = processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true);
            }
        }
        if (!dispatched) {
            cancelledKey(sk);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
    }
}
/**
 * Removes the given ready operations from the key's interest set, which
 * keeps the channel from being selected for the same event over and over.
 *
 * @param sk            key whose interest set is narrowed
 * @param socketWrapper wrapper passed on to {@code reg}
 * @param readyOps      operation bits to clear from the interest set
 */
protected void unreg(SelectionKey sk, NioSocketWrapper socketWrapper, int readyOps) {
    // This is a must, so that we don't have multiple threads messing with the socket
    int remainingOps = sk.interestOps() & ~readyOps;
    reg(sk, socketWrapper, remainingOps);
}
/**
 * Applies an interest-operation set to the selection key and passes the
 * same value to the socket wrapper.
 *
 * @param sk            key whose interest set is replaced
 * @param socketWrapper wrapper that is given the same value
 * @param intops        interest-operation bits to apply
 */
protected void reg(SelectionKey sk, NioSocketWrapper socketWrapper, int intops) {
// The key is updated first; the wrapper is only touched if that succeeds
sk.interestOps(intops);
socketWrapper.interestOps(intops);
}
這裡主要註銷了對已經發生事件的關注，然後將具體的處理邏輯交給SocketProcessor來處理，後面會介紹。
工作線程流程處理
從上面了解到，最後會調用#Poller.processSocket()方法，將處理邏輯交給SocketProcessor類，我們來看下：
/**
 * Processes the given socket wrapper for the given event, either on the
 * configured executor or directly on the calling thread.
 *
 * @param socketWrapper the socket wrapper to process; {@code null} yields {@code false}
 * @param event         the socket event to process
 * @param dispatch      {@code true} to hand the work to the executor when one
 *                      is available, {@code false} to run it on the calling thread
 * @return {@code true} if processing was started (or run) without error,
 *         {@code false} if the wrapper was {@code null}, the executor
 *         rejected the task, or any other error occurred
 */
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }

        // 1. Reuse a cached SocketProcessor when one is available,
        //    otherwise create a new one
        SocketProcessorBase<S> processor = processorCache.pop();
        if (processor != null) {
            processor.reset(socketWrapper, event);
        } else {
            processor = createSocketProcessor(socketWrapper, event);
        }

        // 2. With dispatch requested and an executor present the processor
        //    is submitted to it; otherwise run() is invoked directly here
        Executor executor = getExecutor();
        boolean runInline = !dispatch || executor == null;
        if (runInline) {
            processor.run();
        } else {
            executor.execute(processor);
        }
        return true;
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
}
如果dispatch為true且配置了線程池，則將SocketProcessor提交至線程池中執行，否則在當前線程直接執行#SocketProcessor.run()。下面來看下SocketProcessor類：
/**
 * Base {@link Runnable} that binds a socket wrapper to a socket event and
 * serialises processing per wrapper.
 *
 * @param <S> the socket type carried by the wrapper
 */
public abstract class SocketProcessorBase<S> implements Runnable {

    /** Wrapper of the socket this processor is currently bound to. */
    protected SocketWrapperBase<S> socketWrapper;

    /** The socket event to process. */
    protected SocketEvent event;

    /**
     * Creates a processor bound to the given wrapper and event.
     *
     * @param socketWrapper the socket wrapper to process
     * @param event         the event to process; must not be {@code null}
     */
    public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        reset(socketWrapper, event);
    }

    /**
     * Re-binds this processor to another wrapper/event pair.
     *
     * @param socketWrapper the socket wrapper to process
     * @param event         the event to process; must not be {@code null}
     * @throws NullPointerException if {@code event} is {@code null}
     */
    public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        // The null check runs before either field is written
        this.event = Objects.requireNonNull(event);
        this.socketWrapper = socketWrapper;
    }

    @Override
    public final void run() {
        synchronized (socketWrapper) {
            // It is possible that processing may be triggered for read and
            // write at the same time. The sync above makes sure that processing
            // does not occur in parallel. The test below ensures that if the
            // first event to be processed results in the socket being closed,
            // the subsequent events are not processed.
            if (!socketWrapper.isClosed()) {
                doRun();
            }
        }
    }

    /** Performs the actual processing; implemented by subclasses. */
    protected abstract void doRun();
}
/**
 * Socket processor for {@code NioChannel} sockets: completes the handshake
 * when one is still pending, then passes the socket/event pair to the
 * handler and closes or re-registers the socket according to the outcome.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
/**
 * @param socketWrapper the socket wrapper to process
 * @param event         the event to process
 */
public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
super(socketWrapper, event);
}
/**
 * Processes the bound socket for the bound event. Whatever the outcome,
 * the wrapper and event references are cleared afterwards and, while the
 * endpoint is running and not paused, this processor is pushed back onto
 * {@code processorCache}.
 */
@Override
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
// -1 means "failed"; it also stays -1 when no key is registered
int handshake = -1;
try {
if (key != null) {
// Original author's note: a plain NioChannel reports true here by
// default; only SecureNioChannel has real handshake work to do
// (neither class is visible in this excerpt).
if (socket.isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
// Perform the actual handshake step
handshake = socket.handshake(key.isReadable(), key.isWritable());
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
event = SocketEvent.OPEN_READ;
}
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) log.debug("Error during SSL handshake", x);
} catch (CancelledKeyException ckx) {
handshake = -1;
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// The key step: hand the socket to the handler for processing
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1) {
close(socket, key);
// If the handshake returned SelectionKey.OP_READ, register read interest;
// if it returned SelectionKey.OP_WRITE, register write interest, so that
// processing continues on a later event.
} else if (handshake == SelectionKey.OP_READ) {
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE) {
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
socket.getPoller().cancelledKey(key);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error(sm.getString("endpoint.processing.fail"), t);
socket.getPoller().cancelledKey(key);
} finally {
// Drop the references before this instance can be reused
socketWrapper = null;
event = null;
//return to cache
if (running && !paused) {
// Processing is finished: push this SocketProcessor back onto the cache
processorCache.push(this);
}
}
}
}
這裡SocketEvent有OPEN_READ、OPEN_WRITE、STOP、TIMEOUT、DISCONNECT、ERROR六種狀態。SocketProcessor對象主要將socket交給Handler來處理請求。
總結
到這裡NIO的整個處理流程就大致清楚了，整體流程如下：
NIO整體處理流程
下面深入Handler來看一下Socket請求是如何轉換為Request對象，以及如何調用Servlet中的方法。