1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
"""A executor that reuses a single subprocess for all backend calls (saving on python startup/import overhead)"""
from __future__ import annotations
import time
from pathlib import Path
from subprocess import TimeoutExpired
from threading import Lock
from types import TracebackType
from typing import Sequence
from pyproject_api import BackendFailed
from tox.execute import ExecuteRequest
from tox.execute.api import Execute, ExecuteInstance, ExecuteOptions, ExecuteStatus
from tox.execute.local_sub_process import LocalSubProcessExecuteInstance
from tox.execute.request import StdinSource
from tox.execute.stream import SyncWrite
class LocalSubProcessPep517Executor(Execute):
    """Executor that starts the backend process on first use and shares it with every instance it builds."""

    def __init__(self, colored: bool, cmd: Sequence[str], env: dict[str, str], cwd: Path):
        super().__init__(colored)
        self.cmd = cmd
        self.env = env
        self.cwd = cwd
        # (instance, status) pair of the backend process; stays None until ``local_execute`` creates it
        self._local_execute: tuple[LocalSubProcessExecuteInstance, ExecuteStatus] | None = None
        # start-up failure, kept so that later ``local_execute`` calls raise it again
        self._exc: Exception | None = None
        # set once the start marker was seen, cleared again by ``close``
        self.is_alive: bool = False

    def build_instance(
        self,
        request: ExecuteRequest,
        options: ExecuteOptions,
        out: SyncWrite,
        err: SyncWrite,
    ) -> ExecuteInstance:
        """Create an invocation instance bound to the shared backend process, starting that process if needed."""
        backend = self.local_execute(options)
        return LocalSubProcessPep517ExecuteInstance(request, options, out, err, backend)

    def local_execute(self, options: ExecuteOptions) -> tuple[LocalSubProcessExecuteInstance, ExecuteStatus]:
        """
        Return the backend ``(instance, status)`` pair, starting the backend process on the first call.

        The first call enters a ``LocalSubProcessExecuteInstance`` for ``self.cmd`` and polls its output until the
        backend reports that it started. If the backend reports a start failure instead, the resulting
        ``ToxBackendFailed`` is stored and raised, and raised again on every later call.

        :param options: execute options handed to the backend process instance
        :return: the backend process instance together with its status
        """
        if self._exc is not None:
            raise self._exc
        if self._local_execute is not None:
            return self._local_execute

        backend_request = ExecuteRequest(
            cmd=self.cmd,
            cwd=self.cwd,
            env=self.env,
            stdin=StdinSource.API,
            run_id="pep517",
        )
        # neither writer is enabled, so there is no need to enter/exit them
        sink_out = SyncWrite(name="pep517-out", target=None, color=None)
        sink_err = SyncWrite(name="pep517-err", target=None, color=None)
        backend = LocalSubProcessExecuteInstance(
            request=backend_request,
            options=options,
            out=sink_out,
            err=sink_err,
            on_exit_drain=False,
        )
        backend_status = backend.__enter__()
        # stored before waiting for the start marker; ``close`` reads this pair to shut the process down
        self._local_execute = backend, backend_status

        while b"started backend " not in backend_status.out:
            if b"failed to start backend" in backend_status.err:
                from tox.tox_env.python.virtual_env.package.pyproject import ToxBackendFailed

                start_failure = BackendFailed(
                    result={
                        "code": -5,
                        "exc_type": "FailedToStart",
                        "exc_msg": "could not start backend",
                    },
                    out=backend_status.out.decode(),
                    err=backend_status.err.decode(),
                )
                self._exc = ToxBackendFailed(start_failure)
                raise self._exc
            time.sleep(0.01)  # pause briefly so the process can produce more output
        self.is_alive = True
        return self._local_execute

    @staticmethod
    def _handler(into: bytearray, content: bytes) -> None:
        """Append ``content`` to the ``into`` buffer."""
        into += content  # pragma: no cover

    def close(self) -> None:
        """Exit the backend instance (if one was created), stop a still running process and clear ``is_alive``."""
        if self._local_execute is not None:  # pragma: no branch
            backend, _ = self._local_execute
            backend.__exit__(None, None, None)
            process = backend.process
            if process is not None:  # pragma: no branch
                if process.returncode is None:  # pragma: no cover
                    try:  # pragma: no cover
                        process.wait(timeout=0.1)  # pragma: no cover
                    except TimeoutExpired:  # pragma: no cover
                        process.terminate()  # pragma: no cover # did not stop within the timeout, terminate it
        self.is_alive = False
class LocalSubProcessPep517ExecuteInstance(ExecuteInstance):
    """
    A backend invocation that runs against an already started backend process.

    Entering the instance installs this invocation's ``out``/``err`` writers on the shared process instance and on
    its status; exiting performs the same exchange again with the writers received on enter.
    """

    def __init__(
        self,
        request: ExecuteRequest,
        options: ExecuteOptions,
        out: SyncWrite,
        err: SyncWrite,
        instance_status: tuple[LocalSubProcessExecuteInstance, ExecuteStatus],
    ):
        """
        :param request: the request describing this invocation
        :param options: execute options for this invocation
        :param out: writer receiving standard output while this invocation is active
        :param err: writer receiving standard error while this invocation is active
        :param instance_status: the backend process instance and its status, shared between invocations
        """
        super().__init__(request, options, out, err)
        self._instance, self._status = instance_status
        # held from ``__enter__`` until ``__exit__``
        # NOTE(review): the lock is created per instance, so it does not serialize different invocations that
        # share the same backend process - confirm this is intended
        self._lock = Lock()

    @property
    def cmd(self) -> Sequence[str]:
        """Command of the shared backend process instance."""
        return self._instance.cmd

    def __enter__(self) -> ExecuteStatus:
        """Acquire the lock, install this invocation's writers and return the shared backend status."""
        self._lock.acquire()
        try:
            self._swap_out_err()
        except BaseException:
            # ``__exit__`` does not run when ``__enter__`` raises, so release here or the lock stays held forever
            self._lock.release()
            raise
        return self._status

    def __exit__(
        self,
        exc_type: type[BaseException] | None,  # noqa: U100
        exc_val: BaseException | None,  # noqa: U100
        exc_tb: TracebackType | None,  # noqa: U100
    ) -> None:
        """Exchange the writers back and release the lock; exceptions from the ``with`` body are not suppressed."""
        try:
            self._swap_out_err()
        finally:
            # release even if the exchange fails, otherwise the lock acquired in ``__enter__`` would leak
            self._lock.release()

    def _swap_out_err(self) -> None:
        """
        Exchange the ``out``/``err`` writers with the shared backend instance.

        The writers currently held by this invocation are handed to the backend instance and to the status, and
        whatever the backend instance returns is kept in their place, so a second call hands those back.
        """
        out, err = self._out, self._err
        # hand our writers to the process instance and keep what it returns
        # (presumably the writers that were installed before - TODO confirm against ``set_out_err``)
        self._out, self._err = self._instance.set_out_err(out, err)
        # point the status at the same writers
        self._status.set_out_err(out, err)
|