diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2016-03-25 18:31:17 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2016-03-25 18:31:17 -0400 |
| commit | 8a776122a378f92a97a7bc5a54422a2fe1ecd8f9 (patch) | |
| tree | d590b415c88a478e4673f79a90ba059475570451 /test | |
| parent | d594691a1af4e0d6b12aa4a5c1f1ede178a8985c (diff) | |
| download | sqlalchemy-8a776122a378f92a97a7bc5a54422a2fe1ecd8f9.tar.gz | |
- Added connection pool events :meth:`ConnectionEvents.close`,
:meth:`.ConnectionEvents.detach`,
:meth:`.ConnectionEvents.close_detached`.
Diffstat (limited to 'test')
| -rw-r--r-- | test/engine/test_pool.py | 60 |
1 files changed, 60 insertions, 0 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 8551e1fcb..4547984ab 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -345,6 +345,66 @@ class PoolEventsTest(PoolTestBase): return p, canary + def _close_event_fixture(self): + p = self._queuepool_fixture() + canary = Mock() + event.listen(p, 'close', canary) + + return p, canary + + def _detach_event_fixture(self): + p = self._queuepool_fixture() + canary = Mock() + event.listen(p, 'detach', canary) + + return p, canary + + def _close_detached_event_fixture(self): + p = self._queuepool_fixture() + canary = Mock() + event.listen(p, 'close_detached', canary) + + return p, canary + + def test_close(self): + p, canary = self._close_event_fixture() + + c1 = p.connect() + + connection = c1.connection + rec = c1._connection_record + + c1.close() + + eq_(canary.mock_calls, []) + + p.dispose() + eq_(canary.mock_calls, [call(connection, rec)]) + + def test_detach(self): + p, canary = self._detach_event_fixture() + + c1 = p.connect() + + connection = c1.connection + rec = c1._connection_record + + c1.detach() + + eq_(canary.mock_calls, [call(connection, rec)]) + + def test_detach_close(self): + p, canary = self._close_detached_event_fixture() + + c1 = p.connect() + + connection = c1.connection + + c1.detach() + + c1.close() + eq_(canary.mock_calls, [call(connection)]) + def test_first_connect_event(self): p, canary = self._first_connect_event_fixture() |
