summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2020-03-22 15:04:59 +0000
committerGerrit Code Review <gerrit@bbpush.zzzcomputing.com>2020-03-22 15:04:59 +0000
commit7ff3e3f2e73e7f17c0a352dacf5c0ccfa2ef7be9 (patch)
tree553fd8f751a75dc5d614c32065817f8da6206968 /test
parentf2a817dd7cde50988839750b9c2464675fb4f069 (diff)
parent9ec75882203b2c01aa1d362f939e21ebcd188e8d (diff)
downloadsqlalchemy-7ff3e3f2e73e7f17c0a352dacf5c0ccfa2ef7be9.tar.gz
Merge "Deprecate plain string in execute and introduce `exec_driver_sql`"
Diffstat (limited to 'test')
-rw-r--r--test/aaa_profiling/test_resultset.py38
-rw-r--r--test/dialect/mssql/test_engine.py26
-rw-r--r--test/dialect/mssql/test_query.py26
-rw-r--r--test/dialect/mssql/test_reflection.py59
-rw-r--r--test/dialect/mssql/test_types.py4
-rw-r--r--test/dialect/mysql/test_dialect.py8
-rw-r--r--test/dialect/mysql/test_for_update.py6
-rw-r--r--test/dialect/mysql/test_reflection.py51
-rw-r--r--test/dialect/mysql/test_types.py4
-rw-r--r--test/dialect/oracle/test_dialect.py60
-rw-r--r--test/dialect/oracle/test_reflection.py74
-rw-r--r--test/dialect/oracle/test_types.py59
-rw-r--r--test/dialect/postgresql/test_dialect.py24
-rw-r--r--test/dialect/postgresql/test_reflection.py111
-rw-r--r--test/dialect/test_firebird.py68
-rw-r--r--test/dialect/test_sqlite.py180
-rw-r--r--test/engine/test_bind.py5
-rw-r--r--test/engine/test_deprecations.py299
-rw-r--r--test/engine/test_execute.py463
-rw-r--r--test/engine/test_logging.py44
-rw-r--r--test/engine/test_reconnect.py8
-rw-r--r--test/engine/test_reflection.py24
-rw-r--r--test/engine/test_transaction.py99
-rw-r--r--test/ext/test_horizontal_shard.py2
-rw-r--r--test/orm/test_bind.py10
-rw-r--r--test/orm/test_session.py51
-rw-r--r--test/orm/test_transaction.py14
-rw-r--r--test/perf/invalidate_stresstest.py6
-rw-r--r--test/requirements.py74
-rw-r--r--test/sql/test_defaults.py12
-rw-r--r--test/sql/test_from_linter.py2
-rw-r--r--test/sql/test_quote.py30
-rw-r--r--test/sql/test_resultset.py47
-rw-r--r--test/sql/test_returning.py4
-rw-r--r--test/sql/test_type_expressions.py8
-rw-r--r--test/sql/test_types.py58
36 files changed, 1291 insertions, 767 deletions
diff --git a/test/aaa_profiling/test_resultset.py b/test/aaa_profiling/test_resultset.py
index 73a1a8b6f..fee4daeb2 100644
--- a/test/aaa_profiling/test_resultset.py
+++ b/test/aaa_profiling/test_resultset.py
@@ -69,16 +69,17 @@ class ResultSetTest(fixtures.TestBase, AssertsExecutionResults):
)
# warm up type caches
- t.select().execute().fetchall()
- t2.select().execute().fetchall()
- testing.db.execute(
- "SELECT %s FROM table1"
- % (", ".join("field%d" % fnum for fnum in range(NUM_FIELDS)))
- ).fetchall()
- testing.db.execute(
- "SELECT %s FROM table2"
- % (", ".join("field%d" % fnum for fnum in range(NUM_FIELDS)))
- ).fetchall()
+ with testing.db.connect() as conn:
+ conn.execute(t.select()).fetchall()
+ conn.execute(t2.select()).fetchall()
+ conn.exec_driver_sql(
+ "SELECT %s FROM table1"
+ % (", ".join("field%d" % fnum for fnum in range(NUM_FIELDS)))
+ ).fetchall()
+ conn.exec_driver_sql(
+ "SELECT %s FROM table2"
+ % (", ".join("field%d" % fnum for fnum in range(NUM_FIELDS)))
+ ).fetchall()
def teardown(self):
metadata.drop_all()
@@ -96,14 +97,16 @@ class ResultSetTest(fixtures.TestBase, AssertsExecutionResults):
stmt = "SELECT %s FROM table1" % (
", ".join("field%d" % fnum for fnum in range(NUM_FIELDS))
)
- [tuple(row) for row in testing.db.execute(stmt).fetchall()]
+ with testing.db.connect() as conn:
+ [tuple(row) for row in conn.exec_driver_sql(stmt).fetchall()]
@profiling.function_call_count(variance=0.10)
def test_raw_unicode(self):
stmt = "SELECT %s FROM table2" % (
", ".join("field%d" % fnum for fnum in range(NUM_FIELDS))
)
- [tuple(row) for row in testing.db.execute(stmt).fetchall()]
+ with testing.db.connect() as conn:
+ [tuple(row) for row in conn.exec_driver_sql(stmt).fetchall()]
def test_contains_doesnt_compile(self):
row = t.select().execute().first()
@@ -126,11 +129,11 @@ class ExecutionTest(fixtures.TestBase):
e = create_engine("sqlite://")
c = e.connect()
# ensure initial connect activities complete
- c.execute("select 1")
+ c.exec_driver_sql("select 1")
@profiling.function_call_count()
def go():
- c.execute("select 1")
+ c.exec_driver_sql("select 1")
try:
go()
@@ -141,11 +144,14 @@ class ExecutionTest(fixtures.TestBase):
# create an engine without any instrumentation.
e = create_engine("sqlite://")
# ensure initial connect activities complete
- e.execute("select 1")
+
+ with e.connect() as conn:
+ conn.exec_driver_sql("select 1")
@profiling.function_call_count()
def go():
- e.execute("select 1")
+ with e.connect() as conn:
+ conn.exec_driver_sql("select 1")
go()
diff --git a/test/dialect/mssql/test_engine.py b/test/dialect/mssql/test_engine.py
index 257e41bf8..97e924fed 100644
--- a/test/dialect/mssql/test_engine.py
+++ b/test/dialect/mssql/test_engine.py
@@ -400,7 +400,15 @@ class FastExecutemanyTest(fixtures.TestBase):
class VersionDetectionTest(fixtures.TestBase):
- def test_pymssql_version(self):
+ @testing.fixture
+ def mock_conn_scalar(self):
+ return lambda text: Mock(
+ exec_driver_sql=Mock(
+ return_value=Mock(scalar=Mock(return_value=text))
+ )
+ )
+
+ def test_pymssql_version(self, mock_conn_scalar):
dialect = pymssql.MSDialect_pymssql()
for vers in [
@@ -410,13 +418,13 @@ class VersionDetectionTest(fixtures.TestBase):
"Microsoft SQL Azure (RTM) - 11.0.9216.62 \n"
"Jul 18 2014 22:00:21 \nCopyright (c) Microsoft Corporation",
]:
- conn = Mock(scalar=Mock(return_value=vers))
+ conn = mock_conn_scalar(vers)
eq_(dialect._get_server_version_info(conn), (11, 0, 9216, 62))
- def test_pyodbc_version_productversion(self):
+ def test_pyodbc_version_productversion(self, mock_conn_scalar):
dialect = pyodbc.MSDialect_pyodbc()
- conn = Mock(scalar=Mock(return_value="11.0.9216.62"))
+ conn = mock_conn_scalar("11.0.9216.62")
eq_(dialect._get_server_version_info(conn), (11, 0, 9216, 62))
def test_pyodbc_version_fallback(self):
@@ -429,8 +437,12 @@ class VersionDetectionTest(fixtures.TestBase):
("Not SQL Server Version 10.5", (5,)),
]:
conn = Mock(
- scalar=Mock(
- side_effect=exc.DBAPIError("stmt", "params", None)
+ exec_driver_sql=Mock(
+ return_value=Mock(
+ scalar=Mock(
+ side_effect=exc.DBAPIError("stmt", "params", None)
+ )
+ )
),
connection=Mock(getinfo=Mock(return_value=vers)),
)
@@ -462,7 +474,7 @@ class RealIsolationLevelTest(fixtures.TestBase):
with testing.db.connect() as c:
c.execution_options(isolation_level=value)
- c.execute("SELECT TOP 10 * FROM test")
+ c.exec_driver_sql("SELECT TOP 10 * FROM test")
eq_(
testing.db.dialect.get_isolation_level(c.connection), value
diff --git a/test/dialect/mssql/test_query.py b/test/dialect/mssql/test_query.py
index fc3352d4e..2ad6c161d 100644
--- a/test/dialect/mssql/test_query.py
+++ b/test/dialect/mssql/test_query.py
@@ -325,7 +325,7 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
)
meta.create_all()
con = testing.db.connect()
- con.execute(
+ con.exec_driver_sql(
"""create trigger paj on t1 for insert as
insert into t2 (descr) select descr from inserted"""
)
@@ -339,7 +339,7 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
finally:
tr.commit()
- con.execute("""drop trigger paj""")
+ con.exec_driver_sql("""drop trigger paj""")
meta.drop_all()
@testing.provide_metadata
@@ -431,11 +431,11 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
)
meta.bind = eng
con = eng.connect()
- con.execute("create schema paj")
+ con.exec_driver_sql("create schema paj")
@event.listens_for(meta, "after_drop")
def cleanup(target, connection, **kw):
- connection.execute("drop schema paj")
+ connection.exec_driver_sql("drop schema paj")
tbl = Table(
"test", meta, Column("id", Integer, primary_key=True), schema="paj"
@@ -450,11 +450,11 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
eng = engines.testing_engine(options=dict(legacy_schema_aliasing=True))
meta.bind = eng
con = eng.connect()
- con.execute("create schema paj")
+ con.exec_driver_sql("create schema paj")
@event.listens_for(meta, "after_drop")
def cleanup(target, connection, **kw):
- connection.execute("drop schema paj")
+ connection.exec_driver_sql("drop schema paj")
tbl = Table(
"test", meta, Column("id", Integer, primary_key=True), schema="paj"
@@ -489,11 +489,11 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
)
meta.bind = eng
con = eng.connect()
- con.execute("create schema paj")
+ con.exec_driver_sql("create schema paj")
@event.listens_for(meta, "after_drop")
def cleanup(target, connection, **kw):
- connection.execute("drop schema paj")
+ connection.exec_driver_sql("drop schema paj")
tbl = Table(
"test", meta, Column("id", Integer, primary_key=True), schema="paj"
@@ -510,11 +510,11 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
eng = engines.testing_engine(options=dict(legacy_schema_aliasing=True))
meta.bind = eng
con = eng.connect()
- con.execute("create schema paj")
+ con.exec_driver_sql("create schema paj")
@event.listens_for(meta, "after_drop")
def cleanup(target, connection, **kw):
- connection.execute("drop schema paj")
+ connection.exec_driver_sql("drop schema paj")
tbl = Table(
"test", meta, Column("id", Integer, primary_key=True), schema="paj"
@@ -548,7 +548,9 @@ def full_text_search_missing():
try:
connection = testing.db.connect()
try:
- connection.execute("CREATE FULLTEXT CATALOG Catalog AS " "DEFAULT")
+ connection.exec_driver_sql(
+ "CREATE FULLTEXT CATALOG Catalog AS " "DEFAULT"
+ )
return False
except Exception:
return True
@@ -621,7 +623,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
def teardown_class(cls):
metadata.drop_all()
connection = testing.db.connect()
- connection.execute("DROP FULLTEXT CATALOG Catalog")
+ connection.exec_driver_sql("DROP FULLTEXT CATALOG Catalog")
connection.close()
def test_expression(self):
diff --git a/test/dialect/mssql/test_reflection.py b/test/dialect/mssql/test_reflection.py
index 352bc637e..5328513e4 100644
--- a/test/dialect/mssql/test_reflection.py
+++ b/test/dialect/mssql/test_reflection.py
@@ -144,11 +144,10 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
@testing.provide_metadata
def test_skip_types(self):
metadata = self.metadata
- testing.db.execute(
- """
- create table foo (id integer primary key, data xml)
- """
- )
+ with testing.db.connect() as c:
+ c.exec_driver_sql(
+ "create table foo (id integer primary key, data xml)"
+ )
with mock.patch.object(
testing.db.dialect, "ischema_names", {"int": mssql.INTEGER}
):
@@ -236,8 +235,9 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
)
metadata.create_all()
- dbname = testing.db.scalar("select db_name()")
- owner = testing.db.scalar("SELECT user_name()")
+ with testing.db.connect() as c:
+ dbname = c.exec_driver_sql("select db_name()").scalar()
+ owner = c.exec_driver_sql("SELECT user_name()").scalar()
referred_schema = "%(dbname)s.%(owner)s" % {
"dbname": dbname,
"owner": owner,
@@ -439,11 +439,21 @@ class OwnerPlusDBTest(fixtures.TestBase):
schema, owner = base._owner_plus_db(dialect, identifier)
mock_connection = mock.Mock(
- dialect=dialect, scalar=mock.Mock(return_value="my_db")
+ dialect=dialect,
+ exec_driver_sql=mock.Mock(
+ return_value=mock.Mock(scalar=mock.Mock(return_value="my_db"))
+ ),
)
mock_lambda = mock.Mock()
base._switch_db(schema, mock_connection, mock_lambda, "x", y="bar")
- eq_(mock_connection.mock_calls, [mock.call.scalar("select db_name()")])
+ eq_(
+ mock_connection.mock_calls,
+ [mock.call.exec_driver_sql("select db_name()")],
+ )
+ eq_(
+ mock_connection.exec_driver_sql.return_value.mock_calls,
+ [mock.call.scalar()],
+ ),
eq_(mock_lambda.mock_calls, [mock.call("x", y="bar")])
def test_owner_database_pairs_switch_for_different_db(self):
@@ -453,17 +463,24 @@ class OwnerPlusDBTest(fixtures.TestBase):
schema, owner = base._owner_plus_db(dialect, identifier)
mock_connection = mock.Mock(
- dialect=dialect, scalar=mock.Mock(return_value="my_db")
+ dialect=dialect,
+ exec_driver_sql=mock.Mock(
+ return_value=mock.Mock(scalar=mock.Mock(return_value="my_db"))
+ ),
)
mock_lambda = mock.Mock()
base._switch_db(schema, mock_connection, mock_lambda, "x", y="bar")
eq_(
mock_connection.mock_calls,
[
- mock.call.scalar("select db_name()"),
- mock.call.execute("use my_other_db"),
- mock.call.execute("use my_db"),
+ mock.call.exec_driver_sql("select db_name()"),
+ mock.call.exec_driver_sql("use my_other_db"),
+ mock.call.exec_driver_sql("use my_db"),
],
+ eq_(
+ mock_connection.exec_driver_sql.return_value.mock_calls,
+ [mock.call.scalar()],
+ ),
)
eq_(mock_lambda.mock_calls, [mock.call("x", y="bar")])
@@ -496,7 +513,11 @@ class OwnerPlusDBTest(fixtures.TestBase):
mock_connection = mock.Mock(
dialect=dialect,
- scalar=mock.Mock(return_value="Some ] Database"),
+ exec_driver_sql=mock.Mock(
+ return_value=mock.Mock(
+ scalar=mock.Mock(return_value="Some ] Database")
+ )
+ ),
)
mock_lambda = mock.Mock()
base._switch_db(schema, mock_connection, mock_lambda, "x", y="bar")
@@ -506,9 +527,13 @@ class OwnerPlusDBTest(fixtures.TestBase):
eq_(
mock_connection.mock_calls,
[
- mock.call.scalar("select db_name()"),
- mock.call.execute(use_stmt),
- mock.call.execute("use [Some Database]"),
+ mock.call.exec_driver_sql("select db_name()"),
+ mock.call.exec_driver_sql(use_stmt),
+ mock.call.exec_driver_sql("use [Some Database]"),
],
)
+ eq_(
+ mock_connection.exec_driver_sql.return_value.mock_calls,
+ [mock.call.scalar()],
+ )
eq_(mock_lambda.mock_calls, [mock.call("x", y="bar")])
diff --git a/test/dialect/mssql/test_types.py b/test/dialect/mssql/test_types.py
index ba7ed4667..9904408a3 100644
--- a/test/dialect/mssql/test_types.py
+++ b/test/dialect/mssql/test_types.py
@@ -177,7 +177,7 @@ class RowVersionTest(fixtures.TablesTest):
with testing.db.connect() as conn:
conn.execute(t.insert().values(data="foo"))
- last_ts_1 = conn.scalar("SELECT @@DBTS")
+ last_ts_1 = conn.exec_driver_sql("SELECT @@DBTS").scalar()
if convert_int:
last_ts_1 = int(codecs.encode(last_ts_1, "hex"), 16)
@@ -187,7 +187,7 @@ class RowVersionTest(fixtures.TablesTest):
conn.execute(
t.update().values(data="bar").where(t.c.data == "foo")
)
- last_ts_2 = conn.scalar("SELECT @@DBTS")
+ last_ts_2 = conn.exec_driver_sql("SELECT @@DBTS").scalar()
if convert_int:
last_ts_2 = int(codecs.encode(last_ts_2, "hex"), 16)
diff --git a/test/dialect/mysql/test_dialect.py b/test/dialect/mysql/test_dialect.py
index aac1f6489..c641411e1 100644
--- a/test/dialect/mysql/test_dialect.py
+++ b/test/dialect/mysql/test_dialect.py
@@ -184,9 +184,9 @@ class DialectTest(fixtures.TestBase):
statement = "SELECT 1 FROM DUAL WHERE 1=0"
return real_exec(self, statement, *args, **kw)
- real_exec = engine._connection_cls._execute_text
+ real_exec = engine._connection_cls.exec_driver_sql
with mock.patch.object(
- engine._connection_cls, "_execute_text", my_execute
+ engine._connection_cls, "exec_driver_sql", my_execute
):
with expect_warnings(
"Could not retrieve SQL_MODE; please ensure the "
@@ -198,10 +198,10 @@ class DialectTest(fixtures.TestBase):
c = testing.db.connect().execution_options(
isolation_level="AUTOCOMMIT"
)
- assert c.execute("SELECT @@autocommit;").scalar()
+ assert c.exec_driver_sql("SELECT @@autocommit;").scalar()
c = c.execution_options(isolation_level="READ COMMITTED")
- assert not c.execute("SELECT @@autocommit;").scalar()
+ assert not c.exec_driver_sql("SELECT @@autocommit;").scalar()
def test_isolation_level(self):
values = [
diff --git a/test/dialect/mysql/test_for_update.py b/test/dialect/mysql/test_for_update.py
index 3537c3220..1e8df858f 100644
--- a/test/dialect/mysql/test_for_update.py
+++ b/test/dialect/mysql/test_for_update.py
@@ -60,7 +60,7 @@ class MySQLForUpdateLockingTest(fixtures.DeclarativeMappedTest):
@contextlib.contextmanager
def run_test(self):
connection = testing.db.connect()
- connection.execute("set innodb_lock_wait_timeout=1")
+ connection.exec_driver_sql("set innodb_lock_wait_timeout=1")
main_trans = connection.begin()
try:
yield Session(bind=connection)
@@ -71,7 +71,7 @@ class MySQLForUpdateLockingTest(fixtures.DeclarativeMappedTest):
def _assert_a_is_locked(self, should_be_locked):
A = self.classes.A
with testing.db.begin() as alt_trans:
- alt_trans.execute("set innodb_lock_wait_timeout=1")
+ alt_trans.exec_driver_sql("set innodb_lock_wait_timeout=1")
# set x/y > 10
try:
alt_trans.execute(update(A).values(x=15, y=19))
@@ -84,7 +84,7 @@ class MySQLForUpdateLockingTest(fixtures.DeclarativeMappedTest):
def _assert_b_is_locked(self, should_be_locked):
B = self.classes.B
with testing.db.begin() as alt_trans:
- alt_trans.execute("set innodb_lock_wait_timeout=1")
+ alt_trans.exec_driver_sql("set innodb_lock_wait_timeout=1")
# set x/y > 10
try:
alt_trans.execute(update(B).values(x=15, y=19))
diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py
index 1ab646f3c..d5ff1314b 100644
--- a/test/dialect/mysql/test_reflection.py
+++ b/test/dialect/mysql/test_reflection.py
@@ -493,12 +493,14 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL):
self.metadata.create_all()
with testing.db.connect() as conn:
- conn.execute("CREATE VIEW v1 AS SELECT * FROM x")
- conn.execute("CREATE ALGORITHM=MERGE VIEW v2 AS SELECT * FROM x")
- conn.execute(
+ conn.exec_driver_sql("CREATE VIEW v1 AS SELECT * FROM x")
+ conn.exec_driver_sql(
+ "CREATE ALGORITHM=MERGE VIEW v2 AS SELECT * FROM x"
+ )
+ conn.exec_driver_sql(
"CREATE ALGORITHM=UNDEFINED VIEW v3 AS SELECT * FROM x"
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE DEFINER=CURRENT_USER VIEW v4 AS SELECT * FROM x"
)
@@ -506,7 +508,7 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL):
def cleanup(*arg, **kw):
with testing.db.connect() as conn:
for v in ["v1", "v2", "v3", "v4"]:
- conn.execute("DROP VIEW %s" % v)
+ conn.exec_driver_sql("DROP VIEW %s" % v)
insp = inspect(testing.db)
for v in ["v1", "v2", "v3", "v4"]:
@@ -523,15 +525,17 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL):
@event.listens_for(self.metadata, "before_drop")
def cleanup(*arg, **kw):
with testing.db.connect() as conn:
- conn.execute("DROP TABLE IF EXISTS test_t1")
- conn.execute("DROP TABLE IF EXISTS test_t2")
- conn.execute("DROP VIEW IF EXISTS test_v")
+ conn.exec_driver_sql("DROP TABLE IF EXISTS test_t1")
+ conn.exec_driver_sql("DROP TABLE IF EXISTS test_t2")
+ conn.exec_driver_sql("DROP VIEW IF EXISTS test_v")
with testing.db.connect() as conn:
- conn.execute("CREATE TABLE test_t1 (id INTEGER)")
- conn.execute("CREATE TABLE test_t2 (id INTEGER)")
- conn.execute("CREATE VIEW test_v AS SELECT id FROM test_t1")
- conn.execute("DROP TABLE test_t1")
+ conn.exec_driver_sql("CREATE TABLE test_t1 (id INTEGER)")
+ conn.exec_driver_sql("CREATE TABLE test_t2 (id INTEGER)")
+ conn.exec_driver_sql(
+ "CREATE VIEW test_v AS SELECT id FROM test_t1"
+ )
+ conn.exec_driver_sql("DROP TABLE test_t1")
m = MetaData()
with expect_warnings(
@@ -567,10 +571,10 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL):
# this is ideally one table, but older MySQL versions choke
# on the multiple TIMESTAMP columns
-
- row = testing.db.execute(
- "show variables like '%%explicit_defaults_for_timestamp%%'"
- ).first()
+ with testing.db.connect() as c:
+ row = c.exec_driver_sql(
+ "show variables like '%%explicit_defaults_for_timestamp%%'"
+ ).first()
explicit_defaults_for_timestamp = row[1].lower() in ("on", "1", "true")
reflected = []
@@ -591,14 +595,15 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL):
):
Table("nn_t%d" % idx, meta) # to allow DROP
- testing.db.execute(
- """
- CREATE TABLE nn_t%d (
- %s
+ with testing.db.connect() as c:
+ c.exec_driver_sql(
+ """
+ CREATE TABLE nn_t%d (
+ %s
+ )
+ """
+ % (idx, ", \n".join(cols))
)
- """
- % (idx, ", \n".join(cols))
- )
reflected.extend(
{
diff --git a/test/dialect/mysql/test_types.py b/test/dialect/mysql/test_types.py
index ee626b082..010165ac2 100644
--- a/test/dialect/mysql/test_types.py
+++ b/test/dialect/mysql/test_types.py
@@ -697,12 +697,12 @@ class TypeRoundTripTest(fixtures.TestBase, AssertsExecutionResults):
return dt
with testing.db.begin() as conn:
- now = conn.scalar("select now()")
+ now = conn.exec_driver_sql("select now()").scalar()
conn.execute(ts_table.insert(), {"t1": now, "t2": None})
conn.execute(ts_table.insert(), {"t1": None, "t2": None})
conn.execute(ts_table.insert(), {"t2": None})
- new_now = conn.scalar("select now()")
+ new_now = conn.exec_driver_sql("select now()").scalar()
eq_(
[
diff --git a/test/dialect/oracle/test_dialect.py b/test/dialect/oracle/test_dialect.py
index 20c6336b8..c2983bfe0 100644
--- a/test/dialect/oracle/test_dialect.py
+++ b/test/dialect/oracle/test_dialect.py
@@ -331,19 +331,20 @@ class OutParamTest(fixtures.TestBase, AssertsExecutionResults):
@classmethod
def setup_class(cls):
- testing.db.execute(
- """
- create or replace procedure foo(x_in IN number, x_out OUT number,
- y_out OUT number, z_out OUT varchar) IS
- retval number;
- begin
- retval := 6;
- x_out := 10;
- y_out := x_in * 15;
- z_out := NULL;
- end;
- """
- )
+ with testing.db.connect() as c:
+ c.exec_driver_sql(
+ """
+create or replace procedure foo(x_in IN number, x_out OUT number,
+y_out OUT number, z_out OUT varchar) IS
+retval number;
+begin
+ retval := 6;
+ x_out := 10;
+ y_out := x_in * 15;
+ z_out := NULL;
+end;
+ """
+ )
def test_out_params(self):
result = testing.db.execute(
@@ -362,7 +363,7 @@ class OutParamTest(fixtures.TestBase, AssertsExecutionResults):
@classmethod
def teardown_class(cls):
- testing.db.execute("DROP PROCEDURE foo")
+ testing.db.execute(text("DROP PROCEDURE foo"))
class QuotedBindRoundTripTest(fixtures.TestBase):
@@ -511,7 +512,9 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
dialect = self._dialect((12, 2, 0))
conn = mock.Mock(
- execute=mock.Mock(return_value=mock.Mock(scalar=lambda: "12.2.0"))
+ exec_driver_sql=mock.Mock(
+ return_value=mock.Mock(scalar=lambda: "12.2.0")
+ )
)
dialect.initialize(conn)
eq_(dialect.server_version_info, (12, 2, 0))
@@ -524,7 +527,9 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
dialect = self._dialect((12, 2, 0))
conn = mock.Mock(
- execute=mock.Mock(return_value=mock.Mock(scalar=lambda: "12.2.0"))
+ exec_driver_sql=mock.Mock(
+ return_value=mock.Mock(scalar=lambda: "12.2.0")
+ )
)
dialect.initialize(conn)
eq_(dialect.server_version_info, (12, 2, 0))
@@ -540,7 +545,7 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
dialect = self._dialect((11, 2, 0))
conn = mock.Mock(
- execute=mock.Mock(return_value=mock.Mock(scalar="11.0.0"))
+ exec_driver_sql=mock.Mock(return_value=mock.Mock(scalar="11.0.0"))
)
dialect.initialize(conn)
eq_(dialect.server_version_info, (11, 2, 0))
@@ -553,7 +558,9 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
dialect = self._dialect((12, 2, 0))
conn = mock.Mock(
- execute=mock.Mock(return_value=mock.Mock(scalar=lambda: "11.0.0"))
+ exec_driver_sql=mock.Mock(
+ return_value=mock.Mock(scalar=lambda: "11.0.0")
+ )
)
dialect.initialize(conn)
eq_(dialect.server_version_info, (12, 2, 0))
@@ -571,7 +578,7 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
)
conn = mock.Mock(
- execute=mock.Mock(return_value=mock.Mock(scalar=c122))
+ exec_driver_sql=mock.Mock(return_value=mock.Mock(scalar=c122))
)
dialect.initialize(conn)
eq_(dialect.server_version_info, (12, 2, 0))
@@ -590,7 +597,7 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
return "12.thisiscrap.0"
conn = mock.Mock(
- execute=mock.Mock(return_value=mock.Mock(scalar=c122))
+ exec_driver_sql=mock.Mock(return_value=mock.Mock(scalar=c122))
)
dialect.initialize(conn)
eq_(dialect.server_version_info, (12, 2, 0))
@@ -609,12 +616,13 @@ class ExecuteTest(fixtures.TestBase):
__backend__ = True
def test_basic(self):
- eq_(
- testing.db.execute(
- "/*+ this is a comment */ SELECT 1 FROM " "DUAL"
- ).fetchall(),
- [(1,)],
- )
+ with testing.db.connect() as conn:
+ eq_(
+ conn.exec_driver_sql(
+ "/*+ this is a comment */ SELECT 1 FROM " "DUAL"
+ ).fetchall(),
+ [(1,)],
+ )
def test_sequences_are_integers(self):
seq = Sequence("foo_seq")
diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py
index 6359c5c17..3d1361adb 100644
--- a/test/dialect/oracle/test_reflection.py
+++ b/test/dialect/oracle/test_reflection.py
@@ -32,6 +32,11 @@ from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
+def exec_sql(engine, sql, *args, **kwargs):
+ with engine.connect() as conn:
+ return conn.exec_driver_sql(sql, *args, **kwargs)
+
+
class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL):
__only_on__ = "oracle"
__backend__ = True
@@ -79,7 +84,7 @@ grant references on %(test_schema)s.child to public;
% {"test_schema": testing.config.test_schema}
).split(";"):
if stmt.strip():
- testing.db.execute(stmt)
+ exec_sql(testing.db, stmt)
@classmethod
def teardown_class(cls):
@@ -97,7 +102,7 @@ drop synonym %(test_schema)s.local_table;
% {"test_schema": testing.config.test_schema}
).split(";"):
if stmt.strip():
- testing.db.execute(stmt)
+ exec_sql(testing.db, stmt)
@testing.provide_metadata
def test_create_same_names_explicit_schema(self):
@@ -221,11 +226,12 @@ drop synonym %(test_schema)s.local_table;
)
def test_reflect_local_to_remote(self):
- testing.db.execute(
+ exec_sql(
+ testing.db,
"CREATE TABLE localtable (id INTEGER "
"PRIMARY KEY, parent_id INTEGER REFERENCES "
"%(test_schema)s.parent(id))"
- % {"test_schema": testing.config.test_schema}
+ % {"test_schema": testing.config.test_schema},
)
try:
meta = MetaData(testing.db)
@@ -242,7 +248,7 @@ drop synonym %(test_schema)s.local_table;
parent.join(lcl)
).execute().fetchall()
finally:
- testing.db.execute("DROP TABLE localtable")
+ exec_sql(testing.db, "DROP TABLE localtable")
def test_reflect_alt_owner_implicit(self):
meta = MetaData(testing.db)
@@ -264,10 +270,11 @@ drop synonym %(test_schema)s.local_table;
).execute().fetchall()
def test_reflect_alt_owner_synonyms(self):
- testing.db.execute(
+ exec_sql(
+ testing.db,
"CREATE TABLE localtable (id INTEGER "
"PRIMARY KEY, parent_id INTEGER REFERENCES "
- "%s.ptable(id))" % testing.config.test_schema
+ "%s.ptable(id))" % testing.config.test_schema,
)
try:
meta = MetaData(testing.db)
@@ -286,7 +293,7 @@ drop synonym %(test_schema)s.local_table;
parent.join(lcl)
).execute().fetchall()
finally:
- testing.db.execute("DROP TABLE localtable")
+ exec_sql(testing.db, "DROP TABLE localtable")
def test_reflect_remote_synonyms(self):
meta = MetaData(testing.db)
@@ -364,18 +371,19 @@ class SystemTableTablenamesTest(fixtures.TestBase):
__backend__ = True
def setup(self):
- testing.db.execute("create table my_table (id integer)")
- testing.db.execute(
- "create global temporary table my_temp_table (id integer)"
+ exec_sql(testing.db, "create table my_table (id integer)")
+ exec_sql(
+ testing.db,
+ "create global temporary table my_temp_table (id integer)",
)
- testing.db.execute(
- "create table foo_table (id integer) tablespace SYSTEM"
+ exec_sql(
+ testing.db, "create table foo_table (id integer) tablespace SYSTEM"
)
def teardown(self):
- testing.db.execute("drop table my_temp_table")
- testing.db.execute("drop table my_table")
- testing.db.execute("drop table foo_table")
+ exec_sql(testing.db, "drop table my_temp_table")
+ exec_sql(testing.db, "drop table my_table")
+ exec_sql(testing.db, "drop table foo_table")
def test_table_names_no_system(self):
insp = inspect(testing.db)
@@ -404,7 +412,8 @@ class DontReflectIOTTest(fixtures.TestBase):
__backend__ = True
def setup(self):
- testing.db.execute(
+ exec_sql(
+ testing.db,
"""
CREATE TABLE admin_docindex(
token char(20),
@@ -416,11 +425,11 @@ class DontReflectIOTTest(fixtures.TestBase):
TABLESPACE users
PCTTHRESHOLD 20
OVERFLOW TABLESPACE users
- """
+ """,
)
def teardown(self):
- testing.db.execute("drop table admin_docindex")
+ exec_sql(testing.db, "drop table admin_docindex")
def test_reflect_all(self):
m = MetaData(testing.db)
@@ -443,8 +452,9 @@ class UnsupportedIndexReflectTest(fixtures.TestBase):
)
metadata.create_all()
- testing.db.execute(
- "CREATE INDEX DATA_IDX ON " "TEST_INDEX_REFLECT (UPPER(DATA))"
+ exec_sql(
+ testing.db,
+ "CREATE INDEX DATA_IDX ON " "TEST_INDEX_REFLECT (UPPER(DATA))",
)
m2 = MetaData(testing.db)
Table("test_index_reflect", m2, autoload=True)
@@ -452,9 +462,10 @@ class UnsupportedIndexReflectTest(fixtures.TestBase):
def all_tables_compression_missing():
try:
- testing.db.execute("SELECT compression FROM all_tables")
- if "Enterprise Edition" not in testing.db.scalar(
- "select * from v$version"
+ exec_sql(testing.db, "SELECT compression FROM all_tables")
+ if (
+ "Enterprise Edition"
+ not in exec_sql(testing.db, "select * from v$version").scalar()
):
return True
return False
@@ -464,9 +475,10 @@ def all_tables_compression_missing():
def all_tables_compress_for_missing():
try:
- testing.db.execute("SELECT compress_for FROM all_tables")
- if "Enterprise Edition" not in testing.db.scalar(
- "select * from v$version"
+ exec_sql(testing.db, "SELECT compress_for FROM all_tables")
+ if (
+ "Enterprise Edition"
+ not in exec_sql(testing.db, "select * from v$version").scalar()
):
return True
return False
@@ -628,11 +640,11 @@ class DBLinkReflectionTest(fixtures.TestBase):
# when accessing via a different username as we do with the
# multiprocess test suite, so testing here is minimal
with testing.db.connect() as conn:
- conn.execute(
+ conn.exec_driver_sql(
"create table test_table "
"(id integer primary key, data varchar2(50))"
)
- conn.execute(
+ conn.exec_driver_sql(
"create synonym test_table_syn "
"for test_table@%s" % cls.dblink
)
@@ -640,8 +652,8 @@ class DBLinkReflectionTest(fixtures.TestBase):
@classmethod
def teardown_class(cls):
with testing.db.connect() as conn:
- conn.execute("drop synonym test_table_syn")
- conn.execute("drop table test_table")
+ conn.exec_driver_sql("drop synonym test_table_syn")
+ conn.exec_driver_sql("drop table test_table")
def test_reflection(self):
"""test the resolution of the synonym/dblink. """
diff --git a/test/dialect/oracle/test_types.py b/test/dialect/oracle/test_types.py
index 23c73a231..70c8f20f2 100644
--- a/test/dialect/oracle/test_types.py
+++ b/test/dialect/oracle/test_types.py
@@ -51,6 +51,11 @@ from sqlalchemy.util import py2k
from sqlalchemy.util import u
+def exec_sql(engine, sql, *args, **kwargs):
+ with engine.connect() as conn:
+ return conn.exec_driver_sql(sql, *args, **kwargs)
+
+
class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = oracle.OracleDialect()
@@ -371,8 +376,8 @@ class TypesTest(fixtures.TestBase):
)
eq_(
- testing.db.execute(
- "select numericcol from t1 order by intcol"
+ exec_sql(
+ testing.db, "select numericcol from t1 order by intcol"
).fetchall(),
[(float("inf"),), (float("-inf"),)],
)
@@ -403,8 +408,8 @@ class TypesTest(fixtures.TestBase):
)
eq_(
- testing.db.execute(
- "select numericcol from t1 order by intcol"
+ exec_sql(
+ testing.db, "select numericcol from t1 order by intcol"
).fetchall(),
[(decimal.Decimal("Infinity"),), (decimal.Decimal("-Infinity"),)],
)
@@ -439,8 +444,8 @@ class TypesTest(fixtures.TestBase):
eq_(
[
tuple(str(col) for col in row)
- for row in testing.db.execute(
- "select numericcol from t1 order by intcol"
+ for row in exec_sql(
+ testing.db, "select numericcol from t1 order by intcol"
)
],
[("nan",), ("nan",)],
@@ -474,8 +479,8 @@ class TypesTest(fixtures.TestBase):
)
eq_(
- testing.db.execute(
- "select numericcol from t1 order by intcol"
+ exec_sql(
+ testing.db, "select numericcol from t1 order by intcol"
).fetchall(),
[(decimal.Decimal("NaN"),), (decimal.Decimal("NaN"),)],
)
@@ -515,7 +520,7 @@ class TypesTest(fixtures.TestBase):
stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo"
- row = testing.db.execute(stmt).fetchall()[0]
+ row = exec_sql(testing.db, stmt).fetchall()[0]
eq_(
[type(x) for x in row],
[int, decimal.Decimal, decimal.Decimal, int, float],
@@ -551,7 +556,7 @@ class TypesTest(fixtures.TestBase):
(SELECT CAST((SELECT fdata FROM foo) AS FLOAT) FROM DUAL) AS fdata
FROM dual
"""
- row = testing.db.execute(stmt).fetchall()[0]
+ row = exec_sql(testing.db, stmt).fetchall()[0]
eq_(
[type(x) for x in row],
[int, decimal.Decimal, int, int, decimal.Decimal],
@@ -608,7 +613,7 @@ class TypesTest(fixtures.TestBase):
)
WHERE ROWNUM >= 0) anon_1
"""
- row = testing.db.execute(stmt).fetchall()[0]
+ row = exec_sql(testing.db, stmt).fetchall()[0]
eq_(
[type(x) for x in row],
[int, decimal.Decimal, int, int, decimal.Decimal],
@@ -660,7 +665,7 @@ class TypesTest(fixtures.TestBase):
engine = testing_engine(options=dict(coerce_to_decimal=False))
# raw SQL no longer coerces to decimal
- value = engine.scalar("SELECT 5.66 FROM DUAL")
+ value = exec_sql(engine, "SELECT 5.66 FROM DUAL").scalar()
assert isinstance(value, float)
# explicit typing still *does* coerce to decimal
@@ -673,7 +678,7 @@ class TypesTest(fixtures.TestBase):
assert isinstance(value, decimal.Decimal)
# default behavior is raw SQL coerces to decimal
- value = testing.db.scalar("SELECT 5.66 FROM DUAL")
+ value = exec_sql(testing.db, "SELECT 5.66 FROM DUAL").scalar()
assert isinstance(value, decimal.Decimal)
@testing.combinations(
@@ -713,7 +718,7 @@ class TypesTest(fixtures.TestBase):
cx_oracle_result = cursor.fetchone()[0]
cursor.close()
- sqla_result = conn.scalar(stmt)
+ sqla_result = conn.exec_driver_sql(stmt).scalar()
eq_(sqla_result, cx_oracle_result)
@@ -723,10 +728,10 @@ class TypesTest(fixtures.TestBase):
)
def test_coerce_to_unicode(self):
engine = testing_engine(options=dict(coerce_to_unicode=False))
- value = engine.scalar("SELECT 'hello' FROM DUAL")
+ value = exec_sql(engine, "SELECT 'hello' FROM DUAL").scalar()
assert isinstance(value, util.binary_type)
- value = testing.db.scalar("SELECT 'hello' FROM DUAL")
+ value = exec_sql(testing.db, "SELECT 'hello' FROM DUAL").scalar()
assert isinstance(value, util.text_type)
@testing.provide_metadata
@@ -865,21 +870,22 @@ class TypesTest(fixtures.TestBase):
def test_longstring(self):
metadata = MetaData(testing.db)
- testing.db.execute(
+ exec_sql(
+ testing.db,
"""
CREATE TABLE Z_TEST
(
ID NUMERIC(22) PRIMARY KEY,
ADD_USER VARCHAR2(20) NOT NULL
)
- """
+ """,
)
try:
t = Table("z_test", metadata, autoload=True)
t.insert().execute(id=1.0, add_user="foobar")
assert t.select().execute().fetchall() == [(1, "foobar")]
finally:
- testing.db.execute("DROP TABLE Z_TEST")
+ exec_sql(testing.db, "DROP TABLE Z_TEST")
class LOBFetchTest(fixtures.TablesTest):
@@ -943,7 +949,7 @@ class LOBFetchTest(fixtures.TablesTest):
eq_(row["bindata"], b("this is binary 1"))
def test_lobs_with_convert_raw(self):
- row = testing.db.execute("select data, bindata from z_test").first()
+ row = exec_sql(testing.db, "select data, bindata from z_test").first()
eq_(row["data"], "this is text 1")
eq_(row["bindata"], b("this is binary 1"))
@@ -951,8 +957,8 @@ class LOBFetchTest(fixtures.TablesTest):
engine = testing_engine(
options=dict(auto_convert_lobs=False, arraysize=1)
)
- result = engine.execute(
- "select id, data, bindata from z_test order by id"
+ result = exec_sql(
+ engine, "select id, data, bindata from z_test order by id"
)
results = result.fetchall()
@@ -985,8 +991,8 @@ class LOBFetchTest(fixtures.TablesTest):
engine = testing_engine(
options=dict(auto_convert_lobs=True, arraysize=1)
)
- result = engine.execute(
- "select id, data, bindata from z_test order by id"
+ result = exec_sql(
+ engine, "select id, data, bindata from z_test order by id"
)
results = result.fetchall()
@@ -1090,7 +1096,10 @@ class EuroNumericTest(fixtures.TestBase):
{},
),
]:
- test_exp = conn.scalar(stmt, **kw)
+ if isinstance(stmt, util.string_types):
+ test_exp = conn.exec_driver_sql(stmt, kw).scalar()
+ else:
+ test_exp = conn.scalar(stmt, **kw)
eq_(test_exp, exp)
assert type(test_exp) is type(exp)
diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py
index e36b69802..03e91482d 100644
--- a/test/dialect/postgresql/test_dialect.py
+++ b/test/dialect/postgresql/test_dialect.py
@@ -56,7 +56,7 @@ class DialectTest(fixtures.TestBase):
def test_version_parsing(self):
def mock_conn(res):
return mock.Mock(
- execute=mock.Mock(
+ exec_driver_sql=mock.Mock(
return_value=mock.Mock(scalar=mock.Mock(return_value=res))
)
)
@@ -615,7 +615,7 @@ class MiscBackendTest(
conn = testing.db.connect()
trans = conn.begin()
try:
- conn.execute(
+ conn.exec_driver_sql(
"""
CREATE OR REPLACE FUNCTION note(message varchar) RETURNS integer AS $$
BEGIN
@@ -625,8 +625,8 @@ END;
$$ LANGUAGE plpgsql;
"""
)
- conn.execute("SELECT note('hi there')")
- conn.execute("SELECT note('another note')")
+ conn.exec_driver_sql("SELECT note('hi there')")
+ conn.exec_driver_sql("SELECT note('another note')")
finally:
trans.rollback()
finally:
@@ -643,7 +643,9 @@ $$ LANGUAGE plpgsql;
@engines.close_open_connections
def test_client_encoding(self):
c = testing.db.connect()
- current_encoding = c.execute("show client_encoding").fetchone()[0]
+ current_encoding = c.exec_driver_sql(
+ "show client_encoding"
+ ).fetchone()[0]
c.close()
# attempt to use an encoding that's not
@@ -655,7 +657,7 @@ $$ LANGUAGE plpgsql;
e = engines.testing_engine(options={"client_encoding": test_encoding})
c = e.connect()
- new_encoding = c.execute("show client_encoding").fetchone()[0]
+ new_encoding = c.exec_driver_sql("show client_encoding").fetchone()[0]
eq_(new_encoding, test_encoding)
@testing.requires.psycopg2_or_pg8000_compatibility
@@ -671,7 +673,7 @@ $$ LANGUAGE plpgsql;
assert_raises_message(
exc.ProgrammingError,
'prepared transaction with identifier "gilberte" does not exist',
- c.execute,
+ c.exec_driver_sql,
"commit prepared 'gilberte'",
)
@@ -697,7 +699,7 @@ $$ LANGUAGE plpgsql;
seq = Sequence("fooseq")
t = Table("mytable", meta1, Column("col1", Integer, seq))
seq.drop()
- testing.db.execute("CREATE SEQUENCE fooseq")
+ testing.db.execute(text("CREATE SEQUENCE fooseq"))
t.create(checkfirst=True)
@testing.provide_metadata
@@ -766,7 +768,8 @@ $$ LANGUAGE plpgsql;
try:
meta = MetaData(testing.db)
testing.db.execute(
- """
+ text(
+ """
CREATE TABLE speedy_users
(
speedy_user_id SERIAL PRIMARY KEY,
@@ -775,6 +778,7 @@ $$ LANGUAGE plpgsql;
user_password VARCHAR NOT NULL
);
"""
+ )
)
t = Table("speedy_users", meta, autoload=True)
r = t.insert().execute(user_name="user", user_password="lala")
@@ -782,7 +786,7 @@ $$ LANGUAGE plpgsql;
result = t.select().execute().fetchall()
assert result == [(1, "user", "lala")]
finally:
- testing.db.execute("drop table speedy_users")
+ testing.db.execute(text("drop table speedy_users"))
@testing.requires.psycopg2_or_pg8000_compatibility
def test_numeric_raise(self):
diff --git a/test/dialect/postgresql/test_reflection.py b/test/dialect/postgresql/test_reflection.py
index 8e26d5a83..89d4ae081 100644
--- a/test/dialect/postgresql/test_reflection.py
+++ b/test/dialect/postgresql/test_reflection.py
@@ -277,28 +277,32 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
'CREATE DOMAIN "SomeSchema"."Quoted.Domain" INTEGER DEFAULT 0',
]:
try:
- con.execute(ddl)
+ con.exec_driver_sql(ddl)
except exc.DBAPIError as e:
if "already exists" not in str(e):
raise e
- con.execute(
+ con.exec_driver_sql(
"CREATE TABLE testtable (question integer, answer " "testdomain)"
)
- con.execute(
+ con.exec_driver_sql(
"CREATE TABLE test_schema.testtable(question "
"integer, answer test_schema.testdomain, anything "
"integer)"
)
- con.execute(
+ con.exec_driver_sql(
"CREATE TABLE crosschema (question integer, answer "
"test_schema.testdomain)"
)
- con.execute("CREATE TABLE enum_test (id integer, data enumdomain)")
+ con.exec_driver_sql(
+ "CREATE TABLE enum_test (id integer, data enumdomain)"
+ )
- con.execute("CREATE TABLE array_test (id integer, data arraydomain)")
+ con.exec_driver_sql(
+ "CREATE TABLE array_test (id integer, data arraydomain)"
+ )
- con.execute(
+ con.exec_driver_sql(
"CREATE TABLE quote_test "
'(id integer, data "SomeSchema"."Quoted.Domain")'
)
@@ -306,19 +310,19 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
@classmethod
def teardown_class(cls):
con = testing.db.connect()
- con.execute("DROP TABLE testtable")
- con.execute("DROP TABLE test_schema.testtable")
- con.execute("DROP TABLE crosschema")
- con.execute("DROP TABLE quote_test")
- con.execute("DROP DOMAIN testdomain")
- con.execute("DROP DOMAIN test_schema.testdomain")
- con.execute("DROP TABLE enum_test")
- con.execute("DROP DOMAIN enumdomain")
- con.execute("DROP TYPE testtype")
- con.execute("DROP TABLE array_test")
- con.execute("DROP DOMAIN arraydomain")
- con.execute('DROP DOMAIN "SomeSchema"."Quoted.Domain"')
- con.execute('DROP SCHEMA "SomeSchema"')
+ con.exec_driver_sql("DROP TABLE testtable")
+ con.exec_driver_sql("DROP TABLE test_schema.testtable")
+ con.exec_driver_sql("DROP TABLE crosschema")
+ con.exec_driver_sql("DROP TABLE quote_test")
+ con.exec_driver_sql("DROP DOMAIN testdomain")
+ con.exec_driver_sql("DROP DOMAIN test_schema.testdomain")
+ con.exec_driver_sql("DROP TABLE enum_test")
+ con.exec_driver_sql("DROP DOMAIN enumdomain")
+ con.exec_driver_sql("DROP TYPE testtype")
+ con.exec_driver_sql("DROP TABLE array_test")
+ con.exec_driver_sql("DROP DOMAIN arraydomain")
+ con.exec_driver_sql('DROP DOMAIN "SomeSchema"."Quoted.Domain"')
+ con.exec_driver_sql('DROP SCHEMA "SomeSchema"')
def test_table_is_reflected(self):
metadata = MetaData(testing.db)
@@ -485,9 +489,9 @@ class ReflectionTest(fixtures.TestBase):
eq_(t2.c.id.server_default.arg.text, "nextval('t_id_seq'::regclass)")
r = t2.insert().execute()
eq_(r.inserted_primary_key, [1])
- testing.db.connect().execution_options(autocommit=True).execute(
- "alter table t_id_seq rename to foobar_id_seq"
- )
+ testing.db.connect().execution_options(
+ autocommit=True
+ ).exec_driver_sql("alter table t_id_seq rename to foobar_id_seq")
m3 = MetaData(testing.db)
t3 = Table("t", m3, autoload=True, implicit_returning=False)
eq_(
@@ -507,9 +511,9 @@ class ReflectionTest(fixtures.TestBase):
Column("x", Integer),
)
metadata.create_all()
- testing.db.connect().execution_options(autocommit=True).execute(
- "alter table t alter column id type varchar(50)"
- )
+ testing.db.connect().execution_options(
+ autocommit=True
+ ).exec_driver_sql("alter table t alter column id type varchar(50)")
m2 = MetaData(testing.db)
t2 = Table("t", m2, autoload=True)
eq_(t2.c.id.autoincrement, False)
@@ -520,9 +524,9 @@ class ReflectionTest(fixtures.TestBase):
metadata = self.metadata
Table("t", metadata, Column("id", Integer, primary_key=True))
metadata.create_all()
- testing.db.connect().execution_options(autocommit=True).execute(
- "alter table t rename id to t_id"
- )
+ testing.db.connect().execution_options(
+ autocommit=True
+ ).exec_driver_sql("alter table t rename id to t_id")
m2 = MetaData(testing.db)
t2 = Table("t", m2, autoload=True)
eq_([c.name for c in t2.primary_key], ["t_id"])
@@ -642,7 +646,7 @@ class ReflectionTest(fixtures.TestBase):
conn = testing.db.connect()
conn.detach()
- conn.execute("SET search_path TO test_schema, test_schema_2")
+ conn.exec_driver_sql("SET search_path TO test_schema, test_schema_2")
meta2 = MetaData(bind=conn)
subject = Table(
"subject",
@@ -727,7 +731,7 @@ class ReflectionTest(fixtures.TestBase):
with testing.db.connect() as conn:
conn.detach()
- conn.execute(
+ conn.exec_driver_sql(
"set search_path to test_schema_2, test_schema, public"
)
@@ -792,7 +796,7 @@ class ReflectionTest(fixtures.TestBase):
with testing.db.connect() as conn:
conn.detach()
- conn.execute(
+ conn.exec_driver_sql(
"set search_path to test_schema_2, test_schema, public"
)
meta2 = MetaData(conn)
@@ -889,22 +893,17 @@ class ReflectionTest(fixtures.TestBase):
Column("aname", String(20)),
)
metadata.create_all()
- testing.db.execute(
- """
- create index idx1 on party ((id || name))
- """
- )
- testing.db.execute(
- """
- create unique index idx2 on party (id) where name = 'test'
- """
- )
- testing.db.execute(
- """
- create index idx3 on party using btree
- (lower(name::text), lower(aname::text))
- """
- )
+ with testing.db.connect() as c:
+ c.exec_driver_sql("create index idx1 on party ((id || name))")
+ c.exec_driver_sql(
+ "create unique index idx2 on party (id) where name = 'test'"
+ )
+ c.exec_driver_sql(
+ """
+ create index idx3 on party using btree
+ (lower(name::text), lower(aname::text))
+ """
+ )
def go():
m2 = MetaData(testing.db)
@@ -951,7 +950,7 @@ class ReflectionTest(fixtures.TestBase):
t1.create(conn)
# check ASC, DESC options alone
- conn.execute(
+ conn.exec_driver_sql(
"""
create index idx1 on party
(id, name ASC, aname DESC)
@@ -959,7 +958,7 @@ class ReflectionTest(fixtures.TestBase):
)
# check DESC w/ NULLS options
- conn.execute(
+ conn.exec_driver_sql(
"""
create index idx2 on party
(name DESC NULLS FIRST, aname DESC NULLS LAST)
@@ -967,7 +966,7 @@ class ReflectionTest(fixtures.TestBase):
)
# check ASC w/ NULLS options
- conn.execute(
+ conn.exec_driver_sql(
"""
create index idx3 on party
(name ASC NULLS FIRST, aname ASC NULLS LAST)
@@ -1028,8 +1027,8 @@ class ReflectionTest(fixtures.TestBase):
)
metadata.create_all()
conn = testing.db.connect().execution_options(autocommit=True)
- conn.execute("CREATE INDEX idx1 ON t (x)")
- conn.execute("ALTER TABLE t RENAME COLUMN x to y")
+ conn.exec_driver_sql("CREATE INDEX idx1 ON t (x)")
+ conn.exec_driver_sql("ALTER TABLE t RENAME COLUMN x to y")
ind = testing.db.dialect.get_indexes(conn, "t", None)
eq_(ind, [{"unique": False, "column_names": ["y"], "name": "idx1"}])
@@ -1051,7 +1050,9 @@ class ReflectionTest(fixtures.TestBase):
metadata.create_all()
with testing.db.connect().execution_options(autocommit=True) as conn:
- conn.execute("CREATE INDEX idx1 ON t (x) WITH (fillfactor = 50)")
+ conn.exec_driver_sql(
+ "CREATE INDEX idx1 ON t (x) WITH (fillfactor = 50)"
+ )
ind = testing.db.dialect.get_indexes(conn, "t", None)
eq_(
@@ -1089,7 +1090,7 @@ class ReflectionTest(fixtures.TestBase):
)
metadata.create_all()
with testing.db.connect().execution_options(autocommit=True) as conn:
- conn.execute("CREATE INDEX idx1 ON t USING gin (x)")
+ conn.exec_driver_sql("CREATE INDEX idx1 ON t USING gin (x)")
ind = testing.db.dialect.get_indexes(conn, "t", None)
eq_(
diff --git a/test/dialect/test_firebird.py b/test/dialect/test_firebird.py
index 1b72da1bc..3455dca66 100644
--- a/test/dialect/test_firebird.py
+++ b/test/dialect/test_firebird.py
@@ -40,17 +40,21 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
def setup_class(cls):
con = testing.db.connect()
try:
- con.execute(
+ con.exec_driver_sql(
"CREATE DOMAIN int_domain AS INTEGER DEFAULT " "42 NOT NULL"
)
- con.execute("CREATE DOMAIN str_domain AS VARCHAR(255)")
- con.execute("CREATE DOMAIN rem_domain AS BLOB SUB_TYPE TEXT")
- con.execute("CREATE DOMAIN img_domain AS BLOB SUB_TYPE " "BINARY")
+ con.exec_driver_sql("CREATE DOMAIN str_domain AS VARCHAR(255)")
+ con.exec_driver_sql(
+ "CREATE DOMAIN rem_domain AS BLOB SUB_TYPE TEXT"
+ )
+ con.exec_driver_sql(
+ "CREATE DOMAIN img_domain AS BLOB SUB_TYPE " "BINARY"
+ )
except ProgrammingError as e:
if "attempt to store duplicate value" not in str(e):
raise e
- con.execute("""CREATE GENERATOR gen_testtable_id""")
- con.execute(
+ con.exec_driver_sql("""CREATE GENERATOR gen_testtable_id""")
+ con.exec_driver_sql(
"""CREATE TABLE testtable (question int_domain,
answer str_domain DEFAULT 'no answer',
remark rem_domain DEFAULT '',
@@ -60,12 +64,12 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
dt timestamp,
redundant str_domain DEFAULT NULL)"""
)
- con.execute(
+ con.exec_driver_sql(
"ALTER TABLE testtable "
"ADD CONSTRAINT testtable_pk PRIMARY KEY "
"(question)"
)
- con.execute(
+ con.exec_driver_sql(
"CREATE TRIGGER testtable_autoid FOR testtable "
" ACTIVE BEFORE INSERT AS"
" BEGIN"
@@ -77,12 +81,12 @@ class DomainReflectionTest(fixtures.TestBase, AssertsExecutionResults):
@classmethod
def teardown_class(cls):
con = testing.db.connect()
- con.execute("DROP TABLE testtable")
- con.execute("DROP DOMAIN int_domain")
- con.execute("DROP DOMAIN str_domain")
- con.execute("DROP DOMAIN rem_domain")
- con.execute("DROP DOMAIN img_domain")
- con.execute("DROP GENERATOR gen_testtable_id")
+ con.exec_driver_sql("DROP TABLE testtable")
+ con.exec_driver_sql("DROP DOMAIN int_domain")
+ con.exec_driver_sql("DROP DOMAIN str_domain")
+ con.exec_driver_sql("DROP DOMAIN rem_domain")
+ con.exec_driver_sql("DROP DOMAIN img_domain")
+ con.exec_driver_sql("DROP GENERATOR gen_testtable_id")
def test_table_is_reflected(self):
from sqlalchemy.types import (
@@ -222,29 +226,29 @@ ID DOM_ID /* INTEGER NOT NULL */ default 0 )
@classmethod
def setup_class(cls):
con = testing.db.connect()
- con.execute(cls.AUTOINC_DM)
- con.execute(cls.MONEY_DM)
- con.execute(cls.NOSI_DM)
- con.execute(cls.RIT_TESORERIA_CAPITOLO_DM)
- con.execute(cls.DEF_ERROR_TB)
- con.execute(cls.DEF_ERROR_NODOM_TB)
+ con.exec_driver_sql(cls.AUTOINC_DM)
+ con.exec_driver_sql(cls.MONEY_DM)
+ con.exec_driver_sql(cls.NOSI_DM)
+ con.exec_driver_sql(cls.RIT_TESORERIA_CAPITOLO_DM)
+ con.exec_driver_sql(cls.DEF_ERROR_TB)
+ con.exec_driver_sql(cls.DEF_ERROR_NODOM_TB)
- con.execute(cls.DOM_ID)
- con.execute(cls.TABLE_A)
- con.execute(cls.TABLE_B)
+ con.exec_driver_sql(cls.DOM_ID)
+ con.exec_driver_sql(cls.TABLE_A)
+ con.exec_driver_sql(cls.TABLE_B)
@classmethod
def teardown_class(cls):
con = testing.db.connect()
- con.execute("DROP TABLE a")
- con.execute("DROP TABLE b")
- con.execute("DROP DOMAIN dom_id")
- con.execute("DROP TABLE def_error_nodom")
- con.execute("DROP TABLE def_error")
- con.execute("DROP DOMAIN rit_tesoreria_capitolo_dm")
- con.execute("DROP DOMAIN nosi_dm")
- con.execute("DROP DOMAIN money_dm")
- con.execute("DROP DOMAIN autoinc_dm")
+ con.exec_driver_sql("DROP TABLE a")
+ con.exec_driver_sql("DROP TABLE b")
+ con.exec_driver_sql("DROP DOMAIN dom_id")
+ con.exec_driver_sql("DROP TABLE def_error_nodom")
+ con.exec_driver_sql("DROP TABLE def_error")
+ con.exec_driver_sql("DROP DOMAIN rit_tesoreria_capitolo_dm")
+ con.exec_driver_sql("DROP DOMAIN nosi_dm")
+ con.exec_driver_sql("DROP DOMAIN money_dm")
+ con.exec_driver_sql("DROP DOMAIN autoinc_dm")
def test_tables_are_reflected_same_way(self):
metadata = MetaData(testing.db)
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py
index 01ef5f084..ea4cba8cc 100644
--- a/test/dialect/test_sqlite.py
+++ b/test/dialect/test_sqlite.py
@@ -59,6 +59,11 @@ from sqlalchemy.util import u
from sqlalchemy.util import ue
+def exec_sql(engine, sql, *args, **kwargs):
+ conn = engine.connect(close_with_result=True)
+ return conn.exec_driver_sql(sql, *args, **kwargs)
+
+
class TestTypes(fixtures.TestBase, AssertsExecutionResults):
__only_on__ = "sqlite"
@@ -77,23 +82,29 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
)
try:
meta.create_all()
- testing.db.execute(
- "INSERT INTO bool_table (id, boo) " "VALUES (1, 'false');"
+ exec_sql(
+ testing.db,
+ "INSERT INTO bool_table (id, boo) " "VALUES (1, 'false');",
)
- testing.db.execute(
- "INSERT INTO bool_table (id, boo) " "VALUES (2, 'true');"
+ exec_sql(
+ testing.db,
+ "INSERT INTO bool_table (id, boo) " "VALUES (2, 'true');",
)
- testing.db.execute(
- "INSERT INTO bool_table (id, boo) " "VALUES (3, '1');"
+ exec_sql(
+ testing.db,
+ "INSERT INTO bool_table (id, boo) " "VALUES (3, '1');",
)
- testing.db.execute(
- "INSERT INTO bool_table (id, boo) " "VALUES (4, '0');"
+ exec_sql(
+ testing.db,
+ "INSERT INTO bool_table (id, boo) " "VALUES (4, '0');",
)
- testing.db.execute(
- "INSERT INTO bool_table (id, boo) " "VALUES (5, 1);"
+ exec_sql(
+ testing.db,
+ "INSERT INTO bool_table (id, boo) " "VALUES (5, 1);",
)
- testing.db.execute(
- "INSERT INTO bool_table (id, boo) " "VALUES (6, 0);"
+ exec_sql(
+ testing.db,
+ "INSERT INTO bool_table (id, boo) " "VALUES (6, 0);",
)
eq_(
t.select(t.c.boo).order_by(t.c.id).execute().fetchall(),
@@ -176,9 +187,11 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
testing.db.execute(
t.insert().values(d=datetime.datetime(2010, 10, 15, 12, 37, 0))
)
- testing.db.execute("insert into t (d) values ('2004-05-21T00:00:00')")
+ exec_sql(
+ testing.db, "insert into t (d) values ('2004-05-21T00:00:00')"
+ )
eq_(
- testing.db.execute("select * from t order by d").fetchall(),
+ exec_sql(testing.db, "select * from t order by d").fetchall(),
[("2004-05-21T00:00:00",), ("2010-10-15T12:37:00",)],
)
eq_(
@@ -201,9 +214,9 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
testing.db.execute(
t.insert().values(d=datetime.datetime(2010, 10, 15, 12, 37, 0))
)
- testing.db.execute("insert into t (d) values ('20040521000000')")
+ exec_sql(testing.db, "insert into t (d) values ('20040521000000')")
eq_(
- testing.db.execute("select * from t order by d").fetchall(),
+ exec_sql(testing.db, "select * from t order by d").fetchall(),
[("20040521000000",), ("20101015123700",)],
)
eq_(
@@ -223,9 +236,9 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
t = Table("t", self.metadata, Column("d", sqlite_date))
self.metadata.create_all(testing.db)
testing.db.execute(t.insert().values(d=datetime.date(2010, 10, 15)))
- testing.db.execute("insert into t (d) values ('20040521')")
+ exec_sql(testing.db, "insert into t (d) values ('20040521')")
eq_(
- testing.db.execute("select * from t order by d").fetchall(),
+ exec_sql(testing.db, "select * from t order by d").fetchall(),
[("20040521",), ("20101015",)],
)
eq_(
@@ -243,9 +256,9 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults):
t = Table("t", self.metadata, Column("d", sqlite_date))
self.metadata.create_all(testing.db)
testing.db.execute(t.insert().values(d=datetime.date(2010, 10, 15)))
- testing.db.execute("insert into t (d) values ('2004|05|21')")
+ exec_sql(testing.db, "insert into t (d) values ('2004|05|21')")
eq_(
- testing.db.execute("select * from t order by d").fetchall(),
+ exec_sql(testing.db, "select * from t order by d").fetchall(),
[("2004|05|21",), ("2010|10|15",)],
)
eq_(
@@ -499,12 +512,12 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL):
val INTEGER NOT NULL DEFAULT 0
)"""
try:
- db.execute(table)
+ exec_sql(db, table)
rt = Table("r_defaults", m, autoload=True)
for i, reflected in enumerate(rt.c):
eq_(str(reflected.server_default.arg), expected[i])
finally:
- db.execute("DROP TABLE r_defaults")
+ exec_sql(db, "DROP TABLE r_defaults")
def test_default_reflection_3(self):
db = testing.db
@@ -513,10 +526,10 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL):
val INTEGER NOT NULL DEFAULT 0
)"""
try:
- db.execute(table)
+ exec_sql(db, table)
m1 = MetaData(db)
t1 = Table("r_defaults", m1, autoload=True)
- db.execute("DROP TABLE r_defaults")
+ exec_sql(db, "DROP TABLE r_defaults")
t1.create()
m2 = MetaData(db)
t2 = Table("r_defaults", m2, autoload=True)
@@ -527,7 +540,7 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL):
"NOT NULL)",
)
finally:
- db.execute("DROP TABLE r_defaults")
+ exec_sql(db, "DROP TABLE r_defaults")
@testing.provide_metadata
def test_boolean_default(self):
@@ -633,14 +646,16 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
"""Tests autoload of tables created with quoted column names."""
metadata = self.metadata
- testing.db.execute(
+ exec_sql(
+ testing.db,
"""CREATE TABLE "django_content_type" (
"id" integer NOT NULL PRIMARY KEY,
"django_stuff" text NULL
)
- """
+ """,
)
- testing.db.execute(
+ exec_sql(
+ testing.db,
"""
CREATE TABLE "django_admin_log" (
"id" integer NOT NULL PRIMARY KEY,
@@ -650,7 +665,7 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
"object_id" text NULL,
"change_message" text NOT NULL
)
- """
+ """,
)
table1 = Table("django_admin_log", metadata, autoload=True)
table2 = Table("django_content_type", metadata, autoload=True)
@@ -670,16 +685,17 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
"""
metadata = self.metadata
- testing.db.execute(
+ exec_sql(
+ testing.db,
r'''CREATE TABLE """a""" (
"""id""" integer NOT NULL PRIMARY KEY
)
- '''
+ ''',
)
# unfortunately, still can't do this; sqlite quadruples
# up the quotes on the table name here for pragma foreign_key_list
- # testing.db.execute(r'''
+ # exec_sql(testing.db,r'''
# CREATE TABLE """b""" (
# """id""" integer NOT NULL PRIMARY KEY,
# """aid""" integer NULL
@@ -884,7 +900,7 @@ class AttachedDBTest(fixtures.TestBase):
eq_(insp.get_schema_names(), ["main", "test_schema"])
# implicitly creates a "temp" schema
- self.conn.execute("select * from sqlite_temp_master")
+ self.conn.exec_driver_sql("select * from sqlite_temp_master")
# we're not including it
insp = inspect(self.conn)
@@ -1475,8 +1491,8 @@ def full_text_search_missing():
it is and True otherwise."""
try:
- testing.db.execute("CREATE VIRTUAL TABLE t using FTS3;")
- testing.db.execute("DROP TABLE t;")
+ exec_sql(testing.db, "CREATE VIRTUAL TABLE t using FTS3;")
+ exec_sql(testing.db, "DROP TABLE t;")
return False
except Exception:
return True
@@ -1494,17 +1510,19 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
def setup_class(cls):
global metadata, cattable, matchtable
metadata = MetaData(testing.db)
- testing.db.execute(
+ exec_sql(
+ testing.db,
"""
CREATE VIRTUAL TABLE cattable using FTS3 (
id INTEGER NOT NULL,
description VARCHAR(50),
PRIMARY KEY (id)
)
- """
+ """,
)
cattable = Table("cattable", metadata, autoload=True)
- testing.db.execute(
+ exec_sql(
+ testing.db,
"""
CREATE VIRTUAL TABLE matchtable using FTS3 (
id INTEGER NOT NULL,
@@ -1512,7 +1530,7 @@ class MatchTest(fixtures.TestBase, AssertsCompiledSQL):
category_id INTEGER NOT NULL,
PRIMARY KEY (id)
)
- """
+ """,
)
matchtable = Table("matchtable", metadata, autoload=True)
metadata.create_all()
@@ -1678,16 +1696,16 @@ class ReflectHeadlessFKsTest(fixtures.TestBase):
__only_on__ = "sqlite"
def setup(self):
- testing.db.execute("CREATE TABLE a (id INTEGER PRIMARY KEY)")
+ exec_sql(testing.db, "CREATE TABLE a (id INTEGER PRIMARY KEY)")
# this syntax actually works on other DBs perhaps we'd want to add
# tests to test_reflection
- testing.db.execute(
- "CREATE TABLE b (id INTEGER PRIMARY KEY REFERENCES a)"
+ exec_sql(
+ testing.db, "CREATE TABLE b (id INTEGER PRIMARY KEY REFERENCES a)"
)
def teardown(self):
- testing.db.execute("drop table b")
- testing.db.execute("drop table a")
+ exec_sql(testing.db, "drop table b")
+ exec_sql(testing.db, "drop table a")
def test_reflect_tables_fk_no_colref(self):
meta = MetaData()
@@ -1703,17 +1721,21 @@ class KeywordInDatabaseNameTest(fixtures.TestBase):
@classmethod
def setup_class(cls):
with testing.db.begin() as conn:
- conn.execute('ATTACH %r AS "default"' % conn.engine.url.database)
- conn.execute('CREATE TABLE "default".a (id INTEGER PRIMARY KEY)')
+ conn.exec_driver_sql(
+ 'ATTACH %r AS "default"' % conn.engine.url.database
+ )
+ conn.exec_driver_sql(
+ 'CREATE TABLE "default".a (id INTEGER PRIMARY KEY)'
+ )
@classmethod
def teardown_class(cls):
with testing.db.begin() as conn:
try:
- conn.execute('drop table "default".a')
+ conn.exec_driver_sql('drop table "default".a')
except Exception:
pass
- conn.execute('DETACH DATABASE "default"')
+ conn.exec_driver_sql('DETACH DATABASE "default"')
def test_reflect(self):
with testing.db.begin() as conn:
@@ -1729,72 +1751,72 @@ class ConstraintReflectionTest(fixtures.TestBase):
def setup_class(cls):
with testing.db.begin() as conn:
- conn.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)")
- conn.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)")
- conn.execute(
+ conn.exec_driver_sql("CREATE TABLE a1 (id INTEGER PRIMARY KEY)")
+ conn.exec_driver_sql("CREATE TABLE a2 (id INTEGER PRIMARY KEY)")
+ conn.exec_driver_sql(
"CREATE TABLE b (id INTEGER PRIMARY KEY, "
"FOREIGN KEY(id) REFERENCES a1(id),"
"FOREIGN KEY(id) REFERENCES a2(id)"
")"
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE c (id INTEGER, "
"CONSTRAINT bar PRIMARY KEY(id),"
"CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id),"
"CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)"
")"
)
- conn.execute(
+ conn.exec_driver_sql(
# the lower casing + inline is intentional here
"CREATE TABLE d (id INTEGER, x INTEGER unique)"
)
- conn.execute(
+ conn.exec_driver_sql(
# the lower casing + inline is intentional here
"CREATE TABLE d1 "
'(id INTEGER, "some ( STUPID n,ame" INTEGER unique)'
)
- conn.execute(
+ conn.exec_driver_sql(
# the lower casing + inline is intentional here
'CREATE TABLE d2 ( "some STUPID n,ame" INTEGER unique)'
)
- conn.execute(
+ conn.exec_driver_sql(
# the lower casing + inline is intentional here
'CREATE TABLE d3 ( "some STUPID n,ame" INTEGER NULL unique)'
)
- conn.execute(
+ conn.exec_driver_sql(
# lower casing + inline is intentional
"CREATE TABLE e (id INTEGER, x INTEGER references a2(id))"
)
- conn.execute(
+ conn.exec_driver_sql(
'CREATE TABLE e1 (id INTEGER, "some ( STUPID n,ame" INTEGER '
'references a2 ("some ( STUPID n,ame"))'
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE e2 (id INTEGER, "
'"some ( STUPID n,ame" INTEGER NOT NULL '
'references a2 ("some ( STUPID n,ame"))'
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE f (x INTEGER, CONSTRAINT foo_fx UNIQUE(x))"
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TEMPORARY TABLE g "
"(x INTEGER, CONSTRAINT foo_gx UNIQUE(x))"
)
- conn.execute(
+ conn.exec_driver_sql(
# intentional broken casing
"CREATE TABLE h (x INTEGER, COnstraINT foo_hx unIQUE(x))"
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE i (x INTEGER, y INTEGER, PRIMARY KEY(x, y))"
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE j (id INTEGER, q INTEGER, p INTEGER, "
"PRIMARY KEY(id), FOreiGN KEY(q,p) REFERENCes i(x,y))"
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE k (id INTEGER, q INTEGER, p INTEGER, "
"PRIMARY KEY(id), "
"conSTRAINT my_fk FOreiGN KEY ( q , p ) "
@@ -1833,8 +1855,10 @@ class ConstraintReflectionTest(fixtures.TestBase):
meta.create_all(conn)
# will contain an "autoindex"
- conn.execute("create table o (foo varchar(20) primary key)")
- conn.execute(
+ conn.exec_driver_sql(
+ "create table o (foo varchar(20) primary key)"
+ )
+ conn.exec_driver_sql(
"CREATE TABLE onud_test (id INTEGER PRIMARY KEY, "
"c1 INTEGER, c2 INTEGER, c3 INTEGER, c4 INTEGER, "
"CONSTRAINT fk1 FOREIGN KEY (c1) REFERENCES a1(id) "
@@ -1847,30 +1871,30 @@ class ConstraintReflectionTest(fixtures.TestBase):
"ON UPDATE NO ACTION)"
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE cp ("
"q INTEGER check (q > 1 AND q < 6),\n"
"CONSTRAINT cq CHECK (q == 1 OR (q > 2 AND q < 5))\n"
")"
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE implicit_referred (pk integer primary key)"
)
# single col foreign key with no referred column given,
# must assume primary key of referred table
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE implicit_referrer "
"(id integer REFERENCES implicit_referred)"
)
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE implicit_referred_comp "
"(pk1 integer, pk2 integer, primary key (pk1, pk2))"
)
# composite foreign key with no referred columns given,
# must assume primary key of referred table
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE implicit_referrer_comp "
"(id1 integer, id2 integer, foreign key(id1, id2) "
"REFERENCES implicit_referred_comp)"
@@ -1878,7 +1902,7 @@ class ConstraintReflectionTest(fixtures.TestBase):
# worst case - FK that refers to nonexistent table so we cant
# get pks. requires FK pragma is turned off
- conn.execute(
+ conn.exec_driver_sql(
"CREATE TABLE implicit_referrer_comp_fake "
"(id1 integer, id2 integer, foreign key(id1, id2) "
"REFERENCES fake_table)"
@@ -1912,7 +1936,7 @@ class ConstraintReflectionTest(fixtures.TestBase):
"a2",
]:
try:
- conn.execute("drop table %s" % name)
+ conn.exec_driver_sql("drop table %s" % name)
except Exception:
pass
@@ -2199,7 +2223,7 @@ class ConstraintReflectionTest(fixtures.TestBase):
def test_foreign_key_options_unnamed_inline(self):
with testing.db.connect() as conn:
- conn.execute(
+ conn.exec_driver_sql(
"create table foo (id integer, "
"foreign key (id) references bar (id) on update cascade)"
)
@@ -2365,7 +2389,7 @@ class SavepointTest(fixtures.TablesTest):
@event.listens_for(engine, "begin")
def do_begin(conn):
# emit our own BEGIN
- conn.execute("BEGIN")
+ conn.exec_driver_sql("BEGIN")
return engine
@@ -2536,7 +2560,7 @@ class TypeReflectionTest(fixtures.TestBase):
conn = testing.db.connect()
for from_, to_ in self._fixture_as_string(fixture):
inspector = inspect(conn)
- conn.execute("CREATE TABLE foo (data %s)" % from_)
+ conn.exec_driver_sql("CREATE TABLE foo (data %s)" % from_)
try:
if warnings:
@@ -2559,7 +2583,7 @@ class TypeReflectionTest(fixtures.TestBase):
getattr(to_, attr, None),
)
finally:
- conn.execute("DROP TABLE foo")
+ conn.exec_driver_sql("DROP TABLE foo")
def test_lookup_direct_lookup(self):
self._test_lookup_direct(self._fixed_lookup_fixture())
diff --git a/test/engine/test_bind.py b/test/engine/test_bind.py
index 956874846..c58e703d3 100644
--- a/test/engine/test_bind.py
+++ b/test/engine/test_bind.py
@@ -133,7 +133,10 @@ class BindTest(fixtures.TestBase):
trans.rollback()
metadata.bind = None
assert (
- conn.execute("select count(*) from test_table").scalar() == 0
+ conn.exec_driver_sql(
+ "select count(*) from test_table"
+ ).scalar()
+ == 0
)
finally:
metadata.drop_all(bind=conn)
diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py
index 0c77e8c25..edf1503d5 100644
--- a/test/engine/test_deprecations.py
+++ b/test/engine/test_deprecations.py
@@ -4,6 +4,7 @@ from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import func
+from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import MetaData
@@ -12,7 +13,9 @@ from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import TypeDecorator
+from sqlalchemy import VARCHAR
from sqlalchemy.engine import reflection
+from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.mock import MockConnection
from sqlalchemy.testing import assert_raises
@@ -22,12 +25,20 @@ from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
+from sqlalchemy.testing import is_instance_of
from sqlalchemy.testing import is_true
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
+def _string_deprecation_expect():
+ return testing.expect_deprecated_20(
+ r"Passing a string to Connection.execute\(\) is deprecated "
+ r"and will be removed in version 2.0"
+ )
+
+
class SomeException(Exception):
pass
@@ -35,6 +46,10 @@ class SomeException(Exception):
class ConnectionlessDeprecationTest(fixtures.TestBase):
"""test various things associated with "connectionless" executions."""
+ def check_usage(self, inspector):
+ with inspector._operation_context() as conn:
+ is_instance_of(conn, Connection)
+
def test_inspector_constructor_engine(self):
with testing.expect_deprecated(
r"The __init__\(\) method on Inspector is deprecated and will "
@@ -43,6 +58,7 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
i1 = reflection.Inspector(testing.db)
is_(i1.bind, testing.db)
+ self.check_usage(i1)
def test_inspector_constructor_connection(self):
with testing.db.connect() as conn:
@@ -54,6 +70,7 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
is_(i1.bind, conn)
is_(i1.engine, testing.db)
+ self.check_usage(i1)
def test_inspector_from_engine(self):
with testing.expect_deprecated(
@@ -63,6 +80,7 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
i1 = reflection.Inspector.from_engine(testing.db)
is_(i1.bind, testing.db)
+ self.check_usage(i1)
def test_bind_close_conn(self):
e = testing.db
@@ -256,7 +274,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as conn:
try:
- conn.execute("SELECT FOO FROM I_DONT_EXIST")
+ conn.exec_driver_sql("SELECT FOO FROM I_DONT_EXIST")
assert False
except tsa.exc.DBAPIError as e:
eq_(canary.mock_calls[0][1][5], e.orig)
@@ -305,7 +323,10 @@ class HandleErrorTest(fixtures.TestBase):
)
assert_raises_message(
- TypeError, "I'm not a DBAPI error", c.execute, "select "
+ TypeError,
+ "I'm not a DBAPI error",
+ c.exec_driver_sql,
+ "select ",
)
# no legacy event
eq_(listener.mock_calls, [])
@@ -369,6 +390,10 @@ class PoolTestBase(fixtures.TestBase):
)
+def select1(db):
+ return str(select([1]).compile(dialect=db.dialect))
+
+
class DeprecatedEngineFeatureTest(fixtures.TablesTest):
__backend__ = True
@@ -442,6 +467,60 @@ class DeprecatedEngineFeatureTest(fixtures.TablesTest):
assert_raises(Exception, conn.transaction, fn, 5, value=8)
self._assert_no_data()
+ def test_execute_plain_string(self):
+ with _string_deprecation_expect():
+ testing.db.execute(select1(testing.db)).scalar()
+
+ def test_scalar_plain_string(self):
+ with _string_deprecation_expect():
+ testing.db.scalar(select1(testing.db))
+
+ # Tests for the warning when non dict params are used
+ # @testing.combinations(42, (42,))
+ # def test_execute_positional_non_dicts(self, args):
+ # with testing.expect_deprecated(
+ # r"Usage of tuple or scalars as positional arguments of "
+ # ):
+ # testing.db.execute(text(select1(testing.db)), args).scalar()
+
+ # @testing.combinations(42, (42,))
+ # def test_scalar_positional_non_dicts(self, args):
+ # with testing.expect_deprecated(
+ # r"Usage of tuple or scalars as positional arguments of "
+ # ):
+ # testing.db.scalar(text(select1(testing.db)), args)
+
+
+class DeprecatedConnectionFeatureTest(fixtures.TablesTest):
+ __backend__ = True
+
+ def test_execute_plain_string(self):
+ with _string_deprecation_expect():
+ with testing.db.connect() as conn:
+ conn.execute(select1(testing.db)).scalar()
+
+ def test_scalar_plain_string(self):
+ with _string_deprecation_expect():
+ with testing.db.connect() as conn:
+ conn.scalar(select1(testing.db))
+
+ # Tests for the warning when non dict params are used
+ # @testing.combinations(42, (42,))
+ # def test_execute_positional_non_dicts(self, args):
+ # with testing.expect_deprecated(
+ # r"Usage of tuple or scalars as positional arguments of "
+ # ):
+ # with testing.db.connect() as conn:
+ # conn.execute(text(select1(testing.db)), args).scalar()
+
+ # @testing.combinations(42, (42,))
+ # def test_scalar_positional_non_dicts(self, args):
+ # with testing.expect_deprecated(
+ # r"Usage of tuple or scalars as positional arguments of "
+ # ):
+ # with testing.db.connect() as conn:
+ # conn.scalar(text(select1(testing.db)), args)
+
class DeprecatedReflectionTest(fixtures.TablesTest):
@classmethod
@@ -540,3 +619,219 @@ class ExecutionOptionsTest(fixtures.TestBase):
):
c2_branch = c2.connect()
eq_(c2_branch._execution_options, {"foo": "bar"})
+
+
+class RawExecuteTest(fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("user_id", INT, primary_key=True, autoincrement=False),
+ Column("user_name", VARCHAR(20)),
+ )
+ Table(
+ "users_autoinc",
+ metadata,
+ Column(
+ "user_id", INT, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("user_name", VARCHAR(20)),
+ )
+
+ @testing.fails_on(
+ "postgresql+pg8000",
+ "pg8000 still doesn't allow single paren without params",
+ )
+ def test_no_params_option(self, connection):
+ stmt = (
+ "SELECT '%'"
+ + testing.db.dialect.statement_compiler(
+ testing.db.dialect, None
+ ).default_from()
+ )
+
+ result = (
+ connection.execution_options(no_parameters=True)
+ .execute(stmt)
+ .scalar()
+ )
+ eq_(result, "%")
+
+ @testing.requires.qmark_paramstyle
+ def test_raw_qmark(self, connection):
+ conn = connection
+
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (1, "jack"),
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [2, "fred"],
+ )
+
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [3, "ed"],
+ [4, "horse"],
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (5, "barney"),
+ (6, "donkey"),
+ )
+
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ 7,
+ "sally",
+ )
+
+ with _string_deprecation_expect():
+ res = conn.execute("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "fred"),
+ (3, "ed"),
+ (4, "horse"),
+ (5, "barney"),
+ (6, "donkey"),
+ (7, "sally"),
+ ]
+ for multiparam, param in [
+ (("jack", "fred"), {}),
+ ((["jack", "fred"],), {}),
+ ]:
+ with _string_deprecation_expect():
+ res = conn.execute(
+ "select * from users where user_name=? or "
+ "user_name=? order by user_id",
+ *multiparam,
+ **param
+ )
+ assert res.fetchall() == [(1, "jack"), (2, "fred")]
+
+ with _string_deprecation_expect():
+ res = conn.execute("select * from users where user_name=?", "jack")
+ assert res.fetchall() == [(1, "jack")]
+
+ @testing.requires.format_paramstyle
+ def test_raw_sprintf(self, connection):
+ conn = connection
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ [1, "jack"],
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ [2, "ed"],
+ [3, "horse"],
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ 4,
+ "sally",
+ )
+ with _string_deprecation_expect():
+ conn.execute("insert into users (user_id) values (%s)", 5)
+ with _string_deprecation_expect():
+ res = conn.execute("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ (5, None),
+ ]
+ for multiparam, param in [
+ (("jack", "ed"), {}),
+ ((["jack", "ed"],), {}),
+ ]:
+ with _string_deprecation_expect():
+ res = conn.execute(
+ "select * from users where user_name=%s or "
+ "user_name=%s order by user_id",
+ *multiparam,
+ **param
+ )
+ assert res.fetchall() == [(1, "jack"), (2, "ed")]
+ with _string_deprecation_expect():
+ res = conn.execute(
+ "select * from users where user_name=%s", "jack"
+ )
+ assert res.fetchall() == [(1, "jack")]
+
+ @testing.requires.pyformat_paramstyle
+ def test_raw_python(self, connection):
+ conn = connection
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 1, "name": "jack"},
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 2, "name": "ed"},
+ {"id": 3, "name": "horse"},
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ id=4,
+ name="sally",
+ )
+ with _string_deprecation_expect():
+ res = conn.execute("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
+
+ @testing.requires.named_paramstyle
+ def test_raw_named(self, connection):
+ conn = connection
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (:id, :name)",
+ {"id": 1, "name": "jack"},
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (:id, :name)",
+ {"id": 2, "name": "ed"},
+ {"id": 3, "name": "horse"},
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (:id, :name)",
+ id=4,
+ name="sally",
+ )
+ with _string_deprecation_expect():
+ res = conn.execute("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 150038a40..0b5b1b16d 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -50,27 +50,22 @@ from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.util import picklers
-users, metadata, users_autoinc = None, None, None
-
-
class SomeException(Exception):
pass
-class ExecuteTest(fixtures.TestBase):
+class ExecuteTest(fixtures.TablesTest):
__backend__ = True
@classmethod
- def setup_class(cls):
- global users, users_autoinc, metadata
- metadata = MetaData(testing.db)
- users = Table(
+ def define_tables(cls, metadata):
+ Table(
"users",
metadata,
Column("user_id", INT, primary_key=True, autoincrement=False),
Column("user_name", VARCHAR(20)),
)
- users_autoinc = Table(
+ Table(
"users_autoinc",
metadata,
Column(
@@ -78,15 +73,6 @@ class ExecuteTest(fixtures.TestBase):
),
Column("user_name", VARCHAR(20)),
)
- metadata.create_all()
-
- @engines.close_first
- def teardown(self):
- testing.db.execute(users.delete())
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
@testing.fails_on(
"postgresql+pg8000",
@@ -101,223 +87,176 @@ class ExecuteTest(fixtures.TestBase):
)
conn = testing.db.connect()
- result = conn.execution_options(no_parameters=True).scalar(stmt)
+ result = (
+ conn.execution_options(no_parameters=True)
+ .exec_driver_sql(stmt)
+ .scalar()
+ )
eq_(result, "%")
- @testing.fails_on_everything_except(
- "firebird", "sqlite", "+pyodbc", "+mxodbc", "mysql+oursql"
- )
- def test_raw_qmark(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- (1, "jack"),
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- [2, "fred"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- [3, "ed"],
- [4, "horse"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- (5, "barney"),
- (6, "donkey"),
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- 7,
- "sally",
- )
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "fred"),
- (3, "ed"),
- (4, "horse"),
- (5, "barney"),
- (6, "donkey"),
- (7, "sally"),
- ]
- for multiparam, param in [
- (("jack", "fred"), {}),
- ((["jack", "fred"],), {}),
- ]:
- res = conn.execute(
- "select * from users where user_name=? or "
- "user_name=? order by user_id",
- *multiparam,
- **param
- )
- assert res.fetchall() == [(1, "jack"), (2, "fred")]
- res = conn.execute("select * from users where user_name=?", "jack")
- assert res.fetchall() == [(1, "jack")]
- conn.execute("delete from users")
-
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
-
- # some psycopg2 versions bomb this.
- @testing.fails_on_everything_except(
- "mysql+mysqldb",
- "mysql+pymysql",
- "mysql+cymysql",
- "mysql+mysqlconnector",
- "postgresql",
- )
- def test_raw_sprintf(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- [1, "jack"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- [2, "ed"],
- [3, "horse"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- 4,
- "sally",
- )
- conn.execute("insert into users (user_id) values (%s)", 5)
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- (5, None),
- ]
- for multiparam, param in [
- (("jack", "ed"), {}),
- ((["jack", "ed"],), {}),
- ]:
- res = conn.execute(
- "select * from users where user_name=%s or "
- "user_name=%s order by user_id",
- *multiparam,
- **param
- )
- assert res.fetchall() == [(1, "jack"), (2, "ed")]
- res = conn.execute(
- "select * from users where user_name=%s", "jack"
- )
- assert res.fetchall() == [(1, "jack")]
-
- conn.execute("delete from users")
-
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
-
- # pyformat is supported for mysql, but skipping because a few driver
- # versions have a bug that bombs out on this test. (1.2.2b3,
- # 1.2.2c1, 1.2.2)
-
- @testing.skip_if(lambda: testing.against("mysql+mysqldb"), "db-api flaky")
- @testing.fails_on_everything_except(
- "postgresql+psycopg2",
- "postgresql+psycopg2cffi",
- "postgresql+pypostgresql",
- "postgresql+pygresql",
- "mysql+mysqlconnector",
- "mysql+pymysql",
- "mysql+cymysql",
- "mssql+pymssql",
- )
- def test_raw_python(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- {"id": 1, "name": "jack"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- {"id": 2, "name": "ed"},
- {"id": 3, "name": "horse"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- id=4,
- name="sally",
- )
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- ]
- conn.execute("delete from users")
+ def test_raw_positional_invalid(self, connection):
+ assert_raises_message(
+ tsa.exc.ArgumentError,
+ "List argument must consist only of tuples or dictionaries",
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [2, "fred"],
+ )
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
+ assert_raises_message(
+ tsa.exc.ArgumentError,
+ "List argument must consist only of tuples or dictionaries",
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [[3, "ed"], [4, "horse"]],
+ )
- @testing.fails_on_everything_except("sqlite", "oracle+cx_oracle")
- def test_raw_named(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- {"id": 1, "name": "jack"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- {"id": 2, "name": "ed"},
- {"id": 3, "name": "horse"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- id=4,
- name="sally",
- )
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- ]
- conn.execute("delete from users")
+ def test_raw_named_invalid(self, connection):
+ assert_raises(
+ TypeError,
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 2, "name": "ed"},
+ {"id": 3, "name": "horse"},
+ )
+ assert_raises(
+ TypeError,
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ id=4,
+ name="sally",
+ )
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
+ @testing.requires.qmark_paramstyle
+ def test_raw_qmark(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (1, "jack"),
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (2, "fred"),
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [(3, "ed"), (4, "horse")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [(5, "barney"), (6, "donkey")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (7, "sally"),
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "fred"),
+ (3, "ed"),
+ (4, "horse"),
+ (5, "barney"),
+ (6, "donkey"),
+ (7, "sally"),
+ ]
+
+ res = conn.exec_driver_sql(
+ "select * from users where user_name=?", ("jack",)
+ )
+ assert res.fetchall() == [(1, "jack")]
+
+ @testing.requires.format_paramstyle
+ def test_raw_sprintf(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ (1, "jack"),
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ [(2, "ed"), (3, "horse")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ (4, "sally"),
+ )
+ conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,))
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ (5, None),
+ ]
+
+ res = conn.exec_driver_sql(
+ "select * from users where user_name=%s", ("jack",)
+ )
+ assert res.fetchall() == [(1, "jack")]
+
+ @testing.requires.pyformat_paramstyle
+ def test_raw_python(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 1, "name": "jack"},
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ dict(id=4, name="sally"),
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
+
+ @testing.requires.named_paramstyle
+ def test_raw_named(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (:id, :name)",
+ {"id": 1, "name": "jack"},
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (:id, :name)",
+ [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (:id, :name)",
+ {"id": 4, "name": "sally"},
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
@testing.engines.close_open_connections
def test_exception_wrapping_dbapi(self):
conn = testing.db.connect()
- for _c in testing.db, conn:
- assert_raises_message(
- tsa.exc.DBAPIError,
- r"not_a_valid_statement",
- _c.execute,
- "not_a_valid_statement",
- )
+ # engine does not have exec_driver_sql
+ assert_raises_message(
+ tsa.exc.DBAPIError,
+ r"not_a_valid_statement",
+ conn.exec_driver_sql,
+ "not_a_valid_statement",
+ )
@testing.requires.sqlite
def test_exception_wrapping_non_dbapi_error(self):
@@ -334,7 +273,10 @@ class ExecuteTest(fixtures.TestBase):
)
assert_raises_message(
- TypeError, "I'm not a DBAPI error", c.execute, "select "
+ TypeError,
+ "I'm not a DBAPI error",
+ c.exec_driver_sql,
+ "select ",
)
eq_(is_disconnect.call_count, 0)
@@ -359,7 +301,7 @@ class ExecuteTest(fixtures.TestBase):
):
with testing.db.connect() as conn:
assert_raises(
- tsa.exc.OperationalError, conn.execute, "select 1"
+ tsa.exc.OperationalError, conn.exec_driver_sql, "select 1"
)
def test_exception_wrapping_non_dbapi_statement(self):
@@ -378,11 +320,8 @@ class ExecuteTest(fixtures.TestBase):
)
_go(testing.db)
- conn = testing.db.connect()
- try:
+ with testing.db.connect() as conn:
_go(conn)
- finally:
- conn.close()
def test_not_an_executable(self):
for obj in (
@@ -574,6 +513,7 @@ class ExecuteTest(fixtures.TestBase):
def test_empty_insert(self):
"""test that execute() interprets [] as a list with no params"""
+ users_autoinc = self.tables.users_autoinc
testing.db.execute(
users_autoinc.insert().values(user_name=bindparam("name", None)),
@@ -885,7 +825,7 @@ class CompiledCacheTest(fixtures.TestBase):
cached_conn.execute(ins, {"user_name": "u3"})
eq_(compile_mock.call_count, 1)
assert len(cache) == 1
- eq_(conn.execute("select count(*) from users").scalar(), 3)
+ eq_(conn.exec_driver_sql("select count(*) from users").scalar(), 3)
@testing.only_on(
["sqlite", "mysql", "postgresql"],
@@ -924,7 +864,7 @@ class CompiledCacheTest(fixtures.TestBase):
cached_conn.execute(ins, {"photo_blob": blob})
eq_(compile_mock.call_count, 1)
eq_(len(cache), 1)
- eq_(conn.execute("select count(*) from photo").scalar(), 1)
+ eq_(conn.exec_driver_sql("select count(*) from photo").scalar(), 1)
del blob
@@ -1427,7 +1367,7 @@ class EngineEventsTest(fixtures.TestBase):
with e1.connect() as conn:
- result = conn.execute(stmt)
+ result = conn.exec_driver_sql(stmt)
ctx = result.context
eq_(
@@ -1496,7 +1436,7 @@ class EngineEventsTest(fixtures.TestBase):
t1.insert().execute(c1=5, c2="some data")
t1.insert().execute(c1=6)
eq_(
- engine.execute("select * from t1").fetchall(),
+ engine.execute(text("select * from t1")).fetchall(),
[(5, "some data"), (6, "foo")],
)
finally:
@@ -1987,7 +1927,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as conn:
try:
- conn.execute("SELECT FOO FROM I_DONT_EXIST")
+ conn.exec_driver_sql("SELECT FOO FROM I_DONT_EXIST")
assert False
except tsa.exc.DBAPIError as e:
ctx = canary.mock_calls[0][1][0]
@@ -2018,20 +1958,20 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
MyException,
"my exception",
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR ONE' FROM I_DONT_EXIST",
)
# case 2: return the DBAPI exception we're given;
# no wrapping should occur
assert_raises(
conn.dialect.dbapi.Error,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR TWO' FROM I_DONT_EXIST",
)
# case 3: normal wrapping
assert_raises(
tsa.exc.DBAPIError,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR THREE' FROM I_DONT_EXIST",
)
@@ -2080,7 +2020,7 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
MyException2,
"my exception chained",
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR ONE' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2090,7 +2030,7 @@ class HandleErrorTest(fixtures.TestBase):
) as patched:
assert_raises(
MyException1,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR TWO' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2102,7 +2042,7 @@ class HandleErrorTest(fixtures.TestBase):
# by err2
assert_raises(
MyException1,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR THREE' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2112,7 +2052,7 @@ class HandleErrorTest(fixtures.TestBase):
) as patched:
assert_raises(
tsa.exc.DBAPIError,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR FIVE' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2123,7 +2063,7 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
MyException3,
"my exception short circuit",
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR FOUR' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2145,7 +2085,7 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
tsa.exc.OperationalError,
"rollback failed",
- conn.execute,
+ conn.exec_driver_sql,
"insert into i_dont_exist (x) values ('y')",
)
@@ -2200,7 +2140,10 @@ class HandleErrorTest(fixtures.TestBase):
)
assert_raises_message(
- TypeError, "I'm not a DBAPI error", c.execute, "select "
+ TypeError,
+ "I'm not a DBAPI error",
+ c.exec_driver_sql,
+ "select ",
)
ctx = listener.mock_calls[0][1][0]
eq_(ctx.statement, "select ")
@@ -2223,13 +2166,17 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as conn:
assert_raises(
tsa.exc.DBAPIError,
- conn.execution_options(skip_user_error_events=True).execute,
+ conn.execution_options(
+ skip_user_error_events=True
+ ).exec_driver_sql,
"SELECT ERROR_ONE FROM I_DONT_EXIST",
)
assert_raises(
MyException1,
- conn.execution_options(skip_user_error_events=False).execute,
+ conn.execution_options(
+ skip_user_error_events=False
+ ).exec_driver_sql,
"SELECT ERROR_ONE FROM I_DONT_EXIST",
)
@@ -2246,7 +2193,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as c:
try:
- c.execute("SELECT x FROM nonexistent")
+ c.exec_driver_sql("SELECT x FROM nonexistent")
assert False
except tsa.exc.StatementError as st:
eq_(st.connection_invalidated, evt_value)
@@ -2287,7 +2234,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as c:
target_crec = c.connection._connection_record
try:
- c.execute("SELECT x FROM nonexistent")
+ c.exec_driver_sql("SELECT x FROM nonexistent")
assert False
except tsa.exc.StatementError as st:
eq_(st.connection_invalidated, True)
@@ -2644,7 +2591,9 @@ class DialectEventTest(fixtures.TestBase):
def _test_do_execute(self, retval):
with self._run_test(retval) as (conn, m1):
- result = conn.execute("insert into table foo", {"foo": "bar"})
+ result = conn.exec_driver_sql(
+ "insert into table foo", {"foo": "bar"}
+ )
self._assert(
retval,
m1.do_execute,
@@ -2661,7 +2610,7 @@ class DialectEventTest(fixtures.TestBase):
def _test_do_executemany(self, retval):
with self._run_test(retval) as (conn, m1):
- result = conn.execute(
+ result = conn.exec_driver_sql(
"insert into table foo", [{"foo": "bar"}, {"foo": "bar"}]
)
self._assert(
@@ -2680,9 +2629,9 @@ class DialectEventTest(fixtures.TestBase):
def _test_do_execute_no_params(self, retval):
with self._run_test(retval) as (conn, m1):
- result = conn.execution_options(no_parameters=True).execute(
- "insert into table foo"
- )
+ result = conn.execution_options(
+ no_parameters=True
+ ).exec_driver_sql("insert into table foo")
self._assert(
retval,
m1.do_execute_no_params,
@@ -2840,7 +2789,7 @@ class AutocommitTextTest(fixtures.TestBase):
engine.dialect.dbapi = dbapi
with engine.connect() as conn:
- conn.execute("%s something table something" % keyword)
+ conn.exec_driver_sql("%s something table something" % keyword)
if expected:
eq_(dbapi.connect().mock_calls, [call.cursor(), call.commit()])
diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py
index fe4ff44a7..5d50a010d 100644
--- a/test/engine/test_logging.py
+++ b/test/engine/test_logging.py
@@ -20,6 +20,11 @@ from sqlalchemy.testing import mock
from sqlalchemy.testing.util import lazy_gc
+def exec_sql(engine, sql, *args, **kwargs):
+ with engine.connect() as conn:
+ return conn.exec_driver_sql(sql, *args, **kwargs)
+
+
class LogParamsTest(fixtures.TestBase):
__only_on__ = "sqlite"
__requires__ = ("ad_hoc_engines",)
@@ -29,21 +34,23 @@ class LogParamsTest(fixtures.TestBase):
self.no_param_engine = engines.testing_engine(
options={"echo": True, "hide_parameters": True}
)
- self.eng.execute("create table if not exists foo (data string)")
- self.no_param_engine.execute(
- "create table if not exists foo (data string)"
+ exec_sql(self.eng, "create table if not exists foo (data string)")
+ exec_sql(
+ self.no_param_engine,
+ "create table if not exists foo (data string)",
)
self.buf = logging.handlers.BufferingHandler(100)
for log in [logging.getLogger("sqlalchemy.engine")]:
log.addHandler(self.buf)
def teardown(self):
- self.eng.execute("drop table if exists foo")
+ exec_sql(self.eng, "drop table if exists foo")
for log in [logging.getLogger("sqlalchemy.engine")]:
log.removeHandler(self.buf)
def test_log_large_list_of_dict(self):
- self.eng.execute(
+ exec_sql(
+ self.eng,
"INSERT INTO foo (data) values (:data)",
[{"data": str(i)} for i in range(100)],
)
@@ -71,7 +78,8 @@ class LogParamsTest(fixtures.TestBase):
)
def test_log_no_parameters(self):
- self.no_param_engine.execute(
+ exec_sql(
+ self.no_param_engine,
"INSERT INTO foo (data) values (:data)",
[{"data": str(i)} for i in range(100)],
)
@@ -81,7 +89,8 @@ class LogParamsTest(fixtures.TestBase):
)
def test_log_large_list_of_tuple(self):
- self.eng.execute(
+ exec_sql(
+ self.eng,
"INSERT INTO foo (data) values (?)",
[(str(i),) for i in range(100)],
)
@@ -210,7 +219,7 @@ class LogParamsTest(fixtures.TestBase):
largeparam = "".join(chr(random.randint(52, 85)) for i in range(5000))
- self.eng.execute("INSERT INTO foo (data) values (?)", (largeparam,))
+ exec_sql(self.eng, "INSERT INTO foo (data) values (?)", (largeparam,))
eq_(
self.buf.buffer[1].message,
@@ -225,7 +234,7 @@ class LogParamsTest(fixtures.TestBase):
lp2 = "".join(chr(random.randint(52, 85)) for i in range(8))
lp3 = "".join(chr(random.randint(52, 85)) for i in range(670))
- self.eng.execute("SELECT ?, ?, ?", (lp1, lp2, lp3))
+ exec_sql(self.eng, "SELECT ?, ?, ?", (lp1, lp2, lp3))
eq_(
self.buf.buffer[1].message,
@@ -240,8 +249,10 @@ class LogParamsTest(fixtures.TestBase):
lp2 = "".join(chr(random.randint(52, 85)) for i in range(200))
lp3 = "".join(chr(random.randint(52, 85)) for i in range(670))
- self.eng.execute(
- "INSERT INTO foo (data) values (?)", [(lp1,), (lp2,), (lp3,)]
+ exec_sql(
+ self.eng,
+ "INSERT INTO foo (data) values (?)",
+ [(lp1,), (lp2,), (lp3,)],
)
eq_(
@@ -273,7 +284,8 @@ class LogParamsTest(fixtures.TestBase):
tsa.exc.DBAPIError,
r".*INSERT INTO nonexistent \(data\) values \(:data\)\]\n"
r"\[SQL parameters hidden due to hide_parameters=True\]",
- lambda: self.no_param_engine.execute(
+ lambda: exec_sql(
+ self.no_param_engine,
"INSERT INTO nonexistent (data) values (:data)",
[{"data": str(i)} for i in range(10)],
),
@@ -324,7 +336,7 @@ class LogParamsTest(fixtures.TestBase):
largeparam = "".join(chr(random.randint(52, 85)) for i in range(5000))
self.eng.echo = "debug"
- result = self.eng.execute("SELECT ?", (largeparam,))
+ result = exec_sql(self.eng, "SELECT ?", (largeparam,))
row = result.first()
@@ -370,7 +382,8 @@ class LogParamsTest(fixtures.TestBase):
r"{'data': '6'}, {'data': '7'} ... displaying 10 of "
r"100 total bound parameter sets ... {'data': '98'}, "
r"{'data': '99'}\]",
- lambda: self.eng.execute(
+ lambda: exec_sql(
+ self.eng,
"INSERT INTO nonexistent (data) values (:data)",
[{"data": str(i)} for i in range(100)],
),
@@ -385,7 +398,8 @@ class LogParamsTest(fixtures.TestBase):
r"... displaying "
r"10 of 100 total bound parameter sets ... "
r"\('98',\), \('99',\)\]",
- lambda: self.eng.execute(
+ lambda: exec_sql(
+ self.eng,
"INSERT INTO nonexistent (data) values (?)",
[(str(i),) for i in range(100)],
),
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index 000be1a70..a09b04748 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -826,7 +826,7 @@ class CursorErrTest(fixtures.TestBase):
def test_cursor_explode(self):
db = self._fixture(False, False)
conn = db.connect()
- result = conn.execute("select foo")
+ result = conn.exec_driver_sql("select foo")
result.close()
conn.close()
eq_(
@@ -1006,7 +1006,7 @@ class RealReconnectTest(fixtures.TestBase):
engine = engines.testing_engine()
def broken_initialize(connection):
- connection.execute("select fake_stuff from _fake_table")
+ connection.exec_driver_sql("select fake_stuff from _fake_table")
engine.dialect.initialize = broken_initialize
@@ -1020,7 +1020,7 @@ class RealReconnectTest(fixtures.TestBase):
engine = engines.testing_engine()
def broken_initialize(connection):
- connection.execute("select fake_stuff from _fake_table")
+ connection.exec_driver_sql("select fake_stuff from _fake_table")
engine.dialect.initialize = broken_initialize
@@ -1186,7 +1186,7 @@ class InvalidateDuringResultTest(fixtures.TestBase):
)
def test_invalidate_on_results(self):
conn = self.engine.connect()
- result = conn.execute("select * from sometable")
+ result = conn.exec_driver_sql("select * from sometable")
for x in range(20):
result.fetchone()
self.engine.test_shutdown()
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index 9de181216..3a2a3c699 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -1016,7 +1016,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
indexes"""
with testing.db.begin() as conn:
- conn.execute(
+ conn.exec_driver_sql(
"""
CREATE TABLE book (
id INTEGER NOT NULL,
@@ -1056,7 +1056,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
"""test reflection of a composite primary key"""
with testing.db.begin() as conn:
- conn.execute(
+ conn.exec_driver_sql(
"""
CREATE TABLE book (
id INTEGER NOT NULL,
@@ -2045,19 +2045,21 @@ class ReverseCasingReflectTest(fixtures.TestBase, AssertsCompiledSQL):
@testing.requires.denormalized_names
def setup(self):
- testing.db.execute(
+ with testing.db.connect() as conn:
+ conn.exec_driver_sql(
+ """
+ CREATE TABLE weird_casing(
+ col1 char(20),
+ "Col2" char(20),
+ "col3" char(20)
+ )
"""
- CREATE TABLE weird_casing(
- col1 char(20),
- "Col2" char(20),
- "col3" char(20)
- )
- """
- )
+ )
@testing.requires.denormalized_names
def teardown(self):
- testing.db.execute("drop table weird_casing")
+ with testing.db.connect() as conn:
+ conn.exec_driver_sql("drop table weird_casing")
@testing.requires.denormalized_names
def test_direct_quoting(self):
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index 39c9b2ad4..c6a6c5e3a 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -60,7 +60,7 @@ class TransactionTest(fixtures.TestBase):
transaction.commit()
transaction = connection.begin()
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 3
transaction.commit()
connection.close()
@@ -74,7 +74,7 @@ class TransactionTest(fixtures.TestBase):
connection.execute(users.insert(), user_id=2, user_name="user2")
connection.execute(users.insert(), user_id=3, user_name="user3")
transaction.rollback()
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
@@ -92,7 +92,7 @@ class TransactionTest(fixtures.TestBase):
print("Exception: ", e)
transaction.rollback()
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
@@ -150,7 +150,7 @@ class TransactionTest(fixtures.TestBase):
assert_raises_message(
exc.InvalidRequestError,
"This connection is on an inactive transaction. Please",
- connection.execute,
+ connection.exec_driver_sql,
"select 1",
)
@@ -189,7 +189,12 @@ class TransactionTest(fixtures.TestBase):
assert branched.in_transaction()
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
- eq_(connection.scalar("select count(*) from query_users"), 1)
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 1,
+ )
finally:
connection.close()
@@ -201,7 +206,12 @@ class TransactionTest(fixtures.TestBase):
branched.execute(users.insert(), user_id=1, user_name="user1")
finally:
connection.close()
- eq_(testing.db.scalar("select count(*) from query_users"), 1)
+ eq_(
+ testing.db.execute(
+ text("select count(*) from query_users")
+ ).scalar(),
+ 1,
+ )
@testing.requires.savepoints
def test_branch_savepoint_rollback(self):
@@ -216,7 +226,12 @@ class TransactionTest(fixtures.TestBase):
nested.rollback()
assert connection.in_transaction()
trans.commit()
- eq_(connection.scalar("select count(*) from query_users"), 1)
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 1,
+ )
finally:
connection.close()
@@ -232,7 +247,12 @@ class TransactionTest(fixtures.TestBase):
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
assert not connection.in_transaction()
- eq_(connection.scalar("select count(*) from query_users"), 1)
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 1,
+ )
finally:
connection.close()
@@ -267,7 +287,12 @@ class TransactionTest(fixtures.TestBase):
conn2 = connection.execution_options(dummy=True)
conn2.execute(users.insert(), user_id=2, user_name="user2")
transaction.rollback()
- eq_(connection.scalar("select count(*) from query_users"), 0)
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 0,
+ )
finally:
connection.close()
@@ -283,9 +308,12 @@ class TransactionTest(fixtures.TestBase):
trans2.commit()
transaction.rollback()
self.assert_(
- connection.scalar("select count(*) from " "query_users") == 0
+ connection.exec_driver_sql(
+ "select count(*) from " "query_users"
+ ).scalar()
+ == 0
)
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
@@ -301,7 +329,10 @@ class TransactionTest(fixtures.TestBase):
assert not trans.is_active
self.assert_(
- connection.scalar("select count(*) from " "query_users") == 0
+ connection.exec_driver_sql(
+ "select count(*) from " "query_users"
+ ).scalar()
+ == 0
)
trans = connection.begin()
@@ -309,7 +340,10 @@ class TransactionTest(fixtures.TestBase):
trans.__exit__(None, None, None)
assert not trans.is_active
self.assert_(
- connection.scalar("select count(*) from " "query_users") == 1
+ connection.exec_driver_sql(
+ "select count(*) from " "query_users"
+ ).scalar()
+ == 1
)
connection.close()
@@ -328,9 +362,12 @@ class TransactionTest(fixtures.TestBase):
transaction.commit()
assert not connection.in_transaction()
self.assert_(
- connection.scalar("select count(*) from " "query_users") == 5
+ connection.exec_driver_sql(
+ "select count(*) from " "query_users"
+ ).scalar()
+ == 5
)
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 5
connection.close()
@@ -349,9 +386,12 @@ class TransactionTest(fixtures.TestBase):
transaction.close()
assert not connection.in_transaction()
self.assert_(
- connection.scalar("select count(*) from " "query_users") == 0
+ connection.exec_driver_sql(
+ "select count(*) from " "query_users"
+ ).scalar()
+ == 0
)
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
@@ -406,7 +446,7 @@ class TransactionTest(fixtures.TestBase):
assert_raises_message(
exc.InvalidRequestError,
"This connection is on an inactive savepoint transaction.",
- connection.execute,
+ connection.exec_driver_sql,
"select 1",
)
trans2.rollback()
@@ -701,7 +741,7 @@ class AutoRollbackTest(fixtures.TestBase):
test_needs_acid=True,
)
users.create(conn1)
- conn1.execute("select * from deadlock_users")
+ conn1.exec_driver_sql("select * from deadlock_users")
conn1.close()
# without auto-rollback in the connection pool's return() logic,
@@ -732,20 +772,23 @@ class ExplicitAutoCommitTest(fixtures.TestBase):
Column("id", Integer, primary_key=True),
Column("data", String(100)),
)
- metadata.create_all()
- testing.db.execute(
- "create function insert_foo(varchar) "
- "returns integer as 'insert into foo(data) "
- "values ($1);select 1;' language sql"
- )
+ with testing.db.connect() as conn:
+ metadata.create_all(conn)
+ conn.exec_driver_sql(
+ "create function insert_foo(varchar) "
+ "returns integer as 'insert into foo(data) "
+ "values ($1);select 1;' language sql"
+ )
def teardown(self):
- foo.delete().execute().close()
+ with testing.db.connect() as conn:
+ conn.execute(foo.delete())
@classmethod
def teardown_class(cls):
- testing.db.execute("drop function insert_foo(varchar)")
- metadata.drop_all()
+ with testing.db.connect() as conn:
+ conn.exec_driver_sql("drop function insert_foo(varchar)")
+ metadata.drop_all(conn)
def test_control(self):
diff --git a/test/ext/test_horizontal_shard.py b/test/ext/test_horizontal_shard.py
index f78f1ff06..70a126f4f 100644
--- a/test/ext/test_horizontal_shard.py
+++ b/test/ext/test_horizontal_shard.py
@@ -446,7 +446,7 @@ class AttachedFileShardTest(ShardTest, fixtures.TestBase):
e = testing_engine("sqlite://")
with e.connect() as conn:
for i in range(1, 5):
- conn.execute(
+ conn.exec_driver_sql(
'ATTACH DATABASE "shard%s_%s.db" AS shard%s'
% (i, provision.FOLLOWER_IDENT, i)
)
diff --git a/test/orm/test_bind.py b/test/orm/test_bind.py
index 3ca2c1021..63588d73e 100644
--- a/test/orm/test_bind.py
+++ b/test/orm/test_bind.py
@@ -255,7 +255,7 @@ class BindIntegrationTest(_fixtures.FixtureTest):
sess.flush()
sess.close()
assert not c.in_transaction()
- assert c.scalar("select count(1) from users") == 0
+ assert c.exec_driver_sql("select count(1) from users").scalar() == 0
sess = create_session(bind=c, autocommit=False)
u = User(name="u2")
@@ -263,9 +263,9 @@ class BindIntegrationTest(_fixtures.FixtureTest):
sess.flush()
sess.commit()
assert not c.in_transaction()
- assert c.scalar("select count(1) from users") == 1
- c.execute("delete from users")
- assert c.scalar("select count(1) from users") == 0
+ assert c.exec_driver_sql("select count(1) from users").scalar() == 1
+ c.exec_driver_sql("delete from users")
+ assert c.exec_driver_sql("select count(1) from users").scalar() == 0
c = testing.db.connect()
@@ -277,7 +277,7 @@ class BindIntegrationTest(_fixtures.FixtureTest):
assert c.in_transaction()
trans.commit()
assert not c.in_transaction()
- assert c.scalar("select count(1) from users") == 1
+ assert c.exec_driver_sql("select count(1) from users").scalar() == 1
class SessionBindTest(fixtures.MappedTest):
diff --git a/test/orm/test_session.py b/test/orm/test_session.py
index fa68fedfe..864264af9 100644
--- a/test/orm/test_session.py
+++ b/test/orm/test_session.py
@@ -99,13 +99,13 @@ class TransScopingTest(_fixtures.FixtureTest):
User, users = self.classes.User, self.tables.users
c = testing.db.connect()
- c.execute("select * from users")
+ c.exec_driver_sql("select * from users")
mapper(User, users)
s = create_session(bind=c)
s.add(User(name="first"))
s.flush()
- c.execute("select * from users")
+ c.exec_driver_sql("select * from users")
def test_close(self):
"""close() doesn't close a connection the session didn't open"""
@@ -113,15 +113,15 @@ class TransScopingTest(_fixtures.FixtureTest):
User, users = self.classes.User, self.tables.users
c = testing.db.connect()
- c.execute("select * from users")
+ c.exec_driver_sql("select * from users")
mapper(User, users)
s = create_session(bind=c)
s.add(User(name="first"))
s.flush()
- c.execute("select * from users")
+ c.exec_driver_sql("select * from users")
s.close()
- c.execute("select * from users")
+ c.exec_driver_sql("select * from users")
def test_autobegin_execute(self):
# test the new autobegin behavior introduced in #5074
@@ -192,13 +192,21 @@ class TransScopingTest(_fixtures.FixtureTest):
u = User(name="x")
sess.add(u)
sess.flush()
- assert conn1.execute("select count(1) from users").scalar() == 1
- assert conn2.execute("select count(1) from users").scalar() == 0
+ assert (
+ conn1.exec_driver_sql("select count(1) from users").scalar() == 1
+ )
+ assert (
+ conn2.exec_driver_sql("select count(1) from users").scalar() == 0
+ )
sess.commit()
- assert conn1.execute("select count(1) from users").scalar() == 1
+ assert (
+ conn1.exec_driver_sql("select count(1) from users").scalar() == 1
+ )
assert (
- testing.db.connect().execute("select count(1) from users").scalar()
+ testing.db.connect()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
== 1
)
sess.close()
@@ -412,11 +420,16 @@ class SessionStateTest(_fixtures.FixtureTest):
sess.add(u)
u2 = sess.query(User).filter_by(name="ed").one()
assert u2 is u
- eq_(conn1.execute("select count(1) from users").scalar(), 1)
- eq_(conn2.execute("select count(1) from users").scalar(), 0)
+ eq_(conn1.exec_driver_sql("select count(1) from users").scalar(), 1)
+ eq_(conn2.exec_driver_sql("select count(1) from users").scalar(), 0)
sess.commit()
- eq_(conn1.execute("select count(1) from users").scalar(), 1)
- eq_(bind.connect().execute("select count(1) from users").scalar(), 1)
+ eq_(conn1.exec_driver_sql("select count(1) from users").scalar(), 1)
+ eq_(
+ bind.connect()
+ .exec_driver_sql("select count(1) from users")
+ .scalar(),
+ 1,
+ )
sess.close()
def test_with_no_autoflush(self):
@@ -556,7 +569,7 @@ class SessionStateTest(_fixtures.FixtureTest):
)
assert (
testing.db.connect()
- .execute("select count(1) from users")
+ .exec_driver_sql("select count(1) from users")
.scalar()
== 0
)
@@ -569,7 +582,7 @@ class SessionStateTest(_fixtures.FixtureTest):
)
assert (
testing.db.connect()
- .execute("select count(1) from users")
+ .exec_driver_sql("select count(1) from users")
.scalar()
== 1
)
@@ -589,9 +602,13 @@ class SessionStateTest(_fixtures.FixtureTest):
u.name = "ed"
sess.add(u)
sess.commit()
- assert conn1.execute("select count(1) from users").scalar() == 1
assert (
- testing.db.connect().execute("select count(1) from users").scalar()
+ conn1.exec_driver_sql("select count(1) from users").scalar() == 1
+ )
+ assert (
+ testing.db.connect()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
== 1
)
sess.commit()
diff --git a/test/orm/test_transaction.py b/test/orm/test_transaction.py
index 676a37e8d..15244d9d2 100644
--- a/test/orm/test_transaction.py
+++ b/test/orm/test_transaction.py
@@ -49,7 +49,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
tran = s.transaction
s.add(User(name="first"))
s.flush()
- c.execute("select * from users")
+ c.exec_driver_sql("select * from users")
u = User(name="two")
s.add(u)
s.flush()
@@ -153,18 +153,24 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
session.begin_nested()
session.connection().execute(users.insert().values(name="user2"))
assert (
- session.connection().execute("select count(1) from users").scalar()
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
== 2
)
session.rollback()
assert (
- session.connection().execute("select count(1) from users").scalar()
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
== 1
)
session.connection().execute(users.insert().values(name="user3"))
session.commit()
assert (
- session.connection().execute("select count(1) from users").scalar()
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
== 2
)
diff --git a/test/perf/invalidate_stresstest.py b/test/perf/invalidate_stresstest.py
index 59d78237e..c7e1482c7 100644
--- a/test/perf/invalidate_stresstest.py
+++ b/test/perf/invalidate_stresstest.py
@@ -31,7 +31,7 @@ def worker():
try:
conn.begin()
for i in range(5):
- conn.execute("SELECT 1+1")
+ conn.exec_driver_sql("SELECT 1+1")
gevent.sleep(random.random() * 1.01)
except Exception:
@@ -49,8 +49,8 @@ def main():
gevent.sleep(3)
while True:
- result = list(engine.execute("show processlist"))
- engine.execute("kill %d" % result[-2][0])
+ result = list(engine.exec_driver_sql("show processlist"))
+ engine.exec_driver_sql("kill %d" % result[-2][0])
print("\n\n\n BOOM!!!!! \n\n\n")
gevent.sleep(5)
print(engine.pool.status())
diff --git a/test/requirements.py b/test/requirements.py
index 913d06a01..bddcfc09a 100644
--- a/test/requirements.py
+++ b/test/requirements.py
@@ -181,6 +181,43 @@ class DefaultRequirements(SuiteRequirements):
return skip_if(["firebird", "mssql+mxodbc"], "not supported by driver")
@property
+ def qmark_paramstyle(self):
+ return only_on(
+ ["firebird", "sqlite", "+pyodbc", "+mxodbc", "mysql+oursql"]
+ )
+
+ @property
+ def named_paramstyle(self):
+ return only_on(["sqlite", "oracle+cx_oracle"])
+
+ @property
+ def format_paramstyle(self):
+ return only_on(
+ [
+ "mysql+mysqldb",
+ "mysql+pymysql",
+ "mysql+cymysql",
+ "mysql+mysqlconnector",
+ "postgresql",
+ ]
+ )
+
+ @property
+ def pyformat_paramstyle(self):
+ return only_on(
+ [
+ "postgresql+psycopg2",
+ "postgresql+psycopg2cffi",
+ "postgresql+pypostgresql",
+ "postgresql+pygresql",
+ "mysql+mysqlconnector",
+ "mysql+pymysql",
+ "mysql+cymysql",
+ "mssql+pymssql",
+ ]
+ )
+
+ @property
def no_quoting_special_bind_names(self):
"""Target database will quote bound parameter names, doesn't support
EXPANDING"""
@@ -876,10 +913,10 @@ class DefaultRequirements(SuiteRequirements):
with config.db.connect() as conn:
try:
return (
- conn.scalar(
+ conn.exec_driver_sql(
"""select json_extract('{"foo": "bar"}', """
"""'$."foo"')"""
- )
+ ).scalar()
== "bar"
)
except exc.DBAPIError:
@@ -1160,9 +1197,13 @@ class DefaultRequirements(SuiteRequirements):
def check(config):
if not against(config, "postgresql"):
return False
- count = config.db.scalar(
- "SELECT count(*) FROM pg_extension "
- "WHERE extname='%s'" % name
+ count = (
+ config.db.connect(close_with_result=True)
+ .exec_driver_sql(
+ "SELECT count(*) FROM pg_extension "
+ "WHERE extname='%s'" % name
+ )
+ .scalar()
)
return bool(count)
@@ -1184,7 +1225,9 @@ class DefaultRequirements(SuiteRequirements):
):
return False
try:
- config.db.scalar("select '[1,2)'::int4range;")
+ config.db.connect(close_with_result=True).exec_driver_sql(
+ "select '[1,2)'::int4range;"
+ ).scalar()
return True
except Exception:
return False
@@ -1383,7 +1426,11 @@ class DefaultRequirements(SuiteRequirements):
if not against(config, "mysql"):
return False
- row = config.db.execute("show variables like 'sql_mode'").first()
+ row = (
+ config.db.connect(close_with_result=True)
+ .exec_driver_sql("show variables like 'sql_mode'")
+ .first()
+ )
return not row or "NO_ZERO_DATE" not in row[1]
return only_if(check)
@@ -1394,7 +1441,11 @@ class DefaultRequirements(SuiteRequirements):
if not against(config, "mysql"):
return False
- row = config.db.execute("show variables like 'sql_mode'").first()
+ row = (
+ config.db.connect(close_with_result=True)
+ .exec_driver_sql("show variables like 'sql_mode'")
+ .first()
+ )
return not row or "STRICT_TRANS_TABLES" not in row[1]
return only_if(check)
@@ -1483,9 +1534,14 @@ class DefaultRequirements(SuiteRequirements):
@property
def postgresql_utf8_server_encoding(self):
+
return only_if(
lambda config: against(config, "postgresql")
- and config.db.scalar("show server_encoding").lower() == "utf8"
+ and config.db.connect(close_with_result=True)
+ .exec_driver_sql("show server_encoding")
+ .scalar()
+ .lower()
+ == "utf8"
)
@property
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
index 957fa890a..7d33933f8 100644
--- a/test/sql/test_defaults.py
+++ b/test/sql/test_defaults.py
@@ -1654,9 +1654,11 @@ class SequenceAsServerDefaultTest(
def test_default_textual_w_default(self):
with testing.db.connect() as conn:
- conn.execute("insert into t_seq_test (data) values ('some data')")
+ conn.exec_driver_sql(
+ "insert into t_seq_test (data) values ('some data')"
+ )
- eq_(conn.scalar("select id from t_seq_test"), 1)
+ eq_(conn.exec_driver_sql("select id from t_seq_test").scalar(), 1)
def test_default_core_w_default(self):
t_seq_test = self.tables.t_seq_test
@@ -1667,11 +1669,13 @@ class SequenceAsServerDefaultTest(
def test_default_textual_server_only(self):
with testing.db.connect() as conn:
- conn.execute(
+ conn.exec_driver_sql(
"insert into t_seq_test_2 (data) values ('some data')"
)
- eq_(conn.scalar("select id from t_seq_test_2"), 1)
+ eq_(
+ conn.exec_driver_sql("select id from t_seq_test_2").scalar(), 1
+ )
def test_default_core_server_only(self):
t_seq_test = self.tables.t_seq_test_2
diff --git a/test/sql/test_from_linter.py b/test/sql/test_from_linter.py
index bf2f06b57..416f89de3 100644
--- a/test/sql/test_from_linter.py
+++ b/test/sql/test_from_linter.py
@@ -219,7 +219,7 @@ class TestLinter(fixtures.TablesTest):
def test_noop_for_unhandled_objects(self):
with self.bind.connect() as conn:
- conn.execute("SELECT 1;").fetchone()
+ conn.exec_driver_sql("SELECT 1;").fetchone()
def test_does_not_modify_query(self):
with self.bind.connect() as conn:
diff --git a/test/sql/test_quote.py b/test/sql/test_quote.py
index aba6a0204..627626994 100644
--- a/test/sql/test_quote.py
+++ b/test/sql/test_quote.py
@@ -90,19 +90,23 @@ class QuoteExecTest(fixtures.TestBase):
@testing.provide_metadata
def test_has_table_case_sensitive(self):
preparer = testing.db.dialect.identifier_preparer
- if testing.db.dialect.requires_name_normalize:
- testing.db.execute("CREATE TABLE TAB1 (id INTEGER)")
- else:
- testing.db.execute("CREATE TABLE tab1 (id INTEGER)")
- testing.db.execute(
- "CREATE TABLE %s (id INTEGER)" % preparer.quote_identifier("tab2")
- )
- testing.db.execute(
- "CREATE TABLE %s (id INTEGER)" % preparer.quote_identifier("TAB3")
- )
- testing.db.execute(
- "CREATE TABLE %s (id INTEGER)" % preparer.quote_identifier("TAB4")
- )
+ with testing.db.connect() as conn:
+ if conn.dialect.requires_name_normalize:
+ conn.exec_driver_sql("CREATE TABLE TAB1 (id INTEGER)")
+ else:
+ conn.exec_driver_sql("CREATE TABLE tab1 (id INTEGER)")
+ conn.exec_driver_sql(
+ "CREATE TABLE %s (id INTEGER)"
+ % preparer.quote_identifier("tab2")
+ )
+ conn.exec_driver_sql(
+ "CREATE TABLE %s (id INTEGER)"
+ % preparer.quote_identifier("TAB3")
+ )
+ conn.exec_driver_sql(
+ "CREATE TABLE %s (id INTEGER)"
+ % preparer.quote_identifier("TAB4")
+ )
t1 = Table(
"tab1", self.metadata, Column("id", Integer, primary_key=True)
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index 87886c4fa..f08248440 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -586,8 +586,8 @@ class ResultProxyTest(fixtures.TablesTest):
)
trans.rollback()
- def test_fetchone_til_end(self):
- result = testing.db.execute("select * from users")
+ def test_fetchone_til_end(self, connection):
+ result = connection.exec_driver_sql("select * from users")
eq_(result.fetchone(), None)
eq_(result.fetchone(), None)
eq_(result.fetchone(), None)
@@ -600,9 +600,10 @@ class ResultProxyTest(fixtures.TablesTest):
def test_connectionless_autoclose_rows_exhausted(self):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), dict(user_id=1, user_name="john"))
- result = testing.db.execute("select * from users")
+ result = testing.db.execute(text("select * from users"))
connection = result.connection
assert not connection.closed
eq_(result.fetchone(), (1, "john"))
@@ -627,7 +628,7 @@ class ResultProxyTest(fixtures.TablesTest):
assert connection.closed
def test_connectionless_autoclose_no_rows(self):
- result = testing.db.execute("select * from users")
+ result = testing.db.execute(text("select * from users"))
connection = result.connection
assert not connection.closed
eq_(result.fetchone(), None)
@@ -635,7 +636,7 @@ class ResultProxyTest(fixtures.TablesTest):
@testing.requires.updateable_autoincrement_pks
def test_connectionless_autoclose_no_metadata(self):
- result = testing.db.execute("update users set user_id=5")
+ result = testing.db.execute(text("update users set user_id=5"))
connection = result.connection
assert connection.closed
assert_raises_message(
@@ -1071,16 +1072,18 @@ class ResultProxyTest(fixtures.TablesTest):
[("user_id", 1), ("user_name", "foo")],
)
- def test_len(self):
+ def test_len(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- r = users.select().execute().first()
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
+ r = connection.execute(users.select()).first()
eq_(len(r), 2)
- r = testing.db.execute("select user_name, user_id from users").first()
+ r = connection.exec_driver_sql(
+ "select user_name, user_id from users"
+ ).first()
eq_(len(r), 2)
- r = testing.db.execute("select user_name from users").first()
+ r = connection.exec_driver_sql("select user_name from users").first()
eq_(len(r), 1)
def test_sorting_in_python(self):
@@ -1109,12 +1112,15 @@ class ResultProxyTest(fixtures.TablesTest):
eq_([x.lower() for x in r._fields], ["user_id", "user_name"])
eq_(list(r._mapping.values()), [1, "foo"])
- def test_column_order_with_text_query(self):
+ def test_column_order_with_text_query(self, connection):
# should return values in query order
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- r = testing.db.execute("select user_name, user_id from users").first()
+ connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
+
+ r = connection.exec_driver_sql(
+ "select user_name, user_id from users"
+ ).first()
eq_(r[0], "foo")
eq_(r[1], 1)
eq_([x.lower() for x in r._fields], ["user_name", "user_id"])
@@ -1271,8 +1277,10 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(row[1:0:-1], ("Uno",))
@testing.only_on("sqlite")
- def test_row_getitem_indexes_raw(self):
- row = testing.db.execute("select 'One' as key, 'Uno' as value").first()
+ def test_row_getitem_indexes_raw(self, connection):
+ row = connection.exec_driver_sql(
+ "select 'One' as key, 'Uno' as value"
+ ).first()
eq_(row._mapping["key"], "One")
eq_(row._mapping["value"], "Uno")
eq_(row[0], "One")
@@ -1304,7 +1312,7 @@ class ResultProxyTest(fixtures.TablesTest):
assert s.getvalue().strip() == "1,Test"
@testing.requires.selectone
- def test_empty_accessors(self):
+ def test_empty_accessors(self, connection):
statements = [
(
"select 1",
@@ -1339,7 +1347,10 @@ class ResultProxyTest(fixtures.TablesTest):
]
for stmt, meths, msg in statements:
- r = testing.db.execute(stmt)
+ if isinstance(stmt, str):
+ r = connection.exec_driver_sql(stmt)
+ else:
+ r = connection.execute(stmt)
try:
for meth in meths:
assert_raises_message(
diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py
index 4cfb3f0d6..d81ad7186 100644
--- a/test/sql/test_returning.py
+++ b/test/sql/test_returning.py
@@ -171,13 +171,13 @@ class ReturningTest(fixtures.TestBase, AssertsExecutionResults):
)
@testing.fails_on_everything_except("postgresql", "firebird")
- def test_literal_returning(self):
+ def test_literal_returning(self, connection):
if testing.against("postgresql"):
literal_true = "true"
else:
literal_true = "1"
- result4 = testing.db.execute(
+ result4 = connection.exec_driver_sql(
'insert into tables (id, persons, "full") '
"values (5, 10, %s) returning persons" % literal_true
)
diff --git a/test/sql/test_type_expressions.py b/test/sql/test_type_expressions.py
index 8c3e8c5d4..412623c01 100644
--- a/test/sql/test_type_expressions.py
+++ b/test/sql/test_type_expressions.py
@@ -333,8 +333,8 @@ class DerivedTest(_ExprFixture, fixtures.TestBase, AssertsCompiledSQL):
class RoundTripTestBase(object):
- def test_round_trip(self):
- testing.db.execute(
+ def test_round_trip(self, connection):
+ connection.execute(
self.tables.test_table.insert(),
{"x": "X1", "y": "Y1"},
{"x": "X2", "y": "Y2"},
@@ -343,7 +343,7 @@ class RoundTripTestBase(object):
# test insert coercion alone
eq_(
- testing.db.execute(
+ connection.exec_driver_sql(
"select * from test_table order by y"
).fetchall(),
[("X1", "y1"), ("X2", "y2"), ("X3", "y3")],
@@ -351,7 +351,7 @@ class RoundTripTestBase(object):
# conversion back to upper
eq_(
- testing.db.execute(
+ connection.execute(
select([self.tables.test_table]).order_by(
self.tables.test_table.c.y
)
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index 356470dd3..9fb79958d 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -1520,7 +1520,7 @@ class EnumTest(AssertsCompiledSQL, fixtures.TablesTest):
eq_(conn.scalar(select([non_native_enum_table.c.someenum])), None)
@testing.requires.enforces_check_constraints
- def test_check_constraint(self):
+ def test_check_constraint(self, connection):
assert_raises(
(
exc.IntegrityError,
@@ -1530,7 +1530,7 @@ class EnumTest(AssertsCompiledSQL, fixtures.TablesTest):
# https://github.com/PyMySQL/PyMySQL/issues/607 is resolved
exc.InternalError,
),
- testing.db.execute,
+ connection.exec_driver_sql,
"insert into non_native_enum_table "
"(id, someenum) values(1, 'four')",
)
@@ -1563,10 +1563,10 @@ class EnumTest(AssertsCompiledSQL, fixtures.TablesTest):
self.metadata.create_all(conn)
assert_raises(
(exc.DBAPIError,),
- conn.execute,
+ conn.exec_driver_sql,
"insert into my_table " "(data) values('four')",
)
- conn.execute("insert into my_table (data) values ('two')")
+ conn.exec_driver_sql("insert into my_table (data) values ('two')")
@testing.requires.enforces_check_constraints
@testing.provide_metadata
@@ -1596,19 +1596,21 @@ class EnumTest(AssertsCompiledSQL, fixtures.TablesTest):
self.metadata.create_all(conn)
assert_raises(
(exc.DBAPIError,),
- conn.execute,
+ conn.exec_driver_sql,
"insert into my_table " "(data) values('two')",
)
- conn.execute("insert into my_table (data) values ('four')")
+ conn.exec_driver_sql("insert into my_table (data) values ('four')")
def test_skip_check_constraint(self):
with testing.db.connect() as conn:
- conn.execute(
+ conn.exec_driver_sql(
"insert into non_native_enum_table "
"(id, someotherenum) values(1, 'four')"
)
eq_(
- conn.scalar("select someotherenum from non_native_enum_table"),
+ conn.exec_driver_sql(
+ "select someotherenum from non_native_enum_table"
+ ).scalar(),
"four",
)
assert_raises_message(
@@ -2275,11 +2277,14 @@ class ExpressionTest(
def teardown_class(cls):
meta.drop_all()
- def test_control(self):
- assert testing.db.execute("select avalue from test").scalar() == 250
+ def test_control(self, connection):
+ assert (
+ connection.exec_driver_sql("select avalue from test").scalar()
+ == 250
+ )
eq_(
- test_table.select().execute().fetchall(),
+ connection.execute(test_table.select()).fetchall(),
[
(
1,
@@ -2787,35 +2792,35 @@ class NumericRawSQLTest(fixtures.TestBase):
@testing.fails_on("sqlite", "Doesn't provide Decimal results natively")
@testing.provide_metadata
- def test_decimal_fp(self):
+ def test_decimal_fp(self, connection):
metadata = self.metadata
self._fixture(metadata, Numeric(10, 5), decimal.Decimal("45.5"))
- val = testing.db.execute("select val from t").scalar()
+ val = connection.exec_driver_sql("select val from t").scalar()
assert isinstance(val, decimal.Decimal)
eq_(val, decimal.Decimal("45.5"))
@testing.fails_on("sqlite", "Doesn't provide Decimal results natively")
@testing.provide_metadata
- def test_decimal_int(self):
+ def test_decimal_int(self, connection):
metadata = self.metadata
self._fixture(metadata, Numeric(10, 5), decimal.Decimal("45"))
- val = testing.db.execute("select val from t").scalar()
+ val = connection.exec_driver_sql("select val from t").scalar()
assert isinstance(val, decimal.Decimal)
eq_(val, decimal.Decimal("45"))
@testing.provide_metadata
- def test_ints(self):
+ def test_ints(self, connection):
metadata = self.metadata
self._fixture(metadata, Integer, 45)
- val = testing.db.execute("select val from t").scalar()
+ val = connection.exec_driver_sql("select val from t").scalar()
assert isinstance(val, util.int_types)
eq_(val, 45)
@testing.provide_metadata
- def test_float(self):
+ def test_float(self, connection):
metadata = self.metadata
self._fixture(metadata, Float, 46.583)
- val = testing.db.execute("select val from t").scalar()
+ val = connection.exec_driver_sql("select val from t").scalar()
assert isinstance(val, float)
# some DBAPIs have unusual float handling
@@ -2936,16 +2941,16 @@ class BooleanTest(
)
@testing.fails_on("mssql", "FIXME: MS-SQL 2005 doesn't honor CHECK ?!?")
@testing.skip_if(lambda: testing.db.dialect.supports_native_boolean)
- def test_constraint(self):
+ def test_constraint(self, connection):
assert_raises(
(exc.IntegrityError, exc.ProgrammingError),
- testing.db.execute,
+ connection.exec_driver_sql,
"insert into boolean_table (id, value) values(1, 5)",
)
@testing.skip_if(lambda: testing.db.dialect.supports_native_boolean)
- def test_unconstrained(self):
- testing.db.execute(
+ def test_unconstrained(self, connection):
+ connection.exec_driver_sql(
"insert into boolean_table (id, unconstrained_value)"
"values (1, 5)"
)
@@ -2993,13 +2998,16 @@ class BooleanTest(
def test_nonnative_processor_coerces_integer_to_boolean(self):
boolean_table = self.tables.boolean_table
with testing.db.connect() as conn:
- conn.execute(
+ conn.exec_driver_sql(
"insert into boolean_table (id, unconstrained_value) "
"values (1, 5)"
)
eq_(
- conn.scalar("select unconstrained_value from boolean_table"), 5
+ conn.exec_driver_sql(
+ "select unconstrained_value from boolean_table"
+ ).scalar(),
+ 5,
)
eq_(