https://www.cnblogs.com/wongbingming/p/9124142.html

在实战中,将会用到以下知识点:

  • 多线程的基本使用
  • Queue消息队列的使用
  • Redis的基本使用
  • asyncio的使用

. 动态添加协程#

在实战之前,我们要先了解下在asyncio中如何将协程态添加到事件循环中的。这是前提。

如何实现呢,有两种方法:

  • 主线程是同步
import time
import asyncio
from queue import Queue
from threading import Thread def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever() def do_sleep(x, queue, msg=""):
time.sleep(x)
queue.put(msg) queue = Queue() new_loop = asyncio.new_event_loop() # 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start() print(time.ctime()) # 动态添加两个协程
# 这种方法,在主线程是同步的
new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一个")
new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二个") while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())

由于是同步的,所以总共耗时6+3=9秒.

输出结果

Thu May 31 22:11:16 2018
第一个 协程运行完..
Thu May 31 22:11:22 2018
第二个 协程运行完..
Thu May 31 22:11:25 2018
  • 主线程是异步的,这是重点,一定要掌握。。
import time
import asyncio
from queue import Queue
from threading import Thread def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever() async def do_sleep(x, queue, msg=""):
await asyncio.sleep(x)
queue.put(msg) queue = Queue() new_loop = asyncio.new_event_loop() # 定义一个线程,并传入一个事件循环对象
t = Thread(target=start_loop, args=(new_loop,))
t.start() print(time.ctime()) # 动态添加两个协程
# 这种方法,在主线程是异步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop) while True:
msg = queue.get()
print("{} 协程运行完..".format(msg))
print(time.ctime())

输出结果

由于是同步的,所以总共耗时max(6, 3)=6

Thu May 31 22:23:35 2018
第二个 协程运行完..
Thu May 31 22:23:38 2018
第一个 协程运行完..
Thu May 31 22:23:41 2018

实战:利用redis实现动态添加任务#

对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似master-worker的方式,master主要用户获取队列的msg,worker用户处理消息。

为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用redis的队列。主线程中有一个是无限循环,用户消费队列。

先安装Redis
到 https://github.com/MicrosoftArchive/redis/releases 下载

解压到你的路径。

然后,在当前路径运行cmd,运行redis的服务端。

服务开启后,我们就可以运行我们的客户端了。
并依次输入key=queue,value=5,3,1的消息。

一切准备就绪之后,我们就可以运行我们的代码了。

import time
import redis
import asyncio
from queue import Queue
from threading import Thread def start_loop(loop):
# 一个在后台永远运行的事件循环
asyncio.set_event_loop(loop)
loop.run_forever() async def do_sleep(x, queue):
await asyncio.sleep(x)
queue.put("ok") def get_redis():
connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0)
return redis.Redis(connection_pool=connection_pool) def consumer():
while True:
task = rcon.rpop("queue")
if not task:
time.sleep(1)
continue
asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop) if __name__ == '__main__':
print(time.ctime())
new_loop = asyncio.new_event_loop() # 定义一个线程,运行一个事件循环对象,用于实时接收新任务
loop_thread = Thread(target=start_loop, args=(new_loop,))
loop_thread.setDaemon(True)
loop_thread.start()
# 创建redis连接
rcon = get_redis() queue = Queue() # 子线程:用于消费队列消息,并实时往事件对象容器中添加新任务
consumer_thread = Thread(target=consumer)
consumer_thread.setDaemon(True)
consumer_thread.start() while True:
msg = queue.get()
print("协程运行完..")
print("当前时间:", time.ctime())

稍微讲下代码

loop_thread:单独的线程,运行着一个事件对象容器,用于实时接收新任务。
consumer_thread:单独的线程,实时接收来自Redis的消息队列,并实时往事件对象容器中添加新任务。

输出结果

Thu May 31 23:42:48 2018
协程运行完..
当前时间: Thu May 31 23:42:49 2018 协程运行完..
当前时间: Thu May 31 23:42:51 2018 协程运行完..
当前时间: Thu May 31 23:42:53 2018

我们在Redis,分别发起了5s3s1s的任务。
从结果来看,这三个任务,确实是并发执行的,1s的任务最先结束,三个任务完成总耗时5s

运行后,程序是一直运行在后台的,我们每一次在Redis中输入新值,都会触发新任务的执行。。


												

最新文章

  1. struts2页面上如何操作字符串
  2. psd图片到html
  3. ASP.NET 使用 System.Web.Script.Serialization 解析 JSON (转)
  4. 【BZOJ1500】[NOI2005]维修数列
  5. O the joy of having nothing / 아무것도 갖지않고
  6. interview collect
  7. 生成GUID字符串
  8. Github Coding Developer Book For LiuGuiLinAndroid
  9. 解决shell脚本中 telnet ap自动输入用户名和密码以及回车符
  10. scrapy基本使用(一)
  11. K8s-Pod控制器
  12. css的再深入9(更新中···)
  13. kudu导入文件(基于impala)
  14. 在WPF中使用Caliburn.Micro搭建MEF插件化开发框架
  15. windows Server 2008 R2 添加新用户时密码不满足密码策略的要求
  16. codeforces 979A Pizza, Pizza, Pizza!!!
  17. mtcp的快速编译(连接)
  18. dynamic(2) – ExpandoObject的使用
  19. 【Java并发编程一】线程安全和共享对象
  20. word文档下划线无法显示的解决方法

热门文章

  1. TensorFlow从0到1之TensorFlow实现多元线性回归(16)
  2. Python中和迭代有关的两个函数next()和iter()
  3. Hexo快速构建个人小站-自定义域名和自定义主题(二)
  4. 【Python爬虫】HTTP基础和urllib库、requests库的使用
  5. JavaWeb网上图书商城完整项目--11.项目所需jquery函数介绍
  6. JavaWeb网上图书商城完整项目-数据库操作工具类2-MapHandle的高级用法
  7. 恕我直言你可能真的不会java第6篇:Stream性能差?不要人云亦云
  8. MySql数据库GROUP BY使用过程中的那些坑
  9. java-IO流(commons-io-2.6)使用教程
  10. mybatis缓存之一级缓存(一)