先来看一下一个简单的例子

例1:

async def foo():
print('enter foo ...')
await bar()
print('exit foo ...') async def bar():
print('enter bar ...')
print('exit bar ...') f = foo()
try:
f.send(None)
except StopIteration as e:
print(e.value)

例2:

async def foo():
print('enter foo ...')
try:
bar().send(None)
except StopIteration as e:
pass
print('exit foo ...') async def bar():
print('enter bar ...')
print('exit bar ...') f = foo()
try:
f.send(None)
except StopIteration as e:
print(e.value)

也就是说 await bar() 等价于这个

try:
bar().send(None)
except StopIteration as e:
pass

更进一步来讲,await 协程的嵌套就跟函数调用一样,没什么两样。

def foo():
print('enter foo ...')
bar()
print('exit foo ...') def bar():
print('enter bar ...')
print('exit bar ...') foo()

理解了跟函数调用一样就可以看看成是这样:

执行f.send(None)时其实就是执行

print('enter foo ...')		

    print('enter bar ...')
print('exit bar ...') print('exit foo ...')

例3:

class Future:

    def __iter__(self):
print('enter Future ...')
yield self
print('foo 恢复执行')
print('exit Future ...') __await__ = __iter__ async def foo():
print('enter foo ...')
await bar()
print('exit foo ...') async def bar():
future = Future()
print('enter bar ...')
await future
print('exit bar ...') f = foo()
try:
f.send(None)
print('foo 挂起在yield处 ')
print('--'*10)
f.send(None)
except StopIteration as e:
print(e.value)

执行结果:

enter foo ...
enter bar ...
enter Future ...
foo 挂起在yield处
--------------------
foo 恢复执行
exit Future ...
exit bar ...
exit foo ...
None

Future是一个Awaitable对象,实现了__await__方法,await future 实际上是会进入到future.__await__方法中也就是future.__iter__方法中的逻辑,执行到 yield self 处foo协程才真正被挂起,返回future对象本身,f.send(None)才真正的执行完毕,

  • 第一次调用f.send(None),执行:

      print('enter foo ...')
    print('enter bar ...')
    print('enter Future ...')

被挂起

  • 第二次调用f.send(None),执行:

      print('exit Future ...')
    print('exit bar ...')
    print('exit foo ...')

也就是说这样一个foo协程完整的调用过程就是如下过程:

- foo print('enter foo ...')
- bar print('enter bar ...')
- future print('enter Future ...')   # 以上是第一次f.send(None)执行的逻辑,命名为part1
- future yield self ---------------------------------------------------------------
- future print('exit Future ...')   # 以下是第二次f.send(None)执行的逻辑,命名为part2
- bar print('exit bar ...')
- foo print('exit foo ...')

加入我们把这两次f.send(None)调用的逻辑分别命名成part1和part2,那也就是说,通过future这个对象,准确的说是yield关键字,真正的把foo协程要执行的完整逻辑分成了两部分part1和patr2。并且foo的协程状态会被挂起在yield处,这样就要调用两次f.send(None)才能,执行完foo协程,而不是在例2中,直接只调用一次f.send(None)就执行完了foo协程。这就是Future对象的作用。

重点:没有 await future 的协程是没有灵魂的协程,并不会被真正的挂起,只需一次 send(None) 调用即可执行完毕,只有有 await future

的协程才是真正可以被挂起的协程,需要执行两次 send(None) 才能执行完该协程的完整逻辑。

这里小结一下Future的作用

  1. yield 起到了挂起协程的作用。

  2. 通过 yield 把 foo 协程的执行逻辑真正的分成了 part1 和 part2 两部分。

例4:

class Future:

    def __iter__(self):
print('enter Future ...')
print('foo 挂起在yield处 ')
yield self
print('foo 恢复执行')
print('exit Future ...')
return 'future' __await__ = __iter__ class Task: def __init__(self, cor):
self.cor = cor def _step(self):
cor = self.cor
try:
result = cor.send(None)
except Exception as e:
pass async def foo():
print('enter foo ...')
await bar()
print('exit foo ...') async def bar():
future = Future()
print('enter bar ...')
await future
print('exit bar ...') f = foo() task = Task(f)
task._step()
print('--' * 10)
task._step()

执行结果:

enter foo ...
enter bar ...
enter Future ...
foo 挂起在yield处
--------------------
foo 恢复执行
exit Future ...
exit bar ...
exit foo ...

这个例4与例3不同在于,现在有一个Task类,我们把f.send(None)d的操作,封装在了 Task 的 _step 方法中,调用 task._step() 等于是执行 part1 中的逻辑,再次调用

task._step() 等于是执行part2中的逻辑。现在不想手动的 两次调用task._step() 方法,我们写一个简单的Loop类来帮忙完成对task._step的多次调用,请看下面这个例子。

例5:

class Future:

    def __iter__(self):
print('enter Future ...')
print('foo 挂起在yield处 ')
yield self
print('foo 恢复执行')
print('exit Future ...')
return 'future' __await__ = __iter__ class Task: def __init__(self, cor, *, loop=None):
self.cor = cor
self._loop = loop def _step(self):
cor = self.cor
try:
result = cor.send(None)
except StopIteration as e:
self._loop.close()
except Exception as e:
pass class Loop: def __init__(self):
self._stop = False def create_task(self, cor):
task = Task(cor, loop = self)
return task def run_until_complete(self, task):
while not self._stop:
task._step() def close(self):
self._stop = True async def foo():
print('enter foo ...')
await bar()
print('exit foo ...') async def bar():
future = Future()
print('enter bar ...')
await future
print('exit bar ...') if __name__ == '__main__': f = foo()
loop = Loop()
task = loop.create_task(f)
loop.run_until_complete(task)

执行结果:

enter foo ...
enter bar ...
enter Future ...
foo 挂起在yield处
foo 恢复执行
exit Future ...
exit bar ...
exit foo ...

例5中我们实现了一个简单 Loop 类,在while循环中调用task._step方法。

例6:

class Future:

    def __init__(self, *, loop=None):
self._result = None
self._callbacks = [] def set_result(self, result):
self._result = result
callbacks = self._callbacks[:]
self._callbacks = []
for callback in callbacks:
loop._ready.append(callback) def add_callback(self, callback):
self._callbacks.append(callback) def __iter__(self):
print('enter Future ...')
print('foo 挂起在yield处 ')
yield self
print('foo 恢复执行')
print('exit Future ...')
return 'future' __await__ = __iter__ class Task: def __init__(self, cor, *, loop=None):
self.cor = cor
self._loop = loop def _step(self):
cor = self.cor
try:
result = cor.send(None)
# 1. cor 协程执行完毕时,会抛出StopIteration,说明cor执行完毕了,这是关闭loop
except StopIteration as e:
self._loop.close()
# 2. 有异常时
except Exception as e:
"""处理异常逻辑"""
# 3. result为Future对象时
else:
if isinstance(result, Future):
result.add_callback(self._wakeup)
# 立即调用,让下一loop轮循环中立马执行self._wakeup
result.set_result(None) def _wakeup(self):
self._step() class Loop: def __init__(self):
self._stop = False
self._ready = []
def create_task(self, cor):
task = Task(cor, loop = self)
self._ready.append(task._step)
return task def run_until_complete(self, task): assert isinstance(task, Task) while not self._stop:
n = len(self._ready)
for i in range(n):
step = self._ready.pop()
step()
def close(self):
self._stop = True async def foo():
print('enter foo ...')
await bar()
print('exit foo ...') async def bar():
future = Future(loop=loop)
print('enter bar ...')
await future
print('exit bar ...') if __name__ == '__main__': f = foo()
loop = Loop()
task = loop.create_task(f)
loop.run_until_complete(task)

执行结果:

enter foo ...
enter bar ...
enter Future ...
foo 挂起在yield处
foo 恢复执行
exit Future ...
exit bar ...
exit foo ...

在例6中,我们构建了3个稍微复杂点类,Loop类,Task, Future类,这3个类在整个协程执行流程的调度过程中有很强的相互作用关系。

  • Future

    挂起协程的执行流程,把协程的逻辑分为part1和part2两部分。

  • Task

    把协程的part1和part2逻辑封装到task._step和task._wakeup方法中,在不同的时机分别把它们注册到loop对象中,task._step是创建task实例的时候就注册到了loop中,task._wakeup则是在task._setp执行完挂在yield future处,由于有await future语句的存在,必然是返回一个future对象,判断确实是一个future对象,就把task._wakeup注册到future中,future.set_result()则会在合适的时机被调用,一旦它被调用,就会把future中注册的task._wakeup注册到loop中,然后就会在loop循环中调用task._wakeup,协程的part2的逻辑才得以执行,最后抛出StopIteration异常。

  • Loop

    在一个死循环中执行注册到loop中的task._step和task._wakeup方法,完成对协程完整逻辑的执行。

虽然我们自己构建的这三个类的实现很简单,但是这体现asyncio实现事件循环的核心原理,我们实现loop中并没有模拟耗时等待以及对真正IO事件的监听,对应于asyncio来说,它也是构建了Future, Task, Loop这3个类,只是功能要比我们自己构建的要复杂得多,loop对象的while中通过select(timeout)函数的调用实现模拟耗时操作和实现了对网络IO事件的监听,这样我们只要在写了一个执行一个IO操作时,都会有一个future对象 await future,通过future来挂起当前的协程,比如想进行一个socket连接,协程的伪代码如下:

future = Future
# 非阻塞调用,需要try...except...
socket.connect((host, port))
# 注册一个回调函数到write_callbackselect中,只要socket发生可写事件,就执行回调
add_writer(write_callback, future)
await future
...

当我们在调用socket.connect((host, port)),因为是非阻塞socket,会立马返回,然后把这个write_callback, future注册成select的可写事件的回调函数,这个回调函数什么时候被执行呢,就是在loop循环的select(timeout)返回了可写事件时才会触发,回调函数中会调用future.set_result(),也就是说future.set_result的触发时机是在socket连接成功时,select(timeout)返回了可写事件时,future.set_result的作用就是把协程的part2部分注册到loop,然后在下一轮的循环中立即调用,使得协程的await future下面的语句得以继续执行。

由于我这里没有贴asyncio的loop,task,future对象的源码,所以这个例子看起来会很抽象,在上一篇asyncio中贴了这几个类的源码,想详细了解的可以查看我的上一篇文章《asyncio系列之简单协程的基本执行流程分析》。小伙伴们也可以对照着asyncio的源码来debug,这样再来理解这里说的这个例子就比较容易了。

下一篇将介绍asyncio.sleep()的实现机制。

最新文章

  1. ios block中引用self
  2. AD采样问题总结
  3. Android(java)学习笔记93:Android布局详解之一:FrameLayout
  4. node 与php整合
  5. 关于C#重写,隐藏的一些事
  6. 配置suricata
  7. 深入理解HTTP协议—HTTP协议详解(真的很经典)
  8. 记一次内存溢出的分析经历——thrift带给我的痛orz
  9. anndroid 模糊引导界面
  10. Nginx 提示host not found in upstream 错误解决方法
  11. 解决Hibernate:could not initialize proxy - no Session(申明:来源于网络)
  12. GitHub 翻译之 'Hello-world' 翻译
  13. WebSphere概要文件的创建与删除
  14. 从Excel文件中读取内容
  15. linux 2.6.32文件系统的dentry父子关系
  16. centos7 Minimal安装没有ifconfig
  17. Open Flash Chart 之线图(二)
  18. Diffie-Hellman 密钥交换
  19. springMVC中接收请求参数&&数据转发
  20. Percona-Toolkit工具包之pt-archiver

热门文章

  1. C#: Get current keyboard layout\input language
  2. 3ds Max建模,Blend设计,VS2008控制WPF的3D模型例子
  3. Get and Post(Unity3D六个发展)
  4. 联想K860 ROM 从官方Vibe 精简 提高性能
  5. symfony中doctrine常用属性
  6. 关于Hibernate中hbm2java和hbm2ddl工具
  7. IDEA 自动化配置
  8. ELINK编程器典型场景之序列号烧写
  9. 使用NEWSEQUENTIALID解决GUID聚集索引问题
  10. 内存页面的各种属性(就是Read, Write, Execute的组合)