消息隊(duì)列如何實(shí)現(xiàn)流量削峰未檩?
要對(duì)流量進(jìn)行削峰戴尸,最容易想到的解決方案就是用消息隊(duì)列來(lái)緩沖瞬時(shí)流量,把同步的直接調(diào)用轉(zhuǎn)換成異步的間接推送讹挎,中間通過一個(gè)隊(duì)列在一端承接瞬時(shí)的流量洪峰校赤,在另一端平滑地將消息推送出去。
這里就不講springboot
和rabbitmq
如何集成了筒溃,參考文章https://www.cnblogs.com/fantongxue/p/12493497.html
一,準(zhǔn)備工作:
數(shù)據(jù)庫(kù)有一張商品表沾乘,庫(kù)存量是100×保現(xiàn)在有1000個(gè)消費(fèi)者準(zhǔn)備開搶這100個(gè)庫(kù)存。
t_product
表維護(hù)商品編號(hào)與商品庫(kù)存剩余數(shù)量翅阵。編號(hào)No123321的這種商品的庫(kù)存量有100個(gè)歪玲。
t_product_record
維護(hù)搶到商品的用戶ID。理論上t_product
表開搶后的 記錄數(shù)量應(yīng)該是100條(共有100個(gè)人搶到了商品)掷匠。
我們使用壓力測(cè)試工具jweter
對(duì)其進(jìn)行并發(fā)性測(cè)試滥崩。
二,springboot開始集成rabbitmq
1讹语,加入amqp的依賴
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
2钙皮,配置application.yml配置文件
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
username: root
password: 1234
rabbitmq:
host: 101.201.101.206
username: guest
password: guest
publisher-confirms: true # 開啟Rabbitmq發(fā)送消息確認(rèn)機(jī)制,發(fā)送消息到隊(duì)列并觸發(fā)回調(diào)方法
publisher-returns: true
listener:
simple:
concurrency: 10 #消費(fèi)者數(shù)量
max-concurrency: 10 #最大消費(fèi)者數(shù)量
prefetch: 1 #限流(消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量)
auto-startup: true #啟動(dòng)時(shí)自動(dòng)啟動(dòng)容器
acknowledge-mode: manual #開啟ACK手動(dòng)確認(rèn)模式
3,RabbitConfig配置類
1短条, 定義消息轉(zhuǎn)換實(shí)例 导匣,轉(zhuǎn)化成 JSON
傳輸
2 , 配置啟用rabbitmq
事務(wù)
package com.aaa.springredis.controller;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class RabbitConfig {
/**
* 定義消息轉(zhuǎn)換實(shí)例 茸时,轉(zhuǎn)化成 JSON傳輸
*
* @return Jackson2JsonMessageConverter
*/
@Bean
public MessageConverter integrationEventMessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 配置啟用rabbitmq事務(wù)
*
* @param connectionFactory connectionFactory
* @return RabbitTransactionManager
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
4贡定,初始化rabbitmq的回調(diào)函數(shù)
說明:被@PostConstruct
修飾的方法會(huì)在服務(wù)器加載Servlet
的時(shí)候運(yùn)行,并且只會(huì)被服務(wù)器執(zhí)行一次可都。如果想在生成對(duì)象時(shí)完成某些初始化操作缓待,而偏偏這些初始化操作又依賴于依賴注入,那么久無(wú)法在構(gòu)造函數(shù)中實(shí)現(xiàn)渠牲。為此命斧,可以使用@PostConstruct
注解一個(gè)方法來(lái)完成初始化,@PostConstruct
注解的方法將會(huì)在依賴注入完成后被自動(dòng)調(diào)用嘱兼。</font>
回調(diào)函數(shù)的使用前提是配置文件中開啟了rabitmq
消息確認(rèn)機(jī)制
Constructor >> @Autowired >> @PostConstruct
@Autowired
RabbitTemplate rabbitTemplate;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitController.class);
@PostConstruct
private void init(){
/**
* 消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)国葬。
* 使用該功能需要開啟確認(rèn),spring-boot中配置如下:
* spring.rabbitmq.publisher-confirms = true
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b) {
System.out.println("消息已確認(rèn) cause:{"+s+"} - {"+correlationData+"}");
} else {
System.out.println("消息未確認(rèn) cause:{"+s+"} - {"+correlationData+"}");
}
}
});
/**
* 通過實(shí)現(xiàn)ReturnCallback接口芹壕,
* 如果消息從交換器發(fā)送到對(duì)應(yīng)隊(duì)列失敗時(shí)觸發(fā)
* 比如根據(jù)發(fā)送消息時(shí)指定的routingKey找不到隊(duì)列時(shí)會(huì)觸發(fā)
* 使用該功能需要開啟確認(rèn)汇四,spring-boot中配置如下:
* spring.rabbitmq.publisher-returns = true
*/
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
LOGGER.error("消息被退回:{}", message);
LOGGER.error("消息使用的交換機(jī):{}", exchange);
LOGGER.error("消息使用的路由鍵:{}", routingKey);
LOGGER.error("描述:{}", replyText);
}
});
}
三,開始測(cè)試
1踢涌,寫搶單測(cè)試類
寫搶單測(cè)試類通孽,我們使用jweter
壓力測(cè)試工具開啟1000個(gè)線程進(jìn)行測(cè)試(開啟多線程并發(fā)測(cè)試),所以為了區(qū)別每一個(gè)模擬的用戶睁壁,使用userId
累加的方式進(jìn)行區(qū)分背苦。
private int userId=0;
//開始搶單
@RequestMapping("/begin")
@ResponseBody
public void begin(){
userId++;
this.send(userId);
}
而上面的send
方法就是把接收到的用戶請(qǐng)求發(fā)送到rabbitmq
消息中間件中。
@RequestMapping("/send")
@ResponseBody
public String send(Integer messge){
//第一個(gè)參數(shù):交換機(jī)名字 第二個(gè)參數(shù):Routing Key的值 第三個(gè)參數(shù):傳遞的消息對(duì)象
rabbitTemplate.convertAndSend("test.direct","test",messge);
return "發(fā)送消息成功";
}
2潘明,配置rabbitmq監(jiān)聽方法
rabbitmq
監(jiān)聽上篇文章也說過了行剂,作用就是監(jiān)聽指定隊(duì)列中收到來(lái)自交換機(jī)的消息,來(lái)一條收一條钳降,收完為止厚宰!
通過ACK
確認(rèn)是否被正確接收,每個(gè)Message
都要被確認(rèn)遂填,可以手動(dòng)去 ACK
或自動(dòng)ACK
铲觉,如果信息消費(fèi)失敗的話會(huì)拒絕當(dāng)前消息,并把消息返回原隊(duì)列吓坚。
從隊(duì)列中收到用戶的userId
撵幽,然后進(jìn)行購(gòu)買商品模擬操作(減少一個(gè)庫(kù)存,新增一條購(gòu)買記錄)
@Autowired
RabbitController controller;
/**
* @RabbitListener 可以標(biāo)注在類上面礁击,需配合 @RabbitHandler 注解一起使用
* @RabbitListener 標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候盐杂,就交給 @RabbitHandler 的方法處理逗载,具體使用哪個(gè)方法處理,
* 根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類型
*
* 使用 @Payload 和 @Headers 注解可以消息中的 body 與 headers 信息
*
* 通過 ACK 確認(rèn)是否被正確接收况褪,每個(gè) Message 都要被確認(rèn)(acknowledged)撕贞,可以手動(dòng)去 ACK 或自動(dòng) ACK
*/
@RabbitListener(queues = "test") //指定監(jiān)聽的隊(duì)列名
public void receiver(@Payload Integer userId, @Headers Channel channel, Message message) throws IOException {
LOGGER.info("用戶{}開始搶單", userId);
try {
//處理消息
controller.robbingProduct(userId);
// 確認(rèn)消息已經(jīng)消費(fèi)成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
LOGGER.error("消費(fèi)處理異常:{} - {}", userId, e);
// 拒絕當(dāng)前消息,并把消息返回原隊(duì)列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
購(gòu)買商品的方法
public void robbingProduct(Integer userId){
Product product = testDao.selectProductByNo("No123321");
if (product != null && product.getTotal() > 0) {
//更新庫(kù)存表测垛,庫(kù)存量減少1捏膨。返回1說明更新成功。返回0說明庫(kù)存已經(jīng)為0
int i = testDao.updateProduct("No123321");
if(i>0){
//插入記錄
testDao.insertProductRecord(new ProductRecord("No123321", userId));
//發(fā)送短信
LOGGER.info("用戶{}搶單成功", userId);
}else {
LOGGER.error("用戶{}搶單失敗", userId);
}
} else {
LOGGER.error("用戶{}搶單失敗", userId);
}
}
3食侮,jweter工具測(cè)試并發(fā)
jweter
壓力測(cè)試工具如何使用百度吧号涯,這里忽略!
控制臺(tái)打印
而數(shù)據(jù)庫(kù)中的庫(kù)存變成了0
購(gòu)買記錄中存放了搶單成功的用戶id
(100條記錄)
當(dāng)然锯七,剩下的900個(gè)用戶都搶單失敗了链快!
rabbitmq
隊(duì)列是先進(jìn)先出的順序,先來(lái)后到眉尸,1000個(gè)請(qǐng)求你也得給我排隊(duì)域蜗,前100個(gè)請(qǐng)求搶單成功之后就注定了后900個(gè)請(qǐng)求是搶單失敗的!</font>
使用RabbitMQ
的最主要變化就是:以前搶單操作請(qǐng)求直接由我們搶單應(yīng)用程序執(zhí)行噪猾,現(xiàn)在請(qǐng)求被轉(zhuǎn)移到了RabbitMQ
服務(wù)器中霉祸。RabbitMQ
服務(wù)器把接收到的搶單請(qǐng)求進(jìn)行排隊(duì),最后由RabbitMQ
服務(wù)器把搶單請(qǐng)求轉(zhuǎn)發(fā)到我們的搶單應(yīng)用程序袱蜡,這樣的好處就是避免我們的搶單應(yīng)用程序短時(shí)間直接處理大量請(qǐng)求丝蹭。RabbitMQ
服務(wù)器主要作用是減緩搶單應(yīng)用程序的并發(fā)壓力,相當(dāng)于在我們的搶單程序之前加了一道請(qǐng)求緩沖區(qū)坪蚁。</font>
實(shí)戰(zhàn)結(jié)束奔穿!
工程地址:https://github.com/fantongxue666/rabbitmq-seckill
作者:本少爺來(lái)了
鏈接:http://www.reibang.com/p/f5e28df429b8
來(lái)源:簡(jiǎn)書
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán)敏晤,非商業(yè)轉(zhuǎn)載請(qǐng)注明出處贱田。