diff options
author | kumar <kumar.mcmillan@gmail.com> | 2011-05-02 15:22:23 -0500 |
---|---|---|
committer | kumar <kumar.mcmillan@gmail.com> | 2011-05-02 15:22:23 -0500 |
commit | 3a5e71e4e0914cdce1acac9d669313fac05c7a1c (patch) | |
tree | ab7d1cf1262042816b9fdad51a419ffdf86bf38b /nose | |
parent | 0e2ef2befd4e35bf8488c066893d91ff15dd2ac0 (diff) | |
parent | 9c7fda3c3c0a4657aad706ab9ab297a102b10122 (diff) | |
download | nose-3a5e71e4e0914cdce1acac9d669313fac05c7a1c.tar.gz |
Merged with attr plugin fixes
Diffstat (limited to 'nose')
-rw-r--r-- | nose/plugins/multiprocess.py | 140 | ||||
-rw-r--r-- | nose/plugins/plugintest.py | 91 | ||||
-rw-r--r-- | nose/pyversion.py | 6 |
3 files changed, 158 insertions, 79 deletions
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py index ec54978..149b695 100644 --- a/nose/plugins/multiprocess.py +++ b/nose/plugins/multiprocess.py @@ -139,11 +139,11 @@ class TimedOutException(Exception): def _import_mp(): global Process, Queue, Pool, Event, Value, Array try: - from multiprocessing import (Process as Process_, Queue as Queue_, - Pool as Pool_, Event as Event_, - Value as Value_, Array as Array_) - Process, Queue, Pool, Event, Value, Array = (Process_, Queue_, Pool_, - Event_, Value_, Array_) + from multiprocessing import Manager, Process + m = Manager() + Queue, Pool, Event, Value, Array = ( + m.Queue, m.Pool, m.Event, m.Value, m.Array + ) except ImportError: warn("multiprocessing module is not available, multiprocess plugin " "cannot be used", RuntimeWarning) @@ -293,7 +293,11 @@ class MultiProcessTestRunner(TextTestRunner): case(result) # run here to capture the failure continue # handle shared fixtures - if isinstance(case, ContextSuite) and self.sharedFixtures(case): + if isinstance(case, ContextSuite) and case.context is failure.Failure: + log.debug("Case is a Failure") + case(result) # run here to capture the failure + continue + elif isinstance(case, ContextSuite) and self.sharedFixtures(case): log.debug("%s has shared fixtures", case) try: case.setUp() @@ -316,9 +320,8 @@ class MultiProcessTestRunner(TextTestRunner): log.debug("Starting %s workers", self.config.multiprocess_workers) for i in range(self.config.multiprocess_workers): - currentaddr = Array('c',1000) - currentaddr.value = bytes_('') - currentstart = Value('d') + currentaddr = Value('c',bytes_('')) + currentstart = Value('d',0.0) keyboardCaught = Event() p = Process(target=runner, args=(i, testQueue, resultQueue, currentaddr, currentstart, @@ -375,10 +378,8 @@ class MultiProcessTestRunner(TextTestRunner): workers[iworker].join(timeout=1) if not shouldStop.is_set() and not testQueue.empty(): log.debug('starting new process on worker %s',iworker) - currentaddr = Array('c',1000) - currentaddr.value = bytes_('') - currentstart = Value('d') - currentstart.value = time.time() + currentaddr = Value('c',bytes_('')) + currentstart = Value('d',time.time()) keyboardCaught = Event() workers[iworker] = Process(target=runner, args=(iworker, testQueue, @@ -401,10 +402,10 @@ class MultiProcessTestRunner(TextTestRunner): any_alive = False for iworker, w in enumerate(workers): if w.is_alive(): - worker_addr = str(w.currentaddr.value,'ascii') - timeprocessing = time.time()-w.currentstart.value - if (len(worker_addr) == 0 - and timeprocessing > self.config.multiprocess_timeout-0.1): + worker_addr = bytes_(w.currentaddr.value,'ascii') + timeprocessing = time.time() - w.currentstart.value + if ( len(worker_addr) == 0 + and timeprocessing > self.config.multiprocess_timeout-0.1): log.debug('worker %d has finished its work item, ' 'but is not exiting? do we wait for it?', iworker) @@ -427,10 +428,8 @@ class MultiProcessTestRunner(TextTestRunner): # have to terminate... log.error("terminating worker %s",iworker) w.terminate() - currentaddr = Array('c',1000) - currentaddr.value = bytes_('') - currentstart = Value('d') - currentstart.value = time.time() + currentaddr = Value('c',bytes_('')) + currentstart = Value('d',time.time()) keyboardCaught = Event() workers[iworker] = Process(target=runner, args=(iworker, testQueue, resultQueue, @@ -620,6 +619,18 @@ class MultiProcessTestRunner(TextTestRunner): def runner(ix, testQueue, resultQueue, currentaddr, currentstart, keyboardCaught, shouldStop, loaderClass, resultClass, config): + try: + try: + return __runner(ix, testQueue, resultQueue, currentaddr, currentstart, + keyboardCaught, shouldStop, loaderClass, resultClass, config) + except KeyboardInterrupt: + log.debug('Worker %s keyboard interrupt, stopping',ix) + except Empty: + log.debug("Worker %s timed out waiting for tasks", ix) + +def __runner(ix, testQueue, resultQueue, currentaddr, currentstart, + keyboardCaught, shouldStop, loaderClass, resultClass, config): + config = pickle.loads(config) dummy_parser = config.parserClass() if _instantiate_plugins is not None: @@ -659,54 +670,47 @@ def runner(ix, testQueue, resultQueue, currentaddr, currentstart, failures, errors, errorClasses) - try: + for test_addr, arg in iter(get, 'STOP'): + if shouldStop.is_set(): + log.exception('Worker %d STOPPED',ix) + break + result = makeResult() + test = loader.loadTestsFromNames([test_addr]) + test.testQueue = testQueue + test.tasks = [] + test.arg = arg + log.debug("Worker %s Test is %s (%s)", ix, test_addr, test) try: - for test_addr, arg in iter(get, 'STOP'): - if shouldStop.is_set(): - log.exception('Worker %d STOPPED',ix) - break - result = makeResult() - test = loader.loadTestsFromNames([test_addr]) - test.testQueue = testQueue - test.tasks = [] - test.arg = arg - log.debug("Worker %s Test is %s (%s)", ix, test_addr, test) - try: - if arg is not None: - test_addr = test_addr + str(arg) - currentaddr.value = bytes_(test_addr) - currentstart.value = time.time() - test(result) - currentaddr.value = bytes_('') - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - except KeyboardInterrupt: - keyboardCaught.set() - if len(currentaddr.value) > 0: - log.exception('Worker %s keyboard interrupt, failing ' - 'current test %s',ix,test_addr) - currentaddr.value = bytes_('') - failure.Failure(*sys.exc_info())(result) - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - else: - log.debug('Worker %s test %s timed out',ix,test_addr) - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - except SystemExit: - currentaddr.value = bytes_('') - log.exception('Worker %s system exit',ix) - raise - except: - currentaddr.value = bytes_('') - log.exception("Worker %s error running test or returning " - "results",ix) - failure.Failure(*sys.exc_info())(result) - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - if config.multiprocess_restartworker: - break - except Empty: - log.debug("Worker %s timed out waiting for tasks", ix) - finally: - testQueue.close() - resultQueue.close() + if arg is not None: + test_addr = test_addr + str(arg) + currentaddr.value = bytes_(test_addr) + currentstart.value = time.time() + test(result) + currentaddr.value = bytes_('') + resultQueue.put((ix, test_addr, test.tasks, batch(result))) + except KeyboardInterrupt: + keyboardCaught.set() + if len(currentaddr.value) > 0: + log.exception('Worker %s keyboard interrupt, failing ' + 'current test %s',ix,test_addr) + currentaddr.value = bytes_('') + failure.Failure(*sys.exc_info())(result) + resultQueue.put((ix, test_addr, test.tasks, batch(result))) + else: + log.debug('Worker %s test %s timed out',ix,test_addr) + resultQueue.put((ix, test_addr, test.tasks, batch(result))) + except SystemExit: + currentaddr.value = bytes_('') + log.exception('Worker %s system exit',ix) + raise + except: + currentaddr.value = bytes_('') + log.exception("Worker %s error running test or returning " + "results",ix) + failure.Failure(*sys.exc_info())(result) + resultQueue.put((ix, test_addr, test.tasks, batch(result))) + if config.multiprocess_restartworker: + break log.debug("Worker %s ending", ix) diff --git a/nose/plugins/plugintest.py b/nose/plugins/plugintest.py index 49f1884..68e8941 100644 --- a/nose/plugins/plugintest.py +++ b/nose/plugins/plugintest.py @@ -103,9 +103,78 @@ try: from cStringIO import StringIO except ImportError: from StringIO import StringIO - + __all__ = ['PluginTester', 'run'] +from os import getpid +class MultiProcessFile(object): + """ + helper for testing multiprocessing + + multiprocessing poses a problem for doctests, since the strategy + of replacing sys.stdout/stderr with file-like objects then + inspecting the results won't work: the child processes will + write to the objects, but the data will not be reflected + in the parent doctest-ing process. + + The solution is to create file-like objects which will interact with + multiprocessing in a more desirable way. + + All processes can write to this object, but only the creator can read. + This allows the testing system to see a unified picture of I/O. + """ + def __init__(self): + # per advice at: + # http://docs.python.org/library/multiprocessing.html#all-platforms + self.__master = getpid() + self.__queue = Manager().Queue() + self.__buffer = StringIO() + self.softspace = 0 + + def buffer(self): + if getpid() != self.__master: + return + + from Queue import Empty + from collections import defaultdict + cache = defaultdict(str) + while True: + try: + pid, data = self.__queue.get_nowait() + except Empty: + break + if pid == (): + #show parent output after children + #this is what users see, usually + pid = ( 1e100, ) # googol! + cache[pid] += data + for pid in sorted(cache): + #self.__buffer.write( '%s wrote: %r\n' % (pid, cache[pid]) ) #DEBUG + self.__buffer.write( cache[pid] ) + def write(self, data): + # note that these pids are in the form of current_process()._identity + # rather than OS pids + from multiprocessing import current_process + pid = current_process()._identity + self.__queue.put((pid, data)) + def __iter__(self): + "getattr doesn't work for iter()" + self.buffer() + return self.__buffer + def seek(self, offset, whence=0): + self.buffer() + return self.__buffer.seek(offset, whence) + def getvalue(self): + self.buffer() + return self.__buffer.getvalue() + def __getattr__(self, attr): + return getattr(self.__buffer, attr) + +try: + from multiprocessing import Manager + Buffer = MultiProcessFile +except ImportError: + Buffer = StringIO class PluginTester(object): """A mixin for testing nose plugins in their runtime environment. @@ -177,7 +246,7 @@ class PluginTester(object): from nose.plugins.manager import PluginManager suite = None - stream = StringIO() + stream = Buffer() conf = Config(env=self.env, stream=stream, plugins=PluginManager(plugins=self.plugins)) @@ -214,16 +283,18 @@ class AccessDecorator(object): def __contains__(self, val): return val in self._buf def __iter__(self): - return self.stream + return iter(self.stream) def __str__(self): return self._buf def blankline_separated_blocks(text): + "a bunch of === characters is also considered a blank line" block = [] for line in text.splitlines(True): block.append(line) - if not line.strip(): + line = line.strip() + if not line or line.startswith('===') and not line.strip('='): yield "".join(block) block = [] if block: @@ -240,13 +311,15 @@ def remove_stack_traces(out): | innermost\ last ) \) : ) - \s* $ # toss trailing whitespace on the header. - (?P<stack> .*?) # don't blink: absorb stuff until... - ^ (?P<msg> \w+ .*) # a line *starts* with alphanum. + \s* $ # toss trailing whitespace on the header. + (?P<stack> .*?) # don't blink: absorb stuff until... + ^(?=\w) # a line *starts* with alphanum. + .*?(?P<exception> \w+ ) # exception name + (?P<msg> [:\n] .*) # the rest """, re.VERBOSE | re.MULTILINE | re.DOTALL) blocks = [] for block in blankline_separated_blocks(out): - blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<msg>", block)) + blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<exception>\g<msg>", block)) return "".join(blocks) @@ -296,7 +369,7 @@ def run(*arg, **kw): from nose.config import Config from nose.plugins.manager import PluginManager - buffer = StringIO() + buffer = Buffer() if 'config' not in kw: plugins = kw.pop('plugins', []) if isinstance(plugins, list): diff --git a/nose/pyversion.py b/nose/pyversion.py index 105f910..36ddcfd 100644 --- a/nose/pyversion.py +++ b/nose/pyversion.py @@ -70,8 +70,8 @@ else: # definition so that things can do stuff based on its associated class) class UnboundMethod: def __init__(self, cls, func): - # Make sure we have all the same attributes as the original function, - # so that the AttributeSelector plugin will work correctly... + # Make sure we have all the same attributes as the original function, + # so that the AttributeSelector plugin will work correctly... self.__dict__ = func.__dict__.copy() self._func = func self.__self__ = UnboundSelf(cls) @@ -122,6 +122,8 @@ def ismethod(obj): # Make a pseudo-bytes function that can be called without the encoding arg: if sys.version_info >= (3, 0): def bytes_(s, encoding='utf8'): + if isinstance(s, bytes): + return s return bytes(s, encoding) else: def bytes_(s, encoding=None): |