summaryrefslogtreecommitdiff
path: root/oslo_db/tests/sqlalchemy/test_exc_filters.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_db/tests/sqlalchemy/test_exc_filters.py')
-rw-r--r--oslo_db/tests/sqlalchemy/test_exc_filters.py24
1 files changed, 18 insertions, 6 deletions
diff --git a/oslo_db/tests/sqlalchemy/test_exc_filters.py b/oslo_db/tests/sqlalchemy/test_exc_filters.py
index 7f64594..af3cd91 100644
--- a/oslo_db/tests/sqlalchemy/test_exc_filters.py
+++ b/oslo_db/tests/sqlalchemy/test_exc_filters.py
@@ -26,6 +26,7 @@ from sqlalchemy.orm import registry
from sqlalchemy import sql
from oslo_db import exception
+from oslo_db.sqlalchemy import compat
from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import exc_filters
from oslo_db.sqlalchemy import utils
@@ -139,18 +140,29 @@ class TestsExceptionFilter(_SQLAExceptionMatcher, test_base.BaseTestCase):
# statement
self.engine.connect().close()
- with test_utils.nested(
+ patches = [
mock.patch.object(engine.dialect, "do_execute", do_execute),
# replace the whole DBAPI rather than patching "Error"
# as some DBAPIs might not be patchable (?)
mock.patch.object(engine.dialect,
"dbapi",
mock.Mock(Error=self.Error)),
+
mock.patch.object(engine.dialect, "name", dialect_name),
mock.patch.object(engine.dialect,
"is_disconnect",
lambda *args: is_disconnect)
- ):
+ ]
+ if compat.sqla_2:
+ patches.append(
+ mock.patch.object(
+ engine.dialect,
+ "loaded_dbapi",
+ mock.Mock(Error=self.Error),
+ )
+ )
+
+ with test_utils.nested(*patches):
yield
def _run_test(self, dialect_name, statement, raises, expected,
@@ -754,7 +766,7 @@ class TestExceptionCauseMySQLSavepoint(
session.execute(sql.text("select 1"))
# close underying DB connection
- session.connection().connection.connection.close()
+ compat.driver_connection(session.connection()).close()
# alternate approach, but same idea:
# conn_id = session.scalar("select connection_id()")
@@ -779,7 +791,7 @@ class TestExceptionCauseMySQLSavepoint(
session.execute(sql.text("select 1"))
# close underying DB connection
- session.connection().connection.connection.close()
+ compat.driver_connection(session.connection()).close()
# alternate approach, but same idea:
# conn_id = session.scalar("select connection_id()")
@@ -947,8 +959,8 @@ class TestDuplicate(TestsExceptionFilter):
class TestDeadlock(TestsExceptionFilter):
statement = ('SELECT quota_usages.created_at AS '
'quota_usages_created_at FROM quota_usages '
- 'WHERE quota_usages.project_id = %(project_id_1)s '
- 'AND quota_usages.deleted = %(deleted_1)s FOR UPDATE')
+ 'WHERE quota_usages.project_id = :project_id_1 '
+ 'AND quota_usages.deleted = :deleted_1 FOR UPDATE')
params = {
'project_id_1': '8891d4478bbf48ad992f050cdf55e9b5',
'deleted_1': 0