线程池

ref: https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h

ref: https://www.jianshu.com/p/eec63026f8d0

思想:

维护一个任务队列,并启动n个线程(消费者)。

注意:

  1. 数据因当通过shared_ptr来管理,否则thread_pool析构后,其它线程访问将产生coredump

    main return 之后,tp被析构,main thread已经结束;但tp内detach的多线程尚未结束,此时detach的线程可能正在执行current(),current结束后,重新进入循环体,std::unique_lockstd::mutex lk(mtx); 但mtx此时已经被析构。

  2. 通过lambda传递任务的参数。

    [=], [&]传递的都是this指针的copy/reference,在thread_pool析构后, this指针将成为野指针。我们因当拷贝shared_ptr。

    [*this]会造成thread_pool的拷贝, 当我们将拷贝函数禁用后,编译错误。

    ref:https://en.cppreference.com/w/cpp/language/lambda

  3. 删除thread_pool的拷贝相关函数。

  4. detach()分离线程。另一做法:用vector保存线程,在~thread_pool()内,执行.join()等待线程break跳出for

  5. stop可设为atomic变量,当执行excute()时,检查stop变量。示例代码中thread_pool析构后,并标记了stop,且工作队列为空时,线程池执行完队列内的job再结束。

    不少人的实现即是将stop作为atomic变量。

// 成员变量:
bool stop
std::queue<std:function<void()>> Q
// 成员变量(互斥管理):
std::mutex mtx
std::condition_variable cv
// 成员函数:
thread_pool(int n = 1)
thread_pool(thread_pool&&) = default;
~thread_pool()
template<class F> void excute(F&& task)
点击查看错误代码示例

Code


struct thread_pool {
public:
thread_pool(int n = 1) {
pdata = std::make_shared();
for(int i = 0; i lk(pdata->mtx); // for( ; ; )结束后, lk释放, 又再度申请lk锁, 可将lk提到for外
if(!pdata->tasks.empty()) {
auto current = std::move(pdata->tasks.front());
pdata->tasks.pop();
printf("pop out from thread %d\n", i);
lk.unlock();
current();
lk.lock();
} else if(pdata->is_shutdown) {
break ;
} else {
pdata->cv.wait(lk); // for( ; ; )结束后, lk释放, 又再度申请lk锁, 可将lk提到for外
}
}
}
).detach();
}
~thread_pool() {
puts("~thread_pool");
{
std::lock_guard lk(pdata->mtx);
pdata->is_shutdown = true;
}
pdata->cv.notify_all();
}
thread_pool(thread_pool&) = delete; template

void execute(F&& task) {

{

std::lock_guardstd::mutex lk(pdata->mtx);

pdata->tasks.emplace(std::forward(task));

puts("put in");

}

pdata->cv.notify_one();

} private:

struct data {

std::mutex mtx;

std::condition_variable cv;

bool is_shutdown = false;

std::queue< std::function<void()> > tasks;

};

std::shared_ptr pdata;

};

正确代码

struct thread_pool {
public:
thread_pool(int n = 1) {
pdata = std::make_shared<data>();
for(int i = 0; i < n; i++)
std::thread(
[pdata = pdata, i] { // if need i
std::unique_lock<std::mutex> lk(pdata->mtx);
for( ; ; ) {
if(!pdata->tasks.empty()) {
auto current = std::move(pdata->tasks.front());
pdata->tasks.pop();
lk.unlock();
current();
lk.lock();
} else if(pdata->stop) {
break ;
} else {
pdata->cv.wait(lk);
}
}
}
).detach();
}
~thread_pool() {
{
std::lock_guard<std::mutex> lk(pdata->mtx);
pdata->stop = true;
}
pdata->cv.notify_all();
}
thread_pool(thread_pool&&) = default; template<typename F>
void execute(F&& task) {
{
std::lock_guard<std::mutex> lk(pdata->mtx);
pdata->tasks.emplace(std::forward<F>(task));
}
pdata->cv.notify_one();
} private:
struct data {
std::mutex mtx;
std::condition_variable cv;
bool stop = false;
std::queue< std::function<void()> > tasks;
};
std::shared_ptr<data> pdata;
};

mapreduce

每个任务对应一个协程

worker可重用,相当于线程池里的一个线程

ntasks为任务数量,即生产者消费者队列进入队列的任务数量

对于每一个任务,启动一个go协程,将该任务分发给某个worker。

如果成功执行,worker可重用,否则,抛弃该worker,采用新的worker。

func (mr *Master) forwardRegistrations(ch chan string) {
i := 0
for {
mr.Lock()
if i < len(mr.workers) {
w := mr.workers[i]
go func() { ch <- w }() // send without holding the lock.
i = i + 1
} else {
mr.newCond.Wait()
}
mr.Unlock()
}
} func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
} var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
doTaskArgs := DoTaskArgs{
JobName: jobName,
File: mapFiles[i],
Phase: phase,
TaskNumber: i,
NumOtherPhase: n_other}
wg.Add(1)
go func() {
for {
worker := <-registerChan
if call(worker, "Worker.DoTask", doTaskArgs, nil) {
wg.Done()
registerChan <- worker
break
}
}
}()
}
wg.Wait()
} func Distributed() {
ch := make(chan string)
go mr.forwardRegistrations(ch)
schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
}

最新文章

  1. jade模板引擎
  2. 一个网络传输框架——zeroMQ 调研笔记
  3. PHP与Golang如何通信?
  4. 可在广域网部署运行的QQ高仿版 -- GG叽叽V3.6,增加语音消息、语音留言等功能
  5. URAL 1031. Railway Tickets(spfa)
  6. codeforces B. Design Tutorial: Learn from Life
  7. LeetCode222 Count Complete Tree Nodes
  8. ORA-01034: ORACLE not available
  9. Oracle Database 12c Using duplicate standby database from active database Created Active DataGuard
  10. semver语义化版本号
  11. Android源码解析——LruCache
  12. spring-cloud-feign负载均衡组件
  13. [转帖]Sqlserver BCP 的用法
  14. java并发的处理方式
  15. Stochastic Gradient Descent收敛判断及收敛速度的控制
  16. linux:ls、ls -l、ls -al区别 示例
  17. 160728、Spark Streaming kafka 实现数据零丢失的几种方式
  18. ubuntu下源码方式安装php5.4
  19. selenium自动化测试浏览器驱动安装(属于转载文章)
  20. linux的screen常用命令使用记录

热门文章

  1. 开源图像识别库OpenCV基于Maven的开发环境准备
  2. [LOJ2290] [THUWC2017] 随机二分图
  3. NOI2017
  4. RAC环境备份,RMAN异机在单实例环境恢复redo问题
  5. 通过 Web Deploy 发布的配置
  6. django learn step
  7. 【sqoop】安装配置测试sqoop1
  8. MySQL Transaction--网络丢包导致长时间未提交事务
  9. pandas数据处理
  10. idea中添加web.xml配置文件与tomcat启动中遇到的web.xml文件找不到的问题