summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/pool.py')
-rw-r--r--lib/sqlalchemy/pool.py364
1 files changed, 226 insertions, 138 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 640277270..787fd059f 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -5,17 +5,20 @@
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""provides a connection pool implementation, which optionally manages connections
-on a thread local basis. Also provides a DBAPI2 transparency layer so that pools can
-be managed automatically, based on module type and connect arguments,
- simply by calling regular DBAPI connect() methods."""
+"""Provide a connection pool implementation, which optionally manages
+connections on a thread local basis.
+
+Also provides a DBAPI2 transparency layer so that pools can be managed
+automatically, based on module type and connect arguments, simply by
+calling regular DBAPI connect() methods.
+"""
import weakref, string, time, sys, traceback
try:
import cPickle as pickle
except:
import pickle
-
+
from sqlalchemy import exceptions, logging
from sqlalchemy import queue as Queue
@@ -27,72 +30,101 @@ except:
proxies = {}
def manage(module, **params):
- """given a DBAPI2 module and pool management parameters, returns a proxy for the module
- that will automatically pool connections, creating new connection pools for each
- distinct set of connection arguments sent to the decorated module's connect() function.
+ """Return a proxy for module that automatically pools connections.
+
+ Given a DBAPI2 module and pool management parameters, returns a
+ proxy for the module that will automatically pool connections,
+ creating new connection pools for each distinct set of connection
+ arguments sent to the decorated module's connect() function.
Arguments:
- module : a DBAPI2 database module.
- poolclass=QueuePool : the class used by the pool module to provide pooling.
-
- Options:
- See Pool for options.
+ module
+ A DBAPI2 database module.
+
+ poolclass
+ The class used by the pool module to provide pooling.
+ Defaults to ``QueuePool``.
+
+ See the ``Pool`` class for options.
"""
try:
return proxies[module]
except KeyError:
- return proxies.setdefault(module, _DBProxy(module, **params))
+ return proxies.setdefault(module, _DBProxy(module, **params))
def clear_managers():
- """removes all current DBAPI2 managers. all pools and connections are disposed."""
+ """Remove all current DBAPI2 managers.
+
+ All pools and connections are disposed.
+ """
+
for manager in proxies.values():
manager.close()
proxies.clear()
-
+
class Pool(object):
- """Base Pool class. This is an abstract class, which is implemented by various subclasses
- including:
-
- QueuePool - pools multiple connections using Queue.Queue
-
- SingletonThreadPool - stores a single connection per execution thread
-
- NullPool - doesnt do any pooling; opens and closes connections
-
- AssertionPool - stores only one connection, and asserts that only one connection is checked out at a time.
-
- the main argument, "creator", is a callable function that returns a newly connected DBAPI connection
- object.
-
+ """Base Pool class.
+
+ This is an abstract class, which is implemented by various
+ subclasses including:
+
+ QueuePool
+ Pools multiple connections using ``Queue.Queue``.
+
+ SingletonThreadPool
+ Stores a single connection per execution thread.
+
+ NullPool
+ Doesn't do any pooling; opens and closes connections.
+
+ AssertionPool
+ Stores only one connection, and asserts that only one connection
+ is checked out at a time.
+
+ The main argument, `creator`, is a callable function that returns
+ a newly connected DBAPI connection object.
+
Options that are understood by Pool are:
-
- echo=False : if set to True, connections being pulled and retrieved from/to the pool will
- be logged to the standard output, as well as pool sizing information. Echoing can also
- be achieved by enabling logging for the "sqlalchemy.pool" namespace.
-
- use_threadlocal=True : if set to True, repeated calls to connect() within the same
- application thread will be guaranteed to return the same connection object, if one has
- already been retrieved from the pool and has not been returned yet. This allows code to
- retrieve a connection from the pool, and then while still holding on to that connection,
- to call other functions which also ask the pool for a connection of the same arguments;
- those functions will act upon the same connection that the calling method is using.
-
- recycle=-1 : if set to non -1, a number of seconds between connection recycling, which
- means upon checkout, if this timeout is surpassed the connection will be closed and replaced
- with a newly opened connection.
-
- auto_close_cursors = True : cursors, returned by connection.cursor(), are tracked and are
- automatically closed when the connection is returned to the pool. some DBAPIs like MySQLDB
- become unstable if cursors remain open.
-
- disallow_open_cursors = False : if auto_close_cursors is False, and disallow_open_cursors is True,
- will raise an exception if an open cursor is detected upon connection checkin.
-
- If auto_close_cursors and disallow_open_cursors are both False, then no cursor processing
- occurs upon checkin.
-
+
+ echo
+ If set to True, connections being pulled and retrieved from/to
+ the pool will be logged to the standard output, as well as pool
+ sizing information. Echoing can also be achieved by enabling
+ logging for the "sqlalchemy.pool" namespace. Defaults to False.
+
+ use_threadlocal
+ If set to True, repeated calls to ``connect()`` within the same
+ application thread will be guaranteed to return the same
+ connection object, if one has already been retrieved from the
+ pool and has not been returned yet. This allows code to retrieve
+ a connection from the pool, and then while still holding on to
+ that connection, to call other functions which also ask the pool
+ for a connection of the same arguments; those functions will act
+ upon the same connection that the calling method is using.
+ Defaults to True.
+
+ recycle
+ If set to non -1, a number of seconds between connection
+ recycling, which means upon checkout, if this timeout is
+ surpassed the connection will be closed and replaced with a
+ newly opened connection. Defaults to -1.
+
+ auto_close_cursors
+ Cursors, returned by ``connection.cursor()``, are tracked and
+ are automatically closed when the connection is returned to the
+ pool. Some DBAPIs like MySQLDB become unstable if cursors
+ remain open. Defaults to True.
+
+ disallow_open_cursors
+ If `auto_close_cursors` is False, and `disallow_open_cursors` is
+ True, will raise an exception if an open cursor is detected upon
+ connection checkin. Defaults to False.
+
+ If `auto_close_cursors` and `disallow_open_cursors` are both
+ False, then no cursor processing occurs upon checkin.
"""
+
def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False, auto_close_cursors=True, disallow_open_cursors=False):
self.logger = logging.instance_logger(self)
self._threadconns = weakref.WeakValueDictionary()
@@ -103,17 +135,17 @@ class Pool(object):
self.disallow_open_cursors = disallow_open_cursors
self.echo = echo
echo = logging.echo_property()
-
+
def unique_connection(self):
return _ConnectionFairy(self).checkout()
-
+
def create_connection(self):
return _ConnectionRecord(self)
-
+
def connect(self):
if not self._use_threadlocal:
return _ConnectionFairy(self).checkout()
-
+
try:
return self._threadconns[thread.get_ident()].connfairy().checkout()
except KeyError:
@@ -126,13 +158,13 @@ class Pool(object):
def get(self):
return self.do_get()
-
+
def do_get(self):
raise NotImplementedError()
-
+
def do_return_conn(self, conn):
raise NotImplementedError()
-
+
def status(self):
raise NotImplementedError()
@@ -141,19 +173,21 @@ class Pool(object):
def dispose(self):
raise NotImplementedError()
-
class _ConnectionRecord(object):
def __init__(self, pool):
self.__pool = pool
self.connection = self.__connect()
+
def close(self):
self.__pool.log("Closing connection %s" % repr(self.connection))
self.connection.close()
+
def invalidate(self):
self.__pool.log("Invalidate connection %s" % repr(self.connection))
self.__close()
self.connection = None
+
def get_connection(self):
if self.connection is None:
self.connection = self.__connect()
@@ -162,12 +196,14 @@ class _ConnectionRecord(object):
self.__close()
self.connection = self.__connect()
return self.connection
+
def __close(self):
try:
self.__pool.log("Closing connection %s" % (repr(self.connection)))
self.connection.close()
except Exception, e:
self.__pool.log("Connection %s threw an error on close: %s" % (repr(self.connection), str(e)))
+
def __connect(self):
try:
self.starttime = time.time()
@@ -179,12 +215,14 @@ class _ConnectionRecord(object):
raise
class _ThreadFairy(object):
- """marks a thread identifier as owning a connection, for a thread local pool."""
+ """Mark a thread identifier as owning a connection, for a thread local pool."""
+
def __init__(self, connfairy):
self.connfairy = weakref.ref(connfairy)
-
+
class _ConnectionFairy(object):
- """proxies a DBAPI connection object and provides return-on-dereference support"""
+ """Proxy a DBAPI connection object and provides return-on-dereference support."""
+
def __init__(self, pool):
self._threadfairy = _ThreadFairy(self)
self._cursors = weakref.WeakKeyDictionary()
@@ -199,6 +237,7 @@ class _ConnectionFairy(object):
raise
if self.__pool.echo:
self.__pool.log("Connection %s checked out from pool" % repr(self.connection))
+
def invalidate(self):
if self.connection is None:
raise exceptions.InvalidRequestError("This connection is closed")
@@ -206,29 +245,36 @@ class _ConnectionFairy(object):
self.connection = None
self._cursors = None
self._close()
+
def cursor(self, *args, **kwargs):
try:
return _CursorFairy(self, self.connection.cursor(*args, **kwargs))
except Exception, e:
self.invalidate()
raise
+
def __getattr__(self, key):
return getattr(self.connection, key)
+
def checkout(self):
if self.connection is None:
raise exceptions.InvalidRequestError("This connection is closed")
self.__counter +=1
- return self
+ return self
+
def close_open_cursors(self):
if self._cursors is not None:
for c in list(self._cursors):
c.close()
+
def close(self):
self.__counter -=1
if self.__counter == 0:
self._close()
+
def __del__(self):
self._close()
+
def _close(self):
if self._cursors is not None:
# cursors should be closed before connection is returned to the pool. some dbapis like
@@ -252,31 +298,38 @@ class _ConnectionFairy(object):
self._connection_record = None
self._threadfairy = None
self._cursors = None
-
+
class _CursorFairy(object):
def __init__(self, parent, cursor):
self.__parent = parent
self.__parent._cursors[self]=True
self.cursor = cursor
+
def invalidate(self):
self.__parent.invalidate()
+
def close(self):
if self in self.__parent._cursors:
del self.__parent._cursors[self]
self.cursor.close()
+
def __getattr__(self, key):
return getattr(self.cursor, key)
-
+
class SingletonThreadPool(Pool):
- """Maintains one connection per each thread, never moving a connection to a thread
- other than the one which it was created in.
-
- this is used for SQLite, which both does not handle multithreading by default,
- and also requires a singleton connection if a :memory: database is being used.
-
- options are the same as those of Pool, as well as:
-
- pool_size=5 - the number of threads in which to maintain connections at once."""
+ """Maintain one connection per each thread, never moving a
+ connection to a thread other than the one which it was created in.
+
+ This is used for SQLite, which both does not handle multithreading
+ by default, and also requires a singleton connection if a :memory:
+ database is being used.
+
+ Options are the same as those of Pool, as well as:
+
+ pool_size : 5
+ The number of threads in which to maintain connections at once.
+ """
+
def __init__(self, creator, pool_size=5, **params):
params['use_threadlocal'] = True
Pool.__init__(self, creator, **params)
@@ -291,13 +344,13 @@ class SingletonThreadPool(Pool):
# sqlite won't even let you close a conn from a thread that didn't create it
pass
del self._conns[key]
-
+
def dispose_local(self):
try:
del self._conns[thread.get_ident()]
except KeyError:
pass
-
+
def cleanup(self):
for key in self._conns.keys():
try:
@@ -306,13 +359,13 @@ class SingletonThreadPool(Pool):
pass
if len(self._conns) <= self.size:
return
-
+
def status(self):
return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns))
def do_return_conn(self, conn):
pass
-
+
def do_get(self):
try:
return self._conns[thread.get_ident()]
@@ -322,35 +375,45 @@ class SingletonThreadPool(Pool):
if len(self._conns) > self.size:
self.cleanup()
return c
-
+
class QueuePool(Pool):
- """uses Queue.Queue to maintain a fixed-size list of connections.
-
- Arguments include all those used by the base Pool class, as well as:
-
- pool_size=5 : the size of the pool to be maintained. This is the
- largest number of connections that will be kept persistently in the pool. Note that the
- pool begins with no connections; once this number of connections is requested, that
- number of connections will remain.
-
- max_overflow=10 : the maximum overflow size of the pool. When the number of checked-out
- connections reaches the size set in pool_size, additional connections will be returned up
- to this limit. When those additional connections are returned to the pool, they are
- disconnected and discarded. It follows then that the total number of simultaneous
- connections the pool will allow is pool_size + max_overflow, and the total number of
- "sleeping" connections the pool will allow is pool_size. max_overflow can be set to -1 to
- indicate no overflow limit; no limit will be placed on the total number of concurrent
- connections.
-
- timeout=30 : the number of seconds to wait before giving up on returning a connection
+ """Use ``Queue.Queue`` to maintain a fixed-size list of connections.
+
+ Arguments include all those used by the base Pool class, as well
+ as:
+
+ pool_size
+ The size of the pool to be maintained. This is the largest
+ number of connections that will be kept persistently in the
+ pool. Note that the pool begins with no connections; once this
+ number of connections is requested, that number of connections
+ will remain. Defaults to 5.
+
+ max_overflow
+ The maximum overflow size of the pool. When the number of
+ checked-out connections reaches the size set in pool_size,
+ additional connections will be returned up to this limit. When
+ those additional connections are returned to the pool, they are
+ disconnected and discarded. It follows then that the total
+ number of simultaneous connections the pool will allow is
+ pool_size + `max_overflow`, and the total number of "sleeping"
+ connections the pool will allow is pool_size. `max_overflow` can
+ be set to -1 to indicate no overflow limit; no limit will be
+ placed on the total number of concurrent connections. Defaults
+ to 10.
+
+ timeout
+ The number of seconds to wait before giving up on returning a
+ connection. Defaults to 30.
"""
+
def __init__(self, creator, pool_size = 5, max_overflow = 10, timeout=30, **params):
Pool.__init__(self, creator, **params)
self._pool = Queue.Queue(pool_size)
self._overflow = 0 - pool_size
self._max_overflow = max_overflow
self._timeout = timeout
-
+
def do_return_conn(self, conn):
try:
self._pool.put(conn, False)
@@ -374,29 +437,33 @@ class QueuePool(Pool):
conn.close()
except Queue.Empty:
break
-
+
self._overflow = 0 - self.size()
self.log("Pool disposed. " + self.status())
def status(self):
tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout())
return "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup
-
+
def size(self):
return self._pool.maxsize
-
+
def checkedin(self):
return self._pool.qsize()
-
+
def overflow(self):
return self._overflow
-
+
def checkedout(self):
return self._pool.maxsize - self._pool.qsize() + self._overflow
class NullPool(Pool):
- """a Pool implementation which does not pool connections; instead
- it literally opens and closes the underlying DBAPI connection per each connection open/close."""
+ """A Pool implementation which does not pool connections.
+
+ Instead it literally opens and closes the underlying DBAPI
+ connection per each connection open/close.
+ """
+
def status(self):
return "NullPool"
@@ -407,14 +474,19 @@ class NullPool(Pool):
pass
def do_get(self):
- return self.create_connection()
+ return self.create_connection()
class AssertionPool(Pool):
- """a Pool implementation which will raise an exception
- if more than one connection is checked out at a time. Useful for debugging
- code that is using more connections than desired.
-
- TODO: modify this to handle an arbitrary connection count."""
+ """A Pool implementation that allows at most one checked out
+ connection at a time.
+
+ This will raise an exception if more than one connection is
+ checked out at a time. Useful for debugging code that is using
+ more connections than desired.
+ """
+
+ ## TODO: modify this to handle an arbitrary connection count.
+
def __init__(self, creator, **params):
Pool.__init__(self, creator, **params)
self.connection = _ConnectionRecord(self)
@@ -438,17 +510,25 @@ class AssertionPool(Pool):
c = self.connection
self.connection = None
return c
-
+
class _DBProxy(object):
- """proxies a DBAPI2 connect() call to a pooled connection keyed to the specific connect
- parameters. other attributes are proxied through via __getattr__."""
-
+ """Proxy a DBAPI2 connect() call to a pooled connection keyed to
+ the specific connect parameters. Other attributes are proxied
+ through via __getattr__.
+ """
+
def __init__(self, module, poolclass = QueuePool, **params):
+ """Initialize a new proxy.
+
+ module
+ a DBAPI2 module.
+
+ poolclass
+ a Pool class, defaulting to QueuePool.
+
+ Other parameters are sent to the Pool object's constructor.
"""
- module is a DBAPI2 module
- poolclass is a Pool class, defaulting to QueuePool.
- other parameters are sent to the Pool object's constructor.
- """
+
self.module = module
self.params = params
self.poolclass = poolclass
@@ -460,10 +540,10 @@ class _DBProxy(object):
def __del__(self):
self.close()
-
+
def __getattr__(self, key):
return getattr(self.module, key)
-
+
def get_pool(self, *args, **params):
key = self._serialize(*args, **params)
try:
@@ -472,24 +552,32 @@ class _DBProxy(object):
pool = self.poolclass(lambda: self.module.connect(*args, **params), **self.params)
self.pools[key] = pool
return pool
-
+
def connect(self, *args, **params):
- """connects to a database using this DBProxy's module and the given connect
- arguments. if the arguments match an existing pool, the connection will be returned
- from the pool's current thread-local connection instance, or if there is no
- thread-local connection instance it will be checked out from the set of pooled
- connections. If the pool has no available connections and allows new connections to
- be created, a new database connection will be made."""
+ """Activate a connection to the database.
+
+ Connect to the database using this DBProxy's module and the
+ given connect arguments. If the arguments match an existing
+ pool, the connection will be returned from the pool's current
+ thread-local connection instance, or if there is no
+ thread-local connection instance it will be checked out from
+ the set of pooled connections.
+
+ If the pool has no available connections and allows new
+ connections to be created, a new database connection will be
+ made.
+ """
+
return self.get_pool(*args, **params).connect()
-
+
def dispose(self, *args, **params):
- """disposes the connection pool referenced by the given connect arguments."""
+ """Dispose the connection pool referenced by the given connect arguments."""
+
key = self._serialize(*args, **params)
try:
del self.pools[key]
except KeyError:
pass
-
+
def _serialize(self, *args, **params):
return pickle.dumps([args, params])
-