书接上回我们继续学习Python中的多线程。
1 | # 优先队列 |
线程同步-Semaphore
Semaphore是控制线程进入数量的锁。
例子:对于一个文件来说,可以同时进行读和写,但是写一般只用于一个线程写,读可以允许多个,比如说我们指定十个线程进行读。
下面模拟一个爬虫,对线程数量进行控制。
一个线程用于抓取URL,用别的URL进行解析。
1 | import threading |
Semaphore
的大致原理是在每次线程执行前获得调用acquire
,这时维护的数字会减一,当线程结束之后会加一。这样就完成了指定数量的线程并发。
一定要注意是在主线程中获得,然后在执行的子线程中释放。
我们看下源码:
1 | class Semaphore: |
内部也是通过Condition
来实现的。
ThreadPoolExecutor线程池
我们为什么需要线程池呢?
线程池可以完成上面Semaphore
的功能来指定并发数量
主线程中可以获取某一个线程的状态或者某一个任务的状态,以及返回值
当一个线程完成的时候我们的主线程能够立即知道
futures可以让多线程和多进程编码接口一致
我们看下一个简单使用
1 | from concurrent.futures import ThreadPoolExecutor |
相比于done
方法的非阻塞模式,result
为阻塞模式,只有对于的线程执行完之后才会执行其他线程。
1 | task1 = executor.submit(get_html, (3)) |
上面代码运行一下,你会发现是等到task1
执行完之后才开始继续往下执行。
我们可以使用cancel
命令取消任务的执行
1 | executor = ThreadPoolExecutor(max_workers=2) |
cancel只能取消未开始的线程,已经开始的是无法取消的
我们将可并行的线程池设为1,这样就能实现了cancel
。
上面是用穷举法给每个线程都命名了 我们可以不用这种方法 获得已经成功的task的返回
我们可以使用as_completed
获得已经成功的返回
1 | urls = [3,2,4] |
上面的代码在for循环中会逐步将已经成功的给打印出来。
我们看下源码:
1 | def as_completed(fs, timeout=None): |
在主线程运行到for循环时会将已经成功的打印出来,对于没有成功的会等待,在成功后返回。
我们也可以使用future
的map函数
1 | for data in executor.map(get_html, urls): |
map函数和Python的map函数类似,将参数一一作用在函数上。
我们看下输出结果,是不是很奇怪?
在使用map函数的时候,输出并不是按照执行完成的先后顺序,而是按照参数传入的先后顺序。
我们可以使用wait方法阻塞主线程,等待指定的线程执行完毕之后再继续执行。
1 | executor = ThreadPoolExecutor(max_workers=2) |
wait也可以输出参数,我们看下源码参数有哪些
1 | FIRST_COMPLETED = 'FIRST_COMPLETED' # 第一个执行完成 |
ThreadPoolExecutor源码分析
1 | from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED |
Future
可以理解为未来对象,因为里面包含了将要执行的线程的这种状态。这里老师理解的是一种容器,线程返回容器,在某些状态会改变容器中的值,下面我们看看何时改变。
看下源码
1 | def submit(self, fn, *args, **kwargs): |
1 | class _WorkItem(object): |
我们看到正是在_WorkItem
中进行了函数的执行,结果设置到Future 判断等一系列操作。
多线程和多进程的对比
对于多IO操作来说一般使用多线程,对于耗CPU的操作一般使用多进程,进程切换的代价要多于线程之间的切换
我们看下两个对比
对于计算是耗CPU的操作
1 | import time |
在数值较小的情况下,输出结果并不是成倍的差异,这是因为进程之间切换也是耗时操作。
下面是IO操作
1 | def random_sleep(n): |
子进程的执行逻辑
1 | import os |
上面的代码执行到os.fork()
的时候将fork一个子进程 但是对当前进程还是要往下执行
子进程将会把父进程的所有数据重新拷贝一份到子进程中(所有的数据包括程序运行)
所以进程间的数据是完全隔离的,之前线程中通过全局变量的通信方式将不再适用
因为进程之间数据是完全隔离的,每个进程之间都有自己的数据
子进程将会把fork之后的代码再运行一遍
适用 sleep会在父进程执行完毕之后 把子进程一起关闭
multiprocessing多进程编程
我们使用multiprocessing
进程都线程编程,接口很多和多线程类似
1 | import time |
除了上面的实现方式,我们一样可以使用继承
1 | class MyProcess(multiprocessing.Process): |
使用线程池
1 | pool = multiprocessing.Pool(multiprocessing.cpu_count()) |
使用imap进行多进程
1 | pool = multiprocessing.Pool(multiprocessing.cpu_count()) |
这个和上面我们学习的future
的map函数类似,按照参数传入的顺序返回
使用imap_unordered进程多进程编程
1 | pool = multiprocessing.Pool(multiprocessing.cpu_count()) |
上面的输出就是乱序的了,谁先完成就先输出谁
进程间的通信方式Queue, Pipe, Manager
这节我们学习进程间的通信方式
共享全局变量适合多线程之间通信但是不适合多进程之间通信,因为进程之间资源不共享,是隔离状态
使用Queue进程通信
这里的Queue不再是 from queue import Queue 这个Queue
1 | from multiprocessing import Queue, Process |
注意:multiprocessing中的Queue是不能用 multiprocessing 的Pool线程池中 进程间通信的
Pool中的进程间通信需要使用manager中的queue
1 | from multiprocessing import Queue, Process, Pool |
上面的输出为空
那么我们怎么在进程池中通信呢?
我们需要使用Manager
进行实例化,然后使用里面的Queue
。
1 | queue = Manager().Queue(10) |
这样就能正常通信了
现在我们了解到了有三个Queue
1 | from queue import Queue #用于多线程 |
使用Pipe进行进程通信
Pipe只能适用于两个进程间的通信
1 | from multiprocessing import Queue, Process, Pool, Manager, Pipe |
Pipe的性能是高于Queue,因为内部少了很多的锁机制。因此只有两个进程的时候,我们尽量使用Pipe进行通信。
其他通信方式
我们能不能像多线程那样,在多进程之间也维护一个公共内存块或者公共变量呢?
使用Manager
里面的数据结构进行内存共享
1 | def add_data(p_dict, key, value): |
我们看下还有哪些数据结构
1 | class SyncManager(multiprocessing.managers.BaseManager): |