summaryrefslogtreecommitdiff
path: root/sphinx/util/parallel.py
diff options
context:
space:
mode:
Diffstat (limited to 'sphinx/util/parallel.py')
-rw-r--r--sphinx/util/parallel.py55
1 files changed, 39 insertions, 16 deletions
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
index 109448e05..bcc6117f6 100644
--- a/sphinx/util/parallel.py
+++ b/sphinx/util/parallel.py
@@ -13,15 +13,22 @@ import os
import time
import traceback
from math import sqrt
+from six import iteritems
try:
import multiprocessing
except ImportError:
multiprocessing = None
-from six import iteritems
-
from sphinx.errors import SphinxParallelError
+from sphinx.util import logging
+
+if False:
+ # For type annotation
+ from typing import Any, Callable, Dict, List, Sequence # NOQA
+
+logger = logging.getLogger(__name__)
+
# our parallel functionality only works for the forking Process
parallel_available = multiprocessing and (os.name == 'posix')
@@ -31,9 +38,11 @@ class SerialTasks(object):
"""Has the same interface as ParallelTasks, but executes tasks directly."""
def __init__(self, nproc=1):
+ # type: (int) -> None
pass
def add_task(self, task_func, arg=None, result_func=None):
+ # type: (Callable, Any, Callable) -> None
if arg is not None:
res = task_func(arg)
else:
@@ -42,6 +51,7 @@ class SerialTasks(object):
result_func(res)
def join(self):
+ # type: () -> None
pass
@@ -49,37 +59,45 @@ class ParallelTasks(object):
"""Executes *nproc* tasks in parallel after forking."""
def __init__(self, nproc):
+ # type: (int) -> None
self.nproc = nproc
# (optional) function performed by each task on the result of main task
- self._result_funcs = {}
+ self._result_funcs = {} # type: Dict[int, Callable]
# task arguments
- self._args = {}
+ self._args = {} # type: Dict[int, List[Any]]
# list of subprocesses (both started and waiting)
- self._procs = {}
+ self._procs = {} # type: Dict[int, multiprocessing.Process]
# list of receiving pipe connections of running subprocesses
- self._precvs = {}
+ self._precvs = {} # type: Dict[int, Any]
# list of receiving pipe connections of waiting subprocesses
- self._precvsWaiting = {}
+ self._precvsWaiting = {} # type: Dict[int, Any]
# number of working subprocesses
self._pworking = 0
# task number of each subprocess
self._taskid = 0
def _process(self, pipe, func, arg):
+ # type: (Any, Callable, Any) -> None
try:
- if arg is None:
- ret = func()
- else:
- ret = func(arg)
- pipe.send((False, ret))
+ collector = logging.LogCollector()
+ with collector.collect():
+ if arg is None:
+ ret = func()
+ else:
+ ret = func(arg)
+ failed = False
except BaseException as err:
- errmsg = traceback.format_exception_only(err.__class__, err)[0].strip()
- pipe.send((True, (errmsg, traceback.format_exc())))
+ failed = True
+ errmsg = traceback.format_exception_only(err.__class__, err)[0].strip() # type: ignore # NOQA
+ ret = (errmsg, traceback.format_exc())
+ logging.convert_serializable(collector.logs)
+ pipe.send((failed, collector.logs, ret))
def add_task(self, task_func, arg=None, result_func=None):
+ # type: (Callable, Any, Callable) -> None
tid = self._taskid
self._taskid += 1
- self._result_funcs[tid] = result_func or (lambda arg: None)
+ self._result_funcs[tid] = result_func or (lambda arg, result: None)
self._args[tid] = arg
precv, psend = multiprocessing.Pipe(False)
proc = multiprocessing.Process(target=self._process,
@@ -89,15 +107,19 @@ class ParallelTasks(object):
self._join_one()
def join(self):
+ # type: () -> None
while self._pworking:
self._join_one()
def _join_one(self):
+ # type: () -> None
for tid, pipe in iteritems(self._precvs):
if pipe.poll():
- exc, result = pipe.recv()
+ exc, logs, result = pipe.recv()
if exc:
raise SphinxParallelError(*result)
+ for log in logs:
+ logger.handle(log)
self._result_funcs.pop(tid)(self._args.pop(tid), result)
self._procs[tid].join()
self._pworking -= 1
@@ -112,6 +134,7 @@ class ParallelTasks(object):
def make_chunks(arguments, nproc, maxbatch=10):
+ # type: (Sequence[unicode], int, int) -> List[Any]
# determine how many documents to read in one go
nargs = len(arguments)
chunksize = nargs // nproc