一. std::promise和std::package_task

(一)共享状态、提供者和管理者

// CLASS TEMPLATE _Promise
template <class _Ty>
class _Promise { // class that implements core of promise
public:
_Promise(_Associated_state<_Ty>* _State_ptr)
: _State(_State_ptr, false), _Future_retrieved(false) { // construct from associated asynchronous state object
} _Promise(_Promise&& _Other)
: _State(_STD move(_Other._State)),
_Future_retrieved(_Other._Future_retrieved) { // construct from rvalue _Promise object
} _Promise& operator=(_Promise&& _Other) { // assign from rvalue _Promise object _State = _STD move(_Other._State);
_Future_retrieved = _Other._Future_retrieved;
return *this;
} ~_Promise() noexcept { // destroy
} void _Swap(_Promise& _Other) { // exchange with _Other
_State._Swap(_Other._State);
_STD swap(_Future_retrieved, _Other._Future_retrieved);
} const _State_manager<_Ty>& _Get_state() const { // return reference to associated asynchronous state object
return _State;
}
_State_manager<_Ty>& _Get_state() { // return reference to associated asynchronous state object
return _State;
} _State_manager<_Ty>& _Get_state_for_set() { // return reference to associated asynchronous state object, or
// throw exception if not valid for setting state
if (!_State.valid()) {
_Throw_future_error(make_error_code(future_errc::no_state));
} return _State;
} _State_manager<_Ty>& _Get_state_for_future() { // return reference to associated asynchronous state object, or
// throw exception if not valid for retrieving future
if (!_State.valid()) {
_Throw_future_error(make_error_code(future_errc::no_state));
} if (_Future_retrieved) {
_Throw_future_error(make_error_code(future_errc::future_already_retrieved));
} _Future_retrieved = true;
return _State;
} bool _Is_valid() const noexcept { // return status
return _State.valid();
} bool _Is_ready() const { // return ready status
return _State._Is_ready();
} bool _Is_ready_at_thread_exit() const { // return ready at thread exit status
return _State._Is_ready_at_thread_exit();
} _Promise(const _Promise&) = delete;
_Promise& operator=(const _Promise&) = delete; private:
_State_manager<_Ty> _State;
bool _Future_retrieved;
}; // CLASS TEMPLATE promise
template <class _Ty>
class promise { // class that defines an asynchronous provider that holds a value
public:
promise() : _MyPromise(new _Associated_state<_Ty>) { // construct
} template <class _Alloc>
promise(allocator_arg_t, const _Alloc& _Al)
: _MyPromise(_Make_associated_state<_Ty>(_Al)) { // construct with allocator
} promise(promise&& _Other) noexcept
: _MyPromise(_STD move(_Other._MyPromise)) { // construct from rvalue promise object
} promise& operator=(promise&& _Other) noexcept { // assign from rvalue promise object
promise(_STD move(_Other)).swap(*this);
return *this;
} ~promise() noexcept { // destroy
if (_MyPromise._Is_valid() && !_MyPromise._Is_ready()
&& !_MyPromise._Is_ready_at_thread_exit()) { // exception if destroyed before function object returns
future_error _Fut(make_error_code(future_errc::broken_promise));
_MyPromise._Get_state()._Set_exception(_STD make_exception_ptr(_Fut), false);
}
} void swap(promise& _Other) noexcept { // exchange with _Other
_MyPromise._Swap(_Other._MyPromise);
} _NODISCARD future<_Ty> get_future() { // return a future object that shares the associated
// asynchronous state
return future<_Ty>(_MyPromise._Get_state_for_future(), _Nil());
} void set_value(const _Ty& _Val) { // store result
_MyPromise._Get_state_for_set()._Set_value(_Val, false);
} void set_value_at_thread_exit(const _Ty& _Val) { // store result and block until thread exit
_MyPromise._Get_state_for_set()._Set_value(_Val, true);
} void set_value(_Ty&& _Val) { // store result
_MyPromise._Get_state_for_set()._Set_value(_STD forward<_Ty>(_Val), false);
} void set_value_at_thread_exit(_Ty&& _Val) { // store result and block until thread exit
_MyPromise._Get_state_for_set()._Set_value(_STD forward<_Ty>(_Val), true);
} void set_exception(exception_ptr _Exc) { // store result
_MyPromise._Get_state_for_set()._Set_exception(_Exc, false);
} void set_exception_at_thread_exit(exception_ptr _Exc) { // store result and block until thread exit
_MyPromise._Get_state_for_set()._Set_exception(_Exc, true);
} promise(const promise&) = delete;
promise& operator=(const promise&) = delete; private:
_Promise<_Ty> _MyPromise;
}; template <class _Ret, class... _ArgTypes>
class packaged_task<_Ret(_ArgTypes...)> { // class that defines an asynchronous provider that returns the
// result of a call to a function object
public:
using _Ptype = typename _P_arg_type<_Ret>::type;
using _MyPromiseType = _Promise<_Ptype>;
using _MyStateManagerType = _State_manager<_Ptype>;
using _MyStateType = _Packaged_state<_Ret(_ArgTypes...)>; packaged_task() noexcept : _MyPromise() { // construct
} template <class _Fty2, class = enable_if_t<!is_same_v<_Remove_cvref_t<_Fty2>, packaged_task>>>
explicit packaged_task(_Fty2&& _Fnarg)
: _MyPromise(new _MyStateType(_STD forward<_Fty2>(_Fnarg))) { // construct from rvalue function object
} packaged_task(packaged_task&& _Other) noexcept
: _MyPromise(_STD move(_Other._MyPromise)) { // construct from rvalue packaged_task object
} packaged_task& operator=(packaged_task&& _Other) noexcept { // assign from rvalue packaged_task object _MyPromise = _STD move(_Other._MyPromise);
return *this;
} #if _HAS_FUNCTION_ALLOCATOR_SUPPORT
template <class _Fty2, class _Alloc, class = enable_if_t<!is_same_v<_Remove_cvref_t<_Fty2>, packaged_task>>>
packaged_task(allocator_arg_t, const _Alloc& _Al, _Fty2&& _Fnarg)
: _MyPromise(_Make_packaged_state<_MyStateType>(
_STD forward<_Fty2>(_Fnarg), _Al)) { // construct from rvalue function object and allocator
}
#endif // _HAS_FUNCTION_ALLOCATOR_SUPPORT ~packaged_task() noexcept { // destroy
_MyPromise._Get_state()._Abandon();
} void swap(packaged_task& _Other) noexcept { // exchange with _Other
_STD swap(_MyPromise, _Other._MyPromise);
} _NODISCARD bool valid() const noexcept { // return status
return _MyPromise._Is_valid();
} _NODISCARD future<_Ret> get_future() { // return a future object that shares the associated
// asynchronous state
return future<_Ret>(_MyPromise._Get_state_for_future(), _Nil());
} void operator()(_ArgTypes... _Args) { // call the function object
if (_MyPromise._Is_ready()) {
_Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
} _MyStateManagerType& _State = _MyPromise._Get_state_for_set();
_MyStateType* _Ptr = static_cast<_MyStateType*>(_State._Ptr());
_Ptr->_Call_immediate(_STD forward<_ArgTypes>(_Args)...);
} void make_ready_at_thread_exit(_ArgTypes... _Args) { // call the function object and block until thread exit
if (_MyPromise._Is_ready()) {
_Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
} _MyStateManagerType& _State = _MyPromise._Get_state_for_set();
if (_State._Ptr()->_Already_has_stored_result()) {
_Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
} _MyStateType* _Ptr = static_cast<_MyStateType*>(_State._Ptr());
_Ptr->_Call_deferred(_STD forward<_ArgTypes>(_Args)...);
} void reset() { // reset to newly constructed state
_MyStateManagerType& _State = _MyPromise._Get_state_for_set();
_MyStateType* _MyState = static_cast<_MyStateType*>(_State._Ptr());
function<_Ret(_ArgTypes...)> _Fnarg = _MyState->_Get_fn();
_MyPromiseType _New_promise(new _MyStateType(_Fnarg));
_MyPromise._Get_state()._Abandon();
_MyPromise._Swap(_New_promise);
} packaged_task(const packaged_task&) = delete;
packaged_task& operator=(const packaged_task&) = delete; private:
_MyPromiseType _MyPromise;
};

【std::promise/std::package_task源码摘要】

  1.“共享状态”作为异步结果的传输通道,由std::async、std::promise和std::package_task等提供(Provider),并交由future/shared_future管理。Provider将计算结果写入“共享状态”对象,而future/shared_future通过get()函数来读取该结果。

  2. std::promise用于包装一个值,将数据和future绑定起来,方便线程赋值。而std::package_task用来包装一个可调用对象,将函数与future绑定以便异步调用。

  3. std::async、std::promise和std::package_task都是“共享状态”对象的创建者,它们创建“共享状态”类型有所不同。

  (1)std::async:创建_Deferred_async_state和_Task_async_state类型的共享状态。

  (2)std::promise:创建_Associated_state类型的“共享状态”。这种类型比较简单,内部只能保存某个类型的值(如返回值)。

  (3)std::package_task:创建_Package_state类型的“共享状态”,这种类型内部是通过std::function来可以包装可调用对象的

  4. std::promise和std::package_task都只有移动语义而没有拷贝语义

(二)std::promise类

  1. 用来保存某一类型的值,也可以用来保存线程函数的返回值,该值可被future读取。它为线程同步提供了一种手段。

  2. 可以通过 get_future 来获取future 对象,该对象与promise通过“共享状态”这个通道进行异步结果传输。std::promise通常在某个时间点通过设置一个值或异常对象, future通过调用get()来获取这个结果。

  3. set_value_at_thread_exit函数:设置共享状态的值,但不会将共享状态的标志设置为ready。当有当线程退出时,该标志位才设置为true,同时唤醒所有阻塞在future的get()函数的线程。

  4. 如果销毁std::promise时未设置值,则会存入一个异常。

(三)std::package_task类

  1. std::package_task 用来包装可调用对象,其本身也是一个可调用对象(因为重载了operator()(Args…args)函数。它可以作为线程函数传递给std::thread,或传给需要可调用对象的另一个函数,或者干脆直接调用。这与std::function类似,但std::package_task会将其包装的可调用对象执行结果(返回值)保存起来,并传递给了future对象

  2. 通过get_future()返回一个与“共享状态”相关联的future对象。其他线程可以通过std::package_task对象在“共享状态”上设置某个值或者异常。

  3. make_ready_at_thread_exit(Arg...args):该函数会调用被包装的任务,并向任务传递参数,类似于std::package_task的operator()成员函数,但不同的是make_read_at_thread_exit并不会立即设置“共享状态”的ready标志,而是在线程退出时才设置它

  4. reset()函数会重置“共享状态”,但是保留了之前被包装的任务。它使得package_task可以被重复使用,这点与std::promise一次性使用不同。

  5. std::package_task对象一般与std::thread配合使用,而不是std::async。如果要使用std::async运行任务,就没有理由去创建std::package_task对象。因为std::async调用时,内部会创建一个基类为_Packaged_state类 “共享状态”的子类对象,而std::package_task也会创建_Packaged_state类的对象。可见std::async能够在调用任务执行之前就做到std::package_task能做到的任何事情,也可以避免重复创建“共享状态”对象。

【编程实验】初探std::promise和std::pack_task

#include <iostream>
#include <future>
#include <thread>
#include <chrono>
#include <queue> using namespace std; void func(std::promise<int>& pr, int param)
{
int res = param * ;
pr.set_value_at_thread_exit(res); //线程退出时,设置需要输出的值
} //计算阶乘
int factorial(int n)
{
std::this_thread::sleep_for(std::chrono::milliseconds()); if (n == )
return ; return n * factorial(n - );
} void get_result(std::future<int>& fut) //获取结果
{
while (fut.wait_for(std::chrono::milliseconds()) == std::future_status::timeout) {
std::cout << ".";
}
std::cout << std::endl;
std::cout << "the factorial result is " << fut.get() << std::endl;
} int main()
{
//1. std::promise/std::future配合使用 //1.1 主线程等待子线程的结果
std::promise<int> pr1;
std::future<int> fut1 = pr1.get_future(); std::thread t1(func, std::ref(pr1), );
t1.join(); //等待t1线程退出 std::cout << "The func output: " << fut1.get() << std::endl;
//1.2 子线程等待主线程的计算结果
std::promise<int> pr2;
std::future<int> fut2 = pr2.get_future(); //创建通道 std::thread t2([](std::future<int>& fut) {
int res = fut.get();
cout <<"thread id: "<<std::this_thread::get_id() << " get result " << res << endl;
}, std::ref(fut2)); std::this_thread::sleep_for(std::chrono::milliseconds()); pr2.set_value();
t2.join(); //2. std::package_task与std::future配合使用
//2.1 直接将package_task作为函数对象使用
std::packaged_task<double(int, int)> task1([](int a, int b) {
return std::pow(a, b);
}); std::future<double> res = task1.get_future(); task1(, ); std::cout << "task_lambda: " << res.get() << endl; //2.2 将std::package_task作为任务传递给std::thread线程
std::packaged_task<int(int)> task2(factorial);
std::future<int> fut3 = task2.get_future(); std::thread t3(std::ref(task2), ); //t3线程计算7的阶乘
std::thread t4(get_result, std::ref(fut3));
t3.join();
t4.join(); task2.reset(); //重置task,使得task2可以被重复使用
std::future<int> fut5 = task2.get_future();
std::thread t5(std::ref(task2), ); //计算8的阶乘
cout << fut5.get() << endl;
t5.join(); //3.std::async与std::future配合使用
std::future<int> fut6 = std::async(std::launch::async, factorial, );
fut6.wait(); cout <<"aync calc result is: " << fut6.get() << endl; return ;
}
/*输出结果
The func output: 50
thread id: 6532 get result 100
task_lambda: 512
.................................
the factorial result is 5040
40320
aync calc result is: 5040
*/

二. std::promise/std::package_task的应用

(一)一次性事件及建模

  1. 一个线程在完成其任务之前,可能需要等待特定的一次性事件的发生。在等待期间,线程可以去轮询事件是否发生,也可以去做其他任务。C++标准库使用std::future为这类一次性事件建模。

  2.一旦事件发生,future变为就绪,而std::future的get/wait()只能被调用一次,无法重复使用。如果多线程等待同一个事件,就需要使用std::shared_future,当事件发生时所有相关的shared_future对象均会变为就绪,并且可以访问其关联的任务结果。

  3.期值对象本身并不提供同步访问,当多个线程需要访问一个独立的期值对象时,必须使用互斥量或类似同步机制对访问进行保护。而如果仅为了实现一次性的事件通信,基于条件变量的设计会要求多余的互斥量和标志位,这显然不够干净利落,而使用期值可以很好的处理这个问题。

(二)线程间传递任务(以GUI消息处理为例)

  1. 在GUI编程中,当一个线程计算完结果,它要发出一条信息给GUI线程,以通知更新界面。

  2. std::package_task提供了实现这种功能的方法,且不需要发送一条自定义信息给GUI线程,而是将函数包装成任务,并传递到GUI线程,使任务在GUI线程中运行。

【编程实验】std::promise和std::package_task的应用

#include <iostream>
#include <future>
#include <thread>
#include <mutex>
#include <queue> using namespace std; //1.一次性事件及建模(以实现暂停状态启动的线程为例)
//创建暂停状态的线程:std::thread类创建的线程,一启动线程就运行起来。但是如果希望在线程运行前设置优先级和内核亲和性,
//就需要一个可以创建一个暂停的线程,然后通过其提供的native_handle成员,利用平台底层API配置这些线程特征。为达到这一
//目的,可以利用std::promise / std::future提供的一次性机制来实现暂停状态的线程。
class MyThread
{
private:
std::thread mThread;
std::promise<void> mPromise;
std::future<void> mFuture;
bool bStart;
public:
template<typename Fn, typename ...ArgTypes>
MyThread(Fn&& fn, ArgTypes&&... args):bStart(false)
{
mFuture = mPromise.get_future(); mThread = std::move(std::thread([this, &fn, &args...] {
mFuture.wait();
std::forward<Fn>(fn)(std::forward<ArgTypes>(args)...);
}));
} void start()
{
if (!bStart) {
mPromise.set_value();
bStart = true;
}
} void join()
{
mThread.join();
} void detach()
{
mThread.detach();
} bool joinable() {
return mThread.joinable();
}
}; //2. 利用shared_future处理多个反应任务
//反应任务
std::mutex g_mtx;
void reach()
{
std::lock_guard<std::mutex> lck(g_mtx);
cout << "thread(id= " <<std::this_thread::get_id() <<") react"<< endl;
} //检测任务(可处理多个反应任务)
void detech()
{
std::promise<void> pr;
std::shared_future<void> sf = pr.get_future().share();
std::vector<std::thread> vec; //反应任务的容器 for (int i = ; i < ; ++i)
{
vec.emplace_back([sf] //在sf按值传递,在其副本上wait
{
sf.wait();
reach();
});
} //... //注意,如果此处抛出异常,则detech会失去响应 pr.set_value(); //让所有线程取消暂停 for (auto& t : vec) {
t.join();
}
} //3. gui消息处理(在线程间传递任务,而不是消息)
class MessageManager
{
std::queue<std::shared_ptr<std::function<void()>>> mQueue;
std::mutex mtx;
bool bShutdown = false;
public:
void shutDown() { bShutdown = true; } //将任务包装成package_task
template<typename Fn, typename... Args>
std::future<std::result_of_t<Fn && (Args&& ...)>> //postTask函数的返回值类型,future配合package_task使用
postTask(Fn&& fn, Args&& ...args)
{
using Ret = std::result_of_t <Fn && (Args && ...)>; //Fn函数的返回值类型 std::lock_guard<std::mutex> lck(mtx); ////将任务包装成package_task类型(注意,由于package_task为只移动类型,不能复制。这里在堆上创建)
auto ptrPA = std::make_shared<std::packaged_task<Ret()>>(std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...));
auto fut = ptrPA->get_future(); //利用lambda将“std::packaged_task<Ret()>”重新包装成queue队列所需的元素类型:std::function<void()>共享指针类型
auto pTask = std::make_shared<std::function<void()>>([ptrPA]()->void {(*ptrPA)(); }); mQueue.push(pTask); return fut;
} void guiThread()
{
while (!bShutdown)
{
//... //处理其它gui界面消息 //获取并执行用户任务
std::shared_ptr<std::function<void()>> task;
{
std::lock_guard<std::mutex> lk(mtx);
if (mQueue.empty())
continue; task = mQueue.front(); //取出队列中的用户任务
mQueue.pop();
}
(*task)(); //执行任务
}
}
}; int main()
{
cout << "main thread running..." << endl; //1. 以暂停状态启动的线程
MyThread th([](int x, int y) {
int res = x + y; cout << x << " + " << y << " = " << res << endl;
return res;
}, , ); th.start(); th.join(); //2.处理多反应任务(在shared_future上等待)
detech(); //3. 在线程间传递任务
MessageManager mm;
std::thread guiThread(&MessageManager::guiThread,&mm); auto fut1 = mm.postTask([](int x, int y)->int { return x + y; }, , );
auto fut2 = mm.postTask([](int x, int y, int z) {return x * y * z; }, , , );
auto fut3 = mm.postTask([](const std::string& str) {return str; }, "SantaClaus");
cout << fut1.get() << endl; //
cout << fut2.get() << endl; //
cout << fut3.get() << endl; //SantaClaus mm.shutDown(); guiThread.join(); return ;
}
/*输出结果
main thread running...
10 + 20 = 30
thread(id= 8792) react
thread(id= 2600) react
thread(id= 2604) react
thread(id= 13384) react
thread(id= 14864) react
thread(id= 14884) react
thread(id= 13588) react
thread(id= 13516) react
thread(id= 11956) react
thread(id= 13540) react
3
6000
SantaClaus
*/

最新文章

  1. 【转】天啦噜!原来Chrome自带的开发者工具还能这么用!(提升JS调试能力的10个技巧)
  2. Programming pearls 编程珠玑的题目
  3. PHP的PSR系列规范都有啥内容
  4. linux中diff命令用法
  5. javaScript怪癖分析
  6. css3 变形(transform)、转换(transition)和动画(animation)
  7. 暴力枚举 UVA 10976 Fractions Again?!
  8. Linq世界走一走
  9. 记录一次会话CRT
  10. css3选择器二
  11. 单线程与多线程的简单示例(以Windows服务发短信为示例)
  12. CodeForces 605B Lazy Student
  13. vue.js项目构建
  14. 字符串匹配(一)----Rabin-Karp算法
  15. django上传excel文件
  16. 701 C. They Are Everywhere
  17. Ext4 ReiserFS Btrfs 等7种文件系统性能比拼
  18. 转: Python中的os.path.dirname(__file__)
  19. Android项目开发第四周学习总结
  20. 【321】python进程监控:psutil

热门文章

  1. Java匹马行天下之学编程的起点——走进编程的殿堂
  2. Docker入门学习及其安装
  3. 【java提高】---patchca生成验证码
  4. array list 的特点及几种遍历方法
  5. Java生鲜电商平台-商品的spu和sku数据结构设计与架构
  6. Hystrix核心熔断器
  7. Arduino+esp8266-01+舵机 制作基于局域网的遥控门禁
  8. ca动画
  9. selenium和AutoIt工具辅助下载和上传
  10. Django 练习班级管理系统四 -- 编辑班级