轉(zhuǎn)載原文:https://www.cnblogs.com/wuzhenzhao/p/11498735.html? ? ? ? 作者:吳振照
消息中間件的功能:
通過學習ActiveMq钧嘶,kafka雹洗,rabbitMq這些消息中間件,我們大致能為消息中間件的功能做一下以下定義:可以先從基本的需求開始思考
最基本的是要能支持消息的發(fā)送和接收估蹄,需要涉及到網(wǎng)絡(luò)通信就一定會涉及到NIO
消息中心的消息存儲(持久化/非持久化)
消息的序列化和反序列化
是否跨語言
消息的確認機制,如何避免消息重發(fā)
高級功能:
消息的有序性
是否支持事務(wù)消息
消息收發(fā)的性能圈盔,對高并發(fā)大數(shù)據(jù)量的支持
是否支持集群
消息的可靠性存儲
是否支持多協(xié)議
MQ消息存儲選擇:
從主流的幾種MQ消息隊列采用的存儲方式來看戴尸,主要會有三種
分布式KV存儲,比如ActiveMQ中采用的levelDB付魔、Redis, 這種存儲方式對于消息讀寫能力要求不高的情況下可以使用
文件系統(tǒng)存儲飞蹂,常見的比如kafka几苍、RocketMQ、RabbitMQ都是采用消息刷盤到所部署的機器上的文件系統(tǒng)來做持久化陈哑,這種方案適合對于有高吞吐量要求的消息中間件妻坝,因為消息刷盤是一種高效率,高可靠惊窖、高性能的持久化方式刽宪,除非磁盤出現(xiàn)故障,否則一般是不會出現(xiàn)無法持久化的問題
關(guān)系型數(shù)據(jù)庫界酒,比如ActiveMQ可以采用mysql作為消息存儲圣拄,關(guān)系型數(shù)據(jù)庫在單表數(shù)據(jù)量達到千萬級的情況下IO性能會出現(xiàn)瓶頸,所以ActiveMQ并不適合于高吞吐量的消息隊列場景毁欣。
總的來說售担,對于存儲效率,文件系統(tǒng)要優(yōu)于分布式KV存儲署辉,分布式KV存儲要優(yōu)于關(guān)系型數(shù)據(jù)庫.
RocketMQ的發(fā)展歷史:
RocketMq是一個由阿里巴巴開源的消息中間件, 2012年開源岩四,2017年成為apache頂級項目哭尝。它的核心設(shè)計借鑒了Kafka,所以我們在了解RocketMQ的時候剖煌,會發(fā)現(xiàn)很多和kafka相同的特性材鹦。同時呢逝淹,Rocket在某些功能上和kafka又有較大的差異,接下來我們就去了解RocketMQ
支持集群模型桶唐、負載均衡栅葡、水平擴展能力
億級別消息堆積能力
采用零拷貝的原理,順序?qū)懕P尤泽,隨機讀
底層通信框架采用Netty NIO
NameServer代替Zookeeper欣簇,實現(xiàn)服務(wù)尋址和服務(wù)協(xié)調(diào)
消息失敗重試機制、消息可查詢
強調(diào)集群無單點坯约,可擴展熊咽,任意一點高可用,水平可擴展
經(jīng)過多次雙十一的考驗
RocketMQ的架構(gòu):
集群本身沒有什么特殊之處闹丐,和kafka的整體架構(gòu)類似横殴,其中zookeeper替換成了NameServer。在rocketmq的早版本(2.x)的時候卿拴,是沒有namesrv組件的衫仑,用的是zookeeper做分布式協(xié)調(diào)和服務(wù)發(fā)現(xiàn),但是后期阿里數(shù)據(jù)根據(jù)實際業(yè)務(wù)需求進行改進和優(yōu)化堕花,自主研發(fā)了輕量級的namesrv,用于注冊Client服務(wù)與Broker的請求路由工作文狱,namesrv上不做任何消息的位置存儲,頻繁操作zookeeper的位置存儲數(shù)據(jù)會影響整體集群性能.
RocketMQ由四部分組成:
Name Server 可集群部署航徙,節(jié)點之間無任何信息同步如贷。提供輕量級的服務(wù)發(fā)現(xiàn)和路由
Broker(消息中轉(zhuǎn)角色,負責存儲消息到踏,轉(zhuǎn)發(fā)消息) 部署相對復(fù)雜杠袱,Broker 分為Master 與Slave,一個Master 可以對應(yīng)多個Slave窝稿,但是一個Slave 只能對應(yīng)一個Master楣富,Master 與Slave 的對應(yīng)關(guān)系通過指定相同的BrokerName,不同的BrokerId來定 義伴榔,BrokerId為0 表示Master纹蝴,非0 表示Slave。Master 也可以部署多個踪少。
Producer塘安,生產(chǎn)者,擁有相同 Producer Group 的 Producer 組成一個集群援奢, 與Name Server 集群中的其中一個節(jié)點(隨機選擇)建立長連接兼犯,定期從Name Server 取Topic 路由信息,并向提供Topic服務(wù)的Master 建立長連接,且定時向Master 發(fā)送心跳切黔。Producer 完全無狀態(tài)砸脊,可集群部署。
Consumer纬霞,消費者凌埂,接收消息進行消費的實例,擁有相同 Consumer Group 的 Consumer 組成一個集群诗芜,與Name Server 集群中的其中一個節(jié)點(隨機選擇)建立長連接瞳抓,定期從Name Server 取Topic 路由信息,并向提供Topic 服務(wù)的Master绢陌、Slave 建立長連接挨下,且定時向Master、Slave 發(fā)送心跳脐湾。Consumer既可以從Master 訂閱消息臭笆,也可以從Slave 訂閱消息,訂閱規(guī)則由Broker 配置決定秤掌。
要使用rocketmq愁铺,至少需要啟動兩個進程,nameserver闻鉴、broker茵乱,前者是各種topic注冊中心,后者是真正的broker孟岛。
單機環(huán)境RocketMQ的安裝(單master):
下載?rocketmq的安裝文件:?http://rocketmq.apache.org
解壓 unzip rocketmq-all-4.4.0-bin-release.zip
啟動 nameserver:
進入rocketMQ解壓目錄下的bin文件夾,啟動namesrv服務(wù):nohup sh mqnamesrv &? tail -f ~/logs/rocketmqlogs/namesrv.log 查看啟動日志
停止 nameserver : sh bin/mqshutdown namesrv . 停止服務(wù)的時候需要注意瓶竭,要先停止broker,其次停止nameserver渠羞。
默認情況下斤贰,nameserver監(jiān)聽的是?9876?端口。查看日志內(nèi)容出現(xiàn)如下信息即啟動成功:
啟動 broker:
nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c /conf/broker.conf & ? 其中[-c可以指定broker.conf配置文件]次询。默認情況下會加載conf/broker.conf
停止broker :sh bin/mqshutdown broker
nohup sh mqbroker -n localhost:9876 &? 啟動broker荧恍,其中-n表示指定當前broker對應(yīng)的命名服務(wù)地址: 默認情況下,broker監(jiān)聽的是10911端口 屯吊。
輸入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志
如果 tail -f ~/logs/rocketmqlogs/broker.log 提示找不到文件送巡,則打開當前目錄下的 nohup.out日志文件查看,出現(xiàn)如下日志表示啟動失敗盒卸,提示內(nèi)存無法分配
內(nèi)存不足的問題:
這是因為bin 目錄下啟動 nameserv 與 broker 的 runbroker.sh 和 runserver.sh 文件中默認分配的內(nèi)存太大骗爆,rocketmq比較耗內(nèi)存,所以默認分配的內(nèi)存比較大蔽介,而系統(tǒng)實際內(nèi)存卻太小導(dǎo)致啟動失敗淮腾,通常像虛擬機上安裝的 CentOS 服務(wù)器內(nèi)存可能是沒有高的糟需,只能調(diào)小。實際中應(yīng)該根據(jù)服務(wù)器內(nèi)存情況谷朝,配置一個合適的值 ,我這里設(shè)置成1g武花。
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m"Xms:是指設(shè)定程序啟動時占用內(nèi)存大小圆凰。一般來講,大點体箕,程序會啟動的快一點专钉,但是也可能會導(dǎo)致機器暫時間變慢。
Xmx:是指設(shè)定程序運行期間最大可占用的內(nèi)存大小累铅。如果程序運行需要占用更多的內(nèi)存跃须,超出了這個設(shè)置值,就會拋出OutOfMemory異常娃兽。
xmn:年輕代的heap大小菇民,一般設(shè)置為Xmx的3、4分之一
修改后重新啟動投储,輸入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志:
在這里我們會發(fā)現(xiàn)第练,這個broker所監(jiān)聽的IP地址似乎不是我localhost,這個會導(dǎo)致后續(xù)我們會連接不上broker玛荞,我們需要修改配置文件 broker.conf 增加一行配置 brokerIP1=192.168.1.101娇掏,然后重新啟動 nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &,再查看日志:
broker.conf 文件 基本配置:
namesrvAddr? :nameserver地址
brokerClusterName = DefaultCluster:Cluster名稱勋眯,如果集群機器數(shù)比較多婴梧,可以分成多個cluster,每個cluster提供給不同的業(yè)務(wù)場景使用
brokerName = broker-a:broker名稱客蹋,如果配置主從模式塞蹭,master和slave需要配置相同的名稱來表明關(guān)系
brokerId = 0:在主從模式中,一個master broker可以有多個slave嚼酝,0表示master浮还,大于0表示不同slave的id
deleteWhen = 04:刪除文件時間點,默認是凌晨4點
fileReservedTime = 48:文件保留時間闽巩,默認48小時
brokerRole = ASYNC_MASTER:?SYNC_MASTER/ASYNC_MASTER/SLAVE ; 同步表示slave和master消息同步完成后再返回信息給客戶端
flushDiskType = ASYNC_FLUSH:刷盤方式
autoCreateTopicEnable = true : topic不存在的情況下自動創(chuàng)建
brokerIP1 ? ? ? ? ip ? ? ? ? ? ip設(shè)置外網(wǎng)ip钧舌,不需要連接外網(wǎng)的話,可以在參數(shù)前面加#注釋掉
listenPort ? ? ? ? port ? ? ?? port可自由設(shè)置涎跨,一般設(shè)置10911
brokerPermission ? ?? 0x4|0x2 ? ? ? broker讀寫權(quán)限
defaultTopicQueueNums ? ? 8 ? ?? 默認topic讀寫隊列數(shù)
clusterTopicEnable ? ? ? ? ?? true ? ? 是否啟用集群topic
brokerTopicEnable ? ? ? ? ?? true ? ? 是否啟用brokertopic
autoCreateSubscriptionGroup ? ? TRUE ? ?? 是否允許Broker 自動創(chuàng)建訂閱組洼冻,建議線下開啟,線上關(guān)閉
sendMessageThreadPoolNums ? ? 1 ? ? ?? 發(fā)送消息線程池數(shù)量
storePathConsumeQueue ? ? ?? $HOME/store/consumequeue ? ?? 消費隊列存儲路徑
storePathIndex ? ? ? ? ? ? ? ? ? ? ? ? $HOME/store/index ? ? ? ? 消息索引存儲路徑
storeCheckpoint ? ? ? ? ?? $HOME/store/checkpoint ? ? ? ?? checkpoint 文件存儲路徑
abortFile ? ? ? ? ? ? ? ?? $HOME/store/abort ? ? ? ? ? ? abort 文件存儲路徑
消息發(fā)送和接收基本應(yīng)用:
1.添加 pom 依賴:
??org.apache.rocketmq??rocketmq-client??4.5.2
2.生產(chǎn)者 producer:
publicclass RocketMqProducer {
? ? publicstaticvoid main(String[] args) throws MQClientException, InterruptedException {
? ? ? ? /*????? ? *生產(chǎn)者組隅很,簡單來說就是多個發(fā)送同一類消息的生產(chǎn)者稱之為一個生產(chǎn)者組
? ? ? ? *rocketmq支持事務(wù)消息撞牢,在發(fā)送事務(wù)消息時,如果事務(wù)消息異常(producer掛了),broker端會來回查
? ? ? ? *事務(wù)的狀態(tài)屋彪,這個時候會根據(jù)group名稱來查找對應(yīng)的producer來執(zhí)行相應(yīng)的回查邏輯所宰。相當于實現(xiàn)了producer的高可用
????? ? */? ? ? ? DefaultMQProducer producer =newDefaultMQProducer("unique_producer_group_name");
producer.setDefaultTopicQueueNums(3);//設(shè)置默認的queue數(shù)量
//指定namesrv服務(wù)地址,獲取broker相關(guān)信息producer.setNamesrvAddr("192.168.1.101:9876");
? ? ? ? producer.start();
? ? ? ? for(inti =0; i <10; i++) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? //創(chuàng)建一個消息實例畜挥,指定指定topic仔粥、tag、消息內(nèi)容Message msg =newMessage("testTopic","testTag",
? ? ? ? ? ? ? ? ? ? ? ? ("Hello RocketMQ "+ i).getBytes(RemotingHelper.DEFAULT_CHARSET)/* Message body */);
? ? ? ? ? ? ? ? //發(fā)送消息并且獲取發(fā)送結(jié)果SendResult sendResult = producer.send(msg);
? ? ? ? ? ? ? ? System.out.printf("%s%n", sendResult);
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? producer.shutdown();
? ? }
}
SendResult中蟹但,有一個sendStatus狀態(tài)躯泰,表示消息的發(fā)送狀態(tài)。一共有四種狀態(tài):
FLUSH_DISK_TIMEOUT?: 表示沒有在規(guī)定時間內(nèi)完成刷盤(需要Broker 的刷盤策Ill創(chuàng)立設(shè)置成SYNC_FLUSH 才會報這個錯誤) 华糖。
FLUSH_SLAVE_TIMEOUT?:表示在主備方式下麦向,并且Broker 被設(shè)置成SYNC_MASTER 方式,沒有在設(shè)定時間內(nèi)完成主從同步客叉。
SLAVE_NOT_AVAILABLE?: 這個狀態(tài)產(chǎn)生的場景和FLUSH_SLAVE_TIMEOUT 類似仰禀, 表示在主備方式下洁闰,并且Broker 被設(shè)置成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。
SEND_OK?:表示發(fā)送成功堂湖,發(fā)送成功的具體含義着降,比如消息是否已經(jīng)被存儲到磁盤侦讨?消息是否被同步到了Slave 上窘面?消息在Slave 上是否被寫入磁盤?需要結(jié)合所配置的刷盤策略件相、主從策略來定再扭。這個狀態(tài)還可以簡單理解為,沒有發(fā)生上面列出的三個問題狀態(tài)就是SEND OK
3.消費者consumer:
publicclass RocketMqConsumer {
? ? publicstaticvoid main(String[] args) throws MQClientException {
? ? ? ? //消費者的組名夜矗,這個和kafka是一樣,這里需要注意的是泛范,DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("unique_consumer_group_name");
? ? ? ? //指定NameServer地址,多個地址以 ; 隔開consumer.setNamesrvAddr("192.168.1.101:9876");
? ? ? ? //設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
? ? ? ? //如果非第一次啟動紊撕,那么按照上次消費的位置繼續(xù)消費? ? ? ? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
? ? ? ? //訂閱PushTopic下Tag為push的消息consumer.subscribe("testTopic","*");//*表示不過濾罢荡,可以通過tag來過濾,比如:”tagA”/*????? ? * 注冊消息監(jiān)聽回調(diào)這里有兩種監(jiān)聽对扶,MessageListenerConcurrently以及MessageListenerOrderly
? ? ? ? * 前者是普通監(jiān)聽区赵,后者是順序監(jiān)聽。這塊在后續(xù)單獨說明
????? ? */? ? ? ? consumer.registerMessageListener(new MessageListenerConcurrently() {
? ? ? ? ? ? @Override
? ? ? ? ? ? publicConsumeConcurrentlyStatus consumeMessage(List? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? msgs,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ConsumeConcurrentlyContext context) {
? ? ? ? ? ? ? ? System.out.printf("%s Receive New Messages: %s %n",
? ? ? ? ? ? ? ? ? ? ? ? Thread.currentThread().getName(), msgs);
? ? ? ? ? ? ? ? returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回消息消費狀態(tài)? ? ? ? ? ? }
? ? ? ? });
? ? ? ? consumer.start();
? ? ? ? System.out.printf("Consumer Started.%n");
? ? }
Rocketmq中支持廣播消息浪南,就意味著同一個group中的消費者可以消費同一個消息笼才。
consumerGroup:位于同一個consumerGroup中的consumer實例和producerGroup中的各個produer實例承擔的角色類似;同一個group中可以配置多個consumer络凿,可以提高消費端的并發(fā)消費能力以及容災(zāi),和kafka一樣骡送,多個consumer會對消息做負載均衡昂羡,意味著同一個topic下的不同messageQueue會分發(fā)給同一個group中的不同consumer。同時摔踱,如果我們希望消息能夠達到廣播的目的虐先,那么只需要把consumer加入到不同的group就行。
RocketMQ提供了兩種消息消費模型昌渤,一種是pull主動拉去赴穗,另一種是push,被動接收膀息。但實際上RocketMQ都是pull模式,只是push在pull模式上做了一層封裝了赵,也就是pull到消息以后觸發(fā)業(yè)務(wù)消費者注冊到這里的callback. RocketMQ是基于長輪訓來實現(xiàn)消息的pull潜支。
nameServer的地址:name server地址,用于獲取broker柿汛、topic信息冗酿。
SpringBoot整合RocketMq:
1.pom.xml:
? ? ? ? ? ? org.springframework.boot? ? ? ? ? ? spring-boot-starter-web? ? ? ? ? ? ? ? ? ? ? ? ? ? org.apache.rocketmq? ? ? ? ? ? rocketmq-client? ? ? ? ? ? 4.5.2
2.application.yml :
rocketmq:
? # 生產(chǎn)者配置
? producer:
? ? isOnOff: on
? ? # 發(fā)送同一類消息的設(shè)置為同一個group,保證唯一
? ? groupName: unique_producer_group_name
? ? # 服務(wù)地址
? ? namesrvAddr: 192.168.1.101:9876? ? # 消息最大長度 默認1024*4(4M)
? ? maxMessageSize: 4096? ? # 發(fā)送消息超時時間,默認3000
? ? sendMsgTimeout: 3000? ? # 發(fā)送消息失敗重試次數(shù)络断,默認2
? ? retryTimesWhenSendFailed: 2? # 消費者配置
? consumer:
? ? isOnOff: on
? ? # 官方建議:確保同一組中的每個消費者訂閱相同的主題裁替。
? ? groupName: unique_consumer_group_name
? ? # 服務(wù)地址
? ? namesrvAddr: 192.168.1.101:9876? ? # 接收該 Topic 下所有 Tag
? ? topics: testTopic~*;
? ? consumeThreadMin: 20? ? consumeThreadMax: 64? ? # 設(shè)置一次消費消息的條數(shù),默認為1條
? ? consumeMessageBatchMaxSize: 1# 配置 Group? Topic? Tag
plat:
? plat-group: unique_group_name
? plat-topic: testTopic
? plat-tag: testTag
3.?ProducerConfig 生產(chǎn)者配置:
@Configurationpublicclass ProducerConfig {
? ? privatestaticfinal Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;
? ? @Value("${rocketmq.producer.groupName}")
? ? private String groupName;
? ? @Value("${rocketmq.producer.namesrvAddr}")
? ? private String namesrvAddr;
? ? @Value("${rocketmq.producer.maxMessageSize}")
? ? private Integer maxMessageSize ;
? ? @Value("${rocketmq.producer.sendMsgTimeout}")
? ? private Integer sendMsgTimeout;
? ? @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
? ? private Integer retryTimesWhenSendFailed;
? ? @Bean
? ? public DefaultMQProducer defaultMQProducer() {
? ? ? ? DefaultMQProducer producer;
? ? ? ? producer =newDefaultMQProducer(this.groupName);
? ? ? ? producer.setNamesrvAddr(this.namesrvAddr);
? ? ? ? //如果需要同一個jvm中不同的producer往不同的mq集群發(fā)送消息貌笨,需要設(shè)置不同的instanceNameif(this.maxMessageSize!=null){
? ? ? ? ? ? producer.setMaxMessageSize(this.maxMessageSize);
? ? ? ? }
? ? ? ? if(this.sendMsgTimeout!=null){
? ? ? ? ? ? producer.setSendMsgTimeout(this.sendMsgTimeout);
? ? ? ? }
? ? ? ? //如果發(fā)送消息失敗弱判,設(shè)置重試次數(shù),默認為2次if(this.retryTimesWhenSendFailed!=null){
? ? ? ? ? ? producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
? ? ? ? }
? ? ? ? try {
? ? ? ? ? ? producer.start();
? ? ? ? } catch (MQClientException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? return producer;
? ? }
}
4.ConsumerConfig 消費者配置:
@Configurationpublicclass ConsumerConfig {
? ? privatestaticfinal Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;
? ? @Value("${rocketmq.consumer.namesrvAddr}")
? ? private String namesrvAddr;
? ? @Value("${rocketmq.consumer.groupName}")
? ? private String groupName;
? ? @Value("${rocketmq.consumer.consumeThreadMin}")
? ? privateint consumeThreadMin;
? ? @Value("${rocketmq.consumer.consumeThreadMax}")
? ? privateint consumeThreadMax;
? ? @Value("${rocketmq.consumer.topics}")
? ? private String topics;
? ? @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
? ? privateint consumeMessageBatchMaxSize;
? ? @Resource
? ? private RocketMsgListener msgListener;
? ? @Bean
? ? public DefaultMQPushConsumer defaultMQPushConsumer(){
? ? ? ? DefaultMQPushConsumer consumer =new DefaultMQPushConsumer(groupName);
? ? ? ? consumer.setNamesrvAddr(namesrvAddr);
? ? ? ? consumer.setConsumeThreadMin(consumeThreadMin);
? ? ? ? consumer.setConsumeThreadMax(consumeThreadMax);
? ? ? ? consumer.registerMessageListener(msgListener);
? ? ? ? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
? ? ? ? consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
? ? ? ? try {
? ? ? ? ? ? String[] topicTagsArr = topics.split(";");
? ? ? ? ? ? for (String topicTags : topicTagsArr) {
? ? ? ? ? ? ? ? String[] topicTag = topicTags.split("~");
? ? ? ? ? ? ? ? consumer.subscribe(topicTag[0],topicTag[1]);
? ? ? ? ? ? }
? ? ? ? ? ? consumer.start();
? ? ? ? }catch (MQClientException e){
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? return consumer;
? ? }
}
5.RocketMsgListener 監(jiān)聽器:
@Componentpublicclass RocketMsgListener implements MessageListenerConcurrently {
? ? privatestaticfinal Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;
? ? @Resource
? ? private ParamConfigService paramConfigService ;
? ? @Override
? ? publicConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext context) {
? ? ? ? if (CollectionUtils.isEmpty(list)){
? ? ? ? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
? ? ? ? }
? ? ? ? MessageExt messageExt = list.get(0);
? ? ? ? LOG.info("接受到的消息為:"+new String(messageExt.getBody()));
? ? ? ? intreConsume = messageExt.getReconsumeTimes();
? ? ? ? // 消息已經(jīng)重試了3次锥惋,如果不需要再次消費昌腰,則返回成功if(reConsume ==3){
? ? ? ? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
? ? ? ? }
? ? ? ? if(messageExt.getTopic().equals(paramConfigService.platTopic)){
? ? ? ? ? ? String tags = messageExt.getTags() ;
? ? ? ? ? ? switch (tags){
? ? ? ? ? ? ? ? case"testTag":
? ? ? ? ? ? ? ? ? ? LOG.info("匹配到testTag"+tags);
? ? ? ? ? ? ? ? ? ? break ;
? ? ? ? ? ? ? ? default:
? ? ? ? ? ? ? ? ? ? LOG.info("未匹配到Tag == >>"+tags);
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? // 消息消費成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
? ? }
}
6.參數(shù)配置類:
@Servicepublicclass ParamConfigService {
? ? @Value("${plat.plat-group}")
? ? public String platGroup ;
? ? @Value("${plat.plat-topic}")
? ? public String platTopic ;
? ? @Value("${plat.plat-tag}")
? ? public String accountTag ;
//省略 get? set}
7.測試類:
@RestControllerpublicclass TestController {
? ? @Autowired
? ? private DefaultMQProducer defaultMQProducer;
? ? @Autowired
? ? private ParamConfigService paramConfigService;
? ? @RequestMapping(value ="/testStringQueue.json", method = {RequestMethod.GET})
? ? public SendResult testStringQueue() {
? ? ? ? // 可以不使用Config中的Group? ? ? ? defaultMQProducer.setProducerGroup(paramConfigService.platGroup);
? ? ? ? SendResult sendResult =null;
? ? ? ? String msgInfo ="rocketmq? message 1";
? ? ? ? try {
? ? ? ? ? ? Message sendMsg =new Message(paramConfigService.platTopic,
? ? ? ? ? ? ? ? ? ? paramConfigService.accountTag, msgInfo.getBytes());
? ? ? ? ? ? sendResult = defaultMQProducer.send(sendMsg);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? return sendResult;
? ? }
}
啟動項目訪問接口就可以看到效果。