補充InBond事件
通過在上一篇分析outbound事件已經(jīng)兩者的關(guān)系,在來分析inbound就比較簡單了爽篷,作為outbound的鏡像事件它是怎么運行的呢联贩?
Inbound 的特點是它傳播方向是 head -> customContext -> tail
方法中的代碼片段中我們只分析了doConnect這個方法,而且也已經(jīng)知道doConnect方法是最后是通過java nio中socket與服務(wù)端建立鏈接钓辆,那么再想想缕溉,鏈接成功后呢能颁,是不是要通知客戶端
已經(jīng)鏈接成功了呢,所以這里返回true表示鏈接成功倒淫,然后通過fulfillConnectPromise傳遞鏈接成功的事件伙菊,再結(jié)合官網(wǎng)的pipeline流轉(zhuǎn)圖我們推測這里應(yīng)該是將連接成功事件綁定到inbound
上進行傳播,那么是不是這樣的呢敌土?我們結(jié)合代碼來
AbstractNioChannel中connect方法片段
//doConnect這里的實現(xiàn)在子類NioSocketChannel中
if (doConnect(remoteAddress, localAddress)) {
//fulfillConnectPromise方法再鏈接后通知事件
fulfillConnectPromise(promise, wasActive);
} else {
//略
}
下面是AbstractNioChannel的fulfillConnectPromise具體如下镜硕,
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
//翻譯九四無論連接嘗試是否被取消,都應(yīng)觸發(fā)channelActive()事件返干,因為發(fā)生的事情就是發(fā)生了什么兴枯。
if (!wasActive && active) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
其中關(guān)鍵就在pipeline().fireChannelActive();這個方法中,
繼續(xù)fireChannelActive實現(xiàn)矩欠,根據(jù)前面的文章分析财剖,再創(chuàng)建channel的時候會創(chuàng)建一個DefaultChannelPipeline悠夯,所以這里的fireChannelActive就在DefaultChannelPipeline中
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
- 注意到invokeChannelActive的入?yún)閔ead,所以從側(cè)面也印證了inbound事件從head開始
繼續(xù)查看源碼
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
這個方法很熟悉有木有躺坟,和之前分析outbound一樣沦补,不過這里的next就是head了。且invokeChannelActive的實現(xiàn)在AbstractChannelHandlerContext中
private void invokeChannelActive() {
//判斷是否已添加haddler
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
- 這里就和之前outbound分析相同了咪橙,唯一不同就是獲取的handler()方法獲取返回的是head
那么我們分析head的channelActive(其實else中的fireChannelActive也會涉及),我們從HeadContext找到該方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
- 看到這ctx.fireChannelActive();這里fireChannelActive();和上面else里的是同一個均屬于AbstractChannelHandlerContext類下(.channelActive(this)中的this入?yún)⒕褪茿bstractChannelHandlerContext本身)
所以分析AbstractChannelHandlerContext方法下的fireChannelActive就可以了
@Override
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound());
return this;
}
- 看到invokeChannelActive(findContextInbound());這個方法和之前outbound類似夕膀,不過這里區(qū)別還是有的,findContextInbound找到一個可用的inbound handler.這里就是
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
繼續(xù)追蹤會看執(zhí)行會從.channelActive(this)->HeadContext的channelActive(ChannelHandlerContext ctx)->AbstractChannelHandlerContext的ChannelHandlerContext fireChannelActive()
->>AbstractChannelHandlerContext的invokeChannelActive(final AbstractChannelHandlerContext next)->再次回到AbstractChannelHandlerContext的 invokeChannelActive()
總而言之就是inbound事件過來從headContext(inboud和outbound接口均實現(xiàn))沿著inboud流向自定義inboundHandler依次執(zhí)行channelActive通知美侦。最后到TailContext(實現(xiàn)的是一個空方法channelActive)产舞。
所以我們寫個inbound的handler并重寫channelActive。當(dāng)和服務(wù)端建立鏈接成功后菠剩,寫入數(shù)據(jù)到服務(wù)端易猫。下面的代碼是不是就很清楚了
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler() {
firstMessage = Unpooled.buffer(EchoClient.SIZE);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
//當(dāng)和服務(wù)端建立鏈接成功后,寫入數(shù)據(jù)到pipeline(包含這ChannelHandlerContext的鏈表)
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
- 如果你懷疑這個writeAndFlush是否傳遞的是outbound事件具壮,其實可以追蹤源碼准颓,會找到AbstractChannelHandlerContext next = findContextOutbound();最后是nio的write
就此其他的事件同理可自行分析
最后借用網(wǎng)友的總結(jié)兩種事件:
對于 Outbound事件:
Outbound 事件是請求事件(由 Connect 發(fā)起一個請求, 并最終由 unsafe 處理這個請求)
Outbound 事件的發(fā)起者是 Channel
Outbound 事件的處理者是 unsafe
Outbound 事件在 Pipeline 中的傳輸方向是 tail -> head.
在 ChannelHandler 中處理事件時, 如果這個 Handler 不是最后一個 Hnalder, 則需要調(diào)用 ctx.xxx (例如 ctx.connect) 將此事件繼續(xù)傳播下去. 如果不這樣做, 那么此事件的傳播會提前終止.
Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT
對于 Inbound 事件:
Inbound 事件是通知事件, 當(dāng)某件事情已經(jīng)就緒后, 通知上層.
Inbound 事件發(fā)起者是 unsafe
Inbound 事件的處理者是 Channel, 如果用戶沒有實現(xiàn)自定義的處理方法, 那么Inbound 事件默認的處理者是 TailContext, 并且其處理方法是空實現(xiàn).
Inbound 事件在 Pipeline 中傳輸方向是 head -> tail
在 ChannelHandler 中處理事件時, 如果這個 Handler 不是最后一個 Hnalder, 則需要調(diào)用 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 將此事件繼續(xù)傳播下去. 如果不這樣做, 那么此事件的傳播會提前終止.
Inbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT