summaryrefslogtreecommitdiff
path: root/test/pool.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2006-05-25 14:20:23 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2006-05-25 14:20:23 +0000
commitbb79e2e871d0a4585164c1a6ed626d96d0231975 (patch)
tree6d457ba6c36c408b45db24ec3c29e147fe7504ff /test/pool.py
parent4fc3a0648699c2b441251ba4e1d37a9107bd1986 (diff)
downloadsqlalchemy-bb79e2e871d0a4585164c1a6ed626d96d0231975.tar.gz
merged 0.2 branch into trunk; 0.1 now in sqlalchemy/branches/rel_0_1
Diffstat (limited to 'test/pool.py')
-rw-r--r--test/pool.py66
1 files changed, 58 insertions, 8 deletions
diff --git a/test/pool.py b/test/pool.py
index 2737a33b1..d8c984aa8 100644
--- a/test/pool.py
+++ b/test/pool.py
@@ -1,5 +1,5 @@
from testbase import PersistTest
-import unittest, sys, os
+import unittest, sys, os, time
from pysqlite2 import dbapi2 as sqlite
import sqlalchemy.pool as pool
@@ -40,7 +40,14 @@ class PoolTest(PersistTest):
self.assert_(connection.cursor() is not None)
self.assert_(connection is not connection2)
- def testqueuepool(self):
+ def testqueuepool_del(self):
+ self._do_testqueuepool(useclose=False)
+
+ def testqueuepool_close(self):
+ self._do_testqueuepool(useclose=True)
+
+ def _do_testqueuepool(self, useclose=False):
+
p = pool.QueuePool(creator = lambda: sqlite.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = False, echo = False)
def status(pool):
@@ -60,30 +67,73 @@ class PoolTest(PersistTest):
self.assert_(status(p) == (3,0,2,5))
c6 = p.connect()
self.assert_(status(p) == (3,0,3,6))
- c4 = c3 = c2 = None
+ if useclose:
+ c4.close()
+ c3.close()
+ c2.close()
+ else:
+ c4 = c3 = c2 = None
self.assert_(status(p) == (3,3,3,3))
- c1 = c5 = c6 = None
+ if useclose:
+ c1.close()
+ c5.close()
+ c6.close()
+ else:
+ c1 = c5 = c6 = None
self.assert_(status(p) == (3,3,0,0))
c1 = p.connect()
c2 = p.connect()
self.assert_(status(p) == (3, 1, 0, 2))
- c2 = None
+ if useclose:
+ c2.close()
+ else:
+ c2 = None
self.assert_(status(p) == (3, 2, 0, 1))
- def testthreadlocal(self):
+ def test_timeout(self):
+ p = pool.QueuePool(creator = lambda: sqlite.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, echo = False, timeout=2)
+ c1 = p.get()
+ c2 = p.get()
+ c3 = p.get()
+ now = time.time()
+ c4 = p.get()
+ assert int(time.time() - now) == 2
+
+ def testthreadlocal_del(self):
+ self._do_testthreadlocal(useclose=False)
+
+ def testthreadlocal_close(self):
+ self._do_testthreadlocal(useclose=True)
+
+ def _do_testthreadlocal(self, useclose=False):
for p in (
pool.QueuePool(creator = lambda: sqlite.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo = False),
pool.SingletonThreadPool(creator = lambda: sqlite.connect('foo.db'), use_threadlocal = True)
- ):
+ ):
c1 = p.connect()
c2 = p.connect()
self.assert_(c1 is c2)
c3 = p.unique_connection()
self.assert_(c3 is not c1)
- c2 = None
+ if useclose:
+ c2.close()
+ else:
+ c2 = None
c2 = p.connect()
self.assert_(c1 is c2)
self.assert_(c3 is not c1)
+ if useclose:
+ c2.close()
+ else:
+ c2 = None
+
+ if useclose:
+ c1 = p.connect()
+ c2 = p.connect()
+ c3 = p.connect()
+ c3.close()
+ c2.close()
+ self.assert_(c1.connection is not None)
def tearDown(self):
pool.clear_managers()