【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機制
第8章-鏈接創(chuàng)建/銷毀Hook機制
第9章-消息任務(wù)隊列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測試
第14章-異步消息任務(wù)機制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實時監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報數(shù)據(jù)
第3章-存儲線程池及消息隊列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計與實現(xiàn)
第4章-負載均衡模塊基礎(chǔ)設(shè)計
第5章-負載均衡獲取Host主機信息API
第6章-負載均衡上報Host主機信息API
第7章-過期窗口清理與過載超時(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測試工具
第12章- Lars啟動工具腳本
10) 負載均衡獲取Route信息API(0.7)
10.1 proto通信協(xié)議定義
base/proto/lars.proto
/* Lars系統(tǒng)的消息ID */
enum MessageId {
ID_UNKNOW = 0; //proto3 enum第一個屬性必須是0肺孵,用來占位
ID_GetRouteRequest = 1; //向DNS請求Route對應(yīng)的關(guān)系的消息ID
ID_GetRouteResponse = 2; //DNS回復(fù)的Route信息的消息ID
ID_ReportStatusRequest = 3; //上報host調(diào)用狀態(tài)信息請求消息ID
ID_GetHostRequest = 4; //API 發(fā)送請求host信息給 Lb Agent模塊 消息ID
ID_GetHostResponse = 5; //agent 回執(zhí)給 API host信息的 消息ID
ID_ReportRequest = 6; //API report get_host的調(diào)用結(jié)果給agent的 消息ID
// =======================================================
ID_API_GetRouteRequest = 7; //API 請求agent某個modid/cmdid的全部hosts信息的route 消息ID
ID_API_GetRouteResponse = 8; //agent 回執(zhí)給 API的全部hosts的route信息 消息ID
// =======================================================
}
? 增加兩個message ID裁蚁, ID_API_GetRouteRequest
和ID_API_GetRouteResponse
,主要是針對API層獲取route全部的host節(jié)點信息通信使用兢榨。
10.2 Lars-API:get_route()方法客戶端實現(xiàn)
api/cpp/lars_api/lars_api.h
typedef std::pair<std::string, int> ip_port;
typedef std::vector<ip_port> route_set;
typedef route_set::iterator route_set_it;
api/cpp/lars_api/lars_api.cpp
//lars 系統(tǒng)獲取某modid/cmdid全部的hosts(route)信息
int lars_client::get_route(int modid, int cmdid, route_set &route)
{
//1. 封裝請求消息
lars::GetRouteRequest req;
req.set_modid(modid);
req.set_cmdid(cmdid);
//2. send
char write_buf[4096], read_buf[80*1024];
//消息頭
msg_head head;
head.msglen = req.ByteSizeLong();
head.msgid = lars::ID_API_GetRouteRequest;
memcpy(write_buf, &head, MESSAGE_HEAD_LEN);
//消息體
req.SerializeToArray(write_buf+MESSAGE_HEAD_LEN, head.msglen);
//簡單的hash來發(fā)給對應(yīng)的agent udp server
int index = (modid + cmdid) %3;
int ret = sendto(_sockfd[index], write_buf, head.msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
if (ret == -1) {
perror("sendto");
return lars::RET_SYSTEM_ERROR;
}
//3. recv
lars::GetRouteResponse rsp;
int message_len = recvfrom(_sockfd[index], read_buf, sizeof(read_buf), 0, NULL, NULL);
if (message_len == -1) {
perror("recvfrom");
return lars::RET_SYSTEM_ERROR;
}
//消息頭
memcpy(&head, read_buf, MESSAGE_HEAD_LEN);
if (head.msgid != lars::ID_API_GetRouteResponse) {
fprintf(stderr, "message ID error!\n");
return lars::RET_SYSTEM_ERROR;
}
//消息體
ret = rsp.ParseFromArray(read_buf + MESSAGE_HEAD_LEN, message_len - MESSAGE_HEAD_LEN);
if (!ret) {
fprintf(stderr, "message format error: head.msglen = %d, message_len = %d, message_len - MESSAGE_HEAD_LEN = %d, head msgid = %d, ID_GetHostResponse = %d\n", head.msglen, message_len, message_len-MESSAGE_HEAD_LEN, head.msgid, lars::ID_GetRouteResponse);
return lars::RET_SYSTEM_ERROR;
}
if (rsp.modid() != modid || rsp.cmdid() != cmdid) {
fprintf(stderr, "message format error\n");
return lars::RET_SYSTEM_ERROR;
}
//4 處理消息
for (int i = 0; i < rsp.host_size(); i++) {
const lars::HostInfo &host = rsp.host(i);
struct in_addr inaddr;
inaddr.s_addr = host.ip();
std::string ip = inet_ntoa(inaddr);
int port = host.port();
route.push_back(ip_port(ip,port));
}
return lars::RET_SUCC;
}
10.3 Agent UDP Server處理API-get_route請求
lars_loadbalance_agent/src/agent_udp_server.cpp
static void get_route_cb(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
{
//解析api發(fā)送的請求包
lars::GetRouteRequest req;
req.ParseFromArray(data, len);
int modid = req.modid();
int cmdid = req.cmdid();
//設(shè)置回執(zhí)消息
lars::GetRouteResponse rsp;
rsp.set_modid(modid);
rsp.set_cmdid(cmdid);
route_lb *ptr_route_lb = (route_lb*)user_data;
//調(diào)用route_lb的獲取host方法狠半,得到rsp返回結(jié)果
ptr_route_lb->get_route(modid, cmdid, rsp);
//打包回執(zhí)給api消息
std::string responseString;
rsp.SerializeToString(&responseString);
net_conn->send_message(responseString.c_str(), responseString.size(), lars::ID_API_GetRouteResponse);
}
void * agent_server_main(void * args)
{
// ....
//給server注冊消息分發(fā)路由業(yè)務(wù)盗温,針對ID_API_GetRouteRequest處理
server.add_msg_router(lars::ID_API_GetRouteRequest, get_route_cb, r_lb[port-8888]);
// ...
return NULL;
}
針對ID_API_GetRouteRequest
添加一個消息處理方法熔酷,在回調(diào)業(yè)務(wù)中,通過route_lb
的get_route()
方法獲取信息.我們來實現(xiàn)這個方法当娱。
lars_loadbalance_agent/src/route_lb.cpp
//agent獲取某個modid/cmdid的全部主機吃既,將返回的主機結(jié)果存放在rsp中
int route_lb::get_route(int modid, int cmdid, lars::GetRouteResponse &rsp)
{
int ret = lars::RET_SUCC;
//1. 得到key
uint64_t key = ((uint64_t)modid << 32) + cmdid;
pthread_mutex_lock(&_mutex);
//2. 當(dāng)前key已經(jīng)存在_route_lb_map中
if (_route_lb_map.find(key) != _route_lb_map.end()) {
//2.1 取出對應(yīng)的load_balance
load_balance *lb = _route_lb_map[key];
std::vector<host_info*> vec;
lb->get_all_hosts(vec);
for (std::vector<host_info*>::iterator it = vec.begin(); it != vec.end(); it++) {
lars::HostInfo host;
host.set_ip((*it)->ip);
host.set_port((*it)->port);
rsp.add_host()->CopyFrom(host);
}
//超時重拉路由
//檢查是否要重新拉路由信息
//若路由并沒有處于PULLING狀態(tài),且有效期已經(jīng)超時跨细,則重新拉取
if (lb->status == load_balance::NEW && time(NULL) - lb->last_update_time > lb_config.update_timeout) {
lb->pull();
}
}
//3. 當(dāng)前key不存在_route_lb_map中
else {
//3.1 新建一個load_balance
load_balance *lb = new load_balance(modid, cmdid);
if (lb == NULL) {
fprintf(stderr, "no more space to create loadbalance\n");
exit(1);
}
//3.2 新建的load_balance加入到map中
_route_lb_map[key] = lb;
//3.3 從dns service服務(wù)拉取具體的host信息
lb->pull();
ret = lars::RET_NOEXIST;
}
pthread_mutex_unlock(&_mutex);
return ret;
}
其中鹦倚,load_balance
的get_all_hosts()
方法實現(xiàn)如下:
lars_loadbalance_agent/src/load_balance.cpp
//獲取當(dāng)前掛載下的全部host信息 添加到vec中
void load_balance::get_all_hosts(std::vector<host_info*> &vec)
{
for (host_map_it it = _host_map.begin(); it != _host_map.end(); it++) {
host_info *hi = it->second;
vec.push_back(hi);
}
}
接下來,我們可以簡單測試一些獲取route信息的api
api/cpp/example/example.cpp
#include "lars_api.h"
#include <iostream>
void usage()
{
printf("usage: ./example [modid] [cmdid]\n");
}
int main(int argc, char **argv)
{
if (argc != 3) {
usage();
return 1;
}
int modid = atoi(argv[1]);
int cmdid = atoi(argv[2]);
lars_client api;
std::string ip;
int port;
route_set route;
int ret = api.get_route(modid, cmdid, route);
if (ret == 0) {
std::cout << "get route succ!" << std::endl;
for (route_set_it it = route.begin(); it != route.end(); it++) {
std::cout << "ip = " << (*it).first << ", port = " << (*it).second << std::endl;
}
}
ret = api.get_host(modid, cmdid, ip, port);
if (ret == 0) {
std::cout << "host is " << ip << ":" << port << std::endl;
//上報調(diào)用結(jié)果
api.report(modid, cmdid, ip, port, 0);
}
return 0;
}
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請勿轉(zhuǎn)載, 如果轉(zhuǎn)載請注明出處