//

//  main.cpp

//  test1

//

//  Created by sofard on 2018/12/27.

//  Copyright © 2018年 dapshen. All rights reserved.

//

#include <iostream>

#include <thread>

#include <future> //异步网络请求类似

#include <utility>

#include <vector>

#include <numeric>

#include <mutex>

#include <condition_variable>

#include <chrono>

#include <list>

#include <algorithm>

#include <deque>

#include <stack>

#include <exception>

#include <memory> // shared_ptr<>

#include <map>

#include <boost/thread/shared_mutex.hpp>

#include <queue>

#include <chrono>

#include <atomic>

void hellow()

{

std::cout << "hellow,concurrent!" << std::endl;

}

int main(int argc, const char * argv[]) {

// insert code here...

std::thread th1{hellow};

th1.join();

//std::cout << "Hello, World!\n";

return 0;

}

/////////////////////

void do_something(int i=8)

{

}

void do_something_else()

{

}

class background_task

{

public:

void operator()() const

{

do_something();

do_something_else();

}

};

background_task back_init1;

std::thread my_thread2(background_task);

std::thread my_thread3(background_task()); //函数声明

std::thread my_thread4 {background_task()}; //用大括号初始化还是挺好的

std::thread my_thread5([]{do_something();do_something_else();

}); //使用lamada表达式也可以

//detach()分离

//join()等待线程完成,把启动线程全部开启

//join,需要细心挑选运行的位置

////////////

void do_something()

{

}

struct my_func{

int& i;

my_func(int& _i):i(_i){};

void operator()()

{

for(unsigned j =0 ;j< 100000; ++j)

do_something(i);

}

};

////RAII 资源包裹类解决join之前抛出异常

class thread_guard

{

std::thread& t;

public:

explicit thread_guard(std::thread& _t):t(_t)

{}

~thread_guard()

{

if(t.joinable())

{

t.join();

}

}

thread_guard(thread_guard const&) = delete;

thread_guard& operator=(const thread_guard &) = delete;

};

void do_something_in_main()

{

}

void f()

{

int some_local_state =0;

my_func new_fun(some_local_state);

std::thread th1{new_fun};

thread_guard self_raii(th1);

do_something_in_main(); //若抛出异常

//self_raii 一定会析构,一定会join启动

}

///deatch()实现多文档编辑功能

class user_command

{

public:

int type;

};

const int open_new_document =1;

user_command get_user_input()

{

return user_command();

}

std::string ss1="123";

std::string& get_filename()

{

return ss1;

}

void process_user_input(const user_command&){}

void edit_document(const std::string& file_name )

{

//初始化ui

bool flag_end =false;

while(flag_end)

{

user_command new_cmd = get_user_input();

if(new_cmd.type  == open_new_document )

{

const std::string file_n = get_filename();

std::thread th8{edit_document,file_n};

th8.detach();

}

else

{

process_user_input(new_cmd);

}

}

}

/////////线程传参数

class widget_data

{

};

void process_widget_data(widget_data&){}

void my_func90(int i,const widget_data& s);

void oops(int some_param)

{

widget_data data;

std::thread th9(my_func90,3,data);

//简单参数拷贝复制,存在线程中修改的是临时数据的情况

std::thread th10(my_func90,3,std::ref(data));

//使用ref提升为引用

th9.join();

process_widget_data(data);

}

//第二种高级

class X

{

public:

void do_lengthy_work();

};

X my_x;

//另外一种传递函数,再传递拥有的对象的方法,第三个参数开始是函数参数

//参数可以移动,不能拷贝  (unique_ptr<> 移动构造函数,移动拷贝函数等)

//使用std::move()

/*

std::thread() 和 std::unique_ptr<>一样 每个实例占有一部分资源所有权,资源不会被复制,只能转移

*/

std::thread th11(&X::do_lengthy_work,&my_x);

////////转移线程所有权

void some_function()

{}

void some_function_else()

{}

void fuck_10_test()

{

std::thread th12{some_function};

std::thread th13=std::move(th12);

th12 =std::thread(some_function_else);//

std::thread th14;

th14=std::move(th13);

//此时th12有资源,th14也有资源

th12=std::move(th14); //th12会调用terminate(),析构掉,程序异常

}

/////////量产固定线程代码

void do_work(unsigned id){};

void init_more_thread()

{

std::vector<std::thread> threads;

for(unsigned i=0;i<20;++i)

threads.push_back(std::thread(do_work,i));

std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));

}

//////升级1:运行时决定线程的数量

int number_thread = std::thread::hardware_concurrency();//获取参考

//自己封装下std::accumulate 单线程求和

template<typename Iterator,typename T>

struct accumulate_block

{

void operator()(Iterator first,Iterator last,T& result)

{

result =std::accumulate(first,last,result);

}

};

void test11()

{

accumulate_block<std::vector<int>::iterator, int> add_t;

std::vector<int> jiashu;

int jie;

add_t(jiashu.begin(), jiashu.end(), jie);

std::accumulate(jiashu.begin(), jiashu.end(),jie);

}

//多线程求和,不带异常处理

template<typename Iterator,typename T>

T parallel_accumulate(Iterator first,Iterator last,T init)

{

unsigned long const length =std::distance(first, last);

if(!length)

return init;

//每个线程最少25个数据

unsigned long const min_per_thread = 25;

unsigned long const max_thread =(length + min_per_thread -1)/min_per_thread;

//获取硬件参考线程数量

unsigned long const hardware_thread = std::thread::hardware_concurrency();

unsigned long const num_thread =

std::min(hardware_thread != 0 ? hardware_thread : 2,max_thread);

//每个线程分配的数量

unsigned long const block_size = length/num_thread;

std::vector<T> results(number_thread);

std::vector<std::thread> threads(number_thread);

Iterator block_start =first;

for(unsigned long i=0;i<number_thread-1;++i)

{

Iterator block_end =block_start;

std::advance(block_end, block_size);

//每次都传参到线程中,能否有其他方式返回其运算结果??

threads[i]=std::thread

(accumulate_block<Iterator,T>(),block_start,block_end,std::ref(results[i]));

block_start=block_end;

}

accumulate_block<Iterator,T>()(block_start,last,std::ref(results[number_thread-1]));

//等待累加完成

std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));

return std::accumulate(results.begin(), results.end(), init);

};

void test_para()

{

std::vector<int> jiashu;

int jie;

//函数模板会自动推断参数类型,也可以给定参数类型

jie=parallel_accumulate<>(jiashu.begin(),jiashu.end(),0);

}

/////识别线程

//标识类型为std::thread::id ,可以比较复制,做很多事情

//////////第三章 线程间共享数据

//race condition,操作非原子

/*

1.保护机制,只有修改的线程能看到中间状态 c++标准库机制

2.修改数据结构和不变量设计,保证结构完成不可分割的变化,lock-free programming

3.软件事物编程(STM),类似于数据库,提交,回滚等

*/

//3.2 使用互斥量保护共享数据,这里使用的是RAII互斥量

class test_lock

{

private:

std::list<int> some_list;

std::mutex some_mutex;

public:

void add_to_list(int new_value)

{

std::lock_guard<std::mutex> guard(some_mutex);

some_list.push_back(new_value);

}

bool list_contains(int value_to_find){

std::lock_guard<std::mutex> guard(some_mutex);

return std::find(some_list.begin(), some_list.end(), value_to_find) == some_list.end();

}

//接口设计需要谨慎,不能留  成员的指针或者引用

};

//3.22 精心组织代码保护共享数据

//切勿将保护数据的指针或引用传递到互斥锁作用域之外,亦或是参数形式传递到用户提供的函数

class some_data{

int a;

std::string b;

public:

void do_something(){};

};

//对data类,通过类似于RAII的包裹,同时实现对data类所有操作的并发控制 (本意如此,可惜可以通过指针盗取data数据,绕过包裹)

class data_wrapper{

some_data data;

std::mutex m;

public:

template<typename Function>

void process_data(Function func){

std::lock_guard<std::mutex> l(m);

func(data); //这尼玛什么鬼语法

}

};

some_data * unprotected;

void malicious_function(some_data &th){

unprotected=&th;

}

data_wrapper x;

void foo100(){

//给每一个函数调用加锁

x.process_data(malicious_function);

unprotected->do_something();

}

//3.2.3 发现接口内在的条件竞争

/*

考虑一个stack ,empty(),top(),pop()接口之间存在竞争,如何解决?

构造函数分配空间时抛出bad_alloc异常,如何解决?给类自定义该异常处理函数,返回再分配..

*/

template <typename T,typename Container = std::deque<T> >

class my_stack

{

public:

//使用deque<T>来初始化stack<T>

explicit my_stack(const Container& ){};

//转移构造函数

explicit my_stack(Container && = Container());

//模板构造函数

template <class Alloc>

explicit my_stack(const Alloc&);

template <class Alloc>

my_stack(const Container&,const Alloc&);

template <class Alloc>

my_stack(const Container&& ,const Alloc&);

template <class Alloc>

my_stack(my_stack&&,const Alloc&);

//const 成员函数

bool empty() const;

size_t size() const;

//const成员函数 与 非const重载

T& top();

const T& top() const;

//右值引用,传参数时,转移所有权

void push(const T&);

void push(T&&);

void pop();

void swap(my_stack&&);

};

//empty,与size的结果不可靠,调用后,数据可能被改掉

void visit_stack()

{

std::stack<int> s;

if(!s.empty()){

int const value =s.top();

// top() 与empty存在竞争

s.pop();

do_something(value);

}

}

//假设现在将stack里面的任务分摊给多个线程,那么top()可能获取到一样 top和pop存在竞争

//解决办法:

//若将top和pop合并,返回栈顶元素,但是当拷贝大数据时,可能抛出异常(bad_alloc),栈顶元素会丢失..

/*

新问题解决办法:

1.可以先分配大数据,再传入引用,移动拷贝

使用std::is_nothrow_copy_constructible<<#class _Tp#>>

std::is_nothrow_move_constructible<<#class _Tp#>>等编译选项

2.返回指向弹出值的指针(使用共享指针)

内部类型 int float 的内存管理开销相对比较大,可能不划算

3.组合使用前面方法

*/

//线程安全的stack

struct empty_stack:std::exception

{

const char* what() const throw();

};

template <typename T>

class threadsafe_stack {

std::stack<T> data;

mutable std::mutex m;

public:

threadsafe_stack(){};

threadsafe_stack(const threadsafe_stack& other){

std::lock_guard<std::mutex> lock(other.m);

data=other.data;

};

threadsafe_stack& operator=(const threadsafe_stack&) = delete;

void push(T new_value){

std::lock_guard<std::mutex> lock(m);

data.push(new_value);

};

std::shared_ptr<T> pop()

{

std::lock_guard<std::mutex> lock(m);

if(!data.empty()) throw empty_stack(); //检查栈是否为空

std::shared_ptr<T> const res(std::make_shared<T>(data.top()));

data.pop();

return res;

};

void pop(T& value)

{

std::lock_guard<std::mutex> lock(m);

if(!data.empty()) throw empty_stack(); //检查栈是否为空

value=data.top();

data.pop();

};

bool empty() const{

std::lock_guard<std::mutex> lock(m);

return data.empty();

};

};

//3.2.4死锁

/*

避免死锁:A,B 2把锁, 按照固定的顺序上锁,解锁

*/

//交换操作中使用 std::lock() 和std::lock_guard()

class some_big_object

{};

void swap(some_big_object& lhs,some_big_object & rhs)

;

class X_2

{

some_big_object some_detail;

std::mutex m;

public:

X_2(some_big_object const& sd):some_detail(sd){};

friend void swap(X_2 lhs,X_2& rhs)

{

if(&lhs==&rhs)

return ;

std::lock(lhs.m,rhs.m);//可能抛出异常,抛出异常会自动解锁

//std::adopt_lock()表示对象已经上锁,不用上新锁

std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);

std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);

/* 使用unique_lock时的代码

std::unique_lock<std::mutex>    la(lhs.m,std::defer_lock);

std::unique_lock<std::mutex>    lb(rhs.m,std::defer_lock);

std::lock(la,lb);

*/

swap(lhs.some_detail,rhs.some_detail);

}

};

//3.2.5避免死锁的进阶指导

/*

建议1:获取到一个锁后,尽量不去请求其他的锁

若要请求多个锁,使用std::lock(....)一次性获取

建议2:

避免在持有锁时,调用用户提供的代码(其他人写得代码)

使用固定顺序获取锁

定义遍历的顺序

对应用分层,解耦

*/

//层次避免死锁

//定义层次锁

class hierarchical_mutex

{

std::mutex internal_mutex;

unsigned long const hierarchy_value;

unsigned long previous_hierarchy_value;

static thread_local unsigned long this_thread_hierarchy_value;

void check_for_hierarchy_violation()

{

if(this_thread_hierarchy_value <= hierarchy_value)

{

throw std::logic_error("mutex hierarchy violated");

}

}

void update_hierarchy_value()

{

previous_hierarchy_value = this_thread_hierarchy_value;

this_thread_hierarchy_value = hierarchy_value;

}

public:

explicit hierarchical_mutex(unsigned long value):

hierarchy_value(value),previous_hierarchy_value(0)

{

}

void lock()

{

check_for_hierarchy_violation();

internal_mutex.lock();

update_hierarchy_value();

}

void unlock()

{

this_thread_hierarchy_value = previous_hierarchy_value ;

internal_mutex.unlock();

}

bool try_lock()

{

check_for_hierarchy_violation();

if(!internal_mutex.try_lock())

return false;

update_hierarchy_value();

return true;

}

};

thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

//每一个实例锁都有一个自己当前的层次,每个线程维护一个static层次 表示当前线程所在的层次

//应用分层,定义出所有每层可能上锁的互斥量,给予层级初值

//上锁:当前线程层次(跑到最底层的线程) < 当前上锁的层次 ,终止操作

hierarchical_mutex high_level_mutex(10000);

hierarchical_mutex low_level_mutex(5000);

//低层次的事情

int do_low_level_stuff();

int low_level_func(){

std::lock_guard<hierarchical_mutex> lk(low_level_mutex);

return do_low_level_stuff();

}

//高层次的事情

void high_level_stuff(int some_param);

void high_level_func(){

std::lock_guard<hierarchical_mutex> lk(high_level_mutex);

high_level_stuff(low_level_func());

}

//执行顺序由高到底,没问题

void thread_a(){

high_level_func();

}

hierarchical_mutex other_mutex(100);

void do_other_stuff();

void other_stuff(){

high_level_func();

do_other_stuff();

}

//先锁住100

//再视图访问10000层级时,10000层级可能被其他线程锁住,形成死锁

/*解决办法

自定义层次锁:当满足这种情况时,抛出异常,析构释放变量的时候,会释放100层次的锁

*/

void thread_b()

{

std::lock_guard<hierarchical_mutex> lk(other_mutex);

other_stuff();

}

///3.2.6 std::unique_lock---- 灵活的锁

//3.2.7不同域传递互斥量

/*

std::unique_lock<> 没有与自身相关的互斥量

*/

//函数get_lock()锁住了互斥量,然后准备数据,返回锁,将对数据的锁,转移给处理函数

void prepare_data();

std::unique_lock<std::mutex> get_lock()

{

extern std::mutex some_mutex;

std::unique_lock<std::mutex> lk(some_mutex);

prepare_data();

return lk; //lk被释放,但是先将所有权转移出去

}

void process_data()

{

std::unique_lock<std::mutex> lk(get_lock());

do_something(1);

//lk.unlock() 可以选择性的释放,lk在销毁前会自动释放

}

//3.2.8 锁的粒度 通过std::unique_lock控制

std::mutex the_mutex;

void get_and_process_data()

{

std::unique_lock<std::mutex> my_lock(the_mutex);

//准备数据操作,data1

my_lock.unlock();

//操作局部数据data1

//再次锁住

my_lock.lock();

//写数据操作

}//自动析构释放锁

//在比较操作符中一次锁定一个互斥量

class Y {

int some_detail;

mutable std::mutex m;

int get_detail() const

{

std::lock_guard<std::mutex> lock_a(m);

return some_detail;

}

public:

Y(int sd):some_detail(sd){};

friend bool operator==(const Y& lhs,const Y& rhs)

{

if(&lhs==&rhs)

return true;

int const lhs_value = lhs.get_detail();

int const rhs_value = rhs.get_detail();

return lhs_value == rhs_value;

}

};

///单例模式 std::once_flag std::call_once配合使用

class some_resource{

public:

void do_something();

};

std::shared_ptr<some_resource> resource_ptr;

std::once_flag resource_flag;

void init_resource()

{

resource_ptr.reset(new some_resource);

}

void foo()

{

//多线程运行时,该语句只会执行一次

std::call_once(resource_flag, init_resource);

resource_ptr->do_something();

}

//C++11 最安全的单例是 定义为函数static变量

////读写锁

//对于一个dns缓存,写得时候不能读,读得时候可以读

class dns_entry

{

public:

std::string eny;

};

class dns_cache

{

typedef std::map<std::string,dns_entry> hash_entry;

hash_entry entries;

mutable boost::shared_mutex entry_mutex;

public:

//使用读锁

dns_entry find_entry(std::string const & domain)

{

boost::shared_lock<boost::shared_mutex> lo(entry_mutex);

const hash_entry::const_iterator it = entries.find(domain);

return (it == entries.end()) ? dns_entry():it->second;

}

//使用写锁添加数据

void update_or_add_entry(const std::string& key,const dns_entry& value)

{

std::lock_guard<boost::shared_mutex> lo(entry_mutex);

entries[key]=value;

}

};

//嵌套锁即可重入锁,类多个成员函数相互调用,且每个函数都加锁的时候

//大佬们都不推荐使用可重入锁

void test_ul()

{

std::recursive_mutex mmm;

std::lock_guard<std::recursive_mutex> lk(mmm);

std::unique_lock<std::recursive_mutex> uk(mmm);

}

/////第四章 同步并发操作

//4.1 等待一个事件或其他条件

//条件变量:1个或多个线程等待条件的达成,条件变量与多个时间或者条件相关

//广播条件达成消息

void test_con(){

std::condition_variable cd1;

std::mutex mx1;

std::condition_variable_any cd_any;

//可以与任何满足mutex的变量配合使用

}

//使用条件变量做唤醒的代码

class data_chunk{

public:

std::string st;

void process_data_chunk()

{};

};

std::mutex mut;

std::queue<data_chunk> data_queue;

std::condition_variable data_cond;

bool more_data_to_prepare()

{

return true;

}

bool is_last_data_chunk(data_chunk& )

{

return false;

}

data_chunk prepare_data_chunk(){

return data_chunk();

};

void data_preparetion_thread()

{

while(more_data_to_prepare())

{

data_chunk const data =  prepare_data_chunk();

std::lock_guard<std::mutex> lk(mut);

data_queue.push(data);

data_cond.notify_one();

}

}

void data_processing_thread()

{

while(true)

{

std::unique_lock<std::mutex> lk(mut);

//条件变量,以某个条件函数检测 某互斥量,条件不满足 加入休眠队列,同时释放锁

//条件满足,继续执行

data_cond.wait(lk,[]{return !data_queue.empty();});

data_chunk data= data_queue.front();

data_queue.pop();

lk.unlock();  //拿到数据立马放锁

data.process_data_chunk() ; // 处理数据

if(is_last_data_chunk(data))

break;

}

}

//4.1.2使用条件变量构建线程安全队列

//stl中的队列

template<class T,class Container = std::deque<T>>

class my_queue{

public:

explicit my_queue(Container const & );

explicit my_queue(Container && = Container());

template<class Alloc>

explicit my_queue(const Alloc&);

template<class Alloc>

my_queue(const Container&,const Alloc&);

template<class Alloc>

my_queue(Container&,const Alloc&);

template<class Alloc>

my_queue(my_queue&&,const Alloc&);

void swap(my_queue&);

//查询操作

bool empty() const;

size_t size() const;

const T& front() const;

T& back()  ;

T& front() ;

const T& back() const;

//修改操作

void push(const T&);

void push(T&& );

void pop();

template<class ...Args>

void emplace(Args&&... args);

};

//线程安全的队列

template<typename T>

class threadsafe_queue

{

private:

std::mutex mut;

std::queue<T> local_queue;

std::condition_variable data_cond;

public:

threadsafe_queue();

threadsafe_queue(const threadsafe_queue&);

threadsafe_queue& operator=(const threadsafe_queue&) =delete;

void push(T& new_value)

{

std::lock_guard<std::mutex> lk(mut);

local_queue.push(new_value);

data_cond.notify_one();

};

//出队列,使用非条件变量出队列

bool try_pop(T& value);

std::shared_ptr<T> try_pop()

{

std::lock_guard<std::mutex> lk(mut);

if(local_queue.empty())

return std::shared_ptr<T>();

std::shared_ptr<T> res(std::make_shared<T>(local_queue.fornt()));

return res;

};

void wait_and_pop(T&value)

{

std::unique_lock<std::mutex> lk(mut);

data_cond.wait(lk,[this]{return !local_queue.empty();});

value=local_queue.fornt();

local_queue.pop();

};

//返回只能指针版本,资源消耗稍微大一些

std::shared_ptr<T> wait_and_pop()

{

std::unique_lock<std::mutex> lk(mut);

data_cond.wait(lk,[this]{return !local_queue.empty();});

//此处构造函数可能会抛出异常,将local_queue 结构改成 std::queue<shared_ptr<T>> 可以解决

std::shared_ptr<T> res(std::make_shared<T>(std::move(local_queue.front())));

local_queue.pop();

return res;

};

bool empty() const

{

std::lock_guard<std::mutex> lk(mut);

return local_queue.empty();

};

};

///使用期望等待,一次性事件 一次性,一次性,一次性

//带返回值的后台任务,比如网络请求

int find_the_answer_to_ltuae();

void do_other_stuff();

void mai_mm()

{

std::future<int> the_answer = std::async(find_the_answer_to_ltuae);

do_other_stuff();

std::cout<<"the answer is" << the_answer.get() <<std::endl;

}

struct X_i

{

void foo(int,std::string const&);

std::string bar(std::string const&);

};

X_i xi;

//通过地址+对象方式,启动异步带返回值任务

auto fi =std::async(&X_i::foo,&xi, 42,"hellow");

//仿函数

struct Y_i

{

double operator()(double);

};

Y_i yi;

auto f3 = std::async(Y_i(), 3.141);//隐含使用了移动构造函数

auto f4 = std::async(std::ref(yi), 2.718);

X_i baz(X_i&);

void qita()

{

auto f5=std::async(baz, std::ref(xi));

//async有额外的2个参数可供选择

//std::launch::async表明独立线程,

//std::launch::deferred表明线程在wait()或者get()后再运行

std::future<X_i> f6 =std::async(std::launch::async | std::launch::deferred,baz,std::ref(xi));

std::future<X_i> f7 =std::async(std::launch::deferred,baz,std::ref(xi));

f6.wait();

}

//4.2.2任务与期望

//对任务细节抽象

//当一个粒度较大的操作可以分解为独立的子任务时,每个子任务可以包含在一个

//std::packaged_task<函数签名>中,之后这个实例将传递到任务调度器或者线程池中

//std::packaged_task<>的特化-----局部类定义,任务带2个参数,返回string

template<>

class std::packaged_task<std::string(std::vector<char>* ,int)>

{

public:

template <typename Callable>

explicit packaged_task(Callable&& f);

std::future<std::string> get_future();

void operator()(std::vector<char>*,int); //()任务执行

};

//使用 std::packaged_tast 执行一个图形界面线程

std::mutex m_image;

std::deque<std::packaged_task<void()>> tasks;

bool gui_shutdown_message_received();

void get_and_process_gui_message();

void gui_thread()

{

while(!gui_shutdown_message_received())

{

get_and_process_gui_message();

std::packaged_task<void()> task;

{

std::lock_guard<std::mutex> lk(m_image);

if(tasks.empty())

continue;

task=std::move(tasks.front());

tasks.pop_front();

}

task(); //任务执行,执行完成后,期望 会被设置为就绪,可以取出一次

}

}

std::thread gui_bg_thread(gui_thread);

//将任务打包后,传给队列线程,返回期望的指针

template<typename Func>

std::future<void> post_task_for_gui_thread(Func f)

{

std::packaged_task<void()> task(f); //打包任务

std::future<void> res = task.get_future();

std::lock_guard<std::mutex> lk(m_image);

tasks.push_back(std::move(task));

return res;

}

//4.2.3使用std::promises

//上章中 包裹任务(包裹了整个函数,结果自动生成)--期望结果

//考虑一个线程处理多个连接事件,来自不同端口连接的数据包乱序方式进行处理

//数据包也将以乱序的方式进入队列

//一对std::promise/std::future 单线程处理多接口的实现

//promise只是包裹了返回值类型,需要自己set_value,或者set_exception设置结果

class data_packet

{

public:

int id;

};

class payload_type

{};

class connect_net{

private:

bool incoming;

bool outgoing;

std::string con_ip;

public:

connect_net(connect_net&&);

connect_net(std::string & s):con_ip(s),incoming(false),outgoing(false){};

void get_connect()

{

if(check_if_real_con())

incoming=true;

}

bool check_if_real_con()

{

return false;

}

void close_connect()

{

outgoing=false;

}

//

bool has_incoming_data()

{

return incoming;

};

bool has_outgoing_data()

{

return outgoing;

};

data_packet incoming_data()

{

return data_packet();

};

std::promise<payload_type> get_promise(int)

{

return std::promise<payload_type>();

};

};

class data_packet;

typedef std::vector<connect_net> connection_set;

bool done(connection_set&);

void process_connection(connection_set& connections)

{

while(!done(connections))

{

for(connection_set::iterator connection = connections.begin(),end =connections.end();

connection != end;

++connection)

{

if(connection->has_incoming_data())

{

data_packet data = connection->incoming_data();

auto p =connection->get_promise(data.id);

}

if(connection->has_outgoing_data())

{

}

}

}

};

//包裹任务会自动处理异常,future能存储返回值或者异常(掩盖住,调用的时候不抛出)

//为promise期望存储异常

struct _data

{

int32_t value;

};

_data data = { 0 };

int mainfuc()

{

std::promise<_data> data_promise;      //创建一个承诺

std::future<_data> data_future = data_promise.get_future();     //得到这个承诺封装好的期望

std::thread prepare_data_thread([](std::promise<_data> &data_promise){

std::this_thread::sleep_for(std::chrono::seconds(2));    //模拟生产过程

try {

data_promise.set_value({ 1 });       //通过set_value()反馈结果

} catch (...) {

data_promise.set_exception(std::current_exception() );

//如果没法设置结果,那么需要手动设置异常

}

}, std::ref(data_promise));

std::thread process_data_thread([](std::future<_data> &data_future){

std::cout << data_future.get().value << std::endl;    //通过get()获取结果

}, std::ref(data_future));

prepare_data_thread.join();

process_data_thread.join();

system("pause");

return 0;

}

//4.2.5多个线程的等待

//std::future 不是线程安全的,多个线程访问,只有第一个有结果,后面的可能会抛异常..其是移动的

std::shared_future<int> sfi; //这个是可拷贝的,

//4.3 心跳连接,超时等如何处理,限定时间等待

//4.3.1 时钟类包含:

//现在时间,时间类型,节拍,同时时钟节拍的分布判断时钟是否稳定

auto t_sys=std::chrono::system_clock::now(); //不稳定

auto t_steady=std::chrono::steady_clock::now();//稳定

//auto t_highresolution = std::chrono::high_resolution_clock;

//时钟周期 std::ratio<1,25>; 1/25一秒一个

//时间间隔

std::chrono::duration<short,std::ratio<60,1>> time1; //60秒为单位

std::chrono::duration<double,std::ratio<1,1000>> time2;//1ms为单位

std::chrono::nanoseconds time3;

std::chrono::duration<double,std::centi> time4(100);//10ms为单位

auto count = time4.count();

//显示转换,截断

std::chrono::milliseconds ms(54834);

std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms);

int some_tesk(){return 100;};

void do_something_with(int i);

std::future<int> future_1 = std::async(some_tesk);

void test_fff(){

//wait_for,超时等待结果

if(future_1.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready)

do_something_with(future_1.get());

}

//4.3.3时间点

void test_time_point(){

auto start = std::chrono::high_resolution_clock::now();

do_something_with(10);

auto stop =std::chrono::high_resolution_clock::now();

std::cout<<"do somethig time: "<<std::chrono::duration<double,std::chrono::seconds>(stop-start)<<"seconds"<<std::endl;

}

//超时等待一个条件变量(没有事件可等待的时候的),类似于定时器

std::condition_variable cv;

bool work_done;

std::mutex m;

bool wait_loop()

{

auto const timeout=std::chrono::system_clock::now()+std::chrono::microseconds(5000);

std::unique_lock<std::mutex> lk(m);

while(!work_done)

{

if(cv.wait_until(lk,timeout) == std::cv_status::timeout )

break;

}

return work_done;

}

//4.3.4具有超时功能的函数,wait_for(),wait_until()..

/*

std::this_thread

std::condition_variable;

std::condition_variable_any;

std::timed_mutex;

std::recursive_timed_mutex;

std::unique_lock<<#class _Mutex#>>;

std::future<<#class _Rp#>>;

std::shared_future<<#class _Rp#>>;

*/

//4.4实战

//4.12 快速排序--顺序实现版(非并行)

template <typename T>

std::list<T> sequential_quick_sort(std::list<T> input)

{

if(input.empty())

return input;

std::list<T> result;

//splice 插入算法(把后面框出来的元素,前插到指定迭代器)

//运行结果result 包含一个元素

result.splice(result.begin(),input,input.begin());

const T& pivot = *result.begin();

auto divide_point = std::partition(input.begin(), input.end(),

[&](const T& t){return t<pivot;});

std::list<T> lower_part;

lower_part.splice(lower_part.end(),input,input.begin(),divide_point);

auto new_lower(sequential_quick_sort(std::move(lower_part) ));

auto new_higher(sequential_quick_sort(std::move(input)));

result.splice(result.begin(),new_lower);

result.splice(result.end(),new_higher);

return result;

}

//4.13 快速排序--"期望"并行版,递归开辟线程,垃圾实现

template<typename T>

std::list<T> parallel_quick_sort(std::list<T> input)

{

if(input.empty())

return input;

std::list<T> result;

result.splice(result.begin(),input,input.begin());

const T& pivot = *result.begin();

auto divide_point = std::partition(input.begin(), input.end(),

[&](const T& t){return t<pivot;});

std::list<T> lower_part;

lower_part.splice(lower_part.end(),input,input.begin(),divide_point);

//开启一个线程,对小半边排序,

std::future<std::list<T>> new_lower

(std::async(&parallel_quick_sort<T>(),std::move(lower_part)));

auto new_higher(parallel_quick_sort(std::move(input)));

result.splice(result.begin(),new_lower.get());

result.splice(result.end(),new_higher);

return result;

}

//4.14 包裹 std::packaged_tast 与 std::thread

template<typename F,typename A> //内嵌类型必须typename显示表明

std::future<typename std::result_of<F(A&&)>::type > spawn_task(F&& f,A&& a)

{

typedef typename std::result_of<F(A&&)>::type result_type;

std::packaged_task<result_type(A&&)> task(std::move(f));

std::future<result_type> res(task.get_futury());

std::thread t(std::move(task),std::move(a));

return res;

}

//ATM逻辑类的简单实现

/*

struct card_inserted{

std::string account;

};

namespace messageing {

class receiver{

public:

std::condition_variable mes;

void receive(std::string &);

};

class sender{

public:

void send(std::string &);

};

}

std::string welcome("请插入银行卡!");

class atm{

messageing::receiver incoming;

messageing::sender bank;

messageing::sender interface_hard;

void (*state)();

std::string account;

std::string pin;

void waiting_for_card()

{

//

interface_hard.send(welcome);

incoming.wait()

}

};

*/

//第5章 底层内存模型与原子操作

//标准原子类型鼻祖

std::atomic_flag f_ato =ATOMIC_FLAG_INIT; //初始化为"清除"

//实现自旋互斥锁(阻塞测试锁)

class spinlock_mutex

{

std::atomic_flag flag;

public:

spinlock_mutex():flag(ATOMIC_FLAG_INIT){}

void lock()

{

//test and set 原子操作,阻塞式读取

while(flag.test_and_set(std::memory_order_acquire));

}

void unlock()

{

flag.clear(std::memory_order_release);

}

};

//atomic类型

void test_atomic_bool(){

std::atomic<bool> atomic_b(true);

atomic_b=false; //以上2句都是原子操作

//对原子变量的读,写,读改写

bool x=atomic_b.load(std::memory_order_acquire);

atomic_b.store(true);

x=atomic_b.exchange(false,std::memory_order_acq_rel);

}

//不同线程对数据的读写

std::vector<int> data_a;

std::atomic<bool> data_ready(false);

void reader_thread()

{

while(!data_ready.load())

{

std::this_thread::sleep_for(std::chrono::milliseconds(1));

}

std::cout<<"the answer="<<data_a[0]<<"/n";

}

void writer_thread()

{

data_a.push_back(42);

data_ready=true;

}

//以上章节没看太懂

//第6章, 基于锁的并发数据结构设计

/*

1.锁的范围中的操作,是否在锁外能够执行?

2.数据结构中不同的区域是否被不同的互斥量保护?

3.所有操作都需要同级互斥量保护吗?

4.能否对数据结构进行简单的修改,提高并发效率?

总结:如何让序列化访问最小,让真实并发最大化

入门:基于互斥量和锁的并发数据结构设计 进阶:无锁并发数据结构设计 高端:原子操作大乱炖

*/

//线程安全的队列

template<typename T>

class threadsafe_queue_version1

{

private:

std::mutex mut;

std::queue<T> local_queue;

std::condition_variable data_cond;

public:

threadsafe_queue_version1();

threadsafe_queue_version1(const threadsafe_queue_version1&);

threadsafe_queue_version1& operator=(const threadsafe_queue_version1&) =delete;

void push(T& new_value)

{

std::lock_guard<std::mutex> lk(mut);

local_queue.push(new_value);

data_cond.notify_one();

};

//出队列,使用非条件变量出队列

bool try_pop(T& value);

std::shared_ptr<T> try_pop()

{

std::lock_guard<std::mutex> lk(mut);

if(local_queue.empty())

return std::shared_ptr<T>();

std::shared_ptr<T> res(std::make_shared<T>(local_queue.fornt()));

return res;

};

void wait_and_pop(T&value)

{

std::unique_lock<std::mutex> lk(mut);

data_cond.wait(lk,[this]{return !local_queue.empty();});

value=local_queue.fornt();

local_queue.pop();

};

//返回只能指针版本,资源消耗稍微大一些

std::shared_ptr<T> wait_and_pop()

{

std::unique_lock<std::mutex> lk(mut);

data_cond.wait(lk,[this]{return !local_queue.empty();});

std::shared_ptr<T> res(std::make_shared<T>(std::move(local_queue.front()))); //4

local_queue.pop();

return res;

};

bool empty() const

{

std::lock_guard<std::mutex> lk(mut);

return local_queue.empty();

};

};

//上面版本 4处构造函数可能会抛出异常,且pop线程可能开启后 没法关闭,将local_queue 结构改成 std::queue<shared_ptr<T>> 可以解决

//6.2.3使用细粒度锁,与 条件变量 实现高性能线程安全队列

//单线程队列

template <typename T>

class queue_one_thread

{

private:

struct node

{

T data;

std::unique_ptr<node> next;

node(T node_a): data(std::move(data_a)){}

};

std::unique_ptr<node> head;

node* tail;

public:

queue_one_thread(){};

queue_one_thread(const queue_one_thread&) = delete;

queue_one_thread& operator=(const queue_one_thread&) = delete ;

std::shared_ptr<T> try_pop()

{

//队列为空

if(!head)

{

return std::shared_ptr<T>();

}

//取出head的元素,制作成共享指针复制给res

const std::shared_ptr<T> res(std::make_shared<T>(head->data));

//为何不直接使用a=a->next

std::unique_ptr<node> const old_head=std::move(head);

head = std::move(old_head->next);

return res;

}

void push(T new_value)

{

//使用 new_value 构造 node ,再利用node 构造 p

std::unique_ptr<node> p(new node(std::move(new_value)));

node * const new_tail = p.get(); // unique.get() 返回指针

if(tail)

tail->next = std::move(p);

else

head =std::move(p);

tail = new_tail;

}

bool empty()

{

return !head;

}

};

/*分析:

1.头指针,尾指针 一般情况下指向的数据之间没关联(即,可以同时出头,进尾),唯一问题是,当只有一个元素的时候

2.当没有元素的时候, push要锁住尾,同时可能要操作头,因此可能要同时锁住头尾(2个锁,可能带来死锁问题)

*/

//6.6线程安全的队列  try_pop 和wait_pop 混合

template <typename T>

class threadsafe_queue_version2

{

private:

struct node

{

std::shared_ptr<T> data;

std::unique_ptr<node> next;

};

std::condition_variable head_cond;

std::mutex head_mutex;

std::unique_ptr<node> head;

std::mutex tail_mutex;

node* tail;

node* get_tail()

{

std::lock_guard<std::mutex> tail_lock(tail_mutex);

return tail;

}

//对尾加锁,判断是否为空 ,必须接在锁定头指针之后的代码中

bool is_empty()

{

return head.get()==get_tail();

}

std::unique_ptr<node> pop_head()

{

//返回头指针

std::unique_ptr<node> old_head(std::move(head));

head=std::move(old_head->next);

return old_head;

}

public:

threadsafe_queue_version2():head(new node),tail(head.get()){};

threadsafe_queue_version2(const threadsafe_queue_version2&) = delete;

threadsafe_queue_version2& operator=(const threadsafe_queue_version2&) = delete;

//从尾部推入数据,只需要锁住尾部

void push(T new_value)

{

//下面3句话可能会抛出异常,但是这里面使用的智能指针,所以 结果只是没有push运行,是异常安全的

std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));

std::unique_ptr<node> p(new node); //新尾节点

node* new_tail=p.get();

std::lock_guard<std::mutex> lock_tail(tail_mutex);

tail->data = new_data;

tail->next = std::move(p);

tail=new_tail;

//通知有数据了

head_cond.notify_one();

}

//尝试弹出,无数据弹出空指针

std::shared_ptr<T> try_pop()

{

std::unique_ptr<node> res;

std::lock_guard<std::mutex> head_lock(head_mutex);

//不为空的时候 此处先获取 头锁,再获取 尾锁 (程序不存在其他的地方先获取尾锁,再获取头锁,所以不会死锁)

if(!is_empty())

{

res=std::move(pop_head());//对锁的操作会产生异常,这里对数据的操作都是 在获取全部锁之后,所以是异常安全的

return res->data;

}

else

return std::shared_ptr<T>();

};

//返回智能指针版本,资源消耗稍微大一些

std::shared_ptr<T> wait_and_pop(){

std::unique_lock<std::mutex> lk(head_mutex);

head_cond.wait(lk, [this]{return !is_empty();}); //等待不为空,那么必然能pop出头了

//返回头指针

std::unique_ptr<node> old_head(std::move(head));

head=std::move(old_head->next);

return old_head->data;

}

//是否为空

bool empty(){

std::lock_guard<std::mutex> head_lock(head_mutex);

return is_empty();

}

};

//6.3基于锁设计更加复杂的数据结构

//支持并发的map

/*

存在的问题:

1.要访问的迭代器被其他线程删除

2.添加数据后,遍历顺序的影响

思考:不返回引用,用一个互斥锁对每个函数上锁,并行还存在吗?

即使使用读写锁,同时也只有一个线程能修改map,如何提高性能?

基本数据结构: 二叉树(红黑树),有序数组,哈希表

分析:二叉树,根节点向下查找,每次需要锁住当前节点,同时锁可以向下传递,越上层锁住的面积越大嘛,放弃

有序数组,不解释

哈希表,假设有N个桶,那么每个桶都自带一个锁,可能能提高N倍性能

*/

//6.11 线程安全的查询表

template <typename Key,typename Value,typename Hash=std::hash<Key>>

class threadsafe_lookup_table

{

private:

//包裹好,数据为(key,value)的链表

class bucket_type

{

private:

typedef std::pair<Key,Value> bucket_value;

typedef std::list<bucket_value> bucket_data;

typedef typename bucket_data::iterator  bucket_iterator;

bucket_data data; //桶中的元素

mutable boost::shared_mutex mutex;  //读锁

bucket_iterator find_entry_for(Key const& key) const

{

return std::find_if(data.begin(),data.end(),

[&](bucket_value const& item){return item.first==key;});

}

public:

//找到Key的对应的值,使用读锁

Value value_for(Key const& key,Value const& default_value) const

{

boost::shared_lock<boost::shared_mutex> lock(mutex);

bucket_iterator const found_entry = find_entry_for(key);

return (found_entry == data.end()) ? default_value : found_entry->second;

}

//修改操作,使用写锁,锁住整个桶

void add_or_update_mapping(Key const& key,Value const& value)

{

std::unique_lock<boost::shared_mutex> lock(mutex);

bucket_iterator const found_entry = find_entry_for(key);

if(found_entry == data.end())

data.push_back( bucket_value(key,value) );

else

found_entry->second = value; //词句非异常安全,导致的结果只是丢失数据

}

void remove_mapping(Key const& key)

{

std::unique_lock<boost::shared_mutex> lock(mutex);

bucket_iterator const found_entry = find_entry_for(key);

if(found_entry != data.end())

data.erase(found_entry);

}

};

std::vector<std::unique_ptr<bucket_type>> buckets;

Hash hasher; //hash函数,

bucket_type& get_bucket(Key const& key) const

{

std::size_t const bucket_index = hasher(key) % buckets.size();

return *buckets[bucket_index];

}

public:

typedef Key key_type;

typedef Value mapped_type;

typedef Hash hash_type;

//构造函数

threadsafe_lookup_table(unsigned num_buckets=19,Hash const& hasher_ = Hash()):buckets(num_buckets),hasher(hasher_)

{

for(unsigned i =0;i<num_buckets;++i)

buckets[i].reset[new bucket_type];//此处为何要reset啊?

}

threadsafe_lookup_table(const threadsafe_lookup_table&) = delete;

threadsafe_lookup_table& operator=(const threadsafe_lookup_table&) = delete;

Value value_for(Key const& key,Value const& default_value=Value()) const

{

return get_bucket(key).value_for(key,default_value);

}

void add_or_updata_mapping(Key const& key,Value const& value)

{

get_bucket(key).add_or_updata_mapping(key,value);

}

void remove_mapping(Key const& key)

{

get_bucket(key).remove_mapping(key);

}

};

//6.3.2编写线程安全的队列

/////////////第七章,无锁并发数据结构设计

//第8章并发代码设计

/*

线程间划分数据技术,影响并发性能的因素,数据结构设计对性能的影响

多线程中的异常安全,可扩展性,并行算法的实现

*/

//8.1 如何在线程间划分工作

//////////////8.1.2基于数据的划分

//1.事先划分

//2.递归划分,快排(线程数指数级生成,销毁)

//3.使用栈的并行快速排序算法---等待数据库排序

template <typename T>

struct sorter

{

//待排序的块

struct chunk_to_sort

{

std::list<T> data;

std::promise< std::list<T> > promise;

};

//线程安全的栈,节点是 待排序的块

threadsafe_stack<chunk_to_sort> chunks;

std::vector<std::thread> threads;

unsigned const max_thread_count;

std::atomic<bool> end_of_data;

sorter():max_thread_count(std::thread::hardware_concurrency() - 1),

end_of_data(false){}

~sorter()

{

end_of_data = true;

//等线程结束后,再析构

for(unsigned i =0;i<threads.size();++i)

threads[i].joni();

}

//线程开启入口

void sort_thread()

{

while(!end_of_data)

{

try_sort_chunk();

std::this_thread::yield();

}

}

//从栈中弹出,排序

void try_sort_chunk()

{

boost::shared_ptr<chunk_to_sort>  chunk = chunks.pop();

if(chunk)

{

sort_chunk(chunk);

}

}

void sort_chunk(boost::shared_ptr<chunk_to_sort> const & chunk )

{

chunk->promise.set_value(do_sort(chunk->data));

}

std::list<T> do_sort(std::list<T>& chunk_data)

{

if(chunk_data.empty())

return chunk_data;

std::list<T> result;

result.splice(result.begin(),chunk_data,chunk_data.begin());

T const& partition_val =*result.begin();

typename std::list<T>::iterator divide_point =

std::partition(chunk_data.begin(),chunk_data.end(),

[&](T const& val){return val < partition_val;});

//左半边

chunk_to_sort new_lower_chunk;

new_lower_chunk.data.splice(new_lower_chunk.data.end(),chunk_data,chunk_data.begin(),divide_point);

//拿到左边块排序期望,将任务丢进栈

std::future<std::list<T>> new_lower = new_lower_chunk.promise.get_future();

chunks.push(std::move(new_lower_chunk));

if(threads.size() < max_thread_count)

{

threads.push_back(std::thread(&sorter<T>::sort_thread,this));

}

std::list<T> new_higher(do_sort(chunk_data));

result.splice(result.ends(),new_higher);

//当没有数据需要在等待排序的时候,此语句 轮询,性能有影响

//wait_for() 不会阻塞,get()会阻塞

while(new_lower.wait_for(std::chrono::seconds(0)) != std::future_status::ready )

{

try_sort_chunk();

}

result.splice(result.begin(),new_lower.get());

return result;

}

};

template <typename T>

std::list<T> parallel_quick_sort_thread_auto(std::list<T> input)

{

if(input.empty())

return std::move(input);

sorter<T> s;

return s.do_sort(input);

}

///////8.1.3通过任务类型划分工作

//分离工种,然工种间密切的交互,当所有交互都关于同样的问题时候,可以将工种合并

//划分任务序列,get数据流水线,对数据分块,并发

//8.2影响并发代码性能的因素

//8.2.1 cpu数量带来的问题

//8.2.2乒乓缓存

//处理器,每个处理器独立缓存,处理器会等待更新缓存(高竞争)

std::atomic<unsigned long> counter(0);

void process_loop()

{

//乒乓缓存

//相互等待,性能影响大, 当cpu等待缓存的时候,不能做任何事情..

while(counter.fetch_add(1,std::memory_order_relaxed)< 10000000 )

{

do_something(1);

}

}

//8.2.3 伪共享

//线程之间访问的数据过于接近

//比如2个int在内存中相邻, 大家虽然互斥的不是同一个变量,但是互斥的是同一个内存行(32,64B)

//8.2.4让内存数据紧凑

//减少两个线程对同一个内存位置的竞争,避免乒乓缓存

//将线程自己访问的内存放在一起, 将共享内存,每个并发 隔开放

//当线程数量少的时候,又会带来(不使用空间,时间性),导致缺页中断发生

//8.2.5超额认购与频繁的任务切换

//考虑开始最写的并发排序,5层递归32个线程..

/////8.3 为多线程性能设计的 数据结构

//8.3.1 为复杂操作 划分数组元素

//考虑大矩阵相乘 C=B*A的时候

//8.3.2其他数据结构中的数据访问模式

/*

1.尝试调整数据在线程间分布,让同一线线程数据紧凑

2.减少线程需要的数据量

3.不同线程访问不同位置,避免伪共享

*/

class data_item1

{};

class data_item2

{};

//测试伪共享

struct my_data_test{

data_item1 d1;

data_item2 d2;

char padding[65536];

};

//测试互斥量

struct protected_data

{

std::mutex m;

char padding[65536];// 超过缓存行的大小

my_data_test data_to_protect;

};

//异常安全,可扩展性

//8.4.3使用多线程隐藏延时

//多线程病毒扫描

//在线程等待事件阻塞的时候,充分分配其他任务,比如多线程快排

//8.4.4使用并发提高响应能力

//将GUI线程与任务线程分离

std::thread task_thread;

std::atomic<bool> task_cancelled(false);

enum eventType

{

start_task,

stop_task,

task_complete,

quit

};

class event_data

{

public:

eventType type;

};

//并发事件队列

threadsafe_queue<event_data> work_event;

void task()

{

/*

while(!task_complete() && ! task_cancelled)

{

do_next_operation();

}

if(task_cancelled )

{

perform_clearup();

}

else{

//传给事件队列

post_gui_event(task_completa);

}

*/

}

void process(event_data const& event)

{

switch(event.type)

{

case start_task:

task_cancelled=false;

task_thread= std::thread(task);

break;

case stop_task:

task_cancelled = true;

task_thread.join();

break;

case task_complete:

task_thread.join();

//display_result();

break;

default:

break;

//...

}

}

//gui线程

event_data get_event();

void gui_thread_along()

{

while(true)

{

//获取事件,鼠标点击,阻塞式

event_data event =get_event();

if(event.type == quit)

break;

process(event);

}

}

//8.5一些并行设计代码

//并行实现std::for_each

/*

对任务划分,如果想保存线程中的异常,可以使用std::package_task,asycn机制

*/

class join_threads

{

private:

std::vector<std::thread> j_threads;

public:

join_threads(std::vector<std::thread>& th ):j_threads(std::move(th)){};

~join_threads()

{

for(unsigned long i=0;i < j_threads.size();++i)

{

if(j_threads[i].joinable())

j_threads[i].join();

}

}

};

template < typename Iterator ,typename Func>

void parallel_for_each(Iterator first,Iterator last,Func f)

{

unsigned long const length = std::distance(first, last);

if(!length)

return ;

unsigned long const min_per_thread =25;

unsigned long const max_thread = (length +min_per_thread -1 )/min_per_thread;

unsigned long const hardware_threads = std::thread::hardware_concurrency();

unsigned long const num_threads = std::min(hardware_threads ? 2:hardware_threads, max_thread);

unsigned long const block_size = length/num_threads;

std::vector<std::future<void>> futures(num_threads-1);

std::vector<std::thread> threads(num_threads-1);

//线程包裹

join_threads joiner(threads);

Iterator block_start=first;

Iterator block_end =block_start;

for(unsigned long i=0;i<(num_threads-1);++i)

{

block_end =block_start;

std::advance(block_end, block_size);

std::packaged_task<void(void)> task([&](){std::for_each(block_start, block_end, f);});

futures[i]=task.get_future();

threads[i]=std::thread(std::move(task));

block_start =block_end;

}

std::for_each(block_start, last, f);

for(unsigned long i=0;i<(num_threads-1);++i)

{

futures[i].get();

}

}

//并行find算法实现

template <typename Iterator ,typename MatchType>

Iterator parallel_find(Iterator first,Iterator last,MatchType match)

{

struct find_element

{

void operator()(Iterator begin,Iterator end,MatchType match,std::promise<Iterator>* result,std::atomic<bool>* done_flag)

{

try

{

for(;begin!=end && !done_flag->load();++begin)

{

if(*begin == match)

{

//设置找到的位置,同时标记原子变量 ,此处存在竞争,但是 第一个元素会设置为promise

result->set_value(begin);

done_flag->store(true);

return ;

}

}

}

catch(...)

{

try {

result->set_exception(std::current_exception());

done_flag->store(true);

} catch (...) {

}

}

}

};

unsigned long const length = std::distance(first, last);

if(!length)

return ;

unsigned long const min_per_thread =25;

unsigned long const max_thread = (length +min_per_thread -1 )/min_per_thread;

unsigned long const hardware_threads = std::thread::hardware_concurrency();

unsigned long const num_threads = std::min(hardware_threads ? 2:hardware_threads, max_thread);

unsigned long const block_size = length/num_threads;

std::promise<Iterator> result;

std::atomic<bool> done_flag(false);

std::vector<std::thread> threads(num_threads-1);

{

//线程包裹

join_threads joiner(threads);

Iterator block_start=first;

Iterator block_end =block_start;

for(unsigned long i=0;i<(num_threads-1);++i)

{

block_end =block_start;

std::advance(block_end, block_size);

threads[i]=std::thread(find_element(),block_start,block_end,&result,&done_flag);

block_start = block_end;

}

find_element(block_start,last,&result,&done_flag);

if(!done_flag.load())

{

return last;

}

return result.get_guture().get();

}

};

//高级 std::partial_sum 并发代码实现,留作练习

//第9章 高级线程管理

//9.1.1最简单的线程池

class thread_pool

{

std::atomic_bool done;

//std::function<T> ,T只支持能拷贝的类型 //此线程池处理的任务是无参数,无返回的任务

threadsafe_queue_version2<std::function<void()>> work_queue;

std::vector<std::thread> threads;

join_threads joiner; //

void worker_thread()

{

while(!done) //还有工作

{

std::shared_ptr<std::function<void()>> task_p(work_queue.try_pop());

if(task_p==nullptr)

{

std::this_thread::yield(); //放弃CPU时间片,加入抢占队列

}

else

{

std::function<void()> task= *task_p;

task(); //工作

}

}

}

public:

thread_pool():done(false),joiner(threads)

{

unsigned const  thread_count = std::thread::hardware_concurrency()-1;

try{

for(unsigned int i=0 ;i <thread_count ;++i)

{

//初始化线程池,让所有线程进入循环工作状态

threads.push_back(std::thread(&thread_pool::worker_thread,this));

}

}

catch(...)

{

done=true;

throw;

}

}

~thread_pool()

{

done=true;

}

template<typename FunctionType>

void submit(FunctionType f)

{

work_queue.push(std::function<void()>(f));

}

};

//主线程 等待提交到线程池中的任务结果, 汇总

/*

submit返回任务句柄

//std::function<std::packaged_task<void()>> ftc; 失败,类型不支持拷贝

*/

class function_wrapper

{

struct impl_base{

virtual void call() = 0; //接口

virtual ~impl_base(){};

};

std::unique_ptr<impl_base> impl;

template <typename F>

struct impl_type:impl_base

{

F f;

impl_type(F&&f_):f(std::move(f_)){}

void call(){f();};

};

public:

//模板构造函数,以F类型,来构造

template <typename F>

function_wrapper(F&&f):impl(new impl_type<F>(std::move(f))){};

//仿函数功能

void operator()(){impl->call();};

function_wrapper()= default;

//移动构造

function_wrapper(function_wrapper&& other):impl(std::move(other.impl)){};

function_wrapper& operator=(function_wrapper&& other)

{

impl=std::move(other.impl);

return *this;

}

//没有拷贝,赋值构造函数

function_wrapper(const function_wrapper& other) = delete;

function_wrapper(function_wrapper &) =delete;

function_wrapper& operator=(function_wrapper& other) = delete;

};

//线程开启池,线程任务随着池子析构

class thread_pool_2

{

std::atomic_bool done;

//std::function<T> ,T只支持能拷贝的类型 //此线程池处理的任务是无参数,无返回的任务

threadsafe_queue_version2<function_wrapper> work_queue;

std::vector<std::thread> threads;

join_threads joiner; //

void worker_thread()

{

//开辟的线程循环检测是否有工作,线程池析构的时候才退出线程,性能影响较大

while(!done)

{

auto task_p=work_queue.try_pop();

if(task_p==nullptr)

{

std::this_thread::yield(); //放弃CPU时间片,加入抢占队列

}

else

{

function_wrapper task(std::move(*task_p));

task(); //工作

}

}

}

public:

template <typename FunctionType>

std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f)

{

typedef typename std::result_of<FunctionType>::type result_type;

std::packaged_task<result_type()> task(std::move(f));

std::future<result_type> res(task.get_future());

work_queue.push(std::move(task));

return res;

}

thread_pool_2():done(false),joiner(threads)

{

unsigned const  thread_count = std::thread::hardware_concurrency();

try{

for(unsigned int i=0 ;i <thread_count ;++i)

{

//初始化线程池,让所有线程进入循环工作状态

threads.push_back(std::thread(&thread_pool_2::worker_thread,this));

}

}

catch(...)

{

done=true;

throw;

}

}

~thread_pool_2()

{

done=true;

}

};

//使用

//多线程求和,不带异常处理

template<typename Iterator,typename T>

T parallel_accumulate_thread_poll(Iterator first,Iterator last,T init)

{

unsigned long const length =std::distance(first, last);

if(!length)

return init;

//每个任务分配的数量

unsigned long const block_size = 25;

unsigned long const num_blocks=(length+block_size-1)/block_size;

std::vector<std::future<T>> futures(num_blocks);

thread_pool_2 pool;

Iterator block_start =first;

Iterator block_end =block_start;

for(unsigned long i=0;i<num_blocks-1;++i)

{

block_end =block_start;

std::advance(block_end, block_size);

futures[i]=pool.submit(

[block_start,block_end,init]()->T{

T result=init-init;

T last_resu =accumulate_block<Iterator, T>()(block_start,block_end,result);

return last_resu;

}

);

block_start=block_end;

}

T last_resu=init-init;

T last_result=accumulate_block<Iterator,T>()(block_start,last,last_resu);

T result=init;

T t_lin;

for(unsigned long i =0;i <num_blocks-1 ;++i)

{

t_lin=futures[i].get();

//std::cout<<t_lin<<std::endl;

result += t_lin;

}

result+=last_result;

return result;

};

//////等待 依赖于任务,可管理线程的池子

class thread_pool_3

{

std::atomic_bool done;

//std::function<T> ,T只支持能拷贝的类型 //此线程池处理的任务是无参数,无返回的任务

threadsafe_queue_version2<function_wrapper> work_queue;

std::vector<std::thread> threads;

join_threads joiner; //

void worker_thread()

{

//开辟的线程循环检测是否有工作,线程池析构的时候才退出线程,性能影响较大

while(!done)

{

auto task_p=work_queue.try_pop();

if(task_p==nullptr)

{

std::this_thread::yield(); //放弃CPU时间片,加入抢占队列

}

else

{

function_wrapper task(std::move(*task_p));

task(); //工作

}

}

}

public:

template <typename FunctionType>

std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f)

{

typedef typename std::result_of<FunctionType>::type result_type;

std::packaged_task<result_type()> task(std::move(f));

std::future<result_type> res(task.get_future());

work_queue.push(std::move(task));

return res;

}

thread_pool_3():done(false),joiner(threads)

{

unsigned const  thread_count = std::thread::hardware_concurrency();

try{

for(unsigned int i=0 ;i <thread_count ;++i)

{

//初始化线程池,让所有线程进入循环工作状态

threads.push_back(std::thread(&thread_pool_3::worker_thread,this));

}

}

catch(...)

{

done=true;

throw;

}

}

~thread_pool_3()

{

done=true;

}

/////////////运行分配的任务接口

void run_pending_task()

{

auto task_p=work_queue.try_pop();

if(task_p==nullptr)

{

std::this_thread::yield(); //放弃CPU时间片,加入抢占队列

}

else

{

function_wrapper task(std::move(*task_p));

task(); //工作

}

}

};

//基于任务管理的线程池 快排

template <typename T>

struct sorter_pool

{

thread_pool_3 pool;

std::list<T> do_sort(std::list<T>& chunk_data)

{

if(chunk_data.empty())

return chunk_data;

std::list<T> result;

result.splice(result.begin(),chunk_data,chunk_data.begin());

T const& partition_val =*result.begin();

typename std::list<T>::iterator divide_point =

std::partition(chunk_data.begin(),chunk_data.end(),

[&](T const& val){return val < partition_val;});

//左半边

std::list<T> new_lower_chunk;

new_lower_chunk.data.splice(new_lower_chunk.data.end(),chunk_data,chunk_data.begin(),divide_point);

//拿到左边块排序期望,将任务丢进栈

std::future<std::list<T>> new_lower = pool.submit(std::bind(&sorter_pool::do_sort,this,std::move(new_lower_chunk)));

std::list<T> new_higher(do_sort(chunk_data));

result.splice(result.ends(),new_higher);

//当没有数据需要在等待排序的时候,此语句 轮询,性能有影响

while(new_lower.wait_for(std::chrono::seconds(0)) != std::future_status::ready )

{

pool.run_pending_task();

}

result.splice(result.begin(),new_lower.get());

return result;

}

};

///任务队列分开的线程池

class thread_pool_queue

{

std::atomic_bool done;

//std::function<T> ,T只支持能拷贝的类型

//此线程池处理的任务是无参数,无返回的任务

//全局工作队列

threadsafe_queue_version2<function_wrapper> pool_work_queue;

typedef std::queue<function_wrapper> local_queue_type;

//定义线程自己的工作队列

static thread_local std::unique_ptr<local_queue_type> local_work_queue;

std::vector<std::thread> threads;

join_threads joiner; //

void worker_thread()

{

//初始化线程工作队列

local_work_queue.reset(new local_queue_type);

//检测线程池是否结束

while(!done)

{

run_pending_task();

}

}

public:

template <typename FunctionType>

std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f)

{

typedef typename std::result_of<FunctionType>::type result_type;

std::packaged_task<result_type()> task(std::move(f));

std::future<result_type> res(task.get_future());

//自己有任务队列

if(local_work_queue)

{

local_work_queue->push(std::move(task));

}

else

{

pool_work_queue.push(std::move(task));

}

return res;

}

thread_pool_queue():done(false),joiner(threads)

{

unsigned const  thread_count = std::thread::hardware_concurrency();

try{

for(unsigned int i=0 ;i <thread_count ;++i)

{

//初始化线程池,让所有线程进入循环工作状态

threads.push_back(std::thread(&thread_pool_queue::worker_thread,this));

}

}

catch(...)

{

done=true;

throw;

}

}

~thread_pool_queue()

{

done=true;

}

/////////////运行分配的任务接口

void run_pending_task()

{

//自己有任务队列

if(local_work_queue && !local_work_queue->empty())

{

function_wrapper task;

task = std::move(local_work_queue->front());

local_work_queue->pop();

task();

}

else

{

auto task_p =pool_work_queue.try_pop();

if(task_p==nullptr)

{

std::this_thread::yield(); //放弃CPU时间片,加入抢占队列

}

else

{

function_wrapper task;

task=(std::move(*task_p));

task(); //工作

}

}

}

};

////////窃取任务线程池

//队列中实现一个try_steal对外接口

/////包裹实现一个 可中断线程

class interrupt_flag

{

public:

void set();

bool is_set() const;

};

// 命名空间线程本地变量,当线程启动时,如果入口处于该命名空间中,就会实例化这个变量

thread_local interrupt_flag this_thread_interrupt_flag;

class interruptible_thread

{

private:

interrupt_flag* flag;

std::thread internal_thread;

public:

template <typename FunctionType>

interruptible_thread(FunctionType f)

{

std::promise<interrupt_flag*> p;

internal_thread = std::thread(

[f,&p]{

p.set_value(&this_thread_interrupt_flag);

f();

}

);

flag=p.get_future().get();

};

void interrupt()

{

if(flag)

{

flag->set();

}

}

};

template <typename ReturnType ,typename ...Args>

class test_mutux

{

public :

};

最新文章

  1. oracle数据学习第一天
  2. Centos6.5 下安装PostgreSQL9.4数据库
  3. [dts]DTS实例分析
  4. longest common str
  5. linux下使用sqlplus使用上下键显示历史命令
  6. WCF 下的windows服务的安装卸载
  7. HTML&amp;CSS基础学习笔记1.2-HTML的全局属性?
  8. office2010安装出错,windows installer服务不能更新一个或多个受保护的windows文件
  9. 新 Netflix 开源门户
  10. eclipse3.1.1汉化版安装
  11. [转]Nginx基本功能极速入门
  12. RabbitMQ 消息确认与公平调度消费者
  13. RabbitMQ消息队列(四)-服务详细配置与日常监控管理
  14. Html转义字符列表
  15. php----------linux下安装php的swoole扩展
  16. 边沿检测方法-FPGA入门教程
  17. Python3:sorted()函数及列表中的sort()函数
  18. python .dcm文件读取,并转化为.jpg格式
  19. ARC介绍
  20. 96D - Police Stations

热门文章

  1. HanLP Analysis for Elasticsearch
  2. Django-Form组件之字段
  3. 关于Asset Library核心功能的一些计划
  4. ssl握手数据结构
  5. EntityFramework Inner Exception Catch
  6. Elasticsearch-6.7.0系列(三)5601端口 kibana——ES的UI界面
  7. 报错:Failed on local exception: Host Details : local host is: &quot;master/192.168.52.26&quot;; dest
  8. 窗口置顶 - 仿TopWind
  9. FuzzScanner 信息收集小工具
  10. java 通过jmx获取active mq队列消息