# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt

"""
A pytest plugin to record test times and then balance xdist with them next time.

The only thing in here particular to coverage.py is the use of the tmp directory
for storing the data.
"""

import collections
import csv
import os
import shutil
import time
from pathlib import Path

import pytest
import xdist.scheduler


def pytest_addoption(parser):
    """Auto-called to define ini-file settings."""
    parser.addini(
        "balanced_clumps",
        type="linelist",
        help="Test substrings to assign to the same worker",
    )


@pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
    """Registers our pytest plugin."""
    config.pluginmanager.register(BalanceXdistPlugin(config), "balance_xdist_plugin")


class BalanceXdistPlugin:       # pragma: debugging
    """The plugin.

    Each process appends one CSV row per test phase (setup, call, teardown) to
    ``tmp/tests_csv/<worker>.csv``.  At the start of the next run, the process
    whose worker name is "none" (no PYTEST_XDIST_WORKER in the environment)
    sums those rows per test, deletes the directory, and uses the totals to
    assign tests to chunks for the scheduler.
    """

    def __init__(self, config):
        self.config = config
        # Timing is only recorded and used when no -k expression was given.
        self.running_all = (self.config.getoption("-k") == "")
        # Maps test nodeid -> total seconds recorded for it in the previous run.
        self.times = collections.defaultdict(float)
        self.worker = os.environ.get("PYTEST_XDIST_WORKER", "none")
        # Path of this process's CSV file; set in pytest_sessionstart.
        self.tests_csv = None

    def pytest_sessionstart(self, session):
        """Called once before any tests are run, but in every worker."""
        if not self.running_all:
            return

        # `startpath` is the pathlib replacement for the deprecated `startdir`;
        # fall back to `startdir` on pytest versions that don't have it.
        start = getattr(session, "startpath", None)
        if start is None:
            start = session.startdir
        tests_csv_dir = Path(start).resolve() / "tmp/tests_csv"
        self.tests_csv = tests_csv_dir / f"{self.worker}.csv"

        if self.worker == "none":
            if tests_csv_dir.exists():
                for csv_file in tests_csv_dir.iterdir():
                    with csv_file.open(newline="") as fcsv:
                        for row in csv.reader(fcsv):
                            # Rows are [worker, nodeid, phase, duration].  Skip
                            # short rows (for example a truncated last line) so
                            # stale data can't abort the whole test session.
                            if len(row) < 4:
                                continue
                            self.times[row[1]] += float(row[3])
                # The data has been consumed; this run will write fresh files.
                shutil.rmtree(tests_csv_dir)

    def write_duration_row(self, item, phase, duration):
        """Helper to write a row to the tracked-test csv file."""
        if self.running_all:
            self.tests_csv.parent.mkdir(parents=True, exist_ok=True)
            with self.tests_csv.open("a", newline="") as fcsv:
                csv.writer(fcsv).writerow([self.worker, item.nodeid, phase, duration])

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_setup(self, item):
        """Run once for each test: record how long the setup phase took."""
        # perf_counter is monotonic, so clock adjustments can't skew durations.
        start = time.perf_counter()
        yield
        self.write_duration_row(item, "setup", time.perf_counter() - start)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_call(self, item):
        """Run once for each test: record how long the call phase took."""
        start = time.perf_counter()
        yield
        self.write_duration_row(item, "call", time.perf_counter() - start)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_teardown(self, item):
        """Run once for each test: record how long the teardown phase took."""
        start = time.perf_counter()
        yield
        self.write_duration_row(item, "teardown", time.perf_counter() - start)

    @pytest.hookimpl(trylast=True)
    def pytest_xdist_make_scheduler(self, config, log):
        """Create our BalancedScheduler using time data from the last run."""
        # Assign tests to chunks
        nchunks = 8
        totals = [0] * nchunks
        tests = collections.defaultdict(set)

        # first put the difficult ones all in one worker
        clumped = set()
        clumps = config.getini("balanced_clumps")
        for i, clump_word in enumerate(clumps):
            # A node matching several clump words belongs to the first one only,
            # as in BalancedScheduler._split_scope.  Otherwise its time would be
            # added to more than one chunk total.
            clump_nodes = {
                nodeid for nodeid in self.times
                if clump_word in nodeid and nodeid not in clumped
            }
            chunk = i % nchunks
            tests[chunk].update(clump_nodes)
            totals[chunk] += sum(self.times[nodeid] for nodeid in clump_nodes)
            clumped.update(clump_nodes)

        # Then assign the rest in descending order
        rest = [(nodeid, t) for (nodeid, t) in self.times.items() if nodeid not in clumped]
        rest.sort(key=lambda item: item[1], reverse=True)
        for nodeid, t in rest:
            # Greedy: give the next-longest test to the currently lightest chunk.
            lightest = min(enumerate(totals), key=lambda pair: pair[1])[0]
            tests[lightest].add(nodeid)
            totals[lightest] += t

        test_chunks = {}
        for chunk_id, nodeids in tests.items():
            for nodeid in nodeids:
                test_chunks[nodeid] = chunk_id

        return BalancedScheduler(config, log, clumps, test_chunks)


class BalancedScheduler(xdist.scheduler.LoadScopeScheduling):   # pylint: disable=abstract-method # pragma: debugging
    """A balanced-chunk test scheduler for pytest-xdist."""

    def __init__(self, config, log, clumps, test_chunks):
        super().__init__(config, log)
        # Substrings from the "balanced_clumps" ini setting.
        self.clumps = clumps
        # Maps test nodeid -> chunk id computed from the previous run's times.
        self.test_chunks = test_chunks

    def _split_scope(self, nodeid):
        """Assign a chunk id to a test node."""
        # If we have a chunk assignment for this node, return it.
        scope = self.test_chunks.get(nodeid)
        if scope is not None:
            return scope

        # If this is a node that should be clumped, clump it.
        for i, clump_word in enumerate(self.clumps):
            if clump_word in nodeid:
                return f"clump{i}"

        # Otherwise every node is a separate chunk.
        return nodeid


# Run this with:
#   python -c "from tests.balance_xdist_plugin import show_worker_times as f; f()"
def show_worker_times():                    # pragma: debugging
    """Ad-hoc utility to show data from the last tracked-test run.

    Reads every CSV file in ``tmp/tests_csv`` (relative to the current
    directory) and prints, per worker, the number of "call" rows and the total
    recorded time, followed by summary statistics.  Prints a short message and
    returns if there is no data to show.
    """
    times = collections.defaultdict(float)
    tests = collections.defaultdict(int)
    tests_csv_dir = Path("tmp/tests_csv")

    if not tests_csv_dir.is_dir():
        print(f"No timing data: {tests_csv_dir} does not exist")
        return

    for csv_file in tests_csv_dir.iterdir():
        with csv_file.open(newline="") as fcsv:
            for row in csv.reader(fcsv):
                # Rows are [worker, nodeid, phase, duration]; skip short rows.
                if len(row) < 4:
                    continue
                worker = row[0]
                times[worker] += float(row[3])
                if row[2] == "call":
                    tests[worker] += 1

    if not times:
        # Without this guard the average and min/max below would raise.
        print(f"No timing data: no rows found in {tests_csv_dir}")
        return

    # List every worker that contributes to the totals, even one with no
    # "call" rows.
    for worker in sorted(times):
        print(f"{worker}: {tests.get(worker, 0):3d} {times[worker]:.2f}")

    total = sum(times.values())
    avg = total / len(times)
    print(f"total: {total:.2f}, avg: {avg:.2f}")
    lo = min(times.values())
    hi = max(times.values())
    print(f"lo = {lo:.2f}; hi = {hi:.2f}; gap = {hi - lo:.2f}; long delta = {hi - avg:.2f}")