Tao - Go語(yǔ)言實(shí)現(xiàn)的TCP網(wǎng)絡(luò)編程框架

一. 什么是Tao

Tao屎暇,在英文中的意思是“The ultimate principle of universe”饭宾,即“道”,它是宇宙的終極奧義看铆。

“道生一盛末,一生二弹惦,二生三,三生無(wú)窮助泽。” ——《道德經(jīng)》

Tao同時(shí)也是我用Go語(yǔ)言開(kāi)發(fā)的一個(gè)異步的TCP服務(wù)器框架(TCP Asynchronous Go server FramewOrk)暑刃,秉承Go語(yǔ)言“Less is more”的極簡(jiǎn)主義哲學(xué),它能穿透一切表象架谎,帶你一窺網(wǎng)絡(luò)編程的世界,讓你從此徹底擺脫只會(huì)寫“socket-bind-listen-accept”的窘境裹匙。本文將簡(jiǎn)單討論一下這個(gè)框架的設(shè)計(jì)思路以及自己的一些思考。

1. Tao解決什么問(wèn)題

1.1 場(chǎng)景

你開(kāi)發(fā)的產(chǎn)品有一套特有的業(yè)務(wù)邏輯惰匙,要通過(guò)互聯(lián)網(wǎng)得到服務(wù)端的支持才能為你的客戶提供服務(wù)。

1.2 問(wèn)題

怎樣快速穩(wěn)定地實(shí)現(xiàn)產(chǎn)品的功能绘盟,而不需要耗費(fèi)大量的時(shí)間處理各種底層的網(wǎng)絡(luò)通信細(xì)節(jié)。

1.3 解決方案

Tao提供了一種用框架支撐業(yè)務(wù)邏輯的機(jī)制撤嫩。你只需要與客戶端定義好消息格式茴她,然后將對(duì)應(yīng)的業(yè)務(wù)邏輯編寫成函數(shù)注冊(cè)到框架中就可以了。

2. 50行啟動(dòng)一個(gè)聊天服務(wù)器

讓我們舉一個(gè)例子來(lái)看看如何使用Tao框架實(shí)現(xiàn)一個(gè)簡(jiǎn)單的群聊天服務(wù)器己沛。服務(wù)器端代碼可以這么寫:

package main

import (
    "fmt"
    "net"

    "github.com/leesper/holmes"
    "github.com/leesper/tao"
    "github.com/leesper/tao/examples/chat"
)

// ChatServer is the chatting server.
type ChatServer struct {
    *tao.Server
}

// NewChatServer returns a ChatServer.
func NewChatServer() *ChatServer {
    onConnectOption := tao.OnConnectOption(func(conn tao.WriteCloser) bool {
        holmes.Infoln("on connect")
        return true
    })
    onErrorOption := tao.OnErrorOption(func(conn tao.WriteCloser) {
        holmes.Infoln("on error")
    })
    onCloseOption := tao.OnCloseOption(func(conn tao.WriteCloser) {
        holmes.Infoln("close chat client")
    })
    return &ChatServer{
        tao.NewServer(onConnectOption, onErrorOption, onCloseOption),
    }
}

func main() {
    defer holmes.Start().Stop()

    tao.Register(chat.ChatMessage, chat.DeserializeMessage, chat.ProcessMessage)

    l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", "0.0.0.0", 12345))
    if err != nil {
        holmes.Fatalln("listen error", err)
    }
    chatServer := NewChatServer()
    err = chatServer.Start(l)
    if err != nil {
        holmes.Fatalln("start error", err)
    }
}

啟動(dòng)一個(gè)服務(wù)器只需要三步就能完成垫桂。首先注冊(cè)消息和業(yè)務(wù)邏輯回調(diào)霹粥,其次填入IP地址和端口后控,最后Start一下就可以了矾利。這時(shí)候客戶端就能夠發(fā)起連接,并開(kāi)始聊天时肿。業(yè)務(wù)邏輯的實(shí)現(xiàn)很簡(jiǎn)單,遍歷所有的連接须教,然后發(fā)送數(shù)據(jù):

// ProcessMessage handles the Message logic.
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
    holmes.Infof("ProcessMessage")
    s, ok := tao.ServerFromContext(ctx)
    if ok {
        msg := tao.MessageFromContext(ctx)
        s.Broadcast(msg)
    }
}

3. Go語(yǔ)言的編程哲學(xué)

Go語(yǔ)言是“云計(jì)算時(shí)代的C語(yǔ)言”桅锄,適用于開(kāi)發(fā)基礎(chǔ)性服務(wù),比如服務(wù)器。它語(yǔ)法類似C語(yǔ)言且標(biāo)準(zhǔn)庫(kù)豐富盟戏,上手較快,所以開(kāi)發(fā)效率高笛求;編譯速度快狡孔,運(yùn)行效率接近C,所以運(yùn)行效率高辱揭。

3.1 面向?qū)ο缶幊?/h3>

Go語(yǔ)言面向?qū)ο缶幊痰娘L(fēng)格是“多用組合,少用繼承”,以匿名嵌入的方式實(shí)現(xiàn)繼承听皿。比如上面的聊天服務(wù)器ChatServer:

// ChatServer is the chatting server.
type ChatServer struct {
    *tao.Server
}

于是ChatServer就自動(dòng)繼承了Server所有的屬性和方法。當(dāng)然啊送,這里是以指針的方式嵌入的昔逗。

3.2 面向接口編程

Go語(yǔ)言的面向接口編程是“鴨子類型”的婆排,即“如果我走起來(lái)像鴨子段只,叫起來(lái)像鴨子赞枕,那么我就是一只鴨子”姐赡。其他的編程語(yǔ)言需要顯示地說(shuō)明自己繼承某個(gè)接口项滑,Go語(yǔ)言卻采取的是“隱式聲明”的方式枪狂。比如Tao框架使用的多線程日志庫(kù)Holmes實(shí)現(xiàn)“每小時(shí)創(chuàng)建一個(gè)新日志文件”功能的核心代碼如下:

func (ls *logSegment)Write(p []byte) (n int, err error) {
  if ls.timeToCreate != nil && ls.logFile != os.Stdout && ls.logFile != os.Stderr {
    select {
    case current := <-ls.timeToCreate:
      ls.logFile.Close()
      ls.logFile = nil
      name := getLogFileName(current)
      ls.logFile, err = os.Create(path.Join(ls.logPath, name))
      if err != nil {
        fmt.Fprintln(os.Stderr, err)
        ls.logFile = os.Stderr
      } else {
        next := current.Truncate(ls.unit).Add(ls.unit)
        ls.timeToCreate = time.After(next.Sub(time.Now()))
      }
    default:
      // do nothing
    }
  }
  return ls.logFile.Write(p)
}

而標(biāo)準(zhǔn)庫(kù)中的io.Writer定義如下,那么這里的logSegment就實(shí)現(xiàn)了io.Writer的接口,所有以io.Writer作為形參的函數(shù)谈飒,我都可以傳一個(gè)logSegment的實(shí)參進(jìn)去杭措。

type Writer interface {
    Write(p []byte) (n int, err error)
}

3.3 一個(gè)中心,兩個(gè)基本點(diǎn)

掌握Go語(yǔ)言泉懦,要把握“一個(gè)中心,兩個(gè)基本點(diǎn)”邓嘹⌒谘海“一個(gè)中心”是Go語(yǔ)言并發(fā)模型窖维,即“不要通過(guò)共享內(nèi)存來(lái)通信陈辱,要通過(guò)通信來(lái)共享內(nèi)存”沛贪;“兩個(gè)基本點(diǎn)”是Go語(yǔ)言的并發(fā)模型的兩大基石:channel和go-routine。理解了它們就能看懂大部分代碼媚送。下面讓我們正式開(kāi)始介紹Tao框架吧。

二. Tao的設(shè)計(jì)思路

1. 服務(wù)器的啟動(dòng)

Tao框架支持通過(guò)tao.TLSCredsOption()函數(shù)提供傳輸層安全的TLS Server吟秩。服務(wù)器的核心職責(zé)是“監(jiān)聽(tīng)并接受客戶端連接”涵防。每個(gè)進(jìn)程能夠打開(kāi)的文件描述符是有限制的,所以它還需要限制最大并發(fā)連接數(shù)椰憋,關(guān)鍵代碼如下:

// Start starts the TCP server, accepting new clients and creating service
// go-routine for each. The service go-routines read messages and then call
// the registered handlers to handle them. Start returns when failed with fatal
// errors, the listener willl be closed when returned.
func (s *Server) Start(l net.Listener) error {
    s.mu.Lock()
    if s.lis == nil {
        s.mu.Unlock()
        l.Close()
        return ErrServerClosed
    }
    s.lis[l] = true
    s.mu.Unlock()

    defer func() {
        s.mu.Lock()
        if s.lis != nil && s.lis[l] {
            l.Close()
            delete(s.lis, l)
        }
        s.mu.Unlock()
    }()

    holmes.Infof("server start, net %s addr %s\n", l.Addr().Network(), l.Addr().String())

    s.wg.Add(1)
    go s.timeOutLoop()

    var tempDelay time.Duration
    for {
        rawConn, err := l.Accept()
        if err != nil {
            if ne, ok := err.(net.Error); ok && ne.Temporary() {
                if tempDelay == 0 {
                    tempDelay = 5 * time.Millisecond
                } else {
                    tempDelay *= 2
                }
                if max := 1 * time.Second; tempDelay >= max {
                    tempDelay = max
                }
                holmes.Errorf("accept error %v, retrying in %d\n", err, tempDelay)
                select {
                case <-time.After(tempDelay):
                case <-s.ctx.Done():
                }
                continue
            }
            return err
        }
        tempDelay = 0

        // how many connections do we have ?
        sz := s.conns.Size()
        if sz >= MaxConnections {
            holmes.Warnf("max connections size %d, refuse\n", sz)
            rawConn.Close()
            continue
        }

        if s.opts.tlsCfg != nil {
            rawConn = tls.Server(rawConn, s.opts.tlsCfg)
        }

        netid := netIdentifier.GetAndIncrement()
        sc := NewServerConn(netid, s, rawConn)
        sc.SetName(sc.rawConn.RemoteAddr().String())

        s.mu.Lock()
        if s.sched != nil {
            sc.RunEvery(s.interv, s.sched)
        }
        s.mu.Unlock()

        s.conns.Put(netid, sc)
        addTotalConn(1)

        s.wg.Add(1)
        go func() {
            sc.Start()
        }()

        holmes.Infof("accepted client %s, id %d, total %d\n", sc.GetName(), netid, s.conns.Size())
        s.conns.RLock()
        for _, c := range s.conns.m {
            holmes.Infof("client %s\n", c.GetName())
        }
        s.conns.RUnlock()
    } // for loop
}

如果服務(wù)器在接受客戶端連接請(qǐng)求的時(shí)候發(fā)生了臨時(shí)錯(cuò)誤,那么服務(wù)器將等待最多1秒的時(shí)間再重新嘗試接受請(qǐng)求卵渴,如果現(xiàn)有的連接數(shù)超過(guò)了MaxConnections(默認(rèn)1000)浪读,就拒絕并關(guān)閉連接互订,否則啟動(dòng)一個(gè)新的連接開(kāi)始工作仰禽。

2. 服務(wù)器的優(yōu)雅關(guān)閉

Go語(yǔ)言在發(fā)布1.7版時(shí)在標(biāo)準(zhǔn)庫(kù)中引入了context包。context包提供的Context結(jié)構(gòu)能夠在服務(wù)器温峭,網(wǎng)絡(luò)連接以及各相關(guān)線程之間建立一種相關(guān)聯(lián)的“上下文”關(guān)系。這種上下文關(guān)系包含的信息是與某次網(wǎng)絡(luò)請(qǐng)求有關(guān)的(request scoped)揖庄,因此與該請(qǐng)求有關(guān)的所有Go線程都能安全地訪問(wèn)這個(gè)上下文結(jié)構(gòu),讀取或者寫入與上下文有關(guān)的數(shù)據(jù)检号。比如handleLoop線程會(huì)將某個(gè)網(wǎng)絡(luò)連接的net ID以及message打包到上下文結(jié)構(gòu)中,然后連同handler函數(shù)一起交給工作者線程去處理:

// handleLoop() - put handler or timeout callback into worker go-routines
func handleLoop(c WriteCloser, wg *sync.WaitGroup) {
    //... omitted ...

    
    for {
        select {
        //... omitted ...
        case msgHandler := <-handlerCh:
            msg, handler := msgHandler.message, msgHandler.handler
            if handler != nil {
                if askForWorker {
                    WorkerPoolInstance().Put(netID, func() {
                        handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
                    })
                } 
            }
        //... omitted ...
    }
}

隨后,在工作者線程真正執(zhí)行時(shí)玛痊,業(yè)務(wù)邏輯代碼就能在handler函數(shù)中獲取到message或者net ID擂煞,這些都是與本次請(qǐng)求有關(guān)的上下文數(shù)據(jù)蝗拿,比如一個(gè)典型的echo server就會(huì)這樣處理:

// ProcessMessage process the logic of echo message.
func ProcessMessage(ctx context.Context, conn tao.WriteCloser) {
    msg := tao.MessageFromContext(ctx).(Message)
    holmes.Infof("receving message %s\n", msg.Content)
    conn.Write(msg)
}

使用context的另外一個(gè)場(chǎng)景是實(shí)現(xiàn)服務(wù)器及網(wǎng)絡(luò)連接的“優(yōu)雅關(guān)閉”。服務(wù)器在管理網(wǎng)絡(luò)連接時(shí)會(huì)將自己的上下文傳遞給它仓手,而網(wǎng)絡(luò)連接啟動(dòng)新線程時(shí)同樣也會(huì)將自己的上下文傳遞給這些線程,這些上下文都是可取消(cancelable)的辛慰。當(dāng)服務(wù)器需要停機(jī)或者連接將要關(guān)閉時(shí),只要調(diào)用cancel函數(shù)速客,所有這些線程就能收到通知并退出。服務(wù)器或者網(wǎng)絡(luò)連接通過(guò)阻塞等待這些線程關(guān)閉之后再關(guān)閉浪耘,就能最大限度保證正確退出。服務(wù)器關(guān)閉的關(guān)鍵代碼如下:

// Stop gracefully closes the server, it blocked until all connections
// are closed and all go-routines are exited.
func (s *Server) Stop() {
    // immediately stop accepting new clients
    s.mu.Lock()
    listeners := s.lis
    s.lis = nil
    s.mu.Unlock()

    for l := range listeners {
        l.Close()
        holmes.Infof("stop accepting at address %s\n", l.Addr().String())
    }

    // close all connections
    conns := map[int64]*ServerConn{}
    s.conns.RLock()
    for k, v := range s.conns.m {
        conns[k] = v
    }
    s.conns.Clear()
    s.conns.RUnlock()

    for _, c := range conns {
        c.rawConn.Close()
        holmes.Infof("close client %s\n", c.GetName())
    }

    s.mu.Lock()
    s.cancel()
    s.mu.Unlock()

    s.wg.Wait()

    holmes.Infoln("server stopped gracefully, bye.")
    os.Exit(0)
}

3. 網(wǎng)絡(luò)連接模型

在其他的編程語(yǔ)言中,采用Reactor模式編寫的服務(wù)器往往需要在一個(gè)IO線程異步地通過(guò)epoll進(jìn)行多路復(fù)用掘鄙。而因?yàn)镚o線程的開(kāi)銷廉價(jià),Go語(yǔ)言可以對(duì)每一個(gè)網(wǎng)絡(luò)連接創(chuàng)建三個(gè)go-routine颅夺。readLoop()負(fù)責(zé)讀取數(shù)據(jù)并反序列化成消息;writeLoop()負(fù)責(zé)序列化消息并發(fā)送二進(jìn)制字節(jié)流拗慨;最后handleLoop()負(fù)責(zé)調(diào)用消息處理函數(shù)。這三個(gè)協(xié)程在連接創(chuàng)建并啟動(dòng)時(shí)就會(huì)各自獨(dú)立運(yùn)行:

// Start starts the server connection, creating go-routines for reading,
// writing and handlng.
func (sc *ServerConn) Start() {
    holmes.Infof("conn start, <%v -> %v>\n", sc.rawConn.LocalAddr(), sc.rawConn.RemoteAddr())
    onConnect := sc.belong.opts.onConnect
    if onConnect != nil {
        onConnect(sc)
    }

    loopers := []func(WriteCloser, *sync.WaitGroup){readLoop, writeLoop, handleLoop}
    for _, l := range loopers {
        looper := l
        sc.wg.Add(1)
        go looper(sc, sc.wg)
    }
}

3.1 核心代碼分析之readLoop

readLoop做了三件關(guān)鍵的工作。首先調(diào)用消息編解碼器將接收到的字節(jié)流反序列化成消息其爵;然后更新用于心跳檢測(cè)的時(shí)間戳;最后摇幻,根據(jù)消息的協(xié)議號(hào)找到對(duì)應(yīng)的消息處理函數(shù),如果注冊(cè)了消息回調(diào)函數(shù),那么就調(diào)用該函數(shù)處理消息银酗,否則將消息和處理函數(shù)打包發(fā)送到handlerCh中黍特,注意其中的cDone和sDone分別是網(wǎng)絡(luò)連接和服務(wù)器上下文結(jié)構(gòu)中的channel,分別用于監(jiān)聽(tīng)網(wǎng)絡(luò)連接和服務(wù)器的“關(guān)閉”事件通知(下同)迫像。

/* readLoop() blocking read from connection, deserialize bytes into message,
then find corresponding handler, put it into channel */
func readLoop(c WriteCloser, wg *sync.WaitGroup) {
    var (
        rawConn          net.Conn
        codec            Codec
        cDone            <-chan struct{}
        sDone            <-chan struct{}
        setHeartBeatFunc func(int64)
        onMessage        onMessageFunc
        handlerCh        chan MessageHandler
        msg              Message
        err              error
    )

    switch c := c.(type) {
    case *ServerConn:
        rawConn = c.rawConn
        codec = c.belong.opts.codec
        cDone = c.ctx.Done()
        sDone = c.belong.ctx.Done()
        setHeartBeatFunc = c.SetHeartBeat
        onMessage = c.belong.opts.onMessage
        handlerCh = c.handlerCh
    case *ClientConn:
        rawConn = c.rawConn
        codec = c.opts.codec
        cDone = c.ctx.Done()
        sDone = nil
        setHeartBeatFunc = c.SetHeartBeat
        onMessage = c.opts.onMessage
        handlerCh = c.handlerCh
    }

    defer func() {
        if p := recover(); p != nil {
            holmes.Errorf("panics: %v\n", p)
        }
        wg.Done()
        holmes.Debugln("readLoop go-routine exited")
        c.Close()
    }()

    for {
        select {
        case <-cDone: // connection closed
            holmes.Debugln("receiving cancel signal from conn")
            return
        case <-sDone: // server closed
            holmes.Debugln("receiving cancel signal from server")
            return
        default:
            msg, err = codec.Decode(rawConn)
            if err != nil {
                holmes.Errorf("error decoding message %v\n", err)
                if _, ok := err.(ErrUndefined); ok {
                    // update heart beats
                    setHeartBeatFunc(time.Now().UnixNano())
                    continue
                }
                return
            }
            setHeartBeatFunc(time.Now().UnixNano())
            handler := GetHandlerFunc(msg.MessageNumber())
            if handler == nil {
                if onMessage != nil {
                    holmes.Infof("message %d call onMessage()\n", msg.MessageNumber())
                    onMessage(msg, c.(WriteCloser))
                } else {
                    holmes.Warnf("no handler or onMessage() found for message %d\n", msg.MessageNumber())
                }
                continue
            }
            handlerCh <- MessageHandler{msg, handler}
        }
    }
}

3.2 核心代碼分析之writeLoop

writeLoop做了一件事情由缆,從sendCh中讀取已序列化好的字節(jié)流均唉,然后發(fā)送到網(wǎng)絡(luò)上。但是要注意肚菠,該協(xié)程在連接關(guān)閉退出執(zhí)行之前舔箭,會(huì)非阻塞地將sendCh中的消息全部發(fā)送完畢再退出,避免漏發(fā)消息案糙,這就是關(guān)鍵所在限嫌。

/* writeLoop() receive message from channel, serialize it into bytes,
then blocking write into connection */
func writeLoop(c WriteCloser, wg *sync.WaitGroup) {
    var (
        rawConn net.Conn
        sendCh  chan []byte
        cDone   <-chan struct{}
        sDone   <-chan struct{}
        pkt     []byte
        err     error
    )

    switch c := c.(type) {
    case *ServerConn:
        rawConn = c.rawConn
        sendCh = c.sendCh
        cDone = c.ctx.Done()
        sDone = c.belong.ctx.Done()
    case *ClientConn:
        rawConn = c.rawConn
        sendCh = c.sendCh
        cDone = c.ctx.Done()
        sDone = nil
    }

    defer func() {
        if p := recover(); p != nil {
            holmes.Errorf("panics: %v\n", p)
        }
        // drain all pending messages before exit
    OuterFor:
        for {
            select {
            case pkt = <-sendCh:
                if pkt != nil {
                    if _, err = rawConn.Write(pkt); err != nil {
                        holmes.Errorf("error writing data %v\n", err)
                    }
                }
            default:
                break OuterFor
            }
        }
        wg.Done()
        holmes.Debugln("writeLoop go-routine exited")
        c.Close()
    }()

    for {
        select {
        case <-cDone: // connection closed
            holmes.Debugln("receiving cancel signal from conn")
            return
        case <-sDone: // server closed
            holmes.Debugln("receiving cancel signal from server")
            return
        case pkt = <-sendCh:
            if pkt != nil {
                if _, err = rawConn.Write(pkt); err != nil {
                    holmes.Errorf("error writing data %v\n", err)
                    return
                }
            }
        }
    }
}

3.3 核心代碼分析之handleLoop

readLoop將消息和處理函數(shù)打包發(fā)給了handlerCh时捌,于是handleLoop就從handlerCh中取出消息和處理函數(shù)怒医,然后交給工作者線程池,由后者負(fù)責(zé)調(diào)度執(zhí)行奢讨,完成對(duì)消息的處理稚叹。這里很好的詮釋了Go語(yǔ)言是如何通過(guò)channel實(shí)現(xiàn)Go線程間通信的。

// handleLoop() - put handler or timeout callback into worker go-routines
func handleLoop(c WriteCloser, wg *sync.WaitGroup) {
    var (
        cDone        <-chan struct{}
        sDone        <-chan struct{}
        timerCh      chan *OnTimeOut
        handlerCh    chan MessageHandler
        netID        int64
        ctx          context.Context
        askForWorker bool
    )

    switch c := c.(type) {
    case *ServerConn:
        cDone = c.ctx.Done()
        sDone = c.belong.ctx.Done()
        timerCh = c.timerCh
        handlerCh = c.handlerCh
        netID = c.netid
        ctx = c.ctx
        askForWorker = true
    case *ClientConn:
        cDone = c.ctx.Done()
        sDone = nil
        timerCh = c.timing.timeOutChan
        handlerCh = c.handlerCh
        netID = c.netid
        ctx = c.ctx
    }

    defer func() {
        if p := recover(); p != nil {
            holmes.Errorf("panics: %v\n", p)
        }
        wg.Done()
        holmes.Debugln("handleLoop go-routine exited")
        c.Close()
    }()

    for {
        select {
        case <-cDone: // connectin closed
            holmes.Debugln("receiving cancel signal from conn")
            return
        case <-sDone: // server closed
            holmes.Debugln("receiving cancel signal from server")
            return
        case msgHandler := <-handlerCh:
            msg, handler := msgHandler.message, msgHandler.handler
            if handler != nil {
                if askForWorker {
                    WorkerPoolInstance().Put(netID, func() {
                        handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
                    })
                    addTotalHandle()
                } else {
                    handler(NewContextWithNetID(NewContextWithMessage(ctx, msg), netID), c)
                }
            }
        case timeout := <-timerCh:
            if timeout != nil {
                timeoutNetID := NetIDFromContext(timeout.Ctx)
                if timeoutNetID != netID {
                    holmes.Errorf("timeout net %d, conn net %d, mismatched!\n", timeoutNetID, netID)
                }
                if askForWorker {
                    WorkerPoolInstance().Put(netID, func() {
                        timeout.Callback(time.Now(), c.(WriteCloser))
                    })
                } else {
                    timeout.Callback(time.Now(), c.(WriteCloser))
                }
            }
        }
    }
}

4. 消息處理機(jī)制

4.1 消息上下文

任何一個(gè)實(shí)現(xiàn)了Message接口的類型拿诸,都是一個(gè)消息扒袖,它需要提供方法訪問(wèn)自己的協(xié)議號(hào)并將自己序列化成字節(jié)數(shù)組;另外亩码,每個(gè)消息都需要注冊(cè)自己的反序列化函數(shù)和處理函數(shù):

// Handler takes the responsibility to handle incoming messages.
type Handler interface {
    Handle(context.Context, interface{})
}

// HandlerFunc serves as an adapter to allow the use of ordinary functions as handlers.
type HandlerFunc func(context.Context, WriteCloser)

// Handle calls f(ctx, c)
func (f HandlerFunc) Handle(ctx context.Context, c WriteCloser) {
    f(ctx, c)
}

// UnmarshalFunc unmarshals bytes into Message.
type UnmarshalFunc func([]byte) (Message, error)

// handlerUnmarshaler is a combination of unmarshal and handle functions for message.
type handlerUnmarshaler struct {
    handler     HandlerFunc
    unmarshaler UnmarshalFunc
}

func init() {
  messageRegistry = map[int32]messageFunc{}
  buf = new(bytes.Buffer)
}

// Register registers the unmarshal and handle functions for msgType.
// If no unmarshal function provided, the message will not be parsed.
// If no handler function provided, the message will not be handled unless you
// set a default one by calling SetOnMessageCallback.
// If Register being called twice on one msgType, it will panics.
func Register(msgType int32, unmarshaler func([]byte) (Message, error), handler func(context.Context, WriteCloser)) {
    if _, ok := messageRegistry[msgType]; ok {
        panic(fmt.Sprintf("trying to register message %d twice", msgType))
    }

    messageRegistry[msgType] = handlerUnmarshaler{
        unmarshaler: unmarshaler,
        handler:     HandlerFunc(handler),
    }
}

// GetUnmarshalFunc returns the corresponding unmarshal function for msgType.
func GetUnmarshalFunc(msgType int32) UnmarshalFunc {
    entry, ok := messageRegistry[msgType]
    if !ok {
        return nil
    }
    return entry.unmarshaler
}

// GetHandlerFunc returns the corresponding handler function for msgType.
func GetHandlerFunc(msgType int32) HandlerFunc {
    entry, ok := messageRegistry[msgType]
    if !ok {
        return nil
    }
    return entry.handler
}

// Message represents the structured data that can be handled.
type Message interface {
    MessageNumber() int32
    Serialize() ([]byte, error)
}

對(duì)每個(gè)消息處理函數(shù)而言季率,要處理的消息以及發(fā)送該消息的客戶端都是不同的,這些信息被稱為“消息上下文”描沟,用Context結(jié)構(gòu)表示飒泻,每個(gè)不同的客戶端用一個(gè)64位整數(shù)netid標(biāo)識(shí):

// Context is the context info for every handler function.
// Handler function handles the business logic about message.
// We can find the client connection who sent this message by netid and send back responses.
type Context struct{
  message Message
  netid int64
}

func NewContext(msg Message, id int64) Context {
  return Context{
    message: msg,
    netid: id,
  }
}

func (ctx Context)Message() Message {
  return ctx.message
}

func (ctx Context)Id() int64 {
  return ctx.netid
}

4.2 編解碼器

接收數(shù)據(jù)時(shí),編解碼器(Codec)負(fù)責(zé)按照一定的格式將網(wǎng)絡(luò)連接上讀取的字節(jié)數(shù)據(jù)反序列化成消息吏廉,并將消息交給上層處理(解碼)泞遗;發(fā)送數(shù)據(jù)時(shí),編解碼器將上層傳遞過(guò)來(lái)的消息序列化成字節(jié)數(shù)據(jù)席覆,交給下層發(fā)送(編碼):

// Codec is the interface for message coder and decoder.
// Application programmer can define a custom codec themselves.
type Codec interface {
  Decode(Connection) (Message, error)
  Encode(Message) ([]byte, error)
}

Tao框架采用的是“Type-Length-Data”的格式打包數(shù)據(jù)史辙。Type占4個(gè)字節(jié),表示協(xié)議類型;Length占4個(gè)字節(jié)聊倔,表示消息長(zhǎng)度晦毙,Data為變長(zhǎng)字節(jié)序列,長(zhǎng)度由Length表示耙蔑。反序列化時(shí)结序,由Type字段可以確定協(xié)議類型,然后截取Length長(zhǎng)度的字節(jié)數(shù)據(jù)Data纵潦,并調(diào)用已注冊(cè)的反序列化函數(shù)處理。核心代碼如下:

// Codec is the interface for message coder and decoder.
// Application programmer can define a custom codec themselves.
type Codec interface {
    Decode(net.Conn) (Message, error)
    Encode(Message) ([]byte, error)
}

// TypeLengthValueCodec defines a special codec.
// Format: type-length-value |4 bytes|4 bytes|n bytes <= 8M|
type TypeLengthValueCodec struct{}

// Decode decodes the bytes data into Message
func (codec TypeLengthValueCodec) Decode(raw net.Conn) (Message, error) {
    byteChan := make(chan []byte)
    errorChan := make(chan error)

    go func(bc chan []byte, ec chan error) {
        typeData := make([]byte, MessageTypeBytes)
        _, err := io.ReadFull(raw, typeData)
        if err != nil {
            ec <- err
            close(bc)
            close(ec)
            holmes.Debugln("go-routine read message type exited")
            return
        }
        bc <- typeData
    }(byteChan, errorChan)

    var typeBytes []byte

    select {
    case err := <-errorChan:
        return nil, err

    case typeBytes = <-byteChan:
        if typeBytes == nil {
            holmes.Warnln("read type bytes nil")
            return nil, ErrBadData
        }
        typeBuf := bytes.NewReader(typeBytes)
        var msgType int32
        if err := binary.Read(typeBuf, binary.LittleEndian, &msgType); err != nil {
            return nil, err
        }

        lengthBytes := make([]byte, MessageLenBytes)
        _, err := io.ReadFull(raw, lengthBytes)
        if err != nil {
            return nil, err
        }
        lengthBuf := bytes.NewReader(lengthBytes)
        var msgLen uint32
        if err = binary.Read(lengthBuf, binary.LittleEndian, &msgLen); err != nil {
            return nil, err
        }
        if msgLen > MessageMaxBytes {
            holmes.Errorf("message(type %d) has bytes(%d) beyond max %d\n", msgType, msgLen, MessageMaxBytes)
            return nil, ErrBadData
        }

        // read application data
        msgBytes := make([]byte, msgLen)
        _, err = io.ReadFull(raw, msgBytes)
        if err != nil {
            return nil, err
        }

        // deserialize message from bytes
        unmarshaler := GetUnmarshalFunc(msgType)
        if unmarshaler == nil {
            return nil, ErrUndefined(msgType)
        }
        return unmarshaler(msgBytes)
    }
}

這里的代碼存在一些微妙的設(shè)計(jì)垃环,需要仔細(xì)解釋一下邀层。TypeLengthValueCodec.Decode()函數(shù)會(huì)被readLoop協(xié)程用到。因?yàn)閕o.ReadFull()是同步調(diào)用遂庄,沒(méi)有數(shù)據(jù)可讀時(shí)會(huì)阻塞readLoop協(xié)程寥院。此時(shí)如果關(guān)閉網(wǎng)絡(luò)連接,readLoop協(xié)程將無(wú)法退出涛目。所以這里的代碼用到了一個(gè)小技巧:專門開(kāi)辟了一個(gè)新協(xié)程來(lái)等待讀取最開(kāi)始的4字節(jié)Type數(shù)據(jù)秸谢,然后自己select阻塞在多個(gè)channel上,這樣就不會(huì)忽略其他channel傳遞過(guò)來(lái)的消息霹肝。一旦成功讀取到Type數(shù)據(jù)估蹄,就繼續(xù)后面的流程:讀取Length數(shù)據(jù),根據(jù)Length讀取應(yīng)用數(shù)據(jù)交給先前注冊(cè)好的反序列化函數(shù)沫换。注意臭蚁,如果收到超過(guò)最大長(zhǎng)度的數(shù)據(jù)就會(huì)關(guān)閉連接,這是為了防止外部程序惡意消耗系統(tǒng)資源讯赏。

5. 工作者協(xié)程池

為了提高框架的健壯性垮兑,避免因?yàn)樘幚順I(yè)務(wù)邏輯造成的響應(yīng)延遲,消息處理函數(shù)一般都會(huì)被調(diào)度到工作者協(xié)程池執(zhí)行漱挎。設(shè)計(jì)工作者協(xié)程池的一個(gè)關(guān)鍵是如何將任務(wù)散列給池子中的不同協(xié)程系枪。一方面,要避免并發(fā)問(wèn)題磕谅,必須保證同一個(gè)網(wǎng)絡(luò)連接發(fā)來(lái)的消息都被散列到同一個(gè)協(xié)程按順序執(zhí)行私爷;另一方面,散列一定要是均勻的怜庸,不能讓協(xié)程“忙的忙死当犯,閑的閑死”。關(guān)鍵還是在散列函數(shù)的設(shè)計(jì)上割疾。

5.1 核心代碼分析

協(xié)程池是按照單例模式設(shè)計(jì)的嚎卫。創(chuàng)建時(shí)會(huì)調(diào)用newWorker()創(chuàng)建一系列worker協(xié)程。

// WorkerPool is a pool of go-routines running functions.
type WorkerPool struct {
    workers   []*worker
    closeChan chan struct{}
}

var (
    globalWorkerPool *WorkerPool
)

func init() {
    globalWorkerPool = newWorkerPool(WorkersNum)
}

// WorkerPoolInstance returns the global pool.
func WorkerPoolInstance() *WorkerPool {
    return globalWorkerPool
}

func newWorkerPool(vol int) *WorkerPool {
    if vol <= 0 {
        vol = WorkersNum
    }

    pool := &WorkerPool{
        workers:   make([]*worker, vol),
        closeChan: make(chan struct{}),
    }

    for i := range pool.workers {
        pool.workers[i] = newWorker(i, 1024, pool.closeChan)
        if pool.workers[i] == nil {
            panic("worker nil")
        }
    }

    return pool
}

5.2 給工作者協(xié)程分配任務(wù)

給工作者協(xié)程分配任務(wù)的方式很簡(jiǎn)單,通過(guò)hashCode()散列函數(shù)找到對(duì)應(yīng)的worker協(xié)程拓诸,然后把回調(diào)函數(shù)發(fā)送到對(duì)應(yīng)協(xié)程的channel中侵佃。對(duì)應(yīng)協(xié)程在運(yùn)行時(shí)就會(huì)從channel中取出然后執(zhí)行,在start()函數(shù)中奠支。

// Put appends a function to some worker's channel.
func (wp *WorkerPool) Put(k interface{}, cb func()) error {
    code := hashCode(k)
    return wp.workers[code&uint32(len(wp.workers)-1)].put(workerFunc(cb))
}

func (w *worker) start() {
    for {
        select {
        case <-w.closeChan:
            return
        case cb := <-w.callbackChan:
            before := time.Now()
            cb()
            addTotalTime(time.Since(before).Seconds())
        }
    }
}

func (w *worker) put(cb workerFunc) error {
    select {
    case w.callbackChan <- cb:
        return nil
    default:
        return ErrWouldBlock
    }
}

6. 線程安全的定時(shí)器

Tao框架設(shè)計(jì)了一個(gè)定時(shí)器TimingWheel馋辈,用來(lái)控制定時(shí)任務(wù)。Connection在此基礎(chǔ)上進(jìn)行了進(jìn)一步封裝倍谜。提供定時(shí)執(zhí)行(RunAt)迈螟,延時(shí)執(zhí)行(RunAfter)和周期執(zhí)行(RunEvery)功能。這里通過(guò)定時(shí)器的設(shè)計(jì)引出多線程編程的一點(diǎn)經(jīng)驗(yàn)之談尔崔。

6.1 定時(shí)任務(wù)的數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)

6.1.1 定時(shí)任務(wù)結(jié)構(gòu)

每個(gè)定時(shí)任務(wù)由一個(gè)timerType表示答毫,它帶有自己的id和包含定時(shí)回調(diào)函數(shù)的結(jié)構(gòu)OnTimeOut。expiration表示該任務(wù)到期要被執(zhí)行的時(shí)間季春,interval表示時(shí)間間隔洗搂,interval > 0意味著該任務(wù)是會(huì)被周期性重復(fù)執(zhí)行的任務(wù)。

/* 'expiration' is the time when timer time out, if 'interval' > 0
the timer will time out periodically, 'timeout' contains the callback
to be called when times out */
type timerType struct {
    id         int64
    expiration time.Time
    interval   time.Duration
    timeout    *OnTimeOut
    index      int // for container/heap
}

// OnTimeOut represents a timed task.
type OnTimeOut struct {
    Callback func(time.Time, WriteCloser)
    Ctx      context.Context
}

// NewOnTimeOut returns OnTimeOut.
func NewOnTimeOut(ctx context.Context, cb func(time.Time, WriteCloser)) *OnTimeOut {
    return &OnTimeOut{
        Callback: cb,
        Ctx:      ctx,
    }
}

6.1.2 定時(shí)任務(wù)的組織

定時(shí)器需要按照到期時(shí)間的順序從最近到最遠(yuǎn)排列载弄,這是一個(gè)天然的小頂堆耘拇,于是這里采用標(biāo)準(zhǔn)庫(kù)container/heap創(chuàng)建了一個(gè)堆數(shù)據(jù)結(jié)構(gòu)來(lái)組織定時(shí)任務(wù),存取效率達(dá)到O(nlogn)宇攻。

// timerHeap is a heap-based priority queue
type timerHeapType []*timerType

func (heap timerHeapType) getIndexByID(id int64) int {
    for _, t := range heap {
        if t.id == id {
            return t.index
        }
    }
    return -1
}

func (heap timerHeapType) Len() int {
    return len(heap)
}

func (heap timerHeapType) Less(i, j int) bool {
    return heap[i].expiration.UnixNano() < heap[j].expiration.UnixNano()
}

func (heap timerHeapType) Swap(i, j int) {
    heap[i], heap[j] = heap[j], heap[i]
    heap[i].index = i
    heap[j].index = j
}

func (heap *timerHeapType) Push(x interface{}) {
    n := len(*heap)
    timer := x.(*timerType)
    timer.index = n
    *heap = append(*heap, timer)
}

func (heap *timerHeapType) Pop() interface{} {
    old := *heap
    n := len(old)
    timer := old[n-1]
    timer.index = -1
    *heap = old[0 : n-1]
    return timer
}

6.2 定時(shí)器核心代碼分析

TimingWheel在創(chuàng)建時(shí)會(huì)啟動(dòng)一個(gè)單獨(dú)協(xié)程來(lái)運(yùn)行定時(shí)器核心代碼start()惫叛。它在多個(gè)channel上進(jìn)行多路復(fù)用操作:如果從cancelChan收到timerId,就執(zhí)行取消操作:從堆上刪除對(duì)應(yīng)的定時(shí)任務(wù)尺碰;將定時(shí)任務(wù)數(shù)量發(fā)送給sizeChan挣棕,別的線程就能獲取當(dāng)前定時(shí)任務(wù)數(shù);如果從quitChan收到消息亲桥,定時(shí)器就會(huì)被關(guān)閉然后退出洛心;如果從addChan收到timer,就將該定時(shí)任務(wù)添加到堆题篷;如果從tw.ticker.C收到定時(shí)信號(hào)词身,就調(diào)用getExpired()函數(shù)獲取到期的任務(wù),然后將這些任務(wù)回調(diào)發(fā)送到TimeOutChannel中番枚,其他相關(guān)線程會(huì)通過(guò)該channel獲取并執(zhí)行定時(shí)回調(diào)法严。最后tw.update()會(huì)更新周期性執(zhí)行的定時(shí)任務(wù),重新調(diào)度執(zhí)行葫笼。

func (tw *TimingWheel) update(timers []*timerType) {
    if timers != nil {
        for _, t := range timers {
            if t.isRepeat() {
                t.expiration = t.expiration.Add(t.interval)
                heap.Push(&tw.timers, t)
            }
        }
    }
}

func (tw *TimingWheel) start() {
    for {
        select {
        case timerID := <-tw.cancelChan:
            index := tw.timers.getIndexByID(timerID)
            if index >= 0 {
                heap.Remove(&tw.timers, index)
            }

        case tw.sizeChan <- tw.timers.Len():

        case <-tw.ctx.Done():
            tw.ticker.Stop()
            return

        case timer := <-tw.addChan:
            heap.Push(&tw.timers, timer)

        case <-tw.ticker.C:
            timers := tw.getExpired()
            for _, t := range timers {
                tw.GetTimeOutChannel() <- t.timeout
            }
            tw.update(timers)
        }
    }
}

6.3 定時(shí)器是怎么做到線程安全的

用Tao框架開(kāi)發(fā)的服務(wù)器一開(kāi)始總是時(shí)不時(shí)地崩潰深啤。有時(shí)候運(yùn)行了幾個(gè)小時(shí)服務(wù)器就突然退出了。查看打印出來(lái)的調(diào)用棧發(fā)現(xiàn)路星。每次程序都在定時(shí)器上崩潰溯街,原因是數(shù)組訪問(wèn)越界。這就是并發(fā)訪問(wèn)導(dǎo)致的問(wèn)題,為什么呢呈昔?因?yàn)槎〞r(shí)器的核心函數(shù)在一個(gè)協(xié)程中操作堆數(shù)據(jù)結(jié)構(gòu)挥等,與此同時(shí)其提供的添加,刪除等接口卻有可能在其他協(xié)程中調(diào)用堤尾。多個(gè)協(xié)程并發(fā)訪問(wèn)一個(gè)沒(méi)有加鎖的數(shù)據(jù)結(jié)構(gòu)肝劲,必然會(huì)出現(xiàn)問(wèn)題。解決方法很簡(jiǎn)單:將多個(gè)協(xié)程的并發(fā)訪問(wèn)轉(zhuǎn)化為單個(gè)協(xié)程的串行訪問(wèn)郭宝,也就是將添加辞槐,刪除等操作發(fā)送給不同的channel,然后在start()協(xié)程中統(tǒng)一處理:

// AddTimer adds new timed task.
func (tw *TimingWheel) AddTimer(when time.Time, interv time.Duration, to *OnTimeOut) int64 {
    if to == nil {
        return int64(-1)
    }
    timer := newTimer(when, interv, to)
    tw.addChan <- timer
    return timer.id
}

// Size returns the number of timed tasks.
func (tw *TimingWheel) Size() int {
    return <-tw.sizeChan
}

// CancelTimer cancels a timed task with specified timer ID.
func (tw *TimingWheel) CancelTimer(timerID int64) {
    tw.cancelChan <- timerID
}

6.4 應(yīng)用層心跳

陳碩在他的《Linux多線程服務(wù)端編程》一書(shū)中說(shuō)到粘室,維護(hù)長(zhǎng)連接的服務(wù)器都應(yīng)該在應(yīng)用層自己實(shí)現(xiàn)心跳消息:

“在嚴(yán)肅的網(wǎng)絡(luò)程序中催蝗,應(yīng)用層的心跳協(xié)議是必不可少的。應(yīng)該用心跳消息來(lái)判斷對(duì)方進(jìn)程是否能正常工作育特。”

要使用一個(gè)連接來(lái)同時(shí)發(fā)送心跳和其他業(yè)務(wù)消息先朦,這樣一旦應(yīng)用層因?yàn)槌鲥e(cuò)發(fā)不出消息缰冤,對(duì)方就能夠立刻通過(guò)心跳停止感知到。值得注意的是喳魏,在Tao框架中棉浸,定時(shí)器只有一個(gè),而客戶端連接可能會(huì)有很多個(gè)刺彩。在長(zhǎng)連接模式下迷郑,每個(gè)客戶端都需要處理心跳包,或者其他類型的定時(shí)任務(wù)创倔。將框架設(shè)計(jì)為“每個(gè)客戶端連接自帶一個(gè)定時(shí)器”是不合適的——有十萬(wàn)個(gè)連接就有十萬(wàn)個(gè)定時(shí)器嗡害,會(huì)有較高的CPU占用率。定時(shí)器應(yīng)該只有一個(gè)畦攘,所有客戶端注冊(cè)進(jìn)來(lái)的定時(shí)任務(wù)都由它負(fù)責(zé)處理霸妹。但是如果所有的客戶端連接都等待唯一一個(gè)定時(shí)器發(fā)來(lái)的消息,就又會(huì)存在并發(fā)問(wèn)題知押。比如client 1的定時(shí)任務(wù)到期了叹螟,但它現(xiàn)在正忙著處理其他消息,這個(gè)定時(shí)任務(wù)就可能被其他client執(zhí)行台盯。所以這里采取了一種“先集中后分散”的處理機(jī)制:每一個(gè)定時(shí)任務(wù)都由一個(gè)TimeOut結(jié)構(gòu)表示罢绽,該結(jié)構(gòu)中除了回調(diào)函數(shù)還包含一個(gè)context【仓眩客戶端啟動(dòng)定時(shí)任務(wù)的時(shí)候都會(huì)填入net ID良价。TCPServer統(tǒng)一接收定時(shí)任務(wù),然后從定時(shí)任務(wù)中取出net ID,然后將該定時(shí)任務(wù)交給相應(yīng)的ServerConn或ClientConn去執(zhí)行:

// Retrieve the extra data(i.e. net id), and then redispatch timeout callbacks
// to corresponding client connection, this prevents one client from running
// callbacks of other clients
func (s *Server) timeOutLoop() {
    defer s.wg.Done()

    for {
        select {
        case <-s.ctx.Done():
            return

        case timeout := <-s.timing.GetTimeOutChannel():
            netID := timeout.Ctx.Value(netIDCtx).(int64)
            if sc, ok := s.conns.Get(netID); ok {
                sc.timerCh <- timeout
            } else {
                holmes.Warnf("invalid client %d\n", netID)
            }
        }
    }
}

三. 也談并發(fā)編程的核心問(wèn)題和基本思路

當(dāng)我們談?wù)摬l(fā)編程的時(shí)候棚壁,我們?cè)谡務(wù)撌裁幢兀坑靡痪湓捀爬ǎ?strong>當(dāng)多個(gè)線程同時(shí)訪問(wèn)一個(gè)未受保護(hù)的共享數(shù)據(jù)時(shí),就會(huì)產(chǎn)生并發(fā)問(wèn)題袖外。那么多線程編程的本質(zhì)就是怎樣避免上述情況的發(fā)生了史隆。這里總結(jié)一些,有三種基本的方法曼验。

1. 對(duì)共享數(shù)據(jù)結(jié)構(gòu)進(jìn)行保護(hù)

這是教科書(shū)上最常見(jiàn)的方法了泌射。用各種信號(hào)量/互斥鎖對(duì)數(shù)據(jù)結(jié)構(gòu)進(jìn)行保護(hù),先加鎖鬓照,然后執(zhí)行操作熔酷,最后解鎖。舉個(gè)例子豺裆,Tao框架中用于網(wǎng)絡(luò)連接管理的ConnMap就是這么實(shí)現(xiàn)的:

// ConnMap is a safe map for server connection management.
type ConnMap struct {
    sync.RWMutex
    m map[int64]*ServerConn
}

// NewConnMap returns a new ConnMap.
func NewConnMap() *ConnMap {
    return &ConnMap{
        m: make(map[int64]*ServerConn),
    }
}

// Clear clears all elements in map.
func (cm *ConnMap) Clear() {
    cm.Lock()
    cm.m = make(map[int64]*ServerConn)
    cm.Unlock()
}

// Get gets a server connection with specified net ID.
func (cm *ConnMap) Get(id int64) (*ServerConn, bool) {
    cm.RLock()
    sc, ok := cm.m[id]
    cm.RUnlock()
    return sc, ok
}

// Put puts a server connection with specified net ID in map.
func (cm *ConnMap) Put(id int64, sc *ServerConn) {
    cm.Lock()
    cm.m[id] = sc
    cm.Unlock()
}

// Remove removes a server connection with specified net ID.
func (cm *ConnMap) Remove(id int64) {
    cm.Lock()
    delete(cm.m, id)
    cm.Unlock()
}

// Size returns map size.
func (cm *ConnMap) Size() int {
    cm.RLock()
    size := len(cm.m)
    cm.RUnlock()
    return size
}

// IsEmpty tells whether ConnMap is empty.
func (cm *ConnMap) IsEmpty() bool {
    return cm.Size() <= 0
}

2 多線程并行轉(zhuǎn)化為單線程串行

這種方法在前面已經(jīng)介紹過(guò)拒秘,它屬于無(wú)鎖化的一種編程方式。多個(gè)線程的操作請(qǐng)求都放到一個(gè)任務(wù)隊(duì)列中臭猜,最終由一個(gè)單一的線程來(lái)讀取隊(duì)列并串行執(zhí)行躺酒。這種方法在并發(fā)量很大的時(shí)候還是會(huì)有性能瓶頸。

3 采用精心設(shè)計(jì)的并發(fā)數(shù)據(jù)結(jié)構(gòu)

最好的辦法還是要從數(shù)據(jù)結(jié)構(gòu)上入手蔑歌,有很多技巧能夠讓數(shù)據(jù)結(jié)構(gòu)適應(yīng)多線程并發(fā)訪問(wèn)的場(chǎng)景羹应。比如Java標(biāo)準(zhǔn)庫(kù)中的java.util.concurrent,包含了各種并發(fā)數(shù)據(jù)結(jié)構(gòu)次屠,其中ConcurrentHashMap的基本原理就是分段鎖园匹,對(duì)每個(gè)段(Segment)加鎖保護(hù),并發(fā)寫入數(shù)據(jù)時(shí)通過(guò)散列函數(shù)分發(fā)到不同的段上面劫灶,在SegmentA上加鎖并不影響SegmentB的訪問(wèn)裸违。
處理并發(fā)多線程問(wèn)題,一定要小心再小心本昏,思考再思考累颂,一不注意就會(huì)踩坑

四. 特別鳴謝

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市凛俱,隨后出現(xiàn)的幾起案子紊馏,更是在濱河造成了極大的恐慌,老刑警劉巖蒲犬,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件邓夕,死亡現(xiàn)場(chǎng)離奇詭異弱睦,居然都是意外死亡毕贼,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門巡蘸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人擂送,你說(shuō)我怎么就攤上這事悦荒。” “怎么了嘹吨?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵搬味,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我蟀拷,道長(zhǎng)碰纬,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任问芬,我火速辦了婚禮悦析,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘此衅。我一直安慰自己强戴,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布挡鞍。 她就那樣靜靜地躺著酌泰,像睡著了一般。 火紅的嫁衣襯著肌膚如雪匕累。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,146評(píng)論 1 297
  • 那天默伍,我揣著相機(jī)與錄音欢嘿,去河邊找鬼。 笑死也糊,一個(gè)胖子當(dāng)著我的面吹牛炼蹦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播狸剃,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼掐隐,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了钞馁?” 一聲冷哼從身側(cè)響起虑省,我...
    開(kāi)封第一講書(shū)人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎僧凰,沒(méi)想到半個(gè)月后探颈,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡训措,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年伪节,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了光羞。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡怀大,死狀恐怖纱兑,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情化借,我是刑警寧澤潜慎,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站屏鳍,受9級(jí)特大地震影響勘纯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜钓瞭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一驳遵、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧山涡,春花似錦堤结、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至鳞溉,卻和暖如春瘾带,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背熟菲。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工看政, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人抄罕。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓允蚣,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親呆贿。 傳聞我的和親對(duì)象是個(gè)殘疾皇子嚷兔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • 原文鏈接:https://github.com/EasyKotlin 在常用的并發(fā)模型中,多進(jìn)程做入、多線程冒晰、分布式是...
    JackChen1024閱讀 10,726評(píng)論 3 23
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)竟块,斷路器翩剪,智...
    卡卡羅2017閱讀 134,651評(píng)論 18 139
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,072評(píng)論 25 707
  • 今天背了247個(gè)單詞,花費(fèi)了3個(gè)番茄鐘彩郊,好累前弯。本來(lái)計(jì)劃看完最后200頁(yè)《和時(shí)間做朋友》蚪缀,但是也未能如愿。 自從正式...
    粉藍(lán)閱讀 254評(píng)論 0 0
  • A_master閱讀 288評(píng)論 0 0