#include <WinSock2.h>
#include <iostream>
#include <vector>
#include <iostream>
#include <process.h>
#include <MSWSock.h>
#include <Windows.h>
#include <vector>
using namespace std;
#define _AFXDLL
#pragma comment(lib, "Ws2_32.lib") // Socket編程需用的動態(tài)鏈接庫
#pragma comment(lib, "Kernel32.lib") // IOCP需要用到的動態(tài)鏈接庫
//#include <afxtempl.h>
#define MAX_BUFFER_LEN 2048
#define MAX_POST_ACCEPT 10
// 傳遞給Worker線程的退出信號
#define EXIT_CODE NULL
// 釋放指針和句柄資源的宏
// 釋放指針宏
#define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}}
// 釋放句柄宏
#define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
// 釋放Socket宏
#define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}
// 在完成端口上投遞的I/O操作的類型
typedef enum _OPERATION_TYPE
{
ACCEPT_POSTED, // 標志投遞的Accept操作
SEND_POSTED, // 標志投遞的是發(fā)送操作
RECV_POSTED, // 標志投遞的是接收操作
NULL_POSTED // 用于初始化邮破,無意義
}OPERATION_TYPE;
CRITICAL_SECTION m_csContextList;
HANDLE m_hShutdownEvent;
HANDLE m_hIOCompletionPort; // 完成端口的句柄
const int m_nThreads = 16;//建立對應的線程數(shù)
HANDLE* m_phWorkerThreads = NULL; // 工作者線程的句柄指針
LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx 和 GetAcceptExSockaddrs 的函數(shù)指針,用于調(diào)用這兩個擴展函數(shù)
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs;
typedef struct _tagThreadParams_WORKER
{
int nThreadNo; //線程編號
} THREADPARAMS_WORKER, *PTHREADPARAM_WORKER;
THREADPARAMS_WORKER* pThreadParams[m_nThreads] = { NULL };
unsigned __stdcall _WorkerThread(LPVOID lpParam);
//每次套接字操作(如:AcceptEx, WSARecv, WSASend等)對應的數(shù)據(jù)結(jié)構(gòu):OVERLAPPED結(jié)構(gòu)(標識本次操作)仆救,關聯(lián)的套接字抒和,緩沖區(qū),操作類型彤蔽;
typedef struct _PER_IO_CONTEXT
{
OVERLAPPED m_Overlapped; // 每一個重疊網(wǎng)絡操作的重疊結(jié)構(gòu)(針對每一個Socket的每一個操作摧莽,都要有一個)
SOCKET m_sockAccept; // 這個網(wǎng)絡操作所使用的Socket
WSABUF m_wsaBuf; // WSA類型的緩沖區(qū),用于給重疊操作傳參數(shù)的
char m_szBuffer[MAX_BUFFER_LEN]; // 這個是WSABUF里具體存字符的緩沖區(qū)
OPERATION_TYPE m_OpType; // 標識網(wǎng)絡操作的類型(對應上面的枚舉)
DWORD m_nTotalBytes; //數(shù)據(jù)總的字節(jié)數(shù)
DWORD m_nSendBytes; //已經(jīng)發(fā)送的字節(jié)數(shù)顿痪,如未發(fā)送數(shù)據(jù)則設置為0
//構(gòu)造函數(shù)
_PER_IO_CONTEXT()
{
ZeroMemory(&m_Overlapped, sizeof(m_Overlapped));
ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
m_sockAccept = INVALID_SOCKET;
m_wsaBuf.buf = m_szBuffer;
m_wsaBuf.len = MAX_BUFFER_LEN;
m_OpType = NULL_POSTED;
m_nTotalBytes = 0;
m_nSendBytes = 0;
}
//析構(gòu)函數(shù)
~_PER_IO_CONTEXT()
{
if (m_sockAccept != INVALID_SOCKET)
{
closesocket(m_sockAccept);
m_sockAccept = INVALID_SOCKET;
}
}
//重置緩沖區(qū)內(nèi)容
void ResetBuffer()
{
ZeroMemory(m_szBuffer, MAX_BUFFER_LEN);
m_wsaBuf.buf = m_szBuffer;
m_wsaBuf.len = MAX_BUFFER_LEN;
}
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;
typedef struct _PER_SOCKET_CONTEXT
{
SOCKET m_Socket; //連接客戶端的socket
SOCKADDR_IN m_ClientAddr; //客戶端地址
vector<_PER_IO_CONTEXT*> m_arrayIoContext; //套接字操作镊辕,本例是WSARecv和WSASend共用一個PER_IO_CONTEXT
//構(gòu)造函數(shù)
_PER_SOCKET_CONTEXT()
{
m_Socket = INVALID_SOCKET;
memset(&m_ClientAddr, 0, sizeof(m_ClientAddr));
}
//析構(gòu)函數(shù)
~_PER_SOCKET_CONTEXT()
{
if (m_Socket != INVALID_SOCKET)
{
closesocket(m_Socket);
m_Socket = INVALID_SOCKET;
}
// 釋放掉所有的IO上下文數(shù)據(jù)
for (int i = 0; i < m_arrayIoContext.size(); i++)
{
delete m_arrayIoContext.at(i);
}
m_arrayIoContext.clear();
}
//進行套接字操作時,調(diào)用此函數(shù)返回PER_IO_CONTEX結(jié)構(gòu)
_PER_IO_CONTEXT* GetNewIoContext()
{
_PER_IO_CONTEXT* p = new _PER_IO_CONTEXT;
m_arrayIoContext.push_back(p);
return p;
}
// 從數(shù)組中移除一個指定的IoContext
void RemoveContext(_PER_IO_CONTEXT* pContext)
{
for (int i = 0; i < m_arrayIoContext.size(); i++)
{
if (pContext == m_arrayIoContext.at(i))
{
delete pContext;
pContext = NULL;
m_arrayIoContext.erase(m_arrayIoContext.begin() + i);
break;
}
}
}
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;
PER_SOCKET_CONTEXT* m_pListenContext; // 用于監(jiān)聽的Socket的Context信息
vector<PER_SOCKET_CONTEXT*>m_arrayClientContext; // 客戶端Socket的Context信息
void _DeInitialize();
bool _PostAccept(PER_IO_CONTEXT* pAcceptIoContext);
void _RemoveContext(PER_SOCKET_CONTEXT *pSocketContext);
bool _IsSocketAlive(SOCKET s);
bool _DoAccpet(PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext);
bool _DoFirstRecvWithoutData(PER_IO_CONTEXT* pIoContext);
void _AddToContextList(PER_SOCKET_CONTEXT *pHandleData);
bool PostRecv(PER_IO_CONTEXT* pIoContext);
bool _AssociateWithIOCP(PER_SOCKET_CONTEXT *pContext);
bool PostWrite(PER_IO_CONTEXT* pIoContext);
bool _DoFirstRecvWithData(PER_IO_CONTEXT* pIoContext);
bool _DoRecv(PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext);
bool HandleError(PER_SOCKET_CONTEXT *pContext, const DWORD& dwErr);
void _ClearContextList();
void stop();
int main() {
WSADATA wsaData;
int nResult;
nResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
// 錯誤(一般都不可能出現(xiàn))
if (NO_ERROR != nResult)
{
cout << "初始化winsock失敗" << endl;
return 0;
}
// 初始化線程互斥量
InitializeCriticalSection(&m_csContextList);
// 建立系統(tǒng)退出的事件通知
m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (NULL == m_hIOCompletionPort)
{
cout << ("建立完成端口失斠舷征懈!錯誤代碼: %d!\n") << endl;
return false;
}
// 為工作者線程初始化句柄
m_phWorkerThreads = new HANDLE[m_nThreads];
// 根據(jù)計算出來的數(shù)量建立工作者線程
for (int i = 0; i < m_nThreads; i++)
{
pThreadParams[i] = new THREADPARAMS_WORKER;
pThreadParams[i]->nThreadNo = i + 1;
m_phWorkerThreads[i] = (HANDLE)_beginthreadex(0, 0, _WorkerThread, (pThreadParams[i]), 0, NULL);
}
cout << " 建立 _WorkerThread %d 個." << m_nThreads << endl;
// AcceptEx 和 GetAcceptExSockaddrs 的GUID,用于導出函數(shù)指針
GUID GuidAcceptEx = { 0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92} };
GUID GuidGetAcceptExSockAddrs = { 0xb5367df2,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92} };
// 服務器地址信息揩悄,用于綁定Socket
struct sockaddr_in ServerAddress;
// 生成用于監(jiān)聽的Socket的信息
m_pListenContext = new PER_SOCKET_CONTEXT;
// 需要使用重疊IO卖哎,必須得使用WSASocket來建立Socket,才可以支持重疊IO操作
m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == m_pListenContext->m_Socket)
{
cout << "初始化Socket失敗,錯誤代碼: %d.\n" << WSAGetLastError() << endl;
return false;
}
else
{
cout << ("WSASocket() 完成.\n") << endl;;
}
// 將Listen Socket綁定至完成端口中
if (NULL == CreateIoCompletionPort((HANDLE)m_pListenContext->m_Socket, m_hIOCompletionPort, (DWORD)m_pListenContext, 0))
{
cout << "綁定 Listen Socket至完成端口失斂髂取焕窝!錯誤代碼: %d/n" << WSAGetLastError() << endl;
RELEASE_SOCKET(m_pListenContext->m_Socket);
return false;
}
else
{
cout << "Listen Socket綁定完成端口 完成.\n" << endl;
}
// 填充地址信息
ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
ServerAddress.sin_family = AF_INET;
// 這里可以綁定任何可用的IP地址,或者綁定一個指定的IP地址
//ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
ServerAddress.sin_addr.s_addr = inet_addr("127.0.0.1");
ServerAddress.sin_port = htons(6000);
// 綁定地址和端口
if (SOCKET_ERROR == bind(m_pListenContext->m_Socket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress)))
{
cout << "bind()函數(shù)執(zhí)行錯誤.\n" << endl;
return false;
}
else
{
cout << "bind() 完成.\n" << endl;
}
// 開始進行監(jiān)聽
if (SOCKET_ERROR == listen(m_pListenContext->m_Socket, SOMAXCONN))
{
cout << "Listen()函數(shù)執(zhí)行出現(xiàn)錯誤.\n" << endl;
return false;
}
else
{
cout << "Listen() 完成.\n" << endl;
}
// 使用AcceptEx函數(shù)照藻,因為這個是屬于WinSock2規(guī)范之外的微軟另外提供的擴展函數(shù)
// 所以需要額外獲取一下函數(shù)的指針袜啃,
// 獲取AcceptEx函數(shù)指針
DWORD dwBytes = 0;
if (SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL))
{
cout << "WSAIoctl 未能獲取AcceptEx函數(shù)指針。錯誤代碼: %d\n" << WSAGetLastError() << endl;
_DeInitialize();
return false;
}
// 獲取GetAcceptExSockAddrs函數(shù)指針幸缕,也是同理
if (SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs),
&m_lpfnGetAcceptExSockAddrs,
sizeof(m_lpfnGetAcceptExSockAddrs),
&dwBytes,
NULL,
NULL))
{
cout << "WSAIoctl 未能獲取GuidGetAcceptExSockAddrs函數(shù)指針群发。錯誤代碼: %d\n" << WSAGetLastError() << endl;
_DeInitialize();
return false;
}
// 為AcceptEx 準備參數(shù),然后投遞AcceptEx I/O請求
//創(chuàng)建10個套接字发乔,投遞AcceptEx請求熟妓,即共有10個套接字進行accept操作;
for (int i = 0; i < MAX_POST_ACCEPT; i++)
{
// 新建一個IO_CONTEXT
PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
if (false == _PostAccept(pAcceptIoContext))
{
m_pListenContext->RemoveContext(pAcceptIoContext);
return false;
}
}
cout << "投遞 %d 個AcceptEx請求完畢" << MAX_POST_ACCEPT << endl;
WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, 0, INFINITE);
stop();
system("pause");
return 0;
}
unsigned __stdcall _WorkerThread(LPVOID lpParam)
{
THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;
int nThreadNo = (int)pParam->nThreadNo;
cout << "工作者線程啟動栏尚,ID: %d." << nThreadNo << endl;
OVERLAPPED *pOverlapped = NULL;
PER_SOCKET_CONTEXT *pSocketContext = NULL;
DWORD dwBytesTransfered = 0;
//循環(huán)處理請求起愈,直到接收到Shutdown信息為止
while (WAIT_OBJECT_0 != WaitForSingleObject(m_hShutdownEvent, 0))
{
BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort, &dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE);
//接收EXIT_CODE退出標志,則直接退出
if (EXIT_CODE == (DWORD)pSocketContext)
{
break;
}
//返回值為0译仗,表示出錯
if (!bReturn)
{
DWORD dwErr = GetLastError();
// 顯示一下提示信息
if (!HandleError(pSocketContext, dwErr))
{
break;
}
continue;
}
else
{
// 讀取傳入的參數(shù)
PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
// 判斷是否有客戶端斷開了
if ((0 == dwBytesTransfered) && (RECV_POSTED == pIoContext->m_OpType || SEND_POSTED == pIoContext->m_OpType))
{
cout << "客戶端 %s:%d 斷開連接." << inet_ntoa(pSocketContext->m_ClientAddr.sin_addr) << ntohs(pSocketContext->m_ClientAddr.sin_port) << endl;
// 釋放掉對應的資源
_RemoveContext(pSocketContext);
continue;
}
else
{
switch (pIoContext->m_OpType)
{
// Accept
case ACCEPT_POSTED:
{
pIoContext->m_nTotalBytes = dwBytesTransfered;
_DoAccpet(pSocketContext, pIoContext);
}
break;
// RECV
case RECV_POSTED:
{
pIoContext->m_nTotalBytes = dwBytesTransfered;
_DoRecv(pSocketContext, pIoContext);
}
break;
case SEND_POSTED:
{
pIoContext->m_nSendBytes += dwBytesTransfered;
if (pIoContext->m_nSendBytes < pIoContext->m_nTotalBytes)
{
//數(shù)據(jù)未能發(fā)送完抬虽,繼續(xù)發(fā)送數(shù)據(jù)
pIoContext->m_wsaBuf.buf = pIoContext->m_szBuffer + pIoContext->m_nSendBytes;
pIoContext->m_wsaBuf.len = pIoContext->m_nTotalBytes - pIoContext->m_nSendBytes;
PostWrite(pIoContext);
}
else
{
PostRecv(pIoContext);
}
}
break;
default:
// 不應該執(zhí)行到這里
cout << "_WorkThread中的 pIoContext->m_OpType 參數(shù)異常.\n" << endl;
break;
} //switch
}//if
}//if
}//while
cout << "工作者線程 %d 號退出.\n" << nThreadNo << endl;
// 釋放線程參數(shù)
RELEASE(lpParam);
return 0;
}
void _DeInitialize()
{
// 刪除客戶端列表的互斥量
DeleteCriticalSection(&m_csContextList);
// 關閉系統(tǒng)退出事件句柄
RELEASE_HANDLE(m_hShutdownEvent);
// 釋放工作者線程句柄指針
for (int i = 0; i < m_nThreads; i++)
{
RELEASE_HANDLE(m_phWorkerThreads[i]);
}
RELEASE(m_phWorkerThreads);
// 關閉IOCP句柄
RELEASE_HANDLE(m_hIOCompletionPort);
// 關閉監(jiān)聽Socket
RELEASE(m_pListenContext);
cout << "釋放資源完畢.\n" << endl;
}
bool _PostAccept(PER_IO_CONTEXT* pAcceptIoContext)
{
// 準備參數(shù)
DWORD dwBytes = 0;
pAcceptIoContext->m_OpType = ACCEPT_POSTED;
WSABUF *p_wbuf = &pAcceptIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pAcceptIoContext->m_Overlapped;
// 為以后新連入的客戶端先準備好Socket( 這個是與傳統(tǒng)accept最大的區(qū)別 )
pAcceptIoContext->m_sockAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == pAcceptIoContext->m_sockAccept)
{
cout << "創(chuàng)建用于Accept的Socket失敗纵菌!錯誤代碼: %d" << WSAGetLastError() << endl;
return false;
}
// 投遞AcceptEx
if (FALSE == m_lpfnAcceptEx(m_pListenContext->m_Socket, pAcceptIoContext->m_sockAccept, p_wbuf->buf, p_wbuf->len - ((sizeof(SOCKADDR_IN) + 16) * 2),
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwBytes, p_ol))
{
if (WSA_IO_PENDING != WSAGetLastError())
{
cout << "投遞 AcceptEx 請求失敗阐污,錯誤代碼: %d" << WSAGetLastError() << endl;
return false;
}
}
return true;
}
bool HandleError(PER_SOCKET_CONTEXT *pContext, const DWORD& dwErr)
{
// 如果是超時了,就再繼續(xù)等吧
if (WAIT_TIMEOUT == dwErr)
{
// 確認客戶端是否還活著...
if (!_IsSocketAlive(pContext->m_Socket))
{
cout << "檢測到客戶端異常退出咱圆!" << endl;
_RemoveContext(pContext);
return true;
}
else
{
cout << "網(wǎng)絡操作超時笛辟!重試中..." << endl;
return true;
}
}
// 可能是客戶端異常退出了
else if (ERROR_NETNAME_DELETED == dwErr)
{
cout << "檢測到客戶端異常退出!" << endl;
_RemoveContext(pContext);
return true;
}
else
{
cout << "完成端口操作出現(xiàn)錯誤序苏,線程退出手幢。錯誤代碼:%d" << dwErr;
return false;
}
}
void _RemoveContext(PER_SOCKET_CONTEXT *pSocketContext)
{
EnterCriticalSection(&m_csContextList);
for (int i = 0; i < m_arrayClientContext.size(); i++)
{
if (pSocketContext == m_arrayClientContext.at(i))
{
RELEASE(pSocketContext);
m_arrayClientContext.erase(m_arrayClientContext.begin() + i);
break;
}
}
LeaveCriticalSection(&m_csContextList);
}
bool _IsSocketAlive(SOCKET s)
{
int nByteSent = send(s, "", 0, 0);
if (-1 == nByteSent)
return false;
return true;
}
bool _DoAccpet(PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext)
{
if (pIoContext->m_nTotalBytes > 0)
{
//客戶接入時,第一次接收dwIOSize字節(jié)數(shù)據(jù)
_DoFirstRecvWithData(pIoContext);
}
else
{
//客戶端接入時忱详,沒有發(fā)送數(shù)據(jù)围来,則投遞WSARecv請求,接收數(shù)據(jù)
_DoFirstRecvWithoutData(pIoContext);
}
// 5. 使用完畢之后匈睁,把Listen Socket的那個IoContext重置管钳,然后準備投遞新的AcceptEx
pIoContext->ResetBuffer();
return _PostAccept(pIoContext);
}
bool _DoFirstRecvWithoutData(PER_IO_CONTEXT* pIoContext)
{
//為新接入的套接字創(chuàng)建PER_SOCKET_CONTEXT結(jié)構(gòu),并綁定到完成端口
PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT;
SOCKADDR_IN ClientAddr;
int Len = sizeof(ClientAddr);
getpeername(pIoContext->m_sockAccept, (sockaddr*)&ClientAddr, &Len);
pNewSocketContext->m_Socket = pIoContext->m_sockAccept;
memcpy(&(pNewSocketContext->m_ClientAddr), &ClientAddr, sizeof(SOCKADDR_IN));
//將該套接字綁定到完成端口
if (false == _AssociateWithIOCP(pNewSocketContext))
{
RELEASE(pNewSocketContext);
return false;
}
//投遞WSARecv請求软舌,接收數(shù)據(jù)
PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext();
//此時是AcceptEx未接收到客戶端第一次發(fā)送的數(shù)據(jù)才漆,所以這里調(diào)用PostRecv,接收來自客戶端的數(shù)據(jù)
if (false == PostRecv(pNewIoContext))
{
pNewSocketContext->RemoveContext(pNewIoContext);
return false;
}
//如果投遞成功佛点,那么就把這個有效的客戶端信息醇滥,加入到ContextList中去(需要統(tǒng)一管理黎比,方便釋放資源)
_AddToContextList(pNewSocketContext);
return true;
}
void _AddToContextList(PER_SOCKET_CONTEXT *pHandleData)
{
EnterCriticalSection(&m_csContextList);
m_arrayClientContext.push_back(pHandleData);
LeaveCriticalSection(&m_csContextList);
}
bool PostRecv(PER_IO_CONTEXT* pIoContext)
{
// 初始化變量
DWORD dwFlags = 0;
DWORD dwBytes = 0;
WSABUF *p_wbuf = &pIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
pIoContext->ResetBuffer();
pIoContext->m_OpType = RECV_POSTED;
pIoContext->m_nSendBytes = 0;
pIoContext->m_nTotalBytes = 0;
// 初始化完成后,鸳玩,投遞WSARecv請求
int nBytesRecv = WSARecv(pIoContext->m_sockAccept, p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL);
// 如果返回值錯誤阅虫,并且錯誤的代碼并非是Pending的話,那就說明這個重疊請求失敗了
if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))
{
cout << "投遞第一個WSARecv失敳桓颓帝!" << endl;
return false;
}
return true;
}
// 將句柄(Socket)綁定到完成端口中
bool _AssociateWithIOCP(PER_SOCKET_CONTEXT *pContext)
{
// 將用于和客戶端通信的SOCKET綁定到完成端口中
HANDLE hTemp = CreateIoCompletionPort((HANDLE)pContext->m_Socket, m_hIOCompletionPort, (DWORD)pContext, 0);
if (NULL == hTemp)
{
cout << "執(zhí)行CreateIoCompletionPort()出現(xiàn)錯誤.錯誤代碼:%d" << GetLastError() << endl;
return false;
}
return true;
}
bool _DoFirstRecvWithData(PER_IO_CONTEXT* pIoContext)
{
SOCKADDR_IN* ClientAddr = NULL;
SOCKADDR_IN* LocalAddr = NULL;
int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN);
//1. 首先取得連入客戶端的地址信息
m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.len - ((sizeof(SOCKADDR_IN) + 16) * 2),
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&ClientAddr, &remoteLen);
//顯示客戶端信息
cout << "客戶額 %s:%d 信息:%s." << inet_ntoa(ClientAddr->sin_addr) << ntohs(ClientAddr->sin_port) << pIoContext->m_wsaBuf.buf << endl;
//2.為新接入的套接創(chuàng)建PER_SOCKET_CONTEXT,并將該套接字綁定到完成端口
PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT;
pNewSocketContext->m_Socket = pIoContext->m_sockAccept;
memcpy(&(pNewSocketContext->m_ClientAddr), ClientAddr, sizeof(SOCKADDR_IN));
// 參數(shù)設置完畢窝革,將這個Socket和完成端口綁定(這也是一個關鍵步驟)
if (false == _AssociateWithIOCP(pNewSocketContext))
{
RELEASE(pNewSocketContext);
return false;
}
//3. 繼續(xù)购城,建立其下的IoContext,用于在這個Socket上投遞第一個Recv數(shù)據(jù)請求
PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext();
pNewIoContext->m_OpType = SEND_POSTED;
pNewIoContext->m_sockAccept = pNewSocketContext->m_Socket;
pNewIoContext->m_nTotalBytes = pIoContext->m_nTotalBytes;
pNewIoContext->m_nSendBytes = 0;
pNewIoContext->m_wsaBuf.len = pIoContext->m_nTotalBytes;
memcpy(pNewIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.buf, pIoContext->m_nTotalBytes); //復制數(shù)據(jù)到WSASend函數(shù)的參數(shù)緩沖區(qū)
//此時是第一次接收數(shù)據(jù)成功虐译,所以這里投遞PostWrite瘪板,向客戶端發(fā)送數(shù)據(jù)
if (false == PostWrite(pNewIoContext))
{
pNewSocketContext->RemoveContext(pNewIoContext);
return false;
}
//4. 如果投遞成功,那么就把這個有效的客戶端信息漆诽,加入到ContextList中去(需要統(tǒng)一管理侮攀,方便釋放資源)
_AddToContextList(pNewSocketContext);
return true;
}
bool PostWrite(PER_IO_CONTEXT* pIoContext)
{
// 初始化變量
DWORD dwFlags = 0;
DWORD dwSendNumBytes = 0;
WSABUF *p_wbuf = &pIoContext->m_wsaBuf;
OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
pIoContext->m_OpType = SEND_POSTED;
//投遞WSASend請求 -- 需要修改
int nRet = WSASend(pIoContext->m_sockAccept, &pIoContext->m_wsaBuf, 1, &dwSendNumBytes, dwFlags,
&pIoContext->m_Overlapped, NULL);
// 如果返回值錯誤,并且錯誤的代碼并非是Pending的話厢拭,那就說明這個重疊請求失敗了
if ((SOCKET_ERROR == nRet) && (WSA_IO_PENDING != WSAGetLastError()))
{
cout << "投遞WSASend失斃加ⅰ!" << endl;
return false;
}
return true;
}
bool _DoRecv(PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext)
{
//輸出接收的數(shù)據(jù)
SOCKADDR_IN* ClientAddr = &pSocketContext->m_ClientAddr;
cout << "收到 %s:%d 信息:%s" << inet_ntoa(ClientAddr->sin_addr) << ntohs(ClientAddr->sin_port) << pIoContext->m_wsaBuf.buf << endl;
//發(fā)送數(shù)據(jù)
pIoContext->m_nSendBytes = 0;
pIoContext->m_nTotalBytes = pIoContext->m_nTotalBytes;
pIoContext->m_wsaBuf.len = pIoContext->m_nTotalBytes;
pIoContext->m_wsaBuf.buf = pIoContext->m_szBuffer;
return PostWrite(pIoContext);
}
void stop()
{
if (m_pListenContext != NULL && m_pListenContext->m_Socket != INVALID_SOCKET)
{
// 激活關閉消息通知
SetEvent(m_hShutdownEvent);
for (int i = 0; i < m_nThreads; i++)
{
// 通知所有的完成端口操作退出
PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);
}
// 等待所有的客戶端資源退出
WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE);
// 清除客戶端列表信息
_ClearContextList();
// 釋放其他資源
_DeInitialize();
}
}
void _ClearContextList()
{
EnterCriticalSection(&m_csContextList);
for (int i = 0; i < m_arrayClientContext.size(); i++)
{
delete m_arrayClientContext.at(i);
}
m_arrayClientContext.clear();
LeaveCriticalSection(&m_csContextList);
}
IOCP小豬案例修改
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
- 文/潘曉璐 我一進店門鼻忠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人杈绸,你說我怎么就攤上這事帖蔓。” “怎么了瞳脓?”我有些...
- 文/不壞的土叔 我叫張陵塑娇,是天一觀的道長。 經(jīng)常有香客問我劫侧,道長埋酬,這世上最難降的妖魔是什么哨啃? 我笑而不...
- 正文 為了忘掉前任,我火速辦了婚禮写妥,結(jié)果婚禮上拳球,老公的妹妹穿的比我還像新娘。我一直安慰自己珍特,他們只是感情好祝峻,可當我...
- 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著扎筒,像睡著了一般莱找。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上砸琅,一...
- 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼棕所!你這毒婦竟也來了闸盔?” 一聲冷哼從身側(cè)響起,我...
- 正文 年R本政府宣布坪郭,位于F島的核電站,受9級特大地震影響脉幢,放射性物質(zhì)發(fā)生泄漏歪沃。R本人自食惡果不足惜信姓,卻給世界環(huán)境...
- 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望绸罗。 院中可真熱鬧意推,春花似錦、人聲如沸珊蟀。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽育灸。三九已至腻窒,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間磅崭,已是汗流浹背儿子。 一陣腳步聲響...