1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
import testbase
from sqlalchemy import create_engine, exceptions
import gc, weakref, sys
class MockDisconnect(Exception):
pass
class MockDBAPI(object):
def __init__(self):
self.paramstyle = 'named'
self.connections = weakref.WeakKeyDictionary()
def connect(self, *args, **kwargs):
return MockConnection(self)
class MockConnection(object):
def __init__(self, dbapi):
self.explode = False
dbapi.connections[self] = True
def rollback(self):
pass
def commit(self):
pass
def cursor(self):
return MockCursor(explode=self.explode)
def close(self):
pass
class MockCursor(object):
def __init__(self, explode):
self.explode = explode
self.description = None
def execute(self, *args, **kwargs):
if self.explode:
raise MockDisconnect("Lost the DB connection")
else:
return
def close(self):
pass
class ReconnectTest(testbase.PersistTest):
def test_reconnect(self):
"""test that an 'is_disconnect' condition will invalidate the connection, and additionally
dispose the previous connection pool and recreate."""
dbapi = MockDBAPI()
# create engine using our current dburi
db = create_engine('postgres://foo:bar@localhost/test', module=dbapi)
# monkeypatch disconnect checker
db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect)
pid = id(db.connection_provider._pool)
# make a connection
conn = db.connect()
# connection works
conn.execute("SELECT 1")
# create a second connection within the pool, which we'll ensure also goes away
conn2 = db.connect()
conn2.close()
# two connections opened total now
assert len(dbapi.connections) == 2
# set it to fail
conn.connection.connection.explode = True
try:
# execute should fail
conn.execute("SELECT 1")
assert False
except exceptions.SQLAlchemyError, e:
pass
# assert was invalidated
assert conn.connection.connection is None
# close shouldnt break
conn.close()
assert id(db.connection_provider._pool) != pid
# ensure all connections closed (pool was recycled)
assert len(dbapi.connections) == 0
conn =db.connect()
conn.execute("SELECT 1")
conn.close()
assert len(dbapi.connections) == 1
if __name__ == '__main__':
testbase.main()
|