springboot集成rabbitmq商品秒殺業(yè)務(wù)實(shí)戰(zhàn)(流量削峰)

消息隊(duì)列如何實(shí)現(xiàn)流量削峰未檩?

要對(duì)流量進(jìn)行削峰戴尸,最容易想到的解決方案就是用消息隊(duì)列來(lái)緩沖瞬時(shí)流量,把同步的直接調(diào)用轉(zhuǎn)換成異步的間接推送讹挎,中間通過一個(gè)隊(duì)列在一端承接瞬時(shí)的流量洪峰校赤,在另一端平滑地將消息推送出去。

image

這里就不講springbootrabbitmq如何集成了筒溃,參考文章https://www.cnblogs.com/fantongxue/p/12493497.html

一,準(zhǔn)備工作:

數(shù)據(jù)庫(kù)有一張商品表沾乘,庫(kù)存量是100×保現(xiàn)在有1000個(gè)消費(fèi)者準(zhǔn)備開搶這100個(gè)庫(kù)存。
t_product表維護(hù)商品編號(hào)與商品庫(kù)存剩余數(shù)量翅阵。編號(hào)No123321的這種商品的庫(kù)存量有100個(gè)歪玲。

image

t_product_record維護(hù)搶到商品的用戶ID。理論上t_product表開搶后的 記錄數(shù)量應(yīng)該是100條(共有100個(gè)人搶到了商品)掷匠。

image

我們使用壓力測(cè)試工具jweter對(duì)其進(jìn)行并發(fā)性測(cè)試滥崩。

二,springboot開始集成rabbitmq

1讹语,加入amqp的依賴

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>

2钙皮,配置application.yml配置文件

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
    username: root
    password: 1234

  rabbitmq:
    host: 101.201.101.206
    username: guest
    password: guest
    publisher-confirms: true  # 開啟Rabbitmq發(fā)送消息確認(rèn)機(jī)制,發(fā)送消息到隊(duì)列并觸發(fā)回調(diào)方法
    publisher-returns: true
    listener:
      simple:
        concurrency: 10 #消費(fèi)者數(shù)量
        max-concurrency: 10 #最大消費(fèi)者數(shù)量
        prefetch: 1 #限流(消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量)
        auto-startup: true  #啟動(dòng)時(shí)自動(dòng)啟動(dòng)容器
        acknowledge-mode: manual #開啟ACK手動(dòng)確認(rèn)模式

3,RabbitConfig配置類

1短条, 定義消息轉(zhuǎn)換實(shí)例 导匣,轉(zhuǎn)化成 JSON傳輸
2 , 配置啟用rabbitmq事務(wù)

package com.aaa.springredis.controller;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class RabbitConfig {

    /**
     * 定義消息轉(zhuǎn)換實(shí)例 茸时,轉(zhuǎn)化成 JSON傳輸
     *
     * @return Jackson2JsonMessageConverter
     */
    @Bean
    public MessageConverter integrationEventMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 配置啟用rabbitmq事務(wù)
     *
     * @param connectionFactory connectionFactory
     * @return RabbitTransactionManager
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}

4贡定,初始化rabbitmq的回調(diào)函數(shù)

說明:被@PostConstruct修飾的方法會(huì)在服務(wù)器加載Servlet的時(shí)候運(yùn)行,并且只會(huì)被服務(wù)器執(zhí)行一次可都。如果想在生成對(duì)象時(shí)完成某些初始化操作缓待,而偏偏這些初始化操作又依賴于依賴注入,那么久無(wú)法在構(gòu)造函數(shù)中實(shí)現(xiàn)渠牲。為此命斧,可以使用@PostConstruct注解一個(gè)方法來(lái)完成初始化,@PostConstruct注解的方法將會(huì)在依賴注入完成后被自動(dòng)調(diào)用嘱兼。</font>

回調(diào)函數(shù)的使用前提是配置文件中開啟了rabitmq消息確認(rèn)機(jī)制

image

Constructor >> @Autowired >> @PostConstruct

      @Autowired
        RabbitTemplate rabbitTemplate;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitController.class);

    @PostConstruct
    private void init(){
        /**
         * 消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)国葬。
         * 使用該功能需要開啟確認(rèn),spring-boot中配置如下:
         * spring.rabbitmq.publisher-confirms = true
         */
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
            if (b) {
                System.out.println("消息已確認(rèn) cause:{"+s+"} - {"+correlationData+"}");
            } else {
                System.out.println("消息未確認(rèn) cause:{"+s+"} - {"+correlationData+"}");
            }
        }
    });
        /**
         * 通過實(shí)現(xiàn)ReturnCallback接口芹壕,
         * 如果消息從交換器發(fā)送到對(duì)應(yīng)隊(duì)列失敗時(shí)觸發(fā)
         * 比如根據(jù)發(fā)送消息時(shí)指定的routingKey找不到隊(duì)列時(shí)會(huì)觸發(fā)
         * 使用該功能需要開啟確認(rèn)汇四,spring-boot中配置如下:
         * spring.rabbitmq.publisher-returns = true
         */
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                LOGGER.error("消息被退回:{}", message);
                LOGGER.error("消息使用的交換機(jī):{}", exchange);
                LOGGER.error("消息使用的路由鍵:{}", routingKey);
                LOGGER.error("描述:{}", replyText);
            }
        });
    }

三,開始測(cè)試

1踢涌,寫搶單測(cè)試類

寫搶單測(cè)試類通孽,我們使用jweter壓力測(cè)試工具開啟1000個(gè)線程進(jìn)行測(cè)試(開啟多線程并發(fā)測(cè)試),所以為了區(qū)別每一個(gè)模擬的用戶睁壁,使用userId累加的方式進(jìn)行區(qū)分背苦。

        private int userId=0;
        //開始搶單
        @RequestMapping("/begin")
        @ResponseBody
        public void begin(){
           userId++;
           this.send(userId);
        }

而上面的send方法就是把接收到的用戶請(qǐng)求發(fā)送到rabbitmq消息中間件中。

        @RequestMapping("/send")
        @ResponseBody
        public String send(Integer messge){
                //第一個(gè)參數(shù):交換機(jī)名字  第二個(gè)參數(shù):Routing Key的值  第三個(gè)參數(shù):傳遞的消息對(duì)象
                rabbitTemplate.convertAndSend("test.direct","test",messge);
                return "發(fā)送消息成功";
        }

2潘明,配置rabbitmq監(jiān)聽方法

rabbitmq監(jiān)聽上篇文章也說過了行剂,作用就是監(jiān)聽指定隊(duì)列中收到來(lái)自交換機(jī)的消息,來(lái)一條收一條钳降,收完為止厚宰!
通過ACK 確認(rèn)是否被正確接收,每個(gè)Message都要被確認(rèn)遂填,可以手動(dòng)去 ACK或自動(dòng)ACK铲觉,如果信息消費(fèi)失敗的話會(huì)拒絕當(dāng)前消息,并把消息返回原隊(duì)列吓坚。

從隊(duì)列中收到用戶的userId撵幽,然后進(jìn)行購(gòu)買商品模擬操作(減少一個(gè)庫(kù)存,新增一條購(gòu)買記錄)

    @Autowired
    RabbitController controller;

    /**
     * @RabbitListener 可以標(biāo)注在類上面礁击,需配合 @RabbitHandler 注解一起使用
     * @RabbitListener 標(biāo)注在類上面表示當(dāng)有收到消息的時(shí)候盐杂,就交給 @RabbitHandler 的方法處理逗载,具體使用哪個(gè)方法處理,
     * 根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類型
     *
     * 使用 @Payload 和 @Headers 注解可以消息中的 body 與 headers 信息
     *
     * 通過 ACK 確認(rèn)是否被正確接收况褪,每個(gè) Message 都要被確認(rèn)(acknowledged)撕贞,可以手動(dòng)去 ACK 或自動(dòng) ACK
     */
    @RabbitListener(queues = "test") //指定監(jiān)聽的隊(duì)列名
    public void receiver(@Payload Integer userId, @Headers Channel channel, Message message) throws IOException {
        LOGGER.info("用戶{}開始搶單", userId);
        try {
            //處理消息
            controller.robbingProduct(userId);
//             確認(rèn)消息已經(jīng)消費(fèi)成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            LOGGER.error("消費(fèi)處理異常:{} - {}", userId, e);
//             拒絕當(dāng)前消息,并把消息返回原隊(duì)列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

購(gòu)買商品的方法

 public void robbingProduct(Integer userId){
                Product product = testDao.selectProductByNo("No123321");
                if (product != null && product.getTotal() > 0) {
                    //更新庫(kù)存表测垛,庫(kù)存量減少1捏膨。返回1說明更新成功。返回0說明庫(kù)存已經(jīng)為0
                    int i = testDao.updateProduct("No123321");
                    if(i>0){
                        //插入記錄
                        testDao.insertProductRecord(new ProductRecord("No123321", userId));
                        //發(fā)送短信
                        LOGGER.info("用戶{}搶單成功", userId);
                    }else {
                        LOGGER.error("用戶{}搶單失敗", userId);
                    }
                } else {
                    LOGGER.error("用戶{}搶單失敗", userId);
                }

        }

3食侮,jweter工具測(cè)試并發(fā)

jweter壓力測(cè)試工具如何使用百度吧号涯,這里忽略!
控制臺(tái)打印

image
image

而數(shù)據(jù)庫(kù)中的庫(kù)存變成了0

image

購(gòu)買記錄中存放了搶單成功的用戶id(100條記錄)

image

當(dāng)然锯七,剩下的900個(gè)用戶都搶單失敗了链快!

rabbitmq隊(duì)列是先進(jìn)先出的順序,先來(lái)后到眉尸,1000個(gè)請(qǐng)求你也得給我排隊(duì)域蜗,前100個(gè)請(qǐng)求搶單成功之后就注定了后900個(gè)請(qǐng)求是搶單失敗的!</font>

使用RabbitMQ的最主要變化就是:以前搶單操作請(qǐng)求直接由我們搶單應(yīng)用程序執(zhí)行噪猾,現(xiàn)在請(qǐng)求被轉(zhuǎn)移到了RabbitMQ服務(wù)器中霉祸。RabbitMQ服務(wù)器把接收到的搶單請(qǐng)求進(jìn)行排隊(duì),最后由RabbitMQ服務(wù)器把搶單請(qǐng)求轉(zhuǎn)發(fā)到我們的搶單應(yīng)用程序袱蜡,這樣的好處就是避免我們的搶單應(yīng)用程序短時(shí)間直接處理大量請(qǐng)求丝蹭。RabbitMQ服務(wù)器主要作用是減緩搶單應(yīng)用程序的并發(fā)壓力,相當(dāng)于在我們的搶單程序之前加了一道請(qǐng)求緩沖區(qū)坪蚁。</font>

實(shí)戰(zhàn)結(jié)束奔穿!
工程地址:https://github.com/fantongxue666/rabbitmq-seckill

作者:本少爺來(lái)了
鏈接:http://www.reibang.com/p/f5e28df429b8
來(lái)源:簡(jiǎn)書
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán)敏晤,非商業(yè)轉(zhuǎn)載請(qǐng)注明出處贱田。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市茵典,隨后出現(xiàn)的幾起案子湘换,更是在濱河造成了極大的恐慌,老刑警劉巖统阿,帶你破解...
    沈念sama閱讀 218,546評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異筹我,居然都是意外死亡扶平,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門蔬蕊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)结澄,“玉大人哥谷,你說我怎么就攤上這事÷橄祝” “怎么了们妥?”我有些...
    開封第一講書人閱讀 164,911評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)勉吻。 經(jīng)常有香客問我监婶,道長(zhǎng),這世上最難降的妖魔是什么齿桃? 我笑而不...
    開封第一講書人閱讀 58,737評(píng)論 1 294
  • 正文 為了忘掉前任惑惶,我火速辦了婚禮,結(jié)果婚禮上短纵,老公的妹妹穿的比我還像新娘带污。我一直安慰自己,他們只是感情好香到,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評(píng)論 6 392
  • 文/花漫 我一把揭開白布鱼冀。 她就那樣靜靜地躺著,像睡著了一般悠就。 火紅的嫁衣襯著肌膚如雪千绪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評(píng)論 1 305
  • 那天理卑,我揣著相機(jī)與錄音翘紊,去河邊找鬼。 笑死藐唠,一個(gè)胖子當(dāng)著我的面吹牛帆疟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播宇立,決...
    沈念sama閱讀 40,338評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼踪宠,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了妈嘹?” 一聲冷哼從身側(cè)響起柳琢,我...
    開封第一講書人閱讀 39,249評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎润脸,沒想到半個(gè)月后柬脸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,696評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡毙驯,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評(píng)論 3 336
  • 正文 我和宋清朗相戀三年倒堕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片爆价。...
    茶點(diǎn)故事閱讀 40,013評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡垦巴,死狀恐怖媳搪,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情骤宣,我是刑警寧澤秦爆,帶...
    沈念sama閱讀 35,731評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站憔披,受9級(jí)特大地震影響等限,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜活逆,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評(píng)論 3 330
  • 文/蒙蒙 一精刷、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蔗候,春花似錦怒允、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至所灸,卻和暖如春丽惶,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背爬立。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工钾唬, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人侠驯。 一個(gè)月前我還...
    沈念sama閱讀 48,203評(píng)論 3 370
  • 正文 我出身青樓抡秆,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親吟策。 傳聞我的和親對(duì)象是個(gè)殘疾皇子儒士,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容