from sqlalchemy.testing import eq_, assert_raises, \ assert_raises_message, ne_, expect_warnings import sys from sqlalchemy import event from sqlalchemy.testing.engines import testing_engine from sqlalchemy import create_engine, MetaData, INT, VARCHAR, Sequence, \ select, Integer, String, func, text, exc from sqlalchemy.testing.schema import Table from sqlalchemy.testing.schema import Column from sqlalchemy import testing from sqlalchemy.testing import fixtures users, metadata = None, None class TransactionTest(fixtures.TestBase): __backend__ = True @classmethod def setup_class(cls): global users, metadata metadata = MetaData() users = Table('query_users', metadata, Column('user_id', INT, primary_key=True), Column('user_name', VARCHAR(20)), test_needs_acid=True) users.create(testing.db) def teardown(self): testing.db.execute(users.delete()).close() @classmethod def teardown_class(cls): users.drop(testing.db) def test_commits(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.commit() transaction = connection.begin() connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() transaction = connection.begin() result = connection.execute("select * from query_users") assert len(result.fetchall()) == 3 transaction.commit() connection.close() def test_rollback(self): """test a basic rollback""" connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') transaction.rollback() result = connection.execute("select * from query_users") assert len(result.fetchall()) == 0 connection.close() def test_raise(self): connection = testing.db.connect() transaction = connection.begin() try: 
connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=1, user_name='user3') transaction.commit() assert False except Exception as e: print("Exception: ", e) transaction.rollback() result = connection.execute("select * from query_users") assert len(result.fetchall()) == 0 connection.close() def test_transaction_container(self): def go(conn, table, data): for d in data: conn.execute(table.insert(), d) testing.db.transaction(go, users, [dict(user_id=1, user_name='user1')]) eq_(testing.db.execute(users.select()).fetchall(), [(1, 'user1')]) assert_raises(exc.DBAPIError, testing.db.transaction, go, users, [{'user_id': 2, 'user_name': 'user2'}, {'user_id': 1, 'user_name': 'user3'}]) eq_(testing.db.execute(users.select()).fetchall(), [(1, 'user1')]) def test_nested_rollback(self): connection = testing.db.connect() try: transaction = connection.begin() try: connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') trans2 = connection.begin() try: connection.execute(users.insert(), user_id=4, user_name='user4') connection.execute(users.insert(), user_id=5, user_name='user5') raise Exception('uh oh') trans2.commit() except Exception: trans2.rollback() raise transaction.rollback() except Exception as e: transaction.rollback() raise except Exception as e: try: # and not "This transaction is inactive" # comment moved here to fix pep8 assert str(e) == 'uh oh' finally: connection.close() def test_branch_nested_rollback(self): connection = testing.db.connect() try: connection.begin() branched = connection.connect() assert branched.in_transaction() branched.execute(users.insert(), user_id=1, user_name='user1') nested = branched.begin() branched.execute(users.insert(), user_id=2, user_name='user2') nested.rollback() assert not 
connection.in_transaction() eq_(connection.scalar("select count(*) from query_users"), 0) finally: connection.close() def test_branch_autorollback(self): connection = testing.db.connect() try: branched = connection.connect() branched.execute(users.insert(), user_id=1, user_name='user1') try: branched.execute(users.insert(), user_id=1, user_name='user1') except exc.DBAPIError: pass finally: connection.close() def test_branch_orig_rollback(self): connection = testing.db.connect() try: branched = connection.connect() branched.execute(users.insert(), user_id=1, user_name='user1') nested = branched.begin() assert branched.in_transaction() branched.execute(users.insert(), user_id=2, user_name='user2') nested.rollback() eq_(connection.scalar("select count(*) from query_users"), 1) finally: connection.close() def test_branch_autocommit(self): connection = testing.db.connect() try: branched = connection.connect() branched.execute(users.insert(), user_id=1, user_name='user1') finally: connection.close() eq_(testing.db.scalar("select count(*) from query_users"), 1) @testing.requires.savepoints def test_branch_savepoint_rollback(self): connection = testing.db.connect() try: trans = connection.begin() branched = connection.connect() assert branched.in_transaction() branched.execute(users.insert(), user_id=1, user_name='user1') nested = branched.begin_nested() branched.execute(users.insert(), user_id=2, user_name='user2') nested.rollback() assert connection.in_transaction() trans.commit() eq_(connection.scalar("select count(*) from query_users"), 1) finally: connection.close() @testing.requires.two_phase_transactions def test_branch_twophase_rollback(self): connection = testing.db.connect() try: branched = connection.connect() assert not branched.in_transaction() branched.execute(users.insert(), user_id=1, user_name='user1') nested = branched.begin_twophase() branched.execute(users.insert(), user_id=2, user_name='user2') nested.rollback() assert not connection.in_transaction() 
eq_(connection.scalar("select count(*) from query_users"), 1) finally: connection.close() @testing.requires.python2 @testing.requires.savepoints_w_release def test_savepoint_release_fails_warning(self): with testing.db.connect() as connection: connection.begin() with expect_warnings( "An exception has occurred during handling of a previous " "exception. The previous exception " r"is:.*..SQL\:.*RELEASE SAVEPOINT" ): def go(): with connection.begin_nested() as savepoint: connection.dialect.do_release_savepoint( connection, savepoint._savepoint) assert_raises_message( exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", go ) def test_retains_through_options(self): connection = testing.db.connect() try: transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') conn2 = connection.execution_options(dummy=True) conn2.execute(users.insert(), user_id=2, user_name='user2') transaction.rollback() eq_(connection.scalar("select count(*) from query_users"), 0) finally: connection.close() def test_nesting(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') trans2 = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') connection.execute(users.insert(), user_id=5, user_name='user5') trans2.commit() transaction.rollback() self.assert_(connection.scalar('select count(*) from ' 'query_users') == 0) result = connection.execute('select * from query_users') assert len(result.fetchall()) == 0 connection.close() def test_with_interface(self): connection = testing.db.connect() trans = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') try: connection.execute(users.insert(), user_id=2, user_name='user2.5') except 
Exception as e: trans.__exit__(*sys.exc_info()) assert not trans.is_active self.assert_(connection.scalar('select count(*) from ' 'query_users') == 0) trans = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') trans.__exit__(None, None, None) assert not trans.is_active self.assert_(connection.scalar('select count(*) from ' 'query_users') == 1) connection.close() def test_close(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') trans2 = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') connection.execute(users.insert(), user_id=5, user_name='user5') assert connection.in_transaction() trans2.close() assert connection.in_transaction() transaction.commit() assert not connection.in_transaction() self.assert_(connection.scalar('select count(*) from ' 'query_users') == 5) result = connection.execute('select * from query_users') assert len(result.fetchall()) == 5 connection.close() def test_close2(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') trans2 = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') connection.execute(users.insert(), user_id=5, user_name='user5') assert connection.in_transaction() trans2.close() assert connection.in_transaction() transaction.close() assert not connection.in_transaction() self.assert_(connection.scalar('select count(*) from ' 'query_users') == 0) result = connection.execute('select * from query_users') assert len(result.fetchall()) == 0 connection.close() @testing.requires.savepoints def 
test_nested_subtransaction_rollback(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') trans2 = connection.begin_nested() connection.execute(users.insert(), user_id=2, user_name='user2') trans2.rollback() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() eq_(connection.execute(select([users.c.user_id]). order_by(users.c.user_id)).fetchall(), [(1, ), (3, )]) connection.close() @testing.requires.savepoints @testing.crashes('oracle+zxjdbc', 'Errors out and causes subsequent tests to ' 'deadlock') def test_nested_subtransaction_commit(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') trans2 = connection.begin_nested() connection.execute(users.insert(), user_id=2, user_name='user2') trans2.commit() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() eq_(connection.execute(select([users.c.user_id]). order_by(users.c.user_id)).fetchall(), [(1, ), (2, ), (3, )]) connection.close() @testing.requires.savepoints def test_rollback_to_subtransaction(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') trans2 = connection.begin_nested() connection.execute(users.insert(), user_id=2, user_name='user2') trans3 = connection.begin() connection.execute(users.insert(), user_id=3, user_name='user3') trans3.rollback() connection.execute(users.insert(), user_id=4, user_name='user4') transaction.commit() eq_(connection.execute(select([users.c.user_id]). 
order_by(users.c.user_id)).fetchall(), [(1, ), (4, )]) connection.close() @testing.requires.two_phase_transactions def test_two_phase_transaction(self): connection = testing.db.connect() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.prepare() transaction.commit() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=2, user_name='user2') transaction.commit() transaction.close() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=3, user_name='user3') transaction.rollback() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=4, user_name='user4') transaction.prepare() transaction.rollback() transaction.close() eq_(connection.execute(select([users.c.user_id]). order_by(users.c.user_id)).fetchall(), [(1, ), (2, )]) connection.close() # PG emergency shutdown: # select * from pg_prepared_xacts # ROLLBACK PREPARED '' # MySQL emergency shutdown: # for arg in `mysql -u root -e "xa recover" | cut -c 8-100 | # grep sa`; do mysql -u root -e "xa rollback '$arg'"; done @testing.crashes('mysql', 'Crashing on 5.5, not worth it') @testing.requires.skip_mysql_on_windows @testing.requires.two_phase_transactions @testing.requires.savepoints def test_mixed_two_phase_transaction(self): connection = testing.db.connect() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') transaction2 = connection.begin() connection.execute(users.insert(), user_id=2, user_name='user2') transaction3 = connection.begin_nested() connection.execute(users.insert(), user_id=3, user_name='user3') transaction4 = connection.begin() connection.execute(users.insert(), user_id=4, user_name='user4') transaction4.commit() transaction3.rollback() connection.execute(users.insert(), user_id=5, user_name='user5') transaction2.commit() transaction.prepare() transaction.commit() 
eq_(connection.execute(select([users.c.user_id]). order_by(users.c.user_id)).fetchall(), [(1, ), (2, ), (5, )]) connection.close() @testing.requires.two_phase_transactions @testing.requires.two_phase_recovery def test_two_phase_recover(self): # MySQL recovery doesn't currently seem to work correctly # Prepared transactions disappear when connections are closed # and even when they aren't it doesn't seem possible to use the # recovery id. connection = testing.db.connect() transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.prepare() connection.invalidate() connection2 = testing.db.connect() eq_( connection2.execution_options(autocommit=True). execute(select([users.c.user_id]). order_by(users.c.user_id)).fetchall(), []) recoverables = connection2.recover_twophase() assert transaction.xid in recoverables connection2.commit_prepared(transaction.xid, recover=True) eq_(connection2.execute(select([users.c.user_id]). order_by(users.c.user_id)).fetchall(), [(1, )]) connection2.close() @testing.requires.two_phase_transactions def test_multiple_two_phase(self): conn = testing.db.connect() xa = conn.begin_twophase() conn.execute(users.insert(), user_id=1, user_name='user1') xa.prepare() xa.commit() xa = conn.begin_twophase() conn.execute(users.insert(), user_id=2, user_name='user2') xa.prepare() xa.rollback() xa = conn.begin_twophase() conn.execute(users.insert(), user_id=3, user_name='user3') xa.rollback() xa = conn.begin_twophase() conn.execute(users.insert(), user_id=4, user_name='user4') xa.prepare() xa.commit() result = \ conn.execute(select([users.c.user_name]). 
order_by(users.c.user_id)) eq_(result.fetchall(), [('user1', ), ('user4', )]) conn.close() @testing.requires.two_phase_transactions def test_reset_rollback_two_phase_no_rollback(self): # test [ticket:2907], essentially that the # TwoPhaseTransaction is given the job of "reset on return" # so that picky backends like MySQL correctly clear out # their state when a connection is closed without handling # the transaction explicitly. eng = testing_engine() # MySQL raises if you call straight rollback() on # a connection with an XID present @event.listens_for(eng, "invalidate") def conn_invalidated(dbapi_con, con_record, exception): dbapi_con.close() raise exception with eng.connect() as conn: rec = conn.connection._connection_record raw_dbapi_con = rec.connection xa = conn.begin_twophase() conn.execute(users.insert(), user_id=1, user_name='user1') assert rec.connection is raw_dbapi_con with eng.connect() as conn: result = \ conn.execute(select([users.c.user_name]). order_by(users.c.user_id)) eq_(result.fetchall(), []) class ResetAgentTest(fixtures.TestBase): __backend__ = True def test_begin_close(self): with testing.db.connect() as connection: trans = connection.begin() assert connection.connection._reset_agent is trans assert not trans.is_active def test_begin_rollback(self): with testing.db.connect() as connection: trans = connection.begin() assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None def test_begin_commit(self): with testing.db.connect() as connection: trans = connection.begin() assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None @testing.requires.savepoints def test_begin_nested_close(self): with testing.db.connect() as connection: trans = connection.begin_nested() assert connection.connection._reset_agent is trans assert not trans.is_active @testing.requires.savepoints def test_begin_begin_nested_close(self): with 
testing.db.connect() as connection: trans = connection.begin() trans2 = connection.begin_nested() assert connection.connection._reset_agent is trans assert trans2.is_active # was never closed assert not trans.is_active @testing.requires.savepoints def test_begin_begin_nested_rollback_commit(self): with testing.db.connect() as connection: trans = connection.begin() trans2 = connection.begin_nested() assert connection.connection._reset_agent is trans trans2.rollback() assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None @testing.requires.savepoints def test_begin_begin_nested_rollback_rollback(self): with testing.db.connect() as connection: trans = connection.begin() trans2 = connection.begin_nested() assert connection.connection._reset_agent is trans trans2.rollback() assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None def test_begin_begin_rollback_rollback(self): with testing.db.connect() as connection: trans = connection.begin() trans2 = connection.begin() assert connection.connection._reset_agent is trans trans2.rollback() assert connection.connection._reset_agent is None trans.rollback() assert connection.connection._reset_agent is None def test_begin_begin_commit_commit(self): with testing.db.connect() as connection: trans = connection.begin() trans2 = connection.begin() assert connection.connection._reset_agent is trans trans2.commit() assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None @testing.requires.two_phase_transactions def test_reset_via_agent_begin_twophase(self): with testing.db.connect() as connection: trans = connection.begin_twophase() assert connection.connection._reset_agent is trans @testing.requires.two_phase_transactions def test_reset_via_agent_begin_twophase_commit(self): with testing.db.connect() as connection: trans = connection.begin_twophase() 
assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None @testing.requires.two_phase_transactions def test_reset_via_agent_begin_twophase_rollback(self): with testing.db.connect() as connection: trans = connection.begin_twophase() assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None class AutoRollbackTest(fixtures.TestBase): __backend__ = True @classmethod def setup_class(cls): global metadata metadata = MetaData() @classmethod def teardown_class(cls): metadata.drop_all(testing.db) def test_rollback_deadlock(self): """test that returning connections to the pool clears any object locks.""" conn1 = testing.db.connect() conn2 = testing.db.connect() users = Table('deadlock_users', metadata, Column('user_id', INT, primary_key=True), Column('user_name', VARCHAR(20)), test_needs_acid=True) users.create(conn1) conn1.execute('select * from deadlock_users') conn1.close() # without auto-rollback in the connection pool's return() logic, # this deadlocks in PostgreSQL, because conn1 is returned to the # pool but still has a lock on "deadlock_users". comment out the # rollback in pool/ConnectionFairy._close() to see ! users.drop(conn2) conn2.close() class ExplicitAutoCommitTest(fixtures.TestBase): """test the 'autocommit' flag on select() and text() objects. Requires PostgreSQL so that we may define a custom function which modifies the database. 
""" __only_on__ = 'postgresql' @classmethod def setup_class(cls): global metadata, foo metadata = MetaData(testing.db) foo = Table('foo', metadata, Column('id', Integer, primary_key=True), Column('data', String(100))) metadata.create_all() testing.db.execute("create function insert_foo(varchar) " "returns integer as 'insert into foo(data) " "values ($1);select 1;' language sql") def teardown(self): foo.delete().execute().close() @classmethod def teardown_class(cls): testing.db.execute('drop function insert_foo(varchar)') metadata.drop_all() def test_control(self): # test that not using autocommit does not commit conn1 = testing.db.connect() conn2 = testing.db.connect() conn1.execute(select([func.insert_foo('data1')])) assert conn2.execute(select([foo.c.data])).fetchall() == [] conn1.execute(text("select insert_foo('moredata')")) assert conn2.execute(select([foo.c.data])).fetchall() == [] trans = conn1.begin() trans.commit() assert conn2.execute(select([foo.c.data])).fetchall() \ == [('data1', ), ('moredata', )] conn1.close() conn2.close() def test_explicit_compiled(self): conn1 = testing.db.connect() conn2 = testing.db.connect() conn1.execute(select([func.insert_foo('data1')]) .execution_options(autocommit=True)) assert conn2.execute(select([foo.c.data])).fetchall() \ == [('data1', )] conn1.close() conn2.close() def test_explicit_connection(self): conn1 = testing.db.connect() conn2 = testing.db.connect() conn1.execution_options(autocommit=True).\ execute(select([func.insert_foo('data1')])) eq_(conn2.execute(select([foo.c.data])).fetchall(), [('data1',)]) # connection supersedes statement conn1.execution_options(autocommit=False).\ execute(select([func.insert_foo('data2')]) .execution_options(autocommit=True)) eq_(conn2.execute(select([foo.c.data])).fetchall(), [('data1',)]) # ditto conn1.execution_options(autocommit=True).\ execute(select([func.insert_foo('data3')]) .execution_options(autocommit=False)) eq_(conn2.execute(select([foo.c.data])).fetchall(), 
[('data1',), ('data2', ), ('data3', )]) conn1.close() conn2.close() def test_explicit_text(self): conn1 = testing.db.connect() conn2 = testing.db.connect() conn1.execute(text("select insert_foo('moredata')") .execution_options(autocommit=True)) assert conn2.execute(select([foo.c.data])).fetchall() \ == [('moredata', )] conn1.close() conn2.close() @testing.uses_deprecated(r'autocommit on select\(\) is deprecated', r'``autocommit\(\)`` is deprecated') def test_explicit_compiled_deprecated(self): conn1 = testing.db.connect() conn2 = testing.db.connect() conn1.execute(select([func.insert_foo('data1')], autocommit=True)) assert conn2.execute(select([foo.c.data])).fetchall() \ == [('data1', )] conn1.execute(select([func.insert_foo('data2')]).autocommit()) assert conn2.execute(select([foo.c.data])).fetchall() \ == [('data1', ), ('data2', )] conn1.close() conn2.close() @testing.uses_deprecated(r'autocommit on text\(\) is deprecated') def test_explicit_text_deprecated(self): conn1 = testing.db.connect() conn2 = testing.db.connect() conn1.execute(text("select insert_foo('moredata')", autocommit=True)) assert conn2.execute(select([foo.c.data])).fetchall() \ == [('moredata', )] conn1.close() conn2.close() def test_implicit_text(self): conn1 = testing.db.connect() conn2 = testing.db.connect() conn1.execute(text("insert into foo (data) values ('implicitdata')")) assert conn2.execute(select([foo.c.data])).fetchall() \ == [('implicitdata', )] conn1.close() conn2.close() tlengine = None class TLTransactionTest(fixtures.TestBase): __requires__ = ('ad_hoc_engines', ) __backend__ = True @classmethod def setup_class(cls): global users, metadata, tlengine tlengine = testing_engine(options=dict(strategy='threadlocal')) metadata = MetaData() users = Table('query_users', metadata, Column('user_id', INT, Sequence('query_users_id_seq', optional=True), primary_key=True), Column('user_name', VARCHAR(20)), test_needs_acid=True) metadata.create_all(tlengine) def teardown(self): 
tlengine.execute(users.delete()).close() @classmethod def teardown_class(cls): tlengine.close() metadata.drop_all(tlengine) tlengine.dispose() def setup(self): # ensure tests start with engine closed tlengine.close() @testing.crashes('oracle', 'TNS error of unknown origin occurs on the buildbot.') def test_rollback_no_trans(self): tlengine = testing_engine(options=dict(strategy="threadlocal")) # shouldn't fail tlengine.rollback() tlengine.begin() tlengine.rollback() # shouldn't fail tlengine.rollback() def test_commit_no_trans(self): tlengine = testing_engine(options=dict(strategy="threadlocal")) # shouldn't fail tlengine.commit() tlengine.begin() tlengine.rollback() # shouldn't fail tlengine.commit() def test_prepare_no_trans(self): tlengine = testing_engine(options=dict(strategy="threadlocal")) # shouldn't fail tlengine.prepare() tlengine.begin() tlengine.rollback() # shouldn't fail tlengine.prepare() def test_connection_close(self): """test that when connections are closed for real, transactions are rolled back and disposed.""" c = tlengine.contextual_connect() c.begin() assert c.in_transaction() c.close() assert not c.in_transaction() def test_transaction_close(self): c = tlengine.contextual_connect() t = c.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.execute(users.insert(), user_id=2, user_name='user2') t2 = c.begin() tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.execute(users.insert(), user_id=4, user_name='user4') t2.close() result = c.execute('select * from query_users') assert len(result.fetchall()) == 4 t.close() external_connection = tlengine.connect() result = external_connection.execute('select * from query_users') try: assert len(result.fetchall()) == 0 finally: c.close() external_connection.close() def test_rollback(self): """test a basic rollback""" tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.execute(users.insert(), user_id=2, user_name='user2') 
tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.rollback() external_connection = tlengine.connect() result = external_connection.execute('select * from query_users') try: assert len(result.fetchall()) == 0 finally: external_connection.close() def test_commit(self): """test a basic commit""" tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.execute(users.insert(), user_id=2, user_name='user2') tlengine.execute(users.insert(), user_id=3, user_name='user3') tlengine.commit() external_connection = tlengine.connect() result = external_connection.execute('select * from query_users') try: assert len(result.fetchall()) == 3 finally: external_connection.close() def test_with_interface(self): trans = tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.execute(users.insert(), user_id=2, user_name='user2') trans.commit() trans = tlengine.begin() tlengine.execute(users.insert(), user_id=3, user_name='user3') trans.__exit__(Exception, "fake", None) trans = tlengine.begin() tlengine.execute(users.insert(), user_id=4, user_name='user4') trans.__exit__(None, None, None) eq_( tlengine.execute(users.select().order_by(users.c.user_id)) .fetchall(), [ (1, 'user1'), (2, 'user2'), (4, 'user4'), ] ) def test_commits(self): connection = tlengine.connect() assert connection.execute('select count(*) from query_users' ).scalar() == 0 connection.close() connection = tlengine.contextual_connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') transaction.commit() transaction = connection.begin() connection.execute(users.insert(), user_id=2, user_name='user2') connection.execute(users.insert(), user_id=3, user_name='user3') transaction.commit() transaction = connection.begin() result = connection.execute('select * from query_users') rows = result.fetchall() assert len(rows) == 3, 'expected 3 got %d' % len(rows) transaction.commit() 
connection.close()
# NOTE(review): the statement above is the tail of a test method that begins
# before this chunk; it is kept verbatim.

    def test_rollback_off_conn(self):
        """Roll back a transaction begun on a contextual connection.

        Inserts three rows through ``conn`` inside ``trans``, rolls back,
        and asserts that a separate ``tlengine.connect()`` connection sees
        no rows in ``query_users``.
        """

        # test that a TLTransaction opened off a TLConnection allows
        # that TLConnection to be aware of the transactional context

        conn = tlengine.contextual_connect()
        trans = conn.begin()
        conn.execute(users.insert(), user_id=1, user_name='user1')
        conn.execute(users.insert(), user_id=2, user_name='user2')
        conn.execute(users.insert(), user_id=3, user_name='user3')
        trans.rollback()
        external_connection = tlengine.connect()
        result = external_connection.execute('select * from query_users')
        try:
            assert len(result.fetchall()) == 0
        finally:
            # always release both connections, even if the assertion fails
            conn.close()
            external_connection.close()

    def test_morerollback_off_conn(self):
        """Roll back work done on one contextual connection via another.

        The transaction is begun on ``conn2`` while the inserts go through
        ``conn``; after ``trans.rollback()`` a separate connection must see
        no rows.
        """

        # test that an existing TLConnection automatically takes part
        # in a TLTransaction opened on a second TLConnection

        conn = tlengine.contextual_connect()
        conn2 = tlengine.contextual_connect()
        trans = conn2.begin()
        conn.execute(users.insert(), user_id=1, user_name='user1')
        conn.execute(users.insert(), user_id=2, user_name='user2')
        conn.execute(users.insert(), user_id=3, user_name='user3')
        trans.rollback()
        external_connection = tlengine.connect()
        result = external_connection.execute('select * from query_users')
        try:
            assert len(result.fetchall()) == 0
        finally:
            conn.close()
            conn2.close()
            external_connection.close()

    def test_commit_off_connection(self):
        """Commit a transaction begun on a contextual connection.

        After ``trans.commit()`` a separate connection must see all three
        inserted rows.
        """
        conn = tlengine.contextual_connect()
        trans = conn.begin()
        conn.execute(users.insert(), user_id=1, user_name='user1')
        conn.execute(users.insert(), user_id=2, user_name='user2')
        conn.execute(users.insert(), user_id=3, user_name='user3')
        trans.commit()
        external_connection = tlengine.connect()
        result = external_connection.execute('select * from query_users')
        try:
            assert len(result.fetchall()) == 3
        finally:
            conn.close()
            external_connection.close()

    def test_nesting_rollback(self):
        """Nested engine-level begin() calls, with a rollback at the end.

        The inner ``begin()`` is committed and the outer one rolled back;
        the external connection must then count zero rows.
        """

        external_connection = tlengine.connect()
        # the external connection must not share the contextual DBAPI
        # connection, otherwise the count below would prove nothing
        self.assert_(external_connection.connection
                     is not tlengine.contextual_connect().connection)
        tlengine.begin()
        tlengine.execute(users.insert(), user_id=1, user_name='user1')
        tlengine.execute(users.insert(), user_id=2, user_name='user2')
        tlengine.execute(users.insert(), user_id=3, user_name='user3')
        tlengine.begin()
        tlengine.execute(users.insert(), user_id=4, user_name='user4')
        tlengine.execute(users.insert(), user_id=5, user_name='user5')
        tlengine.commit()
        tlengine.rollback()
        try:
            self.assert_(external_connection.scalar(
                'select count(*) from query_users') == 0)
        finally:
            external_connection.close()

    def test_nesting_commit(self):
        """Nested engine-level begin() calls, with a commit at the end.

        Both the inner and the outer ``begin()`` are committed; the
        external connection must then count all five rows.
        """

        external_connection = tlengine.connect()
        self.assert_(external_connection.connection
                     is not tlengine.contextual_connect().connection)
        tlengine.begin()
        tlengine.execute(users.insert(), user_id=1, user_name='user1')
        tlengine.execute(users.insert(), user_id=2, user_name='user2')
        tlengine.execute(users.insert(), user_id=3, user_name='user3')
        tlengine.begin()
        tlengine.execute(users.insert(), user_id=4, user_name='user4')
        tlengine.execute(users.insert(), user_id=5, user_name='user5')
        tlengine.commit()
        tlengine.commit()
        try:
            self.assert_(external_connection.scalar(
                'select count(*) from query_users') == 5)
        finally:
            external_connection.close()

    def test_mixed_nesting(self):
        """Engine-level transactions nested inside connection-level ones.

        Two transactions are begun on the contextual connection, then two
        more via ``tlengine.begin()``.  Everything is committed except the
        outermost ``trans``, which is rolled back; the external connection
        must count zero rows.
        """

        external_connection = tlengine.connect()
        self.assert_(external_connection.connection
                     is not tlengine.contextual_connect().connection)
        conn = tlengine.contextual_connect()
        trans = conn.begin()
        trans2 = conn.begin()
        tlengine.execute(users.insert(), user_id=1, user_name='user1')
        tlengine.execute(users.insert(), user_id=2, user_name='user2')
        tlengine.execute(users.insert(), user_id=3, user_name='user3')
        tlengine.begin()
        tlengine.execute(users.insert(), user_id=4, user_name='user4')
        tlengine.begin()
        tlengine.execute(users.insert(), user_id=5, user_name='user5')
        tlengine.execute(users.insert(), user_id=6, user_name='user6')
        tlengine.execute(users.insert(), user_id=7, user_name='user7')
        tlengine.commit()
        tlengine.execute(users.insert(), user_id=8, user_name='user8')
        tlengine.commit()
        trans2.commit()
        # only the outermost transaction is rolled back
        trans.rollback()
        conn.close()
        try:
            self.assert_(external_connection.scalar(
                'select count(*) from query_users') == 0)
        finally:
            external_connection.close()

    def test_more_mixed_nesting(self):
        """Connection-level transactions nested inside engine-level ones.

        ``tlengine.begin()`` is called twice with a ``connection.begin()``
        innermost.  The inner two are committed and the outermost is rolled
        back via ``tlengine.rollback()``; the external connection must
        count zero rows.
        """

        external_connection = tlengine.connect()
        self.assert_(external_connection.connection
                     is not tlengine.contextual_connect().connection)
        tlengine.begin()
        connection = tlengine.contextual_connect()
        connection.execute(users.insert(), user_id=1, user_name='user1')
        tlengine.begin()
        connection.execute(users.insert(), user_id=2, user_name='user2')
        connection.execute(users.insert(), user_id=3, user_name='user3')
        trans = connection.begin()
        connection.execute(users.insert(), user_id=4, user_name='user4')
        connection.execute(users.insert(), user_id=5, user_name='user5')
        trans.commit()
        tlengine.commit()
        tlengine.rollback()
        connection.close()
        try:
            self.assert_(external_connection.scalar(
                'select count(*) from query_users') == 0)
        finally:
            external_connection.close()

    @testing.requires.savepoints
    def test_nested_subtransaction_rollback(self):
        """Roll back a begin_nested() block inside an outer transaction.

        Row 2 is inserted after ``begin_nested()`` and rolled back; rows 1
        and 3 are committed with the outer transaction, so only user ids
        1 and 3 remain.
        """
        tlengine.begin()
        tlengine.execute(users.insert(), user_id=1, user_name='user1')
        tlengine.begin_nested()
        tlengine.execute(users.insert(), user_id=2, user_name='user2')
        tlengine.rollback()
        tlengine.execute(users.insert(), user_id=3, user_name='user3')
        tlengine.commit()
        tlengine.close()
        eq_(tlengine.execute(select([users.c.user_id]).
                             order_by(users.c.user_id)).fetchall(),
            [(1, ), (3, )])
        tlengine.close()

    @testing.requires.savepoints
    @testing.crashes('oracle+zxjdbc',
                     'Errors out and causes subsequent tests to '
                     'deadlock')
    def test_nested_subtransaction_commit(self):
        """Commit a begin_nested() block inside an outer transaction.

        Both the nested block and the outer transaction are committed, so
        user ids 1, 2 and 3 are all present afterwards.
        """
        tlengine.begin()
        tlengine.execute(users.insert(), user_id=1, user_name='user1')
        tlengine.begin_nested()
        tlengine.execute(users.insert(), user_id=2, user_name='user2')
        tlengine.commit()
        tlengine.execute(users.insert(), user_id=3, user_name='user3')
        tlengine.commit()
        tlengine.close()
        eq_(tlengine.execute(select([users.c.user_id]).
                             order_by(users.c.user_id)).fetchall(),
            [(1, ), (2, ), (3, )])
        tlengine.close()

    @testing.requires.savepoints
    def test_rollback_to_subtransaction(self):
        """Roll back a begin() inside a begin_nested() inside a begin().

        Two consecutive ``rollback()`` calls discard rows 2 and 3; rows 1
        and 4 are committed with the outer transaction, so only user ids
        1 and 4 remain.
        """
        tlengine.begin()
        tlengine.execute(users.insert(), user_id=1, user_name='user1')
        tlengine.begin_nested()
        tlengine.execute(users.insert(), user_id=2, user_name='user2')
        tlengine.begin()
        tlengine.execute(users.insert(), user_id=3, user_name='user3')
        tlengine.rollback()
        tlengine.rollback()
        tlengine.execute(users.insert(), user_id=4, user_name='user4')
        tlengine.commit()
        tlengine.close()
        eq_(tlengine.execute(select([users.c.user_id]).
                             order_by(users.c.user_id)).fetchall(),
            [(1, ), (4, )])
        tlengine.close()

    def test_connections(self):
        """Two contextual_connect() calls share one underlying connection.

        Closing one of them must leave the other, and the engine itself,
        reporting ``closed`` as false.
        """

        c1 = tlengine.contextual_connect()
        c2 = tlengine.contextual_connect()
        assert c1.connection is c2.connection
        c2.close()
        assert not c1.closed
        assert not tlengine.closed

    @testing.requires.independent_cursors
    def test_result_closing(self):
        """Closing results releases the shared connection only at the end.

        Two results are executed on the engine.  While ``r2`` is still
        open, closing ``r1`` (even twice) must leave the shared connection
        and the engine open; closing ``r2`` as well must leave both
        reporting ``closed``.
        """

        r1 = tlengine.execute(select([1]))
        r2 = tlengine.execute(select([1]))
        row1 = r1.fetchone()
        row2 = r2.fetchone()
        r1.close()
        assert r2.connection is r1.connection
        assert not r2.connection.closed
        assert not tlengine.closed

        # close again, nothing happens since resultproxy calls close()
        # only once

        r1.close()
        assert r2.connection is r1.connection
        assert not r2.connection.closed
        assert not tlengine.closed
        r2.close()
        assert r2.connection.closed
        assert tlengine.closed

    @testing.crashes('oracle+cx_oracle',
                     'intermittent failures on the buildbot')
    def test_dispose(self):
        """Execute on a threadlocal-strategy engine, dispose it, execute again.

        There is no assertion; the test passes if the second ``execute()``
        does not raise.
        """
        eng = testing_engine(options=dict(strategy='threadlocal'))
        # NOTE(review): `result` is never read; presumably it is bound so the
        # first result stays referenced across dispose() -- confirm before
        # removing it.
        result = eng.execute(select([1]))
        eng.dispose()
        eng.execute(select([1]))

    @testing.requires.two_phase_transactions
    def test_two_phase_transaction(self):
        """Exercise begin_twophase() with and without prepare().

        Four transactions are run: prepare+commit (row 1), commit without
        prepare (row 2), rollback without prepare (row 3) and
        prepare+rollback (row 4).  Only user ids 1 and 2 must remain.
        """
        tlengine.begin_twophase()
        tlengine.execute(users.insert(), user_id=1, user_name='user1')
        tlengine.prepare()
        tlengine.commit()
        tlengine.begin_twophase()
        tlengine.execute(users.insert(), user_id=2, user_name='user2')
        tlengine.commit()
        tlengine.begin_twophase()
        tlengine.execute(users.insert(), user_id=3, user_name='user3')
        tlengine.rollback()
        tlengine.begin_twophase()
        tlengine.execute(users.insert(), user_id=4, user_name='user4')
        tlengine.prepare()
        tlengine.rollback()
        eq_(tlengine.execute(select([users.c.user_id]).
                             order_by(users.c.user_id)).fetchall(),
            [(1, ), (2, )])


class IsolationLevelTest(fixtures.TestBase):
    """Tests for getting, setting and resetting the isolation level.

    Levels are checked per engine, per connection and via execution
    options.  The expected default and non-default level names are looked
    up per backend by the two private helpers below.
    """

    __requires__ = ('isolation_level', 'ad_hoc_engines')
    __backend__ = True

    def _default_isolation_level(self):
        """Return the level name expected by default on the current backend.

        Known backends are sqlite, postgresql, mysql and mssql; any other
        backend fails the test with an assertion.
        """
        if testing.against('sqlite'):
            return 'SERIALIZABLE'
        elif testing.against('postgresql'):
            return 'READ COMMITTED'
        elif testing.against('mysql'):
            return "REPEATABLE READ"
        elif testing.against('mssql'):
            return "READ COMMITTED"
        else:
            assert False, "default isolation level not known"

    def _non_default_isolation_level(self):
        """Return a level name that differs from the backend's default.

        Known backends are sqlite, postgresql, mysql and mssql; any other
        backend fails the test with an assertion.
        """
        if testing.against('sqlite'):
            return 'READ UNCOMMITTED'
        elif testing.against('postgresql'):
            return 'SERIALIZABLE'
        elif testing.against('mysql'):
            return "SERIALIZABLE"
        elif testing.against('mssql'):
            return "SERIALIZABLE"
        else:
            assert False, "non default isolation level not known"

    def test_engine_param_stays(self):
        """An engine-level isolation_level applies to every connection.

        First confirms that a plain engine does not already report the
        non-default level, then builds an engine with that level and
        checks it on three successive connections.
        """

        eng = testing_engine()
        isolation_level = eng.dialect.get_isolation_level(
            eng.connect().connection)
        level = self._non_default_isolation_level()

        # sanity check: the level under test really differs from the default
        ne_(isolation_level, level)

        eng = testing_engine(options=dict(isolation_level=level))
        eq_(
            eng.dialect.get_isolation_level(
                eng.connect().connection),
            level
        )

        # check that it stays
        conn = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(conn.connection),
            level
        )
        conn.close()

        conn = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(conn.connection),
            level
        )
        conn.close()

    def test_default_level(self):
        """An engine built with empty options reports the default level."""
        eng = testing_engine(options=dict())
        isolation_level = eng.dialect.get_isolation_level(
            eng.connect().connection)
        eq_(isolation_level, self._default_isolation_level())

    def test_reset_level(self):
        """reset_isolation_level() restores the backend default.

        The level is read, changed with ``set_isolation_level()``, read
        again, then reset and read a final time, all on one connection.
        """
        eng = testing_engine(options=dict())
        conn = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(conn.connection),
            self._default_isolation_level()
        )

        eng.dialect.set_isolation_level(
            conn.connection, self._non_default_isolation_level()
        )
        eq_(
            eng.dialect.get_isolation_level(conn.connection),
            self._non_default_isolation_level()
        )

        eng.dialect.reset_isolation_level(conn.connection)
        eq_(
            eng.dialect.get_isolation_level(conn.connection),
            self._default_isolation_level()
        )

        conn.close()

    def test_reset_level_with_setting(self):
        """reset_isolation_level() restores the engine-configured level.

        The engine is created with the non-default level; after the
        connection is switched to the default level, a reset must bring
        back the non-default level.
        """
        eng = testing_engine(
            options=dict(
                isolation_level=self._non_default_isolation_level()))
        conn = eng.connect()
        eq_(eng.dialect.get_isolation_level(conn.connection),
            self._non_default_isolation_level())
        eng.dialect.set_isolation_level(
            conn.connection,
            self._default_isolation_level())
        eq_(eng.dialect.get_isolation_level(conn.connection),
            self._default_isolation_level())
        eng.dialect.reset_isolation_level(conn.connection)
        eq_(eng.dialect.get_isolation_level(conn.connection),
            self._non_default_isolation_level())
        conn.close()

    def test_invalid_level(self):
        """An unknown isolation_level raises ArgumentError on connect().

        The expected message lists the dialect name and the keys of the
        dialect's ``_isolation_lookup``.
        """
        eng = testing_engine(options=dict(isolation_level='FOO'))
        assert_raises_message(
            exc.ArgumentError,
            "Invalid value '%s' for isolation_level. "
            "Valid isolation levels for %s are %s" %
            ("FOO",
             eng.dialect.name, ", ".join(eng.dialect._isolation_lookup)),
            eng.connect
        )

    def test_connection_invalidated(self):
        """After invalidate(), the connection reports the default level.

        The per-connection isolation_level option is not re-applied once
        ``c2`` has been invalidated and its ``connection`` accessed again.
        """
        eng = testing_engine()
        conn = eng.connect()
        c2 = conn.execution_options(
            isolation_level=self._non_default_isolation_level())
        c2.invalidate()
        # bare attribute access, evaluated only for its side effect
        # NOTE(review): presumably this triggers the reconnect -- confirm
        c2.connection

        # TODO: do we want to rebuild the previous isolation?
        # for now, this is current behavior so we will leave it.
        eq_(c2.get_isolation_level(), self._default_isolation_level())

    def test_per_connection(self):
        """A per-connection level affects only that connection.

        Uses a QueuePool of size 2 with no overflow.  ``c1`` gets the
        non-default level while ``c2`` keeps the default; after both are
        closed, the next two connections both report the default level.
        """
        from sqlalchemy.pool import QueuePool
        eng = testing_engine(
            options=dict(
                poolclass=QueuePool,
                pool_size=2, max_overflow=0))

        c1 = eng.connect()
        c1 = c1.execution_options(
            isolation_level=self._non_default_isolation_level()
        )
        c2 = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(c1.connection),
            self._non_default_isolation_level()
        )
        eq_(
            eng.dialect.get_isolation_level(c2.connection),
            self._default_isolation_level()
        )
        c1.close()
        c2.close()
        # connections obtained after the close must not carry c1's level
        c3 = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(c3.connection),
            self._default_isolation_level()
        )
        c4 = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(c4.connection),
            self._default_isolation_level()
        )

        c3.close()
        c4.close()

    def test_warning_in_transaction(self):
        """Setting isolation_level inside a transaction emits a warning.

        The non-default level must be reported both inside the
        ``c1.begin()`` block and after it has exited.
        """
        eng = testing_engine()
        c1 = eng.connect()
        with expect_warnings(
            "Connection is already established with a Transaction; "
            "setting isolation_level may implicitly rollback or commit "
            "the existing transaction, or have no effect until next "
            "transaction"
        ):
            with c1.begin():
                c1 = c1.execution_options(
                    isolation_level=self._non_default_isolation_level()
                )

                eq_(
                    eng.dialect.get_isolation_level(c1.connection),
                    self._non_default_isolation_level()
                )
        # stays outside of transaction
        eq_(
            eng.dialect.get_isolation_level(c1.connection),
            self._non_default_isolation_level()
        )

    def test_per_statement_bzzt(self):
        """isolation_level is rejected as a per-statement execution option.

        Calling ``execution_options(isolation_level=...)`` on a select()
        must raise ArgumentError with the expected message.
        """
        assert_raises_message(
            exc.ArgumentError,
            r"'isolation_level' execution option may only be specified "
            r"on Connection.execution_options\(\), or "
            r"per-engine using the isolation_level "
            r"argument to create_engine\(\).",
            select([1]).execution_options,
            isolation_level=self._non_default_isolation_level()
        )

    def test_per_engine(self):
        """isolation_level in engine-wide execution_options is applied.

        The level is passed through the ``execution_options`` argument of
        create_engine() and checked on a new connection.
        """
        # new in 0.9
        eng = create_engine(
            testing.db.url,
            execution_options={
                'isolation_level':
                self._non_default_isolation_level()}
        )
        conn = eng.connect()
        eq_(
            eng.dialect.get_isolation_level(conn.connection),
            self._non_default_isolation_level()
        )

    def test_isolation_level_accessors_connection_default(self):
        """Both Connection accessors report the default on a plain engine.

        Checks ``default_isolation_level`` and ``get_isolation_level()``,
        each on its own connection.
        """
        eng = create_engine(
            testing.db.url
        )
        with eng.connect() as conn:
            eq_(conn.default_isolation_level, self._default_isolation_level())
        with eng.connect() as conn:
            eq_(conn.get_isolation_level(), self._default_isolation_level())

    def test_isolation_level_accessors_connection_option_modified(self):
        """Accessors after isolation_level is set via execution_options.

        ``default_isolation_level`` must still report the default, while
        ``get_isolation_level()`` on both ``conn`` and the derived ``c2``
        reports the non-default level.
        """
        eng = create_engine(
            testing.db.url
        )
        with eng.connect() as conn:
            c2 = conn.execution_options(
                isolation_level=self._non_default_isolation_level())
            eq_(conn.default_isolation_level,
                self._default_isolation_level())
            eq_(conn.get_isolation_level(),
                self._non_default_isolation_level())
            eq_(c2.get_isolation_level(),
                self._non_default_isolation_level())