线程中的Queue

 import time
import threading
import queue
import random def putMessage():
for i in "Hello World!!!":
q.put(i)
time.sleep(random.random())
# print("size:%s"%q.qsize()) # 查看队列长度
#
# print("full:%s"%q.full()) # 查看队列是否为满的状态
#
# print("empty:%s"%q.empty())        # 查看队列是否为空的状态 def getMessage():
while True:
if not q.empty():
print(q.get())
else:
time.sleep(random.random()) if __name__ == "__main__":
q = queue.Queue() t1 = threading.Thread(target=putMessage)
t1.setDaemon(True)
t1.start() t2 = threading.Thread(target=getMessage)
t2.setDaemon(True)
t2.start() time.sleep(10)

进程中的Queue

 from multiprocessing import Queue

 q = Queue(3)    # 初始化一个Queue对象,最多可以put三条信息,如果不写3,那么久无限制

 q.put("Message01")         # 添加信息的方法
q.put("Message02")
print(q.full()) # 查看 队列 是否满了的方法 q.put("Message03")
print(q.full()) # 因为队列已经满了,所以下面的消息会出现异常,第一个 try 会等待2秒后再抛出异常,
# 第二个 try 会立刻抛出异常
try:
q.put("Message04", True, 2)
except:
print("消息队列已满,现有消息数量:%s"%q.qsize()) try:
q.put_nowait("Message04")
except:
print("消息队列已满,现有消息数量:%s"%q.qsize()) # 推荐使用的方式,先判断队列是否已满,再写入
if not q.full():
q.put_nowait("Message04") # 读取消息的时候,先判断消息队列是否为空,再读取
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())

队列:
  为什么要用队列?列表也很好用啊。:数据安全
  创建方法:
    模式1:FIFO -- queue.Queue()
    模式2:FILO -- queue.LifoQueue()
    模式3:priorty -- queue.PriorityQueue()
      q.put([1, 'hello'])
      q.put([2, 'world'])
      级别 1 比 2 低, 1 先出来

  方法的参数:
    put()
      调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常
    get()
      调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。

  其它方法:
    q.empty() 如果队列为空,返回True,反之False
    q.full() 如果队列满了,返回True,反之False
    q.full 与 maxsize 大小对应
    q.get([block[, timeout]]) 获取队列,timeout等待时间
    q.get_nowait() 相当q.get(False)
    非阻塞 q.put(item) 写入队列,timeout等待时间
    q.put_nowait(item) 相当q.put(item, False)
    q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号

    q.join() 实际上意味着等到队列为空,再执行别的操作
    # join多少次,就需要用几次 task_done

多进程优点:

  1. 可以利用多核实现并行运算

缺点:

  1. 开销大
  2. 通信困难

管道Pipe

multiprocessing.Pipe([duplex])
返回2个连接对象(conn1, conn2),代表管道的两端,
默认是双向通信.如果duplex=False,conn1只能用来接收消息,conn2只能用来发送消息. 主要用到的方法:
send() 发送数据
recv() 接收数据
 import multiprocessing

 from multiprocessing import Process, Pipe

 def send(pipe):
pipe.send(['spam'] + [42, 'egg'])
pipe.close() def talk(pipe):
pipe.send(dict(name = 'Bob', spam = 42))
reply = pipe.recv()
print('talker got:', reply) if __name__ == '__main__':
(con1, con2) = Pipe()
sender = Process(target = send, name = 'send', args = (con1, ))
sender.start()
print("con2 got: %s" % con2.recv())#从send收到消息
con2.close() (parentEnd, childEnd) = Pipe()
child = Process(target = talk, name = 'talk', args = (childEnd,))
child.start()
print('parent got:', parentEnd.recv())
parentEnd.send({x * 2 for x in 'spam'})
child.join()
print('parent exit')

进程间的信息共享Manage

Python中进程间共享数据,处理基本的queue,pipe外,还提供了更高层次的封装。使用multiprocessing.Manager可以简单地使用这些高级接口。

Manager支持的类型有list,dict,Namespace,  Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
 import multiprocessing
import time def worker(d, key, value):
d[key] = value if __name__ == '__main__':
mgr = multiprocessing.Manager() d = mgr.dict()
jobs = [multiprocessing.Process(target=worker, args=(d, i, i*2))
for i in range(10)
]
for j in jobs:
j.start()
for j in jobs:
j.join()
print('Results:' )
for key, value in enumerate(dict(d)):
print("%s=%s:%s" % (key, value, d[value])) print("================================================================") manager = multiprocessing.Manager()
Global = manager.Namespace()
Global.x = 10
Global.y = 'hello'
print(Global) print("==================================================================")
问题:

列表不可变
在学习python多进程管理manager时候,当不使用join对当前进程(主进程)进行阻塞时会报错: 这样进行一下总结:在使用manager管理/进行多进程及其数据交互时候,必须对每一个manager内的进程进行join-------待所有子进程完成后再回到主进程。

多进程之进程池

 import time
from multiprocessing import Pool def worker():
for i in range(10):
print("From worker %s"%i)
time.sleep(0.5) def foo():
for i in range(10):
print("From foo %s"%i)
time.sleep(0.5) def bar():
for i in range(10):
print("From bar %s"%i)
time.sleep(0.5) if __name__ == "__main__":
pool = Pool(4) # 创建Pool对象, 3 表示同时最多可以增加 3 条进程
pool.apply_async(worker)
pool.apply_async(worker)
pool.apply_async(worker)
pool.apply_async(foo)
pool.apply_async(foo)
pool.apply_async(foo)
pool.apply_async(bar)
pool.apply_async(bar)
pool.apply_async(bar) pool.close() # 关闭进程池,禁止添加任务
pool.join() # 等待子进程结束后,主进程才往下走
print("Is done...")

并发之协程

 import time

 def consumer():
r = ''
while True:
# 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
# yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
# 当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
# 就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
n = yield r
if not n:
return
print('[CONSUMER] ←← Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
# 1、首先调用c.next()启动生成器
next(c)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] →→ Producing %s...' % n)
# 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
cr = c.send(n)
# 4、produce拿到consumer处理的结果,继续生产下一条消息;
print('[PRODUCER] Consumer return: %s' % cr)
# 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
c.close()
if __name__=='__main__':
# 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
c = consumer()
produce(c)

协程封装之greenlet

 import greenlet
import time
import random """
创建方法:greenlet.greenlet(self, run=None, parent=None)
主要方法:
a.switch() 切换到 a 里面执行
"""
def foo():
for i in range(10):
print("foo:",i)
time.sleep(random.random())
gb.switch() def bar():
for i in range(10):
print("bar:", i)
time.sleep(random.random())
gf.switch() if __name__ == "__main__":
gf = greenlet.greenlet(foo)
gb = greenlet.greenlet(bar) gf.switch()

协程封装之 gevent

 import gevent
from gevent import monkey
import time
import random monkey.patch_all() # 如果遇到 IO 阻塞,那么就切换到下一个 协程的程序 def foo():
for i in range(10):
print("foo:",i)
time.sleep(random.random()) def bar():
for i in range(10):
print("bar:", i)
time.sleep(random.random()) if __name__ == "__main__":
gevent.joinall([gevent.spawn(foo),
gevent.spawn(bar)]) # 固定用法,将里面的函数放入到 协程的执行序列中

最新文章

  1. 使用strace 工具跟踪系统调用和信号
  2. python中的深拷贝和潜拷贝
  3. TX Textcontrol 使用总结四——打印
  4. Unity中的C#规则
  5. 开发一个支持多用户在线的FTP程序
  6. Wap touch flispan demo
  7. Java常用类库--观察者设计模式( Observable类Observer接口)
  8. wemall app中基于Java获取和保存图片的代码
  9. oracle数据库密码过期修改注意事项
  10. poj2083 Fractal
  11. Numpy 笔记: 多维数组的切片(slicing)和索引(indexing)【转】
  12. iOS 11开发教程(十八)iOS11应用视图之使用代码添加按钮
  13. log4Net 高性能写入和CSV格式
  14. uva-10718-贪心
  15. Flickr Hosts
  16. python怎么安装requests、beautifulsoup4等第三方库
  17. Cisco Smart Install远程命令执行漏洞
  18. SQL Server 执行计划分析
  19. 转:Uncovering Drupalgeddon 2(cve-2018-7600)漏洞深度解析(附漏洞利用代码地址)
  20. Spring-1-F Dice(HDU 5012)解题报告及测试数据

热门文章

  1. (转)Java中Image的水平翻转、缩放与自由旋转操作
  2. ES6数组Api扩充
  3. PHP算法之最接近的三数之和
  4. nodejs 在MYSQL 数据库中插入和查询数据
  5. 发现一个新的远程软件 gotohttp
  6. redis数据备份还原
  7. nodejs的npm修改源
  8. mac系统下通过安装包的形式安装mongdb
  9. 线段树区间离散化——牛客多校E
  10. 兼容火狐浏览器的select下拉框样式