Android開發(fā)之使用Netty進行Socket編程(二) 主要介紹了Netty框架內(nèi)的主要使用的類以及 在客戶端基本的建立連接并接收消息 的簡單示例检痰。眾所周知,在Android應(yīng)用開發(fā)中痢艺,一般是發(fā)起網(wǎng)絡(luò)請求,拿到數(shù)據(jù)后異步更新UI介陶,那么本文就介紹在開發(fā)過程中堤舒,如何封裝Netty,方便開發(fā)者請求服務(wù)端的數(shù)據(jù)并異步地更新UI哺呜。
1 基本功能
- 與服務(wù)器建立TCP連接舌缤,TCP是面向連接的協(xié)議,只能用于點對點的通訊某残,所以對服務(wù)器的定位是IP地址+端口號国撵。
- 發(fā)送消息給服務(wù)器,相當(dāng)于發(fā)起請求玻墅,這個消息里面應(yīng)該包含 與后臺協(xié)議好的校驗部分 以及 提供給服務(wù)器索引數(shù)據(jù)的參數(shù)部分介牙。
- 理想狀態(tài)下,TCP連接一旦建立澳厢,在通信雙方中的任何一方主動關(guān)閉連接前环础,TCP 連接都將被一直保持下去囚似,所以需要定時不間斷地給服務(wù)器發(fā)送一個后臺定義的消息段,即心跳包喳整,讓對方知道自己“在線”矢沿。
- 接收服務(wù)器返回的數(shù)據(jù)们衙,并且拿到這些數(shù)據(jù)異步地去更新程序頁面,將獲取到的JSON文本解析成POJO。
2 接口的定義
利用Java的回調(diào)機制衫嵌,在子線程建立連接并發(fā)出請求,在接收服務(wù)器返回的數(shù)據(jù)后告訴主線程更新UI鲜侥。還包括一些連接異常的處理都通過回調(diào)接口實現(xiàn)漂坏。
public interface INettyClient {
void connect(String host, int port);//1. 建立連接
void sendMessage(int mt, String msg, long delayed);//2. 發(fā)送消息
void addDataReceiveListener(OnDataReceiveListener listener);//3. 為不同的請求添加監(jiān)聽器
interface OnDataReceiveListener {
void onDataReceive(int mt, String json);//接收到數(shù)據(jù)時觸發(fā)
}
interface OnConnectStatusListener {
void onDisconnected();//連接異常時觸發(fā)
}
}
3 對服務(wù)器返回數(shù)據(jù)的處理
對數(shù)據(jù)的處理主要是在ChannelHandler
中完成(詳見Android開發(fā)之使用Netty進行Socket編程(二) )。在這里我們主要繼承了ChannelInboundHandlerAdapter
谓罗,并提供了回調(diào)接口供主線程更新UI粱哼。
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final String TAG = "Netty";
private INettyClient.OnConnectStatusListener statusListener;
private List<INettyClient.OnDataReceiveListener> listeners = new ArrayList<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//channelActive()方法將會在連接被建立并且準(zhǔn)備進行通信時被調(diào)用。
Log.d(TAG, "channel active");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
//channelRead()方法是在數(shù)據(jù)被接收的時候調(diào)用檩咱。
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
//verify(String body)方法對服務(wù)器返回的數(shù)據(jù)進行校驗揭措,并取出數(shù)據(jù)部分。
//具體校驗的方法需要與后臺同事進行協(xié)議刻蚯。
body = verify(body);
Log.d(TAG, "verify : " + body);
if (null != body)
parseJson(body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
//exceptionCaught()事件處理方法是當(dāng)出現(xiàn)Throwable對象才會被調(diào)用绊含,
//即當(dāng)Netty由于IO錯誤或者處理器在處理事件時拋出的異常時。
//在大部分情況下炊汹,捕獲的異常應(yīng)該被記錄下來并且把關(guān)聯(lián)的channel給關(guān)閉掉躬充。
ctx.close();
Log.e(TAG, "Unexpected exception from downstream : "
+ cause.getMessage());
if (statusListener != null)//連接異常時觸發(fā)onDisconnected()
statusListener.onDisconnected();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
LogUtils.d(TAG, "channelReadComplete");
}
//對數(shù)據(jù)進行解析,拿出區(qū)分不同請求的 flag字段讨便,再根據(jù)不同的flag字段去觸發(fā)相對應(yīng)的監(jiān)聽器
private void parseJson(String json) {
try {
JSONObject jObject = new JSONObject(json);
int msgType = jObject.getInt(Constant.FLAG_MT);
Log.d(TAG, "parseJson message type: " + msgType + " json: " + json);
callListeners(msgType, json);
} catch (Exception e) {
LogUtils.e(TAG, "parseJson exception: " + e.getMessage());
e.printStackTrace();
}
}
//遍歷監(jiān)聽器List充甚,觸發(fā)擁有正確msgType 的OnDataReceiveListener,
//回調(diào) void onDataReceive(int mt, String json);方法
private void callListeners(final int msgType , final String json) {
for (final INettyClient.OnDataReceiveListener listener : listeners)
if (listener != null)
new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {//主線程中進行
listener.onDataReceive(mt, json);
}
});
}
//綁定OnDataReceiveListener
public void addDataReceiveListener(INettyClient.OnDataReceiveListener listener) {
if (!listeners.contains(listener))
listeners.add(listener);
}
//綁定OnConnectStatusListener
public void setConnectStatusListener(INettyClient.OnConnectStatusListener listener) {
statusListener = listener;
}
}```
以上霸褒,就是一個供Android客戶端使用的``ChannelHandler``伴找,可以通過實現(xiàn)具體的``OnDataReceiveListener ``來異步地獲得服務(wù)器返回的 數(shù)據(jù)。
## 4 NettyClient的實現(xiàn)
以上僅僅是展示了如何處理服務(wù)器返回的數(shù)據(jù)废菱。建立連接疆瑰、發(fā)送消息以及心跳包的功能還沒進行封裝。在2.接口的定義 里面已經(jīng)定義好了NettyClient應(yīng)該具備哪些行為昙啄,現(xiàn)在進行具體的實現(xiàn)穆役。
主要的實現(xiàn)思路是:
1. 構(gòu)建Bootstrap,其中包括設(shè)置好ChannelHandler來處理將來接收到的數(shù)據(jù)(詳見[Android開發(fā)之使用Netty進行Socket編程(二)](http://www.reibang.com/p/db74e673e43c) )梳凛。由Boostrap建立連接耿币。通過`` channel.writeAndFlush(constructMessage(sendMsg)).sync()``發(fā)送消息。這些工作都在子線程完成韧拒。
2. 在子線程 建立連接并向服務(wù)器發(fā)送請求淹接,這里采用了`HanlderThread`+`Handler`的方案十性。通過`Looper`依次從`Handler`的隊列中獲取信息,逐個進行處理塑悼,保證安全劲适,不會出現(xiàn)混亂。
3. 心跳包的發(fā)送通過``handleMessage(Message msg)``中的死循環(huán)進行不間斷地發(fā)送厢蒜。
4. `NettyClientHandler `的實現(xiàn)中我們已經(jīng)知道霞势,當(dāng)Netty異常時會觸發(fā)`statusListener.onDisconnected();`,NettyClient中斑鸦,onDisconnected()方法會進行重連操作愕贡。
接收到服務(wù)器返回的消息時,會在主線程中觸發(fā)``onDataReceiveListener .onDataReceive(mt, json);``巷屿。
5. 外部通過單例模式進行調(diào)用固以。
public class NettyClient implements INettyClient {
private final String TAG = NettyClient.class.getSimpleName();
private static NettyClient mInstance;
private Bootstrap bootstrap;
private Channel channel;
private String host;
private int port;
private HandlerThread workThread = null;
private Handler mWorkHandler = null;
private NettyClientHandler nettyClientHandler;
private final String ACTION_SEND_TYPE = "action_send_type";
private final String ACTION_SEND_MSG = "action_send_msg";
private final int MESSAGE_INIT = 0x1;
private final int MESSAGE_CONNECT = 0x2;
private final int MESSAGE_SEND = 0x3;
private Handler.Callback mWorkHandlerCallback = new Handler.Callback() {
@Override
public boolean handleMessage(Message msg) {
switch (msg.what) {
case MESSAGE_INIT: {
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
pipeline.addLast(nettyClientHandler);
}
});
break;
}
case MESSAGE_CONNECT: {
try {
if (TextUtils.isEmpty(host) || port == 0) {
Exception exception = new Exception("Netty host | port is invalid");
throw exception;
}
channel = bootstrap.connect(new InetSocketAddress(host,
port)).sync().channel();
} catch (Exception e) {
LogUtils.e(TAG, "connect failed " + e.getMessage() + " reconnect delay: " + Constant.DELAY_CONNECT);
ToastUtils.showOnOtherThread("服務(wù)器連接失敗");
e.printStackTrace();
sendReconnectMessage();
}
break;
}
case MESSAGE_SEND: {
String sendMsg = msg.getData().getString(ACTION_SEND_MSG);
int mt = msg.getData().getInt(ACTION_SEND_TYPE);
try {
if (channel != null && channel.isOpen()) {
channel.writeAndFlush(constructMessage(sendMsg)).sync();
Log.d(TAG, "send succeed " + constructMessage(sendMsg));
} else {
throw new Exception("channel is null | closed");
}
} catch (Exception e) {
LogUtils.e(TAG, "send failed " + e.getMessage());
sendReconnectMessage();
e.printStackTrace();
} finally {
if (Constant.MT.HAND_SHAKE.getType() == mt)
sendMessage(mt, sendMsg, Constant.DELAY_HAND_SHAKE);
}
break;
}
}
return true;
}
};
private NettyClient() {
init();
}
public synchronized static NettyClient getInstance() {
if (mInstance == null)
mInstance = new NettyClient();
return mInstance;
}
private void init() {
workThread = new HandlerThread(NettyClient.class.getName());
workThread.start();
mWorkHandler = new Handler(workThread.getLooper(), mWorkHandlerCallback);
nettyClientHandler = new NettyClientHandler();
nettyClientHandler.setConnectStatusListener(new OnConnectStatusListener() {
@Override
public void onDisconnected() {
sendReconnectMessage();
}
});
mWorkHandler.sendEmptyMessage(MESSAGE_INIT);
}
@Override
public void connect(String host, int port) {
this.host = host;
this.port = port;
mWorkHandler.sendEmptyMessage(MESSAGE_CONNECT);
}
@Override
public void addDataReceiveListener(OnDataReceiveListener listener) {
if (nettyClientHandler != null)
nettyClientHandler.addDataReceiveListener(listener);
}
private void sendReconnectMessage() {
mWorkHandler.sendEmptyMessageDelayed(MESSAGE_CONNECT, Constant.DELAY_CONNECT);
}
@Override
public void sendMessage(int mt, String msg, long delayed) {
if (TextUtils.isEmpty(msg))
return;
Message message = new Message();
Bundle bundle = new Bundle();
message.what = MESSAGE_SEND;
bundle.putString(ACTION_SEND_MSG, msg);
bundle.putInt(ACTION_SEND_TYPE, mt);
message.setData(bundle);
mWorkHandler.sendMessageDelayed(message, delayed);
}
private String constructMessage(String json) {
String message;
//與后臺協(xié)議好,如何設(shè)置校驗部分嘱巾,然后和json一起發(fā)給服務(wù)器
return message;
}
}
## 5 NettyClient的使用
NettyClient采用了全局單例的模式憨琳。
1. 在Activity或者Fragment中為NettyClient綁定相應(yīng)的數(shù)據(jù)接收監(jiān)聽器:
NettyClient.getInstance().addDataReceiveListener(new INettyClient.OnDataReceiveListener() {
@Override
public void onDataReceive(int mt, String json) {
//這些邏輯運行在主線程
if (mt == Constant.MSG_TYPE) {
UserModel user=new Gson().fromJson(json, UserModel .class);
textView.setText(user.getName);
}
}
2. 在Activity或者Fragment中發(fā)起請求:
```NettyClient.getInstance().sendMessage(Constant.MSG_TYPE, "發(fā)向服務(wù)器的消息", 0);```
可以看到使用起來十分簡單方便。