Linux 下C++线程池的简单实现(在老外代码上添加注释)。
作为一个C++菜鸟,研究半天这个代码的实现原理,发现好多语法不太熟悉,因此加了一大堆注释,仅供参考。该段代码主要通过继承workthread类来实现自己的线程代码,通过thread_pool类来管理线程池,线程池不能够实现动态改变线程数目,存在一定局限性。目前可能还有缺陷,毕竟C++来封装这个东西,资源释放什么的必须想清楚,比如vector存储了基类指针实现多态,那么如何释放对象仍需要考虑,后续我可能会更进一步修改完善该代码,下面贡献一下自己的劳动成果。
#include
#include
#include
#include
using namespace std;
/*
WorkerThread class
This class needs to be sobclassed by the user.
*/
class WorkerThread{
public:
? ? int id;
? ? unsigned virtual executeThis()
?{
? return 0;
?}
? ? WorkerThread(int id) : id(id) {}
? ? virtual ~WorkerThread(){}
};
/*
ThreadPool class manages all the ThreadPool related activities. This includes keeping track of idle threads and synchronizations between all threads.
*/
class ThreadPool{
public:
? ? ThreadPool();
? ? ThreadPool(int maxThreadsTemp);
? ? virtual ~ThreadPool();
?
?void destroyPool(int maxPollSecs);
? ? bool assignWork(WorkerThread *worker);
? ? bool fetchWork(WorkerThread **worker);
?void initializeThreads();
?
? ? static void *threadExecute(void *param); // pthread_create()调用的函数必须为静态的
? ? static pthread_mutex_t mutexSync;
? ? static pthread_mutex_t mutexWorkCompletion;//工作完成个数互斥量
?
? ?
private:
? ? int maxThreads;
? ?
? ? pthread_cond_t? condCrit;
? ? sem_t availableWork;
? ? sem_t availableThreads;
? ? vector workerQueue;
? ? int topIndex;
? ? int bottomIndex;
?int incompleteWork;
? ? int queueSize;
};
?
?
#include
#include "threadpool.h"
using namespace std;
//初始化类的静态成员必须加上类型和作用域,static数据成员必须在类定义体的外部定义,不像不同数据成员可以用构造函数初始化
//应该在定义时进行初始化,注意是定义,这个定义应该放在包含类的非内联成员函数定义的文件中。
//注:静态成员函数只能使用静态变量,非静态没有限制,静态变量必须在外部定义和初始化,没初始化就为默认数值
pthread_mutex_t ThreadPool::mutexSync = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t ThreadPool::mutexWorkCompletion = PTHREAD_MUTEX_INITIALIZER;
ThreadPool::ThreadPool()
{
?ThreadPool(2);
}
ThreadPool::ThreadPool(int maxThreads)
{
? if (maxThreads < 1)?
? ? ? maxThreads=1;
?
? pthread_mutex_lock(&mutexSync);
? this->maxThreads = maxThreads;
? this->queueSize = maxThreads;
? workerQueue.resize(maxThreads, NULL);//调整容器大小,然后用默认构造函数初始化新的空间
? topIndex = 0;
? bottomIndex = 0;
? incompleteWork = 0;
? sem_init(&availableWork, 0, 0); //工作队列信号量,表示已经加入队列的工作,初始时没有工作
? sem_init(&availableThreads, 0, queueSize);? //空闲线程信号量,一开始就有quisize个线程可以使用
? pthread_mutex_unlock(&mutexSync);
}
//调用pthread_create()让线程跑起来,threadExecute是类的静态函数,因为pthread_create()第三个参数必须为静态函数
void ThreadPool::initializeThreads()
{
? for(int i = 0; i?{
? pthread_t tempThread;
? pthread_create(&tempThread, NULL, ThreadPool::threadExecute, (void*)this );
? }
}
ThreadPool::~ThreadPool()
{
?//因为对于vector,clear并不真正释放内存(这是为优化效率所做的事),clear实际所做的是为vector中所保存的所有对象调用析构函数(如果有的话),
?//然后初始化size这些东西,让你觉得把所有的对象清除了。。。
? ? //真正释放内存是在vector的析构函数里进行的,所以一旦超出vector的作用域(如函数返回),首先它所保存的所有对象会被析构,
?//然后会调用allocator中的deallocate函数回收对象本身的内存。。。
? ? workerQueue.clear();
}
void ThreadPool::destroyPool(int maxPollSecs = 2)
{
?while(incompleteWork>0 )
?{
? ? //cout << "Work is still incomplete=" << incompleteWork << endl;
? sleep(maxPollSecs);
?}
?cout << "All Done!! Wow! That was a lot of work!" << endl;
?sem_destroy(&availableWork);
?sem_destroy(&availableThreads);
? ? pthread_mutex_destroy(&mutexSync);
? ? pthread_mutex_destroy(&mutexWorkCompletion);
}
//分配人物到top,然后通知有任务需要执行。
bool ThreadPool::assignWork(WorkerThread *workerThread)
{
? ? pthread_mutex_lock(&mutexWorkCompletion);
? ? incompleteWork++;
? //cout << "assignWork...incomapleteWork=" << incompleteWork << endl;
?pthread_mutex_unlock(&mutexWorkCompletion);
?sem_wait(&availableThreads);
?pth