使用APScheduler报RuntimeError:There is no current event loop in thread

最近在使用APScheduler的时候发生了一个报错:RuntimeError:There is no current event loop in thread。仔细研究和查阅之后发现是调用方式有问题。

看下别人的问答:RuntimeError: There is no current event loop in thread in async + apscheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
URL_LIST = ['<url1>',
'<url2>',
'<url2>',
]

def demo_async(urls):
"""Fetch list of web pages asynchronously."""
loop = asyncio.get_event_loop() # event loop
future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
tasks = [] # dictionary of start times for each url
async with ClientSession() as session:
for url in urls:
task = asyncio.ensure_future(fetch(url, session))
tasks.append(task) # create list of tasks
_ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
"""Fetch a url, using specified ClientSession."""
async with session.get(url) as response:
resp = await response.read()
print(resp)

if __name__ == '__main__':
scheduler = AsyncIOScheduler()
scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
scheduler.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

# Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass

因为我们是和asyncio一起使用的,我们看下AsyncIOExecutor的注释:

1
2
3
4
5
6
7
8
9
10
class AsyncIOExecutor(BaseExecutor):
"""
Runs jobs in the default executor of the event loop.

If the job function is a native coroutine function, it is scheduled to be run directly in the
event loop as soon as possible. All other functions are run in the event loop's default
executor which is usually a thread pool.

Plugin alias: ``asyncio``
"""

即当我们添加job的时候,如果是协程函数将会在主线程中执行(主线程中存在loop),而当我们的job不是

协程函数的时候,将会使用线程池或者进程池去处理(新的线程或者进程中是没有loop的)。

解决办法:如果你添加的job必须是同步函数,而同步函数中需要使用event_loop那么你需要按照下面方式重新设置一下新的event_loop

1
2
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

根本原因在源码中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def get_event_loop(self):
"""Get the event loop.
This may be None or an instance of EventLoop.
"""
if (self._local._loop is None and
not self._local._set_called and
isinstance(threading.current_thread(), threading._MainThread)):
self.set_event_loop(self.new_event_loop())

if self._local._loop is None:
raise RuntimeError('There is no current event loop in thread %r.'
% threading.current_thread().name)

return self._local._loop

在主线程中,调用get_event_loop总能返回属于主线程的event loop对象,如果是处于非主线程中,还需要调用set_event_loop方法指定一个event loop对象,这样get_event_loop才会获取到被标记的event loop对象:

1
2
3
4
5
def set_event_loop(self, loop):
"""Set the event loop."""
self._local._set_called = True
assert loop is None or isinstance(loop, AbstractEventLoop)
self._local._loop = loop

推荐阅读:

Python定时任务工具–APScheduler

知识就是财富
如果您觉得文章对您有帮助, 欢迎请我喝杯水!