今天看小明大神的博客:深入理解asyncio(三) 里面有段将同步函数改为协程使用的代码。其中提到了run_in_executor
,主要使用这个方法将同步变为异步。
我们先看下如何将一个同步函数变为异步的
1 | In [35]: import time |
上面使用run_in_executor
可以将同步函数a
以协程的方式执行,我们看下源码
1 | def run_in_executor(self, executor, func, *args): |
我们看到 当没有设置 executor
的时候 会默认使用concurrent.futures.ThreadPoolExecutor()
那我们自己设置一下试试看。
1 | In [40]: from concurrent.futures import ThreadPoolExecutor |
正确输出
还有其他方法实现吗?
我们看到 run_in_executor
的源码进行了loop
是否关闭的校验和是否是debug
的判断以及executor
验空和赋值。假设我们不去做这些操作的话,直接使用ThreadPoolExecutor
是否可以呢?
1 | In [1]: from concurrent.futures import ThreadPoolExecutor |
正确输出
上面的thread_executor.submit
返回的是一个future
对象,但是并不是一个符合asyncio
模块的future
是不可等待的,即无法调用await
去等待该对象。
代码中的wrap_future
是一个比较关键的函数,看下源码
1 | def wrap_future(future, *, loop=None): |
结合着使用到的_chain_future
源码一起看
1 | def _chain_future(source, destination): |
通过wrap_future
函数可以将concurrent.futures.Future
变成asyncio.Future
实现可等待。
这样我们可以不用显示的获取当前的loop
也可以直接去将同步函数变成协程去执行了。
装饰器模式 将同步函数变为异步方式
1 | import asyncio |
其中的 max_workers
参数就是能够执行的最大线程数。
再深入看下源码
上面我们知道可直接使用ThreadPoolExecutor
的submit
方法获取到future
对象。通过源码来分析下具体的流程。
1 | class ThreadPoolExecutor(_base.Executor): |
我们看到在submit
函数里面主要是生成了一个concurrent.futures
里的 Future
对象。然后一个_WorkItem
实例。接着将生成的_WorkItem
实例放到了一个队列里面,然后执行self._adjust_thread_count()
函数。这个_WorkItem
实例是什么呢?self._adjust_thread_count()
函数里面是什么呢?我们看下源码。
1 | class _WorkItem(object): |
我们发现在_WorkItem
类中的run
方法执行了真正的同步函数 并将执行结果或者异常放到了之前生成的future
对象中。
那这个run
方法什么时候真正执行呢?我们返回看self._adjust_thread_count()
的源码:
1 | def _adjust_thread_count(self): |
我们看到在_adjust_thread_count
方法中生成了一个线程并且去执行了,执行的函数是_worker
。这个_worker
是什么呢?
1 | def _worker(executor_reference, work_queue, initializer, initargs): |
我们看到在这个函数中会获取到之前放到队列里面的_WorkItem
的实例。然后执行_WorkItem
里面的run
方法。
这样我们整个过程就完整了,成功将一个同步函数变成异步方式执行。