上篇文章中已經(jīng)介紹了pipline相關(guān)的內(nèi)容pipline,如果對這部分內(nèi)容比較熟悉的話,理解這部分內(nèi)容就很簡單了袭祟。為了容易說明遏插,還是把上一節(jié)的demo程序先放到這里捂贿。
public void start() {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new InBoundHandler1());
ch.pipeline().addLast(new InBoundHandler2());
ch.pipeline().addLast(new InBoundHandler3());
}
});
....
}
服務(wù)端能夠接收消息的前提是已經(jīng)和客戶端建立一個channel通道,想要了解這個channel怎么建立的可以參考這篇文章Netty源碼--accept連接胳嘲,這里不再贅述厂僧。
通道建立后,是在NioEventLoop類中監(jiān)聽這個channel的讀寫事件了牛,具體過程之前已經(jīng)在這篇文章Netty源碼--accept連接中分析颜屠,這里直接跳到processSelectedKey這個方法的實現(xiàn):
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
在這個方法中辰妙,當(dāng)監(jiān)聽到讀事件后,會調(diào)用unsafe的read方法甫窟,那么就看下這個unsafe的具體類型是啥密浑。
AbstractNioChannel.NioUnsafe unsafe = ch.unsafe()這句代碼返回了一個NioUnsafe對象,NioUnsafe 是一個接口粗井,具體實現(xiàn)類主要有兩個NioByteUnsafe和NioMessageUnsafe尔破。由于這里的unsafe是通過調(diào)用ch.unsafe生成的,ch具體類型是NioSocketChannel浇衬,通過追溯代碼這個unsafe是在NioSocketChannel的構(gòu)造函數(shù)中通過調(diào)用這個類的newUnsafe方法初始化的懒构。
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
從上面代碼可以看到,這個unsafe是一個NioByteUnsafe類型的耘擂,因此監(jiān)聽到讀事件后調(diào)用的unsafe.read()這個方法具體實現(xiàn)就是在NioByteUnsafe這個類中胆剧。
@Override
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
這個方法首先調(diào)用doReadBytes這個方法讀取數(shù)據(jù)到ByteBuf中,然后調(diào)用 pipeline.fireChannelRead(byteBuf)將ByteBuf中數(shù)據(jù)發(fā)送到pipeline中保存的第一個handler中醉冤,看下具體調(diào)用過程秩霍。
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
首先調(diào)用DedaultChannelPipline類中的fireChannelRead方法,在這個方法中調(diào)用了AbstractChannelHandlerContext這個類的invokeChannelRead方法冤灾,并將DedaultChannelPipline的指向鏈表首節(jié)點的head指針作為這個方法的參數(shù)傳遞進(jìn)去前域。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
});
}
}
最終調(diào)用next.invokeChannelRead(m)方法,handler()返回的是HeadContext類韵吨,看下這個類中invokeChannelRead方法的實現(xiàn)匿垄。
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
然后調(diào)用ctx.fireChannelRead(msg)這個方法,其實選擇channel的邏輯主要在這個方法實現(xiàn)归粉。
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
findContextInbound這個方法其實返回的就是DefaultChannelPipeline中鏈表中下一個需要處理的channelHandler椿疗,通過這個方法使消息能夠在多個channelHandler傳遞。選擇好下一個channelHandler所對應(yīng)的AbstractChannelHandlerContext類后糠悼,調(diào)用invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)方法届榄。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
主要看這句 ((ChannelInboundHandler) handler()).channelRead(this, msg),handler()返回的是當(dāng)前AbstractChannelHandlerContext 對應(yīng)的channelHandler倔喂,這個channelHandler其實就是我們在demo程序中初始化時添加的InBoundHandler1铝条、InBoundHandler2、InBoundHandler3席噩。這三個類都繼承ChannelInboundHandlerAdapter班缰,實現(xiàn)了channelRead方法,這樣我們就可以在這個channelRead方法根據(jù)自己的協(xié)議以及業(yè)務(wù)特點悼枢,對數(shù)據(jù)做特定的處理埠忘,這也是netty作為一個網(wǎng)絡(luò)通信框架非常靈活的一點。