Asyncio中的子进程模块subprocess

asyncio提供了通过 async/await 创建和管理子进程的API。不同于Python标准库的subprocessasyncio的子进程函数都是异步的,并且提供了多种工具来处理这些函数,这就很容易并行执行和监视多个子进程。

创建子进程的方法主要有两个:

coroutine asyncio.create_subprocess_exec()

coroutine asyncio.create_subprocess_shell()

我们看下这两个函数的源码:

1
2
3
4
5
6
7
8
9
10
11
12
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
loop=None, limit=streams._DEFAULT_LIMIT,
**kwds):
if loop is None:
loop = events.get_event_loop()
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_shell(
protocol_factory,
cmd, stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
return Process(transport, protocol, loop)

create_subprocess_shell 函数以shell的方式执行子进程。需要执行的命令参数以字符串的方式输入到cmd中。这种方式执行子进程的话会开启两个子进程(一个是shell子进程,一个是真正的子进程)。我在Docker容器中遇见过杀子进程的时候,真正的子进程成功被杀死但是对应的shell子进程未被杀死的情况。建议使用create_subprocess_exec

1
2
3
4
5
6
7
8
9
10
11
12
13
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
stderr=None, loop=None,
limit=streams._DEFAULT_LIMIT, **kwds):
if loop is None:
loop = events.get_event_loop()
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_exec(
protocol_factory,
program, *args,
stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
return Process(transport, protocol, loop)

create_subprocess_exec函数的执行特别想我们在shell窗口输入命令。需要执行的程序为program,其他的子命令以位置参数的方式传进去。最终生成的子进程也只有一个真正的子进程。

注意一点:使用这种方式的时候,参数一定不能包含引号否则子进程执行不成功

实现一个子进程类 来管理子进程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class SubProcess():

@classmethod
def create(cls, program, *args, **kwargs):
"""直接使用该函数进行启动子进程"""

process = cls(program, *args, **kwargs)
process.start()

return process

def __init__(self, program, *args, **kwargs):
"""需要执行的程序 参数 和标准输入输出"""

self._program = program
self._args = args
self._kwargs = kwargs

self._stdin = kwargs.pop(r'stdin') if r'stdin' in kwargs else None
self._stdout = kwargs.pop(r'stdout') if r'stdout' in kwargs else asyncio.subprocess.PIPE
self._stderr = kwargs.pop(r'stderr') if r'stderr' in kwargs else asyncio.subprocess.PIPE

self._process = None
self._process_id = None

@property
def stdin(self):

return self._stdin

@property
def stdout(self):

return self._stdout

@property
def stderr(self):

return self._stderr

def is_running(self):
"""判断当前子进程是都正在执行"""

return self._process is not None and self._process.returncode is None

async def start(self):
"""启动子进程"""

if self._process is not None:
return False

self._process = await asyncio.create_subprocess_exec(
self._program, *self._args,
stdin=self._stdin,
stdout=self._stdout,
stderr=self._stderr,
**self._kwargs
)

self._process_id = self._process.pid

return True

def stop(self):
"""停止子进程"""

if self._process is None:
return False

self._process.kill()

return True

async def wait(self, timeout=None):
"""等待子进程执行完毕 或者设置超时时间"""

try:

await asyncio.wait_for(self._process.wait(), timeout=timeout)

except Exception as err:

self._process.kill()

上面我们实现了一个具有超时关闭功能的子进程类。

超时关闭功能的实现是通过wait_for函数来实现的,我们看下源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
async def wait_for(fut, timeout, *, loop=None):
"""Wait for the single Future or coroutine to complete, with timeout.

Coroutine will be wrapped in Task.

Returns result of the Future or coroutine. When a timeout occurs,
it cancels the task and raises TimeoutError. To avoid the task
cancellation, wrap it in shield().

If the wait is cancelled, the task is also cancelled.

This function is a coroutine.
"""
if loop is None:
loop = events.get_event_loop()

if timeout is None:
return await fut

if timeout <= 0:
fut = ensure_future(fut, loop=loop)

if fut.done():
return fut.result()

fut.cancel()
raise futures.TimeoutError()

waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
cb = functools.partial(_release_waiter, waiter)

fut = ensure_future(fut, loop=loop)
fut.add_done_callback(cb)

try:
# wait until the future completes or the timeout
try:
await waiter
except futures.CancelledError:
fut.remove_done_callback(cb)
fut.cancel()
raise

if fut.done():
return fut.result()
else:
fut.remove_done_callback(cb)
# We must ensure that the task is not running
# after wait_for() returns.
# See https://bugs.python.org/issue32751
await _cancel_and_wait(fut, loop=loop)
raise futures.TimeoutError()
finally:
timeout_handle.cancel()

如果我们没有设置timeout则会一直等待协程结束,当我们设置了之后就会在到期之后将协程结束调。

主要是通过timeout_handle = loop.call_later(timeout, _release_waiter, waiter)这块代码段实现。

我们看下具体call_later执行的内容:

1
2
3
def _release_waiter(waiter, *args):
if not waiter.done():
waiter.set_result(None)

上面函数就是把我们正在执行的协程停止,我们看看set_result函数里面做了什么:

1
2
3
4
5
6
7
8
9
10
11
def set_result(self, result):
"""Mark the future done and set its result.

If the future is already done when this method is called, raises
InvalidStateError.
"""
if self._state != _PENDING:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
一个注意点

当我们使用子进程执行一个耗时较久的命令时候 可能会出现子进程卡住的现象。这是因为子进程产生一些数据,他们会被buffer起来,当buffer满了,会写到子进程的标准输出和标准错误输出,这些东西通过管道发送给父进程。当管道满了之后,子进程就停止写入,于是就卡住了,及时取走管道的输出就不会出现阻塞了。或者我们并不需要子进程的输出信息 在创建 子进程的时候

1
(program, *args, stdin=None, stdout=None,stderr=None,loop=None,limit=streams._DEFAULT_LIMIT, **kwds)

可以将stdin stdout stderr 设置成asyncio.subprocess.DEVNULL 这样直接不再产生到标准输出的东西,管道就不会满了。

推荐文章:

使用subprocess模块调用子进程并获取输出)

subprocess.Popen子进程管道阻塞

python subprocess模块

知识就是财富
如果您觉得文章对您有帮助, 欢迎请我喝杯水!