前言

书接上文:,本文造第四个轮子,也是asyncio包里面非常常用,并且非常复杂的一个函数sleep

一、知识准备

time.sleep直接让当前线程睡觉,但是这种方式显然是不能接受的,如果当前线程睡觉,那我们所有的协程任务都会被卡主,并发也就无从谈起了

● 理解socket.socketpair()创建的套接字对象

● 理解selectors的应用

● 理解最小堆以及heapq的应用

● 理解对象比较

● 这一小结的基础知识很多,希望大家优先了解上述的知识再开始阅读,否则很容易不知所云

二、环境准备

组件 版本
python 3.7.7

三、sleep的实现

先来看下官方sleep的使用方法:

|># more main.py
import asyncio async def hello():
print('enter hello ...')
await asyncio.sleep(5)
print('hello sleep end...')
return 'return hello...' async def world():
print('enter world ...')
await asyncio.sleep(3)
print('world sleep end...')
return 'return world...' async def helloworld():
print('enter helloworld')
ret = await asyncio.gather(hello(), world())
print('exit helloworld')
return ret if __name__ == "__main__":
ret = asyncio.run(helloworld())
print(ret) |># time python3 main.py
enter helloworld
enter hello ...
enter world ...
world sleep end...
hello sleep end...
exit helloworld
['return hello...', 'return world...'] real 0m5.256s
user 0m0.077s
sys 0m0.020s

来看下造的轮子的使用方式:

 more main.py
async def hello():
print('enter hello ...')
await wilsonasyncio.sleep(5)
print('hello sleep end...')
return 'return hello...' async def world():
print('enter world ...')
await wilsonasyncio.sleep(3)
print('world sleep end...')
return 'return world...' async def helloworld():
print('enter helloworld')
ret = await wilsonasyncio.gather(hello(), world())
print('exit helloworld')
return ret if __name__ == "__main__":
ret = wilsonasyncio.run(helloworld())
print(ret) time python3 main.py
enter helloworld
enter hello ...
enter world ...
world sleep end...
hello sleep end...
exit helloworld
['return hello...', 'return world...']
python3 main.py 0.06s user 0.04s system 1% cpu 5.406 total

都是用了5s左右,自己造的轮子也很好的运行了,下面我们来看下轮子的代码

四、代码解析

轮子代码

1)代码组成

|># tree
.
├── eventloops.py
├── futures.py
├── main.py
├── tasks.py
├── wilsonasyncio.py
文件 作用
eventloops.py 事件循环
futures.py futures对象
tasks.py tasks对象
wilsonasyncio.py 可调用方法集合
main.py 入口

2)代码概览:

eventloops.py

类/函数 方法 对象 作用 描述
Eventloop 事件循环,一个线程只有运行一个
__init__ 初始化两个重要对象 self._readyself._stopping
self._ready 所有的待执行任务都是从这个队列取出来,非常重要
self._scheduled 待调度的任务队列,这个队列的任务会在合适的时机进入self._ready 新增
self._stopping 事件循环完成的标志
current_time 从线程启动到现在经历的秒数 新增
call_later 调用该方法会经历一个延时delay之后,再将任务添加到待执行队列 新增
call_at 调用该方法会在指定的时间将任务添加到待执行队列 新增
call_soon 调用该方法会立即将任务添加到待执行队列
run_once run_forever调用,从self._ready队列里面取出任务执行 重新改造,添加了大量逻辑
run_forever 死循环,若self._stopping则退出循环
run_until_complete 非常重要的函数,任务的起点和终点(后面详细介绍)
create_task 将传入的函数封装成task对象,这个操作会将task.__step添加到__ready队列
create_future 返回Future对象 新增
Handle 所有的任务进入待执行队列(Eventloop.call_soon)之前都会封装成Handle对象
__init__ 初始化两个重要对象 self._callbackself._args
self._callback 待执行函数主体
self._args 待执行函数参数
_run 待执行函数执行
TimerHandle 带时间戳的对象 新增
__init__ 初始化重要对象 self._when 新增
self._when 调度的时间 新增
__lt__``__gt__``__eq__ 一系列魔术方法,实现对象比较 新增
get_event_loop 获取当前线程的事件循环
fake_socket 创建一对套接字对象,并且将一种一条套接字注册到多路复用对象sel,返回sel 新增
_complete_eventloop 将事件循环的_stopping标志置位True
run 入口函数
gather 可以同时执行多个任务的入口函数
_GatheringFuture 将每一个任务组成列表,封装成一个新的类
sleep 入口函数 新增

tasks.py

类/函数 方法 对象 作用 描述
Task 继承自Future,主要用于整个协程运行的周期
__init__ 初始化对象 self._coro ,并且call_soonself.__step加入self._ready队列
self._coro 用户定义的函数主体
__step Task类的核心函数
__wakeup 唤醒任务
ensure_future 如果对象是一个Future对象,就返回,否则就会调用create_task返回,并且加入到_ready队列

futures.py

类/函数 方法 对象 作用 描述
Future 主要负责与用户函数进行交互
__init__ 初始化两个重要对象 self._loopself._callbacks
self._loop 事件循环
self._callbacks 回调队列,任务暂存队列,等待时机成熟(状态不是PENDING),就会进入_ready队列
add_done_callback 添加任务回调函数,状态_PENDING,就虎进入_callbacks队列,否则进入_ready队列
set_result 获取任务执行结果并存储至_result,将状态置位_FINISH,调用__schedule_callbacks
__schedule_callbacks 将回调函数放入_ready,等待执行
result 获取返回值
__await__ 使用await就会进入这个方法
__iter__ 使用yield from就会进入这个方法
set_result_unless_cancelled 其实就是Future.set_result,只不过调用场景与调用方式不一样 新增

新加了很多的函数,后面我们边走流程,边讲解他们的用途

3)执行过程

3.1)入口函数

main.py


if __name__ == "__main__":
ret = wilsonasyncio.run(helloworld())
print(ret)

3.2)事件循环启动,同gather,不再赘述

3.3)第一次循环run_forever --> run_once,同gather,不再赘述

3.3.1)gather完成,回到helloworld(),同gather,不再赘述

3.4)第二次循环run_forever --> run_once,从这里开始,不一样的地方来了

  • 从队列中取出数据,此时_ready队列有两个任务,hello() world(),在gather的for循环时添加的
    def run_once(self):
timeout = 0
if not self._ready and self._scheduled:
heapq.heapify(self._scheduled)
when = self._scheduled[0]._when
timeout = min(max(0, when - self.current_time()), 60)
self._selector.select(timeout) end_time = self.current_time()
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
self._ready.append(handle) ntodo = len(self._ready)
for _ in range(ntodo):
handle = self._ready.popleft()
handle._run()
  • run_once做了改动,由于if not self._ready and self._scheduled不满足(self._ready有内容)
  • timeout=0self._selector.select(timeout)不会等待,直接跳过
  • while self._scheduled不满足,不会进入while循环
  • 直接进入handle._run()阶段

上述的函数都没有执行,所以没有分析,在后面执行的时候会详细分析作用

tasks.py

    def __step(self, exc=None):
coro = self._coro
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
super().set_result(exc.value)
else:
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking:
result._asyncio_future_blocking = False
result.add_done_callback(self.__wakeup, result)
finally:
self = None
  • 经过coro.send(None)回到各自的函数本体中
async def hello():
print('enter hello ...')
await wilsonasyncio.sleep(5)
return 'return hello...' async def world():
print('enter world ...')
await wilsonasyncio.sleep(3)
return 'return world...'
  • 由于hello() world()使用await调用了sleep,我们开看看sleep的源码:
async def sleep(delay, result=None, *, loop=None):
if loop is None:
loop = get_event_loop()
future = loop.create_future()
loop.call_later(delay, set_result_unless_cancelled, future, result)
return await future
  • future = loop.create_future()创建了一个Future对象,随后调用call_later
    def call_later(self, delay, callback, *args):
timer = self.call_at(self.current_time() + delay, callback, *args)
return timer def call_at(self, when, callback, *args):
timer = TimerHandle(when, callback, *args)
heapq.heappush(self._scheduled, timer)
return timer
  • call_later call_at,主要的逻辑:

            a) self.current_time()获取当前的时间线,再加上传入的delayhello(5)``world(3))计算出延时

            b) timer = TimerHandle(when, callback, *args)将延时、callback(即set_result_unless_cancelled)再加上参数封装成TimerHandle对象

            c) heapq.heappush(self._scheduled, timer)将对象推入self._scheduled 队列等待合适的时间调度到self._ready

            d) 第一点需要注意的是,self._scheduled 是一个最小堆

            e) 第二点需要注意的是,TimerHandle 实现了__lt__ __gt__ __eq__,所以会通过self._when 进行对象比较。在本例中,hello()的延时是current_time+5,world()的延时是current_time+3,所以world()会优先调度

  • 返回到sleep

async def sleep(delay, result=None, *, loop=None):
if loop is None:
loop = get_event_loop()
future = loop.create_future()
loop.call_later(delay, set_result_unless_cancelled, future, result)
return await future
  • return await futuregather小节中描述过,一旦调用了await,就会来到__await__
    def __await__(self):
if self._state == _PENDING:
self._asyncio_future_blocking = True
yield self
return self.result()
  • yield self self就是之前创建的future对象,并且返回到当初send的地方,对的,就是Task.__step里面
    def __step(self, exc=None):
coro = self._coro
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
super().set_result(exc.value)
else:
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking:
result._asyncio_future_blocking = False
result.add_done_callback(self.__wakeup, result)
finally:
self = None
  • 由于他们去__await__溜达了一圈,所以_asyncio_future_blocking=True,所以hello() world()的回调函数是self.__wakeup

  • 这一次循环结束了,hello() world()并没有结束,而是分分挂起,等待他们的子任务await wilsonasyncio.sleep执行结束完之后才会唤醒

3.5)第三次循环run_forever --> run_once

eventloops:

    def run_once(self):
timeout = 0
if not self._ready and self._scheduled:
heapq.heapify(self._scheduled)
when = self._scheduled[0]._when
timeout = min(max(0, when - self.current_time()), 60)
self._selector.select(timeout) end_time = self.current_time()
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
self._ready.append(handle) ntodo = len(self._ready)
for _ in range(ntodo):
handle = self._ready.popleft()
handle._run()
  • run_once函数进行了改造
  • self._ready为空,并且self._scheduled队列不为空,里面的内容是hello() world()2个函数中调用await wilsonasyncio.sleep()时添加进self._scheduled,分别是两个TimerHandle对象,并且是按照最小堆排序
  • 由于是按照最小堆排序,直接取出来肯定就是时间最小的,timeout = min(max(0, when - self.current_time()), 60),获取timeout
  • self._selector.select(timeout)核心代码,主要的逻辑就会等待timeout的时间,是整个线程等待,所有的协程任务都会卡主,但是当前队列里面所有的任务都是等待,并且当前的timeout是等待时间最小的。在本例中,timeout就是world()的等待时间:3s

self._selector.select(timeout) 这里要详细描述一下了

  • 核心逻辑就是将一堆等待中的协程任务中,选一个等待时间最小的出来,然后线程sleep
  • 这时的sleep可以用粗暴的time.sleep,但是我们选择了self._selector.select(timeout),主要_selector在后面的应用中发挥了巨大的作用,只是在这里看不出来
  • self._selector是一个多路复用对象,就是我们熟悉的select() poll() epoll(),我们将一条socket注册到对象上面,然后调用self._selector.select(timeout),整个线程就会在timeout期间,监听socket是否有新的数据,一旦socket有新的数据,就会立即执行回调函数
  • 那socket对象是怎么来的,event_loop初始化的时候,会通过socket.socketpair()创建一对socket,并且把它注册进self._selector,所以self._selector.select(timeout)会监听socket是否有读写事件
  • sel.register(_ssock.fileno(), selectors.EVENT_READ, None)当然,我们的回调函数是None,没有注册任何回调,在本例中,我们只需要它的等待功能
  • 肯定会有人问了,那如果这时候出现了一个需要立即执行的任务,怎么办?这是后面解答的问题,在后面的轮子中肯定会解答这个问题,而在本例中,不可能会出现这种情况(如果是立即执行的任务,那么一定出现在self._ready队列里面,在第一次循环就已经执行完成了)

简单解释了一下,我们喝一口水,继续。。。

  • end_time = self.current_time()记录一下当前的时间
        while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
self._ready.append(handle)
  • _when大于当前时间的协程任务取出来,放入self._ready队列,等待下一次循环调度
  • _when小于当前事件的协程任务留在self._scheduled ,他们还需要继续睡觉
  • 全部搞完,继续往下走
        ntodo = len(self._ready)
for _ in range(ntodo):
handle = self._ready.popleft()
handle._run()
  • 熟悉的流程,handleworld()sleep,执行其回调函数set_result_unless_cancelled,就是set_result
  • __wakeup加入到self._ready队列,world()等待被唤醒

3.6)第四次循环run_forever --> run_once

if not self._ready and self._scheduled:
  • timeout=0, self._ready有协程任务,不会等待
  • __wakeup --> __step --> send(None) --> 回到 world()
async def world():
print('enter world ...')
await wilsonasyncio.sleep(3)
print('world sleep end...')
return 'return world...'
  • world()协程完成,将回调_done_callback加入到self._ready队列,同gather

3.7)第五次循环run_forever --> run_once

  • gather._done_callback(),由于条件不满足,不会添加回调到self._ready,同gather

3.8)第六次循环run_forever --> run_once

        if not self._ready and self._scheduled:
heapq.heapify(self._scheduled)
when = self._scheduled[0]._when
timeout = min(max(0, when - self.current_time()), 60)
  • timeout = min(max(0, when - self.current_time()), 60),由于刚才等了3s,hello()delay是5s,所以这里timeout=2(这之间还有很多代码的执行时间,不过执行速度非常快)

  • self._selector.select(timeout)等待2s之后,重复了world()的流程

  • handlehello()sleep,执行其回调函数set_result_unless_cancelled,就是set_result

  • __wakeup加入到self._ready队列,hello()等待被唤醒

  • results.append(res)将子任务的结果取出来,放进父任务的results里面

  • 子任务执行完成,终于到了唤醒父任务的时候了task.__wakeup

    def __wakeup(self, future):
try:
future.result()
except Exception as exc:
raise exc
else:
self.__step()
self = None

3.8)第七次循环run_forever --> run_once

  • __wakeup --> __step --> send(None) --> 回到 hello()
async def hello():
print('enter hello ...')
await wilsonasyncio.sleep(5)
print('hello sleep end...')
return 'return hello...'
  • hello()协程完成,将回调_done_callback加入到self._ready队列,同gather

3.9)第八次循环run_forever --> run_once

  • gather._done_callback(),由于条件满足,将回调到self._readyhelloworld等待被唤醒。同上一小节gather

3.10)第九次循环run_forever --> run_once

  • 循环结束
  • 回到run

3.11)回到主函数,获取返回值

if __name__ == "__main__":
ret = wilsonasyncio.run(helloworld())
print(ret)

3.12)执行结果

 python3 main.py
enter helloworld
enter hello ...
enter world ...
world sleep end...
hello sleep end...
exit helloworld
['return hello...', 'return world...']

五、流程总结

六、小结

● 无法总结。。。。有问题私信、留言

● 本文中的代码,参考了python 3.7.7中asyncio的源代码,裁剪而来

● 本文中代码:代码


至此,本文结束

在下才疏学浅,有撒汤漏水的,请各位不吝赐教...

更多文章,请关注我:wilson.chai

最新文章

  1. python网络爬虫 新浪博客篇
  2. DOM操作
  3. PHP之:析构函数
  4. Distinct
  5. Machine Learning - 第6周(Advice for Applying Machine Learning、Machine Learning System Design)
  6. Python异步IO --- 轻松管理10k+并发连接
  7. html+CSS--水平居中设置(定宽块状元素)
  8. JPG各种输入框样式
  9. Char Tools,方便的字符编码转换小工具
  10. 三个重要的游标sp_cursoropen
  11. OCP-1Z0-051-题目解析-第11题
  12. grep的用法笔记
  13. Spring事务管理的实现方式:编程式事务与声明式事务
  14. git上传遇到 GitHub could not read Username 的解决办法
  15. 并发库应用之十 & 多线程数据交换Exchanger应用
  16. HDU--1540 Tunnel Warfare(线段树区间更新)
  17. Java实现单词树(trie)
  18. FlexViewer之整体框架解析
  19. 将python环境打包成.txt文件
  20. pycharm 调试 scrapy

热门文章

  1. 如何美化 Matplotlib 的工具栏和绘图风格
  2. git 初始化本地项目并推送到远程
  3. Android App发布遇到的问题总结【转】
  4. div置顶
  5. redis集群升级,数据迁移及校验
  6. Redis性能管理
  7. vue从后台拿数据渲染页面图片
  8. Python中类的多层继承和多重继承
  9. python中随机生成整数
  10. 数字化转型——医院数字化管理平台HDMP建设历程