diff options
Diffstat (limited to 'test/sql/test_defaults.py')
-rw-r--r-- | test/sql/test_defaults.py | 288 |
1 files changed, 166 insertions, 122 deletions
diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py index d511de229..2c144daf4 100644 --- a/test/sql/test_defaults.py +++ b/test/sql/test_defaults.py @@ -5,8 +5,9 @@ from sqlalchemy.sql import select, text import sqlalchemy as sa from sqlalchemy import testing from sqlalchemy.testing import engines -from sqlalchemy import MetaData, Integer, String, ForeignKey, Boolean, exc,\ - Sequence, func, literal, Unicode, cast +from sqlalchemy import ( + MetaData, Integer, String, ForeignKey, Boolean, exc, Sequence, func, + literal, Unicode, cast) from sqlalchemy.types import TypeDecorator, TypeEngine from sqlalchemy.testing.schema import Table, Column from sqlalchemy.dialects import sqlite @@ -16,7 +17,6 @@ from sqlalchemy import util t = f = f2 = ts = currenttime = metadata = default_generator = None -t = f = f2 = ts = currenttime = metadata = default_generator = None class DefaultTest(fixtures.TestBase): __backend__ = True @@ -59,14 +59,23 @@ class DefaultTest(fixtures.TestBase): # differences currenttime = func.current_date(type_=sa.Date, bind=db) if is_oracle: - ts = db.scalar(sa.select([func.trunc(func.sysdate(), sa.literal_column("'DAY'"), type_=sa.Date).label('today')])) - assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime) + ts = db.scalar( + sa.select( + [ + func.trunc( + func.sysdate(), sa.literal_column("'DAY'"), + type_=sa.Date).label('today')])) + assert isinstance(ts, datetime.date) and not isinstance( + ts, datetime.datetime) f = sa.select([func.length('abcdef')], bind=db).scalar() f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar() # TODO: engine propigation across nested functions not working - currenttime = func.trunc(currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date) + currenttime = func.trunc( + currenttime, sa.literal_column("'DAY'"), bind=db, + type_=sa.Date) def1 = currenttime - def2 = func.trunc(sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date) + def2 = func.trunc( + sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date) deftype = sa.Date elif use_function_defaults: @@ -86,7 +95,8 @@ class DefaultTest(fixtures.TestBase): ts = 3 deftype = Integer - t = Table('default_test1', metadata, + t = Table( + 'default_test1', metadata, # python function Column('col1', Integer, primary_key=True, default=mydefault), @@ -146,47 +156,56 @@ class DefaultTest(fixtures.TestBase): t.delete().execute() def test_bad_arg_signature(self): - ex_msg = \ - "ColumnDefault Python function takes zero "\ - "or one positional arguments" + ex_msg = "ColumnDefault Python function takes zero " \ + "or one positional arguments" def fn1(x, y): pass + def fn2(x, y, z=3): pass + class fn3(object): def __init__(self, x, y): pass + class FN4(object): def __call__(self, x, y): pass fn4 = FN4() for fn in fn1, fn2, fn3, fn4: - assert_raises_message(sa.exc.ArgumentError, - ex_msg, - sa.ColumnDefault, fn) + assert_raises_message( + sa.exc.ArgumentError, ex_msg, sa.ColumnDefault, fn) def test_arg_signature(self): + def fn1(): pass + def fn2(): pass + def fn3(x=1): eq_(x, 1) + def fn4(x=1, y=2, z=3): eq_(x, 1) fn5 = list + class fn6a(object): def __init__(self, x): eq_(x, "context") + class fn6b(object): def __init__(self, x, y=3): eq_(x, "context") + class FN7(object): def __call__(self, x): eq_(x, "context") fn7 = FN7() + class FN8(object): def __call__(self, x, y=3): eq_(x, "context") @@ -210,7 +229,8 @@ class DefaultTest(fixtures.TestBase): def test_py_vs_server_default_detection(self): def has_(name, *wanted): - slots = ['default', 'onupdate', 'server_default', 'server_onupdate'] + slots = [ + 'default', 'onupdate', 'server_default', 'server_onupdate'] col = tbl.c[name] for slot in wanted: slots.remove(slot) @@ -273,7 +293,8 @@ class DefaultTest(fixtures.TestBase): has_('col5', 'default', 'server_default', 'onupdate') has_('col6', 'default', 'server_default', 'onupdate') has_('col7', 'default', 'server_default', 'onupdate') - has_('col8', 'default', 'server_default', 'onupdate', 'server_onupdate') + has_( + 'col8', 'default', 'server_default', 'onupdate', 'server_onupdate') @testing.fails_on('firebird', 'Data type unknown') def test_insert(self): @@ -289,7 +310,8 @@ class DefaultTest(fixtures.TestBase): t.insert().execute() - ctexec = sa.select([currenttime.label('now')], bind=testing.db).scalar() + ctexec = sa.select( + [currenttime.label('now')], bind=testing.db).scalar() l = t.select().order_by(t.c.col1).execute() today = datetime.date.today() eq_(l.fetchall(), [ @@ -353,8 +375,10 @@ class DefaultTest(fixtures.TestBase): ) def test_missing_many_param(self): - assert_raises_message(exc.StatementError, - "A value is required for bind parameter 'col7', in parameter group 1", + assert_raises_message( + exc.StatementError, + "A value is required for bind parameter 'col7', in parameter " + "group 1", t.insert().execute, {'col4': 7, 'col7': 12, 'col8': 19}, {'col4': 7, 'col8': 19}, @@ -423,13 +447,16 @@ class PKDefaultTest(fixtures.TablesTest): @classmethod def define_tables(cls, metadata): - t2 = Table('t2', metadata, + t2 = Table( + 't2', metadata, Column('nextid', Integer)) - Table('t1', metadata, - Column('id', Integer, primary_key=True, - default=sa.select([func.max(t2.c.nextid)]).as_scalar()), - Column('data', String(30))) + Table( + 't1', metadata, + Column( + 'id', Integer, primary_key=True, + default=sa.select([func.max(t2.c.nextid)]).as_scalar()), + Column('data', String(30))) @testing.requires.returning def test_with_implicit_returning(self): @@ -445,7 +472,7 @@ class PKDefaultTest(fixtures.TablesTest): engine = testing.db else: engine = engines.testing_engine( - options={'implicit_returning': returning}) + options={'implicit_returning': returning}) engine.execute(t2.insert(), nextid=1) r = engine.execute(t1.insert(), data='hi') eq_([1], r.inserted_primary_key) @@ -454,6 +481,7 @@ class PKDefaultTest(fixtures.TablesTest): r = engine.execute(t1.insert(), data='there') eq_([2], r.inserted_primary_key) + class PKIncrementTest(fixtures.TablesTest): run_define_tables = 'each' __backend__ = True @@ -529,13 +557,15 @@ class EmptyInsertTest(fixtures.TestBase): @testing.fails_on('oracle', 'FIXME: unknown') @testing.provide_metadata def test_empty_insert(self): - t1 = Table('t1', self.metadata, - Column('is_true', Boolean, server_default=('1'))) + t1 = Table( + 't1', self.metadata, + Column('is_true', Boolean, server_default=('1'))) self.metadata.create_all() t1.insert().execute() eq_(1, select([func.count(text('*'))], from_obj=t1).scalar()) eq_(True, t1.select().scalar()) + class AutoIncrementTest(fixtures.TablesTest): __requires__ = ('identity',) run_define_tables = 'each' @@ -557,7 +587,8 @@ class AutoIncrementTest(fixtures.TablesTest): eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar()) def test_autoincrement_fk(self): - nodes = Table('nodes', self.metadata, + nodes = Table( + 'nodes', self.metadata, Column('id', Integer, primary_key=True), Column('parent_id', Integer, ForeignKey('nodes.id')), Column('data', String(30))) @@ -572,39 +603,43 @@ class AutoIncrementTest(fixtures.TablesTest): impl = TypeEngine assert MyType()._type_affinity is None - t = Table('x', MetaData(), + t = Table( + 'x', MetaData(), Column('id', MyType(), primary_key=True) ) assert t._autoincrement_column is None def test_autoincrement_ignore_fk(self): m = MetaData() - Table('y', m, + Table( + 'y', m, Column('id', Integer(), primary_key=True) ) - x = Table('x', m, - Column('id', Integer(), - ForeignKey('y.id'), + x = Table( + 'x', m, + Column( + 'id', Integer(), ForeignKey('y.id'), autoincrement="ignore_fk", primary_key=True) ) assert x._autoincrement_column is x.c.id def test_autoincrement_fk_disqualifies(self): m = MetaData() - Table('y', m, + Table( + 'y', m, Column('id', Integer(), primary_key=True) ) - x = Table('x', m, - Column('id', Integer(), - ForeignKey('y.id'), - primary_key=True) + x = Table( + 'x', m, + Column('id', Integer(), ForeignKey('y.id'), primary_key=True) ) assert x._autoincrement_column is None @testing.fails_on('sqlite', 'FIXME: unknown') def test_non_autoincrement(self): # sqlite INT primary keys can be non-unique! (only for ints) - nonai = Table("nonaitest", self.metadata, + nonai = Table( + "nonaitest", self.metadata, Column('id', Integer, autoincrement=False, primary_key=True), Column('data', String(20))) nonai.create() @@ -621,11 +656,11 @@ class AutoIncrementTest(fixtures.TablesTest): nonai.insert().execute(id=1, data='row 1') - def test_col_w_sequence_non_autoinc_no_firing(self): metadata = self.metadata # plain autoincrement/PK table in the actual schema - Table("x", metadata, + Table( + "x", metadata, Column("set_id", Integer, primary_key=True) ) metadata.create_all() @@ -633,21 +668,19 @@ class AutoIncrementTest(fixtures.TablesTest): # for the INSERT use a table with a Sequence # and autoincrement=False. Using a ForeignKey # would have the same effect - dataset_no_autoinc = Table("x", MetaData(), - Column("set_id", Integer, Sequence("some_seq"), - primary_key=True, autoincrement=False) - ) - - testing.db.execute( - dataset_no_autoinc.insert() + dataset_no_autoinc = Table( + "x", MetaData(), + Column( + "set_id", Integer, Sequence("some_seq"), + primary_key=True, autoincrement=False) ) + + testing.db.execute(dataset_no_autoinc.insert()) eq_( testing.db.scalar(dataset_no_autoinc.count()), 1 ) - - class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL): __dialect__ = 'default' __backend__ = True @@ -678,6 +711,7 @@ class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL): "DROP SEQUENCE foo_seq", ) + class SequenceExecTest(fixtures.TestBase): __requires__ = ('sequences',) __backend__ = True @@ -743,7 +777,8 @@ class SequenceExecTest(fixtures.TestBase): """test can use next_value() in whereclause""" metadata = self.metadata - t1 = Table('t', metadata, + t1 = Table( + 't', metadata, Column('x', Integer) ) t1.create(testing.db) @@ -761,7 +796,8 @@ class SequenceExecTest(fixtures.TestBase): """test can use next_value() in values() of _ValuesBase""" metadata = self.metadata - t1 = Table('t', metadata, + t1 = Table( + 't', metadata, Column('x', Integer) ) t1.create(testing.db) @@ -782,13 +818,12 @@ class SequenceExecTest(fixtures.TestBase): e = engines.testing_engine(options={'implicit_returning': False}) s = Sequence("my_sequence") metadata.bind = e - t1 = Table('t', metadata, + t1 = Table( + 't', metadata, Column('x', Integer, primary_key=True) ) t1.create() - r = e.execute( - t1.insert().values(x=s.next_value()) - ) + r = e.execute(t1.insert().values(x=s.next_value())) eq_(r.inserted_primary_key, [None]) @testing.requires.returning @@ -801,7 +836,8 @@ class SequenceExecTest(fixtures.TestBase): e = engines.testing_engine(options={'implicit_returning': True}) s = Sequence("my_sequence") metadata.bind = e - t1 = Table('t', metadata, + t1 = Table( + 't', metadata, Column('x', Integer, primary_key=True) ) t1.create() @@ -810,6 +846,7 @@ class SequenceExecTest(fixtures.TestBase): ) self._assert_seq_result(r.inserted_primary_key[0]) + class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): __requires__ = ('sequences',) __backend__ = True @@ -832,7 +869,6 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): finally: seq.drop(testing.db) - def _has_sequence(self, name): return testing.db.dialect.has_sequence(testing.db, name) @@ -840,15 +876,9 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): """test dialect renders the "nextval" construct, whether or not "optional" is set """ - for s in ( - Sequence("my_seq"), - Sequence("my_seq", optional=True)): - assert str(s.next_value(). - compile(dialect=testing.db.dialect)) in ( - "nextval('my_seq')", - "gen_id(my_seq, 1)", - "my_seq.nextval", - ) + for s in (Sequence("my_seq"), Sequence("my_seq", optional=True)): + assert str(s.next_value().compile(dialect=testing.db.dialect)) in ( + "nextval('my_seq')", "gen_id(my_seq, 1)", "my_seq.nextval",) def test_nextval_unsupported(self): """test next_value() used on non-sequence platform @@ -899,11 +929,11 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): Sequence("s1", metadata=metadata) s2 = Sequence("s2", metadata=metadata) s3 = Sequence("s3") - t = Table('t', metadata, - Column('c', Integer, s3, primary_key=True)) + t = Table( + 't', metadata, + Column('c', Integer, s3, primary_key=True)) assert s3.metadata is metadata - t.create(testing.db, checkfirst=True) s3.drop(testing.db) @@ -923,8 +953,9 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): assert not self._has_sequence('s1') assert not self._has_sequence('s2') - cartitems = sometable = metadata = None + + class TableBoundSequenceTest(fixtures.TestBase): __requires__ = ('sequences',) __backend__ = True @@ -933,17 +964,21 @@ class TableBoundSequenceTest(fixtures.TestBase): def setup_class(cls): global cartitems, sometable, metadata metadata = MetaData(testing.db) - cartitems = Table("cartitems", metadata, - Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True), + cartitems = Table( + "cartitems", metadata, + Column( + "cart_id", Integer, Sequence('cart_id_seq'), primary_key=True), Column("description", String(40)), Column("createdate", sa.DateTime()) ) - sometable = Table('Manager', metadata, - Column('obj_id', Integer, Sequence('obj_id_seq')), - Column('name', String(128)), - Column('id', Integer, Sequence('Manager_id_seq', optional=True), - primary_key=True), - ) + sometable = Table( + 'Manager', metadata, + Column('obj_id', Integer, Sequence('obj_id_seq')), + Column('name', String(128)), + Column( + 'id', Integer, Sequence('Manager_id_seq', optional=True), + primary_key=True), + ) metadata.create_all() @@ -969,8 +1004,7 @@ class TableBoundSequenceTest(fixtures.TestBase): def test_seq_nonpk(self): """test sequences fire off as defaults on non-pk columns""" - engine = engines.testing_engine( - options={'implicit_returning': False}) + engine = engines.testing_engine(options={'implicit_returning': False}) result = engine.execute(sometable.insert(), name="somename") assert set(result.postfetch_cols()) == set([sometable.c.obj_id]) @@ -991,8 +1025,8 @@ class TableBoundSequenceTest(fixtures.TestBase): class SpecialTypePKTest(fixtures.TestBase): """test process_result_value in conjunction with primary key columns. - Also tests that "autoincrement" checks are against column.type._type_affinity, - rather than the class of "type" itself. + Also tests that "autoincrement" checks are against + column.type._type_affinity, rather than the class of "type" itself. """ __backend__ = True @@ -1001,6 +1035,7 @@ class SpecialTypePKTest(fixtures.TestBase): def setup_class(cls): class MyInteger(TypeDecorator): impl = Integer + def process_bind_param(self, value, dialect): if value is None: return None @@ -1020,7 +1055,8 @@ class SpecialTypePKTest(fixtures.TestBase): kw['primary_key'] = True if kw.get('autoincrement', True): kw['test_needs_autoincrement'] = True - t = Table('x', metadata, + t = Table( + 'x', metadata, Column('y', self.MyInteger, *arg, **kw), Column('data', Integer), implicit_returning=implicit_returning @@ -1030,9 +1066,9 @@ class SpecialTypePKTest(fixtures.TestBase): r = t.insert().values(data=5).execute() # we don't pre-fetch 'server_default'. - if 'server_default' in kw and (not - testing.db.dialect.implicit_returning or - not implicit_returning): + if 'server_default' in kw and ( + not testing.db.dialect.implicit_returning or + not implicit_returning): eq_(r.inserted_primary_key, [None]) else: eq_(r.inserted_primary_key, ['INT_1']) @@ -1049,7 +1085,8 @@ class SpecialTypePKTest(fixtures.TestBase): self._run_test() def test_literal_default_label(self): - self._run_test(default=literal("INT_1", type_=self.MyInteger).label('foo')) + self._run_test( + default=literal("INT_1", type_=self.MyInteger).label('foo')) def test_literal_default_no_label(self): self._run_test(default=literal("INT_1", type_=self.MyInteger)) @@ -1075,6 +1112,7 @@ class SpecialTypePKTest(fixtures.TestBase): def test_server_default_no_implicit_returning(self): self._run_test(server_default='1', autoincrement=False) + class ServerDefaultsOnPKTest(fixtures.TestBase): __backend__ = True @@ -1090,11 +1128,13 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): """ metadata = self.metadata - t = Table('x', metadata, - Column('y', String(10), server_default='key_one', primary_key=True), - Column('data', String(10)), - implicit_returning=False - ) + t = Table( + 'x', metadata, + Column( + 'y', String(10), server_default='key_one', primary_key=True), + Column('data', String(10)), + implicit_returning=False + ) metadata.create_all() r = t.insert().execute(data='data') eq_(r.inserted_primary_key, [None]) @@ -1106,13 +1146,16 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): @testing.requires.returning @testing.provide_metadata def test_string_default_on_insert_with_returning(self): - """With implicit_returning, we get a string PK default back no problem.""" + """With implicit_returning, we get a string PK default back no + problem.""" metadata = self.metadata - t = Table('x', metadata, - Column('y', String(10), server_default='key_one', primary_key=True), - Column('data', String(10)) - ) + t = Table( + 'x', metadata, + Column( + 'y', String(10), server_default='key_one', primary_key=True), + Column('data', String(10)) + ) metadata.create_all() r = t.insert().execute(data='data') eq_(r.inserted_primary_key, ['key_one']) @@ -1124,12 +1167,12 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): @testing.provide_metadata def test_int_default_none_on_insert(self): metadata = self.metadata - t = Table('x', metadata, - Column('y', Integer, - server_default='5', primary_key=True), - Column('data', String(10)), - implicit_returning=False - ) + t = Table( + 'x', metadata, + Column('y', Integer, server_default='5', primary_key=True), + Column('data', String(10)), + implicit_returning=False + ) assert t._autoincrement_column is None metadata.create_all() r = t.insert().execute(data='data') @@ -1144,15 +1187,16 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): t.select().execute().fetchall(), [(5, 'data')] ) + @testing.provide_metadata def test_autoincrement_reflected_from_server_default(self): metadata = self.metadata - t = Table('x', metadata, - Column('y', Integer, - server_default='5', primary_key=True), - Column('data', String(10)), - implicit_returning=False - ) + t = Table( + 'x', metadata, + Column('y', Integer, server_default='5', primary_key=True), + Column('data', String(10)), + implicit_returning=False + ) assert t._autoincrement_column is None metadata.create_all() @@ -1163,12 +1207,12 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): @testing.provide_metadata def test_int_default_none_on_insert_reflected(self): metadata = self.metadata - Table('x', metadata, - Column('y', Integer, - server_default='5', primary_key=True), - Column('data', String(10)), - implicit_returning=False - ) + Table( + 'x', metadata, + Column('y', Integer, server_default='5', primary_key=True), + Column('data', String(10)), + implicit_returning=False + ) metadata.create_all() m2 = MetaData(metadata.bind) @@ -1191,11 +1235,11 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): @testing.provide_metadata def test_int_default_on_insert_with_returning(self): metadata = self.metadata - t = Table('x', metadata, - Column('y', Integer, - server_default='5', primary_key=True), - Column('data', String(10)) - ) + t = Table( + 'x', metadata, + Column('y', Integer, server_default='5', primary_key=True), + Column('data', String(10)) + ) metadata.create_all() r = t.insert().execute(data='data') @@ -1205,6 +1249,7 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): [(5, 'data')] ) + class UnicodeDefaultsTest(fixtures.TestBase): __backend__ = True @@ -1215,7 +1260,6 @@ class UnicodeDefaultsTest(fixtures.TestBase): default = u('foo') Column(Unicode(32), default=default) - def test_nonunicode_default(self): default = b('foo') assert_raises_message( |