diff options
-rw-r--r-- | .treerc | 1 | ||||
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | setup.cfg | 10 | ||||
-rw-r--r-- | tests/balance_xdist_plugin.py | 174 | ||||
-rw-r--r-- | tests/conftest.py | 54 |
5 files changed, 190 insertions, 51 deletions
@@ -15,3 +15,4 @@ ignore = _build _spell *.egg *.egg-info .mypy_cache + tmp @@ -62,7 +62,7 @@ metahtml: ## Produce meta-coverage HTML reports. python igor.py combine_html metasmoke: - COVERAGE_NO_PYTRACER=1 ARGS="-e py39" make clean metacov metahtml + COVERAGE_NO_PYTRACER=1 ARGS="-e py39" make metacov metahtml PIP_COMPILE = pip-compile --upgrade --allow-unsafe upgrade: export CUSTOM_COMPILE_COMMAND=make upgrade @@ -14,7 +14,15 @@ filterwarnings = ignore:the imp module is deprecated in favour of importlib:DeprecationWarning # xfail tests that pass should fail the test suite -xfail_strict=true +xfail_strict = true + +balanced_clumps = + ; Because of expensive session-scoped fixture: + VirtualenvTest + ; Because of shared-file manipulations (~/tests/actual/testing): + CompareTest + ; No idea why this one fails if run on separate workers: + GetZipBytesTest [pep8] # E265 block comment should start with '# ' diff --git a/tests/balance_xdist_plugin.py b/tests/balance_xdist_plugin.py new file mode 100644 index 00000000..6786a342 --- /dev/null +++ b/tests/balance_xdist_plugin.py @@ -0,0 +1,174 @@ +# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 +# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt + +""" +A pytest plugin to record test times and then balance xdist with them next time. + +The only thing in here particular to coverage.py is the use of the tmp directory +for storing the data. +""" + +import collections +import csv +import os +import shutil +import time +from pathlib import Path + +import pytest +import xdist.scheduler + + +def pytest_addoption(parser): + """Auto-called to define ini-file settings.""" + parser.addini( + "balanced_clumps", + type="linelist", + help="Test substrings to assign to the same worker", + ) + +@pytest.hookimpl(tryfirst=True) +def pytest_configure(config): + """Registers our pytest plugin.""" + config.pluginmanager.register(BalanceXdistPlugin(config), "balance_xdist_plugin") + + +class BalanceXdistPlugin: # pragma: debugging + """The plugin""" + + def __init__(self, config): + self.config = config + self.running_all = (self.config.getoption("-k") == "") + self.times = collections.defaultdict(float) + self.worker = os.environ.get("PYTEST_XDIST_WORKER", "none") + self.tests_csv = None + + def pytest_sessionstart(self, session): + """Called once before any tests are run, but in every worker.""" + if not self.running_all: + return + + tests_csv_dir = Path(session.startdir).resolve() / "tmp/tests_csv" + self.tests_csv = tests_csv_dir / f"{self.worker}.csv" + + if self.worker == "none": + if tests_csv_dir.exists(): + for csv_file in tests_csv_dir.iterdir(): + with csv_file.open(newline="") as fcsv: + reader = csv.reader(fcsv) + for row in reader: + self.times[row[1]] += float(row[3]) + shutil.rmtree(tests_csv_dir) + + def write_duration_row(self, item, phase, duration): + """Helper to write a row to the tracked-test csv file.""" + if self.running_all: + self.tests_csv.parent.mkdir(parents=True, exist_ok=True) + with self.tests_csv.open("a", newline="") as fcsv: + csv.writer(fcsv).writerow([self.worker, item.nodeid, phase, duration]) + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_setup(self, item): + """Run once for each test.""" + start = time.time() + yield + self.write_duration_row(item, "setup", time.time() - start) + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_call(self, item): + """Run once for each test.""" + start = time.time() + yield + self.write_duration_row(item, "call", time.time() - start) + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_teardown(self, item): + """Run once for each test.""" + start = time.time() + yield + self.write_duration_row(item, "teardown", time.time() - start) + + @pytest.mark.trylast + def pytest_xdist_make_scheduler(self, config, log): + """Create our BalancedScheduler using time data from the last run.""" + # Assign tests to chunks + nchunks = 8 + totals = [0] * nchunks + tests = collections.defaultdict(set) + + # first put the difficult ones all in one worker + clumped = set() + clumps = config.getini("balanced_clumps") + for i, clump_word in enumerate(clumps): + clump_nodes = set(nodeid for nodeid in self.times.keys() if clump_word in nodeid) + i %= nchunks + tests[i].update(clump_nodes) + totals[i] += sum(self.times[nodeid] for nodeid in clump_nodes) + clumped.update(clump_nodes) + + # Then assign the rest in descending order + rest = [(nodeid, t) for (nodeid, t) in self.times.items() if nodeid not in clumped] + rest.sort(key=lambda item: item[1], reverse=True) + for nodeid, t in rest: + lightest = min(enumerate(totals), key=lambda pair: pair[1])[0] + tests[lightest].add(nodeid) + totals[lightest] += t + + test_chunks = {} + for chunk_id, nodeids in tests.items(): + for nodeid in nodeids: + test_chunks[nodeid] = chunk_id + + return BalancedScheduler(config, log, clumps, test_chunks) + + +class BalancedScheduler(xdist.scheduler.LoadScopeScheduling): # pylint: disable=abstract-method # pragma: debugging + """A balanced-chunk test scheduler for pytest-xdist.""" + def __init__(self, config, log, clumps, test_chunks): + super().__init__(config, log) + self.clumps = clumps + self.test_chunks = test_chunks + + def _split_scope(self, nodeid): + """Assign a chunk id to a test node.""" + # If we have a chunk assignment for this node, return it. + scope = self.test_chunks.get(nodeid) + if scope is not None: + return scope + + # If this is a node that should be clumped, clump it. + for i, clump_word in enumerate(self.clumps): + if clump_word in nodeid: + return f"clump{i}" + + # Otherwise every node is a separate chunk. + return nodeid + + +# Run this with: +# python -c "from tests.balance_xdist_plugin import show_worker_times as f; f()" +def show_worker_times(): # pragma: debugging + """Ad-hoc utility to show data from the last tracked-test run.""" + times = collections.defaultdict(float) + tests = collections.defaultdict(int) + tests_csv_dir = Path("tmp/tests_csv") + + for csv_file in tests_csv_dir.iterdir(): + with csv_file.open(newline="") as fcsv: + reader = csv.reader(fcsv) + for row in reader: + worker = row[0] + duration = float(row[3]) + times[worker] += duration + if row[2] == "call": + tests[worker] += 1 + + for worker in sorted(tests.keys()): + print(f"{worker}: {tests[worker]:3d} {times[worker]:.2f}") + + total = sum(times.values()) + avg = total / len(times) + print(f"total: {total:.2f}, avg: {avg:.2f}") + lo = min(times.values()) + hi = max(times.values()) + print(f"lo = {lo:.2f}; hi = {hi:.2f}; gap = {hi - lo:.2f}; long delta = {hi - avg:.2f}") diff --git a/tests/conftest.py b/tests/conftest.py index c8ca21f2..23d6b213 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,7 +7,6 @@ Pytest auto configuration. This module is run automatically by pytest, to define and enable fixtures. """ -import itertools import os import sys import sysconfig @@ -20,7 +19,6 @@ from coverage import env from coverage.exceptions import _StopEverything from coverage.files import set_relative_directory - # Pytest will rewrite assertions in test modules, but not elsewhere. # This tells pytest to also rewrite assertions in coveragetest.py. pytest.register_assert_rewrite("tests.coveragetest") @@ -29,6 +27,9 @@ pytest.register_assert_rewrite("tests.helpers") # Pytest can take additional options: # $set_env.py: PYTEST_ADDOPTS - Extra arguments to pytest. +pytest_plugins = "tests.balance_xdist_plugin" + + @pytest.fixture(autouse=True) def set_warnings(): """Configure warnings to show while running tests.""" @@ -92,17 +93,11 @@ def reset_filesdotpy_globals(): WORKER = os.environ.get("PYTEST_XDIST_WORKER", "none") -TRACK_TESTS = False -TEST_TXT = "/tmp/tests.txt" - def pytest_sessionstart(): """Run once at the start of the test session.""" - if TRACK_TESTS: # pragma: debugging - with open(TEST_TXT, "w") as testtxt: - print("Starting:", file=testtxt) - - # Create a .pth file for measuring subprocess coverage. + # Only in the main process... if WORKER == "none": + # Create a .pth file for measuring subprocess coverage. pth_dir = find_writable_pth_directory() assert pth_dir (pth_dir / "subcover.pth").write_text("import coverage; coverage.process_startup()\n") @@ -118,53 +113,14 @@ def pytest_sessionfinish(): if pth_file.exists(): pth_file.unlink() - -def write_test_name(prefix): - """For tracking where and when tests are running.""" - if TRACK_TESTS: # pragma: debugging - with open(TEST_TXT, "a") as testtxt: - test = os.environ.get("PYTEST_CURRENT_TEST", "unknown") - print(f"{prefix} {WORKER}: {test}", file=testtxt, flush=True) - - @pytest.hookimpl(hookwrapper=True) def pytest_runtest_call(item): """Run once for each test.""" - write_test_name(">") - # Convert _StopEverything into skipped tests. outcome = yield if outcome.excinfo and issubclass(outcome.excinfo[0], _StopEverything): # pragma: only jython pytest.skip(f"Skipping {item.nodeid} for _StopEverything: {outcome.excinfo[1]}") - write_test_name("<") - - -def interleaved(firsts, rest, n): - """Interleave the firsts among the rest so that they occur each n items.""" - num = sum(len(l) for l in firsts) + len(rest) - lists = firsts + [rest] * (n - len(firsts)) - listcycle = itertools.cycle(lists) - - while num: - alist = next(listcycle) # pylint: disable=stop-iteration-return - if alist: - yield alist.pop() - num -= 1 - - -def pytest_collection_modifyitems(items): - """Re-order the collected tests.""" - # Trick the xdist scheduler to put all of the VirtualenvTest tests on the - # same worker by sprinkling them into the collected items every Nth place. - virt = set(i for i in items if "VirtualenvTest" in i.nodeid) - rest = [i for i in items if i not in virt] - nworkers = int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", 4)) - items[:] = interleaved([virt], rest, nworkers) - if TRACK_TESTS: # pragma: debugging - with open("/tmp/items.txt", "w") as f: - print("\n".join(i.nodeid for i in items), file=f) - def possible_pth_dirs(): """Produce a sequence of directories for trying to write .pth files.""" |