author    Georg Brandl <georg@python.org>    2014-09-22 14:51:47 +0200
committer Georg Brandl <georg@python.org>    2014-09-22 14:51:47 +0200
commit    31452fc64d550f526d3a6c4dfbb1121873896503 (patch)
tree      1b259d36d3f105e8b0971acb1f838f89110595e1 /sphinx/environment.py
parent    905cbf853d5fe8fa911ec8fc81dc31134f4e5ba2 (diff)
[WIP] parallel read
Diffstat (limited to 'sphinx/environment.py')
-rw-r--r--  sphinx/environment.py | 260
1 file changed, 205 insertions(+), 55 deletions(-)
diff --git a/sphinx/environment.py b/sphinx/environment.py
index 560756a89..2cb7adfdb 100644
--- a/sphinx/environment.py
+++ b/sphinx/environment.py
@@ -22,8 +22,14 @@ from os import path
from glob import glob
from itertools import groupby
+try:
+ import multiprocessing
+ import threading
+except ImportError:
+ multiprocessing = threading = None
+
from six import iteritems, itervalues, text_type, class_types
-from six.moves import cPickle as pickle, zip
+from six.moves import cPickle as pickle, zip, queue
from docutils import nodes
from docutils.io import FileInput, NullOutput
from docutils.core import Publisher
@@ -40,6 +46,7 @@ from sphinx.util import url_re, get_matching_docs, docname_join, split_into, \
FilenameUniqDict
from sphinx.util.nodes import clean_astext, make_refnode, WarningStream
from sphinx.util.osutil import SEP, find_catalog_files, getcwd, fs_encoding
+from sphinx.util.console import bold, purple
from sphinx.util.matching import compile_matchers
from sphinx.util.websupport import is_commentable
from sphinx.errors import SphinxError, ExtensionError
@@ -328,6 +335,50 @@ class BuildEnvironment:
for domain in self.domains.values():
domain.clear_doc(docname)
+ def merge_info_from(self, docnames, other, app):
+ """Merge global information gathered about *docnames* while reading them
+ from the *other* environment.
+
+ This possibly comes from a parallel build process.
+ """
+ docnames = set(docnames)
+ for docname in docnames:
+ self.all_docs[docname] = other.all_docs[docname]
+ if docname in other.reread_always:
+ self.reread_always.add(docname)
+ self.metadata[docname] = other.metadata[docname]
+ if docname in other.dependencies:
+ self.dependencies[docname] = other.dependencies[docname]
+ self.titles[docname] = other.titles[docname]
+ self.longtitles[docname] = other.longtitles[docname]
+ self.tocs[docname] = other.tocs[docname]
+ self.toc_num_entries[docname] = other.toc_num_entries[docname]
+ # toc_secnumbers is not assigned during read
+ if docname in other.toctree_includes:
+ self.toctree_includes[docname] = other.toctree_includes[docname]
+ self.indexentries[docname] = other.indexentries[docname]
+ if docname in other.glob_toctrees:
+ self.glob_toctrees.add(docname)
+ if docname in other.numbered_toctrees:
+ self.numbered_toctrees.add(docname)
+
+ self.images.merge_other(docnames, other.images)
+ self.dlfiles.merge_other(docnames, other.dlfiles)
+
+ for subfn, fnset in other.files_to_rebuild.items():
+ self.files_to_rebuild.setdefault(subfn, set()).update(fnset & docnames)
+ for key, data in other.citations.items():
+ # XXX duplicates?
+ if data[0] in docnames:
+ self.citations[key] = data
+ for version, changes in other.versionchanges.items():
+ self.versionchanges.setdefault(version, []).extend(
+ change for change in changes if change[1] in docnames)
+
+ for domainname, domain in self.domains.items():
+ domain.merge_domaindata(docnames, other.domaindata[domainname])
+ app.emit('env-merge-info', self, docnames, other)
+
def doc2path(self, docname, base=True, suffix=None):
"""Return the filename for the document name.
@@ -443,13 +494,11 @@ class BuildEnvironment:
return added, changed, removed
- def update(self, config, srcdir, doctreedir, app=None):
+ def update(self, config, srcdir, doctreedir, app):
"""(Re-)read all files new or changed since last update.
- Returns a summary, the total count of documents to reread and an
- iterator that yields docnames as it processes them. Store all
- environment docnames in the canonical format (ie using SEP as a
- separator in place of os.path.sep).
+ Store all environment docnames in the canonical format (i.e. using SEP as
+ a separator in place of os.path.sep).
"""
config_changed = False
if self.config is None:
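With the generator-based interface removed, update() simply returns the list of docnames it read; a caller would now drive the whole read phase with one call, roughly like this (caller-side sketch, not part of this diff):

    # read all new/changed sources and get back the docnames that were (re)read
    updated_docnames = set(env.update(app.config, app.srcdir, app.doctreedir, app))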
@@ -481,6 +530,8 @@ class BuildEnvironment:
# this cache also needs to be updated every time
self._nitpick_ignore = set(self.config.nitpick_ignore)
+ app.info(bold('updating environment: '), nonl=1)
+
added, changed, removed = self.get_outdated_files(config_changed)
# allow user intervention as well
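The parallel/serial decision in the next hunk only goes parallel if every active extension has declared itself safe for parallel reading. Judging from the app._extension_metadata lookup, an extension would make that declaration by returning metadata from its setup(); a sketch of what that could look like (illustrative, not part of this patch):

    def setup(app):
        # ... register directives, roles, event handlers ...
        # declare that this extension keeps no process-local read state,
        # so its documents can safely be read in a forked worker process
        return {'version': '0.1', 'parallel_read_safe': True}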
@@ -495,33 +546,145 @@ class BuildEnvironment:
msg += '%s added, %s changed, %s removed' % (len(added), len(changed),
len(removed))
+ app.info(msg)
- def update_generator():
- self.app = app
+ self.app = app
- # clear all files no longer present
- for docname in removed:
- if app:
- app.emit('env-purge-doc', self, docname)
- self.clear_doc(docname)
-
- # read all new and changed files
- docnames = sorted(added | changed)
- if app:
- app.emit('env-before-read-docs', self, docnames)
- for docname in docnames:
- yield docname
- self.read_doc(docname, app=app)
+ # clear all files no longer present
+ for docname in removed:
+ app.emit('env-purge-doc', self, docname)
+ self.clear_doc(docname)
+
+ # read all new and changed files
+ docnames = sorted(added | changed)
+ # allow changing and reordering the list of docs to read
+ app.emit('env-before-read-docs', self, docnames)
+
+ # check if we should do parallel or serial read
+ par_ok = False
+ if (len(added | changed) > 5 and
+ multiprocessing and
+ app.parallel > 1 and
+ os.name == 'posix'):
+ par_ok = True
+ for extname, md in app._extension_metadata.items():
+ ext_ok = md.get('parallel_read_safe')
+ if ext_ok:
+ continue
+ if ext_ok is None:
+ app.warn('the %s extension does not declare if it '
+ 'is safe for parallel reading, assuming it '
+ 'isn\'t - please ask the extension author to '
+ 'check and make it explicit' % extname)
+ app.warn('doing serial read')
+ else:
+ app.warn('the %s extension is not safe for parallel '
+ 'reading, doing serial read' % extname)
+ par_ok = False
+ break
+ if par_ok:
+ self._read_parallel(docnames, app, nproc=app.parallel)
+ else:
+ self._read_serial(docnames, app)
+
+ if config.master_doc not in self.all_docs:
+ self.warn(None, 'master file %s not found' %
+ self.doc2path(config.master_doc))
+
+ self.app = None
+ app.emit('env-updated', self)
+ return docnames
+
+ def _read_serial(self, docnames, app):
+ for docname in app.status_iterator(docnames, 'reading sources... ',
+ purple, len(docnames)):
+ # remove all inventory entries for that file
+ app.emit('env-purge-doc', self, docname)
+ self.clear_doc(docname)
+ self.read_doc(docname, app)
- if config.master_doc not in self.all_docs:
- self.warn(None, 'master file %s not found' %
- self.doc2path(config.master_doc))
+ def _read_parallel(self, docnames, app, nproc):
+ def read_process(docs, pipe):
+ self.app = app
+ self.warnings = []
+ self.set_warnfunc(lambda *args: self.warnings.append(args))
+ try:
+ for docname in docs:
+ self.read_doc(docname, app)
+ except KeyboardInterrupt:
+ # XXX return None?
+ pass # do not print a traceback on Ctrl-C
+ self.set_warnfunc(None)
+ del self.app
+ del self.domains
+ del self.config.values
+ del self.config
+ pipe.send(self)
+
+ def process_thread(docs):
+ precv, psend = multiprocessing.Pipe(False)
+ p = multiprocessing.Process(target=read_process, args=(docs, psend))
+ p.start()
+ # XXX error handling
+ new_env = precv.recv()
+ merge_queue.put((docs, new_env))
+ p.join()
+ semaphore.release()
+
+ # allow only "nproc" worker processes at once
+ semaphore = threading.Semaphore(nproc)
+ # list of threads to join when waiting for completion
+ threads = []
+ # queue of other env objects to merge
+ merge_queue = queue.Queue()
+
+ # clear all outdated docs at once
+ for docname in docnames:
+ app.emit('env-purge-doc', self, docname)
+ self.clear_doc(docname)
+
+ # determine how many documents to read in one go
+ ndocs = len(docnames)
+ chunksize = min(ndocs // nproc, 10)
+ if chunksize == 0:
+ chunksize = 1
+ nchunks, rest = divmod(ndocs, chunksize)
+ if rest:
+ nchunks += 1
+ # partition documents in "chunks" that will be read by one Process
+ chunks = [docnames[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
+
+ warnings = []
+ merged = 0
+ for chunk in app.status_iterator(chunks, 'reading sources... ',
+ purple, len(chunks)):
+ semaphore.acquire()
+ t = threading.Thread(target=process_thread, args=(chunk,))
+ t.setDaemon(True)
+ t.start()
+ threads.append(t)
+ try:
+ docs, other = merge_queue.get(False)
+ except queue.Empty:
+ pass
+ else:
+ warnings.extend(other.warnings)
+ self.merge_info_from(docs, other, app)
+ merged += 1
+
+ while merged < len(chunks):
+ docs, other = merge_queue.get()
+ warnings.extend(other.warnings)
+ self.merge_info_from(docs, other, app)
+ merged += 1
- self.app = None
- if app:
- app.emit('env-updated', self)
+ for warning in warnings:
+ self._warnfunc(*warning)
- return msg, len(added | changed), update_generator()
+ # make sure all threads have finished
+ app.info(bold('waiting for workers... '))
+ for t in threads:
+ t.join()
def check_dependents(self, already):
to_rewrite = self.assign_section_numbers()
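In the new _read_parallel() above, the docnames are split into chunks of at most ten, each chunk is read by a forked Process that pickles its modified environment back over a Pipe, a Semaphore keeps at most nproc workers alive, and the main thread merges the returned environments as they arrive. For 47 outdated docs and 4 processes, for example, chunksize = min(47 // 4, 10) = 10 and divmod(47, 10) yields 5 chunks of sizes 10, 10, 10, 10 and 7. Below is a stripped-down sketch of the same fork-and-merge pattern with illustrative names and simplified chunking, assuming the POSIX fork start method that the patch itself requires:

    import multiprocessing
    import threading
    from six.moves import queue

    def parallel_map(items, work, nproc=4, chunksize=10):
        # e.g. 47 items with chunksize 10 -> 5 chunks (four of 10 plus a rest of 7)
        chunks = [items[i:i + chunksize] for i in range(0, len(items), chunksize)]
        semaphore = threading.Semaphore(nproc)
        results = queue.Queue()
        threads = []

        def in_process(chunk, pipe):
            # runs in the forked child; send the results back over the pipe,
            # as read_process does with the pickled environment
            pipe.send([work(item) for item in chunk])

        def in_thread(chunk):
            precv, psend = multiprocessing.Pipe(False)
            p = multiprocessing.Process(target=in_process, args=(chunk, psend))
            p.start()
            results.put(precv.recv())   # blocks until the child has sent its result
            p.join()
            semaphore.release()         # let the next worker start

        for chunk in chunks:
            semaphore.acquire()         # at most nproc children alive at once
            t = threading.Thread(target=in_thread, args=(chunk,))
            t.daemon = True
            t.start()
            threads.append(t)

        merged = []
        for _ in chunks:
            merged.extend(results.get())  # merge as workers finish (order not guaranteed)
        for t in threads:
            t.join()
        return merged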
@@ -590,19 +753,8 @@ class BuildEnvironment:
directives.directive = directive
roles.role = role
- def read_doc(self, docname, src_path=None, save_parsed=True, app=None):
- """Parse a file and add/update inventory entries for the doctree.
-
- If srcpath is given, read from a different source file.
- """
- # remove all inventory entries for that file
- if app:
- app.emit('env-purge-doc', self, docname)
-
- self.clear_doc(docname)
-
- if src_path is None:
- src_path = self.doc2path(docname)
+ def read_doc(self, docname, app=None):
+ """Parse a file and add/update inventory entries for the doctree."""
self.temp_data['docname'] = docname
# defaults to the global default, but can be re-set in a document
@@ -639,6 +791,7 @@ class BuildEnvironment:
destination_class=NullOutput)
pub.set_components(None, 'restructuredtext', None)
pub.process_programmatic_settings(None, self.settings, None)
+ src_path = self.doc2path(docname)
source = SphinxFileInput(app, self, source=None, source_path=src_path,
encoding=self.config.source_encoding)
pub.source = source
@@ -706,20 +859,17 @@ class BuildEnvironment:
self.ref_context.clear()
roles._roles.pop('', None) # if a document has set a local default role
- if save_parsed:
- # save the parsed doctree
- doctree_filename = self.doc2path(docname, self.doctreedir,
- '.doctree')
- dirname = path.dirname(doctree_filename)
- if not path.isdir(dirname):
- os.makedirs(dirname)
- f = open(doctree_filename, 'wb')
- try:
- pickle.dump(doctree, f, pickle.HIGHEST_PROTOCOL)
- finally:
- f.close()
- else:
- return doctree
+ # save the parsed doctree
+ doctree_filename = self.doc2path(docname, self.doctreedir,
+ '.doctree')
+ dirname = path.dirname(doctree_filename)
+ if not path.isdir(dirname):
+ os.makedirs(dirname)
+ f = open(doctree_filename, 'wb')
+ try:
+ pickle.dump(doctree, f, pickle.HIGHEST_PROTOCOL)
+ finally:
+ f.close()
# utilities to use while reading a document
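The doctree pickled at the end of read_doc() is what later build phases load again; the environment's get_doctree() essentially mirrors the save step above. A sketch of that load side, for orientation only:

    doctree_filename = self.doc2path(docname, self.doctreedir, '.doctree')
    f = open(doctree_filename, 'rb')
    try:
        doctree = pickle.load(f)
    finally:
        f.close()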