1.任务队列
线程池结构体就是存储任务队列的。很明显,任务中需要有执行函数的函数地址和传入的参数。
1 2 3 4 5
| typedef struct Task { void (*function)(void *arg); void *arg; } Task;
|
在这里,我们的function
函数只接受单个参数。
- 如果需要多个参数呢?我在github上给出了一小段实现代码。
这是我的github仓库
其实就是需要一个参数结构体MyParam
,function
中传入的arg
类型转换为MyParam
就行了。
2.线程池定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| typedef struct ThreadPool { Task *taskQ; int queueCapacity; int queueSize; int queueFront; int queueRear;
pthread_t managerID; pthread_t *threadIDs;
int minNum; int maxNum; int busyNum; int liveNum; int exitNum;
pthread_mutex_t mutexPool; pthread_mutex_t mutexBusy;
pthread_cond_t notFull; pthread_cond_t notEmpty;
int shutdown; } ThreadPool;
|
3.函数申明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| ThreadPool *threadPoolCreate(int min, int max, int queueSize);
int threadPoolDestroy(ThreadPool *pool);
void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg);
int threadPoolBusyNum(ThreadPool *pool);
int threadPoolAliveNum(ThreadPool *pool);
void *manager(void *arg);
void *worker(void *arg);
void threadExit(ThreadPool *pool);
|
4.函数代码实现
threadPoolCreate
体会一下do... while(0)
的妙用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| ThreadPool *threadPoolCreate(int min, int max, int queueSize) { ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool)); do { if (pool == NULL) { printf("malloc threadpool fail...\n"); break; }
pool->threadIDs = (pthread_t *)malloc(sizeof(pthread_t) * max); if (pool->threadIDs == NULL) { printf("malloc threadIDs fail...\n"); break; } memset(pool->threadIDs, 0, sizeof(pthread_t) * max); pool->minNum = min; pool->maxNum = max; pool->busyNum = 0; pool->liveNum = min; pool->exitNum = 0;
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 || pthread_mutex_init(&pool->mutexBusy, NULL) != 0 || pthread_cond_init(&pool->notEmpty, NULL) != 0 || pthread_cond_init(&pool->notFull, NULL) != 0) { printf("mutex or condition init fail...\n"); break; }
pool->taskQ = (Task *)malloc(sizeof(Task) * queueSize); pool->queueCapacity = queueSize; pool->queueSize = 0; pool->queueFront = 0; pool->queueRear = 0;
pool->shutdown = 0;
pthread_create(&pool->managerID, NULL, manager, pool); for (int i = 0; i < min; ++i) { pthread_create(&pool->threadIDs[i], NULL, worker, pool); } return pool; } while (0);
if (pool && pool->threadIDs) free(pool->threadIDs); if (pool && pool->taskQ) free(pool->taskQ); if (pool) free(pool);
return NULL; }
|
threadPoolDestroy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| int threadPoolDestroy(ThreadPool *pool) {
if (pool == NULL) { return -1; }
pool->shutdown = 1; pthread_join(pool->managerID, NULL); for (int i = 0; i < pool->liveNum; ++i) { pthread_cond_signal(&pool->notEmpty); } if (pool->taskQ) { free(pool->taskQ); } if (pool->threadIDs) { free(pool->threadIDs); }
pthread_mutex_destroy(&pool->mutexPool); pthread_mutex_destroy(&pool->mutexBusy); pthread_cond_destroy(&pool->notEmpty); pthread_cond_destroy(&pool->notFull);
free(pool); pool = NULL;
return 0; }
|
threadPoolAdd
- 为什么需要
while
循环
因为pthread_cond_signal
会唤醒所有被条件变量阻塞的线程。
假设有两个生产者线程因为任务队列已经满了,而被阻塞在该位置。随后某个工作线程拿取了一个任务而使得任务队列没有满,接着pthread_cond_signal
唤醒这两个生产者线程。这两个生产者线程首先都尝试获取mutexPool
这把锁,然后只有一个生产者线程拿到了这把锁,执行到下一个while
循环条件不满足就退出了,然后就可以把任务添加到任务队列中,最后释放掉了锁。此时任务队列又满了。
随后第二个生产者线程获得锁,仍在while
循环中,它发现条件仍然满足,又调用pthread_cond_wait
函数。
通过这样的机制,生产者线程就不会在已经满了的任务队列中继续添加任务了。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg) { pthread_mutex_lock(&pool->mutexPool); while (pool->queueSize == pool->queueCapacity && !pool->shutdown) { pthread_cond_wait(&pool->notFull, &pool->mutexPool); } if (pool->shutdown) { pthread_mutex_unlock(&pool->mutexPool); return; } pool->taskQ[pool->queueRear].function = func; pool->taskQ[pool->queueRear].arg = arg; pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity; pool->queueSize++;
pthread_cond_signal(&pool->notEmpty); pthread_mutex_unlock(&pool->mutexPool); }
|
threadPoolBusyNum、threadPoolAliveNum
这两个函数比较简单,不做说明。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| int threadPoolBusyNum(ThreadPool *pool) { pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy); return busyNum; }
int threadPoolAliveNum(ThreadPool *pool) { pthread_mutex_lock(&pool->mutexPool); int aliveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool); return aliveNum; }
|
worker
上一篇说过,woker
函数是一个while
循环,这很自然,因为线程池就是要复用线程,一个线程结束后应该转去执行其他任务,不应该结束。
while
循环下还有一个while
循环,这和上面的逻辑很像。也是为了防止某个线程尝试获取空队列中的任务,这会引发难以预料的错误。
最后线程的执行就是一次函数调用task.function(task.arg)
这里还需要注意对busyNum
的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| void *worker(void *arg) { ThreadPool *pool = (ThreadPool *)arg;
while (1) { pthread_mutex_lock(&pool->mutexPool); while (pool->queueSize == 0 && !pool->shutdown) { pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
if (pool->exitNum > 0) { pool->exitNum--; if (pool->liveNum > pool->minNum) { pool->liveNum--; pthread_mutex_unlock(&pool->mutexPool); threadExit(pool); } } }
if (pool->shutdown) { pthread_mutex_unlock(&pool->mutexPool); threadExit(pool); }
Task task; task.function = pool->taskQ[pool->queueFront].function; task.arg = pool->taskQ[pool->queueFront].arg; pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; pool->queueSize--; pthread_cond_signal(&pool->notFull); pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working...\n", pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum++; pthread_mutex_unlock(&pool->mutexBusy); task.function(task.arg); free(task.arg); task.arg = NULL;
printf("thread %ld end working...\n", pthread_self()); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum--; pthread_mutex_unlock(&pool->mutexBusy); } return NULL; }
|
manager
manager
是管理这些存活线程的线程。
exitNum
是需要杀死的线程数目。
在这里,增加存活线程和杀死存活线程的逻辑比较简单,详情见代码。
每一次最多增加或杀死NUMBER
个存活线程。其实杀死存活线程的逻辑仍然在存活线程的worker
函数中,存活线程都是自杀的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| void *manager(void *arg) { ThreadPool *pool = (ThreadPool *)arg; while (!pool->shutdown) { sleep(3);
pthread_mutex_lock(&pool->mutexPool); int queueSize = pool->queueSize; int liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexPool);
pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy);
if (queueSize > liveNum && liveNum < pool->maxNum) { pthread_mutex_lock(&pool->mutexPool); int counter = 0; for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i) { if (pool->threadIDs[i] == 0) { pthread_create(&pool->threadIDs[i], NULL, worker, pool); counter++; pool->liveNum++; } } pthread_mutex_unlock(&pool->mutexPool); } if (busyNum * 2 < liveNum && liveNum > pool->minNum) { pthread_mutex_lock(&pool->mutexPool); pool->exitNum = NUMBER; pthread_mutex_unlock(&pool->mutexPool); for (int i = 0; i < NUMBER; ++i) { pthread_cond_signal(&pool->notEmpty); } } } return NULL; }
|
threadExit
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| void threadExit(ThreadPool *pool) { pthread_t tid = pthread_self(); for (int i = 0; i < pool->maxNum; ++i) { if (pool->threadIDs[i] == tid) { pool->threadIDs[i] = 0; printf("threadExit() called, %ld exiting...\n", tid); break; } } pthread_exit(NULL); }
|