summaryrefslogtreecommitdiff
path: root/taskflow/patterns
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2013-06-06 12:40:00 -0700
committerJoshua Harlow <harlowja@gmail.com>2013-07-06 12:14:00 -0700
commitd746a93171ccfb5507f8a71b3e71814fc89d2a7f (patch)
tree2ffd2be4c60a1912a32a726de02f781584c54df2 /taskflow/patterns
parent0cfb18e00b983e5e380c7d91f9fbe598737ac1ab (diff)
downloadtaskflow-d746a93171ccfb5507f8a71b3e71814fc89d2a7f.tar.gz
Make connection/validation of tasks be after they are added.
Instead of having validation be immediate when tasks are added to the linear flow, follow the same paradigm as the graph_flow where only upon connection (or determination of the order) will tasks be connected/validated. Change-Id: Ia8275ec88b0229f0793819249ae59fad0a2e9935
Diffstat (limited to 'taskflow/patterns')
-rw-r--r--taskflow/patterns/graph_flow.py13
-rw-r--r--taskflow/patterns/linear_flow.py28
2 files changed, 39 insertions, 2 deletions
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 584bd0e..83a7433 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -64,6 +64,19 @@ class Flow(linear_flow.Flow):
lines.append(" State: %s" % (self.state))
return "\n".join(lines)
+ @decorators.locked
+ def remove(self, task_uuid):
+ remove_nodes = []
+ for r in self._graph.nodes_iter():
+ if r.uuid == task_uuid:
+ remove_nodes.append(r)
+ if not remove_nodes:
+ raise IndexError("No task found with uuid %s" % (task_uuid))
+ else:
+ for r in remove_nodes:
+ self._graph.remove_node(r)
+ self._runners = []
+
def _ordering(self):
try:
return self._connect()
diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py
index 2d711c1..8122845 100644
--- a/taskflow/patterns/linear_flow.py
+++ b/taskflow/patterns/linear_flow.py
@@ -62,6 +62,7 @@ class Flow(base.Flow):
self._left_off_at = 0
# All runners to run are collected here.
self._runners = []
+ self._connected = False
@decorators.locked
def add_many(self, tasks):
@@ -76,7 +77,7 @@ class Flow(base.Flow):
assert isinstance(task, collections.Callable)
r = utils.Runner(task)
r.runs_before = list(reversed(self._runners))
- self._associate_providers(r)
+ self._connected = False
self._runners.append(r)
return r.uuid
@@ -108,9 +109,31 @@ class Flow(base.Flow):
lines.append(" State: %s" % (self.state))
return "\n".join(lines)
- def _ordering(self):
+ @decorators.locked
+ def remove(self, task_uuid):
+ removed = False
+ for (i, r) in enumerate(self._runners):
+ if r.uuid == task_uuid:
+ self._runners.pop(i)
+ self._connected = False
+ removed = True
+ break
+ if not removed:
+ raise IndexError("No task found with uuid %s" % (task_uuid))
+
+ def _connect(self):
+ if self._connected:
+ return self._runners
+ for r in self._runners:
+ r.providers = {}
+ for r in reversed(self._runners):
+ self._associate_providers(r)
+ self._connected = True
return self._runners
+ def _ordering(self):
+ return self._connect()
+
@decorators.locked
def run(self, context, *args, **kwargs):
super(Flow, self).run(context, *args, **kwargs)
@@ -241,6 +264,7 @@ class Flow(base.Flow):
self.result_fetcher = None
self._accumulator.reset()
self._left_off_at = 0
+ self._connected = False
@decorators.locked
def rollback(self, context, cause):