summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/pool.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2008-08-13 22:41:17 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2008-08-13 22:41:17 +0000
commitcd7678a965594ff2db153a7cade0fe8555bd0d38 (patch)
tree51d029be7b976c0c9da0cb0f265435b5d1caa0e9 /lib/sqlalchemy/pool.py
parentc374f7b8b52b4169d41cf8d2cfe310ed5fd5f925 (diff)
downloadsqlalchemy-cd7678a965594ff2db153a7cade0fe8555bd0d38.tar.gz
- with 2.3 support dropped,
all usage of thread.get_ident() is removed, and replaced with threading.local() usage. this allows potentially faster and safer thread local access.
Diffstat (limited to 'lib/sqlalchemy/pool.py')
-rw-r--r--lib/sqlalchemy/pool.py88
1 files changed, 42 insertions, 46 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py
index 6cc05b287..ddf7cb51e 100644
--- a/lib/sqlalchemy/pool.py
+++ b/lib/sqlalchemy/pool.py
@@ -16,7 +16,7 @@ regular DB-API connect() methods to be transparently managed by a
SQLAlchemy connection pool.
"""
-import weakref, time
+import weakref, time, threading
from sqlalchemy import exc, log
from sqlalchemy import queue as Queue
@@ -119,11 +119,7 @@ class Pool(object):
def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False,
reset_on_return=True, listeners=None):
self.logger = log.instance_logger(self, echoflag=echo)
- # the WeakValueDictionary works more nicely than a regular dict of
- # weakrefs. the latter can pile up dead reference objects which don't
- # get cleaned out. WVD adds from 1-6 method calls to a checkout
- # operation.
- self._threadconns = weakref.WeakValueDictionary()
+ self._threadconns = threading.local()
self._creator = creator
self._recycle = recycle
self._use_threadlocal = use_threadlocal
@@ -165,15 +161,15 @@ class Pool(object):
return _ConnectionFairy(self).checkout()
try:
- return self._threadconns[thread.get_ident()].checkout()
- except KeyError:
+ return self._threadconns.current().checkout()
+ except AttributeError:
agent = _ConnectionFairy(self)
- self._threadconns[thread.get_ident()] = agent
+ self._threadconns.current = weakref.ref(agent)
return agent.checkout()
def return_conn(self, record):
- if self._use_threadlocal and thread.get_ident() in self._threadconns:
- del self._threadconns[thread.get_ident()]
+ if self._use_threadlocal and hasattr(self._threadconns, "current"):
+ del self._threadconns.current
self.do_return_conn(record)
def get(self):
@@ -286,8 +282,6 @@ class _ConnectionRecord(object):
self.__pool.log("Error on connect(): %s" % e)
raise
- properties = property(lambda self: self.info,
- doc="A synonym for .info, will be removed in 0.5.")
def _finalize_fairy(connection, connection_record, pool, ref=None):
if ref is not None and connection_record.backref is not ref:
@@ -331,11 +325,16 @@ class _ConnectionFairy(object):
self._pool.log("Connection %r checked out from pool" %
self.connection)
- _logger = property(lambda self: self._pool.logger)
+ @property
+ def _logger(self):
+ return self._pool.logger
- is_valid = property(lambda self:self.connection is not None)
+ @property
+ def is_valid(self):
+ return self.connection is not None
- def _get_info(self):
+ @property
+ def info(self):
"""An info collection unique to this DB-API connection."""
try:
@@ -348,8 +347,6 @@ class _ConnectionFairy(object):
except AttributeError:
self._detached_info = value = {}
return value
- info = property(_get_info)
- properties = property(_get_info)
def invalidate(self, e=None):
"""Mark this connection as invalidated.
@@ -478,61 +475,60 @@ class SingletonThreadPool(Pool):
def __init__(self, creator, pool_size=5, **params):
params['use_threadlocal'] = True
Pool.__init__(self, creator, **params)
- self._conns = {}
+ self._conn = threading.local()
+ self._all_conns = set()
self.size = pool_size
def recreate(self):
self.log("Pool recreating")
- return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners)
+ return SingletonThreadPool(self._creator,
+ pool_size=self.size,
+ recycle=self._recycle,
+ echo=self._should_log_info,
+ use_threadlocal=self._use_threadlocal,
+ listeners=self.listeners)
def dispose(self):
- """Dispose of this pool.
+ """Dispose of this pool."""
- this method leaves the possibility of checked-out connections
- remaining opened, so it is advised to not reuse the pool once
- dispose() is called, and to instead use a new pool constructed
- by the recreate() method.
- """
-
- for key, conn in self._conns.items():
+ for conn in self._all_conns:
try:
conn.close()
except (SystemExit, KeyboardInterrupt):
raise
except:
- # sqlite won't even let you close a conn from a thread
+ # pysqlite won't even let you close a conn from a thread
# that didn't create it
pass
- del self._conns[key]
-
+
+ self._all_conns.clear()
+
def dispose_local(self):
- try:
- del self._conns[thread.get_ident()]
- except KeyError:
- pass
+ if hasattr(self._conn, 'current'):
+ conn = self._conn.current()
+ self._all_conns.discard(conn)
+ del self._conn.current
def cleanup(self):
- for key in self._conns.keys():
- try:
- del self._conns[key]
- except KeyError:
- pass
- if len(self._conns) <= self.size:
+ for conn in list(self._all_conns):
+ self._all_conns.discard(conn)
+ if len(self._all_conns) <= self.size:
return
def status(self):
- return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns))
+ return "SingletonThreadPool id:%d size: %d" % (id(self), len(self._all_conns))
def do_return_conn(self, conn):
pass
def do_get(self):
try:
- return self._conns[thread.get_ident()]
- except KeyError:
+ return self._conn.current()
+ except AttributeError:
c = self.create_connection()
- self._conns[thread.get_ident()] = c
- if len(self._conns) > self.size:
+ self._conn.current = weakref.ref(c)
+ self._all_conns.add(c)
+ if len(self._all_conns) > self.size:
self.cleanup()
return c