summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2023-02-24 16:15:21 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2023-03-01 16:20:36 -0500
commit45d0a501609e3588f1accac59c08358c4c6c74a1 (patch)
treec97aafa7c5d1709ff1af8fe9717bd6220c35d0d9
parent8b108297d075ae68178cd18a9cb4d06feee7e075 (diff)
downloadsqlalchemy-ticket_5648.tar.gz
ensure event handlers called for all do_pingticket_5648
The support for pool ping listeners to receive exception events via the :meth:`.ConnectionEvents.handle_error` event added in 2.0.0b1 for :ticket:`5648` failed to take into account dialect-specific ping routines such as that of MySQL and PostgreSQL. The dialect feature has been reworked so that all dialects participate within event handling. Additionally, a new boolean element :attr:`.ExceptionContext.is_pre_ping` is added which identifies if this operation is occurring within the pre-ping operation. For this release, third party dialects which implement a custom :meth:`_engine.Dialect.do_ping` method can opt in to the newly improved behavior by having their method no longer catch exceptions or check exceptions for "is_disconnect", instead just propagating all exceptions outwards. Checking the exception for "is_disconnect" is now done by an enclosing method on the default dialect, which ensures that the event hook is invoked for all exception scenarios before testing the exception as a "disconnect" exception. If an existing ``do_ping()`` method continues to catch exceptions and check "is_disconnect", it will continue to work as it did previously, but ``handle_error`` hooks will not have access to the exception if it isn't propagated outwards. Fixes: #5648 Change-Id: I6535d5cb389e1a761aad8c37cfeb332c548b876d
-rw-r--r--doc/build/changelog/unreleased_20/5648.rst24
-rw-r--r--lib/sqlalchemy/dialects/mysql/mysqldb.py11
-rw-r--r--lib/sqlalchemy/dialects/postgresql/_psycopg_common.py27
-rw-r--r--lib/sqlalchemy/dialects/postgresql/asyncpg.py11
-rw-r--r--lib/sqlalchemy/engine/base.py6
-rw-r--r--lib/sqlalchemy/engine/default.py27
-rw-r--r--lib/sqlalchemy/engine/interfaces.py14
-rw-r--r--lib/sqlalchemy/pool/base.py7
-rw-r--r--lib/sqlalchemy/testing/engines.py4
-rw-r--r--test/engine/test_reconnect.py152
10 files changed, 232 insertions, 51 deletions
diff --git a/doc/build/changelog/unreleased_20/5648.rst b/doc/build/changelog/unreleased_20/5648.rst
new file mode 100644
index 000000000..acc1251b1
--- /dev/null
+++ b/doc/build/changelog/unreleased_20/5648.rst
@@ -0,0 +1,24 @@
+.. change::
+ :tags: bug, mysql, postgresql
+ :tickets: 5648
+
+ The support for pool ping listeners to receive exception events via the
+ :meth:`.ConnectionEvents.handle_error` event added in 2.0.0b1 for
+ :ticket:`5648` failed to take into account dialect-specific ping routines
+ such as that of MySQL and PostgreSQL. The dialect feature has been reworked
+ so that all dialects participate within event handling. Additionally,
+ a new boolean element :attr:`.ExceptionContext.is_pre_ping` is added
+ which identifies if this operation is occurring within the pre-ping
+ operation.
+
+ For this release, third party dialects which implement a custom
+ :meth:`_engine.Dialect.do_ping` method can opt in to the newly improved
+ behavior by having their method no longer catch exceptions or check
+ exceptions for "is_disconnect", instead just propagating all exceptions
+ outwards. Checking the exception for "is_disconnect" is now done by an
+ enclosing method on the default dialect, which ensures that the event hook
+ is invoked for all exception scenarios before testing the exception as a
+ "disconnect" exception. If an existing ``do_ping()`` method continues to
+ catch exceptions and check "is_disconnect", it will continue to work as it
+ did previously, but ``handle_error`` hooks will not have access to the
+ exception if it isn't propagated outwards.
diff --git a/lib/sqlalchemy/dialects/mysql/mysqldb.py b/lib/sqlalchemy/dialects/mysql/mysqldb.py
index 5c9d11a53..0868401d4 100644
--- a/lib/sqlalchemy/dialects/mysql/mysqldb.py
+++ b/lib/sqlalchemy/dialects/mysql/mysqldb.py
@@ -168,15 +168,8 @@ class MySQLDialect_mysqldb(MySQLDialect):
return on_connect
def do_ping(self, dbapi_connection):
- try:
- dbapi_connection.ping(False)
- except self.dbapi.Error as err:
- if self.is_disconnect(err, dbapi_connection, None):
- return False
- else:
- raise
- else:
- return True
+ dbapi_connection.ping(False)
+ return True
def do_executemany(self, cursor, statement, parameters, context=None):
rowcount = cursor.executemany(statement, parameters)
diff --git a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py
index d9ddefd38..739cbc5a9 100644
--- a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py
+++ b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py
@@ -178,20 +178,15 @@ class _PGDialect_common_psycopg(PGDialect):
def do_ping(self, dbapi_connection):
cursor = None
before_autocommit = dbapi_connection.autocommit
+
+ if not before_autocommit:
+ dbapi_connection.autocommit = True
+ cursor = dbapi_connection.cursor()
try:
- if not before_autocommit:
- self._do_autocommit(dbapi_connection, True)
- cursor = dbapi_connection.cursor()
- try:
- cursor.execute(self._dialect_specific_select_one)
- finally:
- cursor.close()
- if not before_autocommit and not dbapi_connection.closed:
- self._do_autocommit(dbapi_connection, before_autocommit)
- except self.dbapi.Error as err:
- if self.is_disconnect(err, dbapi_connection, cursor):
- return False
- else:
- raise
- else:
- return True
+ cursor.execute(self._dialect_specific_select_one)
+ finally:
+ cursor.close()
+ if not before_autocommit and not dbapi_connection.closed:
+ dbapi_connection.autocommit = before_autocommit
+
+ return True
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index e00584503..2acc5fea3 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -992,15 +992,8 @@ class PGDialect_asyncpg(PGDialect):
return ([], opts)
def do_ping(self, dbapi_connection):
- try:
- dbapi_connection.ping()
- except self.dbapi.Error as err:
- if self.is_disconnect(err, dbapi_connection, None):
- return False
- else:
- raise
- else:
- return True
+ dbapi_connection.ping()
+ return True
@classmethod
def get_pool_class(cls, url):
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index f6c637aa8..09610b069 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -2275,6 +2275,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
context,
self._is_disconnect,
invalidate_pool_on_disconnect,
+ False,
)
for fn in self.dialect.dispatch.handle_error:
@@ -2345,6 +2346,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
engine: Optional[Engine] = None,
is_disconnect: Optional[bool] = None,
invalidate_pool_on_disconnect: bool = True,
+ is_pre_ping: bool = False,
) -> NoReturn:
exc_info = sys.exc_info()
@@ -2385,6 +2387,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]):
None,
is_disconnect,
invalidate_pool_on_disconnect,
+ is_pre_ping,
)
for fn in dialect.dispatch.handle_error:
try:
@@ -2443,6 +2446,7 @@ class ExceptionContextImpl(ExceptionContext):
"execution_context",
"is_disconnect",
"invalidate_pool_on_disconnect",
+ "is_pre_ping",
)
def __init__(
@@ -2458,6 +2462,7 @@ class ExceptionContextImpl(ExceptionContext):
context: Optional[ExecutionContext],
is_disconnect: bool,
invalidate_pool_on_disconnect: bool,
+ is_pre_ping: bool,
):
self.engine = engine
self.dialect = dialect
@@ -2469,6 +2474,7 @@ class ExceptionContextImpl(ExceptionContext):
self.parameters = parameters
self.is_disconnect = is_disconnect
self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect
+ self.is_pre_ping = is_pre_ping
class Transaction(TransactionalContext):
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index f8126fa30..3e4e6fb9a 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -669,16 +669,11 @@ class DefaultDialect(Dialect):
def _dialect_specific_select_one(self):
return str(expression.select(1).compile(dialect=self))
- def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
- cursor = None
+ def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
try:
- cursor = dbapi_connection.cursor()
- try:
- cursor.execute(self._dialect_specific_select_one)
- finally:
- cursor.close()
+ return self.do_ping(dbapi_connection)
except self.loaded_dbapi.Error as err:
- is_disconnect = self.is_disconnect(err, dbapi_connection, cursor)
+ is_disconnect = self.is_disconnect(err, dbapi_connection, None)
if self._has_events:
try:
@@ -687,19 +682,25 @@ class DefaultDialect(Dialect):
self,
is_disconnect=is_disconnect,
invalidate_pool_on_disconnect=False,
+ is_pre_ping=True,
)
except exc.StatementError as new_err:
is_disconnect = new_err.connection_invalidated
- # other exceptions modified by the event handler will be
- # thrown
-
if is_disconnect:
return False
else:
raise
- else:
- return True
+
+ def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
+ cursor = None
+
+ cursor = dbapi_connection.cursor()
+ try:
+ cursor.execute(self._dialect_specific_select_one)
+ finally:
+ cursor.close()
+ return True
def create_xid(self):
"""Create a random two-phase transaction ID.
diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py
index c1de13221..9952a85e3 100644
--- a/lib/sqlalchemy/engine/interfaces.py
+++ b/lib/sqlalchemy/engine/interfaces.py
@@ -1967,6 +1967,9 @@ class Dialect(EventTarget):
raise NotImplementedError()
+ def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
+ raise NotImplementedError()
+
def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
"""ping the DBAPI connection and return True if the connection is
usable."""
@@ -3291,6 +3294,17 @@ class ExceptionContext:
"""
+ is_pre_ping: bool
+ """Indicates if this error is occurring within the "pre-ping" step
+ performed when :paramref:`_sa.create_engine.pool_pre_ping` is set to
+ ``True``. In this mode, the :attr:`.ExceptionContext.engine` attribute
+ will be ``None``. The dialect in use is accessible via the
+ :attr:`.ExceptionContext.dialect` attribute.
+
+ .. versionadded:: 2.0.5
+
+ """
+
class AdaptedConnection:
"""Interface of an adapted connection object to support the DBAPI protocol.
diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py
index d67f32442..ac487452c 100644
--- a/lib/sqlalchemy/pool/base.py
+++ b/lib/sqlalchemy/pool/base.py
@@ -132,7 +132,7 @@ class _ConnDialect:
def do_close(self, dbapi_connection: DBAPIConnection) -> None:
dbapi_connection.close()
- def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
+ def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
raise NotImplementedError(
"The ping feature requires that a dialect is "
"passed to the connection pool."
@@ -1266,6 +1266,7 @@ class _ConnectionFairy(PoolProxiedConnection):
threadconns: Optional[threading.local] = None,
fairy: Optional[_ConnectionFairy] = None,
) -> _ConnectionFairy:
+
if not fairy:
fairy = _ConnectionRecord.checkout(pool)
@@ -1304,7 +1305,9 @@ class _ConnectionFairy(PoolProxiedConnection):
"Pool pre-ping on connection %s",
fairy.dbapi_connection,
)
- result = pool._dialect.do_ping(fairy.dbapi_connection)
+ result = pool._dialect._do_ping_w_event(
+ fairy.dbapi_connection
+ )
if not result:
if fairy._echo:
pool.logger.debug(
diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py
index 546b2f16a..ece54cb52 100644
--- a/lib/sqlalchemy/testing/engines.py
+++ b/lib/sqlalchemy/testing/engines.py
@@ -454,8 +454,8 @@ class DBAPIProxyConnection:
"""
- def __init__(self, engine, cursor_cls):
- self.conn = engine.pool._creator()
+ def __init__(self, engine, conn, cursor_cls):
+ self.conn = conn
self.engine = engine
self.cursor_cls = cursor_cls
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index 2a6b21e6b..8ff34da98 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -1,3 +1,4 @@
+import itertools
import time
from unittest.mock import call
from unittest.mock import Mock
@@ -28,6 +29,8 @@ from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing import ne_
+from sqlalchemy.testing.engines import DBAPIProxyConnection
+from sqlalchemy.testing.engines import DBAPIProxyCursor
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
@@ -1002,6 +1005,155 @@ def _assert_invalidated(fn, *args):
raise
+class RealPrePingEventHandlerTest(fixtures.TestBase):
+ """real test for issue #5648, which had to be revisited for 2.0 as the
+ initial version was not adequately tested and non-implementation for
+ mysql, postgresql was not caught
+
+ """
+
+ __backend__ = True
+ __requires__ = "graceful_disconnects", "ad_hoc_engines"
+
+ @testing.fixture
+ def ping_fixture(self, testing_engine):
+ engine = testing_engine(
+ options={"pool_pre_ping": True, "_initialize": False}
+ )
+
+ existing_connect = engine.dialect.dbapi.connect
+
+ fail = False
+ fail_count = itertools.count()
+ DBAPIError = engine.dialect.dbapi.Error
+
+ class ExplodeConnection(DBAPIProxyConnection):
+ def ping(self, *arg, **kw):
+ if fail and next(fail_count) < 1:
+ raise DBAPIError("unhandled disconnect situation")
+ else:
+ return True
+
+ class ExplodeCursor(DBAPIProxyCursor):
+ def execute(self, stmt, parameters=None, **kw):
+ if fail and next(fail_count) < 1:
+ raise DBAPIError("unhandled disconnect situation")
+ else:
+ return super().execute(stmt, parameters=parameters, **kw)
+
+ def mock_connect(*arg, **kw):
+ real_connection = existing_connect(*arg, **kw)
+ return ExplodeConnection(engine, real_connection, ExplodeCursor)
+
+ with mock.patch.object(
+ engine.dialect.loaded_dbapi, "connect", mock_connect
+ ):
+
+ # set up initial connection. pre_ping works on subsequent connects
+ engine.connect().close()
+
+ # ping / exec will fail
+ fail = True
+
+ yield engine
+
+ @testing.fixture
+ def ping_fixture_all_errs_disconnect(self, ping_fixture):
+ engine = ping_fixture
+
+ with mock.patch.object(
+ engine.dialect, "is_disconnect", lambda *arg, **kw: True
+ ):
+ yield engine
+
+ def test_control(self, ping_fixture):
+ """test the fixture raises on connect"""
+ engine = ping_fixture
+
+ with expect_raises_message(
+ exc.DBAPIError, "unhandled disconnect situation"
+ ):
+ engine.connect()
+
+ def test_downgrade_control(self, ping_fixture_all_errs_disconnect):
+ """test the disconnect fixture doesn't raise, since it considers
+ all errors to be disconnect errors.
+
+ """
+
+ engine = ping_fixture_all_errs_disconnect
+
+ conn = engine.connect()
+ conn.close()
+
+ def test_event_handler_didnt_upgrade_disconnect(self, ping_fixture):
+ """test that having an event handler that doesn't do anything
+ keeps the behavior in place for a fatal error.
+
+ """
+ engine = ping_fixture
+
+ @event.listens_for(engine, "handle_error")
+ def setup_disconnect(ctx):
+ assert not ctx.is_disconnect
+
+ with expect_raises_message(
+ exc.DBAPIError, "unhandled disconnect situation"
+ ):
+ engine.connect()
+
+ def test_event_handler_didnt_downgrade_disconnect(
+ self, ping_fixture_all_errs_disconnect
+ ):
+ """test that having an event handler that doesn't do anything
+ keeps the behavior in place for a disconnect error.
+
+ """
+ engine = ping_fixture_all_errs_disconnect
+
+ @event.listens_for(engine, "handle_error")
+ def setup_disconnect(ctx):
+ assert ctx.is_pre_ping
+ assert ctx.is_disconnect
+
+ conn = engine.connect()
+ conn.close()
+
+ def test_event_handler_can_upgrade_disconnect(self, ping_fixture):
+ """test that an event hook can receive a fatal error and convert
+ it to be a disconnect error during pre-ping"""
+
+ engine = ping_fixture
+
+ @event.listens_for(engine, "handle_error")
+ def setup_disconnect(ctx):
+ assert ctx.is_pre_ping
+ ctx.is_disconnect = True
+
+ conn = engine.connect()
+ # no error
+ conn.close()
+
+ def test_event_handler_can_downgrade_disconnect(
+ self, ping_fixture_all_errs_disconnect
+ ):
+ """test that an event hook can receive a disconnect error and convert
+ it to be a fatal error during pre-ping"""
+
+ engine = ping_fixture_all_errs_disconnect
+
+ @event.listens_for(engine, "handle_error")
+ def setup_disconnect(ctx):
+ assert ctx.is_disconnect
+ if ctx.is_pre_ping:
+ ctx.is_disconnect = False
+
+ with expect_raises_message(
+ exc.DBAPIError, "unhandled disconnect situation"
+ ):
+ engine.connect()
+
+
class RealReconnectTest(fixtures.TestBase):
__backend__ = True
__requires__ = "graceful_disconnects", "ad_hoc_engines"