為什么需要心跳包?劫谅?嚷掠?
心跳包主要是用來做TCP長連接保活的贯城。有時 socket 雖然是連接的但中間網(wǎng)絡(luò)可能有問題霹娄,這時你還在不停的往外發(fā)送數(shù)據(jù)鲫骗,但對方是收不到的踩晶,你不知道對方是不是還活著,不知道 socket 通道是不是還是聯(lián)通的。 心跳包就是你發(fā)送一些試探包給對方坦胶,對方回應(yīng)晴楔,如果一定時間內(nèi)比如30秒內(nèi)沒有收到任何數(shù)據(jù),說明對方或網(wǎng)絡(luò)可能有問題了纪岁。這時你主動斷開 socket 連接则果,避免浪費資源。
TCP 本來就有 keepAlive 機制為什么還需要應(yīng)用層自己實現(xiàn)心跳遗增?款青??
TCP keepAlive 也是在一定時間內(nèi)(默認2小時)socket 上沒有接收到數(shù)據(jù)時主動斷開連接饰及,避免浪費資源康震,這時遠端很可能已經(jīng)down機了或中間網(wǎng)絡(luò)有問題。也是通過發(fā)送一系列試探包看有沒有回應(yīng)來實現(xiàn)的屏箍。
TCP keepAlive 依賴操作系統(tǒng)橘忱,默認是關(guān)閉的,需要修改操作系統(tǒng)配置打開鹦付。
所以在應(yīng)用層實現(xiàn)心跳包還是必須的敲长。
netty 中通過 IdleStateHandler 在空閑的時候發(fā)送心跳包
為什么在空閑的時候發(fā)送心跳包,而不是每隔固定時間發(fā)送???
這個是顯而易見的祈噪,正常通信時說明兩端連接是沒有問題的辑鲤,所以只在空閑的時候發(fā)送心跳包。如果每隔固定時間發(fā)送就會浪費資源占用正常通信的資源月褥。
假設(shè)現(xiàn)在要做一個手機端推送的項目宁赤,所有手機通過 TCP 長連接連接到后臺服務(wù)器。心跳機制是這樣的:
- 手機端在寫空閑的時候發(fā)送心跳包給服務(wù)端决左,用 IdleStateHandler 來做 socket 的空閑檢測。 如果 5 秒內(nèi)沒有寫任何數(shù)據(jù)惑芭,則發(fā)送心跳包到服務(wù)端
ch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new HeartbeatKeeper());
@ChannelHandler.Sharable
public class HeartbeatKeeper extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
System.out.println("client send heart beat");
ctx.channel().writeAndFlush("heart beat\n");
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
- 服務(wù)端設(shè)置讀超時遂跟,如果 30 秒內(nèi)沒有收到一個客戶端的任何數(shù)據(jù)則關(guān)閉連接婴渡。
ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new IdleStateTrigger());
@ChannelHandler.Sharable
public class IdleStateTrigger extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
服務(wù)端接收到心跳包后要不要回復(fù)缩搅??硼瓣?
看其他博客說不要回復(fù),如果有 10萬空閑連接亿傅,光回復(fù)心跳包就要占用大量資源瘟栖。服務(wù)端讀超時后直接關(guān)閉連接,客戶端再進行重連酬滤。
斷線重連
斷線重連也很簡單就是在 channelInactive 的時候重新 connect 就行了。參考其他博客專門用一個 ChannelInboundHandler 來處理斷線重連氯檐。
@ChannelHandler.Sharable
public class ConnectionWatchDog extends ChannelInboundHandlerAdapter implements TimerTask {
private final Bootstrap bootstrap;
private final String host;
private final int port;
private volatile boolean reconnect;
private int attempts;
private Channel channel;
private HashedWheelTimer timer = new HashedWheelTimer();
private int reconnectDelay = 5;
public ConnectionWatchDog(Bootstrap bootstrap, String host, int port, boolean reconnect) {
this.bootstrap = bootstrap;
this.host = host;
this.port = port;
this.reconnect = reconnect;
}
public Channel getChannel() {
return this.channel;
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
channel = ctx.channel();
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive");
ctx.fireChannelInactive();
channel = null;
if (reconnect) {
attempts = 0;
scheduleReconnect();
}
}
private void connect() {
bootstrap.connect(host, port).addListener((future) -> {
if (future.isSuccess()) {
System.out.println("connected to " + host + ":" + port);
attempts = 0;
} else {
System.out.println("connect failed " + attempts + " , to reconnect after " + reconnectDelay + " 秒");
// 這里現(xiàn)在每5秒重連一次直到連接上冠摄,可自己實現(xiàn)重連邏輯
scheduleReconnect();
}
});
}
public void run(Timeout timeout) {
synchronized (this.bootstrap) {
++attempts;
connect();
}
}
private void scheduleReconnect() {
timer.newTimeout(this, reconnectDelay, TimeUnit.SECONDS);
}
public void setReconnect(boolean reconnect) {
this.reconnect = reconnect;
}
}
這個 watchDog Handler 應(yīng)當放在 ChannelPipeline 的最前面
public void connect(String host, int port) {
Bootstrap bootstrap = new Bootstrap().group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
watchDog = new ConnectionWatchDog(bootstrap, host, port, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(watchDog);
ch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new HeartbeatKeeper());
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new ClientDemoHandler());
}
});
// 這里如果第一次連接不成功也可以嘗試多次連接
bootstrap.connect(host, port);
}
客戶端給服務(wù)端發(fā)送心跳包河泳,服務(wù)端用給客戶端發(fā)心跳包嗎拆挥?韵洋??
其實客戶端和服務(wù)端都是相對的搪缨,這個看應(yīng)用場景。如果客戶端想要及時處理斷網(wǎng)负甸,路由故障等情況就需要接受服務(wù)端發(fā)來的心跳來檢測痹届。像斷網(wǎng),路由故障這種情況蚕捉,兩邊都不知道TCP連接的狀態(tài)柴淘,必須靠心跳。長連接服務(wù)端一般都要接收心跳包的为严,如果沒有心跳可能會有大量的無效連接,直接耗盡服務(wù)器資源第股,無效的連接要盡早關(guān)閉掉。
DEMO:
https://github.com/lesliebeijing/Netty-Demo
基于 Netty 寫的一個簡單的推送 DEMO诲锹,可用在手機端推送
https://github.com/lesliebeijing/EncPush
Netty 客戶端用在 Android 中也很穩(wěn)定,我們的物聯(lián)網(wǎng)項目Android和后臺都是用的 Netty改备。