summaryrefslogtreecommitdiff
path: root/test/engine/test_pool.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_pool.py')
-rw-r--r--test/engine/test_pool.py76
1 files changed, 64 insertions, 12 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 0c4557d49..196e4e760 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -49,6 +49,7 @@ class PoolTestBase(fixtures.TestBase):
return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
**kw)
+
class PoolTest(PoolTestBase):
def test_manager(self):
manager = pool.manage(MockDBAPI(), use_threadlocal=True)
@@ -86,7 +87,6 @@ class PoolTest(PoolTestBase):
]
)
-
def test_bad_args(self):
manager = pool.manage(MockDBAPI())
manager.connect(None)
@@ -218,6 +218,7 @@ class PoolTest(PoolTestBase):
class PoolDialectTest(PoolTestBase):
def _dialect(self):
canary = []
+
class PoolDialect(object):
def do_rollback(self, dbapi_connection):
canary.append('R')
@@ -266,6 +267,7 @@ class PoolEventsTest(PoolTestBase):
def _first_connect_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def first_connect(*arg, **kw):
canary.append('first_connect')
@@ -276,8 +278,10 @@ class PoolEventsTest(PoolTestBase):
def _connect_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def connect(*arg, **kw):
canary.append('connect')
+
event.listen(p, 'connect', connect)
return p, canary
@@ -285,6 +289,7 @@ class PoolEventsTest(PoolTestBase):
def _checkout_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def checkout(*arg, **kw):
canary.append('checkout')
event.listen(p, 'checkout', checkout)
@@ -294,6 +299,7 @@ class PoolEventsTest(PoolTestBase):
def _checkin_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def checkin(*arg, **kw):
canary.append('checkin')
event.listen(p, 'checkin', checkin)
@@ -303,6 +309,7 @@ class PoolEventsTest(PoolTestBase):
def _reset_event_fixture(self):
p = self._queuepool_fixture()
canary = []
+
def reset(*arg, **kw):
canary.append('reset')
event.listen(p, 'reset', reset)
@@ -470,12 +477,16 @@ class PoolEventsTest(PoolTestBase):
def test_listen_targets_scope(self):
canary = []
+
def listen_one(*args):
canary.append("listen_one")
+
def listen_two(*args):
canary.append("listen_two")
+
def listen_three(*args):
canary.append("listen_three")
+
def listen_four(*args):
canary.append("listen_four")
@@ -492,13 +503,17 @@ class PoolEventsTest(PoolTestBase):
)
def test_listen_targets_per_subclass(self):
- """test that listen() called on a subclass remains specific to that subclass."""
+ """test that listen() called on a subclass remains specific to
+ that subclass."""
canary = []
+
def listen_one(*args):
canary.append("listen_one")
+
def listen_two(*args):
canary.append("listen_two")
+
def listen_three(*args):
canary.append("listen_three")
@@ -526,6 +541,7 @@ class PoolEventsTest(PoolTestBase):
# going
pool.Pool.dispatch._clear()
+
class PoolFirstConnectSyncTest(PoolTestBase):
# test [ticket:2964]
@@ -560,11 +576,14 @@ class PoolFirstConnectSyncTest(PoolTestBase):
th.join(join_timeout)
eq_(evt.mock_calls,
- [call.first_connect(), call.connect(), call.connect(), call.connect()]
+ [
+ call.first_connect(),
+ call.connect(),
+ call.connect(),
+ call.connect()]
)
-
class DeprecatedPoolListenerTest(PoolTestBase):
@testing.requires.predictable_gc
@testing.uses_deprecated(r".*Use event.listen")
@@ -580,38 +599,45 @@ class DeprecatedPoolListenerTest(PoolTestBase):
if hasattr(self, 'checkin'):
self.checkin = self.inst_checkin
self.clear()
+
def clear(self):
self.connected = []
self.first_connected = []
self.checked_out = []
self.checked_in = []
+
def assert_total(innerself, conn, fconn, cout, cin):
eq_(len(innerself.connected), conn)
eq_(len(innerself.first_connected), fconn)
eq_(len(innerself.checked_out), cout)
eq_(len(innerself.checked_in), cin)
+
def assert_in(innerself, item, in_conn, in_fconn,
in_cout, in_cin):
self.assert_((item in innerself.connected) == in_conn)
self.assert_((item in innerself.first_connected) == in_fconn)
self.assert_((item in innerself.checked_out) == in_cout)
self.assert_((item in innerself.checked_in) == in_cin)
+
def inst_connect(self, con, record):
print("connect(%s, %s)" % (con, record))
assert con is not None
assert record is not None
self.connected.append(con)
+
def inst_first_connect(self, con, record):
print("first_connect(%s, %s)" % (con, record))
assert con is not None
assert record is not None
self.first_connected.append(con)
+
def inst_checkout(self, con, record, proxy):
print("checkout(%s, %s, %s)" % (con, record, proxy))
assert con is not None
assert record is not None
assert proxy is not None
self.checked_out.append(con)
+
def inst_checkin(self, con, record):
print("checkin(%s, %s)" % (con, record))
# con can be None if invalidated
@@ -620,15 +646,19 @@ class DeprecatedPoolListenerTest(PoolTestBase):
class ListenAll(tsa.interfaces.PoolListener, InstrumentingListener):
pass
+
class ListenConnect(InstrumentingListener):
def connect(self, con, record):
pass
+
class ListenFirstConnect(InstrumentingListener):
def first_connect(self, con, record):
pass
+
class ListenCheckOut(InstrumentingListener):
def checkout(self, con, record, proxy, num):
pass
+
class ListenCheckIn(InstrumentingListener):
def checkin(self, con, record):
pass
@@ -746,8 +776,10 @@ class DeprecatedPoolListenerTest(PoolTestBase):
def test_listeners_callables(self):
def connect(dbapi_con, con_record):
counts[0] += 1
+
def checkout(dbapi_con, con_record, con_proxy):
counts[1] += 1
+
def checkin(dbapi_con, con_record):
counts[2] += 1
@@ -884,6 +916,7 @@ class QueuePoolTest(PoolTestBase):
pool_size=2,
max_overflow=1, use_threadlocal=False, timeout=3)
timeouts = []
+
def checkout():
for x in range(1):
now = time.time()
@@ -915,6 +948,7 @@ class QueuePoolTest(PoolTestBase):
dbapi = MockDBAPI()
mutex = threading.Lock()
+
def creator():
time.sleep(.05)
with mutex:
@@ -924,6 +958,7 @@ class QueuePoolTest(PoolTestBase):
pool_size=3, timeout=2,
max_overflow=max_overflow)
peaks = []
+
def whammy():
for i in range(10):
try:
@@ -947,7 +982,6 @@ class QueuePoolTest(PoolTestBase):
lazy_gc()
assert not pool._refs
-
def test_overflow_reset_on_failed_connect(self):
dbapi = Mock()
@@ -956,6 +990,7 @@ class QueuePoolTest(PoolTestBase):
raise Exception("connection failed")
creator = dbapi.connect
+
def create():
return creator()
@@ -1029,7 +1064,6 @@ class QueuePoolTest(PoolTestBase):
call("overflow_one")]
)
-
@testing.requires.threading_with_mock
@testing.requires.timing_intensive
def test_waiters_handled(self):
@@ -1039,6 +1073,7 @@ class QueuePoolTest(PoolTestBase):
"""
mutex = threading.Lock()
dbapi = MockDBAPI()
+
def creator():
mutex.acquire()
try:
@@ -1052,6 +1087,7 @@ class QueuePoolTest(PoolTestBase):
p = pool.QueuePool(creator=creator,
pool_size=2, timeout=timeout,
max_overflow=max_overflow)
+
def waiter(p, timeout, max_overflow):
success_key = (timeout, max_overflow)
conn = p.connect()
@@ -1088,12 +1124,14 @@ class QueuePoolTest(PoolTestBase):
dbapi = MockDBAPI()
canary = []
+
def creator():
canary.append(1)
return dbapi.connect()
p1 = pool.QueuePool(creator=creator,
pool_size=1, timeout=None,
max_overflow=0)
+
def waiter(p):
conn = p.connect()
canary.append(2)
@@ -1165,7 +1203,8 @@ class QueuePoolTest(PoolTestBase):
def test_mixed_close(self):
pool._refs.clear()
- p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True)
+ p = self._queuepool_fixture(pool_size=3, max_overflow=-1,
+ use_threadlocal=True)
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
@@ -1191,6 +1230,7 @@ class QueuePoolTest(PoolTestBase):
# disable weakref collection of the
# underlying connections
strong_refs = set()
+
def _conn():
c = p.connect()
strong_refs.add(c.connection)
@@ -1334,6 +1374,7 @@ class QueuePoolTest(PoolTestBase):
dialect.dbapi.Error = Error
pools = []
+
class TrackQueuePool(pool.QueuePool):
def __init__(self, *arg, **kw):
pools.append(self)
@@ -1357,11 +1398,13 @@ class QueuePoolTest(PoolTestBase):
def attempt(conn):
time.sleep(random.random())
try:
- conn._handle_dbapi_exception(Error(), "statement", {}, Mock(), Mock())
+ conn._handle_dbapi_exception(Error(), "statement", {},
+ Mock(), Mock())
except tsa.exc.DBAPIError:
pass
- # run an error + invalidate operation on the remaining 7 open connections
+ # run an error + invalidate operation on the remaining 7 open
+ #connections
threads = []
for conn in conns:
t = threading.Thread(target=attempt, args=(conn, ))
@@ -1399,7 +1442,8 @@ class QueuePoolTest(PoolTestBase):
assert c1.connection.id != c_id
def test_recreate(self):
- p = self._queuepool_fixture(reset_on_return=None, pool_size=1, max_overflow=0)
+ p = self._queuepool_fixture(reset_on_return=None, pool_size=1,
+ max_overflow=0)
p2 = p.recreate()
assert p2.size() == 1
assert p2._reset_on_return is pool.reset_none
@@ -1454,16 +1498,19 @@ class QueuePoolTest(PoolTestBase):
eq_(c2_con.close.call_count, 0)
def test_threadfairy(self):
- p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True)
+ p = self._queuepool_fixture(pool_size=3, max_overflow=-1,
+ use_threadlocal=True)
c1 = p.connect()
c1.close()
c2 = p.connect()
assert c2.connection is not None
+
class ResetOnReturnTest(PoolTestBase):
def _fixture(self, **kw):
dbapi = Mock()
- return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
+ return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
+ **kw)
def test_plain_rollback(self):
dbapi, p = self._fixture(reset_on_return='rollback')
@@ -1550,6 +1597,7 @@ class ResetOnReturnTest(PoolTestBase):
assert not dbapi.connect().rollback.called
assert dbapi.connect().commit.called
+
class SingletonThreadPoolTest(PoolTestBase):
@testing.requires.threading_with_mock
@@ -1567,6 +1615,7 @@ class SingletonThreadPoolTest(PoolTestBase):
dbapi = MockDBAPI()
lock = threading.Lock()
+
def creator():
# the mock iterator isn't threadsafe...
with lock:
@@ -1575,6 +1624,7 @@ class SingletonThreadPoolTest(PoolTestBase):
if strong_refs:
sr = set()
+
def _conn():
c = p.connect()
sr.add(c.connection)
@@ -1604,6 +1654,7 @@ class SingletonThreadPoolTest(PoolTestBase):
still_opened = len([c for c in sr if not c.close.call_count])
eq_(still_opened, 3)
+
class AssertionPoolTest(PoolTestBase):
def test_connect_error(self):
dbapi = MockDBAPI()
@@ -1622,6 +1673,7 @@ class AssertionPoolTest(PoolTestBase):
c3 = p.connect()
assert_raises(AssertionError, p.connect)
+
class NullPoolTest(PoolTestBase):
def test_reconnect(self):
dbapi = MockDBAPI()