diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2022-08-24 18:08:17 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@ci3.zzzcomputing.com> | 2022-08-24 18:08:17 +0000 |
| commit | 1ea3c783b6ceaf488e34f15c3ce97eabbc3ab4d3 (patch) | |
| tree | 1dffaa16ae5a09c12a1cc47a833a352fb023ef95 /lib/sqlalchemy | |
| parent | 1c9da1e1d7ef7994328de2248b69a1a582766272 (diff) | |
| parent | 776abf43d7404a3fa165588fd1e1e2d5ef9a9f04 (diff) | |
| download | sqlalchemy-1ea3c783b6ceaf488e34f15c3ce97eabbc3ab4d3.tar.gz | |
Merge "integrate connection.terminate() for supporting dialects" into main
Diffstat (limited to 'lib/sqlalchemy')
| -rw-r--r-- | lib/sqlalchemy/dialects/postgresql/asyncpg.py | 7 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/default.py | 5 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/interfaces.py | 21 | ||||
| -rw-r--r-- | lib/sqlalchemy/pool/base.py | 38 |
4 files changed, 61 insertions, 10 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py index 6888959f0..a84bece4f 100644 --- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py +++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py @@ -793,6 +793,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection): self.await_(self._connection.close()) + def terminate(self): + self._connection.terminate() + class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection): __slots__ = () @@ -895,6 +898,7 @@ class PGDialect_asyncpg(PGDialect): supports_server_side_cursors = True render_bind_cast = True + has_terminate = True default_paramstyle = "format" supports_sane_multi_rowcount = False @@ -981,6 +985,9 @@ class PGDialect_asyncpg(PGDialect): def get_deferrable(self, connection): return connection.deferrable + def do_terminate(self, dbapi_connection) -> None: + dbapi_connection.terminate() + def create_connect_args(self, url): opts = url.translate_connect_args(username="user") diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 80e687c32..9ad0ebbfc 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -237,6 +237,8 @@ class DefaultDialect(Dialect): is_async = False + has_terminate = False + # TODO: this is not to be part of 2.0. implement rudimentary binary # literals for SQLite, PostgreSQL, MySQL only within # _Binary.literal_processor @@ -620,6 +622,9 @@ class DefaultDialect(Dialect): def do_commit(self, dbapi_connection): dbapi_connection.commit() + def do_terminate(self, dbapi_connection): + self.do_close(dbapi_connection) + def do_close(self, dbapi_connection): dbapi_connection.close() diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py index 778c07592..01b266d68 100644 --- a/lib/sqlalchemy/engine/interfaces.py +++ b/lib/sqlalchemy/engine/interfaces.py @@ -966,6 +966,10 @@ class Dialect(EventTarget): is_async: bool """Whether or not this dialect is intended for asyncio use.""" + has_terminate: bool + """Whether or not this dialect has a separate "terminate" implementation + that does not block or require awaiting.""" + engine_config_types: Mapping[str, Any] """a mapping of string keys that can be in an engine config linked to type conversion functions. @@ -1784,6 +1788,23 @@ class Dialect(EventTarget): raise NotImplementedError() + def do_terminate(self, dbapi_connection: DBAPIConnection) -> None: + """Provide an implementation of ``connection.close()`` that tries as + much as possible to not block, given a DBAPI + connection. + + In the vast majority of cases this just calls .close(), however + for some asyncio dialects may call upon different API features. + + This hook is called by the :class:`_pool.Pool` + when a connection is being recycled or has been invalidated. + + .. versionadded:: 1.4.41 + + """ + + raise NotImplementedError() + def do_close(self, dbapi_connection: DBAPIConnection) -> None: """Provide an implementation of ``connection.close()``, given a DBAPI connection. diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index 51eb5e9f5..41f2f03b2 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -72,6 +72,7 @@ class _ConnDialect: """ is_async = False + has_terminate = False def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None: dbapi_connection.rollback() @@ -79,6 +80,9 @@ class _ConnDialect: def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None: dbapi_connection.commit() + def do_terminate(self, dbapi_connection: DBAPIConnection) -> None: + dbapi_connection.close() + def do_close(self, dbapi_connection: DBAPIConnection) -> None: dbapi_connection.close() @@ -310,10 +314,19 @@ class Pool(log.Identified, event.EventTarget): creator_fn = cast(_CreatorFnType, creator) return lambda rec: creator_fn() - def _close_connection(self, connection: DBAPIConnection) -> None: - self.logger.debug("Closing connection %r", connection) + def _close_connection( + self, connection: DBAPIConnection, *, terminate: bool = False + ) -> None: + self.logger.debug( + "%s connection %r", + "Hard-closing" if terminate else "Closing", + connection, + ) try: - self._dialect.do_close(connection) + if terminate: + self._dialect.do_terminate(connection) + else: + self._dialect.do_close(connection) except Exception: self.logger.error( "Exception closing connection %r", connection, exc_info=True @@ -742,7 +755,7 @@ class _ConnectionRecord(ConnectionPoolEntry): if soft: self._soft_invalidate_time = time.time() else: - self.__close() + self.__close(terminate=True) self.dbapi_connection = None def get_connection(self) -> DBAPIConnection: @@ -789,7 +802,7 @@ class _ConnectionRecord(ConnectionPoolEntry): recycle = True if recycle: - self.__close() + self.__close(terminate=True) self.info.clear() # type: ignore # our info is always present self.__connect() @@ -804,12 +817,14 @@ class _ConnectionRecord(ConnectionPoolEntry): or (self._soft_invalidate_time > self.starttime) ) - def __close(self) -> None: + def __close(self, *, terminate: bool = False) -> None: self.finalize_callback.clear() if self.__pool.dispatch.close: self.__pool.dispatch.close(self.dbapi_connection, self) assert self.dbapi_connection is not None - self.__pool._close_connection(self.dbapi_connection) + self.__pool._close_connection( + self.dbapi_connection, terminate=terminate + ) self.dbapi_connection = None def __connect(self) -> None: @@ -877,7 +892,9 @@ def _finalize_fairy( dbapi_connection = connection_record.dbapi_connection # null pool is not _is_asyncio but can be used also with async dialects - dont_restore_gced = pool._dialect.is_async + dont_restore_gced = ( + pool._dialect.is_async and not pool._dialect.has_terminate + ) if dont_restore_gced: detach = connection_record is None or is_gc_cleanup @@ -923,8 +940,9 @@ def _finalize_fairy( message = ( "The garbage collector is trying to clean up " f"connection {dbapi_connection!r}. This feature is " - "unsupported on async " - "dbapi, since no IO can be performed at this stage to " + "unsupported on asyncio " + 'dbapis that lack a "terminate" feature, since no ' + "IO can be performed at this stage to " "reset the connection. Please close out all " "connections when they are no longer used, calling " "``close()`` or using a context manager to " |
