1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
|
# objectstore.py
# Copyright (C) 2005,2006 Michael Bayer mike_mp@zzzcomputing.com
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from sqlalchemy import util, exceptions, sql
import unitofwork, query
import weakref
import sqlalchemy
class SessionTransaction(object):
    """tracks the Connections, and the transactions begun on them, on behalf of a Session.

    a SessionTransaction constructed without a parent owns its connections: it
    is the only one whose commit() and close() have an effect.  one constructed
    with a parent (see _begin()) obtains its connections from the parent, its
    commit() and close() are no-ops, and its rollback() is forwarded to the
    parent."""

    def __init__(self, session, parent=None, autoflush=True):
        """constructs a new SessionTransaction.

        session - the Session this transaction belongs to.
        parent - optional enclosing SessionTransaction.
        autoflush - if True, commit() calls session.flush() before committing."""
        self.session = session
        # maps engine -> (connection, transaction begun on that connection)
        self.connections = {}
        self.parent = parent
        self.autoflush = autoflush

    def connection(self, mapper_or_class, entity_name=None):
        """returns the Connection, within this transaction, for the given mapper or class.

        a class is first resolved to its mapper via class_mapper(); the lookup is
        then delegated to the parent transaction if there is one, otherwise the
        Session's bind for the mapper is located and added to this transaction."""
        if isinstance(mapper_or_class, type):
            mapper_or_class = class_mapper(mapper_or_class, entity_name=entity_name)
        if self.parent is not None:
            return self.parent.connection(mapper_or_class)
        engine = self.session.get_bind(mapper_or_class)
        return self.get_or_add(engine)

    def _begin(self):
        """returns a new SessionTransaction for the same Session which has this one as its parent."""
        return SessionTransaction(self.session, self)

    def add(self, connectable):
        """adds the given connectable to this transaction and returns its Connection.

        raises InvalidRequestError if a Connection is already present for the
        connectable's engine."""
        if connectable.engine in self.connections:
            raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
        return self.get_or_add(connectable)

    def get_or_add(self, connectable):
        """returns the Connection stored for the connectable's engine, opening one and
        beginning a transaction on it if none is stored yet."""
        # we reference the 'engine' attribute on the given object, which in the case of
        # Connection, ProxyEngine, Engine, ComposedSQLEngine, whatever, should return the original
        # "Engine" object that is handling the connection.
        engine = connectable.engine
        if engine in self.connections:
            return self.connections[engine][0]
        conn = connectable.contextual_connect()
        self.connections[engine] = (conn, conn.begin())
        return conn

    def commit(self):
        """commits every transaction held by this SessionTransaction, then closes it.

        does nothing when this transaction has a parent.  if autoflush is set, the
        Session is flushed first."""
        if self.parent is not None:
            return
        if self.autoflush:
            self.session.flush()
        for conn, trans in self.connections.values():
            trans.commit()
        self.close()

    def rollback(self):
        """rolls back every transaction held and closes; when this transaction has a
        parent, the rollback is forwarded to the parent instead."""
        if self.parent is not None:
            self.parent.rollback()
            return
        for conn, trans in self.connections.values():
            trans.rollback()
        self.close()

    def close(self):
        """closes every Connection held and clears the Session's 'transaction' attribute.

        does nothing when this transaction has a parent."""
        if self.parent is not None:
            return
        for conn, trans in self.connections.values():
            conn.close()
        self.session.transaction = None
class Session(object):
    """encapsulates a set of objects being operated upon within an object-relational operation."""

    def __init__(self, bind_to=None, hash_key=None, import_session=None, echo_uow=False):
        """constructs a new Session and registers it in the module-level _sessions registry.

        bind_to - optional Engine or Connection returned by get_bind() when nothing more
        specific was bound via bind_mapper() or bind_table().
        hash_key - optional key under which this Session is registered; defaults to id(self).
        import_session - optional Session whose identity map is handed to this Session's
        new UnitOfWork.
        echo_uow - passed as the 'echo' argument to UnitOfWork.flush()."""
        if import_session is not None:
            self.uow = unitofwork.UnitOfWork(identity_map=import_session.uow.identity_map)
        else:
            self.uow = unitofwork.UnitOfWork()
        self.bind_to = bind_to
        # maps Mapper or Table -> bind; filled in by bind_mapper() / bind_table()
        self.binds = {}
        self.echo_uow = echo_uow
        self.transaction = None
        if hash_key is None:
            self.hash_key = id(self)
        else:
            self.hash_key = hash_key
        _sessions[self.hash_key] = self

    def create_transaction(self, **kwargs):
        """returns a new SessionTransaction corresponding to an existing or new transaction.

        if the transaction is new, the returned SessionTransaction will have commit control
        over the underlying transaction, else will have rollback control only.  the given
        **kwargs are passed to the SessionTransaction constructor only when a new
        transaction is created."""
        if self.transaction is not None:
            return self.transaction._begin()
        else:
            self.transaction = SessionTransaction(self, **kwargs)
            return self.transaction

    def connect(self, mapper=None, **kwargs):
        """returns a unique connection corresponding to the given mapper.  this connection
        will not be part of any pre-existing transactional context."""
        return self.get_bind(mapper).connect(**kwargs)

    def connection(self, mapper, **kwargs):
        """returns a Connection corresponding to the given mapper.  used by the execute()
        method which performs select operations for Mapper and Query.

        if this Session is transactional, the connection will be in the context of this
        session's transaction.  otherwise, the connection is returned by the
        contextual_connect method, which some Engines override to return a thread-local
        connection, and will have close_with_result set to True.

        the given **kwargs will be sent to the engine's contextual_connect() method, if no
        transaction is in progress."""
        if self.transaction is not None:
            return self.transaction.connection(mapper)
        else:
            return self.get_bind(mapper).contextual_connect(**kwargs)

    def execute(self, mapper, clause, params, **kwargs):
        """using the given mapper to identify the appropriate Engine or Connection to be used
        for statement execution, executes the given ClauseElement using the provided parameter
        dictionary.  Returns a ResultProxy corresponding to the execution's results.  If this
        method allocates a new Connection for the operation, then the ResultProxy's close()
        method will release the resources of the underlying Connection, otherwise it's a no-op.
        """
        return self.connection(mapper, close_with_result=True).execute(clause, params, **kwargs)

    def scalar(self, mapper, clause, params, **kwargs):
        """works like execute() but returns a scalar result."""
        return self.connection(mapper, close_with_result=True).scalar(clause, params, **kwargs)

    def close(self):
        """closes this Session: clears all instances from it, then closes the current
        SessionTransaction if there is one."""
        self.clear()
        if self.transaction is not None:
            self.transaction.close()

    def clear(self):
        """removes all object instances from this Session.  this is equivalent to calling
        expunge() for all objects in this Session."""
        for instance in self:
            self._unattach(instance)
        # start over with an empty unit of work
        self.uow = unitofwork.UnitOfWork()

    def mapper(self, class_, entity_name=None):
        """given a Class, returns the primary Mapper responsible for persisting it"""
        return class_mapper(class_, entity_name = entity_name)

    def bind_mapper(self, mapper, bindto):
        """binds the given Mapper to the given Engine or Connection.  All subsequent
        operations involving this Mapper will use the given bindto."""
        self.binds[mapper] = bindto

    def bind_table(self, table, bindto):
        """binds the given Table to the given Engine or Connection.  All subsequent
        operations involving this Table will use the given bindto."""
        self.binds[table] = bindto

    def get_bind(self, mapper):
        """given a Mapper, returns the Engine or Connection which is used to execute
        statements on behalf of this Mapper.  Calling connect() on the return result will
        always result in a Connection object.  This method disregards any SessionTransaction
        that may be in progress.

        The order of searching is as follows:

        if no Mapper is given (None), returns whatever was bound to this Session, which
        may be None.

        if an Engine or Connection was bound to this Mapper specifically within this
        Session, returns that Engine or Connection.

        if an Engine or Connection was bound to this Mapper's underlying Table within this
        Session (i.e. not to the Table directly), returns that Engine or Connection.

        if an Engine or Connection was bound to this Session, returns that Engine or
        Connection.

        finally, returns the Engine which was bound directly to the Table's MetaData object.

        If no Engine is bound to the Table, an exception is raised.
        """
        if mapper is None:
            return self.bind_to
        elif mapper in self.binds:
            return self.binds[mapper]
        elif mapper.mapped_table in self.binds:
            return self.binds[mapper.mapped_table]
        elif self.bind_to is not None:
            return self.bind_to
        else:
            e = mapper.mapped_table.engine
            if e is None:
                raise exceptions.InvalidRequestError("Could not locate any Engine bound to mapper '%s'" % str(mapper))
            return e

    def query(self, mapper_or_class, entity_name=None):
        """given a mapper or Class, returns a new Query object corresponding to this Session
        and the mapper, or the class's primary mapper."""
        if isinstance(mapper_or_class, type):
            return query.Query(class_mapper(mapper_or_class, entity_name=entity_name), self)
        else:
            return query.Query(mapper_or_class, self)

    def _sql(self):
        # NOTE(review): this accessor looks unfinished.  SQLProxy is defined but never
        # instantiated or returned, __getattr__ never returns 'call', and 'call' indexes
        # kwargs with an undefined name 'engine'.  as written, the 'sql' property always
        # evaluates to None.  left unchanged because the intended behavior cannot be
        # determined from this file -- confirm and either finish or remove it.
        class SQLProxy(object):
            def __getattr__(self, key):
                def call(*args, **kwargs):
                    kwargs[engine] = self.engine
                    return getattr(sql, key)(*args, **kwargs)
    sql = property(_sql)

    def get_id_key(ident, class_, entity_name=None):
        """returns an identity-map key for use in storing/retrieving an item from the identity
        map, given a tuple of the object's primary key values.

        ident - a tuple of primary key values corresponding to the object to be stored.  these
        values should be in the same order as the primary keys of the table

        class_ - a reference to the object's class

        entity_name - optional string name to further qualify the class
        """
        return (class_, tuple(ident), entity_name)
    get_id_key = staticmethod(get_id_key)

    def get_row_key(row, class_, primary_key, entity_name=None):
        """returns an identity-map key for use in storing/retrieving an item from the identity
        map, given a result set row.

        row - a sqlalchemy.dbengine.RowProxy instance or other map corresponding result-set
        column names to their values within a row.

        class_ - a reference to the object's class

        primary_key - a list of column objects that will target the primary key values
        in the given row.

        entity_name - optional string name to further qualify the class
        """
        return (class_, tuple([row[column] for column in primary_key]), entity_name)
    get_row_key = staticmethod(get_row_key)

    def begin(self, *obj):
        """deprecated"""
        raise exceptions.InvalidRequestError("Session.begin() is deprecated. use install_mod('legacy_session') to enable the old behavior")

    def commit(self, *obj):
        """deprecated"""
        raise exceptions.InvalidRequestError("Session.commit() is deprecated. use install_mod('legacy_session') to enable the old behavior")

    def flush(self, objects=None):
        """flushes all the object modifications present in this session to the database.
        'objects' is a list or tuple of objects specifically to be flushed."""
        self.uow.flush(self, objects, echo=self.echo_uow)

    def get(self, class_, ident, **kwargs):
        """returns an instance of the object based on the given identifier, or None
        if not found.  The ident argument is a scalar or tuple of primary key column values
        in the order of the table def's primary key columns.

        the entity_name keyword argument may also be specified which further qualifies the
        underlying Mapper used to perform the query."""
        entity_name = kwargs.get('entity_name', None)
        return self.query(class_, entity_name=entity_name).get(ident)

    def load(self, class_, ident, **kwargs):
        """returns an instance of the object based on the given identifier.  If not found,
        raises an exception.  The method will *remove all pending changes* to the object
        already existing in the Session.  The ident argument is a scalar or tuple of
        primary key columns in the order of the table def's primary key columns.

        the entity_name keyword argument may also be specified which further qualifies the
        underlying Mapper used to perform the query."""
        entity_name = kwargs.get('entity_name', None)
        return self.query(class_, entity_name=entity_name).load(ident)

    def refresh(self, object):
        """reloads the attributes for the given object from the database, clears
        any changes made."""
        self.uow.refresh(self, object)

    def expire(self, object):
        """invalidates the data in the given object and sets them to refresh themselves
        the next time they are requested."""
        self.uow.expire(self, object)

    def expunge(self, object):
        """removes the given object from this Session.  this will free all internal
        references to the object."""
        self.uow.expunge(object)
        self._unattach(object)

    def save(self, object, entity_name=None):
        """
        Adds a transient (unsaved) instance to this Session.  This operation cascades the
        "save_or_update" method to associated instances if the relation is mapped with
        cascade="save-update".

        The 'entity_name' keyword argument will further qualify the specific Mapper used to
        handle this instance.
        """
        self._save_impl(object, entity_name=entity_name)
        object_mapper(object).cascade_callable('save-update', object, lambda c, e:self._save_or_update_impl(c, e))

    def update(self, object, entity_name=None):
        """Brings the given detached (saved) instance into this Session.

        If there is a persistent instance with the same identifier (i.e. a saved instance
        already associated with this Session), an exception is thrown.

        This operation cascades the "save_or_update" method to associated instances if the
        relation is mapped with cascade="save-update"."""
        self._update_impl(object, entity_name=entity_name)
        object_mapper(object).cascade_callable('save-update', object, lambda c, e:self._save_or_update_impl(c, e))

    def save_or_update(self, object, entity_name=None):
        """saves the given instance if it has no '_instance_key' attribute, otherwise updates
        it; then cascades the same operation along "save-update" relations."""
        self._save_or_update_impl(object, entity_name=entity_name)
        object_mapper(object).cascade_callable('save-update', object, lambda c, e:self._save_or_update_impl(c, e))

    def _save_or_update_impl(self, object, entity_name=None):
        """dispatches to _save_impl() or _update_impl() depending on whether the instance
        carries an '_instance_key'."""
        key = getattr(object, '_instance_key', None)
        if key is None:
            self._save_impl(object, entity_name=entity_name)
        else:
            self._update_impl(object, entity_name=entity_name)

    def delete(self, object, entity_name=None):
        """registers the given instance, and every instance produced by its mapper's
        'delete' cascade, as deleted with the unit of work.  'entity_name' is accepted
        but not used."""
        #self.uow.register_deleted(object)
        for c in [object] + list(object_mapper(object).cascade_iterator('merge', object) if False else object_mapper(object).cascade_iterator('delete', object)):
            self.uow.register_deleted(c)

    def merge(self, object, entity_name=None):
        """locates, for the given instance and each instance along its 'merge' cascade, the
        corresponding instance in this Session (loading it via get() if it is not in the
        identity map), and returns the one corresponding to the given instance.

        the state of the given instance is not yet copied onto the located one (see the
        TODO below).  raises InvalidRequestError if an instance without an '_instance_key'
        has a None among its identity values.  'entity_name' is accepted but not used."""
        instance = None
        for obj in [object] + list(object_mapper(object).cascade_iterator('merge', object)):
            key = getattr(obj, '_instance_key', None)
            if key is None:
                # derive the key from the instance currently being visited, not from
                # the root instance passed to merge()
                mapper = object_mapper(obj)
                ident = mapper.identity(obj)
                for k in ident:
                    if k is None:
                        raise exceptions.InvalidRequestError("Instance '%s' does not have a full set of identity values, and does not represent a saved entity in the database. Use the add() method to add unsaved instances to this Session." % repr(obj))
                key = mapper.identity_key(ident)
            u = self.uow
            if key in u.identity_map:
                # TODO: copy the state of the given object into this one.  tricky !
                inst = u.identity_map[key]
            else:
                # key[1] is passed whole: get() takes a single 'ident' argument, so
                # unpacking it would break for composite primary keys.
                # NOTE(review): assumes key[1] holds the identity values, as in the
                # keys built by get_id_key() -- confirm against Mapper.identity_key().
                inst = self.get(obj.__class__, key[1])
            if obj is object:
                instance = inst
        return instance

    def _save_impl(self, object, **kwargs):
        """registers an instance without an '_instance_key' as new, after assigning its
        entity name via its mapper.  an instance that has an '_instance_key' must already
        be present in this Session's unit of work, else InvalidRequestError is raised."""
        if hasattr(object, '_instance_key'):
            if not self.uow.has_key(object._instance_key):
                raise exceptions.InvalidRequestError("Instance '%s' is a detached instance or is already persistent in a different Session" % repr(object))
        else:
            m = class_mapper(object.__class__, entity_name=kwargs.get('entity_name', None))
            m._assign_entity_name(object)
            self._register_new(object)

    def _update_impl(self, object, **kwargs):
        """registers an instance that has an '_instance_key' as dirty or clean, depending on
        whether the attribute manager reports it as modified.  does nothing if the instance
        is already attached to this Session and not marked deleted; raises
        InvalidRequestError if the instance has no '_instance_key'."""
        if self._is_attached(object) and object not in self.deleted:
            return
        if not hasattr(object, '_instance_key'):
            raise exceptions.InvalidRequestError("Instance '%s' is not persisted" % repr(object))
        if attribute_manager.is_modified(object):
            self._register_dirty(object)
        else:
            self._register_clean(object)

    def _register_changed(self, obj):
        """registers the instance as dirty if it has an '_instance_key', else as new."""
        if hasattr(obj, '_instance_key'):
            self._register_dirty(obj)
        else:
            self._register_new(obj)

    def _register_new(self, obj):
        """attaches the instance to this Session and registers it as new."""
        self._attach(obj)
        self.uow.register_new(obj)

    def _register_dirty(self, obj):
        """attaches the instance to this Session and registers it as dirty."""
        self._attach(obj)
        self.uow.register_dirty(obj)

    def _register_clean(self, obj):
        """attaches the instance to this Session and registers it as clean."""
        self._attach(obj)
        self.uow.register_clean(obj)

    def _register_deleted(self, obj):
        """attaches the instance to this Session and registers it as deleted."""
        self._attach(obj)
        self.uow.register_deleted(obj)

    def _attach(self, obj):
        """Attach the given object to this Session.

        raises InvalidRequestError if the object carries the hash key of a different
        Session that is still present in the _sessions registry."""
        if getattr(obj, '_sa_session_id', None) != self.hash_key:
            old = getattr(obj, '_sa_session_id', None)
            if old is not None and old in _sessions:
                raise exceptions.InvalidRequestError("Object '%s' is already attached to session '%s' (this is '%s')" % (repr(obj), old, id(self)))
                # auto-removal from the old session is disabled.  if it is ever turned
                # back on, look the old session up gingerly (try/except KeyError), since
                # _sessions is a WeakValueDictionary and might be affected by other threads.
            key = getattr(obj, '_instance_key', None)
            if key is not None:
                self.identity_map[key] = obj
            obj._sa_session_id = self.hash_key

    def _unattach(self, obj):
        """removes this Session's marker from the given object; raises InvalidRequestError
        if the object is not attached to this Session."""
        if not self._is_attached(obj): #getattr(obj, '_sa_session_id', None) != self.hash_key:
            raise exceptions.InvalidRequestError("Object '%s' is not attached to this Session" % repr(obj))
        del obj._sa_session_id

    def _is_attached(self, obj):
        """returns True if the object's '_sa_session_id' equals this Session's hash key."""
        return getattr(obj, '_sa_session_id', None) == self.hash_key

    def __contains__(self, obj):
        """returns True if the object is attached to this Session and is either new or
        present in the unit of work under its '_instance_key'."""
        return self._is_attached(obj) and (obj in self.uow.new or self.uow.has_key(obj._instance_key))

    def __iter__(self):
        """iterates over a snapshot of the new instances followed by the instances in the
        identity map."""
        # values() is wrapped in list() so the concatenation also works where
        # values() returns a view rather than a list
        return iter(list(self.uow.new) + list(self.uow.identity_map.values()))

    def _get(self, key):
        """delegates to the unit of work's _get() for the given key."""
        return self.uow._get(key)

    def has_key(self, key):
        """delegates to the unit of work's has_key() for the given key."""
        return self.uow.has_key(key)

    def is_expired(self, instance, **kwargs):
        """delegates to the unit of work's is_expired() for the given instance."""
        return self.uow.is_expired(instance, **kwargs)

    dirty = property(lambda s:s.uow.dirty, doc="a Set of all objects marked as 'dirty' within this Session")
    deleted = property(lambda s:s.uow.deleted, doc="a Set of all objects marked as 'deleted' within this Session")
    new = property(lambda s:s.uow.new, doc="a Set of all objects marked as 'new' within this Session.")
    identity_map = property(lambda s:s.uow.identity_map, doc="a WeakValueDictionary consisting of all objects within this Session keyed to their _instance_key value.")

    def import_instance(self, *args, **kwargs):
        """deprecated; a synonym for merge()"""
        return self.merge(*args, **kwargs)
def get_id_key(ident, class_, entity_name=None):
    """module-level shortcut which returns the result of Session.get_id_key() for
    the same arguments."""
    make_key = Session.get_id_key
    return make_key(ident, class_, entity_name)
def get_row_key(row, class_, primary_key, entity_name=None):
    """module-level shortcut which returns the result of Session.get_row_key() for
    the same arguments."""
    make_key = Session.get_row_key
    return make_key(row, class_, primary_key, entity_name)
def object_mapper(obj):
    """returns the result of sqlalchemy.orm.object_mapper() for the given instance.

    the function is looked up on the sqlalchemy package each time this is called."""
    orm = sqlalchemy.orm
    return orm.object_mapper(obj)
def class_mapper(class_, **kwargs):
    """returns the result of sqlalchemy.orm.class_mapper() for the given class, passing
    along any keyword arguments.

    the function is looked up on the sqlalchemy package each time this is called."""
    orm = sqlalchemy.orm
    return orm.class_mapper(class_, **kwargs)
# this is the AttributeManager instance used to provide attribute behavior on objects.
# it is the same object as unitofwork.attribute_manager, re-exported under this module's name.
# to all the "global variable police" out there: it's a stateless object.
attribute_manager = unitofwork.attribute_manager
# this dictionary maps the hash key of a Session to the Session itself, and
# acts as a Registry with which to locate Sessions.  this is to enable
# object instances to be associated with Sessions without having to attach the
# actual Session object directly to the object instance.
# the values are held weakly, so an entry does not by itself keep its Session alive.
_sessions = weakref.WeakValueDictionary()
def object_session(obj):
    """returns the Session registered under the given instance's '_sa_session_id', or
    None if the instance has no such id or no Session is registered for it."""
    session_id = getattr(obj, '_sa_session_id', None)
    if session_id is None:
        return None
    return _sessions.get(session_id)
# install object_session() as an attribute of the unitofwork module.
# NOTE(review): presumably so that unitofwork code can locate an instance's Session
# without importing this module -- confirm against unitofwork.py.
unitofwork.object_session = object_session
def get_session(obj=None):
    """deprecated.

    given an instance, returns the result of object_session() for it; called without
    one, raises InvalidRequestError."""
    if obj is None:
        raise exceptions.InvalidRequestError("get_session() is deprecated, and does not return the thread-local session anymore. Use the SessionContext.mapper_extension or import sqlalchemy.mod.threadlocal to establish a default thread-local context.")
    return object_session(obj)
|