summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.hgignore1
-rw-r--r--AUTHORS1
-rw-r--r--CHANGELOG2
-rw-r--r--functional_tests/doc_tests/test_issue142/errorclass_failure.rst.py3.patch38
-rw-r--r--functional_tests/doc_tests/test_multiprocess/multiprocess.rst2
-rw-r--r--functional_tests/test_multiprocessing/support/nameerror.py4
-rw-r--r--functional_tests/test_multiprocessing/support/timeout.py6
-rw-r--r--functional_tests/test_multiprocessing/test_keyboardinterrupt.rst32
-rw-r--r--functional_tests/test_multiprocessing/test_keyboardinterrupt_fixtures.py8
-rw-r--r--functional_tests/test_multiprocessing/test_nameerror.rst21
-rw-r--r--functional_tests/test_multiprocessing/test_nameerror_fixtures.py1
-rw-r--r--nose/plugins/multiprocess.py140
-rw-r--r--nose/plugins/plugintest.py91
-rw-r--r--nose/pyversion.py6
-rw-r--r--nosetests.12
-rw-r--r--unit_tests/test_multiprocess.py6
16 files changed, 241 insertions, 120 deletions
diff --git a/.hgignore b/.hgignore
index f9431ac..a069a0b 100644
--- a/.hgignore
+++ b/.hgignore
@@ -20,3 +20,4 @@ dist
build
.noseids
.tox
+.*.sw[nop]
diff --git a/AUTHORS b/AUTHORS
index 615b67a..a24c015 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -16,3 +16,4 @@ Timothee Peignier
Thomas Kluyver
Heng Liu
Rosen Diankov
+Buck Golemon
diff --git a/CHANGELOG b/CHANGELOG
index 37b341e..c0d0a9f 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@
- Revised multiprocessing implementation so that it works for test generators
(#399). Thanks to Rosen Diankov for the patch.
+- More fixes to multiprocessing implemented by Buck Golemon (also part of
+ #399).
- Code coverage plugin now uses native HTML generation when coverage 3 is
installed (#264). Thanks to Timothee Peignier for the patch.
- Xunit plugin now shows test run time in fractions of a second (#317)
diff --git a/functional_tests/doc_tests/test_issue142/errorclass_failure.rst.py3.patch b/functional_tests/doc_tests/test_issue142/errorclass_failure.rst.py3.patch
deleted file mode 100644
index f598a64..0000000
--- a/functional_tests/doc_tests/test_issue142/errorclass_failure.rst.py3.patch
+++ /dev/null
@@ -1,38 +0,0 @@
---- errorclass_failure.rst.orig 2010-08-31 10:39:35.000000000 -0700
-+++ errorclass_failure.rst 2010-08-31 10:40:26.000000000 -0700
-@@ -30,7 +30,7 @@
- ----------------------------------------------------------------------
- Traceback (most recent call last):
- ...
-- Todo: fix me
-+ errorclass_failure_plugin.Todo: fix me
- <BLANKLINE>
- ----------------------------------------------------------------------
- Ran 2 tests in ...s
-@@ -49,7 +49,7 @@
- ----------------------------------------------------------------------
- Traceback (most recent call last):
- ...
-- Todo: fix me
-+ errorclass_failure_plugin.Todo: fix me
- <BLANKLINE>
- ----------------------------------------------------------------------
- Ran 1 test in ...s
-@@ -95,7 +95,7 @@
- ----------------------------------------------------------------------
- Traceback (most recent call last):
- ...
-- Todo: fix me
-+ errorclass_failure_plugin.Todo: fix me
- <BLANKLINE>
- ----------------------------------------------------------------------
- Ran 6 tests in ...s
-@@ -114,7 +114,7 @@
- ----------------------------------------------------------------------
- Traceback (most recent call last):
- ...
-- Todo: fix me
-+ errorclass_failure_plugin.Todo: fix me
- <BLANKLINE>
- ----------------------------------------------------------------------
- Ran 6 tests in ...s
diff --git a/functional_tests/doc_tests/test_multiprocess/multiprocess.rst b/functional_tests/doc_tests/test_multiprocess/multiprocess.rst
index 8de2802..9bade23 100644
--- a/functional_tests/doc_tests/test_multiprocess/multiprocess.rst
+++ b/functional_tests/doc_tests/test_multiprocess/multiprocess.rst
@@ -221,6 +221,8 @@ Then we can run again and see the failures.
>>> run(argv=['nosetests', '-v', '--processes=2', test_can_split],
... plugins=[MultiProcess()]) #doctest: +ELLIPSIS
+ setup called
+ teardown called
test_can_split....
...
FAILED (failures=...)
diff --git a/functional_tests/test_multiprocessing/support/nameerror.py b/functional_tests/test_multiprocessing/support/nameerror.py
new file mode 100644
index 0000000..20e7bd7
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/nameerror.py
@@ -0,0 +1,4 @@
+# we purposefully raise a NameError at the top level here
+
+undefined_variable
+
diff --git a/functional_tests/test_multiprocessing/support/timeout.py b/functional_tests/test_multiprocessing/support/timeout.py
new file mode 100644
index 0000000..52dce12
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/timeout.py
@@ -0,0 +1,6 @@
+
+def test_timeout():
+ "this test *should* fail when process-timeout=1"
+ from time import sleep
+ sleep(2)
+
diff --git a/functional_tests/test_multiprocessing/test_keyboardinterrupt.rst b/functional_tests/test_multiprocessing/test_keyboardinterrupt.rst
new file mode 100644
index 0000000..6d108dd
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_keyboardinterrupt.rst
@@ -0,0 +1,32 @@
+ >>> from os.path import join, dirname
+ >>> from nose.plugins.plugintest import run_buffered
+ >>> from nose.plugins.multiprocess import MultiProcess
+
+ >>> run_buffered( argv=[
+ ... 'nosetests', '--processes=2', '--process-timeout=1',
+ ... join(dirname(__file__), 'support', 'timeout.py')
+ ... ], plugins=[MultiProcess()])
+ E
+ ======================================================================
+ ERROR: this test *should* fail when process-timeout=1
+ ----------------------------------------------------------------------
+ Traceback (most recent call last):
+ ...
+ TimedOutException: 'timeout.test_timeout'
+ <BLANKLINE>
+ ----------------------------------------------------------------------
+ Ran 1 test in ...s
+ <BLANKLINE>
+ FAILED (errors=1)
+
+ >>> run_buffered( argv=[
+ ... 'nosetests', '--processes=2', '--process-timeout=3',
+ ... join(dirname(__file__), 'support', 'timeout.py')
+ ... ], plugins=[MultiProcess()])
+ .
+ ----------------------------------------------------------------------
+ Ran 1 test in ...s
+ <BLANKLINE>
+ OK
+
+
diff --git a/functional_tests/test_multiprocessing/test_keyboardinterrupt_fixtures.py b/functional_tests/test_multiprocessing/test_keyboardinterrupt_fixtures.py
new file mode 100644
index 0000000..539de6d
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_keyboardinterrupt_fixtures.py
@@ -0,0 +1,8 @@
+
+def setup_module():
+ try:
+ import multiprocessing
+ except ImportError:
+ from nose.plugins.skip import SkipTest
+ raise SkipTest("multiprocessing module not available")
+
diff --git a/functional_tests/test_multiprocessing/test_nameerror.rst b/functional_tests/test_multiprocessing/test_nameerror.rst
new file mode 100644
index 0000000..076c58f
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_nameerror.rst
@@ -0,0 +1,21 @@
+ >>> from os.path import join, dirname
+ >>> from nose.plugins.plugintest import run_buffered
+ >>> from nose.plugins.multiprocess import MultiProcess
+
+ >>> def run(*cmd): run_buffered(argv=list(cmd), plugins=[MultiProcess()])
+ >>> def path(fname): return join(dirname(__file__), 'support', fname)
+
+ >>> run( 'nosetests', '--processes=2', path('nameerror.py') )
+ E
+ ======================================================================
+ ERROR: Failure: NameError (name 'undefined_variable' is not defined)
+ ----------------------------------------------------------------------
+ Traceback (most recent call last):
+ ...
+ NameError: name 'undefined_variable' is not defined
+ <BLANKLINE>
+ ----------------------------------------------------------------------
+ Ran 1 test in ...s
+ <BLANKLINE>
+ FAILED (errors=1)
+
diff --git a/functional_tests/test_multiprocessing/test_nameerror_fixtures.py b/functional_tests/test_multiprocessing/test_nameerror_fixtures.py
new file mode 100644
index 0000000..4e70060
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_nameerror_fixtures.py
@@ -0,0 +1 @@
+from test_keyboardinterrupt_fixtures import setup_module
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py
index ec54978..149b695 100644
--- a/nose/plugins/multiprocess.py
+++ b/nose/plugins/multiprocess.py
@@ -139,11 +139,11 @@ class TimedOutException(Exception):
def _import_mp():
global Process, Queue, Pool, Event, Value, Array
try:
- from multiprocessing import (Process as Process_, Queue as Queue_,
- Pool as Pool_, Event as Event_,
- Value as Value_, Array as Array_)
- Process, Queue, Pool, Event, Value, Array = (Process_, Queue_, Pool_,
- Event_, Value_, Array_)
+ from multiprocessing import Manager, Process
+ m = Manager()
+ Queue, Pool, Event, Value, Array = (
+ m.Queue, m.Pool, m.Event, m.Value, m.Array
+ )
except ImportError:
warn("multiprocessing module is not available, multiprocess plugin "
"cannot be used", RuntimeWarning)
@@ -293,7 +293,11 @@ class MultiProcessTestRunner(TextTestRunner):
case(result) # run here to capture the failure
continue
# handle shared fixtures
- if isinstance(case, ContextSuite) and self.sharedFixtures(case):
+ if isinstance(case, ContextSuite) and case.context is failure.Failure:
+ log.debug("Case is a Failure")
+ case(result) # run here to capture the failure
+ continue
+ elif isinstance(case, ContextSuite) and self.sharedFixtures(case):
log.debug("%s has shared fixtures", case)
try:
case.setUp()
@@ -316,9 +320,8 @@ class MultiProcessTestRunner(TextTestRunner):
log.debug("Starting %s workers", self.config.multiprocess_workers)
for i in range(self.config.multiprocess_workers):
- currentaddr = Array('c',1000)
- currentaddr.value = bytes_('')
- currentstart = Value('d')
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',0.0)
keyboardCaught = Event()
p = Process(target=runner, args=(i, testQueue, resultQueue,
currentaddr, currentstart,
@@ -375,10 +378,8 @@ class MultiProcessTestRunner(TextTestRunner):
workers[iworker].join(timeout=1)
if not shouldStop.is_set() and not testQueue.empty():
log.debug('starting new process on worker %s',iworker)
- currentaddr = Array('c',1000)
- currentaddr.value = bytes_('')
- currentstart = Value('d')
- currentstart.value = time.time()
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
keyboardCaught = Event()
workers[iworker] = Process(target=runner,
args=(iworker, testQueue,
@@ -401,10 +402,10 @@ class MultiProcessTestRunner(TextTestRunner):
any_alive = False
for iworker, w in enumerate(workers):
if w.is_alive():
- worker_addr = str(w.currentaddr.value,'ascii')
- timeprocessing = time.time()-w.currentstart.value
- if (len(worker_addr) == 0
- and timeprocessing > self.config.multiprocess_timeout-0.1):
+ worker_addr = bytes_(w.currentaddr.value,'ascii')
+ timeprocessing = time.time() - w.currentstart.value
+ if ( len(worker_addr) == 0
+ and timeprocessing > self.config.multiprocess_timeout-0.1):
log.debug('worker %d has finished its work item, '
'but is not exiting? do we wait for it?',
iworker)
@@ -427,10 +428,8 @@ class MultiProcessTestRunner(TextTestRunner):
# have to terminate...
log.error("terminating worker %s",iworker)
w.terminate()
- currentaddr = Array('c',1000)
- currentaddr.value = bytes_('')
- currentstart = Value('d')
- currentstart.value = time.time()
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
keyboardCaught = Event()
workers[iworker] = Process(target=runner,
args=(iworker, testQueue, resultQueue,
@@ -620,6 +619,18 @@ class MultiProcessTestRunner(TextTestRunner):
def runner(ix, testQueue, resultQueue, currentaddr, currentstart,
keyboardCaught, shouldStop, loaderClass, resultClass, config):
+ try:
+ try:
+ return __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
+ keyboardCaught, shouldStop, loaderClass, resultClass, config)
+ except KeyboardInterrupt:
+ log.debug('Worker %s keyboard interrupt, stopping',ix)
+ except Empty:
+ log.debug("Worker %s timed out waiting for tasks", ix)
+
+def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
+ keyboardCaught, shouldStop, loaderClass, resultClass, config):
+
config = pickle.loads(config)
dummy_parser = config.parserClass()
if _instantiate_plugins is not None:
@@ -659,54 +670,47 @@ def runner(ix, testQueue, resultQueue, currentaddr, currentstart,
failures,
errors,
errorClasses)
- try:
+ for test_addr, arg in iter(get, 'STOP'):
+ if shouldStop.is_set():
+ log.exception('Worker %d STOPPED',ix)
+ break
+ result = makeResult()
+ test = loader.loadTestsFromNames([test_addr])
+ test.testQueue = testQueue
+ test.tasks = []
+ test.arg = arg
+ log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
try:
- for test_addr, arg in iter(get, 'STOP'):
- if shouldStop.is_set():
- log.exception('Worker %d STOPPED',ix)
- break
- result = makeResult()
- test = loader.loadTestsFromNames([test_addr])
- test.testQueue = testQueue
- test.tasks = []
- test.arg = arg
- log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
- try:
- if arg is not None:
- test_addr = test_addr + str(arg)
- currentaddr.value = bytes_(test_addr)
- currentstart.value = time.time()
- test(result)
- currentaddr.value = bytes_('')
- resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- except KeyboardInterrupt:
- keyboardCaught.set()
- if len(currentaddr.value) > 0:
- log.exception('Worker %s keyboard interrupt, failing '
- 'current test %s',ix,test_addr)
- currentaddr.value = bytes_('')
- failure.Failure(*sys.exc_info())(result)
- resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- else:
- log.debug('Worker %s test %s timed out',ix,test_addr)
- resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- except SystemExit:
- currentaddr.value = bytes_('')
- log.exception('Worker %s system exit',ix)
- raise
- except:
- currentaddr.value = bytes_('')
- log.exception("Worker %s error running test or returning "
- "results",ix)
- failure.Failure(*sys.exc_info())(result)
- resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- if config.multiprocess_restartworker:
- break
- except Empty:
- log.debug("Worker %s timed out waiting for tasks", ix)
- finally:
- testQueue.close()
- resultQueue.close()
+ if arg is not None:
+ test_addr = test_addr + str(arg)
+ currentaddr.value = bytes_(test_addr)
+ currentstart.value = time.time()
+ test(result)
+ currentaddr.value = bytes_('')
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ except KeyboardInterrupt:
+ keyboardCaught.set()
+ if len(currentaddr.value) > 0:
+ log.exception('Worker %s keyboard interrupt, failing '
+ 'current test %s',ix,test_addr)
+ currentaddr.value = bytes_('')
+ failure.Failure(*sys.exc_info())(result)
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ else:
+ log.debug('Worker %s test %s timed out',ix,test_addr)
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ except SystemExit:
+ currentaddr.value = bytes_('')
+ log.exception('Worker %s system exit',ix)
+ raise
+ except:
+ currentaddr.value = bytes_('')
+ log.exception("Worker %s error running test or returning "
+ "results",ix)
+ failure.Failure(*sys.exc_info())(result)
+ resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ if config.multiprocess_restartworker:
+ break
log.debug("Worker %s ending", ix)
diff --git a/nose/plugins/plugintest.py b/nose/plugins/plugintest.py
index 49f1884..68e8941 100644
--- a/nose/plugins/plugintest.py
+++ b/nose/plugins/plugintest.py
@@ -103,9 +103,78 @@ try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
-
+
__all__ = ['PluginTester', 'run']
+from os import getpid
+class MultiProcessFile(object):
+ """
+ helper for testing multiprocessing
+
+ multiprocessing poses a problem for doctests, since the strategy
+ of replacing sys.stdout/stderr with file-like objects then
+ inspecting the results won't work: the child processes will
+ write to the objects, but the data will not be reflected
+ in the parent doctest-ing process.
+
+ The solution is to create file-like objects which will interact with
+ multiprocessing in a more desirable way.
+
+ All processes can write to this object, but only the creator can read.
+ This allows the testing system to see a unified picture of I/O.
+ """
+ def __init__(self):
+ # per advice at:
+ # http://docs.python.org/library/multiprocessing.html#all-platforms
+ self.__master = getpid()
+ self.__queue = Manager().Queue()
+ self.__buffer = StringIO()
+ self.softspace = 0
+
+ def buffer(self):
+ if getpid() != self.__master:
+ return
+
+ from Queue import Empty
+ from collections import defaultdict
+ cache = defaultdict(str)
+ while True:
+ try:
+ pid, data = self.__queue.get_nowait()
+ except Empty:
+ break
+ if pid == ():
+ #show parent output after children
+ #this is what users see, usually
+ pid = ( 1e100, ) # googol!
+ cache[pid] += data
+ for pid in sorted(cache):
+ #self.__buffer.write( '%s wrote: %r\n' % (pid, cache[pid]) ) #DEBUG
+ self.__buffer.write( cache[pid] )
+ def write(self, data):
+ # note that these pids are in the form of current_process()._identity
+ # rather than OS pids
+ from multiprocessing import current_process
+ pid = current_process()._identity
+ self.__queue.put((pid, data))
+ def __iter__(self):
+ "getattr doesn't work for iter()"
+ self.buffer()
+ return self.__buffer
+ def seek(self, offset, whence=0):
+ self.buffer()
+ return self.__buffer.seek(offset, whence)
+ def getvalue(self):
+ self.buffer()
+ return self.__buffer.getvalue()
+ def __getattr__(self, attr):
+ return getattr(self.__buffer, attr)
+
+try:
+ from multiprocessing import Manager
+ Buffer = MultiProcessFile
+except ImportError:
+ Buffer = StringIO
class PluginTester(object):
"""A mixin for testing nose plugins in their runtime environment.
@@ -177,7 +246,7 @@ class PluginTester(object):
from nose.plugins.manager import PluginManager
suite = None
- stream = StringIO()
+ stream = Buffer()
conf = Config(env=self.env,
stream=stream,
plugins=PluginManager(plugins=self.plugins))
@@ -214,16 +283,18 @@ class AccessDecorator(object):
def __contains__(self, val):
return val in self._buf
def __iter__(self):
- return self.stream
+ return iter(self.stream)
def __str__(self):
return self._buf
def blankline_separated_blocks(text):
+ "a bunch of === characters is also considered a blank line"
block = []
for line in text.splitlines(True):
block.append(line)
- if not line.strip():
+ line = line.strip()
+ if not line or line.startswith('===') and not line.strip('='):
yield "".join(block)
block = []
if block:
@@ -240,13 +311,15 @@ def remove_stack_traces(out):
| innermost\ last
) \) :
)
- \s* $ # toss trailing whitespace on the header.
- (?P<stack> .*?) # don't blink: absorb stuff until...
- ^ (?P<msg> \w+ .*) # a line *starts* with alphanum.
+ \s* $ # toss trailing whitespace on the header.
+ (?P<stack> .*?) # don't blink: absorb stuff until...
+ ^(?=\w) # a line *starts* with alphanum.
+ .*?(?P<exception> \w+ ) # exception name
+ (?P<msg> [:\n] .*) # the rest
""", re.VERBOSE | re.MULTILINE | re.DOTALL)
blocks = []
for block in blankline_separated_blocks(out):
- blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<msg>", block))
+ blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<exception>\g<msg>", block))
return "".join(blocks)
@@ -296,7 +369,7 @@ def run(*arg, **kw):
from nose.config import Config
from nose.plugins.manager import PluginManager
- buffer = StringIO()
+ buffer = Buffer()
if 'config' not in kw:
plugins = kw.pop('plugins', [])
if isinstance(plugins, list):
diff --git a/nose/pyversion.py b/nose/pyversion.py
index 105f910..36ddcfd 100644
--- a/nose/pyversion.py
+++ b/nose/pyversion.py
@@ -70,8 +70,8 @@ else:
# definition so that things can do stuff based on its associated class)
class UnboundMethod:
def __init__(self, cls, func):
- # Make sure we have all the same attributes as the original function,
- # so that the AttributeSelector plugin will work correctly...
+ # Make sure we have all the same attributes as the original function,
+ # so that the AttributeSelector plugin will work correctly...
self.__dict__ = func.__dict__.copy()
self._func = func
self.__self__ = UnboundSelf(cls)
@@ -122,6 +122,8 @@ def ismethod(obj):
# Make a pseudo-bytes function that can be called without the encoding arg:
if sys.version_info >= (3, 0):
def bytes_(s, encoding='utf8'):
+ if isinstance(s, bytes):
+ return s
return bytes(s, encoding)
else:
def bytes_(s, encoding=None):
diff --git a/nosetests.1 b/nosetests.1
index 504ebbf..cf0473f 100644
--- a/nosetests.1
+++ b/nosetests.1
@@ -471,5 +471,5 @@ jpellerin+nose@gmail.com
.SH COPYRIGHT
LGPL
-.\" Generated by docutils manpage writer on 2011-04-18 10:57.
+.\" Generated by docutils manpage writer on 2011-05-02 14:25.
.\"
diff --git a/unit_tests/test_multiprocess.py b/unit_tests/test_multiprocess.py
index c262b7d..aeb65e4 100644
--- a/unit_tests/test_multiprocess.py
+++ b/unit_tests/test_multiprocess.py
@@ -46,11 +46,13 @@ class T(unittest.TestCase):
pass
def test_mp_process_args_pickleable():
- raise SkipTest('this currently gets stuck in poll() 90% of the time')
+ # TODO(Kumar) this test needs to be more succint.
+ # If you start seeing it timeout then perhaps we need to skip it again.
+ # raise SkipTest('this currently gets stuck in poll() 90% of the time')
test = case.Test(T('runTest'))
config = Config()
config.multiprocess_workers = 2
- config.multiprocess_timeout = 0.5
+ config.multiprocess_timeout = 5
runner = multiprocess.MultiProcessTestRunner(
stream=_WritelnDecorator(sys.stdout),
verbosity=10,