summaryrefslogtreecommitdiff
path: root/nose
diff options
context:
space:
mode:
authorHeng Liu <liucougar@gmail.com>2011-10-26 16:13:20 -0700
committerHeng Liu <liucougar@gmail.com>2011-10-26 16:13:20 -0700
commitec9eb1d2ffb2ea897fac5b80c9d941b3845cc6a0 (patch)
tree6335a4348c5f20c3fc5378e5fe078f3d6465c383 /nose
parent7eb66b631fd32f5c40b870118fc1a1fd2a575869 (diff)
downloadnose-ec9eb1d2ffb2ea897fac5b80c9d941b3845cc6a0.tar.gz
properly distribute tests under shared fixtures to multiprocess worker
this is a regression introduced when fixing Issue 462, but there is no test coverage for this issue. added a functional test case to make sure it's working as expected
Diffstat (limited to 'nose')
-rw-r--r--nose/plugins/multiprocess.py128
1 files changed, 61 insertions, 67 deletions
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py
index 604752a..fc4235b 100644
--- a/nose/plugins/multiprocess.py
+++ b/nose/plugins/multiprocess.py
@@ -254,35 +254,7 @@ class MultiProcessTestRunner(TextTestRunner):
self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
super(MultiProcessTestRunner, self).__init__(**kw)
- def run(self, test):
- """
- Execute the test (which may be a test suite). If the test is a suite,
- distribute it out among as many processes as have been configured, at
- as fine a level as is possible given the context fixtures defined in
- the suite or any sub-suites.
-
- """
- log.debug("%s.run(%s) (%s)", self, test, os.getpid())
- wrapper = self.config.plugins.prepareTest(test)
- if wrapper is not None:
- test = wrapper
-
- # plugins can decorate or capture the output stream
- wrapped = self.config.plugins.setOutputStream(self.stream)
- if wrapped is not None:
- self.stream = wrapped
-
- testQueue = Queue()
- resultQueue = Queue()
- tasks = []
- completed = []
- workers = []
- to_teardown = []
- shouldStop = Event()
-
- result = self._makeResult()
- start = time.time()
-
+ def collect(self, test, testQueue, tasks, to_teardown, result):
# dispatch and collect results
# put indexes only on queue because tests aren't picklable
for case in self.nextBatch(test):
@@ -308,16 +280,51 @@ class MultiProcessTestRunner(TextTestRunner):
result.addError(case, sys.exc_info())
else:
to_teardown.append(case)
- for _t in case:
- test_addr = self.addtask(testQueue,tasks,_t)
- log.debug("Queued shared-fixture test %s (%s) to %s",
- len(tasks), test_addr, testQueue)
+ if case.factory:
+ ancestors=case.factory.context.get(case, [])
+ for an in ancestors[:2]:
+ #log.debug('reset ancestor %s', an)
+ if getattr(an, '_multiprocess_shared_', False):
+ an._multiprocess_can_split_=True
+ #an._multiprocess_shared_=False
+ self.collect(case, testQueue, tasks, to_teardown, result)
else:
test_addr = self.addtask(testQueue,tasks,case)
log.debug("Queued test %s (%s) to %s",
len(tasks), test_addr, testQueue)
+ def run(self, test):
+ """
+ Execute the test (which may be a test suite). If the test is a suite,
+ distribute it out among as many processes as have been configured, at
+ as fine a level as is possible given the context fixtures defined in
+ the suite or any sub-suites.
+
+ """
+ log.debug("%s.run(%s) (%s)", self, test, os.getpid())
+ wrapper = self.config.plugins.prepareTest(test)
+ if wrapper is not None:
+ test = wrapper
+
+ # plugins can decorate or capture the output stream
+ wrapped = self.config.plugins.setOutputStream(self.stream)
+ if wrapped is not None:
+ self.stream = wrapped
+
+ testQueue = Queue()
+ resultQueue = Queue()
+ tasks = []
+ completed = []
+ workers = []
+ to_teardown = []
+ shouldStop = Event()
+
+ result = self._makeResult()
+ start = time.time()
+
+ self.collect(test, testQueue, tasks, to_teardown, result)
+
log.debug("Starting %s workers", self.config.multiprocess_workers)
for i in range(self.config.multiprocess_workers):
currentaddr = Value('c',bytes_(''))
@@ -755,40 +762,27 @@ class NoSharedFixtureContextSuite(ContextSuite):
result.addError(self, self._exc_info())
return
try:
- localtests = [test for test in self._tests]
- if (not self.hasFixtures(MultiProcessTestRunner.checkCanSplit)
- and len(localtests) > 1 and self.testQueue is not None):
- log.debug("queue %d tests"%len(localtests))
- for test in localtests:
- if isinstance(test.test,nose.failure.Failure):
- # proably failed in the generator, so execute directly
- # to get the exception
- test(orig)
- else:
- MultiProcessTestRunner.addtask(self.testQueue,
- self.tasks, test)
- else:
- for test in localtests:
- if (isinstance(test,nose.case.Test)
- and self.arg is not None):
- test.test.arg = self.arg
- else:
- test.arg = self.arg
- test.testQueue = self.testQueue
- test.tasks = self.tasks
- if result.shouldStop:
- log.debug("stopping")
- break
- # each nose.case.Test will create its own result proxy
- # so the cases need the original result, to avoid proxy
- # chains
- try:
- test(orig)
- except KeyboardInterrupt,e:
- err = (TimedOutException,TimedOutException(str(test)),
- sys.exc_info()[2])
- test.config.plugins.addError(test,err)
- orig.addError(test,err)
+ for test in self._tests:
+ if (isinstance(test,nose.case.Test)
+ and self.arg is not None):
+ test.test.arg = self.arg
+ else:
+ test.arg = self.arg
+ test.testQueue = self.testQueue
+ test.tasks = self.tasks
+ if result.shouldStop:
+ log.debug("stopping")
+ break
+ # each nose.case.Test will create its own result proxy
+ # so the cases need the original result, to avoid proxy
+ # chains
+ try:
+ test(orig)
+ except KeyboardInterrupt,e:
+ err = (TimedOutException,TimedOutException(str(test)),
+ sys.exc_info()[2])
+ test.config.plugins.addError(test,err)
+ orig.addError(test,err)
finally:
self.has_run = True
try: