summaryrefslogtreecommitdiff
path: root/src/tox/execute/pep517_backend.py
blob: 2f8ffc6d8fb399e5849c79c3da9493faec9758fc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""An executor that reuses a single subprocess for all backend calls (saving on Python startup/import overhead)."""
from __future__ import annotations

import time
from pathlib import Path
from subprocess import TimeoutExpired
from threading import Lock
from types import TracebackType
from typing import Sequence

from pyproject_api import BackendFailed

from tox.execute import ExecuteRequest
from tox.execute.api import Execute, ExecuteInstance, ExecuteOptions, ExecuteStatus
from tox.execute.local_sub_process import LocalSubProcessExecuteInstance
from tox.execute.request import StdinSource
from tox.execute.stream import SyncWrite


class LocalSubProcessPep517Executor(Execute):
    """Executor holding the backend process.

    The backend process is started lazily by the first :meth:`local_execute` call; the resulting
    ``(instance, status)`` pair is cached and handed to every invocation created by :meth:`build_instance`.
    """

    def __init__(self, colored: bool, cmd: Sequence[str], env: dict[str, str], cwd: Path):
        """
        :param colored: forwarded unchanged to the base executor
        :param cmd: command line used to start the backend process
        :param env: environment variables the backend process is started with
        :param cwd: working directory the backend process is started in
        """
        super().__init__(colored)
        self.cmd = cmd
        self.env = env
        self.cwd = cwd
        # the started backend process and its status; stays None until the first local_execute call
        self._local_execute: tuple[LocalSubProcessExecuteInstance, ExecuteStatus] | None = None
        # start-up failure, remembered so that later calls raise it again instead of retrying the start
        self._exc: Exception | None = None
        # set once the backend reported that it started, cleared again by close
        self.is_alive: bool = False

    def build_instance(
        self,
        request: ExecuteRequest,
        options: ExecuteOptions,
        out: SyncWrite,
        err: SyncWrite,
    ) -> ExecuteInstance:
        """Create a backend invocation bound to the backend process, starting that process if needed.

        :param request: the request describing the invocation
        :param options: execution options, also used to start the backend process when it is not yet running
        :param out: stream collecting the output of this invocation
        :param err: stream collecting the error output of this invocation
        :return: the invocation instance
        """
        return LocalSubProcessPep517ExecuteInstance(request, options, out, err, self.local_execute(options))

    def local_execute(self, options: ExecuteOptions) -> tuple[LocalSubProcessExecuteInstance, ExecuteStatus]:
        """Return the backend process and its status, starting the process on the first call.

        The starting call blocks, polling the collected output every 10ms, until ``started backend`` shows up in
        the output of the process.

        :param options: execution options, only consulted by the call that starts the process
        :return: the backend process instance together with its status
        :raises ToxBackendFailed: when ``failed to start backend`` shows up in the error output of the process, or
            the process reports an exit code before announcing that it started; every later call raises the same
            exception again
        """
        if self._exc is not None:
            raise self._exc
        if self._local_execute is None:
            request = ExecuteRequest(cmd=self.cmd, cwd=self.cwd, env=self.env, stdin=StdinSource.API, run_id="pep517")

            instance = LocalSubProcessExecuteInstance(
                request=request,
                options=options,
                out=SyncWrite(name="pep517-out", target=None, color=None),  # not enabled no need to enter/exit
                err=SyncWrite(name="pep517-err", target=None, color=None),  # not enabled no need to enter/exit
                on_exit_drain=False,
            )
            status = instance.__enter__()
            # cached before the start-up wait so that close can still clean up after a failed start
            self._local_execute = instance, status
            while True:
                if b"started backend " in status.out:
                    self.is_alive = True
                    break
                if b"failed to start backend" in status.err:
                    raise self._start_failed(status)
                # A process that already has an exit code can no longer print either marker; without this check
                # the loop would spin forever (for example when the process could not be spawned at all).
                # NOTE(review): relies on exit_code staying None while the process runs - confirm for all statuses.
                if status.exit_code is not None:
                    raise self._start_failed(status)
                time.sleep(0.01)  # wait a short while for the output to populate
        return self._local_execute

    def _start_failed(self, status: ExecuteStatus) -> Exception:
        """Build the start-up failure from the output collected so far, remember it and return it."""
        # NOTE(review): imported locally as in the original code, presumably to avoid an import cycle - confirm
        from tox.tox_env.python.virtual_env.package.pyproject import ToxBackendFailed

        failure = BackendFailed(
            result={
                "code": -5,
                "exc_type": "FailedToStart",
                "exc_msg": "could not start backend",
            },
            out=status.out.decode(),
            err=status.err.decode(),
        )
        self._exc = ToxBackendFailed(failure)
        return self._exc

    @staticmethod
    def _handler(into: bytearray, content: bytes) -> None:
        """Append the generated content to the given buffer."""
        into.extend(content)  # pragma: no cover

    def close(self) -> None:
        """Leave the context of the backend process (if one was started) and mark the executor as not alive.

        A process that has no return code yet gets 0.1 seconds to finish on its own before it is terminated.
        """
        if self._local_execute is not None:  # pragma: no branch
            execute, _ = self._local_execute
            execute.__exit__(None, None, None)
            if execute.process is not None:  # pragma: no branch
                if execute.process.returncode is None:  # pragma: no cover
                    try:  # pragma: no cover
                        execute.process.wait(timeout=0.1)  # pragma: no cover
                    except TimeoutExpired:  # pragma: no cover
                        execute.process.terminate()  # pragma: no cover  # if does not stop on its own kill it
        self.is_alive = False


class LocalSubProcessPep517ExecuteInstance(ExecuteInstance):
    """A backend invocation.

    While the instance is entered its out/err streams are installed on the given backend process instance and
    status; leaving the instance swaps them back.
    """

    def __init__(
        self,
        request: ExecuteRequest,
        options: ExecuteOptions,
        out: SyncWrite,
        err: SyncWrite,
        instance_status: tuple[LocalSubProcessExecuteInstance, ExecuteStatus],
    ):
        """
        :param request: the request describing this invocation
        :param options: execution options of this invocation
        :param out: stream collecting the output of this invocation
        :param err: stream collecting the error output of this invocation
        :param instance_status: the backend process instance and its status this invocation operates on
        """
        super().__init__(request, options, out, err)
        self._instance, self._status = instance_status
        # held between __enter__ and __exit__ of this invocation
        self._lock = Lock()

    @property
    def cmd(self) -> Sequence[str]:
        """:return: the command of the backend process instance"""
        return self._instance.cmd

    def __enter__(self) -> ExecuteStatus:
        """Take the lock and install the streams of this invocation on the backend process.

        :return: the status of the backend process
        """
        self._lock.acquire()
        try:
            self._swap_out_err()
        except BaseException:
            # a failing __enter__ means __exit__ is never called, so the lock must be given back here
            self._lock.release()
            raise
        return self._status

    def __exit__(
        self,
        exc_type: type[BaseException] | None,  # noqa: U100
        exc_val: BaseException | None,  # noqa: U100
        exc_tb: TracebackType | None,  # noqa: U100
    ) -> None:
        """Swap the streams back and release the lock; the exception details are ignored."""
        try:
            self._swap_out_err()
        finally:
            # release even when swapping back fails, otherwise the lock would stay held
            self._lock.release()

    def _swap_out_err(self) -> None:
        """Exchange the out/err streams held by this invocation with the ones held by the backend process.

        Called once on enter and once on exit.
        NOTE(review): presumably ``set_out_err`` returns the previously installed pair, which would make the second
        call undo the first - confirm against its implementation.
        """
        out, err = self._out, self._err
        # update status to see the newly collected content
        self._out, self._err = self._instance.set_out_err(out, err)
        # update the thread out/err
        self._status.set_out_err(out, err)