summaryrefslogtreecommitdiff
path: root/taskflow/patterns
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-08-02 21:27:54 +0000
committerGerrit Code Review <review@openstack.org>2013-08-02 21:27:54 +0000
commit3674983e6a2f0ef84c0e93c64c60cd4c5c4db3f6 (patch)
tree47c23c293ef3516abd54df34784bbe3819dd7b98 /taskflow/patterns
parent9be6c5d18a91a13cf2ee96242ef03b40740cda66 (diff)
parent324ea453aee7ad6432e60360441fdc018d143d9c (diff)
downloadtaskflow-3674983e6a2f0ef84c0e93c64c60cd4c5c4db3f6.tar.gz
Merge "Adjust a bunch of hacking violations."
Diffstat (limited to 'taskflow/patterns')
-rw-r--r--taskflow/patterns/base.py37
-rw-r--r--taskflow/patterns/distributed_flow.py10
-rw-r--r--taskflow/patterns/graph_flow.py9
-rw-r--r--taskflow/patterns/linear_flow.py9
-rw-r--r--taskflow/patterns/resumption/logbook.py9
5 files changed, 57 insertions, 17 deletions
diff --git a/taskflow/patterns/base.py b/taskflow/patterns/base.py
index 14d6aa5..019e84f 100644
--- a/taskflow/patterns/base.py
+++ b/taskflow/patterns/base.py
@@ -28,9 +28,28 @@ from taskflow import utils
class Flow(object):
- """The base abstract class of all flow implementations."""
+ """The base abstract class of all flow implementations.
+
+ It provides a set of parents to flows that have a concept of parent flows
+ as well as a state and state utility functions to the deriving classes. It
+ also provides a name and an identifier (uuid or other) to the flow so that
+ it can be uniquely identified among many flows.
+
+ Flows are expected to provide (if desired) the following methods:
+ - add
+ - add_many
+ - interrupt
+ - reset
+ - rollback
+ - run
+ - soft_reset
+ """
+
__metaclass__ = abc.ABCMeta
+ # Common states that certain actions can be performed in. If the flow
+ # is not in these sets of states then it is likely that the flow operation
+ # can not succeed.
RESETTABLE_STATES = set([
states.INTERRUPTED,
states.SUCCESS,
@@ -78,10 +97,12 @@ class Flow(object):
@property
def name(self):
+ """A non-unique name for this flow (human readable)"""
return self._name
@property
def uuid(self):
+ """Uniquely identifies this flow"""
return "f-%s" % (self._id)
@property
@@ -117,7 +138,8 @@ class Flow(object):
"""Adds a given task to this flow.
Returns the uuid that is associated with the task for later operations
- before and after it is ran."""
+ before and after it is run.
+ """
raise NotImplementedError()
@decorators.locked
@@ -156,7 +178,10 @@ class Flow(object):
@decorators.locked
def reset(self):
"""Fully resets the internal state of this flow, allowing for the flow
- to be ran again. *Listeners are also reset*"""
+ to be run again.
+
+ Note: Listeners are also reset.
+ """
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not reset when"
" in state %s") % (self.state))
@@ -167,7 +192,8 @@ class Flow(object):
@decorators.locked
def soft_reset(self):
"""Partially resets the internal state of this flow, allowing for the
- flow to be ran again from an interrupted state *only*"""
+ flow to be run again from an interrupted state only.
+ """
if self.state not in self.SOFT_RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not soft reset when"
" in state %s") % (self.state))
@@ -183,5 +209,6 @@ class Flow(object):
@decorators.locked
def rollback(self, context, cause):
"""Performs rollback of this workflow and any attached parent workflows
- if present."""
+ if present.
+ """
pass
diff --git a/taskflow/patterns/distributed_flow.py b/taskflow/patterns/distributed_flow.py
index 90b3f25..5b4268b 100644
--- a/taskflow/patterns/distributed_flow.py
+++ b/taskflow/patterns/distributed_flow.py
@@ -27,8 +27,14 @@ LOG = logging.getLogger(__name__)
class Flow(object):
- """A linear chain of independent tasks that can be applied as one unit or
- rolled back as one unit."""
+ """A flow that can paralleize task running by using celery.
+
+ This flow backs running tasks (and associated dependencies) by using celery
+ as the runtime framework to accomplish execution (and status reporting) of
+ said tasks that compose the flow. It allows for parallel execution where
+ possible (data/task dependency dependent) without having to worry about how
+ this is accomplished in celery.
+ """
def __init__(self, name, parents=None):
self.name = name
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index 7e051db..320c347 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -35,7 +35,8 @@ LOG = logging.getLogger(__name__)
class Flow(linear_flow.Flow):
"""A extension of the linear flow which will run the associated tasks in
a linear topological ordering (and reverse using the same linear
- topological order)"""
+ topological order).
+ """
def __init__(self, name, parents=None, uuid=None):
super(Flow, self).__init__(name, parents, uuid)
@@ -67,7 +68,8 @@ class Flow(linear_flow.Flow):
@decorators.locked
def add_dependency(self, provider_uuid, requirer_uuid):
"""Connects provider to requirer where provider will now be required
- to run before requirer does."""
+ to run before requirer does.
+ """
if provider_uuid == requirer_uuid:
raise ValueError("Unable to link %s to itself" % provider_uuid)
provider = self._find_uuid(provider_uuid)
@@ -116,7 +118,8 @@ class Flow(linear_flow.Flow):
def _connect(self):
"""Connects the nodes & edges of the graph together by examining who
the requirements of each node and finding another node that will
- create said dependency."""
+ create said dependency.
+ """
if len(self._graph) == 0:
return []
if self._connected:
diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py
index 96ef74e..c4d6084 100644
--- a/taskflow/patterns/linear_flow.py
+++ b/taskflow/patterns/linear_flow.py
@@ -34,11 +34,12 @@ LOG = logging.getLogger(__name__)
class Flow(base.Flow):
""""A linear chain of tasks that can be applied in order as one unit and
- rolled back as one unit using the reverse order that the tasks have
- been applied in.
+ rolled back as one unit using the reverse order that the tasks have
+ been applied in.
- Note(harlowja): Each task in the chain must have requirements
- which are satisfied by the previous task/s in the chain."""
+ Note(harlowja): Each task in the chain must have requirements
+ which are satisfied by the previous task/s in the chain.
+ """
def __init__(self, name, parents=None, uuid=None):
super(Flow, self).__init__(name, parents, uuid)
diff --git a/taskflow/patterns/resumption/logbook.py b/taskflow/patterns/resumption/logbook.py
index a3f7859..687e166 100644
--- a/taskflow/patterns/resumption/logbook.py
+++ b/taskflow/patterns/resumption/logbook.py
@@ -35,7 +35,8 @@ class Resumption(object):
def _task_listener(state, details):
"""Store the result of the task under the given flow in the log
- book so that it can be retrieved later."""
+ book so that it can be retrieved later.
+ """
runner = details['runner']
flow = details['flow']
LOG.debug("Recording %s of %s has finished state %s",
@@ -67,7 +68,8 @@ class Resumption(object):
def _workflow_listener(state, details):
"""Ensure that when we receive an event from said workflow that we
- make sure a logbook entry exists for that flow."""
+ make sure a logbook entry exists for that flow.
+ """
flow = details['flow']
old_state = details['old_state']
LOG.debug("%s has transitioned from %s to %s", flow, old_state,
@@ -112,7 +114,8 @@ class Resumption(object):
def resume(self, flow, ordering):
"""Splits the initial ordering into two segments, the first which
has already completed (or errored) and the second which has not
- completed or errored."""
+ completed or errored.
+ """
flow_id = flow.uuid
if flow_id not in self._logbook: