diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2018-02-07 20:58:47 -0500 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@ci.zzzcomputing.com> | 2018-02-07 20:58:47 -0500 |
| commit | ba957f84d333918ca34d562b407cc6d65bccbccb (patch) | |
| tree | 4ec38c96ba774bdc6b8ef91ed55ae81c954b21fe /test/engine | |
| parent | 9f657fee5ba66820b9cd6b80653332c07eec0451 (diff) | |
| parent | 4065352c7e9779cfeae34f01073ef6c43aeae488 (diff) | |
| download | sqlalchemy-ba957f84d333918ca34d562b407cc6d65bccbccb.tar.gz | |
Merge "Add flag for class-level disallow of events, apply to OptionEngine"
Diffstat (limited to 'test/engine')
| -rw-r--r-- | test/engine/test_execute.py | 153 |
1 files changed, 102 insertions, 51 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 1b7934794..e0727f770 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -511,57 +511,6 @@ class ExecuteTest(fixtures.TestBase): is_(eng.pool, eng2.pool) @testing.requires.ad_hoc_engines - def test_generative_engine_event_dispatch(self): - canary = [] - - def l1(*arg, **kw): - canary.append("l1") - - def l2(*arg, **kw): - canary.append("l2") - - def l3(*arg, **kw): - canary.append("l3") - - eng = engines.testing_engine(options={'execution_options': - {'base': 'x1'}}) - event.listen(eng, "before_execute", l1) - - eng1 = eng.execution_options(foo="b1") - event.listen(eng, "before_execute", l2) - event.listen(eng1, "before_execute", l3) - - eng.execute(select([1])).close() - eng1.execute(select([1])).close() - - eq_(canary, ["l1", "l2", "l3", "l1", "l2"]) - - @testing.requires.ad_hoc_engines - def test_dispose_event(self): - canary = Mock() - eng = create_engine(testing.db.url) - event.listen(eng, "engine_disposed", canary) - - conn = eng.connect() - conn.close() - eng.dispose() - - conn = eng.connect() - conn.close() - - eq_( - canary.mock_calls, - [call(eng)] - ) - - eng.dispose() - - eq_( - canary.mock_calls, - [call(eng), call(eng)] - ) - - @testing.requires.ad_hoc_engines def test_autocommit_option_no_issue_first_connect(self): eng = create_engine(testing.db.url) eng.update_execution_options(autocommit=True) @@ -1387,6 +1336,108 @@ class EngineEventsTest(fixtures.TestBase): eq_(c3._execution_options, {'foo': 'bar', 'bar': 'bat'}) eq_(canary, ['execute', 'cursor_execute']) + @testing.requires.ad_hoc_engines + def test_generative_engine_event_dispatch(self): + canary = [] + + def l1(*arg, **kw): + canary.append("l1") + + def l2(*arg, **kw): + canary.append("l2") + + def l3(*arg, **kw): + canary.append("l3") + + eng = engines.testing_engine(options={'execution_options': + {'base': 'x1'}}) + event.listen(eng, "before_execute", l1) + + eng1 = eng.execution_options(foo="b1") + event.listen(eng, "before_execute", l2) + event.listen(eng1, "before_execute", l3) + + eng.execute(select([1])).close() + + eq_(canary, ["l1", "l2"]) + + eng1.execute(select([1])).close() + + eq_(canary, ["l1", "l2", "l3", "l1", "l2"]) + + @testing.requires.ad_hoc_engines + def test_clslevel_engine_event_options(self): + canary = [] + + def l1(*arg, **kw): + canary.append("l1") + + def l2(*arg, **kw): + canary.append("l2") + + def l3(*arg, **kw): + canary.append("l3") + + def l4(*arg, **kw): + canary.append("l4") + + event.listen(Engine, "before_execute", l1) + + eng = engines.testing_engine(options={'execution_options': + {'base': 'x1'}}) + event.listen(eng, "before_execute", l2) + + eng1 = eng.execution_options(foo="b1") + event.listen(eng, "before_execute", l3) + event.listen(eng1, "before_execute", l4) + + eng.execute(select([1])).close() + + eq_(canary, ["l1", "l2", "l3"]) + + eng1.execute(select([1])).close() + + eq_(canary, ["l1", "l2", "l3", "l4", "l1", "l2", "l3"]) + + @testing.requires.ad_hoc_engines + def test_cant_listen_to_option_engine(self): + from sqlalchemy.engine import base + + def evt(*arg, **kw): + pass + + assert_raises_message( + tsa.exc.InvalidRequestError, + r"Can't assign an event directly to the " + "<class 'sqlalchemy.engine.base.OptionEngine'> class", + event.listen, base.OptionEngine, "before_cursor_execute", evt + ) + + @testing.requires.ad_hoc_engines + def test_dispose_event(self): + canary = Mock() + eng = create_engine(testing.db.url) + event.listen(eng, "engine_disposed", canary) + + conn = eng.connect() + conn.close() + eng.dispose() + + conn = eng.connect() + conn.close() + + eq_( + canary.mock_calls, + [call(eng)] + ) + + eng.dispose() + + eq_( + canary.mock_calls, + [call(eng), call(eng)] + ) + def test_retval_flag(self): canary = [] |
