summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/pool.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2009-08-06 21:11:27 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2009-08-06 21:11:27 +0000
commit8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca (patch)
treeae9e27d12c9fbf8297bb90469509e1cb6a206242 /lib/sqlalchemy/pool.py
parent7638aa7f242c6ea3d743aa9100e32be2052546a6 (diff)
downloadsqlalchemy-8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca.tar.gz
merge 0.6 series to trunk.
Diffstat (limited to 'lib/sqlalchemy/pool.py')
-rw-r--r--lib/sqlalchemy/pool.py128
1 files changed, 57 insertions, 71 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index c4e1af20c..dabdc6e35 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -19,7 +19,7 @@ SQLAlchemy connection pool.
import weakref, time, threading
from sqlalchemy import exc, log
-from sqlalchemy import queue as Queue
+from sqlalchemy import queue as sqla_queue
from sqlalchemy.util import threading, pickle, as_interface
proxies = {}
@@ -51,7 +51,7 @@ def clear_managers():
All pools and connections are disposed.
"""
- for manager in proxies.values():
+ for manager in proxies.itervalues():
manager.close()
proxies.clear()
@@ -108,6 +108,7 @@ class Pool(object):
self.echo = echo
self.listeners = []
self._on_connect = []
+ self._on_first_connect = []
self._on_checkout = []
self._on_checkin = []
@@ -178,12 +179,14 @@ class Pool(object):
"""
- listener = as_interface(
- listener, methods=('connect', 'checkout', 'checkin'))
+ listener = as_interface(listener,
+ methods=('connect', 'first_connect', 'checkout', 'checkin'))
self.listeners.append(listener)
if hasattr(listener, 'connect'):
self._on_connect.append(listener)
+ if hasattr(listener, 'first_connect'):
+ self._on_first_connect.append(listener)
if hasattr(listener, 'checkout'):
self._on_checkout.append(listener)
if hasattr(listener, 'checkin'):
@@ -197,6 +200,10 @@ class _ConnectionRecord(object):
self.__pool = pool
self.connection = self.__connect()
self.info = {}
+ ls = pool.__dict__.pop('_on_first_connect', None)
+ if ls is not None:
+ for l in ls:
+ l.first_connect(self.connection, self)
if pool._on_connect:
for l in pool._on_connect:
l.connect(self.connection, self)
@@ -269,8 +276,11 @@ class _ConnectionRecord(object):
def _finalize_fairy(connection, connection_record, pool, ref=None):
- if ref is not None and connection_record.backref is not ref:
+ _refs.discard(connection_record)
+
+ if ref is not None and (connection_record.fairy is not ref or isinstance(pool, AssertionPool)):
return
+
if connection is not None:
try:
if pool._reset_on_return:
@@ -284,7 +294,7 @@ def _finalize_fairy(connection, connection_record, pool, ref=None):
if isinstance(e, (SystemExit, KeyboardInterrupt)):
raise
if connection_record is not None:
- connection_record.backref = None
+ connection_record.fairy = None
if pool._should_log_info:
pool.log("Connection %r being returned to pool" % connection)
if pool._on_checkin:
@@ -292,6 +302,8 @@ def _finalize_fairy(connection, connection_record, pool, ref=None):
l.checkin(connection, connection_record)
pool.return_conn(connection_record)
+_refs = set()
+
class _ConnectionFairy(object):
"""Proxies a DB-API connection and provides return-on-dereference support."""
@@ -303,7 +315,8 @@ class _ConnectionFairy(object):
try:
rec = self._connection_record = pool.get()
conn = self.connection = self._connection_record.get_connection()
- self._connection_record.backref = weakref.ref(self, lambda ref:_finalize_fairy(conn, rec, pool, ref))
+ rec.fairy = weakref.ref(self, lambda ref:_finalize_fairy(conn, rec, pool, ref))
+ _refs.add(rec)
except:
self.connection = None # helps with endless __getattr__ loops later on
self._connection_record = None
@@ -402,8 +415,9 @@ class _ConnectionFairy(object):
"""
if self._connection_record is not None:
+ _refs.remove(self._connection_record)
+ self._connection_record.fairy = None
self._connection_record.connection = None
- self._connection_record.backref = None
self._pool.do_return_conn(self._connection_record)
self._detached_info = \
self._connection_record.info.copy()
@@ -501,10 +515,8 @@ class SingletonThreadPool(Pool):
del self._conn.current
def cleanup(self):
- for conn in list(self._all_conns):
- self._all_conns.discard(conn)
- if len(self._all_conns) <= self.size:
- return
+ while len(self._all_conns) > self.size:
+ self._all_conns.pop()
def status(self):
return "SingletonThreadPool id:%d size: %d" % (id(self), len(self._all_conns))
@@ -593,7 +605,7 @@ class QueuePool(Pool):
"""
Pool.__init__(self, creator, **params)
- self._pool = Queue.Queue(pool_size)
+ self._pool = sqla_queue.Queue(pool_size)
self._overflow = 0 - pool_size
self._max_overflow = max_overflow
self._timeout = timeout
@@ -606,7 +618,7 @@ class QueuePool(Pool):
def do_return_conn(self, conn):
try:
self._pool.put(conn, False)
- except Queue.Full:
+ except sqla_queue.Full:
if self._overflow_lock is None:
self._overflow -= 1
else:
@@ -620,7 +632,7 @@ class QueuePool(Pool):
try:
wait = self._max_overflow > -1 and self._overflow >= self._max_overflow
return self._pool.get(wait, self._timeout)
- except Queue.Empty:
+ except sqla_queue.Empty:
if self._max_overflow > -1 and self._overflow >= self._max_overflow:
if not wait:
return self.do_get()
@@ -648,7 +660,7 @@ class QueuePool(Pool):
try:
conn = self._pool.get(False)
conn.close()
- except Queue.Empty:
+ except sqla_queue.Empty:
break
self._overflow = 0 - self.size()
@@ -747,7 +759,8 @@ class StaticPool(Pool):
Pool.__init__(self, creator, **params)
self._conn = creator()
self.connection = _ConnectionRecord(self)
-
+ self.connection = None
+
def status(self):
return "StaticPool"
@@ -788,68 +801,41 @@ class AssertionPool(Pool):
## TODO: modify this to handle an arbitrary connection count.
- def __init__(self, creator, **params):
- """
- Construct an AssertionPool.
-
- :param creator: a callable function that returns a DB-API
- connection object. The function will be called with
- parameters.
-
- :param recycle: If set to non -1, number of seconds between
- connection recycling, which means upon checkout, if this
- timeout is surpassed the connection will be closed and
- replaced with a newly opened connection. Defaults to -1.
-
- :param echo: If True, connections being pulled and retrieved
- from the pool will be logged to the standard output, as well
- as pool sizing information. Echoing can also be achieved by
- enabling logging for the "sqlalchemy.pool"
- namespace. Defaults to False.
-
- :param use_threadlocal: If set to True, repeated calls to
- :meth:`connect` within the same application thread will be
- guaranteed to return the same connection object, if one has
- already been retrieved from the pool and has not been
- returned yet. Offers a slight performance advantage at the
- cost of individual transactions by default. The
- :meth:`unique_connection` method is provided to bypass the
- threadlocal behavior installed into :meth:`connect`.
-
- :param reset_on_return: If true, reset the database state of
- connections returned to the pool. This is typically a
- ROLLBACK to release locks and transaction resources.
- Disable at your own peril. Defaults to True.
-
- :param listeners: A list of
- :class:`~sqlalchemy.interfaces.PoolListener`-like objects or
- dictionaries of callables that receive events when DB-API
- connections are created, checked out and checked in to the
- pool.
-
- """
- Pool.__init__(self, creator, **params)
- self.connection = _ConnectionRecord(self)
- self._conn = self.connection
-
+ def __init__(self, *args, **kw):
+ self._conn = None
+ self._checked_out = False
+ Pool.__init__(self, *args, **kw)
+
def status(self):
return "AssertionPool"
- def create_connection(self):
- raise AssertionError("Invalid")
-
def do_return_conn(self, conn):
- assert conn is self._conn and self.connection is None
- self.connection = conn
+ if not self._checked_out:
+ raise AssertionError("connection is not checked out")
+ self._checked_out = False
+ assert conn is self._conn
def do_return_invalid(self, conn):
- raise AssertionError("Invalid")
+ self._conn = None
+ self._checked_out = False
+
+ def dispose(self):
+ self._checked_out = False
+ self._conn.close()
+ def recreate(self):
+ self.log("Pool recreating")
+ return AssertionPool(self._creator, echo=self._should_log_info, listeners=self.listeners)
+
def do_get(self):
- assert self.connection is not None
- c = self.connection
- self.connection = None
- return c
+ if self._checked_out:
+ raise AssertionError("connection is already checked out")
+
+ if not self._conn:
+ self._conn = self.create_connection()
+
+ self._checked_out = True
+ return self._conn
class _DBProxy(object):
"""Layers connection pooling behavior on top of a standard DB-API module.