消息隊列中間件是分布式系統中重要的組件，主要解決應用解耦、異步消息、日志記錄、流量削鋒、分布式事務等問題，實現高性能、高可用、可伸縮和最終一致性架構。
zebra架構選用RocketMQ作為消息隊列組件，下面介紹下RocketMQ如何與Springboot進行組合封裝。
1、引入依賴包
2、設置配置項信息
namesrvAddr地址
zebra.rocketmq.namesrvAddr=0.0.0.0:9876
生產者group名稱
zebra.rocketmq.producerGroupName=producerGroupName
事務生產者group名稱
zebra.rocketmq.transactionProducerGroupName=transactionProducerGroupName
消費者group名稱
zebra.rocketmq.consumerGroupName=consumerGroupName
生產者實例名稱
zebra.rocketmq.producerInstanceName=producerInstanceName
消費者實例名稱
zebra.rocketmq.consumerInstanceName=consumerInstanceName
事務生產者實例名稱
zebra.rocketmq.producerTranInstanceName=producerTranInstanceName
一次最大消費多少數量消息
zebra.rocketmq.consumerBatchMaxSize=1
廣播消費
zebra.rocketmq.consumerBroadcasting=false
消費(fèi)的topic:tag
zebra.rocketmq.subscribe[0]=TopicTest1:TagA
啟動的時候是否消費歷史記錄
zebra.rocketmq.enableHisConsumer=false
啟動順序消費
zebra.rocketmq.enableOrderConsumer=false
3、編寫配置類
/**
 * Configuration values for the RocketMQ integration, bound from properties that start with
 * {@link #PREFIX}.
 *
 * <p>NOTE(review): no getters/setters appear in this listing although other code calls
 * accessors such as {@code getNamesrvAddr()}; presumably they are generated or were omitted
 * from the listing — confirm before relying on property binding.
 */
@ConfigurationProperties(RocketmqProperties.PREFIX)
public class RocketmqProperties {
    /** Common prefix of every property bound to this class. */
    public static final String PREFIX = "zebra.rocketmq";

    /** Name server address, e.g. {@code 0.0.0.0:9876}. */
    private String namesrvAddr;
    /** Group name of the ordinary producer. */
    private String producerGroupName;
    /** Group name of the transactional producer. */
    private String transactionProducerGroupName;
    /** Group name of the consumer. */
    private String consumerGroupName;
    /** Instance name of the ordinary producer. */
    private String producerInstanceName;
    /** Instance name of the consumer. */
    private String consumerInstanceName;
    /** Instance name of the transactional producer. */
    private String producerTranInstanceName;
    /** Maximum number of messages handed to the listener in one batch. */
    private int consumerBatchMaxSize;
    /** Whether the consumer uses broadcasting mode. */
    private boolean consumerBroadcasting;
    /** Whether historical messages are consumed at startup. */
    private boolean enableHisConsumer;
    /** Whether messages are consumed in order. */
    private boolean enableOrderConsumer;
    /**
     * Subscriptions in {@code topic:tag} form, e.g. {@code TopicTest1:TagA}.
     * Declared with its element type instead of as a raw {@code List}.
     */
    private List<String> subscribe = new ArrayList<>();
}
4、編寫producer和consumer初始化類
@Configuration
@EnableConfigurationProperties(RocketmqProperties.class)
@ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "namesrvAddr")
public class RocketmqAutoConfiguration {
// Logger for this auto-configuration class.
private static final Logger log = LogManager.getLogger(RocketmqAutoConfiguration.class);
// Injected configuration values; read by every bean factory method below.
@Autowired
private RocketmqProperties properties;
// Used by the consumer listeners to publish a RocketmqEvent for each received batch.
@Autowired
private ApplicationEventPublisher publisher;
// True until filter(...) has let a first non-empty batch through; static, so shared by all instances.
private static boolean isFirstSub = true;
// Timestamp taken when this class is initialised; filter(...) compares message born timestamps against it.
private static long startTime = System.currentTimeMillis();
/**
 * Builds and starts the producer used to send ordinary (non-transactional) messages.
 *
 * <p>The bean is only declared when the {@code producerInstanceName} property exists under
 * {@link RocketmqProperties#PREFIX} and an {@code EtcdClient} bean is available.
 *
 * @return the producer, already started
 * @throws MQClientException propagated from the RocketMQ client when the producer cannot start
 */
@Bean
@ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "producerInstanceName")
@ConditionalOnBean(EtcdClient.class)
public DefaultMQProducer defaultProducer() throws MQClientException {
    // An application creates a single producer and keeps it as a global/singleton object.
    // The application must keep the producer group name unique. The group matters little for
    // ordinary messages but is important for transactional ones, because the server may check
    // back with any producer that belongs to the group.
    final String groupName = properties.getProducerGroupName();
    final DefaultMQProducer mqProducer = new DefaultMQProducer(groupName);

    final String nameServer = properties.getNamesrvAddr();
    mqProducer.setNamesrvAddr(nameServer);
    final String instanceName = properties.getProducerInstanceName();
    mqProducer.setInstanceName(instanceName);
    mqProducer.setVipChannelEnabled(false);
    mqProducer.setRetryTimesWhenSendAsyncFailed(10);

    // start() must be called exactly once before use; never call it per message.
    mqProducer.start();
    log.info("RocketMq defaultProducer Started.");
    return mqProducer;
}
/**
 * Builds and starts the producer used to send transactional messages.
 *
 * <p>The bean is only declared when the {@code producerTranInstanceName} property exists under
 * {@link RocketmqProperties#PREFIX} and an {@code EtcdClient} bean is available.
 *
 * @return the transactional producer, already started
 * @throws MQClientException propagated from the RocketMQ client when the producer cannot start
 */
@Bean
@ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "producerTranInstanceName")
@ConditionalOnBean(EtcdClient.class)
public TransactionMQProducer transactionProducer() throws MQClientException {
    // An application creates a single producer and keeps it as a global/singleton object.
    // The application must keep the producer group name unique; for transactional messages the
    // group is significant because the server may check back with any producer in the group.
    final String groupName = properties.getTransactionProducerGroupName();
    final TransactionMQProducer txProducer = new TransactionMQProducer(groupName);

    final String nameServer = properties.getNamesrvAddr();
    txProducer.setNamesrvAddr(nameServer);
    final String instanceName = properties.getProducerTranInstanceName();
    txProducer.setInstanceName(instanceName);
    txProducer.setRetryTimesWhenSendAsyncFailed(10);

    // Transaction check-back settings: minimum threads, maximum threads, queue capacity.
    txProducer.setCheckThreadPoolMinSize(2);
    txProducer.setCheckThreadPoolMaxSize(2);
    txProducer.setCheckRequestHoldMax(2000);

    // TODO The community edition of the server has the message check-back feature removed, so
    // registering a listener here is pointless. The disabled registration was:
    // TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
    // producer.setTransactionCheckListener(transactionCheckListener);

    // start() must be called exactly once before use; never call it per message.
    txProducer.start();
    log.info("RocketMq TransactionMQProducer Started.");
    return txProducer;
}
/**
 * Builds the listener-style (push) consumer, subscribes it to the configured topics and
 * schedules its start on a background thread.
 *
 * <p>Each received batch is passed through {@code filter(...)} and, when anything is left,
 * published as a {@link RocketmqEvent}. A listener failure asks RocketMQ to redeliver the
 * batch later.
 *
 * <p>The bean is only declared when the {@code consumerInstanceName} property exists under
 * {@link RocketmqProperties#PREFIX} and an {@code EtcdClient} bean is available.
 *
 * @return the configured consumer; it is started about five seconds after this method returns
 * @throws MQClientException propagated from the RocketMQ client while subscribing
 */
@Bean
@ConditionalOnProperty(prefix = RocketmqProperties.PREFIX, value = "consumerInstanceName")
@ConditionalOnBean(EtcdClient.class)
public DefaultMQPushConsumer pushConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(properties.getConsumerGroupName());
    consumer.setNamesrvAddr(properties.getNamesrvAddr());
    consumer.setInstanceName(properties.getConsumerInstanceName());
    if (properties.isConsumerBroadcasting()) {
        consumer.setMessageModel(MessageModel.BROADCASTING);
    }

    // Batch consumption raises throughput; fall back to 1 when the value is unset or invalid.
    int batchMaxSize = properties.getConsumerBatchMaxSize();
    consumer.setConsumeMessageBatchMaxSize(batchMaxSize <= 0 ? 1 : batchMaxSize);

    // Subscribe to every configured "topic:tag" entry. An entry without a tag part used to
    // fail with ArrayIndexOutOfBoundsException; it now subscribes to all tags ("*").
    List<String> subscribeList = properties.getSubscribe();
    for (String subscription : subscribeList) {
        String[] topicAndTag = subscription.split(":");
        String tag = topicAndTag.length > 1 ? topicAndTag[1] : "*";
        consumer.subscribe(topicAndTag[0], tag);
    }

    if (properties.isEnableOrderConsumer()) {
        consumer.registerMessageListener((List<MessageExt> msgs, ConsumeOrderlyContext context) -> {
            try {
                context.setAutoCommit(true);
                List<MessageExt> accepted = filter(msgs);
                if (accepted.isEmpty()) {
                    return ConsumeOrderlyStatus.SUCCESS;
                }
                this.publisher.publishEvent(new RocketmqEvent(accepted, consumer));
            } catch (Exception e) {
                log.error("RocketMq orderly message handling failed, suspending current queue.", e);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            // Without a success status the consumer keeps re-consuming the message.
            return ConsumeOrderlyStatus.SUCCESS;
        });
    } else {
        consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
            try {
                List<MessageExt> accepted = filter(msgs);
                if (accepted.isEmpty()) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                this.publisher.publishEvent(new RocketmqEvent(accepted, consumer));
            } catch (Exception e) {
                log.error("RocketMq concurrent message handling failed, will reconsume later.", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            // Without a success status the consumer keeps re-consuming the message.
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
    }

    new Thread(() -> {
        try {
            // Wait 5 seconds so the Spring event listeners are initialised first; otherwise a
            // message could be consumed and published before any listener exists, and be lost.
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and give up: the consumer is left unstarted.
            Thread.currentThread().interrupt();
            log.error("RocketMq pushConsumer start was interrupted; consumer not started.", e);
            return;
        }
        try {
            // start() must be called exactly once before the consumer is used.
            consumer.start();
            // Only report success when start() actually returned normally.
            log.info("RocketMq pushConsumer Started.");
        } catch (Exception e) {
            log.error("RocketMq pushConsumer Start failure!!!.", e);
        }
    }, "rocketmq-push-consumer-starter").start();
    return consumer;
}
/**
 * Drops historical messages from the batches seen before the first non-empty result.
 *
 * <p>While {@code isFirstSub} is still set and historical consumption is disabled, only
 * messages whose born timestamp is later than {@code startTime} are kept. As soon as a
 * non-empty batch is returned the flag is cleared and later batches pass through unchanged.
 *
 * @param msgs the batch delivered by the consumer
 * @return the batch to process, possibly empty
 */
private List<MessageExt> filter(List<MessageExt> msgs) {
    if (!isFirstSub) {
        return msgs;
    }
    List<MessageExt> accepted = msgs;
    if (!properties.isEnableHisConsumer()) {
        accepted = msgs.stream()
                .filter(message -> startTime - message.getBornTimestamp() < 0)
                .collect(Collectors.toList());
    }
    if (!accepted.isEmpty()) {
        isFirstSub = false;
    }
    return accepted;
}
5、編寫Event，方便Consumer使用
/**
 * Application event that carries one batch of RocketMQ messages together with the consumer
 * that received them. The message list is also used as the event source.
 */
public class RocketmqEvent extends ApplicationEvent {
    private static final long serialVersionUID = -4468405250074063206L;
    private DefaultMQPushConsumer consumer;
    private List<MessageExt> msgs;

    /**
     * @param msgs     the received messages; also passed to the superclass as the event source
     * @param consumer the consumer that received the messages
     * @throws Exception declared for callers; kept for source compatibility
     */
    public RocketmqEvent(List<MessageExt> msgs, DefaultMQPushConsumer consumer) throws Exception {
        super(msgs);
        this.consumer = consumer;
        // Assign directly rather than through the overridable setter while constructing.
        this.msgs = msgs;
    }

    /**
     * Returns the body of the message at {@code idx} decoded as UTF-8.
     *
     * @param idx index into the message list
     * @return the decoded body
     */
    public String getMsg(int idx) {
        // The charset constant cannot fail, so there is no exception to swallow here.
        return new String(getMsgs().get(idx).getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Returns the body of the message at {@code idx} decoded with the named charset.
     *
     * @param idx  index into the message list
     * @param code charset name
     * @return the decoded body, or {@code null} when the charset name is not supported
     */
    public String getMsg(int idx, String code) {
        try {
            return new String(getMsgs().get(idx).getBody(), code);
        } catch (UnsupportedEncodingException e) {
            return null;
        }
    }

    /** @return the consumer that received the messages */
    public DefaultMQPushConsumer getConsumer() {
        return consumer;
    }

    /** @param consumer the consumer to associate with this event */
    public void setConsumer(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
    }

    /** @return the message at {@code idx} */
    public MessageExt getMessageExt(int idx) {
        return getMsgs().get(idx);
    }

    /** @return the topic of the message at {@code idx} */
    public String getTopic(int idx) {
        return getMsgs().get(idx).getTopic();
    }

    /** @return the tags of the message at {@code idx} */
    public String getTag(int idx) {
        return getMsgs().get(idx).getTags();
    }

    /** @return the raw body bytes of the message at {@code idx} */
    public byte[] getBody(int idx) {
        return getMsgs().get(idx).getBody();
    }

    /** @return the keys of the message at {@code idx} */
    public String getKeys(int idx) {
        return getMsgs().get(idx).getKeys();
    }

    /** @return the messages carried by this event */
    public List<MessageExt> getMsgs() {
        return msgs;
    }

    /** @param msgs the messages to carry */
    public void setMsgs(List<MessageExt> msgs) {
        this.msgs = msgs;
    }
}
范例
Producer
/**
 * Example REST controller showing how to send ordinary, transactional and ordered messages
 * with the producers declared by the auto-configuration.
 */
@RestController
public class ProducerDemo {
    @Autowired
    private DefaultMQProducer defaultProducer;
    @Autowired
    private TransactionMQProducer transactionProducer;
    // Sequence number appended to the message key/body and used to pick a queue.
    // NOTE(review): plain int updated from request handlers without synchronization — confirm
    // whether concurrent requests matter for this demo.
    private int i = 0;

    /** Sends one message asynchronously and prints the outcome from the callback. */
    @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)
    public void sendMsg() {
        Message msg = new Message("TopicTest1", // topic
                "TagA", // tag
                "OrderID00" + i, // key
                ("Hello zebra mq" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8)); // body
        try {
            defaultProducer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                    // TODO handle successful send
                }

                @Override
                public void onException(Throwable e) {
                    System.out.println(e);
                    // TODO handle failed send
                }
            });
            i++;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends one transactional message; the local transaction runs inside the executer lambda.
     *
     * @return the send result as text, or a failure description when sending threw
     */
    @RequestMapping(value = "/sendTransactionMsg", method = RequestMethod.GET)
    public String sendTransactionMsg() {
        SendResult sendResult = null;
        try {
            // Build the message.
            Message msg = new Message("TopicTest1", // topic
                    "TagA", // tag
                    "OrderID001", // key
                    ("Hello zebra mq").getBytes(java.nio.charset.StandardCharsets.UTF_8)); // body
            // Send the transactional message; the local logic is executed by the lambda below.
            sendResult = transactionProducer.sendMessageInTransaction(msg, (Message msg1, Object arg) -> {
                int value = 1;
                // TODO run the local transaction and change value accordingly
                // ===================================================
                System.out.println("執(zhí)行本地事務(wù)偶妖。。政溃。完成");
                if (arg instanceof Integer) {
                    value = (Integer) arg;
                }
                // ===================================================
                if (value == 0) {
                    throw new RuntimeException("Could not find db");
                } else if ((value % 5) == 0) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if ((value % 4) == 0) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }, 4);
            System.out.println(sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            // sendResult is still null here; report the failure instead of dereferencing it.
            return "sendTransactionMsg failed: " + e;
        }
        return String.valueOf(sendResult);
    }

    /** Sends one message to a queue chosen from the sequence number, for ordered consumption. */
    @RequestMapping(value = "/sendMsgOrder", method = RequestMethod.GET)
    public void sendMsgOrder() {
        Message msg = new Message("TopicTest1", // topic
                "TagA", // tag
                "OrderID00" + i, // key
                ("Hello zebra mq" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8)); // body
        try {
            defaultProducer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    System.out.println("MessageQueue" + arg);
                    // floorMod keeps the index non-negative even if the counter has overflowed.
                    int index = Math.floorMod((Integer) arg, mqs.size());
                    return mqs.get(index);
                }
            }, i); // i is passed to select(...) as arg
            i++;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Consumer
/**
 * Example listener that reacts to {@link RocketmqEvent}s whose first message has topic
 * {@code TopicTest1} and tag {@code TagA}.
 */
@Component
public class ConsumerDemo {
    /**
     * Prints the id of the first message in the event.
     *
     * @param event the published event; {@code event.getConsumer()} gives access to the
     *              consumer if it is needed
     */
    @EventListener(condition = "#event.msgs[0].topic=='TopicTest1' && #event.msgs[0].tags=='TagA'")
    public void rocketmqMsgListen(RocketmqEvent event) {
        try {
            final String firstMsgId = event.getMsgs().get(0).getMsgId();
            System.out.println("com.guosen.client.controller.consumerDemo監(jiān)聽(tīng)到一個(gè)消息達(dá)到:" + firstMsgId);
            // TODO business processing goes here
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}