diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2006-05-25 14:20:23 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2006-05-25 14:20:23 +0000 |
| commit | bb79e2e871d0a4585164c1a6ed626d96d0231975 (patch) | |
| tree | 6d457ba6c36c408b45db24ec3c29e147fe7504ff /test/pool.py | |
| parent | 4fc3a0648699c2b441251ba4e1d37a9107bd1986 (diff) | |
| download | sqlalchemy-bb79e2e871d0a4585164c1a6ed626d96d0231975.tar.gz | |
merged 0.2 branch into trunk; 0.1 now in sqlalchemy/branches/rel_0_1
Diffstat (limited to 'test/pool.py')
| -rw-r--r-- | test/pool.py | 66 |
1 files changed, 58 insertions, 8 deletions
diff --git a/test/pool.py b/test/pool.py index 2737a33b1..d8c984aa8 100644 --- a/test/pool.py +++ b/test/pool.py @@ -1,5 +1,5 @@ from testbase import PersistTest -import unittest, sys, os +import unittest, sys, os, time from pysqlite2 import dbapi2 as sqlite import sqlalchemy.pool as pool @@ -40,7 +40,14 @@ class PoolTest(PersistTest): self.assert_(connection.cursor() is not None) self.assert_(connection is not connection2) - def testqueuepool(self): + def testqueuepool_del(self): + self._do_testqueuepool(useclose=False) + + def testqueuepool_close(self): + self._do_testqueuepool(useclose=True) + + def _do_testqueuepool(self, useclose=False): + p = pool.QueuePool(creator = lambda: sqlite.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = False, echo = False) def status(pool): @@ -60,30 +67,73 @@ class PoolTest(PersistTest): self.assert_(status(p) == (3,0,2,5)) c6 = p.connect() self.assert_(status(p) == (3,0,3,6)) - c4 = c3 = c2 = None + if useclose: + c4.close() + c3.close() + c2.close() + else: + c4 = c3 = c2 = None self.assert_(status(p) == (3,3,3,3)) - c1 = c5 = c6 = None + if useclose: + c1.close() + c5.close() + c6.close() + else: + c1 = c5 = c6 = None self.assert_(status(p) == (3,3,0,0)) c1 = p.connect() c2 = p.connect() self.assert_(status(p) == (3, 1, 0, 2)) - c2 = None + if useclose: + c2.close() + else: + c2 = None self.assert_(status(p) == (3, 2, 0, 1)) - def testthreadlocal(self): + def test_timeout(self): + p = pool.QueuePool(creator = lambda: sqlite.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, echo = False, timeout=2) + c1 = p.get() + c2 = p.get() + c3 = p.get() + now = time.time() + c4 = p.get() + assert int(time.time() - now) == 2 + + def testthreadlocal_del(self): + self._do_testthreadlocal(useclose=False) + + def testthreadlocal_close(self): + self._do_testthreadlocal(useclose=True) + + def _do_testthreadlocal(self, useclose=False): for p in ( pool.QueuePool(creator = lambda: sqlite.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo = False), pool.SingletonThreadPool(creator = lambda: sqlite.connect('foo.db'), use_threadlocal = True) - ): + ): c1 = p.connect() c2 = p.connect() self.assert_(c1 is c2) c3 = p.unique_connection() self.assert_(c3 is not c1) - c2 = None + if useclose: + c2.close() + else: + c2 = None c2 = p.connect() self.assert_(c1 is c2) self.assert_(c3 is not c1) + if useclose: + c2.close() + else: + c2 = None + + if useclose: + c1 = p.connect() + c2 = p.connect() + c3 = p.connect() + c3.close() + c2.close() + self.assert_(c1.connection is not None) def tearDown(self): pool.clear_managers() |
