C語言實現(xiàn)支持動態(tài)拓展和銷毀的線程池
本文實例介紹了C 語言實現(xiàn)線程池,支持動態(tài)拓展和銷毀,分享給大家供大家參考,具體內(nèi)容如下
實現(xiàn)功能
- 1.初始化指定個數(shù)的線程
- 2.使用鏈表來管理任務隊列
- 3.支持拓展動態(tài)線程
- 4.如果閑置線程過多,動態(tài)銷毀部分線程
#include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <signal.h> /*線程的任務隊列由,函數(shù)和參數(shù)組成,任務由鏈表來進行管理*/ typedef struct thread_worker_s{ void *(*process)(void *arg); //處理函數(shù) void *arg; //參數(shù) struct thread_worker_s *next; }thread_worker_t; #define bool int #define true 1 #define false 0 /*線程池中各線程狀態(tài)描述*/ #define THREAD_STATE_RUN 0 #define THREAD_STATE_TASK_WAITING 1 #define THREAD_STATE_TASK_PROCESSING 2 #define THREAD_STATE_TASK_FINISHED 3 #define THREAD_STATE_EXIT 4 typedef struct thread_info_s{ pthread_t id; int state; struct thread_info_s *next; }thread_info_t; static char* thread_state_map[] ={"創(chuàng)建","等待任務","處理中","處理完成","已退出"}; /*線程壓縮的時候只有 0,1,2,4 狀態(tài)的線程可以銷毀*/ /*線程池管理器*/ #define THREAD_BUSY_PERCENT 0.5 /*線程:任務 = 1:2 值越小,說明任務多,增加線程*/ #define THREAD_IDLE_PERCENT 2 /*線程:任務 = 2:1 值大于1,線程多于任務,銷毀部分線程*/ typedef struct thread_pool_s{ pthread_mutex_t queue_lock ; //隊列互斥鎖,即涉及到隊列修改時需要加鎖 pthread_cond_t queue_ready; //隊列條件鎖,隊列滿足某個條件,觸發(fā)等待這個條件的線程繼續(xù)執(zhí)行,比如說隊列滿了,隊列空了 thread_worker_t *head ; //任務隊列頭指針 bool is_destroy ; //線程池是否已經(jīng)銷毀 int num; //線程的個數(shù) int rnum; ; //正在跑的線程 int knum; ; //已殺死的線程 int queue_size ; //工作隊列的大小 thread_info_t *threads ; //線程組id,通過pthread_join(thread_ids[0],NULL) 來執(zhí)行線程 pthread_t display ; //打印線程 pthread_t destroy ; //定期銷毀線程的線程id pthread_t extend ; float percent ; //線程個數(shù)于任務的比例 rnum/queue_size int init_num ; pthread_cond_t extend_ready ; //如果要增加線程 }thread_pool_t; /*-------------------------函數(shù)聲明----------------------*/ /** * 1.初始化互斥變量 * 2.初始化等待變量 * 3.創(chuàng)建指定個數(shù)的線程線程 */ thread_pool_t* thread_pool_create(int num); void *thread_excute_route(void *arg); /*調(diào)試函數(shù)*/ void debug(char *message,int flag){ if(flag) printf("%s\n",message); } void *display_thread(void *arg); /** * 添加任務包括以下幾個操作 * 1.將任務添加到隊列末尾 * 2.通知等待進程來處理這個任務 pthread_cond_singal(); */ int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void *arg),void *arg); //網(wǎng)線程池的隊列中增加一個需要執(zhí)行的函數(shù),也就是任務 /** * 銷毀線程池,包括以下幾個部分 * 1.通知所有等待的進程 pthread_cond_broadcase * 2.等待所有的線程執(zhí)行完 * 3.銷毀任務列表 * 4.釋放鎖,釋放條件 * 4.銷毀線程池對象 */ void *thread_pool_is_need_recovery(void *arg); void *thread_pool_is_need_extend(void *arg); void thread_pool_destory(thread_pool_t *pool); thread_pool_t *thread_pool_create(int num){ if(num<1){ return NULL; } thread_pool_t *p; p = (thread_pool_t*)malloc(sizeof(struct thread_pool_s)); if(p==NULL) return NULL; p->init_num = num; /*初始化互斥變量與條件變量*/ pthread_mutex_init(&(p->queue_lock),NULL); pthread_cond_init(&(p->queue_ready),NULL); /*設置線程個數(shù)*/ p->num = num; p->rnum = num; p->knum = 0; p->head = NULL; p->queue_size =0; p->is_destroy = false; int i=0; thread_info_t *tmp=NULL; for(i=0;i<num;i++){ /*創(chuàng)建線程*/ tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s)); if(tmp==NULL){ free(p); return NULL; }else{ tmp->next = p->threads; p->threads = tmp; } pthread_create(&(tmp->id),NULL,thread_excute_route,p); tmp->state = THREAD_STATE_RUN; } /*顯示*/ pthread_create(&(p->display),NULL,display_thread,p); /*檢測是否需要動態(tài)線程*/ //pthread_create(&(p->extend),NULL,thread_pool_is_need_extend,p); /*動態(tài)銷毀*/ pthread_create(&(p->destroy),NULL,thread_pool_is_need_recovery,p); return p; } int thread_pool_add_worker(thread_pool_t *pool,void*(*process)(void*arg),void*arg){ thread_pool_t *p= pool; thread_worker_t *worker=NULL,*member=NULL; worker = (thread_worker_t*)malloc(sizeof(struct thread_worker_s)); int incr=0; if(worker==NULL){ return -1; } worker->process = process; worker->arg = arg; worker->next = NULL; thread_pool_is_need_extend(pool); pthread_mutex_lock(&(p->queue_lock)); member = p->head; if(member!=NULL){ while(member->next!=NULL) member = member->next; member->next = worker; }else{ p->head = worker; } p->queue_size ++; pthread_mutex_unlock(&(p->queue_lock)); pthread_cond_signal(&(p->queue_ready)); return 1; } void thread_pool_wait(thread_pool_t *pool){ thread_info_t *thread; int i=0; for(i=0;i<pool->num;i++){ thread = (thread_info_t*)(pool->threads+i); thread->state = THREAD_STATE_EXIT; pthread_join(thread->id,NULL); } } void thread_pool_destory(thread_pool_t *pool){ thread_pool_t *p = pool; thread_worker_t *member = NULL; if(p->is_destroy) return ; p->is_destroy = true; pthread_cond_broadcast(&(p->queue_ready)); thread_pool_wait(pool); free(p->threads); p->threads = NULL; /*銷毀任務列表*/ while(p->head){ member = p->head; p->head = member->next; free(member); } /*銷毀線程列表*/ thread_info_t *tmp=NULL; while(p->threads){ tmp = p->threads; p->threads = tmp->next; free(tmp); } pthread_mutex_destroy(&(p->queue_lock)); pthread_cond_destroy(&(p->queue_ready)); return ; } /*通過線程id,找到對應的線程*/ thread_info_t *get_thread_by_id(thread_pool_t *pool,pthread_t id){ thread_info_t *thread=NULL; thread_info_t *p=pool->threads; while(p!=NULL){ if(p->id==id) return p; p = p->next; } return NULL; } /*每個線程入口函數(shù)*/ void *thread_excute_route(void *arg){ thread_worker_t *worker = NULL; thread_info_t *thread = NULL; thread_pool_t* p = (thread_pool_t*)arg; //printf("thread %lld create success\n",pthread_self()); while(1){ pthread_mutex_lock(&(p->queue_lock)); /*獲取當前線程的id*/ pthread_t pthread_id = pthread_self(); /*設置當前狀態(tài)*/ thread = get_thread_by_id(p,pthread_id); /*線程池被銷毀,并且沒有任務了*/ if(p->is_destroy==true && p->queue_size ==0){ pthread_mutex_unlock(&(p->queue_lock)); thread->state = THREAD_STATE_EXIT; p->knum ++; p->rnum --; pthread_exit(NULL); } if(thread){ thread->state = THREAD_STATE_TASK_WAITING; /*線程正在等待任務*/ } /*線程池沒有被銷毀,沒有任務到來就一直等待*/ while(p->queue_size==0 && !p->is_destroy){ pthread_cond_wait(&(p->queue_ready),&(p->queue_lock)); } p->queue_size--; worker = p->head; p->head = worker->next; pthread_mutex_unlock(&(p->queue_lock)); if(thread) thread->state = THREAD_STATE_TASK_PROCESSING; /*線程正在執(zhí)行任務*/ (*(worker->process))(worker->arg); if(thread) thread->state = THREAD_STATE_TASK_FINISHED; /*任務執(zhí)行完成*/ free(worker); worker = NULL; } } /*拓展線程*/ void *thread_pool_is_need_extend(void *arg){ thread_pool_t *p = (thread_pool_t *)arg; thread_pool_t *pool = p; /*判斷是否需要增加線程,最終目的 線程:任務=1:2*/ if(p->queue_size>100){ int incr =0; if(((float)p->rnum/p->queue_size) < THREAD_BUSY_PERCENT ){ incr = (p->queue_size*THREAD_BUSY_PERCENT) - p->rnum; /*計算需要增加線程個數(shù)*/ int i=0; thread_info_t *tmp=NULL; thread_pool_t *p = pool; pthread_mutex_lock(&pool->queue_lock); if(p->queue_size<100){ pthread_mutex_unlock(&pool->queue_lock); return ; } for(i=0;i<incr;i++){ /*創(chuàng)建線程*/ tmp= (struct thread_info_s*)malloc(sizeof(struct thread_info_s)); if(tmp==NULL){ continue; }else{ tmp->next = p->threads; p->threads = tmp; } p->num ++; p->rnum ++; pthread_create(&(tmp->id),NULL,thread_excute_route,p); tmp->state = THREAD_STATE_RUN; } pthread_mutex_unlock(&pool->queue_lock); } } //pthread_cond_signal(&pool->extend_ready); } pthread_cond_t sum_ready; /*恢復初始線程個數(shù)*/ void *thread_pool_is_need_recovery(void *arg){ thread_pool_t *pool = (thread_pool_t *)arg; int i=0; thread_info_t *tmp = NULL,*prev=NULL,*p1=NULL; /*如果沒有任務了,當前線程大于初始化的線程個數(shù)*/ while(1){ i=0; if(pool->queue_size==0 && pool->rnum > pool->init_num ){ sleep(5); /*5s秒內(nèi)還是這個狀態(tài)的話就,銷毀部分線程*/ if(pool->queue_size==0 && pool->rnum > pool->init_num ){ pthread_mutex_lock(&pool->queue_lock); tmp = pool->threads; while((pool->rnum != pool->init_num) && tmp){ /*找到空閑的線程*/ if(tmp->state != THREAD_STATE_TASK_PROCESSING){ i++; if(prev) prev->next = tmp->next; else pool->threads = tmp->next; pool->rnum --; /*正在運行的線程減一*/ pool->knum ++; /*銷毀的線程加一*/ kill(tmp->id,SIGKILL); /*銷毀線程*/ p1 = tmp; tmp = tmp->next; free(p1); continue; } prev = tmp; tmp = tmp->next; } pthread_mutex_unlock(&pool->queue_lock); printf("5s內(nèi)沒有新任務銷毀部分線程,銷毀了 %d 個線程\n",i); } } sleep(5); } } /*打印一些信息的*/ void *display_thread(void *arg){ thread_pool_t *p =(thread_pool_t *)arg; thread_info_t *thread = NULL; int i=0; while(1){ printf("threads %d,running %d,killed %d\n",p->num,p->rnum,p->knum); /*線程總數(shù),正在跑的,已銷毀的*/ thread = p->threads; while(thread){ printf("id=%ld,state=%s\n",thread->id,thread_state_map[thread->state]); thread = thread->next; } sleep(5); } }
希望本文所述對大家學習C語言程序設計有所幫助。
上一篇:探究C++中string類的實現(xiàn)原理以及擴展使用
欄 目:C語言
本文標題:C語言實現(xiàn)支持動態(tài)拓展和銷毀的線程池
本文地址:http://mengdiqiu.com.cn/a1/Cyuyan/2585.html
您可能感興趣的文章
- 04-02c語言函數(shù)調(diào)用后清空內(nèi)存 c語言調(diào)用函數(shù)刪除字符
- 04-02c語言的正則匹配函數(shù) c語言正則表達式函數(shù)庫
- 04-02func函數(shù)+在C語言 func函數(shù)在c語言中
- 04-02c語言中對數(shù)函數(shù)的表達式 c語言中對數(shù)怎么表達
- 04-02c語言用函數(shù)寫分段 用c語言表示分段函數(shù)
- 04-02c語言編寫函數(shù)冒泡排序 c語言冒泡排序法函數(shù)
- 04-02c語言沒有round函數(shù) round c語言
- 04-02c語言分段函數(shù)怎么求 用c語言求分段函數(shù)
- 04-02C語言中怎么打出三角函數(shù) c語言中怎么打出三角函數(shù)的值
- 04-02c語言調(diào)用函數(shù)求fibo C語言調(diào)用函數(shù)求階乘


閱讀排行
本欄相關
- 04-02c語言函數(shù)調(diào)用后清空內(nèi)存 c語言調(diào)用
- 04-02func函數(shù)+在C語言 func函數(shù)在c語言中
- 04-02c語言的正則匹配函數(shù) c語言正則表達
- 04-02c語言用函數(shù)寫分段 用c語言表示分段
- 04-02c語言中對數(shù)函數(shù)的表達式 c語言中對
- 04-02c語言編寫函數(shù)冒泡排序 c語言冒泡排
- 04-02c語言沒有round函數(shù) round c語言
- 04-02c語言分段函數(shù)怎么求 用c語言求分段
- 04-02C語言中怎么打出三角函數(shù) c語言中怎
- 04-02c語言調(diào)用函數(shù)求fibo C語言調(diào)用函數(shù)求
隨機閱讀
- 01-11ajax實現(xiàn)頁面的局部加載
- 01-10C#中split用法實例總結(jié)
- 01-10delphi制作wav文件的方法
- 08-05織夢dedecms什么時候用欄目交叉功能?
- 04-02jquery與jsp,用jquery
- 08-05dedecms(織夢)副欄目數(shù)量限制代碼修改
- 01-11Mac OSX 打開原生自帶讀寫NTFS功能(圖文
- 01-10SublimeText編譯C開發(fā)環(huán)境設置
- 01-10使用C語言求解撲克牌的順子及n個骰子
- 08-05DEDE織夢data目錄下的sessions文件夾有什