diff options
author | James E. Blair <jeblair@linux.vnet.ibm.com> | 2015-12-02 11:03:25 -0800 |
---|---|---|
committer | James E. Blair <jeblair@linux.vnet.ibm.com> | 2015-12-03 10:48:32 -0800 |
commit | f7a2b658c97e900a1b287da090a6786cbb07f3f4 (patch) | |
tree | 282d8a1b721eb02a798f7ac095001cba8f0b5da0 | |
parent | 5491e93d95e1dcde6de541f3e8a3452c9a0523d1 (diff) | |
download | gear-f7a2b658c97e900a1b287da090a6786cbb07f3f4.tar.gz |
Add a test for worker termination
This was an attempt to find a code path that could cause
stopWaitingForJobs to raise an exception. So far, that has failed,
but add this test anyway to exercise some of that code.
Change-Id: I39955fc7250af9b93f336c7c3e0315528bd8d4a9
-rw-r--r-- | gear/tests/__init__.py | 6 | ||||
-rw-r--r-- | gear/tests/test_functional.py | 12 |
2 files changed, 17 insertions, 1 deletions
diff --git a/gear/tests/__init__.py b/gear/tests/__init__.py index 834b0f0..6d5edb4 100644 --- a/gear/tests/__init__.py +++ b/gear/tests/__init__.py @@ -17,6 +17,7 @@ """Common utilities used in testing""" import errno +import logging import os import socket @@ -49,7 +50,10 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase): stderr = self.useFixture(fixtures.StringStream('stderr')).stream self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) - self.useFixture(fixtures.FakeLogger()) + self.useFixture(fixtures.FakeLogger( + level=logging.DEBUG, + format='%(asctime)s %(name)-32s ' + '%(levelname)-8s %(message)s')) self.useFixture(fixtures.NestedTempfile()) diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py index 247649a..26e72e3 100644 --- a/gear/tests/test_functional.py +++ b/gear/tests/test_functional.py @@ -14,11 +14,13 @@ # limitations under the License. import os +import threading import time from OpenSSL import crypto import fixtures import testscenarios +import testtools import gear from gear import tests @@ -125,6 +127,16 @@ class TestFunctional(tests.BaseTestCase): self.assertTrue(job.complete) self.assertEqual(job.data, [b'workdata']) + def test_worker_termination(self): + def getJob(): + with testtools.ExpectedException(gear.InterruptedError): + self.worker.getJob() + self.worker.registerFunction('test') + jobthread = threading.Thread(target=getJob) + jobthread.daemon = True + jobthread.start() + self.worker.stopWaitingForJobs() + def load_tests(loader, in_tests, pattern): return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern) |