diff options
| author | Jason Kirtland <jek@discorporate.us> | 2007-07-28 19:51:55 +0000 |
|---|---|---|
| committer | Jason Kirtland <jek@discorporate.us> | 2007-07-28 19:51:55 +0000 |
| commit | 9f100231798d83f2bf4a53494eb5199864a0094d (patch) | |
| tree | 829e6165c1f38ae042ac5c41da673d1a397370a7 /lib/sqlalchemy | |
| parent | 7a0a1cbc817eb0cab90118f288e9a65c7ac35aaa (diff) | |
| download | sqlalchemy-9f100231798d83f2bf4a53494eb5199864a0094d.tar.gz | |
Added pool hooks for connection creation, check out and check in.
Diffstat (limited to 'lib/sqlalchemy')
| -rw-r--r-- | lib/sqlalchemy/exceptions.py | 4 | ||||
| -rw-r--r-- | lib/sqlalchemy/interfaces.py | 51 | ||||
| -rw-r--r-- | lib/sqlalchemy/pool.py | 62 |
3 files changed, 114 insertions, 3 deletions
diff --git a/lib/sqlalchemy/exceptions.py b/lib/sqlalchemy/exceptions.py index 55c345bd7..7fe5cf518 100644 --- a/lib/sqlalchemy/exceptions.py +++ b/lib/sqlalchemy/exceptions.py @@ -89,3 +89,7 @@ class DBAPIError(SQLAlchemyError): def __init__(self, message, orig): SQLAlchemyError.__init__(self, "(%s) (%s) %s"% (message, orig.__class__.__name__, str(orig))) self.orig = orig + +class DisconnectionError(SQLAlchemyError): + """Raised within ``Pool`` when a disconnect is detected on a raw DBAPI connection.""" + pass diff --git a/lib/sqlalchemy/interfaces.py b/lib/sqlalchemy/interfaces.py new file mode 100644 index 000000000..5df19ceef --- /dev/null +++ b/lib/sqlalchemy/interfaces.py @@ -0,0 +1,51 @@ +# interfaces.py +# Copyright (C) 2007 Jason Kirtland jek@discorporate.us +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + + +class PoolListener(object): + """Hooks into the lifecycle of connections in a ``Pool``. + + """ + + def connect(dbapi_con, con_record): + """Called once for each new DBAPI connection or pool's ``creator()``. + + dbapi_con: + A newly connected raw DBAPI connection (not a SQLAlchemy + ``Connection`` wrapper). + + con_record: + The ``_ConnectionRecord`` that currently owns the connection + """ + + def checkout(dbapi_con, con_record): + """Called when a connection is retrieved from the pool. + + dbapi_con: + A raw DBAPI connection + + con_record: + The ``_ConnectionRecord`` that currently owns the connection + + If you raise an ``exceptions.DisconnectionError``, the current + connection will be disposed and a fresh connection retrieved. + Processing of all checkout listeners will abort and restart + using the new connection. + """ + + def checkin(dbapi_con, con_record): + """Called when a connection returns to the pool. + + Note that the connection may be closed, and may be None if the + connection has been invalidated. ``checkin`` will not be called + for detached connections. (They do not return to the pool.) + + dbapi_con: + A raw DBAPI connection + + con_record: + The _ConnectionRecord that currently owns the connection + """ diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index f86e14ab1..02f7b1527 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -111,6 +111,11 @@ class Pool(object): surpassed the connection will be closed and replaced with a newly opened connection. Defaults to -1. + listeners + A list of ``PoolListener``-like objects that receive events when + DBAPI connections are created, checked out and checked in to the + pool. + auto_close_cursors Cursors, returned by ``connection.cursor()``, are tracked and are automatically closed when the connection is returned to the @@ -126,8 +131,9 @@ class Pool(object): False, then no cursor processing occurs upon checkin. """ - def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False, auto_close_cursors=True, - disallow_open_cursors=False): + def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False, + auto_close_cursors=True, disallow_open_cursors=False, + listeners=None): self.logger = logging.instance_logger(self) self._threadconns = weakref.WeakValueDictionary() self._creator = creator @@ -136,6 +142,13 @@ class Pool(object): self.auto_close_cursors = auto_close_cursors self.disallow_open_cursors = disallow_open_cursors self.echo = echo + self.listeners = [] + self._on_connect = [] + self._on_checkout = [] + self._on_checkin = [] + if listeners: + for l in listeners: + self.add_listener(l) echo = logging.echo_property() def unique_connection(self): @@ -183,6 +196,17 @@ class Pool(object): def status(self): raise NotImplementedError() + def add_listener(self, listener): + """Add a ``PoolListener``-like object to this pool.""" + + self.listeners.append(listener) + if hasattr(listener, 'connect'): + self._on_connect.append(listener) + if hasattr(listener, 'checkout'): + self._on_checkout.append(listener) + if hasattr(listener, 'checkin'): + self._on_checkin.append(listener) + def log(self, msg): self.logger.info(msg) @@ -191,6 +215,9 @@ class _ConnectionRecord(object): self.__pool = pool self.connection = self.__connect() self.properties = {} + if pool._on_connect: + for l in pool._on_connect: + l.connect(self.connection, self) def close(self): if self.connection is not None: @@ -209,11 +236,17 @@ class _ConnectionRecord(object): if self.connection is None: self.connection = self.__connect() self.properties.clear() + if self.__pool._on_connect: + for l in self.__pool._on_connect: + l.connect(self.connection, self) elif (self.__pool._recycle > -1 and time.time() - self.starttime > self.__pool._recycle): self.__pool.log("Connection %s exceeded timeout; recycling" % repr(self.connection)) self.__close() self.connection = self.__connect() self.properties.clear() + if self.__pool._on_connect: + for l in self.__pool._on_connect: + l.connect(self.connection, self) return self.connection def __close(self): @@ -305,7 +338,27 @@ class _ConnectionFairy(object): if self.connection is None: raise exceptions.InvalidRequestError("This connection is closed") self.__counter +=1 - return self + + if not self._pool._on_checkout or self.__counter != 1: + return self + + # Pool listeners can trigger a reconnection on checkout + attempts = 2 + while attempts > 0: + try: + for l in self._pool._on_checkout: + l.checkout(self.connection, self._connection_record) + return self + except exceptions.DisconnectionError, e: + self._pool.log( + "Disconnection detected on checkout: %s" % (str(e))) + self._connection_record.invalidate(e) + self.connection = self._connection_record.get_connection() + attempts -= 1 + + self._pool.log("Reconnection attempts exhausted on checkout") + self.invalidate() + raise exceptions.InvalidRequestError("This connection is closed") def detach(self): """Separate this Connection from its Pool. @@ -357,6 +410,9 @@ class _ConnectionFairy(object): if self._connection_record is not None: if self._pool.echo: self._pool.log("Connection %s being returned to pool" % repr(self.connection)) + if self._pool._on_checkin: + for l in self._pool._on_checkin: + l.checkin(self.connection, self._connection_record) self._pool.return_conn(self) self.connection = None self._connection_record = None |
