summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/flake8/checker.py34
1 files changed, 23 insertions, 11 deletions
diff --git a/src/flake8/checker.py b/src/flake8/checker.py
index d993cb9..ea8b5d3 100644
--- a/src/flake8/checker.py
+++ b/src/flake8/checker.py
@@ -9,7 +9,7 @@ import tokenize
from typing import Dict, List, Optional, Tuple
try:
- import multiprocessing
+ import multiprocessing.pool
except ImportError:
multiprocessing = None # type: ignore
@@ -262,18 +262,16 @@ class Manager(object):
results_found += len(results)
return (results_found, results_reported)
- def run_parallel(self):
+ def run_parallel(self): # type: () -> None
"""Run the checkers in parallel."""
# fmt: off
final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501
- final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, None]] # noqa: E501
+ final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501
# fmt: on
- try:
- pool = multiprocessing.Pool(self.jobs, _pool_init)
- except OSError as oserr:
- if oserr.errno not in SERIAL_RETRY_ERRNOS:
- raise
+ pool = _try_initialize_processpool(self.jobs)
+
+ if pool is None:
self.run_serial()
return
@@ -303,12 +301,12 @@ class Manager(object):
checker.results = final_results[filename]
checker.statistics = final_statistics[filename]
- def run_serial(self):
+ def run_serial(self): # type: () -> None
"""Run the checkers in serial."""
for checker in self.checkers:
checker.run_checks()
- def run(self):
+ def run(self): # type: () -> None
"""Run all the checkers.
This will intelligently decide whether to run the checks in parallel
@@ -634,11 +632,25 @@ class FileChecker(object):
self.run_physical_checks(line + "\n")
-def _pool_init():
+def _pool_init(): # type: () -> None
"""Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
+def _try_initialize_processpool(job_count):
+ # type: (int) -> Optional[multiprocessing.pool.Pool]
+ """Return a new process pool instance if we are able to create one."""
+ try:
+ return multiprocessing.Pool(job_count, _pool_init)
+ except OSError as err:
+ if err.errno not in SERIAL_RETRY_ERRNOS:
+ raise
+ except ImportError:
+ pass
+
+ return None
+
+
def calculate_pool_chunksize(num_checkers, num_jobs):
"""Determine the chunksize for the multiprocessing Pool.