author     Pavlo Shchelokovskyy <shchelokovskyy@gmail.com>    2021-07-13 12:06:41 +0300
committer  Pavlo Shchelokovskyy <pshchelokovskyy@mirantis.com>    2021-07-13 09:44:06 +0000
commit     3e1f150926029b6a553cbef959f370e39ce6bb5a (patch)
tree       3b1dff7c91b292b61133050e2ceb930d64348dc3
parent     0e9f7367e7abf737e3fbde58350fb1034a3a36e8 (diff)
Use custom JSONType columns
The JSONType from sqlalchemy_utils is quite brittle, as it only does a
primitive json.dumps on values. This leads to various sorts of
StorageFailure exceptions in taskflow when, for example, an
unserializable exception bubbles up to the 'failure' field of
AtomDetails.

This patch subclasses the JSONType from sqlalchemy_utils and overrides
the two methods that do (de)serialization so that they go through
oslo.serialization functions instead. Those deal with such occurrences
much better, for example by providing 'str' as a fallback default.

Change-Id: I3b9e9498b155199a4e707006a0cf22cda0567c06
Related-Bug: #1935957
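A minimal sketch (not part of the patch) of the failure mode described
above; the Unserializable class is a made-up stand-in for an exception
object that the plain json module cannot encode:

    import json

    from oslo_serialization import jsonutils


    class Unserializable(object):
        def __str__(self):
            return 'boom'


    value = {'failure': Unserializable()}

    try:
        # Roughly what the stock sqlalchemy_utils JSONType does on bind.
        json.dumps(value)
    except TypeError as exc:
        print('json.dumps raised: %s' % exc)

    # oslo.serialization converts unknown objects to primitives instead,
    # falling back to str() per the commit message, so with a recent
    # oslo.serialization this prints {"failure": "boom"}.
    print(jsonutils.dumps(value))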
 taskflow/persistence/backends/sqlalchemy/tables.py | 35 +++++++++++++++++++++++++++--------
 1 file changed, 27 insertions(+), 8 deletions(-)
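A quick usage sketch, assuming SQLAlchemy 1.x semantics and the module
path from the diff below; the 'demo' table is invented for illustration:

    from sqlalchemy import create_engine, Column, MetaData, String, Table

    from taskflow.persistence.backends.sqlalchemy.tables import JSONType

    metadata = MetaData()
    demo = Table('demo', metadata,
                 Column('uuid', String(36), primary_key=True),
                 Column('meta', JSONType))

    engine = create_engine('sqlite://')
    metadata.create_all(engine)

    with engine.connect() as conn:
        # process_bind_param serializes the dict on the way in ...
        conn.execute(demo.insert().values(uuid='42', meta={'retries': 3}))
        # ... and process_result_value deserializes it on the way out.
        print(conn.execute(demo.select()).fetchone().meta)  # {'retries': 3}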
diff --git a/taskflow/persistence/backends/sqlalchemy/tables.py b/taskflow/persistence/backends/sqlalchemy/tables.py
index b9b065e..062409a 100644
--- a/taskflow/persistence/backends/sqlalchemy/tables.py
+++ b/taskflow/persistence/backends/sqlalchemy/tables.py
@@ -16,10 +16,11 @@
 import collections
 
+from oslo_serialization import jsonutils
 from oslo_utils import timeutils
 from oslo_utils import uuidutils
 from sqlalchemy import Table, Column, String, ForeignKey, DateTime, Enum
-import sqlalchemy_utils as su
+from sqlalchemy_utils.types import json as json_type
 
 from taskflow.persistence import models
 from taskflow import states
 
@@ -34,6 +35,24 @@ STATE_LENGTH = 255
 VERSION_LENGTH = 64
 
 
+class JSONType(json_type.JSONType):
+    """Customized JSONType using oslo.serialization for json operations"""
+
+    def process_bind_param(self, value, dialect):
+        if dialect.name == 'postgresql' and json_type.has_postgres_json:
+            return value
+        if value is not None:
+            value = jsonutils.dumps(value)
+        return value
+
+    def process_result_value(self, value, dialect):
+        if dialect.name == 'postgresql':
+            return value
+        if value is not None:
+            value = jsonutils.loads(value)
+        return value
+
+
 def fetch(metadata):
     """Returns the master set of table objects (which is also their schema)."""
     logbooks = Table('logbooks', metadata,
@@ -41,7 +60,7 @@ def fetch(metadata):
                             default=timeutils.utcnow),
                      Column('updated_at', DateTime,
                             onupdate=timeutils.utcnow),
-                     Column('meta', su.JSONType),
+                     Column('meta', JSONType),
                      Column('name', String(length=NAME_LENGTH)),
                      Column('uuid', String(length=UUID_LENGTH),
                             primary_key=True, nullable=False, unique=True,
@@ -54,7 +73,7 @@ def fetch(metadata):
                         Column('parent_uuid', String(length=UUID_LENGTH),
                                ForeignKey('logbooks.uuid',
                                           ondelete='CASCADE')),
-                        Column('meta', su.JSONType),
+                        Column('meta', JSONType),
                         Column('name', String(length=NAME_LENGTH)),
                         Column('state', String(length=STATE_LENGTH)),
                         Column('uuid', String(length=UUID_LENGTH),
@@ -65,7 +84,7 @@
                                default=timeutils.utcnow),
                         Column('updated_at', DateTime,
                                onupdate=timeutils.utcnow),
-                        Column('meta', su.JSONType),
+                        Column('meta', JSONType),
                         Column('parent_uuid', String(length=UUID_LENGTH),
                                ForeignKey('flowdetails.uuid',
                                           ondelete='CASCADE')),
@@ -75,10 +94,10 @@
                         Column('uuid', String(length=UUID_LENGTH),
                                primary_key=True, nullable=False, unique=True,
                                default=uuidutils.generate_uuid),
-                        Column('failure', su.JSONType),
-                        Column('results', su.JSONType),
-                        Column('revert_results', su.JSONType),
-                        Column('revert_failure', su.JSONType),
+                        Column('failure', JSONType),
+                        Column('results', JSONType),
+                        Column('revert_results', JSONType),
+                        Column('revert_failure', JSONType),
                         Column('atom_type', Enum(*models.ATOM_TYPES,
                                                  name='atom_types')),
                         Column('intention', Enum(*states.INTENTIONS,