summaryrefslogtreecommitdiff
path: root/taskflow/tests
diff options
context:
space:
mode:
authorGreg Hill <greg.hill@rackspace.com>2016-01-20 10:57:45 -0600
committerGreg Hill <greg.hill@rackspace.com>2016-05-03 12:59:18 -0500
commitd1db445461ae714738bd4c4d73e043c481e1213d (patch)
treecaf0d15d1e8a2aadf9c3367e9bea8318bd2269ad /taskflow/tests
parent1bc8dd9bcab110d06fb36da756b8acc19febd065 (diff)
downloadtaskflow-d1db445461ae714738bd4c4d73e043c481e1213d.tar.gz
Make conductor.stop stop the running engine gracefully
Previously stopping the conductor would only prevent it from accepting new jobs. This change makes it abort the current job by telling the engine to stop processing tasks after the current ones finish. This allows for conductors to gracefully exit when receiving a kill signal (although the signal handling is not done automatically yet). Change-Id: Ie6ddcbb2df4508ad1e3f6698c6f4cb2fc26a278f
Diffstat (limited to 'taskflow/tests')
-rw-r--r--taskflow/tests/unit/test_conductors.py53
-rw-r--r--taskflow/tests/unit/worker_based/test_worker.py2
-rw-r--r--taskflow/tests/utils.py6
3 files changed, 60 insertions, 1 deletions
diff --git a/taskflow/tests/unit/test_conductors.py b/taskflow/tests/unit/test_conductors.py
index 6177f26..569e593 100644
--- a/taskflow/tests/unit/test_conductors.py
+++ b/taskflow/tests/unit/test_conductors.py
@@ -53,6 +53,13 @@ def test_factory(blowup):
return f
+def sleep_factory():
+ f = lf.Flow("test")
+ f.add(test_utils.SleepTask('test1'))
+ f.add(test_utils.ProgressingTask('test2'))
+ return f
+
+
def test_store_factory():
f = lf.Flow("test")
f.add(test_utils.TaskMultiArg('task1'))
@@ -366,6 +373,52 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
self.assertIsNotNone(fd)
self.assertEqual(st.SUCCESS, fd.state)
+ def test_stop_aborts_engine(self):
+ components = self.make_components()
+ components.conductor.connect()
+ consumed_event = threading.Event()
+ job_consumed_event = threading.Event()
+ job_abandoned_event = threading.Event()
+ running_start_event = threading.Event()
+
+ def on_running_start(event, details):
+ running_start_event.set()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ def on_job_consumed(event, details):
+ if event == 'job_consumed':
+ job_consumed_event.set()
+
+ def on_job_abandoned(event, details):
+ if event == 'job_abandoned':
+ job_abandoned_event.set()
+
+ components.board.notifier.register(base.REMOVAL, on_consume)
+ components.conductor.notifier.register("job_consumed",
+ on_job_consumed)
+ components.conductor.notifier.register("job_abandoned",
+ on_job_abandoned)
+ components.conductor.notifier.register("running_start",
+ on_running_start)
+ with close_many(components.conductor, components.client):
+ t = threading_utils.daemon_thread(components.conductor.run)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence)
+ engines.save_factory_details(fd, sleep_factory,
+ [], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid,
+ 'store': {'duration': 2}})
+ running_start_event.wait(test_utils.WAIT_TIMEOUT)
+ components.conductor.stop()
+ job_abandoned_event.wait(test_utils.WAIT_TIMEOUT)
+ self.assertTrue(job_abandoned_event.is_set())
+ self.assertFalse(job_consumed_event.is_set())
+ self.assertFalse(consumed_event.is_set())
+
class NonBlockingExecutorTest(test.TestCase):
def test_bad_wait_timeout(self):
diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py
index da26fa3..8716fa2 100644
--- a/taskflow/tests/unit/worker_based/test_worker.py
+++ b/taskflow/tests/unit/worker_based/test_worker.py
@@ -33,7 +33,7 @@ class TestWorker(test.MockTestCase):
self.broker_url = 'test-url'
self.exchange = 'test-exchange'
self.topic = 'test-topic'
- self.endpoint_count = 27
+ self.endpoint_count = 28
# patch classes
self.executor_mock, self.executor_inst_mock = self.patchClass(
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index 23eeeb6..0a71c14 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -17,6 +17,7 @@
import contextlib
import string
import threading
+import time
import redis
import six
@@ -340,6 +341,11 @@ class TaskRevertExtraArgs(task.Task):
pass
+class SleepTask(task.Task):
+ def execute(self, duration, **kwargs):
+ time.sleep(duration)
+
+
class EngineTestBase(object):
def setUp(self):
super(EngineTestBase, self).setUp()