diff options
Diffstat (limited to 'sphinx/environment.py')
-rw-r--r-- | sphinx/environment.py | 260 |
1 files changed, 205 insertions, 55 deletions
diff --git a/sphinx/environment.py b/sphinx/environment.py index 560756a89..2cb7adfdb 100644 --- a/sphinx/environment.py +++ b/sphinx/environment.py @@ -22,8 +22,14 @@ from os import path from glob import glob from itertools import groupby +try: + import multiprocessing + import threading +except ImportError: + multiprocessing = threading = None + from six import iteritems, itervalues, text_type, class_types -from six.moves import cPickle as pickle, zip +from six.moves import cPickle as pickle, zip, queue from docutils import nodes from docutils.io import FileInput, NullOutput from docutils.core import Publisher @@ -40,6 +46,7 @@ from sphinx.util import url_re, get_matching_docs, docname_join, split_into, \ FilenameUniqDict from sphinx.util.nodes import clean_astext, make_refnode, WarningStream from sphinx.util.osutil import SEP, find_catalog_files, getcwd, fs_encoding +from sphinx.util.console import bold, purple from sphinx.util.matching import compile_matchers from sphinx.util.websupport import is_commentable from sphinx.errors import SphinxError, ExtensionError @@ -328,6 +335,50 @@ class BuildEnvironment: for domain in self.domains.values(): domain.clear_doc(docname) + def merge_info_from(self, docnames, other, app): + """Merge global information gathered about *docnames* while reading them + from the *other* environment. + + This possibly comes from a parallel build process. + """ + docnames = set(docnames) + for docname in docnames: + self.all_docs[docname] = other.all_docs[docname] + if docname in other.reread_always: + self.reread_always.add(docname) + self.metadata[docname] = other.metadata[docname] + if docname in other.dependencies: + self.dependencies[docname] = other.dependencies[docname] + self.titles[docname] = other.titles[docname] + self.longtitles[docname] = other.longtitles[docname] + self.tocs[docname] = other.tocs[docname] + self.toc_num_entries[docname] = other.toc_num_entries[docname] + # toc_secnumbers is not assigned during read + if docname in other.toctree_includes: + self.toctree_includes[docname] = other.toctree_includes[docname] + self.indexentries[docname] = other.indexentries[docname] + if docname in other.glob_toctrees: + self.glob_toctrees.add(docname) + if docname in other.numbered_toctrees: + self.numbered_toctrees.add(docname) + + self.images.merge_other(docnames, other.images) + self.dlfiles.merge_other(docnames, other.dlfiles) + + for subfn, fnset in other.files_to_rebuild.items(): + self.files_to_rebuild.setdefault(subfn, set()).update(fnset & docnames) + for key, data in other.citations.items(): + # XXX duplicates? + if data[0] in docnames: + self.citations[key] = data + for version, changes in other.versionchanges.items(): + self.versionchanges.setdefault(version, []).extend( + change for change in changes if change[1] in docnames) + + for domainname, domain in self.domains.items(): + domain.merge_domaindata(docnames, other.domaindata[domainname]) + app.emit('env-merge-info', self, docnames, other) + def doc2path(self, docname, base=True, suffix=None): """Return the filename for the document name. @@ -443,13 +494,11 @@ class BuildEnvironment: return added, changed, removed - def update(self, config, srcdir, doctreedir, app=None): + def update(self, config, srcdir, doctreedir, app): """(Re-)read all files new or changed since last update. - Returns a summary, the total count of documents to reread and an - iterator that yields docnames as it processes them. Store all - environment docnames in the canonical format (ie using SEP as a - separator in place of os.path.sep). + Store all environment docnames in the canonical format (ie using SEP as + a separator in place of os.path.sep). """ config_changed = False if self.config is None: @@ -481,6 +530,8 @@ class BuildEnvironment: # this cache also needs to be updated every time self._nitpick_ignore = set(self.config.nitpick_ignore) + app.info(bold('updating environment: '), nonl=1) + added, changed, removed = self.get_outdated_files(config_changed) # allow user intervention as well @@ -495,33 +546,145 @@ class BuildEnvironment: msg += '%s added, %s changed, %s removed' % (len(added), len(changed), len(removed)) + app.info(msg) - def update_generator(): - self.app = app + self.app = app - # clear all files no longer present - for docname in removed: - if app: - app.emit('env-purge-doc', self, docname) - self.clear_doc(docname) - - # read all new and changed files - docnames = sorted(added | changed) - if app: - app.emit('env-before-read-docs', self, docnames) - for docname in docnames: - yield docname - self.read_doc(docname, app=app) + # clear all files no longer present + for docname in removed: + app.emit('env-purge-doc', self, docname) + self.clear_doc(docname) + + # read all new and changed files + docnames = sorted(added | changed) + # allow changing and reordering the list of docs to read + app.emit('env-before-read-docs', self, docnames) + + # check if we should do parallel or serial read + par_ok = False + if (len(added | changed) > 5 and + multiprocessing and + app.parallel > 1 and + os.name == 'posix'): + par_ok = True + for extname, md in app._extension_metadata.items(): + ext_ok = md.get('parallel_read_safe') + if ext_ok: + continue + if ext_ok is None: + app.warn('the %s extension does not declare if it ' + 'is safe for parallel reading, assuming it ' + 'isn\'t - please ask the extension author to ' + 'check and make it explicit' % extname) + app.warn('doing serial read') + else: + app.warn('the %s extension is not safe for parallel ' + 'reading, doing serial read' % extname) + par_ok = False + break + if par_ok: + self._read_parallel(docnames, app, nproc=app.parallel) + else: + self._read_serial(docnames, app) + + if config.master_doc not in self.all_docs: + self.warn(None, 'master file %s not found' % + self.doc2path(config.master_doc)) + + self.app = None + app.emit('env-updated', self) + return docnames + + def _read_serial(self, docnames, app): + for docname in app.status_iterator(docnames, 'reading sources... ', + purple, len(docnames)): + # remove all inventory entries for that file + app.emit('env-purge-doc', self, docname) + self.clear_doc(docname) + self.read_doc(docname, app) - if config.master_doc not in self.all_docs: - self.warn(None, 'master file %s not found' % - self.doc2path(config.master_doc)) + def _read_parallel(self, docnames, app, nproc): + def read_process(docs, pipe): + self.app = app + self.warnings = [] + self.set_warnfunc(lambda *args: self.warnings.append(args)) + try: + for docname in docs: + self.read_doc(docname, app) + except KeyboardInterrupt: + # XXX return None? + pass # do not print a traceback on Ctrl-C + self.set_warnfunc(None) + del self.app + del self.domains + del self.config.values + del self.config + pipe.send(self) + + def process_thread(docs): + precv, psend = multiprocessing.Pipe(False) + p = multiprocessing.Process(target=read_process, args=(docs, psend)) + p.start() + # XXX error handling + new_env = precv.recv() + merge_queue.put((docs, new_env)) + p.join() + semaphore.release() + + # allow only "nproc" worker processes at once + semaphore = threading.Semaphore(nproc) + # list of threads to join when waiting for completion + threads = [] + # queue of other env objects to merge + merge_queue = queue.Queue() + + # clear all outdated docs at once + for docname in docnames: + app.emit('env-purge-doc', self, docname) + self.clear_doc(docname) + + # determine how many documents to read in one go + ndocs = len(docnames) + chunksize = min(ndocs // nproc, 10) + if chunksize == 0: + chunksize = 1 + nchunks, rest = divmod(ndocs, chunksize) + if rest: + nchunks += 1 + # partition documents in "chunks" that will be written by one Process + chunks = [docnames[i*chunksize:(i+1)*chunksize] for i in range(nchunks)] + + warnings = [] + merged = 0 + for chunk in app.status_iterator(chunks, 'reading sources... ', + purple, len(chunks)): + semaphore.acquire() + t = threading.Thread(target=process_thread, args=(chunk,)) + t.setDaemon(True) + t.start() + threads.append(t) + try: + docs, other = merge_queue.get(False) + except queue.Empty: + pass + else: + warnings.extend(other.warnings) + self.merge_info_from(docs, other, app) + merged += 1 + + while merged < len(chunks): + docs, other = merge_queue.get() + warnings.extend(other.warnings) + self.merge_info_from(docs, other, app) + merged += 1 - self.app = None - if app: - app.emit('env-updated', self) + for warning in warnings: + self._warnfunc(*warning) - return msg, len(added | changed), update_generator() + # make sure all threads have finished + app.info(bold('waiting for workers... ')) + for t in threads: + t.join() def check_dependents(self, already): to_rewrite = self.assign_section_numbers() @@ -590,19 +753,8 @@ class BuildEnvironment: directives.directive = directive roles.role = role - def read_doc(self, docname, src_path=None, save_parsed=True, app=None): - """Parse a file and add/update inventory entries for the doctree. - - If srcpath is given, read from a different source file. - """ - # remove all inventory entries for that file - if app: - app.emit('env-purge-doc', self, docname) - - self.clear_doc(docname) - - if src_path is None: - src_path = self.doc2path(docname) + def read_doc(self, docname, app=None): + """Parse a file and add/update inventory entries for the doctree.""" self.temp_data['docname'] = docname # defaults to the global default, but can be re-set in a document @@ -639,6 +791,7 @@ class BuildEnvironment: destination_class=NullOutput) pub.set_components(None, 'restructuredtext', None) pub.process_programmatic_settings(None, self.settings, None) + src_path = self.doc2path(docname) source = SphinxFileInput(app, self, source=None, source_path=src_path, encoding=self.config.source_encoding) pub.source = source @@ -706,20 +859,17 @@ class BuildEnvironment: self.ref_context.clear() roles._roles.pop('', None) # if a document has set a local default role - if save_parsed: - # save the parsed doctree - doctree_filename = self.doc2path(docname, self.doctreedir, - '.doctree') - dirname = path.dirname(doctree_filename) - if not path.isdir(dirname): - os.makedirs(dirname) - f = open(doctree_filename, 'wb') - try: - pickle.dump(doctree, f, pickle.HIGHEST_PROTOCOL) - finally: - f.close() - else: - return doctree + # save the parsed doctree + doctree_filename = self.doc2path(docname, self.doctreedir, + '.doctree') + dirname = path.dirname(doctree_filename) + if not path.isdir(dirname): + os.makedirs(dirname) + f = open(doctree_filename, 'wb') + try: + pickle.dump(doctree, f, pickle.HIGHEST_PROTOCOL) + finally: + f.close() # utilities to use while reading a document |