1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
|
"""Execute that runs on local file system via subprocess-es"""
from __future__ import annotations
import fnmatch
import io
import logging
import os
import shutil
import sys
from subprocess import DEVNULL, PIPE, TimeoutExpired
from types import TracebackType
from typing import TYPE_CHECKING, Any, Generator, Sequence
from tox.tox_env.errors import Fail
from ..api import Execute, ExecuteInstance, ExecuteOptions, ExecuteStatus
from ..request import ExecuteRequest, StdinSource
from ..stream import SyncWrite
from ..util import shebang
# mypy: warn-unused-ignores=false
if sys.platform == "win32": # explicit check for mypy # pragma: win32 cover
# needs stdin/stdout handlers backed by overlapped IO
if TYPE_CHECKING: # the typeshed libraries don't contain this, so replace it with normal one
from subprocess import Popen
else:
from asyncio.windows_utils import Popen
from signal import CTRL_C_EVENT as SIG_INTERRUPT
from signal import SIGTERM
from .read_via_thread_windows import ReadViaThreadWindows as ReadViaThread
else: # pragma: win32 no cover
from signal import SIGINT as SIG_INTERRUPT
from signal import SIGKILL, SIGTERM
from subprocess import Popen
from .read_via_thread_unix import ReadViaThreadUnix as ReadViaThread
IS_WIN = sys.platform == "win32"
class LocalSubProcessExecutor(Execute):
    """Executor whose instances are :class:`LocalSubProcessExecuteInstance` objects."""

    def build_instance(
        self,
        request: ExecuteRequest,
        options: ExecuteOptions,
        out: SyncWrite,
        err: SyncWrite,
    ) -> ExecuteInstance:
        """
        Build the execute instance for a single request.

        :param request: the request describing what to run
        :param options: the options to run the request with
        :param out: the writer handed to the instance for standard output
        :param err: the writer handed to the instance for standard error
        :return: a new :class:`LocalSubProcessExecuteInstance` wrapping the arguments
        """
        instance = LocalSubProcessExecuteInstance(request=request, options=options, out=out, err=err)
        return instance
class LocalSubprocessExecuteStatus(ExecuteStatus):
    """Status of a sub-process that was successfully started via ``Popen``."""

    def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite, process: Popen[bytes]) -> None:
        """
        :param options: the execution options (the interrupt related timeouts are read from here)
        :param out: the standard output writer, forwarded to the base class
        :param err: the standard error writer, forwarded to the base class
        :param process: the running sub-process this status reports on
        """
        self._process: Popen[bytes] = process
        super().__init__(options, out, err)
        self._interrupted = False  # flipped by interrupt(); consulted when a stdin write fails

    @property
    def exit_code(self) -> int | None:
        """:return: the return code of the process as recorded on the ``Popen`` object (``None`` if not set yet)"""
        return self._process.returncode

    def interrupt(self) -> None:
        """
        Stop the sub-process, escalating INT -> TERM -> KILL while it stays alive.

        Between the levels the process is given ``suicide_timeout``, ``interrupt_timeout`` and ``terminate_timeout``
        (taken from the options) to exit by itself.
        """
        self._interrupted = True
        if self._process is not None:  # pragma: no branch
            # A three level stop mechanism for children - INT -> TERM -> KILL
            # communicate will wait for the app to stop, and then drain the standard streams and close them
            to_pid, host_pid = self._process.pid, os.getpid()
            msg = "requested interrupt of %d from %d, activate in %.2f"
            logging.warning(msg, to_pid, host_pid, self.options.suicide_timeout)
            if self.wait(self.options.suicide_timeout) is None:  # still alive -> INT
                # define the signal messages up-front: the TERM branch below is also reached on Windows, where the
                # INT branch (that used to be the only place setting the message) is skipped
                signal_msg = "send signal %s to %d from %d"
                timed_signal_msg = "send signal %s to %d from %d with timeout %.2f"
                # on Windows everyone in the same process group, so they got the message
                if sys.platform != "win32":  # pragma: win32 cover
                    interrupt_timeout = self.options.interrupt_timeout
                    logging.warning(timed_signal_msg, f"SIGINT({SIG_INTERRUPT})", to_pid, host_pid, interrupt_timeout)
                    self._process.send_signal(SIG_INTERRUPT)
                if self.wait(self.options.interrupt_timeout) is None:  # still alive -> TERM # pragma: no branch
                    terminate_output = self.options.terminate_timeout
                    logging.warning(timed_signal_msg, f"SIGTERM({SIGTERM})", to_pid, host_pid, terminate_output)
                    self._process.terminate()
                    # Windows terminate is UNIX kill
                    if sys.platform != "win32" and self.wait(terminate_output) is None:  # pragma: no branch
                        logging.warning(signal_msg, f"SIGKILL({SIGKILL})", to_pid, host_pid)
                        self._process.kill()  # still alive -> KILL
                    self.wait()  # unconditional wait as kill should soon bring down the process
                logging.warning("interrupt finished with success")
            else:  # pragma: no cover # difficult to test, process must die just as it's being interrupted
                logging.warning("process already dead with %s within %s", self._process.returncode, host_pid)

    def wait(self, timeout: float | None = None) -> int | None:
        """
        Wait for the process to finish.

        :param timeout: how long to wait at most, ``None`` to wait without a limit
        :return: the return code of the process, or ``None`` when the timeout expired first
        """
        try:  # note wait in general might deadlock if output large, but we drain in background threads so not an issue
            return self._process.wait(timeout=timeout)
        except TimeoutExpired:
            return None

    def write_stdin(self, content: str) -> None:
        """
        Write text to the standard input of the process; a no-op when the process has no stdin pipe.

        :param content: the text to send, encoded with the default encoding of ``str.encode``
        :raises RuntimeError: on Windows, when the overlapped write reports a different size than requested
        :raises OSError: when the write fails and no interrupt has been requested
        """
        stdin = self._process.stdin
        if stdin is None:  # pragma: no branch
            return  # pragma: no cover
        bytes_content = content.encode()
        try:
            if sys.platform == "win32":  # explicit check for mypy  # pragma: win32 cover
                # on Windows we have a PipeHandle object here rather than a file stream
                import _overlapped  # type: ignore[import]

                ov = _overlapped.Overlapped(0)
                ov.WriteFile(stdin.handle, bytes_content)  # type: ignore[attr-defined]
                result = ov.getresult(10)  # wait up to 10ms to perform the operation
                if result != len(bytes_content):
                    raise RuntimeError(f"failed to write to {stdin!r}")
            else:
                stdin.write(bytes_content)
                stdin.flush()
        except OSError:  # pragma: no cover
            # if the process was asked to exit in the meantime ignore write errors, otherwise propagate them
            if not self._interrupted:  # pragma: no cover
                raise  # pragma: no cover

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(pid={self._process.pid}, returncode={self._process.returncode!r})"

    @property
    def metadata(self) -> dict[str, Any]:
        """:return: ``{"pid": <pid>}`` when the process has a truthy pid, otherwise an empty dictionary"""
        return {"pid": self._process.pid} if self._process.pid else {}
class LocalSubprocessExecuteFailedStatus(ExecuteStatus):
    """Status carrying a fixed exit code, used when there is no sub-process object to query."""

    def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite, exit_code: int | None) -> None:
        """
        :param options: the execution options, forwarded to the base class
        :param out: the standard output writer, forwarded to the base class
        :param err: the standard error writer, forwarded to the base class
        :param exit_code: the value to report as exit code
        """
        super().__init__(options, out, err)
        self._exit_code = exit_code

    @property
    def exit_code(self) -> int | None:
        """:return: the exit code given at construction time"""
        return self._exit_code

    def wait(self, timeout: float | None = None) -> int | None:  # noqa: U100
        """:return: the stored exit code straight away; ``timeout`` is not used"""
        return self._exit_code  # pragma: no cover

    def write_stdin(self, content: str) -> None:  # noqa: U100
        """Accept and discard the content, there is nothing to write to."""

    def interrupt(self) -> None:
        """Do nothing: nothing is running, so there is nothing to interrupt."""
        return None  # pragma: no cover
class LocalSubProcessExecuteInstance(ExecuteInstance):
    """Context manager that starts the request as a sub-process and reads its output via background readers."""

    def __init__(
        self,
        request: ExecuteRequest,
        options: ExecuteOptions,
        out: SyncWrite,
        err: SyncWrite,
        on_exit_drain: bool = True,
    ) -> None:
        """
        :param request: the request to execute (command, environment, working directory, stdin source, allow list)
        :param options: the execution options
        :param out: the standard output writer
        :param err: the standard error writer
        :param on_exit_drain: forwarded as ``drain`` to the stdout/stderr readers
        """
        super().__init__(request, options, out, err)
        self.process: Popen[bytes] | None = None
        self._cmd: list[str] | None = None  # resolved command line, computed lazily by the cmd property
        self._read_stderr: ReadViaThread | None = None
        self._read_stdout: ReadViaThread | None = None
        self._on_exit_drain = on_exit_drain

    @property
    def cmd(self) -> Sequence[str]:
        """
        The command line to run, resolved once and cached afterwards.

        The first element of the requested command is looked up on the ``PATH`` of the request environment. When it
        cannot be found the requested command is used unchanged. When it is found it must match the allow list of the
        request (if there is one); outside of Windows, with a non-blank ``TOX_LIMITED_SHEBANG`` in the environment,
        the shebang of the executable is prepended to the command.

        :raises Fail: when the executable matches none of the allow patterns
        """
        if self._cmd is None:
            base = self.request.cmd[0]
            executable = shutil.which(base, path=self.request.env["PATH"])
            if executable is None:
                cmd = self.request.cmd  # if failed to find leave as it is
            else:
                allow_patterns = self.request.allow
                if allow_patterns is not None:
                    # 1. allow matches just the original name of the executable
                    # 2. allow matches the entire resolved path
                    allowed = any(
                        fnmatch.fnmatch(base, allow) or fnmatch.fnmatch(executable, allow) for allow in allow_patterns
                    )
                    if not allowed:
                        # only mention the resolved path when it tells the user something the name did not
                        msg = f"{base} (resolves to {executable})" if base != executable else base
                        raise Fail(f"{msg} is not allowed, use allowlist_externals to allow it")
                cmd = [executable]
                if sys.platform != "win32" and self.request.env.get("TOX_LIMITED_SHEBANG", "").strip():
                    shebang_line = shebang(executable)
                    if shebang_line:
                        cmd = [*shebang_line, executable]
                cmd.extend(self.request.cmd[1:])
            self._cmd = cmd
        return self._cmd

    def __enter__(self) -> ExecuteStatus:
        """
        Start the sub-process and the readers of its standard output and error.

        :return: the status of the started process, or a failed status holding the ``errno`` when ``Popen`` raised
         ``OSError``
        """
        # adjust sub-process terminal size
        columns, lines = shutil.get_terminal_size(fallback=(-1, -1))
        if columns != -1:  # pragma: no branch
            self.request.env.setdefault("COLUMNS", str(columns))
        if lines != -1:  # pragma: no branch
            self.request.env.setdefault("LINES", str(lines))

        # two-step generators: the first value is what Popen needs, the second (after sending in the process) is
        # what the reader needs
        stdout, stderr = self.get_stream_file_no("stdout"), self.get_stream_file_no("stderr")
        try:
            self.process = process = Popen(
                self.cmd,
                stdout=next(stdout),
                stderr=next(stderr),
                stdin={StdinSource.USER: None, StdinSource.OFF: DEVNULL, StdinSource.API: PIPE}[self.request.stdin],
                cwd=str(self.request.cwd),
                env=self.request.env,
            )
        except OSError as exception:
            return LocalSubprocessExecuteFailedStatus(self.options, self._out, self._err, exception.errno)

        status = LocalSubprocessExecuteStatus(self.options, self._out, self._err, process)
        drain, pid = self._on_exit_drain, self.process.pid
        self._read_stderr = ReadViaThread(stderr.send(process), self.err_handler, name=f"err-{pid}", drain=drain)
        self._read_stderr.__enter__()
        self._read_stdout = ReadViaThread(stdout.send(process), self.out_handler, name=f"out-{pid}", drain=drain)
        self._read_stdout.__enter__()

        if sys.platform == "win32":  # explicit check for mypy:  # pragma: win32 cover
            process.stderr.read = self._read_stderr._drain_stream  # type: ignore[assignment,union-attr]
            process.stdout.read = self._read_stdout._drain_stream  # type: ignore[assignment,union-attr]
        return status

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit the readers (stderr first, then stdout) and close the still open streams of the process."""
        if self._read_stderr is not None:
            self._read_stderr.__exit__(exc_type, exc_val, exc_tb)
        if self._read_stdout is not None:
            self._read_stdout.__exit__(exc_type, exc_val, exc_tb)
        if self.process is not None:  # cleanup the file handlers
            for stream in (self.process.stdout, self.process.stderr, self.process.stdin):
                if stream is not None and not getattr(stream, "closed", False):
                    try:
                        stream.close()
                    except OSError as exc:  # pragma: no cover
                        logging.warning("error while trying to close %r with %r", stream, exc)  # pragma: no cover

    @staticmethod
    def get_stream_file_no(key: str) -> Generator[int, Popen[bytes], None]:
        """
        Two-step generator providing the file descriptors for one output stream of the sub-process.

        The first value is what to pass to ``Popen`` for the stream: the child end of a pty when one could be
        allocated, otherwise ``PIPE``. After the started process is sent in, the second value is what to read from:
        the main end of the pty, or the pipe of the process (its ``handle`` on Windows, its ``name`` elsewhere).

        :param key: the stream name, ``stdout`` or ``stderr``
        """
        allocated_pty = _pty(key)
        if allocated_pty is not None:
            main_fd, child_fd = allocated_pty
            yield child_fd
            os.close(child_fd)  # close the child process pipe
            yield main_fd
        else:
            process = yield PIPE
            stream = getattr(process, key)
            if sys.platform == "win32":  # explicit check for mypy  # pragma: win32 cover
                yield stream.handle
            else:
                yield stream.name

    def set_out_err(self, out: SyncWrite, err: SyncWrite) -> tuple[SyncWrite, SyncWrite]:
        """
        Redirect the running readers to the handlers of other writers.

        :param out: the writer whose handler the stdout reader should use
        :param err: the writer whose handler the stderr reader should use
        :return: the ``(out, err)`` writers held by this instance
        """
        prev = self._out, self._err
        if self._read_stdout is not None:  # pragma: no branch
            self._read_stdout.handler = out.handler
        if self._read_stderr is not None:  # pragma: no branch
            self._read_stderr.handler = err.handler
        return prev
def _pty(key: str) -> tuple[int, int] | None:
    """
    Allocate a virtual terminal (pty) for a subprocess.

    A virtual terminal allows a process to perform syscalls that fetch attributes related to the tty,
    for example to determine whether to use colored output or enter interactive mode.

    The termios attributes of the controlling terminal stream will be copied to the allocated pty, and the window
    size of the pty is set to the current terminal size when that can be determined.

    :param key: The stream to copy attributes from. Either "stdout" or "stderr".
    :return: (main_fd, child_fd) of an allocated pty; or None on error or if unsupported (win32).
    """
    if sys.platform == "win32":  # explicit check for mypy  # pragma: win32 cover
        return None

    stream: io.TextIOWrapper = getattr(sys, key)

    # when our current stream is a tty, emulate pty for the child
    # to allow host streams traits to be inherited
    if not stream.isatty():
        return None

    try:
        import fcntl
        import pty
        import struct
        import termios
    except ImportError:  # pragma: no cover
        return None  # cannot proceed on platforms without pty support

    try:
        main, child = pty.openpty()
    except OSError:  # could not open a tty
        return None  # pragma: no cover

    try:
        mode = termios.tcgetattr(stream)
        termios.tcsetattr(child, termios.TCSANOW, mode)
    except (termios.error, OSError):  # could not inherit traits
        # the caller only sees None, so the freshly opened pair must be released here or it leaks
        os.close(child)  # pragma: no cover
        os.close(main)  # pragma: no cover
        return None  # pragma: no cover

    # adjust sub-process terminal size
    columns, lines = shutil.get_terminal_size(fallback=(-1, -1))
    if columns != -1 and lines != -1:
        # struct winsize is laid out as (ws_row, ws_col, ws_xpixel, ws_ypixel): rows go first
        size = struct.pack("HHHH", lines, columns, 0, 0)
        fcntl.ioctl(child, termios.TIOCSWINSZ, size)

    return main, child
# Public names of this module. Every entry must exist at module level, otherwise a star-import of the module
# raises AttributeError; "CREATION_FLAGS" was listed here without being defined anywhere in the module.
__all__ = (
    "SIG_INTERRUPT",
    "LocalSubProcessExecuteInstance",
    "LocalSubProcessExecutor",
    "LocalSubprocessExecuteStatus",
    "LocalSubprocessExecuteFailedStatus",
)
|