concurrent.futures的作用:
       管理并发任务池。concurrent.futures模块提供了使用工作线程或进程池运行任务的接口。线程和进程池API都是一样,所以应用只做最小的修改就可以在线程和进程之间地切换

1、基于线程池使用map()

#!/usr/bin/env python
# -*- coding: utf-8 -*- from concurrent import futures
import threading
import time def task(n):
print('{}: 睡眠 {}'.format(threading.current_thread().name,n))
time.sleep(n / 10)
print('{}: 执行完成 {}'.format(threading.current_thread().name,n))
return n / 10 ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: 开始运行')
results = ex.map(task, range(5, 0, -1)) #返回值是generator 生成器
print('main: 未处理的结果 {}'.format(results))
print('main: 等待真实结果')
real_results = list(results)
print('main: 最终结果: {}'.format(real_results))

futures_thread_pool_map.py

运行效果

[root@ mnt]# python3 futures_thread_pool_map.py
main: 开始运行
ThreadPoolExecutor-0_0: 睡眠
ThreadPoolExecutor-0_1: 睡眠
main: 未处理的结果 <generator object Executor.map.<locals>.result_iterator at 0x7f1c97484678>
main: 等待真实结果
ThreadPoolExecutor-0_1: 执行完成
ThreadPoolExecutor-0_1: 睡眠
ThreadPoolExecutor-0_0: 执行完成
ThreadPoolExecutor-0_0: 睡眠
ThreadPoolExecutor-0_0: 执行完成
ThreadPoolExecutor-0_0: 睡眠
ThreadPoolExecutor-0_1: 执行完成
ThreadPoolExecutor-0_0: 执行完成
main: 最终结果: [0.5, 0.4, 0.3, 0.2, 0.1]

 2、futures执行单个任务

#!/usr/bin/env python
# -*- coding: utf-8 -*- from concurrent import futures
import threading
import time def task(n):
print('{}: 睡眠 {}'.format(threading.current_thread().name, n))
time.sleep(n / 10)
print('{}: 执行完成 {}'.format(threading.current_thread().name, n))
return n / 10 ex = futures.ThreadPoolExecutor(max_workers=2)
print('main :开始')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('等待运行结果')
results = f.result()
print('main: result:{}'.format(results))
print('main: future 之后的结果:{}'.format(f))

futures_thread_pool_submit.py

运行效果

[root@ mnt]# python3 futures_thread_pool_submit.py
main :开始
ThreadPoolExecutor-0_0: 睡眠 5
main: future: <Future at 0x7f40c0a6a400 state=running>
等待运行结果
ThreadPoolExecutor-0_0: 执行完成 5
main: result:0.5
main: future 之后的结果:<Future at 0x7f40c0a6a400 state=finished returned float>

3、futures.as_completed()按任意顺序运行结果

#!/usr/bin/env python
# -*- coding: utf-8 -*- import random
import time
from concurrent import futures def task(n):
time.sleep(random.random())
return (n, n / 10) ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: 开始')
wait_for = [
ex.submit(task, i) for i in range(5, 0, -1)
]
for f in futures.as_completed(wait_for):
print('main: result:{}'.format(f.result()))

futures_as_completed.py

运行效果

[root@ mnt]# python3 futures_as_completed.py
main: 开始
main: result:(, 0.5)
main: result:(, 0.4)
main: result:(, 0.3)
main: result:(, 0.1)
main: result:(, 0.2)

4、Future回调之futures.add_done_callback()

#!/usr/bin/env python
# -*- coding: utf-8 -*- from concurrent import futures
import time def task(n):
print('task {} : 睡眠'.format(n))
time.sleep(0.5)
print('task {} : 完成'.format(n))
return n / 10 def done(fn):
if fn.cancelled():
print('done {}:取消'.format(fn.arg))
elif fn.done():
error = fn.exception()
if error:
print('done {} : 错误返回 : {}'.format(fn.arg, error))
else:
result = fn.result()
print('done {} : 正常返回 : {}'.format(fn.arg, result)) if __name__ == '__main__':
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main : 开始')
f = ex.submit(task, 5)
f.arg = 5
f.add_done_callback(done)
result = f.result()

futures_future_callback.py

运行效果

[root@ mnt]# python3 futures_future_callback.py
main : 开始
task : 睡眠
task : 完成
done : 正常返回 : 0.5

 5、Future任务取消之futures.cancel()

#!/usr/bin/env python
# -*- coding: utf-8 -*- from concurrent import futures
import time def task(n):
print('task {} : 睡眠'.format(n))
time.sleep(0.5)
print('task {} : 完成'.format(n))
return n / 10 def done(fn):
if fn.cancelled():
print('done {}:取消'.format(fn.arg))
elif fn.done():
error = fn.exception()
if error:
print('done {} : 错误返回 : {}'.format(fn.arg, error))
else:
result = fn.result()
print('done {} : 正常返回 : {}'.format(fn.arg, result)) if __name__ == '__main__':
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main : 开始')
tasks = [] for i in range(10, 0, -1):
print('main: submitting {}'.format(i))
f = ex.submit(task, i)
f.arg = i
f.add_done_callback(done)
tasks.append((i, f)) for i, task_obj in reversed(tasks):
if not task_obj.cancel():
print('main: 不能取消{}'.format(i)) ex.shutdown()

futures_future_callback_cancel.py

运行效果

[root@mnt]# python3 futures_future_callback_cancel.py
main : 开始
main: submitting
task : 睡眠
main: submitting
task : 睡眠
main: submitting
main: submitting
main: submitting
main: submitting
main: submitting
main: submitting
main: submitting
main: submitting
done :取消
done :取消
done :取消
done :取消
done :取消
done :取消
done :取消
done :取消
main: 不能取消9
main: 不能取消10
task : 完成
done : 正常返回 : 1.0
task : 完成
done : 正常返回 : 0.9

 6、Future异常的处理

#!/usr/bin/env python
# -*- coding: utf-8 -*- from concurrent import futures def task(n):
print('{} : 开始'.format(n))
raise ValueError('这个值不太好 {}'.format(n)) ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: 开始...') f = ex.submit(task, 5) error = f.exception()
print('main: error:{}'.format(error)) try:
result = f.result()
except ValueError as e:
print('访问结果值的异常 {}'.format(e))

futures_future_exception

运行效果

[root@mnt]# python3 futures_future_exception.py
main: 开始...
: 开始
main: error:这个值不太好
访问结果值的异常 这个值不太好

  7、Future上下文管理即利用with打开futures.ThreadPoolExecutor()

#!/usr/bin/env python
# -*- coding: utf-8 -*- from concurrent import futures def task(n):
print(n) with futures.ThreadPoolExecutor(max_workers=2) as ex:
print('main: 开始')
ex.submit(task, 1)
ex.submit(task, 2)
ex.submit(task, 3)
ex.submit(task, 4)
print('main: 结束')

futures_context_manager.py

运行效果

[root@ mnt]# python3 futures_context_manager.py
main: 开始 main: 结束

 8、基于进程池使用map()

#!/usr/bin/env python
# -*- coding: utf-8 -*- from concurrent import futures
import os def task(n):
return (n, os.getpid()) if __name__ == '__main__':
ex = futures.ProcessPoolExecutor(max_workers=2)
results = ex.map(task, range(50, 0, -1))
for n, pid in results:
print('task {} in 进程id {}'.format(n, pid))

futures_process_pool_map.py

运行效果

[root@ mnt]# python3 futures_process_pool_map.py
task in 进程id
task in 进程id
task in 进程id
task in 进程id
task in 进程id

9、基于进程池异常处理

#!/usr/bin/env python
# -*- coding: utf-8 -*- from concurrent import futures
import os
import signal def task(n):
return (n, os.getpid()) if __name__ == '__main__':
with futures.ProcessPoolExecutor(max_workers=2) as ex:
print('获取工作进程的id')
f1 = ex.submit(os.getpid)
pid1 = f1.result() print('结束进程 {}'.format(pid1))
os.kill(pid1, signal.SIGHUP) print('提交其它进程')
f2 = ex.submit(os.getpid)
try:
pid2 = f2.result()
except futures.process.BrokenProcessPool as e:
print('不能开始新的任务:{}'.format(e))

futures_process_pool_broken.py

运行效果

[root@ mnt]# python3 futures_process_pool_broken.py
获取工作进程的id
结束进程
提交其它进程
不能开始新的任务:A process in the process pool was terminated abruptly while the future was running or pending.

最新文章

  1. 【解决】查询无法完成,因为其包含的查找列数已超过管理员强制实施的查找列阈值。Error code=0x80070093; Error source=Groove
  2. thinkphp表单自动验证
  3. Java NIO原理及实例
  4. poj 3176 Cow Bowling(dp基础)
  5. 在Android手机上获取其它应用的包名及版本
  6. Android:主题(Theme)
  7. YC(Y Combinator)斯坦福大学《如何创业》课程要点记录(粗糙)
  8. webpack require.ensure 按需加载
  9. 构造函数constructor 与析构函数destructor(五)
  10. vue的无缝滚动插件vue-seamless-scroll的安装与使用
  11. 第八章 SQL高级处理 8-1 窗口函数
  12. LoadRunner Controller 常见用法
  13. 单链表LRU
  14. MySQL-&gt;索引的维护[20180504]
  15. element-ui多层嵌套表格数据删除
  16. Linux+Python高端运维班第六周作业
  17. Visual studio C++ MFC之点击按钮(菜单栏)生成新窗口
  18. spring学习(3)
  19. java基础—代理(proxy)
  20. POJ 2096 (dp求期望)

热门文章

  1. springboot整合httpClient
  2. TP5实现自定义抛出异常消息(关闭debug)
  3. 终身机器学习(Lifelong Machine Learning)综述
  4. 高性能MySQL3_笔记1_Mysql的架构与历史
  5. OAuth授权看这篇就够了
  6. 01满包加记录最小路劲 L3-001. 凑零钱
  7. Java实现的基础数据结构
  8. squoosh
  9. 如何使用Navicat 创建一个SqlServer定时任务
  10. pycharm2017.3.3永久激活(转载)