在go中創(chuàng)建websocket服務(wù)
基礎(chǔ)組件 雖然golang官網(wǎng)提供的功能包中有websocket服務(wù)相關(guān)內(nèi)容但部分功能不全所以引用第三方包
包地址 github.com/gorilla/websocket
創(chuàng)建一個(gè)websocket的服務(wù)端
- websocket服務(wù)其實(shí)就是在http上升級而來許多地方與http相同
package smile
import (
"errors"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
const (
// 允許等待的寫入時(shí)間
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 512
)
// 最大的連接ID缺狠,每次連接都加1 處理
var maxConnId int64
// 客戶端讀寫消息
type wsMessage struct {
// websocket.TextMessage 消息類型
messageType int
data []byte
}
// ws 的所有連接
// 用于廣播
var wsConnAll map[int64]*wsConnection
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// 允許所有的CORS 跨域請求,正式環(huán)境可以關(guān)閉
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// 客戶端連接
type wsConnection struct {
wsSocket *websocket.Conn // 底層websocket
inChan chan *wsMessage // 讀隊(duì)列
outChan chan *wsMessage // 寫隊(duì)列
mutex sync.Mutex // 避免重復(fù)關(guān)閉管道,加鎖處理
isClosed bool
closeChan chan byte // 關(guān)閉通知
id int64
}
func wsHandler(resp http.ResponseWriter, req *http.Request) {
// 應(yīng)答客戶端告知升級連接為websocket
wsSocket, err := upgrader.Upgrade(resp, req, nil)
if err != nil {
log.Println("升級為websocket失敗", err.Error())
return
}
maxConnId++
// TODO 如果要控制連接數(shù)可以計(jì)算,wsConnAll長度
// 連接數(shù)保持一定數(shù)量,超過的部分不提供服務(wù)
wsConn := &wsConnection{
wsSocket: wsSocket,
inChan: make(chan *wsMessage, 1000),
outChan: make(chan *wsMessage, 1000),
closeChan: make(chan byte),
isClosed: false,
id: maxConnId,
}
wsConnAll[maxConnId] = wsConn
log.Println("當(dāng)前在線人數(shù)", len(wsConnAll))
// 處理器,發(fā)送定時(shí)信息,避免意外關(guān)閉
go wsConn.processLoop()
// 讀協(xié)程
go wsConn.wsReadLoop()
// 寫協(xié)程
go wsConn.wsWriteLoop()
}
// 處理隊(duì)列中的消息
func (wsConn *wsConnection) processLoop() {
// 處理消息隊(duì)列中的消息
// 獲取到消息隊(duì)列中的消息绿满,處理完成后,發(fā)送消息給客戶端
for {
msg, err := wsConn.wsRead()
if err != nil {
log.Println("獲取消息出現(xiàn)錯(cuò)誤", err.Error())
break
}
log.Println("接收到消息", string(msg.data))
// 修改以下內(nèi)容把客戶端傳遞的消息傳遞給處理程序
err = wsConn.wsWrite(msg.messageType, msg.data)
if err != nil {
log.Println("發(fā)送消息給客戶端出現(xiàn)錯(cuò)誤", err.Error())
break
}
}
}
// 處理消息隊(duì)列中的消息
func (wsConn *wsConnection) wsReadLoop() {
// 設(shè)置消息的最大長度
wsConn.wsSocket.SetReadLimit(maxMessageSize)
wsConn.wsSocket.SetReadDeadline(time.Now().Add(pongWait))
for {
// 讀一個(gè)message
msgType, data, err := wsConn.wsSocket.ReadMessage()
if err != nil {
websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure)
log.Println("消息讀取出現(xiàn)錯(cuò)誤", err.Error())
wsConn.close()
return
}
req := &wsMessage{
msgType,
data,
}
// 放入請求隊(duì)列,消息入棧
select {
case wsConn.inChan <- req:
case <-wsConn.closeChan:
return
}
}
}
// 發(fā)送消息給客戶端
func (wsConn *wsConnection) wsWriteLoop() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
}()
for {
select {
// 取一個(gè)應(yīng)答
case msg := <-wsConn.outChan:
// 寫給websocket
if err := wsConn.wsSocket.WriteMessage(msg.messageType, msg.data); err != nil {
log.Println("發(fā)送消息給客戶端發(fā)生錯(cuò)誤", err.Error())
// 切斷服務(wù)
wsConn.close()
return
}
case <-wsConn.closeChan:
// 獲取到關(guān)閉通知
return
case <-ticker.C:
// 出現(xiàn)超時(shí)情況
wsConn.wsSocket.SetWriteDeadline(time.Now().Add(writeWait))
if err := wsConn.wsSocket.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// 寫入消息到隊(duì)列中
func (wsConn *wsConnection) wsWrite(messageType int, data []byte) error {
select {
case wsConn.outChan <- &wsMessage{messageType, data}:
case <-wsConn.closeChan:
return errors.New("連接已經(jīng)關(guān)閉")
}
return nil
}
// 讀取消息隊(duì)列中的消息
func (wsConn *wsConnection) wsRead() (*wsMessage, error) {
select {
case msg := <-wsConn.inChan:
// 獲取到消息隊(duì)列中的消息
return msg, nil
case <-wsConn.closeChan:
}
return nil, errors.New("連接已經(jīng)關(guān)閉")
}
// 關(guān)閉連接
func (wsConn *wsConnection) close() {
log.Println("關(guān)閉連接被調(diào)用了")
wsConn.wsSocket.Close()
wsConn.mutex.Lock()
defer wsConn.mutex.Unlock()
if wsConn.isClosed == false {
wsConn.isClosed = true
// 刪除這個(gè)連接的變量
delete(wsConnAll, wsConn.id)
close(wsConn.closeChan)
}
}
// 對所有用戶進(jìn)行廣播
func broadcastUsers(messageType int, data string) {
for _, ws := range wsConnAll {
ws.wsWrite(messageType, []byte(data))
}
}
// 啟動程序
func StartWebsocket(addrPort string) {
wsConnAll = make(map[int64]*wsConnection)
http.HandleFunc("/ws", wsHandler)
http.ListenAndServe(addrPort, nil)
}
啟動調(diào)用
// broadcastUsers 這個(gè)方法是對所有連接進(jìn)行廣播
smile.StartWebsocket("0.0.0.0:8080")