OpenThread是最舒心的跨平臺多線程并發(fā)庫

OpenThread

OpenThread是最舒心的跨平臺多線程并發(fā)庫帆疟,多線程三大設計模式: Await模式, Worker模式和Actor模式。

使用優(yōu)雅的方式楣责,創(chuàng)建線程淆储、管理線程和線程間通信,從而實現(xiàn)多核并發(fā)秆剪。

OpenThread無任何依賴赊淑,全平臺設計,只有兩個源文件仅讽,讓小白都可以輕松玩轉C++多線程開發(fā)陶缺。

OpenLinyou項目設計跨平臺服務器框架,在VS或者XCode上寫代碼洁灵,無需任何改動就可以編譯運行在Linux上饱岸,甚至是安卓和iOS.
OpenLinyou:https://github.com/openlinyou
https://gitee.com/linyouhappy

跨平臺支持

Windows、linux徽千、Mac苫费、iOS、Android等跨平臺設計

編譯和執(zhí)行

請安裝cmake工具双抽,用cmake構建工程百框,可以在vs或者xcode上編譯運行。
源代碼:https://github.com/openlinyou/openthread
https://gitee.com/linyouhappy/openthread

#克隆項目
git clone https://github.com/openlinyou/openthread
cd ./openthread
#創(chuàng)建build工程目錄
mkdir build
cd build
cmake ..
#如果是win32牍汹,在該目錄出現(xiàn)openthread.sln铐维,點擊它就可以啟動vs寫代碼調試
make
./helloworld

全部源文件

  • src/openthread.h
  • src/openthread.cpp

技術特點

OpenThread的技術特點:

  1. 跨平臺設計,提供Linux統(tǒng)一的pthread接口柑贞,支持安卓和iOS方椎。
  2. 線程池管理采用智能指針和無鎖map,實現(xiàn)高效訪問線程對象钧嘶。
  3. 每個線程自帶消息隊列棠众,消息放入隊列原子鎖,而讀取消息隊列,無鎖操作闸拿。保證線程交換信息高效空盼。
  4. 線程交互數(shù)據(jù),采用智能指針管理新荤,實現(xiàn)內存自動化管理揽趾,無需擔憂內存泄漏。
  5. 多線程三大設計模式: Await模式, Worker模式和Actor模式苛骨。

多線程開發(fā)三大設計模式

  1. Await模式篱瞎。兩條線程,一條線程向另一條線程請求痒芝,同時阻塞等待俐筋;另一條線程接收到請求,返回數(shù)據(jù)喚醒第一條線程严衬;第一條線程喚醒澄者,拿到數(shù)據(jù)繼續(xù)執(zhí)行。
  2. Worker模式请琳。適合客戶端粱挡,創(chuàng)建一定量的worker線程,組成factory俄精,向外提供唯一接口服務询筏。
  3. Actor模式。適合服務端嘀倒,一條線程一條Actor屈留,不同的Actor負責不同的功能。

1.創(chuàng)建線程HelloWorld

#include <assert.h>
#include <stdio.h>
#include "openthread.h"
using namespace open;

//子線程接收到三種消息就會調用此函數(shù)测蘑,三種消息為線程啟動灌危、退出和接收消息,
void TestThread(OpenThreadMsg& msg)
{
    if (msg.state_ == OpenThread::START)
    {
        printf("Hello OpenThread\n");
        //睡眠1秒鐘
        OpenThread::Sleep(1000);
        //退出線程
        msg.thread().stop();
    }
}
int main()
{
    // 創(chuàng)建線程碳胳,并對線程取名勇蝙,并設置子線程運行函數(shù)TestThread
    auto thread = OpenThread::Create("Thread", TestThread);
    // 等待子線程退出
    OpenThread::ThreadJoin(thread);
    printf("Pause\n");
    return getchar();
}

2.Await模式

在主線程創(chuàng)建OpenSyncReturn對象,把它發(fā)給子線程挨约,并阻塞等待子線程返回味混。
子線程接到該消息后,再發(fā)消息喚醒诫惭,再發(fā)OpenSync對象給主線程翁锡,等待主線程響應。
主線程線程被喚醒后夕土,收到子線程消息攜帶的OpenSync對象馆衔,喚醒子線程瘟判。

#include <assert.h>
#include <iostream>
#include <stdio.h>
#include "openthread.h"

using namespace open;

// Test1
struct TestData
{
    std::string data_;
};
struct Test1Data
{
    std::string data_;
    OpenSync openSync_;
    ~Test1Data()
    {
        printf("Test1:~Test1Data\n");
    }
};

// 子線程調用
void Test1Thread(OpenThreadMsg& msg)
{
    //線程啟動的消息
    if (msg.state_ == OpenThread::START)
    {
        printf("Test1Thread[%s] START\n", msg.name().c_str());
        OpenThread::Sleep(1000);
    }
    //線程接收到的消息
    else if (msg.state_ == OpenThread::RUN)
    {
        // //接收主線程的OpenSyncReturn對象,對其喚醒并發(fā)消息角溃。
        OpenSyncReturn<TestData, Test1Data>* data = msg.edit<OpenSyncReturn<TestData, Test1Data>>();
        if (data)
        {
            std::shared_ptr<TestData> str = data->get();
            if (str)
            {
                assert(str->data_ == "Waiting for you!");
            }
            auto sptr = std::shared_ptr<Test1Data>(new Test1Data);
            sptr->data_.assign("Of Course,I Still Love You!");
            data->wakeup(sptr);

            //等待主線程喚醒
            sptr->openSync_.await();
        }
        OpenThread::Sleep(1000);
    }
    //線程退出前的消息
    else if (msg.state_ == OpenThread::STOP)
    {
        printf("Test1Thread[%s] STOP\n", msg.name().c_str());
        OpenThread::Sleep(1000);
    }
}

int main()
{
    // 指定線程名拷获,并創(chuàng)建。未填函數(shù)减细,線程未啟動狀態(tài)匆瓜,需要執(zhí)行start啟動
    auto threadRef = OpenThread::Create("Test1Thread");
    threadRef.start(Test1Thread);

    // 給子線程發(fā)送消息
    auto msg = std::shared_ptr<OpenSyncReturn<TestData, Test1Data>>(new OpenSyncReturn<TestData, Test1Data>);
    {
        auto data = std::shared_ptr<TestData>(new TestData);
        data->data_ = "Waiting for you!";
        msg->put(data);
    }
    threadRef.send(msg);
    //阻塞主線程,等待子線程喚醒
    auto ret = msg->awaitReturn();
    if (ret)
    {
        assert(ret->data_ == "Of Course,I Still Love You!");
        printf("Test1====>>:%s\n", ret->data_.c_str());

        //喚醒子線程的阻塞
        ret->openSync_.wakeup();
    }
    // 向子線程發(fā)送關閉消息
    threadRef.stop();

    // 等待全部線程退出
    OpenThread::ThreadJoin(threadRef);
    printf("Pause\n");
    return getchar();
}

3.線程之間進行通信

分別創(chuàng)建子線程dog和子線程cat未蝌,子線程dog和子線程cat之間互相通信驮吱。
這是一個dog溜cat的小故事。

#include <assert.h>
#include <stdio.h>
#include "openthread.h"
using namespace open;
//dog子線程
void Test2ThreadDog(OpenThreadMsg& msg)
{
    assert(msg.name() == "dog");
    switch (msg.state_)
    {
    case OpenThread::START:
        printf("Test2ThreadDog[%s] START\n", msg.name().c_str());
        break;
    case OpenThread::RUN: {
        const std::string* data = msg.data<std::string>();
        if (!data) break;
        printf("Test2ThreadDog[%s] MSG:%s\n", msg.name().c_str(), data->c_str());
        //來自主線程的消息
        if (*data == "Hello dog! Catch cat!")
        {
            //向cat子線程發(fā)消息
            auto data = OpenThread::MakeShared<std::string>();
            data->assign("Hello cat! Catch you!");
            auto cat = OpenThread::Thread("cat");
            if (cat && !cat.send(data))
            {
                printf("Test2ThreadDog[%s] send failed\n", msg.name().c_str());
            }
        }
        //來自子線程cat的消息
        else if (*data == "Bang dog!")
        {
            //關閉子線程cat
            auto cat = OpenThread::Thread("cat");
            cat.stop();
        }
        else
        {
            assert(false);
        }
        break;
    }
    case OpenThread::STOP:
        printf("Test2ThreadDog[%s] STOP\n", msg.name().c_str());
        break;
    default:
        break;
    }
}
//cat子線程
void Test2ThreadCat(OpenThreadMsg& msg)
{
    assert(msg.name() == "cat");
    switch (msg.state_)
    {
    case OpenThread::START:
        printf("Test2ThreadCat[%s] START\n", msg.name().c_str());
        break;
    case OpenThread::RUN: {
        const std::string* data = msg.data<std::string>();
        if (!data) break;
        printf("Test2ThreadCat[%s] MSG:%s\n", msg.name().c_str(), data->c_str());
        //來自子線程dog的消息
        if (*data == "Hello cat! Catch you!")
        {
            auto data = OpenThread::MakeShared<std::string>();
            data->assign("Bang dog!");
            //向子線程dog發(fā)消息
            if (!OpenThread::Send("dog", data))
            {
                printf("Test2ThreadCat[%s] send failed\n", msg.name().c_str());
            }
        }
        break;
    }
    case OpenThread::STOP:
        printf("Test2ThreadCat[%s] STOP\n", msg.name().c_str());
        // dog線程關閉了cat树埠,cat線程在關閉前糠馆,也關閉dog線程,進行回擊怎憋。
        OpenThread::Stop("dog");
        break;
    default:
        break;
    }
}
int main()
{
    // 創(chuàng)建子線程dog和cat
    auto dog = OpenThread::Create("dog", Test2ThreadDog);
    auto cat = OpenThread::Create("cat", Test2ThreadCat);
    // 向子線程dog發(fā)消息
    auto data = OpenThread::MakeShared<std::string>();
    data->assign("Hello dog! Catch cat!");
    if (!dog.send(data))
    {
        printf("Test2Thread send failed\n");
    }
    // 等待子線程退出
    OpenThread::ThreadJoin({ "dog", "cat" });
    return getchar();
}

4.批量創(chuàng)建和管理線程

OpenThread啟動的時候,會默認設定創(chuàng)建線程的最大數(shù)量九昧。超過以后绊袋,就不能修改。
所以铸鹰,在程序啟動的時候癌别,用OpenThread::Init(256)可以指定線程最大數(shù)量。線程的目標主要是發(fā)揮多核性能蹋笼。
創(chuàng)建太多線程會帶來性能損耗展姐,最好線程數(shù)是CPU核數(shù)的2倍。盡量避免頻繁創(chuàng)建和銷毀線程剖毯。
為了防止線程之間混淆圾笨,設計了線程池OpenThreadPool⊙纺保可以對不同的業(yè)務配置專門的線程池擂达。

#include <assert.h>
#include <iostream>
#include <stdio.h>
#include "openthread.h"
using namespace open;

void Test3Thread1(OpenThreadMsg& msg)
{
}
void Test3Thread2(OpenThreadMsg& msg)
{
}
void Test3()
{
    //指定線程最大數(shù)量限制,只有程序啟動的時候才可修改
    OpenThread::Init(256);
    size_t capacity = OpenThread::GetThreadCapacity();
    assert(capacity == 256)
    for (size_t pid = 0; pid < capacity; pid++)
    {
        //OpenThread::Thread查詢線程對象OpenThread
        auto threadRef = OpenThread::Thread("Thread_"+std::to_string(pid));
        //由于沒有創(chuàng)建任何線程胶滋,故是null
        assert(!threadRef);
    }
    //全部線程名稱數(shù)量板鬓,線程名稱指定后就一直存在。
    assert(OpenThread::GetThreadSize() == 0);
    //創(chuàng)建智能指針對象究恤,發(fā)給子線程俭令。字符串"sendMsg"
    auto data = OpenThread::MakeShared<std::string>();
    data->assign("sendMsg");
    std::string name;
    //創(chuàng)建1024條線程
    for (int pid = 0; pid < capacity; pid++)
    {
        name = "Thread_" + std::to_string(pid);
        //OpenThread::Create創(chuàng)建指定名稱的線程,如果名稱綁定的線程存在部宿,就返回該線程抄腔。
        //成功以后便有線程名。 top -Hp可以查看。window系統(tǒng)沒有線程名
        auto threadRef = OpenThread::Create(name, Test3Thread1);
        assert(threadRef && threadRef.pid() == pid && threadRef.name() == name);
        //三種方式向子線程發(fā)消息妓柜,線程對象箱季、線程id(不是系統(tǒng)線程id,是數(shù)組索引id)棍掐、線程名稱
        threadRef.send(data);
        OpenThread::Send(pid, data);
        OpenThread::Send(name, data);
        printf("Test3 create %s\n", name.c_str());
    }
    assert(OpenThread::GetThreadSize() == capacity);
    for (size_t pid = 0; pid < capacity; pid++)
    {
        name = "Thread_" + std::to_string(pid);
        //通過線程名查詢線程藏雏,通過線程名查詢線程效率比較差,推薦使用線程id查詢作煌。
        auto threadRef = OpenThread::Thread(name);
        assert(threadRef && threadRef.name() == name);
        //關閉子線程
        threadRef.stop();
    }
    printf("Test3 do stop\n");
    //等待全部子線程關閉退出
    OpenThread::ThreadJoinAll();
    printf("Test3 finish waitStop\n");
    // 再次創(chuàng)建子線程掘殴,子線程名稱會一直存在,占用容量粟誓。
    //除非調用OpenThread::StopAll()奏寨,關閉清理全部子線程,推倒重來鹰服。
    for (size_t pid = 0; pid < capacity; pid++)
    {
        name = "Thread_" + std::to_string(pid);
        auto threadRef = OpenThread::Create(name, Test3Thread2);
        assert(threadRef && threadRef.pid() == pid && threadRef.name() == name);
    }
    printf("Test3 finish create again\n");
    //子線程名字數(shù)量超過最大容量病瞳,故用"over_boundary"創(chuàng)建失敗
    auto threadRef = OpenThread::Create("over_boundary");
    assert(!threadRef);
    //關閉退出全部線程,并進行清理
    OpenThread::StopAll();
}
//線程池測試
void Test5Thread2(OpenThreadMsg& msg)
{
    if (msg.state_ == OpenThread::START)
    {
        printf("Test1Thread[%s] START\n", msg.name().c_str());
        OpenThread::Sleep(1000);
    }
    else if (msg.state_ == OpenThread::RUN)
    {
        // recevie msg
        printf("Test1Thread[%s] RUN\n", msg.name().c_str());
        OpenThread::Sleep(1000);
    }
    else if (msg.state_ == OpenThread::STOP)
    {
        printf("Test1Thread[%s] STOP\n", msg.name().c_str());
        OpenThread::Sleep(1000);
    }
}
void Test5()
{
    //新建線程池
    OpenThreadPool pool;
    pool.init(64);

    auto thread = pool.create("Independent");
    if (thread)
    {
        thread->start(Test5Thread2);
        thread->stop();
    }
    //停止該線程池的全部線程
    pool.stopAll();
    pool.threadJoinAll();
}
int main()
{
    Test3();
    Test5();
    printf("Pause\n");
    return getchar();
}

5.Actor設計模式

Actor模式悲酷。適合服務端套菜,一條線程一條Actor,不同的Actor負責不同的功能设易。
用Worker類封裝使用OpenThread逗柴,一條線程一個Worker業(yè)務。Inspector(監(jiān)控)顿肺、Timer(定時器)和Server(服務器)繼承Worker戏溺。
Inspector負責監(jiān)控多個Timer運行信息,做負載均衡屠尊。
Timer提供定時器服務旷祸,啟動時,向Inspector注冊知染,并提供運行信息肋僧。
Server向Inspector查詢可用的Timer,然后向此Timer請求定時服務控淡。

#include <assert.h>
#include <iostream>
#include <stdio.h>
#include <map>
#include <unordered_map>
#include "openthread.h"
using namespace open;

class ProtoBuffer : public OpenThreadProto
{
    void* data_;
public:
    int dataType_;
    ProtoBuffer() 
        : OpenThreadProto()
        ,dataType_(0)
        ,data_(0){}
    virtual ~ProtoBuffer() { if (data_) delete data_; }
    template <class T>
    inline T& data() 
    { 
        T* t = 0;
        if (data_)
        {
            t = dynamic_cast<T*>((T*)data_);
            if (data_ == t) return *t;
            delete data_;
        }
        t = new T;
        data_ = t;
        return *t;
    }
    template <class T>
    inline T& data() const
    {
        if (data_)
        {
            T* t = dynamic_cast<T*>((T*)data_);
            if (data_ == t) return *t;
        }
        assert(false);
        static T t;
        return t;
    }
    static inline int ProtoType() { return (int)(uintptr_t) & (ProtoType); }
    virtual inline int protoType() const { return ProtoBuffer::ProtoType(); }
};

struct ProtoLoop : public OpenThreadProto
{
    int type_;
    ProtoLoop() :type_(-1) {}
    static inline int ProtoType() { return (int)(uintptr_t) & (ProtoType); }
    virtual inline int protoType() const { return ProtoLoop::ProtoType(); }
};

struct TimerEventMsg
{
    int workerId_;
    int64_t deadline_;
    TimerEventMsg() : workerId_(0), deadline_(0) {}
};

struct TimerInfoMsg
{
    int workerId_;
    size_t leftCount_;
    int64_t cpuCost_;
    int64_t dataTime_;
    TimerInfoMsg() : workerId_(0), leftCount_(0), cpuCost_(0), dataTime_(0) {}
};

enum EMsgId
{
    query_timer_info,
    get_timer_info,
    request_timer,
};

class Inspector : public OpenThreadWorker
{
    std::unordered_map<std::string, TimerInfoMsg> mapTimerInfo_;
    std::vector<int> vectQueryId;
public:
    Inspector(const std::string& name):OpenThreadWorker(name)
    {
        registers(ProtoLoop::ProtoType(), (OpenThreadHandle)&Inspector::onProtoLoop);
        registers(ProtoBuffer::ProtoType(), (OpenThreadHandle)&Inspector::onProtoBuffer);
    }
    virtual void onStart() {}
private:
    void onProtoLoop(const ProtoLoop& proto)
    {
        printf("Inspector::onProtoLoop[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
        std::vector<int> vectPid;
        vectPid.reserve(mapTimerInfo_.size());
        for (auto iter = mapTimerInfo_.begin(); iter != mapTimerInfo_.end(); iter++)
        {
            if (iter->second.workerId_ >= 0)
                vectPid.push_back(iter->second.workerId_);
        }
        auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
        root->dataType_ = get_timer_info;
        send(vectPid, root);
    }
    void onProtoBuffer(const ProtoBuffer& proto)
    {
        printf("Inspector::onProtoBuffer[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
        if (proto.dataType_ == get_timer_info)
        {
            auto& msg = proto.data<TimerInfoMsg>();
            auto& timerInfo = mapTimerInfo_[proto.srcName_];
            timerInfo = msg;
            if (!vectQueryId.empty())
            {
                auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
                root->dataType_ = query_timer_info;
                auto& info = root->data<TimerInfoMsg>();
                info = timerInfo;
                send(vectQueryId, root);

                vectQueryId.clear();
            }
        }
        else if (proto.dataType_ == query_timer_info)
        {
            TimerInfoMsg* tmpInfo = 0;
            auto curTime = OpenThread::MilliUnixtime();
            for (auto iter = mapTimerInfo_.begin(); iter != mapTimerInfo_.end(); iter++)
            {
                auto& info = iter->second;
                if (curTime > info.dataTime_ + 10000) continue;
                if (tmpInfo)
                {
                    if (tmpInfo->leftCount_ > info.leftCount_ || tmpInfo->cpuCost_ > info.cpuCost_)
                        tmpInfo = &info;
                }
                else
                {
                    tmpInfo = &info;
                }
            }
            if (!tmpInfo)
            {
                vectQueryId.push_back(proto.srcPid_);
                auto root = std::shared_ptr<ProtoLoop>(new ProtoLoop);
                sendLoop(root);
            }
            else
            {
                auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
                root->dataType_ = query_timer_info;
                auto& info = root->data<TimerInfoMsg>();
                info = *tmpInfo;
                send(proto.srcPid_, root);
            }
        }
    }
};


class Timer:public OpenThreadWorker
{
    int inspectorId_;
    std::multimap<int64_t, int> mapTimerEvent_;
public:
    Timer(const std::string& name):OpenThreadWorker(name)
    {
        inspectorId_ = -1;
        registers(ProtoLoop::ProtoType(), (OpenThreadHandle)&Timer::onProtoLoop);
        registers(ProtoBuffer::ProtoType(), (OpenThreadHandle)&Timer::onProtoBuffer);
    }
protected:
    virtual void onStart()
    {
        while (inspectorId_ < 0)
        {
            inspectorId_ = ThreadId("Inspector");
            if (inspectorId_ >= 0)
            {
                auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
                root->dataType_ = get_timer_info;
                auto& msg = root->data<TimerInfoMsg>();
                msg.workerId_ = pid();
                msg.dataTime_ = OpenThread::MilliUnixtime();
                msg.cpuCost_ = thread_->cpuCost();
                msg.leftCount_ = thread_->leftCount();

                send(inspectorId_, root);
                break;
            }
            OpenThread::Sleep(100);
        }
        auto root = std::shared_ptr<ProtoLoop>(new ProtoLoop);
        sendLoop(root);
    }
private:
    void onProtoLoop(const ProtoLoop& proto)
    {
        printf("Timer::onProtoLoop[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
        assert(proto.srcPid_ == pid_);
        int64_t curTime = 0;
        while (canLoop())
        {
            if (!mapTimerEvent_.empty())
            {
                curTime = OpenThread::MilliUnixtime();
                while (!mapTimerEvent_.empty())
                {
                    auto iter = mapTimerEvent_.begin();
                    if (curTime > iter->first)
                    {
                        auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
                        root->dataType_ = request_timer;
                        auto& msg = root->data<TimerEventMsg>();
                        msg.workerId_ = pid();
                        msg.deadline_ = curTime;

                        send(iter->second, root);

                        mapTimerEvent_.erase(iter);
                    }
                    else
                    {
                        break;
                    }
                }
            }
            OpenThread::Sleep(10);
        }
    }
    void onProtoBuffer(const ProtoBuffer& proto)
    {
        printf("Timer::onProtoBuffer[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
        if (proto.dataType_ == get_timer_info)
        {
            auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
            root->dataType_ = get_timer_info;
            auto& msg = root->data<TimerInfoMsg>();
            msg.workerId_ = pid();
            msg.dataTime_  = OpenThread::MilliUnixtime();
            msg.cpuCost_   = thread_->cpuCost();
            msg.leftCount_ = thread_->leftCount();
            send(proto.srcPid_, root);

            auto sptr = std::shared_ptr<ProtoLoop>(new ProtoLoop);
            sendLoop(sptr);
        }
        else if (proto.dataType_ == request_timer)
        {
            auto& msg = proto.data<TimerEventMsg>();
            mapTimerEvent_.insert({ msg.deadline_, proto.srcPid_ });

            auto sptr = std::shared_ptr<ProtoLoop>(new ProtoLoop);
            sendLoop(sptr);
        }
    }
};

class Server:public OpenThreadWorker
{
    int inspectorId_;
    int collect_;
public:
    Server(const std::string& name)
        :OpenThreadWorker(name)
        ,inspectorId_(-1)
    {
        collect_ = 0;
        registers(ProtoLoop::ProtoType(), (OpenThreadHandle)&Server::onProtoLoop);
        registers(ProtoBuffer::ProtoType(), (OpenThreadHandle)&Server::onProtoBuffer);
    }
protected:
    virtual void onStart()
    {
        while (inspectorId_ < 0)
        {
            inspectorId_ = ThreadId("Inspector");
            OpenThread::Sleep(10);
        }
        auto sptr = std::shared_ptr<ProtoLoop>(new ProtoLoop);
        sendLoop(sptr);
    }
private:
    void onProtoLoop(const ProtoLoop& proto)
    {
        printf("Server::onProtoLoop[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
        auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
        root->dataType_ = query_timer_info;
        send(inspectorId_, root);
    }

    void onProtoBuffer(const ProtoBuffer& proto)
    {
        printf("Server::onProtoBuffer[%s]Recevie<<==[%s]\n", name_.c_str(), proto.srcName_.c_str());
        if (proto.dataType_ == query_timer_info)
        {
            auto& msg = proto.data<TimerInfoMsg>();
            if (msg.workerId_ > 0)
            {
                auto root = std::shared_ptr<ProtoBuffer>(new ProtoBuffer);
                root->dataType_ = request_timer;
                auto& event = root->data<TimerEventMsg>();
                int64_t curTime = OpenThread::MilliUnixtime();
                event.deadline_ = curTime + curTime % 2000;
                if (event.deadline_ > curTime + 2000)
                {
                    event.deadline_ = curTime;
                }
                send(msg.workerId_, root);
            }
            else
            {
                auto sptr = std::shared_ptr<ProtoLoop>(new ProtoLoop);
                sendLoop(sptr);
            }
        }
        else if (proto.dataType_ == request_timer)
        {
            if (collect_++ > 100)
            {
                OpenThread::StopAll();
                return;
            }
            sendLoop(std::shared_ptr<ProtoLoop>(new ProtoLoop));
        }
    }
};

int main()
{
    OpenThread::StopAll();
    std::vector<OpenThreadWorker*> vectWorker =
    {
        new Inspector("Inspector"),
        new Timer("timer1"),
        new Timer("timer2"),
        new Server("server1"),
        new Server("server2"),
        new Server("server3"),
        new Server("server4")
    };
    for (size_t i = 0; i < vectWorker.size(); i++)
    {
        vectWorker[i]->start();
    }

    OpenThread::ThreadJoinAll();
    for (size_t i = 0; i < vectWorker.size(); i++)
    {
        delete vectWorker[i];
    }
    vectWorker.clear();
    printf("Pause\n");
    return getchar();
}

6.Worker設計模式

適合客戶端噩咪,創(chuàng)建一定量的worker線程乓土,組成factory,向外提供唯一接口服務。

#include <assert.h>
#include <iostream>
#include <stdio.h>
#include <vector>
#include "openthread.h"
using namespace open;
//業(yè)務數(shù)據(jù)結構
struct Product
{
    int id_;
    std::string goods_;
    Product():id_(0) {}
};

//OpenThread交換協(xié)議
struct ProtoTask : public OpenThreadProto
{
    std::shared_ptr<Product> data_;
    OpenSync openSync_;

    static inline int ProtoType() { return 1; }
    virtual inline int protoType() const { return ProtoTask::ProtoType(); }
};

class Worker : public OpenThreadWorker
{   
    //Worker工程線程Factory鳞疲,提供4個worker線程临庇。
    class Factory
    {
        const std::vector<Worker*> vectWorker_;
    public:
        Factory()
        :vectWorker_({
            new Worker("Producer1"),
            new Worker("Producer2"),
            new Worker("Producer3"),
            new Worker("Producer4"),
            }) {}
        Worker* getWorker()
        {
            if (vectWorker_.empty()) return 0;
            return vectWorker_[std::rand() % vectWorker_.size()];
        }
    };
    static Factory Instance_;

    // Worker
    Worker(const std::string& name)
        :OpenThreadWorker(name)
    {
        mapHandle_[ProtoTask::ProtoType()] = (OpenThreadHandle)&Worker::makeProduct;
        uid_ = 1;
        start();
    }
    ~Worker()
    {
        for (size_t i = 0; i < vectTask_.size(); ++i)
        {
            vectTask_[i].openSync_.wakeup();
        }
    }
    //生產(chǎn)產(chǎn)品
    void makeProduct(const ProtoTask& proto)
    {
        vectTask_.push_back(proto);
        if (rand() % 2 == 0)
        {
            OpenThread::Sleep(1000);
        }
        for (size_t i = 0; i < vectTask_.size(); ++i)
        {
            auto& task = vectTask_[i];
            if (task.data_)
            {
                task.data_->id_ = pid_ + 100 * uid_++;
                task.data_->goods_ = name_ + " Dog coin" + std::to_string(task.data_->id_);
            }
            task.openSync_.wakeup();
        }
        vectTask_.clear();
    }
    int uid_;
    std::vector<ProtoTask> vectTask_;
public:
    //對外服務統(tǒng)一接口
    static bool MakeProduct(std::shared_ptr<Product>& product)
    {
        auto worker = Instance_.getWorker();
        if (!worker)  return false;
        auto proto = std::shared_ptr<ProtoTask>(new ProtoTask);
        proto->data_ = product;
        bool ret = worker->send(-1, proto);
        assert(ret);
        proto->openSync_.await();
        return ret;
    }
};
Worker::Factory Worker::Instance_;

void TestThread(OpenThreadMsg& msg)
{
    if (msg.state_ == OpenThread::START)
    {
        for (size_t i = 0; i < 100; i++)
        {
            auto product = std::shared_ptr<Product>(new Product());
            Worker::MakeProduct(product);
            printf("[%s] Recevie Product:%s\n", msg.name().c_str(), product->goods_.c_str());
        }
        msg.thread().stop();
    }
}

int main()
{
    //創(chuàng)建4條測試線程
    OpenThread::Create("TestThread1", TestThread);
    OpenThread::Create("TestThread2", TestThread);
    OpenThread::Create("TestThread3", TestThread);
    OpenThread::Create("TestThread4", TestThread);
    
    // wait stop
    OpenThread::ThreadJoinAll();
    printf("Pause\n");
    return getchar();
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末叼风,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子么夫,更是在濱河造成了極大的恐慌,老刑警劉巖肤视,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件档痪,死亡現(xiàn)場離奇詭異,居然都是意外死亡邢滑,警方通過查閱死者的電腦和手機腐螟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來困后,“玉大人乐纸,你說我怎么就攤上這事∫∮瑁” “怎么了汽绢?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長侧戴。 經(jīng)常有香客問我宁昭,道長,這世上最難降的妖魔是什么酗宋? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任久窟,我火速辦了婚禮,結果婚禮上本缠,老公的妹妹穿的比我還像新娘。我一直安慰自己入问,他們只是感情好丹锹,可當我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著芬失,像睡著了一般楣黍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上棱烂,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天租漂,我揣著相機與錄音,去河邊找鬼颊糜。 笑死哩治,一個胖子當著我的面吹牛,可吹牛的內容都是我干的衬鱼。 我是一名探鬼主播业筏,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼鸟赫!你這毒婦竟也來了蒜胖?” 一聲冷哼從身側響起消别,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎台谢,沒想到半個月后寻狂,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡朋沮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年蛇券,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片朽们。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡怀读,死狀恐怖,靈堂內的尸體忽然破棺而出骑脱,到底是詐尸還是另有隱情菜枷,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布叁丧,位于F島的核電站啤誊,受9級特大地震影響,放射性物質發(fā)生泄漏拥娄。R本人自食惡果不足惜蚊锹,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望稚瘾。 院中可真熱鬧牡昆,春花似錦、人聲如沸摊欠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽些椒。三九已至播瞳,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間免糕,已是汗流浹背赢乓。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留石窑,地道東北人牌芋。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像尼斧,于是被迫代替她去往敵國和親姜贡。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內容