diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-11-15 16:58:50 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-12-11 13:26:05 -0500 |
| commit | ba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41 (patch) | |
| tree | 038f2263d581d5e49d74731af68febc4bf64eb19 /test/dialect/mssql | |
| parent | 87d58b6d8188ccff808b3207d5f9398bb9adf9b9 (diff) | |
| download | sqlalchemy-ba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41.tar.gz | |
correct for "autocommit" deprecation warning
Ensure no autocommit warnings occur internally or
within tests.
Also includes fixes for SQL Server full text tests
which apparently have not been working at all for a long
time, as it used long removed APIs. CI has not had
fulltext running for some years and is now installed.
Change-Id: Id806e1856c9da9f0a9eac88cebc7a94ecc95eb96
Diffstat (limited to 'test/dialect/mssql')
| -rw-r--r-- | test/dialect/mssql/test_engine.py | 2 | ||||
| -rw-r--r-- | test/dialect/mssql/test_query.py | 336 | ||||
| -rw-r--r-- | test/dialect/mssql/test_reflection.py | 17 | ||||
| -rw-r--r-- | test/dialect/mssql/test_types.py | 182 |
4 files changed, 276 insertions, 261 deletions
diff --git a/test/dialect/mssql/test_engine.py b/test/dialect/mssql/test_engine.py index 444455958..668df6ecb 100644 --- a/test/dialect/mssql/test_engine.py +++ b/test/dialect/mssql/test_engine.py @@ -382,7 +382,7 @@ class FastExecutemanyTest(fixtures.TestBase): if executemany: assert cursor.fast_executemany - with eng.connect() as conn: + with eng.begin() as conn: conn.execute( t.insert(), [{"id": i, "data": "data_%d" % i} for i in range(100)], diff --git a/test/dialect/mssql/test_query.py b/test/dialect/mssql/test_query.py index d9dc033e1..ea0bfa4d2 100644 --- a/test/dialect/mssql/test_query.py +++ b/test/dialect/mssql/test_query.py @@ -9,7 +9,6 @@ from sqlalchemy import func from sqlalchemy import Identity from sqlalchemy import Integer from sqlalchemy import literal -from sqlalchemy import MetaData from sqlalchemy import or_ from sqlalchemy import PrimaryKeyConstraint from sqlalchemy import select @@ -26,22 +25,15 @@ from sqlalchemy.testing.assertsql import CursorSQL from sqlalchemy.testing.assertsql import DialectSQL from sqlalchemy.util import ue -metadata = None -cattable = None -matchtable = None - -class IdentityInsertTest(fixtures.TestBase, AssertsCompiledSQL): +class IdentityInsertTest(fixtures.TablesTest, AssertsCompiledSQL): __only_on__ = "mssql" __dialect__ = mssql.MSDialect() __backend__ = True @classmethod - def setup_class(cls): - global metadata, cattable - metadata = MetaData(testing.db) - - cattable = Table( + def define_tables(cls, metadata): + Table( "cattable", metadata, Column("id", Integer), @@ -49,82 +41,82 @@ class IdentityInsertTest(fixtures.TestBase, AssertsCompiledSQL): PrimaryKeyConstraint("id", name="PK_cattable"), ) - def setup(self): - metadata.create_all() - - def teardown(self): - metadata.drop_all() - def test_compiled(self): + cattable = self.tables.cattable self.assert_compile( cattable.insert().values(id=9, description="Python"), "INSERT INTO cattable (id, description) " "VALUES (:id, :description)", ) - def test_execute(self): - with testing.db.connect() as conn: - conn.execute(cattable.insert().values(id=9, description="Python")) - - cats = conn.execute(cattable.select().order_by(cattable.c.id)) - eq_([(9, "Python")], list(cats)) + def test_execute(self, connection): + conn = connection + cattable = self.tables.cattable + conn.execute(cattable.insert().values(id=9, description="Python")) - result = conn.execute(cattable.insert().values(description="PHP")) - eq_(result.inserted_primary_key, (10,)) - lastcat = conn.execute( - cattable.select().order_by(desc(cattable.c.id)) - ) - eq_((10, "PHP"), lastcat.first()) - - def test_executemany(self): - with testing.db.connect() as conn: - conn.execute( - cattable.insert(), - [ - {"id": 89, "description": "Python"}, - {"id": 8, "description": "Ruby"}, - {"id": 3, "description": "Perl"}, - {"id": 1, "description": "Java"}, - ], - ) - cats = conn.execute(cattable.select().order_by(cattable.c.id)) - eq_( - [(1, "Java"), (3, "Perl"), (8, "Ruby"), (89, "Python")], - list(cats), - ) - conn.execute( - cattable.insert(), - [{"description": "PHP"}, {"description": "Smalltalk"}], - ) - lastcats = conn.execute( - cattable.select().order_by(desc(cattable.c.id)).limit(2) - ) - eq_([(91, "Smalltalk"), (90, "PHP")], list(lastcats)) + cats = conn.execute(cattable.select().order_by(cattable.c.id)) + eq_([(9, "Python")], list(cats)) - def test_insert_plain_param(self): - with testing.db.connect() as conn: - conn.execute(cattable.insert(), id=5) - eq_(conn.scalar(select(cattable.c.id)), 5) + result = conn.execute(cattable.insert().values(description="PHP")) + eq_(result.inserted_primary_key, (10,)) + lastcat = conn.execute(cattable.select().order_by(desc(cattable.c.id))) + eq_((10, "PHP"), lastcat.first()) - def test_insert_values_key_plain(self): - with testing.db.connect() as conn: - conn.execute(cattable.insert().values(id=5)) - eq_(conn.scalar(select(cattable.c.id)), 5) - - def test_insert_values_key_expression(self): - with testing.db.connect() as conn: - conn.execute(cattable.insert().values(id=literal(5))) - eq_(conn.scalar(select(cattable.c.id)), 5) - - def test_insert_values_col_plain(self): - with testing.db.connect() as conn: - conn.execute(cattable.insert().values({cattable.c.id: 5})) - eq_(conn.scalar(select(cattable.c.id)), 5) - - def test_insert_values_col_expression(self): - with testing.db.connect() as conn: - conn.execute(cattable.insert().values({cattable.c.id: literal(5)})) - eq_(conn.scalar(select(cattable.c.id)), 5) + def test_executemany(self, connection): + conn = connection + cattable = self.tables.cattable + conn.execute( + cattable.insert(), + [ + {"id": 89, "description": "Python"}, + {"id": 8, "description": "Ruby"}, + {"id": 3, "description": "Perl"}, + {"id": 1, "description": "Java"}, + ], + ) + cats = conn.execute(cattable.select().order_by(cattable.c.id)) + eq_( + [(1, "Java"), (3, "Perl"), (8, "Ruby"), (89, "Python")], + list(cats), + ) + conn.execute( + cattable.insert(), + [{"description": "PHP"}, {"description": "Smalltalk"}], + ) + lastcats = conn.execute( + cattable.select().order_by(desc(cattable.c.id)).limit(2) + ) + eq_([(91, "Smalltalk"), (90, "PHP")], list(lastcats)) + + def test_insert_plain_param(self, connection): + conn = connection + cattable = self.tables.cattable + conn.execute(cattable.insert(), id=5) + eq_(conn.scalar(select(cattable.c.id)), 5) + + def test_insert_values_key_plain(self, connection): + conn = connection + cattable = self.tables.cattable + conn.execute(cattable.insert().values(id=5)) + eq_(conn.scalar(select(cattable.c.id)), 5) + + def test_insert_values_key_expression(self, connection): + conn = connection + cattable = self.tables.cattable + conn.execute(cattable.insert().values(id=literal(5))) + eq_(conn.scalar(select(cattable.c.id)), 5) + + def test_insert_values_col_plain(self, connection): + conn = connection + cattable = self.tables.cattable + conn.execute(cattable.insert().values({cattable.c.id: 5})) + eq_(conn.scalar(select(cattable.c.id)), 5) + + def test_insert_values_col_expression(self, connection): + conn = connection + cattable = self.tables.cattable + conn.execute(cattable.insert().values({cattable.c.id: literal(5)})) + eq_(conn.scalar(select(cattable.c.id)), 5) class QueryUnicodeTest(fixtures.TestBase): @@ -391,37 +383,35 @@ def full_text_search_missing(): """Test if full text search is not implemented and return False if it is and True otherwise.""" - try: - connection = testing.db.connect() - try: - connection.exec_driver_sql( - "CREATE FULLTEXT CATALOG Catalog AS " "DEFAULT" - ) - return False - except Exception: - return True - finally: - connection.close() + if not testing.against("mssql"): + return True + + with testing.db.connect() as conn: + result = conn.exec_driver_sql( + "SELECT cast(SERVERPROPERTY('IsFullTextInstalled') as integer)" + ) + return result.scalar() == 0 -class MatchTest(fixtures.TestBase, AssertsCompiledSQL): +class MatchTest(fixtures.TablesTest, AssertsCompiledSQL): __only_on__ = "mssql" __skip_if__ = (full_text_search_missing,) __backend__ = True + run_setup_tables = "once" + run_inserts = run_deletes = "once" + @classmethod - def setup_class(cls): - global metadata, cattable, matchtable - metadata = MetaData(testing.db) - cattable = Table( + def define_tables(cls, metadata): + Table( "cattable", metadata, Column("id", Integer), Column("description", String(50)), PrimaryKeyConstraint("id", name="PK_cattable"), ) - matchtable = Table( + Table( "matchtable", metadata, Column("id", Integer), @@ -429,24 +419,65 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): Column("category_id", Integer, ForeignKey("cattable.id")), PrimaryKeyConstraint("id", name="PK_matchtable"), ) - DDL( - """CREATE FULLTEXT INDEX + + event.listen( + metadata, + "before_create", + DDL("CREATE FULLTEXT CATALOG Catalog AS DEFAULT"), + ) + event.listen( + metadata, + "after_create", + DDL( + """CREATE FULLTEXT INDEX ON cattable (description) KEY INDEX PK_cattable""" - ).execute_at("after-create", matchtable) - DDL( - """CREATE FULLTEXT INDEX + ), + ) + event.listen( + metadata, + "after_create", + DDL( + """CREATE FULLTEXT INDEX ON matchtable (title) KEY INDEX PK_matchtable""" - ).execute_at("after-create", matchtable) - metadata.create_all() - cattable.insert().execute( + ), + ) + + event.listen( + metadata, + "after_drop", + DDL("DROP FULLTEXT CATALOG Catalog"), + ) + + @classmethod + def setup_bind(cls): + return testing.db.execution_options(isolation_level="AUTOCOMMIT") + + @classmethod + def setup_class(cls): + with testing.db.connect().execution_options( + isolation_level="AUTOCOMMIT" + ) as conn: + try: + conn.exec_driver_sql("DROP FULLTEXT CATALOG Catalog") + except: + pass + super(MatchTest, cls).setup_class() + + @classmethod + def insert_data(cls, connection): + cattable, matchtable = cls.tables("cattable", "matchtable") + + connection.execute( + cattable.insert(), [ {"id": 1, "description": "Python"}, {"id": 2, "description": "Ruby"}, - ] + ], ) - matchtable.insert().execute( + connection.execute( + matchtable.insert(), [ { "id": 1, @@ -461,62 +492,53 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): }, {"id": 4, "title": "Guide to Django", "category_id": 1}, {"id": 5, "title": "Python in a Nutshell", "category_id": 1}, - ] + ], ) - DDL("WAITFOR DELAY '00:00:05'").execute(bind=engines.testing_engine()) - - @classmethod - def teardown_class(cls): - metadata.drop_all() - connection = testing.db.connect() - connection.exec_driver_sql("DROP FULLTEXT CATALOG Catalog") - connection.close() + # apparently this is needed! index must run asynchronously + connection.execute(DDL("WAITFOR DELAY '00:00:05'")) def test_expression(self): + matchtable = self.tables.matchtable self.assert_compile( matchtable.c.title.match("somstr"), "CONTAINS (matchtable.title, ?)", ) - def test_simple_match(self): - results = ( + def test_simple_match(self, connection): + matchtable = self.tables.matchtable + results = connection.execute( matchtable.select() .where(matchtable.c.title.match("python")) .order_by(matchtable.c.id) - .execute() - .fetchall() - ) + ).fetchall() eq_([2, 5], [r.id for r in results]) - def test_simple_match_with_apostrophe(self): - results = ( - matchtable.select() - .where(matchtable.c.title.match("Matz's")) - .execute() - .fetchall() - ) + def test_simple_match_with_apostrophe(self, connection): + matchtable = self.tables.matchtable + results = connection.execute( + matchtable.select().where(matchtable.c.title.match("Matz's")) + ).fetchall() eq_([3], [r.id for r in results]) - def test_simple_prefix_match(self): - results = ( - matchtable.select() - .where(matchtable.c.title.match('"nut*"')) - .execute() - .fetchall() - ) + def test_simple_prefix_match(self, connection): + matchtable = self.tables.matchtable + results = connection.execute( + matchtable.select().where(matchtable.c.title.match('"nut*"')) + ).fetchall() eq_([5], [r.id for r in results]) - def test_simple_inflectional_match(self): - results = ( - matchtable.select() - .where(matchtable.c.title.match('FORMSOF(INFLECTIONAL, "dives")')) - .execute() - .fetchall() - ) + def test_simple_inflectional_match(self, connection): + matchtable = self.tables.matchtable + results = connection.execute( + matchtable.select().where( + matchtable.c.title.match('FORMSOF(INFLECTIONAL, "dives")') + ) + ).fetchall() eq_([2], [r.id for r in results]) - def test_or_match(self): - results1 = ( + def test_or_match(self, connection): + matchtable = self.tables.matchtable + results1 = connection.execute( matchtable.select() .where( or_( @@ -525,31 +547,25 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): ) ) .order_by(matchtable.c.id) - .execute() - .fetchall() - ) + ).fetchall() eq_([3, 5], [r.id for r in results1]) - results2 = ( + results2 = connection.execute( matchtable.select() .where(matchtable.c.title.match("nutshell OR ruby")) .order_by(matchtable.c.id) - .execute() - .fetchall() - ) + ).fetchall() eq_([3, 5], [r.id for r in results2]) - def test_and_match(self): - results1 = ( - matchtable.select() - .where( + def test_and_match(self, connection): + matchtable = self.tables.matchtable + results1 = connection.execute( + matchtable.select().where( and_( matchtable.c.title.match("python"), matchtable.c.title.match("nutshell"), ) ) - .execute() - .fetchall() - ) + ).fetchall() eq_([5], [r.id for r in results1]) results2 = ( matchtable.select() @@ -559,8 +575,10 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): ) eq_([5], [r.id for r in results2]) - def test_match_across_joins(self): - results = ( + def test_match_across_joins(self, connection): + matchtable = self.tables.matchtable + cattable = self.tables.cattable + results = connection.execute( matchtable.select() .where( and_( @@ -572,7 +590,5 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL): ) ) .order_by(matchtable.c.id) - .execute() - .fetchall() - ) + ).fetchall() eq_([1, 3, 5], [r.id for r in results]) diff --git a/test/dialect/mssql/test_reflection.py b/test/dialect/mssql/test_reflection.py index 6009bfb6c..86c97316a 100644 --- a/test/dialect/mssql/test_reflection.py +++ b/test/dialect/mssql/test_reflection.py @@ -741,14 +741,9 @@ class IdentityReflectionTest(fixtures.TablesTest): @testing.requires.views def test_reflect_views(self, connection): - try: - with testing.db.connect() as conn: - conn.exec_driver_sql("CREATE VIEW view1 AS SELECT * FROM t1") - insp = inspect(testing.db) - for col in insp.get_columns("view1"): - is_true("dialect_options" not in col) - is_true("identity" in col) - eq_(col["identity"], {}) - finally: - with testing.db.connect() as conn: - conn.exec_driver_sql("DROP VIEW view1") + connection.exec_driver_sql("CREATE VIEW view1 AS SELECT * FROM t1") + insp = inspect(connection) + for col in insp.get_columns("view1"): + is_true("dialect_options" not in col) + is_true("identity" in col) + eq_(col["identity"], {}) diff --git a/test/dialect/mssql/test_types.py b/test/dialect/mssql/test_types.py index 11a2a25b3..a4a3bedda 100644 --- a/test/dialect/mssql/test_types.py +++ b/test/dialect/mssql/test_types.py @@ -221,7 +221,7 @@ class RowVersionTest(fixtures.TablesTest): Column("rv", cls(convert_int=convert_int)), ) - with testing.db.connect() as conn: + with testing.db.begin() as conn: conn.execute(t.insert().values(data="foo")) last_ts_1 = conn.exec_driver_sql("SELECT @@DBTS").scalar() @@ -545,7 +545,7 @@ class TypeRoundTripTest( __backend__ = True @testing.provide_metadata - def test_decimal_notation(self): + def test_decimal_notation(self, connection): metadata = self.metadata numeric_table = Table( "numeric_table", @@ -560,7 +560,7 @@ class TypeRoundTripTest( "numericcol", Numeric(precision=38, scale=20, asdecimal=True) ), ) - metadata.create_all() + metadata.create_all(connection) test_items = [ decimal.Decimal(d) for d in ( @@ -623,21 +623,20 @@ class TypeRoundTripTest( ) ] - with testing.db.connect() as conn: - for value in test_items: - result = conn.execute( - numeric_table.insert(), dict(numericcol=value) - ) - primary_key = result.inserted_primary_key - returned = conn.scalar( - select(numeric_table.c.numericcol).where( - numeric_table.c.id == primary_key[0] - ) + for value in test_items: + result = connection.execute( + numeric_table.insert(), dict(numericcol=value) + ) + primary_key = result.inserted_primary_key + returned = connection.scalar( + select(numeric_table.c.numericcol).where( + numeric_table.c.id == primary_key[0] ) - eq_(value, returned) + ) + eq_(value, returned) @testing.provide_metadata - def test_float(self): + def test_float(self, connection): metadata = self.metadata float_table = Table( @@ -652,41 +651,47 @@ class TypeRoundTripTest( Column("floatcol", Float()), ) - metadata.create_all() - try: - test_items = [ - float(d) - for d in ( - "1500000.00000000000000000000", - "-1500000.00000000000000000000", - "1500000", - "0.0000000000000000002", - "0.2", - "-0.0000000000000000002", - "156666.458923543", - "-156666.458923543", - "1", - "-1", - "1234", - "2E-12", - "4E8", - "3E-6", - "3E-7", - "4.1", - "1E-1", - "1E-2", - "1E-3", - "1E-4", - "1E-5", - "1E-6", - "1E-7", - "1E-8", + metadata.create_all(connection) + test_items = [ + float(d) + for d in ( + "1500000.00000000000000000000", + "-1500000.00000000000000000000", + "1500000", + "0.0000000000000000002", + "0.2", + "-0.0000000000000000002", + "156666.458923543", + "-156666.458923543", + "1", + "-1", + "1234", + "2E-12", + "4E8", + "3E-6", + "3E-7", + "4.1", + "1E-1", + "1E-2", + "1E-3", + "1E-4", + "1E-5", + "1E-6", + "1E-7", + "1E-8", + ) + ] + for value in test_items: + result = connection.execute( + float_table.insert(), dict(floatcol=value) + ) + primary_key = result.inserted_primary_key + returned = connection.scalar( + select(float_table.c.floatcol).where( + float_table.c.id == primary_key[0] ) - ] - for value in test_items: - float_table.insert().execute(floatcol=value) - except Exception as e: - raise e + ) + eq_(value, returned) # todo this should suppress warnings, but it does not @emits_warning_on("mssql+mxodbc", r".*does not have any indexes.*") @@ -770,18 +775,17 @@ class TypeRoundTripTest( d2 = datetime.datetime(2007, 10, 30, 11, 2, 32) return t, (d1, t1, d2) - def test_date_roundtrips(self, date_fixture): + def test_date_roundtrips(self, date_fixture, connection): t, (d1, t1, d2) = date_fixture - with testing.db.begin() as conn: - conn.execute( - t.insert(), adate=d1, adatetime=d2, atime1=t1, atime2=d2 - ) + connection.execute( + t.insert(), adate=d1, adatetime=d2, atime1=t1, atime2=d2 + ) - row = conn.execute(t.select()).first() - eq_( - (row.adate, row.adatetime, row.atime1, row.atime2), - (d1, d2, t1, d2.time()), - ) + row = connection.execute(t.select()).first() + eq_( + (row.adate, row.adatetime, row.atime1, row.atime2), + (d1, d2, t1, d2.time()), + ) @testing.metadata_fixture() def datetimeoffset_fixture(self, metadata): @@ -870,45 +874,45 @@ class TypeRoundTripTest( dto_param_value, expected_offset_hours, should_fail, + connection, ): t = datetimeoffset_fixture dto_param_value = dto_param_value() - with testing.db.begin() as conn: - if should_fail: - assert_raises( - sa.exc.DBAPIError, - conn.execute, - t.insert(), - adatetimeoffset=dto_param_value, - ) - return - - conn.execute( + if should_fail: + assert_raises( + sa.exc.DBAPIError, + connection.execute, t.insert(), adatetimeoffset=dto_param_value, ) + return - row = conn.execute(t.select()).first() + connection.execute( + t.insert(), + adatetimeoffset=dto_param_value, + ) - if dto_param_value is None: - is_(row.adatetimeoffset, None) - else: - eq_( - row.adatetimeoffset, - datetime.datetime( - 2007, - 10, - 30, - 11, - 2, - 32, - 123456, - util.timezone( - datetime.timedelta(hours=expected_offset_hours) - ), + row = connection.execute(t.select()).first() + + if dto_param_value is None: + is_(row.adatetimeoffset, None) + else: + eq_( + row.adatetimeoffset, + datetime.datetime( + 2007, + 10, + 30, + 11, + 2, + 32, + 123456, + util.timezone( + datetime.timedelta(hours=expected_offset_hours) ), - ) + ), + ) @emits_warning_on("mssql+mxodbc", r".*does not have any indexes.*") @testing.provide_metadata @@ -1173,7 +1177,7 @@ class BinaryTest(fixtures.TestBase): if expected is None: expected = data - with engine.connect() as conn: + with engine.begin() as conn: conn.execute(binary_table.insert(), data=data) eq_(conn.scalar(select(binary_table.c.data)), expected) |
