summaryrefslogtreecommitdiff
path: root/nose
diff options
context:
space:
mode:
authorBuck Golemon <workitharder@gmail.com>2011-04-30 20:29:34 -0700
committerBuck Golemon <workitharder@gmail.com>2011-04-30 20:29:34 -0700
commitf3f3889756e42ffae238cd0b40c0fbb0413e6129 (patch)
tree2a133e32a46a36af76fc36fe508c6a69f87759eb /nose
parent01cc27384a411a5efe2c36ca98032fd6de2438b8 (diff)
downloadnose-f3f3889756e42ffae238cd0b40c0fbb0413e6129.tar.gz
fix pre-existing flaws in the multiprocessing doctest
main fix was to use multiprocess.Manager in place of the "raw" objects all changes in the plugin are simply reactions to differences in API between the raw and managed object types
Diffstat (limited to 'nose')
-rw-r--r--nose/plugins/multiprocess.py50
1 files changed, 20 insertions, 30 deletions
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py
index b1c1495..149b695 100644
--- a/nose/plugins/multiprocess.py
+++ b/nose/plugins/multiprocess.py
@@ -139,11 +139,11 @@ class TimedOutException(Exception):
def _import_mp():
global Process, Queue, Pool, Event, Value, Array
try:
- from multiprocessing import (Process as Process_, Queue as Queue_,
- Pool as Pool_, Event as Event_,
- Value as Value_, Array as Array_)
- Process, Queue, Pool, Event, Value, Array = (Process_, Queue_, Pool_,
- Event_, Value_, Array_)
+ from multiprocessing import Manager, Process
+ m = Manager()
+ Queue, Pool, Event, Value, Array = (
+ m.Queue, m.Pool, m.Event, m.Value, m.Array
+ )
except ImportError:
warn("multiprocessing module is not available, multiprocess plugin "
"cannot be used", RuntimeWarning)
@@ -320,9 +320,8 @@ class MultiProcessTestRunner(TextTestRunner):
log.debug("Starting %s workers", self.config.multiprocess_workers)
for i in range(self.config.multiprocess_workers):
- currentaddr = Array('c',1000)
- currentaddr.value = bytes_('')
- currentstart = Value('d')
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',0.0)
keyboardCaught = Event()
p = Process(target=runner, args=(i, testQueue, resultQueue,
currentaddr, currentstart,
@@ -379,10 +378,8 @@ class MultiProcessTestRunner(TextTestRunner):
workers[iworker].join(timeout=1)
if not shouldStop.is_set() and not testQueue.empty():
log.debug('starting new process on worker %s',iworker)
- currentaddr = Array('c',1000)
- currentaddr.value = bytes_('')
- currentstart = Value('d')
- currentstart.value = time.time()
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
keyboardCaught = Event()
workers[iworker] = Process(target=runner,
args=(iworker, testQueue,
@@ -406,9 +403,9 @@ class MultiProcessTestRunner(TextTestRunner):
for iworker, w in enumerate(workers):
if w.is_alive():
worker_addr = bytes_(w.currentaddr.value,'ascii')
- timeprocessing = time.time()-w.currentstart.value
- if (len(worker_addr) == 0
- and timeprocessing > self.config.multiprocess_timeout-0.1):
+ timeprocessing = time.time() - w.currentstart.value
+ if ( len(worker_addr) == 0
+ and timeprocessing > self.config.multiprocess_timeout-0.1):
log.debug('worker %d has finished its work item, '
'but is not exiting? do we wait for it?',
iworker)
@@ -431,10 +428,8 @@ class MultiProcessTestRunner(TextTestRunner):
# have to terminate...
log.error("terminating worker %s",iworker)
w.terminate()
- currentaddr = Array('c',1000)
- currentaddr.value = bytes_('')
- currentstart = Value('d')
- currentstart.value = time.time()
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
keyboardCaught = Event()
workers[iworker] = Process(target=runner,
args=(iworker, testQueue, resultQueue,
@@ -626,17 +621,12 @@ def runner(ix, testQueue, resultQueue, currentaddr, currentstart,
keyboardCaught, shouldStop, loaderClass, resultClass, config):
try:
try:
- try:
- return __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
- keyboardCaught, shouldStop, loaderClass, resultClass, config)
- except KeyboardInterrupt:
- keyboardCaught.set()
- log.debug('Worker %s keyboard interrupt, stopping',ix)
- except Empty:
- log.debug("Worker %s timed out waiting for tasks", ix)
- finally:
- testQueue.close()
- resultQueue.close()
+ return __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
+ keyboardCaught, shouldStop, loaderClass, resultClass, config)
+ except KeyboardInterrupt:
+ log.debug('Worker %s keyboard interrupt, stopping',ix)
+ except Empty:
+ log.debug("Worker %s timed out waiting for tasks", ix)
def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
keyboardCaught, shouldStop, loaderClass, resultClass, config):