spring cloud stream 介紹(照搬)
- Spring Cloud Stream 是一個(gè)用于構(gòu)建基于消息的微服務(wù)應(yīng)用框架。它基于 SpringBoot 來創(chuàng)建具有生產(chǎn)級(jí)別的單機(jī) Spring 應(yīng)用粱甫,并且使用
Spring Integration
與 Broker 進(jìn)行連接。- Spring Cloud Stream 提供了消息中間件配置的統(tǒng)一抽象,推出了 publish-subscribe瞒大、consumer groups、partition 這些統(tǒng)一的概念酗电。
- Spring Cloud Stream 內(nèi)部有兩個(gè)概念:Binder 和 Binding德绿。
- Binder: 跟外部消息中間件集成的組件蕴纳,用來創(chuàng)建 Binding,各消息中間件都有自己的 Binder 實(shí)現(xiàn)。
比如Kafka
的實(shí)現(xiàn)KafkaMessageChannelBinder
,RabbitMQ
的實(shí)現(xiàn)RabbitMessageChannelBinder
以及RocketMQ
的實(shí)現(xiàn)RocketMQMessageChannelBinder
案狠。- Binding: 包括 Input Binding 和 Output Binding。
- Binding 在消息中間件與應(yīng)用程序提供的 Provider 和 Consumer 之間提供了一個(gè)橋梁灿椅,實(shí)現(xiàn)了開發(fā)者只需使用應(yīng)用程序的 Provider 或 Consumer 生產(chǎn)或消費(fèi)數(shù)據(jù)即可,屏蔽了開發(fā)者與底層消息中間件的接觸馍刮。
版本選擇
因?yàn)椴皇怯糜陂_發(fā),僅供學(xué)習(xí)用所以我參照了下阿里的版本,選用了最新的,具體依據(jù)自身項(xiàng)目做參考
阿里git版本說明傳送門
阿里組件
畢業(yè)版本依賴
一. pom.xml 配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
</parent>
<groupId>org.example</groupId>
<artifactId>spring-alibaba-cloud-rocketmq-studytest</artifactId>
<version>1.0-SNAPSHOT</version>
<description>阿里巴巴cloud-rocketmq學(xué)習(xí)</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- spring cloud 版本依賴-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2020.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- spring alibaba cloud 版本依賴-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2021.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<!-- rocketmq 依賴-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<!-- spring boot web依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
二. 自定義消息channel與rocketMq配置
上面我們引入了spring cloud alibab rocketmq相關(guān)依賴滥朱,下面我們開始消息通道與yml關(guān)于rocketmq的配置
由于阿里的spring-cloud-starter-stream-rocketmq 是依賴spring的stream binder實(shí)現(xiàn)的,所以rocketMq配置分為rocketMq的自定義配置與stream binder的公共配置,如下:
spring.cloud.stream.rocketmq
為rocketmq自定義配置spring.cloud.stream.bindings
為srping cloud stream binder公共配置,以此來達(dá)到對(duì)Apache Kafka
RabbitMQ等消息中間件的擴(kuò)展
-
1. 自定義普通消息
-
普通消息YML配置
spring:
cloud:
stream:
# 阿里rocketMq配置 topic 與 group 均以 實(shí)例id% 為前綴配置 如實(shí)例id為 MQ_INST_XXXX_XXX 則group或topic 配置 MQ_INST_XXXX_XXX%grouID
rocketmq:
binder:
# 【若為阿里云購買服務(wù),則為控制臺(tái)的對(duì)外或?qū)?nèi)實(shí)例地址】【若自己搭建的服務(wù),為自定義rocketmq服務(wù)地址127.0.0.1:9876】
name-server: http://MQ_INST_XXXX_XXX.DD.FFF.aliyuncs.com:80
# 阿里access-key 【購買阿里服務(wù) 控制臺(tái)獲取填寫 若為自搭服務(wù)可不填】
access-key: AAAAAAAAAAA
# 阿里secret-key 【購買阿里服務(wù) 控制臺(tái)獲取填寫 若為自搭服務(wù)可不填】
secret-key: BBBBBBBBBBBBBB
# rocketMq 自定義消息通道配置
bindings:
# 阿里rocketMq binder 生產(chǎn)者配置
### 普通生產(chǎn)消息通道
customized_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV}
# 阿里rocketMq binder 消費(fèi)者配置
### 普通消息訂閱通道
customized_input_channel: {consumer.tags: test_consumer_tag}
# stream binder 公共配置
bindings:
# spring cloud stream binder 生產(chǎn)者配置
### 普通消息通道
customized_output_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, content-type: application/json}
# spring cloud stream binder 消費(fèi)者配置
### 普通消息訂閱通道
customized_input_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}
關(guān)于rocketmq的group 與 topic在yml中的書寫方式付翁,官方文檔是這么寫的
topic 和 group 請(qǐng)以 實(shí)例id% 為前綴進(jìn)行配置佣渴。比如 topic 為 "test"砂竖,需要配置成 "實(shí)例id%test"
官方文檔地址 滑到最后,但是我試過去掉后也能正常使用(可能出于兼容自搭RocketMq服務(wù)的目的)三圆,可能是購買阿里服務(wù)的需要這么填寫路媚,消息軌跡或者其他內(nèi)容需要獲取實(shí)例信息,這樣書寫方便快速獲取?具體原因還需觀察源碼撤师,暫時(shí)按照官方的來。
-
自定義channel接口
spring cloud stream 提供了自定義的Mesage接口
Source
和Sink
供開發(fā)者使用,通過在程序啟動(dòng)類
或者服務(wù)類
添加注解來啟用, 如下:
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
@Slf4j
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class RocketApplication {
public static void main(String[] args) {
SpringApplication.run(RocketApplication.class, args);
log.debug("==========rocketMq服務(wù)啟動(dòng)成功!==========");
}
}
@Component
@EnableBinding(Source.class)
public class RocketMqService {
Source
提供了生產(chǎn)者的接口,而Sink
提供了消費(fèi)者的接口杆勇,通過觀察源碼,我們可以發(fā)現(xiàn)杏死,接口類的內(nèi)容十分簡(jiǎn)單。
-
source
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* Bindable interface with one output channel.
*
* @author Dave Syer
* @author Marius Bogoevici
* @see org.springframework.cloud.stream.annotation.EnableBinding
*/
public interface Source {
/**
* Name of the output channel.
*/
String OUTPUT = "output";
/**
* @return output channel
*/
@Output(Source.OUTPUT)
MessageChannel output();
}
-
sink
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* Bindable interface with one input channel.
*
* @author Dave Syer
* @author Marius Bogoevici
* @see org.springframework.cloud.stream.annotation.EnableBinding
*/
public interface Sink {
/**
* Input channel name.
*/
String INPUT = "input";
/**
* @return input channel.
*/
@Input(Sink.INPUT)
SubscribableChannel input();
}
而且spring cloud stream 也支持我們自定義message通道,所以我們可以通過根據(jù)自己的業(yè)務(wù)來制定不同的消息通道列吼,以此來滿足我們的業(yè)務(wù)需求您炉,示列如下:
-
自定義消息生產(chǎn)通道接口
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface OutputChannel {
// 普通消息生產(chǎn)通道 對(duì)應(yīng)yml自定義節(jié)點(diǎn)名稱
String NORMAL_PRODUCER_CHANNEL = "customized_output_channel";
@Output(OutputChannel.NORMAL_PRODUCER_CHANNEL)
MessageChannel NormalOutput();
}
-
自定義消息訂閱通道接口
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface InputChannel {
// 普通消息訂閱通道 對(duì)應(yīng)yml自定義節(jié)點(diǎn)名稱
String NORMAL_CONSUMER_CHANNEL = "customized_input_channel";
@Input(InputChannel.NORMAL_CONSUMER_CHANNEL)
SubscribableChannel normalConsumerChannel();
}
啟動(dòng)類啟用
import com.study.rocketmq.channel.InputChannel;
import com.study.rocketmq.channel.OutputChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
@Slf4j
@EnableBinding({InputChannel.class, OutputChannel.class})
@SpringBootApplication
public class RocketApplication {
public static void main(String[] args) {
SpringApplication.run(RocketApplication.class, args);
log.debug("==========rocketMq服務(wù)啟動(dòng)成功赐纱!==========");
}
}
環(huán)境配置和代碼配置已經(jīng)好了淫痰,下面后門開始寫消息生產(chǎn)方法和消息訂閱
-
普通消息發(fā)送
// controller
@RestController
@RequestMapping("/msg")
public class TestMsgController {
@Autowired
ProducerService producerService;
@GetMapping("/sendMsg/{msg}")
public String sendMsg(@PathVariable("msg")String msg){
producerService.sendNormalMsg(msg, "test_consumer_tag", "testKey");
return "SUCCESS";
}
}
import com.study.rocketmq.channel.OutputChannel;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class ProducerService {
@Autowired
private OutputChannel outputChannel;
/**
* 發(fā)送普通消息
* @param message 消息內(nèi)容
* @param consumerTag 消費(fèi)者group標(biāo)識(shí)
* @param msgKey 消息key
* @return
*/
public boolean sendNormalMsg(String message, String consumerTag, String msgKey){
// 構(gòu)建消息
Message<String> messageBuild = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TAGS, consumerTag)
.setHeader(MessageConst.PROPERTY_KEYS, msgKey)
.build();
// 發(fā)送消息
boolean sendResult = outputChannel.NormalOutput().send(messageBuild);
if (sendResult){
log.info("普通消息發(fā)送成功-consumerTag:{}-msgKey:{}", consumerTag, msgKey);
}else {
log.error("普通消息發(fā)送失敳N酢!:{}", consumerTag, msgKey);
}
return sendResult;
}
}
import com.study.rocketmq.channel.InputChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
* @ClassName MessageListener
* @author lgq
* @description //消息監(jiān)聽
* @Date 2021/6/9
* @Version V1.0
*/
@Slf4j
@Component
public class MessageListener {
// 通過StreamListener監(jiān)聽消息 只允許rocketmq_KEYS = testKey接收
@StreamListener(value = InputChannel.NORMAL_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'testKey'")
public void receivePayMsg(@Payload String payResult) {
log.debug("接收到普通消息:{}", payResult);
}
}
通過ApiPost工具請(qǐng)求到腥,默認(rèn)打印SUCCESS字符,觀察控制臺(tái)發(fā)現(xiàn)沒有發(fā)送成功。打開控制臺(tái)也沒看到我們本地的客戶端注冊(cè)上了。
ApiPost請(qǐng)求
錯(cuò)誤日志
阿里云rocketMq控制臺(tái)
后來通過查詢資料得知脓杉,可能阿里的rocketMq服務(wù)版本比較高冰寻,ons客戶端版本已經(jīng)到了4.8而spring-cloud-starter-stream-rocketmq所使用的版本才4.4.0,所以我們排除掉它自帶的依賴蜘犁,引入最新的。
<!-- rocketmq 依賴-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<!-- 排除自帶rocketMq-client依賴【低版本消息無法發(fā)送成功】-->
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- rocketMq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.8.0</version>
</dependency>
然后重新啟動(dòng)则披,刷新阿里控制臺(tái)蜓萄,發(fā)現(xiàn)已經(jīng)注冊(cè)上了
阿里云rocketmq控制臺(tái)
嘗試重新發(fā)送消息
發(fā)送成功
-
2. 自定義延時(shí)/定時(shí)消息
- YML 添加如下配置
bindings: # 阿里rocketMq binder 生產(chǎn)者配置 ### 延時(shí)消息生產(chǎn) producer.sync 屬性需設(shè)置為true delay_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, producer.sync: true} # 阿里rocketMq binder 消費(fèi)者配置 ### 延時(shí)消息訂閱 delay_input_channel: {consumer.tags: test_delay_tag} bindings: # spring cloud stream binder 生產(chǎn)者配置 ### 延時(shí)消息 delay_output_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, content-type: application/json} # spring cloud stream binder 消費(fèi)者配置 ### 延時(shí)消息訂閱 delay_input_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}
- InputChannel 接口添加如下方法
// 延時(shí)/定時(shí)消息訂閱通道 對(duì)應(yīng)yml自定義節(jié)點(diǎn)名稱 String DELAY_CONSUMER_CHANNEL = "delay_input_channel"; // 延時(shí)/定時(shí)消息訂閱 @Input(DELAY_CONSUMER_CHANNEL) SubscribableChannel delayConsumerChannel();
- OutputChannel 接口添加如下方法
// 延時(shí)或定時(shí)消息生產(chǎn)通道 String DELAY_PRODUCER_CHANNEL = "delay_output_channel"; @Output(DELAY_PRODUCER_CHANNEL) MessageChannel delayOutput();
- ProducerService 添加如下方法
/** * 延時(shí)消息發(fā)送 * @param message 延時(shí)消息體 * @param delayLevel 延時(shí)級(jí)別 1~18 (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 【 1=1s 2=5s 3=10s】) * @param ConsumerTag 消費(fèi)者TAG標(biāo)識(shí) 通過TAG區(qū)分消費(fèi)對(duì)象 * @param MsgKey 消息key 可以通過該字段再次區(qū)分 * @return */ public boolean sendDelayMsg(String message, int delayLevel, String ConsumerTag, String MsgKey){ // 構(gòu)建消息 Message<String> messageBuild = MessageBuilder.withPayload(message) .setHeader(MessageConst.PROPERTY_TAGS, ConsumerTag) .setHeader(MessageConst.PROPERTY_KEYS, MsgKey) .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel) .build(); // 發(fā)送消息 boolean sendResult = outputChannel.delayOutput().send(messageBuild); if (sendResult){ log.info("延時(shí)消息發(fā)送成功-ConsumerTag:{}-MsgKey:{}", ConsumerTag, MsgKey); }else { log.error("延時(shí)消息發(fā)送失斔凹凇V混簟:{}", ConsumerTag, MsgKey); } return sendResult; } /** * https://help.aliyun.com/document_detail/43349.html * rocketMq 指定時(shí)間消息發(fā)送 * @param message 消息內(nèi)容 * @param ConsumerTag 消費(fèi)者group標(biāo)識(shí) * @param MsgKey 消息key * @param fixedTime 指定時(shí)間戳 指定時(shí)間戳必須大于當(dāng)前時(shí)間 否則立即消費(fèi) 參數(shù)可設(shè)置40天內(nèi)的任何時(shí)刻(單位毫秒)沼头,超過40天消息發(fā)送將失敗 * @return */ public boolean sendFixedTimeMsg(String message, String ConsumerTag, String MsgKey, long fixedTime){ // 構(gòu)建消息 __STARTDELIVERTIME 為發(fā)送定時(shí)任務(wù)需要的請(qǐng)求頭 Message<String> messageBuild = MessageBuilder.withPayload(message) .setHeader(MessageConst.PROPERTY_TAGS, ConsumerTag) .setHeader(MessageConst.PROPERTY_KEYS, MsgKey) .setHeader("__STARTDELIVERTIME", fixedTime) .build(); // 發(fā)送消息 boolean sendResult = outputChannel.delayOutput().send(messageBuild); if (sendResult){ log.info("定時(shí)消息發(fā)送成功-ConsumerTag:{}-MsgKey:{}", ConsumerTag, MsgKey); }else { log.error("定時(shí)消息發(fā)送失斃扰浮M啤:{}", ConsumerTag, MsgKey); } return sendResult; }
- MessageListener 監(jiān)聽器添加如下監(jiān)聽
// 監(jiān)聽定時(shí)/延時(shí)消息通道,只允許key = delayMsg 通過 @StreamListener(value = InputChannel.DELAY_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'delayMsg'") public void receiveDelayMsg(@Payload String payResult) { log.debug("接收到延時(shí)消息:{}", payResult); } // 監(jiān)聽定時(shí)/延時(shí)消息通道惰说,只允許key = fixTimeMsg通過 @StreamListener(value = InputChannel.DELAY_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'fixTimeMsg'") public void receivefixTimeMsg(@Payload String payResult) { log.debug("接收到定時(shí)消息:{}", payResult); }
發(fā)送延時(shí)任務(wù) 級(jí)別定義為2【對(duì)應(yīng)5s】 消息tag:test_delay_tag磨德; 消息key:delayMsg; 消息體:延時(shí)5s吆视;
@GetMapping("/sendMsg/{msg}") public String sendMsg(@PathVariable("msg")String msg){ producerService.sendDelayMsg(msg, 2,"test_delay_tag", "delayMsg"); return "SUCCESS"; }
發(fā)送結(jié)果:
發(fā)送成功發(fā)送定時(shí)任務(wù) 消息tag:test_delay_tag剖张; 消息key:fixTimeMsg; 消息體:延時(shí)5s揩环;指定18:18:00消費(fèi)消息
@GetMapping("/sendMsg/{msg}") public String sendMsg(@PathVariable("msg")String msg) throws ParseException { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Calendar calendar = Calendar.getInstance(); calendar.setTime(simpleDateFormat.parse("2021-06-09 18:18:00")); long time = calendar.getTime().getTime(); producerService.sendFixedTimeMsg(msg, "test_delay_tag", "delayMsg", time); return "SUCCESS"; }
我們看到消息 是在18:18:01的時(shí)候消費(fèi)的搔弄,重復(fù)實(shí)驗(yàn)里幾次,發(fā)現(xiàn)偶爾會(huì)有誤差但是差距不大【1s以內(nèi)】丰滑,這也是能接受的顾犹,需要注意的是,
rocketMq定時(shí)參數(shù)可設(shè)置40天內(nèi)的任何時(shí)刻(單位毫秒)褒墨,超過40天消息發(fā)送將失敗
image.png
image.png
三. application.yml 完整配置
spring:
application:
name: rocketmq-server
cloud:
stream:
# 阿里rocketMq配置 topic 與 group 均以 實(shí)例id% 為前綴配置 如實(shí)例id為 MQ_INST_XXXX_XXX 則group或topic 配置 MQ_INST_XXXX_XXX%grouID
rocketmq:
binder:
# 【若為阿里云購買服務(wù)炫刷,則為控制臺(tái)的對(duì)外或?qū)?nèi)實(shí)例地址】【若自己搭建的服務(wù),為自定義rocketmq服務(wù)地址127.0.0.1:9876】
name-server: http://MQ_INST_XXXX_XXX.mq-internet-access.mq-internet.aliyuncs.com:80
# 阿里access-key 【購買阿里服務(wù) 控制臺(tái)獲取填寫】
access-key: LTAI4FwRvzLckUQ2xuFE4q6N
# 阿里secret-key 【購買阿里服務(wù) 控制臺(tái)獲取填寫】
secret-key: 2RmSqPLLdE1lSOqBtjIrd21kGw0O12
# 自定義軌跡信息存儲(chǔ)TOPIC 默認(rèn)為 RMQ_SYS_TRACE_TOPIC
customized-trace-topic: rmq_sys_TRACE_DATA_cn-qingdao-publictest
# rocketMq 自定義消息通道配置
bindings:
# 阿里rocketMq binder 生產(chǎn)者配置
### 延時(shí)消息生產(chǎn) producer.sync 屬性需設(shè)置為true
delay_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, producer.sync: true}
### 普通生產(chǎn)消息
customized_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV}
# 阿里rocketMq binder 消費(fèi)者配置
### 延時(shí)消息訂閱
delay_input_channel: {consumer.tags: test_delay_tag}
### 普通消息訂閱
customized_input_channel: {consumer.tags: test_consumer_tag}
bindings:
# spring cloud stream binder 生產(chǎn)者配置
### 延時(shí)消息
delay_output_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, content-type: application/json}
### 普通消息
customized_output_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, content-type: application/json}
# spring cloud stream binder 消費(fèi)者配置
### 延時(shí)消息訂閱
delay_input_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, group: MQ_INST_XXXX_XXX%GID_AQUARIUS_DELAY, content-type: application/json}
### 普通消息訂閱
customized_input_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}
# 服務(wù)端口號(hào)
server:
port: 8083
# slf4j日志配置
logging:
level:
root: info
com.study: debug
四.spring-alibaba-cloud-rocketmq 詳細(xì)配置選項(xiàng)
五.MQ消費(fèi)軌跡異常
關(guān)于阿里云控制臺(tái)郁妈,消費(fèi)消息軌跡顯示未消費(fèi)(或者其他)浑玛,但確實(shí)已經(jīng)消費(fèi)了,可以升級(jí)rocketMq-client版本解決噩咪。之前我的版本是4.8.0顾彰,升級(jí)4.9.1后問題解決。
image.png
<!-- rocketMq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<!-- 排除自帶rocketMq-client依賴【低版本消息無法發(fā)送成功】-->
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>