1. 背景
今日需要實現(xiàn)一個局域網(wǎng)的wifi數(shù)據(jù)傳輸?shù)墓δ芗奶洹2豢杀苊獾亩鞴唬M行TCP socket的操作背率。由于邏輯比較簡單话瞧,就使用最小化的Socket通信即可。
2. 目標
實現(xiàn)一個TCP模塊寝姿,供業(yè)務層調(diào)用交排。業(yè)務層把要發(fā)送的數(shù)據(jù)和發(fā)送的目標給到TCP模塊,TCP模塊完成傳輸饵筑,并將傳輸狀態(tài)和傳輸結(jié)果反饋給業(yè)務層
3. 需求分析
需要一個類來封裝所有的TCP操作埃篓,我們定義為ChannelTransport
接口:
- 啟動網(wǎng)絡連接
startTcpService() - 關閉網(wǎng)絡連接
stopTcpService() - 網(wǎng)絡聯(lián)通
onConnected() - 網(wǎng)絡連接失敗
onConnectFail() - 網(wǎng)絡聯(lián)通的情況下,一端close根资,另外一端收到-1
onConnectEnd() - 斷網(wǎng)
onConnectException() - 發(fā)送數(shù)據(jù)
sendByte(Byte[] datas) - 收到數(shù)據(jù)
onRead(Byte[] datas)
4. TCP模塊封裝
4.1 接口層 IfSocket
接口層主要就是一接口定義:如打開socket,關閉socket架专,發(fā)送數(shù)據(jù),接收數(shù)據(jù)玄帕,連接狀態(tài)監(jiān)聽部脚,數(shù)據(jù)監(jiān)聽
public interface IfSocket {
public void start();
public void sendTo(byte[] var1);
public void receive();
public void stop();
public void setConnectEventListener(SocketConnectEventListener connectEventListener);
public void setReadStreamListener(OnStreamListener onReadStreamListener);
public static interface SocketConnectEventListener {
/**
* 用于Socket主線程,socket連接成功
*/
public void onConnected();
/**
* 用于Socket主線程裤纹,socket連接失敗
*/
public void onConnectFail();
/**
* 用于IOReadThread委刘,socket 傳輸過程中收到-1結(jié)束符,標志對方socket close或者關閉輸入
*/
public void onConnectEnd();
/**
* 用于IOReadThread和IOWriteThread,socket 傳輸過程中的Io exception
*/
public void onConnectException();
}
/**
* 用于IO Thread 钱雷,一次socket傳輸接收到的數(shù)據(jù)
* @author xuqiang
*
*/
public static interface OnStreamListener {
public void onRead(byte[] var1);
public void onSent();
}
}
4.2 Socket端的具體實現(xiàn)
幾個注意點
- start要分server和client兩種情況
- IO線程用線程池實現(xiàn)TcpWriteIORunnable TcpReadIORunnable
- 設計一個心跳包線程TcpWriteAliveRunable骂铁,在當前沒有send數(shù)據(jù)的情況下,循環(huán)send心跳包
public class TcpSocket implements IfSocket {
boolean isServer = true; //是不是Server
String ipAddress; //Server的IP罩抗,給client用于connect的
protected ExecutorService mThreadPool; //線程池拉庵,用于新建receive和send線程
protected ScheduledExecutorService mScheduledThreadpool; //Timer線程池,用于發(fā)送心跳包
protected int mState; //當前的狀態(tài)
protected Socket mSocket;
protected ServerSocket mServerSocket;
protected SocketConnectEventListener mConnectEventListener;
protected OnStreamListener mOnStreamListener;
private InputStream mInStream;
private OutputStream mOutStream;
public static final byte[] SEND_TAG = new byte[] { -5, -17, -13, -19 }; //數(shù)據(jù)頭部套蒂,用于數(shù)據(jù)校驗
public static final byte[] SEND_ALIVE_TAG = new byte[] { -25, -31, -37, -43 }; //心跳包
protected TcpWriteAliveRunable mTcpWriteAliveRunable; //心跳包的task
public TcpSocket(boolean isServer, String ipAddress) {
super();
this.isServer = isServer;
this.ipAddress = ipAddress;
}
@Override
public void setConnectEventListener(
SocketConnectEventListener connectEventListener) {
this.mConnectEventListener = connectEventListener;
}
@Override
public void setReadStreamListener(OnStreamListener onReadStreamListener) {
this.mOnStreamListener = onReadStreamListener;
}
@Override
public void start() {
this.mThreadPool = Executors.newCachedThreadPool();
this.mScheduledThreadpool = Executors.newScheduledThreadPool(1);
this.mTcpWriteAliveRunable = new TcpWriteAliveRunable(
mOutStream, mConnectEventListener);
try {
if (isServer) {
mServerSocket = new ServerSocket(TcpVar.PORT);
this.mSocket = this.mServerSocket.accept();
} else {
this.mSocket = new Socket(ipAddress, TcpVar.PORT);
}
mState = TcpVar.STATE_CONNECTED;
mConnectEventListener.onConnected();
Dbg.i(TcpVar.TAG, " create socket sucess");
mSocket.setSoTimeout(20000); // 加入超時
mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
} catch (Exception e) {
mState = TcpVar.STATE_CONNECT_FAIL;
mConnectEventListener.onConnectFail();
Dbg.w(TcpVar.TAG, " create socket failed", e);
}
}
@Override
public void receive() {
if (mState != TcpVar.STATE_CONNECTED) {
return;
}
try {
mInStream = new BufferedInputStream(this.mSocket.getInputStream());
} catch (IOException e) {
mInStream = null;
}
mThreadPool.execute(new TcpReadIORunnable(mInStream,
mConnectEventListener, mOnStreamListener));
}
@Override
public void sendTo(byte[] var1) {
if (mState != TcpVar.STATE_CONNECTED) {
return;
}
try {
mOutStream = new BufferedOutputStream(
this.mSocket.getOutputStream());
} catch (IOException e) {
mOutStream = null;
}
try {
//發(fā)送時阻塞當前線程钞支,心跳包暫停發(fā)送,發(fā)送完畢后操刀,心跳包重新發(fā)送
mScheduledThreadpool.shutdownNow();
mThreadPool.submit(new TcpWriteIORunnable(mOutStream,
mConnectEventListener, mOnStreamListener,var1)).get();
mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
};
}
@Override
public void stop() {
mThreadPool.shutdownNow();
mScheduledThreadpool.shutdownNow();
try {
mSocket.close();
mServerSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.3 TcpWriteIoRunable的實現(xiàn)
數(shù)據(jù)格式很簡單
- SEND_TAG
- data_length
- data
public class TcpWriteIORunnable implements Runnable {
OutputStream mOutStream;
SocketConnectEventListener mConnectEventListener;
OnStreamListener mOnStreamListener;
byte[] data;
public TcpWriteIORunnable(OutputStream mOutStream,
SocketConnectEventListener mConnectEventListener,
OnStreamListener mOnStreamListener, byte[] datas) {
this.mOutStream = mOutStream;
this.mConnectEventListener = mConnectEventListener;
this.mOnStreamListener = mOnStreamListener;
this.data = data;
}
@Override
public void run() {
try {
mOutStream.write(TcpSocket.SEND_TAG);
mOutStream.write(Util.int2bytes(this.data.length));
mOutStream.write(this.data);
mOutStream.flush();
mOnStreamListener.onSent();
} catch (Exception e) {
mConnectEventListener.onConnectException();
}
}
}
4.4 TcpWriteAliveRunable的實現(xiàn)
心跳包的設計非常簡單烁挟,就是循環(huán)發(fā)送SEND_ALIVE_TAG
mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
public class TcpWriteAliveRunable implements Runnable {
OutputStream mOutStream;
SocketConnectEventListener mConnectEventListener;
public TcpWriteAliveRunable(OutputStream mOutStream,
SocketConnectEventListener mConnectEventListener) {
super();
this.mOutStream = mOutStream;
this.mConnectEventListener = mConnectEventListener;
}
@Override
public void run() {
try{
mOutStream.write(TcpSocket.SEND_ALIVE_TAG);
}
catch (Exception e) {
mConnectEventListener.onConnectException();
}
}
}
4.5 TcpReadIORunnable的實現(xiàn)
Read線程的流程主要分三步:
- 校驗SEND_TAG。校驗的過程中我們是一個字節(jié)一個字節(jié)的校驗
- 第二步還是在讀取數(shù)據(jù)長度
- 第三步就是讀取真正的數(shù)據(jù)了骨坑。有三種策略讀數(shù)據(jù):
- 一個byte一個byte的讀撼嗓,這樣效率較低
- mmInStream.read(len)。但是InputStream.read(len)有個問題就是欢唾,他可能實際讀取的長度是小于len的且警。這個len是數(shù)據(jù)讀取的最大值,所以也不能直接使用;
- 我的算法是:mmInStream.read(len)礁遣,每次記錄已經(jīng)read的數(shù)據(jù)量斑芜,然后通過len-readBytes得到還剩下的數(shù)據(jù)長度,然后依次循環(huán)讀取祟霍,直到數(shù)據(jù)量讀滿len或者read==-1(斷網(wǎng))為止杏头。
public class TcpReadIORunnable implements Runnable {
private boolean isStoped = false;
InputStream mInStream;
SocketConnectEventListener mConnectEventListener;
OnStreamListener mOnReadStreamListener;
public TcpReadIORunnable(InputStream mInStream,
SocketConnectEventListener mConnectEventListener,
OnStreamListener mOnReadStreamListener) {
this.mInStream = mInStream;
this.mConnectEventListener = mConnectEventListener;
this.mOnReadStreamListener = mOnReadStreamListener;
}
@Override
public void run() {
int i = 0;
ByteBuffer errorByteBuffer = ByteBuffer.allocate(1024 * 16);
while (!this.isStoped) {
try {
// 1.判斷起始標記 start
int t = this.mInStream.read();
if (t == -1) {
Dbg.e(TcpVar.TAG, "read stream is -1!!!!!!!"); // 網(wǎng)絡一旦斷了,或者一端關閉沸呐,則出循環(huán)醇王,結(jié)束io線程
mConnectEventListener.onConnectEnd();
break;
}
Dbg.d(TcpVar.TAG, "mmInStream.read() one sucess ");
byte b = (byte) (t & 0xFF);
if (TcpSocket.SEND_TAG[i] != b) {
errorByteBuffer.put(b);
Dbg.e(TcpVar.TAG,
"!read byte error i:"
+ i
+ " b:"
+ EncrypUtil
.byteArrayToHexStr(new byte[] { b })
+ " tag:"
+ EncrypUtil
.byteArrayToHexStr(new byte[] { TcpSocket.SEND_TAG[i] }));
i = 0;
continue;
}
i++;
if (i != TcpSocket.SEND_TAG.length) {
continue;//繼續(xù)讀下一個數(shù)據(jù),直到SEND_TAG讀完
}
i = 0;//到此處全部SEND_TAG全部讀完
//下面是數(shù)據(jù)的打印垂谢,用于調(diào)試
if (errorByteBuffer.position() != 0) {
byte[] dst = new byte[errorByteBuffer.position()];
errorByteBuffer.position(0);
errorByteBuffer.get(dst, 0, dst.length);
errorByteBuffer.clear();
Dbg.e(TcpVar.TAG,
"!read byte error data:"
+ EncrypUtil.byteArrayToHexStr(dst));
}
errorByteBuffer.clear();
// 2.讀取包長度
byte[] len = new byte[4];
for (int j = 0; j < len.length; j++) {
len[j] = (byte) (this.mInStream.read() & 0xFF);
}
// mmInStream.read(len);
int length = Util.bytes2int(len);
// Dbg.d("read length:"+length);
byte[] data = new byte[length];
Dbg.e(TcpVar.TAG, "start read data,length = " + length);
// 3. 讀取數(shù)據(jù)
int readBytes = 0;
while (readBytes < data.length) {
int read = mInStream.read(data, readBytes, data.length
- readBytes);
if (read == -1) {
break;
}
readBytes += read;
}
mOnReadStreamListener.onRead(data);
Dbg.d("read byte end!!!!!!!");
} catch (Exception e) {
Dbg.e("WifiTransferService",
"Fail to read bytes from input stream of Wifiiothread "
+ e.getMessage(), e.getMessage());
mConnectEventListener.onConnectException();
return;
}
}
}
}
5.5 Android業(yè)務層調(diào)用的注意事項厦画。
- 將ChannelTransport中使用TcpSocket做相應的操作疮茄,并且實現(xiàn)OnStreamListener和SocketConnectEventListener滥朱,即可。
- Socket的開啟/關閉/發(fā)送/接收力试,以及OnStreamListener和SocketConnectEventListener的回調(diào)都是在不同的線程中工作的徙邻。為了保證線程同步問題,我們需要使用一個HandlerThread畸裳,并將所有的callback讓HandlerThread去處理缰犁;然后使用ChannelTransport去extends Handler或者再新建一個Handler與這個HandlerThread對應起來。