Linux下设计一个简单的线程池.docx
- 文档编号:14425294
- 上传时间:2023-06-23
- 格式:DOCX
- 页数:10
- 大小:19.46KB
Linux下设计一个简单的线程池.docx
《Linux下设计一个简单的线程池.docx》由会员分享,可在线阅读,更多相关《Linux下设计一个简单的线程池.docx(10页珍藏版)》请在冰点文库上搜索。
Linux下设计一个简单的线程池
Linux下设计一个简单的线程池
定义
什么是线程池?
简单点说,线程池就是有一堆已经创建好了的线程,初始它们都处于空闲等待状态,当有新的任务需要处理的时候,就从这个池子里面取一个空闲等
待的线程来处理该任务,当处理完成了就再次把该线程放回池中,以供后面的任务使用。
当池子里的线程全都处理忙碌状态时,线程池中没有可用的空闲等待线程,
此时,根据需要选择创建一个新的线程并置入池中,或者通知任务线程池忙,稍后再试。
为什么要用线程池?
我们说,线程的创建和销毁比之进程的创建和销毁是轻量级的,但是当我们的任务需要大量进行大量线程的创建和销毁操作时,这个消耗就会变成的相当大。
比如,
当你设计一个压力性能测试框架的时候,需要连续产生大量的并发操作,这个是时候,线程池就可以很好的帮上你的忙。
线程池的好处就在于线程复用,一个任务处理完成后,当前线程可以直接处理下一个任务,而不是销毁后再创建,非常适用于连续产生大量并发任务的场合。
线程池工作原理
线程池中每一个线程的工作过程如下:
图1:
线程的工作流程
线程池的任务就在于负责这些线程的创建,销毁和任务处理参数传递、唤醒和等待。
1.创建若干线程,置入线程池
2.任务达到时,从线程池取空闲线程
3.取得了空闲线程,立即进行任务处理
4.否则新建一个线程,并置入线程池,执行3
5.如果创建失败或者线程池已满,根据设计策略选择返回错误或将任务置入处理队列,等待处理
6.销毁线程池
图2:
线程池的工作原理
线程池设计数据结构设计任务设计
[cpp]viewplaincopytypedefstructtp_work_desc_sTpWorkDesc;typedefvoid(*process_job)(TpWorkDesc*job);structtp_work_desc_s{void*ret;//callin,thatisargumentsvoid*arg;//callout,thatisreturnvalue};
其中,TpWorkDesc是任务参数描述,arg是传递给任务的参数,ret则是任务处理完成后的返回值;
process_job函数是任务处理函数原型,每个任务处理函数都应该这样定义,然后将它作为参数传给线程池处理,线程池将会选择一个空闲线程通过调用该函数来进行任务处理;
线程设计
[cpp]viewplaincopytypedefstructtp_thread_info_sTpThreadInfo;structtp_thread_info_s{pthread_tthread_id;//threadidnumTPBOOLis_busy;//threadstatus:
true-busy;flase-idlepthread_cond_tthread_cond;pthread_mutex_tthread_lock;process_jobproc_fun;TpWorkDesc*th_job;TpThreadPool*tp_pool;};
TpThreadInfo是对一个线程的描述。
thread_id是该线程的ID;
is_busy用于标识该线程是否正处理忙碌状态;
thread_cond用于任务处理时的唤醒和等待;
thread_lock,用于任务加锁,用于条件变量等待加锁;
proc_fun是当前任务的回调函数地址;
th_job是任务的参数信息;
tp_pool是所在线程池的指针;
线程池设计
[cpp]viewplaincopytypedefstructtp_thread_pool_sTpThreadPool;structtp_thread_pool_s{unsignedmin_th_num;//minthreadnumberinthepoolunsignedcur_th_num;//currentthreadnumberinthepoolunsignedmax_th_num;//maxthreadnumberinthepoolpthread_mutex_ttp_lock;pthread_tmanage_thread_id;//managethreadidnumTpThreadInfo*thread_info;Queueidle_q;TPBOOLstop_flag;};
TpThreadPool是对线程池的描述。
min_th_num是线程池中至少存在的线程数,线程池初始化的过程中会创建min_th_num数量的线程;
cur_th_num是线程池当前存在的线程数量;
max_th_num则是线程池最多可以存在的线程数量;
tp_lock用于线程池管理时的互斥;
manage_thread_id是线程池的管理线程ID;
thread_info则是指向线程池数据,这里使用一个数组来存储线程池中线程的信息,该数组的大小为max_th_num;
idle_q是存储线程池空闲线程指针的队列,用于从线程池快速取得空闲线程;
stop_flag用于线程池的销毁,当stop_flag为FALSE时,表明当前线程池需要销毁,所有忙碌线程在处理完当前任务后会退出;算法设计线程池的创建和初始化
线程创建
创建伊始,线程池线程容量大小上限为max_th_num,初始容量为min_th_num;
[cpp]viewplaincopyTpThreadPool*tp_create(unsignedmin_num,unsignedmax_num){TpThreadPool*pTp;pTp=(TpThreadPool*)malloc(sizeof(TpThreadPool));memset(pTp,0,sizeof(TpThreadPool));//initmembervarpTp->min_th_num=min_num;pTp->cur_th_num=min_num;pTp->max_th_num=max_num;pthread_mutex_init(&pTp->tp_lock,NULL);//mallocmemfornumthreadinfostructif(NULL!
=pTp->thread_info)free(pTp->thread_info);pTp->thread_info=(TpThreadInfo*)malloc(sizeof(TpThreadInfo)*pTp->max_th_num);memset(pTp->thread_info,0,sizeof(TpThreadInfo)*pTp->max_th_num);returnpTp;}线程初始化
[cpp]viewplaincopyTPBOOLtp_init(TpThreadPool*pTp){inti;interr;TpThreadInfo*pThi;initQueue(&pTp->idle_q);pTp->stop_flag=FALSE;//createworkthreadandinitworkthreadinfofor(i=0;i<pTp->min_th_num;i++){pThi=pTp->thread_info+i;pThi->tp_pool=pTp;pThi->is_busy=FALSE;pthread_cond_init(&pThi->thread_cond,NULL);pthread_mutex_init(&pThi->thread_lock,NULL);pThi->proc_fun=def_proc_fun;pThi->th_job=NULL;enQueue(&pTp->idle_q,pThi);err=pthread_create(&pThi->thread_id,NULL,tp_work_thread,pThi);if(0!
=err){perror("tp_init:
createworkthreadfailed.");clearQueue(&pTp->idle_q);returnFALSE;}}//createmanagethreaderr=pthread_create(&pTp->manage_thread_id,NULL,tp_manage_thread,pTp);if(0!
=err){clearQueue(&pTp->idle_q);printf("tp_init:
creatmanagethreadfailed\n");returnFALSE;}returnTRUE;}初始线程池中线程数量为min_th_num,对这些线程一一进行初始化;
将这些初始化的空闲线程一一置入空闲队列;
创建管理线程,用于监控线程池的状态,并适当回收多余的线程资源;
线程池的关闭和销毁
[cpp]viewplaincopyvoidtp_close(TpThreadPool*pTp,TPBOOLwait){unsignedi;pTp->stop_flag=TRUE;if(wait){for(i=0;i<pTp->cur_th_num;i++){pthread_cond_signal(&pTp->thread_info[i].thread_cond);}for(i=0;i<pTp->cur_th_num;i++){pthread_join(pTp->thread_info[i].thread_id,NULL);pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);pthread_cond_destroy(&pTp->thread_info[i].thread_cond);}}else{//closeworkthreadfor(i=0;i<pTp->cur_th_num;i++){kill((pid_t)pTp->thread_info[i].thread_id,SIGKILL);pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);pthread_cond_destroy(&pTp->thread_info[i].thread_cond);}}//closemanagethreadkill((pid_t)pTp->manage_thread_id,SIGKILL);pthread_mutex_destroy(&pTp->tp_lock);//freethreadstructfree(pTp->thread_info);pTp->thread_info=NULL;}
线程池关闭的过程中,可以选择是否对正在处理的任务进行等待,如果是,则会唤醒所有任务,然后等待所有任务执行完成,然后返回;如果不是,则将立即杀死所有线程,然后返回,注意:
这可能会导致任务的处理中断而产生错误!
任务处理
[cpp]viewplaincopyTPBOOLtp_process_job(TpThreadPool*pTp,process_jobproc_fun,TpWorkDesc*job){TpThreadInfo*pThi;//fillpTp->thread_info'srelativeworkkeypthread_mutex_lock(&pTp->tp_lock);pThi=(TpThreadInfo*)deQueue(&pTp->idle_q);pthread_mutex_unlock(&pTp->tp_lock);if(pThi){pThi->is_busy=TRUE;pThi->proc_fun=proc_fun;pThi->th_job=job;pthread_cond_signal(&pThi->thread_cond);DEBUG("Fetchathreadfrompool.\n");returnTRUE;}//ifallcurrentthreadarebusy,newthreadiscreatedherepthread_mutex_lock(&pTp->tp_lock);pThi=tp_add_thread(pTp);pthread_mutex_unlock(&pTp->tp_lock);if(!
pThi){DEBUG("Thethreadpoolisfull,nomorethreadavailable.\n");returnFALSE;}DEBUG("Nomoreidlethread,createdanewone.\n");pThi->proc_fun=proc_fun;pThi->th_job=job;//sendcondtoworkthreadpthread_cond_signal(&pThi->thread_cond);returnTRUE;}
当一个新任务到达是,线程池首先会检查是否有可用的空闲线程,如果是,则采用才空闲线程进行任务处理并返回TRUE,如果不是,则尝试新建一个线程,并使用该线程对任务进行处理,如果失败则返回FALSE,说明线程池忙碌或者出错。
[cpp]viewplaincopystaticvoid*tp_work_thread(void*arg){pthread_tcurid;//currentthreadidTpThreadInfo*pTinfo=(TpThreadInfo*)arg;//waitcondforprocessingrealjob.while(!
(pTinfo->tp_pool->stop_flag)){pthread_mutex_lock(&pTinfo->thread_lock);pthread_cond_wait(&pTinfo->thread_cond,&pTinfo->thread_lock);pthread_mutex_unlock(&pTinfo->thread_lock);//processpTinfo->proc_fun(pTinfo->th_job);//threadstatebesetidleafterwork//pthread_mutex_lock(&pTinfo->thread_lock);pTinfo->is_busy=FALSE;enQueue(&pTinfo->tp_pool->idle_q,pTinfo);//pthread_mutex_unlock(&pTinfo->thread_lock);DEBUG("Jobdone,Iamidlenow.\n");}}
上面这个函数是任务处理函数,该函数将始终处理等待唤醒状态,直到新任务到达或者线程销毁时被唤醒,然后调用任务处理回调函数对任务进行处理;当任务处理完成时,则将自己置入空闲队列中,以供下一个任务处理。
[cpp]viewplaincopyTpThreadInfo*tp_add_thread(TpThreadPool*pTp){interr;TpThreadInfo*new_thread;if(pTp->max_th_num<=pTp->cur_th_num)returnNULL;//mallocnewthreadinfostructnew_thread=pTp->thread_info+pTp->cur_th_num;new_thread->tp_pool=pTp;//initnewthread'scond&mutexpthread_cond_init(&new_thread->thread_cond,NULL);pthread_mutex_init(&new_thread->thread_lock,NULL);//initstatusisbusy,onlynewprocessjobwillcallthisfunctionnew_thread->is_busy=TRUE;err=pthread_create(&new_thread->thread_id,NULL,tp_work_thread,new_thread);if(0!
=err){free(new_thread);returnNULL;}//addcurrentthreadnumberinthepool.pTp->cur_th_num++;returnnew_thread;}
上面这个函数用于向线程池中添加新的线程,该函数将会在当线程池没有空闲线程可用时被调用。
函数将会新建一个线程,并设置自己的状态为busy(立即就要被用于执行任务)。
线程池管理
线程池的管理主要是监控线程池的整体忙碌状态,当线程池大部分线程处于空闲状态时,管理线程将适当的销毁一定数量的空闲线程,以便减少线程池对系统资源的消耗。
这里设计认为,当空闲线程的数量超过线程池线程数量的1/2时,线程池总体处理空闲状态,可以适当销毁部分线程池的线程,以减少线程池对系统资源的开销。
线程池状态计算
这里的BUSY_THRESHOLD的值是0.5,也即是当空闲线程数量超过一半时,返回0,说明线程池整体状态为闲,否则返回1,说明为忙。
[cpp]viewplaincopyinttp_get_tp_status(TpThreadPool*pTp){floatbusy_num=0.0;inti;//getbusythreadnumberbusy_num=pTp->cur_th_num-pTp->idle_q.count;DEBUG("Currentthreadpoolstatus,currentnum:
%u,busynum:
%u,idlenum:
%u\n",pTp->cur_th_num,(unsigned)busy_num,pTp->idle_q.count);//0.2?
orothernum?
if(busy_num/(pTp->cur_th_num)<BUSY_THRESHOLD)return0;//idlestatuselsereturn1;//busyornormalstatus}
线程的销毁算法
1.从空闲队列中dequeue一个空闲线程指针,该指针指向线程信息数组的某项,例如这里是p;
2.销毁该线程
3.把线程信息数组的最后一项拷贝至位置p
4.线程池数量减少一,即cur_th_num--图3:
线程销毁
[cpp]viewplaincopyTPBOOLtp_delete_thread(TpThreadPool*pTp){unsignedidx;TpThreadInfo*pThi;TpThreadInfotT;//currentthreadnumcan't<minthreadnumif(pTp->cur_th_num<=pTp->min_th_num)returnFALSE;//pthread_mutex_lock(&pTp->tp_lock);pThi=deQueue(&pTp->idle_q);//pthread_mutex_unlock(&pTp->tp_lock);if(!
pThi)returnFALSE;//afterdeletingidlethread,currentthreadnum-1pTp->cur_th_num--;memcpy(&tT,pThi,sizeof(TpThreadInfo));memcpy(pThi,pTp->thread_info+pTp->cur_th_num,sizeof(TpThreadInfo));//killtheidlethreadandfreeinfostructkill((pid_t)tT.thread_id,SIGKILL);pthread_mutex_destroy(&tT.thread_lock);pthread_cond_destroy(&tT.thread_cond);returnTRUE;}
线程池监控
线程池通过一个管理线程来进行监控,管理线程将会每隔一段时间对线程池的状态进行计算,根据线程池的状态适当的销毁部分线程,减少对系统资源的消耗。
[cpp]viewplaincopystaticvoid*tp_manage_thread(void*arg){TpThreadPool*pTp=(TpThreadPool*)arg;//mainthreadpoolstructinstance//1?
sleep(MANAGE_INTERVAL);do{if(tp_get_tp_status(pTp)==0){do{if(!
tp_delete_thread(pTp))break;}while(TRUE);}//endforif//1?
sleep(MANAGE_INTERVAL);}while(!
pTp->stop_flag);returnNULL;}程序测试
至此,我们的设计需要使用一个测试程序来进行验证。
于是,我们写下这样一段代码。
[cpp]viewplaincopy#include<stdio.h>#include<unistd.h>#include"thread_po
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- Linux 下设 一个 简单 线程