这一篇是高级编程的最后一篇了,学完整套课程之后对Python
的知识很多地方有了豁然开朗的感觉。如果你看到了这个系列的文章觉得不错的话,可以购买老师的原版视频文件,进行学习哦!购买链接:Python高级编程和异步IO并发编程
asyncio
是Python
中解决异步I/O高并发的一个模块,在3.4版本之后引入。
asyncio的事件循环
我们先看下asyncio
有哪些功能:
包含各种特定系统实现的模块化事件循环(针对不同系统都能兼容的事件循环:例如Windows下的
select
,linux下的epoll
。)传输和协议抽象(对TCP和UDP协议的抽象)
对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持
- 模仿
futures
模块但适用于事件循环使用的Future
类 基于
yield from
的协议和任务,可以让我们使用顺序的方式编写并发代码必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件迁移到线程池
- 模仿
threading
模块中的同步原语,可以用在单线程内的协程之间
前面我们学习了协程,但是协程脱离事件循环意义就不是很大了。
下面我们开始学习asyncio
的使用吧!💪
首先我们明确一点,高并发异步IO编程的编码模式由三部分组成:
事件循环+回调(驱动生成器)+epoll
(IO多路复用)
asyncio
是Python
用于解决异步io编程的一整套解决方案
有趣的小知识:
tornado
也是基于asyncio
的异步框架,通过协程和事件循环来完成高并发。相对于Django
和Flask
这种传统的阻塞IO框架本身不提供web服务器,不会去完成Socket
编码的,因此我们在部署的时候会搭配实现了SOcket
编码的框架(uwsgi, gunicorn+nginx)。Tornado
实现了自己的web服务器,因此我们部署Tornado
的时候是可以直接部署的(会使用epoll
来完成socket请求),但是真正部署的时候,还是会使用nginx
来完成一些操作(IP限制等)。因此Tornado
的数据库驱动就不能使用阻塞IO驱动框架了。
asyncio
的简单使用:
协程要搭配事件循环才能使用
1 | import asyncio |
我们可以同时执行多个协程,传入一个可迭代的任务对象
1 | import asyncio |
注意:在协程中不能使用同步的时间睡眠
time.sleep()
,否则当执行的协程超过一个的时候就会出现同步阻塞的情况。
要是哪个小伙伴想测试下上面那句话,可以将上面的代码await asyncio.sleep(2)
改为time.sleep(2)
你会发现运行的时间不再是两秒了,而是20+秒。
为什么不能再协程使用同步的sleep
呢?
这就要说到我们的loop
小朋友了,协程要配合事件循环的,我们在运行协程的时候当遇到await
关键字就知道这是一个异步阻塞操作了,会在此处暂停返回一个Future
对象,然后由loop
小朋友再执行已经可以运行的协程。这样保证了能够异步执行操作。当我们直接在协程中使用sleep
同步操作时候,不会暂停而是一直等待,这就是原因😊
如何获得协程的返回值呢?
1 | import asyncio |
上面的代码还可以这么写
1 | if __name__ == "__main__": |
我们看到使用loop.create_task
和asyncio.ensure_future
是一样的效果,具体区别我们稍后会学习到。💪
有没有小伙伴怀疑,当使用asyncio.ensure_future
的时候是何时和我们创建的loop
建立联系的呢,是在loop.run_until_complete(get_future)
的时候吗?
让我们看下ensure_future
的源码:
1 | def ensure_future(coro_or_future, *, loop=None): |
除了上面直接调用协程,我们还可以在协程执行完成之后进行一个回调。
1 | import asyncio |
我们使用partial
将传入的参数,伪造成一个函数。
回调函数会默认接收一个 future 对象参数
wait和gather
我们上面已经使用了wait
来进行多协程的运行,我们看下它的源码:
1 |
|
这个wait
我们理解为多线程中的wait
,同样存在return_when
参数,可以指定何时返回。
那gather
如何使用呢?
1 | if __name__ == "__main__": |
我们将wait
直接修改为gather
然后可迭代对象加上*
即可。
两者的区别是什么呢?
gather
更加hight-levelgather
可以将协程分组
1 | group1 = [get_html("http://projectsedu.com") for i in range(2)] |
task取消和子协程调用原理
我们先看下run_until_complete
和run_forever
两个函数的区别。
run_until_complete
在运行完指定的协程之后就会停止,而run_forever
则会一直运行。
看下源码:
在图片中我们看到run_until_complete
里面同样使用了run_forever
。但是,增加了一个回调_run_until_complete_cb
:
1 | def _run_until_complete_cb(fut): |
在回调函数中当没有协程运行的时候会将loop
即事件循环直接暂停。
asyncio
会将loop
放到future
中,而future
同样会被放到loop
中。因此我们可以在任何一个任务中停止掉 loop
如何取消协程中的task(future)
1 | import asyncio |
有咩有小伙伴对all_tasks = asyncio.Task.all_tasks()
这句代码疑惑?
我们看了源码就知道了 因为全局只有一个loop
,所以能够在任何位置轻松获得loop
相关的信息。
如何在协程中插入子协程
我们看一段官方文档的代码:
官方文档叫chain coroutines
链式协程?
1 | import asyncio |
compute()
is chained to print_sum()
: print_sum()
coroutine waits until compute()
is completed before returning its result.
序列图:
图中展示大致意思:当我们运行一个协程的时候,立即创建一个Task
,由EventLoop
驱动Task
,然后Task
驱动print_sum
。当协程中调用了另外一个子协程的时候,是直接由Task
和子协程通信的。直至子协程运行完毕抛出StopIteration
异常,然后父协程会捕捉到异常并提取出结果,父协程运行完毕,同样抛出异常,逐层往上抛出然后终止Task
。重点在于Task
和子协程compute
之间的通道,以及异常抛出拦截。
The “Task” is created by the
AbstractEventLoop.run_until_complete()
method when it gets a coroutine object instead of a task.
意思是,图中的Task
并不是一个任务而是一个协程对象。
The diagram shows the control flow, it does not describe exactly how things work internally. For example, the sleep coroutine creates an internal future which uses
AbstractEventLoop.call_later()
to wake up the task in 1 second.
意思是,图大致讲了如何在协程中调用子协程,但是内部实现没有体现出来。
例如:当调用asyncio.sleep(1.0)
的时候会创建一个内部的future
对象然后使用 AbstractEventLoop.call_later()
在一秒后唤醒任务。
asyncio中的其他函数
call_soon 函数
1 | import asyncio |
call_later 函数
1 | import asyncio |
从输出看出 call_later
并不是根据添加的顺序执行的 而是根据延迟的时间。
为了进一步比较call_later
和call_soon
的区别我们看下下面代码的输出
1 | if __name__ == '__main__': |
我们看到call_soon
执行是比call_later
要早的 是下个循环立即执行
call_at函数
call_at
函数可以让我们指定时间运行回调函数,这里的时间是 loop
里面的时间 不是传统的时间
1 | if __name__ == '__main__': |
call_soon_threadsafe 函数
这是一个线程安全的函数 作用和 call_soon
一样
asyncio
是可以在多线程环境下运行的,asyncio
是一整套的异步IO解决方案,不仅可以解决协程调度问题,还可以解决线程、进程问题。
1 | def call_soon_threadsafe(self, callback, *args): |
当我们在多线程中 多个回调函数使用了一个变量 可以使用这个来保证线程安全