diff options
| author | Min Pae <sputnik13@gmail.com> | 2015-11-04 09:27:25 -0800 |
|---|---|---|
| committer | Min Pae <sputnik13@gmail.com> | 2015-11-16 12:20:28 -0800 |
| commit | 6918b8fab0d303bb7596df657f24897bbc67a1fd (patch) | |
| tree | dfe1fc4432146911ab88bcd41ffa19679773ea9e /taskflow/tests/unit/test_conductors.py | |
| parent | ae9c701f9073941fbe063d2b7854ff6eed5b5fc0 (diff) | |
| download | taskflow-6918b8fab0d303bb7596df657f24897bbc67a1fd.tar.gz | |
Adding notification points for job completion
Adding notifications for job completion, both consumed and abandoned, so that a
listener can take some action based on job completion.
Change-Id: I826285d4bfccd2406df7b59e53a9b724702ed094
Diffstat (limited to 'taskflow/tests/unit/test_conductors.py')
| -rw-r--r-- | taskflow/tests/unit/test_conductors.py | 32 |
1 files changed, 32 insertions, 0 deletions
diff --git a/taskflow/tests/unit/test_conductors.py b/taskflow/tests/unit/test_conductors.py index d7f84d5..9fa46f9 100644 --- a/taskflow/tests/unit/test_conductors.py +++ b/taskflow/tests/unit/test_conductors.py @@ -113,11 +113,25 @@ class ManyConductorTest(testscenarios.TestWithScenarios, components = self.make_components() components.conductor.connect() consumed_event = threading.Event() + job_consumed_event = threading.Event() + job_abandoned_event = threading.Event() def on_consume(state, details): consumed_event.set() + def on_job_consumed(event, details): + if event == 'job_consumed': + job_consumed_event.set() + + def on_job_abandoned(event, details): + if event == 'job_abandoned': + job_abandoned_event.set() + components.board.notifier.register(base.REMOVAL, on_consume) + components.conductor.notifier.register("job_consumed", + on_job_consumed) + components.conductor.notifier.register("job_abandoned", + on_job_abandoned) with close_many(components.conductor, components.client): t = threading_utils.daemon_thread(components.conductor.run) t.start() @@ -128,6 +142,8 @@ class ManyConductorTest(testscenarios.TestWithScenarios, components.board.post('poke', lb, details={'flow_uuid': fd.uuid}) self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertTrue(job_consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(job_abandoned_event.wait(1)) components.conductor.stop() self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) @@ -171,11 +187,25 @@ class ManyConductorTest(testscenarios.TestWithScenarios, components = self.make_components() components.conductor.connect() consumed_event = threading.Event() + job_consumed_event = threading.Event() + job_abandoned_event = threading.Event() def on_consume(state, details): consumed_event.set() + def on_job_consumed(event, details): + if event == 'job_consumed': + job_consumed_event.set() + + def on_job_abandoned(event, details): + if event == 'job_abandoned': + job_abandoned_event.set() + components.board.notifier.register(base.REMOVAL, on_consume) + components.conductor.notifier.register("job_consumed", + on_job_consumed) + components.conductor.notifier.register("job_abandoned", + on_job_abandoned) with close_many(components.conductor, components.client): t = threading_utils.daemon_thread(components.conductor.run) t.start() @@ -186,6 +216,8 @@ class ManyConductorTest(testscenarios.TestWithScenarios, components.board.post('poke', lb, details={'flow_uuid': fd.uuid}) self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertTrue(job_consumed_event.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(job_abandoned_event.wait(1)) components.conductor.stop() self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) |
