RocketMQ基于Spring編程模型的消息收發
添加rocketmq-spring-boot-starter等相關依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<!-- ...... -->
</dependencies>
RocketMQ Producer
添加配置
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    # A producer group must be specified.
    group: test-group
編寫producer代碼
/**
 * Audits shares and publishes an add-bonus message to RocketMQ for the share's author.
 */
@Service
@Slf4j
public class ShareService {

    @Autowired
    private ShareMapper shareMapper;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Loads the share, publishes a 50-point bonus message for its author to the
     * {@code add-bonus} topic and returns a DTO copy of the share.
     *
     * @param id       primary key of the share to audit
     * @param auditDTO audit decision; not read by this simplified listing
     * @return a {@link ShareDTO} populated from the loaded share
     * @throws IllegalArgumentException if no share exists for {@code id}
     */
    public ShareDTO auditById(Integer id, ShareAuditDTO auditDTO) {
        // 1. Look up the share; a missing row is rejected here instead of failing later
        //    with a NullPointerException on share.getUserId().
        Share share = shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("參數非法!該分享不存在");
        }
        // NOTE(review): the original step comments also describe an audit_status != NOT_YET
        // check and say the message is only sent for PASS; neither is implemented in this
        // listing, so the message is still sent unconditionally - confirm the intent.

        // 3. Publish the message so the user center can consume it and credit the author.
        this.rocketMQTemplate.convertAndSend("add-bonus",
                UserAddBonusMsgDTO.builder()
                        .userId(share.getUserId())
                        .bonus(50)
                        .build());

        // The original returned an undeclared variable; build the DTO from the loaded share.
        ShareDTO shareDTO = new ShareDTO();
        BeanUtils.copyProperties(share, shareDTO);
        return shareDTO;
    }
}
RocketMQ Consumer
添加配置
rocketmq:
  name-server: 127.0.0.1:9876
編寫consumer代碼
/**
 * RocketMQ listener for the {@code add-bonus} topic (consumer group {@code consumer-group}).
 * Every received {@link UserAddBonusMsgDTO} is handed to {@link UserService}.
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "add-bonus")
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {

    @Autowired
    private UserService userService;

    /**
     * Runs the business logic for a received message by delegating to
     * {@link UserService#addBonus(UserAddBonusMsgDTO)}.
     *
     * @param message the deserialized message payload
     */
    @Override
    public void onMessage(UserAddBonusMsgDTO message) {
        this.userService.addBonus(message);
    }
}
/**
 * User-side bonus handling: credits points to a user and records the change.
 */
@Service
@Slf4j
public class UserService {

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private BonusEventLogMapper bonusEventLogMapper;

    /**
     * Adds the bonus carried by the message to the user's balance and inserts a
     * {@code CONTRIBUTE} row into the bonus event log, both inside one transaction.
     *
     * @param userAddBonusMsgDTO message carrying the target user id and the bonus amount
     * @throws IllegalArgumentException if the message lacks a user id or bonus, or if no
     *                                  user exists for the given id
     */
    @Transactional(rollbackFor = Exception.class)
    public void addBonus(UserAddBonusMsgDTO userAddBonusMsgDTO) {
        Integer userId = userAddBonusMsgDTO.getUserId();
        Integer bonus = userAddBonusMsgDTO.getBonus();
        if (userId == null || bonus == null) {
            throw new IllegalArgumentException(
                    "userId and bonus must not be null, userId=" + userId + ", bonus=" + bonus);
        }

        // 1. Credit the user. An unknown user is reported explicitly instead of failing
        //    with a NullPointerException on user.getBonus().
        User user = userMapper.selectByPrimaryKey(userId);
        if (user == null) {
            throw new IllegalArgumentException("user not found, userId=" + userId);
        }
        // A balance that was never set is treated as zero.
        Integer currentBonus = user.getBonus();
        user.setBonus((currentBonus == null ? 0 : currentBonus) + bonus);
        // NOTE(review): this is a read-modify-write; concurrent or redelivered messages for
        // the same user could double-count or overwrite each other - confirm whether
        // idempotence is handled elsewhere.
        userMapper.updateByPrimaryKeySelective(user);

        // 2. Record the change in the bonus_event_log table.
        BonusEventLog bonusEventLog = BonusEventLog.builder()
                .userId(userId)
                .value(bonus)
                .event("CONTRIBUTE")
                .createTime(new Date())
                .description("投稿加積分")
                .build();
        bonusEventLogMapper.insertSelective(bonusEventLog);
        log.info("積分添加完畢。。。。");
    }
}
RocketMQ基于Spring編程模型的事務消息收發
編寫事務消息Producer代碼
/**
 * Audits shares. A PASS decision is applied through a RocketMQ transactional message so the
 * local database update and the add-bonus message are tied together; a REJECT decision only
 * updates the database.
 */
@Service
@Slf4j
public class ShareService {

    @Autowired
    private ShareMapper shareMapper;

    @Autowired
    private UserCenterFeignClient userCenterFeignClient;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * Audits the share with the given id.
     *
     * @param id       primary key of the share to audit
     * @param auditDTO audit decision (status and reason)
     * @return a {@link ShareDTO} copied from the share as stored after the audit
     * @throws IllegalArgumentException if the share does not exist or its audit status is
     *                                  not {@code NOT_YET}
     */
    public ShareDTO auditById(Integer id, ShareAuditDTO auditDTO) {
        // 1. The share must exist and must still be waiting for an audit.
        Share share = shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("參數非法!該分享不存在");
        }
        if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
            throw new IllegalArgumentException("參數非法!該分享已審核通過或審核不通過");
        }

        // 3. On PASS, send a message to RocketMQ so the user center credits the author.
        //    Crediting is asynchronous because this endpoint is mainly about auditing;
        //    that keeps its response time short.
        if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
            String transactionId = UUID.randomUUID().toString();
            // Send the half message; the database update happens in the transaction listener.
            rocketMQTemplate.sendMessageInTransaction(
                    "tx-add-bonus-group",
                    "add-bonus",
                    MessageBuilder.withPayload(
                                    UserAddBonusMsgDTO.builder()
                                            .userId(share.getUserId())
                                            .bonus(50)
                                            .build())
                            // Headers carry what the transaction listener needs.
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                            // Sent as text because the listener reads this header as a String.
                            .setHeader("share_id", String.valueOf(id))
                            .build(),
                    // The last parameter is handed to the listener as its args object.
                    auditDTO);
        } else if (AuditStatusEnum.REJECT.equals(auditDTO.getAuditStatusEnum())) {
            // NOTE(review): invoked on "this", so Spring's transactional proxy is presumably
            // bypassed here; the method issues a single UPDATE - confirm this is acceptable.
            this.auditBYIdInDB(id, auditDTO);
        }

        // Re-read the row so the response shows the stored audit result rather than the
        // snapshot loaded before the audit (which always says NOT_YET).
        Share audited = shareMapper.selectByPrimaryKey(id);
        ShareDTO shareDTO = new ShareDTO();
        BeanUtils.copyProperties(audited != null ? audited : share, shareDTO);
        return shareDTO;
    }

    /**
     * 2. Applies the audit decision: stores the status and reason from {@code auditDTO}
     * on the share with the given id.
     *
     * @param id       primary key of the share
     * @param auditDTO audit decision (status and reason)
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditBYIdInDB(Integer id, ShareAuditDTO auditDTO) {
        Share share = Share.builder()
                .id(id)
                .auditStatus(auditDTO.getAuditStatusEnum().toString())
                .reason(auditDTO.getReason())
                .build();
        shareMapper.updateByPrimaryKeySelective(share);
    }

    /**
     * Applies the audit decision and inserts a transaction-log row keyed by
     * {@code transactionId}; the transaction listener's check-back looks that row up.
     *
     * @param id            primary key of the share
     * @param auditDTO      audit decision (status and reason)
     * @param transactionId id stored in the transaction log row
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditBYIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
        this.auditBYIdInDB(id, auditDTO);
        RocketmqTransactionLog rocketmqTransactionLog = RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .log("審核分享...")
                .build();
        rocketmqTransactionLogMapper.insertSelective(rocketmqTransactionLog);
    }
}
創建事務消息Producer事務監聽器
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private ShareService shareService;
@Autowired
private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
* 發(fā)送prepare消息成功此方法被回調(diào)唱较,該方法用于執(zhí)行本地事務(wù)
*
* @param message 回傳的消息倦春,利用transactionId即可獲取到該消息的唯一Id
* @param auditDTO 調(diào)用send方法時(shí)傳遞的參數(shù)线婚,當(dāng)send時(shí)候若有額外的參數(shù)可以傳遞到send方法中辕狰,這里能獲取到
* @return 返回事務(wù)狀態(tài)勉抓,COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調(diào)
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object auditDTO) {
MessageHeaders headers = message.getHeaders();
String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = Integer.parseInt((String)headers.get("share_id"));
try {
shareService.auditBYIdWithRocketMqLog(shareId,(ShareAuditDTO)auditDTO,transactionId);
//本地事物成功侣集,執(zhí)行commit
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事物執(zhí)行異常简僧,e={}",e);
//本地事物失敗获雕,執(zhí)行rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 實(shí)現(xiàn)本地事務(wù)回查的邏輯,并返回本地事務(wù)執(zhí)行狀態(tài)
* @param message 通過獲取transactionId來判斷這條消息的本地事務(wù)執(zhí)行狀態(tài)
* @return 返回事務(wù)狀態(tài),COMMIT:提交 ROLLBACK:回滾 UNKNOW:回調(diào)
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
String transactionId = (String)headers.get(RocketMQHeaders.TRANSACTION_ID);
RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog
.builder().transactionId(transactionId).build());
if(rocketmqTransactionLog == null){
log.error("如果本地事物日志沒有記錄收捣,transactionId={}",transactionId);
//本地事物失敗届案,執(zhí)行rollback
return RocketMQLocalTransactionState.ROLLBACK;
}
//如果本地事物日志有記錄,執(zhí)行commit
return RocketMQLocalTransactionState.COMMIT;
}
}
事務消息consumer代碼和非事務消息編碼一致
/**
 * RocketMQ listener for the {@code add-bonus} topic (consumer group {@code consumer-group}).
 * Every received {@link UserAddBonusMsgDTO} is handed to {@link UserService}.
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "add-bonus")
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {

    @Autowired
    private UserService userService;

    /**
     * Runs the business logic for a received message by delegating to
     * {@link UserService#addBonus(UserAddBonusMsgDTO)}.
     *
     * @param message the deserialized message payload
     */
    @Override
    public void onMessage(UserAddBonusMsgDTO message) {
        this.userService.addBonus(message);
    }
}
/**
 * User-side bonus handling: credits points to a user and records the change.
 */
@Service
@Slf4j
public class UserService {

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private BonusEventLogMapper bonusEventLogMapper;

    /**
     * Adds the bonus carried by the message to the user's balance and inserts a
     * {@code CONTRIBUTE} row into the bonus event log, both inside one transaction.
     *
     * @param userAddBonusMsgDTO message carrying the target user id and the bonus amount
     * @throws IllegalArgumentException if the message lacks a user id or bonus, or if no
     *                                  user exists for the given id
     */
    @Transactional(rollbackFor = Exception.class)
    public void addBonus(UserAddBonusMsgDTO userAddBonusMsgDTO) {
        Integer userId = userAddBonusMsgDTO.getUserId();
        Integer bonus = userAddBonusMsgDTO.getBonus();
        if (userId == null || bonus == null) {
            throw new IllegalArgumentException(
                    "userId and bonus must not be null, userId=" + userId + ", bonus=" + bonus);
        }

        // 1. Credit the user. An unknown user is reported explicitly instead of failing
        //    with a NullPointerException on user.getBonus().
        User user = userMapper.selectByPrimaryKey(userId);
        if (user == null) {
            throw new IllegalArgumentException("user not found, userId=" + userId);
        }
        // A balance that was never set is treated as zero.
        Integer currentBonus = user.getBonus();
        user.setBonus((currentBonus == null ? 0 : currentBonus) + bonus);
        // NOTE(review): this is a read-modify-write; concurrent or redelivered messages for
        // the same user could double-count or overwrite each other - confirm whether
        // idempotence is handled elsewhere.
        userMapper.updateByPrimaryKeySelective(user);

        // 2. Record the change in the bonus_event_log table.
        BonusEventLog bonusEventLog = BonusEventLog.builder()
                .userId(userId)
                .value(bonus)
                .event("CONTRIBUTE")
                .createTime(new Date())
                .description("投稿加積分")
                .build();
        bonusEventLogMapper.insertSelective(bonusEventLog);
        log.info("積分添加完畢。。。。");
    }
}