前言:
通過之前的幾篇文章我們詳細(xì)的介紹到了一個socket框架應(yīng)該怎么架構(gòu),需要些什么模塊,可是美中不足的就是它只支持最簡單的socket協(xié)議,不能夠滿足實際生產(chǎn)情況蟀拷,于是我便對此框架進(jìn)行了改造,讓它能夠同時支持websocket 和 socket 萍聊,而且插件式 注冊问芬,當(dāng)需要別的長連接協(xié)議的時,完全可以自己定制寿桨。已經(jīng)把所有代碼整合了此衅,希望給個星星支持一下 microSocket。
實現(xiàn)基礎(chǔ):
一切編程皆socket 亭螟,這話有點說的絕對挡鞍。但是仔細(xì)想想,確實也就是那么回事媒佣,網(wǎng)絡(luò)通信現(xiàn)在99%都是socket吧匕累。我們有n多種協(xié)議,但是都是socket的默伍,所以這些協(xié)議無非就是 握手 解包 封包 上面不同 欢嘿,那我們把這些 過程 單獨封裝 不就能夠 寫一個框架能夠隨意切換 協(xié)議了么!
代碼實現(xiàn):
廢話不多說 我們直接看代碼 也糊!
type SocketTypes interface{
ConnHandle(msf *Msf,sess *Session)
Pack(data []byte)[]byte
}
我們定義了一個接口 必須實現(xiàn)兩個 函數(shù)
- .ConnHandle 函數(shù) 傳入一個session 對象 其實就是一個 socket握手成功的句柄炼蹦,我們在這個函數(shù)里面死循環(huán)不斷地讀取 句柄 的數(shù)據(jù) 并且 解包 處理粘包 和 解析數(shù)據(jù) 并 路由
- . Pack 函數(shù) 負(fù)責(zé)把要發(fā)送的數(shù)據(jù) 打包成指定協(xié)議的 數(shù)據(jù)包
如此一來我們 server 代碼便 非常的 整潔
func NewMsf(msfEvent MsfEventer,socketType SocketTypes) *Msf {
msf := &Msf{
EventPool: NewRouterMap(),
MsfEvent: msfEvent,
SocketType :socketType,
}
msf.SessionMaster = NewSessonM(msf)
return msf
}
func (this *Msf) Listening(address string) {
tcpListen, err := net.Listen("tcp", address)
if err != nil {
panic(err)
}
go this.SessionMaster.HeartBeat(2)
fd := uint32(0)
for {
conn, err := tcpListen.Accept()
if err != nil {
log.Println(err)
continue
}
//調(diào)用握手事件
if this.MsfEvent.OnHandel(fd, conn) == false {
continue
}
sess := NewSession(fd, conn)
this.SessionMaster.SetSession(fd, sess)
fd++
//調(diào)用相應(yīng)協(xié)議的處理函數(shù)
go this.SocketType.ConnHandle(this,sess)
}
}
上面就是 server 的兩個主要 函數(shù)
- . 第一個函數(shù) 不用多介紹,就是創(chuàng)建一個server 對象 狸剃,傳入一個框架事件對象掐隐,和一個 協(xié)議對象。
- . 第二個函數(shù)實現(xiàn)的邏輯就是 監(jiān)聽一個端口 死循環(huán) 不斷的接收新連接 钞馁,一接到新連接 就 調(diào)用 協(xié)議對象 處理該連接 虑省,并且設(shè)置 心跳 還有 一些錯誤處理 。
websocket協(xié)議對象的實現(xiàn):
為了給大家做一個例子 我搜索了相關(guān)資料 除了 封裝了一個普通 socket 的協(xié)議對象 還封裝了一個 websocket 對象 希望能夠一起學(xué)習(xí)僧凰,
type WebSocket struct {
}
//ws接收消息
func (this *WebSocket) ConnHandle(msf *Msf, sess *Session) {
defer func() {
msf.SessionMaster.DelSessionById(sess.Id)
//調(diào)用斷開鏈接事件
msf.MsfEvent.OnClose(sess.Id)
}()
if this.Handshake(sess) == false {
return
}
var (
buf []byte
err error
fin byte
opcode byte
mask byte
mKey []byte
length uint64
l uint16
payload byte
tembuf []byte
)
for {
buf = make([]byte, 2)
_, err = io.ReadFull(sess.Con, buf)
if err != nil {
break
}
fin = buf[0] >> 7
opcode = buf[0] & 0xf
if opcode == 8 {
break
}
mask = buf[1] >> 7
payload = buf[1] & 0x7f
switch {
case payload < 126:
length = uint64(payload)
case payload == 126:
buf = make([]byte, 2)
io.ReadFull(sess.Con, buf)
binary.Read(bytes.NewReader(buf), binary.BigEndian, &l)
length = uint64(l)
case payload == 127:
buf = make([]byte, 8)
io.ReadFull(sess.Con, buf)
binary.Read(bytes.NewReader(buf), binary.BigEndian, &length)
}
if mask == 1 {
mKey = make([]byte, 4)
io.ReadFull(sess.Con, mKey)
}
buf = make([]byte, length)
io.ReadFull(sess.Con, buf)
if mask == 1 {
for i, v := range buf {
buf[i] = v ^ mKey[i%4]
}
}
//更新最近接收到消息的時間
sess.UpdateTime()
if len(buf) == 0 {
continue
}
tembuf = append(tembuf,buf...)
if fin == 0 {
continue
}
//把請求的到數(shù)據(jù)轉(zhuǎn)化為map
requestData := util.String2Map(string(tembuf))
tembuf = make([]byte,0)
if requestData["module"] == "" || requestData["action"] == "" ||
msf.EventPool.ModuleExit(requestData["module"]) == false {
log.Println("not find module ", requestData)
continue
}
requestData["fd"] = fmt.Sprintf("%d", sess.Id)
//調(diào)用接收消息事件
if msf.MsfEvent.OnMessage(sess.Id, requestData) == false {
return
}
//路由
msf.EventPool.Hook(requestData["module"], requestData["action"], requestData)
}
}
//websocket 打包事件
func (this *WebSocket) Pack(data []byte) []byte {
length := len(data)
frame := []byte{129}
switch {
case length < 126:
frame = append(frame, byte(length))
case length <= 0xffff:
buf := make([]byte, 2)
binary.BigEndian.PutUint16(buf, uint16(length))
frame = append(frame, byte(126))
frame = append(frame, buf...)
case uint64(length) <= 0xffffffffffffffff:
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(length))
frame = append(frame, byte(127))
frame = append(frame, buf...)
default:
return []byte{}
}
frame = append(frame, data...)
return frame
}
//握手包
func (this *WebSocket) Handshake(sess *Session) bool {
reader := bufio.NewReader(sess.Con)
key := ""
str := ""
for {
line, _, err := reader.ReadLine()
if err != nil {
log.Fatal(err)
return false
}
if len(line) == 0 {
break
}
str = string(line)
if strings.HasPrefix(str, "Sec-WebSocket-Key") {
key = str[19:43]
}
}
sha := sha1.New()
io.WriteString(sha, key+"258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
key = base64.StdEncoding.EncodeToString(sha.Sum(nil))
header := "HTTP/1.1 101 Switching Protocols\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Version: 13\r\n" +
"Sec-WebSocket-Accept: " + key + "\r\n" +
"Upgrade: websocket\r\n\r\n"
sess.Con.Write([]byte(header))
return true
}
此對象我已經(jīng)測試過了 完全沒有問題 探颈!
如果有什么 疑問的 歡迎 留言 一起討論 。