diff options
Diffstat (limited to 'test/engine')
| -rw-r--r-- | test/engine/alltests.py | 2 | ||||
| -rw-r--r-- | test/engine/autoconnect_engine.py | 90 | ||||
| -rw-r--r-- | test/engine/bind.py | 49 | ||||
| -rw-r--r-- | test/engine/execute.py | 17 | ||||
| -rw-r--r-- | test/engine/metadata.py | 18 | ||||
| -rw-r--r-- | test/engine/parseconnect.py | 12 | ||||
| -rw-r--r-- | test/engine/pool.py | 44 | ||||
| -rw-r--r-- | test/engine/proxy_engine.py | 204 | ||||
| -rw-r--r-- | test/engine/reconnect.py | 12 | ||||
| -rw-r--r-- | test/engine/reflection.py | 130 | ||||
| -rw-r--r-- | test/engine/transaction.py | 294 |
11 files changed, 451 insertions, 421 deletions
diff --git a/test/engine/alltests.py b/test/engine/alltests.py index ec8a47390..a34a82ed7 100644 --- a/test/engine/alltests.py +++ b/test/engine/alltests.py @@ -10,12 +10,12 @@ def suite(): 'engine.bind', 'engine.reconnect', 'engine.execute', + 'engine.metadata', 'engine.transaction', # schema/tables 'engine.reflection', - 'engine.proxy_engine' ) alltests = unittest.TestSuite() for name in modules_to_test: diff --git a/test/engine/autoconnect_engine.py b/test/engine/autoconnect_engine.py deleted file mode 100644 index 69c2c33f5..000000000 --- a/test/engine/autoconnect_engine.py +++ /dev/null @@ -1,90 +0,0 @@ -from testbase import PersistTest -import testbase -from sqlalchemy import * -from sqlalchemy.ext.proxy import AutoConnectEngine - -import os - -# -# Define an engine, table and mapper at the module level, to show that the -# table and mapper can be used with different real engines in multiple threads -# - - -module_engine = AutoConnectEngine( testbase.db_uri ) -users = Table('users', module_engine, - Column('user_id', Integer, primary_key=True), - Column('user_name', String(16)), - Column('password', String(20)) - ) - -class User(object): - pass - - -class AutoConnectEngineTest1(PersistTest): - - def setUp(self): - clear_mappers() - objectstore.clear() - - def test_engine_connect(self): - users.create() - assign_mapper(User, users) - try: - trans = objectstore.begin() - - user = User() - user.user_name='fred' - user.password='*' - trans.commit() - - # select - sqluser = User.select_by(user_name='fred')[0] - assert sqluser.user_name == 'fred' - - # modify - sqluser.user_name = 'fred jones' - - # commit - saves everything that changed - objectstore.commit() - - allusers = [ user.user_name for user in User.select() ] - assert allusers == [ 'fred jones' ] - finally: - users.drop() - - - - -if __name__ == "__main__": - testbase.main() - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/test/engine/bind.py b/test/engine/bind.py index b9e53e6b1..6a0c78f57 100644 --- a/test/engine/bind.py +++ b/test/engine/bind.py @@ -2,12 +2,10 @@ including the deprecated versions of these arguments""" import testbase -import unittest, sys, datetime -import tables -db = testbase.db from sqlalchemy import * +from testlib import * -class BindTest(testbase.PersistTest): +class BindTest(PersistTest): def test_create_drop_explicit(self): metadata = MetaData() table = Table('test_table', metadata, @@ -17,7 +15,6 @@ class BindTest(testbase.PersistTest): testbase.db.connect() ): for args in [ - ([], {'connectable':bind}), ([], {'bind':bind}), ([bind], {}) ]: @@ -57,7 +54,7 @@ class BindTest(testbase.PersistTest): table = Table('test_table', metadata, Column('foo', Integer)) metadata.bind = bind - assert metadata.bind is metadata.engine is table.bind is table.engine is bind + assert metadata.bind is table.bind is bind metadata.create_all() assert table.exists() metadata.drop_all() @@ -70,7 +67,7 @@ class BindTest(testbase.PersistTest): Column('foo', Integer)) metadata.connect(bind) - assert metadata.bind is metadata.engine is table.bind is table.engine is bind + assert metadata.bind is table.bind is bind metadata.create_all() assert table.exists() metadata.drop_all() @@ -88,15 +85,12 @@ class BindTest(testbase.PersistTest): try: for args in ( ([bind], {}), - ([], {'engine_or_url':bind}), ([], {'bind':bind}), - ([], {'engine':bind}) ): metadata = MetaData(*args[0], **args[1]) table = Table('test_table', metadata, - Column('foo', Integer)) - - assert metadata.bind is metadata.engine is table.bind is table.engine is bind + Column('foo', Integer)) + assert metadata.bind is table.bind is bind metadata.create_all() assert table.exists() metadata.drop_all() @@ -111,7 +105,8 @@ class BindTest(testbase.PersistTest): metadata = MetaData() table = Table('test_table', metadata, Column('foo', Integer), - mysql_engine='InnoDB') + test_needs_acid=True, + ) conn = testbase.db.connect() metadata.create_all(bind=conn) try: @@ -124,7 +119,7 @@ class BindTest(testbase.PersistTest): table.insert().execute(foo=7) trans.rollback() metadata.bind = None - assert testbase.db.execute("select count(1) from test_table").scalar() == 0 + assert conn.execute("select count(1) from test_table").scalar() == 0 finally: metadata.drop_all(bind=conn) @@ -147,10 +142,7 @@ class BindTest(testbase.PersistTest): ): try: e = elem(bind=bind) - assert e.bind is e.engine is bind - e.execute() - e = elem(engine=bind) - assert e.bind is e.engine is bind + assert e.bind is bind e.execute() finally: if isinstance(bind, engine.Connection): @@ -158,16 +150,19 @@ class BindTest(testbase.PersistTest): try: e = elem() - assert e.bind is e.engine is None + assert e.bind is None e.execute() assert False except exceptions.InvalidRequestError, e: assert str(e) == "This Compiled object is not bound to any Engine or Connection." - + finally: + if isinstance(bind, engine.Connection): + bind.close() metadata.drop_all(bind=testbase.db) def test_session(self): + from sqlalchemy.orm import create_session, mapper metadata = MetaData() table = Table('test_table', metadata, Column('foo', Integer, primary_key=True), @@ -177,11 +172,13 @@ class BindTest(testbase.PersistTest): mapper(Foo, table) metadata.create_all(bind=testbase.db) try: - for bind in (testbase.db, testbase.db.connect()): + for bind in (testbase.db, + testbase.db.connect() + ): try: - for args in ({'bind':bind}, {'bind_to':bind}): + for args in ({'bind':bind},): sess = create_session(**args) - assert sess.bind is sess.bind_to is bind + assert sess.bind is bind f = Foo() sess.save(f) sess.flush() @@ -189,6 +186,9 @@ class BindTest(testbase.PersistTest): finally: if isinstance(bind, engine.Connection): bind.close() + + if isinstance(bind, engine.Connection): + bind.close() sess = create_session() f = Foo() @@ -198,8 +198,9 @@ class BindTest(testbase.PersistTest): assert False except exceptions.InvalidRequestError, e: assert str(e).startswith("Could not locate any Engine or Connection bound to mapper") - finally: + if isinstance(bind, engine.Connection): + bind.close() metadata.drop_all(bind=testbase.db) diff --git a/test/engine/execute.py b/test/engine/execute.py index 283006cfa..3d3b43f9b 100644 --- a/test/engine/execute.py +++ b/test/engine/execute.py @@ -1,19 +1,14 @@ - import testbase -import unittest, sys, datetime -import tables -db = testbase.db from sqlalchemy import * +from testlib import * - -class ExecuteTest(testbase.PersistTest): +class ExecuteTest(PersistTest): def setUpAll(self): global users, metadata metadata = MetaData(testbase.db) users = Table('users', metadata, Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), - mysql_engine='InnoDB' ) metadata.create_all() @@ -22,7 +17,7 @@ class ExecuteTest(testbase.PersistTest): def tearDownAll(self): metadata.drop_all() - @testbase.supported('sqlite') + @testing.supported('sqlite') def test_raw_qmark(self): for conn in (testbase.db, testbase.db.connect()): conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack")) @@ -34,7 +29,7 @@ class ExecuteTest(testbase.PersistTest): assert res.fetchall() == [(1, "jack"), (2, "fred"), (3, "ed"), (4, "horse"), (5, "barney"), (6, "donkey"), (7, 'sally')] conn.execute("delete from users") - @testbase.supported('mysql', 'postgres') + @testing.supported('mysql', 'postgres') def test_raw_sprintf(self): for conn in (testbase.db, testbase.db.connect()): conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"]) @@ -47,7 +42,7 @@ class ExecuteTest(testbase.PersistTest): # pyformat is supported for mysql, but skipping because a few driver # versions have a bug that bombs out on this test. (1.2.2b3, 1.2.2c1, 1.2.2) - @testbase.supported('postgres') + @testing.supported('postgres') def test_raw_python(self): for conn in (testbase.db, testbase.db.connect()): conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'}) @@ -57,7 +52,7 @@ class ExecuteTest(testbase.PersistTest): assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')] conn.execute("delete from users") - @testbase.supported('sqlite') + @testing.supported('sqlite') def test_raw_named(self): for conn in (testbase.db, testbase.db.connect()): conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':1, 'name':'jack'}) diff --git a/test/engine/metadata.py b/test/engine/metadata.py new file mode 100644 index 000000000..973007fab --- /dev/null +++ b/test/engine/metadata.py @@ -0,0 +1,18 @@ +import testbase +from sqlalchemy import * +from testlib import * + +class MetaDataTest(PersistTest): + def test_metadata_connect(self): + metadata = MetaData() + t1 = Table('table1', metadata, Column('col1', Integer, primary_key=True), + Column('col2', String(20))) + metadata.bind = testbase.db + metadata.create_all() + try: + assert t1.count().scalar() == 0 + finally: + metadata.drop_all() + +if __name__ == '__main__': + testbase.main() diff --git a/test/engine/parseconnect.py b/test/engine/parseconnect.py index 967a20ed5..3e186275d 100644 --- a/test/engine/parseconnect.py +++ b/test/engine/parseconnect.py @@ -1,8 +1,7 @@ -from testbase import PersistTest import testbase -import sqlalchemy.engine.url as url from sqlalchemy import * -import unittest +import sqlalchemy.engine.url as url +from testlib import * class ParseConnectTest(PersistTest): @@ -65,7 +64,7 @@ class CreateEngineTest(PersistTest): def testrecycle(self): dbapi = MockDBAPI(foober=12, lala=18, hoho={'this':'dict'}, fooz='somevalue') e = create_engine('postgres://', pool_recycle=472, module=dbapi) - assert e.connection_provider._pool._recycle == 472 + assert e.pool._recycle == 472 def testbadargs(self): # good arg, use MockDBAPI to prevent oracle import errors @@ -116,7 +115,6 @@ class CreateEngineTest(PersistTest): except TypeError: assert True - e = create_engine('sqlite://', echo=True) e = create_engine('mysql://', module=MockDBAPI(), connect_args={'use_unicode':True}, convert_unicode=True) e = create_engine('sqlite://', connect_args={'use_unicode':True}, convert_unicode=True) @@ -139,8 +137,8 @@ class CreateEngineTest(PersistTest): def testpoolargs(self): """test that connection pool args make it thru""" e = create_engine('postgres://', creator=None, pool_recycle=-1, echo_pool=None, auto_close_cursors=False, disallow_open_cursors=True, module=MockDBAPI()) - assert e.connection_provider._pool.auto_close_cursors is False - assert e.connection_provider._pool.disallow_open_cursors is True + assert e.pool.auto_close_cursors is False + assert e.pool.disallow_open_cursors is True # these args work for QueuePool e = create_engine('postgres://', max_overflow=8, pool_timeout=60, poolclass=pool.QueuePool, module=MockDBAPI()) diff --git a/test/engine/pool.py b/test/engine/pool.py index 85e9d59fd..364afa9d7 100644 --- a/test/engine/pool.py +++ b/test/engine/pool.py @@ -1,10 +1,9 @@ import testbase -from testbase import PersistTest -import unittest, sys, os, time -import threading, thread - +import threading, thread, time import sqlalchemy.pool as pool import sqlalchemy.exceptions as exceptions +from testlib import * + mcid = 1 class MockDBAPI(object): @@ -45,7 +44,7 @@ class PoolTest(PersistTest): connection2 = manager.connect('foo.db') connection3 = manager.connect('bar.db') - self.echo( "connection " + repr(connection)) + print "connection " + repr(connection) self.assert_(connection.cursor() is not None) self.assert_(connection is connection2) self.assert_(connection2 is not connection3) @@ -64,7 +63,7 @@ class PoolTest(PersistTest): connection = manager.connect('foo.db') connection2 = manager.connect('foo.db') - self.echo( "connection " + repr(connection)) + print "connection " + repr(connection) self.assert_(connection.cursor() is not None) self.assert_(connection is not connection2) @@ -80,7 +79,7 @@ class PoolTest(PersistTest): def status(pool): tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout()) - self.echo( "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup) + print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup return tup c1 = p.connect() @@ -160,7 +159,7 @@ class PoolTest(PersistTest): print timeouts assert len(timeouts) > 0 for t in timeouts: - assert abs(t - 3) < 1 + assert abs(t - 3) < 1, "Not all timeouts were 3 seconds: " + repr(timeouts) def _test_overflow(self, thread_count, max_overflow): def creator(): @@ -352,6 +351,35 @@ class PoolTest(PersistTest): c2 = None c1 = None self.assert_(p.checkedout() == 0) + + def test_properties(self): + dbapi = MockDBAPI() + p = pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), + pool_size=1, max_overflow=0) + + c = p.connect() + self.assert_(not c.properties) + self.assert_(c.properties is c._connection_record.properties) + + c.properties['foo'] = 'bar' + c.close() + del c + + c = p.connect() + self.assert_('foo' in c.properties) + + c.invalidate() + c = p.connect() + self.assert_('foo' not in c.properties) + + c.properties['foo2'] = 'bar2' + c.detach() + self.assert_('foo2' in c.properties) + + c2 = p.connect() + self.assert_(c.connection is not c2.connection) + self.assert_(not c2.properties) + self.assert_('foo2' in c.properties) def tearDown(self): pool.clear_managers() diff --git a/test/engine/proxy_engine.py b/test/engine/proxy_engine.py deleted file mode 100644 index 26b738e41..000000000 --- a/test/engine/proxy_engine.py +++ /dev/null @@ -1,204 +0,0 @@ -from testbase import PersistTest -import testbase -import os - -from sqlalchemy import * -from sqlalchemy.ext.proxy import ProxyEngine - - -# -# Define an engine, table and mapper at the module level, to show that the -# table and mapper can be used with different real engines in multiple threads -# - - -class ProxyTestBase(PersistTest): - def setUpAll(self): - - global users, User, module_engine, module_metadata - - module_engine = ProxyEngine(echo=testbase.echo) - module_metadata = MetaData() - - users = Table('users', module_metadata, - Column('user_id', Integer, primary_key=True), - Column('user_name', String(16)), - Column('password', String(20)) - ) - - class User(object): - pass - - User.mapper = mapper(User, users) - def tearDownAll(self): - clear_mappers() - -class ConstructTest(ProxyTestBase): - """tests that we can build SQL constructs without engine-specific parameters, particulary - oid_column, being needed, as the proxy engine is usually not connected yet.""" - - def test_join(self): - engine = ProxyEngine() - t = Table('table1', engine, - Column('col1', Integer, primary_key=True)) - t2 = Table('table2', engine, - Column('col2', Integer, ForeignKey('table1.col1'))) - j = join(t, t2) - - -class ProxyEngineTest1(ProxyTestBase): - - def test_engine_connect(self): - # connect to a real engine - module_engine.connect(testbase.db_uri) - module_metadata.create_all(module_engine) - - session = create_session(bind_to=module_engine) - try: - - user = User() - user.user_name='fred' - user.password='*' - - session.save(user) - session.flush() - - query = session.query(User) - - # select - sqluser = query.select_by(user_name='fred')[0] - assert sqluser.user_name == 'fred' - - # modify - sqluser.user_name = 'fred jones' - - # flush - saves everything that changed - session.flush() - - allusers = [ user.user_name for user in query.select() ] - assert allusers == ['fred jones'] - - finally: - module_metadata.drop_all(module_engine) - - -class ThreadProxyTest(ProxyTestBase): - - def tearDownAll(self): - try: - os.remove('threadtesta.db') - except OSError: - pass - try: - os.remove('threadtestb.db') - except OSError: - pass - - @testbase.supported('sqlite') - def test_multi_thread(self): - - from threading import Thread - from Queue import Queue - - # start 2 threads with different connection params - # and perform simultaneous operations, showing that the - # 2 threads don't share a connection - qa = Queue() - qb = Queue() - def run(db_uri, uname, queue): - def test(): - - try: - module_engine.connect(db_uri) - module_metadata.create_all(module_engine) - try: - session = create_session(bind_to=module_engine) - - query = session.query(User) - - all = list(query.select()) - assert all == [] - - u = User() - u.user_name = uname - u.password = 'whatever' - - session.save(u) - session.flush() - - names = [u.user_name for u in query.select()] - assert names == [uname] - finally: - module_metadata.drop_all(module_engine) - module_engine.get_engine().dispose() - except Exception, e: - import traceback - traceback.print_exc() - queue.put(e) - else: - queue.put(False) - return test - - a = Thread(target=run('sqlite:///threadtesta.db', 'jim', qa)) - b = Thread(target=run('sqlite:///threadtestb.db', 'joe', qb)) - - a.start() - b.start() - - # block and wait for the threads to push their results - res = qa.get() - if res != False: - raise res - - res = qb.get() - if res != False: - raise res - - -class ProxyEngineTest2(ProxyTestBase): - - def test_table_singleton_a(self): - """set up for table singleton check - """ - # - # For this 'test', create a proxy engine instance, connect it - # to a real engine, and make it do some work - # - engine = ProxyEngine() - cats = Table('cats', engine, - Column('cat_id', Integer, primary_key=True), - Column('cat_name', String)) - - engine.connect(testbase.db_uri) - - cats.create(engine) - cats.drop(engine) - - ProxyEngineTest2.cats_table_a = cats - assert isinstance(cats, Table) - - def test_table_singleton_b(self): - """check that a table on a 2nd proxy engine instance gets 2nd table - instance - """ - # - # Now create a new proxy engine instance and attach the same - # table as the first test. This should result in 2 table instances, - # since different proxy engine instances can't attach to the - # same table instance - # - engine = ProxyEngine() - cats = Table('cats', engine, - Column('cat_id', Integer, primary_key=True), - Column('cat_name', String)) - assert id(cats) != id(ProxyEngineTest2.cats_table_a) - - # the real test -- if we're still using the old engine reference, - # this will fail because the old reference's local storage will - # not have the default attributes - engine.connect(testbase.db_uri) - cats.create(engine) - cats.drop(engine) - -if __name__ == "__main__": - testbase.main() diff --git a/test/engine/reconnect.py b/test/engine/reconnect.py index defc878ab..7c213695f 100644 --- a/test/engine/reconnect.py +++ b/test/engine/reconnect.py @@ -1,6 +1,8 @@ import testbase +import sys, weakref from sqlalchemy import create_engine, exceptions -import gc, weakref, sys +from testlib import * + class MockDisconnect(Exception): pass @@ -37,7 +39,7 @@ class MockCursor(object): def close(self): pass -class ReconnectTest(testbase.PersistTest): +class ReconnectTest(PersistTest): def test_reconnect(self): """test that an 'is_disconnect' condition will invalidate the connection, and additionally dispose the previous connection pool and recreate.""" @@ -50,7 +52,7 @@ class ReconnectTest(testbase.PersistTest): # monkeypatch disconnect checker db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect) - pid = id(db.connection_provider._pool) + pid = id(db.pool) # make a connection conn = db.connect() @@ -81,7 +83,7 @@ class ReconnectTest(testbase.PersistTest): # close shouldnt break conn.close() - assert id(db.connection_provider._pool) != pid + assert id(db.pool) != pid # ensure all connections closed (pool was recycled) assert len(dbapi.connections) == 0 @@ -92,4 +94,4 @@ class ReconnectTest(testbase.PersistTest): assert len(dbapi.connections) == 1 if __name__ == '__main__': - testbase.main()
\ No newline at end of file + testbase.main() diff --git a/test/engine/reflection.py b/test/engine/reflection.py index 74ae75e2e..00c1276ee 100644 --- a/test/engine/reflection.py +++ b/test/engine/reflection.py @@ -1,13 +1,12 @@ -from testbase import PersistTest import testbase -import pickle -import sqlalchemy.ansisql as ansisql +import pickle, StringIO from sqlalchemy import * +import sqlalchemy.ansisql as ansisql from sqlalchemy.exceptions import NoSuchTableError import sqlalchemy.databases.mysql as mysql +from testlib import * -import unittest, re, StringIO class ReflectionTest(PersistTest): def testbasic(self): @@ -15,6 +14,10 @@ class ReflectionTest(PersistTest): use_string_defaults = use_function_defaults or testbase.db.engine.__module__.endswith('sqlite') + if (testbase.db.engine.name == 'mysql' and + testbase.db.dialect.get_version_info(testbase.db) < (4, 1, 1)): + return + if use_function_defaults: defval = func.current_date() deftype = Date @@ -54,14 +57,14 @@ class ReflectionTest(PersistTest): Column('test_passivedefault4', deftype3, PassiveDefault(defval3)), Column('test9', Binary(100)), Column('test_numeric', Numeric(None, None)), - mysql_engine='InnoDB' + test_needs_fk=True, ) - + addresses = Table('engine_email_addresses', meta, Column('address_id', Integer, primary_key = True), Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), Column('email_address', String(20)), - mysql_engine='InnoDB' + test_needs_fk=True, ) meta.drop_all() @@ -106,6 +109,29 @@ class ReflectionTest(PersistTest): addresses.drop() users.drop() + def test_autoload_partial(self): + meta = MetaData(testbase.db) + foo = Table('foo', meta, + Column('a', String(30)), + Column('b', String(30)), + Column('c', String(30)), + Column('d', String(30)), + Column('e', String(30)), + Column('f', String(30)), + ) + meta.create_all() + try: + meta2 = MetaData(testbase.db) + foo2 = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e']) + # test that cols come back in original order + assert [c.name for c in foo2.c] == ['b', 'e', 'f'] + for c in ('b', 'f', 'e'): + assert c in foo2.c + for c in ('a', 'c', 'd'): + assert c not in foo2.c + finally: + meta.drop_all() + def testoverridecolumns(self): """test that you can override columns which contain foreign keys to other reflected tables""" meta = MetaData(testbase.db) @@ -203,7 +229,7 @@ class ReflectionTest(PersistTest): finally: meta.drop_all() - @testbase.supported('mysql') + @testing.supported('mysql') def testmysqltypes(self): meta1 = MetaData(testbase.db) table = Table( @@ -250,7 +276,7 @@ class ReflectionTest(PersistTest): PRIMARY KEY(id) )""") try: - metadata = MetaData(engine=testbase.db) + metadata = MetaData(bind=testbase.db) book = Table('book', metadata, autoload=True) assert book.c.id in book.primary_key assert book.c.series not in book.primary_key @@ -271,7 +297,7 @@ class ReflectionTest(PersistTest): PRIMARY KEY(id, isbn) )""") try: - metadata = MetaData(engine=testbase.db) + metadata = MetaData(bind=testbase.db) book = Table('book', metadata, autoload=True) assert book.c.id in book.primary_key assert book.c.isbn in book.primary_key @@ -280,7 +306,7 @@ class ReflectionTest(PersistTest): finally: testbase.db.execute("drop table book") - @testbase.supported('sqlite') + @testing.supported('sqlite') def test_goofy_sqlite(self): """test autoload of table where quotes were used with all the colnames. quirky in sqlite.""" testbase.db.execute("""CREATE TABLE "django_content_type" ( @@ -309,7 +335,12 @@ class ReflectionTest(PersistTest): def test_composite_fk(self): """test reflection of composite foreign keys""" + + if (testbase.db.engine.name == 'mysql' and + testbase.db.dialect.get_version_info(testbase.db) < (4, 1, 1)): + return meta = MetaData(testbase.db) + table = Table( 'multi', meta, Column('multi_id', Integer, primary_key=True), @@ -317,7 +348,7 @@ class ReflectionTest(PersistTest): Column('multi_hoho', Integer, primary_key=True), Column('name', String(50), nullable=False), Column('val', String(100)), - mysql_engine='InnoDB' + test_needs_fk=True, ) table2 = Table('multi2', meta, Column('id', Integer, primary_key=True), @@ -326,7 +357,7 @@ class ReflectionTest(PersistTest): Column('lala', Integer), Column('data', String(50)), ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']), - mysql_engine='InnoDB' + test_needs_fk=True, ) assert table.c.multi_hoho meta.create_all() @@ -345,7 +376,6 @@ class ReflectionTest(PersistTest): finally: meta.drop_all() - def test_to_metadata(self): meta = MetaData() @@ -372,17 +402,17 @@ class ReflectionTest(PersistTest): def test_pickle(): meta.connect(testbase.db) meta2 = pickle.loads(pickle.dumps(meta)) - assert meta2.engine is None + assert meta2.bind is None return (meta2.tables['mytable'], meta2.tables['othertable']) def test_pickle_via_reflect(): # this is the most common use case, pickling the results of a # database reflection - meta2 = MetaData(engine=testbase.db) + meta2 = MetaData(bind=testbase.db) t1 = Table('mytable', meta2, autoload=True) t2 = Table('othertable', meta2, autoload=True) meta3 = pickle.loads(pickle.dumps(meta2)) - assert meta3.engine is None + assert meta3.bind is None assert meta3.tables['mytable'] is not t1 return (meta3.tables['mytable'], meta3.tables['othertable']) @@ -392,6 +422,8 @@ class ReflectionTest(PersistTest): table_c, table2_c = test() assert table is not table_c assert table_c.c.myid.primary_key + assert isinstance(table_c.c.myid.type, Integer) + assert isinstance(table_c.c.name.type, String) assert not table_c.c.name.nullable assert table_c.c.description.nullable assert table.primary_key is not table_c.primary_key @@ -418,14 +450,10 @@ class ReflectionTest(PersistTest): finally: meta.drop_all(testbase.db) - # mysql throws its own exception for no such table, resulting in - # a sqlalchemy.SQLError instead of sqlalchemy.NoSuchTableError. - # this could probably be fixed at some point. - @testbase.unsupported('mysql') def test_nonexistent(self): self.assertRaises(NoSuchTableError, Table, 'fake_table', - testbase.db, autoload=True) + MetaData(testbase.db), autoload=True) def testoverride(self): meta = MetaData(testbase.db) @@ -452,7 +480,7 @@ class ReflectionTest(PersistTest): finally: table.drop() - @testbase.supported('mssql') + @testing.supported('mssql') def testidentity(self): meta = MetaData(testbase.db) table = Table( @@ -505,20 +533,6 @@ class ReflectionTest(PersistTest): finally: meta.drop_all() - - meta = MetaData(testbase.db) - table = Table( - 'select', meta, - Column('col1', Integer, primary_key=True) - ) - table.create() - - meta2 = MetaData(testbase.db) - try: - table2 = Table('select', meta2, autoload=True) - finally: - table.drop() - class CreateDropTest(PersistTest): def setUpAll(self): global metadata, users @@ -558,33 +572,33 @@ class CreateDropTest(PersistTest): def testcheckfirst(self): try: assert not users.exists(testbase.db) - users.create(connectable=testbase.db) + users.create(bind=testbase.db) assert users.exists(testbase.db) - users.create(connectable=testbase.db, checkfirst=True) - users.drop(connectable=testbase.db) - users.drop(connectable=testbase.db, checkfirst=True) - assert not users.exists(connectable=testbase.db) - users.create(connectable=testbase.db, checkfirst=True) - users.drop(connectable=testbase.db) + users.create(bind=testbase.db, checkfirst=True) + users.drop(bind=testbase.db) + users.drop(bind=testbase.db, checkfirst=True) + assert not users.exists(bind=testbase.db) + users.create(bind=testbase.db, checkfirst=True) + users.drop(bind=testbase.db) finally: - metadata.drop_all(connectable=testbase.db) + metadata.drop_all(bind=testbase.db) def test_createdrop(self): - metadata.create_all(connectable=testbase.db) + metadata.create_all(bind=testbase.db) self.assertEqual( testbase.db.has_table('items'), True ) self.assertEqual( testbase.db.has_table('email_addresses'), True ) - metadata.create_all(connectable=testbase.db) + metadata.create_all(bind=testbase.db) self.assertEqual( testbase.db.has_table('items'), True ) - metadata.drop_all(connectable=testbase.db) + metadata.drop_all(bind=testbase.db) self.assertEqual( testbase.db.has_table('items'), False ) self.assertEqual( testbase.db.has_table('email_addresses'), False ) - metadata.drop_all(connectable=testbase.db) + metadata.drop_all(bind=testbase.db) self.assertEqual( testbase.db.has_table('items'), False ) class SchemaTest(PersistTest): # this test should really be in the sql tests somewhere, not engine - @testbase.unsupported('sqlite') + @testing.unsupported('sqlite') def testiteration(self): metadata = MetaData() table1 = Table('table1', metadata, @@ -607,14 +621,17 @@ class SchemaTest(PersistTest): print buf assert buf.index("CREATE TABLE someschema.table1") > -1 assert buf.index("CREATE TABLE someschema.table2") > -1 - - @testbase.unsupported('sqlite', 'postgres') - def test_create_with_defaultschema(self): + + @testing.supported('mysql','postgres') + def testcreate(self): engine = testbase.db schema = engine.dialect.get_default_schema_name(engine) + #engine.echo = True - # test reflection of tables with an explcit schemaname - # matching the default + if testbase.db.name == 'mysql': + schema = testbase.db.url.database + else: + schema = 'public' metadata = MetaData(testbase.db) table1 = Table('table1', metadata, Column('col1', Integer, primary_key=True), @@ -628,10 +645,7 @@ class SchemaTest(PersistTest): metadata.clear() table1 = Table('table1', metadata, autoload=True, schema=schema) table2 = Table('table2', metadata, autoload=True, schema=schema) - assert table1.schema == table2.schema == schema - assert len(metadata.tables) == 2 metadata.drop_all() - if __name__ == "__main__": testbase.main() diff --git a/test/engine/transaction.py b/test/engine/transaction.py index c89bf4b14..593a069a9 100644 --- a/test/engine/transaction.py +++ b/test/engine/transaction.py @@ -1,19 +1,19 @@ - import testbase -import unittest, sys, datetime -import tables -db = testbase.db +import sys, time, threading + from sqlalchemy import * +from sqlalchemy.orm import * +from testlib import * -class TransactionTest(testbase.PersistTest): +class TransactionTest(PersistTest): def setUpAll(self): global users, metadata metadata = MetaData() users = Table('query_users', metadata, Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), - mysql_engine='InnoDB' + test_needs_acid=True, ) users.create(testbase.db) @@ -114,8 +114,154 @@ class TransactionTest(testbase.PersistTest): result = connection.execute("select * from query_users") assert len(result.fetchall()) == 0 connection.close() + + @testing.unsupported('sqlite') + def testnestedsubtransactionrollback(self): + connection = testbase.db.connect() + transaction = connection.begin() + connection.execute(users.insert(), user_id=1, user_name='user1') + trans2 = connection.begin_nested() + connection.execute(users.insert(), user_id=2, user_name='user2') + trans2.rollback() + connection.execute(users.insert(), user_id=3, user_name='user3') + transaction.commit() + + self.assertEquals( + connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), + [(1,),(3,)] + ) + connection.close() + + @testing.unsupported('sqlite') + def testnestedsubtransactioncommit(self): + connection = testbase.db.connect() + transaction = connection.begin() + connection.execute(users.insert(), user_id=1, user_name='user1') + trans2 = connection.begin_nested() + connection.execute(users.insert(), user_id=2, user_name='user2') + trans2.commit() + connection.execute(users.insert(), user_id=3, user_name='user3') + transaction.commit() + + self.assertEquals( + connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), + [(1,),(2,),(3,)] + ) + connection.close() + + @testing.unsupported('sqlite') + def testrollbacktosubtransaction(self): + connection = testbase.db.connect() + transaction = connection.begin() + connection.execute(users.insert(), user_id=1, user_name='user1') + trans2 = connection.begin_nested() + connection.execute(users.insert(), user_id=2, user_name='user2') + trans3 = connection.begin() + connection.execute(users.insert(), user_id=3, user_name='user3') + trans3.rollback() + connection.execute(users.insert(), user_id=4, user_name='user4') + transaction.commit() + + self.assertEquals( + connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), + [(1,),(4,)] + ) + connection.close() + + @testing.supported('postgres', 'mysql') + def testtwophasetransaction(self): + connection = testbase.db.connect() + + transaction = connection.begin_twophase() + connection.execute(users.insert(), user_id=1, user_name='user1') + transaction.prepare() + transaction.commit() + + transaction = connection.begin_twophase() + connection.execute(users.insert(), user_id=2, user_name='user2') + transaction.commit() + + transaction = connection.begin_twophase() + connection.execute(users.insert(), user_id=3, user_name='user3') + transaction.rollback() + + transaction = connection.begin_twophase() + connection.execute(users.insert(), user_id=4, user_name='user4') + transaction.prepare() + transaction.rollback() + + self.assertEquals( + connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), + [(1,),(2,)] + ) + connection.close() + + @testing.supported('postgres', 'mysql') + def testmixedtransaction(self): + connection = testbase.db.connect() + + transaction = connection.begin_twophase() + connection.execute(users.insert(), user_id=1, user_name='user1') + + transaction2 = connection.begin() + connection.execute(users.insert(), user_id=2, user_name='user2') + + transaction3 = connection.begin_nested() + connection.execute(users.insert(), user_id=3, user_name='user3') + + transaction4 = connection.begin() + connection.execute(users.insert(), user_id=4, user_name='user4') + transaction4.commit() + + transaction3.rollback() + + connection.execute(users.insert(), user_id=5, user_name='user5') + + transaction2.commit() + + transaction.prepare() + + transaction.commit() + + self.assertEquals( + connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), + [(1,),(2,),(5,)] + ) + connection.close() -class AutoRollbackTest(testbase.PersistTest): + @testing.supported('postgres') + def testtwophaserecover(self): + # MySQL recovery doesn't currently seem to work correctly + # Prepared transactions disappear when connections are closed and even + # when they aren't it doesn't seem possible to use the recovery id. + connection = testbase.db.connect() + + transaction = connection.begin_twophase() + connection.execute(users.insert(), user_id=1, user_name='user1') + transaction.prepare() + + connection.close() + connection2 = testbase.db.connect() + + self.assertEquals( + connection2.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), + [] + ) + + recoverables = connection2.recover_twophase() + self.assertTrue( + transaction.xid in recoverables + ) + + connection2.commit_prepared(transaction.xid, recover=True) + + self.assertEquals( + connection2.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(), + [(1,)] + ) + connection2.close() + +class AutoRollbackTest(PersistTest): def setUpAll(self): global metadata metadata = MetaData() @@ -123,7 +269,7 @@ class AutoRollbackTest(testbase.PersistTest): def tearDownAll(self): metadata.drop_all(testbase.db) - @testbase.unsupported('sqlite') + @testing.unsupported('sqlite') def testrollback_deadlock(self): """test that returning connections to the pool clears any object locks.""" conn1 = testbase.db.connect() @@ -131,6 +277,7 @@ class AutoRollbackTest(testbase.PersistTest): users = Table('deadlock_users', metadata, Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), + test_needs_acid=True, ) users.create(conn1) conn1.execute("select * from deadlock_users") @@ -141,15 +288,15 @@ class AutoRollbackTest(testbase.PersistTest): users.drop(conn2) conn2.close() -class TLTransactionTest(testbase.PersistTest): +class TLTransactionTest(PersistTest): def setUpAll(self): global users, metadata, tlengine - tlengine = create_engine(testbase.db_uri, strategy='threadlocal') + tlengine = create_engine(testbase.db.url, strategy='threadlocal') metadata = MetaData() users = Table('query_users', metadata, Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), - mysql_engine='InnoDB' + test_needs_acid=True, ) users.create(tlengine) def tearDown(self): @@ -254,7 +401,7 @@ class TLTransactionTest(testbase.PersistTest): finally: external_connection.close() - @testbase.unsupported('sqlite') + @testing.unsupported('sqlite') def testnesting(self): """tests nesting of tranacstions""" external_connection = tlengine.connect() @@ -330,7 +477,7 @@ class TLTransactionTest(testbase.PersistTest): try: mapper(User, users) - sess = create_session(bind_to=tlengine) + sess = create_session(bind=tlengine) tlengine.begin() u = User() sess.save(u) @@ -347,6 +494,127 @@ class TLTransactionTest(testbase.PersistTest): assert c1.connection is c2.connection c2.close() assert c1.connection.connection is not None + +class ForUpdateTest(PersistTest): + def setUpAll(self): + global counters, metadata + metadata = MetaData() + counters = Table('forupdate_counters', metadata, + Column('counter_id', INT, primary_key = True), + Column('counter_value', INT), + test_needs_acid=True, + ) + counters.create(testbase.db) + def tearDown(self): + testbase.db.connect().execute(counters.delete()) + def tearDownAll(self): + counters.drop(testbase.db) + + def increment(self, count, errors, update_style=True, delay=0.005): + con = testbase.db.connect() + sel = counters.select(for_update=update_style, + whereclause=counters.c.counter_id==1) + + for i in xrange(count): + trans = con.begin() + try: + existing = con.execute(sel).fetchone() + incr = existing['counter_value'] + 1 + + time.sleep(delay) + con.execute(counters.update(counters.c.counter_id==1, + values={'counter_value':incr})) + time.sleep(delay) + + readback = con.execute(sel).fetchone() + if (readback['counter_value'] != incr): + raise AssertionError("Got %s post-update, expected %s" % + (readback['counter_value'], incr)) + trans.commit() + except Exception, e: + trans.rollback() + errors.append(e) + break + + con.close() + + @testing.supported('mysql', 'oracle', 'postgres') + def testqueued_update(self): + """Test SELECT FOR UPDATE with concurrent modifications. + + Runs concurrent modifications on a single row in the users table, + with each mutator trying to increment a value stored in user_name. + """ + + db = testbase.db + db.execute(counters.insert(), counter_id=1, counter_value=0) + + iterations, thread_count = 10, 5 + threads, errors = [], [] + for i in xrange(thread_count): + thread = threading.Thread(target=self.increment, + args=(iterations,), + kwargs={'errors': errors, + 'update_style': True}) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + + for e in errors: + sys.stderr.write("Failure: %s\n" % e) + + self.assert_(len(errors) == 0) + + sel = counters.select(whereclause=counters.c.counter_id==1) + final = db.execute(sel).fetchone() + self.assert_(final['counter_value'] == iterations * thread_count) + + def overlap(self, ids, errors, update_style): + sel = counters.select(for_update=update_style, + whereclause=counters.c.counter_id.in_(*ids)) + con = testbase.db.connect() + trans = con.begin() + try: + rows = con.execute(sel).fetchall() + time.sleep(0.25) + trans.commit() + except Exception, e: + trans.rollback() + errors.append(e) + + def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5): + db = testbase.db + for cid in range(pool - 1): + db.execute(counters.insert(), counter_id=cid + 1, counter_value=0) + + errors, threads = [], [] + for i in xrange(thread_count): + thread = threading.Thread(target=self.overlap, + args=(groups.pop(0), errors, update_style)) + thread.start() + threads.append(thread) + for thread in threads: + thread.join() + + return errors + + @testing.supported('mysql', 'oracle', 'postgres') + def testqueued_select(self): + """Simple SELECT FOR UPDATE conflict test""" + + errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)]) + for e in errors: + sys.stderr.write("Failure: %s\n" % e) + self.assert_(len(errors) == 0) + + @testing.supported('oracle', 'postgres') + def testnowait_select(self): + """Simple SELECT FOR UPDATE NOWAIT conflict test""" + + errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)], + update_style='nowait') + self.assert_(len(errors) != 0) if __name__ == "__main__": testbase.main() |
