Diffstat (limited to 'sphinx/util/parallel.py')
 sphinx/util/parallel.py | 109 ++++++++++++++++++++-------------------
 1 file changed, 50 insertions(+), 59 deletions(-)
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
index 1d1e0a098..bace0b5fd 100644
--- a/sphinx/util/parallel.py
+++ b/sphinx/util/parallel.py
@@ -5,20 +5,21 @@
 
     Parallel building utilities.
 
-    :copyright: Copyright 2007-2015 by the Sphinx team, see AUTHORS.
+    :copyright: Copyright 2007-2016 by the Sphinx team, see AUTHORS.
     :license: BSD, see LICENSE for details.
 """
 
 import os
+import time
 import traceback
+from math import sqrt
 
 try:
     import multiprocessing
-    import threading
 except ImportError:
-    multiprocessing = threading = None
+    multiprocessing = None
 
-from six.moves import queue
+from six import iteritems
 
 from sphinx.errors import SphinxParallelError
 
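With `threading` gone, the guard above simply binds `multiprocessing` to None where the module is unavailable, and everything downstream keys off that sentinel. A minimal sketch of the gating pattern, assuming a `parallel_available` flag derived from the sentinel and `os.name` (the exact expression is an assumption modeled on this module's use of `import os`, not a quote of the file):

    import os

    try:
        import multiprocessing
    except ImportError:
        multiprocessing = None

    # assumed gating flag: forked worker processes require a POSIX platform
    parallel_available = multiprocessing is not None and os.name == 'posix'

    if __name__ == '__main__':
        print('parallel build available:', parallel_available)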
@@ -49,17 +50,20 @@ class ParallelTasks(object):
 
     def __init__(self, nproc):
         self.nproc = nproc
-        # list of threads to join when waiting for completion
-        self._taskid = 0
-        self._threads = {}
-        self._nthreads = 0
-        # queue of result objects to process
-        self.result_queue = queue.Queue()
-        self._nprocessed = 0
-        # maps tasks to result functions
+        # (optional) function performed by each task on the result of main task
         self._result_funcs = {}
-        # allow only "nproc" worker processes at once
-        self._semaphore = threading.Semaphore(self.nproc)
+        # task arguments
+        self._args = {}
+        # list of subprocesses (both started and waiting)
+        self._procs = {}
+        # list of receiving pipe connections of running subprocesses
+        self._precvs = {}
+        # list of receiving pipe connections of waiting subprocesses
+        self._precvsWaiting = {}
+        # number of working subprocesses
+        self._pworking = 0
+        # task number of each subprocess
+        self._taskid = 0
 
     def _process(self, pipe, func, arg):
         try:
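In the rewrite, each task runs in its own forked subprocess and reports back over a one-way pipe; `__init__` now only sets up bookkeeping dictionaries keyed by task id. The round trip that `_process` performs can be sketched standalone as follows (the `double` task is illustrative; the `(failed, payload)` tuple mirrors what `_process` sends):

    import multiprocessing
    import traceback

    def _work(pipe, func, arg):
        # like _process: ship (failed, payload) back over the pipe
        try:
            pipe.send((False, func(arg)))
        except BaseException as err:
            pipe.send((True, (err, traceback.format_exc())))

    def double(x):
        return 2 * x

    if __name__ == '__main__':
        precv, psend = multiprocessing.Pipe(False)  # one-way: parent reads, child writes
        proc = multiprocessing.Process(target=_work, args=(psend, double, 21))
        proc.start()
        failed, payload = precv.recv()   # blocks until the child reports
        proc.join()
        print(failed, payload)           # False 42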
@@ -71,61 +75,48 @@ class ParallelTasks(object):
         except BaseException as err:
             pipe.send((True, (err, traceback.format_exc())))
 
-    def _process_thread(self, tid, func, arg):
-        precv, psend = multiprocessing.Pipe(False)
-        proc = multiprocessing.Process(target=self._process,
-                                       args=(psend, func, arg))
-        proc.start()
-        result = precv.recv()
-        self.result_queue.put((tid, arg) + result)
-        proc.join()
-        self._semaphore.release()
-
     def add_task(self, task_func, arg=None, result_func=None):
         tid = self._taskid
         self._taskid += 1
-        self._semaphore.acquire()
-        thread = threading.Thread(target=self._process_thread,
-                                  args=(tid, task_func, arg))
-        thread.setDaemon(True)
-        thread.start()
-        self._nthreads += 1
-        self._threads[tid] = thread
-        self._result_funcs[tid] = result_func or (lambda *x: None)
-        # try processing results already in parallel
-        try:
-            tid, arg, exc, result = self.result_queue.get(False)
-        except queue.Empty:
-            pass
-        else:
-            del self._threads[tid]
-            if exc:
-                raise SphinxParallelError(*result)
-            result_func = self._result_funcs.pop(tid)(arg, result)
-            if result_func:
-                result_func(result)
-            self._nprocessed += 1
+        self._result_funcs[tid] = result_func or (lambda arg, result: None)
+        self._args[tid] = arg
+        precv, psend = multiprocessing.Pipe(False)
+        proc = multiprocessing.Process(target=self._process,
+                                       args=(psend, task_func, arg))
+        self._procs[tid] = proc
+        self._precvsWaiting[tid] = precv
+        self._join_one()
 
     def join(self):
-        while self._nprocessed < self._nthreads:
-            tid, arg, exc, result = self.result_queue.get()
-            del self._threads[tid]
-            if exc:
-                raise SphinxParallelError(*result)
-            result_func = self._result_funcs.pop(tid)(arg, result)
-            if result_func:
-                result_func(result)
-            self._nprocessed += 1
-
-        # there shouldn't be any threads left...
-        for t in self._threads.values():
-            t.join()
+        while self._pworking:
+            self._join_one()
+
+    def _join_one(self):
+        for tid, pipe in iteritems(self._precvs):
+            if pipe.poll():
+                exc, result = pipe.recv()
+                if exc:
+                    raise SphinxParallelError(*result)
+                self._result_funcs.pop(tid)(self._args.pop(tid), result)
+                self._procs[tid].join()
+                self._pworking -= 1
+                break
+        else:
+            time.sleep(0.02)
+        while self._precvsWaiting and self._pworking < self.nproc:
+            newtid, newprecv = self._precvsWaiting.popitem()
+            self._precvs[newtid] = newprecv
+            self._procs[newtid].start()
+            self._pworking += 1
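Taken together, `add_task` registers a pipe-backed subprocess (started lazily once a worker slot frees up) and `join` polls the running pipes via `_join_one` until everything has reported. A hedged sketch of how a caller drives this API, with illustrative task and callback functions (Sphinx's builder passes document chunks here; these names are examples, not the builder's code):

    from sphinx.util.parallel import ParallelTasks

    def read_doc(docname):        # illustrative task, runs in a subprocess
        return len(docname)

    def merge(docname, length):   # illustrative result callback, runs in the parent
        print(docname, '->', length)

    tasks = ParallelTasks(nproc=4)
    for name in ['index', 'intro', 'api']:
        tasks.add_task(read_doc, name, merge)
    tasks.join()                  # drains _join_one until _pworking hits 0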
 
 
 def make_chunks(arguments, nproc, maxbatch=10):
     # determine how many documents to read in one go
     nargs = len(arguments)
-    chunksize = min(nargs // nproc, maxbatch)
+    chunksize = nargs // nproc
+    if chunksize >= maxbatch:
+        # try to improve batch size vs. number of batches
+        chunksize = int(sqrt(nargs/nproc * maxbatch))
     if chunksize == 0:
         chunksize = 1
     nchunks, rest = divmod(nargs, chunksize)
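The replaced heuristic no longer caps the chunk size at `maxbatch`; once `nargs // nproc` exceeds `maxbatch`, it takes the geometric mean of the two, balancing batch size against batch count. A worked example with assumed inputs:

    from math import sqrt

    nargs, nproc, maxbatch = 1000, 4, 10        # assumed example inputs

    chunksize = nargs // nproc                  # 250, far above maxbatch
    if chunksize >= maxbatch:
        chunksize = int(sqrt(nargs / nproc * maxbatch))   # int(sqrt(2500)) = 50
    if chunksize == 0:
        chunksize = 1

    nchunks, rest = divmod(nargs, chunksize)
    print(chunksize, nchunks, rest)             # 50 20 0

The old `min(nargs // nproc, maxbatch)` would have produced 100 chunks of 10 documents here; the new formula yields 20 chunks of 50, cutting per-chunk process overhead while still leaving five batches per worker for load balancing.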