summaryrefslogtreecommitdiff
path: root/test/sql/test_defaults.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2009-06-10 21:18:24 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2009-06-10 21:18:24 +0000
commit45cec095b4904ba71425d2fe18c143982dd08f43 (patch)
treeaf5e540fdcbf1cb2a3337157d69d4b40be010fa8 /test/sql/test_defaults.py
parent698a3c1ac665e7cd2ef8d5ad3ebf51b7fe6661f4 (diff)
downloadsqlalchemy-45cec095b4904ba71425d2fe18c143982dd08f43.tar.gz
- unit tests have been migrated from unittest to nose.
See README.unittests for information on how to run the tests. [ticket:970]
Diffstat (limited to 'test/sql/test_defaults.py')
-rw-r--r--test/sql/test_defaults.py641
1 files changed, 641 insertions, 0 deletions
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py
new file mode 100644
index 000000000..964157466
--- /dev/null
+++ b/test/sql/test_defaults.py
@@ -0,0 +1,641 @@
+from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
+import datetime
+from sqlalchemy import Sequence, Column, func
+from sqlalchemy.sql import select, text
+import sqlalchemy as sa
+from sqlalchemy.test import testing
+from sqlalchemy import MetaData, Integer, String, ForeignKey, Boolean
+from sqlalchemy.test.schema import Table
+from sqlalchemy.test.testing import eq_
+from test.sql import _base
+
+
+class DefaultTest(testing.TestBase):
+
+ @classmethod
+ def setup_class(cls):
+ global t, f, f2, ts, currenttime, metadata, default_generator
+
+ db = testing.db
+ metadata = MetaData(db)
+ default_generator = {'x':50}
+
+ def mydefault():
+ default_generator['x'] += 1
+ return default_generator['x']
+
+ def myupdate_with_ctx(ctx):
+ conn = ctx.connection
+ return conn.execute(sa.select([sa.text('13')])).scalar()
+
+ def mydefault_using_connection(ctx):
+ conn = ctx.connection
+ try:
+ return conn.execute(sa.select([sa.text('12')])).scalar()
+ finally:
+ # ensure a "close()" on this connection does nothing,
+ # since its a "branched" connection
+ conn.close()
+
+ use_function_defaults = testing.against('postgres', 'mssql', 'maxdb')
+ is_oracle = testing.against('oracle')
+
+ # select "count(1)" returns different results on different DBs also
+ # correct for "current_date" compatible as column default, value
+ # differences
+ currenttime = func.current_date(type_=sa.Date, bind=db)
+ if is_oracle:
+ ts = db.scalar(sa.select([func.trunc(func.sysdate(), sa.literal_column("'DAY'"), type_=sa.Date).label('today')]))
+ assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime)
+ f = sa.select([func.length('abcdef')], bind=db).scalar()
+ f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar()
+ # TODO: engine propigation across nested functions not working
+ currenttime = func.trunc(currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date)
+ def1 = currenttime
+ def2 = func.trunc(sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date)
+
+ deftype = sa.Date
+ elif use_function_defaults:
+ f = sa.select([func.length('abcdef')], bind=db).scalar()
+ f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar()
+ def1 = currenttime
+ deftype = sa.Date
+ if testing.against('maxdb'):
+ def2 = sa.text("curdate")
+ elif testing.against('mssql'):
+ def2 = sa.text("getdate()")
+ else:
+ def2 = sa.text("current_date")
+ ts = db.func.current_date().scalar()
+ else:
+ f = len('abcdef')
+ f2 = len('abcdefghijk')
+ def1 = def2 = "3"
+ ts = 3
+ deftype = Integer
+
+ t = Table('default_test1', metadata,
+ # python function
+ Column('col1', Integer, primary_key=True,
+ default=mydefault),
+
+ # python literal
+ Column('col2', String(20),
+ default="imthedefault",
+ onupdate="im the update"),
+
+ # preexecute expression
+ Column('col3', Integer,
+ default=func.length('abcdef'),
+ onupdate=func.length('abcdefghijk')),
+
+ # SQL-side default from sql expression
+ Column('col4', deftype,
+ server_default=def1),
+
+ # SQL-side default from literal expression
+ Column('col5', deftype,
+ server_default=def2),
+
+ # preexecute + update timestamp
+ Column('col6', sa.Date,
+ default=currenttime,
+ onupdate=currenttime),
+
+ Column('boolcol1', sa.Boolean, default=True),
+ Column('boolcol2', sa.Boolean, default=False),
+
+ # python function which uses ExecutionContext
+ Column('col7', Integer,
+ default=mydefault_using_connection,
+ onupdate=myupdate_with_ctx),
+
+ # python builtin
+ Column('col8', sa.Date,
+ default=datetime.date.today,
+ onupdate=datetime.date.today),
+ # combo
+ Column('col9', String(20),
+ default='py',
+ server_default='ddl'))
+ t.create()
+
+ @classmethod
+ def teardown_class(cls):
+ t.drop()
+
+ def teardown(self):
+ default_generator['x'] = 50
+ t.delete().execute()
+
+ def test_bad_arg_signature(self):
+ ex_msg = \
+ "ColumnDefault Python function takes zero or one positional arguments"
+
+ def fn1(x, y): pass
+ def fn2(x, y, z=3): pass
+ class fn3(object):
+ def __init__(self, x, y):
+ pass
+ class FN4(object):
+ def __call__(self, x, y):
+ pass
+ fn4 = FN4()
+
+ for fn in fn1, fn2, fn3, fn4:
+ assert_raises_message(sa.exc.ArgumentError,
+ ex_msg,
+ sa.ColumnDefault, fn)
+
+ def test_arg_signature(self):
+ def fn1(): pass
+ def fn2(): pass
+ def fn3(x=1): pass
+ def fn4(x=1, y=2, z=3): pass
+ fn5 = list
+ class fn6(object):
+ def __init__(self, x):
+ pass
+ class fn6(object):
+ def __init__(self, x, y=3):
+ pass
+ class FN7(object):
+ def __call__(self, x):
+ pass
+ fn7 = FN7()
+ class FN8(object):
+ def __call__(self, x, y=3):
+ pass
+ fn8 = FN8()
+
+ for fn in fn1, fn2, fn3, fn4, fn5, fn6, fn7, fn8:
+ c = sa.ColumnDefault(fn)
+
+ @testing.fails_on('firebird', 'Data type unknown')
+ def test_standalone(self):
+ c = testing.db.engine.contextual_connect()
+ x = c.execute(t.c.col1.default)
+ y = t.c.col2.default.execute()
+ z = c.execute(t.c.col3.default)
+ assert 50 <= x <= 57
+ eq_(y, 'imthedefault')
+ eq_(z, f)
+ eq_(f2, 11)
+
+ def test_py_vs_server_default_detection(self):
+
+ def has_(name, *wanted):
+ slots = ['default', 'onupdate', 'server_default', 'server_onupdate']
+ col = tbl.c[name]
+ for slot in wanted:
+ slots.remove(slot)
+ assert getattr(col, slot) is not None, getattr(col, slot)
+ for slot in slots:
+ assert getattr(col, slot) is None, getattr(col, slot)
+
+ tbl = t
+ has_('col1', 'default')
+ has_('col2', 'default', 'onupdate')
+ has_('col3', 'default', 'onupdate')
+ has_('col4', 'server_default')
+ has_('col5', 'server_default')
+ has_('col6', 'default', 'onupdate')
+ has_('boolcol1', 'default')
+ has_('boolcol2', 'default')
+ has_('col7', 'default', 'onupdate')
+ has_('col8', 'default', 'onupdate')
+ has_('col9', 'default', 'server_default')
+
+ ColumnDefault, DefaultClause = sa.ColumnDefault, sa.DefaultClause
+
+ t2 = Table('t2', MetaData(),
+ Column('col1', Integer, Sequence('foo')),
+ Column('col2', Integer,
+ default=Sequence('foo'),
+ server_default='y'),
+ Column('col3', Integer,
+ Sequence('foo'),
+ server_default='x'),
+ Column('col4', Integer,
+ ColumnDefault('x'),
+ DefaultClause('y')),
+ Column('col4', Integer,
+ ColumnDefault('x'),
+ DefaultClause('y'),
+ DefaultClause('y', for_update=True)),
+ Column('col5', Integer,
+ ColumnDefault('x'),
+ DefaultClause('y'),
+ onupdate='z'),
+ Column('col6', Integer,
+ ColumnDefault('x'),
+ server_default='y',
+ onupdate='z'),
+ Column('col7', Integer,
+ default='x',
+ server_default='y',
+ onupdate='z'),
+ Column('col8', Integer,
+ server_onupdate='u',
+ default='x',
+ server_default='y',
+ onupdate='z'))
+ tbl = t2
+ has_('col1', 'default')
+ has_('col2', 'default', 'server_default')
+ has_('col3', 'default', 'server_default')
+ has_('col4', 'default', 'server_default', 'server_onupdate')
+ has_('col5', 'default', 'server_default', 'onupdate')
+ has_('col6', 'default', 'server_default', 'onupdate')
+ has_('col7', 'default', 'server_default', 'onupdate')
+ has_('col8', 'default', 'server_default', 'onupdate', 'server_onupdate')
+
+ @testing.fails_on('firebird', 'Data type unknown')
+ def test_insert(self):
+ r = t.insert().execute()
+ assert r.lastrow_has_defaults()
+ eq_(set(r.context.postfetch_cols),
+ set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+
+ r = t.insert(inline=True).execute()
+ assert r.lastrow_has_defaults()
+ eq_(set(r.context.postfetch_cols),
+ set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+
+ t.insert().execute()
+
+ ctexec = sa.select([currenttime.label('now')], bind=testing.db).scalar()
+ l = t.select().order_by(t.c.col1).execute()
+ today = datetime.date.today()
+ eq_(l.fetchall(), [
+ (x, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py')
+ for x in range(51, 54)])
+
+ t.insert().execute(col9=None)
+ assert r.lastrow_has_defaults()
+ eq_(set(r.context.postfetch_cols),
+ set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+
+ eq_(t.select(t.c.col1==54).execute().fetchall(),
+ [(54, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, None)])
+
+ @testing.fails_on('firebird', 'Data type unknown')
+ def test_insertmany(self):
+ # MySQL-Python 1.2.2 breaks functions in execute_many :(
+ if (testing.against('mysql') and
+ testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
+ return
+
+ r = t.insert().execute({}, {}, {})
+
+ ctexec = currenttime.scalar()
+ l = t.select().execute()
+ today = datetime.date.today()
+ eq_(l.fetchall(),
+ [(51, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py'),
+ (52, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py'),
+ (53, 'imthedefault', f, ts, ts, ctexec, True, False,
+ 12, today, 'py')])
+
+ def test_insert_values(self):
+ t.insert(values={'col3':50}).execute()
+ l = t.select().execute()
+ eq_(50, l.fetchone()['col3'])
+
+ @testing.fails_on('firebird', 'Data type unknown')
+ def test_updatemany(self):
+ # MySQL-Python 1.2.2 breaks functions in execute_many :(
+ if (testing.against('mysql') and
+ testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
+ return
+
+ t.insert().execute({}, {}, {})
+
+ t.update(t.c.col1==sa.bindparam('pkval')).execute(
+ {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False})
+
+ t.update(t.c.col1==sa.bindparam('pkval')).execute(
+ {'pkval':51,},
+ {'pkval':52,},
+ {'pkval':53,})
+
+ l = t.select().execute()
+ ctexec = currenttime.scalar()
+ today = datetime.date.today()
+ eq_(l.fetchall(),
+ [(51, 'im the update', f2, ts, ts, ctexec, False, False,
+ 13, today, 'py'),
+ (52, 'im the update', f2, ts, ts, ctexec, True, False,
+ 13, today, 'py'),
+ (53, 'im the update', f2, ts, ts, ctexec, True, False,
+ 13, today, 'py')])
+
+ @testing.fails_on('firebird', 'Data type unknown')
+ def test_update(self):
+ r = t.insert().execute()
+ pk = r.last_inserted_ids()[0]
+ t.update(t.c.col1==pk).execute(col4=None, col5=None)
+ ctexec = currenttime.scalar()
+ l = t.select(t.c.col1==pk).execute()
+ l = l.fetchone()
+ eq_(l,
+ (pk, 'im the update', f2, None, None, ctexec, True, False,
+ 13, datetime.date.today(), 'py'))
+ eq_(11, f2)
+
+ @testing.fails_on('firebird', 'Data type unknown')
+ def test_update_values(self):
+ r = t.insert().execute()
+ pk = r.last_inserted_ids()[0]
+ t.update(t.c.col1==pk, values={'col3': 55}).execute()
+ l = t.select(t.c.col1==pk).execute()
+ l = l.fetchone()
+ eq_(55, l['col3'])
+
+ @testing.fails_on_everything_except('postgres')
+ def test_passive_override(self):
+ """
+ Primarily for postgres, tests that when we get a primary key column
+ back from reflecting a table which has a default value on it, we
+ pre-execute that DefaultClause upon insert, even though DefaultClause
+ says "let the database execute this", because in postgres we must have
+ all the primary key values in memory before insert; otherwise we can't
+ locate the just inserted row.
+
+ """
+ # TODO: move this to dialect/postgres
+ try:
+ meta = MetaData(testing.db)
+ testing.db.execute("""
+ CREATE TABLE speedy_users
+ (
+ speedy_user_id SERIAL PRIMARY KEY,
+
+ user_name VARCHAR NOT NULL,
+ user_password VARCHAR NOT NULL
+ );
+ """, None)
+
+ t = Table("speedy_users", meta, autoload=True)
+ t.insert().execute(user_name='user', user_password='lala')
+ l = t.select().execute().fetchall()
+ eq_(l, [(1, 'user', 'lala')])
+ finally:
+ testing.db.execute("drop table speedy_users", None)
+
+
+class PKDefaultTest(_base.TablesTest):
+ __requires__ = ('subqueries',)
+
+ @classmethod
+ def define_tables(cls, metadata):
+ t2 = Table('t2', metadata,
+ Column('nextid', Integer))
+
+ Table('t1', metadata,
+ Column('id', Integer, primary_key=True,
+ default=sa.select([func.max(t2.c.nextid)]).as_scalar()),
+ Column('data', String(30)))
+
+ @testing.fails_on('mssql', 'FIXME: unknown')
+ @testing.resolve_artifact_names
+ def test_basic(self):
+ t2.insert().execute(nextid=1)
+ r = t1.insert().execute(data='hi')
+ eq_([1], r.last_inserted_ids())
+
+ t2.insert().execute(nextid=2)
+ r = t1.insert().execute(data='there')
+ eq_([2], r.last_inserted_ids())
+
+
+class PKIncrementTest(_base.TablesTest):
+ run_define_tables = 'each'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table("aitable", metadata,
+ Column('id', Integer, Sequence('ai_id_seq', optional=True),
+ primary_key=True),
+ Column('int1', Integer),
+ Column('str1', String(20)))
+
+ # TODO: add coverage for increment on a secondary column in a key
+ @testing.fails_on('firebird', 'Data type unknown')
+ @testing.resolve_artifact_names
+ def _test_autoincrement(self, bind):
+ ids = set()
+ rs = bind.execute(aitable.insert(), int1=1)
+ last = rs.last_inserted_ids()[0]
+ self.assert_(last)
+ self.assert_(last not in ids)
+ ids.add(last)
+
+ rs = bind.execute(aitable.insert(), str1='row 2')
+ last = rs.last_inserted_ids()[0]
+ self.assert_(last)
+ self.assert_(last not in ids)
+ ids.add(last)
+
+ rs = bind.execute(aitable.insert(), int1=3, str1='row 3')
+ last = rs.last_inserted_ids()[0]
+ self.assert_(last)
+ self.assert_(last not in ids)
+ ids.add(last)
+
+ rs = bind.execute(aitable.insert(values={'int1':func.length('four')}))
+ last = rs.last_inserted_ids()[0]
+ self.assert_(last)
+ self.assert_(last not in ids)
+ ids.add(last)
+
+ eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))),
+ [(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)])
+
+ @testing.resolve_artifact_names
+ def test_autoincrement_autocommit(self):
+ self._test_autoincrement(testing.db)
+
+ @testing.resolve_artifact_names
+ def test_autoincrement_transaction(self):
+ con = testing.db.connect()
+ tx = con.begin()
+ try:
+ try:
+ self._test_autoincrement(con)
+ except:
+ try:
+ tx.rollback()
+ except:
+ pass
+ raise
+ else:
+ tx.commit()
+ finally:
+ con.close()
+
+
+class EmptyInsertTest(testing.TestBase):
+ @testing.exclude('sqlite', '<', (3, 3, 8), 'no empty insert support')
+ @testing.fails_on('oracle', 'FIXME: unknown')
+ def test_empty_insert(self):
+ metadata = MetaData(testing.db)
+ t1 = Table('t1', metadata,
+ Column('is_true', Boolean, server_default=('1')))
+ metadata.create_all()
+
+ try:
+ result = t1.insert().execute()
+ eq_(1, select([func.count(text('*'))], from_obj=t1).scalar())
+ eq_(True, t1.select().scalar())
+ finally:
+ metadata.drop_all()
+
+class AutoIncrementTest(_base.TablesTest):
+ __requires__ = ('identity',)
+ run_define_tables = 'each'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ """Each test manipulates self.metadata individually."""
+
+ @testing.exclude('sqlite', '<', (3, 4), 'no database support')
+ def test_autoincrement_single_col(self):
+ single = Table('single', self.metadata,
+ Column('id', Integer, primary_key=True))
+ single.create()
+
+ r = single.insert().execute()
+ id_ = r.last_inserted_ids()[0]
+ assert id_ is not None
+ eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar())
+
+ def test_autoincrement_fk(self):
+ nodes = Table('nodes', self.metadata,
+ Column('id', Integer, primary_key=True),
+ Column('parent_id', Integer, ForeignKey('nodes.id')),
+ Column('data', String(30)))
+ nodes.create()
+
+ r = nodes.insert().execute(data='foo')
+ id_ = r.last_inserted_ids()[0]
+ nodes.insert().execute(data='bar', parent_id=id_)
+
+ @testing.fails_on('sqlite', 'FIXME: unknown')
+ def test_non_autoincrement(self):
+ # sqlite INT primary keys can be non-unique! (only for ints)
+ nonai = Table("nonaitest", self.metadata,
+ Column('id', Integer, autoincrement=False, primary_key=True),
+ Column('data', String(20)))
+ nonai.create()
+
+
+ try:
+ # postgres + mysql strict will fail on first row,
+ # mysql in legacy mode fails on second row
+ nonai.insert().execute(data='row 1')
+ nonai.insert().execute(data='row 2')
+ assert False
+ except sa.exc.SQLError, e:
+ assert True
+
+ nonai.insert().execute(id=1, data='row 1')
+
+
+class SequenceTest(testing.TestBase):
+ __requires__ = ('sequences',)
+
+ @classmethod
+ def setup_class(cls):
+ global cartitems, sometable, metadata
+ metadata = MetaData(testing.db)
+ cartitems = Table("cartitems", metadata,
+ Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True),
+ Column("description", String(40)),
+ Column("createdate", sa.DateTime())
+ )
+ sometable = Table( 'Manager', metadata,
+ Column('obj_id', Integer, Sequence('obj_id_seq'), ),
+ Column('name', String(128)),
+ Column('id', Integer, Sequence('Manager_id_seq', optional=True),
+ primary_key=True),
+ )
+
+ metadata.create_all()
+
+ def testseqnonpk(self):
+ """test sequences fire off as defaults on non-pk columns"""
+
+ result = sometable.insert().execute(name="somename")
+ assert 'id' in result.postfetch_cols()
+
+ result = sometable.insert().execute(name="someother")
+ assert 'id' in result.postfetch_cols()
+
+ sometable.insert().execute(
+ {'name':'name3'},
+ {'name':'name4'})
+ eq_(sometable.select().execute().fetchall(),
+ [(1, "somename", 1),
+ (2, "someother", 2),
+ (3, "name3", 3),
+ (4, "name4", 4)])
+
+ def testsequence(self):
+ cartitems.insert().execute(description='hi')
+ cartitems.insert().execute(description='there')
+ r = cartitems.insert().execute(description='lala')
+
+ assert r.last_inserted_ids() and r.last_inserted_ids()[0] is not None
+ id_ = r.last_inserted_ids()[0]
+
+ eq_(1,
+ sa.select([func.count(cartitems.c.cart_id)],
+ sa.and_(cartitems.c.description == 'lala',
+ cartitems.c.cart_id == id_)).scalar())
+
+ cartitems.select().execute().fetchall()
+
+ @testing.fails_on('maxdb', 'FIXME: unknown')
+ # maxdb db-api seems to double-execute NEXTVAL internally somewhere,
+ # throwing off the numbers for these tests...
+ def test_implicit_sequence_exec(self):
+ s = Sequence("my_sequence", metadata=MetaData(testing.db))
+ s.create()
+ try:
+ x = s.execute()
+ eq_(x, 1)
+ finally:
+ s.drop()
+
+ @testing.fails_on('maxdb', 'FIXME: unknown')
+ def teststandalone_explicit(self):
+ s = Sequence("my_sequence")
+ s.create(bind=testing.db)
+ try:
+ x = s.execute(testing.db)
+ eq_(x, 1)
+ finally:
+ s.drop(testing.db)
+
+ def test_checkfirst(self):
+ s = Sequence("my_sequence")
+ s.create(testing.db, checkfirst=False)
+ s.create(testing.db, checkfirst=True)
+ s.drop(testing.db, checkfirst=False)
+ s.drop(testing.db, checkfirst=True)
+
+ @testing.fails_on('maxdb', 'FIXME: unknown')
+ def teststandalone2(self):
+ x = cartitems.c.cart_id.sequence.execute()
+ self.assert_(1 <= x <= 4)
+
+ @classmethod
+ def teardown_class(cls):
+ metadata.drop_all()
+
+