diff options
author | kumar <kumar.mcmillan@gmail.com> | 2011-11-11 11:53:23 -0600 |
---|---|---|
committer | kumar <kumar.mcmillan@gmail.com> | 2011-11-11 11:53:23 -0600 |
commit | 38a976ccaeef112f988058ebbac7833f7bcecfa1 (patch) | |
tree | fa2f8546e4923a5a197ced166b6ffb0a8efbeff0 | |
parent | c3a9d056a88ffd05330d702192434dae7b23e385 (diff) | |
parent | 0f3ff413655950a3de8507f41575aec025e4b234 (diff) | |
download | nose-38a976ccaeef112f988058ebbac7833f7bcecfa1.tar.gz |
Merged
18 files changed, 564 insertions, 279 deletions
@@ -7,9 +7,12 @@ 1.1.3 +- Adds :option:`--cover-xml` and :option:`--cover-xml-file` (#311). + Patch by Timothée Peignier. - Adds support for :option:`--cover-branches` (related to #370). Patch by Timothée Peignier. - Fixed Unicode issue on Python 3.1 with coverage (#442) +- fixed class level fixture handling in multiprocessing plugin 1.1.2 @@ -107,7 +107,7 @@ In addition to passing command-line options, you may also put configuration options in your project's *setup.cfg* file, or a .noserc or nose.cfg file in your home directory. In any of these standard .ini-style config files, you put your nosetests configuration in a -``[nosetests]`` section. Options are the same as on the command line, +"[nosetests]" section. Options are the same as on the command line, with the -- prefix removed. For options that are simple switches, you must supply a value: @@ -117,7 +117,7 @@ must supply a value: All configuration files that are found will be loaded and their options combined. You can override the standard config file loading -with the ``-c`` option. +with the "-c" option. Using Plugins @@ -353,6 +353,18 @@ Options Produce HTML coverage information in dir +--cover-branches + + Include branch coverage in coverage report [NOSE_COVER_BRANCHES] + +--cover-xml + + Produce XML coverage information + +--cover-xml-file=FILE + + Produce XML coverage information in file + --pdb Drop into debugger on errors diff --git a/functional_tests/test_multiprocessing/__init__.py b/functional_tests/test_multiprocessing/__init__.py new file mode 100644 index 0000000..2a52efd --- /dev/null +++ b/functional_tests/test_multiprocessing/__init__.py @@ -0,0 +1,28 @@ +import os +from unittest import TestCase + +from nose.plugins import PluginTester +from nose.plugins.skip import SkipTest +from nose.plugins.multiprocess import MultiProcess + +support = os.path.join(os.path.dirname(__file__), 'support') + +def setup(): + try: + import multiprocessing + if 'active' in MultiProcess.status: + raise SkipTest("Multiprocess plugin is active. Skipping tests of " + "plugin itself.") + except ImportError: + raise SkipTest("multiprocessing module not available") + +class MPTestBase(PluginTester, TestCase): + processes = 1 + activate = '--processes=1' + plugins = [MultiProcess()] + suitepath = os.path.join(support, 'timeout.py') + + def __init__(self, *args, **kwargs): + self.activate = '--processes=%d' % self.processes + PluginTester.__init__(self) + TestCase.__init__(self, *args, **kwargs) diff --git a/functional_tests/test_multiprocessing/support/class.py b/functional_tests/test_multiprocessing/support/class.py new file mode 100644 index 0000000..905bcdf --- /dev/null +++ b/functional_tests/test_multiprocessing/support/class.py @@ -0,0 +1,14 @@ +class TestFunctionalTest(object): + counter = 0 + @classmethod + def setup_class(cls): + cls.counter += 1 + @classmethod + def teardown_class(cls): + cls.counter -= 1 + def _run(self): + assert self.counter==1 + def test1(self): + self._run() + def test2(self): + self._run() diff --git a/functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py b/functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py new file mode 100644 index 0000000..c557001 --- /dev/null +++ b/functional_tests/test_multiprocessing/support/concurrent_shared/__init__.py @@ -0,0 +1,6 @@ +counter=[0] +_multiprocess_shared_ = True +def setup_package(): + counter[0] += 1 +def teardown_package(): + counter[0] -= 1 diff --git a/functional_tests/test_multiprocessing/support/concurrent_shared/test.py b/functional_tests/test_multiprocessing/support/concurrent_shared/test.py new file mode 100644 index 0000000..5bc21f6 --- /dev/null +++ b/functional_tests/test_multiprocessing/support/concurrent_shared/test.py @@ -0,0 +1,11 @@ +#from . import counter +from time import sleep +#_multiprocess_can_split_ = True +class Test1(object): + def test1(self): + sleep(1) + pass +class Test2(object): + def test2(self): + sleep(1) + pass diff --git a/functional_tests/test_multiprocessing/support/fake_nosetest.py b/functional_tests/test_multiprocessing/support/fake_nosetest.py new file mode 100644 index 0000000..f5a6289 --- /dev/null +++ b/functional_tests/test_multiprocessing/support/fake_nosetest.py @@ -0,0 +1,14 @@ +import os +import sys + +import nose +from nose.plugins.multiprocess import MultiProcess +from nose.config import Config +from nose.plugins.manager import PluginManager + +if __name__ == '__main__': + if len(sys.argv) < 3: + print "USAGE: %s TEST_FILE LOG_FILE" % sys.argv[0] + sys.exit(1) + os.environ['NOSE_MP_LOG']=sys.argv[2] + nose.main(defaultTest=sys.argv[1], argv=[sys.argv[0],'--processes=1','-v'], config=Config(plugins=PluginManager(plugins=[MultiProcess()]))) diff --git a/functional_tests/test_multiprocessing/support/keyboardinterrupt.py b/functional_tests/test_multiprocessing/support/keyboardinterrupt.py new file mode 100644 index 0000000..2c36d95 --- /dev/null +++ b/functional_tests/test_multiprocessing/support/keyboardinterrupt.py @@ -0,0 +1,29 @@ +import os + +from tempfile import mktemp +from time import sleep + +if 'NOSE_MP_LOG' not in os.environ: + raise Exception('Environment variable NOSE_MP_LOG is not set') + +logfile = os.environ['NOSE_MP_LOG'] + +def log(w): + f = open(logfile, 'a') + f.write(w+"\n") + f.close() +#make sure all tests in this file are dispatched to the same subprocess +def setup(): + log('setup') + +def test_timeout(): + log('test_timeout') + sleep(2) + log('test_timeout_finished') + +# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run +def test_pass(): + log('test_pass') + +def teardown(): + log('teardown') diff --git a/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py b/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py new file mode 100644 index 0000000..3932bbd --- /dev/null +++ b/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py @@ -0,0 +1,34 @@ +import os + +from tempfile import mktemp +from time import sleep + +if 'NOSE_MP_LOG' not in os.environ: + raise Exception('Environment variable NOSE_MP_LOG is not set') + +logfile = os.environ['NOSE_MP_LOG'] + +def log(w): + f = open(logfile, 'a') + f.write(w+"\n") + f.close() +#make sure all tests in this file are dispatched to the same subprocess +def setup(): + '''global logfile + logfile = mktemp() + print "tempfile is:",logfile''' + log('setup') + +def test_timeout(): + log('test_timeout') + sleep(2) + log('test_timeout_finished') + +# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run +def test_pass(): + log('test_pass') + +def teardown(): + log('teardown') + sleep(10) + log('teardown_finished') diff --git a/functional_tests/test_multiprocessing/support/timeout.py b/functional_tests/test_multiprocessing/support/timeout.py index 52dce12..480c859 100644 --- a/functional_tests/test_multiprocessing/support/timeout.py +++ b/functional_tests/test_multiprocessing/support/timeout.py @@ -1,6 +1,12 @@ +#make sure all tests in this file are dispatched to the same subprocess +def setup(): + pass def test_timeout(): "this test *should* fail when process-timeout=1" from time import sleep sleep(2) +# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run +def test_pass(): + pass diff --git a/functional_tests/test_multiprocessing/test_class.py b/functional_tests/test_multiprocessing/test_class.py new file mode 100644 index 0000000..d92710d --- /dev/null +++ b/functional_tests/test_multiprocessing/test_class.py @@ -0,0 +1,13 @@ +import os + +from test_multiprocessing import MPTestBase + + +#test case for #462 +class TestClassFixture(MPTestBase): + suitepath = os.path.join(os.path.dirname(__file__), 'support', 'class.py') + + def runTest(self): + assert str(self.output).strip().endswith('OK') + assert 'Ran 2 tests' in self.output + diff --git a/functional_tests/test_multiprocessing/test_concurrent_shared.py b/functional_tests/test_multiprocessing/test_concurrent_shared.py new file mode 100644 index 0000000..2552c2b --- /dev/null +++ b/functional_tests/test_multiprocessing/test_concurrent_shared.py @@ -0,0 +1,13 @@ +import os + +from test_multiprocessing import MPTestBase + +class TestConcurrentShared(MPTestBase): + processes = 2 + suitepath = os.path.join(os.path.dirname(__file__), 'support', + 'concurrent_shared') + + def runTest(self): + assert 'Ran 2 tests in 1.' in self.output, "make sure two tests use 1.x seconds (no more than 2 seconsd)" + assert str(self.output).strip().endswith('OK') + diff --git a/functional_tests/test_multiprocessing/test_keyboardinterrupt.py b/functional_tests/test_multiprocessing/test_keyboardinterrupt.py new file mode 100644 index 0000000..8f07e54 --- /dev/null +++ b/functional_tests/test_multiprocessing/test_keyboardinterrupt.py @@ -0,0 +1,84 @@ +from subprocess import Popen,PIPE +import os +import sys +from time import sleep +import signal + +import nose + +support = os.path.join(os.path.dirname(__file__), 'support') + +PYTHONPATH = os.environ['PYTHONPATH'] if 'PYTHONPATH' in os.environ else '' +def setup(): + nose_parent_dir = os.path.normpath(os.path.join(os.path.abspath(os.path.dirname(nose.__file__)),'..')) + paths = [nose_parent_dir] + if PYTHONPATH: + paths.append(PYTHONPATH) + os.environ['PYTHONPATH'] = os.pathsep.join(paths) +def teardown(): + if PYTHONPATH: + os.environ['PYTHONPATH'] = PYTHONPATH + else: + del os.environ['PYTHONPATH'] + +runner = os.path.join(support, 'fake_nosetest.py') +def keyboardinterrupt(case): + #os.setsid would create a process group so signals sent to the + #parent process will propogates to all children processes + from tempfile import mktemp + logfile = mktemp() + process = Popen([sys.executable,runner,os.path.join(support,case),logfile], preexec_fn=os.setsid, stdout=PIPE, stderr=PIPE, bufsize=-1) + + #wait until logfile is created: + retry=100 + while not os.path.exists(logfile): + sleep(0.1) + retry -= 1 + if not retry: + raise Exception('Timeout while waiting for log file to be created by fake_nosetest.py') + + os.killpg(process.pid, signal.SIGINT) + return process, logfile + +def get_log_content(logfile): + '''prefix = 'tempfile is: ' + if not stdout.startswith(prefix): + raise Exception('stdout does not contain tmp file name: '+stdout) + logfile = stdout[len(prefix):].strip() #remove trailing new line char''' + f = open(logfile) + content = f.read() + f.close() + os.remove(logfile) + return content + +def test_keyboardinterrupt(): + process, logfile = keyboardinterrupt('keyboardinterrupt.py') + stdout, stderr = [s.decode('utf-8') for s in process.communicate(None)] + print stderr + log = get_log_content(logfile) + assert 'setup' in log + assert 'test_timeout' in log + assert 'test_timeout_finished' not in log + assert 'test_pass' not in log + assert 'teardown' in log + assert 'Ran 0 tests' in stderr + assert 'KeyboardInterrupt' in stderr + assert 'FAILED (errors=1)' in stderr + assert 'ERROR: Worker 0 keyboard interrupt, failing current test '+os.path.join(support,'keyboardinterrupt.py') in stderr + + +def test_keyboardinterrupt_twice(): + process, logfile = keyboardinterrupt('keyboardinterrupt_twice.py') + sleep(0.5) + os.killpg(process.pid, signal.SIGINT) + stdout, stderr = [s.decode('utf-8') for s in process.communicate(None)] + log = get_log_content(logfile) + assert 'setup' in log + assert 'test_timeout' in log + assert 'test_timeout_finished' not in log + assert 'test_pass' not in log + assert 'teardown' in log + assert 'teardown_finished' not in log + assert 'Ran 0 tests' in stderr + assert 'KeyboardInterrupt' in stderr + assert 'FAILED (errors=1)' in stderr diff --git a/functional_tests/test_multiprocessing/test_nameerror.py b/functional_tests/test_multiprocessing/test_nameerror.py index f73d02b..5e58226 100644 --- a/functional_tests/test_multiprocessing/test_nameerror.py +++ b/functional_tests/test_multiprocessing/test_nameerror.py @@ -1,28 +1,10 @@ import os -import unittest -from nose.plugins import PluginTester -from nose.plugins.skip import SkipTest -from nose.plugins.multiprocess import MultiProcess +from test_multiprocessing import support, MPTestBase - -support = os.path.join(os.path.dirname(__file__), 'support') - - -def setup(): - try: - import multiprocessing - if 'active' in MultiProcess.status: - raise SkipTest("Multiprocess plugin is active. Skipping tests of " - "plugin itself.") - except ImportError: - raise SkipTest("multiprocessing module not available") - - -class TestMPNameError(PluginTester, unittest.TestCase): - activate = '--processes=2' - plugins = [MultiProcess()] - suitepath = os.path.join(support, 'nameerror.py') +class TestMPNameError(MPTestBase): + processes = 2 + suitepath = os.path.join(os.path.dirname(__file__), 'support', 'nameerror.py') def runTest(self): print str(self.output) diff --git a/functional_tests/test_multiprocessing/test_process_timeout.py b/functional_tests/test_multiprocessing/test_process_timeout.py index 535ecdb..6b858f8 100644 --- a/functional_tests/test_multiprocessing/test_process_timeout.py +++ b/functional_tests/test_multiprocessing/test_process_timeout.py @@ -1,37 +1,21 @@ import os -import unittest -from nose.plugins import PluginTester -from nose.plugins.skip import SkipTest -from nose.plugins.multiprocess import MultiProcess +from test_multiprocessing import MPTestBase -support = os.path.join(os.path.dirname(__file__), 'support') - - -def setup(): - try: - import multiprocessing - if 'active' in MultiProcess.status: - raise SkipTest("Multiprocess plugin is active. Skipping tests of " - "plugin itself.") - except ImportError: - raise SkipTest("multiprocessing module not available") - - - -class TestMPTimeout(PluginTester, unittest.TestCase): - activate = '--processes=2' +class TestMPTimeout(MPTestBase): args = ['--process-timeout=1'] - plugins = [MultiProcess()] - suitepath = os.path.join(support, 'timeout.py') + suitepath = os.path.join(os.path.dirname(__file__), 'support', 'timeout.py') def runTest(self): assert "TimedOutException: 'timeout.test_timeout'" in self.output - + assert "Ran 2 tests in" in self.output + assert "FAILED (errors=1)" in self.output class TestMPTimeoutPass(TestMPTimeout): args = ['--process-timeout=3'] def runTest(self): assert "TimedOutException: 'timeout.test_timeout'" not in self.output + assert "Ran 2 tests in" in self.output assert str(self.output).strip().endswith('OK') + diff --git a/nose/plugins/cover.py b/nose/plugins/cover.py index 8be1fe2..a055c92 100644 --- a/nose/plugins/cover.py +++ b/nose/plugins/cover.py @@ -113,6 +113,15 @@ class Coverage(Plugin): dest="cover_branches", help="Include branch coverage in coverage report " "[NOSE_COVER_BRANCHES]") + parser.add_option("--cover-xml", action="store_true", + default=env.get('NOSE_COVER_XML'), + dest="cover_xml", + help="Produce XML coverage information") + parser.add_option("--cover-xml-file", action="store", + default=env.get('NOSE_COVER_XML_FILE', 'coverage.xml'), + dest="cover_xml_file", + metavar="FILE", + help="Produce XML coverage information in file") def configure(self, options, config): """ @@ -149,6 +158,10 @@ class Coverage(Plugin): self.coverHtmlDir = options.cover_html_dir log.debug('Will put HTML coverage report in %s', self.coverHtmlDir) self.coverBranches = options.cover_branches + self.coverXmlFile = None + if options.cover_xml: + self.coverXmlFile = options.cover_xml_file + log.debug('Will put XML coverage report in %s', self.coverXmlFile) if self.enabled: self.status['active'] = True @@ -182,6 +195,9 @@ class Coverage(Plugin): self.coverInstance.html_report(modules, self.coverHtmlDir) else: self.report_html(modules) + if self.coverXmlFile: + log.debug("Generating XML coverage report") + self.coverInstance.xml_report(modules, self.coverXmlFile) def report_html(self, modules): if not os.path.exists(self.coverHtmlDir): diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py index 260cbf8..48ba54d 100644 --- a/nose/plugins/multiprocess.py +++ b/nose/plugins/multiprocess.py @@ -130,7 +130,8 @@ log = logging.getLogger(__name__) Process = Queue = Pool = Event = Value = Array = None -class TimedOutException(Exception): +# have to inherit KeyboardInterrupt to it will interrupt process properly +class TimedOutException(KeyboardInterrupt): def __init__(self, value = "Timed Out"): self.value = value def __str__(self): @@ -140,7 +141,16 @@ def _import_mp(): global Process, Queue, Pool, Event, Value, Array try: from multiprocessing import Manager, Process + #prevent the server process created in the manager which holds Python + #objects and allows other processes to manipulate them using proxies + #to interrupt on SIGINT (keyboardinterrupt) so that the communication + #channel between subprocesses and main process is still usable after + #ctrl+C is received in the main process. + old=signal.signal(signal.SIGINT, signal.SIG_IGN) m = Manager() + #reset it back so main process will receive a KeyboardInterrupt + #exception on ctrl+c + signal.signal(signal.SIGINT, old) Queue, Pool, Event, Value, Array = ( m.Queue, m.Pool, m.Event, m.Value, m.Array ) @@ -247,42 +257,17 @@ class MultiProcess(Plugin): config=self.config, loaderClass=self.loaderClass) +def signalhandler(sig, frame): + raise TimedOutException() + class MultiProcessTestRunner(TextTestRunner): waitkilltime = 5.0 # max time to wait to terminate a process that does not - # respond to SIGINT + # respond to SIGILL def __init__(self, **kw): self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader) super(MultiProcessTestRunner, self).__init__(**kw) - def run(self, test): - """ - Execute the test (which may be a test suite). If the test is a suite, - distribute it out among as many processes as have been configured, at - as fine a level as is possible given the context fixtures defined in - the suite or any sub-suites. - - """ - log.debug("%s.run(%s) (%s)", self, test, os.getpid()) - wrapper = self.config.plugins.prepareTest(test) - if wrapper is not None: - test = wrapper - - # plugins can decorate or capture the output stream - wrapped = self.config.plugins.setOutputStream(self.stream) - if wrapped is not None: - self.stream = wrapped - - testQueue = Queue() - resultQueue = Queue() - tasks = [] - completed = [] - workers = [] - to_teardown = [] - shouldStop = Event() - - result = self._makeResult() - start = time.time() - + def collect(self, test, testQueue, tasks, to_teardown, result): # dispatch and collect results # put indexes only on queue because tests aren't picklable for case in self.nextBatch(test): @@ -308,32 +293,76 @@ class MultiProcessTestRunner(TextTestRunner): result.addError(case, sys.exc_info()) else: to_teardown.append(case) - for _t in case: - test_addr = self.addtask(testQueue,tasks,_t) - log.debug("Queued shared-fixture test %s (%s) to %s", - len(tasks), test_addr, testQueue) + if case.factory: + ancestors=case.factory.context.get(case, []) + for an in ancestors[:2]: + #log.debug('reset ancestor %s', an) + if getattr(an, '_multiprocess_shared_', False): + an._multiprocess_can_split_=True + #an._multiprocess_shared_=False + self.collect(case, testQueue, tasks, to_teardown, result) else: test_addr = self.addtask(testQueue,tasks,case) log.debug("Queued test %s (%s) to %s", len(tasks), test_addr, testQueue) + def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result): + currentaddr = Value('c',bytes_('')) + currentstart = Value('d',time.time()) + keyboardCaught = Event() + p = Process(target=runner, + args=(iworker, testQueue, + resultQueue, + currentaddr, + currentstart, + keyboardCaught, + shouldStop, + self.loaderClass, + result.__class__, + pickle.dumps(self.config))) + p.currentaddr = currentaddr + p.currentstart = currentstart + p.keyboardCaught = keyboardCaught + old = signal.signal(signal.SIGILL, signalhandler) + p.start() + signal.signal(signal.SIGILL, old) + return p + + def run(self, test): + """ + Execute the test (which may be a test suite). If the test is a suite, + distribute it out among as many processes as have been configured, at + as fine a level as is possible given the context fixtures defined in + the suite or any sub-suites. + + """ + log.debug("%s.run(%s) (%s)", self, test, os.getpid()) + wrapper = self.config.plugins.prepareTest(test) + if wrapper is not None: + test = wrapper + + # plugins can decorate or capture the output stream + wrapped = self.config.plugins.setOutputStream(self.stream) + if wrapped is not None: + self.stream = wrapped + + testQueue = Queue() + resultQueue = Queue() + tasks = [] + completed = [] + workers = [] + to_teardown = [] + shouldStop = Event() + + result = self._makeResult() + start = time.time() + + self.collect(test, testQueue, tasks, to_teardown, result) + log.debug("Starting %s workers", self.config.multiprocess_workers) for i in range(self.config.multiprocess_workers): - currentaddr = Value('c',bytes_('')) - currentstart = Value('d',0.0) - keyboardCaught = Event() - p = Process(target=runner, args=(i, testQueue, resultQueue, - currentaddr, currentstart, - keyboardCaught, shouldStop, - self.loaderClass, - result.__class__, - pickle.dumps(self.config))) - p.currentaddr = currentaddr - p.currentstart = currentstart - p.keyboardCaught = keyboardCaught - # p.setDaemon(True) - p.start() + p = self.startProcess(i, testQueue, resultQueue, shouldStop, result) workers.append(p) log.debug("Started worker process %s", i+1) @@ -341,162 +370,143 @@ class MultiProcessTestRunner(TextTestRunner): # need to keep track of the next time to check for timeouts in case # more than one process times out at the same time. nexttimeout=self.config.multiprocess_timeout - while tasks: - log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs", - len(completed), total_tasks,nexttimeout) - try: - iworker, addr, newtask_addrs, batch_result = resultQueue.get( - timeout=nexttimeout) - log.debug('Results received for worker %d, %s, new tasks: %d', - iworker,addr,len(newtask_addrs)) + thrownError = None + + try: + while tasks: + log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs", + len(completed), total_tasks,nexttimeout) try: + iworker, addr, newtask_addrs, batch_result = resultQueue.get( + timeout=nexttimeout) + log.debug('Results received for worker %d, %s, new tasks: %d', + iworker,addr,len(newtask_addrs)) try: - tasks.remove(addr) - except ValueError: - log.warn('worker %s failed to remove from tasks: %s', - iworker,addr) - total_tasks += len(newtask_addrs) - for newaddr in newtask_addrs: - tasks.append(newaddr) - except KeyError: - log.debug("Got result for unknown task? %s", addr) - log.debug("current: %s",str(list(tasks)[0])) - else: - completed.append([addr,batch_result]) - self.consolidate(result, batch_result) - if (self.config.stopOnError - and not result.wasSuccessful()): - # set the stop condition - shouldStop.set() - break - if self.config.multiprocess_restartworker: - log.debug('joining worker %s',iworker) - # wait for working, but not that important if worker - # cannot be joined in fact, for workers that add to - # testQueue, they will not terminate until all their - # items are read - workers[iworker].join(timeout=1) - if not shouldStop.is_set() and not testQueue.empty(): - log.debug('starting new process on worker %s',iworker) - currentaddr = Value('c',bytes_('')) - currentstart = Value('d',time.time()) - keyboardCaught = Event() - workers[iworker] = Process(target=runner, - args=(iworker, testQueue, - resultQueue, - currentaddr, - currentstart, - keyboardCaught, - shouldStop, - self.loaderClass, - result.__class__, - pickle.dumps(self.config))) - workers[iworker].currentaddr = currentaddr - workers[iworker].currentstart = currentstart - workers[iworker].keyboardCaught = keyboardCaught - workers[iworker].start() - except Empty: - log.debug("Timed out with %s tasks pending " - "(empty testQueue=%d): %s", - len(tasks),testQueue.empty(),str(tasks)) - any_alive = False - for iworker, w in enumerate(workers): - if w.is_alive(): - worker_addr = bytes_(w.currentaddr.value,'ascii') - timeprocessing = time.time() - w.currentstart.value - if ( len(worker_addr) == 0 + try: + tasks.remove(addr) + except ValueError: + log.warn('worker %s failed to remove from tasks: %s', + iworker,addr) + total_tasks += len(newtask_addrs) + tasks.extend(newtask_addrs) + except KeyError: + log.debug("Got result for unknown task? %s", addr) + log.debug("current: %s",str(list(tasks)[0])) + else: + completed.append([addr,batch_result]) + self.consolidate(result, batch_result) + if (self.config.stopOnError + and not result.wasSuccessful()): + # set the stop condition + shouldStop.set() + break + if self.config.multiprocess_restartworker: + log.debug('joining worker %s',iworker) + # wait for working, but not that important if worker + # cannot be joined in fact, for workers that add to + # testQueue, they will not terminate until all their + # items are read + workers[iworker].join(timeout=1) + if not shouldStop.is_set() and not testQueue.empty(): + log.debug('starting new process on worker %s',iworker) + workers[iworker] = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result) + except Empty: + log.debug("Timed out with %s tasks pending " + "(empty testQueue=%r): %s", + len(tasks),testQueue.empty(),str(tasks)) + any_alive = False + for iworker, w in enumerate(workers): + if w.is_alive(): + worker_addr = bytes_(w.currentaddr.value,'ascii') + timeprocessing = time.time() - w.currentstart.value + if ( len(worker_addr) == 0 + and timeprocessing > self.config.multiprocess_timeout-0.1): + log.debug('worker %d has finished its work item, ' + 'but is not exiting? do we wait for it?', + iworker) + else: + any_alive = True + if (len(worker_addr) > 0 and timeprocessing > self.config.multiprocess_timeout-0.1): - log.debug('worker %d has finished its work item, ' - 'but is not exiting? do we wait for it?', - iworker) - else: - any_alive = True - if (len(worker_addr) > 0 - and timeprocessing > self.config.multiprocess_timeout-0.1): - log.debug('timed out worker %s: %s', - iworker,worker_addr) - w.currentaddr.value = bytes_('') - # If the process is in C++ code, sending a SIGINT - # might not send a python KeybordInterrupt exception - # therefore, send multiple signals until an - # exception is caught. If this takes too long, then - # terminate the process - w.keyboardCaught.clear() - startkilltime = time.time() - while not w.keyboardCaught.is_set() and w.is_alive(): - if time.time()-startkilltime > self.waitkilltime: - # have to terminate... - log.error("terminating worker %s",iworker) - w.terminate() - currentaddr = Value('c',bytes_('')) - currentstart = Value('d',time.time()) - keyboardCaught = Event() - workers[iworker] = Process(target=runner, - args=(iworker, testQueue, resultQueue, - currentaddr, currentstart, - keyboardCaught, shouldStop, - self.loaderClass, - result.__class__, - pickle.dumps(self.config))) - workers[iworker].currentaddr = currentaddr - workers[iworker].currentstart = currentstart - workers[iworker].keyboardCaught = keyboardCaught - workers[iworker].start() - # there is a small probability that the - # terminated process might send a result, - # which has to be specially handled or - # else processes might get orphaned. - w = workers[iworker] - break - os.kill(w.pid, signal.SIGINT) - time.sleep(0.1) - if not any_alive and testQueue.empty(): - log.debug("All workers dead") - break - nexttimeout=self.config.multiprocess_timeout - for w in workers: - if w.is_alive() and len(w.currentaddr.value) > 0: - timeprocessing = time.time()-w.currentstart.value - if timeprocessing <= self.config.multiprocess_timeout: - nexttimeout = min(nexttimeout, - self.config.multiprocess_timeout-timeprocessing) - - log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks)) - - for case in to_teardown: - log.debug("Tearing down shared fixtures for %s", case) - try: - case.tearDown() - except (KeyboardInterrupt, SystemExit): - raise - except: - result.addError(case, sys.exc_info()) + log.debug('timed out worker %s: %s', + iworker,worker_addr) + w.currentaddr.value = bytes_('') + # If the process is in C++ code, sending a SIGILL + # might not send a python KeybordInterrupt exception + # therefore, send multiple signals until an + # exception is caught. If this takes too long, then + # terminate the process + w.keyboardCaught.clear() + startkilltime = time.time() + while not w.keyboardCaught.is_set() and w.is_alive(): + if time.time()-startkilltime > self.waitkilltime: + # have to terminate... + log.error("terminating worker %s",iworker) + w.terminate() + # there is a small probability that the + # terminated process might send a result, + # which has to be specially handled or + # else processes might get orphaned. + workers[iworker] = w = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result) + break + os.kill(w.pid, signal.SIGILL) + time.sleep(0.1) + if not any_alive and testQueue.empty(): + log.debug("All workers dead") + break + nexttimeout=self.config.multiprocess_timeout + for w in workers: + if w.is_alive() and len(w.currentaddr.value) > 0: + timeprocessing = time.time()-w.currentstart.value + if timeprocessing <= self.config.multiprocess_timeout: + nexttimeout = min(nexttimeout, + self.config.multiprocess_timeout-timeprocessing) + log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks)) + + except (KeyboardInterrupt, SystemExit), e: + log.info('parent received ctrl-c when waiting for test results') + thrownError = e + #resultQueue.get(False) + + result.addError(test, sys.exc_info()) + + try: + for case in to_teardown: + log.debug("Tearing down shared fixtures for %s", case) + try: + case.tearDown() + except (KeyboardInterrupt, SystemExit): + raise + except: + result.addError(case, sys.exc_info()) - stop = time.time() + stop = time.time() - # first write since can freeze on shutting down processes - result.printErrors() - result.printSummary(start, stop) - self.config.plugins.finalize(result) + # first write since can freeze on shutting down processes + result.printErrors() + result.printSummary(start, stop) + self.config.plugins.finalize(result) - log.debug("Tell all workers to stop") - for w in workers: - if w.is_alive(): - testQueue.put('STOP', block=False) + if thrownError is None: + log.debug("Tell all workers to stop") + for w in workers: + if w.is_alive(): + testQueue.put('STOP', block=False) - # wait for the workers to end - try: + # wait for the workers to end for iworker,worker in enumerate(workers): if worker.is_alive(): log.debug('joining worker %s',iworker) - worker.join()#10) + worker.join() if worker.is_alive(): log.debug('failed to join worker %s',iworker) - except KeyboardInterrupt: - log.info('parent received ctrl-c') + except (KeyboardInterrupt, SystemExit): + log.info('parent received ctrl-c when shutting down: stop all processes') for worker in workers: - worker.terminate() - worker.join() + if worker.is_alive(): + worker.terminate() + if thrownError: raise thrownError + else: raise return result @@ -573,7 +583,7 @@ class MultiProcessTestRunner(TextTestRunner): for batch in self.nextBatch(case): yield batch - def checkCanSplit(self, context, fixt): + def checkCanSplit(context, fixt): """ Callback that we use to check whether the fixtures found in a context or ancestor are ones we care about. @@ -587,6 +597,7 @@ class MultiProcessTestRunner(TextTestRunner): if getattr(context, '_multiprocess_can_split_', False): return False return True + checkCanSplit = staticmethod(checkCanSplit) def sharedFixtures(self, case): context = getattr(case, 'context', None) @@ -688,17 +699,28 @@ def __runner(ix, testQueue, resultQueue, currentaddr, currentstart, test(result) currentaddr.value = bytes_('') resultQueue.put((ix, test_addr, test.tasks, batch(result))) - except KeyboardInterrupt: - keyboardCaught.set() - if len(currentaddr.value) > 0: - log.exception('Worker %s keyboard interrupt, failing ' - 'current test %s',ix,test_addr) + except KeyboardInterrupt, e: #TimedOutException: + timeout = isinstance(e, TimedOutException) + if timeout: + keyboardCaught.set() + if len(currentaddr.value): + if timeout: + msg = 'Worker %s timed out, failing current test %s' + else: + msg = 'Worker %s keyboard interrupt, failing current test %s' + log.exception(msg,ix,test_addr) currentaddr.value = bytes_('') failure.Failure(*sys.exc_info())(result) resultQueue.put((ix, test_addr, test.tasks, batch(result))) else: - log.debug('Worker %s test %s timed out',ix,test_addr) + if timeout: + msg = 'Worker %s test %s timed out' + else: + msg = 'Worker %s test %s keyboard interrupt' + log.debug(msg,ix,test_addr) resultQueue.put((ix, test_addr, test.tasks, batch(result))) + if not timeout: + raise except SystemExit: currentaddr.value = bytes_('') log.exception('Worker %s system exit',ix) @@ -746,6 +768,7 @@ class NoSharedFixtureContextSuite(ContextSuite): else: result, orig = result, result try: + #log.debug('setUp for %s', id(self)); self.setUp() except KeyboardInterrupt: raise @@ -754,42 +777,40 @@ class NoSharedFixtureContextSuite(ContextSuite): result.addError(self, self._exc_info()) return try: - localtests = [test for test in self._tests] - if len(localtests) > 1 and self.testQueue is not None: - log.debug("queue %d tests"%len(localtests)) - for test in localtests: - if isinstance(test.test,nose.failure.Failure): - # proably failed in the generator, so execute directly - # to get the exception - test(orig) - else: - MultiProcessTestRunner.addtask(self.testQueue, - self.tasks, test) - else: - for test in localtests: - if (isinstance(test,nose.case.Test) - and self.arg is not None): - test.test.arg = self.arg + for test in self._tests: + if (isinstance(test,nose.case.Test) + and self.arg is not None): + test.test.arg = self.arg + else: + test.arg = self.arg + test.testQueue = self.testQueue + test.tasks = self.tasks + if result.shouldStop: + log.debug("stopping") + break + # each nose.case.Test will create its own result proxy + # so the cases need the original result, to avoid proxy + # chains + #log.debug('running test %s in suite %s', test, self); + try: + test(orig) + except KeyboardInterrupt, e: + timeout = isinstance(e, TimedOutException) + if timeout: + msg = 'Timeout when running test %s in suite %s' else: - test.arg = self.arg - test.testQueue = self.testQueue - test.tasks = self.tasks - if result.shouldStop: - log.debug("stopping") - break - # each nose.case.Test will create its own result proxy - # so the cases need the original result, to avoid proxy - # chains - try: - test(orig) - except KeyboardInterrupt,e: - err = (TimedOutException,TimedOutException(str(test)), - sys.exc_info()[2]) - test.config.plugins.addError(test,err) - orig.addError(test,err) + msg = 'KeyboardInterrupt when running test %s in suite %s' + log.debug(msg, test, self) + err = (TimedOutException,TimedOutException(str(test)), + sys.exc_info()[2]) + test.config.plugins.addError(test,err) + orig.addError(test,err) + if not timeout: + raise finally: self.has_run = True try: + #log.debug('tearDown for %s', id(self)); self.tearDown() except KeyboardInterrupt: raise diff --git a/nosetests.1 b/nosetests.1 index bdb9ef2..1ccc461 100644 --- a/nosetests.1 +++ b/nosetests.1 @@ -329,6 +329,21 @@ Produce HTML coverage information in dir .TP +\fB\-\-cover\-branches\fR\fR +Include branch coverage in coverage report [NOSE_COVER_BRANCHES] + + +.TP +\fB\-\-cover\-xml\fR\fR +Produce XML coverage information + + +.TP +\fB\-\-cover\-xml\-file\fR\fR=FILE +Produce XML coverage information in file + + +.TP \fB\-\-pdb\fR\fR Drop into debugger on errors @@ -476,5 +491,5 @@ jpellerin+nose@gmail.com .SH COPYRIGHT LGPL -.\" Generated by docutils manpage writer on 2011-08-05 10:25. +.\" Generated by docutils manpage writer on 2011-11-03 16:30. .\" |