KillBill框架是一個(gè)開源的支付和計(jì)費(fèi)SaaS平臺(tái)虽界,作為一款分布式的平臺(tái)补疑,我們今天主要是從它身上學(xué)習(xí)如何處理分布式事務(wù)問題的积蔚。
眾所周知油额,采用分布式設(shè)計(jì)之后我們需要解決幾大問題:
- 數(shù)據(jù)一致性分發(fā)問題
- 數(shù)據(jù)聚合join問題
- 分布式事務(wù)問題
解決數(shù)據(jù)一致性分發(fā)有幾種方案:
- 事務(wù)性發(fā)件箱機(jī)制(本地消息事務(wù)機(jī)制)
- CDC技術(shù)
數(shù)據(jù)庫(kù)聚合join問題依賴于數(shù)據(jù)一致性分發(fā)技術(shù)悯舟,分庫(kù)分表之后患整,數(shù)據(jù)查詢變得非常棘手眯停,要想把數(shù)據(jù)聚合起來(lái)济舆,可以采用下面幾種方式:
- 采用全局表方式,就是每個(gè)數(shù)據(jù)庫(kù)存儲(chǔ)一份需要的數(shù)據(jù)
- 業(yè)務(wù)設(shè)計(jì)時(shí)做數(shù)據(jù)冗余莺债,這是一種反設(shè)計(jì)模式滋觉,即在需要的地方冗余一份業(yè)務(wù)數(shù)據(jù)從而避免join
- 采用程序聚合的方式,可以采用的技術(shù)主要有兩種齐邦,第一種是分布式代理中間件椎侠,比如以ShardingSphere為代表,第二種是CQRS技術(shù)
而對(duì)于分布式事務(wù)的解決方案主要有:
- 2PC(只停留在理論方面)
- TCC
- Saga
在分布式事務(wù)解決方面措拇,典型的技術(shù)框架有:
- Seata我纪,支持多種模式,AT模式(加強(qiáng)版2PC)丐吓,TCC浅悉,Saga,在生產(chǎn)實(shí)踐方面主推AT模式。
- Cadence券犁,基于事務(wù)性發(fā)件箱機(jī)制實(shí)現(xiàn)术健,和KillBill類似
KillBill使用的技術(shù)介紹
KillBill的使用的技術(shù)有
KillBill主要的啟動(dòng)項(xiàng)目是profiles,對(duì)于Jersey和Guice框架的整合可以參考這篇文章族操,整合之后苛坚,我們以Accout為例進(jìn)行原理介紹比被,AccountResource為賬號(hào)服務(wù)的RESTful服務(wù)入口色难,其中有個(gè)CreateAccount方法來(lái)進(jìn)行Accout賬號(hào)創(chuàng)建,最終通過DefaultAccountDao調(diào)用Create方法進(jìn)行賬號(hào)創(chuàng)建等缀,KillBill的數(shù)據(jù)存儲(chǔ)底層采用的jdbi技術(shù)實(shí)現(xiàn)枷莉,但是對(duì)事務(wù)的封裝采用AOP技術(shù)實(shí)現(xiàn),主要邏輯在EntitySqlDaoTransactionalJdbiWrapper這個(gè)類尺迂,JDBI技術(shù)可以參考這篇文章笤妙,KillBill對(duì)JDBI進(jìn)行了擴(kuò)展冒掌,采用模板技術(shù)來(lái)生成SQL代碼,具體可以參考其commons項(xiàng)目的實(shí)現(xiàn)蹲盘。
KillBill自己開發(fā)了事務(wù)總線股毫,用于分布式事務(wù)的消息傳遞,其主要是用來(lái)執(zhí)行遠(yuǎn)程消息召衔,其主要是通過DefaultPersistentBus來(lái)實(shí)現(xiàn)事務(wù)消息的消費(fèi)铃诬。
KillBill的服務(wù)總線采用微內(nèi)核架構(gòu)實(shí)現(xiàn),其主要的實(shí)現(xiàn)邏輯在DefaultLifecycle苍凛,其主要是找到需要運(yùn)行的各種服務(wù)趣席,然后分階段調(diào)用其每個(gè)階段的方法執(zhí)行,主要邏輯在KillbillPlatformGuiceListener的startLifecycle方法
protected void startLifecycle() {
this.startLifecycleStage1();
this.killbillLifecycle.fireStartupSequencePriorEventRegistration();
this.startLifecycleStage2();
this.killbillLifecycle.fireStartupSequencePostEventRegistration();
this.startLifecycleStage3();
}
startLifecycleStage2用來(lái)服務(wù)監(jiān)聽注冊(cè)醇蝴,startLifecycleStage3啟動(dòng)服務(wù)宣肚,同樣以DefaultServerService為例,其注冊(cè)方法是registerForNotifications悠栓,啟動(dòng)方法是start
@LifecycleHandlerType(LifecycleLevel.INIT_SERVICE)
public void registerForNotifications() throws NotificationQueueAlreadyExists {
try {
bus.register(pushNotificationListener);
} catch (final EventBusException e) {
log.warn("Failed to register PushNotificationListener", e);
}
pushNotificationRetryService.initialize();
}
@LifecycleHandlerType(LifecycleLevel.START_SERVICE)
public void start() {
pushNotificationRetryService.start();
}
服務(wù)總線的主要接口是PersistentBus霉涨,我們可以看到其中不但有注冊(cè)監(jiān)聽的方法,也有發(fā)送事件的方法惭适,我們同樣以DefaultAccountDao為例嵌纲,其中的postBusEventFromTransaction方法用來(lái)向事務(wù)性發(fā)件箱發(fā)送事件。
@Override
protected void postBusEventFromTransaction(final AccountModelDao account, final AccountModelDao savedAccount, final ChangeType changeType,
final EntitySqlDaoWrapperFactory entitySqlDaoWrapperFactory, final InternalCallContext context) throws BillingExceptionBase {
// This is only called for the create call (see update below)
switch (changeType) {
case INSERT:
break;
default:
return;
}
final Long recordId = savedAccount.getRecordId();
// We need to re-hydrate the callcontext with the account record id
final InternalCallContext rehydratedContext = internalCallContextFactory.createInternalCallContext(savedAccount, recordId, context);
final AccountCreationInternalEvent creationEvent = new DefaultAccountCreationEvent(new DefaultAccountData(savedAccount), savedAccount.getId(),
rehydratedContext.getAccountRecordId(), rehydratedContext.getTenantRecordId(), rehydratedContext.getUserToken());
try {
eventBus.postFromTransaction(creationEvent, entitySqlDaoWrapperFactory.getHandle().getConnection());
} catch (final EventBusException e) {
log.warn("Failed to post account creation event for accountId='{}'", savedAccount.getId(), e);
}
}
Kill Bill基層用的AOP技術(shù)分析
底層其使用的是cglib技術(shù)腥沽,可以從SqlObject找到答案
static <T> T buildSqlObject(final Class<T> sqlObjectType, final HandleDing handle) {
Factory f;
if (factories.containsKey(sqlObjectType)) {
f = (Factory)factories.get(sqlObjectType);
} else {
Enhancer e = new Enhancer();
e.setClassLoader(sqlObjectType.getClassLoader());
List<Class> interfaces = new ArrayList();
interfaces.add(CloseInternalDoNotUseThisClass.class);
if (sqlObjectType.isInterface()) {
interfaces.add(sqlObjectType);
} else {
e.setSuperclass(sqlObjectType);
}
e.setInterfaces((Class[])interfaces.toArray(new Class[interfaces.size()]));
final SqlObject so = new SqlObject(buildHandlersFor(sqlObjectType), handle);
e.setCallback(new MethodInterceptor() {
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
return so.invoke(o, method, objects, methodProxy);
}
});
T t = e.create();
T actual = factories.putIfAbsent(sqlObjectType, (Factory)t);
if (actual == null) {
return t;
}
f = (Factory)actual;
}
final SqlObject so = new SqlObject(buildHandlersFor(sqlObjectType), handle);
return f.newInstance(new MethodInterceptor() {
public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
return so.invoke(o, method, objects, methodProxy);
}
});
}
通過代理其底層實(shí)際的執(zhí)行類是BasicHandle逮走,接著調(diào)用的是EntitySqlDaoTransactionalJdbiWrapper,最終調(diào)用的是AccountSqlDao今阳,具體的調(diào)用過程讀者可以自己通過搭建調(diào)試環(huán)境分析师溅。
總體來(lái)說,KillBill有很多技術(shù)值得我們學(xué)習(xí)盾舌,作為一個(gè)10年的平臺(tái)墓臭,同時(shí)作為一款有多年的生產(chǎn)實(shí)踐經(jīng)驗(yàn)的框架需要讀者根據(jù)文檔和源代碼進(jìn)行更多的挖掘。