Python中的多进程与多线程(一)
一、背景
最近在Azkaban的测试工作中,需要在测试环境下模拟线上的调度场景进行稳定性测试。故而重操python旧业,通过python编写脚本来构造类似线上的调度场景。在脚本编写过程中,碰到这样一个需求:要在测试环境创建10000个作业流。
最开始的想法是在一个azkaban project下循环调用10000次create job接口(每个Flow只包含一个job)。由于azkaban它本身没有增加/删除作业流的接口,所有的作业流修改、增加、删除其实都是通过重新上传项目zip包实现的,相应地每次调猛犸前端的create job接口,实际上是在猛犸端对zip包的内容进行了重新的整合后再重新上传zip包到azkaban,整个过程可以拆解成如下过程:解压zip包获得zip包内容,变更zip包内的文件内容,重新打包zip包,上传到azkaban。因此,随着循环次数越往后,zip包包含的内容会越多,接口执行一次的时间就越长。实践发现,第一次调该接口的时间大致不到1秒,到循环1000次的时候接口调用一次的时间就达到了将近3秒。因此,如果指望一个循环10000次来构造该场景,显然要耗费巨大的时间。
在此背景下, 自然而然地就想到用多进程/多线程的方式来处理该问题。
二、“多任务”的操作系统基础
大家都知道,操作系统可以同时运行多个任务。比如你一边听音乐,一边聊IM,一边写博客等。现在的cpu大都是多核的,但即使是过去的单核cpu也是支持多任务并行执行。
单核cpu执行多任务的原理:操作系统交替轮流地执行各个任务。先让任务1执行0.01秒,然后切换到任务2执行0.01秒,再切换到任务3执行0.01秒...这样往复地执行下去。由于cpu的执行速度非常快,所以使用者的主观感受就是这些任务在并行地执行。
多核cpu执行多任务的原理:由于实际应用中,任务的数量往往远超过cpu的核数,所以操作系统实际上是把这些多任务轮流地调度到每个核心上执行。
对于操作系统来说,一个应用就是一个进程。比如打开一个浏览器,它是一个进程;打开一个记事本,它是一个进程。每个进程有它特定的进程号。他们共享系统的内存资源。进程是操作系统分配资源的最小单位。
而对于每一个进程而言,比如一个视频播放器,它必须同时播放视频和音频,就至少需要同时运行两个“子任务”,进程内的这些子任务就是通过线程来完成。线程是最小的执行单元。一个进程它可以包含多个线程,这些线程相互独立,同时又共享进程所拥有的资源。
三、Python多进程编程
1. multiprocessing
multiprocessing是Python提供的一个跨平台的多进程模块,通过它可以很方便地编写多进程程序,在不同的平台(Unix/Linux, Windows)都可以执行。
下面就是使用multiprocessing编写多进程程序的代码:
#!/usr/bin/python
# -*- coding: utf-8 -*
__author__ = 'zni.feng'
import sys
reload (sys)
sys.setdefaultencoding('utf-8') from multiprocessing import Process
import os
import time #子进程fun
def child_projcess_fun(name):
print 'Child process %s with processId %s starts.' % (name, os.getpid())
time.sleep(3)
print 'Child process %s with processId %s ends.' % (name, os.getpid()) if __name__ == "__main__":
print 'Parent processId is: %s.' % os.getpid()
p = Process(target = child_projcess_fun, args=('zni',))
print 'Process starts'
p.start() #开始进程
p.join() #等待子进程结束后再继续往下执行
print 'Process ends.'
程序的输出:
Parent processId is: 11076.
Process starts
Child process zni with processId 11077 starts.
Child process zni with processId 11077 ends.
Process ends.
[Finished in 3.1s]
2. Pool
某些情况下,我们希望批量创建多个子进程,或者给定子进程数的上限,避免无限地消耗系统的资源。通过Pool(进程池)的方式,就可以完成这项工作,下面是使用Pool的代码:
#!/usr/bin/python
# -*- coding: utf-8 -*
__author__ = 'zni.feng'
import sys
reload (sys)
sys.setdefaultencoding('utf-8') from multiprocessing import Pool
import os, time def child_process_test(name, sleep_time):
print 'Child process %s with processId %s starts.' % (name, os.getpid())
time.sleep(sleep_time)
print 'Child process %s with processId %s ends.' % (name, os.getpid()) if __name__ == "__main__":
print 'Parent processId is: %s.' % os.getpid()
p = Pool() #进程池默认大小是cpu的核数
#p = Pool(10) #生成一个容量为10的进程池,即最大同时执行10个子进程
for i in range(5):
p.apply_async(child_process_test, args=('zni_'+str(i), i+1,)) #p.apply_async向进程池提交目标请求 print 'Child processes are running.'
p.close()
p.join() #用来等待进程池中的所有子进程结束再向下执行代码,必须在p.close()或者p.terminate()之后执行
print 'All Processes end.'
程序的输出:
Parent processId is: 5050.
Child processes are running.
Child process zni_0 with processId 5052 starts.
Child process zni_1 with processId 5053 starts.
Child process zni_2 with processId 5054 starts.
Child process zni_3 with processId 5055 starts.
Child process zni_0 with processId 5052 ends.
Child process zni_4 with processId 5052 starts.
Child process zni_1 with processId 5053 ends.
Child process zni_2 with processId 5054 ends.
Child process zni_3 with processId 5055 ends.
Child process zni_4 with processId 5052 ends.
All Processes end.
[Finished in 6.2s]
close()方法和terminate()方法的区别:
close:关闭进程池,使之不能再添加新的进程。已经执行的进程会等待继续执行直到结束。
terminate:强制终止线程池,正在执行的进程也会被强制终止。
3. 进程间通信
Python的multiprocessing模块提供了多种进程间通信的方式,如Queue、Pipe等。
3.1 Queue、Lock
Queue是multiprocessing提供的一个模块,它的数据结构就是"FIFO——first in first out"的队列,常用的方法有:put(object)入队;get()出队;empty()判断队列是否为空。
Lock:当多个子进程对同一个queue执行写操作时,为了避免并发操作产生冲突,可以通过加锁的方式使得某个子进程对queue拥有唯一的写权限,其他子进程必须等待该锁释放后才能再开始执行写操作。
下面就是使用Queue进行进程间通信的代码:在父进程里创建两个子进程,分别实现对queue的读和写操作
#!/usr/bin/python
# -*- coding: utf-8 -*
__author__ = 'zni.feng'
import sys
reload (sys)
sys.setdefaultencoding('utf-8')
from multiprocessing import Process, Queue, Lock
import os, time, random
#写数据进程
def write(q, lock, name):
print 'Child Process %s starts' % name
#获得锁
lock.acquire()
for value in ['A' , 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())
#释放锁
lock.release()
print 'Child Process %s ends' % name #读数据进程
def read(q, lock, name):
print 'Child Process %s starts' % name
while True: #持续地读取q中的数据
value =q.get()
print 'Get %s from queue.' % value
print 'Child Process %s ends' % name if __name__ == "__main__":
#父进程创建queue,并共享给各个子进程
q= Queue()
#创建锁
lock = Lock()
#创建第一个“写”子进程
pw = Process(target = write , args=(q, lock, 'WRITE', ))
#创建“读”进程
pr = Process(target = read, args=(q,lock, 'READ',))
#启动子进程pw,写入:
pw.start()
#启动子进程pr,读取:
pr.start()
#等待pw结束:
pw.join()
#pr是个死循环,通过terminate杀死:
pr.terminate()
print 'Test finish.'
程序的输出结果为:
Child Process WRITE starts
Put A to queue...
Child Process READ starts
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
Child Process WRITE ends
Test finish.
[Finished in 2.0s]
3.2 Pipe
Pipe是另一种进程间通信的方式,俗称“管道”。它由两端组成,一端往管道里写入数据,另一端从管道里读取数据。
下面就是使用Pipe通信的代码:
#!/usr/bin/python
# -*- coding: utf-8 -*
__author__ = 'zni.feng'
import sys
reload (sys)
sys.setdefaultencoding('utf-8')
from multiprocessing import Process, Pipe
import os, time, random #发送数据进程
def send(child_pipe, name):
print 'Child Process %s starts' % name
child_pipe.send('This is Mr.Ni')
child_pipe.close()
time.sleep(random.random())
print 'Child Process %s ends' % name #接收数据进程
def recv(parent_pipe, name):
print 'Child Process %s starts' % name
print parent_pipe.recv()
time.sleep(random.random())
print 'Child Process %s ends' % name if __name__ == "__main__":
#创建管道
parent,child = Pipe()
#创建send进程
ps = Process(target=send, args=(child, 'SEND'))
#创建recv进程
pr = Process(target=recv, args=(parent, 'RECEIVE'))
#启动send进程
ps.start()
#等待send进程结束
ps.join()
#启动recv进程
pr.start()
#等待recv进程结束
pr.join()
print 'Test finish.'
程序的输出结果如下:
Child Process SEND starts
Child Process SEND ends
Child Process RECEIVE starts
This is Mr.Ni
Child Process RECEIVE ends
Test finish.
[Finished in 1.8s]
最新文章
- 安装oracle数据库,登录 时无法连接目标机
- libmegjb.so加载问题调试和分析
- Java中如何将String转成Date
- nyoj 170 网络的可靠性
- nginx 的中文配置详细解释
- Spring Bean 生命周期2
- MVC4.0网站发布和部署到IIS7.0上的方法【转:http://www.th7.cn/Program/net/201403/183756.shtml】
- Ubuntu的挂起和休眠
- ASP.NET得到系统相关信息
- [python]获取字符串类型
- git使用教程之git分支
- Morris遍历-如何用空间复杂度O(1)来遍历二叉树
- Matlab 如何/怎样 读取图片 显示图片 转换成灰度图
- SQL盲注学习
- Arduino语法详解_含示例详解
- python 全栈开发,Day27(复习, defaultdict,Counter,时间模块,random模块,sys模块)
- React绑定事件动态化的实现方法
- 二、Html5元素、属性、格式化
- Python资料收藏(杂乱版)
- pandas.read_csv参数整理
热门文章
- 【探索】机器指令翻译成 JavaScript
- 【趣事】用 JavaScript 对抗 DDOS 攻击 (下)
- XStream将java对象转换为xml时,对象字段中的下划线“_”,转换后变成了两个的解决办法
- 自定义Inspector检视面板
- 在Sublime Text 3上安装代码格式化插件CodeFormatter
- 80 端口被占用 pid=4
- C#中如何创建PDF网格并插入图片
- 就这么漂来漂去---一个毕业三个月的java程序员的裸辞风波
- MySQL 系列(一) 生产标准线上环境安装配置案例及棘手问题解决
- Visual Studio:error MSB8020(搬运)