diff options
Diffstat (limited to 'sphinx/util/parallel.py')
-rw-r--r-- | sphinx/util/parallel.py | 55 |
1 files changed, 39 insertions, 16 deletions
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py index 109448e05..bcc6117f6 100644 --- a/sphinx/util/parallel.py +++ b/sphinx/util/parallel.py @@ -13,15 +13,22 @@ import os import time import traceback from math import sqrt +from six import iteritems try: import multiprocessing except ImportError: multiprocessing = None -from six import iteritems - from sphinx.errors import SphinxParallelError +from sphinx.util import logging + +if False: + # For type annotation + from typing import Any, Callable, Dict, List, Sequence # NOQA + +logger = logging.getLogger(__name__) + # our parallel functionality only works for the forking Process parallel_available = multiprocessing and (os.name == 'posix') @@ -31,9 +38,11 @@ class SerialTasks(object): """Has the same interface as ParallelTasks, but executes tasks directly.""" def __init__(self, nproc=1): + # type: (int) -> None pass def add_task(self, task_func, arg=None, result_func=None): + # type: (Callable, Any, Callable) -> None if arg is not None: res = task_func(arg) else: @@ -42,6 +51,7 @@ class SerialTasks(object): result_func(res) def join(self): + # type: () -> None pass @@ -49,37 +59,45 @@ class ParallelTasks(object): """Executes *nproc* tasks in parallel after forking.""" def __init__(self, nproc): + # type: (int) -> None self.nproc = nproc # (optional) function performed by each task on the result of main task - self._result_funcs = {} + self._result_funcs = {} # type: Dict[int, Callable] # task arguments - self._args = {} + self._args = {} # type: Dict[int, List[Any]] # list of subprocesses (both started and waiting) - self._procs = {} + self._procs = {} # type: Dict[int, multiprocessing.Process] # list of receiving pipe connections of running subprocesses - self._precvs = {} + self._precvs = {} # type: Dict[int, Any] # list of receiving pipe connections of waiting subprocesses - self._precvsWaiting = {} + self._precvsWaiting = {} # type: Dict[int, Any] # number of working subprocesses self._pworking = 0 # task number of each subprocess self._taskid = 0 def _process(self, pipe, func, arg): + # type: (Any, Callable, Any) -> None try: - if arg is None: - ret = func() - else: - ret = func(arg) - pipe.send((False, ret)) + collector = logging.LogCollector() + with collector.collect(): + if arg is None: + ret = func() + else: + ret = func(arg) + failed = False except BaseException as err: - errmsg = traceback.format_exception_only(err.__class__, err)[0].strip() - pipe.send((True, (errmsg, traceback.format_exc()))) + failed = True + errmsg = traceback.format_exception_only(err.__class__, err)[0].strip() # type: ignore # NOQA + ret = (errmsg, traceback.format_exc()) + logging.convert_serializable(collector.logs) + pipe.send((failed, collector.logs, ret)) def add_task(self, task_func, arg=None, result_func=None): + # type: (Callable, Any, Callable) -> None tid = self._taskid self._taskid += 1 - self._result_funcs[tid] = result_func or (lambda arg: None) + self._result_funcs[tid] = result_func or (lambda arg, result: None) self._args[tid] = arg precv, psend = multiprocessing.Pipe(False) proc = multiprocessing.Process(target=self._process, @@ -89,15 +107,19 @@ class ParallelTasks(object): self._join_one() def join(self): + # type: () -> None while self._pworking: self._join_one() def _join_one(self): + # type: () -> None for tid, pipe in iteritems(self._precvs): if pipe.poll(): - exc, result = pipe.recv() + exc, logs, result = pipe.recv() if exc: raise SphinxParallelError(*result) + for log in logs: + logger.handle(log) self._result_funcs.pop(tid)(self._args.pop(tid), result) self._procs[tid].join() self._pworking -= 1 @@ -112,6 +134,7 @@ class ParallelTasks(object): def make_chunks(arguments, nproc, maxbatch=10): + # type: (Sequence[unicode], int, int) -> List[Any] # determine how many documents to read in one go nargs = len(arguments) chunksize = nargs // nproc |