Diffstat (limited to 'sphinx/util/parallel.py')
-rw-r--r-- | sphinx/util/parallel.py | 109
1 file changed, 50 insertions(+), 59 deletions(-)
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
index 1d1e0a098..bace0b5fd 100644
--- a/sphinx/util/parallel.py
+++ b/sphinx/util/parallel.py
@@ -5,20 +5,21 @@
 
     Parallel building utilities.
 
-    :copyright: Copyright 2007-2015 by the Sphinx team, see AUTHORS.
+    :copyright: Copyright 2007-2016 by the Sphinx team, see AUTHORS.
     :license: BSD, see LICENSE for details.
 """
 
 import os
+import time
 import traceback
+from math import sqrt
 
 try:
     import multiprocessing
-    import threading
 except ImportError:
-    multiprocessing = threading = None
+    multiprocessing = None
 
-from six.moves import queue
+from six import iteritems
 
 from sphinx.errors import SphinxParallelError
 
@@ -49,17 +50,20 @@ class ParallelTasks(object):
 
     def __init__(self, nproc):
         self.nproc = nproc
-        # list of threads to join when waiting for completion
-        self._taskid = 0
-        self._threads = {}
-        self._nthreads = 0
-        # queue of result objects to process
-        self.result_queue = queue.Queue()
-        self._nprocessed = 0
-        # maps tasks to result functions
+        # (optional) function performed by each task on the result of main task
         self._result_funcs = {}
-        # allow only "nproc" worker processes at once
-        self._semaphore = threading.Semaphore(self.nproc)
+        # task arguments
+        self._args = {}
+        # list of subprocesses (both started and waiting)
+        self._procs = {}
+        # list of receiving pipe connections of running subprocesses
+        self._precvs = {}
+        # list of receiving pipe connections of waiting subprocesses
+        self._precvsWaiting = {}
+        # number of working subprocesses
+        self._pworking = 0
+        # task number of each subprocess
+        self._taskid = 0
 
     def _process(self, pipe, func, arg):
         try:
@@ -71,61 +75,48 @@ class ParallelTasks(object):
         except BaseException as err:
             pipe.send((True, (err, traceback.format_exc())))
 
-    def _process_thread(self, tid, func, arg):
-        precv, psend = multiprocessing.Pipe(False)
-        proc = multiprocessing.Process(target=self._process,
-                                       args=(psend, func, arg))
-        proc.start()
-        result = precv.recv()
-        self.result_queue.put((tid, arg) + result)
-        proc.join()
-        self._semaphore.release()
-
     def add_task(self, task_func, arg=None, result_func=None):
         tid = self._taskid
         self._taskid += 1
-        self._semaphore.acquire()
-        thread = threading.Thread(target=self._process_thread,
-                                  args=(tid, task_func, arg))
-        thread.setDaemon(True)
-        thread.start()
-        self._nthreads += 1
-        self._threads[tid] = thread
-        self._result_funcs[tid] = result_func or (lambda *x: None)
-        # try processing results already in parallel
-        try:
-            tid, arg, exc, result = self.result_queue.get(False)
-        except queue.Empty:
-            pass
-        else:
-            del self._threads[tid]
-            if exc:
-                raise SphinxParallelError(*result)
-            result_func = self._result_funcs.pop(tid)(arg, result)
-            if result_func:
-                result_func(result)
-            self._nprocessed += 1
+        self._result_funcs[tid] = result_func or (lambda arg: None)
+        self._args[tid] = arg
+        precv, psend = multiprocessing.Pipe(False)
+        proc = multiprocessing.Process(target=self._process,
+                                       args=(psend, task_func, arg))
+        self._procs[tid] = proc
+        self._precvsWaiting[tid] = precv
+        self._join_one()
 
     def join(self):
-        while self._nprocessed < self._nthreads:
-            tid, arg, exc, result = self.result_queue.get()
-            del self._threads[tid]
-            if exc:
-                raise SphinxParallelError(*result)
-            result_func = self._result_funcs.pop(tid)(arg, result)
-            if result_func:
-                result_func(result)
-            self._nprocessed += 1
-
-        # there shouldn't be any threads left...
-        for t in self._threads.values():
-            t.join()
+        while self._pworking:
+            self._join_one()
+
+    def _join_one(self):
+        for tid, pipe in iteritems(self._precvs):
+            if pipe.poll():
+                exc, result = pipe.recv()
+                if exc:
+                    raise SphinxParallelError(*result)
+                self._result_funcs.pop(tid)(self._args.pop(tid), result)
+                self._procs[tid].join()
+                self._pworking -= 1
+                break
+        else:
+            time.sleep(0.02)
+        while self._precvsWaiting and self._pworking < self.nproc:
+            newtid, newprecv = self._precvsWaiting.popitem()
+            self._precvs[newtid] = newprecv
+            self._procs[newtid].start()
+            self._pworking += 1
 
 
 def make_chunks(arguments, nproc, maxbatch=10):
     # determine how many documents to read in one go
     nargs = len(arguments)
-    chunksize = min(nargs // nproc, maxbatch)
+    chunksize = nargs // nproc
+    if chunksize >= maxbatch:
+        # try to improve batch size vs. number of batches
+        chunksize = int(sqrt(nargs/nproc * maxbatch))
     if chunksize == 0:
         chunksize = 1
     nchunks, rest = divmod(nargs, chunksize)
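For context: the worker protocol that _process implements (only its except branch is visible in the last hunk) is a single message per task, sent over a one-way pipe that the parent can poll() without blocking: (False, result) on success, or (True, (exc, traceback_text)) on failure. A self-contained sketch of that pattern, with a hypothetical double task standing in for a real build step:

    import time
    import traceback
    from multiprocessing import Pipe, Process

    def double(x):
        return x * 2

    def worker(pipe, func, arg):
        # same wire protocol as ParallelTasks._process: one message per task,
        # (False, result) on success or (True, (exc, traceback text)) on failure
        try:
            pipe.send((False, func(arg)))
        except BaseException as err:
            pipe.send((True, (err, traceback.format_exc())))

    if __name__ == '__main__':
        precv, psend = Pipe(False)    # duplex=False: child sends, parent receives
        proc = Process(target=worker, args=(psend, double, 21))
        proc.start()
        while not precv.poll():       # non-blocking check, as in _join_one
            time.sleep(0.02)
        failed, payload = precv.recv()
        proc.join()
        print(failed, payload)        # -> False 42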
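From the caller's side the class is driven exactly as before: add_task() now parks each worker process behind a pipe, and _join_one() both reaps finished workers and starts waiting ones, so at most nproc processes run at once. A minimal usage sketch; the docnames list and the read_docs/merge callbacks are hypothetical stand-ins for what Sphinx's builders actually pass:

    from sphinx.util.parallel import ParallelTasks, make_chunks, parallel_available

    docnames = ['doc%d' % i for i in range(100)]    # hypothetical build inputs

    def read_docs(chunk):
        # task_func: runs in a forked worker process; the return value
        # travels back to the parent over the pipe
        return [name.upper() for name in chunk]

    def merge(chunk, result):
        # result_func: runs in the parent, called as result_func(arg, result)
        print('merged %d results' % len(result))

    if parallel_available:            # multiprocessing on posix only
        tasks = ParallelTasks(4)
        for chunk in make_chunks(docnames, 4):
            tasks.add_task(read_docs, chunk, merge)
        tasks.join()                  # loops _join_one() until _pworking == 0

The 20 ms sleep in _join_one is the price of dropping the blocking result queue; it keeps join() responsive without needing a thread per task.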
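The make_chunks() change replaces the hard cap min(nargs // nproc, maxbatch) with a geometric-mean compromise between the per-process share and maxbatch, so large builds get fewer, larger batches instead of a flood of tiny ones. A small sketch of the arithmetic; chunk_size is a hypothetical helper mirroring just the new heuristic:

    from math import sqrt

    def chunk_size(nargs, nproc, maxbatch=10):
        chunksize = nargs // nproc
        if chunksize >= maxbatch:
            # geometric mean of nargs/nproc and maxbatch
            chunksize = int(sqrt(nargs / nproc * maxbatch))
        return max(chunksize, 1)

    # 1000 documents on 4 processes:
    #   old: min(250, 10) = 10         -> 100 worker processes
    #   new: int(sqrt(250 * 10)) = 50  -> 20 worker processes
    print(chunk_size(1000, 4))         # -> 50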