一摄悯、基礎(chǔ)概念
- Producer:消息生產(chǎn)者
- Producer Group:消息生產(chǎn)者組瘪松,發(fā)送同類消息的一個(gè)消息生產(chǎn)組
- Consumer:消費(fèi)者
- Consumer Group:消費(fèi)同個(gè)消息的多個(gè)實(shí)例
- Tag:標(biāo)簽预明,子主題(二級(jí)分類),用于區(qū)分同一個(gè)主題下的不同業(yè)務(wù)的消息
- Topic:主題
- Message:消息
- Broker:MQ程序任柜,接收生產(chǎn)的消息哨鸭,提供給消費(fèi)者消費(fèi)的程序
- Name Server:給生產(chǎn)和消費(fèi)者提供路由信息,提供輕量級(jí)的服務(wù)發(fā)現(xiàn)和路由
關(guān)于其服務(wù)端的構(gòu)建:
- 通過數(shù)據(jù)存儲(chǔ)服務(wù)broker往枣,這玩意只做數(shù)據(jù)存儲(chǔ)處理相關(guān),不接受對外收發(fā)請求座享,只和name server交互婉商,為保證數(shù)據(jù)的可靠和穩(wěn)定性,提供主從策略渣叛,多節(jié)點(diǎn)備份丈秩,當(dāng)主節(jié)點(diǎn)掛了從節(jié)點(diǎn)繼續(xù)提供服務(wù);
- 服務(wù)發(fā)現(xiàn)和路由器name server淳衙,這個(gè)相當(dāng)于微服務(wù)中的注冊中心蘑秽,它負(fù)責(zé)進(jìn)行路由的分發(fā),以及broker集群的管理箫攀,同時(shí)為了提供高可用肠牲,name server集群其實(shí)不叫集群,它們互不影響靴跛,任意一個(gè)掛了對整個(gè)集群依然正常工作缀雳。
相關(guān)文檔資料
二、簡單使用示例
2.1 使用基礎(chǔ)的 rocketmq-client包來實(shí)現(xiàn)
這個(gè)包中包含了封裝的RocketMQ相關(guān)的TCP連接操作梢睛。
2.1.1 添加maven依賴和配置
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
在application配置文件添加如下配置
rocketmq:
name-server: 192.168.111.63:9876
producer:
group: client-server
創(chuàng)建DefaultMQProducer構(gòu)建工廠類
@Slf4j
@Component
public class RocketProducerBuilder implements DisposableBean {
/**
* NameServer 地址
*/
@Value(value = "${rocketmq.name-server}")
private String nameServerAddr;
/**
* 生產(chǎn)者的組名
*/
@Value(value = "${rocketmq.producer.group}")
private String producerGroup;
private DefaultMQProducer producer;
/**
* 初始化DefaultMQProducer
*
* 參考rocketmq-spring-boot包中的org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration類
*
* @throws MQClientException 啟動(dòng)消息生產(chǎn)者異常
*/
@PostConstruct
void init() throws MQClientException {
//生產(chǎn)者的組名
producer = new DefaultMQProducer(producerGroup);
/// 指定NameServer地址肥印,多個(gè)地址以 ; 隔開
producer.setNamesrvAddr(nameServerAddr);
// 關(guān)閉Channel通道
producer.setVipChannelEnabled(false);
// 發(fā)送消息超時(shí)時(shí)間,單位毫秒
producer.setSendMsgTimeout(3000);
// 在同步模式下绝葡,消息發(fā)送失敗后重試次數(shù)深碱,注意這個(gè)可能導(dǎo)致重復(fù)消息
producer.setRetryTimesWhenSendFailed(2);
// 在異步模式下,消息發(fā)送失敗后重試次數(shù)藏畅,注意這個(gè)可能導(dǎo)致重復(fù)消息
producer.setRetryTimesWhenSendAsyncFailed(2);
// 發(fā)送消息的消息體網(wǎng)絡(luò)包最大值
producer.setMaxMessageSize(1024 * 1024 * 4);
// 當(dāng)消息體網(wǎng)絡(luò)包大于4k時(shí)壓縮消息
producer.setCompressMsgBodyOverHowmuch(1024 * 4);
// 當(dāng)向一個(gè)broker發(fā)送消息失敗了敷硅,是否重新嘗試下一個(gè)
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
// Producer對象在使用之前必須要調(diào)用start初始化,只能初始化一次
producer.start();
}
/**
* 獲取DefaultMQProducer
* @return 返回消息生產(chǎn)者DefaultMQProducer
*/
public DefaultMQProducer build(){
return this.producer;
}
@Override
public void destroy() {
if(null!=producer){
producer.shutdown();
log.info("Rocket Producer Destroyed");
}
}
}
2.1.2 發(fā)送普通消息
普通消息分為:同步(Sync)發(fā)送愉阎、異步(Async)發(fā)送和單向(Oneway)發(fā)送绞蹦。
- 發(fā)送同步消息示例;注意同步消息會(huì)阻塞等待消息發(fā)送結(jié)果榜旦,適用場景:重要通知郵件坦辟、報(bào)名短信通知、營銷短信系統(tǒng)等章办。
@Autowired
private RocketProducerBuilder producerBuilder;
/**
* 同步消息
*/
@PostMapping("/general")
public Mono<SendResult> sendMessage(@RequestBody OrderDTO orderDTO){
DefaultMQProducer producer = producerBuilder.build();
Message message = new Message();
message.setTopic("demo-pay");
message.setTags("train");
message.setKeys(UUID.randomUUID().toString());
message.setBody(JSONObject.toJSONString(orderDTO).getBytes(StandardCharsets.UTF_8));
return Mono.defer(()-> {
try {
// 注意這個(gè)方法在發(fā)送失敗了會(huì)重試锉走,消費(fèi)者需要做好處理
return Mono.just(producer.send(message, 3000));
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
return Mono.error(e);
}
});
}
普通消息應(yīng)該是最常用的消息滨彻,需要注意的是DefaultMQProducer的send方法有重試機(jī)制,具體查看org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl類中的sendDefaultImpl()方法挪蹭;
因此消費(fèi)者需要處理注意接口的冥等性亭饵。
- 發(fā)送異步消息;異步發(fā)送是指發(fā)送方發(fā)出一條消息后梁厉,不等服務(wù)端返回響應(yīng)辜羊,接著發(fā)送下一條消息的通訊方式。適用場景:異步發(fā)送一般用于鏈路耗時(shí)較長词顾,對響應(yīng)時(shí)間較為敏感的業(yè)務(wù)場景八秃。
/**
* 異步消息
*/
@PostMapping("/async")
public String asyncSend(@Validated @RequestBody OrderDTO orderDTO) {
DefaultMQProducer producer = producerBuilder.build();
Message message = new Message();
message.setTopic("demo-pay");
message.setTags("train");
message.setKeys(UUID.randomUUID().toString());
message.setBody(JSONObject.toJSONString(orderDTO).getBytes(StandardCharsets.UTF_8));
try {
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("異步消息發(fā)送結(jié)果:{}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error("異步消息發(fā)送異常:", e);
}
}, 3000);
} catch (Exception e) {
log.error("異步消息發(fā)送異常:", e);
}
return "發(fā)送成功";
}
- 單向發(fā)送;發(fā)送方只負(fù)責(zé)發(fā)送消息肉盹,不等待服務(wù)端返回響應(yīng)且沒有回調(diào)函數(shù)觸發(fā)昔驱,即只發(fā)送請求不等待應(yīng)答。此方式發(fā)送消息的過程耗時(shí)非常短上忍,一般在微秒級(jí)別骤肛。適用場景:適用于某些耗時(shí)非常短,但對可靠性要求并不高的場景窍蓝,例如日志收集腋颠。
/**
* 單向消息
*/
@PostMapping("/oneway")
public String sendOneway(@Validated @RequestBody OrderDTO orderDTO) {
DefaultMQProducer producer = producerBuilder.build();
Message message = new Message();
message.setTopic("demo-pay");
message.setTags("train");
message.setKeys(UUID.randomUUID().toString());
message.setBody(JSONObject.toJSONString(orderDTO).getBytes(StandardCharsets.UTF_8));
try {
producer.sendOneway(message);
} catch (Exception e) {
log.error("單向消息發(fā)送異常:", e);
}
return "發(fā)送成功";
}
2.1.3 發(fā)送延時(shí)消息
延時(shí)消息用于指定消息發(fā)送后,延時(shí)一段時(shí)間才被投遞到客戶端進(jìn)行消費(fèi)(例如 3 秒后才被消費(fèi))吓笙,適用于解決一些消息生產(chǎn)和消費(fèi)有時(shí)間窗口要求的場景淑玫,或者通過消息觸發(fā)延遲任務(wù)的場景,類似于延遲隊(duì)列面睛。
注意:開源版本的僅支持18個(gè)等級(jí)的延遲消息絮蒿,阿里云官方的商業(yè)版支持任意時(shí)間的延時(shí)消息停撞;
延時(shí)等級(jí)(delayLevel)對應(yīng)的時(shí)間: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
/**
* 延時(shí)消息
*/
@PostMapping("/delay")
public SendResult delayMessage(@Validated @RequestBody OrderDTO orderDTO){
Message message = new Message();
message.setTopic("demo-pay");
message.setTags("train");
message.setKeys(UUID.randomUUID().toString());
message.setBody(JSONObject.toJSONString(orderDTO).getBytes(StandardCharsets.UTF_8));
// 延時(shí)等級(jí)(delayLevel)對應(yīng)的時(shí)間 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h档插,從1開始
message.setDelayTimeLevel(3);
try {
return producerBuilder.build().send(message);
} catch (Exception e) {
e.printStackTrace();
log.error("延時(shí)消息發(fā)送異常:", e);
}
return null;
}
2.1.4 發(fā)送順序消息
順序消息(FIFO 消息)是消息隊(duì)列 RocketMQ 版提供的一種嚴(yán)格按照順序來發(fā)布和消費(fèi)的消息類型。
其分為下面2類:
- 全局順序:對于指定的一個(gè) Topic,所有消息按照嚴(yán)格的先入先出 FIFO(First In First Out)的順序進(jìn)行發(fā)布和消費(fèi)亲茅。
- 局部順序:對于指定的一個(gè) Topic,所有消息根據(jù) Sharding Key 進(jìn)行區(qū)塊分區(qū)狗准。同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的 FIFO 順序進(jìn)行發(fā)布和消費(fèi)克锣。Sharding Key 是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的 Key 是完全不同的概念腔长。
示例:用戶A袭祟、B都下了訂單,需要以順序發(fā)送3條消息捞附,
A1 A2 A3 B1 B2 B3 全局順序巾乳,但是系統(tǒng)性能很受影響您没。
A1 B1 A2 A3 B2 B3 局部順序,只需要保證A或B的消息順序即可胆绊,中間可以穿插其他的消息
A2 B2 A1 A3 B1 B3 這樣的就不符合要求
為了實(shí)現(xiàn)消息的順序消費(fèi)氨鹏,我們需要最生產(chǎn)者和消息者做特殊些要求;對應(yīng)全局順序压状,必須設(shè)置1個(gè)Topic下讀寫隊(duì)列都為1仆抵,同時(shí)業(yè)務(wù)端還需要對消息的重試機(jī)制進(jìn)行處理,性能自然也就差了种冬,因此不建議使用镣丑。
對于局部順序,需要保證消息的發(fā)送順序娱两、消息的存儲(chǔ)順序莺匠、消息的消費(fèi)順序;
- 消息的發(fā)送:多線程中需要保證同一個(gè)業(yè)務(wù)編號(hào)的消息在一個(gè)線程中完成谷婆,同時(shí)使用同步的消息慨蛙;
- 消息的存儲(chǔ):mq的topic下會(huì)存在多個(gè)queue,要保證消息的順序存儲(chǔ)纪挎,同一個(gè)業(yè)務(wù)編號(hào)的消息需要被發(fā)送到一個(gè)queue中期贫。對應(yīng)到mq中,需要使用MessageQueueSelector來選擇要發(fā)送的queue异袄,即對業(yè)務(wù)編號(hào)進(jìn)行hash通砍,然后根據(jù)隊(duì)列數(shù)量對hash值取余,將消息發(fā)送到一個(gè)queue中烤蜕。
- 消息的消費(fèi):要保證消息順序消費(fèi)封孙,同一個(gè)queue就只能被一個(gè)消費(fèi)者所消費(fèi),因此對broker中消費(fèi)隊(duì)列加鎖是無法避免的讽营。同一時(shí)刻虎忌,一個(gè)消費(fèi)隊(duì)列只能被一個(gè)消費(fèi)者消費(fèi),消費(fèi)者內(nèi)部橱鹏,也只能有一個(gè)消費(fèi)線程來消費(fèi)該隊(duì)列膜蠢。即,同一時(shí)刻莉兰,一個(gè)消費(fèi)隊(duì)列只能被一個(gè)消費(fèi)者中的一個(gè)線程消費(fèi)挑围。
全局順序和局部順序的代碼實(shí)現(xiàn)幾乎是一樣。
/**
* 順序消息
*/
@PostMapping
public String sendMessage(@Validated @RequestBody OrderNotify orderNotify){
Message message = new Message();
message.setTopic("order-notify");
message.setTags("train");
message.setKeys(UUID.randomUUID().toString());
message.setBody(JSONObject.toJSONString(orderNotify).getBytes(StandardCharsets.UTF_8));
try {
producerBuilder.build().send(message, (mqs, msg, arg) -> {
// 這里就是進(jìn)行隊(duì)列的選擇糖荒,這里的arg參數(shù)就是后面?zhèn)魅氲哪莻€(gè)參數(shù)
int index = Math.abs(arg.hashCode()%mqs.size());
return mqs.get(index);
}, orderNotify.getOrderNo());
} catch (Exception e) {
log.error("順序消息發(fā)送異常", e);
return "順序消息發(fā)送異常:"+e.getMessage();
}
return "消息發(fā)送完成";
}
消費(fèi)者實(shí)現(xiàn)杉辙,和這里需要選擇有序的監(jiān)聽類實(shí)現(xiàn),同時(shí)需要從隊(duì)列開始處開始消費(fèi)捶朵。(其他的是一樣的蜘矢,具體的見消費(fèi)者那塊的代碼)
@Override
public void init() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getConsumerGroup());
consumer.setNamesrvAddr(getNameServer());
try {
// 設(shè)置consumer所訂閱的Topic和Tag, *代表所有的Tag
consumer.subscribe(this.topics, this.tags);
// CONSUME_FROM_FIRST_OFFSET, 從隊(duì)列最開始開始消費(fèi)狂男,即歷史消息(還存在broker的)全部消費(fèi)一遍
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 設(shè)置線程數(shù),默認(rèn)是20品腹;這里先將其設(shè)置小點(diǎn)
consumer.setConsumeThreadMin(3);
consumer.setConsumeThreadMax(6);
// MessageListenerOrderly 有序的
consumer.registerMessageListener((MessageListenerOrderly) (list, context) -> {
try{
// 其實(shí)這里默認(rèn)每次只會(huì)傳入一條消息
log.warn("本次消息數(shù):{}", list.size());
for(MessageExt messageExt:list){
//打印消息內(nèi)容
log.info("messageExt: [{}]: {}", getNumber(), messageExt);
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
ConsumerResult consumerResult = rocketConsumerHandler.handler(ConsumerMessage.builder()
.number(getNumber())
.message(messageBody)
.build());
if(!consumerResult.isSuccess() && consumerResult.isRetry()){
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}catch (Exception e){
log.error("順序消息消費(fèi)異常:", e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
});
super.setMqPushConsumer(consumer);
consumer.start();
log.info("start rocketmq consumer success");
}catch (Exception e){
throw new BizRunTimeException("注冊rocketmq消費(fèi)者異常", e);
}
}
2.1.5 訂閱消息發(fā)布
Rocket的消息訂閱方式分為集群模式(默認(rèn)的)和廣播模式并淋,這個(gè)只需要在消費(fèi)端進(jìn)行設(shè)置即可;廣播模式下珍昨,同一個(gè) Group ID 所標(biāo)識(shí)的所有 Consumer 都會(huì)各自消費(fèi)某條消息一次县耽。同時(shí)廣播模式下不支持順序消息,消費(fèi)點(diǎn)重置即消費(fèi)失敗不會(huì)重新投遞镣典。
額外說明:同一個(gè)Group ID 所標(biāo)識(shí)的消費(fèi)者訂閱的設(shè)置需要保持一致兔毙,即消費(fèi)者分組A中所有消費(fèi)者的topic和tag必須設(shè)置為一樣的。
下面這個(gè)就是不正確的兄春,如果這樣設(shè)置澎剥,那么消息消費(fèi)的邏輯就會(huì)混亂,甚至導(dǎo)致消息丟失
就是在創(chuàng)建消費(fèi)者時(shí)增加下面這行代碼即可
consumer.setMessageModel(MessageModel.BROADCASTING);
2.1.6 事務(wù)消息
2.1.7 消費(fèi)者
下面使用基于springboot的簡單實(shí)現(xiàn)赶舆;application配置文件如下:
rocketmq:
name-server: 192.168.111.63:9876
producer:
group: client-server
consumer:
- consumerGroup: trian-order
consumeFromWhere: CONSUME_FROM_LAST_OFFSET
topics: demo-pay
rocketConsumerHandler: "top.vchar.rocketmq.config.rocketmq.handler.SimpleRocketConsumerHandler"
消費(fèi)者接口定義
public interface RocketConsumer {
/**
* 初始化
*/
void init();
}
/**
* <p> rocketmq 消費(fèi)者基礎(chǔ)信息 </p>
*
* @version 1.0
* @create_date 2020/10/27
*/
@Slf4j
public abstract class AbstractRocketConsumer implements RocketConsumer, DisposableBean {
protected MQPushConsumer consumer;
@Getter
private final String nameServer;
@Getter
private final String consumerGroup;
@Getter
private final ConsumeFromWhere consumeFromWhere;
public AbstractRocketConsumer(String nameServer, String consumerGroup, ConsumeFromWhere consumeFromWhere){
this.nameServer = nameServer;
this.consumerGroup = consumerGroup;
this.consumeFromWhere = Optional.ofNullable(consumeFromWhere).orElse(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
}
/**
* 初始化
*/
@Override
public abstract void init();
public void setMqPushConsumer(MQPushConsumer consumer){
this.consumer = consumer;
}
/**
* 銷毀
*/
@Override
public void destroy() throws Exception {
if (Objects.nonNull(consumer)) {
consumer.shutdown();
}
log.info("container destroyed, {}", this.toString());
}
}
/**
* <p> 普通消息哑姚,延時(shí)消息消費(fèi)者 </p>
*
* @author vchar fred
* @version 1.0
* @create_date 2020/10/29
*/
@Slf4j
public class SimpleRocketConsumer extends AbstractRocketConsumer {
private final String topics;
private final String tags;
private final RocketConsumerHandler rocketConsumerHandler;
/**
* 創(chuàng)建簡單的消費(fèi)者
* @param nameServer name server
* @param consumerGroup 消費(fèi)者組
* @param consumeFromWhere 消費(fèi)策略
* CONSUME_FROM_LAST_OFFSET 默認(rèn)策略。從該隊(duì)列最尾開始消費(fèi)芜茵,跳過歷史消息
* CONSUME_FROM_FIRST_OFFSET, 從隊(duì)列最開始開始消費(fèi)叙量,即歷史消息(還存在broker的)全部消費(fèi)一遍
* CONSUME_FROM_TIMESTAMP; 根據(jù)時(shí)間消費(fèi)
* @param topics 主題
* @param tags 標(biāo)簽,默認(rèn)為*
* @param rocketConsumerHandler 消息接收業(yè)務(wù)處理器
*/
public SimpleRocketConsumer(String nameServer, String consumerGroup, ConsumeFromWhere consumeFromWhere, String topics, String tags, RocketConsumerHandler rocketConsumerHandler){
super(nameServer, consumerGroup, consumeFromWhere);
Assert.notNull(nameServer, "RocketMQ name server can't null");
Assert.notNull(consumerGroup, "RocketMQ consumer group can't null");
Assert.notNull(topics, "RocketMQ topics can't null");
Assert.notNull(rocketConsumerHandler, "RocketMQ SimpleRocketConsumerHandler can't null");
this.topics = topics;
this.tags = Optional.ofNullable(tags).orElse("*");
this.rocketConsumerHandler = rocketConsumerHandler;
}
@Override
public void init() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getConsumerGroup());
consumer.setNamesrvAddr(getNameServer());
try {
//設(shè)置consumer所訂閱的Topic和Tag, *代表所有的Tag
consumer.subscribe(this.topics, this.tags);
// CONSUME_FROM_LAST_OFFSET 默認(rèn)策略九串。從該隊(duì)列最尾開始消費(fèi)绞佩,跳過歷史消息
// CONSUME_FROM_FIRST_OFFSET, 從隊(duì)列最開始開始消費(fèi),即歷史消息(還存在broker的)全部消費(fèi)一遍
// CONSUME_FROM_TIMESTAMP;//根據(jù)時(shí)間消費(fèi)
consumer.setConsumeFromWhere(getConsumeFromWhere());
// MessageListenerOrderly 有序的猪钮,
// 注意有序的在返回消費(fèi)失敗后品山,其會(huì)馬上就將消息再次發(fā)過來,并且其消費(fèi)次數(shù)不變烤低,
// 也就是其會(huì)永遠(yuǎn)的重試(因此建議不要把異常拋出肘交,程序里面手動(dòng)處理下)
// MessageListenerConcurrently無序的,效率更高
consumer.registerMessageListener((MessageListenerConcurrently)(list, context)->{
try{
for(MessageExt messageExt:list){
//打印消息內(nèi)容
log.info("messageExt: {}", messageExt);
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
ConsumerResult consumerResult = rocketConsumerHandler.handler(messageBody);
if(!consumerResult.isSuccess() && consumerResult.isRetry()){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}catch (Exception e){
log.error("消息消費(fèi)異常", e);
// 消費(fèi)失敗扑馁,稍后mq會(huì)再次將消息發(fā)過來涯呻,注意mq默認(rèn)最大重試次數(shù)為16,可以修改檐蚜。
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 消費(fèi)成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
super.setMqPushConsumer(consumer);
log.info("start rocketmq consumer success");
}catch (Exception e){
throw new BizRunTimeException("注冊rocketmq消費(fèi)者異常", e);
}
}
public static SimpleRocketConsumerBuilder builder(){
return new SimpleRocketConsumerBuilder();
}
static class SimpleRocketConsumerBuilder {
private String nameServer;
private String consumerGroup;
private ConsumeFromWhere consumeFromWhere;
private String topics;
private String tags;
private RocketConsumerHandler rocketConsumerHandler;
public SimpleRocketConsumerBuilder(){
}
public SimpleRocketConsumer build(){
return new SimpleRocketConsumer(this.nameServer, this.consumerGroup, this.consumeFromWhere, this.topics, this.tags, this.rocketConsumerHandler);
}
public SimpleRocketConsumerBuilder nameServer(String nameServer){
this.nameServer = nameServer;
return this;
}
public SimpleRocketConsumerBuilder consumerGroup(String consumerGroup){
this.consumerGroup = consumerGroup;
return this;
}
public SimpleRocketConsumerBuilder consumeFromWhere(ConsumeFromWhere consumeFromWhere){
this.consumeFromWhere = consumeFromWhere;
return this;
}
public SimpleRocketConsumerBuilder topics(String topics){
this.topics = topics;
return this;
}
public SimpleRocketConsumerBuilder tags(String tags){
this.tags = tags;
return this;
}
public SimpleRocketConsumerBuilder rocketConsumerHandler(RocketConsumerHandler rocketConsumerHandler){
this.rocketConsumerHandler = rocketConsumerHandler;
return this;
}
}
}
業(yè)務(wù)處理handler
public interface RocketConsumerHandler {
/**
* 消息處理
* @param message 消息內(nèi)容
* @return 返回處理結(jié)果
*/
ConsumerResult handler(String message);
}
/**
* <p> 消息業(yè)務(wù)handler實(shí)現(xiàn) </p>
*
* @version 1.0
* @create_date 2020/10/29
*/
@Slf4j
@Component
public class SimpleRocketConsumerHandler implements RocketConsumerHandler {
@Override
public ConsumerResult handler(String message) {
log.info("消費(fèi)消息: {}", message);
// TODO 這里是業(yè)務(wù)處理魄懂,ConsumerResult類就是個(gè)簡單的處理結(jié)果類
return new ConsumerResult(true);
}
}
RocketMQ配置類
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties implements Serializable {
/**
* The name server for rocketMQ, formats: `host:port;host:port`.
*/
private String nameServer;
private Producer producer;
private List<Consumer> consumer;
@Data
public static final class Producer {
private String group;
}
@Data
public static final class Consumer {
private String consumerGroup;
private ConsumeFromWhere consumeFromWhere;
private String topics;
private String tags;
private String rocketConsumerHandler;
}
}
注冊消費(fèi)者
/**
* <p> 消費(fèi)者注冊 </p>
*
* spring容器將所有的bean加載完畢后會(huì)執(zhí)行run方法
*
* @version 1.0
* @create_date 2020/10/29
*/
@Slf4j
@Component
public class RocketMQConsumerRegister implements CommandLineRunner {
private final RocketMQProperties properties;
public RocketMQConsumerRegister(RocketMQProperties properties) {
this.properties = properties;
}
@Override
public void run(String... args) throws Exception {
List<RocketMQProperties.Consumer> consumers = properties.getConsumer();
if(consumers!=null && !consumers.isEmpty()){
for(RocketMQProperties.Consumer consumer:consumers){
SimpleRocketConsumer.builder()
.nameServer(this.properties.getNameServer())
.consumerGroup(consumer.getConsumerGroup())
.consumeFromWhere(consumer.getConsumeFromWhere())
.topics(consumer.getTopics())
.tags(consumer.getTags())
.rocketConsumerHandler(getHandler(consumer.getRocketConsumerHandler()))
.build().init();
}
}
}
private RocketConsumerHandler getHandler(String handlerClass){
try {
// 由于這些業(yè)務(wù)處理handler可能依賴些基礎(chǔ)組件沿侈,比如數(shù)據(jù)庫等闯第,因此這里從spring容器中獲取bean
return (RocketConsumerHandler) SpringBeanUtil.getBean(Class.forName(handlerClass));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}
/**
* <p> spring bean 工具類 </p>
*
* @version 1.0
* @create_date 2020/10/29
*/
@Component
public class SpringBeanUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (null == SpringBeanUtil.applicationContext) {
SpringBeanUtil.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 通過Bean名字獲取Bean
*
* @param beanName bean 名稱
* @return 返回獲取到的對象
*/
public static Object getBean(String beanName) {
return getApplicationContext().getBean(beanName);
}
/**
* 通過Bean類型獲取Bean
*
* @param beanClass bean class
* @param <T> beanClass
* @return 返回對象
*/
public static <T> T getBean(Class<T> beanClass) {
return getApplicationContext().getBean(beanClass);
}
/**
* 通過Bean名字和Bean類型獲取Bean
*
* @param beanName bean 名稱
* @param beanClass class
* @param <T> beanClass
* @return 返回對象
*/
public static <T> T getBean(String beanName, Class<T> beanClass) {
return getApplicationContext().getBean(beanName, beanClass);
}
}