summaryrefslogtreecommitdiff
path: root/taskflow/tests/unit
diff options
context:
space:
mode:
authorGreg Hill <greg.hill@rackspace.com>2015-10-28 09:07:58 -0500
committerGreg Hill <greg.hill@rackspace.com>2016-01-25 15:04:34 -0600
commit5ce07b2de15cbbd417748edd3fac12a166152aea (patch)
tree3db2c372c10f5a5b7eaef32d804d428642af7105 /taskflow/tests/unit
parent31764bfb9646e8ede4d0c6ef75888e9f33cb03e4 (diff)
downloadtaskflow-5ce07b2de15cbbd417748edd3fac12a166152aea.tar.gz
Retrieve the store from flowdetails as well, if it exists
Gives users a more permanent way to provide an initial set of arguments to a flow. Change-Id: Ib9c3d60882548120d467a645bbac9be78408bac3 Implements: blueprint flow-details-keep-store
Diffstat (limited to 'taskflow/tests/unit')
-rw-r--r--taskflow/tests/unit/test_conductors.py137
1 files changed, 137 insertions, 0 deletions
diff --git a/taskflow/tests/unit/test_conductors.py b/taskflow/tests/unit/test_conductors.py
index 9fa46f9..6177f26 100644
--- a/taskflow/tests/unit/test_conductors.py
+++ b/taskflow/tests/unit/test_conductors.py
@@ -53,6 +53,12 @@ def test_factory(blowup):
return f
+def test_store_factory():
+ f = lf.Flow("test")
+ f.add(test_utils.TaskMultiArg('task1'))
+ return f
+
+
def single_factory():
return futurist.ThreadPoolExecutor(max_workers=1)
@@ -229,6 +235,137 @@ class ManyConductorTest(testscenarios.TestWithScenarios,
self.assertIsNotNone(fd)
self.assertEqual(st.REVERTED, fd.state)
+ def test_missing_store(self):
+ components = self.make_components()
+ components.conductor.connect()
+ consumed_event = threading.Event()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ components.board.notifier.register(base.REMOVAL, on_consume)
+ with close_many(components.conductor, components.client):
+ t = threading_utils.daemon_thread(components.conductor.run)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence)
+ engines.save_factory_details(fd, test_store_factory,
+ [], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid})
+ self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
+ self.assertFalse(components.conductor.dispatching)
+
+ persistence = components.persistence
+ with contextlib.closing(persistence.get_connection()) as conn:
+ lb = conn.get_logbook(lb.uuid)
+ fd = lb.find(fd.uuid)
+ self.assertIsNotNone(fd)
+ self.assertIsNone(fd.state)
+
+ def test_job_store(self):
+ components = self.make_components()
+ components.conductor.connect()
+ consumed_event = threading.Event()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ store = {'x': True, 'y': False, 'z': None}
+
+ components.board.notifier.register(base.REMOVAL, on_consume)
+ with close_many(components.conductor, components.client):
+ t = threading_utils.daemon_thread(components.conductor.run)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence)
+ engines.save_factory_details(fd, test_store_factory,
+ [], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid,
+ 'store': store})
+ self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
+ self.assertFalse(components.conductor.dispatching)
+
+ persistence = components.persistence
+ with contextlib.closing(persistence.get_connection()) as conn:
+ lb = conn.get_logbook(lb.uuid)
+ fd = lb.find(fd.uuid)
+ self.assertIsNotNone(fd)
+ self.assertEqual(st.SUCCESS, fd.state)
+
+ def test_flowdetails_store(self):
+ components = self.make_components()
+ components.conductor.connect()
+ consumed_event = threading.Event()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ store = {'x': True, 'y': False, 'z': None}
+
+ components.board.notifier.register(base.REMOVAL, on_consume)
+ with close_many(components.conductor, components.client):
+ t = threading_utils.daemon_thread(components.conductor.run)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence,
+ meta={'store': store})
+ engines.save_factory_details(fd, test_store_factory,
+ [], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid})
+ self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
+ self.assertFalse(components.conductor.dispatching)
+
+ persistence = components.persistence
+ with contextlib.closing(persistence.get_connection()) as conn:
+ lb = conn.get_logbook(lb.uuid)
+ fd = lb.find(fd.uuid)
+ self.assertIsNotNone(fd)
+ self.assertEqual(st.SUCCESS, fd.state)
+
+ def test_combined_store(self):
+ components = self.make_components()
+ components.conductor.connect()
+ consumed_event = threading.Event()
+
+ def on_consume(state, details):
+ consumed_event.set()
+
+ flow_store = {'x': True, 'y': False}
+ job_store = {'z': None}
+
+ components.board.notifier.register(base.REMOVAL, on_consume)
+ with close_many(components.conductor, components.client):
+ t = threading_utils.daemon_thread(components.conductor.run)
+ t.start()
+ lb, fd = pu.temporary_flow_detail(components.persistence,
+ meta={'store': flow_store})
+ engines.save_factory_details(fd, test_store_factory,
+ [], {},
+ backend=components.persistence)
+ components.board.post('poke', lb,
+ details={'flow_uuid': fd.uuid,
+ 'store': job_store})
+ self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
+ self.assertFalse(components.conductor.dispatching)
+
+ persistence = components.persistence
+ with contextlib.closing(persistence.get_connection()) as conn:
+ lb = conn.get_logbook(lb.uuid)
+ fd = lb.find(fd.uuid)
+ self.assertIsNotNone(fd)
+ self.assertEqual(st.SUCCESS, fd.state)
+
class NonBlockingExecutorTest(test.TestCase):
def test_bad_wait_timeout(self):