author     Joshua Harlow <jxharlow@godaddy.com>    2016-06-14 17:13:37 -0700
committer  ChangBo Guo(gcb) <glongwave@gmail.com>  2017-01-09 12:51:44 +0000
commit     22f75755b744c220b8dfd085dbb39e838d58f051 (patch)
tree       917f61c53da42582fdab6c19611c0a477acaacc7
parent     b3b659f38b7e6879f5c03d2dc0d4cae752802ee5 (diff)
download   taskflow-22f75755b744c220b8dfd085dbb39e838d58f051.tar.gz
Protect storage better against external concurrent access
Lock down the various state machine action handling functions so that
they are guaranteed correct (and consistent) access to the storage layer
when they are modifying and/or reading it.

Change-Id: Ie893a44aa963ab515f19e77f9904f49c843cb4e5
-rw-r--r--  taskflow/engines/action_engine/builder.py   94
-rw-r--r--  taskflow/storage.py                          12
2 files changed, 62 insertions(+), 44 deletions(-)
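
The change wraps every state-machine handler that touches storage in the
storage layer's reader/writer lock: handlers that mutate storage
(scheduling, resuming, completing atoms) take the write lock, while
handlers that only inspect it take the read lock. Below is a minimal
sketch of that pattern, assuming a reader/writer lock in the style of
fasteners.ReaderWriterLock (the kind of lock taskflow's storage layer
exposes here); the SketchStorage class and handler functions are
hypothetical illustrations, not taskflow code.

import fasteners


class SketchStorage(object):
    """Hypothetical stand-in for taskflow.storage.Storage."""

    def __init__(self):
        self._lock = fasteners.ReaderWriterLock()
        self._flow_state = 'RUNNING'

    @property
    def lock(self):
        # Mirrors the property added to Storage in this change.
        return self._lock

    def get_flow_state(self):
        return self._flow_state


def schedule(storage, next_nodes):
    # Mutating handler: take the write lock so no reader sees a
    # half-updated view of the storage while scheduling runs.
    with storage.lock.write_lock():
        return sorted(next_nodes, reverse=True)


def game_over(storage, leftover_atoms):
    # Read-only handler: the shared read lock is enough here.
    with storage.lock.read_lock():
        return len(leftover_atoms) == 0
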
diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py
index 0666307..1683501 100644
--- a/taskflow/engines/action_engine/builder.py
+++ b/taskflow/engines/action_engine/builder.py
@@ -143,10 +143,11 @@ class MachineBuilder(object):
get_atom_intention = self._storage.get_atom_intention
def do_schedule(next_nodes):
- return self._scheduler.schedule(
- sorted(next_nodes,
- key=lambda node: getattr(node, 'priority', 0),
- reverse=True))
+ with self._storage.lock.write_lock():
+ return self._scheduler.schedule(
+ sorted(next_nodes,
+ key=lambda node: getattr(node, 'priority', 0),
+ reverse=True))
def iter_next_atoms(atom=None, apply_deciders=True):
# Yields and filters and tweaks the next atoms to run...
@@ -164,9 +165,10 @@ class MachineBuilder(object):
# to include any nodes that need to be executed (from a previous
# attempt, which may be empty if never ran before) and any nodes
# that are now ready to be ran.
- memory.next_up.update(
- iter_utils.unique_seen((self._completer.resume(),
- iter_next_atoms())))
+ with self._storage.lock.write_lock():
+ memory.next_up.update(
+ iter_utils.unique_seen((self._completer.resume(),
+ iter_next_atoms())))
return SCHEDULE
def game_over(old_state, new_state, event):
@@ -176,11 +178,12 @@ class MachineBuilder(object):
# it is *always* called before the final state is entered.
if memory.failures:
return FAILED
- leftover_atoms = iter_utils.count(
- # Avoid activating the deciders, since at this point
- # the engine is finishing and there will be no more further
- # work done anyway...
- iter_next_atoms(apply_deciders=False))
+ with self._storage.lock.read_lock():
+ leftover_atoms = iter_utils.count(
+ # Avoid activating the deciders, since at this point
+ # the engine is finishing and there will be no more further
+ # work done anyway...
+ iter_next_atoms(apply_deciders=False))
if leftover_atoms:
# Ok we didn't finish (either reverting or executing...) so
# that means we must have been stopped at some point...
@@ -199,21 +202,22 @@ class MachineBuilder(object):
# if the user of this engine has requested the engine/storage
# that holds this information to stop or suspend); handles failures
# that occur during this process safely...
- current_flow_state = self._storage.get_flow_state()
- if current_flow_state == st.RUNNING and memory.next_up:
- not_done, failures = do_schedule(memory.next_up)
- if not_done:
- memory.not_done.update(not_done)
- if failures:
- memory.failures.extend(failures)
- memory.next_up.intersection_update(not_done)
- elif current_flow_state == st.SUSPENDING and memory.not_done:
- # Try to force anything not cancelled to now be cancelled
- # so that the executor that gets it does not continue to
- # try to work on it (if the future execution is still in
- # its backlog, if it's already being executed, this will
- # do nothing).
- memory.cancel_futures()
+ with self._storage.lock.write_lock():
+ current_flow_state = self._storage.get_flow_state()
+ if current_flow_state == st.RUNNING and memory.next_up:
+ not_done, failures = do_schedule(memory.next_up)
+ if not_done:
+ memory.not_done.update(not_done)
+ if failures:
+ memory.failures.extend(failures)
+ memory.next_up.intersection_update(not_done)
+ elif current_flow_state == st.SUSPENDING and memory.not_done:
+ # Try to force anything not cancelled to now be cancelled
+ # so that the executor that gets it does not continue to
+ # try to work on it (if the future execution is still in
+ # its backlog, if it's already being executed, this will
+ # do nothing).
+ memory.cancel_futures()
return WAIT
def complete_an_atom(fut):
@@ -277,23 +281,25 @@ class MachineBuilder(object):
# nodes to be scheduled in the future); handles failures that
# occur during this process safely...
next_up = set()
- while memory.done:
- fut = memory.done.pop()
- # Force it to be completed so that we can ensure that
- # before we iterate over any successors or predecessors
- # that we know it has been completed and saved and so on...
- completion_status = complete_an_atom(fut)
- if (not memory.failures
- and completion_status != WAS_CANCELLED):
- atom = fut.atom
- try:
- more_work = set(iter_next_atoms(atom=atom))
- except Exception:
- memory.failures.append(failure.Failure())
- LOG.exception("Engine '%s' atom post-completion"
- " next atom searching failed", atom)
- else:
- next_up.update(more_work)
+ with self._storage.lock.write_lock():
+ while memory.done:
+ fut = memory.done.pop()
+ # Force it to be completed so that we can ensure that
+ # before we iterate over any successors or predecessors
+ # that we know it has been completed and saved and so on...
+ completion_status = complete_an_atom(fut)
+ if (not memory.failures
+ and completion_status != WAS_CANCELLED):
+ atom = fut.atom
+ try:
+ more_work = set(iter_next_atoms(atom=atom))
+ except Exception:
+ memory.failures.append(failure.Failure())
+ LOG.exception(
+ "Engine '%s' atom post-completion"
+ " next atom searching failed", atom)
+ else:
+ next_up.update(more_work)
current_flow_state = self._storage.get_flow_state()
if (current_flow_state == st.RUNNING
and next_up and not memory.failures):
diff --git a/taskflow/storage.py b/taskflow/storage.py
index be52774..76d059b 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -407,6 +407,18 @@ class Storage(object):
self._failures.setdefault(atom_name, {})
return atom_ids
+ @property
+ def lock(self):
+ """Reader/writer lock used to ensure multi-thread safety.
+
+ This does **not** protect against the **same** storage objects being
+ used by multiple engines/users across multiple processes (or
+ different machines); certain backends handle that situation better
+ than others (for example by using sequence identifiers) and it's an
+ ongoing work in progress to make that better.
+ """
+ return self._lock
+
def ensure_atom(self, atom):
"""Ensure there is an atomdetail for the **given** atom.