1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
|
import testbase
import unittest, sys, datetime
import tables
db = testbase.db
from sqlalchemy import *
class TransactionTest(testbase.PersistTest):
def setUpAll(self):
global users, metadata
metadata = MetaData()
users = Table('query_users', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
)
users.create(testbase.db)
def tearDown(self):
testbase.db.connect().execute(users.delete())
def tearDownAll(self):
users.drop(testbase.db)
@testbase.unsupported('mysql')
def testrollback(self):
"""test a basic rollback"""
connection = testbase.db.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
connection.execute(users.insert(), user_id=2, user_name='user2')
connection.execute(users.insert(), user_id=3, user_name='user3')
transaction.rollback()
result = connection.execute("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
@testbase.unsupported('mysql')
def testnestedrollback(self):
connection = testbase.db.connect()
try:
transaction = connection.begin()
try:
connection.execute(users.insert(), user_id=1, user_name='user1')
connection.execute(users.insert(), user_id=2, user_name='user2')
connection.execute(users.insert(), user_id=3, user_name='user3')
trans2 = connection.begin()
try:
connection.execute(users.insert(), user_id=4, user_name='user4')
connection.execute(users.insert(), user_id=5, user_name='user5')
raise Exception("uh oh")
trans2.commit()
except:
trans2.rollback()
raise
transaction.rollback()
except Exception, e:
transaction.rollback()
raise
except Exception, e:
try:
assert str(e) == 'uh oh' # and not "This transaction is inactive"
finally:
connection.close()
@testbase.unsupported('mysql')
def testnesting(self):
connection = testbase.db.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
connection.execute(users.insert(), user_id=2, user_name='user2')
connection.execute(users.insert(), user_id=3, user_name='user3')
trans2 = connection.begin()
connection.execute(users.insert(), user_id=4, user_name='user4')
connection.execute(users.insert(), user_id=5, user_name='user5')
trans2.commit()
transaction.rollback()
self.assert_(connection.scalar("select count(1) from query_users") == 0)
result = connection.execute("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
class AutoRollbackTest(testbase.PersistTest):
def setUpAll(self):
global metadata
metadata = MetaData()
def tearDownAll(self):
metadata.drop_all(testbase.db)
@testbase.unsupported('sqlite')
def testrollback_deadlock(self):
"""test that returning connections to the pool clears any object locks."""
conn1 = testbase.db.connect()
conn2 = testbase.db.connect()
users = Table('deadlock_users', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
)
users.create(conn1)
conn1.execute("select * from deadlock_users")
conn1.close()
# without auto-rollback in the connection pool's return() logic, this deadlocks in Postgres,
# because conn1 is returned to the pool but still has a lock on "deadlock_users"
# comment out the rollback in pool/ConnectionFairy._close() to see !
users.drop(conn2)
conn2.close()
class TLTransactionTest(testbase.PersistTest):
def setUpAll(self):
global users, metadata, tlengine
tlengine = create_engine(testbase.db_uri, strategy='threadlocal', echo=True)
metadata = MetaData()
users = Table('query_users', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
)
users.create(tlengine)
def tearDown(self):
tlengine.execute(users.delete())
def tearDownAll(self):
users.drop(tlengine)
tlengine.dispose()
@testbase.unsupported('mysql')
def testrollback(self):
"""test a basic rollback"""
tlengine.begin()
tlengine.execute(users.insert(), user_id=1, user_name='user1')
tlengine.execute(users.insert(), user_id=2, user_name='user2')
tlengine.execute(users.insert(), user_id=3, user_name='user3')
tlengine.rollback()
external_connection = tlengine.connect()
result = external_connection.execute("select * from query_users")
try:
assert len(result.fetchall()) == 0
finally:
external_connection.close()
@testbase.unsupported('mysql')
def testcommit(self):
"""test a basic commit"""
tlengine.begin()
tlengine.execute(users.insert(), user_id=1, user_name='user1')
tlengine.execute(users.insert(), user_id=2, user_name='user2')
tlengine.execute(users.insert(), user_id=3, user_name='user3')
tlengine.commit()
external_connection = tlengine.connect()
result = external_connection.execute("select * from query_users")
try:
assert len(result.fetchall()) == 3
finally:
external_connection.close()
@testbase.unsupported('mysql')
def testrollback_off_conn(self):
# test that a TLTransaction opened off a TLConnection allows that
# TLConnection to be aware of the transactional context
conn = tlengine.contextual_connect()
trans = conn.begin()
conn.execute(users.insert(), user_id=1, user_name='user1')
conn.execute(users.insert(), user_id=2, user_name='user2')
conn.execute(users.insert(), user_id=3, user_name='user3')
trans.rollback()
external_connection = tlengine.connect()
result = external_connection.execute("select * from query_users")
try:
assert len(result.fetchall()) == 0
finally:
external_connection.close()
@testbase.unsupported('mysql')
def testmorerollback_off_conn(self):
# test that an existing TLConnection automatically takes place in a TLTransaction
# opened on a second TLConnection
conn = tlengine.contextual_connect()
conn2 = tlengine.contextual_connect()
trans = conn2.begin()
conn.execute(users.insert(), user_id=1, user_name='user1')
conn.execute(users.insert(), user_id=2, user_name='user2')
conn.execute(users.insert(), user_id=3, user_name='user3')
trans.rollback()
external_connection = tlengine.connect()
result = external_connection.execute("select * from query_users")
try:
assert len(result.fetchall()) == 0
finally:
external_connection.close()
@testbase.unsupported('mysql')
def testcommit_off_conn(self):
conn = tlengine.contextual_connect()
trans = conn.begin()
conn.execute(users.insert(), user_id=1, user_name='user1')
conn.execute(users.insert(), user_id=2, user_name='user2')
conn.execute(users.insert(), user_id=3, user_name='user3')
trans.commit()
external_connection = tlengine.connect()
result = external_connection.execute("select * from query_users")
try:
assert len(result.fetchall()) == 3
finally:
external_connection.close()
@testbase.unsupported('mysql', 'sqlite')
def testnesting(self):
"""tests nesting of tranacstions"""
external_connection = tlengine.connect()
self.assert_(external_connection.connection is not tlengine.contextual_connect().connection)
tlengine.begin()
tlengine.execute(users.insert(), user_id=1, user_name='user1')
tlengine.execute(users.insert(), user_id=2, user_name='user2')
tlengine.execute(users.insert(), user_id=3, user_name='user3')
tlengine.begin()
tlengine.execute(users.insert(), user_id=4, user_name='user4')
tlengine.execute(users.insert(), user_id=5, user_name='user5')
tlengine.commit()
tlengine.rollback()
try:
self.assert_(external_connection.scalar("select count(1) from query_users") == 0)
finally:
external_connection.close()
@testbase.unsupported('mysql')
def testmixednesting(self):
"""tests nesting of transactions off the TLEngine directly inside of
tranasctions off the connection from the TLEngine"""
external_connection = tlengine.connect()
self.assert_(external_connection.connection is not tlengine.contextual_connect().connection)
conn = tlengine.contextual_connect()
trans = conn.begin()
trans2 = conn.begin()
tlengine.execute(users.insert(), user_id=1, user_name='user1')
tlengine.execute(users.insert(), user_id=2, user_name='user2')
tlengine.execute(users.insert(), user_id=3, user_name='user3')
tlengine.begin()
tlengine.execute(users.insert(), user_id=4, user_name='user4')
tlengine.begin()
tlengine.execute(users.insert(), user_id=5, user_name='user5')
tlengine.execute(users.insert(), user_id=6, user_name='user6')
tlengine.execute(users.insert(), user_id=7, user_name='user7')
tlengine.commit()
tlengine.execute(users.insert(), user_id=8, user_name='user8')
tlengine.commit()
trans2.commit()
trans.rollback()
conn.close()
try:
self.assert_(external_connection.scalar("select count(1) from query_users") == 0)
finally:
external_connection.close()
@testbase.unsupported('mysql')
def testmoremixednesting(self):
"""tests nesting of transactions off the connection from the TLEngine
inside of tranasctions off thbe TLEngine directly."""
external_connection = tlengine.connect()
self.assert_(external_connection.connection is not tlengine.contextual_connect().connection)
tlengine.begin()
connection = tlengine.contextual_connect()
connection.execute(users.insert(), user_id=1, user_name='user1')
tlengine.begin()
connection.execute(users.insert(), user_id=2, user_name='user2')
connection.execute(users.insert(), user_id=3, user_name='user3')
trans = connection.begin()
connection.execute(users.insert(), user_id=4, user_name='user4')
connection.execute(users.insert(), user_id=5, user_name='user5')
trans.commit()
tlengine.commit()
tlengine.rollback()
connection.close()
try:
self.assert_(external_connection.scalar("select count(1) from query_users") == 0)
finally:
external_connection.close()
def testsessionnesting(self):
class User(object):
pass
try:
mapper(User, users)
sess = create_session(bind_to=tlengine)
tlengine.begin()
u = User()
sess.save(u)
sess.flush()
tlengine.commit()
finally:
clear_mappers()
def testconnections(self):
"""tests that contextual_connect is threadlocal"""
c1 = tlengine.contextual_connect()
c2 = tlengine.contextual_connect()
assert c1.connection is c2.connection
c2.close()
assert c1.connection.connection is not None
if __name__ == "__main__":
testbase.main()
|