Boost.Asio和Libuv服務(wù)器實(shí)現(xiàn)案例

1 簡(jiǎn)介

Boost.Asio和Libuv都是非常優(yōu)秀的網(wǎng)絡(luò)通訊框架。本文使用兩種技術(shù)蛋欣,在CentOS上各自實(shí)現(xiàn)一套服務(wù)程序蚌铜,實(shí)現(xiàn)從命名管道讀取數(shù)據(jù)作為輸入谈况,然后將所有數(shù)據(jù)通過Socket/TCP轉(zhuǎn)發(fā)到Socket客戶端的功能。

異步數(shù)據(jù)處理就是指早抠,任務(wù)觸發(fā)后不需要等待它們完成霎烙。 相反,LibUV/Boost.Asio 會(huì)在任務(wù)完成時(shí)觸發(fā)一個(gè)應(yīng)用蕊连。

網(wǎng)絡(luò)程序設(shè)計(jì)中有兩種主流的設(shè)計(jì)模式:Reactor和Proactor悬垃。兩者的區(qū)別,用知乎上的一個(gè)神回答解釋:

如何深刻理解reactor和proactor甘苍?
reactor:能收了你跟俺說一聲尝蠕。
proactor: 你給我收十個(gè)字節(jié),收好了跟俺說一聲载庭。

LibUV是Node.js底層所采用的異步通訊框架看彼,所有的通訊機(jī)制可以在同一個(gè)線程中異步完成。Nodejs的背后是V8和libuv和IOCP(windows)囚聚,libuv接受V8解析后的請(qǐng)求事件闲昭,用event loop來注冊(cè)事件,并提交用戶代碼來進(jìn)行實(shí)際操作靡挥,libuv是屬于所謂的reactor pattern的序矩,即用戶可以注冊(cè)一個(gè)”讀回調(diào)“來處理發(fā)生在字節(jié)流上的讀操作。

Linux下高性能的網(wǎng)絡(luò)庫(kù)中大多使用的Reactor 模式去實(shí)現(xiàn),Boost Asio在Linux下用epoll和select去模擬proactor模式,影響了它的效率和實(shí)現(xiàn)復(fù)雜度跋破。為什么Boost.Asio使用Proactor模式呢簸淀?借用知乎陳聰?shù)幕卮?/a>來解釋吧:

Windows 下很難實(shí)現(xiàn)高效可伸縮的 Reactor。首先毒返,Win32 API 里 WaitForMultipleObjects 只能同時(shí)等待 64 個(gè) handle (MAXIMUM_WAIT_OBJECTS)租幕;其次 WinSock 的 select() 實(shí)現(xiàn)又很 buggy,特別是在錯(cuò)誤處理方面有很多奇葩行為(具體見各種跨平臺(tái)網(wǎng)絡(luò)庫(kù)代碼中對(duì)此的注釋)拧簸;最后劲绪,Windows Vista 新增的 WSAPoll() 函數(shù)與 POSIX 的 poll() 又不盡兼容( daniel.haxx.se/blog/201 )。

Windows 有自己的一套高效異步IO模型(幾乎等同于Proactor)盆赤,同時(shí)支持文件IO和網(wǎng)絡(luò)IO贾富;但 Linux 只有高效的網(wǎng)絡(luò)同步IO(epoll 之類的 io multiplexing 是同步的Reactor,且不支持磁盤文件)牺六,二者的高效IO編程模型從根本上不兼容(Windows 可以把網(wǎng)絡(luò)事件發(fā)到 GUI 線程的事件隊(duì)列中颤枪,有點(diǎn)類似 Reactor,但是似乎一個(gè)進(jìn)程只能有一個(gè) GUI 線程淑际,因此在多核系統(tǒng)上其伸縮性受限)畏纲。

因此扇住,ASIO 要想高效且跨平臺(tái),只能用 Proactor 模型了盗胀。不可避免地會(huì)在 Linux 上損失一點(diǎn)兒效率艘蹋。

換句話說,proactor需要利用操作系統(tǒng)的底層異步API票灰,由內(nèi)核線程并行進(jìn)行實(shí)際的I/O讀寫操作簿训,可以實(shí)現(xiàn)靈活的異步回調(diào),并且在回調(diào)之前數(shù)據(jù)已經(jīng)準(zhǔn)備好并放在用戶緩存中了米间。

在2017年强品,Boost.Asio可能會(huì)進(jìn)入C++標(biāo)準(zhǔn)。為什么 Proactor 是最佳模型屈糊?

  • 跨平臺(tái) 許多操作系統(tǒng)都有異步API的榛,即便是沒有異步API的Linux, 通過 epoll 也能模擬 Proactor 模式。
  • 支持回調(diào)函數(shù)組合 將一系列異步操作進(jìn)行組合逻锐,封裝成對(duì)外的一個(gè)異步調(diào)用夫晌。這個(gè)只有Proactor能做到,Reactor 做不到昧诱。意味著如果asio使用Reactor模式晓淀,就對(duì)不起他“庫(kù)” 之名。
  • 相比 Reactor 可以實(shí)現(xiàn) Zero-copy
  • 和線程解耦盏档。 長(zhǎng)時(shí)間執(zhí)行的過程總是由操作系統(tǒng)異步完成凶掰,應(yīng)用程序無需為此開啟線程。
    Proactor 也并非全無缺點(diǎn)蜈亩,缺點(diǎn)就是內(nèi)存占用比 Reactor 大懦窘。Proactor 需要先分配內(nèi)存而后處理IO, 而 Reactor 是先等待 IO 而后分配內(nèi)存。相對(duì)的Proactor卻獲得了Zero-copy好處稚配。因?yàn)閮?nèi)存已經(jīng)分配好了畅涂,因此操作系統(tǒng)可以將接受到的網(wǎng)絡(luò)數(shù)據(jù)直接從網(wǎng)絡(luò)接口拷貝到應(yīng)用程序內(nèi)存,而無需經(jīng)過內(nèi)核中轉(zhuǎn)道川。

2 使用Libuv

2.1簡(jiǎn)介

LibUV是Node.js底層所采用的異步通訊框架午衰,所有的通訊機(jī)制可以在同一個(gè)線程中異步完成。不過libuv不僅僅支持異步io操作冒萄,而且還具有一個(gè)強(qiáng)勁的線程池臊岸,用于支持多線程并行的cpu密集型操作(參考[3])。

本文描述的Libuv服務(wù)程序就僅使用了一個(gè)線程(即主線程)宦言。這樣的好處是無需做資源同步控制扇单,代碼簡(jiǎn)潔商模。
LibUV使用異步事件驅(qū)動(dòng)編程奠旺。程序的運(yùn)行主體是一個(gè)事件分發(fā)循環(huán)(event-loop)蜘澜。在事件驅(qū)動(dòng)編程中,程序會(huì)關(guān)注每一個(gè)事件响疚,并且對(duì)每一個(gè)事件的發(fā)生做出反應(yīng)鄙信。libuv會(huì)負(fù)責(zé)將來自操作系統(tǒng)的事件收集起來,或者監(jiān)視其他來源的事件忿晕。這樣装诡,用戶就可以注冊(cè)回調(diào)函數(shù),回調(diào)函數(shù)會(huì)在事件發(fā)生的時(shí)候被調(diào)用践盼。event-loop會(huì)一直保持運(yùn)行狀態(tài)鸦采。用偽代碼描述如下:

while there are still events to process:
    e = get the next event
    if there is a callback associated with e:
        call the callback

2.2 初始化Libuv

創(chuàng)建一個(gè)uv_loop_t*類型的全局變量。

uv_loop_t* loop;

在main()函數(shù)中進(jìn)行初始化:

int main(int argc,char **argv) {
    …
    loop = uv_default_loop();
    uv_pipe_t pipe;
    uv_fs_t file_req;
    int fd=uv_fs_open(loop,&file_req,argv[2],O_CREAT|O_RDWR,0644,NULL);
    uv_pipe_init(loop, &pipe, 0);
    uv_pipe_open(&pipe,fd);
    uv_read_start((uv_stream_t*)&pipe,alloc_buffer,read_pipe);

    return uv_run(loop, UV_RUN_DEFAULT);
}

2.3 建立Socket服務(wù)器

在main()函數(shù)中構(gòu)建Socket/TCP服務(wù)器:

int DEFAULT_PORT=0;
int main(int argc,char **argv) {
    …
    uv_tcp_t server;
    uv_tcp_init(loop, &server);
    uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
    uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
    int r = uv_listen((uv_stream_t*)&server, DEFAULT_BACKLOG, on_new_connection);
    if (r) {
        fprintf(stderr, "Listen error %s\n", uv_strerror(r));
        return 1;
    }

    return uv_run(loop, UV_RUN_DEFAULT);
}

其中on_new_connection()函數(shù)用于接收客戶端連接咕幻。

vector<uv_stream_t*>client_pool;
void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        // error!
        return;
    }

    uv_tcp_t *client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*)client) == 0) {
        client_pool.push_back((uv_stream_t*)client);
        uv_read_start((uv_stream_t*)client, alloc_buffer, recv_cb);
    }
    else {
        uv_close((uv_handle_t*)client, NULL);
    }
}

在收到客戶端連接時(shí)渔伯,將新建立的客戶端連接對(duì)象放入client_pool數(shù)組中,作為當(dāng)前客戶端連接隊(duì)列肄程。
當(dāng)客戶端退出或異常關(guān)閉時(shí)锣吼,從該隊(duì)列中刪除,在recv_cb()函數(shù)中判斷客戶端關(guān)閉狀態(tài)蓝厌。recv_cb()函數(shù)在接收到客戶端發(fā)送的數(shù)據(jù)時(shí)被調(diào)用玄叠,當(dāng)返回的nread<0時(shí),表明連接狀態(tài)發(fā)生異常變化拓提。

2.4實(shí)時(shí)監(jiān)聽管道

在main()函數(shù)中構(gòu)建管道監(jiān)聽服務(wù):

int main(int argc,char **argv) {
    …
    uv_pipe_t pipe;
    uv_fs_t file_req;
    int fd=uv_fs_open(loop,&file_req,argv[2],O_CREAT|O_RDWR,0644,NULL);
    uv_pipe_init(loop, &pipe, 0);
    uv_pipe_open(&pipe,fd);
    uv_read_start((uv_stream_t*)&pipe,alloc_buffer,read_pipe);

    return uv_run(loop, UV_RUN_DEFAULT);
}

當(dāng)管道中出現(xiàn)新數(shù)據(jù)時(shí)读恃,libuv會(huì)異步調(diào)用read_pipe()函數(shù)。

void read_pipe(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
    if (nread <0) {
        if (nread != UV_EOF)
            LOGE("Read pipe error "<<uv_err_name(nread)<<"\r\n");
        uv_close((uv_handle_t*)stream, NULL);   
        if (buf->base)
            free(buf->base);
        //free(buf);
    }
    else {
        if (nread > 0) {
            // 此處進(jìn)行Socket轉(zhuǎn)發(fā)
        }
        else{
            /* Everything OK, but nothing read. */
            if(buf->base)
                free(buf->base);
        }
    }

}

注意代态,當(dāng)管道讀取正常但無數(shù)據(jù)時(shí)狐粱,需要使用free(buf->base)清空緩存,因?yàn)樵趌ibuv的文檔中有這么一句:
Note
nread might be 0, which does not indicate anerror or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under read(2).

2.5Socket數(shù)據(jù)轉(zhuǎn)發(fā)

在管道監(jiān)聽回調(diào)函數(shù)read_pipe()中胆数,向所有連接的Socket客戶端轉(zhuǎn)發(fā)管道中的數(shù)據(jù)肌蜻。

int curmcount=0;
for(int i=0;i<client_pool.size();i++)
{               
    write_req_t *req=(write_req_t*)malloc(sizeof(write_req_t));
    req->buf=uv_buf_init(buf->base,nread);
    curmcount++;
    uv_write((uv_write_t *)req, client_pool[i], &req->buf, 1, echo_write);
}   
if(curmcount>0){
    M[buf->base] = curmcount;
}
else
{
    /* Everything OK, no client. */
    if(buf->base)
        free(buf->base);
}

每次從管道獲得數(shù)據(jù)時(shí),當(dāng)客戶端數(shù)量大于0時(shí)必尼,遍歷客戶端列表蒋搜,依次調(diào)用uv_write()進(jìn)行數(shù)據(jù)發(fā)送。因?yàn)閘ibuv是異步工作機(jī)制判莉,我們必須確保寫數(shù)據(jù)緩存一直有效豆挽,直到寫操作成功返回。所以我們應(yīng)該在寫操作的回調(diào)函數(shù)echo_write()中刪除當(dāng)前寫數(shù)據(jù)緩存券盅,而不是在調(diào)用uv_write()之后立即刪除帮哈,這就需要我們進(jìn)行緩存管理。
我們使用一個(gè)std::map M來存儲(chǔ)每一個(gè)寫數(shù)據(jù)緩存及其使用計(jì)數(shù)的映射锰镀,通過echo_write()中計(jì)算調(diào)用次數(shù)來判斷是否完成了所有客戶端的異步發(fā)送工作娘侍。在echo_write()每次調(diào)用時(shí)咖刃,所對(duì)應(yīng)的寫數(shù)據(jù)緩存使用計(jì)數(shù)減一,當(dāng)使用計(jì)數(shù)降為0時(shí)憾筏,刪除緩存數(shù)據(jù)嚎杨。

map<char*,int> M;
void echo_write(uv_write_t *req, int status) {
    if (status) {
        fprintf(stderr, "Write error %s\n", uv_strerror(status));
    }
    M[((write_req_t *)req)->buf.base]--;
    write_req_t *wr=(write_req_t *)req;
    if(M[((write_req_t *)req)->buf.base]==0)
       {
        auto pos=M.find(((write_req_t *)req)->buf.base);
        if(pos!=M.end())
            M.erase(pos);
        free(wr->buf.base); 
    }
    free(wr);
}

當(dāng)沒有客戶端連接時(shí),同樣需要直接使用free(buf->base)清空緩存氧腰。
由于std::map會(huì)不斷的擴(kuò)大自身容器空間(非容器所存儲(chǔ)的元素?cái)?shù)量)枫浙,通過erase()可以縮減容器空間,也可以通過swap()來一次性釋放:

if (M.size() == 0){
    map<char*, int> temp;
    M.swap(temp);
}

2.6已知問題

當(dāng)某些Socket客戶端接收數(shù)據(jù)非常緩慢時(shí)古拴,會(huì)導(dǎo)致M隊(duì)列持續(xù)增長(zhǎng)箩帚,出現(xiàn)內(nèi)存持續(xù)增長(zhǎng)問題。本文中不提供該問題的解決方式黄痪。

3使用Boost.Asio

3.1簡(jiǎn)介

名字本身就說明了一切:Asio意即異步輸入/輸出膏潮。該庫(kù)可以讓C++異步地處理數(shù)據(jù),且平臺(tái)獨(dú)立满力。
使用Boost.Asio進(jìn)行異步數(shù)據(jù)處理的應(yīng)用程序基于兩個(gè)概念:I/O服務(wù)和I/O對(duì)象焕参。I/O服務(wù)抽象了操作系統(tǒng)的接口,允許第一時(shí)間進(jìn)行異步數(shù)據(jù)處理油额,而I/O對(duì)象則用于初始化特定的操作叠纷。 鑒于Boost.Asio只提供了一個(gè)名為boost::asio::io_service的類作為I/O服務(wù),Boost.Asio使用io_service同操作系統(tǒng)的輸入/輸出服務(wù)進(jìn)行交互潦嘶,它針對(duì)所支持的每一個(gè)操作系統(tǒng)都分別實(shí)現(xiàn)了優(yōu)化的類涩嚣,另外庫(kù)中還包含了針對(duì)不同I/O對(duì)象的幾個(gè)類。 其中掂僵,類boost::asio::ip::tcp::socket用于通過網(wǎng)絡(luò)發(fā)送和接收數(shù)據(jù)航厚,而類boost::asio::deadline_timer則提供了一個(gè)計(jì)時(shí)器,用于測(cè)量某個(gè)固定時(shí)間點(diǎn)到來或是一段指定的時(shí)長(zhǎng)過去了锰蓬。
不同于LibUV服務(wù)僅有一個(gè)線程的結(jié)構(gòu)幔睬。在本程序中,使用兩個(gè)線程芹扭,分別進(jìn)行管道讀取和Socket服務(wù)麻顶,后者為主線程,Asio的I/O服務(wù)運(yùn)行在其中舱卡。

3.2 初始化Asio

首先辅肾,程序至少需要一個(gè)io_service實(shí)例。通常一個(gè)io_service的實(shí)例就足夠了轮锥。在本程序中矫钓,使用一個(gè)全局的I/O服務(wù)對(duì)象:io_service:

boost::asio::io_service  io_service;
// ...
//其他代碼
// ...
int main(int argc,char** argv)
{
    // ...
    return0;
}

3.2.1 I/O服務(wù)池vs I/O線程池

有時(shí)候?yàn)榱颂幚砟承╅L(zhǎng)時(shí)間任務(wù),需要使用多個(gè)I/O線程。
有兩種I/O服務(wù)與I/O線程的對(duì)應(yīng)關(guān)系:
·第一種新娜,僅使用一個(gè)I/O服務(wù)赵辕,同時(shí)使用多個(gè)I/O線程,即多個(gè)線程同時(shí)調(diào)用io_service::run()函數(shù):

vector<boost::shared_ptr<boost::thread> > threads;
void Start()
{
    for (int i = 0; i != nThreads; ++i)
    {
        boost::shared_ptr pTh(new boost::thread(
            boost::bind(&boost::asio::io_service::run, &io_service)));
        threads.push_back(pTh);
    }
}

此時(shí)杯活,io_service.post()異步運(yùn)行的方法可能會(huì)在線程池中的任意一個(gè)線程中被調(diào)用匆帚。如果其中涉及到資源同步問題熬词,要么進(jìn)行同步訪問控制旁钧,要么使用strand[1],確保異步調(diào)用的方法被順序執(zhí)行互拾⊥峤瘢或者,為了避免這種問題颜矿,可以采用第二種方式寄猩。
·第二種,使用與線程同樣數(shù)量的I/O服務(wù)對(duì)象骑疆,

vector<boost::shared_ptr<boost::thread> > threads;
vector<boost::shared_ptr<io_service> >io_services;
void Start()
{
    for (int i = 0; i != nThreads; ++i)
    {
        boost::shared_ptr pTh(new boost::thread(
            boost::bind(&boost::asio::io_service::run, &io_services[i])));
        threads.push_back(pTh);
    }
}

此時(shí)田篇,每一個(gè)io_service的run()函數(shù)僅被一個(gè)線程調(diào)用。

3.3建立Socket服務(wù)器

然后你指定你想要監(jiān)聽的端口箍铭,再創(chuàng)建一個(gè)接收器——一個(gè)用來接收客戶端連接的對(duì)象(boost::ip::tcp::acceptor)泊柬,我們將io_service的對(duì)象引用以及接收器封裝在類server中,通過start_accept()函數(shù)啟動(dòng)監(jiān)聽诈火,并調(diào)用io_service_.run()啟動(dòng)Asio的I/O服務(wù)事件循環(huán)(類似于LibUV的event-loop)兽赁。
同樣,為了維護(hù)客戶端連接狀態(tài)冷守,使用了一個(gè)數(shù)組來保存客戶端列表:

vector<boost::shared_ptr<session> >client_pool;

然后刀崖,定義一個(gè)類server來封裝服務(wù)器的處理邏輯:

class server
{
public:
    server(boost::asio::io_service &io_service, string IP, short port) :io_service_(io_service), acceptor_(io_service,tcp::endpoint(address::from_string(IP), port))
    {
    }
    void run()
    {
        start_accept();
        io_service_.run();
    }
    void start_accept()
    {   
        boost::shared_ptr<session> new_session(new session(io_service_));
        acceptor_.async_accept(new_session->socket(), boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
    }
    void handle_accept(boost::shared_ptr<session> new_session, const boost::system::error_code &error)
    {
        start_accept();
        if (!error)
        {
            client_pool.push_back(new_session);
            return;
        }
    }

    // start_send() ...

private:
    boost::asio::io_service &io_service_;
    tcp::acceptor acceptor_;
};
    
server* g_s;

類server的start_accept()函數(shù)中,首先創(chuàng)建一個(gè)虛擬的Socket對(duì)象拍摇,通過aysnc_accept()函數(shù)來等待客戶端的連接亮钦。然后當(dāng)一個(gè)連接建立時(shí),會(huì)異步調(diào)用handle_accept()函數(shù)充活。此時(shí)或悲,一個(gè)簡(jiǎn)單的程序可能會(huì)建立一個(gè)線程,在線程中執(zhí)行一些簡(jiǎn)單的指令堪唐,例如:

// sock為剛剛從接收器中得到的新的客戶端連接對(duì)象
boost::thread(boost::bind(client_session, sock));
// ...
void client_session(socket_ptr sock) {
    while (true) {
        char data[512];
        size_t len = sock->read_some(buffer(data));
        if (len > 0)
            write(*sock, buffer("ok", 2));
    }
}

而在我們的程序中巡语,我們僅僅是將客戶端連接封裝為一個(gè)類session對(duì)象,放入客戶端隊(duì)列中淮菠,當(dāng)從管道接收到數(shù)據(jù)時(shí)男公,才會(huì)真正觸發(fā)與客戶端通信的指令。
然后,在main()函數(shù)中初始化服務(wù)器:

int main(int argc, char **argv)
{   
    sscanf(argv[1],"%d",&DEFAULT_PORT);
    g_s=new server(io_service, "0.0.0.0", DEFAULT_PORT);
    g_s->run();
    return 0;
}

我們?cè)趍ain()函數(shù)主線程中調(diào)用Server::run()函數(shù)枢赔,后者調(diào)用了io_service.run()澄阳,也就是說,本程序的I/O服務(wù)運(yùn)行在主線程中踏拜。

3.4維護(hù)Socket連接(class session)

使用類session來封裝一個(gè)遠(yuǎn)程客戶端連接碎赢,包含一個(gè)socket對(duì)象和一個(gè)發(fā)送緩沖區(qū),以及各個(gè)異步回調(diào)函數(shù)的定義速梗。

class session :public boost::enable_shared_from_this<session>
{
public:
    session(boost::asio::io_service &io_service) :socket_(io_service)
    {
    }
    tcp::socket &socket()
    {
        return socket_;
    }
    void start_write()
    {
        //...
    }
private:
    void handle_read(const boost::system::error_code &error, size_t bytes_transferred)
    {
        if (error) return;      
    }
    void handle_write(const boost::system::error_code &error)
    {       
        //...
    }
    tcp::socket socket_;
    boost::asio::streambuf send_buffer;
};

由于僅使用了一個(gè)I/O服務(wù)和一個(gè)(默認(rèn)的)I/O服務(wù)線程肮塞,客戶端列表在該線程中維護(hù),無需線程間同步訪問控制姻锁。

3.5實(shí)時(shí)監(jiān)聽管道

利用額外的線程枕赵,通過Linux自身的文件接口訪問管道,并同步監(jiān)聽位隶。從管道中讀取新數(shù)據(jù)后拷窜,通過I/O服務(wù)的post()函數(shù),將接收到的數(shù)據(jù)從管道線程發(fā)送到I/O服務(wù)線程涧黄。

char *pipe_name;
void read_pipe()
{   
    int fd=open(pipe_name,O_RDONLY);
    if(fd<0)
    {
        printf("Fail to open mybuf: %s .\n",strerror(errno));
        return;
    }
    int nbytes=0;
    char *buf=(char *)malloc(suggested_size);
    while(true)
    {
        nbytes=read(fd,buf,suggested_size);
        if(nbytes<=0)
        {
            continue;
        }
        boost::shared_ptr<string> msg(new string(buf,nbytes));
    
        io_service.post(boost::bind(&server::start_send, g_s, msg));
    }
    free(buf);
}
int main(int argc, char **argv)
{   
    //...
    sscanf(argv[1],"%d",&DEFAULT_PORT);
    pipe_name=argv[2];
    boost::thread(&read_pipe);
    g_s=new server(io_service, "0.0.0.0", DEFAULT_PORT);
    g_s->run();
    return 0;
}

Boost.Asio可以讓我們異步地運(yùn)行任何方法篮昧。僅僅需要使用post():

void my_func() {
    ...
}
io_service.post(my_func);

這樣就可以保證my_func在調(diào)用了io_service.run()方法的某一個(gè)線程中被調(diào)用(參見“I/O服務(wù)池vs I/O線程池”說明)。在我們的程序中笋妥,server::start_send()函數(shù)將會(huì)在主線程中執(zhí)行(因?yàn)閞un()在主線程被調(diào)用)懊昨。

3.6 Socket數(shù)據(jù)轉(zhuǎn)發(fā)

3.6.1 async_write與async_write_some

異步TCP通訊使用async_write與async_write_some進(jìn)行數(shù)據(jù)發(fā)送。async_write_some發(fā)送一段數(shù)據(jù)挽鞠,但是可能最終僅發(fā)送出一部分?jǐn)?shù)據(jù)疚颊,async_write內(nèi)部(可能多次)調(diào)用async_write_some來確保所有數(shù)據(jù)發(fā)送成功。
不同操作系統(tǒng)環(huán)境下async_write_some的執(zhí)行過程可能不一樣信认。在Windows下材义,async_write_some會(huì)將數(shù)據(jù)發(fā)送請(qǐng)求交給Windows操作系統(tǒng)內(nèi)核IOCP服務(wù),內(nèi)核執(zhí)行成功之后執(zhí)行回調(diào)嫁赏。如過先后多次執(zhí)行async_write而不是等待回調(diào)之后依次再執(zhí)行其掂,可能會(huì)導(dǎo)致最終發(fā)送數(shù)據(jù)的前后順序出現(xiàn)錯(cuò)誤。

3.6.2程序中代碼實(shí)現(xiàn)

為了確保數(shù)據(jù)發(fā)送順序的正確性潦蝇,我們必須在每一次async_write執(zhí)行之后款熬,在其回調(diào)函數(shù)中繼續(xù)下一次async_write操作。所以我們建立了一個(gè)全局消息隊(duì)列攘乒,針對(duì)每個(gè)客戶端連接各自維護(hù)一個(gè)消息隊(duì)列贤牛。

map<session*, queue<boost::shared_ptr<string> > >msgQueues;
class server
{
    // ...
    void start_send(boost::shared_ptr<string> &msg)
    {       
        int csize=client_pool.size();
        for(int i=0;i<csize;i++)
        {
            msgQueues[client_pool[i].get()].push(msg);  
            if(msgQueues[client_pool[i].get()].size()==1)
            {
                client_pool[i]->start_write();
            }
        }
    }
    // ...
}

管道中收到的消息通過server::start_send()加入到各個(gè)客戶端的消息隊(duì)列中,當(dāng)且僅當(dāng)消息隊(duì)列長(zhǎng)度為1(即原隊(duì)列為空)则酝,再去觸發(fā)消息隊(duì)列的發(fā)送殉簸,確保后續(xù)各個(gè)客戶端的每一條消息都是在回調(diào)流程中順序發(fā)送的。
發(fā)往客戶端的數(shù)據(jù)處理:

class session
{
    // ...
    void start_write(){
        if(msgQueues[this].size()>0)
        {
            std::ostream stream(&send_buffer);
            string msg(*msgQueues[this].front());
            stream<<msg;
            boost::asio::async_write(socket_,send_buffer,boost::bind(&session::handle_write,shared_from_this(),boost::asio::placeholders::error));
        }           
    }
    // ...
    void handle_write(const boost::system::error_code &error)
    {       
        //隊(duì)列頭發(fā)送完成后刪除
        msgQueues[this].pop();  
        if(error)
        {   
            //清空消息隊(duì)列        
            auto cur=msgQueues.find(this);
            if(cur!=msgQueues.end())
            {
                while(!msgQueues[this].empty())
                {
                    msgQueues[this].pop();
                }
                msgQueues.erase(cur);
            }
            //客戶端寫錯(cuò)誤 刪除客戶端
                for(auto p=client_pool.begin();p!=client_pool.end();p++)
                {
                if(p->get()==this)
                {
                    client_pool.erase(p);
                    break;
                }                  
               }
        }
        // 繼續(xù)處理下一條消息
        start_write();
    }
    // ...
}

4 性能比較

依據(jù)參考資料[2]的測(cè)試分析,如果要進(jìn)行性能測(cè)試般卑,應(yīng)該選擇合適的評(píng)比基準(zhǔn)武鲁。
對(duì)于LibUV和Boost.Asio,兩者在Windows下都是使用IOCP完成的蝠检,所以其實(shí)最終的性能確實(shí)應(yīng)該是相當(dāng)?shù)你迨螅蛟S最重要的還是先知道兩者實(shí)現(xiàn)上的異同吧。

參考資料

[1]參考http://www.crazygaze.com/blog/2016/03/17/how-strands-work-and-why-you-should-use-them/叹谁,以及Remotery(https://github.com/Celtoys/Remotery)饲梭,查看多線程競(jìng)爭(zhēng)資源的運(yùn)行狀態(tài)。
[2] Practical difference between epoll and Windows IO Completion Ports (IOCP)
[3] 征服優(yōu)雅本慕、高效的Libuv庫(kù)之初識(shí)篇

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末排拷,一起剝皮案震驚了整個(gè)濱河市侧漓,隨后出現(xiàn)的幾起案子锅尘,更是在濱河造成了極大的恐慌,老刑警劉巖布蔗,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件藤违,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡纵揍,警方通過查閱死者的電腦和手機(jī)顿乒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來泽谨,“玉大人璧榄,你說我怎么就攤上這事“杀ⅲ” “怎么了骨杂?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)雄卷。 經(jīng)常有香客問我搓蚪,道長(zhǎng),這世上最難降的妖魔是什么丁鹉? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任妒潭,我火速辦了婚禮,結(jié)果婚禮上揣钦,老公的妹妹穿的比我還像新娘雳灾。我一直安慰自己,他們只是感情好冯凹,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布谎亩。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪团驱。 梳的紋絲不亂的頭發(fā)上摸吠,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音嚎花,去河邊找鬼寸痢。 笑死,一個(gè)胖子當(dāng)著我的面吹牛紊选,可吹牛的內(nèi)容都是我干的啼止。 我是一名探鬼主播,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼兵罢,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼献烦!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起卖词,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤巩那,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后此蜈,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體即横,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年裆赵,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了东囚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡战授,死狀恐怖页藻,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情植兰,我是刑警寧澤份帐,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站钉跷,受9級(jí)特大地震影響弥鹦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜爷辙,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一彬坏、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧膝晾,春花似錦栓始、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽禀忆。三九已至,卻和暖如春落恼,著一層夾襖步出監(jiān)牢的瞬間箩退,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工佳谦, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留戴涝,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓钻蔑,卻偏偏與公主長(zhǎng)得像啥刻,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子咪笑,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容