0.关于

为缩短篇幅,本系列记录如下:

再谈多线程模型之生产者消费者(基础概念)(c++11实现)

再谈多线程模型之生产者消费者(单一生产者和单一消费者)(c++11实现)

再谈多线程模型之生产者消费者(单一生产者和多消费者)(c++11实现)

再谈多线程模型之生产者消费者(多生产者和单一消费者 )(c++11实现)

再谈多线程模型之生产者消费者(多生产者和多消费者 )(c++11实现)本文

再谈多线程模型之生产者消费者(总结)(c++11实现)

本文涉及到的代码演示环境: VS2017

欢迎留言指正

1. 多生产者&多消费者

  • 1.1 相对一对一一对多多对多则是一对一多对多的结合体。
  • 1.2 生产者有多个,且其相互之间存在竞争
  • 1.3 消费者有多个,其其相互之间存在竞争
  • 1.4 大家共用一个缓冲区,还要考虑生产者与消费者之间的T同步情况。
  • 1.5 结构体模型是这样的:
template<typename T>
struct repo_
{
// 用作互斥访问缓冲区
std::mutex _mtx_queue; // 缓冲区最大size
unsigned int _count_max_queue_10 = 10; // 缓冲区
std::queue<T> _queue; // 缓冲区没有满,通知生产者继续生产
std::condition_variable _cv_queue_not_full; // 缓冲区不为空,通知消费者继续消费
std::condition_variable _cv_queue_not_empty; // 用于生产者之间的竞争
std::mutex _mtx_pro;
// 计算当前已经生产了多少数据了
unsigned int _cnt_cur_pro = 0; // 用于消费者之间的竞争
std::mutex _mtx_con;
// 计算当前已经消费多少数据了
unsigned int _cnt_cur_con = 0; repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue)
, _cnt_cur_con(0) {
;
} repo_(const repo_&instance) = delete;
repo_& operator = (const repo_& instance) = delete;
repo_(const repo_&&instance) = delete;
repo_& operator = (const repo_&& instance) = delete; };

可见,相对单一消费者和单一生产者模型,多了下面的代码,用于解决竞争的问题。


// 用于生产者之间的竞争
std::mutex _mtx_pro;
// 计算当前已经生产了多少数据了
unsigned int _cnt_cur_pro = 0; // 用于消费者之间的竞争
std::mutex _mtx_con;
// 计算当前已经消费多少数据了
unsigned int _cnt_cur_con = 0;
  • 1.6 生产者线程
template< typename T >
void thread_pro(const int thread_index, const int count_max_produce, repo<T>* param_repo)
{
if (nullptr == param_repo || NULL == param_repo)
return; while (true)
{
bool is_running = true; {
// 用于生产者之间竞争
std::unique_lock<std::mutex> lock(param_repo->_mtx_pro); // 缓冲区没有满,继续生产
if (param_repo->_cnt_cur_pro < cnt_total_10)
{
thread_produce_item<T>(thread_index, *param_repo, param_repo->_cnt_cur_pro);
++param_repo->_cnt_cur_pro;
}
else
is_running = false;
} std::this_thread::sleep_for(std::chrono::microseconds(16));
if (!is_running)
break;
}
}
  • 1.7 消费者线程
template< typename T >
void thread_con(const int thread_index, repo<T>* param_repo)
{
if (nullptr == param_repo || NULL == param_repo)
return; while (true)
{
bool is_running = true;
{
std::unique_lock<std::mutex> lock(param_repo->_mtx_con);
// 还没消费到指定的数目,继续消费
if (param_repo->_cnt_cur_con < cnt_total_10)
{
thread_consume_item<T>(thread_index, *param_repo);
++param_repo->_cnt_cur_con;
}
else
is_running = false; } std::this_thread::sleep_for(std::chrono::microseconds(16)); // 结束线程
if ((!is_running))
break;
}
}

1.8 完整源码

#pragma once

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector> std::mutex _mtx;
std::condition_variable _cv_not_full;
std::condition_variable _cv_not_empty; const int max_queue_size_10 = 10; enum
{
// 总生产数目
cnt_total_10 = 10,
}; template<typename T>
struct repo_
{
// 用作互斥访问缓冲区
std::mutex _mtx_queue; // 缓冲区最大size
unsigned int _count_max_queue_10 = 10; // 缓冲区
std::queue<T> _queue; // 缓冲区没有满,通知生产者继续生产
std::condition_variable _cv_queue_not_full; // 缓冲区不为空,通知消费者继续消费
std::condition_variable _cv_queue_not_empty; // 用于生产者之间的竞争
std::mutex _mtx_pro;
// 计算当前已经生产了多少数据了
unsigned int _cnt_cur_pro = 0; // 用于消费者之间的竞争
std::mutex _mtx_con;
// 计算当前已经消费多少数据了
unsigned int _cnt_cur_con = 0; repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue)
, _cnt_cur_con(0) {
;
} repo_(const repo_&instance) = delete;
repo_& operator = (const repo_& instance) = delete;
repo_(const repo_&&instance) = delete;
repo_& operator = (const repo_&& instance) = delete; }; template <typename T>
using repo = repo_<T>; //---------------------------------------------------------------------------------------- // 生产者生产数据
template <typename T>
void thread_produce_item(const int &thread_index, repo<T>& param_repo, const T& repo_item)
{
std::unique_lock<std::mutex> lock(param_repo._mtx_queue); // 1. 生产者只要发现缓冲区没有满, 就继续生产
param_repo._cv_queue_not_full.wait(lock, [&] { return param_repo._queue.size() < param_repo._count_max_queue_10; }); // 2. 将生产好的商品放入缓冲区
param_repo._queue.push(repo_item); // log to console
std::cout << "生产者" << thread_index << "生产数据:" << repo_item << "\n"; // 3. 通知消费者可以消费了
//param_repo._cv_queue_not_empty.notify_one();
param_repo._cv_queue_not_empty.notify_one();
} //----------------------------------------------------------------------------------------
// 消费者消费数据 template <typename T>
T thread_consume_item(const int thread_index, repo<T>& param_repo)
{
std::unique_lock<std::mutex> lock(param_repo._mtx_queue); // 1. 消费者需要等待【缓冲区不为空】的信号
param_repo._cv_queue_not_empty.wait(lock, [&] {return !param_repo._queue.empty(); }); // 2. 拿出数据
T item;
item = param_repo._queue.front();
param_repo._queue.pop(); std::cout << "消费者" << thread_index << "从缓冲区中拿出一组数据:" << item << std::endl; // 3. 通知生产者,继续生产
param_repo._cv_queue_not_full.notify_one(); return item;
} //---------------------------------------------------------------------------------------- /**
* @ brief: 生产者线程
* @ thread_index - 线程标识,区分是哪一个线程
* @ count_max_produce - 最大生产次数
* @ param_repo - 缓冲区
* @ return - void */
template< typename T >
void thread_pro(const int thread_index, const int count_max_produce, repo<T>* param_repo)
{
if (nullptr == param_repo || NULL == param_repo)
return; while (true)
{
bool is_running = true; {
// 用于生产者之间竞争
std::unique_lock<std::mutex> lock(param_repo->_mtx_pro); // 缓冲区没有满,继续生产
if (param_repo->_cnt_cur_pro < cnt_total_10)
{
thread_produce_item<T>(thread_index, *param_repo, param_repo->_cnt_cur_pro);
++param_repo->_cnt_cur_pro;
}
else
is_running = false;
} std::this_thread::sleep_for(std::chrono::microseconds(16));
if (!is_running)
break;
}
} /**
* @ brief: 消费者线程
* @ thread_index - 线程标识,区分线程
* @ param_repo - 缓冲区
* @ return - void */
template< typename T >
void thread_con(const int thread_index, repo<T>* param_repo)
{
if (nullptr == param_repo || NULL == param_repo)
return; while (true)
{
bool is_running = true;
{
std::unique_lock<std::mutex> lock(param_repo->_mtx_con);
// 还没消费到指定的数目,继续消费
if (param_repo->_cnt_cur_con < cnt_total_10)
{
thread_consume_item<T>(thread_index, *param_repo);
++param_repo->_cnt_cur_con;
}
else
is_running = false; } std::this_thread::sleep_for(std::chrono::microseconds(16)); // 结束线程
if ((!is_running))
break;
}
} // 入口函数
//---------------------------------------------------------------------------------------- int main(int argc, char *argv[], char *env[])
{
// 缓冲区
repo<int> repository;
// 线程池
std::vector<std::thread> vec_thread; // 生产者
vec_thread.push_back(std::thread(thread_pro<int>, 1, cnt_total_10, &repository));
vec_thread.push_back(std::thread(thread_pro<int>, 2, cnt_total_10, &repository)); // 消费者
vec_thread.push_back(std::thread(thread_con<int>, 1, &repository));
vec_thread.push_back(std::thread(thread_con<int>, 2, &repository)); for (auto &item : vec_thread)
{
item.join();
} return 0;
}
  • 1.9 可能的结果

最新文章

  1. DFS序+线段树+bitset CF 620E New Year Tree(圣诞树)
  2. AIX 5L 系统管理技术 —— 存储管理——卷组
  3. 图解Tomcat类加载机制
  4. 【iCore3 双核心板_FPGA】实验二十四:Niosii——SDRAM读写实验
  5. hdu 1710 二叉树的遍历
  6. Spring学习笔记之初始化和销毁方法的调用次序
  7. sql server 2012 如何收缩事务日志
  8. 根据不同的分辨率选择不同的css文件
  9. iOS项目架构文档
  10. 通过数组初始化链表的两种方法:指向指针的引用node *&amp;tail和指向指针的指针(二维指针)node **tail
  11. Android(通用机能)
  12. @font-face(css3属性)实如今网页中嵌入随意字体
  13. Select下拉列表选择自动提交form表单数据
  14. 用Socket编写的聊天小程序
  15. java中new关键字和newInstance()方法的区别
  16. vue中的路由的跳转的参数
  17. 114. Flatten Binary Tree to Linked List 把二叉树变成链表
  18. Vue2 第一天学习
  19. HDU 2256 Problem of Precision(矩阵)
  20. delphi 动态绑定代码都某个控件

热门文章

  1. 【R】ggplot2的facet_warp/grid如何实现超过两类水平的分面?
  2. 66-Reorder List
  3. 硬盘SSD、HDD和SSHD都是什么意思?哪种类型硬盘最好?
  4. Notepad++—显示代码对齐是使用了制表符还是空格
  5. mysql—MySQL数据库中10位时间戳转换为标准时间后,如何对标准时间进行加减X天处理
  6. open 函数小结
  7. mongoDB整个文件夹拷贝备份还原的坑
  8. javaSE高级篇4 — 反射机制( 含类加载器 ) — 更新完毕
  9. Vue 前端配置多级目录实践(基于Nginx配置方式)
  10. 字符串属性转变List属性存入数据库