###############   守护进程  ##############

"""
守护进程 父进程中将一个子进程设置为守护进程,那么这个子进程会随着主进程的结束而结束。
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止 """
# 第一版:主进程结束了,子进程还没有结束,
# import time
# from multiprocessing import Process
#
# def func():
# while True:
# time.sleep(1)
# print("我还活着")
#
#
# if __name__ == '__main__':
# p=Process(target=func)
# p.start()
# i = 0
# while i<10:
# time.sleep(1)
# i+=1
# print("主进程结束") # 守护进程,就是主进程代码结束了而结束,记住不是主进程彻底结束,而是代码结束,
import time
from multiprocessing import Process def func():
while True:
time.sleep(1)
print("我还活着") if __name__ == '__main__':
p = Process(target=func)
p.daemon = True # 设置子进程为守护进程, #一定要在p.start()前设置,设置p为守护进程
p.start()
i = 0
while i < 5:
time.sleep(1)
i += 1
print("主进程代码结束")

其他的方法:

from multiprocessing import Process
import time
def func(name):
print("%s在test...."%name) if __name__ == "__main__":
p = Process(target=func,args=("andy",))
p.start()
print(p.is_alive()) # # 判断一个进程是否活着
p.terminate() # 结束一个进程,
time.sleep(1)
print(p.is_alive())

##################       进程锁              #####################

"""
互斥锁:
通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,
他们之间的运行没有顺序,一旦开启也不受我们控制。
尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。 当多个进程使用同一份数据资源的时候,就会因为竞争而引发数据安全或顺序混乱问题。 """

下面的代码演示了不同的任务争抢一个资源(终端输出)的场景。

from multiprocessing import Process
import time
import random def task1():
print('这是 task1 任务'.center(30, '-'))
print('task1 进了洗手间')
time.sleep(random.randint(1, 3))
print('task1 办事呢...')
time.sleep(random.randint(1, 3))
print('task1 走出了洗手间') def task2():
print('这是 task2 任务'.center(30, '-'))
print('task2 进了洗手间')
time.sleep(random.randint(1, 3))
print('task2 办事呢...')
time.sleep(random.randint(1, 3))
print('task2 走出了洗手间') def task3():
print('这是 task3 任务'.center(30, '-'))
print('task3 进了洗手间')
time.sleep(random.randint(1, 3))
print('task3 办事呢...')
time.sleep(random.randint(1, 3))
print('task3 走出了洗手间') if __name__ == '__main__':
p1 = Process(target=task1)
p2 = Process(target=task2)
p3 = Process(target=task3) p1.start()
p2.start()
p3.start() """
---------这是 task1 任务----------
task1 进了洗手间
---------这是 task2 任务----------
task2 进了洗手间
---------这是 task3 任务----------
task3 进了洗手间
task3 办事呢...
task1 办事呢...
task3 走出了洗手间
task2 办事呢...
task2 走出了洗手间
task1 走出了洗手间
"""

通过加锁来控制

from multiprocessing import Process, Lock
import time
import random # 生成一个互斥锁
mutex_lock = Lock() def task1(lock):
# 锁门
lock.acquire()
print('这是 task1 任务'.center(30, '-'))
print('task1 进了洗手间')
time.sleep(random.randint(1, 3))
print('task1 办事呢...')
time.sleep(random.randint(1, 3))
print('task1 走出了洗手间')
# 释放锁
lock.release() def task2(lock):
# 锁门
lock.acquire()
print('这是 task2 任务'.center(30, '-'))
print('task2 进了洗手间')
time.sleep(random.randint(1, 3))
print('task2 办事呢...')
time.sleep(random.randint(1, 3))
print('task2 走出了洗手间')
# 释放锁
lock.release() def task3(lock):
# 锁门
lock.acquire()
print('这是 task3 任务'.center(30, '-'))
print('task3 进了洗手间')
time.sleep(random.randint(1, 3))
print('task3 办事呢...')
time.sleep(random.randint(1, 3))
print('task3 走出了洗手间')
# 释放锁
lock.release() if __name__ == '__main__':
p1 = Process(target=task1, args=(mutex_lock, ))
p2 = Process(target=task2, args=(mutex_lock, ))
p3 = Process(target=task3, args=(mutex_lock, )) # 释放新建进程的信号,具体谁先启动无法确定
p1.start()
p2.start()
p3.start() """
---------这是 task2 任务----------
task2 进了洗手间
task2 办事呢...
task2 走出了洗手间
---------这是 task1 任务----------
task1 进了洗手间
task1 办事呢...
task1 走出了洗手间
---------这是 task3 任务----------
task3 进了洗手间
task3 办事呢...
task3 走出了洗手间 """

买票的案例:

并发出错:

from multiprocessing import Process, Lock
import json
import time
import random
import os def search():
time.sleep(0.5)
with open('db.json', 'r', encoding='utf8') as f:
data = json.load(f)
print('剩余票数:{}'.format(data.get('count'))) def buy(): with open('db.json', 'r', encoding='utf8') as f:
data = json.load(f)
if data.get('count', 0) > 0:
data['count'] -= 1
time.sleep(random.randint(1, 3))
with open('db.json', 'w', encoding='utf8') as f2:
json.dump(data, f2)
print('{}购票成功!'.format(os.getpid()))
else:
print('购票失败') def task():
search() # 查票并发
buy() # 串行买票 if __name__ == '__main__':
for i in range(10):
p = Process(target=task)
p.start()

加上锁

from multiprocessing import Process, Lock
import json
import time
import random
import os # 设置互斥锁
mutex_lock = Lock() def search():
time.sleep(0.5)
with open('db.json', 'r', encoding='utf8') as f:
data = json.load(f)
print('剩余票数:{}'.format(data.get('count'))) def buy(): with open('db.json', 'r', encoding='utf8') as f:
data = json.load(f)
if data.get('count', 0) > 0:
data['count'] -= 1
time.sleep(random.randint(1, 3))
with open('db.json', 'w', encoding='utf8') as f2:
json.dump(data, f2)
print('{}购票成功!'.format(os.getpid()))
else:
print('购票失败') def task(lock):
search() # 查票并发
lock.acquire()
buy() # 串行买票
lock.release() if __name__ == '__main__':
for i in range(10):
p = Process(target=task, args=(mutex_lock, ))
p.start()

###############         进程间的通信         ##############

"""
进程间的三种通信(IPC)方式: 方式一:队列(推荐使用) 方式二:管道(不推荐使用,了解即可)
管道相当于队列,但是管道不自动加锁 方式三:共享数据(不推荐使用,了解即可)
共享数据也没有自动加锁的功能,所以还是推荐用队列的。感兴趣的可以研究研究管道和共享数据 """

###############    进程间的通信---队列   ##############

"""
Queue介绍
我们可以创建一个共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 Queue的实例q常用方法:
################################### Queue([maxsize])
创建共享的进程队列。
参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。
底层队列使用管道和锁定实现。 q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。
block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。
timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。 q.get_nowait( )
同q.get(False)方法。 q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。
block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。
timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 q.qsize()
返回队列中目前项目的正确数量。
此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。
在某些系统上,此方法可能引发NotImplementedError异常。 q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。
也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。 """

基本的队列操作:

'''
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
''' from multiprocessing import Queue
q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3) # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
# 如果队列中的数据一直不被取走,程序就会永远停在这里。
try:
q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
print('队列已经满了') # 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
print(q.full()) #满了 print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
try:
q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
print('队列已经空了') print(q.empty()) #空了

上面还没有设计到进程间的通信,下面看一个简单的主进程和子进程之间通信的例子:

import time
from multiprocessing import Process, Queue def f(q):
q.put([time.asctime(), 'hi', 'hello']) #调用主函数中p进程传递过来的进程参数 put函数为向队列中添加一条数据。 if __name__ == '__main__':
q = Queue() #创建一个Queue对象
p = Process(target=f, args=(q,)) #创建一个进程
p.start()
print(q.get())
p.join()

############       生产者消费者模型        ##############

"""
什么是生产者消费者模式?两个角色、一个场所
两个角色:
产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者;
一个场所:
生产者和消费者之间的中介就叫做缓冲区。 为什么要使用生产者和消费者模式?
如果不使用这种模式,
那么生产者就必须等待消费者处理完,才能继续生产数据。这就阻塞了,不能并发,
同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。 使用了这种模式:
解决生产者和消费者的强耦合问题,生产者不需要等待消费者消费完了才可以生产了,而是直接扔给阻塞队列,
消费者也不需要等待生产者了,直接到阻塞队列取数据,
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力 生产者/消费者模型的优点:
1、解耦,即降低生产者和消费者之间的依赖关系。
2、支持并发,即生产者和消费者可以是两个独立的并发主体,互不干扰的运行。
3、支持忙闲不均,平衡了生产者和消费者的处理能力
"""

# 队列的生产者和消费者模型
# 买包子的例子
# 有蒸包子的人,这就是生产者,有买包子的人,这就是消费者,
# 实际中,可能会有数据供需不平衡的问题,
# 就是数据生产的多了没有消费,所以我们要增加消费者,或者减少生产
# 数据消费的多了,我们要增加生产者,来解决这个问题,

# 基于队列实现生产者消费者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__':
q=Queue()
#生产者们:即厨师们
p1=Process(target=producer,args=(q,)) #消费者们:即吃货们
c1=Process(target=consumer,args=(q,)) #开始
p1.start()
c1.start()
print('主')

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到结束信号则结束
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
q.put(None) #发送结束信号
if __name__ == '__main__':
q=Queue()
#生产者们:即厨师们
p1=Process(target=producer,args=(q,)) #消费者们:即吃货们
c1=Process(target=consumer,args=(q,)) #开始
p1.start()
c1.start()
print('主')

注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到结束信号则结束
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(q):
for i in range(2):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__':
q=Queue()
#生产者们:即厨师们
p1=Process(target=producer,args=(q,)) #消费者们:即吃货们
c1=Process(target=consumer,args=(q,)) #开始
p1.start()
c1.start() p1.join()
q.put(None) #发送结束信号
print('主')

但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
if res is None:break #收到结束信号则结束
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(name,q):
for i in range(2):
time.sleep(random.randint(1,3))
res='%s%s' %(name,i)
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__':
q=Queue()
#生产者们:即厨师们
p1=Process(target=producer,args=('包子',q))
p2=Process(target=producer,args=('骨头',q))
p3=Process(target=producer,args=('泔水',q)) #消费者们:即吃货们
c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,)) #开始
p1.start()
p2.start()
p3.start()
c1.start() p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
p2.join()
p3.join()
q.put(None) #有几个消费者就应该发送几次结束信号None
q.put(None) #发送结束信号
print('主')

# 问题:有多个消费者的时候,只有一个消费者拿到这个值,然后结束了,但是拿不到的消费者,就还没有结束,
# 这种写法太麻烦了,如果有1000个还得了,怎么办?使用新的一个模块:JoinableQueue

JoinableQueue([maxsize]) 
创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

"""
JoinableQueue的实例p除了与Queue对象相同的方法之外,还具有以下方法: q.task_done()
使用者使用此方法发出信号,表示q.get()
返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。 q.join()
生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()
方法为止。
下面的例子说明如何建立永远运行的进程,使用和处理队列上的项目。生产者将项目放入队列,并等待它们被处理。 """ from multiprocessing import Process, JoinableQueue
import time, random, os def consumer(q):
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res))
q.task_done() # 向q.join()发送一次信号,证明一个数据已经被取走了 def producer(name, q):
for i in range(10):
time.sleep(random.randint(1, 3))
res = '%s%s' % (name, i)
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' % (os.getpid(), res))
q.join() # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。 if __name__ == '__main__':
q = JoinableQueue()
# 生产者们:即厨师们
p1 = Process(target=producer, args=('包子', q))
p2 = Process(target=producer, args=('骨头', q))
p3 = Process(target=producer, args=('泔水', q)) # 消费者们:即吃货们
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
c1.daemon = True
c2.daemon = True # 开始
p_l = [p1, p2, p3, c1, c2]
for p in p_l:
p.start() p1.join()
p2.join()
p3.join()
print('主') # 主进程等--->p1,p2,p3等---->c1,c2
# p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
# 因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。

###################################################

最新文章

  1. zookeeper集群搭建
  2. bootstrap的介绍 和使用
  3. 利用python的双向队列(Deque)数据结构实现回文检测的算法
  4. 网页倒计时,动态显示&quot;&#215;&#215;年还剩&#215;&#215;天&#215;&#215;时&#215;&#215;分&#215;&#215;秒&quot;
  5. Memcached和Redis对比和适用场景
  6. MySQL 5.5.35 单机多实例配置详解
  7. Mac 上SVN上传.a文件
  8. HDU 2122 HDU Today【Floyd】
  9. (C#)使用队列(Queue)解决简单的并发问题
  10. JDK与JRE
  11. 移植openssl
  12. java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
  13. 【noip模拟】局部最小值
  14. 百度病了,必应挂了,Yandex疯了。
  15. sqlSugar的使用---入门
  16. javascript NaN注意事项
  17. 在div中放一个相同大小的svg,实际显示的位置svg偏下
  18. 上海高校金马五校赛 F题:1 + 2 = 3?
  19. 【Linux 线程】常用线程函数复习《三》
  20. 【转】mysql 索引过长1071-max key length is 767 byte

热门文章

  1. echarts 柱状图的选中模式实现-被选中变色和再次选中为取消变色
  2. 报错盲注之exp注入(double数值类型溢出原理详解)
  3. 第二十篇ORM查询与SQL语句
  4. 文件的概念、标准IO其一
  5. c++程序—switch分支
  6. 使用Redux管理React数据流要点浅析
  7. cf 453A.Little Pony and Expected Maximum
  8. C++ STD Gems04
  9. Ubuntu16.04 faster-rcnn+caffe+gpu运行环境配置以及解决各种bug
  10. mysql5.6免安装使用