diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-07-02 13:14:21 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-07-02 13:14:21 -0400 |
| commit | d3d10c982c8a44c85a0114c491207297eac7611d (patch) | |
| tree | bdfda394fb23cc8d65c0acb77ca070937d93580a /lib/sqlalchemy/pool.py | |
| parent | 38c5e870a7883df0ae104df828217e326f6cff6a (diff) | |
| download | sqlalchemy-d3d10c982c8a44c85a0114c491207297eac7611d.tar.gz | |
- refactor pool a bit so that intent between ConnectionRecord/ConnectionFairy is clear;
make sure that the DBAPI connection passed to the reset-on-return events/dialect hooks
is also a "fairy", so that dictionaries like "info" are available. [ticket:2770]
- rework the execution_options system so that the dialect is given the job of making
any immediate adjustments based on a set event. move the "isolation level" logic to use
this new system. Also work things out so that even engine-level execution options
can be used for things like isolation level; the dialect attaches a connect-event
handler in this case to handle the task.
- to support this new system as well as further extensibiltiy of execution options
add events engine_connect(), set_connection_execution_options(), set_engine_execution_options()
Diffstat (limited to 'lib/sqlalchemy/pool.py')
| -rw-r--r-- | lib/sqlalchemy/pool.py | 198 |
1 files changed, 112 insertions, 86 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index dcf3d9e39..97411dd3a 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -25,6 +25,7 @@ from .util import queue as sqla_queue from .util import threading, memoized_property, \ chop_traceback +from collections import deque proxies = {} @@ -217,7 +218,7 @@ class Pool(log.Identified): """ - return _ConnectionFairy(self).checkout() + return _ConnectionFairy.checkout(self) def _create_connection(self): """Called by subclasses to create a new ConnectionRecord.""" @@ -269,18 +270,16 @@ class Pool(log.Identified): """ if not self._use_threadlocal: - return _ConnectionFairy(self).checkout() + return _ConnectionFairy.checkout(self) try: rec = self._threadconns.current() - if rec: - return rec.checkout() except AttributeError: pass + else: + return rec.checkout_existing() - agent = _ConnectionFairy(self) - self._threadconns.current = weakref.ref(agent) - return agent.checkout() + return _ConnectionFairy.checkout(self, self._threadconns) def _return_conn(self, record): """Given a _ConnectionRecord, return it to the :class:`.Pool`. @@ -311,11 +310,11 @@ class Pool(log.Identified): class _ConnectionRecord(object): - finalize_callback = None def __init__(self, pool): self.__pool = pool self.connection = self.__connect() + self.finalize_callback = deque() pool.dispatch.first_connect.\ for_modify(pool.dispatch).\ @@ -326,6 +325,36 @@ class _ConnectionRecord(object): def info(self): return {} + @classmethod + def checkout(cls, pool): + rec = pool._do_get() + dbapi_connection = rec.get_connection() + fairy = _ConnectionFairy(dbapi_connection, rec) + rec.fairy_ref = weakref.ref( + fairy, + lambda ref: _finalize_fairy and \ + _finalize_fairy( + dbapi_connection, + rec, pool, ref, pool._echo) + ) + _refs.add(rec) + if pool._echo: + pool.logger.debug("Connection %r checked out from pool", + dbapi_connection) + return fairy + + def checkin(self): + self.fairy_ref = None + connection = self.connection + pool = self.__pool + while self.finalize_callback: + finalizer = self.finalize_callback.pop() + finalizer(connection) + if pool.dispatch.checkin: + pool.dispatch.checkin(connection, self) + pool._return_conn(self) + + def close(self): if self.connection is not None: self.__pool._close_connection(self.connection) @@ -373,11 +402,15 @@ class _ConnectionRecord(object): raise -def _finalize_fairy(connection, connection_record, pool, ref, echo): +def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None): + """Cleanup for a :class:`._ConnectionFairy` whether or not it's already + been garbage collected. + + """ _refs.discard(connection_record) if ref is not None and \ - connection_record.fairy is not ref: + connection_record.fairy_ref is not ref: return if connection is not None: @@ -386,35 +419,31 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo): connection) try: + fairy = fairy or _ConnectionFairy(connection, connection_record) if pool.dispatch.reset: - pool.dispatch.reset(connection, connection_record) + pool.dispatch.reset(fairy, connection_record) if pool._reset_on_return is reset_rollback: if echo: pool.logger.debug("Connection %s rollback-on-return", connection) - pool._dialect.do_rollback(connection) + pool._dialect.do_rollback(fairy) elif pool._reset_on_return is reset_commit: if echo: - pool.logger.debug("Conneciton %s commit-on-return", + pool.logger.debug("Connection %s commit-on-return", connection) - pool._dialect.do_commit(connection) + pool._dialect.do_commit(fairy) + # Immediately close detached instances - if connection_record is None: + if not connection_record: pool._close_connection(connection) except Exception as e: - if connection_record is not None: + if connection_record: connection_record.invalidate(e=e) if isinstance(e, (SystemExit, KeyboardInterrupt)): raise - if connection_record is not None: - connection_record.fairy = None - if connection_record.finalize_callback: - connection_record.finalize_callback(connection) - del connection_record.finalize_callback - if pool.dispatch.checkin: - pool.dispatch.checkin(connection, connection_record) - pool._return_conn(connection_record) + if connection_record: + connection_record.checkin() _refs = set() @@ -424,27 +453,58 @@ class _ConnectionFairy(object): """Proxies a DB-API connection and provides return-on-dereference support.""" - def __init__(self, pool): - self._pool = pool - self.__counter = 0 - self._echo = _echo = pool._should_log_debug() - try: - rec = self._connection_record = pool._do_get() - conn = self.connection = self._connection_record.get_connection() - rec.fairy = weakref.ref( - self, - lambda ref: _finalize_fairy and \ - _finalize_fairy(conn, rec, pool, ref, _echo) - ) - _refs.add(rec) - except: - # helps with endless __getattr__ loops later on - self.connection = None - self._connection_record = None - raise - if self._echo: - self._pool.logger.debug("Connection %r checked out from pool", - self.connection) + def __init__(self, dbapi_connection, connection_record): + self.connection = dbapi_connection + self._connection_record = connection_record + + @classmethod + def checkout(cls, pool, threadconns=None, fairy=None): + if not fairy: + fairy = _ConnectionRecord.checkout(pool) + + fairy._pool = pool + fairy._counter = 0 + fairy._echo = pool._should_log_debug() + + if threadconns is not None: + threadconns.current = weakref.ref(fairy) + + if fairy.connection is None: + raise exc.InvalidRequestError("This connection is closed") + fairy._counter += 1 + + if not pool.dispatch.checkout or fairy._counter != 1: + return fairy + + # Pool listeners can trigger a reconnection on checkout + attempts = 2 + while attempts > 0: + try: + pool.dispatch.checkout(fairy.connection, + fairy._connection_record, + fairy) + return fairy + except exc.DisconnectionError as e: + pool.logger.info( + "Disconnection detected on checkout: %s", e) + fairy._connection_record.invalidate(e) + fairy.connection = fairy._connection_record.get_connection() + attempts -= 1 + + pool.logger.info("Reconnection attempts exhausted on checkout") + fairy.invalidate() + raise exc.InvalidRequestError("This connection is closed") + + def checkout_existing(self): + return _ConnectionFairy.checkout(self._pool, fairy=self) + + def checkin(self): + _finalize_fairy(self.connection, self._connection_record, + self._pool, None, self._echo, fairy=self) + self.connection = None + self._connection_record = None + + _close = checkin @property def _logger(self): @@ -465,10 +525,7 @@ class _ConnectionFairy(object): in subsequent instances of :class:`.ConnectionFairy`. """ - try: - return self._connection_record.info - except AttributeError: - raise exc.InvalidRequestError("This connection is closed") + return self._connection_record.info def invalidate(self, e=None): """Mark this connection as invalidated. @@ -479,10 +536,10 @@ class _ConnectionFairy(object): if self.connection is None: raise exc.InvalidRequestError("This connection is closed") - if self._connection_record is not None: + if self._connection_record: self._connection_record.invalidate(e=e) self.connection = None - self._close() + self.checkin() def cursor(self, *args, **kwargs): return self.connection.cursor(*args, **kwargs) @@ -490,32 +547,6 @@ class _ConnectionFairy(object): def __getattr__(self, key): return getattr(self.connection, key) - def checkout(self): - if self.connection is None: - raise exc.InvalidRequestError("This connection is closed") - self.__counter += 1 - - if not self._pool.dispatch.checkout or self.__counter != 1: - return self - - # Pool listeners can trigger a reconnection on checkout - attempts = 2 - while attempts > 0: - try: - self._pool.dispatch.checkout(self.connection, - self._connection_record, - self) - return self - except exc.DisconnectionError as e: - self._pool.logger.info( - "Disconnection detected on checkout: %s", e) - self._connection_record.invalidate(e) - self.connection = self._connection_record.get_connection() - attempts -= 1 - - self._pool.logger.info("Reconnection attempts exhausted on checkout") - self.invalidate() - raise exc.InvalidRequestError("This connection is closed") def detach(self): """Separate this connection from its Pool. @@ -532,22 +563,17 @@ class _ConnectionFairy(object): if self._connection_record is not None: _refs.remove(self._connection_record) - self._connection_record.fairy = None + self._connection_record.fairy_ref = None self._connection_record.connection = None self._pool._do_return_conn(self._connection_record) self.info = self.info.copy() self._connection_record = None def close(self): - self.__counter -= 1 - if self.__counter == 0: - self._close() + self._counter -= 1 + if self._counter == 0: + self.checkin() - def _close(self): - _finalize_fairy(self.connection, self._connection_record, - self._pool, None, self._echo) - self.connection = None - self._connection_record = None class SingletonThreadPool(Pool): |
