tcc分布式事務(wù)源碼解析系列(四)之項目實戰(zhàn)

通過之前的幾篇文章我相信您已經(jīng)搭建好了運行環(huán)境善已,本次的項目實戰(zhàn)是依照happylifeplat-tcc-demo項目來演練,也是非常經(jīng)典的分布式事務(wù)場景:支付成功渴频,進行訂單狀態(tài)的更新,扣除用戶賬戶,庫存扣減這幾個模塊來進行tcc分布式事務(wù)冶忱。話不多說,讓我們一起進入體驗吧!

  • 首先我們找到 PaymentServiceImpl.makePayment 方法囚枪,這是tcc分布式事務(wù)的發(fā)起者
   //tcc分布式事務(wù)的發(fā)起者
   @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
   public void makePayment(Order order) {
       order.setStatus(OrderStatusEnum.PAYING.getCode());
       orderMapper.update(order);
       //扣除用戶余額 遠端的rpc調(diào)用
       AccountDTO accountDTO = new AccountDTO();
       accountDTO.setAmount(order.getTotalAmount());
       accountDTO.setUserId(order.getUserId());
       accountService.payment(accountDTO);
       //進入扣減庫存操作 遠端的rpc調(diào)用
       InventoryDTO inventoryDTO = new InventoryDTO();
       inventoryDTO.setCount(order.getCount());
       inventoryDTO.setProductId(order.getProductId());
       inventoryService.decrease(inventoryDTO);
   }
   //更新訂單的confirm方法
   public void confirmOrderStatus(Order order) {
        order.setStatus(OrderStatusEnum.PAY_SUCCESS.getCode());
        orderMapper.update(order);
        LOGGER.info("=========進行訂單confirm操作完成================");


    }
    //更新訂單的cancel方法
    public void cancelOrderStatus(Order order) {
        order.setStatus(OrderStatusEnum.PAY_FAIL.getCode());
        orderMapper.update(order);
        LOGGER.info("=========進行訂單cancel操作完成================");
    }

  • makePayment 方法是tcc分布式事務(wù)的發(fā)起者派诬,它里面有更新訂單狀態(tài)(本地),進行庫存扣減(rpc扣減)链沼,資金扣減(rpc調(diào)用)

  • confirmOrderStatus 為訂單狀態(tài)confirm方法默赂,cancelOrderStatus 為訂單狀態(tài)cancel方法。

  • 我們重點關(guān)注 @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus") 這個注解括勺,為什么加了這個注解就有神奇的作用缆八。

@Tcc注解切面

@Aspect
public abstract class TccTransactionAspect {

    private TccTransactionInterceptor tccTransactionInterceptor;

    public void setTccTransactionInterceptor(TccTransactionInterceptor tccTransactionInterceptor) {
        this.tccTransactionInterceptor = tccTransactionInterceptor;
    }

    @Pointcut("@annotation(com.happylifeplat.tcc.annotation.Tcc)")
    public void txTransactionInterceptor() {

    }

    @Around("txTransactionInterceptor()")
    public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        return tccTransactionInterceptor.interceptor(proceedingJoinPoint);
    }

    public abstract int getOrder();
}

  • 我們可以知道Spring實現(xiàn)類的方法凡是加了@Tcc注解的奈辰,在調(diào)用的時候,都會進行 tccTransactionInterceptor.interceptor 調(diào)用乱豆。

  • 其次我們發(fā)現(xiàn)該類是一個抽象類奖恰,肯定會有其他類繼承它,你猜的沒錯宛裕,對應(yīng)dubbo用戶瑟啃,他的繼承類為:

@Aspect
@Component
public class DubboTccTransactionAspect extends TccTransactionAspect implements Ordered {


    @Autowired
    public DubboTccTransactionAspect(DubboTccTransactionInterceptor dubboTccTransactionInterceptor) {
        super.setTccTransactionInterceptor(dubboTccTransactionInterceptor);
    }


    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

  • springcloud的用戶,它的繼承類為:
@Aspect
@Component
public class SpringCloudTxTransactionAspect extends TccTransactionAspect implements Ordered {


    @Autowired
    public SpringCloudTxTransactionAspect(SpringCloudTxTransactionInterceptor  springCloudTxTransactionInterceptor) {
        this.setTccTransactionInterceptor(springCloudTxTransactionInterceptor);
    }


    public void init() {

    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

  • 我們注意到他們都實現(xiàn)了Spring的Ordered接口,并重寫了 getOrder 方法揩尸,都返回了 Ordered.HIGHEST_PRECEDENCE 那么可以知道蛹屿,他是優(yōu)先級最高的切面。

TccCoordinatorMethodAspect 切面

@Aspect
@Component
public class TccCoordinatorMethodAspect implements Ordered {


    private final TccCoordinatorMethodInterceptor tccCoordinatorMethodInterceptor;

    @Autowired
    public TccCoordinatorMethodAspect(TccCoordinatorMethodInterceptor tccCoordinatorMethodInterceptor) {
        this.tccCoordinatorMethodInterceptor = tccCoordinatorMethodInterceptor;
    }


    @Pointcut("@annotation(com.happylifeplat.tcc.annotation.Tcc)")
    public void coordinatorMethod() {

    }

    @Around("coordinatorMethod()")
    public Object interceptCompensableMethod(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        return tccCoordinatorMethodInterceptor.interceptor(proceedingJoinPoint);
    }


    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 1;
    }
}

  • 該切面是第二優(yōu)先級的切面岩榆,意思就是當我們對有@Tcc注解的方法發(fā)起調(diào)用的時候错负,首先會進入 TccTransactionAspect 切面,然后再進入 TccCoordinatorMethodAspect 切面朗恳。
    該切面主要是用來獲取@Tcc注解上的元數(shù)據(jù)湿颅,比如confrim方法名稱等等。

  • 到這里如果大家基本能明白的話粥诫,差不多就了解了整個框架原理油航,現(xiàn)在讓我們來跟蹤調(diào)用吧。

現(xiàn)在我們回過頭怀浆,我們來調(diào)用 PaymentServiceImpl.makePayment 方法

  • dubbo用戶谊囚,我們會進入如下攔截器:

@Component
public class DubboTccTransactionInterceptor implements TccTransactionInterceptor {

    private final TccTransactionAspectService tccTransactionAspectService;

    @Autowired
    public DubboTccTransactionInterceptor(TccTransactionAspectService tccTransactionAspectService) {
        this.tccTransactionAspectService = tccTransactionAspectService;
    }


    @Override
    public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
        final String context = RpcContext.getContext().getAttachment(Constant.TCC_TRANSACTION_CONTEXT);
        TccTransactionContext tccTransactionContext = null;
        if (StringUtils.isNoneBlank(context)) {
            tccTransactionContext =
                    GsonUtils.getInstance().fromJson(context, TccTransactionContext.class);
        }
        return tccTransactionAspectService.invoke(tccTransactionContext, pjp);
    }
}

  • 我們繼續(xù)跟蹤 tccTransactionAspectService.invoke(tccTransactionContext, pjp) ,發(fā)現(xiàn)通過判斷過后,我們會進入 StartTccTransactionHandler.handler 方法,我們已經(jīng)進入了分布式事務(wù)處理的入口了执赡!
/**
    * 分布式事務(wù)處理接口
    *
    * @param point   point 切點
    * @param context 信息
    * @return Object
    * @throws Throwable 異常
    */
   @Override
   public Object handler(ProceedingJoinPoint point, TccTransactionContext context) throws Throwable {
       Object returnValue;
       try {
          //開啟分布式事務(wù)
           tccTransactionManager.begin();
           try {
               //發(fā)起調(diào)用 執(zhí)行try方法
               returnValue = point.proceed();

           } catch (Throwable throwable) {
               //異常執(zhí)行cancel

               tccTransactionManager.cancel();

               throw throwable;
           }
           //try成功執(zhí)行confirm confirm 失敗的話镰踏,那就只能走本地補償
           tccTransactionManager.confirm();
       } finally {
           tccTransactionManager.remove();
       }
       return returnValue;
   }

  • 我們進行跟進 tccTransactionManager.begin() 方法:
/**
    * 該方法為發(fā)起方第一次調(diào)用
    * 也是tcc事務(wù)的入口
    */
   void begin() {
       LogUtil.debug(LOGGER, () -> "開始執(zhí)行tcc事務(wù)!start");
       TccTransaction tccTransaction = CURRENT.get();
       if (Objects.isNull(tccTransaction)) {
           tccTransaction = new TccTransaction();
           tccTransaction.setStatus(TccActionEnum.TRYING.getCode());
           tccTransaction.setRole(TccRoleEnum.START.getCode());
       }
       //保存當前事務(wù)信息
       coordinatorCommand.execute(new CoordinatorAction(CoordinatorActionEnum.SAVE, tccTransaction));

       CURRENT.set(tccTransaction);

       //設(shè)置tcc事務(wù)上下文沙合,這個類會傳遞給遠端
       TccTransactionContext context = new TccTransactionContext();
       context.setAction(TccActionEnum.TRYING.getCode());//設(shè)置執(zhí)行動作為try
       context.setTransId(tccTransaction.getTransId());//設(shè)置事務(wù)id
       TransactionContextLocal.getInstance().set(context);

   }
  • 這里我們保存了事務(wù)信息奠伪,并且開啟了事務(wù)上下文,并把它保存在了ThreadLoacl里面,大家想想這里為什么一定要保存在ThreadLocal里面绊率。

  • begin方法執(zhí)行完后谨敛,我們回到切面,現(xiàn)在我們來執(zhí)行 point.proceed()滤否,當執(zhí)行這一句代碼的時候脸狸,會進入第二個切面,即進入了 TccCoordinatorMethodInterceptor


public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {

       final TccTransaction currentTransaction = tccTransactionManager.getCurrentTransaction();

       if (Objects.nonNull(currentTransaction)) {
           final TccActionEnum action = TccActionEnum.acquireByCode(currentTransaction.getStatus());
           switch (action) {
               case TRYING:
                   registerParticipant(pjp, currentTransaction.getTransId());
                   break;
               case CONFIRMING:
                   break;
               case CANCELING:
                   break;
           }
       }
       return pjp.proceed(pjp.getArgs());
   }

  • 這里由于是在try階段藐俺,直接就進入了 registerParticipant 方法
private void registerParticipant(ProceedingJoinPoint point, String transId) throws NoSuchMethodException {

        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();

        Class<?> clazz = point.getTarget().getClass();

        Object[] args = point.getArgs();

        final Tcc tcc = method.getAnnotation(Tcc.class);

        //獲取協(xié)調(diào)方法
        String confirmMethodName = tcc.confirmMethod();

       /* if (StringUtils.isBlank(confirmMethodName)) {
            confirmMethodName = method.getName();
        }*/

        String cancelMethodName = tcc.cancelMethod();

       /* if (StringUtils.isBlank(cancelMethodName)) {
            cancelMethodName = method.getName();
        }
*/
        //設(shè)置模式
        final TccPatternEnum pattern = tcc.pattern();

        tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());


        TccInvocation confirmInvocation = null;
        if (StringUtils.isNoneBlank(confirmMethodName)) {
            confirmInvocation = new TccInvocation(clazz,
                    confirmMethodName, method.getParameterTypes(), args);
        }

        TccInvocation cancelInvocation = null;
        if (StringUtils.isNoneBlank(cancelMethodName)) {
            cancelInvocation = new TccInvocation(clazz,
                    cancelMethodName,
                    method.getParameterTypes(), args);
        }


        //封裝調(diào)用點
        final Participant participant = new Participant(
                transId,
                confirmInvocation,
                cancelInvocation);

        tccTransactionManager.enlistParticipant(participant);

    }
  • 這里就獲取了 @Tcc(confirmMethod = "confirmOrderStatus", cancelMethod = "cancelOrderStatus")
    的信息炊甲,并把他封裝成了 Participant,存起來欲芹,然后真正的調(diào)用 PaymentServiceImpl.makePayment 業(yè)務(wù)方法卿啡,在業(yè)務(wù)方法里面,我們首先執(zhí)行的是更新訂單狀態(tài)(相信現(xiàn)在你還記得0.0):
       order.setStatus(OrderStatusEnum.PAYING.getCode());
       orderMapper.update(order);
  • 接下來耀石,我們進行扣除用戶余額,注意扣除余額這里是一個rpc方法:
       AccountDTO accountDTO = new AccountDTO();
       accountDTO.setAmount(order.getTotalAmount());
       accountDTO.setUserId(order.getUserId());
       accountService.payment(accountDTO)
  • 現(xiàn)在我們來關(guān)注 accountService.payment(accountDTO) 牵囤,這個接口的定義爸黄。

dubbo接口:


public interface AccountService {


    /**
     * 扣款支付
     *
     * @param accountDTO 參數(shù)dto
     * @return true
     */
    @Tcc
    boolean payment(AccountDTO accountDTO);
}

springcloud接口

@FeignClient(value = "account-service", configuration = MyConfiguration.class)
public interface AccountClient {

    @PostMapping("/account-service/account/payment")
    @Tcc
    Boolean payment(@RequestBody AccountDTO accountDO);

}

很明顯這里我們都在接口上加了@Tcc注解滞伟,我們知道springAop的特性,在接口上加注解炕贵,是無法進入切面的梆奈,所以我們在這里,要采用rpc框架的某些特性來幫助我們獲取到 @Tcc注解信息称开。 這一步很重要亩钟。當我們發(fā)起

accountService.payment(accountDTO) 調(diào)用的時候:

  • dubbo用戶,會走dubbo的filter接口,TccTransactionFilter:
private void registerParticipant(Class clazz, String methodName, Object[] arguments, Class... args) throws TccRuntimeException {
      try {
          Method method = clazz.getDeclaredMethod(methodName, args);
          Tcc tcc = method.getAnnotation(Tcc.class);
          if (Objects.nonNull(tcc)) {

            //獲取事務(wù)的上下文
              final TccTransactionContext tccTransactionContext =
                      TransactionContextLocal.getInstance().get();
              if (Objects.nonNull(tccTransactionContext)) {
                //dubbo rpc傳參數(shù)
                  RpcContext.getContext()
                          .setAttachment(Constant.TCC_TRANSACTION_CONTEXT,
                                  GsonUtils.getInstance().toJson(tccTransactionContext));
              }
              if (Objects.nonNull(tccTransactionContext)) {
                  if(TccActionEnum.TRYING.getCode()==tccTransactionContext.getAction()){
                      //獲取協(xié)調(diào)方法
                      String confirmMethodName = tcc.confirmMethod();

                      if (StringUtils.isBlank(confirmMethodName)) {
                          confirmMethodName = method.getName();
                      }

                      String cancelMethodName = tcc.cancelMethod();

                      if (StringUtils.isBlank(cancelMethodName)) {
                          cancelMethodName = method.getName();
                      }
                    //設(shè)置模式
                      final TccPatternEnum pattern = tcc.pattern();

                      tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());
                      TccInvocation confirmInvocation = new TccInvocation(clazz,
                              confirmMethodName,
                              args, arguments);

                      TccInvocation cancelInvocation = new TccInvocation(clazz,
                              cancelMethodName,
                              args, arguments);

                      //封裝調(diào)用點
                      final Participant participant = new Participant(
                              tccTransactionContext.getTransId(),
                              confirmInvocation,
                              cancelInvocation);

                      tccTransactionManager.enlistParticipant(participant);
                  }

              }



          }
      } catch (NoSuchMethodException e) {
          throw new TccRuntimeException("not fount method " + e.getMessage());
      }

  }
  • springcloud 用戶鳖轰,則會進入 TccFeignHandler
public class TccFeignHandler implements InvocationHandler {
    /**
     * logger
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(TccFeignHandler.class);

    private Target<?> target;
    private Map<Method, MethodHandler> handlers;

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (Object.class.equals(method.getDeclaringClass())) {
            return method.invoke(this, args);
        } else {

            final Tcc tcc = method.getAnnotation(Tcc.class);
            if (Objects.isNull(tcc)) {
                return this.handlers.get(method).invoke(args);
            }

            final TccTransactionContext tccTransactionContext =
                    TransactionContextLocal.getInstance().get();
            if (Objects.nonNull(tccTransactionContext)) {
                final TccTransactionManager tccTransactionManager =
                        SpringBeanUtils.getInstance().getBean(TccTransactionManager.class);
                if (TccActionEnum.TRYING.getCode() == tccTransactionContext.getAction()) {
                    //獲取協(xié)調(diào)方法
                    String confirmMethodName = tcc.confirmMethod();

                    if (StringUtils.isBlank(confirmMethodName)) {
                        confirmMethodName = method.getName();
                    }

                    String cancelMethodName = tcc.cancelMethod();

                    if (StringUtils.isBlank(cancelMethodName)) {
                        cancelMethodName = method.getName();
                    }

                    //設(shè)置模式
                    final TccPatternEnum pattern = tcc.pattern();

                    tccTransactionManager.getCurrentTransaction().setPattern(pattern.getCode());

                    final Class<?> declaringClass = method.getDeclaringClass();

                    TccInvocation confirmInvocation = new TccInvocation(declaringClass,
                            confirmMethodName,
                            method.getParameterTypes(), args);

                    TccInvocation cancelInvocation = new TccInvocation(declaringClass,
                            cancelMethodName,
                            method.getParameterTypes(), args);

                    //封裝調(diào)用點
                    final Participant participant = new Participant(
                            tccTransactionContext.getTransId(),
                            confirmInvocation,
                            cancelInvocation);

                    tccTransactionManager.enlistParticipant(participant);
                }

            }


            return this.handlers.get(method).invoke(args);
        }
    }


    public void setTarget(Target<?> target) {
        this.target = target;
    }


    public void setHandlers(Map<Method, MethodHandler> handlers) {
        this.handlers = handlers;
    }

}

重要提示 在這里我們獲取了在第一步設(shè)置的事務(wù)上下文清酥,然后進行dubbo的rpc傳參數(shù),細心的你可能會發(fā)現(xiàn)蕴侣,這里獲取遠端confirm焰轻,cancel方法其實就是自己本身啊昆雀?辱志,那發(fā)起者還怎么來調(diào)用遠端的confrim,cancel方法呢狞膘?這里的調(diào)用揩懒,我們通過事務(wù)上下文的狀態(tài)來控制,比如在confrim階段挽封,我們就調(diào)用confrim方法等已球。。

到這里我們就完成了整個消費端的調(diào)用,下一篇分析提供者的調(diào)用流程智亮!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末退疫,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子鸽素,更是在濱河造成了極大的恐慌褒繁,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件馍忽,死亡現(xiàn)場離奇詭異棒坏,居然都是意外死亡,警方通過查閱死者的電腦和手機遭笋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門坝冕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人瓦呼,你說我怎么就攤上這事喂窟。” “怎么了央串?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵磨澡,是天一觀的道長。 經(jīng)常有香客問我质和,道長稳摄,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任饲宿,我火速辦了婚禮厦酬,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘瘫想。我一直安慰自己仗阅,他們只是感情好,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布国夜。 她就那樣靜靜地躺著减噪,像睡著了一般。 火紅的嫁衣襯著肌膚如雪支竹。 梳的紋絲不亂的頭發(fā)上旋廷,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天,我揣著相機與錄音礼搁,去河邊找鬼饶碘。 笑死,一個胖子當著我的面吹牛馒吴,可吹牛的內(nèi)容都是我干的扎运。 我是一名探鬼主播瑟曲,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼豪治!你這毒婦竟也來了洞拨?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤负拟,失蹤者是張志新(化名)和其女友劉穎烦衣,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體掩浙,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡花吟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了厨姚。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片衅澈。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖谬墙,靈堂內(nèi)的尸體忽然破棺而出今布,到底是詐尸還是另有隱情,我是刑警寧澤拭抬,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布部默,位于F島的核電站,受9級特大地震影響玖喘,放射性物質(zhì)發(fā)生泄漏甩牺。R本人自食惡果不足惜蘑志,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一累奈、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧急但,春花似錦澎媒、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至镐躲,卻和暖如春储玫,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背萤皂。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工撒穷, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人裆熙。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓端礼,卻偏偏與公主長得像禽笑,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蛤奥,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容