summaryrefslogtreecommitdiff
path: root/taskflow/tests/unit/test_conductors.py
diff options
context:
space:
mode:
authorMin Pae <sputnik13@gmail.com>2015-11-04 09:27:25 -0800
committerMin Pae <sputnik13@gmail.com>2015-11-16 12:20:28 -0800
commit6918b8fab0d303bb7596df657f24897bbc67a1fd (patch)
treedfe1fc4432146911ab88bcd41ffa19679773ea9e /taskflow/tests/unit/test_conductors.py
parentae9c701f9073941fbe063d2b7854ff6eed5b5fc0 (diff)
downloadtaskflow-6918b8fab0d303bb7596df657f24897bbc67a1fd.tar.gz
Adding notification points for job completion
Adding notifications for job completion, both consumed and abandoned, so that a listener can take some action based on job completion. Change-Id: I826285d4bfccd2406df7b59e53a9b724702ed094
Diffstat (limited to 'taskflow/tests/unit/test_conductors.py')
-rw-r--r--taskflow/tests/unit/test_conductors.py32
1 files changed, 32 insertions, 0 deletions
diff --git a/taskflow/tests/unit/test_conductors.py b/taskflow/tests/unit/test_conductors.py
index d7f84d5..9fa46f9 100644
--- a/taskflow/tests/unit/test_conductors.py
+++ b/taskflow/tests/unit/test_conductors.py
@@ -113,11 +113,25 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
+ job_consumed_event = threading.Event()
+ job_abandoned_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
+ def on_job_consumed(event, details):
+ if event == 'job_consumed':
+ job_consumed_event.set()
+
+ def on_job_abandoned(event, details):
+ if event == 'job_abandoned':
+ job_abandoned_event.set()
+
components.board.notifier.register(base.REMOVAL, on_consume)
+ components.conductor.notifier.register("job_consumed",
+ on_job_consumed)
+ components.conductor.notifier.register("job_abandoned",
+ on_job_abandoned)
with close_many(components.conductor, components.client):
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
@@ -128,6 +142,8 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ self.assertTrue(job_consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ self.assertFalse(job_abandoned_event.wait(1))
components.conductor.stop()
self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
@@ -171,11 +187,25 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
components = self.make_components()
components.conductor.connect()
consumed_event = threading.Event()
+ job_consumed_event = threading.Event()
+ job_abandoned_event = threading.Event()
def on_consume(state, details):
consumed_event.set()
+ def on_job_consumed(event, details):
+ if event == 'job_consumed':
+ job_consumed_event.set()
+
+ def on_job_abandoned(event, details):
+ if event == 'job_abandoned':
+ job_abandoned_event.set()
+
components.board.notifier.register(base.REMOVAL, on_consume)
+ components.conductor.notifier.register("job_consumed",
+ on_job_consumed)
+ components.conductor.notifier.register("job_abandoned",
+ on_job_abandoned)
with close_many(components.conductor, components.client):
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
@@ -186,6 +216,8 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ self.assertTrue(job_consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ self.assertFalse(job_abandoned_event.wait(1))
components.conductor.stop()
self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)