Win32编程之线程池同步(十三)
一、InterlockedAdd函数
InterlockedAdd 是 Windows API 中的一个原子操作函数,用于在多线程环境下对一个变量执行原子加法操作。原子操作是指在执行期间不会被其他线程中断,从而确保多线程环境下的数据一致性。
函数原型:
LONG InterlockedAdd( LONG volatile *Addend, LONG Value );
参数解释:
Addend:一个指向被加数的LONG类型指针。这是要修改的变量的地址。Value:要加到变量上的值。
返回值:
InterlockedAdd 函数返回 Addend 参数原始值的拷贝,即执行加法操作前的值。
函数功能:
InterlockedAdd 函数的作用是将 Addend 指针指向的变量的值与 Value 相加,并将结果存储在该变量中。这个操作是原子的,意味着在执行加法操作的过程中,不会被其他线程中断,从而确保多线程环境下的数据一致性。它适用于实现各种线程间的计数和累加操作。
示例代码:
#include <Windows.h>
#include <stdio.h>
LONG g_count = 0;
void CALLBACK ThreadFunc(PTP_CALLBACK_INSTANCE pInstance, void* p, PTP_WORK pWork)
{
for (int i = 0; i < 200; i++) {
InterlockedAdd(&g_count, 1);
}
}
int main() {
PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)ThreadFunc, NULL, NULL);
for (int i = 0; i < 100000; i++) {
SubmitThreadpoolWork(pwk);
}
WaitForThreadpoolWorkCallbacks(pwk, FALSE);
CloseThreadpoolWork(pwk);
printf("g_count = %d\n", g_count);
system("pause");
return 1;
}
二、设置临界区
在 Windows 操作系统中,可以使用临界区(Critical Section)来实现线程间的互斥访问,确保多个线程不会同时访问共享资源,从而避免竞态条件和数据不一致性问题。
以下是如何在 Windows 中定义和使用临界区的基本步骤:
步骤 1:定义临界区变量
首先,需要定义一个 CRITICAL_SECTION 结构体变量,用于表示临界区对象。通常,将其定义为全局或局部变量,以确保多个线程都可以访问。
CRITICAL_SECTION gCriticalSection; // 全局临界区变量
步骤 2:初始化临界区
在使用临界区之前,必须对其进行初始化。可以使用 InitializeCriticalSection 函数进行初始化。
InitializeCriticalSection(&gCriticalSection);
步骤 3:进入临界区
要进入临界区(获取临界区锁),使用 EnterCriticalSection 函数。一旦线程进入临界区,其他线程将被阻塞,直到该线程退出临界区。
EnterCriticalSection(&gCriticalSection);
步骤 4:执行临界区代码
在进入临界区后,可以执行需要互斥访问的代码段,例如修改共享资源的操作。
步骤 5:离开临界区
要离开临界区(释放临界区锁),使用 LeaveCriticalSection 函数。
LeaveCriticalSection(&gCriticalSection);
步骤 6:销毁临界区
在不再需要使用临界区时,应该对其进行清理,释放相关资源。可以使用 DeleteCriticalSection 函数来销毁临界区对象。
DeleteCriticalSection(&gCriticalSection);
示例代码:
#include <Windows.h>
#include <stdio.h>
LONG g_count = 0;
void CALLBACK ThreadFunc(PTP_CALLBACK_INSTANCE pInstance, void* p, PTP_WORK pWork)
{
EnterCriticalSection((CRITICAL_SECTION*)p);
for (int i = 0; i < 200; i++) {
InterlockedAdd(&g_count, 1);
}
//LeaveCriticalSection((CRITICAL_SECTION*)p);
LeaveCriticalSectionWhenCallbackReturns(pInstance, (CRITICAL_SECTION*)p);
}
int main() {
CRITICAL_SECTION cs;
InitializeCriticalSection(&cs);
PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)ThreadFunc, &cs, NULL);
for (int i = 0; i < 100000; i++) {
SubmitThreadpoolWork(pwk);
}
WaitForThreadpoolWorkCallbacks(pwk, FALSE);
DeleteCriticalSection(&cs);
CloseThreadpoolWork(pwk);
printf("g_count = %d\n", g_count);
system("pause");
return 1;
}
LeaveCriticalSection 函数和 LeaveCriticalSectionWhenCallbackReturns 函数都用于释放临界区锁(退出临界区),但它们的使用场景和行为略有不同:
LeaveCriticalSection 函数:
-
LeaveCriticalSection函数用于在普通的代码路径中手动退出临界区。这意味着,在临界区中的任何线程都可以调用LeaveCriticalSection来释放临界区锁。 -
调用
LeaveCriticalSection会立即释放临界区锁,使得其他线程有机会进入该临界区。 -
这个函数通常用于手动管理临界区的锁定和解锁操作。
示例用法:
EnterCriticalSection(&gCriticalSection); // 执行需要互斥访问的操作 LeaveCriticalSection(&gCriticalSection);
LeaveCriticalSectionWhenCallbackReturns 函数:
-
LeaveCriticalSectionWhenCallbackReturns函数通常用于在回调函数中退出临界区。回调函数是通过线程池或异步 I/O 操作注册的,当操作完成时会自动调用。 -
这个函数的主要目的是确保在回调函数执行完毕后才离开临界区。这对于在回调函数中对共享资源进行安全访问非常有用。
-
使用
LeaveCriticalSectionWhenCallbackReturns可以确保在回调函数执行期间,其他线程不会进入相同的临界区。
示例用法(在回调函数中):
// 在异步操作的回调函数中 LeaveCriticalSectionWhenCallbackReturns(&gCriticalSection, callbackContext);
总之,LeaveCriticalSection 用于手动管理临界区的锁定和解锁,而 LeaveCriticalSectionWhenCallbackReturns 通常用于在回调函数中确保临界区锁的安全释放。它们的选择取决于你的代码逻辑和使用场景。
三、事件内核对象
SetEventWhenCallbackReturns函数:
SetEventWhenCallbackReturns 是 Windows API 中的一个函数,用于在异步回调函数执行完毕后设置一个事件对象。该函数通常用于在异步操作完成后通知其他线程或等待该事件的线程,以便它们可以继续执行相应的任务。
函数原型:
BOOL SetEventWhenCallbackReturns( PTP_CALLBACK_INSTANCE pci, HANDLE evt );
参数解释:
pci:指向线程池回调实例的指针。在异步回调函数中,通常通过回调函数的参数获取。evt:要设置的事件对象的句柄。
返回值:
- 如果函数调用成功,返回
TRUE。 - 如果函数调用失败,返回
FALSE。可以通过调用GetLastError获取错误信息。
函数功能:
SetEventWhenCallbackReturns 函数用于设置指定的事件对象。它通常在异步回调函数执行完毕后被调用,以通知其他线程或等待事件的线程,表示某个异步操作已经完成。
示例代码:
#include <Windows.h>
#include <stdio.h>
LONG g_count = 0;
HANDLE g_hEvent = NULL;
void CALLBACK ThreadFunc(PTP_CALLBACK_INSTANCE pInstance, void* p, PTP_WORK pWork)
{
WaitForSingleObject(g_hEvent, INFINITE);
for (int i = 0; i < 200; i++) {
InterlockedAdd(&g_count, 1);
}
SetEventWhenCallbackReturns(pInstance, g_hEvent);
}
int main() {
/*
参数二:FALSE:自动触发,TRUE:手动触发
参数三:FALSE:默认无信号,TRUE:默认有信息
*/
g_hEvent = CreateEvent(NULL, FALSE, TRUE, NULL);
PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)ThreadFunc, NULL, NULL);
for (int i = 0; i < 100000; i++) {
SubmitThreadpoolWork(pwk);
}
WaitForThreadpoolWorkCallbacks(pwk, FALSE);
CloseThreadpoolWork(pwk);
CloseHandle(g_hEvent);
printf("g_count = %d\n", g_count);
system("pause");
return 1;
}
四、互斥量
ReleaseMutexWhenCallbackReturns函数:
ReleaseMutexWhenCallbackReturns 是 Windows API 中的一个函数,用于在异步回调函数执行完毕后释放一个互斥体(Mutex)。互斥体是一种同步原语,用于实现多线程之间的互斥访问,以确保只有一个线程可以访问共享资源。
函数原型:
BOOL ReleaseMutexWhenCallbackReturns( PTP_CALLBACK_INSTANCE pci, HANDLE mut );
参数解释:
pci:指向线程池回调实例的指针。通常在异步回调函数中,通过回调函数的参数获取。mut:要释放的互斥体(Mutex)的句柄。
返回值:
- 如果函数调用成功,返回
TRUE。 - 如果函数调用失败,返回
FALSE。可以通过调用GetLastError获取错误信息。
函数功能:
ReleaseMutexWhenCallbackReturns 函数用于释放指定的互斥体(Mutex)。它通常在异步回调函数执行完毕后被调用,以释放互斥体并允许其他线程获得对共享资源的访问。
示例代码:
#include <Windows.h>
#include <stdio.h>
LONG g_count = 0;
HANDLE g_hMutex = NULL;
void CALLBACK ThreadFunc(PTP_CALLBACK_INSTANCE pInstance, void* p, PTP_WORK pWork)
{
WaitForSingleObject(g_hMutex, INFINITE);
for (int i = 0; i < 200; i++) {
InterlockedAdd(&g_count, 1);
}
ReleaseMutexWhenCallbackReturns(pInstance, g_hMutex);
}
int main() {
g_hMutex = CreateMutex(NULL, FALSE, NULL);
PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)ThreadFunc, NULL, NULL);
for (int i = 0; i < 100000; i++) {
SubmitThreadpoolWork(pwk);
}
WaitForThreadpoolWorkCallbacks(pwk, FALSE);
CloseThreadpoolWork(pwk);
CloseHandle(g_hMutex);
printf("g_count = %d\n", g_count);
system("pause");
return 1;
}
五、信号量
#include <Windows.h>
#include <stdio.h>
LONG g_count = 0;
HANDLE g_hSemaphore = NULL;
void CALLBACK ThreadFunc(PTP_CALLBACK_INSTANCE pInstance, void* p, PTP_WORK pWork) {
WaitForSingleObject(g_hSemaphore, INFINITE);
for (int i = 0; i < 200; i++) {
InterlockedAdd(&g_count, 1);
}
ReleaseSemaphoreWhenCallbackReturns(pInstance, g_hSemaphore, 1);
}
int main() {
//初始化100个资源,给到1个资源
g_hSemaphore = CreateSemaphore(NULL, 1, 100, NULL);
PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)ThreadFunc, NULL, NULL);
for (int i = 0; i < 100000; i++) {
SubmitThreadpoolWork(pwk);
}
WaitForThreadpoolWorkCallbacks(pwk, FALSE);
CloseThreadpoolWork(pwk);
CloseHandle(g_hSemaphore);
printf("g_count = %d\n", g_count);
system("pause");
return 1;
}
六、主线程解除等待
#include <Windows.h>
#include <stdio.h>
VOID CALLBACK TaskHandle(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work) {
DisassociateCurrentThreadFromCallback(Instance);
Sleep(2000);
printf("我是子线程\n");
}
int main() {
PTP_WORK workItem = CreateThreadpoolWork(TaskHandle, NULL, NULL);
SubmitThreadpoolWork(workItem);
WaitForThreadpoolWorkCallbacks(workItem, false);
printf("主线程运行完毕\n");
getchar();
CloseThreadpoolWork(workItem);
system("pause");
return 1;
}
七、判断线程池是否有可用资源
#include <Windows.h>
#include <stdio.h>
unsigned long g_count = 0;
VOID CALLBACK TaskHandle(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work) {
if (CallbackMayRunLong(Instance)) {
printf("还有剩余资源:%d\n", InterlockedIncrement(&g_count));
} else {
printf("资源用光了\n");
}
Sleep(5000);
printf("我是子线程\n");
}
int main() {
PTP_WORK workItem = CreateThreadpoolWork(TaskHandle, NULL, NULL);
for (int i = 0; i < 1000; i++) {
SubmitThreadpoolWork(workItem);
}
WaitForThreadpoolWorkCallbacks(workItem, false);
printf("主线程运行完毕\n");
getchar();
CloseThreadpoolWork(workItem);
system("pause");
return 1;
}
八、线程资源池定制
CreateThreadpool:创建线程池 CloseThreadpool:关闭线程池 SetThreadpoolThreadMaximum:设置线程池最小资源数 SetThreadpoolThreadMaximum:设置线程池最大资源数 InitializeThreadpoolEnvironment:初始化线程池环境变量 DestroyThreadpoolEnvironment:销毁线程池环境变量 SetThreadpoolCallbackPool:设置线程池回调的线程池 CreateThreadpoolCleanupGroup:创建清理组 CloseThreadpoolCleanupGroupMembers:关闭清理组 SetThreadpoolCallbackCleanupGroup:为线程池设定清理组
示例代码:
#include <Windows.h>
#include <stdio.h>
VOID CALLBACK TaskHandler (PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work) {
printf("hello world\n");
}
int main() {
PTP_POOL myPool = CreateThreadpool(NULL);
SetThreadpoolThreadMaximum(myPool, 1);
SetThreadpoolThreadMaximum(myPool, 10);
TP_CALLBACK_ENVIRON cbe;
InitializeThreadpoolEnvironment(&cbe);
SetThreadpoolCallbackPool(&cbe, myPool);
PTP_CLEANUP_GROUP cleanupGroup = CreateThreadpoolCleanupGroup();
SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
PTP_WORK pWork = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandler, NULL, &cbe);
SubmitThreadpoolWork(pWork);
WaitForThreadpoolWorkCallbacks(pWork, false);
printf("主线程\n");
CloseThreadpoolWork(pWork);
CloseThreadpoolCleanupGroupMembers(cleanupGroup, false, NULL);
DestroyThreadpoolEnvironment(&cbe);
CloseThreadpool(myPool);
system("pause");
return 1;
}

浙公网安备 33010602011771号