summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkumar <kumar.mcmillan@gmail.com>2011-11-11 11:53:23 -0600
committerkumar <kumar.mcmillan@gmail.com>2011-11-11 11:53:23 -0600
commit38a976ccaeef112f988058ebbac7833f7bcecfa1 (patch)
treefa2f8546e4923a5a197ced166b6ffb0a8efbeff0
parentc3a9d056a88ffd05330d702192434dae7b23e385 (diff)
parent0f3ff413655950a3de8507f41575aec025e4b234 (diff)
downloadnose-38a976ccaeef112f988058ebbac7833f7bcecfa1.tar.gz
Merged
-rw-r--r--CHANGELOG3
-rw-r--r--README.txt16
-rw-r--r--functional_tests/test_multiprocessing/__init__.py28
-rw-r--r--functional_tests/test_multiprocessing/support/class.py14
-rw-r--r--functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py6
-rw-r--r--functional_tests/test_multiprocessing/support/concurrent_shared/test.py11
-rw-r--r--functional_tests/test_multiprocessing/support/fake_nosetest.py14
-rw-r--r--functional_tests/test_multiprocessing/support/keyboardinterrupt.py29
-rw-r--r--functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py34
-rw-r--r--functional_tests/test_multiprocessing/support/timeout.py6
-rw-r--r--functional_tests/test_multiprocessing/test_class.py13
-rw-r--r--functional_tests/test_multiprocessing/test_concurrent_shared.py13
-rw-r--r--functional_tests/test_multiprocessing/test_keyboardinterrupt.py84
-rw-r--r--functional_tests/test_multiprocessing/test_nameerror.py26
-rw-r--r--functional_tests/test_multiprocessing/test_process_timeout.py30
-rw-r--r--nose/plugins/cover.py16
-rw-r--r--nose/plugins/multiprocess.py483
-rw-r--r--nosetests.117
18 files changed, 564 insertions, 279 deletions
diff --git a/CHANGELOG b/CHANGELOG
index 31f3a91..abb73c3 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -7,9 +7,12 @@
1.1.3
+- Adds :option:`--cover-xml` and :option:`--cover-xml-file` (#311).
+ Patch by Timothée Peignier.
- Adds support for :option:`--cover-branches` (related to #370).
Patch by Timothée Peignier.
- Fixed Unicode issue on Python 3.1 with coverage (#442)
+- fixed class level fixture handling in multiprocessing plugin
1.1.2
diff --git a/README.txt b/README.txt
index 1b578da..f8a057f 100644
--- a/README.txt
+++ b/README.txt
@@ -107,7 +107,7 @@ In addition to passing command-line options, you may also put
configuration options in your project's *setup.cfg* file, or a .noserc
or nose.cfg file in your home directory. In any of these standard
.ini-style config files, you put your nosetests configuration in a
-``[nosetests]`` section. Options are the same as on the command line,
+"[nosetests]" section. Options are the same as on the command line,
with the -- prefix removed. For options that are simple switches, you
must supply a value:
@@ -117,7 +117,7 @@ must supply a value:
All configuration files that are found will be loaded and their
options combined. You can override the standard config file loading
-with the ``-c`` option.
+with the "-c" option.
Using Plugins
@@ -353,6 +353,18 @@ Options
Produce HTML coverage information in dir
+--cover-branches
+
+ Include branch coverage in coverage report [NOSE_COVER_BRANCHES]
+
+--cover-xml
+
+ Produce XML coverage information
+
+--cover-xml-file=FILE
+
+ Produce XML coverage information in file
+
--pdb
Drop into debugger on errors
diff --git a/functional_tests/test_multiprocessing/__init__.py b/functional_tests/test_multiprocessing/__init__.py
new file mode 100644
index 0000000..2a52efd
--- /dev/null
+++ b/functional_tests/test_multiprocessing/__init__.py
@@ -0,0 +1,28 @@
+import os
+from unittest import TestCase
+
+from nose.plugins import PluginTester
+from nose.plugins.skip import SkipTest
+from nose.plugins.multiprocess import MultiProcess
+
+support = os.path.join(os.path.dirname(__file__), 'support')
+
+def setup():
+ try:
+ import multiprocessing
+ if 'active' in MultiProcess.status:
+ raise SkipTest("Multiprocess plugin is active. Skipping tests of "
+ "plugin itself.")
+ except ImportError:
+ raise SkipTest("multiprocessing module not available")
+
+class MPTestBase(PluginTester, TestCase):
+ processes = 1
+ activate = '--processes=1'
+ plugins = [MultiProcess()]
+ suitepath = os.path.join(support, 'timeout.py')
+
+ def __init__(self, *args, **kwargs):
+ self.activate = '--processes=%d' % self.processes
+ PluginTester.__init__(self)
+ TestCase.__init__(self, *args, **kwargs)
diff --git a/functional_tests/test_multiprocessing/support/class.py b/functional_tests/test_multiprocessing/support/class.py
new file mode 100644
index 0000000..905bcdf
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/class.py
@@ -0,0 +1,14 @@
+class TestFunctionalTest(object):
+ counter = 0
+ @classmethod
+ def setup_class(cls):
+ cls.counter += 1
+ @classmethod
+ def teardown_class(cls):
+ cls.counter -= 1
+ def _run(self):
+ assert self.counter==1
+ def test1(self):
+ self._run()
+ def test2(self):
+ self._run()
diff --git a/functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py b/functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py
new file mode 100644
index 0000000..c557001
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py
@@ -0,0 +1,6 @@
+counter=[0]
+_multiprocess_shared_ = True
+def setup_package():
+ counter[0] += 1
+def teardown_package():
+ counter[0] -= 1
diff --git a/functional_tests/test_multiprocessing/support/concurrent_shared/test.py b/functional_tests/test_multiprocessing/support/concurrent_shared/test.py
new file mode 100644
index 0000000..5bc21f6
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/concurrent_shared/test.py
@@ -0,0 +1,11 @@
+#from . import counter
+from time import sleep
+#_multiprocess_can_split_ = True
+class Test1(object):
+ def test1(self):
+ sleep(1)
+ pass
+class Test2(object):
+ def test2(self):
+ sleep(1)
+ pass
diff --git a/functional_tests/test_multiprocessing/support/fake_nosetest.py b/functional_tests/test_multiprocessing/support/fake_nosetest.py
new file mode 100644
index 0000000..f5a6289
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/fake_nosetest.py
@@ -0,0 +1,14 @@
+import os
+import sys
+
+import nose
+from nose.plugins.multiprocess import MultiProcess
+from nose.config import Config
+from nose.plugins.manager import PluginManager
+
+if __name__ == '__main__':
+ if len(sys.argv) < 3:
+ print "USAGE: %s TEST_FILE LOG_FILE" % sys.argv[0]
+ sys.exit(1)
+ os.environ['NOSE_MP_LOG']=sys.argv[2]
+ nose.main(defaultTest=sys.argv[1], argv=[sys.argv[0],'--processes=1','-v'], config=Config(plugins=PluginManager(plugins=[MultiProcess()])))
diff --git a/functional_tests/test_multiprocessing/support/keyboardinterrupt.py b/functional_tests/test_multiprocessing/support/keyboardinterrupt.py
new file mode 100644
index 0000000..2c36d95
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/keyboardinterrupt.py
@@ -0,0 +1,29 @@
+import os
+
+from tempfile import mktemp
+from time import sleep
+
+if 'NOSE_MP_LOG' not in os.environ:
+ raise Exception('Environment variable NOSE_MP_LOG is not set')
+
+logfile = os.environ['NOSE_MP_LOG']
+
+def log(w):
+ f = open(logfile, 'a')
+ f.write(w+"\n")
+ f.close()
+#make sure all tests in this file are dispatched to the same subprocess
+def setup():
+ log('setup')
+
+def test_timeout():
+ log('test_timeout')
+ sleep(2)
+ log('test_timeout_finished')
+
+# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run
+def test_pass():
+ log('test_pass')
+
+def teardown():
+ log('teardown')
diff --git a/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py b/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py
new file mode 100644
index 0000000..3932bbd
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py
@@ -0,0 +1,34 @@
+import os
+
+from tempfile import mktemp
+from time import sleep
+
+if 'NOSE_MP_LOG' not in os.environ:
+ raise Exception('Environment variable NOSE_MP_LOG is not set')
+
+logfile = os.environ['NOSE_MP_LOG']
+
+def log(w):
+ f = open(logfile, 'a')
+ f.write(w+"\n")
+ f.close()
+#make sure all tests in this file are dispatched to the same subprocess
+def setup():
+ '''global logfile
+ logfile = mktemp()
+ print "tempfile is:",logfile'''
+ log('setup')
+
+def test_timeout():
+ log('test_timeout')
+ sleep(2)
+ log('test_timeout_finished')
+
+# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run
+def test_pass():
+ log('test_pass')
+
+def teardown():
+ log('teardown')
+ sleep(10)
+ log('teardown_finished')
diff --git a/functional_tests/test_multiprocessing/support/timeout.py b/functional_tests/test_multiprocessing/support/timeout.py
index 52dce12..480c859 100644
--- a/functional_tests/test_multiprocessing/support/timeout.py
+++ b/functional_tests/test_multiprocessing/support/timeout.py
@@ -1,6 +1,12 @@
+#make sure all tests in this file are dispatched to the same subprocess
+def setup():
+ pass
def test_timeout():
"this test *should* fail when process-timeout=1"
from time import sleep
sleep(2)
+# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run
+def test_pass():
+ pass
diff --git a/functional_tests/test_multiprocessing/test_class.py b/functional_tests/test_multiprocessing/test_class.py
new file mode 100644
index 0000000..d92710d
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_class.py
@@ -0,0 +1,13 @@
+import os
+
+from test_multiprocessing import MPTestBase
+
+
+#test case for #462
+class TestClassFixture(MPTestBase):
+ suitepath = os.path.join(os.path.dirname(__file__), 'support', 'class.py')
+
+ def runTest(self):
+ assert str(self.output).strip().endswith('OK')
+ assert 'Ran 2 tests' in self.output
+
diff --git a/functional_tests/test_multiprocessing/test_concurrent_shared.py b/functional_tests/test_multiprocessing/test_concurrent_shared.py
new file mode 100644
index 0000000..2552c2b
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_concurrent_shared.py
@@ -0,0 +1,13 @@
+import os
+
+from test_multiprocessing import MPTestBase
+
+class TestConcurrentShared(MPTestBase):
+ processes = 2
+ suitepath = os.path.join(os.path.dirname(__file__), 'support',
+ 'concurrent_shared')
+
+ def runTest(self):
+ assert 'Ran 2 tests in 1.' in self.output, "make sure two tests use 1.x seconds (no more than 2 seconsd)"
+ assert str(self.output).strip().endswith('OK')
+
diff --git a/functional_tests/test_multiprocessing/test_keyboardinterrupt.py b/functional_tests/test_multiprocessing/test_keyboardinterrupt.py
new file mode 100644
index 0000000..8f07e54
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_keyboardinterrupt.py
@@ -0,0 +1,84 @@
+from subprocess import Popen,PIPE
+import os
+import sys
+from time import sleep
+import signal
+
+import nose
+
+support = os.path.join(os.path.dirname(__file__), 'support')
+
+PYTHONPATH = os.environ['PYTHONPATH'] if 'PYTHONPATH' in os.environ else ''
+def setup():
+ nose_parent_dir = os.path.normpath(os.path.join(os.path.abspath(os.path.dirname(nose.__file__)),'..'))
+ paths = [nose_parent_dir]
+ if PYTHONPATH:
+ paths.append(PYTHONPATH)
+ os.environ['PYTHONPATH'] = os.pathsep.join(paths)
+def teardown():
+ if PYTHONPATH:
+ os.environ['PYTHONPATH'] = PYTHONPATH
+ else:
+ del os.environ['PYTHONPATH']
+
+runner = os.path.join(support, 'fake_nosetest.py')
+def keyboardinterrupt(case):
+ #os.setsid would create a process group so signals sent to the
+ #parent process will propogates to all children processes
+ from tempfile import mktemp
+ logfile = mktemp()
+ process = Popen([sys.executable,runner,os.path.join(support,case),logfile], preexec_fn=os.setsid, stdout=PIPE, stderr=PIPE, bufsize=-1)
+
+ #wait until logfile is created:
+ retry=100
+ while not os.path.exists(logfile):
+ sleep(0.1)
+ retry -= 1
+ if not retry:
+ raise Exception('Timeout while waiting for log file to be created by fake_nosetest.py')
+
+ os.killpg(process.pid, signal.SIGINT)
+ return process, logfile
+
+def get_log_content(logfile):
+ '''prefix = 'tempfile is: '
+ if not stdout.startswith(prefix):
+ raise Exception('stdout does not contain tmp file name: '+stdout)
+ logfile = stdout[len(prefix):].strip() #remove trailing new line char'''
+ f = open(logfile)
+ content = f.read()
+ f.close()
+ os.remove(logfile)
+ return content
+
+def test_keyboardinterrupt():
+ process, logfile = keyboardinterrupt('keyboardinterrupt.py')
+ stdout, stderr = [s.decode('utf-8') for s in process.communicate(None)]
+ print stderr
+ log = get_log_content(logfile)
+ assert 'setup' in log
+ assert 'test_timeout' in log
+ assert 'test_timeout_finished' not in log
+ assert 'test_pass' not in log
+ assert 'teardown' in log
+ assert 'Ran 0 tests' in stderr
+ assert 'KeyboardInterrupt' in stderr
+ assert 'FAILED (errors=1)' in stderr
+ assert 'ERROR: Worker 0 keyboard interrupt, failing current test '+os.path.join(support,'keyboardinterrupt.py') in stderr
+
+
+def test_keyboardinterrupt_twice():
+ process, logfile = keyboardinterrupt('keyboardinterrupt_twice.py')
+ sleep(0.5)
+ os.killpg(process.pid, signal.SIGINT)
+ stdout, stderr = [s.decode('utf-8') for s in process.communicate(None)]
+ log = get_log_content(logfile)
+ assert 'setup' in log
+ assert 'test_timeout' in log
+ assert 'test_timeout_finished' not in log
+ assert 'test_pass' not in log
+ assert 'teardown' in log
+ assert 'teardown_finished' not in log
+ assert 'Ran 0 tests' in stderr
+ assert 'KeyboardInterrupt' in stderr
+ assert 'FAILED (errors=1)' in stderr
diff --git a/functional_tests/test_multiprocessing/test_nameerror.py b/functional_tests/test_multiprocessing/test_nameerror.py
index f73d02b..5e58226 100644
--- a/functional_tests/test_multiprocessing/test_nameerror.py
+++ b/functional_tests/test_multiprocessing/test_nameerror.py
@@ -1,28 +1,10 @@
import os
-import unittest
-from nose.plugins import PluginTester
-from nose.plugins.skip import SkipTest
-from nose.plugins.multiprocess import MultiProcess
+from test_multiprocessing import support, MPTestBase
-
-support = os.path.join(os.path.dirname(__file__), 'support')
-
-
-def setup():
- try:
- import multiprocessing
- if 'active' in MultiProcess.status:
- raise SkipTest("Multiprocess plugin is active. Skipping tests of "
- "plugin itself.")
- except ImportError:
- raise SkipTest("multiprocessing module not available")
-
-
-class TestMPNameError(PluginTester, unittest.TestCase):
- activate = '--processes=2'
- plugins = [MultiProcess()]
- suitepath = os.path.join(support, 'nameerror.py')
+class TestMPNameError(MPTestBase):
+ processes = 2
+ suitepath = os.path.join(os.path.dirname(__file__), 'support', 'nameerror.py')
def runTest(self):
print str(self.output)
diff --git a/functional_tests/test_multiprocessing/test_process_timeout.py b/functional_tests/test_multiprocessing/test_process_timeout.py
index 535ecdb..6b858f8 100644
--- a/functional_tests/test_multiprocessing/test_process_timeout.py
+++ b/functional_tests/test_multiprocessing/test_process_timeout.py
@@ -1,37 +1,21 @@
import os
-import unittest
-from nose.plugins import PluginTester
-from nose.plugins.skip import SkipTest
-from nose.plugins.multiprocess import MultiProcess
+from test_multiprocessing import MPTestBase
-support = os.path.join(os.path.dirname(__file__), 'support')
-
-
-def setup():
- try:
- import multiprocessing
- if 'active' in MultiProcess.status:
- raise SkipTest("Multiprocess plugin is active. Skipping tests of "
- "plugin itself.")
- except ImportError:
- raise SkipTest("multiprocessing module not available")
-
-
-
-class TestMPTimeout(PluginTester, unittest.TestCase):
- activate = '--processes=2'
+class TestMPTimeout(MPTestBase):
args = ['--process-timeout=1']
- plugins = [MultiProcess()]
- suitepath = os.path.join(support, 'timeout.py')
+ suitepath = os.path.join(os.path.dirname(__file__), 'support', 'timeout.py')
def runTest(self):
assert "TimedOutException: 'timeout.test_timeout'" in self.output
-
+ assert "Ran 2 tests in" in self.output
+ assert "FAILED (errors=1)" in self.output
class TestMPTimeoutPass(TestMPTimeout):
args = ['--process-timeout=3']
def runTest(self):
assert "TimedOutException: 'timeout.test_timeout'" not in self.output
+ assert "Ran 2 tests in" in self.output
assert str(self.output).strip().endswith('OK')
+
diff --git a/nose/plugins/cover.py b/nose/plugins/cover.py
index 8be1fe2..a055c92 100644
--- a/nose/plugins/cover.py
+++ b/nose/plugins/cover.py
@@ -113,6 +113,15 @@ class Coverage(Plugin):
dest="cover_branches",
help="Include branch coverage in coverage report "
"[NOSE_COVER_BRANCHES]")
+ parser.add_option("--cover-xml", action="store_true",
+ default=env.get('NOSE_COVER_XML'),
+ dest="cover_xml",
+ help="Produce XML coverage information")
+ parser.add_option("--cover-xml-file", action="store",
+ default=env.get('NOSE_COVER_XML_FILE', 'coverage.xml'),
+ dest="cover_xml_file",
+ metavar="FILE",
+ help="Produce XML coverage information in file")
def configure(self, options, config):
"""
@@ -149,6 +158,10 @@ class Coverage(Plugin):
self.coverHtmlDir = options.cover_html_dir
log.debug('Will put HTML coverage report in %s', self.coverHtmlDir)
self.coverBranches = options.cover_branches
+ self.coverXmlFile = None
+ if options.cover_xml:
+ self.coverXmlFile = options.cover_xml_file
+ log.debug('Will put XML coverage report in %s', self.coverXmlFile)
if self.enabled:
self.status['active'] = True
@@ -182,6 +195,9 @@ class Coverage(Plugin):
self.coverInstance.html_report(modules, self.coverHtmlDir)
else:
self.report_html(modules)
+ if self.coverXmlFile:
+ log.debug("Generating XML coverage report")
+ self.coverInstance.xml_report(modules, self.coverXmlFile)
def report_html(self, modules):
if not os.path.exists(self.coverHtmlDir):
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py
index 260cbf8..48ba54d 100644
--- a/nose/plugins/multiprocess.py
+++ b/nose/plugins/multiprocess.py
@@ -130,7 +130,8 @@ log = logging.getLogger(__name__)
Process = Queue = Pool = Event = Value = Array = None
-class TimedOutException(Exception):
+# have to inherit KeyboardInterrupt to it will interrupt process properly
+class TimedOutException(KeyboardInterrupt):
def __init__(self, value = "Timed Out"):
self.value = value
def __str__(self):
@@ -140,7 +141,16 @@ def _import_mp():
global Process, Queue, Pool, Event, Value, Array
try:
from multiprocessing import Manager, Process
+ #prevent the server process created in the manager which holds Python
+ #objects and allows other processes to manipulate them using proxies
+ #to interrupt on SIGINT (keyboardinterrupt) so that the communication
+ #channel between subprocesses and main process is still usable after
+ #ctrl+C is received in the main process.
+ old=signal.signal(signal.SIGINT, signal.SIG_IGN)
m = Manager()
+ #reset it back so main process will receive a KeyboardInterrupt
+ #exception on ctrl+c
+ signal.signal(signal.SIGINT, old)
Queue, Pool, Event, Value, Array = (
m.Queue, m.Pool, m.Event, m.Value, m.Array
)
@@ -247,42 +257,17 @@ class MultiProcess(Plugin):
config=self.config,
loaderClass=self.loaderClass)
+def signalhandler(sig, frame):
+ raise TimedOutException()
+
class MultiProcessTestRunner(TextTestRunner):
waitkilltime = 5.0 # max time to wait to terminate a process that does not
- # respond to SIGINT
+ # respond to SIGILL
def __init__(self, **kw):
self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
super(MultiProcessTestRunner, self).__init__(**kw)
- def run(self, test):
- """
- Execute the test (which may be a test suite). If the test is a suite,
- distribute it out among as many processes as have been configured, at
- as fine a level as is possible given the context fixtures defined in
- the suite or any sub-suites.
-
- """
- log.debug("%s.run(%s) (%s)", self, test, os.getpid())
- wrapper = self.config.plugins.prepareTest(test)
- if wrapper is not None:
- test = wrapper
-
- # plugins can decorate or capture the output stream
- wrapped = self.config.plugins.setOutputStream(self.stream)
- if wrapped is not None:
- self.stream = wrapped
-
- testQueue = Queue()
- resultQueue = Queue()
- tasks = []
- completed = []
- workers = []
- to_teardown = []
- shouldStop = Event()
-
- result = self._makeResult()
- start = time.time()
-
+ def collect(self, test, testQueue, tasks, to_teardown, result):
# dispatch and collect results
# put indexes only on queue because tests aren't picklable
for case in self.nextBatch(test):
@@ -308,32 +293,76 @@ class MultiProcessTestRunner(TextTestRunner):
result.addError(case, sys.exc_info())
else:
to_teardown.append(case)
- for _t in case:
- test_addr = self.addtask(testQueue,tasks,_t)
- log.debug("Queued shared-fixture test %s (%s) to %s",
- len(tasks), test_addr, testQueue)
+ if case.factory:
+ ancestors=case.factory.context.get(case, [])
+ for an in ancestors[:2]:
+ #log.debug('reset ancestor %s', an)
+ if getattr(an, '_multiprocess_shared_', False):
+ an._multiprocess_can_split_=True
+ #an._multiprocess_shared_=False
+ self.collect(case, testQueue, tasks, to_teardown, result)
else:
test_addr = self.addtask(testQueue,tasks,case)
log.debug("Queued test %s (%s) to %s",
len(tasks), test_addr, testQueue)
+ def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result):
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
+ keyboardCaught = Event()
+ p = Process(target=runner,
+ args=(iworker, testQueue,
+ resultQueue,
+ currentaddr,
+ currentstart,
+ keyboardCaught,
+ shouldStop,
+ self.loaderClass,
+ result.__class__,
+ pickle.dumps(self.config)))
+ p.currentaddr = currentaddr
+ p.currentstart = currentstart
+ p.keyboardCaught = keyboardCaught
+ old = signal.signal(signal.SIGILL, signalhandler)
+ p.start()
+ signal.signal(signal.SIGILL, old)
+ return p
+
+ def run(self, test):
+ """
+ Execute the test (which may be a test suite). If the test is a suite,
+ distribute it out among as many processes as have been configured, at
+ as fine a level as is possible given the context fixtures defined in
+ the suite or any sub-suites.
+
+ """
+ log.debug("%s.run(%s) (%s)", self, test, os.getpid())
+ wrapper = self.config.plugins.prepareTest(test)
+ if wrapper is not None:
+ test = wrapper
+
+ # plugins can decorate or capture the output stream
+ wrapped = self.config.plugins.setOutputStream(self.stream)
+ if wrapped is not None:
+ self.stream = wrapped
+
+ testQueue = Queue()
+ resultQueue = Queue()
+ tasks = []
+ completed = []
+ workers = []
+ to_teardown = []
+ shouldStop = Event()
+
+ result = self._makeResult()
+ start = time.time()
+
+ self.collect(test, testQueue, tasks, to_teardown, result)
+
log.debug("Starting %s workers", self.config.multiprocess_workers)
for i in range(self.config.multiprocess_workers):
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',0.0)
- keyboardCaught = Event()
- p = Process(target=runner, args=(i, testQueue, resultQueue,
- currentaddr, currentstart,
- keyboardCaught, shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- p.currentaddr = currentaddr
- p.currentstart = currentstart
- p.keyboardCaught = keyboardCaught
- # p.setDaemon(True)
- p.start()
+ p = self.startProcess(i, testQueue, resultQueue, shouldStop, result)
workers.append(p)
log.debug("Started worker process %s", i+1)
@@ -341,162 +370,143 @@ class MultiProcessTestRunner(TextTestRunner):
# need to keep track of the next time to check for timeouts in case
# more than one process times out at the same time.
nexttimeout=self.config.multiprocess_timeout
- while tasks:
- log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
- len(completed), total_tasks,nexttimeout)
- try:
- iworker, addr, newtask_addrs, batch_result = resultQueue.get(
- timeout=nexttimeout)
- log.debug('Results received for worker %d, %s, new tasks: %d',
- iworker,addr,len(newtask_addrs))
+ thrownError = None
+
+ try:
+ while tasks:
+ log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
+ len(completed), total_tasks,nexttimeout)
try:
+ iworker, addr, newtask_addrs, batch_result = resultQueue.get(
+ timeout=nexttimeout)
+ log.debug('Results received for worker %d, %s, new tasks: %d',
+ iworker,addr,len(newtask_addrs))
try:
- tasks.remove(addr)
- except ValueError:
- log.warn('worker %s failed to remove from tasks: %s',
- iworker,addr)
- total_tasks += len(newtask_addrs)
- for newaddr in newtask_addrs:
- tasks.append(newaddr)
- except KeyError:
- log.debug("Got result for unknown task? %s", addr)
- log.debug("current: %s",str(list(tasks)[0]))
- else:
- completed.append([addr,batch_result])
- self.consolidate(result, batch_result)
- if (self.config.stopOnError
- and not result.wasSuccessful()):
- # set the stop condition
- shouldStop.set()
- break
- if self.config.multiprocess_restartworker:
- log.debug('joining worker %s',iworker)
- # wait for working, but not that important if worker
- # cannot be joined in fact, for workers that add to
- # testQueue, they will not terminate until all their
- # items are read
- workers[iworker].join(timeout=1)
- if not shouldStop.is_set() and not testQueue.empty():
- log.debug('starting new process on worker %s',iworker)
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',time.time())
- keyboardCaught = Event()
- workers[iworker] = Process(target=runner,
- args=(iworker, testQueue,
- resultQueue,
- currentaddr,
- currentstart,
- keyboardCaught,
- shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- workers[iworker].currentaddr = currentaddr
- workers[iworker].currentstart = currentstart
- workers[iworker].keyboardCaught = keyboardCaught
- workers[iworker].start()
- except Empty:
- log.debug("Timed out with %s tasks pending "
- "(empty testQueue=%d): %s",
- len(tasks),testQueue.empty(),str(tasks))
- any_alive = False
- for iworker, w in enumerate(workers):
- if w.is_alive():
- worker_addr = bytes_(w.currentaddr.value,'ascii')
- timeprocessing = time.time() - w.currentstart.value
- if ( len(worker_addr) == 0
+ try:
+ tasks.remove(addr)
+ except ValueError:
+ log.warn('worker %s failed to remove from tasks: %s',
+ iworker,addr)
+ total_tasks += len(newtask_addrs)
+ tasks.extend(newtask_addrs)
+ except KeyError:
+ log.debug("Got result for unknown task? %s", addr)
+ log.debug("current: %s",str(list(tasks)[0]))
+ else:
+ completed.append([addr,batch_result])
+ self.consolidate(result, batch_result)
+ if (self.config.stopOnError
+ and not result.wasSuccessful()):
+ # set the stop condition
+ shouldStop.set()
+ break
+ if self.config.multiprocess_restartworker:
+ log.debug('joining worker %s',iworker)
+ # wait for working, but not that important if worker
+ # cannot be joined in fact, for workers that add to
+ # testQueue, they will not terminate until all their
+ # items are read
+ workers[iworker].join(timeout=1)
+ if not shouldStop.is_set() and not testQueue.empty():
+ log.debug('starting new process on worker %s',iworker)
+ workers[iworker] = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
+ except Empty:
+ log.debug("Timed out with %s tasks pending "
+ "(empty testQueue=%r): %s",
+ len(tasks),testQueue.empty(),str(tasks))
+ any_alive = False
+ for iworker, w in enumerate(workers):
+ if w.is_alive():
+ worker_addr = bytes_(w.currentaddr.value,'ascii')
+ timeprocessing = time.time() - w.currentstart.value
+ if ( len(worker_addr) == 0
+ and timeprocessing > self.config.multiprocess_timeout-0.1):
+ log.debug('worker %d has finished its work item, '
+ 'but is not exiting? do we wait for it?',
+ iworker)
+ else:
+ any_alive = True
+ if (len(worker_addr) > 0
and timeprocessing > self.config.multiprocess_timeout-0.1):
- log.debug('worker %d has finished its work item, '
- 'but is not exiting? do we wait for it?',
- iworker)
- else:
- any_alive = True
- if (len(worker_addr) > 0
- and timeprocessing > self.config.multiprocess_timeout-0.1):
- log.debug('timed out worker %s: %s',
- iworker,worker_addr)
- w.currentaddr.value = bytes_('')
- # If the process is in C++ code, sending a SIGINT
- # might not send a python KeybordInterrupt exception
- # therefore, send multiple signals until an
- # exception is caught. If this takes too long, then
- # terminate the process
- w.keyboardCaught.clear()
- startkilltime = time.time()
- while not w.keyboardCaught.is_set() and w.is_alive():
- if time.time()-startkilltime > self.waitkilltime:
- # have to terminate...
- log.error("terminating worker %s",iworker)
- w.terminate()
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',time.time())
- keyboardCaught = Event()
- workers[iworker] = Process(target=runner,
- args=(iworker, testQueue, resultQueue,
- currentaddr, currentstart,
- keyboardCaught, shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- workers[iworker].currentaddr = currentaddr
- workers[iworker].currentstart = currentstart
- workers[iworker].keyboardCaught = keyboardCaught
- workers[iworker].start()
- # there is a small probability that the
- # terminated process might send a result,
- # which has to be specially handled or
- # else processes might get orphaned.
- w = workers[iworker]
- break
- os.kill(w.pid, signal.SIGINT)
- time.sleep(0.1)
- if not any_alive and testQueue.empty():
- log.debug("All workers dead")
- break
- nexttimeout=self.config.multiprocess_timeout
- for w in workers:
- if w.is_alive() and len(w.currentaddr.value) > 0:
- timeprocessing = time.time()-w.currentstart.value
- if timeprocessing <= self.config.multiprocess_timeout:
- nexttimeout = min(nexttimeout,
- self.config.multiprocess_timeout-timeprocessing)
-
- log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
-
- for case in to_teardown:
- log.debug("Tearing down shared fixtures for %s", case)
- try:
- case.tearDown()
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- result.addError(case, sys.exc_info())
+ log.debug('timed out worker %s: %s',
+ iworker,worker_addr)
+ w.currentaddr.value = bytes_('')
+ # If the process is in C++ code, sending a SIGILL
+ # might not send a python KeybordInterrupt exception
+ # therefore, send multiple signals until an
+ # exception is caught. If this takes too long, then
+ # terminate the process
+ w.keyboardCaught.clear()
+ startkilltime = time.time()
+ while not w.keyboardCaught.is_set() and w.is_alive():
+ if time.time()-startkilltime > self.waitkilltime:
+ # have to terminate...
+ log.error("terminating worker %s",iworker)
+ w.terminate()
+ # there is a small probability that the
+ # terminated process might send a result,
+ # which has to be specially handled or
+ # else processes might get orphaned.
+ workers[iworker] = w = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
+ break
+ os.kill(w.pid, signal.SIGILL)
+ time.sleep(0.1)
+ if not any_alive and testQueue.empty():
+ log.debug("All workers dead")
+ break
+ nexttimeout=self.config.multiprocess_timeout
+ for w in workers:
+ if w.is_alive() and len(w.currentaddr.value) > 0:
+ timeprocessing = time.time()-w.currentstart.value
+ if timeprocessing <= self.config.multiprocess_timeout:
+ nexttimeout = min(nexttimeout,
+ self.config.multiprocess_timeout-timeprocessing)
+ log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
+
+ except (KeyboardInterrupt, SystemExit), e:
+ log.info('parent received ctrl-c when waiting for test results')
+ thrownError = e
+ #resultQueue.get(False)
+
+ result.addError(test, sys.exc_info())
+
+ try:
+ for case in to_teardown:
+ log.debug("Tearing down shared fixtures for %s", case)
+ try:
+ case.tearDown()
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ result.addError(case, sys.exc_info())
- stop = time.time()
+ stop = time.time()
- # first write since can freeze on shutting down processes
- result.printErrors()
- result.printSummary(start, stop)
- self.config.plugins.finalize(result)
+ # first write since can freeze on shutting down processes
+ result.printErrors()
+ result.printSummary(start, stop)
+ self.config.plugins.finalize(result)
- log.debug("Tell all workers to stop")
- for w in workers:
- if w.is_alive():
- testQueue.put('STOP', block=False)
+ if thrownError is None:
+ log.debug("Tell all workers to stop")
+ for w in workers:
+ if w.is_alive():
+ testQueue.put('STOP', block=False)
- # wait for the workers to end
- try:
+ # wait for the workers to end
for iworker,worker in enumerate(workers):
if worker.is_alive():
log.debug('joining worker %s',iworker)
- worker.join()#10)
+ worker.join()
if worker.is_alive():
log.debug('failed to join worker %s',iworker)
- except KeyboardInterrupt:
- log.info('parent received ctrl-c')
+ except (KeyboardInterrupt, SystemExit):
+ log.info('parent received ctrl-c when shutting down: stop all processes')
for worker in workers:
- worker.terminate()
- worker.join()
+ if worker.is_alive():
+ worker.terminate()
+ if thrownError: raise thrownError
+ else: raise
return result
@@ -573,7 +583,7 @@ class MultiProcessTestRunner(TextTestRunner):
for batch in self.nextBatch(case):
yield batch
- def checkCanSplit(self, context, fixt):
+ def checkCanSplit(context, fixt):
"""
Callback that we use to check whether the fixtures found in a
context or ancestor are ones we care about.
@@ -587,6 +597,7 @@ class MultiProcessTestRunner(TextTestRunner):
if getattr(context, '_multiprocess_can_split_', False):
return False
return True
+ checkCanSplit = staticmethod(checkCanSplit)
def sharedFixtures(self, case):
context = getattr(case, 'context', None)
@@ -688,17 +699,28 @@ def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
test(result)
currentaddr.value = bytes_('')
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- except KeyboardInterrupt:
- keyboardCaught.set()
- if len(currentaddr.value) > 0:
- log.exception('Worker %s keyboard interrupt, failing '
- 'current test %s',ix,test_addr)
+ except KeyboardInterrupt, e: #TimedOutException:
+ timeout = isinstance(e, TimedOutException)
+ if timeout:
+ keyboardCaught.set()
+ if len(currentaddr.value):
+ if timeout:
+ msg = 'Worker %s timed out, failing current test %s'
+ else:
+ msg = 'Worker %s keyboard interrupt, failing current test %s'
+ log.exception(msg,ix,test_addr)
currentaddr.value = bytes_('')
failure.Failure(*sys.exc_info())(result)
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
else:
- log.debug('Worker %s test %s timed out',ix,test_addr)
+ if timeout:
+ msg = 'Worker %s test %s timed out'
+ else:
+ msg = 'Worker %s test %s keyboard interrupt'
+ log.debug(msg,ix,test_addr)
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ if not timeout:
+ raise
except SystemExit:
currentaddr.value = bytes_('')
log.exception('Worker %s system exit',ix)
@@ -746,6 +768,7 @@ class NoSharedFixtureContextSuite(ContextSuite):
else:
result, orig = result, result
try:
+ #log.debug('setUp for %s', id(self));
self.setUp()
except KeyboardInterrupt:
raise
@@ -754,42 +777,40 @@ class NoSharedFixtureContextSuite(ContextSuite):
result.addError(self, self._exc_info())
return
try:
- localtests = [test for test in self._tests]
- if len(localtests) > 1 and self.testQueue is not None:
- log.debug("queue %d tests"%len(localtests))
- for test in localtests:
- if isinstance(test.test,nose.failure.Failure):
- # proably failed in the generator, so execute directly
- # to get the exception
- test(orig)
- else:
- MultiProcessTestRunner.addtask(self.testQueue,
- self.tasks, test)
- else:
- for test in localtests:
- if (isinstance(test,nose.case.Test)
- and self.arg is not None):
- test.test.arg = self.arg
+ for test in self._tests:
+ if (isinstance(test,nose.case.Test)
+ and self.arg is not None):
+ test.test.arg = self.arg
+ else:
+ test.arg = self.arg
+ test.testQueue = self.testQueue
+ test.tasks = self.tasks
+ if result.shouldStop:
+ log.debug("stopping")
+ break
+ # each nose.case.Test will create its own result proxy
+ # so the cases need the original result, to avoid proxy
+ # chains
+ #log.debug('running test %s in suite %s', test, self);
+ try:
+ test(orig)
+ except KeyboardInterrupt, e:
+ timeout = isinstance(e, TimedOutException)
+ if timeout:
+ msg = 'Timeout when running test %s in suite %s'
else:
- test.arg = self.arg
- test.testQueue = self.testQueue
- test.tasks = self.tasks
- if result.shouldStop:
- log.debug("stopping")
- break
- # each nose.case.Test will create its own result proxy
- # so the cases need the original result, to avoid proxy
- # chains
- try:
- test(orig)
- except KeyboardInterrupt,e:
- err = (TimedOutException,TimedOutException(str(test)),
- sys.exc_info()[2])
- test.config.plugins.addError(test,err)
- orig.addError(test,err)
+ msg = 'KeyboardInterrupt when running test %s in suite %s'
+ log.debug(msg, test, self)
+ err = (TimedOutException,TimedOutException(str(test)),
+ sys.exc_info()[2])
+ test.config.plugins.addError(test,err)
+ orig.addError(test,err)
+ if not timeout:
+ raise
finally:
self.has_run = True
try:
+ #log.debug('tearDown for %s', id(self));
self.tearDown()
except KeyboardInterrupt:
raise
diff --git a/nosetests.1 b/nosetests.1
index bdb9ef2..1ccc461 100644
--- a/nosetests.1
+++ b/nosetests.1
@@ -329,6 +329,21 @@ Produce HTML coverage information in dir
.TP
+\fB\-\-cover\-branches\fR\fR
+Include branch coverage in coverage report [NOSE_COVER_BRANCHES]
+
+
+.TP
+\fB\-\-cover\-xml\fR\fR
+Produce XML coverage information
+
+
+.TP
+\fB\-\-cover\-xml\-file\fR\fR=FILE
+Produce XML coverage information in file
+
+
+.TP
\fB\-\-pdb\fR\fR
Drop into debugger on errors
@@ -476,5 +491,5 @@ jpellerin+nose@gmail.com
.SH COPYRIGHT
LGPL
-.\" Generated by docutils manpage writer on 2011-08-05 10:25.
+.\" Generated by docutils manpage writer on 2011-11-03 16:30.
.\"