mq的應(yīng)用場(chǎng)景
1。解耦
2。削峰
3【寥常【延遲消息】
4笆呆∥樯【分布式事務(wù)】
5.日志收集
6.消息分發(fā)
采用分布式事務(wù)方式確保消息默認(rèn)發(fā)送到rocketmq上获三,
默認(rèn)開(kāi)啟了數(shù)據(jù)持久化位置:${user.home}/store
消費(fèi)的時(shí)候報(bào)錯(cuò)會(huì)自動(dòng)重試
【默認(rèn)重試間隔是多少】
無(wú)序消息
對(duì)于無(wú)序消息(通過(guò)實(shí)現(xiàn)MessageListenerConcurrently接口來(lái)消費(fèi))食绿,第一次重試間隔時(shí)間是 10 秒踩叭。第二次重試間隔時(shí)間是 30 秒磕潮,第三次重試間隔時(shí)間是 1 分鐘,第四次重試間隔時(shí)間是 2 分鐘容贝,第五次重試間隔時(shí)間是 3 分鐘自脯,第六次重試間隔時(shí)間是 4 分鐘,第七次重試間隔時(shí)間是 5 分鐘斤富,第八次重試間隔時(shí)間是 6 分鐘膏潮,第九次重試間隔時(shí)間是 7 分鐘,第十次重試間隔時(shí)間是 8 分鐘满力,第十一次重試間隔時(shí)間是 9 分鐘焕参,第十二次重試間隔時(shí)間是 10 分鐘,第十三次重試間隔時(shí)間是 20 分鐘油额,第十四次重試間隔時(shí)間是 30 分鐘叠纷,第十五次重試間隔時(shí)間是 1 小時(shí),第十六次重試間隔時(shí)間是 2 小時(shí)潦嘶。
順序消息
對(duì)于順序消息(通過(guò)實(shí)現(xiàn)MessageListenerOrderly接口來(lái)消費(fèi))讲岁,默認(rèn)的重試間隔時(shí)間是 1000 毫秒(1 秒)〕囊裕可以通過(guò)MessageListenerOrderly接口的SUSPEND_CURRENT_QUEUE_TIME_MILLIS屬性來(lái)設(shè)置具體的重試間隔時(shí)間缓艳。
【重試次數(shù)限制及死信隊(duì)列】
RocketMQ 會(huì)限制消息的重試次數(shù)。當(dāng)消息重試達(dá)到一定次數(shù)(默認(rèn) 16 次)后看峻,如果仍然無(wú)法成功消費(fèi)阶淘,消息會(huì)被發(fā)送到死信隊(duì)列(DLQ,Dead - Letter - Queue)互妓。
死信隊(duì)列中的消息可以通過(guò)專(zhuān)門(mén)的工具或者自定義的程序進(jìn)行后續(xù)處理溪窒。例如,開(kāi)發(fā)人員可以編寫(xiě)一個(gè)工具來(lái)定期掃描死信隊(duì)列中的消息冯勉,分析消費(fèi)失敗的原因澈蚌,可能是消息格式錯(cuò)誤、業(yè)務(wù)邏輯處理有嚴(yán)重問(wèn)題(如數(shù)據(jù)庫(kù)連接異常無(wú)法恢復(fù)等)灼狰,然后根據(jù)具體情況對(duì)消息進(jìn)行手動(dòng)修復(fù)或者調(diào)整業(yè)務(wù)邏輯后重新消費(fèi)宛瞄。
mq事務(wù)
import com.alibaba.fastjson.JSON;
import com.csw.RocketMQ.dao.TxLogDao;
import com.csw.domain.Order;
import com.csw.domain.TxLog;
import com.csw.orderAndMsg.dao.OrderDao;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.UUID;
@Service
public class OrderServiceImpl4 {
@Autowired
private OrderDao orderDao;
@Autowired
private TxLogDao txLogDao;
@Autowired
private RocketMQTemplate rocketMQTemplate;
//事務(wù)mq【第1步】
public void createOrderBefore(Order order) {
String txId = UUID.randomUUID().toString();
String orderJson = JSON.toJSONString(order);
//發(fā)送半事務(wù)消息
rocketMQTemplate.sendMessageInTransaction(
"tx_producer_group",
"tx_topic",
MessageBuilder.withPayload(orderJson).setHeader("txId", txId).build(),
order
);
}
//事務(wù)mq【第3步】
@Transactional
public void createOrder(String txId, Order order) {
//保存訂單
orderDao.save(order);
TxLog txLog = new TxLog();
txLog.setTxId(txId);
txLog.setDate(new Date());
//記錄事物日志
txLogDao.save(txLog);
}
import com.csw.RocketMQ.dao.TxLogDao;
import com.csw.domain.Order;
import com.csw.domain.TxLog;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
/**
* 事務(wù)消息接受
* 普通消息接收在user里
*/
@Service
@RocketMQTransactionListener(txProducerGroup = "tx_producer_group")
public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {
@Autowired
private OrderServiceImpl4 orderServiceImpl4;
@Autowired
private TxLogDao txLogDao;
//執(zhí)行本地事物
//事務(wù)mq【第2步】
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String txId = (String) msg.getHeaders().get("txId");
try {
//本地事物
Order order = (Order) arg;
orderServiceImpl4.createOrder(txId,order);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
//消息回查【如果一段時(shí)間沒(méi)有收到消息比如,報(bào)錯(cuò)還沒(méi)有來(lái)得及提交就宕機(jī)了交胚,網(wǎng)絡(luò)延遲了mq沒(méi)收到等】
//事務(wù)mq【第4步】
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String txId = (String) msg.getHeaders().get("txId");
TxLog txLog = null;
int retryCount = 0;
while (retryCount < 3) { // 最多重試3次
txLog = txLogDao.findById(txId).get();
if (txLog!= null) {
return RocketMQLocalTransactionState.COMMIT;
} else {
try {
Thread.sleep(1000); // 等待1秒后重試
} catch (InterruptedException e) {
// 處理異常
}
retryCount++;
}
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
import com.csw.domain.Order;
import com.csw.domain.User;
import com.csw.sms.utils.SmsUtil;
import com.csw.user.dao.UserDao;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Random;
/**
* 普通消息接受
* 事務(wù)消息接收在order里
*/
@Slf4j
@Service("shopSmsService")
//consumerGroup-消費(fèi)者組名 topic-要消費(fèi)的主題
@RocketMQMessageListener(
consumerGroup = "tx_producer_group", //消費(fèi)者組名
topic = "tx_topic",//消費(fèi)主題
consumeMode = ConsumeMode.CONCURRENTLY,//消費(fèi)模式,指定是否順序消費(fèi) CONCURRENTLY(同步,默認(rèn)) ORDERLY(順序)
messageModel = MessageModel.CLUSTERING//消息模式 BROADCASTING(廣播) CLUSTERING(集群,默認(rèn))在廣播模式下份汗,消費(fèi)失敗的消息會(huì)被丟棄盈电,而在集群模式下,【消費(fèi)失敗的消息會(huì)被重新入隊(duì)等待稍后消費(fèi)】
)
public class RocketSms implements RocketMQListener<Order> {
@Autowired
private UserDao userDao;
//消費(fèi)邏輯
@Override
public void onMessage(Order order) {
log.error("接收到了一個(gè)訂單信息{},接下來(lái)就可以發(fā)送短信通知了", order);
// 獲取消息體中的JSON字符串
//根據(jù)uid 獲取手機(jī)號(hào)
User user = userDao.findById(order.getUid()).get();
//消費(fèi)失敗測(cè)試
//int mm = 1 / 0;
//生成驗(yàn)證碼 1-9 6
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 6; i++) {
builder.append(new Random().nextInt(9) + 1);
}
String smsCode = builder.toString();
Param param = new Param(smsCode);
try {
//發(fā)送短信 {"code":"123456"}
SmsUtil smsUtil = new SmsUtil();
//節(jié)省資源
//smsUtil.send(user.getTelephone(),JSON.toJSONString(param),"yzm");
log.error("短信發(fā)送成功");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Param {
private String code;
}
}
延遲隊(duì)列
/**
* 發(fā)送延遲消息
*
* RocketMQ 默認(rèn)提供了 18 個(gè)延遲級(jí)別杯活,每個(gè)延遲級(jí)別對(duì)應(yīng)的延遲時(shí)間如下:
* 1s匆帚、5s、10s旁钧、30s吸重、1m、2m歪今、3m嚎幸、4m、5m彤委、6m鞭铆、7m或衡、8m焦影、9m、10m封断、20m斯辰、30m、1h坡疼、2h彬呻。
*/
public void sendDelayMessage() {
// 構(gòu)建消息,這里以簡(jiǎn)單的字符串消息為例
Message<String> message = MessageBuilder.withPayload("This is a delay message").build();
//超時(shí)10秒鐘柄瑰,延遲級(jí)別是5等于1分鐘
rocketMQTemplate.syncSend("topic-dely-test", message, 10 * 1000, 5);
}
}
package com.csw.rocketMQ;
import com.csw.user.dao.UserDao;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 延遲隊(duì)列的組名可以隨便指定,但是不要指定已有的組名
*在
*/
@Slf4j
@Service
@RocketMQMessageListener(
consumerGroup = "topic-dely-test", topic = "topic-dely-test")
public class RocketDelaySms implements RocketMQListener<String> {
@Autowired
private UserDao userDao;
//消費(fèi)邏輯
@Override
public void onMessage(String message) {
log.error("接收到了一個(gè)訂單信息{},接下來(lái)就可以發(fā)送短信通知了", message);
}
}
配置
【配置文件】
rocketmq:
name-server: 192.168.147.131:9876 #rocketMQ服務(wù)的地址
【依賴(lài)】
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
下載RocketMQ并解壓
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
nohup ./bin/mqnamesrv &
nohup bin/mqbroker -n localhost:9876 &
下載控制臺(tái)
# 在git上下載下面的工程 rocketmq-console-1.0.0
https://github.com/apache/rocketmq-externals/releases
# 修改配置文件 rocketmq-console\src\main\resources\application.properties
server.port=7777 #項(xiàng)目啟動(dòng)后的端口號(hào)
rocketmq.config.namesrvAddr=192.168.109.131:9876 #nameserv的地址闸氮,注意防火墻要開(kāi)啟
9876端口
# 進(jìn)入控制臺(tái)項(xiàng)目,將工程打成jar包
mvn clean package -Dmaven.test.skip=true
# 啟動(dòng)控制臺(tái)
java -jar target/rocketmq-console-ng-1.0.0.jar
瀏覽器訪(fǎng)問(wèn)
http://localhost:7777/#/consumer