diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2006-05-25 14:20:23 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2006-05-25 14:20:23 +0000 |
| commit | bb79e2e871d0a4585164c1a6ed626d96d0231975 (patch) | |
| tree | 6d457ba6c36c408b45db24ec3c29e147fe7504ff /lib/sqlalchemy/pool.py | |
| parent | 4fc3a0648699c2b441251ba4e1d37a9107bd1986 (diff) | |
| download | sqlalchemy-bb79e2e871d0a4585164c1a6ed626d96d0231975.tar.gz | |
merged 0.2 branch into trunk; 0.1 now in sqlalchemy/branches/rel_0_1
Diffstat (limited to 'lib/sqlalchemy/pool.py')
| -rw-r--r-- | lib/sqlalchemy/pool.py | 86 |
1 files changed, 48 insertions, 38 deletions
diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool.py index 1603c9873..4452f6419 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool.py @@ -70,41 +70,38 @@ def clear_managers(): class Pool(object): - def __init__(self, echo = False, use_threadlocal = True, logger=None): - self._threadconns = weakref.WeakValueDictionary() + def __init__(self, echo = False, use_threadlocal = True, logger=None, **kwargs): + self._threadconns = {} #weakref.WeakValueDictionary() self._use_threadlocal = use_threadlocal - self._echo = echo + self.echo = echo self._logger = logger or util.Logger(origin='pool') def unique_connection(self): - return ConnectionFairy(self) + return ConnectionFairy(self).checkout() def connect(self): if not self._use_threadlocal: - return ConnectionFairy(self) + return ConnectionFairy(self).checkout() try: - return self._threadconns[thread.get_ident()] + return self._threadconns[thread.get_ident()].checkout() except KeyError: - agent = ConnectionFairy(self) + agent = ConnectionFairy(self).checkout() self._threadconns[thread.get_ident()] = agent return agent - def return_conn(self, conn): - if self._echo: - self.log("return connection to pool") - self.do_return_conn(conn) + def return_conn(self, agent): + if self._use_threadlocal: + try: + del self._threadconns[thread.get_ident()] + except KeyError: + pass + self.do_return_conn(agent.connection) def get(self): - if self._echo: - self.log("get connection from pool") - self.log(self.status()) return self.do_get() def return_invalid(self): - if self._echo: - self.log("return invalid connection to pool") - self.log(self.status()) self.do_return_invalid() def do_get(self): @@ -125,6 +122,7 @@ class Pool(object): class ConnectionFairy(object): def __init__(self, pool, connection=None): self.pool = pool + self.__counter = 0 if connection is not None: self.connection = connection else: @@ -134,16 +132,35 @@ class ConnectionFairy(object): self.connection = None self.pool.return_invalid() raise + if self.pool.echo: + self.pool.log("Connection %s checked out from pool" % repr(self.connection)) def invalidate(self): + if self.pool.echo: + self.pool.log("Invalidate connection %s" % repr(self.connection)) + self.connection.rollback() self.connection = None self.pool.return_invalid() def cursor(self): return CursorFairy(self, self.connection.cursor()) def __getattr__(self, key): return getattr(self.connection, key) + def checkout(self): + if self.connection is None: + raise "this connection is closed" + self.__counter +=1 + return self + def close(self): + self.__counter -=1 + if self.__counter == 0: + self._close() def __del__(self): + self._close() + def _close(self): if self.connection is not None: - self.pool.return_conn(self.connection) + if self.pool.echo: + self.pool.log("Connection %s being returned to pool" % repr(self.connection)) + self.connection.rollback() + self.pool.return_conn(self) self.pool = None self.connection = None @@ -156,19 +173,18 @@ class CursorFairy(object): class SingletonThreadPool(Pool): """Maintains one connection per each thread, never moving to another thread. this is - used for SQLite and other databases with a similar restriction.""" + used for SQLite.""" def __init__(self, creator, **params): Pool.__init__(self, **params) self._conns = {} self._creator = creator def status(self): - return "SingletonThreadPool thread:%d size: %d" % (thread.get_ident(), len(self._conns)) + return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns)) def do_return_conn(self, conn): - if self._conns.get(thread.get_ident(), None) is None: - self._conns[thread.get_ident()] = conn - + pass + def do_return_invalid(self): try: del self._conns[thread.get_ident()] @@ -177,54 +193,48 @@ class SingletonThreadPool(Pool): def do_get(self): try: - c = self._conns[thread.get_ident()] - if c is None: - return self._creator() + return self._conns[thread.get_ident()] except KeyError: c = self._creator() - self._conns[thread.get_ident()] = None - return c + self._conns[thread.get_ident()] = c + return c class QueuePool(Pool): """uses Queue.Queue to maintain a fixed-size list of connections.""" - def __init__(self, creator, pool_size = 5, max_overflow = 10, **params): + def __init__(self, creator, pool_size = 5, max_overflow = 10, timeout=30, **params): Pool.__init__(self, **params) self._creator = creator self._pool = Queue.Queue(pool_size) self._overflow = 0 - pool_size self._max_overflow = max_overflow + self._timeout = timeout def do_return_conn(self, conn): - if self._echo: - self.log("return QP connection to pool") try: self._pool.put(conn, False) except Queue.Full: self._overflow -= 1 def do_return_invalid(self): - if self._echo: - self.log("return invalid connection") if self._pool.full(): self._overflow -= 1 def do_get(self): - if self._echo: - self.log("get QP connection from pool") - self.log(self.status()) try: - return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow) + return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout) except Queue.Empty: self._overflow += 1 return self._creator() - def __del__(self): + def dispose(self): while True: try: conn = self._pool.get(False) conn.close() except Queue.Empty: break + def __del__(self): + self.dispose() def status(self): tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout()) |
