簡介
rocketMq消息隊列以前是阿里開發(fā)的后來捐贈給Apache開源,先進先出,可以用來推送消息炫贤,也可以一對多發(fā)布消息(廣播訂閱),redis也能實現(xiàn)這些功能付秕,但是對比起redis兰珍,rocketMq的可靠性要更好,比如生產(chǎn)者發(fā)布了消息询吴,如果消費者那邊程序掛了沒收到消息掠河,消息不會消失,等消費者程序恢復(fù)正常后猛计,消息還能被接收到唠摹。
producer:生產(chǎn)者,主動發(fā)消息的那一端奉瘤。
comsumer:消費者勾拉,被動接收消息的那一端(通過監(jiān)聽器觸發(fā)pull動作)。
生產(chǎn)者可以一對一的發(fā)消息盗温,也可以一對多的發(fā)消息藕赞,多個comsumer都能收到相同的消息。生產(chǎn)者發(fā)送消息的過程為廣播動作卖局,消費者接收消息的過程為訂閱動作斧蜕。
利用rocketmq接收消息而不是直接發(fā)送消息給數(shù)據(jù)庫,可以大大緩解在高并發(fā)下數(shù)據(jù)庫承受的壓力砚偶,按照先進先出的規(guī)則先存入rocketmq,再一個一個的被消費者獲取操作數(shù)據(jù)庫批销,這個過程為削峰洒闸。
安裝
可以去這個網(wǎng)址https://archive.apache.org/dist/rocketmq下載安裝,jdk最好用1.8,用jdk11跑不起來风钻,用的公司安裝好的集群或自己電腦上安裝的單機都是可以運行的顷蟀。
以4.5.1為例
wget https://archive.apache.org/dist/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
unzip rocketmq-all-4.5.1-bin-release.zip
解壓之后先把conf文件夾下的broker.conf文件后面加入自己電腦的ip
brokerIP1=192.168.137.18
為啥要加這個呢酒请,這個程序不太智能骡技,如果我電腦上開了docker,docker的ip排在第一個,它默認就把docker的ip作為我本機的ip了羞反,所以要指定布朦。
將bin文件夾下的runbroker.sh的第一個JAVA_OPT改成
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
將runserver.sh的第一個JAVA_OPT改成
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
因為默認的內(nèi)存占用太大了,如果是8g的電腦跑不起來
啟動namesrv
nohup sh bin/mqnamesrv &
查看啟動日志
tail -f ~/logs/rocketmqlogs/namesrv.log
指定ip端口和修改后的配置文件啟動broker
nohup sh bin/mqbroker -n 192.168.137:9876 -c ~/myspace/profiles/rocketmq-all-4.5.1-bin-release/conf/broker.conf &
查看啟動日志
tail -f ~/logs/rocketmqlogs/broker.log
這樣rocketmq就算啟動好了
停止命令:
sh bin/mqshutdown namesrv
sh bin/mqshutdown broker
springboot整合rocketMq
maven引入(可以去maven倉庫下載最新版昼窗,截止目前是2.1.1最新):
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
application.properties加入:
rocketmq.name-server:172.26.xxx.24:9876;172.26.xxx.25:9876
rocketmq.producer.group=producerGroup1
rocketmq.producer.retry-times-when-send-failed=2
rocketmq.producer.retry-times-when-send-async-failed=0
rocketmq.producer.send-message-timeout=300000
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-next-server=false
上面的server如果是集群則是多個地址分號分隔是趴,如果是單機只填一個就好了,rocketmq.producer.group是生產(chǎn)者組名稱澄惊,名字可以自己取
新建生產(chǎn)者類Producer.java
package com.zhaohy.app.rocketMq;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class Producer {
@Autowired
private RocketMQTemplate mqTemplate;
public void send() {
Map<String, Object> resultMap = new HashMap<String, Object>();
resultMap.put("name", "testName");
resultMap.put("id", "1");
resultMap.put("sex", "1");
//發(fā)送消息
mqTemplate.convertAndSend("Topic1:TagA", resultMap);
//發(fā)送spring的Message
mqTemplate.send("Topic1:TagA", MessageBuilder.withPayload(resultMap).build());
//發(fā)送異步消息
mqTemplate.asyncSend("Topic1:TagA", resultMap, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("發(fā)送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("發(fā)送失敗");
}
});
//發(fā)送順序消息
mqTemplate.syncSendOrderly("Topic1", "1,創(chuàng)建", "3");
mqTemplate.syncSendOrderly("Topic1", "2,支付", "2");
mqTemplate.syncSendOrderly("Topic1", "3,完成", "1");
}
}
在上面的send方法中有三種發(fā)送方式唆途,可以發(fā)送String也可以發(fā)送實體類。
新建消費者Consumer.java和Consumer2.java
package com.zhaohy.app.rocketMq;
import java.util.Map;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "consumerGroup1")
public class Consumer implements RocketMQListener<Map<String, Object>> {
@Override
public void onMessage(Map<String, Object> paramsMap) {
System.out.println("1收到: "+paramsMap.get("name"));
}
}
package com.zhaohy.app.rocketMq;
import java.util.Map;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "consumerGroup2")
public class Consumer2 implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("2收到: "+s);
}
}
消費者類中注解里的topic即是在生產(chǎn)者定義的topic,多個消費者之間consumerGroup的名稱不能重復(fù)掸驱。
controller里新建測試方法:
@Autowired
Producer producer;
@RequestMapping("/test/producerSend.do")
public void rocketMqProducerTest(HttpServletRequest request) {
producer.send();
}
運行這個測試接口:控制臺輸出如下:
發(fā)送成功
2收到: {"sex":"1","name":"testName","id":"1"}
2收到: 1,創(chuàng)建
2收到: {"sex":"1","name":"testName","id":"1"}
2收到: {"sex":"1","name":"testName","id":"1"}
2收到: 3,完成
2收到: 2,支付
1收到: testName
1收到: testName
1收到: testName
可以看到consumerGroup1里用Map<String, Object>接收的肛搬,只能拿到同是Map<String, Object>類型發(fā)送的生產(chǎn)者發(fā)送的信息,consumerGroup2里用String接收的信息能全部拿到毕贼,所以可以用String來接收温赔,然后再把json轉(zhuǎn)換成對象處理。
至此就成功運行了鬼癣,用rocketmq可以做并發(fā)量比較大的場景陶贼,如點贊,秒殺等場景待秃,之前寫過一篇是用redis做的點贊:java實現(xiàn)點贊功能示例拜秧,需要結(jié)合定時任務(wù),如果是用mq章郁,就不用定時也可以實現(xiàn)了枉氮,會簡單不少。