2019.5.27日更新
目前springcloud alibaba的項(xiàng)目已經(jīng)快要更新到第一個(gè)relase版本了,如果想要整合rocketmq建議移步springcloud alibaba的github哪里有更加方便的操作芥颈,下面僅供參考
前言
前段時(shí)間更新技術(shù)架構(gòu)的時(shí)候妇垢,看到rocketmq出了4的版本,而且本身這個(gè)mq有事務(wù)消息,在分布式的場(chǎng)景中有很好的啟發(fā)性箩做,和作用燃辖,而且本身它也是阿里開(kāi)源到apache的一個(gè)項(xiàng)目俘陷,從出身還是實(shí)力來(lái)說(shuō)都很不錯(cuò)倒得,所以今天就抱著支持國(guó)產(chǎn)的心態(tài)開(kāi)看一看這個(gè)mq
1.項(xiàng)目代碼
maven
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
producer
單從分類producer的官網(wǎng)doc來(lái)看主要分成3種,
DefaultMQProducer
TransactionMQProducer
messagingAccessPoint.createProducer()
本文主要說(shuō)的是秉馏,DefaultMQProducer琴昆,TransactionMQProducer
DefaultMQProducer
默認(rèn)的producer,從官方的文檔來(lái)看复颈,前四個(gè)都是對(duì)這個(gè)producer的運(yùn)用只是set的值不同而已衅谷,而且是很細(xì)微的變化而已
那么從最簡(jiǎn)單的開(kāi)始
application.yml文件
rocketmq:
# 生產(chǎn)者配置
producer:
groupName: ${spring.application.name}
namesrvAddr: 192.168.40.133:9876
default: false
yml文件配置讀取類
@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.producer")
@Configuration
@ToString
public class ProducerConfig {
private String namesrvAddr;
private String groupName;
}
producer類的創(chuàng)建類渗蟹,需要注意的是這個(gè)producer一個(gè)程序里面只能出現(xiàn)一個(gè)膘怕,當(dāng)重復(fù)創(chuàng)建時(shí)就會(huì)報(bào)錯(cuò)
@Log4j2
@Configuration
public class ProducerConfigure {
@Autowired
private ProducerConfig producerConfigure;
/**
* 創(chuàng)建普通消息發(fā)送者實(shí)例
*
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "default", havingValue = "true")
public DefaultMQProducer defaultProducer() throws MQClientException {
log.info(producerConfigure.toString());
log.info("defaultProducer 正在創(chuàng)建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(producerConfigure.getGroupName());
producer.setNamesrvAddr(producerConfigure.getNamesrvAddr());
producer.setVipChannelEnabled(false);
producer.setRetryTimesWhenSendAsyncFailed(10);
producer.start();
log.info("rocketmq producer server開(kāi)啟成功---------------------------------.");
return producer;
}
}
當(dāng)producer創(chuàng)建完畢之后就是consumer的公用設(shè)置
首先也是yml和配置類的定義
rocketmq:
# 消費(fèi)者配置
consumer:
groupName: ${spring.application.name}
namesrvAddr: 192.168.40.133:9876
@Getter
@Setter
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
@ToString
public class ConsumerConfig {
private String groupName;
private String namesrvAddr;
}
@Configuration
@Log4j2
public abstract class DefaultConsumerConfigure {
@Autowired
private ConsumerConfig consumerConfig;
// 開(kāi)啟消費(fèi)者監(jiān)聽(tīng)服務(wù)
public void listener(String topic, String tag) throws MQClientException {
log.info("開(kāi)啟" + topic + ":" + tag + "消費(fèi)者-------------------");
log.info(consumerConfig.toString());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());
consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());
consumer.subscribe(topic, tag);
// 開(kāi)啟內(nèi)部類實(shí)現(xiàn)監(jiān)聽(tīng)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return DefaultConsumerConfigure.this.dealBody(msgs);
}
});
consumer.start();
log.info("rocketmq啟動(dòng)成功---------------------------------------");
}
// 處理body的業(yè)務(wù)
public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs);
}
值得注意的是争群,這里DefaultConsumerConfigure沒(méi)有定義在什么時(shí)候運(yùn)行轻要,還有對(duì)body的操作也抽象出來(lái)了幸冻,提供給實(shí)現(xiàn)類做處理碑定,方便業(yè)務(wù)抽取
@Log4j2
@Configuration
public class TestConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent>{
@Override
public void onApplicationEvent(ContextRefreshedEvent arg0) {
try {
super.listener("t_TopicTest", "Tag1");
} catch (MQClientException e) {
log.error("消費(fèi)者監(jiān)聽(tīng)器啟動(dòng)失敗", e);
}
}
@Override
public ConsumeConcurrentlyStatus dealBody(List<MessageExt> msgs) {
int num = 1;
log.info("進(jìn)入");
for(MessageExt msg : msgs) {
log.info("第" + num + "次消息");
try {
String msgStr = new String(msg.getBody(), "utf-8");
log.info(msgStr);
} catch (UnsupportedEncodingException e) {
log.error("body轉(zhuǎn)字符串解析失敗");
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
那么這些寫(xiě)完之后基本就做測(cè)試類的編寫(xiě)了
這個(gè)地實(shí)現(xiàn)了ApplicationListener,讓他在啟動(dòng)的時(shí)候就開(kāi)始執(zhí)行這個(gè)consumer,相信有些同學(xué)會(huì)喜歡用@PostConstruct勘究,但是不要這么做,因?yàn)樗麜?huì)在init之前執(zhí)行,那么有些類會(huì)加載不完全虏等,會(huì)導(dǎo)致無(wú)法開(kāi)機(jī)啟動(dòng)的
然后再controller里面引入producer澄干,然后直接調(diào)用即可
@RestController
@RequestMapping("/test")
@Log4j2
public class TestController {
@Autowired
private DefaultMQProducer defaultMQProducer;
// @Autowired
// private TransactionMQProducer producer;
@Autowired
private TestTransactionListener testTransactionListener;
@GetMapping("/test")
public void test(String info) throws Exception {
Message message = new Message("TopicTest", "Tag1", "12345", "rocketmq測(cè)試成功".getBytes());
// 這里用到了這個(gè)mq的異步處理,類似ajax拂檩,可以得到發(fā)送到mq的情況加矛,并做相應(yīng)的處理
//不過(guò)要注意的是這個(gè)是異步的
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("傳輸成功");
log.info(GsonUtil.GSON.toJson(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("傳輸失敗", e);
}
});
}
}
跑一跑妓羊,就可以看到結(jié)果了
說(shuō)完了DefaultMQProducer那么就來(lái)說(shuō)transactionMQProducer
同樣參考官網(wǎng)的例子來(lái)做整合
在原來(lái)的ProducerConfigure類的基礎(chǔ)上加上即可
需要注意的是ConditionalOnProperty這個(gè)必須得有星持,而且配置文件中
transaction和default中只能有一個(gè)是true捡鱼,不然就會(huì)同時(shí)創(chuàng)建兩個(gè)producer管引,那么就會(huì)報(bào)錯(cuò)
/**
* 創(chuàng)建事務(wù)消息發(fā)送者實(shí)例
*
* @return
* @throws MQClientException
*/
@Bean
@ConditionalOnProperty(prefix = "rocketmq.producer", value = "transaction", havingValue = "true")
public TransactionMQProducer transactionMQProducer() throws MQClientException {
log.info(producerConfigure.toString());
log.info("TransactionMQProducer 正在創(chuàng)建---------------------------------------");
TransactionMQProducer producer = new TransactionMQProducer(producerConfigure.getGroupName());
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr(producerConfigure.getNamesrvAddr());
producer.setExecutorService(executorService);
producer.start();
log.info("TransactionMQProducer server開(kāi)啟成功---------------------------------.");
return producer;
}
因?yàn)閠ransaction的流程下,rocketmq會(huì)先發(fā)送一個(gè)consumer不可見(jiàn)的消息,然后在調(diào)用
TransactionListener這個(gè)接口中的executeLocalTransaction,中的方法執(zhí)行事務(wù),然后方法內(nèi)部需要返回
一個(gè)LocalTransactionState的枚舉信息恶导,分別為
public enum LocalTransactionState {
COMMIT_MESSAGE, // 提交
ROLLBACK_MESSAGE, // 回滾
UNKNOW, // 未知
}
相應(yīng)的當(dāng)我們返回的是COMMIT_MESSAGE時(shí),那么producer會(huì)把消息提交到mq上,
如果是ROLLBACK_MESSAGE那么producer就會(huì)結(jié)束闸准,并且不提交到mq,
public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
需要注意的是checkLocalTransaction是用作mq長(zhǎng)時(shí)間沒(méi)有收到producer的executeLocalTransaction響應(yīng)的時(shí)候調(diào)用的,這個(gè)類在3.0之后的版本就被閹割了察迟,只有接口秕岛,卻沒(méi)有實(shí)現(xiàn)改鲫,那么直接寫(xiě)一個(gè)空實(shí)現(xiàn)即可,在我這邊的代碼上,我做了一個(gè)抽象壳嚎,把需要實(shí)現(xiàn)的executeLocalTransaction抽象出來(lái)
@Configuration
public abstract class AbstractTransactionListener implements TransactionListener {
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
}
這個(gè)是executeLocalTransaction的實(shí)現(xiàn)類寡润,簡(jiǎn)單的做了些業(yè)務(wù)瞬沦,然后返回了一個(gè)commit
@Configuration
@Log4j2
public class TestTransactionListener extends AbstractTransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info(new String(msg.getBody()));
return LocalTransactionState.COMMIT_MESSAGE;
}
}
consumer是沒(méi)有變化的边坤,基本相同区拳,那么就直接貼上controller的測(cè)試代碼
@GetMapping("t_test")
public void Ttest(String info) throws Exception {
Message message = new Message("t_TopicTest", "Tag1", "12345", "rocketmq測(cè)試成功".getBytes());
producer.setTransactionListener(testTransactionListener);
producer.setSendMsgTimeout(10000);
producer.sendMessageInTransaction(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("傳輸成功");
log.info(GsonUtil.GSON.toJson(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("傳輸失敗", e);
}
});
}
跑一跑即可看到結(jié)果參數(shù)