分布式事務概述
說起分布式事務券膀,是個讓人又愛又恨的話題驯遇。恨他叉庐,因為這個世界性難題始終沒有一個完美的解決方案。而愛他玩郊,因為他引出了一系列解巧妙決方案匾竿,不得不感嘆一代又一代計算機人的智慧岭妖。這篇文章我們就來談談分布式事務的一些解決方案反璃,其中互聯(lián)網(wǎng)公司中最常用哪種淮蜈?MQ又如何和事務聯(lián)系起來呢已卷?
模型
在正式開始介紹之前侧蘸,我們先通過兩張圖例來看看什么是分布式事務的情況。
上圖我們再熟悉不過穿稳,就是傳統(tǒng)的本地事務逢艘,我們操作的數(shù)據(jù)庫是一個單數(shù)據(jù)源骤菠,通過數(shù)據(jù)庫提供的事務就可以輕松完成商乎,業(yè)務上通過spring的聲明式事務注解@Transitional就可以,可是如果多數(shù)據(jù)源的情況呢爬泥?
像這樣其實就是分布式事務的一種情況了,跨庫事務却桶。很明顯本地事務那套方式已經(jīng)玩不起來了蔗牡,要怎么辦呢辩越?當然是兩階段提交嘍,別急趁啸,先賣個關(guān)子,后面我們詳細來解釋旅掂,下面再來看一種情況商虐。
以上大概是互聯(lián)網(wǎng)最常見的一種模型了秘车,微服務間的分布式事務劫哼。這種情況甚至服務間存在于不同的jvm進程沦偎。這里要使用的方式就是大名鼎鼎的TCC了,這個我們后面再詳細探討具體的實現(xiàn)方式搔驼。
好了舌涨,了解了分布式事務產(chǎn)生的模型扔字,我們就可以開始正式開始吹牛逼之旅了革为,等等,貌似還需要交代幾個小小的理論琢蛤。
BASE理論
為什么說分布式事務是個難題博其?因為要達到強一致性(數(shù)據(jù)更新后立即達到一致狀態(tài))是非常困難的迂猴,所以有了中間狀態(tài)(軟狀態(tài))沸毁,即允許數(shù)據(jù)在某一狀態(tài)下是不一致的傻寂,但是要盡可能保證最終一致性崎逃。這其實也就是BASE理論了眉孩,他的定義如下:
base理論
- 基本可用(Basically Availability)
指分布式系統(tǒng)出現(xiàn)不可預知錯誤的時候浪汪,允許損失部分可用性。 - 軟狀態(tài)(Soft State)
也就是中間狀態(tài)广恢,允許存在這種狀態(tài)并認為不會影響系統(tǒng)的整體可用性钉迷,即允許不同節(jié)點在的數(shù)據(jù)傳輸之間存在延遲糠聪。 - 最終一致(Eventual Consistency)
在數(shù)據(jù)更新操作之后谐鼎,在經(jīng)過一定時間的同步之后,最終都能達到一個一致的狀態(tài)身害。不需要保證系統(tǒng)的強一致性塌鸯。
兩階段提交(Two Phase Commitment)
所謂兩階段提交唐片,顧名思義牵触,就是把事務的提交分成兩個階段揽思,但是注意见擦,這兩個階段是在一組操作中的,不要誤以為是兩組操作福侈。可能這么說不是很明白卢未,我們再來看張圖例
還不明白也沒關(guān)系肪凛,等介紹完了XA和TCC再回過頭看看就GET了。好了辽社,下面我們就從XA開始伟墙,正式開始來了解分布式事務的那些解決方案。
XA/JTA方案
XA是業(yè)界關(guān)于分布式管理的一個規(guī)范滴铅,而JTA是JAVA的一個實現(xiàn)戳葵。
在XA中,我們引入了一個中間協(xié)調(diào)者的角色拱烁。在第一階段中,所有的參與者需要鎖住要操作的資源噩翠,進行操作戏自,然后通知協(xié)調(diào)者已經(jīng)準備就緒,可以提交事務绎秒。
第二階段時協(xié)調(diào)者收到了某個參與者發(fā)送的請求浦妄,得知了他們都已經(jīng)達到了可以提交事務的狀態(tài),接著像所有參與者發(fā)送commit命令见芹,事務提交剂娄。如果有一方參與者執(zhí)行失敗,那協(xié)調(diào)器就會發(fā)送rollback命令玄呛,各個參與者都回滾阅懦。
都說talk is cheap,show me the code徘铝,下面我們就來看看上述過程使用JTA實現(xiàn)的一組代碼
boolean logXaCommands = true;
// 獲得資源管理器操作接口實例 RM1
Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn1, logXaCommands);
XAResource rm1 = xaConn1.getXAResource();
// 獲得資源管理器操作接口實例 RM2
Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1", "root", "root");
XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn2, logXaCommands);
XAResource rm2 = xaConn2.getXAResource();
// AP請求TM執(zhí)行一個分布式事務耳胎,TM生成全局事務id
byte[] gtrid = "g12345".getBytes();
int formatId = 1;
try {
// ==============分別執(zhí)行RM1和RM2上的事務分支====================
// TM生成rm1上的事務分支id
byte[] bqual1 = "b00001".getBytes();
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
// 執(zhí)行rm1上的事務分支
rm1.start(xid1, XAResource.TMNOFLAGS);// One of TMNOFLAGS, TMJOIN,
// or TMRESUME.
PreparedStatement ps1 = conn1.prepareStatement("INSERT into user(name) VALUES ('tianshouzhi')");
ps1.execute();
rm1.end(xid1, XAResource.TMSUCCESS);
// TM生成rm2上的事務分支id
byte[] bqual2 = "b00002".getBytes();
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
// 執(zhí)行rm2上的事務分支
rm2.start(xid2, XAResource.TMNOFLAGS);
PreparedStatement ps2 = conn2.prepareStatement("INSERT into user(name) VALUES ('wangxiaoxiao')");
ps2.execute();
rm2.end(xid2, XAResource.TMSUCCESS);
// ===================兩階段提交================================
// phase1:詢問所有的RM 準備提交事務分支
int rm1_prepare = rm1.prepare(xid1);
int rm2_prepare = rm2.prepare(xid2);
// phase2:提交所有事務分支
boolean onePhase = false; // TM判斷有2個事務分支,所以不能優(yōu)化為一階段提交
// 所有事務分支都prepare成功惕它,提交所有事務分支
if (rm1_prepare == XAResource.XA_OK && rm2_prepare == XAResource.XA_OK) {
rm1.commit(xid1, onePhase);
rm2.commit(xid2, onePhase);
} else {// 如果有事務分支沒有成功怕午,則回滾
rm1.rollback(xid1);
rm1.rollback(xid2);
}
} catch (XAException e) {
// 如果出現(xiàn)異常,也要進行回滾
e.printStackTrace();
}
再來看看atomikos的實現(xiàn)方式淹魄,atomikos的免費開源版也實現(xiàn)了XA
private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
// 連接池基本屬性
Properties p = new Properties();
p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);
p.setProperty("user", "root");
p.setProperty("password", "root");
// 使用AtomikosDataSourceBean封裝com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
// atomikos要求為每個AtomikosDataSourceBean名稱郁惜,為了方便記憶,這里設(shè)置為和dbName相同
ds.setUniqueResourceName(dbName);
ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
ds.setXaProperties(p);
return ds;
}
public static void main(String[] args) throws Exception {
AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user");
AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account");
Connection conn1 = null;
Connection conn2 = null;
PreparedStatement ps1 = null;
PreparedStatement ps2 = null;
UserTransaction userTransaction = new UserTransactionImp();
try {
// 開啟事務
userTransaction.begin();
// 執(zhí)行db1上的sql
conn1 = ds1.getConnection();
ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
ps1.setString(1, "tianshouzhi");
ps1.executeUpdate();
ResultSet generatedKeys = ps1.getGeneratedKeys();
int userId = -1;
while (generatedKeys.next()) {
userId = generatedKeys.getInt(1);// 獲得自動生成的userId
}
// 模擬異常 甲锡,直接進入catch代碼塊兆蕉,2個都不會提交
// int i=1/0;
// 執(zhí)行db2上的sql
conn2 = ds2.getConnection();
ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
ps2.setInt(1, userId);
ps2.setDouble(2, 10000000);
ps2.executeUpdate();
// 兩階段提交
userTransaction.commit();
} catch (Exception e) {
try {
e.printStackTrace();
userTransaction.rollback();
} catch (SystemException e1) {
e1.printStackTrace();
}
} finally {
try {
ps1.close();
ps2.close();
conn1.close();
conn2.close();
ds1.close();
ds2.close();
} catch (Exception ignore) {
}
}
}
簡單很多了對吧羽戒?其實它們的原理時十分類似的,所以如果有面試問到atomikos的具體實現(xiàn)方式虎韵,你懂得易稠!
說了這么多,其實這種方式更多的是提供一種解決問題的思路包蓝,實際環(huán)境中時不太可能這么玩的驶社,因為這種方式性能太差了,他需要鎖住相應的資源养晋,互聯(lián)網(wǎng)項目中有著很高并發(fā)吞吐量衬吆,這種方式很明顯不適合,所以還是得引入我們今天要討論的第二種方式:TCC
TCC兩階段補償性方案
根據(jù)上面的圖例绳泉,也可以看出TCC就是 Try-Confirm-Cancel的簡稱逊抡,為了讓大家更好的理解這種方案,我舉個例子來說明零酪。
假設(shè)我們需要一張從合肥飛往大理的機票冒嫡,但是沒有直飛,怎么辦四苇?難道一張一張買嗎孝凌,更不用說還要留意中轉(zhuǎn)時間是否合理等等問題,所以我們往往會選擇一個機票預訂平臺月腋,讓他幫我們一次性購買這兩張機票蟀架。
這就是一個典型的分布式事務的場景了,機票預訂平臺需要同時向兩家航空公司(不同DB榆骚,不同SERVER)發(fā)送下單請求片拍,要么同時成功,要么同時失敗妓肢。很明顯XA那種方案是完全不適用的捌省,總不能把人家的表資源給鎖了,誰會讓你這么干碉钠。所以我們需要的是一種業(yè)務上的手段纲缓。
好,這種業(yè)務手段其實就是TCC喊废,在第一階段中祝高,機票預訂平臺會像兩家航空公司提供的API接口發(fā)送請求,預留我們需要的機票污筷。第二階段中, 如果預留操作任意一方不成功褂策,就發(fā)送取消請求,訂票不成功。如果成功呢斤寂?發(fā)送確認請求,完成下單操作揪惦。這樣一來遍搞,就保證了預定機票這組操作要么同時成功,要么不成功器腋。中間的預留就是中間狀態(tài)溪猿,但是最終保證了數(shù)據(jù)的一致性。
你可能有這樣的問題纫塌,要是確認階段有一方失敗怎么辦诊县?首先呢,這個失敗幾率不高措左,但是對于互聯(lián)網(wǎng)公司來說依痊,即使很低的幾率對應的訂單量可能也是非常龐大的,所以需要兩方共同提供一定的機制怎披,進一步提高訂票的成功率胸嘁。比如,機票預訂平臺在收到確認失敗的消息時凉逛,可能會有一定的重試機制性宏,若重試若干次時候依然不成功,才會認為是真正的失敗状飞。航空公司則會對接口保證冪等性毫胜,對網(wǎng)絡(luò)超時失敗的情況(訂單其實已經(jīng)生成)也要有一定的處置方式。
如果還是失敗呢诬辈?BASE定理允許產(chǎn)生一定的不可用酵使,所以我們要對這種情況進行補償。通常使用日志或者MQ的方式進行補償自晰,甚至最后還是需要通過人工對賬的方式凝化。
說到這里已經(jīng)不難看出,XA是一種資源層面的分布式事務酬荞,在兩階段提交的整個過程中搓劫,它都會一致持有資源的鎖,是強一致性混巧,而TCC則是業(yè)務層面的分布式事務枪向,不會持有資源鎖,保證的是最終一致性咧党。最后再給大家提供一個實現(xiàn)了tcc的框架秘蛔,有興趣的話可以多看看,我們這里就不提供具體代碼了。
MQ事務方案
終于到了最后一種方案了深员,其實也很簡單负蠕,先看圖
服務1先向MQ發(fā)送一條中間狀態(tài)的prepare消息,此時這條消息不會被消費者收到倦畅。接著繼續(xù)執(zhí)行服務1中的業(yè)務邏輯遮糖,成功后再向MQ發(fā)送confirm消息,將這條消息從中間狀態(tài)改為可被消費者接受的狀態(tài)叠赐,消費者收到消息后執(zhí)行己方業(yè)務邏輯欲账,成功后向MQ發(fā)送ACK。
這樣同樣保證了分布式事務芭概,且因為存在中間狀態(tài)赛不,所以保證的也是最終一致性。如果消費者一方收取消息出現(xiàn)異嘲罩蓿或ack請求超時呢踢故?MQ一般都有一定的消息補發(fā)重試機制,所以要做好接口的冪等優(yōu)化奏路。如果confirm請求失敗呢畴椰?這時候消息隊列需要像服務1對應的業(yè)務發(fā)送定時消息來確認當前狀態(tài),如果已經(jīng)成功鸽粉,再修改中間狀態(tài)即可斜脂。
總結(jié)
無論哪種方案都不能十全十美的保證分布式事務,所以一定要做好補償触机≈愦粒總而言之,業(yè)界對于這一難題的解決方案都是柔性事務+補償機制儡首,強調(diào)的是最終一致性片任。要想保證強一致性又不影響性能,這就是一個世界性難題了蔬胯。不過牛人輩出对供,說不定哪一天我們就能見到這樣的方案了不是嗎?