summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py318
1 files changed, 293 insertions, 25 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 5c3279ba9..730ef4446 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -174,7 +174,7 @@ class ExecuteTest(fixtures.TestBase):
@testing.skip_if(
lambda: testing.against('mysql+mysqldb'), 'db-api flaky')
@testing.fails_on_everything_except(
- 'postgresql+psycopg2',
+ 'postgresql+psycopg2', 'postgresql+psycopg2cffi',
'postgresql+pypostgresql', 'mysql+mysqlconnector',
'mysql+pymysql', 'mysql+cymysql')
def test_raw_python(self):
@@ -639,21 +639,21 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_transaction_connection_ctx_commit(self):
fn = self._trans_fn(True)
- conn = testing.db.connect()
- ctx = conn.begin()
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- self._assert_fn(5, value=8)
+ with testing.db.connect() as conn:
+ ctx = conn.begin()
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ self._assert_fn(5, value=8)
def test_transaction_connection_ctx_rollback(self):
fn = self._trans_rollback_fn(True)
- conn = testing.db.connect()
- ctx = conn.begin()
- assert_raises_message(
- Exception,
- "breakage",
- testing.run_as_contextmanager, ctx, fn, 5, value=8
- )
- self._assert_no_data()
+ with testing.db.connect() as conn:
+ ctx = conn.begin()
+ assert_raises_message(
+ Exception,
+ "breakage",
+ testing.run_as_contextmanager, ctx, fn, 5, value=8
+ )
+ self._assert_no_data()
def test_connection_as_ctx(self):
fn = self._trans_fn()
@@ -666,10 +666,12 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_connect_as_ctx_noautocommit(self):
fn = self._trans_fn()
self._assert_no_data()
- ctx = testing.db.connect().execution_options(autocommit=False)
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- # autocommit is off
- self._assert_no_data()
+
+ with testing.db.connect() as conn:
+ ctx = conn.execution_options(autocommit=False)
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ # autocommit is off
+ self._assert_no_data()
def test_transaction_engine_fn_commit(self):
fn = self._trans_fn()
@@ -687,17 +689,17 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_transaction_connection_fn_commit(self):
fn = self._trans_fn()
- conn = testing.db.connect()
- conn.transaction(fn, 5, value=8)
- self._assert_fn(5, value=8)
+ with testing.db.connect() as conn:
+ conn.transaction(fn, 5, value=8)
+ self._assert_fn(5, value=8)
def test_transaction_connection_fn_rollback(self):
fn = self._trans_rollback_fn()
- conn = testing.db.connect()
- assert_raises(
- Exception,
- conn.transaction, fn, 5, value=8
- )
+ with testing.db.connect() as conn:
+ assert_raises(
+ Exception,
+ conn.transaction, fn, 5, value=8
+ )
self._assert_no_data()
@@ -1900,6 +1902,272 @@ class HandleErrorTest(fixtures.TestBase):
self._test_alter_disconnect(True, False)
self._test_alter_disconnect(False, False)
+ def test_handle_error_event_connect_isolation_level(self):
+ engine = engines.testing_engine()
+
+ class MySpecialException(Exception):
+ pass
+
+ @event.listens_for(engine, "handle_error")
+ def handle_error(ctx):
+ raise MySpecialException("failed operation")
+
+ ProgrammingError = engine.dialect.dbapi.ProgrammingError
+ with engine.connect() as conn:
+ with patch.object(
+ conn.dialect, "get_isolation_level",
+ Mock(side_effect=ProgrammingError("random error"))
+ ):
+ assert_raises(
+ MySpecialException,
+ conn.get_isolation_level
+ )
+
+
+class HandleInvalidatedOnConnectTest(fixtures.TestBase):
+ __requires__ = ('sqlite', )
+
+ def setUp(self):
+ e = create_engine('sqlite://')
+
+ connection = Mock(
+ get_server_version_info=Mock(return_value='5.0'))
+
+ def connect(*args, **kwargs):
+ return connection
+ dbapi = Mock(
+ sqlite_version_info=(99, 9, 9,),
+ version_info=(99, 9, 9,),
+ sqlite_version='99.9.9',
+ paramstyle='named',
+ connect=Mock(side_effect=connect)
+ )
+
+ sqlite3 = e.dialect.dbapi
+ dbapi.Error = sqlite3.Error,
+ dbapi.ProgrammingError = sqlite3.ProgrammingError
+
+ self.dbapi = dbapi
+ self.ProgrammingError = sqlite3.ProgrammingError
+
+ def test_wraps_connect_in_dbapi(self):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+ try:
+ create_engine('sqlite://', module=dbapi).connect()
+ assert False
+ except tsa.exc.DBAPIError as de:
+ assert not de.connection_invalidated
+
+ def test_handle_error_event_connect(self):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is None
+ raise MySpecialException("failed operation")
+
+ assert_raises(
+ MySpecialException,
+ eng.connect
+ )
+
+ def test_handle_error_event_revalidate(self):
+ dbapi = self.dbapi
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi, _initialize=False)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is conn
+ assert isinstance(ctx.sqlalchemy_exception, tsa.exc.ProgrammingError)
+ raise MySpecialException("failed operation")
+
+ conn = eng.connect()
+ conn.invalidate()
+
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+
+ assert_raises(
+ MySpecialException,
+ getattr, conn, 'connection'
+ )
+
+ def test_handle_error_event_implicit_revalidate(self):
+ dbapi = self.dbapi
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi, _initialize=False)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is conn
+ assert isinstance(
+ ctx.sqlalchemy_exception, tsa.exc.ProgrammingError)
+ raise MySpecialException("failed operation")
+
+ conn = eng.connect()
+ conn.invalidate()
+
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError("random error"))
+
+ assert_raises(
+ MySpecialException,
+ conn.execute, select([1])
+ )
+
+ def test_handle_error_custom_connect(self):
+ dbapi = self.dbapi
+
+ class MySpecialException(Exception):
+ pass
+
+ def custom_connect():
+ raise self.ProgrammingError("random error")
+
+ eng = create_engine('sqlite://', module=dbapi, creator=custom_connect)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.engine is eng
+ assert ctx.connection is None
+ raise MySpecialException("failed operation")
+
+ assert_raises(
+ MySpecialException,
+ eng.connect
+ )
+
+ def test_handle_error_event_connect_invalidate_flag(self):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError(
+ "Cannot operate on a closed database."))
+
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://', module=dbapi)
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.is_disconnect
+ ctx.is_disconnect = False
+
+ try:
+ eng.connect()
+ assert False
+ except tsa.exc.DBAPIError as de:
+ assert not de.connection_invalidated
+
+ def test_cant_connect_stay_invalidated(self):
+ class MySpecialException(Exception):
+ pass
+
+ eng = create_engine('sqlite://')
+
+ @event.listens_for(eng, "handle_error")
+ def handle_error(ctx):
+ assert ctx.is_disconnect
+
+ conn = eng.connect()
+
+ conn.invalidate()
+
+ eng.pool._creator = Mock(
+ side_effect=self.ProgrammingError(
+ "Cannot operate on a closed database."))
+
+ try:
+ conn.connection
+ assert False
+ except tsa.exc.DBAPIError:
+ assert conn.invalidated
+
+ def _test_dont_touch_non_dbapi_exception_on_connect(self, connect_fn):
+ dbapi = self.dbapi
+ dbapi.connect = Mock(side_effect=TypeError("I'm not a DBAPI error"))
+
+ e = create_engine('sqlite://', module=dbapi)
+ e.dialect.is_disconnect = is_disconnect = Mock()
+ assert_raises_message(
+ TypeError,
+ "I'm not a DBAPI error",
+ connect_fn, e
+ )
+ eq_(is_disconnect.call_count, 0)
+
+ def test_dont_touch_non_dbapi_exception_on_connect(self):
+ self._test_dont_touch_non_dbapi_exception_on_connect(
+ lambda engine: engine.connect())
+
+ def test_dont_touch_non_dbapi_exception_on_contextual_connect(self):
+ self._test_dont_touch_non_dbapi_exception_on_connect(
+ lambda engine: engine.contextual_connect())
+
+ def test_ensure_dialect_does_is_disconnect_no_conn(self):
+ """test that is_disconnect() doesn't choke if no connection,
+ cursor given."""
+ dialect = testing.db.dialect
+ dbapi = dialect.dbapi
+ assert not dialect.is_disconnect(
+ dbapi.OperationalError("test"), None, None)
+
+ def _test_invalidate_on_connect(self, connect_fn):
+ """test that is_disconnect() is called during connect.
+
+ interpretation of connection failures are not supported by
+ every backend.
+
+ """
+
+ dbapi = self.dbapi
+ dbapi.connect = Mock(
+ side_effect=self.ProgrammingError(
+ "Cannot operate on a closed database."))
+ try:
+ connect_fn(create_engine('sqlite://', module=dbapi))
+ assert False
+ except tsa.exc.DBAPIError as de:
+ assert de.connection_invalidated
+
+ def test_invalidate_on_connect(self):
+ """test that is_disconnect() is called during connect.
+
+ interpretation of connection failures are not supported by
+ every backend.
+
+ """
+ self._test_invalidate_on_connect(lambda engine: engine.connect())
+
+ def test_invalidate_on_contextual_connect(self):
+ """test that is_disconnect() is called during connect.
+
+ interpretation of connection failures are not supported by
+ every backend.
+
+ """
+ self._test_invalidate_on_connect(
+ lambda engine: engine.contextual_connect())
+
class ProxyConnectionTest(fixtures.TestBase):