Windows提供的線程池工作模式
- 以異步方式調(diào)用一個函數(shù)
- 每隔一段時間調(diào)用一個函數(shù)
- 當內(nèi)核對象觸發(fā)的時候調(diào)用一個函數(shù)
- 當異步I/O請求完成時調(diào)用一個函數(shù)
備注
- 使用以上內(nèi)存池時颅拦,需要考慮線程同步問題
以異步方式調(diào)用函數(shù)
BOOL WINAPI TrySubmitThreadpoolCallback
(
PTP_SIMPLE_CALLBACK pfns,
PVOID pv,
PTP_CALLBACK_ENVIRON pcbe //用于對線程池進行定制
);
功能
Requests that a thread pool worker thread call the specified callback function.pfns
The callback function.
//回調(diào)函數(shù)類型
VOID CALLBACK SimpleCallback
(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context
);
pv
Optional application-defined data to pass to the callback function.pv
A TP_CALLBACK_ENVIRON structure that defines the environment in which to execute the callback function. The InitializeThreadpoolEnvironment function returns this structure.
If this parameter is NULL, the callback executes in the default callback environment返回值
If the function succeeds, it returns TRUE.
If the function fails, it returns FALSE.
示例
#include <windows.h>
#include <cstdio>
#include <cassert>
void CALLBACK Fun(PTP_CALLBACK_INSTANCE, void* pVoid)
{
printf("%s\n", static_cast<const char*>(pVoid));
}
int main()
{
const char* pStrC = "Hello World";
BOOL bRe = TrySubmitThreadpoolCallback(Fun, const_cast<char*>(pStrC), NULL);
assert(bRe);
system("pause");
return 0;
//程序輸出"Hello World"
}
PTP_WORK WINAPI CreateThreadpoolWork
(
PTP_WORK_CALLBACK pfnwk,
PVOID pv,
PTP_CALLBACK_ENVIRON pcbe //用于對線程池進行定制
);
功能
Creates a new work object.pfnwk
The callback function. A worker thread calls this callback each time you call SubmitThreadpoolWork to post the work object.
VOID CALLBACK WorkCallback
(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context,
PTP_WORK Work
);
pv
Optional application-defined data to pass to the callback function.返回值
If the function succeeds, it returns a TP_WORK structure that defines the work object. Applications do not modify(修改) the members of this structure.
If the function fails, it returns NULL.
void SubmitThreadpoolWork
(
PTP_WORK pwk
);
- 功能
Posts a work object to the thread pool. A worker thread calls the work object's callback function.
WaitForThreadpoolWorkCallbacks
VOID WINAPI WaitForThreadpoolWorkCallbacks
(
PTP_WORK pwk,
BOOL fCancelPendingCallbacks
);
功能
Waits for outstanding(未完成) work callbacks to complete and optionally(可選的) cancels pending callbacks that have not yet started to execute.fCancelPendingCallbacks
Indicates whether to cancel queued callbacks that have not yet started to execute.
VOID WINAPI CloseThreadpoolWork
(
PTP_WORK pwk
);
- 功能
Releases the specified work object.
示例
#include <windows.h>
#include <cstdio>
#include <cassert>
void CALLBACK Fun(PTP_CALLBACK_INSTANCE, void* pVoid, PTP_WORK)
{
printf("%s\n", static_cast<const char*>(pVoid));
}
int main()
{
const char* pStrC = "Hello World";
PTP_WORK pWork = CreateThreadpoolWork(Fun, const_cast<char*>(pStrC), NULL);
assert(pWork);
SubmitThreadpoolWork(pWork);
SubmitThreadpoolWork(pWork);
WaitForThreadpoolWorkCallbacks(pWork, FALSE);
CloseThreadpoolWork(pWork);
system("pause");
/*
輸出:
"Hello World"
"Hello World"
*/
}
備注
- 每一次調(diào)用
TrySubmitThreadpoolCallback
郭变,系統(tǒng)會在內(nèi)部分配一個工作項翩瓜,如果打算提交大量的工作項略贮,則出于內(nèi)存以及性能的考慮,使用CreateThreadpoolWork
創(chuàng)建一個工作項并分多次提交會更好
每隔一段時間調(diào)用一個函數(shù)
PTP_TIMER WINAPI CreateThreadpoolTimer
(
PTP_TIMER_CALLBACK pfnti, //回調(diào)函數(shù)
PVOID pv, //傳給回調(diào)函數(shù)的值
PTP_CALLBACK_ENVIRON pcbe //用于線程池定制
);
功能
Creates a new timer object.返回值
If the function succeeds, it returns a TP_TIMER structure that defines the timer object. Applications do not modify the members of this structure.
If the function fails, it returns NULL.回調(diào)函數(shù)聲明
void __stdcall FunCallBack
(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context,
PTP_TIMER Timer
);
VOID WINAPI SetThreadpoolTimer
(
PTP_TIMER pti,
PFILETIME pftDueTime,
DWORD msPeriod,
DWORD msWindowLength
);
功能
Sets the timer object—, replacing the previous timer, if any. A worker thread calls the timer object's callback after the specified timeout expires.pftDueTime
A pointer to a FILETIME structure that specifies(指定) the absolute or relative time at which the timer should expire(到期). If positive or zero, it indicates the absolute time since January 1, 1601 (UTC), measured in 100 nanosecond units. If negative, it indicates the amount of time to wait relative to the current time.
If this parameter is NULL, the timer object will cease(停止) to queue new callbacks (but callbacks already queued will still occur). Note that if this parameter is zero, the timer will expire immediately.msPeriod
The timer period(周期), in milliseconds. If this parameter is zero, the timer is signaled once. If this parameter is greater than zero, the timer is periodic(周期的). A periodic timer automatically reactivates each time the period elapses(消逝), until the timer is canceled.msWindowLength
The maximum amount of time the system can delay before calling the timer callback. If this parameter is set, the system can batch(批量) calls to conserve power(節(jié)約電力).
WaitForThreadpoolTimerCallbacks
VOID WINAPI WaitForThreadpoolTimerCallbacks
(
PTP_TIMER pti,
BOOL fCancelPendingCallbacks
);
功能
Waits for outstanding timer callbacks to complete and optionally cancels pending callbacks that have not yet started to execute.fCancelPendingCallbacks
Indicates whether to cancel queued callbacks that have not yet started to execute.
VOID WINAPI CloseThreadpoolTimer
(
PTP_TIMER pti
);
功能
Releases the specified timer object.備注
The timer object is freed immediately if there are no outstanding callbacks; otherwise, the timer object is freed asynchronously after the outstanding callback functions complete.
In some cases, callback functions might run after CloseThreadpoolTimer has been called. To prevent this behavior:
1.Call the SetThreadpoolTimer function with the pftDueTime parameter set to NULL and the msPeriod and msWindowLengthparameters set to 0.
2.Call the WaitForThreadpoolTimerCallbacks function.
3.Call CloseThreadpoolTimer.
If there is a cleanup group associated with the timer object, it is not necessary to call this function; calling the CloseThreadpoolCleanupGroupMembers function releases the work, wait, and timer objects associated with the cleanup group.
示例
#include <windows.h>
#include <cstdio>
#include <cassert>
void __stdcall Fun(PTP_CALLBACK_INSTANCE, void* pVoid, PTP_TIMER Timer)
{
printf("%s\n", static_cast<const char*>(pVoid));
}
int main()
{
const char* pStrC = "Hello";
PTP_TIMER pTimer = CreateThreadpoolTimer(Fun, const_cast<char*>(pStrC), NULL);
assert(pTimer);
ULARGE_INTEGER nTem;
FILETIME FileTime;
nTem.QuadPart = -1 * 20000000;
FileTime.dwHighDateTime = nTem.HighPart;
FileTime.dwLowDateTime = nTem.LowPart;
/*
FileTime.dwHighDateTime = 4294967295
FileTime.dwLowDateTime = 4274967296
*/
SetThreadpoolTimer(pTimer, &FileTime, 1000, NULL);
Sleep(5000);
WaitForThreadpoolTimerCallbacks(pTimer, FALSE);
CloseThreadpoolTimer(pTimer);
return 0;
/*
在程序執(zhí)行2秒后,輸出Hello,然后每隔1秒輸出Hello,一共輸出3個Hello僧凰,程序結(jié)束
*/
}
在內(nèi)核對象觸發(fā)時調(diào)用一個函數(shù)
PTP_WAIT WINAPI CreateThreadpoolWait
(
PTP_WAIT_CALLBACK pfnwa, //回調(diào)函數(shù)
PVOID pv, //傳遞給回調(diào)函數(shù)的值
PTP_CALLBACK_ENVIRON pcbe //用于定制線程池
);
功能
Creates a new wait object.返回值
If the function succeeds, it returns a TP_WAIT structure that defines the wait object. Applications do not modify the members of this structure.
If the function fails, it returns NULL
void __stdcall FunCallBack
(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context,
PTP_WAIT Wait,
TP_WAIT_RESULT WaitResult //等待結(jié)果
);
等待結(jié)果的值 | 意義 |
---|---|
WAIT_OBJECT_0 |
內(nèi)核對象被觸發(fā) |
WAIT_TIMEOUT |
超時 |
WAIT_ABANDONED_0 |
互斥量被遺棄 |
VOID WINAPI SetThreadpoolWait
(
PTP_WAIT pwa,
HANDLE h,
PFILETIME pftTimeout
);
功能
Sets the wait object—replacing the previous wait object, if any. A worker thread calls the wait object's callback function after the handle becomes signaled or after the specified timeout expires.h
A handle.
If this parameter is NULL, the wait object will cease(停止) to queue new callbacks (but callbacks already queued will still occur).
If this parameter is not NULL, it must refer to a valid waitable object.
If this handle is closed while(與此同時) the wait is still pending, the function's behavior is undefined. If the wait is still pending and the handle must be closed, use CloseThreadpoolWait to cancel the wait and then close the handle.pftTimeout
A pointer to a FILETIME structure that specifies the absolute or relative time at which the wait operation should time out. If this parameter points to a positive value, it indicates the absolute time since January 1, 1601 (UTC), in 100-nanosecond intervals. If this parameter points to a negative value, it indicates the amount of time to wait relative to the current time.
If this parameter points to 0, the wait times out immediately. If this parameter is NULL, the wait will not time out.備注
A wait object can wait for only one handle. Setting the handle for a wait object replaces the previous handle, if any.
You must re-register the event with the wait object before signaling it each time to trigger the wait callback.
WaitForThreadpoolWaitCallbacks
VOID WINAPI WaitForThreadpoolTimerCallbacks
(
PTP_TIMER pti,
BOOL fCancelPendingCallbacks
);
功能
Waits for outstanding wait callbacks to complete and optionally cancels pending callbacks that have not yet started to execute.fCancelPendingCallbacks
Indicates whether to cancel queued callbacks that have not yet started to execute.
- 功能
Releases the specified wait object.
示例
#include <windows.h>
#include <cstdio>
#include <cassert>
void __stdcall Fun(PTP_CALLBACK_INSTANCE, void* pVoid, PTP_WAIT pWait,
TP_WAIT_RESULT nWaitResult)
{
if (WAIT_OBJECT_0 == nWaitResult)
{
printf("%s\n", static_cast<const char*>(pVoid));
}
else if (WAIT_TIMEOUT == nWaitResult)
{
printf("TimeOut\n");
}
else if (WAIT_ABANDONED_0 == nWaitResult)
{
printf("Abandoned\n");
}
else
{
assert(false);
}
}
int main()
{
const char* pStrC = "Hello";
HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
PTP_WAIT pWait = CreateThreadpoolWait(Fun, const_cast<char*>(pStrC), NULL);
assert(pWait && hEvent);
ULARGE_INTEGER nTem;
nTem.QuadPart = -10000000 * 2;
FILETIME FileTime;
FileTime.dwLowDateTime = nTem.LowPart;
FileTime.dwHighDateTime = nTem.HighPart;
SetThreadpoolWait(pWait, hEvent, &FileTime);
Sleep(3000);
SetThreadpoolWait(pWait, hEvent, &FileTime);
Sleep(1000);
SetEvent(hEvent);
Sleep(2000);
WaitForThreadpoolWaitCallbacks(pWait, FALSE);
CloseHandle(hEvent);
CloseThreadpoolWait(pWait);
/*
運行結(jié)果:
運行2s后輸出"TimeOut"
運行4s后輸出"Hello"
運行6s后退出
*/
}
在異步I/O請求完成時調(diào)用一個函數(shù)
PTP_IO WINAPI CreateThreadpoolIo
(
HANDLE fl, //被線程池內(nèi)部I/O完成端口關(guān)聯(lián)的設備句柄
PTP_WIN32_IO_CALLBACK pfnio, //回調(diào)函數(shù)
PVOID pv, //回調(diào)函數(shù)參數(shù)
PTP_CALLBACK_ENVIRON pcbe //用于線程池定制
);
功能
Creates a new I/O completion object.f1
The file handle to bind to this I/O completion object.返回值
If the function succeeds, it returns a TP_IO structure that defines the I/O object. Applications do not modify the members of this structure.
If the function fails, it returns NULL.
回調(diào)函數(shù)
VOID _stdcall Fun
(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context,
PVOID Overlapped,
ULONG IoResult,
ULONG_PTR NumberOfBytesTransferred,
PTP_IO Io
);
Context
The application - defined data.Overlapped
A pointer to a variable that receives the address of the OVERLAPPED structure that was specified when the completed I / O operation was started.IoResult
The result of the I / O operation.If the I / O is successful, this parameter is
NO_ERROR.Otherwise, this parameter is one of the system error codes.NumberOfBytesTransferred
The number of bytes transferred during the I / O operation that has completed.Io
A TP_IO structure that defines the I / O completion object that generated the callback.
VOID WINAPI StartThreadpoolIo(PTP_IO pio);
功能
Notifies the thread pool that I/O operations may possibly begin for the specified I/O completion object. A worker thread calls the I/O completion object's callback function after the operation completes on the file handle bound to this object.備注
You must call this function before initiating each asynchronous I/O operation on the file handle bound to the I/O completion object. Failure to do so will cause the thread pool to ignore an I/O operation when it completes and will cause memory corruption.
If the I/O operation fails, call the CancelThreadpoolIo function to cancel this notification.
If the file handle bound to the I/O completion object has the notification mode FILE_SKIP_COMPLETION_PORT_ON_SUCCESS and an asynchronous I/O operation returns immediately with success, the object's I/O completion callback function is not called and threadpool I/O notifications must be canceled.
VOID WINAPI WaitForThreadpoolIoCallbacks
(
PTP_IO pio,
BOOL fCancelPendingCallbacks
);
功能
Waits for outstanding I/O completion callbacks to complete and optionally cancels pending callbacks that have not yet started to execute.fCancelPendingCallbacks
Indicates whether to cancel queued callbacks that have not yet started to execute.
VOID WINAPI CloseThreadpoolIo(PTP_IO pio);
- 功能
Releases the specified I/O completion object.
void CancelThreadpoolIo(PTP_IO pio);
- 功能
Cancels the notification from the StartThreadpoolIo function.
示例一
#include <windows.h>
#include <cstdio>
#include <cassert>
VOID _stdcall Fun
(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context,
PVOID Overlapped,
ULONG IoResult,
ULONG_PTR NumberOfBytesTransferred,
PTP_IO Io
)
{
assert(NO_ERROR == IoResult);
printf("I/O操作完成,傳輸字節(jié)數(shù):%d\n", NumberOfBytesTransferred);
}
int main()
{
HANDLE hFile = CreateFileA("1.dat", GENERIC_READ | GENERIC_WRITE, 0,
NULL, OPEN_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
assert(INVALID_HANDLE_VALUE != hFile);
PTP_IO pIo = CreateThreadpoolIo(hFile, Fun, NULL, NULL);
assert(pIo);
StartThreadpoolIo(pIo);
const char* pStrC = "Hello World";
OVERLAPPED ov = {};
BOOL nRe = WriteFile(hFile, pStrC, static_cast<DWORD>(strlen(pStrC)), NULL, &ov);
if (!(!nRe && ERROR_IO_PENDING == GetLastError()))
{
CancelThreadpoolIo(pIo);
}
WaitForThreadpoolIoCallbacks(pIo, false);
CloseThreadpoolIo(pIo);
CloseHandle(hFile);
system("pause");
//程序輸出:I/O操作完成熟丸,傳輸字節(jié)數(shù):11
}
示例二(TCP客戶端)
#include <WS2tcpip.h>
#include <cstdio>
#include <cassert>
#pragma comment(lib, "ws2_32")
struct SIoContext
{
SIoContext() : sock(INVALID_SOCKET)
{
memset(&OverLapped, 0, sizeof OverLapped);
wsBuff.buf = buff;
wsBuff.len = sizeof buff;
}
OVERLAPPED OverLapped;
SOCKET sock;
WSABUF wsBuff;
char buff[65536];
};
void InitNetEnvironment()
{
WSAData wsaData;
if (0 == WSAStartup(MAKEWORD(2, 2), &wsaData)
&& wsaData.wVersion == 0x202)
{
return;
}
assert(false);
}
VOID _stdcall Fun
(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context,
PVOID Overlapped,
ULONG IoResult,
ULONG_PTR NumberOfBytesTransferred,
PTP_IO Io
)
{
assert(NO_ERROR == IoResult);
SIoContext* pIoContext = static_cast<SIoContext*>(Context);
if (NumberOfBytesTransferred > 0)
{
pIoContext->buff[NumberOfBytesTransferred] = 0;
printf("%s\n", pIoContext->buff);
StartThreadpoolIo(Io);
DWORD nFlag = 0;
if (!WSARecv(pIoContext->sock, &pIoContext->wsBuff, 1, nullptr,
&nFlag, &pIoContext->OverLapped, NULL))
{
CancelThreadpoolIo(Io);
}
}
else
{
printf("連接中斷\n");
}
}
int main()
{
InitNetEnvironment();
SIoContext IoContext;
IoContext.sock = socket(AF_INET, SOCK_STREAM, 0);
assert(INVALID_SOCKET != IoContext.sock);
sockaddr_in sockAddr = {};
sockAddr.sin_family = AF_INET;
sockAddr.sin_port = htons(8080);
if (1 != inet_pton(AF_INET, "127.0.0.1", &sockAddr.sin_addr.s_addr))
{
assert(false);
}
int nRe = connect(IoContext.sock,
reinterpret_cast<sockaddr*>(&sockAddr), sizeof sockaddr);
assert(SOCKET_ERROR != nRe);
PTP_IO pIo = CreateThreadpoolIo
(reinterpret_cast<HANDLE>(IoContext.sock), Fun, &IoContext, NULL);
assert(pIo);
StartThreadpoolIo(pIo);
DWORD nFlag = 0;
if (!WSARecv(IoContext.sock, &IoContext.wsBuff, 1, nullptr, &nFlag,
&IoContext.OverLapped, NULL))
{
CancelThreadpoolIo(pIo);
}
system("pause");
WaitForThreadpoolIoCallbacks(pIo, false);
CloseThreadpoolIo(pIo);
closesocket(IoContext.sock);
WSACleanup();
/*
運行結(jié)果:
先啟動服務器训措,然后服務器先發(fā)送"Hello",然后再發(fā)送"World"光羞,然后斷開連接
本軟件輸出:
"Hello"
"World"
"連接中斷"
*/
}
回調(diào)函數(shù)的終止操作
函數(shù) | 功能 |
---|---|
LeaveCriticalSectionWhenCallbackReturns | Specifies the critical section that the thread pool will release when the current callback completes. |
ReleaseMutexWhenCallbackReturns | Specifies the mutex that the thread pool will release when the current callback completes. |
ReleaseSemaphoreWhenCallbackReturns | Specifies the semaphore that the thread pool will release when the current callback completes. |
SetEventWhenCallbackReturns | Specifies the event that the thread pool will set when the current callback completes. |
FreeLibraryWhenCallbackReturns | Specifies the DLL that the thread pool will unload when the current callback completes. |
- 備注: 對任意一個回調(diào)函數(shù)的實例绩鸣,線程池中的線程只會執(zhí)行一種終止操作,最終調(diào)用的終止操作會覆蓋之前調(diào)用的那個終止函數(shù)
示例
#include <windows.h>
#include <cstdio>
#include <cassert>
CRITICAL_SECTION cs;
void CALLBACK Fun(PTP_CALLBACK_INSTANCE pInstance, void* pVoid, PTP_WORK)
{
EnterCriticalSection(&cs);
LeaveCriticalSectionWhenCallbackReturns(pInstance, &cs);
printf("%s\n", static_cast<const char*>(pVoid));
}
int main()
{
InitializeCriticalSection(&cs);
const char* pStrC = "Hello World";
PTP_WORK pWork = CreateThreadpoolWork(Fun, const_cast<char*>(pStrC), NULL);
assert(pWork);
SubmitThreadpoolWork(pWork);
SubmitThreadpoolWork(pWork);
WaitForThreadpoolWorkCallbacks(pWork, FALSE);
CloseThreadpoolWork(pWork);
DeleteCriticalSection(&cs);
system("pause");
/*
程序運行結(jié)果:
輸出2個"Hello World"
若注釋掉 LeaveCriticalSectionWhenCallbackReturns(pInstance, &cs);
則只能輸出一個"Hello World"纱兑,并陷入阻塞狀態(tài)
*/
}
線程池定制
函數(shù) | 功能 |
---|---|
CreateThreadpool | Allocates a new pool of threads to execute callbacks. 唯一的參數(shù):This parameter is reserved and must be NULL. |
SetThreadpoolThreadMinimum | Sets the minimum number of threads that the specified thread pool must make available to process callbacks. |
SetThreadpoolThreadMaximum | Sets the maximum number of threads that the specified thread pool can allocate to process callbacks. |
CloseThreadpool | Closes the specified thread pool. Remarks:The thread pool is closed immediately if there are no outstanding work, I/O, timer, or wait objects that are bound to the pool; otherwise, the thread pool is released asynchronously after the outstanding objects are freed. |
函數(shù) | 功能 |
---|---|
InitializeThreadpoolEnvironment | Initializes a callback environment. |
SetThreadpoolCallbackPool | Sets the thread pool to be used when generating(發(fā)生) callbacks. |
DestroyThreadpoolEnvironment | Deletes the specified callback environment. Call this function when the callback environment is no longer needed for creating new thread pool objects. |
示例
#include <windows.h>
#include <cstdio>
#include <cassert>
void CALLBACK Fun(PTP_CALLBACK_INSTANCE pInstance, void* pVoid, PTP_WORK)
{
printf("%s\n", static_cast<const char*>(pVoid));
while (true)
{
Sleep(1);
}
}
int main()
{
const char* pStrC = "Hello World";
PTP_POOL pPool = CreateThreadpool(NULL);
assert(pPool);
SetThreadpoolThreadMaximum(pPool, 1);
SetThreadpoolThreadMinimum(pPool, 1);
TP_CALLBACK_ENVIRON CallbackEnviron;
InitializeThreadpoolEnvironment(&CallbackEnviron);
SetThreadpoolCallbackPool(&CallbackEnviron, pPool);
PTP_WORK pWork = CreateThreadpoolWork(Fun, const_cast<char*>(pStrC),
&CallbackEnviron);
assert(pWork);
SubmitThreadpoolWork(pWork);
SubmitThreadpoolWork(pWork);
WaitForThreadpoolWorkCallbacks(pWork, FALSE);
CloseThreadpoolWork(pWork);
CloseThreadpool(pPool);
DestroyThreadpoolEnvironment(&CallbackEnviron);
system("pause");
/*
程序運行結(jié)果:
輸出"Hello World"后無反應呀闻,進程開啟的線程總數(shù)為2
*/
}
得體的銷毀線程池
- 由于線程池可以處理不同來源的項,為了幫助我們對自定義的線程池進行得體的清理潜慎,線程池提供了清理組
- 不深入進行探討此功能