summaryrefslogtreecommitdiff
path: root/test/engine/test_pool.py
diff options
context:
space:
mode:
author: Mike Bayer <mike_mp@zzzcomputing.com> 2019-01-06 01:14:26 -0500
committer: Mike Bayer <mike_mp@zzzcomputing.com> 2019-01-06 17:34:50 +0000
commit: 1e1a38e7801f410f244e4bbb44ec795ae152e04e (patch)
tree: 28e725c5c8188bd0cfd133d1e268dbca9b524978 /test/engine/test_pool.py
parent: 404e69426b05a82d905cbb3ad33adafccddb00dd (diff)
download: sqlalchemy-1e1a38e7801f410f244e4bbb44ec795ae152e04e.tar.gz
Run black -l 79 against all source files
This is a straight reformat run using black as is, with no edits applied at all. The black run will format code consistently, however in some cases that are prevalent in SQLAlchemy code it produces too-long lines. The too-long lines will be resolved in the following commit that will resolve all remaining flake8 issues including shadowed builtins, long lines, import order, unused imports, duplicate imports, and docstring issues. Change-Id: I7eda77fed3d8e73df84b3651fd6cfcfe858d4dc9
Diffstat (limited to 'test/engine/test_pool.py')
-rw-r--r--  test/engine/test_pool.py  547
1 file changed, 263 insertions, 284 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 99e50f582..547c265bb 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -29,7 +29,9 @@ def MockDBAPI(): # noqa
# adding a side_effect for close seems to help.
conn = Mock(
cursor=Mock(side_effect=cursor),
- close=Mock(side_effect=close), closed=False)
+ close=Mock(side_effect=close),
+ closed=False,
+ )
return conn
def shutdown(value):
@@ -40,9 +42,8 @@ def MockDBAPI(): # noqa
db.is_shutdown = value
db = Mock(
- connect=Mock(side_effect=connect),
- shutdown=shutdown,
- is_shutdown=False)
+ connect=Mock(side_effect=connect), shutdown=shutdown, is_shutdown=False
+ )
return db
@@ -71,18 +72,19 @@ class PoolTestBase(fixtures.TestBase):
def _queuepool_dbapi_fixture(self, **kw):
dbapi = MockDBAPI()
- return dbapi, pool.QueuePool(
- creator=lambda: dbapi.connect('foo.db'),
- **kw)
+ return (
+ dbapi,
+ pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw),
+ )
class PoolTest(PoolTestBase):
def test_manager(self):
manager = pool.manage(MockDBAPI(), use_threadlocal=True)
- c1 = manager.connect('foo.db')
- c2 = manager.connect('foo.db')
- c3 = manager.connect('bar.db')
+ c1 = manager.connect("foo.db")
+ c2 = manager.connect("foo.db")
+ c3 = manager.connect("bar.db")
c4 = manager.connect("foo.db", bar="bat")
c5 = manager.connect("foo.db", bar="hoho")
c6 = manager.connect("foo.db", bar="bat")
@@ -98,21 +100,15 @@ class PoolTest(PoolTestBase):
dbapi = MockDBAPI()
manager = pool.manage(dbapi, use_threadlocal=True)
- c1 = manager.connect('foo.db', sa_pool_key="a")
- c2 = manager.connect('foo.db', sa_pool_key="b")
- c3 = manager.connect('bar.db', sa_pool_key="a")
+ c1 = manager.connect("foo.db", sa_pool_key="a")
+ c2 = manager.connect("foo.db", sa_pool_key="b")
+ c3 = manager.connect("bar.db", sa_pool_key="a")
assert c1.cursor() is not None
assert c1 is not c2
assert c1 is c3
- eq_(
- dbapi.connect.mock_calls,
- [
- call("foo.db"),
- call("foo.db"),
- ]
- )
+ eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")])
def test_bad_args(self):
manager = pool.manage(MockDBAPI())
@@ -121,20 +117,21 @@ class PoolTest(PoolTestBase):
def test_non_thread_local_manager(self):
manager = pool.manage(MockDBAPI(), use_threadlocal=False)
- connection = manager.connect('foo.db')
- connection2 = manager.connect('foo.db')
+ connection = manager.connect("foo.db")
+ connection2 = manager.connect("foo.db")
self.assert_(connection.cursor() is not None)
self.assert_(connection is not connection2)
- @testing.fails_on('+pyodbc',
- "pyodbc cursor doesn't implement tuple __eq__")
+ @testing.fails_on(
+ "+pyodbc", "pyodbc cursor doesn't implement tuple __eq__"
+ )
@testing.fails_on("+pg8000", "returns [1], not (1,)")
def test_cursor_iterable(self):
conn = testing.db.raw_connection()
cursor = conn.cursor()
cursor.execute(str(select([1], bind=testing.db)))
- expected = [(1, )]
+ expected = [(1,)]
for row in cursor:
eq_(row, expected.pop(0))
@@ -142,8 +139,13 @@ class PoolTest(PoolTestBase):
def creator():
raise Exception("no creates allowed")
- for cls in (pool.SingletonThreadPool, pool.StaticPool,
- pool.QueuePool, pool.NullPool, pool.AssertionPool):
+ for cls in (
+ pool.SingletonThreadPool,
+ pool.StaticPool,
+ pool.QueuePool,
+ pool.NullPool,
+ pool.AssertionPool,
+ ):
p = cls(creator=creator)
p.dispose()
p2 = p.recreate()
@@ -165,12 +167,17 @@ class PoolTest(PoolTestBase):
def _do_testthreadlocal(self, useclose=False):
dbapi = MockDBAPI()
- for p in pool.QueuePool(creator=dbapi.connect,
- pool_size=3, max_overflow=-1,
- use_threadlocal=True), \
- pool.SingletonThreadPool(
+ for p in (
+ pool.QueuePool(
creator=dbapi.connect,
- use_threadlocal=True):
+ pool_size=3,
+ max_overflow=-1,
+ use_threadlocal=True,
+ ),
+ pool.SingletonThreadPool(
+ creator=dbapi.connect, use_threadlocal=True
+ ),
+ ):
c1 = p.connect()
c2 = p.connect()
self.assert_(c1 is c2)
@@ -222,25 +229,25 @@ class PoolTest(PoolTestBase):
self.assert_(not c.info)
self.assert_(c.info is c._connection_record.info)
- c.info['foo'] = 'bar'
+ c.info["foo"] = "bar"
c.close()
del c
c = p.connect()
- self.assert_('foo' in c.info)
+ self.assert_("foo" in c.info)
c.invalidate()
c = p.connect()
- self.assert_('foo' not in c.info)
+ self.assert_("foo" not in c.info)
- c.info['foo2'] = 'bar2'
+ c.info["foo2"] = "bar2"
c.detach()
- self.assert_('foo2' in c.info)
+ self.assert_("foo2" in c.info)
c2 = p.connect()
is_not_(c.connection, c2.connection)
assert not c2.info
- assert 'foo2' in c.info
+ assert "foo2" in c.info
def test_rec_info(self):
p = self._queuepool_fixture(pool_size=1, max_overflow=0)
@@ -249,18 +256,18 @@ class PoolTest(PoolTestBase):
self.assert_(not c.record_info)
self.assert_(c.record_info is c._connection_record.record_info)
- c.record_info['foo'] = 'bar'
+ c.record_info["foo"] = "bar"
c.close()
del c
c = p.connect()
- self.assert_('foo' in c.record_info)
+ self.assert_("foo" in c.record_info)
c.invalidate()
c = p.connect()
- self.assert_('foo' in c.record_info)
+ self.assert_("foo" in c.record_info)
- c.record_info['foo2'] = 'bar2'
+ c.record_info["foo2"] = "bar2"
c.detach()
is_(c.record_info, None)
is_(c._connection_record, None)
@@ -268,16 +275,14 @@ class PoolTest(PoolTestBase):
c2 = p.connect()
assert c2.record_info
- assert 'foo2' in c2.record_info
+ assert "foo2" in c2.record_info
def test_rec_unconnected(self):
# test production of a _ConnectionRecord with an
# initially unconnected state.
dbapi = MockDBAPI()
- p1 = pool.Pool(
- creator=lambda: dbapi.connect('foo.db')
- )
+ p1 = pool.Pool(creator=lambda: dbapi.connect("foo.db"))
r1 = pool._ConnectionRecord(p1, connect=False)
@@ -289,9 +294,7 @@ class PoolTest(PoolTestBase):
# test that _ConnectionRecord.close() allows
# the record to be reusable
dbapi = MockDBAPI()
- p1 = pool.Pool(
- creator=lambda: dbapi.connect('foo.db')
- )
+ p1 = pool.Pool(creator=lambda: dbapi.connect("foo.db"))
r1 = pool._ConnectionRecord(p1)
@@ -302,20 +305,14 @@ class PoolTest(PoolTestBase):
r1.close()
assert not r1.connection
- eq_(
- c1.mock_calls,
- [call.close()]
- )
+ eq_(c1.mock_calls, [call.close()])
c2 = r1.get_connection()
is_not_(c1, c2)
is_(c2, r1.connection)
- eq_(
- c2.mock_calls,
- []
- )
+ eq_(c2.mock_calls, [])
class PoolDialectTest(PoolTestBase):
@@ -324,16 +321,17 @@ class PoolDialectTest(PoolTestBase):
class PoolDialect(object):
def do_rollback(self, dbapi_connection):
- canary.append('R')
+ canary.append("R")
dbapi_connection.rollback()
def do_commit(self, dbapi_connection):
- canary.append('C')
+ canary.append("C")
dbapi_connection.commit()
def do_close(self, dbapi_connection):
- canary.append('CL')
+ canary.append("CL")
dbapi_connection.close()
+
return PoolDialect(), canary
def _do_test(self, pool_cls, assertion):
@@ -351,19 +349,19 @@ class PoolDialectTest(PoolTestBase):
eq_(canary, assertion)
def test_queue_pool(self):
- self._do_test(pool.QueuePool, ['R', 'CL', 'R'])
+ self._do_test(pool.QueuePool, ["R", "CL", "R"])
def test_assertion_pool(self):
- self._do_test(pool.AssertionPool, ['R', 'CL', 'R'])
+ self._do_test(pool.AssertionPool, ["R", "CL", "R"])
def test_singleton_pool(self):
- self._do_test(pool.SingletonThreadPool, ['R', 'CL', 'R'])
+ self._do_test(pool.SingletonThreadPool, ["R", "CL", "R"])
def test_null_pool(self):
- self._do_test(pool.NullPool, ['R', 'CL', 'R', 'CL'])
+ self._do_test(pool.NullPool, ["R", "CL", "R", "CL"])
def test_static_pool(self):
- self._do_test(pool.StaticPool, ['R', 'R'])
+ self._do_test(pool.StaticPool, ["R", "R"])
class PoolEventsTest(PoolTestBase):
@@ -372,9 +370,9 @@ class PoolEventsTest(PoolTestBase):
canary = []
def first_connect(*arg, **kw):
- canary.append('first_connect')
+ canary.append("first_connect")
- event.listen(p, 'first_connect', first_connect)
+ event.listen(p, "first_connect", first_connect)
return p, canary
@@ -383,9 +381,9 @@ class PoolEventsTest(PoolTestBase):
canary = []
def connect(*arg, **kw):
- canary.append('connect')
+ canary.append("connect")
- event.listen(p, 'connect', connect)
+ event.listen(p, "connect", connect)
return p, canary
@@ -394,8 +392,9 @@ class PoolEventsTest(PoolTestBase):
canary = []
def checkout(*arg, **kw):
- canary.append('checkout')
- event.listen(p, 'checkout', checkout)
+ canary.append("checkout")
+
+ event.listen(p, "checkout", checkout)
return p, canary
@@ -404,8 +403,9 @@ class PoolEventsTest(PoolTestBase):
canary = []
def checkin(*arg, **kw):
- canary.append('checkin')
- event.listen(p, 'checkin', checkin)
+ canary.append("checkin")
+
+ event.listen(p, "checkin", checkin)
return p, canary
@@ -414,43 +414,44 @@ class PoolEventsTest(PoolTestBase):
canary = []
def reset(*arg, **kw):
- canary.append('reset')
- event.listen(p, 'reset', reset)
+ canary.append("reset")
+
+ event.listen(p, "reset", reset)
return p, canary
def _invalidate_event_fixture(self):
p = self._queuepool_fixture()
canary = Mock()
- event.listen(p, 'invalidate', canary)
+ event.listen(p, "invalidate", canary)
return p, canary
def _soft_invalidate_event_fixture(self):
p = self._queuepool_fixture()
canary = Mock()
- event.listen(p, 'soft_invalidate', canary)
+ event.listen(p, "soft_invalidate", canary)
return p, canary
def _close_event_fixture(self):
p = self._queuepool_fixture()
canary = Mock()
- event.listen(p, 'close', canary)
+ event.listen(p, "close", canary)
return p, canary
def _detach_event_fixture(self):
p = self._queuepool_fixture()
canary = Mock()
- event.listen(p, 'detach', canary)
+ event.listen(p, "detach", canary)
return p, canary
def _close_detached_event_fixture(self):
p = self._queuepool_fixture()
canary = Mock()
- event.listen(p, 'close_detached', canary)
+ event.listen(p, "close_detached", canary)
return p, canary
@@ -497,7 +498,7 @@ class PoolEventsTest(PoolTestBase):
p, canary = self._first_connect_event_fixture()
p.connect()
- eq_(canary, ['first_connect'])
+ eq_(canary, ["first_connect"])
def test_first_connect_event_fires_once(self):
p, canary = self._first_connect_event_fixture()
@@ -505,7 +506,7 @@ class PoolEventsTest(PoolTestBase):
p.connect()
p.connect()
- eq_(canary, ['first_connect'])
+ eq_(canary, ["first_connect"])
def test_first_connect_on_previously_recreated(self):
p, canary = self._first_connect_event_fixture()
@@ -514,7 +515,7 @@ class PoolEventsTest(PoolTestBase):
p.connect()
p2.connect()
- eq_(canary, ['first_connect', 'first_connect'])
+ eq_(canary, ["first_connect", "first_connect"])
def test_first_connect_on_subsequently_recreated(self):
p, canary = self._first_connect_event_fixture()
@@ -523,13 +524,13 @@ class PoolEventsTest(PoolTestBase):
p2 = p.recreate()
p2.connect()
- eq_(canary, ['first_connect', 'first_connect'])
+ eq_(canary, ["first_connect", "first_connect"])
def test_connect_event(self):
p, canary = self._connect_event_fixture()
p.connect()
- eq_(canary, ['connect'])
+ eq_(canary, ["connect"])
def test_connect_event_fires_subsequent(self):
p, canary = self._connect_event_fixture()
@@ -537,7 +538,7 @@ class PoolEventsTest(PoolTestBase):
c1 = p.connect() # noqa
c2 = p.connect() # noqa
- eq_(canary, ['connect', 'connect'])
+ eq_(canary, ["connect", "connect"])
def test_connect_on_previously_recreated(self):
p, canary = self._connect_event_fixture()
@@ -547,7 +548,7 @@ class PoolEventsTest(PoolTestBase):
p.connect()
p2.connect()
- eq_(canary, ['connect', 'connect'])
+ eq_(canary, ["connect", "connect"])
def test_connect_on_subsequently_recreated(self):
p, canary = self._connect_event_fixture()
@@ -556,20 +557,20 @@ class PoolEventsTest(PoolTestBase):
p2 = p.recreate()
p2.connect()
- eq_(canary, ['connect', 'connect'])
+ eq_(canary, ["connect", "connect"])
def test_checkout_event(self):
p, canary = self._checkout_event_fixture()
p.connect()
- eq_(canary, ['checkout'])
+ eq_(canary, ["checkout"])
def test_checkout_event_fires_subsequent(self):
p, canary = self._checkout_event_fixture()
p.connect()
p.connect()
- eq_(canary, ['checkout', 'checkout'])
+ eq_(canary, ["checkout", "checkout"])
def test_checkout_event_on_subsequently_recreated(self):
p, canary = self._checkout_event_fixture()
@@ -578,7 +579,7 @@ class PoolEventsTest(PoolTestBase):
p2 = p.recreate()
p2.connect()
- eq_(canary, ['checkout', 'checkout'])
+ eq_(canary, ["checkout", "checkout"])
def test_checkin_event(self):
p, canary = self._checkin_event_fixture()
@@ -586,7 +587,7 @@ class PoolEventsTest(PoolTestBase):
c1 = p.connect()
eq_(canary, [])
c1.close()
- eq_(canary, ['checkin'])
+ eq_(canary, ["checkin"])
def test_reset_event(self):
p, canary = self._reset_event_fixture()
@@ -594,7 +595,7 @@ class PoolEventsTest(PoolTestBase):
c1 = p.connect()
eq_(canary, [])
c1.close()
- eq_(canary, ['reset'])
+ eq_(canary, ["reset"])
def test_soft_invalidate_event_no_exception(self):
p, canary = self._soft_invalidate_event_fixture()
@@ -653,7 +654,7 @@ class PoolEventsTest(PoolTestBase):
eq_(canary, [])
del c1
lazy_gc()
- eq_(canary, ['checkin'])
+ eq_(canary, ["checkin"])
def test_checkin_event_on_subsequently_recreated(self):
p, canary = self._checkin_event_fixture()
@@ -665,10 +666,10 @@ class PoolEventsTest(PoolTestBase):
eq_(canary, [])
c1.close()
- eq_(canary, ['checkin'])
+ eq_(canary, ["checkin"])
c2.close()
- eq_(canary, ['checkin', 'checkin'])
+ eq_(canary, ["checkin", "checkin"])
def test_listen_targets_scope(self):
canary = []
@@ -686,15 +687,14 @@ class PoolEventsTest(PoolTestBase):
canary.append("listen_four")
engine = testing_engine(testing.db.url)
- event.listen(pool.Pool, 'connect', listen_one)
- event.listen(engine.pool, 'connect', listen_two)
- event.listen(engine, 'connect', listen_three)
- event.listen(engine.__class__, 'connect', listen_four)
+ event.listen(pool.Pool, "connect", listen_one)
+ event.listen(engine.pool, "connect", listen_two)
+ event.listen(engine, "connect", listen_three)
+ event.listen(engine.__class__, "connect", listen_four)
engine.execute(select([1])).close()
eq_(
- canary,
- ["listen_one", "listen_four", "listen_two", "listen_three"]
+ canary, ["listen_one", "listen_four", "listen_two", "listen_three"]
)
def test_listen_targets_per_subclass(self):
@@ -712,9 +712,9 @@ class PoolEventsTest(PoolTestBase):
def listen_three(*args):
canary.append("listen_three")
- event.listen(pool.Pool, 'connect', listen_one)
- event.listen(pool.QueuePool, 'connect', listen_two)
- event.listen(pool.SingletonThreadPool, 'connect', listen_three)
+ event.listen(pool.Pool, "connect", listen_one)
+ event.listen(pool.QueuePool, "connect", listen_two)
+ event.listen(pool.SingletonThreadPool, "connect", listen_three)
p1 = pool.QueuePool(creator=MockDBAPI().connect)
p2 = pool.SingletonThreadPool(creator=MockDBAPI().connect)
@@ -739,15 +739,16 @@ class PoolEventsTest(PoolTestBase):
raise Exception("it failed")
def listen_two(conn, rec):
- rec.info['important_flag'] = True
+ rec.info["important_flag"] = True
p1 = pool.QueuePool(
- creator=MockDBAPI().connect, pool_size=1, max_overflow=0)
- event.listen(p1, 'connect', listen_one)
- event.listen(p1, 'connect', listen_two)
+ creator=MockDBAPI().connect, pool_size=1, max_overflow=0
+ )
+ event.listen(p1, "connect", listen_one)
+ event.listen(p1, "connect", listen_two)
conn = p1.connect()
- eq_(conn.info['important_flag'], True)
+ eq_(conn.info["important_flag"], True)
conn.invalidate()
conn.close()
@@ -757,7 +758,7 @@ class PoolEventsTest(PoolTestBase):
fail = False
conn = p1.connect()
- eq_(conn.info['important_flag'], True)
+ eq_(conn.info["important_flag"], True)
conn.close()
def teardown(self):
@@ -775,21 +776,21 @@ class PoolFirstConnectSyncTest(PoolTestBase):
evt = Mock()
- @event.listens_for(pool, 'first_connect')
+ @event.listens_for(pool, "first_connect")
def slow_first_connect(dbapi_con, rec):
time.sleep(1)
evt.first_connect()
- @event.listens_for(pool, 'connect')
+ @event.listens_for(pool, "connect")
def on_connect(dbapi_con, rec):
evt.connect()
def checkout():
for j in range(2):
c1 = pool.connect()
- time.sleep(.02)
+ time.sleep(0.02)
c1.close()
- time.sleep(.02)
+ time.sleep(0.02)
threads = []
for i in range(5):
@@ -805,7 +806,8 @@ class PoolFirstConnectSyncTest(PoolTestBase):
call.first_connect(),
call.connect(),
call.connect(),
- call.connect()]
+ call.connect(),
+ ],
)
@@ -813,16 +815,15 @@ class DeprecatedPoolListenerTest(PoolTestBase):
@testing.requires.predictable_gc
@testing.uses_deprecated(r".*Use event.listen")
def test_listeners(self):
-
class InstrumentingListener(object):
def __init__(self):
- if hasattr(self, 'connect'):
+ if hasattr(self, "connect"):
self.connect = self.inst_connect
- if hasattr(self, 'first_connect'):
+ if hasattr(self, "first_connect"):
self.first_connect = self.inst_first_connect
- if hasattr(self, 'checkout'):
+ if hasattr(self, "checkout"):
self.checkout = self.inst_checkout
- if hasattr(self, 'checkin'):
+ if hasattr(self, "checkin"):
self.checkin = self.inst_checkin
self.clear()
@@ -838,9 +839,7 @@ class DeprecatedPoolListenerTest(PoolTestBase):
eq_(len(self.checked_out), cout)
eq_(len(self.checked_in), cin)
- def assert_in(
- self, item, in_conn, in_fconn,
- in_cout, in_cin):
+ def assert_in(self, item, in_conn, in_fconn, in_cout, in_cin):
eq_((item in self.connected), in_conn)
eq_((item in self.first_connected), in_fconn)
eq_((item in self.checked_out), in_cout)
@@ -1056,7 +1055,6 @@ class DeprecatedPoolListenerTest(PoolTestBase):
class QueuePoolTest(PoolTestBase):
-
def test_queuepool_del(self):
self._do_testqueuepool(useclose=False)
@@ -1064,13 +1062,15 @@ class QueuePoolTest(PoolTestBase):
self._do_testqueuepool(useclose=True)
def _do_testqueuepool(self, useclose=False):
- p = self._queuepool_fixture(
- pool_size=3,
- max_overflow=-1)
+ p = self._queuepool_fixture(pool_size=3, max_overflow=-1)
def status(pool):
- return pool.size(), pool.checkedin(), pool.overflow(), \
- pool.checkedout()
+ return (
+ pool.size(),
+ pool.checkedin(),
+ pool.overflow(),
+ pool.checkedout(),
+ )
c1 = p.connect()
self.assert_(status(p) == (3, 0, -2, 1))
@@ -1115,19 +1115,13 @@ class QueuePoolTest(PoolTestBase):
@testing.requires.timing_intensive
def test_timeout(self):
- p = self._queuepool_fixture(
- pool_size=3,
- max_overflow=0,
- timeout=2)
+ p = self._queuepool_fixture(pool_size=3, max_overflow=0, timeout=2)
c1 = p.connect() # noqa
c2 = p.connect() # noqa
c3 = p.connect() # noqa
now = time.time()
- assert_raises(
- tsa.exc.TimeoutError,
- p.connect
- )
+ assert_raises(tsa.exc.TimeoutError, p.connect)
assert int(time.time() - now) == 2
@testing.requires.threading_with_mock
@@ -1142,9 +1136,12 @@ class QueuePoolTest(PoolTestBase):
# them back to the start of do_get()
dbapi = MockDBAPI()
p = pool.QueuePool(
- creator=lambda: dbapi.connect(delay=.05),
+ creator=lambda: dbapi.connect(delay=0.05),
pool_size=2,
- max_overflow=1, use_threadlocal=False, timeout=3)
+ max_overflow=1,
+ use_threadlocal=False,
+ timeout=3,
+ )
timeouts = []
def checkout():
@@ -1180,25 +1177,26 @@ class QueuePoolTest(PoolTestBase):
mutex = threading.Lock()
def creator():
- time.sleep(.05)
+ time.sleep(0.05)
with mutex:
return dbapi.connect()
- p = pool.QueuePool(creator=creator,
- pool_size=3, timeout=2,
- max_overflow=max_overflow)
+ p = pool.QueuePool(
+ creator=creator, pool_size=3, timeout=2, max_overflow=max_overflow
+ )
peaks = []
def whammy():
for i in range(10):
try:
con = p.connect()
- time.sleep(.005)
+ time.sleep(0.005)
peaks.append(p.overflow())
con.close()
del con
except tsa.exc.TimeoutError:
pass
+
threads = []
for i in range(thread_count):
th = threading.Thread(target=whammy)
@@ -1270,28 +1268,29 @@ class QueuePoolTest(PoolTestBase):
p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3)
threads = [
+ threading.Thread(target=run_test, args=("success_one", p, False)),
+ threading.Thread(target=run_test, args=("success_two", p, False)),
+ threading.Thread(target=run_test, args=("overflow_one", p, True)),
+ threading.Thread(target=run_test, args=("overflow_two", p, False)),
threading.Thread(
- target=run_test, args=("success_one", p, False)),
- threading.Thread(
- target=run_test, args=("success_two", p, False)),
- threading.Thread(
- target=run_test, args=("overflow_one", p, True)),
- threading.Thread(
- target=run_test, args=("overflow_two", p, False)),
- threading.Thread(
- target=run_test, args=("overflow_three", p, False))
+ target=run_test, args=("overflow_three", p, False)
+ ),
]
for t in threads:
t.start()
- time.sleep(.2)
+ time.sleep(0.2)
for t in threads:
t.join(timeout=join_timeout)
eq_(
dbapi.connect().operation.mock_calls,
- [call("success_one"), call("success_two"),
- call("overflow_two"), call("overflow_three"),
- call("overflow_one")]
+ [
+ call("success_one"),
+ call("success_two"),
+ call("overflow_two"),
+ call("overflow_three"),
+ call("overflow_one"),
+ ],
)
@testing.requires.threading_with_mock
@@ -1314,15 +1313,18 @@ class QueuePoolTest(PoolTestBase):
success = []
for timeout in (None, 30):
for max_overflow in (0, -1, 3):
- p = pool.QueuePool(creator=creator,
- pool_size=2, timeout=timeout,
- max_overflow=max_overflow)
+ p = pool.QueuePool(
+ creator=creator,
+ pool_size=2,
+ timeout=timeout,
+ max_overflow=max_overflow,
+ )
def waiter(p, timeout, max_overflow):
success_key = (timeout, max_overflow)
conn = p.connect()
success.append(success_key)
- time.sleep(.1)
+ time.sleep(0.1)
conn.close()
c1 = p.connect() # noqa
@@ -1331,8 +1333,8 @@ class QueuePoolTest(PoolTestBase):
threads = []
for i in range(2):
t = threading.Thread(
- target=waiter,
- args=(p, timeout, max_overflow))
+ target=waiter, args=(p, timeout, max_overflow)
+ )
t.daemon = True
t.start()
threads.append(t)
@@ -1341,7 +1343,7 @@ class QueuePoolTest(PoolTestBase):
# two waiter threads hit upon wait()
# inside the queue, before we invalidate the other
# two conns
- time.sleep(.2)
+ time.sleep(0.2)
p._invalidate(c2)
for t in threads:
@@ -1381,8 +1383,9 @@ class QueuePoolTest(PoolTestBase):
return fairy
with patch(
- "sqlalchemy.pool._ConnectionRecord.checkout",
- _decorate_existing_checkout):
+ "sqlalchemy.pool._ConnectionRecord.checkout",
+ _decorate_existing_checkout,
+ ):
conn = p.connect()
is_(conn._connection_record.connection, None)
conn.close()
@@ -1397,25 +1400,25 @@ class QueuePoolTest(PoolTestBase):
def creator():
canary.append(1)
return dbapi.connect()
+
p1 = pool.QueuePool(
- creator=creator,
- pool_size=1, timeout=None,
- max_overflow=0)
+ creator=creator, pool_size=1, timeout=None, max_overflow=0
+ )
def waiter(p):
conn = p.connect()
canary.append(2)
- time.sleep(.5)
+ time.sleep(0.5)
conn.close()
c1 = p1.connect()
threads = []
for i in range(5):
- t = threading.Thread(target=waiter, args=(p1, ))
+ t = threading.Thread(target=waiter, args=(p1,))
t.start()
threads.append(t)
- time.sleep(.5)
+ time.sleep(0.5)
eq_(canary, [1])
# this also calls invalidate()
@@ -1430,9 +1433,9 @@ class QueuePoolTest(PoolTestBase):
def test_dispose_closes_pooled(self):
dbapi = MockDBAPI()
- p = pool.QueuePool(creator=dbapi.connect,
- pool_size=2, timeout=None,
- max_overflow=0)
+ p = pool.QueuePool(
+ creator=dbapi.connect, pool_size=2, timeout=None, max_overflow=0
+ )
c1 = p.connect()
c2 = p.connect()
c1_con = c1.connection
@@ -1473,8 +1476,9 @@ class QueuePoolTest(PoolTestBase):
def test_mixed_close(self):
pool._refs.clear()
- p = self._queuepool_fixture(pool_size=3, max_overflow=-1,
- use_threadlocal=True)
+ p = self._queuepool_fixture(
+ pool_size=3, max_overflow=-1, use_threadlocal=True
+ )
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
@@ -1494,9 +1498,7 @@ class QueuePoolTest(PoolTestBase):
self._test_overflow_no_gc(False)
def _test_overflow_no_gc(self, threadlocal):
- p = self._queuepool_fixture(
- pool_size=2,
- max_overflow=2)
+ p = self._queuepool_fixture(pool_size=2, max_overflow=2)
# disable weakref collection of the
# underlying connections
@@ -1521,14 +1523,14 @@ class QueuePoolTest(PoolTestBase):
eq_(
set([c.close.call_count for c in strong_refs]),
- set([1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0])
+ set([1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0]),
)
@testing.requires.predictable_gc
def test_weakref_kaboom(self):
p = self._queuepool_fixture(
- pool_size=3,
- max_overflow=-1, use_threadlocal=True)
+ pool_size=3, max_overflow=-1, use_threadlocal=True
+ )
c1 = p.connect()
c2 = p.connect()
c1.close()
@@ -1548,8 +1550,8 @@ class QueuePoolTest(PoolTestBase):
reference counting."""
p = self._queuepool_fixture(
- pool_size=3,
- max_overflow=-1, use_threadlocal=True)
+ pool_size=3, max_overflow=-1, use_threadlocal=True
+ )
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
@@ -1565,9 +1567,8 @@ class QueuePoolTest(PoolTestBase):
mock.return_value = 10000
p = self._queuepool_fixture(
- pool_size=1,
- max_overflow=0,
- recycle=30)
+ pool_size=1, max_overflow=0, recycle=30
+ )
c1 = p.connect()
c_ref = weakref.ref(c1.connection)
c1.close()
@@ -1583,9 +1584,7 @@ class QueuePoolTest(PoolTestBase):
@testing.requires.timing_intensive
def test_recycle_on_invalidate(self):
- p = self._queuepool_fixture(
- pool_size=1,
- max_overflow=0)
+ p = self._queuepool_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
c_ref = weakref.ref(c1.connection)
c1.close()
@@ -1596,16 +1595,14 @@ class QueuePoolTest(PoolTestBase):
p._invalidate(c2)
assert c2_rec.connection is None
c2.close()
- time.sleep(.5)
+ time.sleep(0.5)
c3 = p.connect()
is_not_(c3.connection, c_ref())
@testing.requires.timing_intensive
def test_recycle_on_soft_invalidate(self):
- p = self._queuepool_fixture(
- pool_size=1,
- max_overflow=0)
+ p = self._queuepool_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
c_ref = weakref.ref(c1.connection)
c1.close()
@@ -1617,7 +1614,7 @@ class QueuePoolTest(PoolTestBase):
is_(c2_rec.connection, c2.connection)
c2.close()
- time.sleep(.5)
+ time.sleep(0.5)
c3 = p.connect()
is_not_(c3.connection, c_ref())
is_(c3._connection_record, c2_rec)
@@ -1627,15 +1624,17 @@ class QueuePoolTest(PoolTestBase):
finalize_fairy = pool._finalize_fairy
def assert_no_wr_callback(
- connection, connection_record,
- pool, ref, echo, fairy=None):
+ connection, connection_record, pool, ref, echo, fairy=None
+ ):
if fairy is None:
raise AssertionError(
- "finalize fairy was called as a weakref callback")
+ "finalize fairy was called as a weakref callback"
+ )
return finalize_fairy(
- connection, connection_record, pool, ref, echo, fairy)
- return patch.object(
- pool, '_finalize_fairy', assert_no_wr_callback)
+ connection, connection_record, pool, ref, echo, fairy
+ )
+
+ return patch.object(pool, "_finalize_fairy", assert_no_wr_callback)
def _assert_cleanup_on_pooled_reconnect(self, dbapi, p):
# p is QueuePool with size=1, max_overflow=2,
@@ -1646,10 +1645,7 @@ class QueuePoolTest(PoolTestBase):
eq_(p.checkedout(), 0)
eq_(p._overflow, 0)
dbapi.shutdown(True)
- assert_raises(
- Exception,
- p.connect
- )
+ assert_raises(Exception, p.connect)
eq_(p._overflow, 0)
eq_(p.checkedout(), 0) # and not 1
@@ -1675,8 +1671,8 @@ class QueuePoolTest(PoolTestBase):
@testing.requires.timing_intensive
def test_error_on_pooled_reconnect_cleanup_recycle(self):
dbapi, p = self._queuepool_dbapi_fixture(
- pool_size=1,
- max_overflow=2, recycle=1)
+ pool_size=1, max_overflow=2, recycle=1
+ )
c1 = p.connect()
c1.close()
time.sleep(1.5)
@@ -1685,8 +1681,7 @@ class QueuePoolTest(PoolTestBase):
def test_connect_handler_not_called_for_recycled(self):
"""test [ticket:3497]"""
- dbapi, p = self._queuepool_dbapi_fixture(
- pool_size=2, max_overflow=2)
+ dbapi, p = self._queuepool_dbapi_fixture(pool_size=2, max_overflow=2)
canary = Mock()
@@ -1706,16 +1701,10 @@ class QueuePoolTest(PoolTestBase):
event.listen(p, "connect", canary.connect)
event.listen(p, "checkout", canary.checkout)
- assert_raises(
- Exception,
- p.connect
- )
+ assert_raises(Exception, p.connect)
p._pool.queue = collections.deque(
- [
- c for c in p._pool.queue
- if c.connection is not None
- ]
+ [c for c in p._pool.queue if c.connection is not None]
)
dbapi.shutdown(False)
@@ -1724,17 +1713,13 @@ class QueuePoolTest(PoolTestBase):
eq_(
canary.mock_calls,
- [
- call.connect(ANY, ANY),
- call.checkout(ANY, ANY, ANY)
- ]
+ [call.connect(ANY, ANY), call.checkout(ANY, ANY, ANY)],
)
def test_connect_checkout_handler_always_gets_info(self):
"""test [ticket:3497]"""
- dbapi, p = self._queuepool_dbapi_fixture(
- pool_size=2, max_overflow=2)
+ dbapi, p = self._queuepool_dbapi_fixture(pool_size=2, max_overflow=2)
c1 = p.connect()
c2 = p.connect()
@@ -1751,22 +1736,16 @@ class QueuePoolTest(PoolTestBase):
@event.listens_for(p, "connect")
def connect(conn, conn_rec):
- conn_rec.info['x'] = True
+ conn_rec.info["x"] = True
@event.listens_for(p, "checkout")
def checkout(conn, conn_rec, conn_f):
- assert 'x' in conn_rec.info
+ assert "x" in conn_rec.info
- assert_raises(
- Exception,
- p.connect
- )
+ assert_raises(Exception, p.connect)
p._pool.queue = collections.deque(
- [
- c for c in p._pool.queue
- if c.connection is not None
- ]
+ [c for c in p._pool.queue if c.connection is not None]
)
dbapi.shutdown(False)
@@ -1774,9 +1753,7 @@ class QueuePoolTest(PoolTestBase):
c.close()
def test_error_on_pooled_reconnect_cleanup_wcheckout_event(self):
- dbapi, p = self._queuepool_dbapi_fixture(
- pool_size=1,
- max_overflow=2)
+ dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=2)
c1 = p.connect()
c1.close()
@@ -1791,12 +1768,12 @@ class QueuePoolTest(PoolTestBase):
@testing.requires.predictable_gc
def test_userspace_disconnectionerror_weakref_finalizer(self):
dbapi, pool = self._queuepool_dbapi_fixture(
- pool_size=1,
- max_overflow=2)
+ pool_size=1, max_overflow=2
+ )
@event.listens_for(pool, "checkout")
def handle_checkout_event(dbapi_con, con_record, con_proxy):
- if getattr(dbapi_con, 'boom') == 'yes':
+ if getattr(dbapi_con, "boom") == "yes":
raise tsa.exc.DisconnectionError()
conn = pool.connect()
@@ -1805,7 +1782,7 @@ class QueuePoolTest(PoolTestBase):
eq_(old_dbapi_conn.mock_calls, [call.rollback()])
- old_dbapi_conn.boom = 'yes'
+ old_dbapi_conn.boom = "yes"
conn = pool.connect()
dbapi_conn = conn.connection
@@ -1817,16 +1794,13 @@ class QueuePoolTest(PoolTestBase):
# old connection was just closed - did not get an
# erroneous reset on return
- eq_(
- old_dbapi_conn.mock_calls,
- [call.rollback(), call.close()]
- )
+ eq_(old_dbapi_conn.mock_calls, [call.rollback(), call.close()])
@testing.requires.timing_intensive
def test_recycle_pool_no_race(self):
def slow_close():
slow_closing_connection._slow_close()
- time.sleep(.5)
+ time.sleep(0.5)
slow_closing_connection = Mock()
slow_closing_connection.connect.return_value.close = slow_close
@@ -1847,9 +1821,11 @@ class QueuePoolTest(PoolTestBase):
def creator():
return slow_closing_connection.connect()
+
p1 = TrackQueuePool(creator=creator, pool_size=20)
from sqlalchemy import create_engine
+
eng = create_engine(testing.db.url, pool=p1, _initialize=False)
eng.dialect = dialect
@@ -1864,8 +1840,8 @@ class QueuePoolTest(PoolTestBase):
time.sleep(random.random())
try:
conn._handle_dbapi_exception(
- Error(), "statement", {},
- Mock(), Mock())
+ Error(), "statement", {}, Mock(), Mock()
+ )
except tsa.exc.DBAPIError:
pass
@@ -1873,7 +1849,7 @@ class QueuePoolTest(PoolTestBase):
# connections
threads = []
for conn in conns:
- t = threading.Thread(target=attempt, args=(conn, ))
+ t = threading.Thread(target=attempt, args=(conn,))
t.start()
threads.append(t)
@@ -1908,8 +1884,9 @@ class QueuePoolTest(PoolTestBase):
assert c1.connection.id != c_id
def test_recreate(self):
- p = self._queuepool_fixture(reset_on_return=None, pool_size=1,
- max_overflow=0)
+ p = self._queuepool_fixture(
+ reset_on_return=None, pool_size=1, max_overflow=0
+ )
p2 = p.recreate()
assert p2.size() == 1
assert p2._reset_on_return is pool.reset_none
@@ -1964,8 +1941,9 @@ class QueuePoolTest(PoolTestBase):
eq_(c2_con.close.call_count, 0)
def test_threadfairy(self):
- p = self._queuepool_fixture(pool_size=3, max_overflow=-1,
- use_threadlocal=True)
+ p = self._queuepool_fixture(
+ pool_size=3, max_overflow=-1, use_threadlocal=True
+ )
c1 = p.connect()
c1.close()
c2 = p.connect()
@@ -1978,9 +1956,7 @@ class QueuePoolTest(PoolTestBase):
rec = c1._connection_record
c1.close()
assert_raises_message(
- Warning,
- "Double checkin attempted on %s" % rec,
- rec.checkin
+ Warning, "Double checkin attempted on %s" % rec, rec.checkin
)
def test_lifo(self):
@@ -2064,12 +2040,13 @@ class QueuePoolTest(PoolTestBase):
class ResetOnReturnTest(PoolTestBase):
def _fixture(self, **kw):
dbapi = Mock()
- return dbapi, pool.QueuePool(
- creator=lambda: dbapi.connect('foo.db'),
- **kw)
+ return (
+ dbapi,
+ pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw),
+ )
def test_plain_rollback(self):
- dbapi, p = self._fixture(reset_on_return='rollback')
+ dbapi, p = self._fixture(reset_on_return="rollback")
c1 = p.connect()
c1.close()
@@ -2077,7 +2054,7 @@ class ResetOnReturnTest(PoolTestBase):
assert not dbapi.connect().commit.called
def test_plain_commit(self):
- dbapi, p = self._fixture(reset_on_return='commit')
+ dbapi, p = self._fixture(reset_on_return="commit")
c1 = p.connect()
c1.close()
@@ -2093,7 +2070,7 @@ class ResetOnReturnTest(PoolTestBase):
assert not dbapi.connect().commit.called
def test_agent_rollback(self):
- dbapi, p = self._fixture(reset_on_return='rollback')
+ dbapi, p = self._fixture(reset_on_return="rollback")
class Agent(object):
def __init__(self, conn):
@@ -2124,7 +2101,7 @@ class ResetOnReturnTest(PoolTestBase):
assert not dbapi.connect().commit.called
def test_agent_commit(self):
- dbapi, p = self._fixture(reset_on_return='commit')
+ dbapi, p = self._fixture(reset_on_return="commit")
class Agent(object):
def __init__(self, conn):
@@ -2154,7 +2131,7 @@ class ResetOnReturnTest(PoolTestBase):
assert dbapi.connect().commit.called
def test_reset_agent_disconnect(self):
- dbapi, p = self._fixture(reset_on_return='rollback')
+ dbapi, p = self._fixture(reset_on_return="rollback")
class Agent(object):
def __init__(self, conn):
@@ -2180,16 +2157,16 @@ class SingletonThreadPoolTest(PoolTestBase):
def test_cleanup(self):
self._test_cleanup(False)
-# TODO: the SingletonThreadPool cleanup method
-# has an unfixed race condition within the "cleanup" system that
-# leads to this test being off by one connection under load; in any
-# case, this connection will be closed once it is garbage collected.
-# this pool is not a production-level pool and is only used for the
-# SQLite "memory" connection, and is not very useful under actual
-# multi-threaded conditions
-# @testing.requires.threading_with_mock
-# def test_cleanup_no_gc(self):
-# self._test_cleanup(True)
+ # TODO: the SingletonThreadPool cleanup method
+ # has an unfixed race condition within the "cleanup" system that
+ # leads to this test being off by one connection under load; in any
+ # case, this connection will be closed once it is garbage collected.
+ # this pool is not a production-level pool and is only used for the
+ # SQLite "memory" connection, and is not very useful under actual
+ # multi-threaded conditions
+ # @testing.requires.threading_with_mock
+ # def test_cleanup_no_gc(self):
+ # self._test_cleanup(True)
def _test_cleanup(self, strong_refs):
"""test that the pool's connections are OK after cleanup() has
@@ -2203,6 +2180,7 @@ class SingletonThreadPoolTest(PoolTestBase):
# the mock iterator isn't threadsafe...
with lock:
return dbapi.connect()
+
p = pool.SingletonThreadPool(creator=creator, pool_size=3)
if strong_refs:
@@ -2212,7 +2190,9 @@ class SingletonThreadPoolTest(PoolTestBase):
c = p.connect()
sr.add(c.connection)
return c
+
else:
+
def _conn():
return p.connect()
@@ -2222,7 +2202,7 @@ class SingletonThreadPoolTest(PoolTestBase):
assert c
c.cursor()
c.close()
- time.sleep(.1)
+ time.sleep(0.1)
threads = []
for i in range(10):
@@ -2241,13 +2221,13 @@ class SingletonThreadPoolTest(PoolTestBase):
class AssertionPoolTest(PoolTestBase):
def test_connect_error(self):
dbapi = MockDBAPI()
- p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db'))
+ p = pool.AssertionPool(creator=lambda: dbapi.connect("foo.db"))
c1 = p.connect() # noqa
assert_raises(AssertionError, p.connect)
def test_connect_multiple(self):
dbapi = MockDBAPI()
- p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db'))
+ p = pool.AssertionPool(creator=lambda: dbapi.connect("foo.db"))
c1 = p.connect()
c1.close()
c2 = p.connect()
@@ -2260,7 +2240,7 @@ class AssertionPoolTest(PoolTestBase):
class NullPoolTest(PoolTestBase):
def test_reconnect(self):
dbapi = MockDBAPI()
- p = pool.NullPool(creator=lambda: dbapi.connect('foo.db'))
+ p = pool.NullPool(creator=lambda: dbapi.connect("foo.db"))
c1 = p.connect()
c1.close()
@@ -2272,10 +2252,8 @@ class NullPoolTest(PoolTestBase):
c1 = p.connect()
dbapi.connect.assert_has_calls(
- [
- call('foo.db'),
- call('foo.db')],
- any_order=True)
+ [call("foo.db"), call("foo.db")], any_order=True
+ )
class StaticPoolTest(PoolTestBase):
@@ -2283,7 +2261,8 @@ class StaticPoolTest(PoolTestBase):
dbapi = MockDBAPI()
def creator():
- return dbapi.connect('foo.db')
+ return dbapi.connect("foo.db")
+
p = pool.StaticPool(creator)
p2 = p.recreate()
assert p._creator is p2._creator