summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES6
-rw-r--r--lib/sqlalchemy/dialects/mssql/base.py61
-rw-r--r--lib/sqlalchemy/dialects/mssql/information_schema.py2
-rw-r--r--test/dialect/test_mssql.py76
4 files changed, 113 insertions, 32 deletions
diff --git a/CHANGES b/CHANGES
index 6df2e8006..9eab740d6 100644
--- a/CHANGES
+++ b/CHANGES
@@ -260,6 +260,12 @@ CHANGES
- Fixed bug where aliasing of tables with "schema" would
fail to compile properly. [ticket:1943]
+
+ - Rewrote the reflection of indexes to use sys.
+ catalogs, so that column names of any configuration
+ (spaces, embedded commas, etc.) can be reflected.
+ Note that reflection of indexes requires SQL
+ Server 2005 or greater. [ticket:1770]
- informix
- *Major* cleanup / modernization of the Informix
diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py
index 222737dbe..5c3b72647 100644
--- a/lib/sqlalchemy/dialects/mssql/base.py
+++ b/lib/sqlalchemy/dialects/mssql/base.py
@@ -114,6 +114,8 @@ Known Issues
------------
* No support for more than one ``IDENTITY`` column per table
+* reflection of indexes does not work with versions older than
+ SQL Server 2005
"""
import datetime, decimal, inspect, operator, sys, re
@@ -1124,26 +1126,55 @@ class MSDialect(default.DefaultDialect):
view_names = [r[0] for r in connection.execute(s)]
return view_names
- # The cursor reports it is closed after executing the sp.
@reflection.cache
def get_indexes(self, connection, tablename, schema=None, **kw):
+ # using system catalogs, don't support index reflection
+ # below MS 2005
+ if self.server_version_info < MS_2005_VERSION:
+ return []
+
current_schema = schema or self.default_schema_name
- col_finder = re.compile("(\w+)")
full_tname = "%s.%s" % (current_schema, tablename)
- indexes = []
- s = sql.text("exec sp_helpindex '%s'" % full_tname)
- rp = connection.execute(s)
- if rp.closed:
- # did not work for this setup.
- return []
+
+ rp = connection.execute(
+ sql.text("select ind.index_id, ind.is_unique, ind.name "
+ "from sys.indexes as ind join sys.tables as tab on "
+ "ind.object_id=tab.object_id "
+ "join sys.schemas as sch on sch.schema_id=tab.schema_id "
+ "where tab.name = :tabname "
+ "and sch.name=:schname "
+ "and ind.is_primary_key=0",
+ bindparams=[
+ sql.bindparam('tabname', tablename, sqltypes.Unicode),
+ sql.bindparam('schname', current_schema, sqltypes.Unicode)
+ ]
+ )
+ )
+ indexes = {}
for row in rp:
- if 'primary key' not in row['index_description']:
- indexes.append({
- 'name' : row['index_name'],
- 'column_names' : col_finder.findall(row['index_keys']),
- 'unique': 'unique' in row['index_description']
- })
- return indexes
+ indexes[row['index_id']] = {
+ 'name':row['name'],
+ 'unique':row['is_unique'] == 1,
+ 'column_names':[]
+ }
+ rp = connection.execute(
+ sql.text("select ind_col.index_id, col.name from sys.columns as col "
+ "join sys.index_columns as ind_col on "
+ "ind_col.column_id=col.column_id "
+ "join sys.tables as tab on tab.object_id=col.object_id "
+ "join sys.schemas as sch on sch.schema_id=tab.schema_id "
+ "where tab.name=:tabname "
+ "and sch.name=:schname",
+ bindparams=[
+ sql.bindparam('tabname', tablename, sqltypes.Unicode),
+ sql.bindparam('schname', current_schema, sqltypes.Unicode)
+ ]),
+ )
+ for row in rp:
+ if row['index_id'] in indexes:
+ indexes[row['index_id']]['column_names'].append(row['name'])
+
+ return indexes.values()
@reflection.cache
def get_view_definition(self, connection, viewname, schema=None, **kw):
diff --git a/lib/sqlalchemy/dialects/mssql/information_schema.py b/lib/sqlalchemy/dialects/mssql/information_schema.py
index cd1606dbf..4dd6436cd 100644
--- a/lib/sqlalchemy/dialects/mssql/information_schema.py
+++ b/lib/sqlalchemy/dialects/mssql/information_schema.py
@@ -1,3 +1,5 @@
+# TODO: should be using the sys. catalog with SQL Server, not information schema
+
from sqlalchemy import Table, MetaData, Column, ForeignKey
from sqlalchemy.types import String, Unicode, Integer, TypeDecorator
diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py
index e840f9c0c..f9912317c 100644
--- a/test/dialect/test_mssql.py
+++ b/test/dialect/test_mssql.py
@@ -439,37 +439,79 @@ class ReflectionTest(TestBase, ComparesTables):
finally:
meta.drop_all()
+ @testing.provide_metadata
def test_identity(self):
- meta = MetaData(testing.db)
table = Table(
- 'identity_test', meta,
+ 'identity_test', metadata,
Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True)
)
table.create()
meta2 = MetaData(testing.db)
- try:
- table2 = Table('identity_test', meta2, autoload=True)
- sequence = isinstance(table2.c['col1'].default, schema.Sequence) \
- and table2.c['col1'].default
- assert sequence.start == 2
- assert sequence.increment == 3
- finally:
- table.drop()
+ table2 = Table('identity_test', meta2, autoload=True)
+ sequence = isinstance(table2.c['col1'].default, schema.Sequence) \
+ and table2.c['col1'].default
+ assert sequence.start == 2
+ assert sequence.increment == 3
@testing.emits_warning("Did not recognize")
+ @testing.provide_metadata
def test_skip_types(self):
- meta = MetaData(testing.db)
testing.db.execute("""
create table foo (id integer primary key, data xml)
""")
- try:
- t1 = Table('foo', meta, autoload=True)
- assert isinstance(t1.c.id.type, Integer)
- assert isinstance(t1.c.data.type, types.NullType)
- finally:
- testing.db.execute("drop table foo")
+ t1 = Table('foo', metadata, autoload=True)
+ assert isinstance(t1.c.id.type, Integer)
+ assert isinstance(t1.c.data.type, types.NullType)
+
+ @testing.provide_metadata
+ def test_indexes_cols(self):
+
+ t1 = Table('t', metadata, Column('x', Integer), Column('y', Integer))
+ Index('foo', t1.c.x, t1.c.y)
+ metadata.create_all()
+
+ m2 = MetaData()
+ t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
+
+ eq_(
+ set(list(t2.indexes)[0].columns),
+ set([t2.c['x'], t2.c.y])
+ )
+
+ @testing.provide_metadata
+ def test_indexes_cols_with_commas(self):
+
+ t1 = Table('t', metadata,
+ Column('x, col', Integer, key='x'),
+ Column('y', Integer)
+ )
+ Index('foo', t1.c.x, t1.c.y)
+ metadata.create_all()
+
+ m2 = MetaData()
+ t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
+
+ eq_(
+ set(list(t2.indexes)[0].columns),
+ set([t2.c['x, col'], t2.c.y])
+ )
+
+ @testing.provide_metadata
+ def test_indexes_cols_with_spaces(self):
+
+ t1 = Table('t', metadata, Column('x col', Integer, key='x'),
+ Column('y', Integer))
+ Index('foo', t1.c.x, t1.c.y)
+ metadata.create_all()
+
+ m2 = MetaData()
+ t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
+ eq_(
+ set(list(t2.indexes)[0].columns),
+ set([t2.c['x col'], t2.c.y])
+ )
class QueryUnicodeTest(TestBase):