Spring Boot 2.0 集成 RocketMQ 實現事務消息
廢話不多說，直接進入正題
1.引入相關Maven依賴:
注意:當前的RELEASE.VERSION=2.0.1
<!-- Add this dependency to pom.xml. ${RELEASE.VERSION} must be defined as a Maven property. -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>
2.配置生產者:
(1).application.yml配置如下:
# Producer-side RocketMQ settings for application.yml.
rocketmq:
  name-server: '127.0.0.1:9876'
  producer:
    group: my-group
    # NOTE(review): presumably milliseconds, which would make this 5 minutes - confirm it is intended.
    send-message-timeout: 300000
    # NOTE(review): presumably bytes (4 KiB) - confirm against the starter's property docs.
    compress-message-body-threshold: 4096
    # NOTE(review): presumably bytes (4 MiB) - confirm against the starter's property docs.
    max-message-size: 4194304
    retry-times-when-send-async-failed: 0
    retry-next-server: true
    retry-times-when-send-failed: 2
(2).發送事務消息:
// Template used to publish messages; supplied through @Resource injection.
@Resource
private RocketMQTemplate rocketMQTemplate;
{
    // Wrap the payload `s` (declared outside this snippet) in a Spring message.
    Message<?> msg = MessageBuilder.withPayload(s).build();
    // rocketMQTemplate.getProducer().setVipChannelEnabled(false);
    // "test1" is the transaction group; it corresponds to the transaction producer group name
    // declared on the RocketMQLocalTransactionListener implementation. "ts" is the destination,
    // and the trailing null is the optional argument object (none is supplied here).
    rocketMQTemplate.sendMessageInTransaction("test1", "ts", msg, null);
}
實現事務監聽:
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import com.alibaba.fastjson.JSON;
@RocketMQTransactionListener(txProducerGroup = "test1")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
? ? public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
? ? ? // ... local transaction process, return bollback, commit or unknown
? ? ? System.out.println("本地事務(wù)和消息發(fā)送:" + JSON.toJSONString(msg));
? ? ? return RocketMQLocalTransactionState.UNKNOWN;
? ? }
? ? @Override
? ? public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
? ? ? // ... check transaction status and return bollback, commit or unknown
? ? ? System.out.println("回查信息:" + JSON.toJSONString(msg));
? ? ? return RocketMQLocalTransactionState.COMMIT;
? ? }
}
注意Linux上面需要開放端口:9876、10909、10910、10911,不然會出現消費端獲取失敗,連不上nameserver和broker
3.消費者配置:
application.yml:
# Consumer-side settings: only the name server address is configured.
rocketmq:
  name-server: '127.0.0.1:9876'
代碼實現:
對消費者實現ack機制:
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "ts", consumerGroup = "my-consumerGroup")
public class RocketmqConsumer implements RocketMQListener<String>,RocketMQPushConsumerLifecycleListener{
? ? private int count = 0;
? ? @Override
? ? public void prepareStart(DefaultMQPushConsumer consumer) {
? ? ? ? consumer.registerMessageListener(new MessageListenerConcurrently() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
? ? ? ? ? ? ? ? ConsumeConcurrentlyContext context) {
? ? ? ? ? ? ? ? for (MessageExt messageExt : msgs) {
? ? ? ? ? ? ? ? ? ? System.out.println("重試次數(shù):" + messageExt.getReconsumeTimes());
? ? ? ? ? ? ? ? ? // 注意可以在此處判斷重試次數(shù)症副,實(shí)現(xiàn)入庫(kù)插入,記錄相關(guān)消息政基,進(jìn)行下面的業(yè)務(wù)邏輯處理
? ? ? ? ? ? ? ? ? ? if(count == 0) {
? ? ? ? ? ? ? ? ? ? ? ? count++;
? ? ? ? ? ? ? ? ? ? ? ? return ConsumeConcurrentlyStatus.RECONSUME_LATER;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? System.out.println("接受到的消息:" + new String(messageExt.getBody()));
? ? ? ? ? ? ? ? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
? ? ? ? ? ? }
? ? ? ? });
? ? }
? ? @Override
? ? public void onMessage(String message) {
? ? ? ?實(shí)現(xiàn)RocketMQPushConsumerLifecycleListener監(jiān)聽(tīng)器之后贞铣,此方法不調(diào)用
? ? }
}