diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-02-24 16:15:21 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2023-03-01 16:20:36 -0500 |
commit | 45d0a501609e3588f1accac59c08358c4c6c74a1 (patch) | |
tree | c97aafa7c5d1709ff1af8fe9717bd6220c35d0d9 | |
parent | 8b108297d075ae68178cd18a9cb4d06feee7e075 (diff) | |
download | sqlalchemy-ticket_5648.tar.gz |
ensure event handlers called for all do_pingticket_5648
The support for pool ping listeners to receive exception events via the
:meth:`.ConnectionEvents.handle_error` event added in 2.0.0b1 for
:ticket:`5648` failed to take into account dialect-specific ping routines
such as that of MySQL and PostgreSQL. The dialect feature has been reworked
so that all dialects participate within event handling. Additionally,
a new boolean element :attr:`.ExceptionContext.is_pre_ping` is added
which identifies if this operation is occurring within the pre-ping
operation.
For this release, third party dialects which implement a custom
:meth:`_engine.Dialect.do_ping` method can opt in to the newly improved
behavior by having their method no longer catch exceptions or check
exceptions for "is_disconnect", instead just propagating all exceptions
outwards. Checking the exception for "is_disconnect" is now done by an
enclosing method on the default dialect, which ensures that the event hook
is invoked for all exception scenarios before testing the exception as a
"disconnect" exception. If an existing ``do_ping()`` method continues to
catch exceptions and check "is_disconnect", it will continue to work as it
did previously, but ``handle_error`` hooks will not have access to the
exception if it isn't propagated outwards.
Fixes: #5648
Change-Id: I6535d5cb389e1a761aad8c37cfeb332c548b876d
-rw-r--r-- | doc/build/changelog/unreleased_20/5648.rst | 24 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/mysql/mysqldb.py | 11 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/_psycopg_common.py | 27 | ||||
-rw-r--r-- | lib/sqlalchemy/dialects/postgresql/asyncpg.py | 11 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/base.py | 6 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/default.py | 27 | ||||
-rw-r--r-- | lib/sqlalchemy/engine/interfaces.py | 14 | ||||
-rw-r--r-- | lib/sqlalchemy/pool/base.py | 7 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/engines.py | 4 | ||||
-rw-r--r-- | test/engine/test_reconnect.py | 152 |
10 files changed, 232 insertions, 51 deletions
diff --git a/doc/build/changelog/unreleased_20/5648.rst b/doc/build/changelog/unreleased_20/5648.rst new file mode 100644 index 000000000..acc1251b1 --- /dev/null +++ b/doc/build/changelog/unreleased_20/5648.rst @@ -0,0 +1,24 @@ +.. change:: + :tags: bug, mysql, postgresql + :tickets: 5648 + + The support for pool ping listeners to receive exception events via the + :meth:`.ConnectionEvents.handle_error` event added in 2.0.0b1 for + :ticket:`5648` failed to take into account dialect-specific ping routines + such as that of MySQL and PostgreSQL. The dialect feature has been reworked + so that all dialects participate within event handling. Additionally, + a new boolean element :attr:`.ExceptionContext.is_pre_ping` is added + which identifies if this operation is occurring within the pre-ping + operation. + + For this release, third party dialects which implement a custom + :meth:`_engine.Dialect.do_ping` method can opt in to the newly improved + behavior by having their method no longer catch exceptions or check + exceptions for "is_disconnect", instead just propagating all exceptions + outwards. Checking the exception for "is_disconnect" is now done by an + enclosing method on the default dialect, which ensures that the event hook + is invoked for all exception scenarios before testing the exception as a + "disconnect" exception. If an existing ``do_ping()`` method continues to + catch exceptions and check "is_disconnect", it will continue to work as it + did previously, but ``handle_error`` hooks will not have access to the + exception if it isn't propagated outwards. diff --git a/lib/sqlalchemy/dialects/mysql/mysqldb.py b/lib/sqlalchemy/dialects/mysql/mysqldb.py index 5c9d11a53..0868401d4 100644 --- a/lib/sqlalchemy/dialects/mysql/mysqldb.py +++ b/lib/sqlalchemy/dialects/mysql/mysqldb.py @@ -168,15 +168,8 @@ class MySQLDialect_mysqldb(MySQLDialect): return on_connect def do_ping(self, dbapi_connection): - try: - dbapi_connection.ping(False) - except self.dbapi.Error as err: - if self.is_disconnect(err, dbapi_connection, None): - return False - else: - raise - else: - return True + dbapi_connection.ping(False) + return True def do_executemany(self, cursor, statement, parameters, context=None): rowcount = cursor.executemany(statement, parameters) diff --git a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py index d9ddefd38..739cbc5a9 100644 --- a/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py +++ b/lib/sqlalchemy/dialects/postgresql/_psycopg_common.py @@ -178,20 +178,15 @@ class _PGDialect_common_psycopg(PGDialect): def do_ping(self, dbapi_connection): cursor = None before_autocommit = dbapi_connection.autocommit + + if not before_autocommit: + dbapi_connection.autocommit = True + cursor = dbapi_connection.cursor() try: - if not before_autocommit: - self._do_autocommit(dbapi_connection, True) - cursor = dbapi_connection.cursor() - try: - cursor.execute(self._dialect_specific_select_one) - finally: - cursor.close() - if not before_autocommit and not dbapi_connection.closed: - self._do_autocommit(dbapi_connection, before_autocommit) - except self.dbapi.Error as err: - if self.is_disconnect(err, dbapi_connection, cursor): - return False - else: - raise - else: - return True + cursor.execute(self._dialect_specific_select_one) + finally: + cursor.close() + if not before_autocommit and not dbapi_connection.closed: + dbapi_connection.autocommit = before_autocommit + + return True diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index e00584503..2acc5fea3 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -992,15 +992,8 @@ class PGDialect_asyncpg(PGDialect): return ([], opts) def do_ping(self, dbapi_connection): - try: - dbapi_connection.ping() - except self.dbapi.Error as err: - if self.is_disconnect(err, dbapi_connection, None): - return False - else: - raise - else: - return True + dbapi_connection.ping() + return True @classmethod def get_pool_class(cls, url): diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index f6c637aa8..09610b069 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -2275,6 +2275,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): context, self._is_disconnect, invalidate_pool_on_disconnect, + False, ) for fn in self.dialect.dispatch.handle_error: @@ -2345,6 +2346,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): engine: Optional[Engine] = None, is_disconnect: Optional[bool] = None, invalidate_pool_on_disconnect: bool = True, + is_pre_ping: bool = False, ) -> NoReturn: exc_info = sys.exc_info() @@ -2385,6 +2387,7 @@ class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): None, is_disconnect, invalidate_pool_on_disconnect, + is_pre_ping, ) for fn in dialect.dispatch.handle_error: try: @@ -2443,6 +2446,7 @@ class ExceptionContextImpl(ExceptionContext): "execution_context", "is_disconnect", "invalidate_pool_on_disconnect", + "is_pre_ping", ) def __init__( @@ -2458,6 +2462,7 @@ class ExceptionContextImpl(ExceptionContext): context: Optional[ExecutionContext], is_disconnect: bool, invalidate_pool_on_disconnect: bool, + is_pre_ping: bool, ): self.engine = engine self.dialect = dialect @@ -2469,6 +2474,7 @@ class ExceptionContextImpl(ExceptionContext): self.parameters = parameters self.is_disconnect = is_disconnect self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect + self.is_pre_ping = is_pre_ping class Transaction(TransactionalContext): diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index f8126fa30..3e4e6fb9a 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -669,16 +669,11 @@ class DefaultDialect(Dialect): def _dialect_specific_select_one(self): return str(expression.select(1).compile(dialect=self)) - def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: - cursor = None + def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: try: - cursor = dbapi_connection.cursor() - try: - cursor.execute(self._dialect_specific_select_one) - finally: - cursor.close() + return self.do_ping(dbapi_connection) except self.loaded_dbapi.Error as err: - is_disconnect = self.is_disconnect(err, dbapi_connection, cursor) + is_disconnect = self.is_disconnect(err, dbapi_connection, None) if self._has_events: try: @@ -687,19 +682,25 @@ class DefaultDialect(Dialect): self, is_disconnect=is_disconnect, invalidate_pool_on_disconnect=False, + is_pre_ping=True, ) except exc.StatementError as new_err: is_disconnect = new_err.connection_invalidated - # other exceptions modified by the event handler will be - # thrown - if is_disconnect: return False else: raise - else: - return True + + def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: + cursor = None + + cursor = dbapi_connection.cursor() + try: + cursor.execute(self._dialect_specific_select_one) + finally: + cursor.close() + return True def create_xid(self): """Create a random two-phase transaction ID. diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index c1de13221..9952a85e3 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -1967,6 +1967,9 @@ class Dialect(EventTarget): raise NotImplementedError() + def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: + raise NotImplementedError() + def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: """ping the DBAPI connection and return True if the connection is usable.""" @@ -3291,6 +3294,17 @@ class ExceptionContext: """ + is_pre_ping: bool + """Indicates if this error is occurring within the "pre-ping" step + performed when :paramref:`_sa.create_engine.pool_pre_ping` is set to + ``True``. In this mode, the :attr:`.ExceptionContext.engine` attribute + will be ``None``. The dialect in use is accessible via the + :attr:`.ExceptionContext.dialect` attribute. + + .. versionadded:: 2.0.5 + + """ + class AdaptedConnection: """Interface of an adapted connection object to support the DBAPI protocol. diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index d67f32442..ac487452c 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -132,7 +132,7 @@ class _ConnDialect: def do_close(self, dbapi_connection: DBAPIConnection) -> None: dbapi_connection.close() - def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: + def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: raise NotImplementedError( "The ping feature requires that a dialect is " "passed to the connection pool." @@ -1266,6 +1266,7 @@ class _ConnectionFairy(PoolProxiedConnection): threadconns: Optional[threading.local] = None, fairy: Optional[_ConnectionFairy] = None, ) -> _ConnectionFairy: + if not fairy: fairy = _ConnectionRecord.checkout(pool) @@ -1304,7 +1305,9 @@ class _ConnectionFairy(PoolProxiedConnection): "Pool pre-ping on connection %s", fairy.dbapi_connection, ) - result = pool._dialect.do_ping(fairy.dbapi_connection) + result = pool._dialect._do_ping_w_event( + fairy.dbapi_connection + ) if not result: if fairy._echo: pool.logger.debug( diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py index 546b2f16a..ece54cb52 100644 --- a/lib/sqlalchemy/testing/engines.py +++ b/lib/sqlalchemy/testing/engines.py @@ -454,8 +454,8 @@ class DBAPIProxyConnection: """ - def __init__(self, engine, cursor_cls): - self.conn = engine.pool._creator() + def __init__(self, engine, conn, cursor_cls): + self.conn = conn self.engine = engine self.cursor_cls = cursor_cls diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index 2a6b21e6b..8ff34da98 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -1,3 +1,4 @@ +import itertools import time from unittest.mock import call from unittest.mock import Mock @@ -28,6 +29,8 @@ from sqlalchemy.testing import is_false from sqlalchemy.testing import is_true from sqlalchemy.testing import mock from sqlalchemy.testing import ne_ +from sqlalchemy.testing.engines import DBAPIProxyConnection +from sqlalchemy.testing.engines import DBAPIProxyCursor from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table @@ -1002,6 +1005,155 @@ def _assert_invalidated(fn, *args): raise +class RealPrePingEventHandlerTest(fixtures.TestBase): + """real test for issue #5648, which had to be revisited for 2.0 as the + initial version was not adequately tested and non-implementation for + mysql, postgresql was not caught + + """ + + __backend__ = True + __requires__ = "graceful_disconnects", "ad_hoc_engines" + + @testing.fixture + def ping_fixture(self, testing_engine): + engine = testing_engine( + options={"pool_pre_ping": True, "_initialize": False} + ) + + existing_connect = engine.dialect.dbapi.connect + + fail = False + fail_count = itertools.count() + DBAPIError = engine.dialect.dbapi.Error + + class ExplodeConnection(DBAPIProxyConnection): + def ping(self, *arg, **kw): + if fail and next(fail_count) < 1: + raise DBAPIError("unhandled disconnect situation") + else: + return True + + class ExplodeCursor(DBAPIProxyCursor): + def execute(self, stmt, parameters=None, **kw): + if fail and next(fail_count) < 1: + raise DBAPIError("unhandled disconnect situation") + else: + return super().execute(stmt, parameters=parameters, **kw) + + def mock_connect(*arg, **kw): + real_connection = existing_connect(*arg, **kw) + return ExplodeConnection(engine, real_connection, ExplodeCursor) + + with mock.patch.object( + engine.dialect.loaded_dbapi, "connect", mock_connect + ): + + # set up initial connection. pre_ping works on subsequent connects + engine.connect().close() + + # ping / exec will fail + fail = True + + yield engine + + @testing.fixture + def ping_fixture_all_errs_disconnect(self, ping_fixture): + engine = ping_fixture + + with mock.patch.object( + engine.dialect, "is_disconnect", lambda *arg, **kw: True + ): + yield engine + + def test_control(self, ping_fixture): + """test the fixture raises on connect""" + engine = ping_fixture + + with expect_raises_message( + exc.DBAPIError, "unhandled disconnect situation" + ): + engine.connect() + + def test_downgrade_control(self, ping_fixture_all_errs_disconnect): + """test the disconnect fixture doesn't raise, since it considers + all errors to be disconnect errors. + + """ + + engine = ping_fixture_all_errs_disconnect + + conn = engine.connect() + conn.close() + + def test_event_handler_didnt_upgrade_disconnect(self, ping_fixture): + """test that having an event handler that doesn't do anything + keeps the behavior in place for a fatal error. + + """ + engine = ping_fixture + + @event.listens_for(engine, "handle_error") + def setup_disconnect(ctx): + assert not ctx.is_disconnect + + with expect_raises_message( + exc.DBAPIError, "unhandled disconnect situation" + ): + engine.connect() + + def test_event_handler_didnt_downgrade_disconnect( + self, ping_fixture_all_errs_disconnect + ): + """test that having an event handler that doesn't do anything + keeps the behavior in place for a disconnect error. + + """ + engine = ping_fixture_all_errs_disconnect + + @event.listens_for(engine, "handle_error") + def setup_disconnect(ctx): + assert ctx.is_pre_ping + assert ctx.is_disconnect + + conn = engine.connect() + conn.close() + + def test_event_handler_can_upgrade_disconnect(self, ping_fixture): + """test that an event hook can receive a fatal error and convert + it to be a disconnect error during pre-ping""" + + engine = ping_fixture + + @event.listens_for(engine, "handle_error") + def setup_disconnect(ctx): + assert ctx.is_pre_ping + ctx.is_disconnect = True + + conn = engine.connect() + # no error + conn.close() + + def test_event_handler_can_downgrade_disconnect( + self, ping_fixture_all_errs_disconnect + ): + """test that an event hook can receive a disconnect error and convert + it to be a fatal error during pre-ping""" + + engine = ping_fixture_all_errs_disconnect + + @event.listens_for(engine, "handle_error") + def setup_disconnect(ctx): + assert ctx.is_disconnect + if ctx.is_pre_ping: + ctx.is_disconnect = False + + with expect_raises_message( + exc.DBAPIError, "unhandled disconnect situation" + ): + engine.connect() + + class RealReconnectTest(fixtures.TestBase): __backend__ = True __requires__ = "graceful_disconnects", "ad_hoc_engines" |