summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-10-02 12:39:00 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-02-10 05:36:44 +0000
commit687ec913790653f79badc8f5d656c86792e94271 (patch)
tree88d31f9b819db82997684c4fe5aa62455a295ab2
parent6924b3622a0b62ee27c5b4e6a00745b04fedf0c1 (diff)
downloadtaskflow-687ec913790653f79badc8f5d656c86792e94271.tar.gz
Rework the sqlalchemy backend
We can just simplify the usage of an sqlalchemy if we just use (we already have our own ORM like objects anyway) sqlalchemy core in the first place and have a very tiny layer that converts back and forth from our very limited object model that we use in our persistence layer. This change makes that adjustment, which makes it easier to read and understand the actions the sqlalchemy backend is doing when saving, reading and updating data, and avoids yet another layer that isn't useful for our purposes anyway. Change-Id: I911c509f65e7845aee86fed1622eaa56970741f2
-rw-r--r--taskflow/persistence/backends/impl_sqlalchemy.py430
-rw-r--r--taskflow/persistence/backends/sqlalchemy/models.py97
-rw-r--r--taskflow/persistence/backends/sqlalchemy/tables.py99
3 files changed, 282 insertions, 344 deletions
diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py
index 98ebb30..5796e27 100644
--- a/taskflow/persistence/backends/impl_sqlalchemy.py
+++ b/taskflow/persistence/backends/impl_sqlalchemy.py
@@ -28,13 +28,13 @@ from oslo_utils import strutils
import six
import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
-from sqlalchemy import orm as sa_orm
from sqlalchemy import pool as sa_pool
+from sqlalchemy import sql
from taskflow import exceptions as exc
from taskflow import logging
from taskflow.persistence.backends.sqlalchemy import migration
-from taskflow.persistence.backends.sqlalchemy import models
+from taskflow.persistence.backends.sqlalchemy import tables
from taskflow.persistence import base
from taskflow.persistence import logbook
from taskflow.types import failure
@@ -180,6 +180,50 @@ def _ping_listener(dbapi_conn, connection_rec, connection_proxy):
raise
+class Alchemist(object):
+ """Internal <-> external row <-> objects + other helper functions.
+
+ NOTE(harlowja): for internal usage only.
+ """
+ def __init__(self, tables):
+ self._tables = tables
+
+ @staticmethod
+ def convert_flow_detail(row):
+ return logbook.FlowDetail.from_dict(dict(row.items()))
+
+ @staticmethod
+ def convert_book(row):
+ return logbook.LogBook.from_dict(dict(row.items()))
+
+ @staticmethod
+ def convert_atom_detail(row):
+ row = dict(row.items())
+ atom_cls = logbook.atom_detail_class(row.pop('atom_type'))
+ return atom_cls.from_dict(row)
+
+ def _atom_query_iter(self, conn, parent_uuid):
+ q = (sql.select([self._tables.atomdetails]).
+ where(self._tables.atomdetails.c.parent_uuid == parent_uuid))
+ for row in conn.execute(q):
+ yield self.convert_atom_detail(row)
+
+ def _flow_query_iter(self, conn, parent_uuid):
+ q = (sql.select([self._tables.flowdetails]).
+ where(self._tables.flowdetails.c.parent_uuid == parent_uuid))
+ for row in conn.execute(q):
+ yield self.convert_flow_detail(row)
+
+ def populate_book(self, conn, book):
+ for fd in self._flow_query_iter(conn, book.uuid):
+ book.add(fd)
+ self.populate_flow_detail(conn, fd)
+
+ def populate_flow_detail(self, conn, fd):
+ for ad in self._atom_query_iter(conn, fd.uuid):
+ fd.add(ad)
+
+
class SQLAlchemyBackend(base.Backend):
"""A sqlalchemy backend.
@@ -197,7 +241,6 @@ class SQLAlchemyBackend(base.Backend):
else:
self._engine = None
self._owns_engine = True
- self._session_maker = None
self._validated = False
def _create_engine(self):
@@ -270,14 +313,8 @@ class SQLAlchemyBackend(base.Backend):
self._engine = self._create_engine()
return self._engine
- def _get_session_maker(self):
- if self._session_maker is None:
- self._session_maker = sa_orm.sessionmaker(bind=self.engine,
- autocommit=True)
- return self._session_maker
-
def get_connection(self):
- conn = Connection(self, self._get_session_maker())
+ conn = Connection(self)
if not self._validated:
try:
max_retries = misc.as_int(self._conf.get('max_retries', None))
@@ -288,9 +325,6 @@ class SQLAlchemyBackend(base.Backend):
return conn
def close(self):
- if self._session_maker is not None:
- self._session_maker.close_all()
- self._session_maker = None
if self._engine is not None and self._owns_engine:
# NOTE(harlowja): Only dispose of the engine and clear it from
# our local state if we actually own the engine in the first
@@ -304,10 +338,12 @@ class SQLAlchemyBackend(base.Backend):
class Connection(base.Connection):
- def __init__(self, backend, session_maker):
+ def __init__(self, backend):
self._backend = backend
- self._session_maker = session_maker
self._engine = backend.engine
+ self._metadata = sa.MetaData()
+ self._tables = tables.fetch(self._metadata)
+ self._converter = Alchemist(self._tables)
@property
def backend(self):
@@ -315,7 +351,7 @@ class Connection(base.Connection):
def validate(self, max_retries=0):
- def test_connect(failures):
+ def verify_connect(failures):
try:
# See if we can make a connection happen.
#
@@ -332,7 +368,7 @@ class Connection(base.Connection):
return True
failures = []
- if test_connect(failures):
+ if verify_connect(failures):
return
# Sorry it didn't work out...
@@ -349,40 +385,13 @@ class Connection(base.Connection):
LOG.info("Attempting to test the connection again in %s seconds.",
sleepy_secs)
time.sleep(sleepy_secs)
- if test_connect(failures):
+ if verify_connect(failures):
return
attempts_left -= 1
# Sorry it didn't work out...
failures[-1].reraise()
- def _run_in_session(self, functor, *args, **kwargs):
- """Runs a callback in a session.
-
- This function proxy will create a session, and then call the callback
- with that session (along with the provided args and kwargs). It ensures
- that the session is opened & closed and makes sure that sqlalchemy
- exceptions aren't emitted from the callback or sessions actions (as
- that would expose the underlying sqlalchemy exception model).
- """
- try:
- session = self._make_session()
- with session.begin():
- return functor(session, *args, **kwargs)
- except sa_exc.SQLAlchemyError as e:
- LOG.exception("Failed running '%s' within a database session",
- functor.__name__)
- raise exc.StorageFailure("Storage backend internal error, failed"
- " running '%s' within a database"
- " session" % functor.__name__, e)
-
- def _make_session(self):
- try:
- return self._session_maker()
- except sa_exc.SQLAlchemyError as e:
- LOG.exception('Failed creating database session')
- raise exc.StorageFailure("Failed creating database session", e)
-
def upgrade(self):
try:
with contextlib.closing(self._engine.connect()) as conn:
@@ -390,237 +399,164 @@ class Connection(base.Connection):
# and we don't recommend to use SQLite in production
# deployments, so migrations are rarely needed
# for SQLite. So we don't bother about working around
- # SQLite limitations, and create database from models
- # when it is in use.
+ # SQLite limitations, and create the database directly from
+ # the tables when it is in use...
if 'sqlite' in self._engine.url.drivername:
- models.BASE.metadata.create_all(conn)
+ self._metadata.create_all(bind=conn)
else:
migration.db_sync(conn)
except sa_exc.SQLAlchemyError as e:
- LOG.exception('Failed upgrading database version')
raise exc.StorageFailure("Failed upgrading database version", e)
- def _clear_all(self, session):
- # NOTE(harlowja): due to how we have our relationship setup and
- # cascading deletes are enabled, this will cause all associated
- # task details and flow details to automatically be purged.
+ def clear_all(self):
try:
- return session.query(models.LogBook).delete()
+ logbooks = self._tables.logbooks
+ with self._engine.begin() as conn:
+ conn.execute(logbooks.delete())
except sa_exc.DBAPIError as e:
- LOG.exception('Failed clearing all entries')
raise exc.StorageFailure("Failed clearing all entries", e)
- def clear_all(self):
- return self._run_in_session(self._clear_all)
-
- def _update_atom_details(self, session, ad):
- # Must already exist since a atoms details has a strong connection to
- # a flow details, and atom details can not be saved on there own since
- # they *must* have a connection to an existing flow detail.
- ad_m = _atom_details_get_model(ad.uuid, session=session)
- ad_m = _atomdetails_merge(ad_m, ad)
- ad_m = session.merge(ad_m)
- return _convert_ad_to_external(ad_m)
-
def update_atom_details(self, atom_detail):
- return self._run_in_session(self._update_atom_details, ad=atom_detail)
-
- def _update_flow_details(self, session, fd):
- # Must already exist since a flow details has a strong connection to
- # a logbook, and flow details can not be saved on there own since they
- # *must* have a connection to an existing logbook.
- fd_m = _flow_details_get_model(fd.uuid, session=session)
- fd_m = _flowdetails_merge(fd_m, fd)
- fd_m = session.merge(fd_m)
- return _convert_fd_to_external(fd_m)
+ try:
+ atomdetails = self._tables.atomdetails
+ with self._engine.begin() as conn:
+ q = (sql.select([atomdetails]).
+ where(atomdetails.c.uuid == atom_detail.uuid))
+ row = conn.execute(q).first()
+ if not row:
+ raise exc.NotFound("No atom details found with uuid"
+ " '%s'" % atom_detail.uuid)
+ e_ad = self._converter.convert_atom_detail(row)
+ self._update_atom_details(conn, atom_detail, e_ad)
+ return e_ad
+ except sa_exc.SQLAlchemyError as e:
+ raise exc.StorageFailure("Failed updating atom details with"
+ " uuid '%s'" % atom_detail.uuid, e)
+
+ def _insert_flow_details(self, conn, fd, parent_uuid):
+ value = fd.to_dict()
+ value['parent_uuid'] = parent_uuid
+ conn.execute(sql.insert(self._tables.flowdetails, value))
+ for ad in fd:
+ self._insert_atom_details(conn, ad, fd.uuid)
+
+ def _insert_atom_details(self, conn, ad, parent_uuid):
+ value = ad.to_dict()
+ value['parent_uuid'] = parent_uuid
+ value['atom_type'] = logbook.atom_detail_type(ad)
+ conn.execute(sql.insert(self._tables.atomdetails, value))
+
+ def _update_atom_details(self, conn, ad, e_ad):
+ e_ad.merge(ad)
+ conn.execute(sql.update(self._tables.atomdetails)
+ .where(self._tables.atomdetails.c.uuid == e_ad.uuid)
+ .values(e_ad.to_dict()))
+
+ def _update_flow_details(self, conn, fd, e_fd):
+ e_fd.merge(fd)
+ conn.execute(sql.update(self._tables.flowdetails)
+ .where(self._tables.flowdetails.c.uuid == e_fd.uuid)
+ .values(e_fd.to_dict()))
+ for ad in fd:
+ e_ad = e_fd.find(ad.uuid)
+ if e_ad is None:
+ e_fd.add(ad)
+ self._insert_atom_details(conn, ad, fd.uuid)
+ else:
+ self._update_atom_details(conn, ad, e_ad)
def update_flow_details(self, flow_detail):
- return self._run_in_session(self._update_flow_details, fd=flow_detail)
-
- def _destroy_logbook(self, session, lb_id):
try:
- lb = _logbook_get_model(lb_id, session=session)
- session.delete(lb)
- except sa_exc.DBAPIError as e:
- LOG.exception('Failed destroying logbook')
- raise exc.StorageFailure("Failed destroying logbook %s" % lb_id, e)
+ flowdetails = self._tables.flowdetails
+ with self._engine.begin() as conn:
+ q = (sql.select([flowdetails]).
+ where(flowdetails.c.uuid == flow_detail.uuid))
+ row = conn.execute(q).first()
+ if not row:
+ raise exc.NotFound("No flow details found with"
+ " uuid '%s'" % flow_detail.uuid)
+ e_fd = self._converter.convert_flow_detail(row)
+ self._converter.populate_flow_detail(conn, e_fd)
+ self._update_flow_details(conn, flow_detail, e_fd)
+ return e_fd
+ except sa_exc.SQLAlchemyError as e:
+ raise exc.StorageFailure("Failed updating flow details with"
+ " uuid '%s'" % flow_detail.uuid, e)
def destroy_logbook(self, book_uuid):
- return self._run_in_session(self._destroy_logbook, lb_id=book_uuid)
-
- def _save_logbook(self, session, lb):
- try:
- lb_m = _logbook_get_model(lb.uuid, session=session)
- lb_m = _logbook_merge(lb_m, lb)
- except exc.NotFound:
- lb_m = _convert_lb_to_internal(lb)
try:
- lb_m = session.merge(lb_m)
- return _convert_lb_to_external(lb_m)
+ logbooks = self._tables.logbooks
+ with self._engine.begin() as conn:
+ q = logbooks.delete().where(logbooks.c.uuid == book_uuid)
+ r = conn.execute(q)
+ if r.rowcount == 0:
+ raise exc.NotFound("No logbook found with"
+ " uuid '%s'" % book_uuid)
except sa_exc.DBAPIError as e:
- LOG.exception('Failed saving logbook')
- raise exc.StorageFailure("Failed saving logbook %s" % lb.uuid, e)
+ raise exc.StorageFailure("Failed destroying"
+ " logbook '%s'" % book_uuid, e)
def save_logbook(self, book):
- return self._run_in_session(self._save_logbook, lb=book)
+ try:
+ logbooks = self._tables.logbooks
+ with self._engine.begin() as conn:
+ q = (sql.select([logbooks]).
+ where(logbooks.c.uuid == book.uuid))
+ row = conn.execute(q).first()
+ if row:
+ e_lb = self._converter.convert_book(row)
+ self._converter.populate_book(conn, e_lb)
+ e_lb.merge(book)
+ conn.execute(sql.update(logbooks)
+ .where(logbooks.c.uuid == e_lb.uuid)
+ .values(e_lb.to_dict()))
+ for fd in book:
+ e_fd = e_lb.find(fd.uuid)
+ if e_fd is None:
+ e_lb.add(fd)
+ self._insert_flow_details(conn, fd, e_lb.uuid)
+ else:
+ self._update_flow_details(conn, fd, e_fd)
+ return e_lb
+ else:
+ conn.execute(sql.insert(logbooks, book.to_dict()))
+ for fd in book:
+ self._insert_flow_details(conn, fd, book.uuid)
+ return book
+ except sa_exc.DBAPIError as e:
+ raise exc.StorageFailure("Failed saving logbook"
+ " '%s'" % book.uuid, e)
def get_logbook(self, book_uuid):
- session = self._make_session()
try:
- lb = _logbook_get_model(book_uuid, session=session)
- return _convert_lb_to_external(lb)
+ logbooks = self._tables.logbooks
+ with contextlib.closing(self._engine.connect()) as conn:
+ q = (sql.select([logbooks]).
+ where(logbooks.c.uuid == book_uuid))
+ row = conn.execute(q).first()
+ if not row:
+ raise exc.NotFound("No logbook found with"
+ " uuid '%s'" % book_uuid)
+ book = self._converter.convert_book(row)
+ self._converter.populate_book(conn, book)
+ return book
except sa_exc.DBAPIError as e:
- LOG.exception('Failed getting logbook')
- raise exc.StorageFailure("Failed getting logbook %s" % book_uuid,
- e)
+ raise exc.StorageFailure(
+ "Failed getting logbook '%s'" % book_uuid, e)
def get_logbooks(self):
- session = self._make_session()
+ gathered = []
try:
- raw_books = session.query(models.LogBook).all()
- books = [_convert_lb_to_external(lb) for lb in raw_books]
+ with contextlib.closing(self._engine.connect()) as conn:
+ q = sql.select([self._tables.logbooks])
+ for row in conn.execute(q):
+ book = self._converter.convert_book(row)
+ self._converter.populate_book(conn, book)
+ gathered.append(book)
except sa_exc.DBAPIError as e:
- LOG.exception('Failed getting logbooks')
raise exc.StorageFailure("Failed getting logbooks", e)
- for lb in books:
- yield lb
+ for book in gathered:
+ yield book
def close(self):
pass
-
-###
-# Internal <-> external model + merging + other helper functions.
-###
-
-
-def _atomdetails_merge(ad_m, ad):
- atom_type = logbook.atom_detail_type(ad)
- if atom_type != ad_m.atom_type:
- raise exc.StorageFailure("Can not merge differing atom types "
- "(%s != %s)" % (atom_type, ad_m.atom_type))
- ad_d = ad.to_dict()
- ad_m.state = ad_d['state']
- ad_m.intention = ad_d['intention']
- ad_m.results = ad_d['results']
- ad_m.version = ad_d['version']
- ad_m.failure = ad_d['failure']
- ad_m.meta = ad_d['meta']
- ad_m.name = ad_d['name']
- return ad_m
-
-
-def _flowdetails_merge(fd_m, fd):
- fd_d = fd.to_dict()
- fd_m.state = fd_d['state']
- fd_m.name = fd_d['name']
- fd_m.meta = fd_d['meta']
- for ad in fd:
- existing_ad = False
- for ad_m in fd_m.atomdetails:
- if ad_m.uuid == ad.uuid:
- existing_ad = True
- ad_m = _atomdetails_merge(ad_m, ad)
- break
- if not existing_ad:
- ad_m = _convert_ad_to_internal(ad, fd_m.uuid)
- fd_m.atomdetails.append(ad_m)
- return fd_m
-
-
-def _logbook_merge(lb_m, lb):
- lb_d = lb.to_dict()
- lb_m.meta = lb_d['meta']
- lb_m.name = lb_d['name']
- lb_m.created_at = lb_d['created_at']
- lb_m.updated_at = lb_d['updated_at']
- for fd in lb:
- existing_fd = False
- for fd_m in lb_m.flowdetails:
- if fd_m.uuid == fd.uuid:
- existing_fd = True
- fd_m = _flowdetails_merge(fd_m, fd)
- if not existing_fd:
- lb_m.flowdetails.append(_convert_fd_to_internal(fd, lb_m.uuid))
- return lb_m
-
-
-def _convert_fd_to_external(fd):
- fd_c = logbook.FlowDetail(fd.name, uuid=fd.uuid)
- fd_c.meta = fd.meta
- fd_c.state = fd.state
- for ad_m in fd.atomdetails:
- fd_c.add(_convert_ad_to_external(ad_m))
- return fd_c
-
-
-def _convert_fd_to_internal(fd, parent_uuid):
- fd_m = models.FlowDetail(name=fd.name, uuid=fd.uuid,
- parent_uuid=parent_uuid, meta=fd.meta,
- state=fd.state)
- fd_m.atomdetails = []
- for ad in fd:
- fd_m.atomdetails.append(_convert_ad_to_internal(ad, fd_m.uuid))
- return fd_m
-
-
-def _convert_ad_to_internal(ad, parent_uuid):
- converted = ad.to_dict()
- converted['atom_type'] = logbook.atom_detail_type(ad)
- converted['parent_uuid'] = parent_uuid
- return models.AtomDetail(**converted)
-
-
-def _convert_ad_to_external(ad):
- # Convert from sqlalchemy model -> external model, this allows us
- # to change the internal sqlalchemy model easily by forcing a defined
- # interface (that isn't the sqlalchemy model itself).
- atom_cls = logbook.atom_detail_class(ad.atom_type)
- return atom_cls.from_dict({
- 'state': ad.state,
- 'intention': ad.intention,
- 'results': ad.results,
- 'failure': ad.failure,
- 'meta': ad.meta,
- 'version': ad.version,
- 'name': ad.name,
- 'uuid': ad.uuid,
- })
-
-
-def _convert_lb_to_external(lb_m):
- lb_c = logbook.LogBook(lb_m.name, lb_m.uuid)
- lb_c.updated_at = lb_m.updated_at
- lb_c.created_at = lb_m.created_at
- lb_c.meta = lb_m.meta
- for fd_m in lb_m.flowdetails:
- lb_c.add(_convert_fd_to_external(fd_m))
- return lb_c
-
-
-def _convert_lb_to_internal(lb_c):
- lb_m = models.LogBook(uuid=lb_c.uuid, meta=lb_c.meta, name=lb_c.name)
- lb_m.flowdetails = []
- for fd_c in lb_c:
- lb_m.flowdetails.append(_convert_fd_to_internal(fd_c, lb_c.uuid))
- return lb_m
-
-
-def _logbook_get_model(lb_id, session):
- entry = session.query(models.LogBook).filter_by(uuid=lb_id).first()
- if entry is None:
- raise exc.NotFound("No logbook found with id: %s" % lb_id)
- return entry
-
-
-def _flow_details_get_model(flow_id, session):
- entry = session.query(models.FlowDetail).filter_by(uuid=flow_id).first()
- if entry is None:
- raise exc.NotFound("No flow details found with id: %s" % flow_id)
- return entry
-
-
-def _atom_details_get_model(atom_id, session):
- entry = session.query(models.AtomDetail).filter_by(uuid=atom_id).first()
- if entry is None:
- raise exc.NotFound("No atom details found with id: %s" % atom_id)
- return entry
diff --git a/taskflow/persistence/backends/sqlalchemy/models.py b/taskflow/persistence/backends/sqlalchemy/models.py
deleted file mode 100644
index e43c965..0000000
--- a/taskflow/persistence/backends/sqlalchemy/models.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-from oslo_utils import uuidutils
-from sqlalchemy import Column, String, DateTime, Enum
-from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy import ForeignKey
-from sqlalchemy.orm import backref
-from sqlalchemy.orm import relationship
-from sqlalchemy import types as types
-
-from taskflow.persistence import logbook
-from taskflow import states
-
-BASE = declarative_base()
-
-
-# TODO(harlowja): remove when oslo.db exists
-class TimestampMixin(object):
- created_at = Column(DateTime, default=timeutils.utcnow)
- updated_at = Column(DateTime, onupdate=timeutils.utcnow)
-
-
-class Json(types.TypeDecorator):
- impl = types.Text
-
- def process_bind_param(self, value, dialect):
- return jsonutils.dumps(value)
-
- def process_result_value(self, value, dialect):
- return jsonutils.loads(value)
-
-
-class ModelBase(TimestampMixin):
- """Base model for all taskflow objects."""
- uuid = Column(String, default=uuidutils.generate_uuid,
- primary_key=True, nullable=False, unique=True)
- name = Column(String, nullable=True)
- meta = Column(Json, nullable=True)
-
-
-class LogBook(BASE, ModelBase):
- """Represents a logbook for a set of flows."""
- __tablename__ = 'logbooks'
-
- # Relationships
- flowdetails = relationship("FlowDetail",
- single_parent=True,
- backref=backref("logbooks",
- cascade="save-update, delete, "
- "merge"))
-
-
-class FlowDetail(BASE, ModelBase):
- __tablename__ = 'flowdetails'
-
- # Member variables
- state = Column(String)
-
- # Relationships
- parent_uuid = Column(String, ForeignKey('logbooks.uuid'))
- atomdetails = relationship("AtomDetail",
- single_parent=True,
- backref=backref("flowdetails",
- cascade="save-update, delete, "
- "merge"))
-
-
-class AtomDetail(BASE, ModelBase):
- __tablename__ = 'atomdetails'
-
- # Member variables
- atom_type = Column(Enum(*logbook.ATOM_TYPES, name='atom_types'))
- state = Column(String)
- intention = Column(Enum(*states.INTENTIONS, name='intentions'))
- results = Column(Json)
- failure = Column(Json)
- version = Column(Json)
-
- # Relationships
- parent_uuid = Column(String, ForeignKey('flowdetails.uuid'))
diff --git a/taskflow/persistence/backends/sqlalchemy/tables.py b/taskflow/persistence/backends/sqlalchemy/tables.py
new file mode 100644
index 0000000..239e643
--- /dev/null
+++ b/taskflow/persistence/backends/sqlalchemy/tables.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+
+from oslo.serialization import jsonutils
+from oslo.utils import timeutils
+from oslo.utils import uuidutils
+from sqlalchemy import Table, Column, String, ForeignKey, DateTime, Enum
+from sqlalchemy import types
+
+from taskflow.persistence import logbook
+from taskflow import states
+
+Tables = collections.namedtuple('Tables',
+ ['logbooks', 'flowdetails', 'atomdetails'])
+
+# Column length limits...
+NAME_LENGTH = 255
+UUID_LENGTH = 64
+STATE_LENGTH = 255
+VERSION_LENGTH = 64
+
+
+class Json(types.TypeDecorator):
+ impl = types.Text
+
+ def process_bind_param(self, value, dialect):
+ if not value:
+ return None
+ return jsonutils.dumps(value)
+
+ def process_result_value(self, value, dialect):
+ if not value:
+ return None
+ return jsonutils.loads(value)
+
+
+def fetch(metadata):
+ """Returns the master set of table objects (which is also there schema)."""
+ logbooks = Table('logbooks', metadata,
+ Column('created_at', DateTime,
+ default=timeutils.utcnow),
+ Column('updated_at', DateTime,
+ default=timeutils.utcnow),
+ Column('meta', Json),
+ Column('name', String(length=NAME_LENGTH)),
+ Column('uuid', String(length=UUID_LENGTH),
+ primary_key=True, nullable=False, unique=True,
+ default=uuidutils.generate_uuid))
+ flowdetails = Table('flowdetails', metadata,
+ Column('created_at', DateTime,
+ default=timeutils.utcnow),
+ Column('updated_at', DateTime,
+ default=timeutils.utcnow),
+ Column('parent_uuid', String(length=UUID_LENGTH),
+ ForeignKey('logbooks.uuid',
+ ondelete='CASCADE')),
+ Column('meta', Json),
+ Column('name', String(length=NAME_LENGTH)),
+ Column('state', String(length=STATE_LENGTH)),
+ Column('uuid', String(length=UUID_LENGTH),
+ primary_key=True, nullable=False, unique=True,
+ default=uuidutils.generate_uuid))
+ atomdetails = Table('atomdetails', metadata,
+ Column('created_at', DateTime,
+ default=timeutils.utcnow),
+ Column('updated_at', DateTime,
+ default=timeutils.utcnow),
+ Column('meta', Json),
+ Column('parent_uuid', String(length=UUID_LENGTH),
+ ForeignKey('flowdetails.uuid',
+ ondelete='CASCADE')),
+ Column('name', String(length=NAME_LENGTH)),
+ Column('version', String(length=VERSION_LENGTH)),
+ Column('state', String(length=STATE_LENGTH)),
+ Column('uuid', String(length=UUID_LENGTH),
+ primary_key=True, nullable=False, unique=True,
+ default=uuidutils.generate_uuid),
+ Column('failure', Json),
+ Column('results', Json),
+ Column('atom_type', Enum(*logbook.ATOM_TYPES,
+ name='atom_types')),
+ Column('intention', Enum(*states.INTENTIONS,
+ name='intentions')))
+ return Tables(logbooks, flowdetails, atomdetails)