概念:
如果一个程序每处理一个任务都需要创建一个线程来处理,假设创建线程的时间为T1,任务执行的时间为T2,线程销毁的时间为T3,那么线程的有效使用时间率为T2/(T1+T2+T3),如果任务执行的时间非常短,那么线程的使用效率就会非常低,这对高并发的服务器性能来说是不能接受的,所以需要引入线程池概念。线程池简单点来说就是创建了很多工作线程,线程函数for(;;)循环,不停的从任务队伍获取任务并处理。
注意点:
1.线程同步问题,互斥锁,条件变量,信号量都可以用于线程同步,单纯的互斥锁只能轮询去处理数据,可能会空转进而造成CPU资源的浪费,信号量并不建议用(并发编程的十五条建议),所以使用互斥锁+条件变量来解决线程同步的问题,可参考条件变量实现生产者消费者模型
线程池的组成:
1.任务队列:简单来说就是数据的容器,用来存放没有处理的任务,可以用队列,也可以用链表等数据结构
2.任务接口:具体点来说就是任务队列里面的数据要调用哪个函数来处理
3..线程管理:负责创建工作线程,销毁工作线程
----- gcc pthread_pool.c -lpthread -----
#include
#include
#include
#include
typedef struct worker {
void* (*callback) (void* arg);
void* arg;
struct worker* next;
} CThread_worker;
static void* callback(void* arg) {
printf("threadid:0x%x, working on task %d\n", pthread_self(), *(int*)arg);
sleep(1);
return(NULL);
}
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
CThread_worker* queue_head;
int shutdown;
pthread_t* threadid;
int max_thread_num;
int cur_queue_size;
} CThread_pool;
void* thread_routine(void* arg);
static CThread_pool* pool = NULL;
void pool_init(int max_thread_num) {
pool = (CThread_pool*) malloc(sizeof(CThread_pool));
pthread_mutex_init(&(pool->mutex), NULL);
pthread_cond_init(&(pool->cond), NULL);
pool->queue_head = NULL;
pool->shutdown = 0;
pool->cur_queue_size = 0;
pool->threadid = (pthread_t*) malloc(max_thread_num * sizeof (pthread_t));
int i = 0;
for (i=0; i
pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);
}
}
int pool_add_worker(void* (*callback) (void* arg), void* arg) {
printf("pool add worker arg:%d\n", *(int*)arg);
CThread_worker* newworker = (CThread_worker*) malloc(sizeof(CThread_worker));
newworker->callback = callback;
newworker->arg = arg;
newworker->next = NULL;
pthread_mutex_lock(&(pool->mutex));
CThread_worker* worker = pool->queue_head;
if (worker != NULL) {
while (worker->next != NULL)
worker = worker->next;
worker->next = newworker;
}
else {
pool->queue_head = newworker;
}
int dosignal;
if (pool->cur_queue_size == 0)
dosignal = 1;
pool->cur_queue_size += 1;
pthread_mutex_unlock(&(pool->mutex));
if (dosignal)
pthread_cond_signal(&(pool->cond));
return 0;
}
int pool_destroy() {
printf("pool destroy now\n");
if (pool->shutdown)
return -1;
pool->shutdown = 1;
pthread_cond_broadcast(&(pool->cond));
int i;
for (i=0; imax_thread_num; ++i)
pthread_join(pool->threadid[i], NULL);
free(pool->threadid);
pool->threadid = NULL;
CThread_worker* head = NULL;
while (pool->queue_head != NULL) {
head = pool->queue_head;
pool->queue_head = pool->queue_head->next;
free(head);
head = NULL;
}
pthread_mutex_destroy(&(pool->mutex));
pthread_cond_destroy(&(pool->cond));
free(pool);
pool = NULL;
printf("pool destroy end\n");
return 0;
}
void* thread_routine(void* arg) {
printf("starting threadid:0x%x\n", pthread_self());
for (; ;) {
pthread_mutex_lock(&(pool->mutex));
while (pool->cur_queue_size == 0 && !pool->shutdown) {
printf("threadid:0x%x is waiting\n", pthread_self());
pthread_cond_wait(&(pool->cond), &(pool->mutex));
}
if (pool->shutdown) {
pthread_mutex_unlock(&(pool->mutex));
printf("threadid:0x%x will exit\n", pthread_self());
pthread_exit(NULL);
}
printf("threadid:0x%x is starting to work\n", pthread_self());
pool->cur_queue_size -= 1;
CThread_worker* worker = pool->queue_head;
pool->queue_head = worker->next;
pthread_mutex_unlock(&(pool->mutex));
(*(worker->callback)) (worker->arg);
free(worker);
worker = NULL;
}
return(NULL);
}
int main(int argc, char const *argv[])
{
pool_init(2);
int* workingnum = (int*) malloc(sizeof(int) * 10);
int i;
for (i=0; i<5; ++i) {
workingnum[i] = i;
pool_add_worker(callback, &workingnum[i]);
}
sleep(5);
pool_destroy();
free(workingnum);
workingnum = NULL;
return 0;
}
测试结果: