diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-07-15 00:02:41 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-07-15 00:02:41 +0000 |
commit | 453e74d778aef158aaab18ce4166113e1b384b09 (patch) | |
tree | e784d0ca66cd2424107e9b8acfd5f2d0ac23cb97 /taskflow/persistence | |
parent | 9f846d0475b9862da6af52bd959d15a2cd8f5ab0 (diff) | |
parent | a3fe3eb698e7bfa20b0b7fddd91c37a44c092f2c (diff) | |
download | taskflow-453e74d778aef158aaab18ce4166113e1b384b09.tar.gz |
Merge "Retain atom 'revert' result (or failure)"
Diffstat (limited to 'taskflow/persistence')
3 files changed, 185 insertions, 48 deletions
diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py new file mode 100644 index 0000000..dd54dff --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Add 'revert_results' and 'revert_failure' atom detail column. + +Revision ID: 3162c0f3f8e4 +Revises: 589dccdf2b6e +Create Date: 2015-06-17 15:52:56.575245 + +""" + +# revision identifiers, used by Alembic. +revision = '3162c0f3f8e4' +down_revision = '589dccdf2b6e' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('atomdetails', + sa.Column('revert_results', sa.Text(), nullable=True)) + op.add_column('atomdetails', + sa.Column('revert_failure', sa.Text(), nullable=True)) + + +def downgrade(): + op.drop_column('atomdetails', 'revert_results') + op.drop_column('atomdetails', 'revert_failure') diff --git a/taskflow/persistence/backends/sqlalchemy/tables.py b/taskflow/persistence/backends/sqlalchemy/tables.py index 28acca1..65969fb 100644 --- a/taskflow/persistence/backends/sqlalchemy/tables.py +++ b/taskflow/persistence/backends/sqlalchemy/tables.py @@ -92,6 +92,8 @@ def fetch(metadata): default=uuidutils.generate_uuid), Column('failure', Json), Column('results', Json), + Column('revert_results', Json), + Column('revert_failure', Json), Column('atom_type', Enum(*models.ATOM_TYPES, name='atom_types')), Column('intention', Enum(*states.INTENTIONS, diff --git a/taskflow/persistence/models.py b/taskflow/persistence/models.py index c7a6eae..e41d6d7 100644 --- a/taskflow/persistence/models.py +++ b/taskflow/persistence/models.py @@ -32,6 +32,14 @@ LOG = logging.getLogger(__name__) # Internal helpers... +def _is_all_none(arg, *args): + if arg is not None: + return False + for more_arg in args: + if more_arg is not None: + return False + return True + def _copy_function(deep_copy): if deep_copy: @@ -413,11 +421,18 @@ class AtomDetail(object): strategies). :ivar results: Any results the atom produced from either its ``execute`` method or from other sources. - :ivar failure: If the atom failed (possibly due to its ``execute`` - method raising) this will be a + :ivar revert_results: Any results the atom produced from either its + ``revert`` method or from other sources. + :ivar failure: If the atom failed (due to its ``execute`` method + raising) this will be a :py:class:`~taskflow.types.failure.Failure` object that represents that failure (if there was no failure this will be set to none). + :ivar revert_failure: If the atom failed (possibly due to its ``revert`` + method raising) this will be a + :py:class:`~taskflow.types.failure.Failure` object + that represents that failure (if there was no + failure this will be set to none). """ def __init__(self, name, uuid): @@ -427,6 +442,8 @@ class AtomDetail(object): self.intention = states.EXECUTE self.results = None self.failure = None + self.revert_results = None + self.revert_failure = None self.meta = {} self.version = None @@ -465,6 +482,8 @@ class AtomDetail(object): self.meta = ad.meta self.failure = ad.failure self.results = ad.results + self.revert_results = ad.revert_results + self.revert_failure = ad.revert_failure self.version = ad.version return self @@ -503,6 +522,16 @@ class AtomDetail(object): self.failure = other.failure else: self.failure = None + if self.revert_failure != other.revert_failure: + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + if other.revert_failure: + if deep_copy: + self.revert_failure = other.revert_failure.copy() + else: + self.revert_failure = other.revert_failure + else: + self.revert_failure = None if self.meta != other.meta: self.meta = copy_fn(other.meta) if self.version != other.version: @@ -522,11 +551,17 @@ class AtomDetail(object): failure = self.failure.to_dict() else: failure = None + if self.revert_failure: + revert_failure = self.revert_failure.to_dict() + else: + revert_failure = None return { 'failure': failure, + 'revert_failure': revert_failure, 'meta': self.meta, 'name': self.name, 'results': self.results, + 'revert_results': self.revert_results, 'state': self.state, 'version': self.version, 'intention': self.intention, @@ -547,11 +582,15 @@ class AtomDetail(object): obj.state = data.get('state') obj.intention = data.get('intention') obj.results = data.get('results') + obj.revert_results = data.get('revert_results') obj.version = data.get('version') obj.meta = _fix_meta(data) failure = data.get('failure') if failure: obj.failure = ft.Failure.from_dict(failure) + revert_failure = data.get('revert_failure') + if revert_failure: + obj.revert_failure = ft.Failure.from_dict(revert_failure) return obj @property @@ -582,47 +621,65 @@ class TaskDetail(AtomDetail): def reset(self, state): """Resets this task detail and sets ``state`` attribute value. - This sets any previously set ``results`` and ``failure`` attributes - back to ``None`` and sets the state to the provided one, as well as - setting this task details ``intention`` attribute to ``EXECUTE``. + This sets any previously set ``results``, ``failure``, + and ``revert_results`` attributes back to ``None`` and sets the + state to the provided one, as well as setting this task + details ``intention`` attribute to ``EXECUTE``. """ self.results = None self.failure = None + self.revert_results = None + self.revert_failure = None self.state = state self.intention = states.EXECUTE def put(self, state, result): """Puts a result (acquired in the given state) into this detail. - If the result is a :py:class:`~taskflow.types.failure.Failure` object - then the ``failure`` attribute will be set (and the ``results`` - attribute will be set to ``None``); if the result is not a - :py:class:`~taskflow.types.failure.Failure` object then the - ``results`` attribute will be set (and the ``failure`` attribute - will be set to ``None``). In either case the ``state`` - attribute will be set to the provided state. + Returns whether this object was modified (or whether it was not). """ was_altered = False - if self.state != state: + if state != self.state: self.state = state was_altered = True - if self._was_failure(state, result): + if state == states.REVERT_FAILURE: + if self.revert_failure != result: + self.revert_failure = result + was_altered = True + if not _is_all_none(self.results, self.revert_results): + self.results = None + self.revert_results = None + was_altered = True + elif state == states.FAILURE: if self.failure != result: self.failure = result was_altered = True - if self.results is not None: + if not _is_all_none(self.results, self.revert_results, + self.revert_failure): self.results = None + self.revert_results = None + self.revert_failure = None + was_altered = True + elif state == states.SUCCESS: + if not _is_all_none(self.revert_results, self.revert_failure, + self.failure): + self.revert_results = None + self.revert_failure = None + self.failure = None was_altered = True - else: # We don't really have the ability to determine equality of # task (user) results at the current time, without making # potentially bad guesses, so assume the task detail always needs # to be saved if they are not exactly equivalent... - if self.results is not result: + if result is not self.results: self.results = result was_altered = True - if self.failure is not None: - self.failure = None + elif state == states.REVERTED: + if not _is_all_none(self.revert_failure): + self.revert_failure = None + was_altered = True + if result is not self.revert_results: + self.revert_results = result was_altered = True return was_altered @@ -630,10 +687,11 @@ class TaskDetail(AtomDetail): """Merges the current task detail with the given one. NOTE(harlowja): This merge does **not** copy and replace - the ``results`` attribute if it differs. Instead the current - objects ``results`` attribute directly becomes (via assignment) the - other objects ``results`` attribute. Also note that if the provided - object is this object itself then **no** merging is done. + the ``results`` or ``revert_results`` if it differs. Instead the + current objects ``results`` and ``revert_results`` attributes directly + becomes (via assignment) the other objects attributes. Also note that + if the provided object is this object itself then **no** merging is + done. See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if this is copied at a deeper level (for example by @@ -648,8 +706,8 @@ class TaskDetail(AtomDetail): if other is self: return self super(TaskDetail, self).merge(other, deep_copy=deep_copy) - if self.results != other.results: - self.results = other.results + self.results = other.results + self.revert_results = other.revert_results return self def copy(self): @@ -659,10 +717,10 @@ class TaskDetail(AtomDetail): version information that this object maintains is shallow copied via ``copy.copy``). - NOTE(harlowja): This copy does **not** perform ``copy.copy`` on - the ``results`` attribute of this object (before assigning to the - copy). Instead the current objects ``results`` attribute directly - becomes (via assignment) the copied objects ``results`` attribute. + NOTE(harlowja): This copy does **not** copy and replace + the ``results`` or ``revert_results`` attribute if it differs. Instead + the current objects ``results`` and ``revert_results`` attributes + directly becomes (via assignment) the cloned objects attributes. See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if this is copied at a deeper level (for example by @@ -673,6 +731,7 @@ class TaskDetail(AtomDetail): """ clone = copy.copy(self) clone.results = self.results + clone.revert_results = self.revert_results if self.meta: clone.meta = self.meta.copy() if self.version: @@ -694,12 +753,15 @@ class RetryDetail(AtomDetail): """Resets this retry detail and sets ``state`` attribute value. This sets any previously added ``results`` back to an empty list - and resets the ``failure`` attribute back to ``None`` and sets the - state to the provided one, as well as setting this atom + and resets the ``failure`` and ``revert_failure`` and + ``revert_results`` attributes back to ``None`` and sets the state + to the provided one, as well as setting this retry details ``intention`` attribute to ``EXECUTE``. """ self.results = [] + self.revert_results = None self.failure = None + self.revert_failure = None self.state = state self.intention = states.EXECUTE @@ -711,14 +773,15 @@ class RetryDetail(AtomDetail): copied via ``copy.copy``). NOTE(harlowja): This copy does **not** copy - the incoming objects ``results`` attribute. Instead this - objects ``results`` attribute list is iterated over and a new list - is constructed with each ``(data, failures)`` element in that list - having its ``failures`` (a dictionary of each named + the incoming objects ``results`` or ``revert_results`` attributes. + Instead this objects ``results`` attribute list is iterated over and + a new list is constructed with each ``(data, failures)`` element in + that list having its ``failures`` (a dictionary of each named :py:class:`~taskflow.types.failure.Failure` object that occured) copied but its ``data`` is left untouched. After this is done that new list becomes (via assignment) the cloned - objects ``results`` attribute. + objects ``results`` attribute. The ``revert_results`` is directly + assigned to the cloned objects ``revert_results`` attribute. See: https://bugs.launchpad.net/taskflow/+bug/1452978 for what happens if the ``data`` in ``results`` is copied at a @@ -738,6 +801,7 @@ class RetryDetail(AtomDetail): copied_failures[key] = failure results.append((data, copied_failures)) clone.results = results + clone.revert_results = self.revert_results if self.meta: clone.meta = self.meta.copy() if self.version: @@ -771,21 +835,50 @@ class RetryDetail(AtomDetail): def put(self, state, result): """Puts a result (acquired in the given state) into this detail. - If the result is a :py:class:`~taskflow.types.failure.Failure` object - then the ``failure`` attribute will be set; if the result is not a - :py:class:`~taskflow.types.failure.Failure` object then the - ``results`` attribute will be appended to (and the ``failure`` - attribute will be set to ``None``). In either case the ``state`` - attribute will be set to the provided state. + Returns whether this object was modified (or whether it was not). """ # Do not clean retry history (only on reset does this happen). - self.state = state - if self._was_failure(state, result): - self.failure = result - else: + was_altered = False + if state != self.state: + self.state = state + was_altered = True + if state == states.REVERT_FAILURE: + if result != self.revert_failure: + self.revert_failure = result + was_altered = True + if not _is_all_none(self.revert_results): + self.revert_results = None + was_altered = True + elif state == states.FAILURE: + if result != self.failure: + self.failure = result + was_altered = True + if not _is_all_none(self.revert_results, self.revert_failure): + self.revert_results = None + self.revert_failure = None + was_altered = True + elif state == states.SUCCESS: + if not _is_all_none(self.failure, self.revert_failure, + self.revert_results): + self.failure = None + self.revert_failure = None + self.revert_results = None + # Track what we produced, so that we can examine it (or avoid + # using it again). self.results.append((result, {})) - self.failure = None - return True + was_altered = True + elif state == states.REVERTED: + # We don't really have the ability to determine equality of + # task (user) results at the current time, without making + # potentially bad guesses, so assume the retry detail always needs + # to be saved if they are not exactly equivalent... + if result is not self.revert_results: + self.revert_results = result + was_altered = True + if not _is_all_none(self.revert_failure): + self.revert_failure = None + was_altered = True + return was_altered @classmethod def from_dict(cls, data): |