Skip to content

Commit 9818e31

Browse files
committed
_internal(feat[async_subprocess]): Add async subprocess wrapper
why: Foundation for asyncio support - async equivalent of SubprocessCommand what: - Add AsyncSubprocessCommand dataclass with run(), check_output(), wait() methods - Add AsyncCompletedProcess result type mirroring subprocess.CompletedProcess - Use asyncio.create_subprocess_exec for secure non-shell execution - Use asyncio.wait_for for timeout handling (Python 3.10+ compatible) - Configure pytest-asyncio strict mode in pyproject.toml - Add comprehensive tests with 100% coverage (22 tests, all passing)
1 parent 9cd9087 commit 9818e31

3 files changed

Lines changed: 623 additions & 0 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,8 @@ testpaths = [
236236
filterwarnings = [
237237
"ignore:The frontend.Option(Parser)? class.*:DeprecationWarning::",
238238
]
239+
asyncio_mode = "strict"
240+
asyncio_default_fixture_loop_scope = "function"
239241

240242
[tool.pytest-watcher]
241243
now = true
Lines changed: 389 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,389 @@
1+
# ruff: noqa: A002
2+
r"""Async invocable :mod:`subprocess` wrapper.
3+
4+
Async equivalent of :mod:`libvcs._internal.subprocess`.
5+
6+
Note
7+
----
8+
This is an internal API not covered by versioning policy.
9+
10+
Examples
11+
--------
12+
- :class:`~AsyncSubprocessCommand`: Async wrapper for subprocess execution.
13+
14+
Before (sync):
15+
16+
>>> import subprocess
17+
>>> subprocess.run(
18+
... ['echo', 'hi'],
19+
... capture_output=True, text=True
20+
... ).stdout
21+
'hi\n'
22+
23+
With this (async):
24+
25+
>>> import asyncio
26+
>>> async def example():
27+
... cmd = AsyncSubprocessCommand(['echo', 'hi'])
28+
... result = await cmd.run()
29+
... return result.stdout
30+
>>> # asyncio.run(example()) # Returns 'hi\n'
31+
"""
32+
33+
from __future__ import annotations
34+
35+
import asyncio
36+
import asyncio.subprocess
37+
import dataclasses
38+
import subprocess
39+
import typing as t
40+
from collections.abc import Mapping, Sequence
41+
42+
from libvcs._internal.types import StrOrBytesPath
43+
44+
from .dataclasses import SkipDefaultFieldsReprMixin
45+
46+
#: Command type alias
47+
_CMD: t.TypeAlias = StrOrBytesPath | Sequence[StrOrBytesPath]
48+
49+
#: Environment type alias
50+
_ENV: t.TypeAlias = Mapping[str, str]
51+
52+
53+
@dataclasses.dataclass
54+
class AsyncCompletedProcess(t.Generic[t.AnyStr]):
55+
"""Result of an async subprocess execution.
56+
57+
Mirrors :class:`subprocess.CompletedProcess` for async context.
58+
59+
Parameters
60+
----------
61+
args : list[str]
62+
The command arguments
63+
returncode : int
64+
Exit code of the process
65+
stdout : str | bytes | None
66+
Captured stdout, if any
67+
stderr : str | bytes | None
68+
Captured stderr, if any
69+
"""
70+
71+
args: list[str]
72+
returncode: int
73+
stdout: t.AnyStr | None = None
74+
stderr: t.AnyStr | None = None
75+
76+
def check_returncode(self) -> None:
77+
"""Raise CalledProcessError if returncode is non-zero.
78+
79+
Raises
80+
------
81+
subprocess.CalledProcessError
82+
If the process exited with a non-zero code
83+
"""
84+
if self.returncode != 0:
85+
raise subprocess.CalledProcessError(
86+
self.returncode,
87+
self.args,
88+
self.stdout,
89+
self.stderr,
90+
)
91+
92+
93+
@dataclasses.dataclass(repr=False)
94+
class AsyncSubprocessCommand(SkipDefaultFieldsReprMixin):
95+
r"""Async subprocess command wrapper.
96+
97+
Wraps asyncio subprocess execution in a dataclass for inspection
98+
and mutation before invocation.
99+
100+
Parameters
101+
----------
102+
args : list[str]
103+
Command and arguments to run
104+
cwd : str | Path, optional
105+
Working directory for the command
106+
env : dict[str, str], optional
107+
Environment variables for the command
108+
109+
Examples
110+
--------
111+
>>> import asyncio
112+
>>> async def example():
113+
... cmd = AsyncSubprocessCommand(['echo', 'hello'])
114+
... result = await cmd.run()
115+
... return result.stdout
116+
>>> # asyncio.run(example()) # Returns 'hello\n'
117+
118+
Modify before running:
119+
120+
>>> cmd = AsyncSubprocessCommand(['echo', 'hi'])
121+
>>> cmd.args
122+
['echo', 'hi']
123+
>>> cmd.args[1] = 'hello'
124+
>>> cmd.args
125+
['echo', 'hello']
126+
"""
127+
128+
args: _CMD
129+
cwd: StrOrBytesPath | None = None
130+
env: _ENV | None = None
131+
132+
# Limits for stdout/stderr
133+
limit: int = 2**16 # 64 KiB default buffer
134+
135+
def _args_as_list(self) -> list[str]:
136+
"""Convert args to list of strings for asyncio."""
137+
from os import PathLike
138+
139+
args = self.args
140+
if isinstance(args, (str, bytes, PathLike)):
141+
# Single command (str, bytes, or PathLike)
142+
return [str(args) if not isinstance(args, bytes) else args.decode()]
143+
# At this point, args is Sequence[StrOrBytesPath]
144+
return [
145+
str(arg) if not isinstance(arg, bytes) else arg.decode() for arg in args
146+
]
147+
148+
async def _create_process(
149+
self,
150+
*,
151+
stdin: int | None = None,
152+
stdout: int | None = None,
153+
stderr: int | None = None,
154+
) -> asyncio.subprocess.Process:
155+
"""Create an async subprocess.
156+
157+
Uses asyncio.create_subprocess_exec for secure, non-shell execution.
158+
"""
159+
args_list = self._args_as_list()
160+
# Use asyncio's subprocess creation (non-shell variant for security)
161+
return await asyncio.subprocess.create_subprocess_exec(
162+
*args_list,
163+
stdin=stdin,
164+
stdout=stdout,
165+
stderr=stderr,
166+
cwd=self.cwd,
167+
env=self.env,
168+
limit=self.limit,
169+
)
170+
171+
@t.overload
172+
async def run(
173+
self,
174+
*,
175+
check: bool = ...,
176+
timeout: float | None = ...,
177+
input: bytes | None = ...,
178+
text: t.Literal[False] = ...,
179+
) -> AsyncCompletedProcess[bytes]: ...
180+
181+
@t.overload
182+
async def run(
183+
self,
184+
*,
185+
check: bool = ...,
186+
timeout: float | None = ...,
187+
input: str | None = ...,
188+
text: t.Literal[True],
189+
) -> AsyncCompletedProcess[str]: ...
190+
191+
@t.overload
192+
async def run(
193+
self,
194+
*,
195+
check: bool = ...,
196+
timeout: float | None = ...,
197+
input: str | bytes | None = ...,
198+
text: bool = ...,
199+
) -> AsyncCompletedProcess[t.Any]: ...
200+
201+
async def run(
202+
self,
203+
*,
204+
check: bool = False,
205+
timeout: float | None = None,
206+
input: str | bytes | None = None,
207+
text: bool = False,
208+
) -> AsyncCompletedProcess[t.Any]:
209+
r"""Run command asynchronously and return completed process.
210+
211+
Uses asyncio subprocess APIs for non-blocking operation.
212+
213+
Parameters
214+
----------
215+
check : bool, default False
216+
If True, raise CalledProcessError on non-zero exit
217+
timeout : float, optional
218+
Timeout in seconds. Raises asyncio.TimeoutError if exceeded.
219+
input : str | bytes, optional
220+
Data to send to stdin
221+
text : bool, default False
222+
If True, decode stdout/stderr as text
223+
224+
Returns
225+
-------
226+
AsyncCompletedProcess
227+
Result with args, returncode, stdout, stderr
228+
229+
Raises
230+
------
231+
subprocess.CalledProcessError
232+
If check=True and process exits with non-zero code
233+
asyncio.TimeoutError
234+
If timeout is exceeded
235+
236+
Examples
237+
--------
238+
>>> import asyncio
239+
>>> async def example():
240+
... cmd = AsyncSubprocessCommand(['echo', 'hello'])
241+
... result = await cmd.run(text=True)
242+
... return result.stdout.strip()
243+
>>> # asyncio.run(example()) # Returns 'hello'
244+
"""
245+
args_list = self._args_as_list()
246+
247+
# Prepare input as bytes
248+
input_bytes: bytes | None = None
249+
if input is not None:
250+
input_bytes = input.encode() if isinstance(input, str) else input
251+
252+
# Create subprocess
253+
proc = await self._create_process(
254+
stdin=asyncio.subprocess.PIPE if input_bytes else None,
255+
stdout=asyncio.subprocess.PIPE,
256+
stderr=asyncio.subprocess.PIPE,
257+
)
258+
259+
try:
260+
# Use communicate() with optional timeout via wait_for
261+
if timeout is not None:
262+
stdout_bytes, stderr_bytes = await asyncio.wait_for(
263+
proc.communicate(input_bytes),
264+
timeout=timeout,
265+
)
266+
else:
267+
stdout_bytes, stderr_bytes = await proc.communicate(input_bytes)
268+
except asyncio.TimeoutError:
269+
# Kill process on timeout
270+
proc.kill()
271+
await proc.wait()
272+
raise
273+
274+
# Get return code (should be set after communicate)
275+
returncode = proc.returncode
276+
assert returncode is not None, "returncode should be set after communicate()"
277+
278+
# Decode if text mode
279+
stdout: str | bytes | None = stdout_bytes
280+
stderr: str | bytes | None = stderr_bytes
281+
if text:
282+
stdout = stdout_bytes.decode() if stdout_bytes else ""
283+
stderr = stderr_bytes.decode() if stderr_bytes else ""
284+
285+
result: AsyncCompletedProcess[t.Any] = AsyncCompletedProcess(
286+
args=args_list,
287+
returncode=returncode,
288+
stdout=stdout,
289+
stderr=stderr,
290+
)
291+
292+
if check:
293+
result.check_returncode()
294+
295+
return result
296+
297+
async def check_output(
298+
self,
299+
*,
300+
timeout: float | None = None,
301+
input: str | bytes | None = None,
302+
text: bool = False,
303+
) -> str | bytes:
304+
r"""Run command and return stdout, raising on non-zero exit.
305+
306+
Parameters
307+
----------
308+
timeout : float, optional
309+
Timeout in seconds
310+
input : str | bytes, optional
311+
Data to send to stdin
312+
text : bool, default False
313+
If True, return stdout as text
314+
315+
Returns
316+
-------
317+
str | bytes
318+
Command stdout
319+
320+
Raises
321+
------
322+
subprocess.CalledProcessError
323+
If process exits with non-zero code
324+
asyncio.TimeoutError
325+
If timeout is exceeded
326+
327+
Examples
328+
--------
329+
>>> import asyncio
330+
>>> async def example():
331+
... cmd = AsyncSubprocessCommand(['echo', 'hello'])
332+
... return await cmd.check_output(text=True)
333+
>>> # asyncio.run(example()) # Returns 'hello\n'
334+
"""
335+
result = await self.run(check=True, timeout=timeout, input=input, text=text)
336+
return result.stdout if result.stdout is not None else (b"" if not text else "")
337+
338+
async def wait(
339+
self,
340+
*,
341+
timeout: float | None = None,
342+
) -> int:
343+
"""Run command and return exit code.
344+
345+
Discards stdout/stderr.
346+
347+
Parameters
348+
----------
349+
timeout : float, optional
350+
Timeout in seconds
351+
352+
Returns
353+
-------
354+
int
355+
Process exit code
356+
357+
Raises
358+
------
359+
asyncio.TimeoutError
360+
If timeout is exceeded
361+
362+
Examples
363+
--------
364+
>>> import asyncio
365+
>>> async def example():
366+
... cmd = AsyncSubprocessCommand(['true'])
367+
... return await cmd.wait()
368+
>>> # asyncio.run(example()) # Returns 0
369+
"""
370+
proc = await self._create_process(
371+
stdin=asyncio.subprocess.DEVNULL,
372+
stdout=asyncio.subprocess.DEVNULL,
373+
stderr=asyncio.subprocess.DEVNULL,
374+
)
375+
376+
try:
377+
if timeout is not None:
378+
returncode = await asyncio.wait_for(
379+
proc.wait(),
380+
timeout=timeout,
381+
)
382+
else:
383+
returncode = await proc.wait()
384+
except asyncio.TimeoutError:
385+
proc.kill()
386+
await proc.wait()
387+
raise
388+
389+
return returncode

0 commit comments

Comments
 (0)