Go 數(shù)據(jù)庫(kù)事務(wù)的源碼分析

假設(shè) mobile_applications 表的字段只有兩個(gè) app_name, other_info菱魔,均為 varchar(256),先上一段簡(jiǎn)單的業(yè)務(wù)邏輯代碼

package main

import (
    "database/sql"
    _ "github.com/go-sql-driver/mysql"
)

func main() {
    db, _ := sql.Open("mysql",
        "root:123456@tcp(127.0.0.1:3306)/xg?charset=utf8&parseTime=true&loc=Local")
// tx1
    tx, _ := db.Begin()
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第一個(gè)應(yīng)用", "第一個(gè)應(yīng)用的信息")
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第二個(gè)應(yīng)用", "第二個(gè)應(yīng)用的信息")
    _ = tx.Commit()

// tx2
    tx, _ = db.Begin()
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第三個(gè)應(yīng)用", "第三個(gè)應(yīng)用的信息")
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第四個(gè)應(yīng)用", "第四個(gè)應(yīng)用的信息")
    //_ = tx.Commit()

// tx3
    tx, _ = db.Begin()
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第五個(gè)應(yīng)用", "第五個(gè)應(yīng)用的信息")
    _, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
        "第六個(gè)應(yīng)用", "第六個(gè)應(yīng)用的信息")
    _ = tx.Rollback()
}

執(zhí)行結(jié)果:

mysql> select * from mobile_applications;
+-----------------+--------------------------+------------------+---------------+
| app_name        | other_info               | xxx_unrecognized | xxx_sizecache |
+-----------------+--------------------------+------------------+---------------+
| 第一個(gè)應(yīng)用      | 第一個(gè)應(yīng)用的信息         | NULL             |          NULL |
| 第二個(gè)應(yīng)用      | 第二個(gè)應(yīng)用的信息         | NULL             |          NULL |
+-----------------+--------------------------+------------------+---------------+
2 rows in set (0.00 sec)

顯然,只有 tx1 最終落庫(kù)确买,tx2 和 tx3 都沒(méi)有落庫(kù)诅蝶。這個(gè)符合我們對(duì)事務(wù)的理解,即只有 commit 的操作才會(huì)最終落庫(kù)桑包。

這里我們首先要理解南蓬,對(duì)于計(jì)算機(jī)而言,什么是事務(wù)?數(shù)據(jù)庫(kù)系統(tǒng)概念里對(duì)“事務(wù)”的定義相信很多人耳熟能詳赘方,但是對(duì)于我們運(yùn)行的程序而言烧颖,事務(wù)實(shí)際上分為兩層,第一層是內(nèi)存中的上下文窄陡,第二層是DBMS控制的我們常說(shuō)的“事務(wù)”倒信。底層事務(wù)實(shí)際上是通過(guò)上下文來(lái)定義和操作的,中間隔了層 driver泳梆,屏蔽了具體的細(xì)節(jié)鳖悠。
以 go 中的 事務(wù)為例,是通過(guò)一個(gè) struct 來(lái)定義的优妙。

type Tx struct {
    db *DB

    // closemu prevents the transaction from closing while there
    // is an active query. It is held for read during queries
    // and exclusively during close.
    closemu sync.RWMutex

    // dc is owned exclusively until Commit or Rollback, at which point
    // it's returned with putConn.
    dc  *driverConn
    txi driver.Tx

    // releaseConn is called once the Tx is closed to release
    // any held driverConn back to the pool.
    releaseConn func(error)

    // done transitions from 0 to 1 exactly once, on Commit
    // or Rollback. once done, all operations fail with
    // ErrTxDone.
    // Use atomic operations on value when checking value.
    done int32

    // All Stmts prepared for this transaction. These will be closed after the
    // transaction has been committed or rolled back.
    stmts struct {
        sync.Mutex
        v []*Stmt
    }

    // cancel is called after done transitions from 0 to 1.
    cancel func()

    // ctx lives for the life of the transaction.
    ctx context.Context
}

其中核心是:

  • driverConn
    go 里面的數(shù)據(jù)庫(kù)連接乘综,封裝了 driver 里的數(shù)據(jù)庫(kù)連接。
  • driver.Tx
    定義了 commit 和 rollback 兩個(gè)方法套硼。

一個(gè)事務(wù)卡辰,實(shí)際上就是所有 CRUD 操作都在同一個(gè)數(shù)據(jù)庫(kù)連接里,調(diào)用 Begin 會(huì)通過(guò)該連接邪意,經(jīng)有 driver 執(zhí)行特定 DBMS 的事務(wù)開(kāi)啟指令九妈,比如 mysql 的 driver 就是

func (mc *mysqlConn) begin(readOnly bool) (driver.Tx, error) {
    if mc.closed.IsSet() {
        errLog.Print(ErrInvalidConn)
        return nil, driver.ErrBadConn
    }
    var q string
    if readOnly {
        q = "START TRANSACTION READ ONLY"
    } else {
        q = "START TRANSACTION"
    }
    err := mc.exec(q)
    if err == nil {
        return &mysqlTx{mc}, err
    }
    return nil, mc.markBadConn(err)
}

后續(xù)操作就都在一個(gè)事務(wù)里了。上層可以通過(guò) Begin 方法返回的 Commit/Rollback 方法來(lái)提交或回滾這個(gè)事務(wù)雾鬼。那么萌朱,go 是如何實(shí)現(xiàn)對(duì)數(shù)據(jù)庫(kù)事務(wù)的支持的呢?我們從入口代碼一步一步來(lái)看策菜。

db.Beigin

driver.go 里定義了 ErrBadConn: driver 拋出的錯(cuò)誤晶疼,表示底層連接不可用或已中斷,上層應(yīng)該重新用一個(gè)連接
sql.go 里定義了兩個(gè)常量又憨,

  • cachedOrNewConn:
    如果連接池里有 idle 狀態(tài)的連接翠霍,直接返回;如果連接池里的連接數(shù)已經(jīng)達(dá)到 MaxOpenCons 定義的數(shù)量蠢莺,則阻塞等待寒匙,直到有一個(gè)連接 idel;否則躏将,創(chuàng)建新的連接锄弱,加入到連接池,然后返回耸携。
  • alwaysNewConn:
    強(qiáng)制使用新的連接而不是從接池里里復(fù)用
// Begin() 方法是帶參數(shù)版本的一個(gè)默認(rèn)版本
func (db *DB) Begin() (*Tx, error) {
    return db.BeginTx(context.Background(), nil)
}

// 可以設(shè)置 Contxt 和 事務(wù)配置項(xiàng)
func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
    var tx *Tx
    var err error
// 如果遇到 ErrBadConn 默認(rèn)會(huì)重試 2 次
    for i := 0; i < maxBadConnRetries; i++ {
        tx, err = db.begin(ctx, opts, cachedOrNewConn)
        if err != driver.ErrBadConn {
            break
        }
    }
    if err == driver.ErrBadConn {
        return db.begin(ctx, opts, alwaysNewConn)
    }
    return tx, err
}

func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
    dc, err := db.conn(ctx, strategy)
    if err != nil {
        return nil, err
    }
    return db.beginDC(ctx, dc, dc.releaseConn, opts)
}

外部調(diào)用最終進(jìn)入私有方法 begin 里棵癣,begin 主要完成兩個(gè)操作,一是獲取一個(gè)數(shù)據(jù)庫(kù)連接夺衍,二是創(chuàng)建事務(wù)的上下文狈谊,即開(kāi)啟事務(wù)。下面我們一一來(lái)看。

db.conn

這個(gè)方法很長(zhǎng)河劝,核心業(yè)務(wù)邏輯主要分為以下幾個(gè)部分:

  • 檢查
    首先檢查 db 是否關(guān)閉了壁榕,再檢查是否 context 過(guò)期了,若任一為是都會(huì)直接返回錯(cuò)誤
db.mu.Lock() // 臨界區(qū)的結(jié)束點(diǎn)不同情況不相同
if db.closed {
  db.mu.Unlock()
  return nil, errDBClosed
}
// Check if the context is expired.
select {
default:
  case <-ctx.Done():
  db.mu.Unlock()
  return nil, ctx.Err()
}

lifetime := db.maxLifetime // 后面邏輯使用
  • cachedOrNewConn 且 存在 idle 的連接
// db.freeConn 是一個(gè) slice赎瞎,存儲(chǔ)了 idle 的連接
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
  conn := db.freeConn[0]
// 移走第一個(gè)
  copy(db.freeConn, db.freeConn[1:])
  db.freeConn = db.freeConn[:numFree-1]

  conn.inUse = true
  db.mu.Unlock() // 釋放鎖了
// 判斷連接是否過(guò)期了
  if conn.expired(lifetime) {
    conn.Close()
    return nil, driver.ErrBadConn
  }
  // lastErr 字段的意義沒(méi)看懂牌里,好像在返回一個(gè)連接之前都要檢查這個(gè)錯(cuò)誤有沒(méi)有設(shè)置
  // Lock around reading lastErr to ensure the session resetter finished.
  conn.Lock()
  err := conn.lastErr
  conn.Unlock()
  if err == driver.ErrBadConn {
    conn.Close()
    return nil, driver.ErrBadConn
  }
  return conn, nil
}
  • 沒(méi)有空閑連接了,或者強(qiáng)制使用新連接务甥。
    • 連接池達(dá)到最大了
      連接池達(dá)到最大了牡辽,就必須阻塞,這里用了一個(gè) channel敞临,類(lèi)似于 Java 的條件隊(duì)列
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()

waitStart := time.Now()

// Timeout the connection request with the context.
select {
  // Context 結(jié)束了
  case <-ctx.Done():
    // Remove the connection request and ensure no value has been sent
    // on it after removing.
    db.mu.Lock()
    delete(db.connRequests, reqKey)
    db.mu.Unlock()

    atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
    
    select {
      default:
      // 已經(jīng)發(fā)出去了态辛,且創(chuàng)建成功了,那么這個(gè)連接就需要加入連接池
      // 但是不清楚為什么這里沒(méi)有驗(yàn)證了
      case ret, ok := <-req:
        if ok && ret.conn != nil {
          db.putConn(ret.conn, ret.err, false)
        }
    }
    return nil, ctx.Err()
  // 收到返回
  case ret, ok := <-req:
    atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
    // channel 是被關(guān)閉了的
    if !ok {
      return nil, errDBClosed
    }
    // 超時(shí)了
    if ret.err == nil && ret.conn.expired(lifetime) {
      ret.conn.Close()
      return nil, driver.ErrBadConn
    }
    if ret.conn == nil {
      return nil, ret.err
    }
    // Lock around reading lastErr to ensure the session resetter finished.
    ret.conn.Lock()
    err := ret.conn.lastErr
    ret.conn.Unlock()
    if err == driver.ErrBadConn {
      ret.conn.Close()
      return nil, driver.ErrBadConn
    }
    return ret.conn, ret.err
}
  • 連接池還沒(méi)有達(dá)到最大
    直接新建連接
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
  db.mu.Lock()
  db.numOpen-- // correct for earlier optimism
  db.maybeOpenNewConnections()
  db.mu.Unlock()
  return nil, err
}
db.mu.Lock()
dc := &driverConn{
  db:        db,
  createdAt: nowFunc(),
  ci:        ci,
  inUse:     true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil

putConnDBLocked

前面提到挺尿,當(dāng)連接池已滿(mǎn)且沒(méi)有 idel 連接的時(shí)候奏黑,是通過(guò)注冊(cè)了一個(gè) channel 來(lái)異步接收 free 連接的通知的。維護(hù)所有 channel 的是

connReqeusts map[uint64] chan connRequest

type connRequest struct {
    conn *driverConn
    err  error
}

使用完一個(gè)連接后编矾,需要“歸還”連接給 DB熟史。DB 的邏輯是:
如果存在 connRequest,會(huì)將 dc 直接交給它
否則窄俏,放入到連接池里蹂匹,標(biāo)記為 idel,并啟動(dòng)清理線程裆操,關(guān)閉那些超時(shí)的連接

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    if db.closed {
        return false
    }
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // Remove from pending requests.
        if err == nil {
            dc.inUse = true
        }
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
    return false
}

db.beginDC

前面的操作完成了連接的獲扰辍(創(chuàng)建/釋放),下面就要啟動(dòng)一個(gè)事務(wù)踪区。

// beginDC starts a transaction. The provided dc must be valid and ready to use.
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
    var txi driver.Tx
    withLock(dc, func() { // 就是個(gè)包裝方法,加鎖吊骤,執(zhí)行函數(shù)缎岗,放鎖
        txi, err = ctxDriverBegin(ctx, opts, dc.ci) // 調(diào)用 driver 開(kāi)啟事務(wù)
    })
    if err != nil {
        release(err)
        return nil, err
    }
        // 后面的主要是暴露上下文,注冊(cè)資源清理回調(diào)
    // Schedule the transaction to rollback when the context is cancelled.
    // The cancel function in Tx will be called after done is set to true.
    ctx, cancel := context.WithCancel(ctx)
    tx = &Tx{
        db:          db,
        dc:          dc,
        releaseConn: release,
        txi:         txi,
        cancel:      cancel,
        ctx:         ctx,
    }
    go tx.awaitDone()
    return tx, nil
}

// awaitDone blocks until the context in Tx is canceled and rolls back
// the transaction if it's not already done.
func (tx *Tx) awaitDone() {
    // 如果在事務(wù)提交/回滾前白粉,就結(jié)束阻塞传泊,說(shuō)明 Context結(jié)束了,那就要執(zhí)行資源清理
    <-tx.ctx.Done()

    // 關(guān)必并從連接池里刪除這個(gè)連接鸭巴,來(lái)保證事務(wù)已經(jīng)關(guān)閉眷细、資源得到釋放
    // 對(duì)于已經(jīng)提交/回滾的事務(wù),這個(gè)方法不會(huì)由任何影響
    tx.rollback(true)
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末鹃祖,一起剝皮案震驚了整個(gè)濱河市溪椎,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖校读,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件沼侣,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡歉秫,警方通過(guò)查閱死者的電腦和手機(jī)蛾洛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)雁芙,“玉大人轧膘,你說(shuō)我怎么就攤上這事⊥酶剩” “怎么了谎碍?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)裂明。 經(jīng)常有香客問(wèn)我椿浓,道長(zhǎng),這世上最難降的妖魔是什么闽晦? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任扳碍,我火速辦了婚禮,結(jié)果婚禮上仙蛉,老公的妹妹穿的比我還像新娘笋敞。我一直安慰自己,他們只是感情好荠瘪,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布夯巷。 她就那樣靜靜地躺著,像睡著了一般哀墓。 火紅的嫁衣襯著肌膚如雪趁餐。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,624評(píng)論 1 305
  • 那天篮绰,我揣著相機(jī)與錄音后雷,去河邊找鬼。 笑死吠各,一個(gè)胖子當(dāng)著我的面吹牛臀突,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播贾漏,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼候学,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了纵散?” 一聲冷哼從身側(cè)響起梳码,我...
    開(kāi)封第一講書(shū)人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤隐圾,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后边翁,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體翎承,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年符匾,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了叨咖。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡啊胶,死狀恐怖甸各,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情焰坪,我是刑警寧澤趣倾,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站某饰,受9級(jí)特大地震影響儒恋,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜黔漂,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一诫尽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧炬守,春花似錦牧嫉、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至鳍置,卻和暖如春辽剧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背税产。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工抖仅, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人砖第。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像环凿,于是被迫代替她去往敵國(guó)和親梧兼。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容