diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2022-08-24 17:00:25 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@ci3.zzzcomputing.com> | 2022-08-24 17:00:25 +0000 |
| commit | e475f8f4b136ff52a97e002e96b65880bc9f59bf (patch) | |
| tree | 08406c47e9ecde77ebf2a3bb02b22b5765526a1b /test/dialect | |
| parent | 27bf1c1c287debb69c4644bf6dc35e3bad5470ad (diff) | |
| parent | 08c96ff74c36b471724e7d161bdc594f21893b33 (diff) | |
| download | sqlalchemy-e475f8f4b136ff52a97e002e96b65880bc9f59bf.tar.gz | |
Merge "include mssql_clustered dialect_options when reflecting - issue #8288" into main
Diffstat (limited to 'test/dialect')
| -rw-r--r-- | test/dialect/mssql/test_reflection.py | 158 |
1 files changed, 157 insertions, 1 deletions
diff --git a/test/dialect/mssql/test_reflection.py b/test/dialect/mssql/test_reflection.py index cd8742b9c..f682538b3 100644 --- a/test/dialect/mssql/test_reflection.py +++ b/test/dialect/mssql/test_reflection.py @@ -571,10 +571,166 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): t2 = Table("t", MetaData(), autoload_with=connection) idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0] + self.assert_compile( + CreateIndex(idx), + "CREATE NONCLUSTERED INDEX idx_x ON t (x) WHERE ([x]='test')", + ) + + def test_index_reflection_clustered(self, metadata, connection): + """ + when the result of get_indexes() is used to build an index it should + include the CLUSTERED keyword when appropriate + """ + t1 = Table( + "t", + metadata, + Column("id", Integer), + Column("x", types.String(20)), + Column("y", types.Integer), + ) + Index("idx_x", t1.c.x, mssql_clustered=True) + Index("idx_y", t1.c.y) + metadata.create_all(connection) + ind = testing.db.dialect.get_indexes(connection, "t", None) + + clustered_index = "" + for ix in ind: + if ix["dialect_options"]["mssql_clustered"]: + clustered_index = ix["name"] + + eq_(clustered_index, "idx_x") + + t2 = Table("t", MetaData(), autoload_with=connection) + idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0] + + self.assert_compile( + CreateIndex(idx), "CREATE CLUSTERED INDEX idx_x ON t (x)" + ) + + def test_index_reflection_filtered_and_clustered( + self, metadata, connection + ): + """ + table with one filtered index and one clustered index so each index + will have different dialect_options keys + """ + t1 = Table( + "t", + metadata, + Column("id", Integer), + Column("x", types.String(20)), + Column("y", types.Integer), + ) + Index("idx_x", t1.c.x, mssql_clustered=True) + Index("idx_y", t1.c.y, mssql_where=t1.c.y >= 5) + metadata.create_all(connection) + ind = testing.db.dialect.get_indexes(connection, "t", None) + + clustered_index = "" + for ix in ind: + if ix["dialect_options"]["mssql_clustered"]: + clustered_index = ix["name"] + + eq_(clustered_index, "idx_x") + + filtered_indexes = [] + for ix in ind: + if "dialect_options" in ix: + if "mssql_where" in ix["dialect_options"]: + filtered_indexes.append( + ix["dialect_options"]["mssql_where"] + ) + + eq_(sorted(filtered_indexes), ["([y]>=(5))"]) + + t2 = Table("t", MetaData(), autoload_with=connection) + clustered_idx = list( + sorted(t2.indexes, key=lambda clustered_idx: clustered_idx.name) + )[0] + filtered_idx = list( + sorted(t2.indexes, key=lambda filtered_idx: filtered_idx.name) + )[1] + + self.assert_compile( + CreateIndex(clustered_idx), "CREATE CLUSTERED INDEX idx_x ON t (x)" + ) self.assert_compile( - CreateIndex(idx), "CREATE INDEX idx_x ON t (x) WHERE ([x]='test')" + CreateIndex(filtered_idx), + "CREATE NONCLUSTERED INDEX idx_y ON t (y) WHERE ([y]>=(5))", + ) + + def test_index_reflection_nonclustered(self, metadata, connection): + """ + one index created by specifying mssql_clustered=False + one created without specifying mssql_clustered property so it will + use default of NONCLUSTERED. + When reflected back mssql_clustered=False should be included in both + """ + t1 = Table( + "t", + metadata, + Column("id", Integer), + Column("x", types.String(20)), + Column("y", types.Integer), + ) + Index("idx_x", t1.c.x, mssql_clustered=False) + Index("idx_y", t1.c.y) + metadata.create_all(connection) + ind = testing.db.dialect.get_indexes(connection, "t", None) + + for ix in ind: + assert ix["dialect_options"]["mssql_clustered"] == False + + t2 = Table("t", MetaData(), autoload_with=connection) + idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0] + + self.assert_compile( + CreateIndex(idx), "CREATE NONCLUSTERED INDEX idx_x ON t (x)" + ) + + def test_primary_key_reflection_clustered(self, metadata, connection): + """ + A primary key will be clustered by default if no other clustered index + exists. + When reflected back, mssql_clustered=True should be present. + """ + t1 = Table( + "t", + metadata, + Column("id", Integer), + Column("x", types.String(20)), + Column("y", types.Integer), ) + PrimaryKeyConstraint(t1.c.id, name="pk_t") + + metadata.create_all(connection) + pk_reflect = testing.db.dialect.get_pk_constraint( + connection, "t", None + ) + + assert pk_reflect["dialect_options"]["mssql_clustered"] == True + + def test_primary_key_reflection_nonclustered(self, metadata, connection): + """ + Nonclustered primary key should include mssql_clustered=False + when reflected back + """ + t1 = Table( + "t", + metadata, + Column("id", Integer), + Column("x", types.String(20)), + Column("y", types.Integer), + ) + PrimaryKeyConstraint(t1.c.id, name="pk_t", mssql_clustered=False) + + metadata.create_all(connection) + pk_reflect = testing.db.dialect.get_pk_constraint( + connection, "t", None + ) + + assert pk_reflect["dialect_options"]["mssql_clustered"] == False def test_max_ident_in_varchar_not_present(self, metadata, connection): """test [ticket:3504]. |
