brpc 自適應限流分析

1堡纬、注冊并發(fā)設置

GlobalInitializeOrDieImpl中

注冊名稱和對于的并發(fā)限制類携悯。

    // Concurrency Limiters
    ConcurrencyLimiterExtension()->RegisterOrDie("auto", &g_ext->auto_cl);
    ConcurrencyLimiterExtension()->RegisterOrDie("constant", &g_ext->constant_cl);

2缔恳、設置并發(fā)使用的類

StartInternal中
    for (MethodMap::iterator it = _method_map.begin();
        it != _method_map.end(); ++it) {
        if (it->second.is_builtin_service) {
            it->second.status->SetConcurrencyLimiter(NULL);
        } else {
            const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
            if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
                amc = &_options.method_max_concurrency;
            }
            ConcurrencyLimiter* cl = NULL;
            if (!CreateConcurrencyLimiter(*amc, &cl)) {    【1】獲取并發(fā)限制類
                LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
                return -1;
            }
            it->second.status->SetConcurrencyLimiter(cl);  【2】設置并發(fā)限制類
        }
    }
CreateConcurrencyLimiter實現(xiàn):
static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
                                     ConcurrencyLimiter** out) {
    if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
        *out = NULL;
        return true;
    }
    const ConcurrencyLimiter* cl =
        ConcurrencyLimiterExtension()->Find(amc.type().c_str()); 【3】根據(jù)名稱獲取并發(fā)配置
    if (cl == NULL) {
        LOG(ERROR) << "Fail to find ConcurrencyLimiter by `" << amc.value() << "'";
        return false;
    }
    ConcurrencyLimiter* cl_copy = cl->New(amc);
    if (cl_copy == NULL) {
        LOG(ERROR) << "Fail to new ConcurrencyLimiter";
        return false;
    }
    *out = cl_copy;
    return true;
}

3仰冠、默認協(xié)議baidu_std中自適應并發(fā)控制的更新

SendRpcResponse中

ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);

~ConcurrencyRemover調用OnResponded

ConcurrencyRemover::~ConcurrencyRemover() {
    if (_status) {
        _status->OnResponded(_c->ErrorCode(), butil::cpuwide_time_us() - _received_us);
        _status = NULL;
    }
    ServerPrivateAccessor(_c->server()).RemoveConcurrency(_c);
}

OnResponded中調用 AutoConcurrencyLimiter::OnResponded

inline void MethodStatus::OnResponded(int error_code, int64_t latency) {
    _nconcurrency.fetch_sub(1, butil::memory_order_relaxed);
    if (0 == error_code) {
        _latency_rec << latency;
    } else {
        _nerror_bvar << 1;
    }
    if (NULL != _cl) {
        _cl->OnResponded(error_code, latency);
    }
}

AutoConcurrencyLimiter::OnResponded調用AddSample

void AutoConcurrencyLimiter::OnResponded(int error_code, int64_t latency_us) {
...
            bool sample_window_submitted = AddSample(error_code, latency_us, 
                                                     now_time_us);
...
}

AddSample中調用UpdateMaxConcurrency

UpdateMaxConcurrency(sampling_time_us);

UpdateMaxConcurrency實現(xiàn)
本次分析調用關系蜕依,這里算法和說明文檔的算法對于關系典勇,暫時沒有仔細分析劫哼。

void AutoConcurrencyLimiter::UpdateMaxConcurrency(int64_t sampling_time_us) {
    int32_t total_succ_req = _total_succ_req.load(butil::memory_order_relaxed);
    double failed_punish = _sw.total_failed_us * FLAGS_auto_cl_fail_punish_ratio;
    int64_t avg_latency = 
        std::ceil((failed_punish + _sw.total_succ_us) / _sw.succ_count);
    double qps = 1000000.0 * total_succ_req / (sampling_time_us - _sw.start_time_us);
    UpdateMinLatency(avg_latency);
    UpdateQps(qps);

    int next_max_concurrency = 0;
    // Remeasure min_latency at regular intervals
    if (_remeasure_start_us <= sampling_time_us) {
        const double reduce_ratio = FLAGS_auto_cl_reduce_ratio_while_remeasure;
        _reset_latency_us = sampling_time_us + avg_latency * 2;
        next_max_concurrency = 
            std::ceil(_ema_max_qps * _min_latency_us / 1000000 * reduce_ratio);
    } else {
        const double change_step = FLAGS_auto_cl_change_rate_of_explore_ratio;
        const double max_explore_ratio = FLAGS_auto_cl_max_explore_ratio;
        const double min_explore_ratio = FLAGS_auto_cl_min_explore_ratio;
        const double correction_factor = FLAGS_auto_cl_latency_fluctuation_correction_factor;
        if (avg_latency <= _min_latency_us * (1.0 + min_explore_ratio * correction_factor) || 
            qps <= _ema_max_qps / (1.0 + min_explore_ratio)) {
            _explore_ratio  = std::min(max_explore_ratio, _explore_ratio + change_step); 
        } else {
            _explore_ratio = std::max(min_explore_ratio, _explore_ratio - change_step);
        }
        next_max_concurrency = 
            _min_latency_us * _ema_max_qps / 1000000 *  (1 + _explore_ratio);
    }

    if (next_max_concurrency != _max_concurrency) {
        _max_concurrency = next_max_concurrency;
    }
}

4、默認協(xié)議中自適應并發(fā)的使用

如果并發(fā)超過設置_max_concurrency拒絕當前請求割笙。

ProcessRpcRequest中

            if (!method_status->OnRequested(&rejected_cc)) {
                cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
                                mp->method->full_name().c_str(), rejected_cc);
                break;
            }

OnRequested實現(xiàn)

bool AutoConcurrencyLimiter::OnRequested(int current_concurrency) {
    return current_concurrency <= _max_concurrency;
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末权烧,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子伤溉,更是在濱河造成了極大的恐慌般码,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件乱顾,死亡現(xiàn)場離奇詭異板祝,居然都是意外死亡,警方通過查閱死者的電腦和手機走净,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門券时,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人伏伯,你說我怎么就攤上這事橘洞。” “怎么了舵鳞?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵震檩,是天一觀的道長。 經常有香客問我,道長抛虏,這世上最難降的妖魔是什么博其? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮迂猴,結果婚禮上慕淡,老公的妹妹穿的比我還像新娘。我一直安慰自己沸毁,他們只是感情好峰髓,可當我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著息尺,像睡著了一般携兵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上搂誉,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天徐紧,我揣著相機與錄音,去河邊找鬼炭懊。 笑死并级,一個胖子當著我的面吹牛,可吹牛的內容都是我干的侮腹。 我是一名探鬼主播嘲碧,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼父阻!你這毒婦竟也來了愈涩?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤至非,失蹤者是張志新(化名)和其女友劉穎钠署,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體荒椭,經...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡谐鼎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了趣惠。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片狸棍。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖味悄,靈堂內的尸體忽然破棺而出草戈,到底是詐尸還是另有隱情,我是刑警寧澤侍瑟,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布唐片,位于F島的核電站丙猬,受9級特大地震影響,放射性物質發(fā)生泄漏费韭。R本人自食惡果不足惜茧球,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望星持。 院中可真熱鬧抢埋,春花似錦、人聲如沸督暂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽逻翁。三九已至饥努,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間卢未,已是汗流浹背肪凛。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留辽社,地道東北人。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓翘鸭,卻偏偏與公主長得像滴铅,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子就乓,可洞房花燭夜當晚...
    茶點故事閱讀 42,786評論 2 345