1、注冊并發設置
GlobalInitializeOrDieImpl中
注冊名稱和對應的并發限制類。
// Concurrency Limiters
// Register the limiter objects held in g_ext under the names "auto" and
// "constant". As the name RegisterOrDie suggests, a failed registration is
// presumably fatal -- confirm against the extension implementation.
ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);
2、設置并發使用的類
StartInternal中
for (MethodMap::iterator it = _method_map.begin();
it != _method_map.end(); ++it) {
if (it->second.is_builtin_service) {
it->second.status->SetConcurrencyLimiter(NULL);
} else {
const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
amc = &_options.method_max_concurrency;
}
ConcurrencyLimiter* cl = NULL;
if (!CreateConcurrencyLimiter(*amc, &cl)) { 【1】獲取并發限制類
LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
return -1;
}
it->second.status->SetConcurrencyLimiter(cl); 【2】設置并發限制類
}
}
CreateConcurrencyLimiter實現:
static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out) {
if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
*out = NULL;
return true;
}
const ConcurrencyLimiter* cl =
ConcurrencyLimiterExtension()->Find(amc.type().c_str()); 【3】根據(jù)名稱獲取并發(fā)配置
if (cl == NULL) {
LOG(ERROR) << "Fail to find ConcurrencyLimiter by `" << amc.value() << "'";
return false;
}
ConcurrencyLimiter* cl_copy = cl->New(amc);
if (cl_copy == NULL) {
LOG(ERROR) << "Fail to new ConcurrencyLimiter";
return false;
}
*out = cl_copy;
return true;
}
3、默認協議baidu_std中自適應并發控制的更新
SendRpcResponse中
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
~ConcurrencyRemover調用OnResponded
// On destruction: if a method status is attached, report the response
// (error code plus time elapsed since _received_us) to it and detach it;
// then always remove the controller's concurrency from its server.
ConcurrencyRemover::~ConcurrencyRemover() {
    if (_status != NULL) {
        _status->OnResponded(
            _c->ErrorCode(),
            butil::cpuwide_time_us() - _received_us);
        _status = NULL;
    }
    ServerPrivateAccessor accessor(_c->server());
    accessor.RemoveConcurrency(_c);
}
MethodStatus::OnResponded中調用 AutoConcurrencyLimiter::OnResponded
// Bookkeeping for one finished request.
//   error_code: 0 means success; any other value is counted as an error.
//   latency:    recorded into _latency_rec for successful requests only.
// The in-flight counter is decremented first; finally the attached
// concurrency limiter, if any, is told about the response.
inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
    _nconcurrency.fetch_sub(1, butil::memory_order_relaxed);

    const bool failed = (error_code != 0);
    if (failed) {
        _nerror_bvar << 1;
    } else {
        _latency_rec << latency;
    }

    if (_cl == NULL) {
        return;
    }
    _cl->OnResponded(error_code, latency);
}
AutoConcurrencyLimiter::OnResponded調用AddSample
void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
...
bool sample_window_submitted = AddSample(error_code, latency_us,
now_time_us);
...
}
AddSample中調用UpdateMaxConcurrency
UpdateMaxConcurrency(sampling_time_us);
UpdateMaxConcurrency實現
本次只分析調用關系,這里的算法和說明文檔中算法的對應關系,暫時沒有仔細分析。
// Recomputes _max_concurrency from the statistics accumulated in the current
// sampling window (_sw).
//
// sampling_time_us: timestamp used as the end of the measured interval; the
//     interval length is sampling_time_us - _sw.start_time_us (presumably
//     microseconds, judging by the _us suffix and the 1000000 factor).
//
// NOTE(review): this function divides by _sw.succ_count and by the interval
// length without checking for zero -- presumably the caller guarantees both
// are positive; confirm.
void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
// Time spent on failed requests, weighted by the fail-punish ratio flag.
double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
// Average latency per successful request, with the weighted failure time
// added to the numerator; rounded up.
int64_t avg_latency =
std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
// Successful requests per second over the measured interval.
double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
// Feed the new measurements to the helpers. They are defined elsewhere and
// presumably refresh _min_latency_us and _ema_max_qps, which are read below.
UpdateMinLatency(avg_latency);
UpdateQps(qps);
int next_max_concurrency = 0;
// Remeasure min_latency at regular intervals
if (_remeasure_start_us <= sampling_time_us) {
// Remeasure branch: scale the product _ema_max_qps * _min_latency_us by
// reduce_ratio and schedule _reset_latency_us two average latencies after
// the sampling time.
const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
_reset_latency_us = sampling_time_us + avg_latency * 2;
next_max_concurrency =
std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
} else {
const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
// Raise the explore ratio (capped at max_explore_ratio) when the average
// latency is still within the tolerated band above _min_latency_us, or when
// qps is at or below _ema_max_qps / (1 + min_explore_ratio); otherwise lower
// it (floored at min_explore_ratio).
if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) ||
qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
_explore_ratio = std::min(max_explore_ratio, _explore_ratio + change_step);
} else {
_explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
}
// Unlike the remeasure branch there is no std::ceil here: the result is
// implicitly converted to int on assignment.
next_max_concurrency =
_min_latency_us * _ema_max_qps / 1000000 * (1 + _explore_ratio);
}
// Write _max_concurrency only when the value actually changes.
if (next_max_concurrency != _max_concurrency) {
_max_concurrency = next_max_concurrency;
}
}
4、默認協議中自適應并發的使用
如果并發超過設置的_max_concurrency,則拒絕當前請求。
ProcessRpcRequest中
// Ask the method's status whether this request may proceed. On rejection,
// fail the RPC with ELIMIT, reporting the method's full name and the
// concurrency value returned through rejected_cc, then leave the enclosing
// construct via break (the enclosing construct is outside this excerpt).
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
mp->method->full_name().c_str(), rejected_cc);
break;
}
OnRequested實現
// Admission check: a request is accepted as long as the given concurrency
// does not exceed _max_concurrency (a value equal to the limit is accepted).
bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
    if (current_concurrency > _max_concurrency) {
        return false;
    }
    return true;
}