【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫(xiě)功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測(cè)試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡(jiǎn)介
第2章-數(shù)據(jù)庫(kù)創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過(guò)期窗口清理與過(guò)載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測(cè)試工具
第12章- Lars啟動(dòng)工具腳本
5) 存儲(chǔ)線程池及消息隊(duì)列
? 我們現(xiàn)在的reporter_service的io入庫(kù)操作莉御,完全是在消息的callback中進(jìn)行的,那么實(shí)際上,這回占用我們server的工作線程的阻塞時(shí)間裤唠,從而浪費(fèi)cpu雏亚。所以我們應(yīng)該將io的入庫(kù)操作贬蛙,交給一個(gè)專門做入庫(kù)的消息隊(duì)列線程池來(lái)做锦亦,這樣我們的callback就會(huì)立刻返回該業(yè)務(wù)马绝,從而可以繼續(xù)處理下一個(gè)conn鏈接的消息事件業(yè)務(wù)择示。
? 所以我們就要在此給reporter_service設(shè)計(jì)一個(gè)存儲(chǔ)數(shù)據(jù)的線程池及配套的消息隊(duì)列妒牙。當(dāng)然這里面我們還是直接用寫(xiě)好的lars_reactor
框架里的接口即可。
lars_reporter/src/reporter_service.cpp
#include "lars_reactor.h"
#include "lars.pb.h"
#include "store_report.h"
#include <string>
thread_queue<lars::ReportStatusRequest> **reportQueues = NULL;
int thread_cnt = 0;
void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
lars::ReportStatusRequest req;
req.ParseFromArray(data, len);
//將上報(bào)數(shù)據(jù)存儲(chǔ)到db
StoreReport sr;
sr.store(req);
//輪詢將消息平均發(fā)送到每個(gè)線程的消息隊(duì)列中
static int index = 0;
//將消息發(fā)送給某個(gè)線程消息隊(duì)列
reportQueues[index]->send(req);
index ++;
index = index % thread_cnt;
}
void create_reportdb_threads()
{
thread_cnt = config_file::instance()->GetNumber("reporter", "db_thread_cnt", 3);
//開(kāi)線程池的消息隊(duì)列
reportQueues = new thread_queue<lars::ReportStatusRequest>*[thread_cnt];
if (reportQueues == NULL) {
fprintf(stderr, "create thread_queue<lars::ReportStatusRequest>*[%d], error", thread_cnt) ;
exit(1);
}
for (int i = 0; i < thread_cnt; i++) {
//給當(dāng)前線程創(chuàng)建一個(gè)消息隊(duì)列queue
reportQueues[i] = new thread_queue<lars::ReportStatusRequest>();
if (reportQueues == NULL) {
fprintf(stderr, "create thread_queue error\n");
exit(1);
}
pthread_t tid;
int ret = pthread_create(&tid, NULL, store_main, reportQueues[i]);
if (ret == -1) {
perror("pthread_create");
exit(1);
}
pthread_detach(tid);
}
}
int main(int argc, char **argv)
{
event_loop loop;
//加載配置文件
config_file::setPath("./conf/lars_reporter.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 7779);
//創(chuàng)建tcp server
tcp_server server(&loop, ip.c_str(), port);
//添加數(shù)據(jù)上報(bào)請(qǐng)求處理的消息分發(fā)處理業(yè)務(wù)
server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);
//為了防止在業(yè)務(wù)中出現(xiàn)io阻塞对妄,那么需要啟動(dòng)一個(gè)線程池對(duì)IO進(jìn)行操作的湘今,接受業(yè)務(wù)的請(qǐng)求存儲(chǔ)消息
create_reportdb_threads();
//啟動(dòng)事件監(jiān)聽(tīng)
loop.event_process();
return 0;
}
? 這里主線程啟動(dòng)了線程池,根據(jù)配置文件的db_thread_cnt
數(shù)量來(lái)開(kāi)辟剪菱。每個(gè)線程都會(huì)執(zhí)行store_main
方法摩瞎,我們來(lái)看一下實(shí)現(xiàn)
lars_reporter/src/store_thread.cpp
#include "lars.pb.h"
#include "lars_reactor.h"
#include "store_report.h"
struct Args
{
thread_queue<lars::ReportStatusRequest>* first;
StoreReport *second;
};
//typedef void io_callback(event_loop *loop, int fd, void *args);
void thread_report(event_loop *loop, int fd, void *args)
{
//1. 從queue里面取出需要report的數(shù)據(jù)(需要thread_queue)
thread_queue<lars::ReportStatusRequest>* queue = ((Args*)args)->first;
StoreReport *sr = ((Args*)args)->second;
std::queue<lars::ReportStatusRequest> report_msgs;
//1.1 從消息隊(duì)列中取出全部的消息元素集合
queue->recv(report_msgs);
while ( !report_msgs.empty() ) {
lars::ReportStatusRequest msg = report_msgs.front();
report_msgs.pop();
//2. 將數(shù)據(jù)存儲(chǔ)到DB中(需要StoreReport)
sr->store(msg);
}
}
void *store_main(void *args)
{
//得到對(duì)應(yīng)的thread_queue
thread_queue<lars::ReportStatusRequest> *queue = (thread_queue<lars::ReportStatusRequest>*)args;
//定義事件觸發(fā)機(jī)制
event_loop loop;
//定義一個(gè)存儲(chǔ)對(duì)象
StoreReport sr;
Args callback_args;
callback_args.first = queue;
callback_args.second = &sr;
queue->set_loop(&loop);
queue->set_callback(thread_report, &callback_args);
//啟動(dòng)事件監(jiān)聽(tīng)
loop.event_process();
return NULL;
}
? 每個(gè)線程都會(huì)綁定一個(gè)thread_queue<lars::ReportStatusRequest>
,然后一個(gè)線程里面有一個(gè)loop,來(lái)監(jiān)控消息隊(duì)列是否有消息事件過(guò)來(lái),如果有消息實(shí)現(xiàn)過(guò)來(lái)孝常,針對(duì)每個(gè)消息會(huì)觸發(fā)thread_report()
方法旗们, 在thread_report()
中,我們就直接將lars::ReportStatusRequest
消息存儲(chǔ)到db中构灸。
? 那么上渴,由誰(shuí)來(lái)給每個(gè)線程的thread_queue
發(fā)送消息呢,就是agent/客戶端發(fā)送的請(qǐng)求,我們?cè)谔幚?code>lars::ID_ReportStatusRequest 消息分發(fā)業(yè)務(wù)的時(shí)候調(diào)用get_report_status()
來(lái)觸發(fā)稠氮。
lars_reporter/src/reporter_service.cpp
void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
lars::ReportStatusRequest req;
req.ParseFromArray(data, len);
//將上報(bào)數(shù)據(jù)存儲(chǔ)到db
StoreReport sr;
sr.store(req);
//輪詢將消息平均發(fā)送到每個(gè)線程的消息隊(duì)列中
static int index = 0;
//將消息發(fā)送給某個(gè)線程消息隊(duì)列
reportQueues[index]->send(req);
index ++;
index = index % thread_cnt;
}
? 這里的分發(fā)機(jī)制曹阔,是采用最輪詢的方式,是每個(gè)線程依次分配隔披,去調(diào)用thread_queue
的send()
方法赃份,將消息發(fā)送給消息隊(duì)列。
? 最后我們進(jìn)行測(cè)試奢米,效果跟之前的效果是一樣的抓韩。我們現(xiàn)在已經(jīng)集成進(jìn)來(lái)了存儲(chǔ)線程池,現(xiàn)在就不用擔(dān)心在處理業(yè)務(wù)的時(shí)候鬓长,因?yàn)镈B等的io阻塞谒拴,使cpu得不到充分利用了。
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書(shū)籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請(qǐng)勿轉(zhuǎn)載, 如果轉(zhuǎn)載請(qǐng)注明出處