纯C实现线程池

张开发
2026/4/9 11:09:42 15 分钟阅读

分享文章

纯C实现线程池
一、高并发下的无奈如果服务器采用一请求一线程的方式考虑如下计算:内存16GB ÷ POSIX线程8MB 16384 ÷ 8 2048个线程也就是说2048个线程就会占满内存导致宕机为了在高并发情况下留给系统可用的最少内存缓冲设计必不可少。而对于任务而言每个新任务创建一个新线程的方式将任务发送和执行逻辑混在一起难以管理。而线程池的技术很好地解决了线程创建和管理的痛点。线程池由任务队列、工作队列线程池、管理组件所组成。任务生产者只需要关心任务推送而线程池负责消费任务并不关心任务从哪来。1.1 任务队列和工作队列任务队列和工作队列由双向链表实现。任务队列包含执行函数入口工作队列消费任务后用标识符terminate退出循环。//任务队列的结构体structnTask{void(*task_func)(structnTask*task);void*user_data;structnTask*prev;structnTask*next;};//执行队列的结构体structnWorker{pthread_tthreadid;intterminate;structnManager*manager;structnWorker*prev;structnWorker*next;};1.2 条件变量机制管理组件通过条件变量cond类型是pthread_cond_t管理工作线程阻塞等待任务推送涉及对底层链表修改的操作需要互斥锁用于同步。typedefstructnManager{structnTask*tasks;structnWorker*workers;pthread_mutex_tmutex;pthread_cond_tcond;//条件变量}ThreadPool;API实现线程池创建int nThreadPoolCreate(ThreadPool *pool,int numWorkers)回调static void *nThreadPoolCallBack(void *arg)每个任务队列中的任务需要一个回调函数来“消费”它任务推送int nThreadPoolPushTask(ThreadPool *pool, struct nTask *tasks)当线程池拥有可用的线程的时候将任务推送到工作队列当中执行销毁线程池int nThreadPoolDestory(ThreadPool *pool, int nWorker)2.1 线程池创建参数线程池实例pool最大的工作线程数目numWorks初始化条件变量、互斥锁、pool实例各个字段为实例pool创建数量为numWorks的工作线程worker通过for循环调用malloc函数分配内存、pthread_create创建工作线程指定需要的回调函数。intnThreadPoolCreate(ThreadPool*pool,intnumWorkers){if(poolNULL)return-1;if(numWorkers1)numWorkers1;memset(pool,0,sizeof(ThreadPool));pthread_cond_tblank_condPTHREAD_COND_INITIALIZER;memcpy(pool-cond,blank_cond,sizeof(pthread_cond_t));pthread_mutex_init(pool-mutex,NULL);inti0;for(i0;inumWorkers;i){structnWorker*worker(structnWorker*)malloc(sizeof(structnWorker));if(workerNULL){perror(malloc);return-2;}memset(worker,0,sizeof(structnWorker));worker-managerpool;intretpthread_create(worker-threadid,NULL,nThreadPoolCallBack,worker);if(ret){perror(pthread_create);free(worker);return-3;}LIST_INSERT(worker,pool-workers);}printf(call_back\n);return0;}2.2 回调函数while1循环判断任务队列当中是否有任务task为空时调用pthread_cond_wait阻塞等待否则回调task_func消费任务完成后terminate置位退出。staticvoid*nThreadPoolCallBack(void*arg){structnWorker*worker(structnWorker*)arg;while(1){pthread_mutex_lock(worker-manager-mutex);while(worker-manager-tasksNULL){if(worker-terminate)break;pthread_cond_wait(worker-manager-cond,worker-manager-mutex);}//避免死锁if(worker-terminate){pthread_mutex_unlock(worker-manager-mutex);break;}structnTask*taskworker-manager-tasks;LIST_REMOVE(task,worker-manager-tasks);pthread_mutex_unlock(worker-manager-mutex);task-task_func(task);}free(worker);returnNULL;}2.3 任务推送任务推送的关键是调用pthread_cond_signal()唤醒pthread_cond_wait中的工作线程intnThreadPoolPushTask(ThreadPool*pool,structnTask*tasks){pthread_mutex_lock(pool-mutex);LIST_INSERT(tasks,pool-tasks);pthread_cond_signal(pool-cond);//唤醒一个等待这个条件的线程pthread_mutex_unlock(pool-mutex);return0;}2.4 线程池删除将所有工作线程的标识位全部置位则会在被唤醒后自动退出。调用pthread_cond_broadcast函数将所有阻塞的线程全部唤醒这些线程就会自动关闭。相比于将工作队列和任务队列全部置为NULL这样做可以避免内存泄漏的危险。intnThreadPoolDestory(ThreadPool*pool,intnWorker){structnWorker*workerNULL;for(workerpool-workers;worker!NULL;workerworker-next){worker-terminate1;}//这把锁和条件等待时候的锁是同一把pthread_mutex_lock(pool-mutex);pthread_cond_broadcast(pool-cond);//唤醒所有等待这个条件的线程pthread_mutex_unlock(pool-mutex);pool-workersNULL;pool-tasksNULL;return0;}

更多文章