协程和异步IO-下

🐴上我们就要进入到真正的协程概念学习了,这之前我们先了解一下C10M问题。

C10M:如何利用8核心CPU,64G内存,在10gbps的网络上保持1000万并发连接。

通过协程就能解决这类问题。

在我们正式学习协程含义之前,我们先看下面临的问题:

  1. 回调模式编码复杂度高
  2. 同步编程的并发性不高
  3. 多线程编程需要线程间同步(线程间同步我们使用锁Lock机制,但是锁机制会降低并发性能)

如果我们想解决👆的问题,我们应该需要做的是:

  1. 采用同步的方式编写异步的代码

  2. 使用单线程去切换任务

    如何使用单线程切换任务呢,解决这个问题我们又有👇的挑战:

    1. 线程是由操作系统切换的,如果我们声明一个线程操作系统会帮我们自动切换。如果我们想在单线程中切换,就需要我们自己去调度任务。
    2. 实现了单线程中切换是不需要锁🔐机制的(锁的目的是实现线程间的同步,我们在一个线程内进行切换 意味着我们不再需要锁了),并且并发性高。我们类比线程内切换为函数切换,单线程内切换函数性能远高于线程切换,并发性更高。

在传统模式下的函数调用中A->B->C

函数只要执行一次就不会再执行了(即不会再进入函数了),因此是无法进行在函数之间切换。

当我们在单线程中想进行函数切换的时候,需要面临的一个挑战就是:如何实现一个可以暂停的函数,并且在适当的时候恢复该函数的继续执行。

为了解决上面的问题,就出现了协程。

协程的概念

协程:的概念很多。我们可以理解为有多个入口的函数,可以暂停的函数(可以向暂停的地方传入值)。

生成器进阶-sendclosethrow方法

我们知道协程是一个可以暂停的函数,而生成器就是可以暂停的。如何将生成器和协程关联起来呢?

我们先学一下生成器的高级知识吧!

生成器的send方法

我们要学习的第一个知识点就是:

生成器不仅可以产出值,还可以接收值

1
2
3
4
5
6
def gen_func():
# 1. 可以产出值(= 右侧就是产出值)
# 2. 可以接收值(调用方传递进来的值)(= 左侧就是接收值)
html = yield "http://projectsedu.com"
print(html)
yield 2

第二个知识点:

启动生成器有两种方式:调用next()或者send()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def gen_func():
#1. 可以产出值, 2. 可以接收值(调用方传递进来的值)
html = yield "http://projectsedu.com"
print(html)
yield 2

if __name__ == "__main__":
gen = gen_func()
# 在调用send发送非none值之前,我们必须启动一次生成器
# 方式有两种 1. gen.send(None), 2. next(gen)
url = gen.send(None)
# download url
print(url)
html = "红烧肉"
# send 不仅启动了生成器 还将执行到了下一个yield位置
print(gen.send(html))

# 输出
http://projectsedu.com
红烧肉
2

注意send方法的作用:send方法可以传递值进入生成器内部,同时还可以重启生成器执行到下一个yield位置,next是不能传递值的

正是生成器的send方法使暂停了的方法能够重新运行起来,这是生成器变成协程的基础。

生成器的close方法

我们可以使用close方法关闭生成器,关闭之后再次调用生成器就会报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def gen_func():
yield "http://projectsedu.com"
yield 2
yield 3
return "红烧肉"


if __name__ == "__main__":
gen = gen_func()
print(next(gen))
gen.close()
print(next(gen))
# 输出
http://projectsedu.com
Traceback (most recent call last):
File "gen_close.py", line 14, in <module>
print(next(gen))
StopIteration

为什么会出现上面的报错呢?

这就是close方法导致的了,close方法会在调用的yield语句地方抛出一个异常GeneratorExit

而异常GeneratorExit是继承BaseException不是Exception

BaseExceptionException更基础

这个方法我们只要知道会关闭已经创建好的生成器即可。

生成器的throw方法

生成器的throw方法是能够向生成器内部扔进去一个异常的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def gen_func():
try:
yield "http://projectsedu.com"
except Exception as e:
pass
yield 2
yield 3


if __name__ == "__main__":
gen = gen_func()
print(next(gen))
gen.throw(Exception, "download error")
print(next(gen))

扔进去的异常产生的位置是调用throw方法之前的yield位置处。

生成器进阶-yield from

我们通过yield from来实现链式连接的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
my_list = [1, 2, 3]
my_dict = {
"bobby1": "http://projectsedu.com",
"bobby2": "http://www.imooc.com",
}

# yield from iterable
def my_chain(*args, **kwargs):
for my_iterable in args:
yield from my_iterable
# for value in my_iterable:
# yield value


for value in my_chain(my_list, my_dict, range(5, 10)):
print(value)

# 输出
1
2
3
bobby1
bobby2
5
6
7
8
9

yield from并不是简单地yield而是,如果后面是可迭代对象,会依次yield出可迭代对象里面的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def g1(iterable):
# yield 原样返回对象
yield iterable


def g2(iterable):
# yield from 则是迭代输出
yield from iterable


for value in g1(range(10)):
print(value)
for value in g2(range(10)):
print(value)

# 输出

range(0, 10)
0
1
2
3
4
5
6
7
8
9

下面我们看下yield from的真正作用。

1
2
3
4
5
6
7
def g1(gen):
yield from gen


def main():
g = g1()
g.send(None)

我们看下上面代码的关系:

main为调用方,g1为委托生成器,gen为子生成器

yield from会在调用方与子生成器之间建立一个双向通道,两者之间可以直接通信(调用方可以直接唤醒或传递信息给子生成器)

有了这个点,我们在将同步的思维运用到协程里面的时候才有可能。

我们看下老师写的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
final_result = {}


def sale s_sum(pro_name):
total = 0
nums = []
while True:
x = yield
print(pro_name + "销量: ", x)
if not x:
break
total += x
nums.append(x)
return total, nums


def middle(key):
while True:
final_result[key] = yield from sales_sum(key)
print(key + "销量统计完成!!.")


def main():
data_sets = {
"bobby牌面膜": [1200, 1500, 3000],
"bobby牌手机": [28, 55, 98, 108],
"bobby牌大衣": [280, 560, 778, 70],
}
for key, data_set in data_sets.items():
print("start key:", key)
m = middle(key)
m.send(None) # 预激middle协程 相当于打通 子生成器和 main 函数
for value in data_set:
m.send(value) # 给协程传递每一组的值
m.send(None)
print("final_result:", final_result)


if __name__ == '__main__':
main()

为什么要使用yield from呢,为什么不直接在委托生成器中直接使用迭代器呢?

我们看下子生成器单独使用的时候会发生什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def sales_sum(pro_name):
total = 0
nums = []
while True:
x = yield
print(pro_name + "销量: ", x)
if not x:
break
total += x
nums.append(x)
return total, nums


if __name__ == "__main__":
my_gen = sales_sum("bobby牌手机")
my_gen.send(None)
my_gen.send(1200)
my_gen.send(1500)
my_gen.send(3000)
my_gen.send(None)

# 输出

my_gen.send(None)
StopIteration: (5700, [1200, 1500, 3000])

在我们最后发送None的时候报异常但是还是把数据返回了 。

如果我们不想展示出异常而是正常返回数据,需要处理一下异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def sales_sum(pro_name):
total = 0
nums = []
while True:
x = yield
print(pro_name + "销量: ", x)
if not x:
break
total += x
nums.append(x)
return total, nums


if __name__ == "__main__":
my_gen = sales_sum("bobby牌手机")
my_gen.send(None)
my_gen.send(1200)
my_gen.send(1500)
my_gen.send(3000)
try:
my_gen.send(None)
except StopIteration as e:
result = e.value
print(result)
# 输出
bobby牌手机销量: 1200
bobby牌手机销量: 1500
bobby牌手机销量: 3000
bobby牌手机销量: None
(5700, [1200, 1500, 3000])

我们看到,当我们使用yield from的时候,会自动帮我们处理好异常。

下面我们看下yield from的具体实现原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# pep380

# 1. RESULT = yield from EXPR可以简化成下面这样
# 一些说明
"""
_i:子生成器,同时也是一个迭代器
_y:子生成器生产的值
_r:yield from 表达式最终的值
_s:调用方通过send()发送的值
_e:异常对象

"""

_i = iter(EXPR) # EXPR是一个可迭代对象,_i其实是子生成器;
try:
_y = next(_i) # 预激子生成器,把产出的第一个值存在_y中;
except StopIteration as _e:
_r = _e.value # 如果抛出了`StopIteration`异常,那么就将异常对象的`value`属性保存到_r,这是最简单的情况的返回值;
else:
while 1: # 尝试执行这个循环,委托生成器会阻塞;
_s = yield _y # 生产子生成器的值,等待调用方`send()`值,发送过来的值将保存在_s中;
try:
_y = _i.send(_s) # 转发_s,并且尝试向下执行;
except StopIteration as _e:
_r = _e.value # 如果子生成器抛出异常,那么就获取异常对象的`value`属性存到_r,退出循环,恢复委托生成器的运行;
break
RESULT = _r # _r就是整个yield from表达式返回的值。

# 👆就是阐明了yield from是如何解决StopIteration异常的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 上面只是子生成器是一个理想的生成器的单一情况,当情况是下面的任一情况的时候,👆的逻辑就会有问题了。

"""
1. 子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw()和.close()方法;
2. 如果子生成器支持.throw()和.close()方法,但是在子生成器内部,这两个方法都会抛出异常;
3. 调用方让子生成器自己抛出异常
4. 当调用方使用next()或者.send(None)时,都要在子生成器上调用next()函数,当调用方使用.send()发送非 None 值时,才调用子生成器的.send()方法;
"""
_i = iter(EXPR)
try:
_y = next(_i)
except StopIteration as _e:
_r = _e.value
else:
while 1:
try:
_s = yield _y
except GeneratorExit as _e:
try:
_m = _i.close
except AttributeError:
pass
else:
_m()
raise _e
except BaseException as _e:
_x = sys.exc_info()
try:
_m = _i.throw
except AttributeError:
raise _e
else:
try:
_y = _m(*_x)
except StopIteration as _e:
_r = _e.value
break
else:
try:
if _s is None:
_y = next(_i)
else:
_y = _i.send(_s)
except StopIteration as _e:
_r = _e.value
break
RESULT = _r
1
2
3
4
5
6
7
8
9
10
11
12
13
14
看完代码,我们总结一下关键点:

1. 子生成器生产的值,都是直接传给调用方的;调用方通过.send()发送的值都是直接传递给子生成器的;如果发送的是 None,会调用子生成器的__next__()方法,如果不是 None,会调用子生成器的.send()方法;

2. 子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;

3. yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;

4. 如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上 "冒泡"

5. 传入委托生成器的异常里,除了GeneratorExit之外,其他的所有异常全部传递给子生成器
的.throw()方法;如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上 "冒泡"

6. 如果在委托生成器上调用.close()或传入GeneratorExit异常,会调用子生成器的.close()方法,没有的话就不调用。如果在调用.close()的时候抛出了异常,那么就向上 "冒泡",否则的话委托生成器会抛出GeneratorExit异常。

看完这个觉得yield from真的是个好东西啊!

async和await

在3.5之前都是通过生成器来实现协程的。

python为了使语义更加明确,就引入了async和await关键词用于定义原生的协程

1
2
3
4
5
6
7
8
9
10
11
12
13
async def downloader(url):
return "bobby"

async def download_url(url):
# dosomethings
html = await downloader(url)
return html


if __name__ == "__main__":
coro = download_url("http://www.imooc.com")
# next(None) 异常
coro.send(None)

我们通过asyncawait实现的协程是不能使用next激活的。

注意:asyncawait是成对出现的,这async中是不能使用yield的。

我们可以把await理解成yield from

await语句后面的对象其实是一个Awaitable对象。

我们可以使用装饰器,将一个普通函数改变为Awaitable对象。

1
2
3
4
5
6
7
8
async def downloader(url):
return "bobby"
import types


@types.coroutine
def downloader(url):
yield "bobby"

协程是需要运用到事件循环中的,下一小节我们就开始真正使用了。💪

知识就是财富
如果您觉得文章对您有帮助, 欢迎请我喝杯水!