如何使用
1.我們構(gòu)造netty服務(wù)端的時(shí)候,在childHandler里,先獲取到pipeline,然后p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
public IdleStateHandler(long readerIdleTime, long writerIdleTime,
long allIdleTime, TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
2.我們還需要寫(xiě)一個(gè)handler,來(lái)實(shí)現(xiàn)超時(shí)后需要做的事.netty把超時(shí)和超時(shí)后任務(wù)的觸發(fā)解耦了.(這是不是觀察者模式的具體應(yīng)用呢?)
HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
//沒(méi)有并發(fā)問(wèn)題,因?yàn)槊總€(gè)連接都會(huì)單獨(dú)new一個(gè)HeartBeatServerHandler對(duì)象.
private int lossConnectCount = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("已經(jīng)5秒未收到客戶端的消息了!");
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
lossConnectCount++;
if (lossConnectCount > 2) {
System.out.println("關(guān)閉這個(gè)不活躍通道鼠锈!");
ctx.channel().close();
}
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//重置計(jì)數(shù)器
lossConnectCount = 0;
System.out.println("client says: " + msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
類注釋
Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
(觸發(fā)一個(gè)IdleStateEvent當(dāng)一個(gè)Channel一段時(shí)間內(nèi)沒(méi)有進(jìn)行讀或者寫(xiě),或者讀寫(xiě))
一些問(wèn)題
假設(shè)netty里沒(méi)有這個(gè)相關(guān)的功能,需要我們自己設(shè)計(jì)一個(gè)IdleStateHandler,該怎么做呢?
需求分析:當(dāng)客戶端和服務(wù)端建立連接的時(shí)候,如果客戶端一段時(shí)間沒(méi)有操作,讀或者寫(xiě),那么我們就可以自定義的進(jìn)行一些操作.
問(wèn)題1.初始化該執(zhí)行什么操作?
問(wèn)題2.如何判斷超時(shí)?
問(wèn)題3.讀寫(xiě)如何分開(kāi)判斷?
問(wèn)題4.判斷超時(shí)該用哪個(gè)線程?netty的io線程還是自定義線程?
我們帶著這些問(wèn)題來(lái)看看netty的設(shè)計(jì).
netty的設(shè)計(jì)
我們先看一下整體的流程.
1.當(dāng)我們new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)時(shí),發(fā)生了什么?
這個(gè)就是把handler加入到pipeline里,后面有io事件觸發(fā)時(shí),就被handler攔截.然后這個(gè)構(gòu)造器里會(huì)初始化一些值.
核心的就是三個(gè)時(shí)間:
readerIdleTimeNanos
writerIdleTimeNanos
allIdleTimeNanos
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
ObjectUtil.checkNotNull(unit, "unit");
this.observeOutput = observeOutput;
//會(huì)有個(gè)和最小時(shí)間比較的邏輯.
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
2.客戶端第一次創(chuàng)建連接的時(shí)候發(fā)生了什么?
//IdleStateHandler的channelActive()方法在socket通道建立時(shí)被觸發(fā)
//ctx的傳遞要搞清楚
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);//schedule
super.channelActive(ctx);//傳播事件
}
這里相當(dāng)于一個(gè)任務(wù)的生產(chǎn)者.任務(wù)的執(zhí)行就委托給io線程了.具體看SingleThreadEventExecutor里面的邏輯
//重要的入口,會(huì)開(kāi)啟定時(shí)任務(wù)
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
//
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
獲取execut然后執(zhí)行schedule.
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
//從cts獲取執(zhí)行器.然后調(diào)度.
return ctx.executor().schedule(task, delay, unit);
}
AbstractScheduledEventExecutor里.
//調(diào)度.
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
//當(dāng)前的線程是否eventLoop里的線程.
if (inEventLoop()) {
scheduleFromEventLoop(task);
} else {//什么時(shí)候會(huì)出現(xiàn)執(zhí)行的線程跟綁定的線程不一致呢?
//獲取deadline
final long deadlineNanos = task.deadlineNanos();
// task will add itself to scheduled task queue when run if not expired
if (beforeScheduledTaskSubmitted(deadlineNanos)) {
execute(task);
} else {
lazyExecute(task);
// Second hook after scheduling to facilitate race-avoidance
if (afterScheduledTaskSubmitted(deadlineNanos)) {
execute(WAKEUP_TASK);
}
}
}
return task;
}
就到了io線程的
IdleStateHandler
這個(gè)類繼承了ChannelDuplexHandler.(這個(gè)類細(xì)節(jié)比較多,后面單獨(dú)拎出來(lái)講一下)
ChannelHandler implementation which represents a combination out of a ChannelInboundHandler and the ChannelOutboundHandler. It is a good starting point if your ChannelHandler implementation needs to intercept operations and also state updates.
內(nèi)部類
AbstractIdleTask
private abstract static class AbstractIdleTask implements Runnable {
//需要ctx來(lái)獲取executor.
private final ChannelHandlerContext ctx;
AbstractIdleTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
//模板的run方法.
@Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
}
run(ctx);
}
protected abstract void run(ChannelHandlerContext ctx);
}
下面的三個(gè)類無(wú)非就是實(shí)現(xiàn)自己的run方法.把變化的點(diǎn)抽象出來(lái),由子類來(lái)實(shí)現(xiàn).其實(shí)也就是初始化的時(shí)間的不同,其他都是一樣的.當(dāng)chanelIdle的時(shí)候,就會(huì)fireUserEventTriggered,這時(shí)候就完成一次超時(shí)的處理了.
AllIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = allIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstAllIdleEvent;
firstAllIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
ReaderIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
//下一次超時(shí)的時(shí)間.
long nextDelay = readerIdleTimeNanos;
//剛開(kāi)始讀的時(shí)候是true,讀完變成false.
if (!reading) {
//下一次超時(shí)的時(shí)間減去當(dāng)前時(shí)間和上一次讀的時(shí)間差. why?
nextDelay -= (ticksInNanos() - lastReadTime);
}
//如果小于0,已經(jīng)超時(shí)了
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
//
boolean first = firstReaderIdleEvent;
//
firstReaderIdleEvent = false;
//
try {
//創(chuàng)建一個(gè)event
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
channel空閑了.fireUserEventTriggered,而我們的業(yè)務(wù)處理剛好實(shí)現(xiàn)了這個(gè)方法.
/**
* Is called when an {@link IdleStateEvent} should be fired. This implementation calls
* {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
*/
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
//see.
ctx.fireUserEventTriggered(evt);
}
WriterIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
//已經(jīng)超時(shí).
if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback.
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
//
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
//已經(jīng)idle了.
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
ReadTimeoutHandler
ReadTimeoutHandler extends IdleStateHandler
讀超時(shí)的handler,會(huì)報(bào)ReadTimeoutException的錯(cuò),當(dāng)一定時(shí)間內(nèi)沒(méi)有讀到數(shù)據(jù).
WriteTimeoutHandler
當(dāng)寫(xiě)操作不能在一定的時(shí)間完成的化,報(bào)WriteTimeoutException錯(cuò).
這個(gè)類并沒(méi)有繼承IdleStateHandler,就不在這里講了,有興趣的可以去看看,也很簡(jiǎn)單.
IdleStateEvent
可以理解為netty內(nèi)部把這個(gè)空閑狀態(tài)事件封裝好了,傳個(gè)最終的業(yè)務(wù)調(diào)用方法.
源碼沒(méi)什么特殊的邏輯就不貼了.
IdleState
簡(jiǎn)單的枚舉值.
public enum IdleState {
/**
* No data was received for a while.
*/
READER_IDLE,
/**
* No data was sent for a while.
*/
WRITER_IDLE,
/**
* No data was either received or sent for a while.
*/
ALL_IDLE
}