author    Georg Brandl <georg@python.org>    2014-09-22 16:21:04 +0200
committer Georg Brandl <georg@python.org>    2014-09-22 16:21:04 +0200
commit    1d8e60fd86bc5297ad419fac45c5ea2a6b23513f (patch)
tree      be37a9d8b8bc0b94da85227318617f9889473b21
parent    34648f8aa680a89f7d95253a59facea78ad278ad (diff)
download  sphinx-1d8e60fd86bc5297ad419fac45c5ea2a6b23513f.tar.gz
Factor out parallel building into a utility class. Better error handling,
with the traceback of the failing parallel process saved in the error log.
-rw-r--r--  sphinx/builders/__init__.py |  66
-rw-r--r--  sphinx/environment.py       | 100
-rw-r--r--  sphinx/errors.py            |  13
-rw-r--r--  sphinx/util/__init__.py     |  10
-rw-r--r--  sphinx/util/parallel.py     | 106
5 files changed, 169 insertions(+), 126 deletions(-)
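
For orientation before reading the diff: the new ParallelProcess utility follows a
simple pattern. A process function runs in a forked worker and returns a picklable
result; a result function merges that result back in the parent as workers finish.
A minimal usage sketch, assuming this commit is applied (the squaring task and the
names process/merge/results are illustrative only, not part of the commit):

    from sphinx.util.parallel import ParallelProcess, parallel_available

    def process(chunk):
        # runs in a forked worker process; the return value is pickled
        # and sent back to the parent over a pipe
        return [n * n for n in chunk]

    results = []

    def merge(chunk, result):
        # runs in the parent, once per finished chunk
        results.extend(result)

    if parallel_available:  # False without multiprocessing or on non-POSIX
        proc = ParallelProcess(process, merge, nproc=4)
        proc.set_arguments(list(range(100)))
        for chunk in proc.spawn():  # iterating drives the spawning
            pass
        proc.join()  # wait for workers and merge any remaining results
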
diff --git a/sphinx/builders/__init__.py b/sphinx/builders/__init__.py
index f797766b..2fe45db2 100644
--- a/sphinx/builders/__init__.py
+++ b/sphinx/builders/__init__.py
@@ -22,7 +22,8 @@ from docutils import nodes
from sphinx.util import i18n, path_stabilize
from sphinx.util.osutil import SEP, relative_uri, find_catalog
-from sphinx.util.console import bold, purple, darkgreen
+from sphinx.util.console import bold, darkgreen
+from sphinx.util.parallel import ParallelProcess, parallel_available
# side effect: registers roles and directives
from sphinx import roles
@@ -318,10 +319,8 @@ class Builder(object):
# check for prerequisites to parallel build
# (parallel only works on POSIX, because the forking impl of
# multiprocessing is required)
- if (multiprocessing and
- self.app.parallel > 1 and
- self.allow_parallel and
- os.name == 'posix'):
+ if parallel_available and len(docnames) > 5 and self.app.parallel > 1 \
+ and self.allow_parallel:
for extname, md in self.app._extension_metadata.items():
par_ok = md.get('parallel_write_safe', True)
if not par_ok:
@@ -349,59 +348,32 @@ class Builder(object):
def _write_parallel(self, docnames, warnings, nproc):
def write_process(docs):
- try:
- for docname, doctree in docs:
- self.write_doc(docname, doctree)
- except KeyboardInterrupt:
- pass # do not print a traceback on Ctrl-C
- finally:
- for warning in warnings:
- self.warn(*warning)
-
- def process_thread(docs):
- p = multiprocessing.Process(target=write_process, args=(docs,))
- p.start()
- p.join()
- semaphore.release()
-
- # allow only "nproc" worker processes at once
- semaphore = threading.Semaphore(nproc)
- # list of threads to join when waiting for completion
- threads = []
+ for docname, doctree in docs:
+ self.write_doc(docname, doctree)
+ return warnings
+
+ def process_warnings(docs, wlist):
+ warnings.extend(wlist)
# warm up caches/compile templates using the first document
firstname, docnames = docnames[0], docnames[1:]
doctree = self.env.get_and_resolve_doctree(firstname, self)
self.write_doc_serialized(firstname, doctree)
self.write_doc(firstname, doctree)
- # for the rest, determine how many documents to write in one go
- ndocs = len(docnames)
- chunksize = min(ndocs // nproc, 10)
- if chunksize == 0:
- chunksize = 1
- nchunks, rest = divmod(ndocs, chunksize)
- if rest:
- nchunks += 1
- # partition documents in "chunks" that will be written by one Process
- chunks = [docnames[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
- for docnames in self.app.status_iterator(
- chunks, 'writing output... ', darkgreen, len(chunks)):
- docs = []
- for docname in docnames:
+
+ proc = ParallelProcess(write_process, process_warnings, nproc)
+ proc.set_arguments(docnames)
+
+ for chunk in self.app.status_iterator(proc.spawn(), 'writing output... ',
+ darkgreen, proc.nchunks):
+ for i, docname in enumerate(chunk):
doctree = self.env.get_and_resolve_doctree(docname, self)
self.write_doc_serialized(docname, doctree)
- docs.append((docname, doctree))
- # start a new thread to oversee the completion of this chunk
- semaphore.acquire()
- t = threading.Thread(target=process_thread, args=(docs,))
- t.setDaemon(True)
- t.start()
- threads.append(t)
+ chunk[i] = (docname, doctree)
# make sure all threads have finished
self.info(bold('waiting for workers... '))
- for t in threads:
- t.join()
+ proc.join()
def prepare_writing(self, docnames):
"""A place where you can add logic before :meth:`write_doc` is run"""
diff --git a/sphinx/environment.py b/sphinx/environment.py
index 2cb7adfd..86718ed8 100644
--- a/sphinx/environment.py
+++ b/sphinx/environment.py
@@ -22,14 +22,8 @@ from os import path
from glob import glob
from itertools import groupby
-try:
- import multiprocessing
- import threading
-except ImportError:
- multiprocessing = threading = None
-
from six import iteritems, itervalues, text_type, class_types
-from six.moves import cPickle as pickle, zip, queue
+from six.moves import cPickle as pickle, zip
from docutils import nodes
from docutils.io import FileInput, NullOutput
from docutils.core import Publisher
@@ -48,6 +42,7 @@ from sphinx.util.nodes import clean_astext, make_refnode, WarningStream
from sphinx.util.osutil import SEP, find_catalog_files, getcwd, fs_encoding
from sphinx.util.console import bold, purple
from sphinx.util.matching import compile_matchers
+from sphinx.util.parallel import ParallelProcess, parallel_available
from sphinx.util.websupport import is_commentable
from sphinx.errors import SphinxError, ExtensionError
from sphinx.locale import _
@@ -562,10 +557,7 @@ class BuildEnvironment:
# check if we should do parallel or serial read
par_ok = False
- if (len(added | changed) > 5 and
- multiprocessing and
- app.parallel > 1 and
- os.name == 'posix'):
+ if parallel_available and len(docnames) > 5 and app.parallel > 1:
par_ok = True
for extname, md in app._extension_metadata.items():
ext_ok = md.get('parallel_read_safe')
@@ -604,88 +596,44 @@ class BuildEnvironment:
self.read_doc(docname, app)
def _read_parallel(self, docnames, app, nproc):
- def read_process(docs, pipe):
+ # clear all outdated docs at once
+ for docname in docnames:
+ app.emit('env-purge-doc', self, docname)
+ self.clear_doc(docname)
+
+ def read_process(docs):
self.app = app
self.warnings = []
self.set_warnfunc(lambda *args: self.warnings.append(args))
- try:
- for docname in docs:
- self.read_doc(docname, app)
- except KeyboardInterrupt:
- # XXX return None?
- pass # do not print a traceback on Ctrl-C
+ for docname in docs:
+ self.read_doc(docname, app)
+ # allow pickling self to send it back
self.set_warnfunc(None)
del self.app
del self.domains
del self.config.values
del self.config
- pipe.send(self)
-
- def process_thread(docs):
- precv, psend = multiprocessing.Pipe(False)
- p = multiprocessing.Process(target=read_process, args=(docs, psend))
- p.start()
- # XXX error handling
- new_env = precv.recv()
- merge_queue.put((docs, new_env))
- p.join()
- semaphore.release()
-
- # allow only "nproc" worker processes at once
- semaphore = threading.Semaphore(nproc)
- # list of threads to join when waiting for completion
- threads = []
- # queue of other env objects to merge
- merge_queue = queue.Queue()
+ return self
- # clear all outdated docs at once
- for docname in docnames:
- app.emit('env-purge-doc', self, docname)
- self.clear_doc(docname)
+ def merge(docs, otherenv):
+ warnings.extend(otherenv.warnings)
+ self.merge_info_from(docs, otherenv, app)
- # determine how many documents to read in one go
- ndocs = len(docnames)
- chunksize = min(ndocs // nproc, 10)
- if chunksize == 0:
- chunksize = 1
- nchunks, rest = divmod(ndocs, chunksize)
- if rest:
- nchunks += 1
- # partition documents in "chunks" that will be written by one Process
- chunks = [docnames[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
+ proc = ParallelProcess(read_process, merge, nproc)
+ proc.set_arguments(docnames)
warnings = []
- merged = 0
- for chunk in app.status_iterator(chunks, 'reading sources... ',
- purple, len(chunks)):
- semaphore.acquire()
- t = threading.Thread(target=process_thread, args=(chunk,))
- t.setDaemon(True)
- t.start()
- threads.append(t)
- try:
- docs, other = merge_queue.get(False)
- except queue.Empty:
- pass
- else:
- warnings.extend(other.warnings)
- self.merge_info_from(docs, other, app)
- merged += 1
+ for chunk in app.status_iterator(proc.spawn(), 'reading sources... ',
+ purple, proc.nchunks):
+ pass # spawning in the iterator
- while merged < len(chunks):
- docs, other = merge_queue.get()
- warnings.extend(other.warnings)
- self.merge_info_from(docs, other, app)
- merged += 1
+ # make sure all threads have finished
+ app.info(bold('waiting for workers... '))
+ proc.join()
for warning in warnings:
self._warnfunc(*warning)
- # make sure all threads have finished
- app.info(bold('waiting for workers... '))
- for t in threads:
- t.join()
-
def check_dependents(self, already):
to_rewrite = self.assign_section_numbers()
for docname in to_rewrite:
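
One constraint worth noting in the read path above: the worker returns self, i.e.
the whole BuildEnvironment, through a pipe, so read_process must first strip
everything that cannot be pickled (the app reference, the domains, the config with
its callable values). A toy illustration of that constraint, with made-up names:

    import pickle

    class Env(object):
        def __init__(self):
            self.docs = {}
            # a warning callback, as installed by set_warnfunc();
            # lambdas (and live app state) cannot be pickled
            self.warnfunc = lambda *args: None

        def prepare_for_pickle(self):
            # the moral equivalent of set_warnfunc(None) / del self.app above
            self.warnfunc = None
            return self

    pickle.dumps(Env().prepare_for_pickle())  # fine once live references are gone
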
diff --git a/sphinx/errors.py b/sphinx/errors.py
index 4d737e51..3d7a5eb4 100644
--- a/sphinx/errors.py
+++ b/sphinx/errors.py
@@ -10,6 +10,9 @@
:license: BSD, see LICENSE for details.
"""
+import traceback
+
+
class SphinxError(Exception):
"""
Base class for Sphinx errors that are shown to the user in a nicer
@@ -62,3 +65,13 @@ class PycodeError(Exception):
if len(self.args) > 1:
res += ' (exception was: %r)' % self.args[1]
return res
+
+
+class SphinxParallelError(Exception):
+ def __init__(self, orig_exc, traceback):
+ self.orig_exc = orig_exc
+ self.traceback = traceback
+
+ def __str__(self):
+ return traceback.format_exception_only(
+ self.orig_exc.__class__, self.orig_exc)[0].strip()
diff --git a/sphinx/util/__init__.py b/sphinx/util/__init__.py
index 30dc0cb0..e7277520 100644
--- a/sphinx/util/__init__.py
+++ b/sphinx/util/__init__.py
@@ -29,7 +29,7 @@ from docutils.utils import relative_path
import jinja2
import sphinx
-from sphinx.errors import PycodeError
+from sphinx.errors import PycodeError, SphinxParallelError
from sphinx.util.console import strip_colors
from sphinx.util.osutil import fs_encoding
@@ -191,7 +191,11 @@ _DEBUG_HEADER = '''\
def save_traceback(app):
"""Save the current exception's traceback in a temporary file."""
import platform
- exc = traceback.format_exc()
+ exc = sys.exc_info()[1]
+ if isinstance(exc, SphinxParallelError):
+ exc_format = '(Error in parallel process)\n' + exc.traceback
+ else:
+ exc_format = traceback.format_exc()
fd, path = tempfile.mkstemp('.log', 'sphinx-err-')
last_msgs = ''
if app is not None:
@@ -212,7 +216,7 @@ def save_traceback(app):
os.write(fd, ('# %s (%s) from %s\n' % (
extname, app._extension_metadata[extname]['version'],
modfile)).encode('utf-8'))
- os.write(fd, exc.encode('utf-8'))
+ os.write(fd, exc_format.encode('utf-8'))
os.close(fd)
return path
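
The net effect on the error log: str() of the new SphinxParallelError yields only
the one-line "Type: message" form via traceback.format_exception_only, while the
full traceback captured inside the worker is stored on the exception and written
verbatim by save_traceback. A quick standalone check of that formatting
(illustrative values):

    import traceback

    try:
        raise ValueError('boom in worker')
    except ValueError as err:
        caught, tb = err, traceback.format_exc()

    # what SphinxParallelError.__str__ produces for the wrapped exception
    line = traceback.format_exception_only(caught.__class__, caught)[0].strip()
    assert line == 'ValueError: boom in worker'
    # tb is the full worker traceback; save_traceback writes it to the log
    # prefixed with '(Error in parallel process)'
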
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
new file mode 100644
index 00000000..44a69800
--- /dev/null
+++ b/sphinx/util/parallel.py
@@ -0,0 +1,106 @@
+# -*- coding: utf-8 -*-
+"""
+ sphinx.util.parallel
+ ~~~~~~~~~~~~~~~~~~~~
+
+ Parallel building utilities.
+
+ :copyright: Copyright 2007-2014 by the Sphinx team, see AUTHORS.
+ :license: BSD, see LICENSE for details.
+"""
+
+import os
+import traceback
+
+try:
+ import multiprocessing
+ import threading
+except ImportError:
+ multiprocessing = threading = None
+
+from six.moves import queue
+
+from sphinx.errors import SphinxParallelError
+
+# our parallel functionality only works for the forking Process
+parallel_available = multiprocessing and (os.name == 'posix')
+
+
+class ParallelProcess(object):
+
+ def __init__(self, process_func, result_func, nproc, maxbatch=10):
+ self.process_func = process_func
+ self.result_func = result_func
+ self.nproc = nproc
+ self.maxbatch = maxbatch
+ # list of threads to join when waiting for completion
+ self._threads = []
+ self._chunks = []
+ self.nchunks = 0
+ # queue of result objects to process
+ self.result_queue = queue.Queue()
+ self._nprocessed = 0
+
+ def set_arguments(self, arguments):
+ # determine how many documents to read in one go
+ nargs = len(arguments)
+ chunksize = min(nargs // self.nproc, self.maxbatch)
+ if chunksize == 0:
+ chunksize = 1
+ nchunks, rest = divmod(nargs, chunksize)
+ if rest:
+ nchunks += 1
+ # partition documents in "chunks" that will be written by one Process
+ self._chunks = [arguments[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
+ self.nchunks = len(self._chunks)
+
+ def spawn(self):
+ assert self._chunks
+
+ def process(pipe, chunk):
+ try:
+ ret = self.process_func(chunk)
+ pipe.send((False, ret))
+ except BaseException as err:
+ pipe.send((True, (err, traceback.format_exc())))
+
+ def process_thread(chunk):
+ precv, psend = multiprocessing.Pipe(False)
+ proc = multiprocessing.Process(target=process, args=(psend, chunk))
+ proc.start()
+ result = precv.recv()
+ self.result_queue.put((chunk,) + result)
+ proc.join()
+ semaphore.release()
+
+ # allow only "nproc" worker processes at once
+ semaphore = threading.Semaphore(self.nproc)
+
+ for chunk in self._chunks:
+ yield chunk
+ semaphore.acquire()
+ t = threading.Thread(target=process_thread, args=(chunk,))
+ t.setDaemon(True)
+ t.start()
+ self._threads.append(t)
+ # try processing results already in parallel
+ try:
+ chunk, exc, result = self.result_queue.get(False)
+ except queue.Empty:
+ pass
+ else:
+ if exc:
+ raise SphinxParallelError(*result)
+ self.result_func(chunk, result)
+ self._nprocessed += 1
+
+ def join(self):
+ while self._nprocessed < self.nchunks:
+ chunk, exc, result = self.result_queue.get()
+ if exc:
+ raise SphinxParallelError(*result)
+ self.result_func(chunk, result)
+ self._nprocessed += 1
+
+ for t in self._threads:
+ t.join()
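
Error propagation end to end, for reference: process() above catches any
BaseException in the worker, sends (True, (exception, formatted_traceback)) back
through the pipe, and the parent re-raises it as SphinxParallelError, either
opportunistically inside the spawn() loop or in join(). A minimal sketch under the
same assumptions as the earlier example (the failing task is illustrative):

    from sphinx.errors import SphinxParallelError
    from sphinx.util.parallel import ParallelProcess

    def process(chunk):
        # runs in the forked worker; the exception object must be picklable
        raise RuntimeError('bad doc in %r' % chunk)

    proc = ParallelProcess(process, lambda chunk, result: None, nproc=2)
    proc.set_arguments(['a', 'b', 'c'])
    try:
        for chunk in proc.spawn():
            pass
        proc.join()
    except SphinxParallelError as err:
        print(err)            # one line, e.g. "RuntimeError: bad doc in ['a']"
        print(err.traceback)  # full traceback from inside the worker
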