diff options
| -rw-r--r-- | doc/build/core/pooling.rst | 49 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/strategies.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/pool/__init__.py | 30 | ||||
| -rw-r--r-- | lib/sqlalchemy/pool/base.py (renamed from lib/sqlalchemy/pool.py) | 540 | ||||
| -rw-r--r-- | lib/sqlalchemy/pool/dbapi_proxy.py | 146 | ||||
| -rw-r--r-- | lib/sqlalchemy/pool/impl.py | 419 | ||||
| -rw-r--r-- | test/engine/test_logging.py | 4 | ||||
| -rw-r--r-- | test/engine/test_pool.py | 2 |
8 files changed, 603 insertions, 589 deletions
diff --git a/doc/build/core/pooling.rst b/doc/build/core/pooling.rst index a23420390..565d8ee1d 100644 --- a/doc/build/core/pooling.rst +++ b/doc/build/core/pooling.rst @@ -494,52 +494,3 @@ API Documentation - Available Pool Implementations .. autoclass:: _ConnectionRecord :members: - -Pooling Plain DB-API Connections --------------------------------- - -Any :pep:`249` DB-API module can be "proxied" through the connection -pool transparently. Usage of the DB-API is exactly as before, except -the ``connect()`` method will consult the pool. Below we illustrate -this with ``psycopg2``:: - - import sqlalchemy.pool as pool - import psycopg2 as psycopg - - psycopg = pool.manage(psycopg) - - # then connect normally - connection = psycopg.connect(database='test', username='scott', - password='tiger') - -This produces a :class:`_DBProxy` object which supports the same -``connect()`` function as the original DB-API module. Upon -connection, a connection proxy object is returned, which delegates its -calls to a real DB-API connection object. This connection object is -stored persistently within a connection pool (an instance of -:class:`.Pool`) that corresponds to the exact connection arguments sent -to the ``connect()`` function. - -The connection proxy supports all of the methods on the original -connection object, most of which are proxied via ``__getattr__()``. -The ``close()`` method will return the connection to the pool, and the -``cursor()`` method will return a proxied cursor object. Both the -connection proxy and the cursor proxy will also return the underlying -connection to the pool after they have both been garbage collected, -which is detected via weakref callbacks (``__del__`` is not used). - -Additionally, when connections are returned to the pool, a -``rollback()`` is issued on the connection unconditionally. This is -to release any locks still held by the connection that may have -resulted from normal activity. - -By default, the ``connect()`` method will return the same connection -that is already checked out in the current thread. This allows a -particular connection to be used in a given thread without needing to -pass it around between functions. To disable this behavior, specify -``use_threadlocal=False`` to the ``manage()`` function. - -.. autofunction:: sqlalchemy.pool.manage - -.. autofunction:: sqlalchemy.pool.clear_managers - diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py index 4b6ee77fd..0ec6aa06f 100644 --- a/lib/sqlalchemy/engine/strategies.py +++ b/lib/sqlalchemy/engine/strategies.py @@ -134,7 +134,7 @@ class DefaultEngineStrategy(EngineStrategy): pool = poolclass(creator, **pool_args) else: - if isinstance(pool, poollib._DBProxy): + if isinstance(pool, poollib.dbapi_proxy._DBProxy): pool = pool.get_pool(*cargs, **cparams) else: pool = pool diff --git a/lib/sqlalchemy/pool/__init__.py b/lib/sqlalchemy/pool/__init__.py new file mode 100644 index 000000000..f2f035051 --- /dev/null +++ b/lib/sqlalchemy/pool/__init__.py @@ -0,0 +1,30 @@ +# sqlalchemy/pool/__init__.py +# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + + +"""Connection pooling for DB-API connections. + +Provides a number of connection pool implementations for a variety of +usage scenarios and thread behavior requirements imposed by the +application, DB-API or database itself. + +Also provides a DB-API 2.0 connection proxying mechanism allowing +regular DB-API connect() methods to be transparently managed by a +SQLAlchemy connection pool. +""" + +from .base import _refs # noqa +from .base import Pool # noqa +from .impl import ( # noqa + QueuePool, StaticPool, NullPool, AssertionPool, SingletonThreadPool) +from .dbapi_proxy import manage, clear_managers # noqa + +from .base import reset_rollback, reset_commit, reset_none # noqa + +# as these are likely to be used in various test suites, debugging +# setups, keep them in the sqlalchemy.pool namespace +from .base import _ConnectionFairy, _ConnectionRecord, _finalize_fairy # noqa diff --git a/lib/sqlalchemy/pool.py b/lib/sqlalchemy/pool/base.py index e22a682e0..855110160 100644 --- a/lib/sqlalchemy/pool.py +++ b/lib/sqlalchemy/pool/base.py @@ -6,63 +6,18 @@ # the MIT License: http://www.opensource.org/licenses/mit-license.php -"""Connection pooling for DB-API connections. +"""Base constructs for connection pools. -Provides a number of connection pool implementations for a variety of -usage scenarios and thread behavior requirements imposed by the -application, DB-API or database itself. - -Also provides a DB-API 2.0 connection proxying mechanism allowing -regular DB-API connect() methods to be transparently managed by a -SQLAlchemy connection pool. """ +from collections import deque import time -import traceback import weakref -from . import exc, log, event, interfaces, util -from .util import queue as sqla_queue -from .util import threading, memoized_property, \ - chop_traceback - -from collections import deque -proxies = {} - - -def manage(module, **params): - r"""Return a proxy for a DB-API module that automatically - pools connections. - - Given a DB-API 2.0 module and pool management parameters, returns - a proxy for the module that will automatically pool connections, - creating new connection pools for each distinct set of connection - arguments sent to the decorated module's connect() function. - - :param module: a DB-API 2.0 database module - - :param poolclass: the class used by the pool module to provide - pooling. Defaults to :class:`.QueuePool`. - - :param \**params: will be passed through to *poolclass* - - """ - try: - return proxies[module] - except KeyError: - return proxies.setdefault(module, _DBProxy(module, **params)) +from .. import exc, log, event, interfaces, util +from ..util import threading -def clear_managers(): - """Remove all current DB-API 2.0 managers. - - All pools and connections are disposed. - """ - - for manager in proxies.values(): - manager.close() - proxies.clear() - reset_rollback = util.symbol('reset_rollback') reset_commit = util.symbol('reset_commit') reset_none = util.symbol('reset_none') @@ -1014,490 +969,3 @@ class _ConnectionFairy(object): self._checkin() -class SingletonThreadPool(Pool): - - """A Pool that maintains one connection per thread. - - Maintains one connection per each thread, never moving a connection to a - thread other than the one which it was created in. - - .. warning:: the :class:`.SingletonThreadPool` will call ``.close()`` - on arbitrary connections that exist beyond the size setting of - ``pool_size``, e.g. if more unique **thread identities** - than what ``pool_size`` states are used. This cleanup is - non-deterministic and not sensitive to whether or not the connections - linked to those thread identities are currently in use. - - :class:`.SingletonThreadPool` may be improved in a future release, - however in its current status it is generally used only for test - scenarios using a SQLite ``:memory:`` database and is not recommended - for production use. - - - Options are the same as those of :class:`.Pool`, as well as: - - :param pool_size: The number of threads in which to maintain connections - at once. Defaults to five. - - :class:`.SingletonThreadPool` is used by the SQLite dialect - automatically when a memory-based database is used. - See :ref:`sqlite_toplevel`. - - """ - - def __init__(self, creator, pool_size=5, **kw): - kw['use_threadlocal'] = True - Pool.__init__(self, creator, **kw) - self._conn = threading.local() - self._all_conns = set() - self.size = pool_size - - def recreate(self): - self.logger.info("Pool recreating") - return self.__class__(self._creator, - pool_size=self.size, - recycle=self._recycle, - echo=self.echo, - logging_name=self._orig_logging_name, - use_threadlocal=self._use_threadlocal, - reset_on_return=self._reset_on_return, - _dispatch=self.dispatch, - dialect=self._dialect) - - def dispose(self): - """Dispose of this pool.""" - - for conn in self._all_conns: - try: - conn.close() - except Exception: - # pysqlite won't even let you close a conn from a thread - # that didn't create it - pass - - self._all_conns.clear() - - def _cleanup(self): - while len(self._all_conns) >= self.size: - c = self._all_conns.pop() - c.close() - - def status(self): - return "SingletonThreadPool id:%d size: %d" % \ - (id(self), len(self._all_conns)) - - def _do_return_conn(self, conn): - pass - - def _do_get(self): - try: - c = self._conn.current() - if c: - return c - except AttributeError: - pass - c = self._create_connection() - self._conn.current = weakref.ref(c) - if len(self._all_conns) >= self.size: - self._cleanup() - self._all_conns.add(c) - return c - - -class QueuePool(Pool): - - """A :class:`.Pool` that imposes a limit on the number of open connections. - - :class:`.QueuePool` is the default pooling implementation used for - all :class:`.Engine` objects, unless the SQLite dialect is in use. - - """ - - def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30, - **kw): - r""" - Construct a QueuePool. - - :param creator: a callable function that returns a DB-API - connection object, same as that of :paramref:`.Pool.creator`. - - :param pool_size: The size of the pool to be maintained, - defaults to 5. This is the largest number of connections that - will be kept persistently in the pool. Note that the pool - begins with no connections; once this number of connections - is requested, that number of connections will remain. - ``pool_size`` can be set to 0 to indicate no size limit; to - disable pooling, use a :class:`~sqlalchemy.pool.NullPool` - instead. - - :param max_overflow: The maximum overflow size of the - pool. When the number of checked-out connections reaches the - size set in pool_size, additional connections will be - returned up to this limit. When those additional connections - are returned to the pool, they are disconnected and - discarded. It follows then that the total number of - simultaneous connections the pool will allow is pool_size + - `max_overflow`, and the total number of "sleeping" - connections the pool will allow is pool_size. `max_overflow` - can be set to -1 to indicate no overflow limit; no limit - will be placed on the total number of concurrent - connections. Defaults to 10. - - :param timeout: The number of seconds to wait before giving up - on returning a connection. Defaults to 30. - - :param \**kw: Other keyword arguments including - :paramref:`.Pool.recycle`, :paramref:`.Pool.echo`, - :paramref:`.Pool.reset_on_return` and others are passed to the - :class:`.Pool` constructor. - - """ - Pool.__init__(self, creator, **kw) - self._pool = sqla_queue.Queue(pool_size) - self._overflow = 0 - pool_size - self._max_overflow = max_overflow - self._timeout = timeout - self._overflow_lock = threading.Lock() - - def _do_return_conn(self, conn): - try: - self._pool.put(conn, False) - except sqla_queue.Full: - try: - conn.close() - finally: - self._dec_overflow() - - def _do_get(self): - use_overflow = self._max_overflow > -1 - - try: - wait = use_overflow and self._overflow >= self._max_overflow - return self._pool.get(wait, self._timeout) - except sqla_queue.Empty: - # don't do things inside of "except Empty", because when we say - # we timed out or can't connect and raise, Python 3 tells - # people the real error is queue.Empty which it isn't. - pass - if use_overflow and self._overflow >= self._max_overflow: - if not wait: - return self._do_get() - else: - raise exc.TimeoutError( - "QueuePool limit of size %d overflow %d reached, " - "connection timed out, timeout %d" % - (self.size(), self.overflow(), self._timeout), code="3o7r") - - if self._inc_overflow(): - try: - return self._create_connection() - except: - with util.safe_reraise(): - self._dec_overflow() - else: - return self._do_get() - - def _inc_overflow(self): - if self._max_overflow == -1: - self._overflow += 1 - return True - with self._overflow_lock: - if self._overflow < self._max_overflow: - self._overflow += 1 - return True - else: - return False - - def _dec_overflow(self): - if self._max_overflow == -1: - self._overflow -= 1 - return True - with self._overflow_lock: - self._overflow -= 1 - return True - - def recreate(self): - self.logger.info("Pool recreating") - return self.__class__(self._creator, pool_size=self._pool.maxsize, - max_overflow=self._max_overflow, - timeout=self._timeout, - recycle=self._recycle, echo=self.echo, - logging_name=self._orig_logging_name, - use_threadlocal=self._use_threadlocal, - reset_on_return=self._reset_on_return, - _dispatch=self.dispatch, - dialect=self._dialect) - - def dispose(self): - while True: - try: - conn = self._pool.get(False) - conn.close() - except sqla_queue.Empty: - break - - self._overflow = 0 - self.size() - self.logger.info("Pool disposed. %s", self.status()) - - def status(self): - return "Pool size: %d Connections in pool: %d "\ - "Current Overflow: %d Current Checked out "\ - "connections: %d" % (self.size(), - self.checkedin(), - self.overflow(), - self.checkedout()) - - def size(self): - return self._pool.maxsize - - def checkedin(self): - return self._pool.qsize() - - def overflow(self): - return self._overflow - - def checkedout(self): - return self._pool.maxsize - self._pool.qsize() + self._overflow - - -class NullPool(Pool): - - """A Pool which does not pool connections. - - Instead it literally opens and closes the underlying DB-API connection - per each connection open/close. - - Reconnect-related functions such as ``recycle`` and connection - invalidation are not supported by this Pool implementation, since - no connections are held persistently. - - .. versionchanged:: 0.7 - :class:`.NullPool` is used by the SQlite dialect automatically - when a file-based database is used. See :ref:`sqlite_toplevel`. - - """ - - def status(self): - return "NullPool" - - def _do_return_conn(self, conn): - conn.close() - - def _do_get(self): - return self._create_connection() - - def recreate(self): - self.logger.info("Pool recreating") - - return self.__class__(self._creator, - recycle=self._recycle, - echo=self.echo, - logging_name=self._orig_logging_name, - use_threadlocal=self._use_threadlocal, - reset_on_return=self._reset_on_return, - _dispatch=self.dispatch, - dialect=self._dialect) - - def dispose(self): - pass - - -class StaticPool(Pool): - - """A Pool of exactly one connection, used for all requests. - - Reconnect-related functions such as ``recycle`` and connection - invalidation (which is also used to support auto-reconnect) are not - currently supported by this Pool implementation but may be implemented - in a future release. - - """ - - @memoized_property - def _conn(self): - return self._creator() - - @memoized_property - def connection(self): - return _ConnectionRecord(self) - - def status(self): - return "StaticPool" - - def dispose(self): - if '_conn' in self.__dict__: - self._conn.close() - self._conn = None - - def recreate(self): - self.logger.info("Pool recreating") - return self.__class__(creator=self._creator, - recycle=self._recycle, - use_threadlocal=self._use_threadlocal, - reset_on_return=self._reset_on_return, - echo=self.echo, - logging_name=self._orig_logging_name, - _dispatch=self.dispatch, - dialect=self._dialect) - - def _create_connection(self): - return self._conn - - def _do_return_conn(self, conn): - pass - - def _do_get(self): - return self.connection - - -class AssertionPool(Pool): - - """A :class:`.Pool` that allows at most one checked out connection at - any given time. - - This will raise an exception if more than one connection is checked out - at a time. Useful for debugging code that is using more connections - than desired. - - .. versionchanged:: 0.7 - :class:`.AssertionPool` also logs a traceback of where - the original connection was checked out, and reports - this in the assertion error raised. - - """ - - def __init__(self, *args, **kw): - self._conn = None - self._checked_out = False - self._store_traceback = kw.pop('store_traceback', True) - self._checkout_traceback = None - Pool.__init__(self, *args, **kw) - - def status(self): - return "AssertionPool" - - def _do_return_conn(self, conn): - if not self._checked_out: - raise AssertionError("connection is not checked out") - self._checked_out = False - assert conn is self._conn - - def dispose(self): - self._checked_out = False - if self._conn: - self._conn.close() - - def recreate(self): - self.logger.info("Pool recreating") - return self.__class__(self._creator, echo=self.echo, - logging_name=self._orig_logging_name, - _dispatch=self.dispatch, - dialect=self._dialect) - - def _do_get(self): - if self._checked_out: - if self._checkout_traceback: - suffix = ' at:\n%s' % ''.join( - chop_traceback(self._checkout_traceback)) - else: - suffix = '' - raise AssertionError("connection is already checked out" + suffix) - - if not self._conn: - self._conn = self._create_connection() - - self._checked_out = True - if self._store_traceback: - self._checkout_traceback = traceback.format_stack() - return self._conn - - -class _DBProxy(object): - - """Layers connection pooling behavior on top of a standard DB-API module. - - Proxies a DB-API 2.0 connect() call to a connection pool keyed to the - specific connect parameters. Other functions and attributes are delegated - to the underlying DB-API module. - """ - - def __init__(self, module, poolclass=QueuePool, **kw): - """Initializes a new proxy. - - module - a DB-API 2.0 module - - poolclass - a Pool class, defaulting to QueuePool - - Other parameters are sent to the Pool object's constructor. - - """ - - self.module = module - self.kw = kw - self.poolclass = poolclass - self.pools = {} - self._create_pool_mutex = threading.Lock() - - def close(self): - for key in list(self.pools): - del self.pools[key] - - def __del__(self): - self.close() - - def __getattr__(self, key): - return getattr(self.module, key) - - def get_pool(self, *args, **kw): - key = self._serialize(*args, **kw) - try: - return self.pools[key] - except KeyError: - self._create_pool_mutex.acquire() - try: - if key not in self.pools: - kw.pop('sa_pool_key', None) - pool = self.poolclass( - lambda: self.module.connect(*args, **kw), **self.kw) - self.pools[key] = pool - return pool - else: - return self.pools[key] - finally: - self._create_pool_mutex.release() - - def connect(self, *args, **kw): - """Activate a connection to the database. - - Connect to the database using this DBProxy's module and the given - connect arguments. If the arguments match an existing pool, the - connection will be returned from the pool's current thread-local - connection instance, or if there is no thread-local connection - instance it will be checked out from the set of pooled connections. - - If the pool has no available connections and allows new connections - to be created, a new database connection will be made. - - """ - - return self.get_pool(*args, **kw).connect() - - def dispose(self, *args, **kw): - """Dispose the pool referenced by the given connect arguments.""" - - key = self._serialize(*args, **kw) - try: - del self.pools[key] - except KeyError: - pass - - def _serialize(self, *args, **kw): - if "sa_pool_key" in kw: - return kw['sa_pool_key'] - - return tuple( - list(args) + - [(k, kw[k]) for k in sorted(kw)] - ) diff --git a/lib/sqlalchemy/pool/dbapi_proxy.py b/lib/sqlalchemy/pool/dbapi_proxy.py new file mode 100644 index 000000000..aa439bd23 --- /dev/null +++ b/lib/sqlalchemy/pool/dbapi_proxy.py @@ -0,0 +1,146 @@ +# sqlalchemy/pool/dbapi_proxy.py +# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + + +"""DBAPI proxy utility. + +Provides transparent connection pooling on top of a Python DBAPI. + +This is legacy SQLAlchemy functionality that is not typically used +today. + +""" + +from .impl import QueuePool +from ..util import threading + +proxies = {} + + +def manage(module, **params): + r"""Return a proxy for a DB-API module that automatically + pools connections. + + Given a DB-API 2.0 module and pool management parameters, returns + a proxy for the module that will automatically pool connections, + creating new connection pools for each distinct set of connection + arguments sent to the decorated module's connect() function. + + :param module: a DB-API 2.0 database module + + :param poolclass: the class used by the pool module to provide + pooling. Defaults to :class:`.QueuePool`. + + :param \**params: will be passed through to *poolclass* + + """ + try: + return proxies[module] + except KeyError: + return proxies.setdefault(module, _DBProxy(module, **params)) + + +def clear_managers(): + """Remove all current DB-API 2.0 managers. + + All pools and connections are disposed. + """ + + for manager in proxies.values(): + manager.close() + proxies.clear() + + +class _DBProxy(object): + + """Layers connection pooling behavior on top of a standard DB-API module. + + Proxies a DB-API 2.0 connect() call to a connection pool keyed to the + specific connect parameters. Other functions and attributes are delegated + to the underlying DB-API module. + """ + + def __init__(self, module, poolclass=QueuePool, **kw): + """Initializes a new proxy. + + module + a DB-API 2.0 module + + poolclass + a Pool class, defaulting to QueuePool + + Other parameters are sent to the Pool object's constructor. + + """ + + self.module = module + self.kw = kw + self.poolclass = poolclass + self.pools = {} + self._create_pool_mutex = threading.Lock() + + def close(self): + for key in list(self.pools): + del self.pools[key] + + def __del__(self): + self.close() + + def __getattr__(self, key): + return getattr(self.module, key) + + def get_pool(self, *args, **kw): + key = self._serialize(*args, **kw) + try: + return self.pools[key] + except KeyError: + self._create_pool_mutex.acquire() + try: + if key not in self.pools: + kw.pop('sa_pool_key', None) + pool = self.poolclass( + lambda: self.module.connect(*args, **kw), **self.kw) + self.pools[key] = pool + return pool + else: + return self.pools[key] + finally: + self._create_pool_mutex.release() + + def connect(self, *args, **kw): + """Activate a connection to the database. + + Connect to the database using this DBProxy's module and the given + connect arguments. If the arguments match an existing pool, the + connection will be returned from the pool's current thread-local + connection instance, or if there is no thread-local connection + instance it will be checked out from the set of pooled connections. + + If the pool has no available connections and allows new connections + to be created, a new database connection will be made. + + """ + + return self.get_pool(*args, **kw).connect() + + def dispose(self, *args, **kw): + """Dispose the pool referenced by the given connect arguments.""" + + key = self._serialize(*args, **kw) + try: + del self.pools[key] + except KeyError: + pass + + def _serialize(self, *args, **kw): + if "sa_pool_key" in kw: + return kw['sa_pool_key'] + + return tuple( + list(args) + + [(k, kw[k]) for k in sorted(kw)] + ) diff --git a/lib/sqlalchemy/pool/impl.py b/lib/sqlalchemy/pool/impl.py new file mode 100644 index 000000000..c45661895 --- /dev/null +++ b/lib/sqlalchemy/pool/impl.py @@ -0,0 +1,419 @@ +# sqlalchemy/pool.py +# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + + +"""Pool implementation classes. + +""" + +import traceback +import weakref + +from .base import Pool, _ConnectionRecord +from .. import exc +from .. import util +from ..util import queue as sqla_queue +from ..util import chop_traceback +from ..util import threading + + +class QueuePool(Pool): + + """A :class:`.Pool` that imposes a limit on the number of open connections. + + :class:`.QueuePool` is the default pooling implementation used for + all :class:`.Engine` objects, unless the SQLite dialect is in use. + + """ + + def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30, + **kw): + r""" + Construct a QueuePool. + + :param creator: a callable function that returns a DB-API + connection object, same as that of :paramref:`.Pool.creator`. + + :param pool_size: The size of the pool to be maintained, + defaults to 5. This is the largest number of connections that + will be kept persistently in the pool. Note that the pool + begins with no connections; once this number of connections + is requested, that number of connections will remain. + ``pool_size`` can be set to 0 to indicate no size limit; to + disable pooling, use a :class:`~sqlalchemy.pool.NullPool` + instead. + + :param max_overflow: The maximum overflow size of the + pool. When the number of checked-out connections reaches the + size set in pool_size, additional connections will be + returned up to this limit. When those additional connections + are returned to the pool, they are disconnected and + discarded. It follows then that the total number of + simultaneous connections the pool will allow is pool_size + + `max_overflow`, and the total number of "sleeping" + connections the pool will allow is pool_size. `max_overflow` + can be set to -1 to indicate no overflow limit; no limit + will be placed on the total number of concurrent + connections. Defaults to 10. + + :param timeout: The number of seconds to wait before giving up + on returning a connection. Defaults to 30. + + :param \**kw: Other keyword arguments including + :paramref:`.Pool.recycle`, :paramref:`.Pool.echo`, + :paramref:`.Pool.reset_on_return` and others are passed to the + :class:`.Pool` constructor. + + """ + Pool.__init__(self, creator, **kw) + self._pool = sqla_queue.Queue(pool_size) + self._overflow = 0 - pool_size + self._max_overflow = max_overflow + self._timeout = timeout + self._overflow_lock = threading.Lock() + + def _do_return_conn(self, conn): + try: + self._pool.put(conn, False) + except sqla_queue.Full: + try: + conn.close() + finally: + self._dec_overflow() + + def _do_get(self): + use_overflow = self._max_overflow > -1 + + try: + wait = use_overflow and self._overflow >= self._max_overflow + return self._pool.get(wait, self._timeout) + except sqla_queue.Empty: + # don't do things inside of "except Empty", because when we say + # we timed out or can't connect and raise, Python 3 tells + # people the real error is queue.Empty which it isn't. + pass + if use_overflow and self._overflow >= self._max_overflow: + if not wait: + return self._do_get() + else: + raise exc.TimeoutError( + "QueuePool limit of size %d overflow %d reached, " + "connection timed out, timeout %d" % + (self.size(), self.overflow(), self._timeout), code="3o7r") + + if self._inc_overflow(): + try: + return self._create_connection() + except: + with util.safe_reraise(): + self._dec_overflow() + else: + return self._do_get() + + def _inc_overflow(self): + if self._max_overflow == -1: + self._overflow += 1 + return True + with self._overflow_lock: + if self._overflow < self._max_overflow: + self._overflow += 1 + return True + else: + return False + + def _dec_overflow(self): + if self._max_overflow == -1: + self._overflow -= 1 + return True + with self._overflow_lock: + self._overflow -= 1 + return True + + def recreate(self): + self.logger.info("Pool recreating") + return self.__class__(self._creator, pool_size=self._pool.maxsize, + max_overflow=self._max_overflow, + timeout=self._timeout, + recycle=self._recycle, echo=self.echo, + logging_name=self._orig_logging_name, + use_threadlocal=self._use_threadlocal, + reset_on_return=self._reset_on_return, + _dispatch=self.dispatch, + dialect=self._dialect) + + def dispose(self): + while True: + try: + conn = self._pool.get(False) + conn.close() + except sqla_queue.Empty: + break + + self._overflow = 0 - self.size() + self.logger.info("Pool disposed. %s", self.status()) + + def status(self): + return "Pool size: %d Connections in pool: %d "\ + "Current Overflow: %d Current Checked out "\ + "connections: %d" % (self.size(), + self.checkedin(), + self.overflow(), + self.checkedout()) + + def size(self): + return self._pool.maxsize + + def checkedin(self): + return self._pool.qsize() + + def overflow(self): + return self._overflow + + def checkedout(self): + return self._pool.maxsize - self._pool.qsize() + self._overflow + + +class NullPool(Pool): + + """A Pool which does not pool connections. + + Instead it literally opens and closes the underlying DB-API connection + per each connection open/close. + + Reconnect-related functions such as ``recycle`` and connection + invalidation are not supported by this Pool implementation, since + no connections are held persistently. + + .. versionchanged:: 0.7 + :class:`.NullPool` is used by the SQlite dialect automatically + when a file-based database is used. See :ref:`sqlite_toplevel`. + + """ + + def status(self): + return "NullPool" + + def _do_return_conn(self, conn): + conn.close() + + def _do_get(self): + return self._create_connection() + + def recreate(self): + self.logger.info("Pool recreating") + + return self.__class__(self._creator, + recycle=self._recycle, + echo=self.echo, + logging_name=self._orig_logging_name, + use_threadlocal=self._use_threadlocal, + reset_on_return=self._reset_on_return, + _dispatch=self.dispatch, + dialect=self._dialect) + + def dispose(self): + pass + + +class SingletonThreadPool(Pool): + + """A Pool that maintains one connection per thread. + + Maintains one connection per each thread, never moving a connection to a + thread other than the one which it was created in. + + .. warning:: the :class:`.SingletonThreadPool` will call ``.close()`` + on arbitrary connections that exist beyond the size setting of + ``pool_size``, e.g. if more unique **thread identities** + than what ``pool_size`` states are used. This cleanup is + non-deterministic and not sensitive to whether or not the connections + linked to those thread identities are currently in use. + + :class:`.SingletonThreadPool` may be improved in a future release, + however in its current status it is generally used only for test + scenarios using a SQLite ``:memory:`` database and is not recommended + for production use. + + + Options are the same as those of :class:`.Pool`, as well as: + + :param pool_size: The number of threads in which to maintain connections + at once. Defaults to five. + + :class:`.SingletonThreadPool` is used by the SQLite dialect + automatically when a memory-based database is used. + See :ref:`sqlite_toplevel`. + + """ + + def __init__(self, creator, pool_size=5, **kw): + kw['use_threadlocal'] = True + Pool.__init__(self, creator, **kw) + self._conn = threading.local() + self._all_conns = set() + self.size = pool_size + + def recreate(self): + self.logger.info("Pool recreating") + return self.__class__(self._creator, + pool_size=self.size, + recycle=self._recycle, + echo=self.echo, + logging_name=self._orig_logging_name, + use_threadlocal=self._use_threadlocal, + reset_on_return=self._reset_on_return, + _dispatch=self.dispatch, + dialect=self._dialect) + + def dispose(self): + """Dispose of this pool.""" + + for conn in self._all_conns: + try: + conn.close() + except Exception: + # pysqlite won't even let you close a conn from a thread + # that didn't create it + pass + + self._all_conns.clear() + + def _cleanup(self): + while len(self._all_conns) >= self.size: + c = self._all_conns.pop() + c.close() + + def status(self): + return "SingletonThreadPool id:%d size: %d" % \ + (id(self), len(self._all_conns)) + + def _do_return_conn(self, conn): + pass + + def _do_get(self): + try: + c = self._conn.current() + if c: + return c + except AttributeError: + pass + c = self._create_connection() + self._conn.current = weakref.ref(c) + if len(self._all_conns) >= self.size: + self._cleanup() + self._all_conns.add(c) + return c + + +class StaticPool(Pool): + + """A Pool of exactly one connection, used for all requests. + + Reconnect-related functions such as ``recycle`` and connection + invalidation (which is also used to support auto-reconnect) are not + currently supported by this Pool implementation but may be implemented + in a future release. + + """ + + @util.memoized_property + def _conn(self): + return self._creator() + + @util.memoized_property + def connection(self): + return _ConnectionRecord(self) + + def status(self): + return "StaticPool" + + def dispose(self): + if '_conn' in self.__dict__: + self._conn.close() + self._conn = None + + def recreate(self): + self.logger.info("Pool recreating") + return self.__class__(creator=self._creator, + recycle=self._recycle, + use_threadlocal=self._use_threadlocal, + reset_on_return=self._reset_on_return, + echo=self.echo, + logging_name=self._orig_logging_name, + _dispatch=self.dispatch, + dialect=self._dialect) + + def _create_connection(self): + return self._conn + + def _do_return_conn(self, conn): + pass + + def _do_get(self): + return self.connection + + +class AssertionPool(Pool): + + """A :class:`.Pool` that allows at most one checked out connection at + any given time. + + This will raise an exception if more than one connection is checked out + at a time. Useful for debugging code that is using more connections + than desired. + + .. versionchanged:: 0.7 + :class:`.AssertionPool` also logs a traceback of where + the original connection was checked out, and reports + this in the assertion error raised. + + """ + + def __init__(self, *args, **kw): + self._conn = None + self._checked_out = False + self._store_traceback = kw.pop('store_traceback', True) + self._checkout_traceback = None + Pool.__init__(self, *args, **kw) + + def status(self): + return "AssertionPool" + + def _do_return_conn(self, conn): + if not self._checked_out: + raise AssertionError("connection is not checked out") + self._checked_out = False + assert conn is self._conn + + def dispose(self): + self._checked_out = False + if self._conn: + self._conn.close() + + def recreate(self): + self.logger.info("Pool recreating") + return self.__class__(self._creator, echo=self.echo, + logging_name=self._orig_logging_name, + _dispatch=self.dispatch, + dialect=self._dialect) + + def _do_get(self): + if self._checked_out: + if self._checkout_traceback: + suffix = ' at:\n%s' % ''.join( + chop_traceback(self._checkout_traceback)) + else: + suffix = '' + raise AssertionError("connection is already checked out" + suffix) + + if not self._conn: + self._conn = self._create_connection() + + self._checked_out = True + if self._store_traceback: + self._checkout_traceback = traceback.format_stack() + return self._conn diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py index fa36612c0..7044867cf 100644 --- a/test/engine/test_logging.py +++ b/test/engine/test_logging.py @@ -303,7 +303,7 @@ class LoggingNameTest(fixtures.TestBase): for name in [b.name for b in self.buf.buffer]: assert name in ( 'sqlalchemy.engine.base.Engine.%s' % eng_name, - 'sqlalchemy.pool.%s.%s' % + 'sqlalchemy.pool.impl.%s.%s' % (eng.pool.__class__.__name__, pool_name) ) @@ -313,7 +313,7 @@ class LoggingNameTest(fixtures.TestBase): for name in [b.name for b in self.buf.buffer]: assert name in ( 'sqlalchemy.engine.base.Engine', - 'sqlalchemy.pool.%s' % eng.pool.__class__.__name__ + 'sqlalchemy.pool.impl.%s' % eng.pool.__class__.__name__ ) def _named_engine(self, **kw): diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index cdd028548..61737d253 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -1561,7 +1561,7 @@ class QueuePoolTest(PoolTestBase): self.assert_(p.checkedout() == 0) def test_recycle(self): - with patch("sqlalchemy.pool.time.time") as mock: + with patch("sqlalchemy.pool.base.time.time") as mock: mock.return_value = 10000 p = self._queuepool_fixture( |
