協(xié)議定制與數(shù)據(jù)序列化
1胡嘿、長連接這里我們肯定是基于TCP的蝶柿,而TCP協(xié)議其實默認已經(jīng)支持長連接撰筷,但是socket連接存在隨時斷開的情況夏块,這就需要有比較好的協(xié)議保障連接狀態(tài)的檢測遥赚。
2扬舒、定制數(shù)據(jù)序列化格式,建議使用protobuf或者thrift而不是htttp中常用的json凫佛,可以減少序列化與反序列化的開銷讲坎。當然如果用一些其他的協(xié)議孕惜,你可能需要自己實現(xiàn)encoder decoder了,TCP是流晨炕,上層協(xié)議對TCP的流是要做分包粘包處理的衫画,注意好對handler中channelRead和channelReadComplete的方法的復寫。
基于Netty 設(shè)計的客戶端架構(gòu)
1瓮栗、我們會需要設(shè)計一個客戶端削罩,就像netty的官方demo中做的那樣,定義好bootstrap和nioEventLoopGroup费奸。注意NioEventLoopGroup是可以復用的弥激,線程池復用對客戶端比較重要,在斷線重連的時候會排上用場愿阐。
我以采用webSocket協(xié)議為例
mClientHandler = new ClientHandler(sURI); //客戶端收到分包處理完的數(shù)據(jù)微服,然后開始分發(fā)
mMessageHandler = new MessageHandler(mHashMap, mBussinessCodeHelper); // 真正處理業(yè)務(wù)代碼的handler
bootstrap.group(mWorkGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.remoteAddress(sURI.getHost(), sURI.getPort());
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new TbbLoggerHandler());
pipeline.addLast(new IdleStateHandler(200, 180, 0, TimeUnit.SECONDS)); //讀超時與寫超時檢測的handler, 讀超時200s比寫超時時間長一些缨历,發(fā)生讀超時的時候直接斷開重連了职辨。
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
pipeline.addLast(mTbbClientHandler);
pipeline.addLast(mTbbMessageHandler);
}
});
try {
mChannel = bootstrap.connect().sync().channel();
mChannel.closeFuture().sync(); // 會阻塞
XGLog.logger_d(mChannel);
} catch (Exception e) {
XGLog.logger_d("exception " + e);
e.printStackTrace();
} finally {
XGLog.logger_d("workerGroup shall shutdown " + TextUtils.isEmpty(mToken));
if (!TextUtils.isEmpty(mToken)) {
mWorkGroup.schedule(new Runnable() {
@Override
public void run() {
connect(); // 斷線重連,這里簡單處理戈二,就是斷了以后每隔2s 嘗試連接一次,其實為了省電需要限制次數(shù)并倍增間隔時間的
}
}, 2, TimeUnit.SECONDS);
}
}
2喳资、設(shè)計好你的handler, netty框架的運用精髓基本都在handler當中觉吭,包括處理流解包然后處理業(yè)務(wù)最后發(fā)送數(shù)據(jù),幾乎全可以包含在handler當中仆邓,客戶端主動發(fā)送數(shù)據(jù)依賴于channel鲜滩,簡單點講就是channel 的 writeAndFlush,向緩沖區(qū)寫數(shù)據(jù)并刷新緩沖區(qū)节值,刷新的操作其實就是發(fā)送數(shù)據(jù)了徙硅,socket的操作本質(zhì)上都抽象成IO動作。一個簡單的handler的例子搞疗,不一定能正常運行嗓蘑,只是作為例子,最為關(guān)鍵的幾個方法
(1) channelRead0(ChannelHandlerContext ctx, Object msg)
處理解包后的數(shù)據(jù)匿乃,也可以分發(fā)數(shù)據(jù)包給下個handler
(2) channelActivie(ChannelHandlerContext ctx)
通道建立了桩皿,這個時候相當于tcp握手了
(3) channelInActive(ChannelHandlerContext ctx)
tcp斷開連接
(4) excepitonCaught(ChannelHandlerContext ctx, Throwable cause)
異常處理,最好要處理幢炸,不處理也別忘了吧throwable發(fā)給下handler泄隔,這個一定得做
(5) userEventTriggered(final ChannelHandlerContext ctx, Object evt)
處理一些自定義的事件,包括讀超時寫超時這樣的事件宛徊,充分體現(xiàn)了netty事件驅(qū)動的特點
@Sharable
public class ClientHandler extends SimpleChannelInboundHandler<Object> {
private static final int BLOCKING_QUEUE_SIZE = 1 << 12;
private static final Queue<MCProtocolPB.MCProtocol> mQueue = new LinkedList<>();
private static final long IDLE_TIME = (long) (5 * 1e9);
/**
* 用于 WebSocket 的握手
*/
private WebSocketClientHandshaker mHandshaker;
/**
*
*/
private ChannelPromise mChannelPromise;
private final PingWebSocketFrame mPingWebSocketFrame = new PingWebSocketFrame();
private final CloseWebSocketFrame mCloseWebSocketFrame = new CloseWebSocketFrame();
private ChannelHandlerContext mChannelHandlerContext;
/**
* 唯一的構(gòu)造類
*
* @param uri WebSocket uri
*/
public ClientHandler(URI uri) {
mHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
}
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (!mHandshaker.isHandshakeComplete()) {
try {
mHandshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
mChannelPromise.setSuccess();
while (!mQueue.isEmpty()) {
ctx.writeAndFlush(mQueue.poll());
}
ctx.fireUserEventTriggered(Event.CONNECTED); //發(fā)送websocket協(xié)議連接正式建立的事件
} catch (WebSocketHandshakeException e) {
mChannelPromise.setFailure(e);
}
}
if (msg instanceof WebSocketFrame) {
ctx.fireChannelRead(((WebSocketFrame) msg).retain());
}
}
/**
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
mChannelHandlerContext = ctx;
mHandshaker.handshake(ctx.channel());
ctx.writeAndFlush(mPingWebSocketFrame.retain());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
ctx.fireUserEventTriggered(Event.DISCONNECTED);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
XGLog.logger_e("channel unregistered");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
XGLog.logger_e(cause.toString());
super.exceptionCaught(ctx, cause);
if (!mChannelPromise.isDone()) {
mChannelPromise.setFailure(cause);
}
cause.printStackTrace();
ctx.close();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
XGLog.logger_d("handler removed");
}
/**
*
*
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
XGLog.logger_i("handler added");
mChannelPromise = ctx.newPromise();
}
/**
* 端口閑時 發(fā)送心跳包 處理的方法
*
*/
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
final IdleStateEvent event = (IdleStateEvent) evt;
ctx.executor().execute(new Runnable() {
@Override
public void run() {
handleIdleEvent(ctx, event);
}
});
super.userEventTriggered(ctx, evt);
} else if (Event.REQUEST_TIME_OUT.equals(evt)) {
XGLog.logger_i("REQUEST triggered already");
}
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
}
/**
* 處理{@link IdleStateEvent}
*
* @param ctx
* @param event
*/
private void handleIdleEvent(final ChannelHandlerContext ctx, IdleStateEvent event) {
IdleState state = event.state();
if (IdleState.READER_IDLE.equals(state)) {
XGLog.logger_e("READ IDLE");
} else if (IdleState.WRITER_IDLE.equals(state)) {
XGLog.logger_e("WRITE IDLE");
ctx.writeAndFlush(mPingWebSocketFrame.retain()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else if (IdleState.ALL_IDLE.equals(state)) {
XGLog.logger_e("ALL IDLE");
}
}
long ticksInNanos() {
return System.nanoTime();
}
}
3佛嬉、考慮好你的斷線重連的情況逻澳,建議每次客戶端發(fā)送數(shù)據(jù)后,服務(wù)端都給回包暖呕,如果鏈路長時間空閑斜做,那么觸發(fā)寫超時事件,發(fā)送心跳包給服務(wù)端缰揪,其實也可以反過來服務(wù)端給客戶端發(fā)數(shù)據(jù)陨享,然后如果還發(fā)生讀超時事件,相當于對方?jīng)]有給回包钝腺,那么斷開連接抛姑,嘗試重連。
public class MyLoggerHandler extends LoggingHandler {
private static final long IDLE_TIME = (long) (9.9 * 1e9);
private long mLastWriteTime = -1;
private ScheduledFuture mScheduledFuture;
public MyLoggerHandler() {
super(LogLevel.INFO);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
XGLog.logger_i("read message " + msg);
long current = ticksInNanos();
long delta = Math.abs(current - mLastWriteTime);
if (delta < IDLE_TIME) {
if (mScheduledFuture != null) {
mScheduledFuture.cancel(false);
}
}
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
XGLog.logger_i("TbbLoggerHandler write message ");
mScheduledFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
long current = ticksInNanos();
long delta = Math.abs(current - mLastWriteTime);
XGLog.logger_i("current " + current + " last " + mLastWriteTime + " delta " + delta);
if (delta > IDLE_TIME) {
ctx.close();
}
}
}, 10, TimeUnit.SECONDS); // 10s 內(nèi)沒有收到服務(wù)端回執(zhí)艳狐,斷線重連
mLastWriteTime = ticksInNanos();
}
long ticksInNanos() {
return System.nanoTime();
}
}
4定硝、如果客戶端主動發(fā)起請求,那么通過我們的Client的channel引用毫目,可以向服務(wù)端發(fā)送數(shù)據(jù)蔬啡。
5、由于netty可以主動發(fā)起事件镀虐,在netty里處理完了數(shù)據(jù)如果要更新UI或者數(shù)據(jù)庫箱蟆,那么你需要設(shè)計一個簡單的適配層,通過事件機制來觸發(fā)事情就會變得簡單刮便。
針對網(wǎng)絡(luò)波動情況的處理
1空猜、如果發(fā)生可以主動檢測到的鏈路斷開的情況,一定會觸發(fā)channelRemoved恨旱,然后channel會變成inActive,然后那個connect().sync()也就不再阻塞了辈毯,然后往下走,我們的代碼中其實已經(jīng)可以主動間隔2s去重連了搜贤。NioEventLoopGroup.exectue()類似于jdk的線程池谆沃,可以定時觸發(fā)一個事件。
try {
mChannel = bootstrap.connect().sync().channel();
mChannel.closeFuture().sync(); // 會阻塞
XGLog.logger_d(mChannel);
} catch (Exception e) {
XGLog.logger_d("exception " + e);
e.printStackTrace();
} finally {
XGLog.logger_d("workerGroup shall shutdown " + TextUtils.isEmpty(mToken));
if (!TextUtils.isEmpty(mToken)) {
mWorkGroup.schedule(new Runnable() {
@Override
public void run() {
connect(); // 斷線重連仪芒,這里簡單處理唁影,就是斷了以后每隔2s 嘗試連接一次,其實為了省電需要限制次數(shù)并倍增間隔時間的
}
}, 2, TimeUnit.SECONDS);
}
}
2夭咬、如果發(fā)生延時很長的情況,如果發(fā)送請求10s內(nèi)沒有讀事件發(fā)生掏湾,那么你需要考慮重新建立連接了筑公,簡單的做法就是ChannelHandlerContext.close()拇涤,利用 1 中的NioEventLoopGroup線程池 mWorkGroup定時嘗試連接券躁,如果連接成功趾痘,該線程就阻塞岸军,只有斷開的時候才會跑到需要重連的地方佣谐。
3、如果打過電話或者檢測到網(wǎng)絡(luò)切換,那么你也需要斷開然后重連炫掐,因為你的在移動網(wǎng)IP地址基本就變了旗唁,所以重連吧,誰讓我們基于TCP/IP呢。這種情況需要借助Android的一些組件比如BroadCastReceiver來檢測,與netty關(guān)系不大晒来。