引出

首先需要了解的是threadpool 的用途,他更适合于用到一些大量的短任务合集,而非一些时间长的任务,换句话说,适合大量的CPU密集型短任务,那些消耗时间较长的IO密集型长任务适合用协程去解决。

目前,python 标准库(特指python2.X)中的threadpool模块是在 multiprocessing.pool.threadpool,或者multiprocessing.dummy.ThreadPool(dummy模块是针对threading 多线程的进一步封装)。该模块有个缺点就是在所有线程执行完之前无法强制退出。实现原理大同小异:实例化pool的时候会创建指定数目的线程,把task 传给一个task-queue,线程会读取task-queue 的task,没有就阻塞,读取到后就执行,并将结果交给一个result-queue。

除了标准库中的threadpool,还有一些使用比较多的threadpool,以下展开。

pip 中的 ThreadPool

安装简单:pip install threadpool
使用如下:

1
2
3
4
pool = ThreadPool(poolsize)   # 定义线程池,指定线程数量
requests = makeRequests(some_callable, list_of_args, callback) # 调用makeRequests创建了要开启多线程的函数,以及函数相关参数和回调函数
[pool.putRequest(req) for req in requests] # 所有要运行多线程的请求扔进线程池
pool.wait() # 等待所有线程完成后退出

原理类似,源码解读可以参考python——有一种线程池叫做自己写的线程池 ,该博客还给出了对其的一些优化。

自己定制 threadpool

根据需要的功能定制适合自己的threadpool 也是一种常见的手段,常用的功能比如:是否需要返回线程执行后的返回值,线程执行完之后销毁还是阻塞等等。以下为自己经常用的的一个比较简洁的threadpool,感谢@kaito-kidd提供,源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# coding: utf8

"""
线程池,用于高效执行某些任务。
""" import Queue
import threading class Task(threading.Thread): """ 任务 """ def __init__(self, num, input_queue, output_queue, error_queue):
super(Task, self).__init__()
self.thread_name = "thread-%s" % num
self.input_queue = input_queue
self.output_queue = output_queue
self.error_queue = error_queue
self.deamon = True def run(self):
"""run
"""
while 1:
try:
func, args = self.input_queue.get(block=False)
except Queue.Empty:
print "%s finished!" % self.thread_name
break
try:
result = func(*args)
except Exception as exc:
self.error_queue.put((func.func_name, args, str(exc)))
else:
self.output_queue.put(result) class Pool(object): """ 线程池 """ def __init__(self, size):
self.input_queue = Queue.Queue()
self.output_queue = Queue.Queue()
self.error_queue = Queue.Queue()
self.tasks = [
Task(i, self.input_queue, self.output_queue,
self.error_queue) for i in range(size)
] def add_task(self, func, args):
"""添加单个任务
"""
if not isinstance(args, tuple):
raise TypeError("args must be tuple type!")
self.input_queue.put((func, args)) def add_tasks(self, tasks):
"""批量添加任务
"""
if not isinstance(tasks, list):
raise TypeError("tasks must be list type!")
for func, args in tasks:
self.add_task(func, args) def get_results(self):
"""获取执行结果集
"""
while not self.output_queue.empty():
print "Result: ", self.output_queue.get() def get_errors(self):
"""获取执行失败的结果集
"""
while not self.error_queue.empty():
func, args, error_info = self.error_queue.get()
print "Error: func: %s, args : %s, error_info : %s" \
% (func.func_name, args, error_info) def run(self):
"""执行
"""
for task in self.tasks:
task.start()
for task in self.tasks:
task.join() def test(i):
"""test """
result = i * 10
return result def main():
""" main """
pool = Pool(size=5)
pool.add_tasks([(test, (i,)) for i in range(100)])
pool.run() if __name__ == "__main__":
main()

阅读原文

最新文章

  1. kaggle数据挖掘竞赛初步--Titanic<原始数据分析&缺失值处理>
  2. python程序性能分析
  3. Nginx支持多站点配置小结
  4. 运用Fluxion高效破解WiFi密码
  5. C# ?(问号)的三个用处(转载)
  6. c实现的iOS http下载类。支持自己设定http 头(比如cookie等)
  7. CentOS 安装 Chrome
  8. Apache Spark Mesos
  9. ARC和非ARC文件混编
  10. XHTML 基础(含部分css)
  11. php和node高并发 大数据量怎么处理
  12. mpi中的广播
  13. iOS进阶之TCP代理鉴权过程
  14. YARN集群的mapreduce测试(五)
  15. 三问助你Fundebug
  16. CSS查漏补缺【未完】
  17. swagger如何测试List类型参数
  18. Netty沾包和拆包
  19. JUnit4 单元测试
  20. MyBatis基础入门《三》Select查询集合

热门文章

  1. mysql设计规范和原则
  2. MySQL 自带的4个系统数据库的说明
  3. Spring实战(第4版).pdf - 百度云资源
  4. Python使用Thrift
  5. Android PKMS服务
  6. httprunner学习10-测试报告ExtentReport
  7. hive中时间操作(一)
  8. spark-shell操作hive
  9. Spark SQL中的Catalyst 的工作机制
  10. [Unit test] jasmine createSpyObj