diff options
| author | Greg Hill <greg.hill@rackspace.com> | 2015-10-28 09:07:58 -0500 |
|---|---|---|
| committer | Greg Hill <greg.hill@rackspace.com> | 2016-01-25 15:04:34 -0600 |
| commit | 5ce07b2de15cbbd417748edd3fac12a166152aea (patch) | |
| tree | 3db2c372c10f5a5b7eaef32d804d428642af7105 /taskflow/tests/unit | |
| parent | 31764bfb9646e8ede4d0c6ef75888e9f33cb03e4 (diff) | |
| download | taskflow-5ce07b2de15cbbd417748edd3fac12a166152aea.tar.gz | |
Retrieve the store from flowdetails as well, if it exists
Gives users a more permanent way to provide an initial set of
arguments to a flow.
Change-Id: Ib9c3d60882548120d467a645bbac9be78408bac3
Implements: blueprint flow-details-keep-store
Diffstat (limited to 'taskflow/tests/unit')
| -rw-r--r-- | taskflow/tests/unit/test_conductors.py | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/taskflow/tests/unit/test_conductors.py b/taskflow/tests/unit/test_conductors.py index 9fa46f9..6177f26 100644 --- a/taskflow/tests/unit/test_conductors.py +++ b/taskflow/tests/unit/test_conductors.py @@ -53,6 +53,12 @@ def test_factory(blowup): return f +def test_store_factory(): + f = lf.Flow("test") + f.add(test_utils.TaskMultiArg('task1')) + return f + + def single_factory(): return futurist.ThreadPoolExecutor(max_workers=1) @@ -229,6 +235,137 @@ class ManyConductorTest(testscenarios.TestWithScenarios, self.assertIsNotNone(fd) self.assertEqual(st.REVERTED, fd.state) + def test_missing_store(self): + components = self.make_components() + components.conductor.connect() + consumed_event = threading.Event() + + def on_consume(state, details): + consumed_event.set() + + components.board.notifier.register(base.REMOVAL, on_consume) + with close_many(components.conductor, components.client): + t = threading_utils.daemon_thread(components.conductor.run) + t.start() + lb, fd = pu.temporary_flow_detail(components.persistence) + engines.save_factory_details(fd, test_store_factory, + [], {}, + backend=components.persistence) + components.board.post('poke', lb, + details={'flow_uuid': fd.uuid}) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + components.conductor.stop() + self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(components.conductor.dispatching) + + persistence = components.persistence + with contextlib.closing(persistence.get_connection()) as conn: + lb = conn.get_logbook(lb.uuid) + fd = lb.find(fd.uuid) + self.assertIsNotNone(fd) + self.assertIsNone(fd.state) + + def test_job_store(self): + components = self.make_components() + components.conductor.connect() + consumed_event = threading.Event() + + def on_consume(state, details): + consumed_event.set() + + store = {'x': True, 'y': False, 'z': None} + + components.board.notifier.register(base.REMOVAL, on_consume) + with close_many(components.conductor, components.client): + t = threading_utils.daemon_thread(components.conductor.run) + t.start() + lb, fd = pu.temporary_flow_detail(components.persistence) + engines.save_factory_details(fd, test_store_factory, + [], {}, + backend=components.persistence) + components.board.post('poke', lb, + details={'flow_uuid': fd.uuid, + 'store': store}) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + components.conductor.stop() + self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(components.conductor.dispatching) + + persistence = components.persistence + with contextlib.closing(persistence.get_connection()) as conn: + lb = conn.get_logbook(lb.uuid) + fd = lb.find(fd.uuid) + self.assertIsNotNone(fd) + self.assertEqual(st.SUCCESS, fd.state) + + def test_flowdetails_store(self): + components = self.make_components() + components.conductor.connect() + consumed_event = threading.Event() + + def on_consume(state, details): + consumed_event.set() + + store = {'x': True, 'y': False, 'z': None} + + components.board.notifier.register(base.REMOVAL, on_consume) + with close_many(components.conductor, components.client): + t = threading_utils.daemon_thread(components.conductor.run) + t.start() + lb, fd = pu.temporary_flow_detail(components.persistence, + meta={'store': store}) + engines.save_factory_details(fd, test_store_factory, + [], {}, + backend=components.persistence) + components.board.post('poke', lb, + details={'flow_uuid': fd.uuid}) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + components.conductor.stop() + self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(components.conductor.dispatching) + + persistence = components.persistence + with contextlib.closing(persistence.get_connection()) as conn: + lb = conn.get_logbook(lb.uuid) + fd = lb.find(fd.uuid) + self.assertIsNotNone(fd) + self.assertEqual(st.SUCCESS, fd.state) + + def test_combined_store(self): + components = self.make_components() + components.conductor.connect() + consumed_event = threading.Event() + + def on_consume(state, details): + consumed_event.set() + + flow_store = {'x': True, 'y': False} + job_store = {'z': None} + + components.board.notifier.register(base.REMOVAL, on_consume) + with close_many(components.conductor, components.client): + t = threading_utils.daemon_thread(components.conductor.run) + t.start() + lb, fd = pu.temporary_flow_detail(components.persistence, + meta={'store': flow_store}) + engines.save_factory_details(fd, test_store_factory, + [], {}, + backend=components.persistence) + components.board.post('poke', lb, + details={'flow_uuid': fd.uuid, + 'store': job_store}) + self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) + components.conductor.stop() + self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) + self.assertFalse(components.conductor.dispatching) + + persistence = components.persistence + with contextlib.closing(persistence.get_connection()) as conn: + lb = conn.get_logbook(lb.uuid) + fd = lb.find(fd.uuid) + self.assertIsNotNone(fd) + self.assertEqual(st.SUCCESS, fd.state) + class NonBlockingExecutorTest(test.TestCase): def test_bad_wait_timeout(self): |
