summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkumar <kumar.mcmillan@gmail.com>2011-11-23 00:06:20 -0600
committerkumar <kumar.mcmillan@gmail.com>2011-11-23 00:06:20 -0600
commit79efaa3d4cabff40e0194d2e6680a5070c6476e2 (patch)
tree5bb18ccb66ca7095b6c8bae22488496f899d49fd
parent0e87a596e464b1adf32a5bb6d894956ed8e5c8ad (diff)
parent58e1a16bb3e70b5531328431a35aac29b86bbf7d (diff)
downloadnose-79efaa3d4cabff40e0194d2e6680a5070c6476e2.tar.gz
really merged. hg, leave me alone man
-rw-r--r--AUTHORS1
-rw-r--r--CHANGELOG5
-rw-r--r--functional_tests/test_defaultpluginmanager.py22
-rw-r--r--functional_tests/test_multiprocessing/__init__.py28
-rw-r--r--functional_tests/test_multiprocessing/support/class.py14
-rw-r--r--functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py6
-rw-r--r--functional_tests/test_multiprocessing/support/concurrent_shared/test.py11
-rw-r--r--functional_tests/test_multiprocessing/support/fake_nosetest.py14
-rw-r--r--functional_tests/test_multiprocessing/support/keyboardinterrupt.py29
-rw-r--r--functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py34
-rw-r--r--functional_tests/test_multiprocessing/support/timeout.py6
-rw-r--r--functional_tests/test_multiprocessing/test_class.py13
-rw-r--r--functional_tests/test_multiprocessing/test_concurrent_shared.py13
-rw-r--r--functional_tests/test_multiprocessing/test_keyboardinterrupt.py84
-rw-r--r--functional_tests/test_multiprocessing/test_nameerror.py26
-rw-r--r--functional_tests/test_multiprocessing/test_process_timeout.py30
-rw-r--r--nose/config.py24
-rw-r--r--nose/core.py36
-rw-r--r--nose/plugins/cover.py2
-rw-r--r--nose/plugins/manager.py34
-rw-r--r--nose/plugins/multiprocess.py483
-rw-r--r--nose/plugins/plugintest.py78
-rw-r--r--nosetests.12
23 files changed, 638 insertions, 357 deletions
diff --git a/AUTHORS b/AUTHORS
index aa962ea..c647a2b 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -21,3 +21,4 @@ Bobby Impollonia
Takafumi Arakaki
Peter Bengtsson
Gary Donovan
+Brendan McCollam
diff --git a/CHANGELOG b/CHANGELOG
index 2023b9c..8f6233b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,10 +1,15 @@
1.1.3
+- Fixed issue where plugins included with `addplugins` keyword could
+ be overridden by built-in plugins (or third-party plugins registered
+ with setuptools) of the same name (#466).
+ Patch by Brendan McCollam
- Adds :option:`--cover-xml` and :option:`--cover-xml-file` (#311).
Patch by Timothée Peignier.
- Adds support for :option:`--cover-branches` (related to #370).
Patch by Timothée Peignier.
- Fixed Unicode issue on Python 3.1 with coverage (#442)
+- fixed class level fixture handling in multiprocessing plugin
1.1.2
diff --git a/functional_tests/test_defaultpluginmanager.py b/functional_tests/test_defaultpluginmanager.py
new file mode 100644
index 0000000..28c1b86
--- /dev/null
+++ b/functional_tests/test_defaultpluginmanager.py
@@ -0,0 +1,22 @@
+import unittest
+from nose.plugins import Plugin
+from nose.plugins.manager import DefaultPluginManager
+
+class OverridesSkip(Plugin):
+ """Plugin to override the built-in Skip"""
+ enabled = True
+ name = 'skip'
+ is_overridden = True
+
+
+class TestDefaultPluginManager(unittest.TestCase):
+
+ def test_extraplugins_override_builtins(self):
+ pm = DefaultPluginManager()
+ pm.addPlugins(extraplugins=[OverridesSkip()])
+ pm.loadPlugins()
+ for plugin in pm.plugins:
+ if plugin.name == "skip":
+ break
+ overridden = getattr(plugin, 'is_overridden', False)
+ self.assertTrue(overridden)
diff --git a/functional_tests/test_multiprocessing/__init__.py b/functional_tests/test_multiprocessing/__init__.py
new file mode 100644
index 0000000..2a52efd
--- /dev/null
+++ b/functional_tests/test_multiprocessing/__init__.py
@@ -0,0 +1,28 @@
+import os
+from unittest import TestCase
+
+from nose.plugins import PluginTester
+from nose.plugins.skip import SkipTest
+from nose.plugins.multiprocess import MultiProcess
+
+support = os.path.join(os.path.dirname(__file__), 'support')
+
+def setup():
+ try:
+ import multiprocessing
+ if 'active' in MultiProcess.status:
+ raise SkipTest("Multiprocess plugin is active. Skipping tests of "
+ "plugin itself.")
+ except ImportError:
+ raise SkipTest("multiprocessing module not available")
+
+class MPTestBase(PluginTester, TestCase):
+ processes = 1
+ activate = '--processes=1'
+ plugins = [MultiProcess()]
+ suitepath = os.path.join(support, 'timeout.py')
+
+ def __init__(self, *args, **kwargs):
+ self.activate = '--processes=%d' % self.processes
+ PluginTester.__init__(self)
+ TestCase.__init__(self, *args, **kwargs)
diff --git a/functional_tests/test_multiprocessing/support/class.py b/functional_tests/test_multiprocessing/support/class.py
new file mode 100644
index 0000000..905bcdf
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/class.py
@@ -0,0 +1,14 @@
+class TestFunctionalTest(object):
+ counter = 0
+ @classmethod
+ def setup_class(cls):
+ cls.counter += 1
+ @classmethod
+ def teardown_class(cls):
+ cls.counter -= 1
+ def _run(self):
+ assert self.counter==1
+ def test1(self):
+ self._run()
+ def test2(self):
+ self._run()
diff --git a/functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py b/functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py
new file mode 100644
index 0000000..c557001
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py
@@ -0,0 +1,6 @@
+counter=[0]
+_multiprocess_shared_ = True
+def setup_package():
+ counter[0] += 1
+def teardown_package():
+ counter[0] -= 1
diff --git a/functional_tests/test_multiprocessing/support/concurrent_shared/test.py b/functional_tests/test_multiprocessing/support/concurrent_shared/test.py
new file mode 100644
index 0000000..5bc21f6
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/concurrent_shared/test.py
@@ -0,0 +1,11 @@
+#from . import counter
+from time import sleep
+#_multiprocess_can_split_ = True
+class Test1(object):
+ def test1(self):
+ sleep(1)
+ pass
+class Test2(object):
+ def test2(self):
+ sleep(1)
+ pass
diff --git a/functional_tests/test_multiprocessing/support/fake_nosetest.py b/functional_tests/test_multiprocessing/support/fake_nosetest.py
new file mode 100644
index 0000000..f5a6289
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/fake_nosetest.py
@@ -0,0 +1,14 @@
+import os
+import sys
+
+import nose
+from nose.plugins.multiprocess import MultiProcess
+from nose.config import Config
+from nose.plugins.manager import PluginManager
+
+if __name__ == '__main__':
+ if len(sys.argv) < 3:
+ print "USAGE: %s TEST_FILE LOG_FILE" % sys.argv[0]
+ sys.exit(1)
+ os.environ['NOSE_MP_LOG']=sys.argv[2]
+ nose.main(defaultTest=sys.argv[1], argv=[sys.argv[0],'--processes=1','-v'], config=Config(plugins=PluginManager(plugins=[MultiProcess()])))
diff --git a/functional_tests/test_multiprocessing/support/keyboardinterrupt.py b/functional_tests/test_multiprocessing/support/keyboardinterrupt.py
new file mode 100644
index 0000000..2c36d95
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/keyboardinterrupt.py
@@ -0,0 +1,29 @@
+import os
+
+from tempfile import mktemp
+from time import sleep
+
+if 'NOSE_MP_LOG' not in os.environ:
+ raise Exception('Environment variable NOSE_MP_LOG is not set')
+
+logfile = os.environ['NOSE_MP_LOG']
+
+def log(w):
+ f = open(logfile, 'a')
+ f.write(w+"\n")
+ f.close()
+#make sure all tests in this file are dispatched to the same subprocess
+def setup():
+ log('setup')
+
+def test_timeout():
+ log('test_timeout')
+ sleep(2)
+ log('test_timeout_finished')
+
+# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run
+def test_pass():
+ log('test_pass')
+
+def teardown():
+ log('teardown')
diff --git a/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py b/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py
new file mode 100644
index 0000000..3932bbd
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py
@@ -0,0 +1,34 @@
+import os
+
+from tempfile import mktemp
+from time import sleep
+
+if 'NOSE_MP_LOG' not in os.environ:
+ raise Exception('Environment variable NOSE_MP_LOG is not set')
+
+logfile = os.environ['NOSE_MP_LOG']
+
+def log(w):
+ f = open(logfile, 'a')
+ f.write(w+"\n")
+ f.close()
+#make sure all tests in this file are dispatched to the same subprocess
+def setup():
+ '''global logfile
+ logfile = mktemp()
+ print "tempfile is:",logfile'''
+ log('setup')
+
+def test_timeout():
+ log('test_timeout')
+ sleep(2)
+ log('test_timeout_finished')
+
+# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run
+def test_pass():
+ log('test_pass')
+
+def teardown():
+ log('teardown')
+ sleep(10)
+ log('teardown_finished')
diff --git a/functional_tests/test_multiprocessing/support/timeout.py b/functional_tests/test_multiprocessing/support/timeout.py
index 52dce12..480c859 100644
--- a/functional_tests/test_multiprocessing/support/timeout.py
+++ b/functional_tests/test_multiprocessing/support/timeout.py
@@ -1,6 +1,12 @@
+#make sure all tests in this file are dispatched to the same subprocess
+def setup():
+ pass
def test_timeout():
"this test *should* fail when process-timeout=1"
from time import sleep
sleep(2)
+# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run
+def test_pass():
+ pass
diff --git a/functional_tests/test_multiprocessing/test_class.py b/functional_tests/test_multiprocessing/test_class.py
new file mode 100644
index 0000000..d92710d
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_class.py
@@ -0,0 +1,13 @@
+import os
+
+from test_multiprocessing import MPTestBase
+
+
+#test case for #462
+class TestClassFixture(MPTestBase):
+ suitepath = os.path.join(os.path.dirname(__file__), 'support', 'class.py')
+
+ def runTest(self):
+ assert str(self.output).strip().endswith('OK')
+ assert 'Ran 2 tests' in self.output
+
diff --git a/functional_tests/test_multiprocessing/test_concurrent_shared.py b/functional_tests/test_multiprocessing/test_concurrent_shared.py
new file mode 100644
index 0000000..2552c2b
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_concurrent_shared.py
@@ -0,0 +1,13 @@
+import os
+
+from test_multiprocessing import MPTestBase
+
+class TestConcurrentShared(MPTestBase):
+ processes = 2
+ suitepath = os.path.join(os.path.dirname(__file__), 'support',
+ 'concurrent_shared')
+
+ def runTest(self):
+ assert 'Ran 2 tests in 1.' in self.output, "make sure two tests use 1.x seconds (no more than 2 seconsd)"
+ assert str(self.output).strip().endswith('OK')
+
diff --git a/functional_tests/test_multiprocessing/test_keyboardinterrupt.py b/functional_tests/test_multiprocessing/test_keyboardinterrupt.py
new file mode 100644
index 0000000..8f07e54
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_keyboardinterrupt.py
@@ -0,0 +1,84 @@
+from subprocess import Popen,PIPE
+import os
+import sys
+from time import sleep
+import signal
+
+import nose
+
+support = os.path.join(os.path.dirname(__file__), 'support')
+
+PYTHONPATH = os.environ['PYTHONPATH'] if 'PYTHONPATH' in os.environ else ''
+def setup():
+ nose_parent_dir = os.path.normpath(os.path.join(os.path.abspath(os.path.dirname(nose.__file__)),'..'))
+ paths = [nose_parent_dir]
+ if PYTHONPATH:
+ paths.append(PYTHONPATH)
+ os.environ['PYTHONPATH'] = os.pathsep.join(paths)
+def teardown():
+ if PYTHONPATH:
+ os.environ['PYTHONPATH'] = PYTHONPATH
+ else:
+ del os.environ['PYTHONPATH']
+
+runner = os.path.join(support, 'fake_nosetest.py')
+def keyboardinterrupt(case):
+ #os.setsid would create a process group so signals sent to the
+ #parent process will propogates to all children processes
+ from tempfile import mktemp
+ logfile = mktemp()
+ process = Popen([sys.executable,runner,os.path.join(support,case),logfile], preexec_fn=os.setsid, stdout=PIPE, stderr=PIPE, bufsize=-1)
+
+ #wait until logfile is created:
+ retry=100
+ while not os.path.exists(logfile):
+ sleep(0.1)
+ retry -= 1
+ if not retry:
+ raise Exception('Timeout while waiting for log file to be created by fake_nosetest.py')
+
+ os.killpg(process.pid, signal.SIGINT)
+ return process, logfile
+
+def get_log_content(logfile):
+ '''prefix = 'tempfile is: '
+ if not stdout.startswith(prefix):
+ raise Exception('stdout does not contain tmp file name: '+stdout)
+ logfile = stdout[len(prefix):].strip() #remove trailing new line char'''
+ f = open(logfile)
+ content = f.read()
+ f.close()
+ os.remove(logfile)
+ return content
+
+def test_keyboardinterrupt():
+ process, logfile = keyboardinterrupt('keyboardinterrupt.py')
+ stdout, stderr = [s.decode('utf-8') for s in process.communicate(None)]
+ print stderr
+ log = get_log_content(logfile)
+ assert 'setup' in log
+ assert 'test_timeout' in log
+ assert 'test_timeout_finished' not in log
+ assert 'test_pass' not in log
+ assert 'teardown' in log
+ assert 'Ran 0 tests' in stderr
+ assert 'KeyboardInterrupt' in stderr
+ assert 'FAILED (errors=1)' in stderr
+ assert 'ERROR: Worker 0 keyboard interrupt, failing current test '+os.path.join(support,'keyboardinterrupt.py') in stderr
+
+
+def test_keyboardinterrupt_twice():
+ process, logfile = keyboardinterrupt('keyboardinterrupt_twice.py')
+ sleep(0.5)
+ os.killpg(process.pid, signal.SIGINT)
+ stdout, stderr = [s.decode('utf-8') for s in process.communicate(None)]
+ log = get_log_content(logfile)
+ assert 'setup' in log
+ assert 'test_timeout' in log
+ assert 'test_timeout_finished' not in log
+ assert 'test_pass' not in log
+ assert 'teardown' in log
+ assert 'teardown_finished' not in log
+ assert 'Ran 0 tests' in stderr
+ assert 'KeyboardInterrupt' in stderr
+ assert 'FAILED (errors=1)' in stderr
diff --git a/functional_tests/test_multiprocessing/test_nameerror.py b/functional_tests/test_multiprocessing/test_nameerror.py
index f73d02b..5e58226 100644
--- a/functional_tests/test_multiprocessing/test_nameerror.py
+++ b/functional_tests/test_multiprocessing/test_nameerror.py
@@ -1,28 +1,10 @@
import os
-import unittest
-from nose.plugins import PluginTester
-from nose.plugins.skip import SkipTest
-from nose.plugins.multiprocess import MultiProcess
+from test_multiprocessing import support, MPTestBase
-
-support = os.path.join(os.path.dirname(__file__), 'support')
-
-
-def setup():
- try:
- import multiprocessing
- if 'active' in MultiProcess.status:
- raise SkipTest("Multiprocess plugin is active. Skipping tests of "
- "plugin itself.")
- except ImportError:
- raise SkipTest("multiprocessing module not available")
-
-
-class TestMPNameError(PluginTester, unittest.TestCase):
- activate = '--processes=2'
- plugins = [MultiProcess()]
- suitepath = os.path.join(support, 'nameerror.py')
+class TestMPNameError(MPTestBase):
+ processes = 2
+ suitepath = os.path.join(os.path.dirname(__file__), 'support', 'nameerror.py')
def runTest(self):
print str(self.output)
diff --git a/functional_tests/test_multiprocessing/test_process_timeout.py b/functional_tests/test_multiprocessing/test_process_timeout.py
index 535ecdb..6b858f8 100644
--- a/functional_tests/test_multiprocessing/test_process_timeout.py
+++ b/functional_tests/test_multiprocessing/test_process_timeout.py
@@ -1,37 +1,21 @@
import os
-import unittest
-from nose.plugins import PluginTester
-from nose.plugins.skip import SkipTest
-from nose.plugins.multiprocess import MultiProcess
+from test_multiprocessing import MPTestBase
-support = os.path.join(os.path.dirname(__file__), 'support')
-
-
-def setup():
- try:
- import multiprocessing
- if 'active' in MultiProcess.status:
- raise SkipTest("Multiprocess plugin is active. Skipping tests of "
- "plugin itself.")
- except ImportError:
- raise SkipTest("multiprocessing module not available")
-
-
-
-class TestMPTimeout(PluginTester, unittest.TestCase):
- activate = '--processes=2'
+class TestMPTimeout(MPTestBase):
args = ['--process-timeout=1']
- plugins = [MultiProcess()]
- suitepath = os.path.join(support, 'timeout.py')
+ suitepath = os.path.join(os.path.dirname(__file__), 'support', 'timeout.py')
def runTest(self):
assert "TimedOutException: 'timeout.test_timeout'" in self.output
-
+ assert "Ran 2 tests in" in self.output
+ assert "FAILED (errors=1)" in self.output
class TestMPTimeoutPass(TestMPTimeout):
args = ['--process-timeout=3']
def runTest(self):
assert "TimedOutException: 'timeout.test_timeout'" not in self.output
+ assert "Ran 2 tests in" in self.output
assert str(self.output).strip().endswith('OK')
+
diff --git a/nose/config.py b/nose/config.py
index d787fed..e110b7a 100644
--- a/nose/config.py
+++ b/nose/config.py
@@ -169,7 +169,7 @@ class Config(object):
self.verbosity = int(env.get('NOSE_VERBOSE', 1))
self.where = ()
self.py3where = ()
- self.workingDir = None
+ self.workingDir = None
"""
def __init__(self, **kw):
@@ -210,7 +210,7 @@ class Config(object):
self.firstPackageWins = False
self.parserClass = OptionParser
self.worker = False
-
+
self._default = self.__dict__.copy()
self.update(kw)
self._orig = self.__dict__.copy()
@@ -237,7 +237,7 @@ class Config(object):
dummy_parser = self.parserClass()
self.plugins.addOptions(dummy_parser, {})
self.plugins.configure(self.options, self)
-
+
def __repr__(self):
d = self.__dict__.copy()
# don't expose env, could include sensitive info
@@ -289,7 +289,7 @@ class Config(object):
if sys.version_info >= (3,):
options.where = options.py3where
- # `where` is an append action, so it can't have a default value
+ # `where` is an append action, so it can't have a default value
# in the parser, or that default will always be in the list
if not options.where:
options.where = env.get('NOSE_WHERE', None)
@@ -315,16 +315,16 @@ class Config(object):
if options.where is not None:
self.configureWhere(options.where)
-
+
if options.testMatch:
self.testMatch = re.compile(options.testMatch)
-
+
if options.ignoreFiles:
self.ignoreFiles = map(re.compile, tolist(options.ignoreFiles))
log.info("Ignoring files matching %s", options.ignoreFiles)
else:
log.info("Ignoring files matching %s", self.ignoreFilesDefaultStrings)
-
+
if options.include:
self.include = map(re.compile, tolist(options.include))
log.info("Including tests matching %s", options.include)
@@ -348,7 +348,7 @@ class Config(object):
from logging.config import fileConfig
fileConfig(self.loggingConfig)
return
-
+
format = logging.Formatter('%(name)s: %(levelname)s: %(message)s')
if self.debugLog:
handler = logging.FileHandler(self.debugLog)
@@ -364,7 +364,7 @@ class Config(object):
if handler not in logger.handlers:
logger.addHandler(handler)
- # default level
+ # default level
lvl = logging.WARNING
if self.verbosity >= 5:
lvl = 0
@@ -580,7 +580,7 @@ class Config(object):
def todict(self):
return self.__dict__.copy()
-
+
def update(self, d):
self.__dict__.update(d)
@@ -590,13 +590,13 @@ class NoOptions(object):
"""
def __getstate__(self):
return {}
-
+
def __setstate__(self, state):
pass
def __getnewargs__(self):
return ()
-
+
def __getattr__(self, attr):
return None
diff --git a/nose/core.py b/nose/core.py
index e219903..b78f232 100644
--- a/nose/core.py
+++ b/nose/core.py
@@ -23,12 +23,12 @@ compat_24 = sys.version_info >= (2, 4)
__all__ = ['TestProgram', 'main', 'run', 'run_exit', 'runmodule', 'collector',
'TextTestRunner']
-
+
class TextTestRunner(unittest.TextTestRunner):
"""Test runner that uses nose's TextTestResult to enable errorClasses,
as well as providing hooks for plugins to override or replace the test
output stream, results, and the test case itself.
- """
+ """
def __init__(self, stream=sys.stderr, descriptions=1, verbosity=1,
config=None):
if config is None:
@@ -36,7 +36,7 @@ class TextTestRunner(unittest.TextTestRunner):
self.config = config
unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
-
+
def _makeResult(self):
return TextTestResult(self.stream,
self.descriptions,
@@ -50,12 +50,12 @@ class TextTestRunner(unittest.TextTestRunner):
wrapper = self.config.plugins.prepareTest(test)
if wrapper is not None:
test = wrapper
-
+
# plugins can decorate or capture the output stream
wrapped = self.config.plugins.setOutputStream(self.stream)
if wrapped is not None:
self.stream = wrapped
-
+
result = self._makeResult()
start = time.time()
test(result)
@@ -65,7 +65,7 @@ class TextTestRunner(unittest.TextTestRunner):
self.config.plugins.finalize(result)
return result
-
+
class TestProgram(unittest.TestProgram):
"""Collect and run tests, returning success or failure.
@@ -104,7 +104,7 @@ class TestProgram(unittest.TestProgram):
if config is None:
config = self.makeConfig(env, plugins)
if addplugins:
- config.plugins.addPlugins(addplugins)
+ config.plugins.addPlugins(extraplugins=addplugins)
self.config = config
self.suite = suite
self.exit = exit
@@ -121,14 +121,14 @@ class TestProgram(unittest.TestProgram):
"""Load a Config, pre-filled with user config files if any are
found.
"""
- cfg_files = all_config_files()
+ cfg_files = all_config_files()
if plugins:
manager = PluginManager(plugins=plugins)
else:
manager = DefaultPluginManager()
return Config(
env=env, files=cfg_files, plugins=manager)
-
+
def parseArgs(self, argv):
"""Parse argv and env and configure running environment.
"""
@@ -146,7 +146,7 @@ class TestProgram(unittest.TestProgram):
if self.config.options.showPlugins:
self.showPlugins()
sys.exit(0)
-
+
if self.testLoader is None:
self.testLoader = defaultTestLoader(config=self.config)
elif isclass(self.testLoader):
@@ -155,7 +155,7 @@ class TestProgram(unittest.TestProgram):
if plug_loader is not None:
self.testLoader = plug_loader
log.debug("test loader is %s", self.testLoader)
-
+
# FIXME if self.module is a string, add it to self.testNames? not sure
if self.config.testNames:
@@ -167,7 +167,7 @@ class TestProgram(unittest.TestProgram):
if self.config.workingDir is not None:
os.chdir(self.config.workingDir)
self.createTests()
-
+
def createTests(self):
"""Create the tests to run. If a self.suite
is set, then that suite will be used. Otherwise, tests will be
@@ -210,10 +210,10 @@ class TestProgram(unittest.TestProgram):
self.options = []
def add_option(self, *arg, **kw):
self.options.append((arg, kw.pop('help', '')))
-
+
v = self.config.verbosity
self.config.plugins.sort()
- for p in self.config.plugins:
+ for p in self.config.plugins:
print "Plugin %s" % p.name
if v >= 2:
print " score: %s" % p.score
@@ -234,7 +234,7 @@ class TestProgram(unittest.TestProgram):
initial_indent=' ',
subsequent_indent=' '))
print
-
+
def usage(cls):
import nose
if hasattr(nose, '__loader__'):
@@ -276,9 +276,9 @@ def run(*arg, **kw):
* addplugins: List of **extra** plugins to use. Pass a list of plugin
instances in this argument to make custom plugins available while
still using the DefaultPluginManager.
-
+
With the exception that the ``exit`` argument is always set
- to False.
+ to False.
"""
kw['exit'] = False
return TestProgram(*arg, **kw).success
@@ -297,7 +297,7 @@ def collector():
unittest.TestSuite. The collector will, by default, load options from
all config files and execute loader.loadTestsFromNames() on the
configured testNames, or '.' if no testNames are configured.
- """
+ """
# plugins that implement any of these methods are disabled, since
# we don't control the test runner and won't be able to run them
# finalize() is also not called, but plugins that use it aren't disabled,
diff --git a/nose/plugins/cover.py b/nose/plugins/cover.py
index 21545bb..a055c92 100644
--- a/nose/plugins/cover.py
+++ b/nose/plugins/cover.py
@@ -161,7 +161,7 @@ class Coverage(Plugin):
self.coverXmlFile = None
if options.cover_xml:
self.coverXmlFile = options.cover_xml_file
- log.debug('Will put XML coverage report in %s', self.coverHtmlFile)
+ log.debug('Will put XML coverage report in %s', self.coverXmlFile)
if self.enabled:
self.status['active'] = True
diff --git a/nose/plugins/manager.py b/nose/plugins/manager.py
index ce3e48a..4d2ed22 100644
--- a/nose/plugins/manager.py
+++ b/nose/plugins/manager.py
@@ -17,6 +17,10 @@ The plugin managers provided with nose are:
:class:`EntryPointPluginManager`
This manager uses setuptools entrypoints to load plugins.
+:class:`ExtraPluginsPluginManager`
+ This manager loads extra plugins specified with the keyword
+ `addplugins`.
+
:class:`DefaultPluginMananger`
This is the manager class that will be used by default. If
setuptools is installed, it is a subclass of
@@ -48,6 +52,7 @@ import inspect
import logging
import os
import sys
+from itertools import chain as iterchain
from warnings import warn
import nose.config
from nose.failure import Failure
@@ -218,8 +223,10 @@ class NoPlugins(object):
class PluginManager(object):
- """Base class for plugin managers. Does not implement loadPlugins, so it
- may only be used with a static list of plugins.
+ """Base class for plugin managers. PluginManager is intended to be
+ used only with a static list of plugins. The loadPlugins() implementation
+ only reloads plugins from _extraplugins to prevent those from being
+ overridden by a subclass.
The basic functionality of a plugin manager is to proxy all unknown
attributes through a ``PluginProxy`` to a list of plugins.
@@ -231,6 +238,7 @@ class PluginManager(object):
def __init__(self, plugins=(), proxyClass=None):
self._plugins = []
+ self._extraplugins = ()
self._proxies = {}
if plugins:
self.addPlugins(plugins)
@@ -256,8 +264,13 @@ class PluginManager(object):
if getattr(p, 'name', None) != new_name]
self._plugins.append(plug)
- def addPlugins(self, plugins):
- for plug in plugins:
+ def addPlugins(self, plugins=(), extraplugins=()):
+ """extraplugins are maintained in a separate list and
+ re-added by loadPlugins() to prevent their being overwritten
+ by plugins added by a subclass of PluginManager
+ """
+ self._extraplugins = extraplugins
+ for plug in iterchain(plugins, extraplugins):
self.addPlugin(plug)
def configure(self, options, config):
@@ -275,7 +288,8 @@ class PluginManager(object):
log.debug("Plugins enabled: %s", enabled)
def loadPlugins(self):
- pass
+ for plug in self._extraplugins:
+ self.addPlugin(plug)
def sort(self):
return sort_list(self._plugins, lambda x: getattr(x, 'score', 1), reverse=True)
@@ -361,9 +375,7 @@ class EntryPointPluginManager(PluginManager):
def loadPlugins(self):
"""Load plugins by iterating the `nose.plugins` entry point.
"""
- super(EntryPointPluginManager, self).loadPlugins()
from pkg_resources import iter_entry_points
-
loaded = {}
for entry_point, adapt in self.entry_points:
for ep in iter_entry_points(entry_point):
@@ -387,6 +399,7 @@ class EntryPointPluginManager(PluginManager):
else:
plug = plugcls()
self.addPlugin(plug)
+ super(EntryPointPluginManager, self).loadPlugins()
class BuiltinPluginManager(PluginManager):
@@ -403,11 +416,12 @@ class BuiltinPluginManager(PluginManager):
try:
import pkg_resources
- class DefaultPluginManager(BuiltinPluginManager, EntryPointPluginManager):
+ class DefaultPluginManager(EntryPointPluginManager, BuiltinPluginManager):
pass
-except ImportError:
- DefaultPluginManager = BuiltinPluginManager
+except ImportError:
+ class DefaultPluginManager(BuiltinPluginManager):
+ pass
class RestrictedPluginManager(DefaultPluginManager):
"""Plugin manager that restricts the plugin list to those not
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py
index 260cbf8..48ba54d 100644
--- a/nose/plugins/multiprocess.py
+++ b/nose/plugins/multiprocess.py
@@ -130,7 +130,8 @@ log = logging.getLogger(__name__)
Process = Queue = Pool = Event = Value = Array = None
-class TimedOutException(Exception):
+# have to inherit KeyboardInterrupt to it will interrupt process properly
+class TimedOutException(KeyboardInterrupt):
def __init__(self, value = "Timed Out"):
self.value = value
def __str__(self):
@@ -140,7 +141,16 @@ def _import_mp():
global Process, Queue, Pool, Event, Value, Array
try:
from multiprocessing import Manager, Process
+ #prevent the server process created in the manager which holds Python
+ #objects and allows other processes to manipulate them using proxies
+ #to interrupt on SIGINT (keyboardinterrupt) so that the communication
+ #channel between subprocesses and main process is still usable after
+ #ctrl+C is received in the main process.
+ old=signal.signal(signal.SIGINT, signal.SIG_IGN)
m = Manager()
+ #reset it back so main process will receive a KeyboardInterrupt
+ #exception on ctrl+c
+ signal.signal(signal.SIGINT, old)
Queue, Pool, Event, Value, Array = (
m.Queue, m.Pool, m.Event, m.Value, m.Array
)
@@ -247,42 +257,17 @@ class MultiProcess(Plugin):
config=self.config,
loaderClass=self.loaderClass)
+def signalhandler(sig, frame):
+ raise TimedOutException()
+
class MultiProcessTestRunner(TextTestRunner):
waitkilltime = 5.0 # max time to wait to terminate a process that does not
- # respond to SIGINT
+ # respond to SIGILL
def __init__(self, **kw):
self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
super(MultiProcessTestRunner, self).__init__(**kw)
- def run(self, test):
- """
- Execute the test (which may be a test suite). If the test is a suite,
- distribute it out among as many processes as have been configured, at
- as fine a level as is possible given the context fixtures defined in
- the suite or any sub-suites.
-
- """
- log.debug("%s.run(%s) (%s)", self, test, os.getpid())
- wrapper = self.config.plugins.prepareTest(test)
- if wrapper is not None:
- test = wrapper
-
- # plugins can decorate or capture the output stream
- wrapped = self.config.plugins.setOutputStream(self.stream)
- if wrapped is not None:
- self.stream = wrapped
-
- testQueue = Queue()
- resultQueue = Queue()
- tasks = []
- completed = []
- workers = []
- to_teardown = []
- shouldStop = Event()
-
- result = self._makeResult()
- start = time.time()
-
+ def collect(self, test, testQueue, tasks, to_teardown, result):
# dispatch and collect results
# put indexes only on queue because tests aren't picklable
for case in self.nextBatch(test):
@@ -308,32 +293,76 @@ class MultiProcessTestRunner(TextTestRunner):
result.addError(case, sys.exc_info())
else:
to_teardown.append(case)
- for _t in case:
- test_addr = self.addtask(testQueue,tasks,_t)
- log.debug("Queued shared-fixture test %s (%s) to %s",
- len(tasks), test_addr, testQueue)
+ if case.factory:
+ ancestors=case.factory.context.get(case, [])
+ for an in ancestors[:2]:
+ #log.debug('reset ancestor %s', an)
+ if getattr(an, '_multiprocess_shared_', False):
+ an._multiprocess_can_split_=True
+ #an._multiprocess_shared_=False
+ self.collect(case, testQueue, tasks, to_teardown, result)
else:
test_addr = self.addtask(testQueue,tasks,case)
log.debug("Queued test %s (%s) to %s",
len(tasks), test_addr, testQueue)
+ def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result):
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
+ keyboardCaught = Event()
+ p = Process(target=runner,
+ args=(iworker, testQueue,
+ resultQueue,
+ currentaddr,
+ currentstart,
+ keyboardCaught,
+ shouldStop,
+ self.loaderClass,
+ result.__class__,
+ pickle.dumps(self.config)))
+ p.currentaddr = currentaddr
+ p.currentstart = currentstart
+ p.keyboardCaught = keyboardCaught
+ old = signal.signal(signal.SIGILL, signalhandler)
+ p.start()
+ signal.signal(signal.SIGILL, old)
+ return p
+
+ def run(self, test):
+ """
+ Execute the test (which may be a test suite). If the test is a suite,
+ distribute it out among as many processes as have been configured, at
+ as fine a level as is possible given the context fixtures defined in
+ the suite or any sub-suites.
+
+ """
+ log.debug("%s.run(%s) (%s)", self, test, os.getpid())
+ wrapper = self.config.plugins.prepareTest(test)
+ if wrapper is not None:
+ test = wrapper
+
+ # plugins can decorate or capture the output stream
+ wrapped = self.config.plugins.setOutputStream(self.stream)
+ if wrapped is not None:
+ self.stream = wrapped
+
+ testQueue = Queue()
+ resultQueue = Queue()
+ tasks = []
+ completed = []
+ workers = []
+ to_teardown = []
+ shouldStop = Event()
+
+ result = self._makeResult()
+ start = time.time()
+
+ self.collect(test, testQueue, tasks, to_teardown, result)
+
log.debug("Starting %s workers", self.config.multiprocess_workers)
for i in range(self.config.multiprocess_workers):
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',0.0)
- keyboardCaught = Event()
- p = Process(target=runner, args=(i, testQueue, resultQueue,
- currentaddr, currentstart,
- keyboardCaught, shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- p.currentaddr = currentaddr
- p.currentstart = currentstart
- p.keyboardCaught = keyboardCaught
- # p.setDaemon(True)
- p.start()
+ p = self.startProcess(i, testQueue, resultQueue, shouldStop, result)
workers.append(p)
log.debug("Started worker process %s", i+1)
@@ -341,162 +370,143 @@ class MultiProcessTestRunner(TextTestRunner):
# need to keep track of the next time to check for timeouts in case
# more than one process times out at the same time.
nexttimeout=self.config.multiprocess_timeout
- while tasks:
- log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
- len(completed), total_tasks,nexttimeout)
- try:
- iworker, addr, newtask_addrs, batch_result = resultQueue.get(
- timeout=nexttimeout)
- log.debug('Results received for worker %d, %s, new tasks: %d',
- iworker,addr,len(newtask_addrs))
+ thrownError = None
+
+ try:
+ while tasks:
+ log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
+ len(completed), total_tasks,nexttimeout)
try:
+ iworker, addr, newtask_addrs, batch_result = resultQueue.get(
+ timeout=nexttimeout)
+ log.debug('Results received for worker %d, %s, new tasks: %d',
+ iworker,addr,len(newtask_addrs))
try:
- tasks.remove(addr)
- except ValueError:
- log.warn('worker %s failed to remove from tasks: %s',
- iworker,addr)
- total_tasks += len(newtask_addrs)
- for newaddr in newtask_addrs:
- tasks.append(newaddr)
- except KeyError:
- log.debug("Got result for unknown task? %s", addr)
- log.debug("current: %s",str(list(tasks)[0]))
- else:
- completed.append([addr,batch_result])
- self.consolidate(result, batch_result)
- if (self.config.stopOnError
- and not result.wasSuccessful()):
- # set the stop condition
- shouldStop.set()
- break
- if self.config.multiprocess_restartworker:
- log.debug('joining worker %s',iworker)
- # wait for working, but not that important if worker
- # cannot be joined in fact, for workers that add to
- # testQueue, they will not terminate until all their
- # items are read
- workers[iworker].join(timeout=1)
- if not shouldStop.is_set() and not testQueue.empty():
- log.debug('starting new process on worker %s',iworker)
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',time.time())
- keyboardCaught = Event()
- workers[iworker] = Process(target=runner,
- args=(iworker, testQueue,
- resultQueue,
- currentaddr,
- currentstart,
- keyboardCaught,
- shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- workers[iworker].currentaddr = currentaddr
- workers[iworker].currentstart = currentstart
- workers[iworker].keyboardCaught = keyboardCaught
- workers[iworker].start()
- except Empty:
- log.debug("Timed out with %s tasks pending "
- "(empty testQueue=%d): %s",
- len(tasks),testQueue.empty(),str(tasks))
- any_alive = False
- for iworker, w in enumerate(workers):
- if w.is_alive():
- worker_addr = bytes_(w.currentaddr.value,'ascii')
- timeprocessing = time.time() - w.currentstart.value
- if ( len(worker_addr) == 0
+ try:
+ tasks.remove(addr)
+ except ValueError:
+ log.warn('worker %s failed to remove from tasks: %s',
+ iworker,addr)
+ total_tasks += len(newtask_addrs)
+ tasks.extend(newtask_addrs)
+ except KeyError:
+ log.debug("Got result for unknown task? %s", addr)
+ log.debug("current: %s",str(list(tasks)[0]))
+ else:
+ completed.append([addr,batch_result])
+ self.consolidate(result, batch_result)
+ if (self.config.stopOnError
+ and not result.wasSuccessful()):
+ # set the stop condition
+ shouldStop.set()
+ break
+ if self.config.multiprocess_restartworker:
+ log.debug('joining worker %s',iworker)
+ # wait for working, but not that important if worker
+ # cannot be joined in fact, for workers that add to
+ # testQueue, they will not terminate until all their
+ # items are read
+ workers[iworker].join(timeout=1)
+ if not shouldStop.is_set() and not testQueue.empty():
+ log.debug('starting new process on worker %s',iworker)
+ workers[iworker] = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
+ except Empty:
+ log.debug("Timed out with %s tasks pending "
+ "(empty testQueue=%r): %s",
+ len(tasks),testQueue.empty(),str(tasks))
+ any_alive = False
+ for iworker, w in enumerate(workers):
+ if w.is_alive():
+ worker_addr = bytes_(w.currentaddr.value,'ascii')
+ timeprocessing = time.time() - w.currentstart.value
+ if ( len(worker_addr) == 0
+ and timeprocessing > self.config.multiprocess_timeout-0.1):
+ log.debug('worker %d has finished its work item, '
+ 'but is not exiting? do we wait for it?',
+ iworker)
+ else:
+ any_alive = True
+ if (len(worker_addr) > 0
and timeprocessing > self.config.multiprocess_timeout-0.1):
- log.debug('worker %d has finished its work item, '
- 'but is not exiting? do we wait for it?',
- iworker)
- else:
- any_alive = True
- if (len(worker_addr) > 0
- and timeprocessing > self.config.multiprocess_timeout-0.1):
- log.debug('timed out worker %s: %s',
- iworker,worker_addr)
- w.currentaddr.value = bytes_('')
- # If the process is in C++ code, sending a SIGINT
- # might not send a python KeybordInterrupt exception
- # therefore, send multiple signals until an
- # exception is caught. If this takes too long, then
- # terminate the process
- w.keyboardCaught.clear()
- startkilltime = time.time()
- while not w.keyboardCaught.is_set() and w.is_alive():
- if time.time()-startkilltime > self.waitkilltime:
- # have to terminate...
- log.error("terminating worker %s",iworker)
- w.terminate()
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',time.time())
- keyboardCaught = Event()
- workers[iworker] = Process(target=runner,
- args=(iworker, testQueue, resultQueue,
- currentaddr, currentstart,
- keyboardCaught, shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- workers[iworker].currentaddr = currentaddr
- workers[iworker].currentstart = currentstart
- workers[iworker].keyboardCaught = keyboardCaught
- workers[iworker].start()
- # there is a small probability that the
- # terminated process might send a result,
- # which has to be specially handled or
- # else processes might get orphaned.
- w = workers[iworker]
- break
- os.kill(w.pid, signal.SIGINT)
- time.sleep(0.1)
- if not any_alive and testQueue.empty():
- log.debug("All workers dead")
- break
- nexttimeout=self.config.multiprocess_timeout
- for w in workers:
- if w.is_alive() and len(w.currentaddr.value) > 0:
- timeprocessing = time.time()-w.currentstart.value
- if timeprocessing <= self.config.multiprocess_timeout:
- nexttimeout = min(nexttimeout,
- self.config.multiprocess_timeout-timeprocessing)
-
- log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
-
- for case in to_teardown:
- log.debug("Tearing down shared fixtures for %s", case)
- try:
- case.tearDown()
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- result.addError(case, sys.exc_info())
+ log.debug('timed out worker %s: %s',
+ iworker,worker_addr)
+ w.currentaddr.value = bytes_('')
+ # If the process is in C++ code, sending a SIGILL
+ # might not send a python KeybordInterrupt exception
+ # therefore, send multiple signals until an
+ # exception is caught. If this takes too long, then
+ # terminate the process
+ w.keyboardCaught.clear()
+ startkilltime = time.time()
+ while not w.keyboardCaught.is_set() and w.is_alive():
+ if time.time()-startkilltime > self.waitkilltime:
+ # have to terminate...
+ log.error("terminating worker %s",iworker)
+ w.terminate()
+ # there is a small probability that the
+ # terminated process might send a result,
+ # which has to be specially handled or
+ # else processes might get orphaned.
+ workers[iworker] = w = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
+ break
+ os.kill(w.pid, signal.SIGILL)
+ time.sleep(0.1)
+ if not any_alive and testQueue.empty():
+ log.debug("All workers dead")
+ break
+ nexttimeout=self.config.multiprocess_timeout
+ for w in workers:
+ if w.is_alive() and len(w.currentaddr.value) > 0:
+ timeprocessing = time.time()-w.currentstart.value
+ if timeprocessing <= self.config.multiprocess_timeout:
+ nexttimeout = min(nexttimeout,
+ self.config.multiprocess_timeout-timeprocessing)
+ log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
+
+ except (KeyboardInterrupt, SystemExit), e:
+ log.info('parent received ctrl-c when waiting for test results')
+ thrownError = e
+ #resultQueue.get(False)
+
+ result.addError(test, sys.exc_info())
+
+ try:
+ for case in to_teardown:
+ log.debug("Tearing down shared fixtures for %s", case)
+ try:
+ case.tearDown()
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ result.addError(case, sys.exc_info())
- stop = time.time()
+ stop = time.time()
- # first write since can freeze on shutting down processes
- result.printErrors()
- result.printSummary(start, stop)
- self.config.plugins.finalize(result)
+ # first write since can freeze on shutting down processes
+ result.printErrors()
+ result.printSummary(start, stop)
+ self.config.plugins.finalize(result)
- log.debug("Tell all workers to stop")
- for w in workers:
- if w.is_alive():
- testQueue.put('STOP', block=False)
+ if thrownError is None:
+ log.debug("Tell all workers to stop")
+ for w in workers:
+ if w.is_alive():
+ testQueue.put('STOP', block=False)
- # wait for the workers to end
- try:
+ # wait for the workers to end
for iworker,worker in enumerate(workers):
if worker.is_alive():
log.debug('joining worker %s',iworker)
- worker.join()#10)
+ worker.join()
if worker.is_alive():
log.debug('failed to join worker %s',iworker)
- except KeyboardInterrupt:
- log.info('parent received ctrl-c')
+ except (KeyboardInterrupt, SystemExit):
+ log.info('parent received ctrl-c when shutting down: stop all processes')
for worker in workers:
- worker.terminate()
- worker.join()
+ if worker.is_alive():
+ worker.terminate()
+ if thrownError: raise thrownError
+ else: raise
return result
@@ -573,7 +583,7 @@ class MultiProcessTestRunner(TextTestRunner):
for batch in self.nextBatch(case):
yield batch
- def checkCanSplit(self, context, fixt):
+ def checkCanSplit(context, fixt):
"""
Callback that we use to check whether the fixtures found in a
context or ancestor are ones we care about.
@@ -587,6 +597,7 @@ class MultiProcessTestRunner(TextTestRunner):
if getattr(context, '_multiprocess_can_split_', False):
return False
return True
+ checkCanSplit = staticmethod(checkCanSplit)
def sharedFixtures(self, case):
context = getattr(case, 'context', None)
@@ -688,17 +699,28 @@ def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
test(result)
currentaddr.value = bytes_('')
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- except KeyboardInterrupt:
- keyboardCaught.set()
- if len(currentaddr.value) > 0:
- log.exception('Worker %s keyboard interrupt, failing '
- 'current test %s',ix,test_addr)
+ except KeyboardInterrupt, e: #TimedOutException:
+ timeout = isinstance(e, TimedOutException)
+ if timeout:
+ keyboardCaught.set()
+ if len(currentaddr.value):
+ if timeout:
+ msg = 'Worker %s timed out, failing current test %s'
+ else:
+ msg = 'Worker %s keyboard interrupt, failing current test %s'
+ log.exception(msg,ix,test_addr)
currentaddr.value = bytes_('')
failure.Failure(*sys.exc_info())(result)
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
else:
- log.debug('Worker %s test %s timed out',ix,test_addr)
+ if timeout:
+ msg = 'Worker %s test %s timed out'
+ else:
+ msg = 'Worker %s test %s keyboard interrupt'
+ log.debug(msg,ix,test_addr)
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ if not timeout:
+ raise
except SystemExit:
currentaddr.value = bytes_('')
log.exception('Worker %s system exit',ix)
@@ -746,6 +768,7 @@ class NoSharedFixtureContextSuite(ContextSuite):
else:
result, orig = result, result
try:
+ #log.debug('setUp for %s', id(self));
self.setUp()
except KeyboardInterrupt:
raise
@@ -754,42 +777,40 @@ class NoSharedFixtureContextSuite(ContextSuite):
result.addError(self, self._exc_info())
return
try:
- localtests = [test for test in self._tests]
- if len(localtests) > 1 and self.testQueue is not None:
- log.debug("queue %d tests"%len(localtests))
- for test in localtests:
- if isinstance(test.test,nose.failure.Failure):
- # proably failed in the generator, so execute directly
- # to get the exception
- test(orig)
- else:
- MultiProcessTestRunner.addtask(self.testQueue,
- self.tasks, test)
- else:
- for test in localtests:
- if (isinstance(test,nose.case.Test)
- and self.arg is not None):
- test.test.arg = self.arg
+ for test in self._tests:
+ if (isinstance(test,nose.case.Test)
+ and self.arg is not None):
+ test.test.arg = self.arg
+ else:
+ test.arg = self.arg
+ test.testQueue = self.testQueue
+ test.tasks = self.tasks
+ if result.shouldStop:
+ log.debug("stopping")
+ break
+ # each nose.case.Test will create its own result proxy
+ # so the cases need the original result, to avoid proxy
+ # chains
+ #log.debug('running test %s in suite %s', test, self);
+ try:
+ test(orig)
+ except KeyboardInterrupt, e:
+ timeout = isinstance(e, TimedOutException)
+ if timeout:
+ msg = 'Timeout when running test %s in suite %s'
else:
- test.arg = self.arg
- test.testQueue = self.testQueue
- test.tasks = self.tasks
- if result.shouldStop:
- log.debug("stopping")
- break
- # each nose.case.Test will create its own result proxy
- # so the cases need the original result, to avoid proxy
- # chains
- try:
- test(orig)
- except KeyboardInterrupt,e:
- err = (TimedOutException,TimedOutException(str(test)),
- sys.exc_info()[2])
- test.config.plugins.addError(test,err)
- orig.addError(test,err)
+ msg = 'KeyboardInterrupt when running test %s in suite %s'
+ log.debug(msg, test, self)
+ err = (TimedOutException,TimedOutException(str(test)),
+ sys.exc_info()[2])
+ test.config.plugins.addError(test,err)
+ orig.addError(test,err)
+ if not timeout:
+ raise
finally:
self.has_run = True
try:
+ #log.debug('tearDown for %s', id(self));
self.tearDown()
except KeyboardInterrupt:
raise
diff --git a/nose/plugins/plugintest.py b/nose/plugins/plugintest.py
index 68e8941..9eaebb4 100644
--- a/nose/plugins/plugintest.py
+++ b/nose/plugins/plugintest.py
@@ -3,8 +3,8 @@ Testing Plugins
===============
The plugin interface is well-tested enough to safely unit test your
-use of its hooks with some level of confidence. However, there is also
-a mixin for unittest.TestCase called PluginTester that's designed to
+use of its hooks with some level of confidence. However, there is also
+a mixin for unittest.TestCase called PluginTester that's designed to
test plugins in their native runtime environment.
Here's a simple example with a do-nothing plugin and a composed suite.
@@ -20,7 +20,7 @@ Here's a simple example with a do-nothing plugin and a composed suite.
... for line in self.output:
... # i.e. check for patterns
... pass
- ...
+ ...
... # or check for a line containing ...
... assert "ValueError" in self.output
... def makeSuite(self):
@@ -43,7 +43,7 @@ Here's a simple example with a do-nothing plugin and a composed suite.
And here is a more complex example of testing a plugin that has extra
arguments and reads environment variables.
-
+
>>> import unittest, os
>>> from nose.plugins import Plugin, PluginTester
>>> class FancyOutputter(Plugin):
@@ -57,21 +57,21 @@ arguments and reads environment variables.
... self.fanciness = 2
... if 'EVEN_FANCIER' in self.env:
... self.fanciness = 3
- ...
+ ...
... def options(self, parser, env=os.environ):
... self.env = env
... parser.add_option('--more-fancy', action='store_true')
... Plugin.options(self, parser, env=env)
- ...
+ ...
... def report(self, stream):
... stream.write("FANCY " * self.fanciness)
- ...
+ ...
>>> class TestFancyOutputter(PluginTester, unittest.TestCase):
... activate = '--with-fancy' # enables the plugin
... plugins = [FancyOutputter()]
... args = ['--more-fancy']
... env = {'EVEN_FANCIER': '1'}
- ...
+ ...
... def test_fancy_output(self):
... assert "FANCY FANCY FANCY" in self.output, (
... "got: %s" % self.output)
@@ -80,7 +80,7 @@ arguments and reads environment variables.
... def runTest(self):
... raise ValueError("I hate fancy stuff")
... return unittest.TestSuite([TC()])
- ...
+ ...
>>> res = unittest.TestResult()
>>> case = TestFancyOutputter('test_fancy_output')
>>> case(res)
@@ -169,7 +169,7 @@ class MultiProcessFile(object):
return self.__buffer.getvalue()
def __getattr__(self, attr):
return getattr(self.__buffer, attr)
-
+
try:
from multiprocessing import Manager
Buffer = MultiProcessFile
@@ -178,35 +178,35 @@ except ImportError:
class PluginTester(object):
"""A mixin for testing nose plugins in their runtime environment.
-
- Subclass this and mix in unittest.TestCase to run integration/functional
- tests on your plugin. When setUp() is called, the stub test suite is
- executed with your plugin so that during an actual test you can inspect the
+
+ Subclass this and mix in unittest.TestCase to run integration/functional
+ tests on your plugin. When setUp() is called, the stub test suite is
+ executed with your plugin so that during an actual test you can inspect the
artifacts of how your plugin interacted with the stub test suite.
-
+
- activate
-
+
- the argument to send nosetests to activate the plugin
-
+
- suitepath
-
+
- if set, this is the path of the suite to test. Otherwise, you
will need to use the hook, makeSuite()
-
+
- plugins
- the list of plugins to make available during the run. Note
that this does not mean these plugins will be *enabled* during
the run -- only the plugins enabled by the activate argument
or other settings in argv or env will be enabled.
-
+
- args
-
+
- a list of arguments to add to the nosetests command, in addition to
the activate argument
-
+
- env
-
+
- optional dict of environment variables to send nosetests
"""
@@ -217,34 +217,34 @@ class PluginTester(object):
argv = None
plugins = []
ignoreFiles = None
-
+
def makeSuite(self):
"""returns a suite object of tests to run (unittest.TestSuite())
-
- If self.suitepath is None, this must be implemented. The returned suite
- object will be executed with all plugins activated. It may return
+
+ If self.suitepath is None, this must be implemented. The returned suite
+ object will be executed with all plugins activated. It may return
None.
-
+
Here is an example of a basic suite object you can return ::
-
+
>>> import unittest
>>> class SomeTest(unittest.TestCase):
... def runTest(self):
... raise ValueError("Now do something, plugin!")
- ...
+ ...
>>> unittest.TestSuite([SomeTest()]) # doctest: +ELLIPSIS
<unittest...TestSuite tests=[<...SomeTest testMethod=runTest>]>
-
+
"""
raise NotImplementedError
-
+
def _execPlugin(self):
"""execute the plugin on the internal test suite.
"""
from nose.config import Config
from nose.core import TestProgram
from nose.plugins.manager import PluginManager
-
+
suite = None
stream = Buffer()
conf = Config(env=self.env,
@@ -254,20 +254,20 @@ class PluginTester(object):
conf.ignoreFiles = self.ignoreFiles
if not self.suitepath:
suite = self.makeSuite()
-
+
self.nose = TestProgram(argv=self.argv, config=conf, suite=suite,
exit=False)
self.output = AccessDecorator(stream)
-
+
def setUp(self):
- """runs nosetests with the specified test suite, all plugins
+ """runs nosetests with the specified test suite, all plugins
activated.
"""
self.argv = ['nosetests', self.activate]
if self.args:
self.argv.extend(self.args)
if self.suitepath:
- self.argv.append(self.suitepath)
+ self.argv.append(self.suitepath)
self._execPlugin()
@@ -379,7 +379,7 @@ def run(*arg, **kw):
if 'argv' not in kw:
kw['argv'] = ['nosetests', '-v']
kw['config'].stream = buffer
-
+
# Set up buffering so that all output goes to our buffer,
# or warn user if deprecated behavior is active. If this is not
# done, prints and warnings will either be out of place or
@@ -406,7 +406,7 @@ def run(*arg, **kw):
out = buffer.getvalue()
print munge_nose_output_for_doctest(out)
-
+
def run_buffered(*arg, **kw):
kw['buffer_all'] = True
run(*arg, **kw)
diff --git a/nosetests.1 b/nosetests.1
index 36982b7..322e2c3 100644
--- a/nosetests.1
+++ b/nosetests.1
@@ -491,5 +491,5 @@ jpellerin+nose@gmail.com
.SH COPYRIGHT
LGPL
-.\" Generated by docutils manpage writer on 2011-10-10 09:32.
+.\" Generated by docutils manpage writer on 2011-11-19 18:57.
.\"