(23)Route訂閱模式(Dns部分)-【Lars-基于C++負(fù)載均衡遠(yuǎn)程服務(wù)器調(diào)度系統(tǒng)教程】

【Lars教程目錄】

Lars源代碼
https://github.com/aceld/Lars


【Lars系統(tǒng)概述】
第1章-概述
第2章-項(xiàng)目目錄構(gòu)建


【Lars系統(tǒng)之Reactor模型服務(wù)器框架模塊】
第1章-項(xiàng)目結(jié)構(gòu)與V0.1雛形
第2章-內(nèi)存管理與Buffer封裝
第3章-事件觸發(fā)EventLoop
第4章-鏈接與消息封裝
第5章-Client客戶端模型
第6章-連接管理及限制
第7章-消息業(yè)務(wù)路由分發(fā)機(jī)制
第8章-鏈接創(chuàng)建/銷毀Hook機(jī)制
第9章-消息任務(wù)隊(duì)列與線程池
第10章-配置文件讀寫功能
第11章-udp服務(wù)與客戶端
第12章-數(shù)據(jù)傳輸協(xié)議protocol buffer
第13章-QPS性能測(cè)試
第14章-異步消息任務(wù)機(jī)制
第15章-鏈接屬性設(shè)置功能


【Lars系統(tǒng)之DNSService模塊】
第1章-Lars-dns簡介
第2章-數(shù)據(jù)庫創(chuàng)建
第3章-項(xiàng)目目錄結(jié)構(gòu)及環(huán)境構(gòu)建
第4章-Route結(jié)構(gòu)的定義
第5章-獲取Route信息
第6章-Route訂閱模式
第7章-Backend Thread實(shí)時(shí)監(jiān)控


【Lars系統(tǒng)之Report Service模塊】
第1章-項(xiàng)目概述-數(shù)據(jù)表及proto3協(xié)議定義
第2章-獲取report上報(bào)數(shù)據(jù)
第3章-存儲(chǔ)線程池及消息隊(duì)列


【Lars系統(tǒng)之LoadBalance Agent模塊】
第1章-項(xiàng)目概述及構(gòu)建
第2章-主模塊業(yè)務(wù)結(jié)構(gòu)搭建
第3章-Report與Dns Client設(shè)計(jì)與實(shí)現(xiàn)
第4章-負(fù)載均衡模塊基礎(chǔ)設(shè)計(jì)
第5章-負(fù)載均衡獲取Host主機(jī)信息API
第6章-負(fù)載均衡上報(bào)Host主機(jī)信息API
第7章-過期窗口清理與過載超時(shí)(V0.5)
第8章-定期拉取最新路由信息(V0.6)
第9章-負(fù)載均衡獲取Route信息API(0.7)
第10章-API初始化接口(V0.8)
第11章-Lars Agent性能測(cè)試工具
第12章- Lars啟動(dòng)工具腳本


6) Route訂閱模式

6.1 訂閱模塊的設(shè)計(jì)與實(shí)現(xiàn)

? 訂閱模式整體的設(shè)計(jì).

lars_dns/include/subscribe.h

#pragma once

#include <vector>
#include <pthread.h>
#include <ext/hash_set>
#include <ext/hash_map>
#include "lars_reactor.h"
#include "lars.pb.h"
#include "dns_route.h"

using namespace __gnu_cxx;


//定義訂閱列表數(shù)據(jù)關(guān)系類型橘券,key->modid/cmdid镀赌, value->fds(訂閱的客戶端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;

//定義發(fā)布列表的數(shù)據(jù)關(guān)系類型, key->fd(訂閱客戶端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;

class SubscribeList {
public:
    //設(shè)計(jì)單例
    static void init()  {
        _instance = new SubscribeList();
    }

    static SubscribeList *instance() {
        //保證init方法在這個(gè)進(jìn)程執(zhí)行中型酥,只執(zhí)行一次
        pthread_once(&_once, init);
        return _instance;
    }

    //訂閱
    void subscribe(uint64_t mod, int fd);
    
    //取消訂閱
    void unsubscribe(uint64_t mod, int fd);
    
    //發(fā)布
    void publish(std::vector<uint64_t> &change_mods);

    //根據(jù)在線用戶fd得到需要發(fā)布的列表
    void make_publish_map(listen_fd_set &online_fds, 
                          publish_map &need_publish);
    
    
private:
    //設(shè)計(jì)單例
    SubscribeList();
    SubscribeList(const SubscribeList &);
    const SubscribeList& operator=(const SubscribeList);

    static SubscribeList *_instance;
    static pthread_once_t _once;


    subscribe_map _book_list; //訂閱清單
    pthread_mutex_t _book_list_lock;

    publish_map _push_list; //發(fā)布清單
    pthread_mutex_t _push_list_lock;
};

? 首先SubscribeList采用單例設(shè)計(jì)。這里面定義了兩種數(shù)據(jù)類型

//定義訂閱列表數(shù)據(jù)關(guān)系類型耸携,key->modid/cmdid, value->fds(訂閱的客戶端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;

//定義發(fā)布列表的數(shù)據(jù)關(guān)系類型, key->fd(訂閱客戶端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;

? subscribe_map是目前dns系統(tǒng)的總體訂閱列表伴鳖,記錄了訂閱的modid/cmdid都有哪些fds已經(jīng)訂閱了婴削,其實(shí)一個(gè)fd就代表一個(gè)客戶端。

? publish_map是即將發(fā)布的表隙券,其實(shí)這里面是subscribe_map的一個(gè)反表男应,key是訂閱的客戶端fd,而value是該客戶端需要接收的訂閱modid/cmdid數(shù)據(jù)娱仔。

屬性

_book_list:目前dns已經(jīng)全部的訂閱信息清單沐飘。

_push_list:目前dns即將發(fā)布的客戶端及訂閱信息清單。

方法

void subscribe(uint64_t mod, int fd): 加入modid/cmdid 和訂閱的客戶端fd到_book_list中牲迫。

void unsubscribe(uint64_t mod, int fd):取消一條訂閱數(shù)據(jù)耐朴。

void publish(std::vector<uint64_t> &change_mods): 發(fā)布訂閱數(shù)據(jù),其中change_mods是需要發(fā)布的那些modid/cmdid組合盹憎。

void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish): 根據(jù)目前在線的訂閱用戶筛峭,得到需要通信的發(fā)布訂閱列表。

具體實(shí)現(xiàn)如下:

lars_dns/src/subscribe.cpp

#include "subscribe.h"

extern tcp_server *server;

//單例對(duì)象
SubscribeList *SubscribeList::_instance = NULL;

//用于保證創(chuàng)建單例的init方法只執(zhí)行一次的鎖
pthread_once_t SubscribeList::_once = PTHREAD_ONCE_INIT;

SubscribeList::SubscribeList()
{
}

//訂閱
void SubscribeList::subscribe(uint64_t mod, int fd)
{
    //將mod->fd的關(guān)系加入到_book_list中
    pthread_mutex_lock(&_book_list_lock);
    _book_list[mod].insert(fd);
    pthread_mutex_unlock(&_book_list_lock);
}

//取消訂閱
void SubscribeList::unsubscribe(uint64_t mod, int fd)
{
    //將mod->fd關(guān)系從_book_list中刪除
    pthread_mutex_lock(&_book_list_lock);
    if (_book_list.find(mod) != _book_list.end()) {
        _book_list[mod].erase(fd);
        if (_book_list[mod].empty() == true) {
            _book_list.erase(mod);
        }
    }
    pthread_mutex_unlock(&_book_list_lock);
}

void push_change_task(event_loop *loop, void *args)
{
    SubscribeList *subscribe = (SubscribeList*)args;

    //1 獲取全部的在線客戶端fd
    listen_fd_set online_fds;
    loop->get_listen_fds(online_fds);
    
    //2 從subscribe的_push_list中 找到與online_fds集合匹配陪每,放在一個(gè)新的publish_map里
    publish_map need_publish;
    subscribe->make_publish_map(online_fds, need_publish);

    //3 依次從need_publish取出數(shù)據(jù) 發(fā)送給對(duì)應(yīng)客戶端鏈接
    publish_map::iterator it; 
    for (it = need_publish.begin(); it != need_publish.end(); it++) {
        int fd = it->first; //fd

        //遍歷 fd對(duì)應(yīng)的 modid/cmdid集合
        hash_set<uint64_t>::iterator st;
        for (st = it->second.begin(); st != it->second.end(); st++) {
            //一個(gè)modid/cmdid
            int modid = int((*st) >> 32);
            int cmdid = int(*st);

            //組裝pb消息影晓,發(fā)送給客戶
            lars::GetRouteResponse rsp; 
            rsp.set_modid(modid);
            rsp.set_cmdid(cmdid);

            //通過route查詢對(duì)應(yīng)的host ip/port信息 進(jìn)行組裝
            host_set hosts = Route::instance()->get_hosts(modid, cmdid) ;
            for (host_set_it hit = hosts.begin(); hit != hosts.end(); hit++) {
                uint64_t ip_port_pair = *hit;
                lars::HostInfo host_info;
                host_info.set_ip((uint32_t)(ip_port_pair >> 32));
                host_info.set_port((int)ip_port_pair);

                //添加到rsp中
                rsp.add_host()->CopyFrom(host_info);
            }

            //給當(dāng)前fd 發(fā)送一個(gè)更新消息
            std::string responseString;
            rsp.SerializeToString(&responseString);

            //通過fd取出鏈接信息
            net_connection *conn = tcp_server::conns[fd];
            if (conn != NULL) {
                conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetRouteResponse);
            }
        }
    }
}

//根據(jù)在線用戶fd得到需要發(fā)布的列表
void SubscribeList::make_publish_map(
            listen_fd_set &online_fds, 
            publish_map &need_publish)
{
    publish_map::iterator it;

    pthread_mutex_lock(&_push_list_lock);
    //遍歷_push_list 找到 online_fds匹配的數(shù)據(jù),放到need_publish中
    for (it = _push_list.begin(); it != _push_list.end(); it++)  {
        //it->first 是 fd
        //it->second 是 modid/cmdid
        if (online_fds.find(it->first) != online_fds.end()) {
            //匹配到
            //當(dāng)前的鍵值對(duì)移動(dòng)到need_publish中
            need_publish[it->first] = _push_list[it->first];
            //當(dāng)該組數(shù)據(jù)從_push_list中刪除掉
            _push_list.erase(it);
        }
    }

    pthread_mutex_unlock(&_push_list_lock);
}



//發(fā)布
void SubscribeList::publish(std::vector<uint64_t> &change_mods)
{
    //1 將change_mods已經(jīng)修改的mod->fd 
    //  放到 發(fā)布清單_push_list中 
    pthread_mutex_lock(&_book_list_lock);
    pthread_mutex_lock(&_push_list_lock);

    std::vector<uint64_t>::iterator it;

    for (it = change_mods.begin(); it != change_mods.end(); it++) {
        uint64_t mod = *it;
        if (_book_list.find(mod) != _book_list.end()) {
            //將mod下面的fd set集合拷遷移到 _push_list中
            hash_set<int>::iterator fds_it;
            for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) {
                int fd = *fds_it;
                _push_list[fd].insert(mod);
            }
        }
    }

    pthread_mutex_unlock(&_push_list_lock);
    pthread_mutex_unlock(&_book_list_lock);

    //2 通知各個(gè)線程去執(zhí)行推送任務(wù)
    server->thread_poll()->send_task(push_change_task, this);
}

? 這里需要注意的是publish()里的server變量是全局變量檩禾,全局唯一的server句柄挂签。

6.2 開啟訂閱

? 那么訂閱功能實(shí)現(xiàn)了,該如何是調(diào)用觸發(fā)訂閱功能能盼产,我們可以在一個(gè)客戶端建立連接成功之后來調(diào)用.

lars_dns/src/dns_service.cpp

#include <ext/hash_set>
#include "lars_reactor.h"
#include "subscribe.h"
#include "dns_route.h"
#include "lars.pb.h"

tcp_server *server;

using __gnu_cxx::hash_set;

typedef hash_set<uint64_t> client_sub_mod_list;

// ...

//訂閱route 的modid/cmdid
void create_subscribe(net_connection * conn, void *args)
{
    conn->param = new client_sub_mod_list;
}

//退訂route 的modid/cmdid
void clear_subscribe(net_connection * conn, void *args)
{
    client_sub_mod_list::iterator it;
    client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param;

    for (it = sub_list->begin(); it  != sub_list->end(); it++) {
        uint64_t mod = *it;
        SubscribeList::instance()->unsubscribe(mod, conn->get_fd());
    }

    delete sub_list;

    conn->param = NULL;
}

int main(int argc, char **argv)
{
    event_loop loop;

    //加載配置文件
    config_file::setPath("conf/lars_dns.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    short port = config_file::instance()->GetNumber("reactor", "port", 7778);


    //創(chuàng)建tcp服務(wù)器
    server = new tcp_server(&loop, ip.c_str(), port);

    //==========注冊(cè)鏈接創(chuàng)建/銷毀Hook函數(shù)============
    server->set_conn_start(create_subscribe);
    server->set_conn_close(clear_subscribe);
    //============================================

    //注冊(cè)路由業(yè)務(wù)
    server->add_msg_router(lars::ID_GetRouteRequest, get_route);

    //開始事件監(jiān)聽    
    printf("lars dns service ....\n");
    loop.event_process();

    return 0;
}

? 這里注冊(cè)了兩個(gè)鏈接Hook竹握。create_subscribe()clear_subscribe()

client_sub_mod_list為當(dāng)前客戶端鏈接所訂閱的route信息列表辆飘。主要存放當(dāng)前客戶訂閱的modid/cmdid的集合啦辐。因?yàn)椴煌目蛻舳擞嗛喌男畔⒉煌酱砸獙⒃摿斜砼c每個(gè)conn進(jìn)行綁定。


關(guān)于作者:

作者:Aceld(劉丹冰)

mail: danbing.at@gmail.com
github: https://github.com/aceld
原創(chuàng)書籍gitbook: http://legacy.gitbook.com/@aceld

原創(chuàng)聲明:未經(jīng)作者允許請(qǐng)勿轉(zhuǎn)載, 如果轉(zhuǎn)載請(qǐng)注明出處

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末芹关,一起剝皮案震驚了整個(gè)濱河市续挟,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌侥衬,老刑警劉巖诗祸,帶你破解...
    沈念sama閱讀 212,222評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異轴总,居然都是意外死亡直颅,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,455評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門怀樟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來功偿,“玉大人,你說我怎么就攤上這事往堡⌒岛桑” “怎么了?”我有些...
    開封第一講書人閱讀 157,720評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵虑灰,是天一觀的道長吨瞎。 經(jīng)常有香客問我,道長穆咐,這世上最難降的妖魔是什么颤诀? 我笑而不...
    開封第一講書人閱讀 56,568評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮对湃,結(jié)果婚禮上崖叫,老公的妹妹穿的比我還像新娘。我一直安慰自己熟尉,他們只是感情好归露,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,696評(píng)論 6 386
  • 文/花漫 我一把揭開白布洲脂。 她就那樣靜靜地躺著斤儿,像睡著了一般。 火紅的嫁衣襯著肌膚如雪恐锦。 梳的紋絲不亂的頭發(fā)上往果,一...
    開封第一講書人閱讀 49,879評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音一铅,去河邊找鬼陕贮。 笑死,一個(gè)胖子當(dāng)著我的面吹牛潘飘,可吹牛的內(nèi)容都是我干的肮之。 我是一名探鬼主播掉缺,決...
    沈念sama閱讀 39,028評(píng)論 3 409
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼戈擒!你這毒婦竟也來了眶明?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,773評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤筐高,失蹤者是張志新(化名)和其女友劉穎搜囱,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柑土,經(jīng)...
    沈念sama閱讀 44,220評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蜀肘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,550評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了稽屏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片扮宠。...
    茶點(diǎn)故事閱讀 38,697評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖诫欠,靈堂內(nèi)的尸體忽然破棺而出涵卵,到底是詐尸還是另有隱情,我是刑警寧澤荒叼,帶...
    沈念sama閱讀 34,360評(píng)論 4 332
  • 正文 年R本政府宣布轿偎,位于F島的核電站,受9級(jí)特大地震影響被廓,放射性物質(zhì)發(fā)生泄漏坏晦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,002評(píng)論 3 315
  • 文/蒙蒙 一嫁乘、第九天 我趴在偏房一處隱蔽的房頂上張望昆婿。 院中可真熱鬧,春花似錦蜓斧、人聲如沸仓蛆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,782評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽看疙。三九已至,卻和暖如春直奋,著一層夾襖步出監(jiān)牢的瞬間能庆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,010評(píng)論 1 266
  • 我被黑心中介騙來泰國打工脚线, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留搁胆,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,433評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像渠旁,于是被迫代替她去往敵國和親攀例。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,587評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容