diff options
Diffstat (limited to 'taskflow')
| -rw-r--r-- | taskflow/persistence/logbook.py | 26 | ||||
| -rw-r--r-- | taskflow/storage.py | 20 |
2 files changed, 30 insertions, 16 deletions
diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index d3b0a7a..2de5848 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -49,6 +49,10 @@ def _safe_unmarshal_time(when): return timeutils.unmarshall_time(when) +def _was_failure(state, result): + return state == states.FAILURE and isinstance(result, misc.Failure) + + def _fix_meta(data): # Handle the case where older schemas allowed this to be non-dict by # correcting this case by replacing it with a dictionary when a non-dict @@ -317,6 +321,10 @@ class AtomDetail(object): def to_dict(self): """Translates the internal state of this object to a dictionary.""" + @abc.abstractmethod + def put(self, state, result): + """Puts a result (acquired in the given state) into this detail.""" + def _to_dict_shared(self): if self.failure: failure = self.failure.to_dict() @@ -367,6 +375,15 @@ class TaskDetail(AtomDetail): self.state = state self.intention = states.EXECUTE + def put(self, state, result): + self.state = state + if _was_failure(state, result): + self.failure = result + self.results = None + else: + self.results = result + self.failure = None + @classmethod def from_dict(cls, data): """Translates the given data into an instance of this class.""" @@ -416,6 +433,15 @@ class RetryDetail(AtomDetail): except IndexError as e: raise exc.NotFound("Last failures not found", e) + def put(self, state, result): + # Do not clean retry history (only on reset does this happen). + self.state = state + if _was_failure(state, result): + self.failure = result + else: + self.results.append((result, {})) + self.failure = None + @classmethod def from_dict(cls, data): """Translates the given data into an instance of this class.""" diff --git a/taskflow/storage.py b/taskflow/storage.py index 6d3068c..14fd318 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -53,7 +53,7 @@ class Storage(object): self._lock = self._lock_cls() # NOTE(imelnikov): failure serialization looses information, - # so we cache failures here, in task name -> misc.Failure mapping. + # so we cache failures here, in atom name -> failure mapping. self._failures = {} for ad in self._flowdetail: if ad.failure is not None: @@ -322,24 +322,12 @@ class Storage(object): """Put result for atom with id 'uuid' to storage.""" with self._lock.write_lock(): ad = self._atomdetail_by_name(atom_name) - ad.state = state + ad.put(state, data) if state == states.FAILURE and isinstance(data, misc.Failure): - # FIXME(harlowja): this seems like it should be internal logic - # in the atom detail object and not in here. Fix that soon... - # - # Do not clean retry history - if not isinstance(ad, logbook.RetryDetail): - ad.results = None - ad.failure = data + # NOTE(imelnikov): failure serialization looses information, + # so we cache failures here, in atom name -> failure mapping. self._failures[ad.name] = data else: - # FIXME(harlowja): this seems like it should be internal logic - # in the atom detail object and not in here. Fix that soon... - if isinstance(ad, logbook.RetryDetail): - ad.results.append((data, {})) - else: - ad.results = data - ad.failure = None self._check_all_results_provided(ad.name, data) self._with_connection(self._save_atom_detail, ad) |
