diff options
Diffstat (limited to 'lib/sqlalchemy/pool.py')
-rw-r--r-- | lib/sqlalchemy/pool.py | 214 |
1 files changed, 115 insertions, 99 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 7150ce81f..d26bbf32c 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -67,7 +67,9 @@ reset_rollback = util.symbol('reset_rollback') reset_commit = util.symbol('reset_commit') reset_none = util.symbol('reset_none') + class _ConnDialect(object): + """partial implementation of :class:`.Dialect` which provides DBAPI connection methods. @@ -76,6 +78,7 @@ class _ConnDialect(object): :class:`.Dialect`. """ + def do_rollback(self, dbapi_connection): dbapi_connection.rollback() @@ -85,20 +88,22 @@ class _ConnDialect(object): def do_close(self, dbapi_connection): dbapi_connection.close() + class Pool(log.Identified): + """Abstract base class for connection pools.""" _dialect = _ConnDialect() def __init__(self, - creator, recycle=-1, echo=None, - use_threadlocal=False, - logging_name=None, - reset_on_return=True, - listeners=None, - events=None, - _dispatch=None, - _dialect=None): + creator, recycle=-1, echo=None, + use_threadlocal=False, + logging_name=None, + reset_on_return=True, + listeners=None, + events=None, + _dispatch=None, + _dialect=None): """ Construct a Pool. @@ -134,10 +139,10 @@ class Pool(log.Identified): .. warning:: The :paramref:`.Pool.use_threadlocal` flag **does not affect the behavior** of :meth:`.Engine.connect`. - :meth:`.Engine.connect` makes use of the :meth:`.Pool.unique_connection` - method which **does not use thread local context**. - To produce a :class:`.Connection` which refers to the - :meth:`.Pool.connect` method, use + :meth:`.Engine.connect` makes use of the + :meth:`.Pool.unique_connection` method which **does not use thread + local context**. To produce a :class:`.Connection` which refers + to the :meth:`.Pool.connect` method, use :meth:`.Engine.contextual_connect`. Note that other SQLAlchemy connectivity systems such as @@ -221,8 +226,8 @@ class Pool(log.Identified): self._reset_on_return = reset_commit else: raise exc.ArgumentError( - "Invalid value for 'reset_on_return': %r" - % reset_on_return) + "Invalid value for 'reset_on_return': %r" + % reset_on_return) self.echo = echo if _dispatch: @@ -234,8 +239,8 @@ class Pool(log.Identified): event.listen(self, target, fn) if listeners: util.warn_deprecated( - "The 'listeners' argument to Pool (and " - "create_engine()) is deprecated. Use event.listen().") + "The 'listeners' argument to Pool (and " + "create_engine()) is deprecated. Use event.listen().") for l in listeners: self.add_listener(l) @@ -247,7 +252,7 @@ class Pool(log.Identified): raise except: self.logger.error("Exception closing connection %r", - connection, exc_info=True) + connection, exc_info=True) @util.deprecated( 2.7, "Pool.add_listener is deprecated. Use event.listen()") @@ -267,8 +272,9 @@ class Pool(log.Identified): This method is equivalent to :meth:`.Pool.connect` when the :paramref:`.Pool.use_threadlocal` flag is not set to True. - When :paramref:`.Pool.use_threadlocal` is True, the :meth:`.Pool.unique_connection` - method provides a means of bypassing the threadlocal context. + When :paramref:`.Pool.use_threadlocal` is True, the + :meth:`.Pool.unique_connection` method provides a means of bypassing + the threadlocal context. """ return _ConnectionFairy._checkout(self) @@ -295,7 +301,6 @@ class Pool(log.Identified): if getattr(connection, 'is_valid', False): connection.invalidate(exception) - def recreate(self): """Return a new :class:`.Pool`, of the same class as this one and configured with identical creation arguments. @@ -371,6 +376,7 @@ class Pool(log.Identified): class _ConnectionRecord(object): + """Internal object which maintains an individual DBAPI connection referenced by a :class:`.Pool`. @@ -406,8 +412,8 @@ class _ConnectionRecord(object): self.finalize_callback = deque() pool.dispatch.first_connect.\ - for_modify(pool.dispatch).\ - exec_once(self.connection, self) + for_modify(pool.dispatch).\ + exec_once(self.connection, self) pool.dispatch.connect(self.connection, self) connection = None @@ -439,16 +445,16 @@ class _ConnectionRecord(object): raise fairy = _ConnectionFairy(dbapi_connection, rec) rec.fairy_ref = weakref.ref( - fairy, - lambda ref: _finalize_fairy and \ - _finalize_fairy( - dbapi_connection, - rec, pool, ref, pool._echo) - ) + fairy, + lambda ref: _finalize_fairy and + _finalize_fairy( + dbapi_connection, + rec, pool, ref, pool._echo) + ) _refs.add(rec) if pool._echo: pool.logger.debug("Connection %r checked out from pool", - dbapi_connection) + dbapi_connection) return fairy def checkin(self): @@ -462,7 +468,6 @@ class _ConnectionRecord(object): pool.dispatch.checkin(connection, self) pool._return_conn(self) - def close(self): if self.connection is not None: self.__close() @@ -471,9 +476,9 @@ class _ConnectionRecord(object): """Invalidate the DBAPI connection held by this :class:`._ConnectionRecord`. This method is called for all connection invalidations, including - when the :meth:`._ConnectionFairy.invalidate` or :meth:`.Connection.invalidate` - methods are called, as well as when any so-called "automatic invalidation" - condition occurs. + when the :meth:`._ConnectionFairy.invalidate` or + :meth:`.Connection.invalidate` methods are called, as well as when any + so-called "automatic invalidation" condition occurs. .. seealso:: @@ -504,14 +509,15 @@ class _ConnectionRecord(object): elif self.__pool._recycle > -1 and \ time.time() - self.starttime > self.__pool._recycle: self.__pool.logger.info( - "Connection %r exceeded timeout; recycling", - self.connection) + "Connection %r exceeded timeout; recycling", + self.connection) recycle = True elif self.__pool._invalidate_time > self.starttime: self.__pool.logger.info( - "Connection %r invalidated due to pool invalidation; recycling", - self.connection - ) + "Connection %r invalidated due to pool invalidation; " + + "recycling", + self.connection + ) recycle = True if recycle: @@ -536,7 +542,8 @@ class _ConnectionRecord(object): raise -def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None): +def _finalize_fairy(connection, connection_record, + pool, ref, echo, fairy=None): """Cleanup for a :class:`._ConnectionFairy` whether or not it's already been garbage collected. @@ -544,13 +551,13 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None): _refs.discard(connection_record) if ref is not None and \ - connection_record.fairy_ref is not ref: + connection_record.fairy_ref is not ref: return if connection is not None: if connection_record and echo: pool.logger.debug("Connection %r being returned to pool", - connection) + connection) try: fairy = fairy or _ConnectionFairy(connection, connection_record) @@ -561,7 +568,8 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None): if not connection_record: pool._close_connection(connection) except Exception as e: - pool.logger.error("Exception during reset or similar", exc_info=True) + pool.logger.error( + "Exception during reset or similar", exc_info=True) if connection_record: connection_record.invalidate(e=e) if isinstance(e, (SystemExit, KeyboardInterrupt)): @@ -575,6 +583,7 @@ _refs = set() class _ConnectionFairy(object): + """Proxies a DBAPI connection and provides return-on-dereference support. @@ -582,10 +591,11 @@ class _ConnectionFairy(object): to provide context management to a DBAPI connection delivered by that :class:`.Pool`. - The name "fairy" is inspired by the fact that the :class:`._ConnectionFairy` - object's lifespan is transitory, as it lasts only for the length of a - specific DBAPI connection being checked out from the pool, and additionally - that as a transparent proxy, it is mostly invisible. + The name "fairy" is inspired by the fact that the + :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts + only for the length of a specific DBAPI connection being checked out from + the pool, and additionally that as a transparent proxy, it is mostly + invisible. .. seealso:: @@ -611,8 +621,8 @@ class _ConnectionFairy(object): _reset_agent = None """Refer to an object with a ``.commit()`` and ``.rollback()`` method; if non-None, the "reset-on-return" feature will call upon this object - rather than directly against the dialect-level do_rollback() and do_commit() - methods. + rather than directly against the dialect-level do_rollback() and + do_commit() methods. In practice, a :class:`.Connection` assigns a :class:`.Transaction` object to this variable when one is in scope so that the :class:`.Transaction` @@ -649,8 +659,8 @@ class _ConnectionFairy(object): while attempts > 0: try: pool.dispatch.checkout(fairy.connection, - fairy._connection_record, - fairy) + fairy._connection_record, + fairy) return fairy except exc.DisconnectionError as e: pool.logger.info( @@ -668,7 +678,7 @@ class _ConnectionFairy(object): def _checkin(self): _finalize_fairy(self.connection, self._connection_record, - self._pool, None, self._echo, fairy=self) + self._pool, None, self._echo, fairy=self) self.connection = None self._connection_record = None @@ -680,9 +690,9 @@ class _ConnectionFairy(object): if pool._reset_on_return is reset_rollback: if echo: pool.logger.debug("Connection %s rollback-on-return%s", - self.connection, - ", via agent" - if self._reset_agent else "") + self.connection, + ", via agent" + if self._reset_agent else "") if self._reset_agent: self._reset_agent.rollback() else: @@ -690,9 +700,9 @@ class _ConnectionFairy(object): elif pool._reset_on_return is reset_commit: if echo: pool.logger.debug("Connection %s commit-on-return%s", - self.connection, - ", via agent" - if self._reset_agent else "") + self.connection, + ", via agent" + if self._reset_agent else "") if self._reset_agent: self._reset_agent.commit() else: @@ -759,7 +769,6 @@ class _ConnectionFairy(object): def __getattr__(self, key): return getattr(self.connection, key) - def detach(self): """Separate this connection from its Pool. @@ -788,8 +797,8 @@ class _ConnectionFairy(object): self._checkin() - class SingletonThreadPool(Pool): + """A Pool that maintains one connection per thread. Maintains one connection per each thread, never moving a connection to a @@ -816,14 +825,14 @@ class SingletonThreadPool(Pool): def recreate(self): self.logger.info("Pool recreating") return self.__class__(self._creator, - pool_size=self.size, - recycle=self._recycle, - echo=self.echo, - logging_name=self._orig_logging_name, - use_threadlocal=self._use_threadlocal, - reset_on_return=self._reset_on_return, - _dispatch=self.dispatch, - _dialect=self._dialect) + pool_size=self.size, + recycle=self._recycle, + echo=self.echo, + logging_name=self._orig_logging_name, + use_threadlocal=self._use_threadlocal, + reset_on_return=self._reset_on_return, + _dispatch=self.dispatch, + _dialect=self._dialect) def dispose(self): """Dispose of this pool.""" @@ -847,7 +856,7 @@ class SingletonThreadPool(Pool): def status(self): return "SingletonThreadPool id:%d size: %d" % \ - (id(self), len(self._all_conns)) + (id(self), len(self._all_conns)) def _do_return_conn(self, conn): pass @@ -868,6 +877,7 @@ class SingletonThreadPool(Pool): class QueuePool(Pool): + """A :class:`.Pool` that imposes a limit on the number of open connections. :class:`.QueuePool` is the default pooling implementation used for @@ -908,9 +918,10 @@ class QueuePool(Pool): :param timeout: The number of seconds to wait before giving up on returning a connection. Defaults to 30. - :param \**kw: Other keyword arguments including :paramref:`.Pool.recycle`, - :paramref:`.Pool.echo`, :paramref:`.Pool.reset_on_return` and others - are passed to the :class:`.Pool` constructor. + :param \**kw: Other keyword arguments including + :paramref:`.Pool.recycle`, :paramref:`.Pool.echo`, + :paramref:`.Pool.reset_on_return` and others are passed to the + :class:`.Pool` constructor. """ Pool.__init__(self, creator, **kw) @@ -941,9 +952,9 @@ class QueuePool(Pool): return self._do_get() else: raise exc.TimeoutError( - "QueuePool limit of size %d overflow %d reached, " - "connection timed out, timeout %d" % - (self.size(), self.overflow(), self._timeout)) + "QueuePool limit of size %d overflow %d reached, " + "connection timed out, timeout %d" % + (self.size(), self.overflow(), self._timeout)) if self._inc_overflow(): try: @@ -976,14 +987,14 @@ class QueuePool(Pool): def recreate(self): self.logger.info("Pool recreating") return self.__class__(self._creator, pool_size=self._pool.maxsize, - max_overflow=self._max_overflow, - timeout=self._timeout, - recycle=self._recycle, echo=self.echo, - logging_name=self._orig_logging_name, - use_threadlocal=self._use_threadlocal, - reset_on_return=self._reset_on_return, - _dispatch=self.dispatch, - _dialect=self._dialect) + max_overflow=self._max_overflow, + timeout=self._timeout, + recycle=self._recycle, echo=self.echo, + logging_name=self._orig_logging_name, + use_threadlocal=self._use_threadlocal, + reset_on_return=self._reset_on_return, + _dispatch=self.dispatch, + _dialect=self._dialect) def dispose(self): while True: @@ -998,11 +1009,11 @@ class QueuePool(Pool): def status(self): return "Pool size: %d Connections in pool: %d "\ - "Current Overflow: %d Current Checked out "\ - "connections: %d" % (self.size(), - self.checkedin(), - self.overflow(), - self.checkedout()) + "Current Overflow: %d Current Checked out "\ + "connections: %d" % (self.size(), + self.checkedin(), + self.overflow(), + self.checkedout()) def size(self): return self._pool.maxsize @@ -1018,6 +1029,7 @@ class QueuePool(Pool): class NullPool(Pool): + """A Pool which does not pool connections. Instead it literally opens and closes the underlying DB-API connection @@ -1046,19 +1058,20 @@ class NullPool(Pool): self.logger.info("Pool recreating") return self.__class__(self._creator, - recycle=self._recycle, - echo=self.echo, - logging_name=self._orig_logging_name, - use_threadlocal=self._use_threadlocal, - reset_on_return=self._reset_on_return, - _dispatch=self.dispatch, - _dialect=self._dialect) + recycle=self._recycle, + echo=self.echo, + logging_name=self._orig_logging_name, + use_threadlocal=self._use_threadlocal, + reset_on_return=self._reset_on_return, + _dispatch=self.dispatch, + _dialect=self._dialect) def dispose(self): pass class StaticPool(Pool): + """A Pool of exactly one connection, used for all requests. Reconnect-related functions such as ``recycle`` and connection @@ -1106,6 +1119,7 @@ class StaticPool(Pool): class AssertionPool(Pool): + """A :class:`.Pool` that allows at most one checked out connection at any given time. @@ -1119,6 +1133,7 @@ class AssertionPool(Pool): this in the assertion error raised. """ + def __init__(self, *args, **kw): self._conn = None self._checked_out = False @@ -1143,9 +1158,9 @@ class AssertionPool(Pool): def recreate(self): self.logger.info("Pool recreating") return self.__class__(self._creator, echo=self.echo, - logging_name=self._orig_logging_name, - _dispatch=self.dispatch, - _dialect=self._dialect) + logging_name=self._orig_logging_name, + _dispatch=self.dispatch, + _dialect=self._dialect) def _do_get(self): if self._checked_out: @@ -1166,6 +1181,7 @@ class AssertionPool(Pool): class _DBProxy(object): + """Layers connection pooling behavior on top of a standard DB-API module. Proxies a DB-API 2.0 connect() call to a connection pool keyed to the @@ -1211,8 +1227,8 @@ class _DBProxy(object): try: if key not in self.pools: kw.pop('sa_pool_key', None) - pool = self.poolclass(lambda: - self.module.connect(*args, **kw), **self.kw) + pool = self.poolclass( + lambda: self.module.connect(*args, **kw), **self.kw) self.pools[key] = pool return pool else: |