11.1 The GIL in Python

# coding=utf-8
# GIL: global interpreter lock (CPython)
# A Python thread corresponds to one C-level thread.
# The GIL ensures that only one thread executes bytecode on one CPU at any given moment,
# so multiple threads cannot be mapped onto multiple CPUs in parallel.
# The GIL is released based on the number of bytecode instructions executed and on time slices,
# and it is released proactively when an I/O operation is encountered.
import dis

def add(a):
    a = a + 1
    return a

print(dis.dis(add))

total = 0

def add():
    # 1. do something 1
    # 2. I/O operation
    # 3. do something 3
    global total
    for i in range(1000000):
        total += 1

def desc():
    global total
    for i in range(1000000):
        total -= 1

import threading

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)  # multithreading suits I/O-heavy workloads well

How many bytecode instructions run before the GIL is handed over is not fixed, so total above races between the two threads: its final value is usually not 0 and changes from run to run.
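As a side note on when the GIL is handed over: in CPython 3 the switch is driven by a time interval rather than a fixed number of bytecode instructions, and that interval can be inspected and adjusted. A minimal hedged sketch:

import sys

# CPython 3 asks the running thread to release the GIL after a time slice
# (default 5 ms), not after a fixed count of bytecode instructions.
print(sys.getswitchinterval())   # 0.005 by default
sys.setswitchinterval(0.01)      # a longer slice means fewer thread switches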

11.2 Multithreaded programming in Python

A thread is the smallest unit of execution that the operating system schedules.

# coding=utf-8
# __author__ = 'lewen'
import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")

def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")

if __name__ == "__main__":
    # start two threads from the main thread
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))
    # thread1.setDaemon(True)
    # thread2.setDaemon(True)  # daemon threads are killed when the main thread exits
    start_time = time.time()
    thread1.start()
    thread2.start()
    thread1.join()  # wait for the thread to finish before moving on
    thread2.join()
    print("last time: {}".format(time.time() - start_time))
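The commented-out setDaemon calls above refer to daemon threads; a small hedged sketch of that behaviour, using the modern daemon attribute (setDaemon() is the older spelling of the same thing):

import threading
import time

def worker():
    time.sleep(10)
    print("never printed")

t = threading.Thread(target=worker)
t.daemon = True   # same effect as t.setDaemon(True)
t.start()
print("main thread exits, the daemon worker is killed with it")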

Implementing multithreading by subclassing Thread

class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        # in Python 2 the class name has to be written inside super();
        # pass name through to the parent class
        super().__init__(name=name)

    def run(self):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")

class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")

if __name__ == "__main__":
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print("last time: {}".format(time.time() - start_time))

11.3 Inter-thread communication: Queue

Shared variables

# Inter-thread communication

import time
import threading
from threading import Condition
from chapter11 import variables   # the shared variables live in a separate module

detail_url_list = []

# 1. After producing 10 urls the producer waits, so detail_url_list never holds more than ten urls.
# 2. When the url list is empty, the consumer pauses.

def get_detail_html(lock):
    # crawl the article detail pages
    detail_url_list = variables.detail_url_list  # the shared variable is kept in its own module
    while True:
        if len(detail_url_list):
            url = detail_url_list.pop()
            # for url in detail_url_list:
            print("get detail html started")
            time.sleep(2)
            print("get detail html end")
        else:
            time.sleep(1)

def get_detail_url(lock):
    # crawl the article list pages
    detail_url_list = variables.detail_url_list
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            if len(detail_url_list) >= 10:
                time.sleep(1)
            else:
                detail_url_list.append("http://projectsedu.com/{id}".format(id=i))
        print("get detail url end")

# 1. Inter-thread communication: shared variables
if __name__ == "__main__":
    lock = threading.Lock()
    thread_detail_url = threading.Thread(target=get_detail_url, args=(lock,))
    thread_detail_url.start()
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(lock,))
        html_thread.start()
    # thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()  # when the main thread exits, daemon child threads are killed
    print("last time: {}".format(time.time() - start_time))

# Communicating through a shared variable

from queue import Queue

import time
import threading

def get_detail_html(queue):
    # crawl the article detail pages
    while True:
        url = queue.get()  # blocks here until an item is available
        print(url)
        # Queue is implemented on top of collections.deque
        print("get detail html started")
        time.sleep(1)
        print("get detail html end")

def get_detail_url(queue):
    # crawl the article list pages
    while True:
        print("get detail url started")
        time.sleep(2)
        for i in range(9):
            queue.put("http://www.baidu.com/s?wd={id}".format(id=i))
        print(queue.qsize())
        print("get detail url end")

# 1. Inter-thread communication: Queue
if __name__ == "__main__":
    detail_url_queue = Queue(maxsize=1000)
    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    thread_detail_url.start()
    html_thread_list = []
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
        html_thread.start()
        html_thread_list.append(html_thread)
    start_time = time.time()
    for h in html_thread_list:
        h.join()
    # detail_url_queue.join()  # join() here would only return after every item had
    #                          # been matched by a detail_url_queue.task_done() call
    print("last time: {}".format(time.time() - start_time))

# Inter-thread synchronization via Queue
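The commented-out detail_url_queue.join() above only returns once every item taken with get() has been acknowledged by task_done(); a minimal hedged sketch of that protocol (the worker and item names are illustrative):

from queue import Queue
import threading
import time

q = Queue()

def worker():
    while True:
        item = q.get()        # blocks while the queue is empty
        time.sleep(0.1)
        print("handled", item)
        q.task_done()         # mark this item as fully processed

threading.Thread(target=worker, daemon=True).start()
for i in range(5):
    q.put(i)
q.join()  # returns only after task_done() has been called once per put()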

11.4 Thread synchronization (Lock, RLock, Semaphore, Condition)

from threading import Lock, RLock, Condition  # RLock is a reentrant lock

# A plain Lock cannot be acquired again by the same thread before it is released.
total = 0
lock = RLock()
# Within a single thread, acquire() may be called several times in a row,
# but the number of acquire() calls must match the number of release() calls.
# Separate threads still compete for the lock as usual.

def add():
    global lock
    global total
    for i in range(1000000):
        lock.acquire()
        lock.acquire()  # re-entering the lock inside the same thread
        total += 1
        lock.release()
        lock.release()

def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()

import threading

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)

# 1. Locks cost performance.
# 2. Locks can cause deadlock.
# A deadlock scenario:
"""
A(a, b):
    acquire(a)
    acquire(b)   # blocks here, deadlocked

B(b, a):
    acquire(b)   # each thread holds the resource the other one needs
    acquire(a)
"""

Lock, RLock
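Because a forgotten release(), or an exception raised between acquire() and release(), leaves the lock held forever, locks are usually taken through the with statement; a small hedged sketch of the same counter written in that idiom:

import threading

total = 0
lock = threading.Lock()

def add():
    global total
    for i in range(1000000):
        with lock:   # acquire() on entry, release() on exit, even if an exception is raised
            total += 1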

Using Condition, and a look at its source

import threading

class XiaoAi(threading.Thread):
    def __init__(self, lock):
        super().__init__(name="小爱")
        self.lock = lock

    def run(self):
        self.lock.acquire()
        print("{} : 在 ".format(self.name))
        self.lock.release()

        self.lock.acquire()
        print("{} : 好啊 ".format(self.name))
        self.lock.release()

class TianMao(threading.Thread):
    def __init__(self, lock):
        super().__init__(name="天猫精灵")
        self.lock = lock

    def run(self):
        self.lock.acquire()
        print("{} : 小爱同学 ".format(self.name))
        self.lock.release()

        self.lock.acquire()
        print("{} : 我们来对古诗吧 ".format(self.name))
        self.lock.release()

if __name__ == "__main__":
    lock = threading.Lock()
    xiaoai = XiaoAi(lock)
    tianmao = TianMao(lock)
    tianmao.start()
    xiaoai.start()

# --- output
天猫精灵 : 小爱同学
天猫精灵 : 我们来对古诗吧
小爱 : 在
小爱 : 好啊

Without using Condition

class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="小爱")
        self.cond = cond

    def run(self):
        with self.cond:  # the first (outer) lock
            self.cond.wait()
            print("{} : 在 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 好啊 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 君住长江尾 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 共饮长江水 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 此恨何时已 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 定不负相思意 ".format(self.name))
            self.cond.notify()

class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="天猫精灵")
        self.cond = cond

    def run(self):
        with self.cond:  # the first (outer) lock
            print("{} : 小爱同学 ".format(self.name))
            self.cond.notify()  # notify the waiting thread
            self.cond.wait()    # wait to be notified

            print("{} : 我们来对古诗吧 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我住长江头 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 日日思君不见君 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 此水几时休 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 只愿君心似我心 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

if __name__ == "__main__":
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)
    # wait() and notify() can only be called after entering "with cond".
    # Condition has two layers of locks: the underlying lock taken by "with cond"
    # is released when a thread calls wait(); wait() then allocates a second lock,
    # puts it into the Condition's waiting queue, and blocks until notify() wakes it.
    xiaoai.start()
    tianmao.start()
    # The start order matters: XiaoAi must start first and already be waiting before
    # TianMao calls notify(); if TianMao started first, its notify() would be sent
    # before anyone was waiting and XiaoAi would then block forever on wait().

# --- output
天猫精灵 : 小爱同学
小爱 : 在
天猫精灵 : 我们来对古诗吧
小爱 : 好啊
天猫精灵 : 我住长江头
小爱 : 君住长江尾
天猫精灵 : 日日思君不见君
小爱 : 共饮长江水
天猫精灵 : 此水几时休
小爱 : 此恨何时已
天猫精灵 : 只愿君心似我心
小爱 : 定不负相思意

wait() and notify() may only be called after entering "with cond".
Condition holds two layers of locks: the underlying lock (acquired by "with cond") is released when a thread calls wait();
each wait() call then allocates a new lock, places it in the Condition's waiting queue, and blocks until notify() wakes that waiter up.
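Condition also supports wait_for(), which re-checks a predicate after every wakeup; a hedged sketch of the bounded producer/consumer described in 11.3 (buffer and MAXSIZE are illustrative names, not from the course code):

import threading
from collections import deque

buffer, MAXSIZE = deque(), 10
cond = threading.Condition()

def produce(item):
    with cond:
        cond.wait_for(lambda: len(buffer) < MAXSIZE)  # producer pauses once 10 items are queued
        buffer.append(item)
        cond.notify_all()

def consume():
    with cond:
        cond.wait_for(lambda: len(buffer) > 0)        # consumer pauses while the buffer is empty
        item = buffer.popleft()
        cond.notify_all()
        return item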

Using Semaphore

# Semaphore is a lock used to control how many threads may enter at the same time.
# Example: file access, where writing is usually restricted to a single thread while
# several threads may read at once. Here it throttles a crawler.
import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
            html_thread.start()

if __name__ == "__main__":
    sem = threading.Semaphore(3)  # at most three spider threads run at the same time
    url_producer = UrlProducer(sem)
    url_producer.start()
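Semaphore can also be taken with the with statement, which pairs each acquire() with a release() automatically; a hedged sketch in which every download throttles itself instead of being released by a separate spider thread:

import threading
import time

sem = threading.Semaphore(3)   # at most three downloads in flight at once

def download(url):
    with sem:                  # acquire on entry, release on exit
        time.sleep(2)
        print("got", url)

for i in range(20):
    threading.Thread(target=download, args=("https://baidu.com/{}".format(i),)).start()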

11.5 Thread pools with concurrent.futures

# Thread pools: why use one?
# The main thread can query the state of a worker thread or task and read its return value.
# The main thread learns immediately when a task finishes.
# The futures interface keeps multithreading and multiprocessing code consistent.
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
import time

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
# submit() hands the callable to the pool and returns immediately.
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))

# done() reports whether a task has finished.
print(task1.done())
# print(task2.cancel())  # cancel() returns True on success; a task that is already
#                        # running (or about to run) cannot be cancelled
# time.sleep(3)
# print(task1.done())

# result() blocks until the task finishes and returns its value.
print(task1.result())

# ---------
urls = [3, 2, 4]
all_task = [executor.submit(get_html, (url)) for url in urls]  # batch submit
wait(all_task, return_when=FIRST_COMPLETED)  # block until the first task completes
print("main")

# Fetch the results of the tasks that have already succeeded:
# for future in as_completed(all_task):
#     data = future.result()
#     print("get {} page".format(data))

# executor.map() also yields the results, in the same order the arguments were submitted:
# for data in executor.map(get_html, urls):
#     print("get {} page".format(data))

# ---- output
False
get page 2 success
get page 3 success
3
get page 2 success
main
get page 3 success
get page 4 success

from concurrent.futures import Future
# Future: the "future object", the container that will eventually hold a task's result.
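Since the point of futures is that the main thread learns immediately when a task finishes, add_done_callback() is worth mentioning as well; a small hedged sketch (the function names are illustrative):

from concurrent.futures import ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    return times

def on_done(future):
    # invoked as soon as the task finishes, without blocking on result() in the main thread
    print("task finished, result: {}".format(future.result()))

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(get_html, 2).add_done_callback(on_done)
    executor.submit(get_html, 1).add_done_callback(on_done)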

11.6 Multiprocess programming: multiprocessing

# Multiprocess programming
# Use multiple processes for CPU-bound work and multiple threads for I/O-bound work,
# since switching processes costs more than switching threads.
# 1. For CPU-bound work, multiprocessing beats multithreading.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:    # last time is: 14.505059242248535
    # with ThreadPoolExecutor(3) as executor:   # last time is: 30.066641330718994
        all_task = [executor.submit(fib, (num)) for num in range(25, 40)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))
        print("last time is: {}".format(time.time() - start_time))

# 2. For I/O-bound work, multithreading beats multiprocessing.
def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    # with ThreadPoolExecutor(3) as executor:
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [1] * 30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))
        print("last time is: {}".format(time.time() - start_time))
import os
import time

# fork() is only available on Linux/Unix.
pid = os.fork()
print("lewen", pid)
if pid == 0:   # pid is 0 inside the child, which is a copy of the parent process
    print("子进程 %s,父进程 %s" % (os.getpid(), os.getppid()))
else:
    print("我是父进程:%s" % (pid))
time.sleep(2)

[root@doit ~]# python fork_test.py
('lewen', 16077)
我是父进程:16077
('lewen', 0)
子进程 16077,父进程 16076

import os
import time

# fork() is only available on Linux/Unix.
print("lewen")       # runs once, before the process is duplicated
pid = os.fork()
if pid == 0:   # pid is 0 inside the child
    print("子进程 %s,父进程 %s" % (os.getpid(), os.getppid()))
else:
    print("我是父进程:%s" % (pid))
time.sleep(2)

[root@doit ~]# python fork_test.py
lewen
我是父进程:16096
子进程 16096,父进程 16095

os.fork()

from concurrent.futures import ProcessPoolExecutor  # process pool built on top of multiprocessing; recommended
import multiprocessing  # multiprocess programming
import time

def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n

class MyProcess(multiprocessing.Process):
    def run(self):
        pass

if __name__ == "__main__":
    # progress = multiprocessing.Process(target=get_html, args=(2,))
    # print(progress.pid)
    # progress.start()
    # print(progress.pid)
    # progress.join()
    # print("main progress end")
    """
    None
    10796
    sub_progress success
    main progress end
    """

    # Using a process pool
    pool = multiprocessing.Pool(multiprocessing.cpu_count())

    # result = pool.apply_async(get_html, args=(3,))  # submit a task asynchronously
    #
    # # Wait for all tasks to finish.
    # pool.close()  # stop accepting new tasks; must be called before join()
    # pool.join()
    #
    # print(result.get())
    """
    sub_progress success
    3
    """

    # imap: results come back in the same order the arguments were submitted
    # for result in pool.imap(get_html, [1, 5, 3]):
    #     print("{} sleep success".format(result))
    """
    sub_progress success
    1 sleep success
    sub_progress success
    sub_progress success
    5 sleep success
    3 sleep success
    """

    # imap_unordered: whichever task finishes first is yielded first
    for result in pool.imap_unordered(get_html, [1, 5, 3]):
        print("{} sleep success".format(result))
    """
    sub_progress success
    1 sleep success
    sub_progress success
    3 sleep success
    sub_progress success
    5 sleep success
    """

11.7 Inter-process communication

1. multiprocessing.Queue

# Communicating through a shared global variable
# Shared global variables do not work across processes (each process gets its own copy of a),
# although they do work across threads.
import time
from multiprocessing import Process

def producer(a):
    a += 100
    time.sleep(2)

def consumer(a):
    time.sleep(2)
    print(a)

if __name__ == "__main__":
    a = 1
    my_producer = Process(target=producer, args=(a,))
    my_consumer = Process(target=consumer, args=(a,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

# --- output
1

Shared global variables cannot be used for communication between processes, although they do work between threads.

# multiprocessing.Queue cannot be used with a process Pool;
# communication between pool workers has to go through the queue from Manager().
import time
from multiprocessing import Process, Queue, Pool, Manager, Pipe

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Manager().Queue(10)
    pool = Pool(2)

    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))

    pool.close()
    pool.join()

# --- output
a

2. Communication between pool workers must use the queue from Manager()

# Inter-process communication through a Pipe.
# A Pipe performs better than a Queue (less locking overhead).
def producer(pipe):
    pipe.send("lewen")

def consumer(pipe):
    print(pipe.recv())

if __name__ == "__main__":
    receive_pipe, send_pipe = Pipe()
    # a Pipe can only connect two processes
    my_producer = Process(target=producer, args=(send_pipe,))
    my_consumer = Process(target=consumer, args=(receive_pipe,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

3. Inter-process communication through a Pipe

Sharing memory between processes

def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == "__main__":
    progress_dict = Manager().dict()
    # from queue import PriorityQueue  # priority queue: items come out ordered by priority, not insertion order

    first_progress = Process(target=add_data, args=(progress_dict, "lewen1", 22))
    second_progress = Process(target=add_data, args=(progress_dict, "lewen2", 23))

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict)

# --- output
{'lewen1': 22, 'lewen2': 23}
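Manager().dict() goes through a proxy server process; for plain numbers and arrays, multiprocessing also offers real shared memory via Value and Array. A hedged sketch (the type codes and variable names here are illustrative, not from the course code):

from multiprocessing import Process, Value, Array

def work(counter, arr):
    with counter.get_lock():       # Value carries its own lock for safe concurrent updates
        counter.value += 1
    arr[0] = 99

if __name__ == "__main__":
    counter = Value("i", 0)        # a shared C int
    arr = Array("i", [1, 2, 3])    # a shared C int array
    p1 = Process(target=work, args=(counter, arr))
    p2 = Process(target=work, args=(counter, arr))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print(counter.value, arr[:])   # -> 2 [99, 2, 3]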
