Intro
最近正在給 mysql
封裝一個(gè)庫(kù),順帶研究一下 go-mysql-driver
這個(gè)庫(kù)的源碼實(shí)現(xiàn)。
Buffer.go
buffer
是一個(gè)用于給 數(shù)據(jù)庫(kù)連接 (net.Conn
) 進(jìn)行緩沖的一個(gè)數(shù)據(jù)結(jié)構(gòu),其結(jié)構(gòu)為:
type buffer struct {
buf []byte // 緩沖池中的數(shù)據(jù)
nc net.Conn // 負(fù)責(zé)緩沖的數(shù)據(jù)庫(kù)連接對(duì)象
idx int // 已讀數(shù)據(jù)索引
length int // 緩沖池中未讀數(shù)據(jù)的長(zhǎng)度
timeout time.Duration // 數(shù)據(jù)庫(kù)連接的超時(shí)設(shè)置
}
可以看到,因?yàn)? 數(shù)據(jù)庫(kù)連接 (net.Conn
) 在通信的時(shí)候是 同步 的。而為了讓其能夠 同時(shí) 讀/寫(xiě) ,所以實(shí)現(xiàn)了 buffer
這個(gè)數(shù)據(jù)結(jié)構(gòu)涡上,通過(guò)該 buffer
進(jìn)行數(shù)據(jù)緩沖還能實(shí)現(xiàn) 零拷貝 ( zero-copy-ish ) 。
其函數(shù)分別有:
newBuffer(nc net.Conn) buffer
:創(chuàng)建并返回一個(gè)buffer
(*buffer) readNext(need int) ([]byte, error)
:讀取并返回未讀數(shù)據(jù)的 need 位寨辩,如果 need 大于buffer
的length
吓懈,就會(huì)調(diào)用fill(need int) error
對(duì)buffer
進(jìn)行 擴(kuò)容 。(*buffer) fill(need int) error
:對(duì)buffer
進(jìn)行(need/defaultBufSize)
的倍數(shù)擴(kuò)容靡狞,并在timeout
時(shí)間結(jié)束前從buffer.nc
中讀取 need 長(zhǎng)度的數(shù)據(jù)耻警。(*buffer) takeBuffer(length int) []byte
:讀取buffer
中length
長(zhǎng)度的數(shù)據(jù)(只包含已讀),如果buffer.length > 0
甸怕,即還有未讀數(shù)據(jù)甘穿,則立即返回nil
。如果需要讀取的長(zhǎng)度大于buffer
的容量梢杭,則會(huì)進(jìn)行擴(kuò)容温兼。(*buffer) takeSmallBuffer(length int) []byte
:讀取保證不超過(guò)defaultBufSize
長(zhǎng)度的數(shù)據(jù)的快捷函數(shù)(只包含已讀),如果buffer.length > 0
武契,即還有未讀數(shù)據(jù)募判,則立即返回nil
荡含。(*buffer) takeCompleteBuffer() []byte
: 讀取全部的buffer
數(shù)據(jù)(只包含已讀),如果buffer.length > 0
届垫,即還有未讀數(shù)據(jù)释液,則立即返回nil
。
Collations.go
collations
包含了 MySQL
所有支持的 字符集 格式装处,并支持通過(guò) COLLATION_NAME
返回其字符集 ID
误债。
如果需要查詢 MySQL
支持的 字符集 格式,可以使用 SELECT COLLATION_NAME, ID FROM information_schema.COLLATIONS
語(yǔ)句獲取妄迁。
Dsn.go
DSN
即 數(shù)據(jù)源名稱 (Data Source Name) 寝蹈,是 驅(qū)動(dòng)程序連接數(shù)據(jù)庫(kù)的變量信息 ,簡(jiǎn)而言之就是根據(jù)你連接的不同數(shù)據(jù)庫(kù)使用對(duì)應(yīng)的連接信息登淘。
通常箫老,數(shù)據(jù)庫(kù)的連接配置就是在這里定義的:
// Config 基本的數(shù)據(jù)庫(kù)連接信息
type Config struct {
User string // Username
Passwd string // Password (requires User)
Net string // Network type
Addr string // Network address (requires Net)
DBName string // Database name
Params map[string]string // Connection parameters
Collation string // Connection collation
Loc *time.Location // Location for time.Time values
TLSConfig string // TLS configuration name
tls *tls.Config // TLS configuration
Timeout time.Duration // Dial timeout
ReadTimeout time.Duration // I/O read timeout
WriteTimeout time.Duration // I/O write timeout
AllowAllFiles bool // 允許文件使用 LOAD DATA LOCAL INFILE 導(dǎo)入數(shù)據(jù)庫(kù)
AllowCleartextPasswords bool // 支持明文密碼客戶端
AllowOldPasswords bool // 允許使用不可靠的舊密碼
ClientFoundRows bool // 返回匹配的行數(shù)而不是受影響的行數(shù)
ColumnsWithAlias bool // 將表名前置在列名
InterpolateParams bool // 將占位符插入查詢的SQL字符串
MultiStatements bool // 允許一條語(yǔ)句多次查詢
ParseTime bool // 格式化時(shí)間值為 time.Time 變量
Strict bool // 將 warnings 返回 errors
}
這都是一些常見(jiàn)的配置項(xiàng),就此略過(guò)黔州。
該文件有兩個(gè)公共函數(shù)支持 Config
與 DSN
之間轉(zhuǎn)換槽惫。
(*Config)FormatDSN() string
ParseDSN(dsn string) (*Config, error)
Errors.go
errors
定義了 Logger
、MySQLError
辩撑、 MySQLWarning
等數(shù)據(jù)結(jié)構(gòu)。
Logger
復(fù)用了 Go
原生的 log
包仿耽,并將其中的輸出重定向至控制臺(tái)的 標(biāo)準(zhǔn)錯(cuò)誤 合冀。
type Logger interface {
Print(v ...interface{})
}
var errLog = Logger(log.New(os.Stderr, "[mysql]", log.Ldate|log.Ltime|log.Lshortfile))
func SetLogger(logger Logger) error { // 當(dāng)然,你也可以使用自定義的錯(cuò)誤 Logger
if logger == nil {
return errors.New("logger is nil")
}
errLog =logger
return nil
}
MySQLError
而 MySQLError
則簡(jiǎn)單定義了 MySQL
輸出的錯(cuò)誤的結(jié)構(gòu)项贺。
type MySQLError struct {
Number uint16
Message string
}
MySQLWarning
MySQLWarning
則有些不一樣君躺,它需要從 MySQL
中進(jìn)行一次 查詢 ,以獲取所有的警告信息开缎,所以該包也定義了 MySQLWarning
的 slice
結(jié)構(gòu)棕叫。
type MySQLWarning struct {
Level string
Code string
Message string
}
type MySQLWarnings []MySQLWarning
func (mc *mysqlConn) getWarnings() (err error) {
rows, err := mc.Query("SHOW WARNINGS", nil)
// handle err
// initzation MySQLWarnings
for {
err = rows.Next(values)
switch err {
case nil:
warning := MySQLWarning{}
if raw, ok := values[0].([]byte); ok {
warning.Level = string(raw)
}else {
warning.Level = fmt.Sprintf("%s", values[0])
}
if raw, ok := values[1].([]byte); ok {
warning.Code = string(raw)
} else {
warning.Code = fmt.Sprintf("%s", values[1])
}
if raw, ok := values[2].([]byte); ok {
warning.Message = string(raw)
} else {
warning.Message = fmt.Sprintf("%s", values[0])
}
warnings = append(warnings, warning)
}
case io.EOF:
return warnings
default:
rows.Close() // 值得注意的是,如果該函數(shù)沒(méi)有 case 運(yùn)行 default 奕删,該 rows 就不會(huì)被默認(rèn)關(guān)閉俺泣,就會(huì)占用連接池中的一個(gè)連接,是否應(yīng)該使用 `defer rows.Close() ` 避免該情況完残?
return
}
}
Infile.go
前面也有提到 MySQL
在導(dǎo)入大型文件的時(shí)候伏钠,需要使用 LOAD DATA LOCAL INFILE
的形式進(jìn)行導(dǎo)入,而該 infile.go
就是實(shí)現(xiàn)該協(xié)議的代碼谨设。
本包在實(shí)現(xiàn)的 LOAD DATA
的時(shí)候提供了兩種方式進(jìn)行導(dǎo)入:
最常見(jiàn)的熟掂,使用服務(wù)器的文件路徑,如
/data/students.csv
扎拣,下文命名其為 文件路徑注冊(cè)器最通用的赴肚,使用實(shí)現(xiàn)了
io.Reader
接口的數(shù)據(jù)結(jié)構(gòu)素跺,通過(guò)返回該數(shù)據(jù)結(jié)構(gòu)的數(shù)據(jù)進(jìn)行導(dǎo)入,如bytes
os.file
等誉券,下文命名其為 Reader 接口注冊(cè)器
在實(shí)現(xiàn)該功能的時(shí)候指厌,注冊(cè)器 的實(shí)現(xiàn)是用名字作為 Key 的 Map
,為了避免 Map
的 讀寫(xiě)競(jìng)態(tài) 横朋,需要對(duì)其配置一個(gè)讀寫(xiě)鎖仑乌。
var (
fileRegister map[string]bool // 文件路徑注冊(cè)器
fileRegisterLock sync.RWMutex // 文件路徑注冊(cè)器讀寫(xiě)鎖
readerRegister map[string]func() io.Reader // Reader 接口注冊(cè)器
readerRegisterLock sync.RWMutex // Reader 接口注冊(cè)器讀寫(xiě)鎖
)
除了對(duì)兩個(gè)注冊(cè)器的 注冊(cè) 以及 注銷 函數(shù),還有一個(gè)需要分析的一個(gè)函數(shù):
(mc *mysqlConn) handleInFileRequest(name string) (err error)
通過(guò)傳入 文件路徑 或者 Reader 名稱 就可以將數(shù)據(jù)發(fā)往 MySQL
了琴锭。
func (mc *mysqlConn) handleInFileRequest(name string) (err error) {
packSize := 16 * 1024 // 16KB is small enough for disk readahead and large enough for TCP
if mc.maxWriteSize < packSize { // 設(shè)置發(fā)往 MySQL 的數(shù)據(jù)塊大小
packSize = mc.maxWriteSize
}
// 獲取 文件 或 Reader 的數(shù)據(jù)晰甚,并將其賦值到 rdr 中
// var rdr io.Reader
// send context packets
if err != nil {
data := make([]byte, 4+packetSize) // 需要留 4 個(gè) byte 給協(xié)議使用
var n int
for err == nil {
n, err = rdr.Read(data[4:]) // 將數(shù)據(jù)存入 data 的 [4:] 中
if n > 0 {
if ioErr := mc.writePacket(data[:4+n]); ioErr != nil { // 將 data 數(shù)據(jù)發(fā)往 MySQL
return ioErr
}
}
}
if err == io.EOF { // rdr 中的數(shù)據(jù)讀完了
err = nil
}
}
// send empty packet (termination)
if data == nil {
data = make([]byte, 4)
}
if ioErr := mc.writePacket(data[:4]); ioErr != nil { // 告訴 MySQL 文件發(fā)送完畢
return ioErr
}
// read OK packet
if err == nil { // 一切正常結(jié)束
return mc.readResultOK()
}
mc.readPacket() // 如果中途出錯(cuò),將錯(cuò)誤信息讀取到 mysqlConn 中决帖,并返回該錯(cuò)誤
return err
}
到此厕九,infile.go
的實(shí)現(xiàn)已經(jīng)整理完畢了,可以看到地回, 作者 在實(shí)現(xiàn)這個(gè)功能的時(shí)候還是做了一些優(yōu)化的扁远,比如 map Lazy init
,send packet size limited
等刻像。而我們通過(guò)分析規(guī)范的源碼包畅买,能夠提升自己的編碼水平。
Packets.go
接下來(lái)就要深入到 MySQL
的通信協(xié)議中了细睡,官方的 通信協(xié)議文檔 非常齊全谷羞,我在這里只將一些基礎(chǔ)的,我后面分析源碼會(huì)用到的協(xié)議分析下溜徙,如果有興趣湃缎,可以到官方文檔處進(jìn)行查閱。
Protocol Basics
基礎(chǔ)數(shù)據(jù)類型
MySQL
通信的基本數(shù)據(jù)類型有兩種蠢壹, Integer
嗓违、 String
Integer : 分別有 1, 2图贸, 3蹂季, 4, 8 個(gè)字節(jié)長(zhǎng)度的類型求妹,使用小端傳輸乏盐。
String : 分別有 固定長(zhǎng)度字符串(協(xié)議規(guī)定),NULL結(jié)尾字符串(長(zhǎng)度不固定)制恍,長(zhǎng)度編碼字符串(長(zhǎng)度不固定)父能。
報(bào)文協(xié)議
報(bào)文分為 消息頭 以及 消息體,而 消息頭 由 3 字節(jié)的 消息長(zhǎng)度 以及 1 字節(jié)的 序號(hào) sequence
(新客戶端由 0
開(kāi)始)組成净神,消息體 則由 消息長(zhǎng)度 的字節(jié)組成何吝。
3 字節(jié)的 消息長(zhǎng)度 最大值為
0xFFFFFF
溉委,即為16 MB - 1 byte
,這就意味著爱榕,如果整個(gè)消息(不包括消息頭)的長(zhǎng)度大于16MB - 1byte - 4byte
大小時(shí)瓣喊,消息就會(huì)被分包。1 字節(jié)的 序號(hào) 在每次新的客戶端發(fā)起請(qǐng)求時(shí)黔酥,以
0
開(kāi)始藻三,依次遞增 1 ,如果消息需要分包跪者, 序號(hào) 會(huì)隨著分包的數(shù)量遞增棵帽。而在一次應(yīng)答中, 客戶端會(huì)校驗(yàn)服務(wù)器 返回序號(hào) 是否與 發(fā)送序號(hào) 一致渣玲,如果不一致逗概,則返回錯(cuò)誤異常。
協(xié)議類型
handshake
: 發(fā)起連接auth
: 登錄權(quán)限校驗(yàn)ok | error
: 返回結(jié)果狀態(tài)*
ok
: 首字節(jié)為 0 (0x00
)error
: 首字節(jié)為 255 (0xff
)resultset
: 結(jié)果集header
field
eof
row
command package
: 命令
在整個(gè) MySQL
發(fā)起交互的過(guò)程如下圖所示:
在了解這些 MySQL
基礎(chǔ)協(xié)議知識(shí)后忘衍,我們?cè)賮?lái)看 packages.go
的源碼就輕松多了逾苫。
源碼
先來(lái)看看 readPacket
,結(jié)合上面的知識(shí)點(diǎn)應(yīng)該非常好理解枚钓。
func (mc *mysqlConn) readPacket() ([]byte, error) {
var payload []byte
for { // for 循環(huán)是為了讀取有可能分片的數(shù)據(jù)
// Read package header
data, err := mc.buf.readNext(4) // 從 buffer 緩沖器中讀取 4 字節(jié)的 header
if err != nil { // 如果讀取發(fā)生異常铅搓,則關(guān)閉連接,并返回一個(gè)錯(cuò)誤連接的異常
errLog.Print(err)
mc.Close()
return nil, driver.ErrBadConn
}
// Packet Length [24 bit]
pktLen := int(uint32(data[0]) | uint32(data[1])<<8 | uint32(data[2])<<16) // 讀取 3 字節(jié)的消息長(zhǎng)度
if pktLen < 1 {
// 如上所示搀捷,關(guān)閉連接狸吞,并返回一個(gè)錯(cuò)誤連接的異常
}
// Check Packet Sync [8 bit]
if data[3] != mc.sequence { // 判斷服務(wù)端返回的序號(hào)是否與客戶端一致
if data[3] > mc.sequence {
return nil, ErrPktSyncMul // 如果服務(wù)端返回序號(hào)大于客戶端的序號(hào),則有可能是在一次請(qǐng)求中做了多次操作
}
return nil, ErrPktSync // 返回序號(hào)不一致錯(cuò)誤
}
mc.sequence++ // 本次序號(hào)匹配相符指煎,為了匹配下一次請(qǐng)求,先將序號(hào)自增1
data, err := mc.buf.readNext(pktLen) // 讀取 消息長(zhǎng)度 的數(shù)據(jù)
if err != nil {
// 如上所示便斥,關(guān)閉連接至壤,并返回一個(gè)錯(cuò)誤連接的異常
}
isLastPacket := (pktLen < maxPacketSize) // 如果是最后一個(gè)數(shù)據(jù)包,必然小于 maxPacketSize (16MB - 1byte)
// Zero allocations for non-splitting packets
if isLastPacket && payload == nil { // 無(wú)分包情況枢纠,立即返回
return data, nil
}
payload = append(payload, data...)
if isLastPacket { // 如果是最后一個(gè)包像街,讀取完畢后返回
return payload, nil
}
// 還有未讀數(shù)據(jù),開(kāi)始下一次循環(huán)
}
}
下面來(lái)看下結(jié)合 握手報(bào)文協(xié)議 來(lái)看下客戶端向服務(wù)端發(fā)起請(qǐng)求的 readInitPacket
:
func (mc *mysqlConn) readInitPacket() ([]byte, error) {
data, err := mc.readPacket() // 調(diào)用上面的函數(shù)讀取服務(wù)端返回的數(shù)據(jù)
if err != nil {
return nil, err
}
if data[0] == iERR { // iERR = 0xff 消息體的第一個(gè)字節(jié)返回 0xff 晋渺,則意味著 error package
return nil, mc.handleErrorPacket(data)
}
// protocol version [1 byte]
if data[0] < minProtocolVersion { // 判斷是否是兼容的協(xié)議版本
return nil, fmt.Errorf(
"unsupported protocol version %d. Version %d or higher is required",
data[0],
minProtocolVersion,
)
}
// server version [null terminated string]
// connection id [4 bytes]
pos := 1 + bytes.IndexByte(data[1:], 0x00) + 1 + 4 // 讀取 NULL (0x00)為結(jié)尾的字符串镰绎,跳過(guò)服務(wù)器線程 ID
// first part of the password cipher [8 bytes]
cipher := data[pos : pos+8] // 獲取挑戰(zhàn)隨機(jī)數(shù)
// (filler) always 0x00 [1 byte]
pos += 8 + 1
// capability flags (lower 2 bytes) [2 bytes]
mc.flags = clientFlag(binary.LittleEndian.Uint16(data[pos : pos+2])) // 獲取服務(wù)器權(quán)能標(biāo)識(shí)
if mc.flags&clientProtocol41 == 0 { // 說(shuō)明 MySQL 服務(wù)器不支持高于 41 版本的協(xié)議
return nil, ErrOldProtocol
}
if mc.flags&clientSSL == 0 && mc.cfg.tls != nil { // 說(shuō)明 MySQL 服務(wù)器需要 SSL 加密,但是客戶端沒(méi)有配置 SSL
return nil, ErrNoTLS
}
pos += 2 // 指針向后兩位
if len(data) > pos {
// 指針跳過(guò)標(biāo)志位
pos += 1 + 2 + 2 + 1 + 10
// second part of the password cipher [mininum 13 bytes],
// where len=MAX(13, length of auth-plugin-data - 8)
//
// The web documentation is ambiguous about the length. However,
// according to mysql-5.7/sql/auth/sql_authentication.cc line 538,
// the 13th byte is "\0 byte, terminating the second part of
// a scramble". So the second part of the password cipher is
// a NULL terminated string that's at least 13 bytes with the
// last byte being NULL.
//
// The official Python library uses the fixed length 12
// which seems to work but technically could have a hidden bug.
cipher = append(cipher, data[pos:pos+12]...)
// TODO: Verify string termination
// EOF if version (>= 5.5.7 and < 5.5.10) or (>= 5.6.0 and < 5.6.2)
// \NUL otherwise
//
//if data[len(data)-1] == 0 {
// return
//}
//return ErrMalformPkt
// make a memory safe copy of the cipher slice
var b [20]byte
copy(b[:], cipher)
return b[:], nil
}
// make a memory safe copy of the cipher slice
var b [8]byte // 返回 8 字節(jié)的挑戰(zhàn)隨機(jī)數(shù)
copy(b[:], cipher)
return b[:], nil
}
除了上面解析的兩個(gè)函數(shù)木西, packages.go
還有 initialisation process
/ result packages
/ prepared statements
等協(xié)議的 寫(xiě)入/讀取 畴栖,有興趣的讀者可以結(jié)合上面的知識(shí)點(diǎn)自行閱讀。
Driver.go
接下來(lái)就要分析一些比較重要的代碼了八千,比如接下來(lái)要講的 driver.go
吗讶,它主要負(fù)責(zé)與 MySQL
數(shù)據(jù)庫(kù)進(jìn)行各種協(xié)議的連接燎猛,并返回該連接≌战裕可以說(shuō)它才是最基礎(chǔ)重绷、最核心的功能。
不過(guò)首先我們需要看下 database/sql
包中的 Driver
接口需要如何實(shí)現(xiàn):
// database/sql/driver/driver.go
// 數(shù)據(jù)庫(kù)驅(qū)動(dòng)
type Driver interface {
Open(name string) (Conn, error)
}
// ...
// 非并發(fā)安全數(shù)據(jù)庫(kù)連接
type Conn interface {
// 返回一個(gè)綁定到 sql 的準(zhǔn)備語(yǔ)句
Prepare(query string) (Stmt, error)
// 關(guān)閉該連接膜毁,并標(biāo)記為不再使用昭卓,停止所有準(zhǔn)備語(yǔ)句和事務(wù)
// 因?yàn)?database/sql 包維護(hù)了一個(gè)空閑的連接池,并且在空閑連接過(guò)多的時(shí)候會(huì)自動(dòng)調(diào)用 Close 瘟滨,所以驅(qū)動(dòng)程序包不需要顯式調(diào)用該函數(shù)
Close() error
// 開(kāi)始并返回一個(gè)新的事務(wù)候醒,而新的事務(wù)與舊的連接沒(méi)有任何關(guān)聯(lián)
Begin() (Tx, error)
}
根據(jù) database/sql
提供的 Driver
接口, go-sql-driver/mysql
實(shí)現(xiàn)了自己的 數(shù)據(jù)庫(kù)驅(qū)動(dòng) 結(jié)構(gòu):
type MySQLDriver struct{}
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
mc := &mysqlConn {
// set max value
}
mc.cfg = ParseDSN(dsn) // 通過(guò)解析 DSN 設(shè)置 MySQL 連接的配置
// set parseTime and strict
// ...
// connect to server
if dial, ok := dials[mc.cfg.Net]; ok { // 根據(jù) 地址 以及 協(xié)議類型室奏,嘗試連接上服務(wù)器
mc.netConn, err = dial(mc.cfg.Addr)
} else { // 連接服務(wù)器失敗火焰,嘗試重連
nd := net.Dialer{Timeout: mc.cfg.Timeout}
mc.netConn, err := nd.Dial(mc.cfg.Net, mc.cfg.Addr)
}
if err != nil { // 重試失敗,返回異常
return nil, err
}
// Enable TCP Keepalives on TCP connections
if tc, ok := mc.netConn.(*net.Conn); ok { // tcp 連接類型轉(zhuǎn)換
if err := tc.SetKeepAlive(true); err != nil {
// Don't send COM_QUIT before handshake.
mc.netConn.Close() // 如果設(shè)置長(zhǎng)連接失敗胧沫,返回異常之前一定要記得將連接斷開(kāi)
mc.netConn = nil
return nil, err
}
}
mc.buff = newBuff(mc.netConn) // 生成一個(gè)帶緩沖的 buffer昌简,如上面 buffer.go 中所說(shuō)
// set I/O timeout
// ...
// Reading Handshake Initialization Packet
cipher, err := mc.readInitPacket() // 發(fā)起數(shù)據(jù)庫(kù)首次握手
if err != nil {
mc.cleanup() // 將當(dāng)前 mysqlConn 對(duì)象銷毀,后面我們會(huì)說(shuō)這個(gè)函數(shù)
return nil, err
}
// Send Client Authentication Packet
if err = mc.writeAuthPacket(cipher); err != nil { // 向數(shù)據(jù)庫(kù)發(fā)送登錄信息校驗(yàn)
mc.cleanup()
return nil, err
}
}
connection.go
終于要講到這個(gè)包的核心數(shù)據(jù)結(jié)構(gòu) mysqlConn
了绒怨,可以說(shuō)纯赎,驅(qū)動(dòng)的所有功能幾乎都圍繞著這個(gè)數(shù)據(jù)結(jié)構(gòu),我們先來(lái)看看它的結(jié)構(gòu):
type mysqlConn struct {
buf buffer // buffer 緩沖器
netConn net.Conn // 網(wǎng)絡(luò)連接
affectedRows uint64 // sql 執(zhí)行成功影響行數(shù)
insertId uint64 // sql 添加成功最新的主鍵 ID
cfg *Config // dsn 中的 基礎(chǔ)配置
maxPacketAllowed int // 允許的最大報(bào)文的字節(jié)長(zhǎng)度南蹂,最大不能超過(guò) (16MB - 1byte)
maxWriteSize int // 允許最大的寫(xiě)入字節(jié)長(zhǎng)度犬金,最大不能超過(guò) (16MB - 1byte)
writeTimeout time.Duration // 執(zhí)行 sql 的 超時(shí)時(shí)間
flags clientFlag // 客戶端狀態(tài)標(biāo)識(shí)
status statusFlag // 服務(wù)端狀態(tài)標(biāo)識(shí)
sequence uint8 // 序號(hào)
parseTime bool // 是否格式化時(shí)間
strict bool // 是否使用嚴(yán)格模式
}
// driver.go
// 而創(chuàng)建一個(gè) mysqlConn 連接需要通過(guò) driver.go 中的 Open 函數(shù),也說(shuō)明 mysqlConn 實(shí)現(xiàn)了 driver.Conn 接口
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
mc := &mysqlConn{
// ...
}
// ...
return mc, nil
}
當(dāng)一個(gè)新的客戶端連接上服務(wù)器的時(shí)候 (三次握手結(jié)束六剥,客戶端進(jìn)入 established
狀態(tài))晚顷,需要先對(duì) MySQL
服務(wù)器進(jìn)行 會(huì)話的用戶/系統(tǒng)環(huán)境變量 的設(shè)置。
// Handles parameters set in DSN after the connection is established
func (mc *mysqlConn) handleParams() (err error) {
for param, val := range mc.cfg.Params { // Params: map[string]string
switch param {
// Charset
case "charset": // 如果是字符集疗疟,則調(diào)用 SET NAMES 命令
charsets := strings.Split(val, ",")
for i := range charsets {
// ignore errors here - a charset may not exist
err = mc.exec("SET NAMES " + charsets[i])
if err == nil {
break
}
}
if err != nil {
return
}
// System Vars
default: // 執(zhí)行系統(tǒng)環(huán)境變量設(shè)置
err = mc.exec("SET " + param + "=" + val + "")
if err != nil {
return
}
}
}
}
conntion.go
還負(fù)責(zé) 事務(wù) 该默、預(yù)處理語(yǔ)句 、執(zhí)行/查詢 的管理策彤,但是基本都是往 mysqlConn
中發(fā)送 command package
栓袖,如:
// Begin 開(kāi)啟事務(wù)
func (mc *mysqlConn) Begin() (driver.Tx, error) {
if mc.netConn == nil {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
err := mc.exec("START TRANSACTION")
if err == nil {
return &mysqlTx{mc}, err // 返回成功開(kāi)啟的事務(wù),重用之前的連接
}
return nil, err
}
// Internal function to execute commands
func (mc *mysqlConn) exec(query string) error {
// Send command
err := mc.writeCommandPacketStr(comQurey, query)
if err != nil {
return err
}
// Read Result
resLen, err := mc.readResultSetHeaderPacket() // 根據(jù) data[0] 的值判斷是否出錯(cuò)店诗,如果沒(méi)有錯(cuò)誤裹刮,則返回消息體的長(zhǎng)度
if err == nil && resLen > 0 { // 存在有效消息體
if err = mc.readUntilEOF(); err != nil { // 讀取 columns
return err
}
err = mc.readUntilEOF() // 讀取 rows
}
return err
}
我想 conntion.go
中最重要的一個(gè)函數(shù)應(yīng)該是 cleanup
,它負(fù)責(zé)將 連接關(guān)閉 庞瘸、 重置環(huán)境變量 等功能捧弃,但是該函數(shù)不能隨意調(diào)用,它只有在 登錄權(quán)限校驗(yàn)異常 時(shí)候才應(yīng)該被調(diào)用擦囊,否則服務(wù)器在不知道客戶端 被強(qiáng)行關(guān)閉 的情況下塔橡,依然會(huì)向該客戶端發(fā)送消息梅割,導(dǎo)致嚴(yán)重異常:
// Closes the network connection and unsets internal variables. Do not call this
// function after successfully authentication, call Close instead. This function
// is called before auth or on auth failure because MySQL will have already
// closed the network connection.
func (mc *mysqlConn) cleanup() {
// Makes cleanup idempotent 保證函數(shù)的冪等性
if mc.netConn != nil {
if err := mc.netConn.Close(); err != nil { // Close 會(huì)嘗試發(fā)送 comQuit command 到服務(wù)器
errLog.Print(err)
}
mc.netConn = nil // 不管 Close 是否成功,必須將 netConn 清空
}
mc.cfg = nil
mc.buf.nc = nil // 緩沖器中的 netConn 也要關(guān)閉
}
Result.go
每當(dāng) MySQL
返回一個(gè) OK
的 狀態(tài)報(bào)文 葛家,該報(bào)文協(xié)議會(huì)攜帶上本次執(zhí)行的結(jié)果 affectedRows
以及 insertId
户辞,而 result.go
就包含著一個(gè)數(shù)據(jù)結(jié)構(gòu)低飒,用于存儲(chǔ)本次的執(zhí)行結(jié)果构哺。
type mysqlResult struct {
affectedRows int64
insertId int64
}
// 兩個(gè) getter
func (res *mysqlResult) LastInsertId() (int64, error) {
return res.insertId, nil
}
func (res *mysqlResult) RowsAffected() (int64, error) {
return res.affectedRows, nil
}
接下來(lái)我們看下在 conntion.go
中是怎么生成 mysqlResult
對(duì)象的:
// connect.go
func (mc *mysqlConn) Exec(query string, args []driver.Value) (driver.Result, error) {
// ...
err := exec(query)
if err == nil {
return &mysqlResult{ // 返回執(zhí)行的結(jié)果
affectedRows: int64(mc.affectedRows),
insertId: int64(mc.insertId),
}, err
}
return nil, err
}
// exec 函數(shù)的解析可以返回上面 package.go 中瀏覽
// package.go
func (mc *mysqlConn) readResultSetHeaderPacket() (int, error) {
data, err := mc.readPacket()
if err == nil {
switch data[0] {
case iOK:
return 0, mc.handleOkPacket(data) // 處理 OK 狀態(tài)報(bào)文
// ...
}
func (mc *mysqlConn) handleOkPacket(data []byte) error {
var n, m int
// 0x00 [1 byte]
// Affected rows [Length Coded Binary]
mc.affectedRows, _, n = readLengthEncodedInteger(data[1:])
// Insert id [Length Coded Binary]
mc.insertId, _, m = readLengthEncodedInteger(data[1+n:])
// ...
}
Row.go
當(dāng) MySQL
執(zhí)行 插入、更新社裆、刪除 等操作后弹砚,都會(huì)返回 Result
双仍,但是 查詢 返回的是 Rows
,我們先來(lái)看看 go-mysql-driver
驅(qū)動(dòng)所實(shí)現(xiàn)的 接口 Rows
的接口描述:
// database/sql/driver/driver.go
// Rows 是執(zhí)行查詢返回的結(jié)果的 游標(biāo)
type Rows interface {
// Columns 返回列的名稱桌吃,從 slice 的長(zhǎng)度可以判斷列的長(zhǎng)度
// 如果一個(gè)列的名稱未知朱沃,則為該列返回一個(gè)空字符串
Columns() []string
// Close 關(guān)閉游標(biāo)
Close() error
// Next 將下一行數(shù)據(jù)填充到 desc 切片中
// 如果讀取的是最后一行數(shù)據(jù),應(yīng)該返回一個(gè) io.EOF 錯(cuò)誤
Next(desc []Value) error
}
type Value interface{} // Value is a value that drivers must be able to handle.
為什么我要說(shuō)這是 go-mysql-driver
驅(qū)動(dòng)所實(shí)現(xiàn)的 接口 Rows
呢茅诱?眼尖的同學(xué)應(yīng)該已經(jīng)看到了逗物, Next
函數(shù)好像和我們平常見(jiàn)到的不一樣啊I蟆翎卓!
是的,因?yàn)槲覀兤匠J褂玫模?/p>
rows.Next()
rows.Scan(dest ...interface{}) error
等函數(shù)的對(duì)象 rows
并不是上面的 接口描述 Rows
摆寄,而是另一個(gè)封裝的 同名數(shù)據(jù)結(jié)構(gòu) Rows
失暴,它就在 database/sql
包中 :
// database/sql.go
type Rows struct {
dc *driverConn
releaseConn func(error)
rowsi driver.Rows // 接口描述的 Rows 藏在這!N⒓ⅰ逗扒!
// 忽略其他字段,因?yàn)槲覀儾环治鲞@個(gè)包...
// lastcols is only used in Scan, Next, and NextResultSet which are expected
// not not be called concurrently.
lastcols []driver.Value
}
我們跳過(guò) database/sql
包中的 Rows
實(shí)現(xiàn)欠橘,其無(wú)非是提供了更多功能的一個(gè)結(jié)果集而已缴阎,讓我們回到真正與數(shù)據(jù)庫(kù)進(jìn)行交互的 Rows
中進(jìn)行源碼分析。
在 go-sql-driver
實(shí)現(xiàn)的 mysqlRows
數(shù)據(jù)結(jié)構(gòu)只實(shí)現(xiàn)了 Columns()
和 Close()
兩個(gè)行數(shù)简软,剩下的 Next(desc []driver.Value)
實(shí)現(xiàn)則交給了 MySQL
的兩種結(jié)果集協(xié)議:
// rows.go
type mysqlField struct {
tableName string
name string
flags fieldFlag
fieldType byte
decimals byte
}
type mysqlRows struct {
mc *mysqlConn
columns []mysqlField
}
type binaryRows struct { // 二進(jìn)制結(jié)果集協(xié)議
mysqlRows // 對(duì)于 Go 的 組合特性 應(yīng)該不會(huì)陌生吧?
}
type textRows struct { // 文本結(jié)果集協(xié)議
mysqlRows
}
func (rows *mysqlRows) Columns() []string {
columns := make([]string, len(rows.columns))
// 將列名賦值到 columns 述暂,如果有設(shè)置別名則賦值別名...
return columns
}
func (rows *mysqlRows) Close() error {
// 將連接里面的未讀數(shù)據(jù)讀完痹升,然后將連接置空
}
// 接下來(lái)的 Next 函數(shù)實(shí)現(xiàn)就交由 binaryRows 和 textRows 了
func (rows *binaryRows) Next(desc []driver.Value) error {
if mc := rows.mc; mc != nil {
if mc.netConn == nil {
return ErrInvalidConn
}
return rows.readRow(dest) // 讀二進(jìn)制協(xié)議結(jié)果集
}
return io.EOF
}
func (rows *testRows) Next(desc []driver.Value) error {
if mc := rows.mc; mc != nil {
if mc.netConn == nil {
return ErrInvalidConn
}
return rows.readRow(dest) // 讀取文本協(xié)議
}
return io.EOF
}
可以說(shuō),實(shí)現(xiàn)了 driver.Rows
接口的只有 binaryRows
和 testRows
畦韭,而他們里面的 readRow(desc)
實(shí)現(xiàn)由于都是和協(xié)議強(qiáng)相關(guān)的代碼疼蛾,就不再解析了。
我們跟著源碼可以看到艺配,使用 textRows
的場(chǎng)景在 getSystemVar
以及 Query
中察郁,而使用 binaryRows
的場(chǎng)景在 statement
中衍慎,就是我們下一步需要解析的部分。
Statement.go
Prepared Statement
皮钠,即預(yù)處理語(yǔ)句稳捆,他有什么優(yōu)勢(shì)呢,為什么 MySQL
要加入它麦轰?
執(zhí)行性能更高:
MySQL
會(huì)對(duì)Prepared Statement
語(yǔ)句預(yù)先進(jìn)行編譯成模板乔夯,并將 占位符 替換 參數(shù) 的位置,這樣如果頻繁執(zhí)行一條參數(shù)只有少量替換的語(yǔ)句時(shí)候款侵,性能會(huì)得到大量提高末荐。可能有同學(xué)會(huì)有疑問(wèn)新锈,為什么MySQL
語(yǔ)句還需要編譯甲脏?那么可以來(lái)參考下這篇 MySQL Prepare 原理 。傳輸協(xié)議更優(yōu):
Prepare Statement
在傳輸時(shí)候使用的是Binary Protocol
妹笆,比使用Text Protocol
的查詢具有 傳輸數(shù)據(jù)量更小 块请、 無(wú)需轉(zhuǎn)換數(shù)據(jù)格式 等優(yōu)勢(shì),緩解了 CPU 和 網(wǎng)絡(luò) 的開(kāi)銷晾浴。安全性更好:由 MySQL Prepare 原理 我們可以知道负乡,
Perpare
編譯之后會(huì)生成 語(yǔ)法樹(shù),在執(zhí)行的時(shí)候才會(huì)將參數(shù)傳進(jìn)來(lái)脊凰,這樣就避免了平常直接執(zhí)行SQL 語(yǔ)句
會(huì)發(fā)生的SQL 注入
問(wèn)題抖棘。
好了,先來(lái)看下 mysqlStmt
的數(shù)據(jù)結(jié)構(gòu):
type mysqlStmt struct {
mc *mysqlConn
id uint32
paramCount int
columns []mysqlField // cached from the first query (既然SQL已經(jīng)預(yù)編譯好了狸涌,返回的結(jié)果集列名已經(jīng)是確定的切省,所以在收到 PREPARE_OK 之后解析數(shù)據(jù)后會(huì)緩存下來(lái))
}
我們發(fā)現(xiàn),它比 mysqlRows
多了兩個(gè)成員變量:
id
:MySQL
預(yù)處理語(yǔ)句之后帕胆,會(huì)給該語(yǔ)句分配一個(gè)id
并返回客戶端朝捆,用于:客戶端提交該
id
給服務(wù)器調(diào)用對(duì)應(yīng)的預(yù)處理語(yǔ)句。paramCount
:參數(shù)數(shù)量懒豹,等于 占位符 的個(gè)數(shù)芙盘,用于:判斷傳入的參數(shù)個(gè)數(shù)是否與預(yù)編譯語(yǔ)句中的占位符個(gè)數(shù)一致。
判斷返回的
PREPARE_OK
響應(yīng)報(bào)文是否帶有 參數(shù)列名 數(shù)據(jù)脸秽。
下面來(lái)看看如何創(chuàng)建并使用一個(gè) Prepare Statement
:
func (mc *mysqlConn) Prepare(query string) (driver.Stmt, error) { // 傳入需要預(yù)編譯的 SQL 語(yǔ)句
// 檢查連接是否可用...
err = mc.writeCommandPacketStr(comStmtPrepare, query) // 將 SQL 發(fā)往數(shù)據(jù)庫(kù)進(jìn)行預(yù)編譯
if err != nil {
return nil, err
}
stmt := &mysqlStmt{ // 預(yù)編譯成功儒老,先創(chuàng)建 stmt 對(duì)象
mc: mc,
}
// Read Result
columnCount, err := stmt.readPrepareResultPacket() // 從 stmt 的連接讀取返回 響應(yīng)報(bào)文
if err == nil {
if stmt.paramCount > 0 { // 如果預(yù)編譯的 SQL 的有參數(shù)
if err = mc.readUntilEOF(); err != nil { // 讀取參數(shù)列名數(shù)據(jù)
return nil, err
}
}
if columnCount > 0 { // 返回執(zhí)行結(jié)果的列表個(gè)數(shù)
err = mc.readUntilEOF() // 讀取執(zhí)行結(jié)果的列名數(shù)據(jù)
}
}
return stmt, err
}
因?yàn)槭且呀?jīng)預(yù)編譯好的語(yǔ)句,所以在執(zhí)行的時(shí)候只需要將參數(shù)傳進(jìn)去就可以了记餐。
func (stmt *mysqlStmt) Exec(args []driver.Value) (driver.Result, error) {
// 檢查連接是否可用...
err := stmt.writeExecutePacket(args)
if err != nil {
return nil, err
}
// 讀取結(jié)果集的行驮樊、列數(shù)據(jù)
}
func(stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error {
if len(args) != stmt.paramCount { // 判斷傳進(jìn)來(lái)的參數(shù)和預(yù)編譯好的SQL參數(shù) 個(gè)數(shù)是否一致
return fmt.Errorf(
"argument count mismatch (got: %d; has: %d)",
len(args),
stmt.paramCount,
)
}
// 讀取緩沖器中的數(shù)據(jù),如果為空,則返回異常...
// command [1 byte]
data[4] = comStmtExecute
// statement_id [4 bytes] 將預(yù)編譯語(yǔ)句的 id 轉(zhuǎn)換為 4字節(jié)的二進(jìn)制數(shù)據(jù)
data[5] = byte(stmt.id)
data[6] = byte(stmt.id >> 8)
data[7] = byte(stmt.id >> 16)
data[8] = byte(stmt.id >> 24)
// flags (0: CURSOR_TYPE_NO_CURSOR) [1 byte]
data[9] = 0x00
// iteration_count (uint32(1)) [4 bytes]
data[10] = 0x01
data[11] = 0x00
data[12] = 0x00
data[13] = 0x00
// 將參數(shù)按照不同的類型轉(zhuǎn)換為 binary protobuf 并 append 到 data 中...
return mc.writePacket(data)
}
相信看到這里囚衔,已經(jīng)能對(duì)看懂源碼的 70% 了挖腰,剩余的代碼都是和協(xié)議相關(guān),就留待有興趣的讀者繼續(xù)研究练湿,這里就不再展開(kāi)講了猴仑。
Transaction.go
事務(wù)是 MySQL
中很重要的一部分,但是驅(qū)動(dòng)的實(shí)現(xiàn)卻很簡(jiǎn)單鞠鲜,因?yàn)橐磺械氖聞?wù)控制都已經(jīng)交由 MySQL
去執(zhí)行了宁脊,驅(qū)動(dòng)所需要做的,只要發(fā)送一個(gè) commit
或者 rollback
的 command packet
即可贤姆。
type mysqlTx struct {
mc *mysqlConn
}
func (tx *mysqlTx) Commit() (err error) {
if tx.mc == nil || tx.mc.netConn == nil {
return ErrInvalidConn
}
err = tx.mc.exec("COMMIT")
tx.mc = nil
return
}
func (tx *mysqlTx) Rollback() (err error) {
if tx.mc == nil || tx.mc.netConn == nil {
return ErrInvalidConn
}
err = tx.mc.exec("ROLLBACK")
tx.mc = nil
return
}
總結(jié)
最后榆苞,其實(shí) buffer
的實(shí)現(xiàn)對(duì)我來(lái)說(shuō)印象是最深刻的,因?yàn)樗亲詈?jiǎn)單而又是最有效的實(shí)現(xiàn)了一個(gè)消息緩沖器霞捡,它實(shí)現(xiàn)的巧妙讓我決定把它放到第一節(jié)坐漏,而其他的幾乎都和 MySQL
的協(xié)議相關(guān),看這些源碼讓我對(duì) MySQL
有了更多的認(rèn)識(shí)碧信。
好了赊琳,本篇字?jǐn)?shù)比較多,也會(huì)有很多不足砰碴,希望大家能夠給本篇博客多提點(diǎn)意見(jiàn)躏筏,讓我可以改進(jìn)的更好。如果還有機(jī)會(huì)呈枉,我會(huì)帶來(lái)其他篇章的源碼解析趁尼,敬請(qǐng)期待 :)