在 Server 和 Client 通訊中熟吏,由于網(wǎng)絡(luò)等原因很有可能會發(fā)生數(shù)據(jù)丟包的現(xiàn)象距糖。如果數(shù)據(jù)確實(shí),服務(wù)端接收的信息不完整牵寺,就會造成混亂悍引。
我們就需要在 Server 和 Client 之間建立一個通訊協(xié)議,通過協(xié)議中的規(guī)則帽氓,判斷當(dāng)前接收到的信息是否完整趣斤。根據(jù)信息的完整情況,采取不同的處理方法黎休。
通訊協(xié)議 protocol 的核心就是設(shè)計一個頭部浓领。如果傳來的信息不包含這個頭部玉凯,就說明當(dāng)前信息和之前的信息是同一條。那么就把當(dāng)前信息和之前的那條信息合并成一條镊逝。
而協(xié)議主要包含的功能是封裝(Enpack)和解析(Depack)壮啊。
Enpack 是客戶端對信息進(jìn)行數(shù)據(jù)封裝嫉鲸。封裝之后可以傳遞給服務(wù)器撑蒜。
Depack 是服務(wù)端對信息進(jìn)行數(shù)據(jù)解封。
其中有個 Const 部分玄渗,用于定義頭部座菠、頭部長度、客戶端傳入信息長度藤树。
在代碼中浴滴,我們這樣定義
const (
ConstHeader = "Headers"
ConstHeaderLength = 7
ConstMLength = 4
)
頭部的內(nèi)容為 "Headers",長度為 7 岁钓。
所以 ConstHeaderLength = 7
而信息傳遞中升略,我們會把 int 類型轉(zhuǎn)換成 byte 類型。一個 int 的長度等于 4 個 byte 的長度屡限。因此品嚣,我們設(shè)置 ConstMLength = 4。代表客戶端傳來的信息大小钧大。
自定義協(xié)議 protocol 的代碼示例如下:
/**
* protocol
* @Author: Jian Junbo
* @Email: junbojian@qq.com
* @Create: 2017/9/14 11:49
*
* Description: 通訊協(xié)議處理
*/
package protocol
import (
"bytes"
"encoding/binary"
)
const (
ConstHeader = "Headers"
ConstHeaderLength = 7
ConstMLength = 4
)
//封包
func Enpack(message []byte) []byte {
return append(append([]byte(ConstHeader), IntToBytes(len(message))...), message...)
}
//解包
func Depack(buffer []byte) []byte {
length := len(buffer)
var i int
data := make([]byte, 32)
for i = 0; i < length; i++ {
if length < i + ConstHeaderLength + ConstMLength{
break
}
if string(buffer[i:i+ConstHeaderLength]) == ConstHeader {
messageLength := ByteToInt(buffer[i+ConstHeaderLength : i+ConstHeaderLength+ConstMLength])
if length < i+ConstHeaderLength+ConstMLength+messageLength {
break
}
data = buffer[i+ConstHeaderLength+ConstMLength : i+ConstHeaderLength+ConstMLength+messageLength]
}
}
if i == length {
return make([]byte, 0)
}
return data
}
//字節(jié)轉(zhuǎn)換成整形
func ByteToInt(n []byte) int {
bytesbuffer := bytes.NewBuffer(n)
var x int32
binary.Read(bytesbuffer, binary.BigEndian, &x)
return int(x)
}
//整數(shù)轉(zhuǎn)換成字節(jié)
func IntToBytes(n int) []byte {
x := int32(n)
bytesBuffer := bytes.NewBuffer([]byte{})
binary.Write(bytesBuffer, binary.BigEndian, x)
return bytesBuffer.Bytes()
}
Server 端主要是通過協(xié)議來解析客戶端發(fā)送來的信息翰撑。
建立一個函數(shù),用來完成連接對接收信息的處理啊央。其中建立了通道readerChannel眶诈,并把接收來的信息放在通道里。在放入通道之前瓜饥,使用 protocol 的 Depack 對信息進(jìn)行解析逝撬。
//連接處理
func handleConnection(conn net.Conn) {
//緩沖區(qū),存儲被截斷的數(shù)據(jù)
tmpBuffer := make([]byte, 0)
//接收解包
readerChannel := make(chan []byte, 10000)
go reader(readerChannel)
buffer := make([]byte, 1024)
for{
n, err := conn.Read(buffer)
if err != nil{
Log(conn.RemoteAddr().String(), "connection error: ", err)
return
}
tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...))
readerChannel <- tmpBuffer //接收的信息寫入通道
}
defer conn.Close()
}
如果信息讀取發(fā)生錯誤(包括讀取到信息結(jié)束符 EOF)乓土,都會打印錯誤信息宪潮,并挑出循環(huán)。
Log(conn.RemoteAddr().String(), "connection error: ", err)
return
由于通道內(nèi)的數(shù)據(jù)是 []byte 型的帐我。需要轉(zhuǎn)換成 string坎炼。這個工作有專門的獲取通道數(shù)據(jù)的 reader(readerChannel chan []byte) 來完成。
//獲取通道數(shù)據(jù)
func reader(readerchannel chan []byte) {
for{
select {
case data := <-readerchannel:
Log(string(data)) //打印通道內(nèi)的信息
}
}
}
查看 Server 端代碼示例:
/**
* MySocketProtocalServer
* @Author: Jian Junbo
* @Email: junbojian@qq.com
* @Create: 2017/9/14 13:54
* Copyright (c) 2017 Jian Junbo All rights reserved.
*
* Description: 服務(wù)端拦键,接收客戶端傳來的信息
*/
package main
import (
"net"
"fmt"
"os"
"log"
"protocol"
)
func main() {
netListen, err := net.Listen("tcp", "localhost:7373")
CheckErr(err)
defer netListen.Close()
Log("Waiting for client ...") //啟動后谣光,等待客戶端訪問。
for{
conn, err := netListen.Accept() //監(jiān)聽客戶端
if err != nil {
Log(conn.RemoteAddr().String(), "發(fā)了了錯誤:", err)
continue
}
Log(conn.RemoteAddr().String(), "tcp connection success")
go handleConnection(conn)
}
}
//連接處理
func handleConnection(conn net.Conn) {
//緩沖區(qū)芬为,存儲被截斷的數(shù)據(jù)
tmpBuffer := make([]byte, 0)
//接收解包
readerChannel := make(chan []byte, 10000)
go reader(readerChannel)
buffer := make([]byte, 1024)
for{
n, err := conn.Read(buffer)
if err != nil{
Log(conn.RemoteAddr().String(), "connection error: ", err)
return
}
tmpBuffer = protocol.Depack(append(tmpBuffer, buffer[:n]...))
readerChannel <- tmpBuffer //接收的信息寫入通道
}
defer conn.Close()
}
//獲取通道數(shù)據(jù)
func reader(readerchannel chan []byte) {
for{
select {
case data := <-readerchannel:
Log(string(data)) //打印通道內(nèi)的信息
}
}
}
//日志處理
func Log(v ...interface{}) {
log.Println(v...)
}
//錯誤處理
func CheckErr(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}
客戶端使用 Enpack 封裝要發(fā)送到服務(wù)端的信息后萄金,寫入連接 conn 中蟀悦。
/**
* MySocketProtocalClient
* @Author: Jian Junbo
* @Email: junbojian@qq.com
* @Create: 2017/9/14 15:23
* Copyright (c) 2017 Jian Junbo All rights reserved.
*
* Description:
*/
package main
import (
"net"
"time"
"strconv"
"protocol"
"fmt"
"os"
)
//發(fā)送100次請求
func send(conn net.Conn) {
for i := 0; i < 100; i++ {
session := GetSession()
words := "{\"ID\":\""+strconv.Itoa(i)+"\",\"Session\":\""+session+"20170914165908\",\"Meta\":\"golang\",\"Content\":\"message\"}"
conn.Write(protocol.Enpack([]byte(words)))
fmt.Println(words) //打印發(fā)送出去的信息
}
fmt.Println("send over")
defer conn.Close()
}
//用當(dāng)前時間做識別。當(dāng)前時間的十進(jìn)制整數(shù)
func GetSession() string {
gs1 := time.Now().Unix()
gs2 := strconv.FormatInt(gs1, 10)
return gs2
}
func main() {
server := "localhost:7373"
tcpAddr, err := net.ResolveTCPAddr("tcp4", server)
if err != nil{
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil{
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
fmt.Println("connect success")
send(conn)
}
代碼運(yùn)行效果: