前言
?? 在之前的文章中春锋,我們已經(jīng)知道了netty中channel創(chuàng)建及注冊(cè):這個(gè)過程是connect方法(client端)或者bind方法(server端)所做的第一件事期奔,體現(xiàn)在initAndRegister
方法中馁痴,在這之后還需要完成一些操作以實(shí)現(xiàn)connect肺孤。我們先從client端開始赠堵。
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise = channel.newPromise();
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
return promise;
}
客戶端connect
??initAndRegister
會(huì)返回一個(gè)ChannelFuture對(duì)象粤铭,注冊(cè)邏輯會(huì)提交給對(duì)應(yīng)EventLoop來異步的執(zhí)行,而通過這個(gè)ChannelFuture實(shí)例我們就可以判斷異步任務(wù)的執(zhí)行狀態(tài)吗垮。由于是異步任務(wù)烁登,所以它是否已經(jīng)執(zhí)行完畢不得知饵沧,所以通過ChannelFuture判斷任務(wù)(注冊(cè)任務(wù))是否執(zhí)行完畢,如果沒有執(zhí)行完畢就為其添加一個(gè)監(jiān)聽回調(diào)羡儿,回調(diào)時(shí)機(jī)發(fā)生在任務(wù)結(jié)束掠归。當(dāng)任務(wù)完成后虏冻,開始執(zhí)行doConnect0方法厨相。并返回一個(gè)新的ChannelFuture實(shí)例鸥鹉,順便提一下通過這里的regFuture和promise绪撵,我們也可以看出netty中存在大量的異步處理方式祝蝠。
private static void doConnect0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
if (localAddress == null) {
channel.connect(remoteAddress, promise);
} else {
channel.connect(remoteAddress, localAddress, promise);
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
??通過代碼细溅,我們看到喇聊,通道的連接操作又是作為一個(gè)異步任務(wù)交于channel所注冊(cè)的EventLoop來執(zhí)行誓篱,前提條件是注冊(cè)任務(wù)必須已經(jīng)成功完成了窜骄。在客戶端邻遏,一般沒有執(zhí)行l(wèi)ocalAddress赎线,所以我們繼續(xù)跟蹤channel.connect(remoteAddress, promise)垂寥,發(fā)現(xiàn)矫废,channel的connect操作由pipeline來實(shí)現(xiàn),這次與之前不同的是台舱,它調(diào)用了connect操作竞惋,完成出站處理器在流水線上的執(zhí)行拆宛,與入站從頭開始不同浑厚,出站操作connect是從尾部開始的。與入站相似物蝙,會(huì)依次找到下一個(gè)出站處理器诬乞,回調(diào)其中的connect方法(這里大家可以調(diào)試看一下震嫉,不在贅述)责掏,最終pipeline的流程會(huì)到達(dá)頭結(jié)點(diǎn)。
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
↓頭結(jié)點(diǎn)負(fù)責(zé)完成客戶端連接的代碼↓
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
??在頭結(jié)點(diǎn)中,調(diào)用了一個(gè)unsafe實(shí)例的connect方法叫潦。重點(diǎn)關(guān)注doConnect方法矗蕊。
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
/*忽略*/
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
/*忽略*/
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
// NioSocketChannel類中
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
??通過SocketUtils的connect方法卿操,我們可以看到害淤,底層借助NIO的SocketChannel進(jìn)行連接窥摄。而由于連接不會(huì)立即成功崭放,所以一般不會(huì)返回true,因此connected為false值骇,則會(huì)執(zhí)行下面這行代碼莹菱,注冊(cè)NIO連接事件。
selectionKey().interestOps(SelectionKey.OP_CONNECT);
??由于配置了連接事件吱瘩,所以當(dāng)?shù)讓舆B接建立好之后道伟,后續(xù)的邏輯處理在哪里呢?還記得NioEventLoop里面的run方法吧使碾。代碼在這里再貼一下蜜徽。
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
當(dāng)連接建立好后,會(huì)通過processSelectedKeys
方法處理連接事件票摇。最終會(huì)執(zhí)行到這樣一段在之前見到過的代碼灰蛙。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
當(dāng)接收到連接事件時(shí)會(huì)取消掉連接事件的注冊(cè)。隨后調(diào)用了unsafe.finishConnect()完成連接后的處理组力,finishConnect中調(diào)用了fulfillConnectPromise(connectPromise, wasActive)方法。
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// 當(dāng)連接建立后脱柱,底層的socketChannl打開并建立好連接随闺,active返回為true
boolean active = isActive();
// 修改異步執(zhí)行狀態(tài)
boolean promiseSet = promise.trySuccess();
if (!wasActive && active) {
// 流水線從頭逐個(gè)回調(diào)入站的channelActive方法散罕。
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
隨后缚甩,pipeline().fireChannelActive()就開始從流水線頭部回調(diào)channelActive方法。
// 頭部節(jié)點(diǎn)HeadContext的channelActive方法派继。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
頭部節(jié)點(diǎn)會(huì)首先讓流水線上的channelActive回調(diào)繼續(xù)下去(在Echo Server這個(gè)例子中绅络,EchoClientHandler的channelActive方法也會(huì)執(zhí)行),當(dāng)所有的channelActive回調(diào)完成后灭袁,調(diào)用readIfIsAutoRead方法從流水線尾部開始逐個(gè)回調(diào)read方法(這里省略了一些步驟软瞎,大家可以自行查看)芙代。最終read回調(diào)又會(huì)到達(dá)頭結(jié)點(diǎn)裹驰。
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
在頭部節(jié)點(diǎn)調(diào)用了unsafe.beginRead()整葡,隨后又調(diào)用doBeginRead旬渠,可以發(fā)現(xiàn)腥寇,在doBeginRead中赢赊,注冊(cè)了readInterestOp事件。而readInterestOp所代表的的事件就是在生成channel時(shí)傳入的讀事件涩蜘。因此在這里是完成了讀事件的注冊(cè)嚼贡。
服務(wù)端bind
??分析了客戶端后误窖,服務(wù)端也就比較好去分析了。服務(wù)端在bind執(zhí)行后,會(huì)先去調(diào)用initAndRegister完成NioServerSocketChannel向父循環(huán)組中的時(shí)間循環(huán)的注冊(cè),但是再注冊(cè)的時(shí)候并沒有注冊(cè)有效的事件僻爽。注冊(cè)后依次經(jīng)歷下面幾個(gè)方法:doBind0 --> channel.bind --> pipeline.bind。pipeline的bind方法又會(huì)從尾部依次調(diào)用流水線上的出站處理器bind回調(diào)方法杰刽,一直延續(xù)到頭結(jié)點(diǎn)菠发。頭結(jié)點(diǎn)又調(diào)用unsafe.bind()王滤。在unsafe.bind()中,doBind借助serverSocketChannel.bind方法完成綁定滓鸠。綁定操作就此結(jié)束雁乡。隨后如同客戶端在借助SocketChannel完成connect后會(huì)發(fā)出pipeline.fireChannelActive()一樣,server端在綁定結(jié)束后也會(huì)進(jìn)行流水線上channelActive的回調(diào)糜俗□馍裕回調(diào)從頭結(jié)點(diǎn)開始,這就跟client端很相似悠抹。但不同之處在于珠月,客戶端的頭結(jié)點(diǎn)在fireChannelRead后的readIfIsAutoRead會(huì)將讀事件注冊(cè),而在server端楔敌,由于在創(chuàng)建NioServerSocketChannel時(shí)傳入的readInterestOp為accept事件啤挎,因此在通道激活active后,為NioServerSocketChannel中的ServerSocketChannel注冊(cè)了接受連接Accept事件卵凑。
總結(jié)
??我們綜合前面的文章以及本文庆聘,來總結(jié)一下connect和bind方法背后的邏輯。兩者首先都進(jìn)行了通道(NioSocketChannel或NioServerSocketChannel)的創(chuàng)建和注冊(cè)勺卢,注冊(cè)的過程只是把其中封裝的SocketChannel或者ServerSocketChannel注冊(cè)到對(duì)應(yīng)的NioEventLoop的selector中伙判,并沒有實(shí)際注冊(cè)什么有效事件。當(dāng)通道完成注冊(cè)后黑忱,添加到流水線上的handler的handlerAdded方法才會(huì)被回調(diào)(而通道注冊(cè)完成后宴抚,再向流水線添加handler時(shí),其handlerAdded方法會(huì)立即回調(diào))甫煞。隨后流水線調(diào)用fireChannelRegistered菇曲。當(dāng)具體通道的連接或者綁定操作完成后,流水線又會(huì)調(diào)用fireChannelActive方法危虱,表明通道已經(jīng)激活羊娃。通道激活并且channelActive回調(diào)都執(zhí)行完成后,客戶端注冊(cè)了讀事件而服務(wù)端注冊(cè)了accept事件埃跷。
??
*鏈接
1. Netty解析:第一個(gè)demo——Echo Server
2. Netty解析:NioEventLoopGroup事件循環(huán)組
3. Netty解析:NioSocketChannel蕊玷、NioServerSocketChannel的創(chuàng)建及注冊(cè)
4. Netty解析:Handler、Pipeline大動(dòng)脈及其在注冊(cè)過程中體現(xiàn)
5. Netty解析:connect/bind方法背后
6. Netty解析:服務(wù)端如何接受連接并后續(xù)處理讀寫事件