diff options
| author | Ants Aasma <ants.aasma@gmail.com> | 2008-01-20 03:22:00 +0000 |
|---|---|---|
| committer | Ants Aasma <ants.aasma@gmail.com> | 2008-01-20 03:22:00 +0000 |
| commit | 9f366afdda4b508eb4ef3e626da2fec98ad04773 (patch) | |
| tree | 62aeb76e50e813fde97134e830b2a0de39e06dc4 /lib | |
| parent | 4be99db15b7a62b37493c86da07bcc787f44a7df (diff) | |
| download | sqlalchemy-9f366afdda4b508eb4ef3e626da2fec98ad04773.tar.gz | |
- parent transactions weren't started on the connection when adding a connection to a nested session transaction.
- session.transaction now always refers to the innermost active transaction, even when commit/rollback are called directly on the session transaction object.
- when preparing a two-phase transaction fails on one connection all the connections are rolled back.
- two phase transactions can now be prepared.
- session.close() didn't close all transactions when nested transactions were used.
- rollback() previously erroneously set the current transaction directly to the parent of the transaction that could be rolled back to.
- autoflush for commit() wasn't flushing for simple subtransactions.
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/sqlalchemy/ext/activemapper.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/session.py | 217 |
2 files changed, 145 insertions, 74 deletions
diff --git a/lib/sqlalchemy/ext/activemapper.py b/lib/sqlalchemy/ext/activemapper.py index b28ada0af..02f4b5b35 100644 --- a/lib/sqlalchemy/ext/activemapper.py +++ b/lib/sqlalchemy/ext/activemapper.py @@ -13,7 +13,7 @@ import sys # metadata = ThreadLocalMetaData() Objectstore = scoped_session -objectstore = scoped_session(sessionmaker(autoflush=True)) +objectstore = scoped_session(sessionmaker(autoflush=True, transactional=False)) # # declarative column declaration - this is so that we can infer the colname diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index e2bc71b92..f32ec25ec 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -144,110 +144,164 @@ class SessionTransaction(object): def __init__(self, session, parent=None, autoflush=True, nested=False): self.session = session - self.__connections = {} - self.__parent = parent + self._connections = {} + self._parent = parent self.autoflush = autoflush self.nested = nested + self._active = True + self._prepared = False + + is_active = property(lambda s: s.session is not None and s._active) + + def _assert_is_active(self): + self._assert_is_open() + if not self._active: + raise exceptions.InvalidRequestError("The transaction is inactive due to a rollback in a subtransaction and should be closed") + + def _assert_is_open(self): + if self.session is None: + raise exceptions.InvalidRequestError("The transaction is closed") def connection(self, bindkey, **kwargs): + self._assert_is_active() engine = self.session.get_bind(bindkey, **kwargs) return self.get_or_add(engine) def _begin(self, **kwargs): + self._assert_is_active() return SessionTransaction(self.session, self, **kwargs) + def _iterate_parents(self, upto=None): + if self._parent is upto: + return (self,) + else: + if self._parent is None: + raise exceptions.InvalidRequestError("Transaction %s is not on the active transaction list" % upto) + return (self,) + self._parent._iterate_parents(upto) + def add(self, bind): - if self.__parent is not None: - return self.__parent.add(bind) + self._assert_is_active() + if self._parent is not None and not self.nested: + return self._parent.add(bind) - if bind.engine in self.__connections: + if bind.engine in self._connections: raise exceptions.InvalidRequestError("Session already has a Connection associated for the given %sEngine" % (isinstance(bind, engine.Connection) and "Connection's " or "")) return self.get_or_add(bind) - def _connection_dict(self): - if self.__parent is not None and not self.nested: - return self.__parent._connection_dict() - else: - return self.__connections - def get_or_add(self, bind): - if self.__parent is not None: + self._assert_is_active() + + if bind in self._connections: + return self._connections[bind][0] + + if self._parent is not None: + conn = self._parent.get_or_add(bind) if not self.nested: - return self.__parent.get_or_add(bind) - - if bind in self.__connections: - return self.__connections[bind][0] - - conn_dict = self.__parent._connection_dict() - if bind in conn_dict: - (conn, trans, autoclose) = conn_dict[bind] - self.__connections[conn] = self.__connections[bind.engine] = (conn, conn.begin_nested(), autoclose) return conn - elif bind in self.__connections: - return self.__connections[bind][0] - - if not isinstance(bind, engine.Connection): - e = bind - c = bind.contextual_connect() - else: - e = bind.engine - c = bind - if e in self.__connections: - raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine") - if self.nested: - trans = c.begin_nested() - elif self.session.twophase: - trans = c.begin_twophase() else: - trans = c.begin() - self.__connections[c] = self.__connections[e] = (c, trans, c is not bind) - return self.__connections[c][0] + if isinstance(bind, engine.Connection): + conn = bind + if conn.engine in self._connections: + raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine") + else: + conn = bind.contextual_connect() - def commit(self): - if self.__parent is not None and not self.nested: - return self.__parent + if self.session.twophase and self._parent is None: + transaction = conn.begin_twophase() + elif self.nested: + transaction = conn.begin_nested() + else: + transaction = conn.begin() + + self._connections[conn] = self._connections[conn.engine] = (conn, transaction, conn is not bind) + return conn - if self.session.extension is not None: + def prepare(self): + if self._parent is not None or not self.session.twophase: + raise exceptions.InvalidRequestError("Only root two phase transactions of can be prepared") + self._prepare_impl() + + def _prepare_impl(self): + self._assert_is_active() + if self.session.extension is not None and (self._parent is None or self.nested): self.session.extension.before_commit(self.session) - + + if self.session.transaction is not self: + for subtransaction in self.session.transaction._iterate_parents(upto=self): + subtransaction.commit() + if self.autoflush: self.session.flush() + + if self._parent is None and self.session.twophase: + try: + for t in util.Set(self._connections.values()): + t[1].prepare() + except: + for t in util.Set(self._connections.values()): + try: + t[1].rollback() + except: + pass + raise + + self._deactivate() + self._prepared = True + + def commit(self): + self._assert_is_open() + if not self._prepared: + self._prepare_impl() + + if self._parent is None or self.nested: + for t in util.Set(self._connections.values()): + t[1].commit() - if self.session.twophase: - for t in util.Set(self.__connections.values()): - t[1].prepare() - - for t in util.Set(self.__connections.values()): - t[1].commit() - - if self.session.extension is not None: - self.session.extension.after_commit(self.session) + if self.session.extension is not None: + self.session.extension.after_commit(self.session) self.close() - return self.__parent + return self._parent def rollback(self): - if self.__parent is not None and not self.nested: - return self.__parent.rollback() - - for t in util.Set(self.__connections.values()): + self._assert_is_open() + + if self.session.transaction is not self: + for subtransaction in self.session.transaction._iterate_parents(upto=self): + subtransaction.close() + + if self.is_active: + for transaction in self._iterate_parents(): + if transaction._parent is None or transaction.nested: + transaction._rollback_impl() + transaction._deactivate() + break + else: + transaction._deactivate() + self.close() + return self._parent + + def _rollback_impl(self): + for t in util.Set(self._connections.values()): t[1].rollback() if self.session.extension is not None: self.session.extension.after_rollback(self.session) - self.close() - return self.__parent + def _deactivate(self): + self._active = False def close(self): - if self.__parent is not None: - return - for t in util.Set(self.__connections.values()): - if t[2]: - t[0].close() - else: - t[1].close() - self.session.transaction = None + self.session.transaction = self._parent + if self._parent is None: + for connection, transaction, autoclose in util.Set(self._connections.values()): + if autoclose: + connection.close() + else: + transaction.close() + self._deactivate() + self.session = None + self._connections = None def __enter__(self): return self @@ -458,7 +512,7 @@ class Session(object): if self.transaction is None: pass else: - self.transaction = self.transaction.rollback() + self.transaction.rollback() # TODO: we can rollback attribute values. however # we would want to expand attributes.py to be able to save *two* rollback points, one to the # last flush() and the other to when the object first entered the transaction. @@ -484,13 +538,29 @@ class Session(object): if self.transaction is None: if self.transactional: self.begin() - self.transaction = self.transaction.commit() else: raise exceptions.InvalidRequestError("No transaction is begun.") - else: - self.transaction = self.transaction.commit() + + self.transaction.commit() if self.transaction is None and self.transactional: self.begin() + + def prepare(self): + """Prepare the current transaction in progress for two phase commit. + + If no transaction is in progress, this method raises + an InvalidRequestError. + + Only root transactions of two phase sessions can be prepared. If the current transaction is + not such, an InvalidRequestError is raised. + """ + if self.transaction is None: + if self.transactional: + self.begin() + else: + raise exceptions.InvalidRequestError("No transaction is begun.") + + self.transaction.prepare() def connection(self, mapper=None, **kwargs): """Return a ``Connection`` corresponding to this session's @@ -554,7 +624,8 @@ class Session(object): self.clear() if self.transaction is not None: - self.transaction.close() + for transaction in self.transaction._iterate_parents(): + transaction.close() if self.transactional: # note this doesnt use any connection resources self.begin() |
