summaryrefslogtreecommitdiff
path: root/src/tox/execute/local_sub_process/__init__.py
blob: 478fb6b084149c51c5929a7061b20ee776800d86 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
"""Execute that runs on local file system via subprocess-es"""
from __future__ import annotations

import fnmatch
import io
import logging
import os
import shutil
import sys
from subprocess import DEVNULL, PIPE, TimeoutExpired
from types import TracebackType
from typing import TYPE_CHECKING, Any, Generator, Sequence

from tox.tox_env.errors import Fail

from ..api import Execute, ExecuteInstance, ExecuteOptions, ExecuteStatus
from ..request import ExecuteRequest, StdinSource
from ..stream import SyncWrite
from ..util import shebang

# mypy: warn-unused-ignores=false

if sys.platform == "win32":  # explicit check for mypy # pragma: win32 cover
    # needs stdin/stdout handlers backed by overlapped IO
    if TYPE_CHECKING:  # the typeshed libraries don't contain this, so replace it with normal one
        from subprocess import Popen
    else:
        from asyncio.windows_utils import Popen
    from signal import CTRL_C_EVENT as SIG_INTERRUPT
    from signal import SIGTERM

    from .read_via_thread_windows import ReadViaThreadWindows as ReadViaThread

else:  # pragma: win32 no cover
    from signal import SIGINT as SIG_INTERRUPT
    from signal import SIGKILL, SIGTERM
    from subprocess import Popen

    from .read_via_thread_unix import ReadViaThreadUnix as ReadViaThread


# True when the interpreter runs on Windows. Nothing inside this module reads the flag: the code here spells out the
# ``sys.platform`` comparison each time (its inline comments note this is for mypy's benefit).
IS_WIN = sys.platform == "win32"


class LocalSubProcessExecutor(Execute):
    """Executor whose instances are :class:`LocalSubProcessExecuteInstance` objects."""

    def build_instance(
        self,
        request: ExecuteRequest,
        options: ExecuteOptions,
        out: SyncWrite,
        err: SyncWrite,
    ) -> ExecuteInstance:
        """
        Create the execute instance for a request.

        :param request: the request describing what to run
        :param options: the execution options
        :param out: sink for the standard output
        :param err: sink for the standard error
        :return: a new, not yet entered, local sub-process execute instance
        """
        instance = LocalSubProcessExecuteInstance(request=request, options=options, out=out, err=err)
        return instance


class LocalSubprocessExecuteStatus(ExecuteStatus):
    """Status handle for a sub-process that was started successfully."""

    def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite, process: Popen[bytes]):
        """
        :param options: execution options; :meth:`interrupt` reads ``suicide_timeout``, ``interrupt_timeout`` and
            ``terminate_timeout`` from ``self.options``
        :param out: sink for the standard output, forwarded to the base class
        :param err: sink for the standard error, forwarded to the base class
        :param process: the already started child process
        """
        self._process: Popen[bytes] = process
        super().__init__(options, out, err)
        self._interrupted = False  # flipped by interrupt(), consulted by write_stdin() on write failures

    @property
    def exit_code(self) -> int | None:
        """:return: the return code of the child process, ``None`` while no return code has been collected"""
        return self._process.returncode

    def interrupt(self) -> None:
        """
        Stop the child process, escalating step by step until it is gone.

        First wait ``suicide_timeout`` for the process to finish by itself, then (non Windows only) send the
        interrupt signal and wait ``interrupt_timeout``, then terminate and wait ``terminate_timeout``, and finally
        (non Windows only) kill. Every step is logged at warning level.
        """
        self._interrupted = True
        if self._process is not None:  # pragma: no branch
            # A three level stop mechanism for children - INT -> TERM -> KILL
            # communicate will wait for the app to stop, and then drain the standard streams and close them
            to_pid, host_pid = self._process.pid, os.getpid()
            msg = "requested interrupt of %d from %d, activate in %.2f"
            logging.warning(msg, to_pid, host_pid, self.options.suicide_timeout)
            if self.wait(self.options.suicide_timeout) is None:  # still alive -> INT
                # defined for every platform: the TERM step below logs with it on Windows too, where the INT step
                # (and with it the previous place this format was assigned) is skipped
                signal_msg = "send signal %s to %d from %d with timeout %.2f"
                # on Windows everyone in the same process group, so they got the message
                if sys.platform != "win32":  # pragma: win32 cover
                    interrupt_timeout = self.options.interrupt_timeout
                    logging.warning(signal_msg, f"SIGINT({SIG_INTERRUPT})", to_pid, host_pid, interrupt_timeout)
                    self._process.send_signal(SIG_INTERRUPT)
                if self.wait(self.options.interrupt_timeout) is None:  # still alive -> TERM # pragma: no branch
                    terminate_output = self.options.terminate_timeout
                    logging.warning(signal_msg, f"SIGTERM({SIGTERM})", to_pid, host_pid, terminate_output)
                    self._process.terminate()
                    # Windows terminate is UNIX kill
                    if sys.platform != "win32" and self.wait(terminate_output) is None:  # pragma: no branch
                        # the kill step has no timeout, hence the shorter message
                        logging.warning("send signal %s to %d from %d", f"SIGKILL({SIGKILL})", to_pid, host_pid)
                        self._process.kill()  # still alive -> KILL
                    self.wait()  # unconditional wait as kill should soon bring down the process
                logging.warning("interrupt finished with success")
            else:  # pragma: no cover # difficult to test, process must die just as it's being interrupted
                logging.warning("process already dead with %s within %s", self._process.returncode, host_pid)

    def wait(self, timeout: float | None = None) -> int | None:
        """
        Wait for the child process to finish.

        :param timeout: seconds to wait at most, ``None`` waits without limit
        :return: the return code of the process, or ``None`` if the timeout expired first
        """
        try:  # note wait in general might deadlock if output large, but we drain in background threads so not an issue
            return self._process.wait(timeout=timeout)
        except TimeoutExpired:
            return None

    def write_stdin(self, content: str) -> None:
        """
        Send text to the standard input of the child process; a no-op when the process has no stdin pipe.

        :param content: the text to write, encoded with the default ``str.encode`` codec
        :raises RuntimeError: on Windows, when fewer bytes than requested were written
        :raises OSError: when the write fails and no interrupt was requested in the meantime
        """
        stdin = self._process.stdin
        if stdin is None:  # pragma: no branch
            return  # pragma: no cover
        bytes_content = content.encode()
        try:
            if sys.platform == "win32":  # explicit check for mypy  # pragma: win32 cover
                # on Windows we have a PipeHandle object here rather than a file stream
                import _overlapped  # type: ignore[import]

                ov = _overlapped.Overlapped(0)
                ov.WriteFile(stdin.handle, bytes_content)  # type: ignore[attr-defined]
                # upstream note: wait up to 10ms to perform the operation - TODO confirm the meaning of the argument
                result = ov.getresult(10)
                if result != len(bytes_content):
                    raise RuntimeError(f"failed to write to {stdin!r}")
            else:
                stdin.write(bytes_content)
                stdin.flush()
        except OSError:  # pragma: no cover
            # if the process was asked to exit in the meantime ignore write errors, otherwise propagate them
            if not self._interrupted:  # pragma: no cover
                raise  # pragma: no cover

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(pid={self._process.pid}, returncode={self._process.returncode!r})"

    @property
    def metadata(self) -> dict[str, Any]:
        """:return: ``{"pid": <pid>}`` when the process has a truthy pid, otherwise an empty dictionary"""
        return {"pid": self._process.pid} if self._process.pid else {}


class LocalSubprocessExecuteFailedStatus(ExecuteStatus):
    """Status used when no process is running; it only carries a fixed exit code."""

    def __init__(self, options: ExecuteOptions, out: SyncWrite, err: SyncWrite, exit_code: int | None) -> None:
        """
        :param options: execution options, forwarded to the base class
        :param out: sink for the standard output, forwarded to the base class
        :param err: sink for the standard error, forwarded to the base class
        :param exit_code: the value reported by :attr:`exit_code` and :meth:`wait`
        """
        super().__init__(options, out, err)
        self._exit_code = exit_code

    @property
    def exit_code(self) -> int | None:
        """:return: the exit code handed over at construction time"""
        return self._exit_code

    def wait(self, timeout: float | None = None) -> int | None:  # noqa: U100
        """Return the stored exit code right away; ``timeout`` is ignored."""
        return self._exit_code  # pragma: no cover

    def write_stdin(self, content: str) -> None:  # noqa: U100
        """Do nothing: ``content`` is discarded."""

    def interrupt(self) -> None:
        """Do nothing: nothing is running, so there is nothing to interrupt."""
        return  # pragma: no cover # nothing running so nothing to interrupt


class LocalSubProcessExecuteInstance(ExecuteInstance):
    """One run of a request as a local sub-process; entering the context starts the process."""

    def __init__(
        self,
        request: ExecuteRequest,
        options: ExecuteOptions,
        out: SyncWrite,
        err: SyncWrite,
        on_exit_drain: bool = True,
    ) -> None:
        """
        :param request: what to run; this class reads its ``cmd``, ``env``, ``cwd``, ``stdin`` and ``allow``
        :param options: execution options, handed to the status object created on enter
        :param out: sink for the standard output
        :param err: sink for the standard error
        :param on_exit_drain: passed as ``drain`` to the stream reader threads
        """
        super().__init__(request, options, out, err)
        self.process: Popen[bytes] | None = None
        self._cmd: list[str] | None = None  # cache for the ``cmd`` property
        self._read_stderr: ReadViaThread | None = None
        self._read_stdout: ReadViaThread | None = None
        self._on_exit_drain = on_exit_drain

    @property
    def cmd(self) -> Sequence[str]:
        """
        The command line to start, computed on first access and cached afterwards.

        The first element of the request command is looked up on the ``PATH`` of the request environment. When it
        cannot be found the request command is used unchanged. Otherwise the resolved path replaces the first
        element, optionally prefixed by the shebang interpreter when ``TOX_LIMITED_SHEBANG`` is set (non Windows).

        :raises Fail: when the request has an allow list and neither the requested name nor the resolved path
            matches any of its patterns
        """
        if self._cmd is None:
            base = self.request.cmd[0]
            executable = shutil.which(base, path=self.request.env["PATH"])
            if executable is None:
                cmd = self.request.cmd  # if failed to find leave as it is
            else:
                allow_list = self.request.allow
                if allow_list is not None:
                    # 1. allow matches just the original name of the executable
                    # 2. allow matches the entire resolved path
                    allowed = any(
                        fnmatch.fnmatch(base, allow) or fnmatch.fnmatch(executable, allow) for allow in allow_list
                    )
                    if not allowed:
                        # mention the resolved path only when it differs from what was requested
                        msg = f"{base} (resolves to {executable})" if base != executable else base
                        raise Fail(f"{msg} is not allowed, use allowlist_externals to allow it")
                cmd = [executable]
                if sys.platform != "win32" and self.request.env.get("TOX_LIMITED_SHEBANG", "").strip():
                    shebang_line = shebang(executable)
                    if shebang_line:
                        cmd = [*shebang_line, executable]
                cmd.extend(self.request.cmd[1:])
            self._cmd = cmd
        return self._cmd

    def __enter__(self) -> ExecuteStatus:
        """
        Start the sub-process and the threads reading its output.

        :return: a :class:`LocalSubprocessExecuteStatus` for the running process, or a
            :class:`LocalSubprocessExecuteFailedStatus` carrying the ``errno`` when starting raised ``OSError``
        """
        # adjust sub-process terminal size
        columns, lines = shutil.get_terminal_size(fallback=(-1, -1))
        if columns != -1:  # pragma: no branch
            self.request.env.setdefault("COLUMNS", str(columns))
        if lines != -1:  # pragma: no branch
            self.request.env.setdefault("LINES", str(lines))

        stdout, stderr = self.get_stream_file_no("stdout"), self.get_stream_file_no("stderr")
        try:
            self.process = process = Popen(
                self.cmd,
                stdout=next(stdout),
                stderr=next(stderr),
                stdin={StdinSource.USER: None, StdinSource.OFF: DEVNULL, StdinSource.API: PIPE}[self.request.stdin],
                cwd=str(self.request.cwd),
                env=self.request.env,
            )
        except OSError as exception:
            # nothing will resume the generators any more: close them so they release any pty they allocated
            stdout.close()
            stderr.close()
            return LocalSubprocessExecuteFailedStatus(self.options, self._out, self._err, exception.errno)

        status = LocalSubprocessExecuteStatus(self.options, self._out, self._err, process)
        drain, pid = self._on_exit_drain, self.process.pid
        # sending the process into the generators yields what the reader threads should read from
        self._read_stderr = ReadViaThread(stderr.send(process), self.err_handler, name=f"err-{pid}", drain=drain)
        self._read_stderr.__enter__()
        self._read_stdout = ReadViaThread(stdout.send(process), self.out_handler, name=f"out-{pid}", drain=drain)
        self._read_stdout.__enter__()

        if sys.platform == "win32":  # explicit check for mypy:  # pragma: win32 cover
            # route ``read`` on the process streams through the reader threads' drain method
            process.stderr.read = self._read_stderr._drain_stream  # type: ignore[assignment,union-attr]
            process.stdout.read = self._read_stdout._drain_stream  # type: ignore[assignment,union-attr]
        return status

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit the reader threads (stderr first, then stdout) and close the still open streams of the process."""
        if self._read_stderr is not None:
            self._read_stderr.__exit__(exc_type, exc_val, exc_tb)
        if self._read_stdout is not None:
            self._read_stdout.__exit__(exc_type, exc_val, exc_tb)
        if self.process is not None:  # cleanup the file handlers
            for stream in (self.process.stdout, self.process.stderr, self.process.stdin):
                if stream is not None and not getattr(stream, "closed", False):
                    try:
                        stream.close()
                    except OSError as exc:  # pragma: no cover
                        logging.warning("error while trying to close %r with %r", stream, exc)  # pragma: no cover

    @staticmethod
    def get_stream_file_no(key: str) -> Generator[int, Popen[bytes], None]:
        """
        Two step generator providing the file numbers for one output stream of the child.

        The first value (via ``next``) is what to pass to ``Popen`` for the stream: the child end of a freshly
        allocated pty, or ``PIPE``. The second value (via ``send(process)``) is what the parent reads from: the main
        end of the pty, or the handle (Windows) / name of the pipe stream found on the process.

        Closing the generator before ``send`` was called releases both ends of an allocated pty.

        :param key: the stream name, ``stdout`` or ``stderr``
        """
        allocated_pty = _pty(key)
        if allocated_pty is not None:
            main_fd, child_fd = allocated_pty
            handed_over = False
            try:
                yield child_fd
                handed_over = True
            finally:
                os.close(child_fd)  # close the child process pipe
                if not handed_over:  # the process never started, nobody is going to read the main end
                    os.close(main_fd)
            yield main_fd
        else:
            process = yield PIPE
            stream = getattr(process, key)
            if sys.platform == "win32":  # explicit check for mypy # pragma: win32 cover
                yield stream.handle
            else:
                yield stream.name

    def set_out_err(self, out: SyncWrite, err: SyncWrite) -> tuple[SyncWrite, SyncWrite]:
        """
        Point the running reader threads at the handlers of new sinks.

        :param out: the sink whose handler the stdout reader should use from now on
        :param err: the sink whose handler the stderr reader should use from now on
        :return: the ``(out, err)`` pair stored on this instance (the stored pair itself is not replaced here)
        """
        prev = self._out, self._err
        if self._read_stdout is not None:  # pragma: no branch
            self._read_stdout.handler = out.handler
        if self._read_stderr is not None:  # pragma: no branch
            self._read_stderr.handler = err.handler
        return prev


def _pty(key: str) -> tuple[int, int] | None:
    """
    Allocate a virtual terminal (pty) for a subprocess.

    A virtual terminal allows a process to perform syscalls that fetch attributes related to the tty,
    for example to determine whether to use colored output or enter interactive mode.

    The termios attributes of the controlling terminal stream will be copied to the allocated pty. When copying
    fails the freshly opened pty is closed again before giving up, so no file descriptor is left behind.

    :param key: The stream to copy attributes from. Either "stdout" or "stderr".
    :return: (main_fd, child_fd) of an allocated pty; or None on error or if unsupported (win32).
    """
    if sys.platform == "win32":  # explicit check for mypy # pragma: win32 cover
        return None

    stream: io.TextIOWrapper = getattr(sys, key)

    # when our current stream is a tty, emulate pty for the child
    #   to allow host streams traits to be inherited
    if not stream.isatty():
        return None

    try:
        import fcntl
        import pty
        import struct
        import termios
    except ImportError:  # pragma: no cover
        return None  # cannot proceed on platforms without pty support

    try:
        main, child = pty.openpty()
    except OSError:  # could not open a tty
        return None  # pragma: no cover

    try:
        mode = termios.tcgetattr(stream)
        termios.tcsetattr(child, termios.TCSANOW, mode)
    except (termios.error, OSError):  # could not inherit traits
        # the caller only learns about descriptors through the return value, so release them here
        os.close(main)  # pragma: no cover
        os.close(child)  # pragma: no cover
        return None  # pragma: no cover

    # adjust sub-process terminal size
    columns, lines = shutil.get_terminal_size(fallback=(-1, -1))
    if columns != -1 and lines != -1:
        size = struct.pack("HHHH", columns, lines, 0, 0)
        fcntl.ioctl(child, termios.TIOCSWINSZ, size)

    return main, child


# Public names of the module. Every entry must exist in the module namespace, otherwise a star import of the module
# raises ``AttributeError``; the former "CREATION_FLAGS" entry named nothing defined here and was dropped.
__all__ = (
    "SIG_INTERRUPT",
    "LocalSubProcessExecuteInstance",
    "LocalSubProcessExecutor",
    "LocalSubprocessExecuteStatus",
    "LocalSubprocessExecuteFailedStatus",
)