在節(jié)點(diǎn)通信時(shí)驼仪,經(jīng)常需要心跳機(jī)制來探測對(duì)方是否是存活的。在netty中用押,IdleStateHandler就能提供這種心跳檢測功能尊勿。讓我們先看看例子:
服務(wù)端添加的handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//添加IdleStateHandler编曼,5s檢測一次讀事件
p.addLast(new IdleStateHandler(5, 0, 0));
p.addLast(new TimeServerHandler());
p.addLast(new HeartBeatServerHandler());
}
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
int lossConnectCount = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.READER_IDLE){
lossConnectCount++;
if (lossConnectCount>2){
System.out.println("關(guān)閉這個(gè)不活躍通道豆巨!");
ctx.channel().close();
}
}
}else {
super.userEventTriggered(ctx,evt);
}
}
}
服務(wù)端我們添加一個(gè)IdleStateHandler,若5s內(nèi)無讀事件則觸發(fā)心跳處理方法HeartBeatServerHandler#userEventTriggered掐场,若連續(xù)2次無讀事件往扔,則關(guān)閉這個(gè)客戶端channel贩猎。
客戶端:
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
ch.pipeline().addLast(new TimeClientHandler());
}
});
在看IdleStateHandler實(shí)現(xiàn)前,我們先想一下如果自己實(shí)現(xiàn)的話會(huì)怎么實(shí)現(xiàn)呢瓤球?
首先融欧,IdleStateHandler在每個(gè)客戶端接入時(shí),會(huì)生成一個(gè)對(duì)象卦羡。同時(shí)要初始化做一些事噪馏,比如對(duì)讀或?qū)懯录M(jìn)行定時(shí)判斷,看在指定的時(shí)間內(nèi)有沒有感興趣的讀/寫事件绿饵,沒有則觸發(fā)事件欠肾,做一些自定義的事情。同時(shí)上次讀/寫事件完成后需更新時(shí)間拟赊,以便定時(shí)任務(wù)能及時(shí)感知刺桃。定時(shí)判斷可以利用EventLoop父類自帶的schedule調(diào)度方法, 更新上次讀/寫事件完成后的時(shí)間需要監(jiān)聽讀完成吸祟、寫完成事件瑟慈,需要實(shí)現(xiàn)入站、出站接口屋匕。
讓我們看看netty中的實(shí)現(xiàn):
public class IdleStateHandler extends ChannelDuplexHandler
繼承了ChannelDuplexHandler類葛碧,此類實(shí)現(xiàn)了入站、出站接口过吻。繼續(xù)看構(gòu)造方法:
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
this.observeOutput = observeOutput;
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
初始化了讀进泼、寫、讀或?qū)懙某瑫r(shí)參數(shù)纤虽。再看看初始化調(diào)度方法:
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
//state: 0 - none, 1 - initialized, 2 - destroyed
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
initialize方法在addLast -> handlerAdded中會(huì)調(diào)用乳绕。比如我們?cè)O(shè)置了檢測讀事件,那readerIdleTimeNanos>0逼纸,會(huì)執(zhí)行schedule方法洋措。這里會(huì)傳一個(gè)ReaderIdleTimeoutTask對(duì)象過去,讓我們先看看ReaderIdleTimeoutTask做了什么:
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
//讀超時(shí)時(shí)間
long nextDelay = readerIdleTimeNanos;
//如果還在讀時(shí)間進(jìn)行中則不進(jìn)行判斷
if (!reading) {
//判斷當(dāng)前時(shí)間-上次讀時(shí)間是否大于超時(shí)時(shí)間
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
//觸發(fā)fireUserEventTriggered方法
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
private abstract static class AbstractIdleTask implements Runnable {
private final ChannelHandlerContext ctx;
AbstractIdleTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
}
//調(diào)用子類的run方法
run(ctx);
}
protected abstract void run(ChannelHandlerContext ctx);
}
再看看schedule方法:
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
return ctx.executor().schedule(task, delay, unit);
}
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
schedule方法最終會(huì)調(diào)用AbstractScheduledEventExecutor#schedule方法杰刽,將定時(shí)任務(wù)包裝成ScheduledFutureTask放入scheduledTaskQueue隊(duì)列中呻纹。在eventLoop的runAllTasks中會(huì)拉取task進(jìn)行調(diào)度(如果到了此延遲任務(wù)執(zhí)行的時(shí)候)。
再看看上次讀事件更新:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
//讀事件進(jìn)行中专缠,會(huì)將reading設(shè)為true,此時(shí)不會(huì)除非讀檢測事件淑仆。
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//讀事件完成后會(huì)更新讀時(shí)間
if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
lastReadTime = ticksInNanos();
reading = false;
}
ctx.fireChannelReadComplete();
}
其他檢測事件也類似涝婉,這里就不分析了。