summaryrefslogtreecommitdiff
path: root/test/engine
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2018-02-07 20:58:47 -0500
committerGerrit Code Review <gerrit@ci.zzzcomputing.com>2018-02-07 20:58:47 -0500
commitba957f84d333918ca34d562b407cc6d65bccbccb (patch)
tree4ec38c96ba774bdc6b8ef91ed55ae81c954b21fe /test/engine
parent9f657fee5ba66820b9cd6b80653332c07eec0451 (diff)
parent4065352c7e9779cfeae34f01073ef6c43aeae488 (diff)
downloadsqlalchemy-ba957f84d333918ca34d562b407cc6d65bccbccb.tar.gz
Merge "Add flag for class-level disallow of events, apply to OptionEngine"
Diffstat (limited to 'test/engine')
-rw-r--r--test/engine/test_execute.py153
1 files changed, 102 insertions, 51 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 1b7934794..e0727f770 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -511,57 +511,6 @@ class ExecuteTest(fixtures.TestBase):
is_(eng.pool, eng2.pool)
@testing.requires.ad_hoc_engines
- def test_generative_engine_event_dispatch(self):
- canary = []
-
- def l1(*arg, **kw):
- canary.append("l1")
-
- def l2(*arg, **kw):
- canary.append("l2")
-
- def l3(*arg, **kw):
- canary.append("l3")
-
- eng = engines.testing_engine(options={'execution_options':
- {'base': 'x1'}})
- event.listen(eng, "before_execute", l1)
-
- eng1 = eng.execution_options(foo="b1")
- event.listen(eng, "before_execute", l2)
- event.listen(eng1, "before_execute", l3)
-
- eng.execute(select([1])).close()
- eng1.execute(select([1])).close()
-
- eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
-
- @testing.requires.ad_hoc_engines
- def test_dispose_event(self):
- canary = Mock()
- eng = create_engine(testing.db.url)
- event.listen(eng, "engine_disposed", canary)
-
- conn = eng.connect()
- conn.close()
- eng.dispose()
-
- conn = eng.connect()
- conn.close()
-
- eq_(
- canary.mock_calls,
- [call(eng)]
- )
-
- eng.dispose()
-
- eq_(
- canary.mock_calls,
- [call(eng), call(eng)]
- )
-
- @testing.requires.ad_hoc_engines
def test_autocommit_option_no_issue_first_connect(self):
eng = create_engine(testing.db.url)
eng.update_execution_options(autocommit=True)
@@ -1387,6 +1336,108 @@ class EngineEventsTest(fixtures.TestBase):
eq_(c3._execution_options, {'foo': 'bar', 'bar': 'bat'})
eq_(canary, ['execute', 'cursor_execute'])
+ @testing.requires.ad_hoc_engines
+ def test_generative_engine_event_dispatch(self):
+ canary = []
+
+ def l1(*arg, **kw):
+ canary.append("l1")
+
+ def l2(*arg, **kw):
+ canary.append("l2")
+
+ def l3(*arg, **kw):
+ canary.append("l3")
+
+ eng = engines.testing_engine(options={'execution_options':
+ {'base': 'x1'}})
+ event.listen(eng, "before_execute", l1)
+
+ eng1 = eng.execution_options(foo="b1")
+ event.listen(eng, "before_execute", l2)
+ event.listen(eng1, "before_execute", l3)
+
+ eng.execute(select([1])).close()
+
+ eq_(canary, ["l1", "l2"])
+
+ eng1.execute(select([1])).close()
+
+ eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
+
+ @testing.requires.ad_hoc_engines
+ def test_clslevel_engine_event_options(self):
+ canary = []
+
+ def l1(*arg, **kw):
+ canary.append("l1")
+
+ def l2(*arg, **kw):
+ canary.append("l2")
+
+ def l3(*arg, **kw):
+ canary.append("l3")
+
+ def l4(*arg, **kw):
+ canary.append("l4")
+
+ event.listen(Engine, "before_execute", l1)
+
+ eng = engines.testing_engine(options={'execution_options':
+ {'base': 'x1'}})
+ event.listen(eng, "before_execute", l2)
+
+ eng1 = eng.execution_options(foo="b1")
+ event.listen(eng, "before_execute", l3)
+ event.listen(eng1, "before_execute", l4)
+
+ eng.execute(select([1])).close()
+
+ eq_(canary, ["l1", "l2", "l3"])
+
+ eng1.execute(select([1])).close()
+
+ eq_(canary, ["l1", "l2", "l3", "l4", "l1", "l2", "l3"])
+
+ @testing.requires.ad_hoc_engines
+ def test_cant_listen_to_option_engine(self):
+ from sqlalchemy.engine import base
+
+ def evt(*arg, **kw):
+ pass
+
+ assert_raises_message(
+ tsa.exc.InvalidRequestError,
+ r"Can't assign an event directly to the "
+ "<class 'sqlalchemy.engine.base.OptionEngine'> class",
+ event.listen, base.OptionEngine, "before_cursor_execute", evt
+ )
+
+ @testing.requires.ad_hoc_engines
+ def test_dispose_event(self):
+ canary = Mock()
+ eng = create_engine(testing.db.url)
+ event.listen(eng, "engine_disposed", canary)
+
+ conn = eng.connect()
+ conn.close()
+ eng.dispose()
+
+ conn = eng.connect()
+ conn.close()
+
+ eq_(
+ canary.mock_calls,
+ [call(eng)]
+ )
+
+ eng.dispose()
+
+ eq_(
+ canary.mock_calls,
+ [call(eng), call(eng)]
+ )
+
def test_retval_flag(self):
canary = []