【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機制
第8章-鏈接創(chuàng)建/銷毀Hook機制
第9章-消息任務(wù)隊列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實時監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報數(shù)據(jù)
第3章-存儲線程池及消息隊列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計與實現(xiàn)
第4章-負載均衡模塊基礎(chǔ)設(shè)計
第5章-負載均衡獲取Host主機信息API
第6章-負載均衡上報Host主機信息API
第7章-過期窗口清理與過載超時(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動工具腳本
Backend Thread的后臺總業(yè)務(wù)流程如下:
7.1 數(shù)據(jù)庫表相關(guān)查詢方法實現(xiàn)
? 我們先實現(xiàn)一些基本的數(shù)據(jù)表達查詢方法:
lars_dns/src/dns_route.cpp
/*
* return 0, 表示 加載成功边锁,version沒有改變
* 1, 表示 加載成功唯笙,version有改變
* -1 表示 加載失敗
* */
int Route::load_version()
{
//這里面只會有一條數(shù)據(jù)
snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;");
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret)
{
fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
return -1;
}
MYSQL_RES *result = mysql_store_result(&_db_conn);
if (!result)
{
fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
return -1;
}
long line_num = mysql_num_rows(result);
if (line_num == 0)
{
fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn));
return -1;
}
MYSQL_ROW row = mysql_fetch_row(result);
//得到version
long new_version = atol(row[0]);
if (new_version == this->_version)
{
//加載成功但是沒有修改
return 0;
}
this->_version = new_version;
printf("now route version is %ld\n", this->_version);
mysql_free_result(result);
return 1;
}
//加載RouteData到_temp_pointer
int Route::load_route_data()
{
_temp_pointer->clear();
snprintf(_sql, 100, "SELECT * FROM RouteData;");
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret)
{
fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
return -1;
}
MYSQL_RES *result = mysql_store_result(&_db_conn);
if (!result)
{
fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
return -1;
}
long line_num = mysql_num_rows(result);
MYSQL_ROW row;
for (long i = 0;i < line_num; ++i)
{
row = mysql_fetch_row(result);
int modid = atoi(row[1]);
int cmdid = atoi(row[2]);
unsigned ip = atoi(row[3]);
int port = atoi(row[4]);
uint64_t key = ((uint64_t)modid << 32) + cmdid;
uint64_t value = ((uint64_t)ip << 32) + port;
(*_temp_pointer)[key].insert(value);
}
printf("load data to tmep succ! size is %lu\n", _temp_pointer->size());
mysql_free_result(result);
return 0;
}
//將temp_pointer的數(shù)據(jù)更新到data_pointer
void Route::swap()
{
pthread_rwlock_wrlock(&_map_lock);
route_map *temp = _data_pointer;
_data_pointer = _temp_pointer;
_temp_pointer = temp;
pthread_rwlock_unlock(&_map_lock);
}
//加載RouteChange得到修改的modid/cmdid
//將結(jié)果放在vector中
void Route::load_changes(std::vector<uint64_t> &change_list)
{
//讀取當前版本之前的全部修改
snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version);
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret)
{
fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn));
return ;
}
MYSQL_RES *result = mysql_store_result(&_db_conn);
if (!result)
{
fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn));
return ;
}
long lineNum = mysql_num_rows(result);
if (lineNum == 0)
{
fprintf(stderr, "No version in table ChangeLog: %s\n", mysql_error(&_db_conn));
return ;
}
MYSQL_ROW row;
for (long i = 0;i < lineNum; ++i)
{
row = mysql_fetch_row(result);
int modid = atoi(row[0]);
int cmdid = atoi(row[1]);
uint64_t key = (((uint64_t)modid) << 32) + cmdid;
change_list.push_back(key);
}
mysql_free_result(result);
}
//將RouteChange
//刪除RouteChange的全部修改記錄數(shù)據(jù),remove_all為全部刪除
//否則默認刪除當前版本之前的全部修改
void Route::remove_changes(bool remove_all)
{
if (remove_all == false)
{
snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version);
}
else
{
snprintf(_sql, 1000, "DELETE FROM RouteChange;");
}
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret != 0)
{
fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn));
return ;
}
return;
}
這里面提供了基本的對一些表的加載和刪除操作:
load_version()
:加載當前route信息版本號潘飘。
load_route_data()
:加載RouteData
信息表杏慰,到_temp_pointer中。
swap()
:將__temp_pointer的表數(shù)據(jù)同步到_data_temp表中.
load_changes()
:加載RouteChange得到修改的modid/cmdid誉结,將結(jié)果放在vector中
remove_changes()
:清空之前的修改記錄鹅士。
7.2 Backend Thread業(yè)務(wù)流程實現(xiàn)
lars_dns/src/dns_route.cpp
//周期性后端檢查db的route信息的更新變化業(yè)務(wù)
//backend thread完成
void *check_route_changes(void *args)
{
int wait_time = 10;//10s自動修改一次,也可以從配置文件讀取
long last_load_time = time(NULL);
//清空全部的RouteChange
Route::instance()->remove_changes(true);
//1 判斷是否有修改
while (true) {
sleep(1);
long current_time = time(NULL);
//1.1 加載RouteVersion得到當前版本號
int ret = Route::instance()->load_version();
if (ret == 1) {
//version改版 有modid/cmdid修改
//2 如果有修改
//2.1 將最新的RouteData加載到_temp_pointer中
if (Route::instance()->load_route_data() == 0) {
//2.2 更新_temp_pointer數(shù)據(jù)到_data_pointer map中
Route::instance()->swap();
last_load_time = current_time;//更新最后加載時間
}
//2.3 獲取被修改的modid/cmdid對應(yīng)的訂閱客戶端,進行推送
std::vector<uint64_t> changes;
Route::instance()->load_changes(changes);
//推送
SubscribeList::instance()->publish(changes);
//2.4 刪除當前版本之前的修改記錄
Route::instance()->remove_changes();
}
else {
//3 如果沒有修改
if (current_time - last_load_time >= wait_time) {
//3.1 超時,加載最新的temp_pointer
if (Route::instance()->load_route_data() == 0) {
//3.2 _temp_pointer數(shù)據(jù)更新到_data_pointer map中
Route::instance()->swap();
last_load_time = current_time;
}
}
}
}
return NULL;
}
? 該實現(xiàn)與上面流程圖描述的過程一樣惩坑。那么check_route_changes()
我們可以讓一個后臺線程進行承載掉盅。
lars_dns/src/dns_service.cpp
int main(int argc, char **argv)
{
event_loop loop;
//加載配置文件
config_file::setPath("conf/lars_dns.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 7778);
//創(chuàng)建tcp服務(wù)器
server = new tcp_server(&loop, ip.c_str(), port);
//注冊鏈接創(chuàng)建/銷毀Hook函數(shù)
server->set_conn_start(create_subscribe);
server->set_conn_close(clear_subscribe);
//注冊路由業(yè)務(wù)
server->add_msg_router(lars::ID_GetRouteRequest, get_route);
// =================================================
//開辟backend thread 周期性檢查db數(shù)據(jù)庫route信息的更新狀態(tài)
pthread_t tid;
int ret = pthread_create(&tid, NULL, check_route_changes, NULL);
if (ret == -1) {
perror("pthread_create backendThread");
exit(1);
}
//設(shè)置分離模式
pthread_detach(tid);
// =================================================
//開始事件監(jiān)聽
printf("lars dns service ....\n");
loop.event_process();
return 0;
}
7.3 完成dns模塊的訂閱功能測試V0.3
? 我們提供一個修改一個modid/cmdid的sql語句來觸發(fā)訂閱條件,并且讓dns service服務(wù)器主動給訂閱的客戶端發(fā)送該訂閱消息以舒。
lars_dns/test/test_insert_dns_route.sql
USE lars_dns;
SET @time = UNIX_TIMESTAMP(NOW());
INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999);
UPDATE RouteVersion SET version = @time WHERE id = 1;
INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time);
客戶端代碼:
lars_dns/test/lars_dns_test1.cpp
#include <string.h>
#include <unistd.h>
#include <string>
#include "lars_reactor.h"
#include "lars.pb.h"
//命令行參數(shù)
struct Option
{
Option():ip(NULL),port(0) {}
char *ip;
short port;
};
Option option;
void Usage() {
printf("Usage: ./lars_dns_test -h ip -p port\n");
}
//解析命令行
void parse_option(int argc, char **argv)
{
for (int i = 0; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0) {
option.ip = argv[i + 1];
}
else if (strcmp(argv[i], "-p") == 0) {
option.port = atoi(argv[i + 1]);
}
}
if ( !option.ip || !option.port ) {
Usage();
exit(1);
}
}
//typedef void (*conn_callback)(net_connection *conn, void *args);
void on_connection(net_connection *conn, void *args)
{
//發(fā)送Route信息請求
lars::GetRouteRequest req;
req.set_modid(1);
req.set_cmdid(1);
std::string requestString;
req.SerializeToString(&requestString);
conn->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);
}
void deal_get_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
{
//解包得到數(shù)據(jù)
lars::GetRouteResponse rsp;
rsp.ParseFromArray(data, len);
//打印數(shù)據(jù)
printf("modid = %d\n", rsp.modid());
printf("cmdid = %d\n", rsp.cmdid());
printf("host_size = %d\n", rsp.host_size());
for (int i = 0; i < rsp.host_size(); i++) {
printf("-->ip = %u\n", rsp.host(i).ip());
printf("-->port = %d\n", rsp.host(i).port());
}
}
int main(int argc, char **argv)
{
parse_option(argc, argv);
event_loop loop;
tcp_client *client;
//創(chuàng)建客戶端
client = new tcp_client(&loop, option.ip, option.port, "lars_dns_test");
if (client == NULL) {
fprintf(stderr, "client == NULL\n");
exit(1);
}
//客戶端成功建立連接趾痘,首先發(fā)送請求包
client->set_conn_start(on_connection);
//設(shè)置服務(wù)端回應(yīng)包處理業(yè)務(wù)
client->add_msg_router(lars::ID_GetRouteResponse, deal_get_route);
loop.event_process();
return 0;
}
啟動dns_server:
$ ./bin/lars_dns
msg_router init...
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 1
lars dns service ....
now route version is 1571058286 modID = 1, cmdID = 1, ip = 3232235953, port = 9999
啟動客戶端:
$ ./lars_dns_test1 -h 127.0.0.1 -p 7778
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 2
connect 127.0.0.1:7778 succ!
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999
我們知道,第一請求modid/cmdid就會訂閱該Route模塊蔓钟。
然后我們通過外界修改modid=1,cmdid=1的模塊,新開一個終端永票,執(zhí)行test_insert_dns_route.sql
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> use lars_dns;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> \. test_insert_dns_route.sql
Database changed
Query OK, 0 rows affected (0.00 sec)
Query OK, 1 row affected (0.01 sec)
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Query OK, 1 row affected (0.02 sec)
mysql>
然后我會會發(fā)現(xiàn)客戶端已經(jīng)得到一個新的消息,就是最新的route數(shù)據(jù)過來滥沫。是由dns_service主動推送過來的訂閱消息.
客戶端:
$ ./lars_dns_test1 -h 127.0.0.1 -p 7778
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 2
connect 127.0.0.1:7778 succ!
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999
? 這樣我們的訂閱功能就完成了侣集,整體的lars_dns模塊的工作到此的基本需求全部也已經(jīng)滿足。
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請勿轉(zhuǎn)載, 如果轉(zhuǎn)載請注明出處