netty集成心跳檢測(cè)
- 需要在pipline里面添加netty自帶的心跳檢測(cè)
// 三個(gè)參數(shù)墅茉,第一個(gè)是當(dāng)多少s沒有讀的時(shí)候執(zhí)行讀空閑,
// 第二個(gè)是當(dāng)多少秒沒有寫的時(shí)候執(zhí)行寫空閑
// 第三個(gè)是當(dāng)多少秒都沒用讀寫的時(shí)候執(zhí)行讀寫空閑
pipeline.addLast(new IdleStateHandler(1,3,5));
- 實(shí)現(xiàn)一個(gè)方法來處理當(dāng)發(fā)生上述幾種空閑的情況并添加到pipline里
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;//強(qiáng)制類型轉(zhuǎn)化
if (event.state() == IdleState.READER_IDLE) {
//當(dāng)發(fā)生讀空閑時(shí)會(huì)執(zhí)行這個(gè)
// System.out.println("進(jìn)入讀空閑......");
} else if (event.state() == IdleState.WRITER_IDLE) {
//當(dāng)發(fā)生寫空閑時(shí)會(huì)執(zhí)行這個(gè)
// System.out.println("進(jìn)入寫空閑......");
} else if (event.state() == IdleState.ALL_IDLE) {
//當(dāng)發(fā)生讀寫空閑時(shí)會(huì)執(zhí)行這個(gè)
Channel channel = ctx.channel();
//資源釋放
channel.close();
}
}
}
}
添加HeartBeatHandler到server的pipline里面
pipeline.addLast(new HeartBeatHandler());
源碼分析
netty為什么添加了一個(gè)IdleStateHandler的攔截器之后就可以檢測(cè)了呢 我們先從IdleStateHandler看起
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
這邊主要就是初始化我們傳進(jìn)來的數(shù)賦值給readerIdleTime等
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this.observeOutput = observeOutput;
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
當(dāng)初始化好之后骂铁,此時(shí)我們已經(jīng)把該handler添加到服務(wù)端的pipline中枚驻,
當(dāng)服務(wù)端初始化的時(shí)候會(huì)執(zhí)行該handler的channelRegistered目尖,channelActive等方法
這邊都會(huì)調(diào)用到IdleStateHandler的initialize方法撩独,我們看一下這個(gè)方法
private void initialize(ChannelHandlerContext ctx) {
//這個(gè)state主要是用來判斷是否初始化
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
//這邊這個(gè)是干啥的
lastReadTime = lastWriteTime = ticksInNanos();
//初始化三個(gè)定時(shí)任務(wù)來檢測(cè)校读,這邊是關(guān)鍵弥锄,我們下面來看看這幾個(gè)定時(shí)任務(wù)做了啥事
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
- 三個(gè)的實(shí)現(xiàn)方法基本一致丧靡,我們以ReaderIdleTimeoutTask為例
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
//主要就是this.ctx = ctx;賦值一下
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
//我們配置的那個(gè)超時(shí)時(shí)間
long nextDelay = readerIdleTimeNanos;
if (!reading) {
//計(jì)算最后一次的讀時(shí)間到現(xiàn)在是否超過了我們?cè)O(shè)置的值
// nextDelay = nextDelay - (ticksInNanos() - lastReadTime);
nextDelay -= ticksInNanos() - lastReadTime;
}
//空閑時(shí)間已經(jīng)超過我們?cè)O(shè)置的值
if (nextDelay <= 0) {
//重新執(zhí)行該定時(shí)任務(wù)
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
//判斷是否為第一次發(fā)生空閑事件
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
//新建一個(gè)IdleStateEvent事件
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
//這個(gè)方法是觸發(fā)我們上面寫的處理心跳的方法userEventTriggered
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
//如果沒有空閑,則重新執(zhí)行該定時(shí)任務(wù)
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
包裝成一個(gè)IdleStateEvent事件
protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
switch (state) {
case ALL_IDLE:
return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
case READER_IDLE:
return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
case WRITER_IDLE:
return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
default:
throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
}
}
主要是這個(gè)方法調(diào)用
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
用MASK_USER_EVENT_TRIGGERED來查找哪個(gè)是實(shí)現(xiàn)類
@Override
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
return this;
}
用MASK_USER_EVENT_TRIGGERED來查找哪個(gè)是實(shí)現(xiàn)類叉讥,handler初始化時(shí)有根據(jù)是否有該方法有執(zhí)行這個(gè)
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
ObjectUtil.checkNotNull(event, "event");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeUserEventTriggered(event);
}
});
}
}
主要是在這邊窘行,調(diào)用真正的實(shí)現(xiàn)方法
private void invokeUserEventTriggered(Object event) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).userEventTriggered(this, event);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireUserEventTriggered(event);
}
}
- 其他寫空閑與讀寫空閑與讀空閑幾乎都是一樣,只是事件不一樣