summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/pool.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2013-07-02 13:14:21 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2013-07-02 13:14:21 -0400
commitd3d10c982c8a44c85a0114c491207297eac7611d (patch)
treebdfda394fb23cc8d65c0acb77ca070937d93580a /lib/sqlalchemy/pool.py
parent38c5e870a7883df0ae104df828217e326f6cff6a (diff)
downloadsqlalchemy-d3d10c982c8a44c85a0114c491207297eac7611d.tar.gz
- refactor pool a bit so that intent between ConnectionRecord/ConnectionFairy is clear;
make sure that the DBAPI connection passed to the reset-on-return events/dialect hooks is also a "fairy", so that dictionaries like "info" are available. [ticket:2770] - rework the execution_options system so that the dialect is given the job of making any immediate adjustments based on a set event. move the "isolation level" logic to use this new system. Also work things out so that even engine-level execution options can be used for things like isolation level; the dialect attaches a connect-event handler in this case to handle the task. - to support this new system as well as further extensibiltiy of execution options add events engine_connect(), set_connection_execution_options(), set_engine_execution_options()
Diffstat (limited to 'lib/sqlalchemy/pool.py')
-rw-r--r--lib/sqlalchemy/pool.py198
1 files changed, 112 insertions, 86 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index dcf3d9e39..97411dd3a 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -25,6 +25,7 @@ from .util import queue as sqla_queue
from .util import threading, memoized_property, \
chop_traceback
+from collections import deque
proxies = {}
@@ -217,7 +218,7 @@ class Pool(log.Identified):
"""
- return _ConnectionFairy(self).checkout()
+ return _ConnectionFairy.checkout(self)
def _create_connection(self):
"""Called by subclasses to create a new ConnectionRecord."""
@@ -269,18 +270,16 @@ class Pool(log.Identified):
"""
if not self._use_threadlocal:
- return _ConnectionFairy(self).checkout()
+ return _ConnectionFairy.checkout(self)
try:
rec = self._threadconns.current()
- if rec:
- return rec.checkout()
except AttributeError:
pass
+ else:
+ return rec.checkout_existing()
- agent = _ConnectionFairy(self)
- self._threadconns.current = weakref.ref(agent)
- return agent.checkout()
+ return _ConnectionFairy.checkout(self, self._threadconns)
def _return_conn(self, record):
"""Given a _ConnectionRecord, return it to the :class:`.Pool`.
@@ -311,11 +310,11 @@ class Pool(log.Identified):
class _ConnectionRecord(object):
- finalize_callback = None
def __init__(self, pool):
self.__pool = pool
self.connection = self.__connect()
+ self.finalize_callback = deque()
pool.dispatch.first_connect.\
for_modify(pool.dispatch).\
@@ -326,6 +325,36 @@ class _ConnectionRecord(object):
def info(self):
return {}
+ @classmethod
+ def checkout(cls, pool):
+ rec = pool._do_get()
+ dbapi_connection = rec.get_connection()
+ fairy = _ConnectionFairy(dbapi_connection, rec)
+ rec.fairy_ref = weakref.ref(
+ fairy,
+ lambda ref: _finalize_fairy and \
+ _finalize_fairy(
+ dbapi_connection,
+ rec, pool, ref, pool._echo)
+ )
+ _refs.add(rec)
+ if pool._echo:
+ pool.logger.debug("Connection %r checked out from pool",
+ dbapi_connection)
+ return fairy
+
+ def checkin(self):
+ self.fairy_ref = None
+ connection = self.connection
+ pool = self.__pool
+ while self.finalize_callback:
+ finalizer = self.finalize_callback.pop()
+ finalizer(connection)
+ if pool.dispatch.checkin:
+ pool.dispatch.checkin(connection, self)
+ pool._return_conn(self)
+
+
def close(self):
if self.connection is not None:
self.__pool._close_connection(self.connection)
@@ -373,11 +402,15 @@ class _ConnectionRecord(object):
raise
-def _finalize_fairy(connection, connection_record, pool, ref, echo):
+def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None):
+ """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
+ been garbage collected.
+
+ """
_refs.discard(connection_record)
if ref is not None and \
- connection_record.fairy is not ref:
+ connection_record.fairy_ref is not ref:
return
if connection is not None:
@@ -386,35 +419,31 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo):
connection)
try:
+ fairy = fairy or _ConnectionFairy(connection, connection_record)
if pool.dispatch.reset:
- pool.dispatch.reset(connection, connection_record)
+ pool.dispatch.reset(fairy, connection_record)
if pool._reset_on_return is reset_rollback:
if echo:
pool.logger.debug("Connection %s rollback-on-return",
connection)
- pool._dialect.do_rollback(connection)
+ pool._dialect.do_rollback(fairy)
elif pool._reset_on_return is reset_commit:
if echo:
- pool.logger.debug("Conneciton %s commit-on-return",
+ pool.logger.debug("Connection %s commit-on-return",
connection)
- pool._dialect.do_commit(connection)
+ pool._dialect.do_commit(fairy)
+
# Immediately close detached instances
- if connection_record is None:
+ if not connection_record:
pool._close_connection(connection)
except Exception as e:
- if connection_record is not None:
+ if connection_record:
connection_record.invalidate(e=e)
if isinstance(e, (SystemExit, KeyboardInterrupt)):
raise
- if connection_record is not None:
- connection_record.fairy = None
- if connection_record.finalize_callback:
- connection_record.finalize_callback(connection)
- del connection_record.finalize_callback
- if pool.dispatch.checkin:
- pool.dispatch.checkin(connection, connection_record)
- pool._return_conn(connection_record)
+ if connection_record:
+ connection_record.checkin()
_refs = set()
@@ -424,27 +453,58 @@ class _ConnectionFairy(object):
"""Proxies a DB-API connection and provides return-on-dereference
support."""
- def __init__(self, pool):
- self._pool = pool
- self.__counter = 0
- self._echo = _echo = pool._should_log_debug()
- try:
- rec = self._connection_record = pool._do_get()
- conn = self.connection = self._connection_record.get_connection()
- rec.fairy = weakref.ref(
- self,
- lambda ref: _finalize_fairy and \
- _finalize_fairy(conn, rec, pool, ref, _echo)
- )
- _refs.add(rec)
- except:
- # helps with endless __getattr__ loops later on
- self.connection = None
- self._connection_record = None
- raise
- if self._echo:
- self._pool.logger.debug("Connection %r checked out from pool",
- self.connection)
+ def __init__(self, dbapi_connection, connection_record):
+ self.connection = dbapi_connection
+ self._connection_record = connection_record
+
+ @classmethod
+ def checkout(cls, pool, threadconns=None, fairy=None):
+ if not fairy:
+ fairy = _ConnectionRecord.checkout(pool)
+
+ fairy._pool = pool
+ fairy._counter = 0
+ fairy._echo = pool._should_log_debug()
+
+ if threadconns is not None:
+ threadconns.current = weakref.ref(fairy)
+
+ if fairy.connection is None:
+ raise exc.InvalidRequestError("This connection is closed")
+ fairy._counter += 1
+
+ if not pool.dispatch.checkout or fairy._counter != 1:
+ return fairy
+
+ # Pool listeners can trigger a reconnection on checkout
+ attempts = 2
+ while attempts > 0:
+ try:
+ pool.dispatch.checkout(fairy.connection,
+ fairy._connection_record,
+ fairy)
+ return fairy
+ except exc.DisconnectionError as e:
+ pool.logger.info(
+ "Disconnection detected on checkout: %s", e)
+ fairy._connection_record.invalidate(e)
+ fairy.connection = fairy._connection_record.get_connection()
+ attempts -= 1
+
+ pool.logger.info("Reconnection attempts exhausted on checkout")
+ fairy.invalidate()
+ raise exc.InvalidRequestError("This connection is closed")
+
+ def checkout_existing(self):
+ return _ConnectionFairy.checkout(self._pool, fairy=self)
+
+ def checkin(self):
+ _finalize_fairy(self.connection, self._connection_record,
+ self._pool, None, self._echo, fairy=self)
+ self.connection = None
+ self._connection_record = None
+
+ _close = checkin
@property
def _logger(self):
@@ -465,10 +525,7 @@ class _ConnectionFairy(object):
in subsequent instances of :class:`.ConnectionFairy`.
"""
- try:
- return self._connection_record.info
- except AttributeError:
- raise exc.InvalidRequestError("This connection is closed")
+ return self._connection_record.info
def invalidate(self, e=None):
"""Mark this connection as invalidated.
@@ -479,10 +536,10 @@ class _ConnectionFairy(object):
if self.connection is None:
raise exc.InvalidRequestError("This connection is closed")
- if self._connection_record is not None:
+ if self._connection_record:
self._connection_record.invalidate(e=e)
self.connection = None
- self._close()
+ self.checkin()
def cursor(self, *args, **kwargs):
return self.connection.cursor(*args, **kwargs)
@@ -490,32 +547,6 @@ class _ConnectionFairy(object):
def __getattr__(self, key):
return getattr(self.connection, key)
- def checkout(self):
- if self.connection is None:
- raise exc.InvalidRequestError("This connection is closed")
- self.__counter += 1
-
- if not self._pool.dispatch.checkout or self.__counter != 1:
- return self
-
- # Pool listeners can trigger a reconnection on checkout
- attempts = 2
- while attempts > 0:
- try:
- self._pool.dispatch.checkout(self.connection,
- self._connection_record,
- self)
- return self
- except exc.DisconnectionError as e:
- self._pool.logger.info(
- "Disconnection detected on checkout: %s", e)
- self._connection_record.invalidate(e)
- self.connection = self._connection_record.get_connection()
- attempts -= 1
-
- self._pool.logger.info("Reconnection attempts exhausted on checkout")
- self.invalidate()
- raise exc.InvalidRequestError("This connection is closed")
def detach(self):
"""Separate this connection from its Pool.
@@ -532,22 +563,17 @@ class _ConnectionFairy(object):
if self._connection_record is not None:
_refs.remove(self._connection_record)
- self._connection_record.fairy = None
+ self._connection_record.fairy_ref = None
self._connection_record.connection = None
self._pool._do_return_conn(self._connection_record)
self.info = self.info.copy()
self._connection_record = None
def close(self):
- self.__counter -= 1
- if self.__counter == 0:
- self._close()
+ self._counter -= 1
+ if self._counter == 0:
+ self.checkin()
- def _close(self):
- _finalize_fairy(self.connection, self._connection_record,
- self._pool, None, self._echo)
- self.connection = None
- self._connection_record = None
class SingletonThreadPool(Pool):