summaryrefslogtreecommitdiff
path: root/taskflow/flow.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2013-06-25 18:46:55 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2013-08-26 12:45:21 -0700
commitfceabee18fd85397f7fa81557c0cadc532a87c4c (patch)
tree4a53888ff07777c90c48f50259ff0d96de7bc480 /taskflow/flow.py
parent392925d0130f18c18fd43c4449f7ee1a589bf802 (diff)
downloadtaskflow-fceabee18fd85397f7fa81557c0cadc532a87c4c.tar.gz
Add a locally running threaded flow
Propose a new graph flow that will run every task in the graph at the same time but will use a count down latch concept to ensure that a tasks dependents are provided before the task itself can run. This allows for tasks to run in parallel (if they have no dependents or are placed in disjoint parts of graph). Note: although this flow uses threads it is typically expected that the underlying threads would be greenthreads since python native threading is still suboptimal (for various reasons). Implements: blueprint locally-run-many-at-once Change-Id: If434abd77758aa12fc99da395a2559995305a853
Diffstat (limited to 'taskflow/flow.py')
-rw-r--r--taskflow/flow.py99
1 files changed, 51 insertions, 48 deletions
diff --git a/taskflow/flow.py b/taskflow/flow.py
index 4319913..8d03e48 100644
--- a/taskflow/flow.py
+++ b/taskflow/flow.py
@@ -21,7 +21,6 @@ import threading
from taskflow.openstack.common import uuidutils
-from taskflow import decorators
from taskflow import exceptions as exc
from taskflow import states
from taskflow import utils
@@ -86,14 +85,14 @@ class Flow(object):
# storage backend).
self.notifier = utils.TransitionNotifier()
self.task_notifier = utils.TransitionNotifier()
- # Ensure that modifications and/or multiple runs aren't happening
- # at the same time in the same flow at the same time.
- self._lock = threading.RLock()
# Assign this flow a unique identifer.
if uuid:
self._id = str(uuid)
else:
self._id = uuidutils.generate_uuid()
+ # Ensure we can not change the state at the same time in 2 different
+ # threads.
+ self._state_lock = threading.RLock()
@property
def name(self):
@@ -109,21 +108,26 @@ class Flow(object):
"""Provides a read-only view of the flow state."""
return self._state
- def _change_state(self, context, new_state):
- was_changed = False
- old_state = self.state
- with self._lock:
+ def _change_state(self, context, new_state, check_func=None, notify=True):
+ old_state = None
+ changed = False
+ with self._state_lock:
if self.state != new_state:
- old_state = self.state
- self._state = new_state
- was_changed = True
- if was_changed:
- # Don't notify while holding the lock.
+ if (not check_func or
+ (check_func and check_func(self.state))):
+ changed = True
+ old_state = self.state
+ self._state = new_state
+ # Don't notify while holding the lock so that the reciever of said
+ # notifications can actually perform operations on the given flow
+ # without getting into deadlock.
+ if notify and changed:
self.notifier.notify(self.state, details={
'context': context,
'flow': self,
'old_state': old_state,
})
+ return changed
def __str__(self):
lines = ["Flow: %s" % (self.name)]
@@ -141,7 +145,6 @@ class Flow(object):
"""
raise NotImplementedError()
- @decorators.locked
def add_many(self, tasks):
"""Adds many tasks to this flow.
@@ -158,54 +161,54 @@ class Flow(object):
Returns how many tasks were interrupted (if any).
"""
- if self.state in self.UNINTERRUPTIBLE_STATES:
- raise exc.InvalidStateException(("Can not interrupt when"
- " in state %s") % (self.state))
- # Note(harlowja): Do *not* acquire the lock here so that the flow may
- # be interrupted while running. This does mean the the above check may
- # not be valid but we can worry about that if it becomes an issue.
- old_state = self.state
- if old_state != states.INTERRUPTED:
- self._state = states.INTERRUPTED
- self.notifier.notify(self.state, details={
- 'context': None,
- 'flow': self,
- 'old_state': old_state,
- })
- return 0
+ def check():
+ if self.state in self.UNINTERRUPTIBLE_STATES:
+ raise exc.InvalidStateException(("Can not interrupt when"
+ " in state %s") % self.state)
+
+ check()
+ with self._state_lock:
+ check()
+ self._change_state(None, states.INTERRUPTED)
+ return 0
- @decorators.locked
def reset(self):
"""Fully resets the internal state of this flow, allowing for the flow
to be ran again.
Note: Listeners are also reset.
"""
- if self.state not in self.RESETTABLE_STATES:
- raise exc.InvalidStateException(("Can not reset when"
- " in state %s") % (self.state))
- self.notifier.reset()
- self.task_notifier.reset()
- self._change_state(None, states.PENDING)
-
- @decorators.locked
+ def check():
+ if self.state not in self.RESETTABLE_STATES:
+ raise exc.InvalidStateException(("Can not reset when"
+ " in state %s") % self.state)
+
+ check()
+ with self._state_lock:
+ check()
+ self.notifier.reset()
+ self.task_notifier.reset()
+ self._change_state(None, states.PENDING)
+
def soft_reset(self):
"""Partially resets the internal state of this flow, allowing for the
- flow to be ran again from an interrupted state only.
+ flow to be ran again from an interrupted state.
"""
- if self.state not in self.SOFT_RESETTABLE_STATES:
- raise exc.InvalidStateException(("Can not soft reset when"
- " in state %s") % (self.state))
- self._change_state(None, states.PENDING)
+ def check():
+ if self.state not in self.SOFT_RESETTABLE_STATES:
+ raise exc.InvalidStateException(("Can not soft reset when"
+ " in state %s") % self.state)
+
+ check()
+ with self._state_lock:
+ check()
+ self._change_state(None, states.PENDING)
- @decorators.locked
+ @abc.abstractmethod
def run(self, context, *args, **kwargs):
"""Executes the workflow."""
- if self.state not in self.RUNNABLE_STATES:
- raise exc.InvalidStateException("Unable to run flow when "
- "in state %s" % (self.state))
+ raise NotImplementedError()
- @decorators.locked
def rollback(self, context, cause):
"""Performs rollback of this workflow and any attached parent workflows
if present.