diff options
author | Heng Liu <liucougar@gmail.com> | 2011-10-26 16:13:20 -0700 |
---|---|---|
committer | Heng Liu <liucougar@gmail.com> | 2011-10-26 16:13:20 -0700 |
commit | ec9eb1d2ffb2ea897fac5b80c9d941b3845cc6a0 (patch) | |
tree | 6335a4348c5f20c3fc5378e5fe078f3d6465c383 /nose | |
parent | 7eb66b631fd32f5c40b870118fc1a1fd2a575869 (diff) | |
download | nose-ec9eb1d2ffb2ea897fac5b80c9d941b3845cc6a0.tar.gz |
properly distribute tests under shared fixtures to multiprocess worker
this is a regression introduced when fixing Issue 462, but there is no test
coverage for this issue.
added a functional test case to make sure it's working as expected
Diffstat (limited to 'nose')
-rw-r--r-- | nose/plugins/multiprocess.py | 128 |
1 files changed, 61 insertions, 67 deletions
diff --git a/nose/plugins/multiprocess.py b/nose/plugins/multiprocess.py index 604752a..fc4235b 100644 --- a/nose/plugins/multiprocess.py +++ b/nose/plugins/multiprocess.py @@ -254,35 +254,7 @@ class MultiProcessTestRunner(TextTestRunner): self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader) super(MultiProcessTestRunner, self).__init__(**kw) - def run(self, test): - """ - Execute the test (which may be a test suite). If the test is a suite, - distribute it out among as many processes as have been configured, at - as fine a level as is possible given the context fixtures defined in - the suite or any sub-suites. - - """ - log.debug("%s.run(%s) (%s)", self, test, os.getpid()) - wrapper = self.config.plugins.prepareTest(test) - if wrapper is not None: - test = wrapper - - # plugins can decorate or capture the output stream - wrapped = self.config.plugins.setOutputStream(self.stream) - if wrapped is not None: - self.stream = wrapped - - testQueue = Queue() - resultQueue = Queue() - tasks = [] - completed = [] - workers = [] - to_teardown = [] - shouldStop = Event() - - result = self._makeResult() - start = time.time() - + def collect(self, test, testQueue, tasks, to_teardown, result): # dispatch and collect results # put indexes only on queue because tests aren't picklable for case in self.nextBatch(test): @@ -308,16 +280,51 @@ class MultiProcessTestRunner(TextTestRunner): result.addError(case, sys.exc_info()) else: to_teardown.append(case) - for _t in case: - test_addr = self.addtask(testQueue,tasks,_t) - log.debug("Queued shared-fixture test %s (%s) to %s", - len(tasks), test_addr, testQueue) + if case.factory: + ancestors=case.factory.context.get(case, []) + for an in ancestors[:2]: + #log.debug('reset ancestor %s', an) + if getattr(an, '_multiprocess_shared_', False): + an._multiprocess_can_split_=True + #an._multiprocess_shared_=False + self.collect(case, testQueue, tasks, to_teardown, result) else: test_addr = self.addtask(testQueue,tasks,case) log.debug("Queued test %s (%s) to %s", len(tasks), test_addr, testQueue) + def run(self, test): + """ + Execute the test (which may be a test suite). If the test is a suite, + distribute it out among as many processes as have been configured, at + as fine a level as is possible given the context fixtures defined in + the suite or any sub-suites. + + """ + log.debug("%s.run(%s) (%s)", self, test, os.getpid()) + wrapper = self.config.plugins.prepareTest(test) + if wrapper is not None: + test = wrapper + + # plugins can decorate or capture the output stream + wrapped = self.config.plugins.setOutputStream(self.stream) + if wrapped is not None: + self.stream = wrapped + + testQueue = Queue() + resultQueue = Queue() + tasks = [] + completed = [] + workers = [] + to_teardown = [] + shouldStop = Event() + + result = self._makeResult() + start = time.time() + + self.collect(test, testQueue, tasks, to_teardown, result) + log.debug("Starting %s workers", self.config.multiprocess_workers) for i in range(self.config.multiprocess_workers): currentaddr = Value('c',bytes_('')) @@ -755,40 +762,27 @@ class NoSharedFixtureContextSuite(ContextSuite): result.addError(self, self._exc_info()) return try: - localtests = [test for test in self._tests] - if (not self.hasFixtures(MultiProcessTestRunner.checkCanSplit) - and len(localtests) > 1 and self.testQueue is not None): - log.debug("queue %d tests"%len(localtests)) - for test in localtests: - if isinstance(test.test,nose.failure.Failure): - # proably failed in the generator, so execute directly - # to get the exception - test(orig) - else: - MultiProcessTestRunner.addtask(self.testQueue, - self.tasks, test) - else: - for test in localtests: - if (isinstance(test,nose.case.Test) - and self.arg is not None): - test.test.arg = self.arg - else: - test.arg = self.arg - test.testQueue = self.testQueue - test.tasks = self.tasks - if result.shouldStop: - log.debug("stopping") - break - # each nose.case.Test will create its own result proxy - # so the cases need the original result, to avoid proxy - # chains - try: - test(orig) - except KeyboardInterrupt,e: - err = (TimedOutException,TimedOutException(str(test)), - sys.exc_info()[2]) - test.config.plugins.addError(test,err) - orig.addError(test,err) + for test in self._tests: + if (isinstance(test,nose.case.Test) + and self.arg is not None): + test.test.arg = self.arg + else: + test.arg = self.arg + test.testQueue = self.testQueue + test.tasks = self.tasks + if result.shouldStop: + log.debug("stopping") + break + # each nose.case.Test will create its own result proxy + # so the cases need the original result, to avoid proxy + # chains + try: + test(orig) + except KeyboardInterrupt,e: + err = (TimedOutException,TimedOutException(str(test)), + sys.exc_info()[2]) + test.config.plugins.addError(test,err) + orig.addError(test,err) finally: self.has_run = True try: |