summaryrefslogtreecommitdiff
path: root/test/engine
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-05-30 16:24:38 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2014-05-30 16:24:38 -0400
commit814637e291953bc7e05ced3e215ef33bde5b040a (patch)
tree7b46787f2b04ae06bb2934833d827bfb04b90036 /test/engine
parent8daa6ccfb0be6486d36ebdd3cd709e8ebfbfa207 (diff)
downloadsqlalchemy-814637e291953bc7e05ced3e215ef33bde5b040a.tar.gz
- vastly improve the "safe close cursor" tests in test_reconnect
- Fixed bug which would occur if a DBAPI exception occurs when the engine first connects and does its initial checks, and the exception is not a disconnect exception, yet the cursor raises an error when we try to close it. In this case the real exception would be quashed as we tried to log the cursor close exception via the connection pool and failed, as we were trying to access the pool's logger in a way that is inappropriate in this very specific scenario. fixes #3063
Diffstat (limited to 'test/engine')
-rw-r--r--test/engine/test_reconnect.py71
1 files changed, 57 insertions, 14 deletions
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index 23a3b3703..e6897b13d 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -373,33 +373,76 @@ class MockReconnectTest(fixtures.TestBase):
class CursorErrTest(fixtures.TestBase):
+ # this isn't really a "reconnect" test, it's more of
+ # a generic "recovery". maybe this test suite should have been
+ # named "test_error_recovery".
+ def _fixture(self, explode_on_exec, initialize):
+ class DBAPIError(Exception):
+ pass
- def setup(self):
def MockDBAPI():
def cursor():
while True:
- yield Mock(
- description=[],
- close=Mock(side_effect=Exception("explode")))
+ if explode_on_exec:
+ yield Mock(
+ description=[],
+ close=Mock(side_effect=DBAPIError("explode")),
+ execute=Mock(side_effect=DBAPIError("explode"))
+ )
+ else:
+ yield Mock(
+ description=[],
+ close=Mock(side_effect=Exception("explode")),
+ )
def connect():
while True:
- yield Mock(cursor=Mock(side_effect=cursor()))
-
- return Mock(connect=Mock(side_effect=connect()))
-
+ yield Mock(
+ spec=['cursor', 'commit', 'rollback', 'close'],
+ cursor=Mock(side_effect=cursor()),
+ )
+
+ return Mock(
+ Error = DBAPIError,
+ paramstyle='qmark',
+ connect=Mock(side_effect=connect())
+ )
dbapi = MockDBAPI()
- self.db = testing_engine(
- 'postgresql://foo:bar@localhost/test',
- options=dict(module=dbapi, _initialize=False))
+
+ from sqlalchemy.engine import default
+ url = Mock(
+ get_dialect=lambda: default.DefaultDialect,
+ translate_connect_args=lambda: {},
+ query={},
+ )
+ eng = testing_engine(
+ url,
+ options=dict(module=dbapi, _initialize=initialize))
+ eng.pool.logger = Mock()
+ return eng
def test_cursor_explode(self):
- conn = self.db.connect()
+ db = self._fixture(False, False)
+ conn = db.connect()
result = conn.execute("select foo")
result.close()
conn.close()
+ eq_(
+ db.pool.logger.error.mock_calls,
+ [call('Error closing cursor', exc_info=True)]
+ )
+
+ def test_cursor_shutdown_in_initialize(self):
+ db = self._fixture(True, True)
+ assert_raises_message(
+ exc.SAWarning,
+ "Exception attempting to detect",
+ db.connect
+ )
+ eq_(
+ db.pool.logger.error.mock_calls,
+ [call('Error closing cursor', exc_info=True)]
+ )
- def teardown(self):
- self.db.dispose()
def _assert_invalidated(fn, *args):