summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2016-06-02 07:02:10 +0000
committerGerrit Code Review <review@openstack.org>2016-06-02 07:02:10 +0000
commitf3d1c06c37c23f3e35d1a5e68207b7e3e453307a (patch)
tree28c108b902c701a87c5e31bcb5c6dfc1962fdf22
parent40f6feefe46271c75c7e8faaec6280bd34705599 (diff)
parentf7a2b658c97e900a1b287da090a6786cbb07f3f4 (diff)
downloadgear-f3d1c06c37c23f3e35d1a5e68207b7e3e453307a.tar.gz
Merge "Add a test for worker termination"
-rw-r--r--gear/tests/__init__.py6
-rw-r--r--gear/tests/test_functional.py12
2 files changed, 17 insertions, 1 deletions
diff --git a/gear/tests/__init__.py b/gear/tests/__init__.py
index 834b0f0..6d5edb4 100644
--- a/gear/tests/__init__.py
+++ b/gear/tests/__init__.py
@@ -17,6 +17,7 @@
"""Common utilities used in testing"""
import errno
+import logging
import os
import socket
@@ -49,7 +50,10 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
- self.useFixture(fixtures.FakeLogger())
+ self.useFixture(fixtures.FakeLogger(
+ level=logging.DEBUG,
+ format='%(asctime)s %(name)-32s '
+ '%(levelname)-8s %(message)s'))
self.useFixture(fixtures.NestedTempfile())
diff --git a/gear/tests/test_functional.py b/gear/tests/test_functional.py
index 247649a..26e72e3 100644
--- a/gear/tests/test_functional.py
+++ b/gear/tests/test_functional.py
@@ -14,11 +14,13 @@
# limitations under the License.
import os
+import threading
import time
from OpenSSL import crypto
import fixtures
import testscenarios
+import testtools
import gear
from gear import tests
@@ -125,6 +127,16 @@ class TestFunctional(tests.BaseTestCase):
self.assertTrue(job.complete)
self.assertEqual(job.data, [b'workdata'])
+ def test_worker_termination(self):
+ def getJob():
+ with testtools.ExpectedException(gear.InterruptedError):
+ self.worker.getJob()
+ self.worker.registerFunction('test')
+ jobthread = threading.Thread(target=getJob)
+ jobthread.daemon = True
+ jobthread.start()
+ self.worker.stopWaitingForJobs()
+
def load_tests(loader, in_tests, pattern):
return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern)