不同于單一架構(gòu)應(yīng)用(Monolith), 分布式環(huán)境下, 進(jìn)行事務(wù)操作將變得困難, 因?yàn)榉植际江h(huán)境通常會(huì)有多個(gè)數(shù)據(jù)源, 只用本地?cái)?shù)據(jù)庫(kù)事務(wù)難以保證多個(gè)數(shù)據(jù)源數(shù)據(jù)的一致性. 這種情況下, 可以使用兩階段或者三階段提交協(xié)議來(lái)完成分布式事務(wù).但是使用這種方式一般來(lái)說(shuō)性能較差, 因?yàn)槭聞?wù)管理器需要在多個(gè)數(shù)據(jù)源之間進(jìn)行多次等待. 有一種方法同樣可以解決分布式事務(wù)問(wèn)題, 并且性能較好, 這就是我這篇文章要介紹的使用事件,本地事務(wù)以及消息隊(duì)列來(lái)實(shí)現(xiàn)分布式事務(wù).
我們從一個(gè)簡(jiǎn)單的實(shí)例入手. 基本所有互聯(lián)網(wǎng)應(yīng)用都會(huì)有用戶注冊(cè)的功能. 在這個(gè)例子中, 我們對(duì)于用戶注冊(cè)有兩步操作:
- 注冊(cè)成功, 保存用戶信息.
- 需要給用戶發(fā)放一張代金券, 目的是鼓勵(lì)用戶進(jìn)行消費(fèi).
如果是一個(gè)單一架構(gòu)應(yīng)用, 實(shí)現(xiàn)這個(gè)功能非常簡(jiǎn)單: 在一個(gè)本地事務(wù)里, 往用戶表插一條記錄, 并且在代金券表里插一條記錄, 提交事務(wù)就完成了. 但是如果我們的應(yīng)用是用微服務(wù)實(shí)現(xiàn)的, 可能用戶和代金券是兩個(gè)獨(dú)立的服務(wù), 他們有各自的應(yīng)用和數(shù)據(jù)庫(kù), 那么就沒(méi)有辦法簡(jiǎn)單的使用本地事務(wù)來(lái)保證操作的原子性了. 現(xiàn)在來(lái)看看如何使用事件機(jī)制和消息隊(duì)列來(lái)實(shí)現(xiàn)這個(gè)需求.(我在這里使用的消息隊(duì)列是kafka, 原理同樣適用于ActiveMQ/RabbitMQ等其他隊(duì)列)
我們會(huì)為用戶注冊(cè)這個(gè)操作創(chuàng)建一個(gè)事件, 該事件就叫做用戶創(chuàng)建事件(USER_CREATED). 用戶服務(wù)成功保存用戶記錄后, 會(huì)發(fā)送用戶創(chuàng)建事件到消息隊(duì)列, 代金券服務(wù)會(huì)監(jiān)聽(tīng)用戶創(chuàng)建事件, 一旦接收到該事件, 代金券服務(wù)就會(huì)在自己的數(shù)據(jù)庫(kù)中為該用戶創(chuàng)建一張代金券. 好了, 這些步驟看起來(lái)都相當(dāng)?shù)暮?jiǎn)單直觀, 但是怎么保證事務(wù)的原子性呢? 考慮下面這兩個(gè)場(chǎng)景:
- 用戶服務(wù)在保存用戶記錄, 還沒(méi)來(lái)得及向消息隊(duì)列發(fā)送消息之前就宕機(jī)了. 怎么保證用戶創(chuàng)建事件一定發(fā)送到消息隊(duì)列了?
- 代金券服務(wù)接收到用戶創(chuàng)建事件, 還沒(méi)來(lái)得及處理事件就宕機(jī)了. 重新啟動(dòng)之后如何消費(fèi)之前的用戶創(chuàng)建事件?
這兩個(gè)問(wèn)題的本質(zhì)是: 如何讓操作數(shù)據(jù)庫(kù)和操作消息隊(duì)列這兩個(gè)操作成為一個(gè)原子操作. 不考慮2PC, 這里我們可以通過(guò)事件表來(lái)解決這個(gè)問(wèn)題. 下面是類(lèi)圖.
EventPublish是記錄待發(fā)布事件的表. 其中:
- id: 每個(gè)事件在創(chuàng)建的時(shí)候都會(huì)生成一個(gè)全局唯一ID, 例如UUID.
- status: 事件狀態(tài), 枚舉類(lèi)型. 現(xiàn)在只有兩個(gè)狀態(tài): 待發(fā)布(NEW), 已發(fā)布(PUBLISHED).
- payload: 事件內(nèi)容. 這里我們會(huì)將事件內(nèi)容轉(zhuǎn)成json存到這個(gè)字段里.
- eventType: 事件類(lèi)型, 枚舉類(lèi)型. 每個(gè)事件都會(huì)有一個(gè)類(lèi)型, 比如我們之前提到的創(chuàng)建用戶USER_CREATED就是一個(gè)事件類(lèi)型.
EventProcess是用來(lái)記錄待處理的事件. 字段與EventPublish基本相同.
我們首先看看事件的發(fā)布過(guò)程. 下面是用戶服務(wù)發(fā)布用戶創(chuàng)建事件的順序圖.
- 用戶服務(wù)在接收到用戶請(qǐng)求后開(kāi)啟事務(wù), 在用戶表創(chuàng)建一條用戶記錄, 并且在EventPublish表創(chuàng)建一條status為NEW的記錄, payload記錄的是事件內(nèi)容, 提交事務(wù).
- 用戶服務(wù)中的定時(shí)器首先開(kāi)啟事務(wù), 然后查詢(xún)EventPublish是否有status為NEW的記錄, 查詢(xún)到記錄之后, 拿到payload信息, 將消息發(fā)布到kafka中對(duì)應(yīng)的topic.發(fā)送成功之后, 修改數(shù)據(jù)庫(kù)中EventPublish的status為PUBLISHED, 提交事務(wù).
下面是代金券服務(wù)處理用戶創(chuàng)建事件的順序圖.
- 代金券服務(wù)接收到kafka傳來(lái)的用戶創(chuàng)建事件(實(shí)際上是代金券服務(wù)主動(dòng)拉取的消息, 先忽略消息隊(duì)列的實(shí)現(xiàn)), 在EventProcess表創(chuàng)建一條status為NEW的記錄, payload記錄的是事件內(nèi)容, 如果保存成功, 向kafka返回接收成功的消息.
- 代金券服務(wù)中的定時(shí)器首先開(kāi)啟事務(wù), 然后查詢(xún)EventProcess是否有status為NEW的記錄, 查詢(xún)到記錄之后, 拿到payload信息, 交給事件回調(diào)處理器處理, 這里是直接創(chuàng)建代金券記錄. 處理成功之后修改數(shù)據(jù)庫(kù)中EventProcess的status為PROCESSED, 最后提交事務(wù).
回過(guò)頭來(lái)看我們之前提出的兩個(gè)問(wèn)題:
- 用戶服務(wù)在保存用戶記錄, 還沒(méi)來(lái)得及向消息隊(duì)列發(fā)送消息之前就宕機(jī)了. 怎么保證用戶創(chuàng)建事件一定發(fā)送到消息隊(duì)列了?
根據(jù)事件發(fā)布的順序圖, 我們把創(chuàng)建事件和發(fā)布事件分成了兩步操作. 如果事件創(chuàng)建成功, 但是在發(fā)布的時(shí)候宕機(jī)了. 啟動(dòng)之后定時(shí)器會(huì)重新對(duì)之前沒(méi)有發(fā)布成功的事件進(jìn)行發(fā)布. 如果事件在創(chuàng)建的時(shí)候就宕機(jī)了, 因?yàn)槭录?chuàng)建和業(yè)務(wù)操作在一個(gè)數(shù)據(jù)庫(kù)事務(wù)里, 所以對(duì)應(yīng)的業(yè)務(wù)操作也失敗了, 數(shù)據(jù)庫(kù)狀態(tài)的一致性得到了保證. - 代金券服務(wù)接收到用戶創(chuàng)建事件, 還沒(méi)來(lái)得及處理事件就宕機(jī)了. 重新啟動(dòng)之后如何消費(fèi)之前的用戶創(chuàng)建事件?
根據(jù)事件處理的順序圖, 我們把接收事件和處理事件分成了兩步操作. 如果事件接收成功, 但是在處理的時(shí)候宕機(jī)了. 啟動(dòng)之后定時(shí)器會(huì)重新對(duì)之前沒(méi)有處理成功的事件進(jìn)行處理. 如果事件在接收的時(shí)候就宕機(jī)了, kafka會(huì)重新將事件發(fā)送給對(duì)應(yīng)服務(wù).
通過(guò)這種方式, 我們不用2PC, 也保證了多個(gè)數(shù)據(jù)源之間狀態(tài)的最終一致性. 和2PC/3PC這種同步事務(wù)處理的方式相比, 這種異步事務(wù)處理方式具有異步系統(tǒng)通常都有的優(yōu)點(diǎn):
- 事務(wù)吞吐量大. 因?yàn)椴恍枰却渌麛?shù)據(jù)源響應(yīng).
- 容錯(cuò)性好. A服務(wù)在發(fā)布事件的時(shí)候, B服務(wù)甚至可以不在線.
缺點(diǎn):
- 編程與調(diào)試較復(fù)雜.
- 容易出現(xiàn)較多的中間狀態(tài). 比如上面的例子, 在用戶服務(wù)已經(jīng)保存了用戶并發(fā)布了事件, 但是代金券服務(wù)還沒(méi)來(lái)得及處理之前, 用戶如果登錄系統(tǒng), 會(huì)發(fā)現(xiàn)自己是沒(méi)有代金券的. 這種情況可能在有些業(yè)務(wù)中是能夠容忍的, 但是有些業(yè)務(wù)卻不行. 所以開(kāi)發(fā)之前要考慮好.
另外, 上面的流程在實(shí)現(xiàn)的過(guò)程中還有一些可以改進(jìn)的地方:
- 定時(shí)器在更新EventPublish狀態(tài)為PUBLISHED的時(shí)候, 可以一次批量更新多個(gè)EventProcess的狀態(tài).
- 定時(shí)器查詢(xún)EventProcess并交給事件回調(diào)處理器處理的時(shí)候, 可以使用線程池異步處理, 加快EventProcess處理周期.
- 在保存EventPublish和EventProcess的時(shí)候同時(shí)保存到Redis, 之后的操作可以對(duì)Redis中的數(shù)據(jù)進(jìn)行, 但是要小心處理緩存和數(shù)據(jù)庫(kù)可能狀態(tài)不一致問(wèn)題.
- 針對(duì)Kafka, 因?yàn)镵afka的特點(diǎn)是可能重發(fā)消息, 所以在接收事件并且保存到EventProcess的時(shí)候可能報(bào)主鍵沖突的錯(cuò)誤(因?yàn)橹貜?fù)消息id是相同的), 這個(gè)時(shí)候可以直接丟棄該消息.