summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2022-08-24 18:08:17 +0000
committerGerrit Code Review <gerrit@ci3.zzzcomputing.com>2022-08-24 18:08:17 +0000
commit1ea3c783b6ceaf488e34f15c3ce97eabbc3ab4d3 (patch)
tree1dffaa16ae5a09c12a1cc47a833a352fb023ef95 /lib/sqlalchemy
parent1c9da1e1d7ef7994328de2248b69a1a582766272 (diff)
parent776abf43d7404a3fa165588fd1e1e2d5ef9a9f04 (diff)
downloadsqlalchemy-1ea3c783b6ceaf488e34f15c3ce97eabbc3ab4d3.tar.gz
Merge "integrate connection.terminate() for supporting dialects" into main
Diffstat (limited to 'lib/sqlalchemy')
-rw-r--r--lib/sqlalchemy/dialects/postgresql/asyncpg.py7
-rw-r--r--lib/sqlalchemy/engine/default.py5
-rw-r--r--lib/sqlalchemy/engine/interfaces.py21
-rw-r--r--lib/sqlalchemy/pool/base.py38
4 files changed, 61 insertions, 10 deletions
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 6888959f0..a84bece4f 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -793,6 +793,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
self.await_(self._connection.close())
+ def terminate(self):
+ self._connection.terminate()
+
class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
__slots__ = ()
@@ -895,6 +898,7 @@ class PGDialect_asyncpg(PGDialect):
supports_server_side_cursors = True
render_bind_cast = True
+ has_terminate = True
default_paramstyle = "format"
supports_sane_multi_rowcount = False
@@ -981,6 +985,9 @@ class PGDialect_asyncpg(PGDialect):
def get_deferrable(self, connection):
return connection.deferrable
+ def do_terminate(self, dbapi_connection) -> None:
+ dbapi_connection.terminate()
+
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 80e687c32..9ad0ebbfc 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -237,6 +237,8 @@ class DefaultDialect(Dialect):
is_async = False
+ has_terminate = False
+
# TODO: this is not to be part of 2.0. implement rudimentary binary
# literals for SQLite, PostgreSQL, MySQL only within
# _Binary.literal_processor
@@ -620,6 +622,9 @@ class DefaultDialect(Dialect):
def do_commit(self, dbapi_connection):
dbapi_connection.commit()
+ def do_terminate(self, dbapi_connection):
+ self.do_close(dbapi_connection)
+
def do_close(self, dbapi_connection):
dbapi_connection.close()
diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py
index 778c07592..01b266d68 100644
--- a/lib/sqlalchemy/engine/interfaces.py
+++ b/lib/sqlalchemy/engine/interfaces.py
@@ -966,6 +966,10 @@ class Dialect(EventTarget):
is_async: bool
"""Whether or not this dialect is intended for asyncio use."""
+ has_terminate: bool
+ """Whether or not this dialect has a separate "terminate" implementation
+ that does not block or require awaiting."""
+
engine_config_types: Mapping[str, Any]
"""a mapping of string keys that can be in an engine config linked to
type conversion functions.
@@ -1784,6 +1788,23 @@ class Dialect(EventTarget):
raise NotImplementedError()
+ def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+ """Provide an implementation of ``connection.close()`` that tries as
+ much as possible to not block, given a DBAPI
+ connection.
+
+ In the vast majority of cases this just calls .close(), however
+ for some asyncio dialects may call upon different API features.
+
+ This hook is called by the :class:`_pool.Pool`
+ when a connection is being recycled or has been invalidated.
+
+ .. versionadded:: 1.4.41
+
+ """
+
+ raise NotImplementedError()
+
def do_close(self, dbapi_connection: DBAPIConnection) -> None:
"""Provide an implementation of ``connection.close()``, given a DBAPI
connection.
diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py
index 51eb5e9f5..41f2f03b2 100644
--- a/lib/sqlalchemy/pool/base.py
+++ b/lib/sqlalchemy/pool/base.py
@@ -72,6 +72,7 @@ class _ConnDialect:
"""
is_async = False
+ has_terminate = False
def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
dbapi_connection.rollback()
@@ -79,6 +80,9 @@ class _ConnDialect:
def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
dbapi_connection.commit()
+ def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+ dbapi_connection.close()
+
def do_close(self, dbapi_connection: DBAPIConnection) -> None:
dbapi_connection.close()
@@ -310,10 +314,19 @@ class Pool(log.Identified, event.EventTarget):
creator_fn = cast(_CreatorFnType, creator)
return lambda rec: creator_fn()
- def _close_connection(self, connection: DBAPIConnection) -> None:
- self.logger.debug("Closing connection %r", connection)
+ def _close_connection(
+ self, connection: DBAPIConnection, *, terminate: bool = False
+ ) -> None:
+ self.logger.debug(
+ "%s connection %r",
+ "Hard-closing" if terminate else "Closing",
+ connection,
+ )
try:
- self._dialect.do_close(connection)
+ if terminate:
+ self._dialect.do_terminate(connection)
+ else:
+ self._dialect.do_close(connection)
except Exception:
self.logger.error(
"Exception closing connection %r", connection, exc_info=True
@@ -742,7 +755,7 @@ class _ConnectionRecord(ConnectionPoolEntry):
if soft:
self._soft_invalidate_time = time.time()
else:
- self.__close()
+ self.__close(terminate=True)
self.dbapi_connection = None
def get_connection(self) -> DBAPIConnection:
@@ -789,7 +802,7 @@ class _ConnectionRecord(ConnectionPoolEntry):
recycle = True
if recycle:
- self.__close()
+ self.__close(terminate=True)
self.info.clear() # type: ignore # our info is always present
self.__connect()
@@ -804,12 +817,14 @@ class _ConnectionRecord(ConnectionPoolEntry):
or (self._soft_invalidate_time > self.starttime)
)
- def __close(self) -> None:
+ def __close(self, *, terminate: bool = False) -> None:
self.finalize_callback.clear()
if self.__pool.dispatch.close:
self.__pool.dispatch.close(self.dbapi_connection, self)
assert self.dbapi_connection is not None
- self.__pool._close_connection(self.dbapi_connection)
+ self.__pool._close_connection(
+ self.dbapi_connection, terminate=terminate
+ )
self.dbapi_connection = None
def __connect(self) -> None:
@@ -877,7 +892,9 @@ def _finalize_fairy(
dbapi_connection = connection_record.dbapi_connection
# null pool is not _is_asyncio but can be used also with async dialects
- dont_restore_gced = pool._dialect.is_async
+ dont_restore_gced = (
+ pool._dialect.is_async and not pool._dialect.has_terminate
+ )
if dont_restore_gced:
detach = connection_record is None or is_gc_cleanup
@@ -923,8 +940,9 @@ def _finalize_fairy(
message = (
"The garbage collector is trying to clean up "
f"connection {dbapi_connection!r}. This feature is "
- "unsupported on async "
- "dbapi, since no IO can be performed at this stage to "
+ "unsupported on asyncio "
+ 'dbapis that lack a "terminate" feature, since no '
+ "IO can be performed at this stage to "
"reset the connection. Please close out all "
"connections when they are no longer used, calling "
"``close()`` or using a context manager to "