summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHeng Liu <liucougar@gmail.com>2011-11-03 16:04:42 -0700
committerHeng Liu <liucougar@gmail.com>2011-11-03 16:04:42 -0700
commitc0afc80c65e59b381e6544df3fdaf82068959b23 (patch)
treeff656e8ab50d1bb02f9ebb04230b31032e1d6700
parent104de87d8886c75c0a97b6e5470279d732575e23 (diff)
downloadnose-c0afc80c65e59b381e6544df3fdaf82068959b23.tar.gz
properly handle ctrl+c (keyboard interrupt):
first ctrl+c should interrupt all tests currently being executed by multiprocessing plugin, and no further tests should be run in these subprocesses after first keyboardinterrupt, multiprocessing plugin should still try to execute any scheduled teardowns. second keyboardinterrupt should interrupt teardown execution and return immediately. this also fixes problem where subprocesses keeps on running even after parent nose is killed by keyboardinterrupt. added functional tests for all the mentioned cases
-rw-r--r--functional_tests/test_multiprocessing/support/fake_nosetest.py13
-rw-r--r--functional_tests/test_multiprocessing/support/keyboardinterrupt.py27
-rw-r--r--functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py27
-rw-r--r--functional_tests/test_multiprocessing/test_keyboardinterrupt.py70
-rw-r--r--nose/plugins/multiprocess.py348
5 files changed, 319 insertions, 166 deletions
diff --git a/functional_tests/test_multiprocessing/support/fake_nosetest.py b/functional_tests/test_multiprocessing/support/fake_nosetest.py
new file mode 100644
index 0000000..886c569
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/fake_nosetest.py
@@ -0,0 +1,13 @@
+import sys
+
+import nose
+
+from nose.plugins.multiprocess import MultiProcess
+from nose.config import Config
+from nose.plugins.manager import PluginManager
+
+if __name__ == '__main__':
+ if len(sys.argv) < 2:
+ print "USAGE: %s TEST_FILE" % sys.argv[0]
+ sys.exit(1)
+ nose.main(defaultTest=sys.argv[1], argv=[sys.argv[0],'--processes=1','-v'], config=Config(plugins=PluginManager(plugins=[MultiProcess()])))
diff --git a/functional_tests/test_multiprocessing/support/keyboardinterrupt.py b/functional_tests/test_multiprocessing/support/keyboardinterrupt.py
new file mode 100644
index 0000000..eeffd4a
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/keyboardinterrupt.py
@@ -0,0 +1,27 @@
+from tempfile import mktemp
+from time import sleep
+
+def log(w):
+ f = open(logfile, 'a')
+ f.write(w+"\n")
+ f.close()
+#make sure all tests in this file are dispatched to the same subprocess
+def setup():
+ global logfile
+ logfile = mktemp()
+ print "tempfile is:",logfile
+
+ log('setup')
+ pass
+
+def test_timeout():
+ log('test_timeout')
+ sleep(2)
+ log('test_timeout_finished')
+
+# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run
+def test_pass():
+ log('test_pass')
+
+def teardown():
+ log('teardown')
diff --git a/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py b/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py
new file mode 100644
index 0000000..b7d845a
--- /dev/null
+++ b/functional_tests/test_multiprocessing/support/keyboardinterrupt_twice.py
@@ -0,0 +1,27 @@
+from tempfile import mktemp
+from time import sleep
+
+logfile = mktemp()
+print "tempfile is:",logfile
+
+def log(w):
+ f = open(logfile, 'a')
+ f.write(w+"\n")
+ f.close()
+#make sure all tests in this file are dispatched to the same subprocess
+def setup():
+ log('setup')
+
+def test_timeout():
+ log('test_timeout')
+ sleep(2)
+ log('test_timeout_finished')
+
+# check timeout will not prevent remaining tests dispatched to the same subprocess to continue to run
+def test_pass():
+ log('test_pass')
+
+def teardown():
+ log('teardown')
+ sleep(10)
+ log('teardown_finished')
diff --git a/functional_tests/test_multiprocessing/test_keyboardinterrupt.py b/functional_tests/test_multiprocessing/test_keyboardinterrupt.py
new file mode 100644
index 0000000..d0cc508
--- /dev/null
+++ b/functional_tests/test_multiprocessing/test_keyboardinterrupt.py
@@ -0,0 +1,70 @@
+from subprocess import Popen,PIPE
+import os
+import sys
+from time import sleep
+import signal
+
+import nose
+
+from . import support
+
+PYTHONPATH = os.environ['PYTHONPATH'] if 'PYTHONPATH' in os.environ else ''
+def setup():
+ nose_parent_dir = os.path.normpath(os.path.join(os.path.abspath(os.path.dirname(nose.__file__)),'..'))
+ paths = [nose_parent_dir]
+ if PYTHONPATH:
+ paths.append(PYTHONPATH)
+ os.environ['PYTHONPATH'] = os.pathsep.join(paths)
+def teardown():
+ if PYTHONPATH:
+ os.environ['PYTHONPATH'] = PYTHONPATH
+ else:
+ del os.environ['PYTHONPATH']
+
+runner = os.path.join(support, 'fake_nosetest.py')
+def keyboardinterrupt(case):
+ #os.setsid would create a process group so signals sent to the
+ #parent process will propogates to all children processes
+ process = Popen([sys.executable,runner,os.path.join(support,case)], preexec_fn=os.setsid, stdout=PIPE, stderr=PIPE, bufsize=-1)
+
+ sleep(0.5)
+
+ os.killpg(process.pid, signal.SIGINT)
+ return process
+
+def get_log_content(stdout):
+ prefix = 'tempfile is: '
+ if not stdout.startswith(prefix):
+ raise Exception('stdout does not contain tmp file name: '+stdout)
+ logfile = stdout[len(prefix):].strip() #remove trailing new line char
+ f = open(logfile)
+ content = f.read()
+ f.close()
+ return content
+
+def test_keyboardinterrupt():
+ process = keyboardinterrupt('keyboardinterrupt.py')
+ stdout, stderr = process.communicate(None)
+ log = get_log_content(stdout)
+ assert 'setup' in log
+ assert 'test_timeout' in log
+ assert 'test_timeout_finished' not in log
+ assert 'test_pass' not in log
+ assert 'teardown' in log
+ assert 'Ran 0 tests' in stderr
+ assert 'KeyboardInterrupt' in stderr
+
+def test_keyboardinterrupt_twice():
+ process = keyboardinterrupt('keyboardinterrupt_twice.py')
+ sleep(0.5)
+ os.killpg(process.pid, signal.SIGINT)
+ stdout, stderr = process.communicate(None)
+ log = get_log_content(stdout)
+ assert 'setup' in log
+ assert 'test_timeout' in log
+ assert 'test_timeout_finished' not in log
+ assert 'test_pass' not in log
+ assert 'teardown' in log
+ assert 'teardown_finished' not in log
+ assert 'Ran 0 tests' in stderr
+ assert 'KeyboardInterrupt' in stderr
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py
index fc4235b..abebf77 100644
--- a/nose/plugins/multiprocess.py
+++ b/nose/plugins/multiprocess.py
@@ -130,7 +130,8 @@ log = logging.getLogger(__name__)
Process = Queue = Pool = Event = Value = Array = None
-class TimedOutException(Exception):
+# have to inherit KeyboardInterrupt to it will interrupt process properly
+class TimedOutException(KeyboardInterrupt):
def __init__(self, value = "Timed Out"):
self.value = value
def __str__(self):
@@ -247,9 +248,12 @@ class MultiProcess(Plugin):
config=self.config,
loaderClass=self.loaderClass)
+def signalhandler(sig, frame):
+ raise TimedOutException()
+
class MultiProcessTestRunner(TextTestRunner):
waitkilltime = 5.0 # max time to wait to terminate a process that does not
- # respond to SIGINT
+ # respond to SIGILL
def __init__(self, **kw):
self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
super(MultiProcessTestRunner, self).__init__(**kw)
@@ -294,6 +298,28 @@ class MultiProcessTestRunner(TextTestRunner):
log.debug("Queued test %s (%s) to %s",
len(tasks), test_addr, testQueue)
+ def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result):
+ currentaddr = Value('c',bytes_(''))
+ currentstart = Value('d',time.time())
+ keyboardCaught = Event()
+ p = Process(target=runner,
+ args=(iworker, testQueue,
+ resultQueue,
+ currentaddr,
+ currentstart,
+ keyboardCaught,
+ shouldStop,
+ self.loaderClass,
+ result.__class__,
+ pickle.dumps(self.config)))
+ p.currentaddr = currentaddr
+ p.currentstart = currentstart
+ p.keyboardCaught = keyboardCaught
+ old = signal.signal(signal.SIGILL, signalhandler)
+ p.start()
+ signal.signal(signal.SIGILL, old)
+ return p
+
def run(self, test):
"""
Execute the test (which may be a test suite). If the test is a suite,
@@ -327,20 +353,7 @@ class MultiProcessTestRunner(TextTestRunner):
log.debug("Starting %s workers", self.config.multiprocess_workers)
for i in range(self.config.multiprocess_workers):
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',0.0)
- keyboardCaught = Event()
- p = Process(target=runner, args=(i, testQueue, resultQueue,
- currentaddr, currentstart,
- keyboardCaught, shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- p.currentaddr = currentaddr
- p.currentstart = currentstart
- p.keyboardCaught = keyboardCaught
- # p.setDaemon(True)
- p.start()
+ p = self.startProcess(i, testQueue, resultQueue, shouldStop, result)
workers.append(p)
log.debug("Started worker process %s", i+1)
@@ -348,162 +361,143 @@ class MultiProcessTestRunner(TextTestRunner):
# need to keep track of the next time to check for timeouts in case
# more than one process times out at the same time.
nexttimeout=self.config.multiprocess_timeout
- while tasks:
- log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
- len(completed), total_tasks,nexttimeout)
- try:
- iworker, addr, newtask_addrs, batch_result = resultQueue.get(
- timeout=nexttimeout)
- log.debug('Results received for worker %d, %s, new tasks: %d',
- iworker,addr,len(newtask_addrs))
+ thrownError = None
+
+ try:
+ while tasks:
+ log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
+ len(completed), total_tasks,nexttimeout)
try:
+ iworker, addr, newtask_addrs, batch_result = resultQueue.get(
+ timeout=nexttimeout)
+ log.debug('Results received for worker %d, %s, new tasks: %d',
+ iworker,addr,len(newtask_addrs))
try:
- tasks.remove(addr)
- except ValueError:
- log.warn('worker %s failed to remove from tasks: %s',
- iworker,addr)
- total_tasks += len(newtask_addrs)
- for newaddr in newtask_addrs:
- tasks.append(newaddr)
- except KeyError:
- log.debug("Got result for unknown task? %s", addr)
- log.debug("current: %s",str(list(tasks)[0]))
- else:
- completed.append([addr,batch_result])
- self.consolidate(result, batch_result)
- if (self.config.stopOnError
- and not result.wasSuccessful()):
- # set the stop condition
- shouldStop.set()
- break
- if self.config.multiprocess_restartworker:
- log.debug('joining worker %s',iworker)
- # wait for working, but not that important if worker
- # cannot be joined in fact, for workers that add to
- # testQueue, they will not terminate until all their
- # items are read
- workers[iworker].join(timeout=1)
- if not shouldStop.is_set() and not testQueue.empty():
- log.debug('starting new process on worker %s',iworker)
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',time.time())
- keyboardCaught = Event()
- workers[iworker] = Process(target=runner,
- args=(iworker, testQueue,
- resultQueue,
- currentaddr,
- currentstart,
- keyboardCaught,
- shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- workers[iworker].currentaddr = currentaddr
- workers[iworker].currentstart = currentstart
- workers[iworker].keyboardCaught = keyboardCaught
- workers[iworker].start()
- except Empty:
- log.debug("Timed out with %s tasks pending "
- "(empty testQueue=%d): %s",
- len(tasks),testQueue.empty(),str(tasks))
- any_alive = False
- for iworker, w in enumerate(workers):
- if w.is_alive():
- worker_addr = bytes_(w.currentaddr.value,'ascii')
- timeprocessing = time.time() - w.currentstart.value
- if ( len(worker_addr) == 0
+ try:
+ tasks.remove(addr)
+ except ValueError:
+ log.warn('worker %s failed to remove from tasks: %s',
+ iworker,addr)
+ total_tasks += len(newtask_addrs)
+ tasks.extend(newtask_addrs)
+ except KeyError:
+ log.debug("Got result for unknown task? %s", addr)
+ log.debug("current: %s",str(list(tasks)[0]))
+ else:
+ completed.append([addr,batch_result])
+ self.consolidate(result, batch_result)
+ if (self.config.stopOnError
+ and not result.wasSuccessful()):
+ # set the stop condition
+ shouldStop.set()
+ break
+ if self.config.multiprocess_restartworker:
+ log.debug('joining worker %s',iworker)
+ # wait for working, but not that important if worker
+ # cannot be joined in fact, for workers that add to
+ # testQueue, they will not terminate until all their
+ # items are read
+ workers[iworker].join(timeout=1)
+ if not shouldStop.is_set() and not testQueue.empty():
+ log.debug('starting new process on worker %s',iworker)
+ workers[iworker] = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
+ except Empty:
+ log.debug("Timed out with %s tasks pending "
+ "(empty testQueue=%r): %s",
+ len(tasks),testQueue.empty(),str(tasks))
+ any_alive = False
+ for iworker, w in enumerate(workers):
+ if w.is_alive():
+ worker_addr = bytes_(w.currentaddr.value,'ascii')
+ timeprocessing = time.time() - w.currentstart.value
+ if ( len(worker_addr) == 0
+ and timeprocessing > self.config.multiprocess_timeout-0.1):
+ log.debug('worker %d has finished its work item, '
+ 'but is not exiting? do we wait for it?',
+ iworker)
+ else:
+ any_alive = True
+ if (len(worker_addr) > 0
and timeprocessing > self.config.multiprocess_timeout-0.1):
- log.debug('worker %d has finished its work item, '
- 'but is not exiting? do we wait for it?',
- iworker)
- else:
- any_alive = True
- if (len(worker_addr) > 0
- and timeprocessing > self.config.multiprocess_timeout-0.1):
- log.debug('timed out worker %s: %s',
- iworker,worker_addr)
- w.currentaddr.value = bytes_('')
- # If the process is in C++ code, sending a SIGINT
- # might not send a python KeybordInterrupt exception
- # therefore, send multiple signals until an
- # exception is caught. If this takes too long, then
- # terminate the process
- w.keyboardCaught.clear()
- startkilltime = time.time()
- while not w.keyboardCaught.is_set() and w.is_alive():
- if time.time()-startkilltime > self.waitkilltime:
- # have to terminate...
- log.error("terminating worker %s",iworker)
- w.terminate()
- currentaddr = Value('c',bytes_(''))
- currentstart = Value('d',time.time())
- keyboardCaught = Event()
- workers[iworker] = Process(target=runner,
- args=(iworker, testQueue, resultQueue,
- currentaddr, currentstart,
- keyboardCaught, shouldStop,
- self.loaderClass,
- result.__class__,
- pickle.dumps(self.config)))
- workers[iworker].currentaddr = currentaddr
- workers[iworker].currentstart = currentstart
- workers[iworker].keyboardCaught = keyboardCaught
- workers[iworker].start()
- # there is a small probability that the
- # terminated process might send a result,
- # which has to be specially handled or
- # else processes might get orphaned.
- w = workers[iworker]
- break
- os.kill(w.pid, signal.SIGINT)
- time.sleep(0.1)
- if not any_alive and testQueue.empty():
- log.debug("All workers dead")
- break
- nexttimeout=self.config.multiprocess_timeout
- for w in workers:
- if w.is_alive() and len(w.currentaddr.value) > 0:
- timeprocessing = time.time()-w.currentstart.value
- if timeprocessing <= self.config.multiprocess_timeout:
- nexttimeout = min(nexttimeout,
- self.config.multiprocess_timeout-timeprocessing)
-
- log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
-
- for case in to_teardown:
- log.debug("Tearing down shared fixtures for %s", case)
- try:
- case.tearDown()
- except (KeyboardInterrupt, SystemExit):
- raise
- except:
- result.addError(case, sys.exc_info())
+ log.debug('timed out worker %s: %s',
+ iworker,worker_addr)
+ w.currentaddr.value = bytes_('')
+ # If the process is in C++ code, sending a SIGILL
+ # might not send a python KeybordInterrupt exception
+ # therefore, send multiple signals until an
+ # exception is caught. If this takes too long, then
+ # terminate the process
+ w.keyboardCaught.clear()
+ startkilltime = time.time()
+ while not w.keyboardCaught.is_set() and w.is_alive():
+ if time.time()-startkilltime > self.waitkilltime:
+ # have to terminate...
+ log.error("terminating worker %s",iworker)
+ w.terminate()
+ # there is a small probability that the
+ # terminated process might send a result,
+ # which has to be specially handled or
+ # else processes might get orphaned.
+ workers[iworker] = w = self.startProcess(iworker, testQueue, resultQueue, shouldStop, result)
+ break
+ os.kill(w.pid, signal.SIGILL)
+ time.sleep(0.1)
+ if not any_alive and testQueue.empty():
+ log.debug("All workers dead")
+ break
+ nexttimeout=self.config.multiprocess_timeout
+ for w in workers:
+ if w.is_alive() and len(w.currentaddr.value) > 0:
+ timeprocessing = time.time()-w.currentstart.value
+ if timeprocessing <= self.config.multiprocess_timeout:
+ nexttimeout = min(nexttimeout,
+ self.config.multiprocess_timeout-timeprocessing)
+ log.debug("Completed %s tasks (%s remain)", len(completed), len(tasks))
+
+ except (KeyboardInterrupt, SystemExit), e:
+ log.info('parent received ctrl-c when waiting for test results')
+ thrownError = e
+ #resultQueue.get(False)
+
+ result.addError(test, sys.exc_info())
+
+ try:
+ for case in to_teardown:
+ log.debug("Tearing down shared fixtures for %s", case)
+ try:
+ case.tearDown()
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except:
+ result.addError(case, sys.exc_info())
- stop = time.time()
+ stop = time.time()
- # first write since can freeze on shutting down processes
- result.printErrors()
- result.printSummary(start, stop)
- self.config.plugins.finalize(result)
+ # first write since can freeze on shutting down processes
+ result.printErrors()
+ result.printSummary(start, stop)
+ self.config.plugins.finalize(result)
- log.debug("Tell all workers to stop")
- for w in workers:
- if w.is_alive():
- testQueue.put('STOP', block=False)
+ if thrownError is None:
+ log.debug("Tell all workers to stop")
+ for w in workers:
+ if w.is_alive():
+ testQueue.put('STOP', block=False)
- # wait for the workers to end
- try:
+ # wait for the workers to end
for iworker,worker in enumerate(workers):
if worker.is_alive():
log.debug('joining worker %s',iworker)
- worker.join()#10)
+ worker.join()
if worker.is_alive():
log.debug('failed to join worker %s',iworker)
- except KeyboardInterrupt:
- log.info('parent received ctrl-c')
+ except (KeyboardInterrupt, SystemExit):
+ log.info('parent received ctrl-c when shutting down: stop all processes')
for worker in workers:
- worker.terminate()
- worker.join()
+ if worker.is_alive():
+ worker.terminate()
+ if thrownError: raise thrownError
+ else: raise
return result
@@ -696,17 +690,28 @@ def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
test(result)
currentaddr.value = bytes_('')
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
- except KeyboardInterrupt:
- keyboardCaught.set()
- if len(currentaddr.value) > 0:
- log.exception('Worker %s keyboard interrupt, failing '
- 'current test %s',ix,test_addr)
+ except KeyboardInterrupt, e: #TimedOutException:
+ timeout = isinstance(e, TimedOutException)
+ if timeout:
+ keyboardCaught.set()
+ if len(currentaddr.value):
+ if timeout:
+ msg = 'Worker %s timed out, failing current test %s'
+ else:
+ msg = 'Worker %s keyboard interrupt, failing current test %s'
+ log.exception(msg,ix,test_addr)
currentaddr.value = bytes_('')
failure.Failure(*sys.exc_info())(result)
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
else:
- log.debug('Worker %s test %s timed out',ix,test_addr)
+ if timeout:
+ msg = 'Worker %s test %s timed out'
+ else:
+ msg = 'Worker %s test %s keyboard interrupt'
+ log.debug(msg,ix,test_addr)
resultQueue.put((ix, test_addr, test.tasks, batch(result)))
+ if not timeout:
+ raise
except SystemExit:
currentaddr.value = bytes_('')
log.exception('Worker %s system exit',ix)
@@ -754,6 +759,7 @@ class NoSharedFixtureContextSuite(ContextSuite):
else:
result, orig = result, result
try:
+ #log.debug('setUp for %s', id(self));
self.setUp()
except KeyboardInterrupt:
raise
@@ -776,16 +782,26 @@ class NoSharedFixtureContextSuite(ContextSuite):
# each nose.case.Test will create its own result proxy
# so the cases need the original result, to avoid proxy
# chains
+ #log.debug('running test %s in suite %s', test, self);
try:
test(orig)
- except KeyboardInterrupt,e:
+ except KeyboardInterrupt, e:
+ timeout = isinstance(e, TimedOutException)
+ if timeout:
+ msg = 'Timeout when running test %s in suite %s'
+ else:
+ msg = 'KeyboardInterrupt when running test %s in suite %s'
+ log.debug(msg, test, self)
err = (TimedOutException,TimedOutException(str(test)),
sys.exc_info()[2])
test.config.plugins.addError(test,err)
orig.addError(test,err)
+ if not timeout:
+ raise
finally:
self.has_run = True
try:
+ #log.debug('tearDown for %s', id(self));
self.tearDown()
except KeyboardInterrupt:
raise