diff options
author | Georg Brandl <georg@python.org> | 2014-09-22 17:29:52 +0200 |
---|---|---|
committer | Georg Brandl <georg@python.org> | 2014-09-22 17:29:52 +0200 |
commit | 0488306cad87d948c89f00f2968eabc3dc5caec0 (patch) | |
tree | c3fabb7129b30d77bbae2304bddd1c03cce01d09 /sphinx/util | |
parent | 1d8e60fd86bc5297ad419fac45c5ea2a6b23513f (diff) | |
download | sphinx-0488306cad87d948c89f00f2968eabc3dc5caec0.tar.gz |
Refactor parallel process into a base class that executes any task, and a derived class that executes a batch of the same task.
Diffstat (limited to 'sphinx/util')
-rw-r--r-- | sphinx/util/parallel.py | 145 |
1 file changed, 91 insertions, 54 deletions
# NOTE(review): this span was recovered from a cgit diff dump of
# sphinx/util/parallel.py (commit 0488306).  Below is the post-patch code,
# reformatted as regular Python.  It relies on the original file's
# module-level imports (multiprocessing, queue, threading, traceback) and
# on SphinxParallelError from sphinx.errors, all defined above the hunk.


class SerialTasks(object):
    """Has the same interface as ParallelTasks, but executes tasks directly."""

    def __init__(self, nproc=1):
        # *nproc* is accepted only for interface compatibility.
        pass

    def add_task(self, task_func, arg=None, result_func=None):
        # Run the task immediately; *arg* is optional, as in ParallelTasks.
        if arg is not None:
            res = task_func(arg)
        else:
            res = task_func()
        if result_func:
            result_func(res)

    def join(self):
        # Nothing to wait for in serial mode.
        pass


class ParallelTasks(object):
    """Executes *nproc* tasks in parallel after forking."""

    def __init__(self, nproc):
        self.nproc = nproc
        # list of threads to join when waiting for completion
        self._threads = []
        self._nthreads = 0
        # queue of result objects to process
        self.result_queue = queue.Queue()
        self._nprocessed = 0
        # maps task ids to result functions
        self._result_funcs = {}
        # allow only "nproc" worker processes at once
        self._semaphore = threading.Semaphore(self.nproc)

    def _process_thread(self, tid, func, arg):
        # Watcher thread body: fork one worker process, receive its result
        # over a pipe, and hand it to the result queue.
        def process(pipe, arg):
            try:
                if arg is None:
                    ret = func()
                else:
                    ret = func(arg)
                pipe.send((False, ret))
            except BaseException as err:
                # send the formatted traceback as text; traceback objects
                # cannot be sent over the pipe
                pipe.send((True, (err, traceback.format_exc())))

        precv, psend = multiprocessing.Pipe(False)
        proc = multiprocessing.Process(target=process, args=(psend, arg))
        proc.start()
        result = precv.recv()
        # 4-tuple: (tid, arg, exception-flag, payload)
        self.result_queue.put((tid, arg) + result)
        proc.join()
        self._semaphore.release()

    def add_task(self, task_func, arg=None, result_func=None):
        # Queue *task_func(arg)* for execution in a forked worker process;
        # blocks while "nproc" workers are already running.
        tid = len(self._threads)
        self._semaphore.acquire()
        t = threading.Thread(target=self._process_thread,
                             args=(tid, task_func, arg))
        t.setDaemon(True)
        t.start()
        self._nthreads += 1
        self._threads.append(t)
        self._result_funcs[tid] = result_func or (lambda *x: None)
        # try processing results already in parallel
        try:
            tid, arg, exc, result = self.result_queue.get(False)
        except queue.Empty:
            pass
        else:
            if exc:
                raise SphinxParallelError(*result)
            self._result_funcs.pop(tid)(arg, result)
            self._nprocessed += 1

    def join(self):
        # Drain remaining results, dispatching each to its result function,
        # then wait for all watcher threads to finish.
        while self._nprocessed < self._nthreads:
            tid, arg, exc, result = self.result_queue.get()
            if exc:
                raise SphinxParallelError(*result)
            self._result_funcs.pop(tid)(arg, result)
            self._nprocessed += 1

        for t in self._threads:
            t.join()


class ParallelChunked(ParallelTasks):
    """Executes chunks of a list of arguments in parallel."""

    def __init__(self, process_func, result_func, nproc, maxbatch=10):
        ParallelTasks.__init__(self, nproc)
        self.process_func = process_func
        self.result_func = result_func
        self.maxbatch = maxbatch
        self._chunks = []
        self.nchunks = 0

    def set_arguments(self, arguments):
        # determine how many documents to read in one go
        # NOTE(review): the chunk-size computation fell outside the visible
        # diff hunk in the dump; the lines between here and the list
        # comprehension are reconstructed -- TODO confirm against the file.
        nargs = len(arguments)
        chunksize = min(nargs // self.nproc, self.maxbatch)
        if chunksize == 0:
            chunksize = 1
        nchunks, rest = divmod(nargs, chunksize)
        if rest:
            nchunks += 1
        # partition the arguments into "chunks", one task per chunk
        self._chunks = [arguments[i * chunksize:(i + 1) * chunksize]
                        for i in range(nchunks)]
        self.nchunks = len(self._chunks)

    def iter_chunks(self):
        # Yield each chunk to the caller (e.g. for progress display) and
        # queue it as a task when the caller resumes the generator.
        assert self._chunks
        for chunk in self._chunks:
            yield chunk
            self.add_task(self.process_func, chunk, self.result_func)