SpringBoot整合RocketMQ

RocketMQ基于Spring編程模型的消息收發(fā)

添加rocketmq-spring-boot-starter等相關(guān)依賴

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.8.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>tk.mybatis</groupId>
        <artifactId>mapper-spring-boot-starter</artifactId>
        <version>2.1.5</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.3</version>
    </dependency>

    ......
</dependencies>

RocketMQ Producer

添加配置

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    #必須指定group
    group: test-group

編寫producer代碼

@Service
@Slf4j
public class ShareService {

    @Autowired
    private ShareMapper shareMapper;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public ShareDTO auditById(Integer id, ShareAuditDTO auditDTO){
        //1苍匆、查詢Share是否存在,不存在或者當(dāng)前的audit_status != NOT_YET就拋異常
        Share share = shareMapper.selectByPrimaryKey(id);

        //3捷枯、如果是PASS,那么就發(fā)送消息到rocketmq蛾洛,讓用戶中心去消費(fèi),并為發(fā)布人添加積分
        this.rocketMQTemplate.convertAndSend("add-bonus",
                UserAddBonusMsgDTO.builder()
                .userId(share.getUserId())
                        .bonus(50).build());

        return shareDTO;
    }
}

RocketMQ Consumer

添加配置

rocketmq:
  name-server: 127.0.0.1:9876

編寫consumer代碼

@Component
@RocketMQMessageListener(topic = "add-bonus",consumerGroup = "consumer-group")
@Slf4j
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {

    @Autowired
    private UserService userService;


    @Override
    public void onMessage(UserAddBonusMsgDTO message) {
        //收到消息后執(zhí)行的業(yè)務(wù)饺蔑。
        userService.addBonus(message);
    }
}

@Service
@Slf4j
public class UserService {

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private BonusEventLogMapper bonusEventLogMapper;

    @Transactional(rollbackFor = Exception.class)
    public void addBonus(UserAddBonusMsgDTO userAddBonusMsgDTO) {
        //當(dāng)收到消息的時(shí)候執(zhí)行的業(yè)務(wù)
        //1变泄、為用戶加積分
        Integer userId = userAddBonusMsgDTO.getUserId();
        Integer bonus = userAddBonusMsgDTO.getBonus();
        User user = userMapper.selectByPrimaryKey(userId);
        user.setBonus(user.getBonus() + bonus);
        userMapper.updateByPrimaryKeySelective(user);
        //2、記錄日志到bonus_event_log表里面
        BonusEventLog bonusEventLog = BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE")
                .createTime(new Date()).description("投稿加積分").build();
        bonusEventLogMapper.insertSelective(bonusEventLog);
        log.info("積分添加完畢心傀。屈暗。。脂男。");
    }
}

RocketMQ基于Spring編程模型的事務(wù)消息收發(fā)

編寫事物消息Producer代碼

@Service
@Slf4j
public class ShareService {

    @Autowired
    private ShareMapper shareMapper;

    @Autowired
    private UserCenterFeignClient userCenterFeignClient;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    public ShareDTO auditById(Integer id, ShareAuditDTO auditDTO){
        //1养叛、查詢Share是否存在,不存在或者當(dāng)前的audit_status != NOT_YET就拋異常
        Share share = shareMapper.selectByPrimaryKey(id);
        if(share == null){
            throw new IllegalArgumentException("參數(shù)非法宰翅!該分享不存在");
        }
        if(!Objects.equals("NOT_YET",share.getAuditStatus())){
            throw new IllegalArgumentException("參數(shù)非法弃甥!該分享已審核通過或?qū)徍瞬煌ㄟ^");
        }

        //3、如果是PASS汁讼,那么就發(fā)送消息到rocketmq淆攻,讓用戶中心去消費(fèi),并為發(fā)布人添加積分
        //該接口主要為審核嘿架,所以加積分使用異步操作瓶珊,這樣可以有效縮短該接口的響應(yīng)耗時(shí),從而提升用戶體驗(yàn)
        if(AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())){
            String transactionId = UUID.randomUUID().toString();
            //發(fā)送半消息
            rocketMQTemplate.sendMessageInTransaction(
                    "tx-add-bonus-group",
                    "add-bonus",
                    MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build())
                            //header也有大用處
                            .setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId)
                            .setHeader("share_id",id)
                            .build(),
                    auditDTO);//最后一個(gè)參數(shù)為args
        }else if(AuditStatusEnum.REJECT.equals(auditDTO.getAuditStatusEnum())){
            this.auditBYIdInDB(id,auditDTO);
        }

        ShareDTO shareDTO = new ShareDTO();
        BeanUtils.copyProperties(share,shareDTO);
        return shareDTO;
    }

    @Transactional(rollbackFor = Exception.class)
    public void auditBYIdInDB(Integer id,ShareAuditDTO auditDTO) {
        //2伞芹、審核資源,將狀態(tài)設(shè)為PASS/REJECT
        Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString())
                .reason(auditDTO.getReason()).build();
        shareMapper.updateByPrimaryKeySelective(share);
    }

    @Transactional(rollbackFor = Exception.class)
    public void auditBYIdWithRocketMqLog(Integer id,ShareAuditDTO auditDTO,String transactionId) {
        this.auditBYIdInDB(id,auditDTO);
        RocketmqTransactionLog rocketmqTransactionLog = RocketmqTransactionLog.builder().transactionId(transactionId)
                .log("審核分享...").build();
        rocketmqTransactionLogMapper.insertSelective(rocketmqTransactionLog);
    }
}

創(chuàng)建事物消息Producer事務(wù)監(jiān)聽器

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {

    @Autowired
    private ShareService shareService;

    @Autowired
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * 發(fā)送prepare消息成功此方法被回調(diào)唱较,該方法用于執(zhí)行本地事務(wù)
     *
     * @param message 回傳的消息倦春,利用transactionId即可獲取到該消息的唯一Id
     * @param auditDTO 調(diào)用send方法時(shí)傳遞的參數(shù)线婚,當(dāng)send時(shí)候若有額外的參數(shù)可以傳遞到send方法中辕狰,這里能獲取到
     * @return 返回事務(wù)狀態(tài)勉抓,COMMIT:提交  ROLLBACK:回滾  UNKNOW:回調(diào)
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object auditDTO) {
        MessageHeaders headers = message.getHeaders();
        String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.parseInt((String)headers.get("share_id"));
        try {
            shareService.auditBYIdWithRocketMqLog(shareId,(ShareAuditDTO)auditDTO,transactionId);
            //本地事物成功侣集,執(zhí)行commit
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("本地事物執(zhí)行異常简僧,e={}",e);
            //本地事物失敗获雕,執(zhí)行rollback
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 實(shí)現(xiàn)本地事務(wù)回查的邏輯,并返回本地事務(wù)執(zhí)行狀態(tài)
     * @param message 通過獲取transactionId來判斷這條消息的本地事務(wù)執(zhí)行狀態(tài)
     * @return 返回事務(wù)狀態(tài),COMMIT:提交  ROLLBACK:回滾  UNKNOW:回調(diào)
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);

        RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog
                .builder().transactionId(transactionId).build());
        if(rocketmqTransactionLog == null){
            log.error("如果本地事物日志沒有記錄收捣,transactionId={}",transactionId);
            //本地事物失敗届案,執(zhí)行rollback
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        //如果本地事物日志有記錄,執(zhí)行commit
        return RocketMQLocalTransactionState.COMMIT;
    }
}

事務(wù)消息consumer代碼和非事務(wù)消息編碼一致

@Component
@RocketMQMessageListener(topic = "add-bonus",consumerGroup = "consumer-group")
@Slf4j
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {

    @Autowired
    private UserService userService;


    @Override
    public void onMessage(UserAddBonusMsgDTO message) {
        //收到消息后執(zhí)行的業(yè)務(wù)楣颠。
        userService.addBonus(message);
    }
}

@Service
@Slf4j
public class UserService {

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private BonusEventLogMapper bonusEventLogMapper;

    @Transactional(rollbackFor = Exception.class)
    public void addBonus(UserAddBonusMsgDTO userAddBonusMsgDTO) {
        //當(dāng)收到消息的時(shí)候執(zhí)行的業(yè)務(wù)
        //1、為用戶加積分
        Integer userId = userAddBonusMsgDTO.getUserId();
        Integer bonus = userAddBonusMsgDTO.getBonus();
        User user = userMapper.selectByPrimaryKey(userId);
        user.setBonus(user.getBonus() + bonus);
        userMapper.updateByPrimaryKeySelective(user);
        //2童漩、記錄日志到bonus_event_log表里面
        BonusEventLog bonusEventLog = BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE")
                .createTime(new Date()).description("投稿加積分").build();
        bonusEventLogMapper.insertSelective(bonusEventLog);
        log.info("積分添加完畢。差凹。侧馅。危尿。");
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末馁痴,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子罗晕,更是在濱河造成了極大的恐慌,老刑警劉巖船逮,帶你破解...
    沈念sama閱讀 219,589評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件粤铭,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡梆惯,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門凹髓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來怯屉,“玉大人蔚舀,你說我怎么就攤上這事锨络。” “怎么了羡儿?”我有些...
    開封第一講書人閱讀 165,933評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我缅叠,道長,這世上最難降的妖魔是什么肤粱? 我笑而不...
    開封第一講書人閱讀 58,976評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上领铐,老公的妹妹穿的比我還像新娘。我一直安慰自己绪撵,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,999評(píng)論 6 393
  • 文/花漫 我一把揭開白布幻碱。 她就那樣靜靜地躺著细溅,像睡著了一般褥傍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上恍风,一...
    開封第一講書人閱讀 51,775評(píng)論 1 307
  • 那天誓篱,我揣著相機(jī)與錄音,去河邊找鬼窜骄。 笑死,一個(gè)胖子當(dāng)著我的面吹牛邻遏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播准验,決...
    沈念sama閱讀 40,474評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼沟娱,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了济似?” 一聲冷哼從身側(cè)響起盏缤,我...
    開封第一講書人閱讀 39,359評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤蓖扑,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后律杠,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,854評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡灰嫉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,007評(píng)論 3 338
  • 正文 我和宋清朗相戀三年嗓奢,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片股耽。...
    茶點(diǎn)故事閱讀 40,146評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖炎滞,靈堂內(nèi)的尸體忽然破棺而出诬乞,到底是詐尸還是另有隱情厂榛,我是刑警寧澤丽惭,帶...
    沈念sama閱讀 35,826評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站柜砾,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏痰驱。R本人自食惡果不足惜瞳浦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,484評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望叫潦。 院中可真熱鬧,春花似錦、人聲如沸氢架。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽警检。三九已至,卻和暖如春扇雕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背洼裤。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評(píng)論 1 272
  • 我被黑心中介騙來泰國打工溪王, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人莹菱。 一個(gè)月前我還...
    沈念sama閱讀 48,420評(píng)論 3 373
  • 正文 我出身青樓道伟,卻偏偏與公主長得像迹缀,于是被迫代替她去往敵國和親蜜徽。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,107評(píng)論 2 356