summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-05-05 15:15:03 +0000
committerGerrit Code Review <review@openstack.org>2014-05-05 15:15:03 +0000
commit089663761cc15f8b3cdb874b6a76270ccdd0a412 (patch)
tree26ce0e0d1a6735842057fb6d0206a5b9b3b3cb54
parenta03b141a954c7e644f0033defdb1b5b434a7c49a (diff)
parent93efb62fd100f5135928443c2c325ae78b1c1fd0 (diff)
downloadsqlalchemy-migrate-089663761cc15f8b3cdb874b6a76270ccdd0a412.tar.gz
Merge "Move patch from oslo to drop unique constraints with sqlite"0.9.1
-rw-r--r--migrate/changeset/databases/sqlite.py51
-rw-r--r--migrate/tests/changeset/test_constraint.py2
2 files changed, 49 insertions, 4 deletions
diff --git a/migrate/changeset/databases/sqlite.py b/migrate/changeset/databases/sqlite.py
index a601593..8a5ba90 100644
--- a/migrate/changeset/databases/sqlite.py
+++ b/migrate/changeset/databases/sqlite.py
@@ -8,8 +8,10 @@ try: # Python 3
except ImportError: # Python 2
from UserDict import DictMixin
from copy import copy
+import re
from sqlalchemy.databases import sqlite as sa_base
+from sqlalchemy.schema import UniqueConstraint
from migrate import exceptions
from migrate.changeset import ansisql
@@ -27,7 +29,38 @@ class SQLiteCommon(object):
class SQLiteHelper(SQLiteCommon):
- def recreate_table(self,table,column=None,delta=None):
+ def _get_unique_constraints(self, table):
+ """Retrieve information about existing unique constraints of the table
+
+ This feature is needed for recreate_table() to work properly.
+ """
+
+ data = table.metadata.bind.execute(
+ """SELECT sql
+ FROM sqlite_master
+ WHERE
+ type='table' AND
+ name=:table_name""",
+ table_name=table.name
+ ).fetchone()[0]
+
+ UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)"
+ constraints = []
+ for name, cols in re.findall(UNIQUE_PATTERN, data):
+ # Filter out any columns that were dropped from the table.
+ columns = []
+ for c in cols.split(","):
+ if c in table.columns:
+ # There was a bug in reflection of SQLite columns with
+ # reserved identifiers as names (SQLite can return them
+ # wrapped with double quotes), so strip double quotes.
+ columns.extend(c.strip(' "'))
+ if columns:
+ constraints.extend(UniqueConstraint(*columns, name=name))
+ return constraints
+
+ def recreate_table(self, table, column=None, delta=None,
+ omit_uniques=None):
table_name = self.preparer.format_table(table)
# we remove all indexes so as not to have
@@ -35,6 +68,15 @@ class SQLiteHelper(SQLiteCommon):
for index in table.indexes:
index.drop()
+ # reflect existing unique constraints
+ for uc in self._get_unique_constraints(table):
+ table.append_constraint(uc)
+ # omit given unique constraints when creating a new table if required
+ table.constraints = set([
+ cons for cons in table.constraints
+ if omit_uniques is None or cons.name not in omit_uniques
+ ])
+
self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name)
self.execute()
@@ -126,9 +168,12 @@ class SQLiteConstraintGenerator(ansisql.ANSIConstraintGenerator, SQLiteHelper, S
class SQLiteConstraintDropper(ansisql.ANSIColumnDropper,
- SQLiteCommon,
+ SQLiteHelper,
ansisql.ANSIConstraintCommon):
+ def _modify_table(self, table, column, delta):
+ return 'INSERT INTO %(table_name)s SELECT * from migration_tmp'
+
def visit_migrate_primary_key_constraint(self, constraint):
tmpl = "DROP INDEX %s "
name = self.get_constraint_name(constraint)
@@ -143,7 +188,7 @@ class SQLiteConstraintDropper(ansisql.ANSIColumnDropper,
self._not_supported('ALTER TABLE DROP CONSTRAINT')
def visit_migrate_unique_constraint(self, *p, **k):
- self._not_supported('ALTER TABLE DROP CONSTRAINT')
+ self.recreate_table(p[0].table, omit_uniques=[p[0].name])
# TODO: technically primary key is a NOT NULL + UNIQUE constraint, should add NOT NULL to index
diff --git a/migrate/tests/changeset/test_constraint.py b/migrate/tests/changeset/test_constraint.py
index d27554e..6421206 100644
--- a/migrate/tests/changeset/test_constraint.py
+++ b/migrate/tests/changeset/test_constraint.py
@@ -274,7 +274,7 @@ class TestAutoname(CommonTestConstraint):
self.table.insert(values={'id': 2, 'fkey': 2}).execute()
self.table.insert(values={'id': 1, 'fkey': 3}).execute()
- @fixture.usedb(not_supported=['oracle', 'sqlite'])
+ @fixture.usedb(not_supported=['oracle'])
def test_autoname_unique(self):
"""UniqueConstraints can guess their name if None is given"""
cons = UniqueConstraint(self.table.c.fkey)