原文地址
最近使用Netty 異步通訊框架 法绵,在使用的過程中發(fā)現(xiàn)如果當(dāng)網(wǎng)絡(luò)斷開的時候會出現(xiàn)無法檢測的現(xiàn)象箕速;
影響長鏈接斷開的原因主要有三種:1.服務(wù)停止,2.本地網(wǎng)線斷開朋譬,3.公網(wǎng)或者局域網(wǎng)中交換機斷開盐茎;
在使用的過程中發(fā)現(xiàn)在服務(wù)停止或者本地網(wǎng)絡(luò)斷開的時候netty的@ChannelHandler中的channelInactive會被調(diào)用,但是如果要是公網(wǎng)或者局域網(wǎng)交換機直接網(wǎng)絡(luò)斷開是不能立刻收到channelInactive的回調(diào);所以我設(shè)計的是通過IdleStateHandler函數(shù)進(jìn)行回調(diào)徙赢;在每次收到心跳數(shù)據(jù)之后寫一個延遲發(fā)送的函數(shù)字柠,延遲心跳時間發(fā)送心跳
算了還是上代碼吧,實在是寫不下去了狡赐;
NettyClientBootstrap android客戶端啟動類
//netty 客戶端入口程序
public class NettyClientManager{
private String host; //ip地址
private int port; //端口號
private EventLoopGroup group;//EventLoop線程組
private Bootstrap b;
private Channel ch;
private ScheduledExecutorService executorService;
// 隔N秒后重連
private static final int RE_CONN_WAIT_SECONDS = 5;
//多長時間為請求后窑业,發(fā)送心跳
private static final int WRITE_WAIT_SECONDS = 7;
// 是否停止
private boolean isStop = false;
private final String TAG = "NettyClientBootstrap";
//連接狀態(tài)變化通知接口
private ITCPStateListener mStateListener;
//handler實現(xiàn)類
private NettyClientHandler mNettyClientHandler;
private boolean isOnline =false;
public NettyClientBootstrap(String host, int port) {
this.host = host;
this.port = port;
group = new NioEventLoopGroup();
b = new Bootstrap();
mNettyClientHandler = new NettyClientHandler(mListener);
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
*此處設(shè)置應(yīng)該與服務(wù)器設(shè)置相同
*
**/
ChannelPipeline pipeline = ch.pipeline();
// Decoders
pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(1024 * 1024 *
1024));
pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
// Encoder
pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("lineEncoder", new LineEncoder(LineSeparator.UNIX,
CharsetUtil.UTF_8));
//設(shè)置 IdleStateHandler 函數(shù) 可以在userEventTriggered函數(shù)中獲取讀寫超時以及總超時
pipeline.addLast("ping", new IdleStateHandler(WRITE_WAIT_SECONDS,WRITE_WAIT_SECONDS, WRITE_WAIT_SECONDS, TimeUnit.SECONDS));
// 客戶端的邏輯
pipeline.addLast("handler", mNettyClientHandler);
}
});
}
//開連接服務(wù)
public void onStart() {
new Thread() {
@Override
public void run() {
connServer();
super.run();
}
}.start();
}
//停止服務(wù)
public void onStop() {
isStop = true;
if (ch != null && ch.isOpen()) {
ch.close();
}
if (executorService != null) {
executorService.shutdown();
}
}
//內(nèi)部連接函數(shù)
private void connServer() {
Log.e(TAG, "connServer ServerIP = " + IStatic.ServerIP + " ;tcpPort = " + IStatic.tcpPort);
isStop = false;
if (executorService != null) {
executorService.shutdown();
}
//以固定延遲(時間)來反復(fù)進(jìn)行重連,用于開始沒有連接成功的情況
executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleWithFixedDelay(new Runnable() {
boolean isConnSucc = true;
@Override
public void run() {
try {
//連接服務(wù)器
if (ch != null && ch.isOpen()) {
Log.e(TAG, " ch != null && ch.isOpen() ");
ch.close();
}
ch = b.connect(host, port).sync().channel();
// 此方法會阻塞
// ch.closeFuture().sync();
System.out.println("connect server finish");
Log.e(TAG, "connect server finish");
} catch (Exception e) {
e.printStackTrace();
Log.e(TAG, e.toString());
isConnSucc = false;
} finally {
System.out.println("executorService.shutdown before");
Log.e(TAG, "executorService.shutdown before isConnSucc = " + isConnSucc);
if (isConnSucc) {
if (executorService != null) {
executorService.shutdown();
}
}
System.out.println("executorService.shutdown after");
Log.e(TAG, "connect server finish isConnSucc = " + isConnSucc);
}
}
}, RE_CONN_WAIT_SECONDS, RE_CONN_WAIT_SECONDS, TimeUnit.SECONDS);
}
//根據(jù)NettyClientHandler 中回調(diào)得到的狀態(tài)對長鏈接進(jìn)行重練操作
private ITCPStateListener mListener = new ITCPStateListener() {
@Override
public void online() {
//由于在handler中判斷的有一點問題枕屉,所以在此處重新判斷一下斷線后第一次上線的時候回調(diào)給main函數(shù)常柄,以便能夠提示用戶連接成功
if(!isOnline)
{
if (mStateListener != null)
mStateListener.online();
new Thread(){
@Override
public void run() {
super.run();
//連接成功后將executorService 連接池關(guān)閉,以保證能夠在重連的時候能夠建立連接成功
if(!executorService.isShutdown())
{
executorService.shutdown();
Log.e(TAG,"executorService is not Shutdown");
}else {
Log.e(TAG,"executorService is Shutdown");
}
}
}.start();
}
isOnline=true;
}
@Override
public void offline() {
if(isOnline)
{
if (mStateListener != null)
mStateListener.offline();
}
isOnline=false;
if (!isStop) {
new Thread() {
@Override
public void run() {
super.run();
try {
sleep(5 * 1000);
/*
* 重連
*/
connServer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
};
//發(fā)送數(shù)據(jù)
public void sendMsg(String msg)
{
mNettyClientHandler.sendMsg(msg);
}
//設(shè)置連接狀態(tài)監(jiān)聽
public void setStateListener(ITCPStateListener stateListener) {
mStateListener = stateListener;
}
//設(shè)置返回數(shù)據(jù)監(jiān)聽
public void setInfoListener(ITCPInfoListener infoListener) {
mNettyClientHandler.setInfoListener(infoListener);
}
}
netty客戶端的入口程序完成后接下來我們來完成更為主要的handler實現(xiàn)程序
@ChannelHandler.Sharable
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
private ITCPStateListener mStateListener;
private final int HEART_FRESH_TIME = 5000;//設(shè)置心跳間隔時間
private boolean isonline = false;
private String HEART_FRESH = "";
private final String TAG = "NettyClientHandler";
private ChannelHandlerContext mctx;
private ITCPInfoListener mInfoListener;
private SFresh mFresh = null;
public NettyClientHandler(ITCPStateListener StateListener) {
mStateListener = StateListener;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
//讀超時
MyLogger.e(TAG, "===服務(wù)端=== (Reader_IDLE 讀超時)");
ctx.channel().close();
} else if (event.state() == IdleState.WRITER_IDLE) {
//寫超時
MyLogger.e(TAG, "===服務(wù)端=== (Reader_IDLE 寫超時)");
ctx.channel().close();
} else if (event.state() == IdleState.ALL_IDLE) {
//總超時
MyLogger.e(TAG, "===服務(wù)端=== (ALL_IDLE 總超時)");
ctx.channel().close();
}
}
}
//通過handler機制能夠保證只是接收到數(shù)據(jù)只是發(fā)送一次心跳數(shù)據(jù)搀擂,在試驗中發(fā)現(xiàn)西潘,在網(wǎng)絡(luò)不好的條件下,
//反復(fù)重連的時候會產(chǎn)生多個心跳那么將會出現(xiàn)同一秒內(nèi)可能發(fā)生多次心跳哨颂,所以在接收到發(fā)送心跳的指令
//之后首先移除以前的歷史數(shù)據(jù)秸架,保證只是發(fā)送一次
private Handler mHanler = new Handler(){
@Override
public void handleMessage(Message msg) {
mHanler .removeMessage(0);
//此處用來發(fā)送心跳數(shù)據(jù)
}
};
@Override
protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception {
//測試中發(fā)現(xiàn)channelActive這個函數(shù)有的時候并不代表真的是連接成功了,那么我選擇在收到數(shù)據(jù)的時候認(rèn)為是連成功
if (!isonline) {
mStateListener.online();
isonline = true;
}
String action = JSON.parseObject(message, JsonBean.class).action;
if(action.equls("fresh")){
//此處是我們的心跳數(shù)據(jù)咆蒿,你們可以根據(jù)你們自己的方式進(jìn)行修改
mHanler .removeMessageDelay(0,5000);
//接收到心跳數(shù)據(jù)延遲5s后發(fā)送數(shù)據(jù)东抹,這樣能夠保證如果沒有收到心跳數(shù)據(jù)的時候?qū)?//不會發(fā)送心跳那么在超時讀取超時的時候會被回調(diào),還有就是如果在發(fā)送過程中網(wǎng)絡(luò)斷開那么會在寫入的超時函數(shù)中被回調(diào)
}
//接收到的數(shù)據(jù)沃测,對數(shù)據(jù)進(jìn)行處理缭黔,我們項目中主要是使用json數(shù)據(jù)解析,我使用的是fastjson 感覺速度很快蒂破,而且?guī)缀醪怀鲥e
}
//連接成功
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
System.out.println("Client active ");
MyLogger.e(TAG, "Client active ");
this.mctx = ctx;
super.channelActive(ctx);
setHeartInfo(1);
//延遲2s后發(fā)送首次心跳數(shù)據(jù)或者登陸數(shù)據(jù)
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(l->{
sendHeartData();
},e->{
MyLogger.e(TAG, "===發(fā)送心跳異常=== e = "+e.toString());
ctx.channel().close();
});
}
//斷開連接函數(shù)馏谨,客戶端主動close或者服務(wù)器斷開連接的時候會回調(diào)此函數(shù)
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client close ");
MyLogger.e(TAG, "Client close ");
super.channelInactive(ctx);
isonline = false;
mctx.close();
this.mctx = null;
mHeartTimer.cancel();
if (mStateListener != null)
mStateListener.offline();
}
//發(fā)送數(shù)據(jù)
public boolean sendMsg(String message) {
if (mctx != null) {
try {
MyLogger.e(TAG, "sendMsg = " + mctx.isRemoved());
MyLogger.e(TAG, "sendMsg = " + message);
mctx.channel().writeAndFlush(message);
return true;
} catch (Exception e) {
return false;
}
}
return false;
}
//發(fā)送心跳數(shù)據(jù)
private void sendHeartData() {
try {
MyLogger.e(TAG, " HeartTask ctx = " + mctx);
MyLogger.e(TAG, " HeartTask HEART_FRESH = " + HEART_FRESH);
if (mctx != null) mctx.channel().writeAndFlush(HEART_FRESH);
if (mFresh.getLogin() == 1) {
setHeartInfo(0);
}
} catch (Exception e) {
MyLogger.e(TAG, " sendHeartData e = " + e.toString());
}
}
//設(shè)置心跳數(shù)據(jù)
private void setHeartInfo(int first) {
if (mFresh == null) {
mFresh = new SFresh();
mFresh.setIp(IStatic.IP);
mFresh.setMac(IStatic.MAC);
}
mFresh.setLogin(first);
HEART_FRESH = JSON.toJSONString(mFresh);
}
//設(shè)置接收到的數(shù)據(jù)監(jiān)聽函數(shù)
public void setInfoListener(ITCPInfoListener infoListener) {
mInfoListener = infoListener;
}
}
首次寫簡書,也不知道如何寫主要就是貼上代碼附迷,讓大家能夠幫助檢查一下惧互,以望能夠有大神能夠讀到幫助改正其中可能會在的bug以及不足之處;還有就是強烈推薦大家使用rxjava 異步 機制喇伯,寫出來的代碼真的是很好看