线程池的C语言实现

1.任务队列

线程池结构体就是存储任务队列的。很明显,任务中需要有执行函数的函数地址和传入的参数。

1
2
3
4
5
typedef struct Task
{
void (*function)(void *arg);
void *arg;
} Task;

在这里,我们的function函数只接受单个参数。

  • 如果需要多个参数呢?我在github上给出了一小段实现代码。
    这是我的github仓库
    其实就是需要一个参数结构体MyParamfunction中传入的arg类型转换为MyParam就行了。

2.线程池定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typedef struct ThreadPool
{
// 任务队列
Task *taskQ;
int queueCapacity;
int queueSize; // 当前任务个数
int queueFront;
int queueRear;

pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID

int minNum; // 最小线程数
int maxNum; // 最大线程数
int busyNum; // 忙的线程数
int liveNum; // 存活的线程数
int exitNum; // 要杀死的线程个数

pthread_mutex_t mutexPool; // 锁整个线程池
pthread_mutex_t mutexBusy; // 锁busyNum变量

pthread_cond_t notFull; // 任务队列是否是满的
pthread_cond_t notEmpty;

int shutdown; // 是否销毁线程池
} ThreadPool;

3.函数申明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建线程池
ThreadPool *threadPoolCreate(int min, int max, int queueSize);
// 销毁线程池
int threadPoolDestroy(ThreadPool *pool);
// 往等待队列中添加任务,threadPool中的TaskQ
void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg);
// 忙碌线程的个数
int threadPoolBusyNum(ThreadPool *pool);
// 存活线程的个数
int threadPoolAliveNum(ThreadPool *pool);
// manager线程处理函数
void *manager(void *arg);
// 存活线程处理函数
void *worker(void *arg);
// 线程退出
void threadExit(ThreadPool *pool);

4.函数代码实现

threadPoolCreate

体会一下do... while(0)的妙用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
ThreadPool *threadPoolCreate(int min, int max, int queueSize)
{
ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}

pool->threadIDs = (pthread_t *)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 和最小个数相等
pool->exitNum = 0;

if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0)
{
printf("mutex or condition init fail...\n");
break;
}

// 任务队列
pool->taskQ = (Task *)malloc(sizeof(Task) * queueSize);
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;

pool->shutdown = 0;

// 创建线程
pthread_create(&pool->managerID, NULL, manager, pool);
for (int i = 0; i < min; ++i)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
} while (0);

// 释放资源
if (pool && pool->threadIDs)
free(pool->threadIDs);
if (pool && pool->taskQ)
free(pool->taskQ);
if (pool)
free(pool);

return NULL;
}

threadPoolDestroy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int threadPoolDestroy(ThreadPool *pool)
{

if (pool == NULL)
{
return -1;
}

// 关闭线程池
pool->shutdown = 1;
// 阻塞回收管理者线程
pthread_join(pool->managerID, NULL);
// 唤醒阻塞的消费者线程
for (int i = 0; i < pool->liveNum; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
// 释放堆内存
if (pool->taskQ)
{
free(pool->taskQ);
}
if (pool->threadIDs)
{
free(pool->threadIDs);
}

pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);

free(pool);
pool = NULL;

return 0;
}

threadPoolAdd

  • 为什么需要while循环
    因为pthread_cond_signal会唤醒所有被条件变量阻塞的线程。
    假设有两个生产者线程因为任务队列已经满了,而被阻塞在该位置。随后某个工作线程拿取了一个任务而使得任务队列没有满,接着pthread_cond_signal唤醒这两个生产者线程。这两个生产者线程首先都尝试获取mutexPool这把锁,然后只有一个生产者线程拿到了这把锁,执行到下一个while循环条件不满足就退出了,然后就可以把任务添加到任务队列中,最后释放掉了锁。此时任务队列又满了。
    随后第二个生产者线程获得锁,仍在while循环中,它发现条件仍然满足,又调用pthread_cond_wait函数。
    通过这样的机制,生产者线程就不会在已经满了的任务队列中继续添加任务了。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg)
    {

    pthread_mutex_lock(&pool->mutexPool);
    while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
    {
    // 阻塞生产者线程
    pthread_cond_wait(&pool->notFull, &pool->mutexPool);
    }
    if (pool->shutdown)
    {
    pthread_mutex_unlock(&pool->mutexPool);
    return;
    }
    // 添加任务
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    pthread_cond_signal(&pool->notEmpty);
    pthread_mutex_unlock(&pool->mutexPool);
    }

threadPoolBusyNum、threadPoolAliveNum

这两个函数比较简单,不做说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int threadPoolBusyNum(ThreadPool *pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}

int threadPoolAliveNum(ThreadPool *pool)
{
pthread_mutex_lock(&pool->mutexPool);
int aliveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return aliveNum;
}

worker

上一篇说过,woker函数是一个while循环,这很自然,因为线程池就是要复用线程,一个线程结束后应该转去执行其他任务,不应该结束。
while循环下还有一个while循环,这和上面的逻辑很像。也是为了防止某个线程尝试获取空队列中的任务,这会引发难以预料的错误。
最后线程的执行就是一次函数调用task.function(task.arg)
这里还需要注意对busyNum的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
void *worker(void *arg)
{
ThreadPool *pool = (ThreadPool *)arg;

while (1)
{
pthread_mutex_lock(&pool->mutexPool);
// 当前任务队列是否为空
while (pool->queueSize == 0 && !pool->shutdown)
{
// 阻塞工作线程
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

// 判断是不是要销毁线程
if (pool->exitNum > 0)
{
pool->exitNum--;
if (pool->liveNum > pool->minNum)
{
pool->liveNum--;
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}

// 判断线程池是否被关闭了
if (pool->shutdown)
{
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}

// 从任务队列中取出一个任务
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 移动头结点
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// 解锁
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);

printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
task.function(task.arg);
free(task.arg);
task.arg = NULL;

printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}

manager

manager是管理这些存活线程的线程。
exitNum是需要杀死的线程数目。
在这里,增加存活线程和杀死存活线程的逻辑比较简单,详情见代码。
每一次最多增加或杀死NUMBER个存活线程。其实杀死存活线程的逻辑仍然在存活线程的worker函数中,存活线程都是自杀的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
void *manager(void *arg)
{
ThreadPool *pool = (ThreadPool *)arg;
while (!pool->shutdown)
{
// 每隔3s检测一次
sleep(3);

// 取出线程池中任务的数量和当前线程的数量
pthread_mutex_lock(&pool->mutexPool);
int queueSize = pool->queueSize;
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);

// 取出忙的线程的数量
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);

// 添加线程
// 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
if (queueSize > liveNum && liveNum < pool->maxNum)
{
pthread_mutex_lock(&pool->mutexPool);
int counter = 0;
for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == 0)
{
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
counter++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 销毁线程
// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if (busyNum * 2 < liveNum && liveNum > pool->minNum)
{
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 让工作的线程自杀
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->notEmpty);
}
}
}
return NULL;
}

threadExit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void threadExit(ThreadPool *pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; ++i)
{
if (pool->threadIDs[i] == tid)
{
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}


线程池的C语言实现
http://example.com/2023/08/05/项目/线程池/线程池的实现/
作者
LiuZhaocheng
发布于
2023年8月5日
许可协议