author    | Georg Brandl <georg@python.org> | 2014-09-22 16:21:04 +0200
committer | Georg Brandl <georg@python.org> | 2014-09-22 16:21:04 +0200
commit    | 1d8e60fd86bc5297ad419fac45c5ea2a6b23513f (patch)
tree      | be37a9d8b8bc0b94da85227318617f9889473b21
parent    | 34648f8aa680a89f7d95253a59facea78ad278ad (diff)
download  | sphinx-1d8e60fd86bc5297ad419fac45c5ea2a6b23513f.tar.gz
Factor out parallel building into a utility class. Improve error handling:
the traceback of a failing parallel process is now saved in the error log.
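
The commit replaces two hand-rolled fork/join implementations (one in the
builder, one in the environment) with a single ParallelProcess helper. A
minimal usage sketch of the new API as introduced in sphinx/util/parallel.py
below; the process/merge functions and the argument list are illustrative,
not part of the commit:

    from sphinx.util.parallel import ParallelProcess, parallel_available

    def process(chunk):
        # runs in a forked worker; the return value is pickled and piped back
        return [len(arg) for arg in chunk]

    def merge(chunk, result):
        # runs in the parent process once the chunk's worker has finished
        print('%r -> %r' % (chunk, result))

    if parallel_available:  # multiprocessing present and os.name == 'posix'
        proc = ParallelProcess(process, merge, nproc=4)
        proc.set_arguments(['alpha', 'beta', 'gamma', 'delta'])
        for chunk in proc.spawn():
            # each chunk is yielded just before its worker is forked, so the
            # caller may still modify it in place (as _write_parallel does)
            pass
        proc.join()  # wait for workers and process any remaining results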
-rw-r--r-- | sphinx/builders/__init__.py |  66
-rw-r--r-- | sphinx/environment.py       | 100
-rw-r--r-- | sphinx/errors.py            |  13
-rw-r--r-- | sphinx/util/__init__.py     |  10
-rw-r--r-- | sphinx/util/parallel.py     | 106
5 files changed, 169 insertions, 126 deletions
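
As context for the chunk partitioning that appears both in the old inline
code and in ParallelProcess.set_arguments() in the diff below, here is the
arithmetic in isolation; split() is a hypothetical standalone paraphrase
written for this illustration, not a function from the commit:

    def split(arguments, nproc, maxbatch=10):
        # at most `maxbatch` items per worker invocation, at least 1
        chunksize = min(len(arguments) // nproc, maxbatch) or 1
        return [arguments[i:i + chunksize]
                for i in range(0, len(arguments), chunksize)]

    # e.g. 23 documents on 4 processes -> chunk sizes [5, 5, 5, 5, 3]
    print([len(c) for c in split(list(range(23)), nproc=4)])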
diff --git a/sphinx/builders/__init__.py b/sphinx/builders/__init__.py
index f797766b..2fe45db2 100644
--- a/sphinx/builders/__init__.py
+++ b/sphinx/builders/__init__.py
@@ -22,7 +22,8 @@ from docutils import nodes
 
 from sphinx.util import i18n, path_stabilize
 from sphinx.util.osutil import SEP, relative_uri, find_catalog
-from sphinx.util.console import bold, purple, darkgreen
+from sphinx.util.console import bold, darkgreen
+from sphinx.util.parallel import ParallelProcess, parallel_available
 
 # side effect: registers roles and directives
 from sphinx import roles
@@ -318,10 +319,8 @@ class Builder(object):
         # check for prerequisites to parallel build
         # (parallel only works on POSIX, because the forking impl of
         # multiprocessing is required)
-        if (multiprocessing and
-                self.app.parallel > 1 and
-                self.allow_parallel and
-                os.name == 'posix'):
+        if parallel_available and len(docnames) > 5 and self.app.parallel > 1 \
+           and self.allow_parallel:
             for extname, md in self.app._extension_metadata.items():
                 par_ok = md.get('parallel_write_safe', True)
                 if not par_ok:
@@ -349,59 +348,32 @@ class Builder(object):
 
     def _write_parallel(self, docnames, warnings, nproc):
         def write_process(docs):
-            try:
-                for docname, doctree in docs:
-                    self.write_doc(docname, doctree)
-            except KeyboardInterrupt:
-                pass  # do not print a traceback on Ctrl-C
-            finally:
-                for warning in warnings:
-                    self.warn(*warning)
-
-        def process_thread(docs):
-            p = multiprocessing.Process(target=write_process, args=(docs,))
-            p.start()
-            p.join()
-            semaphore.release()
-
-        # allow only "nproc" worker processes at once
-        semaphore = threading.Semaphore(nproc)
-        # list of threads to join when waiting for completion
-        threads = []
+            for docname, doctree in docs:
+                self.write_doc(docname, doctree)
+            return warnings
+
+        def process_warnings(docs, wlist):
+            warnings.extend(wlist)
 
         # warm up caches/compile templates using the first document
         firstname, docnames = docnames[0], docnames[1:]
         doctree = self.env.get_and_resolve_doctree(firstname, self)
         self.write_doc_serialized(firstname, doctree)
         self.write_doc(firstname, doctree)
-        # for the rest, determine how many documents to write in one go
-        ndocs = len(docnames)
-        chunksize = min(ndocs // nproc, 10)
-        if chunksize == 0:
-            chunksize = 1
-        nchunks, rest = divmod(ndocs, chunksize)
-        if rest:
-            nchunks += 1
-        # partition documents in "chunks" that will be written by one Process
-        chunks = [docnames[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
-        for docnames in self.app.status_iterator(
-                chunks, 'writing output... ', darkgreen, len(chunks)):
-            docs = []
-            for docname in docnames:
+
+        proc = ParallelProcess(write_process, process_warnings, nproc)
+        proc.set_arguments(docnames)
+
+        for chunk in self.app.status_iterator(proc.spawn(), 'writing output... ',
+                                              darkgreen, proc.nchunks):
+            for i, docname in enumerate(chunk):
                 doctree = self.env.get_and_resolve_doctree(docname, self)
                 self.write_doc_serialized(docname, doctree)
-                docs.append((docname, doctree))
-            # start a new thread to oversee the completion of this chunk
-            semaphore.acquire()
-            t = threading.Thread(target=process_thread, args=(docs,))
-            t.setDaemon(True)
-            t.start()
-            threads.append(t)
+                chunk[i] = (docname, doctree)
 
         # make sure all threads have finished
         self.info(bold('waiting for workers... '))
-        for t in threads:
-            t.join()
+        proc.join()
 
     def prepare_writing(self, docnames):
         """A place where you can add logic before :meth:`write_doc` is run"""
diff --git a/sphinx/environment.py b/sphinx/environment.py
index 2cb7adfd..86718ed8 100644
--- a/sphinx/environment.py
+++ b/sphinx/environment.py
@@ -22,14 +22,8 @@ from os import path
 from glob import glob
 from itertools import groupby
 
-try:
-    import multiprocessing
-    import threading
-except ImportError:
-    multiprocessing = threading = None
-
 from six import iteritems, itervalues, text_type, class_types
-from six.moves import cPickle as pickle, zip, queue
+from six.moves import cPickle as pickle, zip
 from docutils import nodes
 from docutils.io import FileInput, NullOutput
 from docutils.core import Publisher
@@ -48,6 +42,7 @@ from sphinx.util.nodes import clean_astext, make_refnode, WarningStream
 from sphinx.util.osutil import SEP, find_catalog_files, getcwd, fs_encoding
 from sphinx.util.console import bold, purple
 from sphinx.util.matching import compile_matchers
+from sphinx.util.parallel import ParallelProcess, parallel_available
 from sphinx.util.websupport import is_commentable
 from sphinx.errors import SphinxError, ExtensionError
 from sphinx.locale import _
@@ -562,10 +557,7 @@ class BuildEnvironment:
 
         # check if we should do parallel or serial read
         par_ok = False
-        if (len(added | changed) > 5 and
-                multiprocessing and
-                app.parallel > 1 and
-                os.name == 'posix'):
+        if parallel_available and len(docnames) > 5 and app.parallel > 1:
             par_ok = True
             for extname, md in app._extension_metadata.items():
                 ext_ok = md.get('parallel_read_safe')
@@ -604,88 +596,44 @@ class BuildEnvironment:
             self.read_doc(docname, app)
 
     def _read_parallel(self, docnames, app, nproc):
-        def read_process(docs, pipe):
+        # clear all outdated docs at once
+        for docname in docnames:
+            app.emit('env-purge-doc', self, docname)
+            self.clear_doc(docname)
+
+        def read_process(docs):
             self.app = app
             self.warnings = []
             self.set_warnfunc(lambda *args: self.warnings.append(args))
-            try:
-                for docname in docs:
-                    self.read_doc(docname, app)
-            except KeyboardInterrupt:
-                # XXX return None?
-                pass  # do not print a traceback on Ctrl-C
+            for docname in docs:
+                self.read_doc(docname, app)
             # allow pickling self to send it back
             self.set_warnfunc(None)
             del self.app
             del self.domains
             del self.config.values
             del self.config
-            pipe.send(self)
-
-        def process_thread(docs):
-            precv, psend = multiprocessing.Pipe(False)
-            p = multiprocessing.Process(target=read_process, args=(docs, psend))
-            p.start()
-            # XXX error handling
-            new_env = precv.recv()
-            merge_queue.put((docs, new_env))
-            p.join()
-            semaphore.release()
-
-        # allow only "nproc" worker processes at once
-        semaphore = threading.Semaphore(nproc)
-        # list of threads to join when waiting for completion
-        threads = []
-        # queue of other env objects to merge
-        merge_queue = queue.Queue()
+            return self
 
-        # clear all outdated docs at once
-        for docname in docnames:
-            app.emit('env-purge-doc', self, docname)
-            self.clear_doc(docname)
+        def merge(docs, otherenv):
+            warnings.extend(otherenv.warnings)
+            self.merge_info_from(docs, otherenv, app)
 
-        # determine how many documents to read in one go
-        ndocs = len(docnames)
-        chunksize = min(ndocs // nproc, 10)
-        if chunksize == 0:
-            chunksize = 1
-        nchunks, rest = divmod(ndocs, chunksize)
-        if rest:
-            nchunks += 1
-        # partition documents in "chunks" that will be written by one Process
-        chunks = [docnames[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
+        proc = ParallelProcess(read_process, merge, nproc)
+        proc.set_arguments(docnames)
 
         warnings = []
-        merged = 0
-        for chunk in app.status_iterator(chunks, 'reading sources... ',
-                                         purple, len(chunks)):
-            semaphore.acquire()
-            t = threading.Thread(target=process_thread, args=(chunk,))
-            t.setDaemon(True)
-            t.start()
-            threads.append(t)
-            try:
-                docs, other = merge_queue.get(False)
-            except queue.Empty:
-                pass
-            else:
-                warnings.extend(other.warnings)
-                self.merge_info_from(docs, other, app)
-                merged += 1
+        for chunk in app.status_iterator(proc.spawn(), 'reading sources... ',
+                                         purple, proc.nchunks):
+            pass  # spawning in the iterator
 
-        while merged < len(chunks):
-            docs, other = merge_queue.get()
-            warnings.extend(other.warnings)
-            self.merge_info_from(docs, other, app)
-            merged += 1
+        # make sure all threads have finished
+        app.info(bold('waiting for workers... '))
+        proc.join()
 
         for warning in warnings:
             self._warnfunc(*warning)
 
-        # make sure all threads have finished
-        app.info(bold('waiting for workers... '))
-        for t in threads:
-            t.join()
-
     def check_dependents(self, already):
         to_rewrite = self.assign_section_numbers()
         for docname in to_rewrite:
diff --git a/sphinx/errors.py b/sphinx/errors.py
index 4d737e51..3d7a5eb4 100644
--- a/sphinx/errors.py
+++ b/sphinx/errors.py
@@ -10,6 +10,9 @@
     :license: BSD, see LICENSE for details.
 """
 
+import traceback
+
+
 class SphinxError(Exception):
     """
     Base class for Sphinx errors that are shown to the user in a nicer
@@ -62,3 +65,13 @@ class PycodeError(Exception):
         if len(self.args) > 1:
             res += ' (exception was: %r)' % self.args[1]
         return res
+
+
+class SphinxParallelError(Exception):
+    def __init__(self, orig_exc, traceback):
+        self.orig_exc = orig_exc
+        self.traceback = traceback
+
+    def __str__(self):
+        return traceback.format_exception_only(
+            self.orig_exc.__class__, self.orig_exc)[0].strip()
diff --git a/sphinx/util/__init__.py b/sphinx/util/__init__.py
index 30dc0cb0..e7277520 100644
--- a/sphinx/util/__init__.py
+++ b/sphinx/util/__init__.py
@@ -29,7 +29,7 @@ from docutils.utils import relative_path
 import jinja2
 
 import sphinx
-from sphinx.errors import PycodeError
+from sphinx.errors import PycodeError, SphinxParallelError
 from sphinx.util.console import strip_colors
 from sphinx.util.osutil import fs_encoding
 
@@ -191,7 +191,11 @@ _DEBUG_HEADER = '''\
 def save_traceback(app):
     """Save the current exception's traceback in a temporary file."""
     import platform
-    exc = traceback.format_exc()
+    exc = sys.exc_info()[1]
+    if isinstance(exc, SphinxParallelError):
+        exc_format = '(Error in parallel process)\n' + exc.traceback
+    else:
+        exc_format = traceback.format_exc()
     fd, path = tempfile.mkstemp('.log', 'sphinx-err-')
     last_msgs = ''
     if app is not None:
@@ -212,7 +216,7 @@ def save_traceback(app):
             os.write(fd, ('# %s (%s) from %s\n' % (
                 extname, app._extension_metadata[extname]['version'],
                 modfile)).encode('utf-8'))
-    os.write(fd, exc.encode('utf-8'))
+    os.write(fd, exc_format.encode('utf-8'))
     os.close(fd)
     return path
 
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
new file mode 100644
index 00000000..44a69800
--- /dev/null
+++ b/sphinx/util/parallel.py
@@ -0,0 +1,106 @@
+# -*- coding: utf-8 -*-
+"""
+    sphinx.util.parallel
+    ~~~~~~~~~~~~~~~~~~~~
+
+    Parallel building utilities.
+
+    :copyright: Copyright 2007-2014 by the Sphinx team, see AUTHORS.
+    :license: BSD, see LICENSE for details.
+"""
+
+import os
+import traceback
+
+try:
+    import multiprocessing
+    import threading
+except ImportError:
+    multiprocessing = threading = None
+
+from six.moves import queue
+
+from sphinx.errors import SphinxParallelError
+
+# our parallel functionality only works for the forking Process
+parallel_available = multiprocessing and (os.name == 'posix')
+
+
+class ParallelProcess(object):
+
+    def __init__(self, process_func, result_func, nproc, maxbatch=10):
+        self.process_func = process_func
+        self.result_func = result_func
+        self.nproc = nproc
+        self.maxbatch = maxbatch
+        # list of threads to join when waiting for completion
+        self._threads = []
+        self._chunks = []
+        self.nchunks = 0
+        # queue of result objects to process
+        self.result_queue = queue.Queue()
+        self._nprocessed = 0
+
+    def set_arguments(self, arguments):
+        # determine how many documents to read in one go
+        nargs = len(arguments)
+        chunksize = min(nargs // self.nproc, self.maxbatch)
+        if chunksize == 0:
+            chunksize = 1
+        nchunks, rest = divmod(nargs, chunksize)
+        if rest:
+            nchunks += 1
+        # partition documents in "chunks" that will be written by one Process
+        self._chunks = [arguments[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
+        self.nchunks = len(self._chunks)
+
+    def spawn(self):
+        assert self._chunks
+
+        def process(pipe, chunk):
+            try:
+                ret = self.process_func(chunk)
+                pipe.send((False, ret))
+            except BaseException as err:
+                pipe.send((True, (err, traceback.format_exc())))
+
+        def process_thread(chunk):
+            precv, psend = multiprocessing.Pipe(False)
+            proc = multiprocessing.Process(target=process, args=(psend, chunk))
+            proc.start()
+            result = precv.recv()
+            self.result_queue.put((chunk,) + result)
+            proc.join()
+            semaphore.release()
+
+        # allow only "nproc" worker processes at once
+        semaphore = threading.Semaphore(self.nproc)
+
+        for chunk in self._chunks:
+            yield chunk
+            semaphore.acquire()
+            t = threading.Thread(target=process_thread, args=(chunk,))
+            t.setDaemon(True)
+            t.start()
+            self._threads.append(t)
+            # try processing results already in parallel
+            try:
+                chunk, exc, result = self.result_queue.get(False)
+            except queue.Empty:
+                pass
+            else:
+                if exc:
+                    raise SphinxParallelError(*result)
+                self.result_func(chunk, result)
+                self._nprocessed += 1
+
+    def join(self):
+        while self._nprocessed < self.nchunks:
+            chunk, exc, result = self.result_queue.get()
+            if exc:
+                raise SphinxParallelError(*result)
+            self.result_func(chunk, result)
+            self._nprocessed += 1
+
+        for t in self._threads:
+            t.join()