diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-05-30 16:24:38 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-05-30 16:24:38 -0400 |
| commit | 814637e291953bc7e05ced3e215ef33bde5b040a (patch) | |
| tree | 7b46787f2b04ae06bb2934833d827bfb04b90036 /test/engine | |
| parent | 8daa6ccfb0be6486d36ebdd3cd709e8ebfbfa207 (diff) | |
| download | sqlalchemy-814637e291953bc7e05ced3e215ef33bde5b040a.tar.gz | |
- vastly improve the "safe close cursor" tests in test_reconnect
- Fixed bug which would occur if a DBAPI exception
occurs when the engine first connects and does its initial checks,
and the exception is not a disconnect exception, yet the cursor
raises an error when we try to close it. In this case the real
exception would be quashed as we tried to log the cursor close
exception via the connection pool and failed, as we were trying
to access the pool's logger in a way that is inappropriate
in this very specific scenario. fixes #3063
Diffstat (limited to 'test/engine')
| -rw-r--r-- | test/engine/test_reconnect.py | 71 |
1 files changed, 57 insertions, 14 deletions
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index 23a3b3703..e6897b13d 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -373,33 +373,76 @@ class MockReconnectTest(fixtures.TestBase): class CursorErrTest(fixtures.TestBase): + # this isn't really a "reconnect" test, it's more of + # a generic "recovery". maybe this test suite should have been + # named "test_error_recovery". + def _fixture(self, explode_on_exec, initialize): + class DBAPIError(Exception): + pass - def setup(self): def MockDBAPI(): def cursor(): while True: - yield Mock( - description=[], - close=Mock(side_effect=Exception("explode"))) + if explode_on_exec: + yield Mock( + description=[], + close=Mock(side_effect=DBAPIError("explode")), + execute=Mock(side_effect=DBAPIError("explode")) + ) + else: + yield Mock( + description=[], + close=Mock(side_effect=Exception("explode")), + ) def connect(): while True: - yield Mock(cursor=Mock(side_effect=cursor())) - - return Mock(connect=Mock(side_effect=connect())) - + yield Mock( + spec=['cursor', 'commit', 'rollback', 'close'], + cursor=Mock(side_effect=cursor()), + ) + + return Mock( + Error = DBAPIError, + paramstyle='qmark', + connect=Mock(side_effect=connect()) + ) dbapi = MockDBAPI() - self.db = testing_engine( - 'postgresql://foo:bar@localhost/test', - options=dict(module=dbapi, _initialize=False)) + + from sqlalchemy.engine import default + url = Mock( + get_dialect=lambda: default.DefaultDialect, + translate_connect_args=lambda: {}, + query={}, + ) + eng = testing_engine( + url, + options=dict(module=dbapi, _initialize=initialize)) + eng.pool.logger = Mock() + return eng def test_cursor_explode(self): - conn = self.db.connect() + db = self._fixture(False, False) + conn = db.connect() result = conn.execute("select foo") result.close() conn.close() + eq_( + db.pool.logger.error.mock_calls, + [call('Error closing cursor', exc_info=True)] + ) + + def test_cursor_shutdown_in_initialize(self): + db = self._fixture(True, True) + assert_raises_message( + exc.SAWarning, + "Exception attempting to detect", + db.connect + ) + eq_( + db.pool.logger.error.mock_calls, + [call('Error closing cursor', exc_info=True)] + ) - def teardown(self): - self.db.dispose() def _assert_invalidated(fn, *args): |
