(24)Backend Thread實時監(jiān)控(Dns部分)-【Lars-基于C++負載均衡遠程服務(wù)器調(diào)度系統(tǒng)教程】

【Lars教程目錄】

Lars源代碼
https://github.com/aceld/Lars


【Lars系統(tǒng)概述】
第1章-概述
第2章-項目目錄構(gòu)建


【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機制
第8章-鏈接創(chuàng)建/銷毀Hook機制
第9章-消息任務(wù)隊列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機制
第15章-鏈接屬性設(shè)置功能


【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實時監(jiān)控


【Lars系統(tǒng)之Report Service模塊】
第1章-項目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報數(shù)據(jù)
第3章-存儲線程池及消息隊列


【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計與實現(xiàn)
第4章-負載均衡模塊基礎(chǔ)設(shè)計
第5章-負載均衡獲取Host主機信息API
第6章-負載均衡上報Host主機信息API
第7章-過期窗口清理與過載超時(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動工具腳本


Backend Thread的后臺總業(yè)務(wù)流程如下:
11-Lars-dns_BackendThread.png

7.1 數(shù)據(jù)庫表相關(guān)查詢方法實現(xiàn)

? 我們先實現(xiàn)一些基本的數(shù)據(jù)表達查詢方法:

lars_dns/src/dns_route.cpp

/*
 *  return 0, 表示 加載成功边锁,version沒有改變
 *         1, 表示 加載成功唯笙,version有改變
 *         -1 表示 加載失敗
 * */
int Route::load_version()
{
    //這里面只會有一條數(shù)據(jù)
    snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;");

    int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
    if (ret)
    {
        fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
        return -1;
    }

    MYSQL_RES *result = mysql_store_result(&_db_conn);
    if (!result)
    {
        fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
        return -1;
    }

    long line_num = mysql_num_rows(result);
    if (line_num == 0)
    {
        fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn));
        return -1;
    }

    MYSQL_ROW row = mysql_fetch_row(result);
    //得到version

    long new_version = atol(row[0]);

    if (new_version == this->_version)
    {
        //加載成功但是沒有修改
        return 0;
    }
    this->_version = new_version;
    printf("now route version is %ld\n", this->_version);

    mysql_free_result(result);

    return 1;
}


//加載RouteData到_temp_pointer
int Route::load_route_data() 
{
    _temp_pointer->clear();

    snprintf(_sql, 100, "SELECT * FROM RouteData;");

    int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
    if (ret)
    {
        fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
        return -1;
    }

    MYSQL_RES *result = mysql_store_result(&_db_conn);
    if (!result)
    {
        fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
        return -1;
    }

    long line_num = mysql_num_rows(result);
    MYSQL_ROW row;
    for (long i = 0;i < line_num; ++i)
    {
        row = mysql_fetch_row(result);
        int modid = atoi(row[1]);
        int cmdid = atoi(row[2]);
        unsigned ip = atoi(row[3]);
        int port = atoi(row[4]);

        uint64_t key = ((uint64_t)modid << 32) + cmdid;
        uint64_t value = ((uint64_t)ip << 32) + port;

        (*_temp_pointer)[key].insert(value);
    }
    printf("load data to tmep succ! size is %lu\n", _temp_pointer->size());

    mysql_free_result(result);

    return 0;
}


//將temp_pointer的數(shù)據(jù)更新到data_pointer
void Route::swap()
{
    pthread_rwlock_wrlock(&_map_lock);
    route_map *temp = _data_pointer;
    _data_pointer = _temp_pointer;
    _temp_pointer = temp;
    pthread_rwlock_unlock(&_map_lock);
}


//加載RouteChange得到修改的modid/cmdid
//將結(jié)果放在vector中
void Route::load_changes(std::vector<uint64_t> &change_list) 
{
    //讀取當前版本之前的全部修改 
    snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version);

    int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
    if (ret)
    {
        fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn));
        return ;
    }

    MYSQL_RES *result = mysql_store_result(&_db_conn);
    if (!result)
    {
        fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn));
        return ;
    }

    long lineNum = mysql_num_rows(result);
    if (lineNum == 0)
    {
        fprintf(stderr,  "No version in table ChangeLog: %s\n", mysql_error(&_db_conn));
        return ;
    }
    MYSQL_ROW row;
    for (long i = 0;i < lineNum; ++i)
    {
        row = mysql_fetch_row(result);
        int modid = atoi(row[0]);
        int cmdid = atoi(row[1]);
        uint64_t key = (((uint64_t)modid) << 32) + cmdid;
        change_list.push_back(key);
    }
    mysql_free_result(result);    
}




//將RouteChange
//刪除RouteChange的全部修改記錄數(shù)據(jù),remove_all為全部刪除
//否則默認刪除當前版本之前的全部修改
void Route::remove_changes(bool remove_all)
{
    if (remove_all == false)
    {
        snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version);
    }
    else
    {
        snprintf(_sql, 1000, "DELETE FROM RouteChange;");
    }
    int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
    if (ret != 0)
    {
        fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn));
        return ;
    } 

    return;
}

這里面提供了基本的對一些表的加載和刪除操作:

load_version():加載當前route信息版本號潘飘。

load_route_data():加載RouteData信息表杏慰,到_temp_pointer中。

swap():將__temp_pointer的表數(shù)據(jù)同步到_data_temp表中.

load_changes():加載RouteChange得到修改的modid/cmdid誉结,將結(jié)果放在vector中

remove_changes():清空之前的修改記錄鹅士。

7.2 Backend Thread業(yè)務(wù)流程實現(xiàn)

lars_dns/src/dns_route.cpp

//周期性后端檢查db的route信息的更新變化業(yè)務(wù)
//backend thread完成
void *check_route_changes(void *args)
{
    int wait_time = 10;//10s自動修改一次,也可以從配置文件讀取
    long last_load_time = time(NULL);

    //清空全部的RouteChange
    Route::instance()->remove_changes(true);

    //1 判斷是否有修改
    while (true) {
        sleep(1);
        long current_time = time(NULL);

        //1.1 加載RouteVersion得到當前版本號
        int ret = Route::instance()->load_version();
        if (ret == 1) {
            //version改版 有modid/cmdid修改
            //2 如果有修改

            //2.1 將最新的RouteData加載到_temp_pointer中
            if (Route::instance()->load_route_data() == 0) {
                //2.2 更新_temp_pointer數(shù)據(jù)到_data_pointer map中
                Route::instance()->swap();
                last_load_time = current_time;//更新最后加載時間
            }

            //2.3 獲取被修改的modid/cmdid對應(yīng)的訂閱客戶端,進行推送         
            std::vector<uint64_t> changes;
            Route::instance()->load_changes(changes);

            //推送
            SubscribeList::instance()->publish(changes);

            //2.4 刪除當前版本之前的修改記錄
            Route::instance()->remove_changes();

        }
        else {
            //3 如果沒有修改
            if (current_time - last_load_time >= wait_time) {
                //3.1 超時,加載最新的temp_pointer
                if (Route::instance()->load_route_data() == 0) {
                    //3.2 _temp_pointer數(shù)據(jù)更新到_data_pointer map中
                    Route::instance()->swap();
                    last_load_time = current_time;
                }
            }
        }
    }

    return NULL;
}

? 該實現(xiàn)與上面流程圖描述的過程一樣惩坑。那么check_route_changes()我們可以讓一個后臺線程進行承載掉盅。

lars_dns/src/dns_service.cpp

int main(int argc, char **argv)
{
    event_loop loop;

    //加載配置文件
    config_file::setPath("conf/lars_dns.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    short port = config_file::instance()->GetNumber("reactor", "port", 7778);


    //創(chuàng)建tcp服務(wù)器
    server = new tcp_server(&loop, ip.c_str(), port);

    //注冊鏈接創(chuàng)建/銷毀Hook函數(shù)
    server->set_conn_start(create_subscribe);
    server->set_conn_close(clear_subscribe);

    //注冊路由業(yè)務(wù)
    server->add_msg_router(lars::ID_GetRouteRequest, get_route);

        // =================================================
    //開辟backend thread 周期性檢查db數(shù)據(jù)庫route信息的更新狀態(tài)
    pthread_t tid;
    int ret = pthread_create(&tid, NULL, check_route_changes, NULL);
    if (ret == -1) {
        perror("pthread_create backendThread");
        exit(1);
    }
    //設(shè)置分離模式
    pthread_detach(tid);
    // =================================================

    //開始事件監(jiān)聽    
    printf("lars dns service ....\n");
    loop.event_process();

    return 0;
}

7.3 完成dns模塊的訂閱功能測試V0.3

? 我們提供一個修改一個modid/cmdid的sql語句來觸發(fā)訂閱條件,并且讓dns service服務(wù)器主動給訂閱的客戶端發(fā)送該訂閱消息以舒。

lars_dns/test/test_insert_dns_route.sql

USE lars_dns;

SET @time = UNIX_TIMESTAMP(NOW());

INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999);
UPDATE RouteVersion SET version = @time WHERE id = 1;

INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time);

客戶端代碼:

lars_dns/test/lars_dns_test1.cpp

#include <string.h>
#include <unistd.h>
#include <string>
#include "lars_reactor.h"
#include "lars.pb.h"

//命令行參數(shù)
struct Option
{
    Option():ip(NULL),port(0) {}

    char *ip;
    short port;
};


Option option;

void Usage() {
    printf("Usage: ./lars_dns_test -h ip -p port\n");
}

//解析命令行
void parse_option(int argc, char **argv)
{
    for (int i = 0; i < argc; i++) {
        if (strcmp(argv[i], "-h") == 0) {
            option.ip = argv[i + 1];
        }
        else if (strcmp(argv[i], "-p") == 0) {
            option.port = atoi(argv[i + 1]);
        }
    }

    if ( !option.ip || !option.port ) {
        Usage();
        exit(1);
    }

}

//typedef void (*conn_callback)(net_connection *conn, void *args);
void on_connection(net_connection *conn, void *args) 
{
    //發(fā)送Route信息請求
    lars::GetRouteRequest req;

    req.set_modid(1);
    req.set_cmdid(1);

    std::string requestString;

    req.SerializeToString(&requestString);
    conn->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);
}

void deal_get_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
{
    //解包得到數(shù)據(jù)
    lars::GetRouteResponse rsp;
    rsp.ParseFromArray(data, len);
    
    //打印數(shù)據(jù)
    printf("modid = %d\n", rsp.modid());
    printf("cmdid = %d\n", rsp.cmdid());
    printf("host_size = %d\n", rsp.host_size());

    for (int i = 0; i < rsp.host_size(); i++) {
        printf("-->ip = %u\n", rsp.host(i).ip());
        printf("-->port = %d\n", rsp.host(i).port());
    }
   
}


int main(int argc, char **argv)
{
    parse_option(argc, argv);

    event_loop loop;
    tcp_client *client;

    //創(chuàng)建客戶端
    client = new tcp_client(&loop, option.ip, option.port, "lars_dns_test");
    if (client == NULL) {
        fprintf(stderr, "client == NULL\n");
        exit(1);
    }

    //客戶端成功建立連接趾痘,首先發(fā)送請求包
    client->set_conn_start(on_connection);

    //設(shè)置服務(wù)端回應(yīng)包處理業(yè)務(wù)
    client->add_msg_router(lars::ID_GetRouteResponse, deal_get_route);

    loop.event_process(); 

    return 0;
}

啟動dns_server:

$ ./bin/lars_dns 
msg_router init...
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 1
lars dns service ....
now route version is 1571058286 modID = 1, cmdID = 1, ip = 3232235953, port = 9999

啟動客戶端:

$ ./lars_dns_test1 -h 127.0.0.1 -p 7778
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 2
connect 127.0.0.1:7778 succ!
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999

我們知道,第一請求modid/cmdid就會訂閱該Route模塊蔓钟。

然后我們通過外界修改modid=1,cmdid=1的模塊,新開一個終端永票,執(zhí)行test_insert_dns_route.sql

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> use lars_dns;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> \. test_insert_dns_route.sql
Database changed
Query OK, 0 rows affected (0.00 sec)

Query OK, 1 row affected (0.01 sec)

Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

Query OK, 1 row affected (0.02 sec)

mysql> 

然后我會會發(fā)現(xiàn)客戶端已經(jīng)得到一個新的消息,就是最新的route數(shù)據(jù)過來滥沫。是由dns_service主動推送過來的訂閱消息.

客戶端:

$ ./lars_dns_test1 -h 127.0.0.1 -p 7778
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 2
connect 127.0.0.1:7778 succ!
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999

? 這樣我們的訂閱功能就完成了侣集,整體的lars_dns模塊的工作到此的基本需求全部也已經(jīng)滿足。


關(guān)于作者:

作者:Aceld(劉丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld

原創(chuàng)聲明:未經(jīng)作者允許請勿轉(zhuǎn)載, 如果轉(zhuǎn)載請注明出處

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末兰绣,一起剝皮案震驚了整個濱河市世分,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌狭魂,老刑警劉巖罚攀,帶你破解...
    沈念sama閱讀 212,222評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異雌澄,居然都是意外死亡,警方通過查閱死者的電腦和手機杯瞻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,455評論 3 385
  • 文/潘曉璐 我一進店門镐牺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人魁莉,你說我怎么就攤上這事睬涧。” “怎么了旗唁?”我有些...
    開封第一講書人閱讀 157,720評論 0 348
  • 文/不壞的土叔 我叫張陵畦浓,是天一觀的道長。 經(jīng)常有香客問我检疫,道長讶请,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,568評論 1 284
  • 正文 為了忘掉前任屎媳,我火速辦了婚禮夺溢,結(jié)果婚禮上论巍,老公的妹妹穿的比我還像新娘。我一直安慰自己风响,他們只是感情好嘉汰,可當我...
    茶點故事閱讀 65,696評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著状勤,像睡著了一般鞋怀。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上持搜,一...
    開封第一講書人閱讀 49,879評論 1 290
  • 那天密似,我揣著相機與錄音,去河邊找鬼朵诫。 笑死辛友,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的剪返。 我是一名探鬼主播废累,決...
    沈念sama閱讀 39,028評論 3 409
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼脱盲!你這毒婦竟也來了邑滨?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,773評論 0 268
  • 序言:老撾萬榮一對情侶失蹤钱反,失蹤者是張志新(化名)和其女友劉穎掖看,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體面哥,經(jīng)...
    沈念sama閱讀 44,220評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡哎壳,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,550評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了尚卫。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片归榕。...
    茶點故事閱讀 38,697評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖吱涉,靈堂內(nèi)的尸體忽然破棺而出刹泄,到底是詐尸還是另有隱情,我是刑警寧澤怎爵,帶...
    沈念sama閱讀 34,360評論 4 332
  • 正文 年R本政府宣布特石,位于F島的核電站,受9級特大地震影響鳖链,放射性物質(zhì)發(fā)生泄漏姆蘸。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,002評論 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望乞旦。 院中可真熱鬧贼穆,春花似錦、人聲如沸兰粉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,782評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽玖姑。三九已至愕秫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間焰络,已是汗流浹背戴甩。 一陣腳步聲響...
    開封第一講書人閱讀 32,010評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留闪彼,地道東北人甜孤。 一個月前我還...
    沈念sama閱讀 46,433評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像畏腕,于是被迫代替她去往敵國和親缴川。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,587評論 2 350

推薦閱讀更多精彩內(nèi)容