brpc之自適應(yīng)限流分析

之前在“服務(wù)過載時的一些思考”中說到過載相關(guān)的問題,最后有五個小問題:一是如何從源頭避免當(dāng)請求的服務(wù)過載時的反應(yīng)卷雕,是重試還是如何蒸甜?二是服務(wù)端在過載時如何優(yōu)雅的拒絕后面的請求雷猪?三是過載時如何快速恢復(fù)正常服務(wù)懊昨?四是框架是否有可改進和優(yōu)化的地方?第五個問題是服務(wù)如何判斷當(dāng)前服務(wù)是否過載春宣?這里就這個問題分析下brpc中的相關(guān)實現(xiàn)酵颁。

因為這里brpc收發(fā)消息和業(yè)務(wù)處理都在一個進程中,包括之前的skynet也是月帝,所以這里收到消息后躏惋,只能在后續(xù)邏輯中判斷要不要處理,而不是比如單獨的接入層進程收發(fā)消息在接入層處理負(fù)載問題嚷辅。

簡單的方法可以在收到消息時對每個請求加上當(dāng)前時間簿姨,邏輯那邊處理該請求消息時判斷下時間,過久的話直接丟棄簸搞,包括后續(xù)的請求扁位。如果消息隊列長度無限制,也可以根據(jù)當(dāng)前的消息數(shù)量來決定趁俊。但這里有個問題域仇,有些場合是需要先解析消息等一些判斷,這個也是花時間的寺擂,可能處理消息的時間遠(yuǎn)遠(yuǎn)小于解析的時間暇务?還個就是應(yīng)用場景的情況,在游戲中怔软,大部分是寫請求垦细,如果丟掉當(dāng)前的,可能會有些影響挡逼。包括早期不正確的設(shè)計括改,比如釋放一個技能,開始和結(jié)束都要廣播給周圍的玩家家坎,如果沒有開始的消息只有結(jié)束的嘱能,可能處理就有問題吝梅,不過這些設(shè)計后期都已經(jīng)修正(優(yōu)化)。

這里說的是同時在處理的請求數(shù)焰檩,而非socket連接數(shù)憔涉。當(dāng)同時處理的請求數(shù)達(dá)到服務(wù)的限制時订框,直接給客戶端回brpc::ELIMIT錯誤析苫,而不會調(diào)用服務(wù)回調(diào)〈┌猓看到ELIMIT錯誤的client應(yīng)重試另一個server衩侥。

在服務(wù)收到請求時會進行[server級]的并發(fā)請求數(shù)字段累加并判斷:

396         if (!server_accessor.AddConcurrency(cntl.get())) {
397             cntl->SetFailed(
398                 ELIMIT, "Reached server's max_concurrency=%d",
399                 server->options().max_concurrency);
400             break;
401         }
402 
403         if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
404             cntl->SetFailed(ELIMIT, "Too many user code to run when"
405                             " -usercode_in_pthread is on");
406             break;
407         }

 43     // Returns true if the `max_concurrency' limit is not reached.
 44     bool AddConcurrency(Controller* c) {
 45         if (_server->options().max_concurrency <= 0) {
 46             return true;
 47         }
 48         c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
 49         return (butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, 1)
 50                 <= _server->options().max_concurrency);
 51     }

接著對[method級]進行判斷是否達(dá)到并發(fā)數(shù)上限:

440         method_status = mp->status;
441         if (method_status) {
442             int rejected_cc = 0;
443             if (!method_status->OnRequested(&rejected_cc)) {
444                 cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
445                                 mp->method->full_name().c_str(), rejected_cc);
446                 break;
447             }
448         }

 93 inline bool MethodStatus::OnRequested(int* rejected_cc) {
 94     const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1;
 95     if (NULL == _cl || _cl->OnRequested(cc)) {
 96         return true;
 97     } 
 98     if (rejected_cc) {   
 99         *rejected_cc = cc;
100     }
101     return false;
102 }

 90 bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
 91     return current_concurrency <= _max_concurrency;
 92 }

這個類AutoConcurrencyLimiter后面分析。以上是請求到達(dá)時矛物,先進行server層的限流茫死,然后再進行具體的method層的限流。收到請求時_nconcurrency增加一履羞,返回響應(yīng)時減一峦萎,當(dāng)返回響應(yīng)時由concurrency_remover析構(gòu)時進行減操作:

152     ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);

153 ConcurrencyRemover::~ConcurrencyRemover() {
154     if (_status) {
155         _status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
156         _status = NULL;
157     }   
158     ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
159 }

104 inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
105     _nconcurrency.fetch_sub(1, butil::memory_order_relaxed);
106     if (0 == error_code) {
107         _latency_rec << latency;
108     } else {
109         _nerror_bvar << 1;
110     }
111     if (NULL != _cl) {
112         _cl->OnResponded(error_code, latency);
113     }   
114 }

 53     void RemoveConcurrency(const Controller* c) {
 54         if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) {
 55             butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
 56         }
 57     }

以上實現(xiàn)簡單的判斷是否超過并發(fā)數(shù)限制,而上面并沒有給出如何達(dá)到限制數(shù)忆首,只是簡單的判斷current_concurrency <= _max_concurrency爱榔。

對于一個請求,會設(shè)置它的達(dá)到時間糙及,處理完后算出latency并更新AutoConcurrencyLimiter實例的相關(guān)數(shù)據(jù)详幽,前者繼承自ConcurrencyLimiter類,有兩個virtual接口OnRequested和OnResponded用于在請求到來時判斷是否要處理和返回響應(yīng)時再設(shè)置:

 32     // This method should be called each time a request comes in. It returns
 33     // false when the concurrency reaches the upper limit, otherwise it 
 34     // returns true. Normally, when OnRequested returns false, you should 
 35     // return an ELIMIT error directly.
 36     virtual bool OnRequested(int current_concurrency) = 0;
 37 
 38     // Each request should call this method before responding.
 39     // `error_code' : Error code obtained from the controller, 0 means success.
 40     // `latency' : Microseconds taken by RPC.
 41     // NOTE: Even if OnRequested returns false, after sending ELIMIT, you 
 42     // still need to call OnResponded.
 43     virtual void OnResponded(int error_code, int64_t latency_us) = 0;
 94 void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
 95     if (0 == error_code) {
 96         _total_succ_req.fetch_add(1, butil::memory_order_relaxed);
 97     } else if (ELIMIT == error_code) {
 98         return;
 99     }
100 
101     const int64_t now_time_us = butil::gettimeofday_us();
102     int64_t last_sampling_time_us =
103         _last_sampling_time_us.load(butil::memory_order_relaxed);
104 
105     if (last_sampling_time_us == 0 ||
106         now_time_us - last_sampling_time_us >=
107             FLAGS_auto_cl_sampling_interval_ms * 1000) {
108         bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
109             last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
110         if (sample_this_call) {
111             bool sample_window_submitted = AddSample(error_code, latency_us,
112                                                      now_time_us);
113             if (sample_window_submitted) {
114                 //more code...
123         }
124     }
125 }

這里根據(jù)處理的返回值來更新采樣數(shù)據(jù)浸锨,對返回值ELIMIT的情況不統(tǒng)計唇聘,其中auto_cl_sampling_interval_ms是采樣間隔,默認(rèn)是0.1毫秒柱搜。如果該間隔內(nèi)處理成功的請求越多迟郎,則_total_succ_req越大,當(dāng)超過采樣間隔時聪蘸,則更新AddSample:

138 bool AutoConcurrencyLimiter::AddSample(int error_code,
139                                        int64_t latency_us,
140                                        int64_t sampling_time_us) {
141     std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
142     if (_reset_latency_us != 0) {
143         // min_latency is about to be reset soon.
144         if (_reset_latency_us > sampling_time_us) {
145             // ignoring samples during waiting for the deadline.
146             return false;
147         }
148         // Remeasure min_latency when concurrency has dropped to low load
149         _min_latency_us = -1;
150         _reset_latency_us = 0;
151         _remeasure_start_us = NextResetTime(sampling_time_us);
152         ResetSampleWindow(sampling_time_us);
153     }
154 
155     if (_sw.start_time_us == 0) {
156         _sw.start_time_us = sampling_time_us;
157     }
158 
159     if (error_code != 0 && FLAGS_auto_cl_enable_error_punish) {
160         ++_sw.failed_count;
161         _sw.total_failed_us += latency_us;
162     } else if (error_code == 0) {
163         ++_sw.succ_count;
164         _sw.total_succ_us += latency_us;
165     }
167     if (_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_min_sample_count) {
168         if (sampling_time_us - _sw.start_time_us >=
169             FLAGS_auto_cl_sample_window_size_ms * 1000) {
170             // If the sample size is insufficient at the end of the sampling 
171             // window, discard the entire sampling window
172             ResetSampleWindow(sampling_time_us);
173         }
174         return false;
175     }
176     if (sampling_time_us - _sw.start_time_us <
177         FLAGS_auto_cl_sample_window_size_ms * 1000 &&
178         _sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
179         return false;
180     }
181 
182     if(_sw.succ_count > 0) {
183         UpdateMaxConcurrency(sampling_time_us);
184     } else {
185         // All request failed
186         _max_concurrency /= 2;
187     }
188     ResetSampleWindow(sampling_time_us);
189     return true;
190 }

其中_sw為采樣窗口數(shù)據(jù)統(tǒng)計谎亩,每一個auto_cl_sample_window_size_ms周期內(nèi),對于處理失敗的情況且有懲罰值則會統(tǒng)計總失敗個數(shù)和total_failed_us和宇姚。auto_cl_sample_window_size_ms為一次采樣的周期時間匈庭。總請求個數(shù)小于auto_cl_min_sample_count時浑劳,如果超過采樣周期則重置_sw阱持,否則繼續(xù)統(tǒng)計,之后要么超過采樣周期魔熏,要么達(dá)到auto_cl_max_sample_count最大請求個數(shù)衷咽,窗口中積累足夠多的樣本數(shù)據(jù)鸽扁,那么本次采樣結(jié)束,此時會更新_max_concurrency镶骗,對于所有請求都失敗的情況直接把_max_concurrency降為一半桶现。否則會UpdateMaxConcurrency:

219 void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
220     int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
221     double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
222     int64_t avg_latency =
223         std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
224     double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
225     UpdateMinLatency(avg_latency);
226     UpdateQps(qps);
227 
228     int next_max_concurrency = 0;
229     // Remeasure min_latency at regular intervals
230     if (_remeasure_start_us <= sampling_time_us) {
231         const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
232         _reset_latency_us = sampling_time_us + avg_latency * 2;
233         next_max_concurrency =
234             std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
235     } else {
236         const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
237         const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
238         const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
239         const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
240         if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
241             qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
242             _explore_ratio  = std::min(max_explore_ratio, _explore_ratio + change_step);
243         } else {
244             _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
245         }
246         next_max_concurrency =
247             _min_latency_us * _ema_max_qps / 1000000 *  (1 + _explore_ratio);
248     }
249 
250     if (next_max_concurrency != _max_concurrency) {
251         _max_concurrency = next_max_concurrency;
252     }
253 }

201 void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
202     const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema;
203     if (_min_latency_us <= 0) {
204         _min_latency_us = latency_us;
205     } else if (latency_us < _min_latency_us) {
206         _min_latency_us = latency_us * ema_factor + _min_latency_us * (1 - ema_factor);
207     }
208 }

210 void AutoConcurrencyLimiter::UpdateQps(double qps) {
211     const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema / 10;
212     if (qps >= _ema_max_qps) {
213         _ema_max_qps = qps;
214     } else {
215         _ema_max_qps = qps * ema_factor + _ema_max_qps * (1 - ema_factor);
216     }
217 }

avg_latency為該采樣窗口內(nèi),處理失敗請求所花時間總和鼎姊,乘以懲罰系數(shù)的時間骡和,加上處理成功請求所花的總時間,除以成功請求總個數(shù)相寇。
在UpdateMinLatency中慰于,為了減少個別窗口的抖動對限流算法的影響,同時盡量降低計算開銷唤衫,計算min_latency時會通過使用EMA來進行平滑處理婆赠,同理UpdateQps。

引用:自適應(yīng)限流
假如從啟動到打滿qps的時間過長佳励,這期間會損失大量流量休里。在這里我們采取的措施有兩個:
1采樣方面,一旦采到的請求數(shù)量足夠多赃承,直接提交當(dāng)前采樣窗口妙黍,而不是等待采樣窗口的到時間了才提交
2計算公式方面,當(dāng)current_qps > 保存的max_qps時楣导,直接進行更新废境,不進行平滑處理
在進行了這兩個處理之后,絕大部分情況下都能夠在2秒左右將qps打滿筒繁。

_max_concurrency的有效時間由_reset_latency_us決定噩凹,即以當(dāng)前時間延后avg_latency的兩倍。在這段時間內(nèi)的樣本數(shù)據(jù)不采集直到超時重新計算_min_latency_us和_max_concurrency毡咏。_max_concurrency會在每個采樣窗口周期內(nèi)進行更新驮宴,_min_latency_us和_ema_max_qps進行平滑處理。若_remeasure_start_us超時則:

233 next_max_concurrency = std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);

否則可能要以一定的比例系數(shù)增大或減小_max_concurrency呕缭,根據(jù)avg_latency和qps:

240         if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
241             qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
242             _explore_ratio  = std::min(max_explore_ratio, _explore_ratio + change_step);
243         } else {
244             _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
245         }
246         next_max_concurrency =
247             _min_latency_us * _ema_max_qps / 1000000 *  (1 + _explore_ratio);

當(dāng)latency與min_latency很接近時堵泽,根據(jù)計算公式會得到一個較高max_concurrency來適應(yīng)concurrency的波動,從而盡可能的減少“誤殺”恢总。同時迎罗,隨著latency的升高,max_concurrency會逐漸降低片仿,以保護服務(wù)不會過載纹安。

max_concurrency = max_qps * ((2+alpha) * min_latency - latency)

這塊代碼其實很好明白,但需要涉及過多的理論基礎(chǔ)才能充分理解為何這么設(shè)計,以及各比例參數(shù)的確定厢岂。

自適應(yīng)限流
限制最大并發(fā)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末光督,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子塔粒,更是在濱河造成了極大的恐慌结借,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件卒茬,死亡現(xiàn)場離奇詭異船老,居然都是意外死亡,警方通過查閱死者的電腦和手機扬虚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進店門努隙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來球恤,“玉大人辜昵,你說我怎么就攤上這事⊙矢” “怎么了堪置?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長张惹。 經(jīng)常有香客問我舀锨,道長,這世上最難降的妖魔是什么宛逗? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任坎匿,我火速辦了婚禮,結(jié)果婚禮上雷激,老公的妹妹穿的比我還像新娘替蔬。我一直安慰自己,他們只是感情好屎暇,可當(dāng)我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布承桥。 她就那樣靜靜地躺著,像睡著了一般根悼。 火紅的嫁衣襯著肌膚如雪凶异。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天挤巡,我揣著相機與錄音剩彬,去河邊找鬼。 笑死矿卑,一個胖子當(dāng)著我的面吹牛喉恋,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼瀑晒,長吁一口氣:“原來是場噩夢啊……” “哼绍坝!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起苔悦,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤轩褐,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后玖详,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體把介,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡惩琉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年悲酷,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片溜嗜。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡向臀,死狀恐怖巢墅,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情券膀,我是刑警寧澤君纫,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站芹彬,受9級特大地震影響蓄髓,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜舒帮,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一会喝、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧玩郊,春花似錦肢执、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至临庇,卻和暖如春反璃,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背假夺。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工淮蜈, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人已卷。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓梧田,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子裁眯,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容