summaryrefslogtreecommitdiff
path: root/taskflow
diff options
context:
space:
mode:
Diffstat (limited to 'taskflow')
-rw-r--r--taskflow/persistence/logbook.py26
-rw-r--r--taskflow/storage.py20
2 files changed, 30 insertions, 16 deletions
diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py
index d3b0a7a..2de5848 100644
--- a/taskflow/persistence/logbook.py
+++ b/taskflow/persistence/logbook.py
@@ -49,6 +49,10 @@ def _safe_unmarshal_time(when):
return timeutils.unmarshall_time(when)
+def _was_failure(state, result):
+ return state == states.FAILURE and isinstance(result, misc.Failure)
+
+
def _fix_meta(data):
# Handle the case where older schemas allowed this to be non-dict by
# correcting this case by replacing it with a dictionary when a non-dict
@@ -317,6 +321,10 @@ class AtomDetail(object):
def to_dict(self):
"""Translates the internal state of this object to a dictionary."""
+ @abc.abstractmethod
+ def put(self, state, result):
+ """Puts a result (acquired in the given state) into this detail."""
+
def _to_dict_shared(self):
if self.failure:
failure = self.failure.to_dict()
@@ -367,6 +375,15 @@ class TaskDetail(AtomDetail):
self.state = state
self.intention = states.EXECUTE
+ def put(self, state, result):
+ self.state = state
+ if _was_failure(state, result):
+ self.failure = result
+ self.results = None
+ else:
+ self.results = result
+ self.failure = None
+
@classmethod
def from_dict(cls, data):
"""Translates the given data into an instance of this class."""
@@ -416,6 +433,15 @@ class RetryDetail(AtomDetail):
except IndexError as e:
raise exc.NotFound("Last failures not found", e)
+ def put(self, state, result):
+ # Do not clean retry history (only on reset does this happen).
+ self.state = state
+ if _was_failure(state, result):
+ self.failure = result
+ else:
+ self.results.append((result, {}))
+ self.failure = None
+
@classmethod
def from_dict(cls, data):
"""Translates the given data into an instance of this class."""
diff --git a/taskflow/storage.py b/taskflow/storage.py
index 6d3068c..14fd318 100644
--- a/taskflow/storage.py
+++ b/taskflow/storage.py
@@ -53,7 +53,7 @@ class Storage(object):
self._lock = self._lock_cls()
# NOTE(imelnikov): failure serialization looses information,
- # so we cache failures here, in task name -> misc.Failure mapping.
+ # so we cache failures here, in atom name -> failure mapping.
self._failures = {}
for ad in self._flowdetail:
if ad.failure is not None:
@@ -322,24 +322,12 @@ class Storage(object):
"""Put result for atom with id 'uuid' to storage."""
with self._lock.write_lock():
ad = self._atomdetail_by_name(atom_name)
- ad.state = state
+ ad.put(state, data)
if state == states.FAILURE and isinstance(data, misc.Failure):
- # FIXME(harlowja): this seems like it should be internal logic
- # in the atom detail object and not in here. Fix that soon...
- #
- # Do not clean retry history
- if not isinstance(ad, logbook.RetryDetail):
- ad.results = None
- ad.failure = data
+ # NOTE(imelnikov): failure serialization looses information,
+ # so we cache failures here, in atom name -> failure mapping.
self._failures[ad.name] = data
else:
- # FIXME(harlowja): this seems like it should be internal logic
- # in the atom detail object and not in here. Fix that soon...
- if isinstance(ad, logbook.RetryDetail):
- ad.results.append((data, {}))
- else:
- ad.results = data
- ad.failure = None
self._check_all_results_provided(ad.name, data)
self._with_connection(self._save_atom_detail, ad)