微服務開發(fā)系列 第七篇:RocketMQ

總概

A筐眷、技術棧
  • 開發(fā)語言:Java 1.8
  • 數(shù)據(jù)庫:MySQL、Redis习柠、MongoDB匀谣、Elasticsearch
  • 微服務框架:Spring Cloud Alibaba
  • 微服務網(wǎng)關:Spring Cloud Gateway
  • 服務注冊和配置中心:Nacos
  • 分布式事務:Seata
  • 鏈路追蹤框架:Sleuth
  • 服務降級與熔斷:Sentinel
  • ORM框架:MyBatis-Plus
  • 分布式任務調(diào)度平臺:XXL-JOB
  • 消息中間件:RocketMQ
  • 分布式鎖:Redisson
  • 權限:OAuth2
  • DevOps:Jenkins、Docker资溃、K8S
B武翎、源碼地址

alanchenyan/ac-mall2-cloud

C、本節(jié)實現(xiàn)目標
  • [mall-order]下單溶锭,用RocketMQ消息中間件發(fā)送消息宝恶,[mall-member]監(jiān)聽消費給用戶加積分
D、系列

一趴捅、RocketMQ安裝

供參考:

二垫毙、功能描述

用戶下單(mall-order服務)后,發(fā)送下單事件MQ, mall-member服務監(jiān)聽消費MQ拱绑,為用戶增加積分综芥,MQ此處的作用是解耦。

三猎拨、代碼實現(xiàn)

3.1 maven加RocketMQ依賴包

在項目[mall-pom]的pom.xml里加入RocketMQ依賴包

<rocketmq.version>2.2.3</rocketmq.version>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq.version}</version>
</dependency>
3.2 common.yml配置RocketMQ參數(shù)
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: group-${spring.profiles.active}
    send-message-timeout: 3000 # 消息發(fā)送超時時長膀藐,默認3s
    retry-times-when-send-failed: 3 # 同步發(fā)送消息失敗重試次數(shù)屠阻,默認2
    retry-times-when-send-async-failed: 3 # 異步發(fā)送消息失敗重試次數(shù),默認2

common.yml完整配置

spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379a
    password: 123abc
    jedis:
      pool:
        max-active: 500  #連接池的最大數(shù)據(jù)庫連接數(shù)额各。設為0表示無限制
        max-idle: 20   #最大空閑數(shù)
        max-wait: -1
        min-idle: 5
    timeout: 1000
    redisson: 
      password: 123abc
      cluster:
        nodeAddresses: ["redis://127.0.0.1:6379"]
      single:
        address: "redis://127.0.0.1:6379"
        database: 0
    
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.100.51:3306/ac_db?serverTimezone=Asia/Shanghai&useUnicode=true&tinyInt1isBit=false&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowPublicKeyRetrieval=true
    username: ac_u
    password: ac_PWD_123

    #hikari數(shù)據(jù)庫連接池
    hikari:
      pool-name: YH_HikariCP
      minimum-idle: 10 #最小空閑連接數(shù)量
      idle-timeout: 600000 #空閑連接存活最大時間国觉,默認600000(10分鐘)
      maximum-pool-size: 100 #連接池最大連接數(shù),默認是10
      auto-commit: true  #此屬性控制從池返回的連接的默認自動提交行為,默認值:true
      max-lifetime: 1800000 #此屬性控制池中連接的最長生命周期臊泰,值0表示無限生命周期蛉加,默認1800000即30分鐘
      connection-timeout: 30000 #數(shù)據(jù)庫連接超時時間,默認30秒,即30000
      connection-test-query: SELECT 1

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: group-${spring.profiles.active}
    send-message-timeout: 3000 # 消息發(fā)送超時時長缸逃,默認3s
    retry-times-when-send-failed: 3 # 同步發(fā)送消息失敗重試次數(shù),默認2
    retry-times-when-send-async-failed: 3 # 異步發(fā)送消息失敗重試次數(shù)厂抽,默認2

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
3.3 [mall-order]生產(chǎn)者

生產(chǎn)者OrderSender

package com.ac.order.mq.send;

import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.msg.MqOrderMsg;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Lazy
@Component
public class OrderSender {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void asyncSend(MqOrderMsg mqMsg) {
        String payload = JSONObject.toJSONString(mqMsg);

        //Topic+Tag更精準接收消息
        String destination = MqTopicConstant.TOPIC_ORDER + ":" + mqMsg.getAction().getCode();

        rocketMQTemplate.asyncSend(destination, payload, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(OrderSender.class.getSimpleName() + ",消息發(fā)送成功, result: {}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error(OrderSender.class.getSimpleName() + ",消息發(fā)送失敗");
                e.printStackTrace();
            }
        });
    }
}

下單發(fā)送MQ

@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

    @Resource
    private OrderDao orderDaoImpl;

    @Resource
    private MemberFeignApi memberFeignApi;

    @Resource
    private OrderItemService orderItemServiceImpl;

    @Resource
    private OrderSender orderSender;

    @Override
    public OrderDetailDTO findOrderDetail(Long id) {
        return null;
    }

    @Override
    public IPage<OrderDTO> pageOrder(OrderPageQry qry) {
        return orderDaoImpl.pageOrder(qry);
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public Long createOrder(OrderAddVO addVO) {
        Order order = new Order();
        order.setOrderNo(RandomUtil.randomNumbers(8));

        //省略支付流程
        order.setOrderState(OrderStateEnum.PAYED);
        order.setOrderTime(LocalDateTime.now());

        //通過feign取用戶信息
        MemberDTO member = memberFeignApi.findMember(addVO.getMemberId());
        order.setMemberId(addVO.getMemberId());
        order.setMemberName(member.getMemberName());
        order.setMobile(member.getMobile());
        orderDaoImpl.save(order);

        BigDecimal discountAmount = new BigDecimal(0.00);
        BigDecimal productAmount = new BigDecimal(0.00);
        //存訂單項信息
        for (OrderItemAddVO orderItemAdd : addVO.getOrderItemList()) {
            OrderItem orderItem = orderItemServiceImpl.addOrderItem(order.getId(), orderItemAdd);
            productAmount = productAmount.add(orderItem.getBuyPrice().multiply(new BigDecimal(orderItem.getBuyNum())));
        }

        //更新訂單金額信息
        order.setDiscountAmount(discountAmount);
        order.setProductAmount(productAmount);
        BigDecimal payAmount = productAmount.subtract(discountAmount);
        order.setPayAmount(payAmount);
        orderDaoImpl.updateById(order);

        //發(fā)送下單MQ
        MqOrderMsg mqMsg = MqOrderMsg.builder()
                .action(MqOrderAction.PAID)
                .orderId(order.getId())
                .memberId(order.getMemberId())
                .payAmount(order.getPayAmount())
                .build();
        orderSender.asyncSend(mqMsg);

        return order.getId();
    }
}
3.4 [mall-member]消費者

MemberOrderListener消費者

package com.ac.member.mq.listener;

import com.ac.common.qm.MqTopicConstant;
import com.ac.common.qm.MqConsumerConstant;
import com.ac.common.qm.msg.MqOrderAction;
import com.ac.common.qm.msg.MqOrderMsg;
import com.ac.member.component.MemberIntegralComponent;
import com.ac.member.enums.IntegralSourceTypeEnum;
import com.ac.member.vo.IntegralLogEditVO;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = MqConsumerConstant.CONSUMER_MEMBER_ORDER,
        topic = MqTopicConstant.TOPIC_ORDER,
        selectorExpression = "PAID||REFUND",
        messageModel = MessageModel.CLUSTERING)
public class MemberOrderListener implements RocketMQListener<MessageExt> {

    @Resource
    private MemberIntegralComponent memberIntegralComponent;

    @Override
    public void onMessage(MessageExt message) {
        MqOrderMsg mqMsg = JSONObject.parseObject(message.getBody(), MqOrderMsg.class);
        log.info(MemberOrderListener.class.getSimpleName() + ",msgId={},msg={}", message.getMsgId(), mqMsg);
        try {
            //Topic+Tag更精準接收消息
            MqOrderAction action = mqMsg.getAction();
            if (MqOrderAction.PAID == action) {
                dealPaid(mqMsg);
            } else if (MqOrderAction.REFUND == action) {
                dealRefund(mqMsg);
            }
        } catch (Exception e) {
            log.error(MemberOrderListener.class.getSimpleName() + ",消費失敗,mqMsg={},e={}", mqMsg, e.getMessage());
        }
    }

    /**
     * 處理訂單付款事件
     *
     * @param mqMsg
     */
    private void dealPaid(MqOrderMsg mqMsg) {
        IntegralLogEditVO integralVO = new IntegralLogEditVO();
        integralVO.setMemberId(mqMsg.getMemberId());
        integralVO.setSourceType(IntegralSourceTypeEnum.AWARD_ORDER);
        integralVO.setSourceRemark("下單獲得積分");
        integralVO.setIntegral(mqMsg.getPayAmount().longValue());

        memberIntegralComponent.recordIntegral(integralVO);
    }

    private void dealRefund(MqOrderMsg mqMsg) {
        log.info("處理退單事件");
    }
}

四需频、測試

4.1 下單
下單
4.2 控制臺日志

[mall-order]控制臺MQ發(fā)送日志:

2023-04-04 15:58:37.052  INFO 25204 --- [ublicExecutor_1] com.ac.order.mq.send.OrderSender         : OrderSender,消息發(fā)送成功, result: SendResult [sendStatus=SEND_OK, msgId=7F000001627418B4AAC212E0B7F30000, offsetMsgId=AC100B8D00002A9F000000000003B369, messageQueue=MessageQueue [topic=TOPIC_ORDER, brokerName=LAPTOP-R0R80SCR, queueId=3], queueOffset=0]

[mall-member]控制臺MQ接收日志:

2023-04-04 15:58:37.243  INFO 26788 --- [_MEMBER_ORDER_1] c.a.m.mq.listener.MemberOrderListener    : MemberOrderListener,msgId=7F000001627418B4AAC212E0B7F30000,msg=MqOrderMsg(action=PAID, orderId=281635594240001, memberId=264260572479489, payAmount=40.50)
4.3 數(shù)據(jù)庫記錄
t_order
t_member_integral
t_member_integral_log
4.4 RocketMQ Dashboard
Dashboard列表
Dashboard消息內(nèi)容
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市筷凤,隨后出現(xiàn)的幾起案子昭殉,更是在濱河造成了極大的恐慌,老刑警劉巖藐守,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挪丢,死亡現(xiàn)場離奇詭異,居然都是意外死亡卢厂,警方通過查閱死者的電腦和手機乾蓬,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來慎恒,“玉大人任内,你說我怎么就攤上這事∪诩恚” “怎么了死嗦?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長粒氧。 經(jīng)常有香客問我越除,道長,這世上最難降的妖魔是什么外盯? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任摘盆,我火速辦了婚禮,結果婚禮上门怪,老公的妹妹穿的比我還像新娘骡澈。我一直安慰自己,他們只是感情好掷空,可當我...
    茶點故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布肋殴。 她就那樣靜靜地躺著囤锉,像睡著了一般。 火紅的嫁衣襯著肌膚如雪护锤。 梳的紋絲不亂的頭發(fā)上官地,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天,我揣著相機與錄音烙懦,去河邊找鬼驱入。 笑死,一個胖子當著我的面吹牛氯析,可吹牛的內(nèi)容都是我干的亏较。 我是一名探鬼主播,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼掩缓,長吁一口氣:“原來是場噩夢啊……” “哼雪情!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起你辣,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤巡通,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后舍哄,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體宴凉,經(jīng)...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年表悬,在試婚紗的時候發(fā)現(xiàn)自己被綠了弥锄。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,498評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡签孔,死狀恐怖叉讥,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情饥追,我是刑警寧澤图仓,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站但绕,受9級特大地震影響救崔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜捏顺,卻給世界環(huán)境...
    茶點故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一六孵、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧幅骄,春花似錦劫窒、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽冠息。三九已至,卻和暖如春孕索,著一層夾襖步出監(jiān)牢的瞬間逛艰,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工搞旭, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留散怖,地道東北人。 一個月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓肄渗,卻偏偏與公主長得像镇眷,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子翎嫡,可洞房花燭夜當晚...
    茶點故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內(nèi)容