summaryrefslogtreecommitdiff
path: root/taskflow/patterns
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2013-07-08 11:19:09 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2013-07-08 11:19:09 -0700
commit61f3944a4f9cd2fb00690b945a4aa7b4736cd2e7 (patch)
tree43b7e7d8493e8158e9eba0cc6004fa96c826a3e7 /taskflow/patterns
parentb89eefea1804ede071747f558bb11571090445b3 (diff)
downloadtaskflow-61f3944a4f9cd2fb00690b945a4aa7b4736cd2e7.tar.gz
Add helper reset internals function.
Use a little helper function to reset the internal state of the flow when needed instead of duplicating code that does this. Change-Id: I51d83538a2920c7d387ffd1756e8d99413f4077e
Diffstat (limited to 'taskflow/patterns')
-rw-r--r--taskflow/patterns/graph_flow.py14
-rw-r--r--taskflow/patterns/linear_flow.py14
2 files changed, 15 insertions, 13 deletions
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 2883c48..824545a 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -49,8 +49,7 @@ class Flow(linear_flow.Flow):
assert isinstance(task, collections.Callable)
r = utils.Runner(task)
self._graph.add_node(r, uuid=r.uuid)
- self._runners = []
- self._leftoff_at = None
+ self._reset_internals()
return r.uuid
def _add_dependency(self, provider, requirer):
@@ -64,6 +63,10 @@ class Flow(linear_flow.Flow):
lines.append("%s" % (self.state))
return "; ".join(lines)
+ def _reset_internals(self):
+ super(Flow, self)._reset_internals()
+ self._runners = []
+
@decorators.locked
def remove(self, uuid):
runner = None
@@ -74,10 +77,8 @@ class Flow(linear_flow.Flow):
if not runner:
raise ValueError("No runner found with uuid %s" % (uuid))
else:
- # Ensure that we reset out internal state after said removal
self._graph.remove_node(runner)
- self._runners = []
- self._leftoff_at = None
+ self._reset_internals()
def _ordering(self):
try:
@@ -94,7 +95,7 @@ class Flow(linear_flow.Flow):
create said dependency."""
if len(self._graph) == 0:
return []
- if self._runners:
+ if self._connected:
return self._runners
# Clear out all edges (since we want to do a fresh connection)
@@ -137,4 +138,5 @@ class Flow(linear_flow.Flow):
r.runs_before = list(reversed(run_stack))
run_stack.append(r)
self._runners = run_order
+ self._connected = True
return run_order
diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py
index 9a9df15..149842f 100644
--- a/taskflow/patterns/linear_flow.py
+++ b/taskflow/patterns/linear_flow.py
@@ -70,11 +70,14 @@ class Flow(base.Flow):
assert isinstance(task, collections.Callable)
r = utils.Runner(task)
r.runs_before = list(reversed(self._runners))
- self._connected = False
- self._leftoff_at = None
self._runners.append(r)
+ self._reset_internals()
return r.uuid
+ def _reset_internals(self):
+ self._connected = False
+ self._leftoff_at = None
+
def _associate_providers(self, runner):
# Ensure that some previous task provides this input.
who_provides = {}
@@ -112,10 +115,8 @@ class Flow(base.Flow):
if index_removed == -1:
raise ValueError("No runner found with uuid %s" % (uuid))
else:
- # Ensure that we reset out internal state after said removal.
removed = self._runners.pop(index_removed)
- self._connected = False
- self._leftoff_at = None
+ self._reset_internals()
# Go and remove it from any runner after the removed runner since
# those runners may have had an attachment to it.
for r in self._runners[index_removed:]:
@@ -257,8 +258,7 @@ class Flow(base.Flow):
self.results = {}
self.resumer = None
self._accumulator.reset()
- self._leftoff_at = None
- self._connected = False
+ self._reset_internals()
@decorators.locked
def rollback(self, context, cause):