复习

# 信号量
from multiprocessing import Semaphore
# 用锁的原理实现的,内置了一个计数器
# 在同一个事件,只能有指定数量的进程执行某一段被控制住的代码
# 事件
# wait阻塞受到事件控制的同步组件
# 状态 True Flase is_set
# true--》false 用clear()
# false --->true 用set()
# wait方法 状态为true不阻塞 状态为false的时候阻塞 # 队列
# Queue
# put 当队列满的时候阻塞等待队列有空位置
# get 当队列空的时候阻塞等待队列有数据
# full empty 不完全准确 # JoinableQuere
# task_done 与get连用
# join 与put连用

管道

from multiprocessing import Pipe,Process
def func(conn1,conn2):
conn2.close()
while True:
try:
msg = conn1.recv()
print(msg)
except EOFError:
conn1.close()
break if __name__ == '__main__':
conn1,conn2 = Pipe()
Process(target=func,args=(conn1,conn2)).start()
conn1.close()
for i in range(20):
conn2.send('吃了吗')
conn2.close()
from multiprocessing import  Pipe,Process
import time,random
def producer(con,pro,name,food):
con.close()
for i in range(4):
time.sleep(random.randint(1,3))
f = '%s生产%s%s'%(name,food,i)
print(f)
pro.send(f)
pro.close()
def consumer(con,pro,name):
pro.close()
while True:
try:
food = con.recv()
print('%s吃了%s'%(name,food))
time.sleep(random.randint(1,3))
except EOFError:
con.close()
break
if __name__ == '__main__':
con,pro = Pipe()
p = Process(target=producer,args = (con,pro,'egon','泔水'))
p.start()
c = Process(target=consumer,args = (con,pro,'alex'))
c.start()
con.close()
pro.close()
进程之间的数据共享
from multiprocessing import  Manager,Process,Lock

def main(dic,lock):
lock.acquire()
dic['count'] -= 1
lock.release() if __name__ == '__main__':
m = Manager()
l = Lock()
dic = m.dict({'count':100})
p_list = []
for i in range(50):
p = Process(target=main,args=(dic,l))
p.start()
p_list.append(p)
for i in p_list: i.join()
print('主进程:',dic)

进程池

# 为什么有进程池
# 效率
# 每开启进程,开启属于这个进程的内存空间
# 寄存器 堆栈 文件
# 进程过多,操作系统调度进程
# 进程池
# python中的先创建一个属于进程的池子
# 这个池子指定能存放多少个进程
# 先将这些进程创建好
from multiprocessing import Pool
import os,time
def func(n):
print('start func%s'%n,os.getpid())
time.sleep(1)
print('end func%s'%n,os.getpid())
if __name__ == '__main__':
p = Pool(5)
for i in range(10):
p.apply_async(func,args=(i,))
p.close() #结束进程池接受任务
p.join() #感知进程池中的任务执行结束

socket_server-进程池

#server
import socket
from multiprocessing import Pool def func(conn):
conn.send(b'hello')
print(conn.recv(1024).decode('utf-8'))
conn.close()
if __name__ == '__main__':
p = Pool(5)
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
conn,addr = sk.accept()
p.apply_async(func,args=(conn,))
sk.close()
#client
import socket sk = socket.socket()
sk.connect(('127.0.0.1',8080))
ret = sk.recv(1024).decode('utf-8')
print(ret)
msg = input('>>>').encode('utf-8')
sk.send(msg)
sk.close()

进程池返回值

# p.map(funcname,iterable)  默认异步的执行任务,自带close和join
# p.apply 同步调用
# p.apply_async 异步调用 和主进程完全异步 需要手动close和join
from multiprocessing import Pool
def func(i):
return i*i if __name__ == '__main__':
p = Pool(5)
for i in range(10):
res = p.apply(func,args=(i,)) #apply的结果就是func的返回值
print(res)
import time
from multiprocessing import Pool
def func(i):
time.sleep(0.5)
return i*i if __name__ == '__main__':
p = Pool(5)
res_list = []
for i in range(10):
res = p.apply_async(func,args=(i,)) #
res_list.append(res)
for res in res_list:print(res.get())
#map
import time
from multiprocessing import Pool
def func(i):
time.sleep(0.5)
return i*i if __name__ == '__main__':
p = Pool(5)
ret = p.map(func,range(10))
print(ret) [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

进程池的回调函数

from multiprocessing import Pool

def func1(n):
print('in func1')
return n*n
def func2(nn):
print('in func2')
print(nn) if __name__ == '__main__':
p = Pool(5) p.apply_async(func1,args=(10,),callback=func2)
p.close()
p.join()

in func1
in func2
100

 
from multiprocessing import Pool
import os
def func1(n):
print('in func1',os.getpid())
return n*n
def func2(nn): #参数只能是func1的返回值
print('in func2',os.getpid())
print(nn) if __name__ == '__main__':
print('主进程: ',os.getpid())
p = Pool(5) p.apply_async(func1,args=(10,),callback=func2)
p.close()
p.join() 主进程: 11172
in func1 11760
in func2 11172
100

最新文章

  1. 超千个节点OpenStack私有云案例(1):CERN 5000+ 计算节点私有云
  2. HTTP Header
  3. 推荐一篇 OAuth 2.0 必读文章
  4. FFmpeg frei0r water 滤镜
  5. HVTableView 分享组
  6. 高性能JavaScript-JS脚本加载与执行对性能的影响
  7. Device.js——检测设备平台、操作系统的Javascript 库
  8. 使用POI 导入excel
  9. mysql性能优化配置总结
  10. linux kvm虚拟机快速构建及磁盘类型
  11. ●BZOJ 3129 [Sdoi2013]方程
  12. 【Android 应用开发】 自定义 圆形进度条 组件
  13. 关于web项目中静态资源加载不了的一些解决思路
  14. ES6 模块与 CommonJS 模块的差异
  15. 2199: [Usaco2011 Jan]奶牛议会 2-sat
  16. 【javaScript】数组的相关操作
  17. CVE-2017-8570漏洞利用
  18. RAPID程序设计
  19. 代码记录——phase16,block32
  20. 浅谈在Java开发中的枚举的作用和用法

热门文章

  1. redis安装&启动
  2. 037 Sudoku Solver 解数独
  3. B. Batch Sort
  4. (转)io优化
  5. Redis特性之持久化机制
  6. 【转】"超时时间已到。在操作完成之前超时时间已过或服务器未响应"的解决方法
  7. Dev控件工具箱安装
  8. There is much opportunity for anyone willing to dedicate himself to his labors.
  9. php分页代码及总结
  10. SaaS 系统架构设计经验总结