summaryrefslogtreecommitdiff
path: root/tests/test_concurrency.py
diff options
context:
space:
mode:
authorNed Batchelder <ned@nedbatchelder.com>2018-02-22 06:38:12 -0500
committerNed Batchelder <ned@nedbatchelder.com>2018-02-22 06:38:12 -0500
commitedf8ff449138ce039e2a1445cab89b53d2333aa3 (patch)
treea0eeb1c10c15afe5b113fa2113d8835a120d4a97 /tests/test_concurrency.py
parent23963b47d4774546e28be0eafdd191f753e0866f (diff)
downloadpython-coveragepy-git-edf8ff449138ce039e2a1445cab89b53d2333aa3.tar.gz
More pragmas to fine-tune coverage of test code
Diffstat (limited to 'tests/test_concurrency.py')
-rw-r--r--tests/test_concurrency.py22
1 files changed, 11 insertions, 11 deletions
diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py
index 76e1d9e4..88f2b50d 100644
--- a/tests/test_concurrency.py
+++ b/tests/test_concurrency.py
@@ -464,7 +464,7 @@ def test_coverage_stop_in_threads():
has_started_coverage = []
has_stopped_coverage = []
- def run_thread():
+ def run_thread(): # pragma: nested
"""Check that coverage is stopping properly in threads."""
deadline = time.time() + 5
ident = threading.currentThread().ident
@@ -480,11 +480,11 @@ def test_coverage_stop_in_threads():
cov = coverage.coverage()
cov.start()
- t = threading.Thread(target=run_thread)
- t.start()
+ t = threading.Thread(target=run_thread) # pragma: nested
+ t.start() # pragma: nested
- time.sleep(0.1)
- cov.stop()
+ time.sleep(0.1) # pragma: nested
+ cov.stop() # pragma: nested
time.sleep(0.1)
assert has_started_coverage == [t.ident]
@@ -513,7 +513,7 @@ def test_thread_safe_save_data(tmpdir):
for module_name in module_names:
import_local_file(module_name)
- def random_load():
+ def random_load(): # pragma: nested
"""Import modules randomly to stress coverage."""
while should_run[0]:
module_name = random.choice(module_names)
@@ -529,12 +529,12 @@ def test_thread_safe_save_data(tmpdir):
cov = coverage.coverage()
cov.start()
- threads = [threading.Thread(target=random_load) for _ in range(10)]
- should_run[0] = True
- for t in threads:
+ threads = [threading.Thread(target=random_load) for _ in range(10)] # pragma: nested
+ should_run[0] = True # pragma: nested
+ for t in threads: # pragma: nested
t.start()
- time.sleep(duration)
+ time.sleep(duration) # pragma: nested
cov.stop()
@@ -546,7 +546,7 @@ def test_thread_safe_save_data(tmpdir):
for t in threads:
t.join()
- if (not imported) and duration < 10:
+ if (not imported) and duration < 10: # pragma: only failure
duration *= 2
finally: