完成例程和完成端口
注:重疊IO有三種實(shí)現(xiàn)方式:完成事件瞧剖、完成例程、完成端口可免,完成事件和WSAEventSelect模型差不多抓于,也都限制于64個(gè)事件(可用多個(gè)線程擴(kuò)展),這里就不再敘述
項(xiàng)目地址
1.完成例程和完成端口都是重疊IO模型浇借,它們都是異步模型捉撮。
用一個(gè)形象的比喻:打印店打印紙張。同步非阻塞模型(例如Select)是輪到你了就叫你過來妇垢,其他時(shí)間你可以去干別的事情巾遭。異步模型是店員幫你打印好了再叫你過來,其他時(shí)間任你安排闯估。
2.不同點(diǎn):
2.1完成例程需要自己控制線程的執(zhí)行順序灼舍,而完成端口是由操作系統(tǒng)幫你完成。
完成例程的流程一般是:
2.1.1使用一個(gè)線程專門用于接收連接(WSAAccept)涨薪,WSAAccept是阻塞函數(shù)骑素。
2.1.2接收到Socket之后開啟一個(gè)線程開始接收數(shù)據(jù)(WSARecv)。完成例程一般使用SleepEx函數(shù)休眠刚夺,以等待OS執(zhí)行完畢献丑,完成事件使用WSAWaitForMultipleEvents函數(shù)。這里使用線程池來減少創(chuàng)建銷毀線程的次數(shù)侠姑。
2.2完成端口不需要自己管理線程的執(zhí)行順序创橄。
2.2.1完成端口首先需要使用CreateIoCompletionPort函數(shù)創(chuàng)建一個(gè)CompletionPort,F(xiàn)ileHandle參數(shù)可以為文件句柄或者Socket莽红,這里首次創(chuàng)建時(shí)使用INVALID_HANDLE_VALUE妥畏。調(diào)用該函數(shù)操作系統(tǒng)會(huì)將該設(shè)備句柄添加到設(shè)備列表。返回的句柄需要保存,接下來會(huì)使用到咖熟。
2.2.2接下來創(chuàng)建一些工作線程圃酵,一般為cput的兩倍即可,CreateIoCompletionPort最后一個(gè)參數(shù)可以指定線程數(shù)馍管,默認(rèn)為cpu數(shù)郭赐。每個(gè)線程都是一個(gè)死循環(huán),在循環(huán)內(nèi)第一條語句調(diào)用GetQueuedCompletionStatus函數(shù)确沸,該函數(shù)會(huì)讓操作系統(tǒng)將該線程壓入到等待線程隊(duì)列中捌锭。該隊(duì)列是LIFO。當(dāng)I/O完成隊(duì)列非空罗捎,且工作線程并未超出總的并發(fā)數(shù)時(shí)观谦,系統(tǒng)從等待線程隊(duì)列中取出線程。該線程將從CreateIoCompletionPort函數(shù)后繼續(xù)執(zhí)行桨菜。
2.2.3接受連接可以使用WSAAccept或者AcceptEx(異步)豁状,和其他IO模型一樣,接受到連接后使用WSARecv接受數(shù)據(jù)倒得,這里就不需要使用SleepEx休眠了泻红。當(dāng)某個(gè)IO事件完成時(shí)OS會(huì)選擇一個(gè)線程執(zhí)行,所以使用WSARecv時(shí)需要傳入一些參數(shù)表示這個(gè)是Read類型霞掺。
2.2.4GetQueuedCompletionStatus函數(shù)的lpCompletionKey參數(shù)可供用戶自身使用谊路,LPOVERLAPPED作為重疊IO必須使用的結(jié)構(gòu)體也可利用傳參。LPOVERLAPPED結(jié)構(gòu)體的hEvent參數(shù)是事件模型所需要的菩彬,可作為一個(gè)指針傳參缠劝。這里還有一個(gè)技巧,使用CONTAINING_RECORD可以根據(jù)一個(gè)結(jié)構(gòu)體的第一個(gè)參數(shù)參數(shù)找到這個(gè)結(jié)構(gòu)體骗灶,但這個(gè)參數(shù)必須是該結(jié)構(gòu)體的第一個(gè)參數(shù)惨恭。
2.2.5除了當(dāng)IO操作完成時(shí)OS時(shí)投遞完成包,用戶還可使用PostQueuedCompletionStatus在IOCP上投寄一個(gè)完成包耙旦。當(dāng)需要釋放所有線程時(shí)只需要使用PostQueuedCompletionStatus函數(shù)傳入一個(gè)約定好的信號(hào)喉恋,即可讓線程退出。
完成端口核心代碼
void TService::InitCompletionPort()
{
SYSTEM_INFO sys;
GetSystemInfo(&sys);
int process = sys.dwNumberOfProcessors;
threadNum = 2 * process;
m_portWorks = new HANDLE[threadNum]; //兩倍cpu數(shù)線程
//將該設(shè)備句柄添加到設(shè)備列表中
m_completionPortHandle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, threadNum);
if (m_completionPortHandle == nullptr)
{
OnError("create completion port error");
return;
}
for (int i = 0; i < threadNum; i++)
{
m_portWorks[i] = ::CreateThread(0, 0, CompletionPortWord, m_completionPortHandle, 0, nullptr);//端口作為線程參數(shù)
}
ThreadPool::GetInstance().commit([](void* p)
{
while (true) //接收連接線程母廷,可使用AcceptEx異步接收連接
{
TService& service = *Word::GetInstance().GetManager<TService>();
service.clientSock = WSAAccept(service.listenSocket, (sockaddr*)&service.clientAddr, &service.addrSize, nullptr, NULL);
IOContext* ioContext = new IOContext(service.clientSock);
service.AcceptComplete(*ioContext);
}
},nullptr);
}
DWORD __stdcall TService::CompletionPortWord(LPVOID IpParm) //工作線程
{
HANDLE portHandle = IpParm; //重疊端口
DWORD lpNumberOfBytesRecvd; //
OVERLAPPED* IpOverlapped;
TService* service;
IOContext* iOContext;
while (true)
{
BOOL bReturn =GetQueuedCompletionStatus(portHandle, &lpNumberOfBytesRecvd, (LPDWORD)&service, &IpOverlapped, INFINITE); //加入就緒棧轻黑,內(nèi)核會(huì)選擇一個(gè)線程執(zhí)行(FIFO)
//先計(jì)算出IpOverlapped地址的偏移量,然后根據(jù)特定的變量地址找到結(jié)構(gòu)體地址,IpOverlapped需要作為第一個(gè)參數(shù)
iOContext = (IOContext*)CONTAINING_RECORD(IpOverlapped, IOContext, m_overlapped);
if (iOContext->m_Type==OperationType::ExitPosted) //線程退出標(biāo)志
{
break;
}
if (bReturn == false) //出現(xiàn)錯(cuò)誤
{
break;
}
if (lpNumberOfBytesRecvd == 0)
{
CloseHandle((HANDLE)iOContext->m_sockAccept);
service->RemoveChannel(iOContext->m_sockAccept);
continue;
}
switch (iOContext->m_Type)
{
case OperationType::AcceptPosted:
service->AcceptComplete(*iOContext);
case OperationType::RecvPosted:
service->RecvComplete(*iOContext);
break;
case OperationType::SendPosted:
service->SendComplete(*iOContext);
break;
}
}
return 0;
}
void TService::PostRecv(IOContext & ioContext)
{
ioContext.m_Type = OperationType::RecvPosted;
WSARecv(ioContext.m_sockAccept, &ioContext.m_wsaBuf, 1, &ioContext.m_numOfBytes, &ioContext.m_flags, &ioContext.m_overlapped, nullptr);
}
void TService::PostExit()
{
IOContext context(listenSocket);
context.m_Type = OperationType::ExitPosted;
for (int i = 0; i < threadNum; i++)
{
//每個(gè)線程都會(huì)收到一個(gè)釋放信號(hào)
PostQueuedCompletionStatus(m_completionPortHandle, 1, (ULONG_PTR)this, (OVERLAPPED*)&context);
}
}
void TService::AcceptComplete(IOContext & ioContext)
{
SOCKET sock = this->clientSock;
HANDLE handle = CreateIoCompletionPort((HANDLE)sock, this->m_completionPortHandle, (DWORD)this, 0); //加入端口
TChannel* channel = (TChannel*)this->AddChannel(this->clientSock);
PostRecv(ioContext);
}