summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2010-11-18 18:57:33 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2010-11-18 18:57:33 -0500
commitfd4f39648aa7949e2c64626982e401fcbc928e1f (patch)
treea9560d579187be26727e6d2958951cc10f704893 /test
parent39fddb8bda933eb0e7835eed09656f6992b3ac58 (diff)
parent2336b1cebfcb2f304e09cbc2a0e8bb3fb3a9ceeb (diff)
downloadsqlalchemy-fd4f39648aa7949e2c64626982e401fcbc928e1f.tar.gz
merge tip
Diffstat (limited to 'test')
-rw-r--r--test/dialect/test_oracle.py44
-rw-r--r--test/dialect/test_postgresql.py76
-rw-r--r--test/ext/test_declarative.py32
-rw-r--r--test/orm/test_mapper.py11
-rw-r--r--test/orm/test_relationships.py389
5 files changed, 421 insertions, 131 deletions
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py
index e1275dbfc..3a0fbac9a 100644
--- a/test/dialect/test_oracle.py
+++ b/test/dialect/test_oracle.py
@@ -367,7 +367,7 @@ class CompatFlagsTest(TestBase, AssertsCompiledSQL):
def server_version_info(self):
return (8, 2, 5)
- dialect = oracle.dialect()
+ dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
dialect._get_server_version_info = server_version_info
# before connect, assume modern DB
@@ -384,7 +384,8 @@ class CompatFlagsTest(TestBase, AssertsCompiledSQL):
self.assert_compile(Unicode(50),"VARCHAR(50)",dialect=dialect)
self.assert_compile(UnicodeText(),"CLOB",dialect=dialect)
- dialect = oracle.dialect(implicit_returning=True)
+ dialect = oracle.dialect(implicit_returning=True,
+ dbapi=testing.db.dialect.dbapi)
dialect._get_server_version_info = server_version_info
dialect.initialize(testing.db.connect())
assert dialect.implicit_returning
@@ -392,7 +393,7 @@ class CompatFlagsTest(TestBase, AssertsCompiledSQL):
def test_default_flags(self):
"""test with no initialization or server version info"""
- dialect = oracle.dialect()
+ dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
assert dialect._supports_char_length
assert dialect._supports_nchar
assert dialect.use_ansi
@@ -403,7 +404,7 @@ class CompatFlagsTest(TestBase, AssertsCompiledSQL):
def test_ora10_flags(self):
def server_version_info(self):
return (10, 2, 5)
- dialect = oracle.dialect()
+ dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
dialect._get_server_version_info = server_version_info
dialect.initialize(testing.db.connect())
assert dialect._supports_char_length
@@ -1043,7 +1044,40 @@ class TypesTest(TestBase, AssertsCompiledSQL):
finally:
t.drop(engine)
-
+class EuroNumericTest(TestBase):
+ """test the numeric output_type_handler when using non-US locale for NLS_LANG."""
+
+ __only_on__ = 'oracle+cx_oracle'
+
+ def setup(self):
+ self.old_nls_lang = os.environ.get('NLS_LANG', False)
+ os.environ['NLS_LANG'] = "GERMAN"
+ self.engine = testing_engine()
+
+ def teardown(self):
+ if self.old_nls_lang is not False:
+ os.environ['NLS_LANG'] = self.old_nls_lang
+ else:
+ del os.environ['NLS_LANG']
+ self.engine.dispose()
+
+ @testing.provide_metadata
+ def test_output_type_handler(self):
+ for stmt, exp, kw in [
+ ("SELECT 0.1 FROM DUAL", Decimal("0.1"), {}),
+ ("SELECT 15 FROM DUAL", 15, {}),
+ ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", Decimal("15"), {}),
+ ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", Decimal("0.1"), {}),
+ ("SELECT :num FROM DUAL", Decimal("2.5"), {'num':Decimal("2.5")})
+ ]:
+ test_exp = self.engine.scalar(stmt, **kw)
+ eq_(
+ test_exp,
+ exp
+ )
+ assert type(test_exp) is type(exp)
+
+
class DontReflectIOTTest(TestBase):
"""test that index overflow tables aren't included in
table_names."""
diff --git a/test/dialect/test_postgresql.py b/test/dialect/test_postgresql.py
index 3bc22f2a3..2b6642349 100644
--- a/test/dialect/test_postgresql.py
+++ b/test/dialect/test_postgresql.py
@@ -456,7 +456,25 @@ class EnumTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
assert t2.c.value2.type.schema == 'test_schema'
finally:
metadata.drop_all()
+
+class NumericInterpretationTest(TestBase):
+
+
+ def test_numeric_codes(self):
+ from sqlalchemy.dialects.postgresql import pg8000, psycopg2, base
+ from decimal import Decimal
+ for dialect in (pg8000.dialect(), psycopg2.dialect()):
+
+ typ = Numeric().dialect_impl(dialect)
+ for code in base._INT_TYPES + base._FLOAT_TYPES + \
+ base._DECIMAL_TYPES:
+ proc = typ.result_processor(dialect, code)
+ val = 23.7
+ if proc is not None:
+ val = proc(val)
+ assert val in (23.7, Decimal("23.7"))
+
class InsertTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgresql'
@@ -1866,6 +1884,64 @@ class SpecialTypesTest(TestBase, ComparesTables):
assert t.c.plain_interval.type.precision is None
assert t.c.precision_interval.type.precision == 3
+class UUIDTest(TestBase):
+ """Test the bind/return values of the UUID type."""
+
+ __only_on__ = 'postgresql'
+
+ @testing.fails_on('postgresql+pg8000', 'No support for UUID type')
+ def test_uuid_string(self):
+ import uuid
+ self._test_round_trip(
+ Table('utable', MetaData(),
+ Column('data', postgresql.UUID())
+ ),
+ str(uuid.uuid4()),
+ str(uuid.uuid4())
+ )
+
+ @testing.fails_on('postgresql+pg8000', 'No support for UUID type')
+ def test_uuid_uuid(self):
+ import uuid
+ self._test_round_trip(
+ Table('utable', MetaData(),
+ Column('data', postgresql.UUID(as_uuid=True))
+ ),
+ uuid.uuid4(),
+ uuid.uuid4()
+ )
+
+ def test_no_uuid_available(self):
+ from sqlalchemy.dialects.postgresql import base
+ uuid_type = base._python_UUID
+ base._python_UUID = None
+ try:
+ assert_raises(
+ NotImplementedError,
+ postgresql.UUID, as_uuid=True
+ )
+ finally:
+ base._python_UUID = uuid_type
+
+ def setup(self):
+ self.conn = testing.db.connect()
+ trans = self.conn.begin()
+
+ def teardown(self):
+ self.conn.close()
+
+ def _test_round_trip(self, utable, value1, value2):
+ utable.create(self.conn)
+ self.conn.execute(utable.insert(), {'data':value1})
+ self.conn.execute(utable.insert(), {'data':value2})
+ r = self.conn.execute(
+ select([utable.c.data]).
+ where(utable.c.data != value1)
+ )
+ eq_(r.fetchone()[0], value2)
+ eq_(r.fetchone(), None)
+
+
class MatchTest(TestBase, AssertsCompiledSQL):
__only_on__ = 'postgresql'
diff --git a/test/ext/test_declarative.py b/test/ext/test_declarative.py
index 89e8e9108..9d7ee255a 100644
--- a/test/ext/test_declarative.py
+++ b/test/ext/test_declarative.py
@@ -631,7 +631,7 @@ class DeclarativeTest(DeclarativeTestBase):
'Mapper Mapper|User|users could not '
'assemble any primary key', define)
- def test_table_args(self):
+ def test_table_args_bad_format(self):
def err():
class Foo1(Base):
@@ -643,7 +643,30 @@ class DeclarativeTest(DeclarativeTestBase):
assert_raises_message(sa.exc.ArgumentError,
'Tuple form of __table_args__ is ', err)
+
+ def test_table_args_type(self):
+ def err():
+ class Foo1(Base):
+
+ __tablename__ = 'foo'
+ __table_args__ = ForeignKeyConstraint(['id'], ['foo.id'
+ ])
+ id = Column('id', Integer, primary_key=True)
+ assert_raises_message(sa.exc.ArgumentError,
+ '__table_args__ value must be a tuple, ', err)
+
+ def test_table_args_none(self):
+
+ class Foo2(Base):
+ __tablename__ = 'foo'
+ __table_args__ = None
+ id = Column('id', Integer, primary_key=True)
+
+ assert Foo2.__table__.kwargs == {}
+
+ def test_table_args_dict_format(self):
+
class Foo2(Base):
__tablename__ = 'foo'
@@ -652,6 +675,13 @@ class DeclarativeTest(DeclarativeTestBase):
assert Foo2.__table__.kwargs['mysql_engine'] == 'InnoDB'
+ def test_table_args_tuple_format(self):
+ class Foo2(Base):
+
+ __tablename__ = 'foo'
+ __table_args__ = {'mysql_engine': 'InnoDB'}
+ id = Column('id', Integer, primary_key=True)
+
class Bar(Base):
__tablename__ = 'bar'
diff --git a/test/orm/test_mapper.py b/test/orm/test_mapper.py
index f60d5a871..c3cbaeb33 100644
--- a/test/orm/test_mapper.py
+++ b/test/orm/test_mapper.py
@@ -665,6 +665,17 @@ class MapperTest(_fixtures.FixtureTest):
None])
@testing.resolve_artifact_names
+ def test_scalar_pk_arg(self):
+ m1 = mapper(Item, items, primary_key=[items.c.id])
+ m2 = mapper(Keyword, keywords, primary_key=keywords.c.id)
+ m3 = mapper(User, users, primary_key=(users.c.id,))
+
+ assert m1.primary_key[0] is items.c.id
+ assert m2.primary_key[0] is keywords.c.id
+ assert m3.primary_key[0] is users.c.id
+
+
+ @testing.resolve_artifact_names
def test_custom_join(self):
"""select_from totally replace the FROM parameters."""
diff --git a/test/orm/test_relationships.py b/test/orm/test_relationships.py
index eadf17b39..9cd99320a 100644
--- a/test/orm/test_relationships.py
+++ b/test/orm/test_relationships.py
@@ -5,25 +5,15 @@ from test.lib import testing
from sqlalchemy import Integer, String, ForeignKey, MetaData, and_
from test.lib.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, relation, \
- backref, create_session, compile_mappers, clear_mappers, sessionmaker
+ backref, create_session, compile_mappers, \
+ clear_mappers, sessionmaker, attributes,\
+ Session, composite, column_property
from test.lib.testing import eq_, startswith_
from test.orm import _base, _fixtures
-class RelationshipTest(_base.MappedTest):
- """An extended topological sort test
-
- This is essentially an extension of the "dependency.py" topological sort
- test. In this test, a table is dependent on two other tables that are
- otherwise unrelated to each other. The dependency sort must ensure that
- this childmost table is below both parent tables in the outcome (a bug
- existed where this was not always the case).
-
- While the straight topological sort tests should expose this, since the
- sorting can be different due to subtle differences in program execution,
- this test case was exposing the bug whereas the simpler tests were not.
-
- """
+class DependencyTwoParentTest(_base.MappedTest):
+ """Test flush() when a mapper is dependent on multiple relationships"""
run_setup_mappers = 'once'
run_inserts = 'once'
@@ -32,18 +22,24 @@ class RelationshipTest(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table("tbl_a", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
+ Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column("name", String(128)))
Table("tbl_b", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
+ Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column("name", String(128)))
Table("tbl_c", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
- Column("tbl_a_id", Integer, ForeignKey("tbl_a.id"), nullable=False),
+ Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column("tbl_a_id", Integer, ForeignKey("tbl_a.id"),
+ nullable=False),
Column("name", String(128)))
Table("tbl_d", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
- Column("tbl_c_id", Integer, ForeignKey("tbl_c.id"), nullable=False),
+ Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column("tbl_c_id", Integer, ForeignKey("tbl_c.id"),
+ nullable=False),
Column("tbl_b_id", Integer, ForeignKey("tbl_b.id")),
Column("name", String(128)))
@@ -62,10 +58,12 @@ class RelationshipTest(_base.MappedTest):
@testing.resolve_artifact_names
def setup_mappers(cls):
mapper(A, tbl_a, properties=dict(
- c_rows=relationship(C, cascade="all, delete-orphan", backref="a_row")))
+ c_rows=relationship(C, cascade="all, delete-orphan",
+ backref="a_row")))
mapper(B, tbl_b)
mapper(C, tbl_c, properties=dict(
- d_rows=relationship(D, cascade="all, delete-orphan", backref="c_row")))
+ d_rows=relationship(D, cascade="all, delete-orphan",
+ backref="c_row")))
mapper(D, tbl_d, properties=dict(
b_row=relationship(B)))
@@ -101,8 +99,12 @@ class RelationshipTest(_base.MappedTest):
session.flush()
-class RelationshipTest2(_base.MappedTest):
- """The ultimate relationship() test:
+class CompositeSelfRefFKTest(_base.MappedTest):
+ """Tests a composite FK where, in
+ the relationship(), one col points
+ to itself in the same table.
+
+ this is a very unusual case::
company employee
---------- ----------
@@ -117,22 +119,13 @@ class RelationshipTest2(_base.MappedTest):
employee joins to its sub-employees
both on reports_to_id, *and on company_id to itself*.
- As of 0.5.5 we are making a slight behavioral change,
- such that the custom foreign_keys setting
- on the o2m side has to be explicitly
- unset on the backref m2o side - this to suit
- the vast majority of use cases where the backref()
- is to receive the same foreign_keys argument
- as the forwards reference. But we also
- have smartened the remote_side logic such that
- you don't even need the custom fks setting.
-
"""
@classmethod
def define_tables(cls, metadata):
Table('company_t', metadata,
- Column('company_id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('company_id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', sa.Unicode(30)))
Table('employee_t', metadata,
@@ -163,7 +156,10 @@ class RelationshipTest2(_base.MappedTest):
def test_explicit(self):
mapper(Company, company_t)
mapper(Employee, employee_t, properties= {
- 'company':relationship(Company, primaryjoin=employee_t.c.company_id==company_t.c.company_id, backref='employees'),
+ 'company':relationship(Company,
+ primaryjoin=employee_t.c.company_id==
+ company_t.c.company_id,
+ backref='employees'),
'reports_to':relationship(Employee, primaryjoin=
sa.and_(
employee_t.c.emp_id==employee_t.c.reports_to_id,
@@ -244,10 +240,12 @@ class RelationshipTest2(_base.MappedTest):
test_e5 = sess.query(Employee).get([c2.company_id, e5.emp_id])
assert test_e5.name == 'emp5', test_e5.name
assert [x.name for x in test_e1.employees] == ['emp2', 'emp3']
- assert sess.query(Employee).get([c1.company_id, 3]).reports_to.name == 'emp1'
- assert sess.query(Employee).get([c2.company_id, 3]).reports_to.name == 'emp5'
+ assert sess.query(Employee).\
+ get([c1.company_id, 3]).reports_to.name == 'emp1'
+ assert sess.query(Employee).\
+ get([c2.company_id, 3]).reports_to.name == 'emp5'
-class RelationshipTest3(_base.MappedTest):
+class ComplexPostUpdateTest(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table("jobs", metadata,
@@ -311,7 +309,8 @@ class RelationshipTest3(_base.MappedTest):
comment.content = u'some content'
return self.currentversion
def add_comment(self):
- nextnum = max([-1] + [c.comment_id for c in self.comments]) + 1
+ nextnum = max([-1] +
+ [c.comment_id for c in self.comments]) + 1
newcomment = PageComment()
newcomment.comment_id = nextnum
self.comments.append(newcomment)
@@ -340,7 +339,7 @@ class RelationshipTest3(_base.MappedTest):
PageVersion,
cascade="all, delete-orphan",
primaryjoin=sa.and_(pages.c.jobno==pageversions.c.jobno,
- pages.c.pagename==pageversions.c.pagename),
+ pages.c.pagename==pageversions.c.pagename),
order_by=pageversions.c.version,
backref=backref('page',lazy='joined')
)})
@@ -348,7 +347,7 @@ class RelationshipTest3(_base.MappedTest):
'page': relationship(
Page,
primaryjoin=sa.and_(pages.c.jobno==pagecomments.c.jobno,
- pages.c.pagename==pagecomments.c.pagename),
+ pages.c.pagename==pagecomments.c.pagename),
backref=backref("comments",
cascade="all, delete-orphan",
order_by=pagecomments.c.comment_id))})
@@ -389,13 +388,14 @@ class RelationshipTest3(_base.MappedTest):
s.delete(j)
s.flush()
-class RelationshipTest4(_base.MappedTest):
+class FKsAsPksTest(_base.MappedTest):
"""Syncrules on foreign keys that are also primary"""
@classmethod
def define_tables(cls, metadata):
Table("tableA", metadata,
- Column("id",Integer,primary_key=True, test_needs_autoincrement=True),
+ Column("id",Integer,primary_key=True,
+ test_needs_autoincrement=True),
Column("foo",Integer,),
test_needs_fk=True)
@@ -413,7 +413,8 @@ class RelationshipTest4(_base.MappedTest):
@testing.resolve_artifact_names
def test_onetoone_switch(self):
- """test that active history is enabled on a one-to-many/one that has use_get==True"""
+ """test that active history is enabled on a
+ one-to-many/one that has use_get==True"""
mapper(A, tableA, properties={
'b':relationship(B, cascade="all,delete-orphan", uselist=False)})
@@ -502,7 +503,8 @@ class RelationshipTest4(_base.MappedTest):
@testing.resolve_artifact_names
def test_delete_cascade_BtoA(self):
- """No 'blank the PK' error when the child is to be deleted as part of a cascade"""
+ """No 'blank the PK' error when the child is to
+ be deleted as part of a cascade"""
for cascade in ("save-update, delete",
#"save-update, delete-orphan",
@@ -527,7 +529,9 @@ class RelationshipTest4(_base.MappedTest):
@testing.resolve_artifact_names
def test_delete_cascade_AtoB(self):
- """No 'blank the PK' error when the child is to be deleted as part of a cascade"""
+ """No 'blank the PK' error when the child is to
+ be deleted as part of a cascade"""
+
for cascade in ("save-update, delete",
#"save-update, delete-orphan",
"save-update, delete, delete-orphan"):
@@ -590,19 +594,25 @@ class RelationshipTest4(_base.MappedTest):
assert a1 not in sess
assert b1 not in sess
-class RelationshipToUniqueTest(_base.MappedTest):
- """test a relationship based on a primary join against a unique non-pk column"""
+class UniqueColReferenceSwitchTest(_base.MappedTest):
+ """test a relationship based on a primary
+ join against a unique non-pk column"""
@classmethod
def define_tables(cls, metadata):
Table("table_a", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
- Column("ident", String(10), nullable=False, unique=True),
+ Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column("ident", String(10), nullable=False,
+ unique=True),
)
Table("table_b", metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
- Column("a_ident", String(10), ForeignKey('table_a.ident'), nullable=False),
+ Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column("a_ident", String(10),
+ ForeignKey('table_a.ident'),
+ nullable=False),
)
@classmethod
@@ -632,7 +642,7 @@ class RelationshipToUniqueTest(_base.MappedTest):
session.delete(a1)
session.flush()
-class RelationshipTest5(_base.MappedTest):
+class RelationshipToSelectableTest(_base.MappedTest):
"""Test a map to a select that relates to a map to the table."""
@classmethod
@@ -671,7 +681,8 @@ class RelationshipTest5(_base.MappedTest):
order_by=sa.asc(items.c.id),
primaryjoin=sa.and_(
container_select.c.policyNum==items.c.policyNum,
- container_select.c.policyEffDate==items.c.policyEffDate,
+ container_select.c.policyEffDate==
+ items.c.policyEffDate,
container_select.c.type==items.c.type),
foreign_keys=[
items.c.policyNum,
@@ -697,7 +708,7 @@ class RelationshipTest5(_base.MappedTest):
for old, new in zip(con.lineItems, newcon.lineItems):
eq_(old.id, new.id)
-class RelationshipTest6(_base.MappedTest):
+class FKEquatedToConstantTest(_base.MappedTest):
"""test a relationship with a non-column entity in the primary join,
is not viewonly, and also has the non-column's clause mentioned in the
foreign keys list.
@@ -706,12 +717,14 @@ class RelationshipTest6(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
- Table('tags', metadata, Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
+ Table('tags', metadata, Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column("data", String(50)),
)
Table('tag_foo', metadata,
- Column("id", Integer, primary_key=True, test_needs_autoincrement=True),
+ Column("id", Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('tagid', Integer),
Column("data", String(50)),
)
@@ -742,7 +755,10 @@ class RelationshipTest6(_base.MappedTest):
sess.expunge_all()
# relationship works
- eq_(sess.query(Tag).all(), [Tag(data='some tag', foo=[TagInstance(data='iplc_case')])])
+ eq_(
+ sess.query(Tag).all(),
+ [Tag(data='some tag', foo=[TagInstance(data='iplc_case')])]
+ )
# both TagInstances were persisted
eq_(
@@ -755,11 +771,13 @@ class BackrefPropagatesForwardsArgs(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('name', String(50))
)
Table('addresses', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('user_id', Integer),
Column('email', String(50))
)
@@ -791,24 +809,28 @@ class BackrefPropagatesForwardsArgs(_base.MappedTest):
])
class AmbiguousJoinInterpretedAsSelfRef(_base.MappedTest):
- """test ambiguous joins due to FKs on both sides treated as self-referential.
+ """test ambiguous joins due to FKs on both sides treated as
+ self-referential.
- this mapping is very similar to that of test/orm/inheritance/query.py
- SelfReferentialTestJoinedToBase , except that inheritance is not used
- here.
+ this mapping is very similar to that of
+ test/orm/inheritance/query.py
+ SelfReferentialTestJoinedToBase , except that inheritance is
+ not used here.
"""
@classmethod
def define_tables(cls, metadata):
subscriber_table = Table('subscriber', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('dummy', String(10)) # to appease older sqlite version
)
address_table = Table('address',
metadata,
- Column('subscriber_id', Integer, ForeignKey('subscriber.id'), primary_key=True),
+ Column('subscriber_id', Integer,
+ ForeignKey('subscriber.id'), primary_key=True),
Column('type', String(1), primary_key=True),
)
@@ -816,7 +838,8 @@ class AmbiguousJoinInterpretedAsSelfRef(_base.MappedTest):
@testing.resolve_artifact_names
def setup_mappers(cls):
subscriber_and_address = subscriber.join(address,
- and_(address.c.subscriber_id==subscriber.c.id, address.c.type.in_(['A', 'B', 'C'])))
+ and_(address.c.subscriber_id==subscriber.c.id,
+ address.c.type.in_(['A', 'B', 'C'])))
class Address(_base.ComparableEntity):
pass
@@ -918,8 +941,10 @@ class ManualBackrefTest(_fixtures.FixtureTest):
})
assert_raises_message(sa.exc.ArgumentError,
- r"reverse_property 'dingaling' on relationship User.addresses references "
- "relationship Address.dingaling, which does not reference mapper Mapper\|User\|users",
+ r"reverse_property 'dingaling' on relationship "
+ "User.addresses references "
+ "relationship Address.dingaling, which does not "
+ "reference mapper Mapper\|User\|users",
compile_mappers)
class JoinConditionErrorTest(testing.TestBase):
@@ -966,7 +991,8 @@ class JoinConditionErrorTest(testing.TestBase):
class C2(object):
pass
- mapper(C1, t1, properties={'c2':relationship(C2, primaryjoin=t1.join(t2))})
+ mapper(C1, t1, properties={'c2':relationship(C2,
+ primaryjoin=t1.join(t2))})
mapper(C2, t2)
assert_raises(sa.exc.ArgumentError, compile_mappers)
@@ -996,7 +1022,9 @@ class JoinConditionErrorTest(testing.TestBase):
assert_raises_message(
sa.exc.ArgumentError,
- "Column-based expression object expected for argument '%s'; got: '%s', type %r" % (argname, arg[0], type(arg[0])),
+ "Column-based expression object expected "
+ "for argument '%s'; got: '%s', type %r" %
+ (argname, arg[0], type(arg[0])),
compile_mappers)
@@ -1053,23 +1081,28 @@ class JoinConditionErrorTest(testing.TestBase):
clear_mappers()
class TypeMatchTest(_base.MappedTest):
- """test errors raised when trying to add items whose type is not handled by a relationship"""
+ """test errors raised when trying to add items
+ whose type is not handled by a relationship"""
@classmethod
def define_tables(cls, metadata):
Table("a", metadata,
- Column('aid', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('aid', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(30)))
Table("b", metadata,
- Column('bid', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('bid', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column("a_id", Integer, ForeignKey("a.aid")),
Column('data', String(30)))
Table("c", metadata,
- Column('cid', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('cid', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column("b_id", Integer, ForeignKey("b.bid")),
Column('data', String(30)))
Table("d", metadata,
- Column('did', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('did', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column("a_id", Integer, ForeignKey("a.aid")),
Column('data', String(30)))
@@ -1115,7 +1148,8 @@ class TypeMatchTest(_base.MappedTest):
sess.add(b1)
sess.add(c1)
assert_raises_message(sa.orm.exc.FlushError,
- "Attempting to flush an item", sess.flush)
+ "Attempting to flush an item",
+ sess.flush)
@testing.resolve_artifact_names
def test_o2m_nopoly_onflush(self):
@@ -1136,7 +1170,8 @@ class TypeMatchTest(_base.MappedTest):
sess.add(b1)
sess.add(c1)
assert_raises_message(sa.orm.exc.FlushError,
- "Attempting to flush an item", sess.flush)
+ "Attempting to flush an item",
+ sess.flush)
@testing.resolve_artifact_names
def test_m2o_nopoly_onflush(self):
@@ -1153,7 +1188,8 @@ class TypeMatchTest(_base.MappedTest):
sess.add(b1)
sess.add(d1)
assert_raises_message(sa.orm.exc.FlushError,
- "Attempting to flush an item", sess.flush)
+ "Attempting to flush an item",
+ sess.flush)
@testing.resolve_artifact_names
def test_m2o_oncascade(self):
@@ -1168,7 +1204,8 @@ class TypeMatchTest(_base.MappedTest):
d1.a = b1
sess = create_session()
assert_raises_message(AssertionError,
- "doesn't handle objects of type", sess.add, d1)
+ "doesn't handle objects of type",
+ sess.add, d1)
class TypedAssociationTable(_base.MappedTest):
@@ -1224,10 +1261,12 @@ class ViewOnlyM2MBackrefTest(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table("t1", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(40)))
Table("t2", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(40)),
)
Table("t1t2", metadata,
@@ -1241,7 +1280,8 @@ class ViewOnlyM2MBackrefTest(_base.MappedTest):
class B(_base.ComparableEntity):pass
mapper(A, t1, properties={
- 'bs':relationship(B, secondary=t1t2, backref=backref('as_', viewonly=True))
+ 'bs':relationship(B, secondary=t1t2,
+ backref=backref('as_', viewonly=True))
})
mapper(B, t2)
@@ -1264,14 +1304,17 @@ class ViewOnlyOverlappingNames(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table("t1", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(40)))
Table("t2", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(40)),
Column('t1id', Integer, ForeignKey('t1.id')))
Table("t3", metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(40)),
Column('t2id', Integer, ForeignKey('t2.id')))
@@ -1324,14 +1367,17 @@ class ViewOnlyUniqueNames(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table("t1", metadata,
- Column('t1id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('t1id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(40)))
Table("t2", metadata,
- Column('t2id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('t2id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(40)),
Column('t1id_ref', Integer, ForeignKey('t1.t1id')))
Table("t3", metadata,
- Column('t3id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('t3id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(40)),
Column('t2id_ref', Integer, ForeignKey('t2.t2id')))
@@ -1505,10 +1551,12 @@ class ViewOnlyRepeatedLocalColumn(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('foos', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(50)))
- Table('bars', metadata, Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Table('bars', metadata, Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('fid1', Integer, ForeignKey('foos.id')),
Column('fid2', Integer, ForeignKey('foos.id')),
Column('data', String(50)))
@@ -1553,14 +1601,17 @@ class ViewOnlyComplexJoin(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('t1', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(50)))
Table('t2', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(50)),
Column('t1id', Integer, ForeignKey('t1.id')))
Table('t3', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(50)))
Table('t2tot3', metadata,
Column('t2id', Integer, ForeignKey('t2.id')),
@@ -1624,10 +1675,12 @@ class ExplicitLocalRemoteTest(_base.MappedTest):
@classmethod
def define_tables(cls, metadata):
Table('t1', metadata,
- Column('id', String(50), primary_key=True, test_needs_autoincrement=True),
+ Column('id', String(50), primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(50)))
Table('t2', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('data', String(50)),
Column('t1id', String(50)))
@@ -1777,19 +1830,25 @@ class InvalidRemoteSideTest(_base.MappedTest):
't1s':relationship(T1, backref='parent')
})
- assert_raises_message(sa.exc.ArgumentError, "T1.t1s and back-reference T1.parent are "
- "both of the same direction <symbol 'ONETOMANY>. Did you "
- "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)
+ assert_raises_message(sa.exc.ArgumentError,
+ "T1.t1s and back-reference T1.parent are "
+ "both of the same direction <symbol 'ONETOMANY>. Did you "
+ "mean to set remote_side on the many-to-one side ?",
+ sa.orm.compile_mappers)
@testing.resolve_artifact_names
def test_m2o_backref(self):
mapper(T1, t1, properties={
- 't1s':relationship(T1, backref=backref('parent', remote_side=t1.c.id), remote_side=t1.c.id)
+ 't1s':relationship(T1,
+ backref=backref('parent', remote_side=t1.c.id),
+ remote_side=t1.c.id)
})
- assert_raises_message(sa.exc.ArgumentError, "T1.t1s and back-reference T1.parent are "
- "both of the same direction <symbol 'MANYTOONE>. Did you "
- "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)
+ assert_raises_message(sa.exc.ArgumentError,
+ "T1.t1s and back-reference T1.parent are "
+ "both of the same direction <symbol 'MANYTOONE>. Did you "
+ "mean to set remote_side on the many-to-one side ?",
+ sa.orm.compile_mappers)
@testing.resolve_artifact_names
def test_o2m_explicit(self):
@@ -1800,20 +1859,24 @@ class InvalidRemoteSideTest(_base.MappedTest):
# can't be sure of ordering here
assert_raises_message(sa.exc.ArgumentError,
- "both of the same direction <symbol 'ONETOMANY>. Did you "
- "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)
+ "both of the same direction <symbol 'ONETOMANY>. Did you "
+ "mean to set remote_side on the many-to-one side ?",
+ sa.orm.compile_mappers)
@testing.resolve_artifact_names
def test_m2o_explicit(self):
mapper(T1, t1, properties={
- 't1s':relationship(T1, back_populates='parent', remote_side=t1.c.id),
- 'parent':relationship(T1, back_populates='t1s', remote_side=t1.c.id)
+ 't1s':relationship(T1, back_populates='parent',
+ remote_side=t1.c.id),
+ 'parent':relationship(T1, back_populates='t1s',
+ remote_side=t1.c.id)
})
# can't be sure of ordering here
assert_raises_message(sa.exc.ArgumentError,
- "both of the same direction <symbol 'MANYTOONE>. Did you "
- "mean to set remote_side on the many-to-one side ?", sa.orm.compile_mappers)
+ "both of the same direction <symbol 'MANYTOONE>. Did you "
+ "mean to set remote_side on the many-to-one side ?",
+ sa.orm.compile_mappers)
class InvalidRelationshipEscalationTest(_base.MappedTest):
@@ -1872,7 +1935,8 @@ class InvalidRelationshipEscalationTest(_base.MappedTest):
assert_raises_message(
sa.exc.ArgumentError,
- "Could not determine relationship direction for primaryjoin condition",
+ "Could not determine relationship direction "
+ "for primaryjoin condition",
sa.orm.compile_mappers)
@testing.resolve_artifact_names
@@ -1953,7 +2017,8 @@ class InvalidRelationshipEscalationTest(_base.MappedTest):
assert_raises_message(
sa.exc.ArgumentError,
- "Could not determine relationship direction for primaryjoin condition",
+ "Could not determine relationship direction for primaryjoin "
+ "condition",
sa.orm.compile_mappers)
@testing.resolve_artifact_names
@@ -2036,13 +2101,14 @@ class InvalidRelationshipEscalationTest(_base.MappedTest):
assert_raises_message(
sa.exc.ArgumentError,
- "Could not determine relationship direction for primaryjoin condition",
+ "Could not determine relationship direction for primaryjoin "
+ "condition",
sa.orm.compile_mappers)
sa.orm.clear_mappers()
mapper(Foo, foos_with_fks, properties={
'bars':relationship(Bar,
- primaryjoin=foos_with_fks.c.id==bars_with_fks.c.fid)})
+ primaryjoin=foos_with_fks.c.id==bars_with_fks.c.fid)})
mapper(Bar, bars_with_fks)
sa.orm.compile_mappers()
@@ -2054,7 +2120,8 @@ class InvalidRelationshipEscalationTest(_base.MappedTest):
assert_raises_message(
sa.exc.ArgumentError,
- "Could not determine relationship direction for primaryjoin condition",
+ "Could not determine relationship direction for primaryjoin "
+ "condition",
sa.orm.compile_mappers)
@@ -2067,7 +2134,8 @@ class InvalidRelationshipEscalationTest(_base.MappedTest):
assert_raises_message(
sa.exc.ArgumentError,
- "Could not determine relationship direction for primaryjoin condition",
+ "Could not determine relationship direction for primaryjoin "
+ "condition",
sa.orm.compile_mappers)
@@ -2148,9 +2216,12 @@ class InvalidRelationshipEscalationTestM2M(_base.MappedTest):
sa.orm.clear_mappers()
mapper(Foo, foos, properties={
- 'bars': relationship(Bar, secondary=foobars_with_many_columns,
- primaryjoin=foos.c.id==foobars_with_many_columns.c.fid,
- secondaryjoin=foobars_with_many_columns.c.bid==bars.c.id)})
+ 'bars': relationship(Bar,
+ secondary=foobars_with_many_columns,
+ primaryjoin=foos.c.id==
+ foobars_with_many_columns.c.fid,
+ secondaryjoin=foobars_with_many_columns.c.bid==
+ bars.c.id)})
mapper(Bar, bars)
assert_raises_message(sa.exc.SAWarning,
@@ -2188,9 +2259,12 @@ class InvalidRelationshipEscalationTestM2M(_base.MappedTest):
sa.orm.clear_mappers()
mapper(Foo, foos, properties={
- 'bars': relationship(Bar, secondary=foobars_with_many_columns,
- primaryjoin=foos.c.id==foobars_with_many_columns.c.fid,
- secondaryjoin=foobars_with_many_columns.c.bid==bars.c.id)})
+ 'bars': relationship(Bar,
+ secondary=foobars_with_many_columns,
+ primaryjoin=foos.c.id==
+ foobars_with_many_columns.c.fid,
+ secondaryjoin=foobars_with_many_columns.c.bid==
+ bars.c.id)})
mapper(Bar, bars)
sa.orm.compile_mappers()
eq_(
@@ -2214,7 +2288,8 @@ class InvalidRelationshipEscalationTestM2M(_base.MappedTest):
assert_raises_message(
sa.exc.ArgumentError,
- "Could not determine relationship direction for primaryjoin condition",
+ "Could not determine relationship direction for "
+ "primaryjoin condition",
sa.orm.compile_mappers)
sa.orm.clear_mappers()
@@ -2226,7 +2301,8 @@ class InvalidRelationshipEscalationTestM2M(_base.MappedTest):
mapper(Bar, bars)
assert_raises_message(
sa.exc.ArgumentError,
- "Could not locate any equated, locally mapped column pairs for primaryjoin condition ",
+ "Could not locate any equated, locally mapped column pairs for "
+ "primaryjoin condition ",
sa.orm.compile_mappers)
sa.orm.clear_mappers()
@@ -2279,8 +2355,71 @@ class InvalidRelationshipEscalationTestM2M(_base.MappedTest):
"Could not locate any equated, locally mapped column pairs for "
"secondaryjoin condition", sa.orm.compile_mappers)
+class ActiveHistoryFlagTest(_fixtures.FixtureTest):
+ run_inserts = None
+ run_deletes = None
+
+ def _test_attribute(self, obj, attrname, newvalue):
+ sess = Session()
+ sess.add(obj)
+ oldvalue = getattr(obj, attrname)
+ sess.commit()
+
+ # expired
+ assert attrname not in obj.__dict__
+
+ setattr(obj, attrname, newvalue)
+ eq_(
+ attributes.get_history(obj, attrname),
+ ([newvalue,], (), [oldvalue,])
+ )
+
+ @testing.resolve_artifact_names
+ def test_column_property_flag(self):
+ mapper(User, users, properties={
+ 'name':column_property(users.c.name, active_history=True)
+ })
+ u1 = User(name='jack')
+ self._test_attribute(u1, 'name', 'ed')
+
+ @testing.resolve_artifact_names
+ def test_relationship_property_flag(self):
+ mapper(Address, addresses, properties={
+ 'user':relationship(User, active_history=True)
+ })
+ mapper(User, users)
+ u1 = User(name='jack')
+ u2 = User(name='ed')
+ a1 = Address(email_address='a1', user=u1)
+ self._test_attribute(a1, 'user', u2)
+
+ @testing.resolve_artifact_names
+ def test_composite_property_flag(self):
+ # active_history is implicit for composites
+ # right now, no flag needed
+ class MyComposite(object):
+ def __init__(self, description, isopen):
+ self.description = description
+ self.isopen = isopen
+ def __composite_values__(self):
+ return [self.description, self.isopen]
+ def __eq__(self, other):
+ return isinstance(other, MyComposite) and \
+ other.description == self.description
+ mapper(Order, orders, properties={
+ 'composite':composite(
+ MyComposite,
+ orders.c.description,
+ orders.c.isopen)
+ })
+ o1 = Order(composite=MyComposite('foo', 1))
+ self._test_attribute(o1, "composite", MyComposite('bar', 1))
+
+
class RelationDeprecationTest(_base.MappedTest):
+ """test usage of the old 'relation' function."""
+
run_inserts = 'once'
run_deletes = None