diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-02-15 23:43:51 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-02-17 14:45:04 -0500 |
| commit | 5157e0aa542f390242dd7a6d27a6ce1663230e46 (patch) | |
| tree | 113f0e5a83e8229c7d0cb9e9c47387e1d703cb29 /lib/sqlalchemy/pool | |
| parent | 20213fd1f27fea51015d753bf94c6f40674ae86f (diff) | |
| download | sqlalchemy-5157e0aa542f390242dd7a6d27a6ce1663230e46.tar.gz | |
pep-484 for pool
also extends into some areas of utils, events and others
as needed.
Formalizes a public hierarchy for pool API,
with ManagesConnection -> PoolProxiedConnection /
ConnectionPoolEntry for connectionfairy / connectionrecord,
which are now what's exposed in the event API and other
APIs. all public API docs moved to the new objects.
Corrects the mypy plugin's check for sqlalchemy-stubs
not being insatlled, which has to be imported using the
dash in the name to be effective.
Change-Id: I16c2cb43b2e840d28e70a015f370a768e70f3581
Diffstat (limited to 'lib/sqlalchemy/pool')
| -rw-r--r-- | lib/sqlalchemy/pool/__init__.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/pool/base.py | 904 | ||||
| -rw-r--r-- | lib/sqlalchemy/pool/events.py | 162 | ||||
| -rw-r--r-- | lib/sqlalchemy/pool/impl.py | 171 |
4 files changed, 706 insertions, 533 deletions
diff --git a/lib/sqlalchemy/pool/__init__.py b/lib/sqlalchemy/pool/__init__.py index bc2f93d57..2c52a7065 100644 --- a/lib/sqlalchemy/pool/__init__.py +++ b/lib/sqlalchemy/pool/__init__.py @@ -22,6 +22,8 @@ from .base import _AdhocProxiedConnection from .base import _ConnectionFairy from .base import _ConnectionRecord from .base import _finalize_fairy +from .base import ConnectionPoolEntry as ConnectionPoolEntry +from .base import ManagesConnection as ManagesConnection from .base import Pool as Pool from .base import PoolProxiedConnection as PoolProxiedConnection from .base import reset_commit as reset_commit diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py index 72c56716f..18d268182 100644 --- a/lib/sqlalchemy/pool/base.py +++ b/lib/sqlalchemy/pool/base.py @@ -13,24 +13,55 @@ from __future__ import annotations from collections import deque +from enum import Enum +import threading import time +import typing from typing import Any +from typing import Callable +from typing import cast +from typing import Deque from typing import Dict +from typing import List from typing import Optional +from typing import Tuple from typing import TYPE_CHECKING +from typing import Union import weakref from .. import event from .. import exc from .. import log from .. import util +from ..util.typing import Literal +from ..util.typing import Protocol if TYPE_CHECKING: from ..engine.interfaces import DBAPIConnection + from ..engine.interfaces import DBAPICursor + from ..engine.interfaces import Dialect + from ..event import _Dispatch + from ..event import _ListenerFnType + from ..event import dispatcher -reset_rollback = util.symbol("reset_rollback") -reset_commit = util.symbol("reset_commit") -reset_none = util.symbol("reset_none") + +class ResetStyle(Enum): + """Describe options for "reset on return" behaviors.""" + + reset_rollback = 0 + reset_commit = 1 + reset_none = 2 + + +_ResetStyleArgType = Union[ + ResetStyle, + Literal[True], + Literal[None], + Literal[False], + Literal["commit"], + Literal["rollback"], +] +reset_rollback, reset_commit, reset_none = list(ResetStyle) class _ConnDialect: @@ -45,22 +76,22 @@ class _ConnDialect: is_async = False - def do_rollback(self, dbapi_connection): + def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None: dbapi_connection.rollback() - def do_commit(self, dbapi_connection): + def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None: dbapi_connection.commit() - def do_close(self, dbapi_connection): + def do_close(self, dbapi_connection: DBAPIConnection) -> None: dbapi_connection.close() - def do_ping(self, dbapi_connection): + def do_ping(self, dbapi_connection: DBAPIConnection) -> None: raise NotImplementedError( "The ping feature requires that a dialect is " "passed to the connection pool." ) - def get_driver_connection(self, connection): + def get_driver_connection(self, connection: DBAPIConnection) -> Any: return connection @@ -68,23 +99,40 @@ class _AsyncConnDialect(_ConnDialect): is_async = True -class Pool(log.Identified): +class _CreatorFnType(Protocol): + def __call__(self) -> DBAPIConnection: + ... + + +class _CreatorWRecFnType(Protocol): + def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: + ... + + +class Pool(log.Identified, event.EventTarget): """Abstract base class for connection pools.""" - _dialect = _ConnDialect() + dispatch: dispatcher[Pool] + echo: log._EchoFlagType + + _orig_logging_name: Optional[str] + _dialect: Union[_ConnDialect, Dialect] = _ConnDialect() + _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType] + _invoke_creator: _CreatorWRecFnType + _invalidate_time: float def __init__( self, - creator, - recycle=-1, - echo=None, - logging_name=None, - reset_on_return=True, - events=None, - dialect=None, - pre_ping=False, - _dispatch=None, + creator: Union[_CreatorFnType, _CreatorWRecFnType], + recycle: int = -1, + echo: log._EchoFlagType = None, + logging_name: Optional[str] = None, + reset_on_return: _ResetStyleArgType = True, + events: Optional[List[Tuple[_ListenerFnType, str]]] = None, + dialect: Optional[Union[_ConnDialect, Dialect]] = None, + pre_ping: bool = False, + _dispatch: Optional[_Dispatch[Pool]] = None, ): """ Construct a Pool. @@ -188,15 +236,14 @@ class Pool(log.Identified): self._recycle = recycle self._invalidate_time = 0 self._pre_ping = pre_ping - self._reset_on_return = util.symbol.parse_user_argument( + self._reset_on_return = util.parse_user_argument_for_enum( reset_on_return, { - reset_rollback: ["rollback", True], - reset_none: ["none", None, False], - reset_commit: ["commit"], + ResetStyle.reset_rollback: ["rollback", True], + ResetStyle.reset_none: ["none", None, False], + ResetStyle.reset_commit: ["commit"], }, "reset_on_return", - resolve_symbol_names=False, ) self.echo = echo @@ -210,19 +257,32 @@ class Pool(log.Identified): event.listen(self, target, fn) @util.hybridproperty - def _is_asyncio(self): + def _is_asyncio(self) -> bool: return self._dialect.is_async @property - def _creator(self): - return self.__dict__["_creator"] + def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]: + return self._creator_arg @_creator.setter - def _creator(self, creator): - self.__dict__["_creator"] = creator - self._invoke_creator = self._should_wrap_creator(creator) + def _creator( + self, creator: Union[_CreatorFnType, _CreatorWRecFnType] + ) -> None: + self._creator_arg = creator + + # mypy seems to get super confused assigning functions to + # attributes + self._invoke_creator = self._should_wrap_creator(creator) # type: ignore # noqa E501 + + @_creator.deleter + def _creator(self) -> None: + # needed for mock testing + del self._creator_arg + del self._invoke_creator # type: ignore[misc] - def _should_wrap_creator(self, creator): + def _should_wrap_creator( + self, creator: Union[_CreatorFnType, _CreatorWRecFnType] + ) -> _CreatorWRecFnType: """Detect if creator accepts a single argument, or is sent as a legacy style no-arg function. @@ -231,26 +291,30 @@ class Pool(log.Identified): try: argspec = util.get_callable_argspec(self._creator, no_self=True) except TypeError: - return lambda crec: creator() + creator_fn = cast(_CreatorFnType, creator) + return lambda rec: creator_fn() - defaulted = argspec[3] is not None and len(argspec[3]) or 0 + if argspec.defaults is not None: + defaulted = len(argspec.defaults) + else: + defaulted = 0 positionals = len(argspec[0]) - defaulted # look for the exact arg signature that DefaultStrategy # sends us if (argspec[0], argspec[3]) == (["connection_record"], (None,)): - return creator + return cast(_CreatorWRecFnType, creator) # or just a single positional elif positionals == 1: - return creator + return cast(_CreatorWRecFnType, creator) # all other cases, just wrap and assume legacy "creator" callable # thing else: - return lambda crec: creator() + creator_fn = cast(_CreatorFnType, creator) + return lambda rec: creator_fn() - def _close_connection(self, connection): + def _close_connection(self, connection: DBAPIConnection) -> None: self.logger.debug("Closing connection %r", connection) - try: self._dialect.do_close(connection) except Exception: @@ -258,12 +322,17 @@ class Pool(log.Identified): "Exception closing connection %r", connection, exc_info=True ) - def _create_connection(self): + def _create_connection(self) -> ConnectionPoolEntry: """Called by subclasses to create a new ConnectionRecord.""" return _ConnectionRecord(self) - def _invalidate(self, connection, exception=None, _checkin=True): + def _invalidate( + self, + connection: PoolProxiedConnection, + exception: Optional[BaseException] = None, + _checkin: bool = True, + ) -> None: """Mark all connections established within the generation of the given connection as invalidated. @@ -280,7 +349,7 @@ class Pool(log.Identified): if _checkin and getattr(connection, "is_valid", False): connection.invalidate(exception) - def recreate(self): + def recreate(self) -> Pool: """Return a new :class:`_pool.Pool`, of the same class as this one and configured with identical creation arguments. @@ -292,7 +361,7 @@ class Pool(log.Identified): raise NotImplementedError() - def dispose(self): + def dispose(self) -> None: """Dispose of this pool. This method leaves the possibility of checked-out connections @@ -307,7 +376,7 @@ class Pool(log.Identified): raise NotImplementedError() - def connect(self): + def connect(self) -> PoolProxiedConnection: """Return a DBAPI connection from the pool. The connection is instrumented such that when its @@ -317,7 +386,7 @@ class Pool(log.Identified): """ return _ConnectionFairy._checkout(self) - def _return_conn(self, record): + def _return_conn(self, record: ConnectionPoolEntry) -> None: """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`. This method is called when an instrumented DBAPI connection @@ -326,100 +395,230 @@ class Pool(log.Identified): """ self._do_return_conn(record) - def _do_get(self): + def _do_get(self) -> ConnectionPoolEntry: """Implementation for :meth:`get`, supplied by subclasses.""" raise NotImplementedError() - def _do_return_conn(self, conn): + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: """Implementation for :meth:`return_conn`, supplied by subclasses.""" raise NotImplementedError() - def status(self): + def status(self) -> str: raise NotImplementedError() -class _ConnectionRecord: +class ManagesConnection: + """Common base for the two connection-management interfaces + :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`. - """Internal object which maintains an individual DBAPI connection - referenced by a :class:`_pool.Pool`. + These two objects are typically exposed in the public facing API + via the connection pool event hooks, documented at :class:`.PoolEvents`. - The :class:`._ConnectionRecord` object always exists for any particular - DBAPI connection whether or not that DBAPI connection has been - "checked out". This is in contrast to the :class:`._ConnectionFairy` - which is only a public facade to the DBAPI connection while it is checked - out. + .. versionadded:: 2.0 - A :class:`._ConnectionRecord` may exist for a span longer than that - of a single DBAPI connection. For example, if the - :meth:`._ConnectionRecord.invalidate` - method is called, the DBAPI connection associated with this - :class:`._ConnectionRecord` - will be discarded, but the :class:`._ConnectionRecord` may be used again, - in which case a new DBAPI connection is produced when the - :class:`_pool.Pool` - next uses this record. + """ - The :class:`._ConnectionRecord` is delivered along with connection - pool events, including :meth:`_events.PoolEvents.connect` and - :meth:`_events.PoolEvents.checkout`, however :class:`._ConnectionRecord` - still - remains an internal object whose API and internals may change. + __slots__ = () + + dbapi_connection: Optional[DBAPIConnection] + """A reference to the actual DBAPI connection being tracked. + + This is a :pep:`249`-compliant object that for traditional sync-style + dialects is provided by the third-party + DBAPI implementation in use. For asyncio dialects, the implementation + is typically an adapter object provided by the SQLAlchemy dialect + itself; the underlying asyncio object is available via the + :attr:`.ManagesConnection.driver_connection` attribute. + + SQLAlchemy's interface for the DBAPI connection is based on the + :class:`.DBAPIConnection` protocol object .. seealso:: - :class:`._ConnectionFairy` + :attr:`.ManagesConnection.driver_connection` + + :ref:`faq_dbapi_connection` """ - def __init__(self, pool, connect=True): - self.__pool = pool - if connect: - self.__connect() - self.finalize_callback = deque() + @property + def driver_connection(self) -> Optional[Any]: + """The "driver level" connection object as used by the Python + DBAPI or database driver. + + For traditional :pep:`249` DBAPI implementations, this object will + be the same object as that of + :attr:`.ManagesConnection.dbapi_connection`. For an asyncio database + driver, this will be the ultimate "connection" object used by that + driver, such as the ``asyncpg.Connection`` object which will not have + standard pep-249 methods. + + .. versionadded:: 1.4.24 - fresh = False + .. seealso:: - fairy_ref = None + :attr:`.ManagesConnection.dbapi_connection` - starttime = None + :ref:`faq_dbapi_connection` - dbapi_connection = None - """A reference to the actual DBAPI connection being tracked. + """ + raise NotImplementedError() + + @util.dynamic_property + def info(self) -> Dict[str, Any]: + """Info dictionary associated with the underlying DBAPI connection + referred to by this :class:`.ManagesConnection` instance, allowing + user-defined data to be associated with the connection. + + The data in this dictionary is persistent for the lifespan + of the DBAPI connection itself, including across pool checkins + and checkouts. When the connection is invalidated + and replaced with a new one, this dictionary is cleared. + + For a :class:`.PoolProxiedConnection` instance that's not associated + with a :class:`.ConnectionPoolEntry`, such as if it were detached, the + attribute returns a dictionary that is local to that + :class:`.ConnectionPoolEntry`. Therefore the + :attr:`.ManagesConnection.info` attribute will always provide a Python + dictionary. + + .. seealso:: - May be ``None`` if this :class:`._ConnectionRecord` has been marked - as invalidated; a new DBAPI connection may replace it if the owning - pool calls upon this :class:`._ConnectionRecord` to reconnect. + :attr:`.ManagesConnection.record_info` - For adapted drivers, like the Asyncio implementations, this is a - :class:`.AdaptedConnection` that adapts the driver connection - to the DBAPI protocol. - Use :attr:`._ConnectionRecord.driver_connection` to obtain the - connection objected returned by the driver. - .. versionadded:: 1.4.24 + """ + raise NotImplementedError() + + @util.dynamic_property + def record_info(self) -> Optional[Dict[str, Any]]: + """Persistent info dictionary associated with this + :class:`.ManagesConnection`. + + Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan + of this dictionary is that of the :class:`.ConnectionPoolEntry` + which owns it; therefore this dictionary will persist across + reconnects and connection invalidation for a particular entry + in the connection pool. + + For a :class:`.PoolProxiedConnection` instance that's not associated + with a :class:`.ConnectionPoolEntry`, such as if it were detached, the + attribute returns None. Contrast to the :attr:`.ManagesConnection.info` + dictionary which is never None. + + + .. seealso:: + + :attr:`.ManagesConnection.info` + + """ + raise NotImplementedError() + + def invalidate( + self, e: Optional[BaseException] = None, soft: bool = False + ) -> None: + """Mark the managed connection as invalidated. + + :param e: an exception object indicating a reason for the invalidation. + + :param soft: if True, the connection isn't closed; instead, this + connection will be recycled on next checkout. + + .. seealso:: + + :ref:`pool_connection_invalidation` + + + """ + raise NotImplementedError() + + +class ConnectionPoolEntry(ManagesConnection): + """Interface for the object that maintains an individual database + connection on behalf of a :class:`_pool.Pool` instance. + + The :class:`.ConnectionPoolEntry` object represents the long term + maintainance of a particular connection for a pool, including expiring or + invalidating that connection to have it replaced with a new one, which will + continue to be maintained by that same :class:`.ConnectionPoolEntry` + instance. Compared to :class:`.PoolProxiedConnection`, which is the + short-term, per-checkout connection manager, this object lasts for the + lifespan of a particular "slot" within a connection pool. + + The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing + API code when it is delivered to connection pool event hooks, such as + :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`. + + .. versionadded:: 2.0 :class:`.ConnectionPoolEntry` provides the public + facing interface for the :class:`._ConnectionRecord` internal class. """ + __slots__ = () + @property - def driver_connection(self): - """The connection object as returned by the driver after a connect. + def in_use(self) -> bool: + """Return True the connection is currently checked out""" - For normal sync drivers that support the DBAPI protocol, this object - is the same as the one referenced by - :attr:`._ConnectionRecord.dbapi_connection`. + raise NotImplementedError() - For adapted drivers, like the Asyncio ones, this is the actual object - that was returned by the driver ``connect`` call. + def close(self) -> None: + """Close the DBAPI connection managed by this connection pool entry.""" + raise NotImplementedError() - As :attr:`._ConnectionRecord.dbapi_connection` it may be ``None`` - if this :class:`._ConnectionRecord` has been marked as invalidated. - .. versionadded:: 1.4.24 +class _ConnectionRecord(ConnectionPoolEntry): - """ + """Maintains a position in a connection pool which references a pooled + connection. + This is an internal object used by the :class:`_pool.Pool` implementation + to provide context management to a DBAPI connection maintained by + that :class:`_pool.Pool`. The public facing interface for this class + is described by the :class:`.ConnectionPoolEntry` class. See that + class for public API details. + + .. seealso:: + + :class:`.ConnectionPoolEntry` + + :class:`.PoolProxiedConnection` + + """ + + __slots__ = ( + "__pool", + "fairy_ref", + "finalize_callback", + "fresh", + "starttime", + "dbapi_connection", + "__weakref__", + "__dict__", + ) + + finalize_callback: Deque[Callable[[DBAPIConnection], None]] + fresh: bool + fairy_ref: Optional[weakref.ref[_ConnectionFairy]] + starttime: float + + def __init__(self, pool: Pool, connect: bool = True): + self.fresh = False + self.fairy_ref = None + self.starttime = 0 + self.dbapi_connection = None + + self.__pool = pool + if connect: + self.__connect() + self.finalize_callback = deque() + + dbapi_connection: Optional[DBAPIConnection] + + @property + def driver_connection(self) -> Optional[Any]: if self.dbapi_connection is None: return None else: @@ -428,72 +627,41 @@ class _ConnectionRecord: ) @property - def connection(self): - """An alias to :attr:`._ConnectionRecord.dbapi_connection`. - - This alias is deprecated, please use the new name. - - .. deprecated:: 1.4.24 - - """ + def connection(self) -> Optional[DBAPIConnection]: return self.dbapi_connection @connection.setter - def connection(self, value): + def connection(self, value: DBAPIConnection) -> None: self.dbapi_connection = value - _soft_invalidate_time = 0 + _soft_invalidate_time: float = 0 @util.memoized_property - def info(self): - """The ``.info`` dictionary associated with the DBAPI connection. - - This dictionary is shared among the :attr:`._ConnectionFairy.info` - and :attr:`_engine.Connection.info` accessors. - - .. note:: - - The lifespan of this dictionary is linked to the - DBAPI connection itself, meaning that it is **discarded** each time - the DBAPI connection is closed and/or invalidated. The - :attr:`._ConnectionRecord.record_info` dictionary remains - persistent throughout the lifespan of the - :class:`._ConnectionRecord` container. - - """ + def info(self) -> Dict[str, Any]: return {} @util.memoized_property - def record_info(self): - """An "info' dictionary associated with the connection record - itself. - - Unlike the :attr:`._ConnectionRecord.info` dictionary, which is linked - to the lifespan of the DBAPI connection, this dictionary is linked - to the lifespan of the :class:`._ConnectionRecord` container itself - and will remain persistent throughout the life of the - :class:`._ConnectionRecord`. - - .. versionadded:: 1.1 - - """ + def record_info(self) -> Optional[Dict[str, Any]]: return {} @classmethod - def checkout(cls, pool): - rec = pool._do_get() + def checkout(cls, pool: Pool) -> _ConnectionFairy: + rec = cast(_ConnectionRecord, pool._do_get()) try: dbapi_connection = rec.get_connection() except Exception as err: with util.safe_reraise(): rec._checkin_failed(err, _fairy_was_created=False) + raise + echo = pool._should_log_debug() - fairy = _ConnectionFairy(dbapi_connection, rec, echo) + fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo) rec.fairy_ref = ref = weakref.ref( fairy, - lambda ref: _finalize_fairy - and _finalize_fairy(None, rec, pool, ref, echo, True), + lambda ref: _finalize_fairy(None, rec, pool, ref, echo, True) + if _finalize_fairy + else None, ) _strong_ref_connection_records[ref] = rec if echo: @@ -502,13 +670,15 @@ class _ConnectionRecord: ) return fairy - def _checkin_failed(self, err, _fairy_was_created=True): + def _checkin_failed( + self, err: Exception, _fairy_was_created: bool = True + ) -> None: self.invalidate(e=err) self.checkin( _fairy_was_created=_fairy_was_created, ) - def checkin(self, _fairy_was_created=True): + def checkin(self, _fairy_was_created: bool = True) -> None: if self.fairy_ref is None and _fairy_was_created: # _fairy_was_created is False for the initial get connection phase; # meaning there was no _ConnectionFairy and we must unconditionally @@ -524,47 +694,28 @@ class _ConnectionRecord: pool = self.__pool while self.finalize_callback: finalizer = self.finalize_callback.pop() - finalizer(connection) + if connection is not None: + finalizer(connection) if pool.dispatch.checkin: pool.dispatch.checkin(connection, self) pool._return_conn(self) @property - def in_use(self): + def in_use(self) -> bool: return self.fairy_ref is not None @property - def last_connect_time(self): + def last_connect_time(self) -> float: return self.starttime - def close(self): + def close(self) -> None: if self.dbapi_connection is not None: self.__close() - def invalidate(self, e=None, soft=False): - """Invalidate the DBAPI connection held by this - :class:`._ConnectionRecord`. - - This method is called for all connection invalidations, including - when the :meth:`._ConnectionFairy.invalidate` or - :meth:`_engine.Connection.invalidate` methods are called, - as well as when any - so-called "automatic invalidation" condition occurs. - - :param e: an exception object indicating a reason for the - invalidation. - - :param soft: if True, the connection isn't closed; instead, this - connection will be recycled on next checkout. - - .. versionadded:: 1.0.3 - - .. seealso:: - - :ref:`pool_connection_invalidation` - - """ + def invalidate( + self, e: Optional[BaseException] = None, soft: bool = False + ) -> None: # already invalidated if self.dbapi_connection is None: return @@ -595,7 +746,7 @@ class _ConnectionRecord: self.__close() self.dbapi_connection = None - def get_connection(self): + def get_connection(self) -> DBAPIConnection: recycle = False # NOTE: the various comparisons here are assuming that measurable time @@ -610,8 +761,9 @@ class _ConnectionRecord: # within 16 milliseconds accuracy, so unit tests for connection # invalidation need a sleep of at least this long between initial start # time and invalidation for the logic below to work reliably. + if self.dbapi_connection is None: - self.info.clear() + self.info.clear() # type: ignore # our info is always present self.__connect() elif ( self.__pool._recycle > -1 @@ -639,26 +791,29 @@ class _ConnectionRecord: if recycle: self.__close() - self.info.clear() + self.info.clear() # type: ignore # our info is always present self.__connect() + + assert self.dbapi_connection is not None return self.dbapi_connection - def _is_hard_or_soft_invalidated(self): + def _is_hard_or_soft_invalidated(self) -> bool: return ( self.dbapi_connection is None or self.__pool._invalidate_time > self.starttime or (self._soft_invalidate_time > self.starttime) ) - def __close(self): + def __close(self) -> None: self.finalize_callback.clear() if self.__pool.dispatch.close: self.__pool.dispatch.close(self.dbapi_connection, self) + assert self.dbapi_connection is not None self.__pool._close_connection(self.dbapi_connection) self.dbapi_connection = None - def __connect(self): + def __connect(self) -> None: pool = self.__pool # ensure any existing connection is removed, so that if @@ -688,14 +843,16 @@ class _ConnectionRecord: def _finalize_fairy( - dbapi_connection, - connection_record, - pool, - ref, # this is None when called directly, not by the gc - echo, - reset=True, - fairy=None, -): + dbapi_connection: Optional[DBAPIConnection], + connection_record: Optional[_ConnectionRecord], + pool: Pool, + ref: Optional[ + weakref.ref[_ConnectionFairy] + ], # this is None when called directly, not by the gc + echo: Optional[log._EchoFlagType], + reset: bool = True, + fairy: Optional[_ConnectionFairy] = None, +) -> None: """Cleanup for a :class:`._ConnectionFairy` whether or not it's already been garbage collected. @@ -705,12 +862,16 @@ def _finalize_fairy( will only log a message and raise a warning. """ - if ref: + is_gc_cleanup = ref is not None + + if is_gc_cleanup: + assert ref is not None _strong_ref_connection_records.pop(ref, None) elif fairy: _strong_ref_connection_records.pop(weakref.ref(fairy), None) - if ref is not None: + if is_gc_cleanup: + assert connection_record is not None if connection_record.fairy_ref is not ref: return assert dbapi_connection is None @@ -720,10 +881,10 @@ def _finalize_fairy( dont_restore_gced = pool._dialect.is_async if dont_restore_gced: - detach = not connection_record or ref - can_manipulate_connection = not ref + detach = connection_record is None or is_gc_cleanup + can_manipulate_connection = ref is None else: - detach = not connection_record + detach = connection_record is None can_manipulate_connection = True if dbapi_connection is not None: @@ -737,11 +898,14 @@ def _finalize_fairy( ) try: - fairy = fairy or _ConnectionFairy( - dbapi_connection, - connection_record, - echo, - ) + if not fairy: + assert connection_record is not None + fairy = _ConnectionFairy( + pool, + dbapi_connection, + connection_record, + echo, + ) assert fairy.dbapi_connection is dbapi_connection if reset and can_manipulate_connection: fairy._reset(pool) @@ -786,6 +950,7 @@ def _finalize_fairy( # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True] # which actually started failing when pytest warnings plugin was # turned on, due to util.warn() above + fairy.dbapi_connection = fairy._connection_record = None # type: ignore del dbapi_connection del connection_record del fairy @@ -795,53 +960,36 @@ def _finalize_fairy( # GC under pypy will call ConnectionFairy finalizers. linked directly to the # weakref that will empty itself when collected so that it should not create # any unmanaged memory references. -_strong_ref_connection_records = {} +_strong_ref_connection_records: Dict[ + weakref.ref[_ConnectionFairy], _ConnectionRecord +] = {} -class PoolProxiedConnection: - """interface for the wrapper connection that is used by the connection - pool. +class PoolProxiedConnection(ManagesConnection): + """A connection-like adapter for a :pep:`249` DBAPI connection, which + includes additional methods specific to the :class:`.Pool` implementation. - :class:`.PoolProxiedConnection` is basically the public-facing interface - for the :class:`._ConnectionFairy` implementation object, users familiar - with :class:`._ConnectionFairy` can consider this object to be - equivalent. + :class:`.PoolProxiedConnection` is the public-facing interface for the + internal :class:`._ConnectionFairy` implementation object; users familiar + with :class:`._ConnectionFairy` can consider this object to be equivalent. - .. versionadded:: 2.0 + .. versionadded:: 2.0 :class:`.PoolProxiedConnection` provides the public- + facing interface for the :class:`._ConnectionFairy` internal class. """ __slots__ = () - @util.memoized_property - def dbapi_connection(self) -> "DBAPIConnection": - """A reference to the actual DBAPI connection being tracked. + if typing.TYPE_CHECKING: - .. seealso:: + def commit(self) -> None: + ... - :attr:`.PoolProxiedConnection.driver_connection` + def cursor(self) -> DBAPICursor: + ... - :attr:`.PoolProxiedConnection.dbapi_connection` - - :ref:`faq_dbapi_connection` - - """ - raise NotImplementedError() - - @property - def driver_connection(self) -> Any: - """The connection object as returned by the driver after a connect. - - .. seealso:: - - :attr:`.PoolProxiedConnection.dbapi_connection` - - :attr:`._ConnectionRecord.driver_connection` - - :ref:`faq_dbapi_connection` - - """ - raise NotImplementedError() + def rollback(self) -> None: + ... @property def is_valid(self) -> bool: @@ -850,62 +998,11 @@ class PoolProxiedConnection: raise NotImplementedError() - @util.memoized_property - def info(self) -> Dict[str, Any]: - """Info dictionary associated with the underlying DBAPI connection - referred to by this :class:`.ConnectionFairy`, allowing user-defined - data to be associated with the connection. - - The data here will follow along with the DBAPI connection including - after it is returned to the connection pool and used again - in subsequent instances of :class:`._ConnectionFairy`. It is shared - with the :attr:`._ConnectionRecord.info` and - :attr:`_engine.Connection.info` - accessors. - - The dictionary associated with a particular DBAPI connection is - discarded when the connection itself is discarded. - - """ - - raise NotImplementedError() - @property - def record_info(self) -> Dict[str, Any]: - """Info dictionary associated with the :class:`._ConnectionRecord - container referred to by this :class:`.PoolProxiedConnection`. - - Unlike the :attr:`.PoolProxiedConnection.info` dictionary, the lifespan - of this dictionary is persistent across connections that are - disconnected and/or invalidated within the lifespan of a - :class:`._ConnectionRecord`. - - """ - - raise NotImplementedError() + def is_detached(self) -> bool: + """Return True if this :class:`.PoolProxiedConnection` is detached + from its pool.""" - def invalidate( - self, e: Optional[Exception] = None, soft: bool = False - ) -> None: - """Mark this connection as invalidated. - - This method can be called directly, and is also called as a result - of the :meth:`_engine.Connection.invalidate` method. When invoked, - the DBAPI connection is immediately closed and discarded from - further use by the pool. The invalidation mechanism proceeds - via the :meth:`._ConnectionRecord.invalidate` internal method. - - :param e: an exception object indicating a reason for the invalidation. - - :param soft: if True, the connection isn't closed; instead, this - connection will be recycled on next checkout. - - .. seealso:: - - :ref:`pool_connection_invalidation` - - - """ raise NotImplementedError() def detach(self) -> None: @@ -913,8 +1010,8 @@ class PoolProxiedConnection: This means that the connection will no longer be returned to the pool when closed, and will instead be literally closed. The - containing ConnectionRecord is separated from the DB-API connection, - and will create a new connection when next used. + associated :class:`.ConnectionPoolEntry` is de-associated from this + DBAPI connection. Note that any overall connection limiting constraints imposed by a Pool implementation may be violated after a detach, as the detached @@ -953,43 +1050,37 @@ class _AdhocProxiedConnection(PoolProxiedConnection): __slots__ = ("dbapi_connection", "_connection_record") - def __init__(self, dbapi_connection, connection_record): + dbapi_connection: DBAPIConnection + _connection_record: ConnectionPoolEntry + + def __init__( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ): self.dbapi_connection = dbapi_connection self._connection_record = connection_record @property - def driver_connection(self): + def driver_connection(self) -> Any: return self._connection_record.driver_connection @property - def connection(self): - """An alias to :attr:`._ConnectionFairy.dbapi_connection`. - - This alias is deprecated, please use the new name. - - .. deprecated:: 1.4.24 - - """ - return self._dbapi_connection + def connection(self) -> DBAPIConnection: + return self.dbapi_connection @property - def is_valid(self): + def is_valid(self) -> bool: raise AttributeError("is_valid not implemented by this proxy") - @property - def record_info(self): + @util.dynamic_property + def record_info(self) -> Optional[Dict[str, Any]]: return self._connection_record.record_info - def cursor(self, *args, **kwargs): - """Return a new DBAPI cursor for the underlying connection. - - This method is a proxy for the ``connection.cursor()`` DBAPI - method. - - """ + def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: return self.dbapi_connection.cursor(*args, **kwargs) - def __getattr__(self, key): + def __getattr__(self, key: Any) -> Any: return getattr(self.dbapi_connection, key) @@ -1001,7 +1092,8 @@ class _ConnectionFairy(PoolProxiedConnection): This is an internal object used by the :class:`_pool.Pool` implementation to provide context management to a DBAPI connection delivered by that :class:`_pool.Pool`. The public facing interface for this class - is described by the :class:`.PoolProxiedConnection` class. + is described by the :class:`.PoolProxiedConnection` class. See that + class for public API details. The name "fairy" is inspired by the fact that the :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts @@ -1011,68 +1103,76 @@ class _ConnectionFairy(PoolProxiedConnection): .. seealso:: - :class:`._ConnectionRecord` - - """ + :class:`.PoolProxiedConnection` - def __init__(self, dbapi_connection, connection_record, echo): - self.dbapi_connection = dbapi_connection - self._connection_record = connection_record - self._echo = echo + :class:`.ConnectionPoolEntry` - _connection_record = None - """A reference to the :class:`._ConnectionRecord` object associated - with the DBAPI connection. - - This is currently an internal accessor which is subject to change. """ - @property - def driver_connection(self): - """The connection object as returned by the driver after a connect. + __slots__ = ( + "dbapi_connection", + "_connection_record", + "_echo", + "_pool", + "_counter", + "__weakref__", + "__dict__", + ) - .. versionadded:: 1.4.24 - - .. seealso:: + pool: Pool + dbapi_connection: DBAPIConnection + _echo: log._EchoFlagType - :attr:`._ConnectionFairy.dbapi_connection` - - :attr:`._ConnectionRecord.driver_connection` + def __init__( + self, + pool: Pool, + dbapi_connection: DBAPIConnection, + connection_record: _ConnectionRecord, + echo: log._EchoFlagType, + ): + self._pool = pool + self._counter = 0 + self.dbapi_connection = dbapi_connection + self._connection_record = connection_record + self._echo = echo - :ref:`faq_dbapi_connection` + _connection_record: Optional[_ConnectionRecord] - """ + @property + def driver_connection(self) -> Optional[Any]: + if self._connection_record is None: + return None return self._connection_record.driver_connection @property - def connection(self): - """An alias to :attr:`._ConnectionFairy.dbapi_connection`. - - This alias is deprecated, please use the new name. - - .. deprecated:: 1.4.24 - - """ + def connection(self) -> DBAPIConnection: return self.dbapi_connection @connection.setter - def connection(self, value): + def connection(self, value: DBAPIConnection) -> None: self.dbapi_connection = value @classmethod - def _checkout(cls, pool, threadconns=None, fairy=None): + def _checkout( + cls, + pool: Pool, + threadconns: Optional[threading.local] = None, + fairy: Optional[_ConnectionFairy] = None, + ) -> _ConnectionFairy: if not fairy: fairy = _ConnectionRecord.checkout(pool) - fairy._pool = pool - fairy._counter = 0 - if threadconns is not None: threadconns.current = weakref.ref(fairy) - if fairy.dbapi_connection is None: - raise exc.InvalidRequestError("This connection is closed") + assert ( + fairy._connection_record is not None + ), "can't 'checkout' a detached connection fairy" + assert ( + fairy.dbapi_connection is not None + ), "can't 'checkout' an invalidated connection fairy" + fairy._counter += 1 if ( not pool.dispatch.checkout and not pool._pre_ping @@ -1084,6 +1184,7 @@ class _ConnectionFairy(PoolProxiedConnection): # there are three attempts made here, but note that if the database # is not accessible from a connection standpoint, those won't proceed # here. + attempts = 2 while attempts > 0: connection_is_fresh = fairy._connection_record.fresh @@ -1160,10 +1261,10 @@ class _ConnectionFairy(PoolProxiedConnection): fairy.invalidate() raise exc.InvalidRequestError("This connection is closed") - def _checkout_existing(self): + def _checkout_existing(self) -> _ConnectionFairy: return _ConnectionFairy._checkout(self._pool, fairy=self) - def _checkin(self, reset=True): + def _checkin(self, reset: bool = True) -> None: _finalize_fairy( self.dbapi_connection, self._connection_record, @@ -1173,14 +1274,13 @@ class _ConnectionFairy(PoolProxiedConnection): reset=reset, fairy=self, ) - self.dbapi_connection = None - self._connection_record = None - _close = _checkin + def _close(self) -> None: + self._checkin() - def _reset(self, pool): + def _reset(self, pool: Pool) -> None: if pool.dispatch.reset: - pool.dispatch.reset(self, self._connection_record) + pool.dispatch.reset(self.dbapi_connection, self._connection_record) if pool._reset_on_return is reset_rollback: if self._echo: pool.logger.debug( @@ -1196,50 +1296,34 @@ class _ConnectionFairy(PoolProxiedConnection): pool._dialect.do_commit(self) @property - def _logger(self): + def _logger(self) -> log._IdentifiedLoggerType: return self._pool.logger @property - def is_valid(self): - """Return True if this :class:`._ConnectionFairy` still refers - to an active DBAPI connection.""" - + def is_valid(self) -> bool: return self.dbapi_connection is not None - @util.memoized_property - def info(self): - """Info dictionary associated with the underlying DBAPI connection - referred to by this :class:`.ConnectionFairy`, allowing user-defined - data to be associated with the connection. - - See :attr:`.PoolProxiedConnection.info` for full description. - - """ - return self._connection_record.info - @property - def record_info(self): - """Info dictionary associated with the :class:`._ConnectionRecord - container referred to by this :class:`.ConnectionFairy`. + def is_detached(self) -> bool: + return self._connection_record is not None - See :attr:`.PoolProxiedConnection.record_info` for full description. - - """ - if self._connection_record: - return self._connection_record.record_info + @util.memoized_property + def info(self) -> Dict[str, Any]: + if self._connection_record is None: + return {} else: - return None - - def invalidate(self, e=None, soft=False): - """Mark this connection as invalidated. - - See :attr:`.PoolProxiedConnection.invalidate` for full description. - - .. seealso:: + return self._connection_record.info - :ref:`pool_connection_invalidation` + @util.dynamic_property + def record_info(self) -> Optional[Dict[str, Any]]: + if self._connection_record is None: + return None + else: + return self._connection_record.record_info - """ + def invalidate( + self, e: Optional[BaseException] = None, soft: bool = False + ) -> None: if self.dbapi_connection is None: util.warn("Can't invalidate an already-closed connection.") @@ -1247,51 +1331,43 @@ class _ConnectionFairy(PoolProxiedConnection): if self._connection_record: self._connection_record.invalidate(e=e, soft=soft) if not soft: - self.dbapi_connection = None - self._checkin() - - def cursor(self, *args, **kwargs): - """Return a new DBAPI cursor for the underlying connection. + # prevent any rollback / reset actions etc. on + # the connection + self.dbapi_connection = None # type: ignore - This method is a proxy for the ``connection.cursor()`` DBAPI - method. + # finalize + self._checkin() - """ + def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: + assert self.dbapi_connection is not None return self.dbapi_connection.cursor(*args, **kwargs) - def __getattr__(self, key): + def __getattr__(self, key: str) -> Any: return getattr(self.dbapi_connection, key) - def detach(self): - """Separate this connection from its Pool. - - See :meth:`.PoolProxiedConnection.detach` for full description. - - """ - + def detach(self) -> None: if self._connection_record is not None: rec = self._connection_record rec.fairy_ref = None rec.dbapi_connection = None # TODO: should this be _return_conn? self._pool._do_return_conn(self._connection_record) - self.info = self.info.copy() + + # can't get the descriptor assignment to work here + # in pylance. mypy is OK w/ it + self.info = self.info.copy() # type: ignore + self._connection_record = None if self._pool.dispatch.detach: self._pool.dispatch.detach(self.dbapi_connection, rec) - def close(self): - """Release this connection back to the pool. - - See :meth:`.PoolProxiedConnection.close` for full description. - - """ + def close(self) -> None: self._counter -= 1 if self._counter == 0: self._checkin() - def _close_no_reset(self): + def _close_no_reset(self) -> None: self._counter -= 1 if self._counter == 0: self._checkin(reset=False) diff --git a/lib/sqlalchemy/pool/events.py b/lib/sqlalchemy/pool/events.py index e53d614b0..d0d89291b 100644 --- a/lib/sqlalchemy/pool/events.py +++ b/lib/sqlalchemy/pool/events.py @@ -4,13 +4,26 @@ # # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php +from __future__ import annotations +import typing +from typing import Any +from typing import Optional +from typing import Type +from typing import Union + +from .base import ConnectionPoolEntry from .base import Pool +from .base import PoolProxiedConnection from .. import event from .. import util +if typing.TYPE_CHECKING: + from ..engine import Engine + from ..engine.interfaces import DBAPIConnection + -class PoolEvents(event.Events): +class PoolEvents(event.Events[Pool]): """Available events for :class:`_pool.Pool`. The methods here define the name of an event as well @@ -37,35 +50,48 @@ class PoolEvents(event.Events): # will associate with engine.pool event.listen(engine, 'checkout', my_on_checkout) - """ # noqa + """ # noqa E501 _target_class_doc = "SomeEngineOrPool" _dispatch_target = Pool @util.preload_module("sqlalchemy.engine") @classmethod - def _accept_with(cls, target): - Engine = util.preloaded.engine.Engine + def _accept_with( + cls, target: Union[Pool, Type[Pool], Engine, Type[Engine]] + ) -> Union[Pool, Type[Pool]]: + if not typing.TYPE_CHECKING: + Engine = util.preloaded.engine.Engine if isinstance(target, type): if issubclass(target, Engine): return Pool - elif issubclass(target, Pool): + else: + assert issubclass(target, Pool) return target elif isinstance(target, Engine): return target.pool else: + assert isinstance(target, Pool) return target @classmethod - def _listen(cls, event_key, **kw): + def _listen( # type: ignore[override] # would rather keep **kw + cls, + event_key: event._EventKey[Pool], + **kw: Any, + ) -> None: target = event_key.dispatch_target kw.setdefault("asyncio", target._is_asyncio) event_key.base_listen(**kw) - def connect(self, dbapi_connection, connection_record): + def connect( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ) -> None: """Called at the moment a particular DBAPI connection is first created for a given :class:`_pool.Pool`. @@ -74,14 +100,18 @@ class PoolEvents(event.Events): to produce a new DBAPI connection. :param dbapi_connection: a DBAPI connection. - The :attr:`._ConnectionRecord.dbapi_connection` attribute. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. - :param connection_record: the :class:`._ConnectionRecord` managing the - DBAPI connection. + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. """ - def first_connect(self, dbapi_connection, connection_record): + def first_connect( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ) -> None: """Called exactly once for the first time a DBAPI connection is checked out from a particular :class:`_pool.Pool`. @@ -99,24 +129,29 @@ class PoolEvents(event.Events): encoding settings, collation settings, and many others. :param dbapi_connection: a DBAPI connection. - The :attr:`._ConnectionRecord.dbapi_connection` attribute. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. - :param connection_record: the :class:`._ConnectionRecord` managing the - DBAPI connection. + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. """ - def checkout(self, dbapi_connection, connection_record, connection_proxy): + def checkout( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + connection_proxy: PoolProxiedConnection, + ) -> None: """Called when a connection is retrieved from the Pool. :param dbapi_connection: a DBAPI connection. - The :attr:`._ConnectionRecord.dbapi_connection` attribute. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. - :param connection_record: the :class:`._ConnectionRecord` managing the - DBAPI connection. + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. - :param connection_proxy: the :class:`._ConnectionFairy` object which - will proxy the public interface of the DBAPI connection for the + :param connection_proxy: the :class:`.PoolProxiedConnection` object + which will proxy the public interface of the DBAPI connection for the lifespan of the checkout. If you raise a :class:`~sqlalchemy.exc.DisconnectionError`, the current @@ -130,7 +165,11 @@ class PoolEvents(event.Events): """ - def checkin(self, dbapi_connection, connection_record): + def checkin( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ) -> None: """Called when a connection returns to the pool. Note that the connection may be closed, and may be None if the @@ -138,14 +177,18 @@ class PoolEvents(event.Events): for detached connections. (They do not return to the pool.) :param dbapi_connection: a DBAPI connection. - The :attr:`._ConnectionRecord.dbapi_connection` attribute. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. - :param connection_record: the :class:`._ConnectionRecord` managing the - DBAPI connection. + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. """ - def reset(self, dbapi_connection, connection_record): + def reset( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ) -> None: """Called before the "reset" action occurs for a pooled connection. This event represents @@ -160,10 +203,10 @@ class PoolEvents(event.Events): cases where the connection is discarded immediately after reset. :param dbapi_connection: a DBAPI connection. - The :attr:`._ConnectionRecord.dbapi_connection` attribute. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. - :param connection_record: the :class:`._ConnectionRecord` managing the - DBAPI connection. + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. .. seealso:: @@ -173,21 +216,26 @@ class PoolEvents(event.Events): """ - def invalidate(self, dbapi_connection, connection_record, exception): + def invalidate( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + exception: Optional[BaseException], + ) -> None: """Called when a DBAPI connection is to be "invalidated". - This event is called any time the :meth:`._ConnectionRecord.invalidate` - method is invoked, either from API usage or via "auto-invalidation", - without the ``soft`` flag. + This event is called any time the + :meth:`.ConnectionPoolEntry.invalidate` method is invoked, either from + API usage or via "auto-invalidation", without the ``soft`` flag. The event occurs before a final attempt to call ``.close()`` on the connection occurs. :param dbapi_connection: a DBAPI connection. - The :attr:`._ConnectionRecord.dbapi_connection` attribute. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. - :param connection_record: the :class:`._ConnectionRecord` managing the - DBAPI connection. + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. :param exception: the exception object corresponding to the reason for this invalidation, if any. May be ``None``. @@ -201,10 +249,16 @@ class PoolEvents(event.Events): """ - def soft_invalidate(self, dbapi_connection, connection_record, exception): + def soft_invalidate( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + exception: Optional[BaseException], + ) -> None: """Called when a DBAPI connection is to be "soft invalidated". - This event is called any time the :meth:`._ConnectionRecord.invalidate` + This event is called any time the + :meth:`.ConnectionPoolEntry.invalidate` method is invoked with the ``soft`` flag. Soft invalidation refers to when the connection record that tracks @@ -215,17 +269,21 @@ class PoolEvents(event.Events): .. versionadded:: 1.0.3 :param dbapi_connection: a DBAPI connection. - The :attr:`._ConnectionRecord.dbapi_connection` attribute. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. - :param connection_record: the :class:`._ConnectionRecord` managing the - DBAPI connection. + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. :param exception: the exception object corresponding to the reason for this invalidation, if any. May be ``None``. """ - def close(self, dbapi_connection, connection_record): + def close( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ) -> None: """Called when a DBAPI connection is closed. The event is emitted before the close occurs. @@ -241,14 +299,18 @@ class PoolEvents(event.Events): .. versionadded:: 1.1 :param dbapi_connection: a DBAPI connection. - The :attr:`._ConnectionRecord.dbapi_connection` attribute. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. - :param connection_record: the :class:`._ConnectionRecord` managing the - DBAPI connection. + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. """ - def detach(self, dbapi_connection, connection_record): + def detach( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ) -> None: """Called when a DBAPI connection is "detached" from a pool. This event is emitted after the detach occurs. The connection @@ -257,14 +319,14 @@ class PoolEvents(event.Events): .. versionadded:: 1.1 :param dbapi_connection: a DBAPI connection. - The :attr:`._ConnectionRecord.dbapi_connection` attribute. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. - :param connection_record: the :class:`._ConnectionRecord` managing the - DBAPI connection. + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. """ - def close_detached(self, dbapi_connection): + def close_detached(self, dbapi_connection: DBAPIConnection) -> None: """Called when a detached DBAPI connection is closed. The event is emitted before the close occurs. @@ -276,6 +338,6 @@ class PoolEvents(event.Events): .. versionadded:: 1.1 :param dbapi_connection: a DBAPI connection. - The :attr:`._ConnectionRecord.dbapi_connection` attribute. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. """ diff --git a/lib/sqlalchemy/pool/impl.py b/lib/sqlalchemy/pool/impl.py index 7a422cd2a..d1be3f541 100644 --- a/lib/sqlalchemy/pool/impl.py +++ b/lib/sqlalchemy/pool/impl.py @@ -9,19 +9,36 @@ """Pool implementation classes. """ +from __future__ import annotations import threading import traceback +import typing +from typing import Any +from typing import cast +from typing import List +from typing import Optional +from typing import Set +from typing import Type +from typing import Union import weakref from .base import _AsyncConnDialect from .base import _ConnectionFairy from .base import _ConnectionRecord +from .base import _CreatorFnType +from .base import _CreatorWRecFnType +from .base import ConnectionPoolEntry from .base import Pool +from .base import PoolProxiedConnection from .. import exc from .. import util from ..util import chop_traceback from ..util import queue as sqla_queue +from ..util.typing import Literal + +if typing.TYPE_CHECKING: + from ..engine.interfaces import DBAPIConnection class QueuePool(Pool): @@ -34,17 +51,22 @@ class QueuePool(Pool): """ - _is_asyncio = False - _queue_class = sqla_queue.Queue + _is_asyncio = False # type: ignore[assignment] + + _queue_class: Type[ + sqla_queue.QueueCommon[ConnectionPoolEntry] + ] = sqla_queue.Queue + + _pool: sqla_queue.QueueCommon[ConnectionPoolEntry] def __init__( self, - creator, - pool_size=5, - max_overflow=10, - timeout=30.0, - use_lifo=False, - **kw, + creator: Union[_CreatorFnType, _CreatorWRecFnType], + pool_size: int = 5, + max_overflow: int = 10, + timeout: float = 30.0, + use_lifo: bool = False, + **kw: Any, ): r""" Construct a QueuePool. @@ -107,20 +129,20 @@ class QueuePool(Pool): self._timeout = timeout self._overflow_lock = threading.Lock() - def _do_return_conn(self, conn): + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: try: - self._pool.put(conn, False) + self._pool.put(record, False) except sqla_queue.Full: try: - conn.close() + record.close() finally: self._dec_overflow() - def _do_get(self): + def _do_get(self) -> ConnectionPoolEntry: use_overflow = self._max_overflow > -1 + wait = use_overflow and self._overflow >= self._max_overflow try: - wait = use_overflow and self._overflow >= self._max_overflow return self._pool.get(wait, self._timeout) except sqla_queue.Empty: # don't do things inside of "except Empty", because when we say @@ -144,10 +166,11 @@ class QueuePool(Pool): except: with util.safe_reraise(): self._dec_overflow() + raise else: return self._do_get() - def _inc_overflow(self): + def _inc_overflow(self) -> bool: if self._max_overflow == -1: self._overflow += 1 return True @@ -158,7 +181,7 @@ class QueuePool(Pool): else: return False - def _dec_overflow(self): + def _dec_overflow(self) -> Literal[True]: if self._max_overflow == -1: self._overflow -= 1 return True @@ -166,7 +189,7 @@ class QueuePool(Pool): self._overflow -= 1 return True - def recreate(self): + def recreate(self) -> QueuePool: self.logger.info("Pool recreating") return self.__class__( self._creator, @@ -183,7 +206,7 @@ class QueuePool(Pool): dialect=self._dialect, ) - def dispose(self): + def dispose(self) -> None: while True: try: conn = self._pool.get(False) @@ -194,7 +217,7 @@ class QueuePool(Pool): self._overflow = 0 - self.size() self.logger.info("Pool disposed. %s", self.status()) - def status(self): + def status(self) -> str: return ( "Pool size: %d Connections in pool: %d " "Current Overflow: %d Current Checked out " @@ -207,25 +230,28 @@ class QueuePool(Pool): ) ) - def size(self): + def size(self) -> int: return self._pool.maxsize - def timeout(self): + def timeout(self) -> float: return self._timeout - def checkedin(self): + def checkedin(self) -> int: return self._pool.qsize() - def overflow(self): + def overflow(self) -> int: return self._overflow - def checkedout(self): + def checkedout(self) -> int: return self._pool.maxsize - self._pool.qsize() + self._overflow class AsyncAdaptedQueuePool(QueuePool): - _is_asyncio = True - _queue_class = sqla_queue.AsyncAdaptedQueue + _is_asyncio = True # type: ignore[assignment] + _queue_class: Type[ + sqla_queue.QueueCommon[ConnectionPoolEntry] + ] = sqla_queue.AsyncAdaptedQueue + _dialect = _AsyncConnDialect() @@ -246,16 +272,16 @@ class NullPool(Pool): """ - def status(self): + def status(self) -> str: return "NullPool" - def _do_return_conn(self, conn): - conn.close() + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: + record.close() - def _do_get(self): + def _do_get(self) -> ConnectionPoolEntry: return self._create_connection() - def recreate(self): + def recreate(self) -> NullPool: self.logger.info("Pool recreating") return self.__class__( @@ -269,7 +295,7 @@ class NullPool(Pool): dialect=self._dialect, ) - def dispose(self): + def dispose(self) -> None: pass @@ -304,16 +330,21 @@ class SingletonThreadPool(Pool): """ - _is_asyncio = False + _is_asyncio = False # type: ignore[assignment] - def __init__(self, creator, pool_size=5, **kw): + def __init__( + self, + creator: Union[_CreatorFnType, _CreatorWRecFnType], + pool_size: int = 5, + **kw: Any, + ): Pool.__init__(self, creator, **kw) self._conn = threading.local() self._fairy = threading.local() - self._all_conns = set() + self._all_conns: Set[ConnectionPoolEntry] = set() self.size = pool_size - def recreate(self): + def recreate(self) -> SingletonThreadPool: self.logger.info("Pool recreating") return self.__class__( self._creator, @@ -327,7 +358,7 @@ class SingletonThreadPool(Pool): dialect=self._dialect, ) - def dispose(self): + def dispose(self) -> None: """Dispose of this pool.""" for conn in self._all_conns: @@ -340,23 +371,26 @@ class SingletonThreadPool(Pool): self._all_conns.clear() - def _cleanup(self): + def _cleanup(self) -> None: while len(self._all_conns) >= self.size: c = self._all_conns.pop() c.close() - def status(self): + def status(self) -> str: return "SingletonThreadPool id:%d size: %d" % ( id(self), len(self._all_conns), ) - def _do_return_conn(self, conn): - pass + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: + try: + del self._fairy.current # type: ignore + except AttributeError: + pass - def _do_get(self): + def _do_get(self) -> ConnectionPoolEntry: try: - c = self._conn.current() + c = cast(ConnectionPoolEntry, self._conn.current()) if c: return c except AttributeError: @@ -368,11 +402,11 @@ class SingletonThreadPool(Pool): self._all_conns.add(c) return c - def connect(self): + def connect(self) -> PoolProxiedConnection: # vendored from Pool to include the now removed use_threadlocal # behavior try: - rec = self._fairy.current() + rec = cast(_ConnectionFairy, self._fairy.current()) except AttributeError: pass else: @@ -381,13 +415,6 @@ class SingletonThreadPool(Pool): return _ConnectionFairy._checkout(self, self._fairy) - def _return_conn(self, record): - try: - del self._fairy.current - except AttributeError: - pass - self._do_return_conn(record) - class StaticPool(Pool): @@ -401,13 +428,13 @@ class StaticPool(Pool): """ @util.memoized_property - def connection(self): + def connection(self) -> _ConnectionRecord: return _ConnectionRecord(self) - def status(self): + def status(self) -> str: return "StaticPool" - def dispose(self): + def dispose(self) -> None: if ( "connection" in self.__dict__ and self.connection.dbapi_connection is not None @@ -415,7 +442,7 @@ class StaticPool(Pool): self.connection.close() del self.__dict__["connection"] - def recreate(self): + def recreate(self) -> StaticPool: self.logger.info("Pool recreating") return self.__class__( creator=self._creator, @@ -428,20 +455,23 @@ class StaticPool(Pool): dialect=self._dialect, ) - def _transfer_from(self, other_static_pool): + def _transfer_from(self, other_static_pool: StaticPool) -> None: # used by the test suite to make a new engine / pool without # losing the state of an existing SQLite :memory: connection - self._invoke_creator = ( - lambda crec: other_static_pool.connection.dbapi_connection - ) + def creator(rec: ConnectionPoolEntry) -> DBAPIConnection: + conn = other_static_pool.connection.dbapi_connection + assert conn is not None + return conn - def _create_connection(self): + self._invoke_creator = creator + + def _create_connection(self) -> ConnectionPoolEntry: raise NotImplementedError() - def _do_return_conn(self, conn): + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: pass - def _do_get(self): + def _do_get(self) -> ConnectionPoolEntry: rec = self.connection if rec._is_hard_or_soft_invalidated(): del self.__dict__["connection"] @@ -461,28 +491,31 @@ class AssertionPool(Pool): """ - def __init__(self, *args, **kw): + _conn: Optional[ConnectionPoolEntry] + _checkout_traceback: Optional[List[str]] + + def __init__(self, *args: Any, **kw: Any): self._conn = None self._checked_out = False self._store_traceback = kw.pop("store_traceback", True) self._checkout_traceback = None Pool.__init__(self, *args, **kw) - def status(self): + def status(self) -> str: return "AssertionPool" - def _do_return_conn(self, conn): + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: if not self._checked_out: raise AssertionError("connection is not checked out") self._checked_out = False - assert conn is self._conn + assert record is self._conn - def dispose(self): + def dispose(self) -> None: self._checked_out = False if self._conn: self._conn.close() - def recreate(self): + def recreate(self) -> AssertionPool: self.logger.info("Pool recreating") return self.__class__( self._creator, @@ -495,7 +528,7 @@ class AssertionPool(Pool): dialect=self._dialect, ) - def _do_get(self): + def _do_get(self) -> ConnectionPoolEntry: if self._checked_out: if self._checkout_traceback: suffix = " at:\n%s" % "".join( |
