-rw-r--r--   test/engine/test_pool.py   234
1 file changed, 128 insertions(+), 106 deletions(-)
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 4f804f8b2..a4f891b3c 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -15,7 +15,7 @@ import collections
join_timeout = 10
-def MockDBAPI():
+def MockDBAPI(): # noqa
def cursor():
return Mock()
@@ -61,8 +61,9 @@ class PoolTestBase(fixtures.TestBase):
def _queuepool_dbapi_fixture(self, **kw):
dbapi = MockDBAPI()
- return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
- **kw)
+ return dbapi, pool.QueuePool(
+ creator=lambda: dbapi.connect('foo.db'),
+ **kw)
class PoolTest(PoolTestBase):
@@ -95,7 +96,8 @@ class PoolTest(PoolTestBase):
assert c1 is not c2
assert c1 is c3
- eq_(dbapi.connect.mock_calls,
+ eq_(
+ dbapi.connect.mock_calls,
[
call("foo.db"),
call("foo.db"),
@@ -156,7 +158,8 @@ class PoolTest(PoolTestBase):
for p in pool.QueuePool(creator=dbapi.connect,
pool_size=3, max_overflow=-1,
use_threadlocal=True), \
- pool.SingletonThreadPool(creator=dbapi.connect,
+ pool.SingletonThreadPool(
+ creator=dbapi.connect,
use_threadlocal=True):
c1 = p.connect()
c2 = p.connect()
@@ -408,14 +411,14 @@ class PoolEventsTest(PoolTestBase):
def test_first_connect_event(self):
p, canary = self._first_connect_event_fixture()
- c1 = p.connect()
+ p.connect()
eq_(canary, ['first_connect'])
def test_first_connect_event_fires_once(self):
p, canary = self._first_connect_event_fixture()
- c1 = p.connect()
- c2 = p.connect()
+ p.connect()
+ p.connect()
eq_(canary, ['first_connect'])
@@ -423,31 +426,31 @@ class PoolEventsTest(PoolTestBase):
p, canary = self._first_connect_event_fixture()
p2 = p.recreate()
- c1 = p.connect()
- c2 = p2.connect()
+ p.connect()
+ p2.connect()
eq_(canary, ['first_connect', 'first_connect'])
def test_first_connect_on_subsequently_recreated(self):
p, canary = self._first_connect_event_fixture()
- c1 = p.connect()
+ p.connect()
p2 = p.recreate()
- c2 = p2.connect()
+ p2.connect()
eq_(canary, ['first_connect', 'first_connect'])
def test_connect_event(self):
p, canary = self._connect_event_fixture()
- c1 = p.connect()
+ p.connect()
eq_(canary, ['connect'])
def test_connect_event_fires_subsequent(self):
p, canary = self._connect_event_fixture()
- c1 = p.connect()
- c2 = p.connect()
+ c1 = p.connect() # noqa
+ c2 = p.connect() # noqa
eq_(canary, ['connect', 'connect'])
@@ -456,39 +459,39 @@ class PoolEventsTest(PoolTestBase):
p2 = p.recreate()
- c1 = p.connect()
- c2 = p2.connect()
+ p.connect()
+ p2.connect()
eq_(canary, ['connect', 'connect'])
def test_connect_on_subsequently_recreated(self):
p, canary = self._connect_event_fixture()
- c1 = p.connect()
+ p.connect()
p2 = p.recreate()
- c2 = p2.connect()
+ p2.connect()
eq_(canary, ['connect', 'connect'])
def test_checkout_event(self):
p, canary = self._checkout_event_fixture()
- c1 = p.connect()
+ p.connect()
eq_(canary, ['checkout'])
def test_checkout_event_fires_subsequent(self):
p, canary = self._checkout_event_fixture()
- c1 = p.connect()
- c2 = p.connect()
+ p.connect()
+ p.connect()
eq_(canary, ['checkout', 'checkout'])
def test_checkout_event_on_subsequently_recreated(self):
p, canary = self._checkout_event_fixture()
- c1 = p.connect()
+ p.connect()
p2 = p.recreate()
- c2 = p2.connect()
+ p2.connect()
eq_(canary, ['checkout', 'checkout'])
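
For reference, the hooks exercised by these event fixtures can be registered on any pool instance with event.listen. A minimal standalone sketch, assuming SQLAlchemy and a sqlite3 stand-in creator (the canary list is illustrative only):

    import sqlite3

    from sqlalchemy import event, pool

    p = pool.QueuePool(creator=lambda: sqlite3.connect(":memory:"),
                       pool_size=1, max_overflow=0)
    canary = []

    # "first_connect" fires once per pool, "connect" on each new DBAPI
    # connection, "checkout" on every checkout from the pool.
    event.listen(p, "first_connect", lambda con, rec: canary.append("first_connect"))
    event.listen(p, "connect", lambda con, rec: canary.append("connect"))
    event.listen(p, "checkout", lambda con, rec, proxy: canary.append("checkout"))

    c1 = p.connect()
    assert canary == ["first_connect", "connect", "checkout"]
    c1.close()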
@@ -682,19 +685,21 @@ class PoolFirstConnectSyncTest(PoolTestBase):
for th in threads:
th.join(join_timeout)
- eq_(evt.mock_calls,
- [
- call.first_connect(),
- call.connect(),
- call.connect(),
- call.connect()]
- )
+ eq_(
+ evt.mock_calls,
+ [
+ call.first_connect(),
+ call.connect(),
+ call.connect(),
+ call.connect()]
+ )
class DeprecatedPoolListenerTest(PoolTestBase):
@testing.requires.predictable_gc
@testing.uses_deprecated(r".*Use event.listen")
def test_listeners(self):
+
class InstrumentingListener(object):
def __init__(self):
if hasattr(self, 'connect'):
@@ -713,18 +718,19 @@ class DeprecatedPoolListenerTest(PoolTestBase):
self.checked_out = []
self.checked_in = []
- def assert_total(innerself, conn, fconn, cout, cin):
- eq_(len(innerself.connected), conn)
- eq_(len(innerself.first_connected), fconn)
- eq_(len(innerself.checked_out), cout)
- eq_(len(innerself.checked_in), cin)
+ def assert_total(self, conn, fconn, cout, cin):
+ eq_(len(self.connected), conn)
+ eq_(len(self.first_connected), fconn)
+ eq_(len(self.checked_out), cout)
+ eq_(len(self.checked_in), cin)
- def assert_in(innerself, item, in_conn, in_fconn,
- in_cout, in_cin):
- self.assert_((item in innerself.connected) == in_conn)
- self.assert_((item in innerself.first_connected) == in_fconn)
- self.assert_((item in innerself.checked_out) == in_cout)
- self.assert_((item in innerself.checked_in) == in_cin)
+ def assert_in(
+ self, item, in_conn, in_fconn,
+ in_cout, in_cin):
+ eq_((item in self.connected), in_conn)
+ eq_((item in self.first_connected), in_fconn)
+ eq_((item in self.checked_out), in_cout)
+ eq_((item in self.checked_in), in_cin)
def inst_connect(self, con, record):
print("connect(%s, %s)" % (con, record))
@@ -944,8 +950,9 @@ class QueuePoolTest(PoolTestBase):
self._do_testqueuepool(useclose=True)
def _do_testqueuepool(self, useclose=False):
- p = self._queuepool_fixture(pool_size=3,
- max_overflow=-1)
+ p = self._queuepool_fixture(
+ pool_size=3,
+ max_overflow=-1)
def status(pool):
return pool.size(), pool.checkedin(), pool.overflow(), \
@@ -994,18 +1001,20 @@ class QueuePoolTest(PoolTestBase):
@testing.requires.timing_intensive
def test_timeout(self):
- p = self._queuepool_fixture(pool_size=3,
- max_overflow=0,
- timeout=2)
- c1 = p.connect()
- c2 = p.connect()
- c3 = p.connect()
+ p = self._queuepool_fixture(
+ pool_size=3,
+ max_overflow=0,
+ timeout=2)
+ c1 = p.connect() # noqa
+ c2 = p.connect() # noqa
+ c3 = p.connect() # noqa
now = time.time()
- try:
- c4 = p.connect()
- assert False
- except tsa.exc.TimeoutError:
- assert int(time.time() - now) == 2
+
+ assert_raises(
+ tsa.exc.TimeoutError,
+ p.connect
+ )
+ assert int(time.time() - now) == 2
@testing.requires.threading_with_mock
@testing.requires.timing_intensive
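
For context, QueuePool raises sqlalchemy.exc.TimeoutError once pool_size plus max_overflow connections are checked out and the configured timeout elapses. A minimal standalone sketch of that behaviour, assuming SQLAlchemy and the standard-library sqlite3 module (the two-second timeout is arbitrary):

    import sqlite3
    import time

    from sqlalchemy import exc, pool

    p = pool.QueuePool(creator=lambda: sqlite3.connect(":memory:"),
                       pool_size=1, max_overflow=0, timeout=2)

    c1 = p.connect()       # the pool's only slot is now checked out
    start = time.time()
    try:
        p.connect()        # blocks for ~2 seconds, then raises
    except exc.TimeoutError:
        assert int(time.time() - start) == 2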
@@ -1019,9 +1028,9 @@ class QueuePoolTest(PoolTestBase):
# them back to the start of do_get()
dbapi = MockDBAPI()
p = pool.QueuePool(
- creator=lambda: dbapi.connect(delay=.05),
- pool_size=2,
- max_overflow=1, use_threadlocal=False, timeout=3)
+ creator=lambda: dbapi.connect(delay=.05),
+ pool_size=2,
+ max_overflow=1, use_threadlocal=False, timeout=3)
timeouts = []
def checkout():
@@ -1102,9 +1111,9 @@ class QueuePoolTest(PoolTestBase):
return creator()
p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3)
- c1 = self._with_teardown(p.connect())
- c2 = self._with_teardown(p.connect())
- c3 = self._with_teardown(p.connect())
+ c1 = self._with_teardown(p.connect()) # noqa
+ c2 = self._with_teardown(p.connect()) # noqa
+ c3 = self._with_teardown(p.connect()) # noqa
eq_(p._overflow, 1)
creator = failing_dbapi
assert_raises(Exception, p.connect)
@@ -1148,15 +1157,15 @@ class QueuePoolTest(PoolTestBase):
threads = [
threading.Thread(
- target=run_test, args=("success_one", p, False)),
+ target=run_test, args=("success_one", p, False)),
threading.Thread(
- target=run_test, args=("success_two", p, False)),
+ target=run_test, args=("success_two", p, False)),
threading.Thread(
- target=run_test, args=("overflow_one", p, True)),
+ target=run_test, args=("overflow_one", p, True)),
threading.Thread(
- target=run_test, args=("overflow_two", p, False)),
+ target=run_test, args=("overflow_two", p, False)),
threading.Thread(
- target=run_test, args=("overflow_three", p, False))
+ target=run_test, args=("overflow_three", p, False))
]
for t in threads:
t.start()
@@ -1202,13 +1211,14 @@ class QueuePoolTest(PoolTestBase):
time.sleep(.1)
conn.close()
- c1 = p.connect()
+ c1 = p.connect() # noqa
c2 = p.connect()
threads = []
for i in range(2):
- t = threading.Thread(target=waiter,
- args=(p, timeout, max_overflow))
+ t = threading.Thread(
+ target=waiter,
+ args=(p, timeout, max_overflow))
t.daemon = True
t.start()
threads.append(t)
@@ -1227,7 +1237,8 @@ class QueuePoolTest(PoolTestBase):
def test_connrec_invalidated_within_checkout_no_race(self):
"""Test that a concurrent ConnectionRecord.invalidate() which
- occurs after the ConnectionFairy has called _ConnectionRecord.checkout()
+ occurs after the ConnectionFairy has called
+ _ConnectionRecord.checkout()
but before the ConnectionFairy tests "fairy.connection is None"
will not result in an InvalidRequestError.
@@ -1262,7 +1273,6 @@ class QueuePoolTest(PoolTestBase):
is_(conn._connection_record.connection, None)
conn.close()
-
@testing.requires.threading_with_mock
@testing.requires.timing_intensive
def test_notify_waiters(self):
@@ -1273,9 +1283,10 @@ class QueuePoolTest(PoolTestBase):
def creator():
canary.append(1)
return dbapi.connect()
- p1 = pool.QueuePool(creator=creator,
- pool_size=1, timeout=None,
- max_overflow=0)
+ p1 = pool.QueuePool(
+ creator=creator,
+ pool_size=1, timeout=None,
+ max_overflow=0)
def waiter(p):
conn = p.connect()
@@ -1369,8 +1380,9 @@ class QueuePoolTest(PoolTestBase):
self._test_overflow_no_gc(False)
def _test_overflow_no_gc(self, threadlocal):
- p = self._queuepool_fixture(pool_size=2,
- max_overflow=2)
+ p = self._queuepool_fixture(
+ pool_size=2,
+ max_overflow=2)
# disable weakref collection of the
# underlying connections
@@ -1400,8 +1412,9 @@ class QueuePoolTest(PoolTestBase):
@testing.requires.predictable_gc
def test_weakref_kaboom(self):
- p = self._queuepool_fixture(pool_size=3,
- max_overflow=-1, use_threadlocal=True)
+ p = self._queuepool_fixture(
+ pool_size=3,
+ max_overflow=-1, use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
c1.close()
@@ -1420,8 +1433,9 @@ class QueuePoolTest(PoolTestBase):
ConnectionFairy with an ambiguous counter. i.e. its not true
reference counting."""
- p = self._queuepool_fixture(pool_size=3,
- max_overflow=-1, use_threadlocal=True)
+ p = self._queuepool_fixture(
+ pool_size=3,
+ max_overflow=-1, use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
@@ -1454,8 +1468,9 @@ class QueuePoolTest(PoolTestBase):
@testing.requires.timing_intensive
def test_recycle_on_invalidate(self):
- p = self._queuepool_fixture(pool_size=1,
- max_overflow=0)
+ p = self._queuepool_fixture(
+ pool_size=1,
+ max_overflow=0)
c1 = p.connect()
c_id = id(c1.connection)
c1.close()
@@ -1472,8 +1487,9 @@ class QueuePoolTest(PoolTestBase):
@testing.requires.timing_intensive
def test_recycle_on_soft_invalidate(self):
- p = self._queuepool_fixture(pool_size=1,
- max_overflow=0)
+ p = self._queuepool_fixture(
+ pool_size=1,
+ max_overflow=0)
c1 = p.connect()
c_id = id(c1.connection)
c1.close()
@@ -1523,14 +1539,16 @@ class QueuePoolTest(PoolTestBase):
dbapi.shutdown(False)
- c1 = self._with_teardown(p.connect())
+ c1 = self._with_teardown(p.connect()) # noqa
assert p._pool.empty() # poolsize is one, so we're empty OK
- c2 = self._with_teardown(p.connect())
+ c2 = self._with_teardown(p.connect()) # noqa
eq_(p._overflow, 1) # and not 2
# this hangs if p._overflow is 2
c3 = self._with_teardown(p.connect())
+ c3.close()
+
def test_error_on_pooled_reconnect_cleanup_invalidate(self):
dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=2)
c1 = p.connect()
@@ -1540,8 +1558,9 @@ class QueuePoolTest(PoolTestBase):
@testing.requires.timing_intensive
def test_error_on_pooled_reconnect_cleanup_recycle(self):
- dbapi, p = self._queuepool_dbapi_fixture(pool_size=1,
- max_overflow=2, recycle=1)
+ dbapi, p = self._queuepool_dbapi_fixture(
+ pool_size=1,
+ max_overflow=2, recycle=1)
c1 = p.connect()
c1.close()
time.sleep(1.5)
@@ -1638,10 +1657,10 @@ class QueuePoolTest(PoolTestBase):
c = p.connect()
c.close()
-
def test_error_on_pooled_reconnect_cleanup_wcheckout_event(self):
- dbapi, p = self._queuepool_dbapi_fixture(pool_size=1,
- max_overflow=2)
+ dbapi, p = self._queuepool_dbapi_fixture(
+ pool_size=1,
+ max_overflow=2)
c1 = p.connect()
c1.close()
@@ -1694,13 +1713,14 @@ class QueuePoolTest(PoolTestBase):
def attempt(conn):
time.sleep(random.random())
try:
- conn._handle_dbapi_exception(Error(), "statement", {},
- Mock(), Mock())
+ conn._handle_dbapi_exception(
+ Error(), "statement", {},
+ Mock(), Mock())
except tsa.exc.DBAPIError:
pass
# run an error + invalidate operation on the remaining 7 open
- #connections
+ # connections
threads = []
for conn in conns:
t = threading.Thread(target=attempt, args=(conn, ))
@@ -1768,7 +1788,7 @@ class QueuePoolTest(PoolTestBase):
dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
c1.detach()
- c2 = p.connect()
+ c2 = p.connect() # noqa
eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")])
c1_con = c1.connection
@@ -1805,8 +1825,9 @@ class QueuePoolTest(PoolTestBase):
class ResetOnReturnTest(PoolTestBase):
def _fixture(self, **kw):
dbapi = Mock()
- return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
- **kw)
+ return dbapi, pool.QueuePool(
+ creator=lambda: dbapi.connect('foo.db'),
+ **kw)
def test_plain_rollback(self):
dbapi, p = self._fixture(reset_on_return='rollback')
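
For reference, reset_on_return controls what the pool does with the DBAPI connection at check-in ('rollback', 'commit', or None). A minimal sketch in the same Mock-based style as the fixture above, assuming SQLAlchemy and unittest.mock:

    from unittest.mock import Mock

    from sqlalchemy import pool

    dbapi = Mock()
    p = pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
                       reset_on_return='rollback')

    c1 = p.connect()
    c1.close()

    # with reset_on_return='rollback' (the default), check-in rolls back
    assert dbapi.connect.return_value.rollback.called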
@@ -1955,7 +1976,7 @@ class AssertionPoolTest(PoolTestBase):
def test_connect_error(self):
dbapi = MockDBAPI()
p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db'))
- c1 = p.connect()
+ c1 = p.connect() # noqa
assert_raises(AssertionError, p.connect)
def test_connect_multiple(self):
@@ -1966,7 +1987,7 @@ class AssertionPoolTest(PoolTestBase):
c2 = p.connect()
c2.close()
- c3 = p.connect()
+ c3 = p.connect() # noqa
assert_raises(AssertionError, p.connect)
@@ -1984,16 +2005,19 @@ class NullPoolTest(PoolTestBase):
c1 = None
c1 = p.connect()
- dbapi.connect.assert_has_calls([
- call('foo.db'),
- call('foo.db')],
- any_order=True)
+ dbapi.connect.assert_has_calls(
+ [
+ call('foo.db'),
+ call('foo.db')],
+ any_order=True)
class StaticPoolTest(PoolTestBase):
def test_recreate(self):
dbapi = MockDBAPI()
- creator = lambda: dbapi.connect('foo.db')
+
+ def creator():
+ return dbapi.connect('foo.db')
p = pool.StaticPool(creator)
p2 = p.recreate()
assert p._creator is p2._creator
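
StaticPool, used here, maintains exactly one connection for all requests, and recreate() produces a new pool built from the same creator. A minimal sketch mirroring the assertion above, assuming SQLAlchemy and sqlite3:

    import sqlite3

    from sqlalchemy import pool

    def creator():
        return sqlite3.connect(":memory:")

    p = pool.StaticPool(creator)
    c1 = p.connect()
    c1.close()

    p2 = p.recreate()
    assert p._creator is p2._creator    # the creator carries over unchanged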
@@ -2045,5 +2069,3 @@ class CreatorCompatibilityTest(PoolTestBase):
conn = e.connect()
conn.close()
-
-