高并發(fā):RocketMQ 削峰實戰(zhàn)谬盐!

作者:WilsonHe
鏈接: https://juejin.im/post/5ea159e4f265da47f0794da5

MQ的主要特點為解耦甸私、異步削峰飞傀,該文章主要記錄與分享個人在實際項目中的RocketMQ削峰用法皇型,用于減少數(shù)據(jù)庫壓力的業(yè)務(wù)場景,其中RocketMQ的核心組件概念如下:

  • Producer:生產(chǎn)發(fā)送消息
  • Broker:存儲Producer發(fā)送過來的消息
  • Consumer:從Broker拉取消息并進(jìn)行消費
  • NameServer:為Producer或Consumer路由到Broker
    image

其中消費流程有以下幾點是必須注意的:

  • RocketMQ的Consumer獲取消息是通過向Broker發(fā)送拉取請求獲取的砸烦,而不是由Broker發(fā)送Consumer接收的方式弃鸦。
  • Consumer每次拉取消息時消息都會被均勻分發(fā)到消息隊列再進(jìn)行傳輸,所以RocketMQ中的很多參數(shù)都是針對隊列而不是Topic的(這個是重點幢痘,順便吐槽下源碼的文檔講的真不清晰唬格,很多都需要自己試錯,但Dashboard做得很好)颜说,其中每個Broker消息隊列(ConsumeQueue)的數(shù)量都可以通過RocketMQ DashBoard實時更改調(diào)整购岗。

rocketmq-spring-boot-starter用法簡介

當(dāng)開發(fā)中需要快速集成RocketMQ時可以考慮使用 rocketmq-spring-boot-starter 搭建RocketMQ的集成環(huán)境,但該框架并不完全具備RocketMQ所有的配置簡化脑沿,如需批量消費消息便需要自定義一個DefaultMQPushConsumer bean去消費了藕畔。個人在開發(fā)中常用的rocketmq-spring-boot-starter相關(guān)類:

  • RocketMQListener接口:消費者都需實現(xiàn)該接口的消費方法onMessage(msg)
  • RocketMQPushConsumerLifecycleListener接口:當(dāng)@RocketMQMessageListener中的配置不足以滿足我們的需求時庄拇,可以實現(xiàn)該接口直接更改消費者類DefaultMQPushConsumer配置
  • @RocketMQMessageListener:被該注解標(biāo)注并實現(xiàn)了接口RocketMQListener的bean為一個消費者并監(jiān)聽指定topic隊列中的消息注服,該注解中包含消費者的一些常用配置(大部分按默認(rèn)即可),一般只需更改consumerGroup(消費組)與topic措近。RocketMQMessageListener中的屬性配置是可以使用Placeholder(占位符)從配置文件或配置中心獲取的溶弟,如下圖:
    image

業(yè)務(wù)案例

有一個點贊業(yè)務(wù),不限制用戶的點贊數(shù)只需進(jìn)行記錄(產(chǎn)品需求瞭郑,開發(fā)提議無效)辜御,當(dāng)每個用戶都進(jìn)行x連擊享受數(shù)量猛增的快感時如果數(shù)據(jù)庫都需要進(jìn)行x個點贊數(shù)據(jù)的插入,數(shù)據(jù)庫毫無疑問會塞死導(dǎo)致崩潰屈张。于是想到可以嘗試下MQ削峰擒权,比如每秒來了5000消息但數(shù)據(jù)庫只能承受2000袱巨,那我消費時每次只拉取消費1600就好了,剩下的放在Broker堆積慢慢消費就好碳抄。由于之前的消息中心也在用RocketMQ愉老,于是確認(rèn)使用RocketMQ來進(jìn)行削峰。
image

環(huán)境配置

文章例子環(huán)境:1NameServer + 2Broker + 1Consumer

添加maven依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

application.yml配置

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: praise-group
server:
  port: 10000

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: tiger
    url: jdbc:mysql://localhost:3306/wilson
swagger:
  docket:
    base-package: io.rocket.consumer.controller

點贊接口

PraiseRecord(點贊記錄):

@Data
public class PraiseRecord implements Serializable {
    private Long id;
    private Long uid;
    private Long liveId;
    private LocalDateTime createTime;
}

MessageController(簡單的測試接口):

RestController
@RequestMapping("/message")
public class MessageController {
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @PostMapping("/praise")
    public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
        rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build());
        return ServerResponse.success();
    }

    // ......

}

由于用戶可以連續(xù)點贊剖效,所以考慮可以在點贊消息的處理上寬松一點(容許消息丟失)以追求更高的性能嫉入,因此選擇使用sendOneyWay()進(jìn)行消息發(fā)送。

RocketMQ的消息發(fā)送方式主要含syncSend()同步發(fā)送璧尸、asyncSend()異步發(fā)送咒林、sendOneWay()三種方式,sendOneWay()也是異步發(fā)送爷光,區(qū)別在于不需等待Broker返回確認(rèn)垫竞,所以可能會存在信息丟失的狀況,但吞吐量更高蛀序,具體需根據(jù)業(yè)務(wù)情況選用件甥。性能:sendOneWay > asyncSend > syncSend RocketMQTemplate的send()方法默認(rèn)是同步(syncSend)的,更多可看源碼實現(xiàn)。

PraiseListener:點贊消息消費者

@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>, RocketMQPushConsumerLifecycleListener {
    @Resource
    private PraiseRecordService praiseRecordService;

    @Override
    public void onMessage(PraiseRecordVO vo) {
        praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // 每次拉取的間隔哼拔,單位為毫秒
        consumer.setPullInterval(2000);
        // 設(shè)置每次從隊列中拉取的消息數(shù)為16
        consumer.setPullBatchSize(16);
    }
}

單次pull消息的最大數(shù)目受broker存儲的MessageStoreConfig.maxTransferCountOnMessageInMemory(默認(rèn)為32)值限制,即若想要消費者從隊列拉取的消息數(shù)大于32有效(pullBatchSize>32)則需更改Broker的啟動參數(shù)maxTransferCountOnMessageInMemory值瓣颅。在MQ削峰的配置參數(shù)里倦逐,以下幾個DefaultMQPushConsumer的參數(shù)是需要注意一下的:

  • pullInterval:每次從Broker拉取消息的間隔,單位為毫秒
  • pullBatchSize:每次從Broker隊列拉取到的消息數(shù)宫补,該參數(shù)很容易讓人誤解檬姥,一開始我以為是每次拉取的消息總數(shù),但測試過幾次后確認(rèn)了實質(zhì)上是從每個隊列的拉取數(shù)(源碼上的注釋文檔真的很差粉怕,跟沒有一樣)健民,即Consume每次拉取的消息總數(shù)如下:EachPullTotal=所有Broker上的寫隊列數(shù)和(writeQueueNums=readQueueNums) * pullBatchSize
  • consumeMessageBatchMaxSize:每次消費(即將多條消息合并為List消費)的最大消息數(shù)目,默認(rèn)值為1贫贝,rocketmq-spring-boot-starter 目前不支持批量消費(2.1.0版本)

在消費者開始消息消費時會先從各隊列中拉取一條消息進(jìn)行消費秉犹,消費成功后再以每次pullBatchSize的數(shù)目進(jìn)行拉取。PraiseListener中設(shè)置了每次拉取的間隔為2s稚晚,每次從隊列拉取的消息數(shù)為16崇堵,在搭建了2master broker且broker上writeQueueNums=readQueueNums=4的環(huán)境下每次拉取的消息理論數(shù)值為16 * 2 * 4 = 128,在第一次從各隊列拉取1條消息(即共8條)后消費成功后會每次就會拉取最多128條消息進(jìn)行消費客燕,想驗證下的可以把onMessage()的insert()改為log.info("1")然后統(tǒng)計單位秒內(nèi)打印的日志數(shù)是否為128鸳劳。
image

根據(jù)以上配置單Conumer情況下每2s理論消費為128,即每2秒數(shù)據(jù)庫新增的點贊數(shù)據(jù)大概為128條左右也搓,有20%偏差都在個人可接受范圍內(nèi)赏廓,然后對點贊接口進(jìn)行簡單壓測1s 2000請求校驗MQ效果涵紊,根據(jù)消費配置理論上需要16次拉取即需32s才能消費完,壓測后查看數(shù)據(jù)庫校驗效果:
image
image

由上圖可以看出除第一次2s和最后一次2s外數(shù)據(jù)庫每2s的插入數(shù)據(jù)數(shù)和一般都在128附近波動幔摸,也用了34s(因第一次拉取數(shù)較少所以比理論多花費一次拉取)消費的偏差大小可能會受每次拉取數(shù)pullBatchSize摸柄、Broker上的消息隊列數(shù)、網(wǎng)絡(luò)波動等情況影響抚太,但需要的目的已經(jīng)達(dá)到了塘幅,我只想把單位時間內(nèi)過多的數(shù)據(jù)庫操作交給MQ做分隔成多個單位時間內(nèi)的小批量操作,消息過多就堆積尿贫,當(dāng)請求峰值過了后直到MQ堆積的消息消費完前數(shù)據(jù)庫的插入數(shù)依舊會與峰值期的插入數(shù)相差不大电媳,達(dá)到了MQ削峰填谷的效果。

上線了但消費效率預(yù)估失誤如何動態(tài)更改消費效率 庆亡?

當(dāng)把拉取數(shù)pullBatchSize設(shè)置Broker的默認(rèn)最大傳輸值32了匾乓,線上又不想重啟Broker更改maxTransferCountOnMessageInMemory參數(shù),如有2個Broker且queue都為4又谋,那么拉取消費效率才為32 * 2 * 4 = 256拼缝,如果想要動態(tài)調(diào)整,可以從Broker數(shù)或Broker隊列數(shù)下手彰亥,可以將Broker的writeQueueNums咧七、readQueueNums增大,如都改為8任斋,那么效率就成了32 * 2 * 8 = 512继阻。需要注意的是更改完queues后必須去Dashboard的Topic下的CONSUMER MANAGER查看新增的隊列上是否都有Consumer成功注冊上去了,因為遇到了在測試與生產(chǎn)上使用rocketmq-spring-boot-starter @RocketMQListener標(biāo)注消費者不會自動注冊到新隊列上的情況废酷,但沒排除是不是RocketMQ版本的原因(個人本地的版本比環(huán)境上的高了一個小版本0.0.1瘟檩,本地沒出現(xiàn)沒消費者注冊到新隊列上的問題),而是使用了自定義DefaultMQPushConsumer bean(原生的方式都是沒有問題的)的備用方案澈蟆。當(dāng)再啟動新的消費者應(yīng)用時CONSUMER MANAGER(下圖)中就會出現(xiàn) 新Consumer數(shù) * 各Broker隊列數(shù)和的隊列行墨辛。
image

如何使用RocketMQ批量消費 ?

雖然點贊業(yè)務(wù)使用MQ單條插入后TPS已經(jīng)達(dá)到當(dāng)前業(yè)務(wù)指標(biāo)要求了趴俘,但考慮到如果后續(xù)要求在不添加機器數(shù)的情況下增加TPS睹簇,且數(shù)據(jù)量還沒到分庫分表的程度,個人就打算從批量消費下手哮幢,由一次插入一條點贊記錄改為一次性插入多條(insertBatch)带膀。當(dāng)然能滿足現(xiàn)有需求能不做肯定不做的,過度優(yōu)化過分礙事橙垢,但想多點方案不會壞事垛叨。rocketmq-spring-boot-starter并沒有提供批量消費的功能,所以要批量消費消息需要自定義DefaultMQPushConsumer并配置其consumeMessageBatchMaxSize屬性。consumeMessageBatchMaxSize屬性默認(rèn)值為1嗽元,即每次只消費一條消息敛纲,需要注意的是該屬性也會受pullBatchSize影響,如果consumeMessageBatchMaxSize為32但pullBatchSize只為12剂癌,那么每次批量消費的最大消息數(shù)也就只有12淤翔。如下為個人測試批量消費Consumer的測試bean:

@Bean
public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER);
    consumer.setNamesrvAddr(nameServer);
    consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*");
    // 設(shè)置每次消息拉取的時間間隔,單位毫秒
    consumer.setPullInterval(1000);
    // 設(shè)置每個隊列每次拉取的最大消息數(shù)
    consumer.setPullBatchSize(24);
    // 設(shè)置消費者單次批量消費的消息數(shù)目上限
    consumer.setConsumeMessageBatchMaxSize(12);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)
            -> {
        List<UserInfo> userInfos = new ArrayList<>(msgs.size());
        Map<Integer, Integer> queueMsgMap = new HashMap<>(8);
        msgs.forEach(msg -> {
            userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class));
            queueMsgMap.compute(msg.getQueueId(), (key, val) -> val == null ? 1 : ++val);
        });
        log.info("userInfo size: {}, content: {}", userInfos.size(), userInfos);
        /*
          處理批量消息佩谷,如批量插入:userInfoMapper.insertBatch(userInfos);
         */
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    return consumer;
}

如果默認(rèn)配置情況下log打印出的userInfo size恒為1旁壮,但由于設(shè)置了consumeMessageBatchMaxSizepullBatchSize,且pullBatchSize較小谐檀,所以每次消費的消息數(shù)最大值為12抡谐,如下圖:

image

附本文相關(guān)信息

  • 確保mqnamesrv與mqbroker已啟動成功,如該文章環(huán)境的啟動:
mqnamesrv -n 127.0.0.1:9876
mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-a.properties
mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-b.properties
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市溃肪,隨后出現(xiàn)的幾起案子免胃,更是在濱河造成了極大的恐慌,老刑警劉巖惫撰,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件羔沙,死亡現(xiàn)場離奇詭異,居然都是意外死亡厨钻,警方通過查閱死者的電腦和手機撬碟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來莉撇,“玉大人,你說我怎么就攤上這事惶傻」骼桑” “怎么了?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵银室,是天一觀的道長涂佃。 經(jīng)常有香客問我,道長蜈敢,這世上最難降的妖魔是什么辜荠? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮抓狭,結(jié)果婚禮上伯病,老公的妹妹穿的比我還像新娘。我一直安慰自己否过,他們只是感情好午笛,可當(dāng)我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布惭蟋。 她就那樣靜靜地躺著,像睡著了一般药磺。 火紅的嫁衣襯著肌膚如雪告组。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天癌佩,我揣著相機與錄音木缝,去河邊找鬼。 笑死围辙,一個胖子當(dāng)著我的面吹牛我碟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播酌畜,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼怎囚,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了桥胞?” 一聲冷哼從身側(cè)響起恳守,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎贩虾,沒想到半個月后催烘,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡缎罢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年伊群,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片策精。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡舰始,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出咽袜,到底是詐尸還是另有隱情丸卷,我是刑警寧澤,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布询刹,位于F島的核電站谜嫉,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏凹联。R本人自食惡果不足惜沐兰,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望蔽挠。 院中可真熱鬧住闯,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至春寿,卻和暖如春朗涩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背绑改。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工谢床, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人厘线。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓识腿,卻偏偏與公主長得像,于是被迫代替她去往敵國和親造壮。 傳聞我的和親對象是個殘疾皇子渡讼,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 作者:WilsonHe鏈接: https://juejin.im/post/5ea159e4f265da47f07...
    碼農(nóng)小光閱讀 2,447評論 0 26
  • RocketMQ4.X JMS Java消息服務(wù)(Java Message Service),Java平臺中關(guān)于面...
    方穹軒閱讀 707評論 0 1
  • 最近 RocketMQ 剛剛上生產(chǎn)環(huán)境成箫,閑暇之時在這里做一些分享,主要目的是讓初學(xué)者能快速上手RocketMQ旨枯。 ...
    云原生實戰(zhàn)閱讀 243,103評論 38 240
  • 久違的晴天蹬昌,家長會。 家長大會開好到教室時攀隔,離放學(xué)已經(jīng)沒多少時間了皂贩。班主任說已經(jīng)安排了三個家長分享經(jīng)驗。 放學(xué)鈴聲...
    飄雪兒5閱讀 7,523評論 16 22
  • 今天感恩節(jié)哎昆汹,感謝一直在我身邊的親朋好友明刷。感恩相遇!感恩不離不棄满粗。 中午開了第一次的黨會遮精,身份的轉(zhuǎn)變要...
    迷月閃星情閱讀 10,567評論 0 11