diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-08-06 21:11:27 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-08-06 21:11:27 +0000 |
| commit | 8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca (patch) | |
| tree | ae9e27d12c9fbf8297bb90469509e1cb6a206242 /lib/sqlalchemy/pool.py | |
| parent | 7638aa7f242c6ea3d743aa9100e32be2052546a6 (diff) | |
| download | sqlalchemy-8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca.tar.gz | |
merge 0.6 series to trunk.
Diffstat (limited to 'lib/sqlalchemy/pool.py')
| -rw-r--r-- | lib/sqlalchemy/pool.py | 128 |
1 files changed, 57 insertions, 71 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index c4e1af20c..dabdc6e35 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -19,7 +19,7 @@ SQLAlchemy connection pool. import weakref, time, threading from sqlalchemy import exc, log -from sqlalchemy import queue as Queue +from sqlalchemy import queue as sqla_queue from sqlalchemy.util import threading, pickle, as_interface proxies = {} @@ -51,7 +51,7 @@ def clear_managers(): All pools and connections are disposed. """ - for manager in proxies.values(): + for manager in proxies.itervalues(): manager.close() proxies.clear() @@ -108,6 +108,7 @@ class Pool(object): self.echo = echo self.listeners = [] self._on_connect = [] + self._on_first_connect = [] self._on_checkout = [] self._on_checkin = [] @@ -178,12 +179,14 @@ class Pool(object): """ - listener = as_interface( - listener, methods=('connect', 'checkout', 'checkin')) + listener = as_interface(listener, + methods=('connect', 'first_connect', 'checkout', 'checkin')) self.listeners.append(listener) if hasattr(listener, 'connect'): self._on_connect.append(listener) + if hasattr(listener, 'first_connect'): + self._on_first_connect.append(listener) if hasattr(listener, 'checkout'): self._on_checkout.append(listener) if hasattr(listener, 'checkin'): @@ -197,6 +200,10 @@ class _ConnectionRecord(object): self.__pool = pool self.connection = self.__connect() self.info = {} + ls = pool.__dict__.pop('_on_first_connect', None) + if ls is not None: + for l in ls: + l.first_connect(self.connection, self) if pool._on_connect: for l in pool._on_connect: l.connect(self.connection, self) @@ -269,8 +276,11 @@ class _ConnectionRecord(object): def _finalize_fairy(connection, connection_record, pool, ref=None): - if ref is not None and connection_record.backref is not ref: + _refs.discard(connection_record) + + if ref is not None and (connection_record.fairy is not ref or isinstance(pool, AssertionPool)): return + if connection is not None: try: if pool._reset_on_return: @@ -284,7 +294,7 @@ def _finalize_fairy(connection, connection_record, pool, ref=None): if isinstance(e, (SystemExit, KeyboardInterrupt)): raise if connection_record is not None: - connection_record.backref = None + connection_record.fairy = None if pool._should_log_info: pool.log("Connection %r being returned to pool" % connection) if pool._on_checkin: @@ -292,6 +302,8 @@ def _finalize_fairy(connection, connection_record, pool, ref=None): l.checkin(connection, connection_record) pool.return_conn(connection_record) +_refs = set() + class _ConnectionFairy(object): """Proxies a DB-API connection and provides return-on-dereference support.""" @@ -303,7 +315,8 @@ class _ConnectionFairy(object): try: rec = self._connection_record = pool.get() conn = self.connection = self._connection_record.get_connection() - self._connection_record.backref = weakref.ref(self, lambda ref:_finalize_fairy(conn, rec, pool, ref)) + rec.fairy = weakref.ref(self, lambda ref:_finalize_fairy(conn, rec, pool, ref)) + _refs.add(rec) except: self.connection = None # helps with endless __getattr__ loops later on self._connection_record = None @@ -402,8 +415,9 @@ class _ConnectionFairy(object): """ if self._connection_record is not None: + _refs.remove(self._connection_record) + self._connection_record.fairy = None self._connection_record.connection = None - self._connection_record.backref = None self._pool.do_return_conn(self._connection_record) self._detached_info = \ self._connection_record.info.copy() @@ -501,10 +515,8 @@ class SingletonThreadPool(Pool): del self._conn.current def cleanup(self): - for conn in list(self._all_conns): - self._all_conns.discard(conn) - if len(self._all_conns) <= self.size: - return + while len(self._all_conns) > self.size: + self._all_conns.pop() def status(self): return "SingletonThreadPool id:%d size: %d" % (id(self), len(self._all_conns)) @@ -593,7 +605,7 @@ class QueuePool(Pool): """ Pool.__init__(self, creator, **params) - self._pool = Queue.Queue(pool_size) + self._pool = sqla_queue.Queue(pool_size) self._overflow = 0 - pool_size self._max_overflow = max_overflow self._timeout = timeout @@ -606,7 +618,7 @@ class QueuePool(Pool): def do_return_conn(self, conn): try: self._pool.put(conn, False) - except Queue.Full: + except sqla_queue.Full: if self._overflow_lock is None: self._overflow -= 1 else: @@ -620,7 +632,7 @@ class QueuePool(Pool): try: wait = self._max_overflow > -1 and self._overflow >= self._max_overflow return self._pool.get(wait, self._timeout) - except Queue.Empty: + except sqla_queue.Empty: if self._max_overflow > -1 and self._overflow >= self._max_overflow: if not wait: return self.do_get() @@ -648,7 +660,7 @@ class QueuePool(Pool): try: conn = self._pool.get(False) conn.close() - except Queue.Empty: + except sqla_queue.Empty: break self._overflow = 0 - self.size() @@ -747,7 +759,8 @@ class StaticPool(Pool): Pool.__init__(self, creator, **params) self._conn = creator() self.connection = _ConnectionRecord(self) - + self.connection = None + def status(self): return "StaticPool" @@ -788,68 +801,41 @@ class AssertionPool(Pool): ## TODO: modify this to handle an arbitrary connection count. - def __init__(self, creator, **params): - """ - Construct an AssertionPool. - - :param creator: a callable function that returns a DB-API - connection object. The function will be called with - parameters. - - :param recycle: If set to non -1, number of seconds between - connection recycling, which means upon checkout, if this - timeout is surpassed the connection will be closed and - replaced with a newly opened connection. Defaults to -1. - - :param echo: If True, connections being pulled and retrieved - from the pool will be logged to the standard output, as well - as pool sizing information. Echoing can also be achieved by - enabling logging for the "sqlalchemy.pool" - namespace. Defaults to False. - - :param use_threadlocal: If set to True, repeated calls to - :meth:`connect` within the same application thread will be - guaranteed to return the same connection object, if one has - already been retrieved from the pool and has not been - returned yet. Offers a slight performance advantage at the - cost of individual transactions by default. The - :meth:`unique_connection` method is provided to bypass the - threadlocal behavior installed into :meth:`connect`. - - :param reset_on_return: If true, reset the database state of - connections returned to the pool. This is typically a - ROLLBACK to release locks and transaction resources. - Disable at your own peril. Defaults to True. - - :param listeners: A list of - :class:`~sqlalchemy.interfaces.PoolListener`-like objects or - dictionaries of callables that receive events when DB-API - connections are created, checked out and checked in to the - pool. - - """ - Pool.__init__(self, creator, **params) - self.connection = _ConnectionRecord(self) - self._conn = self.connection - + def __init__(self, *args, **kw): + self._conn = None + self._checked_out = False + Pool.__init__(self, *args, **kw) + def status(self): return "AssertionPool" - def create_connection(self): - raise AssertionError("Invalid") - def do_return_conn(self, conn): - assert conn is self._conn and self.connection is None - self.connection = conn + if not self._checked_out: + raise AssertionError("connection is not checked out") + self._checked_out = False + assert conn is self._conn def do_return_invalid(self, conn): - raise AssertionError("Invalid") + self._conn = None + self._checked_out = False + + def dispose(self): + self._checked_out = False + self._conn.close() + def recreate(self): + self.log("Pool recreating") + return AssertionPool(self._creator, echo=self._should_log_info, listeners=self.listeners) + def do_get(self): - assert self.connection is not None - c = self.connection - self.connection = None - return c + if self._checked_out: + raise AssertionError("connection is already checked out") + + if not self._conn: + self._conn = self.create_connection() + + self._checked_out = True + return self._conn class _DBProxy(object): """Layers connection pooling behavior on top of a standard DB-API module. |
