Python的多线程多进程和线程池编程-上

Python中的全局解释器锁GIL

我们首先看下Python中的GIL(global interpreter lock)。

我们现在的编程环境是cpython,Python中的一个线程对应于c语言中的一个线程。

在前期的Python中,为了简单化运行多线程,在解释器中增加一把全局解释器锁,这把锁🔐使得同一时刻只有一个线程在一个CPU上执行字节码,无法将多个线程映射到多个CPU上。

我们如果运行了一个进程,不管里面有多少个线程,只能运行在一个CPU上,这样就无法展现多CPU的优势。

对于多线程编程,并不是一个线程执行完毕再释放GIL,而是根据一些释放规则来的。

正是因为不是完全执行完一个线程之后释放GIL,因此多线程工作的时候如果不考虑线程同步,就会出现问题。

我们来验证一下是否是一个线程执行完毕之后再释放GIL的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
total = 0


# 对全局变量加
def add():
# 1. dosomething1
# 2. io操作 执行到这里的时候 会释放GIL
# 3. dosomething3
global total
for i in range(1000000):
total += 1


# 对全局变量减
def desc():
global total
for i in range(1000000):
total -= 1


import threading

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
# 开启线程
thread1.start()
thread2.start()

# 等待结束
thread1.join()
thread2.join()
print(total)

# 执行三次输出结果
52164
-515962
312919

理论上执行结果应该为0,但是三次执行之后全不为0,这就验证了并不是在一个线程结束之后才释放GIL。

GIL会在三种情况下释放

  1. GIL会根据执行的字节码行数释放
  2. 设置时间片来释放
  3. GIL在遇到IO操作的时候回主动释放

正是因为在遇到IO操作的时候回主动释放GIL,因此多线程在IO操作频繁的情况下非常实用。

多线程编程

线程是系统能够切换和调度的最小单元。

对于IO操作来说,多线程和多进程性能差别不大(多线程新能稍微好点),对于操作系统来说,线程之前的切换更加容易。

通过Thread类实例化来实现多线程

我们可以通过setDaemon将线程设置为守护线程,当主线程退出后,守护线程也不再执行。

1
2
thread1.setDaemon(True)
thread2.setDaemon(True)

我们可以通过join设置主线程等待子线程执行完毕再执行

1
2
thread1.join()
thread2.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 对于操作系统来说 线程是能够切换和调度的最小单元
# 对于io操作来说,多线程和多进程性能差别不大
# 1.通过Thread类实例化

import time
import threading


def get_detail_html(url):
print("get detail html started")
time.sleep(2)
print("get detail html end")


def get_detail_url(url):
print("get detail url started")
time.sleep(4)
print("get detail url end")


if __name__ == "__main__":
thread1 = threading.Thread(target=get_detail_html, args=("", ))
thread2 = threading.Thread(target=get_detail_url, args=("", ))
start_time = time.time()

# thread1.setDaemon(True)
# thread2.setDaemon(True)
thread1.start()
thread2.start()

# 设置等待两个线程结束之后再执行主线程
thread1.join()
thread2.join()

last_time = time.time() - start_time

print("last time: {}".format(last_time))

# 输出结果
get detail html started
get detail url started
get detail html end
get detail url end
last time: 4.005519151687622

从执行结果我们看到:

主线程是在子线程执行完毕之后再执行的,程序执行时间是4秒(是线程中的最大时间),是并发执行的。

如果我们没有设置join() last_time不是我们期望的4而是接近0的小数,为什么这样呢?

这是因为我们使用了main作为代码入口,相当于也开启了一个线程,我们打开debug调试窗看到的确有三个线程,当只是调用start函数开启线程后,三个线程是并行的,主线程是直接往下执行的,因此为0。

通过继承Thread来实现多线程

在代码量比较小的情况下,我们可以使用上面的方式,在代码量大的时候,我们需要通过继承Thread来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class GetDetailHtml(threading.Thread):
def __init__(self, name):
super().__init__(name=name)


# 使用Thread 类需要重写 run 方法
def run(self):
print(self.name)
print("get detail html started")
time.sleep(2)
print("get detail html end")


class GetDetailUrl(threading.Thread):
def __init__(self, name):
super().__init__(name=name)

def run(self):
print(self.name)
print("get detail url started")
time.sleep(4)
print("get detail url end")


if __name__ == "__main__":
thread1 = GetDetailHtml("get_detail_html")
thread2 = GetDetailUrl("get_detail_url")
start_time = time.time()
thread1.start()
thread2.start()

thread1.join()
thread2.join()

print("last time: {}".format(time.time() - start_time))

输出结果和上面的一直:
get_detail_html
get detail html started
get_detail_url
get detail url started
get detail html end
get detail url end
last time: 4.001453161239624

我们通过继承Thread并重写run方法,也能实现多线程。

使用继承Thread类可以实现一些逻辑复杂的多线程编程,实现自己的逻辑。比如我们可以在实例化的时候指定线程的名字。

1
super().__init__(name=name)

这里之所以使用super关键字,是因为Thread类实例化的时候已经有了name属性,我们直接复用父类的属性。

1
2
3
# Thread的实例化方法
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):

线程间通信-共享变量和Queue

使用共享变量通信

使用共享变量(global)进行线程间通信不是线程安全的,如果想使用需要增加一把锁。

使用Queue进行线程同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 通过queue的方式进行线程间同步

import time
import threading
from queue import Queue


def get_detail_html(queue):
# 爬取文章详情页
while True:
# get 方法是阻塞方式,当不存在数据的时候,一直阻塞着
url = queue.get()
print("get detail html started")
time.sleep(2)
print("get detail html end")


def get_detail_url(queue):
# 爬取文章列表页
while True:
print("get detail url started")
time.sleep(4)
for i in range(20):
# 使用put方法往queue中放数据
queue.put("http://projectsedu.com/{id}".format(id=i))
print("get detail url end")


if __name__ == "__main__":
detail_url_queue = Queue(maxsize=1000)

thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
for i in range(10):
html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
html_thread.start()
start_time = time.time()

# 下面的两个 函数 都是 Queue 的


# join函数会一直阻塞主线程,直至获得一个task_done
detail_url_queue.join()

# 发送一个task_done函数到队列
detail_url_queue.task_done()

# 上面两个是成对出现,因此我们可以实现一些逻辑,比如在执行到某个程度的时候,发送一个task_done来停止主线程

print("last time: {}".format(time.time() - start_time))

使用Queue相比于共享变量是线程安全的,我们看下源码为什么

首先看下get方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
    def get(self, block=True, timeout=None):
'''Remove and return an item from the queue.

If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
'''
with self.not_empty:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item

# 深入 item = self._get()

def _init(self, maxsize):
self.queue = deque()

def _qsize(self):
return len(self.queue)

# Put a new item in the queue
def _put(self, item):
self.queue.append(item)

# Get an item from the queue
def _get(self):
return self.queue.popleft()

我们看到上述的代码段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62



我们可以详细查看下`Queue`的方法有哪些,图片左侧。

![Queue更多的方法](https://static01.imgkr.com/temp/da7d39e40296482496e7d46c2fa2f4dc.png)

`Queue.Queue(maxsize=0)` #FIFO, 用来定义队列的长度,如果maxsize小于1就表示队列长度无限,

`Queue.LifoQueue(maxsize=0) `#LIFO, 如果maxsize小于1就表示队列长度无限

`Queue.qsize()` #返回队列的大小

`Queue.empty() `#如果队列为空,返回True,反之False ,在线程间通信的过程中,可以通过此来给消费者等待信息

`Queue.full() `# 如果队列满了,返回True,反之False,给生产者提醒

`Queue.get([block[, timeout]])` `读队列,timeout等待时间

`Queue.put(item, [block[, timeout]])` 写队列,timeout等待时间

`Queue.queue.clear()` 清空队列

`task_done()`#意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。

`join()`#阻塞调用线程,直到队列中的所有任务被处理掉。只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done((意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。


### 线程同步 Lock、RLock

在最开始我们学习GIL的时候,使用共享变量进行线程通信,出现了问题,为什么会出现问题呢?

我们将那两个方法简化下,看看字节码:

```python
a = 0

def add1(a):
a += 1

def desc1(a):
a -= 1

import dis
print(dis.dis(add1))
print(dis.dis(desc1))

# 对应字节码
10 0 LOAD_FAST 0 (a) 1. load a a=0
2 LOAD_CONST 1 (1) 2. load 1 1
4 INPLACE_ADD 3. + 1
6 STORE_FAST 0 (a) 4. 赋值给a 1=1
8 LOAD_CONST 0 (None)
10 RETURN_VALUE
None
13 0 LOAD_FAST 0 (a) 1. load a a=0
2 LOAD_CONST 1 (1) 2. load 1 1
4 INPLACE_SUBTRACT 3. - 1
6 STORE_FAST 0 (a) 4. 赋值给a 1=1
8 LOAD_CONST 0 (None)
10 RETURN_VALUE
None

存在函数字节码执行不是原子操作 导致变量赋值存在异常

两个函数的字节码如上面所示,在依次执行字节码的过程中,有可能出现线程切换导致变量赋值在了不同的函数上面,例如第四步。这样导致最终的返回值不是0。

为了解决上面的问题,我们需要指定一段代码段要全部运行,即加把锁。也就是说凡是用锁锁上的代码段,只能有一段在执行,只有释放了锁之后,其他代码段才能执行。

使用Lock进行线程同步

我们使用一开始的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from threading import Lock, RLock, Condition  # 可重入的锁

# 在同一个线程里面,可以连续调用多次acquire, 一定要注意acquire的次数要和release的次数相等

total = 0

# 先生成一个锁
lock = Lock()


def add():
global lock
global total
for i in range(1000000):
# 我们使用锁 将下面的代码(对应的字节码)加锁

# 使用acquire来获得锁,只有获得锁的才能执行
lock.acquire()
total += 1
# 使用release释放锁,必须释放掉不然会一直阻塞
lock.release()


def desc():
global total
global lock
for i in range(1000000):
lock.acquire()
total -= 1
lock.release()


import threading

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()


thread1.join()
thread2.join()
print(total)

# 输出
0

当我们加了锁之后,无论运行多少次都是0。

是不是加了锁就是完美大吉了呢?答案是不。

锁的存在会有两个问题:

  1. 影响性能(获取锁 和释放锁都会消耗时间)

  2. 会引起死锁

引起死锁的情况大致包括:循环等待变量释放,连续两次调用acquire

1
2
3
4
5
lock.acquire()
lock.acquire()
total += 1
lock.release()
lock.release()

上面这种方式,就会容易出现死锁,因为在acquire后没有释放,再次acquire

针对这种问题,我们可以使用可重入锁:RLock

可重入顾名思义就是可以多次获取锁。

1
2
3
4
5
6
locl = RLock
lock.acquire()
lock.acquire()
total += 1
lock.release()
lock.release()

当我们把锁改为可重入锁之后连续两次调用acquire就不会出现死锁可。

注意:这里所说的可以多次调用是指在同一个线程中,一定要注意acquire的次数要和release的次数相等

线程同步-条件变量condition

条件变量, 用于复杂的线程间同步

这一节我们通过条件变量condition来进行线程同步来实现下面的对话

1
2
3
4
5
6
7
8
9
10
11
12
天猫精灵 : 小爱同学 
小爱 : 在
天猫精灵 : 我们来对古诗吧
小爱 : 好啊
天猫精灵 : 我住长江头
小爱 : 君住长江尾
天猫精灵 : 日日思君不见君
小爱 : 共饮长江水
天猫精灵 : 此水几时休
小爱 : 此恨何时已
天猫精灵 : 只愿君心似我心
小爱 : 定不负相思意

我们先看下condition的源码:

condition的源码

我们看左侧,Condition实现了上下文协议,因此可以使用with语句。里面有两个关键函数。

wait 在等待线程被唤醒 必须由 notify唤醒才能往下执行代码

notify向外通知信息

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import threading


class XiaoAi(threading.Thread):
def __init__(self, cond):
super().__init__(name="小爱")
self.cond = cond

def run(self):
with self.cond:
# 这里需要别的线程notify才能启动
# 在调用with cond之后才能调用wait或者notify方法
# 这里如果不使用with语句 也可以使用self.cond.acquire()

# self.cond.acquire()

self.cond.wait() # 这里先启动小爱同学 然后让其等待
print("{} : 在 ".format(self.name))
self.cond.notify()

self.cond.wait()
print("{} : 好啊 ".format(self.name))
self.cond.notify()

self.cond.wait()
print("{} : 君住长江尾 ".format(self.name))
self.cond.notify()

self.cond.wait()
print("{} : 共饮长江水 ".format(self.name))
self.cond.notify()

self.cond.wait()
print("{} : 此恨何时已 ".format(self.name))
self.cond.notify()

self.cond.wait()
print("{} : 定不负相思意 ".format(self.name))
self.cond.notify()
# 如果上面使用的是self.cond.acquire() 这里需要使用release来释放锁
# self.cond.release()


class TianMao(threading.Thread):
def __init__(self, cond):
super().__init__(name="天猫精灵")
self.cond = cond

def run(self):
self.cond.acquire()
print("{} : 小爱同学 ".format(self.name))
self.cond.notify()
# 发送信号给等待的wait

self.cond.wait()
print("{} : 我们来对古诗吧 ".format(self.name))
self.cond.notify()

self.cond.wait()
print("{} : 我住长江头 ".format(self.name))
self.cond.notify()

self.cond.wait()
print("{} : 日日思君不见君 ".format(self.name))
self.cond.notify()

self.cond.wait()
print("{} : 此水几时休 ".format(self.name))
self.cond.notify()

self.cond.wait()
print("{} : 只愿君心似我心 ".format(self.name))
self.cond.notify()

self.cond.wait()
self.cond.release()


if __name__ == "__main__":
from concurrent import futures

cond = threading.Condition()
xiaoai = XiaoAi(cond)
tianmao = TianMao(cond)

# 启动顺序很重要
xiaoai.start()
tianmao.start()

上面代码可能有的同学就疑惑了,为什么小爱同学先启动,可以看上面说的知识点。

我们再看下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Condition:
"""Class that implements a condition variable.

A condition variable allows one or more threads to wait until they are
notified by another thread.

If the lock argument is given and not None, it must be a Lock or RLock
object, and it is used as the underlying lock. Otherwise, a new RLock object
is created and used as the underlying lock.

"""

def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = _deque()

在condition中也是需要锁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs.
"""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save() # 这里释放外层锁
gotit = False
try: # restore state no matter what (e.g., KeyboardInterrupt)
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state) # 这里重新获取了外层锁
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass

对于wait方法,注释文档中有说明,在一个线程中的wait方法,需要传入条件变量,即需由其他线程的notify通知才能启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def notify(self, n=1):
"""Wake up one or more threads waiting on this condition, if any.

If the calling thread has not acquired the lock when this method is
called, a RuntimeError is raised.

This method wakes up at most n of the threads waiting for the condition
variable; it is a no-op if no threads are waiting.

"""
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release()
try:
all_waiters.remove(waiter)
except ValueError:
pass

condition有两层锁, 一把底层锁会在线程调用了wait方法的时候释放, 上面的锁会在每次调用wait的时候分配一把并放入到cond的等待队列中,等到notify方法的唤醒

首先代码执行到小爱同学:

1
2
3
def run(self):
with self.cond:
self.cond.wait()

代码执行到这里将会阻塞,我们看下wait源码:

1
2
3
4
waiter = _allocate_lock()	# 生成一把锁
waiter.acquire() # 获得锁
self._waiters.append(waiter) # 将锁追加到一个双端队列中
saved_state = self._release_save() # 释放底层锁即条件变量对象的锁

这段代码将会阻塞下去,这时候天猫精灵线程就开始运行了。

1
2
3
4
5
6
  def run(self):
self.cond.acquire()
print("{} : 小爱同学 ".format(self.name))
self.cond.notify()
# 发送信号给等待的wait
self.cond.wait()

代码执行到这里又会阻塞下去。

如此周而复始,一直到结束。

知识就是财富
如果您觉得文章对您有帮助, 欢迎请我喝杯水!