Python中的协程锁

关于协程锁

尽管asyncio库是使用单线程来实现协程的,但是它还是并发的,乱序执行的。可以说是单线程的调度系统,并且由于执行有延时或者I/O中断等因素,每个协程如果同步时,还是得使用一些同步对象来实现。

asyncio库中定义了一个锁对象Lock,它一次只允许一个协程来访问共享的资源,如果多协程访问就会阻塞起来,也就是说如果一个协程没有释放这个锁,别的协程是没有办法访问共享的资源的。

遇到的一个问题

Python 协程在遇到 await 时会显式切换,这种切换类似于 GIL 锁的切换,如果是非原子性操作,就会出现数据或操作顺序与预期不一致的现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
In [5]:  import asyncio

In [6]: a = [0]

In [7]: async def f():
...: await asyncio.sleep(0.00001)
...:

In [8]: async def add():
...: b = a[0]
...: await f()
...: b += 1
...: a[0] = b
...:

In [10]: async def main():
...: coros = [add() for _ in range(100)]
...: await asyncio.gather(*coros)
...:

In [11]: asyncio.run(main())

In [12]: a[0]
Out[12]: 1

预期是 100,实际是 1。因为协程 1 读到的 b 为 0,遇到 await 切换到 f(),再切回到协程 2,读到的 b 为 0,遇到 await 切换到 f(),再切回到协程 3,读到的 b 为 0,当第 100 个协程切回到 f(),最后切回协程 99,此时 b+1 得到 1,再切回协程 98,此时 b+1 得到 1,最终 a[0]=1。

解决问题

加锁可以将并发变成串行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
In [1]: import asyncio

In [2]: a = [0]

In [3]: lock = asyncio.Lock()

In [4]: async def f():
...: await asyncio.sleep(0.00001)
...:

In [5]: async def add():
...: async with lock:
...: b = a[0]
...: await f()
...: b += 1
...: a[0] = b
...:

In [6]: async def main():
...: coros = [add() for _ in range(100)]
...: await asyncio.gather(*coros)
...:
...:

In [7]: asyncio.get_event_loop().run_until_complete(main())

In [8]: a[0]
Out[8]: 100
使用场景

对协程的执行顺序有要求,但执行代码非原子操作,且被await语句隔开。或者是对某个协程进行保护在多次调用的时候只有第一次调用成功,其他阻塞等待。适用于分配资源的场景,防止资源被重新分配。

源码

我们看下源码中的使用注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Usage:
# 直接使用
lock = Lock()
...
await lock.acquire()
try:
...
finally:
lock.release()

Context manager usage:
# 使用上下文的方式
lock = Lock()
...
async with lock:
...

Lock objects can be tested for locking state:
# 判断当前锁是否锁着
if not lock.locked():
await lock.acquire()
else:
# lock is acquired
...

再跑个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import asyncio

import functools

def unlock(lock):
print('callback releasing lock')
lock.release()


async def coro1(lock):
print('coro1 waiting for the lock')
async with lock:
print('coro1 acquired lock')
print('coro1 released lock')


async def coro2(lock):
print('coro2 waiting for the lock')

try:
print('coro2 acquired lock')
finally:
print('coro2 released lock')
lock.release()


async def main(loop):
lock = asyncio.Lock()
print('acquiring the lock before starting coroutines')
await lock.acquire()
print('lock acquired: {}'.format(lock.locked()))

loop.call_later(0.1, functools.partial(unlock, lock))

print('waiting for coroutines')
await asyncio.wait([coro1(lock), coro2(lock)])




event_loop = asyncio.get_event_loop()

event_loop.run_until_complete(main(event_loop))


# 输出结构
acquiring the lock before starting coroutines
lock acquired: True
waiting for coroutines
coro1 waiting for the lock
coro2 waiting for the lock
coro2 acquired lock
coro2 released lock
coro1 acquired lock
coro1 released lock

参考文章:

Python 协程锁的意义

python里协程使用同步锁Lock

知识就是财富
如果您觉得文章对您有帮助, 欢迎请我喝杯水!