diff options
Diffstat (limited to 'test/engine/test_reconnect.py')
| -rw-r--r-- | test/engine/test_reconnect.py | 39 |
1 files changed, 35 insertions, 4 deletions
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index 31a2b705a..9b3a1b4db 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -58,7 +58,7 @@ class MockReconnectTest(TestBase): module=dbapi, _initialize=False) # monkeypatch disconnect checker - db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect) + db.dialect.is_disconnect = lambda e, conn, cursor: isinstance(e, MockDisconnect) def test_reconnect(self): """test that an 'is_disconnect' condition will invalidate the @@ -259,6 +259,22 @@ class RealReconnectTest(TestBase): conn.close() + def test_ensure_is_disconnect_gets_connection(self): + def is_disconnect(e, conn, cursor): + # connection is still present + assert conn.connection is not None + # the error usually occurs on connection.cursor(), + # though MySQLdb we get a non-working cursor. + # assert cursor is None + + engine.dialect.is_disconnect = is_disconnect + conn = engine.connect() + engine.test_shutdown() + assert_raises( + tsa.exc.DBAPIError, + conn.execute, select([1]) + ) + def test_invalidate_twice(self): conn = engine.connect() conn.invalidate() @@ -280,7 +296,7 @@ class RealReconnectTest(TestBase): p1 = engine.pool - def is_disconnect(e): + def is_disconnect(e, conn, cursor): return True engine.dialect.is_disconnect = is_disconnect @@ -374,13 +390,28 @@ class RecycleTest(TestBase): def test_basic(self): for threadlocal in False, True: - engine = engines.reconnecting_engine(options={'pool_recycle' - : 1, 'pool_threadlocal': threadlocal}) + engine = engines.reconnecting_engine( + options={'pool_threadlocal': threadlocal}) + conn = engine.contextual_connect() eq_(conn.execute(select([1])).scalar(), 1) conn.close() + + # set the pool recycle down to 1. + # we aren't doing this inline with the + # engine create since cx_oracle takes way + # too long to create the 1st connection and don't + # want to build a huge delay into this test. + + engine.pool._recycle = 1 + + # kill the DB connection engine.test_shutdown() + + # wait until past the recycle period time.sleep(2) + + # can connect, no exception conn = engine.contextual_connect() eq_(conn.execute(select([1])).scalar(), 1) conn.close() |
