通過之前的幾篇文章我相信您已經(jīng)搭建好了運行環(huán)境善已,本次的項目實戰(zhàn)是依照happylifeplat-tcc-demo項目來演練,也是非常經(jīng)典的分布式事務(wù)場景:支付成功渴频,進行訂單狀態(tài)的更新,扣除用戶賬戶,庫存扣減這幾個模塊來進行tcc分布式事務(wù)冶忱。話不多說,讓我們一起進入體驗吧!
- 首先我們找到 PaymentServiceImpl.makePayment 方法囚枪,這是tcc分布式事務(wù)的發(fā)起者
//tcc分布式事務(wù)的發(fā)起者
@Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
public void makePayment(Order order) {
order.setStatus(OrderStatusEnum.PAYING.getCode());
orderMapper.update(order);
//扣除用戶余額 遠端的rpc調(diào)用
AccountDTO accountDTO = new AccountDTO();
accountDTO.setAmount(order.getTotalAmount());
accountDTO.setUserId(order.getUserId());
accountService.payment(accountDTO);
//進入扣減庫存操作 遠端的rpc調(diào)用
InventoryDTO inventoryDTO = new InventoryDTO();
inventoryDTO.setCount(order.getCount());
inventoryDTO.setProductId(order.getProductId());
inventoryService.decrease(inventoryDTO);
}
//更新訂單的confirm方法
public void confirmOrderStatus(Order order) {
order.setStatus(OrderStatusEnum.PAY_SUCCESS.getCode());
orderMapper.update(order);
LOGGER.info("=========進行訂單confirm操作完成================");
}
//更新訂單的cancel方法
public void cancelOrderStatus(Order order) {
order.setStatus(OrderStatusEnum.PAY_FAIL.getCode());
orderMapper.update(order);
LOGGER.info("=========進行訂單cancel操作完成================");
}
makePayment 方法是tcc分布式事務(wù)的發(fā)起者派诬,它里面有更新訂單狀態(tài)(本地),進行庫存扣減(rpc扣減)链沼,資金扣減(rpc調(diào)用)
confirmOrderStatus 為訂單狀態(tài)confirm方法默赂,cancelOrderStatus 為訂單狀態(tài)cancel方法。
我們重點關(guān)注 @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus") 這個注解括勺,為什么加了這個注解就有神奇的作用缆八。
@Tcc注解切面
- 我們找到在happylifeplat-tcc-core包中找到
TccTransactionAspect,這里定義@Tcc的切點疾捍。
@Aspect
public abstract class TccTransactionAspect {
private TccTransactionInterceptor tccTransactionInterceptor;
public void setTccTransactionInterceptor(TccTransactionInterceptor tccTransactionInterceptor) {
this.tccTransactionInterceptor = tccTransactionInterceptor;
}
@Pointcut("@annotation(com.happylifeplat.tcc.annotation.Tcc)")
public void txTransactionInterceptor() {
}
@Around("txTransactionInterceptor()")
public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return tccTransactionInterceptor.interceptor(proceedingJoinPoint);
}
public abstract int getOrder();
}
我們可以知道Spring實現(xiàn)類的方法凡是加了@Tcc注解的奈辰,在調(diào)用的時候,都會進行 tccTransactionInterceptor.interceptor 調(diào)用乱豆。
其次我們發(fā)現(xiàn)該類是一個抽象類奖恰,肯定會有其他類繼承它,你猜的沒錯宛裕,對應(yīng)dubbo用戶瑟啃,他的繼承類為:
@Aspect
@Component
public class DubboTccTransactionAspect extends TccTransactionAspect implements Ordered {
@Autowired
public DubboTccTransactionAspect(DubboTccTransactionInterceptor dubboTccTransactionInterceptor) {
super.setTccTransactionInterceptor(dubboTccTransactionInterceptor);
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
- springcloud的用戶,它的繼承類為:
@Aspect
@Component
public class SpringCloudTxTransactionAspect extends TccTransactionAspect implements Ordered {
@Autowired
public SpringCloudTxTransactionAspect(SpringCloudTxTransactionInterceptor springCloudTxTransactionInterceptor) {
this.setTccTransactionInterceptor(springCloudTxTransactionInterceptor);
}
public void init() {
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
- 我們注意到他們都實現(xiàn)了Spring的Ordered接口,并重寫了 getOrder 方法揩尸,都返回了 Ordered.HIGHEST_PRECEDENCE 那么可以知道蛹屿,他是優(yōu)先級最高的切面。
TccCoordinatorMethodAspect 切面
@Aspect
@Component
public class TccCoordinatorMethodAspect implements Ordered {
private final TccCoordinatorMethodInterceptor tccCoordinatorMethodInterceptor;
@Autowired
public TccCoordinatorMethodAspect(TccCoordinatorMethodInterceptor tccCoordinatorMethodInterceptor) {
this.tccCoordinatorMethodInterceptor = tccCoordinatorMethodInterceptor;
}
@Pointcut("@annotation(com.happylifeplat.tcc.annotation.Tcc)")
public void coordinatorMethod() {
}
@Around("coordinatorMethod()")
public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return tccCoordinatorMethodInterceptor.interceptor(proceedingJoinPoint);
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 1;
}
}
該切面是第二優(yōu)先級的切面岩榆,意思就是當我們對有@Tcc注解的方法發(fā)起調(diào)用的時候错负,首先會進入 TccTransactionAspect 切面,然后再進入 TccCoordinatorMethodAspect 切面朗恳。
該切面主要是用來獲取@Tcc注解上的元數(shù)據(jù)湿颅,比如confrim方法名稱等等。到這里如果大家基本能明白的話粥诫,差不多就了解了整個框架原理油航,現(xiàn)在讓我們來跟蹤調(diào)用吧。
現(xiàn)在我們回過頭怀浆,我們來調(diào)用 PaymentServiceImpl.makePayment 方法
- dubbo用戶谊囚,我們會進入如下攔截器:
@Component
public class DubboTccTransactionInterceptor implements TccTransactionInterceptor {
private final TccTransactionAspectService tccTransactionAspectService;
@Autowired
public DubboTccTransactionInterceptor(TccTransactionAspectService tccTransactionAspectService) {
this.tccTransactionAspectService = tccTransactionAspectService;
}
@Override
public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
final String context = RpcContext.getContext().getAttachment(Constant.TCC_TRANSACTION_CONTEXT);
TccTransactionContext tccTransactionContext = null;
if (StringUtils.isNoneBlank(context)) {
tccTransactionContext =
GsonUtils.getInstance().fromJson(context, TccTransactionContext.class);
}
return tccTransactionAspectService.invoke(tccTransactionContext, pjp);
}
}
- 我們繼續(xù)跟蹤 tccTransactionAspectService.invoke(tccTransactionContext, pjp) ,發(fā)現(xiàn)通過判斷過后,我們會進入 StartTccTransactionHandler.handler 方法,我們已經(jīng)進入了分布式事務(wù)處理的入口了执赡!
/**
* 分布式事務(wù)處理接口
*
* @param point point 切點
* @param context 信息
* @return Object
* @throws Throwable 異常
*/
@Override
public Object handler(ProceedingJoinPoint point, TccTransactionContext context) throws Throwable {
Object returnValue;
try {
//開啟分布式事務(wù)
tccTransactionManager.begin();
try {
//發(fā)起調(diào)用 執(zhí)行try方法
returnValue = point.proceed();
} catch (Throwable throwable) {
//異常執(zhí)行cancel
tccTransactionManager.cancel();
throw throwable;
}
//try成功執(zhí)行confirm confirm 失敗的話镰踏,那就只能走本地補償
tccTransactionManager.confirm();
} finally {
tccTransactionManager.remove();
}
return returnValue;
}
- 我們進行跟進 tccTransactionManager.begin() 方法:
/**
* 該方法為發(fā)起方第一次調(diào)用
* 也是tcc事務(wù)的入口
*/
void begin() {
LogUtil.debug(LOGGER, () -> "開始執(zhí)行tcc事務(wù)!start");
TccTransaction tccTransaction = CURRENT.get();
if (Objects.isNull(tccTransaction)) {
tccTransaction = new TccTransaction();
tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
tccTransaction.setRole(TccRoleEnum.START.getCode());
}
//保存當前事務(wù)信息
coordinatorCommand.execute(new CoordinatorAction(CoordinatorActionEnum.SAVE, tccTransaction));
CURRENT.set(tccTransaction);
//設(shè)置tcc事務(wù)上下文沙合,這個類會傳遞給遠端
TccTransactionContext context = new TccTransactionContext();
context.setAction(TccActionEnum.TRYING.getCode());//設(shè)置執(zhí)行動作為try
context.setTransId(tccTransaction.getTransId());//設(shè)置事務(wù)id
TransactionContextLocal.getInstance().set(context);
}
這里我們保存了事務(wù)信息奠伪,并且開啟了事務(wù)上下文,并把它保存在了ThreadLoacl里面,大家想想這里為什么一定要保存在ThreadLocal里面绊率。
begin方法執(zhí)行完后谨敛,我們回到切面,現(xiàn)在我們來執(zhí)行 point.proceed()滤否,當執(zhí)行這一句代碼的時候脸狸,會進入第二個切面,即進入了 TccCoordinatorMethodInterceptor
public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
final TccTransaction currentTransaction = tccTransactionManager.getCurrentTransaction();
if (Objects.nonNull(currentTransaction)) {
final TccActionEnum action = TccActionEnum.acquireByCode(currentTransaction.getStatus());
switch (action) {
case TRYING:
registerParticipant(pjp, currentTransaction.getTransId());
break;
case CONFIRMING:
break;
case CANCELING:
break;
}
}
return pjp.proceed(pjp.getArgs());
}
- 這里由于是在try階段藐俺,直接就進入了 registerParticipant 方法
private void registerParticipant(ProceedingJoinPoint point, String transId) throws NoSuchMethodException {
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
Class<?> clazz = point.getTarget().getClass();
Object[] args = point.getArgs();
final Tcc tcc = method.getAnnotation(Tcc.class);
//獲取協(xié)調(diào)方法
String confirmMethodName = tcc.confirmMethod();
/* if (StringUtils.isBlank(confirmMethodName)) {
confirmMethodName = method.getName();
}*/
String cancelMethodName = tcc.cancelMethod();
/* if (StringUtils.isBlank(cancelMethodName)) {
cancelMethodName = method.getName();
}
*/
//設(shè)置模式
final TccPatternEnum pattern = tcc.pattern();
tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());
TccInvocation confirmInvocation = null;
if (StringUtils.isNoneBlank(confirmMethodName)) {
confirmInvocation = new TccInvocation(clazz,
confirmMethodName, method.getParameterTypes(), args);
}
TccInvocation cancelInvocation = null;
if (StringUtils.isNoneBlank(cancelMethodName)) {
cancelInvocation = new TccInvocation(clazz,
cancelMethodName,
method.getParameterTypes(), args);
}
//封裝調(diào)用點
final Participant participant = new Participant(
transId,
confirmInvocation,
cancelInvocation);
tccTransactionManager.enlistParticipant(participant);
}
- 這里就獲取了 @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
的信息炊甲,并把他封裝成了 Participant,存起來欲芹,然后真正的調(diào)用 PaymentServiceImpl.makePayment 業(yè)務(wù)方法卿啡,在業(yè)務(wù)方法里面,我們首先執(zhí)行的是更新訂單狀態(tài)(相信現(xiàn)在你還記得0.0):
order.setStatus(OrderStatusEnum.PAYING.getCode());
orderMapper.update(order);
- 接下來耀石,我們進行扣除用戶余額,注意扣除余額這里是一個rpc方法:
AccountDTO accountDTO = new AccountDTO();
accountDTO.setAmount(order.getTotalAmount());
accountDTO.setUserId(order.getUserId());
accountService.payment(accountDTO)
- 現(xiàn)在我們來關(guān)注 accountService.payment(accountDTO) 牵囤,這個接口的定義爸黄。
dubbo接口:
public interface AccountService {
/**
* 扣款支付
*
* @param accountDTO 參數(shù)dto
* @return true
*/
@Tcc
boolean payment(AccountDTO accountDTO);
}
springcloud接口
@FeignClient(value = "account-service", configuration = MyConfiguration.class)
public interface AccountClient {
@PostMapping("/account-service/account/payment")
@Tcc
Boolean payment(@RequestBody AccountDTO accountDO);
}
很明顯這里我們都在接口上加了@Tcc注解滞伟,我們知道springAop的特性,在接口上加注解炕贵,是無法進入切面的梆奈,所以我們在這里,要采用rpc框架的某些特性來幫助我們獲取到 @Tcc注解信息称开。 這一步很重要亩钟。當我們發(fā)起
accountService.payment(accountDTO) 調(diào)用的時候:
- dubbo用戶,會走dubbo的filter接口,TccTransactionFilter:
private void registerParticipant(Class clazz, String methodName, Object[] arguments, Class... args) throws TccRuntimeException {
try {
Method method = clazz.getDeclaredMethod(methodName, args);
Tcc tcc = method.getAnnotation(Tcc.class);
if (Objects.nonNull(tcc)) {
//獲取事務(wù)的上下文
final TccTransactionContext tccTransactionContext =
TransactionContextLocal.getInstance().get();
if (Objects.nonNull(tccTransactionContext)) {
//dubbo rpc傳參數(shù)
RpcContext.getContext()
.setAttachment(Constant.TCC_TRANSACTION_CONTEXT,
GsonUtils.getInstance().toJson(tccTransactionContext));
}
if (Objects.nonNull(tccTransactionContext)) {
if(TccActionEnum.TRYING.getCode()==tccTransactionContext.getAction()){
//獲取協(xié)調(diào)方法
String confirmMethodName = tcc.confirmMethod();
if (StringUtils.isBlank(confirmMethodName)) {
confirmMethodName = method.getName();
}
String cancelMethodName = tcc.cancelMethod();
if (StringUtils.isBlank(cancelMethodName)) {
cancelMethodName = method.getName();
}
//設(shè)置模式
final TccPatternEnum pattern = tcc.pattern();
tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());
TccInvocation confirmInvocation = new TccInvocation(clazz,
confirmMethodName,
args, arguments);
TccInvocation cancelInvocation = new TccInvocation(clazz,
cancelMethodName,
args, arguments);
//封裝調(diào)用點
final Participant participant = new Participant(
tccTransactionContext.getTransId(),
confirmInvocation,
cancelInvocation);
tccTransactionManager.enlistParticipant(participant);
}
}
}
} catch (NoSuchMethodException e) {
throw new TccRuntimeException("not fount method " + e.getMessage());
}
}
- springcloud 用戶鳖轰,則會進入 TccFeignHandler :
public class TccFeignHandler implements InvocationHandler {
/**
* logger
*/
private static final Logger LOGGER = LoggerFactory.getLogger(TccFeignHandler.class);
private Target<?> target;
private Map<Method, MethodHandler> handlers;
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
} else {
final Tcc tcc = method.getAnnotation(Tcc.class);
if (Objects.isNull(tcc)) {
return this.handlers.get(method).invoke(args);
}
final TccTransactionContext tccTransactionContext =
TransactionContextLocal.getInstance().get();
if (Objects.nonNull(tccTransactionContext)) {
final TccTransactionManager tccTransactionManager =
SpringBeanUtils.getInstance().getBean(TccTransactionManager.class);
if (TccActionEnum.TRYING.getCode() == tccTransactionContext.getAction()) {
//獲取協(xié)調(diào)方法
String confirmMethodName = tcc.confirmMethod();
if (StringUtils.isBlank(confirmMethodName)) {
confirmMethodName = method.getName();
}
String cancelMethodName = tcc.cancelMethod();
if (StringUtils.isBlank(cancelMethodName)) {
cancelMethodName = method.getName();
}
//設(shè)置模式
final TccPatternEnum pattern = tcc.pattern();
tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());
final Class<?> declaringClass = method.getDeclaringClass();
TccInvocation confirmInvocation = new TccInvocation(declaringClass,
confirmMethodName,
method.getParameterTypes(), args);
TccInvocation cancelInvocation = new TccInvocation(declaringClass,
cancelMethodName,
method.getParameterTypes(), args);
//封裝調(diào)用點
final Participant participant = new Participant(
tccTransactionContext.getTransId(),
confirmInvocation,
cancelInvocation);
tccTransactionManager.enlistParticipant(participant);
}
}
return this.handlers.get(method).invoke(args);
}
}
public void setTarget(Target<?> target) {
this.target = target;
}
public void setHandlers(Map<Method, MethodHandler> handlers) {
this.handlers = handlers;
}
}