1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
import sys, weakref
from testlib import config
class ConnectionKiller(object):
def __init__(self):
self.proxy_refs = weakref.WeakKeyDictionary()
def checkout(self, dbapi_con, con_record, con_proxy):
self.proxy_refs[con_proxy] = True
def _apply_all(self, methods):
for rec in self.proxy_refs:
if rec is not None and rec.is_valid:
try:
for name in methods:
if callable(name):
name(rec)
else:
getattr(rec, name)()
except (SystemExit, KeyboardInterrupt):
raise
except Exception, e:
# fixme
sys.stderr.write("\n" + str(e) + "\n")
def rollback_all(self):
self._apply_all(('rollback',))
def close_all(self):
self._apply_all(('rollback', 'close'))
def assert_all_closed(self):
for rec in self.proxy_refs:
if rec.is_valid:
assert False
testing_reaper = ConnectionKiller()
def assert_conns_closed(fn):
def decorated(*args, **kw):
try:
fn(*args, **kw)
finally:
testing_reaper.assert_all_closed()
decorated.__name__ = fn.__name__
return decorated
def rollback_open_connections(fn):
"""Decorator that rolls back all open connections after fn execution."""
def decorated(*args, **kw):
try:
fn(*args, **kw)
finally:
testing_reaper.rollback_all()
decorated.__name__ = fn.__name__
return decorated
def close_open_connections(fn):
"""Decorator that closes all connections after fn execution."""
def decorated(*args, **kw):
try:
fn(*args, **kw)
finally:
testing_reaper.close_all()
decorated.__name__ = fn.__name__
return decorated
class ReconnectFixture(object):
def __init__(self, dbapi):
self.dbapi = dbapi
self.connections = []
def __getattr__(self, key):
return getattr(self.dbapi, key)
def connect(self, *args, **kwargs):
conn = self.dbapi.connect(*args, **kwargs)
self.connections.append(conn)
return conn
def shutdown(self):
for c in list(self.connections):
c.close()
self.connections = []
def reconnecting_engine(url=None, options=None):
url = url or config.db_url
dbapi = config.db.dialect.dbapi
engine = testing_engine(url, {'module':ReconnectFixture(dbapi)})
engine.test_shutdown = engine.dialect.dbapi.shutdown
return engine
def testing_engine(url=None, options=None):
"""Produce an engine configured by --options with optional overrides."""
from sqlalchemy import create_engine
from testlib.testing import ExecutionContextWrapper
url = url or config.db_url
options = options or config.db_opts
listeners = options.setdefault('listeners', [])
listeners.append(testing_reaper)
engine = create_engine(url, **options)
create_context = engine.dialect.create_execution_context
def create_exec_context(*args, **kwargs):
return ExecutionContextWrapper(create_context(*args, **kwargs))
engine.dialect.create_execution_context = create_exec_context
return engine
def utf8_engine(url=None, options=None):
"""Hook for dialects or drivers that don't handle utf8 by default."""
from sqlalchemy.engine import url as engine_url
if config.db.name == 'mysql':
dbapi_ver = config.db.dialect.dbapi.version_info
if (dbapi_ver < (1, 2, 1) or
dbapi_ver in ((1, 2, 1, 'gamma', 1), (1, 2, 1, 'gamma', 2),
(1, 2, 1, 'gamma', 3), (1, 2, 1, 'gamma', 5))):
raise RuntimeError('Character set support unavailable with this '
'driver version: %s' % repr(dbapi_ver))
else:
url = url or config.db_url
url = engine_url.make_url(url)
url.query['charset'] = 'utf8'
url.query['use_unicode'] = '0'
url = str(url)
return testing_engine(url, options)
|