diff options
| author | Ian Stapleton Cordasco <graffatcolmingov@gmail.com> | 2018-04-15 19:31:31 +0000 |
|---|---|---|
| committer | Ian Stapleton Cordasco <graffatcolmingov@gmail.com> | 2018-04-15 19:31:31 +0000 |
| commit | d0db8497b97cdcb0207da1a3b994e91ba554896c (patch) | |
| tree | 7c0a2e56a346770b60dcb90b94bcc81fd3286492 | |
| parent | 730071cdafcf73346ee11b222cbf66dcc84645c9 (diff) | |
| parent | c16c0c904150c924f3920eb67ae7d242f9775277 (diff) | |
| download | flake8-d0db8497b97cdcb0207da1a3b994e91ba554896c.tar.gz | |
Merge branch 'master' into 'master'
Move all uses of `pool` inside `run_parallel()`.
Closes #410
See merge request pycqa/flake8!228
| -rw-r--r-- | src/flake8/checker.py | 61 | ||||
| -rw-r--r-- | src/flake8/main/application.py | 1 | ||||
| -rw-r--r-- | tests/unit/test_checker_manager.py | 10 |
3 files changed, 38 insertions, 34 deletions
diff --git a/src/flake8/checker.py b/src/flake8/checker.py index 7a18ce5..97c5ea1 100644 --- a/src/flake8/checker.py +++ b/src/flake8/checker.py @@ -74,7 +74,6 @@ class Manager(object): self.checks = checker_plugins self.jobs = self._job_count() self.using_multiprocessing = self.jobs > 1 - self.pool = None self.processes = [] self.checkers = [] self.statistics = { @@ -84,14 +83,6 @@ class Manager(object): 'tokens': 0, } - if self.using_multiprocessing: - try: - self.pool = multiprocessing.Pool(self.jobs, _pool_init) - except OSError as oserr: - if oserr.errno not in SERIAL_RETRY_ERRNOS: - raise - self.using_multiprocessing = False - def _process_statistics(self): for checker in self.checkers: for statistic in defaults.STATISTIC_NAMES: @@ -268,30 +259,40 @@ class Manager(object): results_found += len(results) return (results_found, results_reported) - def _force_cleanup(self): - if self.pool is not None: - self.pool.terminate() - self.pool.join() - def run_parallel(self): """Run the checkers in parallel.""" final_results = collections.defaultdict(list) final_statistics = collections.defaultdict(dict) - pool_map = self.pool.imap_unordered( - _run_checks, - self.checkers, - chunksize=calculate_pool_chunksize( - len(self.checkers), - self.jobs, - ), - ) - for ret in pool_map: - filename, results, statistics = ret - final_results[filename] = results - final_statistics[filename] = statistics - self.pool.close() - self.pool.join() - self.pool = None + + try: + pool = multiprocessing.Pool(self.jobs, _pool_init) + except OSError as oserr: + if oserr.errno not in SERIAL_RETRY_ERRNOS: + raise + self.using_multiprocessing = False + self.run_serial() + return + + try: + pool_map = pool.imap_unordered( + _run_checks, + self.checkers, + chunksize=calculate_pool_chunksize( + len(self.checkers), + self.jobs, + ), + ) + for ret in pool_map: + filename, results, statistics = ret + final_results[filename] = results + final_statistics[filename] = statistics + pool.close() + pool.join() + pool = None + finally: + if pool is not None: + pool.terminate() + pool.join() for checker in self.checkers: filename = checker.display_name @@ -328,8 +329,6 @@ class Manager(object): except KeyboardInterrupt: LOG.warning('Flake8 was interrupted by the user') raise exceptions.EarlyQuit('Early quit while running checks') - finally: - self._force_cleanup() def start(self, paths=None): """Start checking files. diff --git a/src/flake8/main/application.py b/src/flake8/main/application.py index aed6175..9c15629 100644 --- a/src/flake8/main/application.py +++ b/src/flake8/main/application.py @@ -405,7 +405,6 @@ class Application(object): print('... stopped') LOG.critical('Caught keyboard interrupt from user') LOG.exception(exc) - self.file_checker_manager._force_cleanup() self.catastrophic_failure = True except exceptions.ExecutionError as exc: print('There was a critical error during execution of Flake8:') diff --git a/tests/unit/test_checker_manager.py b/tests/unit/test_checker_manager.py index 82fa3f8..02397f0 100644 --- a/tests/unit/test_checker_manager.py +++ b/tests/unit/test_checker_manager.py @@ -22,17 +22,23 @@ def test_oserrors_cause_serial_fall_back(): style_guide = style_guide_mock() with mock.patch('_multiprocessing.SemLock', side_effect=err): manager = checker.Manager(style_guide, [], []) + with mock.patch.object(manager, 'run_serial') as serial: + manager.run() + assert serial.call_count == 1 assert manager.using_multiprocessing is False @mock.patch('flake8.utils.is_windows', return_value=False) def test_oserrors_are_reraised(is_windows): - """Verify that OSErrors will cause the Manager to fallback to serial.""" + """Verify that unexpected OSErrors will cause the Manager to reraise.""" err = OSError(errno.EAGAIN, 'Ominous message') style_guide = style_guide_mock() with mock.patch('_multiprocessing.SemLock', side_effect=err): with pytest.raises(OSError): - checker.Manager(style_guide, [], []) + manager = checker.Manager(style_guide, [], []) + with mock.patch.object(manager, 'run_serial') as serial: + manager.run() + assert serial.call_count == 0 def test_multiprocessing_is_disabled(): |
