path: root/nose
author     kumar <kumar.mcmillan@gmail.com>    2011-11-11 11:53:23 -0600
committer  kumar <kumar.mcmillan@gmail.com>    2011-11-11 11:53:23 -0600
commit     38a976ccaeef112f988058ebbac7833f7bcecfa1 (patch)
tree       fa2f8546e4923a5a197ced166b6ffb0a8efbeff0 /nose
parent     c3a9d056a88ffd05330d702192434dae7b23e385 (diff)
parent     0f3ff413655950a3de8507f41575aec025e4b234 (diff)
download   nose-38a976ccaeef112f988058ebbac7833f7bcecfa1.tar.gz
Merged
Diffstat (limited to 'nose')
-rw-r--r--  nose/plugins/cover.py          16
-rw-r--r--  nose/plugins/multiprocess.py  483
2 files changed, 268 insertions, 231 deletions
diff --git a/nose/plugins/cover.py b/nose/plugins/cover.py
index 8be1fe2..a055c92 100644
--- a/nose/plugins/cover.py
+++ b/nose/plugins/cover.py
@@ -113,6 +113,15 @@ class Coverage(Plugin):
dest="cover_branches",
help="Include branch coverage in coverage report "
"[NOSE_COVER_BRANCHES]")
+ parser.add_option("--cover-xml", action="store_true",
+ default=env.get('NOSE_COVER_XML'),
+ dest="cover_xml",
+ help="Produce XML coverage information")
+ parser.add_option("--cover-xml-file", action="store",
+ default=env.get('NOSE_COVER_XML_FILE', 'coverage.xml'),
+ dest="cover_xml_file",
+ metavar="FILE",
+ help="Produce XML coverage information in file")
def configure(self, options, config):
"""
@@ -149,6 +158,10 @@ class Coverage(Plugin):
self.coverHtmlDir = options.cover_html_dir
log.debug('Will put HTML coverage report in %s', self.coverHtmlDir)
self.coverBranches = options.cover_branches
+ self.coverXmlFile = None
+ if options.cover_xml:
+ self.coverXmlFile = options.cover_xml_file
+ log.debug('Will put XML coverage report in %s', self.coverXmlFile)
if self.enabled:
self.status['active'] = True
@@ -182,6 +195,9 @@ class Coverage(Plugin):
self.coverInstance.html_report(modules, self.coverHtmlDir)
else:
self.report_html(modules)
+ if self.coverXmlFile:
+ log.debug("Generating XML coverage report")
+ self.coverInstance.xml_report(modules, self.coverXmlFile)
def report_html(self, modules):
if not os.path.exists(self.coverHtmlDir):
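Note on the cover.py hunks above: the new --cover-xml / --cover-xml-file options (also settable through NOSE_COVER_XML and NOSE_COVER_XML_FILE) just hand the work to the coverage API, invoked with something like `nosetests --with-coverage --cover-xml --cover-xml-file=coverage.xml`. A rough standalone sketch of the underlying call, assuming coverage.py 3.x and its lowercase coverage class (this is not the plugin code itself):

    import coverage

    cov = coverage.coverage(branch=True)    # roughly what Coverage.configure() arranges
    cov.start()
    # ... the test suite would run here ...
    cov.stop()
    # counterpart of self.coverInstance.xml_report(modules, self.coverXmlFile)
    cov.xml_report(outfile='coverage.xml')
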
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py
index 260cbf8..48ba54d 100644
--- a/nose/plugins/multiprocess.py
+++ b/nose/plugins/multiprocess.py
@@ -130,7 +130,8 @@ log = logging.getLogger(__name__)
Process = Queue = Pool = Event = Value = Array = None
-class TimedOutException(Exception):
+# has to inherit from KeyboardInterrupt so it will interrupt the process properly
+class TimedOutException(KeyboardInterrupt):
def __init__(self, value = "Timed Out"):
self.value = value
def __str__(self):
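The change above makes TimedOutException a KeyboardInterrupt subclass so that raising it from a signal handler unwinds a worker exactly like ctrl+C would, while isinstance() still tells a timeout apart from a real interrupt. A minimal standalone sketch of that idea (Unix only; SIGALRM is used here just to keep the demo self-contained, the plugin itself uses SIGILL):

    import signal, time

    class TimedOut(KeyboardInterrupt):
        pass

    def handler(signum, frame):
        raise TimedOut("timed out")

    signal.signal(signal.SIGALRM, handler)   # any asynchronous signal works for the demo
    signal.alarm(1)                          # deliver SIGALRM in about a second
    try:
        while True:
            time.sleep(0.1)                  # stand-in for a long-running test
    except KeyboardInterrupt as e:           # catches TimedOut as well
        print("timed out" if isinstance(e, TimedOut) else "real ctrl-C")
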
@@ -140,7 +141,16 @@ def _import_mp():
global Process, Queue, Pool, Event, Value, Array
try:
from multiprocessing import Manager, Process
+#prevent the server process created by the manager (which holds Python
+#objects and allows other processes to manipulate them using proxies)
+#from being interrupted on SIGINT (KeyboardInterrupt), so that the
+#communication channel between the subprocesses and the main process is
+#still usable after ctrl+C is received in the main process.
+ old=signal.signal(signal.SIGINT, signal.SIG_IGN)
m = Manager()
+#reset it back so the main process will receive a KeyboardInterrupt
+#exception on ctrl+C
+ signal.signal(signal.SIGINT, old)
Queue, Pool, Event, Value, Array = (
m.Queue, m.Pool, m.Event, m.Value, m.Array
)
@@ -247,42 +257,17 @@ class MultiProcess(Plugin):
config=self.config,
loaderClass=self.loaderClass)
+def signalhandler(sig, frame):
+ raise TimedOutException()
+
class MultiProcessTestRunner(TextTestRunner):
waitkilltime = 5.0 # max time to wait to terminate a process that does not
- # respond to SIGINT
+ # respond to SIGILL
def __init__(self, **kw):
self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
super(MultiProcessTestRunner, self).__init__(**kw)
- def run(self, test):
- """
- Execute the test (which may be a test suite). If the test is a suite,
- distribute it out among as many processes as have been configured, at
- as fine a level as is possible given the context fixtures defined in
- the suite or any sub-suites.
-
- """
- log.debug("%s.run(%s) (%s)", self, test, os.getpid())
- wrapper = self.config.plugins.prepareTest(test)
- if wrapper is not None:
- test = wrapper
-
- # plugins can decorate or capture the output stream
- wrapped = self.config.plugins.setOutputStream(self.stream)
- if wrapped is not None:
- self.stream = wrapped
-
- testQueue = Queue()
- resultQueue = Queue()
- tasks = []
- completed = []
- workers = []
- to_teardown = []
- shouldStop = Event()
-
- result = self._makeResult()
- start = time.time()
-
+ def collect(self, test, testQueue, tasks, to_teardown, result):
# dispatch and collect results
# put indexes only on queue because tests aren't picklable
for case in self.nextBatch(test):
@@ -308,32 +293,76 @@ class MultiProcessTestRunner(TextTestRunner):
result.addError(case, sys.exc_info())
else:
to_teardown.append(case)
- for _t in case:
- test_addr = self.addtask(testQueue,tasks,_t)
- log.debug("Queued shared-fixture test %s (%s) to %s",
- len(tasks), test_addr, testQueue)
+ if case.factory:
+ ancestors=case.factory.context.get(case, [])
+ for an in ancestors[:2]:
+ #log.debug('reset ancestor %s', an)
+ if getattr(an, '_multiprocess_shared_', False):
+ an._multiprocess_can_split_=True
+ #an._multiprocess_shared_=False
+ self.collect(case, testQueue, tasks, to_teardown, result)
else:
test_addr = self.addtask(testQueue,tasks,case)
log.debug("Queued test %s (%s) to %s",
len(tasks), test_addr, testQueue)
+ def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result):
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
+ keyboardCaught = Event()
+ p = Process(target=runner,
+ args=(iworker, testQueue,
+ resultQueue,
+ currentaddr,
+ currentstart,
+ keyboardCaught,
+ shouldStop,
+ self.loaderClass,
+ result.__class__,
+ pickle.dumps(self.config)))
+ p.currentaddr = currentaddr
+ p.currentstart = currentstart
+ p.keyboardCaught = keyboardCaught
+ old = signal.signal(signal.SIGILL, signalhandler)
+ p.start()
+ signal.signal(signal.SIGILL, old)
+ return p
+
+ def run(self, test):
+ """
+ Execute the test (which may be a test suite). If the test is a suite,
+ distribute it out among as many processes as have been configured, at
+ as fine a level as is possible given the context fixtures defined in
+ the suite or any sub-suites.
+
+ """
+ log.debug("%s.run(%s) (%s)", self, test, os.getpid())
+ wrapper = self.config.plugins.prepareTest(test)
+ if wrapper is not None:
+ test = wrapper
+
+ # plugins can decorate or capture the output stream
+ wrapped = self.config.plugins.setOutputStream(self.stream)
+ if wrapped is not None:
+ self.stream = wrapped
+
+ testQueue = Queue()
+ resultQueue = Queue()
+ tasks = []
+ completed = []
+ workers = []
+ to_teardown = []
+ shouldStop = Event()
+
+ result = self._makeResult()
+ start = time.time()
+
+ self.collect(test, testQueue, tasks, to_teardown, result)
+
log.debug("Starting %s workers", self.config.multiprocess_workers)
for i in range(self.config.multiprocess_workers):
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',0.0)
- keyboardCaught = Event()
- p = Process(target=runner, args=(i, testQueue, resultQueue,
- currentaddr, currentstart,
- keyboardCaught, shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- p.currentaddr = currentaddr
- p.currentstart = currentstart
- p.keyboardCaught = keyboardCaught
- # p.setDaemon(True)
- p.start()
+ p = self.startProcess(i, testQueue, resultQueue, shouldStop, result)
workers.append(p)
log.debug("Started worker process %s", i+1)
@@ -341,162 +370,143 @@ class MultiProcessTestRunner(TextTestRunner):
# need to keep track of the next time to check for timeouts in case
# more than one process times out at the same time.
nexttimeout=self.config.multiprocess_timeout
- while tasks:
- log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
- len(completed), total_tasks,nexttimeout)
- try:
- iworker, addr, newtask_addrs, batch_result = resultQueue.get(
- timeout=nexttimeout)
- log.debug('Results received for worker %d, %s, new tasks: %d',
- iworker,addr,len(newtask_addrs))
+ thrownError = None
+
+ try:
+ while tasks:
+ log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
+ len(completed), total_tasks,nexttimeout)
try:
+ iworker, addr, newtask_addrs, batch_result = resultQueue.get(
+ timeout=nexttimeout)
+ log.debug('Results received for worker %d, %s, new tasks: %d',
+ iworker,addr,len(newtask_addrs))
try:
- tasks.remove(addr)
- except ValueError:
- log.warn('worker %s failed to remove from tasks: %s',
- iworker,addr)
- total_tasks += len(newtask_addrs)
- for newaddr in newtask_addrs:
- tasks.append(newaddr)
- except KeyError:
- log.debug("Got result for unknown task? %s", addr)
- log.debug("current: %s",str(list(tasks)[0]))
- else:
- completed.append([addr,batch_result])
- self.consolidate(result, batch_result)
- if (self.config.stopOnError
- and not result.wasSuccessful()):
- # set the stop condition
- shouldStop.set()
- break
- if self.config.multiprocess_restartworker:
- log.debug('joining worker %s',iworker)
- # wait for working, but not that important if worker
- # cannot be joined in fact, for workers that add to
- # testQueue, they will not terminate until all their
- # items are read
- workers[iworker].join(timeout=1)
- if not shouldStop.is_set() and not testQueue.empty():
- log.debug('starting new process on worker %s',iworker)
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',time.time())
- keyboardCaught = Event()
- workers[iworker] = Process(target=runner,
- args=(iworker, testQueue,
- resultQueue,
- currentaddr,
- currentstart,
- keyboardCaught,
- shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- workers[iworker].currentaddr = currentaddr
- workers[iworker].currentstart = currentstart
- workers[iworker].keyboardCaught = keyboardCaught
- workers[iworker].start()
- except Empty:
- log.debug("Timed out with %s tasks pending "
- "(empty testQueue=%d): %s",
- len(tasks),testQueue.empty(),str(tasks))
- any_alive = False
- for iworker, w in enumerate(workers):
- if w.is_alive():
- worker_addr = bytes_(w.currentaddr.value,'ascii')
- timeprocessing = time.time() - w.currentstart.value
- if ( len(worker_addr) == 0
+ try:
+ tasks.remove(addr)
+ except ValueError:
+ log.warn('worker %s failed to remove from tasks: %s',
+ iworker,addr)
+ total_tasks += len(newtask_addrs)
+ tasks.extend(newtask_addrs)
+ except KeyError:
+ log.debug("Got result for unknown task? %s", addr)
+ log.debug("current: %s",str(list(tasks)[0]))
+ else:
+ completed.append([addr,batch_result])
+ self.consolidate(result, batch_result)
+ if (self.config.stopOnError
+ and not result.wasSuccessful()):
+ # set the stop condition
+ shouldStop.set()
+ break
+ if self.config.multiprocess_restartworker:
+ log.debug('joining worker %s',iworker)
+                        # wait for the worker; it is not critical if the worker
+                        # cannot be joined, because workers that add to
+                        # testQueue will not terminate until all their
+                        # items are read
+ workers[iworker].join(timeout=1)
+ if not shouldStop.is_set() and not testQueue.empty():
+ log.debug('starting new process on worker %s',iworker)
+ workers[iworker] = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
+ except Empty:
+ log.debug("Timed out with %s tasks pending "
+ "(empty testQueue=%r): %s",
+ len(tasks),testQueue.empty(),str(tasks))
+ any_alive = False
+ for iworker, w in enumerate(workers):
+ if w.is_alive():
+ worker_addr = bytes_(w.currentaddr.value,'ascii')
+ timeprocessing = time.time() - w.currentstart.value
+ if ( len(worker_addr) == 0
+ and timeprocessing > self.config.multiprocess_timeout-0.1):
+ log.debug('worker %d has finished its work item, '
+ 'but is not exiting? do we wait for it?',
+ iworker)
+ else:
+ any_alive = True
+ if (len(worker_addr) > 0
and timeprocessing > self.config.multiprocess_timeout-0.1):
- log.debug('worker %d has finished its work item, '
- 'but is not exiting? do we wait for it?',
- iworker)
- else:
- any_alive = True
- if (len(worker_addr) > 0
- and timeprocessing > self.config.multiprocess_timeout-0.1):
- log.debug('timed out worker %s: %s',
- iworker,worker_addr)
- w.currentaddr.value = bytes_('')
- # If the process is in C++ code, sending a SIGINT
- # might not send a python KeybordInterrupt exception
- # therefore, send multiple signals until an
- # exception is caught. If this takes too long, then
- # terminate the process
- w.keyboardCaught.clear()
- startkilltime = time.time()
- while not w.keyboardCaught.is_set() and w.is_alive():
- if time.time()-startkilltime > self.waitkilltime:
- # have to terminate...
- log.error("terminating worker %s",iworker)
- w.terminate()
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',time.time())
- keyboardCaught = Event()
- workers[iworker] = Process(target=runner,
- args=(iworker, testQueue, resultQueue,
- currentaddr, currentstart,
- keyboardCaught, shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- workers[iworker].currentaddr = currentaddr
- workers[iworker].currentstart = currentstart
- workers[iworker].keyboardCaught = keyboardCaught
- workers[iworker].start()
- # there is a small probability that the
- # terminated process might send a result,
- # which has to be specially handled or
- # else processes might get orphaned.
- w = workers[iworker]
- break
- os.kill(w.pid, signal.SIGINT)
- time.sleep(0.1)
- if not any_alive and testQueue.empty():
- log.debug("All workers dead")
- break
- nexttimeout=self.config.multiprocess_timeout
- for w in workers:
- if w.is_alive() and len(w.currentaddr.value) > 0:
- timeprocessing = time.time()-w.currentstart.value
- if timeprocessing <= self.config.multiprocess_timeout:
- nexttimeout = min(nexttimeout,
- self.config.multiprocess_timeout-timeprocessing)
-
- log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
-
- for case in to_teardown:
- log.debug("Tearing down shared fixtures for %s", case)
- try:
- case.tearDown()
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- result.addError(case, sys.exc_info())
+ log.debug('timed out worker %s: %s',
+ iworker,worker_addr)
+ w.currentaddr.value = bytes_('')
+                            # If the process is in C++ code, sending a SIGILL
+                            # might not raise a Python KeyboardInterrupt exception;
+                            # therefore, send multiple signals until an
+                            # exception is caught. If this takes too long, then
+                            # terminate the process
+ w.keyboardCaught.clear()
+ startkilltime = time.time()
+ while not w.keyboardCaught.is_set() and w.is_alive():
+ if time.time()-startkilltime > self.waitkilltime:
+ # have to terminate...
+ log.error("terminating worker %s",iworker)
+ w.terminate()
+ # there is a small probability that the
+ # terminated process might send a result,
+ # which has to be specially handled or
+ # else processes might get orphaned.
+ workers[iworker] = w = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
+ break
+ os.kill(w.pid, signal.SIGILL)
+ time.sleep(0.1)
+ if not any_alive and testQueue.empty():
+ log.debug("All workers dead")
+ break
+ nexttimeout=self.config.multiprocess_timeout
+ for w in workers:
+ if w.is_alive() and len(w.currentaddr.value) > 0:
+ timeprocessing = time.time()-w.currentstart.value
+ if timeprocessing <= self.config.multiprocess_timeout:
+ nexttimeout = min(nexttimeout,
+ self.config.multiprocess_timeout-timeprocessing)
+ log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
+
+ except (KeyboardInterrupt, SystemExit), e:
+ log.info('parent received ctrl-c when waiting for test results')
+ thrownError = e
+ #resultQueue.get(False)
+
+ result.addError(test, sys.exc_info())
+
+ try:
+ for case in to_teardown:
+ log.debug("Tearing down shared fixtures for %s", case)
+ try:
+ case.tearDown()
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ result.addError(case, sys.exc_info())
- stop = time.time()
+ stop = time.time()
- # first write since can freeze on shutting down processes
- result.printErrors()
- result.printSummary(start, stop)
- self.config.plugins.finalize(result)
+ # first write since can freeze on shutting down processes
+ result.printErrors()
+ result.printSummary(start, stop)
+ self.config.plugins.finalize(result)
- log.debug("Tell all workers to stop")
- for w in workers:
- if w.is_alive():
- testQueue.put('STOP', block=False)
+ if thrownError is None:
+ log.debug("Tell all workers to stop")
+ for w in workers:
+ if w.is_alive():
+ testQueue.put('STOP', block=False)
- # wait for the workers to end
- try:
+ # wait for the workers to end
for iworker,worker in enumerate(workers):
if worker.is_alive():
log.debug('joining worker %s',iworker)
- worker.join()#10)
+ worker.join()
if worker.is_alive():
log.debug('failed to join worker %s',iworker)
- except KeyboardInterrupt:
- log.info('parent received ctrl-c')
+ except (KeyboardInterrupt, SystemExit):
+ log.info('parent received ctrl-c when shutting down: stop all processes')
for worker in workers:
- worker.terminate()
- worker.join()
+ if worker.is_alive():
+ worker.terminate()
+ if thrownError: raise thrownError
+ else: raise
return result
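The rewritten run() loop above is essentially a watchdog: poll resultQueue with a timeout, and when a worker has been on one test longer than multiprocess_timeout, keep signalling it until it gives the test up, then terminate() it after waitkilltime and start a replacement. A condensed, simplified sketch of that shape (not the actual implementation; workers are assumed to carry the currentstart Value attached in startProcess(), and the keyboardCaught handshake is omitted):

    import os, signal, time
    try:
        from Queue import Empty          # Python 2
    except ImportError:
        from queue import Empty          # Python 3

    def handle_result(item):
        pass                             # stands in for consolidate()/task bookkeeping

    def watch(result_queue, workers, test_timeout, waitkilltime=5.0):
        while any(w.is_alive() for w in workers):
            try:
                handle_result(result_queue.get(timeout=test_timeout))
            except Empty:
                now = time.time()
                for w in workers:
                    if w.is_alive() and now - w.currentstart.value > test_timeout:
                        deadline = time.time() + waitkilltime
                        while w.is_alive() and time.time() < deadline:
                            os.kill(w.pid, signal.SIGILL)   # the worker turns this into TimedOutException
                            time.sleep(0.1)
                        if w.is_alive():
                            w.terminate()                   # unresponsive, kill it outright
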
@@ -573,7 +583,7 @@ class MultiProcessTestRunner(TextTestRunner):
for batch in self.nextBatch(case):
yield batch
- def checkCanSplit(self, context, fixt):
+ def checkCanSplit(context, fixt):
"""
Callback that we use to check whether the fixtures found in a
context or ancestor are ones we care about.
@@ -587,6 +597,7 @@ class MultiProcessTestRunner(TextTestRunner):
if getattr(context, '_multiprocess_can_split_', False):
return False
return True
+ checkCanSplit = staticmethod(checkCanSplit)
def sharedFixtures(self, case):
context = getattr(case, 'context', None)
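The checkCanSplit change above simply turns the method into a static method via the assignment form (checkCanSplit = staticmethod(checkCanSplit)), which is equivalent to the @staticmethod decorator; a toy comparison:

    class A(object):
        def check(context, fixt):        # note: no self
            return fixt is not None
        check = staticmethod(check)      # the form used in the diff

    class B(object):
        @staticmethod
        def check(context, fixt):
            return fixt is not None

    assert A.check(None, 1) and B.check(None, 1)
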
@@ -688,17 +699,28 @@ def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
test(result)
currentaddr.value = bytes_('')
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- except KeyboardInterrupt:
- keyboardCaught.set()
- if len(currentaddr.value) > 0:
- log.exception('Worker %s keyboard interrupt, failing '
- 'current test %s',ix,test_addr)
+ except KeyboardInterrupt, e: #TimedOutException:
+ timeout = isinstance(e, TimedOutException)
+ if timeout:
+ keyboardCaught.set()
+ if len(currentaddr.value):
+ if timeout:
+ msg = 'Worker %s timed out, failing current test %s'
+ else:
+ msg = 'Worker %s keyboard interrupt, failing current test %s'
+ log.exception(msg,ix,test_addr)
currentaddr.value = bytes_('')
failure.Failure(*sys.exc_info())(result)
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
else:
- log.debug('Worker %s test %s timed out',ix,test_addr)
+ if timeout:
+ msg = 'Worker %s test %s timed out'
+ else:
+ msg = 'Worker %s test %s keyboard interrupt'
+ log.debug(msg,ix,test_addr)
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ if not timeout:
+ raise
except SystemExit:
currentaddr.value = bytes_('')
log.exception('Worker %s system exit',ix)
@@ -746,6 +768,7 @@ class NoSharedFixtureContextSuite(ContextSuite):
else:
result, orig = result, result
try:
+ #log.debug('setUp for %s', id(self));
self.setUp()
except KeyboardInterrupt:
raise
@@ -754,42 +777,40 @@ class NoSharedFixtureContextSuite(ContextSuite):
result.addError(self, self._exc_info())
return
try:
- localtests = [test for test in self._tests]
- if len(localtests) > 1 and self.testQueue is not None:
- log.debug("queue %d tests"%len(localtests))
- for test in localtests:
- if isinstance(test.test,nose.failure.Failure):
- # proably failed in the generator, so execute directly
- # to get the exception
- test(orig)
- else:
- MultiProcessTestRunner.addtask(self.testQueue,
- self.tasks, test)
- else:
- for test in localtests:
- if (isinstance(test,nose.case.Test)
- and self.arg is not None):
- test.test.arg = self.arg
+ for test in self._tests:
+ if (isinstance(test,nose.case.Test)
+ and self.arg is not None):
+ test.test.arg = self.arg
+ else:
+ test.arg = self.arg
+ test.testQueue = self.testQueue
+ test.tasks = self.tasks
+ if result.shouldStop:
+ log.debug("stopping")
+ break
+ # each nose.case.Test will create its own result proxy
+ # so the cases need the original result, to avoid proxy
+ # chains
+ #log.debug('running test %s in suite %s', test, self);
+ try:
+ test(orig)
+ except KeyboardInterrupt, e:
+ timeout = isinstance(e, TimedOutException)
+ if timeout:
+ msg = 'Timeout when running test %s in suite %s'
else:
- test.arg = self.arg
- test.testQueue = self.testQueue
- test.tasks = self.tasks
- if result.shouldStop:
- log.debug("stopping")
- break
- # each nose.case.Test will create its own result proxy
- # so the cases need the original result, to avoid proxy
- # chains
- try:
- test(orig)
- except KeyboardInterrupt,e:
- err = (TimedOutException,TimedOutException(str(test)),
- sys.exc_info()[2])
- test.config.plugins.addError(test,err)
- orig.addError(test,err)
+ msg = 'KeyboardInterrupt when running test %s in suite %s'
+ log.debug(msg, test, self)
+ err = (TimedOutException,TimedOutException(str(test)),
+ sys.exc_info()[2])
+ test.config.plugins.addError(test,err)
+ orig.addError(test,err)
+ if not timeout:
+ raise
finally:
self.has_run = True
try:
+ #log.debug('tearDown for %s', id(self));
self.tearDown()
except KeyboardInterrupt:
raise