一、多进程

1.1 多进程的概念

  由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

  multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

但在使用这些共享API的时候,我们要注意以下几点:

  a、在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。

  b、Windows系统下,需要注意的是要想启动一个子进程,必须加上 if __name__ == "__main__",进程相关的要写在这句下面。

1.2 创建进程的两种方式

直接创建:

 from multiprocessing import Process
import time
def f(name):
time.sleep(1)
print('hello', name,time.ctime()) if __name__ == '__main__':
p_list=[]
for i in range(3):
p = Process(target=f, args=('alvin',))
p_list.append(p)
p.start()
for i in p_list:
p.join()
print('end')

Demo1

类式调用:

 from multiprocessing import Process
import time class MyProcess(Process):
def __init__(self):
super(MyProcess, self).__init__()
#self.name = name def run(self):
time.sleep(1)
print ('hello', self.name,time.ctime()) if __name__ == '__main__':
p_list=[]
for i in range(3):
p = MyProcess()
p.start()
p_list.append(p) for p in p_list:
p.join() print('end')

Demo2

二、Process类

2.1 构造方法

   Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

2.2 实例方法

   is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

2.3 属性

    authkey

  daemon:和线程的setDeamon功能一样

  exitcode(进程在运行时为None、如果为–N,表示被信号N结束)

  name:进程名字。

  pid:进程号。

 import time
from multiprocessing import Process def foo(i):
time.sleep(1)
print (p.is_alive(),i,p.pid)
time.sleep(1) if __name__ == '__main__':
p_list=[]
for i in range(10):
p = Process(target=foo, args=(i,))
#p.daemon=True
p_list.append(p) for p in p_list:
p.start()
# for p in p_list:
# p.join() print('main process end')

Demo

三、进程间通信

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

3.1 Queues 使用方法跟threading里的queue类似 --> 将q作为参数传递给子进程。

 from multiprocessing import Process, Queue

 def f(q,n):
q.put([42, n, 'hello']) if __name__ == '__main__':
q = Queue()
p_list=[]
for i in range(3):
p = Process(target=f, args=(q,i))
p_list.append(p)
p.start()
print(q.get())
print(q.get())
print(q.get())
for i in p_list:
i.join()

Demo1

3.2 Pipes --> 通过管道Pipe实现。

 from multiprocessing import Process, Pipe

 def f(conn):
conn.send([42, None, 'hello'])
conn.close() if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()

Demo2

3.3 数据共享(Manager)

Manager()返回的管理器对象控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。

 from multiprocessing import Process, Manager

 def f(d, l,n):
d[n] = ''
d[''] = 2
d[0.25] = None
l.append(n)
print(l) if __name__ == '__main__':
with Manager() as manager:
d = manager.dict() l = manager.list(range(5))
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l,i))
p.start()
p_list.append(p)
for res in p_list:
res.join() print(d)
print(l)

Demo

3.4 进程同步

Without using the lock output from the different processes is liable to get all mixed up.  如果不使用来自不同进程的锁定输出,则可能会混淆不清。

 from multiprocessing import Process, Lock

 def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release() if __name__ == '__main__':
lock = Lock() for num in range(10):
Process(target=f, args=(lock, num)).start()

3.5 进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply
  • apply_async
 from  multiprocessing import Process,Pool
import time def Foo(i):
time.sleep(2)
return i+100 def Bar(arg):
print('-->exec done:',arg) pool = Pool(5) for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar)
#pool.apply(func=Foo, args=(i,)) print('end')
pool.close()
pool.join()

四、协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

协程的好处:

  • 无需线程上下文切换的开销
  • 无需原子操作锁定及同步的开销
  • "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

4.1 使用yield实现协程操作

 1 import time 
2 import queue
3 def consumer(name):
print("--->starting eating baozi...")
while True:
new_baozi = yield
print("[%s] is eating baozi %s" % (name,new_baozi))
#time.sleep(1) def producer(): r = con.__next__()
r = con2.__next__()
n = 0
while n < 5:
n +=1
con.send(n)
con2.send(n)
print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) if __name__ == '__main__':
con = consumer("c1")    # 创建生成器对象
con2 = consumer("c2")    # 创建生成器对象
p = producer()        # 执行producer函数

协程标准定义,即符合什么条件就能称之为协程:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 一个协程遇到IO操作自动切换到其它协程

基于上面这4点定义,我们刚才用yield实现的程并不能算是合格的线程,因为它有一点功能没实现。

4.2 Greenlet

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator

 # -*- coding:utf-8 -*-

 from greenlet import greenlet

 def test1():
print(12)
gr2.switch()
print(34)
gr2.switch() def test2():
print(56)
gr1.switch()
print(78) gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

感觉确实用着比generator还简单了,但好像还没有解决一个问题,就是遇到IO操作,自动切换,并不对。

4.3 Gevent

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

 import gevent

 def func1():
print('\033[31;1m李闯在跟海涛搞...\033[0m')
gevent.sleep(2)
print('\033[31;1m李闯又回去跟继续跟海涛搞...\033[0m') def func2():
print('\033[32;1m李闯切换到了跟海龙搞...\033[0m')
gevent.sleep(1)
print('\033[32;1m李闯搞完了海涛,回来继续跟海龙搞...\033[0m') gevent.joinall([
gevent.spawn(func1),
gevent.spawn(func2),
#gevent.spawn(func3),
])

输出:

李闯在跟海涛搞...
李闯切换到了跟海龙搞...
李闯搞完了海涛,回来继续跟海龙搞...
李闯又回去跟继续跟海涛搞...

五、补充

5.1 同步与异步的性能区别

 import gevent

 def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(0.5)
print('Task %s done' % pid) def synchronous():
for i in range(1,10):
task(i) def asynchronous():
threads = [gevent.spawn(task, i) for i in range(10)]
gevent.joinall(threads) print('Synchronous:')
synchronous() print('Asynchronous:')
asynchronous()

上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。

5.2 遇到IO阻塞时会自动切换任务(gevent 库中的 monkey 方法)--> 补丁

  ------->能够最大程度监听IO阻塞,提高效率。

 from gevent import monkey
monkey.patch_all() import gevent
from urllib.request import urlopen def f(url):
print('GET: %s' % url)
resp = urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])

最新文章

  1. Zabbix low-level discovery
  2. 一篇不错的讲解Java异常的文章(转载)原作者已没法考证
  3. JS - Cookie: getCookie, setCookie
  4. ok6410 android driver(8)
  5. NABCD模式
  6. iOS机器学习-TensorFlow
  7. WebService中控制字符的处理
  8. 【转】spring3 MVC实战,手工搭建Spring3项目demo
  9. 2015年10月15日学习html基础笔记
  10. 在Windows Server 2012服务器上安装可靠多播协议
  11. GitCam一款Gif动画制作软件
  12. Hadoop(八)Java程序访问HDFS集群中数据块与查看文件系统
  13. SpringContextUtil 的配置和调用
  14. Spring Boot 参数校验
  15. P1494 [国家集训队]小Z的袜子
  16. Linux命令行参数前加--,-和不加杠
  17. django之模型层(model)--多表相关操作(图书管理小练习)
  18. BZOJ3425[POI2013]Polarization——DP+bitset+分块
  19. 4.前端注册表单验证 &amp;&amp; 表单回填
  20. 如何在ubuntu安装phpstorm

热门文章

  1. linux 编译安装php7
  2. HDU 5412——CRB and Queries——————【线段树套Treap(并没有AC)】
  3. intellijidea课程 intellijidea神器使用技巧 3-3 postfix
  4. 写英文bug的经验总结
  5. Errors while uninstall the reporting extensions
  6. .Net CIL
  7. win10 安装mysql zip 压缩包版
  8. myVision云服务商业数据分析解决方案
  9. Altium_Designer-怎么将“原理图的更改”更新到“pcb图”?
  10. SQL语句关于时间的查询小心得,希望大家给点意见