跨全平臺(tái)高性能HttpClient嘗試用OpenSocket開(kāi)發(fā)設(shè)計(jì)

OpenSocket是一個(gè)跨全平臺(tái)的高性能網(wǎng)絡(luò)并發(fā)庫(kù)谚咬。

它使用了高性能IO萝快,Linux和安卓用epoll,Win32用IOCP贡翘,iOS和Mac用kqueue,其他系統(tǒng)使用select砰逻。

本文用這種高性能socket庫(kù)鸣驱,設(shè)計(jì)開(kāi)發(fā)一個(gè)HttpClient。

為了開(kāi)發(fā)方面蝠咆,我們使用OpenThread作為線程庫(kù)踊东。OpenThread可以實(shí)現(xiàn)多線程三大設(shè)計(jì)模式,開(kāi)發(fā)這個(gè)HttpClient刚操,使用Worker模式闸翅。

設(shè)計(jì)思路如下:

  1. 每個(gè)HttpClient是一條線程O(píng)penThreader。一個(gè)HttpClient對(duì)象菊霜,可以處理任意個(gè)Http請(qǐng)求坚冀。它由Factory類管理和創(chuàng)建。

  2. 一次Http請(qǐng)求就是一個(gè)task鉴逞,從Factory挑選一個(gè)HttpClient對(duì)象记某,向該對(duì)象發(fā)送task。并進(jìn)行阻塞等待結(jié)果华蜒。

  3. HttpClient對(duì)象收到task消息辙纬。把IP和端口做參數(shù),調(diào)用OpenSocket的connect叭喜。
    connect有兩個(gè)作用贺拣,一個(gè)是執(zhí)行網(wǎng)絡(luò)連接產(chǎn)生一個(gè)fd,同時(shí)把這個(gè)fd加入到poll捂蕴,fd與HttpClient對(duì)象的線程id進(jìn)行綁定譬涡。
    fd加入到poll成功以后,該socket的任何消息可以通過(guò)線程Id啥辨,發(fā)到對(duì)應(yīng)的HttpClient涡匀。

  4. HttpClient對(duì)象接收到socket的open消息后,向服務(wù)器發(fā)送http報(bào)文溉知。

  5. 因?yàn)閏onnect把fd和線程id進(jìn)行綁定陨瘩。所以,HttpClient會(huì)收到服務(wù)器返回的Http報(bào)文级乍。

  6. HttpClient接收完Http報(bào)文舌劳,就喚醒請(qǐng)求線程。請(qǐng)求線程被喚醒玫荣,拿到Http請(qǐng)求數(shù)據(jù)甚淡。

測(cè)試?yán)邮谦@取交易所的最新龍虎數(shù)據(jù)。

具體源碼如下:

#include <assert.h>
#include <time.h>
#include <math.h>
#include <map>
#include "open/openthread.h"
#include "opensocket.h"
using namespace open;

//請(qǐng)求http對(duì)象捅厂,包含返回對(duì)象贯卦。
class HttpRequest
{
    std::string url_;
public:
    int port_;
    std::string host_;
    std::string ip_;
    std::string path_;
    std::string method_;
    std::string body_;
        //http請(qǐng)求頭
    std::map<std::string, std::string> headers_;
    HttpRequest() :port_(80) {}
    std::string& operator[](const std::string& key) { return headers_[key]; }
    //指定url资柔,并進(jìn)行解析和域名解析
    void setUrl(const std::string& url)
    {
        if (url.empty()) return;
        url_ = url;
        int len = (int)url.length();
        char* ptr = (char*)url.c_str();
        if (len >= 8)
        {
            if (memcmp(ptr, "http://", strlen("http://")) == 0)
                ptr += strlen("http://");
            else if (memcmp(ptr, "https://", strlen("https://")) == 0)
                ptr += strlen("https://");
        }
        const char* tmp = strstr(ptr, "/");
        path_.clear();
        if (tmp != 0)
        {
            path_.append(tmp);
            host_.clear();
            host_.append(ptr, tmp - ptr);
        }
        else
        {
            host_ = ptr;
        }
        port_ = 80;
        ip_.clear();
        ptr = (char*)host_.c_str();
        tmp = strstr(ptr, ":");
        if (tmp != 0)
        {
            ip_.append(ptr, tmp - ptr);
            tmp += 1;
            port_ = atoi(tmp);
        }
        else
        {
            ip_ = ptr;
        }
        //域名解析,把域名轉(zhuǎn)ip撵割』哐撸可以緩存,提供效率
        ip_ = OpenSocket::DomainNameToIp(ip_);
    }
    inline void operator=(const std::string& url) { setUrl(url); }
    //http返回對(duì)象
    struct HttpResponse
    {
        int code_;
        int clen_;
        std::string head_;
        std::string body_;
        //std::multimap<std::string, std::string> headers_;
        std::map<std::string, std::string> headers_;
        std::string& operator[](const std::string& key) { return headers_[key]; }
        HttpResponse():code_(0), clen_(0) {}
        //解析返回http消息頭
        void parseHeader()
        {
            if (!headers_.empty() || head_.size() < 12) return;
            std::string line;
            const char* ptr = strstr(head_.c_str(), "\r\n");
            if (!ptr) return;
            code_ = 0;
            clen_ = 0;
            line.append(head_.c_str(), ptr - head_.c_str());
            for (size_t i = 0; i < line.size(); i++)
            {
                if (line[i] == ' ')
                {
                    while (i < line.size() && line[i] == ' ') ++i;
                    code_ = std::atoi(line.data() + i);
                    break;
                }
            }
            if (code_ <= 0) return;
            line.clear();
            int k = -1;
            int j = -1;
            std::string key;
            std::string value;
            for (size_t i = ptr - head_.c_str() + 2; i < head_.size() - 1; i++)
            {
                if (head_[i] == '\r' && head_[i + 1] == '\n')
                {
                    if (j >  0)
                    {
                        k = 0;
                        while (k < line.size() && line[k] == ' ') ++k;
                        while (k >= 0 && line.back() == ' ') line.pop_back();
                        value = line.data() + j + 1;
                        while (j >= 0 && line[j] == ' ') j--;
                        key.clear();
                        key.append(line.data(), j);
                        for (size_t x = 0; x < key.size(); x++)
                            key[x] = std::tolower(key[x]);
                        headers_[key] = value;
                    }
                    ++i;
                    j = -1;
                    line.clear();
                    continue;
                }
                line.push_back(head_[i]);
                if (j < 0 && line.back() == ':')
                {
                    j = line.size() - 1;
                }
            }
            clen_ = std::atoi(headers_["content-length"].c_str());
        }
    };
    HttpResponse response_;
    //阻塞當(dāng)前線程睁枕,等待http消息返回官边,才繼續(xù)執(zhí)行。
    OpenSync openSync_;
};

//OpenThread的線程之間通信數(shù)據(jù)結(jié)構(gòu)外遇,用isSocket_區(qū)別是socket消息還是http請(qǐng)求消息
struct BaseProto
{
    bool isSocket_;
};
//攜帶OpenSocket消息的數(shù)據(jù)結(jié)構(gòu),isSocket_=true
struct SocketProto : public BaseProto
{
    std::shared_ptr<OpenSocketMsg> data_;
};
//攜帶http請(qǐng)求消息的數(shù)據(jù)結(jié)構(gòu)契吉,isSocket_=false
struct TaskProto : public BaseProto
{
    int fd_;
    OpenSync openSync_;
    std::shared_ptr<HttpRequest> request_;
};

//應(yīng)用程序單利跳仿,封裝OpenSocket,一個(gè)進(jìn)程只有一個(gè)對(duì)象捐晶。
class App
{
    //OpenSocketMsg需要手動(dòng)釋放菲语,放到智能指針,由智能指針釋放
    static void SocketFunc(const OpenSocketMsg* msg)
    {
        if (!msg) return;
        //msg需要手動(dòng)delete惑灵,把它托管給智能指針
        auto proto = std::shared_ptr<SocketProto>(new SocketProto);
        //OpenThread的線程id >= 0山上,所以只處理非負(fù)數(shù)的條件
        if (msg->uid_ >= 0)
        {
            proto->isSocket_ = true;
            proto->data_ = std::shared_ptr<OpenSocketMsg>((OpenSocketMsg*)msg);
            //msg->uid_是綁定的線程id,向該線程派發(fā)socket消息
            if (!OpenThread::Send((int)msg->uid_, proto))
                printf("SocketFunc dispatch faild pid = %lld\n", msg->uid_);
        }
    }
public:
    static App Instance_;
    //OpenSocket對(duì)象英支,可以設(shè)計(jì)成單利
    OpenSocket openSocket_;
    //App構(gòu)造的時(shí)候佩憾,啟動(dòng)OpenSocket。
    App() {  openSocket_.run(App::SocketFunc); }
};
App App::Instance_;

//HttpClient線程類干花,F(xiàn)actory管理一組線程妄帘。
class HttpClient : public OpenThreader
{
    //Factory
    class Factory
    {
        const std::vector<HttpClient*> vectWorker_;
    public:
        Factory()
            :vectWorker_({
                new HttpClient("HttpClient1"),
                new HttpClient("HttpClient2"),
                new HttpClient("HttpClient3"),
                new HttpClient("HttpClient4"),
                }) {}
            //采用隨機(jī)方式,提供一個(gè)線程
        HttpClient* getWorker()
        {
            if (vectWorker_.empty()) return 0;
            return vectWorker_[std::rand() % vectWorker_.size()];
        }
    };
    static Factory Instance_;

    // name是線程名池凄,必須制定抡驼。在Linux上,top -Hp可以看到這個(gè)線程名肿仑。
    HttpClient(const std::string& name)
        :OpenThreader(name)
    {
        start();
    }
    ~HttpClient()
    {
        //銷毀之前致盟,盡可能喚醒請(qǐng)求線程,防止請(qǐng)求線程阻塞
        for (auto iter = mapFdToTask_.begin(); iter != mapFdToTask_.end(); iter++)
            iter->second.openSync_.wakeup();
    }
    //處理請(qǐng)求http線程發(fā)過(guò)來(lái)的消息
    void onHttp(TaskProto& proto)
    {
        auto& request = proto.request_;
        //連接Http服務(wù)器尤慰,并把fd與當(dāng)前線程綁定馏锡。該socket的全部消息,都發(fā)到此線程
        proto.fd_ = App::Instance_.openSocket_.connect(pid(), request->ip_, request->port_);
        request->response_.code_ = -1;
        request->response_.head_.clear();
        request->response_.body_.clear();
        //fd與任務(wù)綁定到任務(wù)列表
        mapFdToTask_[proto.fd_] = proto;
    }
    //與http服務(wù)器連接成功以后割择,發(fā)送http請(qǐng)求報(bào)文
    void onSend(const std::shared_ptr<OpenSocketMsg>& data)
    {
        //需要判斷fd綁定的task是否存在眷篇,否則關(guān)閉與Http服務(wù)器的連接
        auto iter = mapFdToTask_.find(data->fd_);
        if (iter == mapFdToTask_.end())
        {
            App::Instance_.openSocket_.close(pid(), data->fd_);
            return;
        }
        auto& task = iter->second;
        auto& request = task.request_;
        std::string buffer = request->method_ + " " + request->path_ + " HTTP/1.1 \r\n";
        auto iter1 = request->headers_.begin();
        for (; iter1 != request->headers_.end(); iter1++)
        {
            buffer.append(iter1->first + ": " + iter1->second + "\r\n");
        }
        if (!request->body_.empty())
        {
            buffer.append("Content-Length:" + std::to_string(request->body_.size()) + "\r\n\r\n");
            buffer.append(request->body_);
            buffer.append("\r\n");
        }
        else
        {
            buffer.append("\r\n");
        }
        //制作好Http請(qǐng)求報(bào)文,發(fā)送給服務(wù)器荔泳。
        App::Instance_.openSocket_.send(task.fd_, buffer.data(), (int)buffer.size());
    }
    //處理Http服務(wù)器發(fā)送過(guò)了socket數(shù)據(jù)流蕉饼,拼成完整的Http返回報(bào)文
    void onRead(const std::shared_ptr<OpenSocketMsg>& data)
    {
        //Http任務(wù)列表沒(méi)有綁定fd的任務(wù)虐杯,就對(duì)該fd關(guān)閉。
        auto iter = mapFdToTask_.find(data->fd_);
        if (iter == mapFdToTask_.end())
        {
            App::Instance_.openSocket_.close(pid(), data->fd_);
            return;
        }
        auto& task = iter->second;
        auto& response = task.request_->response_;
        //處理返回http頭
        if (response.code_ == -1)
        {
            response.head_.append(data->data(), data->size());
            const char* ptr = strstr(response.head_.data(), "\r\n\r\n");
            if (!ptr) return;
            response.code_ = 0;
            response.body_.append(ptr + 4);
            response.head_.resize(ptr - response.head_.data() + 2);
            response.parseHeader();
        }
        //處理返回的http的body
        else
        {
            response.body_.append(data->data(), data->size());
        }
        if (response.clen_ > 0)
        {
            if (response.clen_ >= response.body_.size())
                response.body_.resize(response.clen_);
            App::Instance_.openSocket_.close(pid(), data->fd_);
        }
        else if (response.body_.size() > 2)
        {
            if (response.body_[response.body_.size() - 2] == '\r' && response.body_.back() == '\n')
            {
                response.body_.pop_back();
                response.body_.pop_back();
                App::Instance_.openSocket_.close(pid(), data->fd_);
            }
        }
    }
    //與Http服務(wù)器關(guān)閉的消息昧港,喚醒請(qǐng)求線程擎椰,并對(duì)fd綁定的任務(wù),移出任務(wù)列表
    void onClose(const std::shared_ptr<OpenSocketMsg>& data)
    {
        auto iter = mapFdToTask_.find(data->fd_);
        if (iter != mapFdToTask_.end())
        {
            iter->second.openSync_.wakeup();
            mapFdToTask_.erase(iter);
        }
    }
    //接收綁定此線程的socket消息创肥。
    void onSocket(const SocketProto& proto)
    {
        const auto& msg = proto.data_;
        switch (msg->type_)
        {
        case OpenSocket::ESocketData:
            onRead(msg);
            break;
        case OpenSocket::ESocketClose:
            onClose(msg);
            break;
        case OpenSocket::ESocketError:
            printf("[%s]ESocketError:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
            onClose(msg);
            break;
        case OpenSocket::ESocketWarning:
            printf("[%s]ESocketWarning:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
            break;
        case OpenSocket::ESocketOpen:
            onSend(msg);
            break;
        case OpenSocket::ESocketAccept:
        case OpenSocket::ESocketUdp:
            assert(false);
            break;
        default:
            break;
        }
    }
    //處理static bool Http(std::shared_ptr<HttpRequest>& request)發(fā)過(guò)來(lái)的消息
    virtual void onMsg(OpenThreadMsg& msg)
    {
        const BaseProto* data = msg.data<BaseProto>();
        if (!data) return;
        if (!data->isSocket_)
        {
            TaskProto* proto = msg.edit<TaskProto>();
            if (proto) onHttp(*proto);
        }
        else
        {
            const SocketProto* proto = msg.data<SocketProto>();
            if (proto) onSocket(*proto);
        }
    }
    std::map<int, TaskProto> mapFdToTask_;
public:
    static bool Http(std::shared_ptr<HttpRequest>& request)
    {
        if (request->ip_.empty())
        {
            assert(false);
            return false;
        }
        //類型線程池中达舒,選擇一個(gè)。
        auto worker = Instance_.getWorker();
        if (!worker)  return false;
        auto proto = std::shared_ptr<TaskProto>(new TaskProto);
        proto->request_ = request;
        proto->isSocket_ = false;
        //接收消息地方:virtual void onMsg(OpenThreadMsg& msg)
        bool ret = OpenThread::Send(worker->pid(), proto);
        assert(ret);
        //阻塞叹侄,等待http請(qǐng)求完成喚醒
        proto->openSync_.await();
        return ret;
    }
};
HttpClient::Factory HttpClient::Instance_;


int main()
{
    auto request = std::shared_ptr<HttpRequest>(new HttpRequest);
    //請(qǐng)求交易所的最新龍虎數(shù)據(jù)
    request->setUrl("http://reportdocs.static.szse.cn/files/text/jy/jy230308.txt");
    request->method_ = "GET";

    //自定義Http請(qǐng)求頭
    (*request)["Host"] = "reportdocs.static.szse.cn";
    (*request)["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7";
    (*request)["Accept-Encoding"] = "gzip,deflate";
    (*request)["Accept-Language"] = "zh-CN,zh;q=0.9";
    (*request)["Cache-Control"] = "max-age=0";
    (*request)["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36";
    (*request)["Upgrade-Insecure-Requests"] = "1";

    //發(fā)送http請(qǐng)求
    HttpClient::Http(request);
    //返回http請(qǐng)求
    auto& response = request->response_;
    printf("code:%d, header:%s\n", response.code_, response.head_.c_str());
    return getchar();
}

編譯和執(zhí)行

請(qǐng)安裝cmake工具巩搏,用cmake可以構(gòu)建出VS或者XCode工程,就可以在vs或者xcode上編譯運(yùn)行趾代。
源代碼:https://github.com/openlinyou/opensocket
https://gitee.com/linyouhappy/opensocket

#克隆項(xiàng)目
git clone https://github.com/openlinyou/opensocket
cd ./opensocket
#創(chuàng)建build工程目錄
mkdir build
cd build
cmake ..
#如果是win32贯底,在該目錄出現(xiàn)opensocket.sln,點(diǎn)擊它就可以啟動(dòng)vs寫(xiě)代碼調(diào)試
make
./httpclient
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末撒强,一起剝皮案震驚了整個(gè)濱河市禽捆,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌飘哨,老刑警劉巖胚想,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異芽隆,居然都是意外死亡浊服,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門摆马,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)臼闻,“玉大人,你說(shuō)我怎么就攤上這事囤采∈瞿牛” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵蕉毯,是天一觀的道長(zhǎng)乓搬。 經(jīng)常有香客問(wèn)我,道長(zhǎng)代虾,這世上最難降的妖魔是什么进肯? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮棉磨,結(jié)果婚禮上江掩,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好环形,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布策泣。 她就那樣靜靜地躺著,像睡著了一般抬吟。 火紅的嫁衣襯著肌膚如雪萨咕。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,292評(píng)論 1 301
  • 那天火本,我揣著相機(jī)與錄音危队,去河邊找鬼。 笑死钙畔,一個(gè)胖子當(dāng)著我的面吹牛茫陆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播刃鳄,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼盅弛,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了叔锐?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤见秽,失蹤者是張志新(化名)和其女友劉穎愉烙,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體解取,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡步责,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了禀苦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蔓肯。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖振乏,靈堂內(nèi)的尸體忽然破棺而出蔗包,到底是詐尸還是另有隱情,我是刑警寧澤慧邮,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布调限,位于F島的核電站,受9級(jí)特大地震影響误澳,放射性物質(zhì)發(fā)生泄漏耻矮。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一忆谓、第九天 我趴在偏房一處隱蔽的房頂上張望裆装。 院中可真熱鬧,春花似錦、人聲如沸哨免。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)铁瞒。三九已至妙色,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間慧耍,已是汗流浹背身辨。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留芍碧,地道東北人煌珊。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像泌豆,于是被迫代替她去往敵國(guó)和親定庵。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容