diff options
| -rw-r--r-- | CHANGES | 6 | ||||
| -rw-r--r-- | lib/sqlalchemy/dialects/mssql/base.py | 61 | ||||
| -rw-r--r-- | lib/sqlalchemy/dialects/mssql/information_schema.py | 2 | ||||
| -rw-r--r-- | test/dialect/test_mssql.py | 76 |
4 files changed, 113 insertions, 32 deletions
@@ -260,6 +260,12 @@ CHANGES - Fixed bug where aliasing of tables with "schema" would fail to compile properly. [ticket:1943] + + - Rewrote the reflection of indexes to use sys. + catalogs, so that column names of any configuration + (spaces, embedded commas, etc.) can be reflected. + Note that reflection of indexes requires SQL + Server 2005 or greater. [ticket:1770] - informix - *Major* cleanup / modernization of the Informix diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index 222737dbe..5c3b72647 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -114,6 +114,8 @@ Known Issues ------------ * No support for more than one ``IDENTITY`` column per table +* reflection of indexes does not work with versions older than + SQL Server 2005 """ import datetime, decimal, inspect, operator, sys, re @@ -1124,26 +1126,55 @@ class MSDialect(default.DefaultDialect): view_names = [r[0] for r in connection.execute(s)] return view_names - # The cursor reports it is closed after executing the sp. @reflection.cache def get_indexes(self, connection, tablename, schema=None, **kw): + # using system catalogs, don't support index reflection + # below MS 2005 + if self.server_version_info < MS_2005_VERSION: + return [] + current_schema = schema or self.default_schema_name - col_finder = re.compile("(\w+)") full_tname = "%s.%s" % (current_schema, tablename) - indexes = [] - s = sql.text("exec sp_helpindex '%s'" % full_tname) - rp = connection.execute(s) - if rp.closed: - # did not work for this setup. - return [] + + rp = connection.execute( + sql.text("select ind.index_id, ind.is_unique, ind.name " + "from sys.indexes as ind join sys.tables as tab on " + "ind.object_id=tab.object_id " + "join sys.schemas as sch on sch.schema_id=tab.schema_id " + "where tab.name = :tabname " + "and sch.name=:schname " + "and ind.is_primary_key=0", + bindparams=[ + sql.bindparam('tabname', tablename, sqltypes.Unicode), + sql.bindparam('schname', current_schema, sqltypes.Unicode) + ] + ) + ) + indexes = {} for row in rp: - if 'primary key' not in row['index_description']: - indexes.append({ - 'name' : row['index_name'], - 'column_names' : col_finder.findall(row['index_keys']), - 'unique': 'unique' in row['index_description'] - }) - return indexes + indexes[row['index_id']] = { + 'name':row['name'], + 'unique':row['is_unique'] == 1, + 'column_names':[] + } + rp = connection.execute( + sql.text("select ind_col.index_id, col.name from sys.columns as col " + "join sys.index_columns as ind_col on " + "ind_col.column_id=col.column_id " + "join sys.tables as tab on tab.object_id=col.object_id " + "join sys.schemas as sch on sch.schema_id=tab.schema_id " + "where tab.name=:tabname " + "and sch.name=:schname", + bindparams=[ + sql.bindparam('tabname', tablename, sqltypes.Unicode), + sql.bindparam('schname', current_schema, sqltypes.Unicode) + ]), + ) + for row in rp: + if row['index_id'] in indexes: + indexes[row['index_id']]['column_names'].append(row['name']) + + return indexes.values() @reflection.cache def get_view_definition(self, connection, viewname, schema=None, **kw): diff --git a/lib/sqlalchemy/dialects/mssql/information_schema.py b/lib/sqlalchemy/dialects/mssql/information_schema.py index cd1606dbf..4dd6436cd 100644 --- a/lib/sqlalchemy/dialects/mssql/information_schema.py +++ b/lib/sqlalchemy/dialects/mssql/information_schema.py @@ -1,3 +1,5 @@ +# TODO: should be using the sys. catalog with SQL Server, not information schema + from sqlalchemy import Table, MetaData, Column, ForeignKey from sqlalchemy.types import String, Unicode, Integer, TypeDecorator diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py index e840f9c0c..f9912317c 100644 --- a/test/dialect/test_mssql.py +++ b/test/dialect/test_mssql.py @@ -439,37 +439,79 @@ class ReflectionTest(TestBase, ComparesTables): finally: meta.drop_all() + @testing.provide_metadata def test_identity(self): - meta = MetaData(testing.db) table = Table( - 'identity_test', meta, + 'identity_test', metadata, Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True) ) table.create() meta2 = MetaData(testing.db) - try: - table2 = Table('identity_test', meta2, autoload=True) - sequence = isinstance(table2.c['col1'].default, schema.Sequence) \ - and table2.c['col1'].default - assert sequence.start == 2 - assert sequence.increment == 3 - finally: - table.drop() + table2 = Table('identity_test', meta2, autoload=True) + sequence = isinstance(table2.c['col1'].default, schema.Sequence) \ + and table2.c['col1'].default + assert sequence.start == 2 + assert sequence.increment == 3 @testing.emits_warning("Did not recognize") + @testing.provide_metadata def test_skip_types(self): - meta = MetaData(testing.db) testing.db.execute(""" create table foo (id integer primary key, data xml) """) - try: - t1 = Table('foo', meta, autoload=True) - assert isinstance(t1.c.id.type, Integer) - assert isinstance(t1.c.data.type, types.NullType) - finally: - testing.db.execute("drop table foo") + t1 = Table('foo', metadata, autoload=True) + assert isinstance(t1.c.id.type, Integer) + assert isinstance(t1.c.data.type, types.NullType) + + @testing.provide_metadata + def test_indexes_cols(self): + + t1 = Table('t', metadata, Column('x', Integer), Column('y', Integer)) + Index('foo', t1.c.x, t1.c.y) + metadata.create_all() + + m2 = MetaData() + t2 = Table('t', m2, autoload=True, autoload_with=testing.db) + + eq_( + set(list(t2.indexes)[0].columns), + set([t2.c['x'], t2.c.y]) + ) + + @testing.provide_metadata + def test_indexes_cols_with_commas(self): + + t1 = Table('t', metadata, + Column('x, col', Integer, key='x'), + Column('y', Integer) + ) + Index('foo', t1.c.x, t1.c.y) + metadata.create_all() + + m2 = MetaData() + t2 = Table('t', m2, autoload=True, autoload_with=testing.db) + + eq_( + set(list(t2.indexes)[0].columns), + set([t2.c['x, col'], t2.c.y]) + ) + + @testing.provide_metadata + def test_indexes_cols_with_spaces(self): + + t1 = Table('t', metadata, Column('x col', Integer, key='x'), + Column('y', Integer)) + Index('foo', t1.c.x, t1.c.y) + metadata.create_all() + + m2 = MetaData() + t2 = Table('t', m2, autoload=True, autoload_with=testing.db) + eq_( + set(list(t2.indexes)[0].columns), + set([t2.c['x col'], t2.c.y]) + ) class QueryUnicodeTest(TestBase): |
