何為連接池
連接池是負責分配叹坦、管理和釋放連接,它允許應(yīng)用程序重復使用池中的空閑的連接据块,而不是每次都重新建立一個連接。 本質(zhì)就是管理了一堆長鏈接折剃,提供給需求方相應(yīng)的句柄使用另假。
連接池有何用
減少網(wǎng)絡(luò)io開銷
減少了每次連接三次握手和四次揮手的開銷。連接復用怕犁,自然減少了創(chuàng)建边篮,關(guān)閉套接字等流程。提升系統(tǒng)性能控制資源
如果沒有連接池管理奏甫,如每次請求戈轿,協(xié)程都創(chuàng)建一個連接,那么當請求量巨大時阵子,產(chǎn)生非常大的浪費并且可能會導致高負載下的異常發(fā)生思杯,最終導致所有服務(wù)都不可用。這就是為什么很多存儲都會有一層proxy來管理挠进,不讓業(yè)務(wù)服務(wù)直接和存儲連接色乾。簡化編程
使用者只需關(guān)心如何獲取和返回的方法,無需關(guān)心底層連接领突、避免資源泄漏等問題
redigo是如何實現(xiàn)v1.8.4
首先redigo不支持cluster暖璧,作者也不打算支持,所以建議還是選擇go-redis
package main
import (
"fmt"
red "github.com/gomodule/redigo/redis"
"time"
)
type Redis struct {
pool *red.Pool
}
var redis *Redis
func Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) {
con := redis.pool.Get()
// connct
if err := con.Err(); err != nil {
return nil, err
}
defer con.Close()
parmas := make([]interface{}, 0)
parmas = append(parmas, key)
if len(args) > 0 {
for _, v := range args {
parmas = append(parmas, v)
}
}
return con.Do(cmd, parmas...)
}
func initRedis() {
redis = new(Redis)
redis.pool = &red.Pool{
MaxIdle: 2, //空閑數(shù)
IdleTimeout: 240 * time.Second,
MaxActive: 0, //最大數(shù)
Dial: func() (red.Conn, error) {
c, err := red.Dial("tcp", "127.0.0.1:6379")
if err != nil {
return nil, err
}
return c, err
},
TestOnBorrow: func(c red.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func main() {
initRedis()
Exec("set", "dandy", "hello")
result, err := Exec("get", "dandy")
if err != nil {
fmt.Print(err.Error())
}
str, _ := red.String(result, err)
fmt.Print(str)
redis.pool.Close()
}
初始化redis.pool
type Pool struct {
// Dial conn中Dial調(diào)用初始化
Dial func() (Conn, error)
// 帶有context的Dial,2選1即可
DialContext func(ctx context.Context) (Conn, error)
// 獲取連接池中君旦,校驗連接是否可用澎办,一般和PING、PONG使用
TestOnBorrow func(c Conn, t time.Time) error
// 連接池中最大空閑數(shù)
MaxIdle int
// 連接池中保持活躍的數(shù)于宙,0沒有限制
MaxActive int
// 空閑檢查時間
IdleTimeout time.Duration
// wait設(shè)置為true并且pool中活躍數(shù)到達設(shè)置的最大值,直到連接池中有可用連接浮驳,get()才返回
Wait bool
// 設(shè)置連接最大存活時間 0無限制
MaxConnLifetime time.Duration
// 統(tǒng)計、隊列等使用
mu sync.Mutex // mu protects the following fields
closed bool // set to true when the pool is closed.
active int // the number of open connections in the pool
initOnce sync.Once // the init ch once func
ch chan struct{} // limits open connections when p.Wait is true
idle idleList // idle connections
waitCount int64 // total number of connections waited for.
waitDuration time.Duration // total time waited for new connections.
}
Pool獲取連接
Get獲取
源碼 pool.go
func (p *Pool) Get() Conn
- wait設(shè)置等待
select {
case <-p.ch://當連接池滿時捞魁,會阻塞等待至会,直到有空閑連接
select {
case <-ctx.Done():
p.ch <- struct{}{}
return 0, ctx.Err()
default:
}
case <-ctx.Done():
return 0, ctx.Err()
}
當pool中設(shè)置了Wait,當連接滿時(p.ch獲取不到數(shù)據(jù))谱俭,會等待直到池中有空閑連接奉件,就會通知ch
看activeConn.close()會調(diào)用Pool.put(),此時連接池將會有空閑連接宵蛀,并且通知剛才等待的Wait ch
if p.ch != nil && !p.closed {
// 通知等待ch
p.ch <- struct{}{}
}
- 空閑時間判斷
if p.IdleTimeout > 0 {
n := p.idle.count
// 只需從尾部back判斷即可驗證是否過期
// 如果過期,刪除尾部县貌,釋放該連接术陶,并且繼續(xù)遍歷
for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
pc := p.idle.back
p.idle.popBack()
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
}
我們可以先大致先看內(nèi)部連接池雙向鏈表的管理點擊跳轉(zhuǎn),也許這樣你會很容易的理解煤痕。
這里idletimeout遍歷整個鏈表梧宫,因為idle.back.t為最早插入的時間,所以只需要檢查尾部back即可。
- 從連接池頭部后取空閑連接
for p.idle.front != nil {
pc := p.idle.front
// 取出頭部
p.idle.popFront()
p.mu.Unlock()
// 校驗連接是否正常摆碉,一般我們設(shè)置回調(diào)ping取檢驗塘匣,
// 自然每次都多了一次請求,性能消耗
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
(p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
// 返回可用的連接
return &activeConn{p: p, pc: pc}, nil
}
// 校驗不通過巷帝,自然釋放該連接
pc.c.Close()
p.mu.Lock()
p.active--
}
因為上面的條件如果都校驗成功忌卤,說明鏈表頭部有數(shù)據(jù),我們只需pop出來楞泼,之后返回activeConn驰徊,即我們成功后去了一個連接。注意堕阔,這里activeConn很關(guān)鍵棍厂,里頭將最早初始化創(chuàng)建的p(pool)存入,并且將pc(poolConn)即從鏈表中取出的數(shù)據(jù)存起來超陆⊙埃基本上ac(activeConn)涵蓋了后續(xù)所有可以操作的數(shù)據(jù)。 詳細pc我們可以看下面
- 開始鏈表肯定是空的侥猬,如何獲取連接
p.active++
p.mu.Unlock()
// 這里撥號,即調(diào)用我們一開始注冊的回調(diào)pool中的Dial
// 這里就是創(chuàng)建線程池開始創(chuàng)建連接捐韩,即conn的管理
c, err := p.dial(ctx)
if err != nil {
p.mu.Lock()
p.active--
if p.ch != nil && !p.closed {
p.ch <- struct{}{}
}
p.mu.Unlock()
// 返回錯誤連接
return errorConn{err}, err
}
// pc中c為conn.go中conn的存儲退唠。
return &activeConn{p: p, pc: &poolConn{c: c, created: nowFunc()}}, nil
這里就是一開始,我們調(diào)用dial創(chuàng)建連接荤胁,并返回activConn得到連接瞧预。后續(xù)我們會分析conn.go
pool 中鏈表put的存放
func (p *Pool) put(pc *poolConn, forceClose bool) error {
p.mu.Lock()
if !p.closed && !forceClose {
pc.t = nowFunc()
// 報存隊列
p.idle.pushFront(pc)
// 超過設(shè)置,pop出時間有效時間最小的back連接
if p.idle.count > p.MaxIdle {
pc = p.idle.back
p.idle.popBack()
} else {
pc = nil
}
}
// back該連接不保存仅政,直接關(guān)閉
if pc != nil {
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}
// 上述以說明垢油,配合wait設(shè)置使用
if p.ch != nil && !p.closed {
p.ch <- struct{}{}
}
p.mu.Unlock()
return nil
}
當Pool.get獲取的連接,并沒有保存在連接池中圆丹,而是當activeConn.Close()時滩愁,才調(diào)用put,保存連接辫封。至此硝枉,pool中核心功能都已準備完畢廉丽。
idleList連接池管理
- pushFront鏈表存儲
func (l *idleList) pushFront(pc *poolConn) {
// 這里記住,idleList中front和back始終指向
// 的是連接池中的頭部和尾部
// 1 新的pc尾指針指向鏈表頭部
pc.next = l.front
pc.prev = nil
if l.count == 0 {
// 0.當連接池為空妻味,頭尾都指向改連接
l.back = pc
} else {
// 2. 鏈表頭前驅(qū)指針指向pc
l.front.prev = pc
}
// 3.修改l中front指向為新插入的pc
l.front = pc
l.count++
}
-
popFront刪除鏈表
conn.go
- 連接創(chuàng)建
// 調(diào)用net/dial.go庫進行連接
netConn, err := do.dialContext(ctx, network, address)
if err != nil {
return nil, err
}
c := &conn{
// 暫時我們研究的是返回:TCPConn
conn: netConn,
// bufio寫的也很好正压,后續(xù)對其分析
bw: bufio.NewWriter(netConn),
br: bufio.NewReader(netConn),
readTimeout: do.readTimeout,
writeTimeout: do.writeTimeout,
}
- 之后的activeConn調(diào)用的Do方法就是調(diào)用conn中的Do
if cmd != "" {
// RESP協(xié)議組包
if err := c.writeCommand(cmd, args); err != nil {
return nil, c.fatal(err)
}
}
// bufio用法,里頭Write為interface實際為TCPConn的操作
if err := c.bw.Flush(); err != nil {
return nil, c.fatal(err)
}
var deadline time.Time
if readTimeout != 0 {
deadline = time.Now().Add(readTimeout)
}
// read過期檢測
if err := c.conn.SetReadDeadline(deadline); err != nil {
return nil, c.fatal(err)
}
var err error
var reply interface{}
for i := 0; i <= pending; i++ {
var e error
// 獲取redis服務(wù)回包數(shù)據(jù)
if reply, e = c.readReply(); e != nil {
return nil, c.fatal(e)
}
if e, ok := reply.(Error); ok && err == nil {
err = e
}
}
Do方法調(diào)用的是DoWithTimeout责球,這里發(fā)起RESP協(xié)議組包焦履,并發(fā)送數(shù)據(jù)給redis服務(wù)端,之后讀取redis服務(wù)器返回的數(shù)據(jù)雏逾。
大家如果覺得有啥疑惑或者不正確嘉裤,都可以在評論或者加微信(dandyhzh)一起談?wù)摗?/p>