WebSocket在 HTML5 游戲和網(wǎng)頁消息推送都使用比較多。WebSocket 是 HTML5 的重要特性,它實(shí)現(xiàn)了基于瀏覽器的遠(yuǎn)程socket摄职,它使瀏覽器和服務(wù)器可以進(jìn)行全雙工通信朗鸠。
WebSocket 具體的特性和 http 的區(qū)別這里就不多說,可以去自己查一下乓土。
Go 官方?jīng)]有提供對(duì) WebSocket 的支持,必須選擇第三方提供的包溯警∪に眨《Go Web 編程》一書中的例子使用了 golang.org/x/net
下的 websocket
包。 另外一個(gè)使用比較多的是 gorilla/websocket
梯轻,我接觸的項(xiàng)目是使用的這個(gè)食磕。下面我就以 gorilla/websocket
來寫一個(gè)簡(jiǎn)單的通信示例。
gorilla/websocket
的資料參考:
GitHub:https://github.com/gorilla/websocket
Doc:https://godoc.org/github.com/gorilla/websocket
gorilla/websocket 簡(jiǎn)述
Upgrader
Upgrader 用于升級(jí) http 請(qǐng)求喳挑,把 http 請(qǐng)求升級(jí)為長(zhǎng)連接的 WebSocket彬伦。結(jié)構(gòu)如下:
type Upgrader struct {
// 指定升級(jí) websocket 握手完成的超時(shí)時(shí)間
HandshakeTimeout time.Duration
// 指定 io 操作的緩存大小,如果不指定就會(huì)自動(dòng)分配伊诵。
ReadBufferSize, WriteBufferSize int
// 寫數(shù)據(jù)操作的緩存池单绑,如果沒有設(shè)置值,write buffers 將會(huì)分配到鏈接生命周期里曹宴。
WriteBufferPool BufferPool
//按順序指定服務(wù)支持的協(xié)議搂橙,如值存在,則服務(wù)會(huì)從第一個(gè)開始匹配客戶端的協(xié)議笛坦。
Subprotocols []string
// 指定 http 的錯(cuò)誤響應(yīng)函數(shù)区转,如果沒有設(shè)置 Error 則苔巨,會(huì)生成 http.Error 的錯(cuò)誤響應(yīng)。
Error func(w http.ResponseWriter, r *http.Request, status int, reason error)
// 請(qǐng)求檢查函數(shù)废离,用于統(tǒng)一的鏈接檢查侄泽,以防止跨站點(diǎn)請(qǐng)求偽造。如果不檢查蜻韭,就設(shè)置一個(gè)返回值為true的函數(shù)悼尾。
// 如果請(qǐng)求Origin標(biāo)頭可以接受,CheckOrigin將返回true肖方。 如果CheckOrigin為nil诀豁,則使用安全默認(rèn)值:如果Origin請(qǐng)求頭存在且原始主機(jī)不等于請(qǐng)求主機(jī)頭,則返回false
CheckOrigin func(r *http.Request) bool
// EnableCompression 指定服務(wù)器是否應(yīng)嘗試協(xié)商每個(gè)郵件壓縮(RFC 7692)窥妇。
// 將此值設(shè)置為true并不能保證將支持壓縮舷胜。
// 目前僅支持“無上下文接管”模式
EnableCompression bool
}
func (*Upgrader) Upgrade
Upgrade 函數(shù)將 http 升級(jí)到 WebSocket 協(xié)議。定義如下:
// responseHeader包含在對(duì)客戶端升級(jí)請(qǐng)求的響應(yīng)中活翩。
// 使用responseHeader指定cookie(Set-Cookie)和應(yīng)用程序協(xié)商的子協(xié)議(Sec-WebSocket-Protocol)烹骨。
// 如果升級(jí)失敗,則升級(jí)將使用HTTP錯(cuò)誤響應(yīng)回復(fù)客戶端
// 返回一個(gè) Conn 指針材泄,拿到他后沮焕,可使用 Conn 讀寫數(shù)據(jù)與客戶端通信。
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error)
使用實(shí)例
type WsServer struct {
......
// 定義一個(gè) upgrade 類型用于升級(jí) http 為 websocket
upgrade *websocket.Upgrader
}
func NewWsServer() *WsServer {
ws.upgrade = &websocket.Upgrader{
ReadBufferSize: 4096,//指定讀緩存區(qū)大小
WriteBufferSize: 1024,// 指定寫緩存區(qū)大小
// 檢測(cè)請(qǐng)求來源
CheckOrigin: func(r *http.Request) bool {
if r.Method != "GET" {
fmt.Println("method is not GET")
return false
}
if r.URL.Path != "/ws" {
fmt.Println("path error")
return false
}
return true
},
}
return ws
}
func (self *WsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
......
// 收到 http 請(qǐng)求后 升級(jí) 協(xié)議
conn, err := self.upgrade.Upgrade(w, r, nil)
if err != nil {
fmt.Println("websocket error:", err)
return
}
fmt.Println("client connect :", conn.RemoteAddr())
go self.connHandle(conn)
}
http 服務(wù)
啟動(dòng)一個(gè) http 服務(wù)有多種方法
- 第一種
func (self *WsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/v1" {
httpCode := http.StatusInternalServerError
reasePhrase := http.StatusText(httpCode)
fmt.Println("path error ", reasePhrase)
http.Error(w, reasePhrase, httpCode)
return
}
}
func (w *WsServer) Start() (err error) {
// 使用 net.Listen 監(jiān)聽端口服務(wù)
w.listener, err = net.Listen("tcp", w.addr)
if err != nil {
fmt.Println("net listen error:", err)
return
}
// 啟動(dòng) http 服務(wù)拉宗, w 參數(shù)類型需要 實(shí)現(xiàn) Handler 接口峦树,也就是 ServeHTTP 函數(shù)
err = http.Serve(w.listener, w)
if err != nil {
fmt.Println("http serve error:", err)
return
}
return nil
}
- 第二種
import (
"fmt"
"io"
"net/http"
)
func serveHandle(w http.ResponseWriter, r *http.Request) {
buf := make([]byte, 1024)
r.Body.Read(buf)
fmt.Println("request body", string(buf))
io.WriteString(w, "hello http server 1")
}
func main() {
http.HandleFunc("/v1", serveHandle)
// 直接使用 http 包的 ListenAndServe 函數(shù)監(jiān)聽服務(wù)
http.ListenAndServe(s.addr, nil)
}
WebSocket 完整代碼
http 服務(wù) + Upgrade 實(shí)現(xiàn) WebSocket
package main
import (
"fmt"
"net"
"net/http"
"time"
"github.com/gorilla/websocket"
)
type WsServer struct {
listener net.Listener
addr string
upgrade *websocket.Upgrader
}
func NewWsServer() *WsServer {
ws := new(WsServer)
ws.addr = "0.0.0.0:10215"
ws.upgrade = &websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
if r.Method != "GET" {
fmt.Println("method is not GET")
return false
}
if r.URL.Path != "/ws" {
fmt.Println("path error")
return false
}
return true
},
}
return ws
}
func (self *WsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/ws" {
httpCode := http.StatusInternalServerError
reasePhrase := http.StatusText(httpCode)
fmt.Println("path error ", reasePhrase)
http.Error(w, reasePhrase, httpCode)
return
}
conn, err := self.upgrade.Upgrade(w, r, nil)
if err != nil {
fmt.Println("websocket error:", err)
return
}
fmt.Println("client connect :", conn.RemoteAddr())
go self.connHandle(conn)
}
func (self *WsServer) connHandle(conn *websocket.Conn) {
defer func() {
conn.Close()
}()
stopCh := make(chan int)
go self.send(conn, stopCh)
for {
conn.SetReadDeadline(time.Now().Add(time.Millisecond * time.Duration(5000)))
_, msg, err := conn.ReadMessage()
if err != nil {
close(stopCh)
// 判斷是不是超時(shí)
if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() {
fmt.Printf("ReadMessage timeout remote: %v\n", conn.RemoteAddr())
return
}
}
// 其他錯(cuò)誤,如果是 1001 和 1000 就不打印日志
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
fmt.Printf("ReadMessage other remote:%v error: %v \n", conn.RemoteAddr(), err)
}
return
}
fmt.Println("收到消息:", string(msg))
}
}
//測(cè)試一次性發(fā)送 10萬條數(shù)據(jù)給 client, 如果不使用 time.Sleep browser 過了超時(shí)時(shí)間會(huì)斷開
func (self *WsServer) send10(conn *websocket.Conn) {
for i := 0; i < 100000; i++ {
data := fmt.Sprintf("hello websocket test from server %v", time.Now().UnixNano())
err := conn.WriteMessage(1, []byte(data))
if err != nil {
fmt.Println("send msg faild ", err)
return
}
// time.Sleep(time.Millisecond * 1)
}
}
func (self *WsServer) send(conn *websocket.Conn, stopCh chan int) {
self.send10(conn)
for {
select {
case <-stopCh:
fmt.Println("connect closed")
return
case <-time.After(time.Second * 1):
data := fmt.Sprintf("hello websocket test from server %v", time.Now().UnixNano())
err := conn.WriteMessage(1, []byte(data))
fmt.Println("sending....")
if err != nil {
fmt.Println("send msg faild ", err)
return
}
}
}
}
func (w *WsServer) Start() (err error) {
w.listener, err = net.Listen("tcp", w.addr)
if err != nil {
fmt.Println("net listen error:", err)
return
}
err = http.Serve(w.listener, w)
if err != nil {
fmt.Println("http serve error:", err)
return
}
return nil
}
func main() {
ws := NewWsServer()
ws.Start()
}
WebSocket 客戶端
- 更新中
VILEngine.js 文件
let VIL = (function () {
let VIL = {
};
function DefaultWebSocket(host, call) {
let _host = host;
let _isOpen = false;
let _bufQueue = [];
let _bufCap = 100;
let _call = null;
if("undefined" !== typeof call && call !== null){
_call = call
}else{
_call = {
onConnect:function (e) {
console.log("connect success ", e);
},
onDisconnect:function (e) {
console.log("disconnect ", e);
},
onMsg:function (data) {
//console.log("receive message ", data)
}
}
}
let _socket = new WebSocket(_host);
_socket.binaryType = "arraybuffer";
/**
* 設(shè)置發(fā)送消息緩存隊(duì)列的容量
* @param {number} cap
* @constructor
*/
this.setBufferCap = function(cap){
if("number" !== typeof cap ){
console.error("parameter type is not number ");
return ;
}
if(cap < 0){
console.error("parameter value can not less then 0");
return ;
}
_bufCap = cap;
};
/**
* 發(fā)送消息
* @param {string | ArrayBuffer } data
* @constructor
*/
this.send = function(data){
if(_isOpen && _socket){
_socket.send("");
}else{
if (_bufQueue < _bufCap){
_bufQueue.push(data);
}
}
};
this.close = function(){
_socket.close(1000, "normal");
};
_socket.onopen = function(even){
_isOpen = true;
_call.onConnect(even);
while (_bufQueue > 0){
_socket.send(_bufQueue.shift());
}
};
_socket.onmessage = function(e){
let data = e.data;
_call.onMsg(data);
};
/**
* 收到關(guān)閉連接
* @param even
*/
_socket.onclose = function(even){
_isOpen = false;
_call.onDisconnect({host:_host, event:even});
};
/**
* 收到錯(cuò)誤
* @param err
*/
_socket.onerror = function(err){
_isOpen = false;
_call.onDisconnect({host:_host, event:err});
};
}
try{
VIL.EngineSocket = DefaultWebSocket ;
}catch (e) {
console.error("VILEngine error ", e);
}
return VIL;
})();
websocket.html 文件
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
</body>
<script type="text/javascript" src="VILEngine.js"></script>
<script>
let counter = 0;
let isConnect = false;
let handler = {
onConnect:function (e) {
isConnect = true;
console.log("handler connect success ", e);
var se = setInterval(function () {
if(isConnect === false){
clearInterval(se);
}
console.log("setInterval", Date.now());
socket.send("web browser setInterval");
}, 3000)
},
onDisconnect:function (e) {
isConnect = false;
console.log("handler disconnect ", e);
},
onMsg:function (data) {
counter++;
if(counter >= 2000){
counter = 0;
console.log("handler receive message ", data)
}
}
};
let socket = new VIL.EngineSocket("ws://127.0.0.1:10215/ws", handler);
</script>
</html>
WebSocket 服務(wù)端 和 客戶端在 GitHub 上有源碼旦事,需要的可以訪問
https://github.com/vilsongwei/practiceDemo/tree/master/websockTest