解鎖新姿勢(shì) | 如何用配置中心實(shí)現(xiàn)全局動(dòng)態(tài)流控晦毙?

導(dǎo)讀:當(dāng)資源成為瓶頸時(shí),服務(wù)框架需要對(duì)消費(fèi)者做限流耙蔑,啟動(dòng)流控保護(hù)機(jī)制见妒。流量控制有多種策略,比較常用的有:針對(duì)訪(fǎng)問(wèn)速率的靜態(tài)流控甸陌、針對(duì)資源占用的動(dòng)態(tài)流控须揣、針對(duì)消費(fèi)者并發(fā)連接數(shù)的連接控制和針對(duì)并行訪(fǎng)問(wèn)數(shù)的并發(fā)控制。在實(shí)踐中钱豁,各種流量控制策略需要綜合使用才能起到較好的效果返敬。

在分布式架構(gòu)中,應(yīng)用和應(yīng)用之間的調(diào)用類(lèi)型分為以下兩種寥院,流控方式也略有不同劲赠。

同步RPC類(lèi)調(diào)用,比如RESTful秸谢,Dubbo凛澎,HSF等都屬于該類(lèi)。對(duì)于該類(lèi)同步調(diào)用估蹄,通常限流方式為兩種:針對(duì)服務(wù)提供者的并發(fā)全局流控塑煎,或針對(duì)服務(wù)消費(fèi)者的并發(fā)局部流控。兩種的控制手段類(lèi)似臭蚁,都是通過(guò)限制服務(wù)端或客服端并發(fā)調(diào)用數(shù)來(lái)進(jìn)行限制最铁。

異步MQ類(lèi)調(diào)用,典型如RocketMQ, ? ? ?Kafka垮兑,等冷尉。對(duì)于該類(lèi)異步調(diào)用,通常限流方式是在訂閱端限流系枪。限流方式為兩種:針對(duì)消息訂閱者的并發(fā)流控雀哨,或針對(duì)消息訂閱者的消費(fèi)延時(shí)流控。

針對(duì)消息訂閱者的消費(fèi)延時(shí)流控基本原理是,在每次客戶(hù)端消費(fèi)時(shí)雾棺,可以增加一個(gè)延時(shí)來(lái)控制消費(fèi)速度膊夹,這樣理論消費(fèi)并發(fā)最快速度為:

MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber

比如如果消息并發(fā)消費(fèi)線(xiàn)程為20,延時(shí)為100ms捌浩,則理論上可以將并發(fā)消費(fèi)控制在200以下放刨。具體公式如下:

200 = 1 / 0.1 * 20

相比并發(fā)線(xiàn)程數(shù)流控,消費(fèi)延時(shí)流控優(yōu)點(diǎn)在于實(shí)現(xiàn)相對(duì)簡(jiǎn)單尸饺,對(duì)MQ類(lèi)客戶(hù)端包依賴(lài)較少进统,不需要客戶(hù)端提供控制并發(fā)線(xiàn)程數(shù)的動(dòng)態(tài)調(diào)整接口。

以上各種流量控制方法侵佃,在分布式架構(gòu)下麻昼,如果要做到全局動(dòng)態(tài)控制奠支,一個(gè)簡(jiǎn)單的技術(shù)方法是依賴(lài)配置中心馋辈,即通過(guò)配置中心來(lái)進(jìn)行流控參數(shù)的下發(fā)。

下面章節(jié)詳細(xì)介紹如何基于配置中心來(lái)實(shí)現(xiàn)異步消息消費(fèi)的全局動(dòng)態(tài)流控倍谜。使用的例子為阿里云上的?MQ?(消息隊(duì)列)和?ACM?(應(yīng)用配置管理)兩款產(chǎn)品迈螟。

注:之所以用MQ為示例是因?yàn)樵诒疚淖珜?xiě)之時(shí),正好MQ Consumer Client SDK并不支持動(dòng)態(tài)調(diào)整現(xiàn)成并發(fā)數(shù)尔崔,因此通過(guò)基于ACM來(lái)動(dòng)態(tài)調(diào)整消費(fèi)延遲的方法正好可以解決MQ消費(fèi)流控動(dòng)態(tài)的問(wèn)題答毫。


基于消費(fèi)延時(shí)流控的基本原理


基本原理如下。其中季春,管理員或應(yīng)用程序通過(guò)ACM控制臺(tái)發(fā)布消費(fèi)延時(shí)配置(RCV_INTERVAL_TIME)洗搂,所有MQ消費(fèi)程序訂閱該配置。理論上载弄,該配置從發(fā)布到下發(fā)所有客戶(hù)端耘拇,可以在1秒內(nèi)完成(取決于網(wǎng)絡(luò)演示)。


代碼示例

該章節(jié)基于配置中心來(lái)實(shí)現(xiàn)異步消息消費(fèi)的全局動(dòng)態(tài)流控的代碼示例宇攻。使用的例子為阿里云上的MQ(消息隊(duì)列)和ACM(應(yīng)用配置管理)兩款產(chǎn)品惫叛,基于Java語(yǔ)言。關(guān)于SDK的詳細(xì)介紹逞刷,可參見(jiàn)兩款產(chǎn)品的官方文檔嘉涌。

在ACM上創(chuàng)建消費(fèi)延時(shí)的參數(shù),截屏如下夸浅。


設(shè)置全局消費(fèi)延時(shí)變量


首先仑最,設(shè)置消費(fèi)接收延時(shí)的全局變量, 如下帆喇。

? ? ? ? ?// 初始化消息接收延時(shí)參數(shù)词身,單位為millisecond

? ? ? ? ? ? static int RCV_INTERVAL_TIME = 10000;

? ? ? ? ? ? // 初始化配置服務(wù),控制臺(tái)通過(guò)示例代碼自動(dòng)獲取下面參數(shù)

? ? ? ? ? ? ConfigService.init("acm.aliyun.com", /*租戶(hù)ID*/"xxx", /*AK*/"xxx", /*SK*/"yyy"); ? ?

? ? ? ? ? ? // 主動(dòng)獲取配置

? ? ? ? ? ? String content = ConfigService.getConfig("app.mq.qos", "DEFAULT_GROUP", 6000);

? ? ? ? ? ? Properties p = new Properties();

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? p.load(new StringReader(content));

? ? ? ? ? ? ? ? RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));

? ? ? ? ? ? } catch (IOException e) {

? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? }

其次番枚,設(shè)置ACM listener法严,確保當(dāng)配置被修改時(shí)损敷,即使更新 RCV_INTERVAL_TIME 參數(shù), 如下深啤。

// 初始化的時(shí)候拗馒,給配置添加監(jiān)聽(tīng),配置變更會(huì)回調(diào)通知

? ? ? ? ? ? ConfigService.addListener("app.mq.qos", "DEFAULT_GROUP", new ConfigChangeListener() {

? ? ? ? ? ? ? ? public void receiveConfigInfo(String configInfo) {

? ? ? ? ? ? ? ? ? ? Properties p = new Properties();

? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? p.load(new StringReader(configInfo));

? ? ? ? ? ? ? ? ? ? ? ? RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));

? ? ? ? ? ? ? ? ? ? } catch (IOException e) {

? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? });


設(shè)置 MQ 消費(fèi)延時(shí)邏輯


完整實(shí)例如下。

注:這里 RCV_INTERVAL_TIME 參數(shù)的訪(fǎng)問(wèn)是故意沒(méi)有加鎖的溯街,讀者可以自行思考原因诱桂。Aliyun ONS Client不提供動(dòng)態(tài)線(xiàn)程并發(fā)數(shù),默認(rèn)并發(fā)為20呈昔。因此這里正好使用消費(fèi)延時(shí)參數(shù)來(lái)動(dòng)態(tài)調(diào)節(jié)QoS挥等。

?//以下代碼可直接貼在Main()函數(shù)里

? ? ? ? Properties properties = new Properties();

? ? ? ? properties.put(PropertyKeyConst.ConsumerId, "CID_consumer_group");

? ? ? ? properties.put(PropertyKeyConst.AccessKey,"xxx");

? ? ? ? properties.put(PropertyKeyConst.SecretKey, "yyy");

? ? ? ? properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");

? ? ? ? // 設(shè)置 TCP 接入域名(此處以公共云生產(chǎn)環(huán)境為例)

? ? ? ? properties.put(PropertyKeyConst.ONSAddr,

? ? ? ? ? "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");

? ? ? ? Consumer consumer = ONSFactory.createConsumer(properties);

? ? ? ? consumer.subscribe(/*Topic*/"topic-name", /*Tag*/null, new MessageListener()?

? ? ? ? {

? ? ? ? ? ? public Action consume(Message message, ConsumeContext context) {

? ? ? ? ? ? ? ? // MQ Subscribe QoS logical start,?

? ? ? ? ? ? ? ? // Each consuming process will sleep for RCV_INTERVAL_TIME seconds with 100 ms sleeping cycle.

? ? ? ? ? ? ? ? // Within each cycle, the thread will check RCV_INTERVAL_TIME in case it's set to a smaller value.?

? ? ? ? ? ? ? ? // RCV_INTERVAL_TIME <= 0 means no sleeping.

? ? ? ? ? ? ? ? int rcvIntervalTimeLeft = RCV_INTERVAL_TIME;

? ? ? ? ? ? ? ? while (rcvIntervalTimeLeft > 0) {

? ? ? ? ? ? ? ? ? ? if (rcvIntervalTimeLeft > RCV_INTERVAL_TIME) {

? ? ? ? ? ? ? ? ? ? ? ? rcvIntervalTimeLeft = RCV_INTERVAL_TIME;

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? if (rcvIntervalTimeLeft >= 100) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? rcvIntervalTimeLeft -= 100;

? ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(100);

? ? ? ? ? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(rcvIntervalTimeLeft);

? ? ? ? ? ? ? ? ? ? ? ? ? ? rcvIntervalTimeLeft = 0;

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? // MQ Subscribe interval logical ends

? ? ? ? ? ? ? ? System.out.println("Receive: " + message);

? ? ? ? ? ? ? ? /*

? ? ? ? ? ? ? ? ?* Put your business logic here.

? ? ? ? ? ? ? ? ?*/

? ? ? ? ? ? ? ? doSomething();

? ? ? ? ? ? ? ? return Action.CommitMessage;

? ? ? ? ? ? }

? ? ? ? });

? ? ? ? consumer.start();


運(yùn)行結(jié)果


單機(jī)運(yùn)行consumer進(jìn)行消費(fèi),假設(shè)queue內(nèi)的消息無(wú)限多堤尾,不存在消費(fèi)萬(wàn)的情況肝劲,分三段測(cè)試,分別運(yùn)行約5分鐘,通過(guò)ACM配置推送來(lái)達(dá)到以下效果郭宝。

RCV_INTERVAL_TIME ? ? ?= 100 ms

RCV_INTERVAL_TIME ? ? ?= 5000 ms

RCV_INTERVAL_TIME ? ? ?= 1000 ms

結(jié)果如下辞槐,在單MQ消費(fèi)業(yè)務(wù)處理耗時(shí)約100ms情況下的,單機(jī)并發(fā)20線(xiàn)程的測(cè)試結(jié)果粘室。

RCV_INTERVAL_TIME? = 100 ms:平均消費(fèi)性能約為 9000 tpm 左右

RCV_INTERVAL_TIME? = 5000 ms:平均消費(fèi)性能被限制到了 200 tpm 左右

RCV_INTERVAL_TIME? = 1000 ms:平均消費(fèi)性能回升到到了 1100 tpm 左右

以上結(jié)果基本達(dá)到消費(fèi)和 tpm 成反比的預(yù)期榄檬,最關(guān)鍵的是整個(gè)過(guò)程中,應(yīng)用不中斷衔统,流控推送結(jié)果秒級(jí)生效到分布式集群鹿榜。單機(jī)性能結(jié)果如下所示。


相關(guān)產(chǎn)品詳情請(qǐng)參見(jiàn):

? 消息產(chǎn)品

Aliyun MQ:aliyun.com/product/ons

? 配置中心產(chǎn)品

Aliyun ACM:aliyun.com/product/acm

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末锦爵,一起剝皮案震驚了整個(gè)濱河市舱殿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌棉浸,老刑警劉巖怀薛,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異迷郑,居然都是意外死亡枝恋,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)嗡害,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)焚碌,“玉大人,你說(shuō)我怎么就攤上這事霸妹∈纾” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)鹃骂。 經(jīng)常有香客問(wèn)我台盯,道長(zhǎng),這世上最難降的妖魔是什么畏线? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任静盅,我火速辦了婚禮,結(jié)果婚禮上寝殴,老公的妹妹穿的比我還像新娘蒿叠。我一直安慰自己,他們只是感情好蚣常,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布市咽。 她就那樣靜靜地躺著,像睡著了一般抵蚊。 火紅的嫁衣襯著肌膚如雪施绎。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,144評(píng)論 1 285
  • 那天泌射,我揣著相機(jī)與錄音粘姜,去河邊找鬼鬓照。 笑死熔酷,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的豺裆。 我是一名探鬼主播拒秘,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼臭猜!你這毒婦竟也來(lái)了躺酒?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤蔑歌,失蹤者是張志新(化名)和其女友劉穎羹应,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體次屠,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡园匹,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了劫灶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片裸违。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖本昏,靈堂內(nèi)的尸體忽然破棺而出恕稠,到底是詐尸還是另有隱情局骤,我是刑警寧澤惋鸥,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布播演,位于F島的核電站,受9級(jí)特大地震影響峰搪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一岸啡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧赫编,春花似錦巡蘸、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至嘹吨,卻和暖如春搬味,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蟀拷。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工碰纬, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人问芬。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓悦析,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親此衅。 傳聞我的和親對(duì)象是個(gè)殘疾皇子强戴,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345