diff options
Diffstat (limited to 'tests/balance_xdist_plugin.py')
-rw-r--r-- | tests/balance_xdist_plugin.py | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/tests/balance_xdist_plugin.py b/tests/balance_xdist_plugin.py new file mode 100644 index 00000000..6786a342 --- /dev/null +++ b/tests/balance_xdist_plugin.py @@ -0,0 +1,174 @@ +# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 +# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt + +""" +A pytest plugin to record test times and then balance xdist with them next time. + +The only thing in here particular to coverage.py is the use of the tmp directory +for storing the data. +""" + +import collections +import csv +import os +import shutil +import time +from pathlib import Path + +import pytest +import xdist.scheduler + + +def pytest_addoption(parser): + """Auto-called to define ini-file settings.""" + parser.addini( + "balanced_clumps", + type="linelist", + help="Test substrings to assign to the same worker", + ) + +@pytest.hookimpl(tryfirst=True) +def pytest_configure(config): + """Registers our pytest plugin.""" + config.pluginmanager.register(BalanceXdistPlugin(config), "balance_xdist_plugin") + + +class BalanceXdistPlugin: # pragma: debugging + """The plugin""" + + def __init__(self, config): + self.config = config + self.running_all = (self.config.getoption("-k") == "") + self.times = collections.defaultdict(float) + self.worker = os.environ.get("PYTEST_XDIST_WORKER", "none") + self.tests_csv = None + + def pytest_sessionstart(self, session): + """Called once before any tests are run, but in every worker.""" + if not self.running_all: + return + + tests_csv_dir = Path(session.startdir).resolve() / "tmp/tests_csv" + self.tests_csv = tests_csv_dir / f"{self.worker}.csv" + + if self.worker == "none": + if tests_csv_dir.exists(): + for csv_file in tests_csv_dir.iterdir(): + with csv_file.open(newline="") as fcsv: + reader = csv.reader(fcsv) + for row in reader: + self.times[row[1]] += float(row[3]) + shutil.rmtree(tests_csv_dir) + + def write_duration_row(self, item, phase, duration): + """Helper to write a row to the tracked-test csv file.""" + if self.running_all: + self.tests_csv.parent.mkdir(parents=True, exist_ok=True) + with self.tests_csv.open("a", newline="") as fcsv: + csv.writer(fcsv).writerow([self.worker, item.nodeid, phase, duration]) + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_setup(self, item): + """Run once for each test.""" + start = time.time() + yield + self.write_duration_row(item, "setup", time.time() - start) + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_call(self, item): + """Run once for each test.""" + start = time.time() + yield + self.write_duration_row(item, "call", time.time() - start) + + @pytest.hookimpl(hookwrapper=True) + def pytest_runtest_teardown(self, item): + """Run once for each test.""" + start = time.time() + yield + self.write_duration_row(item, "teardown", time.time() - start) + + @pytest.mark.trylast + def pytest_xdist_make_scheduler(self, config, log): + """Create our BalancedScheduler using time data from the last run.""" + # Assign tests to chunks + nchunks = 8 + totals = [0] * nchunks + tests = collections.defaultdict(set) + + # first put the difficult ones all in one worker + clumped = set() + clumps = config.getini("balanced_clumps") + for i, clump_word in enumerate(clumps): + clump_nodes = set(nodeid for nodeid in self.times.keys() if clump_word in nodeid) + i %= nchunks + tests[i].update(clump_nodes) + totals[i] += sum(self.times[nodeid] for nodeid in clump_nodes) + clumped.update(clump_nodes) + + # Then assign the rest in descending order + rest = [(nodeid, t) for (nodeid, t) in self.times.items() if nodeid not in clumped] + rest.sort(key=lambda item: item[1], reverse=True) + for nodeid, t in rest: + lightest = min(enumerate(totals), key=lambda pair: pair[1])[0] + tests[lightest].add(nodeid) + totals[lightest] += t + + test_chunks = {} + for chunk_id, nodeids in tests.items(): + for nodeid in nodeids: + test_chunks[nodeid] = chunk_id + + return BalancedScheduler(config, log, clumps, test_chunks) + + +class BalancedScheduler(xdist.scheduler.LoadScopeScheduling): # pylint: disable=abstract-method # pragma: debugging + """A balanced-chunk test scheduler for pytest-xdist.""" + def __init__(self, config, log, clumps, test_chunks): + super().__init__(config, log) + self.clumps = clumps + self.test_chunks = test_chunks + + def _split_scope(self, nodeid): + """Assign a chunk id to a test node.""" + # If we have a chunk assignment for this node, return it. + scope = self.test_chunks.get(nodeid) + if scope is not None: + return scope + + # If this is a node that should be clumped, clump it. + for i, clump_word in enumerate(self.clumps): + if clump_word in nodeid: + return f"clump{i}" + + # Otherwise every node is a separate chunk. + return nodeid + + +# Run this with: +# python -c "from tests.balance_xdist_plugin import show_worker_times as f; f()" +def show_worker_times(): # pragma: debugging + """Ad-hoc utility to show data from the last tracked-test run.""" + times = collections.defaultdict(float) + tests = collections.defaultdict(int) + tests_csv_dir = Path("tmp/tests_csv") + + for csv_file in tests_csv_dir.iterdir(): + with csv_file.open(newline="") as fcsv: + reader = csv.reader(fcsv) + for row in reader: + worker = row[0] + duration = float(row[3]) + times[worker] += duration + if row[2] == "call": + tests[worker] += 1 + + for worker in sorted(tests.keys()): + print(f"{worker}: {tests[worker]:3d} {times[worker]:.2f}") + + total = sum(times.values()) + avg = total / len(times) + print(f"total: {total:.2f}, avg: {avg:.2f}") + lo = min(times.values()) + hi = max(times.values()) + print(f"lo = {lo:.2f}; hi = {hi:.2f}; gap = {hi - lo:.2f}; long delta = {hi - avg:.2f}") |