【Lars教程目錄】
Lars源代碼
https://github.com/aceld/Lars
【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建
【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測(cè)試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能
【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控
【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列
【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過期窗口清理與過載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測(cè)試工具
第12章- Lars啟動(dòng)工具腳本
6) Route訂閱模式
6.1 訂閱模塊的設(shè)計(jì)與實(shí)現(xiàn)
? 訂閱模式整體的設(shè)計(jì).
lars_dns/include/subscribe.h
#pragma once
#include <vector>
#include <pthread.h>
#include <ext/hash_set>
#include <ext/hash_map>
#include "lars_reactor.h"
#include "lars.pb.h"
#include "dns_route.h"
using namespace __gnu_cxx;
//定義訂閱列表數(shù)據(jù)關(guān)系類型橘券,key->modid/cmdid镀赌, value->fds(訂閱的客戶端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
//定義發(fā)布列表的數(shù)據(jù)關(guān)系類型, key->fd(訂閱客戶端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;
class SubscribeList {
public:
//設(shè)計(jì)單例
static void init() {
_instance = new SubscribeList();
}
static SubscribeList *instance() {
//保證init方法在這個(gè)進(jìn)程執(zhí)行中型酥,只執(zhí)行一次
pthread_once(&_once, init);
return _instance;
}
//訂閱
void subscribe(uint64_t mod, int fd);
//取消訂閱
void unsubscribe(uint64_t mod, int fd);
//發(fā)布
void publish(std::vector<uint64_t> &change_mods);
//根據(jù)在線用戶fd得到需要發(fā)布的列表
void make_publish_map(listen_fd_set &online_fds,
publish_map &need_publish);
private:
//設(shè)計(jì)單例
SubscribeList();
SubscribeList(const SubscribeList &);
const SubscribeList& operator=(const SubscribeList);
static SubscribeList *_instance;
static pthread_once_t _once;
subscribe_map _book_list; //訂閱清單
pthread_mutex_t _book_list_lock;
publish_map _push_list; //發(fā)布清單
pthread_mutex_t _push_list_lock;
};
? 首先SubscribeList
采用單例設(shè)計(jì)。這里面定義了兩種數(shù)據(jù)類型
//定義訂閱列表數(shù)據(jù)關(guān)系類型耸携,key->modid/cmdid, value->fds(訂閱的客戶端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
//定義發(fā)布列表的數(shù)據(jù)關(guān)系類型, key->fd(訂閱客戶端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;
? subscribe_map
是目前dns系統(tǒng)的總體訂閱列表伴鳖,記錄了訂閱的modid/cmdid都有哪些fds已經(jīng)訂閱了婴削,其實(shí)一個(gè)fd就代表一個(gè)客戶端。
? publish_map
是即將發(fā)布的表隙券,其實(shí)這里面是subscribe_map的一個(gè)反表男应,key是訂閱的客戶端fd,而value是該客戶端需要接收的訂閱modid/cmdid數(shù)據(jù)娱仔。
屬性:
_book_list
:目前dns已經(jīng)全部的訂閱信息清單沐飘。
_push_list
:目前dns即將發(fā)布的客戶端及訂閱信息清單。
方法
void subscribe(uint64_t mod, int fd)
: 加入modid/cmdid 和訂閱的客戶端fd到_book_list中牲迫。
void unsubscribe(uint64_t mod, int fd)
:取消一條訂閱數(shù)據(jù)耐朴。
void publish(std::vector<uint64_t> &change_mods)
: 發(fā)布訂閱數(shù)據(jù),其中change_mods是需要發(fā)布的那些modid/cmdid組合盹憎。
void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish)
: 根據(jù)目前在線的訂閱用戶筛峭,得到需要通信的發(fā)布訂閱列表。
具體實(shí)現(xiàn)如下:
lars_dns/src/subscribe.cpp
#include "subscribe.h"
extern tcp_server *server;
//單例對(duì)象
SubscribeList *SubscribeList::_instance = NULL;
//用于保證創(chuàng)建單例的init方法只執(zhí)行一次的鎖
pthread_once_t SubscribeList::_once = PTHREAD_ONCE_INIT;
SubscribeList::SubscribeList()
{
}
//訂閱
void SubscribeList::subscribe(uint64_t mod, int fd)
{
//將mod->fd的關(guān)系加入到_book_list中
pthread_mutex_lock(&_book_list_lock);
_book_list[mod].insert(fd);
pthread_mutex_unlock(&_book_list_lock);
}
//取消訂閱
void SubscribeList::unsubscribe(uint64_t mod, int fd)
{
//將mod->fd關(guān)系從_book_list中刪除
pthread_mutex_lock(&_book_list_lock);
if (_book_list.find(mod) != _book_list.end()) {
_book_list[mod].erase(fd);
if (_book_list[mod].empty() == true) {
_book_list.erase(mod);
}
}
pthread_mutex_unlock(&_book_list_lock);
}
void push_change_task(event_loop *loop, void *args)
{
SubscribeList *subscribe = (SubscribeList*)args;
//1 獲取全部的在線客戶端fd
listen_fd_set online_fds;
loop->get_listen_fds(online_fds);
//2 從subscribe的_push_list中 找到與online_fds集合匹配陪每,放在一個(gè)新的publish_map里
publish_map need_publish;
subscribe->make_publish_map(online_fds, need_publish);
//3 依次從need_publish取出數(shù)據(jù) 發(fā)送給對(duì)應(yīng)客戶端鏈接
publish_map::iterator it;
for (it = need_publish.begin(); it != need_publish.end(); it++) {
int fd = it->first; //fd
//遍歷 fd對(duì)應(yīng)的 modid/cmdid集合
hash_set<uint64_t>::iterator st;
for (st = it->second.begin(); st != it->second.end(); st++) {
//一個(gè)modid/cmdid
int modid = int((*st) >> 32);
int cmdid = int(*st);
//組裝pb消息影晓,發(fā)送給客戶
lars::GetRouteResponse rsp;
rsp.set_modid(modid);
rsp.set_cmdid(cmdid);
//通過route查詢對(duì)應(yīng)的host ip/port信息 進(jìn)行組裝
host_set hosts = Route::instance()->get_hosts(modid, cmdid) ;
for (host_set_it hit = hosts.begin(); hit != hosts.end(); hit++) {
uint64_t ip_port_pair = *hit;
lars::HostInfo host_info;
host_info.set_ip((uint32_t)(ip_port_pair >> 32));
host_info.set_port((int)ip_port_pair);
//添加到rsp中
rsp.add_host()->CopyFrom(host_info);
}
//給當(dāng)前fd 發(fā)送一個(gè)更新消息
std::string responseString;
rsp.SerializeToString(&responseString);
//通過fd取出鏈接信息
net_connection *conn = tcp_server::conns[fd];
if (conn != NULL) {
conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetRouteResponse);
}
}
}
}
//根據(jù)在線用戶fd得到需要發(fā)布的列表
void SubscribeList::make_publish_map(
listen_fd_set &online_fds,
publish_map &need_publish)
{
publish_map::iterator it;
pthread_mutex_lock(&_push_list_lock);
//遍歷_push_list 找到 online_fds匹配的數(shù)據(jù),放到need_publish中
for (it = _push_list.begin(); it != _push_list.end(); it++) {
//it->first 是 fd
//it->second 是 modid/cmdid
if (online_fds.find(it->first) != online_fds.end()) {
//匹配到
//當(dāng)前的鍵值對(duì)移動(dòng)到need_publish中
need_publish[it->first] = _push_list[it->first];
//當(dāng)該組數(shù)據(jù)從_push_list中刪除掉
_push_list.erase(it);
}
}
pthread_mutex_unlock(&_push_list_lock);
}
//發(fā)布
void SubscribeList::publish(std::vector<uint64_t> &change_mods)
{
//1 將change_mods已經(jīng)修改的mod->fd
// 放到 發(fā)布清單_push_list中
pthread_mutex_lock(&_book_list_lock);
pthread_mutex_lock(&_push_list_lock);
std::vector<uint64_t>::iterator it;
for (it = change_mods.begin(); it != change_mods.end(); it++) {
uint64_t mod = *it;
if (_book_list.find(mod) != _book_list.end()) {
//將mod下面的fd set集合拷遷移到 _push_list中
hash_set<int>::iterator fds_it;
for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) {
int fd = *fds_it;
_push_list[fd].insert(mod);
}
}
}
pthread_mutex_unlock(&_push_list_lock);
pthread_mutex_unlock(&_book_list_lock);
//2 通知各個(gè)線程去執(zhí)行推送任務(wù)
server->thread_poll()->send_task(push_change_task, this);
}
? 這里需要注意的是publish()
里的server變量是全局變量檩禾,全局唯一的server句柄挂签。
6.2 開啟訂閱
? 那么訂閱功能實(shí)現(xiàn)了,該如何是調(diào)用觸發(fā)訂閱功能能盼产,我們可以在一個(gè)客戶端建立連接成功之后來調(diào)用.
lars_dns/src/dns_service.cpp
#include <ext/hash_set>
#include "lars_reactor.h"
#include "subscribe.h"
#include "dns_route.h"
#include "lars.pb.h"
tcp_server *server;
using __gnu_cxx::hash_set;
typedef hash_set<uint64_t> client_sub_mod_list;
// ...
//訂閱route 的modid/cmdid
void create_subscribe(net_connection * conn, void *args)
{
conn->param = new client_sub_mod_list;
}
//退訂route 的modid/cmdid
void clear_subscribe(net_connection * conn, void *args)
{
client_sub_mod_list::iterator it;
client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param;
for (it = sub_list->begin(); it != sub_list->end(); it++) {
uint64_t mod = *it;
SubscribeList::instance()->unsubscribe(mod, conn->get_fd());
}
delete sub_list;
conn->param = NULL;
}
int main(int argc, char **argv)
{
event_loop loop;
//加載配置文件
config_file::setPath("conf/lars_dns.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 7778);
//創(chuàng)建tcp服務(wù)器
server = new tcp_server(&loop, ip.c_str(), port);
//==========注冊(cè)鏈接創(chuàng)建/銷毀Hook函數(shù)============
server->set_conn_start(create_subscribe);
server->set_conn_close(clear_subscribe);
//============================================
//注冊(cè)路由業(yè)務(wù)
server->add_msg_router(lars::ID_GetRouteRequest, get_route);
//開始事件監(jiān)聽
printf("lars dns service ....\n");
loop.event_process();
return 0;
}
? 這里注冊(cè)了兩個(gè)鏈接Hook竹握。create_subscribe()
和clear_subscribe()
。
client_sub_mod_list
為當(dāng)前客戶端鏈接所訂閱的route信息列表辆飘。主要存放當(dāng)前客戶訂閱的modid/cmdid的集合啦辐。因?yàn)椴煌目蛻舳擞嗛喌男畔⒉煌酱砸獙⒃摿斜砼c每個(gè)conn進(jìn)行綁定。
關(guān)于作者:
作者:Aceld(劉丹冰)
mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld
原創(chuàng)聲明:未經(jīng)作者允許請(qǐng)勿轉(zhuǎn)載, 如果轉(zhuǎn)載請(qǐng)注明出處