diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2013-06-25 18:46:55 -0700 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2013-08-26 12:45:21 -0700 |
| commit | fceabee18fd85397f7fa81557c0cadc532a87c4c (patch) | |
| tree | 4a53888ff07777c90c48f50259ff0d96de7bc480 /taskflow/flow.py | |
| parent | 392925d0130f18c18fd43c4449f7ee1a589bf802 (diff) | |
| download | taskflow-fceabee18fd85397f7fa81557c0cadc532a87c4c.tar.gz | |
Add a locally running threaded flow
Propose a new graph flow that will run every task
in the graph at the same time but will use a count
down latch concept to ensure that a tasks dependents
are provided before the task itself can run. This
allows for tasks to run in parallel (if they have
no dependents or are placed in disjoint parts of
graph).
Note: although this flow uses threads it is typically
expected that the underlying threads would be greenthreads
since python native threading is still suboptimal (for
various reasons).
Implements: blueprint locally-run-many-at-once
Change-Id: If434abd77758aa12fc99da395a2559995305a853
Diffstat (limited to 'taskflow/flow.py')
| -rw-r--r-- | taskflow/flow.py | 99 |
1 files changed, 51 insertions, 48 deletions
diff --git a/taskflow/flow.py b/taskflow/flow.py index 4319913..8d03e48 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -21,7 +21,6 @@ import threading from taskflow.openstack.common import uuidutils -from taskflow import decorators from taskflow import exceptions as exc from taskflow import states from taskflow import utils @@ -86,14 +85,14 @@ class Flow(object): # storage backend). self.notifier = utils.TransitionNotifier() self.task_notifier = utils.TransitionNotifier() - # Ensure that modifications and/or multiple runs aren't happening - # at the same time in the same flow at the same time. - self._lock = threading.RLock() # Assign this flow a unique identifer. if uuid: self._id = str(uuid) else: self._id = uuidutils.generate_uuid() + # Ensure we can not change the state at the same time in 2 different + # threads. + self._state_lock = threading.RLock() @property def name(self): @@ -109,21 +108,26 @@ class Flow(object): """Provides a read-only view of the flow state.""" return self._state - def _change_state(self, context, new_state): - was_changed = False - old_state = self.state - with self._lock: + def _change_state(self, context, new_state, check_func=None, notify=True): + old_state = None + changed = False + with self._state_lock: if self.state != new_state: - old_state = self.state - self._state = new_state - was_changed = True - if was_changed: - # Don't notify while holding the lock. + if (not check_func or + (check_func and check_func(self.state))): + changed = True + old_state = self.state + self._state = new_state + # Don't notify while holding the lock so that the reciever of said + # notifications can actually perform operations on the given flow + # without getting into deadlock. + if notify and changed: self.notifier.notify(self.state, details={ 'context': context, 'flow': self, 'old_state': old_state, }) + return changed def __str__(self): lines = ["Flow: %s" % (self.name)] @@ -141,7 +145,6 @@ class Flow(object): """ raise NotImplementedError() - @decorators.locked def add_many(self, tasks): """Adds many tasks to this flow. @@ -158,54 +161,54 @@ class Flow(object): Returns how many tasks were interrupted (if any). """ - if self.state in self.UNINTERRUPTIBLE_STATES: - raise exc.InvalidStateException(("Can not interrupt when" - " in state %s") % (self.state)) - # Note(harlowja): Do *not* acquire the lock here so that the flow may - # be interrupted while running. This does mean the the above check may - # not be valid but we can worry about that if it becomes an issue. - old_state = self.state - if old_state != states.INTERRUPTED: - self._state = states.INTERRUPTED - self.notifier.notify(self.state, details={ - 'context': None, - 'flow': self, - 'old_state': old_state, - }) - return 0 + def check(): + if self.state in self.UNINTERRUPTIBLE_STATES: + raise exc.InvalidStateException(("Can not interrupt when" + " in state %s") % self.state) + + check() + with self._state_lock: + check() + self._change_state(None, states.INTERRUPTED) + return 0 - @decorators.locked def reset(self): """Fully resets the internal state of this flow, allowing for the flow to be ran again. Note: Listeners are also reset. """ - if self.state not in self.RESETTABLE_STATES: - raise exc.InvalidStateException(("Can not reset when" - " in state %s") % (self.state)) - self.notifier.reset() - self.task_notifier.reset() - self._change_state(None, states.PENDING) - - @decorators.locked + def check(): + if self.state not in self.RESETTABLE_STATES: + raise exc.InvalidStateException(("Can not reset when" + " in state %s") % self.state) + + check() + with self._state_lock: + check() + self.notifier.reset() + self.task_notifier.reset() + self._change_state(None, states.PENDING) + def soft_reset(self): """Partially resets the internal state of this flow, allowing for the - flow to be ran again from an interrupted state only. + flow to be ran again from an interrupted state. """ - if self.state not in self.SOFT_RESETTABLE_STATES: - raise exc.InvalidStateException(("Can not soft reset when" - " in state %s") % (self.state)) - self._change_state(None, states.PENDING) + def check(): + if self.state not in self.SOFT_RESETTABLE_STATES: + raise exc.InvalidStateException(("Can not soft reset when" + " in state %s") % self.state) + + check() + with self._state_lock: + check() + self._change_state(None, states.PENDING) - @decorators.locked + @abc.abstractmethod def run(self, context, *args, **kwargs): """Executes the workflow.""" - if self.state not in self.RUNNABLE_STATES: - raise exc.InvalidStateException("Unable to run flow when " - "in state %s" % (self.state)) + raise NotImplementedError() - @decorators.locked def rollback(self, context, cause): """Performs rollback of this workflow and any attached parent workflows if present. |
