總概
A筐眷、技術棧
- 開發(fā)語言:Java 1.8
- 數(shù)據(jù)庫:MySQL、Redis习柠、MongoDB匀谣、Elasticsearch
- 微服務框架:Spring Cloud Alibaba
- 微服務網(wǎng)關:Spring Cloud Gateway
- 服務注冊和配置中心:Nacos
- 分布式事務:Seata
- 鏈路追蹤框架:Sleuth
- 服務降級與熔斷:Sentinel
- ORM框架:MyBatis-Plus
- 分布式任務調(diào)度平臺:XXL-JOB
- 消息中間件:RocketMQ
- 分布式鎖:Redisson
- 權限:OAuth2
- DevOps:Jenkins、Docker资溃、K8S
B武翎、源碼地址
C、本節(jié)實現(xiàn)目標
- [mall-order]下單溶锭,用RocketMQ消息中間件發(fā)送消息宝恶,[mall-member]監(jiān)聽消費給用戶加積分
D、系列
- 微服務開發(fā)系列 第一篇:項目搭建
- 微服務開發(fā)系列 第二篇:Nacos
- 微服務開發(fā)系列 第三篇:OpenFeign
- 微服務開發(fā)系列 第四篇:分頁查詢
- 微服務開發(fā)系列 第五篇:Redis
- 微服務開發(fā)系列 第六篇:Redisson
- 微服務開發(fā)系列 第七篇:RocketMQ
- 微服務開發(fā)系列 第八篇:Elasticsearch
- 微服務開發(fā)系列 第九篇:OAuth2
- 微服務開發(fā)系列 第十篇:Gateway
- 微服務開發(fā)系列 第十一篇:XXL-JOB
- 微服務開發(fā)系列 第十二篇:MongoDB
- 微服務開發(fā)系列 第n篇:AOP請求日志監(jiān)控
- 微服務開發(fā)系列 第n篇:自定義校驗注解
一趴捅、RocketMQ安裝
供參考:
二垫毙、功能描述
用戶下單(mall-order服務)后,發(fā)送下單事件MQ, mall-member服務監(jiān)聽消費MQ拱绑,為用戶增加積分综芥,MQ此處的作用是解耦。
三猎拨、代碼實現(xiàn)
3.1 maven加RocketMQ依賴包
在項目[mall-pom]的pom.xml里加入RocketMQ依賴包
<rocketmq.version>2.2.3</rocketmq.version>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
3.2 common.yml配置RocketMQ參數(shù)
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: group-${spring.profiles.active}
send-message-timeout: 3000 # 消息發(fā)送超時時長膀藐,默認3s
retry-times-when-send-failed: 3 # 同步發(fā)送消息失敗重試次數(shù)屠阻,默認2
retry-times-when-send-async-failed: 3 # 異步發(fā)送消息失敗重試次數(shù),默認2
common.yml完整配置
spring:
redis:
database: 0
host: 127.0.0.1
port: 6379a
password: 123abc
jedis:
pool:
max-active: 500 #連接池的最大數(shù)據(jù)庫連接數(shù)额各。設為0表示無限制
max-idle: 20 #最大空閑數(shù)
max-wait: -1
min-idle: 5
timeout: 1000
redisson:
password: 123abc
cluster:
nodeAddresses: ["redis://127.0.0.1:6379"]
single:
address: "redis://127.0.0.1:6379"
database: 0
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.100.51:3306/ac_db?serverTimezone=Asia/Shanghai&useUnicode=true&tinyInt1isBit=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
username: ac_u
password: ac_PWD_123
#hikari數(shù)據(jù)庫連接池
hikari:
pool-name: YH_HikariCP
minimum-idle: 10 #最小空閑連接數(shù)量
idle-timeout: 600000 #空閑連接存活最大時間国觉,默認600000(10分鐘)
maximum-pool-size: 100 #連接池最大連接數(shù),默認是10
auto-commit: true #此屬性控制從池返回的連接的默認自動提交行為,默認值:true
max-lifetime: 1800000 #此屬性控制池中連接的最長生命周期臊泰,值0表示無限生命周期蛉加,默認1800000即30分鐘
connection-timeout: 30000 #數(shù)據(jù)庫連接超時時間,默認30秒,即30000
connection-test-query: SELECT 1
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: group-${spring.profiles.active}
send-message-timeout: 3000 # 消息發(fā)送超時時長缸逃,默認3s
retry-times-when-send-failed: 3 # 同步發(fā)送消息失敗重試次數(shù),默認2
retry-times-when-send-async-failed: 3 # 異步發(fā)送消息失敗重試次數(shù)厂抽,默認2
mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
3.3 [mall-order]生產(chǎn)者
生產(chǎn)者OrderSender
package com.ac.order.mq.send;
import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.msg.MqOrderMsg;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Lazy
@Component
public class OrderSender {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void asyncSend(MqOrderMsg mqMsg) {
String payload = JSONObject.toJSONString(mqMsg);
//Topic+Tag更精準接收消息
String destination = MqTopicConstant.TOPIC_ORDER + ":" + mqMsg.getAction().getCode();
rocketMQTemplate.asyncSend(destination, payload, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info(OrderSender.class.getSimpleName() + ",消息發(fā)送成功, result: {}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error(OrderSender.class.getSimpleName() + ",消息發(fā)送失敗");
e.printStackTrace();
}
});
}
}
下單發(fā)送MQ
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Resource
private OrderDao orderDaoImpl;
@Resource
private MemberFeignApi memberFeignApi;
@Resource
private OrderItemService orderItemServiceImpl;
@Resource
private OrderSender orderSender;
@Override
public OrderDetailDTO findOrderDetail(Long id) {
return null;
}
@Override
public IPage<OrderDTO> pageOrder(OrderPageQry qry) {
return orderDaoImpl.pageOrder(qry);
}
@Transactional(rollbackFor = Exception.class)
@Override
public Long createOrder(OrderAddVO addVO) {
Order order = new Order();
order.setOrderNo(RandomUtil.randomNumbers(8));
//省略支付流程
order.setOrderState(OrderStateEnum.PAYED);
order.setOrderTime(LocalDateTime.now());
//通過feign取用戶信息
MemberDTO member = memberFeignApi.findMember(addVO.getMemberId());
order.setMemberId(addVO.getMemberId());
order.setMemberName(member.getMemberName());
order.setMobile(member.getMobile());
orderDaoImpl.save(order);
BigDecimal discountAmount = new BigDecimal(0.00);
BigDecimal productAmount = new BigDecimal(0.00);
//存訂單項信息
for (OrderItemAddVO orderItemAdd : addVO.getOrderItemList()) {
OrderItem orderItem = orderItemServiceImpl.addOrderItem(order.getId(), orderItemAdd);
productAmount = productAmount.add(orderItem.getBuyPrice().multiply(new BigDecimal(orderItem.getBuyNum())));
}
//更新訂單金額信息
order.setDiscountAmount(discountAmount);
order.setProductAmount(productAmount);
BigDecimal payAmount = productAmount.subtract(discountAmount);
order.setPayAmount(payAmount);
orderDaoImpl.updateById(order);
//發(fā)送下單MQ
MqOrderMsg mqMsg = MqOrderMsg.builder()
.action(MqOrderAction.PAID)
.orderId(order.getId())
.memberId(order.getMemberId())
.payAmount(order.getPayAmount())
.build();
orderSender.asyncSend(mqMsg);
return order.getId();
}
}
3.4 [mall-member]消費者
MemberOrderListener消費者
package com.ac.member.mq.listener;
import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.MqConsumerConstant;
import com.ac.common.qm.msg.MqOrderAction;
import com.ac.common.qm.msg.MqOrderMsg;
import com.ac.member.component.MemberIntegralComponent;
import com.ac.member.enums.IntegralSourceTypeEnum;
import com.ac.member.vo.IntegralLogEditVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Slf4j
@Service
@RocketMQMessageListener(
consumerGroup = MqConsumerConstant.CONSUMER_MEMBER_ORDER,
topic = MqTopicConstant.TOPIC_ORDER,
selectorExpression = "PAID||REFUND",
messageModel = MessageModel.CLUSTERING)
public class MemberOrderListener implements RocketMQListener<MessageExt> {
@Resource
private MemberIntegralComponent memberIntegralComponent;
@Override
public void onMessage(MessageExt message) {
MqOrderMsg mqMsg = JSONObject.parseObject(message.getBody(), MqOrderMsg.class);
log.info(MemberOrderListener.class.getSimpleName() + ",msgId={},msg={}", message.getMsgId(), mqMsg);
try {
//Topic+Tag更精準接收消息
MqOrderAction action = mqMsg.getAction();
if (MqOrderAction.PAID == action) {
dealPaid(mqMsg);
} else if (MqOrderAction.REFUND == action) {
dealRefund(mqMsg);
}
} catch (Exception e) {
log.error(MemberOrderListener.class.getSimpleName() + ",消費失敗,mqMsg={},e={}", mqMsg, e.getMessage());
}
}
/**
* 處理訂單付款事件
*
* @param mqMsg
*/
private void dealPaid(MqOrderMsg mqMsg) {
IntegralLogEditVO integralVO = new IntegralLogEditVO();
integralVO.setMemberId(mqMsg.getMemberId());
integralVO.setSourceType(IntegralSourceTypeEnum.AWARD_ORDER);
integralVO.setSourceRemark("下單獲得積分");
integralVO.setIntegral(mqMsg.getPayAmount().longValue());
memberIntegralComponent.recordIntegral(integralVO);
}
private void dealRefund(MqOrderMsg mqMsg) {
log.info("處理退單事件");
}
}
四需频、測試
4.1 下單
下單
4.2 控制臺日志
[mall-order]控制臺MQ發(fā)送日志:
2023-04-04 15:58:37.052 INFO 25204 --- [ublicExecutor_1] com.ac.order.mq.send.OrderSender : OrderSender,消息發(fā)送成功, result: SendResult [sendStatus=SEND_OK, msgId=7F000001627418B4AAC212E0B7F30000, offsetMsgId=AC100B8D00002A9F000000000003B369, messageQueue=MessageQueue [topic=TOPIC_ORDER, brokerName=LAPTOP-R0R80SCR, queueId=3], queueOffset=0]
[mall-member]控制臺MQ接收日志:
2023-04-04 15:58:37.243 INFO 26788 --- [_MEMBER_ORDER_1] c.a.m.mq.listener.MemberOrderListener : MemberOrderListener,msgId=7F000001627418B4AAC212E0B7F30000,msg=MqOrderMsg(action=PAID, orderId=281635594240001, memberId=264260572479489, payAmount=40.50)
4.3 數(shù)據(jù)庫記錄
t_order
t_member_integral
t_member_integral_log
4.4 RocketMQ Dashboard
Dashboard列表
Dashboard消息內(nèi)容