diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2013-07-08 11:19:09 -0700 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2013-07-08 11:19:09 -0700 |
| commit | 61f3944a4f9cd2fb00690b945a4aa7b4736cd2e7 (patch) | |
| tree | 43b7e7d8493e8158e9eba0cc6004fa96c826a3e7 /taskflow/patterns | |
| parent | b89eefea1804ede071747f558bb11571090445b3 (diff) | |
| download | taskflow-61f3944a4f9cd2fb00690b945a4aa7b4736cd2e7.tar.gz | |
Add helper reset internals function.
Use a little helper function to reset
the internal state of the flow when needed
instead of duplicating code that does this.
Change-Id: I51d83538a2920c7d387ffd1756e8d99413f4077e
Diffstat (limited to 'taskflow/patterns')
| -rw-r--r-- | taskflow/patterns/graph_flow.py | 14 | ||||
| -rw-r--r-- | taskflow/patterns/linear_flow.py | 14 |
2 files changed, 15 insertions, 13 deletions
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 2883c48..824545a 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -49,8 +49,7 @@ class Flow(linear_flow.Flow): assert isinstance(task, collections.Callable) r = utils.Runner(task) self._graph.add_node(r, uuid=r.uuid) - self._runners = [] - self._leftoff_at = None + self._reset_internals() return r.uuid def _add_dependency(self, provider, requirer): @@ -64,6 +63,10 @@ class Flow(linear_flow.Flow): lines.append("%s" % (self.state)) return "; ".join(lines) + def _reset_internals(self): + super(Flow, self)._reset_internals() + self._runners = [] + @decorators.locked def remove(self, uuid): runner = None @@ -74,10 +77,8 @@ class Flow(linear_flow.Flow): if not runner: raise ValueError("No runner found with uuid %s" % (uuid)) else: - # Ensure that we reset out internal state after said removal self._graph.remove_node(runner) - self._runners = [] - self._leftoff_at = None + self._reset_internals() def _ordering(self): try: @@ -94,7 +95,7 @@ class Flow(linear_flow.Flow): create said dependency.""" if len(self._graph) == 0: return [] - if self._runners: + if self._connected: return self._runners # Clear out all edges (since we want to do a fresh connection) @@ -137,4 +138,5 @@ class Flow(linear_flow.Flow): r.runs_before = list(reversed(run_stack)) run_stack.append(r) self._runners = run_order + self._connected = True return run_order diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 9a9df15..149842f 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -70,11 +70,14 @@ class Flow(base.Flow): assert isinstance(task, collections.Callable) r = utils.Runner(task) r.runs_before = list(reversed(self._runners)) - self._connected = False - self._leftoff_at = None self._runners.append(r) + self._reset_internals() return r.uuid + def _reset_internals(self): + self._connected = False + self._leftoff_at = None + def _associate_providers(self, runner): # Ensure that some previous task provides this input. who_provides = {} @@ -112,10 +115,8 @@ class Flow(base.Flow): if index_removed == -1: raise ValueError("No runner found with uuid %s" % (uuid)) else: - # Ensure that we reset out internal state after said removal. removed = self._runners.pop(index_removed) - self._connected = False - self._leftoff_at = None + self._reset_internals() # Go and remove it from any runner after the removed runner since # those runners may have had an attachment to it. for r in self._runners[index_removed:]: @@ -257,8 +258,7 @@ class Flow(base.Flow): self.results = {} self.resumer = None self._accumulator.reset() - self._leftoff_at = None - self._connected = False + self._reset_internals() @decorators.locked def rollback(self, context, cause): |
