summaryrefslogtreecommitdiff
path: root/tests/execute/local_subprocess/local_subprocess_sigint.py
blob: 8bee5fe27784cde4544fe39823666b32437bc96d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from __future__ import annotations

import logging
import os
import signal
import sys
from io import TextIOWrapper
from pathlib import Path
from types import FrameType
from unittest.mock import MagicMock

from tox.execute import Outcome
from tox.execute.local_sub_process import LocalSubProcessExecutor
from tox.execute.request import ExecuteRequest, StdinSource
from tox.report import NamedBytesIO

logging.basicConfig(level=logging.DEBUG, format="%(relativeCreated)d\t%(levelname).1s\t%(message)s")
bad_process = Path(__file__).parent / "bad_process.py"

executor = LocalSubProcessExecutor(colored=False)
request = ExecuteRequest(
    cmd=[sys.executable, bad_process, sys.argv[1]],
    cwd=Path().absolute(),
    env=os.environ.copy(),
    stdin=StdinSource.API,
    run_id="",
)
out_err = TextIOWrapper(NamedBytesIO("out")), TextIOWrapper(NamedBytesIO("err"))


def show_outcome(outcome: Outcome | None) -> None:
    if outcome is not None:  # pragma: no branch
        print(outcome.exit_code)
        print(repr(outcome.out))
        print(repr(outcome.err))
        print(outcome.elapsed, end="")
        print("done show outcome", file=sys.stderr)


def handler(s: int, f: FrameType | None) -> None:
    logging.info(f"signal {s} at {f}")
    global interrupt_done
    if interrupt_done is False:  # pragma: no branch
        interrupt_done = True
        logging.info(f"interrupt via {status}")
        status.interrupt()
        logging.info(f"interrupt finished via {status}")


interrupt_done = False
signal.signal(signal.SIGINT, handler)
logging.info("PID %d start %r", os.getpid(), request)
tox_env = MagicMock(conf={"suicide_timeout": 0.01, "interrupt_timeout": 0.05, "terminate_timeout": 0.07})
try:
    with executor.call(request, show=False, out_err=out_err, env=tox_env) as status:
        logging.info("wait on %r", status)
        while status.exit_code is None:
            status.wait(timeout=0.01)  # use wait here with timeout to not block the main thread
        logging.info("wait over on %r", status)
    show_outcome(status.outcome)
except Exception as exception:  # pragma: no cover
    logging.exception(exception)  # pragma: no cover
finally:
    logging.info("done")
    logging.shutdown()