一个线程池的c++实现
2024-09-22 15:41:08
前面我们实现了CallBack类,实现了对任意可调用对象的封装,且统一了调用接口。
现在利用CallBack类,我们来实现一个线程池,我们的线程池包含:
1. 状态机, 用于控制和管理线程池的运行、停止
2. 任务队列, std::queue< std::unique_ptr< Base::Closure > > tasks; 维护了一个queue来保存、添加、删除需要执行的任务(用户定义回调函数,封装成CallBack统一接口)
3. 存放运行线程的容器, std::vector< std::thread > workers; 每次新建一个线程执行一个task,并将该线程放入vector里,便于对线程进行统一的管理
4. 同步锁和条件变量:
std::mutex worker_mutex; // 用于多个线程访问workers时的同步
std::mutex queue_mutex; // 用于多个线程访问tasks时的同步
std::condition_variable condition; // 用于添加任务线程与执行线程之间的协作,以及用于状态机改变时的协作等
#include <functional>
#include<mutex>
#include<vector>
#include<queue>
#include<condition_variable>
#include<thread>
#include<memory>
#include<future>
#include<iostream> namespace Base { class CallBack; typedef CallBack Closure; class CallBack {
public:
template<class F, class... Args>
CallBack(F&& f, Args&&... args) {
task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
} void Run() {
task();
} virtual ~CallBack() {} private:
std::function<void()> task;
}; template<class F, class... Args>
CallBack* NewCallBack(F&& f, Args&&... args) {
return new CallBack(f, args...);
} } // namespace Base class FixedThreadPool{
public:
enum State{
IDLE,
RUNNING,
STOP,
}; FixedThreadPool();
FixedThreadPool( size_t );
FixedThreadPool ( const FixedThreadPool& ) = delete ;
FixedThreadPool& operator=( const FixedThreadPool& ) = delete ; void SetPoolSize( size_t );
size_t Size() const; void AddTask( Base::Closure* task );
~FixedThreadPool(); void AwaitTermination();
void Start();
void Stop(); private:
void ThreadWorker(); std::vector< std::thread > workers; // 用于放置运行线程 std::queue< std::unique_ptr< Base::Closure > > tasks; // 任务队列 std::mutex worker_mutex;
std::mutex queue_mutex;
std::condition_variable condition; State state_;
unsigned int thread_size_;
}; FixedThreadPool::FixedThreadPool():
state_(IDLE),
thread_size_(4)
{
} FixedThreadPool::FixedThreadPool(size_t num_threads):
state_(IDLE),
thread_size_(num_threads)
{
} void FixedThreadPool::SetPoolSize(size_t size) {
thread_size_ = size;
} size_t FixedThreadPool::Size() const {
return thread_size_;
} // Destructor joins all threads
FixedThreadPool::~FixedThreadPool() {
for(std::thread &worker: workers) {
if (worker.joinable()) {
worker.join();
}
}
} // 根据线程池的size(也就是线程数),与任务数量,创建恰当数量的线程,分别从tasks队列里取出任务执行.
// 并把这些线程都放进workers容器里,方便统一管理
void FixedThreadPool::Start() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
state_ = RUNNING;
unsigned int num_working_threads_ =
thread_size_ < tasks.size()? thread_size_ : tasks.size();
for (unsigned int i = workers.size(); i < num_working_threads_; i++) {
workers.emplace_back(std::thread(&FixedThreadPool::ThreadWorker, this));
}
}
condition.notify_all();
} // Thread worker 从tasks队列里取一个任务(回调函数),执行
void FixedThreadPool::ThreadWorker() {
Base::Closure* task;
while (1) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock,
[this] { return state_ == STOP || !tasks.empty(); }); // state_==STOP && tasks==empty , state_!=STOP && tasks!=empty, state_==STOP && tasks != empty 继续往下执>行; state_ != STOP && tasks==empty 阻塞等待
if (state_ == STOP && tasks.empty()) {
return;
}
task = (tasks.front()).release();
tasks.pop();
}
task->Run();
}
} // Add new work item to the pool
// 如果当前工作线程数小于线程池的size, 就新建一个线程,从tasks队列取出一个任务并执行,并将该线程放入workers容器
// 然后将该任务task 放进tasks系列, 最后notify_one, 唤醒一个执行线程去执行任务
void FixedThreadPool::AddTask(Base::Closure* task) {
{
std::unique_lock<std::mutex> lock(worker_mutex);
if (state_ == RUNNING && workers.size() < thread_size_) {
workers.emplace_back(std::thread(&FixedThreadPool::ThreadWorker, this));
}
} {
std::unique_lock<std::mutex> lock(queue_mutex);
if (state_ == STOP) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace(std::unique_ptr<Base::Closure>(task));
}
condition.notify_one();
} // Blocks and wait for all previously submitted tasks to be completed.
void FixedThreadPool::AwaitTermination() {
condition.notify_all();
for(std::thread &worker: workers) {
if (worker.joinable()) {
worker.join();
}
}
} // Shut down the threadpool. This method does not wait for previously submitted
// tasks to be completed.
void FixedThreadPool::Stop() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
state_ = STOP;
}
condition.notify_all();
} } int func( int a ){
std::this_thread::sleep_for( std::chrono::seconds(100) );
printf(" a = %d\n", a);
return 0;
} void testThreadpool(){
FixedThreadPool pool(4);
pool.Start();
pool.AddTask(Base::NewCallBack(func, 3));
/*for(int i = 0; i < 10; ++i) {
pool.AddTask(new Base::CallBack([i] {
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i*i;
}));
}*/ pool.Stop();
pool.AwaitTermination();
std::cout << "All tasks complted." << std::endl; } int main(){
testThreadpool(); // 该测试用例先新建了一个能包含4个线程的线程池,首先往里面添加一个任务,该任务是由CallBack封装的普通函数,
return 0; // 先睡1秒,再打印参数3, 线程池分配一个线程来执行这个任务。然后用10次循环来添加任务,该任务是由CallBack封装的
// lamda表达式,依次打印hello 循环下标, world 循环下标。由于线程池最多只能开4个线程,再依次继续新建3个线程执行
} // 执行任务以后,剩下的任务只能先入任务队列并等待之前已经新建的线程来取任务执行。
运行结果:
[daq@centos build]$ ./hello-exe/cmake-good
hello hello 10
world 1
hello 3
world 3
hello 4
world 4
hello 5
world 5
hello 6
world 6
hello 7
world 7
hello 8
world 8
hello hello 9
world 9 world 0
2
world 2
a = 3
All tasks complted.
最新文章
- IBM Z上邮件服务器的配置相关内容
- Yii2.0 rules验证规则大全
- jquery日期格式化
- [Effective C++ --023]宁以non-member、non-friend替换member函数
- iOS NSDatePicker
- python连续爬取多个网页的图片分别保存到不同的文件夹
- css animation 动画的制作
- adodb.stream对象的方法/属性
- Linux Redis自动启动,Redis开机启动,Linux Redis设置开机启动
- C#控制台应用程序之旅游资源与线路管理系统
- 程序员如何避免996、icu?欢迎来讨论一下。
- C++ 使用命名规范
- jquery easyui datagrid mvc server端分页排序筛选的实现
- Android Studio 点击两次返回键,退出APP
- Python之路PythonNet,第二篇,网络2
- rtmp和http方式在播放flv方面的各自优势和劣势
- devexpress之barManager 使用
- 点击input 禁止手机自带键盘弹出
- OpenGl 知识一
- cmd命令提示符大全(干货)