summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnts Aasma <ants.aasma@gmail.com>2008-01-20 03:22:00 +0000
committerAnts Aasma <ants.aasma@gmail.com>2008-01-20 03:22:00 +0000
commit9f366afdda4b508eb4ef3e626da2fec98ad04773 (patch)
tree62aeb76e50e813fde97134e830b2a0de39e06dc4
parent4be99db15b7a62b37493c86da07bcc787f44a7df (diff)
downloadsqlalchemy-9f366afdda4b508eb4ef3e626da2fec98ad04773.tar.gz
- parent transactions weren't started on the connection when adding a connection to a nested session transaction.
- session.transaction now always refers to the innermost active transaction, even when commit/rollback are called directly on the session transaction object. - when preparing a two-phase transaction fails on one connection all the connections are rolled back. - two phase transactions can now be prepared. - session.close() didn't close all transactions when nested transactions were used. - rollback() previously erroneously set the current transaction directly to the parent of the transaction that could be rolled back to. - autoflush for commit() wasn't flushing for simple subtransactions.
-rw-r--r--CHANGES28
-rw-r--r--doc/build/content/session.txt6
-rw-r--r--lib/sqlalchemy/ext/activemapper.py2
-rw-r--r--lib/sqlalchemy/orm/session.py217
-rw-r--r--test/orm/session.py118
5 files changed, 294 insertions, 77 deletions
diff --git a/CHANGES b/CHANGES
index 5d2d83375..3b7a65d62 100644
--- a/CHANGES
+++ b/CHANGES
@@ -74,6 +74,29 @@ CHANGES
down polymorphic queries
- miscellaneous tickets: [ticket:940]
+
+ - session transaction management fixes:
+ - fixed bug with session transaction management. Parent transactions
+ weren't started on the connection when adding a connection to a
+ nested transaction.
+ - session.transaction now always refers to the innermost active
+ transaction, even when commit/rollback are called directly on the
+ session transaction object.
+ - when preparing a two-phase transaction fails on one connection
+ all the connections are rolled back.
+ - two phase transactions can now be prepared.
+ - session.close() didn't close all transactions when nested
+ transactions were used.
+ - rollback() previously erroneously set the current transaction
+ directly to the parent of the transaction that could be rolled back
+ to. Now it rolls back the next transaction up that can handle it, but
+ sets the current transaction to it's parent and inactivates the
+ transactions in between. Inactive transactions can only be
+ rolled back or closed, any other call results in an error. In effect
+ this means that every begin() must be accompanied by a corresponding
+ commit() or rollback(), including the implicit begin() in
+ transactional sessions.
+ - autoflush for commit() wasn't flushing for simple subtransactions.
- general
- warnings are now issued as type exceptions.SAWarning.
@@ -89,6 +112,11 @@ CHANGES
allows columns to map properly to result sets even
if long-name truncation kicks in [ticket:941]
+- ext
+ - Changed ext.activemapper to use a non-transactional session
+ for the objectstore as the test seem to imply that this is
+ the expected behavior.
+
0.4.2p3
------
- general
diff --git a/doc/build/content/session.txt b/doc/build/content/session.txt
index 7698a1b5c..e17987cf5 100644
--- a/doc/build/content/session.txt
+++ b/doc/build/content/session.txt
@@ -476,7 +476,7 @@ Session also supports Python 2.5's with statement so that the example above can
item1.foo = 'bar'
item2.bar = 'foo'
-For MySQL and Postgres (and soon Oracle), "nested" transactions can be accomplished which use SAVEPOINT behavior, via the `begin_nested()` method:
+Subtransactions can be created by calling the `begin()` method repeatedly. For each transaction you `begin()` you must always call either `commit()` or `rollback()`. Note that this includes the implicit transaction created by the transactional session. When a subtransaction is created the current transaction of the session is set to that transaction. Commiting the subtransaction will return you to the next outer transaction. Rolling it back will also return you to the next outer transaction, but in addition it will roll back database state to the innermost transaction that supports rolling back to. Usually this means the root transaction, unless you use the nested transaction functionality via the `begin_nested()` method. MySQL and Postgres (and soon Oracle) support using "nested" transactions by creating SAVEPOINTs, :
{python}
Session = sessionmaker(transactional=False)
@@ -492,7 +492,7 @@ For MySQL and Postgres (and soon Oracle), "nested" transactions can be accomplis
sess.commit() # commits u1 and u2
-Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instructed to use two-phase commit semantics using the flag `twophase=True`, which coordinates transactions across multiple databases:
+Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instructed to use two-phase commit semantics. This will coordinate the commiting of transactions across databases so that the transaction is either committed or rolled back in all databases. You can also `prepare()` the session for interacting with transactions not managed by SQLAlchemy. To use two phase transactions set the flag `twophase=True` on the session:
{python}
engine1 = create_engine('postgres://db1')
@@ -511,6 +511,8 @@ Finally, for MySQL, Postgres, and soon Oracle as well, the session can be instru
# before committing both transactions
sess.commit()
+Be aware that when a crash occurs in one of the databases while the the transactions are prepared you have to manually commit or rollback the prepared transactions in your database as appropriate.
+
## Embedding SQL Insert/Update Expressions into a Flush {@name=flushsql}
This feature allows the value of a database column to be set to a SQL expression instead of a literal value. It's especially useful for atomic updates, calling stored procedures, etc. All you do is assign an expression to an attribute:
diff --git a/lib/sqlalchemy/ext/activemapper.py b/lib/sqlalchemy/ext/activemapper.py
index b28ada0af..02f4b5b35 100644
--- a/lib/sqlalchemy/ext/activemapper.py
+++ b/lib/sqlalchemy/ext/activemapper.py
@@ -13,7 +13,7 @@ import sys
#
metadata = ThreadLocalMetaData()
Objectstore = scoped_session
-objectstore = scoped_session(sessionmaker(autoflush=True))
+objectstore = scoped_session(sessionmaker(autoflush=True, transactional=False))
#
# declarative column declaration - this is so that we can infer the colname
diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py
index e2bc71b92..f32ec25ec 100644
--- a/lib/sqlalchemy/orm/session.py
+++ b/lib/sqlalchemy/orm/session.py
@@ -144,110 +144,164 @@ class SessionTransaction(object):
def __init__(self, session, parent=None, autoflush=True, nested=False):
self.session = session
- self.__connections = {}
- self.__parent = parent
+ self._connections = {}
+ self._parent = parent
self.autoflush = autoflush
self.nested = nested
+ self._active = True
+ self._prepared = False
+
+ is_active = property(lambda s: s.session is not None and s._active)
+
+ def _assert_is_active(self):
+ self._assert_is_open()
+ if not self._active:
+ raise exceptions.InvalidRequestError("The transaction is inactive due to a rollback in a subtransaction and should be closed")
+
+ def _assert_is_open(self):
+ if self.session is None:
+ raise exceptions.InvalidRequestError("The transaction is closed")
def connection(self, bindkey, **kwargs):
+ self._assert_is_active()
engine = self.session.get_bind(bindkey, **kwargs)
return self.get_or_add(engine)
def _begin(self, **kwargs):
+ self._assert_is_active()
return SessionTransaction(self.session, self, **kwargs)
+ def _iterate_parents(self, upto=None):
+ if self._parent is upto:
+ return (self,)
+ else:
+ if self._parent is None:
+ raise exceptions.InvalidRequestError("Transaction %s is not on the active transaction list" % upto)
+ return (self,) + self._parent._iterate_parents(upto)
+
def add(self, bind):
- if self.__parent is not None:
- return self.__parent.add(bind)
+ self._assert_is_active()
+ if self._parent is not None and not self.nested:
+ return self._parent.add(bind)
- if bind.engine in self.__connections:
+ if bind.engine in self._connections:
raise exceptions.InvalidRequestError("Session already has a Connection associated for the given %sEngine" % (isinstance(bind, engine.Connection) and "Connection's " or ""))
return self.get_or_add(bind)
- def _connection_dict(self):
- if self.__parent is not None and not self.nested:
- return self.__parent._connection_dict()
- else:
- return self.__connections
-
def get_or_add(self, bind):
- if self.__parent is not None:
+ self._assert_is_active()
+
+ if bind in self._connections:
+ return self._connections[bind][0]
+
+ if self._parent is not None:
+ conn = self._parent.get_or_add(bind)
if not self.nested:
- return self.__parent.get_or_add(bind)
-
- if bind in self.__connections:
- return self.__connections[bind][0]
-
- conn_dict = self.__parent._connection_dict()
- if bind in conn_dict:
- (conn, trans, autoclose) = conn_dict[bind]
- self.__connections[conn] = self.__connections[bind.engine] = (conn, conn.begin_nested(), autoclose)
return conn
- elif bind in self.__connections:
- return self.__connections[bind][0]
-
- if not isinstance(bind, engine.Connection):
- e = bind
- c = bind.contextual_connect()
- else:
- e = bind.engine
- c = bind
- if e in self.__connections:
- raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
- if self.nested:
- trans = c.begin_nested()
- elif self.session.twophase:
- trans = c.begin_twophase()
else:
- trans = c.begin()
- self.__connections[c] = self.__connections[e] = (c, trans, c is not bind)
- return self.__connections[c][0]
+ if isinstance(bind, engine.Connection):
+ conn = bind
+ if conn.engine in self._connections:
+ raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
+ else:
+ conn = bind.contextual_connect()
- def commit(self):
- if self.__parent is not None and not self.nested:
- return self.__parent
+ if self.session.twophase and self._parent is None:
+ transaction = conn.begin_twophase()
+ elif self.nested:
+ transaction = conn.begin_nested()
+ else:
+ transaction = conn.begin()
+
+ self._connections[conn] = self._connections[conn.engine] = (conn, transaction, conn is not bind)
+ return conn
- if self.session.extension is not None:
+ def prepare(self):
+ if self._parent is not None or not self.session.twophase:
+ raise exceptions.InvalidRequestError("Only root two phase transactions of can be prepared")
+ self._prepare_impl()
+
+ def _prepare_impl(self):
+ self._assert_is_active()
+ if self.session.extension is not None and (self._parent is None or self.nested):
self.session.extension.before_commit(self.session)
-
+
+ if self.session.transaction is not self:
+ for subtransaction in self.session.transaction._iterate_parents(upto=self):
+ subtransaction.commit()
+
if self.autoflush:
self.session.flush()
+
+ if self._parent is None and self.session.twophase:
+ try:
+ for t in util.Set(self._connections.values()):
+ t[1].prepare()
+ except:
+ for t in util.Set(self._connections.values()):
+ try:
+ t[1].rollback()
+ except:
+ pass
+ raise
+
+ self._deactivate()
+ self._prepared = True
+
+ def commit(self):
+ self._assert_is_open()
+ if not self._prepared:
+ self._prepare_impl()
+
+ if self._parent is None or self.nested:
+ for t in util.Set(self._connections.values()):
+ t[1].commit()
- if self.session.twophase:
- for t in util.Set(self.__connections.values()):
- t[1].prepare()
-
- for t in util.Set(self.__connections.values()):
- t[1].commit()
-
- if self.session.extension is not None:
- self.session.extension.after_commit(self.session)
+ if self.session.extension is not None:
+ self.session.extension.after_commit(self.session)
self.close()
- return self.__parent
+ return self._parent
def rollback(self):
- if self.__parent is not None and not self.nested:
- return self.__parent.rollback()
-
- for t in util.Set(self.__connections.values()):
+ self._assert_is_open()
+
+ if self.session.transaction is not self:
+ for subtransaction in self.session.transaction._iterate_parents(upto=self):
+ subtransaction.close()
+
+ if self.is_active:
+ for transaction in self._iterate_parents():
+ if transaction._parent is None or transaction.nested:
+ transaction._rollback_impl()
+ transaction._deactivate()
+ break
+ else:
+ transaction._deactivate()
+ self.close()
+ return self._parent
+
+ def _rollback_impl(self):
+ for t in util.Set(self._connections.values()):
t[1].rollback()
if self.session.extension is not None:
self.session.extension.after_rollback(self.session)
- self.close()
- return self.__parent
+ def _deactivate(self):
+ self._active = False
def close(self):
- if self.__parent is not None:
- return
- for t in util.Set(self.__connections.values()):
- if t[2]:
- t[0].close()
- else:
- t[1].close()
- self.session.transaction = None
+ self.session.transaction = self._parent
+ if self._parent is None:
+ for connection, transaction, autoclose in util.Set(self._connections.values()):
+ if autoclose:
+ connection.close()
+ else:
+ transaction.close()
+ self._deactivate()
+ self.session = None
+ self._connections = None
def __enter__(self):
return self
@@ -458,7 +512,7 @@ class Session(object):
if self.transaction is None:
pass
else:
- self.transaction = self.transaction.rollback()
+ self.transaction.rollback()
# TODO: we can rollback attribute values. however
# we would want to expand attributes.py to be able to save *two* rollback points, one to the
# last flush() and the other to when the object first entered the transaction.
@@ -484,13 +538,29 @@ class Session(object):
if self.transaction is None:
if self.transactional:
self.begin()
- self.transaction = self.transaction.commit()
else:
raise exceptions.InvalidRequestError("No transaction is begun.")
- else:
- self.transaction = self.transaction.commit()
+
+ self.transaction.commit()
if self.transaction is None and self.transactional:
self.begin()
+
+ def prepare(self):
+ """Prepare the current transaction in progress for two phase commit.
+
+ If no transaction is in progress, this method raises
+ an InvalidRequestError.
+
+ Only root transactions of two phase sessions can be prepared. If the current transaction is
+ not such, an InvalidRequestError is raised.
+ """
+ if self.transaction is None:
+ if self.transactional:
+ self.begin()
+ else:
+ raise exceptions.InvalidRequestError("No transaction is begun.")
+
+ self.transaction.prepare()
def connection(self, mapper=None, **kwargs):
"""Return a ``Connection`` corresponding to this session's
@@ -554,7 +624,8 @@ class Session(object):
self.clear()
if self.transaction is not None:
- self.transaction.close()
+ for transaction in self.transaction._iterate_parents():
+ transaction.close()
if self.transactional:
# note this doesnt use any connection resources
self.begin()
diff --git a/test/orm/session.py b/test/orm/session.py
index 0a6157e0a..930efe07a 100644
--- a/test/orm/session.py
+++ b/test/orm/session.py
@@ -1,6 +1,6 @@
import testenv; testenv.configure_for_tests()
from sqlalchemy import *
-from sqlalchemy import exceptions
+from sqlalchemy import exceptions, util
from sqlalchemy.orm import *
from sqlalchemy.orm.session import SessionExtension
from sqlalchemy.orm.session import Session as SessionCls
@@ -355,6 +355,122 @@ class SessionTest(AssertMixin):
assert len(sess.query(User).all()) == 1
sess.close()
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
+ @testing.exclude('mysql', '<', (5, 0, 3))
+ def test_nested_transaction_connection_add(self):
+ class User(object): pass
+ mapper(User, users)
+
+ sess = create_session(transactional=False)
+
+ sess.begin()
+ sess.begin_nested()
+
+ u1 = User()
+ sess.save(u1)
+ sess.flush()
+
+ sess.rollback()
+
+ u2 = User()
+ sess.save(u2)
+
+ sess.commit()
+
+ self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2]))
+
+ sess.begin()
+ sess.begin_nested()
+
+ u3 = User()
+ sess.save(u3)
+ sess.commit() # commit the nested transaction
+ sess.rollback()
+
+ self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2]))
+
+ sess.close()
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
+ @testing.exclude('mysql', '<', (5, 0, 3))
+ def test_mixed_transaction_control(self):
+ class User(object): pass
+ mapper(User, users)
+
+ sess = create_session(transactional=False)
+
+ sess.begin()
+ sess.begin_nested()
+ transaction = sess.begin()
+
+ sess.save(User())
+
+ transaction.commit()
+ sess.commit()
+ sess.commit()
+
+ sess.close()
+
+ self.assertEquals(len(sess.query(User).all()), 1)
+
+ t1 = sess.begin()
+ t2 = sess.begin_nested()
+
+ sess.save(User())
+
+ t2.commit()
+ assert sess.transaction is t1
+
+ sess.close()
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
+ @testing.exclude('mysql', '<', (5, 0, 3))
+ def test_mixed_transaction_close(self):
+ class User(object): pass
+ mapper(User, users)
+
+ sess = create_session(transactional=True)
+
+ sess.begin_nested()
+
+ sess.save(User())
+ sess.flush()
+
+ sess.close()
+
+ sess.save(User())
+ sess.commit()
+
+ sess.close()
+
+ self.assertEquals(len(sess.query(User).all()), 1)
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
+ @testing.exclude('mysql', '<', (5, 0, 3))
+ def test_error_on_using_inactive_session(self):
+ class User(object): pass
+ mapper(User, users)
+
+ sess = create_session(transactional=False)
+
+ try:
+ sess.begin()
+ sess.begin()
+
+ sess.save(User())
+ sess.flush()
+
+ sess.rollback()
+ sess.begin()
+ assert False
+ except exceptions.InvalidRequestError, e:
+ self.assertEquals(str(e), "The transaction is inactive due to a rollback in a subtransaction and should be closed")
+ sess.close()
+
@engines.close_open_connections
def test_bound_connection(self):
class User(object):pass