Go net (TCP Service) 的使用及源碼分析

注:本文分析基于Go1.14

簡(jiǎn)單實(shí)例

一個(gè)簡(jiǎn)單的tcp server例子

package main

import (
    "context"
    "fmt"
    "net"
    "time"
)

func main() {
    listener, err := net.Listen("tcp", "0.0.0.0:7777")
    if err != nil {
        fmt.Println("Listen error:", err)
        return
    }
    defer listener.Close()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    for {
        conn, err := listener.Accept()
        if err != nil {
            if ne, ok := err.(net.Error); ok && ne.Temporary() {
                fmt.Println("Accept temp error:", ne)
                time.Sleep(time.Second)
                continue
            }
            fmt.Println("Accept error:", err)
            break
        }
        go handleConn(ctx, conn)
    }
}

func handleConn(ctx context.Context, conn net.Conn) {
    defer conn.Close()

    buf := make([]byte, 1024)
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }
        n, err := conn.Read(buf)
        if err != nil {
            fmt.Println("Read error:", err)
        }
        _, err = conn.Write(buf[:n])
        if err != nil {
            fmt.Println("Write error:", err)
        }
    }
}

完整實(shí)現(xiàn)

上面是簡(jiǎn)單的TCP Server使用示例,如果想獲取完整的服務(wù)實(shí)現(xiàn)术裸,可以使用或者參考github上的一個(gè)go net 開源項(xiàng)目

https://github.com/izhw/gnet

該項(xiàng)目使用go原生net package實(shí)現(xiàn)恼五,不依賴其他第三方庫(kù)磨淌,可以幫助開發(fā)者快速搭建一個(gè)net服務(wù)(TCP Server or Client)硫豆。
比如芬位,開發(fā)者只要實(shí)現(xiàn)gnet.NetEventHandler事件回調(diào)(也可內(nèi)置默認(rèn)實(shí)現(xiàn))无拗,使用可選的Functional options,就能快速搭建一個(gè)TCP Server昧碉,簡(jiǎn)單使用代碼如下:

package main

import (
    "fmt"
    "log"

    "github.com/izhw/gnet"
    "github.com/izhw/gnet/tcp/tcpserver"
)

type ServerHandler struct {
    *gnet.NetEventHandler
}

func (h *ServerHandler) OnReadMsg(c gnet.Conn, data []byte) error {
    fmt.Println("read msg:", string(data))
    c.Write(data)
    return nil
}

func main() {
    s := tcpserver.NewServer("0.0.0.0:7777", &ServerHandler{})
    log.Fatal("Exit:", s.Serve())
}

Go net源碼分析 (go1.14)

開始的簡(jiǎn)單示例中英染,我們看到幾個(gè)關(guān)鍵的調(diào)用:net.Listen()listener.Accept()被饿、conn.Read()四康、conn.Write,下面分別進(jìn)行源碼分析狭握。

Listen

  • net.Listen返回的是實(shí)現(xiàn)了net.Listener接口的*TCPListener闪金。
  • 其中,Listen方法內(nèi)论颅,生成系統(tǒng)文件描述符sysfd哎垦,使用該sysfd設(shè)置參數(shù)、調(diào)用syscall.Bind恃疯、syscall.Listen完成綁定漏设、監(jiān)聽,并初始化一些重要的結(jié)構(gòu)信息澡谭,創(chuàng)建epoll句柄愿题、注冊(cè)epoll事件损俭,然后構(gòu)造返回TCPListener

調(diào)用鏈源碼如下(只貼了關(guān)鍵代碼)潘酗。

// `net/net.go`
// Listener接口
type Listener interface {
    // 阻塞等待杆兵,有新連接事件的時(shí)候,返回一個(gè)net.Conn
    Accept() (Conn, error)
    // 關(guān)閉listener仔夺,任何阻塞的Accept操作變?yōu)椴蛔枞鲈啵⒎祷劐e(cuò)誤
    Close() error
    // 返回listener的網(wǎng)絡(luò)地址
    Addr() Addr
}

// `net/tcpsock.go`
type TCPListener struct {
    // Go封裝的網(wǎng)絡(luò)描述符,后面會(huì)具體講
    fd *netFD
    // Listen配置
    lc ListenConfig
}

// `net/dial.go`
// 根據(jù)不同的 'network'和'address'構(gòu)建相應(yīng)的'Listener'
func Listen(network, address string) (Listener, error) {
    var lc ListenConfig
    return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    // resolve解析addrs
    // ...
    sl := &sysListener{
        ListenConfig: *lc,
        network:      network,
        address:      address,
    }
    // 根據(jù)不同的addrs類型調(diào)用不同的listen
    var l Listener
    la := addrs.first(isIPv4)
    switch la := la.(type) {
    case *TCPAddr:
        l, err = sl.listenTCP(ctx, la)
    case *UnixAddr:
        l, err = sl.listenUnix(ctx, la)
    default:
        // return error...
    }
    // return error...
    return l, nil
}

// `net/tcpsock_posix.go`
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    // internetSocket內(nèi)部解析AddrFamily并調(diào)用socket()
    // socket內(nèi)部創(chuàng)建關(guān)鍵的結(jié)構(gòu)對(duì)象netFD缸兔,并初始化綁定日裙、監(jiān)聽等
    // 注意這里的syscall.SOCK_STREAM,后面會(huì)用到
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    if err != nil {
        return nil, err
    }
    // 用上面創(chuàng)建并初始化好的fd構(gòu)造TCPListener
    return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

// `net/sock_posix.go`
// 返回一個(gè)初始化好的異步I/O `netFD` 網(wǎng)絡(luò)描述符
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // `sysSocket`內(nèi)部調(diào)用syscall.Socket惰蜜,
    // 并置為非阻塞和close-on-exec (Syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)昂拂,
    // 返回系統(tǒng)描述符: s
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    // ...
    // 用`sysSocket`創(chuàng)建的`s`創(chuàng)建`netFD`
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    // ...
    // 上面提到過,在listenTCP中傳入的參數(shù)為syscall.SOCK_STREAM
    // 此處判斷該sotype類型抛猖,調(diào)用fd.listenStream()
    if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
        fd.Close()
        return nil, err
    }
    // ...
    return fd, nil
}

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    var err error
    // 設(shè)置默認(rèn)參數(shù) SO_REUSEADDR
    if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
        return err
    }
    var lsa syscall.Sockaddr
    if lsa, err = laddr.sockaddr(fd.family); err != nil {
        return err
    }
    if ctrlFn != nil {
        // ...
    }
    // syscall.Bind 綁定
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    // syscall.Listen 監(jiān)聽
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    // netFD初始化格侯,fd.init()->fd.pfd.Init()->fd.pfd.pd.init()`
    // 最終調(diào)用的是`runtime_pollServerInit`、`runtime_pollOpen`
    // netFD初始化部分后面會(huì)接著講
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

為了更好的理解财著,需要講幾個(gè)重要的struct:

  • netFD是網(wǎng)絡(luò)描述符联四,其結(jié)構(gòu)體中有一個(gè)poll.FD對(duì)象。
  • poll.FD是文件描述符撑教,表示一個(gè)網(wǎng)絡(luò)連接或者OS文件朝墩。
  • poll.FD結(jié)構(gòu)中主要看Sysfd intpd pollDesc兩個(gè)變量,前者是系統(tǒng)返回的文件描述符伟姐,后者其內(nèi)部封裝了運(yùn)行時(shí)上下文收苏,包括讀、寫goroutine及其狀態(tài)玫镐,讀寫超時(shí)等基本信息倒戏。通過將pollDesc指針信息存入epollevent.data(8字節(jié)數(shù)組)中怠噪,然后調(diào)用epollctl(epoll_ctl)將fdepollevent信息注冊(cè)到epoll實(shí)例上恐似,實(shí)現(xiàn)epoll事件回調(diào)和用戶態(tài)協(xié)程調(diào)用的關(guān)聯(lián)。

相關(guān)結(jié)構(gòu)源碼如下:

// `net/fd_unix.go`
type netFD struct {
    pfd poll.FD

    // Close前不可變
    family      int
    sotype      int
    isConnected bool // handshake completed or use of association with peer
    net         string
    laddr       Addr
    raddr       Addr
}

// `internal/poll/fd_unix.go`
type FD struct {
    // sysfd和Read/Write方法鎖
    fdmu fdMutex
    // 系統(tǒng)文件描述符
    Sysfd int
    // I/O poller傍念,封裝了運(yùn)行時(shí)上下文
    pd pollDesc
    // Writev 緩存.
    iovecs *[]syscall.Iovec
    // 文件關(guān)閉的時(shí)候發(fā)送信號(hào)量
    csema uint32
    // 非阻塞模式時(shí)非零
    isBlocking uint32
    // 是streaming還是packet-based UDP
    IsStream bool
    // 讀到零字節(jié)是否表示EOF矫夷,對(duì)于基于消息的套接字連接為false
    ZeroReadIsEOF bool
    // 是一個(gè)file而并非network socket
    isFile bool
}

// internal/poll/fd_poll_runtime.go
// `pollDesc`結(jié)構(gòu)中只有一個(gè)`uintptr`變量,`runtimeCtx`封裝了運(yùn)行時(shí)上下文憋槐,其具體信息后面會(huì)講
type pollDesc struct {
    runtimeCtx uintptr
}

接著看netFD初始化源碼:

// `net/fd_unix.go`
//  netFD初始化
func (fd *netFD) init() error {
    // 調(diào)用 pfd (poll.FD) 的Init方法
    return fd.pfd.Init(fd.net, true)
}

// `internal/poll/fd_unix.go`
// FD初始化
func (fd *FD) Init(net string, pollable bool) error {
    // ...
    // 調(diào)用 pd (pollDesc) 的init方法
    err := fd.pd.init(fd)
    // ...
    return err
}

// internal/poll/fd_poll_runtime.go
// serverInit全局變量双藕,只執(zhí)行一次runtime_pollServerInit,
// 并在其內(nèi)部調(diào)用runtime.netpollinit()創(chuàng)建epoll實(shí)例阳仔;
// runtime_pollOpen內(nèi)部調(diào)用runtime.netpollopen忧陪,
// 將listener fd注冊(cè)到epoll實(shí)例中,初始化pollDesc并返回ctx,賦值runtimeCtx
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

上面pollDescinit()方法中嘶摊,runtime_pollServerInitruntime_pollOpen實(shí)際link的是runtime包下的poll_runtime_pollServerInitpoll_runtime_pollOpen函數(shù)延蟹,具體實(shí)現(xiàn)在runtime/netpoll.go中。
首先叶堆,看一下pollDesc在runtime包下的具體封裝信息:

// `runtime/netpoll.go`
// Network poller descriptor.
// No heap pointers.
//go:notinheap
type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock

    lock    mutex // protects the following fields
    fd      uintptr
    closing bool
    everr   bool    // marks event scanning error happened
    user    uint32  // user settable cookie
    rseq    uintptr // protects from stale read timers
    rg      uintptr // pdReady, pdWait, G waiting for read or nil
    rt      timer   // read deadline timer (set if rt.f != nil)
    rd      int64   // read deadline
    wseq    uintptr // protects from stale write timers
    wg      uintptr // pdReady, pdWait, G waiting for write or nil
    wt      timer   // write deadline timer
    wd      int64   // write deadline
}

pollDesc結(jié)構(gòu)中的重要變量:

  • lock鎖 防止內(nèi)部成員變量并發(fā)讀寫問題
  • fd為文件描述符
  • rtwt分別表示讀寫定時(shí)器阱飘,用來(lái)防止讀寫超時(shí)
  • rgwg分別保存了用戶態(tài)操作pollDesc的讀、寫goroutine地址虱颗,以及goroutine的ready/wait狀態(tài)沥匈,用于goroutine讀寫阻塞時(shí)掛起、就緒時(shí)恢復(fù)運(yùn)行

接著看一下runtime包下面的InitOpen忘渔,Init全局只初始化一次高帖。

// `runtime/netpoll.go`
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    // 判斷netpoll是否已經(jīng)初始化過
    if atomic.Load(&netpollInited) == 0 {
        // 全局鎖
        lock(&netpollInitLock)
        if netpollInited == 0 {
            // 調(diào)用netpollinit()
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

type epollevent struct {
    // 事件
    events uint32
    data   [8]byte // unaligned uintptr
}

// `runtime/netpoll_epoll.go`
func netpollinit() {
    // 調(diào)用 OS epoll_create,創(chuàng)建一個(gè)epoll實(shí)例畦粮,
    // 把生成的epoll fd賦值給全局變量 `epfd`
    // 后續(xù)listener以及accept的所有sockets相關(guān)的epoll操作都是基于這個(gè)`epfd`
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    // ...
    ev := epollevent{
        // 讀事件
        events: _EPOLLIN,
    }
    // netpollBreakRd: for netpollBreak
    // 在后面有事件回調(diào)時(shí)會(huì)用到棋恼,判斷是否為netpollBreakRd
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    // 系統(tǒng)調(diào)用 epoll_ctl,注冊(cè)epoll事件
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    // ...
}

接著看Open

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    // pollcache為全局pollDesc鏈表緩存,調(diào)用 alloc()獲取一個(gè)*pollDesc
    pd := pollcache.alloc()
    lock(&pd.lock)
    // 鎖住锈玉,初始化爪飘、賦值成員變量
    // ...
    unlock(&pd.lock)

    var errno int32
    // 調(diào)用 netpollopen(),實(shí)現(xiàn)見下面
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

// alloc 如果鏈表頭`first`為空拉背,則分配內(nèi)存并初始化n個(gè)`*pollDesc`節(jié)點(diǎn)鏈表师崎,然后pop出頭節(jié)點(diǎn);
// 如果`first`不為空則直接pop出頭部節(jié)點(diǎn)椅棺。
func (c *pollCache) alloc() *pollDesc {
    lock(&c.lock)
    if c.first == nil {
        const pdSize = unsafe.Sizeof(pollDesc{})
        n := pollBlockSize / pdSize
        if n == 0 {
            n = 1
        }
        // Must be in non-GC memory because can be referenced
        // only from epoll/kqueue internals.
        mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        for i := uintptr(0); i < n; i++ {
            pd := (*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    pd := c.first
    c.first = pd.link
    unlock(&c.lock)
    return pd
}

// netpollopen
func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    // 觸發(fā)事件犁罩,讀、寫两疚、掛起關(guān)閉床估、邊緣觸發(fā)
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    // 調(diào)用`epollctl`注冊(cè)fd到epoll實(shí)例
    // 同時(shí)把`*pollDesc`保存到`epollevent.data`里傳入內(nèi)核
    // 實(shí)現(xiàn)內(nèi)核態(tài)事件和用戶態(tài)協(xié)程的關(guān)聯(lián)
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

從上面的源碼可以看到,Listen實(shí)現(xiàn)了TCP Server的綁定诱渤、監(jiān)聽丐巫,通過調(diào)用epoll_createepoll_ctl 創(chuàng)建epoll句柄勺美、注冊(cè)epoll事件递胧,并將goroutine信息與回調(diào)事件相關(guān)聯(lián)。

Accept

listener.AcceptListener的接口方法赡茸,TCPListener實(shí)現(xiàn)了該方法缎脾,它阻塞等待下一次調(diào)用并返回一個(gè)Conn。

// `net/tcpsock.go`
func (l *TCPListener) Accept() (Conn, error) {
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    c, err := l.accept()
    if err != nil {
        return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    }
    return c, nil
}

// `net/tcpsock_posix.go`
func (ln *TCPListener) accept() (*TCPConn, error) {
    // 關(guān)注1:
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    // 關(guān)注2:
    tc := newTCPConn(fd)
    if ln.lc.KeepAlive >= 0 {
        setKeepAlive(fd, true)
        ka := ln.lc.KeepAlive
        if ln.lc.KeepAlive == 0 {
            ka = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(fd, ka)
    }
    return tc, nil
}

// `net/fd_unix.go`
func (fd *netFD) accept() (netfd *netFD, err error) {
    // 調(diào)用`poll.FD`的`Accept`方法接受新的socket連接占卧,返回socket的fd
    d, rsa, errcall, err := fd.pfd.Accept()
    // ...
    // 用上面返回的fd(d)創(chuàng)建一個(gè)netFD
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
    // ...
    // 調(diào)用`netFD`的`init`方法完成`pollDesc`初始化遗菠,并將事件加入epoll實(shí)例
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    // ...
    return netfd, nil
}

// `internal/poll/fd_unix.go`
// Accept 封裝了accept網(wǎng)絡(luò)調(diào)用
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    // ...
    for {
        // 其內(nèi)調(diào)用syscall.Accept4/syscall.Accept联喘,設(shè)置為syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC
        s, rsa, errcall, err := accept(fd.Sysfd)
        // listener fd在創(chuàng)建的時(shí)候設(shè)置為非阻塞模式,accept()會(huì)立即返回辙纬,
        // 判斷err耸袜,為nil則說明有新連接事件,直接return
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // 如果err為syscall.EAGAIN牲平,并且pollDesc的runtimeCtx不為空堤框,則調(diào)用pollDesc.waitRead,
                // 其中調(diào)用了`runtime_pollWait`纵柿,實(shí)際連接調(diào)用的是`runtime.poll_runtime_pollWait`
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            continue
        }
        return -1, nil, errcall, err
    }
}

func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) waitWrite(isFile bool) error {
    return pd.wait('w', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

  • Accept接收客戶端連接請(qǐng)求建立新連接蜈抓,通過netFDaccept()創(chuàng)建系統(tǒng)fd,將socket設(shè)置為非阻塞I/O模式昂儒。
  • 后面邏輯與前面講的一樣沟使,創(chuàng)建并初始化netFD,其內(nèi)完成pollDesc初始化渊跋、調(diào)用runtime.netpollopenfd腊嗡、epollevent添加到epoll實(shí)例。
  • 因?yàn)槭欠亲枞J绞霸停?dāng)accept()返回err為syscall.EAGAIN時(shí)燕少,若pollDescruntimeCtx不為空,則調(diào)用pollDesc.waitRead蒿囤,其中調(diào)用了runtime_pollWait客们。
  • runtime_pollWait實(shí)際link的是runtime.poll_runtime_pollWait,其中調(diào)用netpollblock材诽,源碼如下:
// `runtime/netpoll.go`
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    // netpollcheckerr and check GOOS
    ///...
    // for循環(huán)等待 IO ready
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0
}

// IO reday返回true底挫,超時(shí)或者關(guān)閉返回false
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    // gpp根據(jù)mode的值取rg或者wg,后面調(diào)用gopark時(shí)脸侥,會(huì)把當(dāng)前的goroutine的g結(jié)構(gòu)指針存入gpp
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    // set the gpp semaphore to WAIT
    // 判斷狀態(tài)為 pdReady 則 return建邓, 否則設(shè)置為 pdWait
    for {
        old := *gpp
        // old == pdReady 表示此時(shí)已有 I/O 事件發(fā)生,
        // 直接返回 unblock 當(dāng)前 goroutine 并執(zhí)行相應(yīng)的 I/O 操作
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        // CAS原子操作將gpp置為`pdWait`
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    // recheck error states
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // 防止此時(shí)可能會(huì)有其他的并發(fā)操作修改pd里的內(nèi)容睁枕,所以需要再次檢查錯(cuò)誤狀態(tài)官边。
        // gopark將當(dāng)前goroutine置于等待狀態(tài)并等待下一次的調(diào)度
        // `netpollblockcommit`回調(diào)函數(shù)在gopark內(nèi)部回調(diào)時(shí),CAS將當(dāng)前goroutine指針存到傳入的參數(shù)`gpp`
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // 通過原子操作將gpp的值設(shè)置為0譬重,返回修改前的值并判斷是否pdReady
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Bump the count of goroutines waiting for the poller.
        // The scheduler uses this to decide whether to block
        // waiting for the poller if there is nothing else to do.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}

關(guān)于Go的調(diào)度拒逮,此處不深入,簡(jiǎn)單講一下臀规。
gopark調(diào)用mcall(park_m)mcall是通過匯編實(shí)現(xiàn)的栅隐,其函數(shù)原型及作用為:

func mcall(fn func(*g))
  • 從當(dāng)前g棧切換到g0棧塔嬉;
  • 在g0棧上執(zhí)行函數(shù)fn(g)玩徊,此處為park_m
  • 保存當(dāng)前g的信息谨究,將PC/SP存儲(chǔ)到g.sched中恩袱,當(dāng)被重新調(diào)度時(shí)能夠獲取相關(guān)信息繼續(xù)執(zhí)行。

park_m()中將當(dāng)前goroutine狀態(tài)由_GrunningCAS為_Gwaiting胶哲、與當(dāng)前的m解綁畔塔,并回調(diào)netpollblockcommitgr/gwpdWaitCAS為goroutine指針,然后調(diào)用schedule()鸯屿。
schedule函數(shù)永遠(yuǎn)不會(huì)返回澈吨,其調(diào)用邏輯為:schedule() -> execute() -> gogo() -> goroutine 任務(wù) -> goexit() -> goexit1() -> mcall() -> goexit0() -> schedule()。
當(dāng)goroutine對(duì)應(yīng)的fd上發(fā)生期望的事件時(shí)寄摆,它就會(huì)被重新調(diào)度谅辣,從g.sched中獲取之前保存的信息,繼續(xù)執(zhí)行后面的邏輯婶恼,此時(shí)gpppdReady狀態(tài)桑阶。

Read、Write

TCPListeneraccept()中創(chuàng)建并初始化netFD后勾邦,會(huì)調(diào)用newTCPConn()創(chuàng)建并返回*TCPConn蚣录,它實(shí)現(xiàn)了net.Conn接口,我們主要看ReadWrite方法眷篇。
Read調(diào)用鏈源碼:

// `net/tcpsock.go`
func newTCPConn(fd *netFD) *TCPConn {
    c := &TCPConn{conn{fd}}
    setNoDelay(c.fd, true)
    return c
}

type TCPConn struct {
    conn
}

// `net/net.go`
type conn struct {
    fd *netFD
}

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    n, err := c.fd.Read(b)
    if err != nil && err != io.EOF {
        err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    return n, err
}

// `net/fd_unix.go`
func (fd *netFD) Read(p []byte) (n int, err error) {
    n, err = fd.pfd.Read(p)
    runtime.KeepAlive(fd)
    return n, wrapSyscallError("read", err)
}

// `internal/poll/fd_unix.go`
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
    // ...
    for {
        // syscall.Read系統(tǒng)調(diào)用從socket讀取數(shù)據(jù)包归,因?yàn)?socket在被accept的時(shí)候設(shè)置為非阻塞 I/O,不會(huì)阻塞
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN && fd.pd.pollable() {
                // 當(dāng)err為syscall.EAGAIN铅歼,調(diào)用waitRead公壤,
                // 從上面的分析知道,其內(nèi)通過gopark將當(dāng)前goroutine掛起
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
            if runtime.GOOS == "darwin" && err == syscall.EINTR {
                continue
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

conn.Writeconn.Read邏輯類似椎椰,通過FD.Write調(diào)用syscall.Write厦幅,因?yàn)闉榉亲枞?I/O,如果返回err為syscall.EAGAIN慨飘,也會(huì)類似Read确憨,調(diào)用pollDesc.waitWrite
執(zhí)行runtime_pollWait->netpollblock瓤的,gopark住當(dāng)前goroutine休弃,直到有事件發(fā)生被重新調(diào)度。

netpoll

通過前面的分析圈膏,我們了解了Go通過gopark住 goroutine 達(dá)到阻塞 Accept/Read/Write 的效果∷現(xiàn)在會(huì)有兩個(gè)疑問:

  1. 當(dāng)相應(yīng)的 I/O 事件發(fā)生之后,如何喚醒這些gopark住的goroutine從而繼續(xù)調(diào)度執(zhí)行呢稽坤?
  2. 我們前面講到了跟epoll相關(guān)的兩個(gè)調(diào)用epoll_create丈甸、epoll_ctl糯俗,還有一個(gè)重要的epoll_wait在哪里調(diào)用的呢?

通過源碼睦擂,可以發(fā)現(xiàn)得湘,在runtime/netpoll_epoll.go中有一個(gè)netpoll()方法,它內(nèi)部調(diào)用 epollwait(epoll_wait) 獲取就緒的fd事件epollevent列表顿仇,然后將每個(gè)epollevent.data值取出轉(zhuǎn)化為*pollDesc淘正,并調(diào)用netpollready->netpollunblock, 將rg/wg的狀態(tài)轉(zhuǎn)化為pdReady(ioready),同時(shí)將讀臼闻、寫g指針添加到goroutine列表gList返回鸿吆。
相關(guān)源碼如下:

// runtime/proc.go
// A gList is a list of Gs linked through g.schedlink. A G can only be
// on one gQueue or gList at a time.
type gList struct {
    head guintptr
}

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
    // epfd、delay判斷waitms賦值
    // ...
    var events [128]epollevent
retry:
    // 獲取就緒的fd事件列表
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        // ...
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        // 判斷是否為netpollinit注冊(cè)epoll實(shí)例時(shí)設(shè)置的netpollBreakRd
        if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
            // ...
            continue
        }

        var mode int32
        // 讀事件
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        // 寫事件
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // 取出保存在epollevent.data中的pollDesc
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            // 調(diào)用netpollready些阅,傳入就緒fd的pollDesc
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

// netpollready 調(diào)用netpollunblock伞剑,把pollDesc中相應(yīng)的可讀、寫goroutine取出
// 并將pollDesc.rg/wg轉(zhuǎn)換狀態(tài)為pdReady市埋,然后將取出的goroutine push到鏈表 toRun 中
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            // Only set READY for ioready. runtime_pollWait
            // will check for timeout/cancel before waiting.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        // gpp CAS 為 pdReady
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdReady || old == pdWait {
                old = 0
            }
            // 將之前存入pollDesc的 g結(jié)構(gòu)指針 old 轉(zhuǎn)換為 *g
            return (*g)(unsafe.Pointer(old))
        }
    }
}

我們看到Go 在netpoll()中獲取觸發(fā)讀寫事件的goroutine列表黎泣,而netpoll()會(huì)在多處被調(diào)用(runtime/proc.go):

  • startTheWorldWithSema()
    StartTheWorld時(shí)會(huì)調(diào)用netpoll獲取gList進(jìn)行調(diào)度
  • findrunnable()
    該方法會(huì)在Go scheduler的核心方法schedule()中被調(diào)用,從而調(diào)用netpoll獲取gList
  • pollWork()
    后臺(tái)工作循環(huán)(比如idle GC)檢查時(shí)會(huì)調(diào)用netpoll獲取gList
  • sysmon()
    在程序啟動(dòng)時(shí)調(diào)用缤谎,不需要P抒倚,使用獨(dú)立的M作為監(jiān)控線程,sysmon每 20us~10ms運(yùn)行一次坷澡,調(diào)用netpoll獲取gList

當(dāng)上面這些調(diào)用獲取gList后托呕,都會(huì)調(diào)用injectglist()方法(findrunnable()中會(huì)先pop出一個(gè)g,將g狀態(tài)由_GwaitingCAS為_Grunnable,然后再調(diào)用injectglist())频敛,injectglist方法會(huì)將gList中的g的狀態(tài)由_GwaitingCAS為_Grunnable项郊,然后放入全局運(yùn)行隊(duì)列(globrunqput(gp)),從而被重新調(diào)度斟赚,當(dāng)goroutine被重新調(diào)度時(shí)着降,會(huì)從g.sched中取出PC/SP信息,繼續(xù)執(zhí)行之前的邏輯拗军。

// Injects the list of runnable G's into the scheduler and clears glist.
// Can run concurrently with GC.
func injectglist(glist *gList) {
    if glist.empty() {
        return
    }
    if trace.enabled {
        for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() {
            traceGoUnpark(gp, 0)
        }
    }
    lock(&sched.lock)
    var n int
    // 從glist中循環(huán)取出gp
    for n = 0; !glist.empty(); n++ {
        gp := glist.pop()
        // 由 _Gwaiting 變?yōu)?_Grunnable
        casgstatus(gp, _Gwaiting, _Grunnable)
        // 放入全局運(yùn)行隊(duì)列
        globrunqput(gp)
    }
    unlock(&sched.lock)
    for ; n != 0 && sched.npidle != 0; n-- {
        // 循環(huán)獲取空閑P任洞,并調(diào)度 M 去運(yùn)行 P
        startm(nil, false)
    }
    *glist = gList{}
}

Go基于epoll/kqueue/iocp和自身的運(yùn)行時(shí)調(diào)度機(jī)制,實(shí)現(xiàn)了自己的I/O多路復(fù)用netpoll網(wǎng)絡(luò)模型发侵,從上面源碼可以看到交掏,Accept/Read/Write等方法其底層實(shí)現(xiàn)均采用非阻塞方式,而我們?cè)陂_發(fā)過程中調(diào)用的方式很顯然是同步模式刃鳄,這就大大降低了網(wǎng)絡(luò)開發(fā)難度盅弛,從這也可以看出Go語(yǔ)言設(shè)計(jì)的初衷之一:簡(jiǎn)單而高效。

至此,本文關(guān)于Go net(TCP service)相關(guān)內(nèi)容就介紹完了熊尉,總結(jié)一下涉及的內(nèi)容:

  • TCP Server的簡(jiǎn)單使用示例
  • 簡(jiǎn)單介紹了一個(gè)可使用(或作為參考)的罐柳、基于Go原生net package實(shí)現(xiàn)的go net項(xiàng)目(github開源項(xiàng)目
  • 主要進(jìn)行了 netpoll相關(guān)源碼的分析(基于Go1.14)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末掌腰,一起剝皮案震驚了整個(gè)濱河市狰住,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌齿梁,老刑警劉巖催植,帶你破解...
    沈念sama閱讀 221,635評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異抬伺,居然都是意外死亡喷众,警方通過查閱死者的電腦和手機(jī)赃梧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)稿辙,“玉大人,你說我怎么就攤上這事气忠×诖ⅲ” “怎么了?”我有些...
    開封第一講書人閱讀 168,083評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵旧噪,是天一觀的道長(zhǎng)吨娜。 經(jīng)常有香客問我,道長(zhǎng)淘钟,這世上最難降的妖魔是什么宦赠? 我笑而不...
    開封第一講書人閱讀 59,640評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮米母,結(jié)果婚禮上勾扭,老公的妹妹穿的比我還像新娘。我一直安慰自己铁瞒,他們只是感情好妙色,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著精拟,像睡著了一般燎斩。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蜂绎,一...
    開封第一講書人閱讀 52,262評(píng)論 1 308
  • 那天栅表,我揣著相機(jī)與錄音,去河邊找鬼师枣。 笑死怪瓶,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的践美。 我是一名探鬼主播洗贰,決...
    沈念sama閱讀 40,833評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼找岖,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了敛滋?” 一聲冷哼從身側(cè)響起许布,我...
    開封第一講書人閱讀 39,736評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎绎晃,沒想到半個(gè)月后蜜唾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,280評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡庶艾,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評(píng)論 3 340
  • 正文 我和宋清朗相戀三年袁余,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片咱揍。...
    茶點(diǎn)故事閱讀 40,503評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡颖榜,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出煤裙,到底是詐尸還是另有隱情掩完,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布积暖,位于F島的核電站藤为,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏夺刑。R本人自食惡果不足惜缅疟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望遍愿。 院中可真熱鬧存淫,春花似錦、人聲如沸沼填。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)坞笙。三九已至岩饼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間薛夜,已是汗流浹背籍茧。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留梯澜,地道東北人寞冯。 一個(gè)月前我還...
    沈念sama閱讀 48,909評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親吮龄。 傳聞我的和親對(duì)象是個(gè)殘疾皇子俭茧,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評(píng)論 2 359