Android 一套完整的 Socket 解決方案
本文原創(chuàng)铺峭,轉(zhuǎn)載請(qǐng)注明出處。
歡迎關(guān)注我的 簡(jiǎn)書 捉蚤,關(guān)注我的專題 Android Class 我會(huì)長(zhǎng)期堅(jiān)持為大家收錄簡(jiǎn)書上高質(zhì)量的 Android 相關(guān)博文抬驴。
項(xiàng)目地址,喜歡點(diǎn)一個(gè) star:
寫在前面:
在上上周的時(shí)候缆巧,寫了一篇文章:
在 Android 上布持,一個(gè)完整的 UDP 通信模塊應(yīng)該是怎樣的?
文中介紹了在 Android 端陕悬,一個(gè)完整的 UDP 模塊應(yīng)該考慮哪些方面题暖。當(dāng)然了文中最后也提到了,UDP 的使用本身就有一些局限性捉超,比如發(fā)送數(shù)據(jù)的大小有限制胧卤,屬于不可靠協(xié)議,可能丟包拼岳。而且它是一對(duì)多發(fā)送的協(xié)議等等...如果能將這個(gè)模塊能加入 TCP Socket 補(bǔ)充枝誊,那就比較完美解決了 Android 上端到端的通信。下面就來看看怎么去做惜纸。
整體步驟流程
先來說一下整體的步驟思路吧:
- 發(fā)送 UDP 廣播侧啼,大家都知道 UDP 廣播的特性是整個(gè)網(wǎng)段的設(shè)備都可以收到這個(gè)消息。
- 接收方收到了 UDP 的廣播堪簿,將自己的 ip 地址痊乾,和雙方約定的端口號(hào),回復(fù)給 UDP 的發(fā)送方椭更。
- 發(fā)送方拿到了對(duì)方的 ip 地址以及端口號(hào)哪审,就可以發(fā)起 TCP 請(qǐng)求了,建立 TCP 連接虑瀑。
- 保持一個(gè) TCP 心跳湿滓,如果發(fā)現(xiàn)對(duì)方不在了,超時(shí)重復(fù) 1 步驟舌狗,重新建立聯(lián)系叽奥。
整體的步驟就和上述的一樣,下面用代碼展開:
搭建 UDP 模塊
public UDPSocket(Context context) {
this.mContext = context;
int cpuNumbers = Runtime.getRuntime().availableProcessors();
// 根據(jù)CPU數(shù)目初始化線程池
mThreadPool = Executors.newFixedThreadPool(cpuNumbers * Config.POOL_SIZE);
// 記錄創(chuàng)建對(duì)象時(shí)的時(shí)間
lastReceiveTime = System.currentTimeMillis();
messageReceiveList = new ArrayList<>();
Log.d(TAG, "創(chuàng)建 UDP 對(duì)象");
// createUser();
}
首先進(jìn)行一些初始化操作痛侍,準(zhǔn)備線程池朝氓,記錄對(duì)象初始的時(shí)間等等。
public void startUDPSocket() {
if (client != null) return;
try {
// 表明這個(gè) Socket 在設(shè)置的端口上監(jiān)聽數(shù)據(jù)。
client = new DatagramSocket(CLIENT_PORT);
client.setReuseAddress(true);
if (receivePacket == null) {
// 創(chuàng)建接受數(shù)據(jù)的 packet
receivePacket = new DatagramPacket(receiveByte, BUFFER_LENGTH);
}
startSocketThread();
} catch (SocketException e) {
e.printStackTrace();
}
}
緊接著就創(chuàng)建了真正的一個(gè) UDP Socket 端赵哲,DatagramSocket待德,注意這里傳入的端口號(hào) CLIENT_PORT 的意思是這個(gè) DatagramSocket 在此端口號(hào)接收消息。
/**
* 開啟發(fā)送數(shù)據(jù)的線程
*/
private void startSocketThread() {
clientThread = new Thread(new Runnable() {
@Override
public void run() {
receiveMessage();
}
});
isThreadRunning = true;
clientThread.start();
Log.d(TAG, "開啟 UDP 數(shù)據(jù)接收線程");
startHeartbeatTimer();
}
我們都知道 Socket 中要處理數(shù)據(jù)的發(fā)送和接收枫夺,并且發(fā)送和接收都是阻塞的将宪,應(yīng)該放在子線程中,這里就開啟了一個(gè)線程橡庞,來處理接收到的 UDP 消息(UDP 模塊上一篇文章講得比較詳細(xì)了较坛,所以這里就不詳細(xì)展開了)
/**
* 處理接受到的消息
*/
private void receiveMessage() {
while (isThreadRunning) {
try {
if (client != null) {
client.receive(receivePacket);
}
lastReceiveTime = System.currentTimeMillis();
Log.d(TAG, "receive packet success...");
} catch (IOException e) {
Log.e(TAG, "UDP數(shù)據(jù)包接收失敗扒最!線程停止");
stopUDPSocket();
e.printStackTrace();
return;
}
if (receivePacket == null || receivePacket.getLength() == 0) {
Log.e(TAG, "無法接收UDP數(shù)據(jù)或者接收到的UDP數(shù)據(jù)為空");
continue;
}
String strReceive = new String(receivePacket.getData(), receivePacket.getOffset(), receivePacket.getLength());
Log.d(TAG, strReceive + " from " + receivePacket.getAddress().getHostAddress() + ":" + receivePacket.getPort());
//解析接收到的 json 信息
notifyMessageReceive(strReceive);
// 每次接收完UDP數(shù)據(jù)后燎潮,重置長(zhǎng)度。否則可能會(huì)導(dǎo)致下次收到數(shù)據(jù)包被截?cái)唷? if (receivePacket != null) {
receivePacket.setLength(BUFFER_LENGTH);
}
}
}
在子線程接收 UDP 數(shù)據(jù)扼倘,并且 notifyMessageReceive 方法通過接口來向外通知消息确封。
/**
* 發(fā)送心跳包
*
* @param message
*/
public void sendMessage(final String message) {
mThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
BROADCAST_IP = WifiUtil.getBroadcastAddress();
Log.d(TAG, "BROADCAST_IP:" + BROADCAST_IP);
InetAddress targetAddress = InetAddress.getByName(BROADCAST_IP);
DatagramPacket packet = new DatagramPacket(message.getBytes(), message.length(), targetAddress, CLIENT_PORT);
client.send(packet);
// 數(shù)據(jù)發(fā)送事件
Log.d(TAG, "數(shù)據(jù)發(fā)送成功");
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
接著 startHeartbeatTimer
開啟一個(gè)心跳線程,每間隔五秒再菊,就去廣播一個(gè) UDP 消息爪喘。注意這里 getBroadcastAddress
是獲取的網(wǎng)段 ip,發(fā)送這個(gè) UDP 消息的時(shí)候纠拔,整個(gè)網(wǎng)段的所有設(shè)備都可以接收到秉剑。
到此為止,我們發(fā)送端的 UDP 算是搭建完成了稠诲。
搭建 TCP 模塊
接下來 TCP 模塊該出場(chǎng)了侦鹏,UDP 發(fā)送心跳廣播的目的就是找到對(duì)應(yīng)設(shè)備的 ip 地址和約定好的端口,所以在 UDP 數(shù)據(jù)的接收方法里:
/**
* 處理 udp 收到的消息
*
* @param message
*/
private void handleUdpMessage(String message) {
try {
JSONObject jsonObject = new JSONObject(message);
String ip = jsonObject.optString(Config.TCP_IP);
String port = jsonObject.optString(Config.TCP_PORT);
if (!TextUtils.isEmpty(ip) && !TextUtils.isEmpty(port)) {
startTcpConnection(ip, port);
}
} catch (JSONException e) {
e.printStackTrace();
}
}
這個(gè)方法的目的就是取到對(duì)方 UDPServer 端臀叙,發(fā)給我的 UDP 消息略水,將它的 ip 地址告訴了我,以及我們提前約定好的端口號(hào)劝萤。
怎么獲得一個(gè)設(shè)備的 ip 呢渊涝?
public String getLocalIPAddress() {
WifiInfo wifiInfo = mWifiManager.getConnectionInfo();
return intToIp(wifiInfo.getIpAddress());
}
private static String intToIp(int i) {
return (i & 0xFF) + "." + ((i >> 8) & 0xFF) + "." + ((i >> 16) & 0xFF) + "."
+ ((i >> 24) & 0xFF);
}
現(xiàn)在拿到了對(duì)方的 ip,以及約定好的端口號(hào)床嫌,終于可以開啟一個(gè) TCP 客戶端了跨释。
private boolean startTcpConnection(final String ip, final int port) {
try {
if (mSocket == null) {
mSocket = new Socket(ip, port);
mSocket.setKeepAlive(true);
mSocket.setTcpNoDelay(true);
mSocket.setReuseAddress(true);
}
InputStream is = mSocket.getInputStream();
br = new BufferedReader(new InputStreamReader(is));
OutputStream os = mSocket.getOutputStream();
pw = new PrintWriter(new BufferedWriter(new OutputStreamWriter(os)), true);
Log.d(TAG, "tcp 創(chuàng)建成功...");
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
當(dāng) TCP 客戶端成功建立的時(shí)候,我們就可以通過 TCP Socket 來發(fā)送和接收消息了厌处。
細(xì)節(jié)處理
接下來就是一些細(xì)節(jié)處理了鳖谈,比如我們的 UDP 心跳,當(dāng) TCP 建立成功之時(shí)阔涉,我們要停止 UDP 的心跳:
if (startTcpConnection(ip, Integer.valueOf(port))) {// 嘗試建立 TCP 連接
if (mListener != null) {
mListener.onSuccess();
}
startReceiveTcpThread();
startHeartbeatTimer();
} else {
if (mListener != null) {
mListener.onFailed(Config.ErrorCode.CREATE_TCP_ERROR);
}
}
// TCP已經(jīng)成功建立連接缆娃,停止 UDP 的心跳包捷绒。
public void stopHeartbeatTimer() {
if (timer != null) {
timer.exit();
timer = null;
}
}
對(duì) TCP 連接進(jìn)行心跳保護(hù):
/**
* 啟動(dòng)心跳
*/
private void startHeartbeatTimer() {
if (timer == null) {
timer = new HeartbeatTimer();
}
timer.setOnScheduleListener(new HeartbeatTimer.OnScheduleListener() {
@Override
public void onSchedule() {
Log.d(TAG, "timer is onSchedule...");
long duration = System.currentTimeMillis() - lastReceiveTime;
Log.d(TAG, "duration:" + duration);
if (duration > TIME_OUT) {//若超過十五秒都沒收到我的心跳包,則認(rèn)為對(duì)方不在線龄恋。
Log.d(TAG, "tcp ping 超時(shí)疙驾,對(duì)方已經(jīng)下線");
stopTcpConnection();
if (mListener != null) {
mListener.onFailed(Config.ErrorCode.PING_TCP_TIMEOUT);
}
} else if (duration > HEARTBEAT_MESSAGE_DURATION) {//若超過兩秒他沒收到我的心跳包凶伙,則重新發(fā)一個(gè)郭毕。
JSONObject jsonObject = new JSONObject();
try {
jsonObject.put(Config.MSG, Config.PING);
} catch (JSONException e) {
e.printStackTrace();
}
sendTcpMessage(jsonObject.toString());
}
}
});
timer.startTimer(0, 1000 * 2);
}
首先會(huì)每隔兩秒,就給對(duì)方發(fā)送一個(gè) ping 包函荣,看看對(duì)面在不在显押,如果超過 15 秒還沒有回復(fù)我,那就說明對(duì)方掉線了傻挂,關(guān)閉我這邊的 TCP 端乘碑。進(jìn)入 onFailed 方法。
@Override
public void onFailed(int errorCode) {// tcp 異常處理
switch (errorCode) {
case Config.ErrorCode.CREATE_TCP_ERROR:
break;
case Config.ErrorCode.PING_TCP_TIMEOUT:
udpSocket.startHeartbeatTimer();
tcpSocket = null;
break;
}
}
當(dāng) TCP 連接超時(shí)金拒,我就會(huì)重新啟動(dòng) UDP 的廣播心跳兽肤,尋找等待連接的設(shè)備。進(jìn)入下一個(gè)步驟循環(huán)绪抛。
對(duì)于數(shù)據(jù)傳輸?shù)母袷桨〉鹊燃?xì)節(jié)资铡,這個(gè)和業(yè)務(wù)相關(guān)。自己來定就好幢码。
還可以根據(jù)自己業(yè)務(wù)的模式笤休,是 CPU 密集型啊,還是 IO 密集型啊症副,來開啟不同的線程通道店雅。這個(gè)就涉及線程的知識(shí)了。
項(xiàng)目地址贞铣,喜歡點(diǎn)一個(gè) star: