diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2017-02-24 10:50:14 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2017-03-20 18:01:23 -0400 |
| commit | f881dae8179b94f72ab0dc85d8f62be8c9ce2fe0 (patch) | |
| tree | 5cb60158bc13584b5350b9d7f87604d1e0b4350a /test/engine | |
| parent | 9e06ab17b9d3083cd45540f714234d1d5826da32 (diff) | |
| download | sqlalchemy-f881dae8179b94f72ab0dc85d8f62be8c9ce2fe0.tar.gz | |
Integrate "pre-ping" into connection pool.
Added native "pessimistic disconnection" handling to the :class:`.Pool`
object. The new parameter :paramref:`.Pool.pre_ping`, available from
the engine as :paramref:`.create_engine.pool_pre_ping`, applies an
efficient form of the "pre-ping" recipe featured in the pooling
documentation, which upon each connection check out, emits a simple
statement, typically "SELECT 1", to test the connection for liveness.
If the existing connection is no longer able to respond to commands,
the connection is transparently recycled, and all other connections
made prior to the current timestamp are invalidated.
Change-Id: I89700d0075e60abd2250e54b9bd14daf03c71c00
Fixes: #3919
Diffstat (limited to 'test/engine')
| -rw-r--r-- | test/engine/test_reconnect.py | 149 |
1 files changed, 147 insertions, 2 deletions
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index 0d7cdb4e7..be60056a5 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -4,6 +4,7 @@ from sqlalchemy import ( select, MetaData, Integer, String, create_engine, pool, exc, util) from sqlalchemy.testing.schema import Table, Column import sqlalchemy as tsa +from sqlalchemy.engine import url from sqlalchemy import testing from sqlalchemy.testing import mock from sqlalchemy.testing import engines @@ -81,18 +82,27 @@ def mock_connection(): def MockDBAPI(): connections = [] + stopped = [False] def connect(): while True: + if stopped[0]: + raise MockDisconnect("database is stopped") conn = mock_connection() connections.append(conn) yield conn - def shutdown(explode='execute'): + def shutdown(explode='execute', stop=False): + stopped[0] = stop for c in connections: c.explode = explode + def restart(): + stopped[0] = False + connections[:] = [] + def dispose(): + stopped[0] = False for c in connections: c.explode = None connections[:] = [] @@ -101,11 +111,101 @@ def MockDBAPI(): connect=Mock(side_effect=connect()), shutdown=Mock(side_effect=shutdown), dispose=Mock(side_effect=dispose), + restart=Mock(side_effect=restart), paramstyle='named', connections=connections, Error=MockError) +class PrePingMockTest(fixtures.TestBase): + def setup(self): + self.dbapi = MockDBAPI() + + def _pool_fixture(self, pre_ping): + dialect = url.make_url( + 'postgresql://foo:bar@localhost/test').get_dialect()() + dialect.dbapi = self.dbapi + _pool = pool.QueuePool( + creator=lambda: self.dbapi.connect('foo.db'), pre_ping=pre_ping, + dialect=dialect) + + dialect.is_disconnect = \ + lambda e, conn, cursor: isinstance(e, MockDisconnect) + return _pool + + def teardown(self): + self.dbapi.dispose() + + def test_connect_across_restart(self): + pool = self._pool_fixture(pre_ping=True) + + conn = pool.connect() + stale_connection = conn.connection + conn.close() + + self.dbapi.shutdown("execute") + self.dbapi.restart() + + conn = pool.connect() + cursor = conn.cursor() + cursor.execute("hi") + + stale_cursor = stale_connection.cursor() + assert_raises( + MockDisconnect, + stale_cursor.execute, "hi" + ) + + def test_raise_db_is_stopped(self): + pool = self._pool_fixture(pre_ping=True) + + conn = pool.connect() + conn.close() + + self.dbapi.shutdown("execute", stop=True) + + assert_raises_message( + MockDisconnect, + "database is stopped", + pool.connect + ) + + def test_waits_til_exec_wo_ping_db_is_stopped(self): + pool = self._pool_fixture(pre_ping=False) + + conn = pool.connect() + conn.close() + + self.dbapi.shutdown("execute", stop=True) + + conn = pool.connect() + + cursor = conn.cursor() + assert_raises_message( + MockDisconnect, + "Lost the DB connection on execute", + cursor.execute, "foo" + ) + + def test_waits_til_exec_wo_ping_db_is_restarted(self): + pool = self._pool_fixture(pre_ping=False) + + conn = pool.connect() + conn.close() + + self.dbapi.shutdown("execute", stop=True) + self.dbapi.restart() + + conn = pool.connect() + + cursor = conn.cursor() + assert_raises_message( + MockDisconnect, + "Lost the DB connection on execute", + cursor.execute, "foo" + ) + + class MockReconnectTest(fixtures.TestBase): def setup(self): self.dbapi = MockDBAPI() @@ -473,7 +573,6 @@ class MockReconnectTest(fixtures.TestBase): conn.execute(select([1])) assert not conn.invalidated - class CursorErrTest(fixtures.TestBase): # this isn't really a "reconnect" test, it's more of # a generic "recovery". maybe this test suite should have been @@ -818,6 +917,52 @@ class RecycleTest(fixtures.TestBase): conn.close() +class PrePingRealTest(fixtures.TestBase): + __backend__ = True + + def test_pre_ping_db_is_restarted(self): + engine = engines.reconnecting_engine( + options={"pool_pre_ping": True} + ) + + conn = engine.connect() + eq_(conn.execute(select([1])).scalar(), 1) + stale_connection = conn.connection.connection + conn.close() + + engine.test_shutdown() + engine.test_restart() + + conn = engine.connect() + eq_(conn.execute(select([1])).scalar(), 1) + conn.close() + + def exercise_stale_connection(): + curs = stale_connection.cursor() + curs.execute("select 1") + + assert_raises( + engine.dialect.dbapi.Error, + exercise_stale_connection + ) + + def test_pre_ping_db_stays_shutdown(self): + engine = engines.reconnecting_engine( + options={"pool_pre_ping": True} + ) + + conn = engine.connect() + eq_(conn.execute(select([1])).scalar(), 1) + conn.close() + + engine.test_shutdown(stop=True) + + assert_raises( + exc.DBAPIError, + engine.connect + ) + + class InvalidateDuringResultTest(fixtures.TestBase): __backend__ = True |
