summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/pool.py')
-rw-r--r--lib/sqlalchemy/pool.py214
1 files changed, 115 insertions, 99 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 7150ce81f..d26bbf32c 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -67,7 +67,9 @@ reset_rollback = util.symbol('reset_rollback')
reset_commit = util.symbol('reset_commit')
reset_none = util.symbol('reset_none')
+
class _ConnDialect(object):
+
"""partial implementation of :class:`.Dialect`
which provides DBAPI connection methods.
@@ -76,6 +78,7 @@ class _ConnDialect(object):
:class:`.Dialect`.
"""
+
def do_rollback(self, dbapi_connection):
dbapi_connection.rollback()
@@ -85,20 +88,22 @@ class _ConnDialect(object):
def do_close(self, dbapi_connection):
dbapi_connection.close()
+
class Pool(log.Identified):
+
"""Abstract base class for connection pools."""
_dialect = _ConnDialect()
def __init__(self,
- creator, recycle=-1, echo=None,
- use_threadlocal=False,
- logging_name=None,
- reset_on_return=True,
- listeners=None,
- events=None,
- _dispatch=None,
- _dialect=None):
+ creator, recycle=-1, echo=None,
+ use_threadlocal=False,
+ logging_name=None,
+ reset_on_return=True,
+ listeners=None,
+ events=None,
+ _dispatch=None,
+ _dialect=None):
"""
Construct a Pool.
@@ -134,10 +139,10 @@ class Pool(log.Identified):
.. warning:: The :paramref:`.Pool.use_threadlocal` flag
**does not affect the behavior** of :meth:`.Engine.connect`.
- :meth:`.Engine.connect` makes use of the :meth:`.Pool.unique_connection`
- method which **does not use thread local context**.
- To produce a :class:`.Connection` which refers to the
- :meth:`.Pool.connect` method, use
+ :meth:`.Engine.connect` makes use of the
+ :meth:`.Pool.unique_connection` method which **does not use thread
+ local context**. To produce a :class:`.Connection` which refers
+ to the :meth:`.Pool.connect` method, use
:meth:`.Engine.contextual_connect`.
Note that other SQLAlchemy connectivity systems such as
@@ -221,8 +226,8 @@ class Pool(log.Identified):
self._reset_on_return = reset_commit
else:
raise exc.ArgumentError(
- "Invalid value for 'reset_on_return': %r"
- % reset_on_return)
+ "Invalid value for 'reset_on_return': %r"
+ % reset_on_return)
self.echo = echo
if _dispatch:
@@ -234,8 +239,8 @@ class Pool(log.Identified):
event.listen(self, target, fn)
if listeners:
util.warn_deprecated(
- "The 'listeners' argument to Pool (and "
- "create_engine()) is deprecated. Use event.listen().")
+ "The 'listeners' argument to Pool (and "
+ "create_engine()) is deprecated. Use event.listen().")
for l in listeners:
self.add_listener(l)
@@ -247,7 +252,7 @@ class Pool(log.Identified):
raise
except:
self.logger.error("Exception closing connection %r",
- connection, exc_info=True)
+ connection, exc_info=True)
@util.deprecated(
2.7, "Pool.add_listener is deprecated. Use event.listen()")
@@ -267,8 +272,9 @@ class Pool(log.Identified):
This method is equivalent to :meth:`.Pool.connect` when the
:paramref:`.Pool.use_threadlocal` flag is not set to True.
- When :paramref:`.Pool.use_threadlocal` is True, the :meth:`.Pool.unique_connection`
- method provides a means of bypassing the threadlocal context.
+ When :paramref:`.Pool.use_threadlocal` is True, the
+ :meth:`.Pool.unique_connection` method provides a means of bypassing
+ the threadlocal context.
"""
return _ConnectionFairy._checkout(self)
@@ -295,7 +301,6 @@ class Pool(log.Identified):
if getattr(connection, 'is_valid', False):
connection.invalidate(exception)
-
def recreate(self):
"""Return a new :class:`.Pool`, of the same class as this one
and configured with identical creation arguments.
@@ -371,6 +376,7 @@ class Pool(log.Identified):
class _ConnectionRecord(object):
+
"""Internal object which maintains an individual DBAPI connection
referenced by a :class:`.Pool`.
@@ -406,8 +412,8 @@ class _ConnectionRecord(object):
self.finalize_callback = deque()
pool.dispatch.first_connect.\
- for_modify(pool.dispatch).\
- exec_once(self.connection, self)
+ for_modify(pool.dispatch).\
+ exec_once(self.connection, self)
pool.dispatch.connect(self.connection, self)
connection = None
@@ -439,16 +445,16 @@ class _ConnectionRecord(object):
raise
fairy = _ConnectionFairy(dbapi_connection, rec)
rec.fairy_ref = weakref.ref(
- fairy,
- lambda ref: _finalize_fairy and \
- _finalize_fairy(
- dbapi_connection,
- rec, pool, ref, pool._echo)
- )
+ fairy,
+ lambda ref: _finalize_fairy and
+ _finalize_fairy(
+ dbapi_connection,
+ rec, pool, ref, pool._echo)
+ )
_refs.add(rec)
if pool._echo:
pool.logger.debug("Connection %r checked out from pool",
- dbapi_connection)
+ dbapi_connection)
return fairy
def checkin(self):
@@ -462,7 +468,6 @@ class _ConnectionRecord(object):
pool.dispatch.checkin(connection, self)
pool._return_conn(self)
-
def close(self):
if self.connection is not None:
self.__close()
@@ -471,9 +476,9 @@ class _ConnectionRecord(object):
"""Invalidate the DBAPI connection held by this :class:`._ConnectionRecord`.
This method is called for all connection invalidations, including
- when the :meth:`._ConnectionFairy.invalidate` or :meth:`.Connection.invalidate`
- methods are called, as well as when any so-called "automatic invalidation"
- condition occurs.
+ when the :meth:`._ConnectionFairy.invalidate` or
+ :meth:`.Connection.invalidate` methods are called, as well as when any
+ so-called "automatic invalidation" condition occurs.
.. seealso::
@@ -504,14 +509,15 @@ class _ConnectionRecord(object):
elif self.__pool._recycle > -1 and \
time.time() - self.starttime > self.__pool._recycle:
self.__pool.logger.info(
- "Connection %r exceeded timeout; recycling",
- self.connection)
+ "Connection %r exceeded timeout; recycling",
+ self.connection)
recycle = True
elif self.__pool._invalidate_time > self.starttime:
self.__pool.logger.info(
- "Connection %r invalidated due to pool invalidation; recycling",
- self.connection
- )
+ "Connection %r invalidated due to pool invalidation; " +
+ "recycling",
+ self.connection
+ )
recycle = True
if recycle:
@@ -536,7 +542,8 @@ class _ConnectionRecord(object):
raise
-def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None):
+def _finalize_fairy(connection, connection_record,
+ pool, ref, echo, fairy=None):
"""Cleanup for a :class:`._ConnectionFairy` whether or not it's already
been garbage collected.
@@ -544,13 +551,13 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None):
_refs.discard(connection_record)
if ref is not None and \
- connection_record.fairy_ref is not ref:
+ connection_record.fairy_ref is not ref:
return
if connection is not None:
if connection_record and echo:
pool.logger.debug("Connection %r being returned to pool",
- connection)
+ connection)
try:
fairy = fairy or _ConnectionFairy(connection, connection_record)
@@ -561,7 +568,8 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None):
if not connection_record:
pool._close_connection(connection)
except Exception as e:
- pool.logger.error("Exception during reset or similar", exc_info=True)
+ pool.logger.error(
+ "Exception during reset or similar", exc_info=True)
if connection_record:
connection_record.invalidate(e=e)
if isinstance(e, (SystemExit, KeyboardInterrupt)):
@@ -575,6 +583,7 @@ _refs = set()
class _ConnectionFairy(object):
+
"""Proxies a DBAPI connection and provides return-on-dereference
support.
@@ -582,10 +591,11 @@ class _ConnectionFairy(object):
to provide context management to a DBAPI connection delivered by
that :class:`.Pool`.
- The name "fairy" is inspired by the fact that the :class:`._ConnectionFairy`
- object's lifespan is transitory, as it lasts only for the length of a
- specific DBAPI connection being checked out from the pool, and additionally
- that as a transparent proxy, it is mostly invisible.
+ The name "fairy" is inspired by the fact that the
+ :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
+ only for the length of a specific DBAPI connection being checked out from
+ the pool, and additionally that as a transparent proxy, it is mostly
+ invisible.
.. seealso::
@@ -611,8 +621,8 @@ class _ConnectionFairy(object):
_reset_agent = None
"""Refer to an object with a ``.commit()`` and ``.rollback()`` method;
if non-None, the "reset-on-return" feature will call upon this object
- rather than directly against the dialect-level do_rollback() and do_commit()
- methods.
+ rather than directly against the dialect-level do_rollback() and
+ do_commit() methods.
In practice, a :class:`.Connection` assigns a :class:`.Transaction` object
to this variable when one is in scope so that the :class:`.Transaction`
@@ -649,8 +659,8 @@ class _ConnectionFairy(object):
while attempts > 0:
try:
pool.dispatch.checkout(fairy.connection,
- fairy._connection_record,
- fairy)
+ fairy._connection_record,
+ fairy)
return fairy
except exc.DisconnectionError as e:
pool.logger.info(
@@ -668,7 +678,7 @@ class _ConnectionFairy(object):
def _checkin(self):
_finalize_fairy(self.connection, self._connection_record,
- self._pool, None, self._echo, fairy=self)
+ self._pool, None, self._echo, fairy=self)
self.connection = None
self._connection_record = None
@@ -680,9 +690,9 @@ class _ConnectionFairy(object):
if pool._reset_on_return is reset_rollback:
if echo:
pool.logger.debug("Connection %s rollback-on-return%s",
- self.connection,
- ", via agent"
- if self._reset_agent else "")
+ self.connection,
+ ", via agent"
+ if self._reset_agent else "")
if self._reset_agent:
self._reset_agent.rollback()
else:
@@ -690,9 +700,9 @@ class _ConnectionFairy(object):
elif pool._reset_on_return is reset_commit:
if echo:
pool.logger.debug("Connection %s commit-on-return%s",
- self.connection,
- ", via agent"
- if self._reset_agent else "")
+ self.connection,
+ ", via agent"
+ if self._reset_agent else "")
if self._reset_agent:
self._reset_agent.commit()
else:
@@ -759,7 +769,6 @@ class _ConnectionFairy(object):
def __getattr__(self, key):
return getattr(self.connection, key)
-
def detach(self):
"""Separate this connection from its Pool.
@@ -788,8 +797,8 @@ class _ConnectionFairy(object):
self._checkin()
-
class SingletonThreadPool(Pool):
+
"""A Pool that maintains one connection per thread.
Maintains one connection per each thread, never moving a connection to a
@@ -816,14 +825,14 @@ class SingletonThreadPool(Pool):
def recreate(self):
self.logger.info("Pool recreating")
return self.__class__(self._creator,
- pool_size=self.size,
- recycle=self._recycle,
- echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- _dispatch=self.dispatch,
- _dialect=self._dialect)
+ pool_size=self.size,
+ recycle=self._recycle,
+ echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ _dispatch=self.dispatch,
+ _dialect=self._dialect)
def dispose(self):
"""Dispose of this pool."""
@@ -847,7 +856,7 @@ class SingletonThreadPool(Pool):
def status(self):
return "SingletonThreadPool id:%d size: %d" % \
- (id(self), len(self._all_conns))
+ (id(self), len(self._all_conns))
def _do_return_conn(self, conn):
pass
@@ -868,6 +877,7 @@ class SingletonThreadPool(Pool):
class QueuePool(Pool):
+
"""A :class:`.Pool` that imposes a limit on the number of open connections.
:class:`.QueuePool` is the default pooling implementation used for
@@ -908,9 +918,10 @@ class QueuePool(Pool):
:param timeout: The number of seconds to wait before giving up
on returning a connection. Defaults to 30.
- :param \**kw: Other keyword arguments including :paramref:`.Pool.recycle`,
- :paramref:`.Pool.echo`, :paramref:`.Pool.reset_on_return` and others
- are passed to the :class:`.Pool` constructor.
+ :param \**kw: Other keyword arguments including
+ :paramref:`.Pool.recycle`, :paramref:`.Pool.echo`,
+ :paramref:`.Pool.reset_on_return` and others are passed to the
+ :class:`.Pool` constructor.
"""
Pool.__init__(self, creator, **kw)
@@ -941,9 +952,9 @@ class QueuePool(Pool):
return self._do_get()
else:
raise exc.TimeoutError(
- "QueuePool limit of size %d overflow %d reached, "
- "connection timed out, timeout %d" %
- (self.size(), self.overflow(), self._timeout))
+ "QueuePool limit of size %d overflow %d reached, "
+ "connection timed out, timeout %d" %
+ (self.size(), self.overflow(), self._timeout))
if self._inc_overflow():
try:
@@ -976,14 +987,14 @@ class QueuePool(Pool):
def recreate(self):
self.logger.info("Pool recreating")
return self.__class__(self._creator, pool_size=self._pool.maxsize,
- max_overflow=self._max_overflow,
- timeout=self._timeout,
- recycle=self._recycle, echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- _dispatch=self.dispatch,
- _dialect=self._dialect)
+ max_overflow=self._max_overflow,
+ timeout=self._timeout,
+ recycle=self._recycle, echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ _dispatch=self.dispatch,
+ _dialect=self._dialect)
def dispose(self):
while True:
@@ -998,11 +1009,11 @@ class QueuePool(Pool):
def status(self):
return "Pool size: %d Connections in pool: %d "\
- "Current Overflow: %d Current Checked out "\
- "connections: %d" % (self.size(),
- self.checkedin(),
- self.overflow(),
- self.checkedout())
+ "Current Overflow: %d Current Checked out "\
+ "connections: %d" % (self.size(),
+ self.checkedin(),
+ self.overflow(),
+ self.checkedout())
def size(self):
return self._pool.maxsize
@@ -1018,6 +1029,7 @@ class QueuePool(Pool):
class NullPool(Pool):
+
"""A Pool which does not pool connections.
Instead it literally opens and closes the underlying DB-API connection
@@ -1046,19 +1058,20 @@ class NullPool(Pool):
self.logger.info("Pool recreating")
return self.__class__(self._creator,
- recycle=self._recycle,
- echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- _dispatch=self.dispatch,
- _dialect=self._dialect)
+ recycle=self._recycle,
+ echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ _dispatch=self.dispatch,
+ _dialect=self._dialect)
def dispose(self):
pass
class StaticPool(Pool):
+
"""A Pool of exactly one connection, used for all requests.
Reconnect-related functions such as ``recycle`` and connection
@@ -1106,6 +1119,7 @@ class StaticPool(Pool):
class AssertionPool(Pool):
+
"""A :class:`.Pool` that allows at most one checked out connection at
any given time.
@@ -1119,6 +1133,7 @@ class AssertionPool(Pool):
this in the assertion error raised.
"""
+
def __init__(self, *args, **kw):
self._conn = None
self._checked_out = False
@@ -1143,9 +1158,9 @@ class AssertionPool(Pool):
def recreate(self):
self.logger.info("Pool recreating")
return self.__class__(self._creator, echo=self.echo,
- logging_name=self._orig_logging_name,
- _dispatch=self.dispatch,
- _dialect=self._dialect)
+ logging_name=self._orig_logging_name,
+ _dispatch=self.dispatch,
+ _dialect=self._dialect)
def _do_get(self):
if self._checked_out:
@@ -1166,6 +1181,7 @@ class AssertionPool(Pool):
class _DBProxy(object):
+
"""Layers connection pooling behavior on top of a standard DB-API module.
Proxies a DB-API 2.0 connect() call to a connection pool keyed to the
@@ -1211,8 +1227,8 @@ class _DBProxy(object):
try:
if key not in self.pools:
kw.pop('sa_pool_key', None)
- pool = self.poolclass(lambda:
- self.module.connect(*args, **kw), **self.kw)
+ pool = self.poolclass(
+ lambda: self.module.connect(*args, **kw), **self.kw)
self.pools[key] = pool
return pool
else: