summaryrefslogtreecommitdiff
path: root/taskflow/persistence
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-07-15 00:02:41 +0000
committerGerrit Code Review <review@openstack.org>2015-07-15 00:02:41 +0000
commit453e74d778aef158aaab18ce4166113e1b384b09 (patch)
treee784d0ca66cd2424107e9b8acfd5f2d0ac23cb97 /taskflow/persistence
parent9f846d0475b9862da6af52bd959d15a2cd8f5ab0 (diff)
parenta3fe3eb698e7bfa20b0b7fddd91c37a44c092f2c (diff)
downloadtaskflow-453e74d778aef158aaab18ce4166113e1b384b09.tar.gz
Merge "Retain atom 'revert' result (or failure)"
Diffstat (limited to 'taskflow/persistence')
-rw-r--r--taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py42
-rw-r--r--taskflow/persistence/backends/sqlalchemy/tables.py2
-rw-r--r--taskflow/persistence/models.py189
3 files changed, 185 insertions, 48 deletions
diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py
new file mode 100644
index 0000000..dd54dff
--- /dev/null
+++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/3162c0f3f8e4_add_revert_results_and_revert_failure_.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Add 'revert_results' and 'revert_failure' atom detail column.
+
+Revision ID: 3162c0f3f8e4
+Revises: 589dccdf2b6e
+Create Date: 2015-06-17 15:52:56.575245
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '3162c0f3f8e4'
+down_revision = '589dccdf2b6e'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+ op.add_column('atomdetails',
+ sa.Column('revert_results', sa.Text(), nullable=True))
+ op.add_column('atomdetails',
+ sa.Column('revert_failure', sa.Text(), nullable=True))
+
+
+def downgrade():
+ op.drop_column('atomdetails', 'revert_results')
+ op.drop_column('atomdetails', 'revert_failure')
diff --git a/taskflow/persistence/backends/sqlalchemy/tables.py b/taskflow/persistence/backends/sqlalchemy/tables.py
index 28acca1..65969fb 100644
--- a/taskflow/persistence/backends/sqlalchemy/tables.py
+++ b/taskflow/persistence/backends/sqlalchemy/tables.py
@@ -92,6 +92,8 @@ def fetch(metadata):
default=uuidutils.generate_uuid),
Column('failure', Json),
Column('results', Json),
+ Column('revert_results', Json),
+ Column('revert_failure', Json),
Column('atom_type', Enum(*models.ATOM_TYPES,
name='atom_types')),
Column('intention', Enum(*states.INTENTIONS,
diff --git a/taskflow/persistence/models.py b/taskflow/persistence/models.py
index c7a6eae..e41d6d7 100644
--- a/taskflow/persistence/models.py
+++ b/taskflow/persistence/models.py
@@ -32,6 +32,14 @@ LOG = logging.getLogger(__name__)
# Internal helpers...
+def _is_all_none(arg, *args):
+ if arg is not None:
+ return False
+ for more_arg in args:
+ if more_arg is not None:
+ return False
+ return True
+
def _copy_function(deep_copy):
if deep_copy:
@@ -413,11 +421,18 @@ class AtomDetail(object):
strategies).
:ivar results: Any results the atom produced from either its
``execute`` method or from other sources.
- :ivar failure: If the atom failed (possibly due to its ``execute``
- method raising) this will be a
+ :ivar revert_results: Any results the atom produced from either its
+ ``revert`` method or from other sources.
+ :ivar failure: If the atom failed (due to its ``execute`` method
+ raising) this will be a
:py:class:`~taskflow.types.failure.Failure` object that
represents that failure (if there was no failure this
will be set to none).
+ :ivar revert_failure: If the atom failed (possibly due to its ``revert``
+ method raising) this will be a
+ :py:class:`~taskflow.types.failure.Failure` object
+ that represents that failure (if there was no
+ failure this will be set to none).
"""
def __init__(self, name, uuid):
@@ -427,6 +442,8 @@ class AtomDetail(object):
self.intention = states.EXECUTE
self.results = None
self.failure = None
+ self.revert_results = None
+ self.revert_failure = None
self.meta = {}
self.version = None
@@ -465,6 +482,8 @@ class AtomDetail(object):
self.meta = ad.meta
self.failure = ad.failure
self.results = ad.results
+ self.revert_results = ad.revert_results
+ self.revert_failure = ad.revert_failure
self.version = ad.version
return self
@@ -503,6 +522,16 @@ class AtomDetail(object):
self.failure = other.failure
else:
self.failure = None
+ if self.revert_failure != other.revert_failure:
+ # NOTE(imelnikov): we can't just deep copy Failures, as they
+ # contain tracebacks, which are not copyable.
+ if other.revert_failure:
+ if deep_copy:
+ self.revert_failure = other.revert_failure.copy()
+ else:
+ self.revert_failure = other.revert_failure
+ else:
+ self.revert_failure = None
if self.meta != other.meta:
self.meta = copy_fn(other.meta)
if self.version != other.version:
@@ -522,11 +551,17 @@ class AtomDetail(object):
failure = self.failure.to_dict()
else:
failure = None
+ if self.revert_failure:
+ revert_failure = self.revert_failure.to_dict()
+ else:
+ revert_failure = None
return {
'failure': failure,
+ 'revert_failure': revert_failure,
'meta': self.meta,
'name': self.name,
'results': self.results,
+ 'revert_results': self.revert_results,
'state': self.state,
'version': self.version,
'intention': self.intention,
@@ -547,11 +582,15 @@ class AtomDetail(object):
obj.state = data.get('state')
obj.intention = data.get('intention')
obj.results = data.get('results')
+ obj.revert_results = data.get('revert_results')
obj.version = data.get('version')
obj.meta = _fix_meta(data)
failure = data.get('failure')
if failure:
obj.failure = ft.Failure.from_dict(failure)
+ revert_failure = data.get('revert_failure')
+ if revert_failure:
+ obj.revert_failure = ft.Failure.from_dict(revert_failure)
return obj
@property
@@ -582,47 +621,65 @@ class TaskDetail(AtomDetail):
def reset(self, state):
"""Resets this task detail and sets ``state`` attribute value.
- This sets any previously set ``results`` and ``failure`` attributes
- back to ``None`` and sets the state to the provided one, as well as
- setting this task details ``intention`` attribute to ``EXECUTE``.
+ This sets any previously set ``results``, ``failure``,
+ and ``revert_results`` attributes back to ``None`` and sets the
+ state to the provided one, as well as setting this task
+ details ``intention`` attribute to ``EXECUTE``.
"""
self.results = None
self.failure = None
+ self.revert_results = None
+ self.revert_failure = None
self.state = state
self.intention = states.EXECUTE
def put(self, state, result):
"""Puts a result (acquired in the given state) into this detail.
- If the result is a :py:class:`~taskflow.types.failure.Failure` object
- then the ``failure`` attribute will be set (and the ``results``
- attribute will be set to ``None``); if the result is not a
- :py:class:`~taskflow.types.failure.Failure` object then the
- ``results`` attribute will be set (and the ``failure`` attribute
- will be set to ``None``). In either case the ``state``
- attribute will be set to the provided state.
+ Returns whether this object was modified (or whether it was not).
"""
was_altered = False
- if self.state != state:
+ if state != self.state:
self.state = state
was_altered = True
- if self._was_failure(state, result):
+ if state == states.REVERT_FAILURE:
+ if self.revert_failure != result:
+ self.revert_failure = result
+ was_altered = True
+ if not _is_all_none(self.results, self.revert_results):
+ self.results = None
+ self.revert_results = None
+ was_altered = True
+ elif state == states.FAILURE:
if self.failure != result:
self.failure = result
was_altered = True
- if self.results is not None:
+ if not _is_all_none(self.results, self.revert_results,
+ self.revert_failure):
self.results = None
+ self.revert_results = None
+ self.revert_failure = None
+ was_altered = True
+ elif state == states.SUCCESS:
+ if not _is_all_none(self.revert_results, self.revert_failure,
+ self.failure):
+ self.revert_results = None
+ self.revert_failure = None
+ self.failure = None
was_altered = True
- else:
# We don't really have the ability to determine equality of
# task (user) results at the current time, without making
# potentially bad guesses, so assume the task detail always needs
# to be saved if they are not exactly equivalent...
- if self.results is not result:
+ if result is not self.results:
self.results = result
was_altered = True
- if self.failure is not None:
- self.failure = None
+ elif state == states.REVERTED:
+ if not _is_all_none(self.revert_failure):
+ self.revert_failure = None
+ was_altered = True
+ if result is not self.revert_results:
+ self.revert_results = result
was_altered = True
return was_altered
@@ -630,10 +687,11 @@ class TaskDetail(AtomDetail):
"""Merges the current task detail with the given one.
NOTE(harlowja): This merge does **not** copy and replace
- the ``results`` attribute if it differs. Instead the current
- objects ``results`` attribute directly becomes (via assignment) the
- other objects ``results`` attribute. Also note that if the provided
- object is this object itself then **no** merging is done.
+ the ``results`` or ``revert_results`` if it differs. Instead the
+ current objects ``results`` and ``revert_results`` attributes directly
+ becomes (via assignment) the other objects attributes. Also note that
+ if the provided object is this object itself then **no** merging is
+ done.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
what happens if this is copied at a deeper level (for example by
@@ -648,8 +706,8 @@ class TaskDetail(AtomDetail):
if other is self:
return self
super(TaskDetail, self).merge(other, deep_copy=deep_copy)
- if self.results != other.results:
- self.results = other.results
+ self.results = other.results
+ self.revert_results = other.revert_results
return self
def copy(self):
@@ -659,10 +717,10 @@ class TaskDetail(AtomDetail):
version information that this object maintains is shallow
copied via ``copy.copy``).
- NOTE(harlowja): This copy does **not** perform ``copy.copy`` on
- the ``results`` attribute of this object (before assigning to the
- copy). Instead the current objects ``results`` attribute directly
- becomes (via assignment) the copied objects ``results`` attribute.
+ NOTE(harlowja): This copy does **not** copy and replace
+ the ``results`` or ``revert_results`` attribute if it differs. Instead
+ the current objects ``results`` and ``revert_results`` attributes
+ directly becomes (via assignment) the cloned objects attributes.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
what happens if this is copied at a deeper level (for example by
@@ -673,6 +731,7 @@ class TaskDetail(AtomDetail):
"""
clone = copy.copy(self)
clone.results = self.results
+ clone.revert_results = self.revert_results
if self.meta:
clone.meta = self.meta.copy()
if self.version:
@@ -694,12 +753,15 @@ class RetryDetail(AtomDetail):
"""Resets this retry detail and sets ``state`` attribute value.
This sets any previously added ``results`` back to an empty list
- and resets the ``failure`` attribute back to ``None`` and sets the
- state to the provided one, as well as setting this atom
+ and resets the ``failure`` and ``revert_failure`` and
+ ``revert_results`` attributes back to ``None`` and sets the state
+ to the provided one, as well as setting this retry
details ``intention`` attribute to ``EXECUTE``.
"""
self.results = []
+ self.revert_results = None
self.failure = None
+ self.revert_failure = None
self.state = state
self.intention = states.EXECUTE
@@ -711,14 +773,15 @@ class RetryDetail(AtomDetail):
copied via ``copy.copy``).
NOTE(harlowja): This copy does **not** copy
- the incoming objects ``results`` attribute. Instead this
- objects ``results`` attribute list is iterated over and a new list
- is constructed with each ``(data, failures)`` element in that list
- having its ``failures`` (a dictionary of each named
+ the incoming objects ``results`` or ``revert_results`` attributes.
+ Instead this objects ``results`` attribute list is iterated over and
+ a new list is constructed with each ``(data, failures)`` element in
+ that list having its ``failures`` (a dictionary of each named
:py:class:`~taskflow.types.failure.Failure` object that
occured) copied but its ``data`` is left untouched. After
this is done that new list becomes (via assignment) the cloned
- objects ``results`` attribute.
+ objects ``results`` attribute. The ``revert_results`` is directly
+ assigned to the cloned objects ``revert_results`` attribute.
See: https://bugs.launchpad.net/taskflow/+bug/1452978 for
what happens if the ``data`` in ``results`` is copied at a
@@ -738,6 +801,7 @@ class RetryDetail(AtomDetail):
copied_failures[key] = failure
results.append((data, copied_failures))
clone.results = results
+ clone.revert_results = self.revert_results
if self.meta:
clone.meta = self.meta.copy()
if self.version:
@@ -771,21 +835,50 @@ class RetryDetail(AtomDetail):
def put(self, state, result):
"""Puts a result (acquired in the given state) into this detail.
- If the result is a :py:class:`~taskflow.types.failure.Failure` object
- then the ``failure`` attribute will be set; if the result is not a
- :py:class:`~taskflow.types.failure.Failure` object then the
- ``results`` attribute will be appended to (and the ``failure``
- attribute will be set to ``None``). In either case the ``state``
- attribute will be set to the provided state.
+ Returns whether this object was modified (or whether it was not).
"""
# Do not clean retry history (only on reset does this happen).
- self.state = state
- if self._was_failure(state, result):
- self.failure = result
- else:
+ was_altered = False
+ if state != self.state:
+ self.state = state
+ was_altered = True
+ if state == states.REVERT_FAILURE:
+ if result != self.revert_failure:
+ self.revert_failure = result
+ was_altered = True
+ if not _is_all_none(self.revert_results):
+ self.revert_results = None
+ was_altered = True
+ elif state == states.FAILURE:
+ if result != self.failure:
+ self.failure = result
+ was_altered = True
+ if not _is_all_none(self.revert_results, self.revert_failure):
+ self.revert_results = None
+ self.revert_failure = None
+ was_altered = True
+ elif state == states.SUCCESS:
+ if not _is_all_none(self.failure, self.revert_failure,
+ self.revert_results):
+ self.failure = None
+ self.revert_failure = None
+ self.revert_results = None
+ # Track what we produced, so that we can examine it (or avoid
+ # using it again).
self.results.append((result, {}))
- self.failure = None
- return True
+ was_altered = True
+ elif state == states.REVERTED:
+ # We don't really have the ability to determine equality of
+ # task (user) results at the current time, without making
+ # potentially bad guesses, so assume the retry detail always needs
+ # to be saved if they are not exactly equivalent...
+ if result is not self.revert_results:
+ self.revert_results = result
+ was_altered = True
+ if not _is_all_none(self.revert_failure):
+ self.revert_failure = None
+ was_altered = True
+ return was_altered
@classmethod
def from_dict(cls, data):