diff options
-rw-r--r-- | .hgignore | 1 | ||||
-rw-r--r-- | AUTHORS | 1 | ||||
-rw-r--r-- | CHANGELOG | 2 | ||||
-rw-r--r-- | functional_tests/doc_tests/test_issue142/errorclass_failure.rst.py3.patch | 38 | ||||
-rw-r--r-- | functional_tests/doc_tests/test_multiprocess/multiprocess.rst | 2 | ||||
-rw-r--r-- | functional_tests/test_multiprocessing/support/nameerror.py | 4 | ||||
-rw-r--r-- | functional_tests/test_multiprocessing/support/timeout.py | 6 | ||||
-rw-r--r-- | functional_tests/test_multiprocessing/test_keyboardinterrupt.rst | 32 | ||||
-rw-r--r-- | functional_tests/test_multiprocessing/test_keyboardinterrupt_fixtures.py | 8 | ||||
-rw-r--r-- | functional_tests/test_multiprocessing/test_nameerror.rst | 21 | ||||
-rw-r--r-- | functional_tests/test_multiprocessing/test_nameerror_fixtures.py | 1 | ||||
-rw-r--r-- | nose/plugins/multiprocess.py | 140 | ||||
-rw-r--r-- | nose/plugins/plugintest.py | 91 | ||||
-rw-r--r-- | nose/pyversion.py | 6 | ||||
-rw-r--r-- | nosetests.1 | 2 | ||||
-rw-r--r-- | unit_tests/test_multiprocess.py | 6 |
16 files changed, 241 insertions, 120 deletions
@@ -20,3 +20,4 @@ dist build .noseids .tox +.*.sw[nop] @@ -16,3 +16,4 @@ Timothee Peignier Thomas Kluyver Heng Liu Rosen Diankov +Buck Golemon @@ -2,6 +2,8 @@ - Revised multiprocessing implementation so that it works for test generators (#399). Thanks to Rosen Diankov for the patch. +- More fixes to multiprocessing implemented by Buck Golemon (also part of + #399). - Code coverage plugin now uses native HTML generation when coverage 3 is installed (#264). Thanks to Timothee Peignier for the patch. - Xunit plugin now shows test run time in fractions of a second (#317) diff --git a/functional_tests/doc_tests/test_issue142/errorclass_failure.rst.py3.patch b/functional_tests/doc_tests/test_issue142/errorclass_failure.rst.py3.patch deleted file mode 100644 index f598a64..0000000 --- a/functional_tests/doc_tests/test_issue142/errorclass_failure.rst.py3.patch +++ /dev/null @@ -1,38 +0,0 @@ ---- errorclass_failure.rst.orig 2010-08-31 10:39:35.000000000 -0700 -+++ errorclass_failure.rst 2010-08-31 10:40:26.000000000 -0700 -@@ -30,7 +30,7 @@ - ---------------------------------------------------------------------- - Traceback (most recent call last): - ... -- Todo: fix me -+ errorclass_failure_plugin.Todo: fix me - <BLANKLINE> - ---------------------------------------------------------------------- - Ran 2 tests in ...s -@@ -49,7 +49,7 @@ - ---------------------------------------------------------------------- - Traceback (most recent call last): - ... -- Todo: fix me -+ errorclass_failure_plugin.Todo: fix me - <BLANKLINE> - ---------------------------------------------------------------------- - Ran 1 test in ...s -@@ -95,7 +95,7 @@ - ---------------------------------------------------------------------- - Traceback (most recent call last): - ... -- Todo: fix me -+ errorclass_failure_plugin.Todo: fix me - <BLANKLINE> - ---------------------------------------------------------------------- - Ran 6 tests in ...s -@@ -114,7 +114,7 @@ - ---------------------------------------------------------------------- - Traceback (most recent call last): - ... -- Todo: fix me -+ errorclass_failure_plugin.Todo: fix me - <BLANKLINE> - ---------------------------------------------------------------------- - Ran 6 tests in ...s diff --git a/functional_tests/doc_tests/test_multiprocess/multiprocess.rst b/functional_tests/doc_tests/test_multiprocess/multiprocess.rst index 8de2802..9bade23 100644 --- a/functional_tests/doc_tests/test_multiprocess/multiprocess.rst +++ b/functional_tests/doc_tests/test_multiprocess/multiprocess.rst @@ -221,6 +221,8 @@ Then we can run again and see the failures. >>> run(argv=['nosetests', '-v', '--processes=2', test_can_split], ... plugins=[MultiProcess()]) #doctest: +ELLIPSIS + setup called + teardown called test_can_split.... ... FAILED (failures=...) diff --git a/functional_tests/test_multiprocessing/support/nameerror.py b/functional_tests/test_multiprocessing/support/nameerror.py new file mode 100644 index 0000000..20e7bd7 --- /dev/null +++ b/functional_tests/test_multiprocessing/support/nameerror.py @@ -0,0 +1,4 @@ +# we purposefully raise a NameError at the top level here + +undefined_variable + diff --git a/functional_tests/test_multiprocessing/support/timeout.py b/functional_tests/test_multiprocessing/support/timeout.py new file mode 100644 index 0000000..52dce12 --- /dev/null +++ b/functional_tests/test_multiprocessing/support/timeout.py @@ -0,0 +1,6 @@ + +def test_timeout(): + "this test *should* fail when process-timeout=1" + from time import sleep + sleep(2) + diff --git a/functional_tests/test_multiprocessing/test_keyboardinterrupt.rst b/functional_tests/test_multiprocessing/test_keyboardinterrupt.rst new file mode 100644 index 0000000..6d108dd --- /dev/null +++ b/functional_tests/test_multiprocessing/test_keyboardinterrupt.rst @@ -0,0 +1,32 @@ + >>> from os.path import join, dirname + >>> from nose.plugins.plugintest import run_buffered + >>> from nose.plugins.multiprocess import MultiProcess + + >>> run_buffered( argv=[ + ... 'nosetests', '--processes=2', '--process-timeout=1', + ... join(dirname(__file__), 'support', 'timeout.py') + ... ], plugins=[MultiProcess()]) + E + ====================================================================== + ERROR: this test *should* fail when process-timeout=1 + ---------------------------------------------------------------------- + Traceback (most recent call last): + ... + TimedOutException: 'timeout.test_timeout' + <BLANKLINE> + ---------------------------------------------------------------------- + Ran 1 test in ...s + <BLANKLINE> + FAILED (errors=1) + + >>> run_buffered( argv=[ + ... 'nosetests', '--processes=2', '--process-timeout=3', + ... join(dirname(__file__), 'support', 'timeout.py') + ... ], plugins=[MultiProcess()]) + . + ---------------------------------------------------------------------- + Ran 1 test in ...s + <BLANKLINE> + OK + + diff --git a/functional_tests/test_multiprocessing/test_keyboardinterrupt_fixtures.py b/functional_tests/test_multiprocessing/test_keyboardinterrupt_fixtures.py new file mode 100644 index 0000000..539de6d --- /dev/null +++ b/functional_tests/test_multiprocessing/test_keyboardinterrupt_fixtures.py @@ -0,0 +1,8 @@ + +def setup_module(): + try: + import multiprocessing + except ImportError: + from nose.plugins.skip import SkipTest + raise SkipTest("multiprocessing module not available") + diff --git a/functional_tests/test_multiprocessing/test_nameerror.rst b/functional_tests/test_multiprocessing/test_nameerror.rst new file mode 100644 index 0000000..076c58f --- /dev/null +++ b/functional_tests/test_multiprocessing/test_nameerror.rst @@ -0,0 +1,21 @@ + >>> from os.path import join, dirname + >>> from nose.plugins.plugintest import run_buffered + >>> from nose.plugins.multiprocess import MultiProcess + + >>> def run(*cmd): run_buffered(argv=list(cmd), plugins=[MultiProcess()]) + >>> def path(fname): return join(dirname(__file__), 'support', fname) + + >>> run( 'nosetests', '--processes=2', path('nameerror.py') ) + E + ====================================================================== + ERROR: Failure: NameError (name 'undefined_variable' is not defined) + ---------------------------------------------------------------------- + Traceback (most recent call last): + ... + NameError: name 'undefined_variable' is not defined + <BLANKLINE> + ---------------------------------------------------------------------- + Ran 1 test in ...s + <BLANKLINE> + FAILED (errors=1) + diff --git a/functional_tests/test_multiprocessing/test_nameerror_fixtures.py b/functional_tests/test_multiprocessing/test_nameerror_fixtures.py new file mode 100644 index 0000000..4e70060 --- /dev/null +++ b/functional_tests/test_multiprocessing/test_nameerror_fixtures.py @@ -0,0 +1 @@ +from test_keyboardinterrupt_fixtures import setup_module diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py index ec54978..149b695 100644 --- a/nose/plugins/multiprocess.py +++ b/nose/plugins/multiprocess.py @@ -139,11 +139,11 @@ class TimedOutException(Exception): def _import_mp(): global Process, Queue, Pool, Event, Value, Array try: - from multiprocessing import (Process as Process_, Queue as Queue_, - Pool as Pool_, Event as Event_, - Value as Value_, Array as Array_) - Process, Queue, Pool, Event, Value, Array = (Process_, Queue_, Pool_, - Event_, Value_, Array_) + from multiprocessing import Manager, Process + m = Manager() + Queue, Pool, Event, Value, Array = ( + m.Queue, m.Pool, m.Event, m.Value, m.Array + ) except ImportError: warn("multiprocessing module is not available, multiprocess plugin " "cannot be used", RuntimeWarning) @@ -293,7 +293,11 @@ class MultiProcessTestRunner(TextTestRunner): case(result) # run here to capture the failure continue # handle shared fixtures - if isinstance(case, ContextSuite) and self.sharedFixtures(case): + if isinstance(case, ContextSuite) and case.context is failure.Failure: + log.debug("Case is a Failure") + case(result) # run here to capture the failure + continue + elif isinstance(case, ContextSuite) and self.sharedFixtures(case): log.debug("%s has shared fixtures", case) try: case.setUp() @@ -316,9 +320,8 @@ class MultiProcessTestRunner(TextTestRunner): log.debug("Starting %s workers", self.config.multiprocess_workers) for i in range(self.config.multiprocess_workers): - currentaddr = Array('c',1000) - currentaddr.value = bytes_('') - currentstart = Value('d') + currentaddr = Value('c',bytes_('')) + currentstart = Value('d',0.0) keyboardCaught = Event() p = Process(target=runner, args=(i, testQueue, resultQueue, currentaddr, currentstart, @@ -375,10 +378,8 @@ class MultiProcessTestRunner(TextTestRunner): workers[iworker].join(timeout=1) if not shouldStop.is_set() and not testQueue.empty(): log.debug('starting new process on worker %s',iworker) - currentaddr = Array('c',1000) - currentaddr.value = bytes_('') - currentstart = Value('d') - currentstart.value = time.time() + currentaddr = Value('c',bytes_('')) + currentstart = Value('d',time.time()) keyboardCaught = Event() workers[iworker] = Process(target=runner, args=(iworker, testQueue, @@ -401,10 +402,10 @@ class MultiProcessTestRunner(TextTestRunner): any_alive = False for iworker, w in enumerate(workers): if w.is_alive(): - worker_addr = str(w.currentaddr.value,'ascii') - timeprocessing = time.time()-w.currentstart.value - if (len(worker_addr) == 0 - and timeprocessing > self.config.multiprocess_timeout-0.1): + worker_addr = bytes_(w.currentaddr.value,'ascii') + timeprocessing = time.time() - w.currentstart.value + if ( len(worker_addr) == 0 + and timeprocessing > self.config.multiprocess_timeout-0.1): log.debug('worker %d has finished its work item, ' 'but is not exiting? do we wait for it?', iworker) @@ -427,10 +428,8 @@ class MultiProcessTestRunner(TextTestRunner): # have to terminate... log.error("terminating worker %s",iworker) w.terminate() - currentaddr = Array('c',1000) - currentaddr.value = bytes_('') - currentstart = Value('d') - currentstart.value = time.time() + currentaddr = Value('c',bytes_('')) + currentstart = Value('d',time.time()) keyboardCaught = Event() workers[iworker] = Process(target=runner, args=(iworker, testQueue, resultQueue, @@ -620,6 +619,18 @@ class MultiProcessTestRunner(TextTestRunner): def runner(ix, testQueue, resultQueue, currentaddr, currentstart, keyboardCaught, shouldStop, loaderClass, resultClass, config): + try: + try: + return __runner(ix, testQueue, resultQueue, currentaddr, currentstart, + keyboardCaught, shouldStop, loaderClass, resultClass, config) + except KeyboardInterrupt: + log.debug('Worker %s keyboard interrupt, stopping',ix) + except Empty: + log.debug("Worker %s timed out waiting for tasks", ix) + +def __runner(ix, testQueue, resultQueue, currentaddr, currentstart, + keyboardCaught, shouldStop, loaderClass, resultClass, config): + config = pickle.loads(config) dummy_parser = config.parserClass() if _instantiate_plugins is not None: @@ -659,54 +670,47 @@ def runner(ix, testQueue, resultQueue, currentaddr, currentstart, failures, errors, errorClasses) - try: + for test_addr, arg in iter(get, 'STOP'): + if shouldStop.is_set(): + log.exception('Worker %d STOPPED',ix) + break + result = makeResult() + test = loader.loadTestsFromNames([test_addr]) + test.testQueue = testQueue + test.tasks = [] + test.arg = arg + log.debug("Worker %s Test is %s (%s)", ix, test_addr, test) try: - for test_addr, arg in iter(get, 'STOP'): - if shouldStop.is_set(): - log.exception('Worker %d STOPPED',ix) - break - result = makeResult() - test = loader.loadTestsFromNames([test_addr]) - test.testQueue = testQueue - test.tasks = [] - test.arg = arg - log.debug("Worker %s Test is %s (%s)", ix, test_addr, test) - try: - if arg is not None: - test_addr = test_addr + str(arg) - currentaddr.value = bytes_(test_addr) - currentstart.value = time.time() - test(result) - currentaddr.value = bytes_('') - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - except KeyboardInterrupt: - keyboardCaught.set() - if len(currentaddr.value) > 0: - log.exception('Worker %s keyboard interrupt, failing ' - 'current test %s',ix,test_addr) - currentaddr.value = bytes_('') - failure.Failure(*sys.exc_info())(result) - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - else: - log.debug('Worker %s test %s timed out',ix,test_addr) - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - except SystemExit: - currentaddr.value = bytes_('') - log.exception('Worker %s system exit',ix) - raise - except: - currentaddr.value = bytes_('') - log.exception("Worker %s error running test or returning " - "results",ix) - failure.Failure(*sys.exc_info())(result) - resultQueue.put((ix, test_addr, test.tasks, batch(result))) - if config.multiprocess_restartworker: - break - except Empty: - log.debug("Worker %s timed out waiting for tasks", ix) - finally: - testQueue.close() - resultQueue.close() + if arg is not None: + test_addr = test_addr + str(arg) + currentaddr.value = bytes_(test_addr) + currentstart.value = time.time() + test(result) + currentaddr.value = bytes_('') + resultQueue.put((ix, test_addr, test.tasks, batch(result))) + except KeyboardInterrupt: + keyboardCaught.set() + if len(currentaddr.value) > 0: + log.exception('Worker %s keyboard interrupt, failing ' + 'current test %s',ix,test_addr) + currentaddr.value = bytes_('') + failure.Failure(*sys.exc_info())(result) + resultQueue.put((ix, test_addr, test.tasks, batch(result))) + else: + log.debug('Worker %s test %s timed out',ix,test_addr) + resultQueue.put((ix, test_addr, test.tasks, batch(result))) + except SystemExit: + currentaddr.value = bytes_('') + log.exception('Worker %s system exit',ix) + raise + except: + currentaddr.value = bytes_('') + log.exception("Worker %s error running test or returning " + "results",ix) + failure.Failure(*sys.exc_info())(result) + resultQueue.put((ix, test_addr, test.tasks, batch(result))) + if config.multiprocess_restartworker: + break log.debug("Worker %s ending", ix) diff --git a/nose/plugins/plugintest.py b/nose/plugins/plugintest.py index 49f1884..68e8941 100644 --- a/nose/plugins/plugintest.py +++ b/nose/plugins/plugintest.py @@ -103,9 +103,78 @@ try: from cStringIO import StringIO except ImportError: from StringIO import StringIO - + __all__ = ['PluginTester', 'run'] +from os import getpid +class MultiProcessFile(object): + """ + helper for testing multiprocessing + + multiprocessing poses a problem for doctests, since the strategy + of replacing sys.stdout/stderr with file-like objects then + inspecting the results won't work: the child processes will + write to the objects, but the data will not be reflected + in the parent doctest-ing process. + + The solution is to create file-like objects which will interact with + multiprocessing in a more desirable way. + + All processes can write to this object, but only the creator can read. + This allows the testing system to see a unified picture of I/O. + """ + def __init__(self): + # per advice at: + # http://docs.python.org/library/multiprocessing.html#all-platforms + self.__master = getpid() + self.__queue = Manager().Queue() + self.__buffer = StringIO() + self.softspace = 0 + + def buffer(self): + if getpid() != self.__master: + return + + from Queue import Empty + from collections import defaultdict + cache = defaultdict(str) + while True: + try: + pid, data = self.__queue.get_nowait() + except Empty: + break + if pid == (): + #show parent output after children + #this is what users see, usually + pid = ( 1e100, ) # googol! + cache[pid] += data + for pid in sorted(cache): + #self.__buffer.write( '%s wrote: %r\n' % (pid, cache[pid]) ) #DEBUG + self.__buffer.write( cache[pid] ) + def write(self, data): + # note that these pids are in the form of current_process()._identity + # rather than OS pids + from multiprocessing import current_process + pid = current_process()._identity + self.__queue.put((pid, data)) + def __iter__(self): + "getattr doesn't work for iter()" + self.buffer() + return self.__buffer + def seek(self, offset, whence=0): + self.buffer() + return self.__buffer.seek(offset, whence) + def getvalue(self): + self.buffer() + return self.__buffer.getvalue() + def __getattr__(self, attr): + return getattr(self.__buffer, attr) + +try: + from multiprocessing import Manager + Buffer = MultiProcessFile +except ImportError: + Buffer = StringIO class PluginTester(object): """A mixin for testing nose plugins in their runtime environment. @@ -177,7 +246,7 @@ class PluginTester(object): from nose.plugins.manager import PluginManager suite = None - stream = StringIO() + stream = Buffer() conf = Config(env=self.env, stream=stream, plugins=PluginManager(plugins=self.plugins)) @@ -214,16 +283,18 @@ class AccessDecorator(object): def __contains__(self, val): return val in self._buf def __iter__(self): - return self.stream + return iter(self.stream) def __str__(self): return self._buf def blankline_separated_blocks(text): + "a bunch of === characters is also considered a blank line" block = [] for line in text.splitlines(True): block.append(line) - if not line.strip(): + line = line.strip() + if not line or line.startswith('===') and not line.strip('='): yield "".join(block) block = [] if block: @@ -240,13 +311,15 @@ def remove_stack_traces(out): | innermost\ last ) \) : ) - \s* $ # toss trailing whitespace on the header. - (?P<stack> .*?) # don't blink: absorb stuff until... - ^ (?P<msg> \w+ .*) # a line *starts* with alphanum. + \s* $ # toss trailing whitespace on the header. + (?P<stack> .*?) # don't blink: absorb stuff until... + ^(?=\w) # a line *starts* with alphanum. + .*?(?P<exception> \w+ ) # exception name + (?P<msg> [:\n] .*) # the rest """, re.VERBOSE | re.MULTILINE | re.DOTALL) blocks = [] for block in blankline_separated_blocks(out): - blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<msg>", block)) + blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<exception>\g<msg>", block)) return "".join(blocks) @@ -296,7 +369,7 @@ def run(*arg, **kw): from nose.config import Config from nose.plugins.manager import PluginManager - buffer = StringIO() + buffer = Buffer() if 'config' not in kw: plugins = kw.pop('plugins', []) if isinstance(plugins, list): diff --git a/nose/pyversion.py b/nose/pyversion.py index 105f910..36ddcfd 100644 --- a/nose/pyversion.py +++ b/nose/pyversion.py @@ -70,8 +70,8 @@ else: # definition so that things can do stuff based on its associated class) class UnboundMethod: def __init__(self, cls, func): - # Make sure we have all the same attributes as the original function, - # so that the AttributeSelector plugin will work correctly... + # Make sure we have all the same attributes as the original function, + # so that the AttributeSelector plugin will work correctly... self.__dict__ = func.__dict__.copy() self._func = func self.__self__ = UnboundSelf(cls) @@ -122,6 +122,8 @@ def ismethod(obj): # Make a pseudo-bytes function that can be called without the encoding arg: if sys.version_info >= (3, 0): def bytes_(s, encoding='utf8'): + if isinstance(s, bytes): + return s return bytes(s, encoding) else: def bytes_(s, encoding=None): diff --git a/nosetests.1 b/nosetests.1 index 504ebbf..cf0473f 100644 --- a/nosetests.1 +++ b/nosetests.1 @@ -471,5 +471,5 @@ jpellerin+nose@gmail.com .SH COPYRIGHT LGPL -.\" Generated by docutils manpage writer on 2011-04-18 10:57. +.\" Generated by docutils manpage writer on 2011-05-02 14:25. .\" diff --git a/unit_tests/test_multiprocess.py b/unit_tests/test_multiprocess.py index c262b7d..aeb65e4 100644 --- a/unit_tests/test_multiprocess.py +++ b/unit_tests/test_multiprocess.py @@ -46,11 +46,13 @@ class T(unittest.TestCase): pass def test_mp_process_args_pickleable(): - raise SkipTest('this currently gets stuck in poll() 90% of the time') + # TODO(Kumar) this test needs to be more succint. + # If you start seeing it timeout then perhaps we need to skip it again. + # raise SkipTest('this currently gets stuck in poll() 90% of the time') test = case.Test(T('runTest')) config = Config() config.multiprocess_workers = 2 - config.multiprocess_timeout = 0.5 + config.multiprocess_timeout = 5 runner = multiprocess.MultiProcessTestRunner( stream=_WritelnDecorator(sys.stdout), verbosity=10, |