一、理論基礎(chǔ)
1.1 RocketMQ能用來做什么
消息通訊
消息通訊是最基本也是最為簡(jiǎn)單的應(yīng)用。比較典型的一個(gè)應(yīng)用場(chǎng)景就是沒有公網(wǎng)IP的情況下菜循,外界服務(wù)無法訪問接口,可以使用消息隊(duì)列來訂閱事件來實(shí)現(xiàn)雙向通信申尤。
異步處理
對(duì)于處理頻繁且不需要即時(shí)反饋的場(chǎng)景來講癌幕,RocketMQ具備良好的性能,而且比較優(yōu)秀的消息堆積處理能力對(duì)于異步操作來說也是加分項(xiàng)昧穿。
其余功能
比如流量削峰勺远、應(yīng)用解耦等,具體可看下網(wǎng)上對(duì)于該功能的詳細(xì)講解时鸵,本文不做深入胶逢。
1.2 基礎(chǔ)概念
Topic:主題厅瞎,一級(jí)消息類型,可以配合Tag使用做細(xì)致區(qū)分初坠,不同類型的消息設(shè)置不同Topic
Tag:消息標(biāo)簽和簸,二級(jí)消息類型,用于進(jìn)一步區(qū)分某個(gè)Topic下的消息分類
Producer:生產(chǎn)者碟刺,發(fā)送消息
Consumer:消費(fèi)者锁保,一個(gè)消息可以被多個(gè)消費(fèi)者訂閱
Consumer Group:消費(fèi)者分組,為了實(shí)現(xiàn)集群消費(fèi)半沽,不同Consumer Group之間消費(fèi)進(jìn)度彼此不受影響爽柒,一個(gè)Consumer Group下包含多個(gè)Consumer實(shí)例
Producer Group:生產(chǎn)者分組,標(biāo)識(shí)發(fā)送同一類消息的Producer抄囚,通常發(fā)送邏輯一致霉赡,一個(gè)Producer Group可以發(fā)送多個(gè)Topic消息
1.3 簡(jiǎn)單說明
GitHub上有一個(gè)開源的RocketMQ工具:RocketMQ-Spring
感興趣的可以研究一下,功能實(shí)現(xiàn)很完整幔托。
二穴亏、實(shí)戰(zhàn)代碼
2.1 依賴引入
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
<!-- 自定義的元數(shù)據(jù)依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
2.2 配置項(xiàng)
rocketmq:
# rocketmqClient日志路徑,默認(rèn)是系統(tǒng)登錄用戶的根目錄
producer:
clientLogDir: logs/rocketmq_client
# 日志級(jí)別
clientLogLevel: WARN
namesrvAddr: 127.0.0.1:9876
groupName: test
retryTimesWhenSendAsyncFailed: 1
sendMsgTimeout: 6000
brokerName: broker-a
consumer:
# 日志級(jí)別
clientLogLevel: WARN
namesrvAddr: 127.0.0.1:9876
groupName: test
threadMax: 20
threadMin: 10
備注:對(duì)于服務(wù)器硬盤不大的機(jī)器來講重挑,一定要記得設(shè)置RocketMQ的日志級(jí)別和路徑等嗓化,否則增長(zhǎng)極快的日志文件很快就會(huì)將你的硬盤塞滿。而且如果沒有后續(xù)的日志搜集與分析需求谬哀,很多日志沒必要打印刺覆。
2.3 配置文件讀取
2.3.1 producer
/**
* RocketMQ Producer 配置項(xiàng)
* @author smile
*/
@Component
@ConfigurationProperties(prefix = "rocketmq.producer")
@Data
public class ProducerProperties {
private String clientLogDir;
private String clientLogLevel;
private String namesrvAddr;
private String groupName;
private int retryTimesWhenSendAsyncFailed;
private int sendMsgTimeout;
}
2.3.2 consumer
/**
* RocketMQ consumer配置項(xiàng)
* @author smile
*/
@Component
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Data
public class ConsumerProperties {
private String namesrvAddr;
private int threadMax;
private int threadMin;
private String groupName;
private String clientLogDir;
private String clientLogLevel;
}
2.4 producer初始化
/**
* 程序啟動(dòng)時(shí)初始化Producer
* @author smile
*/
@Configuration
@Slf4j
public class ProducerConfig {
private final ProducerProperties properties;
public ProducerConfig(ProducerProperties properties) {
this.properties = properties;
}
@Bean
public DefaultMQProducer getRocketMQProducer() throws MQClientException {
setClientProperty();
DefaultMQProducer producer = new DefaultMQProducer(properties.getGroupName());
producer.setNamesrvAddr(properties.getNamesrvAddr());
producer.setRetryTimesWhenSendAsyncFailed(properties.getRetryTimesWhenSendAsyncFailed());
producer.setSendMsgTimeout(properties.getSendMsgTimeout());
producer.start();
log.info("*** producer has started! groupName:[{}], namesrvAddr:[{}] ***", properties.getGroupName(), properties.getNamesrvAddr());
return producer;
}
private void setClientProperty() {
System.setProperty(ClientLogger.CLIENT_LOG_ROOT, properties.getClientLogDir());
System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, properties.getClientLogLevel());
}
}
2.5 consumer示例代碼
2.5.1 Consumer管理程序
- 本實(shí)例基于一個(gè)Group的Consumer。簡(jiǎn)單測(cè)試過史煎,未做生產(chǎn)環(huán)境的深度測(cè)試
- 可實(shí)現(xiàn)簡(jiǎn)單的Consumer初始化谦屑、新增訂閱、取消訂閱功能
/**
* @author smile
*/
@Slf4j
public class ConsumerManager {
private static final ConsumerManager MANAGER = new ConsumerManager();
private static final String TAGS_SEP = "||";
private static Map<String, String> subscription = new HashMap<String, String>(8) {
{
// 系統(tǒng)消息:訂閱與取消訂閱的事件
put("sys", "subscribe||unsubscribe");
}
};
private static DefaultMQPushConsumer consumer;
/**
* 單例篇梭,不允許外界主動(dòng)實(shí)例化
*/
private ConsumerManager() {
}
public static ConsumerManager getInstance() {
return MANAGER;
}
/**
* 初始化Consumer氢橙,本示例初始化一個(gè)ConsumerGroup
* 后續(xù)所有的訂閱與取消訂閱都是在一個(gè)consumer實(shí)例下進(jìn)行
*/
public void initConsumer(ConsumerProperties properties) throws MQClientException {
// 設(shè)置client日志信息, producer初始化時(shí)已配置,此處不再配置
// System.setProperty(ClientLogger.CLIENT_LOG_ROOT, properties.getClientLogDir());
// System.setProperty(ClientLogger.CLIENT_LOG_LEVEL, properties.getClientLogLevel());
consumer = new DefaultMQPushConsumer(properties.getGroupName());
consumer.setNamesrvAddr(properties.getNamesrvAddr());
consumer.setConsumeThreadMax(properties.getThreadMax());
consumer.setConsumeThreadMin(properties.getThreadMin());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
log.info("consumer topic and tags : {}", subscription);
for (Map.Entry<String, String> entry : subscription.entrySet()) {
consumer.subscribe(entry.getKey(), entry.getValue());
}
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
MessageExt msg = msgs.get(0);
String msgBody = new String(msg.getBody(), "utf-8");
log.info("receive message, messageId:[{}], messageBody:{}恬偷, topic:[{}], tag:[{}]",
msg.getMsgId(), msgBody, msg.getTopic(), msg.getTags());
log.info("delay: [{}] ms", (System.currentTimeMillis() - msg.getBornTimestamp()));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 如果執(zhí)行異常悍手,則稍后會(huì)重新消費(fèi)
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
log.info("consumer has started! NamesrvAddr:[{}], groupName:[{}]", properties.getNamesrvAddr(), properties.getGroupName());
}
/**
* 訂閱新的事件
* @param topic topic
* @param tags tags,支持多個(gè)tag訂閱袍患,格式:TagA||TagB
* @throws MQClientException
*/
public void subscribe(String topic, String tags) throws MQClientException {
if (subscription.containsKey(topic)) {
tags = StringUtils.join(subscription.get(topic), TAGS_SEP, tags);
}
consumer.subscribe(topic, tags);
subscription.put(topic, tags);
log.info("!!刷新訂閱, {}", subscription);
}
/**
* 取消訂閱
*/
public void unsubscribe(String topic, String tags) throws MQClientException {
if (!subscription.containsKey(topic)) {
log.error("topic is not found");
return;
}
String[] unsubscribeTags = tags.trim().split("\\|\\|");
String[] existingTags = subscription.get(topic).trim().split("\\|\\|");
log.info("unsubscribeTags: {}, existingTags: {}", unsubscribeTags, existingTags);
StringBuilder newTagsBuilder = new StringBuilder();
for (String existingTag : existingTags) {
if (!ArrayUtils.contains(unsubscribeTags, existingTag)) {
newTagsBuilder.append(existingTag).append(TAGS_SEP);
}
}
if (tags.length() == 0) {
consumer.unsubscribe(topic);
return;
}
String newTags = newTagsBuilder.substring(0, newTagsBuilder.length() - 2);
log.info("newTags: {}", newTagsBuilder);
consumer.subscribe(topic, newTags);
subscription.put(topic, newTags);
log.info("!!取消訂閱坦康,新的訂閱列表, {}", subscription);
}
}
2.5.2 初始化Consumer
/**
* @author smile
*/
@Component
public class ConsumerInit implements CommandLineRunner {
private final ConsumerProperties properties;
public ConsumerInit(ConsumerProperties properties) {
this.properties = properties;
}
@Override
public void run(String... args) throws Exception {
ConsumerManager.getInstance().initConsumer(properties);
}
}