summaryrefslogtreecommitdiff
path: root/nose
diff options
context:
space:
mode:
authorkumar <kumar.mcmillan@gmail.com>2011-05-02 15:22:23 -0500
committerkumar <kumar.mcmillan@gmail.com>2011-05-02 15:22:23 -0500
commit3a5e71e4e0914cdce1acac9d669313fac05c7a1c (patch)
treeab7d1cf1262042816b9fdad51a419ffdf86bf38b /nose
parent0e2ef2befd4e35bf8488c066893d91ff15dd2ac0 (diff)
parent9c7fda3c3c0a4657aad706ab9ab297a102b10122 (diff)
downloadnose-3a5e71e4e0914cdce1acac9d669313fac05c7a1c.tar.gz
Merged with attr plugin fixes
Diffstat (limited to 'nose')
-rw-r--r--nose/plugins/multiprocess.py140
-rw-r--r--nose/plugins/plugintest.py91
-rw-r--r--nose/pyversion.py6
3 files changed, 158 insertions, 79 deletions
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py
index ec54978..149b695 100644
--- a/nose/plugins/multiprocess.py
+++ b/nose/plugins/multiprocess.py
@@ -139,11 +139,11 @@ class TimedOutException(Exception):
def _import_mp():
global Process, Queue, Pool, Event, Value, Array
try:
- from multiprocessing import (Process as Process_, Queue as Queue_,
- Pool as Pool_, Event as Event_,
- Value as Value_, Array as Array_)
- Process, Queue, Pool, Event, Value, Array = (Process_, Queue_, Pool_,
- Event_, Value_, Array_)
+ from multiprocessing import Manager, Process
+ m = Manager()
+ Queue, Pool, Event, Value, Array = (
+ m.Queue, m.Pool, m.Event, m.Value, m.Array
+ )
except ImportError:
warn("multiprocessing module is not available, multiprocess plugin "
"cannot be used", RuntimeWarning)
@@ -293,7 +293,11 @@ class MultiProcessTestRunner(TextTestRunner):
case(result) # run here to capture the failure
continue
# handle shared fixtures
- if isinstance(case, ContextSuite) and self.sharedFixtures(case):
+ if isinstance(case, ContextSuite) and case.context is failure.Failure:
+ log.debug("Case is a Failure")
+ case(result) # run here to capture the failure
+ continue
+ elif isinstance(case, ContextSuite) and self.sharedFixtures(case):
log.debug("%s has shared fixtures", case)
try:
case.setUp()
@@ -316,9 +320,8 @@ class MultiProcessTestRunner(TextTestRunner):
log.debug("Starting %s workers", self.config.multiprocess_workers)
for i in range(self.config.multiprocess_workers):
- currentaddr = Array('c',1000)
- currentaddr.value = bytes_('')
- currentstart = Value('d')
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',0.0)
keyboardCaught = Event()
p = Process(target=runner, args=(i, testQueue, resultQueue,
currentaddr, currentstart,
@@ -375,10 +378,8 @@ class MultiProcessTestRunner(TextTestRunner):
workers[iworker].join(timeout=1)
if not shouldStop.is_set() and not testQueue.empty():
log.debug('starting new process on worker %s',iworker)
- currentaddr = Array('c',1000)
- currentaddr.value = bytes_('')
- currentstart = Value('d')
- currentstart.value = time.time()
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
keyboardCaught = Event()
workers[iworker] = Process(target=runner,
args=(iworker, testQueue,
@@ -401,10 +402,10 @@ class MultiProcessTestRunner(TextTestRunner):
any_alive = False
for iworker, w in enumerate(workers):
if w.is_alive():
- worker_addr = str(w.currentaddr.value,'ascii')
- timeprocessing = time.time()-w.currentstart.value
- if (len(worker_addr) == 0
- and timeprocessing > self.config.multiprocess_timeout-0.1):
+ worker_addr = bytes_(w.currentaddr.value,'ascii')
+ timeprocessing = time.time() - w.currentstart.value
+ if ( len(worker_addr) == 0
+ and timeprocessing > self.config.multiprocess_timeout-0.1):
log.debug('worker %d has finished its work item, '
'but is not exiting? do we wait for it?',
iworker)
@@ -427,10 +428,8 @@ class MultiProcessTestRunner(TextTestRunner):
# have to terminate...
log.error("terminating worker %s",iworker)
w.terminate()
- currentaddr = Array('c',1000)
- currentaddr.value = bytes_('')
- currentstart = Value('d')
- currentstart.value = time.time()
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
keyboardCaught = Event()
workers[iworker] = Process(target=runner,
args=(iworker, testQueue, resultQueue,
@@ -620,6 +619,18 @@ class MultiProcessTestRunner(TextTestRunner):
def runner(ix, testQueue, resultQueue, currentaddr, currentstart,
keyboardCaught, shouldStop, loaderClass, resultClass, config):
+ try:
+ try:
+ return __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
+ keyboardCaught, shouldStop, loaderClass, resultClass, config)
+ except KeyboardInterrupt:
+ log.debug('Worker %s keyboard interrupt, stopping',ix)
+ except Empty:
+ log.debug("Worker %s timed out waiting for tasks", ix)
+
+def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
+ keyboardCaught, shouldStop, loaderClass, resultClass, config):
+
config = pickle.loads(config)
dummy_parser = config.parserClass()
if _instantiate_plugins is not None:
@@ -659,54 +670,47 @@ def runner(ix, testQueue, resultQueue, currentaddr, currentstart,
failures,
errors,
errorClasses)
- try:
+ for test_addr, arg in iter(get, 'STOP'):
+ if shouldStop.is_set():
+ log.exception('Worker %d STOPPED',ix)
+ break
+ result = makeResult()
+ test = loader.loadTestsFromNames([test_addr])
+ test.testQueue = testQueue
+ test.tasks = []
+ test.arg = arg
+ log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
try:
- for test_addr, arg in iter(get, 'STOP'):
- if shouldStop.is_set():
- log.exception('Worker %d STOPPED',ix)
- break
- result = makeResult()
- test = loader.loadTestsFromNames([test_addr])
- test.testQueue = testQueue
- test.tasks = []
- test.arg = arg
- log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
- try:
- if arg is not None:
- test_addr = test_addr + str(arg)
- currentaddr.value = bytes_(test_addr)
- currentstart.value = time.time()
- test(result)
- currentaddr.value = bytes_('')
- resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- except KeyboardInterrupt:
- keyboardCaught.set()
- if len(currentaddr.value) > 0:
- log.exception('Worker %s keyboard interrupt, failing '
- 'current test %s',ix,test_addr)
- currentaddr.value = bytes_('')
- failure.Failure(*sys.exc_info())(result)
- resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- else:
- log.debug('Worker %s test %s timed out',ix,test_addr)
- resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- except SystemExit:
- currentaddr.value = bytes_('')
- log.exception('Worker %s system exit',ix)
- raise
- except:
- currentaddr.value = bytes_('')
- log.exception("Worker %s error running test or returning "
- "results",ix)
- failure.Failure(*sys.exc_info())(result)
- resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- if config.multiprocess_restartworker:
- break
- except Empty:
- log.debug("Worker %s timed out waiting for tasks", ix)
- finally:
- testQueue.close()
- resultQueue.close()
+ if arg is not None:
+ test_addr = test_addr + str(arg)
+ currentaddr.value = bytes_(test_addr)
+ currentstart.value = time.time()
+ test(result)
+ currentaddr.value = bytes_('')
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ except KeyboardInterrupt:
+ keyboardCaught.set()
+ if len(currentaddr.value) > 0:
+ log.exception('Worker %s keyboard interrupt, failing '
+ 'current test %s',ix,test_addr)
+ currentaddr.value = bytes_('')
+ failure.Failure(*sys.exc_info())(result)
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ else:
+ log.debug('Worker %s test %s timed out',ix,test_addr)
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ except SystemExit:
+ currentaddr.value = bytes_('')
+ log.exception('Worker %s system exit',ix)
+ raise
+ except:
+ currentaddr.value = bytes_('')
+ log.exception("Worker %s error running test or returning "
+ "results",ix)
+ failure.Failure(*sys.exc_info())(result)
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ if config.multiprocess_restartworker:
+ break
log.debug("Worker %s ending", ix)
diff --git a/nose/plugins/plugintest.py b/nose/plugins/plugintest.py
index 49f1884..68e8941 100644
--- a/nose/plugins/plugintest.py
+++ b/nose/plugins/plugintest.py
@@ -103,9 +103,78 @@ try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
-
+
__all__ = ['PluginTester', 'run']
+from os import getpid
+class MultiProcessFile(object):
+ """
+ helper for testing multiprocessing
+
+ multiprocessing poses a problem for doctests, since the strategy
+ of replacing sys.stdout/stderr with file-like objects then
+ inspecting the results won't work: the child processes will
+ write to the objects, but the data will not be reflected
+ in the parent doctest-ing process.
+
+ The solution is to create file-like objects which will interact with
+ multiprocessing in a more desirable way.
+
+ All processes can write to this object, but only the creator can read.
+ This allows the testing system to see a unified picture of I/O.
+ """
+ def __init__(self):
+ # per advice at:
+ # http://docs.python.org/library/multiprocessing.html#all-platforms
+ self.__master = getpid()
+ self.__queue = Manager().Queue()
+ self.__buffer = StringIO()
+ self.softspace = 0
+
+ def buffer(self):
+ if getpid() != self.__master:
+ return
+
+ from Queue import Empty
+ from collections import defaultdict
+ cache = defaultdict(str)
+ while True:
+ try:
+ pid, data = self.__queue.get_nowait()
+ except Empty:
+ break
+ if pid == ():
+ #show parent output after children
+ #this is what users see, usually
+ pid = ( 1e100, ) # googol!
+ cache[pid] += data
+ for pid in sorted(cache):
+ #self.__buffer.write( '%s wrote: %r\n' % (pid, cache[pid]) ) #DEBUG
+ self.__buffer.write( cache[pid] )
+ def write(self, data):
+ # note that these pids are in the form of current_process()._identity
+ # rather than OS pids
+ from multiprocessing import current_process
+ pid = current_process()._identity
+ self.__queue.put((pid, data))
+ def __iter__(self):
+ "getattr doesn't work for iter()"
+ self.buffer()
+ return self.__buffer
+ def seek(self, offset, whence=0):
+ self.buffer()
+ return self.__buffer.seek(offset, whence)
+ def getvalue(self):
+ self.buffer()
+ return self.__buffer.getvalue()
+ def __getattr__(self, attr):
+ return getattr(self.__buffer, attr)
+
+try:
+ from multiprocessing import Manager
+ Buffer = MultiProcessFile
+except ImportError:
+ Buffer = StringIO
class PluginTester(object):
"""A mixin for testing nose plugins in their runtime environment.
@@ -177,7 +246,7 @@ class PluginTester(object):
from nose.plugins.manager import PluginManager
suite = None
- stream = StringIO()
+ stream = Buffer()
conf = Config(env=self.env,
stream=stream,
plugins=PluginManager(plugins=self.plugins))
@@ -214,16 +283,18 @@ class AccessDecorator(object):
def __contains__(self, val):
return val in self._buf
def __iter__(self):
- return self.stream
+ return iter(self.stream)
def __str__(self):
return self._buf
def blankline_separated_blocks(text):
+ "a bunch of === characters is also considered a blank line"
block = []
for line in text.splitlines(True):
block.append(line)
- if not line.strip():
+ line = line.strip()
+ if not line or line.startswith('===') and not line.strip('='):
yield "".join(block)
block = []
if block:
@@ -240,13 +311,15 @@ def remove_stack_traces(out):
| innermost\ last
) \) :
)
- \s* $ # toss trailing whitespace on the header.
- (?P<stack> .*?) # don't blink: absorb stuff until...
- ^ (?P<msg> \w+ .*) # a line *starts* with alphanum.
+ \s* $ # toss trailing whitespace on the header.
+ (?P<stack> .*?) # don't blink: absorb stuff until...
+ ^(?=\w) # a line *starts* with alphanum.
+ .*?(?P<exception> \w+ ) # exception name
+ (?P<msg> [:\n] .*) # the rest
""", re.VERBOSE | re.MULTILINE | re.DOTALL)
blocks = []
for block in blankline_separated_blocks(out):
- blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<msg>", block))
+ blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<exception>\g<msg>", block))
return "".join(blocks)
@@ -296,7 +369,7 @@ def run(*arg, **kw):
from nose.config import Config
from nose.plugins.manager import PluginManager
- buffer = StringIO()
+ buffer = Buffer()
if 'config' not in kw:
plugins = kw.pop('plugins', [])
if isinstance(plugins, list):
diff --git a/nose/pyversion.py b/nose/pyversion.py
index 105f910..36ddcfd 100644
--- a/nose/pyversion.py
+++ b/nose/pyversion.py
@@ -70,8 +70,8 @@ else:
# definition so that things can do stuff based on its associated class)
class UnboundMethod:
def __init__(self, cls, func):
- # Make sure we have all the same attributes as the original function,
- # so that the AttributeSelector plugin will work correctly...
+ # Make sure we have all the same attributes as the original function,
+ # so that the AttributeSelector plugin will work correctly...
self.__dict__ = func.__dict__.copy()
self._func = func
self.__self__ = UnboundSelf(cls)
@@ -122,6 +122,8 @@ def ismethod(obj):
# Make a pseudo-bytes function that can be called without the encoding arg:
if sys.version_info >= (3, 0):
def bytes_(s, encoding='utf8'):
+ if isinstance(s, bytes):
+ return s
return bytes(s, encoding)
else:
def bytes_(s, encoding=None):