假設(shè) mobile_applications 表的字段只有兩個(gè) app_name, other_info菱魔,均為 varchar(256),先上一段簡(jiǎn)單的業(yè)務(wù)邏輯代碼
package main
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
func main() {
db, _ := sql.Open("mysql",
"root:123456@tcp(127.0.0.1:3306)/xg?charset=utf8&parseTime=true&loc=Local")
// tx1
tx, _ := db.Begin()
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第一個(gè)應(yīng)用", "第一個(gè)應(yīng)用的信息")
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第二個(gè)應(yīng)用", "第二個(gè)應(yīng)用的信息")
_ = tx.Commit()
// tx2
tx, _ = db.Begin()
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第三個(gè)應(yīng)用", "第三個(gè)應(yīng)用的信息")
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第四個(gè)應(yīng)用", "第四個(gè)應(yīng)用的信息")
//_ = tx.Commit()
// tx3
tx, _ = db.Begin()
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第五個(gè)應(yīng)用", "第五個(gè)應(yīng)用的信息")
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第六個(gè)應(yīng)用", "第六個(gè)應(yīng)用的信息")
_ = tx.Rollback()
}
執(zhí)行結(jié)果:
mysql> select * from mobile_applications;
+-----------------+--------------------------+------------------+---------------+
| app_name | other_info | xxx_unrecognized | xxx_sizecache |
+-----------------+--------------------------+------------------+---------------+
| 第一個(gè)應(yīng)用 | 第一個(gè)應(yīng)用的信息 | NULL | NULL |
| 第二個(gè)應(yīng)用 | 第二個(gè)應(yīng)用的信息 | NULL | NULL |
+-----------------+--------------------------+------------------+---------------+
2 rows in set (0.00 sec)
顯然,只有 tx1 最終落庫(kù)确买,tx2 和 tx3 都沒(méi)有落庫(kù)诅蝶。這個(gè)符合我們對(duì)事務(wù)的理解,即只有 commit 的操作才會(huì)最終落庫(kù)桑包。
這里我們首先要理解南蓬,對(duì)于計(jì)算機(jī)而言,什么是事務(wù)?數(shù)據(jù)庫(kù)系統(tǒng)概念里對(duì)“事務(wù)”的定義相信很多人耳熟能詳赘方,但是對(duì)于我們運(yùn)行的程序而言烧颖,事務(wù)實(shí)際上分為兩層,第一層是內(nèi)存中的上下文窄陡,第二層是DBMS控制的我們常說(shuō)的“事務(wù)”倒信。底層事務(wù)實(shí)際上是通過(guò)上下文來(lái)定義和操作的,中間隔了層 driver泳梆,屏蔽了具體的細(xì)節(jié)鳖悠。
以 go 中的 事務(wù)為例,是通過(guò)一個(gè) struct 來(lái)定義的优妙。
type Tx struct {
db *DB
// closemu prevents the transaction from closing while there
// is an active query. It is held for read during queries
// and exclusively during close.
closemu sync.RWMutex
// dc is owned exclusively until Commit or Rollback, at which point
// it's returned with putConn.
dc *driverConn
txi driver.Tx
// releaseConn is called once the Tx is closed to release
// any held driverConn back to the pool.
releaseConn func(error)
// done transitions from 0 to 1 exactly once, on Commit
// or Rollback. once done, all operations fail with
// ErrTxDone.
// Use atomic operations on value when checking value.
done int32
// All Stmts prepared for this transaction. These will be closed after the
// transaction has been committed or rolled back.
stmts struct {
sync.Mutex
v []*Stmt
}
// cancel is called after done transitions from 0 to 1.
cancel func()
// ctx lives for the life of the transaction.
ctx context.Context
}
其中核心是:
- driverConn
go 里面的數(shù)據(jù)庫(kù)連接乘综,封裝了 driver 里的數(shù)據(jù)庫(kù)連接。 - driver.Tx
定義了 commit 和 rollback 兩個(gè)方法套硼。
一個(gè)事務(wù)卡辰,實(shí)際上就是所有 CRUD 操作都在同一個(gè)數(shù)據(jù)庫(kù)連接里,調(diào)用 Begin 會(huì)通過(guò)該連接邪意,經(jīng)有 driver 執(zhí)行特定 DBMS 的事務(wù)開(kāi)啟指令九妈,比如 mysql 的 driver 就是
func (mc *mysqlConn) begin(readOnly bool) (driver.Tx, error) {
if mc.closed.IsSet() {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
var q string
if readOnly {
q = "START TRANSACTION READ ONLY"
} else {
q = "START TRANSACTION"
}
err := mc.exec(q)
if err == nil {
return &mysqlTx{mc}, err
}
return nil, mc.markBadConn(err)
}
后續(xù)操作就都在一個(gè)事務(wù)里了。上層可以通過(guò) Begin 方法返回的 Commit/Rollback 方法來(lái)提交或回滾這個(gè)事務(wù)雾鬼。那么萌朱,go 是如何實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)事務(wù)的支持的呢?我們從入口代碼一步一步來(lái)看策菜。
db.Beigin
driver.go 里定義了 ErrBadConn: driver 拋出的錯(cuò)誤晶疼,表示底層連接不可用或已中斷,上層應(yīng)該重新用一個(gè)連接
sql.go 里定義了兩個(gè)常量又憨,
-
cachedOrNewConn:
如果連接池里有 idle 狀態(tài)的連接翠霍,直接返回;如果連接池里的連接數(shù)已經(jīng)達(dá)到 MaxOpenCons 定義的數(shù)量蠢莺,則阻塞等待寒匙,直到有一個(gè)連接 idel;否則躏将,創(chuàng)建新的連接锄弱,加入到連接池,然后返回耸携。 -
alwaysNewConn:
強(qiáng)制使用新的連接而不是從接池里里復(fù)用
// Begin() 方法是帶參數(shù)版本的一個(gè)默認(rèn)版本
func (db *DB) Begin() (*Tx, error) {
return db.BeginTx(context.Background(), nil)
}
// 可以設(shè)置 Contxt 和 事務(wù)配置項(xiàng)
func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
var tx *Tx
var err error
// 如果遇到 ErrBadConn 默認(rèn)會(huì)重試 2 次
for i := 0; i < maxBadConnRetries; i++ {
tx, err = db.begin(ctx, opts, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
return db.begin(ctx, opts, alwaysNewConn)
}
return tx, err
}
func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
return db.beginDC(ctx, dc, dc.releaseConn, opts)
}
外部調(diào)用最終進(jìn)入私有方法 begin 里棵癣,begin 主要完成兩個(gè)操作,一是獲取一個(gè)數(shù)據(jù)庫(kù)連接夺衍,二是創(chuàng)建事務(wù)的上下文狈谊,即開(kāi)啟事務(wù)。下面我們一一來(lái)看。
db.conn
這個(gè)方法很長(zhǎng)河劝,核心業(yè)務(wù)邏輯主要分為以下幾個(gè)部分:
- 檢查
首先檢查 db 是否關(guān)閉了壁榕,再檢查是否 context 過(guò)期了,若任一為是都會(huì)直接返回錯(cuò)誤
db.mu.Lock() // 臨界區(qū)的結(jié)束點(diǎn)不同情況不相同
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// Check if the context is expired.
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime // 后面邏輯使用
- cachedOrNewConn 且 存在 idle 的連接
// db.freeConn 是一個(gè) slice赎瞎,存儲(chǔ)了 idle 的連接
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
// 移走第一個(gè)
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock() // 釋放鎖了
// 判斷連接是否過(guò)期了
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// lastErr 字段的意義沒(méi)看懂牌里,好像在返回一個(gè)連接之前都要檢查這個(gè)錯(cuò)誤有沒(méi)有設(shè)置
// Lock around reading lastErr to ensure the session resetter finished.
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
- 沒(méi)有空閑連接了,或者強(qiáng)制使用新連接务甥。
- 連接池達(dá)到最大了
連接池達(dá)到最大了牡辽,就必須阻塞,這里用了一個(gè) channel敞临,類(lèi)似于 Java 的條件隊(duì)列
- 連接池達(dá)到最大了
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := time.Now()
// Timeout the connection request with the context.
select {
// Context 結(jié)束了
case <-ctx.Done():
// Remove the connection request and ensure no value has been sent
// on it after removing.
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default:
// 已經(jīng)發(fā)出去了态辛,且創(chuàng)建成功了,那么這個(gè)連接就需要加入連接池
// 但是不清楚為什么這里沒(méi)有驗(yàn)證了
case ret, ok := <-req:
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
// 收到返回
case ret, ok := <-req:
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
// channel 是被關(guān)閉了的
if !ok {
return nil, errDBClosed
}
// 超時(shí)了
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
// Lock around reading lastErr to ensure the session resetter finished.
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
- 連接池還沒(méi)有達(dá)到最大
直接新建連接
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
putConnDBLocked
前面提到挺尿,當(dāng)連接池已滿(mǎn)且沒(méi)有 idel 連接的時(shí)候奏黑,是通過(guò)注冊(cè)了一個(gè) channel 來(lái)異步接收 free 連接的通知的。維護(hù)所有 channel 的是
connReqeusts map[uint64] chan connRequest
type connRequest struct {
conn *driverConn
err error
}
使用完一個(gè)連接后编矾,需要“歸還”連接給 DB熟史。DB 的邏輯是:
如果存在 connRequest,會(huì)將 dc 直接交給它
否則窄俏,放入到連接池里蹂匹,標(biāo)記為 idel,并啟動(dòng)清理線程裆操,關(guān)閉那些超時(shí)的連接
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true
}
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
}
db.beginDC
前面的操作完成了連接的獲扰辍(創(chuàng)建/釋放),下面就要啟動(dòng)一個(gè)事務(wù)踪区。
// beginDC starts a transaction. The provided dc must be valid and ready to use.
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
var txi driver.Tx
withLock(dc, func() { // 就是個(gè)包裝方法,加鎖吊骤,執(zhí)行函數(shù)缎岗,放鎖
txi, err = ctxDriverBegin(ctx, opts, dc.ci) // 調(diào)用 driver 開(kāi)啟事務(wù)
})
if err != nil {
release(err)
return nil, err
}
// 后面的主要是暴露上下文,注冊(cè)資源清理回調(diào)
// Schedule the transaction to rollback when the context is cancelled.
// The cancel function in Tx will be called after done is set to true.
ctx, cancel := context.WithCancel(ctx)
tx = &Tx{
db: db,
dc: dc,
releaseConn: release,
txi: txi,
cancel: cancel,
ctx: ctx,
}
go tx.awaitDone()
return tx, nil
}
// awaitDone blocks until the context in Tx is canceled and rolls back
// the transaction if it's not already done.
func (tx *Tx) awaitDone() {
// 如果在事務(wù)提交/回滾前白粉,就結(jié)束阻塞传泊,說(shuō)明 Context結(jié)束了,那就要執(zhí)行資源清理
<-tx.ctx.Done()
// 關(guān)必并從連接池里刪除這個(gè)連接鸭巴,來(lái)保證事務(wù)已經(jīng)關(guān)閉眷细、資源得到釋放
// 對(duì)于已經(jīng)提交/回滾的事務(wù),這個(gè)方法不會(huì)由任何影響
tx.rollback(true)
}