summaryrefslogtreecommitdiff
path: root/test/engine
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine')
-rw-r--r--test/engine/alltests.py2
-rw-r--r--test/engine/autoconnect_engine.py90
-rw-r--r--test/engine/bind.py49
-rw-r--r--test/engine/execute.py17
-rw-r--r--test/engine/metadata.py18
-rw-r--r--test/engine/parseconnect.py12
-rw-r--r--test/engine/pool.py44
-rw-r--r--test/engine/proxy_engine.py204
-rw-r--r--test/engine/reconnect.py12
-rw-r--r--test/engine/reflection.py130
-rw-r--r--test/engine/transaction.py294
11 files changed, 451 insertions, 421 deletions
diff --git a/test/engine/alltests.py b/test/engine/alltests.py
index ec8a47390..a34a82ed7 100644
--- a/test/engine/alltests.py
+++ b/test/engine/alltests.py
@@ -10,12 +10,12 @@ def suite():
'engine.bind',
'engine.reconnect',
'engine.execute',
+ 'engine.metadata',
'engine.transaction',
# schema/tables
'engine.reflection',
- 'engine.proxy_engine'
)
alltests = unittest.TestSuite()
for name in modules_to_test:
diff --git a/test/engine/autoconnect_engine.py b/test/engine/autoconnect_engine.py
deleted file mode 100644
index 69c2c33f5..000000000
--- a/test/engine/autoconnect_engine.py
+++ /dev/null
@@ -1,90 +0,0 @@
-from testbase import PersistTest
-import testbase
-from sqlalchemy import *
-from sqlalchemy.ext.proxy import AutoConnectEngine
-
-import os
-
-#
-# Define an engine, table and mapper at the module level, to show that the
-# table and mapper can be used with different real engines in multiple threads
-#
-
-
-module_engine = AutoConnectEngine( testbase.db_uri )
-users = Table('users', module_engine,
- Column('user_id', Integer, primary_key=True),
- Column('user_name', String(16)),
- Column('password', String(20))
- )
-
-class User(object):
- pass
-
-
-class AutoConnectEngineTest1(PersistTest):
-
- def setUp(self):
- clear_mappers()
- objectstore.clear()
-
- def test_engine_connect(self):
- users.create()
- assign_mapper(User, users)
- try:
- trans = objectstore.begin()
-
- user = User()
- user.user_name='fred'
- user.password='*'
- trans.commit()
-
- # select
- sqluser = User.select_by(user_name='fred')[0]
- assert sqluser.user_name == 'fred'
-
- # modify
- sqluser.user_name = 'fred jones'
-
- # commit - saves everything that changed
- objectstore.commit()
-
- allusers = [ user.user_name for user in User.select() ]
- assert allusers == [ 'fred jones' ]
- finally:
- users.drop()
-
-
-
-
-if __name__ == "__main__":
- testbase.main()
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/test/engine/bind.py b/test/engine/bind.py
index b9e53e6b1..6a0c78f57 100644
--- a/test/engine/bind.py
+++ b/test/engine/bind.py
@@ -2,12 +2,10 @@
including the deprecated versions of these arguments"""
import testbase
-import unittest, sys, datetime
-import tables
-db = testbase.db
from sqlalchemy import *
+from testlib import *
-class BindTest(testbase.PersistTest):
+class BindTest(PersistTest):
def test_create_drop_explicit(self):
metadata = MetaData()
table = Table('test_table', metadata,
@@ -17,7 +15,6 @@ class BindTest(testbase.PersistTest):
testbase.db.connect()
):
for args in [
- ([], {'connectable':bind}),
([], {'bind':bind}),
([bind], {})
]:
@@ -57,7 +54,7 @@ class BindTest(testbase.PersistTest):
table = Table('test_table', metadata,
Column('foo', Integer))
metadata.bind = bind
- assert metadata.bind is metadata.engine is table.bind is table.engine is bind
+ assert metadata.bind is table.bind is bind
metadata.create_all()
assert table.exists()
metadata.drop_all()
@@ -70,7 +67,7 @@ class BindTest(testbase.PersistTest):
Column('foo', Integer))
metadata.connect(bind)
- assert metadata.bind is metadata.engine is table.bind is table.engine is bind
+ assert metadata.bind is table.bind is bind
metadata.create_all()
assert table.exists()
metadata.drop_all()
@@ -88,15 +85,12 @@ class BindTest(testbase.PersistTest):
try:
for args in (
([bind], {}),
- ([], {'engine_or_url':bind}),
([], {'bind':bind}),
- ([], {'engine':bind})
):
metadata = MetaData(*args[0], **args[1])
table = Table('test_table', metadata,
- Column('foo', Integer))
-
- assert metadata.bind is metadata.engine is table.bind is table.engine is bind
+ Column('foo', Integer))
+ assert metadata.bind is table.bind is bind
metadata.create_all()
assert table.exists()
metadata.drop_all()
@@ -111,7 +105,8 @@ class BindTest(testbase.PersistTest):
metadata = MetaData()
table = Table('test_table', metadata,
Column('foo', Integer),
- mysql_engine='InnoDB')
+ test_needs_acid=True,
+ )
conn = testbase.db.connect()
metadata.create_all(bind=conn)
try:
@@ -124,7 +119,7 @@ class BindTest(testbase.PersistTest):
table.insert().execute(foo=7)
trans.rollback()
metadata.bind = None
- assert testbase.db.execute("select count(1) from test_table").scalar() == 0
+ assert conn.execute("select count(1) from test_table").scalar() == 0
finally:
metadata.drop_all(bind=conn)
@@ -147,10 +142,7 @@ class BindTest(testbase.PersistTest):
):
try:
e = elem(bind=bind)
- assert e.bind is e.engine is bind
- e.execute()
- e = elem(engine=bind)
- assert e.bind is e.engine is bind
+ assert e.bind is bind
e.execute()
finally:
if isinstance(bind, engine.Connection):
@@ -158,16 +150,19 @@ class BindTest(testbase.PersistTest):
try:
e = elem()
- assert e.bind is e.engine is None
+ assert e.bind is None
e.execute()
assert False
except exceptions.InvalidRequestError, e:
assert str(e) == "This Compiled object is not bound to any Engine or Connection."
-
+
finally:
+ if isinstance(bind, engine.Connection):
+ bind.close()
metadata.drop_all(bind=testbase.db)
def test_session(self):
+ from sqlalchemy.orm import create_session, mapper
metadata = MetaData()
table = Table('test_table', metadata,
Column('foo', Integer, primary_key=True),
@@ -177,11 +172,13 @@ class BindTest(testbase.PersistTest):
mapper(Foo, table)
metadata.create_all(bind=testbase.db)
try:
- for bind in (testbase.db, testbase.db.connect()):
+ for bind in (testbase.db,
+ testbase.db.connect()
+ ):
try:
- for args in ({'bind':bind}, {'bind_to':bind}):
+ for args in ({'bind':bind},):
sess = create_session(**args)
- assert sess.bind is sess.bind_to is bind
+ assert sess.bind is bind
f = Foo()
sess.save(f)
sess.flush()
@@ -189,6 +186,9 @@ class BindTest(testbase.PersistTest):
finally:
if isinstance(bind, engine.Connection):
bind.close()
+
+ if isinstance(bind, engine.Connection):
+ bind.close()
sess = create_session()
f = Foo()
@@ -198,8 +198,9 @@ class BindTest(testbase.PersistTest):
assert False
except exceptions.InvalidRequestError, e:
assert str(e).startswith("Could not locate any Engine or Connection bound to mapper")
-
finally:
+ if isinstance(bind, engine.Connection):
+ bind.close()
metadata.drop_all(bind=testbase.db)
diff --git a/test/engine/execute.py b/test/engine/execute.py
index 283006cfa..3d3b43f9b 100644
--- a/test/engine/execute.py
+++ b/test/engine/execute.py
@@ -1,19 +1,14 @@
-
import testbase
-import unittest, sys, datetime
-import tables
-db = testbase.db
from sqlalchemy import *
+from testlib import *
-
-class ExecuteTest(testbase.PersistTest):
+class ExecuteTest(PersistTest):
def setUpAll(self):
global users, metadata
metadata = MetaData(testbase.db)
users = Table('users', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
- mysql_engine='InnoDB'
)
metadata.create_all()
@@ -22,7 +17,7 @@ class ExecuteTest(testbase.PersistTest):
def tearDownAll(self):
metadata.drop_all()
- @testbase.supported('sqlite')
+ @testing.supported('sqlite')
def test_raw_qmark(self):
for conn in (testbase.db, testbase.db.connect()):
conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack"))
@@ -34,7 +29,7 @@ class ExecuteTest(testbase.PersistTest):
assert res.fetchall() == [(1, "jack"), (2, "fred"), (3, "ed"), (4, "horse"), (5, "barney"), (6, "donkey"), (7, 'sally')]
conn.execute("delete from users")
- @testbase.supported('mysql', 'postgres')
+ @testing.supported('mysql', 'postgres')
def test_raw_sprintf(self):
for conn in (testbase.db, testbase.db.connect()):
conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"])
@@ -47,7 +42,7 @@ class ExecuteTest(testbase.PersistTest):
# pyformat is supported for mysql, but skipping because a few driver
# versions have a bug that bombs out on this test. (1.2.2b3, 1.2.2c1, 1.2.2)
- @testbase.supported('postgres')
+ @testing.supported('postgres')
def test_raw_python(self):
for conn in (testbase.db, testbase.db.connect()):
conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'})
@@ -57,7 +52,7 @@ class ExecuteTest(testbase.PersistTest):
assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
conn.execute("delete from users")
- @testbase.supported('sqlite')
+ @testing.supported('sqlite')
def test_raw_named(self):
for conn in (testbase.db, testbase.db.connect()):
conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':1, 'name':'jack'})
diff --git a/test/engine/metadata.py b/test/engine/metadata.py
new file mode 100644
index 000000000..973007fab
--- /dev/null
+++ b/test/engine/metadata.py
@@ -0,0 +1,18 @@
+import testbase
+from sqlalchemy import *
+from testlib import *
+
+class MetaDataTest(PersistTest):
+ def test_metadata_connect(self):
+ metadata = MetaData()
+ t1 = Table('table1', metadata, Column('col1', Integer, primary_key=True),
+ Column('col2', String(20)))
+ metadata.bind = testbase.db
+ metadata.create_all()
+ try:
+ assert t1.count().scalar() == 0
+ finally:
+ metadata.drop_all()
+
+if __name__ == '__main__':
+ testbase.main()
diff --git a/test/engine/parseconnect.py b/test/engine/parseconnect.py
index 967a20ed5..3e186275d 100644
--- a/test/engine/parseconnect.py
+++ b/test/engine/parseconnect.py
@@ -1,8 +1,7 @@
-from testbase import PersistTest
import testbase
-import sqlalchemy.engine.url as url
from sqlalchemy import *
-import unittest
+import sqlalchemy.engine.url as url
+from testlib import *
class ParseConnectTest(PersistTest):
@@ -65,7 +64,7 @@ class CreateEngineTest(PersistTest):
def testrecycle(self):
dbapi = MockDBAPI(foober=12, lala=18, hoho={'this':'dict'}, fooz='somevalue')
e = create_engine('postgres://', pool_recycle=472, module=dbapi)
- assert e.connection_provider._pool._recycle == 472
+ assert e.pool._recycle == 472
def testbadargs(self):
# good arg, use MockDBAPI to prevent oracle import errors
@@ -116,7 +115,6 @@ class CreateEngineTest(PersistTest):
except TypeError:
assert True
- e = create_engine('sqlite://', echo=True)
e = create_engine('mysql://', module=MockDBAPI(), connect_args={'use_unicode':True}, convert_unicode=True)
e = create_engine('sqlite://', connect_args={'use_unicode':True}, convert_unicode=True)
@@ -139,8 +137,8 @@ class CreateEngineTest(PersistTest):
def testpoolargs(self):
"""test that connection pool args make it thru"""
e = create_engine('postgres://', creator=None, pool_recycle=-1, echo_pool=None, auto_close_cursors=False, disallow_open_cursors=True, module=MockDBAPI())
- assert e.connection_provider._pool.auto_close_cursors is False
- assert e.connection_provider._pool.disallow_open_cursors is True
+ assert e.pool.auto_close_cursors is False
+ assert e.pool.disallow_open_cursors is True
# these args work for QueuePool
e = create_engine('postgres://', max_overflow=8, pool_timeout=60, poolclass=pool.QueuePool, module=MockDBAPI())
diff --git a/test/engine/pool.py b/test/engine/pool.py
index 85e9d59fd..364afa9d7 100644
--- a/test/engine/pool.py
+++ b/test/engine/pool.py
@@ -1,10 +1,9 @@
import testbase
-from testbase import PersistTest
-import unittest, sys, os, time
-import threading, thread
-
+import threading, thread, time
import sqlalchemy.pool as pool
import sqlalchemy.exceptions as exceptions
+from testlib import *
+
mcid = 1
class MockDBAPI(object):
@@ -45,7 +44,7 @@ class PoolTest(PersistTest):
connection2 = manager.connect('foo.db')
connection3 = manager.connect('bar.db')
- self.echo( "connection " + repr(connection))
+ print "connection " + repr(connection)
self.assert_(connection.cursor() is not None)
self.assert_(connection is connection2)
self.assert_(connection2 is not connection3)
@@ -64,7 +63,7 @@ class PoolTest(PersistTest):
connection = manager.connect('foo.db')
connection2 = manager.connect('foo.db')
- self.echo( "connection " + repr(connection))
+ print "connection " + repr(connection)
self.assert_(connection.cursor() is not None)
self.assert_(connection is not connection2)
@@ -80,7 +79,7 @@ class PoolTest(PersistTest):
def status(pool):
tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout())
- self.echo( "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup)
+ print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup
return tup
c1 = p.connect()
@@ -160,7 +159,7 @@ class PoolTest(PersistTest):
print timeouts
assert len(timeouts) > 0
for t in timeouts:
- assert abs(t - 3) < 1
+ assert abs(t - 3) < 1, "Not all timeouts were 3 seconds: " + repr(timeouts)
def _test_overflow(self, thread_count, max_overflow):
def creator():
@@ -352,6 +351,35 @@ class PoolTest(PersistTest):
c2 = None
c1 = None
self.assert_(p.checkedout() == 0)
+
+ def test_properties(self):
+ dbapi = MockDBAPI()
+ p = pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
+ pool_size=1, max_overflow=0)
+
+ c = p.connect()
+ self.assert_(not c.properties)
+ self.assert_(c.properties is c._connection_record.properties)
+
+ c.properties['foo'] = 'bar'
+ c.close()
+ del c
+
+ c = p.connect()
+ self.assert_('foo' in c.properties)
+
+ c.invalidate()
+ c = p.connect()
+ self.assert_('foo' not in c.properties)
+
+ c.properties['foo2'] = 'bar2'
+ c.detach()
+ self.assert_('foo2' in c.properties)
+
+ c2 = p.connect()
+ self.assert_(c.connection is not c2.connection)
+ self.assert_(not c2.properties)
+ self.assert_('foo2' in c.properties)
def tearDown(self):
pool.clear_managers()
diff --git a/test/engine/proxy_engine.py b/test/engine/proxy_engine.py
deleted file mode 100644
index 26b738e41..000000000
--- a/test/engine/proxy_engine.py
+++ /dev/null
@@ -1,204 +0,0 @@
-from testbase import PersistTest
-import testbase
-import os
-
-from sqlalchemy import *
-from sqlalchemy.ext.proxy import ProxyEngine
-
-
-#
-# Define an engine, table and mapper at the module level, to show that the
-# table and mapper can be used with different real engines in multiple threads
-#
-
-
-class ProxyTestBase(PersistTest):
- def setUpAll(self):
-
- global users, User, module_engine, module_metadata
-
- module_engine = ProxyEngine(echo=testbase.echo)
- module_metadata = MetaData()
-
- users = Table('users', module_metadata,
- Column('user_id', Integer, primary_key=True),
- Column('user_name', String(16)),
- Column('password', String(20))
- )
-
- class User(object):
- pass
-
- User.mapper = mapper(User, users)
- def tearDownAll(self):
- clear_mappers()
-
-class ConstructTest(ProxyTestBase):
- """tests that we can build SQL constructs without engine-specific parameters, particulary
- oid_column, being needed, as the proxy engine is usually not connected yet."""
-
- def test_join(self):
- engine = ProxyEngine()
- t = Table('table1', engine,
- Column('col1', Integer, primary_key=True))
- t2 = Table('table2', engine,
- Column('col2', Integer, ForeignKey('table1.col1')))
- j = join(t, t2)
-
-
-class ProxyEngineTest1(ProxyTestBase):
-
- def test_engine_connect(self):
- # connect to a real engine
- module_engine.connect(testbase.db_uri)
- module_metadata.create_all(module_engine)
-
- session = create_session(bind_to=module_engine)
- try:
-
- user = User()
- user.user_name='fred'
- user.password='*'
-
- session.save(user)
- session.flush()
-
- query = session.query(User)
-
- # select
- sqluser = query.select_by(user_name='fred')[0]
- assert sqluser.user_name == 'fred'
-
- # modify
- sqluser.user_name = 'fred jones'
-
- # flush - saves everything that changed
- session.flush()
-
- allusers = [ user.user_name for user in query.select() ]
- assert allusers == ['fred jones']
-
- finally:
- module_metadata.drop_all(module_engine)
-
-
-class ThreadProxyTest(ProxyTestBase):
-
- def tearDownAll(self):
- try:
- os.remove('threadtesta.db')
- except OSError:
- pass
- try:
- os.remove('threadtestb.db')
- except OSError:
- pass
-
- @testbase.supported('sqlite')
- def test_multi_thread(self):
-
- from threading import Thread
- from Queue import Queue
-
- # start 2 threads with different connection params
- # and perform simultaneous operations, showing that the
- # 2 threads don't share a connection
- qa = Queue()
- qb = Queue()
- def run(db_uri, uname, queue):
- def test():
-
- try:
- module_engine.connect(db_uri)
- module_metadata.create_all(module_engine)
- try:
- session = create_session(bind_to=module_engine)
-
- query = session.query(User)
-
- all = list(query.select())
- assert all == []
-
- u = User()
- u.user_name = uname
- u.password = 'whatever'
-
- session.save(u)
- session.flush()
-
- names = [u.user_name for u in query.select()]
- assert names == [uname]
- finally:
- module_metadata.drop_all(module_engine)
- module_engine.get_engine().dispose()
- except Exception, e:
- import traceback
- traceback.print_exc()
- queue.put(e)
- else:
- queue.put(False)
- return test
-
- a = Thread(target=run('sqlite:///threadtesta.db', 'jim', qa))
- b = Thread(target=run('sqlite:///threadtestb.db', 'joe', qb))
-
- a.start()
- b.start()
-
- # block and wait for the threads to push their results
- res = qa.get()
- if res != False:
- raise res
-
- res = qb.get()
- if res != False:
- raise res
-
-
-class ProxyEngineTest2(ProxyTestBase):
-
- def test_table_singleton_a(self):
- """set up for table singleton check
- """
- #
- # For this 'test', create a proxy engine instance, connect it
- # to a real engine, and make it do some work
- #
- engine = ProxyEngine()
- cats = Table('cats', engine,
- Column('cat_id', Integer, primary_key=True),
- Column('cat_name', String))
-
- engine.connect(testbase.db_uri)
-
- cats.create(engine)
- cats.drop(engine)
-
- ProxyEngineTest2.cats_table_a = cats
- assert isinstance(cats, Table)
-
- def test_table_singleton_b(self):
- """check that a table on a 2nd proxy engine instance gets 2nd table
- instance
- """
- #
- # Now create a new proxy engine instance and attach the same
- # table as the first test. This should result in 2 table instances,
- # since different proxy engine instances can't attach to the
- # same table instance
- #
- engine = ProxyEngine()
- cats = Table('cats', engine,
- Column('cat_id', Integer, primary_key=True),
- Column('cat_name', String))
- assert id(cats) != id(ProxyEngineTest2.cats_table_a)
-
- # the real test -- if we're still using the old engine reference,
- # this will fail because the old reference's local storage will
- # not have the default attributes
- engine.connect(testbase.db_uri)
- cats.create(engine)
- cats.drop(engine)
-
-if __name__ == "__main__":
- testbase.main()
diff --git a/test/engine/reconnect.py b/test/engine/reconnect.py
index defc878ab..7c213695f 100644
--- a/test/engine/reconnect.py
+++ b/test/engine/reconnect.py
@@ -1,6 +1,8 @@
import testbase
+import sys, weakref
from sqlalchemy import create_engine, exceptions
-import gc, weakref, sys
+from testlib import *
+
class MockDisconnect(Exception):
pass
@@ -37,7 +39,7 @@ class MockCursor(object):
def close(self):
pass
-class ReconnectTest(testbase.PersistTest):
+class ReconnectTest(PersistTest):
def test_reconnect(self):
"""test that an 'is_disconnect' condition will invalidate the connection, and additionally
dispose the previous connection pool and recreate."""
@@ -50,7 +52,7 @@ class ReconnectTest(testbase.PersistTest):
# monkeypatch disconnect checker
db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect)
- pid = id(db.connection_provider._pool)
+ pid = id(db.pool)
# make a connection
conn = db.connect()
@@ -81,7 +83,7 @@ class ReconnectTest(testbase.PersistTest):
# close shouldnt break
conn.close()
- assert id(db.connection_provider._pool) != pid
+ assert id(db.pool) != pid
# ensure all connections closed (pool was recycled)
assert len(dbapi.connections) == 0
@@ -92,4 +94,4 @@ class ReconnectTest(testbase.PersistTest):
assert len(dbapi.connections) == 1
if __name__ == '__main__':
- testbase.main() \ No newline at end of file
+ testbase.main()
diff --git a/test/engine/reflection.py b/test/engine/reflection.py
index 74ae75e2e..00c1276ee 100644
--- a/test/engine/reflection.py
+++ b/test/engine/reflection.py
@@ -1,13 +1,12 @@
-from testbase import PersistTest
import testbase
-import pickle
-import sqlalchemy.ansisql as ansisql
+import pickle, StringIO
from sqlalchemy import *
+import sqlalchemy.ansisql as ansisql
from sqlalchemy.exceptions import NoSuchTableError
import sqlalchemy.databases.mysql as mysql
+from testlib import *
-import unittest, re, StringIO
class ReflectionTest(PersistTest):
def testbasic(self):
@@ -15,6 +14,10 @@ class ReflectionTest(PersistTest):
use_string_defaults = use_function_defaults or testbase.db.engine.__module__.endswith('sqlite')
+ if (testbase.db.engine.name == 'mysql' and
+ testbase.db.dialect.get_version_info(testbase.db) < (4, 1, 1)):
+ return
+
if use_function_defaults:
defval = func.current_date()
deftype = Date
@@ -54,14 +57,14 @@ class ReflectionTest(PersistTest):
Column('test_passivedefault4', deftype3, PassiveDefault(defval3)),
Column('test9', Binary(100)),
Column('test_numeric', Numeric(None, None)),
- mysql_engine='InnoDB'
+ test_needs_fk=True,
)
-
+
addresses = Table('engine_email_addresses', meta,
Column('address_id', Integer, primary_key = True),
Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
Column('email_address', String(20)),
- mysql_engine='InnoDB'
+ test_needs_fk=True,
)
meta.drop_all()
@@ -106,6 +109,29 @@ class ReflectionTest(PersistTest):
addresses.drop()
users.drop()
+ def test_autoload_partial(self):
+ meta = MetaData(testbase.db)
+ foo = Table('foo', meta,
+ Column('a', String(30)),
+ Column('b', String(30)),
+ Column('c', String(30)),
+ Column('d', String(30)),
+ Column('e', String(30)),
+ Column('f', String(30)),
+ )
+ meta.create_all()
+ try:
+ meta2 = MetaData(testbase.db)
+ foo2 = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e'])
+ # test that cols come back in original order
+ assert [c.name for c in foo2.c] == ['b', 'e', 'f']
+ for c in ('b', 'f', 'e'):
+ assert c in foo2.c
+ for c in ('a', 'c', 'd'):
+ assert c not in foo2.c
+ finally:
+ meta.drop_all()
+
def testoverridecolumns(self):
"""test that you can override columns which contain foreign keys to other reflected tables"""
meta = MetaData(testbase.db)
@@ -203,7 +229,7 @@ class ReflectionTest(PersistTest):
finally:
meta.drop_all()
- @testbase.supported('mysql')
+ @testing.supported('mysql')
def testmysqltypes(self):
meta1 = MetaData(testbase.db)
table = Table(
@@ -250,7 +276,7 @@ class ReflectionTest(PersistTest):
PRIMARY KEY(id)
)""")
try:
- metadata = MetaData(engine=testbase.db)
+ metadata = MetaData(bind=testbase.db)
book = Table('book', metadata, autoload=True)
assert book.c.id in book.primary_key
assert book.c.series not in book.primary_key
@@ -271,7 +297,7 @@ class ReflectionTest(PersistTest):
PRIMARY KEY(id, isbn)
)""")
try:
- metadata = MetaData(engine=testbase.db)
+ metadata = MetaData(bind=testbase.db)
book = Table('book', metadata, autoload=True)
assert book.c.id in book.primary_key
assert book.c.isbn in book.primary_key
@@ -280,7 +306,7 @@ class ReflectionTest(PersistTest):
finally:
testbase.db.execute("drop table book")
- @testbase.supported('sqlite')
+ @testing.supported('sqlite')
def test_goofy_sqlite(self):
"""test autoload of table where quotes were used with all the colnames. quirky in sqlite."""
testbase.db.execute("""CREATE TABLE "django_content_type" (
@@ -309,7 +335,12 @@ class ReflectionTest(PersistTest):
def test_composite_fk(self):
"""test reflection of composite foreign keys"""
+
+ if (testbase.db.engine.name == 'mysql' and
+ testbase.db.dialect.get_version_info(testbase.db) < (4, 1, 1)):
+ return
meta = MetaData(testbase.db)
+
table = Table(
'multi', meta,
Column('multi_id', Integer, primary_key=True),
@@ -317,7 +348,7 @@ class ReflectionTest(PersistTest):
Column('multi_hoho', Integer, primary_key=True),
Column('name', String(50), nullable=False),
Column('val', String(100)),
- mysql_engine='InnoDB'
+ test_needs_fk=True,
)
table2 = Table('multi2', meta,
Column('id', Integer, primary_key=True),
@@ -326,7 +357,7 @@ class ReflectionTest(PersistTest):
Column('lala', Integer),
Column('data', String(50)),
ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']),
- mysql_engine='InnoDB'
+ test_needs_fk=True,
)
assert table.c.multi_hoho
meta.create_all()
@@ -345,7 +376,6 @@ class ReflectionTest(PersistTest):
finally:
meta.drop_all()
-
def test_to_metadata(self):
meta = MetaData()
@@ -372,17 +402,17 @@ class ReflectionTest(PersistTest):
def test_pickle():
meta.connect(testbase.db)
meta2 = pickle.loads(pickle.dumps(meta))
- assert meta2.engine is None
+ assert meta2.bind is None
return (meta2.tables['mytable'], meta2.tables['othertable'])
def test_pickle_via_reflect():
# this is the most common use case, pickling the results of a
# database reflection
- meta2 = MetaData(engine=testbase.db)
+ meta2 = MetaData(bind=testbase.db)
t1 = Table('mytable', meta2, autoload=True)
t2 = Table('othertable', meta2, autoload=True)
meta3 = pickle.loads(pickle.dumps(meta2))
- assert meta3.engine is None
+ assert meta3.bind is None
assert meta3.tables['mytable'] is not t1
return (meta3.tables['mytable'], meta3.tables['othertable'])
@@ -392,6 +422,8 @@ class ReflectionTest(PersistTest):
table_c, table2_c = test()
assert table is not table_c
assert table_c.c.myid.primary_key
+ assert isinstance(table_c.c.myid.type, Integer)
+ assert isinstance(table_c.c.name.type, String)
assert not table_c.c.name.nullable
assert table_c.c.description.nullable
assert table.primary_key is not table_c.primary_key
@@ -418,14 +450,10 @@ class ReflectionTest(PersistTest):
finally:
meta.drop_all(testbase.db)
- # mysql throws its own exception for no such table, resulting in
- # a sqlalchemy.SQLError instead of sqlalchemy.NoSuchTableError.
- # this could probably be fixed at some point.
- @testbase.unsupported('mysql')
def test_nonexistent(self):
self.assertRaises(NoSuchTableError, Table,
'fake_table',
- testbase.db, autoload=True)
+ MetaData(testbase.db), autoload=True)
def testoverride(self):
meta = MetaData(testbase.db)
@@ -452,7 +480,7 @@ class ReflectionTest(PersistTest):
finally:
table.drop()
- @testbase.supported('mssql')
+ @testing.supported('mssql')
def testidentity(self):
meta = MetaData(testbase.db)
table = Table(
@@ -505,20 +533,6 @@ class ReflectionTest(PersistTest):
finally:
meta.drop_all()
-
- meta = MetaData(testbase.db)
- table = Table(
- 'select', meta,
- Column('col1', Integer, primary_key=True)
- )
- table.create()
-
- meta2 = MetaData(testbase.db)
- try:
- table2 = Table('select', meta2, autoload=True)
- finally:
- table.drop()
-
class CreateDropTest(PersistTest):
def setUpAll(self):
global metadata, users
@@ -558,33 +572,33 @@ class CreateDropTest(PersistTest):
def testcheckfirst(self):
try:
assert not users.exists(testbase.db)
- users.create(connectable=testbase.db)
+ users.create(bind=testbase.db)
assert users.exists(testbase.db)
- users.create(connectable=testbase.db, checkfirst=True)
- users.drop(connectable=testbase.db)
- users.drop(connectable=testbase.db, checkfirst=True)
- assert not users.exists(connectable=testbase.db)
- users.create(connectable=testbase.db, checkfirst=True)
- users.drop(connectable=testbase.db)
+ users.create(bind=testbase.db, checkfirst=True)
+ users.drop(bind=testbase.db)
+ users.drop(bind=testbase.db, checkfirst=True)
+ assert not users.exists(bind=testbase.db)
+ users.create(bind=testbase.db, checkfirst=True)
+ users.drop(bind=testbase.db)
finally:
- metadata.drop_all(connectable=testbase.db)
+ metadata.drop_all(bind=testbase.db)
def test_createdrop(self):
- metadata.create_all(connectable=testbase.db)
+ metadata.create_all(bind=testbase.db)
self.assertEqual( testbase.db.has_table('items'), True )
self.assertEqual( testbase.db.has_table('email_addresses'), True )
- metadata.create_all(connectable=testbase.db)
+ metadata.create_all(bind=testbase.db)
self.assertEqual( testbase.db.has_table('items'), True )
- metadata.drop_all(connectable=testbase.db)
+ metadata.drop_all(bind=testbase.db)
self.assertEqual( testbase.db.has_table('items'), False )
self.assertEqual( testbase.db.has_table('email_addresses'), False )
- metadata.drop_all(connectable=testbase.db)
+ metadata.drop_all(bind=testbase.db)
self.assertEqual( testbase.db.has_table('items'), False )
class SchemaTest(PersistTest):
# this test should really be in the sql tests somewhere, not engine
- @testbase.unsupported('sqlite')
+ @testing.unsupported('sqlite')
def testiteration(self):
metadata = MetaData()
table1 = Table('table1', metadata,
@@ -607,14 +621,17 @@ class SchemaTest(PersistTest):
print buf
assert buf.index("CREATE TABLE someschema.table1") > -1
assert buf.index("CREATE TABLE someschema.table2") > -1
-
- @testbase.unsupported('sqlite', 'postgres')
- def test_create_with_defaultschema(self):
+
+ @testing.supported('mysql','postgres')
+ def testcreate(self):
engine = testbase.db
schema = engine.dialect.get_default_schema_name(engine)
+ #engine.echo = True
- # test reflection of tables with an explcit schemaname
- # matching the default
+ if testbase.db.name == 'mysql':
+ schema = testbase.db.url.database
+ else:
+ schema = 'public'
metadata = MetaData(testbase.db)
table1 = Table('table1', metadata,
Column('col1', Integer, primary_key=True),
@@ -628,10 +645,7 @@ class SchemaTest(PersistTest):
metadata.clear()
table1 = Table('table1', metadata, autoload=True, schema=schema)
table2 = Table('table2', metadata, autoload=True, schema=schema)
- assert table1.schema == table2.schema == schema
- assert len(metadata.tables) == 2
metadata.drop_all()
-
if __name__ == "__main__":
testbase.main()
diff --git a/test/engine/transaction.py b/test/engine/transaction.py
index c89bf4b14..593a069a9 100644
--- a/test/engine/transaction.py
+++ b/test/engine/transaction.py
@@ -1,19 +1,19 @@
-
import testbase
-import unittest, sys, datetime
-import tables
-db = testbase.db
+import sys, time, threading
+
from sqlalchemy import *
+from sqlalchemy.orm import *
+from testlib import *
-class TransactionTest(testbase.PersistTest):
+class TransactionTest(PersistTest):
def setUpAll(self):
global users, metadata
metadata = MetaData()
users = Table('query_users', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
- mysql_engine='InnoDB'
+ test_needs_acid=True,
)
users.create(testbase.db)
@@ -114,8 +114,154 @@ class TransactionTest(testbase.PersistTest):
result = connection.execute("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
+
+ @testing.unsupported('sqlite')
+ def testnestedsubtransactionrollback(self):
+ connection = testbase.db.connect()
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name='user1')
+ trans2 = connection.begin_nested()
+ connection.execute(users.insert(), user_id=2, user_name='user2')
+ trans2.rollback()
+ connection.execute(users.insert(), user_id=3, user_name='user3')
+ transaction.commit()
+
+ self.assertEquals(
+ connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
+ [(1,),(3,)]
+ )
+ connection.close()
+
+ @testing.unsupported('sqlite')
+ def testnestedsubtransactioncommit(self):
+ connection = testbase.db.connect()
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name='user1')
+ trans2 = connection.begin_nested()
+ connection.execute(users.insert(), user_id=2, user_name='user2')
+ trans2.commit()
+ connection.execute(users.insert(), user_id=3, user_name='user3')
+ transaction.commit()
+
+ self.assertEquals(
+ connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
+ [(1,),(2,),(3,)]
+ )
+ connection.close()
+
+ @testing.unsupported('sqlite')
+ def testrollbacktosubtransaction(self):
+ connection = testbase.db.connect()
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name='user1')
+ trans2 = connection.begin_nested()
+ connection.execute(users.insert(), user_id=2, user_name='user2')
+ trans3 = connection.begin()
+ connection.execute(users.insert(), user_id=3, user_name='user3')
+ trans3.rollback()
+ connection.execute(users.insert(), user_id=4, user_name='user4')
+ transaction.commit()
+
+ self.assertEquals(
+ connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
+ [(1,),(4,)]
+ )
+ connection.close()
+
+ @testing.supported('postgres', 'mysql')
+ def testtwophasetransaction(self):
+ connection = testbase.db.connect()
+
+ transaction = connection.begin_twophase()
+ connection.execute(users.insert(), user_id=1, user_name='user1')
+ transaction.prepare()
+ transaction.commit()
+
+ transaction = connection.begin_twophase()
+ connection.execute(users.insert(), user_id=2, user_name='user2')
+ transaction.commit()
+
+ transaction = connection.begin_twophase()
+ connection.execute(users.insert(), user_id=3, user_name='user3')
+ transaction.rollback()
+
+ transaction = connection.begin_twophase()
+ connection.execute(users.insert(), user_id=4, user_name='user4')
+ transaction.prepare()
+ transaction.rollback()
+
+ self.assertEquals(
+ connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
+ [(1,),(2,)]
+ )
+ connection.close()
+
+ @testing.supported('postgres', 'mysql')
+ def testmixedtransaction(self):
+ connection = testbase.db.connect()
+
+ transaction = connection.begin_twophase()
+ connection.execute(users.insert(), user_id=1, user_name='user1')
+
+ transaction2 = connection.begin()
+ connection.execute(users.insert(), user_id=2, user_name='user2')
+
+ transaction3 = connection.begin_nested()
+ connection.execute(users.insert(), user_id=3, user_name='user3')
+
+ transaction4 = connection.begin()
+ connection.execute(users.insert(), user_id=4, user_name='user4')
+ transaction4.commit()
+
+ transaction3.rollback()
+
+ connection.execute(users.insert(), user_id=5, user_name='user5')
+
+ transaction2.commit()
+
+ transaction.prepare()
+
+ transaction.commit()
+
+ self.assertEquals(
+ connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
+ [(1,),(2,),(5,)]
+ )
+ connection.close()
-class AutoRollbackTest(testbase.PersistTest):
+ @testing.supported('postgres')
+ def testtwophaserecover(self):
+ # MySQL recovery doesn't currently seem to work correctly
+ # Prepared transactions disappear when connections are closed and even
+ # when they aren't it doesn't seem possible to use the recovery id.
+ connection = testbase.db.connect()
+
+ transaction = connection.begin_twophase()
+ connection.execute(users.insert(), user_id=1, user_name='user1')
+ transaction.prepare()
+
+ connection.close()
+ connection2 = testbase.db.connect()
+
+ self.assertEquals(
+ connection2.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
+ []
+ )
+
+ recoverables = connection2.recover_twophase()
+ self.assertTrue(
+ transaction.xid in recoverables
+ )
+
+ connection2.commit_prepared(transaction.xid, recover=True)
+
+ self.assertEquals(
+ connection2.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
+ [(1,)]
+ )
+ connection2.close()
+
+class AutoRollbackTest(PersistTest):
def setUpAll(self):
global metadata
metadata = MetaData()
@@ -123,7 +269,7 @@ class AutoRollbackTest(testbase.PersistTest):
def tearDownAll(self):
metadata.drop_all(testbase.db)
- @testbase.unsupported('sqlite')
+ @testing.unsupported('sqlite')
def testrollback_deadlock(self):
"""test that returning connections to the pool clears any object locks."""
conn1 = testbase.db.connect()
@@ -131,6 +277,7 @@ class AutoRollbackTest(testbase.PersistTest):
users = Table('deadlock_users', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
+ test_needs_acid=True,
)
users.create(conn1)
conn1.execute("select * from deadlock_users")
@@ -141,15 +288,15 @@ class AutoRollbackTest(testbase.PersistTest):
users.drop(conn2)
conn2.close()
-class TLTransactionTest(testbase.PersistTest):
+class TLTransactionTest(PersistTest):
def setUpAll(self):
global users, metadata, tlengine
- tlengine = create_engine(testbase.db_uri, strategy='threadlocal')
+ tlengine = create_engine(testbase.db.url, strategy='threadlocal')
metadata = MetaData()
users = Table('query_users', metadata,
Column('user_id', INT, primary_key = True),
Column('user_name', VARCHAR(20)),
- mysql_engine='InnoDB'
+ test_needs_acid=True,
)
users.create(tlengine)
def tearDown(self):
@@ -254,7 +401,7 @@ class TLTransactionTest(testbase.PersistTest):
finally:
external_connection.close()
- @testbase.unsupported('sqlite')
+ @testing.unsupported('sqlite')
def testnesting(self):
"""tests nesting of tranacstions"""
external_connection = tlengine.connect()
@@ -330,7 +477,7 @@ class TLTransactionTest(testbase.PersistTest):
try:
mapper(User, users)
- sess = create_session(bind_to=tlengine)
+ sess = create_session(bind=tlengine)
tlengine.begin()
u = User()
sess.save(u)
@@ -347,6 +494,127 @@ class TLTransactionTest(testbase.PersistTest):
assert c1.connection is c2.connection
c2.close()
assert c1.connection.connection is not None
+
+class ForUpdateTest(PersistTest):
+ def setUpAll(self):
+ global counters, metadata
+ metadata = MetaData()
+ counters = Table('forupdate_counters', metadata,
+ Column('counter_id', INT, primary_key = True),
+ Column('counter_value', INT),
+ test_needs_acid=True,
+ )
+ counters.create(testbase.db)
+ def tearDown(self):
+ testbase.db.connect().execute(counters.delete())
+ def tearDownAll(self):
+ counters.drop(testbase.db)
+
+ def increment(self, count, errors, update_style=True, delay=0.005):
+ con = testbase.db.connect()
+ sel = counters.select(for_update=update_style,
+ whereclause=counters.c.counter_id==1)
+
+ for i in xrange(count):
+ trans = con.begin()
+ try:
+ existing = con.execute(sel).fetchone()
+ incr = existing['counter_value'] + 1
+
+ time.sleep(delay)
+ con.execute(counters.update(counters.c.counter_id==1,
+ values={'counter_value':incr}))
+ time.sleep(delay)
+
+ readback = con.execute(sel).fetchone()
+ if (readback['counter_value'] != incr):
+ raise AssertionError("Got %s post-update, expected %s" %
+ (readback['counter_value'], incr))
+ trans.commit()
+ except Exception, e:
+ trans.rollback()
+ errors.append(e)
+ break
+
+ con.close()
+
+ @testing.supported('mysql', 'oracle', 'postgres')
+ def testqueued_update(self):
+ """Test SELECT FOR UPDATE with concurrent modifications.
+
+ Runs concurrent modifications on a single row in the users table,
+ with each mutator trying to increment a value stored in user_name.
+ """
+
+ db = testbase.db
+ db.execute(counters.insert(), counter_id=1, counter_value=0)
+
+ iterations, thread_count = 10, 5
+ threads, errors = [], []
+ for i in xrange(thread_count):
+ thread = threading.Thread(target=self.increment,
+ args=(iterations,),
+ kwargs={'errors': errors,
+ 'update_style': True})
+ thread.start()
+ threads.append(thread)
+ for thread in threads:
+ thread.join()
+
+ for e in errors:
+ sys.stderr.write("Failure: %s\n" % e)
+
+ self.assert_(len(errors) == 0)
+
+ sel = counters.select(whereclause=counters.c.counter_id==1)
+ final = db.execute(sel).fetchone()
+ self.assert_(final['counter_value'] == iterations * thread_count)
+
+ def overlap(self, ids, errors, update_style):
+ sel = counters.select(for_update=update_style,
+ whereclause=counters.c.counter_id.in_(*ids))
+ con = testbase.db.connect()
+ trans = con.begin()
+ try:
+ rows = con.execute(sel).fetchall()
+ time.sleep(0.25)
+ trans.commit()
+ except Exception, e:
+ trans.rollback()
+ errors.append(e)
+
+ def _threaded_overlap(self, thread_count, groups, update_style=True, pool=5):
+ db = testbase.db
+ for cid in range(pool - 1):
+ db.execute(counters.insert(), counter_id=cid + 1, counter_value=0)
+
+ errors, threads = [], []
+ for i in xrange(thread_count):
+ thread = threading.Thread(target=self.overlap,
+ args=(groups.pop(0), errors, update_style))
+ thread.start()
+ threads.append(thread)
+ for thread in threads:
+ thread.join()
+
+ return errors
+
+ @testing.supported('mysql', 'oracle', 'postgres')
+ def testqueued_select(self):
+ """Simple SELECT FOR UPDATE conflict test"""
+
+ errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)])
+ for e in errors:
+ sys.stderr.write("Failure: %s\n" % e)
+ self.assert_(len(errors) == 0)
+
+ @testing.supported('oracle', 'postgres')
+ def testnowait_select(self):
+ """Simple SELECT FOR UPDATE NOWAIT conflict test"""
+
+ errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)],
+ update_style='nowait')
+ self.assert_(len(errors) != 0)
if __name__ == "__main__":
testbase.main()