summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnthony Sottile <asottile@umich.edu>2020-08-27 22:44:56 +0000
committerAnthony Sottile <asottile@umich.edu>2020-08-27 22:44:56 +0000
commit3765318dcf58ccecc076907f05bd259801c71556 (patch)
tree1f57c3910980c203a435dcab8600a485bd8ca859
parentabdc9b14d6c7b531b001caf3d4086f2f25a05c2a (diff)
parente6a5f6a66391d46d8bfbc2a39cbf3e78ffff0d82 (diff)
downloadflake8-3765318dcf58ccecc076907f05bd259801c71556.tar.gz
Merge branch 'bugfix/cpython-3770-semopen-missing' into 'master'
Support linting when missing sem_open syscall See merge request pycqa/flake8!448
-rw-r--r--src/flake8/checker.py34
-rw-r--r--tests/integration/test_checker.py35
2 files changed, 58 insertions, 11 deletions
diff --git a/src/flake8/checker.py b/src/flake8/checker.py
index d993cb9..ea8b5d3 100644
--- a/src/flake8/checker.py
+++ b/src/flake8/checker.py
@@ -9,7 +9,7 @@ import tokenize
from typing import Dict, List, Optional, Tuple
try:
- import multiprocessing
+ import multiprocessing.pool
except ImportError:
multiprocessing = None # type: ignore
@@ -262,18 +262,16 @@ class Manager(object):
results_found += len(results)
return (results_found, results_reported)
- def run_parallel(self):
+ def run_parallel(self): # type: () -> None
"""Run the checkers in parallel."""
# fmt: off
final_results = collections.defaultdict(list) # type: Dict[str, List[Tuple[str, int, int, str, Optional[str]]]] # noqa: E501
- final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, None]] # noqa: E501
+ final_statistics = collections.defaultdict(dict) # type: Dict[str, Dict[str, int]] # noqa: E501
# fmt: on
- try:
- pool = multiprocessing.Pool(self.jobs, _pool_init)
- except OSError as oserr:
- if oserr.errno not in SERIAL_RETRY_ERRNOS:
- raise
+ pool = _try_initialize_processpool(self.jobs)
+
+ if pool is None:
self.run_serial()
return
@@ -303,12 +301,12 @@ class Manager(object):
checker.results = final_results[filename]
checker.statistics = final_statistics[filename]
- def run_serial(self):
+ def run_serial(self): # type: () -> None
"""Run the checkers in serial."""
for checker in self.checkers:
checker.run_checks()
- def run(self):
+ def run(self): # type: () -> None
"""Run all the checkers.
This will intelligently decide whether to run the checks in parallel
@@ -634,11 +632,25 @@ class FileChecker(object):
self.run_physical_checks(line + "\n")
-def _pool_init():
+def _pool_init(): # type: () -> None
"""Ensure correct signaling of ^C using multiprocessing.Pool."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
+def _try_initialize_processpool(job_count):
+ # type: (int) -> Optional[multiprocessing.pool.Pool]
+ """Return a new process pool instance if we are able to create one."""
+ try:
+ return multiprocessing.Pool(job_count, _pool_init)
+ except OSError as err:
+ if err.errno not in SERIAL_RETRY_ERRNOS:
+ raise
+ except ImportError:
+ pass
+
+ return None
+
+
def calculate_pool_chunksize(num_checkers, num_jobs):
"""Determine the chunksize for the multiprocessing Pool.
diff --git a/tests/integration/test_checker.py b/tests/integration/test_checker.py
index 0acdb6e..836b543 100644
--- a/tests/integration/test_checker.py
+++ b/tests/integration/test_checker.py
@@ -246,3 +246,38 @@ def test_report_order(results, expected_order):
with mock.patch.object(manager, '_handle_results', handler):
assert manager.report() == (len(results), len(results))
handler.assert_called_once_with('placeholder', expected_results)
+
+
+def test_acquire_when_multiprocessing_pool_can_initialize():
+ """Verify successful importing of hardware semaphore support.
+
+ Mock the behaviour of a platform that has a hardware sem_open
+ implementation, and then attempt to initialize a multiprocessing
+ Pool object.
+
+ This simulates the behaviour on most common platforms.
+ """
+ with mock.patch("multiprocessing.Pool") as pool:
+ result = checker._try_initialize_processpool(2)
+
+ pool.assert_called_once_with(2, checker._pool_init)
+ assert result is pool.return_value
+
+
+def test_acquire_when_multiprocessing_pool_can_not_initialize():
+ """Verify unsuccessful importing of hardware semaphore support.
+
+ Mock the behaviour of a platform that has not got a hardware sem_open
+ implementation, and then attempt to initialize a multiprocessing
+ Pool object.
+
+ This scenario will occur on platforms such as Termux and on some
+ more exotic devices.
+
+ https://github.com/python/cpython/blob/4e02981de0952f54bf87967f8e10d169d6946b40/Lib/multiprocessing/synchronize.py#L30-L33
+ """
+ with mock.patch("multiprocessing.Pool", side_effect=ImportError) as pool:
+ result = checker._try_initialize_processpool(2)
+
+ pool.assert_called_once_with(2, checker._pool_init)
+ assert result is None