summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/pool
diff options
context:
space:
mode:
author: Mike Bayer <mike_mp@zzzcomputing.com> 2019-01-06 01:14:26 -0500
committer: mike bayer <mike_mp@zzzcomputing.com> 2019-01-06 17:34:50 +0000
commit1e1a38e7801f410f244e4bbb44ec795ae152e04e (patch)
tree28e725c5c8188bd0cfd133d1e268dbca9b524978 /lib/sqlalchemy/pool
parent404e69426b05a82d905cbb3ad33adafccddb00dd (diff)
downloadsqlalchemy-1e1a38e7801f410f244e4bbb44ec795ae152e04e.tar.gz
Run black -l 79 against all source files
This is a straight reformat run using black as is, with no edits applied at all. The black run will format code consistently, however in some cases that are prevalent in SQLAlchemy code it produces too-long lines. The too-long lines will be resolved in the following commit that will resolve all remaining flake8 issues including shadowed builtins, long lines, import order, unused imports, duplicate imports, and docstring issues. Change-Id: I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9
Diffstat (limited to 'lib/sqlalchemy/pool')
-rw-r--r--lib/sqlalchemy/pool/__init__.py7
-rw-r--r--lib/sqlalchemy/pool/base.py185
-rw-r--r--lib/sqlalchemy/pool/dbapi_proxy.py12
-rw-r--r--lib/sqlalchemy/pool/impl.py142
4 files changed, 205 insertions, 141 deletions
diff --git a/lib/sqlalchemy/pool/__init__.py b/lib/sqlalchemy/pool/__init__.py
index f2f035051..2aa6eeeb7 100644
--- a/lib/sqlalchemy/pool/__init__.py
+++ b/lib/sqlalchemy/pool/__init__.py
@@ -20,7 +20,12 @@ SQLAlchemy connection pool.
from .base import _refs # noqa
from .base import Pool # noqa
from .impl import ( # noqa
- QueuePool, StaticPool, NullPool, AssertionPool, SingletonThreadPool)
+ QueuePool,
+ StaticPool,
+ NullPool,
+ AssertionPool,
+ SingletonThreadPool,
+)
from .dbapi_proxy import manage, clear_managers # noqa
from .base import reset_rollback, reset_commit, reset_none # noqa
diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py
index 442d3b64a..382e740c6 100644
--- a/lib/sqlalchemy/pool/base.py
+++ b/lib/sqlalchemy/pool/base.py
@@ -18,9 +18,9 @@ from .. import exc, log, event, interfaces, util
from ..util import threading
-reset_rollback = util.symbol('reset_rollback')
-reset_commit = util.symbol('reset_commit')
-reset_none = util.symbol('reset_none')
+reset_rollback = util.symbol("reset_rollback")
+reset_commit = util.symbol("reset_commit")
+reset_none = util.symbol("reset_none")
class _ConnDialect(object):
@@ -46,7 +46,8 @@ class _ConnDialect(object):
def do_ping(self, dbapi_connection):
raise NotImplementedError(
"The ping feature requires that a dialect is "
- "passed to the connection pool.")
+ "passed to the connection pool."
+ )
class Pool(log.Identified):
@@ -55,16 +56,20 @@ class Pool(log.Identified):
_dialect = _ConnDialect()
- def __init__(self,
- creator, recycle=-1, echo=None,
- use_threadlocal=False,
- logging_name=None,
- reset_on_return=True,
- listeners=None,
- events=None,
- dialect=None,
- pre_ping=False,
- _dispatch=None):
+ def __init__(
+ self,
+ creator,
+ recycle=-1,
+ echo=None,
+ use_threadlocal=False,
+ logging_name=None,
+ reset_on_return=True,
+ listeners=None,
+ events=None,
+ dialect=None,
+ pre_ping=False,
+ _dispatch=None,
+ ):
"""
Construct a Pool.
@@ -200,16 +205,16 @@ class Pool(log.Identified):
self._invalidate_time = 0
self._use_threadlocal = use_threadlocal
self._pre_ping = pre_ping
- if reset_on_return in ('rollback', True, reset_rollback):
+ if reset_on_return in ("rollback", True, reset_rollback):
self._reset_on_return = reset_rollback
- elif reset_on_return in ('none', None, False, reset_none):
+ elif reset_on_return in ("none", None, False, reset_none):
self._reset_on_return = reset_none
- elif reset_on_return in ('commit', reset_commit):
+ elif reset_on_return in ("commit", reset_commit):
self._reset_on_return = reset_commit
else:
raise exc.ArgumentError(
- "Invalid value for 'reset_on_return': %r"
- % reset_on_return)
+ "Invalid value for 'reset_on_return': %r" % reset_on_return
+ )
self.echo = echo
@@ -223,17 +228,18 @@ class Pool(log.Identified):
if listeners:
util.warn_deprecated(
"The 'listeners' argument to Pool (and "
- "create_engine()) is deprecated. Use event.listen().")
+ "create_engine()) is deprecated. Use event.listen()."
+ )
for l in listeners:
self.add_listener(l)
@property
def _creator(self):
- return self.__dict__['_creator']
+ return self.__dict__["_creator"]
@_creator.setter
def _creator(self, creator):
- self.__dict__['_creator'] = creator
+ self.__dict__["_creator"] = creator
self._invoke_creator = self._should_wrap_creator(creator)
def _should_wrap_creator(self, creator):
@@ -252,7 +258,7 @@ class Pool(log.Identified):
# look for the exact arg signature that DefaultStrategy
# sends us
- if (argspec[0], argspec[3]) == (['connection_record'], (None,)):
+ if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
return creator
# or just a single positional
elif positionals == 1:
@@ -268,11 +274,13 @@ class Pool(log.Identified):
try:
self._dialect.do_close(connection)
except Exception:
- self.logger.error("Exception closing connection %r",
- connection, exc_info=True)
+ self.logger.error(
+ "Exception closing connection %r", connection, exc_info=True
+ )
@util.deprecated(
- 2.7, "Pool.add_listener is deprecated. Use event.listen()")
+ 2.7, "Pool.add_listener is deprecated. Use event.listen()"
+ )
def add_listener(self, listener):
"""Add a :class:`.PoolListener`-like object to this pool.
@@ -315,7 +323,7 @@ class Pool(log.Identified):
rec = getattr(connection, "_connection_record", None)
if not rec or self._invalidate_time < rec.starttime:
self._invalidate_time = time.time()
- if _checkin and getattr(connection, 'is_valid', False):
+ if _checkin and getattr(connection, "is_valid", False):
connection.invalidate(exception)
def recreate(self):
@@ -491,15 +499,14 @@ class _ConnectionRecord(object):
fairy = _ConnectionFairy(dbapi_connection, rec, echo)
rec.fairy_ref = weakref.ref(
fairy,
- lambda ref: _finalize_fairy and
- _finalize_fairy(
- None,
- rec, pool, ref, echo)
+ lambda ref: _finalize_fairy
+ and _finalize_fairy(None, rec, pool, ref, echo),
)
_refs.add(rec)
if echo:
- pool.logger.debug("Connection %r checked out from pool",
- dbapi_connection)
+ pool.logger.debug(
+ "Connection %r checked out from pool", dbapi_connection
+ )
return fairy
def _checkin_failed(self, err):
@@ -563,12 +570,16 @@ class _ConnectionRecord(object):
self.__pool.logger.info(
"%sInvalidate connection %r (reason: %s:%s)",
"Soft " if soft else "",
- self.connection, e.__class__.__name__, e)
+ self.connection,
+ e.__class__.__name__,
+ e,
+ )
else:
self.__pool.logger.info(
"%sInvalidate connection %r",
"Soft " if soft else "",
- self.connection)
+ self.connection,
+ )
if soft:
self._soft_invalidate_time = time.time()
else:
@@ -580,24 +591,26 @@ class _ConnectionRecord(object):
if self.connection is None:
self.info.clear()
self.__connect()
- elif self.__pool._recycle > -1 and \
- time.time() - self.starttime > self.__pool._recycle:
+ elif (
+ self.__pool._recycle > -1
+ and time.time() - self.starttime > self.__pool._recycle
+ ):
self.__pool.logger.info(
- "Connection %r exceeded timeout; recycling",
- self.connection)
+ "Connection %r exceeded timeout; recycling", self.connection
+ )
recycle = True
elif self.__pool._invalidate_time > self.starttime:
self.__pool.logger.info(
- "Connection %r invalidated due to pool invalidation; " +
- "recycling",
- self.connection
+ "Connection %r invalidated due to pool invalidation; "
+ + "recycling",
+ self.connection,
)
recycle = True
elif self._soft_invalidate_time > self.starttime:
self.__pool.logger.info(
- "Connection %r invalidated due to local soft invalidation; " +
- "recycling",
- self.connection
+ "Connection %r invalidated due to local soft invalidation; "
+ + "recycling",
+ self.connection,
)
recycle = True
@@ -631,15 +644,16 @@ class _ConnectionRecord(object):
raise
else:
if first_connect_check:
- pool.dispatch.first_connect.\
- for_modify(pool.dispatch).\
- exec_once(self.connection, self)
+ pool.dispatch.first_connect.for_modify(
+ pool.dispatch
+ ).exec_once(self.connection, self)
if pool.dispatch.connect:
pool.dispatch.connect(self.connection, self)
-def _finalize_fairy(connection, connection_record,
- pool, ref, echo, fairy=None):
+def _finalize_fairy(
+ connection, connection_record, pool, ref, echo, fairy=None
+):
"""Cleanup for a :class:`._ConnectionFairy` whether or not it's already
been garbage collected.
@@ -654,12 +668,14 @@ def _finalize_fairy(connection, connection_record,
if connection is not None:
if connection_record and echo:
- pool.logger.debug("Connection %r being returned to pool",
- connection)
+ pool.logger.debug(
+ "Connection %r being returned to pool", connection
+ )
try:
fairy = fairy or _ConnectionFairy(
- connection, connection_record, echo)
+ connection, connection_record, echo
+ )
assert fairy.connection is connection
fairy._reset(pool)
@@ -670,7 +686,8 @@ def _finalize_fairy(connection, connection_record,
pool._close_connection(connection)
except BaseException as e:
pool.logger.error(
- "Exception during reset or similar", exc_info=True)
+ "Exception during reset or similar", exc_info=True
+ )
if connection_record:
connection_record.invalidate(e=e)
if not isinstance(e, Exception):
@@ -752,8 +769,9 @@ class _ConnectionFairy(object):
raise exc.InvalidRequestError("This connection is closed")
fairy._counter += 1
- if (not pool.dispatch.checkout and not pool._pre_ping) or \
- fairy._counter != 1:
+ if (
+ not pool.dispatch.checkout and not pool._pre_ping
+ ) or fairy._counter != 1:
return fairy
# Pool listeners can trigger a reconnection on checkout, as well
@@ -767,38 +785,45 @@ class _ConnectionFairy(object):
if pool._pre_ping:
if fairy._echo:
pool.logger.debug(
- "Pool pre-ping on connection %s",
- fairy.connection)
+ "Pool pre-ping on connection %s", fairy.connection
+ )
result = pool._dialect.do_ping(fairy.connection)
if not result:
if fairy._echo:
pool.logger.debug(
"Pool pre-ping on connection %s failed, "
- "will invalidate pool", fairy.connection)
+ "will invalidate pool",
+ fairy.connection,
+ )
raise exc.InvalidatePoolError()
- pool.dispatch.checkout(fairy.connection,
- fairy._connection_record,
- fairy)
+ pool.dispatch.checkout(
+ fairy.connection, fairy._connection_record, fairy
+ )
return fairy
except exc.DisconnectionError as e:
if e.invalidate_pool:
pool.logger.info(
"Disconnection detected on checkout, "
"invalidating all pooled connections prior to "
- "current timestamp (reason: %r)", e)
+ "current timestamp (reason: %r)",
+ e,
+ )
fairy._connection_record.invalidate(e)
pool._invalidate(fairy, e, _checkin=False)
else:
pool.logger.info(
"Disconnection detected on checkout, "
"invalidating individual connection %s (reason: %r)",
- fairy.connection, e)
+ fairy.connection,
+ e,
+ )
fairy._connection_record.invalidate(e)
try:
- fairy.connection = \
+ fairy.connection = (
fairy._connection_record.get_connection()
+ )
except Exception as err:
with util.safe_reraise():
fairy._connection_record._checkin_failed(err)
@@ -813,8 +838,14 @@ class _ConnectionFairy(object):
return _ConnectionFairy._checkout(self._pool, fairy=self)
def _checkin(self):
- _finalize_fairy(self.connection, self._connection_record,
- self._pool, None, self._echo, fairy=self)
+ _finalize_fairy(
+ self.connection,
+ self._connection_record,
+ self._pool,
+ None,
+ self._echo,
+ fairy=self,
+ )
self.connection = None
self._connection_record = None
@@ -825,20 +856,22 @@ class _ConnectionFairy(object):
pool.dispatch.reset(self, self._connection_record)
if pool._reset_on_return is reset_rollback:
if self._echo:
- pool.logger.debug("Connection %s rollback-on-return%s",
- self.connection,
- ", via agent"
- if self._reset_agent else "")
+ pool.logger.debug(
+ "Connection %s rollback-on-return%s",
+ self.connection,
+ ", via agent" if self._reset_agent else "",
+ )
if self._reset_agent:
self._reset_agent.rollback()
else:
pool._dialect.do_rollback(self)
elif pool._reset_on_return is reset_commit:
if self._echo:
- pool.logger.debug("Connection %s commit-on-return%s",
- self.connection,
- ", via agent"
- if self._reset_agent else "")
+ pool.logger.debug(
+ "Connection %s commit-on-return%s",
+ self.connection,
+ ", via agent" if self._reset_agent else "",
+ )
if self._reset_agent:
self._reset_agent.commit()
else:
@@ -964,5 +997,3 @@ class _ConnectionFairy(object):
self._counter -= 1
if self._counter == 0:
self._checkin()
-
-
diff --git a/lib/sqlalchemy/pool/dbapi_proxy.py b/lib/sqlalchemy/pool/dbapi_proxy.py
index aa439bd23..425c4a114 100644
--- a/lib/sqlalchemy/pool/dbapi_proxy.py
+++ b/lib/sqlalchemy/pool/dbapi_proxy.py
@@ -101,9 +101,10 @@ class _DBProxy(object):
self._create_pool_mutex.acquire()
try:
if key not in self.pools:
- kw.pop('sa_pool_key', None)
+ kw.pop("sa_pool_key", None)
pool = self.poolclass(
- lambda: self.module.connect(*args, **kw), **self.kw)
+ lambda: self.module.connect(*args, **kw), **self.kw
+ )
self.pools[key] = pool
return pool
else:
@@ -138,9 +139,6 @@ class _DBProxy(object):
def _serialize(self, *args, **kw):
if "sa_pool_key" in kw:
- return kw['sa_pool_key']
+ return kw["sa_pool_key"]
- return tuple(
- list(args) +
- [(k, kw[k]) for k in sorted(kw)]
- )
+ return tuple(list(args) + [(k, kw[k]) for k in sorted(kw)])
diff --git a/lib/sqlalchemy/pool/impl.py b/lib/sqlalchemy/pool/impl.py
index 3058d6247..6159f6a5b 100644
--- a/lib/sqlalchemy/pool/impl.py
+++ b/lib/sqlalchemy/pool/impl.py
@@ -30,8 +30,15 @@ class QueuePool(Pool):
"""
- def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30, use_lifo=False,
- **kw):
+ def __init__(
+ self,
+ creator,
+ pool_size=5,
+ max_overflow=10,
+ timeout=30,
+ use_lifo=False,
+ **kw
+ ):
r"""
Construct a QueuePool.
@@ -117,8 +124,10 @@ class QueuePool(Pool):
else:
raise exc.TimeoutError(
"QueuePool limit of size %d overflow %d reached, "
- "connection timed out, timeout %d" %
- (self.size(), self.overflow(), self._timeout), code="3o7r")
+ "connection timed out, timeout %d"
+ % (self.size(), self.overflow(), self._timeout),
+ code="3o7r",
+ )
if self._inc_overflow():
try:
@@ -150,15 +159,19 @@ class QueuePool(Pool):
def recreate(self):
self.logger.info("Pool recreating")
- return self.__class__(self._creator, pool_size=self._pool.maxsize,
- max_overflow=self._max_overflow,
- timeout=self._timeout,
- recycle=self._recycle, echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- _dispatch=self.dispatch,
- dialect=self._dialect)
+ return self.__class__(
+ self._creator,
+ pool_size=self._pool.maxsize,
+ max_overflow=self._max_overflow,
+ timeout=self._timeout,
+ recycle=self._recycle,
+ echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ _dispatch=self.dispatch,
+ dialect=self._dialect,
+ )
def dispose(self):
while True:
@@ -172,12 +185,17 @@ class QueuePool(Pool):
self.logger.info("Pool disposed. %s", self.status())
def status(self):
- return "Pool size: %d Connections in pool: %d "\
- "Current Overflow: %d Current Checked out "\
- "connections: %d" % (self.size(),
- self.checkedin(),
- self.overflow(),
- self.checkedout())
+ return (
+ "Pool size: %d Connections in pool: %d "
+ "Current Overflow: %d Current Checked out "
+ "connections: %d"
+ % (
+ self.size(),
+ self.checkedin(),
+ self.overflow(),
+ self.checkedout(),
+ )
+ )
def size(self):
return self._pool.maxsize
@@ -221,14 +239,16 @@ class NullPool(Pool):
def recreate(self):
self.logger.info("Pool recreating")
- return self.__class__(self._creator,
- recycle=self._recycle,
- echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- _dispatch=self.dispatch,
- dialect=self._dialect)
+ return self.__class__(
+ self._creator,
+ recycle=self._recycle,
+ echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ _dispatch=self.dispatch,
+ dialect=self._dialect,
+ )
def dispose(self):
pass
@@ -266,7 +286,7 @@ class SingletonThreadPool(Pool):
"""
def __init__(self, creator, pool_size=5, **kw):
- kw['use_threadlocal'] = True
+ kw["use_threadlocal"] = True
Pool.__init__(self, creator, **kw)
self._conn = threading.local()
self._all_conns = set()
@@ -274,15 +294,17 @@ class SingletonThreadPool(Pool):
def recreate(self):
self.logger.info("Pool recreating")
- return self.__class__(self._creator,
- pool_size=self.size,
- recycle=self._recycle,
- echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- _dispatch=self.dispatch,
- dialect=self._dialect)
+ return self.__class__(
+ self._creator,
+ pool_size=self.size,
+ recycle=self._recycle,
+ echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ _dispatch=self.dispatch,
+ dialect=self._dialect,
+ )
def dispose(self):
"""Dispose of this pool."""
@@ -303,8 +325,10 @@ class SingletonThreadPool(Pool):
c.close()
def status(self):
- return "SingletonThreadPool id:%d size: %d" % \
- (id(self), len(self._all_conns))
+ return "SingletonThreadPool id:%d size: %d" % (
+ id(self),
+ len(self._all_conns),
+ )
def _do_return_conn(self, conn):
pass
@@ -347,20 +371,22 @@ class StaticPool(Pool):
return "StaticPool"
def dispose(self):
- if '_conn' in self.__dict__:
+ if "_conn" in self.__dict__:
self._conn.close()
self._conn = None
def recreate(self):
self.logger.info("Pool recreating")
- return self.__class__(creator=self._creator,
- recycle=self._recycle,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- echo=self.echo,
- logging_name=self._orig_logging_name,
- _dispatch=self.dispatch,
- dialect=self._dialect)
+ return self.__class__(
+ creator=self._creator,
+ recycle=self._recycle,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ echo=self.echo,
+ logging_name=self._orig_logging_name,
+ _dispatch=self.dispatch,
+ dialect=self._dialect,
+ )
def _create_connection(self):
return self._conn
@@ -391,7 +417,7 @@ class AssertionPool(Pool):
def __init__(self, *args, **kw):
self._conn = None
self._checked_out = False
- self._store_traceback = kw.pop('store_traceback', True)
+ self._store_traceback = kw.pop("store_traceback", True)
self._checkout_traceback = None
Pool.__init__(self, *args, **kw)
@@ -411,18 +437,22 @@ class AssertionPool(Pool):
def recreate(self):
self.logger.info("Pool recreating")
- return self.__class__(self._creator, echo=self.echo,
- logging_name=self._orig_logging_name,
- _dispatch=self.dispatch,
- dialect=self._dialect)
+ return self.__class__(
+ self._creator,
+ echo=self.echo,
+ logging_name=self._orig_logging_name,
+ _dispatch=self.dispatch,
+ dialect=self._dialect,
+ )
def _do_get(self):
if self._checked_out:
if self._checkout_traceback:
- suffix = ' at:\n%s' % ''.join(
- chop_traceback(self._checkout_traceback))
+ suffix = " at:\n%s" % "".join(
+ chop_traceback(self._checkout_traceback)
+ )
else:
- suffix = ''
+ suffix = ""
raise AssertionError("connection is already checked out" + suffix)
if not self._conn: