diff options
Diffstat (limited to 'test/engine/test_execute.py')
| -rw-r--r-- | test/engine/test_execute.py | 463 |
1 files changed, 206 insertions, 257 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 150038a40..0b5b1b16d 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -50,27 +50,22 @@ from sqlalchemy.testing.util import gc_collect from sqlalchemy.testing.util import picklers -users, metadata, users_autoinc = None, None, None - - class SomeException(Exception): pass -class ExecuteTest(fixtures.TestBase): +class ExecuteTest(fixtures.TablesTest): __backend__ = True @classmethod - def setup_class(cls): - global users, users_autoinc, metadata - metadata = MetaData(testing.db) - users = Table( + def define_tables(cls, metadata): + Table( "users", metadata, Column("user_id", INT, primary_key=True, autoincrement=False), Column("user_name", VARCHAR(20)), ) - users_autoinc = Table( + Table( "users_autoinc", metadata, Column( @@ -78,15 +73,6 @@ class ExecuteTest(fixtures.TestBase): ), Column("user_name", VARCHAR(20)), ) - metadata.create_all() - - @engines.close_first - def teardown(self): - testing.db.execute(users.delete()) - - @classmethod - def teardown_class(cls): - metadata.drop_all() @testing.fails_on( "postgresql+pg8000", @@ -101,223 +87,176 @@ class ExecuteTest(fixtures.TestBase): ) conn = testing.db.connect() - result = conn.execution_options(no_parameters=True).scalar(stmt) + result = ( + conn.execution_options(no_parameters=True) + .exec_driver_sql(stmt) + .scalar() + ) eq_(result, "%") - @testing.fails_on_everything_except( - "firebird", "sqlite", "+pyodbc", "+mxodbc", "mysql+oursql" - ) - def test_raw_qmark(self): - def go(conn): - conn.execute( - "insert into users (user_id, user_name) " "values (?, ?)", - (1, "jack"), - ) - conn.execute( - "insert into users (user_id, user_name) " "values (?, ?)", - [2, "fred"], - ) - conn.execute( - "insert into users (user_id, user_name) " "values (?, ?)", - [3, "ed"], - [4, "horse"], - ) - conn.execute( - "insert into users (user_id, user_name) " "values (?, ?)", - (5, "barney"), - (6, "donkey"), - ) - conn.execute( - "insert into users (user_id, user_name) " "values (?, ?)", - 7, - "sally", - ) - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "fred"), - (3, "ed"), - (4, "horse"), - (5, "barney"), - (6, "donkey"), - (7, "sally"), - ] - for multiparam, param in [ - (("jack", "fred"), {}), - ((["jack", "fred"],), {}), - ]: - res = conn.execute( - "select * from users where user_name=? or " - "user_name=? order by user_id", - *multiparam, - **param - ) - assert res.fetchall() == [(1, "jack"), (2, "fred")] - res = conn.execute("select * from users where user_name=?", "jack") - assert res.fetchall() == [(1, "jack")] - conn.execute("delete from users") - - go(testing.db) - conn = testing.db.connect() - try: - go(conn) - finally: - conn.close() - - # some psycopg2 versions bomb this. - @testing.fails_on_everything_except( - "mysql+mysqldb", - "mysql+pymysql", - "mysql+cymysql", - "mysql+mysqlconnector", - "postgresql", - ) - def test_raw_sprintf(self): - def go(conn): - conn.execute( - "insert into users (user_id, user_name) " "values (%s, %s)", - [1, "jack"], - ) - conn.execute( - "insert into users (user_id, user_name) " "values (%s, %s)", - [2, "ed"], - [3, "horse"], - ) - conn.execute( - "insert into users (user_id, user_name) " "values (%s, %s)", - 4, - "sally", - ) - conn.execute("insert into users (user_id) values (%s)", 5) - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "ed"), - (3, "horse"), - (4, "sally"), - (5, None), - ] - for multiparam, param in [ - (("jack", "ed"), {}), - ((["jack", "ed"],), {}), - ]: - res = conn.execute( - "select * from users where user_name=%s or " - "user_name=%s order by user_id", - *multiparam, - **param - ) - assert res.fetchall() == [(1, "jack"), (2, "ed")] - res = conn.execute( - "select * from users where user_name=%s", "jack" - ) - assert res.fetchall() == [(1, "jack")] - - conn.execute("delete from users") - - go(testing.db) - conn = testing.db.connect() - try: - go(conn) - finally: - conn.close() - - # pyformat is supported for mysql, but skipping because a few driver - # versions have a bug that bombs out on this test. (1.2.2b3, - # 1.2.2c1, 1.2.2) - - @testing.skip_if(lambda: testing.against("mysql+mysqldb"), "db-api flaky") - @testing.fails_on_everything_except( - "postgresql+psycopg2", - "postgresql+psycopg2cffi", - "postgresql+pypostgresql", - "postgresql+pygresql", - "mysql+mysqlconnector", - "mysql+pymysql", - "mysql+cymysql", - "mssql+pymssql", - ) - def test_raw_python(self): - def go(conn): - conn.execute( - "insert into users (user_id, user_name) " - "values (%(id)s, %(name)s)", - {"id": 1, "name": "jack"}, - ) - conn.execute( - "insert into users (user_id, user_name) " - "values (%(id)s, %(name)s)", - {"id": 2, "name": "ed"}, - {"id": 3, "name": "horse"}, - ) - conn.execute( - "insert into users (user_id, user_name) " - "values (%(id)s, %(name)s)", - id=4, - name="sally", - ) - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "ed"), - (3, "horse"), - (4, "sally"), - ] - conn.execute("delete from users") + def test_raw_positional_invalid(self, connection): + assert_raises_message( + tsa.exc.ArgumentError, + "List argument must consist only of tuples or dictionaries", + connection.exec_driver_sql, + "insert into users (user_id, user_name) " "values (?, ?)", + [2, "fred"], + ) - go(testing.db) - conn = testing.db.connect() - try: - go(conn) - finally: - conn.close() + assert_raises_message( + tsa.exc.ArgumentError, + "List argument must consist only of tuples or dictionaries", + connection.exec_driver_sql, + "insert into users (user_id, user_name) " "values (?, ?)", + [[3, "ed"], [4, "horse"]], + ) - @testing.fails_on_everything_except("sqlite", "oracle+cx_oracle") - def test_raw_named(self): - def go(conn): - conn.execute( - "insert into users (user_id, user_name) " - "values (:id, :name)", - {"id": 1, "name": "jack"}, - ) - conn.execute( - "insert into users (user_id, user_name) " - "values (:id, :name)", - {"id": 2, "name": "ed"}, - {"id": 3, "name": "horse"}, - ) - conn.execute( - "insert into users (user_id, user_name) " - "values (:id, :name)", - id=4, - name="sally", - ) - res = conn.execute("select * from users order by user_id") - assert res.fetchall() == [ - (1, "jack"), - (2, "ed"), - (3, "horse"), - (4, "sally"), - ] - conn.execute("delete from users") + def test_raw_named_invalid(self, connection): + assert_raises( + TypeError, + connection.exec_driver_sql, + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + {"id": 2, "name": "ed"}, + {"id": 3, "name": "horse"}, + ) + assert_raises( + TypeError, + connection.exec_driver_sql, + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + id=4, + name="sally", + ) - go(testing.db) - conn = testing.db.connect() - try: - go(conn) - finally: - conn.close() + @testing.requires.qmark_paramstyle + def test_raw_qmark(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (?, ?)", + (1, "jack"), + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (?, ?)", + (2, "fred"), + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (?, ?)", + [(3, "ed"), (4, "horse")], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (?, ?)", + [(5, "barney"), (6, "donkey")], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (?, ?)", + (7, "sally"), + ) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "fred"), + (3, "ed"), + (4, "horse"), + (5, "barney"), + (6, "donkey"), + (7, "sally"), + ] + + res = conn.exec_driver_sql( + "select * from users where user_name=?", ("jack",) + ) + assert res.fetchall() == [(1, "jack")] + + @testing.requires.format_paramstyle + def test_raw_sprintf(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (%s, %s)", + (1, "jack"), + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (%s, %s)", + [(2, "ed"), (3, "horse")], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (%s, %s)", + (4, "sally"), + ) + conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,)) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + (5, None), + ] + + res = conn.exec_driver_sql( + "select * from users where user_name=%s", ("jack",) + ) + assert res.fetchall() == [(1, "jack")] + + @testing.requires.pyformat_paramstyle + def test_raw_python(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + {"id": 1, "name": "jack"}, + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " + "values (%(id)s, %(name)s)", + dict(id=4, name="sally"), + ) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + ] + + @testing.requires.named_paramstyle + def test_raw_named(self, connection): + conn = connection + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (:id, :name)", + {"id": 1, "name": "jack"}, + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (:id, :name)", + [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}], + ) + conn.exec_driver_sql( + "insert into users (user_id, user_name) " "values (:id, :name)", + {"id": 4, "name": "sally"}, + ) + res = conn.exec_driver_sql("select * from users order by user_id") + assert res.fetchall() == [ + (1, "jack"), + (2, "ed"), + (3, "horse"), + (4, "sally"), + ] @testing.engines.close_open_connections def test_exception_wrapping_dbapi(self): conn = testing.db.connect() - for _c in testing.db, conn: - assert_raises_message( - tsa.exc.DBAPIError, - r"not_a_valid_statement", - _c.execute, - "not_a_valid_statement", - ) + # engine does not have exec_driver_sql + assert_raises_message( + tsa.exc.DBAPIError, + r"not_a_valid_statement", + conn.exec_driver_sql, + "not_a_valid_statement", + ) @testing.requires.sqlite def test_exception_wrapping_non_dbapi_error(self): @@ -334,7 +273,10 @@ class ExecuteTest(fixtures.TestBase): ) assert_raises_message( - TypeError, "I'm not a DBAPI error", c.execute, "select " + TypeError, + "I'm not a DBAPI error", + c.exec_driver_sql, + "select ", ) eq_(is_disconnect.call_count, 0) @@ -359,7 +301,7 @@ class ExecuteTest(fixtures.TestBase): ): with testing.db.connect() as conn: assert_raises( - tsa.exc.OperationalError, conn.execute, "select 1" + tsa.exc.OperationalError, conn.exec_driver_sql, "select 1" ) def test_exception_wrapping_non_dbapi_statement(self): @@ -378,11 +320,8 @@ class ExecuteTest(fixtures.TestBase): ) _go(testing.db) - conn = testing.db.connect() - try: + with testing.db.connect() as conn: _go(conn) - finally: - conn.close() def test_not_an_executable(self): for obj in ( @@ -574,6 +513,7 @@ class ExecuteTest(fixtures.TestBase): def test_empty_insert(self): """test that execute() interprets [] as a list with no params""" + users_autoinc = self.tables.users_autoinc testing.db.execute( users_autoinc.insert().values(user_name=bindparam("name", None)), @@ -885,7 +825,7 @@ class CompiledCacheTest(fixtures.TestBase): cached_conn.execute(ins, {"user_name": "u3"}) eq_(compile_mock.call_count, 1) assert len(cache) == 1 - eq_(conn.execute("select count(*) from users").scalar(), 3) + eq_(conn.exec_driver_sql("select count(*) from users").scalar(), 3) @testing.only_on( ["sqlite", "mysql", "postgresql"], @@ -924,7 +864,7 @@ class CompiledCacheTest(fixtures.TestBase): cached_conn.execute(ins, {"photo_blob": blob}) eq_(compile_mock.call_count, 1) eq_(len(cache), 1) - eq_(conn.execute("select count(*) from photo").scalar(), 1) + eq_(conn.exec_driver_sql("select count(*) from photo").scalar(), 1) del blob @@ -1427,7 +1367,7 @@ class EngineEventsTest(fixtures.TestBase): with e1.connect() as conn: - result = conn.execute(stmt) + result = conn.exec_driver_sql(stmt) ctx = result.context eq_( @@ -1496,7 +1436,7 @@ class EngineEventsTest(fixtures.TestBase): t1.insert().execute(c1=5, c2="some data") t1.insert().execute(c1=6) eq_( - engine.execute("select * from t1").fetchall(), + engine.execute(text("select * from t1")).fetchall(), [(5, "some data"), (6, "foo")], ) finally: @@ -1987,7 +1927,7 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as conn: try: - conn.execute("SELECT FOO FROM I_DONT_EXIST") + conn.exec_driver_sql("SELECT FOO FROM I_DONT_EXIST") assert False except tsa.exc.DBAPIError as e: ctx = canary.mock_calls[0][1][0] @@ -2018,20 +1958,20 @@ class HandleErrorTest(fixtures.TestBase): assert_raises_message( MyException, "my exception", - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR ONE' FROM I_DONT_EXIST", ) # case 2: return the DBAPI exception we're given; # no wrapping should occur assert_raises( conn.dialect.dbapi.Error, - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR TWO' FROM I_DONT_EXIST", ) # case 3: normal wrapping assert_raises( tsa.exc.DBAPIError, - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR THREE' FROM I_DONT_EXIST", ) @@ -2080,7 +2020,7 @@ class HandleErrorTest(fixtures.TestBase): assert_raises_message( MyException2, "my exception chained", - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR ONE' FROM I_DONT_EXIST", ) eq_(patched.call_count, 1) @@ -2090,7 +2030,7 @@ class HandleErrorTest(fixtures.TestBase): ) as patched: assert_raises( MyException1, - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR TWO' FROM I_DONT_EXIST", ) eq_(patched.call_count, 1) @@ -2102,7 +2042,7 @@ class HandleErrorTest(fixtures.TestBase): # by err2 assert_raises( MyException1, - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR THREE' FROM I_DONT_EXIST", ) eq_(patched.call_count, 1) @@ -2112,7 +2052,7 @@ class HandleErrorTest(fixtures.TestBase): ) as patched: assert_raises( tsa.exc.DBAPIError, - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR FIVE' FROM I_DONT_EXIST", ) eq_(patched.call_count, 1) @@ -2123,7 +2063,7 @@ class HandleErrorTest(fixtures.TestBase): assert_raises_message( MyException3, "my exception short circuit", - conn.execute, + conn.exec_driver_sql, "SELECT 'ERROR FOUR' FROM I_DONT_EXIST", ) eq_(patched.call_count, 1) @@ -2145,7 +2085,7 @@ class HandleErrorTest(fixtures.TestBase): assert_raises_message( tsa.exc.OperationalError, "rollback failed", - conn.execute, + conn.exec_driver_sql, "insert into i_dont_exist (x) values ('y')", ) @@ -2200,7 +2140,10 @@ class HandleErrorTest(fixtures.TestBase): ) assert_raises_message( - TypeError, "I'm not a DBAPI error", c.execute, "select " + TypeError, + "I'm not a DBAPI error", + c.exec_driver_sql, + "select ", ) ctx = listener.mock_calls[0][1][0] eq_(ctx.statement, "select ") @@ -2223,13 +2166,17 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as conn: assert_raises( tsa.exc.DBAPIError, - conn.execution_options(skip_user_error_events=True).execute, + conn.execution_options( + skip_user_error_events=True + ).exec_driver_sql, "SELECT ERROR_ONE FROM I_DONT_EXIST", ) assert_raises( MyException1, - conn.execution_options(skip_user_error_events=False).execute, + conn.execution_options( + skip_user_error_events=False + ).exec_driver_sql, "SELECT ERROR_ONE FROM I_DONT_EXIST", ) @@ -2246,7 +2193,7 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as c: try: - c.execute("SELECT x FROM nonexistent") + c.exec_driver_sql("SELECT x FROM nonexistent") assert False except tsa.exc.StatementError as st: eq_(st.connection_invalidated, evt_value) @@ -2287,7 +2234,7 @@ class HandleErrorTest(fixtures.TestBase): with engine.connect() as c: target_crec = c.connection._connection_record try: - c.execute("SELECT x FROM nonexistent") + c.exec_driver_sql("SELECT x FROM nonexistent") assert False except tsa.exc.StatementError as st: eq_(st.connection_invalidated, True) @@ -2644,7 +2591,9 @@ class DialectEventTest(fixtures.TestBase): def _test_do_execute(self, retval): with self._run_test(retval) as (conn, m1): - result = conn.execute("insert into table foo", {"foo": "bar"}) + result = conn.exec_driver_sql( + "insert into table foo", {"foo": "bar"} + ) self._assert( retval, m1.do_execute, @@ -2661,7 +2610,7 @@ class DialectEventTest(fixtures.TestBase): def _test_do_executemany(self, retval): with self._run_test(retval) as (conn, m1): - result = conn.execute( + result = conn.exec_driver_sql( "insert into table foo", [{"foo": "bar"}, {"foo": "bar"}] ) self._assert( @@ -2680,9 +2629,9 @@ class DialectEventTest(fixtures.TestBase): def _test_do_execute_no_params(self, retval): with self._run_test(retval) as (conn, m1): - result = conn.execution_options(no_parameters=True).execute( - "insert into table foo" - ) + result = conn.execution_options( + no_parameters=True + ).exec_driver_sql("insert into table foo") self._assert( retval, m1.do_execute_no_params, @@ -2840,7 +2789,7 @@ class AutocommitTextTest(fixtures.TestBase): engine.dialect.dbapi = dbapi with engine.connect() as conn: - conn.execute("%s something table something" % keyword) + conn.exec_driver_sql("%s something table something" % keyword) if expected: eq_(dbapi.connect().mock_calls, [call.cursor(), call.commit()]) |
