asyncio并发编程-上

这一篇是高级编程的最后一篇了,学完整套课程之后对Python的知识很多地方有了豁然开朗的感觉。如果你看到了这个系列的文章觉得不错的话,可以购买老师的原版视频文件,进行学习哦!购买链接:Python高级编程和异步IO并发编程

asyncioPython中解决异步I/O高并发的一个模块,在3.4版本之后引入。

asyncio的事件循环

我们先看下asyncio有哪些功能:

  1. 包含各种特定系统实现的模块化事件循环(针对不同系统都能兼容的事件循环:例如Windows下的select,linux下的epoll。)

  2. 传输和协议抽象(对TCP和UDP协议的抽象)

  3. 对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持

  4. 模仿futures模块但适用于事件循环使用的Future
  5. 基于yield from的协议和任务,可以让我们使用顺序的方式编写并发代码

  6. 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件迁移到线程池

  7. 模仿threading模块中的同步原语,可以用在单线程内的协程之间

前面我们学习了协程,但是协程脱离事件循环意义就不是很大了。

下面我们开始学习asyncio的使用吧!💪

首先我们明确一点,高并发异步IO编程的编码模式由三部分组成:

事件循环+回调(驱动生成器)+epoll(IO多路复用)

asyncioPython用于解决异步io编程的一整套解决方案

有趣的小知识:

tornado也是基于asyncio的异步框架,通过协程和事件循环来完成高并发。相对于DjangoFlask这种传统的阻塞IO框架本身不提供web服务器,不会去完成Socket编码的,因此我们在部署的时候会搭配实现了SOcket编码的框架(uwsgi, gunicorn+nginx)。Tornado实现了自己的web服务器,因此我们部署Tornado的时候是可以直接部署的(会使用epoll来完成socket请求),但是真正部署的时候,还是会使用nginx来完成一些操作(IP限制等)。因此Tornado的数据库驱动就不能使用阻塞IO驱动框架了。

asyncio的简单使用:

协程要搭配事件循环才能使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import time


async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")

if __name__ == "__main__":
start_time = time.time()

# 我们使用 asyncio 实现的事件循环 这个loop就可完成 之前我们自己实现的 事件循环 select 的操作
loop = asyncio.get_event_loop()

# 可以使用 run_until_complete 进行协程的调用 这是一个阻塞函数 可以理解为多线程编程中的jion方法 然后把 asyncio理解为协程池
loop.run_until_complete(get_html("http://www.imooc.com"))
print(time.time()-start_time)

# 输出
start get url
end get url
2.0019102096557617

我们可以同时执行多个协程,传入一个可迭代的任务对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import time


async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")

if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()

# 这个 tasks 可以是不同的协程
tasks = [get_html("http://www.imooc.com") for i in range(10)]

# asyncio.wait()函数会接收一个可迭代对象
loop.run_until_complete(asyncio.wait(tasks))
print(time.time()-start_time)

# 输出就不打印了 耗时大概两秒

注意:在协程中不能使用同步的时间睡眠 time.sleep(),否则当执行的协程超过一个的时候就会出现同步阻塞的情况。

要是哪个小伙伴想测试下上面那句话,可以将上面的代码await asyncio.sleep(2)改为time.sleep(2)你会发现运行的时间不再是两秒了,而是20+秒。

为什么不能再协程使用同步的sleep呢?

这就要说到我们的loop小朋友了,协程要配合事件循环的,我们在运行协程的时候当遇到await关键字就知道这是一个异步阻塞操作了,会在此处暂停返回一个Future对象,然后由loop小朋友再执行已经可以运行的协程。这样保证了能够异步执行操作。当我们直接在协程中使用sleep同步操作时候,不会暂停而是一直等待,这就是原因😊

如何获得协程的返回值呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import time


async def get_html(url):
print("start get url")
await asyncio.sleep(2)
return "红烧肉"


if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()

# 这里使用 asyncio.ensure_future 来获得一个future对象 是不是很像多线程编程中的 submit
get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))

# 也可以使用 loop 的 create_task 两者用法一样
# task = loop.create_task()
# task 是 future 的子类

# 可以将future 对象传入到 run_until_complete
loop.run_until_complete(get_future)
# 通过 future 对象的 result函数获得结果
print(get_future.result())

# 输出
start get url
红烧肉

上面的代码还可以这么写

1
2
3
4
5
6
7
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
task = loop.create_task(get_html("http://www.imooc.com"))
loop.run_until_complete(task)
print(task.result())

我们看到使用loop.create_taskasyncio.ensure_future是一样的效果,具体区别我们稍后会学习到。💪

有没有小伙伴怀疑,当使用asyncio.ensure_future的时候是何时和我们创建的loop建立联系的呢,是在loop.run_until_complete(get_future)的时候吗?

让我们看下ensure_future的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def ensure_future(coro_or_future, *, loop=None):
"""Wrap a coroutine or an awaitable in a future.

If the argument is a Future, it is returned directly.
"""
if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
elif coroutines.iscoroutine(coro_or_future):
# 看这里 当没有loop传入的时候,会获得当前loop 因为线程中只有这个一个 loop 这里启动loop和外层代码的loop是同一个loop
if loop is None:
loop = events.get_event_loop()

# 我们看到 内部同样是使用 create_task
task = loop.create_task(coro_or_future)
if task._source_traceback:
del task._source_traceback[-1]
return task
elif compat.PY35 and inspect.isawaitable(coro_or_future):
return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
else:
raise TypeError('A Future, a coroutine or an awaitable is required')

除了上面直接调用协程,我们还可以在协程执行完成之后进行一个回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import time
from functools import partial


async def get_html(url):
print("start get url")
await asyncio.sleep(2)
return "红烧肉"


# 当我们想要在 回调函数中传递参数的时候 注意 future 参数写在最后
def callback(url, future):
print(url)
print("send email to 红烧肉")


if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
# get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
task = loop.create_task(get_html("http://www.imooc.com"))

task.add_done_callback(partial(callback, "http://www.imooc.com"))

loop.run_until_complete(task)

print(task.result())

我们使用partial将传入的参数,伪造成一个函数。

回调函数会默认接收一个 future 对象参数

wait和gather

我们上面已经使用了wait来进行多协程的运行,我们看下它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED


@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures and coroutines given by fs to complete.

The sequence futures must not be empty.

Coroutines will be wrapped in Tasks.

Returns two sets of Future: (done, pending).

Usage:

done, pending = yield from asyncio.wait(fs)

Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError('Invalid return_when value: {}'.format(return_when))

if loop is None:
loop = events.get_event_loop()

fs = {ensure_future(f, loop=loop) for f in set(fs)}

return (yield from _wait(fs, timeout, return_when, loop))

这个wait我们理解为多线程中的wait,同样存在return_when参数,可以指定何时返回。

gather如何使用呢?

1
2
3
4
5
6
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("http://www.imooc.com") for i in range(10)]
loop.run_until_complete(asyncio.gather(*tasks))
print(time.time()-start_time)

我们将wait直接修改为gather然后可迭代对象加上*即可。

两者的区别是什么呢?

  1. gather更加hight-level
  2. gather可以将协程分组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
group1 = [get_html("http://projectsedu.com") for i in range(2)]
group2 = [get_html("http://www.imooc.com") for i in range(2)]

# 我们可以分组传递
loop.run_until_complete(asyncio.gather(*group1, *group2))

# 我们可以将先进行gather操作
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)
loop.run_until_complete(asyncio.gather(group1, group2))

# 我们可以批量取消某个分组
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)

group2.cancel()
task取消和子协程调用原理

我们先看下run_until_completerun_forever两个函数的区别。

run_until_complete在运行完指定的协程之后就会停止,而run_forever则会一直运行。

看下源码:

image-20180927141443200

在图片中我们看到run_until_complete里面同样使用了run_forever。但是,增加了一个回调_run_until_complete_cb

1
2
3
4
5
6
7
def _run_until_complete_cb(fut):
exc = fut._exception
if (isinstance(exc, BaseException) and not isinstance(exc, Exception)):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
fut._loop.stop()

在回调函数中当没有协程运行的时候会将loop即事件循环直接暂停。

asyncio会将loop放到future中,而future同样会被放到loop中。

因此我们可以在任何一个任务中停止掉 loop

如何取消协程中的task(future)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio


async def get_html(sleep_times):
print("waiting")
await asyncio.sleep(sleep_times)
print("done after {}s".format(sleep_times))


if __name__ == "__main__":
task1 = get_html(2)
task2 = get_html(3)
task3 = get_html(3)

tasks = [task1, task2, task3]

loop = asyncio.get_event_loop()

try:
loop.run_until_complete(asyncio.wait(tasks))
# 我们发送一个 controle + c 异常
except KeyboardInterrupt as e:
# 获得所有的task
all_tasks = asyncio.Task.all_tasks()
for task in all_tasks:
print("cancel task")

# 将task 取消 返回布尔值
print(task.cancel())

# 先将 loop 暂停
loop.stop()

# 记得将 loop 再次运行 run_forever 否则将报错
loop.run_forever()
finally:
# 最后 关闭 loop
loop.close()

有咩有小伙伴对all_tasks = asyncio.Task.all_tasks()这句代码疑惑?

image-20180927144138030

我们看了源码就知道了 因为全局只有一个loop,所以能够在任何位置轻松获得loop相关的信息。

如何在协程中插入子协程

我们看一段官方文档的代码:

官方文档叫chain coroutines链式协程?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y

async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

compute() is chained to print_sum(): print_sum() coroutine waits until compute() is completed before returning its result.

序列图:

image-20180927145609440

图中展示大致意思:当我们运行一个协程的时候,立即创建一个Task,由EventLoop驱动Task,然后Task驱动print_sum。当协程中调用了另外一个子协程的时候,是直接由Task和子协程通信的。直至子协程运行完毕抛出StopIteration异常,然后父协程会捕捉到异常并提取出结果,父协程运行完毕,同样抛出异常,逐层往上抛出然后终止Task。重点在于Task和子协程compute之间的通道,以及异常抛出拦截。

The “Task” is created by the AbstractEventLoop.run_until_complete() method when it gets a coroutine object instead of a task.

意思是,图中的Task并不是一个任务而是一个协程对象。

The diagram shows the control flow, it does not describe exactly how things work internally. For example, the sleep coroutine creates an internal future which uses AbstractEventLoop.call_later() to wake up the task in 1 second.

意思是,图大致讲了如何在协程中调用子协程,但是内部实现没有体现出来。

例如:当调用asyncio.sleep(1.0)的时候会创建一个内部的future对象然后使用 AbstractEventLoop.call_later() 在一秒后唤醒任务。

asyncio中的其他函数
call_soon 函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio


def callback(sleep_times):
print(f"success time {sleep_times}")


def stoploop(loop):
loop.stop()


if __name__ == '__main__':
loop = asyncio.get_event_loop()

# 这里传入的是函数名称 不是协程 因为很多时候 我们希望在循环体系中插入一个函数
# call_soon 是即刻执行 比不是下一行代码执行 而是等到下一个循环的时候执行
loop.call_soon(callback, 2)

# 停止时间循环
loop.call_soon(stoploop, loop)

# 因为我们传入的不是协程 而是函数 因此启动要使用 run_forever
loop.run_forever()
call_later 函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio


def callback(sleep_times):
print(f"success time {sleep_times}")


def stoploop(loop):
loop.stop()


if __name__ == '__main__':
loop = asyncio.get_event_loop()

# call_later 是延迟调用
loop.call_later(2, callback, 2)
loop.call_later(1, callback, 1)
loop.call_later(3, callback, 3)

loop.run_forever()

# 输出
success time 1
success time 2
success time 3

从输出看出 call_later并不是根据添加的顺序执行的 而是根据延迟的时间。

为了进一步比较call_latercall_soon的区别我们看下下面代码的输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if __name__ == '__main__':
loop = asyncio.get_event_loop()

# call_later 是延迟调用
loop.call_later(2, callback, 2)
loop.call_later(1, callback, 1)
loop.call_later(3, callback, 3)

loop.call_soon(callback, 4)

loop.run_forever()
# 输出

success time 4
success time 1
success time 2
success time 3

我们看到call_soon执行是比call_later要早的 是下个循环立即执行

call_at函数

call_at函数可以让我们指定时间运行回调函数,这里的时间是 loop里面的时间 不是传统的时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if __name__ == '__main__':
loop = asyncio.get_event_loop()

# 获得loop的当前时间
loop_time = loop.time()

# 使用 call_at 在 当前时间的基础上 延迟几秒执行回调
loop.call_at(loop_time + 2, callback, 2)
loop.call_at(loop_time + 1, callback, 1)
loop.call_at(loop_time + 3, callback, 3)

loop.call_soon(callback, 4)

loop.run_forever()

# 输出
success time 4
success time 1
success time 2
success time 3
call_soon_threadsafe 函数

这是一个线程安全的函数 作用和 call_soon一样

asyncio是可以在多线程环境下运行的,asyncio是一整套的异步IO解决方案,不仅可以解决协程调度问题,还可以解决线程、进程问题。

1
2
3
4
5
6
7
8
9
10
11
def call_soon_threadsafe(self, callback, *args):
"""Like call_soon(), but thread-safe."""
self._check_closed()
if self._debug:
self._check_callback(callback, 'call_soon_threadsafe')
handle = self._call_soon(callback, args)
if handle._source_traceback:
del handle._source_tracebac
# 又这个函数实现线程安全的
self._write_to_self()
return handle

当我们在多线程中 多个回调函数使用了一个变量 可以使用这个来保证线程安全

知识就是财富
如果您觉得文章对您有帮助, 欢迎请我喝杯水!