Spring Cloud 集成阿里rocketMq 發(fā)送延時(shí)/定時(shí)/普通消息 解決消費(fèi)軌跡未消費(fèi)問題


spring cloud stream 介紹(照搬)

  • Spring Cloud Stream 是一個(gè)用于構(gòu)建基于消息的微服務(wù)應(yīng)用框架。它基于 SpringBoot 來創(chuàng)建具有生產(chǎn)級(jí)別的單機(jī) Spring 應(yīng)用粱甫,并且使用 Spring Integration 與 Broker 進(jìn)行連接。
  • Spring Cloud Stream 提供了消息中間件配置的統(tǒng)一抽象,推出了 publish-subscribe瞒大、consumer groups、partition 這些統(tǒng)一的概念酗电。
  • Spring Cloud Stream 內(nèi)部有兩個(gè)概念:Binder 和 Binding德绿。
  • Binder: 跟外部消息中間件集成的組件蕴纳,用來創(chuàng)建 Binding,各消息中間件都有自己的 Binder 實(shí)現(xiàn)。
    比如 Kafka 的實(shí)現(xiàn) KafkaMessageChannelBinderRabbitMQ 的實(shí)現(xiàn) RabbitMessageChannelBinder 以及 RocketMQ的實(shí)現(xiàn) RocketMQMessageChannelBinder案狠。
  • Binding: 包括 Input Binding 和 Output Binding。
  • Binding 在消息中間件與應(yīng)用程序提供的 Provider 和 Consumer 之間提供了一個(gè)橋梁灿椅,實(shí)現(xiàn)了開發(fā)者只需使用應(yīng)用程序的 Provider 或 Consumer 生產(chǎn)或消費(fèi)數(shù)據(jù)即可,屏蔽了開發(fā)者與底層消息中間件的接觸馍刮。

版本選擇

因?yàn)椴皇怯糜陂_發(fā),僅供學(xué)習(xí)用所以我參照了下阿里的版本,選用了最新的,具體依據(jù)自身項(xiàng)目做參考
阿里git版本說明傳送門

阿里組件

畢業(yè)版本依賴


一. pom.xml 配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
    </parent>
    <groupId>org.example</groupId>
    <artifactId>spring-alibaba-cloud-rocketmq-studytest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>阿里巴巴cloud-rocketmq學(xué)習(xí)</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- spring cloud 版本依賴-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2020.0.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- spring alibaba cloud 版本依賴-->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2021.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- rocketmq 依賴-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
        <!-- spring boot web依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>

二. 自定義消息channel與rocketMq配置

上面我們引入了spring cloud alibab rocketmq相關(guān)依賴滥朱,下面我們開始消息通道與yml關(guān)于rocketmq的配置
由于阿里的spring-cloud-starter-stream-rocketmq 是依賴spring的stream binder實(shí)現(xiàn)的,所以rocketMq配置分為rocketMq的自定義配置與stream binder的公共配置,如下:

  • spring.cloud.stream.rocketmq 為rocketmq自定義配置
  • spring.cloud.stream.bindings 為srping cloud stream binder公共配置,以此來達(dá)到對(duì)Apache Kafka
    RabbitMQ等消息中間件的擴(kuò)展

  • 1. 自定義普通消息

  • 普通消息YML配置
spring:
  cloud:
    stream:
      # 阿里rocketMq配置 topic 與 group 均以 實(shí)例id% 為前綴配置 如實(shí)例id為 MQ_INST_XXXX_XXX 則group或topic 配置 MQ_INST_XXXX_XXX%grouID
      rocketmq:
        binder:
          # 【若為阿里云購買服務(wù),則為控制臺(tái)的對(duì)外或?qū)?nèi)實(shí)例地址】【若自己搭建的服務(wù),為自定義rocketmq服務(wù)地址127.0.0.1:9876】
          name-server: http://MQ_INST_XXXX_XXX.DD.FFF.aliyuncs.com:80
          # 阿里access-key 【購買阿里服務(wù) 控制臺(tái)獲取填寫 若為自搭服務(wù)可不填】
          access-key: AAAAAAAAAAA
          # 阿里secret-key 【購買阿里服務(wù) 控制臺(tái)獲取填寫 若為自搭服務(wù)可不填】
          secret-key: BBBBBBBBBBBBBB
        # rocketMq 自定義消息通道配置
        bindings:
          # 阿里rocketMq binder 生產(chǎn)者配置
          ### 普通生產(chǎn)消息通道
          customized_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV}
          # 阿里rocketMq binder 消費(fèi)者配置
          ### 普通消息訂閱通道
          customized_input_channel: {consumer.tags: test_consumer_tag}

      # stream binder 公共配置
      bindings:
        # spring cloud stream binder 生產(chǎn)者配置
        ### 普通消息通道
        customized_output_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, content-type: application/json}
        # spring cloud stream binder 消費(fèi)者配置
        ### 普通消息訂閱通道
        customized_input_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}

關(guān)于rocketmq的group 與 topic在yml中的書寫方式付翁,官方文檔是這么寫的
topic 和 group 請(qǐng)以 實(shí)例id% 為前綴進(jìn)行配置佣渴。比如 topic 為 "test"砂竖,需要配置成 "實(shí)例id%test"
官方文檔地址 滑到最后,但是我試過去掉后也能正常使用(可能出于兼容自搭RocketMq服務(wù)的目的)三圆,可能是購買阿里服務(wù)的需要這么填寫路媚,消息軌跡或者其他內(nèi)容需要獲取實(shí)例信息,這樣書寫方便快速獲取?具體原因還需觀察源碼撤师,暫時(shí)按照官方的來。

  • 自定義channel接口

spring cloud stream 提供了自定義的Mesage接口 SourceSink供開發(fā)者使用,通過在程序啟動(dòng)類或者服務(wù)類添加注解來啟用, 如下:

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

@Slf4j
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class RocketApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketApplication.class, args);
        log.debug("==========rocketMq服務(wù)啟動(dòng)成功!==========");
    }
}
@Component
@EnableBinding(Source.class)
public class RocketMqService {

Source 提供了生產(chǎn)者的接口,而Sink提供了消費(fèi)者的接口杆勇,通過觀察源碼,我們可以發(fā)現(xiàn)杏死,接口類的內(nèi)容十分簡(jiǎn)單。

  • source
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Bindable interface with one output channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Source {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output";

    /**
     * @return output channel
     */
    @Output(Source.OUTPUT)
    MessageChannel output();

}
  • sink
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Bindable interface with one input channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Sink {

    /**
     * Input channel name.
     */
    String INPUT = "input";

    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    SubscribableChannel input();

}

而且spring cloud stream 也支持我們自定義message通道,所以我們可以通過根據(jù)自己的業(yè)務(wù)來制定不同的消息通道列吼,以此來滿足我們的業(yè)務(wù)需求您炉,示列如下:

  • 自定義消息生產(chǎn)通道接口
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface OutputChannel {
    // 普通消息生產(chǎn)通道 對(duì)應(yīng)yml自定義節(jié)點(diǎn)名稱
    String NORMAL_PRODUCER_CHANNEL = "customized_output_channel";

    @Output(OutputChannel.NORMAL_PRODUCER_CHANNEL)
    MessageChannel NormalOutput();
}
  • 自定義消息訂閱通道接口
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface InputChannel {
    // 普通消息訂閱通道 對(duì)應(yīng)yml自定義節(jié)點(diǎn)名稱
    String NORMAL_CONSUMER_CHANNEL = "customized_input_channel";

    @Input(InputChannel.NORMAL_CONSUMER_CHANNEL)
    SubscribableChannel normalConsumerChannel();
}
啟動(dòng)類啟用
import com.study.rocketmq.channel.InputChannel;
import com.study.rocketmq.channel.OutputChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@Slf4j
@EnableBinding({InputChannel.class, OutputChannel.class})
@SpringBootApplication
public class RocketApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketApplication.class, args);
        log.debug("==========rocketMq服務(wù)啟動(dòng)成功赐纱!==========");
    }
}

環(huán)境配置和代碼配置已經(jīng)好了淫痰,下面后門開始寫消息生產(chǎn)方法和消息訂閱

  • 普通消息發(fā)送
// controller
@RestController
@RequestMapping("/msg")
public class TestMsgController {

    @Autowired
    ProducerService producerService;

    @GetMapping("/sendMsg/{msg}")
    public String sendMsg(@PathVariable("msg")String msg){
        producerService.sendNormalMsg(msg, "test_consumer_tag", "testKey");
        return "SUCCESS";
    }
}
import com.study.rocketmq.channel.OutputChannel;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ProducerService {

    @Autowired
    private OutputChannel outputChannel;
    /**
     * 發(fā)送普通消息
     * @param message          消息內(nèi)容
     * @param consumerTag   消費(fèi)者group標(biāo)識(shí)
     * @param msgKey            消息key
     * @return
     */
    public boolean sendNormalMsg(String message, String consumerTag, String msgKey){
        // 構(gòu)建消息
        Message<String> messageBuild = MessageBuilder.withPayload(message)
                .setHeader(MessageConst.PROPERTY_TAGS, consumerTag)
                .setHeader(MessageConst.PROPERTY_KEYS, msgKey)
                .build();
        // 發(fā)送消息
        boolean sendResult = outputChannel.NormalOutput().send(messageBuild);
        if (sendResult){
            log.info("普通消息發(fā)送成功-consumerTag:{}-msgKey:{}", consumerTag, msgKey);
        }else {
            log.error("普通消息發(fā)送失敳N酢!:{}",  consumerTag, msgKey);
        }
        return sendResult;
    }
}
import com.study.rocketmq.channel.InputChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
  * @ClassName MessageListener
  * @author lgq
  * @description //消息監(jiān)聽
  * @Date 2021/6/9
  * @Version V1.0
  */
@Slf4j
@Component
public class MessageListener {
    
    // 通過StreamListener監(jiān)聽消息 只允許rocketmq_KEYS = testKey接收
    @StreamListener(value = InputChannel.NORMAL_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'testKey'")
    public void receivePayMsg(@Payload String payResult) {
        log.debug("接收到普通消息:{}", payResult);
    }
}

通過ApiPost工具請(qǐng)求到腥,默認(rèn)打印SUCCESS字符,觀察控制臺(tái)發(fā)現(xiàn)沒有發(fā)送成功。打開控制臺(tái)也沒看到我們本地的客戶端注冊(cè)上了。


ApiPost請(qǐng)求

錯(cuò)誤日志

阿里云rocketMq控制臺(tái)

后來通過查詢資料得知脓杉,可能阿里的rocketMq服務(wù)版本比較高冰寻,ons客戶端版本已經(jīng)到了4.8而spring-cloud-starter-stream-rocketmq所使用的版本才4.4.0,所以我們排除掉它自帶的依賴蜘犁,引入最新的。


image.png
<!-- rocketmq 依賴-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <!-- 排除自帶rocketMq-client依賴【低版本消息無法發(fā)送成功】-->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- rocketMq -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.8.0</version>
        </dependency>

然后重新啟動(dòng)则披,刷新阿里控制臺(tái)蜓萄,發(fā)現(xiàn)已經(jīng)注冊(cè)上了


阿里云rocketmq控制臺(tái)

嘗試重新發(fā)送消息


發(fā)送成功
  • 2. 自定義延時(shí)/定時(shí)消息

    • YML 添加如下配置
             bindings:
                 # 阿里rocketMq binder 生產(chǎn)者配置
                 ### 延時(shí)消息生產(chǎn) producer.sync 屬性需設(shè)置為true
                 delay_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, producer.sync: true}
                 # 阿里rocketMq binder 消費(fèi)者配置
                 ### 延時(shí)消息訂閱
                 delay_input_channel: {consumer.tags: test_delay_tag}
             bindings:
               # spring cloud stream binder 生產(chǎn)者配置
               ### 延時(shí)消息
               delay_output_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, content-type: application/json}
               # spring cloud stream binder 消費(fèi)者配置
               ### 延時(shí)消息訂閱
               delay_input_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}
    
    • InputChannel 接口添加如下方法
        // 延時(shí)/定時(shí)消息訂閱通道 對(duì)應(yīng)yml自定義節(jié)點(diǎn)名稱
       String DELAY_CONSUMER_CHANNEL = "delay_input_channel";
       
       // 延時(shí)/定時(shí)消息訂閱
       @Input(DELAY_CONSUMER_CHANNEL)
       SubscribableChannel delayConsumerChannel();
    
    • OutputChannel 接口添加如下方法
     // 延時(shí)或定時(shí)消息生產(chǎn)通道
     String DELAY_PRODUCER_CHANNEL = "delay_output_channel";
    
     @Output(DELAY_PRODUCER_CHANNEL)
     MessageChannel delayOutput();
    
    • ProducerService 添加如下方法
         /**
      * 延時(shí)消息發(fā)送
      * @param message   延時(shí)消息體
      * @param delayLevel 延時(shí)級(jí)別 1~18 (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 【 1=1s 2=5s 3=10s】)
      * @param ConsumerTag 消費(fèi)者TAG標(biāo)識(shí) 通過TAG區(qū)分消費(fèi)對(duì)象
      * @param MsgKey  消息key 可以通過該字段再次區(qū)分
      * @return
      */
     public boolean sendDelayMsg(String message, int delayLevel, String ConsumerTag, String MsgKey){
         // 構(gòu)建消息
         Message<String> messageBuild = MessageBuilder.withPayload(message)
                 .setHeader(MessageConst.PROPERTY_TAGS, ConsumerTag)
                 .setHeader(MessageConst.PROPERTY_KEYS, MsgKey)
                 .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel)
                 .build();
         // 發(fā)送消息
         boolean sendResult = outputChannel.delayOutput().send(messageBuild);
         if (sendResult){
             log.info("延時(shí)消息發(fā)送成功-ConsumerTag:{}-MsgKey:{}", ConsumerTag, MsgKey);
         }else {
             log.error("延時(shí)消息發(fā)送失斔凹凇V混簟:{}",  ConsumerTag, MsgKey);
         }
         return sendResult;
     }
    
    
     /**
      * https://help.aliyun.com/document_detail/43349.html
      * rocketMq 指定時(shí)間消息發(fā)送
      * @param message 消息內(nèi)容
      * @param ConsumerTag 消費(fèi)者group標(biāo)識(shí)
      * @param MsgKey 消息key
      * @param fixedTime 指定時(shí)間戳 指定時(shí)間戳必須大于當(dāng)前時(shí)間 否則立即消費(fèi) 參數(shù)可設(shè)置40天內(nèi)的任何時(shí)刻(單位毫秒)沼头,超過40天消息發(fā)送將失敗
      * @return
      */
     public boolean sendFixedTimeMsg(String message, String ConsumerTag, String MsgKey, long fixedTime){
         // 構(gòu)建消息 __STARTDELIVERTIME 為發(fā)送定時(shí)任務(wù)需要的請(qǐng)求頭
         Message<String> messageBuild = MessageBuilder.withPayload(message)
                 .setHeader(MessageConst.PROPERTY_TAGS, ConsumerTag)
                 .setHeader(MessageConst.PROPERTY_KEYS, MsgKey)
                 .setHeader("__STARTDELIVERTIME", fixedTime)
                 .build();
         // 發(fā)送消息
         boolean sendResult = outputChannel.delayOutput().send(messageBuild);
         if (sendResult){
             log.info("定時(shí)消息發(fā)送成功-ConsumerTag:{}-MsgKey:{}", ConsumerTag, MsgKey);
         }else {
             log.error("定時(shí)消息發(fā)送失斃扰浮M啤:{}",  ConsumerTag, MsgKey);
         }
         return sendResult;
     }
    
    • MessageListener 監(jiān)聽器添加如下監(jiān)聽
     // 監(jiān)聽定時(shí)/延時(shí)消息通道,只允許key = delayMsg 通過
     @StreamListener(value = InputChannel.DELAY_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'delayMsg'")
     public void receiveDelayMsg(@Payload String payResult) {
         log.debug("接收到延時(shí)消息:{}", payResult);
     }
    
     // 監(jiān)聽定時(shí)/延時(shí)消息通道惰说,只允許key = fixTimeMsg通過
     @StreamListener(value = InputChannel.DELAY_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'fixTimeMsg'")
     public void receivefixTimeMsg(@Payload String payResult) {
         log.debug("接收到定時(shí)消息:{}", payResult);
     }
    

    發(fā)送延時(shí)任務(wù) 級(jí)別定義為2【對(duì)應(yīng)5s】 消息tag:test_delay_tag磨德; 消息key:delayMsg; 消息體:延時(shí)5s吆视;

      @GetMapping("/sendMsg/{msg}")
      public String sendMsg(@PathVariable("msg")String msg){
          producerService.sendDelayMsg(msg, 2,"test_delay_tag", "delayMsg");
          return "SUCCESS";
      }
    

    發(fā)送結(jié)果:
    發(fā)送成功

    發(fā)送定時(shí)任務(wù) 消息tag:test_delay_tag剖张; 消息key:fixTimeMsg; 消息體:延時(shí)5s揩环;指定18:18:00消費(fèi)消息

      @GetMapping("/sendMsg/{msg}")
      public String sendMsg(@PathVariable("msg")String msg) throws ParseException {
          SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          Calendar calendar = Calendar.getInstance();
          calendar.setTime(simpleDateFormat.parse("2021-06-09 18:18:00"));
          long time = calendar.getTime().getTime();
    
          producerService.sendFixedTimeMsg(msg, "test_delay_tag", "delayMsg", time);
          return "SUCCESS";
      }
    

    我們看到消息 是在18:18:01的時(shí)候消費(fèi)的搔弄,重復(fù)實(shí)驗(yàn)里幾次,發(fā)現(xiàn)偶爾會(huì)有誤差但是差距不大【1s以內(nèi)】丰滑,這也是能接受的顾犹,需要注意的是,rocketMq定時(shí)參數(shù)可設(shè)置40天內(nèi)的任何時(shí)刻(單位毫秒)褒墨,超過40天消息發(fā)送將失敗

    image.png

    image.png

三. application.yml 完整配置

spring:
  application:
    name: rocketmq-server
  cloud:
    stream:
      # 阿里rocketMq配置 topic 與 group 均以 實(shí)例id% 為前綴配置 如實(shí)例id為 MQ_INST_XXXX_XXX 則group或topic 配置 MQ_INST_XXXX_XXX%grouID
      rocketmq:
        binder:
          # 【若為阿里云購買服務(wù)炫刷,則為控制臺(tái)的對(duì)外或?qū)?nèi)實(shí)例地址】【若自己搭建的服務(wù),為自定義rocketmq服務(wù)地址127.0.0.1:9876】
          name-server: http://MQ_INST_XXXX_XXX.mq-internet-access.mq-internet.aliyuncs.com:80
          # 阿里access-key 【購買阿里服務(wù) 控制臺(tái)獲取填寫】
          access-key: LTAI4FwRvzLckUQ2xuFE4q6N
          # 阿里secret-key 【購買阿里服務(wù) 控制臺(tái)獲取填寫】
          secret-key: 2RmSqPLLdE1lSOqBtjIrd21kGw0O12
          # 自定義軌跡信息存儲(chǔ)TOPIC 默認(rèn)為 RMQ_SYS_TRACE_TOPIC
          customized-trace-topic: rmq_sys_TRACE_DATA_cn-qingdao-publictest
        # rocketMq 自定義消息通道配置
        bindings:
          # 阿里rocketMq binder 生產(chǎn)者配置
          ### 延時(shí)消息生產(chǎn) producer.sync 屬性需設(shè)置為true
          delay_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, producer.sync: true}
          ### 普通生產(chǎn)消息
          customized_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV}
          # 阿里rocketMq binder 消費(fèi)者配置
          ### 延時(shí)消息訂閱
          delay_input_channel: {consumer.tags: test_delay_tag}
          ### 普通消息訂閱
          customized_input_channel: {consumer.tags: test_consumer_tag}
      bindings:
        # spring cloud stream binder 生產(chǎn)者配置
        ### 延時(shí)消息
        delay_output_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, content-type: application/json}
        ### 普通消息
        customized_output_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, content-type: application/json}

        # spring cloud stream binder 消費(fèi)者配置
        ### 延時(shí)消息訂閱
        delay_input_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, group: MQ_INST_XXXX_XXX%GID_AQUARIUS_DELAY, content-type: application/json}
        ### 普通消息訂閱
        customized_input_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}
# 服務(wù)端口號(hào)
server:
  port: 8083

# slf4j日志配置
logging:
  level:
    root: info
    com.study: debug

四.spring-alibaba-cloud-rocketmq 詳細(xì)配置選項(xiàng)

官方文檔

五.MQ消費(fèi)軌跡異常

關(guān)于阿里云控制臺(tái)郁妈,消費(fèi)消息軌跡顯示未消費(fèi)(或者其他)浑玛,但確實(shí)已經(jīng)消費(fèi)了,可以升級(jí)rocketMq-client版本解決噩咪。之前我的版本是4.8.0顾彰,升級(jí)4.9.1后問題解決。


image.png
<!-- rocketMq -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.9.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <!-- 排除自帶rocketMq-client依賴【低版本消息無法發(fā)送成功】-->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末胃碾,一起剝皮案震驚了整個(gè)濱河市涨享,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌仆百,老刑警劉巖厕隧,帶你破解...
    沈念sama閱讀 217,826評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異俄周,居然都是意外死亡吁讨,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門峦朗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來建丧,“玉大人,你說我怎么就攤上這事甚垦〔杈椋” “怎么了?”我有些...
    開封第一講書人閱讀 164,234評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵艰亮,是天一觀的道長(zhǎng)闭翩。 經(jīng)常有香客問我,道長(zhǎng)迄埃,這世上最難降的妖魔是什么疗韵? 我笑而不...
    開封第一講書人閱讀 58,562評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮侄非,結(jié)果婚禮上蕉汪,老公的妹妹穿的比我還像新娘。我一直安慰自己逞怨,他們只是感情好者疤,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著叠赦,像睡著了一般驹马。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上除秀,一...
    開封第一講書人閱讀 51,482評(píng)論 1 302
  • 那天糯累,我揣著相機(jī)與錄音,去河邊找鬼册踩。 笑死泳姐,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的暂吉。 我是一名探鬼主播胖秒,決...
    沈念sama閱讀 40,271評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼慕的!你這毒婦竟也來了扒怖?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,166評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤业稼,失蹤者是張志新(化名)和其女友劉穎盗痒,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體低散,經(jīng)...
    沈念sama閱讀 45,608評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡俯邓,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了熔号。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片稽鞭。...
    茶點(diǎn)故事閱讀 39,926評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖引镊,靈堂內(nèi)的尸體忽然破棺而出朦蕴,到底是詐尸還是另有隱情篮条,我是刑警寧澤,帶...
    沈念sama閱讀 35,644評(píng)論 5 346
  • 正文 年R本政府宣布吩抓,位于F島的核電站涉茧,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏疹娶。R本人自食惡果不足惜伴栓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望雨饺。 院中可真熱鬧钳垮,春花似錦、人聲如沸额港。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽移斩。三九已至短荐,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間叹哭,已是汗流浹背忍宋。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留风罩,地道東北人糠排。 一個(gè)月前我還...
    沈念sama閱讀 48,063評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子帅腌,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容