springboot整合rocketMq示例

簡介

rocketMq消息隊列以前是阿里開發(fā)的后來捐贈給Apache開源,先進先出,可以用來推送消息炫贤,也可以一對多發(fā)布消息(廣播訂閱),redis也能實現(xiàn)這些功能付秕,但是對比起redis兰珍,rocketMq的可靠性要更好,比如生產(chǎn)者發(fā)布了消息询吴,如果消費者那邊程序掛了沒收到消息掠河,消息不會消失,等消費者程序恢復(fù)正常后猛计,消息還能被接收到唠摹。

producer:生產(chǎn)者,主動發(fā)消息的那一端奉瘤。
comsumer:消費者勾拉,被動接收消息的那一端(通過監(jiān)聽器觸發(fā)pull動作)。
生產(chǎn)者可以一對一的發(fā)消息盗温,也可以一對多的發(fā)消息藕赞,多個comsumer都能收到相同的消息。生產(chǎn)者發(fā)送消息的過程為廣播動作卖局,消費者接收消息的過程為訂閱動作斧蜕。

利用rocketmq接收消息而不是直接發(fā)送消息給數(shù)據(jù)庫,可以大大緩解在高并發(fā)下數(shù)據(jù)庫承受的壓力砚偶,按照先進先出的規(guī)則先存入rocketmq,再一個一個的被消費者獲取操作數(shù)據(jù)庫批销,這個過程為削峰洒闸。

安裝

可以去這個網(wǎng)址https://archive.apache.org/dist/rocketmq下載安裝,jdk最好用1.8,用jdk11跑不起來风钻,用的公司安裝好的集群或自己電腦上安裝的單機都是可以運行的顷蟀。
以4.5.1為例

wget https://archive.apache.org/dist/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
unzip rocketmq-all-4.5.1-bin-release.zip

解壓之后先把conf文件夾下的broker.conf文件后面加入自己電腦的ip

brokerIP1=192.168.137.18

為啥要加這個呢酒请,這個程序不太智能骡技,如果我電腦上開了docker,docker的ip排在第一個,它默認就把docker的ip作為我本機的ip了羞反,所以要指定布朦。
將bin文件夾下的runbroker.sh的第一個JAVA_OPT改成

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

將runserver.sh的第一個JAVA_OPT改成

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

因為默認的內(nèi)存占用太大了,如果是8g的電腦跑不起來

啟動namesrv
nohup sh bin/mqnamesrv &

查看啟動日志

tail -f ~/logs/rocketmqlogs/namesrv.log
指定ip端口和修改后的配置文件啟動broker
nohup sh bin/mqbroker -n 192.168.137:9876 -c ~/myspace/profiles/rocketmq-all-4.5.1-bin-release/conf/broker.conf &

查看啟動日志

tail -f ~/logs/rocketmqlogs/broker.log 

這樣rocketmq就算啟動好了
停止命令:

sh bin/mqshutdown namesrv
sh bin/mqshutdown broker

springboot整合rocketMq

maven引入(可以去maven倉庫下載最新版昼窗,截止目前是2.1.1最新):

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

application.properties加入:

rocketmq.name-server:172.26.xxx.24:9876;172.26.xxx.25:9876
rocketmq.producer.group=producerGroup1
rocketmq.producer.retry-times-when-send-failed=2
rocketmq.producer.retry-times-when-send-async-failed=0
rocketmq.producer.send-message-timeout=300000
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-next-server=false

上面的server如果是集群則是多個地址分號分隔是趴,如果是單機只填一個就好了,rocketmq.producer.group是生產(chǎn)者組名稱澄惊,名字可以自己取

新建生產(chǎn)者類Producer.java

package com.zhaohy.app.rocketMq;

import java.util.HashMap;
import java.util.Map;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private RocketMQTemplate mqTemplate;
    
    public void send() {
        Map<String, Object> resultMap = new HashMap<String, Object>();
        resultMap.put("name", "testName");
        resultMap.put("id", "1");
        resultMap.put("sex", "1");
        //發(fā)送消息
        mqTemplate.convertAndSend("Topic1:TagA", resultMap);

        //發(fā)送spring的Message
        mqTemplate.send("Topic1:TagA", MessageBuilder.withPayload(resultMap).build());

        //發(fā)送異步消息
        mqTemplate.asyncSend("Topic1:TagA", resultMap, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("發(fā)送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("發(fā)送失敗");
            }
        });

        //發(fā)送順序消息
        mqTemplate.syncSendOrderly("Topic1", "1,創(chuàng)建", "3");
        mqTemplate.syncSendOrderly("Topic1", "2,支付", "2");
        mqTemplate.syncSendOrderly("Topic1", "3,完成", "1");
    }
}

在上面的send方法中有三種發(fā)送方式唆途,可以發(fā)送String也可以發(fā)送實體類。
新建消費者Consumer.java和Consumer2.java

package com.zhaohy.app.rocketMq;

import java.util.Map;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "consumerGroup1")
public class Consumer implements RocketMQListener<Map<String, Object>> {
    @Override
    public void onMessage(Map<String, Object> paramsMap) {
        System.out.println("1收到: "+paramsMap.get("name"));
    }

}
package com.zhaohy.app.rocketMq;

import java.util.Map;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "consumerGroup2")
public class Consumer2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("2收到: "+s);
    }

}

消費者類中注解里的topic即是在生產(chǎn)者定義的topic,多個消費者之間consumerGroup的名稱不能重復(fù)掸驱。

controller里新建測試方法:

@Autowired
Producer producer;

@RequestMapping("/test/producerSend.do")
    public void rocketMqProducerTest(HttpServletRequest request) {
        producer.send();
    }

運行這個測試接口:控制臺輸出如下:

發(fā)送成功
2收到: {"sex":"1","name":"testName","id":"1"}
2收到: 1,創(chuàng)建
2收到: {"sex":"1","name":"testName","id":"1"}
2收到: {"sex":"1","name":"testName","id":"1"}
2收到: 3,完成
2收到: 2,支付
1收到: testName
1收到: testName
1收到: testName

可以看到consumerGroup1里用Map<String, Object>接收的肛搬,只能拿到同是Map<String, Object>類型發(fā)送的生產(chǎn)者發(fā)送的信息,consumerGroup2里用String接收的信息能全部拿到毕贼,所以可以用String來接收温赔,然后再把json轉(zhuǎn)換成對象處理。

至此就成功運行了鬼癣,用rocketmq可以做并發(fā)量比較大的場景陶贼,如點贊,秒殺等場景待秃,之前寫過一篇是用redis做的點贊:java實現(xiàn)點贊功能示例拜秧,需要結(jié)合定時任務(wù),如果是用mq章郁,就不用定時也可以實現(xiàn)了枉氮,會簡單不少。

參考:https://www.cnblogs.com/zpKang/p/13717258.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載驱犹,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者嘲恍。
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市雄驹,隨后出現(xiàn)的幾起案子佃牛,更是在濱河造成了極大的恐慌,老刑警劉巖医舆,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件俘侠,死亡現(xiàn)場離奇詭異象缀,居然都是意外死亡,警方通過查閱死者的電腦和手機爷速,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門央星,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人惫东,你說我怎么就攤上這事莉给。” “怎么了廉沮?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵颓遏,是天一觀的道長。 經(jīng)常有香客問我滞时,道長叁幢,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任坪稽,我火速辦了婚禮曼玩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘窒百。我一直安慰自己黍判,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布贝咙。 她就那樣靜靜地躺著样悟,像睡著了一般。 火紅的嫁衣襯著肌膚如雪庭猩。 梳的紋絲不亂的頭發(fā)上窟她,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天,我揣著相機與錄音蔼水,去河邊找鬼震糖。 笑死,一個胖子當(dāng)著我的面吹牛趴腋,可吹牛的內(nèi)容都是我干的吊说。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼优炬,長吁一口氣:“原來是場噩夢啊……” “哼颁井!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蠢护,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤雅宾,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后葵硕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體眉抬,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡贯吓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年缩举,在試婚紗的時候發(fā)現(xiàn)自己被綠了务豺。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡矢空,死狀恐怖库北,靈堂內(nèi)的尸體忽然破棺而出爬舰,到底是詐尸還是另有隱情,我是刑警寧澤贤惯,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布洼专,位于F島的核電站,受9級特大地震影響孵构,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜烟很,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一颈墅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧雾袱,春花似錦恤筛、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至林说,卻和暖如春煎殷,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背腿箩。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工豪直, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人珠移。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓弓乙,卻偏偏與公主長得像,于是被迫代替她去往敵國和親钧惧。 傳聞我的和親對象是個殘疾皇子暇韧,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容