diff options
-rw-r--r-- | test/engine/test_pool.py | 234 |
1 files changed, 128 insertions, 106 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 4f804f8b2..a4f891b3c 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -15,7 +15,7 @@ import collections join_timeout = 10 -def MockDBAPI(): +def MockDBAPI(): # noqa def cursor(): return Mock() @@ -61,8 +61,9 @@ class PoolTestBase(fixtures.TestBase): def _queuepool_dbapi_fixture(self, **kw): dbapi = MockDBAPI() - return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), - **kw) + return dbapi, pool.QueuePool( + creator=lambda: dbapi.connect('foo.db'), + **kw) class PoolTest(PoolTestBase): @@ -95,7 +96,8 @@ class PoolTest(PoolTestBase): assert c1 is not c2 assert c1 is c3 - eq_(dbapi.connect.mock_calls, + eq_( + dbapi.connect.mock_calls, [ call("foo.db"), call("foo.db"), @@ -156,7 +158,8 @@ class PoolTest(PoolTestBase): for p in pool.QueuePool(creator=dbapi.connect, pool_size=3, max_overflow=-1, use_threadlocal=True), \ - pool.SingletonThreadPool(creator=dbapi.connect, + pool.SingletonThreadPool( + creator=dbapi.connect, use_threadlocal=True): c1 = p.connect() c2 = p.connect() @@ -408,14 +411,14 @@ class PoolEventsTest(PoolTestBase): def test_first_connect_event(self): p, canary = self._first_connect_event_fixture() - c1 = p.connect() + p.connect() eq_(canary, ['first_connect']) def test_first_connect_event_fires_once(self): p, canary = self._first_connect_event_fixture() - c1 = p.connect() - c2 = p.connect() + p.connect() + p.connect() eq_(canary, ['first_connect']) @@ -423,31 +426,31 @@ class PoolEventsTest(PoolTestBase): p, canary = self._first_connect_event_fixture() p2 = p.recreate() - c1 = p.connect() - c2 = p2.connect() + p.connect() + p2.connect() eq_(canary, ['first_connect', 'first_connect']) def test_first_connect_on_subsequently_recreated(self): p, canary = self._first_connect_event_fixture() - c1 = p.connect() + p.connect() p2 = p.recreate() - c2 = p2.connect() + p2.connect() eq_(canary, ['first_connect', 'first_connect']) def test_connect_event(self): p, canary = self._connect_event_fixture() - c1 = p.connect() + p.connect() eq_(canary, ['connect']) def test_connect_event_fires_subsequent(self): p, canary = self._connect_event_fixture() - c1 = p.connect() - c2 = p.connect() + c1 = p.connect() # noqa + c2 = p.connect() # noqa eq_(canary, ['connect', 'connect']) @@ -456,39 +459,39 @@ class PoolEventsTest(PoolTestBase): p2 = p.recreate() - c1 = p.connect() - c2 = p2.connect() + p.connect() + p2.connect() eq_(canary, ['connect', 'connect']) def test_connect_on_subsequently_recreated(self): p, canary = self._connect_event_fixture() - c1 = p.connect() + p.connect() p2 = p.recreate() - c2 = p2.connect() + p2.connect() eq_(canary, ['connect', 'connect']) def test_checkout_event(self): p, canary = self._checkout_event_fixture() - c1 = p.connect() + p.connect() eq_(canary, ['checkout']) def test_checkout_event_fires_subsequent(self): p, canary = self._checkout_event_fixture() - c1 = p.connect() - c2 = p.connect() + p.connect() + p.connect() eq_(canary, ['checkout', 'checkout']) def test_checkout_event_on_subsequently_recreated(self): p, canary = self._checkout_event_fixture() - c1 = p.connect() + p.connect() p2 = p.recreate() - c2 = p2.connect() + p2.connect() eq_(canary, ['checkout', 'checkout']) @@ -682,19 +685,21 @@ class PoolFirstConnectSyncTest(PoolTestBase): for th in threads: th.join(join_timeout) - eq_(evt.mock_calls, - [ - call.first_connect(), - call.connect(), - call.connect(), - call.connect()] - ) + eq_( + evt.mock_calls, + [ + call.first_connect(), + call.connect(), + call.connect(), + call.connect()] + ) class DeprecatedPoolListenerTest(PoolTestBase): @testing.requires.predictable_gc @testing.uses_deprecated(r".*Use event.listen") def test_listeners(self): + class InstrumentingListener(object): def __init__(self): if hasattr(self, 'connect'): @@ -713,18 +718,19 @@ class DeprecatedPoolListenerTest(PoolTestBase): self.checked_out = [] self.checked_in = [] - def assert_total(innerself, conn, fconn, cout, cin): - eq_(len(innerself.connected), conn) - eq_(len(innerself.first_connected), fconn) - eq_(len(innerself.checked_out), cout) - eq_(len(innerself.checked_in), cin) + def assert_total(self, conn, fconn, cout, cin): + eq_(len(self.connected), conn) + eq_(len(self.first_connected), fconn) + eq_(len(self.checked_out), cout) + eq_(len(self.checked_in), cin) - def assert_in(innerself, item, in_conn, in_fconn, - in_cout, in_cin): - self.assert_((item in innerself.connected) == in_conn) - self.assert_((item in innerself.first_connected) == in_fconn) - self.assert_((item in innerself.checked_out) == in_cout) - self.assert_((item in innerself.checked_in) == in_cin) + def assert_in( + self, item, in_conn, in_fconn, + in_cout, in_cin): + eq_((item in self.connected), in_conn) + eq_((item in self.first_connected), in_fconn) + eq_((item in self.checked_out), in_cout) + eq_((item in self.checked_in), in_cin) def inst_connect(self, con, record): print("connect(%s, %s)" % (con, record)) @@ -944,8 +950,9 @@ class QueuePoolTest(PoolTestBase): self._do_testqueuepool(useclose=True) def _do_testqueuepool(self, useclose=False): - p = self._queuepool_fixture(pool_size=3, - max_overflow=-1) + p = self._queuepool_fixture( + pool_size=3, + max_overflow=-1) def status(pool): return pool.size(), pool.checkedin(), pool.overflow(), \ @@ -994,18 +1001,20 @@ class QueuePoolTest(PoolTestBase): @testing.requires.timing_intensive def test_timeout(self): - p = self._queuepool_fixture(pool_size=3, - max_overflow=0, - timeout=2) - c1 = p.connect() - c2 = p.connect() - c3 = p.connect() + p = self._queuepool_fixture( + pool_size=3, + max_overflow=0, + timeout=2) + c1 = p.connect() # noqa + c2 = p.connect() # noqa + c3 = p.connect() # noqa now = time.time() - try: - c4 = p.connect() - assert False - except tsa.exc.TimeoutError: - assert int(time.time() - now) == 2 + + assert_raises( + tsa.exc.TimeoutError, + p.connect + ) + assert int(time.time() - now) == 2 @testing.requires.threading_with_mock @testing.requires.timing_intensive @@ -1019,9 +1028,9 @@ class QueuePoolTest(PoolTestBase): # them back to the start of do_get() dbapi = MockDBAPI() p = pool.QueuePool( - creator=lambda: dbapi.connect(delay=.05), - pool_size=2, - max_overflow=1, use_threadlocal=False, timeout=3) + creator=lambda: dbapi.connect(delay=.05), + pool_size=2, + max_overflow=1, use_threadlocal=False, timeout=3) timeouts = [] def checkout(): @@ -1102,9 +1111,9 @@ class QueuePoolTest(PoolTestBase): return creator() p = pool.QueuePool(creator=create, pool_size=2, max_overflow=3) - c1 = self._with_teardown(p.connect()) - c2 = self._with_teardown(p.connect()) - c3 = self._with_teardown(p.connect()) + c1 = self._with_teardown(p.connect()) # noqa + c2 = self._with_teardown(p.connect()) # noqa + c3 = self._with_teardown(p.connect()) # noqa eq_(p._overflow, 1) creator = failing_dbapi assert_raises(Exception, p.connect) @@ -1148,15 +1157,15 @@ class QueuePoolTest(PoolTestBase): threads = [ threading.Thread( - target=run_test, args=("success_one", p, False)), + target=run_test, args=("success_one", p, False)), threading.Thread( - target=run_test, args=("success_two", p, False)), + target=run_test, args=("success_two", p, False)), threading.Thread( - target=run_test, args=("overflow_one", p, True)), + target=run_test, args=("overflow_one", p, True)), threading.Thread( - target=run_test, args=("overflow_two", p, False)), + target=run_test, args=("overflow_two", p, False)), threading.Thread( - target=run_test, args=("overflow_three", p, False)) + target=run_test, args=("overflow_three", p, False)) ] for t in threads: t.start() @@ -1202,13 +1211,14 @@ class QueuePoolTest(PoolTestBase): time.sleep(.1) conn.close() - c1 = p.connect() + c1 = p.connect() # noqa c2 = p.connect() threads = [] for i in range(2): - t = threading.Thread(target=waiter, - args=(p, timeout, max_overflow)) + t = threading.Thread( + target=waiter, + args=(p, timeout, max_overflow)) t.daemon = True t.start() threads.append(t) @@ -1227,7 +1237,8 @@ class QueuePoolTest(PoolTestBase): def test_connrec_invalidated_within_checkout_no_race(self): """Test that a concurrent ConnectionRecord.invalidate() which - occurs after the ConnectionFairy has called _ConnectionRecord.checkout() + occurs after the ConnectionFairy has called + _ConnectionRecord.checkout() but before the ConnectionFairy tests "fairy.connection is None" will not result in an InvalidRequestError. @@ -1262,7 +1273,6 @@ class QueuePoolTest(PoolTestBase): is_(conn._connection_record.connection, None) conn.close() - @testing.requires.threading_with_mock @testing.requires.timing_intensive def test_notify_waiters(self): @@ -1273,9 +1283,10 @@ class QueuePoolTest(PoolTestBase): def creator(): canary.append(1) return dbapi.connect() - p1 = pool.QueuePool(creator=creator, - pool_size=1, timeout=None, - max_overflow=0) + p1 = pool.QueuePool( + creator=creator, + pool_size=1, timeout=None, + max_overflow=0) def waiter(p): conn = p.connect() @@ -1369,8 +1380,9 @@ class QueuePoolTest(PoolTestBase): self._test_overflow_no_gc(False) def _test_overflow_no_gc(self, threadlocal): - p = self._queuepool_fixture(pool_size=2, - max_overflow=2) + p = self._queuepool_fixture( + pool_size=2, + max_overflow=2) # disable weakref collection of the # underlying connections @@ -1400,8 +1412,9 @@ class QueuePoolTest(PoolTestBase): @testing.requires.predictable_gc def test_weakref_kaboom(self): - p = self._queuepool_fixture(pool_size=3, - max_overflow=-1, use_threadlocal=True) + p = self._queuepool_fixture( + pool_size=3, + max_overflow=-1, use_threadlocal=True) c1 = p.connect() c2 = p.connect() c1.close() @@ -1420,8 +1433,9 @@ class QueuePoolTest(PoolTestBase): ConnectionFairy with an ambiguous counter. i.e. its not true reference counting.""" - p = self._queuepool_fixture(pool_size=3, - max_overflow=-1, use_threadlocal=True) + p = self._queuepool_fixture( + pool_size=3, + max_overflow=-1, use_threadlocal=True) c1 = p.connect() c2 = p.connect() assert c1 is c2 @@ -1454,8 +1468,9 @@ class QueuePoolTest(PoolTestBase): @testing.requires.timing_intensive def test_recycle_on_invalidate(self): - p = self._queuepool_fixture(pool_size=1, - max_overflow=0) + p = self._queuepool_fixture( + pool_size=1, + max_overflow=0) c1 = p.connect() c_id = id(c1.connection) c1.close() @@ -1472,8 +1487,9 @@ class QueuePoolTest(PoolTestBase): @testing.requires.timing_intensive def test_recycle_on_soft_invalidate(self): - p = self._queuepool_fixture(pool_size=1, - max_overflow=0) + p = self._queuepool_fixture( + pool_size=1, + max_overflow=0) c1 = p.connect() c_id = id(c1.connection) c1.close() @@ -1523,14 +1539,16 @@ class QueuePoolTest(PoolTestBase): dbapi.shutdown(False) - c1 = self._with_teardown(p.connect()) + c1 = self._with_teardown(p.connect()) # noqa assert p._pool.empty() # poolsize is one, so we're empty OK - c2 = self._with_teardown(p.connect()) + c2 = self._with_teardown(p.connect()) # noqa eq_(p._overflow, 1) # and not 2 # this hangs if p._overflow is 2 c3 = self._with_teardown(p.connect()) + c3.close() + def test_error_on_pooled_reconnect_cleanup_invalidate(self): dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=2) c1 = p.connect() @@ -1540,8 +1558,9 @@ class QueuePoolTest(PoolTestBase): @testing.requires.timing_intensive def test_error_on_pooled_reconnect_cleanup_recycle(self): - dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, - max_overflow=2, recycle=1) + dbapi, p = self._queuepool_dbapi_fixture( + pool_size=1, + max_overflow=2, recycle=1) c1 = p.connect() c1.close() time.sleep(1.5) @@ -1638,10 +1657,10 @@ class QueuePoolTest(PoolTestBase): c = p.connect() c.close() - def test_error_on_pooled_reconnect_cleanup_wcheckout_event(self): - dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, - max_overflow=2) + dbapi, p = self._queuepool_dbapi_fixture( + pool_size=1, + max_overflow=2) c1 = p.connect() c1.close() @@ -1694,13 +1713,14 @@ class QueuePoolTest(PoolTestBase): def attempt(conn): time.sleep(random.random()) try: - conn._handle_dbapi_exception(Error(), "statement", {}, - Mock(), Mock()) + conn._handle_dbapi_exception( + Error(), "statement", {}, + Mock(), Mock()) except tsa.exc.DBAPIError: pass # run an error + invalidate operation on the remaining 7 open - #connections + # connections threads = [] for conn in conns: t = threading.Thread(target=attempt, args=(conn, )) @@ -1768,7 +1788,7 @@ class QueuePoolTest(PoolTestBase): dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0) c1 = p.connect() c1.detach() - c2 = p.connect() + c2 = p.connect() # noqa eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")]) c1_con = c1.connection @@ -1805,8 +1825,9 @@ class QueuePoolTest(PoolTestBase): class ResetOnReturnTest(PoolTestBase): def _fixture(self, **kw): dbapi = Mock() - return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), - **kw) + return dbapi, pool.QueuePool( + creator=lambda: dbapi.connect('foo.db'), + **kw) def test_plain_rollback(self): dbapi, p = self._fixture(reset_on_return='rollback') @@ -1955,7 +1976,7 @@ class AssertionPoolTest(PoolTestBase): def test_connect_error(self): dbapi = MockDBAPI() p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db')) - c1 = p.connect() + c1 = p.connect() # noqa assert_raises(AssertionError, p.connect) def test_connect_multiple(self): @@ -1966,7 +1987,7 @@ class AssertionPoolTest(PoolTestBase): c2 = p.connect() c2.close() - c3 = p.connect() + c3 = p.connect() # noqa assert_raises(AssertionError, p.connect) @@ -1984,16 +2005,19 @@ class NullPoolTest(PoolTestBase): c1 = None c1 = p.connect() - dbapi.connect.assert_has_calls([ - call('foo.db'), - call('foo.db')], - any_order=True) + dbapi.connect.assert_has_calls( + [ + call('foo.db'), + call('foo.db')], + any_order=True) class StaticPoolTest(PoolTestBase): def test_recreate(self): dbapi = MockDBAPI() - creator = lambda: dbapi.connect('foo.db') + + def creator(): + return dbapi.connect('foo.db') p = pool.StaticPool(creator) p2 = p.recreate() assert p._creator is p2._creator @@ -2045,5 +2069,3 @@ class CreatorCompatibilityTest(PoolTestBase): conn = e.connect() conn.close() - - |