diff options
author | Jenkins <jenkins@review.openstack.org> | 2016-06-02 07:02:10 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2016-06-02 07:02:10 +0000 |
commit | f3d1c06c37c23f3e35d1a5e68207b7e3e453307a (patch) | |
tree | 28c108b902c701a87c5e31bcb5c6dfc1962fdf22 | |
parent | 40f6feefe46271c75c7e8faaec6280bd34705599 (diff) | |
parent | f7a2b658c97e900a1b287da090a6786cbb07f3f4 (diff) | |
download | gear-f3d1c06c37c23f3e35d1a5e68207b7e3e453307a.tar.gz |
Merge "Add a test for worker termination"
-rw-r--r-- | gear/tests/__init__.py | 6 | ||||
-rw-r--r-- | gear/tests/test_functional.py | 12 |
2 files changed, 17 insertions, 1 deletions
diff --git a/gear/tests/__init__.py b/gear/tests/__init__.py index 834b0f0..6d5edb4 100644 --- a/gear/tests/__init__.py +++ b/gear/tests/__init__.py @@ -17,6 +17,7 @@ """Common utilities used in testing""" import errno +import logging import os import socket @@ -49,7 +50,10 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase): stderr = self.useFixture(fixtures.StringStream('stderr')).stream self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) - self.useFixture(fixtures.FakeLogger()) + self.useFixture(fixtures.FakeLogger( + level=logging.DEBUG, + format='%(asctime)s %(name)-32s ' + '%(levelname)-8s %(message)s')) self.useFixture(fixtures.NestedTempfile()) diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py index 247649a..26e72e3 100644 --- a/gear/tests/test_functional.py +++ b/gear/tests/test_functional.py @@ -14,11 +14,13 @@ # limitations under the License. import os +import threading import time from OpenSSL import crypto import fixtures import testscenarios +import testtools import gear from gear import tests @@ -125,6 +127,16 @@ class TestFunctional(tests.BaseTestCase): self.assertTrue(job.complete) self.assertEqual(job.data, [b'workdata']) + def test_worker_termination(self): + def getJob(): + with testtools.ExpectedException(gear.InterruptedError): + self.worker.getJob() + self.worker.registerFunction('test') + jobthread = threading.Thread(target=getJob) + jobthread.daemon = True + jobthread.start() + self.worker.stopWaitingForJobs() + def load_tests(loader, in_tests, pattern): return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern) |