author     Georg Brandl <georg@python.org>  2014-09-22 17:29:52 +0200
committer  Georg Brandl <georg@python.org>  2014-09-22 17:29:52 +0200
commit     0488306cad87d948c89f00f2968eabc3dc5caec0 (patch)
tree       c3fabb7129b30d77bbae2304bddd1c03cce01d09 /sphinx/util
parent     1d8e60fd86bc5297ad419fac45c5ea2a6b23513f (diff)
download   sphinx-0488306cad87d948c89f00f2968eabc3dc5caec0.tar.gz
Refactor ParallelProcess into a base class that executes any task, and a derived class that executes a batch of the same task.
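
The new ParallelTasks base class owns the fork/pipe/semaphore machinery and accepts
arbitrary (task_func, arg, result_func) triples through add_task(), while the derived
ParallelChunked keeps the old chunked behaviour by routing each chunk through
add_task(). A minimal sketch of driving the base class directly, assuming a POSIX
platform where parallel_available is true; the parse_doc/on_parsed names are
illustrative stand-ins, not part of this commit:

    from sphinx.util.parallel import ParallelTasks

    def parse_doc(docname):
        # hypothetical task: runs in a forked worker; the return value
        # travels back to the parent over a pipe, so it must be picklable
        return len(docname)

    def on_parsed(docname, result):
        # hypothetical result callback, invoked in the parent process
        print(docname, '->', result)

    tasks = ParallelTasks(4)
    for docname in ['index', 'intro', 'api']:
        # each task forks one worker; the semaphore caps concurrency at nproc
        tasks.add_task(parse_doc, docname, on_parsed)
    tasks.join()  # drain remaining results, re-raising worker errors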
Diffstat (limited to 'sphinx/util')
-rw-r--r--  sphinx/util/parallel.py  145
1 file changed, 91 insertions(+), 54 deletions(-)
diff --git a/sphinx/util/parallel.py b/sphinx/util/parallel.py
index 44a69800..3fb40895 100644
--- a/sphinx/util/parallel.py
+++ b/sphinx/util/parallel.py
@@ -26,20 +26,102 @@ from sphinx.errors import SphinxParallelError
 parallel_available = multiprocessing and (os.name == 'posix')


-class ParallelProcess(object):
+class SerialTasks(object):
+    """Has the same interface as ParallelTasks, but executes tasks directly."""

-    def __init__(self, process_func, result_func, nproc, maxbatch=10):
-        self.process_func = process_func
-        self.result_func = result_func
+    def __init__(self, nproc=1):
+        pass
+
+    def add_task(self, task_func, arg=None, result_func=None):
+        if arg is not None:
+            res = task_func(arg)
+        else:
+            res = task_func()
+        if result_func:
+            result_func(res)
+
+    def join(self):
+        pass
+
+
+class ParallelTasks(object):
+    """Executes *nproc* tasks in parallel after forking."""
+
+    def __init__(self, nproc):
         self.nproc = nproc
-        self.maxbatch = maxbatch
         # list of threads to join when waiting for completion
         self._threads = []
-        self._chunks = []
-        self.nchunks = 0
+        self._nthreads = 0
         # queue of result objects to process
         self.result_queue = queue.Queue()
         self._nprocessed = 0
+        # maps tasks to result functions
+        self._result_funcs = {}
+        # allow only "nproc" worker processes at once
+        self._semaphore = threading.Semaphore(self.nproc)
+
+    def _process_thread(self, tid, func, arg):
+        def process(pipe, arg):
+            try:
+                if arg is None:
+                    ret = func()
+                else:
+                    ret = func(arg)
+                pipe.send((False, ret))
+            except BaseException as err:
+                pipe.send((True, (err, traceback.format_exc())))
+
+        precv, psend = multiprocessing.Pipe(False)
+        proc = multiprocessing.Process(target=process, args=(psend, arg))
+        proc.start()
+        result = precv.recv()
+        self.result_queue.put((tid, arg) + result)
+        proc.join()
+        self._semaphore.release()
+
+    def add_task(self, task_func, arg=None, result_func=None):
+        tid = len(self._threads)
+        self._semaphore.acquire()
+        t = threading.Thread(target=self._process_thread,
+                             args=(tid, task_func, arg))
+        t.setDaemon(True)
+        t.start()
+        self._nthreads += 1
+        self._threads.append(t)
+        self._result_funcs[tid] = result_func or (lambda *x: None)
+        # try processing results already in parallel
+        try:
+            tid, arg, exc, result = self.result_queue.get(False)
+        except queue.Empty:
+            pass
+        else:
+            if exc:
+                raise SphinxParallelError(*result)
+            self._result_funcs.pop(tid)(arg, result)
+            self._nprocessed += 1
+
+    def join(self):
+        while self._nprocessed < self._nthreads:
+            tid, arg, exc, result = self.result_queue.get()
+            if exc:
+                raise SphinxParallelError(*result)
+            self._result_funcs.pop(tid)(arg, result)
+            self._nprocessed += 1
+
+        for t in self._threads:
+            t.join()
+
+
+class ParallelChunked(ParallelTasks):
+    """Executes chunks of a list of arguments in parallel."""
+
+    def __init__(self, process_func, result_func, nproc, maxbatch=10):
+        ParallelTasks.__init__(self, nproc)
+        self.process_func = process_func
+        self.result_func = result_func
+        self.maxbatch = maxbatch
+        self._chunks = []
+        self.nchunks = 0

     def set_arguments(self, arguments):
         # determine how many documents to read in one go
@@ -54,53 +136,8 @@ class ParallelProcess(object):
         self._chunks = [arguments[i*chunksize:(i+1)*chunksize] for i in range(nchunks)]
         self.nchunks = len(self._chunks)

-    def spawn(self):
+    def iter_chunks(self):
         assert self._chunks
-
-        def process(pipe, chunk):
-            try:
-                ret = self.process_func(chunk)
-                pipe.send((False, ret))
-            except BaseException as err:
-                pipe.send((True, (err, traceback.format_exc())))
-
-        def process_thread(chunk):
-            precv, psend = multiprocessing.Pipe(False)
-            proc = multiprocessing.Process(target=process, args=(psend, chunk))
-            proc.start()
-            result = precv.recv()
-            self.result_queue.put((chunk,) + result)
-            proc.join()
-            semaphore.release()
-
-        # allow only "nproc" worker processes at once
-        semaphore = threading.Semaphore(self.nproc)
-
         for chunk in self._chunks:
             yield chunk
-            semaphore.acquire()
-            t = threading.Thread(target=process_thread, args=(chunk,))
-            t.setDaemon(True)
-            t.start()
-            self._threads.append(t)
-            # try processing results already in parallel
-            try:
-                chunk, exc, result = self.result_queue.get(False)
-            except queue.Empty:
-                pass
-            else:
-                if exc:
-                    raise SphinxParallelError(*result)
-                self.result_func(chunk, result)
-                self._nprocessed += 1
-
-    def join(self):
-        while self._nprocessed < self.nchunks:
-            chunk, exc, result = self.result_queue.get()
-            if exc:
-                raise SphinxParallelError(*result)
-            self.result_func(chunk, result)
-            self._nprocessed += 1
-
-        for t in self._threads:
-            t.join()
+            self.add_task(self.process_func, chunk, self.result_func)
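
For batch callers, the derived class preserves the previous interface. A sketch of
the intended call pattern, inferred from set_arguments() and iter_chunks() above;
the write_chunk/merge_results callables are illustrative only, not part of this
commit:

    from sphinx.util.parallel import ParallelChunked, parallel_available

    def write_chunk(docnames):
        # illustrative batch task: processes a whole chunk in one forked worker
        return [name.upper() for name in docnames]

    def merge_results(docnames, results):
        # called in the parent with the chunk and its worker's return value
        print(docnames, results)

    if parallel_available:
        proc = ParallelChunked(write_chunk, merge_results, 4)
        proc.set_arguments(['a', 'b', 'c', 'd', 'e'])
        # iter_chunks() yields each chunk for optional pre-processing in the
        # parent, then schedules it via add_task(); join() collects the rest
        for chunk in proc.iter_chunks():
            pass
        proc.join()

When parallel_available is false, SerialTasks offers the same add_task()/join()
surface as ParallelTasks, so callers can swap it in without branching at every
call site.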