summaryrefslogtreecommitdiff
path: root/test/engine
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine')
-rw-r--r--test/engine/test_bind.py5
-rw-r--r--test/engine/test_deprecations.py299
-rw-r--r--test/engine/test_execute.py463
-rw-r--r--test/engine/test_logging.py44
-rw-r--r--test/engine/test_reconnect.py8
-rw-r--r--test/engine/test_reflection.py24
-rw-r--r--test/engine/test_transaction.py99
7 files changed, 624 insertions, 318 deletions
diff --git a/test/engine/test_bind.py b/test/engine/test_bind.py
index 956874846..c58e703d3 100644
--- a/test/engine/test_bind.py
+++ b/test/engine/test_bind.py
@@ -133,7 +133,10 @@ class BindTest(fixtures.TestBase):
trans.rollback()
metadata.bind = None
assert (
- conn.execute("select count(*) from test_table").scalar() == 0
+ conn.exec_driver_sql(
+ "select count(*) from test_table"
+ ).scalar()
+ == 0
)
finally:
metadata.drop_all(bind=conn)
diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py
index 0c77e8c25..edf1503d5 100644
--- a/test/engine/test_deprecations.py
+++ b/test/engine/test_deprecations.py
@@ -4,6 +4,7 @@ from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import func
+from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import MetaData
@@ -12,7 +13,9 @@ from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import TypeDecorator
+from sqlalchemy import VARCHAR
from sqlalchemy.engine import reflection
+from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.mock import MockConnection
from sqlalchemy.testing import assert_raises
@@ -22,12 +25,20 @@ from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
+from sqlalchemy.testing import is_instance_of
from sqlalchemy.testing import is_true
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
+def _string_deprecation_expect():
+ return testing.expect_deprecated_20(
+ r"Passing a string to Connection.execute\(\) is deprecated "
+ r"and will be removed in version 2.0"
+ )
+
+
class SomeException(Exception):
pass
@@ -35,6 +46,10 @@ class SomeException(Exception):
class ConnectionlessDeprecationTest(fixtures.TestBase):
"""test various things associated with "connectionless" executions."""
+ def check_usage(self, inspector):
+ with inspector._operation_context() as conn:
+ is_instance_of(conn, Connection)
+
def test_inspector_constructor_engine(self):
with testing.expect_deprecated(
r"The __init__\(\) method on Inspector is deprecated and will "
@@ -43,6 +58,7 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
i1 = reflection.Inspector(testing.db)
is_(i1.bind, testing.db)
+ self.check_usage(i1)
def test_inspector_constructor_connection(self):
with testing.db.connect() as conn:
@@ -54,6 +70,7 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
is_(i1.bind, conn)
is_(i1.engine, testing.db)
+ self.check_usage(i1)
def test_inspector_from_engine(self):
with testing.expect_deprecated(
@@ -63,6 +80,7 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
i1 = reflection.Inspector.from_engine(testing.db)
is_(i1.bind, testing.db)
+ self.check_usage(i1)
def test_bind_close_conn(self):
e = testing.db
@@ -256,7 +274,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as conn:
try:
- conn.execute("SELECT FOO FROM I_DONT_EXIST")
+ conn.exec_driver_sql("SELECT FOO FROM I_DONT_EXIST")
assert False
except tsa.exc.DBAPIError as e:
eq_(canary.mock_calls[0][1][5], e.orig)
@@ -305,7 +323,10 @@ class HandleErrorTest(fixtures.TestBase):
)
assert_raises_message(
- TypeError, "I'm not a DBAPI error", c.execute, "select "
+ TypeError,
+ "I'm not a DBAPI error",
+ c.exec_driver_sql,
+ "select ",
)
# no legacy event
eq_(listener.mock_calls, [])
@@ -369,6 +390,10 @@ class PoolTestBase(fixtures.TestBase):
)
+def select1(db):
+ return str(select([1]).compile(dialect=db.dialect))
+
+
class DeprecatedEngineFeatureTest(fixtures.TablesTest):
__backend__ = True
@@ -442,6 +467,60 @@ class DeprecatedEngineFeatureTest(fixtures.TablesTest):
assert_raises(Exception, conn.transaction, fn, 5, value=8)
self._assert_no_data()
+ def test_execute_plain_string(self):
+ with _string_deprecation_expect():
+ testing.db.execute(select1(testing.db)).scalar()
+
+ def test_scalar_plain_string(self):
+ with _string_deprecation_expect():
+ testing.db.scalar(select1(testing.db))
+
+ # Tests for the warning when non dict params are used
+ # @testing.combinations(42, (42,))
+ # def test_execute_positional_non_dicts(self, args):
+ # with testing.expect_deprecated(
+ # r"Usage of tuple or scalars as positional arguments of "
+ # ):
+ # testing.db.execute(text(select1(testing.db)), args).scalar()
+
+ # @testing.combinations(42, (42,))
+ # def test_scalar_positional_non_dicts(self, args):
+ # with testing.expect_deprecated(
+ # r"Usage of tuple or scalars as positional arguments of "
+ # ):
+ # testing.db.scalar(text(select1(testing.db)), args)
+
+
+class DeprecatedConnectionFeatureTest(fixtures.TablesTest):
+ __backend__ = True
+
+ def test_execute_plain_string(self):
+ with _string_deprecation_expect():
+ with testing.db.connect() as conn:
+ conn.execute(select1(testing.db)).scalar()
+
+ def test_scalar_plain_string(self):
+ with _string_deprecation_expect():
+ with testing.db.connect() as conn:
+ conn.scalar(select1(testing.db))
+
+ # Tests for the warning when non dict params are used
+ # @testing.combinations(42, (42,))
+ # def test_execute_positional_non_dicts(self, args):
+ # with testing.expect_deprecated(
+ # r"Usage of tuple or scalars as positional arguments of "
+ # ):
+ # with testing.db.connect() as conn:
+ # conn.execute(text(select1(testing.db)), args).scalar()
+
+ # @testing.combinations(42, (42,))
+ # def test_scalar_positional_non_dicts(self, args):
+ # with testing.expect_deprecated(
+ # r"Usage of tuple or scalars as positional arguments of "
+ # ):
+ # with testing.db.connect() as conn:
+ # conn.scalar(text(select1(testing.db)), args)
+
class DeprecatedReflectionTest(fixtures.TablesTest):
@classmethod
@@ -540,3 +619,219 @@ class ExecutionOptionsTest(fixtures.TestBase):
):
c2_branch = c2.connect()
eq_(c2_branch._execution_options, {"foo": "bar"})
+
+
+class RawExecuteTest(fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("user_id", INT, primary_key=True, autoincrement=False),
+ Column("user_name", VARCHAR(20)),
+ )
+ Table(
+ "users_autoinc",
+ metadata,
+ Column(
+ "user_id", INT, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("user_name", VARCHAR(20)),
+ )
+
+ @testing.fails_on(
+ "postgresql+pg8000",
+ "pg8000 still doesn't allow single paren without params",
+ )
+ def test_no_params_option(self, connection):
+ stmt = (
+ "SELECT '%'"
+ + testing.db.dialect.statement_compiler(
+ testing.db.dialect, None
+ ).default_from()
+ )
+
+ result = (
+ connection.execution_options(no_parameters=True)
+ .execute(stmt)
+ .scalar()
+ )
+ eq_(result, "%")
+
+ @testing.requires.qmark_paramstyle
+ def test_raw_qmark(self, connection):
+ conn = connection
+
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (1, "jack"),
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [2, "fred"],
+ )
+
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [3, "ed"],
+ [4, "horse"],
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (5, "barney"),
+ (6, "donkey"),
+ )
+
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ 7,
+ "sally",
+ )
+
+ with _string_deprecation_expect():
+ res = conn.execute("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "fred"),
+ (3, "ed"),
+ (4, "horse"),
+ (5, "barney"),
+ (6, "donkey"),
+ (7, "sally"),
+ ]
+ for multiparam, param in [
+ (("jack", "fred"), {}),
+ ((["jack", "fred"],), {}),
+ ]:
+ with _string_deprecation_expect():
+ res = conn.execute(
+ "select * from users where user_name=? or "
+ "user_name=? order by user_id",
+ *multiparam,
+ **param
+ )
+ assert res.fetchall() == [(1, "jack"), (2, "fred")]
+
+ with _string_deprecation_expect():
+ res = conn.execute("select * from users where user_name=?", "jack")
+ assert res.fetchall() == [(1, "jack")]
+
+ @testing.requires.format_paramstyle
+ def test_raw_sprintf(self, connection):
+ conn = connection
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ [1, "jack"],
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ [2, "ed"],
+ [3, "horse"],
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ 4,
+ "sally",
+ )
+ with _string_deprecation_expect():
+ conn.execute("insert into users (user_id) values (%s)", 5)
+ with _string_deprecation_expect():
+ res = conn.execute("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ (5, None),
+ ]
+ for multiparam, param in [
+ (("jack", "ed"), {}),
+ ((["jack", "ed"],), {}),
+ ]:
+ with _string_deprecation_expect():
+ res = conn.execute(
+ "select * from users where user_name=%s or "
+ "user_name=%s order by user_id",
+ *multiparam,
+ **param
+ )
+ assert res.fetchall() == [(1, "jack"), (2, "ed")]
+ with _string_deprecation_expect():
+ res = conn.execute(
+ "select * from users where user_name=%s", "jack"
+ )
+ assert res.fetchall() == [(1, "jack")]
+
+ @testing.requires.pyformat_paramstyle
+ def test_raw_python(self, connection):
+ conn = connection
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 1, "name": "jack"},
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 2, "name": "ed"},
+ {"id": 3, "name": "horse"},
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ id=4,
+ name="sally",
+ )
+ with _string_deprecation_expect():
+ res = conn.execute("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
+
+ @testing.requires.named_paramstyle
+ def test_raw_named(self, connection):
+ conn = connection
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (:id, :name)",
+ {"id": 1, "name": "jack"},
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (:id, :name)",
+ {"id": 2, "name": "ed"},
+ {"id": 3, "name": "horse"},
+ )
+ with _string_deprecation_expect():
+ conn.execute(
+ "insert into users (user_id, user_name) "
+ "values (:id, :name)",
+ id=4,
+ name="sally",
+ )
+ with _string_deprecation_expect():
+ res = conn.execute("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 150038a40..0b5b1b16d 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -50,27 +50,22 @@ from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.util import picklers
-users, metadata, users_autoinc = None, None, None
-
-
class SomeException(Exception):
pass
-class ExecuteTest(fixtures.TestBase):
+class ExecuteTest(fixtures.TablesTest):
__backend__ = True
@classmethod
- def setup_class(cls):
- global users, users_autoinc, metadata
- metadata = MetaData(testing.db)
- users = Table(
+ def define_tables(cls, metadata):
+ Table(
"users",
metadata,
Column("user_id", INT, primary_key=True, autoincrement=False),
Column("user_name", VARCHAR(20)),
)
- users_autoinc = Table(
+ Table(
"users_autoinc",
metadata,
Column(
@@ -78,15 +73,6 @@ class ExecuteTest(fixtures.TestBase):
),
Column("user_name", VARCHAR(20)),
)
- metadata.create_all()
-
- @engines.close_first
- def teardown(self):
- testing.db.execute(users.delete())
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
@testing.fails_on(
"postgresql+pg8000",
@@ -101,223 +87,176 @@ class ExecuteTest(fixtures.TestBase):
)
conn = testing.db.connect()
- result = conn.execution_options(no_parameters=True).scalar(stmt)
+ result = (
+ conn.execution_options(no_parameters=True)
+ .exec_driver_sql(stmt)
+ .scalar()
+ )
eq_(result, "%")
- @testing.fails_on_everything_except(
- "firebird", "sqlite", "+pyodbc", "+mxodbc", "mysql+oursql"
- )
- def test_raw_qmark(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- (1, "jack"),
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- [2, "fred"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- [3, "ed"],
- [4, "horse"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- (5, "barney"),
- (6, "donkey"),
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (?, ?)",
- 7,
- "sally",
- )
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "fred"),
- (3, "ed"),
- (4, "horse"),
- (5, "barney"),
- (6, "donkey"),
- (7, "sally"),
- ]
- for multiparam, param in [
- (("jack", "fred"), {}),
- ((["jack", "fred"],), {}),
- ]:
- res = conn.execute(
- "select * from users where user_name=? or "
- "user_name=? order by user_id",
- *multiparam,
- **param
- )
- assert res.fetchall() == [(1, "jack"), (2, "fred")]
- res = conn.execute("select * from users where user_name=?", "jack")
- assert res.fetchall() == [(1, "jack")]
- conn.execute("delete from users")
-
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
-
- # some psycopg2 versions bomb this.
- @testing.fails_on_everything_except(
- "mysql+mysqldb",
- "mysql+pymysql",
- "mysql+cymysql",
- "mysql+mysqlconnector",
- "postgresql",
- )
- def test_raw_sprintf(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- [1, "jack"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- [2, "ed"],
- [3, "horse"],
- )
- conn.execute(
- "insert into users (user_id, user_name) " "values (%s, %s)",
- 4,
- "sally",
- )
- conn.execute("insert into users (user_id) values (%s)", 5)
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- (5, None),
- ]
- for multiparam, param in [
- (("jack", "ed"), {}),
- ((["jack", "ed"],), {}),
- ]:
- res = conn.execute(
- "select * from users where user_name=%s or "
- "user_name=%s order by user_id",
- *multiparam,
- **param
- )
- assert res.fetchall() == [(1, "jack"), (2, "ed")]
- res = conn.execute(
- "select * from users where user_name=%s", "jack"
- )
- assert res.fetchall() == [(1, "jack")]
-
- conn.execute("delete from users")
-
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
-
- # pyformat is supported for mysql, but skipping because a few driver
- # versions have a bug that bombs out on this test. (1.2.2b3,
- # 1.2.2c1, 1.2.2)
-
- @testing.skip_if(lambda: testing.against("mysql+mysqldb"), "db-api flaky")
- @testing.fails_on_everything_except(
- "postgresql+psycopg2",
- "postgresql+psycopg2cffi",
- "postgresql+pypostgresql",
- "postgresql+pygresql",
- "mysql+mysqlconnector",
- "mysql+pymysql",
- "mysql+cymysql",
- "mssql+pymssql",
- )
- def test_raw_python(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- {"id": 1, "name": "jack"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- {"id": 2, "name": "ed"},
- {"id": 3, "name": "horse"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (%(id)s, %(name)s)",
- id=4,
- name="sally",
- )
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- ]
- conn.execute("delete from users")
+ def test_raw_positional_invalid(self, connection):
+ assert_raises_message(
+ tsa.exc.ArgumentError,
+ "List argument must consist only of tuples or dictionaries",
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [2, "fred"],
+ )
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
+ assert_raises_message(
+ tsa.exc.ArgumentError,
+ "List argument must consist only of tuples or dictionaries",
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [[3, "ed"], [4, "horse"]],
+ )
- @testing.fails_on_everything_except("sqlite", "oracle+cx_oracle")
- def test_raw_named(self):
- def go(conn):
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- {"id": 1, "name": "jack"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- {"id": 2, "name": "ed"},
- {"id": 3, "name": "horse"},
- )
- conn.execute(
- "insert into users (user_id, user_name) "
- "values (:id, :name)",
- id=4,
- name="sally",
- )
- res = conn.execute("select * from users order by user_id")
- assert res.fetchall() == [
- (1, "jack"),
- (2, "ed"),
- (3, "horse"),
- (4, "sally"),
- ]
- conn.execute("delete from users")
+ def test_raw_named_invalid(self, connection):
+ assert_raises(
+ TypeError,
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 2, "name": "ed"},
+ {"id": 3, "name": "horse"},
+ )
+ assert_raises(
+ TypeError,
+ connection.exec_driver_sql,
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ id=4,
+ name="sally",
+ )
- go(testing.db)
- conn = testing.db.connect()
- try:
- go(conn)
- finally:
- conn.close()
+ @testing.requires.qmark_paramstyle
+ def test_raw_qmark(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (1, "jack"),
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (2, "fred"),
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [(3, "ed"), (4, "horse")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ [(5, "barney"), (6, "donkey")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (?, ?)",
+ (7, "sally"),
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "fred"),
+ (3, "ed"),
+ (4, "horse"),
+ (5, "barney"),
+ (6, "donkey"),
+ (7, "sally"),
+ ]
+
+ res = conn.exec_driver_sql(
+ "select * from users where user_name=?", ("jack",)
+ )
+ assert res.fetchall() == [(1, "jack")]
+
+ @testing.requires.format_paramstyle
+ def test_raw_sprintf(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ (1, "jack"),
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ [(2, "ed"), (3, "horse")],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (%s, %s)",
+ (4, "sally"),
+ )
+ conn.exec_driver_sql("insert into users (user_id) values (%s)", (5,))
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ (5, None),
+ ]
+
+ res = conn.exec_driver_sql(
+ "select * from users where user_name=%s", ("jack",)
+ )
+ assert res.fetchall() == [(1, "jack")]
+
+ @testing.requires.pyformat_paramstyle
+ def test_raw_python(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ {"id": 1, "name": "jack"},
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) "
+ "values (%(id)s, %(name)s)",
+ dict(id=4, name="sally"),
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
+
+ @testing.requires.named_paramstyle
+ def test_raw_named(self, connection):
+ conn = connection
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (:id, :name)",
+ {"id": 1, "name": "jack"},
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (:id, :name)",
+ [{"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}],
+ )
+ conn.exec_driver_sql(
+ "insert into users (user_id, user_name) " "values (:id, :name)",
+ {"id": 4, "name": "sally"},
+ )
+ res = conn.exec_driver_sql("select * from users order by user_id")
+ assert res.fetchall() == [
+ (1, "jack"),
+ (2, "ed"),
+ (3, "horse"),
+ (4, "sally"),
+ ]
@testing.engines.close_open_connections
def test_exception_wrapping_dbapi(self):
conn = testing.db.connect()
- for _c in testing.db, conn:
- assert_raises_message(
- tsa.exc.DBAPIError,
- r"not_a_valid_statement",
- _c.execute,
- "not_a_valid_statement",
- )
+ # engine does not have exec_driver_sql
+ assert_raises_message(
+ tsa.exc.DBAPIError,
+ r"not_a_valid_statement",
+ conn.exec_driver_sql,
+ "not_a_valid_statement",
+ )
@testing.requires.sqlite
def test_exception_wrapping_non_dbapi_error(self):
@@ -334,7 +273,10 @@ class ExecuteTest(fixtures.TestBase):
)
assert_raises_message(
- TypeError, "I'm not a DBAPI error", c.execute, "select "
+ TypeError,
+ "I'm not a DBAPI error",
+ c.exec_driver_sql,
+ "select ",
)
eq_(is_disconnect.call_count, 0)
@@ -359,7 +301,7 @@ class ExecuteTest(fixtures.TestBase):
):
with testing.db.connect() as conn:
assert_raises(
- tsa.exc.OperationalError, conn.execute, "select 1"
+ tsa.exc.OperationalError, conn.exec_driver_sql, "select 1"
)
def test_exception_wrapping_non_dbapi_statement(self):
@@ -378,11 +320,8 @@ class ExecuteTest(fixtures.TestBase):
)
_go(testing.db)
- conn = testing.db.connect()
- try:
+ with testing.db.connect() as conn:
_go(conn)
- finally:
- conn.close()
def test_not_an_executable(self):
for obj in (
@@ -574,6 +513,7 @@ class ExecuteTest(fixtures.TestBase):
def test_empty_insert(self):
"""test that execute() interprets [] as a list with no params"""
+ users_autoinc = self.tables.users_autoinc
testing.db.execute(
users_autoinc.insert().values(user_name=bindparam("name", None)),
@@ -885,7 +825,7 @@ class CompiledCacheTest(fixtures.TestBase):
cached_conn.execute(ins, {"user_name": "u3"})
eq_(compile_mock.call_count, 1)
assert len(cache) == 1
- eq_(conn.execute("select count(*) from users").scalar(), 3)
+ eq_(conn.exec_driver_sql("select count(*) from users").scalar(), 3)
@testing.only_on(
["sqlite", "mysql", "postgresql"],
@@ -924,7 +864,7 @@ class CompiledCacheTest(fixtures.TestBase):
cached_conn.execute(ins, {"photo_blob": blob})
eq_(compile_mock.call_count, 1)
eq_(len(cache), 1)
- eq_(conn.execute("select count(*) from photo").scalar(), 1)
+ eq_(conn.exec_driver_sql("select count(*) from photo").scalar(), 1)
del blob
@@ -1427,7 +1367,7 @@ class EngineEventsTest(fixtures.TestBase):
with e1.connect() as conn:
- result = conn.execute(stmt)
+ result = conn.exec_driver_sql(stmt)
ctx = result.context
eq_(
@@ -1496,7 +1436,7 @@ class EngineEventsTest(fixtures.TestBase):
t1.insert().execute(c1=5, c2="some data")
t1.insert().execute(c1=6)
eq_(
- engine.execute("select * from t1").fetchall(),
+ engine.execute(text("select * from t1")).fetchall(),
[(5, "some data"), (6, "foo")],
)
finally:
@@ -1987,7 +1927,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as conn:
try:
- conn.execute("SELECT FOO FROM I_DONT_EXIST")
+ conn.exec_driver_sql("SELECT FOO FROM I_DONT_EXIST")
assert False
except tsa.exc.DBAPIError as e:
ctx = canary.mock_calls[0][1][0]
@@ -2018,20 +1958,20 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
MyException,
"my exception",
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR ONE' FROM I_DONT_EXIST",
)
# case 2: return the DBAPI exception we're given;
# no wrapping should occur
assert_raises(
conn.dialect.dbapi.Error,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR TWO' FROM I_DONT_EXIST",
)
# case 3: normal wrapping
assert_raises(
tsa.exc.DBAPIError,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR THREE' FROM I_DONT_EXIST",
)
@@ -2080,7 +2020,7 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
MyException2,
"my exception chained",
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR ONE' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2090,7 +2030,7 @@ class HandleErrorTest(fixtures.TestBase):
) as patched:
assert_raises(
MyException1,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR TWO' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2102,7 +2042,7 @@ class HandleErrorTest(fixtures.TestBase):
# by err2
assert_raises(
MyException1,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR THREE' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2112,7 +2052,7 @@ class HandleErrorTest(fixtures.TestBase):
) as patched:
assert_raises(
tsa.exc.DBAPIError,
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR FIVE' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2123,7 +2063,7 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
MyException3,
"my exception short circuit",
- conn.execute,
+ conn.exec_driver_sql,
"SELECT 'ERROR FOUR' FROM I_DONT_EXIST",
)
eq_(patched.call_count, 1)
@@ -2145,7 +2085,7 @@ class HandleErrorTest(fixtures.TestBase):
assert_raises_message(
tsa.exc.OperationalError,
"rollback failed",
- conn.execute,
+ conn.exec_driver_sql,
"insert into i_dont_exist (x) values ('y')",
)
@@ -2200,7 +2140,10 @@ class HandleErrorTest(fixtures.TestBase):
)
assert_raises_message(
- TypeError, "I'm not a DBAPI error", c.execute, "select "
+ TypeError,
+ "I'm not a DBAPI error",
+ c.exec_driver_sql,
+ "select ",
)
ctx = listener.mock_calls[0][1][0]
eq_(ctx.statement, "select ")
@@ -2223,13 +2166,17 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as conn:
assert_raises(
tsa.exc.DBAPIError,
- conn.execution_options(skip_user_error_events=True).execute,
+ conn.execution_options(
+ skip_user_error_events=True
+ ).exec_driver_sql,
"SELECT ERROR_ONE FROM I_DONT_EXIST",
)
assert_raises(
MyException1,
- conn.execution_options(skip_user_error_events=False).execute,
+ conn.execution_options(
+ skip_user_error_events=False
+ ).exec_driver_sql,
"SELECT ERROR_ONE FROM I_DONT_EXIST",
)
@@ -2246,7 +2193,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as c:
try:
- c.execute("SELECT x FROM nonexistent")
+ c.exec_driver_sql("SELECT x FROM nonexistent")
assert False
except tsa.exc.StatementError as st:
eq_(st.connection_invalidated, evt_value)
@@ -2287,7 +2234,7 @@ class HandleErrorTest(fixtures.TestBase):
with engine.connect() as c:
target_crec = c.connection._connection_record
try:
- c.execute("SELECT x FROM nonexistent")
+ c.exec_driver_sql("SELECT x FROM nonexistent")
assert False
except tsa.exc.StatementError as st:
eq_(st.connection_invalidated, True)
@@ -2644,7 +2591,9 @@ class DialectEventTest(fixtures.TestBase):
def _test_do_execute(self, retval):
with self._run_test(retval) as (conn, m1):
- result = conn.execute("insert into table foo", {"foo": "bar"})
+ result = conn.exec_driver_sql(
+ "insert into table foo", {"foo": "bar"}
+ )
self._assert(
retval,
m1.do_execute,
@@ -2661,7 +2610,7 @@ class DialectEventTest(fixtures.TestBase):
def _test_do_executemany(self, retval):
with self._run_test(retval) as (conn, m1):
- result = conn.execute(
+ result = conn.exec_driver_sql(
"insert into table foo", [{"foo": "bar"}, {"foo": "bar"}]
)
self._assert(
@@ -2680,9 +2629,9 @@ class DialectEventTest(fixtures.TestBase):
def _test_do_execute_no_params(self, retval):
with self._run_test(retval) as (conn, m1):
- result = conn.execution_options(no_parameters=True).execute(
- "insert into table foo"
- )
+ result = conn.execution_options(
+ no_parameters=True
+ ).exec_driver_sql("insert into table foo")
self._assert(
retval,
m1.do_execute_no_params,
@@ -2840,7 +2789,7 @@ class AutocommitTextTest(fixtures.TestBase):
engine.dialect.dbapi = dbapi
with engine.connect() as conn:
- conn.execute("%s something table something" % keyword)
+ conn.exec_driver_sql("%s something table something" % keyword)
if expected:
eq_(dbapi.connect().mock_calls, [call.cursor(), call.commit()])
diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py
index fe4ff44a7..5d50a010d 100644
--- a/test/engine/test_logging.py
+++ b/test/engine/test_logging.py
@@ -20,6 +20,11 @@ from sqlalchemy.testing import mock
from sqlalchemy.testing.util import lazy_gc
+def exec_sql(engine, sql, *args, **kwargs):
+ with engine.connect() as conn:
+ return conn.exec_driver_sql(sql, *args, **kwargs)
+
+
class LogParamsTest(fixtures.TestBase):
__only_on__ = "sqlite"
__requires__ = ("ad_hoc_engines",)
@@ -29,21 +34,23 @@ class LogParamsTest(fixtures.TestBase):
self.no_param_engine = engines.testing_engine(
options={"echo": True, "hide_parameters": True}
)
- self.eng.execute("create table if not exists foo (data string)")
- self.no_param_engine.execute(
- "create table if not exists foo (data string)"
+ exec_sql(self.eng, "create table if not exists foo (data string)")
+ exec_sql(
+ self.no_param_engine,
+ "create table if not exists foo (data string)",
)
self.buf = logging.handlers.BufferingHandler(100)
for log in [logging.getLogger("sqlalchemy.engine")]:
log.addHandler(self.buf)
def teardown(self):
- self.eng.execute("drop table if exists foo")
+ exec_sql(self.eng, "drop table if exists foo")
for log in [logging.getLogger("sqlalchemy.engine")]:
log.removeHandler(self.buf)
def test_log_large_list_of_dict(self):
- self.eng.execute(
+ exec_sql(
+ self.eng,
"INSERT INTO foo (data) values (:data)",
[{"data": str(i)} for i in range(100)],
)
@@ -71,7 +78,8 @@ class LogParamsTest(fixtures.TestBase):
)
def test_log_no_parameters(self):
- self.no_param_engine.execute(
+ exec_sql(
+ self.no_param_engine,
"INSERT INTO foo (data) values (:data)",
[{"data": str(i)} for i in range(100)],
)
@@ -81,7 +89,8 @@ class LogParamsTest(fixtures.TestBase):
)
def test_log_large_list_of_tuple(self):
- self.eng.execute(
+ exec_sql(
+ self.eng,
"INSERT INTO foo (data) values (?)",
[(str(i),) for i in range(100)],
)
@@ -210,7 +219,7 @@ class LogParamsTest(fixtures.TestBase):
largeparam = "".join(chr(random.randint(52, 85)) for i in range(5000))
- self.eng.execute("INSERT INTO foo (data) values (?)", (largeparam,))
+ exec_sql(self.eng, "INSERT INTO foo (data) values (?)", (largeparam,))
eq_(
self.buf.buffer[1].message,
@@ -225,7 +234,7 @@ class LogParamsTest(fixtures.TestBase):
lp2 = "".join(chr(random.randint(52, 85)) for i in range(8))
lp3 = "".join(chr(random.randint(52, 85)) for i in range(670))
- self.eng.execute("SELECT ?, ?, ?", (lp1, lp2, lp3))
+ exec_sql(self.eng, "SELECT ?, ?, ?", (lp1, lp2, lp3))
eq_(
self.buf.buffer[1].message,
@@ -240,8 +249,10 @@ class LogParamsTest(fixtures.TestBase):
lp2 = "".join(chr(random.randint(52, 85)) for i in range(200))
lp3 = "".join(chr(random.randint(52, 85)) for i in range(670))
- self.eng.execute(
- "INSERT INTO foo (data) values (?)", [(lp1,), (lp2,), (lp3,)]
+ exec_sql(
+ self.eng,
+ "INSERT INTO foo (data) values (?)",
+ [(lp1,), (lp2,), (lp3,)],
)
eq_(
@@ -273,7 +284,8 @@ class LogParamsTest(fixtures.TestBase):
tsa.exc.DBAPIError,
r".*INSERT INTO nonexistent \(data\) values \(:data\)\]\n"
r"\[SQL parameters hidden due to hide_parameters=True\]",
- lambda: self.no_param_engine.execute(
+ lambda: exec_sql(
+ self.no_param_engine,
"INSERT INTO nonexistent (data) values (:data)",
[{"data": str(i)} for i in range(10)],
),
@@ -324,7 +336,7 @@ class LogParamsTest(fixtures.TestBase):
largeparam = "".join(chr(random.randint(52, 85)) for i in range(5000))
self.eng.echo = "debug"
- result = self.eng.execute("SELECT ?", (largeparam,))
+ result = exec_sql(self.eng, "SELECT ?", (largeparam,))
row = result.first()
@@ -370,7 +382,8 @@ class LogParamsTest(fixtures.TestBase):
r"{'data': '6'}, {'data': '7'} ... displaying 10 of "
r"100 total bound parameter sets ... {'data': '98'}, "
r"{'data': '99'}\]",
- lambda: self.eng.execute(
+ lambda: exec_sql(
+ self.eng,
"INSERT INTO nonexistent (data) values (:data)",
[{"data": str(i)} for i in range(100)],
),
@@ -385,7 +398,8 @@ class LogParamsTest(fixtures.TestBase):
r"... displaying "
r"10 of 100 total bound parameter sets ... "
r"\('98',\), \('99',\)\]",
- lambda: self.eng.execute(
+ lambda: exec_sql(
+ self.eng,
"INSERT INTO nonexistent (data) values (?)",
[(str(i),) for i in range(100)],
),
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index 000be1a70..a09b04748 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -826,7 +826,7 @@ class CursorErrTest(fixtures.TestBase):
def test_cursor_explode(self):
db = self._fixture(False, False)
conn = db.connect()
- result = conn.execute("select foo")
+ result = conn.exec_driver_sql("select foo")
result.close()
conn.close()
eq_(
@@ -1006,7 +1006,7 @@ class RealReconnectTest(fixtures.TestBase):
engine = engines.testing_engine()
def broken_initialize(connection):
- connection.execute("select fake_stuff from _fake_table")
+ connection.exec_driver_sql("select fake_stuff from _fake_table")
engine.dialect.initialize = broken_initialize
@@ -1020,7 +1020,7 @@ class RealReconnectTest(fixtures.TestBase):
engine = engines.testing_engine()
def broken_initialize(connection):
- connection.execute("select fake_stuff from _fake_table")
+ connection.exec_driver_sql("select fake_stuff from _fake_table")
engine.dialect.initialize = broken_initialize
@@ -1186,7 +1186,7 @@ class InvalidateDuringResultTest(fixtures.TestBase):
)
def test_invalidate_on_results(self):
conn = self.engine.connect()
- result = conn.execute("select * from sometable")
+ result = conn.exec_driver_sql("select * from sometable")
for x in range(20):
result.fetchone()
self.engine.test_shutdown()
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index 9de181216..3a2a3c699 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -1016,7 +1016,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
indexes"""
with testing.db.begin() as conn:
- conn.execute(
+ conn.exec_driver_sql(
"""
CREATE TABLE book (
id INTEGER NOT NULL,
@@ -1056,7 +1056,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
"""test reflection of a composite primary key"""
with testing.db.begin() as conn:
- conn.execute(
+ conn.exec_driver_sql(
"""
CREATE TABLE book (
id INTEGER NOT NULL,
@@ -2045,19 +2045,21 @@ class ReverseCasingReflectTest(fixtures.TestBase, AssertsCompiledSQL):
@testing.requires.denormalized_names
def setup(self):
- testing.db.execute(
+ with testing.db.connect() as conn:
+ conn.exec_driver_sql(
+ """
+ CREATE TABLE weird_casing(
+ col1 char(20),
+ "Col2" char(20),
+ "col3" char(20)
+ )
"""
- CREATE TABLE weird_casing(
- col1 char(20),
- "Col2" char(20),
- "col3" char(20)
- )
- """
- )
+ )
@testing.requires.denormalized_names
def teardown(self):
- testing.db.execute("drop table weird_casing")
+ with testing.db.connect() as conn:
+ conn.exec_driver_sql("drop table weird_casing")
@testing.requires.denormalized_names
def test_direct_quoting(self):
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index 39c9b2ad4..c6a6c5e3a 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -60,7 +60,7 @@ class TransactionTest(fixtures.TestBase):
transaction.commit()
transaction = connection.begin()
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 3
transaction.commit()
connection.close()
@@ -74,7 +74,7 @@ class TransactionTest(fixtures.TestBase):
connection.execute(users.insert(), user_id=2, user_name="user2")
connection.execute(users.insert(), user_id=3, user_name="user3")
transaction.rollback()
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
@@ -92,7 +92,7 @@ class TransactionTest(fixtures.TestBase):
print("Exception: ", e)
transaction.rollback()
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
@@ -150,7 +150,7 @@ class TransactionTest(fixtures.TestBase):
assert_raises_message(
exc.InvalidRequestError,
"This connection is on an inactive transaction. Please",
- connection.execute,
+ connection.exec_driver_sql,
"select 1",
)
@@ -189,7 +189,12 @@ class TransactionTest(fixtures.TestBase):
assert branched.in_transaction()
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
- eq_(connection.scalar("select count(*) from query_users"), 1)
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 1,
+ )
finally:
connection.close()
@@ -201,7 +206,12 @@ class TransactionTest(fixtures.TestBase):
branched.execute(users.insert(), user_id=1, user_name="user1")
finally:
connection.close()
- eq_(testing.db.scalar("select count(*) from query_users"), 1)
+ eq_(
+ testing.db.execute(
+ text("select count(*) from query_users")
+ ).scalar(),
+ 1,
+ )
@testing.requires.savepoints
def test_branch_savepoint_rollback(self):
@@ -216,7 +226,12 @@ class TransactionTest(fixtures.TestBase):
nested.rollback()
assert connection.in_transaction()
trans.commit()
- eq_(connection.scalar("select count(*) from query_users"), 1)
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 1,
+ )
finally:
connection.close()
@@ -232,7 +247,12 @@ class TransactionTest(fixtures.TestBase):
branched.execute(users.insert(), user_id=2, user_name="user2")
nested.rollback()
assert not connection.in_transaction()
- eq_(connection.scalar("select count(*) from query_users"), 1)
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 1,
+ )
finally:
connection.close()
@@ -267,7 +287,12 @@ class TransactionTest(fixtures.TestBase):
conn2 = connection.execution_options(dummy=True)
conn2.execute(users.insert(), user_id=2, user_name="user2")
transaction.rollback()
- eq_(connection.scalar("select count(*) from query_users"), 0)
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 0,
+ )
finally:
connection.close()
@@ -283,9 +308,12 @@ class TransactionTest(fixtures.TestBase):
trans2.commit()
transaction.rollback()
self.assert_(
- connection.scalar("select count(*) from " "query_users") == 0
+ connection.exec_driver_sql(
+ "select count(*) from " "query_users"
+ ).scalar()
+ == 0
)
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
@@ -301,7 +329,10 @@ class TransactionTest(fixtures.TestBase):
assert not trans.is_active
self.assert_(
- connection.scalar("select count(*) from " "query_users") == 0
+ connection.exec_driver_sql(
+ "select count(*) from " "query_users"
+ ).scalar()
+ == 0
)
trans = connection.begin()
@@ -309,7 +340,10 @@ class TransactionTest(fixtures.TestBase):
trans.__exit__(None, None, None)
assert not trans.is_active
self.assert_(
- connection.scalar("select count(*) from " "query_users") == 1
+ connection.exec_driver_sql(
+ "select count(*) from " "query_users"
+ ).scalar()
+ == 1
)
connection.close()
@@ -328,9 +362,12 @@ class TransactionTest(fixtures.TestBase):
transaction.commit()
assert not connection.in_transaction()
self.assert_(
- connection.scalar("select count(*) from " "query_users") == 5
+ connection.exec_driver_sql(
+ "select count(*) from " "query_users"
+ ).scalar()
+ == 5
)
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 5
connection.close()
@@ -349,9 +386,12 @@ class TransactionTest(fixtures.TestBase):
transaction.close()
assert not connection.in_transaction()
self.assert_(
- connection.scalar("select count(*) from " "query_users") == 0
+ connection.exec_driver_sql(
+ "select count(*) from " "query_users"
+ ).scalar()
+ == 0
)
- result = connection.execute("select * from query_users")
+ result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
@@ -406,7 +446,7 @@ class TransactionTest(fixtures.TestBase):
assert_raises_message(
exc.InvalidRequestError,
"This connection is on an inactive savepoint transaction.",
- connection.execute,
+ connection.exec_driver_sql,
"select 1",
)
trans2.rollback()
@@ -701,7 +741,7 @@ class AutoRollbackTest(fixtures.TestBase):
test_needs_acid=True,
)
users.create(conn1)
- conn1.execute("select * from deadlock_users")
+ conn1.exec_driver_sql("select * from deadlock_users")
conn1.close()
# without auto-rollback in the connection pool's return() logic,
@@ -732,20 +772,23 @@ class ExplicitAutoCommitTest(fixtures.TestBase):
Column("id", Integer, primary_key=True),
Column("data", String(100)),
)
- metadata.create_all()
- testing.db.execute(
- "create function insert_foo(varchar) "
- "returns integer as 'insert into foo(data) "
- "values ($1);select 1;' language sql"
- )
+ with testing.db.connect() as conn:
+ metadata.create_all(conn)
+ conn.exec_driver_sql(
+ "create function insert_foo(varchar) "
+ "returns integer as 'insert into foo(data) "
+ "values ($1);select 1;' language sql"
+ )
def teardown(self):
- foo.delete().execute().close()
+ with testing.db.connect() as conn:
+ conn.execute(foo.delete())
@classmethod
def teardown_class(cls):
- testing.db.execute("drop function insert_foo(varchar)")
- metadata.drop_all()
+ with testing.db.connect() as conn:
+ conn.exec_driver_sql("drop function insert_foo(varchar)")
+ metadata.drop_all(conn)
def test_control(self):