數(shù)據(jù)的封裝與傳輸
上一篇文章講到Websocket握手協(xié)議的處理车柠,現(xiàn)在開始說數(shù)據(jù)的傳輸颈嚼。Websocket數(shù)據(jù)幀的封裝和傳輸其實和處理握手請求的流程差不太多局骤,都需要通過bytebuffer寫入Socket的輸出流或者從輸入流讀取呢堰。這里我們從解析數(shù)據(jù)幀開始屿聋,知道如何解析數(shù)據(jù)幀机久,封裝也就不成話下臭墨。
我們來重新復習一下Websocket的數(shù)據(jù)傳輸協(xié)議
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
具體每一bit的意思
FIN 1bit 表示信息的最后一幀
RSV 1-3 1bit each 以后備用的 默認都為 0
Opcode 4bit 幀類型,稍后細說
Mask 1bit 掩碼膘盖,是否加密數(shù)據(jù)胧弛,默認必須置為1
Payload 7bit 數(shù)據(jù)的長度
Masking-key 1 or 4 bit 掩碼
Payload data (x + y) bytes 數(shù)據(jù)
Extension data x bytes 擴展數(shù)據(jù)
Application data y bytes 程序數(shù)據(jù)
其中較為重要的是Opcode字段尤误,這個字段表示幀的類型,例如這個傳輸?shù)膸俏谋绢愋瓦€是二進制類型结缚,二進制類型傳輸?shù)臄?shù)據(jù)可以是圖片或者語音之類的损晤。
OPCODE:4位
解釋PayloadData,如果接收到未知的opcode红竭,接收端必須關閉連接尤勋。
0x0表示附加數(shù)據(jù)幀
0x1表示文本數(shù)據(jù)幀
0x2表示二進制數(shù)據(jù)幀
0x3-7暫時無定義,為以后的非控制幀保留
0x8表示連接關閉
0x9表示ping
0xA表示pong
0xB-F暫時無定義德崭,為以后的控制幀保留
當我們解析完數(shù)據(jù)幀后斥黑,需要根據(jù)Opcode字段的類型進行對消息的不同回調處理,幀數(shù)據(jù)可以如下定義眉厨,Opcode通過枚舉定義出來
public abstract interface Framedata
{
public abstract boolean isFin();
public abstract boolean getTransfereMasked();
public abstract Opcode getOpcode();
public abstract ByteBuffer getPayloadData();
public abstract void append(Framedata paramFramedata)
throws InvalidFrameException;
public static enum Opcode
{
CONTINUOUS, TEXT, BINARY, PING, PONG, CLOSING;
}
}
解析首先得判斷Fin字段和Opcode字段锌奴。這兩個字段跟一個消息分片(Fragment)的概念有關,一般的已知長度的消息憾股,F(xiàn)in值為1鹿蜀,表示結束,Opcode值不能為0服球,可以看一下上面代碼的枚舉類型茴恰,0代表CONTINUOUS,就是說有連續(xù)的數(shù)據(jù)幀會發(fā)送過來斩熊。
而某些未知長度的消息往枣,則需要把消息分片發(fā)送。根據(jù)我的理解粉渠,實時的語音聊天就屬于這種情形分冈。這時候前面幀的Fin值為0,Opcode值為0霸株,最后的結束幀F(xiàn)in值為1雕沉,Opcode不為0。
首先應該是讀取二進制流解析數(shù)據(jù)去件,從bytebuffer中取出數(shù)據(jù)坡椒,按byte來讀取,一個byte有8個bit尤溜,數(shù)據(jù)幀是按bit來定義的倔叼,我們還得從byte中解析出具體Websocket協(xié)議中的每一個幀。
下面是讀取二進制流的代碼靴跛,值得關注的是通過位與運算獲取具體每一個bit的數(shù)據(jù)缀雳,另外比較麻煩的是playload的處理,以及還需要根據(jù)Mask掩碼來解密數(shù)據(jù)梢睛,傳輸協(xié)議里面有一位Mask肥印,代表是否加密數(shù)據(jù)识椰,默認設置為1。這里就不細說了
public Framedata translateSingleFrame(ByteBuffer buffer) throws Draft_10.IncompleteException, InvalidDataException {
int maxpacketsize = buffer.remaining();
int realpacketsize = 2;
if (maxpacketsize < realpacketsize)
throw new IncompleteException(realpacketsize);
byte b1 = buffer.get();
boolean FIN = b1 >> 8 != 0;
byte rsv = (byte) ((b1 & 0x7F) >> 4);
if (rsv != 0)
throw new InvalidFrameException("bad rsv " + rsv);
byte b2 = buffer.get();
boolean MASK = (b2 & 0xFFFFFF80) != 0;
int payloadlength = (byte) (b2 & 0x7F);
Framedata.Opcode optcode = toOpcode((byte) (b1 & 0xF));
if ((!FIN) && (
(optcode == Framedata.Opcode.PING) || (optcode == Framedata.Opcode.PONG) || (optcode == Framedata.Opcode.CLOSING))) {
throw new InvalidFrameException("control frames may no be fragmented");
}
if ((payloadlength < 0) || (payloadlength > 125)) {
if ((optcode == Framedata.Opcode.PING) || (optcode == Framedata.Opcode.PONG) || (optcode == Framedata.Opcode.CLOSING)) {
throw new InvalidFrameException("more than 125 octets");
}
if (payloadlength == 126) {
realpacketsize += 2;
if (maxpacketsize < realpacketsize)
throw new IncompleteException(realpacketsize);
byte[] sizebytes = new byte[3];
sizebytes[1] = buffer.get();
sizebytes[2] = buffer.get();
payloadlength = new BigInteger(sizebytes).intValue();
} else {
realpacketsize += 8;
if (maxpacketsize < realpacketsize)
throw new IncompleteException(realpacketsize);
byte[] bytes = new byte[8];
for (int i = 0; i < 8; i++) {
bytes[i] = buffer.get();
}
long length = new BigInteger(bytes).longValue();
if (length > 2147483647L) {
throw new LimitExedeedException("Payloadsize is to big...");
}
payloadlength = (int) length;
}
}
realpacketsize += (MASK ? 4 : 0);
realpacketsize += payloadlength;
if (maxpacketsize < realpacketsize) {
throw new IncompleteException(realpacketsize);
}
ByteBuffer payload = ByteBuffer.allocate(checkAlloc(payloadlength));
if (MASK) {
byte[] maskskey = new byte[4];
buffer.get(maskskey);
for (int i = 0; i < payloadlength; i++)
payload.put((byte) (buffer.get() ^ maskskey[(i % 4)]));
} else {
payload.put(buffer.array(), buffer.position(), payload.limit());
buffer.position(buffer.position() + payload.limit());
}
FrameBuilder frame;
FrameBuilder frame;
if (optcode == Framedata.Opcode.CLOSING) {
frame = new CloseFrameBuilder();
} else {
frame = new FramedataImpl1();
frame.setFin(FIN);
frame.setOptcode(optcode);
}
payload.flip();
frame.setPayload(payload);
return frame;
}
讀取完后就是根據(jù)這個數(shù)據(jù)幀的類型來進行不同的回調
private void decodeFrames(ByteBuffer socketBuffer) {
try {
List frames = this.draft.translateFrame(socketBuffer);//讀取二進制流
for (Framedata f : frames) {
if (DEBUG)
System.out.println("matched frame: " + f);
Framedata.Opcode curop = f.getOpcode();
boolean fin = f.isFin();
if (curop == Framedata.Opcode.CLOSING) { //關閉幀
int code = 1005;
String reason = "";
if ((f instanceof CloseFrame)) {
CloseFrame cf = (CloseFrame) f;
code = cf.getCloseCode();
reason = cf.getMessage();
}
if (this.readystate == WebSocket.READYSTATE.CLOSING) {
closeConnection(code, reason, true);
} else if (this.draft.getCloseHandshakeType() == Draft.CloseHandshakeType.TWOWAY)
close(code, reason, true);
else {
flushAndClose(code, reason, false);
}
} else if (curop == Framedata.Opcode.PING) { //Ping
this.wsl.onWebsocketPing(this, f);
} else if (curop == Framedata.Opcode.PONG) { //Pong
this.wsl.onWebsocketPong(this, f);
} else if ((!fin) || (curop == Framedata.Opcode.CONTINUOUS)) { //分片消息
if (curop != Framedata.Opcode.CONTINUOUS) {
if (this.current_continuous_frame_opcode != null)
throw new InvalidDataException(1002, "Previous continuous frame sequence not completed.");
this.current_continuous_frame_opcode = curop;
} else if (fin) {
if (this.current_continuous_frame_opcode == null)
throw new InvalidDataException(1002, "Continuous frame sequence was not started.");
this.current_continuous_frame_opcode = null;
} else if (this.current_continuous_frame_opcode == null) {
throw new InvalidDataException(1002, "Continuous frame sequence was not started.");
}
try {
this.wsl.onWebsocketMessageFragment(this, f);
} catch (RuntimeException e) {
this.wsl.onWebsocketError(this, e);
}
} else { //普通消息
if (this.current_continuous_frame_opcode != null)
throw new InvalidDataException(1002, "Continuous frame sequence not completed.");
if (curop == Framedata.Opcode.TEXT) //文本消息
try {
this.wsl.onWebsocketMessage(this, Charsetfunctions.stringUtf8(f.getPayloadData()));
} catch (RuntimeException e) {
this.wsl.onWebsocketError(this, e);
}
else if (curop == Framedata.Opcode.BINARY) //二進制消息
try {
this.wsl.onWebsocketMessage(this, f.getPayloadData());
} catch (RuntimeException e) {
this.wsl.onWebsocketError(this, e);
}
else
throw new InvalidDataException(1002, "non control or continious frame expected");
}
}
} catch (InvalidDataException e1) {
this.wsl.onWebsocketError(this, e1);
close(e1);
return;
}
}
現(xiàn)在說一下控制幀的處理深碱,WebSocket控制幀有3種:Close(關閉幀)腹鹉、Ping以及Pong。Close關閉幀很容易理解敷硅,客戶端如果接受到了就關閉連接功咒,客戶端也可以發(fā)送關閉幀給服務端。Ping和Pong是websocket里的心跳绞蹦,用來保證客戶端是在線的力奋,一般來說只有服務端給客戶端發(fā)送Ping,然后客戶端發(fā)送Pong來回應幽七,表明自己仍然在線景殷。
我們來看一下Nathan Rajlich的Java-Websocket代碼,客戶端對Ping的處理很簡單澡屡,把收到的Ping幀改一下Opcode的類型猿挚,就可以發(fā)送回給服務端了∈火模可以看到客戶端處理服務端發(fā)送的Pong回調的方法是空的绩蜻。
public void onWebsocketPing(WebSocket conn, Framedata f) {
FramedataImpl1 resp = new FramedataImpl1(f);
resp.setOptcode(Framedata.Opcode.PONG);
conn.sendFrame(resp);
}
public void onWebsocketPong(WebSocket conn, Framedata f) {
}
這篇暫時寫到這里好了,下一篇寫寫Websocket Client和Websocket Server的實現(xiàn)