summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/pool.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2012-06-22 12:24:08 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2012-06-22 12:24:08 -0400
commit5f0a7bb152b30dd7b05771725a7ffe16e3af8f8a (patch)
treebc17038b5eb1a82ce41accbba56d9228d540858f /lib/sqlalchemy/pool.py
parent51a3a9ac8a76096a6a25eb2cc7404970561d5123 (diff)
downloadsqlalchemy-5f0a7bb152b30dd7b05771725a7ffe16e3af8f8a.tar.gz
- [bug] Fixed bug whereby
a disconnect detect + dispose that occurs when the QueuePool has threads waiting for connections would leave those threads waiting for the duration of the timeout on the old pool. The fix now notifies those waiters with a special exception case and has them move onto the new pool. This fix may or may not be ported to 0.7. [ticket:2522]
Diffstat (limited to 'lib/sqlalchemy/pool.py')
-rw-r--r--lib/sqlalchemy/pool.py82
1 files changed, 49 insertions, 33 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 9aabe689b..f9e0f98dd 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -22,7 +22,6 @@ from sqlalchemy import exc, log, event, events, interfaces, util
from sqlalchemy.util import queue as sqla_queue
from sqlalchemy.util import threading, memoized_property, \
chop_traceback
-
proxies = {}
def manage(module, **params):
@@ -212,6 +211,17 @@ class Pool(log.Identified):
raise NotImplementedError()
+ def _replace(self):
+ """Dispose + recreate this pool.
+
+ Subclasses may employ special logic to
+ move threads waiting on this pool to the
+ new one.
+
+ """
+ self.dispose()
+ return self.recreate()
+
def connect(self):
"""Return a DBAPI connection from the pool.
@@ -580,6 +590,12 @@ class SingletonThreadPool(Pool):
self._cleanup()
return c
+class DummyLock(object):
+ def acquire(self, wait=True):
+ return True
+ def release(self):
+ pass
+
class QueuePool(Pool):
"""A :class:`.Pool` that imposes a limit on the number of open connections.
@@ -688,37 +704,26 @@ class QueuePool(Pool):
self._max_overflow = max_overflow
self._timeout = timeout
self._overflow_lock = self._max_overflow > -1 and \
- threading.Lock() or None
-
- def recreate(self):
- self.logger.info("Pool recreating")
- return self.__class__(self._creator, pool_size=self._pool.maxsize,
- max_overflow=self._max_overflow,
- timeout=self._timeout,
- recycle=self._recycle, echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- _dispatch=self.dispatch)
+ threading.Lock() or DummyLock()
def _do_return_conn(self, conn):
try:
self._pool.put(conn, False)
except sqla_queue.Full:
conn.close()
- if self._overflow_lock is None:
+ self._overflow_lock.acquire()
+ try:
self._overflow -= 1
- else:
- self._overflow_lock.acquire()
- try:
- self._overflow -= 1
- finally:
- self._overflow_lock.release()
+ finally:
+ self._overflow_lock.release()
def _do_get(self):
try:
wait = self._max_overflow > -1 and \
self._overflow >= self._max_overflow
return self._pool.get(wait, self._timeout)
+ except sqla_queue.SAAbort, aborted:
+ return aborted.context._do_get()
except sqla_queue.Empty:
if self._max_overflow > -1 and \
self._overflow >= self._max_overflow:
@@ -730,22 +735,27 @@ class QueuePool(Pool):
"connection timed out, timeout %d" %
(self.size(), self.overflow(), self._timeout))
- if self._overflow_lock is not None:
- self._overflow_lock.acquire()
-
- if self._max_overflow > -1 and \
- self._overflow >= self._max_overflow:
- if self._overflow_lock is not None:
- self._overflow_lock.release()
- return self._do_get()
-
+ self._overflow_lock.acquire()
try:
- con = self._create_connection()
- self._overflow += 1
+ if self._max_overflow > -1 and \
+ self._overflow >= self._max_overflow:
+ return self._do_get()
+ else:
+ con = self._create_connection()
+ self._overflow += 1
+ return con
finally:
- if self._overflow_lock is not None:
- self._overflow_lock.release()
- return con
+ self._overflow_lock.release()
+
+ def recreate(self):
+ self.logger.info("Pool recreating")
+ return self.__class__(self._creator, pool_size=self._pool.maxsize,
+ max_overflow=self._max_overflow,
+ timeout=self._timeout,
+ recycle=self._recycle, echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ _dispatch=self.dispatch)
def dispose(self):
while True:
@@ -758,6 +768,12 @@ class QueuePool(Pool):
self._overflow = 0 - self.size()
self.logger.info("Pool disposed. %s", self.status())
+ def _replace(self):
+ self.dispose()
+ np = self.recreate()
+ self._pool.abort(np)
+ return np
+
def status(self):
return "Pool size: %d Connections in pool: %d "\
"Current Overflow: %d Current Checked out "\