diff options
author | Buck Golemon <workitharder@gmail.com> | 2011-04-30 20:29:34 -0700 |
---|---|---|
committer | Buck Golemon <workitharder@gmail.com> | 2011-04-30 20:29:34 -0700 |
commit | f3f3889756e42ffae238cd0b40c0fbb0413e6129 (patch) | |
tree | 2a133e32a46a36af76fc36fe508c6a69f87759eb /nose | |
parent | 01cc27384a411a5efe2c36ca98032fd6de2438b8 (diff) | |
download | nose-f3f3889756e42ffae238cd0b40c0fbb0413e6129.tar.gz |
fix pre-existing flaws in the multiprocessing doctest
main fix was to use multiprocess.Manager in place of the "raw" objects
all changes in the plugin are simply reactions to differences in
API between the raw and managed object types
Diffstat (limited to 'nose')
-rw-r--r-- | nose/plugins/multiprocess.py | 50 |
1 files changed, 20 insertions, 30 deletions
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py index b1c1495..149b695 100644 --- a/nose/plugins/multiprocess.py +++ b/nose/plugins/multiprocess.py @@ -139,11 +139,11 @@ class TimedOutException(Exception): def _import_mp(): global Process, Queue, Pool, Event, Value, Array try: - from multiprocessing import (Process as Process_, Queue as Queue_, - Pool as Pool_, Event as Event_, - Value as Value_, Array as Array_) - Process, Queue, Pool, Event, Value, Array = (Process_, Queue_, Pool_, - Event_, Value_, Array_) + from multiprocessing import Manager, Process + m = Manager() + Queue, Pool, Event, Value, Array = ( + m.Queue, m.Pool, m.Event, m.Value, m.Array + ) except ImportError: warn("multiprocessing module is not available, multiprocess plugin " "cannot be used", RuntimeWarning) @@ -320,9 +320,8 @@ class MultiProcessTestRunner(TextTestRunner): log.debug("Starting %s workers", self.config.multiprocess_workers) for i in range(self.config.multiprocess_workers): - currentaddr = Array('c',1000) - currentaddr.value = bytes_('') - currentstart = Value('d') + currentaddr = Value('c',bytes_('')) + currentstart = Value('d',0.0) keyboardCaught = Event() p = Process(target=runner, args=(i, testQueue, resultQueue, currentaddr, currentstart, @@ -379,10 +378,8 @@ class MultiProcessTestRunner(TextTestRunner): workers[iworker].join(timeout=1) if not shouldStop.is_set() and not testQueue.empty(): log.debug('starting new process on worker %s',iworker) - currentaddr = Array('c',1000) - currentaddr.value = bytes_('') - currentstart = Value('d') - currentstart.value = time.time() + currentaddr = Value('c',bytes_('')) + currentstart = Value('d',time.time()) keyboardCaught = Event() workers[iworker] = Process(target=runner, args=(iworker, testQueue, @@ -406,9 +403,9 @@ class MultiProcessTestRunner(TextTestRunner): for iworker, w in enumerate(workers): if w.is_alive(): worker_addr = bytes_(w.currentaddr.value,'ascii') - timeprocessing = time.time()-w.currentstart.value - if (len(worker_addr) == 0 - and timeprocessing > self.config.multiprocess_timeout-0.1): + timeprocessing = time.time() - w.currentstart.value + if ( len(worker_addr) == 0 + and timeprocessing > self.config.multiprocess_timeout-0.1): log.debug('worker %d has finished its work item, ' 'but is not exiting? do we wait for it?', iworker) @@ -431,10 +428,8 @@ class MultiProcessTestRunner(TextTestRunner): # have to terminate... log.error("terminating worker %s",iworker) w.terminate() - currentaddr = Array('c',1000) - currentaddr.value = bytes_('') - currentstart = Value('d') - currentstart.value = time.time() + currentaddr = Value('c',bytes_('')) + currentstart = Value('d',time.time()) keyboardCaught = Event() workers[iworker] = Process(target=runner, args=(iworker, testQueue, resultQueue, @@ -626,17 +621,12 @@ def runner(ix, testQueue, resultQueue, currentaddr, currentstart, keyboardCaught, shouldStop, loaderClass, resultClass, config): try: try: - try: - return __runner(ix, testQueue, resultQueue, currentaddr, currentstart, - keyboardCaught, shouldStop, loaderClass, resultClass, config) - except KeyboardInterrupt: - keyboardCaught.set() - log.debug('Worker %s keyboard interrupt, stopping',ix) - except Empty: - log.debug("Worker %s timed out waiting for tasks", ix) - finally: - testQueue.close() - resultQueue.close() + return __runner(ix, testQueue, resultQueue, currentaddr, currentstart, + keyboardCaught, shouldStop, loaderClass, resultClass, config) + except KeyboardInterrupt: + log.debug('Worker %s keyboard interrupt, stopping',ix) + except Empty: + log.debug("Worker %s timed out waiting for tasks", ix) def __runner(ix, testQueue, resultQueue, currentaddr, currentstart, keyboardCaught, shouldStop, loaderClass, resultClass, config): |