summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Stapleton Cordasco <graffatcolmingov@gmail.com>2018-04-15 19:31:31 +0000
committerIan Stapleton Cordasco <graffatcolmingov@gmail.com>2018-04-15 19:31:31 +0000
commitd0db8497b97cdcb0207da1a3b994e91ba554896c (patch)
tree7c0a2e56a346770b60dcb90b94bcc81fd3286492
parent730071cdafcf73346ee11b222cbf66dcc84645c9 (diff)
parentc16c0c904150c924f3920eb67ae7d242f9775277 (diff)
downloadflake8-d0db8497b97cdcb0207da1a3b994e91ba554896c.tar.gz
Merge branch 'master' into 'master'
Move all uses of `pool` inside `run_parallel()`. Closes #410 See merge request pycqa/flake8!228
-rw-r--r--src/flake8/checker.py61
-rw-r--r--src/flake8/main/application.py1
-rw-r--r--tests/unit/test_checker_manager.py10
3 files changed, 38 insertions, 34 deletions
diff --git a/src/flake8/checker.py b/src/flake8/checker.py
index 7a18ce5..97c5ea1 100644
--- a/src/flake8/checker.py
+++ b/src/flake8/checker.py
@@ -74,7 +74,6 @@ class Manager(object):
self.checks = checker_plugins
self.jobs = self._job_count()
self.using_multiprocessing = self.jobs > 1
- self.pool = None
self.processes = []
self.checkers = []
self.statistics = {
@@ -84,14 +83,6 @@ class Manager(object):
'tokens': 0,
}
- if self.using_multiprocessing:
- try:
- self.pool = multiprocessing.Pool(self.jobs, _pool_init)
- except OSError as oserr:
- if oserr.errno not in SERIAL_RETRY_ERRNOS:
- raise
- self.using_multiprocessing = False
-
def _process_statistics(self):
for checker in self.checkers:
for statistic in defaults.STATISTIC_NAMES:
@@ -268,30 +259,40 @@ class Manager(object):
results_found += len(results)
return (results_found, results_reported)
- def _force_cleanup(self):
- if self.pool is not None:
- self.pool.terminate()
- self.pool.join()
-
def run_parallel(self):
"""Run the checkers in parallel."""
final_results = collections.defaultdict(list)
final_statistics = collections.defaultdict(dict)
- pool_map = self.pool.imap_unordered(
- _run_checks,
- self.checkers,
- chunksize=calculate_pool_chunksize(
- len(self.checkers),
- self.jobs,
- ),
- )
- for ret in pool_map:
- filename, results, statistics = ret
- final_results[filename] = results
- final_statistics[filename] = statistics
- self.pool.close()
- self.pool.join()
- self.pool = None
+
+ try:
+ pool = multiprocessing.Pool(self.jobs, _pool_init)
+ except OSError as oserr:
+ if oserr.errno not in SERIAL_RETRY_ERRNOS:
+ raise
+ self.using_multiprocessing = False
+ self.run_serial()
+ return
+
+ try:
+ pool_map = pool.imap_unordered(
+ _run_checks,
+ self.checkers,
+ chunksize=calculate_pool_chunksize(
+ len(self.checkers),
+ self.jobs,
+ ),
+ )
+ for ret in pool_map:
+ filename, results, statistics = ret
+ final_results[filename] = results
+ final_statistics[filename] = statistics
+ pool.close()
+ pool.join()
+ pool = None
+ finally:
+ if pool is not None:
+ pool.terminate()
+ pool.join()
for checker in self.checkers:
filename = checker.display_name
@@ -328,8 +329,6 @@ class Manager(object):
except KeyboardInterrupt:
LOG.warning('Flake8 was interrupted by the user')
raise exceptions.EarlyQuit('Early quit while running checks')
- finally:
- self._force_cleanup()
def start(self, paths=None):
"""Start checking files.
diff --git a/src/flake8/main/application.py b/src/flake8/main/application.py
index aed6175..9c15629 100644
--- a/src/flake8/main/application.py
+++ b/src/flake8/main/application.py
@@ -405,7 +405,6 @@ class Application(object):
print('... stopped')
LOG.critical('Caught keyboard interrupt from user')
LOG.exception(exc)
- self.file_checker_manager._force_cleanup()
self.catastrophic_failure = True
except exceptions.ExecutionError as exc:
print('There was a critical error during execution of Flake8:')
diff --git a/tests/unit/test_checker_manager.py b/tests/unit/test_checker_manager.py
index 82fa3f8..02397f0 100644
--- a/tests/unit/test_checker_manager.py
+++ b/tests/unit/test_checker_manager.py
@@ -22,17 +22,23 @@ def test_oserrors_cause_serial_fall_back():
style_guide = style_guide_mock()
with mock.patch('_multiprocessing.SemLock', side_effect=err):
manager = checker.Manager(style_guide, [], [])
+ with mock.patch.object(manager, 'run_serial') as serial:
+ manager.run()
+ assert serial.call_count == 1
assert manager.using_multiprocessing is False
@mock.patch('flake8.utils.is_windows', return_value=False)
def test_oserrors_are_reraised(is_windows):
- """Verify that OSErrors will cause the Manager to fallback to serial."""
+ """Verify that unexpected OSErrors will cause the Manager to reraise."""
err = OSError(errno.EAGAIN, 'Ominous message')
style_guide = style_guide_mock()
with mock.patch('_multiprocessing.SemLock', side_effect=err):
with pytest.raises(OSError):
- checker.Manager(style_guide, [], [])
+ manager = checker.Manager(style_guide, [], [])
+ with mock.patch.object(manager, 'run_serial') as serial:
+ manager.run()
+ assert serial.call_count == 0
def test_multiprocessing_is_disabled():