summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Cordasco <graffatcolmingov@gmail.com>2016-12-20 18:31:28 -0600
committerIan Cordasco <graffatcolmingov@gmail.com>2016-12-20 18:31:28 -0600
commitcf0115e105c5963a45446b2cd511c076606119aa (patch)
treed34587f944a8ba5a55f8f97abd04320825261bee
parent348722d77b32ad2ecc064559bb8b4cae08e85549 (diff)
parent9b8f038a97477a93ae7c7860ebfdcda0b85ed538 (diff)
downloadflake8-cf0115e105c5963a45446b2cd511c076606119aa.tar.gz
Merge branch 'perf-improvements'
-rw-r--r--CONTRIBUTORS.txt1
-rw-r--r--docs/source/release-notes/3.3.0.rst4
-rw-r--r--src/flake8/checker.py172
-rw-r--r--src/flake8/processor.py1
-rw-r--r--tests/unit/test_checker_manager.py4
-rw-r--r--tests/unit/test_file_processor.py2
6 files changed, 74 insertions, 110 deletions
diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt
index 78ea654..59c227d 100644
--- a/CONTRIBUTORS.txt
+++ b/CONTRIBUTORS.txt
@@ -25,3 +25,4 @@ Contributors (by order of appearance) :
- Tyrel Souza
- Corey Farwell
- Michael Penkov
+- Anthony Sottile
diff --git a/docs/source/release-notes/3.3.0.rst b/docs/source/release-notes/3.3.0.rst
index fc7c2f4..0b0feae 100644
--- a/docs/source/release-notes/3.3.0.rst
+++ b/docs/source/release-notes/3.3.0.rst
@@ -3,6 +3,8 @@
You can view the `3.3.0 milestone`_ on GitLab for more details.
+- Dramatically improve the performance of Flake8 (`GitLab!156`_)
+
- Fix problem where hooks should only check \*.py files. (See also
`GitLab#268`_)
@@ -14,3 +16,5 @@ You can view the `3.3.0 milestone`_ on GitLab for more details.
https://gitlab.com/pycqa/flake8/milestones/16
.. _GitLab#268:
https://gitlab.com/pycqa/flake8/issues/268
+.. _GitLab!156:
+ https://gitlab.com/pycqa/flake8/merge_requests/156
diff --git a/src/flake8/checker.py b/src/flake8/checker.py
index 18e4937..4cd049d 100644
--- a/src/flake8/checker.py
+++ b/src/flake8/checker.py
@@ -1,7 +1,9 @@
"""Checker Manager and Checker classes."""
+import collections
import errno
import logging
import os
+import signal
import sys
import tokenize
@@ -10,11 +12,6 @@ try:
except ImportError:
multiprocessing = None
-try:
- import Queue as queue
-except ImportError:
- import queue
-
from flake8 import defaults
from flake8 import exceptions
from flake8 import processor
@@ -76,10 +73,8 @@ class Manager(object):
self.options = style_guide.options
self.checks = checker_plugins
self.jobs = self._job_count()
- self.process_queue = None
- self.results_queue = None
- self.statistics_queue = None
self.using_multiprocessing = self.jobs > 1
+ self.pool = None
self.processes = []
self.checkers = []
self.statistics = {
@@ -91,48 +86,17 @@ class Manager(object):
if self.using_multiprocessing:
try:
- self.process_queue = multiprocessing.Queue()
- self.results_queue = multiprocessing.Queue()
- self.statistics_queue = multiprocessing.Queue()
+ self.pool = multiprocessing.Pool(self.jobs, _pool_init)
except OSError as oserr:
if oserr.errno not in SERIAL_RETRY_ERRNOS:
raise
self.using_multiprocessing = False
- @staticmethod
- def _cleanup_queue(q):
- while not q.empty():
- q.get_nowait()
-
- def _force_cleanup(self):
- if self.using_multiprocessing:
- for proc in self.processes:
- proc.join(0.2)
- self._cleanup_queue(self.process_queue)
- self._cleanup_queue(self.results_queue)
- self._cleanup_queue(self.statistics_queue)
-
def _process_statistics(self):
- all_statistics = self.statistics
- if self.using_multiprocessing:
- total_number_of_checkers = len(self.checkers)
- statistics_gathered = 0
- while statistics_gathered < total_number_of_checkers:
- try:
- statistics = self.statistics_queue.get(block=False)
- statistics_gathered += 1
- except queue.Empty:
- break
-
- for statistic in defaults.STATISTIC_NAMES:
- all_statistics[statistic] += statistics[statistic]
- else:
- statistics_generator = (checker.statistics
- for checker in self.checkers)
- for statistics in statistics_generator:
- for statistic in defaults.STATISTIC_NAMES:
- all_statistics[statistic] += statistics[statistic]
- all_statistics['files'] += len(self.checkers)
+ for checker in self.checkers:
+ for statistic in defaults.STATISTIC_NAMES:
+ self.statistics[statistic] += checker.statistics[statistic]
+ self.statistics['files'] += len(self.checkers)
def _job_count(self):
# type: () -> int
@@ -189,19 +153,6 @@ class Manager(object):
# it to an integer
return int(jobs)
- def _results(self):
- seen_done = 0
- LOG.info('Retrieving results')
- while True:
- result = self.results_queue.get()
- if result == 'DONE':
- seen_done += 1
- if seen_done >= self.jobs:
- break
- continue
-
- yield result
-
def _handle_results(self, filename, results):
style_guide = self.style_guide
reported_results_count = 0
@@ -282,12 +233,15 @@ class Manager(object):
is_stdin)
checks = self.checks.to_dictionary()
- self.checkers = [
+ checkers = (
FileChecker(filename, checks, self.options)
for argument in paths
for filename in utils.filenames_from(argument,
self.is_path_excluded)
if should_create_file_checker(filename, argument)
+ )
+ self.checkers = [
+ checker for checker in checkers if checker.should_process
]
LOG.info('Checking %d files', len(self.checkers))
@@ -311,32 +265,41 @@ class Manager(object):
results_found += len(results)
return (results_found, results_reported)
+ def _force_cleanup(self):
+ if self.pool is not None:
+ self.pool.terminate()
+ self.pool.join()
+
def run_parallel(self):
"""Run the checkers in parallel."""
- LOG.info('Starting %d process workers', self.jobs)
- for i in range(self.jobs):
- proc = multiprocessing.Process(
- target=_run_checks_from_queue,
- args=(self.process_queue, self.results_queue,
- self.statistics_queue)
- )
- proc.daemon = True
- proc.start()
- self.processes.append(proc)
-
- final_results = {}
- for (filename, results) in self._results():
+ final_results = collections.defaultdict(list)
+ final_statistics = collections.defaultdict(dict)
+ pool_map = self.pool.imap_unordered(
+ _run_checks,
+ self.checkers,
+ chunksize=calculate_pool_chunksize(
+ len(self.checkers),
+ self.jobs,
+ ),
+ )
+ for ret in pool_map:
+ filename, results, statistics = ret
final_results[filename] = results
+ final_statistics[filename] = statistics
+ self.pool.close()
+ self.pool.join()
+ self.pool = None
for checker in self.checkers:
filename = checker.display_name
- checker.results = sorted(final_results.get(filename, []),
+ checker.results = sorted(final_results[filename],
key=lambda tup: (tup[2], tup[2]))
+ checker.statistics = final_statistics[filename]
def run_serial(self):
"""Run the checkers in serial."""
for checker in self.checkers:
- checker.run_checks(self.results_queue, self.statistics_queue)
+ checker.run_checks()
def run(self):
"""Run all the checkers.
@@ -374,15 +337,6 @@ class Manager(object):
"""
LOG.info('Making checkers')
self.make_checkers(paths)
- if not self.using_multiprocessing:
- return
-
- LOG.info('Populating process queue')
- for checker in self.checkers:
- self.process_queue.put(checker)
-
- for i in range(self.jobs):
- self.process_queue.put('DONE')
def stop(self):
"""Stop checking files."""
@@ -413,13 +367,18 @@ class FileChecker(object):
self.filename = filename
self.checks = checks
self.results = []
- self.processor = self._make_processor()
- self.display_name = self.processor.filename
self.statistics = {
'tokens': 0,
'logical lines': 0,
- 'physical lines': len(self.processor.lines),
+ 'physical lines': 0,
}
+ self.processor = self._make_processor()
+ self.display_name = filename
+ self.should_process = False
+ if self.processor is not None:
+ self.display_name = self.processor.filename
+ self.should_process = not self.processor.should_ignore_file()
+ self.statistics['physical lines'] = len(self.processor.lines)
def _make_processor(self):
try:
@@ -597,11 +556,8 @@ class FileChecker(object):
self.run_physical_checks(file_processor.lines[-1])
self.run_logical_checks()
- def run_checks(self, results_queue, statistics_queue):
+ def run_checks(self):
"""Run checks against the file."""
- if self.processor.should_ignore_file():
- return
-
try:
self.process_tokens()
except exceptions.InvalidSyntax as exc:
@@ -610,13 +566,9 @@ class FileChecker(object):
self.run_ast_checks()
- if results_queue is not None:
- results_queue.put((self.filename, self.results))
-
logical_lines = self.processor.statistics['logical lines']
self.statistics['logical lines'] = logical_lines
- if statistics_queue is not None:
- statistics_queue.put(self.statistics)
+ return self.filename, self.results, self.statistics
def handle_comment(self, token, token_text):
"""Handle the logic when encountering a comment token."""
@@ -663,19 +615,25 @@ class FileChecker(object):
override_error_line=token[4])
-def _run_checks_from_queue(process_queue, results_queue, statistics_queue):
- LOG.info('Running checks in parallel')
- try:
- for checker in iter(process_queue.get, 'DONE'):
- LOG.info('Checking "%s"', checker.filename)
- checker.run_checks(results_queue, statistics_queue)
- except exceptions.PluginRequestedUnknownParameters as exc:
- print(str(exc))
- except Exception as exc:
- LOG.error('Unhandled exception occurred')
- raise
- finally:
- results_queue.put('DONE')
+def _pool_init():
+ """Ensure correct signaling of ^C using multiprocessing.Pool."""
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+
+def calculate_pool_chunksize(num_checkers, num_jobs):
+ """Determine the chunksize for the multiprocessing Pool.
+
+ - For chunksize, see: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.imap # noqa
+ - This formula, while not perfect, aims to give each worker two batches of
+ work.
+ - See: https://gitlab.com/pycqa/flake8/merge_requests/156#note_18878876
+ - See: https://gitlab.com/pycqa/flake8/issues/265
+ """
+ return max(num_checkers // (num_jobs * 2), 1)
+
+
+def _run_checks(checker):
+ return checker.run_checks()
def find_offset(offset, mapping):
diff --git a/src/flake8/processor.py b/src/flake8/processor.py
index 7cea038..3827a26 100644
--- a/src/flake8/processor.py
+++ b/src/flake8/processor.py
@@ -367,6 +367,7 @@ def is_eol_token(token):
"""Check if the token is an end-of-line token."""
return token[0] in NEWLINE or token[4][token[3][1]:].lstrip() == '\\\n'
+
if COMMENT_WITH_NL: # If on Python 2.6
def is_eol_token(token, _is_eol_token=is_eol_token):
"""Check if the token is an end-of-line token."""
diff --git a/tests/unit/test_checker_manager.py b/tests/unit/test_checker_manager.py
index e5562ef..82fa3f8 100644
--- a/tests/unit/test_checker_manager.py
+++ b/tests/unit/test_checker_manager.py
@@ -20,7 +20,7 @@ def test_oserrors_cause_serial_fall_back():
"""Verify that OSErrors will cause the Manager to fallback to serial."""
err = OSError(errno.ENOSPC, 'Ominous message about spaceeeeee')
style_guide = style_guide_mock()
- with mock.patch('multiprocessing.Queue', side_effect=err):
+ with mock.patch('_multiprocessing.SemLock', side_effect=err):
manager = checker.Manager(style_guide, [], [])
assert manager.using_multiprocessing is False
@@ -30,7 +30,7 @@ def test_oserrors_are_reraised(is_windows):
"""Verify that OSErrors will cause the Manager to fallback to serial."""
err = OSError(errno.EAGAIN, 'Ominous message')
style_guide = style_guide_mock()
- with mock.patch('multiprocessing.Queue', side_effect=err):
+ with mock.patch('_multiprocessing.SemLock', side_effect=err):
with pytest.raises(OSError):
checker.Manager(style_guide, [], [])
diff --git a/tests/unit/test_file_processor.py b/tests/unit/test_file_processor.py
index c534003..736d21e 100644
--- a/tests/unit/test_file_processor.py
+++ b/tests/unit/test_file_processor.py
@@ -59,7 +59,7 @@ def test_read_lines_from_stdin(stdin_get_value):
stdin_value = mock.Mock()
stdin_value.splitlines.return_value = []
stdin_get_value.return_value = stdin_value
- file_processor = processor.FileProcessor('-', options_from())
+ processor.FileProcessor('-', options_from())
stdin_get_value.assert_called_once_with()
stdin_value.splitlines.assert_called_once_with(True)