为了解决阻塞(如I/O)问题,我们需要对程序进行并发设计。

本文将通过将线程和队列 结合在一起,轻松地在 Python 中完成线程编程,创建一些简单但有效的线程使用模式。

一、使用线程

先看一个线程不多的例子,不存在阻塞,很简单:

import threading
import datetime class MyThread(threading.Thread):
def run(self):
now = datetime.datetime.now()
print("{} says Hello World at time: {}".format(self.getName(), now)) for i in range(2):
t = MyThread()
t.start()

    代码解读

1. 两个线程都输出了 Hello World 语句,并都带有日期戳。

2. 两个导入语句:一个导入了日期时间模块,另一个导入线程模块。

3. 类 MyThread 继承自 threading.Thread,也正因为如此,您需要定义一个 run 方法,以此执行您在该线程中要运行的代码。

4. run 方法中的self.getName() 是一个用于确定该线程名称的方法。

5. 最后三行代码实际地调用该类,并启动线程。如果注意的话,那么会发现实际启动线程的是 t.start()

二、使用线程队列

如前所述,当多个线程需要共享数据或者资源的时候,可能会使得线程的使用变得复杂。线程模块提供了许多同步原语,包括信号量、条件变量、事件和锁。当这些 选项存在时,最佳实践是转而关注于使用队列。相比较而言,队列更容易处理,并且可以使得线程编程更加安全,因为它们能够有效地传送单个线程对资源的所有访 问,并支持更加清晰的、可读性更强的设计模式。

在下一个示例中,我们的目的是:获取网站的 URL,并显示页面的前 300 个字节。

先看看串行方式或者依次执行实现的代码:

from urllib import request
import time hosts = ["http://yahoo.com", "http://amazon.com", "http://ibm.com", "http://apple.com"] start = time.time()
#grabs urls of hosts and prints first 1024 bytes of page
for host in hosts:
url = request.urlopen(host)
print(url.read(200)) print("Elapsed Time: %s" % (time.time() - start))

    代码解读

1. urllib 模块减少了获取 Web 页面的复杂程度。两次 time.time() 用于计算程序运行时间。

2. 这个程序的执行速度是 13.7 秒,这个结果并不算太好,也不算太糟。

3. 但如果需要检索数百个 Web 页面,那么按照这个平均值,总时间需要花费大约 1000 秒的时间。如果需要检索更多页面呢?

下面给出线程化版本:

import queue
import threading
from urllib import request
import time hosts = ["http://yahoo.com", "http://amazon.com", "http://ibm.com", "http://apple.com"] in_queue = queue.Queue() class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, in_queue):
threading.Thread.__init__(self)
self.in_queue = in_queue def run(self):
while True:
#grabs host from queue
host = self.in_queue.get() #grabs urls of hosts and prints first 1024 bytes of page
url = request.urlopen(host)
print(url.read(200)) #signals to queue job is done
self.in_queue.task_done() start = time.time() def main(): #spawn a pool of threads, and pass them queue instance
for i in range(4):
t = ThreadUrl(in_queue)
t.setDaemon(True)
t.start() #populate queue with data
for host in hosts:
in_queue.put(host) #wait on the queue until everything has been processed
in_queue.join() main()
print("Elapsed Time: %s" % (time.time() - start))

    代码解读

1. 与第一个线程示例相比,它并没有复杂多少,因为使用了队列模块。

2. 创建一个 queue.Queue() 的实例,然后使用数据对它进行填充。

3. 将经过填充数据的实例传递给线程类,后者是通过继承 threading.Thread 的方式创建的。

4. 生成守护线程池。

5. 每次从队列中取出一个项目,并使用该线程中的数据和 run 方法以执行相应的工作。

6. 在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号。

7. 对队列执行 join 操作,实际上意味着等到队列为空,再退出主程序。

在使用这个模式时需要注意一点:通过将守护线程设置为 true,将允许主线程或者程序仅在守护线程处于活动状态时才能够退出。这种方式创建了一种简单的方式以控制程序流程,因为在退出之前,您可以对队列执行 join 操作、或者等到队列为空。

join()保持阻塞状态,直到处理了队列中的所有项目为止。在将一个项目添加到该队列时,未完成的任务的总数就会增加。当使用者线程调用 task_done() 以表示检索了该项目、并完成了所有的工作时,那么未完成的任务的总数就会减少。当未完成的任务的总数减少到零时,join() 就会结束阻塞状态。

三、使用多个队列

下一个示例有两个队列。其中一个队列的各线程获取的完整 Web 页面,然后将结果放置到第二个队列中。然后,对加入到第二个队列中的另一个线程池进行设置,然后对 Web 页面执行相应的处理。

提取所访问的每个页面的 title 标记,并将其打印输出。

import queue
import threading
from urllib import request
import time
from bs4 import BeautifulSoup hosts = ["http://yahoo.com", "http://amazon.com", "http://ibm.com", "http://apple.com"] in_queue = queue.Queue()
out_queue = queue.Queue() class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, in_queue, out_queue):
threading.Thread.__init__(self)
self.in_queue = in_queue
self.out_queue = out_queue def run(self):
while True:
#grabs host from queue
host = self.in_queue.get() #grabs urls of hosts and then grabs chunk of webpage
url = request.urlopen(host)
chunk = url.read() #place chunk into out queue
self.out_queue.put(chunk) #signals to queue job is done
self.in_queue.task_done() class DatamineThread(threading.Thread):
"""Threaded Url Grab"""
def __init__(self, out_queue):
threading.Thread.__init__(self)
self.out_queue = out_queue def run(self):
while True:
#grabs host from queue
chunk = self.out_queue.get() #parse the chunk
soup = BeautifulSoup(chunk)
print(soup.findAll(['title'])) #signals to queue job is done
self.out_queue.task_done() start = time.time() def main(): #spawn a pool of threads, and pass them queue instance
for i in range(4):
t = ThreadUrl(in_queue, out_queue)
t.setDaemon(True)
t.start() #populate queue with data
for host in hosts:
in_queue.put(host) for i in range(4):
dt = DatamineThread(out_queue)
dt.setDaemon(True)
dt.start() #wait on the queue until everything has been processed
in_queue.join()
out_queue.join() main()
print("Elapsed Time: %s" % (time.time() - start))

    代码解读

1. 我们添加了另一个队列实例,然后将该队列传递给第一个线程池类 ThreadURL

2. 对于另一个线程池类 DatamineThread, 几乎复制了完全相同的结构。

3. 在这个类的 run 方法中,从队列中的各个线程获取 Web 页面、文本块,然后使用 Beautiful Soup 处理这个文本块。

4. 使用 Beautiful Soup 提取每个页面的 title 标记、并将其打印输出。

5. 可以很容易地将这个示例推广到一些更有价值的应用场景,因为您掌握了基本搜索引擎或者数据挖掘工具的核心内容。

6. 一种思想是使用 Beautiful Soup 从每个页面中提取链接,然后按照它们进行导航。

最新文章

  1. setTimeout和setInterval定时器使用详解测试
  2. Leetcode Minimum Window Substring
  3. 利用ajax向jsp传输数据
  4. google map 计算地图面积方法
  5. C语言实现简单线程池(转-Newerth)
  6. 多窗体之间方法调用 z
  7. jenkins邮件设置
  8. angularjs知识点
  9. Spring Security Oauth2 示例
  10. malloc(0)
  11. HTML DOM 学习笔记
  12. 在django中,redirect如何传递message。
  13. 正则表达式识别js跳转的链接
  14. [Python]查询mysql导出结果至Excel并发送邮件
  15. 26. The Greenhouse Effect and Its Consequences 温室效应及其后果
  16. UVA 548(二叉树重建与遍历)
  17. javascript DOM相关语法
  18. Hyperledger Fabric开发
  19. Linux --防火墙(一)
  20. 2017-10-17 NOIP模拟赛2

热门文章

  1. socat 的神奇使用方式
  2. Vue2学习笔记:数据交互vue-resource
  3. MySQL案例09:Last_IO_Error: Got fatal error 1236 from master when reading data from binary log
  4. REST Framework组件的解析源码
  5. procexp
  6. Linux系统设置运行级别
  7. 铁乐学python_day21_面向对象编程3
  8. C++实现一个Vector3空间向量类(转)
  9. C++用法总结
  10. __iter__的有无