summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2016-03-25 18:31:17 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2016-03-25 18:31:17 -0400
commit8a776122a378f92a97a7bc5a54422a2fe1ecd8f9 (patch)
treed590b415c88a478e4673f79a90ba059475570451 /test
parentd594691a1af4e0d6b12aa4a5c1f1ede178a8985c (diff)
downloadsqlalchemy-8a776122a378f92a97a7bc5a54422a2fe1ecd8f9.tar.gz
- Added connection pool events :meth:`ConnectionEvents.close`,
:meth:`.ConnectionEvents.detach`, :meth:`.ConnectionEvents.close_detached`.
Diffstat (limited to 'test')
-rw-r--r--test/engine/test_pool.py60
1 files changed, 60 insertions, 0 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 8551e1fcb..4547984ab 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -345,6 +345,66 @@ class PoolEventsTest(PoolTestBase):
return p, canary
+ def _close_event_fixture(self):
+ p = self._queuepool_fixture()
+ canary = Mock()
+ event.listen(p, 'close', canary)
+
+ return p, canary
+
+ def _detach_event_fixture(self):
+ p = self._queuepool_fixture()
+ canary = Mock()
+ event.listen(p, 'detach', canary)
+
+ return p, canary
+
+ def _close_detached_event_fixture(self):
+ p = self._queuepool_fixture()
+ canary = Mock()
+ event.listen(p, 'close_detached', canary)
+
+ return p, canary
+
+ def test_close(self):
+ p, canary = self._close_event_fixture()
+
+ c1 = p.connect()
+
+ connection = c1.connection
+ rec = c1._connection_record
+
+ c1.close()
+
+ eq_(canary.mock_calls, [])
+
+ p.dispose()
+ eq_(canary.mock_calls, [call(connection, rec)])
+
+ def test_detach(self):
+ p, canary = self._detach_event_fixture()
+
+ c1 = p.connect()
+
+ connection = c1.connection
+ rec = c1._connection_record
+
+ c1.detach()
+
+ eq_(canary.mock_calls, [call(connection, rec)])
+
+ def test_detach_close(self):
+ p, canary = self._close_detached_event_fixture()
+
+ c1 = p.connect()
+
+ connection = c1.connection
+
+ c1.detach()
+
+ c1.close()
+ eq_(canary.mock_calls, [call(connection)])
+
def test_first_connect_event(self):
p, canary = self._first_connect_event_fixture()