1 簡(jiǎn)介
Boost.Asio和Libuv都是非常優(yōu)秀的網(wǎng)絡(luò)通訊框架。本文使用兩種技術(shù)蛋欣,在CentOS上各自實(shí)現(xiàn)一套服務(wù)程序蚌铜,實(shí)現(xiàn)從命名管道讀取數(shù)據(jù)作為輸入谈况,然后將所有數(shù)據(jù)通過Socket/TCP轉(zhuǎn)發(fā)到Socket客戶端的功能。
異步數(shù)據(jù)處理就是指早抠,任務(wù)觸發(fā)后不需要等待它們完成霎烙。 相反,LibUV/Boost.Asio 會(huì)在任務(wù)完成時(shí)觸發(fā)一個(gè)應(yīng)用蕊连。
網(wǎng)絡(luò)程序設(shè)計(jì)中有兩種主流的設(shè)計(jì)模式:Reactor和Proactor悬垃。兩者的區(qū)別,用知乎上的一個(gè)神回答解釋:
如何深刻理解reactor和proactor甘苍?
reactor:能收了你跟俺說一聲尝蠕。
proactor: 你給我收十個(gè)字節(jié),收好了跟俺說一聲载庭。
LibUV是Node.js底層所采用的異步通訊框架看彼,所有的通訊機(jī)制可以在同一個(gè)線程中異步完成。Nodejs的背后是V8和libuv和IOCP(windows)囚聚,libuv接受V8解析后的請(qǐng)求事件闲昭,用event loop來注冊(cè)事件,并提交用戶代碼來進(jìn)行實(shí)際操作靡挥,libuv是屬于所謂的reactor pattern的序矩,即用戶可以注冊(cè)一個(gè)”讀回調(diào)“來處理發(fā)生在字節(jié)流上的讀操作。
Linux下高性能的網(wǎng)絡(luò)庫(kù)中大多使用的Reactor 模式去實(shí)現(xiàn),Boost Asio在Linux下用epoll和select去模擬proactor模式,影響了它的效率和實(shí)現(xiàn)復(fù)雜度跋破。為什么Boost.Asio使用Proactor模式呢簸淀?借用知乎上陳聰?shù)幕卮?/a>來解釋吧:
Windows 下很難實(shí)現(xiàn)高效可伸縮的 Reactor。首先毒返,Win32 API 里 WaitForMultipleObjects 只能同時(shí)等待 64 個(gè) handle (MAXIMUM_WAIT_OBJECTS)租幕;其次 WinSock 的 select() 實(shí)現(xiàn)又很 buggy,特別是在錯(cuò)誤處理方面有很多奇葩行為(具體見各種跨平臺(tái)網(wǎng)絡(luò)庫(kù)代碼中對(duì)此的注釋)拧簸;最后劲绪,Windows Vista 新增的 WSAPoll() 函數(shù)與 POSIX 的 poll() 又不盡兼容( daniel.haxx.se/blog/201 )。
Windows 有自己的一套高效異步IO模型(幾乎等同于Proactor)盆赤,同時(shí)支持文件IO和網(wǎng)絡(luò)IO贾富;但 Linux 只有高效的網(wǎng)絡(luò)同步IO(epoll 之類的 io multiplexing 是同步的Reactor,且不支持磁盤文件)牺六,二者的高效IO編程模型從根本上不兼容(Windows 可以把網(wǎng)絡(luò)事件發(fā)到 GUI 線程的事件隊(duì)列中颤枪,有點(diǎn)類似 Reactor,但是似乎一個(gè)進(jìn)程只能有一個(gè) GUI 線程淑际,因此在多核系統(tǒng)上其伸縮性受限)畏纲。
因此扇住,ASIO 要想高效且跨平臺(tái),只能用 Proactor 模型了盗胀。不可避免地會(huì)在 Linux 上損失一點(diǎn)兒效率艘蹋。
換句話說,proactor需要利用操作系統(tǒng)的底層異步API票灰,由內(nèi)核線程并行進(jìn)行實(shí)際的I/O讀寫操作簿训,可以實(shí)現(xiàn)靈活的異步回調(diào),并且在回調(diào)之前數(shù)據(jù)已經(jīng)準(zhǔn)備好并放在用戶緩存中了米间。
在2017年强品,Boost.Asio可能會(huì)進(jìn)入C++標(biāo)準(zhǔn)。為什么 Proactor 是最佳模型屈糊?
- 跨平臺(tái) 許多操作系統(tǒng)都有異步API的榛,即便是沒有異步API的Linux, 通過 epoll 也能模擬 Proactor 模式。
- 支持回調(diào)函數(shù)組合 將一系列異步操作進(jìn)行組合逻锐,封裝成對(duì)外的一個(gè)異步調(diào)用夫晌。這個(gè)只有Proactor能做到,Reactor 做不到昧诱。意味著如果asio使用Reactor模式晓淀,就對(duì)不起他“庫(kù)” 之名。
- 相比 Reactor 可以實(shí)現(xiàn) Zero-copy
- 和線程解耦盏档。 長(zhǎng)時(shí)間執(zhí)行的過程總是由操作系統(tǒng)異步完成凶掰,應(yīng)用程序無需為此開啟線程。
Proactor 也并非全無缺點(diǎn)蜈亩,缺點(diǎn)就是內(nèi)存占用比 Reactor 大懦窘。Proactor 需要先分配內(nèi)存而后處理IO, 而 Reactor 是先等待 IO 而后分配內(nèi)存。相對(duì)的Proactor卻獲得了Zero-copy好處稚配。因?yàn)閮?nèi)存已經(jīng)分配好了畅涂,因此操作系統(tǒng)可以將接受到的網(wǎng)絡(luò)數(shù)據(jù)直接從網(wǎng)絡(luò)接口拷貝到應(yīng)用程序內(nèi)存,而無需經(jīng)過內(nèi)核中轉(zhuǎn)道川。
2 使用Libuv
2.1簡(jiǎn)介
LibUV是Node.js底層所采用的異步通訊框架午衰,所有的通訊機(jī)制可以在同一個(gè)線程中異步完成。不過libuv不僅僅支持異步io操作冒萄,而且還具有一個(gè)強(qiáng)勁的線程池臊岸,用于支持多線程并行的cpu密集型操作(參考[3])。
本文描述的Libuv服務(wù)程序就僅使用了一個(gè)線程(即主線程)宦言。這樣的好處是無需做資源同步控制扇单,代碼簡(jiǎn)潔商模。
LibUV使用異步事件驅(qū)動(dòng)編程奠旺。程序的運(yùn)行主體是一個(gè)事件分發(fā)循環(huán)(event-loop)蜘澜。在事件驅(qū)動(dòng)編程中,程序會(huì)關(guān)注每一個(gè)事件响疚,并且對(duì)每一個(gè)事件的發(fā)生做出反應(yīng)鄙信。libuv會(huì)負(fù)責(zé)將來自操作系統(tǒng)的事件收集起來,或者監(jiān)視其他來源的事件忿晕。這樣装诡,用戶就可以注冊(cè)回調(diào)函數(shù),回調(diào)函數(shù)會(huì)在事件發(fā)生的時(shí)候被調(diào)用践盼。event-loop會(huì)一直保持運(yùn)行狀態(tài)鸦采。用偽代碼描述如下:
while there are still events to process:
e = get the next event
if there is a callback associated with e:
call the callback
2.2 初始化Libuv
創(chuàng)建一個(gè)uv_loop_t*類型的全局變量。
uv_loop_t* loop;
在main()函數(shù)中進(jìn)行初始化:
int main(int argc,char **argv) {
…
loop = uv_default_loop();
uv_pipe_t pipe;
uv_fs_t file_req;
int fd=uv_fs_open(loop,&file_req,argv[2],O_CREAT|O_RDWR,0644,NULL);
uv_pipe_init(loop, &pipe, 0);
uv_pipe_open(&pipe,fd);
uv_read_start((uv_stream_t*)&pipe,alloc_buffer,read_pipe);
return uv_run(loop, UV_RUN_DEFAULT);
}
2.3 建立Socket服務(wù)器
在main()函數(shù)中構(gòu)建Socket/TCP服務(wù)器:
int DEFAULT_PORT=0;
int main(int argc,char **argv) {
…
uv_tcp_t server;
uv_tcp_init(loop, &server);
uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);
uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
int r = uv_listen((uv_stream_t*)&server, DEFAULT_BACKLOG, on_new_connection);
if (r) {
fprintf(stderr, "Listen error %s\n", uv_strerror(r));
return 1;
}
return uv_run(loop, UV_RUN_DEFAULT);
}
其中on_new_connection()函數(shù)用于接收客戶端連接咕幻。
vector<uv_stream_t*>client_pool;
void on_new_connection(uv_stream_t *server, int status) {
if (status < 0) {
fprintf(stderr, "New connection error %s\n", uv_strerror(status));
// error!
return;
}
uv_tcp_t *client = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(loop, client);
if (uv_accept(server, (uv_stream_t*)client) == 0) {
client_pool.push_back((uv_stream_t*)client);
uv_read_start((uv_stream_t*)client, alloc_buffer, recv_cb);
}
else {
uv_close((uv_handle_t*)client, NULL);
}
}
在收到客戶端連接時(shí)渔伯,將新建立的客戶端連接對(duì)象放入client_pool數(shù)組中,作為當(dāng)前客戶端連接隊(duì)列肄程。
當(dāng)客戶端退出或異常關(guān)閉時(shí)锣吼,從該隊(duì)列中刪除,在recv_cb()函數(shù)中判斷客戶端關(guān)閉狀態(tài)蓝厌。recv_cb()函數(shù)在接收到客戶端發(fā)送的數(shù)據(jù)時(shí)被調(diào)用玄叠,當(dāng)返回的nread<0時(shí),表明連接狀態(tài)發(fā)生異常變化拓提。
2.4實(shí)時(shí)監(jiān)聽管道
在main()函數(shù)中構(gòu)建管道監(jiān)聽服務(wù):
int main(int argc,char **argv) {
…
uv_pipe_t pipe;
uv_fs_t file_req;
int fd=uv_fs_open(loop,&file_req,argv[2],O_CREAT|O_RDWR,0644,NULL);
uv_pipe_init(loop, &pipe, 0);
uv_pipe_open(&pipe,fd);
uv_read_start((uv_stream_t*)&pipe,alloc_buffer,read_pipe);
return uv_run(loop, UV_RUN_DEFAULT);
}
當(dāng)管道中出現(xiàn)新數(shù)據(jù)時(shí)读恃,libuv會(huì)異步調(diào)用read_pipe()函數(shù)。
void read_pipe(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
if (nread <0) {
if (nread != UV_EOF)
LOGE("Read pipe error "<<uv_err_name(nread)<<"\r\n");
uv_close((uv_handle_t*)stream, NULL);
if (buf->base)
free(buf->base);
//free(buf);
}
else {
if (nread > 0) {
// 此處進(jìn)行Socket轉(zhuǎn)發(fā)
}
else{
/* Everything OK, but nothing read. */
if(buf->base)
free(buf->base);
}
}
}
注意代态,當(dāng)管道讀取正常但無數(shù)據(jù)時(shí)狐粱,需要使用free(buf->base)清空緩存,因?yàn)樵趌ibuv的文檔中有這么一句:
Note
nread might be 0, which does not indicate anerror or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
2.5Socket數(shù)據(jù)轉(zhuǎn)發(fā)
在管道監(jiān)聽回調(diào)函數(shù)read_pipe()中胆数,向所有連接的Socket客戶端轉(zhuǎn)發(fā)管道中的數(shù)據(jù)肌蜻。
int curmcount=0;
for(int i=0;i<client_pool.size();i++)
{
write_req_t *req=(write_req_t*)malloc(sizeof(write_req_t));
req->buf=uv_buf_init(buf->base,nread);
curmcount++;
uv_write((uv_write_t *)req, client_pool[i], &req->buf, 1, echo_write);
}
if(curmcount>0){
M[buf->base] = curmcount;
}
else
{
/* Everything OK, no client. */
if(buf->base)
free(buf->base);
}
每次從管道獲得數(shù)據(jù)時(shí),當(dāng)客戶端數(shù)量大于0時(shí)必尼,遍歷客戶端列表蒋搜,依次調(diào)用uv_write()進(jìn)行數(shù)據(jù)發(fā)送。因?yàn)閘ibuv是異步工作機(jī)制判莉,我們必須確保寫數(shù)據(jù)緩存一直有效豆挽,直到寫操作成功返回。所以我們應(yīng)該在寫操作的回調(diào)函數(shù)echo_write()中刪除當(dāng)前寫數(shù)據(jù)緩存券盅,而不是在調(diào)用uv_write()之后立即刪除帮哈,這就需要我們進(jìn)行緩存管理。
我們使用一個(gè)std::map M來存儲(chǔ)每一個(gè)寫數(shù)據(jù)緩存及其使用計(jì)數(shù)的映射锰镀,通過echo_write()中計(jì)算調(diào)用次數(shù)來判斷是否完成了所有客戶端的異步發(fā)送工作娘侍。在echo_write()每次調(diào)用時(shí)咖刃,所對(duì)應(yīng)的寫數(shù)據(jù)緩存使用計(jì)數(shù)減一,當(dāng)使用計(jì)數(shù)降為0時(shí)憾筏,刪除緩存數(shù)據(jù)嚎杨。
map<char*,int> M;
void echo_write(uv_write_t *req, int status) {
if (status) {
fprintf(stderr, "Write error %s\n", uv_strerror(status));
}
M[((write_req_t *)req)->buf.base]--;
write_req_t *wr=(write_req_t *)req;
if(M[((write_req_t *)req)->buf.base]==0)
{
auto pos=M.find(((write_req_t *)req)->buf.base);
if(pos!=M.end())
M.erase(pos);
free(wr->buf.base);
}
free(wr);
}
當(dāng)沒有客戶端連接時(shí),同樣需要直接使用free(buf->base)清空緩存氧腰。
由于std::map會(huì)不斷的擴(kuò)大自身容器空間(非容器所存儲(chǔ)的元素?cái)?shù)量)枫浙,通過erase()可以縮減容器空間,也可以通過swap()來一次性釋放:
if (M.size() == 0){
map<char*, int> temp;
M.swap(temp);
}
2.6已知問題
當(dāng)某些Socket客戶端接收數(shù)據(jù)非常緩慢時(shí)古拴,會(huì)導(dǎo)致M隊(duì)列持續(xù)增長(zhǎng)箩帚,出現(xiàn)內(nèi)存持續(xù)增長(zhǎng)問題。本文中不提供該問題的解決方式黄痪。
3使用Boost.Asio
3.1簡(jiǎn)介
名字本身就說明了一切:Asio意即異步輸入/輸出膏潮。該庫(kù)可以讓C++異步地處理數(shù)據(jù),且平臺(tái)獨(dú)立满力。
使用Boost.Asio進(jìn)行異步數(shù)據(jù)處理的應(yīng)用程序基于兩個(gè)概念:I/O服務(wù)和I/O對(duì)象焕参。I/O服務(wù)抽象了操作系統(tǒng)的接口,允許第一時(shí)間進(jìn)行異步數(shù)據(jù)處理油额,而I/O對(duì)象則用于初始化特定的操作叠纷。 鑒于Boost.Asio只提供了一個(gè)名為boost::asio::io_service的類作為I/O服務(wù),Boost.Asio使用io_service同操作系統(tǒng)的輸入/輸出服務(wù)進(jìn)行交互潦嘶,它針對(duì)所支持的每一個(gè)操作系統(tǒng)都分別實(shí)現(xiàn)了優(yōu)化的類涩嚣,另外庫(kù)中還包含了針對(duì)不同I/O對(duì)象的幾個(gè)類。 其中掂僵,類boost::asio::ip::tcp::socket用于通過網(wǎng)絡(luò)發(fā)送和接收數(shù)據(jù)航厚,而類boost::asio::deadline_timer則提供了一個(gè)計(jì)時(shí)器,用于測(cè)量某個(gè)固定時(shí)間點(diǎn)到來或是一段指定的時(shí)長(zhǎng)過去了锰蓬。
不同于LibUV服務(wù)僅有一個(gè)線程的結(jié)構(gòu)幔睬。在本程序中,使用兩個(gè)線程芹扭,分別進(jìn)行管道讀取和Socket服務(wù)麻顶,后者為主線程,Asio的I/O服務(wù)運(yùn)行在其中舱卡。
3.2 初始化Asio
首先辅肾,程序至少需要一個(gè)io_service實(shí)例。通常一個(gè)io_service的實(shí)例就足夠了轮锥。在本程序中矫钓,使用一個(gè)全局的I/O服務(wù)對(duì)象:io_service:
boost::asio::io_service io_service;
// ...
//其他代碼
// ...
int main(int argc,char** argv)
{
// ...
return0;
}
3.2.1 I/O服務(wù)池vs I/O線程池
有時(shí)候?yàn)榱颂幚砟承╅L(zhǎng)時(shí)間任務(wù),需要使用多個(gè)I/O線程。
有兩種I/O服務(wù)與I/O線程的對(duì)應(yīng)關(guān)系:
·第一種新娜,僅使用一個(gè)I/O服務(wù)赵辕,同時(shí)使用多個(gè)I/O線程,即多個(gè)線程同時(shí)調(diào)用io_service::run()函數(shù):
vector<boost::shared_ptr<boost::thread> > threads;
void Start()
{
for (int i = 0; i != nThreads; ++i)
{
boost::shared_ptr pTh(new boost::thread(
boost::bind(&boost::asio::io_service::run, &io_service)));
threads.push_back(pTh);
}
}
此時(shí)杯活,io_service.post()異步運(yùn)行的方法可能會(huì)在線程池中的任意一個(gè)線程中被調(diào)用匆帚。如果其中涉及到資源同步問題熬词,要么進(jìn)行同步訪問控制旁钧,要么使用strand[1],確保異步調(diào)用的方法被順序執(zhí)行互拾⊥峤瘢或者,為了避免這種問題颜矿,可以采用第二種方式寄猩。
·第二種,使用與線程同樣數(shù)量的I/O服務(wù)對(duì)象骑疆,
vector<boost::shared_ptr<boost::thread> > threads;
vector<boost::shared_ptr<io_service> >io_services;
void Start()
{
for (int i = 0; i != nThreads; ++i)
{
boost::shared_ptr pTh(new boost::thread(
boost::bind(&boost::asio::io_service::run, &io_services[i])));
threads.push_back(pTh);
}
}
此時(shí)田篇,每一個(gè)io_service的run()函數(shù)僅被一個(gè)線程調(diào)用。
3.3建立Socket服務(wù)器
然后你指定你想要監(jiān)聽的端口箍铭,再創(chuàng)建一個(gè)接收器——一個(gè)用來接收客戶端連接的對(duì)象(boost::ip::tcp::acceptor)泊柬,我們將io_service的對(duì)象引用以及接收器封裝在類server中,通過start_accept()函數(shù)啟動(dòng)監(jiān)聽诈火,并調(diào)用io_service_.run()啟動(dòng)Asio的I/O服務(wù)事件循環(huán)(類似于LibUV的event-loop)兽赁。
同樣,為了維護(hù)客戶端連接狀態(tài)冷守,使用了一個(gè)數(shù)組來保存客戶端列表:
vector<boost::shared_ptr<session> >client_pool;
然后刀崖,定義一個(gè)類server來封裝服務(wù)器的處理邏輯:
class server
{
public:
server(boost::asio::io_service &io_service, string IP, short port) :io_service_(io_service), acceptor_(io_service,tcp::endpoint(address::from_string(IP), port))
{
}
void run()
{
start_accept();
io_service_.run();
}
void start_accept()
{
boost::shared_ptr<session> new_session(new session(io_service_));
acceptor_.async_accept(new_session->socket(), boost::bind(&server::handle_accept, this, new_session, boost::asio::placeholders::error));
}
void handle_accept(boost::shared_ptr<session> new_session, const boost::system::error_code &error)
{
start_accept();
if (!error)
{
client_pool.push_back(new_session);
return;
}
}
// start_send() ...
private:
boost::asio::io_service &io_service_;
tcp::acceptor acceptor_;
};
server* g_s;
類server的start_accept()函數(shù)中,首先創(chuàng)建一個(gè)虛擬的Socket對(duì)象拍摇,通過aysnc_accept()函數(shù)來等待客戶端的連接亮钦。然后當(dāng)一個(gè)連接建立時(shí),會(huì)異步調(diào)用handle_accept()函數(shù)充活。此時(shí)或悲,一個(gè)簡(jiǎn)單的程序可能會(huì)建立一個(gè)線程,在線程中執(zhí)行一些簡(jiǎn)單的指令堪唐,例如:
// sock為剛剛從接收器中得到的新的客戶端連接對(duì)象
boost::thread(boost::bind(client_session, sock));
// ...
void client_session(socket_ptr sock) {
while (true) {
char data[512];
size_t len = sock->read_some(buffer(data));
if (len > 0)
write(*sock, buffer("ok", 2));
}
}
而在我們的程序中巡语,我們僅僅是將客戶端連接封裝為一個(gè)類session對(duì)象,放入客戶端隊(duì)列中淮菠,當(dāng)從管道接收到數(shù)據(jù)時(shí)男公,才會(huì)真正觸發(fā)與客戶端通信的指令。
然后,在main()函數(shù)中初始化服務(wù)器:
int main(int argc, char **argv)
{
sscanf(argv[1],"%d",&DEFAULT_PORT);
g_s=new server(io_service, "0.0.0.0", DEFAULT_PORT);
g_s->run();
return 0;
}
我們?cè)趍ain()函數(shù)主線程中調(diào)用Server::run()函數(shù)枢赔,后者調(diào)用了io_service.run()澄阳,也就是說,本程序的I/O服務(wù)運(yùn)行在主線程中踏拜。
3.4維護(hù)Socket連接(class session)
使用類session來封裝一個(gè)遠(yuǎn)程客戶端連接碎赢,包含一個(gè)socket對(duì)象和一個(gè)發(fā)送緩沖區(qū),以及各個(gè)異步回調(diào)函數(shù)的定義速梗。
class session :public boost::enable_shared_from_this<session>
{
public:
session(boost::asio::io_service &io_service) :socket_(io_service)
{
}
tcp::socket &socket()
{
return socket_;
}
void start_write()
{
//...
}
private:
void handle_read(const boost::system::error_code &error, size_t bytes_transferred)
{
if (error) return;
}
void handle_write(const boost::system::error_code &error)
{
//...
}
tcp::socket socket_;
boost::asio::streambuf send_buffer;
};
由于僅使用了一個(gè)I/O服務(wù)和一個(gè)(默認(rèn)的)I/O服務(wù)線程肮塞,客戶端列表在該線程中維護(hù),無需線程間同步訪問控制姻锁。
3.5實(shí)時(shí)監(jiān)聽管道
利用額外的線程枕赵,通過Linux自身的文件接口訪問管道,并同步監(jiān)聽位隶。從管道中讀取新數(shù)據(jù)后拷窜,通過I/O服務(wù)的post()函數(shù),將接收到的數(shù)據(jù)從管道線程發(fā)送到I/O服務(wù)線程涧黄。
char *pipe_name;
void read_pipe()
{
int fd=open(pipe_name,O_RDONLY);
if(fd<0)
{
printf("Fail to open mybuf: %s .\n",strerror(errno));
return;
}
int nbytes=0;
char *buf=(char *)malloc(suggested_size);
while(true)
{
nbytes=read(fd,buf,suggested_size);
if(nbytes<=0)
{
continue;
}
boost::shared_ptr<string> msg(new string(buf,nbytes));
io_service.post(boost::bind(&server::start_send, g_s, msg));
}
free(buf);
}
int main(int argc, char **argv)
{
//...
sscanf(argv[1],"%d",&DEFAULT_PORT);
pipe_name=argv[2];
boost::thread(&read_pipe);
g_s=new server(io_service, "0.0.0.0", DEFAULT_PORT);
g_s->run();
return 0;
}
Boost.Asio可以讓我們異步地運(yùn)行任何方法篮昧。僅僅需要使用post():
void my_func() {
...
}
io_service.post(my_func);
這樣就可以保證my_func在調(diào)用了io_service.run()方法的某一個(gè)線程中被調(diào)用(參見“I/O服務(wù)池vs I/O線程池”說明)。在我們的程序中笋妥,server::start_send()函數(shù)將會(huì)在主線程中執(zhí)行(因?yàn)閞un()在主線程被調(diào)用)懊昨。
3.6 Socket數(shù)據(jù)轉(zhuǎn)發(fā)
3.6.1 async_write與async_write_some
異步TCP通訊使用async_write與async_write_some進(jìn)行數(shù)據(jù)發(fā)送。async_write_some發(fā)送一段數(shù)據(jù)挽鞠,但是可能最終僅發(fā)送出一部分?jǐn)?shù)據(jù)疚颊,async_write內(nèi)部(可能多次)調(diào)用async_write_some來確保所有數(shù)據(jù)發(fā)送成功。
不同操作系統(tǒng)環(huán)境下async_write_some的執(zhí)行過程可能不一樣信认。在Windows下材义,async_write_some會(huì)將數(shù)據(jù)發(fā)送請(qǐng)求交給Windows操作系統(tǒng)內(nèi)核IOCP服務(wù),內(nèi)核執(zhí)行成功之后執(zhí)行回調(diào)嫁赏。如過先后多次執(zhí)行async_write而不是等待回調(diào)之后依次再執(zhí)行其掂,可能會(huì)導(dǎo)致最終發(fā)送數(shù)據(jù)的前后順序出現(xiàn)錯(cuò)誤。
3.6.2程序中代碼實(shí)現(xiàn)
為了確保數(shù)據(jù)發(fā)送順序的正確性潦蝇,我們必須在每一次async_write執(zhí)行之后款熬,在其回調(diào)函數(shù)中繼續(xù)下一次async_write操作。所以我們建立了一個(gè)全局消息隊(duì)列攘乒,針對(duì)每個(gè)客戶端連接各自維護(hù)一個(gè)消息隊(duì)列贤牛。
map<session*, queue<boost::shared_ptr<string> > >msgQueues;
class server
{
// ...
void start_send(boost::shared_ptr<string> &msg)
{
int csize=client_pool.size();
for(int i=0;i<csize;i++)
{
msgQueues[client_pool[i].get()].push(msg);
if(msgQueues[client_pool[i].get()].size()==1)
{
client_pool[i]->start_write();
}
}
}
// ...
}
管道中收到的消息通過server::start_send()加入到各個(gè)客戶端的消息隊(duì)列中,當(dāng)且僅當(dāng)消息隊(duì)列長(zhǎng)度為1(即原隊(duì)列為空)则酝,再去觸發(fā)消息隊(duì)列的發(fā)送殉簸,確保后續(xù)各個(gè)客戶端的每一條消息都是在回調(diào)流程中順序發(fā)送的。
發(fā)往客戶端的數(shù)據(jù)處理:
class session
{
// ...
void start_write(){
if(msgQueues[this].size()>0)
{
std::ostream stream(&send_buffer);
string msg(*msgQueues[this].front());
stream<<msg;
boost::asio::async_write(socket_,send_buffer,boost::bind(&session::handle_write,shared_from_this(),boost::asio::placeholders::error));
}
}
// ...
void handle_write(const boost::system::error_code &error)
{
//隊(duì)列頭發(fā)送完成后刪除
msgQueues[this].pop();
if(error)
{
//清空消息隊(duì)列
auto cur=msgQueues.find(this);
if(cur!=msgQueues.end())
{
while(!msgQueues[this].empty())
{
msgQueues[this].pop();
}
msgQueues.erase(cur);
}
//客戶端寫錯(cuò)誤 刪除客戶端
for(auto p=client_pool.begin();p!=client_pool.end();p++)
{
if(p->get()==this)
{
client_pool.erase(p);
break;
}
}
}
// 繼續(xù)處理下一條消息
start_write();
}
// ...
}
4 性能比較
依據(jù)參考資料[2]的測(cè)試分析,如果要進(jìn)行性能測(cè)試般卑,應(yīng)該選擇合適的評(píng)比基準(zhǔn)武鲁。
對(duì)于LibUV和Boost.Asio,兩者在Windows下都是使用IOCP完成的蝠检,所以其實(shí)最終的性能確實(shí)應(yīng)該是相當(dāng)?shù)你迨螅蛟S最重要的還是先知道兩者實(shí)現(xiàn)上的異同吧。
參考資料
[1]參考http://www.crazygaze.com/blog/2016/03/17/how-strands-work-and-why-you-should-use-them/叹谁,以及Remotery(https://github.com/Celtoys/Remotery)饲梭,查看多線程競(jìng)爭(zhēng)資源的運(yùn)行狀態(tài)。
[2] Practical difference between epoll and Windows IO Completion Ports (IOCP)
[3] 征服優(yōu)雅本慕、高效的Libuv庫(kù)之初識(shí)篇