summaryrefslogtreecommitdiff
path: root/perf/perf_measure.py
diff options
context:
space:
mode:
Diffstat (limited to 'perf/perf_measure.py')
-rw-r--r--perf/perf_measure.py188
1 files changed, 0 insertions, 188 deletions
diff --git a/perf/perf_measure.py b/perf/perf_measure.py
deleted file mode 100644
index e8f9ea98..00000000
--- a/perf/perf_measure.py
+++ /dev/null
@@ -1,188 +0,0 @@
-# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
-# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
-
-# Run like this:
-# .tox/py36/bin/python perf/perf_measure.py
-
-from collections import namedtuple
-import os
-import statistics
-import sys
-import tempfile
-import time
-
-from unittest_mixins.mixins import make_file
-
-import coverage
-from coverage.misc import import_local_file
-
-from tests.helpers import SuperModuleCleaner
-
-
-class StressResult(namedtuple('StressResult', ['files', 'calls', 'lines', 'baseline', 'covered'])):
- @property
- def overhead(self):
- return self.covered - self.baseline
-
-
-TEST_FILE = """\
-def parent(call_count, line_count):
- for _ in range(call_count):
- child(line_count)
-
-def child(line_count):
- for i in range(line_count):
- x = 1
-"""
-
-def mk_main(file_count, call_count, line_count):
- lines = []
- lines.extend(
- f"import test{idx}" for idx in range(file_count)
- )
- lines.extend(
- f"test{idx}.parent({call_count}, {line_count})" for idx in range(file_count)
- )
- return "\n".join(lines)
-
-
-class StressTest:
-
- def __init__(self):
- self.module_cleaner = SuperModuleCleaner()
-
- def _run_scenario(self, file_count, call_count, line_count):
- self.module_cleaner.clean_local_file_imports()
-
- for idx in range(file_count):
- make_file(f'test{idx}.py', TEST_FILE)
- make_file('testmain.py', mk_main(file_count, call_count, line_count))
-
- # Run it once just to get the disk caches loaded up.
- import_local_file("testmain")
- self.module_cleaner.clean_local_file_imports()
-
- # Run it to get the baseline time.
- start = time.perf_counter()
- import_local_file("testmain")
- baseline = time.perf_counter() - start
- self.module_cleaner.clean_local_file_imports()
-
- # Run it to get the covered time.
- start = time.perf_counter()
- cov = coverage.Coverage()
- cov.start()
- try: # pragma: nested
- # Import the Python file, executing it.
- import_local_file("testmain")
- finally: # pragma: nested
- # Stop coverage.py.
- covered = time.perf_counter() - start
- stats = cov._collector.tracers[0].get_stats()
- if stats:
- stats = stats.copy()
- cov.stop()
-
- return baseline, covered, stats
-
- def _compute_overhead(self, file_count, call_count, line_count):
- baseline, covered, stats = self._run_scenario(file_count, call_count, line_count)
-
- #print("baseline = {:.2f}, covered = {:.2f}".format(baseline, covered))
- # Empirically determined to produce the same numbers as the collected
- # stats from get_stats(), with Python 3.6.
- actual_file_count = 17 + file_count
- actual_call_count = file_count * call_count + 156 * file_count + 85
- actual_line_count = (
- 2 * file_count * call_count * line_count +
- 3 * file_count * call_count +
- 769 * file_count +
- 345
- )
-
- if stats is not None:
- assert actual_file_count == stats['files']
- assert actual_call_count == stats['calls']
- assert actual_line_count == stats['lines']
- print("File counts", file_count, actual_file_count, stats['files'])
- print("Call counts", call_count, actual_call_count, stats['calls'])
- print("Line counts", line_count, actual_line_count, stats['lines'])
- print()
-
- return StressResult(
- actual_file_count,
- actual_call_count,
- actual_line_count,
- baseline,
- covered,
- )
-
- fixed = 200
- numlo = 100
- numhi = 100
- step = 50
- runs = 5
-
- def count_operations(self):
-
- def operations(thing):
- for _ in range(self.runs):
- for n in range(self.numlo, self.numhi+1, self.step):
- kwargs = {
- "file_count": self.fixed,
- "call_count": self.fixed,
- "line_count": self.fixed,
- }
- kwargs[thing+"_count"] = n
- yield kwargs['file_count'] * kwargs['call_count'] * kwargs['line_count']
-
- ops = sum(sum(operations(thing)) for thing in ["file", "call", "line"])
- print(f"{ops/1e6:.1f}M operations")
-
- def check_coefficients(self):
- # For checking the calculation of actual stats:
- for f in range(1, 6):
- for c in range(1, 6):
- for l in range(1, 6):
- _, _, stats = self._run_scenario(f, c, l)
- print("{0},{1},{2},{3[files]},{3[calls]},{3[lines]}".format(f, c, l, stats))
-
- def stress_test(self):
- # For checking the overhead for each component:
- def time_thing(thing):
- per_thing = []
- pct_thing = []
- for _ in range(self.runs):
- for n in range(self.numlo, self.numhi+1, self.step):
- kwargs = {
- "file_count": self.fixed,
- "call_count": self.fixed,
- "line_count": self.fixed,
- }
- kwargs[thing+"_count"] = n
- res = self._compute_overhead(**kwargs)
- per_thing.append(res.overhead / getattr(res, f"{thing}s"))
- pct_thing.append(res.covered / res.baseline * 100)
-
- out = f"Per {thing}: "
- out += "mean = {:9.3f}us, stddev = {:8.3f}us, ".format(
- statistics.mean(per_thing)*1e6, statistics.stdev(per_thing)*1e6
- )
- out += f"min = {min(per_thing)*1e6:9.3f}us, "
- out += "pct = {:6.1f}%, stddev = {:6.1f}%".format(
- statistics.mean(pct_thing), statistics.stdev(pct_thing)
- )
- print(out)
-
- time_thing("file")
- time_thing("call")
- time_thing("line")
-
-
-if __name__ == '__main__':
- with tempfile.TemporaryDirectory(prefix="coverage_stress_") as tempdir:
- print(f"Working in {tempdir}")
- os.chdir(tempdir)
- sys.path.insert(0, ".")
-
- StressTest().stress_test()