diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-08-06 21:11:27 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2009-08-06 21:11:27 +0000 |
| commit | 8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca (patch) | |
| tree | ae9e27d12c9fbf8297bb90469509e1cb6a206242 /test/engine/test_pool.py | |
| parent | 7638aa7f242c6ea3d743aa9100e32be2052546a6 (diff) | |
| download | sqlalchemy-8fc5005dfe3eb66a46470ad8a8c7b95fc4d6bdca.tar.gz | |
merge 0.6 series to trunk.
Diffstat (limited to 'test/engine/test_pool.py')
| -rw-r--r-- | test/engine/test_pool.py | 630 |
1 files changed, 341 insertions, 289 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index d135ad337..68637281e 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -1,7 +1,8 @@ -import threading, time, gc -from sqlalchemy import pool, interfaces +import threading, time +from sqlalchemy import pool, interfaces, create_engine, select import sqlalchemy as tsa -from sqlalchemy.test import TestBase +from sqlalchemy.test import TestBase, testing +from sqlalchemy.test.util import gc_collect, lazy_gc mcid = 1 @@ -51,7 +52,6 @@ class PoolTest(PoolTestBase): connection2 = manager.connect('foo.db') connection3 = manager.connect('bar.db') - print "connection " + repr(connection) self.assert_(connection.cursor() is not None) self.assert_(connection is connection2) self.assert_(connection2 is not connection3) @@ -70,8 +70,6 @@ class PoolTest(PoolTestBase): connection = manager.connect('foo.db') connection2 = manager.connect('foo.db') - print "connection " + repr(connection) - self.assert_(connection.cursor() is not None) self.assert_(connection is not connection2) @@ -103,7 +101,8 @@ class PoolTest(PoolTestBase): c2.close() else: c2 = None - + lazy_gc() + if useclose: c1 = p.connect() c2 = p.connect() @@ -117,6 +116,8 @@ class PoolTest(PoolTestBase): # extra tests with QueuePool to ensure connections get __del__()ed when dereferenced if isinstance(p, pool.QueuePool): + lazy_gc() + self.assert_(p.checkedout() == 0) c1 = p.connect() c2 = p.connect() @@ -126,6 +127,7 @@ class PoolTest(PoolTestBase): else: c2 = None c1 = None + lazy_gc() self.assert_(p.checkedout() == 0) def test_properties(self): @@ -164,6 +166,8 @@ class PoolTest(PoolTestBase): def __init__(self): if hasattr(self, 'connect'): self.connect = self.inst_connect + if hasattr(self, 'first_connect'): + self.first_connect = self.inst_first_connect if hasattr(self, 'checkout'): self.checkout = self.inst_checkout if hasattr(self, 'checkin'): @@ -171,14 +175,17 @@ class PoolTest(PoolTestBase): self.clear() def clear(self): self.connected = [] + self.first_connected = [] self.checked_out = [] self.checked_in = [] - def assert_total(innerself, conn, cout, cin): + def assert_total(innerself, conn, fconn, cout, cin): self.assert_(len(innerself.connected) == conn) + self.assert_(len(innerself.first_connected) == fconn) self.assert_(len(innerself.checked_out) == cout) self.assert_(len(innerself.checked_in) == cin) - def assert_in(innerself, item, in_conn, in_cout, in_cin): + def assert_in(innerself, item, in_conn, in_fconn, in_cout, in_cin): self.assert_((item in innerself.connected) == in_conn) + self.assert_((item in innerself.first_connected) == in_fconn) self.assert_((item in innerself.checked_out) == in_cout) self.assert_((item in innerself.checked_in) == in_cin) def inst_connect(self, con, record): @@ -186,6 +193,11 @@ class PoolTest(PoolTestBase): assert con is not None assert record is not None self.connected.append(con) + def inst_first_connect(self, con, record): + print "first_connect(%s, %s)" % (con, record) + assert con is not None + assert record is not None + self.first_connected.append(con) def inst_checkout(self, con, record, proxy): print "checkout(%s, %s, %s)" % (con, record, proxy) assert con is not None @@ -203,6 +215,9 @@ class PoolTest(PoolTestBase): class ListenConnect(InstrumentingListener): def connect(self, con, record): pass + class ListenFirstConnect(InstrumentingListener): + def first_connect(self, con, record): + pass class ListenCheckOut(InstrumentingListener): def checkout(self, con, record, proxy, num): pass @@ -214,40 +229,43 @@ class PoolTest(PoolTestBase): return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), use_threadlocal=False, **kw) - def assert_listeners(p, total, conn, cout, cin): + def assert_listeners(p, total, conn, fconn, cout, cin): for instance in (p, p.recreate()): self.assert_(len(instance.listeners) == total) self.assert_(len(instance._on_connect) == conn) + self.assert_(len(instance._on_first_connect) == fconn) self.assert_(len(instance._on_checkout) == cout) self.assert_(len(instance._on_checkin) == cin) p = _pool() - assert_listeners(p, 0, 0, 0, 0) + assert_listeners(p, 0, 0, 0, 0, 0) p.add_listener(ListenAll()) - assert_listeners(p, 1, 1, 1, 1) + assert_listeners(p, 1, 1, 1, 1, 1) p.add_listener(ListenConnect()) - assert_listeners(p, 2, 2, 1, 1) + assert_listeners(p, 2, 2, 1, 1, 1) + + p.add_listener(ListenFirstConnect()) + assert_listeners(p, 3, 2, 2, 1, 1) p.add_listener(ListenCheckOut()) - assert_listeners(p, 3, 2, 2, 1) + assert_listeners(p, 4, 2, 2, 2, 1) p.add_listener(ListenCheckIn()) - assert_listeners(p, 4, 2, 2, 2) + assert_listeners(p, 5, 2, 2, 2, 2) del p - print "----" snoop = ListenAll() p = _pool(listeners=[snoop]) - assert_listeners(p, 1, 1, 1, 1) + assert_listeners(p, 1, 1, 1, 1, 1) c = p.connect() - snoop.assert_total(1, 1, 0) + snoop.assert_total(1, 1, 1, 0) cc = c.connection - snoop.assert_in(cc, True, True, False) + snoop.assert_in(cc, True, True, True, False) c.close() - snoop.assert_in(cc, True, True, True) + snoop.assert_in(cc, True, True, True, True) del c, cc snoop.clear() @@ -255,10 +273,11 @@ class PoolTest(PoolTestBase): # this one depends on immediate gc c = p.connect() cc = c.connection - snoop.assert_in(cc, False, True, False) - snoop.assert_total(0, 1, 0) + snoop.assert_in(cc, False, False, True, False) + snoop.assert_total(0, 0, 1, 0) del c, cc - snoop.assert_total(0, 1, 1) + lazy_gc() + snoop.assert_total(0, 0, 1, 1) p.dispose() snoop.clear() @@ -266,44 +285,46 @@ class PoolTest(PoolTestBase): c = p.connect() c.close() c = p.connect() - snoop.assert_total(1, 2, 1) + snoop.assert_total(1, 0, 2, 1) c.close() - snoop.assert_total(1, 2, 2) + snoop.assert_total(1, 0, 2, 2) # invalidation p.dispose() snoop.clear() c = p.connect() - snoop.assert_total(1, 1, 0) + snoop.assert_total(1, 0, 1, 0) c.invalidate() - snoop.assert_total(1, 1, 1) + snoop.assert_total(1, 0, 1, 1) c.close() - snoop.assert_total(1, 1, 1) + snoop.assert_total(1, 0, 1, 1) del c - snoop.assert_total(1, 1, 1) + lazy_gc() + snoop.assert_total(1, 0, 1, 1) c = p.connect() - snoop.assert_total(2, 2, 1) + snoop.assert_total(2, 0, 2, 1) c.close() del c - snoop.assert_total(2, 2, 2) + lazy_gc() + snoop.assert_total(2, 0, 2, 2) # detached p.dispose() snoop.clear() c = p.connect() - snoop.assert_total(1, 1, 0) + snoop.assert_total(1, 0, 1, 0) c.detach() - snoop.assert_total(1, 1, 0) + snoop.assert_total(1, 0, 1, 0) c.close() del c - snoop.assert_total(1, 1, 0) + snoop.assert_total(1, 0, 1, 0) c = p.connect() - snoop.assert_total(2, 2, 0) + snoop.assert_total(2, 0, 2, 0) c.close() del c - snoop.assert_total(2, 2, 1) + snoop.assert_total(2, 0, 2, 1) def test_listeners_callables(self): dbapi = MockDBAPI() @@ -362,262 +383,293 @@ class PoolTest(PoolTestBase): c.close() assert counts == [1, 2, 3] + def test_listener_after_oninit(self): + """Test that listeners are called after OnInit is removed""" + called = [] + def listener(*args): + called.append(True) + listener.connect = listener + engine = create_engine(testing.db.url) + engine.pool.add_listener(listener) + engine.execute(select([1])) + assert called, "Listener not called on connect" + + class QueuePoolTest(PoolTestBase): - def testqueuepool_del(self): - self._do_testqueuepool(useclose=False) - - def testqueuepool_close(self): - self._do_testqueuepool(useclose=True) - - def _do_testqueuepool(self, useclose=False): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = False) - - def status(pool): - tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout()) - print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup - return tup - - c1 = p.connect() - self.assert_(status(p) == (3,0,-2,1)) - c2 = p.connect() - self.assert_(status(p) == (3,0,-1,2)) - c3 = p.connect() - self.assert_(status(p) == (3,0,0,3)) - c4 = p.connect() - self.assert_(status(p) == (3,0,1,4)) - c5 = p.connect() - self.assert_(status(p) == (3,0,2,5)) - c6 = p.connect() - self.assert_(status(p) == (3,0,3,6)) - if useclose: - c4.close() - c3.close() - c2.close() - else: - c4 = c3 = c2 = None - self.assert_(status(p) == (3,3,3,3)) - if useclose: - c1.close() - c5.close() - c6.close() - else: - c1 = c5 = c6 = None - self.assert_(status(p) == (3,3,0,0)) - c1 = p.connect() - c2 = p.connect() - self.assert_(status(p) == (3, 1, 0, 2), status(p)) - if useclose: - c2.close() - else: - c2 = None - self.assert_(status(p) == (3, 2, 0, 1)) - - def test_timeout(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2) - c1 = p.connect() - c2 = p.connect() - c3 = p.connect() - now = time.time() - try: - c4 = p.connect() - assert False - except tsa.exc.TimeoutError, e: - assert int(time.time() - now) == 2 - - def test_timeout_race(self): - # test a race condition where the initial connecting threads all race - # to queue.Empty, then block on the mutex. each thread consumes a - # connection as they go in. when the limit is reached, the remaining - # threads go in, and get TimeoutError; even though they never got to - # wait for the timeout on queue.get(). the fix involves checking the - # timeout again within the mutex, and if so, unlocking and throwing - # them back to the start of do_get() - p = pool.QueuePool(creator = lambda: mock_dbapi.connect(delay=.05), pool_size = 2, max_overflow = 1, use_threadlocal = False, timeout=3) - timeouts = [] - def checkout(): - for x in xrange(1): - now = time.time() - try: - c1 = p.connect() - except tsa.exc.TimeoutError, e: - timeouts.append(int(time.time()) - now) - continue - time.sleep(4) - c1.close() - - threads = [] - for i in xrange(10): - th = threading.Thread(target=checkout) - th.start() - threads.append(th) - for th in threads: - th.join() - - print timeouts - assert len(timeouts) > 0 - for t in timeouts: - assert abs(t - 3) < 1, "Not all timeouts were 3 seconds: " + repr(timeouts) - - def _test_overflow(self, thread_count, max_overflow): - def creator(): - time.sleep(.05) - return mock_dbapi.connect() - - p = pool.QueuePool(creator=creator, - pool_size=3, timeout=2, - max_overflow=max_overflow) - peaks = [] - def whammy(): - for i in range(10): - try: - con = p.connect() - time.sleep(.005) - peaks.append(p.overflow()) - con.close() - del con - except tsa.exc.TimeoutError: - pass - threads = [] - for i in xrange(thread_count): - th = threading.Thread(target=whammy) - th.start() - threads.append(th) - for th in threads: - th.join() - - self.assert_(max(peaks) <= max_overflow) - - def test_no_overflow(self): - self._test_overflow(40, 0) - - def test_max_overflow(self): - self._test_overflow(40, 5) - - def test_mixed_close(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) - c1 = p.connect() - c2 = p.connect() - assert c1 is c2 - c1.close() - c2 = None - assert p.checkedout() == 1 - c1 = None - assert p.checkedout() == 0 - - def test_weakref_kaboom(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) - c1 = p.connect() - c2 = p.connect() - c1.close() - c2 = None - del c1 - del c2 - gc.collect() - assert p.checkedout() == 0 - c3 = p.connect() - assert c3 is not None - - def test_trick_the_counter(self): - """this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread - with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an - ambiguous counter. i.e. its not true reference counting.""" - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) - c1 = p.connect() - c2 = p.connect() - assert c1 is c2 - c1.close() - c2 = p.connect() - c2.close() - self.assert_(p.checkedout() != 0) - - c2.close() - self.assert_(p.checkedout() == 0) - - def test_recycle(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 1, max_overflow = 0, use_threadlocal = False, recycle=3) - - c1 = p.connect() - c_id = id(c1.connection) - c1.close() - c2 = p.connect() - assert id(c2.connection) == c_id - c2.close() - time.sleep(4) - c3= p.connect() - assert id(c3.connection) != c_id - - def test_invalidate(self): - dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) - c1 = p.connect() - c_id = c1.connection.id - c1.close(); c1=None - c1 = p.connect() - assert c1.connection.id == c_id - c1.invalidate() - c1 = None - - c1 = p.connect() - assert c1.connection.id != c_id - - def test_recreate(self): - dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) - p2 = p.recreate() - assert p2.size() == 1 - assert p2._use_threadlocal is False - assert p2._max_overflow == 0 - - def test_reconnect(self): - """tests reconnect operations at the pool level. SA's engine/dialect includes another - layer of reconnect support for 'database was lost' errors.""" + def testqueuepool_del(self): + self._do_testqueuepool(useclose=False) + + def testqueuepool_close(self): + self._do_testqueuepool(useclose=True) + + def _do_testqueuepool(self, useclose=False): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = False) + + def status(pool): + tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout()) + print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup + return tup + + c1 = p.connect() + self.assert_(status(p) == (3,0,-2,1)) + c2 = p.connect() + self.assert_(status(p) == (3,0,-1,2)) + c3 = p.connect() + self.assert_(status(p) == (3,0,0,3)) + c4 = p.connect() + self.assert_(status(p) == (3,0,1,4)) + c5 = p.connect() + self.assert_(status(p) == (3,0,2,5)) + c6 = p.connect() + self.assert_(status(p) == (3,0,3,6)) + if useclose: + c4.close() + c3.close() + c2.close() + else: + c4 = c3 = c2 = None + lazy_gc() + + self.assert_(status(p) == (3,3,3,3)) + if useclose: + c1.close() + c5.close() + c6.close() + else: + c1 = c5 = c6 = None + lazy_gc() + + self.assert_(status(p) == (3,3,0,0)) + + c1 = p.connect() + c2 = p.connect() + self.assert_(status(p) == (3, 1, 0, 2), status(p)) + if useclose: + c2.close() + else: + c2 = None + lazy_gc() + + self.assert_(status(p) == (3, 2, 0, 1)) + + c1.close() + + lazy_gc() + assert not pool._refs - dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) - c1 = p.connect() - c_id = c1.connection.id - c1.close(); c1=None - - c1 = p.connect() - assert c1.connection.id == c_id - dbapi.raise_error = True - c1.invalidate() - c1 = None - - c1 = p.connect() - assert c1.connection.id != c_id - - def test_detach(self): - dbapi = MockDBAPI() - p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) - - c1 = p.connect() - c1.detach() - c_id = c1.connection.id - - c2 = p.connect() - assert c2.connection.id != c1.connection.id - dbapi.raise_error = True - - c2.invalidate() - c2 = None - - c2 = p.connect() - assert c2.connection.id != c1.connection.id - - con = c1.connection - - assert not con.closed - c1.close() - assert con.closed - - def test_threadfairy(self): - p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) - c1 = p.connect() - c1.close() - c2 = p.connect() - assert c2.connection is not None + def test_timeout(self): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2) + c1 = p.connect() + c2 = p.connect() + c3 = p.connect() + now = time.time() + try: + c4 = p.connect() + assert False + except tsa.exc.TimeoutError, e: + assert int(time.time() - now) == 2 + + def test_timeout_race(self): + # test a race condition where the initial connecting threads all race + # to queue.Empty, then block on the mutex. each thread consumes a + # connection as they go in. when the limit is reached, the remaining + # threads go in, and get TimeoutError; even though they never got to + # wait for the timeout on queue.get(). the fix involves checking the + # timeout again within the mutex, and if so, unlocking and throwing + # them back to the start of do_get() + p = pool.QueuePool(creator = lambda: mock_dbapi.connect(delay=.05), pool_size = 2, max_overflow = 1, use_threadlocal = False, timeout=3) + timeouts = [] + def checkout(): + for x in xrange(1): + now = time.time() + try: + c1 = p.connect() + except tsa.exc.TimeoutError, e: + timeouts.append(int(time.time()) - now) + continue + time.sleep(4) + c1.close() + + threads = [] + for i in xrange(10): + th = threading.Thread(target=checkout) + th.start() + threads.append(th) + for th in threads: + th.join() + + print timeouts + assert len(timeouts) > 0 + for t in timeouts: + assert abs(t - 3) < 1, "Not all timeouts were 3 seconds: " + repr(timeouts) + + def _test_overflow(self, thread_count, max_overflow): + def creator(): + time.sleep(.05) + return mock_dbapi.connect() + + p = pool.QueuePool(creator=creator, + pool_size=3, timeout=2, + max_overflow=max_overflow) + peaks = [] + def whammy(): + for i in range(10): + try: + con = p.connect() + time.sleep(.005) + peaks.append(p.overflow()) + con.close() + del con + except tsa.exc.TimeoutError: + pass + threads = [] + for i in xrange(thread_count): + th = threading.Thread(target=whammy) + th.start() + threads.append(th) + for th in threads: + th.join() + + self.assert_(max(peaks) <= max_overflow) + + lazy_gc() + assert not pool._refs + + def test_no_overflow(self): + self._test_overflow(40, 0) + + def test_max_overflow(self): + self._test_overflow(40, 5) + + def test_mixed_close(self): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + c1 = p.connect() + c2 = p.connect() + assert c1 is c2 + c1.close() + c2 = None + assert p.checkedout() == 1 + c1 = None + lazy_gc() + assert p.checkedout() == 0 + + lazy_gc() + assert not pool._refs + + def test_weakref_kaboom(self): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + c1 = p.connect() + c2 = p.connect() + c1.close() + c2 = None + del c1 + del c2 + gc_collect() + assert p.checkedout() == 0 + c3 = p.connect() + assert c3 is not None + + def test_trick_the_counter(self): + """this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread + with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an + ambiguous counter. i.e. its not true reference counting.""" + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + c1 = p.connect() + c2 = p.connect() + assert c1 is c2 + c1.close() + c2 = p.connect() + c2.close() + self.assert_(p.checkedout() != 0) + + c2.close() + self.assert_(p.checkedout() == 0) + + def test_recycle(self): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 1, max_overflow = 0, use_threadlocal = False, recycle=3) + + c1 = p.connect() + c_id = id(c1.connection) + c1.close() + c2 = p.connect() + assert id(c2.connection) == c_id + c2.close() + time.sleep(4) + c3= p.connect() + assert id(c3.connection) != c_id + + def test_invalidate(self): + dbapi = MockDBAPI() + p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + c1 = p.connect() + c_id = c1.connection.id + c1.close(); c1=None + c1 = p.connect() + assert c1.connection.id == c_id + c1.invalidate() + c1 = None + + c1 = p.connect() + assert c1.connection.id != c_id + + def test_recreate(self): + dbapi = MockDBAPI() + p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + p2 = p.recreate() + assert p2.size() == 1 + assert p2._use_threadlocal is False + assert p2._max_overflow == 0 + + def test_reconnect(self): + """tests reconnect operations at the pool level. SA's engine/dialect includes another + layer of reconnect support for 'database was lost' errors.""" + + dbapi = MockDBAPI() + p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + c1 = p.connect() + c_id = c1.connection.id + c1.close(); c1=None + + c1 = p.connect() + assert c1.connection.id == c_id + dbapi.raise_error = True + c1.invalidate() + c1 = None + + c1 = p.connect() + assert c1.connection.id != c_id + + def test_detach(self): + dbapi = MockDBAPI() + p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False) + + c1 = p.connect() + c1.detach() + c_id = c1.connection.id + + c2 = p.connect() + assert c2.connection.id != c1.connection.id + dbapi.raise_error = True + + c2.invalidate() + c2 = None + + c2 = p.connect() + assert c2.connection.id != c1.connection.id + + con = c1.connection + + assert not con.closed + c1.close() + assert con.closed + + def test_threadfairy(self): + p = pool.QueuePool(creator = mock_dbapi.connect, pool_size = 3, max_overflow = -1, use_threadlocal = True) + c1 = p.connect() + c1.close() + c2 = p.connect() + assert c2.connection is not None class SingletonThreadPoolTest(PoolTestBase): def test_cleanup(self): |
