基于pthread封装了一个简易的ThreadPool,具有以下特性:

  1.具有优先级的任务队列

  2.线程池大小可以二次调整,增加线程或者删除空闲线程

  3.任务支持两种实现方式:重写run方法,或者使用函数回调

首先是任务类的声明

/**
 * Abstract unit of work.
 *
 * A concrete task derives from Task and implements run(). Every task carries
 * a name and an integer priority; TaskCom compares tasks by that priority.
 * Both data members are public because TaskCom and the worker routine read
 * them directly.
 */
class Task{
public:
    std::string taskName;   // name of the task, printed by the worker threads

public:
    /** Creates an unnamed task with priority 0. */
    Task():priority(0){}

    /** Creates a task called _taskName with priority 0. */
    Task(std::string _taskName):taskName(_taskName),priority(0){}

    /** Replaces the task's priority with pri. */
    void setPriority(int pri){
        priority=pri;
    }

    /** Returns the task's current priority. */
    int getPriority() const{
        return priority;
    }

    // Virtual so that a derived task can be destroyed through a Task*.
    virtual ~Task(){}

    /** The work to perform; must be implemented by every concrete task. */
    virtual void run()=0;

public:
    int priority;           // value compared by TaskCom
};
/**
 * Ordering predicate over Task pointers.
 *
 * Returns true when t1's priority is strictly lower than t2's. Used as the
 * comparator of a std::priority_queue, this places the task with the largest
 * priority value at the top of the queue.
 *
 * Both pointers must be non-null; they are dereferenced unconditionally.
 */
struct TaskCom{
    // const so the predicate can also be invoked through a const comparator.
    bool operator()(const Task* t1,const Task* t2) const{
        return t1->priority<t2->priority;
    }
};
class CbTask:public Task{//回调版本的task
public:
CbTask(string name,int pri):Task(name){
setPriority(pri);
}
void setCallBack(void *(*_process) (void *_arg), void *_arg){
process=_process;
arg=_arg;
}
void run(){
(*process) (arg);
}
private:
void*(*process)(void *arg);
void *arg;
};
class MyTask1:public Task{
public:
MyTask1(string name,int pri):Task(name){
setPriority(pri);
}
void run(){
printf("hello,this is MyTask1\n");
sleep();
}
};
// Usage example 1: a task whose behaviour comes from an overridden run().
// NOTE(review): the priority arguments are missing from the original snippet;
// the values 1 and 2 used below are illustrative placeholders.
MyTask1 *task1=new MyTask1("mytask1",1);

/**
 * Callback matching the signature expected by CbTask::setCallBack.
 * Interprets str as a NUL-terminated C string and prints it followed by a
 * newline. Always returns NULL.
 */
void *printStr(void * str){
    printf("%s\n",static_cast<const char*>(str));
    return NULL;
}

// Usage example 2: a task driven by a function callback.
CbTask *task6=new CbTask("mytask6",2);
// A writable array, so that it can be converted to void* without casting
// away constness of a string literal.
char str[]="你好";
task6->setCallBack(printStr,static_cast<void*>(str));

线程池声明

class ThreadPool{
private:
static priority_queue<Task*,vector<Task*>,TaskCom > taskList;//带优先级
static map<pthread_t,int> threads;
bool shutdown;
int maxThreadNum;
int ThreadNum; static pthread_mutex_t mutex;
static pthread_mutex_t map_mutex;
static pthread_cond_t cond; protected:
static void *threadRoutine(void *arg);
static void setThreadStat(pthread_t tid,int stat);
public:
void poolInit();
void poolDestroy();
void addThread();
void delThread(int n);
void addTask(Task *task);
int getTaskListSize();
int getPoolSize(); ThreadPool(int _threadNum);
~ThreadPool();
enum ThreadStat{
THREAD_RUN=,
THREAD_WAIT,
THREAD_SHUT
};

线程池实现

#include "ThreadPool.h"

priority_queue<Task*,vector<Task*>,TaskCom> ThreadPool::taskList;
map<pthread_t,int> ThreadPool::threads; pthread_mutex_t ThreadPool::mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ThreadPool::map_mutex=PTHREAD_MUTEX_INITIALIZER;
// Condition variable the worker threads wait on; statically initialised.
pthread_cond_t ThreadPool::cond=PTHREAD_COND_INITIALIZER;

/**
 * Creates a pool and immediately starts _threadNum worker threads.
 *
 * ThreadNum starts at 0 because addThread() increments it once per thread;
 * leaving it uninitialised would make getPoolSize() and the consistency
 * checks in delThread()/poolDestroy() meaningless.
 */
ThreadPool::ThreadPool(int _threadNum):shutdown(false),maxThreadNum(_threadNum),ThreadNum(0){
    poolInit();
}
// Destructor: all tear-down work is delegated to poolDestroy().
ThreadPool::~ThreadPool()
{
    this->poolDestroy();
}
/**
 * Body of every worker thread.
 *
 * Registers the calling thread in `threads` as THREAD_WAIT, then loops:
 * wait until a task is queued or the thread is marked THREAD_SHUT, exit on
 * THREAD_SHUT, otherwise pop the top task, run it outside the lock and go
 * back to THREAD_WAIT.
 *
 * @param arg unused.
 * @return NULL (not reached in practice: the thread leaves via pthread_exit).
 */
void *ThreadPool::threadRoutine(void *arg){
    (void)arg;
    pthread_t tid=pthread_self();

    // Register this thread. insert() keeps an already existing entry, and
    // the reference obtained here stays valid until the entry is erased.
    pthread_mutex_lock(&map_mutex);
    threads.insert(std::make_pair(tid,static_cast<int>(THREAD_WAIT)));
    int &threadStat=threads[tid];
    pthread_mutex_unlock(&map_mutex);

    while(1){
        pthread_mutex_lock(&mutex);
        // Sleep while there is nothing to do and nobody asked us to stop.
        // The loop also absorbs spurious wakeups.
        while(taskList.size()==0&&threadStat==THREAD_WAIT){
            pthread_cond_wait(&cond,&mutex);
        }
        if(threadStat==THREAD_SHUT){
            pthread_mutex_unlock(&mutex);
            printf("thread %lu will exit\n",tid);
            pthread_exit(NULL);
        }

        // Still THREAD_WAIT here, so the wait loop ended because the queue
        // is non-empty and top() is safe.
        Task *task=taskList.top();
        taskList.pop();
        threadStat=THREAD_RUN;
        printf("thread %lu is running with task--> %s*** %d\n",tid,task->taskName.c_str(),task->getPriority());
        pthread_mutex_unlock(&mutex);

        // Run the task without holding the lock so other workers can proceed.
        task->run();

        // Change the state under the same lock that protects the reads above.
        pthread_mutex_lock(&mutex);
        threadStat=THREAD_WAIT;
        pthread_mutex_unlock(&mutex);
        printf("thread %lu has done with task--> %s\n",tid,task->taskName.c_str());
    }
    return NULL;
}
/**
 * Sets the recorded state of thread tid to stat.
 *
 * Only an existing entry is updated. Using operator[] here would silently
 * insert a new entry for an unknown tid, which changes the structure of the
 * shared map without map_mutex being held.
 */
void ThreadPool::setThreadStat(pthread_t tid,int stat){
    std::map<pthread_t,int>::iterator pos=threads.find(tid);
    if(pos!=threads.end()){
        pos->second=stat;
    }
}
void ThreadPool::addThread(){
pthread_t tid;
pthread_create(&tid,NULL,threadRoutine,NULL);
ThreadNum++;
} void ThreadPool::delThread(int n){
int num=;
int size=getPoolSize();
if(n>size){
printf("pool size is less than you input\n");
return;
}
while(num<n){
for(map<pthread_t,int>::iterator ite=threads.begin();ite!=threads.end();){
if(ite->second==THREAD_WAIT){
setThreadStat(ite->first,THREAD_SHUT);
// printf("**thread %lu \n",ite->first);
pthread_cond_broadcast(&cond);
pthread_join(ite->first,NULL);
map<pthread_t,int>::iterator tmp=++ite;
pthread_mutex_lock(&map_mutex);
threads.erase(--ite);
ThreadNum--;
if(ThreadNum!=threads.size())
printf("thread num is wrong\n");
pthread_mutex_unlock(&map_mutex);
ite=tmp;
// printf("**thread %lu \n",ite->first);
// printf("**thread %d\n",threads.size());
num++;
if(num==n)
break;
}else{
++ite;
}
} }
}
/**
 * Starts the initial set of worker threads: one addThread() call per
 * requested thread. A non-positive maxThreadNum starts no threads.
 */
void ThreadPool::poolInit(){
    for(int i=0;i<maxThreadNum;i++){
        addThread();
    }
}
/**
 * Shuts down every worker thread recorded in `threads`, then destroys the
 * task mutex and the condition variable.
 *
 * Only idle (THREAD_WAIT) threads are stopped; the table is rescanned until
 * it is empty, so the call blocks until running tasks have finished. Tasks
 * still queued at that point are left in the queue.
 */
void ThreadPool::poolDestroy(){
    printf("thread pool begin to destory\n");
    while(threads.size()!=0){
        for(std::map<pthread_t,int>::iterator ite=threads.begin();ite!=threads.end();){
            // Inspect and change the state under `mutex`, the lock the worker
            // holds while testing its state before pthread_cond_wait;
            // otherwise the broadcast can be missed and pthread_join hangs.
            pthread_mutex_lock(&mutex);
            const bool idle=(ite->second==THREAD_WAIT);
            if(idle){
                ite->second=THREAD_SHUT;
                pthread_cond_broadcast(&cond);
            }
            pthread_mutex_unlock(&mutex);

            if(!idle){
                // Busy thread: move on and revisit it on the next pass
                // instead of polling the same entry without synchronisation.
                ++ite;
                continue;
            }

            // Join without holding any lock: the worker needs `mutex` to exit.
            pthread_join(ite->first,NULL);

            pthread_mutex_lock(&map_mutex);
            threads.erase(ite++);   // advance first, then erase the old node
            ThreadNum--;
            if(ThreadNum!=static_cast<int>(threads.size()))
                printf("thread num is wrong\n");
            pthread_mutex_unlock(&map_mutex);
        }
    }
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    printf("thread pool has destoryed\n");
}
/**
 * Queues task for execution and wakes one waiting worker.
 *
 * The queue is modified under `mutex`, the same lock the worker threads hold
 * while they inspect and pop the queue. A NULL task is ignored, because both
 * the queue comparator and the workers dereference the pointer.
 * The pool stores the pointer only; it does not copy the task.
 *
 * @param task task to run; must stay valid until a worker has finished it.
 */
void ThreadPool::addTask(Task *task){
    if(task==NULL){
        return;
    }
    pthread_mutex_lock(&mutex);
    taskList.push(task);
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
}
int ThreadPool::getTaskListSize(){
return taskList.size();
}
int ThreadPool::getPoolSize(){
return ThreadNum;
}

工程

https://github.com/tla001/ThreadPool

最新文章

  1. java 支付宝 第三方即时到账支付 接口
  2. git常见错误
  3. Python将Excel生成SHP
  4. 实现Base64加密解密
  5. 深入理解java虚拟机(6)---内存模型与线程 & Volatile
  6. 【Android测试】【第五节】LogCat——命令行
  7. jquery 下拉菜单
  8. 2016 - 1- 14 UI阶段学习补充 transform属性详解
  9. 一款点击图片进行无限循环的jquery手风琴特效
  10. 深入浅出ES6(十):集合
  11. WSB备份到远程共享文件夹的限制
  12. html中的一些标签学习
  13. wsdl文件结构分析
  14. IOS开发之XCode学习012:Slider和ProgressView
  15. Java 继承Thread类和实现Runnable接口的区别
  16. 编写脚本-SQL SERVER 用户权限分配
  17. 【高德地图Android SDK】视频教学
  18. 【Unity】11.6 恒定力 (Constant Force)
  19. Nginx 配置多站点vhost
  20. vs2010支持html5+css3

热门文章

  1. <head> 中的 JavaScript 函数
  2. ATK-DataPortal 设计框架(一)
  3. 为什么需要Vlan ? Vlan实现原理 ? 不同Vlan的通信 ?
  4. iOS之UIImagePickerController显示中文界面
  5. 初涉基环外向树dp&&bzoj1040: [ZJOI2008]骑士
  6. 用 jQuery 实现表单验证(转载)
  7. httpd的prefork、worker、event
  8. mysql 导出数据字典
  9. 五、RegExp(正则表达式)篇
  10. 微信小程序播放视频