In an earlier post, "Some thoughts on service overload", I discussed overload-related problems and left five small questions: first, how to react at the source when the requested service is overloaded, by retrying or by some other means? Second, how should a server gracefully reject further requests while it is overloaded? Third, how can it recover to normal service quickly after overload? Fourth, is there anything in the framework worth improving or optimizing? Fifth, how does a service judge whether it is currently overloaded? This post looks at how brpc answers that last question.
In brpc, message I/O and business logic run in the same process (the same was true of skynet, discussed before), so once a message has been received, whether to actually handle it can only be decided in the later processing logic, rather than having a separate access-layer process do the I/O and shed load up front.
A simple approach is to stamp every request with the current time when it is received; when the logic side gets around to it, it checks the timestamp and drops the request (and possibly the ones queued behind it) if it has waited too long. If the message queue is unbounded, the current queue length can also drive the decision. There are caveats, though. In some cases the message has to be parsed first before such a judgement can be made, and parsing itself costs time; the real handling may even be far cheaper than the parsing. The application scenario also matters: in a game most requests are writes, so dropping the current one can have visible effects. Early, not-quite-correct designs made this worse, for example a skill cast whose start and end both had to be broadcast to nearby players; receiving only the end without the start could break the handling, though such designs were fixed (optimized) later. A minimal sketch of the timestamp idea follows.
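Below is a minimal sketch of that timestamp idea. It is hypothetical code, not brpc or any real framework; QueuedMsg, ShouldProcess and the 500 ms budget are all made up for illustration:

#include <cstdint>
#include <string>

// Hypothetical queued message carrying the time it was received.
struct QueuedMsg {
    int64_t enqueue_us;    // stamped by the I/O side on arrival
    std::string payload;   // raw bytes, not parsed yet
};

// Worker-side check: shed the request if it has already waited longer
// than the budget, before paying the parse/handle cost.
bool ShouldProcess(const QueuedMsg& msg, int64_t now_us) {
    const int64_t kMaxQueueDelayUs = 500 * 1000;  // assumed 500 ms budget
    return now_us - msg.enqueue_us <= kMaxQueueDelayUs;
}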
Note that what is limited here is the number of requests being processed at the same time, not the number of socket connections. When the number of in-flight requests reaches the server's limit, brpc replies to the client with brpc::ELIMIT directly, without invoking the service callback. A client that sees ELIMIT should retry against another server.
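Roughly how these limits are configured from the user side, as a sketch based on the brpc documentation ("example.EchoService.Echo" is a placeholder from brpc's echo example and the numbers are arbitrary):

#include <brpc/server.h>

void ConfigureLimits(brpc::Server& server, brpc::ServerOptions& options) {
    options.max_concurrency = 1000;  // server-level limit; <= 0 means no limit
    // Method-level limit, set after AddService() and before Start():
    server.MaxConcurrencyOf("example.EchoService.Echo") = 200;
    // Or hand the method over to the adaptive limiter analyzed below:
    // server.MaxConcurrencyOf("example.EchoService.Echo") = "auto";
}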
When a request arrives, the server first bumps and checks the server-level concurrency counter:
if (!server_accessor.AddConcurrency(cntl.get())) {
    cntl->SetFailed(
        ELIMIT, "Reached server's max_concurrency=%d",
        server->options().max_concurrency);
    break;
}

if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
    cntl->SetFailed(ELIMIT, "Too many user code to run when"
                    " -usercode_in_pthread is on");
    break;
}
// Returns true if the `max_concurrency' limit is not reached.
bool AddConcurrency(Controller* c) {
    if (_server->options().max_concurrency <= 0) {
        return true;
    }
    c->add_flag(Controller::FLAGS_ADDED_CONCURRENCY);
    return (butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, 1)
            <= _server->options().max_concurrency);
}
Then the method-level limit is checked:
method_status = mp->status;
if (method_status) {
    int rejected_cc = 0;
    if (!method_status->OnRequested(&rejected_cc)) {
        cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                        mp->method->full_name().c_str(), rejected_cc);
        break;
    }
}
inline bool MethodStatus::OnRequested(int* rejected_cc) {
    const int cc = _nconcurrency.fetch_add(1, butil::memory_order_relaxed) + 1;
    if (NULL == _cl || _cl->OnRequested(cc)) {
        return true;
    }
    if (rejected_cc) {
        *rejected_cc = cc;
    }
    return false;
}
bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
    return current_concurrency <= _max_concurrency;
}
The AutoConcurrencyLimiter class is analyzed later. So when a request arrives, the server-level limit is applied first, then the method-level one. _nconcurrency is incremented by one when the request is received and decremented by one when the response goes out; the decrement happens when the ConcurrencyRemover is destructed:
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);

ConcurrencyRemover::~ConcurrencyRemover() {
    if (_status) {
        _status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
        _status = NULL;
    }
    ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
}
inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
    _nconcurrency.fetch_sub(1, butil::memory_order_relaxed);
    if (0 == error_code) {
        _latency_rec << latency;
    } else {
        _nerror_bvar << 1;
    }
    if (NULL != _cl) {
        _cl->OnResponded(error_code, latency);
    }
}
void RemoveConcurrency(const Controller* c) {
    if (c->has_flag(Controller::FLAGS_ADDED_CONCURRENCY)) {
        butil::subtle::NoBarrier_AtomicIncrement(&_server->_concurrency, -1);
    }
}
All of the above is just the check against the limit, a plain current_concurrency <= _max_concurrency; it says nothing yet about how _max_concurrency itself is determined.
For each request, its arrival time is recorded; when processing finishes, the latency is computed and fed back into the AutoConcurrencyLimiter instance. AutoConcurrencyLimiter derives from ConcurrencyLimiter, which declares two virtual hooks: OnRequested, called when a request comes in to decide whether to accept it, and OnResponded, called when the response is sent:
// This method should be called each time a request comes in. It returns
// false when the concurrency reaches the upper limit, otherwise it
// returns true. Normally, when OnRequested returns false, you should
// return an ELIMIT error directly.
virtual bool OnRequested(int current_concurrency) = 0;

// Each request should call this method before responding.
// `error_code' : Error code obtained from the controller, 0 means success.
// `latency' : Microseconds taken by RPC.
// NOTE: Even if OnRequested returns false, after sending ELIMIT, you
// still need to call OnResponded.
virtual void OnResponded(int error_code, int64_t latency_us) = 0;
void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
    if (0 == error_code) {
        _total_succ_req.fetch_add(1, butil::memory_order_relaxed);
    } else if (ELIMIT == error_code) {
        return;
    }

    const int64_t now_time_us = butil::gettimeofday_us();
    int64_t last_sampling_time_us =
        _last_sampling_time_us.load(butil::memory_order_relaxed);

    if (last_sampling_time_us == 0 ||
        now_time_us - last_sampling_time_us >=
            FLAGS_auto_cl_sampling_interval_ms * 1000) {
        bool sample_this_call = _last_sampling_time_us.compare_exchange_strong(
            last_sampling_time_us, now_time_us, butil::memory_order_relaxed);
        if (sample_this_call) {
            bool sample_window_submitted = AddSample(error_code, latency_us,
                                                     now_time_us);
            if (sample_window_submitted) {
                // more code...
            }
        }
    }
}
The sampling data is updated according to the outcome of the request; responses rejected with ELIMIT are not counted. auto_cl_sampling_interval_ms is the sampling interval, 0.1 ms by default. The more requests succeed, the larger _total_succ_req grows; once the interval has elapsed, only the thread that wins the compare_exchange_strong on _last_sampling_time_us goes on to call AddSample with its own latency sample:
bool AutoConcurrencyLimiter::AddSample(int error_code,
                                       int64_t latency_us,
                                       int64_t sampling_time_us) {
    std::unique_lock<butil::Mutex> lock_guard(_sw_mutex);
    if (_reset_latency_us != 0) {
        // min_latency is about to be reset soon.
        if (_reset_latency_us > sampling_time_us) {
            // ignoring samples during waiting for the deadline.
            return false;
        }
        // Remeasure min_latency when concurrency has dropped to low load
        _min_latency_us = -1;
        _reset_latency_us = 0;
        _remeasure_start_us = NextResetTime(sampling_time_us);
        ResetSampleWindow(sampling_time_us);
    }

    if (_sw.start_time_us == 0) {
        _sw.start_time_us = sampling_time_us;
    }

    if (error_code != 0 && FLAGS_auto_cl_enable_error_punish) {
        ++_sw.failed_count;
        _sw.total_failed_us += latency_us;
    } else if (error_code == 0) {
        ++_sw.succ_count;
        _sw.total_succ_us += latency_us;
    }

    if (_sw.succ_count + _sw.failed_count < FLAGS_auto_cl_min_sample_count) {
        if (sampling_time_us - _sw.start_time_us >=
            FLAGS_auto_cl_sample_window_size_ms * 1000) {
            // If the sample size is insufficient at the end of the sampling
            // window, discard the entire sampling window
            ResetSampleWindow(sampling_time_us);
        }
        return false;
    }
    if (sampling_time_us - _sw.start_time_us <
        FLAGS_auto_cl_sample_window_size_ms * 1000 &&
        _sw.succ_count + _sw.failed_count < FLAGS_auto_cl_max_sample_count) {
        return false;
    }

    if (_sw.succ_count > 0) {
        UpdateMaxConcurrency(sampling_time_us);
    } else {
        // All request failed
        _max_concurrency /= 2;
    }
    ResetSampleWindow(sampling_time_us);
    return true;
}
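Paraphrasing the early returns above, a sampling window is only submitted, and _max_concurrency only updated, roughly when the following condition holds (not literal brpc code):

// sample_count stands for _sw.succ_count + _sw.failed_count
sample_count >= FLAGS_auto_cl_min_sample_count &&
    (sampling_time_us - _sw.start_time_us >= FLAGS_auto_cl_sample_window_size_ms * 1000 ||
     sample_count >= FLAGS_auto_cl_max_sample_count)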
_sw holds the statistics of the current sampling window, whose nominal length is auto_cl_sample_window_size_ms. Within a window, failed requests are counted into failed_count and total_failed_us only when error punishment is enabled. While the number of samples is still below auto_cl_min_sample_count, the window is reset if its time is already up, otherwise sampling simply continues; the window is submitted once either its time has elapsed or the sample count reaches auto_cl_max_sample_count, i.e. once it has accumulated enough samples. At that point _max_concurrency is updated: if every request in the window failed, _max_concurrency is simply halved, otherwise UpdateMaxConcurrency is called:
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
    int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
    double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
    int64_t avg_latency =
        std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
    double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
    UpdateMinLatency(avg_latency);
    UpdateQps(qps);

    int next_max_concurrency = 0;
    // Remeasure min_latency at regular intervals
    if (_remeasure_start_us <= sampling_time_us) {
        const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
        _reset_latency_us = sampling_time_us + avg_latency * 2;
        next_max_concurrency =
            std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
    } else {
        const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
        const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
        const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
        const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
        if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
            qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
            _explore_ratio = std::min(max_explore_ratio, _explore_ratio + change_step);
        } else {
            _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
        }
        next_max_concurrency =
            _min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
    }

    if (next_max_concurrency != _max_concurrency) {
        _max_concurrency = next_max_concurrency;
    }
}
void AutoConcurrencyLimiter::UpdateMinLatency(int64_t latency_us) {
    const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema;
    if (_min_latency_us <= 0) {
        _min_latency_us = latency_us;
    } else if (latency_us < _min_latency_us) {
        _min_latency_us = latency_us * ema_factor + _min_latency_us * (1 - ema_factor);
    }
}
void AutoConcurrencyLimiter::UpdateQps(double qps) {
    const double ema_factor = FLAGS_auto_cl_alpha_factor_for_ema / 10;
    if (qps >= _ema_max_qps) {
        _ema_max_qps = qps;
    } else {
        _ema_max_qps = qps * ema_factor + _ema_max_qps * (1 - ema_factor);
    }
}
avg_latency is computed over the sampling window as the total time spent on failed requests multiplied by the punishment ratio, plus the total time spent on successful requests, divided by the number of successful requests.
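For instance, with made-up window numbers _sw.total_failed_us = 80000, _sw.total_succ_us = 500000, _sw.succ_count = 100 and auto_cl_fail_punish_ratio = 1.0:

avg_latency = ceil((80000 * 1.0 + 500000) / 100) = 5800 us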
In UpdateMinLatency, to keep the jitter of individual windows from disturbing the limiting algorithm while keeping the computation cheap, min_latency is smoothed with an EMA (exponential moving average); UpdateQps does the same for the maximum QPS.
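Spelled out, the smoothing in the two functions above is:

min_latency = latency * ema_factor + min_latency * (1 - ema_factor)   (applied only when the new sample is lower)
ema_max_qps = qps * ema_factor / 10 + ema_max_qps * (1 - ema_factor / 10)   (applied only when qps is below the stored maximum; a higher qps replaces it directly)

where ema_factor is FLAGS_auto_cl_alpha_factor_for_ema.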
Quoting the brpc document on adaptive limiting (自适应限流):
If it took too long to ramp QPS up to its maximum after startup, a lot of traffic would be lost in the meantime. Two measures are taken here:
1. Sampling: once enough requests have been sampled, the current sampling window is submitted immediately instead of waiting for the window to expire.
2. The formula: when current_qps > the stored max_qps, the value is taken directly without smoothing.
With these two measures, QPS can usually be ramped up to the maximum within about 2 seconds.
How long the reduced _max_concurrency stays in force is governed by _reset_latency_us, which is set to the current time plus twice avg_latency; samples arriving before that deadline are ignored, and once it passes, _min_latency_us is cleared and remeasured together with _max_concurrency. _max_concurrency is recalculated at the end of every sampling window, with _min_latency_us and _ema_max_qps smoothed as described above. If _remeasure_start_us has been reached, then:
next_max_concurrency = std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
Otherwise _max_concurrency is nudged up or down by an exploration ratio, depending on avg_latency and qps:
if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
    qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
    _explore_ratio = std::min(max_explore_ratio, _explore_ratio + change_step);
} else {
    _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
}
next_max_concurrency =
    _min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
When latency stays close to min_latency, the formula yields a relatively high max_concurrency to absorb fluctuations in concurrency and keep false rejections to a minimum; as latency climbs, max_concurrency is gradually lowered to protect the service from overload.
max_concurrency = max_qps * ((2+alpha) * min_latency - latency)
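For intuition, plugging made-up numbers into the code's formula next_max_concurrency = _min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio): with _ema_max_qps = 2000, _min_latency_us = 5000 and _explore_ratio = 0.3, this gives 5000 * 2000 / 1000000 * 1.3 = 13, i.e. roughly peak QPS times minimum latency (the concurrency needed to sustain that QPS at best-case latency) plus a 30% exploration margin.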
The code itself is easy enough to follow; what takes considerably more theoretical background is understanding why it is designed this way and how the various ratio parameters were chosen.