summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py463
1 files changed, 206 insertions, 257 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 150038a40..0b5b1b16d 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -50,27 +50,22 @@ from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.util import picklers
-users, metadata, users_autoinc = None, None, None
-
-
class SomeException(Exception):
pass
-class ExecuteTest(fixtures.TestBase):
+class ExecuteTest(fixtures.TablesTest):
__backend__ = True
@classmethod
- def setup_class(cls):
- global users, users_autoinc, metadata
- metadata = MetaData(testing.db)
- users = Table(
+ def define_tables(cls, metadata):
+ Table(
"users",
metadata,
Column("user_id", INT, primary_key=True, autoincrement=False),
Column("user_name", VARCHAR(20)),
)
- users_autoinc = Table(
+ Table(
"users_autoinc",
metadata,
Column(
@@ -78,15 +73,6 @@ class ExecuteTest(fixtures.TestBase):
),
Column("user_name", VARCHAR(20)),
)
- metadata.create_all()
-
- @engines.close_first
- def teardown(self):
- testing.db.execute(users.delete())
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
@testing.fails_on(
"postgresql+pg8000",
@@ -101,223 +87,176 @@ class ExecuteTest(fixtures.TestBase):
)
conn = testing.db.connect()
- result = conn.execution_options(no_parameters=True).scalar(stmt)
+ result = (
+ conn.execution_options(no_parameters=True)
+ .exec_driver_sql(stmt)
+ .scalar()
+ )
eq_(result, "%")
- @testing.fails_on_everything_except(
- "firebird", "sqlite", "+pyodbc", "+mxodbc", "mysql+oursql"
- )
- def test_raw_qmark(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- (1, "jack"),
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- [2, "fred"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- [3, "ed"],
- [4, "horse"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- (5, "barney"),
- (6, "donkey"),
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- 7,
- "sally",
- )
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "fred"),
- (3, "ed"),
- (4, "horse"),
- (5, "barney"),
- (6, "donkey"),
- (7, "sally"),
- ]
- for multiparam, param in [
- (("jack", "fred"), {}),
- ((["jack", "fred"],), {}),
- ]:
- res = conn.execute(
- "select * from users where user_name=? or "
- "user_name=? order by user_id",
- *multiparam,
- **param
- )
- assert res.fetchall() == [(1, "jack"), (2, "fred")]
- res = conn.execute("select * from users where user_name=?", "jack")
- assert res.fetchall() == [(1, "jack")]
- conn.execute("delete from users")
-
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
-
- # some psycopg2 versions bomb this.
- @testing.fails_on_everything_except(
- "mysql+mysqldb",
- "mysql+pymysql",
- "mysql+cymysql",
- "mysql+mysqlconnector",
- "postgresql",
- )
- def test_raw_sprintf(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- [1, "jack"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- [2, "ed"],
- [3, "horse"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- 4,
- "sally",
- )
- conn.execute("insert into users (user_id) values (%s)", 5)
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- (5, None),
- ]
- for multiparam, param in [
- (("jack", "ed"), {}),
- ((["jack", "ed"],), {}),
- ]:
- res = conn.execute(
- "select * from users where user_name=%s or "
- "user_name=%s order by user_id",
- *multiparam,
- **param
- )
- assert res.fetchall() == [(1, "jack"), (2, "ed")]
- res = conn.execute(
- "select * from users where user_name=%s", "jack"
- )
- assert res.fetchall() == [(1, "jack")]
-
- conn.execute("delete from users")
-
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
-
- # pyformat is supported for mysql, but skipping because a few driver
- # versions have a bug that bombs out on this test. (1.2.2b3,
- # 1.2.2c1, 1.2.2)
-
- @testing.skip_if(lambda: testing.against("mysql+mysqldb"), "db-api flaky")
- @testing.fails_on_everything_except(
- "postgresql+psycopg2",
- "postgresql+psycopg2cffi",
- "postgresql+pypostgresql",
- "postgresql+pygresql",
- "mysql+mysqlconnector",
- "mysql+pymysql",
- "mysql+cymysql",
- "mssql+pymssql",
- )
- def test_raw_python(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- {"id": 1, "name": "jack"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- {"id": 2, "name": "ed"},
- {"id": 3, "name": "horse"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- id=4,
- name="sally",
- )
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- ]
- conn.execute("delete from users")
+ def test_raw_positional_invalid(self, connection):
+ assert_raises_message(
+ tsa.exc.ArgumentError,
+ "List argument must consist only of tuples or dictionaries",
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [2, "fred"],
+ )
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
+ assert_raises_message(
+ tsa.exc.ArgumentError,
+ "List argument must consist only of tuples or dictionaries",
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [[3, "ed"], [4, "horse"]],
+ )
- @testing.fails_on_everything_except("sqlite", "oracle+cx_oracle")
- def test_raw_named(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- {"id": 1, "name": "jack"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- {"id": 2, "name": "ed"},
- {"id": 3, "name": "horse"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- id=4,
- name="sally",
- )
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- ]
- conn.execute("delete from users")
+ def test_raw_named_invalid(self, connection):
+ assert_raises(
+ TypeError,
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 2, "name": "ed"},
+ {"id": 3, "name": "horse"},
+ )
+ assert_raises(
+ TypeError,
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ id=4,
+ name="sally",
+ )
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
+ @testing.requires.qmark_paramstyle
+ def test_raw_qmark(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (1, "jack"),
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (2, "fred"),
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [(3, "ed"), (4, "horse")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [(5, "barney"), (6, "donkey")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (7, "sally"),
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "fred"),
+ (3, "ed"),
+ (4, "horse"),
+ (5, "barney"),
+ (6, "donkey"),
+ (7, "sally"),
+ ]
+
+ res = conn.exec_driver_sql(
+ "select * from users where user_name=?", ("jack",)
+ )
+ assert res.fetchall() == [(1, "jack")]
+
+ @testing.requires.format_paramstyle
+ def test_raw_sprintf(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ (1, "jack"),
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ [(2, "ed"), (3, "horse")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ (4, "sally"),
+ )
+ conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,))
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ (5, None),
+ ]
+
+ res = conn.exec_driver_sql(
+ "select * from users where user_name=%s", ("jack",)
+ )
+ assert res.fetchall() == [(1, "jack")]
+
+ @testing.requires.pyformat_paramstyle
+ def test_raw_python(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 1, "name": "jack"},
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ dict(id=4, name="sally"),
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
+
+ @testing.requires.named_paramstyle
+ def test_raw_named(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (:id, :name)",
+ {"id": 1, "name": "jack"},
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (:id, :name)",
+ [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (:id, :name)",
+ {"id": 4, "name": "sally"},
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
@testing.engines.close_open_connections
def test_exception_wrapping_dbapi(self):
conn = testing.db.connect()
- for _c in testing.db, conn:
- assert_raises_message(
- tsa.exc.DBAPIError,
- r"not_a_valid_statement",
- _c.execute,
- "not_a_valid_statement",
- )
+ # engine does not have exec_driver_sql
+ assert_raises_message(
+ tsa.exc.DBAPIError,
+ r"not_a_valid_statement",
+ conn.exec_driver_sql,
+ "not_a_valid_statement",
+ )
@testing.requires.sqlite
def test_exception_wrapping_non_dbapi_error(self):
@@ -334,7 +273,10 @@ class ExecuteTest(fixtures.TestBase):
)
assert_raises_message(
- TypeError, "I'm not a DBAPI error", c.execute, "select "
+ TypeError,
+ "I'm not a DBAPI error",
+ c.exec_driver_sql,
+ "select ",
)
eq_(is_disconnect.call_count, 0)
@@ -359,7 +301,7 @@ class ExecuteTest(fixtures.TestBase):
):
with testing.db.connect() as conn:
assert_raises(
- tsa.exc.OperationalError, conn.execute, "select 1"
+ tsa.exc.OperationalError, conn.exec_driver_sql, "select 1"
)
def test_exception_wrapping_non_dbapi_statement(self):
@@ -378,11 +320,8 @@ class ExecuteTest(fixtures.TestBase):
)
_go(testing.db)
- conn = testing.db.connect()
- try:
+ with testing.db.connect() as conn:
_go(conn)
- finally:
- conn.close()
def test_not_an_executable(self):
for obj in (
@@ -574,6 +513,7 @@ class ExecuteTest(fixtures.TestBase):
def test_empty_insert(self):
"""test that execute() interprets [] as a list with no params"""
+ users_autoinc = self.tables.users_autoinc
testing.db.execute(
users_autoinc.insert().values(user_name=bindparam("name", None)),
@@ -885,7 +825,7 @@ class CompiledCacheTest(fixtures.TestBase):
cached_conn.execute(ins, {"user_name": "u3"})
eq_(compile_mock.call_count, 1)
assert len(cache) == 1
- eq_(conn.execute("select count(*) from users").scalar(), 3)
+ eq_(conn.exec_driver_sql("select count(*) from users").scalar(), 3)
@testing.only_on(
["sqlite", "mysql", "postgresql"],
@@ -924,7 +864,7 @@ class CompiledCacheTest(fixtures.TestBase):
cached_conn.execute(ins, {"photo_blob": blob})
eq_(compile_mock.call_count, 1)
eq_(len(cache), 1)
- eq_(conn.execute("select count(*) from photo").scalar(), 1)
+ eq_(conn.exec_driver_sql("select count(*) from photo").scalar(), 1)
del blob
@@ -1427,7 +1367,7 @@ class EngineEventsTest(fixtures.TestBase):
with e1.connect() as conn:
- result = conn.execute(stmt)
+ result = conn.exec_driver_sql(stmt)
ctx = result.context
eq_(
@@ -1496,7 +1436,7 @@ class EngineEventsTest(fixtures.TestBase):
t1.insert().execute(c1=5, c2="some data")
t1.insert().execute(c1=6)
eq_(
- engine.execute("select * from t1").fetchall(),
+ engine.execute(text("select * from t1")).fetchall(),
[(5, "some data"), (6, "foo")],
)
finally:
@@ -1987,7 +1927,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as conn:
try:
- conn.execute("SELECT FOO FROM I_DONT_EXIST")
+ conn.exec_driver_sql("SELECT FOO FROM I_DONT_EXIST")
assert False
except tsa.exc.DBAPIError as e:
ctx = canary.mock_calls[0][1][0]
@@ -2018,20 +1958,20 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
MyException,
"my exception",
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR ONE' FROM I_DONT_EXIST",
)
# case 2: return the DBAPI exception we're given;
# no wrapping should occur
assert_raises(
conn.dialect.dbapi.Error,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR TWO' FROM I_DONT_EXIST",
)
# case 3: normal wrapping
assert_raises(
tsa.exc.DBAPIError,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR THREE' FROM I_DONT_EXIST",
)
@@ -2080,7 +2020,7 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
MyException2,
"my exception chained",
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR ONE' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2090,7 +2030,7 @@ class HandleErrorTest(fixtures.TestBase):
) as patched:
assert_raises(
MyException1,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR TWO' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2102,7 +2042,7 @@ class HandleErrorTest(fixtures.TestBase):
# by err2
assert_raises(
MyException1,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR THREE' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2112,7 +2052,7 @@ class HandleErrorTest(fixtures.TestBase):
) as patched:
assert_raises(
tsa.exc.DBAPIError,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR FIVE' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2123,7 +2063,7 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
MyException3,
"my exception short circuit",
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR FOUR' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2145,7 +2085,7 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
tsa.exc.OperationalError,
"rollback failed",
- conn.execute,
+ conn.exec_driver_sql,
"insert into i_dont_exist (x) values ('y')",
)
@@ -2200,7 +2140,10 @@ class HandleErrorTest(fixtures.TestBase):
)
assert_raises_message(
- TypeError, "I'm not a DBAPI error", c.execute, "select "
+ TypeError,
+ "I'm not a DBAPI error",
+ c.exec_driver_sql,
+ "select ",
)
ctx = listener.mock_calls[0][1][0]
eq_(ctx.statement, "select ")
@@ -2223,13 +2166,17 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as conn:
assert_raises(
tsa.exc.DBAPIError,
- conn.execution_options(skip_user_error_events=True).execute,
+ conn.execution_options(
+ skip_user_error_events=True
+ ).exec_driver_sql,
"SELECT ERROR_ONE FROM I_DONT_EXIST",
)
assert_raises(
MyException1,
- conn.execution_options(skip_user_error_events=False).execute,
+ conn.execution_options(
+ skip_user_error_events=False
+ ).exec_driver_sql,
"SELECT ERROR_ONE FROM I_DONT_EXIST",
)
@@ -2246,7 +2193,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as c:
try:
- c.execute("SELECT x FROM nonexistent")
+ c.exec_driver_sql("SELECT x FROM nonexistent")
assert False
except tsa.exc.StatementError as st:
eq_(st.connection_invalidated, evt_value)
@@ -2287,7 +2234,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as c:
target_crec = c.connection._connection_record
try:
- c.execute("SELECT x FROM nonexistent")
+ c.exec_driver_sql("SELECT x FROM nonexistent")
assert False
except tsa.exc.StatementError as st:
eq_(st.connection_invalidated, True)
@@ -2644,7 +2591,9 @@ class DialectEventTest(fixtures.TestBase):
def _test_do_execute(self, retval):
with self._run_test(retval) as (conn, m1):
- result = conn.execute("insert into table foo", {"foo": "bar"})
+ result = conn.exec_driver_sql(
+ "insert into table foo", {"foo": "bar"}
+ )
self._assert(
retval,
m1.do_execute,
@@ -2661,7 +2610,7 @@ class DialectEventTest(fixtures.TestBase):
def _test_do_executemany(self, retval):
with self._run_test(retval) as (conn, m1):
- result = conn.execute(
+ result = conn.exec_driver_sql(
"insert into table foo", [{"foo": "bar"}, {"foo": "bar"}]
)
self._assert(
@@ -2680,9 +2629,9 @@ class DialectEventTest(fixtures.TestBase):
def _test_do_execute_no_params(self, retval):
with self._run_test(retval) as (conn, m1):
- result = conn.execution_options(no_parameters=True).execute(
- "insert into table foo"
- )
+ result = conn.execution_options(
+ no_parameters=True
+ ).exec_driver_sql("insert into table foo")
self._assert(
retval,
m1.do_execute_no_params,
@@ -2840,7 +2789,7 @@ class AutocommitTextTest(fixtures.TestBase):
engine.dialect.dbapi = dbapi
with engine.connect() as conn:
- conn.execute("%s something table something" % keyword)
+ conn.exec_driver_sql("%s something table something" % keyword)
if expected:
eq_(dbapi.connect().mock_calls, [call.cursor(), call.commit()])