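I. What is Tao
Tao, "the ultimate principle of the universe" in English, is the Chinese concept 道: the deepest truth of the cosmos.
"The Tao gives birth to one, one gives birth to two, two gives birth to three, and three gives birth to the infinite." (Tao Te Ching)
Tao is also an asynchronous TCP server framework that I wrote in Go (TCP Asynchronous Go server FramewOrk). It follows Go's minimalist "Less is more" philosophy, cuts through surface details to give you a view into the world of network programming, and frees you from only knowing how to write "socket-bind-listen-accept". This article briefly discusses the design of the framework and some of my own thoughts along the way.
1. What problem does Tao solve
1.1 Scenario
Your product has its own business logic and needs server-side support over the Internet before it can serve your customers.
1.2 Problem
How do you implement the product's features quickly and reliably without spending a great deal of time on low-level network communication details?
1.3 Solution
Tao provides a mechanism for backing business logic with a framework. You only need to agree on the message format with the client, write the corresponding business logic as functions, and register them with the framework.
2. A chat server in 50 lines
Let's look at an example of using Tao to implement a simple group chat server. The server-side code can be written like this: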
package main
import (
"fmt"
"net"
"github.com/leesper/holmes"
"github.com/leesper/tao"
"github.com/leesper/tao/examples/chat"
)
// ChatServer is the chatting server.
type ChatServer struct {
*tao.Server
}
// NewChatServer returns a ChatServer.
func NewChatServer() *ChatServer {
onConnectOption := tao.OnConnectOption(func(conn tao.WriteCloser) bool {
holmes.Infoln("on connect")
return true
})
onErrorOption := tao.OnErrorOption(func(conn tao.WriteCloser) {
holmes.Infoln("on error")
})
onCloseOption := tao.OnCloseOption(func(conn tao.WriteCloser) {
holmes.Infoln("close chat client")
})
return &ChatServer{
tao.NewServer(onConnectOption, onErrorOption, onCloseOption),
}
}
func main() {
defer holmes.Start().Stop()
tao.Register(chat.ChatMessage, chat.DeserializeMessage, chat.ProcessMessage)
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "0.0.0.0", 12345))
if err != nil {
holmes.Fatalln("listen error", err)
}
chatServer := NewChatServer()
err = chatServer.Start(l)
if err != nil {
holmes.Fatalln("start error", err)
}
}
Starting a server takes only three steps: register the message together with its business-logic callback, fill in the IP address and port, and call Start. Clients can then connect and begin chatting. The business logic is simple: it broadcasts the message to every connection:
// ProcessMessage handles the Message logic.
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
holmes.Infof("ProcessMessage")
s, ok := tao.ServerFromContext(ctx)
if ok {
msg := tao.MessageFromContext(ctx)
s.Broadcast(msg)
}
}
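3. Go's programming philosophy
Go is "the C language of the cloud computing era" and is well suited to building infrastructure services such as servers. Its syntax resembles C and its standard library is rich, so it is quick to pick up and productive to develop in; it compiles fast and runs at a speed close to C, so it is efficient at run time as well.
3.1 Object-oriented programming
Go's object-oriented style is "favor composition over inheritance": what other languages do with inheritance is done through anonymous embedding. Take the chat server ChatServer above: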
// ChatServer is the chatting server.
type ChatServer struct {
*tao.Server
}
ChatServer thus automatically gets all of Server's fields and methods, which Go promotes to the outer type. Note that Server is embedded here as a pointer.
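As a minimal illustration of embedding, the sketch below uses simplified stand-in types invented for this example (they are not Tao's real Server) to show a method of the embedded pointer being called on the outer struct:

```go
package main

import "fmt"

// Server is a hypothetical stand-in for tao.Server.
type Server struct{ name string }

// Name is declared on *Server only.
func (s *Server) Name() string { return s.name }

// ChatServer embeds *Server anonymously, as in the article.
type ChatServer struct {
	*Server
}

func main() {
	cs := &ChatServer{Server: &Server{name: "chat"}}
	// Name is not declared on ChatServer; the call is forwarded to cs.Server.
	fmt.Println(cs.Name())
}
```

Because the embedded field is a pointer, it must be set before any promoted method is called; a nil *Server would panic as soon as Name dereferences it.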
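3.2 Interface-oriented programming
Interfaces in Go are "duck typed": "if it walks like a duck and quacks like a duck, then it is a duck". Other languages require a type to declare explicitly that it implements an interface, whereas Go interfaces are satisfied implicitly. For example, Holmes, the multithreaded logging library used by Tao, implements its "create a new log file every hour" feature with the following core code: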
func (ls *logSegment)Write(p []byte) (n int, err error) {
if ls.timeToCreate != nil && ls.logFile != os.Stdout && ls.logFile != os.Stderr {
select {
case current := <-ls.timeToCreate:
ls.logFile.Close()
ls.logFile = nil
name := getLogFileName(current)
ls.logFile, err = os.Create(path.Join(ls.logPath, name))
if err != nil {
fmt.Fprintln(os.Stderr, err)
ls.logFile = os.Stderr
} else {
next := current.Truncate(ls.unit).Add(ls.unit)
ls.timeToCreate = time.After(next.Sub(time.Now()))
}
default:
// do nothing
}
}
return ls.logFile.Write(p)
}
The standard library defines io.Writer as shown below, so *logSegment implements the io.Writer interface, and a *logSegment can be passed to any function that takes an io.Writer parameter.
type Writer interface {
Write(p []byte) (n int, err error)
}
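To make the implicit satisfaction concrete, here is a small sketch with a made-up type, countingWriter, which never mentions io.Writer yet can be passed wherever one is expected. The type and the report helper are assumptions for illustration and are not part of Holmes or Tao:

```go
package main

import (
	"fmt"
	"io"
)

// countingWriter is a hypothetical type; it never names io.Writer.
type countingWriter struct{ n int }

// Write has the signature io.Writer requires, which is all Go asks for.
func (w *countingWriter) Write(p []byte) (int, error) {
	w.n += len(p)
	return len(p), nil
}

// report accepts any io.Writer.
func report(w io.Writer, msg string) {
	fmt.Fprint(w, msg)
}

func main() {
	w := &countingWriter{}
	report(w, "hello")
	fmt.Println(w.n) // number of bytes written so far
}
```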
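3.3 One center, two basic points
To master Go, hold on to "one center and two basic points". The "one center" is Go's concurrency model: "do not communicate by sharing memory; instead, share memory by communicating". The "two basic points" are the two cornerstones of that model: channels and goroutines. Once you understand them you can read most Go code. Now let's properly introduce the Tao framework.
II. The design of Tao
1. Starting the server
Tao can provide a TLS server with transport-layer security through the tao.TLSCredsOption() function. The core responsibility of the server is to listen for and accept client connections. Because each process can only open a limited number of file descriptors, the server also has to cap the number of concurrent connections. The key code is: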
// Start starts the TCP server, accepting new clients and creating service
// go-routine for each. The service go-routines read messages and then call
// the registered handlers to handle them. Start returns when failed with fatal
// errors, the listener will be closed when returned.
func (s *Server) Start(l net.Listener) error {
s.mu.Lock()
if s.lis == nil {
s.mu.Unlock()
l.Close()
return ErrServerClosed
}
s.lis[l] = true
s.mu.Unlock()
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[l] {
l.Close()
delete(s.lis, l)
}
s.mu.Unlock()
}()
holmes.Infof("server start, net %s addr %s\n", l.Addr().Network(), l.Addr().String())
s.wg.Add(1)
go s.timeOutLoop()
var tempDelay time.Duration
for {
rawConn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay >= max {
tempDelay = max
}
holmes.Errorf("accept error %v, retrying in %d\n", err, tempDelay)
select {
case <-time.After(tempDelay):
case <-s.ctx.Done():
}
continue
}
return err
}
tempDelay = 0
// how many connections do we have ?
sz := s.conns.Size()
if sz >= MaxConnections {
holmes.Warnf("max connections size %d, refuse\n", sz)
rawConn.Close()
continue
}
if s.opts.tlsCfg != nil {
rawConn = tls.Server(rawConn, s.opts.tlsCfg)
}
netid := netIdentifier.GetAndIncrement()
sc := NewServerConn(netid, s, rawConn)
sc.SetName(sc.rawConn.RemoteAddr().String())
s.mu.Lock()
if s.sched != nil {
sc.RunEvery(s.interv, s.sched)
}
s.mu.Unlock()
s.conns.Put(netid, sc)
addTotalConn(1)
s.wg.Add(1)
go func() {
sc.Start()
}()
holmes.Infof("accepted client %s, id %d, total %d\n", sc.GetName(), netid, s.conns.Size())
s.conns.RLock()
for _, c := range s.conns.m {
holmes.Infof("client %s\n", c.GetName())
}
s.conns.RUnlock()
} // for loop
}
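If a temporary error occurs while accepting a client connection, the server waits before retrying: the delay starts at 5 ms, doubles on each consecutive failure, and is capped at 1 second. If the number of existing connections has reached MaxConnections (1000 by default), the new connection is refused and closed; otherwise a new connection is started and begins working.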
2. Graceful server shutdown
Go 1.7 introduced the context package into the standard library. Its Context type makes it possible to establish a shared "context" among the server, its network connections, and the related goroutines. The information carried in this context is request scoped, so every goroutine involved in the request can safely read the context's data, or derive a new context to attach more. For example, the handleLoop goroutine packs a connection's net ID and the message into the context and hands it, together with the handler function, to a worker goroutine:
// handleLoop() - put handler or timeout callback into worker go-routines
func handleLoop(c WriteCloser, wg *sync.WaitGroup) {
//... omitted ...
for {
select {
//... omitted ...
case msgHandler := <-handlerCh:
msg, handler := msgHandler.message, msgHandler.handler
if handler != nil {
if askForWorker {
WorkerPoolInstance().Put(netID, func() {
handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
})
}
}
//... omitted ...
}
}
Later, when the worker goroutine actually runs, the business logic in the handler function can retrieve the message or the net ID, both of which are context data tied to this request. A typical echo server does it like this:
// ProcessMessage process the logic of echo message.
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
msg := tao.MessageFromContext(ctx).(Message)
holmes.Infof("receiving message %s\n", msg.Content)
conn.Write(msg)
}
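Another use of context is the graceful shutdown of the server and its network connections. The server passes its own context to each connection it manages, and a connection likewise passes its context to the goroutines it starts; all of these contexts are cancelable. When the server needs to stop, or a connection is about to close, calling the cancel function is enough for all of these goroutines to be notified and exit. The server or connection blocks until those goroutines have finished before closing itself, which goes as far as possible toward guaranteeing a clean exit. The key code for stopping the server is: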
// Stop gracefully closes the server, it blocked until all connections
// are closed and all go-routines are exited.
func (s *Server) Stop() {
// immediately stop accepting new clients
s.mu.Lock()
listeners := s.lis
s.lis = nil
s.mu.Unlock()
for l := range listeners {
l.Close()
holmes.Infof("stop accepting at address %s\n", l.Addr().String())
}
// close all connections
conns := map[int64]*ServerConn{}
s.conns.RLock()
for k, v := range s.conns.m {
conns[k] = v
}
s.conns.RUnlock()
// Clear takes the write lock, so call it only after the read lock is released.
s.conns.Clear()
for _, c := range conns {
c.rawConn.Close()
holmes.Infof("close client %s\n", c.GetName())
}
s.mu.Lock()
s.cancel()
s.mu.Unlock()
s.wg.Wait()
holmes.Infoln("server stopped gracefully, bye.")
os.Exit(0)
}
3. The network connection model
In other languages, a server written in the Reactor pattern often has to multiplex asynchronously through epoll on a single I/O thread. Because goroutines are cheap, Go can instead create three goroutines for every network connection: readLoop() reads data and deserializes it into messages; writeLoop() sends serialized messages as a binary byte stream; and handleLoop() invokes the message handlers. The three goroutines run independently from the moment the connection is created and started:
// Start starts the server connection, creating go-routines for reading,
// writing and handling.
func (sc *ServerConn) Start() {
holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
onConnect := sc.belong.opts.onConnect
if onConnect != nil {
onConnect(sc)
}
loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop}
for _, l := range loopers {
looper := l
sc.wg.Add(1)
go looper(sc, sc.wg)
}
}
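3.1 Core code analysis: readLoop
readLoop does three key things. First, it calls the message codec to deserialize the received byte stream into a message. Second, it updates the timestamp used for heartbeat detection. Finally, it looks up the handler registered for the message's protocol number: if one is found, the message and the handler are packed together and sent to handlerCh; if none is registered, the message falls back to the onMessage callback when one has been set. Note that cDone and sDone are the channels in the connection's and the server's contexts, used to listen for the "closed" notification of the connection and of the server respectively (the same applies below).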
/* readLoop() blocking read from connection, deserialize bytes into message,
then find corresponding handler, put it into channel */
func readLoop(c WriteCloser, wg *sync.WaitGroup) {
var (
rawConn net.Conn
codec Codec
cDone <-chan struct{}
sDone <-chan struct{}
setHeartBeatFunc func(int64)
onMessage onMessageFunc
handlerCh chan MessageHandler
msg Message
err error
)
switch c := c.(type) {
case *ServerConn:
rawConn = c.rawConn
codec = c.belong.opts.codec
cDone = c.ctx.Done()
sDone = c.belong.ctx.Done()
setHeartBeatFunc = c.SetHeartBeat
onMessage = c.belong.opts.onMessage
handlerCh = c.handlerCh
case *ClientConn:
rawConn = c.rawConn
codec = c.opts.codec
cDone = c.ctx.Done()
sDone = nil
setHeartBeatFunc = c.SetHeartBeat
onMessage = c.opts.onMessage
handlerCh = c.handlerCh
}
defer func() {
if p := recover(); p != nil {
holmes.Errorf("panics: %v\n", p)
}
wg.Done()
holmes.Debugln("readLoop go-routine exited")
c.Close()
}()
for {
select {
case <-cDone: // connection closed
holmes.Debugln("receiving cancel signal from conn")
return
case <-sDone: // server closed
holmes.Debugln("receiving cancel signal from server")
return
default:
msg, err = codec.Decode(rawConn)
if err != nil {
holmes.Errorf("error decoding message %v\n", err)
if _, ok := err.(ErrUndefined); ok {
// update heart beats
setHeartBeatFunc(time.Now().UnixNano())
continue
}
return
}
setHeartBeatFunc(time.Now().UnixNano())
handler := GetHandlerFunc(msg.MessageNumber())
if handler == nil {
if onMessage != nil {
holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
onMessage(msg, c.(WriteCloser))
} else {
holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
}
continue
}
handlerCh <- MessageHandler{msg, handler}
}
}
}
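3.2 Core code analysis: writeLoop
writeLoop does one thing: it reads already-serialized bytes from sendCh and writes them to the network. Note, however, that before the goroutine exits because the connection is closing, it drains sendCh without blocking and sends every remaining message so that none is dropped. That is the key point.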
/* writeLoop() receive message from channel, serialize it into bytes,
then blocking write into connection */
func writeLoop(c WriteCloser, wg *sync.WaitGroup) {
var (
rawConn net.Conn
sendCh chan []byte
cDone <-chan struct{}
sDone <-chan struct{}
pkt []byte
err error
)
switch c := c.(type) {
case *ServerConn:
rawConn = c.rawConn
sendCh = c.sendCh
cDone = c.ctx.Done()
sDone = c.belong.ctx.Done()
case *ClientConn:
rawConn = c.rawConn
sendCh = c.sendCh
cDone = c.ctx.Done()
sDone = nil
}
defer func() {
if p := recover(); p != nil {
holmes.Errorf("panics: %v\n", p)
}
// drain all pending messages before exit
OuterFor:
for {
select {
case pkt = <-sendCh:
if pkt != nil {
if _, err = rawConn.Write(pkt); err != nil {
holmes.Errorf("error writing data %v\n", err)
}
}
default:
break OuterFor
}
}
wg.Done()
holmes.Debugln("writeLoop go-routine exited")
c.Close()
}()
for {
select {
case <-cDone: // connection closed
holmes.Debugln("receiving cancel signal from conn")
return
case <-sDone: // server closed
holmes.Debugln("receiving cancel signal from server")
return
case pkt = <-sendCh:
if pkt != nil {
if _, err = rawConn.Write(pkt); err != nil {
holmes.Errorf("error writing data %v\n", err)
return
}
}
}
}
}
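3.3 Core code analysis: handleLoop
readLoop packs the message and its handler and sends them to handlerCh; handleLoop takes them out of handlerCh and hands them to the worker pool, which schedules and runs them to complete the processing of the message. This is a good illustration of how goroutines communicate through channels in Go.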
// handleLoop() - put handler or timeout callback into worker go-routines
func handleLoop(c WriteCloser, wg *sync.WaitGroup) {
var (
cDone <-chan struct{}
sDone <-chan struct{}
timerCh chan *OnTimeOut
handlerCh chan MessageHandler
netID int64
ctx context.Context
askForWorker bool
)
switch c := c.(type) {
case *ServerConn:
cDone = c.ctx.Done()
sDone = c.belong.ctx.Done()
timerCh = c.timerCh
handlerCh = c.handlerCh
netID = c.netid
ctx = c.ctx
askForWorker = true
case *ClientConn:
cDone = c.ctx.Done()
sDone = nil
timerCh = c.timing.timeOutChan
handlerCh = c.handlerCh
netID = c.netid
ctx = c.ctx
}
defer func() {
if p := recover(); p != nil {
holmes.Errorf("panics: %v\n", p)
}
wg.Done()
holmes.Debugln("handleLoop go-routine exited")
c.Close()
}()
for {
select {
case <-cDone: // connection closed
holmes.Debugln("receiving cancel signal from conn")
return
case <-sDone: // server closed
holmes.Debugln("receiving cancel signal from server")
return
case msgHandler := <-handlerCh:
msg, handler := msgHandler.message, msgHandler.handler
if handler != nil {
if askForWorker {
WorkerPoolInstance().Put(netID, func() {
handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
})
addTotalHandle()
} else {
handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
}
}
case timeout := <-timerCh:
if timeout != nil {
timeoutNetID := NetIDFromContext(timeout.Ctx)
if timeoutNetID != netID {
holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", timeoutNetID, netID)
}
if askForWorker {
WorkerPoolInstance().Put(netID, func() {
timeout.Callback(time.Now(), c.(WriteCloser))
})
} else {
timeout.Callback(time.Now(), c.(WriteCloser))
}
}
}
}
}
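4. Message handling
4.1 Message context
Any type that implements the Message interface is a message. It must provide methods to report its protocol number and to serialize itself into a byte slice. In addition, each message type needs to register its own deserialization function and handler function: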
// Handler takes the responsibility to handle incoming messages.
type Handler interface {
Handle(context.Context, interface{})
}
// HandlerFunc serves as an adapter to allow the use of ordinary functions as handlers.
type HandlerFunc func(context.Context, WriteCloser)
// Handle calls f(ctx, c)
func (f HandlerFunc) Handle(ctx context.Context, c WriteCloser) {
f(ctx, c)
}
// UnmarshalFunc unmarshals bytes into Message.
type UnmarshalFunc func([]byte) (Message, error)
// handlerUnmarshaler is a combination of unmarshal and handle functions for message.
type handlerUnmarshaler struct {
handler HandlerFunc
unmarshaler UnmarshalFunc
}
func init() {
messageRegistry = map[int32]handlerUnmarshaler{}
buf = new(bytes.Buffer)
}
// Register registers the unmarshal and handle functions for msgType.
// If no unmarshal function provided, the message will not be parsed.
// If no handler function provided, the message will not be handled unless you
// set a default one by calling SetOnMessageCallback.
// If Register is called twice for one msgType, it panics.
func Register(msgType int32, unmarshaler func([]byte) (Message, error), handler func(context.Context, WriteCloser)) {
if _, ok := messageRegistry[msgType]; ok {
panic(fmt.Sprintf("trying to register message %d twice", msgType))
}
messageRegistry[msgType] = handlerUnmarshaler{
unmarshaler: unmarshaler,
handler: HandlerFunc(handler),
}
}
// GetUnmarshalFunc returns the corresponding unmarshal function for msgType.
func GetUnmarshalFunc(msgType int32) UnmarshalFunc {
entry, ok := messageRegistry[msgType]
if !ok {
return nil
}
return entry.unmarshaler
}
// GetHandlerFunc returns the corresponding handler function for msgType.
func GetHandlerFunc(msgType int32) HandlerFunc {
entry, ok := messageRegistry[msgType]
if !ok {
return nil
}
return entry.handler
}
// Message represents the structured data that can be handled.
type Message interface {
MessageNumber() int32
Serialize() ([]byte, error)
}
For each handler function, the message to process and the client that sent it are different. This information is called the "message context" and is represented by a Context structure; each client is identified by a 64-bit integer netid:
// Context is the context info for every handler function.
// Handler function handles the business logic about message.
// We can find the client connection who sent this message by netid and send back responses.
type Context struct{
message Message
netid int64
}
func NewContext(msg Message, id int64) Context {
return Context{
message: msg,
netid: id,
}
}
func (ctx Context)Message() Message {
return ctx.message
}
func (ctx Context)Id() int64 {
return ctx.netid
}
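4.2 Codec
When receiving, the codec (Codec) deserializes the bytes read from the network connection into a message according to a given format and passes the message to the layer above (decoding). When sending, the codec serializes the message passed down from above into bytes and hands them to the layer below for sending (encoding):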
// Codec is the interface for message coder and decoder.
// Application programmer can define a custom codec themselves.
type Codec interface {
Decode(net.Conn) (Message, error)
Encode(Message) ([]byte, error)
}
Tao packs data in a "Type-Length-Data" format. Type takes 4 bytes and gives the protocol type; Length takes 4 bytes and gives the message length; Data is a variable-length byte sequence whose length is given by Length. When deserializing, the Type field determines the protocol type, then Length bytes of Data are read and passed to the registered deserialization function. The core code is:
// Codec is the interface for message coder and decoder.
// Application programmer can define a custom codec themselves.
type Codec interface {
Decode(net.Conn) (Message, error)
Encode(Message) ([]byte, error)
}
// TypeLengthValueCodec defines a special codec.
// Format: type-length-value |4 bytes|4 bytes|n bytes <= 8M|
type TypeLengthValueCodec struct{}
// Decode decodes the bytes data into Message
func (codec TypeLengthValueCodec) Decode(raw net.Conn) (Message, error) {
byteChan := make(chan []byte)
errorChan := make(chan error)
go func(bc chan []byte, ec chan error) {
typeData := make([]byte, MessageTypeBytes)
_, err := io.ReadFull(raw, typeData)
if err != nil {
ec <- err
close(bc)
close(ec)
holmes.Debugln("go-routine read message type exited")
return
}
bc <- typeData
}(byteChan, errorChan)
var typeBytes []byte
select {
case err := <-errorChan:
return nil, err
case typeBytes = <-byteChan:
if typeBytes == nil {
holmes.Warnln("read type bytes nil")
return nil, ErrBadData
}
typeBuf := bytes.NewReader(typeBytes)
var msgType int32
if err := binary.Read(typeBuf, binary.LittleEndian, &msgType); err != nil {
return nil, err
}
lengthBytes := make([]byte, MessageLenBytes)
_, err := io.ReadFull(raw, lengthBytes)
if err != nil {
return nil, err
}
lengthBuf := bytes.NewReader(lengthBytes)
var msgLen uint32
if err = binary.Read(lengthBuf, binary.LittleEndian, &msgLen); err != nil {
return nil, err
}
if msgLen > MessageMaxBytes {
holmes.Errorf("message(type %d) has bytes(%d) beyond max %d\n", msgType, msgLen, MessageMaxBytes)
return nil, ErrBadData
}
// read application data
msgBytes := make([]byte, msgLen)
_, err = io.ReadFull(raw, msgBytes)
if err != nil {
return nil, err
}
// deserialize message from bytes
unmarshaler := GetUnmarshalFunc(msgType)
if unmarshaler == nil {
return nil, ErrUndefined(msgType)
}
return unmarshaler(msgBytes)
}
}
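There is some subtle design in this code that deserves a careful explanation. TypeLengthValueCodec.Decode() is called from the readLoop goroutine. io.ReadFull() is a synchronous call, so when there is no data to read it blocks readLoop, and if the connection is closed at that moment the readLoop goroutine cannot exit. The code therefore uses a small trick: it starts a dedicated goroutine to wait for the first 4 bytes of Type data, while Decode itself blocks in a select over several channels so that messages arriving on the other channels are not missed. Once the Type data has been read, the rest proceeds as usual: read the Length, then read that many bytes of application data and pass them to the previously registered deserialization function. Note that receiving data longer than the maximum length causes the connection to be closed, which prevents a malicious peer from exhausting system resources.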
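5. The worker goroutine pool
To make the framework more robust and avoid response delays caused by business logic, message handlers are normally scheduled onto a pool of worker goroutines. A key question in designing the pool is how to hash tasks onto its goroutines. On the one hand, to avoid concurrency problems, all messages from the same connection must hash to the same goroutine and run in order. On the other hand, the hashing must be even, so that some goroutines are not swamped while others sit idle. It all comes down to the design of the hash function.
5.1 Core code analysis
The pool follows the singleton pattern. On creation it calls newWorker() to create a set of worker goroutines.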
// WorkerPool is a pool of go-routines running functions.
type WorkerPool struct {
workers []*worker
closeChan chan struct{}
}
var (
globalWorkerPool *WorkerPool
)
func init() {
globalWorkerPool = newWorkerPool(WorkersNum)
}
// WorkerPoolInstance returns the global pool.
func WorkerPoolInstance() *WorkerPool {
return globalWorkerPool
}
func newWorkerPool(vol int) *WorkerPool {
if vol <= 0 {
vol = WorkersNum
}
pool := &WorkerPool{
workers: make([]*worker, vol),
closeChan: make(chan struct{}),
}
for i := range pool.workers {
pool.workers[i] = newWorker(i, 1024, pool.closeChan)
if pool.workers[i] == nil {
panic("worker nil")
}
}
return pool
}
5.2 Assigning tasks to worker goroutines
Assigning a task is simple: the hashCode() function selects the worker goroutine, and the callback is sent to that worker's channel. When the worker runs, its start() function takes the callback from the channel and executes it.
// Put appends a function to some worker's channel.
func (wp *WorkerPool) Put(k interface{}, cb func()) error {
code := hashCode(k)
return wp.workers[code&uint32(len(wp.workers)-1)].put(workerFunc(cb))
}
func (w *worker) start() {
for {
select {
case <-w.closeChan:
return
case cb := <-w.callbackChan:
before := time.Now()
cb()
addTotalTime(time.Since(before).Seconds())
}
}
}
func (w *worker) put(cb workerFunc) error {
select {
case w.callbackChan <- cb:
return nil
default:
return ErrWouldBlock
}
}
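One detail of Put is worth spelling out: masking with len(wp.workers)-1 spreads keys over every worker only when the pool size is a power of two. The sketch below compares the mask with a plain modulo. It assumes, for illustration only, that the hash of a connection is its net ID; hashCode() is not shown in this article, and workerIndex and maskIndex are hypothetical names:

```go
package main

import "fmt"

// workerIndex maps a net ID to a worker slot using modulo, which can reach
// every slot for any pool size.
func workerIndex(netID int64, workers int) int {
	return int(uint64(netID) % uint64(workers))
}

// maskIndex reproduces the mask form used in Put for comparison. It matches
// workerIndex only when workers is a power of two.
func maskIndex(netID int64, workers int) int {
	return int(uint32(netID) & uint32(workers-1))
}

func main() {
	for id := int64(0); id < 8; id++ {
		fmt.Println(id, workerIndex(id, 4), maskIndex(id, 4), maskIndex(id, 6))
	}
}
```

With six workers the mask is binary 101, so slots 2 and 3 are never chosen. Either form sends the same net ID to the same worker every time, which is what preserves per-connection ordering.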
6. A thread-safe timer
Tao includes a timer, TimingWheel, for controlling timed tasks. Connection wraps it further and offers running at a given time (RunAt), running after a delay (RunAfter), and running periodically (RunEvery). The timer's design also leads into some hard-won lessons about multithreaded programming.
6.1 Data structures for timed tasks
6.1.1 The timed task structure
Each timed task is represented by a timerType, which carries its own id and an OnTimeOut structure containing the timeout callback. expiration is the time at which the task is due to run, and interval is the period; interval > 0 means the task is repeated periodically.
/* 'expiration' is the time when timer time out, if 'interval' > 0
the timer will time out periodically, 'timeout' contains the callback
to be called when times out */
type timerType struct {
id int64
expiration time.Time
interval time.Duration
timeout *OnTimeOut
index int // for container/heap
}
// OnTimeOut represents a timed task.
type OnTimeOut struct {
Callback func(time.Time, WriteCloser)
Ctx context.Context
}
// NewOnTimeOut returns OnTimeOut.
func NewOnTimeOut(ctx context.Context, cb func(time.Time, WriteCloser)) *OnTimeOut {
return &OnTimeOut{
Callback: cb,
Ctx: ctx,
}
}
6.1.2 Organizing timed tasks
Timers need to be ordered by expiration time from nearest to furthest, which is a natural fit for a min-heap. The standard library's container/heap is used to build a heap that organizes the timed tasks; inserting a task or removing the earliest one costs O(log n).
// timerHeap is a heap-based priority queue
type timerHeapType []*timerType
func (heap timerHeapType) getIndexByID(id int64) int {
for _, t := range heap {
if t.id == id {
return t.index
}
}
return -1
}
func (heap timerHeapType) Len() int {
return len(heap)
}
func (heap timerHeapType) Less(i, j int) bool {
return heap[i].expiration.UnixNano() < heap[j].expiration.UnixNano()
}
func (heap timerHeapType) Swap(i, j int) {
heap[i], heap[j] = heap[j], heap[i]
heap[i].index = i
heap[j].index = j
}
func (heap *timerHeapType) Push(x interface{}) {
n := len(*heap)
timer := x.(*timerType)
timer.index = n
*heap = append(*heap, timer)
}
func (heap *timerHeapType) Pop() interface{} {
old := *heap
n := len(old)
timer := old[n-1]
timer.index = -1
*heap = old[0 : n-1]
return timer
}
6.2 Core timer code analysis
When a TimingWheel is created it starts a separate goroutine that runs the core timer code, start(). It multiplexes over several channels. If a timerID arrives on cancelChan, it cancels the task by removing it from the heap. It sends the number of timed tasks to sizeChan so that other goroutines can obtain the current count. If the timer's context (tw.ctx) is cancelled, the ticker is stopped and the goroutine exits. If a timer arrives on addChan, the task is pushed onto the heap. If a tick arrives on tw.ticker.C, getExpired() collects the tasks that are due and their callbacks are sent to the channel returned by GetTimeOutChannel(), from which other goroutines receive and run them. Finally, tw.update() reschedules the periodic tasks.
func (tw *TimingWheel) update(timers []*timerType) {
if timers != nil {
for _, t := range timers {
if t.isRepeat() {
t.expiration = t.expiration.Add(t.interval)
heap.Push(&tw.timers, t)
}
}
}
}
func (tw *TimingWheel) start() {
for {
select {
case timerID := <-tw.cancelChan:
index := tw.timers.getIndexByID(timerID)
if index >= 0 {
heap.Remove(&tw.timers, index)
}
case tw.sizeChan <- tw.timers.Len():
case <-tw.ctx.Done():
tw.ticker.Stop()
return
case timer := <-tw.addChan:
heap.Push(&tw.timers, timer)
case <-tw.ticker.C:
timers := tw.getExpired()
for _, t := range timers {
tw.GetTimeOutChannel() <- t.timeout
}
tw.update(timers)
}
}
}
6.3 How the timer is made thread-safe
Servers built on Tao used to crash from time to time. Sometimes the server would exit suddenly after running for a few hours. The printed stack traces showed that the program crashed in the timer every time, with an index out of range error. This was a concurrent access problem. Why? The timer's core function manipulates the heap in one goroutine, while the add and cancel interfaces it exposes may be called from other goroutines. Several goroutines accessing an unlocked data structure concurrently is bound to go wrong. The fix is simple: turn concurrent access from many goroutines into serial access from one. The add, cancel, and other operations are sent over separate channels and all handled in the start() goroutine:
// AddTimer adds new timed task.
func (tw *TimingWheel) AddTimer(when time.Time, interv time.Duration, to *OnTimeOut) int64 {
if to == nil {
return int64(-1)
}
timer := newTimer(when, interv, to)
tw.addChan <- timer
return timer.id
}
// Size returns the number of timed tasks.
func (tw *TimingWheel) Size() int {
return <-tw.sizeChan
}
// CancelTimer cancels a timed task with specified timer ID.
func (tw *TimingWheel) CancelTimer(timerID int64) {
tw.cancelChan <- timerID
}
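The same idea works for any data structure, not just the timer heap. The following sketch uses a hypothetical registry type (not part of Tao) whose map is touched by exactly one goroutine; every other goroutine talks to it through channels:

```go
package main

import "fmt"

// registry owns a map that only its run goroutine ever touches.
type registry struct {
	addCh  chan int64
	delCh  chan int64
	sizeCh chan int
	quit   chan struct{}
}

func newRegistry() *registry {
	r := &registry{
		addCh:  make(chan int64),
		delCh:  make(chan int64),
		sizeCh: make(chan int),
		quit:   make(chan struct{}),
	}
	go r.run()
	return r
}

// run serializes every operation, so the map needs no lock.
func (r *registry) run() {
	ids := map[int64]bool{}
	for {
		select {
		case id := <-r.addCh:
			ids[id] = true
		case id := <-r.delCh:
			delete(ids, id)
		case r.sizeCh <- len(ids):
		case <-r.quit:
			return
		}
	}
}

func (r *registry) Add(id int64)    { r.addCh <- id }
func (r *registry) Remove(id int64) { r.delCh <- id }
func (r *registry) Size() int       { return <-r.sizeCh }
func (r *registry) Close()          { close(r.quit) }

func main() {
	r := newRegistry()
	r.Add(1)
	r.Add(2)
	r.Remove(1)
	fmt.Println(r.Size())
	r.Close()
}
```

Because the channels are unbuffered, each call returns only after the owning goroutine has accepted the request, and that request is fully applied before the next one is picked up.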
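6.4 Application-level heartbeats
In his book "Linux多線程服務(wù)端編程" (Linux Multithreaded Server Programming), Chen Shuo says that servers maintaining long-lived connections should implement heartbeat messages at the application layer:
"In serious network programs, an application-level heartbeat protocol is indispensable. Heartbeat messages should be used to judge whether the peer process is working properly."
Heartbeats and other business messages should be sent over the same connection, so that when the application layer fails and can no longer send messages, the peer notices right away because the heartbeats stop. Note that in Tao there is only one timer, while there may be many client connections. In long-connection mode every client has to handle heartbeats or other kinds of timed tasks. Designing the framework as "one timer per client connection" would be a poor fit: a hundred thousand connections would mean a hundred thousand timers and high CPU usage. There should be a single timer that handles the timed tasks registered by all clients. But if every client connection waited on messages from that one timer, there would again be a concurrency problem: client 1's task might come due while client 1 is busy with other messages, and the task could end up being run by some other client. So a "gather first, then dispatch" mechanism is used. Every timed task is represented by an OnTimeOut structure, which contains a context in addition to the callback. A client fills in its net ID when it starts a timed task. The Server receives all due tasks centrally, extracts the net ID from each, and hands the task to the corresponding ServerConn or ClientConn to run: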
// Retrieve the extra data(i.e. net id), and then redispatch timeout callbacks
// to corresponding client connection, this prevents one client from running
// callbacks of other clients
func (s *Server) timeOutLoop() {
defer s.wg.Done()
for {
select {
case <-s.ctx.Done():
return
case timeout := <-s.timing.GetTimeOutChannel():
netID := timeout.Ctx.Value(netIDCtx).(int64)
if sc, ok := s.conns.Get(netID); ok {
sc.timerCh <- timeout
} else {
holmes.Warnf("invalid client %d\n", netID)
}
}
}
}
III. Core problems and basic approaches in concurrent programming
What are we talking about when we talk about concurrent programming? In one sentence: when multiple threads access unprotected shared data at the same time, concurrency problems arise. The essence of multithreaded programming is therefore how to avoid that situation. In summary, there are three basic approaches.
1. Protect the shared data structure
This is the method most often seen in textbooks. The data structure is protected with a semaphore or mutex: lock, perform the operation, then unlock. For example, ConnMap, which Tao uses to manage network connections, is implemented this way:
// ConnMap is a safe map for server connection management.
type ConnMap struct {
sync.RWMutex
m map[int64]*ServerConn
}
// NewConnMap returns a new ConnMap.
func NewConnMap() *ConnMap {
return &ConnMap{
m: make(map[int64]*ServerConn),
}
}
// Clear clears all elements in map.
func (cm *ConnMap) Clear() {
cm.Lock()
cm.m = make(map[int64]*ServerConn)
cm.Unlock()
}
// Get gets a server connection with specified net ID.
func (cm *ConnMap) Get(id int64) (*ServerConn, bool) {
cm.RLock()
sc, ok := cm.m[id]
cm.RUnlock()
return sc, ok
}
// Put puts a server connection with specified net ID in map.
func (cm *ConnMap) Put(id int64, sc *ServerConn) {
cm.Lock()
cm.m[id] = sc
cm.Unlock()
}
// Remove removes a server connection with specified net ID.
func (cm *ConnMap) Remove(id int64) {
cm.Lock()
delete(cm.m, id)
cm.Unlock()
}
// Size returns map size.
func (cm *ConnMap) Size() int {
cm.RLock()
size := len(cm.m)
cm.RUnlock()
return size
}
// IsEmpty tells whether ConnMap is empty.
func (cm *ConnMap) IsEmpty() bool {
return cm.Size() <= 0
}
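2. Turn multithreaded parallelism into single-threaded serial execution
This approach was introduced earlier and is a form of lock-free programming. Requests from many threads are placed on a task queue, and a single thread reads the queue and executes them serially. Under very heavy concurrency it can still become a performance bottleneck.
3. Use carefully designed concurrent data structures
The best approach is to start from the data structure itself; many techniques allow a data structure to cope with concurrent access from multiple threads. For example, java.util.concurrent in the Java standard library contains a variety of concurrent data structures. The basic principle of ConcurrentHashMap, in its implementation before Java 8, is lock striping: each segment (Segment) is protected by its own lock, and concurrent writes are distributed to different segments by a hash function, so locking Segment A does not affect access to Segment B.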
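The segment-lock idea carries over to Go directly. The sketch below is a Go analogue written for illustration, not the Java implementation and not part of Tao; ShardedMap, shardCount and the choice of 16 shards are all assumptions:

```go
package main

import (
	"fmt"
	"sync"
)

const shardCount = 16 // assumed shard count for this sketch

// shard is one independently locked segment.
type shard struct {
	mu sync.RWMutex
	m  map[int64]string
}

// ShardedMap spreads keys over independently locked shards.
type ShardedMap struct {
	shards [shardCount]*shard
}

func NewShardedMap() *ShardedMap {
	sm := &ShardedMap{}
	for i := range sm.shards {
		sm.shards[i] = &shard{m: make(map[int64]string)}
	}
	return sm
}

// shardFor picks the shard by key; locking it leaves the others free.
func (sm *ShardedMap) shardFor(key int64) *shard {
	return sm.shards[uint64(key)%shardCount]
}

func (sm *ShardedMap) Put(key int64, v string) {
	s := sm.shardFor(key)
	s.mu.Lock()
	s.m[key] = v
	s.mu.Unlock()
}

func (sm *ShardedMap) Get(key int64) (string, bool) {
	s := sm.shardFor(key)
	s.mu.RLock()
	v, ok := s.m[key]
	s.mu.RUnlock()
	return v, ok
}

func main() {
	sm := NewShardedMap()
	var wg sync.WaitGroup
	for i := int64(0); i < 100; i++ {
		wg.Add(1)
		go func(k int64) {
			defer wg.Done()
			sm.Put(k, fmt.Sprint(k))
		}(i)
	}
	wg.Wait()
	v, ok := sm.Get(42)
	fmt.Println(v, ok)
}
```

Compared with ConnMap's single RWMutex, writers that land on different shards no longer block each other, at the cost of a slightly more involved Size or Clear that has to visit every shard.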
Concurrency and multithreading problems demand care upon care and thought upon thought; one moment of inattention and you fall into a trap.
IV. Acknowledgements
- "Linux多線程服務(wù)端編程" (Linux Multithreaded Server Programming) and the Muduo network library, by Chen Shuo
- "The Go Programming Language", by Donovan & Kernighan
- gotcp - A Go package for quickly building tcp servers
- syncmap - A thread safe map implementation for Golang
- leaf - A pragmatic game server framework in Go