兩個(gè)事務(wù)同時(shí)操作相同的數(shù)據(jù),后提交的事務(wù)會(huì)覆蓋先提交的事務(wù)處理結(jié)果
使用銀行轉(zhuǎn)賬的經(jīng)典例子來(lái)幫助理解表窘,這里的更新丟失是因?yàn)?code>并發(fā)讀后寫(xiě)造成的
更新丟失模擬
mysql數(shù)據(jù)準(zhǔn)備
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for tb
-- ----------------------------
DROP TABLE IF EXISTS `tb`;
CREATE TABLE `tb` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`yk` int(11) NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of tb
-- ----------------------------
INSERT INTO `tb` VALUES (1, 0);
SET FOREIGN_KEY_CHECKS = 1;
使用python操作mysql
做這種簡(jiǎn)單操作使用python的sqlalchemy比較方便,java的jdbc的話(huà)也行~
- 使用線(xiàn)程A和線(xiàn)程B,分別對(duì)tb表字段yk進(jìn)行疊加
- 線(xiàn)程A疊加100次姜凄,線(xiàn)程B疊加150次,最終結(jié)果應(yīng)該是250次
- 線(xiàn)程A和線(xiàn)程B在這里我也稱(chēng)作為事務(wù)A和事務(wù)B幫助理解
import threading
from threading import Thread
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = create_engine(
'mysql+mysqlconnector://{user}:{password}@{host}:{port}/{database}'.format(user='root', password='yk123',
host='localhost',
port='3306',
database='test'),
max_overflow=int(1), # 超過(guò)連接池大小外最多創(chuàng)建的連接
pool_size=int(5), # 連接池大小,默認(rèn)是5
pool_timeout=int(1), # 池中沒(méi)有線(xiàn)程最多等待的時(shí)間趾访,否則報(bào)錯(cuò)
pool_recycle=int(0) # 多久之后對(duì)線(xiàn)程池中的線(xiàn)程進(jìn)行一次連接的回收(重置)
)
# 修改
def add(count):
for i in list(range(1,count)):
print(threading.currentThread().getName()+"==>"+str(i))
connection = engine.connect(close_with_result=False)
trans = connection.begin()
ex = connection.execute("SELECT yk FROM tb WHERE id = %(id)s;", id=1)
y = ex.first()
yk = y[0]
yk = yk + 1
connection.execute("UPDATE tb SET yk = %(yk)s WHERE id = 1;",yk=yk)
trans.commit()
# 測(cè)試
if __name__ == '__main__':
# 用線(xiàn)程去執(zhí)行函數(shù)
t1 = Thread(target=add,args=(101,)) # 100次
t1.setName("線(xiàn)程A")
t2 = Thread(target=add,args=(151,)) # 150次
t2.setName("線(xiàn)程B")
t1.start()
t2.start()
最后态秧,執(zhí)行結(jié)果僅為150,出現(xiàn)了更新丟失
解析出現(xiàn)這種事務(wù)并發(fā)問(wèn)題的過(guò)程扼鞋,事務(wù)A和事務(wù)B并發(fā)執(zhí)行
時(shí)間 | 事務(wù)A | 事務(wù)B |
---|---|---|
T1 | 開(kāi)始事務(wù) | |
T2 | 開(kāi)始事務(wù) | |
T3 | 查詢(xún)yk值為0 | |
T4 | 查詢(xún)yk值為0 | |
T5 | yk值加1 | |
T6 | yk值加1 | |
T7 | 事務(wù)提交屿聋,yk值為1 | 此時(shí)yk的值已經(jīng)為1 |
T8 | 事務(wù)提交,yk值為1 藏鹊;出現(xiàn)更新丟失润讥,yk的值應(yīng)該為2
|
解決方式
1、提升隔離級(jí)別到到serializable
serializable能讓數(shù)據(jù)庫(kù)的所有事務(wù)都串行化盘寡,排隊(duì)執(zhí)行楚殿;此時(shí)就不存在事務(wù)A和事務(wù)B一起執(zhí)行的情況了;但是這種方法是不允許的竿痰,因?yàn)閟erializabl級(jí)別禁止事務(wù)并發(fā)執(zhí)行脆粥,這在性能上是一個(gè)很大的坑
set session transaction isolation level serializable ;
set global transaction isolation level serializable;
SELECT @@global.tx_isolation, @@tx_isolation;
重新運(yùn)行上面的程序,運(yùn)行結(jié)果
2影涉、使用悲觀鎖---X鎖变隔,加鎖讀
重新將隔離級(jí)別設(shè)為默認(rèn)級(jí)別 RR
set session transaction isolation level repeatable read ;
set global transaction isolation level repeatable read;
SELECT @@global.tx_isolation, @@tx_isolation;
將查詢(xún)yk字段的select加上 for update關(guān)鍵字;添加X(jué)鎖
ex = connection.execute("SELECT yk FROM tb WHERE id = %(id)s
FOR UPDATE
;", id=1)
重新運(yùn)行上面的程序蟹倾,結(jié)果為250匣缘。X鎖為排他鎖,阻塞其它事務(wù)的加鎖讀和insert鲜棠、update肌厨、delete操作。所以B事務(wù)的查詢(xún)操作會(huì)被阻塞豁陆,直到A事務(wù)提交B事務(wù)才會(huì)繼續(xù)執(zhí)行
在這里有個(gè)問(wèn)題: 為什么不使用S鎖柑爸?
使用S鎖在鎖定行后面有更新操作,容易導(dǎo)致死鎖盒音。所以這種情況請(qǐng)使用X鎖表鳍。我的這篇文章有講到這點(diǎn)
http://www.reibang.com/p/beddb45070bb
3、使用樂(lè)觀鎖機(jī)制
思路:在update語(yǔ)句中添加一個(gè)版本號(hào)做where條件祥诽,判斷update語(yǔ)句執(zhí)行的影響行數(shù)譬圣,如果為0則重新執(zhí)行事務(wù)B;循環(huán)下去原押,直到更新成功胁镐!
1、先給tb表添加一個(gè)version版本號(hào)字段诸衔,更改完后表結(jié)構(gòu)和數(shù)據(jù)如下
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for tb
-- ----------------------------
DROP TABLE IF EXISTS `tb`;
CREATE TABLE `tb` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`yk` int(11) NULL DEFAULT NULL,
`version` int(11) NOT NULL COMMENT '版本號(hào)',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of tb
-- ----------------------------
INSERT INTO `tb` VALUES (1, 0, 1);
SET FOREIGN_KEY_CHECKS = 1;
2盯漂、update語(yǔ)句改成這樣:
result = connection.execute(
"UPDATE tb SET yk = %(yk)s ,version = version +1 WHERE id = 1 and version = %(version)s;", yk=yk,
version=version)
使用樂(lè)觀鎖的思想,代碼改成如下所示
import threading
from threading import Thread
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = create_engine(
'mysql+mysqlconnector://{user}:{password}@{host}:{port}/{database}'.format(user='root', password='yk123',
host='localhost',
port='3306',
database='test'),
max_overflow=int(1), # 超過(guò)連接池大小外最多創(chuàng)建的連接
pool_size=int(5), # 連接池大小,默認(rèn)是5
pool_timeout=int(1), # 池中沒(méi)有線(xiàn)程最多等待的時(shí)間笨农,否則報(bào)錯(cuò)
pool_recycle=int(0) # 多久之后對(duì)線(xiàn)程池中的線(xiàn)程進(jìn)行一次連接的回收(重置)
)
# 修改
def add(count):
for i in list(range(1,count)):
print(threading.currentThread().getName()+"==>"+str(i))
dg()
def dg():
connection = engine.connect(close_with_result=False)
trans = connection.begin()
ex = connection.execute("SELECT yk,version FROM tb WHERE id = %(id)s;", id=1)
y = ex.first()
version = y[1]
yk = y[0]
yk = yk + 1
result = connection.execute(
"UPDATE tb SET yk = %(yk)s ,version = version +1 WHERE id = 1 and version = %(version)s;", yk=yk,
version=version)
rowcount = result.rowcount
trans.commit()
# update影響行數(shù)為0就缆,說(shuō)明更新失敗谒亦!自旋竭宰!這里做dg函數(shù)的遞歸調(diào)用
if rowcount == 0:
dg()
# 測(cè)試
if __name__ == '__main__':
# 用線(xiàn)程去執(zhí)行函數(shù)
t1 = Thread(target=add,args=(101,)) # 100次
t1.setName("線(xiàn)程A")
t2 = Thread(target=add,args=(151,)) # 150次
t2.setName("線(xiàn)程B")
t1.start()
t2.start()
執(zhí)行結(jié)果:yk值最終為250
注意:自旋的遞歸調(diào)用一定不能在同一個(gè)事務(wù)里執(zhí)行,因?yàn)閙ysql的默認(rèn)RR級(jí)別下是允許可重復(fù)的份招,也就是說(shuō)多次讀取version字段并不會(huì)發(fā)生改變切揭。故需要一次遞歸就開(kāi)啟一次事務(wù),這樣才能讀到最新的version字段
比較悲觀鎖和樂(lè)觀鎖锁摔,應(yīng)該選擇哪種廓旬?
悲觀鎖在查詢(xún)時(shí)添加X(jué)鎖,查詢(xún)多效率低下
樂(lè)觀鎖在大量的并發(fā)修改下谐腰,很容易造成失敗自旋孕豹,在我上面的例子中體現(xiàn)的是 dg()函數(shù)的遞歸執(zhí)行。極端情況下要失敗很多次才能成功修改
- 查詢(xún)多十气,修改少 使用樂(lè)觀鎖
- 查詢(xún)少励背,修改多 使用悲觀鎖
《阿里巴巴Java開(kāi)發(fā)手冊(cè)》里面有講到關(guān)于悲觀鎖和樂(lè)觀鎖的選擇:
【強(qiáng)制】 并發(fā)修改同一記錄時(shí),避免更新丟失砸西,需要加鎖叶眉。 要么在應(yīng)用層加鎖,要么在緩存加鎖芹枷,要么在數(shù)據(jù)庫(kù)層使用樂(lè)觀鎖竟闪,使用 version 作為更新依據(jù)。
說(shuō)明:如果每次訪問(wèn)沖突概率小于 20%杖狼,推薦使用樂(lè)觀鎖炼蛤,否則使用悲觀鎖。樂(lè)觀鎖的重試次數(shù)不得小于3 次蝶涩。但是如何判斷沖突概率
和重試次數(shù)
呢理朋?這應(yīng)該根據(jù)業(yè)務(wù)環(huán)境進(jìn)行實(shí)驗(yàn)才能得出吧
還有一條
【推薦】 資金相關(guān)的金融敏感信息,使用悲觀鎖策略绿聘。
說(shuō)明:樂(lè)觀鎖在獲得鎖的同時(shí)已經(jīng)完成了更新操作嗽上,校驗(yàn)邏輯容易出現(xiàn)漏洞,另外熄攘,樂(lè)觀鎖對(duì)沖突的解決策略有較復(fù)雜的要求兽愤,處理不當(dāng)容易造成系統(tǒng)壓力或數(shù)據(jù)異常,所以資金相關(guān)的金融敏感信息不建議使用樂(lè)觀鎖更新
。
其實(shí)這種更新丟失問(wèn)題可以交給mysql自己解決
我們不需要將yk字段讀到程序中浅萧,給它加1再賦值回去逐沙。完全可以直接在sql中進(jìn)行加1,如
UPDATE tb SET yk = yk + 1 WHERE id = 1;
這樣的sql具有原子性洼畅,本身的讀和寫(xiě)被mysql底層看做一個(gè)操作吩案。因此不會(huì)出現(xiàn)更新丟失了
import threading
from threading import Thread
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = create_engine(
'mysql+mysqlconnector://{user}:{password}@{host}:{port}/{database}'.format(user='root', password='yk123',
host='localhost',
port='3306',
database='test'),
max_overflow=int(1), # 超過(guò)連接池大小外最多創(chuàng)建的連接
pool_size=int(5), # 連接池大小,默認(rèn)是5
pool_timeout=int(1), # 池中沒(méi)有線(xiàn)程最多等待的時(shí)間,否則報(bào)錯(cuò)
pool_recycle=int(0) # 多久之后對(duì)線(xiàn)程池中的線(xiàn)程進(jìn)行一次連接的回收(重置)
)
# 修改
def add(count):
for i in list(range(1,count)):
print(threading.currentThread().getName()+"==>"+str(i))
connection = engine.connect(close_with_result=False)
trans = connection.begin()
connection.execute("UPDATE tb SET yk = yk + 1 WHERE id = 1;")
trans.commit()
# 測(cè)試
if __name__ == '__main__':
# 用線(xiàn)程去執(zhí)行函數(shù)
t1 = Thread(target=add,args=(101,)) # 100次
t1.setName("線(xiàn)程A")
t2 = Thread(target=add,args=(151,)) # 150次
t2.setName("線(xiàn)程B")
t1.start()
t2.start()