# -*- coding: utf-8 -*-
"""
    sphinx.util.parallel
    ~~~~~~~~~~~~~~~~~~~~

    Parallel building utilities.

    :copyright: Copyright 2007-2017 by the Sphinx team, see AUTHORS.
    :license: BSD, see LICENSE for details.
"""
import os
import time
import traceback
from math import sqrt

try:
    import multiprocessing
except ImportError:
    multiprocessing = None

from six import iteritems

from sphinx.errors import SphinxParallelError

# our parallel functionality only works for the forking Process
parallel_available = multiprocessing and (os.name == 'posix')
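
# A caller would typically select the implementation from this flag, e.g.
# (illustrative sketch, not part of this module; ``nproc`` is assumed to
# come from the caller's configuration):
#
#     Tasks = ParallelTasks if parallel_available and nproc > 1 else SerialTasks
#     tasks = Tasks(nproc)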


class SerialTasks(object):
    """Has the same interface as ParallelTasks, but executes tasks directly."""

    def __init__(self, nproc=1):
        pass

    def add_task(self, task_func, arg=None, result_func=None):
        if arg is not None:
            res = task_func(arg)
        else:
            res = task_func()
        if result_func:
            # note: unlike ParallelTasks, the callback receives only the
            # result here, not (arg, result)
            result_func(res)

    def join(self):
        pass


class ParallelTasks(object):
    """Executes *nproc* tasks in parallel after forking."""

    def __init__(self, nproc):
        self.nproc = nproc
        # (optional) function performed by each task on the result of main task
        self._result_funcs = {}
        # task arguments
        self._args = {}
        # list of subprocesses (both started and waiting)
        self._procs = {}
        # list of receiving pipe connections of running subprocesses
        self._precvs = {}
        # list of receiving pipe connections of waiting subprocesses
        self._precvsWaiting = {}
        # number of working subprocesses
        self._pworking = 0
        # task number of each subprocess
        self._taskid = 0

    def _process(self, pipe, func, arg):
        try:
            if arg is None:
                ret = func()
            else:
                ret = func(arg)
            pipe.send((False, ret))
        except BaseException as err:
            errmsg = traceback.format_exception_only(err.__class__, err)[0].strip()
            pipe.send((True, (errmsg, traceback.format_exc())))

    def add_task(self, task_func, arg=None, result_func=None):
        tid = self._taskid
        self._taskid += 1
        # the default result_func must accept (arg, result), matching the
        # call in _join_one below
        self._result_funcs[tid] = result_func or (lambda arg, result: None)
        self._args[tid] = arg
        precv, psend = multiprocessing.Pipe(False)
        proc = multiprocessing.Process(target=self._process,
                                       args=(psend, task_func, arg))
        self._procs[tid] = proc
        self._precvsWaiting[tid] = precv
        self._join_one()

    def join(self):
        while self._pworking:
            self._join_one()

    def _join_one(self):
        for tid, pipe in iteritems(self._precvs):
            if pipe.poll():
                exc, result = pipe.recv()
                if exc:
                    raise SphinxParallelError(*result)
                self._result_funcs.pop(tid)(self._args.pop(tid), result)
                self._procs[tid].join()
                # drop the finished pipe so it is not polled again
                self._precvs.pop(tid)
                self._pworking -= 1
                break
        else:
            # for-else: no subprocess has finished yet, so back off briefly
            time.sleep(0.02)

        while self._precvsWaiting and self._pworking < self.nproc:
            newtid, newprecv = self._precvsWaiting.popitem()
            self._precvs[newtid] = newprecv
            self._procs[newtid].start()
            self._pworking += 1
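

# Usage sketch (illustrative only; ``square`` and ``collect`` are made-up
# names, not part of this module).  Tasks are queued with add_task() and the
# caller blocks on join(); ParallelTasks hands the result callback both the
# original argument and the task's return value:
#
#     def square(n):
#         return n * n
#
#     def collect(arg, result):
#         print('%d -> %d' % (arg, result))
#
#     tasks = ParallelTasks(4)
#     for n in range(10):
#         tasks.add_task(square, n, collect)
#     tasks.join()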


def make_chunks(arguments, nproc, maxbatch=10):
    # determine how many documents to read in one go
    nargs = len(arguments)
    chunksize = nargs // nproc
    if chunksize >= maxbatch:
        # try to improve batch size vs. number of batches
        chunksize = int(sqrt(nargs / nproc * maxbatch))
    if chunksize == 0:
        chunksize = 1
    nchunks, rest = divmod(nargs, chunksize)
    if rest:
        nchunks += 1
    # partition documents in "chunks" that will be written by one Process
    return [arguments[i * chunksize:(i + 1) * chunksize] for i in range(nchunks)]
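

if __name__ == '__main__':
    # Worked example (illustrative, not part of the original module): chunk
    # 100 hypothetical documents for 4 processes.  nargs // nproc is 25,
    # which reaches maxbatch (10), so chunksize becomes
    # int(sqrt(100 / 4 * 10)) == 15: six chunks of 15 documents plus a
    # final chunk of 10.
    chunks = make_chunks(list(range(100)), nproc=4)
    print([len(chunk) for chunk in chunks])  # [15, 15, 15, 15, 15, 15, 10]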