1.多线程实现

import threading
import queue
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(levelname)s -->%(funcName)s at line %(lineno)d: \n %(message)s')
log= logging.getLogger() def producer(q):
for k in range(10):
q.put(k)
logging.info("put %s item into queue "%k)
# block all producer main until consumer get all queue members that put
q.join() def consumer(q):
while True:
"""
if queue been empty and q.get(block=True),
because default block in source code is True ,
queue will bock forever util queue put new item .
"""
item=q.get()
logging.info("get out of %s member of queue "%item) # notify q.join() consumer has get out of member of queue
q.task_done() if __name__ == '__main__':
thread_num=5
# limit max length of queue
q = queue.Queue(maxsize=5)
producers=[threading.Thread(target=producer,args=(q,))]
consumers=[threading.Thread(target=consumer,args=(q,)) for i in range(thread_num)]
# start producer and consumer
for pr in producers:
pr.start()
for cn in consumers:
cn.start()
# block main thread of producer
for p in producers:
p.join()
# block main of consumer
for c in consumers:
c.join() 执行结果:可以看到控制台一直阻塞在那里。由于消费者消费完queue里面所有的成员后queue 变成empty,空队列,由于源码q.get()内置参数block一直为True,当取不到时一直阻塞,那么这就意味着当你block 为True时必须设置结束标志让消费者结束,退出死循环接下来我们
改良一下这个方法:

在生产者生产的时候加入结束标志None告诉消费者拿到None时就break 跳出死循环,结束消费,这样就解决了消费之一直阻塞问题:

在producer主线程join之前添加与线程数目相同的None,让每个thread break 跳出死循环:

import threading
import queue
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(levelname)s -->%(funcName)s at line %(lineno)d: \n %(message)s')
log= logging.getLogger() def producer(q):
for k in range(10):
q.put(k)
logging.info("put %s item into queue "%k)
# block all producer main until consumer get all queue members that put
q.join() def consumer(q):
while True:
"""
if queue been empty and q.get(block=True),
because default block in source code is True ,
queue will bock forever util queue put new item .
"""
item=q.get()
logging.info("get out of %s member of queue " % item)
if item == None:
logging.info("get None stop consumer from while loop")
break
# notify q.join() consumer has get out of member of queue
q.task_done() if __name__ == '__main__':
thread_num=5
# limit max length of queue
q = queue.Queue(maxsize=5)
producers=[threading.Thread(target=producer,args=(q,))]
consumers=[threading.Thread(target=consumer,args=(q,)) for i in range(thread_num)]
# start producer and consumer
for pr in producers:
pr.start()
for cn in consumers:
cn.start()
# block main thread of producer
for p in producers:
p.join()
# put same number of thread_num None to break thread from consumer while
for i in range(thread_num):
q.put(None)
# block main of consumer
for c in consumers:
c.join()

  

2019-12-21 20:37:00,206 - INFO -->consumer at line 23:
get out of None member of queue
2019-12-21 20:37:00,206 - INFO -->consumer at line 25:
get None mean break
2019-12-21 20:37:00,206 - INFO -->consumer at line 25:
get None mean break
2019-12-21 20:37:00,206 - INFO -->consumer at line 25:
get None mean break
2019-12-21 20:37:00,206 - INFO -->consumer at line 25:
get None mean break
2019-12-21 20:37:00,206 - INFO -->consumer at line 23:
get out of None member of queue
2019-12-21 20:37:00,207 - INFO -->consumer at line 25:
get None mean break

Process finished with exit code 0

结果:正常释放所有线程,但是好像每次线程都要加个标志太麻烦,感觉,能不能有更好的办法呢,当然接着往下看:

改良consumer方法在每次消费队列之前检查队列queue.empty() 是否为True,是就直接break跳出while loop:就不需要每次都加None标志了

import threading
import queue
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(levelname)s -->%(funcName)s at line %(lineno)d: \n %(message)s')
log= logging.getLogger() def producer(q):
for k in range(10):
q.put(k)
logging.info("put %s item into queue "%k)
# block all producer main until consumer get all queue members that put
q.join() def consumer(q):
while True:
"""
if queue been empty and q.get(block=True),
because default block in source code is True ,
queue will bock forever util queue put new item .
"""
if q.empty():break
item=q.get()
logging.info("get out of %s member of queue " % item)
# notify q.join() consumer has get out of member of queue
q.task_done() if __name__ == '__main__':
thread_num=5
# limit max length of queue
q = queue.Queue(maxsize=5)
producers=[threading.Thread(target=producer,args=(q,))]
consumers=[threading.Thread(target=consumer,args=(q,)) for i in range(thread_num)]
# start producer and consumer
for pr in producers:
pr.start()
for cn in consumers:
cn.start()
# block main thread of producer
for p in producers:
p.join()

  

j结果如下:

最新文章

  1. 编辑 Ext 表格(二)——— 编辑表格元素
  2. flex自适应小例子
  3. Math DayTwo
  4. Service和Thread的关系及如何启用Service,如何停用Service
  5. 解决SQL SERVER LDF文件过大的问题
  6. @Override在JDK1.5和JDK1.6中用法区别
  7. OC1_数组创建
  8. 手机摇一摇效果-html5
  9. 宇宙【全7季】【合集】【蓝光1080P】【历史频道】
  10. eclipse run on server 浏览器启动设置
  11. ST40 自制 JTAG 适配器
  12. [USACO13NOV]没有找零No Change [TPLY]
  13. NABCD-课程表开发
  14. css格式比较及选择器类型总结
  15. ZOOKEEPER典型应用场景解析
  16. 一个页面中使用多个UEditor
  17. 删除已渲染select标签的值
  18. 【转】stm32CubeMx上移植自己的printf()和scanf()函数
  19. 词袋模型(BOW, bag of words)
  20. UI Recorder 安装教程(一)

热门文章

  1. [Python]爬取CSDN论坛 标题 2020.2.8
  2. LaTeX技巧012:LaTeX 插图加载宏包
  3. 使用Vmware过程中,突然网络连接不上问题
  4. (转) 统计在从1到n的正整数中1出现的次数
  5. LVS笔试题!
  6. Web安全测试学习笔记 - 文件包含
  7. ubuntu系统定时运行 crontab
  8. 【Unity|C#】番外篇(1)——6个重要概念:栈与堆,值类型与引用类型,装箱与拆箱
  9. 问题解决:局域网内,为啥别人ping不到我的IP
  10. 记录 shell学习过程(5)continue break