淘先锋技术网

首页 1 2 3 4 5 6 7

概念:

如果一个程序每处理一个任务都需要创建一个线程来处理,假设创建线程的时间为T1,任务执行的时间为T2,线程销毁的时间为T3,那么线程的有效使用时间率为T2/(T1+T2+T3),如果任务执行的时间非常短,那么线程的使用效率就会非常低,这对高并发的服务器性能来说是不能接受的,所以需要引入线程池概念。线程池简单点来说就是创建了很多工作线程,线程函数for(;;)循环,不停的从任务队伍获取任务并处理。

注意点:

1.线程同步问题,互斥锁,条件变量,信号量都可以用于线程同步,单纯的互斥锁只能轮询去处理数据,可能会空转进而造成CPU资源的浪费,信号量并不建议用(并发编程的十五条建议),所以使用互斥锁+条件变量来解决线程同步的问题,可参考条件变量实现生产者消费者模型

线程池的组成:

1.任务队列:简单来说就是数据的容器,用来存放没有处理的任务,可以用队列,也可以用链表等数据结构

2.任务接口:具体点来说就是任务队列里面的数据要调用哪个函数来处理

3..线程管理:负责创建工作线程,销毁工作线程

----- gcc pthread_pool.c -lpthread -----

#include

#include

#include

#include

typedef struct worker {

void* (*callback) (void* arg);

void* arg;

struct worker* next;

} CThread_worker;

static void* callback(void* arg) {

printf("threadid:0x%x, working on task %d\n", pthread_self(), *(int*)arg);

sleep(1);

return(NULL);

}

typedef struct {

pthread_mutex_t mutex;

pthread_cond_t cond;

CThread_worker* queue_head;

int shutdown;

pthread_t* threadid;

int max_thread_num;

int cur_queue_size;

} CThread_pool;

void* thread_routine(void* arg);

static CThread_pool* pool = NULL;

void pool_init(int max_thread_num) {

pool = (CThread_pool*) malloc(sizeof(CThread_pool));

pthread_mutex_init(&(pool->mutex), NULL);

pthread_cond_init(&(pool->cond), NULL);

pool->queue_head = NULL;

pool->shutdown = 0;

pool->cur_queue_size = 0;

pool->threadid = (pthread_t*) malloc(max_thread_num * sizeof (pthread_t));

int i = 0;

for (i=0; i

pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);

}

}

int pool_add_worker(void* (*callback) (void* arg), void* arg) {

printf("pool add worker arg:%d\n", *(int*)arg);

CThread_worker* newworker = (CThread_worker*) malloc(sizeof(CThread_worker));

newworker->callback = callback;

newworker->arg = arg;

newworker->next = NULL;

pthread_mutex_lock(&(pool->mutex));

CThread_worker* worker = pool->queue_head;

if (worker != NULL) {

while (worker->next != NULL)

worker = worker->next;

worker->next = newworker;

}

else {

pool->queue_head = newworker;

}

int dosignal;

if (pool->cur_queue_size == 0)

dosignal = 1;

pool->cur_queue_size += 1;

pthread_mutex_unlock(&(pool->mutex));

if (dosignal)

pthread_cond_signal(&(pool->cond));

return 0;

}

int pool_destroy() {

printf("pool destroy now\n");

if (pool->shutdown)

return -1;

pool->shutdown = 1;

pthread_cond_broadcast(&(pool->cond));

int i;

for (i=0; imax_thread_num; ++i)

pthread_join(pool->threadid[i], NULL);

free(pool->threadid);

pool->threadid = NULL;

CThread_worker* head = NULL;

while (pool->queue_head != NULL) {

head = pool->queue_head;

pool->queue_head = pool->queue_head->next;

free(head);

head = NULL;

}

pthread_mutex_destroy(&(pool->mutex));

pthread_cond_destroy(&(pool->cond));

free(pool);

pool = NULL;

printf("pool destroy end\n");

return 0;

}

void* thread_routine(void* arg) {

printf("starting threadid:0x%x\n", pthread_self());

for (; ;) {

pthread_mutex_lock(&(pool->mutex));

while (pool->cur_queue_size == 0 && !pool->shutdown) {

printf("threadid:0x%x is waiting\n", pthread_self());

pthread_cond_wait(&(pool->cond), &(pool->mutex));

}

if (pool->shutdown) {

pthread_mutex_unlock(&(pool->mutex));

printf("threadid:0x%x will exit\n", pthread_self());

pthread_exit(NULL);

}

printf("threadid:0x%x is starting to work\n", pthread_self());

pool->cur_queue_size -= 1;

CThread_worker* worker = pool->queue_head;

pool->queue_head = worker->next;

pthread_mutex_unlock(&(pool->mutex));

(*(worker->callback)) (worker->arg);

free(worker);

worker = NULL;

}

return(NULL);

}

int main(int argc, char const *argv[])

{

pool_init(2);

int* workingnum = (int*) malloc(sizeof(int) * 10);

int i;

for (i=0; i<5; ++i) {

workingnum[i] = i;

pool_add_worker(callback, &workingnum[i]);

}

sleep(5);

pool_destroy();

free(workingnum);

workingnum = NULL;

return 0;

}

测试结果:

66c71614b359463ad7d94607f868c375.png