(27)存儲(chǔ)線程池及消息隊(duì)列(Reporter部分)-【Lars-基于C++負(fù)載均衡遠(yuǎn)程服務(wù)器調(diào)度系統(tǒng)教程】

【Lars教程目錄】

Lars源代碼
https://github.com/aceld/Lars


【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建


【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫(xiě)功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測(cè)試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能


【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡(jiǎn)介
第2章-數(shù)據(jù)庫(kù)創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控


【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列


【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過(guò)期窗口清理與過(guò)載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測(cè)試工具
第12章- Lars啟動(dòng)工具腳本


5) 存儲(chǔ)線程池及消息隊(duì)列

? 我們現(xiàn)在的reporter_service的io入庫(kù)操作莉御,完全是在消息的callback中進(jìn)行的,那么實(shí)際上,這回占用我們server的工作線程的阻塞時(shí)間裤唠,從而浪費(fèi)cpu雏亚。所以我們應(yīng)該將io的入庫(kù)操作贬蛙,交給一個(gè)專門做入庫(kù)的消息隊(duì)列線程池來(lái)做锦亦,這樣我們的callback就會(huì)立刻返回該業(yè)務(wù)马绝,從而可以繼續(xù)處理下一個(gè)conn鏈接的消息事件業(yè)務(wù)择示。

? 所以我們就要在此給reporter_service設(shè)計(jì)一個(gè)存儲(chǔ)數(shù)據(jù)的線程池及配套的消息隊(duì)列妒牙。當(dāng)然這里面我們還是直接用寫(xiě)好的lars_reactor框架里的接口即可。

lars_reporter/src/reporter_service.cpp

#include "lars_reactor.h"
#include "lars.pb.h"
#include "store_report.h"
#include <string>

thread_queue<lars::ReportStatusRequest> **reportQueues = NULL;
int thread_cnt = 0;

void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    lars::ReportStatusRequest req;

    req.ParseFromArray(data, len);

    //將上報(bào)數(shù)據(jù)存儲(chǔ)到db 
    StoreReport sr;
    sr.store(req);

    //輪詢將消息平均發(fā)送到每個(gè)線程的消息隊(duì)列中
    static int index = 0;
    //將消息發(fā)送給某個(gè)線程消息隊(duì)列
    reportQueues[index]->send(req);
    index ++;
    index = index % thread_cnt;
}

void create_reportdb_threads()
{
    thread_cnt = config_file::instance()->GetNumber("reporter", "db_thread_cnt", 3);
    
    //開(kāi)線程池的消息隊(duì)列
    reportQueues = new thread_queue<lars::ReportStatusRequest>*[thread_cnt];

    if (reportQueues == NULL) {
        fprintf(stderr, "create thread_queue<lars::ReportStatusRequest>*[%d], error", thread_cnt) ;
        exit(1);
    }

    for (int i = 0; i < thread_cnt; i++) {
        //給當(dāng)前線程創(chuàng)建一個(gè)消息隊(duì)列queue
        reportQueues[i] = new thread_queue<lars::ReportStatusRequest>();
        if (reportQueues == NULL) {
            fprintf(stderr, "create thread_queue error\n");
            exit(1);
        }

        pthread_t tid;
        int ret = pthread_create(&tid, NULL, store_main, reportQueues[i]);
        if (ret == -1)  {
            perror("pthread_create");
            exit(1);
        }

        pthread_detach(tid);
    }
}


int main(int argc, char **argv)
{
    event_loop loop;

    //加載配置文件
    config_file::setPath("./conf/lars_reporter.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    short port = config_file::instance()->GetNumber("reactor", "port", 7779);


    //創(chuàng)建tcp server
    tcp_server server(&loop, ip.c_str(), port);

    //添加數(shù)據(jù)上報(bào)請(qǐng)求處理的消息分發(fā)處理業(yè)務(wù)
    server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);

    //為了防止在業(yè)務(wù)中出現(xiàn)io阻塞对妄,那么需要啟動(dòng)一個(gè)線程池對(duì)IO進(jìn)行操作的湘今,接受業(yè)務(wù)的請(qǐng)求存儲(chǔ)消息
    create_reportdb_threads();
  
    //啟動(dòng)事件監(jiān)聽(tīng)
    loop.event_process(); 

    return 0;
}

? 這里主線程啟動(dòng)了線程池,根據(jù)配置文件的db_thread_cnt數(shù)量來(lái)開(kāi)辟剪菱。每個(gè)線程都會(huì)執(zhí)行store_main方法摩瞎,我們來(lái)看一下實(shí)現(xiàn)

lars_reporter/src/store_thread.cpp

#include "lars.pb.h"
#include "lars_reactor.h"
#include "store_report.h"

struct Args 
{
    thread_queue<lars::ReportStatusRequest>* first;
    StoreReport *second;
};

//typedef void io_callback(event_loop *loop, int fd, void *args);
void thread_report(event_loop *loop, int fd, void *args)
{
    //1. 從queue里面取出需要report的數(shù)據(jù)(需要thread_queue)
    thread_queue<lars::ReportStatusRequest>* queue = ((Args*)args)->first;
    StoreReport *sr = ((Args*)args)->second;

    std::queue<lars::ReportStatusRequest> report_msgs;

    //1.1 從消息隊(duì)列中取出全部的消息元素集合
    queue->recv(report_msgs);
    while ( !report_msgs.empty() ) {
        lars::ReportStatusRequest msg = report_msgs.front();
        report_msgs.pop();

        //2. 將數(shù)據(jù)存儲(chǔ)到DB中(需要StoreReport)
        sr->store(msg);
    }
}


void *store_main(void *args)
{
    //得到對(duì)應(yīng)的thread_queue
    thread_queue<lars::ReportStatusRequest> *queue = (thread_queue<lars::ReportStatusRequest>*)args;

    //定義事件觸發(fā)機(jī)制
    event_loop loop;

    //定義一個(gè)存儲(chǔ)對(duì)象
    StoreReport sr; 

    Args callback_args;
    callback_args.first = queue;
    callback_args.second = &sr;

    queue->set_loop(&loop);
    queue->set_callback(thread_report, &callback_args);


    //啟動(dòng)事件監(jiān)聽(tīng)
    loop.event_process();

    return NULL;
}

? 每個(gè)線程都會(huì)綁定一個(gè)thread_queue<lars::ReportStatusRequest>,然后一個(gè)線程里面有一個(gè)loop,來(lái)監(jiān)控消息隊(duì)列是否有消息事件過(guò)來(lái),如果有消息實(shí)現(xiàn)過(guò)來(lái)孝常,針對(duì)每個(gè)消息會(huì)觸發(fā)thread_report()方法旗们, 在thread_report()中,我們就直接將lars::ReportStatusRequest消息存儲(chǔ)到db中构灸。

? 那么上渴,由誰(shuí)來(lái)給每個(gè)線程的thread_queue發(fā)送消息呢,就是agent/客戶端發(fā)送的請(qǐng)求,我們?cè)谔幚?code>lars::ID_ReportStatusRequest 消息分發(fā)業(yè)務(wù)的時(shí)候調(diào)用get_report_status()來(lái)觸發(fā)稠氮。

lars_reporter/src/reporter_service.cpp

void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    lars::ReportStatusRequest req;

    req.ParseFromArray(data, len);

    //將上報(bào)數(shù)據(jù)存儲(chǔ)到db 
    StoreReport sr;
    sr.store(req);

    //輪詢將消息平均發(fā)送到每個(gè)線程的消息隊(duì)列中
    static int index = 0;
    //將消息發(fā)送給某個(gè)線程消息隊(duì)列
    reportQueues[index]->send(req);
    index ++;
    index = index % thread_cnt;
}

? 這里的分發(fā)機(jī)制曹阔,是采用最輪詢的方式,是每個(gè)線程依次分配隔披,去調(diào)用thread_queuesend()方法赃份,將消息發(fā)送給消息隊(duì)列。

? 最后我們進(jìn)行測(cè)試奢米,效果跟之前的效果是一樣的抓韩。我們現(xiàn)在已經(jīng)集成進(jìn)來(lái)了存儲(chǔ)線程池,現(xiàn)在就不用擔(dān)心在處理業(yè)務(wù)的時(shí)候鬓长,因?yàn)镈B等的io阻塞谒拴,使cpu得不到充分利用了。


關(guān)于作者:

作者:Aceld(劉丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書(shū)籍gitbook: http://legacy.gitbook.com/@aceld

原創(chuàng)聲明:未經(jīng)作者允許請(qǐng)勿轉(zhuǎn)載, 如果轉(zhuǎn)載請(qǐng)注明出處

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末涉波,一起剝皮案震驚了整個(gè)濱河市英上,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌怠蹂,老刑警劉巖善延,帶你破解...
    沈念sama閱讀 218,036評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異城侧,居然都是意外死亡易遣,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門嫌佑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)豆茫,“玉大人,你說(shuō)我怎么就攤上這事屋摇】辏” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,411評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵炮温,是天一觀的道長(zhǎng)火脉。 經(jīng)常有香客問(wèn)我,道長(zhǎng)柒啤,這世上最難降的妖魔是什么倦挂? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,622評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮担巩,結(jié)果婚禮上方援,老公的妹妹穿的比我還像新娘。我一直安慰自己涛癌,他們只是感情好犯戏,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布送火。 她就那樣靜靜地躺著,像睡著了一般先匪。 火紅的嫁衣襯著肌膚如雪种吸。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,521評(píng)論 1 304
  • 那天胚鸯,我揣著相機(jī)與錄音骨稿,去河邊找鬼笨鸡。 笑死姜钳,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的形耗。 我是一名探鬼主播哥桥,決...
    沈念sama閱讀 40,288評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼激涤!你這毒婦竟也來(lái)了拟糕?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,200評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤倦踢,失蹤者是張志新(化名)和其女友劉穎送滞,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體辱挥,經(jīng)...
    沈念sama閱讀 45,644評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡犁嗅,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了晤碘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片褂微。...
    茶點(diǎn)故事閱讀 39,953評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖园爷,靈堂內(nèi)的尸體忽然破棺而出宠蚂,到底是詐尸還是另有隱情,我是刑警寧澤童社,帶...
    沈念sama閱讀 35,673評(píng)論 5 346
  • 正文 年R本政府宣布求厕,位于F島的核電站,受9級(jí)特大地震影響扰楼,放射性物質(zhì)發(fā)生泄漏呀癣。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評(píng)論 3 329
  • 文/蒙蒙 一灭抑、第九天 我趴在偏房一處隱蔽的房頂上張望十艾。 院中可真熱鬧,春花似錦腾节、人聲如沸忘嫉。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,889評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)庆冕。三九已至康吵,卻和暖如春央渣,著一層夾襖步出監(jiān)牢的瞬間入录,已是汗流浹背鞍时。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,011評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工父叙, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留懈涛,地道東北人晦毙。 一個(gè)月前我還...
    沈念sama閱讀 48,119評(píng)論 3 370
  • 正文 我出身青樓恋拷,卻偏偏與公主長(zhǎng)得像准给,于是被迫代替她去往敵國(guó)和親响巢。 傳聞我的和親對(duì)象是個(gè)殘疾皇子描滔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容