diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2008-08-13 22:41:17 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2008-08-13 22:41:17 +0000 |
| commit | cd7678a965594ff2db153a7cade0fe8555bd0d38 (patch) | |
| tree | 51d029be7b976c0c9da0cb0f265435b5d1caa0e9 /lib/sqlalchemy/pool.py | |
| parent | c374f7b8b52b4169d41cf8d2cfe310ed5fd5f925 (diff) | |
| download | sqlalchemy-cd7678a965594ff2db153a7cade0fe8555bd0d38.tar.gz | |
- with 2.3 support dropped,
all usage of thread.get_ident() is removed, and replaced
with threading.local() usage. this allows potentially
faster and safer thread local access.
Diffstat (limited to 'lib/sqlalchemy/pool.py')
| -rw-r--r-- | lib/sqlalchemy/pool.py | 88 |
1 files changed, 42 insertions, 46 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 6cc05b287..ddf7cb51e 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -16,7 +16,7 @@ regular DB-API connect() methods to be transparently managed by a SQLAlchemy connection pool. """ -import weakref, time +import weakref, time, threading from sqlalchemy import exc, log from sqlalchemy import queue as Queue @@ -119,11 +119,7 @@ class Pool(object): def __init__(self, creator, recycle=-1, echo=None, use_threadlocal=False, reset_on_return=True, listeners=None): self.logger = log.instance_logger(self, echoflag=echo) - # the WeakValueDictionary works more nicely than a regular dict of - # weakrefs. the latter can pile up dead reference objects which don't - # get cleaned out. WVD adds from 1-6 method calls to a checkout - # operation. - self._threadconns = weakref.WeakValueDictionary() + self._threadconns = threading.local() self._creator = creator self._recycle = recycle self._use_threadlocal = use_threadlocal @@ -165,15 +161,15 @@ class Pool(object): return _ConnectionFairy(self).checkout() try: - return self._threadconns[thread.get_ident()].checkout() - except KeyError: + return self._threadconns.current().checkout() + except AttributeError: agent = _ConnectionFairy(self) - self._threadconns[thread.get_ident()] = agent + self._threadconns.current = weakref.ref(agent) return agent.checkout() def return_conn(self, record): - if self._use_threadlocal and thread.get_ident() in self._threadconns: - del self._threadconns[thread.get_ident()] + if self._use_threadlocal and hasattr(self._threadconns, "current"): + del self._threadconns.current self.do_return_conn(record) def get(self): @@ -286,8 +282,6 @@ class _ConnectionRecord(object): self.__pool.log("Error on connect(): %s" % e) raise - properties = property(lambda self: self.info, - doc="A synonym for .info, will be removed in 0.5.") def _finalize_fairy(connection, connection_record, pool, ref=None): if ref is not None and connection_record.backref is not ref: @@ -331,11 +325,16 @@ class _ConnectionFairy(object): self._pool.log("Connection %r checked out from pool" % self.connection) - _logger = property(lambda self: self._pool.logger) + @property + def _logger(self): + return self._pool.logger - is_valid = property(lambda self:self.connection is not None) + @property + def is_valid(self): + return self.connection is not None - def _get_info(self): + @property + def info(self): """An info collection unique to this DB-API connection.""" try: @@ -348,8 +347,6 @@ class _ConnectionFairy(object): except AttributeError: self._detached_info = value = {} return value - info = property(_get_info) - properties = property(_get_info) def invalidate(self, e=None): """Mark this connection as invalidated. @@ -478,61 +475,60 @@ class SingletonThreadPool(Pool): def __init__(self, creator, pool_size=5, **params): params['use_threadlocal'] = True Pool.__init__(self, creator, **params) - self._conns = {} + self._conn = threading.local() + self._all_conns = set() self.size = pool_size def recreate(self): self.log("Pool recreating") - return SingletonThreadPool(self._creator, pool_size=self.size, recycle=self._recycle, echo=self._should_log_info, use_threadlocal=self._use_threadlocal, listeners=self.listeners) + return SingletonThreadPool(self._creator, + pool_size=self.size, + recycle=self._recycle, + echo=self._should_log_info, + use_threadlocal=self._use_threadlocal, + listeners=self.listeners) def dispose(self): - """Dispose of this pool. + """Dispose of this pool.""" - this method leaves the possibility of checked-out connections - remaining opened, so it is advised to not reuse the pool once - dispose() is called, and to instead use a new pool constructed - by the recreate() method. - """ - - for key, conn in self._conns.items(): + for conn in self._all_conns: try: conn.close() except (SystemExit, KeyboardInterrupt): raise except: - # sqlite won't even let you close a conn from a thread + # pysqlite won't even let you close a conn from a thread # that didn't create it pass - del self._conns[key] - + + self._all_conns.clear() + def dispose_local(self): - try: - del self._conns[thread.get_ident()] - except KeyError: - pass + if hasattr(self._conn, 'current'): + conn = self._conn.current() + self._all_conns.discard(conn) + del self._conn.current def cleanup(self): - for key in self._conns.keys(): - try: - del self._conns[key] - except KeyError: - pass - if len(self._conns) <= self.size: + for conn in list(self._all_conns): + self._all_conns.discard(conn) + if len(self._all_conns) <= self.size: return def status(self): - return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns)) + return "SingletonThreadPool id:%d size: %d" % (id(self), len(self._all_conns)) def do_return_conn(self, conn): pass def do_get(self): try: - return self._conns[thread.get_ident()] - except KeyError: + return self._conn.current() + except AttributeError: c = self.create_connection() - self._conns[thread.get_ident()] = c - if len(self._conns) > self.size: + self._conn.current = weakref.ref(c) + self._all_conns.add(c) + if len(self._all_conns) > self.size: self.cleanup() return c |
