1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
from __future__ import annotations
import logging
import os
import signal
import sys
from io import TextIOWrapper
from pathlib import Path
from types import FrameType
from unittest.mock import MagicMock
from tox.execute import Outcome
from tox.execute.local_sub_process import LocalSubProcessExecutor
from tox.execute.request import ExecuteRequest, StdinSource
from tox.report import NamedBytesIO
logging.basicConfig(level=logging.DEBUG, format="%(relativeCreated)d\t%(levelname).1s\t%(message)s")
bad_process = Path(__file__).parent / "bad_process.py"
executor = LocalSubProcessExecutor(colored=False)
request = ExecuteRequest(
cmd=[sys.executable, bad_process, sys.argv[1]],
cwd=Path().absolute(),
env=os.environ.copy(),
stdin=StdinSource.API,
run_id="",
)
out_err = TextIOWrapper(NamedBytesIO("out")), TextIOWrapper(NamedBytesIO("err"))
def show_outcome(outcome: Outcome | None) -> None:
if outcome is not None: # pragma: no branch
print(outcome.exit_code)
print(repr(outcome.out))
print(repr(outcome.err))
print(outcome.elapsed, end="")
print("done show outcome", file=sys.stderr)
def handler(s: int, f: FrameType | None) -> None:
logging.info(f"signal {s} at {f}")
global interrupt_done
if interrupt_done is False: # pragma: no branch
interrupt_done = True
logging.info(f"interrupt via {status}")
status.interrupt()
logging.info(f"interrupt finished via {status}")
interrupt_done = False
signal.signal(signal.SIGINT, handler)
logging.info("PID %d start %r", os.getpid(), request)
tox_env = MagicMock(conf={"suicide_timeout": 0.01, "interrupt_timeout": 0.05, "terminate_timeout": 0.07})
try:
with executor.call(request, show=False, out_err=out_err, env=tox_env) as status:
logging.info("wait on %r", status)
while status.exit_code is None:
status.wait(timeout=0.01) # use wait here with timeout to not block the main thread
logging.info("wait over on %r", status)
show_outcome(status.outcome)
except Exception as exception: # pragma: no cover
logging.exception(exception) # pragma: no cover
finally:
logging.info("done")
logging.shutdown()
|