一個(gè)IoSession的I/O事件是注冊(cè)在一個(gè)Selector對(duì)象上儡循,并且每個(gè)Processor線程只輪詢一個(gè)Selector對(duì)象混滔,即每一個(gè)鏈接只有一個(gè)線程處理I/O事件洒疚,這樣能保證同一IoSession數(shù)據(jù)的有序性。
下面就從部分源碼探究其中的原理坯屿,以NioAcceptor為例子:
public NioSocketAcceptor() {
super(new DefaultSocketSessionConfig(), NioProcessor.class);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
這里的NioProcessor.class就是Processor的具體類型油湖。
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
}
SimpleIoProcessorPool是Processor的線程池,使用NioProcessor創(chuàng)建具體的線程领跛。
跳過Acceptor的初始化過程乏德,當(dāng)客戶端請(qǐng)求建立鏈接,服務(wù)端Acceptor線程會(huì)執(zhí)行以下代碼:
private void processHandles(Iterator<H> handles) throws Exception {
while (handles.hasNext()) {
H handle = handles.next();
handles.remove();
// Associates a new created connection to a processor,
// and get back a session
S session = accept(processor, handle); //這里的processor是processor線程池
if (session == null) {
continue;
}
initSession(session, null, null);
// add the session to the SocketIoProcessor
session.getProcessor().add(session);
}
}
@Override
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
SelectionKey key = null;
if (handle != null) {
key = handle.keyFor(selector);
}
if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
return null;
}
// accept the connection from the client
SocketChannel ch = handle.accept();
if (ch == null) {
return null;
}
return new NioSocketSession(this, processor, ch);
}
這里創(chuàng)建了NioSocketSession將Processor線程池與SocketChannel綁定在一起。然后通過 session.getProcessor().add(session)將會(huì)話注冊(cè)到SimpleIoProcessorPool線程池中的一個(gè)Processor對(duì)象內(nèi)部的Selector對(duì)象喊括。
為什么這里的processor是線程池胧瓜?還記得NioSocketAcceptor的構(gòu)造函數(shù)中的SimpleIoProcessorPool,processor就是它的實(shí)例郑什。
看以下NioSocketSession的getProcessor()方法:
public IoProcessor<NioSession> getProcessor() {
return processor;
}
返回的就是與它關(guān)聯(lián)的SimpleIoProcessorPool線程池對(duì)象.再看SimpleIoProcessorPool的addI()方法:
public final void add(S session) {
getProcessor(session).add(session);
}
private IoProcessor<S> getProcessor(S session) {
IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
if (processor == null) {
if (disposed || disposing) {
throw new IllegalStateException("A disposed processor cannot be accessed.");
}
processor = pool[Math.abs((int) session.getId()) % pool.length];
if (processor == null) {
throw new IllegalStateException("A disposed processor cannot be accessed.");
}
session.setAttributeIfAbsent(PROCESSOR, processor);
}
return processor;
}
getProcessor()這個(gè)方法是SimpleIoProcessorPool中的府喳,負(fù)責(zé)根據(jù)Session返回一個(gè)與之關(guān)聯(lián)的Processor線程,這里用了session id對(duì)線程池中的線程總數(shù)取模的算法蘑拯。與Session關(guān)聯(lián)的Processor被添加到Session的Attribute中以便下次直接取出劫拢。
到這一部還沒有看到Session內(nèi)部的SocketChannel的IO事件是怎么注冊(cè)到Processor線程的Selector對(duì)象上的,繼續(xù)分析Processor的add()方法:
@Override
public final void add(S session) {
if (disposed || disposing) {
throw new IllegalStateException("Already disposed.");
}
// Adds the session to the newSession queue and starts the worker
newSessions.add(session);
startupProcessor();
}
private void startupProcessor() {
Processor processor = processorRef.get();
if (processor == null) {
processor = new Processor();
if (processorRef.compareAndSet(null, processor)) {
executor.execute(new NamePreservingRunnable(processor, threadName));
}
}
// Just stop the select() and start it again, so that the processor
// can be activated immediately.
wakeup();
}
//NamePreservingRunnable的run方法强胰,顯示給線程命名,然后執(zhí)行Processor的run方法妹沙。
public void run() {
Thread currentThread = Thread.currentThread();
String oldName = currentThread.getName();
if (newName != null) {
setName(currentThread, newName);
}
try {
runnable.run();
} finally {
setName(currentThread, oldName);
}
}
private class Processor implements Runnable {
public void run() {
assert (processorRef.get() == this);
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
int nbTries = 10;
for (;;) {
try {
...
int selected = select(SELECT_TIMEOUT);
...
nSessions += handleNewSessions();
...
if (selected > 0) {
// LOG.debug("Processing ..."); // This log hurts one of
// the MDCFilter test...
process();
}
...
}
} catch (ClosedSelectorException cse) {
ExceptionMonitor.getInstance().exceptionCaught(cse);
break;
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}
}
}
這里開始顯露一些馬腳了偶洋,先是把session添加到newSessions這個(gè)隊(duì)列中。然后建立了Processor實(shí)例距糖,這就是具體的Processor線程玄窝。通過executor的execute()方法先是執(zhí)行了NamePreservingRunnable的run()方法,其內(nèi)部執(zhí)行了Processor的run()方法悍引。
執(zhí)行Processor的run()中的select()其實(shí)就是調(diào)用其內(nèi)部Selector對(duì)象的select()方法恩脂,會(huì)導(dǎo)致Processor線程的阻塞:
protected int select(long timeout) throws Exception {
return selector.select(timeout);
}
然后調(diào)用了Processor內(nèi)部的Selector對(duì)象的wakeup()方法,wakeup()這個(gè)方法是當(dāng)Selector對(duì)象執(zhí)行select()方法阻塞時(shí),立即返回趣斤。
@Override
protected void wakeup() {
wakeupCalled.getAndSet(true);
selector.wakeup();
}
于是后續(xù)就執(zhí)行了:
private int handleNewSessions() {
int addedSessions = 0;
for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
if (addNow(session)) {
// A new session has been created
addedSessions++;
}
}
return addedSessions;
}
private boolean addNow(S session) {
boolean registered = false;
try {
init(session);
registered = true;
// Build the filter chain of this session.
IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
chainBuilder.buildFilterChain(session.getFilterChain());
// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
// in AbstractIoFilterChain.fireSessionOpened().
// Propagate the SESSION_CREATED event up to the chain
IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
listeners.fireSessionCreated(session);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
try {
destroy(session);
} catch (Exception e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
} finally {
registered = false;
}
}
return registered;
}
@Override
protected void init(NioSession session) throws Exception {
SelectableChannel ch = (SelectableChannel) session.getChannel();
ch.configureBlocking(false);
session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
}
到此終于理清了俩块,Processor線程先是阻塞的,由Acceptor線程把session添加到newSessions隊(duì)列浓领,然后通過wakeup將Processor從Selector對(duì)象的select()方法返回執(zhí)行到handleNewSessions()方法玉凯,此方法會(huì)取出newSessions隊(duì)列中的session然后通過addNow()方法執(zhí)行NioProcessor的init()方法,由init()方法將session中的Channel的OP_READ事件注冊(cè)到Selector對(duì)象上。
所以一個(gè)IoSession對(duì)應(yīng)的是一個(gè)Proceccor線程联贩,也是一個(gè)Selector對(duì)象漫仆,每個(gè)IoSession的讀取數(shù)據(jù)處理一定是同步的。
既然有讀就一定有寫泪幌,記得上述代碼中有一段:
private void processHandles(Iterator<H> handles) throws Exception {
while (handles.hasNext()) {
H handle = handles.next();
handles.remove();
// Associates a new created connection to a processor,
// and get back a session
S session = accept(processor, handle); //這里的processor是processor線程池
if (session == null) {
continue;
}
initSession(session, null, null);
// add the session to the SocketIoProcessor
session.getProcessor().add(session);
}
}
重點(diǎn)是initSession()方法:
protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
...
((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
.getWriteRequestQueue(session));
...
}
這里為session添加了WriteRequestQueue其實(shí)就是session的消息寫入隊(duì)列盲厌,當(dāng)session被暫停或者WriteRequestQueue隊(duì)列非空寫入的消息會(huì)添加到這個(gè)隊(duì)列里:
if (!s.isWriteSuspended()) {
if (writeRequestQueue.isEmpty(session)) {
// We can write directly the message
s.getProcessor().write(s, writeRequest);
} else {
s.getWriteRequestQueue().offer(s, writeRequest);
s.getProcessor().flush(s);
}
} else {
s.getWriteRequestQueue().offer(s, writeRequest);
}
而如果隊(duì)列是空的則會(huì)執(zhí)行write()方法祸泪,其實(shí)也是將寫入請(qǐng)求插入隊(duì)列然后直接執(zhí)行flush()方法吗浩。
@Override
public void write(S session, WriteRequest writeRequest) {
WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
writeRequestQueue.offer(session, writeRequest);
if (!session.isWriteSuspended()) {
this.flush(session);
}
}
flush()方法會(huì)在flushingSessions隊(duì)列添加session并通過wakeup()方法將Processor線程從阻塞中恢復(fù):
@Override
public final void flush(S session) {
// add the session to the queue if it's not already
// in the queue, then wake up the select()
if (session.setScheduledForFlush(true)) {
flushingSessions.add(session);
wakeup();
}
}
在Processor線程中會(huì)執(zhí)行flush(long currentTime)方法,依次取出隊(duì)列的每個(gè)session,注意這里的隊(duì)列是ConcurrentLinkedQueue浴滴,所以不管在任何線程調(diào)用IoSession的write()方法寫入消息拓萌,最終都會(huì)同步的插入到這個(gè)隊(duì)列。
通過flushNow(session, currentTime)方法先是取出session的WriteRequestQueue隊(duì)列(每個(gè)session都有一個(gè)寫入消息的同步隊(duì)列)升略,然后依次取出其中的寫消息請(qǐng)求微王,然后調(diào)用writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)屡限,最終調(diào)用write(NioSession session, IoBuffer buf, int length) 通過session關(guān)聯(lián)的Channel的write()方法將字節(jié)流發(fā)送。由于代碼過多只貼出最終部分:
@Override
protected int write(NioSession session, IoBuffer buf, int length) throws IOException {
if (buf.remaining() <= length) {
return session.getChannel().write(buf.buf());
}
int oldLimit = buf.limit();
buf.limit(buf.position() + length);
try {
return session.getChannel().write(buf.buf());
} finally {
buf.limit(oldLimit);
}
}
到此分析Processor線程讀寫終于結(jié)束了炕倘,可以得出結(jié)論钧大,會(huì)話的讀寫都是在Processor線程池中的一個(gè)Processor線程執(zhí)行的。其中讀消息是按事件順序依次完成的罩旋,寫消息可以由多個(gè)線程同時(shí)寫啊央,但是寫入的請(qǐng)求一定是同步地插入到Session地寫消息隊(duì)列中,然后由Processor線程按順序依次完成發(fā)送涨醋。擔(dān)心Mina框架讀寫的并發(fā)問題可以打住了瓜饥。