注:本文分析基于Go1.14
簡(jiǎn)單實(shí)例
一個(gè)簡(jiǎn)單的tcp server例子
package main
import (
"context"
"fmt"
"net"
"time"
)
func main() {
listener, err := net.Listen("tcp", "0.0.0.0:7777")
if err != nil {
fmt.Println("Listen error:", err)
return
}
defer listener.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
conn, err := listener.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
fmt.Println("Accept temp error:", ne)
time.Sleep(time.Second)
continue
}
fmt.Println("Accept error:", err)
break
}
go handleConn(ctx, conn)
}
}
func handleConn(ctx context.Context, conn net.Conn) {
defer conn.Close()
buf := make([]byte, 1024)
for {
select {
case <-ctx.Done():
return
default:
}
n, err := conn.Read(buf)
if err != nil {
fmt.Println("Read error:", err)
}
_, err = conn.Write(buf[:n])
if err != nil {
fmt.Println("Write error:", err)
}
}
}
完整實(shí)現(xiàn)
上面是簡(jiǎn)單的TCP Server使用示例,如果想獲取完整的服務(wù)實(shí)現(xiàn)术裸,可以使用或者參考github上的一個(gè)go net 開源項(xiàng)目:
https://github.com/izhw/gnet
該項(xiàng)目使用go原生net package實(shí)現(xiàn)恼五,不依賴其他第三方庫(kù)磨淌,可以幫助開發(fā)者快速搭建一個(gè)net服務(wù)(TCP Server or Client)硫豆。
比如芬位,開發(fā)者只要實(shí)現(xiàn)gnet.NetEventHandler事件回調(diào)(也可內(nèi)置默認(rèn)實(shí)現(xiàn))无拗,使用可選的Functional options,就能快速搭建一個(gè)TCP Server昧碉,簡(jiǎn)單使用代碼如下:
package main
import (
"fmt"
"log"
"github.com/izhw/gnet"
"github.com/izhw/gnet/tcp/tcpserver"
)
type ServerHandler struct {
*gnet.NetEventHandler
}
func (h *ServerHandler) OnReadMsg(c gnet.Conn, data []byte) error {
fmt.Println("read msg:", string(data))
c.Write(data)
return nil
}
func main() {
s := tcpserver.NewServer("0.0.0.0:7777", &ServerHandler{})
log.Fatal("Exit:", s.Serve())
}
Go net源碼分析 (go1.14)
開始的簡(jiǎn)單示例中英染,我們看到幾個(gè)關(guān)鍵的調(diào)用:net.Listen()
、listener.Accept()
被饿、conn.Read()
四康、conn.Write
,下面分別進(jìn)行源碼分析狭握。
Listen
-
net.Listen
返回的是實(shí)現(xiàn)了net.Listener
接口的*TCPListener
闪金。 - 其中,
Listen
方法內(nèi)论颅,生成系統(tǒng)文件描述符sysfd
哎垦,使用該sysfd
設(shè)置參數(shù)、調(diào)用syscall.Bind
恃疯、syscall.Listen
完成綁定漏设、監(jiān)聽,并初始化一些重要的結(jié)構(gòu)信息澡谭,創(chuàng)建epoll句柄愿题、注冊(cè)epoll事件损俭,然后構(gòu)造返回TCPListener
。
調(diào)用鏈源碼如下(只貼了關(guān)鍵代碼)潘酗。
// `net/net.go`
// Listener接口
type Listener interface {
// 阻塞等待杆兵,有新連接事件的時(shí)候,返回一個(gè)net.Conn
Accept() (Conn, error)
// 關(guān)閉listener仔夺,任何阻塞的Accept操作變?yōu)椴蛔枞鲈啵⒎祷劐e(cuò)誤
Close() error
// 返回listener的網(wǎng)絡(luò)地址
Addr() Addr
}
// `net/tcpsock.go`
type TCPListener struct {
// Go封裝的網(wǎng)絡(luò)描述符,后面會(huì)具體講
fd *netFD
// Listen配置
lc ListenConfig
}
// `net/dial.go`
// 根據(jù)不同的 'network'和'address'構(gòu)建相應(yīng)的'Listener'
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
// resolve解析addrs
// ...
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
// 根據(jù)不同的addrs類型調(diào)用不同的listen
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
// return error...
}
// return error...
return l, nil
}
// `net/tcpsock_posix.go`
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
// internetSocket內(nèi)部解析AddrFamily并調(diào)用socket()
// socket內(nèi)部創(chuàng)建關(guān)鍵的結(jié)構(gòu)對(duì)象netFD缸兔,并初始化綁定日裙、監(jiān)聽等
// 注意這里的syscall.SOCK_STREAM,后面會(huì)用到
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
// 用上面創(chuàng)建并初始化好的fd構(gòu)造TCPListener
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
// `net/sock_posix.go`
// 返回一個(gè)初始化好的異步I/O `netFD` 網(wǎng)絡(luò)描述符
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// `sysSocket`內(nèi)部調(diào)用syscall.Socket惰蜜,
// 并置為非阻塞和close-on-exec (Syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)昂拂,
// 返回系統(tǒng)描述符: s
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
// ...
// 用`sysSocket`創(chuàng)建的`s`創(chuàng)建`netFD`
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
// ...
// 上面提到過,在listenTCP中傳入的參數(shù)為syscall.SOCK_STREAM
// 此處判斷該sotype類型抛猖,調(diào)用fd.listenStream()
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
// ...
return fd, nil
}
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
// 設(shè)置默認(rèn)參數(shù) SO_REUSEADDR
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
if ctrlFn != nil {
// ...
}
// syscall.Bind 綁定
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// syscall.Listen 監(jiān)聽
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// netFD初始化格侯,fd.init()->fd.pfd.Init()->fd.pfd.pd.init()`
// 最終調(diào)用的是`runtime_pollServerInit`、`runtime_pollOpen`
// netFD初始化部分后面會(huì)接著講
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
為了更好的理解财著,需要講幾個(gè)重要的struct:
-
netFD
是網(wǎng)絡(luò)描述符联四,其結(jié)構(gòu)體中有一個(gè)poll.FD
對(duì)象。 -
poll.FD
是文件描述符撑教,表示一個(gè)網(wǎng)絡(luò)連接或者OS文件朝墩。 -
poll.FD
結(jié)構(gòu)中主要看Sysfd int
和pd pollDesc
兩個(gè)變量,前者是系統(tǒng)返回的文件描述符伟姐,后者其內(nèi)部封裝了運(yùn)行時(shí)上下文收苏,包括讀、寫goroutine及其狀態(tài)玫镐,讀寫超時(shí)等基本信息倒戏。通過將pollDesc
指針信息存入epollevent.data
(8字節(jié)數(shù)組)中怠噪,然后調(diào)用epollctl
(epoll_ctl
)將fd
和epollevent
信息注冊(cè)到epoll實(shí)例上恐似,實(shí)現(xiàn)epoll事件回調(diào)和用戶態(tài)協(xié)程調(diào)用的關(guān)聯(lián)。
相關(guān)結(jié)構(gòu)源碼如下:
// `net/fd_unix.go`
type netFD struct {
pfd poll.FD
// Close前不可變
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
// `internal/poll/fd_unix.go`
type FD struct {
// sysfd和Read/Write方法鎖
fdmu fdMutex
// 系統(tǒng)文件描述符
Sysfd int
// I/O poller傍念,封裝了運(yùn)行時(shí)上下文
pd pollDesc
// Writev 緩存.
iovecs *[]syscall.Iovec
// 文件關(guān)閉的時(shí)候發(fā)送信號(hào)量
csema uint32
// 非阻塞模式時(shí)非零
isBlocking uint32
// 是streaming還是packet-based UDP
IsStream bool
// 讀到零字節(jié)是否表示EOF矫夷,對(duì)于基于消息的套接字連接為false
ZeroReadIsEOF bool
// 是一個(gè)file而并非network socket
isFile bool
}
// internal/poll/fd_poll_runtime.go
// `pollDesc`結(jié)構(gòu)中只有一個(gè)`uintptr`變量,`runtimeCtx`封裝了運(yùn)行時(shí)上下文憋槐,其具體信息后面會(huì)講
type pollDesc struct {
runtimeCtx uintptr
}
接著看netFD
初始化源碼:
// `net/fd_unix.go`
// netFD初始化
func (fd *netFD) init() error {
// 調(diào)用 pfd (poll.FD) 的Init方法
return fd.pfd.Init(fd.net, true)
}
// `internal/poll/fd_unix.go`
// FD初始化
func (fd *FD) Init(net string, pollable bool) error {
// ...
// 調(diào)用 pd (pollDesc) 的init方法
err := fd.pd.init(fd)
// ...
return err
}
// internal/poll/fd_poll_runtime.go
// serverInit全局變量双藕,只執(zhí)行一次runtime_pollServerInit,
// 并在其內(nèi)部調(diào)用runtime.netpollinit()創(chuàng)建epoll實(shí)例阳仔;
// runtime_pollOpen內(nèi)部調(diào)用runtime.netpollopen忧陪,
// 將listener fd注冊(cè)到epoll實(shí)例中,初始化pollDesc并返回ctx,賦值runtimeCtx
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
上面pollDesc
的init()
方法中嘶摊,runtime_pollServerInit
和runtime_pollOpen
實(shí)際link的是runtime
包下的poll_runtime_pollServerInit
和poll_runtime_pollOpen
函數(shù)延蟹,具體實(shí)現(xiàn)在runtime/netpoll.go
中。
首先叶堆,看一下pollDesc
在runtime包下的具體封裝信息:
// `runtime/netpoll.go`
// Network poller descriptor.
// No heap pointers.
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}
pollDesc
結(jié)構(gòu)中的重要變量:
-
lock
鎖 防止內(nèi)部成員變量并發(fā)讀寫問題 -
fd
為文件描述符 -
rt
和wt
分別表示讀寫定時(shí)器阱飘,用來(lái)防止讀寫超時(shí) -
rg
和wg
分別保存了用戶態(tài)操作pollDesc
的讀、寫goroutine地址虱颗,以及goroutine的ready/wait狀態(tài)沥匈,用于goroutine讀寫阻塞時(shí)掛起、就緒時(shí)恢復(fù)運(yùn)行
接著看一下runtime
包下面的Init
和Open
忘渔,Init
全局只初始化一次高帖。
// `runtime/netpoll.go`
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
// 判斷netpoll是否已經(jīng)初始化過
if atomic.Load(&netpollInited) == 0 {
// 全局鎖
lock(&netpollInitLock)
if netpollInited == 0 {
// 調(diào)用netpollinit()
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
type epollevent struct {
// 事件
events uint32
data [8]byte // unaligned uintptr
}
// `runtime/netpoll_epoll.go`
func netpollinit() {
// 調(diào)用 OS epoll_create,創(chuàng)建一個(gè)epoll實(shí)例畦粮,
// 把生成的epoll fd賦值給全局變量 `epfd`
// 后續(xù)listener以及accept的所有sockets相關(guān)的epoll操作都是基于這個(gè)`epfd`
epfd = epollcreate1(_EPOLL_CLOEXEC)
// ...
ev := epollevent{
// 讀事件
events: _EPOLLIN,
}
// netpollBreakRd: for netpollBreak
// 在后面有事件回調(diào)時(shí)會(huì)用到棋恼,判斷是否為netpollBreakRd
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
// 系統(tǒng)調(diào)用 epoll_ctl,注冊(cè)epoll事件
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
// ...
}
接著看Open
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// pollcache為全局pollDesc鏈表緩存,調(diào)用 alloc()獲取一個(gè)*pollDesc
pd := pollcache.alloc()
lock(&pd.lock)
// 鎖住锈玉,初始化爪飘、賦值成員變量
// ...
unlock(&pd.lock)
var errno int32
// 調(diào)用 netpollopen(),實(shí)現(xiàn)見下面
errno = netpollopen(fd, pd)
return pd, int(errno)
}
// alloc 如果鏈表頭`first`為空拉背,則分配內(nèi)存并初始化n個(gè)`*pollDesc`節(jié)點(diǎn)鏈表师崎,然后pop出頭節(jié)點(diǎn);
// 如果`first`不為空則直接pop出頭部節(jié)點(diǎn)椅棺。
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
if c.first == nil {
const pdSize = unsafe.Sizeof(pollDesc{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
pd.link = c.first
c.first = pd
}
}
pd := c.first
c.first = pd.link
unlock(&c.lock)
return pd
}
// netpollopen
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
// 觸發(fā)事件犁罩,讀、寫两疚、掛起關(guān)閉床估、邊緣觸發(fā)
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
// 調(diào)用`epollctl`注冊(cè)fd到epoll實(shí)例
// 同時(shí)把`*pollDesc`保存到`epollevent.data`里傳入內(nèi)核
// 實(shí)現(xiàn)內(nèi)核態(tài)事件和用戶態(tài)協(xié)程的關(guān)聯(lián)
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
從上面的源碼可以看到,Listen
實(shí)現(xiàn)了TCP Server的綁定诱渤、監(jiān)聽丐巫,通過調(diào)用epoll_create
、epoll_ctl
創(chuàng)建epoll句柄勺美、注冊(cè)epoll事件递胧,并將goroutine信息與回調(diào)事件相關(guān)聯(lián)。
Accept
listener.Accept
是Listener
的接口方法赡茸,TCPListener
實(shí)現(xiàn)了該方法缎脾,它阻塞等待下一次調(diào)用并返回一個(gè)Conn。
// `net/tcpsock.go`
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
// `net/tcpsock_posix.go`
func (ln *TCPListener) accept() (*TCPConn, error) {
// 關(guān)注1:
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
// 關(guān)注2:
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
// `net/fd_unix.go`
func (fd *netFD) accept() (netfd *netFD, err error) {
// 調(diào)用`poll.FD`的`Accept`方法接受新的socket連接占卧,返回socket的fd
d, rsa, errcall, err := fd.pfd.Accept()
// ...
// 用上面返回的fd(d)創(chuàng)建一個(gè)netFD
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
// ...
// 調(diào)用`netFD`的`init`方法完成`pollDesc`初始化遗菠,并將事件加入epoll實(shí)例
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
// ...
return netfd, nil
}
// `internal/poll/fd_unix.go`
// Accept 封裝了accept網(wǎng)絡(luò)調(diào)用
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// ...
for {
// 其內(nèi)調(diào)用syscall.Accept4/syscall.Accept联喘,設(shè)置為syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC
s, rsa, errcall, err := accept(fd.Sysfd)
// listener fd在創(chuàng)建的時(shí)候設(shè)置為非阻塞模式,accept()會(huì)立即返回辙纬,
// 判斷err耸袜,為nil則說明有新連接事件,直接return
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果err為syscall.EAGAIN牲平,并且pollDesc的runtimeCtx不為空堤框,則調(diào)用pollDesc.waitRead,
// 其中調(diào)用了`runtime_pollWait`纵柿,實(shí)際連接調(diào)用的是`runtime.poll_runtime_pollWait`
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
continue
}
return -1, nil, errcall, err
}
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) waitWrite(isFile bool) error {
return pd.wait('w', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
-
Accept
接收客戶端連接請(qǐng)求建立新連接蜈抓,通過netFD
的accept()
創(chuàng)建系統(tǒng)fd,將socket設(shè)置為非阻塞I/O模式昂儒。 - 后面邏輯與前面講的一樣沟使,創(chuàng)建并初始化
netFD
,其內(nèi)完成pollDesc
初始化渊跋、調(diào)用runtime.netpollopen
將fd
腊嗡、epollevent
添加到epoll實(shí)例。 - 因?yàn)槭欠亲枞J绞霸停?dāng)
accept()
返回err為syscall.EAGAIN時(shí)燕少,若pollDesc
的runtimeCtx
不為空,則調(diào)用pollDesc.waitRead
蒿囤,其中調(diào)用了runtime_pollWait
客们。 -
runtime_pollWait
實(shí)際link的是runtime.poll_runtime_pollWait
,其中調(diào)用netpollblock
材诽,源碼如下:
// `runtime/netpoll.go`
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// netpollcheckerr and check GOOS
///...
// for循環(huán)等待 IO ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
// IO reday返回true底挫,超時(shí)或者關(guān)閉返回false
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp根據(jù)mode的值取rg或者wg,后面調(diào)用gopark時(shí)脸侥,會(huì)把當(dāng)前的goroutine的g結(jié)構(gòu)指針存入gpp
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
// 判斷狀態(tài)為 pdReady 則 return建邓, 否則設(shè)置為 pdWait
for {
old := *gpp
// old == pdReady 表示此時(shí)已有 I/O 事件發(fā)生,
// 直接返回 unblock 當(dāng)前 goroutine 并執(zhí)行相應(yīng)的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// CAS原子操作將gpp置為`pdWait`
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// recheck error states
if waitio || netpollcheckerr(pd, mode) == 0 {
// 防止此時(shí)可能會(huì)有其他的并發(fā)操作修改pd里的內(nèi)容睁枕,所以需要再次檢查錯(cuò)誤狀態(tài)官边。
// gopark將當(dāng)前goroutine置于等待狀態(tài)并等待下一次的調(diào)度
// `netpollblockcommit`回調(diào)函數(shù)在gopark內(nèi)部回調(diào)時(shí),CAS將當(dāng)前goroutine指針存到傳入的參數(shù)`gpp`
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// 通過原子操作將gpp的值設(shè)置為0譬重,返回修改前的值并判斷是否pdReady
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
關(guān)于Go的調(diào)度拒逮,此處不深入,簡(jiǎn)單講一下臀规。
gopark
調(diào)用mcall(park_m)
,mcall
是通過匯編實(shí)現(xiàn)的栅隐,其函數(shù)原型及作用為:
func mcall(fn func(*g))
- 從當(dāng)前g棧切換到g0棧塔嬉;
- 在g0棧上執(zhí)行函數(shù)fn(g)玩徊,此處為
park_m
; - 保存當(dāng)前g的信息谨究,將PC/SP存儲(chǔ)到
g.sched
中恩袱,當(dāng)被重新調(diào)度時(shí)能夠獲取相關(guān)信息繼續(xù)執(zhí)行。
在park_m()
中將當(dāng)前goroutine狀態(tài)由_Grunning
CAS為_Gwaiting
胶哲、與當(dāng)前的m
解綁畔塔,并回調(diào)netpollblockcommit
將gr/gw
由pdWait
CAS為goroutine指針,然后調(diào)用schedule()
鸯屿。
schedule
函數(shù)永遠(yuǎn)不會(huì)返回澈吨,其調(diào)用邏輯為:schedule() -> execute() -> gogo() -> goroutine 任務(wù) -> goexit() -> goexit1() -> mcall() -> goexit0() -> schedule()。
當(dāng)goroutine對(duì)應(yīng)的fd上發(fā)生期望的事件時(shí)寄摆,它就會(huì)被重新調(diào)度谅辣,從g.sched
中獲取之前保存的信息,繼續(xù)執(zhí)行后面的邏輯婶恼,此時(shí)gpp
為pdReady
狀態(tài)桑阶。
Read、Write
TCPListener
的accept()
中創(chuàng)建并初始化netFD
后勾邦,會(huì)調(diào)用newTCPConn()
創(chuàng)建并返回*TCPConn
蚣录,它實(shí)現(xiàn)了net.Conn
接口,我們主要看Read
和Write
方法眷篇。
Read
調(diào)用鏈源碼:
// `net/tcpsock.go`
func newTCPConn(fd *netFD) *TCPConn {
c := &TCPConn{conn{fd}}
setNoDelay(c.fd, true)
return c
}
type TCPConn struct {
conn
}
// `net/net.go`
type conn struct {
fd *netFD
}
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
// `net/fd_unix.go`
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}
// `internal/poll/fd_unix.go`
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
// ...
for {
// syscall.Read系統(tǒng)調(diào)用從socket讀取數(shù)據(jù)包归,因?yàn)?socket在被accept的時(shí)候設(shè)置為非阻塞 I/O,不會(huì)阻塞
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
// 當(dāng)err為syscall.EAGAIN铅歼,調(diào)用waitRead公壤,
// 從上面的分析知道,其內(nèi)通過gopark將當(dāng)前goroutine掛起
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
conn.Write
與conn.Read
邏輯類似椎椰,通過FD.Write
調(diào)用syscall.Write
厦幅,因?yàn)闉榉亲枞?I/O,如果返回err為syscall.EAGAIN
慨飘,也會(huì)類似Read
确憨,調(diào)用pollDesc.waitWrite
,
執(zhí)行runtime_pollWait
->netpollblock
瓤的,gopark住當(dāng)前goroutine休弃,直到有事件發(fā)生被重新調(diào)度。
netpoll
通過前面的分析圈膏,我們了解了Go
通過gopark
住 goroutine 達(dá)到阻塞 Accept/Read/Write
的效果∷現(xiàn)在會(huì)有兩個(gè)疑問:
- 當(dāng)相應(yīng)的 I/O 事件發(fā)生之后,如何喚醒這些
gopark
住的goroutine從而繼續(xù)調(diào)度執(zhí)行呢稽坤? - 我們前面講到了跟epoll相關(guān)的兩個(gè)調(diào)用
epoll_create
丈甸、epoll_ctl
糯俗,還有一個(gè)重要的epoll_wait
在哪里調(diào)用的呢?
通過源碼睦擂,可以發(fā)現(xiàn)得湘,在runtime/netpoll_epoll.go
中有一個(gè)netpoll()
方法,它內(nèi)部調(diào)用 epollwait
(epoll_wait) 獲取就緒的fd
事件epollevent
列表顿仇,然后將每個(gè)epollevent.data
值取出轉(zhuǎn)化為*pollDesc
淘正,并調(diào)用netpollready
->netpollunblock
, 將rg/wg
的狀態(tài)轉(zhuǎn)化為pdReady
(ioready),同時(shí)將讀臼闻、寫g
指針添加到goroutine列表gList
返回鸿吆。
相關(guān)源碼如下:
// runtime/proc.go
// A gList is a list of Gs linked through g.schedlink. A G can only be
// on one gQueue or gList at a time.
type gList struct {
head guintptr
}
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
// epfd、delay判斷waitms賦值
// ...
var events [128]epollevent
retry:
// 獲取就緒的fd事件列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
// ...
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
// 判斷是否為netpollinit注冊(cè)epoll實(shí)例時(shí)設(shè)置的netpollBreakRd
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
// ...
continue
}
var mode int32
// 讀事件
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
// 寫事件
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在epollevent.data中的pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 調(diào)用netpollready些阅,傳入就緒fd的pollDesc
netpollready(&toRun, pd, mode)
}
}
return toRun
}
// netpollready 調(diào)用netpollunblock伞剑,把pollDesc中相應(yīng)的可讀、寫goroutine取出
// 并將pollDesc.rg/wg轉(zhuǎn)換狀態(tài)為pdReady市埋,然后將取出的goroutine push到鏈表 toRun 中
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// gpp CAS 為 pdReady
if atomic.Casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
// 將之前存入pollDesc的 g結(jié)構(gòu)指針 old 轉(zhuǎn)換為 *g
return (*g)(unsafe.Pointer(old))
}
}
}
我們看到Go 在netpoll()
中獲取觸發(fā)讀寫事件的goroutine列表黎泣,而netpoll()
會(huì)在多處被調(diào)用(runtime/proc.go
):
- startTheWorldWithSema()
StartTheWorld
時(shí)會(huì)調(diào)用netpoll
獲取gList
進(jìn)行調(diào)度 - findrunnable()
該方法會(huì)在Go scheduler的核心方法schedule()
中被調(diào)用,從而調(diào)用netpoll
獲取gList
- pollWork()
后臺(tái)工作循環(huán)(比如idle GC
)檢查時(shí)會(huì)調(diào)用netpoll
獲取gList
- sysmon()
在程序啟動(dòng)時(shí)調(diào)用缤谎,不需要P
抒倚,使用獨(dú)立的M
作為監(jiān)控線程,sysmon
每 20us~10ms運(yùn)行一次坷澡,調(diào)用netpoll
獲取gList
當(dāng)上面這些調(diào)用獲取gList
后托呕,都會(huì)調(diào)用injectglist()
方法(findrunnable()
中會(huì)先pop出一個(gè)g
,將g
狀態(tài)由_Gwaiting
CAS為_Grunnable
,然后再調(diào)用injectglist()
)频敛,injectglist
方法會(huì)將gList
中的g
的狀態(tài)由_Gwaiting
CAS為_Grunnable
项郊,然后放入全局運(yùn)行隊(duì)列(globrunqput(gp)
),從而被重新調(diào)度斟赚,當(dāng)goroutine被重新調(diào)度時(shí)着降,會(huì)從g.sched
中取出PC/SP
信息,繼續(xù)執(zhí)行之前的邏輯拗军。
// Injects the list of runnable G's into the scheduler and clears glist.
// Can run concurrently with GC.
func injectglist(glist *gList) {
if glist.empty() {
return
}
if trace.enabled {
for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() {
traceGoUnpark(gp, 0)
}
}
lock(&sched.lock)
var n int
// 從glist中循環(huán)取出gp
for n = 0; !glist.empty(); n++ {
gp := glist.pop()
// 由 _Gwaiting 變?yōu)?_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)
// 放入全局運(yùn)行隊(duì)列
globrunqput(gp)
}
unlock(&sched.lock)
for ; n != 0 && sched.npidle != 0; n-- {
// 循環(huán)獲取空閑P任洞,并調(diào)度 M 去運(yùn)行 P
startm(nil, false)
}
*glist = gList{}
}
Go基于epoll/kqueue/iocp
和自身的運(yùn)行時(shí)調(diào)度機(jī)制,實(shí)現(xiàn)了自己的I/O多路復(fù)用netpoll
網(wǎng)絡(luò)模型发侵,從上面源碼可以看到交掏,Accept/Read/Write
等方法其底層實(shí)現(xiàn)均采用非阻塞方式,而我們?cè)陂_發(fā)過程中調(diào)用的方式很顯然是同步模式刃鳄,這就大大降低了網(wǎng)絡(luò)開發(fā)難度盅弛,從這也可以看出Go語(yǔ)言設(shè)計(jì)的初衷之一:簡(jiǎn)單而高效。
至此,本文關(guān)于Go net(TCP service)相關(guān)內(nèi)容就介紹完了熊尉,總結(jié)一下涉及的內(nèi)容:
- TCP Server的簡(jiǎn)單使用示例
- 簡(jiǎn)單介紹了一個(gè)可使用(或作為參考)的罐柳、基于Go原生net package實(shí)現(xiàn)的go net項(xiàng)目(github開源項(xiàng)目)
- 主要進(jìn)行了
netpoll
相關(guān)源碼的分析(基于Go1.14)