大型mmo服務(wù)器架構(gòu)介紹----線程篇

本篇是打算介紹一下目前常見的大型mmo服務(wù)器架構(gòu)的源碼,其實目前見過的幾個框架在思想上模型基本上大同小異攘残,本人公司的代碼由于不方便展示讨惩,于是使用開源的框架進(jìn)行解析,主要理解大致思想氏身,抓重點(diǎn)

有沒有想過一個問題巍棱?
服務(wù)器在啟動的一刻,客戶端和服務(wù)器究竟做了哪些準(zhǔn)備工作蛋欣?以及玩家登陸的時候航徙,客戶端又是怎么樣和服務(wù)器通訊的呢?
本篇也是系列的第一篇陷虎,打算先從底層線程的設(shè)計開始講起
該框架將每個任務(wù)進(jìn)行封裝到踏,然后通過線程管理器分別分發(fā)到對應(yīng)的線程處理。
也是mmo服務(wù)器的基礎(chǔ)線程框架尚猿。

ThreadObject類

ThreadObject類其實是線程處理事件的一個最小單位
里面包含了需要處理的函數(shù)(通過std::function包裝)

class Thread;
class ThreadObject : public MessageList
{
public:
    virtual bool Init() = 0; //初始化函數(shù)
    virtual void RegisterMsgFunction() = 0;//消息注冊
    virtual void Update() = 0;    //更新

    void SetThread(Thread* pThread);
    Thread* GetThread() const;
    bool IsActive() const;
    void Dispose() override;
    
protected:
    bool _active{ true };
    Thread* _pThread{ nullptr };

該類很簡單窝稿,繼承于MessageList(這個后續(xù)說)
并且主要成員就兩個,_active是否已經(jīng)執(zhí)行過凿掂,_pThread為當(dāng)前執(zhí)行這個線程對象的線程指針(就是這玩意是在哪個線程上執(zhí)行的)

如何使用這個類呢伴榔,只需要繼承這個類,然后將這個類的 虛函數(shù)實現(xiàn)就行
具體用法后面再結(jié)合一起說

Thread類

介紹完object類自然就是介紹Thread類啦庄萎,顧名思義 ThreadObject對象就是在某個Thread對象上運(yùn)行的

class Packet;
class ThreadObject;

class ThreadObjectList: public IDisposable
{
public:
    void AddObject(ThreadObject* _obj);
    void Update();
    void AddPacketToList(Packet* pPacket);
    void Dispose() override;

protected:
    // 本線程的所有對象    
    std::list<ThreadObject*> _objlist;
    std::mutex _obj_lock;
};

class Thread : public ThreadObjectList, public SnObject {
public:
    Thread();
    void Start();
    void Stop();
    bool IsRun() const;
   
private:
    bool _isRun;
    std::thread _thread;
};

可以看到上面的代碼踪少,Thread類繼承于ThreadObjectList,而ThreadObjectList里就保存了一個list糠涛,里面存放所有ThreadObject對象的指針援奢,并且提供了方法接口,將ThreadObject對象指針存入到list中脱羡,而主Thread類保存了當(dāng)前進(jìn)程的指針萝究,使用的是C++11后的std::thread庫免都,并且也封裝了start和stop方法,方便使用帆竹。

void ThreadObjectList::AddObject(ThreadObject* obj)
{
    std::lock_guard<std::mutex> guard(_obj_lock);//對當(dāng)前線程上鎖

    // 在加入之前初始化一下
    if (!obj->Init())
    {
        std::cout << "AddObject Failed. ThreadObject init failed." << std::endl;
    }
    else
    {
        obj->RegisterMsgFunction(); //運(yùn)行threadObject的消息注冊函數(shù)
        _objlist.push_back(obj); //將threadobject對象指針保持至list
      
        //保持成功后 將當(dāng)前線程的指針保存至這個threadObject對象的thread成員上
        const auto pThread = dynamic_cast<Thread*>(this);
        if (pThread != nullptr)
            obj->SetThread(pThread);
    }
}

上面的函數(shù)就是將ThreadObject對象指針存入ThreadObjectList::_objlist對象的過程绕娘,重要的三點(diǎn):
1:存入之前先對object初始化
2:注冊消息函數(shù)
3:成功之后保存當(dāng)前線程指針

Thread::Thread()
{
    this->_isRun = true;
}

void Thread::Stop()
{
    if (!_isRun)
    {
        _isRun = false;
        if (_thread.joinable()) _thread.join();
    }
}

void Thread::Start()
{
    _isRun = true;
    _thread = std::thread([this]()
    {
        while (_isRun)
        {
            Update();
        }
    });
}

bool Thread::IsRun() const
{
    return _isRun;
}

上述代碼就是Thread類的方法實現(xiàn),非常簡單明了栽连,這里注意 thread對象只要在運(yùn)行险领,那么就會一直執(zhí)行ThreadObiectList對象的update函數(shù) 這也是當(dāng)前線程的主循環(huán)

那么我們來看看update函數(shù)都干了些什么吧~

void ThreadObjectList::Update()
{
    std::list<ThreadObject*> _tmpObjs; //
    _obj_lock.lock();
    std::copy(_objlist.begin(), _objlist.end(), std::back_inserter(_tmpObjs));
    _obj_lock.unlock();

    for (ThreadObject* pTObj : _tmpObjs)
    {
        pTObj->ProcessPacket();
        pTObj->Update();

        // 非激活狀態(tài),刪除
        if (!pTObj->IsActive())
        {
            _obj_lock.lock();
            _objlist.remove(pTObj);
            _obj_lock.unlock();

            pTObj->Dispose();
            delete pTObj;
        }
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

上述代碼其實也不難秒紧,本質(zhì)就是把原來的list上的Object拷貝一份绢陌,然后對拷貝的這份的每個object指針進(jìn)行處理
分別對每個object執(zhí)行
1: processPacket(這個先無視 是MessageList里的方法,理解成處理消息就行)
2:Update(這個函數(shù)是虛函數(shù)熔恢,具體邏輯自己實現(xiàn)脐湾,也是我們需要真正執(zhí)行邏輯的地方)
3:執(zhí)行完update之后 判斷這個object是否已經(jīng)沒用了(被拋棄了) 如果已經(jīng)沒用了,那么就將這個obj移除掉

最終再過一毫秒繼續(xù)運(yùn)行該函數(shù)

ThreadMgr類

class Packet;
class ThreadObject;
class Network;

class ThreadMgr :public Singleton<ThreadMgr>, public ThreadObjectList
{
public:
    ThreadMgr();
    void StartAllThread();
    bool IsGameLoop();
    void NewThread();
    bool AddObjToThread(ThreadObject* obj);
    void AddNetworkToThread(APP_TYPE appType, Network* pNetwork);

    // message
    void DispatchPacket(Packet* pPacket);
    void SendPacket(Packet* pPacket);

    void Dispose() override;

private:
    Network* GetNetwork(APP_TYPE appType);

private:
    uint64 _lastThreadSn{ 0 }; // 實現(xiàn)線程對象均分

    std::mutex _thread_lock;
    std::map<uint64, Thread*> _threads;

    // NetworkLocator
    std::mutex _locator_lock;
    std::map<APP_TYPE, Network*> _networkLocator;
};

不需要全部看懂意思 只需要大概了解思想
抓重點(diǎn)
1:繼承自Singleton<ThreadMgr> 說明每個進(jìn)程只允許一個對象叙淌,不管從哪個線程獲取到這個ThreadMgr都是指向同一個對象
2: std::map<uint64, Thread*> _threads; 該對象擁有所有線程對象的指針秤掌,并且是以key = 線程id value =線程指針的形式保存
3:可以通過該對象創(chuàng)建新的進(jìn)程
4:可以通過該對象直接將ThreadObject對象平均分配到線程中,而不需要我們自己決定鹰霍,實現(xiàn)了負(fù)載均衡

接下來看具體方法實現(xiàn):

void ThreadMgr::StartAllThread()
{
    auto iter = _threads.begin();
    while (iter != _threads.end())
    {
        iter->second->Start();
        ++iter;
    }
}

bool ThreadMgr::IsGameLoop()
{
    for (auto iter = _threads.begin(); iter != _threads.end(); ++iter)
    {
        if (iter->second->IsRun())
            return true;
    }

    return false;
}

void ThreadMgr::NewThread()
{
    std::lock_guard<std::mutex> guard(_thread_lock);
    auto pThread = new Thread();
    _threads.insert(std::make_pair(pThread->GetSN(), pThread));
}

上面代碼通俗易懂 不需要解釋~

// 平均加入各線程中
bool ThreadMgr::AddObjToThread(ThreadObject* obj)
{
    std::lock_guard<std::mutex> guard(_thread_lock);

    // 找到上一次的線程 
    auto iter = _threads.begin();
    if (_lastThreadSn > 0)
    {
        iter = _threads.find(_lastThreadSn);
    }

    if (iter == _threads.end())
    {
        // 沒有找到闻鉴,可能沒有配線程
        std::cout << "AddThreadObj Failed. no thead." << std::endl;
        return false;
    }

    // 取到它的下一個活動線程
    do
    {
        ++iter;
        if (iter == _threads.end())
            iter = _threads.begin();
    } while (!(iter->second->IsRun()));

    auto pThread = iter->second;
    pThread->AddObject(obj);
    _lastThreadSn = pThread->GetSN();
    //std::cout << "add obj to thread.id:" << pThread->GetSN() << std::endl;

    return true;
}

主要來看如何進(jìn)行負(fù)載均衡的
看了代碼就能理解,其實就是平均分配
在把obj加入到某個線程之后 記錄這個線程的線程id茂洒,下次別的ojb來的時候就在這個線程id對應(yīng)的mapKey的下一位key對應(yīng)的線程上增加該ojb
假設(shè) 線程有 1 2 3
第一次是加到1 上
第二次是加到2 上
第三次是加到3 上
如此反復(fù)

那么到此為止孟岛,整體的線程框架就能了解了

image.png

MessageList

image.png

上文說道,在ThreadObject上繼承了MessageList這個類督勺,那么這個類是啥呢纤房?

先考慮一個問題:
假設(shè)一個消息Id為 Msg1,攜帶著Packet數(shù)據(jù)骇径,從客戶端傳來彩倚,按照現(xiàn)在的情況是 ThreadMgr管理類會將這個msg分配給每個Thread摔蓝,并且每個ThreadObject都可以處理這個數(shù)據(jù)腋舌。
但是如果我只希望Thread1里的某個ThreadObject處理這條數(shù)據(jù)就行了盏触,別的ThreadObject處理別的MsgId,實現(xiàn)消息的過濾块饺,應(yīng)該怎么辦呢赞辩?
MessageList的用處就是在這里,實現(xiàn)消息過濾授艰,每個ThreadObject監(jiān)聽自己感興趣的msgId辨嗽。
看代碼!

class Packet;

typedef std::function<void(Packet*)> HandleFunction;

class MessageList
{
public:
    void RegisterFunction(int msgId, HandleFunction function);//注冊這個消息Id 對應(yīng)的回調(diào)
    bool IsFollowMsgId(int msgId);//是否是這個ThreadObjc所需要的消息id
    void ProcessPacket();//處理Packet
    void AddPacket(Packet* pPacket); //添加Patcket

protected:
    std::mutex _msgMutex; //鎖
    std::list<Packet*> _msgList; //消息隊列
    std::map<int, HandleFunction> _callbackHandle; //回調(diào)map 
};

從上面可以大致知道淮腾,每個ThreadObject都有一個MessageList糟需,這樣每個ThreadObject在聲明的時候就可以通過RegisterFunction這個函數(shù)來注冊自己需要監(jiān)聽的消息屉佳。

void MessageList::RegisterFunction(int msgId, HandleFunction function)
{
    std::lock_guard<std::mutex> guard(_msgMutex);
    _callbackHandle[msgId] = function;
}

當(dāng)消息來到的時候,通過IsFollowMsgId 來判斷是不是ThreadObject所需要的msgId洲押,如果是的話武花,就調(diào)用 AddPacket函數(shù),將這個消息的packet保存下來杈帐,供后續(xù)ProcessPacket函數(shù)使用体箕。

void MessageList::AddPacket(Packet* pPacket)
{
    std::lock_guard<std::mutex> guard(_msgMutex);
    _msgList.push_back(pPacket);
}
image.png

上圖邏輯就是每個Thread在分發(fā)消息的時候 根據(jù)每個ThreadObject關(guān)心的MsgId進(jìn)行分發(fā)的。

void MessageList::ProcessPacket()
{
    std::list<Packet*> tmpList;
    _msgMutex.lock();
    std::copy(_msgList.begin(), _msgList.end(), std::back_inserter(tmpList));
    _msgList.clear();
    _msgMutex.unlock();

    for (auto packet : tmpList)
    {
        const auto handleIter = _callbackHandle.find(packet->GetMsgId());
        if (handleIter == _callbackHandle.end())
        {
            std::cout << "packet is not hander. msg id;" << packet->GetMsgId() << std::endl;
        }
        else
        {
            handleIter->second(packet);
        }
    }

    tmpList.clear();
}

處理消息的邏輯大致如下
1.聲明一個tmpList挑童,存儲
2 上鎖 把_msgList復(fù)制到tmpList里處理 清空_msgList 解鎖

  1. 遍歷tmpList 獲取每個packet的 MsgId累铅,并且通過_callbackHandle map找到對應(yīng)MsgId需要處理的函數(shù)對象,調(diào)用該函數(shù)站叼。

提問:為什么這里要上鎖呢娃兽?
其實是因為ThreadMgr是一個單例對象,這個對象在線程1和線程2其實指向的都是同一個對象尽楔,那么就會造成共享資源的問題换薄,所以需要對Thread和ThreadObect做訪問臨界資源上鎖的處理。

提問:MsgId是什么
實際上可以理解成 是和服務(wù)器和客戶端規(guī)定的協(xié)議 翔试,比如大家規(guī)定當(dāng)msgId等1時轻要,這個消息代表的含義是XXX。那么雙方彼此就可以通過MsgId獲取到含義垦缅,從而處理這個消息冲泥。
目前用的最普遍的消息序列化工具是protobuf

什么是protobuf? 官方文檔對 protobuf 的定義:protocol buffers 是一種語言無關(guān)、平臺無關(guān)壁涎、可擴(kuò)展的序列化結(jié)構(gòu)數(shù)據(jù)的方法凡恍,可用于數(shù)據(jù)通信協(xié)議和數(shù)據(jù)存儲等.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市怔球,隨后出現(xiàn)的幾起案子嚼酝,更是在濱河造成了極大的恐慌,老刑警劉巖竟坛,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件闽巩,死亡現(xiàn)場離奇詭異,居然都是意外死亡担汤,警方通過查閱死者的電腦和手機(jī)涎跨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來崭歧,“玉大人隅很,你說我怎么就攤上這事÷誓耄” “怎么了叔营?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵屋彪,是天一觀的道長。 經(jīng)常有香客問我绒尊,道長撼班,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任垒酬,我火速辦了婚禮砰嘁,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘勘究。我一直安慰自己矮湘,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布口糕。 她就那樣靜靜地躺著缅阳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪景描。 梳的紋絲不亂的頭發(fā)上十办,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天,我揣著相機(jī)與錄音超棺,去河邊找鬼向族。 笑死,一個胖子當(dāng)著我的面吹牛棠绘,可吹牛的內(nèi)容都是我干的件相。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼氧苍,長吁一口氣:“原來是場噩夢啊……” “哼夜矗!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起让虐,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤紊撕,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后赡突,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體对扶,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年麸俘,在試婚紗的時候發(fā)現(xiàn)自己被綠了辩稽。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片惧笛。...
    茶點(diǎn)故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡从媚,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出患整,到底是詐尸還是另有隱情拜效,我是刑警寧澤喷众,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站紧憾,受9級特大地震影響到千,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜赴穗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一憔四、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧般眉,春花似錦了赵、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至埠对,卻和暖如春络断,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背项玛。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工貌笨, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人襟沮。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓躁绸,卻偏偏與公主長得像,于是被迫代替她去往敵國和親臣嚣。 傳聞我的和親對象是個殘疾皇子净刮,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容