Python Multithreading, Multiprocessing and Thread Pool Programming (Part 2)

Picking up where the previous part left off, we continue learning about multithreading in Python.

# Priority queue
from queue import PriorityQueue
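
As a quick refresher, queue.PriorityQueue hands entries back ordered by their priority value, lowest first. A minimal sketch, reusing the import above:

pq = PriorityQueue()
pq.put((2, "low priority"))
pq.put((1, "high priority"))
# Entries come out lowest priority value first
print(pq.get())  # (1, 'high priority')
print(pq.get())  # (2, 'low priority')
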
Thread Synchronization: Semaphore

A Semaphore is a lock that controls how many threads may enter a section at the same time.

Example: a file can be read and written concurrently, but writing is usually restricted to a single thread, while reading can be allowed for several threads at once, say up to ten reader threads.
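
A minimal sketch of that idea (the file access is only simulated with sleep, and the names are made up for illustration):

import threading
import time

read_sem = threading.Semaphore(10)  # allow at most 10 concurrent readers

def reader(name):
    read_sem.acquire()
    try:
        time.sleep(1)  # simulate reading the file
        print("{} finished reading".format(name))
    finally:
        read_sem.release()

for i in range(30):
    threading.Thread(target=reader, args=("reader-{}".format(i),)).start()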

Below we simulate a crawler and use a Semaphore to cap the number of threads.

One thread produces the URLs, and a separate thread is spawned to fetch and parse each URL.

import threading
import time


class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        # Simulate fetching the page
        time.sleep(2)
        print("got html text success")
        # Release here; each release() increments the counter by one
        self.sem.release()


class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            # Each acquire() decrements the counter; when it reaches 0, acquire() blocks
            self.sem.acquire()
            html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
            html_thread.start()


if __name__ == "__main__":
    # Create a Semaphore that allows 3 concurrent threads
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()

Roughly, the Semaphore works like this: before each worker thread starts, acquire() is called and the internal counter is decremented by one; when the worker finishes, release() increments it again. That is how the number of concurrently running threads is capped.

Note that acquire() is called in the thread that spawns the workers (UrlProducer here), while release() is called inside the worker thread once it is done.

Let's look at the source code:

class Semaphore:
    """This class implements semaphore objects.

    Semaphores manage a counter representing the number of release() calls minus
    the number of acquire() calls, plus an initial value. The acquire() method
    blocks if necessary until it can return without making the counter
    negative. If not given, value defaults to 1.

    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        """Acquire a semaphore, decrementing the internal counter by one.

        When invoked without arguments: if the internal counter is larger than
        zero on entry, decrement it by one and return immediately. If it is zero
        on entry, block, waiting until some other thread has called release() to
        make it larger than zero. This is done with proper interlocking so that
        if multiple acquire() calls are blocked, release() will wake exactly one
        of them up. The implementation may pick one at random, so the order in
        which blocked threads are awakened should not be relied on. There is no
        return value in this case.

        When invoked with blocking set to true, do the same thing as when called
        without arguments, and return true.

        When invoked with blocking set to false, do not block. If a call without
        an argument would block, return false immediately; otherwise, do the
        same thing as when called without arguments, and return true.

        When invoked with a timeout other than None, it will block for at
        most timeout seconds. If acquire does not complete successfully in
        that interval, return false. Return true otherwise.

        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        When the counter is zero on entry and another thread is waiting for it
        to become larger than zero again, wake up that thread.

        """
        with self._cond:
            self._value += 1
            self._cond.notify()

Internally it is implemented on top of Condition.
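
Also note the __enter__ = acquire line in the source above: a threading.Semaphore can be used as a context manager, and the matching release happens automatically when the with block exits. A small generic sketch:

sem = threading.Semaphore(3)

def worker():
    # acquire on entry, release on exit, even if the body raises
    with sem:
        time.sleep(2)

for _ in range(10):
    threading.Thread(target=worker).start()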

ThreadPoolExecutor: Thread Pools

Why do we need a thread pool at all?

A thread pool can do what the Semaphore above did: cap the number of concurrent workers.

The main thread can query the state of a worker or a task, as well as its return value.

The main thread can learn immediately when a worker finishes (see the callback sketch further below).

futures gives multithreading and multiprocessing code a consistent interface.

Let's look at a simple example:

from concurrent.futures import ThreadPoolExecutor
import time


def get_html(times):
    # Simulate fetching a page
    time.sleep(times)
    print("get page {} success".format(times))
    return times


# Create a thread pool; max_workers is the maximum number of concurrent threads
executor = ThreadPoolExecutor(max_workers=2)

# Submit tasks to the pool
# submit() is non-blocking: the main thread keeps running right after the call
# It returns a Future object
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))

# The Future's done() method tells us whether the task has finished
# Because submit() is non-blocking, this prints False
print(task1.done())
time.sleep(3)
# After the main thread sleeps for 3 seconds all workers have finished, so this prints True
print(task1.done())

# Output
False
get page 2 success
get page 3 success
True

Unlike done(), which is non-blocking, result() blocks: it waits for the corresponding task to finish before the code after it runs.

task1 = executor.submit(get_html, (3))
# The code blocks here until task1 finishes, then prints the task's return value
print(task1.result())
task2 = executor.submit(get_html, (2))

Run the code above and you will see that execution only continues past result() after task1 has finished.
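
The third point in the list above, the main thread learning right away that a task has finished, can also be handled with a callback instead of polling done(). A minimal sketch using Future.add_done_callback (when_done is a made-up helper name):

def when_done(future):
    # invoked with the finished Future as its only argument
    print("task finished, result: {}".format(future.result()))

task3 = executor.submit(get_html, 4)
task3.add_done_callback(when_done)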

We can use the cancel() method to cancel a task:

executor = ThreadPoolExecutor(max_workers=2)
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))
task2.cancel()
# Output
get page 2 success
get page 3 success

executor = ThreadPoolExecutor(max_workers=1)
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))
# A task can only be cancelled before it has started running
task2.cancel()

# Output
get page 3 success

cancel() can only cancel a task that has not started running yet; a task that is already running cannot be cancelled.

By setting max_workers to 1, task2 is still sitting in the queue when cancel() is called, so in the second case the cancellation succeeds.

Above we had to name every task one by one (task1, task2, ...). We can avoid that and still collect the results of the tasks that have completed.

We can use as_completed to get each result as soon as its task finishes:

from concurrent.futures import as_completed

urls = [3, 2, 4]
all_task = [executor.submit(get_html, (url)) for url in urls]

for future in as_completed(all_task):
    data = future.result()
    print("get {} page".format(data))

# Output
get page 2 success
get 2 page
get page 3 success
get 3 page
get page 4 success
get 4 page

In the for loop above, each completed task is handled as soon as it finishes.

Let's look at the source of as_completed:

def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    fs = set(fs)
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

    try:
        yield from finished

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), len(fs)))

            waiter.event.wait(wait_timeout)

            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished:
                yield future
                pending.remove(future)

    finally:
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)

When the main thread reaches the for loop, futures that are already finished are yielded immediately; for the rest, the loop waits and yields each future as it completes.
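
The timeout branch in the source corresponds to the optional timeout argument: if the remaining futures do not finish in time, a TimeoutError is raised. A sketch (importing TimeoutError from concurrent.futures keeps this working across Python versions):

from concurrent.futures import TimeoutError as FuturesTimeoutError

try:
    for future in as_completed(all_task, timeout=1):
        print("get {} page".format(future.result()))
except FuturesTimeoutError:
    print("some tasks did not finish within 1 second")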

We can also use the executor's map method:

for data in executor.map(get_html, urls):
    # The loop yields the results directly; there is no need to call result()
    print("get {} page".format(data))

# Output
get page 2 success
get page 3 success
get 3 page
get 2 page
get page 4 success
get 4 page

executor.map works like Python's built-in map, applying the function to each argument in turn.

Look at the output above: doesn't it seem odd?

With map, results are yielded in the order the arguments were passed in, not in the order the tasks finished.

We can use the wait function to block the main thread until the specified tasks have finished:

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4]
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task)
print("main")

wait also takes parameters; let's look in the source to see which values return_when accepts:

FIRST_COMPLETED = 'FIRST_COMPLETED'  # return when the first future completes
FIRST_EXCEPTION = 'FIRST_EXCEPTION'  # return when the first exception is raised
ALL_COMPLETED = 'ALL_COMPLETED'      # return when all futures have completed

# Stop blocking as soon as the first task finishes
wait(all_task, return_when=FIRST_COMPLETED)

ThreadPoolExecutor Source Code Analysis

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
from concurrent.futures import Future

A Future can be thought of as an object standing in for a result that will exist in the future: it holds the state of a task that may not have run yet. Another way to look at it is as a container: submit() returns the container right away, and at certain points the pool fills in the value inside it. Let's see when that happens.
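
To make the container idea concrete, here is a bare Future being created and filled by hand; this is essentially what the pool does for us internally (a sketch, not how you would normally use it):

from concurrent.futures import Future

f = Future()
print(f.done())    # False -- nothing has been put into the container yet
f.set_result(42)   # normally the pool's worker calls this when the task finishes
print(f.done())    # True
print(f.result())  # 42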

Look at the source:

def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')

        # The two lines below are the important part
        f = _base.Future()
        # _WorkItem is the unit of execution inside the thread pool
        w = _WorkItem(f, fn, args, kwargs)

        self._work_queue.put(w)
        self._adjust_thread_count()
        return f
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)

We can see that _WorkItem is where the submitted function actually runs, where the result (or the exception) is stored into the Future, and where the cancellation check happens.
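
Because _WorkItem calls set_exception when the function raises, the exception ends up stored in the Future as well. A small sketch (boom is a made-up helper):

def boom():
    raise ValueError("page error")

executor = ThreadPoolExecutor(max_workers=1)
task = executor.submit(boom)
# exception() waits for the task and returns the exception stored via set_exception
print(task.exception())
# calling task.result() here would re-raise the same ValueError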

Multithreading vs. Multiprocessing

For I/O-bound work we generally use multithreading; for CPU-bound work we generally use multiprocessing, and switching between processes costs more than switching between threads.

Let's compare the two cases.

First, a CPU-bound computation:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor


def fib(n):
    if n <= 2:
        return 1
    return fib(n-1) + fib(n-2)


if __name__ == "__main__":
    # Swap ThreadPoolExecutor for ProcessPoolExecutor here to test multiprocessing
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, (num)) for num in range(25, 40)]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

With numbers this small the two timings do not differ by a huge factor, because creating and switching between processes has a cost of its own.

Next, an I/O-bound operation:

def random_sleep(n):
    # Use sleep to simulate an I/O operation
    time.sleep(n)
    return n


if __name__ == "__main__":
    # Swap ThreadPoolExecutor for ProcessPoolExecutor here to test multiprocessing
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2] * 30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time() - start_time))

The Execution Logic of Child Processes

import os
import time

# fork is only available on Linux/Unix
pid = os.fork()
print("bobby")
if pid == 0:
    print('child process {}, parent is: {}.'.format(os.getpid(), os.getppid()))
else:
    print('I am the parent process: {}.'.format(pid))

time.sleep(3)

When the code above reaches os.fork(), a child process is forked, but the current (parent) process also keeps executing the code that follows.

The child process gets its own copy of all of the parent's data (including the program's running state).

So data is completely isolated between processes; each process has its own copy, and the global-variable communication we used between threads no longer applies.

The child process runs the code that comes after the fork() call again, from that point on.

The time.sleep(3) at the end keeps the parent process alive so that the child can finish before the parent exits.
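
A small sketch of that isolation (Linux/Unix only): the child modifies its own copy of the variable, and the parent never sees the change:

import os

count = 0
pid = os.fork()
if pid == 0:
    count += 10
    print("in child, count =", count)    # 10
    os._exit(0)                          # end the child here
else:
    os.wait()                            # wait for the child to exit
    print("in parent, count =", count)   # still 0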

Multiprocess Programming with multiprocessing

We use the multiprocessing module for multiprocess programming; many of its interfaces look just like the threading ones.

import time
import multiprocessing


def get_html(n):
    time.sleep(n)
    print("sub_progress")
    return n


if __name__ == '__main__':
    progress = multiprocessing.Process(target=get_html, args=(2, ))
    # Before start() the process has no PID yet
    print(progress.pid)
    progress.start()
    # Now the PID is available
    print(progress.pid)
    progress.join()

    print("main progress end")

# Output
None
12898
sub_progress
main progress end

Besides the approach above, we can also subclass Process:

class MyProcess(multiprocessing.Process):

    def __init__(self, time, name):
        self.time = time
        super().__init__(name=name)

    def run(self):
        time.sleep(self.time)
        print("sub_progress")
        return self.time


if __name__ == '__main__':
    progress = MyProcess(time=2, name="sub")

    print(progress.pid)
    progress.start()
    print(progress.pid)
    progress.join()
    print("main progress end")

Using a process pool:

pool = multiprocessing.Pool(multiprocessing.cpu_count())
# Run the task asynchronously in the pool
result = pool.apply_async(get_html, args=(3,))


# close() must be called first so the pool stops accepting new tasks
pool.close()

# Wait for all tasks to finish
# i.e. wait for the worker processes to end
pool.join()

print(result.get())

Using imap with a process pool:

pool = multiprocessing.Pool(multiprocessing.cpu_count())
for result in pool.imap(get_html, [1, 5, 3]):
    print("{} sleep success".format(result))

# Output
sub_progress success
1 sleep success
sub_progress success
sub_progress success
5 sleep success
3 sleep success

Like the executor.map we saw earlier, imap yields results in the order the arguments were passed in.

Using imap_unordered:

pool = multiprocessing.Pool(multiprocessing.cpu_count())
for result in pool.imap_unordered(get_html, [1, 5, 3]):
    print("{} sleep success".format(result))

Now the output is unordered: whichever task finishes first gets yielded first.

Inter-Process Communication: Queue, Pipe, Manager

In this section we look at how processes communicate with each other.

Shared global variables are fine for communication between threads but not between processes, because processes do not share resources; their memory is isolated.

Using Queue for inter-process communication

Note that this Queue is not the one you get from "from queue import Queue":

from multiprocessing import Queue, Process
import time


def producer(queue):
    queue.put("a")
    time.sleep(2)


def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)


if __name__ == "__main__":
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

# Output
a

Note: the Queue from multiprocessing cannot be used for communication between the worker processes of a multiprocessing Pool.

Communication between Pool workers needs the Queue provided by a Manager. First, let's see what happens if we try the ordinary Queue anyway:

from multiprocessing import Queue, Process, Pool
import time


def producer(queue):
    queue.put("a")
    time.sleep(2)


def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)


if __name__ == "__main__":
    pool = Pool(2)
    queue = Queue(10)
    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))

    pool.close()
    pool.join()

The code above produces no output at all.

So how do we communicate inside a process pool?

We instantiate a Manager and use the Queue it provides:

queue = Manager().Queue(10)

With this change the communication works as expected.
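
Putting it together, a sketch of the working version; only the queue construction differs from the failing Pool example above:

from multiprocessing import Manager, Pool
import time


def producer(queue):
    queue.put("a")
    time.sleep(2)


def consumer(queue):
    time.sleep(2)
    print(queue.get())


if __name__ == "__main__":
    queue = Manager().Queue(10)  # the Manager's queue can be shared with Pool workers
    pool = Pool(2)
    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))
    pool.close()
    pool.join()

# Output
a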

So now we know there are three different Queues:

from queue import Queue            # for threads
from multiprocessing import Queue  # for processes
from multiprocessing import Manager
Manager().Queue()                  # for process pools

Using Pipe for Inter-Process Communication

A Pipe can only be used for communication between two processes:

from multiprocessing import Queue, Process, Pool, Manager, Pipe


def producer(pipe):
    pipe.send("hongshaorou")


def consumer(pipe):
    print(pipe.recv())


if __name__ == "__main__":
    # One end is used to send data, the other to receive it
    receive_pipe, send_pipe = Pipe()
    # A Pipe only connects two processes
    my_producer = Process(target=producer, args=(send_pipe,))
    my_consumer = Process(target=consumer, args=(receive_pipe,))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

A Pipe performs better than a Queue because it has far less locking inside, so when only two processes need to talk, prefer a Pipe.

Other Ways to Communicate

Can we maintain a shared block of memory or a shared variable across processes, the way we did with threads?

Yes: use the data structures provided by Manager for shared state.

from multiprocessing import Manager, Process


def add_data(p_dict, key, value):
    p_dict[key] = value


if __name__ == "__main__":
    progress_dict = Manager().dict()

    first_progress = Process(target=add_data, args=(progress_dict, "hongshaorou1", 22))
    second_progress = Process(target=add_data, args=(progress_dict, "hongshaorou2", 23))

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict)

# Result
{'hongshaorou1': 22, 'hongshaorou2': 23}

Let's see which other data structures Manager provides:

class SyncManager(multiprocessing.managers.BaseManager):
    def Barrier(self, parties, action=None, timeout=None):
        return threading.Barrier(parties, action, timeout)

    def BoundedSemaphore(self, value=None):
        return threading.BoundedSemaphore(value)

    def Condition(self, lock=None):
        return threading.Condition(lock)

    def Event(self):
        return threading.Event()

    def Lock(self):
        return threading.Lock()

    def Namespace(self):
        pass

    def Queue(self, maxsize=None):
        return queue.Queue()

    def RLock(self):
        return threading.RLock()

    def Semaphore(self, value=None):
        return threading.Semaphore(value)

    def Array(self, typecode, sequence):
        pass

    def Value(self, typecode, value):
        pass

    def dict(self, mapping_or_sequence):
        pass

    def list(self, sequence):
        pass