RocketMQ安裝部署及整合Springboot

轉(zhuǎn)載原文:https://www.cnblogs.com/wuzhenzhao/p/11498735.html? ? ? ? 作者:吳振照




RocketMQ安裝部署及整合Springboot

消息中間件的功能:

  通過學習ActiveMq钧嘶,kafka雹洗,rabbitMq這些消息中間件,我們大致能為消息中間件的功能做一下以下定義:可以先從基本的需求開始思考

最基本的是要能支持消息的發(fā)送和接收估蹄,需要涉及到網(wǎng)絡(luò)通信就一定會涉及到NIO

消息中心的消息存儲(持久化/非持久化)

消息的序列化和反序列化

是否跨語言

消息的確認機制,如何避免消息重發(fā)

  高級功能:

消息的有序性

是否支持事務(wù)消息

消息收發(fā)的性能圈盔,對高并發(fā)大數(shù)據(jù)量的支持

是否支持集群

消息的可靠性存儲

是否支持多協(xié)議

MQ消息存儲選擇:

  從主流的幾種MQ消息隊列采用的存儲方式來看戴尸,主要會有三種

分布式KV存儲,比如ActiveMQ中采用的levelDB付魔、Redis, 這種存儲方式對于消息讀寫能力要求不高的情況下可以使用

文件系統(tǒng)存儲飞蹂,常見的比如kafka几苍、RocketMQ、RabbitMQ都是采用消息刷盤到所部署的機器上的文件系統(tǒng)來做持久化陈哑,這種方案適合對于有高吞吐量要求的消息中間件妻坝,因為消息刷盤是一種高效率,高可靠惊窖、高性能的持久化方式刽宪,除非磁盤出現(xiàn)故障,否則一般是不會出現(xiàn)無法持久化的問題

關(guān)系型數(shù)據(jù)庫界酒,比如ActiveMQ可以采用mysql作為消息存儲圣拄,關(guān)系型數(shù)據(jù)庫在單表數(shù)據(jù)量達到千萬級的情況下IO性能會出現(xiàn)瓶頸,所以ActiveMQ并不適合于高吞吐量的消息隊列場景毁欣。

  總的來說售担,對于存儲效率,文件系統(tǒng)要優(yōu)于分布式KV存儲署辉,分布式KV存儲要優(yōu)于關(guān)系型數(shù)據(jù)庫.

RocketMQ的發(fā)展歷史:

  RocketMq是一個由阿里巴巴開源的消息中間件, 2012年開源岩四,2017年成為apache頂級項目哭尝。它的核心設(shè)計借鑒了Kafka,所以我們在了解RocketMQ的時候剖煌,會發(fā)現(xiàn)很多和kafka相同的特性材鹦。同時呢逝淹,Rocket在某些功能上和kafka又有較大的差異,接下來我們就去了解RocketMQ

支持集群模型桶唐、負載均衡栅葡、水平擴展能力

億級別消息堆積能力

采用零拷貝的原理,順序?qū)懕P尤泽,隨機讀

底層通信框架采用Netty NIO

NameServer代替Zookeeper欣簇,實現(xiàn)服務(wù)尋址和服務(wù)協(xié)調(diào)

消息失敗重試機制、消息可查詢

強調(diào)集群無單點坯约,可擴展熊咽,任意一點高可用,水平可擴展

經(jīng)過多次雙十一的考驗

RocketMQ的架構(gòu):

  集群本身沒有什么特殊之處闹丐,和kafka的整體架構(gòu)類似横殴,其中zookeeper替換成了NameServer。在rocketmq的早版本(2.x)的時候卿拴,是沒有namesrv組件的衫仑,用的是zookeeper做分布式協(xié)調(diào)和服務(wù)發(fā)現(xiàn),但是后期阿里數(shù)據(jù)根據(jù)實際業(yè)務(wù)需求進行改進和優(yōu)化堕花,自主研發(fā)了輕量級的namesrv,用于注冊Client服務(wù)與Broker的請求路由工作文狱,namesrv上不做任何消息的位置存儲,頻繁操作zookeeper的位置存儲數(shù)據(jù)會影響整體集群性能.

RocketMQ由四部分組成:

Name Server 可集群部署航徙,節(jié)點之間無任何信息同步如贷。提供輕量級的服務(wù)發(fā)現(xiàn)和路由

Broker(消息中轉(zhuǎn)角色,負責存儲消息到踏,轉(zhuǎn)發(fā)消息) 部署相對復(fù)雜杠袱,Broker 分為Master 與Slave,一個Master 可以對應(yīng)多個Slave窝稿,但是一個Slave 只能對應(yīng)一個Master楣富,Master 與Slave 的對應(yīng)關(guān)系通過指定相同的BrokerName,不同的BrokerId來定 義伴榔,BrokerId為0 表示Master纹蝴,非0 表示Slave。Master 也可以部署多個踪少。

Producer塘安,生產(chǎn)者,擁有相同 Producer Group 的 Producer 組成一個集群援奢, 與Name Server 集群中的其中一個節(jié)點(隨機選擇)建立長連接兼犯,定期從Name Server 取Topic 路由信息,并向提供Topic服務(wù)的Master 建立長連接,且定時向Master 發(fā)送心跳切黔。Producer 完全無狀態(tài)砸脊,可集群部署。

Consumer纬霞,消費者凌埂,接收消息進行消費的實例,擁有相同 Consumer Group 的 Consumer 組成一個集群诗芜,與Name Server 集群中的其中一個節(jié)點(隨機選擇)建立長連接瞳抓,定期從Name Server 取Topic 路由信息,并向提供Topic 服務(wù)的Master绢陌、Slave 建立長連接挨下,且定時向Master、Slave 發(fā)送心跳脐湾。Consumer既可以從Master 訂閱消息臭笆,也可以從Slave 訂閱消息,訂閱規(guī)則由Broker 配置決定秤掌。

  要使用rocketmq愁铺,至少需要啟動兩個進程,nameserver闻鉴、broker茵乱,前者是各種topic注冊中心,后者是真正的broker孟岛。

單機環(huán)境RocketMQ的安裝(單master):

下載?rocketmq的安裝文件:?http://rocketmq.apache.org

  解壓 unzip rocketmq-all-4.4.0-bin-release.zip

啟動 nameserver:

進入rocketMQ解壓目錄下的bin文件夾,啟動namesrv服務(wù):nohup sh mqnamesrv &? tail -f ~/logs/rocketmqlogs/namesrv.log 查看啟動日志

  停止 nameserver : sh bin/mqshutdown namesrv . 停止服務(wù)的時候需要注意瓶竭,要先停止broker,其次停止nameserver渠羞。

默認情況下斤贰,nameserver監(jiān)聽的是?9876?端口。查看日志內(nèi)容出現(xiàn)如下信息即啟動成功:

啟動 broker:

  nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c /conf/broker.conf & ? 其中[-c可以指定broker.conf配置文件]次询。默認情況下會加載conf/broker.conf

  停止broker :sh bin/mqshutdown broker

nohup sh mqbroker -n localhost:9876 &? 啟動broker荧恍,其中-n表示指定當前broker對應(yīng)的命名服務(wù)地址: 默認情況下,broker監(jiān)聽的是10911端口 屯吊。

輸入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志

如果 tail -f ~/logs/rocketmqlogs/broker.log 提示找不到文件送巡,則打開當前目錄下的 nohup.out日志文件查看,出現(xiàn)如下日志表示啟動失敗盒卸,提示內(nèi)存無法分配

內(nèi)存不足的問題:

  這是因為bin 目錄下啟動 nameserv 與 broker 的 runbroker.sh 和 runserver.sh 文件中默認分配的內(nèi)存太大骗爆,rocketmq比較耗內(nèi)存,所以默認分配的內(nèi)存比較大蔽介,而系統(tǒng)實際內(nèi)存卻太小導(dǎo)致啟動失敗淮腾,通常像虛擬機上安裝的 CentOS 服務(wù)器內(nèi)存可能是沒有高的糟需,只能調(diào)小。實際中應(yīng)該根據(jù)服務(wù)器內(nèi)存情況谷朝,配置一個合適的值 ,我這里設(shè)置成1g武花。

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m"Xms:是指設(shè)定程序啟動時占用內(nèi)存大小圆凰。一般來講,大點体箕,程序會啟動的快一點专钉,但是也可能會導(dǎo)致機器暫時間變慢。

Xmx:是指設(shè)定程序運行期間最大可占用的內(nèi)存大小累铅。如果程序運行需要占用更多的內(nèi)存跃须,超出了這個設(shè)置值,就會拋出OutOfMemory異常娃兽。

xmn:年輕代的heap大小菇民,一般設(shè)置為Xmx的3、4分之一

  修改后重新啟動投储,輸入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志:

  在這里我們會發(fā)現(xiàn)第练,這個broker所監(jiān)聽的IP地址似乎不是我localhost,這個會導(dǎo)致后續(xù)我們會連接不上broker玛荞,我們需要修改配置文件 broker.conf 增加一行配置 brokerIP1=192.168.1.101娇掏,然后重新啟動 nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &,再查看日志:

broker.conf 文件 基本配置:

namesrvAddr? :nameserver地址

brokerClusterName = DefaultCluster:Cluster名稱勋眯,如果集群機器數(shù)比較多婴梧,可以分成多個cluster,每個cluster提供給不同的業(yè)務(wù)場景使用

brokerName = broker-a:broker名稱客蹋,如果配置主從模式塞蹭,master和slave需要配置相同的名稱來表明關(guān)系

brokerId = 0:在主從模式中,一個master broker可以有多個slave嚼酝,0表示master浮还,大于0表示不同slave的id

deleteWhen = 04:刪除文件時間點,默認是凌晨4點

fileReservedTime = 48:文件保留時間闽巩,默認48小時

brokerRole = ASYNC_MASTER:?SYNC_MASTER/ASYNC_MASTER/SLAVE ; 同步表示slave和master消息同步完成后再返回信息給客戶端

flushDiskType = ASYNC_FLUSH:刷盤方式

autoCreateTopicEnable = true : topic不存在的情況下自動創(chuàng)建

brokerIP1 ? ? ? ? ip ? ? ? ? ? ip設(shè)置外網(wǎng)ip钧舌,不需要連接外網(wǎng)的話,可以在參數(shù)前面加#注釋掉

listenPort ? ? ? ? port ? ? ?? port可自由設(shè)置涎跨,一般設(shè)置10911

brokerPermission ? ?? 0x4|0x2 ? ? ? broker讀寫權(quán)限

defaultTopicQueueNums ? ? 8 ? ?? 默認topic讀寫隊列數(shù)

clusterTopicEnable ? ? ? ? ?? true ? ? 是否啟用集群topic

brokerTopicEnable ? ? ? ? ?? true ? ? 是否啟用brokertopic

autoCreateSubscriptionGroup ? ? TRUE ? ?? 是否允許Broker 自動創(chuàng)建訂閱組洼冻,建議線下開啟,線上關(guān)閉

sendMessageThreadPoolNums ? ? 1 ? ? ?? 發(fā)送消息線程池數(shù)量

storePathConsumeQueue ? ? ?? $HOME/store/consumequeue ? ?? 消費隊列存儲路徑

storePathIndex ? ? ? ? ? ? ? ? ? ? ? ? $HOME/store/index ? ? ? ? 消息索引存儲路徑

storeCheckpoint ? ? ? ? ?? $HOME/store/checkpoint ? ? ? ?? checkpoint 文件存儲路徑

abortFile ? ? ? ? ? ? ? ?? $HOME/store/abort ? ? ? ? ? ? abort 文件存儲路徑

消息發(fā)送和接收基本應(yīng)用:

1.添加 pom 依賴:

??org.apache.rocketmq??rocketmq-client??4.5.2

2.生產(chǎn)者 producer:


publicclass RocketMqProducer {

? ? publicstaticvoid main(String[] args) throws MQClientException, InterruptedException {

? ? ? ? /*????? ? *生產(chǎn)者組隅很,簡單來說就是多個發(fā)送同一類消息的生產(chǎn)者稱之為一個生產(chǎn)者組

? ? ? ? *rocketmq支持事務(wù)消息撞牢,在發(fā)送事務(wù)消息時,如果事務(wù)消息異常(producer掛了),broker端會來回查

? ? ? ? *事務(wù)的狀態(tài)屋彪,這個時候會根據(jù)group名稱來查找對應(yīng)的producer來執(zhí)行相應(yīng)的回查邏輯所宰。相當于實現(xiàn)了producer的高可用

????? ? */? ? ? ? DefaultMQProducer producer =newDefaultMQProducer("unique_producer_group_name");

producer.setDefaultTopicQueueNums(3);//設(shè)置默認的queue數(shù)量

//指定namesrv服務(wù)地址,獲取broker相關(guān)信息producer.setNamesrvAddr("192.168.1.101:9876");

? ? ? ? producer.start();

? ? ? ? for(inti =0; i <10; i++) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? //創(chuàng)建一個消息實例畜挥,指定指定topic仔粥、tag、消息內(nèi)容Message msg =newMessage("testTopic","testTag",

? ? ? ? ? ? ? ? ? ? ? ? ("Hello RocketMQ "+ i).getBytes(RemotingHelper.DEFAULT_CHARSET)/* Message body */);

? ? ? ? ? ? ? ? //發(fā)送消息并且獲取發(fā)送結(jié)果SendResult sendResult = producer.send(msg);

? ? ? ? ? ? ? ? System.out.printf("%s%n", sendResult);

? ? ? ? ? ? } catch (Exception e) {

? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? ? ? Thread.sleep(1000);

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? producer.shutdown();

? ? }

}

  SendResult中蟹但,有一個sendStatus狀態(tài)躯泰,表示消息的發(fā)送狀態(tài)。一共有四種狀態(tài):

FLUSH_DISK_TIMEOUT?: 表示沒有在規(guī)定時間內(nèi)完成刷盤(需要Broker 的刷盤策Ill創(chuàng)立設(shè)置成SYNC_FLUSH 才會報這個錯誤) 华糖。

FLUSH_SLAVE_TIMEOUT?:表示在主備方式下麦向,并且Broker 被設(shè)置成SYNC_MASTER 方式,沒有在設(shè)定時間內(nèi)完成主從同步客叉。

SLAVE_NOT_AVAILABLE?: 這個狀態(tài)產(chǎn)生的場景和FLUSH_SLAVE_TIMEOUT 類似仰禀, 表示在主備方式下洁闰,并且Broker 被設(shè)置成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。

SEND_OK?:表示發(fā)送成功堂湖,發(fā)送成功的具體含義着降,比如消息是否已經(jīng)被存儲到磁盤侦讨?消息是否被同步到了Slave 上窘面?消息在Slave 上是否被寫入磁盤?需要結(jié)合所配置的刷盤策略件相、主從策略來定再扭。這個狀態(tài)還可以簡單理解為,沒有發(fā)生上面列出的三個問題狀態(tài)就是SEND OK

3.消費者consumer:

publicclass RocketMqConsumer {

? ? publicstaticvoid main(String[] args) throws MQClientException {

? ? ? ? //消費者的組名夜矗,這個和kafka是一樣,這里需要注意的是泛范,DefaultMQPushConsumer consumer =newDefaultMQPushConsumer("unique_consumer_group_name");

? ? ? ? //指定NameServer地址,多個地址以 ; 隔開consumer.setNamesrvAddr("192.168.1.101:9876");

? ? ? ? //設(shè)置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費

? ? ? ? //如果非第一次啟動紊撕,那么按照上次消費的位置繼續(xù)消費? ? ? ? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

? ? ? ? //訂閱PushTopic下Tag為push的消息consumer.subscribe("testTopic","*");//*表示不過濾罢荡,可以通過tag來過濾,比如:”tagA”/*????? ? * 注冊消息監(jiān)聽回調(diào)這里有兩種監(jiān)聽对扶,MessageListenerConcurrently以及MessageListenerOrderly

? ? ? ? * 前者是普通監(jiān)聽区赵,后者是順序監(jiān)聽。這塊在后續(xù)單獨說明

????? ? */? ? ? ? consumer.registerMessageListener(new MessageListenerConcurrently() {

? ? ? ? ? ? @Override

? ? ? ? ? ? publicConsumeConcurrentlyStatus consumeMessage(List? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? msgs,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ConsumeConcurrentlyContext context) {

? ? ? ? ? ? ? ? System.out.printf("%s Receive New Messages: %s %n",

? ? ? ? ? ? ? ? ? ? ? ? Thread.currentThread().getName(), msgs);

? ? ? ? ? ? ? ? returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回消息消費狀態(tài)? ? ? ? ? ? }

? ? ? ? });

? ? ? ? consumer.start();

? ? ? ? System.out.printf("Consumer Started.%n");

? ? }

  Rocketmq中支持廣播消息浪南,就意味著同一個group中的消費者可以消費同一個消息笼才。

  consumerGroup:位于同一個consumerGroup中的consumer實例和producerGroup中的各個produer實例承擔的角色類似;同一個group中可以配置多個consumer络凿,可以提高消費端的并發(fā)消費能力以及容災(zāi),和kafka一樣骡送,多個consumer會對消息做負載均衡昂羡,意味著同一個topic下的不同messageQueue會分發(fā)給同一個group中的不同consumer。同時摔踱,如果我們希望消息能夠達到廣播的目的虐先,那么只需要把consumer加入到不同的group就行。

  RocketMQ提供了兩種消息消費模型昌渤,一種是pull主動拉去赴穗,另一種是push,被動接收膀息。但實際上RocketMQ都是pull模式,只是push在pull模式上做了一層封裝了赵,也就是pull到消息以后觸發(fā)業(yè)務(wù)消費者注冊到這里的callback. RocketMQ是基于長輪訓來實現(xiàn)消息的pull潜支。

  nameServer的地址:name server地址,用于獲取broker柿汛、topic信息冗酿。

SpringBoot整合RocketMq:

1.pom.xml:

? ? ? ? ? ? org.springframework.boot? ? ? ? ? ? spring-boot-starter-web? ? ? ? ? ? ? ? ? ? ? ? ? ? org.apache.rocketmq? ? ? ? ? ? rocketmq-client? ? ? ? ? ? 4.5.2

2.application.yml :

rocketmq:

? # 生產(chǎn)者配置

? producer:

? ? isOnOff: on

? ? # 發(fā)送同一類消息的設(shè)置為同一個group,保證唯一

? ? groupName: unique_producer_group_name

? ? # 服務(wù)地址

? ? namesrvAddr: 192.168.1.101:9876? ? # 消息最大長度 默認1024*4(4M)

? ? maxMessageSize: 4096? ? # 發(fā)送消息超時時間,默認3000

? ? sendMsgTimeout: 3000? ? # 發(fā)送消息失敗重試次數(shù)络断,默認2

? ? retryTimesWhenSendFailed: 2? # 消費者配置

? consumer:

? ? isOnOff: on

? ? # 官方建議:確保同一組中的每個消費者訂閱相同的主題裁替。

? ? groupName: unique_consumer_group_name

? ? # 服務(wù)地址

? ? namesrvAddr: 192.168.1.101:9876? ? # 接收該 Topic 下所有 Tag

? ? topics: testTopic~*;

? ? consumeThreadMin: 20? ? consumeThreadMax: 64? ? # 設(shè)置一次消費消息的條數(shù),默認為1條

? ? consumeMessageBatchMaxSize: 1# 配置 Group? Topic? Tag

plat:

? plat-group: unique_group_name

? plat-topic: testTopic

? plat-tag: testTag

3.?ProducerConfig 生產(chǎn)者配置:

@Configurationpublicclass ProducerConfig {

? ? privatestaticfinal Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;

? ? @Value("${rocketmq.producer.groupName}")

? ? private String groupName;

? ? @Value("${rocketmq.producer.namesrvAddr}")

? ? private String namesrvAddr;

? ? @Value("${rocketmq.producer.maxMessageSize}")

? ? private Integer maxMessageSize ;

? ? @Value("${rocketmq.producer.sendMsgTimeout}")

? ? private Integer sendMsgTimeout;

? ? @Value("${rocketmq.producer.retryTimesWhenSendFailed}")

? ? private Integer retryTimesWhenSendFailed;

? ? @Bean

? ? public DefaultMQProducer defaultMQProducer() {

? ? ? ? DefaultMQProducer producer;

? ? ? ? producer =newDefaultMQProducer(this.groupName);

? ? ? ? producer.setNamesrvAddr(this.namesrvAddr);

? ? ? ? //如果需要同一個jvm中不同的producer往不同的mq集群發(fā)送消息貌笨,需要設(shè)置不同的instanceNameif(this.maxMessageSize!=null){

? ? ? ? ? ? producer.setMaxMessageSize(this.maxMessageSize);

? ? ? ? }

? ? ? ? if(this.sendMsgTimeout!=null){

? ? ? ? ? ? producer.setSendMsgTimeout(this.sendMsgTimeout);

? ? ? ? }

? ? ? ? //如果發(fā)送消息失敗弱判,設(shè)置重試次數(shù),默認為2次if(this.retryTimesWhenSendFailed!=null){

? ? ? ? ? ? producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);

? ? ? ? }

? ? ? ? try {

? ? ? ? ? ? producer.start();

? ? ? ? } catch (MQClientException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? ? ? return producer;

? ? }

}

4.ConsumerConfig 消費者配置:

@Configurationpublicclass ConsumerConfig {

? ? privatestaticfinal Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;

? ? @Value("${rocketmq.consumer.namesrvAddr}")

? ? private String namesrvAddr;

? ? @Value("${rocketmq.consumer.groupName}")

? ? private String groupName;

? ? @Value("${rocketmq.consumer.consumeThreadMin}")

? ? privateint consumeThreadMin;

? ? @Value("${rocketmq.consumer.consumeThreadMax}")

? ? privateint consumeThreadMax;

? ? @Value("${rocketmq.consumer.topics}")

? ? private String topics;

? ? @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")

? ? privateint consumeMessageBatchMaxSize;

? ? @Resource

? ? private RocketMsgListener msgListener;

? ? @Bean

? ? public DefaultMQPushConsumer defaultMQPushConsumer(){

? ? ? ? DefaultMQPushConsumer consumer =new DefaultMQPushConsumer(groupName);

? ? ? ? consumer.setNamesrvAddr(namesrvAddr);

? ? ? ? consumer.setConsumeThreadMin(consumeThreadMin);

? ? ? ? consumer.setConsumeThreadMax(consumeThreadMax);

? ? ? ? consumer.registerMessageListener(msgListener);

? ? ? ? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

? ? ? ? consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);

? ? ? ? try {

? ? ? ? ? ? String[] topicTagsArr = topics.split(";");

? ? ? ? ? ? for (String topicTags : topicTagsArr) {

? ? ? ? ? ? ? ? String[] topicTag = topicTags.split("~");

? ? ? ? ? ? ? ? consumer.subscribe(topicTag[0],topicTag[1]);

? ? ? ? ? ? }

? ? ? ? ? ? consumer.start();

? ? ? ? }catch (MQClientException e){

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? ? ? return consumer;

? ? }

}

5.RocketMsgListener 監(jiān)聽器:

@Componentpublicclass RocketMsgListener implements MessageListenerConcurrently {

? ? privatestaticfinal Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;

? ? @Resource

? ? private ParamConfigService paramConfigService ;

? ? @Override

? ? publicConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext context) {

? ? ? ? if (CollectionUtils.isEmpty(list)){

? ? ? ? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

? ? ? ? }

? ? ? ? MessageExt messageExt = list.get(0);

? ? ? ? LOG.info("接受到的消息為:"+new String(messageExt.getBody()));

? ? ? ? intreConsume = messageExt.getReconsumeTimes();

? ? ? ? // 消息已經(jīng)重試了3次锥惋,如果不需要再次消費昌腰,則返回成功if(reConsume ==3){

? ? ? ? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

? ? ? ? }

? ? ? ? if(messageExt.getTopic().equals(paramConfigService.platTopic)){

? ? ? ? ? ? String tags = messageExt.getTags() ;

? ? ? ? ? ? switch (tags){

? ? ? ? ? ? ? ? case"testTag":

? ? ? ? ? ? ? ? ? ? LOG.info("匹配到testTag"+tags);

? ? ? ? ? ? ? ? ? ? break ;

? ? ? ? ? ? ? ? default:

? ? ? ? ? ? ? ? ? ? LOG.info("未匹配到Tag == >>"+tags);

? ? ? ? ? ? ? ? ? ? break;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? // 消息消費成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

? ? }

}

6.參數(shù)配置類:

@Servicepublicclass ParamConfigService {

? ? @Value("${plat.plat-group}")

? ? public String platGroup ;

? ? @Value("${plat.plat-topic}")

? ? public String platTopic ;

? ? @Value("${plat.plat-tag}")

? ? public String accountTag ;

   //省略 get? set}

7.測試類:

@RestControllerpublicclass TestController {

? ? @Autowired

? ? private DefaultMQProducer defaultMQProducer;

? ? @Autowired

? ? private ParamConfigService paramConfigService;

? ? @RequestMapping(value ="/testStringQueue.json", method = {RequestMethod.GET})

? ? public SendResult testStringQueue() {

? ? ? ? // 可以不使用Config中的Group? ? ? ? defaultMQProducer.setProducerGroup(paramConfigService.platGroup);

? ? ? ? SendResult sendResult =null;

? ? ? ? String msgInfo ="rocketmq? message 1";

? ? ? ? try {

? ? ? ? ? ? Message sendMsg =new Message(paramConfigService.platTopic,

? ? ? ? ? ? ? ? ? ? paramConfigService.accountTag, msgInfo.getBytes());

? ? ? ? ? ? sendResult = defaultMQProducer.send(sendMsg);

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? ? ? return sendResult;

? ? }

}

  啟動項目訪問接口就可以看到效果。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末膀跌,一起剝皮案震驚了整個濱河市遭商,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌捅伤,老刑警劉巖劫流,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異丛忆,居然都是意外死亡祠汇,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門蘸际,熙熙樓的掌柜王于貴愁眉苦臉地迎上來座哩,“玉大人,你說我怎么就攤上這事粮彤「睿” “怎么了姜骡?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長屿良。 經(jīng)常有香客問我圈澈,道長,這世上最難降的妖魔是什么尘惧? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任康栈,我火速辦了婚禮,結(jié)果婚禮上喷橙,老公的妹妹穿的比我還像新娘啥么。我一直安慰自己,他們只是感情好贰逾,可當我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布悬荣。 她就那樣靜靜地躺著,像睡著了一般疙剑。 火紅的嫁衣襯著肌膚如雪氯迂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天言缤,我揣著相機與錄音嚼蚀,去河邊找鬼。 笑死管挟,一個胖子當著我的面吹牛轿曙,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播哮独,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼拳芙,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了皮璧?” 一聲冷哼從身側(cè)響起舟扎,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎悴务,沒想到半個月后睹限,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡讯檐,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年羡疗,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片别洪。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡叨恨,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出挖垛,到底是詐尸還是另有隱情痒钝,我是刑警寧澤秉颗,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站送矩,受9級特大地震影響蚕甥,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜栋荸,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一菇怀、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧晌块,春花似錦爱沟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至靠汁,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間闽铐,已是汗流浹背蝶怔。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留兄墅,地道東北人踢星。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像隙咸,于是被迫代替她去往敵國和親沐悦。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,440評論 2 348