summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/pool.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2006-05-25 14:20:23 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2006-05-25 14:20:23 +0000
commitbb79e2e871d0a4585164c1a6ed626d96d0231975 (patch)
tree6d457ba6c36c408b45db24ec3c29e147fe7504ff /lib/sqlalchemy/pool.py
parent4fc3a0648699c2b441251ba4e1d37a9107bd1986 (diff)
downloadsqlalchemy-bb79e2e871d0a4585164c1a6ed626d96d0231975.tar.gz
merged 0.2 branch into trunk; 0.1 now in sqlalchemy/branches/rel_0_1
Diffstat (limited to 'lib/sqlalchemy/pool.py')
-rw-r--r--lib/sqlalchemy/pool.py86
1 files changed, 48 insertions, 38 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 1603c9873..4452f6419 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -70,41 +70,38 @@ def clear_managers():
class Pool(object):
- def __init__(self, echo = False, use_threadlocal = True, logger=None):
- self._threadconns = weakref.WeakValueDictionary()
+ def __init__(self, echo = False, use_threadlocal = True, logger=None, **kwargs):
+ self._threadconns = {} #weakref.WeakValueDictionary()
self._use_threadlocal = use_threadlocal
- self._echo = echo
+ self.echo = echo
self._logger = logger or util.Logger(origin='pool')
def unique_connection(self):
- return ConnectionFairy(self)
+ return ConnectionFairy(self).checkout()
def connect(self):
if not self._use_threadlocal:
- return ConnectionFairy(self)
+ return ConnectionFairy(self).checkout()
try:
- return self._threadconns[thread.get_ident()]
+ return self._threadconns[thread.get_ident()].checkout()
except KeyError:
- agent = ConnectionFairy(self)
+ agent = ConnectionFairy(self).checkout()
self._threadconns[thread.get_ident()] = agent
return agent
- def return_conn(self, conn):
- if self._echo:
- self.log("return connection to pool")
- self.do_return_conn(conn)
+ def return_conn(self, agent):
+ if self._use_threadlocal:
+ try:
+ del self._threadconns[thread.get_ident()]
+ except KeyError:
+ pass
+ self.do_return_conn(agent.connection)
def get(self):
- if self._echo:
- self.log("get connection from pool")
- self.log(self.status())
return self.do_get()
def return_invalid(self):
- if self._echo:
- self.log("return invalid connection to pool")
- self.log(self.status())
self.do_return_invalid()
def do_get(self):
@@ -125,6 +122,7 @@ class Pool(object):
class ConnectionFairy(object):
def __init__(self, pool, connection=None):
self.pool = pool
+ self.__counter = 0
if connection is not None:
self.connection = connection
else:
@@ -134,16 +132,35 @@ class ConnectionFairy(object):
self.connection = None
self.pool.return_invalid()
raise
+ if self.pool.echo:
+ self.pool.log("Connection %s checked out from pool" % repr(self.connection))
def invalidate(self):
+ if self.pool.echo:
+ self.pool.log("Invalidate connection %s" % repr(self.connection))
+ self.connection.rollback()
self.connection = None
self.pool.return_invalid()
def cursor(self):
return CursorFairy(self, self.connection.cursor())
def __getattr__(self, key):
return getattr(self.connection, key)
+ def checkout(self):
+ if self.connection is None:
+ raise "this connection is closed"
+ self.__counter +=1
+ return self
+ def close(self):
+ self.__counter -=1
+ if self.__counter == 0:
+ self._close()
def __del__(self):
+ self._close()
+ def _close(self):
if self.connection is not None:
- self.pool.return_conn(self.connection)
+ if self.pool.echo:
+ self.pool.log("Connection %s being returned to pool" % repr(self.connection))
+ self.connection.rollback()
+ self.pool.return_conn(self)
self.pool = None
self.connection = None
@@ -156,19 +173,18 @@ class CursorFairy(object):
class SingletonThreadPool(Pool):
"""Maintains one connection per each thread, never moving to another thread. this is
- used for SQLite and other databases with a similar restriction."""
+ used for SQLite."""
def __init__(self, creator, **params):
Pool.__init__(self, **params)
self._conns = {}
self._creator = creator
def status(self):
- return "SingletonThreadPool thread:%d size: %d" % (thread.get_ident(), len(self._conns))
+ return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns))
def do_return_conn(self, conn):
- if self._conns.get(thread.get_ident(), None) is None:
- self._conns[thread.get_ident()] = conn
-
+ pass
+
def do_return_invalid(self):
try:
del self._conns[thread.get_ident()]
@@ -177,54 +193,48 @@ class SingletonThreadPool(Pool):
def do_get(self):
try:
- c = self._conns[thread.get_ident()]
- if c is None:
- return self._creator()
+ return self._conns[thread.get_ident()]
except KeyError:
c = self._creator()
- self._conns[thread.get_ident()] = None
- return c
+ self._conns[thread.get_ident()] = c
+ return c
class QueuePool(Pool):
"""uses Queue.Queue to maintain a fixed-size list of connections."""
- def __init__(self, creator, pool_size = 5, max_overflow = 10, **params):
+ def __init__(self, creator, pool_size = 5, max_overflow = 10, timeout=30, **params):
Pool.__init__(self, **params)
self._creator = creator
self._pool = Queue.Queue(pool_size)
self._overflow = 0 - pool_size
self._max_overflow = max_overflow
+ self._timeout = timeout
def do_return_conn(self, conn):
- if self._echo:
- self.log("return QP connection to pool")
try:
self._pool.put(conn, False)
except Queue.Full:
self._overflow -= 1
def do_return_invalid(self):
- if self._echo:
- self.log("return invalid connection")
if self._pool.full():
self._overflow -= 1
def do_get(self):
- if self._echo:
- self.log("get QP connection from pool")
- self.log(self.status())
try:
- return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow)
+ return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout)
except Queue.Empty:
self._overflow += 1
return self._creator()
- def __del__(self):
+ def dispose(self):
while True:
try:
conn = self._pool.get(False)
conn.close()
except Queue.Empty:
break
+ def __del__(self):
+ self.dispose()
def status(self):
tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout())