導(dǎo)讀:當(dāng)資源成為瓶頸時(shí),服務(wù)框架需要對(duì)消費(fèi)者做限流耙蔑,啟動(dòng)流控保護(hù)機(jī)制见妒。流量控制有多種策略,比較常用的有:針對(duì)訪(fǎng)問(wèn)速率的靜態(tài)流控甸陌、針對(duì)資源占用的動(dòng)態(tài)流控须揣、針對(duì)消費(fèi)者并發(fā)連接數(shù)的連接控制和針對(duì)并行訪(fǎng)問(wèn)數(shù)的并發(fā)控制。在實(shí)踐中钱豁,各種流量控制策略需要綜合使用才能起到較好的效果返敬。
在分布式架構(gòu)中,應(yīng)用和應(yīng)用之間的調(diào)用類(lèi)型分為以下兩種寥院,流控方式也略有不同劲赠。
同步RPC類(lèi)調(diào)用,比如RESTful秸谢,Dubbo凛澎,HSF等都屬于該類(lèi)。對(duì)于該類(lèi)同步調(diào)用估蹄,通常限流方式為兩種:針對(duì)服務(wù)提供者的并發(fā)全局流控塑煎,或針對(duì)服務(wù)消費(fèi)者的并發(fā)局部流控。兩種的控制手段類(lèi)似臭蚁,都是通過(guò)限制服務(wù)端或客服端并發(fā)調(diào)用數(shù)來(lái)進(jìn)行限制最铁。
異步MQ類(lèi)調(diào)用,典型如RocketMQ, ? ? ?Kafka垮兑,等冷尉。對(duì)于該類(lèi)異步調(diào)用,通常限流方式是在訂閱端限流系枪。限流方式為兩種:針對(duì)消息訂閱者的并發(fā)流控雀哨,或針對(duì)消息訂閱者的消費(fèi)延時(shí)流控。
針對(duì)消息訂閱者的消費(fèi)延時(shí)流控基本原理是,在每次客戶(hù)端消費(fèi)時(shí)雾棺,可以增加一個(gè)延時(shí)來(lái)控制消費(fèi)速度膊夹,這樣理論消費(fèi)并發(fā)最快速度為:
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber
比如如果消息并發(fā)消費(fèi)線(xiàn)程為20,延時(shí)為100ms捌浩,則理論上可以將并發(fā)消費(fèi)控制在200以下放刨。具體公式如下:
200 = 1 / 0.1 * 20
相比并發(fā)線(xiàn)程數(shù)流控,消費(fèi)延時(shí)流控優(yōu)點(diǎn)在于實(shí)現(xiàn)相對(duì)簡(jiǎn)單尸饺,對(duì)MQ類(lèi)客戶(hù)端包依賴(lài)較少进统,不需要客戶(hù)端提供控制并發(fā)線(xiàn)程數(shù)的動(dòng)態(tài)調(diào)整接口。
以上各種流量控制方法侵佃,在分布式架構(gòu)下麻昼,如果要做到全局動(dòng)態(tài)控制奠支,一個(gè)簡(jiǎn)單的技術(shù)方法是依賴(lài)配置中心馋辈,即通過(guò)配置中心來(lái)進(jìn)行流控參數(shù)的下發(fā)。
下面章節(jié)詳細(xì)介紹如何基于配置中心來(lái)實(shí)現(xiàn)異步消息消費(fèi)的全局動(dòng)態(tài)流控倍谜。使用的例子為阿里云上的?MQ?(消息隊(duì)列)和?ACM?(應(yīng)用配置管理)兩款產(chǎn)品迈螟。
注:之所以用MQ為示例是因?yàn)樵诒疚淖珜?xiě)之時(shí),正好MQ Consumer Client SDK并不支持動(dòng)態(tài)調(diào)整現(xiàn)成并發(fā)數(shù)尔崔,因此通過(guò)基于ACM來(lái)動(dòng)態(tài)調(diào)整消費(fèi)延遲的方法正好可以解決MQ消費(fèi)流控動(dòng)態(tài)的問(wèn)題答毫。
基于消費(fèi)延時(shí)流控的基本原理
基本原理如下。其中季春,管理員或應(yīng)用程序通過(guò)ACM控制臺(tái)發(fā)布消費(fèi)延時(shí)配置(RCV_INTERVAL_TIME)洗搂,所有MQ消費(fèi)程序訂閱該配置。理論上载弄,該配置從發(fā)布到下發(fā)所有客戶(hù)端耘拇,可以在1秒內(nèi)完成(取決于網(wǎng)絡(luò)演示)。
該章節(jié)基于配置中心來(lái)實(shí)現(xiàn)異步消息消費(fèi)的全局動(dòng)態(tài)流控的代碼示例宇攻。使用的例子為阿里云上的MQ(消息隊(duì)列)和ACM(應(yīng)用配置管理)兩款產(chǎn)品惫叛,基于Java語(yǔ)言。關(guān)于SDK的詳細(xì)介紹逞刷,可參見(jiàn)兩款產(chǎn)品的官方文檔嘉涌。
在ACM上創(chuàng)建消費(fèi)延時(shí)的參數(shù),截屏如下夸浅。
設(shè)置全局消費(fèi)延時(shí)變量
首先仑最,設(shè)置消費(fèi)接收延時(shí)的全局變量, 如下帆喇。
? ? ? ? ?// 初始化消息接收延時(shí)參數(shù)词身,單位為millisecond
? ? ? ? ? ? static int RCV_INTERVAL_TIME = 10000;
? ? ? ? ? ? // 初始化配置服務(wù),控制臺(tái)通過(guò)示例代碼自動(dòng)獲取下面參數(shù)
? ? ? ? ? ? ConfigService.init("acm.aliyun.com", /*租戶(hù)ID*/"xxx", /*AK*/"xxx", /*SK*/"yyy"); ? ?
? ? ? ? ? ? // 主動(dòng)獲取配置
? ? ? ? ? ? String content = ConfigService.getConfig("app.mq.qos", "DEFAULT_GROUP", 6000);
? ? ? ? ? ? Properties p = new Properties();
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? p.load(new StringReader(content));
? ? ? ? ? ? ? ? RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));
? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
其次番枚,設(shè)置ACM listener法严,確保當(dāng)配置被修改時(shí)损敷,即使更新 RCV_INTERVAL_TIME 參數(shù), 如下深啤。
// 初始化的時(shí)候拗馒,給配置添加監(jiān)聽(tīng),配置變更會(huì)回調(diào)通知
? ? ? ? ? ? ConfigService.addListener("app.mq.qos", "DEFAULT_GROUP", new ConfigChangeListener() {
? ? ? ? ? ? ? ? public void receiveConfigInfo(String configInfo) {
? ? ? ? ? ? ? ? ? ? Properties p = new Properties();
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? p.load(new StringReader(configInfo));
? ? ? ? ? ? ? ? ? ? ? ? RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));
? ? ? ? ? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });
設(shè)置 MQ 消費(fèi)延時(shí)邏輯
完整實(shí)例如下。
注:這里 RCV_INTERVAL_TIME 參數(shù)的訪(fǎng)問(wèn)是故意沒(méi)有加鎖的溯街,讀者可以自行思考原因诱桂。Aliyun ONS Client不提供動(dòng)態(tài)線(xiàn)程并發(fā)數(shù),默認(rèn)并發(fā)為20呈昔。因此這里正好使用消費(fèi)延時(shí)參數(shù)來(lái)動(dòng)態(tài)調(diào)節(jié)QoS挥等。
?//以下代碼可直接貼在Main()函數(shù)里
? ? ? ? Properties properties = new Properties();
? ? ? ? properties.put(PropertyKeyConst.ConsumerId, "CID_consumer_group");
? ? ? ? properties.put(PropertyKeyConst.AccessKey,"xxx");
? ? ? ? properties.put(PropertyKeyConst.SecretKey, "yyy");
? ? ? ? properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
? ? ? ? // 設(shè)置 TCP 接入域名(此處以公共云生產(chǎn)環(huán)境為例)
? ? ? ? properties.put(PropertyKeyConst.ONSAddr,
? ? ? ? ? "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
? ? ? ? Consumer consumer = ONSFactory.createConsumer(properties);
? ? ? ? consumer.subscribe(/*Topic*/"topic-name", /*Tag*/null, new MessageListener()?
? ? ? ? {
? ? ? ? ? ? public Action consume(Message message, ConsumeContext context) {
? ? ? ? ? ? ? ? // MQ Subscribe QoS logical start,?
? ? ? ? ? ? ? ? // Each consuming process will sleep for RCV_INTERVAL_TIME seconds with 100 ms sleeping cycle.
? ? ? ? ? ? ? ? // Within each cycle, the thread will check RCV_INTERVAL_TIME in case it's set to a smaller value.?
? ? ? ? ? ? ? ? // RCV_INTERVAL_TIME <= 0 means no sleeping.
? ? ? ? ? ? ? ? int rcvIntervalTimeLeft = RCV_INTERVAL_TIME;
? ? ? ? ? ? ? ? while (rcvIntervalTimeLeft > 0) {
? ? ? ? ? ? ? ? ? ? if (rcvIntervalTimeLeft > RCV_INTERVAL_TIME) {
? ? ? ? ? ? ? ? ? ? ? ? rcvIntervalTimeLeft = RCV_INTERVAL_TIME;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? if (rcvIntervalTimeLeft >= 100) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? rcvIntervalTimeLeft -= 100;
? ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(100);
? ? ? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(rcvIntervalTimeLeft);
? ? ? ? ? ? ? ? ? ? ? ? ? ? rcvIntervalTimeLeft = 0;
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? // MQ Subscribe interval logical ends
? ? ? ? ? ? ? ? System.out.println("Receive: " + message);
? ? ? ? ? ? ? ? /*
? ? ? ? ? ? ? ? ?* Put your business logic here.
? ? ? ? ? ? ? ? ?*/
? ? ? ? ? ? ? ? doSomething();
? ? ? ? ? ? ? ? return Action.CommitMessage;
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? consumer.start();
運(yùn)行結(jié)果
單機(jī)運(yùn)行consumer進(jìn)行消費(fèi),假設(shè)queue內(nèi)的消息無(wú)限多堤尾,不存在消費(fèi)萬(wàn)的情況肝劲,分三段測(cè)試,分別運(yùn)行約5分鐘,通過(guò)ACM配置推送來(lái)達(dá)到以下效果郭宝。
RCV_INTERVAL_TIME ? ? ?= 100 ms
RCV_INTERVAL_TIME ? ? ?= 5000 ms
RCV_INTERVAL_TIME ? ? ?= 1000 ms
結(jié)果如下辞槐,在單MQ消費(fèi)業(yè)務(wù)處理耗時(shí)約100ms情況下的,單機(jī)并發(fā)20線(xiàn)程的測(cè)試結(jié)果粘室。
RCV_INTERVAL_TIME? = 100 ms:平均消費(fèi)性能約為 9000 tpm 左右
RCV_INTERVAL_TIME? = 5000 ms:平均消費(fèi)性能被限制到了 200 tpm 左右
RCV_INTERVAL_TIME? = 1000 ms:平均消費(fèi)性能回升到到了 1100 tpm 左右
以上結(jié)果基本達(dá)到消費(fèi)和 tpm 成反比的預(yù)期榄檬,最關(guān)鍵的是整個(gè)過(guò)程中,應(yīng)用不中斷衔统,流控推送結(jié)果秒級(jí)生效到分布式集群鹿榜。單機(jī)性能結(jié)果如下所示。
相關(guān)產(chǎn)品詳情請(qǐng)參見(jiàn):
? 消息產(chǎn)品
Aliyun MQ:aliyun.com/product/ons
? 配置中心產(chǎn)品
Aliyun ACM:aliyun.com/product/acm