diff options
Diffstat (limited to 'test/engine')
| -rw-r--r-- | test/engine/test_bind.py | 5 | ||||
| -rw-r--r-- | test/engine/test_deprecations.py | 299 | ||||
| -rw-r--r-- | test/engine/test_execute.py | 463 | ||||
| -rw-r--r-- | test/engine/test_logging.py | 44 | ||||
| -rw-r--r-- | test/engine/test_reconnect.py | 8 | ||||
| -rw-r--r-- | test/engine/test_reflection.py | 24 | ||||
| -rw-r--r-- | test/engine/test_transaction.py | 99 |
7 files changed, 624 insertions, 318 deletions
diff --git a/test/engine/test_bind.py b/test/engine/test_bind.py index 956874846..c58e703d3 100644 --- a/test/engine/test_bind.py +++ b/test/engine/test_bind.py @@ -133,7 +133,10 @@ class BindTest(fixtures.TestBase): trans.rollback() metadata.bind = None assert ( - conn.execute("select count(*) from test_table").scalar() == 0 + conn.exec_driver_sql( + "select count(*) from test_table" + ).scalar() + == 0 ) finally: metadata.drop_all(bind=conn) diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py index 0c77e8c25..edf1503d5 100644 --- a/test/engine/test_deprecations.py +++ b/test/engine/test_deprecations.py @@ -4,6 +4,7 @@ from sqlalchemy import create_engine from sqlalchemy import event from sqlalchemy import ForeignKey from sqlalchemy import func +from sqlalchemy import INT from sqlalchemy import Integer from sqlalchemy import literal from sqlalchemy import MetaData @@ -12,7 +13,9 @@ from sqlalchemy import select from sqlalchemy import String from sqlalchemy import testing from sqlalchemy import TypeDecorator +from sqlalchemy import VARCHAR from sqlalchemy.engine import reflection +from sqlalchemy.engine.base import Connection from sqlalchemy.engine.base import Engine from sqlalchemy.engine.mock import MockConnection from sqlalchemy.testing import assert_raises @@ -22,12 +25,20 @@ from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_false +from sqlalchemy.testing import is_instance_of from sqlalchemy.testing import is_true from sqlalchemy.testing.mock import Mock from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table +def _string_deprecation_expect(): + return testing.expect_deprecated_20( + r"Passing a string to Connection.execute\(\) is deprecated " + r"and will be removed in version 2.0" + ) + + class SomeException(Exception): pass @@ -35,6 +46,10 @@ class SomeException(Exception): class ConnectionlessDeprecationTest(fixtures.TestBase): """test various things associated with "connectionless" executions.""" + def check_usage(self, inspector): + with inspector._operation_context() as conn: + is_instance_of(conn, Connection) + def test_inspector_constructor_engine(self): with testing.expect_deprecated( r"The __init__\(\) method on Inspector is deprecated and will " @@ -43,6 +58,7 @@ class ConnectionlessDeprecationTest(fixtures.TestBase): i1 = reflection.Inspector(testing.db) is_(i1.bind, testing.db) + self.check_usage(i1) def test_inspector_constructor_connection(self): with testing.db.connect() as conn: @@ -54,6 +70,7 @@ class ConnectionlessDeprecationTest(fixtures.TestBase): is_(i1.bind, conn) is_(i1.engine, testing.db) + self.check_usage(i1) def test_inspector_from_engine(self): with testing.expect_deprecated( @@ -63,6 +80,7 @@ class ConnectionlessDeprecationTest(fixtures.TestBase): i1 = reflection.Inspector.from_engine(testing.db) is_(i1.bind, testing.db) + self.check_usage(i1) def test_bind_close_conn(self): e = testing.db @@ -256,7 +274,7 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as conn: try: - conn.execute("SELECT FOO FROM I_DONT_EXIST") + conn.exec_driver_sql("SELECT FOO FROM I_DONT_EXIST") assert False except tsa.exc.DBAPIError as e: eq_(canary.mock_calls[0][1][5], e.orig) @@ -305,7 +323,10 @@ class HandleErrorTest(fixtures.TestBase): ) assert_raises_message( - TypeError, "I'm not a DBAPI error", c.execute, "select " + TypeError, + "I'm not a DBAPI error", + c.exec_driver_sql, + "select ", ) # no legacy event eq_(listener.mock_calls, []) @@ -369,6 +390,10 @@ class PoolTestBase(fixtures.TestBase): ) +def select1(db): + return str(select([1]).compile(dialect=db.dialect)) + + class DeprecatedEngineFeatureTest(fixtures.TablesTest): __backend__ = True @@ -442,6 +467,60 @@ class DeprecatedEngineFeatureTest(fixtures.TablesTest): assert_raises(Exception, conn.transaction, fn, 5, value=8) self._assert_no_data() + def test_execute_plain_string(self): + with _string_deprecation_expect(): + testing.db.execute(select1(testing.db)).scalar() + + def test_scalar_plain_string(self): + with _string_deprecation_expect(): + testing.db.scalar(select1(testing.db)) + + # Tests for the warning when non dict params are used + # @testing.combinations(42, (42,)) + # def test_execute_positional_non_dicts(self, args): + # with testing.expect_deprecated( + # r"Usage of tuple or scalars as positional arguments of " + # ): + # testing.db.execute(text(select1(testing.db)), args).scalar() + + # @testing.combinations(42, (42,)) + # def test_scalar_positional_non_dicts(self, args): + # with testing.expect_deprecated( + # r"Usage of tuple or scalars as positional arguments of " + # ): + # testing.db.scalar(text(select1(testing.db)), args) + + +class DeprecatedConnectionFeatureTest(fixtures.TablesTest): + __backend__ = True + + def test_execute_plain_string(self): + with _string_deprecation_expect(): + with testing.db.connect() as conn: + conn.execute(select1(testing.db)).scalar() + + def test_scalar_plain_string(self): + with _string_deprecation_expect(): + with testing.db.connect() as conn: + conn.scalar(select1(testing.db)) + + # Tests for the warning when non dict params are used + # @testing.combinations(42, (42,)) + # def test_execute_positional_non_dicts(self, args): + # with testing.expect_deprecated( + # r"Usage of tuple or scalars as positional arguments of " + # ): + # with testing.db.connect() as conn: + # conn.execute(text(select1(testing.db)), args).scalar() + + # @testing.combinations(42, (42,)) + # def test_scalar_positional_non_dicts(self, args): + # with testing.expect_deprecated( + # r"Usage of tuple or scalars as positional arguments of " + # ): + # with testing.db.connect() as conn: + # conn.scalar(text(select1(testing.db)), args) + class DeprecatedReflectionTest(fixtures.TablesTest): @classmethod @@ -540,3 +619,219 @@ class ExecutionOptionsTest(fixtures.TestBase): ): c2_branch = c2.connect() eq_(c2_branch._execution_options, {"foo": "bar"}) + + +class RawExecuteTest(fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column("user_id", INT, primary_key=True, autoincrement=False), + Column("user_name", VARCHAR(20)), + ) + Table( + "users_autoinc", + metadata, + Column( + "user_id", INT, primary_key=True, test_needs_autoincrement=True + ), + Column("user_name", VARCHAR(20)), + ) + + @testing.fails_on( + "postgresql+pg8000", + "pg8000 still doesn't allow single paren without params", + ) + def test_no_params_option(self, connection): + stmt = ( + "SELECT '%'" + + testing.db.dialect.statement_compiler( + testing.db.dialect, None + ).default_from() + ) + + result = ( + connection.execution_options(no_parameters=True) + .execute(stmt) + .scalar() + ) + eq_(result, "%") + + @testing.requires.qmark_paramstyle + def test_raw_qmark(self, connection): + conn = connection + + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " "values (?, ?)", + (1, "jack"), + ) + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " "values (?, ?)", + [2, "fred"], + ) + + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " "values (?, ?)", + [3, "ed"], + [4, "horse"], + ) + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " "values (?, ?)", + (5, "barney"), + (6, "donkey"), + ) + + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " "values (?, ?)", + 7, + "sally", + ) + + with _string_deprecation_expect(): + res = conn.execute("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "fred"), + (3, "ed"), + (4, "horse"), + (5, "barney"), + (6, "donkey"), + (7, "sally"), + ] + for multiparam, param in [ + (("jack", "fred"), {}), + ((["jack", "fred"],), {}), + ]: + with _string_deprecation_expect(): + res = conn.execute( + "select * from users where user_name=? or " + "user_name=? order by user_id", + *multiparam, + **param + ) + assert res.fetchall() == [(1, "jack"), (2, "fred")] + + with _string_deprecation_expect(): + res = conn.execute("select * from users where user_name=?", "jack") + assert res.fetchall() == [(1, "jack")] + + @testing.requires.format_paramstyle + def test_raw_sprintf(self, connection): + conn = connection + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " "values (%s, %s)", + [1, "jack"], + ) + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " "values (%s, %s)", + [2, "ed"], + [3, "horse"], + ) + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " "values (%s, %s)", + 4, + "sally", + ) + with _string_deprecation_expect(): + conn.execute("insert into users (user_id) values (%s)", 5) + with _string_deprecation_expect(): + res = conn.execute("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + (5, None), + ] + for multiparam, param in [ + (("jack", "ed"), {}), + ((["jack", "ed"],), {}), + ]: + with _string_deprecation_expect(): + res = conn.execute( + "select * from users where user_name=%s or " + "user_name=%s order by user_id", + *multiparam, + **param + ) + assert res.fetchall() == [(1, "jack"), (2, "ed")] + with _string_deprecation_expect(): + res = conn.execute( + "select * from users where user_name=%s", "jack" + ) + assert res.fetchall() == [(1, "jack")] + + @testing.requires.pyformat_paramstyle + def test_raw_python(self, connection): + conn = connection + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + {"id": 1, "name": "jack"}, + ) + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + {"id": 2, "name": "ed"}, + {"id": 3, "name": "horse"}, + ) + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + id=4, + name="sally", + ) + with _string_deprecation_expect(): + res = conn.execute("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + ] + + @testing.requires.named_paramstyle + def test_raw_named(self, connection): + conn = connection + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " + "values (:id, :name)", + {"id": 1, "name": "jack"}, + ) + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " + "values (:id, :name)", + {"id": 2, "name": "ed"}, + {"id": 3, "name": "horse"}, + ) + with _string_deprecation_expect(): + conn.execute( + "insert into users (user_id, user_name) " + "values (:id, :name)", + id=4, + name="sally", + ) + with _string_deprecation_expect(): + res = conn.execute("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + ] diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 150038a40..0b5b1b16d 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -50,27 +50,22 @@ from sqlalchemy.testing.util import gc_collect from sqlalchemy.testing.util import picklers -users, metadata, users_autoinc = None, None, None - - class SomeException(Exception): pass -class ExecuteTest(fixtures.TestBase): +class ExecuteTest(fixtures.TablesTest): __backend__ = True @classmethod - def setup_class(cls): - global users, users_autoinc, metadata - metadata = MetaData(testing.db) - users = Table( + def define_tables(cls, metadata): + Table( "users", metadata, Column("user_id", INT, primary_key=True, autoincrement=False), Column("user_name", VARCHAR(20)), ) - users_autoinc = Table( + Table( "users_autoinc", metadata, Column( @@ -78,15 +73,6 @@ class ExecuteTest(fixtures.TestBase): ), Column("user_name", VARCHAR(20)), ) - metadata.create_all() - - @engines.close_first - def teardown(self): - testing.db.execute(users.delete()) - - @classmethod - def teardown_class(cls): - metadata.drop_all() @testing.fails_on( "postgresql+pg8000", @@ -101,223 +87,176 @@ class ExecuteTest(fixtures.TestBase): ) conn = testing.db.connect() - result = conn.execution_options(no_parameters=True).scalar(stmt) + result = ( + conn.execution_options(no_parameters=True) + .exec_driver_sql(stmt) + .scalar() + ) eq_(result, "%") - @testing.fails_on_everything_except( - "firebird", "sqlite", "+pyodbc", "+mxodbc", "mysql+oursql" - ) - def test_raw_qmark(self): - def go(conn): - conn.execute( - "insert into users (user_id, user_name) " "values (?, ?)", - (1, "jack"), - ) - conn.execute( - "insert into users (user_id, user_name) " "values (?, ?)", - [2, "fred"], - ) - conn.execute( - "insert into users (user_id, user_name) " "values (?, ?)", - [3, "ed"], - [4, "horse"], - ) - conn.execute( - "insert into users (user_id, user_name) " "values (?, ?)", - (5, "barney"), - (6, "donkey"), - ) - conn.execute( - "insert into users (user_id, user_name) " "values (?, ?)", - 7, - "sally", - ) - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "fred"), - (3, "ed"), - (4, "horse"), - (5, "barney"), - (6, "donkey"), - (7, "sally"), - ] - for multiparam, param in [ - (("jack", "fred"), {}), - ((["jack", "fred"],), {}), - ]: - res = conn.execute( - "select * from users where user_name=? or " - "user_name=? order by user_id", - *multiparam, - **param - ) - assert res.fetchall() == [(1, "jack"), (2, "fred")] - res = conn.execute("select * from users where user_name=?", "jack") - assert res.fetchall() == [(1, "jack")] - conn.execute("delete from users") - - go(testing.db) - conn = testing.db.connect() - try: - go(conn) - finally: - conn.close() - - # some psycopg2 versions bomb this. - @testing.fails_on_everything_except( - "mysql+mysqldb", - "mysql+pymysql", - "mysql+cymysql", - "mysql+mysqlconnector", - "postgresql", - ) - def test_raw_sprintf(self): - def go(conn): - conn.execute( - "insert into users (user_id, user_name) " "values (%s, %s)", - [1, "jack"], - ) - conn.execute( - "insert into users (user_id, user_name) " "values (%s, %s)", - [2, "ed"], - [3, "horse"], - ) - conn.execute( - "insert into users (user_id, user_name) " "values (%s, %s)", - 4, - "sally", - ) - conn.execute("insert into users (user_id) values (%s)", 5) - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "ed"), - (3, "horse"), - (4, "sally"), - (5, None), - ] - for multiparam, param in [ - (("jack", "ed"), {}), - ((["jack", "ed"],), {}), - ]: - res = conn.execute( - "select * from users where user_name=%s or " - "user_name=%s order by user_id", - *multiparam, - **param - ) - assert res.fetchall() == [(1, "jack"), (2, "ed")] - res = conn.execute( - "select * from users where user_name=%s", "jack" - ) - assert res.fetchall() == [(1, "jack")] - - conn.execute("delete from users") - - go(testing.db) - conn = testing.db.connect() - try: - go(conn) - finally: - conn.close() - - # pyformat is supported for mysql, but skipping because a few driver - # versions have a bug that bombs out on this test. (1.2.2b3, - # 1.2.2c1, 1.2.2) - - @testing.skip_if(lambda: testing.against("mysql+mysqldb"), "db-api flaky") - @testing.fails_on_everything_except( - "postgresql+psycopg2", - "postgresql+psycopg2cffi", - "postgresql+pypostgresql", - "postgresql+pygresql", - "mysql+mysqlconnector", - "mysql+pymysql", - "mysql+cymysql", - "mssql+pymssql", - ) - def test_raw_python(self): - def go(conn): - conn.execute( - "insert into users (user_id, user_name) " - "values (%(id)s, %(name)s)", - {"id": 1, "name": "jack"}, - ) - conn.execute( - "insert into users (user_id, user_name) " - "values (%(id)s, %(name)s)", - {"id": 2, "name": "ed"}, - {"id": 3, "name": "horse"}, - ) - conn.execute( - "insert into users (user_id, user_name) " - "values (%(id)s, %(name)s)", - id=4, - name="sally", - ) - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "ed"), - (3, "horse"), - (4, "sally"), - ] - conn.execute("delete from users") + def test_raw_positional_invalid(self, connection): + assert_raises_message( + tsa.exc.ArgumentError, + "List argument must consist only of tuples or dictionaries", + connection.exec_driver_sql, + "insert into users (user_id, user_name) " "values (?, ?)", + [2, "fred"], + ) - go(testing.db) - conn = testing.db.connect() - try: - go(conn) - finally: - conn.close() + assert_raises_message( + tsa.exc.ArgumentError, + "List argument must consist only of tuples or dictionaries", + connection.exec_driver_sql, + "insert into users (user_id, user_name) " "values (?, ?)", + [[3, "ed"], [4, "horse"]], + ) - @testing.fails_on_everything_except("sqlite", "oracle+cx_oracle") - def test_raw_named(self): - def go(conn): - conn.execute( - "insert into users (user_id, user_name) " - "values (:id, :name)", - {"id": 1, "name": "jack"}, - ) - conn.execute( - "insert into users (user_id, user_name) " - "values (:id, :name)", - {"id": 2, "name": "ed"}, - {"id": 3, "name": "horse"}, - ) - conn.execute( - "insert into users (user_id, user_name) " - "values (:id, :name)", - id=4, - name="sally", - ) - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "ed"), - (3, "horse"), - (4, "sally"), - ] - conn.execute("delete from users") + def test_raw_named_invalid(self, connection): + assert_raises( + TypeError, + connection.exec_driver_sql, + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + {"id": 2, "name": "ed"}, + {"id": 3, "name": "horse"}, + ) + assert_raises( + TypeError, + connection.exec_driver_sql, + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + id=4, + name="sally", + ) - go(testing.db) - conn = testing.db.connect() - try: - go(conn) - finally: - conn.close() + @testing.requires.qmark_paramstyle + def test_raw_qmark(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (?, ?)", + (1, "jack"), + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (?, ?)", + (2, "fred"), + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (?, ?)", + [(3, "ed"), (4, "horse")], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (?, ?)", + [(5, "barney"), (6, "donkey")], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (?, ?)", + (7, "sally"), + ) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "fred"), + (3, "ed"), + (4, "horse"), + (5, "barney"), + (6, "donkey"), + (7, "sally"), + ] + + res = conn.exec_driver_sql( + "select * from users where user_name=?", ("jack",) + ) + assert res.fetchall() == [(1, "jack")] + + @testing.requires.format_paramstyle + def test_raw_sprintf(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (%s, %s)", + (1, "jack"), + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (%s, %s)", + [(2, "ed"), (3, "horse")], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (%s, %s)", + (4, "sally"), + ) + conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,)) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + (5, None), + ] + + res = conn.exec_driver_sql( + "select * from users where user_name=%s", ("jack",) + ) + assert res.fetchall() == [(1, "jack")] + + @testing.requires.pyformat_paramstyle + def test_raw_python(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + {"id": 1, "name": "jack"}, + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + dict(id=4, name="sally"), + ) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + ] + + @testing.requires.named_paramstyle + def test_raw_named(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (:id, :name)", + {"id": 1, "name": "jack"}, + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (:id, :name)", + [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (:id, :name)", + {"id": 4, "name": "sally"}, + ) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + ] @testing.engines.close_open_connections def test_exception_wrapping_dbapi(self): conn = testing.db.connect() - for _c in testing.db, conn: - assert_raises_message( - tsa.exc.DBAPIError, - r"not_a_valid_statement", - _c.execute, - "not_a_valid_statement", - ) + # engine does not have exec_driver_sql + assert_raises_message( + tsa.exc.DBAPIError, + r"not_a_valid_statement", + conn.exec_driver_sql, + "not_a_valid_statement", + ) @testing.requires.sqlite def test_exception_wrapping_non_dbapi_error(self): @@ -334,7 +273,10 @@ class ExecuteTest(fixtures.TestBase): ) assert_raises_message( - TypeError, "I'm not a DBAPI error", c.execute, "select " + TypeError, + "I'm not a DBAPI error", + c.exec_driver_sql, + "select ", ) eq_(is_disconnect.call_count, 0) @@ -359,7 +301,7 @@ class ExecuteTest(fixtures.TestBase): ): with testing.db.connect() as conn: assert_raises( - tsa.exc.OperationalError, conn.execute, "select 1" + tsa.exc.OperationalError, conn.exec_driver_sql, "select 1" ) def test_exception_wrapping_non_dbapi_statement(self): @@ -378,11 +320,8 @@ class ExecuteTest(fixtures.TestBase): ) _go(testing.db) - conn = testing.db.connect() - try: + with testing.db.connect() as conn: _go(conn) - finally: - conn.close() def test_not_an_executable(self): for obj in ( @@ -574,6 +513,7 @@ class ExecuteTest(fixtures.TestBase): def test_empty_insert(self): """test that execute() interprets [] as a list with no params""" + users_autoinc = self.tables.users_autoinc testing.db.execute( users_autoinc.insert().values(user_name=bindparam("name", None)), @@ -885,7 +825,7 @@ class CompiledCacheTest(fixtures.TestBase): cached_conn.execute(ins, {"user_name": "u3"}) eq_(compile_mock.call_count, 1) assert len(cache) == 1 - eq_(conn.execute("select count(*) from users").scalar(), 3) + eq_(conn.exec_driver_sql("select count(*) from users").scalar(), 3) @testing.only_on( ["sqlite", "mysql", "postgresql"], @@ -924,7 +864,7 @@ class CompiledCacheTest(fixtures.TestBase): cached_conn.execute(ins, {"photo_blob": blob}) eq_(compile_mock.call_count, 1) eq_(len(cache), 1) - eq_(conn.execute("select count(*) from photo").scalar(), 1) + eq_(conn.exec_driver_sql("select count(*) from photo").scalar(), 1) del blob @@ -1427,7 +1367,7 @@ class EngineEventsTest(fixtures.TestBase): with e1.connect() as conn: - result = conn.execute(stmt) + result = conn.exec_driver_sql(stmt) ctx = result.context eq_( @@ -1496,7 +1436,7 @@ class EngineEventsTest(fixtures.TestBase): t1.insert().execute(c1=5, c2="some data") t1.insert().execute(c1=6) eq_( - engine.execute("select * from t1").fetchall(), + engine.execute(text("select * from t1")).fetchall(), [(5, "some data"), (6, "foo")], ) finally: @@ -1987,7 +1927,7 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as conn: try: - conn.execute("SELECT FOO FROM I_DONT_EXIST") + conn.exec_driver_sql("SELECT FOO FROM I_DONT_EXIST") assert False except tsa.exc.DBAPIError as e: ctx = canary.mock_calls[0][1][0] @@ -2018,20 +1958,20 @@ class HandleErrorTest(fixtures.TestBase): assert_raises_message( MyException, "my exception", - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR ONE' FROM I_DONT_EXIST", ) # case 2: return the DBAPI exception we're given; # no wrapping should occur assert_raises( conn.dialect.dbapi.Error, - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR TWO' FROM I_DONT_EXIST", ) # case 3: normal wrapping assert_raises( tsa.exc.DBAPIError, - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR THREE' FROM I_DONT_EXIST", ) @@ -2080,7 +2020,7 @@ class HandleErrorTest(fixtures.TestBase): assert_raises_message( MyException2, "my exception chained", - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR ONE' FROM I_DONT_EXIST", ) eq_(patched.call_count, 1) @@ -2090,7 +2030,7 @@ class HandleErrorTest(fixtures.TestBase): ) as patched: assert_raises( MyException1, - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR TWO' FROM I_DONT_EXIST", ) eq_(patched.call_count, 1) @@ -2102,7 +2042,7 @@ class HandleErrorTest(fixtures.TestBase): # by err2 assert_raises( MyException1, - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR THREE' FROM I_DONT_EXIST", ) eq_(patched.call_count, 1) @@ -2112,7 +2052,7 @@ class HandleErrorTest(fixtures.TestBase): ) as patched: assert_raises( tsa.exc.DBAPIError, - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR FIVE' FROM I_DONT_EXIST", ) eq_(patched.call_count, 1) @@ -2123,7 +2063,7 @@ class HandleErrorTest(fixtures.TestBase): assert_raises_message( MyException3, "my exception short circuit", - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR FOUR' FROM I_DONT_EXIST", ) eq_(patched.call_count, 1) @@ -2145,7 +2085,7 @@ class HandleErrorTest(fixtures.TestBase): assert_raises_message( tsa.exc.OperationalError, "rollback failed", - conn.execute, + conn.exec_driver_sql, "insert into i_dont_exist (x) values ('y')", ) @@ -2200,7 +2140,10 @@ class HandleErrorTest(fixtures.TestBase): ) assert_raises_message( - TypeError, "I'm not a DBAPI error", c.execute, "select " + TypeError, + "I'm not a DBAPI error", + c.exec_driver_sql, + "select ", ) ctx = listener.mock_calls[0][1][0] eq_(ctx.statement, "select ") @@ -2223,13 +2166,17 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as conn: assert_raises( tsa.exc.DBAPIError, - conn.execution_options(skip_user_error_events=True).execute, + conn.execution_options( + skip_user_error_events=True + ).exec_driver_sql, "SELECT ERROR_ONE FROM I_DONT_EXIST", ) assert_raises( MyException1, - conn.execution_options(skip_user_error_events=False).execute, + conn.execution_options( + skip_user_error_events=False + ).exec_driver_sql, "SELECT ERROR_ONE FROM I_DONT_EXIST", ) @@ -2246,7 +2193,7 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as c: try: - c.execute("SELECT x FROM nonexistent") + c.exec_driver_sql("SELECT x FROM nonexistent") assert False except tsa.exc.StatementError as st: eq_(st.connection_invalidated, evt_value) @@ -2287,7 +2234,7 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as c: target_crec = c.connection._connection_record try: - c.execute("SELECT x FROM nonexistent") + c.exec_driver_sql("SELECT x FROM nonexistent") assert False except tsa.exc.StatementError as st: eq_(st.connection_invalidated, True) @@ -2644,7 +2591,9 @@ class DialectEventTest(fixtures.TestBase): def _test_do_execute(self, retval): with self._run_test(retval) as (conn, m1): - result = conn.execute("insert into table foo", {"foo": "bar"}) + result = conn.exec_driver_sql( + "insert into table foo", {"foo": "bar"} + ) self._assert( retval, m1.do_execute, @@ -2661,7 +2610,7 @@ class DialectEventTest(fixtures.TestBase): def _test_do_executemany(self, retval): with self._run_test(retval) as (conn, m1): - result = conn.execute( + result = conn.exec_driver_sql( "insert into table foo", [{"foo": "bar"}, {"foo": "bar"}] ) self._assert( @@ -2680,9 +2629,9 @@ class DialectEventTest(fixtures.TestBase): def _test_do_execute_no_params(self, retval): with self._run_test(retval) as (conn, m1): - result = conn.execution_options(no_parameters=True).execute( - "insert into table foo" - ) + result = conn.execution_options( + no_parameters=True + ).exec_driver_sql("insert into table foo") self._assert( retval, m1.do_execute_no_params, @@ -2840,7 +2789,7 @@ class AutocommitTextTest(fixtures.TestBase): engine.dialect.dbapi = dbapi with engine.connect() as conn: - conn.execute("%s something table something" % keyword) + conn.exec_driver_sql("%s something table something" % keyword) if expected: eq_(dbapi.connect().mock_calls, [call.cursor(), call.commit()]) diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py index fe4ff44a7..5d50a010d 100644 --- a/test/engine/test_logging.py +++ b/test/engine/test_logging.py @@ -20,6 +20,11 @@ from sqlalchemy.testing import mock from sqlalchemy.testing.util import lazy_gc +def exec_sql(engine, sql, *args, **kwargs): + with engine.connect() as conn: + return conn.exec_driver_sql(sql, *args, **kwargs) + + class LogParamsTest(fixtures.TestBase): __only_on__ = "sqlite" __requires__ = ("ad_hoc_engines",) @@ -29,21 +34,23 @@ class LogParamsTest(fixtures.TestBase): self.no_param_engine = engines.testing_engine( options={"echo": True, "hide_parameters": True} ) - self.eng.execute("create table if not exists foo (data string)") - self.no_param_engine.execute( - "create table if not exists foo (data string)" + exec_sql(self.eng, "create table if not exists foo (data string)") + exec_sql( + self.no_param_engine, + "create table if not exists foo (data string)", ) self.buf = logging.handlers.BufferingHandler(100) for log in [logging.getLogger("sqlalchemy.engine")]: log.addHandler(self.buf) def teardown(self): - self.eng.execute("drop table if exists foo") + exec_sql(self.eng, "drop table if exists foo") for log in [logging.getLogger("sqlalchemy.engine")]: log.removeHandler(self.buf) def test_log_large_list_of_dict(self): - self.eng.execute( + exec_sql( + self.eng, "INSERT INTO foo (data) values (:data)", [{"data": str(i)} for i in range(100)], ) @@ -71,7 +78,8 @@ class LogParamsTest(fixtures.TestBase): ) def test_log_no_parameters(self): - self.no_param_engine.execute( + exec_sql( + self.no_param_engine, "INSERT INTO foo (data) values (:data)", [{"data": str(i)} for i in range(100)], ) @@ -81,7 +89,8 @@ class LogParamsTest(fixtures.TestBase): ) def test_log_large_list_of_tuple(self): - self.eng.execute( + exec_sql( + self.eng, "INSERT INTO foo (data) values (?)", [(str(i),) for i in range(100)], ) @@ -210,7 +219,7 @@ class LogParamsTest(fixtures.TestBase): largeparam = "".join(chr(random.randint(52, 85)) for i in range(5000)) - self.eng.execute("INSERT INTO foo (data) values (?)", (largeparam,)) + exec_sql(self.eng, "INSERT INTO foo (data) values (?)", (largeparam,)) eq_( self.buf.buffer[1].message, @@ -225,7 +234,7 @@ class LogParamsTest(fixtures.TestBase): lp2 = "".join(chr(random.randint(52, 85)) for i in range(8)) lp3 = "".join(chr(random.randint(52, 85)) for i in range(670)) - self.eng.execute("SELECT ?, ?, ?", (lp1, lp2, lp3)) + exec_sql(self.eng, "SELECT ?, ?, ?", (lp1, lp2, lp3)) eq_( self.buf.buffer[1].message, @@ -240,8 +249,10 @@ class LogParamsTest(fixtures.TestBase): lp2 = "".join(chr(random.randint(52, 85)) for i in range(200)) lp3 = "".join(chr(random.randint(52, 85)) for i in range(670)) - self.eng.execute( - "INSERT INTO foo (data) values (?)", [(lp1,), (lp2,), (lp3,)] + exec_sql( + self.eng, + "INSERT INTO foo (data) values (?)", + [(lp1,), (lp2,), (lp3,)], ) eq_( @@ -273,7 +284,8 @@ class LogParamsTest(fixtures.TestBase): tsa.exc.DBAPIError, r".*INSERT INTO nonexistent \(data\) values \(:data\)\]\n" r"\[SQL parameters hidden due to hide_parameters=True\]", - lambda: self.no_param_engine.execute( + lambda: exec_sql( + self.no_param_engine, "INSERT INTO nonexistent (data) values (:data)", [{"data": str(i)} for i in range(10)], ), @@ -324,7 +336,7 @@ class LogParamsTest(fixtures.TestBase): largeparam = "".join(chr(random.randint(52, 85)) for i in range(5000)) self.eng.echo = "debug" - result = self.eng.execute("SELECT ?", (largeparam,)) + result = exec_sql(self.eng, "SELECT ?", (largeparam,)) row = result.first() @@ -370,7 +382,8 @@ class LogParamsTest(fixtures.TestBase): r"{'data': '6'}, {'data': '7'} ... displaying 10 of " r"100 total bound parameter sets ... {'data': '98'}, " r"{'data': '99'}\]", - lambda: self.eng.execute( + lambda: exec_sql( + self.eng, "INSERT INTO nonexistent (data) values (:data)", [{"data": str(i)} for i in range(100)], ), @@ -385,7 +398,8 @@ class LogParamsTest(fixtures.TestBase): r"... displaying " r"10 of 100 total bound parameter sets ... " r"\('98',\), \('99',\)\]", - lambda: self.eng.execute( + lambda: exec_sql( + self.eng, "INSERT INTO nonexistent (data) values (?)", [(str(i),) for i in range(100)], ), diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index 000be1a70..a09b04748 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -826,7 +826,7 @@ class CursorErrTest(fixtures.TestBase): def test_cursor_explode(self): db = self._fixture(False, False) conn = db.connect() - result = conn.execute("select foo") + result = conn.exec_driver_sql("select foo") result.close() conn.close() eq_( @@ -1006,7 +1006,7 @@ class RealReconnectTest(fixtures.TestBase): engine = engines.testing_engine() def broken_initialize(connection): - connection.execute("select fake_stuff from _fake_table") + connection.exec_driver_sql("select fake_stuff from _fake_table") engine.dialect.initialize = broken_initialize @@ -1020,7 +1020,7 @@ class RealReconnectTest(fixtures.TestBase): engine = engines.testing_engine() def broken_initialize(connection): - connection.execute("select fake_stuff from _fake_table") + connection.exec_driver_sql("select fake_stuff from _fake_table") engine.dialect.initialize = broken_initialize @@ -1186,7 +1186,7 @@ class InvalidateDuringResultTest(fixtures.TestBase): ) def test_invalidate_on_results(self): conn = self.engine.connect() - result = conn.execute("select * from sometable") + result = conn.exec_driver_sql("select * from sometable") for x in range(20): result.fetchone() self.engine.test_shutdown() diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index 9de181216..3a2a3c699 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -1016,7 +1016,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): indexes""" with testing.db.begin() as conn: - conn.execute( + conn.exec_driver_sql( """ CREATE TABLE book ( id INTEGER NOT NULL, @@ -1056,7 +1056,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): """test reflection of a composite primary key""" with testing.db.begin() as conn: - conn.execute( + conn.exec_driver_sql( """ CREATE TABLE book ( id INTEGER NOT NULL, @@ -2045,19 +2045,21 @@ class ReverseCasingReflectTest(fixtures.TestBase, AssertsCompiledSQL): @testing.requires.denormalized_names def setup(self): - testing.db.execute( + with testing.db.connect() as conn: + conn.exec_driver_sql( + """ + CREATE TABLE weird_casing( + col1 char(20), + "Col2" char(20), + "col3" char(20) + ) """ - CREATE TABLE weird_casing( - col1 char(20), - "Col2" char(20), - "col3" char(20) - ) - """ - ) + ) @testing.requires.denormalized_names def teardown(self): - testing.db.execute("drop table weird_casing") + with testing.db.connect() as conn: + conn.exec_driver_sql("drop table weird_casing") @testing.requires.denormalized_names def test_direct_quoting(self): diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index 39c9b2ad4..c6a6c5e3a 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -60,7 +60,7 @@ class TransactionTest(fixtures.TestBase): transaction.commit() transaction = connection.begin() - result = connection.execute("select * from query_users") + result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 3 transaction.commit() connection.close() @@ -74,7 +74,7 @@ class TransactionTest(fixtures.TestBase): connection.execute(users.insert(), user_id=2, user_name="user2") connection.execute(users.insert(), user_id=3, user_name="user3") transaction.rollback() - result = connection.execute("select * from query_users") + result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 connection.close() @@ -92,7 +92,7 @@ class TransactionTest(fixtures.TestBase): print("Exception: ", e) transaction.rollback() - result = connection.execute("select * from query_users") + result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 connection.close() @@ -150,7 +150,7 @@ class TransactionTest(fixtures.TestBase): assert_raises_message( exc.InvalidRequestError, "This connection is on an inactive transaction. Please", - connection.execute, + connection.exec_driver_sql, "select 1", ) @@ -189,7 +189,12 @@ class TransactionTest(fixtures.TestBase): assert branched.in_transaction() branched.execute(users.insert(), user_id=2, user_name="user2") nested.rollback() - eq_(connection.scalar("select count(*) from query_users"), 1) + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 1, + ) finally: connection.close() @@ -201,7 +206,12 @@ class TransactionTest(fixtures.TestBase): branched.execute(users.insert(), user_id=1, user_name="user1") finally: connection.close() - eq_(testing.db.scalar("select count(*) from query_users"), 1) + eq_( + testing.db.execute( + text("select count(*) from query_users") + ).scalar(), + 1, + ) @testing.requires.savepoints def test_branch_savepoint_rollback(self): @@ -216,7 +226,12 @@ class TransactionTest(fixtures.TestBase): nested.rollback() assert connection.in_transaction() trans.commit() - eq_(connection.scalar("select count(*) from query_users"), 1) + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 1, + ) finally: connection.close() @@ -232,7 +247,12 @@ class TransactionTest(fixtures.TestBase): branched.execute(users.insert(), user_id=2, user_name="user2") nested.rollback() assert not connection.in_transaction() - eq_(connection.scalar("select count(*) from query_users"), 1) + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 1, + ) finally: connection.close() @@ -267,7 +287,12 @@ class TransactionTest(fixtures.TestBase): conn2 = connection.execution_options(dummy=True) conn2.execute(users.insert(), user_id=2, user_name="user2") transaction.rollback() - eq_(connection.scalar("select count(*) from query_users"), 0) + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 0, + ) finally: connection.close() @@ -283,9 +308,12 @@ class TransactionTest(fixtures.TestBase): trans2.commit() transaction.rollback() self.assert_( - connection.scalar("select count(*) from " "query_users") == 0 + connection.exec_driver_sql( + "select count(*) from " "query_users" + ).scalar() + == 0 ) - result = connection.execute("select * from query_users") + result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 connection.close() @@ -301,7 +329,10 @@ class TransactionTest(fixtures.TestBase): assert not trans.is_active self.assert_( - connection.scalar("select count(*) from " "query_users") == 0 + connection.exec_driver_sql( + "select count(*) from " "query_users" + ).scalar() + == 0 ) trans = connection.begin() @@ -309,7 +340,10 @@ class TransactionTest(fixtures.TestBase): trans.__exit__(None, None, None) assert not trans.is_active self.assert_( - connection.scalar("select count(*) from " "query_users") == 1 + connection.exec_driver_sql( + "select count(*) from " "query_users" + ).scalar() + == 1 ) connection.close() @@ -328,9 +362,12 @@ class TransactionTest(fixtures.TestBase): transaction.commit() assert not connection.in_transaction() self.assert_( - connection.scalar("select count(*) from " "query_users") == 5 + connection.exec_driver_sql( + "select count(*) from " "query_users" + ).scalar() + == 5 ) - result = connection.execute("select * from query_users") + result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 5 connection.close() @@ -349,9 +386,12 @@ class TransactionTest(fixtures.TestBase): transaction.close() assert not connection.in_transaction() self.assert_( - connection.scalar("select count(*) from " "query_users") == 0 + connection.exec_driver_sql( + "select count(*) from " "query_users" + ).scalar() + == 0 ) - result = connection.execute("select * from query_users") + result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 connection.close() @@ -406,7 +446,7 @@ class TransactionTest(fixtures.TestBase): assert_raises_message( exc.InvalidRequestError, "This connection is on an inactive savepoint transaction.", - connection.execute, + connection.exec_driver_sql, "select 1", ) trans2.rollback() @@ -701,7 +741,7 @@ class AutoRollbackTest(fixtures.TestBase): test_needs_acid=True, ) users.create(conn1) - conn1.execute("select * from deadlock_users") + conn1.exec_driver_sql("select * from deadlock_users") conn1.close() # without auto-rollback in the connection pool's return() logic, @@ -732,20 +772,23 @@ class ExplicitAutoCommitTest(fixtures.TestBase): Column("id", Integer, primary_key=True), Column("data", String(100)), ) - metadata.create_all() - testing.db.execute( - "create function insert_foo(varchar) " - "returns integer as 'insert into foo(data) " - "values ($1);select 1;' language sql" - ) + with testing.db.connect() as conn: + metadata.create_all(conn) + conn.exec_driver_sql( + "create function insert_foo(varchar) " + "returns integer as 'insert into foo(data) " + "values ($1);select 1;' language sql" + ) def teardown(self): - foo.delete().execute().close() + with testing.db.connect() as conn: + conn.execute(foo.delete()) @classmethod def teardown_class(cls): - testing.db.execute("drop function insert_foo(varchar)") - metadata.drop_all() + with testing.db.connect() as conn: + conn.exec_driver_sql("drop function insert_foo(varchar)") + metadata.drop_all(conn) def test_control(self): |
