diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-11-18 18:57:33 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2010-11-18 18:57:33 -0500 |
| commit | fd4f39648aa7949e2c64626982e401fcbc928e1f (patch) | |
| tree | a9560d579187be26727e6d2958951cc10f704893 /test | |
| parent | 39fddb8bda933eb0e7835eed09656f6992b3ac58 (diff) | |
| parent | 2336b1cebfcb2f304e09cbc2a0e8bb3fb3a9ceeb (diff) | |
| download | sqlalchemy-fd4f39648aa7949e2c64626982e401fcbc928e1f.tar.gz | |
merge tip
Diffstat (limited to 'test')
| -rw-r--r-- | test/dialect/test_oracle.py | 44 | ||||
| -rw-r--r-- | test/dialect/test_postgresql.py | 76 | ||||
| -rw-r--r-- | test/ext/test_declarative.py | 32 | ||||
| -rw-r--r-- | test/orm/test_mapper.py | 11 | ||||
| -rw-r--r-- | test/orm/test_relationships.py | 389 |
5 files changed, 421 insertions, 131 deletions
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index e1275dbfc..3a0fbac9a 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -367,7 +367,7 @@ class CompatFlagsTest(TestBase, AssertsCompiledSQL): def server_version_info(self): return (8, 2, 5) - dialect = oracle.dialect() + dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) dialect._get_server_version_info = server_version_info # before connect, assume modern DB @@ -384,7 +384,8 @@ class CompatFlagsTest(TestBase, AssertsCompiledSQL): self.assert_compile(Unicode(50),"VARCHAR(50)",dialect=dialect) self.assert_compile(UnicodeText(),"CLOB",dialect=dialect) - dialect = oracle.dialect(implicit_returning=True) + dialect = oracle.dialect(implicit_returning=True, + dbapi=testing.db.dialect.dbapi) dialect._get_server_version_info = server_version_info dialect.initialize(testing.db.connect()) assert dialect.implicit_returning @@ -392,7 +393,7 @@ class CompatFlagsTest(TestBase, AssertsCompiledSQL): def test_default_flags(self): """test with no initialization or server version info""" - dialect = oracle.dialect() + dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi @@ -403,7 +404,7 @@ class CompatFlagsTest(TestBase, AssertsCompiledSQL): def test_ora10_flags(self): def server_version_info(self): return (10, 2, 5) - dialect = oracle.dialect() + dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) dialect._get_server_version_info = server_version_info dialect.initialize(testing.db.connect()) assert dialect._supports_char_length @@ -1043,7 +1044,40 @@ class TypesTest(TestBase, AssertsCompiledSQL): finally: t.drop(engine) - +class EuroNumericTest(TestBase): + """test the numeric output_type_handler when using non-US locale for NLS_LANG.""" + + __only_on__ = 'oracle+cx_oracle' + + def setup(self): + self.old_nls_lang = os.environ.get('NLS_LANG', False) + os.environ['NLS_LANG'] = "GERMAN" + self.engine = testing_engine() + + def teardown(self): + if self.old_nls_lang is not False: + os.environ['NLS_LANG'] = self.old_nls_lang + else: + del os.environ['NLS_LANG'] + self.engine.dispose() + + @testing.provide_metadata + def test_output_type_handler(self): + for stmt, exp, kw in [ + ("SELECT 0.1 FROM DUAL", Decimal("0.1"), {}), + ("SELECT 15 FROM DUAL", 15, {}), + ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", Decimal("15"), {}), + ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", Decimal("0.1"), {}), + ("SELECT :num FROM DUAL", Decimal("2.5"), {'num':Decimal("2.5")}) + ]: + test_exp = self.engine.scalar(stmt, **kw) + eq_( + test_exp, + exp + ) + assert type(test_exp) is type(exp) + + class DontReflectIOTTest(TestBase): """test that index overflow tables aren't included in table_names.""" diff --git a/test/dialect/test_postgresql.py b/test/dialect/test_postgresql.py index 3bc22f2a3..2b6642349 100644 --- a/test/dialect/test_postgresql.py +++ b/test/dialect/test_postgresql.py @@ -456,7 +456,25 @@ class EnumTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): assert t2.c.value2.type.schema == 'test_schema' finally: metadata.drop_all() + +class NumericInterpretationTest(TestBase): + + + def test_numeric_codes(self): + from sqlalchemy.dialects.postgresql import pg8000, psycopg2, base + from decimal import Decimal + for dialect in (pg8000.dialect(), psycopg2.dialect()): + + typ = Numeric().dialect_impl(dialect) + for code in base._INT_TYPES + base._FLOAT_TYPES + \ + base._DECIMAL_TYPES: + proc = typ.result_processor(dialect, code) + val = 23.7 + if proc is not None: + val = proc(val) + assert val in (23.7, Decimal("23.7")) + class InsertTest(TestBase, AssertsExecutionResults): __only_on__ = 'postgresql' @@ -1866,6 +1884,64 @@ class SpecialTypesTest(TestBase, ComparesTables): assert t.c.plain_interval.type.precision is None assert t.c.precision_interval.type.precision == 3 +class UUIDTest(TestBase): + """Test the bind/return values of the UUID type.""" + + __only_on__ = 'postgresql' + + @testing.fails_on('postgresql+pg8000', 'No support for UUID type') + def test_uuid_string(self): + import uuid + self._test_round_trip( + Table('utable', MetaData(), + Column('data', postgresql.UUID()) + ), + str(uuid.uuid4()), + str(uuid.uuid4()) + ) + + @testing.fails_on('postgresql+pg8000', 'No support for UUID type') + def test_uuid_uuid(self): + import uuid + self._test_round_trip( + Table('utable', MetaData(), + Column('data', postgresql.UUID(as_uuid=True)) + ), + uuid.uuid4(), + uuid.uuid4() + ) + + def test_no_uuid_available(self): + from sqlalchemy.dialects.postgresql import base + uuid_type = base._python_UUID + base._python_UUID = None + try: + assert_raises( + NotImplementedError, + postgresql.UUID, as_uuid=True + ) + finally: + base._python_UUID = uuid_type + + def setup(self): + self.conn = testing.db.connect() + trans = self.conn.begin() + + def teardown(self): + self.conn.close() + + def _test_round_trip(self, utable, value1, value2): + utable.create(self.conn) + self.conn.execute(utable.insert(), {'data':value1}) + self.conn.execute(utable.insert(), {'data':value2}) + r = self.conn.execute( + select([utable.c.data]). + where(utable.c.data != value1) + ) + eq_(r.fetchone()[0], value2) + eq_(r.fetchone(), None) + + class MatchTest(TestBase, AssertsCompiledSQL): __only_on__ = 'postgresql' diff --git a/test/ext/test_declarative.py b/test/ext/test_declarative.py index 89e8e9108..9d7ee255a 100644 --- a/test/ext/test_declarative.py +++ b/test/ext/test_declarative.py @@ -631,7 +631,7 @@ class DeclarativeTest(DeclarativeTestBase): 'Mapper Mapper|User|users could not ' 'assemble any primary key', define) - def test_table_args(self): + def test_table_args_bad_format(self): def err(): class Foo1(Base): @@ -643,7 +643,30 @@ class DeclarativeTest(DeclarativeTestBase): assert_raises_message(sa.exc.ArgumentError, 'Tuple form of __table_args__ is ', err) + + def test_table_args_type(self): + def err(): + class Foo1(Base): + + __tablename__ = 'foo' + __table_args__ = ForeignKeyConstraint(['id'], ['foo.id' + ]) + id = Column('id', Integer, primary_key=True) + assert_raises_message(sa.exc.ArgumentError, + '__table_args__ value must be a tuple, ', err) + + def test_table_args_none(self): + + class Foo2(Base): + __tablename__ = 'foo' + __table_args__ = None + id = Column('id', Integer, primary_key=True) + + assert Foo2.__table__.kwargs == {} + + def test_table_args_dict_format(self): + class Foo2(Base): __tablename__ = 'foo' @@ -652,6 +675,13 @@ class DeclarativeTest(DeclarativeTestBase): assert Foo2.__table__.kwargs['mysql_engine'] == 'InnoDB' + def test_table_args_tuple_format(self): + class Foo2(Base): + + __tablename__ = 'foo' + __table_args__ = {'mysql_engine': 'InnoDB'} + id = Column('id', Integer, primary_key=True) + class Bar(Base): __tablename__ = 'bar' diff --git a/test/orm/test_mapper.py b/test/orm/test_mapper.py index f60d5a871..c3cbaeb33 100644 --- a/test/orm/test_mapper.py +++ b/test/orm/test_mapper.py @@ -665,6 +665,17 @@ class MapperTest(_fixtures.FixtureTest): None]) @testing.resolve_artifact_names + def test_scalar_pk_arg(self): + m1 = mapper(Item, items, primary_key=[items.c.id]) + m2 = mapper(Keyword, keywords, primary_key=keywords.c.id) + m3 = mapper(User, users, primary_key=(users.c.id,)) + + assert m1.primary_key[0] is items.c.id + assert m2.primary_key[0] is keywords.c.id + assert m3.primary_key[0] is users.c.id + + + @testing.resolve_artifact_names def test_custom_join(self): """select_from totally replace the FROM parameters.""" diff --git a/test/orm/test_relationships.py b/test/orm/test_relationships.py index eadf17b39..9cd99320a 100644 --- a/test/orm/test_relationships.py +++ b/test/orm/test_relationships.py @@ -5,25 +5,15 @@ from test.lib import testing from sqlalchemy import Integer, String, ForeignKey, MetaData, and_ from test.lib.schema import Table, Column from sqlalchemy.orm import mapper, relationship, relation, \ - backref, create_session, compile_mappers, clear_mappers, sessionmaker + backref, create_session, compile_mappers, \ + clear_mappers, sessionmaker, attributes,\ + Session, composite, column_property from test.lib.testing import eq_, startswith_ from test.orm import _base, _fixtures -class RelationshipTest(_base.MappedTest): - """An extended topological sort test - - This is essentially an extension of the "dependency.py" topological sort - test. In this test, a table is dependent on two other tables that are - otherwise unrelated to each other. The dependency sort must ensure that - this childmost table is below both parent tables in the outcome (a bug - existed where this was not always the case). - - While the straight topological sort tests should expose this, since the - sorting can be different due to subtle differences in program execution, - this test case was exposing the bug whereas the simpler tests were not. - - """ +class DependencyTwoParentTest(_base.MappedTest): + """Test flush() when a mapper is dependent on multiple relationships""" run_setup_mappers = 'once' run_inserts = 'once' @@ -32,18 +22,24 @@ class RelationshipTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table("tbl_a", metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True), + Column("id", Integer, primary_key=True, + test_needs_autoincrement=True), Column("name", String(128))) Table("tbl_b", metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True), + Column("id", Integer, primary_key=True, + test_needs_autoincrement=True), Column("name", String(128))) Table("tbl_c", metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True), - Column("tbl_a_id", Integer, ForeignKey("tbl_a.id"), nullable=False), + Column("id", Integer, primary_key=True, + test_needs_autoincrement=True), + Column("tbl_a_id", Integer, ForeignKey("tbl_a.id"), + nullable=False), Column("name", String(128))) Table("tbl_d", metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True), - Column("tbl_c_id", Integer, ForeignKey("tbl_c.id"), nullable=False), + Column("id", Integer, primary_key=True, + test_needs_autoincrement=True), + Column("tbl_c_id", Integer, ForeignKey("tbl_c.id"), + nullable=False), Column("tbl_b_id", Integer, ForeignKey("tbl_b.id")), Column("name", String(128))) @@ -62,10 +58,12 @@ class RelationshipTest(_base.MappedTest): @testing.resolve_artifact_names def setup_mappers(cls): mapper(A, tbl_a, properties=dict( - c_rows=relationship(C, cascade="all, delete-orphan", backref="a_row"))) + c_rows=relationship(C, cascade="all, delete-orphan", + backref="a_row"))) mapper(B, tbl_b) mapper(C, tbl_c, properties=dict( - d_rows=relationship(D, cascade="all, delete-orphan", backref="c_row"))) + d_rows=relationship(D, cascade="all, delete-orphan", + backref="c_row"))) mapper(D, tbl_d, properties=dict( b_row=relationship(B))) @@ -101,8 +99,12 @@ class RelationshipTest(_base.MappedTest): session.flush() -class RelationshipTest2(_base.MappedTest): - """The ultimate relationship() test: +class CompositeSelfRefFKTest(_base.MappedTest): + """Tests a composite FK where, in + the relationship(), one col points + to itself in the same table. + + this is a very unusual case:: company employee ---------- ---------- @@ -117,22 +119,13 @@ class RelationshipTest2(_base.MappedTest): employee joins to its sub-employees both on reports_to_id, *and on company_id to itself*. - As of 0.5.5 we are making a slight behavioral change, - such that the custom foreign_keys setting - on the o2m side has to be explicitly - unset on the backref m2o side - this to suit - the vast majority of use cases where the backref() - is to receive the same foreign_keys argument - as the forwards reference. But we also - have smartened the remote_side logic such that - you don't even need the custom fks setting. - """ @classmethod def define_tables(cls, metadata): Table('company_t', metadata, - Column('company_id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('company_id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', sa.Unicode(30))) Table('employee_t', metadata, @@ -163,7 +156,10 @@ class RelationshipTest2(_base.MappedTest): def test_explicit(self): mapper(Company, company_t) mapper(Employee, employee_t, properties= { - 'company':relationship(Company, primaryjoin=employee_t.c.company_id==company_t.c.company_id, backref='employees'), + 'company':relationship(Company, + primaryjoin=employee_t.c.company_id== + company_t.c.company_id, + backref='employees'), 'reports_to':relationship(Employee, primaryjoin= sa.and_( employee_t.c.emp_id==employee_t.c.reports_to_id, @@ -244,10 +240,12 @@ class RelationshipTest2(_base.MappedTest): test_e5 = sess.query(Employee).get([c2.company_id, e5.emp_id]) assert test_e5.name == 'emp5', test_e5.name assert [x.name for x in test_e1.employees] == ['emp2', 'emp3'] - assert sess.query(Employee).get([c1.company_id, 3]).reports_to.name == 'emp1' - assert sess.query(Employee).get([c2.company_id, 3]).reports_to.name == 'emp5' + assert sess.query(Employee).\ + get([c1.company_id, 3]).reports_to.name == 'emp1' + assert sess.query(Employee).\ + get([c2.company_id, 3]).reports_to.name == 'emp5' -class RelationshipTest3(_base.MappedTest): +class ComplexPostUpdateTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table("jobs", metadata, @@ -311,7 +309,8 @@ class RelationshipTest3(_base.MappedTest): comment.content = u'some content' return self.currentversion def add_comment(self): - nextnum = max([-1] + [c.comment_id for c in self.comments]) + 1 + nextnum = max([-1] + + [c.comment_id for c in self.comments]) + 1 newcomment = PageComment() newcomment.comment_id = nextnum self.comments.append(newcomment) @@ -340,7 +339,7 @@ class RelationshipTest3(_base.MappedTest): PageVersion, cascade="all, delete-orphan", primaryjoin=sa.and_(pages.c.jobno==pageversions.c.jobno, - pages.c.pagename==pageversions.c.pagename), + pages.c.pagename==pageversions.c.pagename), order_by=pageversions.c.version, backref=backref('page',lazy='joined') )}) @@ -348,7 +347,7 @@ class RelationshipTest3(_base.MappedTest): 'page': relationship( Page, primaryjoin=sa.and_(pages.c.jobno==pagecomments.c.jobno, - pages.c.pagename==pagecomments.c.pagename), + pages.c.pagename==pagecomments.c.pagename), backref=backref("comments", cascade="all, delete-orphan", order_by=pagecomments.c.comment_id))}) @@ -389,13 +388,14 @@ class RelationshipTest3(_base.MappedTest): s.delete(j) s.flush() -class RelationshipTest4(_base.MappedTest): +class FKsAsPksTest(_base.MappedTest): """Syncrules on foreign keys that are also primary""" @classmethod def define_tables(cls, metadata): Table("tableA", metadata, - Column("id",Integer,primary_key=True, test_needs_autoincrement=True), + Column("id",Integer,primary_key=True, + test_needs_autoincrement=True), Column("foo",Integer,), test_needs_fk=True) @@ -413,7 +413,8 @@ class RelationshipTest4(_base.MappedTest): @testing.resolve_artifact_names def test_onetoone_switch(self): - """test that active history is enabled on a one-to-many/one that has use_get==True""" + """test that active history is enabled on a + one-to-many/one that has use_get==True""" mapper(A, tableA, properties={ 'b':relationship(B, cascade="all,delete-orphan", uselist=False)}) @@ -502,7 +503,8 @@ class RelationshipTest4(_base.MappedTest): @testing.resolve_artifact_names def test_delete_cascade_BtoA(self): - """No 'blank the PK' error when the child is to be deleted as part of a cascade""" + """No 'blank the PK' error when the child is to + be deleted as part of a cascade""" for cascade in ("save-update, delete", #"save-update, delete-orphan", @@ -527,7 +529,9 @@ class RelationshipTest4(_base.MappedTest): @testing.resolve_artifact_names def test_delete_cascade_AtoB(self): - """No 'blank the PK' error when the child is to be deleted as part of a cascade""" + """No 'blank the PK' error when the child is to + be deleted as part of a cascade""" + for cascade in ("save-update, delete", #"save-update, delete-orphan", "save-update, delete, delete-orphan"): @@ -590,19 +594,25 @@ class RelationshipTest4(_base.MappedTest): assert a1 not in sess assert b1 not in sess -class RelationshipToUniqueTest(_base.MappedTest): - """test a relationship based on a primary join against a unique non-pk column""" +class UniqueColReferenceSwitchTest(_base.MappedTest): + """test a relationship based on a primary + join against a unique non-pk column""" @classmethod def define_tables(cls, metadata): Table("table_a", metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True), - Column("ident", String(10), nullable=False, unique=True), + Column("id", Integer, primary_key=True, + test_needs_autoincrement=True), + Column("ident", String(10), nullable=False, + unique=True), ) Table("table_b", metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True), - Column("a_ident", String(10), ForeignKey('table_a.ident'), nullable=False), + Column("id", Integer, primary_key=True, + test_needs_autoincrement=True), + Column("a_ident", String(10), + ForeignKey('table_a.ident'), + nullable=False), ) @classmethod @@ -632,7 +642,7 @@ class RelationshipToUniqueTest(_base.MappedTest): session.delete(a1) session.flush() -class RelationshipTest5(_base.MappedTest): +class RelationshipToSelectableTest(_base.MappedTest): """Test a map to a select that relates to a map to the table.""" @classmethod @@ -671,7 +681,8 @@ class RelationshipTest5(_base.MappedTest): order_by=sa.asc(items.c.id), primaryjoin=sa.and_( container_select.c.policyNum==items.c.policyNum, - container_select.c.policyEffDate==items.c.policyEffDate, + container_select.c.policyEffDate== + items.c.policyEffDate, container_select.c.type==items.c.type), foreign_keys=[ items.c.policyNum, @@ -697,7 +708,7 @@ class RelationshipTest5(_base.MappedTest): for old, new in zip(con.lineItems, newcon.lineItems): eq_(old.id, new.id) -class RelationshipTest6(_base.MappedTest): +class FKEquatedToConstantTest(_base.MappedTest): """test a relationship with a non-column entity in the primary join, is not viewonly, and also has the non-column's clause mentioned in the foreign keys list. @@ -706,12 +717,14 @@ class RelationshipTest6(_base.MappedTest): @classmethod def define_tables(cls, metadata): - Table('tags', metadata, Column("id", Integer, primary_key=True, test_needs_autoincrement=True), + Table('tags', metadata, Column("id", Integer, primary_key=True, + test_needs_autoincrement=True), Column("data", String(50)), ) Table('tag_foo', metadata, - Column("id", Integer, primary_key=True, test_needs_autoincrement=True), + Column("id", Integer, primary_key=True, + test_needs_autoincrement=True), Column('tagid', Integer), Column("data", String(50)), ) @@ -742,7 +755,10 @@ class RelationshipTest6(_base.MappedTest): sess.expunge_all() # relationship works - eq_(sess.query(Tag).all(), [Tag(data='some tag', foo=[TagInstance(data='iplc_case')])]) + eq_( + sess.query(Tag).all(), + [Tag(data='some tag', foo=[TagInstance(data='iplc_case')])] + ) # both TagInstances were persisted eq_( @@ -755,11 +771,13 @@ class BackrefPropagatesForwardsArgs(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table('users', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('name', String(50)) ) Table('addresses', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('user_id', Integer), Column('email', String(50)) ) @@ -791,24 +809,28 @@ class BackrefPropagatesForwardsArgs(_base.MappedTest): ]) class AmbiguousJoinInterpretedAsSelfRef(_base.MappedTest): - """test ambiguous joins due to FKs on both sides treated as self-referential. + """test ambiguous joins due to FKs on both sides treated as + self-referential. - this mapping is very similar to that of test/orm/inheritance/query.py - SelfReferentialTestJoinedToBase , except that inheritance is not used - here. + this mapping is very similar to that of + test/orm/inheritance/query.py + SelfReferentialTestJoinedToBase , except that inheritance is + not used here. """ @classmethod def define_tables(cls, metadata): subscriber_table = Table('subscriber', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('dummy', String(10)) # to appease older sqlite version ) address_table = Table('address', metadata, - Column('subscriber_id', Integer, ForeignKey('subscriber.id'), primary_key=True), + Column('subscriber_id', Integer, + ForeignKey('subscriber.id'), primary_key=True), Column('type', String(1), primary_key=True), ) @@ -816,7 +838,8 @@ class AmbiguousJoinInterpretedAsSelfRef(_base.MappedTest): @testing.resolve_artifact_names def setup_mappers(cls): subscriber_and_address = subscriber.join(address, - and_(address.c.subscriber_id==subscriber.c.id, address.c.type.in_(['A', 'B', 'C']))) + and_(address.c.subscriber_id==subscriber.c.id, + address.c.type.in_(['A', 'B', 'C']))) class Address(_base.ComparableEntity): pass @@ -918,8 +941,10 @@ class ManualBackrefTest(_fixtures.FixtureTest): }) assert_raises_message(sa.exc.ArgumentError, - r"reverse_property 'dingaling' on relationship User.addresses references " - "relationship Address.dingaling, which does not reference mapper Mapper\|User\|users", + r"reverse_property 'dingaling' on relationship " + "User.addresses references " + "relationship Address.dingaling, which does not " + "reference mapper Mapper\|User\|users", compile_mappers) class JoinConditionErrorTest(testing.TestBase): @@ -966,7 +991,8 @@ class JoinConditionErrorTest(testing.TestBase): class C2(object): pass - mapper(C1, t1, properties={'c2':relationship(C2, primaryjoin=t1.join(t2))}) + mapper(C1, t1, properties={'c2':relationship(C2, + primaryjoin=t1.join(t2))}) mapper(C2, t2) assert_raises(sa.exc.ArgumentError, compile_mappers) @@ -996,7 +1022,9 @@ class JoinConditionErrorTest(testing.TestBase): assert_raises_message( sa.exc.ArgumentError, - "Column-based expression object expected for argument '%s'; got: '%s', type %r" % (argname, arg[0], type(arg[0])), + "Column-based expression object expected " + "for argument '%s'; got: '%s', type %r" % + (argname, arg[0], type(arg[0])), compile_mappers) @@ -1053,23 +1081,28 @@ class JoinConditionErrorTest(testing.TestBase): clear_mappers() class TypeMatchTest(_base.MappedTest): - """test errors raised when trying to add items whose type is not handled by a relationship""" + """test errors raised when trying to add items + whose type is not handled by a relationship""" @classmethod def define_tables(cls, metadata): Table("a", metadata, - Column('aid', Integer, primary_key=True, test_needs_autoincrement=True), + Column('aid', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(30))) Table("b", metadata, - Column('bid', Integer, primary_key=True, test_needs_autoincrement=True), + Column('bid', Integer, primary_key=True, + test_needs_autoincrement=True), Column("a_id", Integer, ForeignKey("a.aid")), Column('data', String(30))) Table("c", metadata, - Column('cid', Integer, primary_key=True, test_needs_autoincrement=True), + Column('cid', Integer, primary_key=True, + test_needs_autoincrement=True), Column("b_id", Integer, ForeignKey("b.bid")), Column('data', String(30))) Table("d", metadata, - Column('did', Integer, primary_key=True, test_needs_autoincrement=True), + Column('did', Integer, primary_key=True, + test_needs_autoincrement=True), Column("a_id", Integer, ForeignKey("a.aid")), Column('data', String(30))) @@ -1115,7 +1148,8 @@ class TypeMatchTest(_base.MappedTest): sess.add(b1) sess.add(c1) assert_raises_message(sa.orm.exc.FlushError, - "Attempting to flush an item", sess.flush) + "Attempting to flush an item", + sess.flush) @testing.resolve_artifact_names def test_o2m_nopoly_onflush(self): @@ -1136,7 +1170,8 @@ class TypeMatchTest(_base.MappedTest): sess.add(b1) sess.add(c1) assert_raises_message(sa.orm.exc.FlushError, - "Attempting to flush an item", sess.flush) + "Attempting to flush an item", + sess.flush) @testing.resolve_artifact_names def test_m2o_nopoly_onflush(self): @@ -1153,7 +1188,8 @@ class TypeMatchTest(_base.MappedTest): sess.add(b1) sess.add(d1) assert_raises_message(sa.orm.exc.FlushError, - "Attempting to flush an item", sess.flush) + "Attempting to flush an item", + sess.flush) @testing.resolve_artifact_names def test_m2o_oncascade(self): @@ -1168,7 +1204,8 @@ class TypeMatchTest(_base.MappedTest): d1.a = b1 sess = create_session() assert_raises_message(AssertionError, - "doesn't handle objects of type", sess.add, d1) + "doesn't handle objects of type", + sess.add, d1) class TypedAssociationTable(_base.MappedTest): @@ -1224,10 +1261,12 @@ class ViewOnlyM2MBackrefTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table("t1", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(40))) Table("t2", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(40)), ) Table("t1t2", metadata, @@ -1241,7 +1280,8 @@ class ViewOnlyM2MBackrefTest(_base.MappedTest): class B(_base.ComparableEntity):pass mapper(A, t1, properties={ - 'bs':relationship(B, secondary=t1t2, backref=backref('as_', viewonly=True)) + 'bs':relationship(B, secondary=t1t2, + backref=backref('as_', viewonly=True)) }) mapper(B, t2) @@ -1264,14 +1304,17 @@ class ViewOnlyOverlappingNames(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table("t1", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(40))) Table("t2", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(40)), Column('t1id', Integer, ForeignKey('t1.id'))) Table("t3", metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(40)), Column('t2id', Integer, ForeignKey('t2.id'))) @@ -1324,14 +1367,17 @@ class ViewOnlyUniqueNames(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table("t1", metadata, - Column('t1id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('t1id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(40))) Table("t2", metadata, - Column('t2id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('t2id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(40)), Column('t1id_ref', Integer, ForeignKey('t1.t1id'))) Table("t3", metadata, - Column('t3id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('t3id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(40)), Column('t2id_ref', Integer, ForeignKey('t2.t2id'))) @@ -1505,10 +1551,12 @@ class ViewOnlyRepeatedLocalColumn(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table('foos', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50))) - Table('bars', metadata, Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Table('bars', metadata, Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('fid1', Integer, ForeignKey('foos.id')), Column('fid2', Integer, ForeignKey('foos.id')), Column('data', String(50))) @@ -1553,14 +1601,17 @@ class ViewOnlyComplexJoin(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table('t1', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50))) Table('t2', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50)), Column('t1id', Integer, ForeignKey('t1.id'))) Table('t3', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50))) Table('t2tot3', metadata, Column('t2id', Integer, ForeignKey('t2.id')), @@ -1624,10 +1675,12 @@ class ExplicitLocalRemoteTest(_base.MappedTest): @classmethod def define_tables(cls, metadata): Table('t1', metadata, - Column('id', String(50), primary_key=True, test_needs_autoincrement=True), + Column('id', String(50), primary_key=True, + test_needs_autoincrement=True), Column('data', String(50))) Table('t2', metadata, - Column('id', Integer, primary_key=True, test_needs_autoincrement=True), + Column('id', Integer, primary_key=True, + test_needs_autoincrement=True), Column('data', String(50)), Column('t1id', String(50))) @@ -1777,19 +1830,25 @@ class InvalidRemoteSideTest(_base.MappedTest): 't1s':relationship(T1, backref='parent') }) - assert_raises_message(sa.exc.ArgumentError, "T1.t1s and back-reference T1.parent are " - "both of the same direction <symbol 'ONETOMANY>. Did you " - "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers) + assert_raises_message(sa.exc.ArgumentError, + "T1.t1s and back-reference T1.parent are " + "both of the same direction <symbol 'ONETOMANY>. Did you " + "mean to set remote_side on the many-to-one side ?", + sa.orm.compile_mappers) @testing.resolve_artifact_names def test_m2o_backref(self): mapper(T1, t1, properties={ - 't1s':relationship(T1, backref=backref('parent', remote_side=t1.c.id), remote_side=t1.c.id) + 't1s':relationship(T1, + backref=backref('parent', remote_side=t1.c.id), + remote_side=t1.c.id) }) - assert_raises_message(sa.exc.ArgumentError, "T1.t1s and back-reference T1.parent are " - "both of the same direction <symbol 'MANYTOONE>. Did you " - "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers) + assert_raises_message(sa.exc.ArgumentError, + "T1.t1s and back-reference T1.parent are " + "both of the same direction <symbol 'MANYTOONE>. Did you " + "mean to set remote_side on the many-to-one side ?", + sa.orm.compile_mappers) @testing.resolve_artifact_names def test_o2m_explicit(self): @@ -1800,20 +1859,24 @@ class InvalidRemoteSideTest(_base.MappedTest): # can't be sure of ordering here assert_raises_message(sa.exc.ArgumentError, - "both of the same direction <symbol 'ONETOMANY>. Did you " - "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers) + "both of the same direction <symbol 'ONETOMANY>. Did you " + "mean to set remote_side on the many-to-one side ?", + sa.orm.compile_mappers) @testing.resolve_artifact_names def test_m2o_explicit(self): mapper(T1, t1, properties={ - 't1s':relationship(T1, back_populates='parent', remote_side=t1.c.id), - 'parent':relationship(T1, back_populates='t1s', remote_side=t1.c.id) + 't1s':relationship(T1, back_populates='parent', + remote_side=t1.c.id), + 'parent':relationship(T1, back_populates='t1s', + remote_side=t1.c.id) }) # can't be sure of ordering here assert_raises_message(sa.exc.ArgumentError, - "both of the same direction <symbol 'MANYTOONE>. Did you " - "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers) + "both of the same direction <symbol 'MANYTOONE>. Did you " + "mean to set remote_side on the many-to-one side ?", + sa.orm.compile_mappers) class InvalidRelationshipEscalationTest(_base.MappedTest): @@ -1872,7 +1935,8 @@ class InvalidRelationshipEscalationTest(_base.MappedTest): assert_raises_message( sa.exc.ArgumentError, - "Could not determine relationship direction for primaryjoin condition", + "Could not determine relationship direction " + "for primaryjoin condition", sa.orm.compile_mappers) @testing.resolve_artifact_names @@ -1953,7 +2017,8 @@ class InvalidRelationshipEscalationTest(_base.MappedTest): assert_raises_message( sa.exc.ArgumentError, - "Could not determine relationship direction for primaryjoin condition", + "Could not determine relationship direction for primaryjoin " + "condition", sa.orm.compile_mappers) @testing.resolve_artifact_names @@ -2036,13 +2101,14 @@ class InvalidRelationshipEscalationTest(_base.MappedTest): assert_raises_message( sa.exc.ArgumentError, - "Could not determine relationship direction for primaryjoin condition", + "Could not determine relationship direction for primaryjoin " + "condition", sa.orm.compile_mappers) sa.orm.clear_mappers() mapper(Foo, foos_with_fks, properties={ 'bars':relationship(Bar, - primaryjoin=foos_with_fks.c.id==bars_with_fks.c.fid)}) + primaryjoin=foos_with_fks.c.id==bars_with_fks.c.fid)}) mapper(Bar, bars_with_fks) sa.orm.compile_mappers() @@ -2054,7 +2120,8 @@ class InvalidRelationshipEscalationTest(_base.MappedTest): assert_raises_message( sa.exc.ArgumentError, - "Could not determine relationship direction for primaryjoin condition", + "Could not determine relationship direction for primaryjoin " + "condition", sa.orm.compile_mappers) @@ -2067,7 +2134,8 @@ class InvalidRelationshipEscalationTest(_base.MappedTest): assert_raises_message( sa.exc.ArgumentError, - "Could not determine relationship direction for primaryjoin condition", + "Could not determine relationship direction for primaryjoin " + "condition", sa.orm.compile_mappers) @@ -2148,9 +2216,12 @@ class InvalidRelationshipEscalationTestM2M(_base.MappedTest): sa.orm.clear_mappers() mapper(Foo, foos, properties={ - 'bars': relationship(Bar, secondary=foobars_with_many_columns, - primaryjoin=foos.c.id==foobars_with_many_columns.c.fid, - secondaryjoin=foobars_with_many_columns.c.bid==bars.c.id)}) + 'bars': relationship(Bar, + secondary=foobars_with_many_columns, + primaryjoin=foos.c.id== + foobars_with_many_columns.c.fid, + secondaryjoin=foobars_with_many_columns.c.bid== + bars.c.id)}) mapper(Bar, bars) assert_raises_message(sa.exc.SAWarning, @@ -2188,9 +2259,12 @@ class InvalidRelationshipEscalationTestM2M(_base.MappedTest): sa.orm.clear_mappers() mapper(Foo, foos, properties={ - 'bars': relationship(Bar, secondary=foobars_with_many_columns, - primaryjoin=foos.c.id==foobars_with_many_columns.c.fid, - secondaryjoin=foobars_with_many_columns.c.bid==bars.c.id)}) + 'bars': relationship(Bar, + secondary=foobars_with_many_columns, + primaryjoin=foos.c.id== + foobars_with_many_columns.c.fid, + secondaryjoin=foobars_with_many_columns.c.bid== + bars.c.id)}) mapper(Bar, bars) sa.orm.compile_mappers() eq_( @@ -2214,7 +2288,8 @@ class InvalidRelationshipEscalationTestM2M(_base.MappedTest): assert_raises_message( sa.exc.ArgumentError, - "Could not determine relationship direction for primaryjoin condition", + "Could not determine relationship direction for " + "primaryjoin condition", sa.orm.compile_mappers) sa.orm.clear_mappers() @@ -2226,7 +2301,8 @@ class InvalidRelationshipEscalationTestM2M(_base.MappedTest): mapper(Bar, bars) assert_raises_message( sa.exc.ArgumentError, - "Could not locate any equated, locally mapped column pairs for primaryjoin condition ", + "Could not locate any equated, locally mapped column pairs for " + "primaryjoin condition ", sa.orm.compile_mappers) sa.orm.clear_mappers() @@ -2279,8 +2355,71 @@ class InvalidRelationshipEscalationTestM2M(_base.MappedTest): "Could not locate any equated, locally mapped column pairs for " "secondaryjoin condition", sa.orm.compile_mappers) +class ActiveHistoryFlagTest(_fixtures.FixtureTest): + run_inserts = None + run_deletes = None + + def _test_attribute(self, obj, attrname, newvalue): + sess = Session() + sess.add(obj) + oldvalue = getattr(obj, attrname) + sess.commit() + + # expired + assert attrname not in obj.__dict__ + + setattr(obj, attrname, newvalue) + eq_( + attributes.get_history(obj, attrname), + ([newvalue,], (), [oldvalue,]) + ) + + @testing.resolve_artifact_names + def test_column_property_flag(self): + mapper(User, users, properties={ + 'name':column_property(users.c.name, active_history=True) + }) + u1 = User(name='jack') + self._test_attribute(u1, 'name', 'ed') + + @testing.resolve_artifact_names + def test_relationship_property_flag(self): + mapper(Address, addresses, properties={ + 'user':relationship(User, active_history=True) + }) + mapper(User, users) + u1 = User(name='jack') + u2 = User(name='ed') + a1 = Address(email_address='a1', user=u1) + self._test_attribute(a1, 'user', u2) + + @testing.resolve_artifact_names + def test_composite_property_flag(self): + # active_history is implicit for composites + # right now, no flag needed + class MyComposite(object): + def __init__(self, description, isopen): + self.description = description + self.isopen = isopen + def __composite_values__(self): + return [self.description, self.isopen] + def __eq__(self, other): + return isinstance(other, MyComposite) and \ + other.description == self.description + mapper(Order, orders, properties={ + 'composite':composite( + MyComposite, + orders.c.description, + orders.c.isopen) + }) + o1 = Order(composite=MyComposite('foo', 1)) + self._test_attribute(o1, "composite", MyComposite('bar', 1)) + + class RelationDeprecationTest(_base.MappedTest): + """test usage of the old 'relation' function.""" + run_inserts = 'once' run_deletes = None |
