【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機制
第8章-鏈接創(chuàng)建/銷毀Hook機制
第9章-消息任務(wù)隊列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實時監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報數(shù)據(jù)
第3章-存儲線程池及消息隊列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計與實現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計
第5章-負(fù)載均衡獲取Host主機信息API
第6章-負(fù)載均衡上報Host主機信息API
第7章-過期窗口清理與過載超時(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動工具腳本
7) 負(fù)載均衡上報Host主機信息API(V0.4)
7.1 proto通信協(xié)議定義
syntax = "proto3";
package lars;
/* Lars系統(tǒng)的消息ID */
enum MessageId {
ID_UNKNOW = 0; //proto3 enum第一個屬性必須是0,用來占位
ID_GetRouteRequest = 1; //向DNS請求Route對應(yīng)的關(guān)系的消息ID
ID_GetRouteResponse = 2; //DNS回復(fù)的Route信息的消息ID
ID_ReportStatusRequest = 3; //上報host調(diào)用狀態(tài)信息請求消息ID
ID_GetHostRequest = 4; //API 發(fā)送請求host信息給 Lb Agent模塊 消息ID
ID_GetHostResponse = 5; //agent 回執(zhí)給 API host信息的 消息ID
ID_ReportRequest = 6; //API report get_host的調(diào)用結(jié)果給agent的 消息ID
}
//...
//...
// API 上報調(diào)用結(jié)果給 Agent(UDP)
message ReportRequest {
int32 modid = 1;
int32 cmdid = 2;
HostInfo host = 3;
int32 retcode = 4;
}
ID_ReportRequest
和 message ReportRequest
是針對API層與agent的請求互通協(xié)議鳄袍。
7.2 Lars-API:Reporter()方法客戶端實現(xiàn)
Lars/api/cpp/lars_api/lars_api.h
#pragma once
#include "lars_reactor.h"
#include <string>
class lars_client
{
public:
lars_client();
~lars_client();
//lars 系統(tǒng)獲取host信息 得到可用host的ip和port
int get_host(int modid, int cmdid, std::string& ip, int &port);
//lars 系統(tǒng)上報host調(diào)用信息
void report(int modid, int cmdid, const std::string &ip, int port, int retcode);
private:
int _sockfd[3]; //3個udp socket fd 對應(yīng)agent 3個udp server
uint32_t _seqid; //消息的序列號
};
? 新增report()
方法绢要。
Lars/api/cpp/lars_api/lars_api.cpp
//lars 系統(tǒng)上報host調(diào)用信息
void lars_client::report(int modid, int cmdid, const std::string &ip, int port, int retcode)
{
//1 封裝上報消息
lars::ReportRequest req;
req.set_modid(modid);
req.set_cmdid(cmdid);
req.set_retcode(retcode);
//1.1 host信息
lars::HostInfo *hp = req.mutable_host();
//ip
struct in_addr inaddr;
inet_aton(ip.c_str(), &inaddr);
int ip_num = inaddr.s_addr;
hp->set_ip(ip_num);
//port
hp->set_port(port);
//2. send
char write_buf[4096];
//消息頭
msg_head head;
head.msglen = req.ByteSizeLong();
head.msgid = lars::ID_ReportRequest;
memcpy(write_buf, &head, MESSAGE_HEAD_LEN);
req.SerializeToArray(write_buf + MESSAGE_HEAD_LEN, head.msglen);
int index = (modid+cmdid)%3;
int ret = sendto(_sockfd[index], write_buf, head.msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
if (ret == -1) {
perror("sendto");
}
}
Lars/api/cpp/example/example.cpp
int ret = api.get_host(modid, cmdid, ip, port);
if (ret == 0) {
std::cout << "host is " << ip << ":" << port << std::endl;
//上報調(diào)用結(jié)果
api.report(modid, cmdid, ip, port, 0);
}
? 在example的業(yè)務(wù)應(yīng)用中,加上調(diào)用上報api拗小。在每次調(diào)用完get_host
重罪。
7.3 report業(yè)務(wù)添加的配置參數(shù)信息
Lars/lars_loadbalance_agent/conf/lars_lb_agent.conf
[reporter]
ip = 127.0.0.1
port = 7779
[dnsserver]
ip = 127.0.0.1
port = 7778
[loadbalance]
;經(jīng)過若干次獲取請求host節(jié)點后,試探選擇一次overload過載節(jié)點
probe_num=10
;初始化host_info主機信息訪問成功的個數(shù),防止剛啟動時少量失敗就認(rèn)為過載
init_succ_cnt=180
;當(dāng)idle節(jié)點失敗率高于此值蛆封,節(jié)點變overload狀態(tài)
err_rate=0.1
;當(dāng)overload節(jié)點成功率高于此值,節(jié)點變成idle狀態(tài)
succ_rate=0.95
;當(dāng)idle節(jié)點連續(xù)失敗次數(shù)超過此值勾栗,節(jié)點變成overload狀態(tài)
contin_err_limit=15
;當(dāng)overload節(jié)點連續(xù)成功次數(shù)超過此值, 節(jié)點變成idle狀態(tài)
contin_succ_limit=15
配置文件里在[loadbalance]
中新增了一些字段惨篱。
那么我們需要在啟動lb_agent的時候,加載這些配置文件參數(shù).
lars_loadbalance_agent/include/main_server.h
#pragma once
#include "lars_reactor.h"
#include "lars.pb.h"
#include "route_lb.h"
struct load_balance_config
{
//經(jīng)過若干次獲取請求host節(jié)點后围俘,試探選擇一次overload過載節(jié)點
int probe_num;
//初始化host_info主機信息訪問成功的個數(shù)砸讳,防止剛啟動時少量失敗就認(rèn)為過載
int init_succ_cnt;
//**************************************************
//當(dāng)idle節(jié)點失敗率高于此值,節(jié)點變overload狀態(tài)
float err_rate;
//當(dāng)overload節(jié)點成功率高于此值界牡,節(jié)點變成idle狀態(tài)
float succ_rate;
//當(dāng)idle節(jié)點連續(xù)失敗次數(shù)超過此值簿寂,節(jié)點變成overload狀態(tài)
int contin_err_limit;
//當(dāng)overload節(jié)點連續(xù)成功次數(shù)超過此值, 節(jié)點變成idle狀態(tài)
int contin_succ_limit;
//當(dāng)前agent本地ip地址(用于上報 填充caller字段)
uint32_t local_ip;
//**************************************************
};
lars_loadbalance_agent/src/main_server.cpp
#include "main_server.h"
#include "lars.pb.h"
#include <netdb.h>
// ...
//--------- 全局資源 ----------
static void init_lb_agent()
{
//1. 加載配置文件
config_file::setPath("./conf/lars_lb_agent.conf");
lb_config.probe_num = config_file::instance()->GetNumber("loadbalance", "probe_num", 10);
lb_config.init_succ_cnt = config_file::instance()->GetNumber("loadbalance", "init_succ_cnt", 180);
lb_config.err_rate = config_file::instance()->GetFloat("loadbalance", "err_rate", 0.1);
lb_config.succ_rate = config_file::instance()->GetFloat("loadbalance", "succ_rate", 0.92);
lb_config.contin_succ_limit = config_file::instance()->GetNumber("loadbalance", "contin_succ_limit", 10);
lb_config.contin_err_limit = config_file::instance()->GetNumber("loadbalance", "contin_err_limit", 10);
//2. 初始化3個route_lb模塊
create_route_lb();
//3. 加載本地ip
char my_host_name[1024];
if (gethostname(my_host_name, 1024) == 0) {
struct hostent *hd = gethostbyname(my_host_name);
if (hd)
{
struct sockaddr_in myaddr;
myaddr.sin_addr = *(struct in_addr*)hd->h_addr;
lb_config.local_ip = ntohl(myaddr.sin_addr.s_addr);
}
}
if (!lb_config.local_ip) {
struct in_addr inaddr;
inet_aton("127.0.0.1", &inaddr);
lb_config.local_ip = ntohl(inaddr.s_addr);
}
}
// ...
這里的本地ip,是之后在上報的時候宿亡,發(fā)送消息需要一個caller參數(shù)常遂,這個caller參數(shù)我們就暫時默認(rèn)是當(dāng)前agent的ip為caller。
7.4 Agent UDP Server處理API-Report請求
? 接下來我們針對API發(fā)送的report的ID_ReportRequest
進行處理.
lars_loadbalance_agent/src/agent_udp_server.cpp
#include "lars_reactor.h"
#include "main_server.h"
// ...
static void report_cb(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
{
lars::ReportRequest req;
req.ParseFromArray(data, len);
route_lb *ptr_route_lb = (route_lb*)user_data;
ptr_route_lb->report_host(req);
}
void * agent_server_main(void * args)
{
long index = (long)args;
short port = index + 8888;
event_loop loop;
udp_server server(&loop, "0.0.0.0", port);
//給server注冊消息分發(fā)路由業(yè)務(wù), 針對ID_GetHostRequest處理 每個udp擁有一個對應(yīng)的route_lb
server.add_msg_router(lars::ID_GetHostRequest, get_host_cb, r_lb[port-8888]);
//======================================================
//給server注冊消息分發(fā)路由業(yè)務(wù)挽荠,針對ID_ReportRequest處理
server.add_msg_router(lars::ID_ReportRequest, report_cb, r_lb[port-8888]);
//======================================================
printf("agent UDP server :port %d is started...\n", port);
loop.event_process();
return NULL;
}
void start_UDP_servers(void)
{
for (long i = 0; i < 3; i ++) {
pthread_t tid;
int ret = pthread_create(&tid, NULL, agent_server_main, (void*)i);
if (ret == -1) {
perror("pthread_create");
exit(1);
}
pthread_detach(tid);
}
}
這里主要是通過一個udp server中的route_lb對象來調(diào)用的report_host(req)
方法克胳。我們來實現(xiàn)這個方法。
lars_loadbalance_agent/src/route_lb.cpp
//agent 上報某主機的獲取結(jié)果
void route_lb::report_host(lars::ReportRequest req)
{
int modid = req.modid();
int cmdid = req.cmdid();
int retcode = req.retcode();
int ip = req.host().ip();
int port = req.host().port();
uint64_t key = ((uint64_t)modid << 32) + cmdid;
pthread_mutex_lock(&_mutex);
if (_route_lb_map.find(key) != _route_lb_map.end()) {
load_balance *lb = _route_lb_map[key];
lb->report(ip, port, retcode);
//上報信息給遠(yuǎn)程reporter服務(wù)器
lb->commit();
}
pthread_mutex_unlock(&_mutex);
}
當(dāng)然圈匆,route_lb最終還是分管每個modid/cmdid對應(yīng)的load_balance模塊,那么選擇一個可用的load_balance對象漠另,調(diào)用load_balance
的report()
方法.而通過commit()
方法,將report的上報結(jié)果提交到遠(yuǎn)程的report service
中去跃赚。
接下來我們看一下load_balance
的report方法實現(xiàn).
lars_loadbalance_agent/src/load_balance.cpp
//上報當(dāng)前host主機調(diào)用情況給遠(yuǎn)端repoter service
void load_balance::report(int ip, int port, int retcode)
{
uint64_t key = ((uint64_t)ip << 32) + port;
if (_host_map.find(key) == _host_map.end()) {
return;
}
//1 計數(shù)統(tǒng)計
host_info *hi = _host_map[key];
if (retcode == lars::RET_SUCC) { // retcode == 0
//更新虛擬成功笆搓、真實成功次數(shù)
hi->vsucc ++;
hi->rsucc ++;
//連續(xù)成功增加
hi->contin_succ ++;
//連續(xù)失敗次數(shù)歸零
hi->contin_err = 0;
}
else {
//更新虛擬失敗、真實失敗次數(shù)
hi->verr ++;
hi->rerr ++;
//連續(xù)失敗個數(shù)增加
hi->contin_err++;
//連續(xù)成功次數(shù)歸零
hi->contin_succ = 0;
}
//2.檢查節(jié)點狀態(tài)
//檢查idle節(jié)點是否滿足overload條件
//或者overload節(jié)點是否滿足idle條件
//--> 如果是dile節(jié)點,則只有調(diào)用失敗才有必要判斷是否達到overload條件
if (hi->overload == false && retcode != lars::RET_SUCC) {
bool overload = false;
//idle節(jié)點纬傲,檢查是否達到判定為overload的狀態(tài)條件
//(1).計算失敗率,如果大于預(yù)設(shè)值失敗率满败,則為overload
double err_rate = hi->verr * 1.0 / (hi->vsucc + hi->verr);
if (err_rate > lb_config.err_rate) {
overload = true;
}
//(2).連續(xù)失敗次數(shù)達到閾值,判定為overload
if( overload == false && hi->contin_err >= (uint32_t)lb_config.contin_err_limit) {
overload = true;
}
//判定overload需要做的更改流程
if (overload) {
struct in_addr saddr;
saddr.s_addr = htonl(hi->ip);
printf("[%d, %d] host %s:%d change overload, succ %u err %u\n",
_modid, _cmdid, inet_ntoa(saddr), hi->port, hi->vsucc, hi->verr);
//設(shè)置hi為overload狀態(tài)
hi->set_overload();
//移出_idle_list,放入_overload_list
_idle_list.remove(hi);
_overload_list.push_back(hi);
return;
}
}
//--> 如果是overload節(jié)點叹括,則只有調(diào)用成功才有必要判斷是否達到idle條件
else if (hi->overload == true && retcode == lars::RET_SUCC) {
bool idle = false;
//overload節(jié)點葫录,檢查是否達到回到idle狀態(tài)的條件
//(1).計算成功率,如果大于預(yù)設(shè)值的成功率领猾,則為idle
double succ_rate = hi->vsucc * 1.0 / (hi->vsucc + hi->verr);
if (succ_rate > lb_config.succ_rate) {
idle = true;
}
//(2).連續(xù)成功次數(shù)達到閾值米同,判定為idle
if (idle == false && hi->contin_succ >= (uint32_t)lb_config.contin_succ_limit) {
idle = true;
}
//判定為idle需要做的更改流程
if (idle) {
struct in_addr saddr;
saddr.s_addr = htonl(hi->ip);
printf("[%d, %d] host %s:%d change idle, succ %u err %u\n",
_modid, _cmdid, inet_ntoa(saddr), hi->port, hi->vsucc, hi->verr);
//設(shè)置為idle狀態(tài)
hi->set_idle();
//移出overload_list, 放入_idle_list
_overload_list.remove(hi);
_idle_list.push_back(hi);
return;
}
}
//TODO 窗口檢查和超時機制
}
其中set_idle()
與set_overload()
方法實現(xiàn)如下:
lars_loadbalance_agent/src/host_info.cpp
#include "host_info.h"
#include "main_server.h"
void host_info::set_idle()
{
vsucc = lb_config.init_succ_cnt;
verr = 0;
rsucc = 0;
rerr = 0;
contin_succ = 0;
contin_err = 0;
overload = false;
}
void host_info::set_overload()
{
vsucc = 0;
verr = lb_config.init_err_cnt;//overload的初試虛擬err錯誤次數(shù)
rsucc = 0;
rerr = 0;
contin_err = 0;
contin_succ = 0;
overload = true;
}
load_balance
的report()
方法實現(xiàn)主要是針對兩個鏈表做節(jié)點的交替處理。和成功率失敗率的判斷摔竿。
節(jié)點失敗率 = 節(jié)點
verr
/ (vsucc
+verr
)
節(jié)點成功率 = 節(jié)點vsucc
/ (vsucc
+verr
)
當(dāng)idle節(jié)點的失敗率>預(yù)設(shè)值(默認(rèn)10%)面粮,將節(jié)點判定為overload;
當(dāng)overload節(jié)點的成功率>預(yù)設(shè)值(默認(rèn)95%)继低,將節(jié)點判定為idle熬苍;
而不可以idle/overload節(jié)點都只關(guān)注成功率or都只關(guān)注失敗率,這樣可能造成節(jié)點在idle/overload狀態(tài)間頻繁切換
為idle節(jié)點、overload節(jié)點設(shè)置不同的閾值可以區(qū)別對待柴底。
? 接下來我們來實現(xiàn)load_balance
的commit()
方法婿脸。
lars_loadbalance_agent/src/load_balance.cpp
//提交host的調(diào)用結(jié)果給遠(yuǎn)程reporter service上報結(jié)果
void load_balance::commit()
{
if (this->empty() == true) {
return;
}
//1. 封裝請求消息
lars::ReportStatusRequest req;
req.set_modid(_modid);
req.set_cmdid(_cmdid);
req.set_ts(time(NULL));
req.set_caller(lb_config.local_ip);
//2. 從idle_list取值
for (host_list_it it = _idle_list.begin(); it != _idle_list.end(); it++) {
host_info *hi = *it;
lars::HostCallResult call_res;
call_res.set_ip(hi->ip);
call_res.set_port(hi->port);
call_res.set_succ(hi->rsucc);
call_res.set_err(hi->rerr);
call_res.set_overload(false);
req.add_results()->CopyFrom(call_res);
}
//3. 從over_list取值
for (host_list_it it = _overload_list.begin(); it != _overload_list.end(); it++) {
host_info *hi = *it;
lars::HostCallResult call_res;
call_res.set_ip(hi->ip);
call_res.set_port(hi->port);
call_res.set_succ(hi->rsucc);
call_res.set_err(hi->rerr);
call_res.set_overload(true);
req.add_results()->CopyFrom(call_res);
}
//4 發(fā)送給report_client 的消息隊列
report_queue->send(req);
}
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請勿轉(zhuǎn)載, 如果轉(zhuǎn)載請注明出處